fix: stabilize code queue proxy and egress

This commit is contained in:
Codex
2026-05-15 04:52:16 +00:00
parent a7e9ecc32f
commit 89fe767623
11 changed files with 426 additions and 60 deletions
+1 -1
View File
@@ -516,7 +516,7 @@
"containerName": "code-queue-backend"
},
"backend": {
"nodeBaseUrl": "http://host.docker.internal:4222",
"nodeBaseUrl": "http://code-queue:4222",
"nodeBindHost": "127.0.0.1",
"nodePort": 4222,
"proxyMode": "provider-gateway-http",
+9 -7
View File
@@ -9,7 +9,7 @@ UniDesk 用户服务是挂载到 UniDesk 核心服务上的、面向用户使用
- 用户服务后端端口默认只绑定计算节点本机地址,例如 `127.0.0.1:<port>`,不得直接暴露公网。
- 浏览器只访问 UniDesk frontendfrontend 通过同源 `/api/microservices/*` 代理到 backend-corebackend-core 再通过目标 provider-gateway 的 `microservice.http` 能力访问计算节点本机后端。
- backend-core REST API、database 和计算节点用户服务后端都不得新增公网端口;公网入口仍只有 frontend 和 provider ingress。
- `microservice.http` 只允许 provider-gateway 访问 `http://127.0.0.1``http://localhost``http://host.docker.internal` 这类节点本地地址;主 server 内置用户服务可使用同一 Compose 网络内的显式服务名,例如 `todo-note:4211`D601 Code Queue 等计算节点服务必须使用节点本机映射。backend-core 还必须用 `allowedPathPrefixes``allowedMethods` 同时限制可代理路径和 HTTP 方法。
- `microservice.http` 只允许 provider-gateway 访问 `http://127.0.0.1``http://localhost``http://host.docker.internal` 这类节点本地地址,或明确登记为同一私有 Docker network 内的服务名;主 server 内置用户服务可使用同一 Compose 网络内的显式服务名,例如 `todo-note:4211`D601 Code Queue 必须使用 provider-gateway network 内的 `code-queue:4222`。backend-core 还必须用 `allowedPathPrefixes``allowedMethods` 同时限制可代理路径和 HTTP 方法。
## Config Contract
@@ -126,17 +126,18 @@ Baidu Netdisk 在 UniDesk 语境中按纯后端服务管理:不得暴露百度
当前 Code Queue 作为 `id=code-queue` 的用户服务登记在 `config.json`,长期部署在 D601,并接入统一 `oa-event-flow` 发布 Trace/STEP 事实事件与读取统计中心:
- Provider`D601`,由 D601 provider-gateway 通过 `microservice.http` 访问 D601 本机映射 `http://host.docker.internal:4222`;容器对 D601 host 绑定 `127.0.0.1:4222`,不得直接暴露公网
- Provider`D601`,由 D601 provider-gateway 通过 `microservice.http` 访问同一 provider-gateway Docker network 内的 `http://code-queue:4222``code-queue-backend` 仍可对 D601 host 绑定 `127.0.0.1:4222` 作为节点本地维护入口,但 UniDesk 正式代理链路不得依赖 `host.docker.internal:4222`,避免 Linux/Docker Desktop loopback 绑定差异导致 frontend 502
- 代码引用:`https://github.com/pikasTech/unidesk` 与配置中的 `repository.commitId`;服务源码位于 `src/components/microservices/code-queue`,属于 UniDesk 自有控制面组件。
- 部署引用:UniDesk 仓库中的 `src/components/microservices/code-queue/docker-compose.d601.yml`Dockerfile 为 `src/components/microservices/code-queue/Dockerfile`,容器名为 `code-queue-backend`;主 server 根目录 `docker-compose.yml` 不再包含 `code-queue` service。
- 主服务依赖映射:D601 Code Queue 仍以主 PostgreSQL 为权威数据库,`DATABASE_URL` 必须指向主 server 受限端口映射;`OA_EVENT_FLOW_BASE_URL` 必须指向主 server OA Event Flow 受限端口映射;`CODE_QUEUE_NOTIFY_CLAUDEQQ_BASE_URL` 在 D601 上直接使用本机 ClaudeQQ 映射 `http://host.docker.internal:3290`。这些端口映射只服务 D601 运行时,必须用防火墙或等价策略限制来源,不得成为浏览器或任意公网客户端入口。
- 默认出网代理:D601 `code-queue-backend` 必须默认把 `HTTP_PROXY``HTTPS_PROXY``ALL_PROXY` 注入给 Codex/OpenCode、`git``curl``npm` 等任务子进程;代理上游必须是 D601 provider-gateway 暴露在 provider-gateway Docker 网络内的 egress HTTP CONNECT 端口,而不是 Code Queue 自建伪 provider WebSocket 或交互 shell 临时 `export`。Code Queue Compose 必须加入 provider-gateway 网络,并通过 `CODE_QUEUE_EGRESS_PROXY_URL` 指向 `http://unidesk-provider-gateway-D601:18789`provider-gateway 再复用已注册的 provider WebSocket 通道,把 TCP open/data/close 消息转发给主 server backend-core 出网,不依赖 D601 本地直连公网。`NO_PROXY` 必须覆盖 `localhost``127.0.0.1``host.docker.internal`、provider-gateway 容器名、主 server 地址和 UniDesk 内部服务名,避免 PostgreSQL、OA Event Flow、ClaudeQQ、microservice health 等内网链路绕远或递归进入代理;`/health` 必须暴露 egress proxy 的 `enabled``connected``proxyUrl``channel=provider-gateway` 和上游 provider-gateway health,作为 Codex 网络卡死排障的第一证据。
- 默认出网代理:D601 `code-queue-backend` 必须默认把 `HTTP_PROXY``HTTPS_PROXY``ALL_PROXY` 注入给 Codex/OpenCode、`git``curl``npm` 等任务子进程;代理上游必须是 D601 provider-gateway 暴露在 provider-gateway Docker 网络内的 egress HTTP CONNECT 端口,而不是 Code Queue 自建伪 provider WebSocket 或交互 shell 临时 `export`。Code Queue Compose 必须加入 provider-gateway 网络,并通过 `CODE_QUEUE_EGRESS_PROXY_URL` 指向 `http://unidesk-provider-gateway-D601:18789`provider-gateway 再复用已注册的 provider WebSocket 通道,把 TCP open/data/close 消息转发给主 server backend-core 出网,不依赖 D601 本地直连公网。`NO_PROXY` 必须覆盖 `localhost``127.0.0.1``host.docker.internal`、provider-gateway 容器名、主 server 地址和 UniDesk 内部服务名,避免 PostgreSQL、OA Event Flow、ClaudeQQ、microservice health 等内网链路绕远或递归进入代理;`/health` 必须暴露 egress proxy 的 `enabled``connected``proxyUrl``channel=provider-gateway` 和上游 provider-gateway health,作为 Codex 网络卡死排障的第一证据。远程开发/执行容器不得只依赖这些环境变量,必须在容器网络层用 TUN 默认路由和 OUTPUT 防火墙强制外网流量只能经 master TUN 出口。
- 出网代理无 fallback 纪律:Code Queue 的运行时配置只允许一个默认出网路径,即 provider-gateway egress proxy;不得在代码中同时保留 Code Queue 自建 WebSocket proxy、临时 shell proxy、D601 本地直连公网、主 server direct HTTP proxy 等隐式分支。Compose 层必须显式设置大小写 `HTTP_PROXY``HTTPS_PROXY``ALL_PROXY``NO_PROXY`,服务启动后再把同一组变量写入 `process.env`,确保 service 自检、Codex/OpenCode app-server、任务 shell、`git``curl``npm` 使用一致路径。任何新增网络 fallback 都必须先进入本参考文档并配套 `/health` 可见状态,否则视为残留旧路径。
- 上线纪律:Code Queue 相关的前端或后端改进必须在同一任务内正式上线并验证公网 frontend 或 live API,不能只停留在源码、构建产物或“后续再上线”。重建 `frontend` 只替换无状态 WebUI 容器,不会触碰 D601 `code-queue-backend`、PostgreSQL 队列或运行中 Codex thread,不能以“可能影响长期任务”为由延迟前端上线;`code-queue-backend` 本身带有 restart-recovery,允许按 D601 Compose 重启/替换,停止、重启或重建后必须从持久化状态恢复运行中和排队任务。修改 Code Queue 自身时不得等待当前 Code Queue task 结束、等待 queue idle 或等待 `0 running` 后才重启;这会等待自己退出形成自锁。应直接执行 D601 `docker compose -f src/components/microservices/code-queue/docker-compose.d601.yml up -d --build --force-recreate code-queue` 或等价 build-first 路径,并用恢复后的 live API 或公网 frontend 证明任务和队列仍可读可继续。
- 更名与灾备恢复:旧版 Codex 队列服务名只允许作为兼容诊断和一次性迁移来源;`code-queue-backend` 容器自身 `/health` 正常但 `microservice health code-queue` 返回 `microservice not found`、或服务目录仍只出现旧服务 ID 时,优先判定为 backend-core 仍加载旧 `MICROSERVICES_JSON`,必须刷新 `.state/docker-compose.env` 并显式重建/重建替换 `backend-core`,随后用 `microservice list` 验证 `id=code-queue``nodeBaseUrl=http://host.docker.internal:4222` 和容器摘要。若更名后 `unidesk_code_queue_*` 为空而历史 `unidesk_codex_queue_*` 表仍有队列数据,恢复前必须先停止 `code-queue-backend`,备份 `.state/code-queue` 与当前 `unidesk_code_queue_*` 表,再把历史本地状态目录合并到 `.state/code-queue/`,并用 `docker exec -i unidesk-database psql ...` 这类保持 stdin 的方式把 `unidesk_codex_queue_tasks``unidesk_codex_queue_queues``unidesk_codex_queue_notifications` 迁移到对应 `unidesk_code_queue_*` 表;不得在确认 `/api/tasks``/api/queues` 和 output archive 可读前删除历史本地状态目录或旧 PostgreSQL 表。迁移完成后只允许在 D601 用 `docker compose -f src/components/microservices/code-queue/docker-compose.d601.yml up -d --build code-queue` 启动目标服务;禁止再通过主 server Compose 启动旧 `code-queue` service。
- 上线纪律:Code Queue 相关的前端或后端改进必须在同一任务内正式上线并验证公网 frontend 或 live API,不能只停留在源码、构建产物或“后续再上线”。重建 `frontend` 只替换无状态 WebUI 容器,不会触碰 D601 `code-queue-backend`、PostgreSQL 队列或运行中 Codex thread,不能以“可能影响长期任务”为由延迟前端上线;`code-queue-backend` 本身带有 restart-recovery,允许按 D601 Compose 重启/替换,停止、重启或重建后必须从持久化状态恢复运行中和排队任务。修改 Code Queue 自身时不得等待当前 Code Queue task 结束、等待 queue idle 或等待 `0 running` 后才重启;这会等待自己退出形成自锁。应通过 D601 上的 `~/cq-deploy` symlink 执行 `cd ~/cq-deploy && CODE_QUEUE_ENV_FILE=.state/code-queue-d601.env docker compose -f src/components/microservices/code-queue/docker-compose.d601.yml up -d --build --force-recreate code-queue` 或等价 build-first 路径,并用恢复后的 live API 或公网 frontend 证明任务和队列仍可读可继续;不要在 provider-gateway Host SSH 命令中使用 `/home/ubuntu/unidesk-code-queue-deploy` 全路径触发 provider-gateway 自保护误判
- 更名与灾备恢复:旧版 Codex 队列服务名只允许作为兼容诊断和一次性迁移来源;`code-queue-backend` 容器自身 `/health` 正常但 `microservice health code-queue` 返回 `microservice not found`、或服务目录仍只出现旧服务 ID 时,优先判定为 backend-core 仍加载旧 `MICROSERVICES_JSON`,必须刷新 `.state/docker-compose.env` 并显式重建/重建替换 `backend-core`,随后用 `microservice list` 验证 `id=code-queue``nodeBaseUrl=http://code-queue:4222` 和容器摘要。若更名后 `unidesk_code_queue_*` 为空而历史 `unidesk_codex_queue_*` 表仍有队列数据,恢复前必须先停止 `code-queue-backend`,备份 `.state/code-queue` 与当前 `unidesk_code_queue_*` 表,再把历史本地状态目录合并到 `.state/code-queue/`,并用 `docker exec -i unidesk-database psql ...` 这类保持 stdin 的方式把 `unidesk_codex_queue_tasks``unidesk_codex_queue_queues``unidesk_codex_queue_notifications` 迁移到对应 `unidesk_code_queue_*` 表;不得在确认 `/api/tasks``/api/queues` 和 output archive 可读前删除历史本地状态目录或旧 PostgreSQL 表。迁移完成后只允许在 D601 用 `docker compose -f src/components/microservices/code-queue/docker-compose.d601.yml up -d --build code-queue` 启动目标服务;禁止再通过主 server Compose 启动旧 `code-queue` service。
- Codex 认证:容器只从 D601 的 `/home/ubuntu/.codex/config.toml` 同步 Codex provider 配置到 D601 `.state/code-queue/codex-home`,并通过 D601 `.state/code-queue-d601.env` 透传 `OPENAI_API_KEY``CRS_OAI_KEY` 等 provider 所需变量;这些 provider 环境变量不得写入仓库,必须由 D601 Compose env-file 注入,确保容器重建和重启后不会丢失认证。新增 provider 的 `env_key` 时必须增加同类运行时透传和 Compose env 持久化,禁止把 Codex 或 MiniMax 密钥写入仓库文件。Code Queue 容器必须只读挂载 D601 host 的 SSH 目录到 `/root/.ssh`(默认 `/home/ubuntu/.ssh`),让容器内 `git push``ssh -T git@github.com` 与 host 使用同一套 GitHub SSH key/known_hosts;不得把私钥复制进镜像或仓库。
- Develop-ready 镜像:Code Queue 镜像必须在启动前预装 UniDesk/Pipeline 调试所需工具,至少包含 `codex``bun``node``npm`/`npx``git``rg``curl``python3`/`pip3``docker``docker compose``docker-compose``jq``ssh``rsync``make``gcc`/`g++``tar``gzip``unzip`;不得依赖 Codex 任务运行时再 `apt-get install` 这些基础环境。
- 远程开发容器与任务执行 ProviderCode Queue 必须能通过 live API 拉起 D601 等计算节点上的开发容器,入口为 `POST /api/dev-containers/<providerId>/start`,默认 Provider 为 `D601`。该流程由 Code Queue 调用 UniDesk `ssh <providerId>` 维护桥在目标节点创建 `unidesk-codex-dev-<providerId>`,并在 Code Queue 所在节点与开发容器之间建立 `ssh -w` TUN 点对点链路;服务所在节点负责对开发容器的 TUN 源地址做 NAT/MASQUERADE,开发容器默认路由和 DNS 改走该 TUN,从而让 `ping google.com`、DNS、HTTP(S) 等出网都经主 server 全局代理,而不是依赖 D601 本地网络。提交 Code Queue 任务时必须支持选择执行 Provider:`D601` 在 D601 `code-queue-backend` 容器中本机执行,默认工作目录为 `/workspace`;其他 Provider 在对应 `unidesk-codex-dev-<providerId>` 容器中执行,默认工作目录为 `/home/ubuntu`,可按任务覆盖 `cwd`。远程任务启动前必须自动复用或拉起该 Provider 的开发容器、同步 Codex 配置和允许的运行时 provider 环境变量,并通过同一 master TUN/NAT 链路出网;目标 host 存在 `/mnt` 时,开发容器必须挂载 host `/mnt:/mnt`,确保 D601 这类 WSL 节点的 Windows 盘符路径如 `/mnt/f/Work/ConStart` 在任务容器内可见,避免 agent 因缺少真实工作区而搜索到无关项目。TUN 建立必须幂等处理 stale 状态:启动前清理旧 `tun<id>`、默认路由旧 tunnel SSH 进程,缺失旧设备不能导致失败,冷启动运行时准备要有有界但足够的 timeout。验收必须保留三类日志:容器直连 `google.com` 在建隧道前失败、容器建隧道后 `ping google.com` 成功、服务所在节点上对应 `UNIDESK-CODEX-DEV-<providerId>` NAT 链或 `tun<id>` 计数在 ping 前后增长;涉及 WSL 工作区任务时还必须在开发容器内验证目标 `/mnt/...` 路径可读。开发容器代理密钥只生成到 `.state/code-queue/dev-proxy/` 与目标节点用户目录,不得提交到仓库。
- Develop-ready 镜像:Code Queue 镜像必须在启动前预装 UniDesk/Pipeline 调试所需工具,至少包含 `codex``bun``node``npm`/`npx``git``rg``curl``python3`/`pip3``docker``docker compose``docker-compose``jq``ssh``rsync``make``gcc`/`g++``iptables``tar``gzip``unzip`;不得依赖 Codex 任务运行时再 `apt-get install` 这些基础环境。
- 远程开发容器与任务执行 ProviderCode Queue 必须能通过 live API 拉起 D601 等计算节点上的开发容器,入口为 `POST /api/dev-containers/<providerId>/start`,默认 Provider 为 `D601`。该流程由 Code Queue 调用 UniDesk `ssh <providerId>` 维护桥在目标节点创建 `unidesk-codex-dev-<providerId>`,并在 Code Queue 所在节点与开发容器之间建立 `ssh -w` TUN 点对点链路;服务所在节点负责对开发容器的 TUN 源地址做 NAT/MASQUERADE,开发容器默认路由和 DNS 改走该 TUN,从而让 `ping google.com`、DNS、HTTP(S) 等出网都经主 server 全局代理,而不是依赖 D601 本地网络。提交 Code Queue 任务时必须支持选择执行 Provider:`D601` 在 D601 `code-queue-backend` 容器中本机执行,默认工作目录为 `/workspace`;其他 Provider 在对应 `unidesk-codex-dev-<providerId>` 容器中执行,默认工作目录为 `/home/ubuntu`,可按任务覆盖 `cwd`。远程任务启动前必须自动复用或拉起该 Provider 的开发容器、同步 Codex 配置和允许的运行时 provider 环境变量,并通过同一 master TUN/NAT 链路出网;目标 host 存在 `/mnt` 时,开发容器必须挂载 host `/mnt:/mnt`,确保 D601 这类 WSL 节点的 Windows 盘符路径如 `/mnt/f/Work/ConStart` 在任务容器内可见,避免 agent 因缺少真实工作区而搜索到无关项目。TUN 建立必须幂等处理 stale 状态:启动前清理旧 `tun<id>`、默认路由旧 tunnel SSH 进程和旧 OUTPUT 跳转,缺失旧设备不能导致失败,冷启动运行时准备要有有界但足够的 timeout。TUN 建立后必须创建 `UD-CQ-EGRESS-<provider>` OUTPUT 链,规则只允许 loopback、既有连接、`tun<id>` 出口以及到 master server 的 SSH tunnel 控制连接,随后 reject 其他 IPv4/IPv6 出站包;这条网络层封口是开发/执行容器的权威外网边界,不能用 `HTTP_PROXY`/`NO_PROXY` 环境变量替代,容器镜像也必须使用已解析出的唯一 `unidesk-code-queue:<provider>` 或显式 `image`,缺失时直接失败,禁止 provider-gateway image、`latest` 或其他隐式镜像 fallback。验收必须保留三类日志:容器建隧道后 `ping google.com` 成功、强制指定原 Docker 网卡直连外网被 `sealed_direct_ping=blocked_expected` 拦截、服务所在节点上对应 `UNIDESK-CODEX-DEV-<providerId>` NAT 链或 `tun<id>` 计数在 ping 前后增长;涉及 WSL 工作区任务时还必须在开发容器内验证目标 `/mnt/...` 路径可读。`GET /api/dev-containers/<providerId>/status` 必须展示默认路由、`route_8_8_8_8``egressFirewallChain` 和 OUTPUT 链跳转。开发容器代理密钥只生成到 `.state/code-queue/dev-proxy/` 与目标节点用户目录,不得提交到仓库。
- 远程维护桥调用:Code Queue 已迁移到 D601 后,`code-queue-backend` 容器内没有主 server 的 `unidesk-backend-core` 容器,不能再把 `bun scripts/cli.ts ssh ...` 实现为本地 `docker exec unidesk-backend-core`。Code Queue 后端发起的 provider 维护命令必须通过主 server frontend `/api/dispatch` 进入 backend-core,再由目标 provider-gateway 执行 `host.ssh`;需要传递脚本时必须使用 base64 临时文件,超过 Host SSH 单命令长度上限时分块上传到目标 `/tmp` 后再执行,避免恢复到本地 Docker broker、交互 stdin 或手工 shell fallback。
- 远程 Provider 准备不得阻塞控制面:Code Queue 在请求处理、队列调度、远程开发容器准备、Host SSH/WSL SSH 透传、Codex/OpenCode 启动和日志导出路径中,禁止使用会长时间占用 Bun event loop 的同步子进程调用,例如针对远程 Provider 的 `spawnSync``execSync``execFileSync`。远程命令必须通过异步子进程执行,带显式 timeout、超时 kill、stdout/stderr 上限和任务 output 进度记录;远程准备失败只能让对应任务进入失败或 retry,不能让 `POST /api/tasks`、SSE `/api/events``/health`、overview 或前端 direct proxy 等控制面请求等待远程 SSH 结束。凡是改动 D601/远程 Provider 准备、`api/dev-containers/*`、任务入队启动或 `runCodeQueueSsh` 等路径,验收必须在一个远程 SSH/status/start 探针运行期间并发验证容器直连 `/health``/api/tasks/overview` 仍能在 1s 内返回,证明远程超时不会复发为全站刷新卡死。
- OpenCode 远程执行:`minimax-m2.7` 走 OpenCode JSON event port 时,本地和远程命令都必须显式执行 `opencode run ...`;远程 Docker exec 不得退化成 `exec run ...`,否则会在目标容器内变成 `bash: exec: run: not found`。OpenCode JSON stream 的终态判定以“当前进程退出码 + 当前 attempt 的最终 assistant response”为准:`exit=0` 且当前 attempt 产生非空最终回复时,即使上游没有发 `step_finish` 事件,也应视为正常 terminal;非零退出、无当前最终回复或传输关闭才进入 retry。每个 attempt 的 `finalResponse` 必须只来自当前 OpenCode/Codex turn,禁止在当前 turn 未产出最终回复时回退复用 task 上一次 `finalResponse`,否则会把旧任务内容误判为本轮完成。
- Codex 控制:服务内部启动 `codex app-server --listen stdio://`,用 JSON-RPC 调用 `thread/start``turn/start``turn/steer``turn/interrupt`,并监听 `turn/completed`、assistant delta、reasoning delta、command output delta、file diff delta 等通知生成前端可轮询的 transcript。
@@ -147,6 +148,7 @@ Baidu Netdisk 在 UniDesk 语境中按纯后端服务管理:不得暴露百度
- 内存优化过程与防回归:Code Queue 已迁移到 D601,但内存治理仍必须按“PostgreSQL 权威源优先、进程热状态最小化、容器硬上限兜底”的顺序设计。长期可复用的优化路径是:先确认任务、queue、readAt、promptHistory、active session 和通知 outbox 均可从 PostgreSQL 恢复;再把历史任务列表、详情、统计、Trace/output 和 `/health` 的只读查询改为 PostgreSQL 直读或聚合查询;随后只把 `queued``running``judging``retry_wait` 等调度必需任务载入 Bun 堆,并在 PostgreSQL 查询侧裁剪 hot `output`/`events`;最后用 dirty-only flush、append-only 输出归档、Codex SQLite 小批量导出、`bun --smol``mem_limit=600m``memswap_limit=1536m``NODE_OPTIONS=--max-old-space-size=768` 和 cgroup memory watchdog 作为运行时防线。PostgreSQL 到进程的单次读取足够快,不能为了减少 SQL 查询把全部历史 `task_json`、Trace、output 或统计摘要常驻内存;任何新增缓存都必须有默认较小的环境变量上限、明确淘汰策略、可从 PostgreSQL 或 append-only 归档重建,且不得影响重启恢复。新增或修改 `/api/tasks`、overview、stats、summary、transcript、output、trace、health、flush、scheduler 和通知路径时,禁止在常规请求中调用会物化全量历史任务 JSON 的代码,禁止启动后无条件重写全量历史 task JSON,禁止用未设上限的 `Map`/数组保存历史 output/event/Trace`CODE_QUEUE_MAX_ACTIVE_QUEUES=0` 表示不按 queue 数量设置全局排队上限;如显式设置为正数,必须同时说明内存预算并补充内存压测验收。memory watchdog 必须以 cgroup working set 为主要判断,且在 swap 仍有余量时不得提前杀掉唯一 active run;否则 TypeScript/Playwright 这类短时高内存验证会被错误中断并让 retry 队列反复震荡。
- 列表/详情延迟优化原则:Code Queue 控制面交互的长期目标是常规历史规模下首屏、`GET /api/tasks/overview``POST /api/tasks/<id>/read` 和分页加载均在 1s 内完成;性能面板出现十几秒级 `code_queue_direct_proxy``core_proxy` 慢操作时,必须优先按后端查询形态和前后端通信策略定位,不能把问题归因于 React 渲染后只改 UI。后端优化顺序是:先为 queue、status、updated/created 时间、readAt/terminal unread 和常用筛选条件补齐 PostgreSQL 索引;再用 SQL `COUNT``GROUP BY`、条件聚合和分页 ID 查询生成 queue/status/stats/unread 摘要;随后按 ID 轻量加载当前页、selected、active 和 unread priority task,禁止为了列表或已读操作解析完整 Trace、output archive、Codex transcript 或物化全量历史 `task_json``read`/`read-all` 这类 mutation 必须是 SQL-only 更新并返回最小 patch/queue 计数,不能触发 overview 全量重算或重载所有任务;启动 warm 只能预热小体积聚合和索引路径,不得把历史任务作为常驻缓存。允许 frontend/backend 代理使用秒级、严格有界、mutation 自动失效的 overview micro-cache 来吸收重复刷新,但 cache 只能作为抖动保护,不能替代数据库索引、聚合查询和分页披露,也不能让 stale readAt/queue/status 状态跨设备可见。
- Trace/实时输出热路径防回归:Code Queue 的 `appendOutput`、output archive append、`publishTaskEvent`、SSE `/api/events`、任务列表、overview、task meta 和 `/health` 都属于热路径,必须保持 O(1) 或明确小常数上界;这些路径不得同步调用完整 transcript 构建器、`taskFullOutput`、output archive 全量读取、Codex session/log 文件解析、完整 `task_json` 物化或任何会随历史输出长度增长的统计。输出追加时必须增量维护轻量持久化指标,至少包括 `stepCount``llmStepCount``outputMaxSeq` 或等价字段;列表、overview、meta、SSE 事件和 `/health` 只能读取这些指标或小体积 SQL 聚合。完整 Trace、`trace-summary``trace-steps``trace-step`、transcript/output 详情允许在显式详情请求中解析归档,但必须分页或有界、使用短 TTL 或容量受限缓存,并在 archive append 后失效。若 frontend 性能面板出现 Code Queue direct proxy 502、`/api/tasks/overview`/trace 接口成批超时,或容器内 `/health` 在 active output 持续追加时也卡住,优先按 Bun event-loop starvation/backpressure 排查,而不是先改 React 渲染;修复必须证明热路径不再随 output/archive 历史线性增长。
- Trace STEP 权威来源:`GET /api/tasks/<id>/trace-steps``GET /api/tasks/<id>/trace-step` 必须直接从 `oa-event-flow` 读取 `trace-step-created` 事实事件,并在响应中暴露 `source=oa-event-flow`;不得在 OA 事件缺失、读取失败或本地 transcript 数量更多/更少时静默回退到 `task-transcript`、Codex session JSONL、output archive 或内存热状态。OA 事件不可用应显式失败或返回空事实集,避免 STEP 计数和执行过程摘要重新形成双路径分叉。
- 完成判定:app-server `turn/completed``turn.status=completed|interrupted|failed` 只代表 Codex turn 已结束;即使 `completed` 也必须把原始任务、当前 attempt 的 assistant 最终回复、command/file-change 事件、stderr tail 和 current attempt events 组成 execution record 交给 judge 判断是否真的完成。MiniMax judge 输入必须做有界压缩,保留终态、最终回复、关键错误/命令/部署证据和摘要计数,避免长 transcript 让 MiniMax 请求超时;默认 `UNIDESK_CODE_QUEUE_MINIMAX_JUDGE_TIMEOUT_MS=90000`。MiniMax judge 之前和之后都必须保留少量协议级硬门禁:明确用户 interrupt 判为 fail;当前 attempt `terminalStatus=failed|null`、传输在终态前关闭、或当前 attempt 最终回复为空时判为 retry;这些门禁只保护“本轮 turn 是否可被验收”的事实,不得发明业务实现要求。协议门禁通过后,配置了 `UNIDESK_CODE_QUEUE_MINIMAX_API_KEY` 且 MiniMax 可用时,MiniMax `MiniMax-M2.7` 对业务是否 `complete|retry|fail` 的判定是权威结果;当且仅当 MiniMax LLM 调用失效(未配置、额度/限流/网络/超时不可用、JSON 去噪与 repair 全部耗尽、或返回超预算反馈且修复耗尽)时,才允许启用非 LLM/fallback 判断。MiniMax 返回必须先做 JSON 去噪,支持去除 Markdown fence、`json` 标签和从夹杂文本中提取平衡 JSON object;如果去噪后仍无法解析,服务必须把解析错误和上一轮去噪前原始回答反馈给 MiniMax 做 JSON repair 重试,重试次数由 `UNIDESK_CODE_QUEUE_MINIMAX_JUDGE_REPAIR_ATTEMPTS` 控制,默认 `2`,耗尽后才进入 fallback,并在 fallback 原因、task JSON、attempt summary 和 TraceView 中保留 MiniMax 失败阶段、是否超时、耗时、prompt/payload 大小、HTTP 状态、错误名和响应预览。
- Judge 权威边界:MiniMax 成功返回可解析、预算内的 judge JSON 后,Code Queue 不得用旧 attempt 的 429/exceeded retry limit 证据、历史 output 字符串、面向特定任务的正则或 `hardCompletionBlockers`/`retryRequiredReasons` 覆盖一次协议有效的完成判定;尤其不能因为 attempt 1 的限流中断仍在历史输出里,就禁止 MiniMax 把 attempt 2 的正常完成判为 `complete`。允许的本地 safety override 必须限定为协议事实和系统交付纪律:用户显式打断、当前 attempt 未正常终止、当前 attempt 没有最终 assistant response、最终回复停在并发文件确认而非交付、或 runtime/UI/service 变更承认未部署验证。所有 override 都必须写入 `_safetyOverride`、生成紧凑 continuation prompt,并由自测或 judge probe 覆盖;不得把业务猜测伪装成本地硬门禁。
- Retry/推进语义:`retry` 不是新开一个独立任务或完全新 session;只要已有 `codexThreadId`,服务必须 `thread/resume` 原 thread 并 append 一个继续执行 prompt。continuation/judge feedback prompt 只应携带本轮缺口、恢复原因、验收要求和有界原始任务摘要,禁止重新注入完整引用上下文、历史 transcript 或长 JSON;服务重启恢复类 feedback 尤其必须保持短 prompt,依赖现有 thread 上文继续。超长 prompt 必须在 prompt 合成源头解决:每个 feedback/recovery/judge 生成器都要从结构化字段选择必要信息、去重合并缺口并提供按需查询入口,禁止先合成超长 prompt 再在末端用 substring/safePreview 一刀切硬截断;硬截断会静默丢失验收信息,风险高于长 prompt 本身。若 MiniMax `continuePrompt` 超出预算,必须要求 MiniMax 基于原始 judge 输入重新合成紧凑反馈,repair 耗尽后才可进入 fallback;不得把已生成的长 prompt 截尾后发送给 Codex。若 MiniMax 成功返回了预算内 `continuePrompt`,必须原样使用该反馈,不得再用 71-Freq、`period_sum/mpu_read_num``mpu_read_num`、历史限流中断等字符串识别把它覆盖成“简洁原始需求 continuation”。只有 judge 判定 `complete` 后,队列 worker 才把当前任务标为成功并推进下一个 queued/retry_wait 任务。非 LLM/fallback 判定产生的 `retry` 最多累计 `3` 次;达到上限后当前任务必须转为 `failed` 并记录原因,worker 继续推进后续 queued/retry_wait 任务,避免 fallback safety override 或硬编码判断造成无限循环。
@@ -16,6 +16,7 @@ RUN apt-get update \
git \
gzip \
iproute2 \
iptables \
iputils-ping \
jq \
make \
@@ -8,5 +8,10 @@
},
"dependencies": {
"postgres": "latest"
},
"devDependencies": {
"@types/bun": "latest",
"@types/node": "latest",
"typescript": "latest"
}
}
@@ -82,10 +82,13 @@ async function startDevContainerPlan(plan: DevContainerPlan, options: {
const before = await run("main-server", ctx().masterProxyEvidenceScript(plan), 15_000, "master-proxy-evidence-before-ping");
const ping = await run(plan.providerId, ctx().devContainerPingScript(plan), 20_000, "remote-container-ping-google");
const after = await run("main-server", ctx().masterProxyEvidenceScript(plan), 15_000, "master-proxy-evidence-after-ping");
verification.pingGoogleOk = ping.exitCode === 0 && /0% packet loss|1 packets received|1 received/iu.test(ping.stdout);
verification.pingGoogleOk = ping.exitCode === 0 && /0% packet loss|1 packets received|1 received|bytes from/iu.test(ping.stdout);
verification.directPingEvidence = commands.find((command) => command.name === "remote-tunnel-start")?.stdout.includes("direct_ping=failed_expected") === true
? `direct ${plan.providerId} container ping failed before tunnel`
: "direct ping did not fail before tunnel";
verification.sealedEgressEvidence = ping.stdout.includes("sealed_direct_ping=blocked_expected")
? `direct ${plan.providerId} container egress blocked after tunnel seal`
: "sealed direct egress block was not observed";
verification.masterProxyEvidenceBefore = ctx().safePreview(before.stdout, 2000);
verification.pingGoogleLog = ping.stdout;
verification.masterProxyEvidenceAfter = ctx().safePreview(after.stdout, 2000);
@@ -156,13 +159,14 @@ export async function startDevContainer(req: Request, providerFromPath: string |
containerWorkdir: plan.containerWorkdir,
},
masterProxy: {
mode: "ssh-tun-nat",
mode: "ssh-tun-nat-sealed",
masterHost: plan.masterHost,
tunId: plan.tunId,
tunName: plan.tunName,
serverIp: plan.serverIp,
clientIp: plan.clientIp,
natChain: plan.natChain,
egressFirewallChain: plan.egressFirewallChain,
},
verification: result.verification,
commands: result.commands,
@@ -175,11 +179,12 @@ export async function startDevContainer(req: Request, providerFromPath: string |
providerId,
containerName: plan.containerName,
masterProxy: {
mode: "ssh-tun-nat",
mode: "ssh-tun-nat-sealed",
masterHost: plan.masterHost,
tunName: plan.tunName,
clientIp: plan.clientIp,
natChain: plan.natChain,
egressFirewallChain: plan.egressFirewallChain,
},
}, 500);
}
@@ -193,7 +198,7 @@ export async function devContainerStatus(providerFromPath: string | null): Promi
CONTAINER=${ctx().shellQuote(plan.containerName)}
docker inspect "$CONTAINER" --format 'container={{.Name}} state={{.State.Status}} image={{.Config.Image}} workdir={{ index .Config.Labels "unidesk.workdir" }} started={{.State.StartedAt}}' 2>/dev/null || true
if docker inspect "$CONTAINER" >/dev/null 2>&1; then
docker exec "$CONTAINER" bash -lc 'export PATH=/tmp/unidesk-tools:$PATH; echo default=$(ip route show default | head -1); echo resolv=$(tr "\\n" " " </etc/resolv.conf); echo pwd=$(pwd); command -v codex || true; ip addr show ${plan.tunName} 2>/dev/null || true' || true
docker exec "$CONTAINER" bash -lc 'export PATH=/tmp/unidesk-tools:$PATH; source /tmp/unidesk-dev-egress.env 2>/dev/null || true; echo default=$(ip route show default | head -1); echo route_8_8_8_8=$(ip route get 8.8.8.8 | head -1); echo resolv=$(tr "\\n" " " </etc/resolv.conf); echo pwd=$(pwd); command -v codex || true; ip addr show ${plan.tunName} 2>/dev/null || true; if [ -n "\${EGRESS_CHAIN:-}" ]; then iptables -S "$EGRESS_CHAIN" 2>/dev/null || true; iptables -S OUTPUT 2>/dev/null | grep "$EGRESS_CHAIN" || true; fi' || true
fi`;
commands.push(await ctx().runCodeQueueSsh(providerId, statusScript, 15_000, "remote-container-status"));
commands.push(await ctx().runCodeQueueSsh("main-server", ctx().masterProxyEvidenceScript(plan), 15_000, "master-proxy-status"));
@@ -202,11 +207,12 @@ fi`;
providerId,
containerName: plan.containerName,
masterProxy: {
mode: "ssh-tun-nat",
mode: "ssh-tun-nat-sealed",
masterHost: plan.masterHost,
tunName: plan.tunName,
clientIp: plan.clientIp,
natChain: plan.natChain,
egressFirewallChain: plan.egressFirewallChain,
},
commands,
});
@@ -3777,7 +3777,7 @@ async function route(req: Request): Promise<Response> {
defaultProviderId: plan.providerId,
startEndpoint: `/api/dev-containers/${encodeURIComponent(plan.providerId)}/start`,
statusEndpoint: `/api/dev-containers/${encodeURIComponent(plan.providerId)}/status`,
masterProxyMode: "ssh-tun-nat",
masterProxyMode: "ssh-tun-nat-sealed",
defaultPlan: {
containerName: plan.containerName,
image: plan.image,
@@ -3787,6 +3787,7 @@ async function route(req: Request): Promise<Response> {
serverIp: plan.serverIp,
clientIp: plan.clientIp,
natChain: plan.natChain,
egressFirewallChain: plan.egressFirewallChain,
},
});
}
@@ -118,7 +118,7 @@ function fallbackJudge(result: CodexRunResult, minimaxError?: string | JudgeFail
}
export const retryInstruction = "这是同一个 Codex thread 的 continuation,不是新任务。请基于上文继续完成原始任务;只做最小必要状态核查,避免从头重新摸索、避免重复已经完成的修改。";
const codeQueueD601DeployCommand = "bun scripts/cli.ts ssh D601 'cd /home/ubuntu/unidesk-code-queue-deploy && CODE_QUEUE_ENV_FILE=/home/ubuntu/unidesk-code-queue-deploy/.state/code-queue-d601.env docker compose -f src/components/microservices/code-queue/docker-compose.d601.yml up -d --build --force-recreate code-queue'";
const codeQueueD601DeployCommand = "bun scripts/cli.ts ssh D601 'cd ~/cq-deploy && CODE_QUEUE_ENV_FILE=.state/code-queue-d601.env docker compose -f src/components/microservices/code-queue/docker-compose.d601.yml up -d --build --force-recreate code-queue'";
const codeQueueRestartSafetyGuidance = `Code Queue 服务具备 restart-recovery,允许在任何时候重启、重建或替换 D601 的 \`code-queue-backend\`;当当前任务修改 Code Queue 自身时,禁止等待当前 Code Queue task 退出或等待队列归零后再重启,因为这会等待自己退出形成自锁。正确做法是在 D601 直接执行 \`${codeQueueD601DeployCommand}\` 或等价 no-deps force-recreate,并在恢复后用 live health/task 查询验证。`;
export function explicitUserInterrupt(task: QueueTask, result: CodexRunResult): boolean {
@@ -28,6 +28,20 @@ export interface OaTraceStats extends JsonRecord {
source: "oa-event-flow";
}
export interface OaTraceStepSummary {
eventSequence: number;
seq: number;
at: string;
kind: string;
title: string;
status: string;
summaryLines: string[];
rawSeqs: number[];
scopeId: string;
attemptIndex: number | null;
source: "oa-event-flow";
}
const postTimeoutMs = 2500;
let context: OaEventContext | null = null;
@@ -119,6 +133,19 @@ function recordString(record: Record<string, unknown> | null, keys: string[]): s
return "";
}
function asJsonRecord(value: unknown): JsonRecord {
return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : {};
}
function stringValue(value: unknown, fallback = ""): string {
return typeof value === "string" ? value : fallback;
}
function positiveInteger(value: unknown): number | null {
const parsed = Number(value);
return Number.isInteger(parsed) && parsed > 0 ? Math.floor(parsed) : null;
}
export function outputTraceKind(output: LiveOutput): "read" | "edit" | "run" | "error" | "message" | "system" {
if (output.channel === "diff") return "edit";
if (output.channel === "error") return "error";
@@ -338,6 +365,125 @@ export async function readOaTraceStatsForTaskAttempts(taskId: string, attemptInd
return readOaTraceStatsForScopeIds([taskScopeId(taskId), ...uniqueAttempts.map((index) => taskAttemptScopeId(taskId, index))]);
}
function eventPayloadRecord(event: unknown): JsonRecord {
if (typeof event !== "object" || event === null || Array.isArray(event)) return {};
return asJsonRecord((event as Record<string, unknown>).payload);
}
function stringList(value: unknown, maxItems = 8): string[] {
if (!Array.isArray(value)) return [];
return value.map((item) => String(item ?? "").trimEnd()).filter((item) => item.length > 0).slice(0, maxItems);
}
function numberList(value: unknown, fallback: number): number[] {
if (!Array.isArray(value)) return [fallback];
const values = value.map((item) => Number(item)).filter((item) => Number.isFinite(item)).map((item) => Math.floor(item));
return values.length > 0 ? values : [fallback];
}
function commandLifecycleStatus(payload: JsonRecord, title: string, summaryLines: string[]): string {
const source = [title, ...summaryLines].join("\n");
const status = /\bstatus=([A-Za-z0-9_-]+)/u.exec(source)?.[1];
if (status !== undefined && status.length > 0) return status;
const method = stringValue(payload.method);
if (method.length > 0) return method;
return stringValue(payload.status);
}
function traceStepFromEvent(event: unknown): OaTraceStepSummary | null {
if (typeof event !== "object" || event === null || Array.isArray(event)) return null;
const record = event as Record<string, unknown>;
const payload = eventPayloadRecord(record);
const seq = Number(payload.seq ?? payload.stepSeq ?? payload.outputSeq);
if (!Number.isFinite(seq)) return null;
const eventSequence = Number(record.sequence ?? 0);
const title = stringValue(payload.title, stringValue(payload.method, stringValue(payload.kind))).slice(0, 300);
const summaryLines = stringList(payload.summaryLines);
return {
eventSequence: Number.isFinite(eventSequence) ? Math.floor(eventSequence) : 0,
seq: Math.floor(seq),
at: stringValue(record.createdAt, stringValue(payload.createdAt, ctx().nowIso())),
kind: stringValue(payload.kind, stringValue(payload.stepKind, stringValue(payload.channel, "system"))),
title,
status: commandLifecycleStatus(payload, title, summaryLines).slice(0, 120),
summaryLines,
rawSeqs: numberList(payload.rawSeqs, Math.floor(seq)),
scopeId: stringValue(payload.scopeId, stringValue(payload.attemptScopeId)),
attemptIndex: positiveInteger(payload.attemptIndex),
source: "oa-event-flow",
};
}
function eventNextAfterSeq(body: Record<string, unknown>, events: unknown[], fallback: number): number {
const bodyNext = Number(body.nextAfterSeq);
const eventNext = events.reduce<number>((max, event) => {
if (typeof event !== "object" || event === null || Array.isArray(event)) return max;
const sequence = Number((event as Record<string, unknown>).sequence);
return Number.isFinite(sequence) ? Math.max(max, Math.floor(sequence)) : max;
}, fallback);
return Number.isFinite(bodyNext) && bodyNext > fallback ? Math.max(eventNext, Math.floor(bodyNext)) : eventNext;
}
const traceStepReadPageLimit = 500;
const traceStepReadMaxPages = 40;
const traceStepReadAttempts = 3;
const traceStepReadTimeoutMs = 10000;
async function waitMs(ms: number): Promise<void> {
await new Promise((resolve) => setTimeout(resolve, ms));
}
async function fetchOaTraceStepPage(url: URL): Promise<Record<string, unknown>> {
let lastError: unknown = null;
for (let attempt = 1; attempt <= traceStepReadAttempts; attempt += 1) {
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), traceStepReadTimeoutMs);
try {
const response = await fetch(url, { signal: controller.signal });
if (!response.ok) throw new Error(`status=${response.status}`);
return await response.json() as Record<string, unknown>;
} catch (error) {
lastError = error;
if (attempt < traceStepReadAttempts) await waitMs(250 * attempt);
} finally {
clearTimeout(timer);
}
}
throw lastError instanceof Error ? lastError : new Error(String(lastError ?? "unknown error"));
}
export async function readOaTraceStepsForTask(taskId: string, attemptIndex: number | null = null): Promise<OaTraceStepSummary[]> {
const cleanTaskId = String(taskId || "").trim();
if (cleanTaskId.length === 0) return [];
const runtime = ctx();
const tags = [`task:${cleanTaskId}`, "trace"];
if (attemptIndex !== null && Number.isInteger(attemptIndex) && attemptIndex > 0) tags.push(`attempt:${Math.floor(attemptIndex)}`);
const bySeq = new Map<number, OaTraceStepSummary>();
let afterSeq = 0;
for (let page = 0; page < traceStepReadMaxPages; page += 1) {
const url = new URL(`${runtime.baseUrl}/api/events`);
url.searchParams.set("tags", tags.join(","));
url.searchParams.set("type", "trace-step-created");
url.searchParams.set("limit", String(traceStepReadPageLimit));
url.searchParams.set("afterSeq", String(afterSeq));
try {
const body = await fetchOaTraceStepPage(url);
const events = Array.isArray(body.events) ? body.events : [];
for (const event of events) {
const step = traceStepFromEvent(event);
if (step !== null) bySeq.set(step.seq, { ...(bySeq.get(step.seq) ?? {}), ...step });
}
const nextAfterSeq = eventNextAfterSeq(body, events, afterSeq);
if (events.length < traceStepReadPageLimit || nextAfterSeq <= afterSeq) break;
afterSeq = nextAfterSeq;
} catch (error) {
runtime.logger("warn", "oa_trace_steps_read_failed", { taskId: cleanTaskId, attemptIndex, page, error: error instanceof Error ? error.message : String(error) });
throw error;
}
}
return Array.from(bySeq.values()).sort((left, right) => left.seq - right.seq);
}
function statNumber(stats: OaTraceStats | null | undefined, key: string): number | null {
const value = Number(stats?.[key]);
return Number.isFinite(value) && value >= 0 ? Math.floor(value) : null;
@@ -118,7 +118,7 @@ function buildDevContainerPlan(providerId: string, body: Record<string, unknown>
? imageFromBody
: ctx().config.devContainerImage.length > 0
? ctx().config.devContainerImage
: "unidesk-code-queue:latest";
: `unidesk-code-queue:${safeProvider.toLowerCase()}`;
const workdir = workdirFromBody.length > 0 ? workdirFromBody : ctx().config.devContainerWorkdir;
const thirdOctet = providerId === "D601" ? 6 : Math.max(1, Math.min(250, Math.floor(tunId / 64)));
const baseOctet = providerId === "D601" ? 0 : (tunId % 64) * 4;
@@ -138,6 +138,7 @@ function buildDevContainerPlan(providerId: string, body: Record<string, unknown>
serverIp: `10.214.${thirdOctet}.${serverLastOctet}`,
clientIp: `10.214.${thirdOctet}.${clientLastOctet}`,
natChain: safeDockerName(`UNIDESK-CODEX-DEV-${safeProvider}`).toUpperCase().slice(0, 28),
egressFirewallChain: safeDockerName(`UD-CQ-EGRESS-${safeProvider}`).toUpperCase().slice(0, 28),
keyDir: `/home/ubuntu/.unidesk/codex-dev-proxy/${safeProvider}`,
masterKeyPath: resolve(ctx().config.defaultWorkdir, ".state/code-queue/dev-proxy", safeProvider, "id_ed25519"),
};
@@ -151,18 +152,27 @@ function appendLimited(buffer: string, chunk: Buffer | string, maxBytes: number)
return { value: buffer + text.slice(0, remaining), truncated: true };
}
function runCodeQueueSsh(providerId: string, script: string, timeoutMs: number, name: string): Promise<DevContainerCommandLog> {
const started = Date.now();
interface CliRunResult {
code: number | null;
signal: NodeJS.Signals | null;
stdout: string;
stderr: string;
timedOut: boolean;
truncated: boolean;
spawnError: Error | null;
}
function runCliCommand(args: string[], timeoutMs: number): Promise<CliRunResult> {
const maxBuffer = 8 * 1024 * 1024;
return new Promise((resolveLog) => {
return new Promise((resolveRun) => {
let stdout = "";
let stderr = "";
let truncated = false;
let timedOut = false;
let spawnError: Error | null = null;
const child = spawn("bun", ["scripts/cli.ts", "ssh", providerId, "bash -s"], {
const child = spawn("bun", args, {
cwd: ctx().config.defaultWorkdir,
stdio: ["pipe", "pipe", "pipe"],
stdio: ["ignore", "pipe", "pipe"],
});
const timer = setTimeout(() => {
timedOut = true;
@@ -183,28 +193,152 @@ function runCodeQueueSsh(providerId: string, script: string, timeoutMs: number,
child.on("error", (error) => {
spawnError = error;
});
child.stdin.on("error", () => undefined);
child.on("close", (code, signal) => {
clearTimeout(timer);
const errorParts: string[] = [];
if (stderr.length > 0) errorParts.push(stderr);
if (timedOut) errorParts.push(`Error: command timed out after ${timeoutMs}ms`);
if (spawnError !== null) errorParts.push(`${spawnError.name}: ${spawnError.message}`);
if (truncated) errorParts.push(`output truncated at ${maxBuffer} bytes`);
resolveLog({
name,
providerId,
exitCode: timedOut && code === 0 ? null : code,
resolveRun({
code: timedOut && code === 0 ? null : code,
signal: signal as NodeJS.Signals | null,
durationMs: Date.now() - started,
stdout,
stderr: errorParts.join("\n").trim(),
stderr,
timedOut,
truncated,
spawnError,
});
});
child.stdin.end(script);
});
}
function buildFrontendHostSshArgs(providerId: string, command: string, timeoutMs: number): string[] {
const waitMs = Math.max(timeoutMs + 5_000, 20_000);
return [
"scripts/cli.ts",
"--main-server-ip",
ctx().config.devContainerMasterHost,
"debug",
"dispatch",
providerId,
"host.ssh",
"--wait-ms",
String(waitMs),
"--payload-json",
JSON.stringify({ source: "code-queue", mode: "exec", command, timeoutMs }),
];
}
function parseFrontendHostSshResult(run: CliRunResult): { stdout: string; stderr: string; exitCode: number | null } {
let stdout = run.stdout;
let stderr = run.stderr;
let exitCode = run.code;
try {
const parsed = JSON.parse(run.stdout) as {
ok?: boolean;
error?: unknown;
data?: {
dispatch?: unknown;
wait?: {
task?: {
status?: string;
result?: {
stdout?: string;
stderr?: string;
exitCode?: number;
};
};
};
};
};
const task = parsed.data?.wait?.task;
const result = task?.result;
if (result !== undefined) {
stdout = typeof result.stdout === "string" ? result.stdout : "";
stderr = [typeof result.stderr === "string" ? result.stderr : "", run.stderr].filter((part) => part.length > 0).join("\n");
exitCode = typeof result.exitCode === "number" ? result.exitCode : task?.status === "succeeded" ? 0 : 1;
} else if (parsed.ok === false || task?.status === "failed") {
stdout = "";
stderr = [JSON.stringify(parsed.error ?? task ?? parsed.data?.dispatch ?? parsed), run.stderr].filter((part) => part.length > 0).join("\n");
exitCode = 1;
}
} catch {
// Keep raw stdout/stderr when the CLI itself failed before returning dispatch JSON.
}
return { stdout, stderr, exitCode };
}
async function runFrontendHostSsh(providerId: string, command: string, timeoutMs: number): Promise<CliRunResult & { parsedStdout: string; parsedStderr: string; parsedExitCode: number | null }> {
const run = await runCliCommand(buildFrontendHostSshArgs(providerId, command, timeoutMs), Math.max(timeoutMs + 15_000, 30_000));
const parsed = parseFrontendHostSshResult(run);
return { ...run, parsedStdout: parsed.stdout, parsedStderr: parsed.stderr, parsedExitCode: parsed.exitCode };
}
function encodedScriptCommand(encodedScript: string): string {
return [
"set -euo pipefail",
"tmp=$(mktemp /tmp/unidesk-code-queue-ssh.XXXXXX)",
"cleanup(){ rm -f \"$tmp\"; }",
"trap cleanup EXIT",
"base64 -d > \"$tmp\" <<'UNIDESK_CODE_QUEUE_SCRIPT'",
encodedScript,
"UNIDESK_CODE_QUEUE_SCRIPT",
"bash \"$tmp\"",
].join("\n");
}
function commandLogFromRun(name: string, providerId: string, started: number, run: CliRunResult & { parsedStdout: string; parsedStderr: string; parsedExitCode: number | null }): DevContainerCommandLog {
const errorParts: string[] = [];
if (run.parsedStderr.length > 0) errorParts.push(run.parsedStderr);
if (run.timedOut) errorParts.push(`Error: command timed out after ${Date.now() - started}ms`);
if (run.spawnError !== null) errorParts.push(`${run.spawnError.name}: ${run.spawnError.message}`);
if (run.truncated) errorParts.push("output truncated at 8388608 bytes");
return {
name,
providerId,
exitCode: run.parsedExitCode,
signal: run.signal,
durationMs: Date.now() - started,
stdout: run.parsedStdout,
stderr: errorParts.join("\n").trim(),
};
}
async function uploadAndRunLargeScript(providerId: string, encodedScript: string, timeoutMs: number, name: string, started: number): Promise<DevContainerCommandLog> {
const token = `${Date.now()}-${Math.random().toString(16).slice(2)}`.replace(/[^A-Za-z0-9.-]/gu, "-");
const b64Path = `/tmp/unidesk-code-queue-${token}.b64`;
const scriptPath = `/tmp/unidesk-code-queue-${token}.sh`;
const init = await runFrontendHostSsh(providerId, `set -euo pipefail\numask 077\n: > ${shellQuote(b64Path)}`, 15_000);
if (init.parsedExitCode !== 0) return commandLogFromRun(name, providerId, started, init);
const chunkSize = 1800;
for (let offset = 0; offset < encodedScript.length; offset += chunkSize) {
const chunk = encodedScript.slice(offset, offset + chunkSize);
const appendCommand = [
"set -euo pipefail",
`cat >> ${shellQuote(b64Path)} <<'UNIDESK_CODE_QUEUE_CHUNK'`,
chunk,
"UNIDESK_CODE_QUEUE_CHUNK",
].join("\n");
const append = await runFrontendHostSsh(providerId, appendCommand, 15_000);
if (append.parsedExitCode !== 0) return commandLogFromRun(name, providerId, started, append);
}
const runCommand = [
"set -uo pipefail",
`base64 -d ${shellQuote(b64Path)} > ${shellQuote(scriptPath)}`,
`rm -f ${shellQuote(b64Path)}`,
`bash ${shellQuote(scriptPath)}`,
"code=$?",
`rm -f ${shellQuote(scriptPath)}`,
"exit \"$code\"",
].join("\n");
const run = await runFrontendHostSsh(providerId, runCommand, timeoutMs);
return commandLogFromRun(name, providerId, started, run);
}
function runCodeQueueSsh(providerId: string, script: string, timeoutMs: number, name: string): Promise<DevContainerCommandLog> {
const started = Date.now();
const encodedScript = Buffer.from(script, "utf8").toString("base64");
const inlineCommand = encodedScriptCommand(encodedScript);
if (inlineCommand.length > 3_400) return uploadAndRunLargeScript(providerId, encodedScript, timeoutMs, name, started);
return runFrontendHostSsh(providerId, inlineCommand, timeoutMs).then((run) => commandLogFromRun(name, providerId, started, run));
}
function throwIfCommandFailed(command: DevContainerCommandLog): void {
if (command.exitCode === 0) return;
throw new Error(`${command.name} failed on ${command.providerId ?? "local"} exit=${command.exitCode} stderr=${ctx().safePreview(command.stderr, 1000)}`);
@@ -295,8 +429,6 @@ function remoteContainerStartScript(plan: DevContainerPlan, forceRecreate: boole
return `set -euo pipefail
CONTAINER=${shellQuote(plan.containerName)}
REQUESTED_IMAGE=${shellQuote(plan.image)}
CODEX_IMAGE=unidesk-code-queue:latest
FALLBACK_IMAGE=${shellQuote(`unidesk_provider-gateway:${plan.providerId.toLowerCase()}`)}
KEY_DIR=${shellQuote(plan.keyDir)}
WORKDIR=${shellQuote(plan.workdir)}
CONTAINER_WORKDIR=${shellQuote(plan.containerWorkdir)}
@@ -306,16 +438,9 @@ SSH_MOUNT_ARGS=()
if [ -d "$SSH_DIR" ]; then SSH_MOUNT_ARGS=(-v "$SSH_DIR":/root/.ssh:ro); fi
HOST_PATH_MOUNT_ARGS=()
if [ -d /mnt ]; then HOST_PATH_MOUNT_ARGS+=(-v /mnt:/mnt); fi
if ! docker image inspect "$IMAGE" >/dev/null 2>&1 && docker image inspect "$CODEX_IMAGE" >/dev/null 2>&1; then
IMAGE="$CODEX_IMAGE"
fi
if ! docker image inspect "$IMAGE" >/dev/null 2>&1; then
if docker image inspect "$FALLBACK_IMAGE" >/dev/null 2>&1; then
IMAGE="$FALLBACK_IMAGE"
else
echo "missing requested image $REQUESTED_IMAGE, codex image $CODEX_IMAGE and fallback $FALLBACK_IMAGE" >&2
exit 1
fi
echo "missing dev container image $REQUESTED_IMAGE" >&2
exit 1
fi
test -r "$KEY_DIR/id_ed25519"
test -r "$KEY_DIR/known_hosts"
@@ -336,12 +461,15 @@ cid=$(docker run -d \\
--name "$CONTAINER" \\
--hostname ${shellQuote(`codex-dev-${plan.providerId}`)} \\
--user root \\
--no-healthcheck \\
--cap-add NET_ADMIN \\
--device /dev/net/tun \\
--add-host host.docker.internal:host-gateway \\
--label unidesk.role=codex-dev \\
--label unidesk.provider=${shellQuote(plan.providerId)} \\
--label "unidesk.workdir=$WORKDIR" \\
--label com.docker.compose.project=unidesk-code-queue-dev \\
--label com.docker.compose.service=codex-dev \\
--env-file "$KEY_DIR/codex-env" \\
-e CODEX_HOME=${shellQuote(plan.remoteCodexHome)} \\
-e CODEX_INTERNAL_ORIGINATOR_OVERRIDE=unidesk_code_queue \\
@@ -362,6 +490,7 @@ cid=$(docker run -d \\
"$IMAGE" \\
bash -lc 'mkdir -p /tmp/unidesk-tools; ln -sf /usr/local/bin/busybox /tmp/unidesk-tools/ip; ln -sf /usr/local/bin/busybox /tmp/unidesk-tools/ping; export PATH=/tmp/unidesk-tools:$PATH; echo ready; sleep infinity')
docker exec "$CONTAINER" bash -lc 'mkdir -p /tmp/unidesk-tools; ln -sf /usr/local/bin/busybox /tmp/unidesk-tools/ip; ln -sf /usr/local/bin/busybox /tmp/unidesk-tools/ping; export PATH=/tmp/unidesk-tools:$PATH; echo container=$(hostname); pwd; ip route show default; ls -ld /dev/net/tun /run/unidesk-dev-proxy/id_ed25519 /var/lib/unidesk/code-queue/codex-home /var/lib/unidesk/code-queue/opencode-xdg; command -v ssh; command -v ip; command -v ping'
docker exec "$CONTAINER" bash -lc 'command -v iptables; iptables --version'
echo "remote_container_ready container=$CONTAINER cid=$cid image=$IMAGE workdir=$WORKDIR"`;
}
@@ -393,14 +522,21 @@ mkdir -p /tmp/unidesk-tools
ln -sf /usr/local/bin/busybox /tmp/unidesk-tools/ip
ln -sf /usr/local/bin/busybox /tmp/unidesk-tools/ping
export PATH=/tmp/unidesk-tools:$PATH
MASTER=${plan.masterHost}
TUN_ID=${plan.tunId}
TUN=${plan.tunName}
CLIENT_IP=${plan.clientIp}
SERVER_IP=${plan.serverIp}
MASTER=${shellQuote(plan.masterHost)}
TUN_ID=${shellQuote(String(plan.tunId))}
TUN=${shellQuote(plan.tunName)}
CLIENT_IP=${shellQuote(plan.clientIp)}
SERVER_IP=${shellQuote(plan.serverIp)}
EGRESS_CHAIN=${shellQuote(plan.egressFirewallChain)}
KNOWN_HOSTS=/tmp/unidesk-dev-proxy-known_hosts
cp /run/unidesk-dev-proxy/known_hosts "$KNOWN_HOSTS"
chmod 600 "$KNOWN_HOSTS"
MASTER_IP=$(getent ahostsv4 "$MASTER" 2>/dev/null | awk '{print $1; exit}' || true)
if [ -z "$MASTER_IP" ]; then MASTER_IP="$MASTER"; fi
case "$MASTER_IP" in
*.*.*.*) ;;
*) echo "cannot resolve IPv4 master host: $MASTER" >&2; exit 1 ;;
esac
DEFAULT_LINE=$(ip route show default | head -1)
DEFAULT_GW=$(printf '%s\\n' "$DEFAULT_LINE" | sed -n 's/^default via \\([^ ]*\\) dev \\([^ ]*\\).*/\\1/p')
DEFAULT_DEV=$(printf '%s\\n' "$DEFAULT_LINE" | sed -n 's/^default via \\([^ ]*\\) dev \\([^ ]*\\).*/\\2/p')
@@ -415,6 +551,10 @@ else
fi
if [ -z "$ORIG_GW" ] || [ -z "$ORIG_DEV" ]; then echo "cannot detect docker bridge route: default=$DEFAULT_LINE link=$LINK_ROUTE" >&2; exit 1; fi
( ping -c 1 -W 3 google.com >/tmp/direct-ping.log 2>&1 && echo direct_ping=unexpected_ok ) || echo direct_ping=failed_expected
if ! command -v iptables >/dev/null 2>&1; then
echo "iptables is required for sealed dev-container egress" >&2
exit 1
fi
if command -v pkill >/dev/null 2>&1; then
pkill -f "ssh .* -w $TUN_ID:$TUN_ID .*$MASTER" >/dev/null 2>&1 || true
elif command -v busybox >/dev/null 2>&1; then
@@ -425,7 +565,11 @@ elif command -v busybox >/dev/null 2>&1; then
done
fi
ip link delete "$TUN" >/dev/null 2>&1 || true
ip route replace $MASTER/32 via "$ORIG_GW" dev "$ORIG_DEV"
while iptables -w -D OUTPUT -j "$EGRESS_CHAIN" >/dev/null 2>&1; do :; done
if command -v ip6tables >/dev/null 2>&1; then
while ip6tables -w -D OUTPUT -j "\${EGRESS_CHAIN}6" >/dev/null 2>&1; do :; done
fi
ip route replace "$MASTER_IP/32" via "$ORIG_GW" dev "$ORIG_DEV"
ip route replace default via "$ORIG_GW" dev "$ORIG_DEV"
SSH_LOG=/tmp/unidesk-dev-proxy-ssh.log
rm -f "$SSH_LOG"
@@ -445,7 +589,37 @@ ip addr replace $CLIENT_IP peer $SERVER_IP dev "$TUN"
ip link set "$TUN" up
ip route replace default via $SERVER_IP dev "$TUN"
printf 'nameserver 8.8.8.8\\nnameserver 1.1.1.1\\noptions timeout:2 attempts:2\\n' > /etc/resolv.conf
echo "container_tunnel_ready orig=$ORIG_GW/$ORIG_DEV route_to_master=$(ip route get $MASTER | head -1) default=$(ip route show default | head -1) dns=$(tr '\\n' ' ' </etc/resolv.conf)"
iptables -w -N "$EGRESS_CHAIN" 2>/dev/null || true
iptables -w -F "$EGRESS_CHAIN"
iptables -w -A "$EGRESS_CHAIN" -o lo -j RETURN
iptables -w -A "$EGRESS_CHAIN" -m conntrack --ctstate ESTABLISHED,RELATED -j RETURN
iptables -w -A "$EGRESS_CHAIN" -o "$TUN" -j RETURN
iptables -w -A "$EGRESS_CHAIN" -d "$MASTER_IP/32" -o "$ORIG_DEV" -p tcp --dport 22 -j RETURN
iptables -w -A "$EGRESS_CHAIN" -j REJECT --reject-with icmp-port-unreachable
iptables -w -I OUTPUT 1 -j "$EGRESS_CHAIN"
if command -v ip6tables >/dev/null 2>&1; then
CHAIN6="\${EGRESS_CHAIN}6"
ip6tables -w -N "$CHAIN6" 2>/dev/null || true
ip6tables -w -F "$CHAIN6"
ip6tables -w -A "$CHAIN6" -o lo -j RETURN
ip6tables -w -A "$CHAIN6" -m conntrack --ctstate ESTABLISHED,RELATED -j RETURN
ip6tables -w -A "$CHAIN6" -d ::1/128 -j RETURN
ip6tables -w -A "$CHAIN6" -d fc00::/7 -j RETURN
ip6tables -w -A "$CHAIN6" -d fe80::/10 -j RETURN
ip6tables -w -A "$CHAIN6" -j REJECT
ip6tables -w -I OUTPUT 1 -j "$CHAIN6"
fi
cat > /tmp/unidesk-dev-egress.env <<EOF
EGRESS_CHAIN=$EGRESS_CHAIN
ORIG_DEV=$ORIG_DEV
TUN=$TUN
MASTER=$MASTER
MASTER_IP=$MASTER_IP
SERVER_IP=$SERVER_IP
EOF
echo "container_tunnel_ready orig=$ORIG_GW/$ORIG_DEV route_to_master=$(ip route get $MASTER_IP | head -1) default=$(ip route show default | head -1) dns=$(tr '\\n' ' ' </etc/resolv.conf)"
echo "sealed_egress_ready chain=$EGRESS_CHAIN allowed=tun:$TUN,master:$MASTER_IP:22/$ORIG_DEV"
iptables -S "$EGRESS_CHAIN"
INNER`;
}
@@ -473,7 +647,7 @@ ip -s link show "$TUN"`;
function devContainerPingScript(plan: DevContainerPlan): string {
return `set -euo pipefail
CONTAINER=${shellQuote(plan.containerName)}
docker exec "$CONTAINER" bash -lc 'export PATH=/tmp/unidesk-tools:$PATH; echo route=$(ip route show default | head -1); echo resolv=$(tr "\\n" " " </etc/resolv.conf); ping -c 1 -W 5 google.com'`;
docker exec "$CONTAINER" bash -lc 'export PATH=/tmp/unidesk-tools:$PATH; source /tmp/unidesk-dev-egress.env 2>/dev/null || true; echo route=$(ip route show default | head -1); echo route_8_8_8_8=$(ip route get 8.8.8.8 | head -1); echo resolv=$(tr "\\n" " " </etc/resolv.conf); if [ -n "\${EGRESS_CHAIN:-}" ]; then iptables -S "$EGRESS_CHAIN"; fi; if [ -n "\${ORIG_DEV:-}" ]; then ( ping -I "$ORIG_DEV" -c 1 -W 3 8.8.8.8 >/tmp/sealed-direct-ping.log 2>&1 && echo sealed_direct_ping=unexpected_ok ) || echo sealed_direct_ping=blocked_expected; fi; ping -c 1 -W 5 google.com'`;
}
function windowsNativeBridgeServerSource(): string {
@@ -22,6 +22,7 @@ import { codeAgentPortForModel, codeAgentPortInfo, codeExecutionModeInfo, extrac
import { currentTaskPromptMarker, resolvedReferenceContextTitle, stripCodeQueueEnvironmentHint, userPromptForDisplay } from "./prompts";
import { outputArchiveSignature, taskFullOutput } from "./task-output";
import { retryPrompt } from "./judge";
import { readOaTraceStepsForTask, type OaTraceStepSummary } from "./oa-events";
export interface TaskViewContext {
config: Pick<RuntimeConfig, "codexHome">;
@@ -1716,6 +1717,37 @@ function traceLineVisibleInTraceView(line: TranscriptLine): boolean {
return line.kind !== "system" || traceSystemLineIsError(line);
}
function oaTraceStepKind(kind: string): TranscriptKind {
const normalized = String(kind || "").trim().toLowerCase();
if (normalized === "read" || normalized === "explored" || normalized === "explore") return "explored";
if (normalized === "edit" || normalized === "edited") return "edited";
if (normalized === "run" || normalized === "ran" || normalized === "command") return "ran";
if (normalized === "error" || normalized === "failed") return "error";
if (normalized === "message" || normalized === "assistant" || normalized === "user") return "message";
return "system";
}
function oaTraceStepToTranscriptLine(step: OaTraceStepSummary): TranscriptLine {
const kind = oaTraceStepKind(step.kind);
const summaryText = step.summaryLines.join("\n").trimEnd();
const commandText = summaryText.startsWith("item/") ? summaryText : "";
return {
seq: step.seq,
at: step.at,
kind,
title: step.title || (kind === "explored" ? "Read" : kind === "edited" ? "Edit" : kind === "ran" ? "Run" : "Trace step"),
status: step.status || undefined,
commandPreview: commandText.length > 0 ? commandText : undefined,
bodyPreview: commandText.length > 0 ? undefined : summaryText || undefined,
rawSeqs: step.rawSeqs,
};
}
async function oaTraceTranscriptForTask(taskId: string, attemptIndex: number | null): Promise<TranscriptLine[]> {
const steps = await readOaTraceStepsForTask(taskId, attemptIndex);
return steps.map(oaTraceStepToTranscriptLine).filter(traceLineVisibleInTraceView);
}
function mergeTraceWindowLines(left: TranscriptLine[], right: TranscriptLine[]): TranscriptLine[] {
const seen = new Set<string>();
const merged: TranscriptLine[] = [];
@@ -2114,14 +2146,11 @@ function taskPromptDetailResponse(task: QueueTask, url: URL): Response {
});
}
function taskTraceStepsResponse(task: QueueTask, url: URL): Response {
async function taskTraceStepsResponse(task: QueueTask, url: URL): Promise<Response> {
const limit = ctx().parseLimit(url);
const attemptIndex = ctx().parseSeqParam(url, "attempt", null);
const agentPort = codeAgentPortForModel(task.model);
const previewTranscript = cachedPreviewTranscript(task);
const attemptWindow = attemptIndex === null ? null : traceAttemptWindows(task, previewTranscript).find((window) => window.index === attemptIndex) ?? null;
const sourceTranscript = attemptWindow === null ? previewTranscript : executionLinesForAttempt(attemptWindow.lines);
const transcript = sourceTranscript.filter(traceLineVisibleInTraceView);
const transcript = await oaTraceTranscriptForTask(task.id, attemptIndex);
const page = ctx().pageBySeq(transcript, url, limit);
return ctx().jsonResponse({
ok: true,
@@ -2132,6 +2161,7 @@ function taskTraceStepsResponse(task: QueueTask, url: URL): Response {
agentPort,
agentPortInfo: codeAgentPortInfo(agentPort),
attempt: attemptIndex,
source: "oa-event-flow",
steps: page.chunk.map((line) => ({
seq: line.seq,
at: line.at,
@@ -2155,11 +2185,11 @@ function taskTraceStepsResponse(task: QueueTask, url: URL): Response {
});
}
function taskTraceStepDetailResponse(task: QueueTask, url: URL): Response {
async function taskTraceStepDetailResponse(task: QueueTask, url: URL): Promise<Response> {
const seq = Number(url.searchParams.get("seq"));
if (!Number.isFinite(seq)) return ctx().jsonResponse({ ok: false, error: "seq is required" }, 400);
const agentPort = codeAgentPortForModel(task.model);
const transcript = fullTranscript(task).filter(traceLineVisibleInTraceView);
const transcript = await oaTraceTranscriptForTask(task.id, null);
const line = transcript.find((item) => Number(item.seq) === seq || item.rawSeqs.includes(seq));
if (line === undefined) return ctx().jsonResponse({ ok: false, error: "trace step not found", seq }, 404);
return ctx().jsonResponse({
@@ -446,6 +446,7 @@ export interface DevContainerPlan {
serverIp: string;
clientIp: string;
natChain: string;
egressFirewallChain: string;
keyDir: string;
masterKeyPath: string;
}