feat(v0.1): codex-stdio emit codex-rollout-storage-mounted + session-store-evicted upgrade

PR C 收尾:codex-stdio.ts 加 observability + new failureKind 升级路径

- 启动时从 env(而非 process.env)读取 PVC 配置;仅当 pvcName / pvcNamespace / mountPath 三者都已设置时,
  发出 codex-rollout-storage-mounted 事件,字段为:
  pvcName / pvcNamespace / mountPath / codexRolloutSubdir(未设置时默认 "sessions")/ valuesPrinted=false
- thread/resume 失败 + 'no rollout found for thread id' 消息 + AGENTRUN_SESSION_PVC_NAME
  已设 → 升级为 session-store-evicted,区别于 thread-resume-failed
- isNoRolloutFoundMessage helper 隔离匹配逻辑
- 4 新 selftest case:
  codex-stdio-session-storage-mounted(事件存在 + 字段对齐)
  codex-stdio-session-storage-evicted(failureKind 升级)
  codex-stdio-session-storage-subdir(AGENTRUN_CODEX_ROLLOUT_SUBDIR 配置生效)
  codex-stdio-session-storage-no-secret-leak(事件不泄露)

PR C 全部完成:runner Job 直接挂载 PVC + codex-stdio observability +
session-store-evicted 升级 + 5 新 selftest(1 runner + 4 codex)
This commit is contained in:
Codex
2026-06-03 20:38:11 +08:00
parent f08a4e75cd
commit 7ccea67391
2 changed files with 125 additions and 0 deletions
+19
View File
@@ -478,6 +478,13 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
};
const willResumeThread = Boolean(options.threadId);
const sessionPvcName = env.AGENTRUN_SESSION_PVC_NAME?.trim() || null;
const sessionPvcNamespace = env.AGENTRUN_SESSION_PVC_NAMESPACE?.trim() || null;
const sessionPvcMountPath = env.AGENTRUN_SESSION_PVC_MOUNT_PATH?.trim() || null;
const codexRolloutSubdirEnv = env.AGENTRUN_CODEX_ROLLOUT_SUBDIR?.trim() || null;
if (sessionPvcName && sessionPvcNamespace && sessionPvcMountPath) {
emitEvent({ type: "backend_status", payload: { phase: "codex-rollout-storage-mounted", pvcName: sessionPvcName, pvcNamespace: sessionPvcNamespace, mountPath: sessionPvcMountPath, codexRolloutSubdir: codexRolloutSubdirEnv ?? "sessions", valuesPrinted: false } });
}
if (options.threadId) {
try {
const threadResponse = requireResponseRecord(await client.request("thread/resume", withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model), requestTimeoutMs), "thread/resume");
@@ -485,6 +492,14 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
emitEvent({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } });
} catch (error) {
const failure = normalizeFailure(error);
if (sessionPvcName && isNoRolloutFoundMessage(failure.message)) {
throw new CodexStdioFailure(
"session-store-evicted",
`codex app-server thread/resume reported no rollout found for PVC-backed session; session storage was likely evicted`,
"thread/resume",
{ requestedThreadId: options.threadId, pvcName: sessionPvcName, pvcNamespace: sessionPvcNamespace, pvcMountPath: sessionPvcMountPath, originalFailureKind: failure.failureKind, originalPhase: failure.phase, originalDetails: redactJson(failure.details), valuesPrinted: false },
);
}
throw threadResumeFailure(options.threadId, failure);
}
} else {
@@ -1033,6 +1048,10 @@ function threadResumeFailure(threadId: string, error: CodexStdioFailure): CodexS
);
}
/**
 * Reports whether a failure message contains the phrase
 * "no rollout found for thread id".
 *
 * The phrase is matched case-insensitively and may appear anywhere in the text.
 *
 * @param message - Failure message text to inspect.
 * @returns `true` when the phrase occurs in `message`, otherwise `false`.
 */
