From 2a45bfe180444498d483e0fc78e894638cacfc1f Mon Sep 17 00:00:00 2001 From: Codex Date: Tue, 19 May 2026 03:48:17 +0000 Subject: [PATCH] fix(code-queue): add active-run liveness diagnostics --- docs/reference/e2e.md | 4 +- docs/reference/microservices.md | 4 +- docs/reference/observability.md | 6 + .../code-queue-liveness-diagnostics-test.ts | 24 ++ scripts/src/check.ts | 8 + scripts/src/code-queue-liveness-fixtures.ts | 259 ++++++++++++++++++ scripts/src/code-queue.ts | 59 +++- scripts/src/e2e.ts | 18 ++ src/components/frontend/public/style.css | 79 +++++- src/components/frontend/src/code-queue.tsx | 72 +++++ .../code-queue-mgr/src-rs/main.rs | 24 +- .../microservices/code-queue/src/index.ts | 4 +- .../code-queue/src/self-tests.ts | 17 ++ 13 files changed, 561 insertions(+), 17 deletions(-) create mode 100644 scripts/code-queue-liveness-diagnostics-test.ts create mode 100644 scripts/src/code-queue-liveness-fixtures.ts diff --git a/docs/reference/e2e.md b/docs/reference/e2e.md index 4ed0a913..2755403f 100644 --- a/docs/reference/e2e.md +++ b/docs/reference/e2e.md @@ -35,7 +35,7 @@ Typical targeted commands: - Core API: `docker exec unidesk-backend-core` calls internal `GET /api/overview`, which must report `dbReady: true`, `pgdata.volumeName=unidesk_pgdata_10gb`, a positive PostgreSQL database byte count, and at least one online node; internal `GET /api/performance` must report component request statistics, internal operation statistics, PGDATA usage and Code Queue PostgreSQL storage metadata. 
- Provider self-connection: internal `GET /api/nodes` must contain `main-server` with `status: online`, `labels.providerGatewayVersion` equal to `src/components/provider-gateway/package.json`, `labels.providerGatewayUpgradePolicy: "always-enabled"`, `labels.providerGatewayRestartPolicyOk: true`, `labels.providerGatewayPidModeOk: true`, and `labels.providerGatewayRuntimeGuardOk: true`; internal `GET /api/nodes/system-status` must contain CPU/memory/disk samples plus a non-empty process resource list sorted by `memoryBytes` by default, where `memoryBytes` should use PSS when `/proc/[pid]/smaps_rollup` is available, otherwise `rssBytes - statm.shared` before raw RSS, and must retain `rssBytes` for diagnostics; internal `GET /api/nodes/docker-status` must contain a Docker snapshot for `main-server`; every running `provider-gateway` container visible in Docker snapshots must report `restartPolicy: "always"` and `pidMode: "host"`; public provider ingress `/health` must return ok. - Provider remote control: internal `/api/dispatch` must successfully complete a real `provider.upgrade` task in `mode: "plan"` so the upgrade path is validated without recreating the running gateway during E2E. 
-- User services: internal `/api/microservices` must include `todo-note` and `oa-event-flow` on `main-server`, canonical `filebrowser` on `D518`, plus `k3sctl-adapter`, `code-queue`, `findjob`, `pipeline`, `met-nonlinear`, `claudeqq` and `filebrowser-d601` on `D601` with `public=false`; `/api/microservices/todo-note/health` must report `storage=postgres`, `/api/microservices/todo-note/proxy/api/instances` must expose the migrated Todo Note lists, and a temporary Todo Note list create/add/toggle/undo/delete cycle must succeed through the real provider-gateway proxy; `/api/microservices/oa-event-flow/health`, `/api/microservices/oa-event-flow/proxy/api/diagnostics`, `/api/microservices/oa-event-flow/proxy/api/events`, `/api/microservices/oa-event-flow/proxy/api/events?tags=service:pipeline` and `/api/microservices/oa-event-flow/proxy/api/stats/trace` must prove the independent OA event table、Pipeline bridge 和 stats center are reachable through UniDesk proxy; `/api/microservices/k3sctl-adapter/health` and `/api/microservices/k3sctl-adapter/proxy/api/control-plane` must expose the D601 `unidesk-k3s` control plane, `kubeApiProxy.mode=kubernetes-api-service-proxy`, D601 active Code Queue instance `servingHealthy=true`, `presentNodeIds` containing `D601`, `missingNodeIds=[]`, `status=healthy`, and `noFallback=true`; `/api/microservices/code-queue/health` must return the active Code Queue backend summary with default model `gpt-5.5`, `egressProxy.connected=true`, and `/api/microservices/code-queue/proxy/api/tasks/overview` must return queue state through backend-core -> k3sctl-adapter -> Kubernetes API service proxy -> k3s/k8s Service, not through a `serviceId=code-queue` provider-gateway direct task or `/api/code-queue-direct`; Code Queue raw prompt observation fields must preserve long prompt tails across create/list/detail/frontend paths, with any shortened text exposed only through explicit `*Preview` objects carrying `chars` and `truncated`; 
`/api/microservices/filebrowser/health`, `/api/microservices/filebrowser-d601/health` and `/api/microservices/filebrowser/proxy/` must prove File Browser health and WebUI access through UniDesk proxy; `/api/microservices/findjob/health` and `/api/microservices/findjob/proxy/api/summary` must succeed through the real provider-gateway proxy; `/api/microservices/findjob/proxy/api/jobs?__unideskArrayLimit=jobs:5` must return a bounded preview with `_unidesk.arrayLimits` metadata; `/api/microservices/pipeline/health`, `/api/microservices/pipeline/proxy/api/snapshot?__unideskArrayLimit=registry.components:8,runs:3` and `/api/microservices/pipeline/proxy/api/oa-event-flow/diagnostics` must return Pipeline health, registry/run previews and OA event-flow evidence; `/api/microservices/met-nonlinear/health`, `/api/microservices/met-nonlinear/proxy/api/queue`, `/api/microservices/met-nonlinear/proxy/api/projects?root=projects&limit=500`, `/api/microservices/met-nonlinear/proxy/api/projects?root=ex_projects&limit=500`, `/api/microservices/met-nonlinear/proxy/api/projects/config?path=` and `/api/microservices/met-nonlinear/proxy/api/images` must return the D601 TS backend health, queue/GPU policy, full project tree inputs, structured project detail and ready `met-nonlinear-ml:tf26` image status. 
+- User services: internal `/api/microservices` must include `todo-note` and `oa-event-flow` on `main-server`, canonical `filebrowser` on `D518`, plus `k3sctl-adapter`, `code-queue`, `findjob`, `pipeline`, `met-nonlinear`, `claudeqq` and `filebrowser-d601` on `D601` with `public=false`; `/api/microservices/todo-note/health` must report `storage=postgres`, `/api/microservices/todo-note/proxy/api/instances` must expose the migrated Todo Note lists, and a temporary Todo Note list create/add/toggle/undo/delete cycle must succeed through the real provider-gateway proxy; `/api/microservices/oa-event-flow/health`, `/api/microservices/oa-event-flow/proxy/api/diagnostics`, `/api/microservices/oa-event-flow/proxy/api/events`, `/api/microservices/oa-event-flow/proxy/api/events?tags=service:pipeline` and `/api/microservices/oa-event-flow/proxy/api/stats/trace` must prove the independent OA event table, Pipeline bridge and stats center are reachable through UniDesk proxy; `/api/microservices/k3sctl-adapter/health` and `/api/microservices/k3sctl-adapter/proxy/api/control-plane` must expose the D601 `unidesk-k3s` control plane, `kubeApiProxy.mode=kubernetes-api-service-proxy`, D601 active Code Queue instance `servingHealthy=true`, `presentNodeIds` containing `D601`, `missingNodeIds=[]`, `status=healthy`, and `noFallback=true`; `/api/microservices/code-queue/health` must return the active Code Queue backend summary with default model `gpt-5.5`, `egressProxy.connected=true`, `queue.executionDiagnostics` containing DB active state, scheduler active slots, scheduler heartbeat and Trace/OA progress, and `/api/microservices/code-queue/proxy/api/tasks/overview` must return queue state through backend-core -> k3sctl-adapter -> Kubernetes API service proxy -> k3s/k8s Service, not through a `serviceId=code-queue` provider-gateway direct task or `/api/code-queue-direct`; Code Queue raw prompt observation fields must preserve long prompt tails across create/list/detail/frontend paths, with any 
shortened text exposed only through explicit `*Preview` objects carrying `chars` and `truncated`; `/api/microservices/filebrowser/health`, `/api/microservices/filebrowser-d601/health` and `/api/microservices/filebrowser/proxy/` must prove File Browser health and WebUI access through UniDesk proxy; `/api/microservices/findjob/health` and `/api/microservices/findjob/proxy/api/summary` must succeed through the real provider-gateway proxy; `/api/microservices/findjob/proxy/api/jobs?__unideskArrayLimit=jobs:5` must return a bounded preview with `_unidesk.arrayLimits` metadata; `/api/microservices/pipeline/health`, `/api/microservices/pipeline/proxy/api/snapshot?__unideskArrayLimit=registry.components:8,runs:3` and `/api/microservices/pipeline/proxy/api/oa-event-flow/diagnostics` must return Pipeline health, registry/run previews and OA event-flow evidence; `/api/microservices/met-nonlinear/health`, `/api/microservices/met-nonlinear/proxy/api/queue`, `/api/microservices/met-nonlinear/proxy/api/projects?root=projects&limit=500`, `/api/microservices/met-nonlinear/proxy/api/projects?root=ex_projects&limit=500`, `/api/microservices/met-nonlinear/proxy/api/projects/config?path=` and `/api/microservices/met-nonlinear/proxy/api/images` must return the D601 TS backend health, queue/GPU policy, full project tree inputs, structured project detail and ready `met-nonlinear-ml:tf26` image status. Code Queue liveness fixture checks are first-class E2E selections: `code-queue:active-run-heartbeat-visible`, `code-queue:trace-gap-not-stale`, `code-queue:stale-active-owner-expired`, `code-queue:control-plane-split-brain-diagnostics` and `code-queue:oa-publisher-degraded-visible`. 
- ClaudeQQ availability: `/api/microservices/claudeqq/health` must only pass when `ready=true`, NapCat HTTP and WebSocket are connected, and `napcat.loginState=logged_in`; `/api/microservices/claudeqq/proxy/api/napcat/login` must show the same logged-in account state and `/api/microservices/claudeqq/proxy/api/events/recent` must prove the backend can read the persistent event cache. A QR-code-only or not-logged-in NapCat state must be treated as unhealthy. - Database: the command writes an `unidesk_e2e_markers` row through `docker exec unidesk-database psql`, confirms provider state is stored in PostgreSQL, and checks Todo Note rows exist in `todo_note_instances` using the same named volume. - Pipeline OA event flow: `microservice:pipeline-oa-event-flow` must prove both no-audit and monitor-audit runs are driven by OA events end to end. The event stream must show `node-finished` as a neutral fact with `pipeline:{pipelineId}` and `epoch:{runId}` tags, OA policy as the source of downstream/audit decisions, monitor decisions as OA control events, and runner control-result evidence. E2E must fail if delivery still depends on a legacy detail audit policy flag as policy authority, independent legacy audit-request points, a legacy batch completion gate, direct monitor-to-runner calls, or frontend/CLI writes to Pipeline `.state`. @@ -55,7 +55,7 @@ Remote update records in the frontend are covered by the same rule: `provider.up Provider operation availability is also covered by the structured rendering rule. `host.ssh` availability must be displayed as badges or equivalent controls derived from capabilities and `hostSsh*` labels, and remote update availability must be displayed from `provider.upgrade` capability plus the `always-enabled` policy; these fields must not require opening raw Provider JSON. -User service pages are covered by the same rule. 
`Todo Note` must show lists, task tree, filters, reminder input, movement controls, undo/redo and metrics as controls; `OA Event Flow` must show health, live tag stream state, event table, tag filter presets and Trace stats table as controls; `Code Queue` must show queue cards, live transcript, model/cwd/max attempt inputs, judge decision, attempt table, append prompt, interrupt and retry controls; `File Browser` must show D518 as the default target, D601 as an alternate target, a screenshot export action, and an embedded upstream WebUI frame served through `/api/microservices//proxy/` with compact file rows that do not let material-icon fallback text cover file metadata; `FindJob` must show metrics, jobs and drafts as cards/tables; `Pipeline` must show component classes, React Flow graph nodes/edges, run cards, Gantt execution lines and OpenCode Trace timelines as controls; `MET Nonlinear` must show queue rows, GPU/image cards, a real path tree for the project library, structured project/job detail panels, project config preview, `data/` training state, model parameter count, metrics, progress bars, ETA, `epoch/h` speed and history diagnostics as controls; `ClaudeQQ` must show NapCat HTTP/WS/login badges, QR/login panel, event cache, subscriptions and message push controls; the full user-service config, summary, snapshot, jobs preview, drafts, OA events and run JSON can only appear after an explicit `查看原始JSON` click. +User service pages are covered by the same rule. 
`Todo Note` must show lists, task tree, filters, reminder input, movement controls, undo/redo and metrics as controls; `OA Event Flow` must show health, live tag stream state, event table, tag filter presets and Trace stats table as controls; `Code Queue` must show queue cards, execution liveness diagnostics, live transcript, model/cwd/max attempt inputs, judge decision, attempt table, append prompt, interrupt and retry controls; `File Browser` must show D518 as the default target, D601 as an alternate target, a screenshot export action, and an embedded upstream WebUI frame served through `/api/microservices//proxy/` with compact file rows that do not let material-icon fallback text cover file metadata; `FindJob` must show metrics, jobs and drafts as cards/tables; `Pipeline` must show component classes, React Flow graph nodes/edges, run cards, Gantt execution lines and OpenCode Trace timelines as controls; `MET Nonlinear` must show queue rows, GPU/image cards, a real path tree for the project library, structured project/job detail panels, project config preview, `data/` training state, model parameter count, metrics, progress bars, ETA, `epoch/h` speed and history diagnostics as controls; `ClaudeQQ` must show NapCat HTTP/WS/login badges, QR/login panel, event cache, subscriptions and message push controls; the full user-service config, summary, snapshot, jobs preview, drafts, OA events and run JSON can only appear after an explicit `查看原始JSON` click. 
## Public Boundary Rule diff --git a/docs/reference/microservices.md b/docs/reference/microservices.md index fae7786d..5ca3786c 100644 --- a/docs/reference/microservices.md +++ b/docs/reference/microservices.md @@ -180,7 +180,9 @@ D601 上必须显式使用原生 k3s kubeconfig:`KUBECONFIG=/etc/rancher/k3s/k - 服务拆分语义:`code-queue-read` 只承载 GET/HEAD 查询、overview、任务详情、Trace/output/transcript、统计和只读健康,可多副本滚动更新;它必须设置 `CODE_QUEUE_SERVICE_ROLE=read` 与 `CODE_QUEUE_SCHEDULER_ENABLED=false`,且不得接受入队、queue 变更、已读、重试、移动、追加 prompt 或打断这类 mutation。`code-queue-write` 承载入队、queue 创建/合并/更新、已读、手动重试、移动等命令写入,初期保持单副本和 `CODE_QUEUE_SERVICE_ROLE=write`,只把命令和任务状态写入 PostgreSQL,不启动 agent 子进程。`code-queue-scheduler` 是唯一拥有 scheduler 和 active run 的执行服务,设置 `CODE_QUEUE_SERVICE_ROLE=scheduler` 与 `CODE_QUEUE_SCHEDULER_ENABLED=true`,负责从 PostgreSQL 热任务集轮询新写入任务、推进队列、启动 Codex/OpenCode、处理 running task 的 steer/interrupt、发送终态通知和暴露执行端 `/health`。普通 Service 负载均衡不得把 mutation 打到 read,也不得把 running task 控制打到 write。 - 实例语义:D601 是当前唯一 active 执行节点,`code-queue-scheduler` 以一个 scheduler Pod 承载长生命周期 Codex/OpenCode 子进程并轮询主 PostgreSQL 中由 `code-queue-mgr` 写入的 queued/retry_wait 任务。D518 不属于当前 Code Queue k3s 拓扑;在没有原生 k3s-agent 与稳定 Kubernetes 网络前,不得把 D518 写回 `expectedNodeIds` 或恢复 `code-queue-d518` standby。D601 scheduler 默认关闭 `CODE_QUEUE_STARTUP_OA_BACKFILL_ENABLED`;历史 OA Trace/STEP 回填必须通过显式 `/api/oa/backfill` 运维动作触发,不能在每次 Pod 重启时自动批量发布旧事件。 - 滚动更新边界:master `code-queue-mgr` 保证 D601 抖动或执行面滚动更新期间普通提交、queue 管理和历史读取仍可用;但当前 D601 scheduler Pod 内仍直接承载正在运行的 agent 子进程,scheduler Pod 被替换时 active task 仍会进入 restart-recovery/retry 语义,不能宣称 running task 零中断。真正的长期目标是继续把调度器和执行器拆开:scheduler 只负责 claim task 并创建 Kubernetes Job/Pod 或独立 worker,runner 把输出、状态、attempt、事件和通知写回 PostgreSQL/OA Event Flow/归档;只有这样 controller/scheduler 滚动更新才不会影响正在执行的任务。 -- Restart recovery:D601 scheduler 启动时必须把没有本地 active run 的 `running`/`judging` 任务恢复为 `retry_wait` 并先写回 PostgreSQL,再开启新一轮 scheduler 轮询;同时必须清理 `queued`/`retry_wait`/terminal 任务残留的 `activeTurnId`,否则 PG 中残留的 running 或旧 turn id 
会阻塞队列且不会被执行。health/overview 中的 `activeTaskIds` 只代表当前进程真实持有的 agent run;数据库里仍处于 `running`/`judging` 但没有本地 run 的任务只能作为 scheduler 侧 `orphanedActiveTaskIds` 暴露,不能计入 active run slot。主 server 直管 `code-queue-mgr` 只有 PostgreSQL 视角,不得把数据库中的 `running`/`judging` 误报为真实 active run;只能作为 `databaseActiveTaskIds`/`executionStateSource=postgres-control-plane` 这类控制面状态返回。 +- Active run liveness:Code Queue 活性判断必须同时读取 PostgreSQL 任务状态、D601 scheduler 本地 active run/active slot/active queue、scheduler-owned heartbeat、Trace/OA 持久化进度和 OA publisher pending/lastError。scheduler heartbeat 至少包含 `taskId`、`attempt`、`activeTurnId`/`codexThreadId`、`owner`/`schedulerInstance`、`lastLocalHeartbeatAt`、`lastObservedAgentEventAt`、`lastPersistedTraceAt`、`outputMaxSeq` 与 `agentPort`;后续如拆出独立 runner,可以在同一结构上追加 worker/claim lease 字段。master `code-queue-mgr` 的 `postgres-control-plane` 视图不能单独作为“任务已卡死/未执行”的依据;当 master 看到 `databaseActiveTaskCount>0`、本地 `activeRunSlotCount=0`,但存在新鲜 scheduler heartbeat 时,`executionDiagnostics.state` 必须报告 `split-brain`/`degraded` 而不是 `healthy`。 +- Trace gap 与 stale active:Trace/OA 长时间没有新 seq 或 publisher 有 pending/lastError 只说明持久化链路可能 degraded;只要 scheduler-owned `lastLocalHeartbeatAt` 仍新鲜,就必须归类为 trace gap,不得触发 stale retry。只有 PostgreSQL 仍为 `running`/`judging`、scheduler 本地没有 active run/slot/waiter,并且 owner heartbeat 已过期时,任务才允许进入 stale recovery candidate;缺失 heartbeat 是 degraded 诊断,不是自动恢复许可。 +- Restart recovery:D601 scheduler 启动或 reconciliation 时只能由 scheduler-owned 恢复入口处理 stale active,必须留下 recovery reason、source/method 审计事件,并使用条件更新防止覆盖并发 owner 写入;恢复原因需要区分 user interrupt、admin stale recovery 和 service restart recovery。禁止裸改 production PostgreSQL 任务状态,禁止把 production scheduler/数据库作为破坏性测试对象。health/overview 中的 `activeTaskIds` 只代表当前进程真实持有的 agent run;数据库里仍处于 `running`/`judging` 但没有本地 run 的任务只能作为 scheduler 侧 `orphanedActiveTaskIds` 或 diagnostics 暴露,不能计入 active run slot。主 server 直管 `code-queue-mgr` 只有 PostgreSQL 视角,不得把数据库中的 `running`/`judging` 误报为真实 active run;只能作为 
`databaseActiveTaskIds`/`executionStateSource=postgres-control-plane` 这类控制面状态返回。 - Transient dependency recovery:D601 scheduler/read/write 通过 provider egress 和 TCP gateway 访问主 PostgreSQL、OA Event Flow 与模型 API,必须把 `CONNECTION_CLOSED`、`CONNECT_TIMEOUT`、stale PostgreSQL client、provider egress 瞬时失败和 MiniMax judge provider 初始化失败视为可恢复运行时抖动。实现上应轮换失效数据库 client、重试或降级 judge provider 初始化、释放 active run slot 并继续扫描后续 queued/retry_wait 任务;不得因为一次连接关闭、一次 judge provider transient error 或滚动更新窗口让 scheduler 长期停止推进。 - 部署引用:Code Queue 镜像仍复用 `src/components/microservices/code-queue/Dockerfile`,Kubernetes 运行清单为 `src/components/microservices/k3sctl-adapter/k3s/code-queue.k8s.yaml`,`config.json` 对外记录 k3s manifest `src/components/microservices/k3sctl-adapter/k3s/code-queue.k3s.json`;主 server 根目录 `docker-compose.yml` 不包含 `code-queue` service,旧 D601 direct Compose 文件只作为迁移/本地诊断参考,不是正式运行入口。 - 主服务依赖映射:Code Queue 仍以主 PostgreSQL 为权威数据库,但 D601 k3s Pod 不能依赖公网直连 `74.48.78.17:15432/4255`。Pod 内 `DATABASE_URL` 和 `OA_EVENT_FLOW_BASE_URL` 必须指向集群内 `d601-tcp-egress-gateway` Service,再由该 gateway 通过 D601 provider-gateway egress proxy 的 HTTP CONNECT 转发到主 PostgreSQL 和 OA Event Flow;新增 TCP 依赖时扩展 `TCP_EGRESS_ROUTES`,不得在业务容器里新增一次性公网直连或 ad hoc 隧道。D601 active 实例的 `CODE_QUEUE_NOTIFY_CLAUDEQQ_BASE_URL` 必须使用集群内 ClaudeQQ Service `http://claudeqq.unidesk.svc.cluster.local:3290`,并把 `claudeqq`/`claudeqq.unidesk.svc.cluster.local` 加入 `NO_PROXY`,避免任务完成通知被默认出网代理错误转发。旧 `http://host.docker.internal:3290` 只允许作为迁移期诊断,不得作为 Code Queue k3s Pod 的正式通知路径。这些端口映射只服务受控节点运行时,必须用防火墙或等价策略限制来源,不得成为浏览器或任意公网客户端入口。 diff --git a/docs/reference/observability.md b/docs/reference/observability.md index af755af2..dcbc3566 100644 --- a/docs/reference/observability.md +++ b/docs/reference/observability.md @@ -47,3 +47,9 @@ frontend Bun server 必须提供同源 `/api/frontend-performance`,记录 webu 当最近失败请求集中出现 frontend `core_proxy` 502/503/504,路径为 `/api/microservices/code-queue/proxy/...` 的 overview、trace 或 summary,且 k3s/k8s Pod 仍在运行时,必须先运行 `bun scripts/cli.ts 
microservice diagnostics code-queue`,区分 provider-gateway online、WebSocket HTTP tunnel、k3sctl-adapter、Kubernetes API service proxy 和目标 Service 五段状态。provider tunnel 类失败必须记录响应 body/headers 中的 `requestId`、`stage`、`failureReason`、`x-unidesk-request-id` 和 `x-unidesk-tunnel-error`;如需主动验证错误结构,运行 `bun scripts/cli.ts microservice tunnel-self-test code-queue`,该自测应返回预期失败但 `ok=true` 的诊断结果。随后再继续判断“Kubernetes API service proxy 不可达”“Code Queue 进程不可达”和“Code Queue event loop 被热路径同步工作饿死”。如果 `debug health` 或 provider-gateway egress health 显示 `providerGatewayEgressProxyActiveTunnels` 持续偏高、`pendingTunnels` 非零或 `oldestTunnelAgeMs` 长时间增长,应先按 provider-gateway egress tunnel 生命周期排障,确认 `egress_tcp_open`、connect timeout、idle cleanup 与 core socket close 清理是否生效。排障顺序是同时查看 `/api/frontend-performance`、`/api/performance`、`k3sctl-adapter` `/api/control-plane`、Kubernetes Pod `/live`、`/health`、overview/trace-step curl、`kubectl top pod` 或 Docker stats、容器 `RestartCount`/`OOMKilled` 和 Code Queue 日志;如果 Pod 内 `/health` 也超时,应优先检查实时 output 发布、archive 读取、transcript 构建、统计计算、启动维护、历史 OA backfill 和远程 Provider 准备/SSH 子进程是否阻塞 event loop,而不是先调整 frontend 渲染或代理超时。Code Queue 默认不得在启动时自动执行历史 OA backfill 或通知表索引维护;显式 backfill 必须作为运维动作记录,并在运行期间并发证明 `/live`、`/health` 与 `/api/tasks/overview` 仍快速返回。涉及 D601 等远程 Provider 时,还要检查 `runCodeQueueSsh`/开发容器准备是否仍存在同步子进程、无 timeout 的 SSH、无上限 stdout/stderr 或 stale TUN 重建等待;修复后必须在远程准备探针运行期间并发证明 Pod `/health` 与 `/api/tasks/overview` 仍快速返回。 Code Queue task 明明产出最终回复却反复 `retry_wait` 时,应优先用任务详情里的 latest attempt 字段核查 `terminalStatus`、`transportClosedBeforeTerminal`、`appServerExitCode`、`finalResponseChars`、`judge.raw._safetyOverride` 和 attempt output。OpenCode 远程任务中,`opencode completed status=completed exit=0` 加当前 attempt 非空 assistant 输出应对应 `terminalStatus=completed`、`transportClosedBeforeTerminal=false`;如果因为缺少 `step_finish` 事件仍触发 `_safetyOverride=terminal_not_completed`,说明协议终态归一化有回归。相反,当前 attempt 没有最终 assistant response 时即使 tool/read/bash 证据完整,也必须 retry,不能用旧 `task.finalResponse` 或 reasoning/tool 
evidence 代替可见最终回复。 + +### Code Queue Liveness + +Code Queue 的“任务是否卡死”不能由单一控制面字段判断。排障必须同时看 PostgreSQL 中的 `running`/`judging` 任务、D601 scheduler 本地 active run/active slot/active queue、scheduler-owned heartbeat、Trace/OA 持久化进度和 OA publisher pending/lastError。master `code-queue-mgr` 的 `postgres-control-plane` 视图只证明数据库行存在;当它显示 `activeRunSlotCount=0` 但 D601 heartbeat 仍新鲜时,正确结论是 control-plane/execution-plane 分裂,diagnostics 应显示 `split-brain` 或 `degraded`,不能宣称任务未执行或卡死。 + +Trace/OA 长时间没有新 seq 但 scheduler heartbeat 正常时,应归类为 trace gap 或 publisher degraded,不得自动 retry。只有 scheduler 本地没有 active run,且对应 owner heartbeat 已过期时,才允许进入 stale recovery candidate;缺失 heartbeat 只能触发 degraded 诊断和人工确认。任何恢复入口都必须由 scheduler 执行,使用条件更新和审计事件区分 user interrupt、admin stale recovery 与 service restart recovery;禁止直接修改 production PostgreSQL 任务状态来“修复” active run。 diff --git a/scripts/code-queue-liveness-diagnostics-test.ts b/scripts/code-queue-liveness-diagnostics-test.ts new file mode 100644 index 00000000..5cf26253 --- /dev/null +++ b/scripts/code-queue-liveness-diagnostics-test.ts @@ -0,0 +1,24 @@ +import { CODE_QUEUE_LIVENESS_CHECK_NAMES, runCodeQueueLivenessFixtureChecks } from "./src/code-queue-liveness-fixtures"; + +function optionValues(args: string[], name: string): string[] { + const values: string[] = []; + for (let index = 0; index < args.length; index += 1) { + if (args[index] !== name) continue; + const raw = args[index + 1]; + if (raw === undefined || raw.startsWith("--")) throw new Error(`${name} requires a check name`); + values.push(...raw.split(",").map((item) => item.trim()).filter(Boolean)); + index += 1; + } + return values; +} + +function main(): void { + const only = optionValues(Bun.argv.slice(2), "--only"); + const unknown = only.filter((name) => !CODE_QUEUE_LIVENESS_CHECK_NAMES.includes(name as never)); + if (unknown.length > 0) throw new Error(`unknown Code Queue liveness check(s): ${unknown.join(", ")}`); + const result = runCodeQueueLivenessFixtureChecks(only); + 
process.stdout.write(`${JSON.stringify(result, null, 2)}\n`); + if (!result.ok) process.exit(1); +} + +if (import.meta.main) main(); diff --git a/scripts/src/check.ts b/scripts/src/check.ts index 731eef6b..dffd26dc 100644 --- a/scripts/src/check.ts +++ b/scripts/src/check.ts @@ -237,6 +237,8 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default fileItem("src/components/microservices/code-queue-mgr/src/prompt-observation.ts"), fileItem("scripts/src/deploy.ts"), fileItem("scripts/code-queue-issue3-regression-test.ts"), + fileItem("scripts/code-queue-liveness-diagnostics-test.ts"), + fileItem("scripts/src/code-queue-liveness-fixtures.ts"), fileItem("scripts/src/ci.ts"), fileItem("scripts/src/e2e.ts"), fileItem("scripts/code-queue-prompt-observation-test.ts"), @@ -250,10 +252,16 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default items.push(commandItem("typescript:scripts", ["bunx", "tsc", "-p", "scripts/tsconfig.json", "--noEmit", "--pretty", "false"], 120_000)); items.push(commandItem("code-queue:prompt-observation-contract", ["bun", "scripts/code-queue-prompt-observation-test.ts"], 30_000)); items.push(commandItem("code-queue:issue3-diagnostics-and-image-preflight", ["bun", "scripts/code-queue-issue3-regression-test.ts"], 30_000)); + items.push(commandItem("code-queue:active-run-heartbeat-visible", ["bun", "scripts/code-queue-liveness-diagnostics-test.ts", "--only", "code-queue:active-run-heartbeat-visible"], 30_000)); + items.push(commandItem("code-queue:trace-gap-not-stale", ["bun", "scripts/code-queue-liveness-diagnostics-test.ts", "--only", "code-queue:trace-gap-not-stale"], 30_000)); + items.push(commandItem("code-queue:stale-active-owner-expired", ["bun", "scripts/code-queue-liveness-diagnostics-test.ts", "--only", "code-queue:stale-active-owner-expired"], 30_000)); + items.push(commandItem("code-queue:control-plane-split-brain-diagnostics", ["bun", "scripts/code-queue-liveness-diagnostics-test.ts", 
"--only", "code-queue:control-plane-split-brain-diagnostics"], 30_000)); + items.push(commandItem("code-queue:oa-publisher-degraded-visible", ["bun", "scripts/code-queue-liveness-diagnostics-test.ts", "--only", "code-queue:oa-publisher-degraded-visible"], 30_000)); } else { items.push(skippedItem("typescript:scripts", "scripts TypeScript typecheck is opt-in", "--scripts-typecheck or --full")); items.push(skippedItem("code-queue:prompt-observation-contract", "prompt observation contract is opt-in with script checks", "--scripts-typecheck or --full")); items.push(skippedItem("code-queue:issue3-diagnostics-and-image-preflight", "Code Queue issue #3 regression fixtures are opt-in with script checks", "--scripts-typecheck or --full")); + items.push(skippedItem("code-queue:liveness-diagnostics-fixtures", "Code Queue liveness diagnostics fixtures are opt-in with script checks", "--scripts-typecheck or --full")); } if (options.logs) { items.push(unifiedLogRotationItem()); diff --git a/scripts/src/code-queue-liveness-fixtures.ts b/scripts/src/code-queue-liveness-fixtures.ts new file mode 100644 index 00000000..c547d1da --- /dev/null +++ b/scripts/src/code-queue-liveness-fixtures.ts @@ -0,0 +1,259 @@ +import { buildExecutionDiagnostics, buildSchedulerHeartbeat, staleRecoveryCandidate, taskHasTraceGapButFreshHeartbeat } from "../../src/components/microservices/code-queue/src/execution-diagnostics"; +import type { ActiveRun } from "../../src/components/microservices/code-queue/src/code-agent/common"; +import type { CodeQueueExecutionDiagnostics, QueueTask, SchedulerActiveRunHeartbeat, TaskStatus } from "../../src/components/microservices/code-queue/src/types"; + +export const CODE_QUEUE_LIVENESS_CHECK_NAMES = [ + "code-queue:active-run-heartbeat-visible", + "code-queue:trace-gap-not-stale", + "code-queue:stale-active-owner-expired", + "code-queue:control-plane-split-brain-diagnostics", + "code-queue:oa-publisher-degraded-visible", +] as const; + +type 
CodeQueueLivenessCheckName = typeof CODE_QUEUE_LIVENESS_CHECK_NAMES[number]; + +interface FixtureCheck { + name: CodeQueueLivenessCheckName; + ok: boolean; + detail: Record<string, unknown>; +} + +const now = "2026-05-19T00:10:00.000Z"; +const freshAt = "2026-05-19T00:09:50.000Z"; +const oldTraceAt = "2026-05-18T23:40:00.000Z"; +const expiredAt = "2026-05-18T23:50:00.000Z"; + +function assertCondition(condition: unknown, message: string, detail: Record<string, unknown> = {}): void { + if (!condition) throw new Error(`${message}: ${JSON.stringify(detail)}`); +} + +function fixtureTask(id: string, status: TaskStatus, heartbeat: SchedulerActiveRunHeartbeat | null = null): QueueTask { + return { + id, + queueId: "default", + queueEnteredAt: "2026-05-19T00:00:00.000Z", + prompt: `${id} prompt`, + basePrompt: `${id} prompt`, + referenceTaskIds: [], + referenceInjection: null, + providerId: "D601", + cwd: "/workspace", + model: "gpt-5.5", + reasoningEffort: null, + executionMode: "default", + maxAttempts: 99, + status, + createdAt: "2026-05-19T00:00:00.000Z", + updatedAt: "2026-05-19T00:00:00.000Z", + startedAt: status === "running" || status === "judging" ? "2026-05-19T00:00:00.000Z" : null, + finishedAt: null, + readAt: null, + currentAttempt: status === "queued" ? 0 : 1, + currentMode: status === "queued" ? null : "initial", + codexThreadId: heartbeat?.codexThreadId ?? null, + activeTurnId: heartbeat?.activeTurnId ?? null, + schedulerHeartbeat: heartbeat, + finalResponse: "", + outputMaxSeq: heartbeat?.outputMaxSeq ?? 
0, + lastError: null, + lastJudge: null, + judgeFailCount: 0, + promptHistory: [], + output: [], + events: [], + attempts: [], + cancelRequested: false, + nextPrompt: null, + nextMode: null, + }; +} + +function heartbeat(taskId: string, at: string, overrides: Partial<SchedulerActiveRunHeartbeat> = {}): SchedulerActiveRunHeartbeat { + return { + taskId, + queueId: "default", + attempt: 1, + activeTurnId: "turn_fixture", + codexThreadId: "thread_fixture", + owner: "D601", + schedulerInstance: "code-queue-scheduler-fixture", + executionPlane: "scheduler-execution-plane", + agentPort: "codex", + status: "running", + lastLocalHeartbeatAt: at, + lastObservedAgentEventAt: at, + lastPersistedTraceAt: at, + outputMaxSeq: 10, + source: "scheduler", + ...overrides, + }; +} + +function activeRun(taskId: string, queueId = "default"): ActiveRun { + return { + taskId, + queueId, + app: { stop: () => undefined }, + port: "codex", + threadId: "thread_fixture", + turnId: "turn_fixture", + startedAt: "2026-05-19T00:00:00.000Z", + lastLocalHeartbeatAt: freshAt, + lastObservedAgentEventAt: freshAt, + lastPersistedTraceAt: freshAt, + }; +} + +function schedulerDiagnostics(tasks: QueueTask[], activeRuns: ActiveRun[] = [], oaPublisher: unknown = null): CodeQueueExecutionDiagnostics { + return buildExecutionDiagnostics({ + now, + controlPlane: "D601-code-queue-scheduler", + executionStateSource: "scheduler-execution-plane", + tasks, + activeRuns, + activeRunSlotCount: activeRuns.length, + activeQueueIds: activeRuns.map((run) => run.queueId), + processingQueueIds: [], + orphanedActiveTaskIds: tasks.filter((task) => (task.status === "running" || task.status === "judging") && !activeRuns.some((run) => run.taskId === task.id)).map((task) => task.id), + oaPublisher: oaPublisher as never, + }); +} + +function checkActiveRunHeartbeatVisible(): FixtureCheck { + const task = fixtureTask("codex_fixture_active_1", "running"); + const run = activeRun(task.id); + task.schedulerHeartbeat = buildSchedulerHeartbeat(task, run, { + 
now: freshAt, + owner: "D601", + schedulerInstance: "code-queue-scheduler-fixture", + agentPort: "codex", + lastObservedAgentEventAt: freshAt, + lastPersistedTraceAt: freshAt, + }); + const diagnostics = schedulerDiagnostics([task], [run]); + assertCondition(diagnostics.activeHeartbeatTaskIds.includes(task.id), "active heartbeat task id must be visible", diagnostics as unknown as Record<string, unknown>); + assertCondition(diagnostics.schedulerActiveTaskIds.includes(task.id), "scheduler active task id must be visible", diagnostics as unknown as Record<string, unknown>); + assertCondition(diagnostics.lastSchedulerHeartbeatAt === freshAt, "last scheduler heartbeat must be surfaced", diagnostics as unknown as Record<string, unknown>); + return { + name: "code-queue:active-run-heartbeat-visible", + ok: true, + detail: { + state: diagnostics.state, + schedulerActiveTaskIds: diagnostics.schedulerActiveTaskIds, + activeHeartbeatTaskIds: diagnostics.activeHeartbeatTaskIds, + lastSchedulerHeartbeatAt: diagnostics.lastSchedulerHeartbeatAt, + heartbeat: task.schedulerHeartbeat, + }, + }; +} + +function checkTraceGapNotStale(): FixtureCheck { + const task = fixtureTask("codex_fixture_trace_gap_1", "running", heartbeat("codex_fixture_trace_gap_1", freshAt, { + lastPersistedTraceAt: oldTraceAt, + outputMaxSeq: 89112, + })); + const decision = staleRecoveryCandidate({ task, localActive: false, now }); + const hasTraceGap = taskHasTraceGapButFreshHeartbeat(task, now); + const diagnostics = schedulerDiagnostics([task], []); + assertCondition(hasTraceGap, "trace gap with fresh owner heartbeat should be classified", { decision, diagnostics }); + assertCondition(decision.allowed === false && decision.reason === "owner-heartbeat-fresh", "fresh heartbeat must block stale retry", { decision }); + assertCondition(diagnostics.traceGapNotStaleTaskIds.includes(task.id), "diagnostics must expose trace gap as not stale", diagnostics as unknown as Record<string, unknown>); + assertCondition(!diagnostics.staleRecoveryCandidateTaskIds.includes(task.id), "trace 
gap must not enter stale recovery candidates", diagnostics as unknown as Record); + return { + name: "code-queue:trace-gap-not-stale", + ok: true, + detail: { + decision, + traceGapNotStaleTaskIds: diagnostics.traceGapNotStaleTaskIds, + staleRecoveryCandidateTaskIds: diagnostics.staleRecoveryCandidateTaskIds, + }, + }; +} + +function checkStaleActiveOwnerExpired(): FixtureCheck { + const task = fixtureTask("codex_fixture_stale_1", "running", heartbeat("codex_fixture_stale_1", expiredAt, { + lastObservedAgentEventAt: expiredAt, + lastPersistedTraceAt: expiredAt, + })); + const decision = staleRecoveryCandidate({ task, localActive: false, now }); + const diagnostics = schedulerDiagnostics([task], []); + assertCondition(decision.allowed === true && decision.reason === "owner-heartbeat-expired", "expired owner heartbeat should be the stale recovery gate", { decision }); + assertCondition(diagnostics.state === "stale-active", "diagnostics must mark stale active only after owner heartbeat expiry", diagnostics as unknown as Record); + assertCondition(diagnostics.staleRecoveryCandidateTaskIds.includes(task.id), "expired owner heartbeat should create a stale candidate", diagnostics as unknown as Record); + return { + name: "code-queue:stale-active-owner-expired", + ok: true, + detail: { + decision, + state: diagnostics.state, + staleRecoveryCandidateTaskIds: diagnostics.staleRecoveryCandidateTaskIds, + }, + }; +} + +function checkControlPlaneSplitBrainDiagnostics(): FixtureCheck { + const task = fixtureTask("codex_fixture_split_1", "running", heartbeat("codex_fixture_split_1", freshAt)); + const diagnostics = buildExecutionDiagnostics({ + now, + controlPlane: "master-code-queue-mgr", + executionStateSource: "postgres-control-plane", + tasks: [task], + activeRuns: [], + activeRunSlotCount: 0, + oaPublisher: null, + }); + assertCondition(diagnostics.state === "split-brain" && diagnostics.splitBrain === true, "master postgres-control-plane must report split-brain when DB 
active has fresh scheduler heartbeat", diagnostics as unknown as Record); + assertCondition(diagnostics.schedulerActiveRunSlotCount === 0 && diagnostics.databaseActiveTaskCount === 1, "split-brain fixture should preserve the exact control-plane divergence", diagnostics as unknown as Record); + return { + name: "code-queue:control-plane-split-brain-diagnostics", + ok: true, + detail: { + state: diagnostics.state, + splitBrain: diagnostics.splitBrain, + executionStateSource: diagnostics.executionStateSource, + databaseActiveTaskIds: diagnostics.databaseActiveTaskIds, + schedulerActiveRunSlotCount: diagnostics.schedulerActiveRunSlotCount, + heartbeatFreshTaskIds: diagnostics.heartbeatFreshTaskIds, + }, + }; +} + +function checkOaPublisherDegradedVisible(): FixtureCheck { + const oaPublisher = { pending: 3, lastError: "fixture OA publish retry", lastPublishedAt: null }; + const diagnostics = schedulerDiagnostics([], [], oaPublisher); + assertCondition(diagnostics.state === "degraded", "OA publisher pending/lastError must degrade diagnostics", diagnostics as unknown as Record); + assertCondition(diagnostics.oaPublisher === oaPublisher, "OA publisher detail must remain visible", diagnostics as unknown as Record); + return { + name: "code-queue:oa-publisher-degraded-visible", + ok: true, + detail: { + state: diagnostics.state, + degraded: diagnostics.degraded, + oaPublisher: diagnostics.oaPublisher as Record, + }, + }; +} + +export function runCodeQueueLivenessFixtureChecks(only: string[] = []): { ok: boolean; checks: FixtureCheck[]; failures: Array<{ name: string; error: string }> } { + const selected = new Set(only.filter((name) => name.trim().length > 0)); + const runners: Array<[CodeQueueLivenessCheckName, () => FixtureCheck]> = [ + ["code-queue:active-run-heartbeat-visible", checkActiveRunHeartbeatVisible], + ["code-queue:trace-gap-not-stale", checkTraceGapNotStale], + ["code-queue:stale-active-owner-expired", checkStaleActiveOwnerExpired], + 
["code-queue:control-plane-split-brain-diagnostics", checkControlPlaneSplitBrainDiagnostics], + ["code-queue:oa-publisher-degraded-visible", checkOaPublisherDegradedVisible], + ]; + const checks: FixtureCheck[] = []; + const failures: Array<{ name: string; error: string }> = []; + for (const [name, run] of runners) { + if (selected.size > 0 && !selected.has(name)) continue; + try { + checks.push(run()); + } catch (error) { + failures.push({ name, error: error instanceof Error ? error.message : String(error) }); + checks.push({ name, ok: false, detail: { error: error instanceof Error ? error.message : String(error) } }); + } + } + if (checks.length === 0) throw new Error(`no Code Queue liveness fixture checks matched: ${Array.from(selected).join(", ")}`); + return { ok: failures.length === 0, checks, failures }; +} diff --git a/scripts/src/code-queue.ts b/scripts/src/code-queue.ts index 0b49510e..ab1c81d2 100644 --- a/scripts/src/code-queue.ts +++ b/scripts/src/code-queue.ts @@ -197,6 +197,52 @@ function compactLastAssistant(value: unknown, full: boolean): Record | null { + const record = asRecord(value); + if (record === null) return null; + return { + taskId: record.taskId ?? null, + attempt: record.attempt ?? null, + owner: record.owner ?? null, + schedulerInstance: record.schedulerInstance ?? null, + agentPort: record.agentPort ?? null, + activeTurnId: record.activeTurnId ?? null, + codexThreadId: record.codexThreadId ?? null, + lastLocalHeartbeatAt: record.lastLocalHeartbeatAt ?? null, + lastObservedAgentEventAt: record.lastObservedAgentEventAt ?? null, + lastPersistedTraceAt: record.lastPersistedTraceAt ?? null, + outputMaxSeq: record.outputMaxSeq ?? null, + }; +} + +function compactExecutionDiagnostics(value: unknown): Record | null { + const record = asRecord(value); + if (record === null) return null; + return { + state: record.state ?? record.health ?? null, + degraded: record.degraded ?? null, + splitBrain: record.splitBrain ?? 
null, + executionStateSource: record.executionStateSource ?? null, + controlPlane: record.controlPlane ?? null, + databaseActiveTaskCount: record.databaseActiveTaskCount ?? null, + databaseActiveTaskIds: record.databaseActiveTaskIds ?? [], + schedulerActiveRunSlotCount: record.schedulerActiveRunSlotCount ?? null, + schedulerActiveTaskIds: record.schedulerActiveTaskIds ?? [], + activeHeartbeatTaskIds: record.activeHeartbeatTaskIds ?? [], + heartbeatFreshTaskIds: record.heartbeatFreshTaskIds ?? [], + heartbeatExpiredTaskIds: record.heartbeatExpiredTaskIds ?? [], + heartbeatMissingTaskIds: record.heartbeatMissingTaskIds ?? [], + staleRecoveryCandidateTaskIds: record.staleRecoveryCandidateTaskIds ?? [], + traceGapTaskIds: record.traceGapTaskIds ?? [], + traceGapNotStaleTaskIds: record.traceGapNotStaleTaskIds ?? [], + lastSchedulerHeartbeatAt: record.lastSchedulerHeartbeatAt ?? null, + lastObservedAgentEventAt: record.lastObservedAgentEventAt ?? null, + lastPersistedTraceAt: record.lastPersistedTraceAt ?? null, + oaPublisher: record.oaPublisher ?? null, + reasons: record.reasons ?? [], + }; +} + function compactToolSummary(value: unknown, full: boolean): Record { const record = asRecord(value) ?? {}; const items = asArray(record.items).map((item) => { @@ -252,6 +298,7 @@ function compactSummary(summary: unknown, options: CodexTaskOptions, taskId: str codexThreadId: record.codexThreadId ?? null, activeTurnId: record.activeTurnId ?? null, cancelRequested: record.cancelRequested ?? null, + schedulerHeartbeat: compactSchedulerHeartbeat(record.schedulerHeartbeat), }, timing: record.timing ?? null, createdAt: record.createdAt ?? 
null, @@ -718,7 +765,12 @@ function requireMergeTargetQueueId(args: string[], command: string): string { } function codeQueues(): unknown { - return unwrapCodexResponse(coreInternalFetch(codeQueueProxyPath("/api/queues"))); + const response = unwrapCodexResponse(coreInternalFetch(codeQueueProxyPath("/api/queues"))); + return { + upstream: response.upstream, + queues: response.body.queues ?? [], + queue: compactQueueMutationSummary(response.body.queue), + }; } function codexCreateQueue(queueId: string): unknown { @@ -824,6 +876,7 @@ function compactTaskMutationResponse(task: unknown, options: CompactTaskMutation maxAttempts: record.maxAttempts ?? null, currentAttempt: record.currentAttempt ?? null, cancelRequested: record.cancelRequested ?? null, + schedulerHeartbeat: compactSchedulerHeartbeat(record.schedulerHeartbeat), createdAt: record.createdAt ?? null, startedAt: record.startedAt ?? null, updatedAt: record.updatedAt ?? null, @@ -845,6 +898,10 @@ function compactQueueMutationSummary(value: unknown): Record | return { activeQueueIds: record.activeQueueIds ?? null, activeTaskIds: record.activeTaskIds ?? null, + databaseActiveTaskCount: record.databaseActiveTaskCount ?? null, + databaseActiveTaskIds: record.databaseActiveTaskIds ?? null, + schedulerHeartbeatStaleMs: record.schedulerHeartbeatStaleMs ?? null, + executionDiagnostics: compactExecutionDiagnostics(record.executionDiagnostics), queuedTaskIds: record.queuedTaskIds ?? null, counts: record.counts ?? null, byQueue: Array.isArray(record.byQueue) ? 
record.byQueue : undefined, diff --git a/scripts/src/e2e.ts b/scripts/src/e2e.ts index 4255b3eb..3323d71c 100644 --- a/scripts/src/e2e.ts +++ b/scripts/src/e2e.ts @@ -3,6 +3,7 @@ import { connect } from "node:net"; import { join } from "node:path"; import { chromium, type Page } from "playwright"; import { createRouteRegistry, MODULES } from "../../src/components/frontend/src/navigation"; +import { CODE_QUEUE_LIVENESS_CHECK_NAMES, runCodeQueueLivenessFixtureChecks } from "./code-queue-liveness-fixtures"; import { runCommand } from "./command"; import { type UniDeskConfig, repoRoot, rootPath } from "./config"; import { boundedJsonDetail } from "./preview"; @@ -166,11 +167,14 @@ const FRONTEND_CHECK_NAMES = [ "frontend:no-console-errors", ] as const; +const CODE_QUEUE_FIXTURE_CHECK_NAMES = [...CODE_QUEUE_LIVENESS_CHECK_NAMES] as const; + const ALL_E2E_CHECK_NAMES = [ ...NETWORK_CHECK_NAMES, ...SERVICE_CHECK_NAMES, ...DATABASE_CHECK_NAMES, ...FRONTEND_CHECK_NAMES, + ...CODE_QUEUE_FIXTURE_CHECK_NAMES, ] as const; function uniqueText(values: string[]): string[] { @@ -552,6 +556,14 @@ function addSelectedCheck(checks: E2ECheck[], options: E2ERunOptions, name: stri addCheck(checks, name, passed, detail); } +function codeQueueFixtureChecks(checks: E2ECheck[], options: E2ERunOptions): void { + const selected = CODE_QUEUE_FIXTURE_CHECK_NAMES.filter((name) => wantsCheck(options, name)); + const result = runCodeQueueLivenessFixtureChecks(selected); + for (const check of result.checks) { + addSelectedCheck(checks, options, check.name, check.ok, check.detail); + } +} + function safeTestId(value: string): string { return value.replace(/[^a-zA-Z0-9_-]/g, "_"); } @@ -3420,8 +3432,14 @@ export async function runE2E( const needDatabase = wantsPrefix(options, "database") || wantsCheck(options, "frontend:task-history-diagnostics"); const needFrontend = wantsPrefix(options, "frontend"); + const needCodeQueueFixtures = wantsAnyCheck(options, [...CODE_QUEUE_FIXTURE_CHECK_NAMES]); const 
executedSections: string[] = []; + if (needCodeQueueFixtures) { + executedSections.push("code-queue-fixtures"); + codeQueueFixtureChecks(checks, options); + } + if (needNetwork) { executedSections.push("network"); await exposureChecks(config, urls, checks, options); diff --git a/src/components/frontend/public/style.css b/src/components/frontend/public/style.css index a1636554..9b2275e6 100644 --- a/src/components/frontend/public/style.css +++ b/src/components/frontend/public/style.css @@ -2169,6 +2169,81 @@ input:focus, select:focus, textarea:focus { border-color: var(--accent-2); } .codex-trace-status-chip.service { white-space: normal; } +.codex-trace-status-chip.liveness.ok { + border-color: rgba(78, 183, 168, 0.50); + color: var(--accent-2); +} +.codex-trace-status-chip.liveness.warn { + border-color: rgba(215, 161, 58, 0.55); + color: #ffe0a2; +} +.codex-trace-status-chip.liveness.failed { + border-color: rgba(255, 98, 98, 0.58); + color: #ffb2b2; +} +.codex-liveness-panel .panel-body { + display: grid; + gap: 8px; +} +.codex-liveness-grid { + display: grid; + grid-template-columns: repeat(5, minmax(130px, 1fr)); + gap: 8px; + min-width: 0; +} +.codex-liveness-metric { + display: grid; + gap: 4px; + min-width: 0; + padding: 8px; + border: 1px solid rgba(78, 183, 168, 0.22); + background: + linear-gradient(135deg, rgba(78, 183, 168, 0.08), rgba(255,255,255,0.015)), + rgba(0,0,0,0.16); +} +.codex-liveness-metric.warn { + border-color: rgba(215, 161, 58, 0.44); +} +.codex-liveness-metric.failed { + border-color: rgba(255, 98, 98, 0.46); +} +.codex-liveness-metric.ok { + border-color: rgba(78, 183, 168, 0.45); +} +.codex-liveness-metric span { + color: var(--muted); + font-size: 10px; + letter-spacing: 0.12em; + text-transform: uppercase; +} +.codex-liveness-metric strong { + min-width: 0; + color: var(--text); + font-size: 15px; + font-weight: 700; + overflow-wrap: anywhere; +} +.codex-liveness-metric code { + min-width: 0; + color: var(--muted); + font-size: 
11px; + overflow-wrap: anywhere; +} +.codex-liveness-reasons { + display: flex; + flex-wrap: wrap; + gap: 5px; + min-width: 0; +} +.codex-liveness-reasons span { + max-width: 100%; + padding: 4px 7px; + border: 1px solid rgba(215, 161, 58, 0.28); + color: #ffe0a2; + background: rgba(215, 161, 58, 0.07); + font-size: 11px; + overflow-wrap: anywhere; +} .codex-mark-all-read-btn { border-color: rgba(78, 183, 168, 0.40); color: #bdece4; @@ -5490,7 +5565,7 @@ input:focus, select:focus, textarea:focus { border-color: var(--accent-2); } } @media (max-width: 1120px) { - .metric-grid, .policy-grid, .security-board, .docker-metrics, .monitor-chart-grid, .monitor-summary-grid, .performance-metric-stack, .codex-load-test-grid, .baidu-doc-grid, .filebrowser-target-grid, .oa-flow-metrics { grid-template-columns: repeat(2, minmax(0, 1fr)); } + .metric-grid, .policy-grid, .security-board, .docker-metrics, .monitor-chart-grid, .monitor-summary-grid, .performance-metric-stack, .codex-load-test-grid, .codex-liveness-grid, .baidu-doc-grid, .filebrowser-target-grid, .oa-flow-metrics { grid-template-columns: repeat(2, minmax(0, 1fr)); } .pipeline-oa-guarantees { grid-template-columns: repeat(2, minmax(0, 1fr)); } .dispatch-form, .schedule-form { grid-template-columns: 1fr 1fr; } .dispatch-actions { align-items: center; } @@ -5680,7 +5755,7 @@ input:focus, select:focus, textarea:focus { border-color: var(--accent-2); } padding: 4px 9px; white-space: nowrap; } - .metric-grid, .policy-grid, .security-board, .dispatch-form, .schedule-form, .schedule-card-grid, .docker-metrics, .monitor-chart-grid, .monitor-summary-grid, .gateway-record-grid, .met-detail-kv, .code-queue-metrics, .codex-stats-summary-grid, .codex-form-grid, .baidu-doc-grid { grid-template-columns: 1fr; } + .metric-grid, .policy-grid, .security-board, .dispatch-form, .schedule-form, .schedule-card-grid, .docker-metrics, .monitor-chart-grid, .monitor-summary-grid, .gateway-record-grid, .met-detail-kv, .code-queue-metrics, 
.codex-stats-summary-grid, .codex-liveness-grid, .codex-form-grid, .baidu-doc-grid { grid-template-columns: 1fr; } .compact-row, .heartbeat-row, .log-row, .endpoint-list article, .volume-route, .findjob-hero, .pipeline-hero, .code-queue-hero, .claudeqq-login-card, .baidu-login-card, .baidu-pathbar { grid-template-columns: 1fr; align-items: start; } .codex-output-line { grid-template-columns: 1fr; } .codex-transcript { min-height: 360px; } diff --git a/src/components/frontend/src/code-queue.tsx b/src/components/frontend/src/code-queue.tsx index 8b734830..4e0f1162 100644 --- a/src/components/frontend/src/code-queue.tsx +++ b/src/components/frontend/src/code-queue.tsx @@ -219,6 +219,34 @@ function activeTaskIds(queue: any): string[] { return Array.isArray(queue?.activeTaskIds) ? queue.activeTaskIds.map((id: any) => String(id || "")).filter(Boolean) : [String(queue?.activeTaskId || "")].filter(Boolean); } +function stringArray(value: any): string[] { + return Array.isArray(value) ? value.map((item: any) => String(item || "")).filter(Boolean) : []; +} + +function compactIdList(value: any, limit = 3): string { + const ids = stringArray(value); + if (ids.length === 0) return "--"; + const visible = ids.slice(0, limit).join(" / "); + return ids.length > limit ? 
`${visible} +${ids.length - limit}` : visible; +} + +function executionDiagnosticsFromQueue(queue: any, health: any): AnyRecord { + return objectRecord(queue?.executionDiagnostics) + || objectRecord(health?.body?.queue?.executionDiagnostics) + || objectRecord(health?.queue?.executionDiagnostics) + || objectRecord(health?.body?.executionDiagnostics) + || objectRecord(health?.executionDiagnostics) + || {}; +} + +function diagnosticsTone(state: any): string { + const value = String(state || "unknown").toLowerCase(); + if (value === "healthy") return "ok"; + if (value === "split-brain" || value === "stale-active") return "failed"; + if (value === "degraded") return "warn"; + return "unknown"; +} + const allQueuesId = "__all__"; const queueMobileMediaQuery = "(max-width: 760px)"; const queueDesktopMediaQuery = "(min-width: 761px)"; @@ -1536,6 +1564,48 @@ function CodexStatsIcon() { ); } +function LivenessMetric({ label, value, hint, tone }: AnyRecord) { + return h("div", { className: `codex-liveness-metric ${tone || ""}` }, + h("span", null, label), + h("strong", null, value ?? "--"), + hint ? h("code", null, hint) : null, + ); +} + +function CodeQueueLivenessPanel({ diagnostics, queue, onRaw }: AnyRecord) { + const state = String(diagnostics?.state || diagnostics?.health || "unknown"); + const oaPublisher = objectRecord(diagnostics?.oaPublisher); + const reasons = stringArray(diagnostics?.reasons).slice(0, 3); + const tone = diagnosticsTone(state); + return h(Panel, { + title: "执行活性", + eyebrow: `${String(diagnostics?.executionStateSource || queue?.executionStateSource || "unknown")} / ${String(diagnostics?.controlPlane || "code-queue")}`, + summary: h("div", { className: "codex-trace-status" }, + h("span", { className: `codex-trace-status-chip liveness ${tone}` }, h("b", null, "状态"), state), + h("span", { className: "codex-trace-status-chip" }, h("b", null, "DB active"), String(diagnostics?.databaseActiveTaskCount ?? queue?.databaseActiveTaskCount ?? 
0)), + h("span", { className: "codex-trace-status-chip" }, h("b", null, "scheduler slots"), String(diagnostics?.schedulerActiveRunSlotCount ?? queue?.activeRunSlotCount ?? 0)), + h("span", { className: "codex-trace-status-chip" }, h("b", null, "heartbeat"), `${stringArray(diagnostics?.heartbeatFreshTaskIds).length} fresh / ${stringArray(diagnostics?.heartbeatExpiredTaskIds).length} expired`), + oaPublisher ? h("span", { className: "codex-trace-status-chip" }, h("b", null, "OA"), `${Number(oaPublisher.pending || 0)} pending${oaPublisher.lastError ? " / error" : ""}`) : null, + ), + actions: h(RawButton, { title: "Code Queue Execution Diagnostics", data: diagnostics, onOpen: onRaw, testId: "raw-code-queue-execution-diagnostics" }), + className: "codex-liveness-panel", + }, + h("div", { className: "codex-liveness-grid", "data-testid": "codex-liveness-diagnostics" }, + h(LivenessMetric, { tone, label: "健康状态", value: state, hint: diagnostics?.splitBrain ? "split-brain" : diagnostics?.degraded ? "degraded" : "ready" }), + h(LivenessMetric, { label: "PostgreSQL active", value: String(diagnostics?.databaseActiveTaskCount ?? queue?.databaseActiveTaskCount ?? 0), hint: compactIdList(diagnostics?.databaseActiveTaskIds ?? queue?.databaseActiveTaskIds) }), + h(LivenessMetric, { label: "Scheduler active", value: String(diagnostics?.schedulerActiveRunSlotCount ?? queue?.activeRunSlotCount ?? 0), hint: compactIdList(diagnostics?.schedulerActiveTaskIds ?? queue?.activeTaskIds) }), + h(LivenessMetric, { label: "Fresh heartbeat", value: String(stringArray(diagnostics?.heartbeatFreshTaskIds).length), hint: compactIdList(diagnostics?.heartbeatFreshTaskIds) }), + h(LivenessMetric, { tone: stringArray(diagnostics?.traceGapNotStaleTaskIds).length > 0 ? 
"warn" : "", label: "Trace gap", value: String(stringArray(diagnostics?.traceGapTaskIds).length), hint: compactIdList(diagnostics?.traceGapNotStaleTaskIds) }), + h(LivenessMetric, { tone: stringArray(diagnostics?.staleRecoveryCandidateTaskIds).length > 0 ? "failed" : "", label: "Stale candidates", value: String(stringArray(diagnostics?.staleRecoveryCandidateTaskIds).length), hint: compactIdList(diagnostics?.staleRecoveryCandidateTaskIds) }), + h(LivenessMetric, { label: "Last scheduler heartbeat", value: fmtRelativeAge(diagnostics?.lastSchedulerHeartbeatAt), hint: String(diagnostics?.lastSchedulerHeartbeatAt || "--") }), + h(LivenessMetric, { label: "Last agent event", value: fmtRelativeAge(diagnostics?.lastObservedAgentEventAt), hint: String(diagnostics?.lastObservedAgentEventAt || "--") }), + h(LivenessMetric, { label: "Last trace persist", value: fmtRelativeAge(diagnostics?.lastPersistedTraceAt), hint: String(diagnostics?.lastPersistedTraceAt || "--") }), + h(LivenessMetric, { tone: oaPublisher?.lastError ? "warn" : "", label: "OA publisher", value: `${Number(oaPublisher?.pending || 0)} pending`, hint: oaPublisher?.lastError ? shortText(oaPublisher.lastError, 90) : "ok" }), + ), + reasons.length > 0 ? 
h("div", { className: "codex-liveness-reasons" }, reasons.map((reason: string) => h("span", { key: reason }, reason))) : null, + ); +} + function CodexStatsPanel({ stats, queueName: activeQueueName, onRaw }: AnyRecord) { const rows = taskStatisticsRows(stats); const totals = taskStatisticsTotals(stats); @@ -2112,6 +2182,7 @@ export function CodeQueuePage({ microservices, onRaw, apiBaseUrl = "/api", initi const tasks = applyLocalReadStateToRows(taskRows(tasksData)); const loadedUnreadTerminalTasks = tasks.filter(taskIsUnreadTerminal); const queue = tasksData?.queue || health?.body?.queue || health?.queue || {}; + const executionDiagnostics = executionDiagnosticsFromQueue(queue, health); const statistics = taskStatistics(tasksData, queue); const pagination = taskPagination(tasksData); const queueRows = knownQueueRows(queue, queueId); @@ -3828,6 +3899,7 @@ export function CodeQueuePage({ microservices, onRaw, apiBaseUrl = "/api", initi h(UniDeskErrorBanner, { error, wide: true }), mergeDialog, h("div", { className: "codex-session-stage codex-session-stage-top" }, + h(CodeQueueLivenessPanel, { diagnostics: executionDiagnostics, queue, onRaw }), sessionPanel, ), h("div", { className: "code-queue-layout" }, diff --git a/src/components/microservices/code-queue-mgr/src-rs/main.rs b/src/components/microservices/code-queue-mgr/src-rs/main.rs index aa40f4f1..b5f407b5 100644 --- a/src/components/microservices/code-queue-mgr/src-rs/main.rs +++ b/src/components/microservices/code-queue-mgr/src-rs/main.rs @@ -782,6 +782,13 @@ fn execution_diagnostics_from_tasks(tasks: &[TaskMeta], now: &str) -> Value { if !trace_gap_not_stale_task_ids.is_empty() { reasons.push("trace progress is stale while scheduler heartbeat is fresh; this is a trace gap, not stale active"); } + let database_active_task_count = database_active_task_ids.len(); + let scheduler_orphaned_active_task_ids = database_active_task_ids.clone(); + let scheduler_orphaned_active_task_count = 
scheduler_orphaned_active_task_ids.len(); + let active_heartbeat_count = active_heartbeat_task_ids.len(); + let last_scheduler_heartbeat_at = max_timestamp(active_heartbeats.iter().map(|(_, heartbeat)| heartbeat_string_field(heartbeat, "lastLocalHeartbeatAt")).collect()); + let last_observed_agent_event_at = max_timestamp(active_heartbeats.iter().map(|(_, heartbeat)| heartbeat_string_field(heartbeat, "lastObservedAgentEventAt")).collect()); + let last_persisted_trace_at = max_timestamp(active_heartbeats.iter().map(|(_, heartbeat)| heartbeat_string_field(heartbeat, "lastPersistedTraceAt")).collect()); json!({ "state": state, @@ -791,16 +798,16 @@ fn execution_diagnostics_from_tasks(tasks: &[TaskMeta], now: &str) -> Value { "executionStateSource": "postgres-control-plane", "controlPlane": "master-code-queue-mgr", "databaseActiveTaskIds": database_active_task_ids, - "databaseActiveTaskCount": database_active_task_ids.len(), + "databaseActiveTaskCount": database_active_task_count, "schedulerActiveTaskIds": [], "schedulerActiveTaskCount": 0, "schedulerActiveRunSlotCount": 0, "schedulerActiveQueueIds": [], "schedulerProcessingQueueIds": [], - "schedulerOrphanedActiveTaskIds": database_active_task_ids, - "schedulerOrphanedActiveTaskCount": database_active_task_ids.len(), + "schedulerOrphanedActiveTaskIds": scheduler_orphaned_active_task_ids, + "schedulerOrphanedActiveTaskCount": scheduler_orphaned_active_task_count, "activeHeartbeatTaskIds": active_heartbeat_task_ids, - "activeHeartbeatCount": active_heartbeat_task_ids.len(), + "activeHeartbeatCount": active_heartbeat_count, "heartbeatFreshTaskIds": heartbeat_fresh_task_ids, "heartbeatExpiredTaskIds": heartbeat_expired_task_ids, "heartbeatMissingTaskIds": heartbeat_missing_task_ids, @@ -809,9 +816,9 @@ fn execution_diagnostics_from_tasks(tasks: &[TaskMeta], now: &str) -> Value { "traceGapNotStaleTaskIds": trace_gap_not_stale_task_ids, "schedulerHeartbeatStaleMs": SCHEDULER_HEARTBEAT_STALE_MS, "now": now, - 
"lastSchedulerHeartbeatAt": max_timestamp(active_heartbeats.iter().map(|(_, heartbeat)| heartbeat_string_field(heartbeat, "lastLocalHeartbeatAt")).collect()), - "lastObservedAgentEventAt": max_timestamp(active_heartbeats.iter().map(|(_, heartbeat)| heartbeat_string_field(heartbeat, "lastObservedAgentEventAt")).collect()), - "lastPersistedTraceAt": max_timestamp(active_heartbeats.iter().map(|(_, heartbeat)| heartbeat_string_field(heartbeat, "lastPersistedTraceAt")).collect()), + "lastSchedulerHeartbeatAt": last_scheduler_heartbeat_at, + "lastObservedAgentEventAt": last_observed_agent_event_at, + "lastPersistedTraceAt": last_persisted_trace_at, "oaPublisher": Value::Null, "reasons": reasons, "guidance": [ @@ -1179,6 +1186,7 @@ fn queue_summary(state: &AppState) -> Result { let active_tasks: Vec = active_rows.iter().map(|row| row_to_task(row, true)).collect(); let mut database_active_task_ids: Vec = active_tasks.iter().map(|task| task.id.clone()).collect(); database_active_task_ids.sort(); + let database_active_task_count = database_active_task_ids.len(); let execution_diagnostics = execution_diagnostics_from_tasks(&active_tasks, &now_iso()); if !queues.iter().any(|queue| queue.get("id").and_then(Value::as_str) == Some(DEFAULT_QUEUE_ID)) { let now = now_iso(); @@ -1215,7 +1223,7 @@ fn queue_summary(state: &AppState) -> Result { "activeTaskIds": [], "activeTaskId": Value::Null, "databaseActiveTaskIds": database_active_task_ids, - "databaseActiveTaskCount": database_active_task_ids.len(), + "databaseActiveTaskCount": database_active_task_count, "executionStateSource": "postgres-control-plane", "executionDiagnostics": execution_diagnostics, "schedulerHeartbeatStaleMs": SCHEDULER_HEARTBEAT_STALE_MS, diff --git a/src/components/microservices/code-queue/src/index.ts b/src/components/microservices/code-queue/src/index.ts index 414b6cf0..62d16975 100644 --- a/src/components/microservices/code-queue/src/index.ts +++ b/src/components/microservices/code-queue/src/index.ts @@ 
-3501,9 +3501,7 @@ function taskIsRecoverableOrphanedActive(task: QueueTask, staleMs = orphanedActi if (task.status !== "running" && task.status !== "judging") return false; if (activeTaskHasLocalRunSlotOrWaiter(task)) return false; const decision = staleRecoveryCandidate({ task, localActive: false, heartbeatStaleMs: staleMs, now: nowIso() }); - if (decision.allowed) return true; - if (decision.reason === "owner-heartbeat-missing") return orphanedActiveTaskAgeMs(task) >= staleMs; - return false; + return decision.allowed; } function schedulerReconcileStatus(tasks: QueueTask[] = state.tasks): JsonValue { diff --git a/src/components/microservices/code-queue/src/self-tests.ts b/src/components/microservices/code-queue/src/self-tests.ts index c24b9339..f658a897 100644 --- a/src/components/microservices/code-queue/src/self-tests.ts +++ b/src/components/microservices/code-queue/src/self-tests.ts @@ -256,6 +256,23 @@ function runQueueOrderingSelfTest(): JsonValue { const orphanRunning = queueOrderTestTask("codex_4400_orphan_running", "running", "2026-05-11T13:00:00.000Z", "2026-05-11T13:00:00.000Z"); orphanRunning.queueId = "queue_orphan_recovery"; orphanRunning.activeTurnId = null; + orphanRunning.schedulerHeartbeat = { + taskId: orphanRunning.id, + queueId: orphanRunning.queueId, + attempt: 1, + activeTurnId: null, + codexThreadId: null, + owner: "self-test", + schedulerInstance: "self-test", + executionPlane: "scheduler-execution-plane", + agentPort: "codex", + status: "running", + lastLocalHeartbeatAt: "2026-05-11T13:00:00.000Z", + lastObservedAgentEventAt: null, + lastPersistedTraceAt: null, + outputMaxSeq: 0, + source: "scheduler", + }; const queuedBehindOrphan = queueOrderTestTask("codex_4401_queued", "queued", "2026-05-11T13:01:00.000Z", "2026-05-11T13:01:00.000Z"); queuedBehindOrphan.queueId = "queue_orphan_recovery"; const originalMaxActiveQueues = ctx().config.maxActiveQueues;