fix: 统一 AgentRun threadId 连续性
This commit is contained in:
@@ -38,7 +38,7 @@ Runner 承接的是 HWLAB v0.2 原有 Code Agent 的执行层经验,不承接
|
||||
|
||||
| HWLAB v0.2 参考能力 | 参考入口 | Runner 承接规则 |
|
||||
| --- | --- | --- |
|
||||
| Codex app-server stdio thread/turn 生命周期 | `internal/cloud/codex-stdio-session.ts` | 有 `SessionRef.threadId` 时执行 resume,再 start turn;无 thread 时 start thread;turn terminal 才能上报 completed。 |
|
||||
| Codex app-server stdio thread/turn 生命周期 | `internal/cloud/codex-stdio-session.ts` | 有 command `payload.threadId` 或 `SessionRef.threadId` 时执行 resume,再 start turn;无标准 `threadId` 时 start thread;events、result 和 session record 都以 `threadId` 为唯一 thread identity;turn terminal 才能上报 completed。 |
|
||||
| cancel/interrupt | `internal/cloud/server-code-agent-http.ts`、`internal/cloud/codex-stdio-session.ts` | runner 必须轮询 manager cancel 状态并中止 backend;backend 不支持精确 interrupt 时终止受控进程组。 |
|
||||
| runnerTrace 事件可见性 | `internal/cloud/code-agent-trace-store.ts` | backend 输出必须转成 manager events;每个 terminal/错误/取消都要有事件和 final status。 |
|
||||
| workspace-write 边界 | `internal/cloud/code-agent-contract.ts` | runner 只使用 ResourceBundleRef materialized workspace,不猜 HWLAB Pod 的 `/workspace/hwlab` 或 host path。 |
|
||||
|
||||
@@ -112,7 +112,7 @@ AgentRun 需要提供 durable cancel 能力,建议形态为 `POST /api/v1/runs
|
||||
|
||||
### P1 SessionRef 持久化
|
||||
|
||||
`SessionRef` 需要从 `null/deferred` 升级为可选持久会话引用,支持 HWLAB `conversationId/sessionId/threadId` 到 AgentRun session identity 的映射。session 只能保存 backend thread/session/cache,不保存 API KEY、`auth.json`、`config.toml` 或完整 `CODEX_HOME`。session store 必须与 Secret projection、writable runtime home、Git workspace 分离。runner 启动时,有 SessionRef 则执行 `thread/resume`,没有则执行 `thread/start`;profile 隔离、TTL、GC 和跨 profile 污染防护必须可见。
|
||||
`SessionRef` 需要从 `null/deferred` 升级为可选持久会话引用,支持 HWLAB `conversationId/sessionId/threadId` 到 AgentRun session identity 的映射。session 只能保存 backend thread/session/cache,不保存 API KEY、`auth.json`、`config.toml` 或完整 `CODEX_HOME`。session store 必须与 Secret projection、writable runtime home、Git workspace 分离。runner 启动时,只按 command `payload.threadId` 或 `SessionRef.threadId` 执行 `thread/resume`,没有标准 `threadId` 则执行 `thread/start`;events、result 和 session record 都以 `threadId` 为唯一 thread identity;profile 隔离、TTL、GC 和跨 profile 污染防护必须可见。
|
||||
|
||||
### P1 ResourceBundleRef / bundle materialization
|
||||
|
||||
|
||||
@@ -107,6 +107,7 @@ HWLAB v0.2 原有 Code Agent 已经验证了 profile、session、workspace 和 S
|
||||
|
||||
- P0 允许 `sessionRef=null`,表示不持久化 backend session 文件。
|
||||
- 面向 HWLAB v0.2 原有长会话能力,SessionRef 是承接 Code Agent thread 复用的核心字段:需要支持 `conversationId/sessionId/threadId` 到 backend session identity 的稳定映射。
|
||||
- thread 复用只认标准 `threadId`:单个 command 显式提供 `payload.threadId` 时优先使用,否则使用 `SessionRef.threadId`。协议字段、events、result 和 session record 都以该字段为唯一 thread identity;缺失标准 `threadId` 就按新 thread 启动并在 result/sessionRef 中回写标准字段。
|
||||
- 一旦启用 session,必须只保存 backend session/cache,不保存 API KEY、`auth.json`、`config.toml` 或完整 `CODEX_HOME`。
|
||||
- session 文件目录必须和 profile credential、Git workspace 分开。
|
||||
- runner 启动时,有 SessionRef 则执行 `thread/resume`,没有 SessionRef 则执行 `thread/start`;profile 切换不得复用另一 profile 的 session。
|
||||
|
||||
@@ -54,8 +54,8 @@ export function backendTurnOptions(run: RunRecord, command: CommandRecord, optio
|
||||
timeoutMs: run.executionPolicy.timeoutMs,
|
||||
};
|
||||
if (typeof command.payload.model === "string") turnOptions.model = command.payload.model;
|
||||
if (typeof command.payload.threadId === "string") turnOptions.threadId = command.payload.threadId;
|
||||
else if (typeof run.sessionRef?.threadId === "string") turnOptions.threadId = run.sessionRef.threadId;
|
||||
if (typeof command.payload.threadId === "string" && command.payload.threadId.trim()) turnOptions.threadId = command.payload.threadId.trim();
|
||||
else if (typeof run.sessionRef?.threadId === "string" && run.sessionRef.threadId.trim()) turnOptions.threadId = run.sessionRef.threadId.trim();
|
||||
if (options.codexCommand) turnOptions.command = options.codexCommand;
|
||||
if (options.codexArgs) turnOptions.args = options.codexArgs;
|
||||
if (options.env) turnOptions.env = options.env;
|
||||
|
||||
@@ -446,25 +446,9 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
|
||||
};
|
||||
|
||||
if (options.threadId) {
|
||||
try {
|
||||
const threadResponse = requireResponseRecord(await client.request("thread/resume", withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model), requestTimeoutMs), "thread/resume");
|
||||
threadId = requireNestedId(threadResponse, "thread/resume", "thread");
|
||||
emitEvent({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } });
|
||||
} catch (error) {
|
||||
const failure = normalizeFailure(error);
|
||||
if (!isStaleThreadResumeFailure(failure)) throw error;
|
||||
emitEvent({
|
||||
type: "backend_status",
|
||||
payload: {
|
||||
phase: "thread/resume:stale-thread-fallback",
|
||||
requestedThreadId: options.threadId,
|
||||
failureKind: failure.failureKind,
|
||||
message: failure.message,
|
||||
fallback: "thread/start",
|
||||
},
|
||||
});
|
||||
threadId = await startThread("thread/start:after-stale-resume");
|
||||
}
|
||||
const threadResponse = requireResponseRecord(await client.request("thread/resume", withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model), requestTimeoutMs), "thread/resume");
|
||||
threadId = requireNestedId(threadResponse, "thread/resume", "thread");
|
||||
emitEvent({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } });
|
||||
} else {
|
||||
threadId = await startThread();
|
||||
}
|
||||
@@ -884,12 +868,6 @@ function normalizeFailure(error: unknown): CodexStdioFailure {
|
||||
return new CodexStdioFailure(classifyMessageFailureKind(message, "backend-protocol-error"), message, "codex-stdio");
|
||||
}
|
||||
|
||||
function isStaleThreadResumeFailure(error: CodexStdioFailure): boolean {
|
||||
if (error.phase !== "response:thread/resume") return false;
|
||||
const text = `${error.message}\n${JSON.stringify(error.details)}`.toLowerCase();
|
||||
return /no rollout found for thread id/u.test(text);
|
||||
}
|
||||
|
||||
function classifyCodexErrorRecord(error: JsonRecord, fallback: FailureKind): FailureKind {
|
||||
const parts: string[] = [];
|
||||
if (typeof error.message === "string") parts.push(error.message);
|
||||
|
||||
@@ -93,25 +93,26 @@ const selfTest: SelfTestCase = async (context) => {
|
||||
assert.ok(progressMessageIndex >= 0 && progressMessageIndex < turnCompletedIndex, "progress agentMessage should be emitted before turn/completed instead of being delayed to final response");
|
||||
assert.ok(finalMessageIndex >= 0 && finalMessageIndex < turnCompletedIndex, "final agentMessage should be emitted before turn/completed instead of being delayed to final response");
|
||||
|
||||
const staleResume = await createStaleThreadRun(client, context);
|
||||
const staleResumeResult = await runOnce({
|
||||
const staleThread = await createStaleThreadRun(client, context);
|
||||
const staleThreadResult = await runOnce({
|
||||
managerUrl: server.baseUrl,
|
||||
runId: staleResume.runId,
|
||||
commandId: staleResume.commandId,
|
||||
runId: staleThread.runId,
|
||||
commandId: staleThread.commandId,
|
||||
codexCommand: context.fakeCodexCommand,
|
||||
codexArgs: context.fakeCodexArgs,
|
||||
codexHome: context.codexHome,
|
||||
env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "resume-no-rollout" },
|
||||
oneShot: true,
|
||||
}) as JsonRecord;
|
||||
assert.equal(staleResumeResult.terminalStatus, "completed", "stale thread resume should fall back to thread/start and complete the turn");
|
||||
const staleEnvelope = await client.get(`/api/v1/runs/${staleResume.runId}/commands/${staleResume.commandId}/result`) as JsonRecord;
|
||||
assert.equal(staleEnvelope.terminalStatus, "completed");
|
||||
assert.equal((staleEnvelope.sessionRef as JsonRecord).threadId, "thread_selftest_1");
|
||||
const staleEvents = await client.get(`/api/v1/runs/${staleResume.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
|
||||
assert.ok(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/resume:stale-thread-fallback"), "stale resume fallback event should be visible");
|
||||
assert.ok(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/start:after-stale-resume:completed"), "fallback should start a fresh thread");
|
||||
assertNoSecretLeak(staleEvents);
|
||||
assert.equal(staleThreadResult.terminalStatus, "failed", "standard thread resume failure must not start a replacement thread");
|
||||
assert.equal(staleThreadResult.failureKind, "backend-protocol-error");
|
||||
const staleEnvelope = await client.get(`/api/v1/runs/${staleThread.runId}/commands/${staleThread.commandId}/result`) as JsonRecord;
|
||||
assert.equal(staleEnvelope.terminalStatus, "failed");
|
||||
assert.equal(staleEnvelope.completed, false);
|
||||
assert.equal((staleEnvelope.sessionRef as JsonRecord).threadId, "thread_missing_rollout");
|
||||
const staleEvents = await client.get(`/api/v1/runs/${staleThread.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
|
||||
assert.equal(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/start:completed"), false, "stale standard thread must not create a replacement thread");
|
||||
assertNoSecretLeak({ staleThreadResult, staleEnvelope, staleEvents });
|
||||
|
||||
const live = await createRunWithCommand(client, context, "hello live events", "selftest-live-tool-events", 15_000);
|
||||
const livePromise = runOnce({ managerUrl: server.baseUrl, runId: live.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "slow-tool-events" }, oneShot: true }) as Promise<JsonRecord>;
|
||||
@@ -155,7 +156,7 @@ const selfTest: SelfTestCase = async (context) => {
|
||||
await runSecretFailureCase({ client, managerUrl: server.baseUrl, context });
|
||||
await runSpawnFailureCase({ client, managerUrl: server.baseUrl, context });
|
||||
|
||||
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fallback", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
|
||||
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fails", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
|
||||
} finally {
|
||||
await new Promise<void>((resolve) => server.server.close(() => resolve()));
|
||||
}
|
||||
@@ -227,7 +228,7 @@ async function createStaleThreadRun(client: ManagerClient, context: SelfTestCont
|
||||
},
|
||||
traceSink: null,
|
||||
}) as { id: string };
|
||||
const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello stale thread" }, idempotencyKey: "selftest-stale-thread-fallback" }) as { id: string };
|
||||
const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello stale thread" }, idempotencyKey: "selftest-stale-thread-fails" }) as { id: string };
|
||||
return { runId: run.id, commandId: command.id };
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ import { ManagerClient } from "../../mgr/client.js";
|
||||
import { runOnce } from "../../runner/run-once.js";
|
||||
import { eventContractSummary } from "../../common/events.js";
|
||||
import type { BackendProfile, JsonRecord, RunEvent } from "../../common/types.js";
|
||||
import { backendTurnOptions } from "../../backend/adapter.js";
|
||||
import { assertNoSecretLeak, createRunWithCommand, profileSecretHome, type SelfTestCase, type SelfTestContext } from "../harness.js";
|
||||
|
||||
const execFile = promisify(execFileCallback);
|
||||
@@ -43,9 +44,10 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin
|
||||
await assertBackendPreflight(client);
|
||||
await assertEventContractAndCompletedSemantics(client, context, server.baseUrl);
|
||||
await assertRunnerJobStatus(client, context);
|
||||
assertThreadResumeStandard(context);
|
||||
await assertSessionProfileIsolation(client, context);
|
||||
await assertResourceBundleFailure(client, context, server.baseUrl);
|
||||
return { name: "hwlab-baseline-contract", tests: ["event-contract", "result-completed-terminal-only", "bounded-output-summary", "runner-job-status", "backend-preflight-redacted", "session-profile-isolation", "resource-bundle-failure-kind"] };
|
||||
return { name: "hwlab-baseline-contract", tests: ["event-contract", "result-completed-terminal-only", "bounded-output-summary", "runner-job-status", "thread-resume-standard", "backend-preflight-redacted", "session-profile-isolation", "resource-bundle-failure-kind"] };
|
||||
} finally {
|
||||
await new Promise<void>((resolve) => server.server.close(() => resolve()));
|
||||
}
|
||||
@@ -117,6 +119,40 @@ async function assertRunnerJobStatus(client: ManagerClient, context: SelfTestCon
|
||||
assertNoSecretLeak({ list, single });
|
||||
}
|
||||
|
||||
function assertThreadResumeStandard(context: SelfTestContext): void {
|
||||
const run = {
|
||||
...runPayload(context, "codex", "selftest-thread-standard-session"),
|
||||
id: "run_thread_standard",
|
||||
status: "claimed",
|
||||
terminalStatus: null,
|
||||
terminalFailureKind: null,
|
||||
terminalFailureMessage: null,
|
||||
sessionRef: {
|
||||
sessionId: "selftest-thread-standard-session",
|
||||
conversationId: "selftest-thread-standard-session",
|
||||
threadId: "thread_from_session_ref",
|
||||
},
|
||||
resourceBundleRef: null,
|
||||
claimedBy: null,
|
||||
leaseExpiresAt: null,
|
||||
createdAt: "2026-06-01T00:00:00.000Z",
|
||||
updatedAt: "2026-06-01T00:00:00.000Z",
|
||||
} as any;
|
||||
const command = {
|
||||
id: "cmd_thread_standard",
|
||||
runId: run.id,
|
||||
type: "turn",
|
||||
payload: { prompt: "resume session thread" },
|
||||
state: "pending",
|
||||
seq: 1,
|
||||
createdAt: run.createdAt,
|
||||
updatedAt: run.updatedAt,
|
||||
} as any;
|
||||
assert.equal(backendTurnOptions(run, command).threadId, "thread_from_session_ref");
|
||||
assert.equal(backendTurnOptions(run, { ...command, payload: { prompt: "explicit wins", threadId: "thread_explicit" } }).threadId, "thread_explicit");
|
||||
assert.equal(backendTurnOptions({ ...run, sessionRef: { sessionId: "selftest-thread-standard-session" } }, command).threadId, undefined);
|
||||
}
|
||||
|
||||
async function assertSessionProfileIsolation(client: ManagerClient, context: SelfTestContext): Promise<void> {
|
||||
const first = await client.post("/api/v1/runs", runPayload(context, "codex", "selftest-profile-boundary-session")) as { id: string };
|
||||
await client.patch(`/api/v1/runs/${first.id}/status`, { terminalStatus: "completed", failureKind: null, failureMessage: null, threadId: "thread_codex_profile_boundary", turnId: "turn_profile_boundary" });
|
||||
|
||||
Reference in New Issue
Block a user