diff --git a/src/backend/codex-stdio.ts b/src/backend/codex-stdio.ts index 1aefa66..e9d6475 100644 --- a/src/backend/codex-stdio.ts +++ b/src/backend/codex-stdio.ts @@ -18,6 +18,7 @@ const requestTimeoutCapMs = 30_000; const assistantDeltaProgressMinChars = 500; const assistantDeltaProgressLimitChars = 1_200; const defaultIdleWarningMs = 8_000; +const defaultTerminalNotificationDrainMs = 2_000; const childEnvSummaryKeys = [ "CODEX_HOME", @@ -472,15 +473,26 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess const terminalPromise = new Promise((resolve) => { terminalResolve = resolve; }); const terminalNotificationDrainMs = codexTerminalNotificationDrainMs(env); let terminalNotificationDrainTimeout: NodeJS.Timeout | null = null; + let terminalNotificationDrainResolved = false; + const resolveTerminalNow = (): void => { + if (terminalNotificationDrainResolved) return; + terminalNotificationDrainResolved = true; + if (terminalNotificationDrainTimeout) { + clearTimeout(terminalNotificationDrainTimeout); + terminalNotificationDrainTimeout = null; + } + terminalResolve(); + }; const resolveTerminalAfterNotificationDrain = (): void => { if (terminalNotificationDrainTimeout) clearTimeout(terminalNotificationDrainTimeout); + if (terminalNotificationDrainResolved) return; if (terminalNotificationDrainMs <= 0) { - terminalResolve(); + resolveTerminalNow(); return; } terminalNotificationDrainTimeout = setTimeout(() => { terminalNotificationDrainTimeout = null; - terminalResolve(); + resolveTerminalNow(); }, terminalNotificationDrainMs); terminalNotificationDrainTimeout.unref?.(); }; @@ -543,7 +555,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess terminal = { status: "cancelled", failureKind: "cancelled", message: "cancel requested" }; emitEvent({ type: "backend_status", payload: { phase: "turn-cancelled", failureKind: "cancelled" } }); beginInterruptAndStop("cancel requested", "abort-signal"); - terminalResolve(); + resolveTerminalNow(); }; options.abortSignal?.addEventListener("abort", abortTurn, { once: true }); const turnHardTimeoutMs = positiveTimeout(options.timeoutMs); @@ -577,7 +589,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:hard-timeout", timeoutMs: turnHardTimeoutMs, elapsedMs, idleMs: Math.max(0, Date.now() - lastActivityAt), waitingFor, lastNotificationMethod, threadId: threadId ?? null, turnId: turnId ?? null, retryable: false, retryAttempt: null, retryMaxAttempts: 0, retryExhausted: true, lastToolCall } }); emitCodexOtelSpan("codex_stdio.turn_hard_timeout", options, env, attrs, { status: "error", error: terminal.message }); beginInterruptAndStop("turn hard timeout", "turn:hard-timeout"); - terminalResolve(); + resolveTerminalNow(); }; const scheduleTurnHardTimeout = (): void => { if (hardTimeout) clearTimeout(hardTimeout); @@ -616,7 +628,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:missing-terminal-after-tool-timeout", timeoutMs: missingTerminalAfterToolTimeoutMs, retryable: false, retryAttempt: null, retryMaxAttempts: 0, retryExhausted: true, lastToolCall } }); emitCodexOtelSpan("codex_stdio.missing_terminal_after_tool_timeout", options, env, attrs, { status: "error", error: terminal.message }); beginInterruptAndStop("missing terminal after tool timeout", "turn:missing-terminal-after-tool-timeout"); - terminalResolve(); + resolveTerminalNow(); }; const scheduleMissingTerminalAfterToolTimeout = (): void => { clearMissingTerminalAfterToolTimeout(); @@ -646,7 +658,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:idle-timeout" } }); emitCodexOtelSpan("codex_stdio.idle_timeout", options, env, { waitingFor, idleMs: Math.max(0, Date.now() - lastActivityAt), lastNotificationMethod, threadId: threadId ?? null, turnId: turnId ?? null, terminalStatus: terminal.status, failureKind: terminal.failureKind }, { status: "error", error: terminal.message }); beginInterruptAndStop("idle timeout", "turn:idle-timeout"); - terminalResolve(); + resolveTerminalNow(); }, turnIdleTimeoutMs); idleTimeout.unref?.(); }; @@ -691,6 +703,8 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess if (normalized.terminal && !terminal) { terminal = normalized.terminal; resolveTerminalAfterNotificationDrain(); + } else if (terminal && terminalNotificationDrainTimeout !== null) { + resolveTerminalAfterNotificationDrain(); } }); try { @@ -1504,7 +1518,7 @@ function codexMissingTerminalAfterToolTimeoutMs(env: NodeJS.ProcessEnv, turnTime function codexTerminalNotificationDrainMs(env: NodeJS.ProcessEnv): number { const configured = Number(env.AGENTRUN_CODEX_TERMINAL_NOTIFICATION_DRAIN_MS); if (Number.isFinite(configured) && configured >= 0) return Math.floor(configured); - return 250; + return defaultTerminalNotificationDrainMs; } function emitCodexNotificationOtel(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, message: JsonRecord, state: JsonRecord): void { diff --git a/src/selftest/cases/30-codex-stdio.ts b/src/selftest/cases/30-codex-stdio.ts index a1e9be4..458fefb 100644 --- a/src/selftest/cases/30-codex-stdio.ts +++ b/src/selftest/cases/30-codex-stdio.ts @@ -119,7 +119,7 @@ const selfTest: SelfTestCase = async (context) => { assert.equal(finalMessageItems.some((event) => event.type === "backend_status" && String(eventPayload(event).phase ?? "").startsWith("item/agentMessage:")), false, "agentMessage lifecycle must not be persisted as backend_status noise"); const terminalBeforeFinal = await createRunWithCommand(client, context, "hello terminal before final", "selftest-terminal-before-final-agent-message", 15_000); - const terminalBeforeFinalResult = await runOnce({ managerUrl: server.baseUrl, runId: terminalBeforeFinal.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "multi-agent-message-terminal-before-final" }, oneShot: true }); + const terminalBeforeFinalResult = await runOnce({ managerUrl: server.baseUrl, runId: terminalBeforeFinal.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "multi-agent-message-terminal-before-final", AGENTRUN_CODEX_TERMINAL_NOTIFICATION_DRAIN_MS: "300" }, oneShot: true }); assert.equal(terminalBeforeFinalResult.terminalStatus, "completed", "terminal-before-final agentMessage run should complete"); const terminalBeforeFinalEvents = await client.get(`/api/v1/runs/${terminalBeforeFinal.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; const terminalBeforeFinalItems = terminalBeforeFinalEvents.items ?? []; diff --git a/src/selftest/fake-codex-app-server.ts b/src/selftest/fake-codex-app-server.ts index 0886c12..f03c111 100644 --- a/src/selftest/fake-codex-app-server.ts +++ b/src/selftest/fake-codex-app-server.ts @@ -232,8 +232,10 @@ for await (const line of rl) { notify("turn/completed", { turn }); setTimeout(() => { notify("item/completed", { item: { id: "msg_late_progress", type: "agentMessage", text: "Progress before delayed final." } }); + }, 200).unref?.(); + setTimeout(() => { notify("item/completed", { item: { id: "msg_late_final", type: "agentMessage", text: "Delayed final answer." } }); - }, 20).unref?.(); + }, 450).unref?.(); continue; } if (mode === "web-search-progress") {