fix: fail stale codex thread resume

Codex
2026-06-02 16:22:27 +08:00
parent d49b958649
commit e9843ab687
4 changed files with 16 additions and 36 deletions
+1 -1
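@@ -86,7 +86,7 @@ The adapter must map backend errors to a stable failureKind
| `backend-protocol-error` | Backend output cannot be parsed, or protocol fields are missing. |
| `backend-json-parse-error` | Backend stdout is not a valid JSON-RPC line. |
| `backend-response-invalid` | A backend JSON-RPC response or terminal notification is missing required fields. |
-| `thread-resume-failed` | `thread/resume` hit a protocol error that cannot be classified as a missing old rollout, or the replacement `thread/start` also failed; when the Codex rollout referenced by an existing `SessionRef.threadId` no longer exists, the adapter should first record `thread/resume:non-resumable` and create a replacement thread instead of terminating the user turn directly. |
+| `thread-resume-failed` | `thread/resume` hit any protocol error, including a missing old rollout (`no rollout found for thread id`). When a `SessionRef.threadId` already exists, the adapter may only run the native Codex stdio session: `thread/resume` followed by `turn/start`. A resume failure must terminate the current turn; the adapter must not start a replacement `thread/start`, stitch together a history prompt, or pass off other context as a continued session. |
| `backend-spawn-failed` | The backend app-server process cannot be started. |
| `backend-failed` | The backend process exited with a non-zero code or reported a terminal error. |
| `backend-timeout` | The executionPolicy timeout fired. |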
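The revised `thread-resume-failed` row can be read as a small decision rule. The following is a minimal sketch of that rule, not the adapter's actual code: `ResumeOutcome`, `NextStep`, and `afterResume` are hypothetical names introduced for illustration, and only the `failureKind` string and the no-replacement rule come from the table.

```typescript
// Hypothetical types for illustration; the adapter's real types are not shown in this diff.
type ResumeOutcome =
  | { ok: true; threadId: string }
  | { ok: false; message: string };

type NextStep =
  | { next: "turn/start"; threadId: string }
  | {
      next: "terminate";
      failureKind: "thread-resume-failed";
      requestedThreadId: string;
      message: string;
    };

// Decide what follows a `thread/resume` attempt for an existing SessionRef.threadId.
// Every failure terminates the turn; there is deliberately no branch that
// falls back to `thread/start`.
function afterResume(requestedThreadId: string, outcome: ResumeOutcome): NextStep {
  if (outcome.ok) {
    return { next: "turn/start", threadId: outcome.threadId };
  }
  return {
    next: "terminate",
    failureKind: "thread-resume-failed",
    requestedThreadId,
    message: outcome.message,
  };
}
```

Because the return type has no replacement variant, a caller cannot express the old fallback without changing the type, which is one way to keep the rule from regressing.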
+4 -4
@@ -18,7 +18,7 @@ The Codex stdio backend is the first real Code Agent backend kind of AgentRun `v0.1`
codex app-server --listen stdio://
```
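-The adapter writes newline-delimited JSON-RPC requests to stdin and reads JSON-RPC responses and notifications from stdout line by line; stderr serves only as a bounded diagnostic log. The minimal request sequence is `initialize`, `thread/start` or `thread/resume`, and `turn/start`. The thread/turn identity must be extracted from the responses, and notifications and subsequent output must be normalized into `backend_status`, `assistant_message`, `tool_call`, `command_output`, `error`, and `terminal_status` events. Steering a running turn uses the `turn/steer` JSON-RPC method on the same app-server process, with parameters `threadId`, `expectedTurnId`, and a text `input` array; cancel/interrupt uses `turn/interrupt`, with parameters `threadId` and `turnId`. When a `SessionRef.threadId` already exists and `thread/resume` returns `no rollout found for thread id`, the Codex rollout that the durable session points to is no longer in the current app-server; the adapter must normalize this state into a recoverable `thread/resume:non-resumable` backend_status, record requestedThreadId, originalFailureKind, and valuesPrinted=false, then run a replacement `thread/start` and continue with the current `turn/start`. After the replacement turn succeeds, the new threadId must be written back to command/run/sessionRef, and later turns reuse the replacement thread. The adapter must not disguise a missing old rollout as a successful resume; provider auth, rate limit, model config, and other protocol errors still fail with their own failureKind and do not take the replacement path.
+The adapter writes newline-delimited JSON-RPC requests to stdin and reads JSON-RPC responses and notifications from stdout line by line; stderr serves only as a bounded diagnostic log. The minimal request sequence is `initialize`, `thread/start` or `thread/resume`, and `turn/start`. The thread/turn identity must be extracted from the responses, and notifications and subsequent output must be normalized into `backend_status`, `assistant_message`, `tool_call`, `command_output`, `error`, and `terminal_status` events. Steering a running turn uses the `turn/steer` JSON-RPC method on the same app-server process, with parameters `threadId`, `expectedTurnId`, and a text `input` array; cancel/interrupt uses `turn/interrupt`, with parameters `threadId` and `turnId`. When a `SessionRef.threadId` already exists, the adapter may only run the native Codex stdio `thread/resume` followed by `turn/start`; when `thread/resume` returns `no rollout found for thread id` or any other protocol error, the adapter must emit `thread-resume-failed` and terminate the current turn. The adapter must not start a replacement `thread/start`, stitch together a history prompt, write back a new threadId, or use other context to simulate a continued session.
The following paths must not be treated as the official implementation of the `v0.1` Codex stdio backend or as evidence that integration testing passed: a direct Responses HTTP proxy, an OpenAI SDK wrapper, one-shot `codex exec` command output, a fake provider, fixed text replies, a read-only shortcut, or local shell simulation. Bare HTTP or `codex exec --json` may be used for provider/upstream diagnostics, but the final pass must come from an app-server stdio turn.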
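The line-oriented transport described above can be illustrated with a small encoder and parser. This is a hedged sketch, not the adapter's code: `encodeRequest`, `parseLine`, and `ParsedLine` are hypothetical names, the request envelope is reduced to `id`/`method`/`params` (which other envelope fields the app-server expects is not stated here), and the mapping of malformed lines to `backend-json-parse-error` versus `backend-response-invalid` is one reading of the failureKind table.

```typescript
type JsonRecord = Record<string, unknown>;

// Hypothetical result type for one stdout line.
type ParsedLine =
  | { ok: true; kind: "response" | "notification"; message: JsonRecord }
  | { ok: false; failureKind: "backend-json-parse-error" | "backend-response-invalid" };

// Serialize one request as a single newline-terminated line for stdin.
// JSON.stringify escapes embedded newlines, so the line break stays unique.
function encodeRequest(id: number, method: string, params: JsonRecord): string {
  return JSON.stringify({ id, method, params }) + "\n";
}

// Classify one stdout line. Requests initiated by the backend are not modeled.
function parseLine(line: string): ParsedLine {
  let value: unknown;
  try {
    value = JSON.parse(line);
  } catch {
    return { ok: false, failureKind: "backend-json-parse-error" };
  }
  if (typeof value !== "object" || value === null || Array.isArray(value)) {
    // Valid JSON, but not an object, so it cannot be a JSON-RPC message.
    return { ok: false, failureKind: "backend-json-parse-error" };
  }
  const message = value as JsonRecord;
  if ("id" in message && ("result" in message || "error" in message)) {
    return { ok: true, kind: "response", message };
  }
  if (typeof message["method"] === "string") {
    return { ok: true, kind: "notification", message };
  }
  // An object that is neither a response nor a notification lacks required fields.
  return { ok: false, failureKind: "backend-response-invalid" };
}
```

Returning a tagged result instead of throwing keeps the failureKind decision in one place, so the caller only has to turn `ok: false` into an `error` event.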
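@@ -126,9 +126,9 @@ The Run's `executionPolicy.secretScope` should reference the `backendProfile`-matching pr
Read this document, then run four minimal turns in order on the real `agentrun-v01` runtime: `backendProfile=codex`, `backendProfile=deepseek`, `backendProfile=minimax-m3`, and `backendProfile=codex`. Confirm that the second run uses the DeepSeek profile, the third run uses the MiniMax-M3 profile, and the first and last `codex` runs still use the original Codex profile; that the events, logs, backend_status, model/upstream metadata, and failureKind of the four runs do not contaminate one another; and that a missing SecretRef for any profile never falls back to another profile.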
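-### T7 Stale thread replacement
+### T7 Stale thread resume failed
-Read this document and [spec-v01-runtime-assembly.md](spec-v01-runtime-assembly.md), then construct a real or fake app-server run that carries an old `SessionRef.threadId`, so that `thread/resume` returns `no rollout found for thread id`. Confirm that the adapter records `thread/resume:non-resumable`, then starts a replacement `thread/start` and completes the current turn; result/sessionRef must be updated to the replacement threadId, and the user turn no longer terminates with `thread-resume-failed`. Confirm that provider auth, rate limit, model config, and other protocol errors still fail directly with their own failureKind and do not take the replacement path.
+Read this document and [spec-v01-runtime-assembly.md](spec-v01-runtime-assembly.md), then construct a real or fake app-server run that carries an old `SessionRef.threadId`, so that `thread/resume` returns `no rollout found for thread id`. Confirm that the adapter emits `thread-resume-failed` and terminates the current turn; events/result/sessionRef must not contain `thread/resume:non-resumable`, a replacement `thread/start`, a new threadId write-back, or history prompt stitching. Confirm that provider auth, rate limit, model config, and other protocol errors still fail directly with their own failureKind and do not take a replacement path.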
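The T7 checks on the event stream can be expressed as a single predicate over the run's events. The sketch below is illustrative only: `RunEvent` and `staleResumeViolations` are hypothetical names and the event shape is simplified, while the phase strings and the `thread-resume-failed` failureKind are taken from this change.

```typescript
// Simplified, hypothetical event shape for illustration.
interface RunEvent {
  type: string;
  payload: { phase?: string; failureKind?: string };
}

// backend_status phases that must not appear after a stale resume.
const FORBIDDEN_PHASES: string[] = [
  "thread/resume:non-resumable",
  "thread/replacement-start:completed",
  "thread/resume:completed",
];

// Return a human-readable list of T7 violations; an empty list means the
// event stream is consistent with "fail, do not replace".
function staleResumeViolations(events: RunEvent[]): string[] {
  const violations: string[] = [];
  for (const event of events) {
    const phase = event.payload.phase;
    if (
      event.type === "backend_status" &&
      phase !== undefined &&
      FORBIDDEN_PHASES.indexOf(phase) !== -1
    ) {
      violations.push(`forbidden backend_status phase: ${phase}`);
    }
  }
  const surfaced = events.some(
    (event) => event.type === "error" && event.payload.failureKind === "thread-resume-failed",
  );
  if (!surfaced) {
    violations.push("missing error event with failureKind thread-resume-failed");
  }
  return violations;
}
```

A manual T7 run could feed the events returned by the run's events endpoint into such a helper and expect an empty list.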
## Implementation status of the spec
@@ -136,7 +136,7 @@ The Run's `executionPolicy.secretScope` should reference the `backendProfile`-matching pr
| --- | --- | --- |
| Codex stdio backend/profile spec | Defined | This document is the authority for the v0.1 Codex app-server stdio backend kind and profile. |
| Codex Secret projection | Implemented / passed the main closed loop | The runner Job uses a read-only Secret projection and a writable `CODEX_HOME`; Codex test credentials come from the `auth.json`/`config.toml` in `agentrun-v01-provider-codex`. |
-| Codex adapter | Implemented / passed the main closed loop | The current code implements a controlled `codex app-server --listen stdio://`, response validation for `initialize`/`thread/start`/`thread/resume`/`turn/start`, stale rollout thread replacement, bounded stderr diagnostics, failureKinds for spawn / JSON parse / response invalid / timeout / provider 5xx / invalid tool-call availability, and a fake app-server self-test that includes retry error notifications. |
+| Codex adapter | Implemented / passed the main closed loop | The current code implements a controlled `codex app-server --listen stdio://`, response validation for `initialize`/`thread/start`/`thread/resume`/`turn/start`, stale rollout `thread-resume-failed`, bounded stderr diagnostics, failureKinds for spawn / JSON parse / response invalid / timeout / provider 5xx / invalid tool-call availability, and a fake app-server self-test that includes retry error notifications. |
| Error observability and redaction | Main path implemented | child env, cwd, workspace, and Codex home are output only as summaries; the stderr tail is bounded and marked when truncated; events and failures all go through redaction. |
| Real provider turn | Passed the main closed loop | A real Codex provider turn has passed integration testing through the RESTful API and CLI; every release is still manually re-verified per [spec-v01-validation.md](spec-v01-validation.md). |
| `deepseek` profile | Implemented / passed the main closed loop | The code supports `agentrun-v01-provider-deepseek`, an independent `CODEX_HOME`, the same `codex app-server --listen stdio://` protocol, and profile metadata; the real Kubernetes SecretRef, runner Job, and Codex stdio turn have passed the main closed loop. |
+1 -21
@@ -452,9 +452,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
emitEvent({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } });
} catch (error) {
const failure = normalizeFailure(error);
-if (!isMissingRolloutThreadResumeFailure(failure)) throw error;
-emitEvent({ type: "backend_status", payload: threadResumeNonResumablePayload(options.threadId, failure) });
-threadId = await startThread("thread/replacement-start");
+throw threadResumeFailure(options.threadId, failure);
}
} else {
threadId = await startThread();
@@ -910,12 +908,6 @@ function normalizeFailure(error: unknown): CodexStdioFailure {
return new CodexStdioFailure(classifyMessageFailureKind(message, "backend-protocol-error"), message, "codex-stdio");
}
-function isMissingRolloutThreadResumeFailure(error: CodexStdioFailure): boolean {
-if (error.phase !== "response:thread/resume") return false;
-const text = `${error.message}\n${JSON.stringify(error.details)}`.toLowerCase();
-return text.includes("no rollout found for thread id");
-}
function threadResumeFailure(threadId: string, error: CodexStdioFailure): CodexStdioFailure {
return new CodexStdioFailure(
"thread-resume-failed",
@@ -931,18 +923,6 @@ function threadResumeFailure(threadId: string, error: CodexStdioFailure): CodexS
);
}
-function threadResumeNonResumablePayload(threadId: string, error: CodexStdioFailure): JsonRecord {
-return {
-phase: "thread/resume:non-resumable",
-requestedThreadId: threadId,
-originalFailureKind: error.failureKind,
-originalPhase: error.phase,
-originalDetails: redactJson(error.details),
-replacement: "thread/start",
-valuesPrinted: false,
-};
-}
function classifyCodexErrorRecord(error: JsonRecord, fallback: FailureKind): FailureKind {
const parts: string[] = [];
if (typeof error.message === "string") parts.push(error.message);
+10 -10
@@ -105,19 +105,19 @@ const selfTest: SelfTestCase = async (context) => {
env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "resume-no-rollout" },
oneShot: true,
}) as JsonRecord;
-assert.equal(staleThreadResult.terminalStatus, "completed", "stale thread resume should create a replacement thread and continue the turn");
-assert.equal(staleThreadResult.failureKind, null, "replacement stale thread turn should not expose thread-resume-failed as terminal failure");
+assert.equal(staleThreadResult.terminalStatus, "failed", "stale thread resume must fail instead of starting a replacement thread");
+assert.equal(staleThreadResult.failureKind, "thread-resume-failed", "stale thread resume must expose thread-resume-failed as the terminal failure");
const staleEnvelope = await client.get(`/api/v1/runs/${staleThread.runId}/commands/${staleThread.commandId}/result`) as JsonRecord;
-assert.equal(staleEnvelope.terminalStatus, "completed");
-assert.equal(staleEnvelope.failureKind, null);
-assert.equal(staleEnvelope.completed, true);
-assert.equal((staleEnvelope.sessionRef as JsonRecord).threadId, "thread_selftest_1");
+assert.equal(staleEnvelope.terminalStatus, "failed");
+assert.equal(staleEnvelope.failureKind, "thread-resume-failed");
+assert.equal(staleEnvelope.completed, false);
+assert.equal((staleEnvelope.sessionRef as JsonRecord).threadId, "thread_missing_rollout");
const staleEvents = await client.get(`/api/v1/runs/${staleThread.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
const stalePhases = (staleEvents.items ?? []).filter((event) => event.type === "backend_status").map((event) => String(eventPayload(event).phase ?? ""));
-assert.equal(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/resume:non-resumable"), true, "stale resume must be visible as non-resumable before replacement");
-assert.equal(stalePhases.some((phase) => phase === "thread/replacement-start:completed"), true, "stale resume must start a replacement thread");
+assert.equal(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/resume:non-resumable"), false, "stale resume must not be converted into a replacement path");
+assert.equal(stalePhases.some((phase) => phase === "thread/replacement-start:completed"), false, "stale resume must not start a replacement thread");
assert.equal(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/resume:completed"), false, "stale resume must not be reported as a successful resume");
-assert.equal(staleEvents.items?.some((event) => event.type === "error" && eventPayload(event).failureKind === "thread-resume-failed"), false, "replacement stale resume must not surface terminal thread-resume-failed error");
+assert.equal(staleEvents.items?.some((event) => event.type === "error" && eventPayload(event).failureKind === "thread-resume-failed"), true, "stale resume must surface terminal thread-resume-failed error");
assertNoSecretLeak({ staleThreadResult, staleEnvelope, staleEvents });
const live = await createRunWithCommand(client, context, "hello live events", "selftest-live-tool-events", 15_000);
@@ -186,7 +186,7 @@ const selfTest: SelfTestCase = async (context) => {
await runSecretFailureCase({ client, managerUrl: server.baseUrl, context });
await runSpawnFailureCase({ client, managerUrl: server.baseUrl, context });
-return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-replacement", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-invalid-tool-call", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
+return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-resume-failed", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-invalid-tool-call", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}