diff --git a/src/backend/adapter.ts b/src/backend/adapter.ts index 3adf883..107bc36 100644 --- a/src/backend/adapter.ts +++ b/src/backend/adapter.ts @@ -1,4 +1,4 @@ -import type { BackendTurnResult, CommandRecord, RunRecord } from "../common/types.js"; +import type { BackendEvent, BackendTurnResult, CommandRecord, RunRecord } from "../common/types.js"; import { CodexStdioBackendSession, runCodexStdioTurn, type CodexStdioTurnOptions } from "./codex-stdio.js"; import { backendProfileSpec } from "../common/backend-profiles.js"; @@ -8,6 +8,7 @@ export interface BackendAdapterOptions { codexHome?: string; workspacePath?: string; abortSignal?: AbortSignal; + onEvent?: (event: BackendEvent) => void | Promise; env?: NodeJS.ProcessEnv; } @@ -52,5 +53,6 @@ export function backendTurnOptions(run: RunRecord, command: CommandRecord, optio if (options.env) turnOptions.env = options.env; if (options.codexHome) turnOptions.codexHome = options.codexHome; if (options.abortSignal) turnOptions.abortSignal = options.abortSignal; + if (options.onEvent) turnOptions.onEvent = options.onEvent; return turnOptions; } diff --git a/src/backend/codex-stdio.ts b/src/backend/codex-stdio.ts index d91cd7d..b1cd54e 100644 --- a/src/backend/codex-stdio.ts +++ b/src/backend/codex-stdio.ts @@ -47,6 +47,7 @@ export interface CodexStdioTurnOptions { env?: NodeJS.ProcessEnv; codexHome?: string; abortSignal?: AbortSignal; + onEvent?: (event: BackendEvent) => void | Promise; } interface PendingRequest { @@ -302,15 +303,15 @@ export class CodexStdioBackendSession { return [{ type: "backend_status", payload: { phase: "codex-app-server-closed", appServerExit: closeEvent(closeInfo) } }]; } - async getClient(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, events: BackendEvent[]): Promise { + async getClient(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, emitEvent: (event: BackendEvent) => void): Promise { const key = codexClientKey(options, env); if (this.client && !this.client.isClosed && this.clientKey === key) { - events.push({ type: "backend_status", payload: { phase: "codex-app-server:reused", ...backendMetadata(options), protocol: codexProtocol } }); + emitEvent({ type: "backend_status", payload: { phase: "codex-app-server:reused", ...backendMetadata(options), protocol: codexProtocol } }); return this.client; } const closeEvents = await this.close(); - events.push(...closeEvents); - events.push({ + for (const event of closeEvents) emitEvent(event); + emitEvent({ type: "backend_status", payload: { phase: "codex-app-server-starting", @@ -332,7 +333,7 @@ export class CodexStdioBackendSession { const initializeResult = requireResponseRecord(await this.client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } }, requestTimeoutMs), "initialize"); validateInitializeResponse(initializeResult); this.client.notify("initialized", {}); - events.push({ type: "backend_status", payload: { phase: "initialize:completed", ...backendMetadata(options), protocol: codexProtocol } }); + emitEvent({ type: "backend_status", payload: { phase: "initialize:completed", ...backendMetadata(options), protocol: codexProtocol } }); return this.client; } @@ -356,6 +357,18 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess if (secretFailure) return secretFailure; const env = childEnv(options, codexHome); const events: BackendEvent[] = []; + let liveEventWrite = Promise.resolve(); + const emitEvent = (event: BackendEvent): void => { + const redactedEvent: BackendEvent = { ...event, payload: redactJson(event.payload) }; + if (options.onEvent) { + liveEventWrite = liveEventWrite.then(() => Promise.resolve(options.onEvent?.(redactedEvent))).catch(() => undefined); + return; + } + events.push(redactedEvent); + }; + const emitEvents = (nextEvents: BackendEvent[]): void => { + for (const event of nextEvents) emitEvent(event); + }; if (options.abortSignal?.aborted) { const cancelled = { status: "cancelled" as const, failureKind: "cancelled" as const, message: "cancel requested" }; events.push({ type: "backend_status", payload: { phase: "turn-cancelled", failureKind: "cancelled" } }); @@ -374,7 +387,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess const abortTurn = (): void => { if (terminal) return; terminal = { status: "cancelled", failureKind: "cancelled", message: "cancel requested" }; - events.push({ type: "backend_status", payload: { phase: "turn-cancelled", failureKind: "cancelled" } }); + emitEvent({ type: "backend_status", payload: { phase: "turn-cancelled", failureKind: "cancelled" } }); client?.stop(); terminalResolve(); }; @@ -382,7 +395,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess const timeout = setTimeout(() => { if (terminal) return; terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn timed out after ${options.timeoutMs}ms` }; - events.push({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:timeout" } }); + emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:timeout" } }); client?.stop(); terminalResolve(); }, positiveTimeout(options.timeoutMs)); @@ -392,19 +405,19 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess if (normalized.turnId) turnId = normalized.turnId; if (normalized.assistantDelta) assistantText += normalized.assistantDelta; if (typeof normalized.assistantFinal === "string" && normalized.assistantFinal.trim().length > 0) finalAssistantText = normalized.assistantFinal; - events.push(...normalized.events); + emitEvents(normalized.events); if (normalized.terminal && !terminal) { terminal = normalized.terminal; terminalResolve(); } }); try { - client = await session.getClient(options, env, events); + client = await session.getClient(options, env, emitEvent); const startThread = async (phasePrefix = "thread/start"): Promise => { const response = requireResponseRecord(await client!.request("thread/start", withOptionalModel({ cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" }, options.model), requestTimeoutMs), "thread/start"); const nextThreadId = requireNestedId(response, "thread/start", "thread"); - events.push({ type: "backend_status", payload: { phase: `${phasePrefix}:completed`, threadId: nextThreadId } }); + emitEvent({ type: "backend_status", payload: { phase: `${phasePrefix}:completed`, threadId: nextThreadId } }); return nextThreadId; }; @@ -412,11 +425,11 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess try { const threadResponse = requireResponseRecord(await client.request("thread/resume", withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model), requestTimeoutMs), "thread/resume"); threadId = requireNestedId(threadResponse, "thread/resume", "thread"); - events.push({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } }); + emitEvent({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } }); } catch (error) { const failure = normalizeFailure(error); if (!isStaleThreadResumeFailure(failure)) throw error; - events.push({ + emitEvent({ type: "backend_status", payload: { phase: "thread/resume:stale-thread-fallback", @@ -434,7 +447,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess const turnResponse = requireResponseRecord(await client.request("turn/start", withOptionalModel({ threadId, input: textInput(options.prompt), cwd: options.cwd, approvalPolicy: options.approvalPolicy }, options.model), requestTimeoutMs), "turn/start"); turnId = requireNestedId(turnResponse, "turn/start", "turn"); - events.push({ type: "backend_status", payload: { phase: "turn/start:completed", turnId } }); + emitEvent({ type: "backend_status", payload: { phase: "turn/start:completed", turnId } }); const race = await Promise.race([ terminalPromise.then(() => ({ kind: "terminal" as const })), @@ -442,14 +455,14 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess ]); if (race.kind === "closed" && !terminal) { terminal = terminalFromClose(race.closeInfo); - events.push({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "transport:closed-before-terminal", appServerExit: closeEvent(race.closeInfo) } }); + emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "transport:closed-before-terminal", appServerExit: closeEvent(race.closeInfo) } }); } if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server did not emit turn/completed" }; } catch (error) { if (!terminal) { const failure = normalizeFailure(error); terminal = { status: failure.failureKind === "secret-unavailable" ? "blocked" : "failed", failureKind: failure.failureKind, message: failure.message }; - events.push({ type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase: failure.phase, details: failure.details } }); + emitEvent({ type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase: failure.phase, details: failure.details } }); } } finally { stopNotifications(); @@ -457,10 +470,11 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess clearTimeout(timeout); } if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server finished without terminal status" }; - if (terminal.status !== "completed") events.push(...await session.close()); + if (terminal.status !== "completed") emitEvents(await session.close()); const reply = finalAssistantText.trim().length > 0 ? finalAssistantText : assistantText; if (reply.trim().length > 0) events.push({ type: "assistant_message", payload: { text: reply } }); events.push({ type: "terminal_status", payload: { terminalStatus: terminal.status, failureKind: terminal.failureKind, message: terminal.message } }); + await liveEventWrite; return { terminalStatus: terminal.status, failureKind: terminal.failureKind, failureMessage: terminal.message, events: events.map((event) => ({ ...event, payload: redactJson(event.payload) })), ...(threadId ? { threadId } : {}), ...(turnId ? { turnId } : {}) }; } diff --git a/src/runner/run-once.ts b/src/runner/run-once.ts index e2fcc8d..bd9e437 100644 --- a/src/runner/run-once.ts +++ b/src/runner/run-once.ts @@ -141,7 +141,14 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, const stopBackendProgress = startBackendProgress(api, options.runId, command.id, attemptId, runner.id, options.backendProfile ?? null); try { const latestRun = await api.getRun(options.runId); - const backendOptions = { ...options, ...(workspacePath ? { workspacePath } : {}), abortSignal: abortController.signal }; + const backendOptions = { + ...options, + ...(workspacePath ? { workspacePath } : {}), + abortSignal: abortController.signal, + onEvent: async (event: BackendEvent) => { + await appendBestEffort(api, options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id)); + }, + }; const result = backendSession ? await backendSession.runTurn(latestRun, command, backendOptions) : await runBackendTurn(latestRun, command, backendOptions); for (const event of result.events) { await api.appendEvent(options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id)); diff --git a/src/selftest/cases/30-codex-stdio.ts b/src/selftest/cases/30-codex-stdio.ts index 8f321fd..1918e1f 100644 --- a/src/selftest/cases/30-codex-stdio.ts +++ b/src/selftest/cases/30-codex-stdio.ts @@ -88,6 +88,13 @@ const selfTest: SelfTestCase = async (context) => { assert.ok(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/start:after-stale-resume:completed"), "fallback should start a fresh thread"); assertNoSecretLeak(staleEvents); + const live = await createRunWithCommand(client, context, "hello live events", "selftest-live-tool-events", 15_000); + const livePromise = runOnce({ managerUrl: server.baseUrl, runId: live.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "slow-tool-events" }, oneShot: true }) as Promise; + await waitForEvent(client, live.runId, (event) => event.type === "tool_call" && eventPayload(event).method === "item/started", "live tool_call start event"); + await waitForEvent(client, live.runId, (event) => event.type === "command_output" && String(eventPayload(event).text ?? "").includes("live output"), "live command output event"); + const liveResult = await livePromise; + assert.equal(liveResult.terminalStatus, "completed", "slow live tool event turn should complete"); + await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "missing-turn-result", expectedStatus: "failed", expectedFailureKind: "backend-response-invalid" }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-401-rpc-error", expectedStatus: "failed", expectedFailureKind: "provider-auth-failed" }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-429-terminal", expectedStatus: "failed", expectedFailureKind: "provider-rate-limited" }); @@ -99,7 +106,7 @@ const selfTest: SelfTestCase = async (context) => { await runSecretFailureCase({ client, managerUrl: server.baseUrl, context }); await runSpawnFailureCase({ client, managerUrl: server.baseUrl, context }); - return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fallback", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] }; + return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fallback", "codex-stdio-live-tool-events", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] }; } finally { await new Promise((resolve) => server.server.close(() => resolve())); } @@ -136,6 +143,16 @@ function eventPayload(event: { payload: unknown }): JsonRecord { return typeof event.payload === "object" && event.payload !== null && !Array.isArray(event.payload) ? event.payload as JsonRecord : {}; } +async function waitForEvent(client: ManagerClient, runId: string, predicate: (event: { type: string; payload: unknown }) => boolean, label: string): Promise { + const deadline = Date.now() + 3_000; + while (Date.now() < deadline) { + const events = await client.get(`/api/v1/runs/${runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; + if ((events.items ?? []).some(predicate)) return; + await new Promise((resolve) => setTimeout(resolve, 25)); + } + assert.fail(`timed out waiting for ${label}`); +} + async function createStaleThreadRun(client: ManagerClient, context: SelfTestContext): Promise<{ runId: string; commandId: string }> { const run = await client.post("/api/v1/runs", { tenantId: "unidesk", diff --git a/src/selftest/fake-codex-app-server.ts b/src/selftest/fake-codex-app-server.ts index 245259d..3a994c9 100644 --- a/src/selftest/fake-codex-app-server.ts +++ b/src/selftest/fake-codex-app-server.ts @@ -134,6 +134,20 @@ for await (const line of rl) { respond(message.id, { turn }); continue; } + if (mode === "slow-tool-events") { + turnCounter += 1; + const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" }; + notify("turn/started", { turn }); + notify("item/started", { item: { id: "tool_selftest", type: "commandExecution", command: "sleep 0.05 && echo live" } }); + notify("item/commandExecution/outputDelta", { itemId: "tool_selftest", delta: "live output\n" }); + setTimeout(() => { + notify("item/completed", { item: { id: "tool_selftest", type: "commandExecution", command: "sleep 0.05 && echo live", status: "completed" } }); + notify("item/agentMessage/delta", { itemId: "msg_selftest", delta: "done" }); + notify("turn/completed", { turn }); + respond(message.id, { turn }); + }, 50); + continue; + } turnCounter += 1; const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" }; notify("turn/started", { turn });