fix: add Code Queue steer confirmation
This commit is contained in:
@@ -53,7 +53,7 @@ UniDesk 是一个以主 server 为统一入口的分布式工作平台;本文
|
||||
- `bun scripts/cli.ts codex task <taskId>`:按 Code Queue 任务 ID 查询默认审阅摘要,只返回原始 prompt、最终 response、最后错误和渐进披露命令;`--detail`、`codex output` 和 supervisor 大 `--limit` 仍默认有界,完整内容需显式 `--full`/`--full-text`/分页展开;`codex queues [--full] [--limit N] [--page N|--offset N]` 默认分页低噪声输出队列摘要,完整 upstream 只通过 raw command 显式获取。
|
||||
- `bun scripts/cli.ts codex unread [--repo owner/name] [--issue N] [--limit N]`:只读汇总完成未读积压并给出 repo/issue/status/queue 计数和 drill-down/read 命令;批量已读必须显式 `codex unread mark-read ... --confirm`,规则见 `docs/reference/cli.md`。
|
||||
- `bun scripts/cli.ts codex judge <taskId> --attempt <n> [--dry-run]`:按指定 task/attempt 用与队列 worker 相同的上下文构建和 MiniMax judge 调用路径单步复现完成判定;`--dry-run` 只输出 prompt/payload 诊断。
|
||||
- `bun scripts/cli.ts codex steer <taskId> [prompt|--prompt-file path|--prompt-stdin] [--dry-run] [--no-retry|--retry-attempts N]`:通过 Code Queue 私有代理向运行中的 active turn 注入纠偏提示,对 retryable tunnel abort 做有界重试诊断,真实成功只确认写入并返回后续查看命令,不回显 prompt 或完整 task state。
|
||||
- `bun scripts/cli.ts codex steer <taskId> [prompt|--prompt-file path|--prompt-stdin] [--steer-id id] [--dry-run] [--no-retry|--retry-attempts N]` / `codex steer-confirm <taskId> --steer-id <id>`:向运行中的 active turn 注入纠偏提示并用 `steerId` 做幂等/trace 确认;真实输出不回显 prompt,遇到 `deliveryUnconfirmed` 先查确认命令,不重复发送同一纠偏。
|
||||
- `bun scripts/cli.ts codex interrupt|cancel <taskId>`:通过 Code Queue 私有代理中断运行任务或取消 queued/retry_wait 任务,规则见 `docs/reference/cli.md`。
|
||||
- `bun scripts/playwright-cli.ts screenshot|open|eval ...`:UniDesk 仓库自带的 Playwright 指挥手测 wrapper,默认 headless,可用 `--session <id>` 复用 storageState,适合截图、打开页面和一次性 JS 取值;它不实现长驻浏览器 daemon、element ref `click/fill/snapshot` 会返回结构化 unsupported 和 `xvfb-run`/headless 下一步,规则见 `docs/reference/cli.md`。
|
||||
- `bun scripts/cli.ts server stop`:以异步 job 停止固定 Compose 项目中的全部 UniDesk 服务,停止后用 `server status` 复核。
|
||||
|
||||
@@ -46,7 +46,7 @@ CLI 可以从 `master` 快速演进,但必须兼容 `deploy.json` 固定的 CI
|
||||
- `schedule list|get|runs|run|retry-run|delete|upsert-pgdata-backup` 管理 backend-core 定时任务和运行历史。`schedule list`、`schedule get`、`schedule runs --limit N` 和 `schedule runs <scheduleId> --limit N` 是只读观察入口;`schedule run`、`schedule retry-run`、`schedule delete` 和 `schedule upsert-pgdata-backup` 会触发运行或写入配置,生产恢复时必须有明确授权。`schedule runs --limit N` 是全局历史视图,返回 `scope=global` 和 `scheduleId=null`;`schedule runs <scheduleId> --limit N` 是指定 schedule 历史视图,返回 `scope=schedule` 和对应 `scheduleId`。CLI 必须拒绝 `schedule runs 50` 这类纯数字位置参数,并提示使用 `schedule runs --limit 50`,避免把空数组误判成“没有历史 run”。`schedule run <id> --wait-ms N` 触发同一 schedule,并且即使 wait 超时也必须返回 `newRunId` 和 `observeCommand`;`schedule retry-run <failedRunId>` 只接受 failed run,从原 run 反查 `scheduleId` 后重触发同一 schedule,并输出 `originalRunId`、`scheduleId`、`newRunId` 和 `observeCommand`。当 backend-core 目标容器缺失或只观察到 verify-only 容器时,schedule/microservice 命令必须以非零退出并返回 `failureKind=target-stack-not-running`、`runnerDisposition=infra-blocked`、`readOnlyCommands` 和 `authorizationRequiredForRecovery`,不得把 Docker 的 `No such container` 当成成功的空历史。
|
||||
- `codex deploy <commitId>` 是旧 Code Queue 兼容部署入口,已禁用以防止维护通道直连 D601 部署 Code Queue;当前 dev 自动化只做 `ci run-dev-e2e` smoke,不提供 Code Queue CD,详细规则见 `docs/reference/codex-deploy.md`。
|
||||
- `codex submit [prompt] [--prompt-file path|--prompt-stdin] [--queue queueId] [--provider-id id] [--cwd path] [--model model] [--reasoning-effort effort] [--execution-mode mode] [--max-attempts N] [--reference-task-id id] [--dry-run]` 通过 backend-core 私有代理向稳定 `code-queue` 用户服务路径提交任务;prompt 必须且只能来自位置参数、文件或 stdin 之一,`--dry-run` 只返回结构化请求且不实际入队。长 prompt、多行 prompt、含引号/反引号/Markdown 表格/JSON/反斜杠的 prompt 必须优先用 `--prompt-stdin` 或 `--prompt-file`,不要拼进 shell 单个参数;位置参数只适合短单行 smoke prompt。stdin 推荐用 quoted heredoc:`cat <<'PROMPT' | bun scripts/cli.ts codex submit --prompt-stdin --queue <id> --dry-run`,文件路径推荐 `bun scripts/cli.ts codex submit --prompt-file /tmp/code-queue-prompt.md --queue <id> --dry-run`,确认 dry-run 后移除 `--dry-run` 提交同一 payload。dry-run 会额外输出 `routingRecommendation`,包含推荐 route、runner、model、风险信号、prompt 自包含/issue 非唯一来源/prod-secret-DB 禁止/运行态或 release 禁止/证据要求/中等复杂度候选等 guard 状态;同时输出 `policyContract`,固定暴露 GPT-5.5、DeepSeek、MiniMax 的风险分层、并发上限和外部 provider 429 退避处置。该建议只用于指挥官 preflight,不会改写 payload,不改变 runtime admission,也不假设生产 MiniMax 或 DeepSeek 可用。`--dry-run` 必须返回完整 prompt、字符数和 `truncated=false` 用于人工验收;真实提交是写入操作,默认只返回 `accepted=true`、task id、队列、写入保护摘要和后续查看命令,必须标记 `promptOmitted=true` 且不得回显 prompt 或 promptPreview。真实提交会经过本机本地串行化保护和短节流,避免同一指挥端并发 submit 把低内存主机或 `code-queue-mgr` 控制面打抖;返回值会附带 `executionMode`、`runnerPermissions` 和低噪声 `submitConcurrencyGuard`,显式说明 requested/effective mode、服务级 runner sandbox/approvalPolicy、锁与等待信息。`--execution-mode` 是 Code Queue runtime placement,不是 Codex sandbox 权限;有效模式是 `default` 和 `windows-native`,`--execution-mode full-access` 等 sandbox-like 值会保留 requested 值并显示 effective `default`,同时提示当前不支持每任务 sandbox override。真实提交的 `queue` 摘要保持低噪声:`submittedTaskIds`、`queuedTaskIds`、`activeTaskIds` 和 `databaseActiveTaskIds` 是有界预览对象,`countContext` 与 `counts` 是权威计数;`submitted.taskStates[]` 直接给出本次 task id、queue id、status 和 `state=queued|running|terminal|unknown`,其来源固定为 `response.tasks[].status`。当本次新任务仍是 queued/retry_wait,`queuedTaskIds.items` 必须包含该 id;当 counts 非零但 active/queued id 列表因为 split-brain-live、上游省略或默认有界披露而不可枚举时,预览必须设置 `idsUnavailable=true`、`itemsOmitted=true` 和 `itemsMeaning=not-enumerated-in-default-submit-output`,不得打印容易误读的 `items=[]`。`queue.activity.effectiveActiveTaskCount` 和 `queue.commanderConcurrency.activeRunnerCount` 是并发判断字段;`splitBrainLive=true` 时继续把 fresh heartbeat/database active 计入 active。需要原始 drill-down 时使用 `queue.listPreviewPolicy.rawCommand`,默认是 `bun scripts/cli.ts microservice proxy code-queue /api/tasks/overview?limit=30 --raw --full`。backend-core 默认把提交、队列 CRUD、已读状态、历史摘要和轻量 Trace 读取分流到主 server `code-queue-mgr`,由它写入主 PostgreSQL;D601 scheduler 只轮询并执行已入库任务。
|
||||
- `codex steer <taskId> [prompt|--prompt-file path|--prompt-stdin] [--dry-run] [--no-retry|--retry-attempts N] [--full|--raw]` 向运行中的 Code Queue 任务发送纠偏 prompt。真实成功只返回低噪声写入确认,不回显 prompt 或完整任务状态;失败默认只返回 `accepted=false`、原因、scope、retryable、attempt 摘要、operator guidance 和 task/read/submit/health drill-down 命令。`upstreamBodyPreview`、request 元数据和 raw upstream failure 必须显式加 `--full` 或 `--raw` 才输出。任务已终态时返回紧凑 `task-already-terminal`、状态、终态状态、更新时间、`retryable=false` 和 `codex task` / `codex read` / `codex submit --reference-task-id <taskId>` 后续命令。
|
||||
- `codex steer <taskId> [prompt|--prompt-file path|--prompt-stdin] [--steer-id id] [--dry-run] [--no-retry|--retry-attempts N] [--full|--raw]` 向运行中的 Code Queue 任务发送纠偏 prompt。CLI 会为同一 task/prompt 生成稳定 `steerId`,也允许显式传入 `--steer-id`;所有 retry 都复用同一 `steerId`,支持后端按 key 抑制重复 trace 注入。真实成功只返回低噪声写入确认,不回显 prompt 或完整任务状态;输出包含 `steer.status`、`steer.deliveryState`、`steer.steerId`、`traceConfirmation` 和 `commands.traceConfirm`。失败默认只返回 `accepted=false`、原因、scope、retryable、attempt 摘要、operator guidance 和 task/read/submit/health drill-down 命令。`upstreamBodyPreview`、request 元数据和 raw upstream failure 必须显式加 `--full` 或 `--raw` 才输出。任务已终态时返回紧凑 `task-already-terminal`、`status=not_accepted`、`deliveryState=not_accepted`、task 状态、终态状态、更新时间、`retryable=false` 和 `codex task` / `codex read` / `codex submit --reference-task-id <taskId>` 后续命令。
|
||||
- `codex pr-preflight [--remote] [--push-dry-run --push-dry-run-ref refs/heads/probe/<name>] [--pr-create-dry-run --pr-create-dry-run-head <head>] [--issue N] [--full|--raw]` 通过稳定 `code-queue` proxy 请求 D601 scheduler `/api/runtime-preflight`,用于 PR 型派单 admission。默认输出是紧凑 commander 视图,显式分出 `schedulerPreflight` 与 `activeRunnerPrCapability`,并附带 `commands` 和 `disclosure`,方便先看 scheduler auth 缺口、再看当前 runner/dev container 的 `gh auth status` 与 `gh pr create --dry-run` 能力;`--full` 或 `--raw` 才展开完整 `preflight`、工具、agent port、Git worktree、GitHub egress、repo/issue/PR 只读探测和观测原文。只报告 `GH_TOKEN`/`GITHUB_TOKEN` 是否存在和来源 key,不打印值。当 auth-broker 配置存在时,`tokenCoverage.source="auth-broker"`、`credentialSource="broker-issued-token"` 且 runner env token 不是成功前提;当仅 env token 存在时,`credentialSource="env-token"` 且 `authBroker.nextAction="use-env-token-until-auth-broker-live"`;两者都缺失时顶层 `ok=false`、`runnerDisposition=infra-blocked`、`degradedReason=auth-broker-needed`,`tokenCoverage.missing` 同时列出 `GH_TOKEN` 与 `GITHUB_TOKEN`,并输出 `authBroker.source="broker/auth-broker-needed"`、`capability.source="missing-token"`。该 `auth-missing` 的 scope 是 `scheduler-runner-env`,不能简化成“当前 active runner/dev container 不能创建 PR”;默认视图必须带 `scopeBoundary` 和 `activeRunnerPrCapability`。GitHub DNS/API 连接失败应归类为 `failureKind=github-transient`、`degradedReason=github-dns-api-transient`,并带 `retryable=true`、`commanderAction=retry-backoff-or-keep-running-if-heartbeat-fresh` 和有界 `githubTransient.failedProbes`;调用方应重试/退避,且在任务 heartbeat/trace 新鲜时继续监督,不把它当成 auth 缺失或 PR 语义失败。`prCapability` 是 runner-facing 合同摘要,必须包含目标分支、token/auth 来源、`systemGhBinaryRequiredForWrites=false`、UniDesk REST `bun scripts/cli.ts gh` 可用性、push dry-run/PR create dry-run 的 `writesRemote=false`、expected PR handoff、真实 PR 创建需要 commander 授权和 `gh pr merge` 的 `unsupported-command` 边界;系统 `gh` binary 缺失只进入 `tools.systemGhBinary`,不得误判为 UniDesk REST `gh` CLI 不可用。`--remote` 在 runner-like 环境里不再依赖本地 `unidesk-backend-core`、`unidesk-database`、`baidu-netdisk-backend` 容器存在;这些缺失只作为本地观测证据。若远程控制面可达,则继续走远程控制面结果;若远程控制面不可达,则结构化返回 `failureKind=control-plane-missing` / `degradedReason=remote-control-plane-unreachable`,而不是把本地 `backend-core-container-missing` 当作最终阻塞。`--pr-create-dry-run` 不 POST GitHub,只证明 runner 内 PR body 生成、`scripts/cli.ts gh pr create --dry-run` 和 branch 参数形态可用;服务端创建权限仍以 token/auth broker、repo/issue/PR read、push dry-run 和最终授权后的真实 PR 创建结果为准。
|
||||
- `codex task <taskId>` 通过 Code Queue 私有代理按任务 ID 查询结构化审阅摘要;默认只返回任务身份、执行 Provider、工作目录、attempt 计数、原始 prompt、最终 response、最后错误和渐进披露命令,适合指挥官审阅完成未读任务且避免上下文爆炸。`--detail` 仍是有界详细摘要:默认只返回少量 attempt/tool 行、短 prompt/response/stderr/feedback 预览和 omitted/truncated 元数据;需要完整 prompt/response 文本或更多 tool/attempt 细节时再显式加 `--full`、`--tool-limit N`、`--trace` 或 `codex output`。该摘要读取默认由主 server `code-queue-mgr` 从 PostgreSQL 返回,不依赖 D601 `code-queue-read` Service 可用。
|
||||
- `codex tasks [--view supervisor|full] [--queue id] [--status succeeded|running|queued|failed|canceled|judging|retry_wait[,..]] [--unread|--unread-only] [--limit N] [--before-id id]` 通过同一私有代理输出渐进式披露视图。默认 `supervisor` 是低噪声指挥官视图,只返回 `activeRunning`、`running`、`completedUnread`、`recentCompleted`、`queued`、`activity`、`commanderConcurrency` 和 `executionDiagnostics` 的紧凑行;`activeRunning.count` 是 running+judging 的状态计数,`exact=true` 时来自 queue summary counts,`running.returned` 和 `activeRunning.rowPage.returned` 只是本次返回的紧凑行数。`commanderConcurrency.activeRunnerCount` 是并发策略应使用的 active/running 计数,等于 `activity.effectiveActiveTaskCount`;15 并发策略按 `15 - activeRunnerCount` 计算剩余窗口。`commanderConcurrency.splitBrainDisposition=live-count-as-active` 表示 split-brain 有 fresh heartbeat 证据,应继续监督并计入 active;`interventionRequired=true` 才提示介入。prompt/body 只给短预览和原始字符数,`running`/`completedUnread`/`queued` 默认只返回一个有界小页并通过 section `commands.next` 继续分页,`recentCompleted` 默认限量且不重复 `completedUnread` 未读终态,不嵌入完整 Trace、final response 或全量 overview。`--limit` 在 supervisor 中主要是扫描/分页预算,不是返回几十条肥行的开关;CLI 安全上限是 100,输出会在 `filters.requestedLimit`、`filters.effectiveLimit`、`filters.limitCapped` 和 `disclosure.limitPolicy` 说明显式请求是否被 capped;底层 overview 拉取预算独立显示在 `source.requestedLimit` / `source.effectiveLimit`,所以 `--limit 260` 应显示 requested=260、effective=100、source requested/effective=200,而不是只露出一个含糊的 `limit`。`--unread` 是 `--unread-only` 的别名,必须只保留未读终态;`--status` 必须真实过滤支持的状态,未知参数或未知状态必须结构化失败。需要更详细当前页任务行时显式使用 `--view full` 或 `--full`,仍受 `--limit` 和 `--before-id` 分页约束。
|
||||
@@ -56,8 +56,9 @@ CLI 可以从 `master` 快速演进,但必须兼容 `deploy.json` 固定的 CI
|
||||
- `codex read <taskId>` 在人工审阅后标记单个终态任务已读,并在同一次响应中返回稳定任务身份、执行元数据、终态 attempt 摘要、最后错误或 judge 信息和最终 response,避免标记已读后还要额外 drill-down 才能确认结果。该命令不返回完整 prompt、tool logs 或 feedback prompt,只返回字符数、计数和 `codex task/detail/trace/output` 渐进披露命令;列表、overview 和 supervisor 视图只返回这个命令字段,不得自动执行,也不得批量清空未读状态。
|
||||
- `codex dev-ready` 查询 Code Queue `/api/dev-ready` 并返回有界 readiness 摘要,包括工具、Docker、Codex config、SSH 和 `devReady.skills`。`devReady.skills` 只暴露 `UNIDESK_SKILLS_PATH`、是否存在、是否只读、skillCount、`cli-spec` 是否可见和修复建议,不输出宿主 auth/token 文件内容。
|
||||
- `codex judge <taskId> --attempt N [--dry-run] [--include-prompt]` 通过 Code Queue 私有代理按指定 attempt 单步复现 judge;这是执行面诊断入口,仍依赖 D601 scheduler/runner 侧的真实 judge builder、MiniMax 调用路径和执行环境。默认会真实调用 MiniMax,`--dry-run` 只返回 prompt/payload 大小、attempt 窗口和重建来源诊断,`--include-prompt` 仅用于本地深度排查。
|
||||
- `codex steer <taskId> [prompt|--prompt-file path|--prompt-stdin] [--dry-run] [--no-retry|--retry-attempts N] [--full|--raw]` 通过 Code Queue 私有代理向正在运行的 task 注入纠偏提示,正式替代底层 `microservice proxy code-queue /api/tasks/<taskId>/steer` 调用。prompt 必须且只能来自位置参数、文件或 stdin 之一;`--dry-run` 只输出 `method`、`path`、`stableProxyPath`、retry policy、prompt 字符数、截断预览和 raw proxy 等价命令,不触碰运行中 session,也不得泄露超长 prompt 全文。真实执行是写入操作,成功只返回 `accepted=true`、task id、prompt 字符数、`promptOmitted=true`、有界 task/queue 确认、attempt summary 和后续查看命令,不回显 prompt 或完整 task state;路径固定为 `/api/microservices/code-queue/proxy/api/tasks/<taskId>/steer`,只能作用于 D601 scheduler 上存在 active steerable turn 的 running task。默认对 `stable-proxy-failed` 和 `backend-core-unreachable` 这类 retryable control-plane failures 做一次有界重试;`--retry-attempts N` 最大为 3,`--retry-delay-ms N` 最大为 5000,`--no-retry` 用于复现单次失败。
|
||||
- `codex steer` 非 dry-run 失败仍输出 JSON 且退出非零;`.data.diagnostics.reason` 用于 runner 分流,当前包括 `backend-core-unreachable`、`code-queue-microservice-unregistered`、`proxy-unauthorized`、`proxy-404`、`steer-endpoint-404`、`upstream-runtime-rejected`、`stable-proxy-failed` 和 `invalid-proxy-response`。`scope` 区分 `backend-core`、`stable-proxy`、`code-queue-runtime` 或 `unknown`,默认带 `status`、`exitCode`、`retryable`、`attempts`、`retryPolicy`、operator guidance 和推荐交叉验证命令,但不回显 steer prompt、不带 request body、不带 `upstreamBodyPreview`,也不带 raw upstream response;这些详细字段必须显式加 `--full` 或 `--raw` 才展开。若任务不在 running/active-turn 状态,通常归类为 `upstream-runtime-rejected`,不得静默成功。`502 provider HTTP tunnel failed`、`provider-gateway-http-fetch`、`The operation was aborted` 或约 30 秒 tunnel wait abort 会归类为 `stable-proxy-failed`,CLI 会先按 retry policy 重试;如果仍失败,`.data.diagnostics.operatorGuidance.rawProxyEquivalentIsFallback=false` 表示 raw proxy 等价命令走同一条 tunnel,只能用于对照诊断,不应被当作更低噪声 fallback。此时 `.data.steer.deliveryUnconfirmed=true`,指挥官应先看 `codex tasks --view supervisor`、`codex task <taskId>` 和 `microservice health code-queue`,再从主 server CLI 或显式 SSH transport 重试同一个 `codex steer`。若 D601 返回的 409 已包含 terminal task state,CLI 默认改为紧凑终态响应:`reason=task-already-terminal`、task status、terminal status、`updatedAt`/`finishedAt`、`retryable=false`,并只给出 `codex task <taskId>`、`codex read <taskId>` 和 `codex submit --prompt-file <path> --reference-task-id <taskId>` 后续命令,不回显 steer prompt、完整 request body 或大 task object。
|
||||
- `codex steer <taskId> [prompt|--prompt-file path|--prompt-stdin] [--steer-id id] [--dry-run] [--no-retry|--retry-attempts N] [--full|--raw]` 通过 Code Queue 私有代理向正在运行的 task 注入纠偏提示,正式替代底层 `microservice proxy code-queue /api/tasks/<taskId>/steer` 调用。prompt 必须且只能来自位置参数、文件或 stdin 之一;`--dry-run` 只输出 `method`、`path`、`stableProxyPath`、`steerId`、retry policy、prompt 字符数、截断预览、`commands.traceConfirm` 和 raw proxy 等价命令,不触碰运行中 session,也不得泄露超长 prompt 全文。真实执行是写入操作,成功只返回 `accepted=true`、`status=accepted`、`deliveryState`、`steerId`、task id、prompt 字符数、`promptOmitted=true`、有界 `traceConfirmation`、task/queue 确认、attempt summary 和后续查看命令,不回显 prompt 或完整 task state;路径固定为 `/api/microservices/code-queue/proxy/api/tasks/<taskId>/steer`,只能作用于 D601 scheduler 上存在 active steerable turn 的 running task。默认对 `stable-proxy-failed` 和 `backend-core-unreachable` 这类 retryable control-plane failures 做一次有界重试;`--retry-attempts N` 最大为 3,`--retry-delay-ms N` 最大为 5000,`--no-retry` 用于复现单次失败。
|
||||
- `codex steer-confirm <taskId> --steer-id <id> [--raw]` 是只读 trace confirmation lookup。默认输出 `traceConfirmation.found/accepted/deliveryState/trace.seq/trace.at/promptChars/promptHash` 和 `delivery.status`,不回显 prompt;`--raw` 才附带原始 backend confirmation body。该命令用于处理 stable-proxy abort 后的 `deliveryUnconfirmed`,不要用重复 prompt 代替确认查询。
|
||||
- `codex steer` 非 dry-run 失败仍输出 JSON 且退出非零,除非后续 `steer-confirm` 证明 trace 已含该 `steerId`,此时返回 `ok=true`、`steer.status=accepted_response_timeout`、`deliveryState=accepted_response_timeout`,表示 backend 已接受但响应路径超时。`.data.diagnostics.reason` 用于 runner 分流,当前包括 `backend-core-unreachable`、`code-queue-microservice-unregistered`、`proxy-unauthorized`、`proxy-404`、`steer-endpoint-404`、`upstream-runtime-rejected`、`stable-proxy-failed` 和 `invalid-proxy-response`。`scope` 区分 `backend-core`、`stable-proxy`、`code-queue-runtime` 或 `unknown`,默认带 `status`、`exitCode`、`retryable`、`attempts`、`retryPolicy`、operator guidance 和推荐交叉验证命令,但不回显 steer prompt、不带 request body、不带 `upstreamBodyPreview`,也不带 raw upstream response;这些详细字段必须显式加 `--full` 或 `--raw` 才展开。若任务不在 running/active-turn 状态,通常归类为 `upstream-runtime-rejected` 并显示 `status=not_accepted` / `deliveryState=not_accepted`,不得静默成功。`502 provider HTTP tunnel failed`、`provider-gateway-http-fetch`、`The operation was aborted` 或约 30 秒 tunnel wait abort 会归类为 `stable-proxy-failed`,CLI 会先按 retry policy 重试并在失败后自动做一次 `steer-confirm` 查询;如果 `.data.steer.deliveryUnconfirmed=true` 仍为 true,指挥官应运行输出中的 `commands.traceConfirm`,确认仍 unknown 后才用输出中的 `commands.retry` 复用同一个 `--steer-id` 重试。后端已支持同 `steerId`+同 prompt 的重复请求返回 `duplicateSuppressed=true`,不会再次注入 trace;同 `steerId` 搭配不同 prompt 会返回 409 conflict。若 D601 返回的 409 已包含 terminal task state,CLI 默认改为紧凑终态响应:`reason=task-already-terminal`、`status=not_accepted`、`deliveryState=not_accepted`、task status、terminal status、`updatedAt`/`finishedAt`、`retryable=false`,并只给出 `codex task <taskId>`、`codex read <taskId>` 和 `codex submit --prompt-file <path> --reference-task-id <taskId>` 后续命令,不回显 steer prompt、完整 request body 或大 task object。
|
||||
- `codex interrupt|cancel <taskId>` 通过 Code Queue 私有代理请求中断;running/judging 任务会请求 D601 当前 agent run 停止,queued/retry_wait 任务的取消也必须保持与 WebUI 相同代理路径,返回有界 task 摘要和后续查询命令。任何需要接触 active run 的动作仍属于 D601 执行面。
|
||||
- Code Queue 多队列 lane 由 `codex` 命令命名空间管理:`queues [--full|--all] [--limit N] [--page N|--offset N]` 列表、`queue create <queueId>` 创建、`queue merge <sourceQueueId> --into <targetQueueId>` 合并、`move <taskId> --queue <queueId>` 迁移;这些队列管理入口默认由主 server `code-queue-mgr` 直管 PostgreSQL,仍通过稳定 `code-queue` 用户服务代理路径访问。`codex queues` 默认只返回 active/nonempty/unread/runnable queue 摘要、activity、commanderConcurrency、全局 counts 和 execution diagnostics;`--full` 或 `--all` 只切换为完整队列行视图的一页,仍受 `--limit`/`--page`/`--offset` 分页约束,不再默认携带 deprecated full array。summary 和 full 的稳定机读路径都是 `.data.queues.items[]`,全局元数据固定在 `.data.queues.commanderConcurrency`、`.data.queues.activity`、`.data.queues.counts`、`.data.queues.executionDiagnostics`、`.data.queues.activeTaskIds` 和 `.data.queues.queuedTaskIds`;需要完整 upstream 时使用输出中的 raw command。`commanderConcurrency.activeRunnerCount` / `activity.effectiveActiveTaskCount` 是指挥官并发判断的有效活跃数,`schedulerLocalActiveQueueCount`/`activeQueueIds` 只描述本地 scheduler active-run slots,不能覆盖数据库 running 计数或 heartbeat-fresh runner 计数。旧 full 顶层数组语义已作为 deprecated 兼容信息记录,不再作为 `.data.queues` 主形态。同一个 queue 内部串行执行,不同 queue 之间并行执行。迁移只允许尚未被 scheduler claim 的 `queued`/`retry_wait` 任务,必须满足 `startedAt=null`、`currentAttempt=0` 且没有 active thread/turn;已进入 `running`/`judging` 或已有 claim 标记的任务返回 409,不得被 move/merge 回写成 queued。合并会移动可迁移任务归属并自动删除源 queue 记录,只保留合并后的目标 queue;若 source 或 target queue 存在 active/claimed 任务,合并整体返回 409。合并后的目标 queue 按任务原 `queueEnteredAt`/`createdAt` 时间顺序串行,成功迁移 queued/retry_wait 任务后由 D601 scheduler 轮询推进。
|
||||
- 所有 `codex` 查询和管理命令必须走与 WebUI 相同的 backend-core 私有代理路径 `/api/microservices/code-queue/proxy/...`;CLI 不得为了提交、移动、中断、取消或队列管理直接调用 D601 内部 Service、数据库、pod curl 或 k3sctl scheduler 子服务。若该路径失败,应先修复 CLI/backend/provider tunnel 链路,而不是绕过控制面。
|
||||
|
||||
@@ -337,7 +337,10 @@ D601 artifact registry 的 systemd unit inactive 不等于 D601 全局离线。
|
||||
只有存在明确理由时才干预。
|
||||
|
||||
- 如果任务还在运行且 trace 或 scheduler heartbeat 新鲜,应引导而不是 interrupt。
|
||||
- 对运行中任务的引导应优先使用正式 CLI:`bun scripts/cli.ts codex steer <taskId> --prompt-file <path>`。该命令和 `codex task/tasks/read` 复用同一个 backend-core stable proxy helper;`--dry-run` 会显示 `method/path/stableProxyPath`、retry policy、prompt 摘要和 raw proxy 等价命令但不发送。非 dry-run 默认对 `stable-proxy-failed` 和 `backend-core-unreachable` 做一次有界重试,失败时先看 `.data.diagnostics.reason`、`.data.diagnostics.attempts` 和 `.data.diagnostics.operatorGuidance`:`backend-core-unreachable` 属于本机到 core 的观察路径,`code-queue-microservice-unregistered`/`proxy-unauthorized`/`proxy-404` 属于 stable proxy 配置或权限,`steer-endpoint-404`/`upstream-runtime-rejected` 属于 D601 runtime 或任务状态,`stable-proxy-failed` 多为 provider/k3s/tunnel 链路问题。默认失败输出必须保持低噪声:不回显 steer prompt、不带 request body、不带 upstream body preview,也不带 raw response;需要上游预览或原始失败对象时显式重跑 `--full` 或 `--raw`。`502 provider HTTP tunnel failed`、`The operation was aborted`、约 30 秒 provider tunnel wait abort 仍失败时,`.data.steer.deliveryUnconfirmed=true`;指挥官应先用 `codex tasks --view supervisor --limit 20`、`codex task <taskId>` 和 `microservice health code-queue` 交叉确认任务活性,再从主 server CLI 或显式 SSH transport 重试同一个 steer。raw proxy 等价命令走同一条 tunnel,`rawProxyEquivalentIsFallback=false`,只能做诊断对照,不应作为正式 fallback。若任务已终态,CLI 返回紧凑 `task-already-terminal` 响应并给出 `bun scripts/cli.ts codex task <taskId>`、`bun scripts/cli.ts codex read <taskId>` 和 `bun scripts/cli.ts codex submit --prompt-file <path> --reference-task-id <taskId>`;指挥官应提交 follow-up task,而不是继续 steer 终态任务。若正式 CLI 自身不可用,临时通过受控 microservice proxy 调用只能作为现场恢复手段;这类绕行必须记录到指挥简报 issue #24 主体的常驻观察,并创建正式 issue 补齐 CLI 能力,避免长期依赖隐式 API。
|
||||
- 对运行中任务的引导应优先使用正式 CLI:`bun scripts/cli.ts codex steer <taskId> --prompt-file <path>`。该命令和 `codex task/tasks/read` 复用同一个 backend-core stable proxy helper;`--dry-run` 会显示 `method/path/stableProxyPath`、`steerId`、retry policy、prompt 摘要、`commands.traceConfirm` 和 raw proxy 等价命令但不发送。非 dry-run 默认对 `stable-proxy-failed` 和 `backend-core-unreachable` 做一次有界重试,且每次 retry 都复用同一个 `steerId`;后端按同 `steerId`+同 prompt 抑制重复 trace 注入,返回 `duplicateSuppressed=true`,同 `steerId` 搭配不同 prompt 返回 conflict。
|
||||
- 真实 steer 输出必须保持低噪声:成功显示 `steer.status`、`steer.deliveryState`、`steer.steerId`、有界 `traceConfirmation` 和后续命令,不回显 prompt 或完整 task state;失败默认不带 request body、不带 upstream body preview,也不带 raw response,需要上游预览或原始失败对象时显式重跑 `--full` 或 `--raw`。`deliveryState=accepted` 表示 backend 已接受;`not_accepted` 表示任务状态/权限/输入未接受;`accepted_response_timeout` 表示 stable proxy 响应超时但 trace confirmation 找到该 `steerId`;`unknown` 表示响应路径失败且确认查询仍未证明接受。
|
||||
- `502 provider HTTP tunnel failed`、`The operation was aborted`、约 30 秒 provider tunnel wait abort 仍失败时,先看 `.data.diagnostics.reason`、`.data.diagnostics.attempts`、`.data.traceConfirmation` 和 `.data.commands.traceConfirm`。若 `.data.steer.deliveryUnconfirmed=true`,不要立刻复制同一纠偏 prompt 再发;先运行 `bun scripts/cli.ts codex steer-confirm <taskId> --steer-id <id>`。如果确认 accepted 或 accepted_response_timeout,停止重发并继续观察 `codex task <taskId>` / `codex task <taskId> --trace --tail --limit 80`;如果仍 unknown,再用输出中的 retry 命令复用同一个 `--steer-id`。raw proxy 等价命令走同一条 tunnel,`rawProxyEquivalentIsFallback=false`,只能做诊断对照,不应作为正式 fallback。
|
||||
- 失败分流仍以 `.data.diagnostics.reason` 为准:`backend-core-unreachable` 属于本机到 core 的观察路径,`code-queue-microservice-unregistered`/`proxy-unauthorized`/`proxy-404` 属于 stable proxy 配置或权限,`steer-endpoint-404`/`upstream-runtime-rejected` 属于 D601 runtime 或任务状态,`stable-proxy-failed` 多为 provider/k3s/tunnel 链路问题。若任务已终态,CLI 返回紧凑 `task-already-terminal` 响应并给出 `bun scripts/cli.ts codex task <taskId>`、`bun scripts/cli.ts codex read <taskId>` 和 `bun scripts/cli.ts codex submit --prompt-file <path> --reference-task-id <taskId>`;指挥官应提交 follow-up task,而不是继续 steer 终态任务。若正式 CLI 自身不可用,临时通过受控 microservice proxy 调用只能作为现场恢复手段;这类绕行必须记录到指挥简报 issue #24 主体的常驻观察,并创建正式 issue 补齐 CLI 能力,避免长期依赖隐式 API。
|
||||
- 如果任务进入终态但缺少必要验收证据,应使用聚焦 continuation prompt retry 同一任务。
|
||||
- 如果任务被可复用基础设施缺陷阻塞,应把该缺陷分配给合适的空闲或低风险队列,让原业务任务等待,或在修复后 retry。
|
||||
- 如果基础设施缺陷影响 Code Queue 控制面可用性,指挥官可以执行恢复队列所需的最小受控部署,然后验证原任务能继续。
|
||||
|
||||
+1
-1
@@ -67,7 +67,7 @@ function displayCommandName(parts: string[]): string {
|
||||
}
|
||||
if (parts[0] === "codex" && parts[1] === "steer" && parts[2] !== undefined) {
|
||||
const shown = ["codex", "steer", parts[2]];
|
||||
const shownValueOptions = new Set(["--prompt-file", "--file", "--retry-attempts", "--retry-delay-ms"]);
|
||||
const shownValueOptions = new Set(["--prompt-file", "--file", "--retry-attempts", "--retry-delay-ms", "--steer-id", "--steerId"]);
|
||||
const hasPromptFile = parts.includes("--prompt-file") || parts.includes("--file");
|
||||
const hasPromptStdin = parts.includes("--prompt-stdin") || parts.includes("--stdin");
|
||||
const hasHelp = parts.slice(3).some(isHelpToken);
|
||||
|
||||
@@ -2,7 +2,7 @@ import { spawnSync } from "node:child_process";
|
||||
import { writeFileSync, unlinkSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
import { tmpdir } from "node:os";
|
||||
import { codexSteerTaskForTest } from "./src/code-queue";
|
||||
import { codexSteerTaskForTest, codexSteerTraceConfirmForTest } from "./src/code-queue";
|
||||
|
||||
type JsonRecord = Record<string, unknown>;
|
||||
|
||||
@@ -45,6 +45,10 @@ function stringArray(value: unknown): string[] {
|
||||
return Array.isArray(value) ? value.map((item) => String(item)) : [];
|
||||
}
|
||||
|
||||
function deterministicSteerId(taskId: string, prompt: string): string {
|
||||
return `steer_${Bun.SHA256.hash(`unidesk-code-queue-steer:v1\0${taskId}\0${prompt}`, "hex").slice(0, 24)}`;
|
||||
}
|
||||
|
||||
function assertDryRunPrompt(response: JsonRecord, expectedText: string): void {
|
||||
assertCondition(response.ok === true, "CLI dry-run should succeed", response);
|
||||
const data = nestedRecord(response.data, []);
|
||||
@@ -57,10 +61,13 @@ function assertDryRunPrompt(response: JsonRecord, expectedText: string): void {
|
||||
assertCondition(prompt.text === expectedText, "dry-run prompt text mismatch", prompt);
|
||||
assertCondition(prompt.chars === expectedText.length, "dry-run prompt char count mismatch", prompt);
|
||||
assertCondition(prompt.truncated === false, "dry-run prompt must not truncate", prompt);
|
||||
assertCondition(String(request.steerId || "").startsWith("steer_"), "dry-run should expose deterministic steerId", request);
|
||||
const bodySummary = nestedRecord(response.data, ["request", "bodySummary"]);
|
||||
assertCondition(bodySummary.promptChars === expectedText.length, "dry-run should expose body prompt char count", bodySummary);
|
||||
assertCondition(bodySummary.steerId === request.steerId, "dry-run body summary should repeat steerId", { bodySummary, request });
|
||||
const commands = nestedRecord(response.data, ["commands"]);
|
||||
assertCondition(String(commands.rawProxy || "").includes("microservice proxy code-queue /api/tasks/codex_test_task/steer --method POST"), "dry-run should expose raw proxy equivalent", commands);
|
||||
assertCondition(String(commands.traceConfirm || "").includes(`--steer-id ${String(request.steerId)}`), "dry-run should expose trace confirmation command", commands);
|
||||
}
|
||||
|
||||
function assertReason(result: unknown, reason: string, status: number | null): void {
|
||||
@@ -124,15 +131,30 @@ export function runCodeQueueCliSteerContract(): JsonRecord {
|
||||
let fetchPath = "";
|
||||
let fetchMethod = "";
|
||||
let fetchPrompt = "";
|
||||
let fetchSteerId = "";
|
||||
const success = codexSteerTaskForTest("direct_task", ["send this"], (path, init) => {
|
||||
fetchPath = path;
|
||||
fetchMethod = String(init?.method || "");
|
||||
fetchPrompt = String((init?.body as JsonRecord | undefined)?.prompt || "");
|
||||
fetchSteerId = String((init?.body as JsonRecord | undefined)?.steerId || "");
|
||||
return {
|
||||
ok: true,
|
||||
status: 200,
|
||||
body: {
|
||||
ok: true,
|
||||
accepted: true,
|
||||
deliveryState: "accepted",
|
||||
steerId: fetchSteerId,
|
||||
traceConfirmation: {
|
||||
taskId: "direct_task",
|
||||
steerId: fetchSteerId,
|
||||
found: true,
|
||||
accepted: true,
|
||||
deliveryState: "accepted",
|
||||
matchCount: 1,
|
||||
trace: { seq: 5, at: "2026-05-23T00:00:00.000Z", method: "turn/steer", steerId: fetchSteerId, promptChars: 9, promptHash: "hash", promptOmitted: true, source: "promptHistory" },
|
||||
duplicateSuppressionKey: fetchSteerId,
|
||||
},
|
||||
task: { id: "direct_task", status: "running", prompt: "p" },
|
||||
queue: { activeTaskIds: ["direct_task"] },
|
||||
},
|
||||
@@ -141,7 +163,12 @@ export function runCodeQueueCliSteerContract(): JsonRecord {
|
||||
assertCondition(fetchPath === "/api/microservices/code-queue/proxy/api/tasks/direct_task/steer", "non-dry-run should use stable proxy path", { fetchPath });
|
||||
assertCondition(fetchMethod === "POST", "non-dry-run should POST", { fetchMethod });
|
||||
assertCondition(fetchPrompt === "send this", "non-dry-run should send raw prompt in body", { fetchPrompt });
|
||||
assertCondition(fetchSteerId === deterministicSteerId("direct_task", "send this"), "non-dry-run should send deterministic steerId", { fetchSteerId });
|
||||
assertCondition(nestedRecord(success, ["steer"]).accepted === true, "successful steer should report accepted=true", success);
|
||||
assertCondition(nestedRecord(success, ["steer"]).steerId === fetchSteerId, "successful steer should report steerId", success);
|
||||
assertCondition(nestedRecord(success, ["steer"]).deliveryState === "accepted", "successful steer should report delivery state", success);
|
||||
assertCondition(nestedRecord(success, ["traceConfirmation"]).accepted === true, "successful steer should expose bounded trace confirmation", success);
|
||||
assertCondition(String(nestedRecord(success, ["commands"]).traceConfirm || "").includes(`--steer-id ${fetchSteerId}`), "successful steer should expose trace confirmation command", success);
|
||||
const successJson = JSON.stringify(success);
|
||||
assertCondition(nestedRecord(success, ["steer"]).promptOmitted === true, "successful steer should mark prompt omitted", success);
|
||||
assertCondition(!successJson.includes("send this"), "successful steer must not echo prompt text", success);
|
||||
@@ -166,38 +193,82 @@ export function runCodeQueueCliSteerContract(): JsonRecord {
|
||||
attempts: [{ attempt: 1, ok: false, durationMs: 30003, timeoutMs: 30000, result: { ok: false, error: "The operation was aborted" } }],
|
||||
};
|
||||
let retryCalls = 0;
|
||||
const retryThenSuccess = codexSteerTaskForTest("direct_task", ["transient correction", "--retry-delay-ms", "0"], () => {
|
||||
const retrySteerIds: string[] = [];
|
||||
const retryThenSuccess = codexSteerTaskForTest("direct_task", ["transient correction", "--retry-delay-ms", "0"], (_path, init) => {
|
||||
retryCalls += 1;
|
||||
retrySteerIds.push(String((init?.body as JsonRecord | undefined)?.steerId || ""));
|
||||
if (retryCalls === 1) return { ok: false, status: 502, body: abortedTunnelBody };
|
||||
return {
|
||||
ok: true,
|
||||
status: 200,
|
||||
body: {
|
||||
ok: true,
|
||||
accepted: true,
|
||||
deliveryState: "accepted",
|
||||
steerId: retrySteerIds[0],
|
||||
traceConfirmation: {
|
||||
taskId: "direct_task",
|
||||
steerId: retrySteerIds[0],
|
||||
found: true,
|
||||
accepted: true,
|
||||
deliveryState: "accepted",
|
||||
matchCount: 1,
|
||||
trace: { seq: 6, at: "2026-05-23T00:00:02.000Z", method: "turn/steer", steerId: retrySteerIds[0], promptChars: 20, promptHash: "hash2", promptOmitted: true, source: "output" },
|
||||
duplicateSuppressionKey: retrySteerIds[0],
|
||||
},
|
||||
task: { id: "direct_task", status: "running", prompt: "hidden" },
|
||||
queue: { activeTaskIds: ["direct_task"] },
|
||||
},
|
||||
};
|
||||
}) as JsonRecord;
|
||||
assertCondition(retryCalls === 2, "retryable 502 tunnel abort should be retried once by default", { retryCalls, retryThenSuccess });
|
||||
assertCondition(retrySteerIds.length === 2 && retrySteerIds[0] === retrySteerIds[1] && retrySteerIds[0] === deterministicSteerId("direct_task", "transient correction"), "retry should reuse a single steerId", { retrySteerIds });
|
||||
assertCondition(nestedRecord(retryThenSuccess, ["steer"]).accepted === true, "retry success should accept steer", retryThenSuccess);
|
||||
assertCondition(nestedRecord(retryThenSuccess, ["steer"]).steerId === retrySteerIds[0], "retry success should report reused steerId", retryThenSuccess);
|
||||
const retrySuccessAttempts = nestedRecord(retryThenSuccess, ["steer"]).attempts;
|
||||
assertCondition(Array.isArray(retrySuccessAttempts) && retrySuccessAttempts.length === 2, "retry success should expose both attempts", retryThenSuccess);
|
||||
assertCondition(String(JSON.stringify(retryThenSuccess)).includes("The operation was aborted"), "retry attempts should preserve aborted tunnel evidence", retryThenSuccess);
|
||||
assertCondition(!String(JSON.stringify(retryThenSuccess)).includes("transient correction"), "retry success must not echo steer prompt", retryThenSuccess);
|
||||
|
||||
let exhaustedCalls = 0;
|
||||
const exhausted = codexSteerTaskForTest("direct_task", ["final correction", "--retry-attempts", "2", "--retry-delay-ms", "0"], () => {
|
||||
const exhaustedSteerIds: string[] = [];
|
||||
const exhausted = codexSteerTaskForTest("direct_task", ["final correction", "--retry-attempts", "2", "--retry-delay-ms", "0"], (path, init) => {
|
||||
exhaustedCalls += 1;
|
||||
exhaustedSteerIds.push(String((init?.body as JsonRecord | undefined)?.steerId || ""));
|
||||
if (path.includes("/steer-confirmation")) {
|
||||
return {
|
||||
ok: true,
|
||||
status: 200,
|
||||
body: {
|
||||
ok: true,
|
||||
confirmation: {
|
||||
taskId: "direct_task",
|
||||
steerId: exhaustedSteerIds[0],
|
||||
found: false,
|
||||
accepted: false,
|
||||
deliveryState: "unknown",
|
||||
matchCount: 0,
|
||||
trace: null,
|
||||
duplicateSuppressionKey: exhaustedSteerIds[0],
|
||||
promptOmitted: true,
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
return { ok: false, status: 502, body: abortedTunnelBody };
|
||||
}) as JsonRecord;
|
||||
assertCondition(exhaustedCalls === 2, "retryable 502 tunnel abort should honor retry-attempts", { exhaustedCalls, exhausted });
|
||||
assertCondition(exhaustedCalls === 3, "retryable 502 tunnel abort should honor retry-attempts and run confirmation lookup", { exhaustedCalls, exhausted });
|
||||
assertCondition(exhaustedSteerIds[0] === exhaustedSteerIds[1], "exhausted retry should reuse steerId", { exhaustedSteerIds });
|
||||
assertReason(exhausted, "stable-proxy-failed", 502);
|
||||
assertCondition(nestedRecord(exhausted, ["steer"]).status === "unknown", "unconfirmed transport failure should report unknown status", exhausted);
|
||||
assertCondition(nestedRecord(exhausted, ["steer"]).steerId === exhaustedSteerIds[0], "exhausted failure should expose steerId", exhausted);
|
||||
assertCondition(nestedRecord(exhausted, ["traceConfirmation"]).deliveryState === "unknown", "exhausted failure should include trace confirmation lookup", exhausted);
|
||||
const exhaustedDiagnostics = nestedRecord(exhausted, ["diagnostics"]);
|
||||
const exhaustedAttempts = exhaustedDiagnostics.attempts;
|
||||
assertCondition(Array.isArray(exhaustedAttempts) && exhaustedAttempts.length === 2, "exhausted retry diagnostics should expose attempts", exhaustedDiagnostics);
|
||||
assertCondition(String(exhaustedDiagnostics.message || "").includes("The operation was aborted"), "diagnostics should include provider abort message", exhaustedDiagnostics);
|
||||
assertCondition(nestedRecord(exhaustedDiagnostics, ["operatorGuidance"]).rawProxyEquivalentIsFallback === false, "raw proxy equivalent should be diagnostic, not fallback", exhaustedDiagnostics);
|
||||
assertCondition(String(nestedRecord(exhausted, ["commands"]).traceConfirm || "").includes(`--steer-id ${exhaustedSteerIds[0]}`), "failure should expose bounded trace confirmation command", exhausted);
|
||||
assertCondition(String(nestedRecord(exhausted, ["commands"]).rawProxy || "").includes("microservice proxy code-queue /api/tasks/direct_task/steer"), "failure should still expose raw proxy diagnostic command", exhausted);
|
||||
assertCondition(nestedRecord(exhausted, ["steer"]).promptOmitted === true, "failed steer should omit prompt by default", exhausted);
|
||||
assertCondition(!("request" in exhausted), "failed steer should omit request by default", exhausted);
|
||||
@@ -218,6 +289,96 @@ export function runCodeQueueCliSteerContract(): JsonRecord {
|
||||
}) as JsonRecord;
|
||||
assertCondition("rawFailure" in exhaustedRaw, "--raw failed steer should expose raw response", exhaustedRaw);
|
||||
|
||||
const timeoutAcceptedSteerId = deterministicSteerId("direct_task", "accepted but timed out");
|
||||
let timeoutAcceptedCalls = 0;
|
||||
const timeoutAccepted = codexSteerTaskForTest("direct_task", ["accepted but timed out", "--retry-attempts", "1", "--retry-delay-ms", "0"], (path) => {
|
||||
timeoutAcceptedCalls += 1;
|
||||
if (path.includes("/steer-confirmation")) {
|
||||
return {
|
||||
ok: true,
|
||||
status: 200,
|
||||
body: {
|
||||
ok: true,
|
||||
confirmation: {
|
||||
taskId: "direct_task",
|
||||
steerId: timeoutAcceptedSteerId,
|
||||
found: true,
|
||||
accepted: true,
|
||||
deliveryState: "accepted",
|
||||
matchCount: 1,
|
||||
trace: { seq: 77, at: "2026-05-23T00:00:03.000Z", method: "turn/steer", steerId: timeoutAcceptedSteerId, promptChars: 22, promptHash: "hash3", promptOmitted: true, source: "promptHistory" },
|
||||
duplicateSuppressionKey: timeoutAcceptedSteerId,
|
||||
promptOmitted: true,
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
return { ok: false, status: 502, body: abortedTunnelBody };
|
||||
}) as JsonRecord;
|
||||
assertCondition(timeoutAcceptedCalls === 2, "timeout accepted contract should perform one send and one confirmation lookup", { timeoutAcceptedCalls });
|
||||
assertCondition(timeoutAccepted.ok === true, "trace-confirmed timeout should be treated as accepted", timeoutAccepted);
|
||||
assertCondition(nestedRecord(timeoutAccepted, ["steer"]).status === "accepted_response_timeout", "trace-confirmed timeout should expose accepted_response_timeout", timeoutAccepted);
|
||||
assertCondition(nestedRecord(timeoutAccepted, ["steer"]).deliveryUnconfirmed === false, "trace-confirmed timeout should not remain deliveryUnconfirmed", timeoutAccepted);
|
||||
assertCondition(nestedRecord(timeoutAccepted, ["traceConfirmation"]).accepted === true, "trace-confirmed timeout should include confirmation", timeoutAccepted);
|
||||
|
||||
const explicitSteerId = "steer_manual_12345";
|
||||
const duplicateSuppressed = codexSteerTaskForTest("direct_task", ["same prompt", "--steer-id", explicitSteerId], (_path, init) => {
|
||||
const body = init?.body as JsonRecord | undefined;
|
||||
assertCondition(body?.steerId === explicitSteerId, "explicit steerId should be sent unchanged", body ?? {});
|
||||
return {
|
||||
ok: true,
|
||||
status: 200,
|
||||
body: {
|
||||
ok: true,
|
||||
accepted: true,
|
||||
duplicateSuppressed: true,
|
||||
deliveryState: "accepted",
|
||||
steerId: explicitSteerId,
|
||||
traceConfirmation: {
|
||||
taskId: "direct_task",
|
||||
steerId: explicitSteerId,
|
||||
found: true,
|
||||
accepted: true,
|
||||
deliveryState: "accepted",
|
||||
matchCount: 1,
|
||||
trace: { seq: 88, at: "2026-05-23T00:00:04.000Z", method: "turn/steer", steerId: explicitSteerId, promptChars: 11, promptHash: "hash4", promptOmitted: true, source: "promptHistory" },
|
||||
duplicateSuppressionKey: explicitSteerId,
|
||||
},
|
||||
task: { id: "direct_task", status: "running", prompt: "hidden" },
|
||||
queue: { activeTaskIds: ["direct_task"] },
|
||||
},
|
||||
};
|
||||
}) as JsonRecord;
|
||||
assertCondition(nestedRecord(duplicateSuppressed, ["steer"]).duplicateSuppressed === true, "duplicate suppression should be visible", duplicateSuppressed);
|
||||
assertCondition(nestedRecord(duplicateSuppressed, ["steer"]).steerId === explicitSteerId, "duplicate suppression should preserve steerId", duplicateSuppressed);
|
||||
|
||||
const confirmLookup = codexSteerTraceConfirmForTest("direct_task", ["--steer-id", explicitSteerId], (path) => {
|
||||
assertCondition(path.includes("/api/microservices/code-queue/proxy/api/tasks/direct_task/steer-confirmation"), "confirm lookup should use proxy confirmation endpoint", { path });
|
||||
assertCondition(path.includes(`steerId=${encodeURIComponent(explicitSteerId)}`), "confirm lookup should include steerId query", { path });
|
||||
return {
|
||||
ok: true,
|
||||
status: 200,
|
||||
body: {
|
||||
ok: true,
|
||||
confirmation: {
|
||||
taskId: "direct_task",
|
||||
steerId: explicitSteerId,
|
||||
found: true,
|
||||
accepted: true,
|
||||
deliveryState: "accepted",
|
||||
matchCount: 1,
|
||||
trace: { seq: 88, at: "2026-05-23T00:00:04.000Z", method: "turn/steer", steerId: explicitSteerId, promptChars: 11, promptHash: "hash4", promptOmitted: true, source: "promptHistory" },
|
||||
duplicateSuppressionKey: explicitSteerId,
|
||||
promptOmitted: true,
|
||||
},
|
||||
},
|
||||
};
|
||||
}) as JsonRecord;
|
||||
assertCondition(confirmLookup.ok === true, "trace confirmation lookup should succeed when accepted", confirmLookup);
|
||||
assertCondition(nestedRecord(confirmLookup, ["delivery"]).status === "accepted", "trace confirmation output should expose accepted status", confirmLookup);
|
||||
assertCondition(nestedRecord(confirmLookup, ["traceConfirmation", "trace"]).seq === 88, "trace confirmation output should expose bounded trace seq", confirmLookup);
|
||||
assertCondition(!JSON.stringify(confirmLookup).includes("same prompt"), "trace confirmation lookup must not echo prompt", confirmLookup);
|
||||
|
||||
const terminalPrompt = `${"do not leak ".repeat(40)}tail-secret-marker`;
|
||||
const terminalRejection = codexSteerTaskForTest("completed_task", [terminalPrompt], () => ({
|
||||
ok: false,
|
||||
@@ -241,7 +402,10 @@ export function runCodeQueueCliSteerContract(): JsonRecord {
|
||||
const terminalSteer = nestedRecord(terminalRejection, ["steer"]);
|
||||
assertCondition(terminalRejection.ok === false, "terminal steer rejection should fail", terminalRejection);
|
||||
assertCondition(terminalSteer.reason === "task-already-terminal", "terminal steer rejection should use compact terminal reason", terminalSteer);
|
||||
assertCondition(terminalSteer.status === "succeeded", "terminal steer rejection should expose task status", terminalSteer);
|
||||
assertCondition(terminalSteer.deliveryState === "not_accepted", "terminal steer rejection should expose not_accepted delivery state", terminalSteer);
|
||||
assertCondition(String(terminalSteer.steerId || "").startsWith("steer_"), "terminal steer rejection should expose steerId", terminalSteer);
|
||||
assertCondition(terminalSteer.status === "not_accepted", "terminal steer rejection should expose not_accepted status", terminalSteer);
|
||||
assertCondition(terminalSteer.taskStatus === "succeeded", "terminal steer rejection should expose task status", terminalSteer);
|
||||
assertCondition(terminalSteer.terminalStatus === "completed", "terminal steer rejection should expose terminal status", terminalSteer);
|
||||
assertCondition(terminalSteer.lastUpdate === "2026-05-22T00:00:00.000Z", "terminal steer rejection should expose last update", terminalSteer);
|
||||
assertCondition(terminalSteer.updatedAt === "2026-05-22T00:00:00.000Z", "terminal steer rejection should expose last update time", terminalSteer);
|
||||
@@ -308,6 +472,8 @@ export function runCodeQueueCliSteerContract(): JsonRecord {
|
||||
"successful steer confirms write without echoing prompt",
|
||||
"steer failure classification is JSON-consumable",
|
||||
"retryable tunnel aborts are retried with bounded diagnostics",
|
||||
"retry reuses steerId and trace confirmation distinguishes accepted_response_timeout from unknown",
|
||||
"duplicate suppression and trace confirmation lookup expose bounded output shape",
|
||||
"terminal steer rejection is compact and actionable",
|
||||
"terminal steer rejection full/raw disclosure is explicit",
|
||||
],
|
||||
|
||||
@@ -0,0 +1,86 @@
|
||||
import { findSteerTraceConfirmation, steerDuplicateDecision, steerTraceText } from "../src/components/microservices/code-queue/src/steer-confirmation";
|
||||
import type { QueueTask } from "../src/components/microservices/code-queue/src/types";
|
||||
|
||||
type JsonRecord = Record<string, unknown>;
|
||||
|
||||
function assertCondition(condition: unknown, message: string, detail: JsonRecord = {}): void {
|
||||
if (!condition) throw new Error(`${message}: ${JSON.stringify(detail)}`);
|
||||
}
|
||||
|
||||
function fixtureTask(): QueueTask {
|
||||
const at = "2026-05-23T00:00:00.000Z";
|
||||
return {
|
||||
id: "codex_steer_confirm_fixture",
|
||||
queueId: "default",
|
||||
queueEnteredAt: at,
|
||||
prompt: "base",
|
||||
basePrompt: "base",
|
||||
referenceTaskIds: [],
|
||||
referenceInjection: null,
|
||||
providerId: "D601",
|
||||
cwd: "/workspace",
|
||||
model: "gpt-5.5",
|
||||
reasoningEffort: null,
|
||||
executionMode: "default",
|
||||
maxAttempts: 99,
|
||||
status: "running",
|
||||
createdAt: at,
|
||||
updatedAt: at,
|
||||
startedAt: at,
|
||||
finishedAt: null,
|
||||
readAt: null,
|
||||
currentAttempt: 1,
|
||||
currentMode: "initial",
|
||||
codexThreadId: "thread_fixture",
|
||||
activeTurnId: "turn_fixture",
|
||||
finalResponse: "",
|
||||
lastError: null,
|
||||
lastJudge: null,
|
||||
judgeFailCount: 0,
|
||||
promptHistory: [],
|
||||
output: [],
|
||||
events: [],
|
||||
attempts: [],
|
||||
cancelRequested: false,
|
||||
nextPrompt: null,
|
||||
nextMode: null,
|
||||
};
|
||||
}
|
||||
|
||||
export function runCodeQueueSteerConfirmationContract(): JsonRecord {
|
||||
const task = fixtureTask();
|
||||
const steerId = "steer_contract_12345";
|
||||
const prompt = "correct this running task";
|
||||
task.output.push({ seq: 7, at: "2026-05-23T00:00:07.000Z", channel: "user", method: "turn/steer", itemId: steerId, text: steerTraceText(steerId, prompt) });
|
||||
task.promptHistory.push({ seq: 7, at: "2026-05-23T00:00:07.000Z", method: "turn/steer", text: prompt, steerId });
|
||||
|
||||
const confirmation = findSteerTraceConfirmation(task, steerId);
|
||||
assertCondition(confirmation.found === true && confirmation.accepted === true, "confirmation should find steer trace by steerId", confirmation as unknown as JsonRecord);
|
||||
assertCondition(confirmation.matches.length === 1, "confirmation should coalesce promptHistory/output duplicates by seq", confirmation as unknown as JsonRecord);
|
||||
assertCondition(confirmation.trace?.promptChars === prompt.length, "confirmation should expose prompt chars without prompt text", (confirmation.trace ?? {}) as unknown as JsonRecord);
|
||||
assertCondition(JSON.stringify(confirmation).includes(prompt) === false, "confirmation must not echo prompt text", confirmation as unknown as JsonRecord);
|
||||
|
||||
const duplicate = steerDuplicateDecision(task, steerId, prompt);
|
||||
assertCondition(duplicate.duplicate === true && duplicate.conflict === false, "same steerId and prompt should be duplicate-suppressed", duplicate as unknown as JsonRecord);
|
||||
|
||||
const conflict = steerDuplicateDecision(task, steerId, "different prompt");
|
||||
assertCondition(conflict.duplicate === false && conflict.conflict === true, "same steerId with different prompt should be rejected as conflict", conflict as unknown as JsonRecord);
|
||||
|
||||
const missing = findSteerTraceConfirmation(task, "steer_missing_12345");
|
||||
assertCondition(missing.found === false && missing.deliveryState === "unknown", "missing steerId should remain unknown", missing as unknown as JsonRecord);
|
||||
|
||||
return {
|
||||
ok: true,
|
||||
checks: [
|
||||
"trace confirmation finds steer by steerId",
|
||||
"promptHistory/output duplicate seq is coalesced",
|
||||
"duplicate suppression requires same prompt hash",
|
||||
"steerId conflict is detectable",
|
||||
"missing steerId returns unknown",
|
||||
],
|
||||
};
|
||||
}
|
||||
|
||||
if (import.meta.main) {
|
||||
process.stdout.write(`${JSON.stringify(runCodeQueueSteerConfirmationContract(), null, 2)}\n`);
|
||||
}
|
||||
@@ -36,6 +36,7 @@ const syntaxFiles = [
|
||||
"scripts/code-queue-cli-disclosure-contract-test.ts",
|
||||
"scripts/code-queue-prompt-lint-contract-test.ts",
|
||||
"scripts/code-queue-cli-steer-test.ts",
|
||||
"scripts/code-queue-steer-confirmation-contract-test.ts",
|
||||
"scripts/code-queue-cli-submit-prompt-contract-test.ts",
|
||||
"scripts/code-queue-submit-execution-mode-contract-test.ts",
|
||||
"scripts/code-queue-submit-summary-contract-test.ts",
|
||||
@@ -321,6 +322,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default
|
||||
fileItem("scripts/code-queue-cli-disclosure-contract-test.ts"),
|
||||
fileItem("scripts/code-queue-prompt-lint-contract-test.ts"),
|
||||
fileItem("scripts/code-queue-cli-steer-test.ts"),
|
||||
fileItem("scripts/code-queue-steer-confirmation-contract-test.ts"),
|
||||
fileItem("scripts/code-queue-cli-read-terminal-contract-test.ts"),
|
||||
fileItem("scripts/code-queue-cli-submit-prompt-contract-test.ts"),
|
||||
fileItem("scripts/code-queue-submit-summary-contract-test.ts"),
|
||||
@@ -367,6 +369,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default
|
||||
items.push(commandItem("code-queue:cli-disclosure-contract", ["bun", "scripts/code-queue-cli-disclosure-contract-test.ts"], 30_000));
|
||||
items.push(commandItem("code-queue:prompt-lint-contract", ["bun", "scripts/code-queue-prompt-lint-contract-test.ts"], 30_000));
|
||||
items.push(commandItem("code-queue:cli-steer-contract", ["bun", "scripts/code-queue-cli-steer-test.ts"], 30_000));
|
||||
items.push(commandItem("code-queue:steer-confirmation-contract", ["bun", "scripts/code-queue-steer-confirmation-contract-test.ts"], 30_000));
|
||||
items.push(commandItem("code-queue:read-terminal-contract", ["bun", "scripts/code-queue-cli-read-terminal-contract-test.ts"], 30_000));
|
||||
items.push(commandItem("code-queue:submit-prompt-contract", ["bun", "scripts/code-queue-cli-submit-prompt-contract-test.ts"], 30_000));
|
||||
items.push(commandItem("code-queue:submit-execution-mode-contract", ["bun", "scripts/code-queue-submit-execution-mode-contract-test.ts"], 30_000));
|
||||
@@ -403,6 +406,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default
|
||||
items.push(skippedItem("code-queue:cli-disclosure-contract", "Code Queue CLI disclosure/noise contract is opt-in with script checks", "--scripts-typecheck or --full"));
|
||||
items.push(skippedItem("code-queue:prompt-lint-contract", "Code Queue prompt live-authorization lint contract is opt-in with script checks", "--scripts-typecheck or --full"));
|
||||
items.push(skippedItem("code-queue:cli-steer-contract", "Code Queue steer CLI contract is opt-in with script checks", "--scripts-typecheck or --full"));
|
||||
items.push(skippedItem("code-queue:steer-confirmation-contract", "Code Queue steer delivery confirmation contract is opt-in with script checks", "--scripts-typecheck or --full"));
|
||||
items.push(skippedItem("code-queue:read-terminal-contract", "Code Queue terminal read contract is opt-in with script checks", "--scripts-typecheck or --full"));
|
||||
items.push(skippedItem("code-queue:submit-prompt-contract", "Code Queue submit prompt contract is opt-in with script checks", "--scripts-typecheck or --full"));
|
||||
items.push(skippedItem("code-queue:submit-execution-mode-contract", "Code Queue submit execution-mode contract is opt-in with script checks", "--scripts-typecheck or --full"));
|
||||
|
||||
+198
-25
@@ -3,6 +3,7 @@ import { runCommand } from "./command";
|
||||
import { type UniDeskConfig, repoRoot, rootPath } from "./config";
|
||||
import { coreInternalFetch } from "./microservices";
|
||||
import { previewJson } from "./preview";
|
||||
import { createSteerId, type SteerDeliveryState } from "../../src/components/microservices/code-queue/src/steer-confirmation";
|
||||
import {
|
||||
codeAgentPortForModel,
|
||||
codeExecutionModes,
|
||||
@@ -220,6 +221,7 @@ interface SubmitRoutingRecommendation {
|
||||
|
||||
interface CodexSteerOptions {
|
||||
prompt: string;
|
||||
steerId: string | undefined;
|
||||
dryRun: boolean;
|
||||
retryAttempts: number;
|
||||
retryDelayMs: number;
|
||||
@@ -227,6 +229,11 @@ interface CodexSteerOptions {
|
||||
raw: boolean;
|
||||
}
|
||||
|
||||
interface CodexSteerConfirmOptions {
|
||||
steerId: string;
|
||||
raw: boolean;
|
||||
}
|
||||
|
||||
type CodexSteerFailureReason =
|
||||
| "backend-core-unreachable"
|
||||
| "code-queue-microservice-unregistered"
|
||||
@@ -237,6 +244,8 @@ type CodexSteerFailureReason =
|
||||
| "stable-proxy-failed"
|
||||
| "invalid-proxy-response";
|
||||
|
||||
type CodexSteerAcceptanceStatus = "accepted" | "not_accepted" | "accepted_response_timeout" | "unknown";
|
||||
|
||||
interface ClassifiedCodexSteerError {
|
||||
reason: CodexSteerFailureReason;
|
||||
scope: "backend-core" | "stable-proxy" | "code-queue-runtime" | "unknown";
|
||||
@@ -261,8 +270,12 @@ interface CodexSteerAttemptSummary {
|
||||
scope: ClassifiedCodexSteerError["scope"] | null;
|
||||
retryable: boolean;
|
||||
message: string | null;
|
||||
deliveryState?: CodexSteerDeliveryState | null;
|
||||
steerId?: string | null;
|
||||
}
|
||||
|
||||
type CodexSteerDeliveryState = SteerDeliveryState;
|
||||
|
||||
interface CompactTaskMutationResponseOptions {
|
||||
fullPrompt?: boolean;
|
||||
}
|
||||
@@ -443,6 +456,18 @@ function codeQueueProxyEquivalentCommand(targetPath: string, bodyJson: string):
|
||||
return `bun scripts/cli.ts microservice proxy code-queue ${targetPath} --method POST --body-json '${bodyJson}'`;
|
||||
}
|
||||
|
||||
function steerConfirmationPath(taskId: string, steerId: string): string {
|
||||
return `/api/tasks/${encodeURIComponent(taskId)}/steer-confirmation${queryString({ steerId })}`;
|
||||
}
|
||||
|
||||
function steerConfirmationCommand(taskId: string, steerId: string): string {
|
||||
return `bun scripts/cli.ts codex steer-confirm ${taskId} --steer-id ${steerId}`;
|
||||
}
|
||||
|
||||
function rawSteerConfirmationCommand(taskId: string, steerId: string): string {
|
||||
return `bun scripts/cli.ts microservice proxy code-queue ${steerConfirmationPath(taskId, steerId)} --raw`;
|
||||
}
|
||||
|
||||
function nonNegativeIntegerEnv(name: string, fallback: number): number {
|
||||
const raw = process.env[name];
|
||||
if (raw === undefined || raw.trim().length === 0) return fallback;
|
||||
@@ -745,6 +770,57 @@ function compactSteerFailureDiagnostics(diagnostics: ClassifiedCodexSteerError,
|
||||
};
|
||||
}
|
||||
|
||||
function compactSteerTraceConfirmation(value: unknown, taskId: string, steerId: string): Record<string, unknown> {
|
||||
const record = asRecord(value) ?? {};
|
||||
const confirmation = asRecord(record.confirmation) ?? record;
|
||||
const trace = asRecord(confirmation.trace);
|
||||
return {
|
||||
taskId: asString(confirmation.taskId) || taskId,
|
||||
steerId: asString(confirmation.steerId) || steerId,
|
||||
found: confirmation.found === true,
|
||||
accepted: confirmation.accepted === true,
|
||||
deliveryState: asString(confirmation.deliveryState) || (confirmation.found === true ? "accepted" : "unknown"),
|
||||
matchCount: asNumber(confirmation.matchCount, 0),
|
||||
trace: trace === null ? null : {
|
||||
seq: trace.seq ?? null,
|
||||
at: trace.at ?? null,
|
||||
method: trace.method ?? null,
|
||||
steerId: trace.steerId ?? steerId,
|
||||
promptChars: trace.promptChars ?? null,
|
||||
promptHash: trace.promptHash ?? null,
|
||||
promptOmitted: true,
|
||||
source: trace.source ?? null,
|
||||
},
|
||||
duplicateSuppressionKey: confirmation.duplicateSuppressionKey ?? steerId,
|
||||
promptOmitted: true,
|
||||
};
|
||||
}
|
||||
|
||||
function fetchSteerTraceConfirmation(taskId: string, steerId: string, fetcher: CodexResponseFetcher): Record<string, unknown> {
|
||||
const response = unwrapCodexResponse(fetcher(codeQueueProxyPath(steerConfirmationPath(taskId, steerId))));
|
||||
return compactSteerTraceConfirmation(response.body, taskId, steerId);
|
||||
}
|
||||
|
||||
function safeFetchSteerTraceConfirmation(taskId: string, steerId: string, fetcher: CodexResponseFetcher): Record<string, unknown> {
|
||||
try {
|
||||
return fetchSteerTraceConfirmation(taskId, steerId, fetcher);
|
||||
} catch (error) {
|
||||
return {
|
||||
taskId,
|
||||
steerId,
|
||||
found: false,
|
||||
accepted: false,
|
||||
deliveryState: "unknown",
|
||||
promptOmitted: true,
|
||||
lookupError: error instanceof Error ? error.message : String(error),
|
||||
commands: {
|
||||
retryLookup: steerConfirmationCommand(taskId, steerId),
|
||||
rawLookup: rawSteerConfirmationCommand(taskId, steerId),
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
function unwrapSteerResponse(response: unknown, targetPath: string, stableProxyPath: string, rawProxyEquivalent: string): { ok: true; upstream: { ok: unknown; status: unknown }; body: Record<string, unknown> } | { ok: false; diagnostics: ClassifiedCodexSteerError; rawResponse: unknown } {
|
||||
const record = asRecord(response);
|
||||
const body = responseBody(record);
|
||||
@@ -763,7 +839,7 @@ function terminalStatusFromTask(task: Record<string, unknown> | null): string {
|
||||
return "";
|
||||
}
|
||||
|
||||
function compactTerminalSteerRejection(taskId: string, response: unknown): Record<string, unknown> | null {
|
||||
function compactTerminalSteerRejection(taskId: string, steerId: string, response: unknown): Record<string, unknown> | null {
|
||||
const record = asRecord(response);
|
||||
const body = responseBody(record);
|
||||
const task = asRecord(body?.task);
|
||||
@@ -775,9 +851,12 @@ function compactTerminalSteerRejection(taskId: string, response: unknown): Recor
|
||||
ok: false,
|
||||
steer: {
|
||||
accepted: false,
|
||||
status: "not_accepted",
|
||||
deliveryState: "not_accepted",
|
||||
steerId,
|
||||
reason: "task-already-terminal",
|
||||
taskId,
|
||||
status,
|
||||
taskStatus: status,
|
||||
terminalStatus: terminalStatus || null,
|
||||
lastUpdate,
|
||||
updatedAt: task?.updatedAt ?? null,
|
||||
@@ -833,7 +912,7 @@ function attachSteerDisclosure(
|
||||
return result;
|
||||
}
|
||||
|
||||
function steerSuccessAttempt(attempt: number, durationMs: number, upstream: { ok: unknown; status: unknown }): CodexSteerAttemptSummary {
|
||||
function steerSuccessAttempt(attempt: number, durationMs: number, upstream: { ok: unknown; status: unknown }, steerId?: string, deliveryState: CodexSteerDeliveryState = "accepted"): CodexSteerAttemptSummary {
|
||||
const status = typeof upstream.status === "number" && Number.isFinite(upstream.status) ? upstream.status : null;
|
||||
return {
|
||||
attempt,
|
||||
@@ -845,6 +924,8 @@ function steerSuccessAttempt(attempt: number, durationMs: number, upstream: { ok
|
||||
scope: null,
|
||||
retryable: false,
|
||||
message: "steer accepted by Code Queue",
|
||||
deliveryState,
|
||||
steerId: steerId ?? null,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -859,6 +940,8 @@ function steerFailureAttempt(attempt: number, durationMs: number, diagnostics: C
|
||||
scope: diagnostics.scope,
|
||||
retryable: diagnostics.retryable,
|
||||
message: diagnostics.message,
|
||||
deliveryState: null,
|
||||
steerId: null,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -2512,6 +2595,10 @@ export function codexSteerTaskForTest(taskId: string, optionArgs: string[], fetc
|
||||
return codexSteerTask(taskId, optionArgs, fetcher);
|
||||
}
|
||||
|
||||
export function codexSteerTraceConfirmForTest(taskId: string, optionArgs: string[], fetcher: CodexResponseFetcher): unknown {
|
||||
return codexSteerTraceConfirm(taskId, optionArgs, fetcher);
|
||||
}
|
||||
|
||||
function isTerminalTaskStatus(status: unknown): boolean {
|
||||
return status === "succeeded" || status === "failed" || status === "canceled";
|
||||
}
|
||||
@@ -3980,6 +4067,8 @@ const steerPromptValueOptions = new Set([
|
||||
"--file",
|
||||
"--retry-attempts",
|
||||
"--retry-delay-ms",
|
||||
"--steer-id",
|
||||
"--steerId",
|
||||
]);
|
||||
|
||||
function referenceTaskIdsFromOptions(args: string[]): string[] {
|
||||
@@ -4089,13 +4178,16 @@ function parseSubmitOptions(args: string[]): CodexSubmitOptions {
|
||||
function parseSteerOptions(args: string[]): CodexSteerOptions {
|
||||
assertKnownOptions(args, {
|
||||
flags: ["--prompt-stdin", "--stdin", "--dry-run", "--no-retry", "--full", "--raw"],
|
||||
valueOptions: ["--prompt-file", "--file", "--retry-attempts", "--retry-delay-ms"],
|
||||
valueOptions: ["--prompt-file", "--file", "--retry-attempts", "--retry-delay-ms", "--steer-id", "--steerId"],
|
||||
}, "codex steer");
|
||||
const retryAttempts = hasFlag(args, "--no-retry")
|
||||
? 1
|
||||
: positiveIntegerOption(args, ["--retry-attempts"], defaultSteerRetryAttempts, maxSteerRetryAttempts);
|
||||
const steerId = optionValue(args, ["--steer-id", "--steerId"]);
|
||||
if (steerId !== undefined && !/^[A-Za-z0-9._:-]{8,128}$/u.test(steerId)) throw new Error("--steer-id must be 8-128 chars using letters, numbers, dot, underscore, colon, or dash");
|
||||
return {
|
||||
prompt: promptFromArgs(args, "codex steer", steerPromptValueOptions),
|
||||
steerId,
|
||||
dryRun: hasFlag(args, "--dry-run"),
|
||||
retryAttempts,
|
||||
retryDelayMs: nonNegativeIntegerOption(args, ["--retry-delay-ms"], defaultSteerRetryDelayMs, maxSteerRetryDelayMs),
|
||||
@@ -4104,6 +4196,17 @@ function parseSteerOptions(args: string[]): CodexSteerOptions {
|
||||
};
|
||||
}
|
||||
|
||||
function parseSteerConfirmOptions(args: string[]): CodexSteerConfirmOptions {
|
||||
assertKnownOptions(args, {
|
||||
flags: ["--raw"],
|
||||
valueOptions: ["--steer-id", "--steerId"],
|
||||
}, "codex steer-confirm");
|
||||
const steerId = optionValue(args, ["--steer-id", "--steerId"]);
|
||||
if (steerId === undefined) throw new Error("codex steer-confirm requires --steer-id <id>");
|
||||
if (!/^[A-Za-z0-9._:-]{8,128}$/u.test(steerId)) throw new Error("--steer-id must be 8-128 chars using letters, numbers, dot, underscore, colon, or dash");
|
||||
return { steerId, raw: hasFlag(args, "--raw") };
|
||||
}
|
||||
|
||||
function parsePromptLintOptions(args: string[]): CodexPromptLintOptions {
|
||||
assertKnownOptions(args, {
|
||||
flags: ["--prompt-stdin", "--stdin"],
|
||||
@@ -4193,6 +4296,18 @@ function compactSubmitTaskConfirmation(task: unknown): Record<string, unknown> {
|
||||
};
|
||||
}
|
||||
|
||||
function compactSteerTaskConfirmation(task: unknown, steerId: string): Record<string, unknown> {
|
||||
const compact = compactSubmitTaskConfirmation(task);
|
||||
const commands = asRecord(compact.commands) ?? {};
|
||||
return {
|
||||
...compact,
|
||||
commands: {
|
||||
...commands,
|
||||
traceConfirm: steerConfirmationCommand(asString(compact.id) || "<taskId>", steerId),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function orderedUniqueStringList(values: string[]): string[] {
|
||||
const seen = new Set<string>();
|
||||
const items: string[] = [];
|
||||
@@ -6133,27 +6248,31 @@ function codexInterruptTask(taskId: string): unknown {
|
||||
function codexSteerTask(taskId: string, args: string[], fetcher: CodexResponseFetcher = coreInternalFetch): unknown {
|
||||
const options = parseSteerOptions(args);
|
||||
const promptLint = buildPromptLiveAuthorizationLint(options.prompt);
|
||||
const steerId = options.steerId ?? createSteerId(taskId, options.prompt);
|
||||
const targetPath = `/api/tasks/${encodeURIComponent(taskId)}/steer`;
|
||||
const stableProxyPath = codeQueueProxyPath(targetPath);
|
||||
const rawProxyEquivalent = codeQueueProxyEquivalentCommand(targetPath, "{\"prompt\":\"...\"}");
|
||||
const rawProxyEquivalent = codeQueueProxyEquivalentCommand(targetPath, `{"prompt":"...","steerId":"${steerId}"}`);
|
||||
const prompt = textView(options.prompt, false, steerPromptPreviewChars);
|
||||
const request = {
|
||||
path: targetPath,
|
||||
stableProxyPath,
|
||||
method: "POST",
|
||||
steerId,
|
||||
retryPolicy: {
|
||||
defaultAttempts: defaultSteerRetryAttempts,
|
||||
maxAttempts: options.retryAttempts,
|
||||
delayMs: options.retryDelayMs,
|
||||
retryableReasons: ["stable-proxy-failed", "backend-core-unreachable"],
|
||||
deliveryConfirmation: "success confirms Code Queue accepted the steer request; repeated stable-proxy failures mean delivery is unconfirmed",
|
||||
deliveryConfirmation: "success confirms Code Queue accepted the steer request; timeout-like failures trigger a bounded trace confirmation lookup by steerId",
|
||||
idempotency: "the same steerId is reused for every CLI retry and suppresses duplicate trace injection on a backend that supports the contract",
|
||||
},
|
||||
bodySummary: {
|
||||
steerId,
|
||||
promptChars: options.prompt.length,
|
||||
promptPreviewChars: steerPromptPreviewChars,
|
||||
promptTruncated: prompt.truncated,
|
||||
},
|
||||
body: { prompt },
|
||||
body: { steerId, prompt },
|
||||
};
|
||||
if (options.dryRun) {
|
||||
return {
|
||||
@@ -6162,8 +6281,9 @@ function codexSteerTask(taskId: string, args: string[], fetcher: CodexResponseFe
|
||||
promptLint,
|
||||
request,
|
||||
commands: {
|
||||
run: `bun scripts/cli.ts codex steer ${taskId} --prompt-file <path>`,
|
||||
noRetry: `bun scripts/cli.ts codex steer ${taskId} --prompt-file <path> --no-retry`,
|
||||
run: `bun scripts/cli.ts codex steer ${taskId} --prompt-file <path> --steer-id ${steerId}`,
|
||||
noRetry: `bun scripts/cli.ts codex steer ${taskId} --prompt-file <path> --steer-id ${steerId} --no-retry`,
|
||||
traceConfirm: steerConfirmationCommand(taskId, steerId),
|
||||
rawProxy: rawProxyEquivalent,
|
||||
},
|
||||
};
|
||||
@@ -6173,16 +6293,16 @@ function codexSteerTask(taskId: string, args: string[], fetcher: CodexResponseFe
|
||||
let successfulResponse: { ok: true; upstream: { ok: unknown; status: unknown }; body: Record<string, unknown> } | null = null;
|
||||
for (let attempt = 1; attempt <= options.retryAttempts; attempt += 1) {
|
||||
const startedAt = Date.now();
|
||||
const response = unwrapSteerResponse(fetcher(stableProxyPath, { method: "POST", body: { prompt: options.prompt } }), targetPath, stableProxyPath, rawProxyEquivalent);
|
||||
const response = unwrapSteerResponse(fetcher(stableProxyPath, { method: "POST", body: { prompt: options.prompt, steerId } }), targetPath, stableProxyPath, rawProxyEquivalent);
|
||||
const durationMs = Date.now() - startedAt;
|
||||
if (response.ok) {
|
||||
attempts.push(steerSuccessAttempt(attempt, durationMs, response.upstream));
|
||||
attempts.push(steerSuccessAttempt(attempt, durationMs, response.upstream, asString(response.body.steerId) || steerId, asString(response.body.deliveryState) as CodexSteerDeliveryState || "accepted"));
|
||||
successfulResponse = response;
|
||||
break;
|
||||
}
|
||||
attempts.push(steerFailureAttempt(attempt, durationMs, response.diagnostics));
|
||||
failedResponse = response;
|
||||
const terminalRejection = compactTerminalSteerRejection(taskId, response.rawResponse);
|
||||
const terminalRejection = compactTerminalSteerRejection(taskId, steerId, response.rawResponse);
|
||||
if (terminalRejection !== null) {
|
||||
terminalRejection.commands = {
|
||||
...(asRecord(terminalRejection.commands) ?? {}),
|
||||
@@ -6197,15 +6317,23 @@ function codexSteerTask(taskId: string, args: string[], fetcher: CodexResponseFe
|
||||
if (successfulResponse === null) {
|
||||
const diagnostics = failedResponse?.diagnostics ?? classifySteerFailure(null, targetPath, stableProxyPath, rawProxyEquivalent);
|
||||
const transportDeliveryUnconfirmed = diagnostics.reason === "stable-proxy-failed" || diagnostics.reason === "backend-core-unreachable";
|
||||
const traceConfirmation = transportDeliveryUnconfirmed ? safeFetchSteerTraceConfirmation(taskId, steerId, fetcher) : null;
|
||||
const confirmedAccepted = asBoolean(traceConfirmation?.accepted);
|
||||
const deliveryState = confirmedAccepted ? "accepted_response_timeout" : transportDeliveryUnconfirmed ? "unknown" : "not_accepted";
|
||||
const acceptanceStatus: CodexSteerAcceptanceStatus = confirmedAccepted ? "accepted_response_timeout" : transportDeliveryUnconfirmed ? "unknown" : "not_accepted";
|
||||
const compactDiagnostics = compactSteerFailureDiagnostics(diagnostics, options.full);
|
||||
return {
|
||||
ok: false,
|
||||
ok: confirmedAccepted,
|
||||
steer: {
|
||||
accepted: false,
|
||||
accepted: confirmedAccepted,
|
||||
status: acceptanceStatus,
|
||||
deliveryState,
|
||||
steerId,
|
||||
promptChars: options.prompt.length,
|
||||
promptOmitted: true,
|
||||
attempts,
|
||||
deliveryUnconfirmed: transportDeliveryUnconfirmed,
|
||||
deliveryUnconfirmed: transportDeliveryUnconfirmed && !confirmedAccepted,
|
||||
duplicateSuppressionKey: steerId,
|
||||
retryPolicy: {
|
||||
attempted: attempts.length,
|
||||
maxAttempts: options.retryAttempts,
|
||||
@@ -6233,18 +6361,23 @@ function codexSteerTask(taskId: string, args: string[], fetcher: CodexResponseFe
|
||||
},
|
||||
operatorGuidance: {
|
||||
rawProxyEquivalentIsFallback: false,
|
||||
deliveryUnconfirmed: transportDeliveryUnconfirmed,
|
||||
nextStep: transportDeliveryUnconfirmed
|
||||
? "Check task liveness and retry codex steer from the main-server CLI or explicit SSH transport; do not treat a raw proxy failure as separate evidence that the task rejected the correction."
|
||||
deliveryUnconfirmed: transportDeliveryUnconfirmed && !confirmedAccepted,
|
||||
traceConfirmationChecked: traceConfirmation !== null,
|
||||
nextStep: confirmedAccepted
|
||||
? "The stable proxy response timed out, but trace confirmation found this steerId. Do not resend the same corrective prompt."
|
||||
: transportDeliveryUnconfirmed
|
||||
? "Run the bounded trace confirmation command before retrying. If it remains unknown, retry with the same steerId to avoid duplicate trace injection on an updated backend."
|
||||
: "Inspect the non-retryable reason before resubmitting the correction.",
|
||||
},
|
||||
},
|
||||
traceConfirmation,
|
||||
commands: {
|
||||
dryRun: `bun scripts/cli.ts codex steer ${taskId} --prompt-file <path> --dry-run`,
|
||||
retry: `bun scripts/cli.ts codex steer ${taskId} --prompt-file <path>`,
|
||||
noRetry: `bun scripts/cli.ts codex steer ${taskId} --prompt-file <path> --no-retry`,
|
||||
fullDetails: `bun scripts/cli.ts codex steer ${taskId} --prompt-file <path> --full`,
|
||||
rawDetails: `bun scripts/cli.ts codex steer ${taskId} --prompt-file <path> --raw`,
|
||||
dryRun: `bun scripts/cli.ts codex steer ${taskId} --prompt-file <path> --steer-id ${steerId} --dry-run`,
|
||||
retry: `bun scripts/cli.ts codex steer ${taskId} --prompt-file <path> --steer-id ${steerId}`,
|
||||
noRetry: `bun scripts/cli.ts codex steer ${taskId} --prompt-file <path> --steer-id ${steerId} --no-retry`,
|
||||
traceConfirm: steerConfirmationCommand(taskId, steerId),
|
||||
fullDetails: `bun scripts/cli.ts codex steer ${taskId} --prompt-file <path> --steer-id ${steerId} --full`,
|
||||
rawDetails: `bun scripts/cli.ts codex steer ${taskId} --prompt-file <path> --steer-id ${steerId} --raw`,
|
||||
rawProxy: rawProxyEquivalent,
|
||||
tasks: "bun scripts/cli.ts codex tasks --view supervisor --limit 20",
|
||||
health: "bun scripts/cli.ts microservice health code-queue",
|
||||
@@ -6258,14 +6391,23 @@ function codexSteerTask(taskId: string, args: string[], fetcher: CodexResponseFe
|
||||
...(options.raw ? { rawFailure: failedResponse?.rawResponse ?? null } : {}),
|
||||
};
|
||||
}
|
||||
const responseSteerId = asString(successfulResponse.body.steerId) || steerId;
|
||||
const traceConfirmation = compactSteerTraceConfirmation(successfulResponse.body.traceConfirmation, taskId, responseSteerId);
|
||||
const deliveryState = asString(successfulResponse.body.deliveryState) || asString(traceConfirmation.deliveryState) || "accepted";
|
||||
const duplicateSuppressed = successfulResponse.body.duplicateSuppressed === true;
|
||||
return {
|
||||
ok: true,
|
||||
upstream: successfulResponse.upstream,
|
||||
steer: {
|
||||
accepted: true,
|
||||
status: "accepted",
|
||||
deliveryState,
|
||||
steerId: responseSteerId,
|
||||
taskId,
|
||||
promptChars: options.prompt.length,
|
||||
promptOmitted: true,
|
||||
duplicateSuppressed,
|
||||
duplicateSuppressionKey: responseSteerId,
|
||||
attempts,
|
||||
retryPolicy: {
|
||||
attempted: attempts.length,
|
||||
@@ -6280,18 +6422,45 @@ function codexSteerTask(taskId: string, args: string[], fetcher: CodexResponseFe
|
||||
reason: "codex steer is a write operation; default output confirms delivery and provides drill-down commands without echoing prompt text or full task state.",
|
||||
},
|
||||
},
|
||||
task: compactSubmitTaskConfirmation(successfulResponse.body.task),
|
||||
traceConfirmation,
|
||||
task: compactSteerTaskConfirmation(successfulResponse.body.task, responseSteerId),
|
||||
queue: compactSubmitQueueConfirmation(successfulResponse.body.queue),
|
||||
commands: {
|
||||
show: `bun scripts/cli.ts codex task ${taskId}`,
|
||||
detail: `bun scripts/cli.ts codex task ${taskId} --detail`,
|
||||
trace: `bun scripts/cli.ts codex task ${taskId} --trace --tail --limit ${defaultTraceLimit}`,
|
||||
traceConfirm: steerConfirmationCommand(taskId, responseSteerId),
|
||||
output: `bun scripts/cli.ts codex output ${taskId} --tail --limit ${defaultOutputLimit}`,
|
||||
supervisor: `bun scripts/cli.ts codex tasks --view supervisor --limit ${defaultTasksLimit}`,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function codexSteerTraceConfirm(taskId: string, args: string[], fetcher: CodexResponseFetcher = coreInternalFetch): unknown {
|
||||
const options = parseSteerConfirmOptions(args);
|
||||
const path = steerConfirmationPath(taskId, options.steerId);
|
||||
const response = unwrapCodexResponse(fetcher(codeQueueProxyPath(path)));
|
||||
const confirmation = compactSteerTraceConfirmation(response.body, taskId, options.steerId);
|
||||
return {
|
||||
ok: asBoolean(confirmation.accepted),
|
||||
upstream: response.upstream,
|
||||
traceConfirmation: confirmation,
|
||||
delivery: {
|
||||
steerId: options.steerId,
|
||||
status: asBoolean(confirmation.accepted) ? "accepted" : "unknown",
|
||||
deliveryState: confirmation.deliveryState,
|
||||
promptOmitted: true,
|
||||
},
|
||||
commands: {
|
||||
task: `bun scripts/cli.ts codex task ${taskId}`,
|
||||
trace: `bun scripts/cli.ts codex task ${taskId} --trace --tail --limit ${defaultTraceLimit}`,
|
||||
retrySameSteerId: `bun scripts/cli.ts codex steer ${taskId} --prompt-file <path> --steer-id ${options.steerId}`,
|
||||
rawLookup: rawSteerConfirmationCommand(taskId, options.steerId),
|
||||
},
|
||||
...(options.raw ? { raw: response.body } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
export async function runCodeQueueCommand(config: UniDeskConfig, args: string[]): Promise<unknown> {
|
||||
const [action = "task", taskIdArg] = args;
|
||||
if (action === "prompt-lint" || action === "lint-prompt") {
|
||||
@@ -6350,5 +6519,9 @@ export async function runCodeQueueCommand(config: UniDeskConfig, args: string[])
|
||||
const taskId = requireTaskId(taskIdArg, "codex steer");
|
||||
return codexSteerTask(taskId, args.slice(2));
|
||||
}
|
||||
throw new Error("codex command must be one of: prompt-lint, submit, enqueue, task, summary, show, tasks, overview, unread, terminal-unread, output, judge, read, mark-read, dev-ready, health, skills-sync, pr-preflight, runtime-preflight, queues, queue list, queue create, queue merge, move, steer, interrupt, cancel");
|
||||
if (action === "steer-confirm" || action === "steer-confirmation") {
|
||||
const taskId = requireTaskId(taskIdArg, `codex ${action}`);
|
||||
return codexSteerTraceConfirm(taskId, args.slice(2));
|
||||
}
|
||||
throw new Error("codex command must be one of: prompt-lint, submit, enqueue, task, summary, show, tasks, overview, unread, terminal-unread, output, judge, read, mark-read, dev-ready, health, skills-sync, pr-preflight, runtime-preflight, queues, queue list, queue create, queue merge, move, steer, steer-confirm, interrupt, cancel");
|
||||
}
|
||||
|
||||
+5
-3
@@ -63,7 +63,8 @@ export function rootHelp(): unknown {
|
||||
{ command: "codex read <taskId>", description: "Mark one reviewed terminal task read and return terminal metadata plus final response; prompt/tool logs stay behind drill-down commands." },
|
||||
{ command: "codex dev-ready", description: "Fetch execution-container readiness, including sanitized skill injection status from /api/dev-ready." },
|
||||
{ command: "codex judge <taskId> --attempt N [--dry-run] [--include-prompt]", description: "Replay one stored Code Queue attempt through the same judge context builder and MiniMax judge call path used by the live queue worker." },
|
||||
{ command: "codex steer <taskId> [prompt|--prompt-file path|--prompt-stdin] [--dry-run] [--no-retry|--retry-attempts N] [--full|--raw]", description: "Push a corrective prompt into a running Code Queue task; retryable tunnel aborts get bounded diagnostics, terminal-task rejection suggests codex task/read plus codex submit --reference-task-id <taskId>, and upstream/raw rejection details require explicit --full or --raw." },
|
||||
{ command: "codex steer <taskId> [prompt|--prompt-file path|--prompt-stdin] [--steer-id id] [--dry-run] [--no-retry|--retry-attempts N] [--full|--raw]", description: "Push a corrective prompt into a running Code Queue task with a steerId/idempotency key; retryable tunnel aborts get bounded trace confirmation before any retry guidance." },
|
||||
{ command: "codex steer-confirm <taskId> --steer-id <id> [--raw]", description: "Read-only lookup for a steerId in task trace so deliveryUnconfirmed can be resolved without resending the corrective prompt." },
|
||||
{ command: "codex interrupt|cancel <taskId>", description: "Request interrupt for a running Code Queue task, or cancel a queued/retry_wait task, through the same private proxy." },
|
||||
{ command: "codex (queues [--full|--all] | queue create <queueId> | queue merge <sourceQueueId> --into <targetQueueId> | move <taskId> --queue <queueId>)", description: "List low-noise queue summaries by default, including effective activity counts that distinguish scheduler-local queues, DB running tasks, and heartbeat-fresh runners; full queue rows require --full/--all." },
|
||||
{ command: "job list [--limit N] [--include-command]", description: "List async jobs from .state/jobs with a bounded default page." },
|
||||
@@ -266,7 +267,8 @@ function codexHelp(): unknown {
|
||||
"bun scripts/cli.ts codex skills-sync --dry-run [--full]",
|
||||
"bun scripts/cli.ts codex pr-preflight [--remote] [--push-dry-run --push-dry-run-ref refs/heads/probe/<name>] [--pr-create-dry-run --pr-create-dry-run-head <head>] [--issue N] [--full|--raw]",
|
||||
"bun scripts/cli.ts codex judge <taskId> --attempt N [--dry-run] [--include-prompt]",
|
||||
"bun scripts/cli.ts codex steer <taskId> [prompt|--prompt-file path|--prompt-stdin] [--dry-run] [--no-retry|--retry-attempts N] [--full|--raw]",
|
||||
"bun scripts/cli.ts codex steer <taskId> [prompt|--prompt-file path|--prompt-stdin] [--steer-id id] [--dry-run] [--no-retry|--retry-attempts N] [--full|--raw]",
|
||||
"bun scripts/cli.ts codex steer-confirm <taskId> --steer-id <id> [--raw]",
|
||||
"bun scripts/cli.ts codex interrupt|cancel <taskId>",
|
||||
"bun scripts/cli.ts codex queues [--full|--all] [--limit N] [--page N|--offset N] | queue create <queueId> | queue merge <sourceQueueId> --into <targetQueueId> | move <taskId> --queue <queueId>",
|
||||
],
|
||||
@@ -330,7 +332,7 @@ function codexHelp(): unknown {
|
||||
embeddedIn: ["codex submit --dry-run", "codex steer --dry-run"],
|
||||
reference: "docs/reference/code-queue-supervision.md#dev-测试授权分级",
|
||||
},
|
||||
description: "Operate Code Queue through the stable backend-core private proxy path with bounded activity summaries for queue and supervisor views. Real submit/steer success is a low-noise write confirmation and does not echo prompt text; terminal steer rejection returns compact status plus codex task/read/submit follow-up commands, with upstream/raw details behind --full or --raw.",
|
||||
description: "Operate Code Queue through the stable backend-core private proxy path with bounded activity summaries for queue and supervisor views. Real submit/steer success is a low-noise write confirmation and does not echo prompt text; steer output includes steerId, delivery state, and a bounded trace confirmation command; terminal steer rejection returns compact status plus codex task/read/submit follow-up commands, with upstream/raw details behind --full or --raw.",
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -136,6 +136,7 @@ import {
|
||||
} from "./oa-events";
|
||||
import { collectRuntimePreflight, runtimePreflightJson } from "./runtime-preflight";
|
||||
import { collectSkillAvailability, collectSkillSyncPreflight, skillAvailabilityJson, skillSyncPreflightJson } from "./skill-availability";
|
||||
import { createSteerId, findSteerTraceConfirmation, normalizeSteerId, normalizeSteerPromptText, steerDuplicateDecision, steerPromptHash, steerTraceConfirmationJson, steerTraceText } from "./steer-confirmation";
|
||||
import { configureSelfTests, runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runStaleActiveRecoverySelfTest, runTracePortSelfTest, runTraceSummaryContractSelfTest } from "./self-tests";
|
||||
import {
|
||||
codexToolLifecycleStartedBeforeIn,
|
||||
@@ -918,10 +919,6 @@ function taskQueueEnteredAt(task: QueueTask): string {
|
||||
return entryEvents[0] ?? taskTimestamp(task.createdAt) ?? taskTimestamp(task.updatedAt) ?? nowIso();
|
||||
}
|
||||
|
||||
function normalizeSteerPromptText(text: string): string {
|
||||
return text.replace(/^\s*\[steer\]\s*/u, "").trimEnd();
|
||||
}
|
||||
|
||||
function outputPromptHistory(task: QueueTask): PromptHistoryItem[] {
|
||||
return task.output
|
||||
.filter((item) => item.channel === "user" && item.method === "turn/steer")
|
||||
@@ -930,6 +927,7 @@ function outputPromptHistory(task: QueueTask): PromptHistoryItem[] {
|
||||
at: item.at,
|
||||
method: "turn/steer" as const,
|
||||
text: normalizeSteerPromptText(item.text),
|
||||
...(normalizeSteerId(item.itemId) === null ? {} : { steerId: normalizeSteerId(item.itemId) as string }),
|
||||
}))
|
||||
.filter((item) => item.text.trim().length > 0);
|
||||
}
|
||||
@@ -940,7 +938,8 @@ function mergePromptHistory(items: PromptHistoryItem[]): PromptHistoryItem[] {
|
||||
const seq = Number(item.seq);
|
||||
const text = normalizeSteerPromptText(String(item.text || ""));
|
||||
if (!Number.isFinite(seq) || text.trim().length === 0) continue;
|
||||
byKey.set(`${seq}:${item.method}`, { seq, at: item.at || nowIso(), method: "turn/steer", text });
|
||||
const steerId = normalizeSteerId((item as PromptHistoryItem & { steerId?: unknown }).steerId);
|
||||
byKey.set(`${seq}:${item.method}`, { seq, at: item.at || nowIso(), method: "turn/steer", text, ...(steerId === null ? {} : { steerId }) });
|
||||
}
|
||||
return Array.from(byKey.values()).sort((left, right) => left.seq - right.seq);
|
||||
}
|
||||
@@ -2646,13 +2645,14 @@ function createTask(request: QueueTaskRequest): QueueTask {
|
||||
};
|
||||
}
|
||||
|
||||
function appendPromptHistory(task: QueueTask, output: LiveOutput | null, method: PromptHistoryItem["method"], text: string): void {
|
||||
function appendPromptHistory(task: QueueTask, output: LiveOutput | null, method: PromptHistoryItem["method"], text: string, options: { steerId?: string } = {}): void {
|
||||
if (output === null) return;
|
||||
task.promptHistory = mergePromptHistory([...(Array.isArray(task.promptHistory) ? task.promptHistory : []), {
|
||||
seq: output.seq,
|
||||
at: output.at,
|
||||
method,
|
||||
text,
|
||||
...(options.steerId === undefined ? {} : { steerId: options.steerId }),
|
||||
}]);
|
||||
markTaskDirty(task.id);
|
||||
schedulePersistState();
|
||||
@@ -4812,17 +4812,65 @@ async function createTasks(req: Request): Promise<Response> {
|
||||
async function steerTask(task: QueueTask, req: Request): Promise<Response> {
|
||||
if (!serviceRoleAllowsScheduler(config.serviceRole)) return schedulerOnlyRejectResponse(req.method, `/api/tasks/${task.id}/steer`);
|
||||
const body = await readJson(req);
|
||||
const prompt = typeof (body as Record<string, unknown>).prompt === "string" ? String((body as Record<string, unknown>).prompt) : "";
|
||||
const bodyRecord = typeof body === "object" && body !== null && !Array.isArray(body) ? body as Record<string, unknown> : {};
|
||||
const prompt = typeof bodyRecord.prompt === "string" ? String(bodyRecord.prompt) : "";
|
||||
if (prompt.trim().length === 0) return jsonResponse({ ok: false, error: "prompt is required" }, 400);
|
||||
const steerId = normalizeSteerId(bodyRecord.steerId) ?? createSteerId(task.id, prompt);
|
||||
const duplicateDecision = steerDuplicateDecision(task, steerId, prompt, taskFullOutput(task));
|
||||
if (duplicateDecision.duplicate) {
|
||||
await flushDirtyTasksToDatabase(true);
|
||||
return jsonResponse({
|
||||
ok: true,
|
||||
accepted: true,
|
||||
duplicateSuppressed: true,
|
||||
deliveryState: "accepted",
|
||||
steerId,
|
||||
traceConfirmation: steerTraceConfirmationJson(duplicateDecision.confirmation),
|
||||
task: taskForResponse(task),
|
||||
queue: await queueSummaryForResponse(),
|
||||
});
|
||||
}
|
||||
if (duplicateDecision.conflict) {
|
||||
return jsonResponse({
|
||||
ok: false,
|
||||
error: "steerId already exists with a different prompt hash",
|
||||
accepted: false,
|
||||
deliveryState: "not_accepted",
|
||||
steerId,
|
||||
existingPromptHash: duplicateDecision.existingPromptHash,
|
||||
requestedPromptHash: duplicateDecision.requestedPromptHash,
|
||||
traceConfirmation: steerTraceConfirmationJson(duplicateDecision.confirmation),
|
||||
task: taskForResponse(task),
|
||||
}, 409);
|
||||
}
|
||||
const activeRun = activeRunForTask(task);
|
||||
if (activeRun === null || activeRun.threadId === null || activeRun.turnId === null || typeof activeRun.app.steer !== "function") {
|
||||
return jsonResponse({ ok: false, error: "task does not have an active steerable turn", task: taskForResponse(task) }, 409);
|
||||
return jsonResponse({
|
||||
ok: false,
|
||||
error: "task does not have an active steerable turn",
|
||||
accepted: false,
|
||||
deliveryState: "not_accepted",
|
||||
steerId,
|
||||
traceConfirmation: steerTraceConfirmationJson(duplicateDecision.confirmation),
|
||||
task: taskForResponse(task),
|
||||
}, 409);
|
||||
}
|
||||
const output = appendOutput(task, "user", `\n[steer] ${prompt}\n`, "turn/steer");
|
||||
appendPromptHistory(task, output, "turn/steer", prompt);
|
||||
const output = appendOutput(task, "user", steerTraceText(steerId, prompt), "turn/steer", steerId);
|
||||
appendPromptHistory(task, output, "turn/steer", prompt, { steerId });
|
||||
await activeRun.app.steer(activeRun.threadId, activeRun.turnId, prompt);
|
||||
await flushDirtyTasksToDatabase(true);
|
||||
return jsonResponse({ ok: true, task: taskForResponse(task), queue: await queueSummaryForResponse() });
|
||||
const traceConfirmation = findSteerTraceConfirmation(task, steerId, taskFullOutput(task));
|
||||
return jsonResponse({
|
||||
ok: true,
|
||||
accepted: true,
|
||||
duplicateSuppressed: false,
|
||||
deliveryState: traceConfirmation.found ? "accepted" : "accepted_response_timeout",
|
||||
steerId,
|
||||
promptHash: steerPromptHash(prompt),
|
||||
traceConfirmation: steerTraceConfirmationJson(traceConfirmation),
|
||||
task: taskForResponse(task),
|
||||
queue: await queueSummaryForResponse(),
|
||||
});
|
||||
}
|
||||
|
||||
async function editQueuedTaskPrompt(task: QueueTask, req: Request): Promise<Response> {
|
||||
@@ -5726,6 +5774,17 @@ async function route(req: Request): Promise<Response> {
|
||||
if (task === null) return jsonResponse({ ok: false, error: "task not found" }, 404);
|
||||
return await taskTraceStepDetailResponse(task, url);
|
||||
}
|
||||
const steerConfirmationMatch = url.pathname.match(/^\/api\/tasks\/([^/]+)\/steer-confirmation$/u);
|
||||
if (steerConfirmationMatch !== null && req.method === "GET") {
|
||||
const task = await findTaskForRead(decodeURIComponent(steerConfirmationMatch[1] ?? ""));
|
||||
if (task === null) return jsonResponse({ ok: false, error: "task not found" }, 404);
|
||||
const steerId = normalizeSteerId(url.searchParams.get("steerId"));
|
||||
if (steerId === null) return jsonResponse({ ok: false, error: "steerId is required" }, 400);
|
||||
return jsonResponse({
|
||||
ok: true,
|
||||
confirmation: steerTraceConfirmationJson(findSteerTraceConfirmation(task, steerId, taskFullOutput(task))),
|
||||
});
|
||||
}
|
||||
const judgeTaskMatch = url.pathname.match(/^\/api\/tasks\/([^/]+)\/judge$/u);
|
||||
if (judgeTaskMatch !== null && (req.method === "GET" || req.method === "POST")) {
|
||||
const task = await findTaskForRead(decodeURIComponent(judgeTaskMatch[1] ?? ""));
|
||||
|
||||
@@ -0,0 +1,157 @@
|
||||
import { createHash } from "node:crypto";
|
||||
import type { JsonValue, LiveOutput, PromptHistoryItem, QueueTask } from "./types";
|
||||
|
||||
const steerIdPattern = /^[A-Za-z0-9._:-]{8,128}$/u;
|
||||
|
||||
export type SteerDeliveryState = "accepted" | "not_accepted" | "accepted_response_timeout" | "unknown";
|
||||
|
||||
export interface SteerTraceMatch {
|
||||
seq: number;
|
||||
at: string;
|
||||
method: "turn/steer";
|
||||
steerId: string;
|
||||
promptChars: number;
|
||||
promptHash: string;
|
||||
promptOmitted: true;
|
||||
source: "promptHistory" | "output";
|
||||
}
|
||||
|
||||
export interface SteerTraceConfirmation {
|
||||
taskId: string;
|
||||
steerId: string;
|
||||
found: boolean;
|
||||
accepted: boolean;
|
||||
deliveryState: SteerDeliveryState;
|
||||
trace: SteerTraceMatch | null;
|
||||
matches: SteerTraceMatch[];
|
||||
duplicateSuppressionKey: string;
|
||||
}
|
||||
|
||||
export interface SteerDuplicateDecision {
|
||||
duplicate: boolean;
|
||||
conflict: boolean;
|
||||
confirmation: SteerTraceConfirmation;
|
||||
existingPromptHash: string | null;
|
||||
requestedPromptHash: string;
|
||||
}
|
||||
|
||||
export function normalizeSteerId(value: unknown): string | null {
|
||||
if (typeof value !== "string") return null;
|
||||
const text = value.trim();
|
||||
if (!steerIdPattern.test(text)) return null;
|
||||
return text;
|
||||
}
|
||||
|
||||
export function steerPromptHash(prompt: string): string {
|
||||
return createHash("sha256").update(prompt, "utf8").digest("hex");
|
||||
}
|
||||
|
||||
export function createSteerId(taskId: string, prompt: string): string {
|
||||
const hash = createHash("sha256")
|
||||
.update("unidesk-code-queue-steer:v1", "utf8")
|
||||
.update("\0", "utf8")
|
||||
.update(taskId, "utf8")
|
||||
.update("\0", "utf8")
|
||||
.update(prompt, "utf8")
|
||||
.digest("hex")
|
||||
.slice(0, 24);
|
||||
return `steer_${hash}`;
|
||||
}
|
||||
|
||||
export function steerTraceText(steerId: string, prompt: string): string {
|
||||
return `\n[steer id=${steerId}] ${prompt}\n`;
|
||||
}
|
||||
|
||||
export function normalizeSteerPromptText(text: string): string {
|
||||
return text.replace(/^\s*\[steer(?:\s+[^\]]*)?\]\s*/u, "").trimEnd();
|
||||
}
|
||||
|
||||
function promptHistorySteerId(item: PromptHistoryItem): string | null {
|
||||
const record = item as PromptHistoryItem & { steerId?: unknown };
|
||||
return normalizeSteerId(record.steerId);
|
||||
}
|
||||
|
||||
function promptHistoryMatch(item: PromptHistoryItem, steerId: string): SteerTraceMatch | null {
|
||||
if (promptHistorySteerId(item) !== steerId) return null;
|
||||
const prompt = normalizeSteerPromptText(String(item.text || ""));
|
||||
return {
|
||||
seq: item.seq,
|
||||
at: item.at,
|
||||
method: "turn/steer",
|
||||
steerId,
|
||||
promptChars: prompt.length,
|
||||
promptHash: steerPromptHash(prompt),
|
||||
promptOmitted: true,
|
||||
source: "promptHistory",
|
||||
};
|
||||
}
|
||||
|
||||
function outputMatch(item: LiveOutput, steerId: string): SteerTraceMatch | null {
|
||||
if (item.channel !== "user" || item.method !== "turn/steer" || normalizeSteerId(item.itemId) !== steerId) return null;
|
||||
const prompt = normalizeSteerPromptText(item.text);
|
||||
return {
|
||||
seq: item.seq,
|
||||
at: item.at,
|
||||
method: "turn/steer",
|
||||
steerId,
|
||||
promptChars: prompt.length,
|
||||
promptHash: steerPromptHash(prompt),
|
||||
promptOmitted: true,
|
||||
source: "output",
|
||||
};
|
||||
}
|
||||
|
||||
export function findSteerTraceConfirmation(task: QueueTask, steerId: string, output: LiveOutput[] = task.output): SteerTraceConfirmation {
|
||||
const matches = [
|
||||
...(Array.isArray(task.promptHistory) ? task.promptHistory : []).map((item) => promptHistoryMatch(item, steerId)),
|
||||
...output.map((item) => outputMatch(item, steerId)),
|
||||
]
|
||||
.filter((item): item is SteerTraceMatch => item !== null)
|
||||
.sort((left, right) => left.seq - right.seq);
|
||||
const bySeq = new Map<number, SteerTraceMatch>();
|
||||
for (const match of matches) {
|
||||
const existing = bySeq.get(match.seq);
|
||||
if (existing === undefined || existing.source === "promptHistory") bySeq.set(match.seq, match);
|
||||
}
|
||||
const uniqueMatches = Array.from(bySeq.values()).sort((left, right) => left.seq - right.seq);
|
||||
const trace = uniqueMatches[0] ?? null;
|
||||
return {
|
||||
taskId: task.id,
|
||||
steerId,
|
||||
found: trace !== null,
|
||||
accepted: trace !== null,
|
||||
deliveryState: trace !== null ? "accepted" : "unknown",
|
||||
trace,
|
||||
matches: uniqueMatches,
|
||||
duplicateSuppressionKey: steerId,
|
||||
};
|
||||
}
|
||||
|
||||
export function steerDuplicateDecision(task: QueueTask, steerId: string, prompt: string, output: LiveOutput[] = task.output): SteerDuplicateDecision {
|
||||
const confirmation = findSteerTraceConfirmation(task, steerId, output);
|
||||
const requestedPromptHash = steerPromptHash(prompt);
|
||||
const existingPromptHash = confirmation.trace?.promptHash ?? null;
|
||||
return {
|
||||
duplicate: confirmation.found && existingPromptHash === requestedPromptHash,
|
||||
conflict: confirmation.found && existingPromptHash !== requestedPromptHash,
|
||||
confirmation,
|
||||
existingPromptHash,
|
||||
requestedPromptHash,
|
||||
};
|
||||
}
|
||||
|
||||
export function steerTraceConfirmationJson(confirmation: SteerTraceConfirmation): JsonValue {
|
||||
return {
|
||||
taskId: confirmation.taskId,
|
||||
steerId: confirmation.steerId,
|
||||
found: confirmation.found,
|
||||
accepted: confirmation.accepted,
|
||||
deliveryState: confirmation.deliveryState,
|
||||
trace: confirmation.trace as unknown as JsonValue,
|
||||
matchCount: confirmation.matches.length,
|
||||
matches: confirmation.matches.slice(0, 5) as unknown as JsonValue,
|
||||
matchesTruncated: confirmation.matches.length > 5,
|
||||
duplicateSuppressionKey: confirmation.duplicateSuppressionKey,
|
||||
promptOmitted: true,
|
||||
};
|
||||
}
|
||||
@@ -380,6 +380,7 @@ export interface PromptHistoryItem {
|
||||
at: string;
|
||||
method: "turn/steer";
|
||||
text: string;
|
||||
steerId?: string;
|
||||
}
|
||||
|
||||
export interface QueueTask {
|
||||
|
||||
Reference in New Issue
Block a user