Add Code Queue resume contract

This commit is contained in:
Codex
2026-05-23 10:28:57 +00:00
parent 184667b4cb
commit fd8e21e611
11 changed files with 830 additions and 4 deletions
+1
View File
@@ -47,6 +47,7 @@ CLI 可以从 `master` 快速演进,但必须兼容 `deploy.json` 固定的 CI
- `codex deploy <commitId>` 是旧 Code Queue 兼容部署入口,已禁用以防止维护通道直连 D601 部署 Code Queue;当前 dev 自动化只做 `ci run-dev-e2e` smoke,不提供 Code Queue CD,详细规则见 `docs/reference/codex-deploy.md`
- `codex submit [prompt] [--prompt-file path|--prompt-stdin] [--queue queueId] [--provider-id id] [--cwd path] [--model model] [--reasoning-effort effort] [--execution-mode mode] [--max-attempts N] [--reference-task-id id] [--dry-run]` 通过 backend-core 私有代理向稳定 `code-queue` 用户服务路径提交任务;prompt 必须且只能来自位置参数、文件或 stdin 之一,`--dry-run` 只返回结构化请求且不实际入队。长 prompt、多行 prompt、含引号/反引号/Markdown 表格/JSON/反斜杠的 prompt 必须优先用 `--prompt-stdin``--prompt-file`,不要拼进 shell 单个参数;位置参数只适合短单行 smoke prompt。stdin 推荐用 quoted heredoc`cat <<'PROMPT' | bun scripts/cli.ts codex submit --prompt-stdin --queue <id> --dry-run`,文件路径推荐 `bun scripts/cli.ts codex submit --prompt-file /tmp/code-queue-prompt.md --queue <id> --dry-run`,确认 dry-run 后移除 `--dry-run` 提交同一 payload。dry-run 会额外输出 `routingRecommendation`,包含推荐 route、runner、model、风险信号、prompt 自包含/issue 非唯一来源/prod-secret-DB 禁止/运行态或 release 禁止/证据要求/中等复杂度候选等 guard 状态;同时输出 `policyContract`,固定暴露 GPT-5.5、DeepSeek、MiniMax 的风险分层、并发上限和外部 provider 429 退避处置。该建议只用于指挥官 preflight,不会改写 payload,不改变 runtime admission,也不假设生产 MiniMax 或 DeepSeek 可用。`--dry-run` 必须返回完整 prompt、字符数和 `truncated=false` 用于人工验收;真实提交是写入操作,默认只返回 `accepted=true`、task id、队列、写入保护摘要和后续查看命令,必须标记 `promptOmitted=true` 且不得回显 prompt 或 promptPreview。真实提交会经过本机本地串行化保护和短节流,避免同一指挥端并发 submit 把低内存主机或 `code-queue-mgr` 控制面打抖;返回值会附带 `executionMode``runnerPermissions` 和低噪声 `submitConcurrencyGuard`,显式说明 requested/effective mode、服务级 runner sandbox/approvalPolicy、锁与等待信息。`--execution-mode` 是 Code Queue runtime placement,不是 Codex sandbox 权限;有效模式是 `default``windows-native``--execution-mode full-access` 等 sandbox-like 值会保留 requested 值并显示 effective `default`,同时提示当前不支持每任务 sandbox override。真实提交的 `queue` 摘要保持低噪声:`submittedTaskIds``queuedTaskIds``activeTaskIds``databaseActiveTaskIds` 是有界预览对象,`countContext``counts` 是权威计数;`submitted.taskStates[]` 直接给出本次 task id、queue id、status 和 `state=queued|running|terminal|unknown`,其来源固定为 `response.tasks[].status`。当本次新任务仍是 queued/retry_wait`queuedTaskIds.items` 必须包含该 id;当 counts 非零但 active/queued id 列表因为 split-brain-live、上游省略或默认有界披露而不可枚举时,预览必须设置 `idsUnavailable=true``itemsOmitted=true``itemsMeaning=not-enumerated-in-default-submit-output`,不得打印容易误读的 `items=[]``queue.activity.effectiveActiveTaskCount``queue.commanderConcurrency.activeRunnerCount` 是并发判断字段;`splitBrainLive=true` 时继续把 fresh heartbeat/database active 计入 active。需要原始 drill-down 时使用 `queue.listPreviewPolicy.rawCommand`,默认是 `bun scripts/cli.ts microservice proxy code-queue /api/tasks/overview?limit=30 --raw --full`。backend-core 默认把提交、队列 CRUD、已读状态、历史摘要和轻量 Trace 读取分流到主 server `code-queue-mgr`,由它写入主 PostgreSQLD601 scheduler 只轮询并执行已入库任务。
- `codex steer <taskId> [prompt|--prompt-file path|--prompt-stdin] [--steer-id id] [--dry-run] [--no-retry|--retry-attempts N] [--full|--raw]` 向运行中的 Code Queue 任务发送纠偏 prompt。CLI 会为同一 task/prompt 生成稳定 `steerId`,也允许显式传入 `--steer-id`;所有 retry 都复用同一 `steerId`,支持后端按 key 抑制重复 trace 注入。真实成功只返回低噪声写入确认,不回显 prompt 或完整任务状态;输出包含 `steer.status``steer.deliveryState``steer.steerId``traceConfirmation``commands.traceConfirm`。失败默认只返回 `accepted=false`、原因、scope、retryable、attempt 摘要、operator guidance 和 task/read/submit/health drill-down 命令。`upstreamBodyPreview`、request 元数据和 raw upstream failure 必须显式加 `--full``--raw` 才输出。任务已终态时返回紧凑 `task-already-terminal``status=not_accepted``deliveryState=not_accepted`、task 状态、终态状态、更新时间、`retryable=false``codex task` / `codex read` / `codex submit --reference-task-id <taskId>` 后续命令。
- `codex resume <taskId> [prompt|--prompt-file path|--prompt-stdin] [--resume-id id] [--dry-run] [--full|--raw]` 对已终态或 awaiting-closeout 的原 Code Queue task 创建后续 turn,优先用于 PR 小修、冲突、rebase、补测和 reviewer feedback,保留原 task、attempt、branch/PR 上下文和 `codexThreadId`/OpenCode session。CLI 会为同一 task/prompt 生成稳定 `resumeId`,也允许显式传入;同一 `resumeId` 加同 prompt 返回 `duplicate_suppressed` 且不重复注入,同一 `resumeId` 加不同 prompt 返回 409 conflict。真实成功只返回 taskId、resumeId/turnId、`deliveryState`、是否复用原 `codexThreadId`、有界 trace confirmation 和 `codex task/detail/trace/output` 后续命令,不回显 prompt 或完整 task state。running/judging task 必须 fail closed 并给出 `disposition=use-steer-for-active-task``codex steer` 命令,不把 resume 伪装成新 task;不存在 task 返回结构化 not accepted。若 delivery timeout 或 trace 未确认,输出 `deliveryUnconfirmed` 和确认命令,调用方先查 `codex task <taskId> --trace` 再用同一 `resumeId` 重试。
- `codex pr-preflight [--remote] [--push-dry-run --push-dry-run-ref refs/heads/probe/<name>] [--pr-create-dry-run --pr-create-dry-run-head <head>] [--issue N] [--full|--raw]` 通过稳定 `code-queue` proxy 请求 D601 scheduler `/api/runtime-preflight`,用于 PR 型派单 admission。默认输出是紧凑 commander 视图,显式分出 `schedulerPreflight``activeRunnerPrCapability`,并附带 `commands``disclosure`,方便先看 scheduler auth 缺口、再看当前 runner/dev container 的 `gh auth status``gh pr create --dry-run` 能力;`--full``--raw` 才展开完整 `preflight`、工具、agent port、Git worktree、GitHub egress、repo/issue/PR 只读探测和观测原文。只报告 `GH_TOKEN`/`GITHUB_TOKEN` 是否存在和来源 key,不打印值。当 auth-broker 配置存在时,`tokenCoverage.source="auth-broker"``credentialSource="broker-issued-token"` 且 runner env token 不是成功前提;当仅 env token 存在时,`credentialSource="env-token"``authBroker.nextAction="use-env-token-until-auth-broker-live"`;两者都缺失时顶层 `ok=false``runnerDisposition=infra-blocked``degradedReason=auth-broker-needed``tokenCoverage.missing` 同时列出 `GH_TOKEN``GITHUB_TOKEN`,并输出 `authBroker.source="broker/auth-broker-needed"``capability.source="missing-token"`。该 `auth-missing` 的 scope 是 `scheduler-runner-env`,不能简化成“当前 active runner/dev container 不能创建 PR”;默认视图必须带 `scopeBoundary``activeRunnerPrCapability`。GitHub DNS/API 连接失败应归类为 `failureKind=github-transient``degradedReason=github-dns-api-transient`,并带 `retryable=true``commanderAction=retry-backoff-or-keep-running-if-heartbeat-fresh` 和有界 `githubTransient.failedProbes`;调用方应重试/退避,且在任务 heartbeat/trace 新鲜时继续监督,不把它当成 auth 缺失或 PR 语义失败。`prCapability` 是 runner-facing 合同摘要,必须包含目标分支、token/auth 来源、`systemGhBinaryRequiredForWrites=false`、UniDesk REST `bun scripts/cli.ts gh` 可用性、push dry-run/PR create dry-run 的 `writesRemote=false`、expected PR handoff、真实 PR 创建需要 commander 授权和 `gh pr merge``unsupported-command` 边界;系统 `gh` binary 缺失只进入 `tools.systemGhBinary`,不得误判为 UniDesk REST `gh` CLI 不可用。`--remote` 在 runner-like 环境里不再依赖本地 `unidesk-backend-core``unidesk-database``baidu-netdisk-backend` 容器存在;这些缺失只作为本地观测证据。若远程控制面可达,则继续走远程控制面结果;若远程控制面不可达,则结构化返回 `failureKind=control-plane-missing` / `degradedReason=remote-control-plane-unreachable`,而不是把本地 `backend-core-container-missing` 当作最终阻塞。`--pr-create-dry-run` 不 POST GitHub,只证明 runner 内 PR body 生成、`scripts/cli.ts gh pr create --dry-run` 和 branch 参数形态可用;服务端创建权限仍以 token/auth broker、repo/issue/PR read、push dry-run 和最终授权后的真实 PR 创建结果为准。
- `codex task <taskId>` 通过 Code Queue 私有代理按任务 ID 查询结构化审阅摘要;默认只返回任务身份、执行 Provider、工作目录、attempt 计数、原始 prompt、最终 response、最后错误和渐进披露命令,适合指挥官审阅完成未读任务且避免上下文爆炸。`--detail` 仍是有界详细摘要:默认只返回少量 attempt/tool 行、短 prompt/response/stderr/feedback 预览和 omitted/truncated 元数据;需要完整 prompt/response 文本或更多 tool/attempt 细节时再显式加 `--full``--tool-limit N``--trace``codex output`。该摘要读取默认由主 server `code-queue-mgr` 从 PostgreSQL 返回,不依赖 D601 `code-queue-read` Service 可用。
- `codex tasks [--view supervisor|full] [--queue id] [--status succeeded|running|queued|failed|canceled|judging|retry_wait[,..]] [--unread|--unread-only] [--limit N] [--before-id id]` 通过同一私有代理输出渐进式披露视图。默认 `supervisor` 是低噪声指挥官视图,只返回 `activeRunning``running``completedUnread``recentCompleted``queued``activity``commanderConcurrency``executionDiagnostics` 的紧凑行;`activeRunning.count` 是 running+judging 的状态计数,`exact=true` 时来自 queue summary counts`running.returned``activeRunning.rowPage.returned` 只是本次返回的紧凑行数。`commanderConcurrency.activeRunnerCount` 是并发策略应使用的 active/running 计数,等于 `activity.effectiveActiveTaskCount`15 并发策略按 `15 - activeRunnerCount` 计算剩余窗口。`commanderConcurrency.splitBrainDisposition=live-count-as-active` 表示 split-brain 有 fresh heartbeat 证据,应继续监督并计入 active;`interventionRequired=true` 才提示介入。prompt/body 只给短预览和原始字符数,`running`/`completedUnread`/`queued` 默认只返回一个有界小页并通过 section `commands.next` 继续分页,`recentCompleted` 默认限量且不重复 `completedUnread` 未读终态,不嵌入完整 Trace、final response 或全量 overview。`--limit` 在 supervisor 中主要是扫描/分页预算,不是返回几十条肥行的开关;CLI 安全上限是 100,输出会在 `filters.requestedLimit``filters.effectiveLimit``filters.limitCapped``disclosure.limitPolicy` 说明显式请求是否被 capped;底层 overview 拉取预算独立显示在 `source.requestedLimit` / `source.effectiveLimit`,所以 `--limit 260` 应显示 requested=260、effective=100、source requested/effective=200,而不是只露出一个含糊的 `limit``--unread``--unread-only` 的别名,必须只保留未读终态;`--status` 必须真实过滤支持的状态,未知参数或未知状态必须结构化失败。需要更详细当前页任务行时显式使用 `--view full``--full`,仍受 `--limit``--before-id` 分页约束。
+9
View File
@@ -251,6 +251,14 @@ bun scripts/cli.ts codex pr-preflight --remote --issue <issue-number>
- 没有 token、凭证、临时日志或构建产物进入 commit、PR body 或评论。
- 未授权 runner 收口的 PR 由指挥官审查并决定是否 merge;已授权 runner 收口的普通 PR 在合并后仍要验证目标分支远端 commit 可见,并按态势更新 issue/#20/#24
### Runner Resume 收口
PR 小修、冲突、rebase、补测和 reviewer feedback 默认优先 resume 原 Code Queue task,而不是派 replacement runner。正式入口是 `bun scripts/cli.ts codex resume <taskId> --prompt-file <path> [--resume-id id]`prompt 仍必须来自位置参数、`--prompt-stdin``--prompt-file` 三选一,复杂多行内容优先文件或 stdin。真实 resume 成功只应返回 taskId、resumeId/turnId、`deliveryState`、是否复用原 `codexThreadId`/session、有界 trace confirmation 和后续 `codex task/detail/trace/output` 查询命令,不回显完整 prompt。
`codex resume` 只用于终态或 awaiting-closeout task。running/judging task 必须结构化失败并提示改用 `codex steer`;不存在 task、不可恢复状态或运行面不支持 follow-up turn 时必须 fail closed,不得伪装成新的 `codex submit`。同一个 `resumeId` 加同 prompt 必须 duplicate-suppressed;同一个 `resumeId` 加不同 prompt 必须 409 conflictdelivery timeout 或确认缺失时输出 `deliveryUnconfirmed` 和确认命令,指挥官先查 trace,再用同一 `resumeId` 重试。
replacement runner 只用于方向明显错误、质量不可接受、原 task 上下文不可恢复、原分支/PR 已废弃,或 resume 结构化 blocker 已证明无法继续的情况。关闭或替换旧 PR 时必须在 PR/body/final response 中说明 superseded/replacement 关系,避免 competing branch 扩散。
## 监控
指挥官必须用 task 级和 queue 级证据监控 Code Queue,不能只看单一状态字段。
@@ -265,6 +273,7 @@ bun scripts/cli.ts codex pr-preflight --remote --issue <issue-number>
- `bun scripts/cli.ts codex tasks --status succeeded --unread --limit N`:按具体终态过滤监督结果;不支持的 status filter 必须显式失败,不能扩大为未过滤结果。
- `bun scripts/cli.ts codex task <taskId>`:默认只查看原始 prompt、最终 response、最后错误和 drill-down 命令,这是完成未读任务审阅的第一步。
- 当默认审阅摘要不足时,再逐级使用 `bun scripts/cli.ts codex task <taskId> --detail``bun scripts/cli.ts codex task <taskId> --trace --limit N``codex output`
- `bun scripts/cli.ts codex resume <taskId> --prompt-file <path>`:对已终态或 awaiting-closeout 的原 task 追加后续修正 turn,适合 PR 小修、冲突、rebase、补测和 review 修正;running/judging task 改用 `codex steer`
- 当 master 控制面状态和 D601 scheduler 状态看起来分裂时,使用 `docs/reference/observability.md` 中的活性规则判断。
默认 supervisor 视图必须保持低噪声。`activeRunning.count` 是指挥官 active running 计数,来源是 queue summary 的 status counts 时 `activeRunning.exact=true`,用于 redline 判断;`activeRunning.rowPage.returned` / `running.returned` 只表示本次返回的紧凑任务行。`activeRunning.redline` 必须写明 `countField`、routine target、burst redline、hard redline、`state``decisionReady`;只有 `decisionReady=true` 时,才能直接用该 count 做红线/补派判断。`running``completedUnread``queued` 即使传入较大的 `--limit`,默认也只返回一个很小的有界页,并通过 section `commands.next` 继续分页;`--limit` 保留为扫描/分页预算和 full view 返回预算,不得让一次 supervisor 调用输出几十条肥行。每个任务行只应带 task id 和必要摘要,`show``detail``trace``output``full``read` 使用 section template 表达,让下一步渐进披露动作明确且不重复;默认不得嵌入完整 queue 列表、完整 final response、raw output 页或完整 trace 行。`recentCompleted` 必须默认限量,且不得重复 `completedUnread` 里的未读终态,避免完成历史把当前 running、阻塞和未读审阅挤出视野;需要完整当前页时显式使用 `--view full``executionDiagnostics` 只能展示有界 task-id/reason 预览、总数、截断标记和 omitted counts;需要全量诊断时使用输出中的 raw command。`commands.read` 只是在人工审阅后的建议命令,listing 命令绝不能自动执行。
+18
View File
@@ -83,6 +83,24 @@ function displayCommandName(parts: string[]): string {
}
return shown.join(" ");
}
if (parts[0] === "codex" && parts[1] === "resume" && parts[2] !== undefined) {
const shown = ["codex", "resume", parts[2]];
const shownValueOptions = new Set(["--prompt-file", "--file", "--resume-id", "--resumeId"]);
const hasPromptFile = parts.includes("--prompt-file") || parts.includes("--file");
const hasPromptStdin = parts.includes("--prompt-stdin") || parts.includes("--stdin");
const hasHelp = parts.slice(3).some(isHelpToken);
if (!hasPromptFile && !hasPromptStdin && !hasHelp) shown.push("<prompt:redacted>");
for (let index = 3; index < parts.length; index += 1) {
const part = parts[index] ?? "";
if (!part.startsWith("--")) continue;
shown.push(part);
if (shownValueOptions.has(part)) {
shown.push(parts[index + 1] ?? "<missing>");
index += 1;
}
}
return shown.join(" ");
}
if (parts[0] === "commander" && parts[1] === "approval" && parts[2] === "request") {
const shown: string[] = [];
for (let index = 0; index < parts.length; index += 1) {
+309
View File
@@ -0,0 +1,309 @@
import { spawnSync } from "node:child_process";
import { writeFileSync, unlinkSync } from "node:fs";
import { join } from "node:path";
import { tmpdir } from "node:os";
import { codexResumeTaskForTest } from "./src/code-queue";
import { findResumeTraceConfirmation, resumeDuplicateDecision, resumeTraceText } from "../src/components/microservices/code-queue/src/resume-confirmation";
import type { QueueTask } from "../src/components/microservices/code-queue/src/types";
type JsonRecord = Record<string, unknown>;
function assertCondition(condition: unknown, message: string, detail: JsonRecord = {}): void {
if (!condition) throw new Error(`${message}: ${JSON.stringify(detail)}`);
}
function nestedRecord(value: unknown, path: string[]): JsonRecord {
let current: unknown = value;
for (const key of path) {
assertCondition(current !== null && typeof current === "object" && !Array.isArray(current), "expected object while traversing JSON", { path, key, current });
current = (current as JsonRecord)[key];
}
assertCondition(current !== null && typeof current === "object" && !Array.isArray(current), "expected nested object", { path, current });
return current as JsonRecord;
}
function runCli(args: string[], stdin?: string): { status: number | null; stdout: string; stderr: string; json: JsonRecord | null } {
const result = spawnSync("bun", ["scripts/cli.ts", ...args], {
cwd: process.cwd(),
input: stdin,
encoding: "utf8",
});
const stdout = String(result.stdout || "");
let json: JsonRecord | null = null;
try {
json = JSON.parse(stdout) as JsonRecord;
} catch {
json = null;
}
return { status: result.status, stdout, stderr: String(result.stderr || ""), json };
}
function fixtureTask(): QueueTask {
const at = "2026-05-23T00:00:00.000Z";
return {
id: "codex_resume_fixture",
queueId: "default",
queueEnteredAt: at,
prompt: "base",
basePrompt: "base",
referenceTaskIds: [],
referenceInjection: null,
providerId: "D601",
cwd: "/workspace/unidesk",
model: "gpt-5.5",
reasoningEffort: null,
executionMode: "default",
maxAttempts: 99,
status: "succeeded",
createdAt: at,
updatedAt: at,
startedAt: at,
finishedAt: "2026-05-23T00:10:00.000Z",
readAt: null,
currentAttempt: 1,
currentMode: "initial",
codexThreadId: "thread_resume_fixture",
activeTurnId: null,
finalResponse: "done",
lastError: null,
lastJudge: null,
judgeFailCount: 0,
promptHistory: [],
output: [],
events: [],
attempts: [],
cancelRequested: false,
nextPrompt: null,
nextMode: null,
};
}
function deterministicResumeId(taskId: string, prompt: string): string {
return `resume_${Bun.SHA256.hash(`unidesk-code-queue-resume:v1\0${taskId}\0${prompt}`, "hex").slice(0, 24)}`;
}
function assertDryRunPrompt(response: JsonRecord, expectedText: string): void {
assertCondition(response.ok === true, "CLI dry-run should succeed", response);
const data = nestedRecord(response.data, []);
assertCondition(data.dryRun === true, "dry-run response should expose dryRun=true", data);
const request = nestedRecord(response.data, ["request"]);
assertCondition(request.method === "POST", "dry-run should expose request method", request);
assertCondition(request.path === "/api/tasks/codex_test_task/resume", "dry-run should expose resume path", request);
assertCondition(request.stableProxyPath === "/api/microservices/code-queue/proxy/api/tasks/codex_test_task/resume", "dry-run should expose stable proxy path", request);
assertCondition(request.resumeId === deterministicResumeId("codex_test_task", expectedText), "dry-run should expose deterministic resumeId", request);
const prompt = nestedRecord(response.data, ["request", "body", "prompt"]);
assertCondition(prompt.text === expectedText, "dry-run prompt text mismatch", prompt);
assertCondition(prompt.chars === expectedText.length, "dry-run prompt char count mismatch", prompt);
const commands = nestedRecord(response.data, ["commands"]);
assertCondition(String(commands.run || "").includes(`--resume-id ${String(request.resumeId)}`), "dry-run should expose same resumeId run command", commands);
}
export function runCodeQueueResumeContract(): JsonRecord {
const positional = runCli(["codex", "resume", "codex_test_task", "fix the PR conflict", "--dry-run"]);
assertDryRunPrompt(positional.json ?? {}, "fix the PR conflict");
assertCondition(String(positional.json?.command || "").includes("<prompt:redacted>"), "outer command should redact positional resume prompt", positional.json ?? {});
assertCondition(!String(positional.json?.command || "").includes("fix the PR conflict"), "outer command must not echo positional resume prompt", positional.json ?? {});
const stdin = runCli(["codex", "resume", "codex_test_task", "--prompt-stdin", "--dry-run"], "stdin resume prompt\n");
assertDryRunPrompt(stdin.json ?? {}, "stdin resume prompt\n");
const promptFile = join(tmpdir(), `unidesk-code-queue-resume-${process.pid}.txt`);
writeFileSync(promptFile, "file resume prompt", "utf8");
try {
const fromFile = runCli(["codex", "resume", "codex_test_task", "--prompt-file", promptFile, "--dry-run"]);
assertDryRunPrompt(fromFile.json ?? {}, "file resume prompt");
} finally {
unlinkSync(promptFile);
}
const duplicateSource = runCli(["codex", "resume", "codex_test_task", "positional", "--prompt-stdin", "--dry-run"], "stdin\n");
assertCondition(duplicateSource.status !== 0, "duplicate prompt source should fail", duplicateSource.json ?? { stdout: duplicateSource.stdout });
assertCondition(String(nestedRecord(duplicateSource.json, ["error"]).message || "").includes("exactly one prompt source"), "duplicate prompt source error should be explicit", duplicateSource.json ?? {});
const help = runCli(["codex", "help"]);
const usage = Array.isArray(nestedRecord(help.json?.data, []).usage) ? nestedRecord(help.json?.data, []).usage as unknown[] : [];
assertCondition(usage.some((line) => String(line).includes("codex resume <taskId>")), "codex help should list resume", { usage: usage.map(String) });
let dryRunFetchCount = 0;
const dryRunDirect = codexResumeTaskForTest("direct_task", ["do not send", "--dry-run"], () => {
dryRunFetchCount += 1;
return { ok: true, status: 200, body: { ok: true } };
});
assertCondition(dryRunFetchCount === 0, "dry-run must not call stable proxy helper", { dryRunFetchCount, dryRunDirect });
const longPrompt = `${"x".repeat(480)}-tail-secret-marker`;
const longDryRun = codexResumeTaskForTest("direct_task", [longPrompt, "--dry-run"], () => {
throw new Error("dry-run should not fetch");
}) as JsonRecord;
const longPreview = nestedRecord(longDryRun, ["request", "body", "prompt"]);
assertCondition(longPreview.truncated === true, "long dry-run prompt should be truncated", longPreview);
assertCondition(!String(longPreview.text || "").includes("tail-secret-marker"), "long dry-run must not leak prompt tail", longPreview);
let fetchPath = "";
let fetchMethod = "";
let fetchPrompt = "";
let fetchResumeId = "";
const success = codexResumeTaskForTest("direct_task", ["resume this context"], (path, init) => {
fetchPath = path;
fetchMethod = String(init?.method || "");
fetchPrompt = String((init?.body as JsonRecord | undefined)?.prompt || "");
fetchResumeId = String((init?.body as JsonRecord | undefined)?.resumeId || "");
return {
ok: true,
status: 202,
body: {
ok: true,
accepted: true,
duplicateSuppressed: false,
deliveryState: "queued_for_existing_thread",
resumeId: fetchResumeId,
turnId: 9,
reuseOriginalThread: true,
originalCodexThreadId: "thread_original",
codexThreadId: "thread_original",
traceConfirmation: {
taskId: "direct_task",
resumeId: fetchResumeId,
found: true,
accepted: true,
deliveryState: "queued_for_existing_thread",
matchCount: 1,
trace: { seq: 9, at: "2026-05-23T00:00:09.000Z", method: "turn/resume", resumeId: fetchResumeId, promptChars: 19, promptHash: "hash", promptOmitted: true, source: "output" },
duplicateSuppressionKey: fetchResumeId,
promptOmitted: true,
},
task: { id: "direct_task", status: "queued", codexThreadId: "thread_original", currentAttempt: 1, currentMode: "initial", prompt: "hidden" },
queue: { activeTaskIds: [], queuedTaskIds: ["direct_task"] },
},
};
}) as JsonRecord;
assertCondition(fetchPath === "/api/microservices/code-queue/proxy/api/tasks/direct_task/resume", "non-dry-run should use stable resume path", { fetchPath });
assertCondition(fetchMethod === "POST", "non-dry-run should POST", { fetchMethod });
assertCondition(fetchPrompt === "resume this context", "non-dry-run should send raw prompt in body", { fetchPrompt });
assertCondition(fetchResumeId === deterministicResumeId("direct_task", "resume this context"), "non-dry-run should send deterministic resumeId", { fetchResumeId });
assertCondition(nestedRecord(success, ["resume"]).accepted === true, "successful resume should report accepted=true", success);
assertCondition(nestedRecord(success, ["resume"]).reusedCodexThread === true, "successful resume should report thread reuse", success);
assertCondition(nestedRecord(success, ["resume"]).promptOmitted === true, "successful resume should mark prompt omitted", success);
assertCondition(nestedRecord(success, ["resume"]).deliveryState === "queued_for_existing_thread", "successful resume should expose delivery state", success);
assertCondition(!JSON.stringify(success).includes("resume this context"), "successful resume must not echo prompt text", success);
const explicitResumeId = "resume_manual_12345";
const duplicateSuppressed = codexResumeTaskForTest("direct_task", ["same prompt", "--resume-id", explicitResumeId], (_path, init) => {
assertCondition((init?.body as JsonRecord | undefined)?.resumeId === explicitResumeId, "explicit resumeId should be sent unchanged", (init?.body as JsonRecord | undefined) ?? {});
return {
ok: true,
status: 200,
body: {
ok: true,
accepted: true,
duplicateSuppressed: true,
deliveryState: "duplicate_suppressed",
resumeId: explicitResumeId,
reuseOriginalThread: true,
originalCodexThreadId: "thread_original",
codexThreadId: "thread_original",
traceConfirmation: {
taskId: "direct_task",
resumeId: explicitResumeId,
found: true,
accepted: true,
deliveryState: "queued_for_existing_thread",
matchCount: 1,
trace: { seq: 11, at: "2026-05-23T00:00:11.000Z", method: "turn/resume", resumeId: explicitResumeId, promptChars: 11, promptHash: "hash2", promptOmitted: true, source: "output" },
duplicateSuppressionKey: explicitResumeId,
},
task: { id: "direct_task", status: "queued", codexThreadId: "thread_original", prompt: "hidden" },
queue: { queuedTaskIds: ["direct_task"] },
},
};
}) as JsonRecord;
assertCondition(nestedRecord(duplicateSuppressed, ["resume"]).status === "duplicate_suppressed", "duplicate resume should expose suppression status", duplicateSuppressed);
assertCondition(nestedRecord(duplicateSuppressed, ["resume"]).duplicateSuppressed === true, "duplicate resume should expose duplicateSuppressed", duplicateSuppressed);
const conflictPrompt = "changed resume request requested-secret-marker";
const conflict = codexResumeTaskForTest("direct_task", [conflictPrompt, "--resume-id", explicitResumeId], () => ({
ok: false,
status: 409,
body: {
ok: false,
error: "resumeId already exists with a different prompt hash",
accepted: false,
deliveryState: "not_accepted",
resumeId: explicitResumeId,
existingPromptHash: "old",
requestedPromptHash: "new",
traceConfirmation: {
taskId: "direct_task",
resumeId: explicitResumeId,
found: true,
accepted: true,
deliveryState: "queued_for_existing_thread",
matchCount: 1,
trace: { seq: 11, at: "2026-05-23T00:00:11.000Z", method: "turn/resume", resumeId: explicitResumeId, promptChars: 11, promptHash: "old", promptOmitted: true, source: "output" },
},
task: { id: "direct_task", status: "queued", prompt: `${"hidden ".repeat(80)}task-secret-marker` },
},
})) as JsonRecord;
assertCondition(conflict.ok === false, "resumeId conflict should fail", conflict);
assertCondition(nestedRecord(conflict, ["resume"]).status === "not_accepted", "conflict should expose not_accepted", conflict);
assertCondition(!JSON.stringify(conflict).includes("requested-secret-marker"), "conflict must not echo requested prompt", conflict);
assertCondition(!JSON.stringify(conflict).includes("task-secret-marker"), "conflict must not echo full task prompt by default", conflict);
const runningRejection = codexResumeTaskForTest("running_task", ["use steer instead"], () => ({
ok: false,
status: 409,
body: {
ok: false,
error: "task is active: running",
accepted: false,
deliveryState: "not_accepted",
disposition: "use-steer-for-active-task",
resumeId: deterministicResumeId("running_task", "use steer instead"),
task: { id: "running_task", status: "running", terminalStatus: null, prompt: "hidden active task" },
},
})) as JsonRecord;
assertCondition(runningRejection.ok === false, "running task resume should fail closed", runningRejection);
assertCondition(nestedRecord(runningRejection, ["resume"]).reason === "use-steer-for-active-task", "running resume should route to steer", runningRejection);
assertCondition(String(nestedRecord(runningRejection, ["commands"]).steer || "").includes("codex steer running_task"), "running rejection should provide steer command", runningRejection);
const notFound = codexResumeTaskForTest("missing_task", ["prompt"], () => ({ ok: false, status: 404, body: { ok: false, error: "task not found" } })) as JsonRecord;
assertCondition(notFound.ok === false, "missing task resume should fail", notFound);
assertCondition(nestedRecord(notFound, ["resume"]).status === "not_accepted", "missing task should be not accepted", notFound);
const task = fixtureTask();
const resumeId = "resume_contract_12345";
const prompt = "continue the same PR";
task.output.push({ seq: 9, at: "2026-05-23T00:00:09.000Z", channel: "user", method: "turn/resume", itemId: resumeId, text: resumeTraceText(resumeId, prompt) });
const confirmation = findResumeTraceConfirmation(task, resumeId);
assertCondition(confirmation.found === true && confirmation.accepted === true, "confirmation should find resume trace by resumeId", confirmation as unknown as JsonRecord);
assertCondition(confirmation.trace?.promptChars === prompt.length, "confirmation should expose prompt chars without prompt text", (confirmation.trace ?? {}) as unknown as JsonRecord);
assertCondition(!JSON.stringify(confirmation).includes(prompt), "confirmation must not echo prompt text", confirmation as unknown as JsonRecord);
const duplicate = resumeDuplicateDecision(task, resumeId, prompt);
assertCondition(duplicate.duplicate === true && duplicate.conflict === false, "same resumeId and prompt should be duplicate-suppressed", duplicate as unknown as JsonRecord);
const localConflict = resumeDuplicateDecision(task, resumeId, "changed prompt");
assertCondition(localConflict.duplicate === false && localConflict.conflict === true, "same resumeId with changed prompt should conflict", localConflict as unknown as JsonRecord);
const newlineResumeId = "resume_contract_newline";
const newlinePrompt = "keep trailing newline\n";
task.output.push({ seq: 10, at: "2026-05-23T00:00:10.000Z", channel: "user", method: "turn/resume", itemId: newlineResumeId, text: resumeTraceText(newlineResumeId, newlinePrompt) });
const newlineDuplicate = resumeDuplicateDecision(task, newlineResumeId, newlinePrompt);
assertCondition(newlineDuplicate.duplicate === true && newlineDuplicate.conflict === false, "duplicate suppression should use exact prompt hash, including trailing newline", newlineDuplicate as unknown as JsonRecord);
return {
ok: true,
checks: [
"resume positional/stdin/file dry-runs",
"bounded disclosure and outer command redaction",
"non-dry-run sends resumeId and omits prompt from output",
"terminal resume accepted with thread reuse metadata",
"duplicate suppression and conflict behavior",
"running task fails closed with steer command",
"missing task fails closed",
"local resume trace confirmation helpers",
"exact prompt hash survives trailing newline",
],
};
}
if (import.meta.main) {
process.stdout.write(`${JSON.stringify(runCodeQueueResumeContract(), null, 2)}\n`);
}
+2
View File
@@ -370,6 +370,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default
items.push(commandItem("code-queue:prompt-lint-contract", ["bun", "scripts/code-queue-prompt-lint-contract-test.ts"], 30_000));
items.push(commandItem("code-queue:cli-steer-contract", ["bun", "scripts/code-queue-cli-steer-test.ts"], 30_000));
items.push(commandItem("code-queue:steer-confirmation-contract", ["bun", "scripts/code-queue-steer-confirmation-contract-test.ts"], 30_000));
items.push(commandItem("code-queue:resume-contract", ["bun", "scripts/code-queue-resume-contract-test.ts"], 30_000));
items.push(commandItem("code-queue:read-terminal-contract", ["bun", "scripts/code-queue-cli-read-terminal-contract-test.ts"], 30_000));
items.push(commandItem("code-queue:submit-prompt-contract", ["bun", "scripts/code-queue-cli-submit-prompt-contract-test.ts"], 30_000));
items.push(commandItem("code-queue:submit-execution-mode-contract", ["bun", "scripts/code-queue-submit-execution-mode-contract-test.ts"], 30_000));
@@ -407,6 +408,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default
items.push(skippedItem("code-queue:prompt-lint-contract", "Code Queue prompt live-authorization lint contract is opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("code-queue:cli-steer-contract", "Code Queue steer CLI contract is opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("code-queue:steer-confirmation-contract", "Code Queue steer delivery confirmation contract is opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("code-queue:resume-contract", "Code Queue resume CLI and delivery contract is opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("code-queue:read-terminal-contract", "Code Queue terminal read contract is opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("code-queue:submit-prompt-contract", "Code Queue submit prompt contract is opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("code-queue:submit-execution-mode-contract", "Code Queue submit execution-mode contract is opt-in with script checks", "--scripts-typecheck or --full"));
+240 -1
View File
@@ -3,6 +3,7 @@ import { runCommand } from "./command";
import { type UniDeskConfig, repoRoot, rootPath } from "./config";
import { coreInternalFetch } from "./microservices";
import { previewJson } from "./preview";
import { createResumeId, type ResumeDeliveryState } from "../../src/components/microservices/code-queue/src/resume-confirmation";
import { createSteerId, type SteerDeliveryState } from "../../src/components/microservices/code-queue/src/steer-confirmation";
import {
codeAgentPortForModel,
@@ -234,6 +235,14 @@ interface CodexSteerConfirmOptions {
raw: boolean;
}
interface CodexResumeOptions {
prompt: string;
resumeId: string | undefined;
dryRun: boolean;
full: boolean;
raw: boolean;
}
type CodexSteerFailureReason =
| "backend-core-unreachable"
| "code-queue-microservice-unregistered"
@@ -245,6 +254,8 @@ type CodexSteerFailureReason =
| "invalid-proxy-response";
type CodexSteerAcceptanceStatus = "accepted" | "not_accepted" | "accepted_response_timeout" | "unknown";
type CodexResumeAcceptanceStatus = "queued" | "duplicate_suppressed" | "not_accepted" | "accepted_response_timeout" | "unknown";
type CodexResumeDeliveryState = ResumeDeliveryState;
interface ClassifiedCodexSteerError {
reason: CodexSteerFailureReason;
@@ -468,6 +479,10 @@ function rawSteerConfirmationCommand(taskId: string, steerId: string): string {
return `bun scripts/cli.ts microservice proxy code-queue ${steerConfirmationPath(taskId, steerId)} --raw`;
}
function resumeCommand(taskId: string, resumeId: string): string {
return `bun scripts/cli.ts codex resume ${taskId} --prompt-file <path> --resume-id ${resumeId}`;
}
function nonNegativeIntegerEnv(name: string, fallback: number): number {
const raw = process.env[name];
if (raw === undefined || raw.trim().length === 0) return fallback;
@@ -2595,6 +2610,10 @@ export function codexSteerTaskForTest(taskId: string, optionArgs: string[], fetc
return codexSteerTask(taskId, optionArgs, fetcher);
}
export function codexResumeTaskForTest(taskId: string, optionArgs: string[], fetcher: CodexResponseFetcher): unknown {
return codexResumeTask(taskId, optionArgs, fetcher);
}
export function codexSteerTraceConfirmForTest(taskId: string, optionArgs: string[], fetcher: CodexResponseFetcher): unknown {
return codexSteerTraceConfirm(taskId, optionArgs, fetcher);
}
@@ -4071,6 +4090,13 @@ const steerPromptValueOptions = new Set([
"--steerId",
]);
const resumePromptValueOptions = new Set([
"--prompt-file",
"--file",
"--resume-id",
"--resumeId",
]);
function referenceTaskIdsFromOptions(args: string[]): string[] {
const values = optionValues(args, ["--reference-task-id", "--reference", "--ref"]);
const ids: string[] = [];
@@ -4207,6 +4233,22 @@ function parseSteerConfirmOptions(args: string[]): CodexSteerConfirmOptions {
return { steerId, raw: hasFlag(args, "--raw") };
}
function parseResumeOptions(args: string[]): CodexResumeOptions {
assertKnownOptions(args, {
flags: ["--prompt-stdin", "--stdin", "--dry-run", "--full", "--raw"],
valueOptions: ["--prompt-file", "--file", "--resume-id", "--resumeId"],
}, "codex resume");
const resumeId = optionValue(args, ["--resume-id", "--resumeId"]);
if (resumeId !== undefined && !/^[A-Za-z0-9._:-]{8,128}$/u.test(resumeId)) throw new Error("--resume-id must be 8-128 chars using letters, numbers, dot, underscore, colon, or dash");
return {
prompt: promptFromArgs(args, "codex resume", resumePromptValueOptions),
resumeId,
dryRun: hasFlag(args, "--dry-run"),
full: hasFlag(args, "--full") || hasFlag(args, "--raw"),
raw: hasFlag(args, "--raw"),
};
}
function parsePromptLintOptions(args: string[]): CodexPromptLintOptions {
assertKnownOptions(args, {
flags: ["--prompt-stdin", "--stdin"],
@@ -4308,6 +4350,58 @@ function compactSteerTaskConfirmation(task: unknown, steerId: string): Record<st
};
}
function compactResumeTraceConfirmation(value: unknown, taskId: string, resumeId: string): Record<string, unknown> {
const record = asRecord(value) ?? {};
const confirmation = asRecord(record.confirmation) ?? record;
const trace = asRecord(confirmation.trace);
const compactTrace = trace === null ? null : {
seq: trace.seq ?? null,
at: trace.at ?? null,
method: trace.method ?? null,
resumeId: trace.resumeId ?? resumeId,
promptChars: trace.promptChars ?? null,
promptHash: trace.promptHash ?? null,
promptOmitted: true,
source: trace.source ?? null,
};
return {
taskId: asString(confirmation.taskId) || taskId,
resumeId: asString(confirmation.resumeId) || resumeId,
found: confirmation.found === true,
accepted: confirmation.accepted === true,
deliveryState: asString(confirmation.deliveryState) || (confirmation.found === true ? "queued_for_existing_thread" : "unknown"),
matchCount: asNumber(confirmation.matchCount, 0),
trace: compactTrace,
duplicateSuppressionKey: confirmation.duplicateSuppressionKey ?? resumeId,
promptOmitted: true,
};
}
function terminalStatusFromResumeTask(task: Record<string, unknown> | null): string {
return terminalStatusFromTask(task);
}
function compactResumeTaskConfirmation(task: unknown, resumeId: string, source: Record<string, unknown>): Record<string, unknown> {
const compact = compactSubmitTaskConfirmation(task);
const commands = asRecord(compact.commands) ?? {};
const record = asRecord(task) ?? {};
return {
...compact,
currentAttempt: record.currentAttempt ?? null,
currentMode: record.currentMode ?? null,
terminalStatus: terminalStatusFromResumeTask(record) || null,
reusedCodexThreadId: source.originalCodexThreadId ?? source.codexThreadId ?? record.codexThreadId ?? null,
commands: {
...commands,
show: `bun scripts/cli.ts codex task ${asString(compact.id) || "<taskId>"}`,
detail: `bun scripts/cli.ts codex task ${asString(compact.id) || "<taskId>"} --detail`,
trace: `bun scripts/cli.ts codex task ${asString(compact.id) || "<taskId>"} --trace --tail --limit ${defaultTraceLimit}`,
output: `bun scripts/cli.ts codex output ${asString(compact.id) || "<taskId>"} --tail --limit ${defaultOutputLimit}`,
resumeAgain: resumeCommand(asString(compact.id) || "<taskId>", resumeId),
},
};
}
function orderedUniqueStringList(values: string[]): string[] {
const seen = new Set<string>();
const items: string[] = [];
@@ -6461,6 +6555,147 @@ function codexSteerTraceConfirm(taskId: string, args: string[], fetcher: CodexRe
};
}
function compactResumeRejection(taskId: string, resumeId: string, response: unknown, options: CodexResumeOptions): Record<string, unknown> {
const record = asRecord(response);
const body = responseBody(record);
const task = asRecord(body?.task);
const status = asString(task?.status);
const upstreamStatus = responseStatus(record);
const bodyError = asString(body?.error);
const disposition = asString(body?.disposition)
|| (upstreamStatus === 404 || bodyError?.toLowerCase().includes("not found") ? "task-not-found" : isActiveTaskStatus(status) ? "use-steer-for-active-task" : isTerminalTaskStatus(status) ? "resume-rejected" : "not-terminal");
const result: Record<string, unknown> = {
ok: false,
resume: {
accepted: false,
status: "not_accepted" satisfies CodexResumeAcceptanceStatus,
deliveryState: "not_accepted" satisfies CodexResumeDeliveryState,
resumeId,
reason: disposition,
taskId,
taskStatus: status || null,
terminalStatus: terminalStatusFromResumeTask(task) || null,
retryable: false,
promptOmitted: true,
},
message: asString(body?.error) || `task ${taskId} is not resumable`,
commands: {
show: `bun scripts/cli.ts codex task ${taskId}`,
detail: `bun scripts/cli.ts codex task ${taskId} --detail`,
...(disposition === "use-steer-for-active-task" ? { steer: `bun scripts/cli.ts codex steer ${taskId} --prompt-file <path>` } : {}),
supervisor: `bun scripts/cli.ts codex tasks --view supervisor --limit ${defaultTasksLimit}`,
},
upstream: {
status: upstreamStatus,
error: bodyError || null,
},
disclosure: {
defaultPolicy: "compact resume rejection; request prompt, upstream task body, and raw response require explicit --full or --raw",
full: options.full,
raw: options.raw,
defaultOmitted: ["request.body.prompt.text", "task.prompt", "task.finalResponse", "rawFailure"],
},
};
if (options.full) {
result.task = compactResumeTaskConfirmation(task, resumeId, body ?? {});
result.upstreamBodyPreview = previewJson(body ?? record, { maxDepth: 4, maxArrayItems: 8, maxObjectKeys: 24, maxStringLength: 600 });
}
if (options.raw) result.rawFailure = response;
return result;
}
function codexResumeTask(taskId: string, args: string[], fetcher: CodexResponseFetcher = coreInternalFetch): unknown {
const options = parseResumeOptions(args);
const resumeId = options.resumeId ?? createResumeId(taskId, options.prompt);
const targetPath = `/api/tasks/${encodeURIComponent(taskId)}/resume`;
const stableProxyPath = codeQueueProxyPath(targetPath);
const rawProxyEquivalent = codeQueueProxyEquivalentCommand(targetPath, `{"prompt":"...","resumeId":"${resumeId}"}`);
const prompt = textView(options.prompt, false, steerPromptPreviewChars);
const request = {
path: targetPath,
stableProxyPath,
method: "POST",
resumeId,
bodySummary: {
resumeId,
promptChars: options.prompt.length,
promptPreviewChars: steerPromptPreviewChars,
promptTruncated: prompt.truncated,
},
body: { resumeId, prompt },
contract: {
target: "terminal-or-awaiting-closeout task follow-up turn",
idempotency: "same resumeId and same prompt is duplicate-suppressed; same resumeId and different prompt is rejected as conflict",
activeTaskDisposition: "active running/judging tasks must use codex steer, not resume",
promptEchoPolicy: "default dry-run shows only bounded prompt preview; non-dry-run never echoes prompt text",
},
};
if (options.dryRun) {
return {
ok: true,
dryRun: true,
request,
commands: {
run: resumeCommand(taskId, resumeId),
show: `bun scripts/cli.ts codex task ${taskId}`,
supervisor: `bun scripts/cli.ts codex tasks --view supervisor --limit ${defaultTasksLimit}`,
rawProxy: rawProxyEquivalent,
},
};
}
const rawResponse = fetcher(stableProxyPath, { method: "POST", body: { prompt: options.prompt, resumeId } });
const record = asRecord(rawResponse);
const body = responseBody(record);
if (record?.ok !== true || body?.ok !== true) {
return compactResumeRejection(taskId, resumeId, rawResponse, options);
}
const responseResumeId = asString(body.resumeId) || resumeId;
const duplicateSuppressed = body.duplicateSuppressed === true;
const traceConfirmation = compactResumeTraceConfirmation(body.traceConfirmation, taskId, responseResumeId);
const deliveryState = asString(body.deliveryState) || asString(traceConfirmation.deliveryState) || (duplicateSuppressed ? "duplicate_suppressed" : "queued_for_existing_thread");
const acceptanceStatus: CodexResumeAcceptanceStatus = duplicateSuppressed ? "duplicate_suppressed" : deliveryState === "accepted_response_timeout" ? "accepted_response_timeout" : "queued";
const traceRecord = asRecord(traceConfirmation.trace);
return {
ok: true,
upstream: { ok: record.ok, status: record.status },
resume: {
accepted: true,
status: acceptanceStatus,
deliveryState,
resumeId: responseResumeId,
turnId: body.turnId ?? traceRecord?.seq ?? null,
taskId,
promptChars: options.prompt.length,
promptOmitted: true,
duplicateSuppressed,
duplicateSuppressionKey: responseResumeId,
reusedCodexThread: body.reuseOriginalThread === true,
originalCodexThreadId: body.originalCodexThreadId ?? null,
codexThreadId: body.codexThreadId ?? null,
deliveryUnconfirmed: deliveryState === "accepted_response_timeout",
outputPolicy: {
default: "write-confirmation",
promptEchoed: false,
taskDetailEchoed: false,
reason: "codex resume is a write operation; default output confirms queued delivery and provides drill-down commands without echoing prompt text or full task state.",
},
},
traceConfirmation,
task: compactResumeTaskConfirmation(body.task, responseResumeId, body),
queue: compactSubmitQueueConfirmation(body.queue),
commands: {
show: `bun scripts/cli.ts codex task ${taskId}`,
detail: `bun scripts/cli.ts codex task ${taskId} --detail`,
trace: `bun scripts/cli.ts codex task ${taskId} --trace --tail --limit ${defaultTraceLimit}`,
output: `bun scripts/cli.ts codex output ${taskId} --tail --limit ${defaultOutputLimit}`,
retrySameResumeId: resumeCommand(taskId, responseResumeId),
supervisor: `bun scripts/cli.ts codex tasks --view supervisor --limit ${defaultTasksLimit}`,
confirmDelivery: `bun scripts/cli.ts codex task ${taskId} --trace --tail --limit ${defaultTraceLimit}`,
...(deliveryState === "accepted_response_timeout" ? { deliveryUnconfirmed: `bun scripts/cli.ts codex task ${taskId} --trace --tail --limit ${defaultTraceLimit}` } : {}),
},
};
}
export async function runCodeQueueCommand(config: UniDeskConfig, args: string[]): Promise<unknown> {
const [action = "task", taskIdArg] = args;
if (action === "prompt-lint" || action === "lint-prompt") {
@@ -6519,9 +6754,13 @@ export async function runCodeQueueCommand(config: UniDeskConfig, args: string[])
const taskId = requireTaskId(taskIdArg, "codex steer");
return codexSteerTask(taskId, args.slice(2));
}
if (action === "resume") {
const taskId = requireTaskId(taskIdArg, "codex resume");
return codexResumeTask(taskId, args.slice(2));
}
if (action === "steer-confirm" || action === "steer-confirmation") {
const taskId = requireTaskId(taskIdArg, `codex ${action}`);
return codexSteerTraceConfirm(taskId, args.slice(2));
}
throw new Error("codex command must be one of: prompt-lint, submit, enqueue, task, summary, show, tasks, overview, unread, terminal-unread, output, judge, read, mark-read, dev-ready, health, skills-sync, pr-preflight, runtime-preflight, queues, queue list, queue create, queue merge, move, steer, steer-confirm, interrupt, cancel");
throw new Error("codex command must be one of: prompt-lint, submit, enqueue, task, summary, show, tasks, overview, unread, terminal-unread, output, judge, read, mark-read, dev-ready, health, skills-sync, pr-preflight, runtime-preflight, queues, queue list, queue create, queue merge, move, steer, resume, steer-confirm, interrupt, cancel");
}
+3 -1
View File
@@ -64,6 +64,7 @@ export function rootHelp(): unknown {
{ command: "codex dev-ready", description: "Fetch execution-container readiness, including sanitized skill injection status from /api/dev-ready." },
{ command: "codex judge <taskId> --attempt N [--dry-run] [--include-prompt]", description: "Replay one stored Code Queue attempt through the same judge context builder and MiniMax judge call path used by the live queue worker." },
{ command: "codex steer <taskId> [prompt|--prompt-file path|--prompt-stdin] [--steer-id id] [--dry-run] [--no-retry|--retry-attempts N] [--full|--raw]", description: "Push a corrective prompt into a running Code Queue task with a steerId/idempotency key; retryable tunnel aborts get bounded trace confirmation before any retry guidance." },
{ command: "codex resume <taskId> [prompt|--prompt-file path|--prompt-stdin] [--resume-id id] [--dry-run] [--full|--raw]", description: "Queue a follow-up turn on a terminal Code Queue task, preserving the original task/thread/PR context and suppressing duplicate resumeId delivery without echoing the prompt." },
{ command: "codex steer-confirm <taskId> --steer-id <id> [--raw]", description: "Read-only lookup for a steerId in task trace so deliveryUnconfirmed can be resolved without resending the corrective prompt." },
{ command: "codex interrupt|cancel <taskId>", description: "Request interrupt for a running Code Queue task, or cancel a queued/retry_wait task, through the same private proxy." },
{ command: "codex (queues [--full|--all] | queue create <queueId> | queue merge <sourceQueueId> --into <targetQueueId> | move <taskId> --queue <queueId>)", description: "List low-noise queue summaries by default, including effective activity counts that distinguish scheduler-local queues, DB running tasks, and heartbeat-fresh runners; full queue rows require --full/--all." },
@@ -249,7 +250,7 @@ function scheduleHelp(): unknown {
function codexHelp(): unknown {
return {
command: "codex deploy|prompt-lint|submit|task|tasks|unread|output|read|dev-ready|skills-sync|pr-preflight|judge|steer|interrupt|cancel|queues|queue|move",
command: "codex deploy|prompt-lint|submit|task|tasks|unread|output|read|dev-ready|skills-sync|pr-preflight|judge|steer|resume|interrupt|cancel|queues|queue|move",
output: "json",
usage: [
"bun scripts/cli.ts codex deploy <commitId> # disabled legacy deployment entry",
@@ -268,6 +269,7 @@ function codexHelp(): unknown {
"bun scripts/cli.ts codex pr-preflight [--remote] [--push-dry-run --push-dry-run-ref refs/heads/probe/<name>] [--pr-create-dry-run --pr-create-dry-run-head <head>] [--issue N] [--full|--raw]",
"bun scripts/cli.ts codex judge <taskId> --attempt N [--dry-run] [--include-prompt]",
"bun scripts/cli.ts codex steer <taskId> [prompt|--prompt-file path|--prompt-stdin] [--steer-id id] [--dry-run] [--no-retry|--retry-attempts N] [--full|--raw]",
"bun scripts/cli.ts codex resume <taskId> [prompt|--prompt-file path|--prompt-stdin] [--resume-id id] [--dry-run] [--full|--raw]",
"bun scripts/cli.ts codex steer-confirm <taskId> --steer-id <id> [--raw]",
"bun scripts/cli.ts codex interrupt|cancel <taskId>",
"bun scripts/cli.ts codex queues [--full|--all] [--limit N] [--page N|--offset N] | queue create <queueId> | queue merge <sourceQueueId> --into <targetQueueId> | move <taskId> --queue <queueId>",
@@ -402,7 +402,7 @@ function codeQueueK3sServiceIdForRequest(method: string, targetPath: string): st
if (targetPath === "/api/queues" || targetPath === "/api/tasks/overview") return "code-queue-scheduler";
if (targetPath === "/api/oa/backfill" || targetPath === "/api/notifications/claudeqq/drain" || targetPath === "/api/notifications/claudeqq/backfill") return "code-queue-scheduler";
if (targetPath === "/api/judge/probe" || targetPath === "/api/judge/self-test" || targetPath === "/api/queue-order/self-test" || targetPath === "/api/reference-injection/self-test" || targetPath === "/api/trace-port/self-test") return "code-queue-scheduler";
if (/^\/api\/tasks\/[^/]+\/(?:steer|interrupt)$/u.test(targetPath)) return "code-queue-scheduler";
if (/^\/api\/tasks\/[^/]+\/(?:steer|resume|interrupt)$/u.test(targetPath)) return "code-queue-scheduler";
if (/^\/api\/tasks\/[^/]+$/u.test(targetPath) && normalizedMethod === "DELETE") return "code-queue-scheduler";
if (targetPath === "/api/dev-containers" || /^\/api\/dev-containers(?:\/[^/]+)?\/start$/u.test(targetPath)) return "code-queue-scheduler";
if (/^\/api\/dev-containers(?:\/[^/]+)?\/status$/u.test(targetPath)) return "code-queue-scheduler";
@@ -137,6 +137,7 @@ import {
import { collectRuntimePreflight, runtimePreflightJson } from "./runtime-preflight";
import { collectSkillAvailability, collectSkillSyncPreflight, skillAvailabilityJson, skillSyncPreflightJson } from "./skill-availability";
import { createSteerId, findSteerTraceConfirmation, normalizeSteerId, normalizeSteerPromptText, steerDuplicateDecision, steerPromptHash, steerTraceConfirmationJson, steerTraceText } from "./steer-confirmation";
import { createResumeId, findResumeTraceConfirmation, normalizeResumeId, resumeDuplicateDecision, resumePromptHash, resumeTraceConfirmationJson, resumeTraceText } from "./resume-confirmation";
import { configureSelfTests, runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runStaleActiveRecoverySelfTest, runTracePortSelfTest, runTraceSummaryContractSelfTest } from "./self-tests";
import {
codexToolLifecycleStartedBeforeIn,
@@ -4873,6 +4874,107 @@ async function steerTask(task: QueueTask, req: Request): Promise<Response> {
});
}
async function resumeTask(task: QueueTask, req: Request): Promise<Response> {
if (!serviceRoleAllowsScheduler(config.serviceRole)) return schedulerOnlyRejectResponse(req.method, `/api/tasks/${task.id}/resume`);
const notReady = requireDatabaseReadyForWrite(req.method, `/api/tasks/${task.id}/resume`);
if (notReady !== null) return notReady;
const body = await readJson(req);
const bodyRecord = typeof body === "object" && body !== null && !Array.isArray(body) ? body as Record<string, unknown> : {};
const prompt = typeof bodyRecord.prompt === "string" ? String(bodyRecord.prompt) : "";
if (prompt.trim().length === 0) return jsonResponse({ ok: false, error: "prompt is required" }, 400);
const resumeId = normalizeResumeId(bodyRecord.resumeId) ?? createResumeId(task.id, prompt);
const originalCodexThreadId = task.codexThreadId;
const duplicateDecision = resumeDuplicateDecision(task, resumeId, prompt, taskFullOutput(task));
if (duplicateDecision.duplicate) {
await flushDirtyTasksToDatabase(true);
return jsonResponse({
ok: true,
accepted: true,
duplicateSuppressed: true,
deliveryState: "duplicate_suppressed",
resumeId,
reuseOriginalThread: originalCodexThreadId !== null,
originalCodexThreadId,
codexThreadId: task.codexThreadId,
traceConfirmation: resumeTraceConfirmationJson(duplicateDecision.confirmation),
task: taskForResponse(task),
queue: await queueSummaryForResponse(),
});
}
if (duplicateDecision.conflict) {
return jsonResponse({
ok: false,
error: "resumeId already exists with a different prompt hash",
accepted: false,
deliveryState: "not_accepted",
resumeId,
existingPromptHash: duplicateDecision.existingPromptHash,
requestedPromptHash: duplicateDecision.requestedPromptHash,
traceConfirmation: resumeTraceConfirmationJson(duplicateDecision.confirmation),
task: taskForResponse(task),
}, 409);
}
if (task.status === "running" || task.status === "judging") {
return jsonResponse({
ok: false,
error: `task is active: ${task.status}`,
accepted: false,
deliveryState: "not_accepted",
disposition: "use-steer-for-active-task",
resumeId,
task: taskForResponse(task),
commands: {
steer: `bun scripts/cli.ts codex steer ${task.id} --prompt-file <path>`,
show: `bun scripts/cli.ts codex task ${task.id}`,
},
}, 409);
}
if (task.status !== "failed" && task.status !== "canceled" && task.status !== "succeeded") {
return jsonResponse({
ok: false,
error: `task is not resumable: ${task.status}`,
accepted: false,
deliveryState: "not_accepted",
disposition: "not-terminal",
resumeId,
task: taskForResponse(task),
}, 409);
}
const traceOutput = appendOutput(task, "user", resumeTraceText(resumeId, prompt), "turn/resume", resumeId);
task.status = "queued";
task.finishedAt = null;
task.readAt = null;
task.cancelRequested = false;
task.lastError = null;
task.maxAttempts = Math.max(task.maxAttempts, task.attempts.length + 1);
task.nextMode = "retry";
task.nextPrompt = prompt;
setAttemptFeedbackPrompt(task.attempts.at(-1), task.nextPrompt, "manual-resume", task.attempts.length + 1);
task.updatedAt = nowIso();
task.queueEnteredAt = task.updatedAt;
appendOutput(task, "system", `resume queued resumeId=${resumeId} reuseOriginalThread=${originalCodexThreadId !== null ? "true" : "false"}\n`, "manual-resume", resumeId);
armIdleNotification();
persistState();
scheduleQueue(queueIdOf(task));
await flushDirtyTasksToDatabase(true);
const traceConfirmation = findResumeTraceConfirmation(task, resumeId, taskFullOutput(task));
return jsonResponse({
ok: true,
accepted: true,
duplicateSuppressed: false,
deliveryState: traceConfirmation.found ? "queued_for_existing_thread" : "accepted_response_timeout",
resumeId,
turnId: traceOutput?.seq ?? null,
promptHash: resumePromptHash(prompt),
reuseOriginalThread: originalCodexThreadId !== null,
originalCodexThreadId,
codexThreadId: task.codexThreadId,
traceConfirmation: resumeTraceConfirmationJson(traceConfirmation),
task: taskForResponse(task),
queue: await queueSummaryForResponse(),
}, 202);
}
async function editQueuedTaskPrompt(task: QueueTask, req: Request): Promise<Response> {
if (!serviceRoleAllowsWrite(config.serviceRole)) return readOnlyRejectResponse(req.method, `/api/tasks/${task.id}/edit`);
const notReady = requireDatabaseReadyForWrite(req.method, `/api/tasks/${task.id}/edit`);
@@ -5797,7 +5899,7 @@ async function route(req: Request): Promise<Response> {
if (task === null) return jsonResponse({ ok: false, error: "task not found" }, 404);
return jsonResponse({ ok: true, summary: taskSummaryResponse(task, url) });
}
const match = url.pathname.match(/^\/api\/tasks\/([^/]+)(?:\/(retry|steer|interrupt|move|read|edit))?$/u);
const match = url.pathname.match(/^\/api\/tasks\/([^/]+)(?:\/(retry|resume|steer|interrupt|move|read|edit))?$/u);
if (match !== null) {
const action = match[2];
const taskId = decodeURIComponent(match[1] ?? "");
@@ -5812,6 +5914,7 @@ async function route(req: Request): Promise<Response> {
: await findTaskForMutation(taskId);
if (task === null) return jsonResponse({ ok: false, error: "task not found" }, 404);
if (action === "retry" && req.method === "POST") return await manualRetry(task, req);
if (action === "resume" && req.method === "POST") return await resumeTask(task, req);
if (action === "steer" && req.method === "POST") return await steerTask(task, req);
if (action === "interrupt" && req.method === "POST") return await interruptTask(task, req.method);
if (action === "move" && req.method === "POST") return await moveTaskToQueue(task, req);
@@ -0,0 +1,142 @@
import { createHash } from "node:crypto";
import type { JsonValue, LiveOutput, QueueTask } from "./types";
const resumeIdPattern = /^[A-Za-z0-9._:-]{8,128}$/u;
export type ResumeDeliveryState = "queued_for_existing_thread" | "duplicate_suppressed" | "not_accepted" | "accepted_response_timeout" | "unknown";
export interface ResumeTraceMatch {
seq: number;
at: string;
method: "turn/resume";
resumeId: string;
promptChars: number;
promptHash: string;
promptOmitted: true;
source: "output";
}
export interface ResumeTraceConfirmation {
taskId: string;
resumeId: string;
found: boolean;
accepted: boolean;
deliveryState: ResumeDeliveryState;
trace: ResumeTraceMatch | null;
matches: ResumeTraceMatch[];
duplicateSuppressionKey: string;
}
export interface ResumeDuplicateDecision {
duplicate: boolean;
conflict: boolean;
confirmation: ResumeTraceConfirmation;
existingPromptHash: string | null;
requestedPromptHash: string;
}
export function normalizeResumeId(value: unknown): string | null {
if (typeof value !== "string") return null;
const text = value.trim();
if (!resumeIdPattern.test(text)) return null;
return text;
}
export function resumePromptHash(prompt: string): string {
return createHash("sha256").update(prompt, "utf8").digest("hex");
}
export function createResumeId(taskId: string, prompt: string): string {
const hash = createHash("sha256")
.update("unidesk-code-queue-resume:v1", "utf8")
.update("\0", "utf8")
.update(taskId, "utf8")
.update("\0", "utf8")
.update(prompt, "utf8")
.digest("hex")
.slice(0, 24);
return `resume_${hash}`;
}
export function resumeTraceText(resumeId: string, prompt: string): string {
return `\n[resume id=${resumeId} chars=${prompt.length} sha256=${resumePromptHash(prompt)}] ${prompt}\n`;
}
export function normalizeResumePromptText(text: string): string {
return text.replace(/^\s*\[resume(?:\s+[^\]]*)?\]\s*/u, "").trimEnd();
}
function resumeTraceMeta(text: string): { promptChars: number | null; promptHash: string | null } {
const match = text.match(/^\s*\[resume(?:\s+[^\]]*)?\]\s*/u);
if (match === null) return { promptChars: null, promptHash: null };
const header = match[0];
const charsMatch = header.match(/\schars=(\d+)(?=\s|\])/u);
const hashMatch = header.match(/\ssha256=([a-f0-9]{64})(?=\s|\])/u);
return {
promptChars: charsMatch === null ? null : Number(charsMatch[1]),
promptHash: hashMatch?.[1] ?? null,
};
}
function outputMatch(item: LiveOutput, resumeId: string): ResumeTraceMatch | null {
if (item.channel !== "user" || item.method !== "turn/resume" || normalizeResumeId(item.itemId) !== resumeId) return null;
const prompt = normalizeResumePromptText(item.text);
const meta = resumeTraceMeta(item.text);
return {
seq: item.seq,
at: item.at,
method: "turn/resume",
resumeId,
promptChars: meta.promptChars ?? prompt.length,
promptHash: meta.promptHash ?? resumePromptHash(prompt),
promptOmitted: true,
source: "output",
};
}
export function findResumeTraceConfirmation(task: QueueTask, resumeId: string, output: LiveOutput[] = task.output): ResumeTraceConfirmation {
const matches = output
.map((item) => outputMatch(item, resumeId))
.filter((item): item is ResumeTraceMatch => item !== null)
.sort((left, right) => left.seq - right.seq);
const trace = matches[0] ?? null;
return {
taskId: task.id,
resumeId,
found: trace !== null,
accepted: trace !== null,
deliveryState: trace !== null ? "queued_for_existing_thread" : "unknown",
trace,
matches,
duplicateSuppressionKey: resumeId,
};
}
export function resumeDuplicateDecision(task: QueueTask, resumeId: string, prompt: string, output: LiveOutput[] = task.output): ResumeDuplicateDecision {
const confirmation = findResumeTraceConfirmation(task, resumeId, output);
const requestedPromptHash = resumePromptHash(prompt);
const existingPromptHash = confirmation.trace?.promptHash ?? null;
return {
duplicate: confirmation.found && existingPromptHash === requestedPromptHash,
conflict: confirmation.found && existingPromptHash !== requestedPromptHash,
confirmation,
existingPromptHash,
requestedPromptHash,
};
}
export function resumeTraceConfirmationJson(confirmation: ResumeTraceConfirmation): JsonValue {
return {
taskId: confirmation.taskId,
resumeId: confirmation.resumeId,
found: confirmation.found,
accepted: confirmation.accepted,
deliveryState: confirmation.deliveryState,
trace: confirmation.trace as unknown as JsonValue,
matchCount: confirmation.matches.length,
matches: confirmation.matches.slice(0, 5) as unknown as JsonValue,
matchesTruncated: confirmation.matches.length > 5,
duplicateSuppressionKey: confirmation.duplicateSuppressionKey,
promptOmitted: true,
};
}
@@ -1157,6 +1157,7 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0,
for (const item of outputItems) {
if (initialPrompt !== null && item.channel === "user" && item.method === "enqueue") continue;
if (item.channel === "user" && item.method === "turn/steer" && promptHistorySeqs.has(item.seq)) continue;
if (item.channel === "user" && item.method === "turn/resume") continue;
if (isOpenCodeStepBoundaryMethod(item.method)) continue;
if (item.channel === "command" && item.method === "item/started") {
flushMessage();