fix: 实时上报 Codex 工具事件

This commit is contained in:
Codex
2026-06-02 03:23:16 +08:00
parent 9b2c637b64
commit 5544db96fb
5 changed files with 73 additions and 19 deletions
+3 -1
View File
@@ -1,4 +1,4 @@
import type { BackendTurnResult, CommandRecord, RunRecord } from "../common/types.js";
import type { BackendEvent, BackendTurnResult, CommandRecord, RunRecord } from "../common/types.js";
import { CodexStdioBackendSession, runCodexStdioTurn, type CodexStdioTurnOptions } from "./codex-stdio.js";
import { backendProfileSpec } from "../common/backend-profiles.js";
@@ -8,6 +8,7 @@ export interface BackendAdapterOptions {
codexHome?: string;
workspacePath?: string;
abortSignal?: AbortSignal;
onEvent?: (event: BackendEvent) => void | Promise<void>;
env?: NodeJS.ProcessEnv;
}
@@ -52,5 +53,6 @@ export function backendTurnOptions(run: RunRecord, command: CommandRecord, optio
if (options.env) turnOptions.env = options.env;
if (options.codexHome) turnOptions.codexHome = options.codexHome;
if (options.abortSignal) turnOptions.abortSignal = options.abortSignal;
if (options.onEvent) turnOptions.onEvent = options.onEvent;
return turnOptions;
}
+30 -16
View File
@@ -47,6 +47,7 @@ export interface CodexStdioTurnOptions {
env?: NodeJS.ProcessEnv;
codexHome?: string;
abortSignal?: AbortSignal;
onEvent?: (event: BackendEvent) => void | Promise<void>;
}
interface PendingRequest {
@@ -302,15 +303,15 @@ export class CodexStdioBackendSession {
return [{ type: "backend_status", payload: { phase: "codex-app-server-closed", appServerExit: closeEvent(closeInfo) } }];
}
async getClient(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, events: BackendEvent[]): Promise<CodexStdioClient> {
async getClient(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, emitEvent: (event: BackendEvent) => void): Promise<CodexStdioClient> {
const key = codexClientKey(options, env);
if (this.client && !this.client.isClosed && this.clientKey === key) {
events.push({ type: "backend_status", payload: { phase: "codex-app-server:reused", ...backendMetadata(options), protocol: codexProtocol } });
emitEvent({ type: "backend_status", payload: { phase: "codex-app-server:reused", ...backendMetadata(options), protocol: codexProtocol } });
return this.client;
}
const closeEvents = await this.close();
events.push(...closeEvents);
events.push({
for (const event of closeEvents) emitEvent(event);
emitEvent({
type: "backend_status",
payload: {
phase: "codex-app-server-starting",
@@ -332,7 +333,7 @@ export class CodexStdioBackendSession {
const initializeResult = requireResponseRecord(await this.client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } }, requestTimeoutMs), "initialize");
validateInitializeResponse(initializeResult);
this.client.notify("initialized", {});
events.push({ type: "backend_status", payload: { phase: "initialize:completed", ...backendMetadata(options), protocol: codexProtocol } });
emitEvent({ type: "backend_status", payload: { phase: "initialize:completed", ...backendMetadata(options), protocol: codexProtocol } });
return this.client;
}
@@ -356,6 +357,18 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
if (secretFailure) return secretFailure;
const env = childEnv(options, codexHome);
const events: BackendEvent[] = [];
let liveEventWrite = Promise.resolve();
const emitEvent = (event: BackendEvent): void => {
const redactedEvent: BackendEvent = { ...event, payload: redactJson(event.payload) };
if (options.onEvent) {
liveEventWrite = liveEventWrite.then(() => Promise.resolve(options.onEvent?.(redactedEvent))).catch(() => undefined);
return;
}
events.push(redactedEvent);
};
const emitEvents = (nextEvents: BackendEvent[]): void => {
for (const event of nextEvents) emitEvent(event);
};
if (options.abortSignal?.aborted) {
const cancelled = { status: "cancelled" as const, failureKind: "cancelled" as const, message: "cancel requested" };
events.push({ type: "backend_status", payload: { phase: "turn-cancelled", failureKind: "cancelled" } });
@@ -374,7 +387,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
const abortTurn = (): void => {
if (terminal) return;
terminal = { status: "cancelled", failureKind: "cancelled", message: "cancel requested" };
events.push({ type: "backend_status", payload: { phase: "turn-cancelled", failureKind: "cancelled" } });
emitEvent({ type: "backend_status", payload: { phase: "turn-cancelled", failureKind: "cancelled" } });
client?.stop();
terminalResolve();
};
@@ -382,7 +395,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
const timeout = setTimeout(() => {
if (terminal) return;
terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn timed out after ${options.timeoutMs}ms` };
events.push({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:timeout" } });
emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:timeout" } });
client?.stop();
terminalResolve();
}, positiveTimeout(options.timeoutMs));
@@ -392,19 +405,19 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
if (normalized.turnId) turnId = normalized.turnId;
if (normalized.assistantDelta) assistantText += normalized.assistantDelta;
if (typeof normalized.assistantFinal === "string" && normalized.assistantFinal.trim().length > 0) finalAssistantText = normalized.assistantFinal;
events.push(...normalized.events);
emitEvents(normalized.events);
if (normalized.terminal && !terminal) {
terminal = normalized.terminal;
terminalResolve();
}
});
try {
client = await session.getClient(options, env, events);
client = await session.getClient(options, env, emitEvent);
const startThread = async (phasePrefix = "thread/start"): Promise<string> => {
const response = requireResponseRecord(await client!.request("thread/start", withOptionalModel({ cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" }, options.model), requestTimeoutMs), "thread/start");
const nextThreadId = requireNestedId(response, "thread/start", "thread");
events.push({ type: "backend_status", payload: { phase: `${phasePrefix}:completed`, threadId: nextThreadId } });
emitEvent({ type: "backend_status", payload: { phase: `${phasePrefix}:completed`, threadId: nextThreadId } });
return nextThreadId;
};
@@ -412,11 +425,11 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
try {
const threadResponse = requireResponseRecord(await client.request("thread/resume", withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model), requestTimeoutMs), "thread/resume");
threadId = requireNestedId(threadResponse, "thread/resume", "thread");
events.push({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } });
emitEvent({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } });
} catch (error) {
const failure = normalizeFailure(error);
if (!isStaleThreadResumeFailure(failure)) throw error;
events.push({
emitEvent({
type: "backend_status",
payload: {
phase: "thread/resume:stale-thread-fallback",
@@ -434,7 +447,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
const turnResponse = requireResponseRecord(await client.request("turn/start", withOptionalModel({ threadId, input: textInput(options.prompt), cwd: options.cwd, approvalPolicy: options.approvalPolicy }, options.model), requestTimeoutMs), "turn/start");
turnId = requireNestedId(turnResponse, "turn/start", "turn");
events.push({ type: "backend_status", payload: { phase: "turn/start:completed", turnId } });
emitEvent({ type: "backend_status", payload: { phase: "turn/start:completed", turnId } });
const race = await Promise.race([
terminalPromise.then(() => ({ kind: "terminal" as const })),
@@ -442,14 +455,14 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
]);
if (race.kind === "closed" && !terminal) {
terminal = terminalFromClose(race.closeInfo);
events.push({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "transport:closed-before-terminal", appServerExit: closeEvent(race.closeInfo) } });
emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "transport:closed-before-terminal", appServerExit: closeEvent(race.closeInfo) } });
}
if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server did not emit turn/completed" };
} catch (error) {
if (!terminal) {
const failure = normalizeFailure(error);
terminal = { status: failure.failureKind === "secret-unavailable" ? "blocked" : "failed", failureKind: failure.failureKind, message: failure.message };
events.push({ type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase: failure.phase, details: failure.details } });
emitEvent({ type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase: failure.phase, details: failure.details } });
}
} finally {
stopNotifications();
@@ -457,10 +470,11 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
clearTimeout(timeout);
}
if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server finished without terminal status" };
if (terminal.status !== "completed") events.push(...await session.close());
if (terminal.status !== "completed") emitEvents(await session.close());
const reply = finalAssistantText.trim().length > 0 ? finalAssistantText : assistantText;
if (reply.trim().length > 0) events.push({ type: "assistant_message", payload: { text: reply } });
events.push({ type: "terminal_status", payload: { terminalStatus: terminal.status, failureKind: terminal.failureKind, message: terminal.message } });
await liveEventWrite;
return { terminalStatus: terminal.status, failureKind: terminal.failureKind, failureMessage: terminal.message, events: events.map((event) => ({ ...event, payload: redactJson(event.payload) })), ...(threadId ? { threadId } : {}), ...(turnId ? { turnId } : {}) };
}
+8 -1
View File
@@ -141,7 +141,14 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions,
const stopBackendProgress = startBackendProgress(api, options.runId, command.id, attemptId, runner.id, options.backendProfile ?? null);
try {
const latestRun = await api.getRun(options.runId);
const backendOptions = { ...options, ...(workspacePath ? { workspacePath } : {}), abortSignal: abortController.signal };
const backendOptions = {
...options,
...(workspacePath ? { workspacePath } : {}),
abortSignal: abortController.signal,
onEvent: async (event: BackendEvent) => {
await appendBestEffort(api, options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id));
},
};
const result = backendSession ? await backendSession.runTurn(latestRun, command, backendOptions) : await runBackendTurn(latestRun, command, backendOptions);
for (const event of result.events) {
await api.appendEvent(options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id));
+18 -1
View File
@@ -88,6 +88,13 @@ const selfTest: SelfTestCase = async (context) => {
assert.ok(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/start:after-stale-resume:completed"), "fallback should start a fresh thread");
assertNoSecretLeak(staleEvents);
const live = await createRunWithCommand(client, context, "hello live events", "selftest-live-tool-events", 15_000);
const livePromise = runOnce({ managerUrl: server.baseUrl, runId: live.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "slow-tool-events" }, oneShot: true }) as Promise<JsonRecord>;
await waitForEvent(client, live.runId, (event) => event.type === "tool_call" && eventPayload(event).method === "item/started", "live tool_call start event");
await waitForEvent(client, live.runId, (event) => event.type === "command_output" && String(eventPayload(event).text ?? "").includes("live output"), "live command output event");
const liveResult = await livePromise;
assert.equal(liveResult.terminalStatus, "completed", "slow live tool event turn should complete");
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "missing-turn-result", expectedStatus: "failed", expectedFailureKind: "backend-response-invalid" });
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-401-rpc-error", expectedStatus: "failed", expectedFailureKind: "provider-auth-failed" });
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-429-terminal", expectedStatus: "failed", expectedFailureKind: "provider-rate-limited" });
@@ -99,7 +106,7 @@ const selfTest: SelfTestCase = async (context) => {
await runSecretFailureCase({ client, managerUrl: server.baseUrl, context });
await runSpawnFailureCase({ client, managerUrl: server.baseUrl, context });
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fallback", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fallback", "codex-stdio-live-tool-events", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}
@@ -136,6 +143,16 @@ function eventPayload(event: { payload: unknown }): JsonRecord {
return typeof event.payload === "object" && event.payload !== null && !Array.isArray(event.payload) ? event.payload as JsonRecord : {};
}
async function waitForEvent(client: ManagerClient, runId: string, predicate: (event: { type: string; payload: unknown }) => boolean, label: string): Promise<void> {
const deadline = Date.now() + 3_000;
while (Date.now() < deadline) {
const events = await client.get(`/api/v1/runs/${runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
if ((events.items ?? []).some(predicate)) return;
await new Promise((resolve) => setTimeout(resolve, 25));
}
assert.fail(`timed out waiting for ${label}`);
}
async function createStaleThreadRun(client: ManagerClient, context: SelfTestContext): Promise<{ runId: string; commandId: string }> {
const run = await client.post("/api/v1/runs", {
tenantId: "unidesk",
+14
View File
@@ -134,6 +134,20 @@ for await (const line of rl) {
respond(message.id, { turn });
continue;
}
if (mode === "slow-tool-events") {
turnCounter += 1;
const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" };
notify("turn/started", { turn });
notify("item/started", { item: { id: "tool_selftest", type: "commandExecution", command: "sleep 0.05 && echo live" } });
notify("item/commandExecution/outputDelta", { itemId: "tool_selftest", delta: "live output\n" });
setTimeout(() => {
notify("item/completed", { item: { id: "tool_selftest", type: "commandExecution", command: "sleep 0.05 && echo live", status: "completed" } });
notify("item/agentMessage/delta", { itemId: "msg_selftest", delta: "done" });
notify("turn/completed", { turn });
respond(message.id, { turn });
}, 50);
continue;
}
turnCounter += 1;
const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" };
notify("turn/started", { turn });