Merge pull request #246 from pikasTech/feat/2243-kafka-shadow-producer

feat: shadow produce manager Kafka events
This commit is contained in:
Lyon
2026-06-28 19:53:25 +08:00
committed by GitHub
4 changed files with 272 additions and 1 deletions
+3
View File
@@ -6,6 +6,7 @@
"name": "agentrun",
"dependencies": {
"@openai/codex": "0.133.0",
"kafkajs": "^2.2.4",
"pg": "^8.13.1",
"yaml": "^2.8.0",
},
@@ -92,6 +93,8 @@
"fsevents": ["fsevents@2.3.3", "", { "os": "darwin" }, "sha512-5xoDfX+fL7faATnagmWPpbFtwh/R77WmMMqqHGS65C3vvB0YHrgF+B1YmZ3441tMj5n63k0212XNoJwzlhffQw=="],
"kafkajs": ["kafkajs@2.2.4", "", {}, "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA=="],
"pg": ["pg@8.21.0", "", { "dependencies": { "pg-connection-string": "^2.13.0", "pg-pool": "^3.14.0", "pg-protocol": "^1.14.0", "pg-types": "2.2.0", "pgpass": "1.0.5" }, "optionalDependencies": { "pg-cloudflare": "^1.4.0" }, "peerDependencies": { "pg-native": ">=3.0.1" }, "optionalPeers": ["pg-native"] }, "sha512-AUP1EYJuHraQGsVoCQVIcM7TEJVGtDzxWtGFZd8rds9d+CCXlU5Js1rYgfLNvxy9iJrpHjGrRjoi/3BT9fRyiA=="],
"pg-cloudflare": ["pg-cloudflare@1.4.0", "", {}, "sha512-Vo7z/6rrQYxpNRylp4Tlob2elzbh+N/MOQbxFVWCxS7oEx6jF53GTJFxK2WWpKuBRkmiin4Mt+xofFDjx09R0A=="],
+1
View File
@@ -11,6 +11,7 @@
},
"dependencies": {
"@openai/codex": "0.133.0",
"kafkajs": "^2.2.4",
"pg": "^8.13.1",
"yaml": "^2.8.0"
},
+265
View File
@@ -0,0 +1,265 @@
import { Kafka, logLevel, type Producer } from "kafkajs";
import type { AgentRunStore } from "./store.js";
import type { CommandRecord, JsonRecord, JsonValue, RunEvent, RunRecord } from "../common/types.js";
import { stableHash } from "../common/validation.js";
/** Spellings that `truthy` accepts as "enabled"; inputs are trimmed and lower-cased before lookup. */
const TRUE_VALUES = new Set(["1", "true", "yes", "on"]);
/** Warning codes already emitted through `warnOnce`; a code in this set is never logged by `warnOnce` again. */
const warnedCodes = new Set<string>();
/**
 * Module-level cache of the single Kafka producer managed by `producerForConfig`.
 * `key` is the `KafkaShadowConfig.key` the producer was created for, and `connecting`
 * holds the in-flight `connect()` promise, reset to null once the connection succeeds.
 * The whole state is reset to null when connecting fails.
 */
