From a9862ea7451510b56ba41307d8538d481d8d5844 Mon Sep 17 00:00:00 2001 From: Codex Date: Sun, 28 Jun 2026 10:07:15 +0000 Subject: [PATCH] feat: add kafka shadow produce management cli --- config/platform-infra/kafka.yaml | 10 +- docs/reference/platform-infra.md | 4 +- scripts/src/platform-infra-kafka.ts | 692 +++++++++++++++++++++++++++- scripts/src/platform-infra/entry.ts | 5 + 4 files changed, 700 insertions(+), 11 deletions(-) diff --git a/config/platform-infra/kafka.yaml b/config/platform-infra/kafka.yaml index 8e92db9a..4fcfb3e6 100644 --- a/config/platform-infra/kafka.yaml +++ b/config/platform-infra/kafka.yaml @@ -12,8 +12,10 @@ defaults: targetId: D518 switch: enabled: true - mode: runtime-poc + mode: shadow-produce-only appIntegrationEnabled: false + shadowProduceEnabled: true + shadowConsumeEnabled: false operator: implementation: strimzi @@ -116,3 +118,9 @@ validation: timeoutSeconds: 45 pollSeconds: 3 smokeTopic: platform-infra.kafka.smoke.v1 + +management: + defaultTailLimit: 10 + maxTailLimit: 50 + defaultShadowTopic: hwlab.agentrun.command.v1 + shadowProducerId: unidesk-platform-infra-kafka-cli diff --git a/docs/reference/platform-infra.md b/docs/reference/platform-infra.md index 9e77f1e7..468ea925 100644 --- a/docs/reference/platform-infra.md +++ b/docs/reference/platform-infra.md @@ -37,8 +37,8 @@ - Kafka for the HWLAB v0.3 / AgentRun v0.2 event-bus POC is a UniDesk-operated platform service in namespace `platform-infra`. It is not owned by `hwlab-v03`, `agentrun-v02`, a per-lane Kafka namespace, or a service repository deployment file. - The canonical source of truth is `config/platform-infra/kafka.yaml`; target, namespace, Strimzi release URL, cluster name, storage class/size, topic list, client declarations, DLQ names, runtime switch and validation smoke topic must stay in that YAML. Current version numbers and retention values belong only in YAML, not in this reference. -- The canonical entrypoint is `bun scripts/cli.ts platform-infra kafka plan|apply|status|validate --target D518`. Formal mutation must use that path; raw `kubectl` is bounded diagnosis only. -- HWLAB v0.3 and AgentRun v0.2 are client namespaces. They may later consume YAML-declared Kafka bootstrap, user Secret metadata and topic contracts, but app producer/consumer switchover must be a separate HWLAB/AgentRun implementation stage. Runtime readiness alone does not prove Workbench projection, SSE or AgentRun command ingestion has migrated. +- The canonical entrypoint is `bun scripts/cli.ts platform-infra kafka plan|apply|status|validate|topics|groups|offsets|tail|produce --target `; `--node ` is an equivalent selector for node-targeted operations. Formal mutation must use that path; raw `kubectl` is bounded diagnosis only. +- HWLAB v0.3 and AgentRun v0.2 are client namespaces. They may later consume YAML-declared Kafka bootstrap, user Secret metadata and topic contracts, but app producer/consumer switchover must be a separate HWLAB/AgentRun implementation stage. Runtime readiness alone does not prove Workbench projection, SSE or AgentRun command ingestion has migrated. Shadow produce may write Kafka events for observation only when YAML keeps consumer cutover disabled; it must not replace the current business read path. - The first POC is a single-node KRaft broker for observability, ordering and replay investigation. It improves auditability and smoke coverage, but it is not a production high-availability claim; replication, backup, min ISR and app-side transactional inbox/outbox are separate decisions. - Kafka must stay ClusterIP-only by default. Do not add Ingress, NodePort, LoadBalancer, host networking, public FRP, or browser-facing Kafka access unless a later YAML-controlled platform decision explicitly changes that boundary. diff --git a/scripts/src/platform-infra-kafka.ts b/scripts/src/platform-infra-kafka.ts index 88445dc8..6867f2e0 100644 --- a/scripts/src/platform-infra-kafka.ts +++ b/scripts/src/platform-infra-kafka.ts @@ -8,6 +8,7 @@ import { parseJsonOutput, readYamlRecord, sha256Hex, + shQuote, } from "./platform-infra-ops-library"; const configFile = rootPath("config", "platform-infra", "kafka.yaml"); @@ -60,6 +61,8 @@ interface PlatformKafkaConfig { enabled: boolean; mode: string; appIntegrationEnabled: boolean; + shadowProduceEnabled: boolean; + shadowConsumeEnabled: boolean; }; }; operator: { @@ -97,6 +100,12 @@ interface PlatformKafkaConfig { pollSeconds: number; smokeTopic: string; }; + management: { + defaultTailLimit: number; + maxTailLimit: number; + defaultShadowTopic: string; + shadowProducerId: string; + }; } interface CommonOptions { @@ -111,6 +120,20 @@ interface ApplyOptions extends CommonOptions { wait: boolean; } +interface KafkaInspectOptions extends CommonOptions { + topic: string | null; + group: string | null; + limit: number | null; +} + +interface KafkaProduceOptions extends CommonOptions { + topic: string | null; + key: string | null; + source: string; + eventType: string; + payload: string | null; +} + export async function runPlatformInfraKafkaCommand(config: UniDeskConfig, args: string[]): Promise | RenderedCliResult> { const [action = "plan"] = args; if (action === "plan") { @@ -125,6 +148,27 @@ export async function runPlatformInfraKafkaCommand(config: UniDeskConfig, args: return options.full || options.raw ? result : renderStatus(result); } if (action === "validate") return await validate(config, parseCommonOptions(args.slice(1))); + if (action === "topics") { + const options = parseInspectOptions(args.slice(1)); + const result = await topics(config, options); + return options.full || options.raw ? result : renderTopics(result); + } + if (action === "groups") { + const options = parseInspectOptions(args.slice(1)); + const result = await groups(config, options); + return options.full || options.raw ? result : renderGroups(result); + } + if (action === "offsets") { + const options = parseInspectOptions(args.slice(1)); + const result = await offsets(config, options); + return options.full || options.raw ? result : renderOffsets(result); + } + if (action === "tail") { + const options = parseInspectOptions(args.slice(1)); + const result = await tail(config, options); + return options.full || options.raw ? result : renderTail(result); + } + if (action === "produce") return await produce(config, await parseProduceOptions(args.slice(1))); return { ok: false, error: "unsupported-platform-infra-kafka-command", @@ -135,16 +179,22 @@ export async function runPlatformInfraKafkaCommand(config: UniDeskConfig, args: export function kafkaHelp(): Record { return { - command: "platform-infra kafka plan|apply|status|validate", + command: "platform-infra kafka plan|apply|status|validate|topics|groups|offsets|tail|produce", configTruth: configLabel, usage: [ "bun scripts/cli.ts platform-infra kafka plan --target D518", + "bun scripts/cli.ts platform-infra kafka plan --node D518", "bun scripts/cli.ts platform-infra kafka apply --target D518 --dry-run", "bun scripts/cli.ts platform-infra kafka apply --target D518 --confirm", "bun scripts/cli.ts platform-infra kafka status --target D518 [--full|--raw]", "bun scripts/cli.ts platform-infra kafka validate --target D518 [--full|--raw]", + "bun scripts/cli.ts platform-infra kafka topics --node D518", + "bun scripts/cli.ts platform-infra kafka groups --node D518", + "bun scripts/cli.ts platform-infra kafka offsets --node D518 [--topic hwlab.agentrun.command.v1]", + "bun scripts/cli.ts platform-infra kafka tail --node D518 --topic hwlab.agentrun.command.v1 --limit 5", + "bun scripts/cli.ts platform-infra kafka produce --node D518 --topic hwlab.agentrun.command.v1 --key ", ], - boundary: "Kafka runtime belongs to platform-infra. HWLAB v0.3 and AgentRun v0.2 are client namespaces only.", + boundary: "Kafka runtime belongs to platform-infra. P2 supports shadow produce/query only; Kafka consumer cutover stays disabled.", }; } @@ -178,6 +228,8 @@ function readKafkaConfig(): PlatformKafkaConfig { enabled: y.booleanField(switchRecord, "enabled", "defaults.switch"), mode: y.stringField(switchRecord, "mode", "defaults.switch"), appIntegrationEnabled: y.booleanField(switchRecord, "appIntegrationEnabled", "defaults.switch"), + shadowProduceEnabled: y.booleanField(switchRecord, "shadowProduceEnabled", "defaults.switch"), + shadowConsumeEnabled: y.booleanField(switchRecord, "shadowConsumeEnabled", "defaults.switch"), }, }, operator: { @@ -215,6 +267,7 @@ function readKafkaConfig(): PlatformKafkaConfig { pollSeconds: y.integerField(validation, "pollSeconds", "validation"), smokeTopic: y.stringField(validation, "smokeTopic", "validation"), }, + management: parseManagement(root), }; validateConfig(parsed); return parsed; @@ -259,6 +312,16 @@ function parseClient(record: Record, index: number): KafkaClien }; } +function parseManagement(root: Record): PlatformKafkaConfig["management"] { + const management = y.objectField(root, "management", ""); + return { + defaultTailLimit: positiveInteger(management, "defaultTailLimit", "management"), + maxTailLimit: positiveInteger(management, "maxTailLimit", "management"), + defaultShadowTopic: topicNameField(management, "defaultShadowTopic", "management"), + shadowProducerId: y.stringField(management, "shadowProducerId", "management"), + }; +} + function validateConfig(kafka: PlatformKafkaConfig): void { resolveTarget(kafka, kafka.defaults.targetId); if (!/^https:\/\/github\.com\/strimzi\/strimzi-kafka-operator\/releases\/download\//u.test(kafka.operator.manifestUrl)) { @@ -267,6 +330,9 @@ function validateConfig(kafka: PlatformKafkaConfig): void { if (kafka.cluster.replicas !== 1) throw new Error(`${configLabel}.cluster.replicas must be 1 for the D518 POC`); if (!/^[0-9]+Gi$/u.test(kafka.cluster.storage.size)) throw new Error(`${configLabel}.cluster.storage.size must be a Gi quantity`); if (!kafka.topics.some((topic) => topic.name === kafka.validation.smokeTopic)) throw new Error(`${configLabel}.validation.smokeTopic must reference one of topics[].name`); + if (!kafka.topics.some((topic) => topic.name === kafka.management.defaultShadowTopic)) throw new Error(`${configLabel}.management.defaultShadowTopic must reference one of topics[].name`); + if (kafka.management.defaultTailLimit > kafka.management.maxTailLimit) throw new Error(`${configLabel}.management.defaultTailLimit must be <= management.maxTailLimit`); + if (kafka.defaults.switch.shadowConsumeEnabled) throw new Error(`${configLabel}.defaults.switch.shadowConsumeEnabled must stay false for shadow-produce-only stage`); const topicNames = new Set(); for (const topic of kafka.topics) { if (topicNames.has(topic.name)) throw new Error(`${configLabel}.topics contains duplicate topic ${topic.name}`); @@ -391,6 +457,92 @@ async function validate(config: UniDeskConfig, options: CommonOptions): Promise< target: targetSummary(target), config: compactConfigSummary(kafka, target), validation: parsed ?? null, + remote: options.raw && parsed !== null ? parsed : compactCapture(result, { full: options.full || result.exitCode !== 0 }), + }; +} + +async function topics(config: UniDeskConfig, options: KafkaInspectOptions): Promise> { + const kafka = readKafkaConfig(); + const target = resolveTarget(kafka, options.targetId); + const result = await capture(config, target.route, ["sh"], topicsScript(kafka, target)); + const parsed = parseJsonOutput(result.stdout); + return { + ok: result.exitCode === 0 && parsed?.ok === true, + action: "platform-infra-kafka-topics", + mutation: false, + target: targetSummary(target), + config: compactConfigSummary(kafka, target), + query: parsed ?? null, + remote: options.raw && parsed !== null ? parsed : compactCapture(result, { full: options.full || result.exitCode !== 0 }), + }; +} + +async function groups(config: UniDeskConfig, options: KafkaInspectOptions): Promise> { + const kafka = readKafkaConfig(); + const target = resolveTarget(kafka, options.targetId); + const result = await capture(config, target.route, ["sh"], groupsScript(kafka, target)); + const parsed = parseJsonOutput(result.stdout); + return { + ok: result.exitCode === 0 && parsed?.ok === true, + action: "platform-infra-kafka-groups", + mutation: false, + target: targetSummary(target), + config: compactConfigSummary(kafka, target), + query: parsed ?? null, + remote: options.raw && parsed !== null ? parsed : compactCapture(result, { full: options.full || result.exitCode !== 0 }), + }; +} + +async function offsets(config: UniDeskConfig, options: KafkaInspectOptions): Promise> { + const kafka = readKafkaConfig(); + const target = resolveTarget(kafka, options.targetId); + const topic = resolveTopic(kafka, options.topic, null); + const result = await capture(config, target.route, ["sh"], offsetsScript(kafka, target, topic, options.group)); + const parsed = parseJsonOutput(result.stdout); + return { + ok: result.exitCode === 0 && parsed?.ok === true, + action: "platform-infra-kafka-offsets", + mutation: false, + target: targetSummary(target), + config: compactConfigSummary(kafka, target), + query: parsed ?? null, + remote: options.raw && parsed !== null ? parsed : compactCapture(result, { full: options.full || result.exitCode !== 0 }), + }; +} + +async function tail(config: UniDeskConfig, options: KafkaInspectOptions): Promise> { + const kafka = readKafkaConfig(); + const target = resolveTarget(kafka, options.targetId); + const topic = resolveTopic(kafka, options.topic, kafka.management.defaultShadowTopic); + const limit = boundedLimit(kafka, options.limit); + const result = await capture(config, target.route, ["sh"], tailScript(kafka, target, topic, limit)); + const parsed = parseJsonOutput(result.stdout); + return { + ok: result.exitCode === 0 && parsed?.ok === true, + action: "platform-infra-kafka-tail", + mutation: false, + target: targetSummary(target), + config: compactConfigSummary(kafka, target), + query: parsed ?? null, + remote: options.raw && parsed !== null ? parsed : compactCapture(result, { full: options.full || result.exitCode !== 0 }), + }; +} + +async function produce(config: UniDeskConfig, options: KafkaProduceOptions): Promise> { + const kafka = readKafkaConfig(); + const target = resolveTarget(kafka, options.targetId); + const topic = resolveTopic(kafka, options.topic, kafka.management.defaultShadowTopic); + const payload = buildShadowPayload(kafka, target, options, topic); + const result = await capture(config, target.route, ["sh"], produceScript(kafka, target, topic, options.key, payload)); + const parsed = parseJsonOutput(result.stdout); + return { + ok: result.exitCode === 0 && parsed?.ok === true, + action: "platform-infra-kafka-produce", + mutation: true, + mode: "shadow-produce-only", + target: targetSummary(target), + config: compactConfigSummary(kafka, target), + production: parsed ?? null, remote: options.raw ? parsed : compactCapture(result, { full: options.full || result.exitCode !== 0 }), }; } @@ -922,6 +1074,308 @@ PY `; } +function topicsScript(kafka: PlatformKafkaConfig, target: KafkaTarget): string { + return ` +set -u +tmp="$(mktemp -d)" +trap 'rm -rf "$tmp"' EXIT +pod="$(kubectl -n ${target.namespace} get pod -l strimzi.io/cluster=${kafka.cluster.name},strimzi.io/kind=Kafka -o jsonpath='{.items[0].metadata.name}' 2>"$tmp/pod.err" || true)" +if [ -n "$pod" ]; then + kubectl -n ${target.namespace} exec "$pod" -- bin/kafka-topics.sh --bootstrap-server ${brokerBootstrap(kafka)} --describe >"$tmp/topics.out" 2>"$tmp/topics.err" + topics_rc=$? +else + topics_rc=1 + printf '%s\\n' 'kafka broker pod not found' >"$tmp/topics.err" + : >"$tmp/topics.out" +fi +python3 - "$tmp" "$pod" "$topics_rc" <<'PY' +import json, os, re, sys +tmp, pod, rc_text = sys.argv[1], sys.argv[2], sys.argv[3] +configured = ${JSON.stringify(kafka.topics.map((topic) => topic.name))} +def text(name, limit=1600): + try: + return open(os.path.join(tmp, name), encoding="utf-8", errors="replace").read()[-limit:] + except FileNotFoundError: + return "" +topics = {} +for raw in text("topics.out", 200000).splitlines(): + line = raw.strip() + if not line.startswith("Topic:"): + continue + parts = [part.strip() for part in line.split("\\t") if part.strip()] + data = {} + for part in parts: + if ":" in part: + key, value = part.split(":", 1) + data[key.strip()] = value.strip() + name = data.get("Topic") + if not name or name == "__consumer_offsets": + continue + topic = topics.setdefault(name, {"name": name, "partitions": [], "configured": name in configured}) + if "PartitionCount" in data: + topic["partitionCount"] = int(data.get("PartitionCount") or 0) + topic["replicationFactor"] = int(data.get("ReplicationFactor") or 0) + topic["configs"] = data.get("Configs") or "" + if "Partition" in data: + topic["partitions"].append({ + "partition": int(data.get("Partition") or 0), + "leader": data.get("Leader"), + "replicas": [item for item in re.split(r",\\s*", data.get("Replicas") or "") if item], + "isr": [item for item in re.split(r",\\s*", data.get("Isr") or "") if item], + }) +for name in configured: + topics.setdefault(name, {"name": name, "partitions": [], "configured": True, "missing": True}) +payload = { + "ok": int(rc_text) == 0, + "target": "${target.id}", + "namespace": "${target.namespace}", + "cluster": "${kafka.cluster.name}", + "pod": pod or None, + "topics": sorted(topics.values(), key=lambda item: item["name"]), + "valuesPrinted": False, + "stderrTail": text("topics.err"), +} +print(json.dumps(payload, ensure_ascii=False, indent=2)) +sys.exit(0 if payload["ok"] else 1) +PY +`; +} + +function groupsScript(kafka: PlatformKafkaConfig, target: KafkaTarget): string { + return ` +set -u +tmp="$(mktemp -d)" +trap 'rm -rf "$tmp"' EXIT +pod="$(kubectl -n ${target.namespace} get pod -l strimzi.io/cluster=${kafka.cluster.name},strimzi.io/kind=Kafka -o jsonpath='{.items[0].metadata.name}' 2>"$tmp/pod.err" || true)" +if [ -n "$pod" ]; then + kubectl -n ${target.namespace} exec "$pod" -- bin/kafka-consumer-groups.sh --bootstrap-server ${brokerBootstrap(kafka)} --list >"$tmp/groups.out" 2>"$tmp/groups.err" + groups_rc=$? +else + groups_rc=1 + printf '%s\\n' 'kafka broker pod not found' >"$tmp/groups.err" + : >"$tmp/groups.out" +fi +python3 - "$tmp" "$pod" "$groups_rc" <<'PY' +import json, os, sys +tmp, pod, rc_text = sys.argv[1], sys.argv[2], sys.argv[3] +def text(name, limit=1600): + try: + return open(os.path.join(tmp, name), encoding="utf-8", errors="replace").read()[-limit:] + except FileNotFoundError: + return "" +groups = [line.strip() for line in text("groups.out", 200000).splitlines() if line.strip()] +payload = { + "ok": int(rc_text) == 0, + "target": "${target.id}", + "namespace": "${target.namespace}", + "cluster": "${kafka.cluster.name}", + "pod": pod or None, + "groups": sorted(groups), + "valuesPrinted": False, + "stderrTail": text("groups.err"), +} +print(json.dumps(payload, ensure_ascii=False, indent=2)) +sys.exit(0 if payload["ok"] else 1) +PY +`; +} + +function offsetsScript(kafka: PlatformKafkaConfig, target: KafkaTarget, topic: string | null, group: string | null): string { + const topics = topic === null ? kafka.topics.map((item) => item.name) : [topic]; + return ` +set -u +tmp="$(mktemp -d)" +trap 'rm -rf "$tmp"' EXIT +pod="$(kubectl -n ${target.namespace} get pod -l strimzi.io/cluster=${kafka.cluster.name},strimzi.io/kind=Kafka -o jsonpath='{.items[0].metadata.name}' 2>"$tmp/pod.err" || true)" +topics='${JSON.stringify(topics)}' +if [ -n "$pod" ]; then + python3 - "$topics" <<'PY' >"$tmp/topic-list" +import json, sys +for item in json.loads(sys.argv[1]): + print(item) +PY + offsets_rc=0 + : >"$tmp/offsets.out" + : >"$tmp/offsets.err" + while IFS= read -r topic_name; do + kubectl -n ${target.namespace} exec "$pod" -- bin/kafka-get-offsets.sh --bootstrap-server ${brokerBootstrap(kafka)} --topic "$topic_name" >>"$tmp/offsets.out" 2>>"$tmp/offsets.err" || offsets_rc=$? + done <"$tmp/topic-list" + if [ ${group === null ? "0" : "1"} -eq 1 ]; then + kubectl -n ${target.namespace} exec "$pod" -- bin/kafka-consumer-groups.sh --bootstrap-server ${brokerBootstrap(kafka)} --describe --group ${shQuote(group ?? "")} >"$tmp/group.out" 2>"$tmp/group.err" || group_rc=$? + group_rc=\${group_rc:-0} + else + group_rc=0 + : >"$tmp/group.out" + : >"$tmp/group.err" + fi +else + offsets_rc=1 + group_rc=1 + printf '%s\\n' 'kafka broker pod not found' >"$tmp/offsets.err" + : >"$tmp/offsets.out" >"$tmp/group.out" >"$tmp/group.err" +fi +python3 - "$tmp" "$pod" "$offsets_rc" "$group_rc" <<'PY' +import json, os, sys +tmp, pod, offsets_rc, group_rc = sys.argv[1], sys.argv[2], int(sys.argv[3]), int(sys.argv[4]) +def text(name, limit=2000): + try: + return open(os.path.join(tmp, name), encoding="utf-8", errors="replace").read()[-limit:] + except FileNotFoundError: + return "" +offsets = [] +for line in text("offsets.out", 200000).splitlines(): + parts = line.strip().split(":") + if len(parts) != 3: + continue + offsets.append({"topic": parts[0], "partition": int(parts[1]), "endOffset": int(parts[2])}) +group_rows = [] +for raw in text("group.out", 200000).splitlines(): + line = raw.strip() + if not line or line.startswith("GROUP") or line.startswith("Consumer group"): + continue + cols = line.split() + if len(cols) >= 6 and cols[1] != "-": + try: + group_rows.append({"group": cols[0], "topic": cols[1], "partition": int(cols[2]), "currentOffset": int(cols[3]), "logEndOffset": int(cols[4]), "lag": int(cols[5])}) + except ValueError: + pass +payload = { + "ok": offsets_rc == 0 and group_rc == 0, + "target": "${target.id}", + "namespace": "${target.namespace}", + "cluster": "${kafka.cluster.name}", + "pod": pod or None, + "topicFilter": ${pythonJsonExpr(topic)}, + "groupFilter": ${pythonJsonExpr(group)}, + "offsets": sorted(offsets, key=lambda item: (item["topic"], item["partition"])), + "groupOffsets": group_rows, + "valuesPrinted": False, + "stderrTail": (text("offsets.err") + text("group.err"))[-2000:], +} +print(json.dumps(payload, ensure_ascii=False, indent=2)) +sys.exit(0 if payload["ok"] else 1) +PY +`; +} + +function tailScript(kafka: PlatformKafkaConfig, target: KafkaTarget, topic: string, limit: number): string { + return ` +set -u +tmp="$(mktemp -d)" +trap 'rm -rf "$tmp"' EXIT +pod="$(kubectl -n ${target.namespace} get pod -l strimzi.io/cluster=${kafka.cluster.name},strimzi.io/kind=Kafka -o jsonpath='{.items[0].metadata.name}' 2>"$tmp/pod.err" || true)" +if [ -n "$pod" ]; then + kubectl -n ${target.namespace} exec "$pod" -- bin/kafka-get-offsets.sh --bootstrap-server ${brokerBootstrap(kafka)} --topic ${shQuote(topic)} >"$tmp/offsets.out" 2>"$tmp/offsets.err" + offsets_rc=$? + tail_rc=0 + : >"$tmp/messages.raw" + while IFS=: read -r topic_name partition end_offset; do + case "$partition" in ''|*[!0-9]*) continue ;; esac + case "$end_offset" in ''|*[!0-9]*) continue ;; esac + start=$(( end_offset > ${limit} ? end_offset - ${limit} : 0 )) + count=$(( end_offset - start )) + if [ "$count" -gt 0 ]; then + timeout ${Math.min(kafka.validation.timeoutSeconds, 20)} kubectl -n ${target.namespace} exec "$pod" -- bin/kafka-console-consumer.sh --bootstrap-server ${brokerBootstrap(kafka)} --topic ${shQuote(topic)} --partition "$partition" --offset "$start" --max-messages "$count" >>"$tmp/messages.raw" 2>>"$tmp/messages.err" || true + fi + done <"$tmp/offsets.out" +else + offsets_rc=1 + tail_rc=1 + printf '%s\\n' 'kafka broker pod not found' >"$tmp/offsets.err" + : >"$tmp/offsets.out" >"$tmp/messages.raw" >"$tmp/messages.err" +fi +python3 - "$tmp" "$pod" "$offsets_rc" "$tail_rc" <<'PY' +import hashlib, json, os, sys +tmp, pod, offsets_rc, tail_rc = sys.argv[1], sys.argv[2], int(sys.argv[3]), int(sys.argv[4]) +def text(name, limit=2000): + try: + return open(os.path.join(tmp, name), encoding="utf-8", errors="replace").read()[-limit:] + except FileNotFoundError: + return "" +offsets = [] +for line in text("offsets.out", 200000).splitlines(): + parts = line.strip().split(":") + if len(parts) == 3: + offsets.append({"topic": parts[0], "partition": int(parts[1]), "endOffset": int(parts[2])}) +messages = [] +for index, line in enumerate(text("messages.raw", 200000).splitlines()): + encoded = line.encode() + messages.append({"index": index, "sha256": hashlib.sha256(encoded).hexdigest(), "bytes": len(encoded), "valuePrinted": False}) +payload = { + "ok": offsets_rc == 0 and tail_rc == 0, + "target": "${target.id}", + "namespace": "${target.namespace}", + "cluster": "${kafka.cluster.name}", + "pod": pod or None, + "topic": "${topic}", + "limit": ${limit}, + "offsets": offsets, + "messages": messages[-${limit}:], + "valuesPrinted": False, + "stderrTail": (text("offsets.err") + text("messages.err"))[-2000:], +} +print(json.dumps(payload, ensure_ascii=False, indent=2)) +sys.exit(0 if payload["ok"] else 1) +PY +`; +} + +function produceScript(kafka: PlatformKafkaConfig, target: KafkaTarget, topic: string, key: string | null, payload: string): string { + const keyValue = key ?? `${kafka.management.shadowProducerId}-${Date.now()}`; + const separator = "|"; + if (keyValue.includes(separator) || keyValue.includes("\n")) throw new Error("kafka produce --key must not contain | or newline"); + return ` +set -u +tmp="$(mktemp -d)" +trap 'rm -rf "$tmp"' EXIT +pod="$(kubectl -n ${target.namespace} get pod -l strimzi.io/cluster=${kafka.cluster.name},strimzi.io/kind=Kafka -o jsonpath='{.items[0].metadata.name}' 2>"$tmp/pod.err" || true)" +cat >"$tmp/payload.json" <<'UNIDESK_KAFKA_SHADOW_PAYLOAD' +${payload} +UNIDESK_KAFKA_SHADOW_PAYLOAD +printf '%s' ${shQuote(keyValue)} >"$tmp/key.txt" +if [ -n "$pod" ]; then + printf '%s%s%s\\n' "$(cat "$tmp/key.txt")" ${shQuote(separator)} "$(cat "$tmp/payload.json")" | kubectl -n ${target.namespace} exec -i "$pod" -- bin/kafka-console-producer.sh --bootstrap-server ${brokerBootstrap(kafka)} --topic ${shQuote(topic)} --reader-property parse.key=true --reader-property key.separator=${shQuote(separator)} --command-property acks=all >"$tmp/produce.out" 2>"$tmp/produce.err" + produce_rc=$? +else + produce_rc=1 + printf '%s\\n' 'kafka broker pod not found' >"$tmp/produce.err" + : >"$tmp/produce.out" +fi +python3 - "$tmp" "$pod" "$produce_rc" <<'PY' +import hashlib, json, os, sys +tmp, pod, produce_rc = sys.argv[1], sys.argv[2], int(sys.argv[3]) +def text(name, limit=1200): + try: + return open(os.path.join(tmp, name), encoding="utf-8", errors="replace").read()[-limit:] + except FileNotFoundError: + return "" +payload = open(os.path.join(tmp, "payload.json"), encoding="utf-8").read() +key = open(os.path.join(tmp, "key.txt"), encoding="utf-8").read() +payload_obj = { + "ok": produce_rc == 0, + "target": "${target.id}", + "namespace": "${target.namespace}", + "cluster": "${kafka.cluster.name}", + "topic": "${topic}", + "pod": pod or None, + "keySha256": hashlib.sha256(key.encode()).hexdigest(), + "message": { + "sha256": hashlib.sha256(payload.encode()).hexdigest(), + "bytes": len(payload.encode()), + "valuesPrinted": False, + }, + "producer": "${kafka.management.shadowProducerId}", + "mode": "shadow-produce-only", + "consume": {"enabled": False, "performed": False}, + "steps": {"produce": {"exitCode": produce_rc, "stdoutTail": text("produce.out"), "stderrTail": text("produce.err")}}, + "valuesPrinted": False, +} +print(json.dumps(payload_obj, ensure_ascii=False, indent=2)) +sys.exit(0 if payload_obj["ok"] else 1) +PY +`; +} + function configSummary(kafka: PlatformKafkaConfig, target: KafkaTarget): Record { return { configPath: configLabel, @@ -932,6 +1386,7 @@ function configSummary(kafka: PlatformKafkaConfig, target: KafkaTarget): Record< cluster: clusterSummary(kafka, target), topics: kafka.topics.map((topic) => topicSummary(topic)), clients: kafka.clients.map((client) => clientSummary(client)), + management: kafka.management, }; } @@ -955,6 +1410,12 @@ function compactConfigSummary(kafka: PlatformKafkaConfig, target: KafkaTarget): }, topicCount: kafka.topics.length, clientCount: kafka.clients.length, + management: { + defaultShadowTopic: kafka.management.defaultShadowTopic, + defaultTailLimit: kafka.management.defaultTailLimit, + maxTailLimit: kafka.management.maxTailLimit, + shadowProducerId: kafka.management.shadowProducerId, + }, valuesPrinted: false, }; } @@ -1026,7 +1487,8 @@ function policyChecks(kafka: PlatformKafkaConfig, target: KafkaTarget, manifest: { name: "no-public-exposure", ok: !/^\s*type:\s*(NodePort|LoadBalancer)\s*$/mu.test(manifest) && !/^\s*kind:\s*Ingress\s*$/mu.test(manifest), detail: "Kafka POC is ClusterIP/internal only." }, { name: "single-broker-poc", ok: kafka.cluster.replicas === 1, detail: "D518 POC uses one KRaft broker/controller; production HA remains out of scope." }, { name: "allow-all-network-policy", ok: manifest.includes("kind: NetworkPolicy") && manifest.includes("name: allow-all") && manifest.includes(`namespace: ${target.namespace}`), detail: `NetworkPolicy/allow-all is rendered in ${target.namespace}.` }, - { name: "app-integration-off-for-p1", ok: kafka.defaults.switch.appIntegrationEnabled === false, detail: "P1 deploys runtime and smoke only; app producer/consumer switchover is a later stage." }, + { name: "shadow-produce-enabled", ok: kafka.defaults.switch.shadowProduceEnabled === true, detail: "P2 may publish shadow events to Kafka." }, + { name: "shadow-consume-disabled", ok: kafka.defaults.switch.shadowConsumeEnabled === false && kafka.defaults.switch.appIntegrationEnabled === false, detail: "P2 does not enable Kafka consumer cutover or app integration." }, ]; } @@ -1088,7 +1550,7 @@ function renderPlan(result: Record): RenderedCliResult { ` status: ${stringValue(next.status)}`, ` validate: ${stringValue(next.validate)}`, "", - "Boundary: Kafka runtime is platform-infra only; HWLAB/AgentRun app integration remains disabled in P1.", + "Boundary: Kafka runtime is platform-infra only; P2 allows shadow produce/query and keeps consumer cutover disabled.", "Disclosure: Secret values are not printed; client entries show only object/key metadata.", ]); } @@ -1120,11 +1582,90 @@ function renderStatus(result: Record): RenderedCliResult { "", "PODS", ...(pods.length === 0 ? ["-"] : table(["POD", "PHASE", "READY"], pods)), + ...remoteErrorLines(result), "", `NEXT bun scripts/cli.ts platform-infra kafka validate --target ${stringValue(summary.target)}`, ]); } +function renderTopics(result: Record): RenderedCliResult { + const query = record(result.query); + const topics = arrayRecords(query.topics).map((topic) => [ + stringValue(topic.name), + boolText(topic.configured), + stringValue(topic.partitionCount, "0"), + stringValue(topic.replicationFactor, "0"), + stringValue(topic.configs), + ]); + return rendered(result, "platform-infra kafka topics", [ + "PLATFORM-INFRA KAFKA TOPICS", + ...table(["TARGET", "NAMESPACE", "CLUSTER", "POD"], [[stringValue(query.target), stringValue(query.namespace), stringValue(query.cluster), stringValue(query.pod)]]), + "", + ...(topics.length === 0 ? ["TOPICS -"] : table(["TOPIC", "YAML", "PARTITIONS", "RF", "CONFIGS"], topics)), + ...remoteErrorLines(result), + "", + "Disclosure: message values and Secret values are not printed.", + ]); +} + +function renderGroups(result: Record): RenderedCliResult { + const query = record(result.query); + const groups = arrayRecords(query.groups).length > 0 ? arrayRecords(query.groups).map((item) => [stringValue(item.name)]) : (Array.isArray(query.groups) ? query.groups.map((item) => [String(item)]) : []); + return rendered(result, "platform-infra kafka groups", [ + "PLATFORM-INFRA KAFKA GROUPS", + ...table(["TARGET", "NAMESPACE", "CLUSTER", "POD"], [[stringValue(query.target), stringValue(query.namespace), stringValue(query.cluster), stringValue(query.pod)]]), + "", + ...(groups.length === 0 ? ["GROUPS -"] : table(["GROUP"], groups)), + ...remoteErrorLines(result), + ]); +} + +function renderOffsets(result: Record): RenderedCliResult { + const query = record(result.query); + const offsets = arrayRecords(query.offsets).map((item) => [stringValue(item.topic), stringValue(item.partition), stringValue(item.endOffset)]); + const groupOffsets = arrayRecords(query.groupOffsets).map((item) => [stringValue(item.group), stringValue(item.topic), stringValue(item.partition), stringValue(item.currentOffset), stringValue(item.logEndOffset), stringValue(item.lag)]); + return rendered(result, "platform-infra kafka offsets", [ + "PLATFORM-INFRA KAFKA OFFSETS", + ...table(["TARGET", "NAMESPACE", "CLUSTER", "POD"], [[stringValue(query.target), stringValue(query.namespace), stringValue(query.cluster), stringValue(query.pod)]]), + "", + "END OFFSETS", + ...(offsets.length === 0 ? ["-"] : table(["TOPIC", "PARTITION", "END"], offsets)), + "", + "GROUP OFFSETS", + ...(groupOffsets.length === 0 ? ["-"] : table(["GROUP", "TOPIC", "PARTITION", "CURRENT", "END", "LAG"], groupOffsets)), + ...remoteErrorLines(result), + ]); +} + +function renderTail(result: Record): RenderedCliResult { + const query = record(result.query); + const messages = arrayRecords(query.messages).map((item) => [stringValue(item.index), stringValue(item.sha256), stringValue(item.bytes), boolText(item.valuePrinted)]); + const offsets = arrayRecords(query.offsets).map((item) => [stringValue(item.partition), stringValue(item.endOffset)]); + return rendered(result, "platform-infra kafka tail", [ + "PLATFORM-INFRA KAFKA TAIL", + ...table(["TARGET", "TOPIC", "LIMIT", "POD"], [[stringValue(query.target), stringValue(query.topic), stringValue(query.limit), stringValue(query.pod)]]), + "", + "OFFSETS", + ...(offsets.length === 0 ? ["-"] : table(["PARTITION", "END"], offsets)), + "", + "MESSAGES", + ...(messages.length === 0 ? ["-"] : table(["INDEX", "SHA256", "BYTES", "VALUE_PRINTED"], messages)), + ...remoteErrorLines(result), + "", + "Disclosure: values are hashed only; use application-side replay tools for payload inspection.", + ]); +} + +function remoteErrorLines(result: Record): string[] { + if (result.ok !== false) return []; + const remote = record(result.remote); + const exitCode = stringValue(remote.exitCode); + const stderr = stringValue(remote.stderrTail).replace(/\s+/gu, " ").trim().slice(0, 220); + const stdout = stringValue(remote.stdoutTail).replace(/\s+/gu, " ").trim().slice(0, 220); + const detail = stderr !== "-" ? stderr : stdout; + return ["", "ERROR", ...table(["EXIT", "DETAIL"], [[exitCode, detail]])]; +} + function rendered(result: Record, command: string, lines: string[]): RenderedCliResult { return { ok: result.ok !== false, command, renderedText: lines.join("\n"), contentType: "text/plain" }; } @@ -1141,7 +1682,7 @@ function parseApplyOptions(args: string[]): ApplyOptions { else if (arg === "--wait") wait = true; else { commonArgs.push(arg); - if (arg === "--target") { + if (arg === "--target" || arg === "--node") { commonArgs.push(args[index + 1] ?? ""); index += 1; } @@ -1157,10 +1698,10 @@ function parseCommonOptions(args: string[]): CommonOptions { let raw = false; for (let index = 0; index < args.length; index += 1) { const arg = args[index]; - if (arg === "--target") { + if (arg === "--target" || arg === "--node") { const value = args[index + 1]; - if (value === undefined || value.startsWith("--")) throw new Error("--target requires a value"); - if (!/^[A-Za-z0-9._-]+$/u.test(value)) throw new Error("--target must be a simple target id"); + if (value === undefined || value.startsWith("--")) throw new Error(`${arg} requires a value`); + if (!/^[A-Za-z0-9._-]+$/u.test(value)) throw new Error(`${arg} must be a simple target id`); targetId = value; index += 1; } else if (arg === "--full") { @@ -1175,10 +1716,145 @@ function parseCommonOptions(args: string[]): CommonOptions { return { targetId, full, raw }; } +function parseInspectOptions(args: string[]): KafkaInspectOptions { + const commonArgs: string[] = []; + let topic: string | null = null; + let group: string | null = null; + let limit: number | null = null; + for (let index = 0; index < args.length; index += 1) { + const arg = args[index]; + if (arg === "--topic") { + topic = parseTopicOption(args[index + 1], arg); + index += 1; + } else if (arg === "--group") { + group = parseSimpleOption(args[index + 1], arg); + index += 1; + } else if (arg === "--limit") { + const value = args[index + 1]; + if (value === undefined || value.startsWith("--")) throw new Error("--limit requires a value"); + limit = Number(value); + if (!Number.isInteger(limit) || limit < 1) throw new Error("--limit must be a positive integer"); + index += 1; + } else { + commonArgs.push(arg); + if (arg === "--target" || arg === "--node") { + commonArgs.push(args[index + 1] ?? ""); + index += 1; + } + } + } + return { ...parseCommonOptions(commonArgs), topic, group, limit }; +} + +async function parseProduceOptions(args: string[]): Promise { + const commonArgs: string[] = []; + let topic: string | null = null; + let key: string | null = null; + let source = "manual-cli"; + let eventType = "platform-infra.kafka.shadow-produce.v1"; + let payload: string | null = null; + for (let index = 0; index < args.length; index += 1) { + const arg = args[index]; + if (arg === "--topic") { + topic = parseTopicOption(args[index + 1], arg); + index += 1; + } else if (arg === "--key") { + key = parseSimpleOption(args[index + 1], arg); + index += 1; + } else if (arg === "--source") { + source = parseSimpleOption(args[index + 1], arg); + index += 1; + } else if (arg === "--event-type") { + eventType = parseTopicOption(args[index + 1], arg); + index += 1; + } else if (arg === "--payload-stdin") { + payload = await Bun.stdin.text(); + } else { + commonArgs.push(arg); + if (arg === "--target" || arg === "--node") { + commonArgs.push(args[index + 1] ?? ""); + index += 1; + } + } + } + return { ...parseCommonOptions(commonArgs), topic, key, source, eventType, payload }; +} + function bootstrapService(kafka: PlatformKafkaConfig, target: KafkaTarget): string { return `${kafka.cluster.name}-kafka-bootstrap.${target.namespace}.svc.cluster.local:${kafka.cluster.listeners.plain.port}`; } +function brokerBootstrap(kafka: PlatformKafkaConfig): string { + return `${kafka.cluster.name}-kafka-bootstrap:${kafka.cluster.listeners.plain.port}`; +} + +function resolveTopic(kafka: PlatformKafkaConfig, topic: string | null, fallback: string | null): string | null { + const resolved = topic ?? fallback; + if (resolved === null) return null; + if (!kafka.topics.some((item) => item.name === resolved)) throw new Error(`unknown kafka topic ${resolved}; known topics: ${kafka.topics.map((item) => item.name).join(", ")}`); + return resolved; +} + +function boundedLimit(kafka: PlatformKafkaConfig, limit: number | null): number { + const value = limit ?? kafka.management.defaultTailLimit; + if (!Number.isInteger(value) || value < 1) throw new Error("limit must be a positive integer"); + if (value > kafka.management.maxTailLimit) throw new Error(`limit ${value} exceeds ${configLabel}.management.maxTailLimit=${kafka.management.maxTailLimit}`); + return value; +} + +function buildShadowPayload(kafka: PlatformKafkaConfig, target: KafkaTarget, options: KafkaProduceOptions, topic: string): string { + let userPayload: unknown = null; + if (options.payload !== null) { + const trimmed = options.payload.trim(); + if (trimmed.length > 0) { + try { + userPayload = JSON.parse(trimmed) as unknown; + } catch { + userPayload = { textSha256: sha256Hex(trimmed), valuesPrinted: false }; + } + } + } + return JSON.stringify({ + eventId: `evt_${target.id}_${Date.now()}_${Math.random().toString(16).slice(2)}`, + schemaVersion: 1, + eventType: options.eventType, + producer: kafka.management.shadowProducerId, + source: options.source, + node: target.id, + namespace: target.namespace, + topic, + mode: "shadow-produce-only", + consumeEnabled: false, + appIntegrationEnabled: kafka.defaults.switch.appIntegrationEnabled, + shadowProduceEnabled: kafka.defaults.switch.shadowProduceEnabled, + shadowConsumeEnabled: kafka.defaults.switch.shadowConsumeEnabled, + partitionKey: options.key ?? null, + occurredAt: new Date().toISOString(), + payload: userPayload ?? { + kind: "manual-shadow-produce", + spec: kafka.metadata.spec, + valuesPrinted: false, + }, + redaction: { valuesPrinted: false }, + }); +} + +function parseTopicOption(value: string | undefined, optionName: string): string { + if (value === undefined || value.startsWith("--")) throw new Error(`${optionName} requires a value`); + if (!/^[A-Za-z0-9._-]+$/u.test(value) || value.includes("..")) throw new Error(`${optionName} has an unsupported Kafka topic/event name`); + return value; +} + +function parseSimpleOption(value: string | undefined, optionName: string): string { + if (value === undefined || value.startsWith("--")) throw new Error(`${optionName} requires a value`); + if (!/^[A-Za-z0-9._:/=-]+$/u.test(value)) throw new Error(`${optionName} must be a simple token`); + return value; +} + +function pythonJsonExpr(value: unknown): string { + return `json.loads(${JSON.stringify(JSON.stringify(value))})`; +} + function topicNameField(record: Record, key: string, path: string): string { const value = y.stringField(record, key, path); if (!/^[A-Za-z0-9._-]+$/u.test(value) || value.includes("..")) throw new Error(`${configLabel}.${path}.${key} has an unsupported Kafka topic name`); diff --git a/scripts/src/platform-infra/entry.ts b/scripts/src/platform-infra/entry.ts index d692930a..c14842a1 100644 --- a/scripts/src/platform-infra/entry.ts +++ b/scripts/src/platform-infra/entry.ts @@ -370,6 +370,11 @@ export function platformInfraHelp(): unknown { "bun scripts/cli.ts platform-infra kafka apply --target D518 --confirm", "bun scripts/cli.ts platform-infra kafka status --target D518", "bun scripts/cli.ts platform-infra kafka validate --target D518", + "bun scripts/cli.ts platform-infra kafka topics --node D518", + "bun scripts/cli.ts platform-infra kafka groups --node D518", + "bun scripts/cli.ts platform-infra kafka offsets --node D518 --topic hwlab.agentrun.command.v1", + "bun scripts/cli.ts platform-infra kafka tail --node D518 --topic hwlab.agentrun.command.v1 --limit 5", + "bun scripts/cli.ts platform-infra kafka produce --node D518 --topic hwlab.agentrun.command.v1 --key ", ], description: "Operate YAML-controlled platform-infra services such as Sub2API, LangBot, n8n, WeChat archive workflows, OpenTelemetry tracing, the independent target-scoped secret plane, and the D518 Kafka event bus. Public services use PK01 Caddy+FRP rather than Kubernetes Ingress, NodePort, or LoadBalancer.", target,