feat: add kafka shadow produce management cli

This commit is contained in:
Codex
2026-06-28 10:07:15 +00:00
parent dab32a823e
commit a9862ea745
4 changed files with 700 additions and 11 deletions
+9 -1
View File
@@ -12,8 +12,10 @@ defaults:
targetId: D518 targetId: D518
switch: switch:
enabled: true enabled: true
mode: runtime-poc mode: shadow-produce-only
appIntegrationEnabled: false appIntegrationEnabled: false
shadowProduceEnabled: true
shadowConsumeEnabled: false
operator: operator:
implementation: strimzi implementation: strimzi
@@ -116,3 +118,9 @@ validation:
timeoutSeconds: 45 timeoutSeconds: 45
pollSeconds: 3 pollSeconds: 3
smokeTopic: platform-infra.kafka.smoke.v1 smokeTopic: platform-infra.kafka.smoke.v1
management:
defaultTailLimit: 10
maxTailLimit: 50
defaultShadowTopic: hwlab.agentrun.command.v1
shadowProducerId: unidesk-platform-infra-kafka-cli
+2 -2
View File
@@ -37,8 +37,8 @@
- Kafka for the HWLAB v0.3 / AgentRun v0.2 event-bus POC is a UniDesk-operated platform service in namespace `platform-infra`. It is not owned by `hwlab-v03`, `agentrun-v02`, a per-lane Kafka namespace, or a service repository deployment file. - Kafka for the HWLAB v0.3 / AgentRun v0.2 event-bus POC is a UniDesk-operated platform service in namespace `platform-infra`. It is not owned by `hwlab-v03`, `agentrun-v02`, a per-lane Kafka namespace, or a service repository deployment file.
- The canonical source of truth is `config/platform-infra/kafka.yaml`; target, namespace, Strimzi release URL, cluster name, storage class/size, topic list, client declarations, DLQ names, runtime switch and validation smoke topic must stay in that YAML. Current version numbers and retention values belong only in YAML, not in this reference. - The canonical source of truth is `config/platform-infra/kafka.yaml`; target, namespace, Strimzi release URL, cluster name, storage class/size, topic list, client declarations, DLQ names, runtime switch and validation smoke topic must stay in that YAML. Current version numbers and retention values belong only in YAML, not in this reference.
- The canonical entrypoint is `bun scripts/cli.ts platform-infra kafka plan|apply|status|validate --target D518`. Formal mutation must use that path; raw `kubectl` is bounded diagnosis only. - The canonical entrypoint is `bun scripts/cli.ts platform-infra kafka plan|apply|status|validate|topics|groups|offsets|tail|produce --target <node>`; `--node <node>` is an equivalent selector for node-targeted operations. Formal mutation must use that path; raw `kubectl` is bounded diagnosis only.
- HWLAB v0.3 and AgentRun v0.2 are client namespaces. They may later consume YAML-declared Kafka bootstrap, user Secret metadata and topic contracts, but app producer/consumer switchover must be a separate HWLAB/AgentRun implementation stage. Runtime readiness alone does not prove Workbench projection, SSE or AgentRun command ingestion has migrated. - HWLAB v0.3 and AgentRun v0.2 are client namespaces. They may later consume YAML-declared Kafka bootstrap, user Secret metadata and topic contracts, but app producer/consumer switchover must be a separate HWLAB/AgentRun implementation stage. Runtime readiness alone does not prove Workbench projection, SSE or AgentRun command ingestion has migrated. Shadow produce may write Kafka events for observation only when YAML keeps consumer cutover disabled; it must not replace the current business read path.
- The first POC is a single-node KRaft broker for observability, ordering and replay investigation. It improves auditability and smoke coverage, but it is not a production high-availability claim; replication, backup, min ISR and app-side transactional inbox/outbox are separate decisions. - The first POC is a single-node KRaft broker for observability, ordering and replay investigation. It improves auditability and smoke coverage, but it is not a production high-availability claim; replication, backup, min ISR and app-side transactional inbox/outbox are separate decisions.
- Kafka must stay ClusterIP-only by default. Do not add Ingress, NodePort, LoadBalancer, host networking, public FRP, or browser-facing Kafka access unless a later YAML-controlled platform decision explicitly changes that boundary. - Kafka must stay ClusterIP-only by default. Do not add Ingress, NodePort, LoadBalancer, host networking, public FRP, or browser-facing Kafka access unless a later YAML-controlled platform decision explicitly changes that boundary.
+684 -8
View File
@@ -8,6 +8,7 @@ import {
parseJsonOutput, parseJsonOutput,
readYamlRecord, readYamlRecord,
sha256Hex, sha256Hex,
shQuote,
} from "./platform-infra-ops-library"; } from "./platform-infra-ops-library";
const configFile = rootPath("config", "platform-infra", "kafka.yaml"); const configFile = rootPath("config", "platform-infra", "kafka.yaml");
@@ -60,6 +61,8 @@ interface PlatformKafkaConfig {
enabled: boolean; enabled: boolean;
mode: string; mode: string;
appIntegrationEnabled: boolean; appIntegrationEnabled: boolean;
shadowProduceEnabled: boolean;
shadowConsumeEnabled: boolean;
}; };
}; };
operator: { operator: {
@@ -97,6 +100,12 @@ interface PlatformKafkaConfig {
pollSeconds: number; pollSeconds: number;
smokeTopic: string; smokeTopic: string;
}; };
management: {
defaultTailLimit: number;
maxTailLimit: number;
defaultShadowTopic: string;
shadowProducerId: string;
};
} }
interface CommonOptions { interface CommonOptions {
@@ -111,6 +120,20 @@ interface ApplyOptions extends CommonOptions {
wait: boolean; wait: boolean;
} }
interface KafkaInspectOptions extends CommonOptions {
topic: string | null;
group: string | null;
limit: number | null;
}
interface KafkaProduceOptions extends CommonOptions {
topic: string | null;
key: string | null;
source: string;
eventType: string;
payload: string | null;
}
export async function runPlatformInfraKafkaCommand(config: UniDeskConfig, args: string[]): Promise<Record<string, unknown> | RenderedCliResult> { export async function runPlatformInfraKafkaCommand(config: UniDeskConfig, args: string[]): Promise<Record<string, unknown> | RenderedCliResult> {
const [action = "plan"] = args; const [action = "plan"] = args;
if (action === "plan") { if (action === "plan") {
@@ -125,6 +148,27 @@ export async function runPlatformInfraKafkaCommand(config: UniDeskConfig, args:
return options.full || options.raw ? result : renderStatus(result); return options.full || options.raw ? result : renderStatus(result);
} }
if (action === "validate") return await validate(config, parseCommonOptions(args.slice(1))); if (action === "validate") return await validate(config, parseCommonOptions(args.slice(1)));
if (action === "topics") {
const options = parseInspectOptions(args.slice(1));
const result = await topics(config, options);
return options.full || options.raw ? result : renderTopics(result);
}
if (action === "groups") {
const options = parseInspectOptions(args.slice(1));
const result = await groups(config, options);
return options.full || options.raw ? result : renderGroups(result);
}
if (action === "offsets") {
const options = parseInspectOptions(args.slice(1));
const result = await offsets(config, options);
return options.full || options.raw ? result : renderOffsets(result);
}
if (action === "tail") {
const options = parseInspectOptions(args.slice(1));
const result = await tail(config, options);
return options.full || options.raw ? result : renderTail(result);
}
if (action === "produce") return await produce(config, await parseProduceOptions(args.slice(1)));
return { return {
ok: false, ok: false,
error: "unsupported-platform-infra-kafka-command", error: "unsupported-platform-infra-kafka-command",
@@ -135,16 +179,22 @@ export async function runPlatformInfraKafkaCommand(config: UniDeskConfig, args:
export function kafkaHelp(): Record<string, unknown> { export function kafkaHelp(): Record<string, unknown> {
return { return {
command: "platform-infra kafka plan|apply|status|validate", command: "platform-infra kafka plan|apply|status|validate|topics|groups|offsets|tail|produce",
configTruth: configLabel, configTruth: configLabel,
usage: [ usage: [
"bun scripts/cli.ts platform-infra kafka plan --target D518", "bun scripts/cli.ts platform-infra kafka plan --target D518",
"bun scripts/cli.ts platform-infra kafka plan --node D518",
"bun scripts/cli.ts platform-infra kafka apply --target D518 --dry-run", "bun scripts/cli.ts platform-infra kafka apply --target D518 --dry-run",
"bun scripts/cli.ts platform-infra kafka apply --target D518 --confirm", "bun scripts/cli.ts platform-infra kafka apply --target D518 --confirm",
"bun scripts/cli.ts platform-infra kafka status --target D518 [--full|--raw]", "bun scripts/cli.ts platform-infra kafka status --target D518 [--full|--raw]",
"bun scripts/cli.ts platform-infra kafka validate --target D518 [--full|--raw]", "bun scripts/cli.ts platform-infra kafka validate --target D518 [--full|--raw]",
"bun scripts/cli.ts platform-infra kafka topics --node D518",
"bun scripts/cli.ts platform-infra kafka groups --node D518",
"bun scripts/cli.ts platform-infra kafka offsets --node D518 [--topic hwlab.agentrun.command.v1]",
"bun scripts/cli.ts platform-infra kafka tail --node D518 --topic hwlab.agentrun.command.v1 --limit 5",
"bun scripts/cli.ts platform-infra kafka produce --node D518 --topic hwlab.agentrun.command.v1 --key <trace-or-session>",
], ],
boundary: "Kafka runtime belongs to platform-infra. HWLAB v0.3 and AgentRun v0.2 are client namespaces only.", boundary: "Kafka runtime belongs to platform-infra. P2 supports shadow produce/query only; Kafka consumer cutover stays disabled.",
}; };
} }
@@ -178,6 +228,8 @@ function readKafkaConfig(): PlatformKafkaConfig {
enabled: y.booleanField(switchRecord, "enabled", "defaults.switch"), enabled: y.booleanField(switchRecord, "enabled", "defaults.switch"),
mode: y.stringField(switchRecord, "mode", "defaults.switch"), mode: y.stringField(switchRecord, "mode", "defaults.switch"),
appIntegrationEnabled: y.booleanField(switchRecord, "appIntegrationEnabled", "defaults.switch"), appIntegrationEnabled: y.booleanField(switchRecord, "appIntegrationEnabled", "defaults.switch"),
shadowProduceEnabled: y.booleanField(switchRecord, "shadowProduceEnabled", "defaults.switch"),
shadowConsumeEnabled: y.booleanField(switchRecord, "shadowConsumeEnabled", "defaults.switch"),
}, },
}, },
operator: { operator: {
@@ -215,6 +267,7 @@ function readKafkaConfig(): PlatformKafkaConfig {
pollSeconds: y.integerField(validation, "pollSeconds", "validation"), pollSeconds: y.integerField(validation, "pollSeconds", "validation"),
smokeTopic: y.stringField(validation, "smokeTopic", "validation"), smokeTopic: y.stringField(validation, "smokeTopic", "validation"),
}, },
management: parseManagement(root),
}; };
validateConfig(parsed); validateConfig(parsed);
return parsed; return parsed;
@@ -259,6 +312,16 @@ function parseClient(record: Record<string, unknown>, index: number): KafkaClien
}; };
} }
function parseManagement(root: Record<string, unknown>): PlatformKafkaConfig["management"] {
const management = y.objectField(root, "management", "");
return {
defaultTailLimit: positiveInteger(management, "defaultTailLimit", "management"),
maxTailLimit: positiveInteger(management, "maxTailLimit", "management"),
defaultShadowTopic: topicNameField(management, "defaultShadowTopic", "management"),
shadowProducerId: y.stringField(management, "shadowProducerId", "management"),
};
}
function validateConfig(kafka: PlatformKafkaConfig): void { function validateConfig(kafka: PlatformKafkaConfig): void {
resolveTarget(kafka, kafka.defaults.targetId); resolveTarget(kafka, kafka.defaults.targetId);
if (!/^https:\/\/github\.com\/strimzi\/strimzi-kafka-operator\/releases\/download\//u.test(kafka.operator.manifestUrl)) { if (!/^https:\/\/github\.com\/strimzi\/strimzi-kafka-operator\/releases\/download\//u.test(kafka.operator.manifestUrl)) {
@@ -267,6 +330,9 @@ function validateConfig(kafka: PlatformKafkaConfig): void {
if (kafka.cluster.replicas !== 1) throw new Error(`${configLabel}.cluster.replicas must be 1 for the D518 POC`); if (kafka.cluster.replicas !== 1) throw new Error(`${configLabel}.cluster.replicas must be 1 for the D518 POC`);
if (!/^[0-9]+Gi$/u.test(kafka.cluster.storage.size)) throw new Error(`${configLabel}.cluster.storage.size must be a Gi quantity`); if (!/^[0-9]+Gi$/u.test(kafka.cluster.storage.size)) throw new Error(`${configLabel}.cluster.storage.size must be a Gi quantity`);
if (!kafka.topics.some((topic) => topic.name === kafka.validation.smokeTopic)) throw new Error(`${configLabel}.validation.smokeTopic must reference one of topics[].name`); if (!kafka.topics.some((topic) => topic.name === kafka.validation.smokeTopic)) throw new Error(`${configLabel}.validation.smokeTopic must reference one of topics[].name`);
if (!kafka.topics.some((topic) => topic.name === kafka.management.defaultShadowTopic)) throw new Error(`${configLabel}.management.defaultShadowTopic must reference one of topics[].name`);
if (kafka.management.defaultTailLimit > kafka.management.maxTailLimit) throw new Error(`${configLabel}.management.defaultTailLimit must be <= management.maxTailLimit`);
if (kafka.defaults.switch.shadowConsumeEnabled) throw new Error(`${configLabel}.defaults.switch.shadowConsumeEnabled must stay false for shadow-produce-only stage`);
const topicNames = new Set<string>(); const topicNames = new Set<string>();
for (const topic of kafka.topics) { for (const topic of kafka.topics) {
if (topicNames.has(topic.name)) throw new Error(`${configLabel}.topics contains duplicate topic ${topic.name}`); if (topicNames.has(topic.name)) throw new Error(`${configLabel}.topics contains duplicate topic ${topic.name}`);
@@ -391,6 +457,92 @@ async function validate(config: UniDeskConfig, options: CommonOptions): Promise<
target: targetSummary(target), target: targetSummary(target),
config: compactConfigSummary(kafka, target), config: compactConfigSummary(kafka, target),
validation: parsed ?? null, validation: parsed ?? null,
remote: options.raw && parsed !== null ? parsed : compactCapture(result, { full: options.full || result.exitCode !== 0 }),
};
}
async function topics(config: UniDeskConfig, options: KafkaInspectOptions): Promise<Record<string, unknown>> {
const kafka = readKafkaConfig();
const target = resolveTarget(kafka, options.targetId);
const result = await capture(config, target.route, ["sh"], topicsScript(kafka, target));
const parsed = parseJsonOutput(result.stdout);
return {
ok: result.exitCode === 0 && parsed?.ok === true,
action: "platform-infra-kafka-topics",
mutation: false,
target: targetSummary(target),
config: compactConfigSummary(kafka, target),
query: parsed ?? null,
remote: options.raw && parsed !== null ? parsed : compactCapture(result, { full: options.full || result.exitCode !== 0 }),
};
}
async function groups(config: UniDeskConfig, options: KafkaInspectOptions): Promise<Record<string, unknown>> {
const kafka = readKafkaConfig();
const target = resolveTarget(kafka, options.targetId);
const result = await capture(config, target.route, ["sh"], groupsScript(kafka, target));
const parsed = parseJsonOutput(result.stdout);
return {
ok: result.exitCode === 0 && parsed?.ok === true,
action: "platform-infra-kafka-groups",
mutation: false,
target: targetSummary(target),
config: compactConfigSummary(kafka, target),
query: parsed ?? null,
remote: options.raw && parsed !== null ? parsed : compactCapture(result, { full: options.full || result.exitCode !== 0 }),
};
}
async function offsets(config: UniDeskConfig, options: KafkaInspectOptions): Promise<Record<string, unknown>> {
const kafka = readKafkaConfig();
const target = resolveTarget(kafka, options.targetId);
const topic = resolveTopic(kafka, options.topic, null);
const result = await capture(config, target.route, ["sh"], offsetsScript(kafka, target, topic, options.group));
const parsed = parseJsonOutput(result.stdout);
return {
ok: result.exitCode === 0 && parsed?.ok === true,
action: "platform-infra-kafka-offsets",
mutation: false,
target: targetSummary(target),
config: compactConfigSummary(kafka, target),
query: parsed ?? null,
remote: options.raw && parsed !== null ? parsed : compactCapture(result, { full: options.full || result.exitCode !== 0 }),
};
}
async function tail(config: UniDeskConfig, options: KafkaInspectOptions): Promise<Record<string, unknown>> {
const kafka = readKafkaConfig();
const target = resolveTarget(kafka, options.targetId);
const topic = resolveTopic(kafka, options.topic, kafka.management.defaultShadowTopic);
const limit = boundedLimit(kafka, options.limit);
const result = await capture(config, target.route, ["sh"], tailScript(kafka, target, topic, limit));
const parsed = parseJsonOutput(result.stdout);
return {
ok: result.exitCode === 0 && parsed?.ok === true,
action: "platform-infra-kafka-tail",
mutation: false,
target: targetSummary(target),
config: compactConfigSummary(kafka, target),
query: parsed ?? null,
remote: options.raw && parsed !== null ? parsed : compactCapture(result, { full: options.full || result.exitCode !== 0 }),
};
}
async function produce(config: UniDeskConfig, options: KafkaProduceOptions): Promise<Record<string, unknown>> {
const kafka = readKafkaConfig();
const target = resolveTarget(kafka, options.targetId);
const topic = resolveTopic(kafka, options.topic, kafka.management.defaultShadowTopic);
const payload = buildShadowPayload(kafka, target, options, topic);
const result = await capture(config, target.route, ["sh"], produceScript(kafka, target, topic, options.key, payload));
const parsed = parseJsonOutput(result.stdout);
return {
ok: result.exitCode === 0 && parsed?.ok === true,
action: "platform-infra-kafka-produce",
mutation: true,
mode: "shadow-produce-only",
target: targetSummary(target),
config: compactConfigSummary(kafka, target),
production: parsed ?? null,
remote: options.raw ? parsed : compactCapture(result, { full: options.full || result.exitCode !== 0 }), remote: options.raw ? parsed : compactCapture(result, { full: options.full || result.exitCode !== 0 }),
}; };
} }
@@ -922,6 +1074,308 @@ PY
`; `;
} }
function topicsScript(kafka: PlatformKafkaConfig, target: KafkaTarget): string {
return `
set -u
tmp="$(mktemp -d)"
trap 'rm -rf "$tmp"' EXIT
pod="$(kubectl -n ${target.namespace} get pod -l strimzi.io/cluster=${kafka.cluster.name},strimzi.io/kind=Kafka -o jsonpath='{.items[0].metadata.name}' 2>"$tmp/pod.err" || true)"
if [ -n "$pod" ]; then
kubectl -n ${target.namespace} exec "$pod" -- bin/kafka-topics.sh --bootstrap-server ${brokerBootstrap(kafka)} --describe >"$tmp/topics.out" 2>"$tmp/topics.err"
topics_rc=$?
else
topics_rc=1
printf '%s\\n' 'kafka broker pod not found' >"$tmp/topics.err"
: >"$tmp/topics.out"
fi
python3 - "$tmp" "$pod" "$topics_rc" <<'PY'
import json, os, re, sys
tmp, pod, rc_text = sys.argv[1], sys.argv[2], sys.argv[3]
configured = ${JSON.stringify(kafka.topics.map((topic) => topic.name))}
def text(name, limit=1600):
try:
return open(os.path.join(tmp, name), encoding="utf-8", errors="replace").read()[-limit:]
except FileNotFoundError:
return ""
topics = {}
for raw in text("topics.out", 200000).splitlines():
line = raw.strip()
if not line.startswith("Topic:"):
continue
parts = [part.strip() for part in line.split("\\t") if part.strip()]
data = {}
for part in parts:
if ":" in part:
key, value = part.split(":", 1)
data[key.strip()] = value.strip()
name = data.get("Topic")
if not name or name == "__consumer_offsets":
continue
topic = topics.setdefault(name, {"name": name, "partitions": [], "configured": name in configured})
if "PartitionCount" in data:
topic["partitionCount"] = int(data.get("PartitionCount") or 0)
topic["replicationFactor"] = int(data.get("ReplicationFactor") or 0)
topic["configs"] = data.get("Configs") or ""
if "Partition" in data:
topic["partitions"].append({
"partition": int(data.get("Partition") or 0),
"leader": data.get("Leader"),
"replicas": [item for item in re.split(r",\\s*", data.get("Replicas") or "") if item],
"isr": [item for item in re.split(r",\\s*", data.get("Isr") or "") if item],
})
for name in configured:
topics.setdefault(name, {"name": name, "partitions": [], "configured": True, "missing": True})
payload = {
"ok": int(rc_text) == 0,
"target": "${target.id}",
"namespace": "${target.namespace}",
"cluster": "${kafka.cluster.name}",
"pod": pod or None,
"topics": sorted(topics.values(), key=lambda item: item["name"]),
"valuesPrinted": False,
"stderrTail": text("topics.err"),
}
print(json.dumps(payload, ensure_ascii=False, indent=2))
sys.exit(0 if payload["ok"] else 1)
PY
`;
}
function groupsScript(kafka: PlatformKafkaConfig, target: KafkaTarget): string {
return `
set -u
tmp="$(mktemp -d)"
trap 'rm -rf "$tmp"' EXIT
pod="$(kubectl -n ${target.namespace} get pod -l strimzi.io/cluster=${kafka.cluster.name},strimzi.io/kind=Kafka -o jsonpath='{.items[0].metadata.name}' 2>"$tmp/pod.err" || true)"
if [ -n "$pod" ]; then
kubectl -n ${target.namespace} exec "$pod" -- bin/kafka-consumer-groups.sh --bootstrap-server ${brokerBootstrap(kafka)} --list >"$tmp/groups.out" 2>"$tmp/groups.err"
groups_rc=$?
else
groups_rc=1
printf '%s\\n' 'kafka broker pod not found' >"$tmp/groups.err"
: >"$tmp/groups.out"
fi
python3 - "$tmp" "$pod" "$groups_rc" <<'PY'
import json, os, sys
tmp, pod, rc_text = sys.argv[1], sys.argv[2], sys.argv[3]
def text(name, limit=1600):
try:
return open(os.path.join(tmp, name), encoding="utf-8", errors="replace").read()[-limit:]
except FileNotFoundError:
return ""
groups = [line.strip() for line in text("groups.out", 200000).splitlines() if line.strip()]
payload = {
"ok": int(rc_text) == 0,
"target": "${target.id}",
"namespace": "${target.namespace}",
"cluster": "${kafka.cluster.name}",
"pod": pod or None,
"groups": sorted(groups),
"valuesPrinted": False,
"stderrTail": text("groups.err"),
}
print(json.dumps(payload, ensure_ascii=False, indent=2))
sys.exit(0 if payload["ok"] else 1)
PY
`;
}
function offsetsScript(kafka: PlatformKafkaConfig, target: KafkaTarget, topic: string | null, group: string | null): string {
const topics = topic === null ? kafka.topics.map((item) => item.name) : [topic];
return `
set -u
tmp="$(mktemp -d)"
trap 'rm -rf "$tmp"' EXIT
pod="$(kubectl -n ${target.namespace} get pod -l strimzi.io/cluster=${kafka.cluster.name},strimzi.io/kind=Kafka -o jsonpath='{.items[0].metadata.name}' 2>"$tmp/pod.err" || true)"
topics='${JSON.stringify(topics)}'
if [ -n "$pod" ]; then
python3 - "$topics" <<'PY' >"$tmp/topic-list"
import json, sys
for item in json.loads(sys.argv[1]):
print(item)
PY
offsets_rc=0
: >"$tmp/offsets.out"
: >"$tmp/offsets.err"
while IFS= read -r topic_name; do
kubectl -n ${target.namespace} exec "$pod" -- bin/kafka-get-offsets.sh --bootstrap-server ${brokerBootstrap(kafka)} --topic "$topic_name" >>"$tmp/offsets.out" 2>>"$tmp/offsets.err" || offsets_rc=$?
done <"$tmp/topic-list"
if [ ${group === null ? "0" : "1"} -eq 1 ]; then
kubectl -n ${target.namespace} exec "$pod" -- bin/kafka-consumer-groups.sh --bootstrap-server ${brokerBootstrap(kafka)} --describe --group ${shQuote(group ?? "")} >"$tmp/group.out" 2>"$tmp/group.err" || group_rc=$?
group_rc=\${group_rc:-0}
else
group_rc=0
: >"$tmp/group.out"
: >"$tmp/group.err"
fi
else
offsets_rc=1
group_rc=1
printf '%s\\n' 'kafka broker pod not found' >"$tmp/offsets.err"
: >"$tmp/offsets.out" >"$tmp/group.out" >"$tmp/group.err"
fi
python3 - "$tmp" "$pod" "$offsets_rc" "$group_rc" <<'PY'
import json, os, sys
tmp, pod, offsets_rc, group_rc = sys.argv[1], sys.argv[2], int(sys.argv[3]), int(sys.argv[4])
def text(name, limit=2000):
try:
return open(os.path.join(tmp, name), encoding="utf-8", errors="replace").read()[-limit:]
except FileNotFoundError:
return ""
offsets = []
for line in text("offsets.out", 200000).splitlines():
parts = line.strip().split(":")
if len(parts) != 3:
continue
offsets.append({"topic": parts[0], "partition": int(parts[1]), "endOffset": int(parts[2])})
group_rows = []
for raw in text("group.out", 200000).splitlines():
line = raw.strip()
if not line or line.startswith("GROUP") or line.startswith("Consumer group"):
continue
cols = line.split()
if len(cols) >= 6 and cols[1] != "-":
try:
group_rows.append({"group": cols[0], "topic": cols[1], "partition": int(cols[2]), "currentOffset": int(cols[3]), "logEndOffset": int(cols[4]), "lag": int(cols[5])})
except ValueError:
pass
payload = {
"ok": offsets_rc == 0 and group_rc == 0,
"target": "${target.id}",
"namespace": "${target.namespace}",
"cluster": "${kafka.cluster.name}",
"pod": pod or None,
"topicFilter": ${pythonJsonExpr(topic)},
"groupFilter": ${pythonJsonExpr(group)},
"offsets": sorted(offsets, key=lambda item: (item["topic"], item["partition"])),
"groupOffsets": group_rows,
"valuesPrinted": False,
"stderrTail": (text("offsets.err") + text("group.err"))[-2000:],
}
print(json.dumps(payload, ensure_ascii=False, indent=2))
sys.exit(0 if payload["ok"] else 1)
PY
`;
}
function tailScript(kafka: PlatformKafkaConfig, target: KafkaTarget, topic: string, limit: number): string {
return `
set -u
tmp="$(mktemp -d)"
trap 'rm -rf "$tmp"' EXIT
pod="$(kubectl -n ${target.namespace} get pod -l strimzi.io/cluster=${kafka.cluster.name},strimzi.io/kind=Kafka -o jsonpath='{.items[0].metadata.name}' 2>"$tmp/pod.err" || true)"
if [ -n "$pod" ]; then
kubectl -n ${target.namespace} exec "$pod" -- bin/kafka-get-offsets.sh --bootstrap-server ${brokerBootstrap(kafka)} --topic ${shQuote(topic)} >"$tmp/offsets.out" 2>"$tmp/offsets.err"
offsets_rc=$?
tail_rc=0
: >"$tmp/messages.raw"
while IFS=: read -r topic_name partition end_offset; do
case "$partition" in ''|*[!0-9]*) continue ;; esac
case "$end_offset" in ''|*[!0-9]*) continue ;; esac
start=$(( end_offset > ${limit} ? end_offset - ${limit} : 0 ))
count=$(( end_offset - start ))
if [ "$count" -gt 0 ]; then
timeout ${Math.min(kafka.validation.timeoutSeconds, 20)} kubectl -n ${target.namespace} exec "$pod" -- bin/kafka-console-consumer.sh --bootstrap-server ${brokerBootstrap(kafka)} --topic ${shQuote(topic)} --partition "$partition" --offset "$start" --max-messages "$count" >>"$tmp/messages.raw" 2>>"$tmp/messages.err" || true
fi
done <"$tmp/offsets.out"
else
offsets_rc=1
tail_rc=1
printf '%s\\n' 'kafka broker pod not found' >"$tmp/offsets.err"
: >"$tmp/offsets.out" >"$tmp/messages.raw" >"$tmp/messages.err"
fi
python3 - "$tmp" "$pod" "$offsets_rc" "$tail_rc" <<'PY'
import hashlib, json, os, sys
tmp, pod, offsets_rc, tail_rc = sys.argv[1], sys.argv[2], int(sys.argv[3]), int(sys.argv[4])
def text(name, limit=2000):
try:
return open(os.path.join(tmp, name), encoding="utf-8", errors="replace").read()[-limit:]
except FileNotFoundError:
return ""
offsets = []
for line in text("offsets.out", 200000).splitlines():
parts = line.strip().split(":")
if len(parts) == 3:
offsets.append({"topic": parts[0], "partition": int(parts[1]), "endOffset": int(parts[2])})
messages = []
for index, line in enumerate(text("messages.raw", 200000).splitlines()):
encoded = line.encode()
messages.append({"index": index, "sha256": hashlib.sha256(encoded).hexdigest(), "bytes": len(encoded), "valuePrinted": False})
payload = {
"ok": offsets_rc == 0 and tail_rc == 0,
"target": "${target.id}",
"namespace": "${target.namespace}",
"cluster": "${kafka.cluster.name}",
"pod": pod or None,
"topic": "${topic}",
"limit": ${limit},
"offsets": offsets,
"messages": messages[-${limit}:],
"valuesPrinted": False,
"stderrTail": (text("offsets.err") + text("messages.err"))[-2000:],
}
print(json.dumps(payload, ensure_ascii=False, indent=2))
sys.exit(0 if payload["ok"] else 1)
PY
`;
}
function produceScript(kafka: PlatformKafkaConfig, target: KafkaTarget, topic: string, key: string | null, payload: string): string {
const keyValue = key ?? `${kafka.management.shadowProducerId}-${Date.now()}`;
const separator = "|";
if (keyValue.includes(separator) || keyValue.includes("\n")) throw new Error("kafka produce --key must not contain | or newline");
return `
set -u
tmp="$(mktemp -d)"
trap 'rm -rf "$tmp"' EXIT
pod="$(kubectl -n ${target.namespace} get pod -l strimzi.io/cluster=${kafka.cluster.name},strimzi.io/kind=Kafka -o jsonpath='{.items[0].metadata.name}' 2>"$tmp/pod.err" || true)"
cat >"$tmp/payload.json" <<'UNIDESK_KAFKA_SHADOW_PAYLOAD'
${payload}
UNIDESK_KAFKA_SHADOW_PAYLOAD
printf '%s' ${shQuote(keyValue)} >"$tmp/key.txt"
if [ -n "$pod" ]; then
printf '%s%s%s\\n' "$(cat "$tmp/key.txt")" ${shQuote(separator)} "$(cat "$tmp/payload.json")" | kubectl -n ${target.namespace} exec -i "$pod" -- bin/kafka-console-producer.sh --bootstrap-server ${brokerBootstrap(kafka)} --topic ${shQuote(topic)} --reader-property parse.key=true --reader-property key.separator=${shQuote(separator)} --command-property acks=all >"$tmp/produce.out" 2>"$tmp/produce.err"
produce_rc=$?
else
produce_rc=1
printf '%s\\n' 'kafka broker pod not found' >"$tmp/produce.err"
: >"$tmp/produce.out"
fi
python3 - "$tmp" "$pod" "$produce_rc" <<'PY'
import hashlib, json, os, sys
tmp, pod, produce_rc = sys.argv[1], sys.argv[2], int(sys.argv[3])
def text(name, limit=1200):
try:
return open(os.path.join(tmp, name), encoding="utf-8", errors="replace").read()[-limit:]
except FileNotFoundError:
return ""
payload = open(os.path.join(tmp, "payload.json"), encoding="utf-8").read()
key = open(os.path.join(tmp, "key.txt"), encoding="utf-8").read()
payload_obj = {
"ok": produce_rc == 0,
"target": "${target.id}",
"namespace": "${target.namespace}",
"cluster": "${kafka.cluster.name}",
"topic": "${topic}",
"pod": pod or None,
"keySha256": hashlib.sha256(key.encode()).hexdigest(),
"message": {
"sha256": hashlib.sha256(payload.encode()).hexdigest(),
"bytes": len(payload.encode()),
"valuesPrinted": False,
},
"producer": "${kafka.management.shadowProducerId}",
"mode": "shadow-produce-only",
"consume": {"enabled": False, "performed": False},
"steps": {"produce": {"exitCode": produce_rc, "stdoutTail": text("produce.out"), "stderrTail": text("produce.err")}},
"valuesPrinted": False,
}
print(json.dumps(payload_obj, ensure_ascii=False, indent=2))
sys.exit(0 if payload_obj["ok"] else 1)
PY
`;
}
function configSummary(kafka: PlatformKafkaConfig, target: KafkaTarget): Record<string, unknown> { function configSummary(kafka: PlatformKafkaConfig, target: KafkaTarget): Record<string, unknown> {
return { return {
configPath: configLabel, configPath: configLabel,
@@ -932,6 +1386,7 @@ function configSummary(kafka: PlatformKafkaConfig, target: KafkaTarget): Record<
cluster: clusterSummary(kafka, target), cluster: clusterSummary(kafka, target),
topics: kafka.topics.map((topic) => topicSummary(topic)), topics: kafka.topics.map((topic) => topicSummary(topic)),
clients: kafka.clients.map((client) => clientSummary(client)), clients: kafka.clients.map((client) => clientSummary(client)),
management: kafka.management,
}; };
} }
@@ -955,6 +1410,12 @@ function compactConfigSummary(kafka: PlatformKafkaConfig, target: KafkaTarget):
}, },
topicCount: kafka.topics.length, topicCount: kafka.topics.length,
clientCount: kafka.clients.length, clientCount: kafka.clients.length,
management: {
defaultShadowTopic: kafka.management.defaultShadowTopic,
defaultTailLimit: kafka.management.defaultTailLimit,
maxTailLimit: kafka.management.maxTailLimit,
shadowProducerId: kafka.management.shadowProducerId,
},
valuesPrinted: false, valuesPrinted: false,
}; };
} }
@@ -1026,7 +1487,8 @@ function policyChecks(kafka: PlatformKafkaConfig, target: KafkaTarget, manifest:
{ name: "no-public-exposure", ok: !/^\s*type:\s*(NodePort|LoadBalancer)\s*$/mu.test(manifest) && !/^\s*kind:\s*Ingress\s*$/mu.test(manifest), detail: "Kafka POC is ClusterIP/internal only." }, { name: "no-public-exposure", ok: !/^\s*type:\s*(NodePort|LoadBalancer)\s*$/mu.test(manifest) && !/^\s*kind:\s*Ingress\s*$/mu.test(manifest), detail: "Kafka POC is ClusterIP/internal only." },
{ name: "single-broker-poc", ok: kafka.cluster.replicas === 1, detail: "D518 POC uses one KRaft broker/controller; production HA remains out of scope." }, { name: "single-broker-poc", ok: kafka.cluster.replicas === 1, detail: "D518 POC uses one KRaft broker/controller; production HA remains out of scope." },
{ name: "allow-all-network-policy", ok: manifest.includes("kind: NetworkPolicy") && manifest.includes("name: allow-all") && manifest.includes(`namespace: ${target.namespace}`), detail: `NetworkPolicy/allow-all is rendered in ${target.namespace}.` }, { name: "allow-all-network-policy", ok: manifest.includes("kind: NetworkPolicy") && manifest.includes("name: allow-all") && manifest.includes(`namespace: ${target.namespace}`), detail: `NetworkPolicy/allow-all is rendered in ${target.namespace}.` },
{ name: "app-integration-off-for-p1", ok: kafka.defaults.switch.appIntegrationEnabled === false, detail: "P1 deploys runtime and smoke only; app producer/consumer switchover is a later stage." }, { name: "shadow-produce-enabled", ok: kafka.defaults.switch.shadowProduceEnabled === true, detail: "P2 may publish shadow events to Kafka." },
{ name: "shadow-consume-disabled", ok: kafka.defaults.switch.shadowConsumeEnabled === false && kafka.defaults.switch.appIntegrationEnabled === false, detail: "P2 does not enable Kafka consumer cutover or app integration." },
]; ];
} }
@@ -1088,7 +1550,7 @@ function renderPlan(result: Record<string, unknown>): RenderedCliResult {
` status: ${stringValue(next.status)}`, ` status: ${stringValue(next.status)}`,
` validate: ${stringValue(next.validate)}`, ` validate: ${stringValue(next.validate)}`,
"", "",
"Boundary: Kafka runtime is platform-infra only; HWLAB/AgentRun app integration remains disabled in P1.", "Boundary: Kafka runtime is platform-infra only; P2 allows shadow produce/query and keeps consumer cutover disabled.",
"Disclosure: Secret values are not printed; client entries show only object/key metadata.", "Disclosure: Secret values are not printed; client entries show only object/key metadata.",
]); ]);
} }
@@ -1120,11 +1582,90 @@ function renderStatus(result: Record<string, unknown>): RenderedCliResult {
"", "",
"PODS", "PODS",
...(pods.length === 0 ? ["-"] : table(["POD", "PHASE", "READY"], pods)), ...(pods.length === 0 ? ["-"] : table(["POD", "PHASE", "READY"], pods)),
...remoteErrorLines(result),
"", "",
`NEXT bun scripts/cli.ts platform-infra kafka validate --target ${stringValue(summary.target)}`, `NEXT bun scripts/cli.ts platform-infra kafka validate --target ${stringValue(summary.target)}`,
]); ]);
} }
function renderTopics(result: Record<string, unknown>): RenderedCliResult {
const query = record(result.query);
const topics = arrayRecords(query.topics).map((topic) => [
stringValue(topic.name),
boolText(topic.configured),
stringValue(topic.partitionCount, "0"),
stringValue(topic.replicationFactor, "0"),
stringValue(topic.configs),
]);
return rendered(result, "platform-infra kafka topics", [
"PLATFORM-INFRA KAFKA TOPICS",
...table(["TARGET", "NAMESPACE", "CLUSTER", "POD"], [[stringValue(query.target), stringValue(query.namespace), stringValue(query.cluster), stringValue(query.pod)]]),
"",
...(topics.length === 0 ? ["TOPICS -"] : table(["TOPIC", "YAML", "PARTITIONS", "RF", "CONFIGS"], topics)),
...remoteErrorLines(result),
"",
"Disclosure: message values and Secret values are not printed.",
]);
}
function renderGroups(result: Record<string, unknown>): RenderedCliResult {
const query = record(result.query);
const groups = arrayRecords(query.groups).length > 0 ? arrayRecords(query.groups).map((item) => [stringValue(item.name)]) : (Array.isArray(query.groups) ? query.groups.map((item) => [String(item)]) : []);
return rendered(result, "platform-infra kafka groups", [
"PLATFORM-INFRA KAFKA GROUPS",
...table(["TARGET", "NAMESPACE", "CLUSTER", "POD"], [[stringValue(query.target), stringValue(query.namespace), stringValue(query.cluster), stringValue(query.pod)]]),
"",
...(groups.length === 0 ? ["GROUPS -"] : table(["GROUP"], groups)),
...remoteErrorLines(result),
]);
}
function renderOffsets(result: Record<string, unknown>): RenderedCliResult {
const query = record(result.query);
const offsets = arrayRecords(query.offsets).map((item) => [stringValue(item.topic), stringValue(item.partition), stringValue(item.endOffset)]);
const groupOffsets = arrayRecords(query.groupOffsets).map((item) => [stringValue(item.group), stringValue(item.topic), stringValue(item.partition), stringValue(item.currentOffset), stringValue(item.logEndOffset), stringValue(item.lag)]);
return rendered(result, "platform-infra kafka offsets", [
"PLATFORM-INFRA KAFKA OFFSETS",
...table(["TARGET", "NAMESPACE", "CLUSTER", "POD"], [[stringValue(query.target), stringValue(query.namespace), stringValue(query.cluster), stringValue(query.pod)]]),
"",
"END OFFSETS",
...(offsets.length === 0 ? ["-"] : table(["TOPIC", "PARTITION", "END"], offsets)),
"",
"GROUP OFFSETS",
...(groupOffsets.length === 0 ? ["-"] : table(["GROUP", "TOPIC", "PARTITION", "CURRENT", "END", "LAG"], groupOffsets)),
...remoteErrorLines(result),
]);
}
function renderTail(result: Record<string, unknown>): RenderedCliResult {
const query = record(result.query);
const messages = arrayRecords(query.messages).map((item) => [stringValue(item.index), stringValue(item.sha256), stringValue(item.bytes), boolText(item.valuePrinted)]);
const offsets = arrayRecords(query.offsets).map((item) => [stringValue(item.partition), stringValue(item.endOffset)]);
return rendered(result, "platform-infra kafka tail", [
"PLATFORM-INFRA KAFKA TAIL",
...table(["TARGET", "TOPIC", "LIMIT", "POD"], [[stringValue(query.target), stringValue(query.topic), stringValue(query.limit), stringValue(query.pod)]]),
"",
"OFFSETS",
...(offsets.length === 0 ? ["-"] : table(["PARTITION", "END"], offsets)),
"",
"MESSAGES",
...(messages.length === 0 ? ["-"] : table(["INDEX", "SHA256", "BYTES", "VALUE_PRINTED"], messages)),
...remoteErrorLines(result),
"",
"Disclosure: values are hashed only; use application-side replay tools for payload inspection.",
]);
}
function remoteErrorLines(result: Record<string, unknown>): string[] {
if (result.ok !== false) return [];
const remote = record(result.remote);
const exitCode = stringValue(remote.exitCode);
const stderr = stringValue(remote.stderrTail).replace(/\s+/gu, " ").trim().slice(0, 220);
const stdout = stringValue(remote.stdoutTail).replace(/\s+/gu, " ").trim().slice(0, 220);
const detail = stderr !== "-" ? stderr : stdout;
return ["", "ERROR", ...table(["EXIT", "DETAIL"], [[exitCode, detail]])];
}
function rendered(result: Record<string, unknown>, command: string, lines: string[]): RenderedCliResult { function rendered(result: Record<string, unknown>, command: string, lines: string[]): RenderedCliResult {
return { ok: result.ok !== false, command, renderedText: lines.join("\n"), contentType: "text/plain" }; return { ok: result.ok !== false, command, renderedText: lines.join("\n"), contentType: "text/plain" };
} }
@@ -1141,7 +1682,7 @@ function parseApplyOptions(args: string[]): ApplyOptions {
else if (arg === "--wait") wait = true; else if (arg === "--wait") wait = true;
else { else {
commonArgs.push(arg); commonArgs.push(arg);
if (arg === "--target") { if (arg === "--target" || arg === "--node") {
commonArgs.push(args[index + 1] ?? ""); commonArgs.push(args[index + 1] ?? "");
index += 1; index += 1;
} }
@@ -1157,10 +1698,10 @@ function parseCommonOptions(args: string[]): CommonOptions {
let raw = false; let raw = false;
for (let index = 0; index < args.length; index += 1) { for (let index = 0; index < args.length; index += 1) {
const arg = args[index]; const arg = args[index];
if (arg === "--target") { if (arg === "--target" || arg === "--node") {
const value = args[index + 1]; const value = args[index + 1];
if (value === undefined || value.startsWith("--")) throw new Error("--target requires a value"); if (value === undefined || value.startsWith("--")) throw new Error(`${arg} requires a value`);
if (!/^[A-Za-z0-9._-]+$/u.test(value)) throw new Error("--target must be a simple target id"); if (!/^[A-Za-z0-9._-]+$/u.test(value)) throw new Error(`${arg} must be a simple target id`);
targetId = value; targetId = value;
index += 1; index += 1;
} else if (arg === "--full") { } else if (arg === "--full") {
@@ -1175,10 +1716,145 @@ function parseCommonOptions(args: string[]): CommonOptions {
return { targetId, full, raw }; return { targetId, full, raw };
} }
function parseInspectOptions(args: string[]): KafkaInspectOptions {
const commonArgs: string[] = [];
let topic: string | null = null;
let group: string | null = null;
let limit: number | null = null;
for (let index = 0; index < args.length; index += 1) {
const arg = args[index];
if (arg === "--topic") {
topic = parseTopicOption(args[index + 1], arg);
index += 1;
} else if (arg === "--group") {
group = parseSimpleOption(args[index + 1], arg);
index += 1;
} else if (arg === "--limit") {
const value = args[index + 1];
if (value === undefined || value.startsWith("--")) throw new Error("--limit requires a value");
limit = Number(value);
if (!Number.isInteger(limit) || limit < 1) throw new Error("--limit must be a positive integer");
index += 1;
} else {
commonArgs.push(arg);
if (arg === "--target" || arg === "--node") {
commonArgs.push(args[index + 1] ?? "");
index += 1;
}
}
}
return { ...parseCommonOptions(commonArgs), topic, group, limit };
}
async function parseProduceOptions(args: string[]): Promise<KafkaProduceOptions> {
const commonArgs: string[] = [];
let topic: string | null = null;
let key: string | null = null;
let source = "manual-cli";
let eventType = "platform-infra.kafka.shadow-produce.v1";
let payload: string | null = null;
for (let index = 0; index < args.length; index += 1) {
const arg = args[index];
if (arg === "--topic") {
topic = parseTopicOption(args[index + 1], arg);
index += 1;
} else if (arg === "--key") {
key = parseSimpleOption(args[index + 1], arg);
index += 1;
} else if (arg === "--source") {
source = parseSimpleOption(args[index + 1], arg);
index += 1;
} else if (arg === "--event-type") {
eventType = parseTopicOption(args[index + 1], arg);
index += 1;
} else if (arg === "--payload-stdin") {
payload = await Bun.stdin.text();
} else {
commonArgs.push(arg);
if (arg === "--target" || arg === "--node") {
commonArgs.push(args[index + 1] ?? "");
index += 1;
}
}
}
return { ...parseCommonOptions(commonArgs), topic, key, source, eventType, payload };
}
function bootstrapService(kafka: PlatformKafkaConfig, target: KafkaTarget): string { function bootstrapService(kafka: PlatformKafkaConfig, target: KafkaTarget): string {
return `${kafka.cluster.name}-kafka-bootstrap.${target.namespace}.svc.cluster.local:${kafka.cluster.listeners.plain.port}`; return `${kafka.cluster.name}-kafka-bootstrap.${target.namespace}.svc.cluster.local:${kafka.cluster.listeners.plain.port}`;
} }
function brokerBootstrap(kafka: PlatformKafkaConfig): string {
return `${kafka.cluster.name}-kafka-bootstrap:${kafka.cluster.listeners.plain.port}`;
}
function resolveTopic(kafka: PlatformKafkaConfig, topic: string | null, fallback: string | null): string | null {
const resolved = topic ?? fallback;
if (resolved === null) return null;
if (!kafka.topics.some((item) => item.name === resolved)) throw new Error(`unknown kafka topic ${resolved}; known topics: ${kafka.topics.map((item) => item.name).join(", ")}`);
return resolved;
}
function boundedLimit(kafka: PlatformKafkaConfig, limit: number | null): number {
const value = limit ?? kafka.management.defaultTailLimit;
if (!Number.isInteger(value) || value < 1) throw new Error("limit must be a positive integer");
if (value > kafka.management.maxTailLimit) throw new Error(`limit ${value} exceeds ${configLabel}.management.maxTailLimit=${kafka.management.maxTailLimit}`);
return value;
}
function buildShadowPayload(kafka: PlatformKafkaConfig, target: KafkaTarget, options: KafkaProduceOptions, topic: string): string {
let userPayload: unknown = null;
if (options.payload !== null) {
const trimmed = options.payload.trim();
if (trimmed.length > 0) {
try {
userPayload = JSON.parse(trimmed) as unknown;
} catch {
userPayload = { textSha256: sha256Hex(trimmed), valuesPrinted: false };
}
}
}
return JSON.stringify({
eventId: `evt_${target.id}_${Date.now()}_${Math.random().toString(16).slice(2)}`,
schemaVersion: 1,
eventType: options.eventType,
producer: kafka.management.shadowProducerId,
source: options.source,
node: target.id,
namespace: target.namespace,
topic,
mode: "shadow-produce-only",
consumeEnabled: false,
appIntegrationEnabled: kafka.defaults.switch.appIntegrationEnabled,
shadowProduceEnabled: kafka.defaults.switch.shadowProduceEnabled,
shadowConsumeEnabled: kafka.defaults.switch.shadowConsumeEnabled,
partitionKey: options.key ?? null,
occurredAt: new Date().toISOString(),
payload: userPayload ?? {
kind: "manual-shadow-produce",
spec: kafka.metadata.spec,
valuesPrinted: false,
},
redaction: { valuesPrinted: false },
});
}
function parseTopicOption(value: string | undefined, optionName: string): string {
if (value === undefined || value.startsWith("--")) throw new Error(`${optionName} requires a value`);
if (!/^[A-Za-z0-9._-]+$/u.test(value) || value.includes("..")) throw new Error(`${optionName} has an unsupported Kafka topic/event name`);
return value;
}
function parseSimpleOption(value: string | undefined, optionName: string): string {
if (value === undefined || value.startsWith("--")) throw new Error(`${optionName} requires a value`);
if (!/^[A-Za-z0-9._:/=-]+$/u.test(value)) throw new Error(`${optionName} must be a simple token`);
return value;
}
function pythonJsonExpr(value: unknown): string {
return `json.loads(${JSON.stringify(JSON.stringify(value))})`;
}
function topicNameField(record: Record<string, unknown>, key: string, path: string): string { function topicNameField(record: Record<string, unknown>, key: string, path: string): string {
const value = y.stringField(record, key, path); const value = y.stringField(record, key, path);
if (!/^[A-Za-z0-9._-]+$/u.test(value) || value.includes("..")) throw new Error(`${configLabel}.${path}.${key} has an unsupported Kafka topic name`); if (!/^[A-Za-z0-9._-]+$/u.test(value) || value.includes("..")) throw new Error(`${configLabel}.${path}.${key} has an unsupported Kafka topic name`);
+5
View File
@@ -370,6 +370,11 @@ export function platformInfraHelp(): unknown {
"bun scripts/cli.ts platform-infra kafka apply --target D518 --confirm", "bun scripts/cli.ts platform-infra kafka apply --target D518 --confirm",
"bun scripts/cli.ts platform-infra kafka status --target D518", "bun scripts/cli.ts platform-infra kafka status --target D518",
"bun scripts/cli.ts platform-infra kafka validate --target D518", "bun scripts/cli.ts platform-infra kafka validate --target D518",
"bun scripts/cli.ts platform-infra kafka topics --node D518",
"bun scripts/cli.ts platform-infra kafka groups --node D518",
"bun scripts/cli.ts platform-infra kafka offsets --node D518 --topic hwlab.agentrun.command.v1",
"bun scripts/cli.ts platform-infra kafka tail --node D518 --topic hwlab.agentrun.command.v1 --limit 5",
"bun scripts/cli.ts platform-infra kafka produce --node D518 --topic hwlab.agentrun.command.v1 --key <trace-or-session>",
], ],
description: "Operate YAML-controlled platform-infra services such as Sub2API, LangBot, n8n, WeChat archive workflows, OpenTelemetry tracing, the independent target-scoped secret plane, and the D518 Kafka event bus. Public services use PK01 Caddy+FRP rather than Kubernetes Ingress, NodePort, or LoadBalancer.", description: "Operate YAML-controlled platform-infra services such as Sub2API, LangBot, n8n, WeChat archive workflows, OpenTelemetry tracing, the independent target-scoped secret plane, and the D518 Kafka event bus. Public services use PK01 Caddy+FRP rather than Kubernetes Ingress, NodePort, or LoadBalancer.",
target, target,