let producerState: { key: string; producer: Producer; connecting: Promise<Producer> | null } | null = null;
/** Fully validated shadow-producer settings, as produced by `kafkaShadowConfig`. */
interface KafkaShadowConfig {
/** Broker addresses parsed from the comma-separated AGENTRUN_KAFKA_BOOTSTRAP_SERVERS. */
brokers: string[];
/** Destination topic, read from AGENTRUN_KAFKA_EVENT_TOPIC. */
topic: string;
/** Kafka client id, read from AGENTRUN_KAFKA_CLIENT_ID; also emitted as the message `source`. */
clientId: string;
/** `clientId|topic|brokers` fingerprint used to decide whether the cached producer can be reused. */
key: string;
}
/** Everything `publishAgentRunShadow` needs to build and send one shadow message. */
interface ShadowEventInput {
/** Environment the Kafka configuration is re-read from on every publish. */
env: NodeJS.ProcessEnv;
/** Logical event name; emitted as `eventType` in the body and as the `x-event-type` header. */
eventType: string;
/** Run the event belongs to; when absent, `resolveRun` looks it up through `store` and `runId`. */
run?: RunRecord | null;
/** Command the event belongs to, if any. */
command?: CommandRecord | null;
/** Stored run event, if the publish was triggered by `appendEvent`. */
event?: RunEvent;
/** Payload to summarise; takes precedence over the payload carried by `event`. */
eventPayload?: JsonRecord;
/** Store used to fetch the run when `run` is not supplied. */
store?: AgentRunStore;
/** Id of the run to fetch; also a fallback candidate for the message key. */
runId?: string;
}
/** Loose callable shape used only to `.bind` non-intercepted store members to the proxy target. */
type StoreFunction = (...args: never[]) => unknown;
/**
 * Decorates a store so that selected mutations also publish a Kafka shadow event.
 *
 * If `kafkaShadowConfig(env)` yields no configuration the store is returned as-is.
 * Otherwise the store is wrapped in a Proxy that intercepts `createRun`,
 * `createCommand`, `appendEvent`, `finishCommand` and `finishRun`. Each wrapper
 * awaits the real store call, hands the result to `publishAgentRunShadow`
 * (which is not awaited), and then returns the store's own result. Every other
 * member is passed through; functions are bound to the underlying store.
 *
 * @param store - The store whose mutations should be mirrored.
 * @param env - Environment holding the AGENTRUN_KAFKA_* settings; defaults to `process.env`.
 * @returns The original store, or a proxy around it when shadow producing is configured.
 */
export function withKafkaShadowProducer(store: AgentRunStore, env: NodeJS.ProcessEnv = process.env): AgentRunStore {
  if (kafkaShadowConfig(env) === null) return store;

  const handler: ProxyHandler<AgentRunStore> = {
    get(target, prop, receiver) {
      switch (prop) {
        case "createRun":
          return async (input: Parameters<AgentRunStore["createRun"]>[0]): Promise<RunRecord> => {
            const createdRun = await target.createRun(input);
            publishAgentRunShadow({ env, eventType: "agentrun.run.created", run: createdRun });
            return createdRun;
          };
        case "createCommand":
          return async (runId: string, input: Parameters<AgentRunStore["createCommand"]>[1]): Promise<CommandRecord> => {
            const createdCommand = await target.createCommand(runId, input);
            publishAgentRunShadow({ env, eventType: "agentrun.command.created", store: target, runId, command: createdCommand });
            return createdCommand;
          };
        case "appendEvent":
          return async (runId: string, type: RunEvent["type"], payload: JsonRecord): Promise<RunEvent> => {
            const appended = await target.appendEvent(runId, type, payload);
            publishAgentRunShadow({
              env,
              eventType: "agentrun.event.appended",
              store: target,
              runId,
              event: appended,
              eventPayload: payload,
            });
            return appended;
          };
        case "finishCommand":
          return async (commandId: string, result: Parameters<AgentRunStore["finishCommand"]>[1]): Promise<CommandRecord> => {
            const finishedCommand = await target.finishCommand(commandId, result);
            publishAgentRunShadow({
              env,
              eventType: "agentrun.command.terminal",
              store: target,
              runId: finishedCommand.runId,
              command: finishedCommand,
              eventPayload: terminalPayload(result, finishedCommand),
            });
            return finishedCommand;
          };
        case "finishRun":
          return async (runId: string, result: Parameters<AgentRunStore["finishRun"]>[1]): Promise<RunRecord> => {
            const finishedRun = await target.finishRun(runId, result);
            publishAgentRunShadow({
              env,
              eventType: "agentrun.run.terminal",
              run: finishedRun,
              eventPayload: terminalPayload(result, null),
            });
            return finishedRun;
          };
        default: {
          // Anything not intercepted is forwarded; methods are bound to the real store.
          const member = Reflect.get(target, prop, receiver) as unknown;
          return typeof member === "function" ? (member as StoreFunction).bind(target) : member;
        }
      }
    },
  };

  return new Proxy(store, handler) as AgentRunStore;
}
/**
 * Fire-and-forget publish of one shadow event.
 *
 * Returns immediately; the run lookup, message construction, producer
 * acquisition and send all happen in the background. Any failure along that
 * path is reported through `warnShadowProducer` and never reaches the caller.
 * Does nothing when the shadow producer is not configured.
 *
 * @param input - Event description, see `ShadowEventInput`.
 */
