diff --git a/docs/reference/cli.md b/docs/reference/cli.md index 47aac4dd..6200a732 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -47,6 +47,7 @@ CLI 可以从 `master` 快速演进,但必须兼容 `deploy.json` 固定的 CI - `codex deploy ` 是旧 Code Queue 兼容部署入口,已禁用以防止维护通道直连 D601 部署 Code Queue;当前 dev 自动化只做 `ci run-dev-e2e` smoke,不提供 Code Queue CD,详细规则见 `docs/reference/codex-deploy.md`。 - `codex submit [prompt] [--prompt-file path|--prompt-stdin] [--queue queueId] [--provider-id id] [--cwd path] [--model model] [--reasoning-effort effort] [--execution-mode mode] [--max-attempts N] [--reference-task-id id] [--dry-run]` 通过 backend-core 私有代理向稳定 `code-queue` 用户服务路径提交任务;prompt 必须且只能来自位置参数、文件或 stdin 之一,`--dry-run` 只返回结构化请求且不实际入队。长 prompt、多行 prompt、含引号/反引号/Markdown 表格/JSON/反斜杠的 prompt 必须优先用 `--prompt-stdin` 或 `--prompt-file`,不要拼进 shell 单个参数;位置参数只适合短单行 smoke prompt。stdin 推荐用 quoted heredoc:`cat <<'PROMPT' | bun scripts/cli.ts codex submit --prompt-stdin --queue --dry-run`,文件路径推荐 `bun scripts/cli.ts codex submit --prompt-file /tmp/code-queue-prompt.md --queue --dry-run`,确认 dry-run 后移除 `--dry-run` 提交同一 payload。dry-run 会额外输出 `routingRecommendation`,包含推荐 route、runner、model、风险信号、prompt 自包含/issue 非唯一来源/prod-secret-DB 禁止/运行态或 release 禁止/证据要求/中等复杂度候选等 guard 状态;同时输出 `policyContract`,固定暴露 GPT-5.5、DeepSeek、MiniMax 的风险分层、并发上限和外部 provider 429 退避处置。该建议只用于指挥官 preflight,不会改写 payload,不改变 runtime admission,也不假设生产 MiniMax 或 DeepSeek 可用。`--dry-run` 必须返回完整 prompt、字符数和 `truncated=false` 用于人工验收;真实提交是写入操作,默认只返回 `accepted=true`、task id、队列、写入保护摘要和后续查看命令,必须标记 `promptOmitted=true` 且不得回显 prompt 或 promptPreview。真实提交会经过本机本地串行化保护和短节流,避免同一指挥端并发 submit 把低内存主机或 `code-queue-mgr` 控制面打抖;返回值会附带 `executionMode`、`runnerPermissions` 和低噪声 `submitConcurrencyGuard`,显式说明 requested/effective mode、服务级 runner sandbox/approvalPolicy、锁与等待信息。`--execution-mode` 是 Code Queue runtime placement,不是 Codex sandbox 权限;有效模式是 `default` 和 `windows-native`,`--execution-mode full-access` 等 sandbox-like 值会保留 requested 值并显示 effective 
`default`,同时提示当前不支持每任务 sandbox override。真实提交的 `queue` 摘要保持低噪声:`submittedTaskIds`、`queuedTaskIds`、`activeTaskIds` 和 `databaseActiveTaskIds` 是有界预览对象,`countContext` 与 `counts` 是权威计数;`submitted.taskStates[]` 直接给出本次 task id、queue id、status 和 `state=queued|running|terminal|unknown`,其来源固定为 `response.tasks[].status`。当本次新任务仍是 queued/retry_wait,`queuedTaskIds.items` 必须包含该 id;当 counts 非零但 active/queued id 列表因为 split-brain-live、上游省略或默认有界披露而不可枚举时,预览必须设置 `idsUnavailable=true`、`itemsOmitted=true` 和 `itemsMeaning=not-enumerated-in-default-submit-output`,不得打印容易误读的 `items=[]`。`queue.activity.effectiveActiveTaskCount` 和 `queue.commanderConcurrency.activeRunnerCount` 是并发判断字段;`splitBrainLive=true` 时继续把 fresh heartbeat/database active 计入 active。需要原始 drill-down 时使用 `queue.listPreviewPolicy.rawCommand`,默认是 `bun scripts/cli.ts microservice proxy code-queue /api/tasks/overview?limit=30 --raw --full`。backend-core 默认把提交、队列 CRUD、已读状态、历史摘要和轻量 Trace 读取分流到主 server `code-queue-mgr`,由它写入主 PostgreSQL;D601 scheduler 只轮询并执行已入库任务。 - `codex steer [prompt|--prompt-file path|--prompt-stdin] [--steer-id id] [--dry-run] [--no-retry|--retry-attempts N] [--full|--raw]` 向运行中的 Code Queue 任务发送纠偏 prompt。CLI 会为同一 task/prompt 生成稳定 `steerId`,也允许显式传入 `--steer-id`;所有 retry 都复用同一 `steerId`,支持后端按 key 抑制重复 trace 注入。真实成功只返回低噪声写入确认,不回显 prompt 或完整任务状态;输出包含 `steer.status`、`steer.deliveryState`、`steer.steerId`、`traceConfirmation` 和 `commands.traceConfirm`。失败默认只返回 `accepted=false`、原因、scope、retryable、attempt 摘要、operator guidance 和 task/read/submit/health drill-down 命令。`upstreamBodyPreview`、request 元数据和 raw upstream failure 必须显式加 `--full` 或 `--raw` 才输出。任务已终态时返回紧凑 `task-already-terminal`、`status=not_accepted`、`deliveryState=not_accepted`、task 状态、终态状态、更新时间、`retryable=false` 和 `codex task` / `codex read` / `codex submit --reference-task-id ` 后续命令。 +- `codex resume [prompt|--prompt-file path|--prompt-stdin] [--resume-id id] [--dry-run] [--full|--raw]` 对已终态或 awaiting-closeout 的原 Code Queue task 创建后续 turn,优先用于 PR 小修、冲突、rebase、补测和 
reviewer feedback,保留原 task、attempt、branch/PR 上下文和 `codexThreadId`/OpenCode session。CLI 会为同一 task/prompt 生成稳定 `resumeId`,也允许显式传入;同一 `resumeId` 加同 prompt 返回 `duplicate_suppressed` 且不重复注入,同一 `resumeId` 加不同 prompt 返回 409 conflict。真实成功只返回 taskId、resumeId/turnId、`deliveryState`、是否复用原 `codexThreadId`、有界 trace confirmation 和 `codex task/detail/trace/output` 后续命令,不回显 prompt 或完整 task state。running/judging task 必须 fail closed 并给出 `disposition=use-steer-for-active-task` 与 `codex steer` 命令,不把 resume 伪装成新 task;不存在 task 返回结构化 not accepted。若 delivery timeout 或 trace 未确认,输出 `deliveryUnconfirmed` 和确认命令,调用方先查 `codex task --trace` 再用同一 `resumeId` 重试。 - `codex pr-preflight [--remote] [--push-dry-run --push-dry-run-ref refs/heads/probe/] [--pr-create-dry-run --pr-create-dry-run-head ] [--issue N] [--full|--raw]` 通过稳定 `code-queue` proxy 请求 D601 scheduler `/api/runtime-preflight`,用于 PR 型派单 admission。默认输出是紧凑 commander 视图,显式分出 `schedulerPreflight` 与 `activeRunnerPrCapability`,并附带 `commands` 和 `disclosure`,方便先看 scheduler auth 缺口、再看当前 runner/dev container 的 `gh auth status` 与 `gh pr create --dry-run` 能力;`--full` 或 `--raw` 才展开完整 `preflight`、工具、agent port、Git worktree、GitHub egress、repo/issue/PR 只读探测和观测原文。只报告 `GH_TOKEN`/`GITHUB_TOKEN` 是否存在和来源 key,不打印值。当 auth-broker 配置存在时,`tokenCoverage.source="auth-broker"`、`credentialSource="broker-issued-token"` 且 runner env token 不是成功前提;当仅 env token 存在时,`credentialSource="env-token"` 且 `authBroker.nextAction="use-env-token-until-auth-broker-live"`;两者都缺失时顶层 `ok=false`、`runnerDisposition=infra-blocked`、`degradedReason=auth-broker-needed`,`tokenCoverage.missing` 同时列出 `GH_TOKEN` 与 `GITHUB_TOKEN`,并输出 `authBroker.source="broker/auth-broker-needed"`、`capability.source="missing-token"`。该 `auth-missing` 的 scope 是 `scheduler-runner-env`,不能简化成“当前 active runner/dev container 不能创建 PR”;默认视图必须带 `scopeBoundary` 和 `activeRunnerPrCapability`。GitHub DNS/API 连接失败应归类为 `failureKind=github-transient`、`degradedReason=github-dns-api-transient`,并带 
`retryable=true`、`commanderAction=retry-backoff-or-keep-running-if-heartbeat-fresh` 和有界 `githubTransient.failedProbes`;调用方应重试/退避,且在任务 heartbeat/trace 新鲜时继续监督,不把它当成 auth 缺失或 PR 语义失败。`prCapability` 是 runner-facing 合同摘要,必须包含目标分支、token/auth 来源、`systemGhBinaryRequiredForWrites=false`、UniDesk REST `bun scripts/cli.ts gh` 可用性、push dry-run/PR create dry-run 的 `writesRemote=false`、expected PR handoff、真实 PR 创建需要 commander 授权和 `gh pr merge` 的 `unsupported-command` 边界;系统 `gh` binary 缺失只进入 `tools.systemGhBinary`,不得误判为 UniDesk REST `gh` CLI 不可用。`--remote` 在 runner-like 环境里不再依赖本地 `unidesk-backend-core`、`unidesk-database`、`baidu-netdisk-backend` 容器存在;这些缺失只作为本地观测证据。若远程控制面可达,则继续走远程控制面结果;若远程控制面不可达,则结构化返回 `failureKind=control-plane-missing` / `degradedReason=remote-control-plane-unreachable`,而不是把本地 `backend-core-container-missing` 当作最终阻塞。`--pr-create-dry-run` 不 POST GitHub,只证明 runner 内 PR body 生成、`scripts/cli.ts gh pr create --dry-run` 和 branch 参数形态可用;服务端创建权限仍以 token/auth broker、repo/issue/PR read、push dry-run 和最终授权后的真实 PR 创建结果为准。 - `codex task ` 通过 Code Queue 私有代理按任务 ID 查询结构化审阅摘要;默认只返回任务身份、执行 Provider、工作目录、attempt 计数、原始 prompt、最终 response、最后错误和渐进披露命令,适合指挥官审阅完成未读任务且避免上下文爆炸。`--detail` 仍是有界详细摘要:默认只返回少量 attempt/tool 行、短 prompt/response/stderr/feedback 预览和 omitted/truncated 元数据;需要完整 prompt/response 文本或更多 tool/attempt 细节时再显式加 `--full`、`--tool-limit N`、`--trace` 或 `codex output`。该摘要读取默认由主 server `code-queue-mgr` 从 PostgreSQL 返回,不依赖 D601 `code-queue-read` Service 可用。 - `codex tasks [--view supervisor|full] [--queue id] [--status succeeded|running|queued|failed|canceled|judging|retry_wait[,..]] [--unread|--unread-only] [--limit N] [--before-id id]` 通过同一私有代理输出渐进式披露视图。默认 `supervisor` 是低噪声指挥官视图,只返回 `activeRunning`、`running`、`completedUnread`、`recentCompleted`、`queued`、`activity`、`commanderConcurrency` 和 `executionDiagnostics` 的紧凑行;`activeRunning.count` 是 running+judging 的状态计数,`exact=true` 时来自 queue summary counts,`running.returned` 和 `activeRunning.rowPage.returned` 
只是本次返回的紧凑行数。`commanderConcurrency.activeRunnerCount` 是并发策略应使用的 active/running 计数,等于 `activity.effectiveActiveTaskCount`;15 并发策略按 `15 - activeRunnerCount` 计算剩余窗口。`commanderConcurrency.splitBrainDisposition=live-count-as-active` 表示 split-brain 有 fresh heartbeat 证据,应继续监督并计入 active;`interventionRequired=true` 才提示介入。prompt/body 只给短预览和原始字符数,`running`/`completedUnread`/`queued` 默认只返回一个有界小页并通过 section `commands.next` 继续分页,`recentCompleted` 默认限量且不重复 `completedUnread` 未读终态,不嵌入完整 Trace、final response 或全量 overview。`--limit` 在 supervisor 中主要是扫描/分页预算,不是返回几十条肥行的开关;CLI 安全上限是 100,输出会在 `filters.requestedLimit`、`filters.effectiveLimit`、`filters.limitCapped` 和 `disclosure.limitPolicy` 说明显式请求是否被 capped;底层 overview 拉取预算独立显示在 `source.requestedLimit` / `source.effectiveLimit`,所以 `--limit 260` 应显示 requested=260、effective=100、source requested/effective=200,而不是只露出一个含糊的 `limit`。`--unread` 是 `--unread-only` 的别名,必须只保留未读终态;`--status` 必须真实过滤支持的状态,未知参数或未知状态必须结构化失败。需要更详细当前页任务行时显式使用 `--view full` 或 `--full`,仍受 `--limit` 和 `--before-id` 分页约束。 diff --git a/docs/reference/code-queue-supervision.md b/docs/reference/code-queue-supervision.md index 06d797f1..833fe25d 100644 --- a/docs/reference/code-queue-supervision.md +++ b/docs/reference/code-queue-supervision.md @@ -251,6 +251,14 @@ bun scripts/cli.ts codex pr-preflight --remote --issue - 没有 token、凭证、临时日志或构建产物进入 commit、PR body 或评论。 - 未授权 runner 收口的 PR 由指挥官审查并决定是否 merge;已授权 runner 收口的普通 PR 在合并后仍要验证目标分支远端 commit 可见,并按态势更新 issue/#20/#24。 +### Runner Resume 收口 + +PR 小修、冲突、rebase、补测和 reviewer feedback 默认优先 resume 原 Code Queue task,而不是派 replacement runner。正式入口是 `bun scripts/cli.ts codex resume --prompt-file [--resume-id id]`;prompt 仍必须来自位置参数、`--prompt-stdin` 或 `--prompt-file` 三选一,复杂多行内容优先文件或 stdin。真实 resume 成功只应返回 taskId、resumeId/turnId、`deliveryState`、是否复用原 `codexThreadId`/session、有界 trace confirmation 和后续 `codex task/detail/trace/output` 查询命令,不回显完整 prompt。 + +`codex resume` 只用于终态或 awaiting-closeout task。running/judging task 必须结构化失败并提示改用 `codex steer`;不存在 
task、不可恢复状态或运行面不支持 follow-up turn 时必须 fail closed,不得伪装成新的 `codex submit`。同一个 `resumeId` 加同 prompt 必须 duplicate-suppressed;同一个 `resumeId` 加不同 prompt 必须 409 conflict;delivery timeout 或确认缺失时输出 `deliveryUnconfirmed` 和确认命令,指挥官先查 trace,再用同一 `resumeId` 重试。 + +replacement runner 只用于方向明显错误、质量不可接受、原 task 上下文不可恢复、原分支/PR 已废弃,或 resume 结构化 blocker 已证明无法继续的情况。关闭或替换旧 PR 时必须在 PR/body/final response 中说明 superseded/replacement 关系,避免 competing branch 扩散。 + ## 监控 指挥官必须用 task 级和 queue 级证据监控 Code Queue,不能只看单一状态字段。 @@ -265,6 +273,7 @@ bun scripts/cli.ts codex pr-preflight --remote --issue - `bun scripts/cli.ts codex tasks --status succeeded --unread --limit N`:按具体终态过滤监督结果;不支持的 status filter 必须显式失败,不能扩大为未过滤结果。 - `bun scripts/cli.ts codex task `:默认只查看原始 prompt、最终 response、最后错误和 drill-down 命令,这是完成未读任务审阅的第一步。 - 当默认审阅摘要不足时,再逐级使用 `bun scripts/cli.ts codex task --detail`、`bun scripts/cli.ts codex task --trace --limit N` 或 `codex output`。 +- `bun scripts/cli.ts codex resume --prompt-file `:对已终态或 awaiting-closeout 的原 task 追加后续修正 turn,适合 PR 小修、冲突、rebase、补测和 review 修正;running/judging task 改用 `codex steer`。 - 当 master 控制面状态和 D601 scheduler 状态看起来分裂时,使用 `docs/reference/observability.md` 中的活性规则判断。 默认 supervisor 视图必须保持低噪声。`activeRunning.count` 是指挥官 active running 计数,来源是 queue summary 的 status counts 时 `activeRunning.exact=true`,用于 redline 判断;`activeRunning.rowPage.returned` / `running.returned` 只表示本次返回的紧凑任务行。`activeRunning.redline` 必须写明 `countField`、routine target、burst redline、hard redline、`state` 和 `decisionReady`;只有 `decisionReady=true` 时,才能直接用该 count 做红线/补派判断。`running`、`completedUnread` 和 `queued` 即使传入较大的 `--limit`,默认也只返回一个很小的有界页,并通过 section `commands.next` 继续分页;`--limit` 保留为扫描/分页预算和 full view 返回预算,不得让一次 supervisor 调用输出几十条肥行。每个任务行只应带 task id 和必要摘要,`show`、`detail`、`trace`、`output`、`full`、`read` 使用 section template 表达,让下一步渐进披露动作明确且不重复;默认不得嵌入完整 queue 列表、完整 final response、raw output 页或完整 trace 行。`recentCompleted` 必须默认限量,且不得重复 `completedUnread` 里的未读终态,避免完成历史把当前 running、阻塞和未读审阅挤出视野;需要完整当前页时显式使用 `--view 
full`。`executionDiagnostics` 只能展示有界 task-id/reason 预览、总数、截断标记和 omitted counts;需要全量诊断时使用输出中的 raw command。`commands.read` 只是在人工审阅后的建议命令,listing 命令绝不能自动执行。 diff --git a/scripts/cli.ts b/scripts/cli.ts index c4e47e16..28e91eac 100644 --- a/scripts/cli.ts +++ b/scripts/cli.ts @@ -83,6 +83,24 @@ function displayCommandName(parts: string[]): string { } return shown.join(" "); } + if (parts[0] === "codex" && parts[1] === "resume" && parts[2] !== undefined) { + const shown = ["codex", "resume", parts[2]]; + const shownValueOptions = new Set(["--prompt-file", "--file", "--resume-id", "--resumeId"]); + const hasPromptFile = parts.includes("--prompt-file") || parts.includes("--file"); + const hasPromptStdin = parts.includes("--prompt-stdin") || parts.includes("--stdin"); + const hasHelp = parts.slice(3).some(isHelpToken); + if (!hasPromptFile && !hasPromptStdin && !hasHelp) shown.push(""); + for (let index = 3; index < parts.length; index += 1) { + const part = parts[index] ?? ""; + if (!part.startsWith("--")) continue; + shown.push(part); + if (shownValueOptions.has(part)) { + shown.push(parts[index + 1] ?? 
""); + index += 1; + } + } + return shown.join(" "); + } if (parts[0] === "commander" && parts[1] === "approval" && parts[2] === "request") { const shown: string[] = []; for (let index = 0; index < parts.length; index += 1) { diff --git a/scripts/code-queue-resume-contract-test.ts b/scripts/code-queue-resume-contract-test.ts new file mode 100644 index 00000000..3f11c852 --- /dev/null +++ b/scripts/code-queue-resume-contract-test.ts @@ -0,0 +1,309 @@ +import { spawnSync } from "node:child_process"; +import { writeFileSync, unlinkSync } from "node:fs"; +import { join } from "node:path"; +import { tmpdir } from "node:os"; +import { codexResumeTaskForTest } from "./src/code-queue"; +import { findResumeTraceConfirmation, resumeDuplicateDecision, resumeTraceText } from "../src/components/microservices/code-queue/src/resume-confirmation"; +import type { QueueTask } from "../src/components/microservices/code-queue/src/types"; + +type JsonRecord = Record; + +function assertCondition(condition: unknown, message: string, detail: JsonRecord = {}): void { + if (!condition) throw new Error(`${message}: ${JSON.stringify(detail)}`); +} + +function nestedRecord(value: unknown, path: string[]): JsonRecord { + let current: unknown = value; + for (const key of path) { + assertCondition(current !== null && typeof current === "object" && !Array.isArray(current), "expected object while traversing JSON", { path, key, current }); + current = (current as JsonRecord)[key]; + } + assertCondition(current !== null && typeof current === "object" && !Array.isArray(current), "expected nested object", { path, current }); + return current as JsonRecord; +} + +function runCli(args: string[], stdin?: string): { status: number | null; stdout: string; stderr: string; json: JsonRecord | null } { + const result = spawnSync("bun", ["scripts/cli.ts", ...args], { + cwd: process.cwd(), + input: stdin, + encoding: "utf8", + }); + const stdout = String(result.stdout || ""); + let json: JsonRecord | null = null; 
/**
 * Builds a `succeeded` queue task fixture that already carries a Codex thread id.
 * Every timestamp except `finishedAt` shares the same instant.
 */
function fixtureTask(): QueueTask {
  const enteredAt = "2026-05-23T00:00:00.000Z";
  const task: QueueTask = {
    id: "codex_resume_fixture",
    queueId: "default",
    queueEnteredAt: enteredAt,
    prompt: "base",
    basePrompt: "base",
    referenceTaskIds: [],
    referenceInjection: null,
    providerId: "D601",
    cwd: "/workspace/unidesk",
    model: "gpt-5.5",
    reasoningEffort: null,
    executionMode: "default",
    maxAttempts: 99,
    status: "succeeded",
    createdAt: enteredAt,
    updatedAt: enteredAt,
    startedAt: enteredAt,
    finishedAt: "2026-05-23T00:10:00.000Z",
    readAt: null,
    currentAttempt: 1,
    currentMode: "initial",
    codexThreadId: "thread_resume_fixture",
    activeTurnId: null,
    finalResponse: "done",
    lastError: null,
    lastJudge: null,
    judgeFailCount: 0,
    promptHistory: [],
    output: [],
    events: [],
    attempts: [],
    cancelRequested: false,
    nextPrompt: null,
    nextMode: null,
  };
  return task;
}

/**
 * Computes the resume id these tests expect for a task/prompt pair:
 * `resume_` followed by the first 24 hex characters of a SHA-256 over a
 * NUL-separated, versioned seed string.
 */
function deterministicResumeId(taskId: string, prompt: string): string {
  const seed = `unidesk-code-queue-resume:v1\0${taskId}\0${prompt}`;
  const digestHex = Bun.SHA256.hash(seed, "hex");
  return `resume_${digestHex.slice(0, 24)}`;
}

/**
 * Asserts that a CLI dry-run response for task `codex_test_task` exposes the
 * request method, paths, deterministic resume id, prompt text and length,
 * and a follow-up run command pinned to the same resume id.
 *
 * @param response - Parsed JSON printed by the CLI.
 * @param expectedText - Prompt text the dry-run request body must carry.
 */
function assertDryRunPrompt(response: JsonRecord, expectedText: string): void {
  const fixtureTaskId = "codex_test_task";

  assertCondition(response.ok === true, "CLI dry-run should succeed", response);

  const payload = nestedRecord(response.data, []);
  assertCondition(payload.dryRun === true, "dry-run response should expose dryRun=true", payload);

  const requestInfo = nestedRecord(response.data, ["request"]);
  assertCondition(requestInfo.method === "POST", "dry-run should expose request method", requestInfo);
  assertCondition(requestInfo.path === "/api/tasks/codex_test_task/resume", "dry-run should expose resume path", requestInfo);
  assertCondition(
    requestInfo.stableProxyPath === "/api/microservices/code-queue/proxy/api/tasks/codex_test_task/resume",
    "dry-run should expose stable proxy path",
    requestInfo,
  );
  const expectedResumeId = deterministicResumeId(fixtureTaskId, expectedText);
  assertCondition(requestInfo.resumeId === expectedResumeId, "dry-run should expose deterministic resumeId", requestInfo);

  const promptInfo = nestedRecord(response.data, ["request", "body", "prompt"]);
  assertCondition(promptInfo.text === expectedText, "dry-run prompt text mismatch", promptInfo);
  assertCondition(promptInfo.chars === expectedText.length, "dry-run prompt char count mismatch", promptInfo);

  const followUps = nestedRecord(response.data, ["commands"]);
  const runCommand = String(followUps.run || "");
  const pinnedResumeFlag = `--resume-id ${String(requestInfo.resumeId)}`;
  assertCondition(runCommand.includes(pinnedResumeFlag), "dry-run should expose same resumeId run command", followUps);
}
{}, "file resume prompt"); + } finally { + unlinkSync(promptFile); + } + + const duplicateSource = runCli(["codex", "resume", "codex_test_task", "positional", "--prompt-stdin", "--dry-run"], "stdin\n"); + assertCondition(duplicateSource.status !== 0, "duplicate prompt source should fail", duplicateSource.json ?? { stdout: duplicateSource.stdout }); + assertCondition(String(nestedRecord(duplicateSource.json, ["error"]).message || "").includes("exactly one prompt source"), "duplicate prompt source error should be explicit", duplicateSource.json ?? {}); + + const help = runCli(["codex", "help"]); + const usage = Array.isArray(nestedRecord(help.json?.data, []).usage) ? nestedRecord(help.json?.data, []).usage as unknown[] : []; + assertCondition(usage.some((line) => String(line).includes("codex resume ")), "codex help should list resume", { usage: usage.map(String) }); + + let dryRunFetchCount = 0; + const dryRunDirect = codexResumeTaskForTest("direct_task", ["do not send", "--dry-run"], () => { + dryRunFetchCount += 1; + return { ok: true, status: 200, body: { ok: true } }; + }); + assertCondition(dryRunFetchCount === 0, "dry-run must not call stable proxy helper", { dryRunFetchCount, dryRunDirect }); + + const longPrompt = `${"x".repeat(480)}-tail-secret-marker`; + const longDryRun = codexResumeTaskForTest("direct_task", [longPrompt, "--dry-run"], () => { + throw new Error("dry-run should not fetch"); + }) as JsonRecord; + const longPreview = nestedRecord(longDryRun, ["request", "body", "prompt"]); + assertCondition(longPreview.truncated === true, "long dry-run prompt should be truncated", longPreview); + assertCondition(!String(longPreview.text || "").includes("tail-secret-marker"), "long dry-run must not leak prompt tail", longPreview); + + let fetchPath = ""; + let fetchMethod = ""; + let fetchPrompt = ""; + let fetchResumeId = ""; + const success = codexResumeTaskForTest("direct_task", ["resume this context"], (path, init) => { + fetchPath = path; + fetchMethod = 
String(init?.method || ""); + fetchPrompt = String((init?.body as JsonRecord | undefined)?.prompt || ""); + fetchResumeId = String((init?.body as JsonRecord | undefined)?.resumeId || ""); + return { + ok: true, + status: 202, + body: { + ok: true, + accepted: true, + duplicateSuppressed: false, + deliveryState: "queued_for_existing_thread", + resumeId: fetchResumeId, + turnId: 9, + reuseOriginalThread: true, + originalCodexThreadId: "thread_original", + codexThreadId: "thread_original", + traceConfirmation: { + taskId: "direct_task", + resumeId: fetchResumeId, + found: true, + accepted: true, + deliveryState: "queued_for_existing_thread", + matchCount: 1, + trace: { seq: 9, at: "2026-05-23T00:00:09.000Z", method: "turn/resume", resumeId: fetchResumeId, promptChars: 19, promptHash: "hash", promptOmitted: true, source: "output" }, + duplicateSuppressionKey: fetchResumeId, + promptOmitted: true, + }, + task: { id: "direct_task", status: "queued", codexThreadId: "thread_original", currentAttempt: 1, currentMode: "initial", prompt: "hidden" }, + queue: { activeTaskIds: [], queuedTaskIds: ["direct_task"] }, + }, + }; + }) as JsonRecord; + assertCondition(fetchPath === "/api/microservices/code-queue/proxy/api/tasks/direct_task/resume", "non-dry-run should use stable resume path", { fetchPath }); + assertCondition(fetchMethod === "POST", "non-dry-run should POST", { fetchMethod }); + assertCondition(fetchPrompt === "resume this context", "non-dry-run should send raw prompt in body", { fetchPrompt }); + assertCondition(fetchResumeId === deterministicResumeId("direct_task", "resume this context"), "non-dry-run should send deterministic resumeId", { fetchResumeId }); + assertCondition(nestedRecord(success, ["resume"]).accepted === true, "successful resume should report accepted=true", success); + assertCondition(nestedRecord(success, ["resume"]).reusedCodexThread === true, "successful resume should report thread reuse", success); + assertCondition(nestedRecord(success, 
["resume"]).promptOmitted === true, "successful resume should mark prompt omitted", success); + assertCondition(nestedRecord(success, ["resume"]).deliveryState === "queued_for_existing_thread", "successful resume should expose delivery state", success); + assertCondition(!JSON.stringify(success).includes("resume this context"), "successful resume must not echo prompt text", success); + + const explicitResumeId = "resume_manual_12345"; + const duplicateSuppressed = codexResumeTaskForTest("direct_task", ["same prompt", "--resume-id", explicitResumeId], (_path, init) => { + assertCondition((init?.body as JsonRecord | undefined)?.resumeId === explicitResumeId, "explicit resumeId should be sent unchanged", (init?.body as JsonRecord | undefined) ?? {}); + return { + ok: true, + status: 200, + body: { + ok: true, + accepted: true, + duplicateSuppressed: true, + deliveryState: "duplicate_suppressed", + resumeId: explicitResumeId, + reuseOriginalThread: true, + originalCodexThreadId: "thread_original", + codexThreadId: "thread_original", + traceConfirmation: { + taskId: "direct_task", + resumeId: explicitResumeId, + found: true, + accepted: true, + deliveryState: "queued_for_existing_thread", + matchCount: 1, + trace: { seq: 11, at: "2026-05-23T00:00:11.000Z", method: "turn/resume", resumeId: explicitResumeId, promptChars: 11, promptHash: "hash2", promptOmitted: true, source: "output" }, + duplicateSuppressionKey: explicitResumeId, + }, + task: { id: "direct_task", status: "queued", codexThreadId: "thread_original", prompt: "hidden" }, + queue: { queuedTaskIds: ["direct_task"] }, + }, + }; + }) as JsonRecord; + assertCondition(nestedRecord(duplicateSuppressed, ["resume"]).status === "duplicate_suppressed", "duplicate resume should expose suppression status", duplicateSuppressed); + assertCondition(nestedRecord(duplicateSuppressed, ["resume"]).duplicateSuppressed === true, "duplicate resume should expose duplicateSuppressed", duplicateSuppressed); + + const conflictPrompt = 
"changed resume request requested-secret-marker"; + const conflict = codexResumeTaskForTest("direct_task", [conflictPrompt, "--resume-id", explicitResumeId], () => ({ + ok: false, + status: 409, + body: { + ok: false, + error: "resumeId already exists with a different prompt hash", + accepted: false, + deliveryState: "not_accepted", + resumeId: explicitResumeId, + existingPromptHash: "old", + requestedPromptHash: "new", + traceConfirmation: { + taskId: "direct_task", + resumeId: explicitResumeId, + found: true, + accepted: true, + deliveryState: "queued_for_existing_thread", + matchCount: 1, + trace: { seq: 11, at: "2026-05-23T00:00:11.000Z", method: "turn/resume", resumeId: explicitResumeId, promptChars: 11, promptHash: "old", promptOmitted: true, source: "output" }, + }, + task: { id: "direct_task", status: "queued", prompt: `${"hidden ".repeat(80)}task-secret-marker` }, + }, + })) as JsonRecord; + assertCondition(conflict.ok === false, "resumeId conflict should fail", conflict); + assertCondition(nestedRecord(conflict, ["resume"]).status === "not_accepted", "conflict should expose not_accepted", conflict); + assertCondition(!JSON.stringify(conflict).includes("requested-secret-marker"), "conflict must not echo requested prompt", conflict); + assertCondition(!JSON.stringify(conflict).includes("task-secret-marker"), "conflict must not echo full task prompt by default", conflict); + + const runningRejection = codexResumeTaskForTest("running_task", ["use steer instead"], () => ({ + ok: false, + status: 409, + body: { + ok: false, + error: "task is active: running", + accepted: false, + deliveryState: "not_accepted", + disposition: "use-steer-for-active-task", + resumeId: deterministicResumeId("running_task", "use steer instead"), + task: { id: "running_task", status: "running", terminalStatus: null, prompt: "hidden active task" }, + }, + })) as JsonRecord; + assertCondition(runningRejection.ok === false, "running task resume should fail closed", runningRejection); + 
assertCondition(nestedRecord(runningRejection, ["resume"]).reason === "use-steer-for-active-task", "running resume should route to steer", runningRejection); + assertCondition(String(nestedRecord(runningRejection, ["commands"]).steer || "").includes("codex steer running_task"), "running rejection should provide steer command", runningRejection); + + const notFound = codexResumeTaskForTest("missing_task", ["prompt"], () => ({ ok: false, status: 404, body: { ok: false, error: "task not found" } })) as JsonRecord; + assertCondition(notFound.ok === false, "missing task resume should fail", notFound); + assertCondition(nestedRecord(notFound, ["resume"]).status === "not_accepted", "missing task should be not accepted", notFound); + + const task = fixtureTask(); + const resumeId = "resume_contract_12345"; + const prompt = "continue the same PR"; + task.output.push({ seq: 9, at: "2026-05-23T00:00:09.000Z", channel: "user", method: "turn/resume", itemId: resumeId, text: resumeTraceText(resumeId, prompt) }); + const confirmation = findResumeTraceConfirmation(task, resumeId); + assertCondition(confirmation.found === true && confirmation.accepted === true, "confirmation should find resume trace by resumeId", confirmation as unknown as JsonRecord); + assertCondition(confirmation.trace?.promptChars === prompt.length, "confirmation should expose prompt chars without prompt text", (confirmation.trace ?? 
{}) as unknown as JsonRecord); + assertCondition(!JSON.stringify(confirmation).includes(prompt), "confirmation must not echo prompt text", confirmation as unknown as JsonRecord); + const duplicate = resumeDuplicateDecision(task, resumeId, prompt); + assertCondition(duplicate.duplicate === true && duplicate.conflict === false, "same resumeId and prompt should be duplicate-suppressed", duplicate as unknown as JsonRecord); + const localConflict = resumeDuplicateDecision(task, resumeId, "changed prompt"); + assertCondition(localConflict.duplicate === false && localConflict.conflict === true, "same resumeId with changed prompt should conflict", localConflict as unknown as JsonRecord); + const newlineResumeId = "resume_contract_newline"; + const newlinePrompt = "keep trailing newline\n"; + task.output.push({ seq: 10, at: "2026-05-23T00:00:10.000Z", channel: "user", method: "turn/resume", itemId: newlineResumeId, text: resumeTraceText(newlineResumeId, newlinePrompt) }); + const newlineDuplicate = resumeDuplicateDecision(task, newlineResumeId, newlinePrompt); + assertCondition(newlineDuplicate.duplicate === true && newlineDuplicate.conflict === false, "duplicate suppression should use exact prompt hash, including trailing newline", newlineDuplicate as unknown as JsonRecord); + + return { + ok: true, + checks: [ + "resume positional/stdin/file dry-runs", + "bounded disclosure and outer command redaction", + "non-dry-run sends resumeId and omits prompt from output", + "terminal resume accepted with thread reuse metadata", + "duplicate suppression and conflict behavior", + "running task fails closed with steer command", + "missing task fails closed", + "local resume trace confirmation helpers", + "exact prompt hash survives trailing newline", + ], + }; +} + +if (import.meta.main) { + process.stdout.write(`${JSON.stringify(runCodeQueueResumeContract(), null, 2)}\n`); +} diff --git a/scripts/src/check.ts b/scripts/src/check.ts index 3241df84..797a5138 100644 --- 
a/scripts/src/check.ts +++ b/scripts/src/check.ts @@ -370,6 +370,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default items.push(commandItem("code-queue:prompt-lint-contract", ["bun", "scripts/code-queue-prompt-lint-contract-test.ts"], 30_000)); items.push(commandItem("code-queue:cli-steer-contract", ["bun", "scripts/code-queue-cli-steer-test.ts"], 30_000)); items.push(commandItem("code-queue:steer-confirmation-contract", ["bun", "scripts/code-queue-steer-confirmation-contract-test.ts"], 30_000)); + items.push(commandItem("code-queue:resume-contract", ["bun", "scripts/code-queue-resume-contract-test.ts"], 30_000)); items.push(commandItem("code-queue:read-terminal-contract", ["bun", "scripts/code-queue-cli-read-terminal-contract-test.ts"], 30_000)); items.push(commandItem("code-queue:submit-prompt-contract", ["bun", "scripts/code-queue-cli-submit-prompt-contract-test.ts"], 30_000)); items.push(commandItem("code-queue:submit-execution-mode-contract", ["bun", "scripts/code-queue-submit-execution-mode-contract-test.ts"], 30_000)); @@ -407,6 +408,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default items.push(skippedItem("code-queue:prompt-lint-contract", "Code Queue prompt live-authorization lint contract is opt-in with script checks", "--scripts-typecheck or --full")); items.push(skippedItem("code-queue:cli-steer-contract", "Code Queue steer CLI contract is opt-in with script checks", "--scripts-typecheck or --full")); items.push(skippedItem("code-queue:steer-confirmation-contract", "Code Queue steer delivery confirmation contract is opt-in with script checks", "--scripts-typecheck or --full")); + items.push(skippedItem("code-queue:resume-contract", "Code Queue resume CLI and delivery contract is opt-in with script checks", "--scripts-typecheck or --full")); items.push(skippedItem("code-queue:read-terminal-contract", "Code Queue terminal read contract is opt-in with script checks", "--scripts-typecheck 
or --full")); items.push(skippedItem("code-queue:submit-prompt-contract", "Code Queue submit prompt contract is opt-in with script checks", "--scripts-typecheck or --full")); items.push(skippedItem("code-queue:submit-execution-mode-contract", "Code Queue submit execution-mode contract is opt-in with script checks", "--scripts-typecheck or --full")); diff --git a/scripts/src/code-queue.ts b/scripts/src/code-queue.ts index f6a30374..622e261e 100644 --- a/scripts/src/code-queue.ts +++ b/scripts/src/code-queue.ts @@ -3,6 +3,7 @@ import { runCommand } from "./command"; import { type UniDeskConfig, repoRoot, rootPath } from "./config"; import { coreInternalFetch } from "./microservices"; import { previewJson } from "./preview"; +import { createResumeId, type ResumeDeliveryState } from "../../src/components/microservices/code-queue/src/resume-confirmation"; import { createSteerId, type SteerDeliveryState } from "../../src/components/microservices/code-queue/src/steer-confirmation"; import { codeAgentPortForModel, @@ -234,6 +235,14 @@ interface CodexSteerConfirmOptions { raw: boolean; } +interface CodexResumeOptions { + prompt: string; + resumeId: string | undefined; + dryRun: boolean; + full: boolean; + raw: boolean; +} + type CodexSteerFailureReason = | "backend-core-unreachable" | "code-queue-microservice-unregistered" @@ -245,6 +254,8 @@ type CodexSteerFailureReason = | "invalid-proxy-response"; type CodexSteerAcceptanceStatus = "accepted" | "not_accepted" | "accepted_response_timeout" | "unknown"; +type CodexResumeAcceptanceStatus = "queued" | "duplicate_suppressed" | "not_accepted" | "accepted_response_timeout" | "unknown"; +type CodexResumeDeliveryState = ResumeDeliveryState; interface ClassifiedCodexSteerError { reason: CodexSteerFailureReason; @@ -468,6 +479,10 @@ function rawSteerConfirmationCommand(taskId: string, steerId: string): string { return `bun scripts/cli.ts microservice proxy code-queue ${steerConfirmationPath(taskId, steerId)} --raw`; } +function 
resumeCommand(taskId: string, resumeId: string): string { + return `bun scripts/cli.ts codex resume ${taskId} --prompt-file --resume-id ${resumeId}`; +} + function nonNegativeIntegerEnv(name: string, fallback: number): number { const raw = process.env[name]; if (raw === undefined || raw.trim().length === 0) return fallback; @@ -2595,6 +2610,10 @@ export function codexSteerTaskForTest(taskId: string, optionArgs: string[], fetc return codexSteerTask(taskId, optionArgs, fetcher); } +export function codexResumeTaskForTest(taskId: string, optionArgs: string[], fetcher: CodexResponseFetcher): unknown { + return codexResumeTask(taskId, optionArgs, fetcher); +} + export function codexSteerTraceConfirmForTest(taskId: string, optionArgs: string[], fetcher: CodexResponseFetcher): unknown { return codexSteerTraceConfirm(taskId, optionArgs, fetcher); } @@ -4071,6 +4090,13 @@ const steerPromptValueOptions = new Set([ "--steerId", ]); +const resumePromptValueOptions = new Set([ + "--prompt-file", + "--file", + "--resume-id", + "--resumeId", +]); + function referenceTaskIdsFromOptions(args: string[]): string[] { const values = optionValues(args, ["--reference-task-id", "--reference", "--ref"]); const ids: string[] = []; @@ -4207,6 +4233,22 @@ function parseSteerConfirmOptions(args: string[]): CodexSteerConfirmOptions { return { steerId, raw: hasFlag(args, "--raw") }; } +function parseResumeOptions(args: string[]): CodexResumeOptions { + assertKnownOptions(args, { + flags: ["--prompt-stdin", "--stdin", "--dry-run", "--full", "--raw"], + valueOptions: ["--prompt-file", "--file", "--resume-id", "--resumeId"], + }, "codex resume"); + const resumeId = optionValue(args, ["--resume-id", "--resumeId"]); + if (resumeId !== undefined && !/^[A-Za-z0-9._:-]{8,128}$/u.test(resumeId)) throw new Error("--resume-id must be 8-128 chars using letters, numbers, dot, underscore, colon, or dash"); + return { + prompt: promptFromArgs(args, "codex resume", resumePromptValueOptions), + resumeId, + 
dryRun: hasFlag(args, "--dry-run"), + full: hasFlag(args, "--full") || hasFlag(args, "--raw"), + raw: hasFlag(args, "--raw"), + }; +} + function parsePromptLintOptions(args: string[]): CodexPromptLintOptions { assertKnownOptions(args, { flags: ["--prompt-stdin", "--stdin"], @@ -4308,6 +4350,58 @@ function compactSteerTaskConfirmation(task: unknown, steerId: string): Record { + const record = asRecord(value) ?? {}; + const confirmation = asRecord(record.confirmation) ?? record; + const trace = asRecord(confirmation.trace); + const compactTrace = trace === null ? null : { + seq: trace.seq ?? null, + at: trace.at ?? null, + method: trace.method ?? null, + resumeId: trace.resumeId ?? resumeId, + promptChars: trace.promptChars ?? null, + promptHash: trace.promptHash ?? null, + promptOmitted: true, + source: trace.source ?? null, + }; + return { + taskId: asString(confirmation.taskId) || taskId, + resumeId: asString(confirmation.resumeId) || resumeId, + found: confirmation.found === true, + accepted: confirmation.accepted === true, + deliveryState: asString(confirmation.deliveryState) || (confirmation.found === true ? "queued_for_existing_thread" : "unknown"), + matchCount: asNumber(confirmation.matchCount, 0), + trace: compactTrace, + duplicateSuppressionKey: confirmation.duplicateSuppressionKey ?? resumeId, + promptOmitted: true, + }; +} + +function terminalStatusFromResumeTask(task: Record | null): string { + return terminalStatusFromTask(task); +} + +function compactResumeTaskConfirmation(task: unknown, resumeId: string, source: Record): Record { + const compact = compactSubmitTaskConfirmation(task); + const commands = asRecord(compact.commands) ?? {}; + const record = asRecord(task) ?? {}; + return { + ...compact, + currentAttempt: record.currentAttempt ?? null, + currentMode: record.currentMode ?? null, + terminalStatus: terminalStatusFromResumeTask(record) || null, + reusedCodexThreadId: source.originalCodexThreadId ?? source.codexThreadId ?? 
record.codexThreadId ?? null, + commands: { + ...commands, + show: `bun scripts/cli.ts codex task ${asString(compact.id) || ""}`, + detail: `bun scripts/cli.ts codex task ${asString(compact.id) || ""} --detail`, + trace: `bun scripts/cli.ts codex task ${asString(compact.id) || ""} --trace --tail --limit ${defaultTraceLimit}`, + output: `bun scripts/cli.ts codex output ${asString(compact.id) || ""} --tail --limit ${defaultOutputLimit}`, + resumeAgain: resumeCommand(asString(compact.id) || "", resumeId), + }, + }; +} + function orderedUniqueStringList(values: string[]): string[] { const seen = new Set(); const items: string[] = []; @@ -6461,6 +6555,147 @@ function codexSteerTraceConfirm(taskId: string, args: string[], fetcher: CodexRe }; } +function compactResumeRejection(taskId: string, resumeId: string, response: unknown, options: CodexResumeOptions): Record { + const record = asRecord(response); + const body = responseBody(record); + const task = asRecord(body?.task); + const status = asString(task?.status); + const upstreamStatus = responseStatus(record); + const bodyError = asString(body?.error); + const disposition = asString(body?.disposition) + || (upstreamStatus === 404 || bodyError?.toLowerCase().includes("not found") ? "task-not-found" : isActiveTaskStatus(status) ? "use-steer-for-active-task" : isTerminalTaskStatus(status) ? 
"resume-rejected" : "not-terminal"); + const result: Record = { + ok: false, + resume: { + accepted: false, + status: "not_accepted" satisfies CodexResumeAcceptanceStatus, + deliveryState: "not_accepted" satisfies CodexResumeDeliveryState, + resumeId, + reason: disposition, + taskId, + taskStatus: status || null, + terminalStatus: terminalStatusFromResumeTask(task) || null, + retryable: false, + promptOmitted: true, + }, + message: asString(body?.error) || `task ${taskId} is not resumable`, + commands: { + show: `bun scripts/cli.ts codex task ${taskId}`, + detail: `bun scripts/cli.ts codex task ${taskId} --detail`, + ...(disposition === "use-steer-for-active-task" ? { steer: `bun scripts/cli.ts codex steer ${taskId} --prompt-file ` } : {}), + supervisor: `bun scripts/cli.ts codex tasks --view supervisor --limit ${defaultTasksLimit}`, + }, + upstream: { + status: upstreamStatus, + error: bodyError || null, + }, + disclosure: { + defaultPolicy: "compact resume rejection; request prompt, upstream task body, and raw response require explicit --full or --raw", + full: options.full, + raw: options.raw, + defaultOmitted: ["request.body.prompt.text", "task.prompt", "task.finalResponse", "rawFailure"], + }, + }; + if (options.full) { + result.task = compactResumeTaskConfirmation(task, resumeId, body ?? {}); + result.upstreamBodyPreview = previewJson(body ?? record, { maxDepth: 4, maxArrayItems: 8, maxObjectKeys: 24, maxStringLength: 600 }); + } + if (options.raw) result.rawFailure = response; + return result; +} + +function codexResumeTask(taskId: string, args: string[], fetcher: CodexResponseFetcher = coreInternalFetch): unknown { + const options = parseResumeOptions(args); + const resumeId = options.resumeId ?? 
createResumeId(taskId, options.prompt); + const targetPath = `/api/tasks/${encodeURIComponent(taskId)}/resume`; + const stableProxyPath = codeQueueProxyPath(targetPath); + const rawProxyEquivalent = codeQueueProxyEquivalentCommand(targetPath, `{"prompt":"...","resumeId":"${resumeId}"}`); + const prompt = textView(options.prompt, false, steerPromptPreviewChars); + const request = { + path: targetPath, + stableProxyPath, + method: "POST", + resumeId, + bodySummary: { + resumeId, + promptChars: options.prompt.length, + promptPreviewChars: steerPromptPreviewChars, + promptTruncated: prompt.truncated, + }, + body: { resumeId, prompt }, + contract: { + target: "terminal-or-awaiting-closeout task follow-up turn", + idempotency: "same resumeId and same prompt is duplicate-suppressed; same resumeId and different prompt is rejected as conflict", + activeTaskDisposition: "active running/judging tasks must use codex steer, not resume", + promptEchoPolicy: "default dry-run shows only bounded prompt preview; non-dry-run never echoes prompt text", + }, + }; + if (options.dryRun) { + return { + ok: true, + dryRun: true, + request, + commands: { + run: resumeCommand(taskId, resumeId), + show: `bun scripts/cli.ts codex task ${taskId}`, + supervisor: `bun scripts/cli.ts codex tasks --view supervisor --limit ${defaultTasksLimit}`, + rawProxy: rawProxyEquivalent, + }, + }; + } + const rawResponse = fetcher(stableProxyPath, { method: "POST", body: { prompt: options.prompt, resumeId } }); + const record = asRecord(rawResponse); + const body = responseBody(record); + if (record?.ok !== true || body?.ok !== true) { + return compactResumeRejection(taskId, resumeId, rawResponse, options); + } + const responseResumeId = asString(body.resumeId) || resumeId; + const duplicateSuppressed = body.duplicateSuppressed === true; + const traceConfirmation = compactResumeTraceConfirmation(body.traceConfirmation, taskId, responseResumeId); + const deliveryState = asString(body.deliveryState) || 
asString(traceConfirmation.deliveryState) || (duplicateSuppressed ? "duplicate_suppressed" : "queued_for_existing_thread"); + const acceptanceStatus: CodexResumeAcceptanceStatus = duplicateSuppressed ? "duplicate_suppressed" : deliveryState === "accepted_response_timeout" ? "accepted_response_timeout" : "queued"; + const traceRecord = asRecord(traceConfirmation.trace); + return { + ok: true, + upstream: { ok: record.ok, status: record.status }, + resume: { + accepted: true, + status: acceptanceStatus, + deliveryState, + resumeId: responseResumeId, + turnId: body.turnId ?? traceRecord?.seq ?? null, + taskId, + promptChars: options.prompt.length, + promptOmitted: true, + duplicateSuppressed, + duplicateSuppressionKey: responseResumeId, + reusedCodexThread: body.reuseOriginalThread === true, + originalCodexThreadId: body.originalCodexThreadId ?? null, + codexThreadId: body.codexThreadId ?? null, + deliveryUnconfirmed: deliveryState === "accepted_response_timeout", + outputPolicy: { + default: "write-confirmation", + promptEchoed: false, + taskDetailEchoed: false, + reason: "codex resume is a write operation; default output confirms queued delivery and provides drill-down commands without echoing prompt text or full task state.", + }, + }, + traceConfirmation, + task: compactResumeTaskConfirmation(body.task, responseResumeId, body), + queue: compactSubmitQueueConfirmation(body.queue), + commands: { + show: `bun scripts/cli.ts codex task ${taskId}`, + detail: `bun scripts/cli.ts codex task ${taskId} --detail`, + trace: `bun scripts/cli.ts codex task ${taskId} --trace --tail --limit ${defaultTraceLimit}`, + output: `bun scripts/cli.ts codex output ${taskId} --tail --limit ${defaultOutputLimit}`, + retrySameResumeId: resumeCommand(taskId, responseResumeId), + supervisor: `bun scripts/cli.ts codex tasks --view supervisor --limit ${defaultTasksLimit}`, + confirmDelivery: `bun scripts/cli.ts codex task ${taskId} --trace --tail --limit ${defaultTraceLimit}`, + 
...(deliveryState === "accepted_response_timeout" ? { deliveryUnconfirmed: `bun scripts/cli.ts codex task ${taskId} --trace --tail --limit ${defaultTraceLimit}` } : {}), + }, + }; +} + export async function runCodeQueueCommand(config: UniDeskConfig, args: string[]): Promise { const [action = "task", taskIdArg] = args; if (action === "prompt-lint" || action === "lint-prompt") { @@ -6519,9 +6754,13 @@ export async function runCodeQueueCommand(config: UniDeskConfig, args: string[]) const taskId = requireTaskId(taskIdArg, "codex steer"); return codexSteerTask(taskId, args.slice(2)); } + if (action === "resume") { + const taskId = requireTaskId(taskIdArg, "codex resume"); + return codexResumeTask(taskId, args.slice(2)); + } if (action === "steer-confirm" || action === "steer-confirmation") { const taskId = requireTaskId(taskIdArg, `codex ${action}`); return codexSteerTraceConfirm(taskId, args.slice(2)); } - throw new Error("codex command must be one of: prompt-lint, submit, enqueue, task, summary, show, tasks, overview, unread, terminal-unread, output, judge, read, mark-read, dev-ready, health, skills-sync, pr-preflight, runtime-preflight, queues, queue list, queue create, queue merge, move, steer, steer-confirm, interrupt, cancel"); + throw new Error("codex command must be one of: prompt-lint, submit, enqueue, task, summary, show, tasks, overview, unread, terminal-unread, output, judge, read, mark-read, dev-ready, health, skills-sync, pr-preflight, runtime-preflight, queues, queue list, queue create, queue merge, move, steer, resume, steer-confirm, interrupt, cancel"); } diff --git a/scripts/src/help.ts b/scripts/src/help.ts index f6a7d863..d899786a 100644 --- a/scripts/src/help.ts +++ b/scripts/src/help.ts @@ -64,6 +64,7 @@ export function rootHelp(): unknown { { command: "codex dev-ready", description: "Fetch execution-container readiness, including sanitized skill injection status from /api/dev-ready." 
}, { command: "codex judge --attempt N [--dry-run] [--include-prompt]", description: "Replay one stored Code Queue attempt through the same judge context builder and MiniMax judge call path used by the live queue worker." }, { command: "codex steer [prompt|--prompt-file path|--prompt-stdin] [--steer-id id] [--dry-run] [--no-retry|--retry-attempts N] [--full|--raw]", description: "Push a corrective prompt into a running Code Queue task with a steerId/idempotency key; retryable tunnel aborts get bounded trace confirmation before any retry guidance." }, + { command: "codex resume [prompt|--prompt-file path|--prompt-stdin] [--resume-id id] [--dry-run] [--full|--raw]", description: "Queue a follow-up turn on a terminal Code Queue task, preserving the original task/thread/PR context and suppressing duplicate resumeId delivery without echoing the prompt." }, { command: "codex steer-confirm --steer-id [--raw]", description: "Read-only lookup for a steerId in task trace so deliveryUnconfirmed can be resolved without resending the corrective prompt." }, { command: "codex interrupt|cancel ", description: "Request interrupt for a running Code Queue task, or cancel a queued/retry_wait task, through the same private proxy." }, { command: "codex (queues [--full|--all] | queue create | queue merge --into | move --queue )", description: "List low-noise queue summaries by default, including effective activity counts that distinguish scheduler-local queues, DB running tasks, and heartbeat-fresh runners; full queue rows require --full/--all." 
}, @@ -249,7 +250,7 @@ function scheduleHelp(): unknown { function codexHelp(): unknown { return { - command: "codex deploy|prompt-lint|submit|task|tasks|unread|output|read|dev-ready|skills-sync|pr-preflight|judge|steer|interrupt|cancel|queues|queue|move", + command: "codex deploy|prompt-lint|submit|task|tasks|unread|output|read|dev-ready|skills-sync|pr-preflight|judge|steer|resume|interrupt|cancel|queues|queue|move", output: "json", usage: [ "bun scripts/cli.ts codex deploy # disabled legacy deployment entry", @@ -268,6 +269,7 @@ function codexHelp(): unknown { "bun scripts/cli.ts codex pr-preflight [--remote] [--push-dry-run --push-dry-run-ref refs/heads/probe/] [--pr-create-dry-run --pr-create-dry-run-head ] [--issue N] [--full|--raw]", "bun scripts/cli.ts codex judge --attempt N [--dry-run] [--include-prompt]", "bun scripts/cli.ts codex steer [prompt|--prompt-file path|--prompt-stdin] [--steer-id id] [--dry-run] [--no-retry|--retry-attempts N] [--full|--raw]", + "bun scripts/cli.ts codex resume [prompt|--prompt-file path|--prompt-stdin] [--resume-id id] [--dry-run] [--full|--raw]", "bun scripts/cli.ts codex steer-confirm --steer-id [--raw]", "bun scripts/cli.ts codex interrupt|cancel ", "bun scripts/cli.ts codex queues [--full|--all] [--limit N] [--page N|--offset N] | queue create | queue merge --into | move --queue ", diff --git a/src/components/backend-core/src/microservice-proxy.ts b/src/components/backend-core/src/microservice-proxy.ts index 54ef7971..c57b59c8 100644 --- a/src/components/backend-core/src/microservice-proxy.ts +++ b/src/components/backend-core/src/microservice-proxy.ts @@ -402,7 +402,7 @@ function codeQueueK3sServiceIdForRequest(method: string, targetPath: string): st if (targetPath === "/api/queues" || targetPath === "/api/tasks/overview") return "code-queue-scheduler"; if (targetPath === "/api/oa/backfill" || targetPath === "/api/notifications/claudeqq/drain" || targetPath === "/api/notifications/claudeqq/backfill") return 
"code-queue-scheduler"; if (targetPath === "/api/judge/probe" || targetPath === "/api/judge/self-test" || targetPath === "/api/queue-order/self-test" || targetPath === "/api/reference-injection/self-test" || targetPath === "/api/trace-port/self-test") return "code-queue-scheduler"; - if (/^\/api\/tasks\/[^/]+\/(?:steer|interrupt)$/u.test(targetPath)) return "code-queue-scheduler"; + if (/^\/api\/tasks\/[^/]+\/(?:steer|resume|interrupt)$/u.test(targetPath)) return "code-queue-scheduler"; if (/^\/api\/tasks\/[^/]+$/u.test(targetPath) && normalizedMethod === "DELETE") return "code-queue-scheduler"; if (targetPath === "/api/dev-containers" || /^\/api\/dev-containers(?:\/[^/]+)?\/start$/u.test(targetPath)) return "code-queue-scheduler"; if (/^\/api\/dev-containers(?:\/[^/]+)?\/status$/u.test(targetPath)) return "code-queue-scheduler"; diff --git a/src/components/microservices/code-queue/src/index.ts b/src/components/microservices/code-queue/src/index.ts index e8e651a4..3e274f9e 100644 --- a/src/components/microservices/code-queue/src/index.ts +++ b/src/components/microservices/code-queue/src/index.ts @@ -137,6 +137,7 @@ import { import { collectRuntimePreflight, runtimePreflightJson } from "./runtime-preflight"; import { collectSkillAvailability, collectSkillSyncPreflight, skillAvailabilityJson, skillSyncPreflightJson } from "./skill-availability"; import { createSteerId, findSteerTraceConfirmation, normalizeSteerId, normalizeSteerPromptText, steerDuplicateDecision, steerPromptHash, steerTraceConfirmationJson, steerTraceText } from "./steer-confirmation"; +import { createResumeId, findResumeTraceConfirmation, normalizeResumeId, resumeDuplicateDecision, resumePromptHash, resumeTraceConfirmationJson, resumeTraceText } from "./resume-confirmation"; import { configureSelfTests, runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runStaleActiveRecoverySelfTest, runTracePortSelfTest, runTraceSummaryContractSelfTest } from 
"./self-tests"; import { codexToolLifecycleStartedBeforeIn, @@ -4873,6 +4874,107 @@ async function steerTask(task: QueueTask, req: Request): Promise { }); } +async function resumeTask(task: QueueTask, req: Request): Promise { + if (!serviceRoleAllowsScheduler(config.serviceRole)) return schedulerOnlyRejectResponse(req.method, `/api/tasks/${task.id}/resume`); + const notReady = requireDatabaseReadyForWrite(req.method, `/api/tasks/${task.id}/resume`); + if (notReady !== null) return notReady; + const body = await readJson(req); + const bodyRecord = typeof body === "object" && body !== null && !Array.isArray(body) ? body as Record : {}; + const prompt = typeof bodyRecord.prompt === "string" ? String(bodyRecord.prompt) : ""; + if (prompt.trim().length === 0) return jsonResponse({ ok: false, error: "prompt is required" }, 400); + const resumeId = normalizeResumeId(bodyRecord.resumeId) ?? createResumeId(task.id, prompt); + const originalCodexThreadId = task.codexThreadId; + const duplicateDecision = resumeDuplicateDecision(task, resumeId, prompt, taskFullOutput(task)); + if (duplicateDecision.duplicate) { + await flushDirtyTasksToDatabase(true); + return jsonResponse({ + ok: true, + accepted: true, + duplicateSuppressed: true, + deliveryState: "duplicate_suppressed", + resumeId, + reuseOriginalThread: originalCodexThreadId !== null, + originalCodexThreadId, + codexThreadId: task.codexThreadId, + traceConfirmation: resumeTraceConfirmationJson(duplicateDecision.confirmation), + task: taskForResponse(task), + queue: await queueSummaryForResponse(), + }); + } + if (duplicateDecision.conflict) { + return jsonResponse({ + ok: false, + error: "resumeId already exists with a different prompt hash", + accepted: false, + deliveryState: "not_accepted", + resumeId, + existingPromptHash: duplicateDecision.existingPromptHash, + requestedPromptHash: duplicateDecision.requestedPromptHash, + traceConfirmation: resumeTraceConfirmationJson(duplicateDecision.confirmation), + task: 
taskForResponse(task), + }, 409); + } + if (task.status === "running" || task.status === "judging") { + return jsonResponse({ + ok: false, + error: `task is active: ${task.status}`, + accepted: false, + deliveryState: "not_accepted", + disposition: "use-steer-for-active-task", + resumeId, + task: taskForResponse(task), + commands: { + steer: `bun scripts/cli.ts codex steer ${task.id} --prompt-file `, + show: `bun scripts/cli.ts codex task ${task.id}`, + }, + }, 409); + } + if (task.status !== "failed" && task.status !== "canceled" && task.status !== "succeeded") { + return jsonResponse({ + ok: false, + error: `task is not resumable: ${task.status}`, + accepted: false, + deliveryState: "not_accepted", + disposition: "not-terminal", + resumeId, + task: taskForResponse(task), + }, 409); + } + const traceOutput = appendOutput(task, "user", resumeTraceText(resumeId, prompt), "turn/resume", resumeId); + task.status = "queued"; + task.finishedAt = null; + task.readAt = null; + task.cancelRequested = false; + task.lastError = null; + task.maxAttempts = Math.max(task.maxAttempts, task.attempts.length + 1); + task.nextMode = "retry"; + task.nextPrompt = prompt; + setAttemptFeedbackPrompt(task.attempts.at(-1), task.nextPrompt, "manual-resume", task.attempts.length + 1); + task.updatedAt = nowIso(); + task.queueEnteredAt = task.updatedAt; + appendOutput(task, "system", `resume queued resumeId=${resumeId} reuseOriginalThread=${originalCodexThreadId !== null ? "true" : "false"}\n`, "manual-resume", resumeId); + armIdleNotification(); + persistState(); + scheduleQueue(queueIdOf(task)); + await flushDirtyTasksToDatabase(true); + const traceConfirmation = findResumeTraceConfirmation(task, resumeId, taskFullOutput(task)); + return jsonResponse({ + ok: true, + accepted: true, + duplicateSuppressed: false, + deliveryState: traceConfirmation.found ? "queued_for_existing_thread" : "accepted_response_timeout", + resumeId, + turnId: traceOutput?.seq ?? 
null, + promptHash: resumePromptHash(prompt), + reuseOriginalThread: originalCodexThreadId !== null, + originalCodexThreadId, + codexThreadId: task.codexThreadId, + traceConfirmation: resumeTraceConfirmationJson(traceConfirmation), + task: taskForResponse(task), + queue: await queueSummaryForResponse(), + }, 202); +} + async function editQueuedTaskPrompt(task: QueueTask, req: Request): Promise { if (!serviceRoleAllowsWrite(config.serviceRole)) return readOnlyRejectResponse(req.method, `/api/tasks/${task.id}/edit`); const notReady = requireDatabaseReadyForWrite(req.method, `/api/tasks/${task.id}/edit`); @@ -5797,7 +5899,7 @@ async function route(req: Request): Promise { if (task === null) return jsonResponse({ ok: false, error: "task not found" }, 404); return jsonResponse({ ok: true, summary: taskSummaryResponse(task, url) }); } - const match = url.pathname.match(/^\/api\/tasks\/([^/]+)(?:\/(retry|steer|interrupt|move|read|edit))?$/u); + const match = url.pathname.match(/^\/api\/tasks\/([^/]+)(?:\/(retry|resume|steer|interrupt|move|read|edit))?$/u); if (match !== null) { const action = match[2]; const taskId = decodeURIComponent(match[1] ?? 
""); @@ -5812,6 +5914,7 @@ async function route(req: Request): Promise { : await findTaskForMutation(taskId); if (task === null) return jsonResponse({ ok: false, error: "task not found" }, 404); if (action === "retry" && req.method === "POST") return await manualRetry(task, req); + if (action === "resume" && req.method === "POST") return await resumeTask(task, req); if (action === "steer" && req.method === "POST") return await steerTask(task, req); if (action === "interrupt" && req.method === "POST") return await interruptTask(task, req.method); if (action === "move" && req.method === "POST") return await moveTaskToQueue(task, req); diff --git a/src/components/microservices/code-queue/src/resume-confirmation.ts b/src/components/microservices/code-queue/src/resume-confirmation.ts new file mode 100644 index 00000000..799ea878 --- /dev/null +++ b/src/components/microservices/code-queue/src/resume-confirmation.ts @@ -0,0 +1,142 @@ +import { createHash } from "node:crypto"; +import type { JsonValue, LiveOutput, QueueTask } from "./types"; + +const resumeIdPattern = /^[A-Za-z0-9._:-]{8,128}$/u; + +export type ResumeDeliveryState = "queued_for_existing_thread" | "duplicate_suppressed" | "not_accepted" | "accepted_response_timeout" | "unknown"; + +export interface ResumeTraceMatch { + seq: number; + at: string; + method: "turn/resume"; + resumeId: string; + promptChars: number; + promptHash: string; + promptOmitted: true; + source: "output"; +} + +export interface ResumeTraceConfirmation { + taskId: string; + resumeId: string; + found: boolean; + accepted: boolean; + deliveryState: ResumeDeliveryState; + trace: ResumeTraceMatch | null; + matches: ResumeTraceMatch[]; + duplicateSuppressionKey: string; +} + +export interface ResumeDuplicateDecision { + duplicate: boolean; + conflict: boolean; + confirmation: ResumeTraceConfirmation; + existingPromptHash: string | null; + requestedPromptHash: string; +} + +export function normalizeResumeId(value: unknown): string | null { + 
if (typeof value !== "string") return null; + const text = value.trim(); + if (!resumeIdPattern.test(text)) return null; + return text; +} + +export function resumePromptHash(prompt: string): string { + return createHash("sha256").update(prompt, "utf8").digest("hex"); +} + +export function createResumeId(taskId: string, prompt: string): string { + const hash = createHash("sha256") + .update("unidesk-code-queue-resume:v1", "utf8") + .update("\0", "utf8") + .update(taskId, "utf8") + .update("\0", "utf8") + .update(prompt, "utf8") + .digest("hex") + .slice(0, 24); + return `resume_${hash}`; +} + +export function resumeTraceText(resumeId: string, prompt: string): string { + return `\n[resume id=${resumeId} chars=${prompt.length} sha256=${resumePromptHash(prompt)}] ${prompt}\n`; +} + +export function normalizeResumePromptText(text: string): string { + return text.replace(/^\s*\[resume(?:\s+[^\]]*)?\]\s*/u, "").trimEnd(); +} + +function resumeTraceMeta(text: string): { promptChars: number | null; promptHash: string | null } { + const match = text.match(/^\s*\[resume(?:\s+[^\]]*)?\]\s*/u); + if (match === null) return { promptChars: null, promptHash: null }; + const header = match[0]; + const charsMatch = header.match(/\schars=(\d+)(?=\s|\])/u); + const hashMatch = header.match(/\ssha256=([a-f0-9]{64})(?=\s|\])/u); + return { + promptChars: charsMatch === null ? null : Number(charsMatch[1]), + promptHash: hashMatch?.[1] ?? null, + }; +} + +function outputMatch(item: LiveOutput, resumeId: string): ResumeTraceMatch | null { + if (item.channel !== "user" || item.method !== "turn/resume" || normalizeResumeId(item.itemId) !== resumeId) return null; + const prompt = normalizeResumePromptText(item.text); + const meta = resumeTraceMeta(item.text); + return { + seq: item.seq, + at: item.at, + method: "turn/resume", + resumeId, + promptChars: meta.promptChars ?? prompt.length, + promptHash: meta.promptHash ?? 
resumePromptHash(prompt), + promptOmitted: true, + source: "output", + }; +} + +export function findResumeTraceConfirmation(task: QueueTask, resumeId: string, output: LiveOutput[] = task.output): ResumeTraceConfirmation { + const matches = output + .map((item) => outputMatch(item, resumeId)) + .filter((item): item is ResumeTraceMatch => item !== null) + .sort((left, right) => left.seq - right.seq); + const trace = matches[0] ?? null; + return { + taskId: task.id, + resumeId, + found: trace !== null, + accepted: trace !== null, + deliveryState: trace !== null ? "queued_for_existing_thread" : "unknown", + trace, + matches, + duplicateSuppressionKey: resumeId, + }; +} + +export function resumeDuplicateDecision(task: QueueTask, resumeId: string, prompt: string, output: LiveOutput[] = task.output): ResumeDuplicateDecision { + const confirmation = findResumeTraceConfirmation(task, resumeId, output); + const requestedPromptHash = resumePromptHash(prompt); + const existingPromptHash = confirmation.trace?.promptHash ?? 
null; + return { + duplicate: confirmation.found && existingPromptHash === requestedPromptHash, + conflict: confirmation.found && existingPromptHash !== requestedPromptHash, + confirmation, + existingPromptHash, + requestedPromptHash, + }; +} + +export function resumeTraceConfirmationJson(confirmation: ResumeTraceConfirmation): JsonValue { + return { + taskId: confirmation.taskId, + resumeId: confirmation.resumeId, + found: confirmation.found, + accepted: confirmation.accepted, + deliveryState: confirmation.deliveryState, + trace: confirmation.trace as unknown as JsonValue, + matchCount: confirmation.matches.length, + matches: confirmation.matches.slice(0, 5) as unknown as JsonValue, + matchesTruncated: confirmation.matches.length > 5, + duplicateSuppressionKey: confirmation.duplicateSuppressionKey, + promptOmitted: true, + }; +} diff --git a/src/components/microservices/code-queue/src/task-view.ts b/src/components/microservices/code-queue/src/task-view.ts index d04e82be..3af5e988 100644 --- a/src/components/microservices/code-queue/src/task-view.ts +++ b/src/components/microservices/code-queue/src/task-view.ts @@ -1157,6 +1157,7 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0, for (const item of outputItems) { if (initialPrompt !== null && item.channel === "user" && item.method === "enqueue") continue; if (item.channel === "user" && item.method === "turn/steer" && promptHistorySeqs.has(item.seq)) continue; + if (item.channel === "user" && item.method === "turn/resume") continue; if (isOpenCodeStepBoundaryMethod(item.method)) continue; if (item.channel === "command" && item.method === "item/started") { flushMessage();