From 40a274d52b60f52e3cdc023128f279ace7c2259f Mon Sep 17 00:00:00 2001 From: Codex Date: Tue, 2 Jun 2026 12:08:38 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=94=B6=E6=95=9B=20stale=20thread=20?= =?UTF-8?q?=E5=92=8C=20tool-call=20=E9=94=99=E8=AF=AF=E5=BD=92=E5=9B=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/reference/spec-v01-backend-adapter.md | 2 ++ docs/reference/spec-v01-backend-codex.md | 8 ++--- src/backend/codex-stdio.ts | 34 +++++++++++++--------- src/common/types.ts | 2 ++ src/selftest/cases/30-codex-stdio.ts | 20 ++++++++----- src/selftest/fake-codex-app-server.ts | 15 ++++++++++ 6 files changed, 55 insertions(+), 26 deletions(-) diff --git a/docs/reference/spec-v01-backend-adapter.md b/docs/reference/spec-v01-backend-adapter.md index 253461b..5b41e6d 100644 --- a/docs/reference/spec-v01-backend-adapter.md +++ b/docs/reference/spec-v01-backend-adapter.md @@ -86,9 +86,11 @@ Adapter 必须把 backend 错误映射为稳定 failureKind: | `backend-protocol-error` | backend 输出无法解析、协议字段缺失。 | | `backend-json-parse-error` | backend stdout 不是合法 JSON-RPC 行。 | | `backend-response-invalid` | backend JSON-RPC response/terminal notification 缺少必需字段。 | +| `thread-resume-failed` | 已有 `SessionRef.threadId` 指向的 Codex rollout 不存在;adapter 必须失败本轮并保留原 session 指针,不得自动创建 replacement thread。 | | `backend-spawn-failed` | backend app-server 进程无法启动。 | | `backend-failed` | backend 进程非零退出或 terminal error。 | | `backend-timeout` | executionPolicy timeout 触发。 | +| `provider-invalid-tool-call` | provider / Codex app-server 返回无效 tool-call arguments JSON,例如 `invalid_prompt` 与 `invalid function arguments json string`。 | | `cancelled` | interrupt/cancel 生效。 | ## Credential Boundary diff --git a/docs/reference/spec-v01-backend-codex.md b/docs/reference/spec-v01-backend-codex.md index 0989c24..9ae1ca3 100644 --- a/docs/reference/spec-v01-backend-codex.md +++ b/docs/reference/spec-v01-backend-codex.md @@ -18,7 +18,7 @@ Codex stdio backend 是 AgentRun `v0.1` 
的第一真实 Code Agent backend kind codex app-server --listen stdio:// ``` -Adapter 通过 stdin 写入换行分隔 JSON-RPC 请求,通过 stdout 逐行读取 JSON-RPC response 和 notification,stderr 只作为有界诊断日志。最小请求序列是 `initialize`、`thread/start` 或 `thread/resume`、`turn/start`;response 中必须提取 thread/turn identity,notification 和后续输出必须归一化为 `backend_status`、`assistant_message`、`tool_call`、`command_output`、`error` 和 `terminal_status` events。运行中 steer 使用同一 app-server 进程的 `turn/steer` JSON-RPC 方法,参数为 `threadId`、`expectedTurnId` 和文本 `input` 数组;取消/中断使用 `turn/interrupt`,参数为 `threadId` 和 `turnId`。当 `thread/resume` 返回 `no rollout found for thread id` 时,说明 durable session 指向的 Codex rollout 已不在当前 app-server 中;adapter 必须记录 `thread/resume:stale-thread-fallback`,随后执行 `thread/start` 创建 replacement thread,并由 manager 将新的 `threadId` 回写到 SessionRef。其他 `thread/resume` 错误不得 fallback,仍按 failureKind 失败。 +Adapter 通过 stdin 写入换行分隔 JSON-RPC 请求,通过 stdout 逐行读取 JSON-RPC response 和 notification,stderr 只作为有界诊断日志。最小请求序列是 `initialize`、`thread/start` 或 `thread/resume`、`turn/start`;response 中必须提取 thread/turn identity,notification 和后续输出必须归一化为 `backend_status`、`assistant_message`、`tool_call`、`command_output`、`error` 和 `terminal_status` events。运行中 steer 使用同一 app-server 进程的 `turn/steer` JSON-RPC 方法,参数为 `threadId`、`expectedTurnId` 和文本 `input` 数组;取消/中断使用 `turn/interrupt`,参数为 `threadId` 和 `turnId`。当已有 `SessionRef.threadId` 的 `thread/resume` 返回 `no rollout found for thread id` 时,说明 durable session 指向的 Codex rollout 已不在当前 app-server 中;adapter 必须以 `thread-resume-failed` 失败本轮,保留原 session 指针供上层重新建会话或显式清理。adapter 不得自动执行 `thread/start` 创建 replacement thread,也不得把旧 thread 问题伪装成成功 resume;provider auth、rate limit、model config 或其他 protocol error 继续按各自 failureKind 失败。 不得把以下路径作为 `v0.1` Codex stdio backend 的正式实现或综合联调通过证据:直接 Responses HTTP 代理、OpenAI SDK wrapper、`codex exec` 一次性命令输出、fake provider、固定文本回复、只读 shortcut 或本地 shell 模拟。裸 HTTP 或 `codex exec --json` 可以作为 provider/upstream 诊断,但最终通过必须来自 app-server stdio turn。 @@ -126,9 +126,9 @@ Run 的 
`executionPolicy.secretScope` 应引用与 `backendProfile` 匹配的 pr 阅读本文,然后在真实 `agentrun-v01` 运行面按顺序执行 `backendProfile=codex`、`backendProfile=deepseek`、`backendProfile=minimax-m3`、`backendProfile=codex` 四个最短 turn。确认第二个 run 使用 DeepSeek profile,第三个 run 使用 MiniMax-M3 profile,前后两个 `codex` run 仍使用原 Codex profile;四者的 event、log、backend_status、model/upstream metadata 和 failureKind 不互相污染,且任何一个 profile SecretRef 缺失都不会 fallback 到另一个 profile。 -### T7 Stale thread recovery +### T7 Stale thread resume failure -阅读本文和 [spec-v01-runtime-assembly.md](spec-v01-runtime-assembly.md),然后构造一个带旧 `SessionRef.threadId` 的真实或 fake app-server run,使 `thread/resume` 返回 `no rollout found for thread id`。确认 adapter 不把该缺失 rollout 直接作为终态失败,而是记录 `thread/resume:stale-thread-fallback`,执行 `thread/start`,完成当前 turn,并在 result/sessionRef 中回写新的 `threadId`。确认 provider auth、rate limit、model config 或其他 protocol error 不走该 fallback。 +阅读本文和 [spec-v01-runtime-assembly.md](spec-v01-runtime-assembly.md),然后构造一个带旧 `SessionRef.threadId` 的真实或 fake app-server run,使 `thread/resume` 返回 `no rollout found for thread id`。确认 adapter 以 `thread-resume-failed` 失败本轮,事件中包含 `error.failureKind=thread-resume-failed`,result/sessionRef 仍保留原 `threadId`,且不会出现任何 `thread/start` 事件。确认 provider auth、rate limit、model config 或其他 protocol error 也按各自 failureKind 直接失败。 ## 规格的实现情况 @@ -136,7 +136,7 @@ Run 的 `executionPolicy.secretScope` 应引用与 `backendProfile` 匹配的 pr | --- | --- | --- | | Codex stdio backend/profile 规格 | 已定义 | 本文为 v0.1 Codex app-server stdio backend kind 和 profile 权威。 | | Codex Secret projection | 已实现/已通过主闭环 | runner Job 使用只读 Secret projection 和 writable `CODEX_HOME`,Codex 测试凭据来自 `agentrun-v01-provider-codex` 的 `auth.json`/`config.toml`。 | -| Codex adapter | 已实现/已通过主闭环 | 当前代码已实现受控 `codex app-server --listen stdio://`、`initialize`/`thread/start`/`thread/resume`/`turn/start` response 校验、stale rollout thread fallback、stderr 有界诊断、spawn/JSON parse/response invalid/timeout/provider 5xx availability failureKind,以及包含 retry error notification 的 fake 
app-server 自测试。 | +| Codex adapter | 已实现/已通过主闭环 | 当前代码已实现受控 `codex app-server --listen stdio://`、`initialize`/`thread/start`/`thread/resume`/`turn/start` response 校验、stale rollout thread 失败归因、stderr 有界诊断、spawn/JSON parse/response invalid/timeout/provider 5xx availability/provider invalid tool-call failureKind,以及包含 retry error notification 的 fake app-server 自测试。 | | 错误可观测与脱敏 | 已实现主路径 | child env、cwd、workspace 和 Codex home 只输出摘要;stderr tail 有界且标记截断;事件和 failure 统一走 redaction。 | | 真实 provider turn | 已通过主闭环 | 真实 Codex provider turn 已经通过 RESTful API 和 CLI 综合联调;每次发布仍按 [spec-v01-validation.md](spec-v01-validation.md) 手动复验。 | | `deepseek` profile | 已实现/已通过主闭环 | 代码已支持 `agentrun-v01-provider-deepseek`、独立 `CODEX_HOME`、同一 `codex app-server --listen stdio://` 协议和 profile metadata;真实 Kubernetes SecretRef、runner Job 和 Codex stdio turn 已通过主闭环。 | diff --git a/src/backend/codex-stdio.ts b/src/backend/codex-stdio.ts index bd630c7..20f3f74 100644 --- a/src/backend/codex-stdio.ts +++ b/src/backend/codex-stdio.ts @@ -452,19 +452,8 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess emitEvent({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } }); } catch (error) { const failure = normalizeFailure(error); - if (!isStaleThreadResumeFailure(failure)) throw error; - emitEvent({ - type: "backend_status", - payload: { - phase: "thread/resume:stale-thread-fallback", - requestedThreadId: options.threadId, - failureKind: failure.failureKind, - message: failure.message, - fallback: "thread/start", - valuesPrinted: false, - }, - }); - threadId = await startThread("thread/start:after-stale-resume"); + if (!isMissingRolloutThreadResumeFailure(failure)) throw error; + throw threadResumeFailure(options.threadId, failure); } } else { threadId = await startThread(); @@ -920,12 +909,27 @@ function normalizeFailure(error: unknown): CodexStdioFailure { return new CodexStdioFailure(classifyMessageFailureKind(message, "backend-protocol-error"), message, 
"codex-stdio"); } -function isStaleThreadResumeFailure(error: CodexStdioFailure): boolean { +function isMissingRolloutThreadResumeFailure(error: CodexStdioFailure): boolean { if (error.phase !== "response:thread/resume") return false; const text = `${error.message}\n${JSON.stringify(error.details)}`.toLowerCase(); return text.includes("no rollout found for thread id"); } +function threadResumeFailure(threadId: string, error: CodexStdioFailure): CodexStdioFailure { + return new CodexStdioFailure( + "thread-resume-failed", + `codex app-server thread/resume failed for existing thread: ${error.message}`, + "thread/resume", + { + requestedThreadId: threadId, + originalFailureKind: error.failureKind, + originalPhase: error.phase, + originalDetails: redactJson(error.details), + valuesPrinted: false, + }, + ); +} + function classifyCodexErrorRecord(error: JsonRecord, fallback: FailureKind): FailureKind { const parts: string[] = []; if (typeof error.message === "string") parts.push(error.message); @@ -937,6 +941,8 @@ function classifyCodexErrorRecord(error: JsonRecord, fallback: FailureKind): Fai function classifyMessageFailureKind(message: string, fallback: FailureKind): FailureKind { const text = String(message || "").toLowerCase(); + if (/invalid[_ -]?prompt/u.test(text) && /invalid function arguments json string|tool_call_id/u.test(text)) return "provider-invalid-tool-call"; + if (/invalid function arguments json string/u.test(text)) return "provider-invalid-tool-call"; if (/rate.?limit|too many requests|\b429\b/u.test(text)) return "provider-rate-limited"; if (/\b401\b|\b403\b|unauthori[sz]ed|forbidden|invalid api key|authentication|auth failed|oauth|access token/u.test(text)) return "provider-auth-failed"; if (isProviderUnavailableMessage(text)) return "provider-unavailable"; diff --git a/src/common/types.ts b/src/common/types.ts index 4ea38a1..40aa093 100644 --- a/src/common/types.ts +++ b/src/common/types.ts @@ -12,9 +12,11 @@ export type FailureKind = | 
"backend-spawn-failed" | "backend-json-parse-error" | "backend-response-invalid" + | "thread-resume-failed" | "backend-timeout" | "provider-auth-failed" | "provider-rate-limited" + | "provider-invalid-tool-call" | "provider-unavailable" | "infra-failed" | "cancelled"; diff --git a/src/selftest/cases/30-codex-stdio.ts b/src/selftest/cases/30-codex-stdio.ts index bc46104..c32b165 100644 --- a/src/selftest/cases/30-codex-stdio.ts +++ b/src/selftest/cases/30-codex-stdio.ts @@ -105,15 +105,18 @@ const selfTest: SelfTestCase = async (context) => { env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "resume-no-rollout" }, oneShot: true, }) as JsonRecord; - assert.equal(staleThreadResult.terminalStatus, "completed", "stale thread resume should start a replacement thread and complete the turn"); + assert.equal(staleThreadResult.terminalStatus, "failed", "stale thread resume must fail instead of creating a replacement thread"); + assert.equal(staleThreadResult.failureKind, "thread-resume-failed", "stale thread resume failure kind"); const staleEnvelope = await client.get(`/api/v1/runs/${staleThread.runId}/commands/${staleThread.commandId}/result`) as JsonRecord; - assert.equal(staleEnvelope.terminalStatus, "completed"); - assert.equal(staleEnvelope.completed, true); - assert.equal((staleEnvelope.sessionRef as JsonRecord).threadId, "thread_selftest_1"); + assert.equal(staleEnvelope.terminalStatus, "failed"); + assert.equal(staleEnvelope.failureKind, "thread-resume-failed"); + assert.equal(staleEnvelope.completed, false); + assert.equal((staleEnvelope.sessionRef as JsonRecord).threadId, "thread_missing_rollout"); const staleEvents = await client.get(`/api/v1/runs/${staleThread.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; - assert.equal(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/resume:stale-thread-fallback"), true, "stale resume fallback must be 
visible"); - assert.equal(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/start:after-stale-resume:completed"), true, "stale resume must start a replacement thread"); + const stalePhases = (staleEvents.items ?? []).filter((event) => event.type === "backend_status").map((event) => String(eventPayload(event).phase ?? "")); + assert.equal(stalePhases.some((phase) => phase.startsWith("thread/start")), false, "stale resume must not start a replacement thread"); assert.equal(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/resume:completed"), false, "stale resume must not be reported as a successful resume"); + assert.equal(staleEvents.items?.some((event) => event.type === "error" && eventPayload(event).failureKind === "thread-resume-failed"), true, "stale resume failure must be visible as an error event"); assertNoSecretLeak({ staleThreadResult, staleEnvelope, staleEvents }); const live = await createRunWithCommand(client, context, "hello live events", "selftest-live-tool-events", 15_000); @@ -173,6 +176,7 @@ const selfTest: SelfTestCase = async (context) => { await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "missing-turn-result", expectedStatus: "failed", expectedFailureKind: "backend-response-invalid" }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-401-rpc-error", expectedStatus: "failed", expectedFailureKind: "provider-auth-failed" }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-429-terminal", expectedStatus: "failed", expectedFailureKind: "provider-rate-limited" }); + await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-invalid-tool-call", expectedStatus: "failed", expectedFailureKind: "provider-invalid-tool-call" }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-503-rpc-error", 
expectedStatus: "failed", expectedFailureKind: "provider-unavailable" }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-503-terminal", expectedStatus: "failed", expectedFailureKind: "provider-unavailable" }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-503-retry-event", expectedStatus: "failed", expectedFailureKind: "provider-unavailable", expectRetryError: true }); @@ -181,7 +185,7 @@ const selfTest: SelfTestCase = async (context) => { await runSecretFailureCase({ client, managerUrl: server.baseUrl, context }); await runSpawnFailureCase({ client, managerUrl: server.baseUrl, context }); - return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fallback", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] }; + return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", 
"codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-resume-fails", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-invalid-tool-call", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] }; } finally { await new Promise((resolve) => server.server.close(() => resolve())); } @@ -264,7 +268,7 @@ async function createStaleThreadRun(client: ManagerClient, context: SelfTestCont }, traceSink: null, }) as { id: string }; - const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello stale thread" }, idempotencyKey: "selftest-stale-thread-fallback" }) as { id: string }; + const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello stale thread" }, idempotencyKey: "selftest-stale-thread-resume-fails" }) as { id: string }; return { runId: run.id, commandId: command.id }; } diff --git a/src/selftest/fake-codex-app-server.ts b/src/selftest/fake-codex-app-server.ts index 10bc5ad..977911d 100644 --- a/src/selftest/fake-codex-app-server.ts +++ b/src/selftest/fake-codex-app-server.ts @@ -100,6 +100,21 @@ for await (const line of rl) { respond(message.id, { turn }); continue; } + if (mode === "provider-invalid-tool-call") { + turnCounter += 1; + const turn = { + id: `turn_selftest_${turnCounter}`, + status: "failed", + error: { + message: "invalid params, invalid function arguments json string, tool_call_id: call_function_selftest_2 (2013)", + code: "invalid_prompt", + }, + }; + notify("turn/started", { turn: { id: turn.id, status: "running" } }); + notify("turn/completed", { turn }); + respond(message.id, { turn }); + continue; + } if (mode 
=== "provider-503-retry-event") { turnCounter += 1; const turn = {