function publishAgentRunShadow(input: ShadowEventInput): void {
  const config = kafkaShadowConfig(input.env);
  if (config === null) return;

  const deliver = async (): Promise<void> => {
    try {
      const run = await resolveRun(input);
      const message = buildAgentRunShadowMessage({ ...input, run, config });
      const producer = await producerForConfig(config);
      await producer.send({ topic: config.topic, messages: [message] });
    } catch (error) {
      warnShadowProducer("agentrun-kafka-shadow-produce-failed", {
        message: error instanceof Error ? error.message : String(error),
        eventType: input.eventType,
        topic: config.topic,
        clientId: config.clientId,
      });
    }
  };

  void deliver();
}
/**
 * Determines the run a shadow event belongs to.
 *
 * @param input - Event description; `run` wins when present, otherwise `store.getRun(runId)` is used.
 * @returns The supplied run, the run fetched from the store, or null when neither source is usable.
 */
async function resolveRun(input: ShadowEventInput): Promise<RunRecord | null> {
  const { run, store, runId } = input;
  if (run) return run;
  // A lookup needs both a store and a run id.
  if (store && runId) return await store.getRun(runId);
  return null;
}
/**
 * Builds the Kafka message (key, JSON value, headers) for one shadow event.
 *
 * The body carries summaries of the run, command and event plus a hash and
 * byte count of the payload. The payload is `input.eventPayload` when given,
 * otherwise the object payload of `input.event`, otherwise an empty record
 * for the diagnostics. The message key is the first usable value among the
 * command id, the payload's `commandId`, the run id and `input.runId`, falling
 * back to the client id.
 *
 * @param input - Event description with the resolved run and the active configuration.
 * @returns A message object suitable for `producer.send`.
 */
function buildAgentRunShadowMessage(input: ShadowEventInput & { run: RunRecord | null; config: KafkaShadowConfig }): { key: string; value: string; headers: Record<string, string> } {
  const { config, run, command, eventType } = input;
  const payload = input.eventPayload ?? eventPayload(input.event);
  const measuredPayload = payload ?? {};

  const diagnostics = {
    payloadSha256: stableHash(measuredPayload as JsonValue),
    payloadBytes: jsonBytes(measuredPayload),
    valuesPrinted: false,
    shadowConsumeEnabled: false,
  };

  // Property order below is the serialised order of the message body.
  const envelope = {
    schema: "agentrun.hwlab.event.shadow.v1",
    eventType,
    source: config.clientId,
    producedAt: new Date().toISOString(),
    mode: "shadow-produce-only",
    run: runSummary(run),
    command: commandSummary(command),
    event: runEventSummary(input.event, payload),
    diagnostics,
    valuesPrinted: false,
  };

  const keyCandidate = command?.id ?? input.eventPayload?.commandId ?? run?.id ?? input.runId;
  const messageKey = safeText(keyCandidate) ?? config.clientId;

  return {
    key: messageKey,
    value: JSON.stringify(envelope),
    headers: {
      "x-shadow-mode": "produce-only",
      "x-values-printed": "false",
      "x-event-type": eventType,
    },
  };
}
/**
 * Reads and validates the shadow-producer settings from the environment.
 *
 * Producing is opt-in through AGENTRUN_KAFKA_SHADOW_PRODUCE_ENABLED. A truthy
 * AGENTRUN_KAFKA_SHADOW_CONSUME_ENABLED only triggers a one-time warning. When
 * brokers, topic or client id are missing, a one-time warning listing the
 * missing variable names is emitted and no configuration is returned.
 *
 * @param env - Environment to read the AGENTRUN_KAFKA_* variables from.
 * @returns The validated configuration, or null when disabled or incomplete.
 */
