From 0d9c2dad43228f6d4b683880ed5e362356c543f8 Mon Sep 17 00:00:00 2001 From: Lyon <88232613+pikasTech@users.noreply.github.com> Date: Tue, 23 Jun 2026 17:05:29 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=BB=B6=E5=90=8E=20codex=20turn=20comp?= =?UTF-8?q?letion=20=E4=BA=8B=E4=BB=B6=20(#233)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/codex-stdio.ts | 11 +++++++---- src/selftest/cases/30-codex-stdio.ts | 9 +++++++++ src/selftest/fake-codex-app-server.ts | 11 +++++++++++ 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/src/backend/codex-stdio.ts b/src/backend/codex-stdio.ts index 5df00b7..370db5b 100644 --- a/src/backend/codex-stdio.ts +++ b/src/backend/codex-stdio.ts @@ -457,6 +457,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess let assistantText = ""; const assistantDeltaProgress = createAssistantDeltaProgressState(); const completedAssistantMessages: CompletedAssistantMessage[] = []; + const deferredTerminalEvents: BackendEvent[] = []; const suppressedNotifications = createSuppressedNotificationSummary(); let waitingFor = "codex-app-server"; let lastNotificationMethod: string | null = null; @@ -672,6 +673,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess completedAssistantMessages.push(normalized.completedAssistantMessage); emitEvent(assistantMessageEventForCompleted(normalized.completedAssistantMessage, completedAssistantMessages.length)); } + if (normalized.terminalEvents) deferredTerminalEvents.push(...normalized.terminalEvents); if (normalized.terminal && !terminal) { terminal = normalized.terminal; terminalResolve(); @@ -798,6 +800,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess if (finalAssistant) emitEvent(assistantFinalResponseEvent(finalAssistant)); else if (completedAssistantMessages.length === 0) emitEvents(assistantMessageEventsForTurn(assistantText, false)); emitEvents(suppressedNotificationEvents(suppressedNotifications)); + emitEvents(deferredTerminalEvents); emitEvent({ type: "terminal_status", payload: { terminalStatus: terminal.status, failureKind: terminal.failureKind, message: terminal.message } }); await liveEventWrite; return { terminalStatus: terminal.status, failureKind: terminal.failureKind, failureMessage: terminal.message, events: events.map((event) => ({ ...event, payload: redactJson(event.payload) })), ...(threadId ? { threadId } : {}), ...(turnId ? { turnId } : {}) }; @@ -943,7 +946,7 @@ function escapeRegExp(value: string): string { return value.replace(/[.*+?^${}()|[\]\\]/gu, "\\$&"); } -function normalizeCodexNotification(message: JsonRecord, suppressed: SuppressedNotificationSummary): { events: BackendEvent[]; assistantDelta?: { itemId: string | null; text: string }; completedAssistantMessage?: CompletedAssistantMessage; threadId?: string; turnId?: string; terminal?: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } } { +function normalizeCodexNotification(message: JsonRecord, suppressed: SuppressedNotificationSummary): { events: BackendEvent[]; terminalEvents?: BackendEvent[]; assistantDelta?: { itemId: string | null; text: string }; completedAssistantMessage?: CompletedAssistantMessage; threadId?: string; turnId?: string; terminal?: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } } { const method = typeof message.method === "string" ? message.method : "unknown"; const params = asRecordAt(message, "params"); if (method === "thread/started") { @@ -999,9 +1002,9 @@ function normalizeCodexNotification(message: JsonRecord, suppressed: SuppressedN const error = asRecordAt(turn, "error"); const messageText = typeof error.message === "string" ? redactText(error.message) : null; const failureKind = status === "completed" ? null : status === "cancelled" ? "cancelled" : classifyCodexErrorRecord(Object.keys(error).length > 0 ? error : { message: turn.status }, "backend-failed"); - const events: BackendEvent[] = [{ type: "backend_status", payload: { phase: method, terminalStatus: status } }]; - if (failureKind) events.push({ type: "error", payload: { failureKind, error: redactJson(error), phase: method } }); - return { events, terminal: { status, failureKind, message: messageText } }; + const terminalEvents: BackendEvent[] = [{ type: "backend_status", payload: { phase: method, terminalStatus: status } }]; + if (failureKind) terminalEvents.push({ type: "error", payload: { failureKind, error: redactJson(error), phase: method } }); + return { events: [], terminalEvents, terminal: { status, failureKind, message: messageText } }; } return { events: [{ type: "backend_status", payload: { phase: method } }] }; } diff --git a/src/selftest/cases/30-codex-stdio.ts b/src/selftest/cases/30-codex-stdio.ts index 21c9ad0..a1e9be4 100644 --- a/src/selftest/cases/30-codex-stdio.ts +++ b/src/selftest/cases/30-codex-stdio.ts @@ -118,6 +118,15 @@ const selfTest: SelfTestCase = async (context) => { assert.ok(finalMessageIndex >= 0 && finalMessageIndex < turnCompletedIndex, "final agentMessage should be emitted before turn/completed instead of being delayed to final response"); assert.equal(finalMessageItems.some((event) => event.type === "backend_status" && String(eventPayload(event).phase ?? "").startsWith("item/agentMessage:")), false, "agentMessage lifecycle must not be persisted as backend_status noise"); + const terminalBeforeFinal = await createRunWithCommand(client, context, "hello terminal before final", "selftest-terminal-before-final-agent-message", 15_000); + const terminalBeforeFinalResult = await runOnce({ managerUrl: server.baseUrl, runId: terminalBeforeFinal.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "multi-agent-message-terminal-before-final" }, oneShot: true }); + assert.equal(terminalBeforeFinalResult.terminalStatus, "completed", "terminal-before-final agentMessage run should complete"); + const terminalBeforeFinalEvents = await client.get(`/api/v1/runs/${terminalBeforeFinal.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; + const terminalBeforeFinalItems = terminalBeforeFinalEvents.items ?? []; + const delayedFinalIndex = terminalBeforeFinalItems.findIndex((event) => event.type === "assistant_message" && eventPayload(event).itemId === "msg_late_final"); + const delayedTurnCompletedIndex = terminalBeforeFinalItems.findIndex((event) => event.type === "backend_status" && eventPayload(event).phase === "turn/completed"); + assert.ok(delayedFinalIndex >= 0 && delayedTurnCompletedIndex > delayedFinalIndex, "turn/completed must be emitted after final assistant even when Codex sends terminal notification first"); + const webSearch = await createRunWithCommand(client, context, "hello web search progress", "selftest-web-search-progress", 15_000); const webSearchPromise = runOnce({ managerUrl: server.baseUrl, runId: webSearch.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "web-search-progress" }, oneShot: true }) as Promise; await waitForEvent(client, webSearch.runId, (event) => event.type === "tool_call" && eventPayload(event).type === "webSearch" && eventPayload(event).method === "item/started", "webSearch tool_call start event"); diff --git a/src/selftest/fake-codex-app-server.ts b/src/selftest/fake-codex-app-server.ts index 00cd9db..eb0cdf8 100644 --- a/src/selftest/fake-codex-app-server.ts +++ b/src/selftest/fake-codex-app-server.ts @@ -223,6 +223,17 @@ for await (const line of rl) { respond(message.id, { turn }); continue; } + if (mode === "multi-agent-message-terminal-before-final") { + turnCounter += 1; + const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" }; + notify("turn/started", { turn }); + notify("item/agentMessage/delta", { itemId: "msg_late_progress", delta: "Progress before delayed final. " }); + notify("turn/completed", { turn }); + notify("item/completed", { item: { id: "msg_late_progress", type: "agentMessage", text: "Progress before delayed final." } }); + notify("item/completed", { item: { id: "msg_late_final", type: "agentMessage", text: "Delayed final answer." } }); + respond(message.id, { turn }); + continue; + } if (mode === "web-search-progress") { turnCounter += 1; const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" };