diff --git a/bun.lock b/bun.lock index a3e114a..e8c264c 100644 --- a/bun.lock +++ b/bun.lock @@ -6,6 +6,7 @@ "name": "agentrun", "dependencies": { "@openai/codex": "0.133.0", + "kafkajs": "^2.2.4", "pg": "^8.13.1", "yaml": "^2.8.0", }, @@ -92,6 +93,8 @@ "fsevents": ["fsevents@2.3.3", "", { "os": "darwin" }, "sha512-5xoDfX+fL7faATnagmWPpbFtwh/R77WmMMqqHGS65C3vvB0YHrgF+B1YmZ3441tMj5n63k0212XNoJwzlhffQw=="], + "kafkajs": ["kafkajs@2.2.4", "", {}, "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA=="], + "pg": ["pg@8.21.0", "", { "dependencies": { "pg-connection-string": "^2.13.0", "pg-pool": "^3.14.0", "pg-protocol": "^1.14.0", "pg-types": "2.2.0", "pgpass": "1.0.5" }, "optionalDependencies": { "pg-cloudflare": "^1.4.0" }, "peerDependencies": { "pg-native": ">=3.0.1" }, "optionalPeers": ["pg-native"] }, "sha512-AUP1EYJuHraQGsVoCQVIcM7TEJVGtDzxWtGFZd8rds9d+CCXlU5Js1rYgfLNvxy9iJrpHjGrRjoi/3BT9fRyiA=="], "pg-cloudflare": ["pg-cloudflare@1.4.0", "", {}, "sha512-Vo7z/6rrQYxpNRylp4Tlob2elzbh+N/MOQbxFVWCxS7oEx6jF53GTJFxK2WWpKuBRkmiin4Mt+xofFDjx09R0A=="], diff --git a/package.json b/package.json index f299528..8f13bc8 100644 --- a/package.json +++ b/package.json @@ -11,6 +11,7 @@ }, "dependencies": { "@openai/codex": "0.133.0", + "kafkajs": "^2.2.4", "pg": "^8.13.1", "yaml": "^2.8.0" }, diff --git a/src/mgr/kafka-shadow-producer.ts b/src/mgr/kafka-shadow-producer.ts new file mode 100644 index 0000000..6b1f4d3 --- /dev/null +++ b/src/mgr/kafka-shadow-producer.ts @@ -0,0 +1,265 @@ +import { Kafka, logLevel, type Producer } from "kafkajs"; +import type { AgentRunStore } from "./store.js"; +import type { CommandRecord, JsonRecord, JsonValue, RunEvent, RunRecord } from "../common/types.js"; +import { stableHash } from "../common/validation.js"; + +const TRUE_VALUES = new Set(["1", "true", "yes", "on"]); +const warnedCodes = new Set(); +let producerState: { key: string; producer: Producer; connecting: Promise | null } | null = null; + +interface KafkaShadowConfig { + brokers: string[]; + topic: string; + clientId: string; + key: string; +} + +interface ShadowEventInput { + env: NodeJS.ProcessEnv; + eventType: string; + run?: RunRecord | null; + command?: CommandRecord | null; + event?: RunEvent; + eventPayload?: JsonRecord; + store?: AgentRunStore; + runId?: string; +} + +type StoreFunction = (...args: never[]) => unknown; + +export function withKafkaShadowProducer(store: AgentRunStore, env: NodeJS.ProcessEnv = process.env): AgentRunStore { + if (!kafkaShadowConfig(env)) return store; + return new Proxy(store, { + get(target, prop, receiver) { + if (prop === "createRun") { + return async (input: Parameters[0]): Promise => { + const run = await target.createRun(input); + publishAgentRunShadow({ env, eventType: "agentrun.run.created", run }); + return run; + }; + } + if (prop === "createCommand") { + return async (runId: string, input: Parameters[1]): Promise => { + const command = await target.createCommand(runId, input); + publishAgentRunShadow({ env, eventType: "agentrun.command.created", store: target, runId, command }); + return command; + }; + } + if (prop === "appendEvent") { + return async (runId: string, type: RunEvent["type"], payload: JsonRecord): Promise => { + const event = await target.appendEvent(runId, type, payload); + publishAgentRunShadow({ env, eventType: "agentrun.event.appended", store: target, runId, event, eventPayload: payload }); + return event; + }; + } + if (prop === "finishCommand") { + return async (commandId: string, result: Parameters[1]): Promise => { + const command = await target.finishCommand(commandId, result); + publishAgentRunShadow({ env, eventType: "agentrun.command.terminal", store: target, runId: command.runId, command, eventPayload: terminalPayload(result, command) }); + return command; + }; + } + if (prop === "finishRun") { + return async (runId: string, result: Parameters[1]): Promise => { + const run = await target.finishRun(runId, result); + publishAgentRunShadow({ env, eventType: "agentrun.run.terminal", run, eventPayload: terminalPayload(result, null) }); + return run; + }; + } + const value = Reflect.get(target, prop, receiver) as unknown; + if (typeof value === "function") return (value as StoreFunction).bind(target); + return value; + }, + }) as AgentRunStore; +} + +function publishAgentRunShadow(input: ShadowEventInput): void { + const config = kafkaShadowConfig(input.env); + if (!config) return; + void Promise.resolve(resolveRun(input)) + .then((run) => { + const message = buildAgentRunShadowMessage({ ...input, run, config }); + return producerForConfig(config).then((producer) => producer.send({ topic: config.topic, messages: [message] })); + }) + .catch((error) => warnShadowProducer("agentrun-kafka-shadow-produce-failed", { + message: error instanceof Error ? error.message : String(error), + eventType: input.eventType, + topic: config.topic, + clientId: config.clientId, + })); +} + +async function resolveRun(input: ShadowEventInput): Promise { + if (input.run) return input.run; + if (!input.store || !input.runId) return null; + return await input.store.getRun(input.runId); +} + +function buildAgentRunShadowMessage(input: ShadowEventInput & { run: RunRecord | null; config: KafkaShadowConfig }): { key: string; value: string; headers: Record } { + const payload = input.eventPayload ?? eventPayload(input.event); + const event = { + schema: "agentrun.hwlab.event.shadow.v1", + eventType: input.eventType, + source: input.config.clientId, + producedAt: new Date().toISOString(), + mode: "shadow-produce-only", + run: runSummary(input.run), + command: commandSummary(input.command), + event: runEventSummary(input.event, payload), + diagnostics: { + payloadSha256: stableHash((payload ?? {}) as JsonValue), + payloadBytes: jsonBytes(payload ?? {}), + valuesPrinted: false, + shadowConsumeEnabled: false, + }, + valuesPrinted: false, + }; + return { + key: safeText(input.command?.id ?? input.eventPayload?.commandId ?? input.run?.id ?? input.runId) ?? input.config.clientId, + value: JSON.stringify(event), + headers: { + "x-shadow-mode": "produce-only", + "x-values-printed": "false", + "x-event-type": input.eventType, + }, + }; +} + +function kafkaShadowConfig(env: NodeJS.ProcessEnv): KafkaShadowConfig | null { + if (!truthy(env.AGENTRUN_KAFKA_SHADOW_PRODUCE_ENABLED)) return null; + if (truthy(env.AGENTRUN_KAFKA_SHADOW_CONSUME_ENABLED)) { + warnOnce("agentrun-kafka-shadow-consume-ignored", { + message: "AgentRun Kafka consumer cutover is disabled in this stage; producer continues in shadow mode.", + valuesPrinted: false, + }); + } + const brokers = csv(env.AGENTRUN_KAFKA_BOOTSTRAP_SERVERS); + const topic = safeText(env.AGENTRUN_KAFKA_EVENT_TOPIC); + const clientId = safeText(env.AGENTRUN_KAFKA_CLIENT_ID); + const missing: string[] = []; + if (brokers.length === 0) missing.push("AGENTRUN_KAFKA_BOOTSTRAP_SERVERS"); + if (!topic) missing.push("AGENTRUN_KAFKA_EVENT_TOPIC"); + if (!clientId) missing.push("AGENTRUN_KAFKA_CLIENT_ID"); + if (missing.length > 0 || !topic || !clientId) { + warnOnce("agentrun-kafka-shadow-config-missing", { missing, valuesPrinted: false }); + return null; + } + return { brokers, topic, clientId, key: `${clientId}|${topic}|${brokers.join(",")}` }; +} + +async function producerForConfig(config: KafkaShadowConfig): Promise { + if (producerState?.key === config.key && producerState.producer) return producerState.producer; + if (producerState?.key === config.key && producerState.connecting) return producerState.connecting; + const kafka = new Kafka({ clientId: config.clientId, brokers: config.brokers, logLevel: logLevel.NOTHING }); + const producer = kafka.producer({ allowAutoTopicCreation: false }); + const connecting = producer.connect().then(() => { + if (producerState?.producer === producer) producerState.connecting = null; + return producer; + }).catch((error) => { + if (producerState?.producer === producer) producerState = null; + throw error; + }); + const state: { key: string; producer: Producer; connecting: Promise | null } = { + key: config.key, + producer, + connecting, + }; + producerState = state; + return await connecting; +} + +function runSummary(run: RunRecord | null): JsonRecord | null { + if (!run) return null; + return { + runId: run.id, + status: run.status, + terminalStatus: run.terminalStatus, + failureKind: run.failureKind, + tenantId: run.tenantId, + projectId: run.projectId, + providerId: run.providerId, + backendProfile: run.backendProfile, + sessionId: run.sessionRef?.sessionId ?? null, + conversationId: run.sessionRef?.conversationId ?? null, + threadId: run.sessionRef?.threadId ?? null, + valuesPrinted: false, + }; +} + +function commandSummary(command: CommandRecord | null | undefined): JsonRecord | null { + if (!command) return null; + return { + commandId: command.id, + runId: command.runId, + seq: command.seq, + type: command.type, + state: command.state, + payloadHash: command.payloadHash, + cancelEpoch: command.cancelEpoch, + valuesPrinted: false, + }; +} + +function runEventSummary(event: RunEvent | undefined, payload: JsonRecord | null): JsonRecord | null { + if (!event && !payload) return null; + return { + eventId: event?.id ?? null, + runId: event?.runId ?? null, + seq: event?.seq ?? null, + type: event?.type ?? null, + commandId: safeText(event?.commandId ?? payload?.commandId), + phase: safeText(payload?.phase), + state: safeText(payload?.state), + terminalStatus: safeText(payload?.terminalStatus), + failureKind: safeText(payload?.failureKind), + threadId: safeText(payload?.threadId), + turnId: safeText(payload?.turnId), + valuesPrinted: false, + }; +} + +function terminalPayload(result: Parameters[1], command: CommandRecord | null): JsonRecord { + return { + phase: command ? "command-terminal" : "run-terminal", + commandId: command?.id ?? null, + state: command?.state ?? null, + terminalStatus: result.terminalStatus, + failureKind: result.failureKind ?? null, + threadId: result.threadId ?? null, + turnId: result.turnId ?? null, + valuesPrinted: false, + }; +} + +function eventPayload(event: RunEvent | undefined): JsonRecord | null { + if (!event || typeof event.payload !== "object" || event.payload === null || Array.isArray(event.payload)) return null; + return event.payload as JsonRecord; +} + +function truthy(value: string | undefined): boolean { + return TRUE_VALUES.has(String(value ?? "").trim().toLowerCase()); +} + +function csv(value: string | undefined): string[] { + return String(value ?? "").split(",").map((entry) => entry.trim()).filter(Boolean); +} + +function safeText(value: unknown, max = 240): string | null { + const text = typeof value === "string" ? value.trim() : value === undefined || value === null ? "" : String(value).trim(); + if (text.length === 0) return null; + return text.length > max ? `${text.slice(0, max)}...` : text; +} + +function jsonBytes(value: JsonRecord): number { + return Buffer.byteLength(JSON.stringify(value), "utf8"); +} + +function warnOnce(code: string, details: JsonRecord): void { + if (warnedCodes.has(code)) return; + warnedCodes.add(code); + warnShadowProducer(code, details); +} + +function warnShadowProducer(code: string, details: JsonRecord = {}): void { + console.warn(JSON.stringify({ code, component: "agentrun-kafka-shadow-producer", ...details, valuesPrinted: false })); +} diff --git a/src/mgr/server.ts b/src/mgr/server.ts index 1de0721..f638431 100644 --- a/src/mgr/server.ts +++ b/src/mgr/server.ts @@ -3,6 +3,7 @@ import { createServer } from "node:http"; import type { AddressInfo } from "node:net"; import type { AgentRunStore, ListQueueTasksInput, ListSessionsInput, SessionEventPageInput } from "./store.js"; import { openAgentRunStoreFromEnv } from "./store.js"; +import { withKafkaShadowProducer } from "./kafka-shadow-producer.js"; import { AgentRunError, errorToJson } from "../common/errors.js"; import { asRecord, validateBackendProfile, validateCreateCommand, validateCreateQueueTask, validateCreateRun, validateQueueTaskState, validateSessionListState } from "../common/validation.js"; import { isBackendProfile } from "../common/backend-profiles.js"; @@ -143,7 +144,8 @@ export interface StartedManagerServer { } export async function startManagerServer(options: ManagerServerOptions = {}): Promise { - const store = options.store ?? await openAgentRunStoreFromEnv(); + const rawStore = options.store ?? await openAgentRunStoreFromEnv(); + const store = withKafkaShadowProducer(rawStore, process.env); const sourceCommit = options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown"; const runnerJobDefaults = options.runnerJobDefaults; const sessionPvcDefaults = options.sessionPvcOptions;