fix: keep codex sessions reusable after turn failures

This commit is contained in:
Codex
2026-06-03 00:28:41 +08:00
parent c51180eef6
commit f69ef55ddc
5 changed files with 79 additions and 11 deletions
+2
View File
@@ -86,6 +86,8 @@ Kubernetes Secret 的创建、轮换和权限控制属于集群密钥管理流
- Adapter 必须在调用 Codex 前验证 `auth.json``config.toml` 均存在且可读;缺失时返回 `secret-unavailable`
- Codex 运行时必须使用被投影的 `.codex` 目录;不得 fallback 到镜像内默认凭据或节点宿主机 `~/.codex`
- Codex stdio backend 不得设置 turn/session/conversation 的总时长 timeout`executionPolicy.timeoutMs` 只能作为无 app-server 响应、无 notification、无 assistant/tool/event activity 的 idle timeout。长程任务只要持续产生可见 activity,就必须继续等待 `turn/completed`、取消或真实 transport failure。
- 普通 turn command 失败只终结当前 command,不得把 reusable run/session 置为 terminal;后续 command 必须仍可进入同一个 run/runner。只有显式 cancel、runner lease/claim 失效、资源装配不可恢复或运行面退出才允许终结 run。
-`config.toml` 指向 hyueapi 或其他 OpenAI-compatible upstreamrunner/backend Pod 的 proxy 与 `NO_PROXY` 必须保持该配置可用;不得在日志中打印完整 auth/config 内容。
- 模型名、provider profile、upstream host 可以作为 redacted metadata 输出;provider credential、token、Authorization header 和文件内容不得输出。
- 一个最短 turn 至少要产生 `backend_status`、一个 assistant 或 error event、以及 `terminal_status`
+20 -7
View File
@@ -428,14 +428,27 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
terminalResolve();
};
options.abortSignal?.addEventListener("abort", abortTurn, { once: true });
const timeout = setTimeout(() => {
const turnIdleTimeoutMs = positiveTimeout(options.timeoutMs);
let idleTimeout: NodeJS.Timeout | null = null;
const refreshTurnActivity = (): void => {
if (terminal) return;
terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn timed out after ${options.timeoutMs}ms` };
emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:timeout" } });
client?.stop();
terminalResolve();
}, positiveTimeout(options.timeoutMs));
if (idleTimeout) clearTimeout(idleTimeout);
idleTimeout = setTimeout(() => {
if (terminal) return;
terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn idle timed out after ${turnIdleTimeoutMs}ms without activity` };
emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:idle-timeout" } });
client?.stop();
terminalResolve();
}, turnIdleTimeoutMs);
};
const stopTurnIdleTimeout = (): void => {
if (!idleTimeout) return;
clearTimeout(idleTimeout);
idleTimeout = null;
};
refreshTurnActivity();
const stopNotifications = session.addNotificationHandler((message) => {
refreshTurnActivity();
const normalized = normalizeCodexNotification(message, suppressedNotifications);
if (normalized.threadId) threadId = normalized.threadId;
if (normalized.turnId) turnId = normalized.turnId;
@@ -516,7 +529,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
stopActiveTurn?.();
stopNotifications();
options.abortSignal?.removeEventListener("abort", abortTurn);
clearTimeout(timeout);
stopTurnIdleTimeout();
}
if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server finished without terminal status" };
if (terminal.status !== "completed") emitEvents(await session.close());
+3 -3
View File
@@ -107,7 +107,7 @@ export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
}
const result = materializationFailure
? await reportCommandFailure(api, options.runId, command.id, runner, attemptId, materializationFailure, "runner:resource-bundle")
? await reportCommandFailure(api, options.runId, command.id, runner, attemptId, materializationFailure, "runner:resource-bundle", { terminalRun: true })
: await executeCommand(api, withResourceAssembly(options, resourceEnv, initialPrompt), command, runner, attemptId, workspacePath, backendSession ?? (backendSession = createBackendSession(currentRun, withResourceAssembly(options, resourceEnv, initialPrompt))));
commandResults.push(result);
if (options.oneShot === true) {
@@ -359,11 +359,11 @@ function annotateCommandEvent(event: BackendEvent, commandId: string, attemptId:
return { ...event, payload: { ...event.payload, commandId, attemptId, runnerId } };
}
async function reportCommandFailure(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, attemptId: string, failure: { terminalStatus: TerminalStatus; failureKind: FailureKind; message: string }, phase: string): Promise<CommandExecutionResult> {
async function reportCommandFailure(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, attemptId: string, failure: { terminalStatus: TerminalStatus; failureKind: FailureKind; message: string }, phase: string, options: { terminalRun?: boolean } = {}): Promise<CommandExecutionResult> {
await api.appendEvent(runId, { type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase, commandId, attemptId, runnerId: runner.id } });
await api.appendEvent(runId, { type: "terminal_status", payload: { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, message: failure.message, commandId, attemptId, runnerId: runner.id } });
await api.reportCommandStatus(commandId, { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, failureMessage: failure.message });
await api.reportStatus(runId, { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, failureMessage: failure.message });
if (options.terminalRun === true) await api.reportStatus(runId, { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, failureMessage: failure.message });
return { commandId, terminalStatus: failure.terminalStatus, failureKind: failure.failureKind } as CommandExecutionResult;
}
+42 -1
View File
@@ -206,15 +206,56 @@ const selfTest: SelfTestCase = async (context) => {
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-503-retry-event", expectedStatus: "failed", expectedFailureKind: "provider-unavailable", expectRetryError: true });
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "invalid-json", expectedStatus: "failed", expectedFailureKind: "backend-json-parse-error" });
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "missing-terminal", expectedStatus: "failed", expectedFailureKind: "backend-timeout", timeoutMs: 500 });
await runSlowProgressIdleCase({ client, managerUrl: server.baseUrl, context });
await runFailureDoesNotTerminalRunCase({ client, managerUrl: server.baseUrl, context });
await runSecretFailureCase({ client, managerUrl: server.baseUrl, context });
await runSpawnFailureCase({ client, managerUrl: server.baseUrl, context });
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-web-search-progress", "codex-stdio-stale-thread-resume-failed", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-invalid-tool-call", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-web-search-progress", "codex-stdio-stale-thread-resume-failed", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-invalid-tool-call", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-idle-timeout-progress-refresh", "codex-stdio-command-failure-keeps-run-open", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}
};
async function runSlowProgressIdleCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
const item = await createRunWithCommand(options.client, options.context, "slow progress before terminal", "selftest-slow-progress-idle-refresh", 60);
const result = await runOnce({
managerUrl: options.managerUrl,
runId: item.runId,
codexCommand: options.context.fakeCodexCommand,
codexArgs: options.context.fakeCodexArgs,
codexHome: options.context.codexHome,
env: { CODEX_HOME: options.context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "slow-progress-before-terminal" },
oneShot: true,
}) as JsonRecord;
assert.equal(result.terminalStatus, "completed", "activity before idle deadline must refresh the turn idle timeout");
const events = await options.client.get(`/api/v1/runs/${item.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
assert.equal(events.items?.some((event) => event.type === "error" && eventPayload(event).failureKind === "backend-timeout"), false, "progressing turns must not fail on total elapsed time");
}
async function runFailureDoesNotTerminalRunCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
const item = await createRunWithCommand(options.client, options.context, "first command fails", "selftest-command-failure-keeps-run-open", 3_000);
const result = await runOnce({
managerUrl: options.managerUrl,
runId: item.runId,
codexCommand: options.context.fakeCodexCommand,
codexArgs: options.context.fakeCodexArgs,
codexHome: options.context.codexHome,
env: { CODEX_HOME: options.context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "provider-503-terminal" },
idleTimeoutMs: 100,
pollIntervalMs: 25,
}) as JsonRecord;
assert.equal(result.stopped, "idle-timeout", "non one-shot runner should remain alive after a failed command until idle timeout");
assert.equal(result.terminalStatus, "failed");
assert.equal(result.failureKind, "provider-unavailable");
const command = await options.client.get(`/api/v1/runs/${item.runId}/commands/${item.commandId}`) as { state?: string };
assert.equal(command.state, "failed");
const run = await options.client.get(`/api/v1/runs/${item.runId}`) as { status?: string; terminalStatus?: string | null; failureKind?: string | null };
assert.equal(["claimed", "running"].includes(String(run.status)), true, "command failure must keep the reusable run/session non-terminal");
assert.equal(run.terminalStatus, null);
assert.equal(run.failureKind, null);
}
async function runFailureCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext; mode: string; expectedStatus: TerminalStatus; expectedFailureKind: FailureKind; timeoutMs?: number; expectRetryError?: boolean }): Promise<void> {
const item = await createRunWithCommand(options.client, options.context, `failure ${options.mode}`, `selftest-${options.mode}`, options.timeoutMs ?? 3_000);
const result = await runOnce({
+12
View File
@@ -85,6 +85,18 @@ for await (const line of rl) {
respond(message.id, { turn });
continue;
}
if (mode === "slow-progress-before-terminal") {
turnCounter += 1;
const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" };
notify("turn/started", { turn: { id: turn.id, status: "running" } });
respond(message.id, { turn: { id: turn.id, status: "running" } });
setTimeout(() => notify("item/agentMessage/delta", { itemId: "msg_slow_progress", delta: "still working" }), 40);
setTimeout(() => {
notify("item/completed", { item: { id: "msg_slow_progress", type: "agentMessage", text: "slow progress final" } });
notify("turn/completed", { turn });
}, 90);
continue;
}
if (mode === "provider-503-terminal") {
turnCounter += 1;
const turn = { id: `turn_selftest_${turnCounter}`, status: "failed", error: { message: "HTTP 503 Service Unavailable" } };