From e8ca6ff009ecf76b0a51bc790f78356f4122dc57 Mon Sep 17 00:00:00 2001 From: Codex Date: Sun, 28 Jun 2026 09:28:38 +0000 Subject: [PATCH] feat: add platform-infra kafka event bus poc --- config/platform-infra/kafka.yaml | 118 +++ docs/reference/platform-infra.md | 9 + scripts/src/platform-infra-kafka.ts | 1220 +++++++++++++++++++++++++ scripts/src/platform-infra/entry.ts | 9 +- scripts/src/platform-infra/options.ts | 4 + 5 files changed, 1358 insertions(+), 2 deletions(-) create mode 100644 config/platform-infra/kafka.yaml create mode 100644 scripts/src/platform-infra-kafka.ts diff --git a/config/platform-infra/kafka.yaml b/config/platform-infra/kafka.yaml new file mode 100644 index 00000000..8e92db9a --- /dev/null +++ b/config/platform-infra/kafka.yaml @@ -0,0 +1,118 @@ +version: 1 +kind: platform-infra-kafka + +metadata: + id: kafka-event-bus + owner: unidesk + spec: pikasTech/HWLAB#2243 + relatedIssues: + - 2243 + +defaults: + targetId: D518 + switch: + enabled: true + mode: runtime-poc + appIntegrationEnabled: false + +operator: + implementation: strimzi + version: 1.1.0 + manifestUrl: https://github.com/strimzi/strimzi-kafka-operator/releases/download/1.1.0/strimzi-cluster-operator-1.1.0.yaml + deploymentName: strimzi-cluster-operator + serviceAccountName: strimzi-cluster-operator + crds: + - kafkas.kafka.strimzi.io + - kafkanodepools.kafka.strimzi.io + - kafkatopics.kafka.strimzi.io + - kafkausers.kafka.strimzi.io + +targets: + - id: D518 + route: D518:k3s + namespace: platform-infra + role: active + enabled: true + createNamespace: true + storageClassName: local-path + +cluster: + name: platform-infra-kafka + nodePoolName: dual-role + kafkaVersion: 4.3.0 + metadataVersion: 4.3-IV0 + replicas: 1 + storage: + size: 4Gi + deleteClaim: false + listeners: + plain: + enabled: true + port: 9092 + tls: + enabled: true + port: 9093 + authorization: + enabled: false + mode: deferred-app-integration + +topics: + - name: hwlab.agentrun.command.v1 + partitions: 1 + replicas: 1 + retentionMs: 604800000 + cleanupPolicy: delete + description: HWLAB v0.3 admitted user commands for AgentRun v0.2. + - name: agentrun.hwlab.event.v1 + partitions: 1 + replicas: 1 + retentionMs: 604800000 + cleanupPolicy: delete + description: AgentRun v0.2 run/command events for HWLAB v0.3 ingestion. + - name: hwlab.agentrun.command.dlq.v1 + partitions: 1 + replicas: 1 + retentionMs: 1209600000 + cleanupPolicy: delete + description: Dead-letter topic for HWLAB to AgentRun command delivery. + - name: agentrun.hwlab.event.dlq.v1 + partitions: 1 + replicas: 1 + retentionMs: 1209600000 + cleanupPolicy: delete + description: Dead-letter topic for AgentRun to HWLAB event delivery. + - name: platform-infra.kafka.smoke.v1 + partitions: 1 + replicas: 1 + retentionMs: 86400000 + cleanupPolicy: delete + description: Platform-infra Kafka broker smoke topic. + +clients: + - id: hwlab-v03-cloud-api + namespace: hwlab-v03 + serviceAccountName: hwlab-cloud-api + kafkaUserName: hwlab-v03-cloud-api + secretName: hwlab-v03-cloud-api + produceTopics: + - hwlab.agentrun.command.v1 + consumeTopics: + - agentrun.hwlab.event.v1 + dlqTopics: + - agentrun.hwlab.event.dlq.v1 + - id: agentrun-v02-manager + namespace: agentrun-v02 + serviceAccountName: agentrun-manager + kafkaUserName: agentrun-v02-manager + secretName: agentrun-v02-manager + produceTopics: + - agentrun.hwlab.event.v1 + consumeTopics: + - hwlab.agentrun.command.v1 + dlqTopics: + - hwlab.agentrun.command.dlq.v1 + +validation: + timeoutSeconds: 45 + pollSeconds: 3 + smokeTopic: platform-infra.kafka.smoke.v1 diff --git a/docs/reference/platform-infra.md b/docs/reference/platform-infra.md index 508866be..9e77f1e7 100644 --- a/docs/reference/platform-infra.md +++ b/docs/reference/platform-infra.md @@ -33,6 +33,15 @@ - External platform PostgreSQL endpoints for Sub2API are produced by the platform DB YAML and its `platform-db postgres` CLI. Cross-node Sub2API consumers connect directly to that endpoint; the master server is not a PostgreSQL data-plane relay. DNS aliases are optional when the exported `DATABASE_URL` uses a reachable IP with `sslmode=require`; current PK01-specific rules live in `docs/reference/pk01.md`. - Sub2API account sentinel, public exposure, and HTTPS egress proxy are target-scoped YAML decisions. The active target may run them when YAML enables them; the standby G14 target must stay deployed but inactive until YAML promotion. `sentinel.enabledOnTargets` is the authority for where Codex-pool sentinel image, CronJob, Secret and state resources are expected; disabled targets should report sentinel validation as skipped instead of failing on missing runtime sentinel objects. Do not create a second sentinel, FRP client, public management surface, or edge proxy by hand; enable or move those resources only through the target YAML and the `platform-infra sub2api` / `codex-pool --target` CLI paths. +## Kafka Event Bus Boundary + +- Kafka for the HWLAB v0.3 / AgentRun v0.2 event-bus POC is a UniDesk-operated platform service in namespace `platform-infra`. It is not owned by `hwlab-v03`, `agentrun-v02`, a per-lane Kafka namespace, or a service repository deployment file. +- The canonical source of truth is `config/platform-infra/kafka.yaml`; target, namespace, Strimzi release URL, cluster name, storage class/size, topic list, client declarations, DLQ names, runtime switch and validation smoke topic must stay in that YAML. Current version numbers and retention values belong only in YAML, not in this reference. +- The canonical entrypoint is `bun scripts/cli.ts platform-infra kafka plan|apply|status|validate --target D518`. Formal mutation must use that path; raw `kubectl` is bounded diagnosis only. +- HWLAB v0.3 and AgentRun v0.2 are client namespaces. They may later consume YAML-declared Kafka bootstrap, user Secret metadata and topic contracts, but app producer/consumer switchover must be a separate HWLAB/AgentRun implementation stage. Runtime readiness alone does not prove Workbench projection, SSE or AgentRun command ingestion has migrated. +- The first POC is a single-node KRaft broker for observability, ordering and replay investigation. It improves auditability and smoke coverage, but it is not a production high-availability claim; replication, backup, min ISR and app-side transactional inbox/outbox are separate decisions. +- Kafka must stay ClusterIP-only by default. Do not add Ingress, NodePort, LoadBalancer, host networking, public FRP, or browser-facing Kafka access unless a later YAML-controlled platform decision explicitly changes that boundary. + ## LangBot Deployment Boundary - LangBot is a UniDesk-operated public platform service in namespace `platform-infra`. The canonical entrypoint is `bun scripts/cli.ts platform-infra langbot plan|apply|status|logs|validate|bootstrap-api-key|query`; G14 is the default runtime target. diff --git a/scripts/src/platform-infra-kafka.ts b/scripts/src/platform-infra-kafka.ts new file mode 100644 index 00000000..efa1b681 --- /dev/null +++ b/scripts/src/platform-infra-kafka.ts @@ -0,0 +1,1220 @@ +import type { UniDeskConfig } from "./config"; +import { rootPath } from "./config"; +import type { RenderedCliResult } from "./output"; +import { + capture, + compactCapture, + createYamlFieldReader, + parseJsonOutput, + readYamlRecord, + sha256Hex, +} from "./platform-infra-ops-library"; + +const configFile = rootPath("config", "platform-infra", "kafka.yaml"); +const configLabel = "config/platform-infra/kafka.yaml"; +const fieldManager = "unidesk-platform-infra-kafka"; +const y = createYamlFieldReader(configLabel); + +interface KafkaTarget { + id: string; + route: string; + namespace: string; + role: "active" | "standby"; + enabled: boolean; + createNamespace: boolean; + storageClassName: string; +} + +interface KafkaTopicSpec { + name: string; + partitions: number; + replicas: number; + retentionMs: number; + cleanupPolicy: "delete" | "compact"; + description: string; +} + +interface KafkaClientSpec { + id: string; + namespace: string; + serviceAccountName: string; + kafkaUserName: string; + secretName: string; + produceTopics: string[]; + consumeTopics: string[]; + dlqTopics: string[]; +} + +interface PlatformKafkaConfig { + version: number; + kind: "platform-infra-kafka"; + metadata: { + id: string; + owner: string; + spec: string; + relatedIssues: number[]; + }; + defaults: { + targetId: string; + switch: { + enabled: boolean; + mode: string; + appIntegrationEnabled: boolean; + }; + }; + operator: { + implementation: "strimzi"; + version: string; + manifestUrl: string; + deploymentName: string; + serviceAccountName: string; + crds: string[]; + }; + targets: KafkaTarget[]; + cluster: { + name: string; + nodePoolName: string; + kafkaVersion: string; + metadataVersion: string; + replicas: number; + storage: { + size: string; + deleteClaim: boolean; + }; + listeners: { + plain: { enabled: boolean; port: number }; + tls: { enabled: boolean; port: number }; + }; + authorization: { + enabled: boolean; + mode: string; + }; + }; + topics: KafkaTopicSpec[]; + clients: KafkaClientSpec[]; + validation: { + timeoutSeconds: number; + pollSeconds: number; + smokeTopic: string; + }; +} + +interface CommonOptions { + targetId: string | null; + full: boolean; + raw: boolean; +} + +interface ApplyOptions extends CommonOptions { + dryRun: boolean; + confirm: boolean; + wait: boolean; +} + +export async function runPlatformInfraKafkaCommand(config: UniDeskConfig, args: string[]): Promise | RenderedCliResult> { + const [action = "plan"] = args; + if (action === "plan") { + const options = parseCommonOptions(args.slice(1)); + const result = plan(options); + return options.full || options.raw ? result : renderPlan(result); + } + if (action === "apply") return await apply(config, parseApplyOptions(args.slice(1))); + if (action === "status") { + const options = parseCommonOptions(args.slice(1)); + const result = await status(config, options); + return options.full || options.raw ? result : renderStatus(result); + } + if (action === "validate") return await validate(config, parseCommonOptions(args.slice(1))); + return { + ok: false, + error: "unsupported-platform-infra-kafka-command", + args, + help: kafkaHelp(), + }; +} + +export function kafkaHelp(): Record { + return { + command: "platform-infra kafka plan|apply|status|validate", + configTruth: configLabel, + usage: [ + "bun scripts/cli.ts platform-infra kafka plan --target D518", + "bun scripts/cli.ts platform-infra kafka apply --target D518 --dry-run", + "bun scripts/cli.ts platform-infra kafka apply --target D518 --confirm", + "bun scripts/cli.ts platform-infra kafka status --target D518 [--full|--raw]", + "bun scripts/cli.ts platform-infra kafka validate --target D518 [--full|--raw]", + ], + boundary: "Kafka runtime belongs to platform-infra. HWLAB v0.3 and AgentRun v0.2 are client namespaces only.", + }; +} + +function readKafkaConfig(): PlatformKafkaConfig { + const root = readYamlRecord>(configFile, "platform-infra-kafka"); + const version = y.integerField(root, "version", ""); + if (version !== 1) throw new Error(`${configLabel}.version must be 1`); + const metadata = y.objectField(root, "metadata", ""); + const defaults = y.objectField(root, "defaults", ""); + const switchRecord = y.objectField(defaults, "switch", "defaults"); + const operator = y.objectField(root, "operator", ""); + const cluster = y.objectField(root, "cluster", ""); + const storage = y.objectField(cluster, "storage", "cluster"); + const listeners = y.objectField(cluster, "listeners", "cluster"); + const plain = y.objectField(listeners, "plain", "cluster.listeners"); + const tls = y.objectField(listeners, "tls", "cluster.listeners"); + const authorization = y.objectField(cluster, "authorization", "cluster"); + const validation = y.objectField(root, "validation", ""); + const parsed: PlatformKafkaConfig = { + version, + kind: "platform-infra-kafka", + metadata: { + id: y.stringField(metadata, "id", "metadata"), + owner: y.stringField(metadata, "owner", "metadata"), + spec: y.stringField(metadata, "spec", "metadata"), + relatedIssues: y.numberArrayField(metadata, "relatedIssues", "metadata"), + }, + defaults: { + targetId: y.stringField(defaults, "targetId", "defaults"), + switch: { + enabled: y.booleanField(switchRecord, "enabled", "defaults.switch"), + mode: y.stringField(switchRecord, "mode", "defaults.switch"), + appIntegrationEnabled: y.booleanField(switchRecord, "appIntegrationEnabled", "defaults.switch"), + }, + }, + operator: { + implementation: y.enumField(operator, "implementation", "operator", ["strimzi"] as const), + version: y.stringField(operator, "version", "operator"), + manifestUrl: y.httpsUrlField(operator, "manifestUrl", "operator"), + deploymentName: y.kubernetesNameField(operator, "deploymentName", "operator"), + serviceAccountName: y.kubernetesNameField(operator, "serviceAccountName", "operator"), + crds: y.stringArrayField(operator, "crds", "operator"), + }, + targets: y.arrayOfRecords(root.targets, "targets").map(parseTarget), + cluster: { + name: y.kubernetesNameField(cluster, "name", "cluster"), + nodePoolName: y.kubernetesNameField(cluster, "nodePoolName", "cluster"), + kafkaVersion: y.stringField(cluster, "kafkaVersion", "cluster"), + metadataVersion: y.stringField(cluster, "metadataVersion", "cluster"), + replicas: y.integerField(cluster, "replicas", "cluster"), + storage: { + size: y.stringField(storage, "size", "cluster.storage"), + deleteClaim: y.booleanField(storage, "deleteClaim", "cluster.storage"), + }, + listeners: { + plain: { enabled: y.booleanField(plain, "enabled", "cluster.listeners.plain"), port: y.portField(plain, "port", "cluster.listeners.plain") }, + tls: { enabled: y.booleanField(tls, "enabled", "cluster.listeners.tls"), port: y.portField(tls, "port", "cluster.listeners.tls") }, + }, + authorization: { + enabled: y.booleanField(authorization, "enabled", "cluster.authorization"), + mode: y.stringField(authorization, "mode", "cluster.authorization"), + }, + }, + topics: y.arrayOfRecords(root.topics, "topics").map(parseTopic), + clients: y.arrayOfRecords(root.clients, "clients").map(parseClient), + validation: { + timeoutSeconds: y.integerField(validation, "timeoutSeconds", "validation"), + pollSeconds: y.integerField(validation, "pollSeconds", "validation"), + smokeTopic: y.stringField(validation, "smokeTopic", "validation"), + }, + }; + validateConfig(parsed); + return parsed; +} + +function parseTarget(record: Record, index: number): KafkaTarget { + const path = `targets[${index}]`; + return { + id: y.stringField(record, "id", path), + route: y.stringField(record, "route", path), + namespace: y.kubernetesNameField(record, "namespace", path), + role: y.enumField(record, "role", path, ["active", "standby"] as const), + enabled: y.booleanField(record, "enabled", path), + createNamespace: y.booleanField(record, "createNamespace", path), + storageClassName: y.stringField(record, "storageClassName", path), + }; +} + +function parseTopic(record: Record, index: number): KafkaTopicSpec { + const path = `topics[${index}]`; + return { + name: topicNameField(record, "name", path), + partitions: positiveInteger(record, "partitions", path), + replicas: positiveInteger(record, "replicas", path), + retentionMs: positiveInteger(record, "retentionMs", path), + cleanupPolicy: y.enumField(record, "cleanupPolicy", path, ["delete", "compact"] as const), + description: y.stringField(record, "description", path), + }; +} + +function parseClient(record: Record, index: number): KafkaClientSpec { + const path = `clients[${index}]`; + return { + id: y.stringField(record, "id", path), + namespace: y.kubernetesNameField(record, "namespace", path), + serviceAccountName: y.kubernetesNameField(record, "serviceAccountName", path), + kafkaUserName: y.kubernetesNameField(record, "kafkaUserName", path), + secretName: y.kubernetesNameField(record, "secretName", path), + produceTopics: topicArray(record, "produceTopics", path), + consumeTopics: topicArray(record, "consumeTopics", path), + dlqTopics: topicArray(record, "dlqTopics", path), + }; +} + +function validateConfig(kafka: PlatformKafkaConfig): void { + resolveTarget(kafka, kafka.defaults.targetId); + if (!/^https:\/\/github\.com\/strimzi\/strimzi-kafka-operator\/releases\/download\//u.test(kafka.operator.manifestUrl)) { + throw new Error(`${configLabel}.operator.manifestUrl must be an official Strimzi GitHub release URL`); + } + if (kafka.cluster.replicas !== 1) throw new Error(`${configLabel}.cluster.replicas must be 1 for the D518 POC`); + if (!/^[0-9]+Gi$/u.test(kafka.cluster.storage.size)) throw new Error(`${configLabel}.cluster.storage.size must be a Gi quantity`); + if (!kafka.topics.some((topic) => topic.name === kafka.validation.smokeTopic)) throw new Error(`${configLabel}.validation.smokeTopic must reference one of topics[].name`); + const topicNames = new Set(); + for (const topic of kafka.topics) { + if (topicNames.has(topic.name)) throw new Error(`${configLabel}.topics contains duplicate topic ${topic.name}`); + topicNames.add(topic.name); + } + const clientIds = new Set(); + for (const client of kafka.clients) { + if (clientIds.has(client.id)) throw new Error(`${configLabel}.clients contains duplicate id ${client.id}`); + clientIds.add(client.id); + for (const topic of [...client.produceTopics, ...client.consumeTopics, ...client.dlqTopics]) { + if (!topicNames.has(topic)) throw new Error(`${configLabel}.clients.${client.id} references unknown topic ${topic}`); + } + } + if (kafka.validation.timeoutSeconds < 5 || kafka.validation.timeoutSeconds > 55) throw new Error(`${configLabel}.validation.timeoutSeconds must be between 5 and 55`); + if (kafka.validation.pollSeconds < 1 || kafka.validation.pollSeconds > kafka.validation.timeoutSeconds) throw new Error(`${configLabel}.validation.pollSeconds must be between 1 and timeoutSeconds`); +} + +function resolveTarget(kafka: PlatformKafkaConfig, targetId: string | null): KafkaTarget { + const resolved = targetId ?? kafka.defaults.targetId; + const target = kafka.targets.find((item) => item.id.toLowerCase() === resolved.toLowerCase()); + if (target === undefined) throw new Error(`unknown kafka target ${resolved}; known targets: ${kafka.targets.map((item) => item.id).join(", ")}`); + if (!target.enabled) throw new Error(`kafka target ${target.id} is disabled in ${configLabel}`); + return target; +} + +function plan(options: CommonOptions): Record { + const kafka = readKafkaConfig(); + const target = resolveTarget(kafka, options.targetId); + const manifest = renderKafkaManifest(kafka, target); + const policy = policyChecks(kafka, target, manifest); + return { + ok: policy.every((check) => check.ok), + action: "platform-infra-kafka-plan", + mutation: false, + config: configSummary(kafka, target), + renderPlan: { + target: targetSummary(target), + objects: manifestObjectSummary(manifest), + operator: operatorSummary(kafka, target), + cluster: clusterSummary(kafka, target), + topics: kafka.topics.map((topic) => topicSummary(topic)), + clients: kafka.clients.map((client) => clientSummary(client)), + }, + policy, + next: { + dryRun: `bun scripts/cli.ts platform-infra kafka apply --target ${target.id} --dry-run`, + apply: `bun scripts/cli.ts platform-infra kafka apply --target ${target.id} --confirm`, + status: `bun scripts/cli.ts platform-infra kafka status --target ${target.id}`, + validate: `bun scripts/cli.ts platform-infra kafka validate --target ${target.id}`, + }, + }; +} + +async function apply(config: UniDeskConfig, options: ApplyOptions): Promise> { + const kafka = readKafkaConfig(); + const target = resolveTarget(kafka, options.targetId); + const manifest = renderKafkaManifest(kafka, target); + const policy = policyChecks(kafka, target, manifest); + if (!policy.every((check) => check.ok)) return { ok: false, action: "platform-infra-kafka-apply", mode: "policy-blocked", mutation: false, policy }; + const operator = await fetchOperatorManifest(kafka, target); + if (operator.ok !== true) { + return { + ok: false, + action: "platform-infra-kafka-apply", + mode: "operator-manifest-fetch-failed", + mutation: false, + target: targetSummary(target), + config: compactConfigSummary(kafka, target), + policy, + operator, + }; + } + const result = await capture(config, target.route, ["sh"], applyScript(kafka, target, manifest, operator, options)); + const parsed = parseJsonOutput(result.stdout); + return { + ok: result.exitCode === 0 && parsed?.ok === true, + action: "platform-infra-kafka-apply", + mode: options.dryRun ? "dry-run" : "confirmed", + mutation: !options.dryRun, + target: targetSummary(target), + config: compactConfigSummary(kafka, target), + policy, + remote: parsed ?? compactCapture(result, { full: true }), + next: { + status: `bun scripts/cli.ts platform-infra kafka status --target ${target.id}`, + validate: `bun scripts/cli.ts platform-infra kafka validate --target ${target.id}`, + }, + }; +} + +async function status(config: UniDeskConfig, options: CommonOptions): Promise> { + const kafka = readKafkaConfig(); + const target = resolveTarget(kafka, options.targetId); + const result = await capture(config, target.route, ["sh"], statusScript(kafka, target, options.full)); + const parsed = parseJsonOutput(result.stdout); + const summary = parsed === null ? null : statusSummary(parsed); + return { + ok: result.exitCode === 0 && summary?.ready === true, + action: "platform-infra-kafka-status", + mutation: false, + target: targetSummary(target), + config: configSummary(kafka, target), + summary, + remote: options.raw ? parsed : options.full ? parsed : summary ?? compactCapture(result, { full: true }), + next: { + plan: `bun scripts/cli.ts platform-infra kafka plan --target ${target.id}`, + apply: `bun scripts/cli.ts platform-infra kafka apply --target ${target.id} --confirm`, + validate: `bun scripts/cli.ts platform-infra kafka validate --target ${target.id}`, + }, + }; +} + +async function validate(config: UniDeskConfig, options: CommonOptions): Promise> { + const kafka = readKafkaConfig(); + const target = resolveTarget(kafka, options.targetId); + const result = await capture(config, target.route, ["sh"], validateScript(kafka, target, options.full)); + const parsed = parseJsonOutput(result.stdout); + return { + ok: result.exitCode === 0 && parsed?.ok === true, + action: "platform-infra-kafka-validate", + mutation: true, + target: targetSummary(target), + config: compactConfigSummary(kafka, target), + validation: parsed ?? null, + remote: options.raw ? parsed : compactCapture(result, { full: options.full || result.exitCode !== 0 }), + }; +} + +function renderKafkaManifest(kafka: PlatformKafkaConfig, target: KafkaTarget): string { + const listeners = [ + kafka.cluster.listeners.plain.enabled + ? ` - name: plain + port: ${kafka.cluster.listeners.plain.port} + type: internal + tls: false` + : "", + kafka.cluster.listeners.tls.enabled + ? ` - name: tls + port: ${kafka.cluster.listeners.tls.port} + type: internal + tls: true` + : "", + ].filter(Boolean).join("\n"); + const topicDocs = kafka.topics.map((topic) => kafkaTopicManifest(kafka, target, topic)).join("---\n"); + const userDocs = kafka.clients.map((client) => kafkaUserManifest(kafka, target, client)).join("---\n"); + return `apiVersion: v1 +kind: Namespace +metadata: + name: ${target.namespace} + labels: + app.kubernetes.io/name: platform-infra + app.kubernetes.io/managed-by: unidesk + unidesk.ai/runtime-node: ${target.id} +--- +apiVersion: networking.k8s.io/v1 +kind: NetworkPolicy +metadata: + name: allow-all + namespace: ${target.namespace} + labels: + app.kubernetes.io/name: platform-infra + app.kubernetes.io/part-of: platform-infra + app.kubernetes.io/managed-by: unidesk +spec: + podSelector: {} + policyTypes: + - Ingress + - Egress + ingress: + - {} + egress: + - {} +--- +apiVersion: kafka.strimzi.io/v1 +kind: KafkaNodePool +metadata: + name: ${kafka.cluster.nodePoolName} + namespace: ${target.namespace} + labels: + strimzi.io/cluster: ${kafka.cluster.name} + app.kubernetes.io/name: ${kafka.cluster.name} + app.kubernetes.io/component: kafka-node-pool + app.kubernetes.io/part-of: platform-infra + app.kubernetes.io/managed-by: unidesk +spec: + replicas: ${kafka.cluster.replicas} + roles: + - controller + - broker + storage: + type: jbod + volumes: + - id: 0 + type: persistent-claim + size: ${kafka.cluster.storage.size} + class: ${target.storageClassName} + deleteClaim: ${kafka.cluster.storage.deleteClaim} + kraftMetadata: shared +--- +apiVersion: kafka.strimzi.io/v1 +kind: Kafka +metadata: + name: ${kafka.cluster.name} + namespace: ${target.namespace} + labels: + app.kubernetes.io/name: ${kafka.cluster.name} + app.kubernetes.io/component: kafka + app.kubernetes.io/part-of: platform-infra + app.kubernetes.io/managed-by: unidesk + unidesk.ai/runtime-node: ${target.id} + annotations: + unidesk.ai/spec: ${kafka.metadata.spec} + unidesk.ai/app-integration-enabled: "${kafka.defaults.switch.appIntegrationEnabled}" +spec: + kafka: + version: ${kafka.cluster.kafkaVersion} + metadataVersion: ${kafka.cluster.metadataVersion} + listeners: +${listeners} + config: + offsets.topic.replication.factor: 1 + transaction.state.log.replication.factor: 1 + transaction.state.log.min.isr: 1 + default.replication.factor: 1 + min.insync.replicas: 1 + auto.create.topics.enable: false + entityOperator: + topicOperator: {} + userOperator: {} +--- +${topicDocs}${userDocs}`; +} + +function kafkaTopicManifest(kafka: PlatformKafkaConfig, target: KafkaTarget, topic: KafkaTopicSpec): string { + return `apiVersion: kafka.strimzi.io/v1 +kind: KafkaTopic +metadata: + name: ${topicResourceName(topic.name)} + namespace: ${target.namespace} + labels: + strimzi.io/cluster: ${kafka.cluster.name} + app.kubernetes.io/name: ${topicResourceName(topic.name)} + app.kubernetes.io/component: kafka-topic + app.kubernetes.io/part-of: platform-infra + app.kubernetes.io/managed-by: unidesk + annotations: + unidesk.ai/topic-name: ${topic.name} + unidesk.ai/description: ${quoteAnnotation(topic.description)} +spec: + topicName: ${topic.name} + partitions: ${topic.partitions} + replicas: ${topic.replicas} + config: + retention.ms: ${topic.retentionMs} + cleanup.policy: ${topic.cleanupPolicy} +`; +} + +function kafkaUserManifest(kafka: PlatformKafkaConfig, target: KafkaTarget, client: KafkaClientSpec): string { + const authorization = kafka.cluster.authorization.enabled + ? ` authorization: + type: simple + acls: +${clientAcls(client)}` + : ""; + return `apiVersion: kafka.strimzi.io/v1 +kind: KafkaUser +metadata: + name: ${client.kafkaUserName} + namespace: ${target.namespace} + labels: + strimzi.io/cluster: ${kafka.cluster.name} + app.kubernetes.io/name: ${client.kafkaUserName} + app.kubernetes.io/component: kafka-client + app.kubernetes.io/part-of: platform-infra + app.kubernetes.io/managed-by: unidesk + annotations: + unidesk.ai/client-id: ${client.id} + unidesk.ai/client-namespace: ${client.namespace} + unidesk.ai/client-service-account: ${client.serviceAccountName} + unidesk.ai/secret-target: ${client.namespace}/${client.secretName} +spec: + authentication: + type: tls +${authorization}`; +} + +function clientAcls(client: KafkaClientSpec): string { + const lines: string[] = []; + for (const topic of client.produceTopics) { + lines.push(` - resource: + type: topic + name: ${topic} + patternType: literal + operations: + - Write + - Describe`); + } + for (const topic of client.consumeTopics) { + lines.push(` - resource: + type: topic + name: ${topic} + patternType: literal + operations: + - Read + - Describe`); + } + for (const topic of client.dlqTopics) { + lines.push(` - resource: + type: topic + name: ${topic} + patternType: literal + operations: + - Write + - Describe`); + } + lines.push(` - resource: + type: group + name: ${client.id} + patternType: prefix + operations: + - Read`); + return lines.join("\n"); +} + +async function fetchOperatorManifest(kafka: PlatformKafkaConfig, target: KafkaTarget): Promise & { ok: boolean; manifest?: string }> { + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), 30_000); + try { + const response = await fetch(kafka.operator.manifestUrl, { signal: controller.signal }); + if (!response.ok) { + return { + ok: false, + url: kafka.operator.manifestUrl, + status: response.status, + statusText: response.statusText, + }; + } + const source = await response.text(); + const manifest = source.replaceAll("namespace: myproject", `namespace: ${target.namespace}`); + if (!manifest.includes("kind: CustomResourceDefinition") || !manifest.includes(`namespace: ${target.namespace}`)) { + return { + ok: false, + url: kafka.operator.manifestUrl, + error: "unexpected-strimzi-operator-manifest-shape", + bytes: Buffer.byteLength(source, "utf8"), + }; + } + return { + ok: true, + url: kafka.operator.manifestUrl, + bytes: Buffer.byteLength(source, "utf8"), + sha256: sha256Hex(source), + namespaceRewrite: target.namespace, + manifest, + }; + } catch (error) { + return { + ok: false, + url: kafka.operator.manifestUrl, + error: error instanceof Error ? error.message : String(error), + }; + } finally { + clearTimeout(timer); + } +} + +function applyScript(kafka: PlatformKafkaConfig, target: KafkaTarget, manifest: string, operator: Record & { manifest?: string }, options: ApplyOptions): string { + const dryRun = options.dryRun; + const mode = dryRun ? "dry-run" : "confirmed"; + const dryServer = dryRun ? "--dry-run=server " : ""; + const dryClient = dryRun ? "--dry-run=client " : ""; + const stepTimeoutSeconds = Math.max(5, Math.min(kafka.validation.timeoutSeconds, 20)); + const checkTimeoutSeconds = Math.max(5, Math.min(kafka.validation.pollSeconds, 10)); + const operatorManifest = operator.manifest ?? ""; + return ` +set -u +tmp="$(mktemp -d)" +trap 'rm -rf "$tmp"' EXIT +operator="$tmp/strimzi-operator.yaml" +manifest="$tmp/kafka.k8s.yaml" +cat >"$operator" <<'UNIDESK_STRIMZI_OPERATOR_YAML' +${operatorManifest} +UNIDESK_STRIMZI_OPERATOR_YAML +cat >"$manifest" <<'UNIDESK_KAFKA_RUNTIME_YAML' +${manifest} +UNIDESK_KAFKA_RUNTIME_YAML + +kubectl create namespace ${target.namespace} ${dryClient}-o yaml >"$tmp/ns.yaml" 2>"$tmp/ns-create.err" +ns_create_rc=$? +if [ "$ns_create_rc" -eq 0 ]; then + timeout ${stepTimeoutSeconds} kubectl apply --server-side ${dryServer}--force-conflicts --field-manager=${fieldManager} -f "$tmp/ns.yaml" >"$tmp/ns.out" 2>"$tmp/ns.err" + ns_rc=$? +else + ns_rc=1 + : >"$tmp/ns.out" + cp "$tmp/ns-create.err" "$tmp/ns.err" +fi + +if [ "$ns_rc" -eq 0 ]; then + timeout ${stepTimeoutSeconds} kubectl -n ${target.namespace} apply --server-side ${dryServer}--force-conflicts --validate=false --field-manager=${fieldManager} -f "$operator" >"$tmp/operator.out" 2>"$tmp/operator.err" + operator_rc=$? +else + operator_rc=1 + printf '%s\\n' 'namespace apply failed; operator skipped' >"$tmp/operator.err" + : >"$tmp/operator.out" +fi + +if [ "$operator_rc" -eq 0 ]; then + timeout ${checkTimeoutSeconds} kubectl get ${kafka.operator.crds.map((name) => `crd/${name}`).join(" ")} >"$tmp/crd-wait.out" 2>"$tmp/crd-wait.err" + crd_ready_rc=$? +else + crd_ready_rc=1 + printf '%s\\n' 'operator apply failed; CRD check skipped' >"$tmp/crd-wait.err" + : >"$tmp/crd-wait.out" +fi + +if [ "$operator_rc" -eq 0 ] && [ "$crd_ready_rc" -eq 0 ]; then + timeout ${stepTimeoutSeconds} kubectl -n ${target.namespace} apply --server-side ${dryServer}--force-conflicts --validate=false --field-manager=${fieldManager} -f "$manifest" >"$tmp/kafka.out" 2>"$tmp/kafka.err" + kafka_rc=$? +elif [ "$operator_rc" -eq 0 ] && [ "${dryRun ? "1" : "0"}" = "1" ]; then + kafka_rc=0 + printf '%s\\n' 'dry-run: Kafka CR server validation skipped because Strimzi CRDs are not installed yet; operator dry-run succeeded.' >"$tmp/kafka.out" + cp "$tmp/crd-wait.err" "$tmp/kafka.err" +else + kafka_rc=1 + printf '%s\\n' 'Kafka CR apply skipped because Strimzi CRDs are not ready.' >"$tmp/kafka.err" + : >"$tmp/kafka.out" +fi +python3 - "$tmp" "$ns_rc" "$operator_rc" "$crd_ready_rc" "$kafka_rc" <<'PY' +import json, os, sys +tmp = sys.argv[1] +ns_rc, operator_rc, crd_ready_rc, kafka_rc = [int(value) for value in sys.argv[2:6]] +def text(name, limit=1800): + try: + return open(os.path.join(tmp, name), encoding="utf-8", errors="replace").read()[-limit:] + except FileNotFoundError: + return "" +kafka_skipped_for_missing_crds = operator_rc == 0 and crd_ready_rc != 0 and kafka_rc == 0 +payload = { + "ok": ${dryRun ? "ns_rc == 0 and operator_rc == 0 and kafka_rc == 0" : "ns_rc == 0 and operator_rc == 0 and crd_ready_rc == 0 and kafka_rc == 0"}, + "target": "${target.id}", + "route": "${target.route}", + "namespace": "${target.namespace}", + "mode": "${mode}", + "mutation": ${dryRun ? "False" : "True"}, + "operator": { + "implementation": "${kafka.operator.implementation}", + "version": "${kafka.operator.version}", + "manifestUrl": "${kafka.operator.manifestUrl}", + "manifestSha256": "${stringValue(operator.sha256)}", + "manifestBytes": ${Number(operator.bytes ?? 0)}, + "namespaceRewrite": "${target.namespace}", + }, + "cluster": { + "name": "${kafka.cluster.name}", + "bootstrap": "${bootstrapService(kafka, target)}", + "storage": "${kafka.cluster.storage.size}", + "replicas": ${kafka.cluster.replicas}, + }, + "steps": { + "namespace": {"exitCode": ns_rc, "stdoutTail": text("ns.out"), "stderrTail": text("ns.err")}, + "operatorApply": {"exitCode": operator_rc, "stdoutTail": text("operator.out"), "stderrTail": text("operator.err")}, + "crdPresent": {"exitCode": crd_ready_rc, "stdoutTail": text("crd-wait.out"), "stderrTail": text("crd-wait.err")}, + "kafkaApply": {"exitCode": kafka_rc, "skippedForMissingCrds": kafka_skipped_for_missing_crds, "stdoutTail": text("kafka.out"), "stderrTail": text("kafka.err")}, + }, + "valuesPrinted": False, + "next": { + "status": "bun scripts/cli.ts platform-infra kafka status --target ${target.id}", + "validate": "bun scripts/cli.ts platform-infra kafka validate --target ${target.id}" + } +} +print(json.dumps(payload, ensure_ascii=False, indent=2)) +sys.exit(0 if payload["ok"] else 1) +PY +`; +} + +function statusScript(kafka: PlatformKafkaConfig, target: KafkaTarget, full: boolean): string { + return ` +set -u +tmp="$(mktemp -d)" +trap 'rm -rf "$tmp"' EXIT +capture_json() { + name="$1" + shift + "$@" -o json >"$tmp/$name.json" 2>"$tmp/$name.err" + rc=$? + printf '%s' "$rc" >"$tmp/$name.rc" +} +capture_json ns kubectl get namespace ${target.namespace} +capture_json crds kubectl get ${kafka.operator.crds.map((name) => `crd/${name}`).join(" ")} +capture_json operator kubectl -n ${target.namespace} get deployment ${kafka.operator.deploymentName} +capture_json kafka kubectl -n ${target.namespace} get kafka.kafka.strimzi.io ${kafka.cluster.name} +capture_json nodepool kubectl -n ${target.namespace} get kafkanodepool.kafka.strimzi.io ${kafka.cluster.nodePoolName} +capture_json topics kubectl -n ${target.namespace} get kafkatopic.kafka.strimzi.io -l strimzi.io/cluster=${kafka.cluster.name} +capture_json users kubectl -n ${target.namespace} get kafkauser.kafka.strimzi.io -l strimzi.io/cluster=${kafka.cluster.name} +capture_json pods kubectl -n ${target.namespace} get pods -l strimzi.io/cluster=${kafka.cluster.name} +capture_json services kubectl -n ${target.namespace} get service -l strimzi.io/cluster=${kafka.cluster.name} +capture_json secrets kubectl -n ${target.namespace} get secret ${kafka.clients.map((client) => client.secretName).join(" ")} +capture_json events kubectl -n ${target.namespace} get events --sort-by=.lastTimestamp +python3 - "$tmp" <<'PY' +import json, os, sys +tmp = sys.argv[1] +expected_topics = ${JSON.stringify(kafka.topics.map((topic) => topic.name))} +expected_clients = ${JSON.stringify(kafka.clients.map((client) => client.kafkaUserName))} +def rc(name): + try: + return int(open(os.path.join(tmp, f"{name}.rc"), encoding="utf-8").read() or "1") + except FileNotFoundError: + return 1 +def load(name): + try: + return json.load(open(os.path.join(tmp, f"{name}.json"), encoding="utf-8")) + except Exception: + return None +def items(name): + data = load(name) + if isinstance(data, dict) and isinstance(data.get("items"), list): + return data["items"] + if isinstance(data, dict) and data.get("kind") != "Status": + return [data] + return [] +def meta_name(item): + return ((item or {}).get("metadata") or {}).get("name") +def conditions(item): + return (((item or {}).get("status") or {}).get("conditions") or []) +def condition_ready(item): + return any(c.get("type") == "Ready" and c.get("status") == "True" for c in conditions(item)) +def deployment_ready(item): + spec = (item or {}).get("spec") or {} + status = (item or {}).get("status") or {} + desired = spec.get("replicas", 1) + available = status.get("availableReplicas", 0) + return {"name": meta_name(item), "desired": desired, "available": available, "ready": available >= desired} +def pod_summary(item): + status = (item or {}).get("status") or {} + return {"name": meta_name(item), "phase": status.get("phase"), "ready": any(c.get("type") == "Ready" and c.get("status") == "True" for c in status.get("conditions", []))} +def service_summary(item): + spec = (item or {}).get("spec") or {} + return {"name": meta_name(item), "type": spec.get("type"), "clusterIP": spec.get("clusterIP"), "ports": [{"name": p.get("name"), "port": p.get("port")} for p in spec.get("ports", [])]} +def topic_summary(item): + spec = (item or {}).get("spec") or {} + return {"resource": meta_name(item), "topicName": spec.get("topicName"), "ready": condition_ready(item)} +def user_summary(item): + status = (item or {}).get("status") or {} + return {"name": meta_name(item), "ready": condition_ready(item), "secret": status.get("secret")} +operator = deployment_ready(load("operator") or {}) +kafka_obj = load("kafka") or {} +topics = [topic_summary(item) for item in items("topics")] +users = [user_summary(item) for item in items("users")] +topic_ready = all(any(item["topicName"] == name and item["ready"] for item in topics) for name in expected_topics) +user_ready = all(any(item["name"] == name and item["ready"] for item in users) for name in expected_clients) +secrets = items("secrets") +payload = { + "ready": rc("ns") == 0 and rc("crds") == 0 and operator["ready"] and condition_ready(kafka_obj) and topic_ready and user_ready, + "target": "${target.id}", + "route": "${target.route}", + "namespace": "${target.namespace}", + "operator": operator, + "cluster": { + "name": "${kafka.cluster.name}", + "ready": condition_ready(kafka_obj), + "conditions": conditions(kafka_obj), + "bootstrap": "${bootstrapService(kafka, target)}", + "plain": "${kafka.cluster.name}-kafka-bootstrap.${target.namespace}.svc.cluster.local:${kafka.cluster.listeners.plain.port}", + "tls": "${kafka.cluster.name}-kafka-bootstrap.${target.namespace}.svc.cluster.local:${kafka.cluster.listeners.tls.port}", + }, + "crdsReady": rc("crds") == 0, + "nodePoolReady": condition_ready(load("nodepool") or {}), + "topicReady": topic_ready, + "userReady": user_ready, + "topics": topics, + "users": users, + "pods": [pod_summary(item) for item in items("pods")], + "services": [service_summary(item) for item in items("services")], + "clientSecrets": [{"name": meta_name(item), "keys": sorted(((item or {}).get("data") or {}).keys()), "valuesPrinted": False} for item in secrets], + "valuesPrinted": False, +} +if ${full ? "False" : "True"}: + payload["cluster"]["conditions"] = [{"type": c.get("type"), "status": c.get("status"), "reason": c.get("reason")} for c in payload["cluster"]["conditions"]] +print(json.dumps(payload, ensure_ascii=False, indent=2)) +sys.exit(0 if payload["ready"] else 1) +PY +`; +} + +function validateScript(kafka: PlatformKafkaConfig, target: KafkaTarget, full: boolean): string { + const topic = kafka.validation.smokeTopic; + return ` +set -u +tmp="$(mktemp -d)" +trap 'rm -rf "$tmp"' EXIT +pod="$(kubectl -n ${target.namespace} get pod -l strimzi.io/cluster=${kafka.cluster.name} -o jsonpath='{.items[0].metadata.name}' 2>"$tmp/pod.err" || true)" +payload="unidesk-kafka-smoke-${target.id}-$(date +%s)-$$" +printf '%s' "$payload" >"$tmp/payload.txt" +if [ -n "$pod" ]; then + printf '%s\\n' "$payload" | kubectl -n ${target.namespace} exec -i "$pod" -- bin/kafka-console-producer.sh --bootstrap-server ${kafka.cluster.name}-kafka-bootstrap:${kafka.cluster.listeners.plain.port} --topic ${topic} --producer-property acks=all >"$tmp/produce.out" 2>"$tmp/produce.err" + produce_rc=$? + if [ "$produce_rc" -eq 0 ]; then + timeout ${kafka.validation.timeoutSeconds} kubectl -n ${target.namespace} exec "$pod" -- bin/kafka-console-consumer.sh --bootstrap-server ${kafka.cluster.name}-kafka-bootstrap:${kafka.cluster.listeners.plain.port} --topic ${topic} --from-beginning --timeout-ms ${kafka.validation.timeoutSeconds * 1000} >"$tmp/consume.raw" 2>"$tmp/consume.err" || true + grep -F "$payload" "$tmp/consume.raw" >"$tmp/consume.out" 2>>"$tmp/consume.err" + consume_rc=$? + else + consume_rc=1 + printf '%s\\n' 'producer failed; consumer skipped' >"$tmp/consume.err" + fi +else + produce_rc=1 + consume_rc=1 + printf '%s\\n' 'kafka pod not found' >"$tmp/produce.err" + printf '%s\\n' 'kafka pod not found' >"$tmp/consume.err" +fi +python3 - "$tmp" "$pod" "$payload" "$produce_rc" "$consume_rc" <<'PY' +import hashlib, json, os, sys +tmp, pod, payload = sys.argv[1], sys.argv[2], sys.argv[3] +produce_rc, consume_rc = int(sys.argv[4]), int(sys.argv[5]) +def text(name, limit=1200): + try: + return open(os.path.join(tmp, name), encoding="utf-8", errors="replace").read()[-limit:] + except FileNotFoundError: + return "" +payload_obj = { + "ok": produce_rc == 0 and consume_rc == 0, + "target": "${target.id}", + "namespace": "${target.namespace}", + "cluster": "${kafka.cluster.name}", + "topic": "${topic}", + "pod": pod or None, + "message": { + "sha256": hashlib.sha256(payload.encode()).hexdigest(), + "valuesPrinted": False + }, + "steps": { + "produce": {"exitCode": produce_rc, "stdoutTail": text("produce.out"), "stderrTail": text("produce.err")}, + "consume": {"exitCode": consume_rc, "stdoutTail": text("consume.out"), "stderrTail": text("consume.err")}, + }, + "valuesPrinted": False, +} +print(json.dumps(payload_obj, ensure_ascii=False, indent=2)) +sys.exit(0 if payload_obj["ok"] else 1) +PY +`; +} + +function configSummary(kafka: PlatformKafkaConfig, target: KafkaTarget): Record { + return { + configPath: configLabel, + spec: kafka.metadata.spec, + target: targetSummary(target), + switch: kafka.defaults.switch, + operator: operatorSummary(kafka, target), + cluster: clusterSummary(kafka, target), + topics: kafka.topics.map((topic) => topicSummary(topic)), + clients: kafka.clients.map((client) => clientSummary(client)), + }; +} + +function compactConfigSummary(kafka: PlatformKafkaConfig, target: KafkaTarget): Record { + return { + configPath: configLabel, + spec: kafka.metadata.spec, + target: targetSummary(target), + switch: kafka.defaults.switch, + operator: { + implementation: kafka.operator.implementation, + version: kafka.operator.version, + deploymentName: kafka.operator.deploymentName, + }, + cluster: { + name: kafka.cluster.name, + bootstrap: bootstrapService(kafka, target), + replicas: kafka.cluster.replicas, + storageSize: kafka.cluster.storage.size, + authorizationEnabled: kafka.cluster.authorization.enabled, + }, + topicCount: kafka.topics.length, + clientCount: kafka.clients.length, + valuesPrinted: false, + }; +} + +function targetSummary(target: KafkaTarget): Record { + return { + id: target.id, + route: target.route, + namespace: target.namespace, + role: target.role, + createNamespace: target.createNamespace, + storageClassName: target.storageClassName, + }; +} + +function operatorSummary(kafka: PlatformKafkaConfig, target: KafkaTarget): Record { + return { + implementation: kafka.operator.implementation, + version: kafka.operator.version, + manifestUrl: kafka.operator.manifestUrl, + deploymentName: kafka.operator.deploymentName, + namespace: target.namespace, + crds: kafka.operator.crds, + }; +} + +function clusterSummary(kafka: PlatformKafkaConfig, target: KafkaTarget): Record { + return { + name: kafka.cluster.name, + nodePoolName: kafka.cluster.nodePoolName, + kafkaVersion: kafka.cluster.kafkaVersion, + metadataVersion: kafka.cluster.metadataVersion, + replicas: kafka.cluster.replicas, + storage: { ...kafka.cluster.storage, storageClassName: target.storageClassName }, + bootstrap: bootstrapService(kafka, target), + authorization: kafka.cluster.authorization, + }; +} + +function topicSummary(topic: KafkaTopicSpec): Record { + return { + name: topic.name, + resourceName: topicResourceName(topic.name), + partitions: topic.partitions, + replicas: topic.replicas, + retentionMs: topic.retentionMs, + cleanupPolicy: topic.cleanupPolicy, + }; +} + +function clientSummary(client: KafkaClientSpec): Record { + return { + id: client.id, + namespace: client.namespace, + serviceAccountName: client.serviceAccountName, + kafkaUserName: client.kafkaUserName, + secretName: client.secretName, + produceTopics: client.produceTopics, + consumeTopics: client.consumeTopics, + dlqTopics: client.dlqTopics, + valuesPrinted: false, + }; +} + +function policyChecks(kafka: PlatformKafkaConfig, target: KafkaTarget, manifest: string): Array> { + return [ + { name: "yaml-source-of-truth", ok: true, detail: "Kafka target, namespace, Strimzi version, topics, clients, storage and switch are read from config/platform-infra/kafka.yaml." }, + { name: "namespace-is-platform-infra", ok: target.namespace === "platform-infra", detail: "Kafka runtime belongs to platform-infra, not an HWLAB or AgentRun lane namespace." }, + { name: "no-public-exposure", ok: !/^\s*type:\s*(NodePort|LoadBalancer)\s*$/mu.test(manifest) && !/^\s*kind:\s*Ingress\s*$/mu.test(manifest), detail: "Kafka POC is ClusterIP/internal only." }, + { name: "single-broker-poc", ok: kafka.cluster.replicas === 1, detail: "D518 POC uses one KRaft broker/controller; production HA remains out of scope." }, + { name: "allow-all-network-policy", ok: manifest.includes("kind: NetworkPolicy") && manifest.includes("name: allow-all") && manifest.includes(`namespace: ${target.namespace}`), detail: `NetworkPolicy/allow-all is rendered in ${target.namespace}.` }, + { name: "app-integration-off-for-p1", ok: kafka.defaults.switch.appIntegrationEnabled === false, detail: "P1 deploys runtime and smoke only; app producer/consumer switchover is a later stage." }, + ]; +} + +function statusSummary(payload: Record): Record { + return { + ready: payload.ready === true, + target: payload.target, + route: payload.route, + namespace: payload.namespace, + operator: payload.operator, + cluster: payload.cluster, + crdsReady: payload.crdsReady === true, + nodePoolReady: payload.nodePoolReady === true, + topics: Array.isArray(payload.topics) ? payload.topics : [], + users: Array.isArray(payload.users) ? payload.users : [], + pods: Array.isArray(payload.pods) ? payload.pods : [], + services: Array.isArray(payload.services) ? payload.services : [], + clientSecrets: Array.isArray(payload.clientSecrets) ? payload.clientSecrets : [], + valuesPrinted: false, + }; +} + +function manifestObjectSummary(yaml: string): Array> { + const objects: Array> = []; + for (const doc of yaml.split(/^---$/mu)) { + const kind = doc.match(/^\s*kind:\s*([A-Za-z0-9._-]+)\s*$/mu)?.[1]; + const name = doc.match(/^\s*name:\s*([A-Za-z0-9._-]+)\s*$/mu)?.[1]; + const namespace = doc.match(/^\s*namespace:\s*([A-Za-z0-9._-]+)\s*$/mu)?.[1] ?? null; + if (kind !== undefined && name !== undefined) objects.push({ kind, name, namespace }); + } + return objects; +} + +function renderPlan(result: Record): RenderedCliResult { + const config = record(result.config); + const target = record(config.target); + const cluster = record(config.cluster); + const operator = record(config.operator); + const switchRecord = record(config.switch); + const policy = arrayRecords(result.policy); + const failed = policy.filter((item) => item.ok === false); + const next = record(result.next); + return rendered(result, "platform-infra kafka plan", [ + "PLATFORM-INFRA KAFKA PLAN", + ...table(["FIELD", "VALUE", "DETAIL", "VALUE"], [ + ["TARGET", stringValue(target.id), "route", stringValue(target.route)], + ["NAMESPACE", stringValue(target.namespace), "role", stringValue(target.role)], + ["OPERATOR", `strimzi ${stringValue(operator.version)}`, "manifest", stringValue(operator.manifestUrl)], + ["CLUSTER", stringValue(cluster.name), "bootstrap", stringValue(cluster.bootstrap)], + ["STORAGE", `${stringValue(record(cluster.storage).size)} ${stringValue(record(cluster.storage).storageClassName)}`, "replicas", stringValue(cluster.replicas)], + ["TOPICS", String(arrayRecords(config.topics).length), "clients", String(arrayRecords(config.clients).length)], + ["SWITCH", stringValue(switchRecord.mode), "appIntegration", boolText(switchRecord.appIntegrationEnabled)], + ["POLICY", failed.length === 0 ? "ok" : `failed=${failed.length}`, "valuesPrinted", "false"], + ]), + "", + "NEXT", + ` dry-run: ${stringValue(next.dryRun)}`, + ` apply: ${stringValue(next.apply)}`, + ` status: ${stringValue(next.status)}`, + ` validate: ${stringValue(next.validate)}`, + "", + "Boundary: Kafka runtime is platform-infra only; HWLAB/AgentRun app integration remains disabled in P1.", + "Disclosure: Secret values are not printed; client entries show only object/key metadata.", + ]); +} + +function renderStatus(result: Record): RenderedCliResult { + const summary = record(result.summary); + const cluster = record(summary.cluster); + const operator = record(summary.operator); + const topics = arrayRecords(summary.topics).map((topic) => [stringValue(topic.topicName), boolText(topic.ready), stringValue(topic.resource)]); + const users = arrayRecords(summary.users).map((user) => [stringValue(user.name), boolText(user.ready), stringValue(user.secret)]); + const pods = arrayRecords(summary.pods).map((pod) => [stringValue(pod.name), stringValue(pod.phase), boolText(pod.ready)]); + return rendered(result, "platform-infra kafka status", [ + "PLATFORM-INFRA KAFKA STATUS", + ...table(["TARGET", "ROUTE", "NAMESPACE", "READY"], [[stringValue(summary.target), stringValue(summary.route), stringValue(summary.namespace), boolText(summary.ready)]]), + "", + "CONTROL", + ...table(["CHECK", "VALUE", "DETAIL"], [ + ["crds", boolText(summary.crdsReady), "Strimzi Kafka CRDs"], + ["operator", boolText(operator.ready), `${stringValue(operator.name)} ${stringValue(operator.available)}/${stringValue(operator.desired)}`], + ["cluster", boolText(record(cluster).ready), stringValue(record(cluster).bootstrap)], + ["nodePool", boolText(summary.nodePoolReady), "dual-role broker/controller"], + ]), + "", + "TOPICS", + ...(topics.length === 0 ? ["-"] : table(["TOPIC", "READY", "RESOURCE"], topics)), + "", + "CLIENTS", + ...(users.length === 0 ? ["-"] : table(["USER", "READY", "SECRET"], users)), + "", + "PODS", + ...(pods.length === 0 ? ["-"] : table(["POD", "PHASE", "READY"], pods)), + "", + `NEXT bun scripts/cli.ts platform-infra kafka validate --target ${stringValue(summary.target)}`, + ]); +} + +function rendered(result: Record, command: string, lines: string[]): RenderedCliResult { + return { ok: result.ok !== false, command, renderedText: lines.join("\n"), contentType: "text/plain" }; +} + +function parseApplyOptions(args: string[]): ApplyOptions { + const commonArgs: string[] = []; + let confirm = false; + let dryRun = false; + let wait = false; + for (let index = 0; index < args.length; index += 1) { + const arg = args[index]; + if (arg === "--confirm") confirm = true; + else if (arg === "--dry-run") dryRun = true; + else if (arg === "--wait") wait = true; + else { + commonArgs.push(arg); + if (arg === "--target") { + commonArgs.push(args[index + 1] ?? ""); + index += 1; + } + } + } + if (confirm && dryRun) throw new Error("kafka apply accepts only one of --confirm or --dry-run"); + return { ...parseCommonOptions(commonArgs), confirm, dryRun: dryRun || !confirm, wait }; +} + +function parseCommonOptions(args: string[]): CommonOptions { + let targetId: string | null = null; + let full = false; + let raw = false; + for (let index = 0; index < args.length; index += 1) { + const arg = args[index]; + if (arg === "--target") { + const value = args[index + 1]; + if (value === undefined || value.startsWith("--")) throw new Error("--target requires a value"); + if (!/^[A-Za-z0-9._-]+$/u.test(value)) throw new Error("--target must be a simple target id"); + targetId = value; + index += 1; + } else if (arg === "--full") { + full = true; + } else if (arg === "--raw") { + raw = true; + full = true; + } else { + throw new Error(`unsupported kafka option: ${arg}`); + } + } + return { targetId, full, raw }; +} + +function bootstrapService(kafka: PlatformKafkaConfig, target: KafkaTarget): string { + return `${kafka.cluster.name}-kafka-bootstrap.${target.namespace}.svc.cluster.local:${kafka.cluster.listeners.plain.port}`; +} + +function topicNameField(record: Record, key: string, path: string): string { + const value = y.stringField(record, key, path); + if (!/^[A-Za-z0-9._-]+$/u.test(value) || value.includes("..")) throw new Error(`${configLabel}.${path}.${key} has an unsupported Kafka topic name`); + return value; +} + +function topicArray(record: Record, key: string, path: string): string[] { + return y.stringArrayField(record, key, path).map((value) => { + if (!/^[A-Za-z0-9._-]+$/u.test(value) || value.includes("..")) throw new Error(`${configLabel}.${path}.${key} contains unsupported topic name ${value}`); + return value; + }); +} + +function topicResourceName(topic: string): string { + return topic.toLowerCase().replace(/[^a-z0-9-]+/gu, "-").replace(/^-+|-+$/gu, "").slice(0, 253); +} + +function positiveInteger(record: Record, key: string, path: string): number { + const value = y.integerField(record, key, path); + if (value < 1) throw new Error(`${configLabel}.${path}.${key} must be >= 1`); + return value; +} + +function quoteAnnotation(value: string): string { + return JSON.stringify(value); +} + +function record(value: unknown): Record { + return typeof value === "object" && value !== null && !Array.isArray(value) ? value as Record : {}; +} + +function arrayRecords(value: unknown): Record[] { + return Array.isArray(value) ? value.map(record) : []; +} + +function stringValue(value: unknown, fallback = "-"): string { + if (value === undefined || value === null || value === "") return fallback; + return String(value); +} + +function boolText(value: unknown): string { + return value === true ? "yes" : value === false ? "no" : "-"; +} + +function table(headers: string[], rows: string[][]): string[] { + const widths = headers.map((header, index) => Math.max(header.length, ...rows.map((row) => row[index]?.length ?? 0))); + const renderRow = (row: string[]) => row.map((cell, index) => cell.padEnd(widths[index] ?? cell.length)).join(" ").trimEnd(); + return [renderRow(headers), ...rows.map(renderRow)]; +} diff --git a/scripts/src/platform-infra/entry.ts b/scripts/src/platform-infra/entry.ts index 546142ca..d692930a 100644 --- a/scripts/src/platform-infra/entry.ts +++ b/scripts/src/platform-infra/entry.ts @@ -314,7 +314,7 @@ export interface ManagedResourceCleanupPlan { export function platformInfraHelp(): unknown { const target = sub2ApiHelpTargetSummary(); return { - command: "platform-infra sub2api|langbot|n8n|wechat-archive|observability|secret-plane ...", + command: "platform-infra sub2api|langbot|n8n|wechat-archive|observability|secret-plane|kafka ...", output: "json", usage: [ "bun scripts/cli.ts platform-infra sub2api plan [--target G14|D601]", @@ -365,8 +365,13 @@ export function platformInfraHelp(): unknown { "bun scripts/cli.ts platform-infra secret-plane apply --target D518 --confirm", "bun scripts/cli.ts platform-infra secret-plane status --target D518", "bun scripts/cli.ts platform-infra secret-plane validate --target D518", + "bun scripts/cli.ts platform-infra kafka plan --target D518", + "bun scripts/cli.ts platform-infra kafka apply --target D518 --dry-run", + "bun scripts/cli.ts platform-infra kafka apply --target D518 --confirm", + "bun scripts/cli.ts platform-infra kafka status --target D518", + "bun scripts/cli.ts platform-infra kafka validate --target D518", ], - description: "Operate YAML-controlled platform-infra services such as Sub2API, LangBot, n8n, WeChat archive workflows, OpenTelemetry tracing and the independent target-scoped secret plane. Public services use PK01 Caddy+FRP rather than Kubernetes Ingress, NodePort, or LoadBalancer.", + description: "Operate YAML-controlled platform-infra services such as Sub2API, LangBot, n8n, WeChat archive workflows, OpenTelemetry tracing, the independent target-scoped secret plane, and the D518 Kafka event bus. Public services use PK01 Caddy+FRP rather than Kubernetes Ingress, NodePort, or LoadBalancer.", target, codexPool: { usage: [ diff --git a/scripts/src/platform-infra/options.ts b/scripts/src/platform-infra/options.ts index cff11905..220d8178 100644 --- a/scripts/src/platform-infra/options.ts +++ b/scripts/src/platform-infra/options.ts @@ -60,6 +60,10 @@ export async function runPlatformInfraCommand(config: UniDeskConfig, args: strin const { runSecretPlaneCommand } = await import("../platform-infra-secret-plane"); return await runSecretPlaneCommand(config, args.slice(1)); } + if (target === "kafka") { + const { runPlatformInfraKafkaCommand } = await import("../platform-infra-kafka"); + return await runPlatformInfraKafkaCommand(config, args.slice(1)); + } if (target !== "sub2api") return unsupported(args); if (action === "plan" || action === undefined) { const planArgs = args.slice(2);