function kafkaShadowConfig(env: NodeJS.ProcessEnv): KafkaShadowConfig | null {
  const produceEnabled = truthy(env.AGENTRUN_KAFKA_SHADOW_PRODUCE_ENABLED);
  if (!produceEnabled) return null;

  const consumeRequested = truthy(env.AGENTRUN_KAFKA_SHADOW_CONSUME_ENABLED);
  if (consumeRequested) {
    warnOnce("agentrun-kafka-shadow-consume-ignored", {
      message: "AgentRun Kafka consumer cutover is disabled in this stage; producer continues in shadow mode.",
      valuesPrinted: false,
    });
  }

  const brokers = csv(env.AGENTRUN_KAFKA_BOOTSTRAP_SERVERS);
  const topic = safeText(env.AGENTRUN_KAFKA_EVENT_TOPIC);
  const clientId = safeText(env.AGENTRUN_KAFKA_CLIENT_ID);

  if (brokers.length === 0 || topic === null || clientId === null) {
    // Report every missing variable by name, in a fixed order.
    const requirements: Array<[string, boolean]> = [
      ["AGENTRUN_KAFKA_BOOTSTRAP_SERVERS", brokers.length > 0],
      ["AGENTRUN_KAFKA_EVENT_TOPIC", topic !== null],
      ["AGENTRUN_KAFKA_CLIENT_ID", clientId !== null],
    ];
    const missing = requirements.filter(([, present]) => !present).map(([name]) => name);
    warnOnce("agentrun-kafka-shadow-config-missing", { missing, valuesPrinted: false });
    return null;
  }

  const key = `${clientId}|${topic}|${brokers.join(",")}`;
  return { brokers, topic, clientId, key };
}
/**
 * Returns a connected kafkajs producer for `config`, creating one on first use
 * and caching it in the module-level `producerState`.
 *
 * The cache entry stores the producer object before `connect()` has settled,
 * so a matching entry with a pending `connecting` promise must be awaited
 * rather than handed out directly. Otherwise a caller arriving during the
 * initial connect would receive a producer that is not connected yet and its
 * `send` would fail.
 *
 * @param config - Validated shadow-producer configuration; `config.key` selects the cache entry.
 * @returns The connected producer.
 * @throws Whatever `producer.connect()` rejects with; the cache entry is cleared so a later call retries.
 */
async function producerForConfig(config: KafkaShadowConfig): Promise<Producer> {
  const cached = producerState;
  if (cached?.key === config.key) {
    // Pending connect first: `cached.producer` exists even while the connection is still being made.
    if (cached.connecting) return cached.connecting;
    return cached.producer;
  }

  // NOTE(review): a producer cached under a different key is replaced here without being disconnected.
  const kafka = new Kafka({ clientId: config.clientId, brokers: config.brokers, logLevel: logLevel.NOTHING });
  const producer = kafka.producer({ allowAutoTopicCreation: false });
  const state: { key: string; producer: Producer; connecting: Promise<Producer> | null } = {
    key: config.key,
    producer,
    connecting: null,
  };

  const connecting = producer.connect().then(
    () => {
      // Connected: later callers can use the producer directly.
      state.connecting = null;
      return producer;
    },
    (error: unknown) => {
      // Drop the failed entry (only if it is still the current one) so the next call retries.
      if (producerState === state) producerState = null;
      throw error;
    },
  );
  state.connecting = connecting;
  producerState = state;
  return connecting;
}
/**
 * Projects a run record onto the identifier and status fields included in shadow messages.
 *
 * @param run - The run to summarise, or null.
 * @returns The summary record, or null when no run is available.
 */
