From ff2b5dce34639206067402e4543d96f794ce4bd2 Mon Sep 17 00:00:00 2001 From: Codex Date: Wed, 10 Jun 2026 00:25:54 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=94=AF=E6=8C=81=E9=95=BF=E5=B7=A5?= =?UTF-8?q?=E5=85=B7=E8=B0=83=E7=94=A8=20interrupt=20=E4=B8=8E=E7=A1=AC?= =?UTF-8?q?=E8=B6=85=E6=97=B6=E5=9B=9E=E6=94=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/codex-stdio.ts | 116 ++++++++++++++++++++------ src/runner/run-once.ts | 42 ++++++++-- src/selftest/cases/30-codex-stdio.ts | 56 ++++++++++++- src/selftest/fake-codex-app-server.ts | 37 +++++++- 4 files changed, 213 insertions(+), 38 deletions(-) diff --git a/src/backend/codex-stdio.ts b/src/backend/codex-stdio.ts index b7a5290..a471394 100644 --- a/src/backend/codex-stdio.ts +++ b/src/backend/codex-stdio.ts @@ -423,16 +423,75 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess let client: CodexStdioClient | null = null; const requestTimeoutMs = Math.min(positiveTimeout(options.timeoutMs), requestTimeoutCapMs); let stopActiveTurn: (() => void) | undefined; + let activeTurnKey: string | null = null; + let interruptInFlight: Promise | null = null; + let stopAfterInterrupt = false; + const controlRequestTimeoutMs = Math.min(requestTimeoutMs, 5_000); + const activeTurnControl = (activeThreadId: string, activeTurnId: string): CodexActiveTurnControl => ({ + threadId: activeThreadId, + turnId: activeTurnId, + steer: async (prompt: string) => { + await client!.request("turn/steer", { threadId: activeThreadId, expectedTurnId: activeTurnId, input: textInput(prompt) }, requestTimeoutMs); + }, + interrupt: async () => { + await client!.request("turn/interrupt", { threadId: activeThreadId, turnId: activeTurnId }, controlRequestTimeoutMs); + }, + }); + const exposeActiveTurn = (source: string): void => { + if (!client || !threadId || !turnId || !options.onActiveTurn) return; + const key = `${threadId}:${turnId}`; + if (activeTurnKey === key) return; + stopActiveTurn?.(); + activeTurnKey = key; + emitEvent({ type: "backend_status", payload: { phase: "active-turn-control-ready", source, threadId, turnId } }); + const maybeStop = options.onActiveTurn(activeTurnControl(threadId, turnId)); + stopActiveTurn = typeof maybeStop === "function" ? maybeStop : undefined; + }; + const requestInterrupt = (reason: string, triggerPhase: string): Promise => { + const activeClient = client; + const activeThreadId = threadId; + const activeTurnId = turnId; + emitEvent({ type: "backend_status", payload: { phase: "turn-interrupt-requested", reason, triggerPhase, threadId: activeThreadId ?? null, turnId: activeTurnId ?? null } }); + if (!activeClient || !activeThreadId || !activeTurnId) { + emitEvent({ type: "backend_status", payload: { phase: "turn-interrupt-unavailable", reason, triggerPhase, hasClient: Boolean(activeClient), hasThreadId: Boolean(activeThreadId), hasTurnId: Boolean(activeTurnId) } }); + activeClient?.stop(); + return Promise.resolve(); + } + return activeClient.request("turn/interrupt", { threadId: activeThreadId, turnId: activeTurnId }, controlRequestTimeoutMs) + .then(() => { + emitEvent({ type: "backend_status", payload: { phase: "turn/interrupt:completed", reason, triggerPhase, threadId: activeThreadId, turnId: activeTurnId } }); + }) + .catch((error) => { + const failure = normalizeFailure(error); + emitEvent({ type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase: "turn/interrupt:failed", triggerPhase, details: failure.details } }); + }) + .finally(() => { + if (stopAfterInterrupt && !activeClient.isClosed) activeClient.stop(); + }); + }; + const beginInterruptAndStop = (reason: string, triggerPhase: string): void => { + stopAfterInterrupt = true; + if (!interruptInFlight) interruptInFlight = requestInterrupt(reason, triggerPhase); + else interruptInFlight = interruptInFlight.then(() => requestInterrupt(reason, triggerPhase)); + }; const abortTurn = (): void => { if (terminal) return; terminal = { status: "cancelled", failureKind: "cancelled", message: "cancel requested" }; emitEvent({ type: "backend_status", payload: { phase: "turn-cancelled", failureKind: "cancelled" } }); - client?.stop(); + beginInterruptAndStop("cancel requested", "abort-signal"); terminalResolve(); }; options.abortSignal?.addEventListener("abort", abortTurn, { once: true }); const turnIdleTimeoutMs = positiveTimeout(options.timeoutMs); + let hardTimeout: NodeJS.Timeout | null = null; let idleTimeout: NodeJS.Timeout | null = null; + hardTimeout = setTimeout(() => { + if (terminal) return; + terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn hard timed out after ${turnIdleTimeoutMs}ms` }; + emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:hard-timeout" } }); + beginInterruptAndStop("hard timeout", "turn:hard-timeout"); + terminalResolve(); + }, turnIdleTimeoutMs); const refreshTurnActivity = (): void => { if (terminal) return; if (idleTimeout) clearTimeout(idleTimeout); @@ -440,7 +499,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess if (terminal) return; terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn idle timed out after ${turnIdleTimeoutMs}ms without activity` }; emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:idle-timeout" } }); - client?.stop(); + beginInterruptAndStop("idle timeout", "turn:idle-timeout"); terminalResolve(); }, turnIdleTimeoutMs); }; @@ -449,12 +508,18 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess clearTimeout(idleTimeout); idleTimeout = null; }; + const stopTurnHardTimeout = (): void => { + if (!hardTimeout) return; + clearTimeout(hardTimeout); + hardTimeout = null; + }; refreshTurnActivity(); const stopNotifications = session.addNotificationHandler((message) => { refreshTurnActivity(); const normalized = normalizeCodexNotification(message, suppressedNotifications); if (normalized.threadId) threadId = normalized.threadId; if (normalized.turnId) turnId = normalized.turnId; + exposeActiveTurn(normalized.turnId ? "turn-notification" : "notification"); emitEvents(normalized.events); if (normalized.assistantDelta) { assistantText += normalized.assistantDelta.text; @@ -511,30 +576,28 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess const promptInjection = initialPromptInjection(options.initialPrompt, willResumeThread); emitEvent({ type: "backend_status", payload: { phase: "initial-prompt-assembly", initialPromptInjected: promptInjection.injected, reason: promptInjection.reason, initialPrompt: options.initialPrompt?.summary ?? { available: false, valuesPrinted: false }, valuesPrinted: false } }); - const turnResponse = requireResponseRecord(await client.request("turn/start", withOptionalModel({ threadId, input: textInputForUserMessage(options.prompt, promptInjection), cwd: options.cwd, approvalPolicy: options.approvalPolicy }, options.model), requestTimeoutMs), "turn/start"); - turnId = requireNestedId(turnResponse, "turn/start", "turn"); - emitEvent({ type: "backend_status", payload: { phase: "turn/start:completed", turnId } }); - if (threadId && turnId && options.onActiveTurn) { - const maybeStop = options.onActiveTurn({ - threadId, - turnId, - steer: async (prompt: string) => { - await client!.request("turn/steer", { threadId: threadId!, expectedTurnId: turnId!, input: textInput(prompt) }, requestTimeoutMs); - }, - interrupt: async () => { - await client!.request("turn/interrupt", { threadId: threadId!, turnId: turnId! }, requestTimeoutMs); - }, - }); - if (typeof maybeStop === "function") stopActiveTurn = maybeStop; + const turnStart = await Promise.race([ + client.request("turn/start", withOptionalModel({ threadId, input: textInputForUserMessage(options.prompt, promptInjection), cwd: options.cwd, approvalPolicy: options.approvalPolicy }, options.model), requestTimeoutMs).then((response) => ({ kind: "response" as const, response })), + terminalPromise.then(() => ({ kind: "terminal" as const })), + ]); + if (turnStart.kind === "response") { + const turnResponse = requireResponseRecord(turnStart.response, "turn/start"); + turnId = requireNestedId(turnResponse, "turn/start", "turn"); + emitEvent({ type: "backend_status", payload: { phase: "turn/start:completed", turnId } }); + exposeActiveTurn("turn-start-response"); + } else { + emitEvent({ type: "backend_status", payload: { phase: "turn/start:interrupted-before-response", threadId: threadId ?? null, turnId: turnId ?? null } }); } - const race = await Promise.race([ - terminalPromise.then(() => ({ kind: "terminal" as const })), - client.closedPromise.then((closeInfo) => ({ kind: "closed" as const, closeInfo })), - ]); - if (race.kind === "closed" && !terminal) { - terminal = terminalFromClose(race.closeInfo); - emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "transport:closed-before-terminal", appServerExit: closeEvent(race.closeInfo) } }); + if (!terminal) { + const race = await Promise.race([ + terminalPromise.then(() => ({ kind: "terminal" as const })), + client.closedPromise.then((closeInfo) => ({ kind: "closed" as const, closeInfo })), + ]); + if (race.kind === "closed" && !terminal) { + terminal = terminalFromClose(race.closeInfo); + emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "transport:closed-before-terminal", appServerExit: closeEvent(race.closeInfo) } }); + } } if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server did not emit turn/completed" }; } catch (error) { @@ -548,8 +611,11 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess stopNotifications(); options.abortSignal?.removeEventListener("abort", abortTurn); stopTurnIdleTimeout(); + stopTurnHardTimeout(); } if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server finished without terminal status" }; + const pendingInterrupt: Promise | null = interruptInFlight as Promise | null; + if (pendingInterrupt) await pendingInterrupt.catch(() => undefined); if (terminal.status !== "completed") emitEvents(await session.close()); emitEvents(flushAssistantDeltaProgress(assistantDeltaProgress)); if (completedAssistantMessages.length === 0) emitEvents(assistantMessageEventsForTurn(assistantText, terminal.status === "completed")); @@ -754,7 +820,7 @@ function normalizeCodexNotification(message: JsonRecord, suppressed: SuppressedN const status = terminalStatusFromValue(turn.status); const error = asRecordAt(turn, "error"); const messageText = typeof error.message === "string" ? redactText(error.message) : null; - const failureKind = status === "completed" ? null : classifyCodexErrorRecord(Object.keys(error).length > 0 ? error : { message: turn.status }, "backend-failed"); + const failureKind = status === "completed" ? null : status === "cancelled" ? "cancelled" : classifyCodexErrorRecord(Object.keys(error).length > 0 ? error : { message: turn.status }, "backend-failed"); const events: BackendEvent[] = [{ type: "backend_status", payload: { phase: method, terminalStatus: status } }]; if (failureKind) events.push({ type: "error", payload: { failureKind, error: redactJson(error), phase: method } }); return { events, terminal: { status, failureKind, message: messageText } }; diff --git a/src/runner/run-once.ts b/src/runner/run-once.ts index c9b5303..6db5bba 100644 --- a/src/runner/run-once.ts +++ b/src/runner/run-once.ts @@ -199,7 +199,7 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, await appendBestEffort(api, options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id)); }, onActiveTurn: (control: BackendActiveTurnControl) => { - stopSteerWatch = startSteerWatch(api, options.runId, command.id, attemptId, runner.id, control, options.pollIntervalMs); + stopSteerWatch = startActiveTurnCommandWatch(api, options.runId, command.id, attemptId, runner.id, control, options.pollIntervalMs); return () => { stopSteerWatch?.(); stopSteerWatch = undefined; @@ -224,7 +224,7 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, } } -function startSteerWatch(api: RunnerManagerApi, runId: string, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl, pollIntervalMs: number | undefined): () => void { +function startActiveTurnCommandWatch(api: RunnerManagerApi, runId: string, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl, pollIntervalMs: number | undefined): () => void { let stopped = false; let polling = false; const seen = new Set(); @@ -234,13 +234,14 @@ function startSteerWatch(api: RunnerManagerApi, runId: string, targetCommandId: polling = true; try { const commands = await api.listCommands(runId, { afterSeq: 0, limit: 100 }); - const pendingSteer = commands.filter((item) => item.type === "steer" && item.state === "pending" && !seen.has(item.id)); - for (const steerCommand of pendingSteer) { - seen.add(steerCommand.id); - await handleSteerCommand(api, runId, steerCommand, targetCommandId, attemptId, runnerId, control); + const pendingControlCommands = commands.filter((item) => (item.type === "steer" || item.type === "interrupt") && item.state === "pending" && !seen.has(item.id)); + for (const controlCommand of pendingControlCommands) { + seen.add(controlCommand.id); + if (controlCommand.type === "interrupt") await handleInterruptCommand(api, runId, controlCommand, targetCommandId, attemptId, runnerId, control); + else await handleSteerCommand(api, runId, controlCommand, targetCommandId, attemptId, runnerId, control); } } catch { - // The active backend turn remains authoritative; missed steer commands stay pending for the next poll. + // The active backend turn remains authoritative; missed control commands stay pending for the next poll. } finally { polling = false; } @@ -275,11 +276,29 @@ async function handleSteerCommand(api: RunnerManagerApi, runId: string, command: } } +async function handleInterruptCommand(api: RunnerManagerApi, runId: string, command: CommandRecord, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl): Promise { + const acked = await api.ackCommand(command.id); + if (acked.state === "cancelled") { + await api.reportCommandStatus(command.id, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: "interrupt command cancelled before delivery", threadId: control.threadId, turnId: control.turnId }); + return; + } + const reason = interruptReason(command.payload); + await appendBestEffort(api, runId, { type: "backend_status", payload: { phase: "interrupt-command-acknowledged", commandId: command.id, commandType: "interrupt", targetCommandId, attemptId, runnerId, threadId: control.threadId, turnId: control.turnId, reason } }); + try { + await control.interrupt(); + await appendBestEffort(api, runId, { type: "backend_status", payload: { phase: "turn/interrupt:completed", commandId: command.id, commandType: "interrupt", targetCommandId, attemptId, runnerId, threadId: control.threadId, turnId: control.turnId, reason } }); + await api.reportCommandStatus(command.id, { terminalStatus: "completed", failureKind: null, failureMessage: null, threadId: control.threadId, turnId: control.turnId }); + } catch (error) { + const failureKind = failureKindFromError(error); + await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: terminalStatusForFailure(failureKind), failureKind, message: errorMessage(error) }, "runner:interrupt", control); + } +} + async function failPendingSteerWithoutActiveTurn(api: RunnerManagerApi, runId: string, commands: CommandRecord[], runnerId: string, attemptId: string): Promise { - for (const command of commands.filter((item) => item.type === "steer" && item.state === "pending")) { + for (const command of commands.filter((item) => (item.type === "steer" || item.type === "interrupt") && item.state === "pending")) { const acked = await api.ackCommand(command.id); if (acked.state === "cancelled") continue; - await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: "blocked", failureKind: "schema-invalid", message: "steer command requires an active turn" }, "runner:steer:no-active-turn"); + await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: "blocked", failureKind: "schema-invalid", message: `${command.type} command requires an active turn` }, `runner:${command.type}:no-active-turn`); } } @@ -296,6 +315,11 @@ function steerPrompt(payload: JsonRecord): string | null { return null; } +function interruptReason(payload: JsonRecord): string | null { + const value = payload.reason ?? payload.message ?? payload.prompt ?? payload.text; + return typeof value === "string" && value.trim().length > 0 ? value.trim() : null; +} + function isTerminalRun(run: RunRecord): boolean { return run.status === "completed" || run.status === "failed" || run.status === "blocked" || run.status === "cancelled"; } diff --git a/src/selftest/cases/30-codex-stdio.ts b/src/selftest/cases/30-codex-stdio.ts index 66b3af5..e2a1aba 100644 --- a/src/selftest/cases/30-codex-stdio.ts +++ b/src/selftest/cases/30-codex-stdio.ts @@ -181,6 +181,9 @@ const selfTest: SelfTestCase = async (context) => { const liveResult = await livePromise; assert.equal(liveResult.terminalStatus, "completed", "slow live tool event turn should complete"); + await runInterruptBeforeTurnStartResponseCase({ client, managerUrl: server.baseUrl, context }); + await runHardTimeoutDuringToolProgressCase({ client, managerUrl: server.baseUrl, context }); + const noisy = await createRunWithCommand(client, context, "hello noisy reasoning", "selftest-noisy-reasoning-events", 15_000); const noisyResult = await runOnce({ managerUrl: server.baseUrl, runId: noisy.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "noisy-reasoning-events" }, oneShot: true }) as JsonRecord; assert.equal(noisyResult.terminalStatus, "completed", "noisy reasoning turn should complete"); @@ -239,7 +242,7 @@ const selfTest: SelfTestCase = async (context) => { await runSessionStorageSubdirCase({ client, managerUrl: server.baseUrl, context }); await runSessionStorageNoSecretLeakCase({ client, managerUrl: server.baseUrl, context }); - return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "runner-lease-conflict-recovery", "codex-stdio-fake-turn", "codex-stdio-k8s-sandbox-override", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-dsflash-go-profile-fake-turn", "codex-stdio-dsflash-go-config-metadata", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-web-search-progress", "codex-stdio-stale-thread-resume-failed", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-invalid-tool-call", "codex-stdio-provider-compact-unsupported", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-idle-timeout-progress-refresh", "codex-stdio-command-failure-keeps-run-open", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] }; + return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "runner-lease-conflict-recovery", "codex-stdio-fake-turn", "codex-stdio-k8s-sandbox-override", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-dsflash-go-profile-fake-turn", "codex-stdio-dsflash-go-config-metadata", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-web-search-progress", "codex-stdio-stale-thread-resume-failed", "codex-stdio-live-tool-events", "codex-stdio-interrupt-before-turn-start-response", "codex-stdio-hard-timeout-during-tool-progress", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-invalid-tool-call", "codex-stdio-provider-compact-unsupported", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-idle-timeout-progress-refresh", "codex-stdio-command-failure-keeps-run-open", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] }; } finally { await new Promise((resolve) => server.server.close(() => resolve())); } @@ -281,7 +284,7 @@ async function runLeaseConflictRecoveryCase(options: { client: ManagerClient; ma } async function runSlowProgressIdleCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise { - const item = await createRunWithCommand(options.client, options.context, "slow progress before terminal", "selftest-slow-progress-idle-refresh", 60); + const item = await createRunWithCommand(options.client, options.context, "slow progress before terminal", "selftest-slow-progress-idle-refresh", 250); const result = await runOnce({ managerUrl: options.managerUrl, runId: item.runId, @@ -296,6 +299,55 @@ async function runSlowProgressIdleCase(options: { client: ManagerClient; manager assert.equal(events.items?.some((event) => event.type === "error" && eventPayload(event).failureKind === "backend-timeout"), false, "progressing turns must not fail on total elapsed time"); } +async function runInterruptBeforeTurnStartResponseCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise { + const item = await createRunWithCommand(options.client, options.context, "interrupt hanging tool before turn/start response", "selftest-interrupt-before-turn-start-response", 1_000); + const runPromise = runOnce({ + managerUrl: options.managerUrl, + runId: item.runId, + codexCommand: options.context.fakeCodexCommand, + codexArgs: options.context.fakeCodexArgs, + codexHome: options.context.codexHome, + env: { CODEX_HOME: options.context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "tool-hangs-before-turn-start-response" }, + oneShot: true, + }) as Promise; + await waitForEvent(options.client, item.runId, (event) => event.type === "tool_call" && eventPayload(event).itemId === "tool_hang_before_response", "tool_call before turn/start response"); + const interrupt = await options.client.post(`/api/v1/runs/${item.runId}/commands`, { type: "interrupt", payload: { reason: "self-test interrupt hanging tool" }, idempotencyKey: "selftest-interrupt-before-response-command" }) as { id: string }; + const result = await runPromise; + assert.equal(result.terminalStatus, "cancelled", "interrupt should cancel the active turn even if turn/start response has not returned"); + assert.equal(result.failureKind, "cancelled"); + const interruptCommand = await options.client.get(`/api/v1/runs/${item.runId}/commands/${interrupt.id}`) as { state?: string }; + assert.equal(interruptCommand.state, "completed", "interrupt control command should be terminal completed after delivery"); + const events = await options.client.get(`/api/v1/runs/${item.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; + assert.ok(events.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "active-turn-control-ready"), "turn/started notification should expose active turn control before turn/start response"); + assert.ok(events.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "interrupt-command-acknowledged"), "interrupt command acknowledgement should be visible"); + assert.ok(events.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "turn/interrupt:completed"), "interrupt delivery result should be visible"); + assert.ok(events.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "turn/start:interrupted-before-response"), "turn/start pending request should be bypassed after interrupt terminal"); + assertNoSecretLeak({ result, events }); +} + +async function runHardTimeoutDuringToolProgressCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise { + const item = await createRunWithCommand(options.client, options.context, "hard timeout during tool progress", "selftest-hard-timeout-tool-progress", 120); + const result = await runOnce({ + managerUrl: options.managerUrl, + runId: item.runId, + codexCommand: options.context.fakeCodexCommand, + codexArgs: options.context.fakeCodexArgs, + codexHome: options.context.codexHome, + env: { CODEX_HOME: options.context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "hard-timeout-tool-progress" }, + oneShot: true, + }) as JsonRecord; + assert.equal(result.terminalStatus, "failed", "hard timeout should fail even while the tool keeps producing progress"); + assert.equal(result.failureKind, "backend-timeout"); + const events = await options.client.get(`/api/v1/runs/${item.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; + assert.ok(events.items?.some((event) => event.type === "command_output" && String(eventPayload(event).text ?? "").includes("progress")), "progress output should be visible before hard timeout"); + assert.ok(events.items?.some((event) => event.type === "error" && eventPayload(event).phase === "turn:hard-timeout"), "hard timeout should be recorded as an error event"); + assert.ok(events.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "turn-interrupt-requested"), "timeout should request backend interrupt before process teardown"); + assert.ok(events.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "turn/interrupt:completed"), "timeout interrupt result should be visible"); + const command = await options.client.get(`/api/v1/runs/${item.runId}/commands/${item.commandId}`) as { state?: string }; + assert.equal(command.state, "failed", "hard timed out command should be failed"); + assertNoSecretLeak({ result, events }); +} + async function runFailureDoesNotTerminalRunCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise { const item = await createRunWithCommand(options.client, options.context, "first command fails", "selftest-command-failure-keeps-run-open", 3_000); const result = await runOnce({ diff --git a/src/selftest/fake-codex-app-server.ts b/src/selftest/fake-codex-app-server.ts index 5c82798..28be56b 100644 --- a/src/selftest/fake-codex-app-server.ts +++ b/src/selftest/fake-codex-app-server.ts @@ -224,6 +224,29 @@ for await (const line of rl) { }, 50); continue; } + if (mode === "tool-hangs-before-turn-start-response") { + turnCounter += 1; + const turn = { id: `turn_selftest_${turnCounter}`, status: "running" }; + notify("turn/started", { turn }); + notify("item/started", { item: { id: "tool_hang_before_response", type: "commandExecution", command: "hwpod cmd git clone", status: "running", processId: process.pid } }); + notify("item/commandExecution/outputDelta", { itemId: "tool_hang_before_response", delta: "clone started\n" }); + activeSteerTurn = { id: turn.id, completed: false, timer: setTimeout(() => undefined, 60_000) }; + continue; + } + if (mode === "hard-timeout-tool-progress") { + turnCounter += 1; + const turn = { id: `turn_selftest_${turnCounter}`, status: "running" }; + notify("turn/started", { turn }); + notify("item/started", { item: { id: "tool_hard_timeout", type: "commandExecution", command: "hwpod cmd long-running", status: "running", processId: process.pid } }); + respond(message.id, { turn }); + activeSteerTurn = { id: turn.id, completed: false, timer: null }; + let ticks = 0; + activeSteerTurn.timer = setInterval(() => { + ticks += 1; + notify("item/commandExecution/outputDelta", { itemId: "tool_hard_timeout", delta: `progress ${ticks}\n` }); + }, 25); + continue; + } if (mode === "steer-waits") { turnCounter += 1; const turn = { id: `turn_selftest_${turnCounter}`, status: "running" }; @@ -276,6 +299,16 @@ for await (const line of rl) { setTimeout(() => completeActiveSteerTurn("steer-applied"), 20); continue; } + if (message.method === "turn/interrupt") { + if ((mode !== "tool-hangs-before-turn-start-response" && mode !== "hard-timeout-tool-progress" && mode !== "steer-waits") || !activeSteerTurn) { + respond(message.id, null, { code: -32000, message: "no active fake turn for interrupt" }); + continue; + } + notify("item/completed", { item: { id: "tool_interrupted", type: "commandExecution", command: "hwpod cmd interrupted", status: "cancelled" } }); + respond(message.id, { interrupted: true }); + setTimeout(() => completeActiveSteerTurn("interrupt-applied", "cancelled"), 20); + continue; + } respond(message.id, null, { code: -32601, message: `unsupported fake method ${message.method ?? "unknown"}` }); } @@ -288,11 +321,11 @@ function notify(method: string, params: unknown): void { process.stdout.write(`${JSON.stringify({ method, params })}\n`); } -function completeActiveSteerTurn(reason: string): void { +function completeActiveSteerTurn(reason: string, status = "completed"): void { if (!activeSteerTurn || activeSteerTurn.completed) return; activeSteerTurn.completed = true; if (activeSteerTurn.timer) clearTimeout(activeSteerTurn.timer); - const turn = { id: activeSteerTurn.id, status: "completed", reason }; + const turn = { id: activeSteerTurn.id, status, reason }; notify("turn/completed", { turn }); }