From fc5171d71938e906ad3050b0a1b295cfbc7290bc Mon Sep 17 00:00:00 2001 From: Codex Date: Tue, 2 Jun 2026 01:07:56 +0800 Subject: [PATCH] fix: restart stale codex threads --- src/backend/codex-stdio.ts | 43 +++++++++++++++++++++---- src/selftest/cases/30-codex-stdio.ts | 46 ++++++++++++++++++++++++++- src/selftest/fake-codex-app-server.ts | 4 +++ 3 files changed, 85 insertions(+), 8 deletions(-) diff --git a/src/backend/codex-stdio.ts b/src/backend/codex-stdio.ts index def3ef3..871575e 100644 --- a/src/backend/codex-stdio.ts +++ b/src/backend/codex-stdio.ts @@ -330,13 +330,36 @@ export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise client.notify("initialized", {}); events.push({ type: "backend_status", payload: { phase: "initialize:completed", ...backendMetadata(options), protocol: codexProtocol } }); - const threadMethod = options.threadId ? "thread/resume" : "thread/start"; - const threadParams: JsonRecord = options.threadId - ? withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model) - : withOptionalModel({ cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" }, options.model); - const threadResponse = requireResponseRecord(await client.request(threadMethod, threadParams, requestTimeoutMs), threadMethod); - threadId = requireNestedId(threadResponse, threadMethod, "thread"); - events.push({ type: "backend_status", payload: { phase: `${threadMethod}:completed`, threadId } }); + const startThread = async (phasePrefix = "thread/start"): Promise => { + const response = requireResponseRecord(await client!.request("thread/start", withOptionalModel({ cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" }, options.model), requestTimeoutMs), "thread/start"); + const nextThreadId = requireNestedId(response, "thread/start", "thread"); + events.push({ type: "backend_status", payload: { phase: `${phasePrefix}:completed`, threadId: nextThreadId } }); + return nextThreadId; + }; + + if (options.threadId) { + try { + const threadResponse = requireResponseRecord(await client.request("thread/resume", withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model), requestTimeoutMs), "thread/resume"); + threadId = requireNestedId(threadResponse, "thread/resume", "thread"); + events.push({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } }); + } catch (error) { + const failure = normalizeFailure(error); + if (!isStaleThreadResumeFailure(failure)) throw error; + events.push({ + type: "backend_status", + payload: { + phase: "thread/resume:stale-thread-fallback", + requestedThreadId: options.threadId, + failureKind: failure.failureKind, + message: failure.message, + fallback: "thread/start", + }, + }); + threadId = await startThread("thread/start:after-stale-resume"); + } + } else { + threadId = await startThread(); + } const turnResponse = requireResponseRecord(await client.request("turn/start", withOptionalModel({ threadId, input: textInput(options.prompt), cwd: options.cwd, approvalPolicy: options.approvalPolicy }, options.model), requestTimeoutMs), "turn/start"); turnId = requireNestedId(turnResponse, "turn/start", "turn"); @@ -644,6 +667,12 @@ function normalizeFailure(error: unknown): CodexStdioFailure { return new CodexStdioFailure(classifyMessageFailureKind(message, "backend-protocol-error"), message, "codex-stdio"); } +function isStaleThreadResumeFailure(error: CodexStdioFailure): boolean { + if (error.phase !== "response:thread/resume") return false; + const text = `${error.message}\n${JSON.stringify(error.details)}`.toLowerCase(); + return /no rollout found for thread id/u.test(text); +} + function classifyCodexErrorRecord(error: JsonRecord, fallback: FailureKind): FailureKind { const parts: string[] = []; if (typeof error.message === "string") parts.push(error.message); diff --git a/src/selftest/cases/30-codex-stdio.ts b/src/selftest/cases/30-codex-stdio.ts index 5a1be40..8f321fd 100644 --- a/src/selftest/cases/30-codex-stdio.ts +++ b/src/selftest/cases/30-codex-stdio.ts @@ -68,6 +68,26 @@ const selfTest: SelfTestCase = async (context) => { assert.equal(assistantEvents.length, 1, "backend should emit one final assistant_message event"); assert.equal(eventPayload(assistantEvents[0] ?? { payload: {} }).text, "Final answer only."); + const staleResume = await createStaleThreadRun(client, context); + const staleResumeResult = await runOnce({ + managerUrl: server.baseUrl, + runId: staleResume.runId, + commandId: staleResume.commandId, + codexCommand: context.fakeCodexCommand, + codexArgs: context.fakeCodexArgs, + codexHome: context.codexHome, + env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "resume-no-rollout" }, + oneShot: true, + }) as JsonRecord; + assert.equal(staleResumeResult.terminalStatus, "completed", "stale thread resume should fall back to thread/start and complete the turn"); + const staleEnvelope = await client.get(`/api/v1/runs/${staleResume.runId}/commands/${staleResume.commandId}/result`) as JsonRecord; + assert.equal(staleEnvelope.terminalStatus, "completed"); + assert.equal((staleEnvelope.sessionRef as JsonRecord).threadId, "thread_selftest_1"); + const staleEvents = await client.get(`/api/v1/runs/${staleResume.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; + assert.ok(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/resume:stale-thread-fallback"), "stale resume fallback event should be visible"); + assert.ok(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/start:after-stale-resume:completed"), "fallback should start a fresh thread"); + assertNoSecretLeak(staleEvents); + await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "missing-turn-result", expectedStatus: "failed", expectedFailureKind: "backend-response-invalid" }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-401-rpc-error", expectedStatus: "failed", expectedFailureKind: "provider-auth-failed" }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-429-terminal", expectedStatus: "failed", expectedFailureKind: "provider-rate-limited" }); @@ -79,7 +99,7 @@ const selfTest: SelfTestCase = async (context) => { await runSecretFailureCase({ client, managerUrl: server.baseUrl, context }); await runSpawnFailureCase({ client, managerUrl: server.baseUrl, context }); - return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] }; + return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fallback", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] }; } finally { await new Promise((resolve) => server.server.close(() => resolve())); } @@ -116,6 +136,30 @@ function eventPayload(event: { payload: unknown }): JsonRecord { return typeof event.payload === "object" && event.payload !== null && !Array.isArray(event.payload) ? event.payload as JsonRecord : {}; } +async function createStaleThreadRun(client: ManagerClient, context: SelfTestContext): Promise<{ runId: string; commandId: string }> { + const run = await client.post("/api/v1/runs", { + tenantId: "unidesk", + projectId: "pikasTech/unidesk", + workspaceRef: { kind: "host-path", path: context.workspace }, + sessionRef: { sessionId: "selftest-stale-thread-session", conversationId: "selftest-stale-thread-session", threadId: "thread_missing_rollout" }, + providerId: "G14", + backendProfile: "codex", + executionPolicy: { + sandbox: "workspace-write", + approval: "never", + timeoutMs: 15_000, + network: "default", + secretScope: { + allowCredentialEcho: false, + providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: context.codexHome } }], + }, + }, + traceSink: null, + }) as { id: string }; + const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello stale thread" }, idempotencyKey: "selftest-stale-thread-fallback" }) as { id: string }; + return { runId: run.id, commandId: command.id }; +} + async function runSecretFailureCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise { const item = await createRunWithCommand(options.client, options.context, "failure missing secret files", "selftest-secret-unavailable", 3_000); const result = await runOnce({ diff --git a/src/selftest/fake-codex-app-server.ts b/src/selftest/fake-codex-app-server.ts index 14c142d..4c2584f 100644 --- a/src/selftest/fake-codex-app-server.ts +++ b/src/selftest/fake-codex-app-server.ts @@ -36,6 +36,10 @@ for await (const line of rl) { } if (message.method === "thread/resume") { observedThreadModel = Object.hasOwn(message.params ?? {}, "model"); + if (mode === "resume-no-rollout") { + respond(message.id, null, { code: -32000, message: `no rollout found for thread id ${String(message.params?.threadId ?? "unknown")}` }); + continue; + } if (mode === "reject-unexpected-model" && observedThreadModel) { respond(message.id, null, { code: -32000, message: "thread/resume unexpectedly included model" }); continue;