function runSummary(run: RunRecord | null): JsonRecord | null {
  if (!run) return null;
  const { id, status, terminalStatus, failureKind, tenantId, projectId, providerId, backendProfile, sessionRef } = run;
  return {
    runId: id,
    status,
    terminalStatus,
    failureKind,
    tenantId,
    projectId,
    providerId,
    backendProfile,
    // Session identifiers default to null when the run has no session reference.
    sessionId: sessionRef?.sessionId ?? null,
    conversationId: sessionRef?.conversationId ?? null,
    threadId: sessionRef?.threadId ?? null,
    valuesPrinted: false,
  };
}
/**
 * Projects a command record onto the fields included in shadow messages.
 *
 * @param command - The command to summarise, if any.
 * @returns The summary record, or null when no command is given.
 */
function commandSummary(command: CommandRecord | null | undefined): JsonRecord | null {
  if (!command) return null;
  const { id, runId, seq, type, state, payloadHash, cancelEpoch } = command;
  return {
    commandId: id,
    runId,
    seq,
    type,
    state,
    payloadHash,
    cancelEpoch,
    valuesPrinted: false,
  };
}
/**
 * Summarises a run event and/or its payload for a shadow message.
 *
 * Identifier fields come from the event; descriptive fields (`phase`, `state`,
 * `terminalStatus`, `failureKind`, `threadId`, `turnId`) are read from the
 * payload and passed through `safeText`. The command id prefers the event's
 * value and falls back to the payload's.
 *
 * @param event - The stored event, if any.
 * @param payload - The payload to read descriptive fields from, or null.
 * @returns The summary record, or null when neither an event nor a payload is given.
 */
function runEventSummary(event: RunEvent | undefined, payload: JsonRecord | null): JsonRecord | null {
  const hasSource = Boolean(event) || Boolean(payload);
  if (!hasSource) return null;

  const payloadText = (field: string): string | null => safeText(payload?.[field]);

  return {
    eventId: event?.id ?? null,
    runId: event?.runId ?? null,
    seq: event?.seq ?? null,
    type: event?.type ?? null,
    commandId: safeText(event?.commandId ?? payload?.commandId),
    phase: payloadText("phase"),
    state: payloadText("state"),
    terminalStatus: payloadText("terminalStatus"),
    failureKind: payloadText("failureKind"),
    threadId: payloadText("threadId"),
    turnId: payloadText("turnId"),
    valuesPrinted: false,
  };
}
/**
 * Builds the synthetic payload attached to command-terminal and run-terminal shadow events.
 *
 * @param result - The result passed to the store's finish call; optional fields default to null.
 * @param command - The finished command, or null for a run-level terminal event.
 * @returns A payload record whose `phase` is "command-terminal" or "run-terminal".
 */
function terminalPayload(result: Parameters<AgentRunStore["finishCommand"]>[1], command: CommandRecord | null): JsonRecord {
  const { terminalStatus, failureKind = null, threadId = null, turnId = null } = result;
  const phase = command ? "command-terminal" : "run-terminal";
  return {
    phase,
    commandId: command?.id ?? null,
    state: command?.state ?? null,
    terminalStatus,
    failureKind,
    threadId,
    turnId,
    valuesPrinted: false,
  };
}
/**
 * Extracts an event's payload when it is a plain (non-null, non-array) object.
 *
 * @param event - The event to read, if any.
 * @returns The payload object itself, or null when there is no event or the payload is not an object record.
 */
function eventPayload(event: RunEvent | undefined): JsonRecord | null {
  const candidate: unknown = event?.payload;
  const isRecord = typeof candidate === "object" && candidate !== null && !Array.isArray(candidate);
  return isRecord ? (candidate as JsonRecord) : null;
}
/**
 * Interprets an environment flag.
 *
 * @param value - Raw variable value, possibly undefined.
 * @returns True when the trimmed, lower-cased value is one of `TRUE_VALUES`.
 */
