fix: 恢复 stale codex thread (#71)

Co-authored-by: Codex <codex@pikas.tech>
This commit is contained in:
Lyon
2026-06-02 10:48:28 +08:00
committed by GitHub
parent 98f6e420e6
commit e1c0fb5245
3 changed files with 41 additions and 13 deletions
+6 -2
View File
@@ -18,7 +18,7 @@ Codex stdio backend 是 AgentRun `v0.1` 的第一真实 Code Agent backend kind
codex app-server --listen stdio://
```
Adapter 通过 stdin 写入换行分隔 JSON-RPC 请求,通过 stdout 逐行读取 JSON-RPC response 和 notificationstderr 只作为有界诊断日志。最小请求序列是 `initialize``thread/start``thread/resume``turn/start`response 中必须提取 thread/turn identitynotification 和后续输出必须归一化为 `backend_status``assistant_message``tool_call``command_output``error``terminal_status` events。运行中 steer 使用同一 app-server 进程的 `turn/steer` JSON-RPC 方法,参数为 `threadId``expectedTurnId` 和文本 `input` 数组;取消/中断使用 `turn/interrupt`,参数为 `threadId``turnId`
Adapter 通过 stdin 写入换行分隔 JSON-RPC 请求,通过 stdout 逐行读取 JSON-RPC response 和 notificationstderr 只作为有界诊断日志。最小请求序列是 `initialize``thread/start``thread/resume``turn/start`response 中必须提取 thread/turn identitynotification 和后续输出必须归一化为 `backend_status``assistant_message``tool_call``command_output``error``terminal_status` events。运行中 steer 使用同一 app-server 进程的 `turn/steer` JSON-RPC 方法,参数为 `threadId``expectedTurnId` 和文本 `input` 数组;取消/中断使用 `turn/interrupt`,参数为 `threadId``turnId``thread/resume` 返回 `no rollout found for thread id` 时,说明 durable session 指向的 Codex rollout 已不在当前 app-server 中;adapter 必须记录 `thread/resume:stale-thread-fallback`,随后执行 `thread/start` 创建 replacement thread,并由 manager 将新的 `threadId` 回写到 SessionRef。其他 `thread/resume` 错误不得 fallback,仍按 failureKind 失败。
不得把以下路径作为 `v0.1` Codex stdio backend 的正式实现或综合联调通过证据:直接 Responses HTTP 代理、OpenAI SDK wrapper、`codex exec` 一次性命令输出、fake provider、固定文本回复、只读 shortcut 或本地 shell 模拟。裸 HTTP 或 `codex exec --json` 可以作为 provider/upstream 诊断,但最终通过必须来自 app-server stdio turn。
@@ -126,13 +126,17 @@ Run 的 `executionPolicy.secretScope` 应引用与 `backendProfile` 匹配的 pr
阅读本文,然后在真实 `agentrun-v01` 运行面按顺序执行 `backendProfile=codex``backendProfile=deepseek``backendProfile=minimax-m3``backendProfile=codex` 四个最短 turn。确认第二个 run 使用 DeepSeek profile,第三个 run 使用 MiniMax-M3 profile,前后两个 `codex` run 仍使用原 Codex profile;四者的 event、log、backend_status、model/upstream metadata 和 failureKind 不互相污染,且任何一个 profile SecretRef 缺失都不会 fallback 到另一个 profile。
### T7 Stale thread recovery
阅读本文和 [spec-v01-runtime-assembly.md](spec-v01-runtime-assembly.md),然后构造一个带旧 `SessionRef.threadId` 的真实或 fake app-server run,使 `thread/resume` 返回 `no rollout found for thread id`。确认 adapter 不把该缺失 rollout 直接作为终态失败,而是记录 `thread/resume:stale-thread-fallback`,执行 `thread/start`,完成当前 turn,并在 result/sessionRef 中回写新的 `threadId`。确认 provider auth、rate limit、model config 或其他 protocol error 不走该 fallback。
## 规格的实现情况
| 规格项 | 状态 | 说明 |
| --- | --- | --- |
| Codex stdio backend/profile 规格 | 已定义 | 本文为 v0.1 Codex app-server stdio backend kind 和 profile 权威。 |
| Codex Secret projection | 已实现/已通过主闭环 | runner Job 使用只读 Secret projection 和 writable `CODEX_HOME`Codex 测试凭据来自 `agentrun-v01-provider-codex``auth.json`/`config.toml`。 |
| Codex adapter | 已实现/已通过主闭环 | 当前代码已实现受控 `codex app-server --listen stdio://``initialize`/`thread/start`/`thread/resume`/`turn/start` response 校验、stderr 有界诊断、spawn/JSON parse/response invalid/timeout/provider 5xx availability failureKind,以及包含 retry error notification 的 fake app-server 自测试。 |
| Codex adapter | 已实现/已通过主闭环 | 当前代码已实现受控 `codex app-server --listen stdio://``initialize`/`thread/start`/`thread/resume`/`turn/start` response 校验、stale rollout thread fallback、stderr 有界诊断、spawn/JSON parse/response invalid/timeout/provider 5xx availability failureKind,以及包含 retry error notification 的 fake app-server 自测试。 |
| 错误可观测与脱敏 | 已实现主路径 | child env、cwd、workspace 和 Codex home 只输出摘要;stderr tail 有界且标记截断;事件和 failure 统一走 redaction。 |
| 真实 provider turn | 已通过主闭环 | 真实 Codex provider turn 已经通过 RESTful API 和 CLI 综合联调;每次发布仍按 [spec-v01-validation.md](spec-v01-validation.md) 手动复验。 |
| `deepseek` profile | 已实现/已通过主闭环 | 代码已支持 `agentrun-v01-provider-deepseek`、独立 `CODEX_HOME`、同一 `codex app-server --listen stdio://` 协议和 profile metadata;真实 Kubernetes SecretRef、runner Job 和 Codex stdio turn 已通过主闭环。 |
+26 -3
View File
@@ -446,9 +446,26 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
};
if (options.threadId) {
const threadResponse = requireResponseRecord(await client.request("thread/resume", withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model), requestTimeoutMs), "thread/resume");
threadId = requireNestedId(threadResponse, "thread/resume", "thread");
emitEvent({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } });
try {
const threadResponse = requireResponseRecord(await client.request("thread/resume", withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model), requestTimeoutMs), "thread/resume");
threadId = requireNestedId(threadResponse, "thread/resume", "thread");
emitEvent({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } });
} catch (error) {
const failure = normalizeFailure(error);
if (!isStaleThreadResumeFailure(failure)) throw error;
emitEvent({
type: "backend_status",
payload: {
phase: "thread/resume:stale-thread-fallback",
requestedThreadId: options.threadId,
failureKind: failure.failureKind,
message: failure.message,
fallback: "thread/start",
valuesPrinted: false,
},
});
threadId = await startThread("thread/start:after-stale-resume");
}
} else {
threadId = await startThread();
}
@@ -901,6 +918,12 @@ function normalizeFailure(error: unknown): CodexStdioFailure {
return new CodexStdioFailure(classifyMessageFailureKind(message, "backend-protocol-error"), message, "codex-stdio");
}
function isStaleThreadResumeFailure(error: CodexStdioFailure): boolean {
if (error.phase !== "response:thread/resume") return false;
const text = `${error.message}\n${JSON.stringify(error.details)}`.toLowerCase();
return text.includes("no rollout found for thread id");
}
function classifyCodexErrorRecord(error: JsonRecord, fallback: FailureKind): FailureKind {
const parts: string[] = [];
if (typeof error.message === "string") parts.push(error.message);
+9 -8
View File
@@ -104,14 +104,15 @@ const selfTest: SelfTestCase = async (context) => {
env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "resume-no-rollout" },
oneShot: true,
}) as JsonRecord;
assert.equal(staleThreadResult.terminalStatus, "failed", "standard thread resume failure must not start a replacement thread");
assert.equal(staleThreadResult.failureKind, "backend-protocol-error");
assert.equal(staleThreadResult.terminalStatus, "completed", "stale thread resume should start a replacement thread and complete the turn");
const staleEnvelope = await client.get(`/api/v1/runs/${staleThread.runId}/commands/${staleThread.commandId}/result`) as JsonRecord;
assert.equal(staleEnvelope.terminalStatus, "failed");
assert.equal(staleEnvelope.completed, false);
assert.equal((staleEnvelope.sessionRef as JsonRecord).threadId, "thread_missing_rollout");
assert.equal(staleEnvelope.terminalStatus, "completed");
assert.equal(staleEnvelope.completed, true);
assert.equal((staleEnvelope.sessionRef as JsonRecord).threadId, "thread_selftest_1");
const staleEvents = await client.get(`/api/v1/runs/${staleThread.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
assert.equal(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/start:completed"), false, "stale standard thread must not create a replacement thread");
assert.equal(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/resume:stale-thread-fallback"), true, "stale resume fallback must be visible");
assert.equal(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/start:after-stale-resume:completed"), true, "stale resume must start a replacement thread");
assert.equal(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/resume:completed"), false, "stale resume must not be reported as a successful resume");
assertNoSecretLeak({ staleThreadResult, staleEnvelope, staleEvents });
const live = await createRunWithCommand(client, context, "hello live events", "selftest-live-tool-events", 15_000);
@@ -164,7 +165,7 @@ const selfTest: SelfTestCase = async (context) => {
await runSecretFailureCase({ client, managerUrl: server.baseUrl, context });
await runSpawnFailureCase({ client, managerUrl: server.baseUrl, context });
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fails", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fallback", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}
@@ -236,7 +237,7 @@ async function createStaleThreadRun(client: ManagerClient, context: SelfTestCont
},
traceSink: null,
}) as { id: string };
const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello stale thread" }, idempotencyKey: "selftest-stale-thread-fails" }) as { id: string };
const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello stale thread" }, idempotencyKey: "selftest-stale-thread-fallback" }) as { id: string };
return { runId: run.id, commandId: command.id };
}