diff --git a/docs/reference/spec-v01-agentrun-runner.md b/docs/reference/spec-v01-agentrun-runner.md index c6ccc53..314a902 100644 --- a/docs/reference/spec-v01-agentrun-runner.md +++ b/docs/reference/spec-v01-agentrun-runner.md @@ -38,7 +38,7 @@ Runner 承接的是 HWLAB v0.2 原有 Code Agent 的执行层经验,不承接 | HWLAB v0.2 参考能力 | 参考入口 | Runner 承接规则 | | --- | --- | --- | -| Codex app-server stdio thread/turn 生命周期 | `internal/cloud/codex-stdio-session.ts` | 有 `SessionRef.threadId` 时执行 resume,再 start turn;无 thread 时 start thread;turn terminal 才能上报 completed。 | +| Codex app-server stdio thread/turn 生命周期 | `internal/cloud/codex-stdio-session.ts` | 有 command `payload.threadId` 或 `SessionRef.threadId` 时执行 resume,再 start turn;无标准 `threadId` 时 start thread;events、result 和 session record 都以 `threadId` 为唯一 thread identity;turn terminal 才能上报 completed。 | | cancel/interrupt | `internal/cloud/server-code-agent-http.ts`、`internal/cloud/codex-stdio-session.ts` | runner 必须轮询 manager cancel 状态并中止 backend;backend 不支持精确 interrupt 时终止受控进程组。 | | runnerTrace 事件可见性 | `internal/cloud/code-agent-trace-store.ts` | backend 输出必须转成 manager events;每个 terminal/错误/取消都要有事件和 final status。 | | workspace-write 边界 | `internal/cloud/code-agent-contract.ts` | runner 只使用 ResourceBundleRef materialized workspace,不猜 HWLAB Pod 的 `/workspace/hwlab` 或 host path。 | diff --git a/docs/reference/spec-v01-hwlab-manual-dispatch.md b/docs/reference/spec-v01-hwlab-manual-dispatch.md index aae649f..61a8525 100644 --- a/docs/reference/spec-v01-hwlab-manual-dispatch.md +++ b/docs/reference/spec-v01-hwlab-manual-dispatch.md @@ -112,7 +112,7 @@ AgentRun 需要提供 durable cancel 能力,建议形态为 `POST /api/v1/runs ### P1 SessionRef 持久化 -`SessionRef` 需要从 `null/deferred` 升级为可选持久会话引用,支持 HWLAB `conversationId/sessionId/threadId` 到 AgentRun session identity 的映射。session 只能保存 backend thread/session/cache,不保存 API KEY、`auth.json`、`config.toml` 或完整 `CODEX_HOME`。session store 必须与 Secret projection、writable runtime home、Git workspace 分离。runner 启动时,有 SessionRef 则执行 `thread/resume`,没有则执行 `thread/start`;profile 隔离、TTL、GC 和跨 profile 污染防护必须可见。 +`SessionRef` 需要从 `null/deferred` 升级为可选持久会话引用,支持 HWLAB `conversationId/sessionId/threadId` 到 AgentRun session identity 的映射。session 只能保存 backend thread/session/cache,不保存 API KEY、`auth.json`、`config.toml` 或完整 `CODEX_HOME`。session store 必须与 Secret projection、writable runtime home、Git workspace 分离。runner 启动时,只按 command `payload.threadId` 或 `SessionRef.threadId` 执行 `thread/resume`,没有标准 `threadId` 则执行 `thread/start`;events、result 和 session record 都以 `threadId` 为唯一 thread identity;profile 隔离、TTL、GC 和跨 profile 污染防护必须可见。 ### P1 ResourceBundleRef / bundle materialization diff --git a/docs/reference/spec-v01-runtime-assembly.md b/docs/reference/spec-v01-runtime-assembly.md index 2cdd966..bdf71ba 100644 --- a/docs/reference/spec-v01-runtime-assembly.md +++ b/docs/reference/spec-v01-runtime-assembly.md @@ -107,6 +107,7 @@ HWLAB v0.2 原有 Code Agent 已经验证了 profile、session、workspace 和 S - P0 允许 `sessionRef=null`,表示不持久化 backend session 文件。 - 面向 HWLAB v0.2 原有长会话能力,SessionRef 是承接 Code Agent thread 复用的核心字段:需要支持 `conversationId/sessionId/threadId` 到 backend session identity 的稳定映射。 +- thread 复用只认标准 `threadId`:单个 command 显式提供 `payload.threadId` 时优先使用,否则使用 `SessionRef.threadId`。协议字段、events、result 和 session record 都以该字段为唯一 thread identity;缺失标准 `threadId` 就按新 thread 启动并在 result/sessionRef 中回写标准字段。 - 一旦启用 session,必须只保存 backend session/cache,不保存 API KEY、`auth.json`、`config.toml` 或完整 `CODEX_HOME`。 - session 文件目录必须和 profile credential、Git workspace 分开。 - runner 启动时,有 SessionRef 则执行 `thread/resume`,没有 SessionRef 则执行 `thread/start`;profile 切换不得复用另一 profile 的 session。 diff --git a/src/backend/adapter.ts b/src/backend/adapter.ts index 943d063..905ff05 100644 --- a/src/backend/adapter.ts +++ b/src/backend/adapter.ts @@ -54,8 +54,8 @@ export function backendTurnOptions(run: RunRecord, command: CommandRecord, optio timeoutMs: run.executionPolicy.timeoutMs, }; if (typeof command.payload.model === "string") turnOptions.model = command.payload.model; - if (typeof command.payload.threadId === "string") turnOptions.threadId = command.payload.threadId; - else if (typeof run.sessionRef?.threadId === "string") turnOptions.threadId = run.sessionRef.threadId; + if (typeof command.payload.threadId === "string" && command.payload.threadId.trim()) turnOptions.threadId = command.payload.threadId.trim(); + else if (typeof run.sessionRef?.threadId === "string" && run.sessionRef.threadId.trim()) turnOptions.threadId = run.sessionRef.threadId.trim(); if (options.codexCommand) turnOptions.command = options.codexCommand; if (options.codexArgs) turnOptions.args = options.codexArgs; if (options.env) turnOptions.env = options.env; diff --git a/src/backend/codex-stdio.ts b/src/backend/codex-stdio.ts index 2d20ad0..14a2504 100644 --- a/src/backend/codex-stdio.ts +++ b/src/backend/codex-stdio.ts @@ -446,25 +446,9 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess }; if (options.threadId) { - try { - const threadResponse = requireResponseRecord(await client.request("thread/resume", withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model), requestTimeoutMs), "thread/resume"); - threadId = requireNestedId(threadResponse, "thread/resume", "thread"); - emitEvent({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } }); - } catch (error) { - const failure = normalizeFailure(error); - if (!isStaleThreadResumeFailure(failure)) throw error; - emitEvent({ - type: "backend_status", - payload: { - phase: "thread/resume:stale-thread-fallback", - requestedThreadId: options.threadId, - failureKind: failure.failureKind, - message: failure.message, - fallback: "thread/start", - }, - }); - threadId = await startThread("thread/start:after-stale-resume"); - } + const threadResponse = requireResponseRecord(await client.request("thread/resume", withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model), requestTimeoutMs), "thread/resume"); + threadId = requireNestedId(threadResponse, "thread/resume", "thread"); + emitEvent({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } }); } else { threadId = await startThread(); } @@ -884,12 +868,6 @@ function normalizeFailure(error: unknown): CodexStdioFailure { return new CodexStdioFailure(classifyMessageFailureKind(message, "backend-protocol-error"), message, "codex-stdio"); } -function isStaleThreadResumeFailure(error: CodexStdioFailure): boolean { - if (error.phase !== "response:thread/resume") return false; - const text = `${error.message}\n${JSON.stringify(error.details)}`.toLowerCase(); - return /no rollout found for thread id/u.test(text); -} - function classifyCodexErrorRecord(error: JsonRecord, fallback: FailureKind): FailureKind { const parts: string[] = []; if (typeof error.message === "string") parts.push(error.message); diff --git a/src/selftest/cases/30-codex-stdio.ts b/src/selftest/cases/30-codex-stdio.ts index 416e937..70752ee 100644 --- a/src/selftest/cases/30-codex-stdio.ts +++ b/src/selftest/cases/30-codex-stdio.ts @@ -93,25 +93,26 @@ const selfTest: SelfTestCase = async (context) => { assert.ok(progressMessageIndex >= 0 && progressMessageIndex < turnCompletedIndex, "progress agentMessage should be emitted before turn/completed instead of being delayed to final response"); assert.ok(finalMessageIndex >= 0 && finalMessageIndex < turnCompletedIndex, "final agentMessage should be emitted before turn/completed instead of being delayed to final response"); - const staleResume = await createStaleThreadRun(client, context); - const staleResumeResult = await runOnce({ + const staleThread = await createStaleThreadRun(client, context); + const staleThreadResult = await runOnce({ managerUrl: server.baseUrl, - runId: staleResume.runId, - commandId: staleResume.commandId, + runId: staleThread.runId, + commandId: staleThread.commandId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "resume-no-rollout" }, oneShot: true, }) as JsonRecord; - assert.equal(staleResumeResult.terminalStatus, "completed", "stale thread resume should fall back to thread/start and complete the turn"); - const staleEnvelope = await client.get(`/api/v1/runs/${staleResume.runId}/commands/${staleResume.commandId}/result`) as JsonRecord; - assert.equal(staleEnvelope.terminalStatus, "completed"); - assert.equal((staleEnvelope.sessionRef as JsonRecord).threadId, "thread_selftest_1"); - const staleEvents = await client.get(`/api/v1/runs/${staleResume.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; - assert.ok(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/resume:stale-thread-fallback"), "stale resume fallback event should be visible"); - assert.ok(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/start:after-stale-resume:completed"), "fallback should start a fresh thread"); - assertNoSecretLeak(staleEvents); + assert.equal(staleThreadResult.terminalStatus, "failed", "standard thread resume failure must not start a replacement thread"); + assert.equal(staleThreadResult.failureKind, "backend-protocol-error"); + const staleEnvelope = await client.get(`/api/v1/runs/${staleThread.runId}/commands/${staleThread.commandId}/result`) as JsonRecord; + assert.equal(staleEnvelope.terminalStatus, "failed"); + assert.equal(staleEnvelope.completed, false); + assert.equal((staleEnvelope.sessionRef as JsonRecord).threadId, "thread_missing_rollout"); + const staleEvents = await client.get(`/api/v1/runs/${staleThread.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; + assert.equal(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/start:completed"), false, "stale standard thread must not create a replacement thread"); + assertNoSecretLeak({ staleThreadResult, staleEnvelope, staleEvents }); const live = await createRunWithCommand(client, context, "hello live events", "selftest-live-tool-events", 15_000); const livePromise = runOnce({ managerUrl: server.baseUrl, runId: live.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "slow-tool-events" }, oneShot: true }) as Promise; @@ -155,7 +156,7 @@ const selfTest: SelfTestCase = async (context) => { await runSecretFailureCase({ client, managerUrl: server.baseUrl, context }); await runSpawnFailureCase({ client, managerUrl: server.baseUrl, context }); - return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fallback", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] }; + return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fails", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] }; } finally { await new Promise((resolve) => server.server.close(() => resolve())); } @@ -227,7 +228,7 @@ async function createStaleThreadRun(client: ManagerClient, context: SelfTestCont }, traceSink: null, }) as { id: string }; - const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello stale thread" }, idempotencyKey: "selftest-stale-thread-fallback" }) as { id: string }; + const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello stale thread" }, idempotencyKey: "selftest-stale-thread-fails" }) as { id: string }; return { runId: run.id, commandId: command.id }; } diff --git a/src/selftest/cases/60-hwlab-baseline-contract.ts b/src/selftest/cases/60-hwlab-baseline-contract.ts index 3b3d3dd..d30bd72 100644 --- a/src/selftest/cases/60-hwlab-baseline-contract.ts +++ b/src/selftest/cases/60-hwlab-baseline-contract.ts @@ -9,6 +9,7 @@ import { ManagerClient } from "../../mgr/client.js"; import { runOnce } from "../../runner/run-once.js"; import { eventContractSummary } from "../../common/events.js"; import type { BackendProfile, JsonRecord, RunEvent } from "../../common/types.js"; +import { backendTurnOptions } from "../../backend/adapter.js"; import { assertNoSecretLeak, createRunWithCommand, profileSecretHome, type SelfTestCase, type SelfTestContext } from "../harness.js"; const execFile = promisify(execFileCallback); @@ -43,9 +44,10 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin await assertBackendPreflight(client); await assertEventContractAndCompletedSemantics(client, context, server.baseUrl); await assertRunnerJobStatus(client, context); + assertThreadResumeStandard(context); await assertSessionProfileIsolation(client, context); await assertResourceBundleFailure(client, context, server.baseUrl); - return { name: "hwlab-baseline-contract", tests: ["event-contract", "result-completed-terminal-only", "bounded-output-summary", "runner-job-status", "backend-preflight-redacted", "session-profile-isolation", "resource-bundle-failure-kind"] }; + return { name: "hwlab-baseline-contract", tests: ["event-contract", "result-completed-terminal-only", "bounded-output-summary", "runner-job-status", "thread-resume-standard", "backend-preflight-redacted", "session-profile-isolation", "resource-bundle-failure-kind"] }; } finally { await new Promise((resolve) => server.server.close(() => resolve())); } @@ -117,6 +119,40 @@ async function assertRunnerJobStatus(client: ManagerClient, context: SelfTestCon assertNoSecretLeak({ list, single }); } +function assertThreadResumeStandard(context: SelfTestContext): void { + const run = { + ...runPayload(context, "codex", "selftest-thread-standard-session"), + id: "run_thread_standard", + status: "claimed", + terminalStatus: null, + terminalFailureKind: null, + terminalFailureMessage: null, + sessionRef: { + sessionId: "selftest-thread-standard-session", + conversationId: "selftest-thread-standard-session", + threadId: "thread_from_session_ref", + }, + resourceBundleRef: null, + claimedBy: null, + leaseExpiresAt: null, + createdAt: "2026-06-01T00:00:00.000Z", + updatedAt: "2026-06-01T00:00:00.000Z", + } as any; + const command = { + id: "cmd_thread_standard", + runId: run.id, + type: "turn", + payload: { prompt: "resume session thread" }, + state: "pending", + seq: 1, + createdAt: run.createdAt, + updatedAt: run.updatedAt, + } as any; + assert.equal(backendTurnOptions(run, command).threadId, "thread_from_session_ref"); + assert.equal(backendTurnOptions(run, { ...command, payload: { prompt: "explicit wins", threadId: "thread_explicit" } }).threadId, "thread_explicit"); + assert.equal(backendTurnOptions({ ...run, sessionRef: { sessionId: "selftest-thread-standard-session" } }, command).threadId, undefined); +} + async function assertSessionProfileIsolation(client: ManagerClient, context: SelfTestContext): Promise { const first = await client.post("/api/v1/runs", runPayload(context, "codex", "selftest-profile-boundary-session")) as { id: string }; await client.patch(`/api/v1/runs/${first.id}/status`, { terminalStatus: "completed", failureKind: null, failureMessage: null, threadId: "thread_codex_profile_boundary", turnId: "turn_profile_boundary" });