function truthy(value: string | undefined): boolean {
  const raw = String(value ?? "");
  const normalized = raw.trim().toLowerCase();
  return TRUE_VALUES.has(normalized);
}
/**
 * Splits a comma-separated value into trimmed, non-empty entries.
 *
 * @param value - Raw comma-separated string, possibly undefined.
 * @returns The entries in their original order; empty for undefined or blank input.
 */
function csv(value: string | undefined): string[] {
  const entries: string[] = [];
  for (const piece of String(value ?? "").split(",")) {
    const trimmed = piece.trim();
    if (trimmed) entries.push(trimmed);
  }
  return entries;
}
/**
 * Normalises an arbitrary value into a short, trimmed string.
 *
 * Strings are trimmed, null/undefined count as empty, and anything else is
 * converted with `String()` and trimmed. Text longer than `max` characters is
 * cut to `max` characters followed by "...".
 *
 * @param value - The value to normalise.
 * @param max - Maximum number of characters kept before the ellipsis; defaults to 240.
 * @returns The normalised text, or null when it is empty.
 */
function safeText(value: unknown, max = 240): string | null {
  let text: string;
  if (typeof value === "string") {
    text = value.trim();
  } else if (value === undefined || value === null) {
    text = "";
  } else {
    text = String(value).trim();
  }
  if (text === "") return null;
  if (text.length <= max) return text;
  return `${text.slice(0, max)}...`;
}
/**
 * Measures the size of a record's JSON serialisation.
 *
 * @param value - The record to serialise.
 * @returns The number of bytes of `JSON.stringify(value)` encoded as UTF-8.
 */
function jsonBytes(value: JsonRecord): number {
  const serialized = JSON.stringify(value);
  return Buffer.byteLength(serialized, "utf8");
}
/**
 * Emits a shadow-producer warning the first time a given code is seen and stays silent afterwards.
 *
 * @param code - Warning code used for de-duplication.
 * @param details - Extra fields to include in the warning.
 */
function warnOnce(code: string, details: JsonRecord): void {
  const alreadyWarned = warnedCodes.has(code);
  if (!alreadyWarned) {
    warnedCodes.add(code);
    warnShadowProducer(code, details);
  }
}
/**
 * Writes one structured warning line to `console.warn`.
 *
 * The line is a JSON object containing the code, the component name and the
 * supplied details; `valuesPrinted` is always forced to false, even when the
 * details carry their own value for it.
 *
 * @param code - Warning code.
 * @param details - Extra fields merged into the warning; defaults to none.
 */
function warnShadowProducer(code: string, details: JsonRecord = {}): void {
  const record = Object.assign(
    { code, component: "agentrun-kafka-shadow-producer" },
    details,
    { valuesPrinted: false },
  );
  console.warn(JSON.stringify(record));
}
+3 -1
View File
@@ -3,6 +3,7 @@ import { createServer } from "node:http";
import type { AddressInfo } from "node:net";
import type { AgentRunStore, ListQueueTasksInput, ListSessionsInput, SessionEventPageInput } from "./store.js";
import { openAgentRunStoreFromEnv } from "./store.js";
import { withKafkaShadowProducer } from "./kafka-shadow-producer.js";
import { AgentRunError, errorToJson } from "../common/errors.js";
import { asRecord, validateBackendProfile, validateCreateCommand, validateCreateQueueTask, validateCreateRun, validateQueueTaskState, validateSessionListState } from "../common/validation.js";
import { isBackendProfile } from "../common/backend-profiles.js";
@@ -143,7 +144,8 @@ export interface StartedManagerServer {
}
export async function startManagerServer(options: ManagerServerOptions = {}): Promise<StartedManagerServer> {
const store = options.store ?? await openAgentRunStoreFromEnv();
const rawStore = options.store ?? await openAgentRunStoreFromEnv();
const store = withKafkaShadowProducer(rawStore, process.env);
const sourceCommit = options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown";
const runnerJobDefaults = options.runnerJobDefaults;
const sessionPvcDefaults = options.sessionPvcOptions;