Merge pull request #69 from pikasTech/fix/v01-requested-thread-resume-684

fix: 统一 AgentRun threadId 连续性
This commit is contained in:
Lyon
2026-06-02 10:21:32 +08:00
committed by GitHub
7 changed files with 60 additions and 44 deletions
+1 -1
View File
@@ -38,7 +38,7 @@ Runner 承接的是 HWLAB v0.2 原有 Code Agent 的执行层经验,不承接
| HWLAB v0.2 参考能力 | 参考入口 | Runner 承接规则 |
| --- | --- | --- |
| Codex app-server stdio thread/turn 生命周期 | `internal/cloud/codex-stdio-session.ts` | 有 `SessionRef.threadId` 时执行 resume,再 start turn;无 thread 时 start thread;turn terminal 才能上报 completed。 |
| Codex app-server stdio thread/turn 生命周期 | `internal/cloud/codex-stdio-session.ts` | 有 command `payload.threadId` 或 `SessionRef.threadId` 时执行 resume,再 start turn;无标准 `threadId` 时 start thread;events、result 和 session record 都以 `threadId` 为唯一 thread identity;turn terminal 才能上报 completed。 |
| cancel/interrupt | `internal/cloud/server-code-agent-http.ts`、`internal/cloud/codex-stdio-session.ts` | runner 必须轮询 manager cancel 状态并中止 backend;backend 不支持精确 interrupt 时终止受控进程组。 |
| runnerTrace 事件可见性 | `internal/cloud/code-agent-trace-store.ts` | backend 输出必须转成 manager events;每个 terminal/错误/取消都要有事件和 final status。 |
| workspace-write 边界 | `internal/cloud/code-agent-contract.ts` | runner 只使用 ResourceBundleRef materialized workspace,不猜 HWLAB Pod 的 `/workspace/hwlab` 或 host path。 |
@@ -112,7 +112,7 @@ AgentRun 需要提供 durable cancel 能力,建议形态为 `POST /api/v1/runs
### P1 SessionRef 持久化
`SessionRef` 需要从 `null/deferred` 升级为可选持久会话引用,支持 HWLAB `conversationId/sessionId/threadId` 到 AgentRun session identity 的映射。session 只能保存 backend thread/session/cache,不保存 API KEY、`auth.json`、`config.toml` 或完整 `CODEX_HOME`。session store 必须与 Secret projection、writable runtime home、Git workspace 分离。runner 启动时,有 SessionRef 则执行 `thread/resume`,没有则执行 `thread/start`;profile 隔离、TTL、GC 和跨 profile 污染防护必须可见。
`SessionRef` 需要从 `null/deferred` 升级为可选持久会话引用,支持 HWLAB `conversationId/sessionId/threadId` 到 AgentRun session identity 的映射。session 只能保存 backend thread/session/cache,不保存 API KEY、`auth.json`、`config.toml` 或完整 `CODEX_HOME`。session store 必须与 Secret projection、writable runtime home、Git workspace 分离。runner 启动时,只按 command `payload.threadId` 或 `SessionRef.threadId` 执行 `thread/resume`,没有标准 `threadId` 则执行 `thread/start`;events、result 和 session record 都以 `threadId` 为唯一 thread identity;profile 隔离、TTL、GC 和跨 profile 污染防护必须可见。
### P1 ResourceBundleRef / bundle materialization
@@ -107,6 +107,7 @@ HWLAB v0.2 原有 Code Agent 已经验证了 profile、session、workspace 和 S
- P0 允许 `sessionRef=null`,表示不持久化 backend session 文件。
- 面向 HWLAB v0.2 原有长会话能力,SessionRef 是承接 Code Agent thread 复用的核心字段:需要支持 `conversationId/sessionId/threadId` 到 backend session identity 的稳定映射。
- thread 复用只认标准 `threadId`:单个 command 显式提供 `payload.threadId` 时优先使用,否则使用 `SessionRef.threadId`。协议字段、events、result 和 session record 都以该字段为唯一 thread identity;缺失标准 `threadId` 就按新 thread 启动并在 result/sessionRef 中回写标准字段。
- 一旦启用 session,必须只保存 backend session/cache,不保存 API KEY、`auth.json`、`config.toml` 或完整 `CODEX_HOME`。
- session 文件目录必须和 profile credential、Git workspace 分开。
- runner 启动时,有 SessionRef 则执行 `thread/resume`,没有 SessionRef 则执行 `thread/start`;profile 切换不得复用另一 profile 的 session。
+2 -2
View File
@@ -54,8 +54,8 @@ export function backendTurnOptions(run: RunRecord, command: CommandRecord, optio
timeoutMs: run.executionPolicy.timeoutMs,
};
if (typeof command.payload.model === "string") turnOptions.model = command.payload.model;
if (typeof command.payload.threadId === "string") turnOptions.threadId = command.payload.threadId;
else if (typeof run.sessionRef?.threadId === "string") turnOptions.threadId = run.sessionRef.threadId;
if (typeof command.payload.threadId === "string" && command.payload.threadId.trim()) turnOptions.threadId = command.payload.threadId.trim();
else if (typeof run.sessionRef?.threadId === "string" && run.sessionRef.threadId.trim()) turnOptions.threadId = run.sessionRef.threadId.trim();
if (options.codexCommand) turnOptions.command = options.codexCommand;
if (options.codexArgs) turnOptions.args = options.codexArgs;
if (options.env) turnOptions.env = options.env;
+3 -25
View File
@@ -446,25 +446,9 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
};
if (options.threadId) {
try {
const threadResponse = requireResponseRecord(await client.request("thread/resume", withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model), requestTimeoutMs), "thread/resume");
threadId = requireNestedId(threadResponse, "thread/resume", "thread");
emitEvent({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } });
} catch (error) {
const failure = normalizeFailure(error);
if (!isStaleThreadResumeFailure(failure)) throw error;
emitEvent({
type: "backend_status",
payload: {
phase: "thread/resume:stale-thread-fallback",
requestedThreadId: options.threadId,
failureKind: failure.failureKind,
message: failure.message,
fallback: "thread/start",
},
});
threadId = await startThread("thread/start:after-stale-resume");
}
const threadResponse = requireResponseRecord(await client.request("thread/resume", withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model), requestTimeoutMs), "thread/resume");
threadId = requireNestedId(threadResponse, "thread/resume", "thread");
emitEvent({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } });
} else {
threadId = await startThread();
}
@@ -884,12 +868,6 @@ function normalizeFailure(error: unknown): CodexStdioFailure {
return new CodexStdioFailure(classifyMessageFailureKind(message, "backend-protocol-error"), message, "codex-stdio");
}
/**
 * Tells whether a failure is a `thread/resume` response reporting that the
 * backend has no rollout for the requested thread id.
 *
 * @param error - Normalized stdio failure to inspect.
 * @returns `true` only when the failure phase is `response:thread/resume` and
 *   the message or the serialized details contain the "no rollout found"
 *   marker (matched case-insensitively).
 */
function isStaleThreadResumeFailure(error: CodexStdioFailure): boolean {
  const staleMarker = /no rollout found for thread id/u;
  const cameFromResumeResponse = error.phase === "response:thread/resume";
  if (!cameFromResumeResponse) {
    return false;
  }
  // Search the message and the JSON form of the details together, lower-cased
  // so that the marker matches regardless of the backend's capitalization.
  const searchable = (error.message + "\n" + JSON.stringify(error.details)).toLowerCase();
  return staleMarker.test(searchable);
}
function classifyCodexErrorRecord(error: JsonRecord, fallback: FailureKind): FailureKind {
const parts: string[] = [];
if (typeof error.message === "string") parts.push(error.message);
+15 -14
View File
@@ -93,25 +93,26 @@ const selfTest: SelfTestCase = async (context) => {
assert.ok(progressMessageIndex >= 0 && progressMessageIndex < turnCompletedIndex, "progress agentMessage should be emitted before turn/completed instead of being delayed to final response");
assert.ok(finalMessageIndex >= 0 && finalMessageIndex < turnCompletedIndex, "final agentMessage should be emitted before turn/completed instead of being delayed to final response");
const staleResume = await createStaleThreadRun(client, context);
const staleResumeResult = await runOnce({
const staleThread = await createStaleThreadRun(client, context);
const staleThreadResult = await runOnce({
managerUrl: server.baseUrl,
runId: staleResume.runId,
commandId: staleResume.commandId,
runId: staleThread.runId,
commandId: staleThread.commandId,
codexCommand: context.fakeCodexCommand,
codexArgs: context.fakeCodexArgs,
codexHome: context.codexHome,
env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "resume-no-rollout" },
oneShot: true,
}) as JsonRecord;
assert.equal(staleResumeResult.terminalStatus, "completed", "stale thread resume should fall back to thread/start and complete the turn");
const staleEnvelope = await client.get(`/api/v1/runs/${staleResume.runId}/commands/${staleResume.commandId}/result`) as JsonRecord;
assert.equal(staleEnvelope.terminalStatus, "completed");
assert.equal((staleEnvelope.sessionRef as JsonRecord).threadId, "thread_selftest_1");
const staleEvents = await client.get(`/api/v1/runs/${staleResume.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
assert.ok(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/resume:stale-thread-fallback"), "stale resume fallback event should be visible");
assert.ok(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/start:after-stale-resume:completed"), "fallback should start a fresh thread");
assertNoSecretLeak(staleEvents);
assert.equal(staleThreadResult.terminalStatus, "failed", "standard thread resume failure must not start a replacement thread");
assert.equal(staleThreadResult.failureKind, "backend-protocol-error");
const staleEnvelope = await client.get(`/api/v1/runs/${staleThread.runId}/commands/${staleThread.commandId}/result`) as JsonRecord;
assert.equal(staleEnvelope.terminalStatus, "failed");
assert.equal(staleEnvelope.completed, false);
assert.equal((staleEnvelope.sessionRef as JsonRecord).threadId, "thread_missing_rollout");
const staleEvents = await client.get(`/api/v1/runs/${staleThread.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
assert.equal(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/start:completed"), false, "stale standard thread must not create a replacement thread");
assertNoSecretLeak({ staleThreadResult, staleEnvelope, staleEvents });
const live = await createRunWithCommand(client, context, "hello live events", "selftest-live-tool-events", 15_000);
const livePromise = runOnce({ managerUrl: server.baseUrl, runId: live.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "slow-tool-events" }, oneShot: true }) as Promise<JsonRecord>;
@@ -155,7 +156,7 @@ const selfTest: SelfTestCase = async (context) => {
await runSecretFailureCase({ client, managerUrl: server.baseUrl, context });
await runSpawnFailureCase({ client, managerUrl: server.baseUrl, context });
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fallback", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fails", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}
@@ -227,7 +228,7 @@ async function createStaleThreadRun(client: ManagerClient, context: SelfTestCont
},
traceSink: null,
}) as { id: string };
const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello stale thread" }, idempotencyKey: "selftest-stale-thread-fallback" }) as { id: string };
const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello stale thread" }, idempotencyKey: "selftest-stale-thread-fails" }) as { id: string };
return { runId: run.id, commandId: command.id };
}
@@ -9,6 +9,7 @@ import { ManagerClient } from "../../mgr/client.js";
import { runOnce } from "../../runner/run-once.js";
import { eventContractSummary } from "../../common/events.js";
import type { BackendProfile, JsonRecord, RunEvent } from "../../common/types.js";
import { backendTurnOptions } from "../../backend/adapter.js";
import { assertNoSecretLeak, createRunWithCommand, profileSecretHome, type SelfTestCase, type SelfTestContext } from "../harness.js";
const execFile = promisify(execFileCallback);
@@ -43,9 +44,10 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin
await assertBackendPreflight(client);
await assertEventContractAndCompletedSemantics(client, context, server.baseUrl);
await assertRunnerJobStatus(client, context);
assertThreadResumeStandard(context);
await assertSessionProfileIsolation(client, context);
await assertResourceBundleFailure(client, context, server.baseUrl);
return { name: "hwlab-baseline-contract", tests: ["event-contract", "result-completed-terminal-only", "bounded-output-summary", "runner-job-status", "backend-preflight-redacted", "session-profile-isolation", "resource-bundle-failure-kind"] };
return { name: "hwlab-baseline-contract", tests: ["event-contract", "result-completed-terminal-only", "bounded-output-summary", "runner-job-status", "thread-resume-standard", "backend-preflight-redacted", "session-profile-isolation", "resource-bundle-failure-kind"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}
@@ -117,6 +119,40 @@ async function assertRunnerJobStatus(client: ManagerClient, context: SelfTestCon
assertNoSecretLeak({ list, single });
}
/**
 * Self-test for the standard thread-resume rule implemented by
 * `backendTurnOptions`: an explicit command `payload.threadId` wins, otherwise
 * `sessionRef.threadId` is used, and without either no `threadId` is set.
 *
 * Also covers the whitespace normalization applied to both sources: ids are
 * trimmed, and a blank id is treated as absent.
 *
 * @param context - Self-test context forwarded to `runPayload` to build the
 *   base run fields.
 * @throws AssertionError when any resolved `threadId` differs from the
 *   expected value.
 */
function assertThreadResumeStandard(context: SelfTestContext): void {
  // Hand-assembled run record; cast to `any` because it is built directly
  // rather than being fetched from the manager.
  const run = {
    ...runPayload(context, "codex", "selftest-thread-standard-session"),
    id: "run_thread_standard",
    status: "claimed",
    terminalStatus: null,
    terminalFailureKind: null,
    terminalFailureMessage: null,
    sessionRef: {
      sessionId: "selftest-thread-standard-session",
      conversationId: "selftest-thread-standard-session",
      threadId: "thread_from_session_ref",
    },
    resourceBundleRef: null,
    claimedBy: null,
    leaseExpiresAt: null,
    createdAt: "2026-06-01T00:00:00.000Z",
    updatedAt: "2026-06-01T00:00:00.000Z",
  } as any;
  // Hand-assembled command record whose payload carries no threadId.
  const command = {
    id: "cmd_thread_standard",
    runId: run.id,
    type: "turn",
    payload: { prompt: "resume session thread" },
    state: "pending",
    seq: 1,
    createdAt: run.createdAt,
    updatedAt: run.updatedAt,
  } as any;

  // No explicit payload.threadId: the SessionRef thread is resumed.
  assert.equal(backendTurnOptions(run, command).threadId, "thread_from_session_ref");
  // Explicit payload.threadId takes precedence over the SessionRef.
  assert.equal(backendTurnOptions(run, { ...command, payload: { prompt: "explicit wins", threadId: "thread_explicit" } }).threadId, "thread_explicit");
  // Neither source has a threadId: none is set.
  assert.equal(backendTurnOptions({ ...run, sessionRef: { sessionId: "selftest-thread-standard-session" } }, command).threadId, undefined);

  // Surrounding whitespace is stripped from an explicit payload.threadId.
  assert.equal(backendTurnOptions(run, { ...command, payload: { prompt: "explicit padded", threadId: "  thread_explicit  " } }).threadId, "thread_explicit");
  // A blank payload.threadId is treated as absent and falls back to the SessionRef.
  assert.equal(backendTurnOptions(run, { ...command, payload: { prompt: "explicit blank", threadId: "   " } }).threadId, "thread_from_session_ref");
  // Surrounding whitespace is stripped from SessionRef.threadId as well.
  assert.equal(backendTurnOptions({ ...run, sessionRef: { ...run.sessionRef, threadId: " thread_from_session_ref " } }, command).threadId, "thread_from_session_ref");
  // A blank SessionRef.threadId is treated as absent.
  assert.equal(backendTurnOptions({ ...run, sessionRef: { ...run.sessionRef, threadId: "   " } }, command).threadId, undefined);
}
async function assertSessionProfileIsolation(client: ManagerClient, context: SelfTestContext): Promise<void> {
const first = await client.post("/api/v1/runs", runPayload(context, "codex", "selftest-profile-boundary-session")) as { id: string };
await client.patch(`/api/v1/runs/${first.id}/status`, { terminalStatus: "completed", failureKind: null, failureMessage: null, threadId: "thread_codex_profile_boundary", turnId: "turn_profile_boundary" });