From e1c0fb524509664531e388072c10217adaa793eb Mon Sep 17 00:00:00 2001 From: Lyon <88232613+pikasTech@users.noreply.github.com> Date: Tue, 2 Jun 2026 10:48:28 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=81=A2=E5=A4=8D=20stale=20codex=20thr?= =?UTF-8?q?ead=20(#71)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Codex --- docs/reference/spec-v01-backend-codex.md | 8 +++++-- src/backend/codex-stdio.ts | 29 +++++++++++++++++++++--- src/selftest/cases/30-codex-stdio.ts | 17 +++++++------- 3 files changed, 41 insertions(+), 13 deletions(-) diff --git a/docs/reference/spec-v01-backend-codex.md b/docs/reference/spec-v01-backend-codex.md index 4647e5e..0989c24 100644 --- a/docs/reference/spec-v01-backend-codex.md +++ b/docs/reference/spec-v01-backend-codex.md @@ -18,7 +18,7 @@ Codex stdio backend 是 AgentRun `v0.1` 的第一真实 Code Agent backend kind codex app-server --listen stdio:// ``` -Adapter 通过 stdin 写入换行分隔 JSON-RPC 请求,通过 stdout 逐行读取 JSON-RPC response 和 notification,stderr 只作为有界诊断日志。最小请求序列是 `initialize`、`thread/start` 或 `thread/resume`、`turn/start`;response 中必须提取 thread/turn identity,notification 和后续输出必须归一化为 `backend_status`、`assistant_message`、`tool_call`、`command_output`、`error` 和 `terminal_status` events。运行中 steer 使用同一 app-server 进程的 `turn/steer` JSON-RPC 方法,参数为 `threadId`、`expectedTurnId` 和文本 `input` 数组;取消/中断使用 `turn/interrupt`,参数为 `threadId` 和 `turnId`。 +Adapter 通过 stdin 写入换行分隔 JSON-RPC 请求,通过 stdout 逐行读取 JSON-RPC response 和 notification,stderr 只作为有界诊断日志。最小请求序列是 `initialize`、`thread/start` 或 `thread/resume`、`turn/start`;response 中必须提取 thread/turn identity,notification 和后续输出必须归一化为 `backend_status`、`assistant_message`、`tool_call`、`command_output`、`error` 和 `terminal_status` events。运行中 steer 使用同一 app-server 进程的 `turn/steer` JSON-RPC 方法,参数为 `threadId`、`expectedTurnId` 和文本 `input` 数组;取消/中断使用 `turn/interrupt`,参数为 `threadId` 和 `turnId`。当 `thread/resume` 返回 `no rollout found for thread id` 时,说明 durable session 指向的 Codex rollout 已不在当前 app-server 中;adapter 必须记录 `thread/resume:stale-thread-fallback`,随后执行 `thread/start` 创建 replacement thread,并由 manager 将新的 `threadId` 回写到 SessionRef。其他 `thread/resume` 错误不得 fallback,仍按 failureKind 失败。 不得把以下路径作为 `v0.1` Codex stdio backend 的正式实现或综合联调通过证据:直接 Responses HTTP 代理、OpenAI SDK wrapper、`codex exec` 一次性命令输出、fake provider、固定文本回复、只读 shortcut 或本地 shell 模拟。裸 HTTP 或 `codex exec --json` 可以作为 provider/upstream 诊断,但最终通过必须来自 app-server stdio turn。 @@ -126,13 +126,17 @@ Run 的 `executionPolicy.secretScope` 应引用与 `backendProfile` 匹配的 pr 阅读本文,然后在真实 `agentrun-v01` 运行面按顺序执行 `backendProfile=codex`、`backendProfile=deepseek`、`backendProfile=minimax-m3`、`backendProfile=codex` 四个最短 turn。确认第二个 run 使用 DeepSeek profile,第三个 run 使用 MiniMax-M3 profile,前后两个 `codex` run 仍使用原 Codex profile;四者的 event、log、backend_status、model/upstream metadata 和 failureKind 不互相污染,且任何一个 profile SecretRef 缺失都不会 fallback 到另一个 profile。 +### T7 Stale thread recovery + +阅读本文和 [spec-v01-runtime-assembly.md](spec-v01-runtime-assembly.md),然后构造一个带旧 `SessionRef.threadId` 的真实或 fake app-server run,使 `thread/resume` 返回 `no rollout found for thread id`。确认 adapter 不把该缺失 rollout 直接作为终态失败,而是记录 `thread/resume:stale-thread-fallback`,执行 `thread/start`,完成当前 turn,并在 result/sessionRef 中回写新的 `threadId`。确认 provider auth、rate limit、model config 或其他 protocol error 不走该 fallback。 + ## 规格的实现情况 | 规格项 | 状态 | 说明 | | --- | --- | --- | | Codex stdio backend/profile 规格 | 已定义 | 本文为 v0.1 Codex app-server stdio backend kind 和 profile 权威。 | | Codex Secret projection | 已实现/已通过主闭环 | runner Job 使用只读 Secret projection 和 writable `CODEX_HOME`,Codex 测试凭据来自 `agentrun-v01-provider-codex` 的 `auth.json`/`config.toml`。 | -| Codex adapter | 已实现/已通过主闭环 | 当前代码已实现受控 `codex app-server --listen stdio://`、`initialize`/`thread/start`/`thread/resume`/`turn/start` response 校验、stderr 有界诊断、spawn/JSON parse/response invalid/timeout/provider 5xx availability failureKind,以及包含 retry error notification 的 fake app-server 自测试。 | +| Codex adapter | 已实现/已通过主闭环 | 当前代码已实现受控 `codex app-server --listen stdio://`、`initialize`/`thread/start`/`thread/resume`/`turn/start` response 校验、stale rollout thread fallback、stderr 有界诊断、spawn/JSON parse/response invalid/timeout/provider 5xx availability failureKind,以及包含 retry error notification 的 fake app-server 自测试。 | | 错误可观测与脱敏 | 已实现主路径 | child env、cwd、workspace 和 Codex home 只输出摘要;stderr tail 有界且标记截断;事件和 failure 统一走 redaction。 | | 真实 provider turn | 已通过主闭环 | 真实 Codex provider turn 已经通过 RESTful API 和 CLI 综合联调;每次发布仍按 [spec-v01-validation.md](spec-v01-validation.md) 手动复验。 | | `deepseek` profile | 已实现/已通过主闭环 | 代码已支持 `agentrun-v01-provider-deepseek`、独立 `CODEX_HOME`、同一 `codex app-server --listen stdio://` 协议和 profile metadata;真实 Kubernetes SecretRef、runner Job 和 Codex stdio turn 已通过主闭环。 | diff --git a/src/backend/codex-stdio.ts b/src/backend/codex-stdio.ts index a8add20..90c80ed 100644 --- a/src/backend/codex-stdio.ts +++ b/src/backend/codex-stdio.ts @@ -446,9 +446,26 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess }; if (options.threadId) { - const threadResponse = requireResponseRecord(await client.request("thread/resume", withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model), requestTimeoutMs), "thread/resume"); - threadId = requireNestedId(threadResponse, "thread/resume", "thread"); - emitEvent({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } }); + try { + const threadResponse = requireResponseRecord(await client.request("thread/resume", withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model), requestTimeoutMs), "thread/resume"); + threadId = requireNestedId(threadResponse, "thread/resume", "thread"); + emitEvent({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } }); + } catch (error) { + const failure = normalizeFailure(error); + if (!isStaleThreadResumeFailure(failure)) throw error; + emitEvent({ + type: "backend_status", + payload: { + phase: "thread/resume:stale-thread-fallback", + requestedThreadId: options.threadId, + failureKind: failure.failureKind, + message: failure.message, + fallback: "thread/start", + valuesPrinted: false, + }, + }); + threadId = await startThread("thread/start:after-stale-resume"); + } } else { threadId = await startThread(); } @@ -901,6 +918,12 @@ function normalizeFailure(error: unknown): CodexStdioFailure { return new CodexStdioFailure(classifyMessageFailureKind(message, "backend-protocol-error"), message, "codex-stdio"); } +function isStaleThreadResumeFailure(error: CodexStdioFailure): boolean { + if (error.phase !== "response:thread/resume") return false; + const text = `${error.message}\n${JSON.stringify(error.details)}`.toLowerCase(); + return text.includes("no rollout found for thread id"); +} + function classifyCodexErrorRecord(error: JsonRecord, fallback: FailureKind): FailureKind { const parts: string[] = []; if (typeof error.message === "string") parts.push(error.message); diff --git a/src/selftest/cases/30-codex-stdio.ts b/src/selftest/cases/30-codex-stdio.ts index 4a3f92b..6597049 100644 --- a/src/selftest/cases/30-codex-stdio.ts +++ b/src/selftest/cases/30-codex-stdio.ts @@ -104,14 +104,15 @@ const selfTest: SelfTestCase = async (context) => { env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "resume-no-rollout" }, oneShot: true, }) as JsonRecord; - assert.equal(staleThreadResult.terminalStatus, "failed", "standard thread resume failure must not start a replacement thread"); - assert.equal(staleThreadResult.failureKind, "backend-protocol-error"); + assert.equal(staleThreadResult.terminalStatus, "completed", "stale thread resume should start a replacement thread and complete the turn"); const staleEnvelope = await client.get(`/api/v1/runs/${staleThread.runId}/commands/${staleThread.commandId}/result`) as JsonRecord; - assert.equal(staleEnvelope.terminalStatus, "failed"); - assert.equal(staleEnvelope.completed, false); - assert.equal((staleEnvelope.sessionRef as JsonRecord).threadId, "thread_missing_rollout"); + assert.equal(staleEnvelope.terminalStatus, "completed"); + assert.equal(staleEnvelope.completed, true); + assert.equal((staleEnvelope.sessionRef as JsonRecord).threadId, "thread_selftest_1"); const staleEvents = await client.get(`/api/v1/runs/${staleThread.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; - assert.equal(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/start:completed"), false, "stale standard thread must not create a replacement thread"); + assert.equal(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/resume:stale-thread-fallback"), true, "stale resume fallback must be visible"); + assert.equal(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/start:after-stale-resume:completed"), true, "stale resume must start a replacement thread"); + assert.equal(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/resume:completed"), false, "stale resume must not be reported as a successful resume"); assertNoSecretLeak({ staleThreadResult, staleEnvelope, staleEvents }); const live = await createRunWithCommand(client, context, "hello live events", "selftest-live-tool-events", 15_000); @@ -164,7 +165,7 @@ const selfTest: SelfTestCase = async (context) => { await runSecretFailureCase({ client, managerUrl: server.baseUrl, context }); await runSpawnFailureCase({ client, managerUrl: server.baseUrl, context }); - return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fails", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] }; + return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fallback", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] }; } finally { await new Promise((resolve) => server.server.close(() => resolve())); } @@ -236,7 +237,7 @@ async function createStaleThreadRun(client: ManagerClient, context: SelfTestCont }, traceSink: null, }) as { id: string }; - const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello stale thread" }, idempotencyKey: "selftest-stale-thread-fails" }) as { id: string }; + const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello stale thread" }, idempotencyKey: "selftest-stale-thread-fallback" }) as { id: string }; return { runId: run.id, commandId: command.id }; }