function isNoRolloutFoundMessage(message: string): boolean {
  const noRolloutPattern = /no rollout found for thread id/i;
  // `search` yields the index of the first match, or -1 when there is none.
  return message.search(noRolloutPattern) !== -1;
}
function classifyCodexErrorRecord(error: JsonRecord, fallback: FailureKind): FailureKind {
const parts: string[] = [];
if (typeof error.message === "string") parts.push(error.message);
+106
View File
@@ -210,6 +210,10 @@ const selfTest: SelfTestCase = async (context) => {
await runFailureDoesNotTerminalRunCase({ client, managerUrl: server.baseUrl, context });
await runSecretFailureCase({ client, managerUrl: server.baseUrl, context });
await runSpawnFailureCase({ client, managerUrl: server.baseUrl, context });
await runSessionStorageMountedCase({ client, managerUrl: server.baseUrl, context });
await runSessionStorageEvictedCase({ client, managerUrl: server.baseUrl, context });
await runSessionStorageSubdirCase({ client, managerUrl: server.baseUrl, context });
await runSessionStorageNoSecretLeakCase({ client, managerUrl: server.baseUrl, context });
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-web-search-progress", "codex-stdio-stale-thread-resume-failed", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-invalid-tool-call", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-idle-timeout-progress-refresh", "codex-stdio-command-failure-keeps-run-open", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
} finally {
@@ -372,4 +376,106 @@ async function runSpawnFailureCase(options: { client: ManagerClient; managerUrl:
assertNoSecretLeak(events);
}
/**
 * Selftest case: with the session PVC environment variables supplied, a turn
 * run through `runOnce` must complete and the run's event list must contain a
 * `backend_status` event whose phase is `codex-rollout-storage-mounted`, with
 * payload fields that echo the supplied environment values.
 *
 * @param options - Manager client, manager base URL and shared selftest context.
 */
async function runSessionStorageMountedCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
  const { client, managerUrl, context } = options;
  const run = await createRunWithCommand(client, context, "session storage mounted", "selftest-session-storage-mounted", 3_000);

  // Environment handed to the runner for this turn.
  const storageEnv = {
    CODEX_HOME: context.codexHome,
    AGENTRUN_SESSION_PVC_NAME: "agentrun-v01-session-selftest-mounted",
    AGENTRUN_SESSION_PVC_NAMESPACE: "agentrun-v01",
    AGENTRUN_SESSION_PVC_MOUNT_PATH: "/home/agentrun/.codex-codex/sessions",
    AGENTRUN_CODEX_ROLLOUT_SUBDIR: "sessions",
  };
  const outcome = await runOnce({
    managerUrl,
    runId: run.runId,
    codexCommand: context.fakeCodexCommand,
    codexArgs: context.fakeCodexArgs,
    codexHome: context.codexHome,
    env: storageEnv,
    oneShot: true,
  }) as JsonRecord;
  assert.equal(outcome.terminalStatus, "completed");

  const listing = await client.get(`/api/v1/runs/${run.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
  const isStorageMounted = (event: { type: string; payload: unknown }): boolean => {
    if (event.type !== "backend_status") return false;
    return eventPayload(event).phase === "codex-rollout-storage-mounted";
  };
  const mountedEvent = (listing.items ?? []).find(isStorageMounted);
  assert.ok(mountedEvent, "codex-rollout-storage-mounted event must be emitted when AGENTRUN_SESSION_PVC_NAME is set");

  // Each payload field must line up with the environment passed above.
  const mountedPayload = eventPayload(mountedEvent);
  assert.equal(mountedPayload.pvcName, "agentrun-v01-session-selftest-mounted");
  assert.equal(mountedPayload.pvcNamespace, "agentrun-v01");
  assert.equal(mountedPayload.mountPath, "/home/agentrun/.codex-codex/sessions");
  assert.equal(mountedPayload.codexRolloutSubdir, "sessions");
  assert.equal(mountedPayload.valuesPrinted, false);
}
/**
 * Selftest case: resumes a stale-thread run with the fake codex in
 * `resume-no-rollout` mode while the session PVC environment variables are
 * set, and expects the run to fail with failureKind `session-store-evicted`.
 *
 * @param options - Manager client, manager base URL and shared selftest context.
 */
async function runSessionStorageEvictedCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
  const { client, managerUrl, context } = options;
  const staleRun = await createStaleThreadRun(client, context);

  // Fake-codex mode plus the PVC settings for this turn.
  const evictionEnv = {
    CODEX_HOME: context.codexHome,
    AGENTRUN_FAKE_CODEX_MODE: "resume-no-rollout",
    AGENTRUN_SESSION_PVC_NAME: "agentrun-v01-session-selftest-evicted",
    AGENTRUN_SESSION_PVC_NAMESPACE: "agentrun-v01",
    AGENTRUN_SESSION_PVC_MOUNT_PATH: "/home/agentrun/.codex-codex/sessions",
    AGENTRUN_CODEX_ROLLOUT_SUBDIR: "sessions",
  };
  const outcome = await runOnce({
    managerUrl,
    runId: staleRun.runId,
    commandId: staleRun.commandId,
    codexCommand: context.fakeCodexCommand,
    codexArgs: context.fakeCodexArgs,
    codexHome: context.codexHome,
    env: evictionEnv,
    oneShot: true,
  }) as JsonRecord;

  assert.equal(outcome.terminalStatus, "failed");
  assert.equal(outcome.failureKind, "session-store-evicted", "with PVC env set, no rollout found must be classified as session-store-evicted");
}
/**
 * Selftest case: runs a turn with `AGENTRUN_CODEX_ROLLOUT_SUBDIR` set to
 * `custom` and checks that the `codex-rollout-storage-mounted` event reports
 * that subdir together with the configured mount path.
 *
 * The result of `runOnce` is not inspected here; only the emitted events are.
 *
 * @param options - Manager client, manager base URL and shared selftest context.
 */
async function runSessionStorageSubdirCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
  const { client, managerUrl, context } = options;
  const run = await createRunWithCommand(client, context, "session storage subdir", "selftest-session-storage-subdir", 3_000);

  // PVC settings that point at a non-default rollout subdirectory.
  const subdirEnv = {
    CODEX_HOME: context.codexHome,
    AGENTRUN_SESSION_PVC_NAME: "agentrun-v01-session-selftest-subdir",
    AGENTRUN_SESSION_PVC_NAMESPACE: "agentrun-v01",
    AGENTRUN_SESSION_PVC_MOUNT_PATH: "/home/agentrun/.codex-deepseek/custom",
    AGENTRUN_CODEX_ROLLOUT_SUBDIR: "custom",
  };
  await runOnce({
    managerUrl,
    runId: run.runId,
    codexCommand: context.fakeCodexCommand,
    codexArgs: context.fakeCodexArgs,
    codexHome: context.codexHome,
    env: subdirEnv,
    oneShot: true,
  });

  const listing = await client.get(`/api/v1/runs/${run.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
  const isStorageMounted = (event: { type: string; payload: unknown }): boolean => {
    if (event.type !== "backend_status") return false;
    return eventPayload(event).phase === "codex-rollout-storage-mounted";
  };
  const mountedEvent = (listing.items ?? []).find(isStorageMounted);
  assert.ok(mountedEvent, "storage-mounted event must fire for custom subdir");

  const mountedPayload = eventPayload(mountedEvent);
  assert.equal(mountedPayload.codexRolloutSubdir, "custom", "AGENTRUN_CODEX_ROLLOUT_SUBDIR must be observed in the storage-mounted event");
  assert.equal(mountedPayload.mountPath, "/home/agentrun/.codex-deepseek/custom", "mount path must use the rollout subdir suffix");
}
/**
 * Selftest case: runs a turn with the session PVC environment variables set,
 * then passes the run's fetched event listing to `assertNoSecretLeak`.
 *
 * The result of `runOnce` is not inspected here.
 *
 * @param options - Manager client, manager base URL and shared selftest context.
 */
async function runSessionStorageNoSecretLeakCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
  const { client, managerUrl, context } = options;
  const run = await createRunWithCommand(client, context, "session storage no leak", "selftest-session-storage-no-leak", 3_000);

  // PVC settings for this turn.
  const leakCheckEnv = {
    CODEX_HOME: context.codexHome,
    AGENTRUN_SESSION_PVC_NAME: "agentrun-v01-session-selftest-leak",
    AGENTRUN_SESSION_PVC_NAMESPACE: "agentrun-v01",
    AGENTRUN_SESSION_PVC_MOUNT_PATH: "/home/agentrun/.codex-codex/sessions",
    AGENTRUN_CODEX_ROLLOUT_SUBDIR: "sessions",
  };
  await runOnce({
    managerUrl,
    runId: run.runId,
    codexCommand: context.fakeCodexCommand,
    codexArgs: context.fakeCodexArgs,
    codexHome: context.codexHome,
    env: leakCheckEnv,
    oneShot: true,
  });

  const listing = await client.get(`/api/v1/runs/${run.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
  assertNoSecretLeak(listing);
}
export default selfTest;