From 41fdaba973a853051fa9ffcdd89daebcbc9c2501 Mon Sep 17 00:00:00 2001 From: Codex Date: Fri, 8 May 2026 03:57:53 +0000 Subject: [PATCH] feat: harden codex queue runtime Add model selection, batch enqueue controls, dev-ready health checks, transcript pagination, queue watchdog recovery, and MiniMax judge JSON repair for codex-queue. --- TEST.md | 2 +- config.json | 2 +- docker-compose.yml | 8 +- docs/reference/frontend.md | 2 +- docs/reference/microservices.md | 13 +- scripts/src/docker.ts | 3 +- scripts/src/e2e.ts | 5 +- src/components/backend-core/src/index.ts | 1 + src/components/frontend/public/style.css | 249 +++++- src/components/frontend/src/codex-queue.tsx | 461 +++++++++-- src/components/frontend/src/index.ts | 1 + .../microservices/codex-queue/Dockerfile | 32 +- .../microservices/codex-queue/src/index.ts | 778 ++++++++++++++++-- 13 files changed, 1415 insertions(+), 142 deletions(-) diff --git a/TEST.md b/TEST.md index 5dff0efa..2e825420 100644 --- a/TEST.md +++ b/TEST.md @@ -95,7 +95,7 @@ ## T23 Main Server Codex Queue Microservice -阅读 `AGENTS.md`(本项目 `AGENTS.md` 同时承担 `SKILL.md` 对 `scripts/cli.ts` 的解释职责),然后用 cli 手动测试以下内容:运行 `bun scripts/cli.ts microservice list`,确认 `codex-queue` 显示为 `providerId=main-server`、`public=false`、`frontendOnly=true`、仓库 URL `https://github.com/pikasTech/unidesk`、`codex-queue:4222` 后端映射和 `codex-queue-backend` 容器摘要;运行 `bun scripts/cli.ts server rebuild codex-queue` 并用 `bun scripts/cli.ts job status latest` 等待成功,再运行 `bun scripts/cli.ts microservice health codex-queue` 和 `bun scripts/cli.ts microservice proxy codex-queue /api/tasks`,确认链路通过 backend-core、main-server provider-gateway 和 Codex Queue 后端。随后登录公网 frontend `http://74.48.78.17:18081/`,进入 `微服务 / Codex Queue`,确认页面显示默认模型 `gpt-5.4-mini`、队列指标、任务提交表单、Codex CLI-like 输出、attempt 表、MiniMax/fallback judge 状态、追加 prompt、打断和重试控件;通过页面提交一个小任务,确认任务进入 queued/running/succeeded 或可解释的 failed 状态,并且输出区能看到运行中的 Codex 消息。测试异常中断时可以提交长任务后点击 `打断`,确认任务变为 canceled 或被 judge 标记为非成功终态;自动重试只应在服务端/传输异常、任务正常结束但 execution record 显示未完成、或 judge 判定 retry 时发生;retry 必须复用已有 Codex thread 并 append 继续执行 prompt,只有当前任务 complete 后才推进队列中的下一个任务。Codex provider key 只能通过 `OPENAI_API_KEY`、`CRS_OAI_KEY` 这类运行时环境透传,MiniMax API key 只能通过 `UNIDESK_CODEX_QUEUE_MINIMAX_API_KEY` 这类运行时环境传入,禁止写入 `config.json`、Dockerfile、源码或测试文档。 +阅读 `AGENTS.md`(本项目 `AGENTS.md` 同时承担 `SKILL.md` 对 `scripts/cli.ts` 的解释职责),然后用 cli 手动测试以下内容:运行 `bun scripts/cli.ts microservice list`,确认 `codex-queue` 显示为 `providerId=main-server`、`public=false`、`frontendOnly=true`、仓库 URL `https://github.com/pikasTech/unidesk`、`codex-queue:4222` 后端映射和 `codex-queue-backend` 容器摘要;运行 `bun scripts/cli.ts server rebuild codex-queue` 并用 `bun scripts/cli.ts job status latest` 等待成功,再运行 `bun scripts/cli.ts microservice health codex-queue`、`bun scripts/cli.ts microservice proxy codex-queue /api/dev-ready --raw` 和 `bun scripts/cli.ts microservice proxy codex-queue /api/tasks`,确认链路通过 backend-core、main-server provider-gateway 和 Codex Queue 后端,且 `queue.devReady.ok=true`、`devReady.missingTools=[]`、`docker.versionOk=true`、`docker.composeOk=true`,必需工具包含 `docker`、`docker-compose`、`jq`、`ssh`、`rsync`、`pip3` 和 `unzip`;提交会产生较多命令输出的小任务后,`/health` 和 `/api/tasks` 仍必须在常规 CLI 超时内返回,容器内不得堆积无超时 healthcheck 进程。随后登录公网 frontend `http://74.48.78.17:18081/`,进入 `微服务 / Codex Queue`,确认页面显示默认模型 `gpt-5.4-mini`、默认工作目录 `/root/unidesk`、模型下拉菜单包含 `gpt-5.4-mini`/`gpt-5.4`/`gpt-5.5`、入队份数、队列指标、任务提交表单、Codex CLI-like 输出、attempt 表、MiniMax/fallback judge 状态、追加 prompt、打断和重试控件;通过页面提交一个小任务,确认任务进入 queued/running/succeeded 或可解释的 failed 状态,并且输出区能看到运行中的 Codex 消息。批量验收时设置 `入队份数=5` 或用 `---` 分隔 5 段 prompt,一次性入队 5 条任务,确认 5 条任务按顺序运行并全部进入 succeeded 或可解释的非成功终态,不能只运行第一条后停止。测试异常中断时可以提交长任务后点击 `打断`,确认任务变为 canceled 或被 judge 标记为非成功终态;自动重试只应在服务端/传输异常、任务正常结束但 execution record 显示未完成、或 judge 判定 retry 时发生;retry 必须复用已有 Codex thread 并 append 继续执行 prompt,只有当前任务 complete 后才推进队列中的下一个任务。MiniMax judge 必须能处理 Markdown fence/夹杂文本等 JSON 去噪;若去噪后仍失败,必须把解析错误和上一轮去噪前原始回答反馈给 MiniMax 修复后重试,日志中应出现 `judge_json_parse_retry`,且 repair 成功时仍以 `source=minimax` 返回。Codex provider key 只能通过 `OPENAI_API_KEY`、`CRS_OAI_KEY` 这类运行时环境透传,MiniMax API key 只能通过 `UNIDESK_CODEX_QUEUE_MINIMAX_API_KEY` 这类运行时环境传入,禁止写入 `config.json`、Dockerfile、源码或测试文档。 ## T24 MET Nonlinear D601 GPU Microservice diff --git a/config.json b/config.json index 44b84edd..8f6f3ba6 100644 --- a/config.json +++ b/config.json @@ -177,7 +177,7 @@ "/api/" ], "healthPath": "/health", - "timeoutMs": 30000 + "timeoutMs": 90000 }, "development": { "providerId": "D601", diff --git a/docker-compose.yml b/docker-compose.yml index b4c197ad..45c50ffc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -105,10 +105,11 @@ services: HOST: "0.0.0.0" PORT: "4222" CODEX_QUEUE_STATE_PATH: "/var/lib/unidesk/codex-queue/state.json" - CODEX_QUEUE_WORKDIR: "/workspace" + CODEX_QUEUE_WORKDIR: "/root/unidesk" CODEX_QUEUE_CODEX_HOME: "/var/lib/unidesk/codex-queue/codex-home" CODEX_QUEUE_SOURCE_CODEX_CONFIG: "/root/.codex/config.toml" CODEX_QUEUE_DEFAULT_MODEL: "gpt-5.4-mini" + CODEX_QUEUE_MODELS: "gpt-5.4-mini,gpt-5.4,gpt-5.5" CODEX_QUEUE_SANDBOX: "danger-full-access" CODEX_QUEUE_APPROVAL_POLICY: "never" CODEX_QUEUE_MAX_ATTEMPTS: "3" @@ -117,9 +118,12 @@ services: MINIMAX_API_KEY: "${UNIDESK_CODEX_QUEUE_MINIMAX_API_KEY:-}" MINIMAX_API_BASE: "${UNIDESK_CODEX_QUEUE_MINIMAX_API_BASE:-https://api.minimaxi.com/v1}" MINIMAX_MODEL: "${UNIDESK_CODEX_QUEUE_MINIMAX_MODEL:-MiniMax-M2.7}" + MINIMAX_JUDGE_TIMEOUT_MS: "${UNIDESK_CODEX_QUEUE_MINIMAX_JUDGE_TIMEOUT_MS:-60000}" + MINIMAX_JUDGE_REPAIR_ATTEMPTS: "${UNIDESK_CODEX_QUEUE_MINIMAX_JUDGE_REPAIR_ATTEMPTS:-2}" LOG_FILE: "/var/log/unidesk/${UNIDESK_LOG_PREFIX}_codex-queue.jsonl" volumes: - /var/run/docker.sock:/var/run/docker.sock + - .:/root/unidesk - .:/workspace - /root/.codex/config.toml:/root/.codex/config.toml:ro - ${UNIDESK_LOG_DIR}:/var/log/unidesk @@ -127,7 +131,7 @@ services: extra_hosts: - "host.docker.internal:host-gateway" healthcheck: - test: ["CMD", "bun", "-e", "fetch('http://127.0.0.1:4222/health').then(r=>process.exit(r.ok?0:1)).catch(()=>process.exit(1))"] + test: ["CMD-SHELL", "curl -fsS --max-time 2 http://127.0.0.1:4222/health >/dev/null"] interval: 5s timeout: 3s retries: 20 diff --git a/docs/reference/frontend.md b/docs/reference/frontend.md index 8ee4fb59..7bbb89f6 100644 --- a/docs/reference/frontend.md +++ b/docs/reference/frontend.md @@ -58,7 +58,7 @@ frontend shell 必须把左侧主模块与顶部子标签编译为统一的 URL - `服务目录` 必须显示 service id、Provider、仓库 URL、commit id、业务 Dockerfile/docker-compose 引用、节点后端私有映射、SSH 透传开发入口和运行态容器摘要。 - `Todo Note` 子标签必须把主 server `todo-note-backend` 后端渲染为 UniDesk React 控件,包括迁移清单、树形任务、筛选、提醒、拖放/移动、撤销/重做、字号控制和显式原始 JSON 按钮。 - `FindJob` 子标签必须把 D601 findjob 后端渲染为 UniDesk React 控件,包括岗位指标、岗位预览、草稿报告和显式原始 JSON 按钮。 - - `Codex Queue` 子标签必须把主 server `codex-queue-backend` 后端渲染为 UniDesk React 控件,包括串行队列、任务提交/批量提交、默认模型 `gpt-5.4-mini`、MiniMax judge 状态、Codex CLI-like 输出流、attempt 终态、运行中追加 prompt、打断、手动重试和显式原始 JSON 按钮。 + - `Codex Queue` 子标签必须把主 server `codex-queue-backend` 后端渲染为 UniDesk React 控件,包括串行队列、任务提交/批量提交、模型下拉、显式入队份数、默认模型 `gpt-5.4-mini`、MiniMax judge 状态、Codex CLI-like 输出流、attempt 终态、运行中追加 prompt、打断、手动重试和显式原始 JSON 按钮;连续执行同一 prompt 应通过入队份数一次性生成多条任务,避免快速连点造成操作员误判。 - 业务 microservice 页面不得 iframe 业务旧前端、Todo Note 原 Vite 前端或 Pipeline 自身 WebUI,不得把 microservice 后端端口暴露为浏览器直连 URL,也不得把业务 API 的 JSON 裸铺在页面上。 - `Pipeline` 子标签是 D601 `/home/ubuntu/pipeline` 的 UniDesk host UI。 - Pipeline 仓库自带 WebUI 前端已经废弃;UniDesk frontend 是唯一用户可见的 Pipeline UI。 diff --git a/docs/reference/microservices.md b/docs/reference/microservices.md index bbbe206e..32b72939 100644 --- a/docs/reference/microservices.md +++ b/docs/reference/microservices.md @@ -56,15 +56,16 @@ Todo Note 数据迁移后必须验证:`microservice proxy todo-note /api/insta - 代码引用:`https://github.com/pikasTech/unidesk` 与配置中的 `repository.commitId`;服务源码位于 `src/components/microservices/codex-queue`,属于 UniDesk 自有控制面组件。 - 部署引用:UniDesk 根仓库 `docker-compose.yml` 中的 `codex-queue` service,Dockerfile 为 `src/components/microservices/codex-queue/Dockerfile`,容器名为 `codex-queue-backend`。 - Codex 认证:容器只从主 server 的 `/root/.codex/config.toml` 同步 Codex provider 配置到 `.state/codex-queue/codex-home`,并通过运行时环境透传 `OPENAI_API_KEY`、`CRS_OAI_KEY` 等 provider 所需变量;新增 provider 的 `env_key` 时必须增加同类运行时透传,禁止把 Codex 或 MiniMax 密钥写入仓库文件。 +- Develop-ready 镜像:Codex Queue 镜像必须在启动前预装 UniDesk/Pipeline 调试所需工具,至少包含 `codex`、`bun`、`node`、`npm`/`npx`、`git`、`rg`、`curl`、`python3`/`pip3`、`docker`、`docker compose`、`docker-compose`、`jq`、`ssh`、`rsync`、`make`、`gcc`/`g++`、`tar`、`gzip` 和 `unzip`;不得依赖 Codex 任务运行时再 `apt-get install` 这些基础环境。 - Codex 控制:服务内部启动 `codex app-server --listen stdio://`,用 JSON-RPC 调用 `thread/start`、`turn/start`、`turn/steer` 和 `turn/interrupt`,并监听 `turn/completed`、assistant delta、reasoning delta、command output delta、file diff delta 等通知生成前端可轮询的 transcript。 -- 队列语义:`POST /api/tasks` 或 `/api/tasks/batch` 入队,服务始终只运行一个 Codex turn;当前任务真正终止后才推进下一个任务。`GET /api/tasks` 与 `GET /api/tasks/{id}` 返回队列、attempt、judge 和输出;`POST /api/tasks/{id}/steer` 向运行中 turn 推入 prompt;`POST /api/tasks/{id}/interrupt` 或 `DELETE /api/tasks/{id}` 打断/取消;`POST /api/tasks/{id}/retry` 手动重试。 -- 完成判定:app-server `turn/completed` 的 `turn.status=completed|interrupted|failed` 只代表 Codex turn 已结束;即使 `completed` 也必须把原始任务、assistant 最终回复、command/file-change 事件、stderr tail 和 recent events 组成 execution record 交给 judge 判断是否真的完成。配置了 `UNIDESK_CODEX_QUEUE_MINIMAX_API_KEY` 时使用 MiniMax `MiniMax-M2.7` 判定 `complete|retry|fail`,否则使用 fallback 规则。 +- 队列语义:`POST /api/tasks` 或 `/api/tasks/batch` 入队,服务始终只运行一个 Codex turn;当前任务真正终止后才推进下一个任务。`GET /api/tasks` 与 `GET /api/tasks/{id}` 返回队列、attempt、judge 和输出;`POST /api/tasks/{id}/steer` 向运行中 turn 推入 prompt;`POST /api/tasks/{id}/interrupt` 或 `DELETE /api/tasks/{id}` 打断/取消;`POST /api/tasks/{id}/retry` 手动重试。队列 worker 必须隔离单个 task 的异常,不能因为某个 app-server 或 judge 异常让后续 queued 任务停止;当存在 queued/retry_wait 且 worker 空闲时,watchdog 必须自动重新调度。 +- 完成判定:app-server `turn/completed` 的 `turn.status=completed|interrupted|failed` 只代表 Codex turn 已结束;即使 `completed` 也必须把原始任务、assistant 最终回复、command/file-change 事件、stderr tail 和 recent events 组成 execution record 交给 judge 判断是否真的完成。配置了 `UNIDESK_CODEX_QUEUE_MINIMAX_API_KEY` 时使用 MiniMax `MiniMax-M2.7` 判定 `complete|retry|fail`,否则使用 fallback 规则。MiniMax 返回必须先做 JSON 去噪,支持去除 Markdown fence、`json` 标签和从夹杂文本中提取平衡 JSON object;如果去噪后仍无法解析,服务必须把解析错误和上一轮去噪前原始回答反馈给 MiniMax 做 JSON repair 重试,重试次数由 `UNIDESK_CODEX_QUEUE_MINIMAX_JUDGE_REPAIR_ATTEMPTS` 控制,默认 `2`,耗尽后才进入 fallback,并在 fallback 原因中保留 MiniMax 失败信息。 - Retry/推进语义:`retry` 不是新开一个独立任务或完全新 session;只要已有 `codexThreadId`,服务必须 `thread/resume` 原 thread 并 append 一个继续执行 prompt。只有 judge 判定 `complete` 后,队列 worker 才把当前任务标为成功并推进下一个 queued/retry_wait 任务。 - Judge 探针:`GET|POST /api/judge/probe` 使用同一套 judge 逻辑跑内置 synthetic execution records,覆盖正常完成、正常结束但只给计划、传输中断和用户打断四类样本,返回 `hits`、`total`、`hitRate`、每例 `expected` 与 `decision`;该接口不得回显 MiniMax API key。 -- 模型选择:默认 Codex 模型是 `gpt-5.4-mini`,每个入队任务可覆盖 `model`、`cwd`、`reasoningEffort` 和 `maxAttempts`。 -- 状态与日志:队列状态保存在 `.state/codex-queue/state.json` 对应的容器挂载路径,日志写入 UniDesk `logs/{YYYYMMDD}/..._codex-queue.jsonl`,`/logs` 端点返回最近结构化日志。 +- 模型选择:默认 Codex 模型是 `gpt-5.4-mini`,内置模型队列包含 `gpt-5.4-mini`、`gpt-5.4`、`gpt-5.5`;每个入队任务可通过前端模型下拉菜单或 API 覆盖 `model`、`cwd`、`reasoningEffort` 和 `maxAttempts`。 +- 状态与日志:默认工作目录为容器内 `/root/unidesk`,该路径映射主 server 的 `~/unidesk`;同时保留 `/workspace` 映射以兼容历史任务。队列状态保存在 `.state/codex-queue/state.json` 对应的容器挂载路径,日志写入 UniDesk `logs/{YYYYMMDD}/..._codex-queue.jsonl`,`/logs` 端点返回最近结构化日志。`/health` 的 `queue.devReady` 和 `/api/dev-ready` 必须暴露 develop-ready 自检,包括必需工具、Docker socket、`docker compose`、默认工作目录和 Codex config 状态。Codex CLI-like 输出可能很大,服务必须对输出条数设上限并节流状态持久化,禁止对每个 output delta 同步重写完整 state 导致 `/health` 和控制 API 卡死;容器 healthcheck 必须使用带超时的 HTTP 探针,不能留下堆积的无超时探针进程。 - 代理路径:只允许 `/health`、`/logs` 和 `/api/` 前缀;允许方法为 `GET`、`HEAD`、`POST`、`DELETE`。Codex Queue 只在 Compose 内网暴露 `4222/tcp`,不得映射或开放到公网。 -- UniDesk 前端:`微服务 / Codex Queue` React 页面负责展示队列卡片、默认模型、MiniMax judge 状态、Codex CLI-like 输出流、attempt 终态、追加 prompt、打断和手动重试控件;原始任务 JSON 只能通过显式 `查看原始JSON` 打开。 +- UniDesk 前端:`微服务 / Codex Queue` React 页面负责展示队列卡片、默认模型、模型下拉、显式入队份数、MiniMax judge 状态、Codex CLI-like 输出流、attempt 终态、追加 prompt、打断和手动重试控件;连续执行同一 prompt 应使用 `入队份数` 一次性生成多条队列任务,而不是依赖快速连点按钮;原始任务 JSON 只能通过显式 `查看原始JSON` 打开。 ## D601 Microservices @@ -157,7 +158,7 @@ microservice 交付必须同时通过后端、CLI 和公网 frontend 验证: - 运行 `bun scripts/cli.ts microservice health pipeline` 与 `bun scripts/cli.ts microservice proxy pipeline '/api/snapshot?__unideskArrayLimit=registry.components:8,runs:3'`,确认真实链路经过 backend-core、WebSocket、D601 provider-gateway 和 D601 本机 Pipeline 后端,且 run/procedure 摘要包含甘特图所需时间字段。 - 运行 `bun scripts/cli.ts microservice health met-nonlinear`、`bun scripts/cli.ts microservice proxy met-nonlinear /api/queue`、`bun scripts/cli.ts microservice proxy met-nonlinear '/api/projects?root=projects&limit=20'` 和 `bun scripts/cli.ts microservice proxy met-nonlinear /api/images`,确认真实链路经过 backend-core、WebSocket、D601 provider-gateway 和 D601 本机 MET Nonlinear TS 后端。 - 运行 `bun scripts/cli.ts microservice health todo-note` 与 `bun scripts/cli.ts microservice proxy todo-note /api/instances`,确认真实链路经过 backend-core、WebSocket、main-server provider-gateway 和主 server `todo-note-backend` 后端;输出中必须包含五个迁移清单和 PostgreSQL 存储健康状态。 -- 运行 `bun scripts/cli.ts microservice health codex-queue` 与 `bun scripts/cli.ts microservice proxy codex-queue /api/tasks`,确认真实链路经过 backend-core、WebSocket、main-server provider-gateway 和主 server `codex-queue-backend` 后端;再通过 frontend 提交一个 `gpt-5.4-mini` 小任务,确认队列串行推进、输出实时更新、结束后有 judge 判定,且运行中可追加 prompt 或打断。 +- 运行 `bun scripts/cli.ts microservice health codex-queue` 与 `bun scripts/cli.ts microservice proxy codex-queue /api/tasks`,确认真实链路经过 backend-core、WebSocket、main-server provider-gateway 和主 server `codex-queue-backend` 后端;再通过公网 frontend 提交一个 `gpt-5.4-mini` 小任务,确认队列串行推进、输出实时更新、结束后有 judge 判定,且运行中可追加 prompt 或打断。批量验收必须通过公网 frontend 设置 `入队份数=5` 或使用多段 prompt 分隔,一次性入队 5 条任务,并确认 5 条任务按顺序进入 running/judging/succeeded,而不是只运行第一条。 - 在 D601 上用 `bun scripts/cli.ts ssh D601 ...` 调试业务仓库和容器,确认 `curl http://127.0.0.1:3254/api/health` 可用;不要把调试服务部署到主 server。 - 在 D601 上用 `bun scripts/cli.ts ssh D601 ...` 调试业务仓库和容器,确认 `curl http://127.0.0.1:18082/health` 和 `curl http://127.0.0.1:18082/api/snapshot` 可用;不要把 Pipeline 调试服务部署到主 server。 - 在 D601 上用 `bun scripts/cli.ts ssh D601 ...` 调试 `~/met_nonlinear`,确认 `curl http://127.0.0.1:3288/health` 可用;最终验收必须回到公网 UniDesk frontend,通过项目库选择、Fork、加入待启动队列和启动队列完成,不要把 MET Nonlinear 后端、Docker build 或训练任务部署到主 server。 diff --git a/scripts/src/docker.ts b/scripts/src/docker.ts index 7f531cec..db383639 100644 --- a/scripts/src/docker.ts +++ b/scripts/src/docker.ts @@ -113,7 +113,8 @@ export function writeComposeEnv(config: UniDeskConfig, freshLogPrefix: boolean): UNIDESK_HOST_SSH_USER: config.sshForwarding.user, UNIDESK_CODEX_QUEUE_MINIMAX_API_KEY: runtimeSecret("UNIDESK_CODEX_QUEUE_MINIMAX_API_KEY"), UNIDESK_CODEX_QUEUE_MINIMAX_MODEL: runtimeSecret("UNIDESK_CODEX_QUEUE_MINIMAX_MODEL") || "MiniMax-M2.7", - UNIDESK_CODEX_QUEUE_MINIMAX_API_BASE: runtimeSecret("UNIDESK_CODEX_QUEUE_MINIMAX_API_BASE") || "https://api.minimax.io/v1", + UNIDESK_CODEX_QUEUE_MINIMAX_API_BASE: runtimeSecret("UNIDESK_CODEX_QUEUE_MINIMAX_API_BASE") || "https://api.minimaxi.com/v1", + UNIDESK_CODEX_QUEUE_MINIMAX_JUDGE_TIMEOUT_MS: runtimeSecret("UNIDESK_CODEX_QUEUE_MINIMAX_JUDGE_TIMEOUT_MS") || "60000", }; writeFileSync(envFile, Object.entries(lines).map(([key, value]) => `${key}=${envValue(value)}`).join("\n") + "\n", "utf8"); return { envFile, logDir, logPrefix }; diff --git a/scripts/src/e2e.ts b/scripts/src/e2e.ts index 70ee99e2..77b18ee7 100644 --- a/scripts/src/e2e.ts +++ b/scripts/src/e2e.ts @@ -1113,8 +1113,11 @@ async function frontendCheck(config: UniDeskConfig, urls: PublicUrls, checks: E2 const lower = text.toLowerCase(); return lower.includes("codex queue 控制台") && text.includes("gpt-5.4-mini") + && text.includes("gpt-5.4") + && text.includes("gpt-5.5") && text.includes("仅 UniDesk frontend 代理访问") && text.includes("提交任务") + && text.includes("入队份数") && text.includes("追加 prompt") && text.includes("打断") && lower.includes("attempts"); @@ -1412,7 +1415,7 @@ async function frontendCheck(config: UniDeskConfig, urls: PublicUrls, checks: E2 addSelectedCheck(checks, options, "frontend:microservice-catalog-visible", microserviceCatalogTextLower.includes("findjob") && microserviceCatalogTextLower.includes("pipeline") && microserviceCatalogTextLower.includes("todo note") && microserviceCatalogTextLower.includes("met nonlinear") && microserviceCatalogTextLower.includes("codex queue") && microserviceCatalogText.includes("D601") && microserviceCatalogText.includes(config.providerGateway.id) && microserviceCatalogTextLower.includes("private") && microserviceCatalogText.includes("https://gitee.com/Lyon1998/findjob") && microserviceCatalogText.includes("https://github.com/pikasTech/pipeline") && microserviceCatalogText.includes("https://github.com/pikasTech/met_nonlinear") && microserviceCatalogText.includes("https://gitee.com/Lyon1998/todo_note") && microserviceCatalogText.includes("https://github.com/pikasTech/unidesk"), { microserviceCatalogPreview: microserviceCatalogText.slice(0, 1800) }); addSelectedCheck(checks, options, "frontend:todo-note-integrated-visible", todoNoteTextLower.includes("todo note 工作台") && todoNoteText.includes("CONSTAR") && todoNoteText.includes("大论文") && todoNoteText.includes("UI E2E smoke task") && todoNoteText.includes("撤销") && todoNoteText.includes("重做") && todoNoteText.includes("全部展开") && todoNoteText.includes("仅 UniDesk frontend 代理访问"), { todoNoteTextPreview: todoNoteText.slice(0, 1400) }); addSelectedCheck(checks, options, "frontend:findjob-integrated-visible", findjobTextLower.includes("findjob 工作台".toLowerCase()) && findjobText.includes("岗位总量") && findjobText.includes("D601") && findjobText.includes("近期岗位") && findjobText.includes("仅 UniDesk frontend 代理访问") && /岗位总量\s+\d+/.test(findjobText) && /health\s+ok/i.test(findjobText) && /[1-9]\d*\/[1-9]\d*\s+preview/i.test(findjobText), { findjobTextPreview: findjobText.slice(0, 1200) }); - addSelectedCheck(checks, options, "frontend:codex-queue-integrated-visible", codexQueueTextLower.includes("codex queue 控制台".toLowerCase()) && codexQueueText.includes("gpt-5.4-mini") && codexQueueText.includes("提交任务") && codexQueueText.includes("追加 prompt") && codexQueueText.includes("打断") && codexQueueTextLower.includes("attempts") && codexQueueText.includes("仅 UniDesk frontend 代理访问"), { codexQueueTextPreview: codexQueueText.slice(0, 1400) }); + addSelectedCheck(checks, options, "frontend:codex-queue-integrated-visible", codexQueueTextLower.includes("codex queue 控制台".toLowerCase()) && codexQueueText.includes("gpt-5.4-mini") && codexQueueText.includes("gpt-5.4") && codexQueueText.includes("gpt-5.5") && codexQueueText.includes("提交任务") && codexQueueText.includes("入队份数") && codexQueueText.includes("追加 prompt") && codexQueueText.includes("打断") && codexQueueTextLower.includes("attempts") && codexQueueText.includes("仅 UniDesk frontend 代理访问"), { codexQueueTextPreview: codexQueueText.slice(0, 1400) }); addSelectedCheck(checks, options, "frontend:url-route-deeplink", routeInitialPath === "/app/pipeline/" && routeDockerPath === "/nodes/docker/" && routeBackPath === "/app/pipeline/" && routeOverviewPath === "/ops/status/" && routeDeepLinkText.toLowerCase().includes("pipeline v2 工作台".toLowerCase()) && routeOverviewText.includes("核心指标"), { routeInitialPath, routeDockerPath, routeBackIntermediatePath, routeBackPath, routeOverviewPath, routeDeepLinkPreview: routeDeepLinkText.slice(0, 1200), routeOverviewPreview: routeOverviewText.slice(0, 800) }); addSelectedCheck(checks, options, "frontend:pipeline-integrated-visible", pipelineTextLower.includes("pipeline v2 工作台".toLowerCase()) && pipelineText.includes("D601") && pipelineText.includes("控制图") && pipelineText.includes("评分器") && /epoch\s+甘特图/i.test(pipelineText) && pipelineText.includes("运行材料索引") && pipelineText.includes("仅 UniDesk frontend 代理访问") && /Health\s+OK/i.test(pipelineText) && /组件\s+\d+/.test(pipelineText) && /运行记录\s+[1-9]\d*/.test(pipelineText), { pipelineTextPreview: pipelineText.slice(0, 1200) }); addSelectedCheck(checks, options, "frontend:pipeline-react-flow-visible", pipelineFlowNodeCount > 0 && pipelineFlowEdgeCount > 0, { pipelineFlowNodeCount, pipelineFlowEdgeCount }); diff --git a/src/components/backend-core/src/index.ts b/src/components/backend-core/src/index.ts index dea5508a..a1baf1bc 100644 --- a/src/components/backend-core/src/index.ts +++ b/src/components/backend-core/src/index.ts @@ -1252,6 +1252,7 @@ markStaleTasksFailed().catch((error) => logger("error", "task_timeout_sweep_fail const apiServer = Bun.serve({ port: config.port, hostname: "0.0.0.0", + idleTimeout: 120, fetch: route, websocket: { open(ws) { diff --git a/src/components/frontend/public/style.css b/src/components/frontend/public/style.css index 872e4e73..f8ef617f 100644 --- a/src/components/frontend/public/style.css +++ b/src/components/frontend/public/style.css @@ -1217,10 +1217,69 @@ input:focus, select:focus, textarea:focus { border-color: var(--accent-2); } } .codex-queue-layout { display: grid; - grid-template-columns: minmax(320px, 0.52fr) minmax(680px, 1.58fr); + grid-template-columns: minmax(300px, 0.52fr) minmax(0, 1.58fr); gap: 10px; align-items: start; } +.codex-session-stage { + min-width: 0; + width: 100%; +} +.codex-session-stage .codex-output-panel { + width: 100%; +} +.codex-session-stage .codex-transcript { + min-height: 620px; + max-height: calc(100vh - 230px); +} +.codex-session-shell { + display: grid; + grid-template-columns: minmax(280px, 0.34fr) minmax(0, 1fr); + min-width: 0; + align-items: stretch; +} +.codex-session-shell.queue-collapsed { + grid-template-columns: minmax(0, 1fr); +} +.codex-session-sidebar { + display: grid; + grid-template-rows: auto minmax(0, 1fr); + gap: 8px; + min-width: 0; + padding: 10px; + border-right: 1px solid var(--line); + background: + radial-gradient(circle at 0 0, rgba(78, 183, 168, 0.11), transparent 42%), + rgba(6, 10, 13, 0.72); +} +.codex-session-sidebar-head { + display: flex; + align-items: center; + justify-content: space-between; + gap: 8px; + min-width: 0; +} +.codex-session-sidebar-head > div { + display: grid; + gap: 2px; + min-width: 0; +} +.codex-session-sidebar-head span { + color: var(--muted); + font-size: 10px; + letter-spacing: 0.16em; + text-transform: uppercase; +} +.codex-session-sidebar-head strong { + min-width: 0; + color: var(--text); + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; +} +.codex-session-main { + min-width: 0; +} .codex-left-rail, .codex-main-stage, .codex-detail-grid, @@ -1229,11 +1288,29 @@ input:focus, select:focus, textarea:focus { border-color: var(--accent-2); } display: grid; gap: 10px; } +.codex-compose-panel, +.codex-compose-panel .panel-body, +.codex-task-form, +.codex-task-form label { + min-width: 0; + overflow: hidden; +} .codex-form-grid { display: grid; - grid-template-columns: minmax(120px, 0.8fr) minmax(180px, 1fr) 92px; + grid-template-columns: repeat(2, minmax(0, 1fr)); gap: 8px; } +.codex-form-grid label:nth-child(2) { + grid-column: 1 / -1; +} +.codex-task-form textarea, +.codex-steer-form textarea, +.codex-form-grid input, +.codex-form-grid select { + min-width: 0; + max-width: 100%; + box-sizing: border-box; +} .codex-task-list { display: grid; gap: 7px; @@ -1242,6 +1319,46 @@ input:focus, select:focus, textarea:focus { border-color: var(--accent-2); } overflow: auto; align-content: start; } +.codex-task-list-session { + min-height: 0; + max-height: calc(100vh - 318px); +} +.codex-task-section { + display: grid; + gap: 7px; + min-width: 0; +} +.codex-task-section + .codex-task-section { + margin-top: 6px; + padding-top: 9px; + border-top: 1px solid rgba(255,255,255,0.07); +} +.codex-task-section-head { + display: flex; + align-items: center; + justify-content: space-between; + gap: 8px; + color: var(--muted); + font-size: 11px; + letter-spacing: 0.08em; + text-transform: uppercase; +} +.codex-task-section-head code { + color: var(--accent-2); +} +.codex-task-section-list { + display: grid; + gap: 7px; + min-width: 0; +} +.codex-task-section-empty { + margin: 0; + padding: 8px 9px; + border: 1px dashed var(--line-soft); + color: var(--muted); + font-size: 12px; + background: rgba(255,255,255,0.02); +} .codex-task-card { display: grid; gap: 6px; @@ -1282,6 +1399,10 @@ input:focus, select:focus, textarea:focus { border-color: var(--accent-2); } .codex-output-panel .panel-body { padding: 0; } +.codex-output-stack { + display: grid; + min-width: 0; +} .codex-transcript { min-height: 520px; max-height: calc(100vh - 300px); @@ -1300,6 +1421,87 @@ input:focus, select:focus, textarea:focus { border-color: var(--accent-2); } color: var(--muted); background: rgba(255,255,255,0.025); } +.codex-transcript-item { + display: grid; + grid-template-columns: 22px minmax(0, 1fr); + gap: 8px; + padding: 9px 0; + border-bottom: 1px solid rgba(255,255,255,0.045); +} +.codex-transcript-bullet { + color: var(--accent); + font-size: 20px; + line-height: 1.1; + text-align: center; +} +.codex-transcript-main { + display: grid; + gap: 6px; + min-width: 0; +} +.codex-transcript-title { + display: flex; + align-items: center; + gap: 7px; + min-width: 0; + color: var(--text); +} +.codex-transcript-title strong { + min-width: 0; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; +} +.codex-transcript-title time { + margin-left: auto; + color: var(--muted); + font-size: 10px; + white-space: nowrap; +} +.codex-transcript-title code { + max-width: 90px; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; + color: var(--accent-2); +} +.codex-transcript-command, +.codex-transcript-body { + margin: 0; + white-space: pre-wrap; + overflow-wrap: anywhere; + border-left: 2px solid rgba(78, 183, 168, 0.26); + padding: 2px 0 2px 10px; + color: #d9e8e7; + font-size: 12px; + line-height: 1.48; +} +.codex-transcript-command { + display: block; + width: 100%; + margin-top: 2px; + color: #a7c7c3; + border-color: rgba(215, 161, 58, 0.34); + background: rgba(215, 161, 58, 0.035); +} +.codex-transcript-item.explored .codex-output-channel { color: #8fc7ee; border-color: rgba(105, 174, 232, 0.46); background: rgba(105, 174, 232, 0.08); } +.codex-transcript-item.edited .codex-output-channel { color: #b6da89; border-color: rgba(182, 218, 137, 0.42); background: rgba(182, 218, 137, 0.07); } +.codex-transcript-item.error .codex-output-channel { color: var(--danger); border-color: rgba(207, 106, 84, 0.52); background: rgba(207, 106, 84, 0.08); } +.codex-raw-output { + border-top: 1px solid var(--line); + background: rgba(6, 10, 13, 0.92); +} +.codex-raw-output summary { + cursor: pointer; + padding: 8px 12px; + color: var(--muted); + font-size: 12px; +} +.codex-raw-output > div { + max-height: 360px; + overflow: auto; + padding: 0 12px 12px; +} .codex-output-line { display: grid; grid-template-columns: 130px minmax(0, 1fr); @@ -1344,6 +1546,46 @@ input:focus, select:focus, textarea:focus { border-color: var(--accent-2); } .codex-detail-grid { grid-template-columns: minmax(320px, 1fr) minmax(320px, 1fr); } +.codex-prompt-panel { + grid-column: 1 / -1; +} +.codex-prompt-detail { + display: grid; + gap: 9px; + min-width: 0; +} +.codex-prompt-meta { + display: flex; + flex-wrap: wrap; + gap: 6px; + align-items: center; + min-width: 0; +} +.codex-prompt-meta span:not(.status-badge) { + max-width: 100%; + padding: 3px 7px; + border: 1px solid var(--line-soft); + color: var(--muted); + background: rgba(255,255,255,0.025); + font-size: 11px; + overflow-wrap: anywhere; +} +.codex-prompt-full { + max-height: 360px; + margin: 0; + padding: 10px; + overflow: auto; + white-space: pre-wrap; + overflow-wrap: anywhere; + border: 1px solid rgba(215, 161, 58, 0.28); + border-left: 3px solid rgba(215, 161, 58, 0.58); + color: #e7ded0; + background: + linear-gradient(90deg, rgba(215, 161, 58, 0.08), transparent 34%), + rgba(6, 10, 13, 0.84); + font-size: 12px; + line-height: 1.5; +} .codex-judge-card { display: grid; gap: 8px; @@ -3250,7 +3492,8 @@ input:focus, select:focus, textarea:focus { border-color: var(--accent-2); } .pipeline-oa-guarantees { grid-template-columns: repeat(2, minmax(0, 1fr)); } .dispatch-form { grid-template-columns: 1fr 1fr; } .dispatch-actions { align-items: center; } - .page-grid, .docker-layout, .monitor-layout, .findjob-grid, .findjob-hero, .pipeline-grid, .pipeline-hero, .met-grid, .met-form-grid, .codex-queue-layout, .codex-queue-hero, .codex-detail-grid { grid-template-columns: 1fr; } + .page-grid, .docker-layout, .monitor-layout, .findjob-grid, .findjob-hero, .pipeline-grid, .pipeline-hero, .met-grid, .met-form-grid, .codex-queue-layout, .codex-queue-hero, .codex-detail-grid, .codex-session-shell { grid-template-columns: 1fr; } + .codex-session-sidebar { border-right: 0; border-bottom: 1px solid var(--line); } .pipeline-control-shell { grid-template-columns: 1fr; } .pipeline-node-control { max-height: none; min-height: 0; } .findjob-grid .panel:nth-child(3), .pipeline-grid .panel:nth-child(3), .pipeline-grid .panel:nth-child(5), .met-grid .panel:nth-child(3), .met-grid .panel:nth-child(5), .met-detail-panel { grid-column: 1; } diff --git a/src/components/frontend/src/codex-queue.tsx b/src/components/frontend/src/codex-queue.tsx index 343a1201..6dbfa304 100644 --- a/src/components/frontend/src/codex-queue.tsx +++ b/src/components/frontend/src/codex-queue.tsx @@ -33,11 +33,16 @@ async function requestJson(path: string, options: AnyRecord = {}): Promise const response = await fetch(path, { credentials: "same-origin", ...options, body, headers }); const text = await response.text(); let payload = null; + let parseFailed = false; try { payload = text ? JSON.parse(text) : null; } catch { + parseFailed = true; payload = { text }; } + if (parseFailed) { + throw new Error(`Codex Queue 返回了无效 JSON(${text.length} bytes),可能是代理响应过大或被截断`); + } if (!response.ok || payload?.ok === false) { const message = payload?.error?.message || payload?.error || `HTTP ${response.status}`; const error = new Error(message); @@ -106,10 +111,62 @@ function taskRows(data: any): any[] { return Array.isArray(data?.tasks) ? data.tasks : []; } +function taskSortValue(task: any): number { + const time = Date.parse(String(task?.updatedAt || task?.createdAt || "")); + return Number.isFinite(time) ? time : 0; +} + +function mergeTaskRows(groups: any[][], activeTaskId = ""): any[] { + const byId = new Map(); + for (const group of groups) { + for (const task of group) { + const id = String(task?.id || ""); + if (id.length > 0 && !byId.has(id)) byId.set(id, task); + } + } + const statusRank: Record = { running: 0, judging: 1, retry_wait: 2, queued: 3 }; + return Array.from(byId.values()).sort((left, right) => { + const leftActive = String(left?.id || "") === activeTaskId ? 0 : 1; + const rightActive = String(right?.id || "") === activeTaskId ? 0 : 1; + if (leftActive !== rightActive) return leftActive - rightActive; + const rankDelta = (statusRank[String(left?.status || "")] ?? 9) - (statusRank[String(right?.status || "")] ?? 9); + if (rankDelta !== 0) return rankDelta; + return taskSortValue(right) - taskSortValue(left); + }); +} + +async function loadTaskQueue(apiBaseUrl: string, healthResult: any): Promise { + const statuses = ["running", "judging", "retry_wait", "queued"]; + const results = await Promise.all(statuses.map(async (status) => { + try { + return await requestJson(codexApi(apiBaseUrl, `/api/tasks?status=${encodeURIComponent(status)}&limit=80`)); + } catch { + return null; + } + })); + const historyResult = await requestJson(codexApi(apiBaseUrl, "/api/tasks?limit=160")).catch(() => null); + const queue = results.find((item) => item?.queue)?.queue || historyResult?.queue || healthResult?.queue || healthResult?.body?.queue || {}; + const rows = mergeTaskRows([...results.map((item) => taskRows(item)), taskRows(historyResult)], String(queue?.activeTaskId || "")); + if (rows.length > 0) return { ok: true, queue, tasks: rows }; + return requestJson(codexApi(apiBaseUrl, "/api/tasks?limit=5")); +} + function taskOutput(task: any): any[] { return Array.isArray(task?.output) ? task.output : []; } +function taskTranscript(task: any): any[] { + if (Array.isArray(task?.transcript)) return task.transcript; + return taskOutput(task).map((item: any) => ({ + seq: item.seq, + at: item.at, + kind: item.channel === "error" ? "error" : item.channel === "command" ? "ran" : "message", + title: item.method || item.channel || "message", + bodyPreview: String(item.text || ""), + rawSeqs: [item.seq], + })); +} + function taskAttempts(task: any): any[] { return Array.isArray(task?.attempts) ? task.attempts : []; } @@ -125,6 +182,11 @@ function splitPromptTasks(prompt: string): string[] { .filter(Boolean); } +function repeatCountValue(value: any): number { + const count = Number(value); + return Number.isFinite(count) ? Math.max(1, Math.min(50, Math.floor(count))) : 1; +} + function channelLabel(channel: string): string { const labels: Record = { system: "SYS", @@ -139,15 +201,63 @@ function channelLabel(channel: string): string { return labels[channel] || channel.toUpperCase(); } +function transcriptKindLabel(kind: string): string { + const labels: Record = { + ran: "Ran", + explored: "Explored", + edited: "Edited", + plan: "Plan", + message: "Message", + system: "System", + error: "Error", + }; + return labels[kind] || "Message"; +} + +function omittedLabel(lines: any): string { + const count = Number(lines || 0); + return Number.isFinite(count) && count > 0 ? `… +${Math.floor(count)} lines` : ""; +} + function taskIsActive(task: any): boolean { return ["running", "judging", "retry_wait"].includes(String(task?.status || "")); } +function taskIsTerminal(task: any): boolean { + return ["succeeded", "failed", "canceled"].includes(String(task?.status || "")); +} + +function taskHasDetail(task: any): boolean { + return Boolean(task?._detailLoaded) + || (Array.isArray(task?.transcript) && task.transcript.length > 0) + || (Array.isArray(task?.output) && task.output.length > 0) + || (Array.isArray(task?.events) && task.events.length > 0); +} + +function mergeTranscriptRows(existing: any[], incoming: any[]): any[] { + const byKey = new Map(); + for (const item of [...(Array.isArray(existing) ? existing : []), ...(Array.isArray(incoming) ? incoming : [])]) { + const key = `${Number(item?.seq ?? 0)}:${String(item?.kind || "message")}`; + byKey.set(key, item); + } + return Array.from(byKey.values()).sort((left, right) => Number(left?.seq ?? 0) - Number(right?.seq ?? 0)); +} + +function transcriptMaxSeq(transcript: any[]): number { + return (Array.isArray(transcript) ? transcript : []).reduce((max, item) => Math.max(max, Number(item?.seq ?? 0)), 0); +} + function countValue(counts: AnyRecord, key: string): string { const value = Number(counts[key] ?? 0); return Number.isFinite(value) ? String(value) : "0"; } +function codexModelOptions(queue: any, currentModel: string): string[] { + const configured = Array.isArray(queue?.codexModels) ? queue.codexModels : []; + const fallback = ["gpt-5.4-mini", "gpt-5.4", "gpt-5.5"]; + return Array.from(new Set([...configured, ...fallback, currentModel].map((item) => String(item || "").trim()).filter(Boolean))); +} + function TaskCard({ task, selected, onSelect }: AnyRecord) { const judge = task?.lastJudge || {}; return h("button", { @@ -169,22 +279,95 @@ function TaskCard({ task, selected, onSelect }: AnyRecord) { ); } -function Transcript({ task, autoScroll }: AnyRecord) { +function TaskListSection({ title, tasks, selectedId, onSelect, emptyText }: AnyRecord) { + const rows = Array.isArray(tasks) ? tasks : []; + return h("section", { className: "codex-task-section" }, + h("div", { className: "codex-task-section-head" }, + h("span", null, title), + h("code", null, String(rows.length)), + ), + rows.length === 0 + ? h("p", { className: "codex-task-section-empty" }, emptyText) + : h("div", { className: "codex-task-section-list" }, rows.map((task: any) => h(TaskCard, { + key: task.id, + task, + selected: selectedId === task.id, + onSelect: () => onSelect(task.id), + }))), + ); +} + +function Transcript({ task, autoScroll, loading }: AnyRecord) { const ref = useRef(null); - const output = taskOutput(task); + const transcript = taskTranscript(task); useEffect(() => { if (autoScroll && ref.current) ref.current.scrollTop = ref.current.scrollHeight; - }, [autoScroll, output.length, task?.id]); + }, [autoScroll, transcript.length, task?.id]); if (!task) return h(EmptyState, { title: "未选择任务", text: "从左侧队列选择任务,或提交新 Codex 任务。" }); + if (loading && !taskHasDetail(task)) return h("div", { className: "codex-transcript", ref, "data-testid": "codex-output" }, + h("div", { className: "codex-output-empty" }, "正在加载完整 session 记录..."), + ); return h("div", { className: "codex-transcript", ref, "data-testid": "codex-output" }, - output.length === 0 ? h("div", { className: "codex-output-empty" }, "等待 Codex 输出...") : output.map((item: any) => h("article", { key: `${item.seq}-${item.channel}`, className: `codex-output-line ${item.channel || "system"}` }, - h("div", { className: "codex-output-meta" }, - h("span", { className: "codex-output-channel" }, channelLabel(String(item.channel || "system"))), - h("span", null, fmtDate(item.at)), - item.method ? h("code", null, item.method) : null, + transcript.length === 0 ? h("div", { className: "codex-output-empty" }, "等待 Codex 输出...") : transcript.map((item: any) => { + const kind = String(item.kind || "message"); + const isCommand = ["ran", "explored", "edited"].includes(kind); + const commandMore = omittedLabel(item.commandOmittedLines); + const bodyMore = omittedLabel(item.bodyOmittedLines); + const commandText = String(item.commandPreview || (isCommand ? item.title || "" : "")); + return h("article", { key: `${item.seq}-${kind}`, className: `codex-transcript-item ${kind}` }, + h("div", { className: "codex-transcript-bullet" }, "•"), + h("div", { className: "codex-transcript-main" }, + h("div", { className: "codex-transcript-title" }, + h("span", { className: "codex-output-channel" }, transcriptKindLabel(kind)), + isCommand ? null : h("strong", null, String(item.title || transcriptKindLabel(kind))), + item.status ? h("code", null, String(item.status)) : null, + h("time", null, fmtDate(item.at)), + ), + commandText ? h("pre", { className: "codex-transcript-command" }, + commandText, + commandMore ? `\n${commandMore}` : "", + ) : null, + item.bodyPreview ? h("pre", { className: "codex-transcript-body" }, + String(item.bodyPreview), + bodyMore ? `\n${bodyMore} (查看原始JSON获取完整记录)` : "", + ) : null, + ), + ); + }), + ); +} + +function PromptDetail({ task }: AnyRecord) { + if (!task) return h(EmptyState, { title: "未选择任务", text: "选择队列或历史 session 后,这里显示完整 prompt、模型和工作目录。" }); + const promptText = String(task?.prompt || ""); + const lines = promptText.length > 0 ? promptText.split(/\r\n|\r|\n/u).length : 0; + return h("div", { className: "codex-prompt-detail", "data-testid": "codex-task-prompt-detail" }, + h("div", { className: "codex-prompt-meta" }, + h(StatusBadge, { status: task?.status }, task?.status || "unknown"), + h("span", null, `model=${task?.model || "--"}`), + h("span", null, `cwd=${task?.cwd || "--"}`), + h("span", null, `created=${fmtDate(task?.createdAt)}`), + h("span", null, `${lines} lines / ${promptText.length} chars`), + ), + h("pre", { className: "codex-prompt-full", "data-testid": "codex-task-prompt-full" }, promptText || "空 prompt"), + ); +} + +function RawTranscript({ task }: AnyRecord) { + const output = taskOutput(task); + if (!task || output.length === 0) return h(EmptyState, { title: "暂无原始消息", text: "原始 Codex app-server 消息会保留在任务 JSON 中。" }); + return h("details", { className: "codex-raw-output" }, + h("summary", null, `原始 messages (${output.length})`), + h("div", null, + output.map((item: any) => h("article", { key: `${item.seq}-${item.channel}`, className: `codex-output-line ${item.channel || "system"}` }, + h("div", { className: "codex-output-meta" }, + h("span", { className: "codex-output-channel" }, channelLabel(String(item.channel || "system"))), + h("span", null, fmtDate(item.at)), + item.method ? h("code", null, item.method) : null, + ), + h("pre", null, String(item.text || "")), + )), ), - h("pre", null, String(item.text || "")), - )), ); } @@ -215,21 +398,31 @@ function AttemptTable({ task }: AnyRecord) { export function CodexQueuePage({ microservices, onRaw, apiBaseUrl = "/api" }: AnyRecord) { const service = microservices.find((item: any) => item.id === "codex-queue") || null; + const selectedIdRef = useRef(""); + const queueLoadTokenRef = useRef(0); + const detailLoadTokenRef = useRef(0); + const detailInFlightRef = useRef<{ taskId: string; token: number; promise: Promise } | null>(null); + const sessionCacheRef = useRef>(new Map()); const [health, setHealth] = useState(null); const [tasksData, setTasksData] = useState(null); const [selectedId, setSelectedId] = useState(""); const [selectedTask, setSelectedTask] = useState(null); + const [selectedDetailLoading, setSelectedDetailLoading] = useState(false); const [prompt, setPrompt] = useState("请在 UniDesk 工作区中完成一个很小的验证任务:读取 package.json 并总结项目名称,不要修改文件。"); const [model, setModel] = useState("gpt-5.4-mini"); - const [cwd, setCwd] = useState("/workspace"); + const [cwd, setCwd] = useState("/root/unidesk"); const [maxAttempts, setMaxAttempts] = useState(3); + const [repeatCount, setRepeatCount] = useState(1); const [steerPrompt, setSteerPrompt] = useState(""); const [autoScroll, setAutoScroll] = useState(true); + const [queueSidebarOpen, setQueueSidebarOpen] = useState(true); const [busy, setBusy] = useState(false); const [error, setError] = useState(""); const [refreshedAt, setRefreshedAt] = useState(null); const tasks = taskRows(tasksData); + const queuedTasks = tasks.filter((task: any) => !taskIsTerminal(task)); + const historyTasks = tasks.filter((task: any) => taskIsTerminal(task)); const queue = tasksData?.queue || health?.body?.queue || health?.queue || {}; const counts = queueCounts(queue); const activeTaskId = queue?.activeTaskId || tasks.find((task: any) => taskIsActive(task))?.id || ""; @@ -237,28 +430,122 @@ export function CodexQueuePage({ microservices, onRaw, apiBaseUrl = "/api" }: An const repository = service ? microserviceRepository(service) : {}; const backend = service ? microserviceBackend(service) : {}; const promptParts = useMemo(() => splitPromptTasks(prompt), [prompt]); + const enqueueItems = useMemo(() => { + const count = repeatCountValue(repeatCount); + return promptParts.flatMap((text) => Array.from({ length: count }, () => text)); + }, [promptParts, repeatCount]); + const codexModels = codexModelOptions(queue, model); const selectedCanSteer = selectedTask?.id && selectedTask?.activeTurnId && String(selectedTask?.status) === "running"; const selectedCanInterrupt = selectedTask?.id && !["succeeded", "failed", "canceled"].includes(String(selectedTask?.status || "")); const selectedCanRetry = selectedTask?.id && ["succeeded", "failed", "canceled"].includes(String(selectedTask?.status || "")); - async function load(preferId = selectedId): Promise { + function publishCachedTask(taskId: string, patch: AnyRecord, token: number): AnyRecord { + const cached = sessionCacheRef.current.get(taskId) || {}; + const existingTask = cached.task || {}; + const nextTranscript = Object.prototype.hasOwnProperty.call(patch, "transcript") + ? patch.transcript + : (Array.isArray(existingTask.transcript) ? existingTask.transcript : []); + const task = { + ...existingTask, + ...patch, + transcript: nextTranscript, + output: Array.isArray(patch.output) ? patch.output : (Array.isArray(existingTask.output) ? existingTask.output : []), + events: Array.isArray(patch.events) ? patch.events : (Array.isArray(existingTask.events) ? existingTask.events : []), + }; + const entry = { + ...cached, + task, + maxSeq: transcriptMaxSeq(nextTranscript), + complete: Boolean(patch._transcriptComplete ?? cached.complete), + }; + sessionCacheRef.current.set(taskId, entry); + if (token === detailLoadTokenRef.current && selectedIdRef.current === taskId) setSelectedTask(task); + return entry; + } + + async function loadTaskDetail(taskId: string): Promise { + if (!service || !taskId) return; + const token = detailLoadTokenRef.current; + const cached = sessionCacheRef.current.get(taskId); + if (cached?.task) { + setSelectedTask(cached.task); + setSelectedDetailLoading(false); + if (cached.complete && taskIsTerminal(cached.task)) return; + } else { + setSelectedDetailLoading(true); + } + const existing = detailInFlightRef.current; + if (existing?.taskId === taskId && existing.token === token) return existing.promise; + const promise = (async () => { + try { + const meta = await requestJson(codexApi(apiBaseUrl, `/api/tasks/${encodeURIComponent(taskId)}?meta=1`)); + if (token !== detailLoadTokenRef.current || selectedIdRef.current !== taskId) return; + const current = sessionCacheRef.current.get(taskId); + const currentTranscript = Array.isArray(current?.task?.transcript) ? current.task.transcript : []; + const metaTask = meta?.task || {}; + publishCachedTask(taskId, { ...metaTask, transcript: currentTranscript, _detailLoaded: currentTranscript.length > 0, _transcriptComplete: current?.complete }, token); + const startSeq = currentTranscript.length > 0 && !taskIsTerminal(metaTask) + ? Math.max(0, transcriptMaxSeq(currentTranscript) - 1) + : transcriptMaxSeq(currentTranscript); + let afterSeq = current?.complete && taskIsTerminal(metaTask) ? transcriptMaxSeq(currentTranscript) : startSeq; + let hasMore = true; + while (hasMore) { + const chunk = await requestJson(codexApi(apiBaseUrl, `/api/tasks/${encodeURIComponent(taskId)}/transcript?afterSeq=${encodeURIComponent(String(afterSeq))}&limit=32`)); + if (token !== detailLoadTokenRef.current || selectedIdRef.current !== taskId) return; + const cachedNow = sessionCacheRef.current.get(taskId); + const existingTranscript = Array.isArray(cachedNow?.task?.transcript) ? cachedNow.task.transcript : []; + const mergedTranscript = mergeTranscriptRows(existingTranscript, Array.isArray(chunk?.transcript) ? chunk.transcript : []); + const complete = Boolean(!chunk?.hasMore); + publishCachedTask(taskId, { + status: chunk?.status || metaTask.status, + updatedAt: chunk?.updatedAt || metaTask.updatedAt, + transcript: mergedTranscript, + _detailLoaded: complete || mergedTranscript.length > 0, + _transcriptComplete: complete, + }, token); + hasMore = Boolean(chunk?.hasMore); + afterSeq = Number(chunk?.nextAfterSeq ?? transcriptMaxSeq(mergedTranscript)); + if (!hasMore) break; + await new Promise((resolve) => window.setTimeout(resolve, 0)); + } + } finally { + if (detailInFlightRef.current?.taskId === taskId && detailInFlightRef.current?.token === token) detailInFlightRef.current = null; + if (token === detailLoadTokenRef.current && selectedIdRef.current === taskId) setSelectedDetailLoading(false); + } + })(); + detailInFlightRef.current = { taskId, token, promise }; + await promise; + } + + async function load(preferId = selectedIdRef.current): Promise { if (!service) return; - const [healthResult, tasksResult] = await Promise.all([ - requestJson(`${apiBaseUrl}/microservices/codex-queue/health`), - requestJson(codexApi(apiBaseUrl, "/api/tasks?limit=80")), - ]); + const token = queueLoadTokenRef.current + 1; + queueLoadTokenRef.current = token; + const requestedId = String(preferId || selectedIdRef.current || ""); + const healthResult = await requestJson(`${apiBaseUrl}/microservices/codex-queue/health`); + const tasksResult = await loadTaskQueue(apiBaseUrl, healthResult); + if (token !== queueLoadTokenRef.current) return; setHealth(healthResult); setTasksData(tasksResult); const rows = taskRows(tasksResult); - const nextId = preferId && rows.some((task: any) => task.id === preferId) - ? preferId + const latestRequestedId = requestedId || selectedIdRef.current; + const nextId = latestRequestedId && rows.some((task: any) => task.id === latestRequestedId) + ? latestRequestedId : (tasksResult?.queue?.activeTaskId || rows.find((task: any) => taskIsActive(task))?.id || rows[0]?.id || ""); + const previousId = selectedIdRef.current; + if (previousId !== nextId) detailLoadTokenRef.current += 1; + selectedIdRef.current = nextId; setSelectedId(nextId); - if (nextId) { - const detail = await requestJson(codexApi(apiBaseUrl, `/api/tasks/${encodeURIComponent(nextId)}`)); - setSelectedTask(detail?.task || null); - } else { + const row = rows.find((task: any) => task.id === nextId); + if (row) { + const cached = sessionCacheRef.current.get(nextId); + if (cached?.task) sessionCacheRef.current.set(nextId, { ...cached, task: { ...row, ...cached.task, status: row.status, updatedAt: row.updatedAt } }); + } + if (nextId) void loadTaskDetail(nextId).catch((err) => setError(errorMessage(err, "加载 Codex session 详情失败"))); + else { + detailLoadTokenRef.current += 1; setSelectedTask(null); + setSelectedDetailLoading(false); } setRefreshedAt(new Date()); } @@ -278,12 +565,13 @@ export function CodexQueuePage({ microservices, onRaw, apiBaseUrl = "/api" }: An async function enqueue(event: any): Promise { event.preventDefault(); await guarded(async () => { - if (promptParts.length === 0) throw new Error("prompt 不能为空"); - const body = promptParts.length === 1 - ? { prompt: promptParts[0], model, cwd, maxAttempts: Number(maxAttempts) } - : { tasks: promptParts.map((text) => ({ prompt: text, model, cwd, maxAttempts: Number(maxAttempts) })) }; - const result = await requestJson(codexApi(apiBaseUrl, promptParts.length === 1 ? "/api/tasks" : "/api/tasks/batch"), { method: "POST", body }); + if (enqueueItems.length === 0) throw new Error("prompt 不能为空"); + const body = enqueueItems.length === 1 + ? { prompt: enqueueItems[0], model, cwd, maxAttempts: Number(maxAttempts) } + : { tasks: enqueueItems.map((text) => ({ prompt: text, model, cwd, maxAttempts: Number(maxAttempts) })) }; + const result = await requestJson(codexApi(apiBaseUrl, enqueueItems.length === 1 ? "/api/tasks" : "/api/tasks/batch"), { method: "POST", body }); const firstId = result?.tasks?.[0]?.id || ""; + selectedIdRef.current = firstId; await load(firstId); }, "Codex 任务入队失败"); } @@ -314,17 +602,85 @@ export function CodexQueuePage({ microservices, onRaw, apiBaseUrl = "/api" }: An }, "重新入队失败"); } + function selectTask(taskId: string): void { + selectedIdRef.current = taskId; + detailLoadTokenRef.current += 1; + setSelectedId(taskId); + const cached = sessionCacheRef.current.get(taskId); + if (cached?.task) { + setSelectedTask(cached.task); + setSelectedDetailLoading(false); + } else { + setSelectedDetailLoading(true); + const row = tasks.find((task: any) => task.id === taskId); + if (row) setSelectedTask(row); + else setSelectedTask(null); + } + void load(taskId).catch((err) => setError(errorMessage(err, "切换 Codex session 失败"))); + } + useEffect(() => { - void guarded(() => load(), "Codex Queue 加载失败"); + void guarded(() => load(selectedIdRef.current), "Codex Queue 加载失败"); }, [service?.id, service?.runtime?.providerStatus]); useEffect(() => { if (!service) return undefined; const timer = window.setInterval(() => { - void load(selectedId).catch((err) => setError(errorMessage(err, "Codex Queue 轮询失败"))); + void load(selectedIdRef.current).catch((err) => setError(errorMessage(err, "Codex Queue 轮询失败"))); }, 1500); return () => window.clearInterval(timer); - }, [service?.id, selectedId]); + }, [service?.id]); + + const taskListContent = tasks.length === 0 ? h(EmptyState, { title: "队列为空", text: "提交一个任务后,Codex 会串行执行并保存输出。" }) : [ + h(TaskListSection, { + key: "active", + title: "运行 / 排队", + tasks: queuedTasks, + selectedId, + emptyText: "当前没有运行或排队任务。", + onSelect: selectTask, + }), + h(TaskListSection, { + key: "history", + title: "历史 session", + tasks: historyTasks, + selectedId, + emptyText: "最近没有完成、失败或取消的 session。", + onSelect: selectTask, + }), + ]; + + const sessionPanel = h(Panel, { + title: selectedTask ? `Session ${String(selectedTask.id).slice(0, 22)}` : "Session 输出", + eyebrow: selectedTask ? `${selectedTask.status} / ${selectedTask.model}` : "Codex CLI-like stream", + actions: h("div", { className: "panel-actions" }, + h("button", { type: "button", className: "ghost-btn", onClick: () => setQueueSidebarOpen((value: boolean) => !value), "data-testid": "codex-queue-sidebar-toggle" }, queueSidebarOpen ? "收起队列" : "显示队列"), + h("label", { className: "inline-check" }, h("input", { type: "checkbox", checked: autoScroll, onChange: (event: any) => setAutoScroll(Boolean(event.target.checked)) }), "自动滚动"), + h("button", { type: "button", className: "ghost-btn", disabled: !selectedCanInterrupt || busy, onClick: () => void interrupt(), "data-testid": "codex-interrupt-button" }, "打断"), + h("button", { type: "button", className: "ghost-btn", disabled: !selectedCanRetry || busy, onClick: () => void retry() }, "重试"), + selectedTask ? h(RawButton, { title: "Codex Task", data: selectedTask, onOpen: onRaw, testId: "raw-codex-task" }) : null, + ), + className: "codex-output-panel", + }, + h("div", { className: `codex-session-shell ${queueSidebarOpen ? "" : "queue-collapsed"}` }, + queueSidebarOpen ? h("aside", { className: "codex-session-sidebar", "data-testid": "codex-session-sidebar" }, + h("div", { className: "codex-session-sidebar-head" }, + h("div", null, + h("span", null, "Queue"), + h("strong", null, `${tasks.length} sessions`), + ), + h("button", { type: "button", className: "ghost-btn", onClick: () => setQueueSidebarOpen(false) }, "收起"), + ), + h("div", { className: "codex-task-list codex-task-list-session" }, taskListContent), + ) : null, + h("div", { className: "codex-session-main" }, + h("div", { className: "codex-output-stack" }, + h(Transcript, { task: selectedTask, autoScroll, loading: selectedDetailLoading }), + h(RawTranscript, { task: selectedTask }), + ), + ), + ), + ); if (!service) return h(EmptyState, { title: "Codex Queue 未登记", text: "请在 config.json 的 microservices 中登记 id=codex-queue" }); @@ -350,7 +706,7 @@ export function CodexQueuePage({ microservices, onRaw, apiBaseUrl = "/api" }: An h("div", { className: "microservice-ref-card" }, h("span", null, "Codex"), h("strong", null, queue?.defaultModel || "gpt-5.4-mini"), - h("code", null, "codex app-server --listen stdio://"), + h("code", null, `models: ${codexModels.join(" / ")}`), ), h("div", { className: "microservice-ref-card" }, h("span", null, "Backend"), @@ -367,50 +723,33 @@ export function CodexQueuePage({ microservices, onRaw, apiBaseUrl = "/api" }: An h(MetricCard, { label: "异常/取消", value: String(Number(counts.failed || 0) + Number(counts.canceled || 0)), hint: "terminal non-success", tone: Number(counts.failed || 0) > 0 ? "fail" : "" }), h(MetricCard, { label: "最近刷新", value: refreshedAt ? fmtClock(refreshedAt) : "--", hint: "1.5s polling" }), ), + h("div", { className: "codex-session-stage" }, sessionPanel), h("div", { className: "codex-queue-layout" }, h("div", { className: "codex-left-rail" }, - h(Panel, { title: "提交任务", eyebrow: promptParts.length > 1 ? `${promptParts.length} tasks` : "Single or Batch", className: "codex-compose-panel" }, + h(Panel, { title: "提交任务", eyebrow: enqueueItems.length > 1 ? `${enqueueItems.length} tasks` : "Single or Batch", className: "codex-compose-panel" }, h("form", { className: "codex-task-form", onSubmit: enqueue, "data-testid": "codex-queue-task-form" }, h("label", null, "Prompt / 多任务用单独一行 --- 分隔", h("textarea", { value: prompt, rows: 8, onChange: (event: any) => setPrompt(event.target.value), placeholder: "写入 Codex 任务;多个任务之间用 --- 分隔。" }), ), h("div", { className: "codex-form-grid" }, - h("label", null, "模型", h("input", { value: model, onChange: (event: any) => setModel(event.target.value), placeholder: "gpt-5.4-mini" })), - h("label", null, "工作目录", h("input", { value: cwd, onChange: (event: any) => setCwd(event.target.value), placeholder: "/workspace" })), + h("label", null, "模型", + h("select", { value: model, onChange: (event: any) => setModel(event.target.value), "data-testid": "codex-model-select" }, + codexModels.map((name) => h("option", { key: name, value: name }, name)), + ), + ), + h("label", null, "工作目录", h("input", { value: cwd, onChange: (event: any) => setCwd(event.target.value), placeholder: queue?.defaultWorkdir || "/root/unidesk" })), h("label", null, "最大尝试", h("input", { type: "number", min: 1, max: 10, value: maxAttempts, onChange: (event: any) => setMaxAttempts(Number(event.target.value)) })), + h("label", null, "入队份数", h("input", { type: "number", min: 1, max: 50, value: repeatCount, onChange: (event: any) => setRepeatCount(Number(event.target.value)), "data-testid": "codex-repeat-count-input" })), ), - h("button", { type: "submit", className: "primary-btn", disabled: busy || promptParts.length === 0 }, promptParts.length > 1 ? `批量入队 ${promptParts.length} 个任务` : "入队并运行"), - ), - ), - h(Panel, { title: "队列", eyebrow: `${tasks.length} visible` }, - h("div", { className: "codex-task-list" }, - tasks.length === 0 ? h(EmptyState, { title: "队列为空", text: "提交一个任务后,Codex 会串行执行并保存输出。" }) : tasks.map((task: any) => h(TaskCard, { - key: task.id, - task, - selected: selectedId === task.id, - onSelect: () => { - setSelectedId(task.id); - void load(task.id); - }, - })), + h("button", { type: "submit", className: "primary-btn", disabled: busy || enqueueItems.length === 0, "data-testid": "codex-enqueue-button" }, enqueueItems.length > 1 ? `批量入队 ${enqueueItems.length} 个任务` : "入队并运行"), ), ), ), h("div", { className: "codex-main-stage" }, - h(Panel, { - title: selectedTask ? `Session ${String(selectedTask.id).slice(0, 22)}` : "Session 输出", - eyebrow: selectedTask ? `${selectedTask.status} / ${selectedTask.model}` : "Codex CLI-like stream", - actions: h("div", { className: "panel-actions" }, - h("label", { className: "inline-check" }, h("input", { type: "checkbox", checked: autoScroll, onChange: (event: any) => setAutoScroll(Boolean(event.target.checked)) }), "自动滚动"), - h("button", { type: "button", className: "ghost-btn", disabled: !selectedCanInterrupt || busy, onClick: () => void interrupt(), "data-testid": "codex-interrupt-button" }, "打断"), - h("button", { type: "button", className: "ghost-btn", disabled: !selectedCanRetry || busy, onClick: () => void retry() }, "重试"), - selectedTask ? h(RawButton, { title: "Codex Task", data: selectedTask, onOpen: onRaw, testId: "raw-codex-task" }) : null, - ), - className: "codex-output-panel", - }, - h(Transcript, { task: selectedTask, autoScroll }), - ), h("div", { className: "codex-detail-grid" }, + h(Panel, { title: "Prompt 全量", eyebrow: selectedTask ? String(selectedTask.id) : "selected task", className: "codex-prompt-panel" }, + h(PromptDetail, { task: selectedTask }), + ), h(Panel, { title: "运行控制", eyebrow: selectedCanSteer ? "Active turn steer" : "Steer when running" }, h("form", { className: "codex-steer-form", onSubmit: steer }, h("label", null, "追加 prompt", diff --git a/src/components/frontend/src/index.ts b/src/components/frontend/src/index.ts index 080b1458..25c90e01 100644 --- a/src/components/frontend/src/index.ts +++ b/src/components/frontend/src/index.ts @@ -295,6 +295,7 @@ function isStaticAssetPath(pathname: string): boolean { const server = Bun.serve({ port: config.port, hostname: "0.0.0.0", + idleTimeout: 120, async fetch(req) { const url = new URL(req.url); logger("debug", "request", { path: url.pathname }); diff --git a/src/components/microservices/codex-queue/Dockerfile b/src/components/microservices/codex-queue/Dockerfile index cbccede5..de92de85 100644 --- a/src/components/microservices/codex-queue/Dockerfile +++ b/src/components/microservices/codex-queue/Dockerfile @@ -1,7 +1,36 @@ FROM oven/bun:1-debian RUN apt-get update \ - && apt-get install -y --no-install-recommends ca-certificates curl git bash ripgrep procps python3 make g++ bubblewrap docker.io npm \ + && apt-get install -y --no-install-recommends \ + bash \ + bubblewrap \ + ca-certificates \ + curl \ + docker-cli \ + docker-compose \ + file \ + g++ \ + gcc \ + git \ + gzip \ + jq \ + make \ + npm \ + openssh-client \ + patch \ + pkg-config \ + procps \ + python3 \ + python3-pip \ + ripgrep \ + rsync \ + tar \ + tini \ + unzip \ + xz-utils \ + && mkdir -p /usr/local/lib/docker/cli-plugins /root/.docker/cli-plugins \ + && ln -sf /usr/bin/docker-compose /usr/local/lib/docker/cli-plugins/docker-compose \ + && ln -sf /usr/bin/docker-compose /root/.docker/cli-plugins/docker-compose \ && npm install -g @openai/codex@0.128.0 \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* @@ -12,4 +41,5 @@ COPY src/components/microservices/codex-queue/tsconfig.json ./tsconfig.json COPY src/components/microservices/codex-queue/src ./src EXPOSE 4222 +ENTRYPOINT ["tini", "--"] CMD ["bun", "run", "src/index.ts"] diff --git a/src/components/microservices/codex-queue/src/index.ts b/src/components/microservices/codex-queue/src/index.ts index fe835996..8951e74f 100644 --- a/src/components/microservices/codex-queue/src/index.ts +++ b/src/components/microservices/codex-queue/src/index.ts @@ -1,4 +1,4 @@ -import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process"; +import { spawn, spawnSync, type ChildProcessWithoutNullStreams } from "node:child_process"; import { appendFileSync, copyFileSync, existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from "node:fs"; import { dirname, resolve } from "node:path"; import * as readline from "node:readline"; @@ -9,6 +9,7 @@ type RunMode = "initial" | "retry"; type JudgeDecision = "complete" | "retry" | "fail"; type OutputChannel = "system" | "user" | "assistant" | "reasoning" | "command" | "diff" | "tool" | "error"; type TerminalStatus = "completed" | "interrupted" | "failed" | null; +type TranscriptKind = "ran" | "explored" | "edited" | "plan" | "message" | "system" | "error"; interface RuntimeConfig { host: string; @@ -19,6 +20,7 @@ interface RuntimeConfig { codexHome: string; sourceCodexConfig: string; defaultModel: string; + codexModels: string[]; defaultReasoningEffort: string | null; sandbox: "read-only" | "workspace-write" | "danger-full-access"; approvalPolicy: "untrusted" | "on-failure" | "on-request" | "never"; @@ -27,6 +29,7 @@ interface RuntimeConfig { minimaxApiBase: string; minimaxModel: string; judgeTimeoutMs: number; + judgeRepairAttempts: number; } interface QueueTaskRequest { @@ -46,6 +49,19 @@ interface LiveOutput { itemId?: string; } +interface TranscriptLine { + seq: number; + at: string; + kind: TranscriptKind; + title: string; + status?: string; + commandPreview?: string; + commandOmittedLines?: number; + bodyPreview?: string; + bodyOmittedLines?: number; + rawSeqs: number[]; +} + interface CodexEventSummary { at: string; method: string; @@ -78,6 +94,16 @@ interface JudgeResult { raw?: JsonValue; } +interface ParsedJudgeJson { + value: Record; + source: string; +} + +interface MiniMaxJudgeResponse { + rawText: string; + content: string; +} + interface QueueTask { id: string; prompt: string; @@ -135,6 +161,7 @@ interface JudgeProbeCase { finalResponse: string; expected: JudgeDecision; terminalStatus: TerminalStatus; + cancelRequested?: boolean; transportClosedBeforeTerminal?: boolean; terminalError?: string | null; stderrTail?: string; @@ -156,6 +183,10 @@ const logger = createLogger("codex-queue", config.logFile); const state = readState(config.statePath); let processing = false; let activeRun: ActiveRun | null = null; +let devReadyCache: { checkedAtMs: number; value: JsonValue } | null = null; +let persistTimer: ReturnType | null = null; +let persistDirty = false; +let shutdownRequested = false; function envString(name: string, fallback: string): string { const value = process.env[name]; @@ -174,6 +205,16 @@ function envNumber(name: string, fallback: number): number { return Number.isFinite(value) && value > 0 ? Math.floor(value) : fallback; } +function envList(name: string, fallback: string[]): string[] { + const raw = process.env[name]; + const source = raw === undefined || raw.length === 0 ? fallback.join(",") : raw; + return Array.from(new Set(source.split(",").map((item) => item.trim()).filter(Boolean))); +} + +function withRequiredModel(models: string[], model: string): string[] { + return models.includes(model) ? models : [model, ...models]; +} + function sandboxValue(raw: string): RuntimeConfig["sandbox"] { if (raw === "read-only" || raw === "workspace-write" || raw === "danger-full-access") return raw; return "danger-full-access"; @@ -185,23 +226,26 @@ function approvalValue(raw: string): RuntimeConfig["approvalPolicy"] { } function readConfig(): RuntimeConfig { + const defaultModel = envString("CODEX_QUEUE_DEFAULT_MODEL", "gpt-5.4-mini"); return { host: envString("HOST", "0.0.0.0"), port: envNumber("PORT", 4222), statePath: envString("CODEX_QUEUE_STATE_PATH", "/var/lib/unidesk/codex-queue/state.json"), logFile: envString("LOG_FILE", "/var/log/unidesk/codex-queue.jsonl"), - defaultWorkdir: envString("CODEX_QUEUE_WORKDIR", "/workspace"), + defaultWorkdir: envString("CODEX_QUEUE_WORKDIR", "/root/unidesk"), codexHome: envString("CODEX_QUEUE_CODEX_HOME", "/var/lib/unidesk/codex-queue/codex-home"), sourceCodexConfig: envString("CODEX_QUEUE_SOURCE_CODEX_CONFIG", "/root/.codex/config.toml"), - defaultModel: envString("CODEX_QUEUE_DEFAULT_MODEL", "gpt-5.4-mini"), + defaultModel, + codexModels: withRequiredModel(envList("CODEX_QUEUE_MODELS", ["gpt-5.4-mini", "gpt-5.4", "gpt-5.5"]), defaultModel), defaultReasoningEffort: envNullableString("CODEX_QUEUE_REASONING_EFFORT"), sandbox: sandboxValue(envString("CODEX_QUEUE_SANDBOX", "danger-full-access")), approvalPolicy: approvalValue(envString("CODEX_QUEUE_APPROVAL_POLICY", "never")), defaultMaxAttempts: Math.max(1, Math.min(10, envNumber("CODEX_QUEUE_MAX_ATTEMPTS", 3))), minimaxApiKey: envString("MINIMAX_API_KEY", ""), - minimaxApiBase: envString("MINIMAX_API_BASE", "https://api.minimax.io/v1").replace(/\/+$/u, ""), + minimaxApiBase: envString("MINIMAX_API_BASE", "https://api.minimaxi.com/v1").replace(/\/+$/u, ""), minimaxModel: envString("MINIMAX_MODEL", "MiniMax-M2.7"), - judgeTimeoutMs: envNumber("MINIMAX_JUDGE_TIMEOUT_MS", 30_000), + judgeTimeoutMs: envNumber("MINIMAX_JUDGE_TIMEOUT_MS", 60_000), + judgeRepairAttempts: Math.max(0, Math.min(5, envNumber("MINIMAX_JUDGE_REPAIR_ATTEMPTS", 2))), }; } @@ -257,6 +301,11 @@ function readState(path: string): PersistedState { } function persistState(): void { + persistDirty = false; + if (persistTimer !== null) { + clearTimeout(persistTimer); + persistTimer = null; + } state.updatedAt = nowIso(); mkdirSync(dirname(config.statePath), { recursive: true }); const tmp = `${config.statePath}.tmp`; @@ -264,6 +313,15 @@ function persistState(): void { renameSync(tmp, config.statePath); } +function schedulePersistState(delayMs = 1000): void { + persistDirty = true; + if (persistTimer !== null) return; + persistTimer = setTimeout(() => { + persistTimer = null; + if (persistDirty) persistState(); + }, delayMs); +} + function prepareCodexHome(): void { mkdirSync(config.codexHome, { recursive: true }); if (existsSync(config.sourceCodexConfig)) { @@ -278,6 +336,189 @@ function safePreview(value: string, max = 900): string { return compact.length > max ? `${compact.slice(0, max)}...` : compact; } +function linePreview(text: string, maxLines: number, maxChars: number): { text: string; omittedLines: number } { + const clean = text.replace(/\u001b\[[0-9;]*m/gu, "").trimEnd(); + if (clean.length === 0) return { text: "", omittedLines: 0 }; + const lines = clean.split(/\r?\n/u); + const kept: string[] = []; + let chars = 0; + for (const line of lines) { + if (kept.length >= maxLines || chars + line.length > maxChars) break; + kept.push(line); + chars += line.length + 1; + } + return { text: kept.join("\n"), omittedLines: Math.max(0, lines.length - kept.length) }; +} + +function compactNoisyLine(line: string): string { + const compact = line.replace(/\s+/gu, " ").trimEnd(); + const hasEncodedBlob = /[A-Za-z0-9+/=]{220,}/u.test(compact); + const hasSshWrapper = compact.includes("UNIDESK_SSH_TOOL_DIR") || compact.includes("apply_patch") || compact.includes("base64 -d"); + if (compact.length > 420 && (hasEncodedBlob || hasSshWrapper)) { + return `${compact.slice(0, 220)} ... [omitted noisy wrapper, ${compact.length - 220} chars]`; + } + return compact.length > 1200 ? `${compact.slice(0, 900)} ... [omitted ${compact.length - 900} chars]` : line; +} + +function compactTranscriptBody(text: string): string { + return text.split(/\r?\n/u).map(compactNoisyLine).join("\n"); +} + +function parseCommandLine(text: string): { command: string; status?: string } | null { + const match = text.match(/^item\/(?:started|completed):\s+([\s\S]*?)\s+status=([A-Za-z0-9_-]+)/u); + if (match === null) return null; + return { command: match[1]?.trim() ?? "", status: match[2] }; +} + +function extractOuterQuotedShellArg(text: string): { body: string; trailing: string } | null { + const quote = text[0]; + if (quote !== "\"" && quote !== "'") return null; + let escaped = false; + for (let index = 1; index < text.length; index += 1) { + const char = text[index] ?? ""; + if (quote === "\"" && escaped) { + escaped = false; + continue; + } + if (quote === "\"" && char === "\\") { + escaped = true; + continue; + } + if (char === quote) return { body: text.slice(1, index), trailing: text.slice(index + 1).trim() }; + } + return null; +} + +function decodeShellDoubleQuoted(text: string): string { + return text + .replace(/\\(["\\$`])/gu, "$1") + .replace(/\\\r?\n/gu, ""); +} + +function displayCommand(command: string): string { + const normalized = command.trim().replace(/\\n/gu, "\n"); + const match = normalized.match(/^(?:\/usr\/bin\/env\s+)?(?:\/(?:usr\/)?bin\/)?(?:bash|sh)\s+-lc\s+([\s\S]+)$/u); + if (match === null) return normalized; + const shellText = (match[1] ?? "").trim(); + const shellArg = extractOuterQuotedShellArg(shellText); + if (shellArg === null) return (match[1] ?? normalized).trim(); + const quote = shellText[0]; + const body = quote === "\"" ? decodeShellDoubleQuoted(shellArg.body) : shellArg.body; + return shellArg.trailing.length > 0 ? `${body} ${shellArg.trailing}` : body; +} + +function shortCommandTitle(command: string): string { + const firstLine = displayCommand(command).split(/\r?\n/u).find((line) => line.trim().length > 0)?.trim() ?? command.trim(); + return safePreview(firstLine, 180); +} + +function commandPreview(command: string): { text: string; omittedLines: number } { + return linePreview(compactTranscriptBody(displayCommand(command)), 10, 8000); +} + +function outputPreview(text: string): { text: string; omittedLines: number } { + return linePreview(compactTranscriptBody(text), 4, 1600); +} + +function fullMessageBody(text: string): { text: string; omittedLines: number } { + return { text: text.replace(/\u001b\[[0-9;]*m/gu, "").trimEnd(), omittedLines: 0 }; +} + +function transcriptLine(kind: TranscriptKind, at: string, seq: number, title: string, rawSeqs: number[], body = "", command = "", status?: string): TranscriptLine { + const bodyInfo = kind === "message" ? fullMessageBody(body) : outputPreview(body); + const commandInfo = command.length > 0 ? commandPreview(command) : { text: "", omittedLines: 0 }; + return { + seq, + at, + kind, + title, + status, + commandPreview: commandInfo.text || undefined, + commandOmittedLines: commandInfo.omittedLines || undefined, + bodyPreview: bodyInfo.text || undefined, + bodyOmittedLines: bodyInfo.omittedLines || undefined, + rawSeqs, + }; +} + +function commandPath(command: string): string | null { + const result = spawnSync("sh", ["-lc", `command -v ${command}`], { encoding: "utf8", timeout: 2_000 }); + if (result.status !== 0) return null; + const stdout = typeof result.stdout === "string" ? result.stdout.trim() : ""; + return stdout.length > 0 ? stdout.split(/\r?\n/u)[0] ?? null : null; +} + +function runProbe(command: string, args: string[], timeout = 3_000): { ok: boolean; output: string } { + const result = spawnSync(command, args, { encoding: "utf8", timeout }); + const stdout = typeof result.stdout === "string" ? result.stdout : ""; + const stderr = typeof result.stderr === "string" ? result.stderr : ""; + const output = safePreview(`${stdout}\n${stderr}`, 600); + return { ok: result.status === 0, output }; +} + +function collectDevReady(): JsonValue { + const now = Date.now(); + if (devReadyCache !== null && now - devReadyCache.checkedAtMs < 30_000) return devReadyCache.value; + const requiredTools = [ + "bash", + "bun", + "node", + "npm", + "npx", + "codex", + "git", + "rg", + "curl", + "python3", + "pip3", + "docker", + "docker-compose", + "jq", + "ssh", + "rsync", + "make", + "gcc", + "g++", + "tar", + "gzip", + "unzip", + ]; + const tools = requiredTools.map((name) => { + const path = commandPath(name); + return { name, ok: path !== null, path }; + }); + const missingTools = tools.filter((tool) => !tool.ok).map((tool) => tool.name); + const dockerProbe = runProbe("docker", ["version", "--format", "{{.Client.Version}} client / {{.Server.Version}} server"]); + const composeProbe = runProbe("docker", ["compose", "version"]); + const workdirExists = existsSync(config.defaultWorkdir); + const dockerSocketExists = existsSync("/var/run/docker.sock"); + const codexConfigReady = existsSync(config.sourceCodexConfig) || existsSync(resolve(config.codexHome, "config.toml")); + const ok = missingTools.length === 0 && dockerProbe.ok && composeProbe.ok && workdirExists && dockerSocketExists && codexConfigReady; + const value: JsonValue = { + ok, + missingTools, + tools: tools as unknown as JsonValue, + workdir: { path: config.defaultWorkdir, exists: workdirExists }, + docker: { + socketPath: "/var/run/docker.sock", + socketExists: dockerSocketExists, + versionOk: dockerProbe.ok, + version: dockerProbe.output, + composeOk: composeProbe.ok, + composeVersion: composeProbe.output, + }, + codexConfig: { + sourcePath: config.sourceCodexConfig, + sourceExists: existsSync(config.sourceCodexConfig), + homeConfigPath: resolve(config.codexHome, "config.toml"), + homeConfigExists: existsSync(resolve(config.codexHome, "config.toml")), + ready: codexConfigReady, + }, + }; + devReadyCache = { checkedAtMs: now, value }; + return value; +} + function makeTaskId(): string { return `codex_${Date.now()}_${Math.random().toString(16).slice(2, 8)}`; } @@ -333,9 +574,9 @@ function appendOutput(task: QueueTask, channel: OutputChannel, text: string, met } else { task.output.push({ seq: state.nextSeq++, at: nowIso(), channel, text, method, itemId }); } - if (task.output.length > 1200) task.output.splice(0, task.output.length - 1200); + if (task.output.length > 600) task.output.splice(0, task.output.length - 600); task.updatedAt = nowIso(); - persistState(); + schedulePersistState(); } function addEvent(task: QueueTask, event: CodexEventSummary): void { @@ -343,13 +584,200 @@ function addEvent(task: QueueTask, event: CodexEventSummary): void { if (task.events.length > 400) task.events.splice(0, task.events.length - 400); } -function taskForResponse(task: QueueTask, full = false): JsonValue { +function commandKind(command: string): TranscriptKind { + if (/\b(apply_patch|git apply|cat >|tee .*<<|sed -i|python3? .*write_text)\b/u.test(command)) return "edited"; + if (/\b(rg|grep|find|ls|cat|sed -n|tail|head|git status|git diff|ps)\b/u.test(command)) return "explored"; + return "ran"; +} + +function commandKindLabel(kind: TranscriptKind): string { + if (kind === "edited") return "Edited"; + if (kind === "explored") return "Explored"; + return "Ran"; +} + +function buildTaskTranscript(task: QueueTask, limit = 180): TranscriptLine[] { + const entries: TranscriptLine[] = []; + let activeCommand: { seq: number; at: string; command: string; status?: string; body: string; rawSeqs: number[] } | null = null; + + const flushCommand = (): void => { + if (activeCommand === null) return; + const kind = commandKind(activeCommand.command); + entries.push(transcriptLine( + kind, + activeCommand.at, + activeCommand.seq, + shortCommandTitle(activeCommand.command), + activeCommand.rawSeqs, + activeCommand.body, + activeCommand.command, + activeCommand.status, + )); + activeCommand = null; + }; + + for (const item of task.output) { + if (item.channel === "command" && item.method === "item/started") { + flushCommand(); + const parsed = parseCommandLine(item.text); + activeCommand = { + seq: item.seq, + at: item.at, + command: parsed?.command || item.text, + status: parsed?.status, + body: "", + rawSeqs: [item.seq], + }; + continue; + } + if (item.channel === "command" && item.method === "item/commandExecution/outputDelta") { + if (activeCommand !== null) { + activeCommand.body += item.text; + activeCommand.rawSeqs.push(item.seq); + } else { + entries.push(transcriptLine("ran", item.at, item.seq, "Command output", [item.seq], item.text)); + } + continue; + } + if (item.channel === "command" && item.method === "item/completed") { + const parsed = parseCommandLine(item.text); + if (activeCommand !== null) { + activeCommand.status = parsed?.status ?? activeCommand.status; + activeCommand.rawSeqs.push(item.seq); + flushCommand(); + } else { + const command = parsed?.command || item.text; + const kind = commandKind(command); + entries.push(transcriptLine(kind, item.at, item.seq, shortCommandTitle(command), [item.seq], "", command, parsed?.status)); + } + continue; + } + + flushCommand(); + if (item.channel === "diff") { + entries.push(transcriptLine("edited", item.at, item.seq, "Edited files", [item.seq], item.text, "", item.method)); + } else if (item.channel === "error") { + entries.push(transcriptLine("error", item.at, item.seq, "Error", [item.seq], item.text, "", item.method)); + } else if (item.channel === "assistant") { + entries.push(transcriptLine("message", item.at, item.seq, "Assistant message", [item.seq], item.text, "", item.method)); + } else if (item.channel === "reasoning") { + entries.push(transcriptLine("message", item.at, item.seq, "Reasoning", [item.seq], item.text, "", item.method)); + } else if (item.channel === "user") { + entries.push(transcriptLine("message", item.at, item.seq, item.method === "enqueue" ? "Submitted prompt" : "User prompt", [item.seq], item.text, "", item.method)); + } else { + const title = item.method === "queue" && item.text.startsWith("attempt ") + ? "Attempt started" + : item.method === "startup" || item.method === "shutdown" + ? "Queue recovered" + : item.method === "judge" + ? "Judge result" + : "System"; + entries.push(transcriptLine("system", item.at, item.seq, title, [item.seq], item.text, "", item.method)); + } + } + flushCommand(); + return entries.slice(-limit); +} + +function outputForResponse(task: QueueTask, includeRaw: boolean): LiveOutput[] { + if (includeRaw) return task.output; + return task.output.slice(-80).map((item) => ({ ...item, text: safePreview(item.text, 4000) })); +} + +function taskForResponse(task: QueueTask, full = false, includeRaw = full): JsonValue { return { ...task, prompt: full ? task.prompt : safePreview(task.prompt, 2000), finalResponse: full ? task.finalResponse : safePreview(task.finalResponse, 5000), - output: full ? task.output : task.output.slice(-240), - events: full ? task.events : task.events.slice(-120), + output: outputForResponse(task, includeRaw), + events: includeRaw ? task.events : task.events.slice(-120), + transcript: buildTaskTranscript(task, full ? 360 : 120), + } as unknown as JsonValue; +} + +function fullTranscript(task: QueueTask): TranscriptLine[] { + return buildTaskTranscript(task, Number.MAX_SAFE_INTEGER); +} + +function taskForMetaResponse(task: QueueTask): JsonValue { + const transcript = fullTranscript(task); + return { + id: task.id, + prompt: task.prompt, + cwd: task.cwd, + model: task.model, + reasoningEffort: task.reasoningEffort, + maxAttempts: task.maxAttempts, + status: task.status, + createdAt: task.createdAt, + updatedAt: task.updatedAt, + startedAt: task.startedAt, + finishedAt: task.finishedAt, + currentAttempt: task.currentAttempt, + currentMode: task.currentMode, + codexThreadId: task.codexThreadId, + activeTurnId: task.activeTurnId, + finalResponse: safePreview(task.finalResponse, 20000), + lastError: task.lastError, + lastJudge: task.lastJudge, + attempts: task.attempts, + cancelRequested: task.cancelRequested, + nextMode: task.nextMode, + outputCount: task.output.length, + eventCount: task.events.length, + transcriptCount: transcript.length, + transcriptMaxSeq: transcript.at(-1)?.seq ?? 0, + transcript: [], + output: [], + events: [], + } as unknown as JsonValue; +} + +function transcriptChunkResponse(task: QueueTask, url: URL): Response { + const afterSeqRaw = Number(url.searchParams.get("afterSeq") ?? 0); + const afterSeq = Number.isFinite(afterSeqRaw) ? afterSeqRaw : 0; + const limit = parseLimit(url); + const transcript = fullTranscript(task); + const chunk = transcript.filter((line) => Number(line.seq) > afterSeq).slice(0, limit); + const nextAfterSeq = chunk.at(-1)?.seq ?? afterSeq; + return jsonResponse({ + ok: true, + taskId: task.id, + status: task.status, + updatedAt: task.updatedAt, + transcript: chunk, + afterSeq, + nextAfterSeq, + hasMore: transcript.some((line) => Number(line.seq) > Number(nextAfterSeq)), + total: transcript.length, + maxSeq: transcript.at(-1)?.seq ?? 0, + }); +} + +function taskForListResponse(task: QueueTask): JsonValue { + return { + id: task.id, + prompt: safePreview(task.prompt, 2000), + cwd: task.cwd, + model: task.model, + reasoningEffort: task.reasoningEffort, + maxAttempts: task.maxAttempts, + status: task.status, + createdAt: task.createdAt, + updatedAt: task.updatedAt, + startedAt: task.startedAt, + finishedAt: task.finishedAt, + currentAttempt: task.currentAttempt, + currentMode: task.currentMode, + codexThreadId: task.codexThreadId, + activeTurnId: task.activeTurnId, + lastError: task.lastError, + lastJudge: task.lastJudge, + cancelRequested: task.cancelRequested, + outputCount: task.output.length, + eventCount: task.events.length, + attemptCount: task.attempts.length, + attempts: task.attempts.slice(-3), } as unknown as JsonValue; } @@ -366,8 +794,11 @@ function queueSummary(): JsonValue { counts, judgeConfigured: config.minimaxApiKey.length > 0, minimaxModel: config.minimaxModel, + minimaxJudgeRepairAttempts: config.judgeRepairAttempts, defaultModel: config.defaultModel, + codexModels: config.codexModels, defaultWorkdir: config.defaultWorkdir, + devReady: collectDevReady(), }; } @@ -665,13 +1096,28 @@ async function runCodexTurn(task: QueueTask, prompt: string): Promise item.method === "turn/interrupt" || item.text.includes("interrupt requested"))); +} + function judgePrompt(task: QueueTask, result: CodexRunResult): string { const latestAttempt = task.attempts[task.attempts.length - 1] ?? null; return JSON.stringify({ @@ -709,20 +1168,88 @@ function judgePrompt(task: QueueTask, result: CodexRunResult): string { policy: { complete: "Use only when the transcript/final answer shows the current task is actually done; the queue worker will then advance to the next queued task.", retry: "Use when the current task is incomplete, Codex only made a plan, skipped requested edits/commands, produced partial work, needs another turn, or hit a transient network/server/disconnected/transport/internal error. Retry must resume the existing thread when a thread id exists and append continuePrompt.", - fail: "Use when interrupted by user, blocked on missing user input, or failed for deterministic non-retryable reasons.", + fail: "Use when explicitly interrupted/canceled by the user, blocked on missing user input, or failed for deterministic non-retryable reasons. Do not retry explicit user interrupts.", }, }); } -function parseJudgeJson(text: string): Record { - try { - return JSON.parse(text) as Record; - } catch { - const start = text.indexOf("{"); - const end = text.lastIndexOf("}"); - if (start >= 0 && end > start) return JSON.parse(text.slice(start, end + 1)) as Record; - throw new Error("MiniMax judge did not return JSON"); +function parseRecordJson(text: string, source: string): ParsedJudgeJson { + const parsed = JSON.parse(text) as unknown; + if (typeof parsed === "string") return parseJudgeJson(parsed); + if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) throw new Error(`${source} parsed to non-object JSON`); + return { value: parsed as Record, source }; +} + +function balancedJsonCandidates(text: string): string[] { + const candidates: string[] = []; + for (let start = 0; start < text.length; start += 1) { + if (text[start] !== "{") continue; + let depth = 0; + let inString = false; + let escaped = false; + for (let index = start; index < text.length; index += 1) { + const char = text[index] ?? ""; + if (inString) { + if (escaped) { + escaped = false; + } else if (char === "\\") { + escaped = true; + } else if (char === "\"") { + inString = false; + } + continue; + } + if (char === "\"") { + inString = true; + } else if (char === "{") { + depth += 1; + } else if (char === "}") { + depth -= 1; + if (depth === 0) { + candidates.push(text.slice(start, index + 1)); + break; + } + } + } } + return candidates; +} + +function judgeJsonCandidates(text: string): Array<{ source: string; text: string }> { + const normalized = text.replace(/^\uFEFF/u, "").trim(); + const candidates: Array<{ source: string; text: string }> = [{ source: "direct", text: normalized }]; + const fenced = /```[ \t]*(?:json|JSON|javascript|js)?[^\n\r]*[\r\n]+([\s\S]*?)```/gu; + for (const match of normalized.matchAll(fenced)) { + const body = typeof match[1] === "string" ? match[1].trim() : ""; + if (body.length > 0) candidates.push({ source: "fenced", text: body }); + } + const strippedFence = normalized + .replace(/^```[^\n\r]*[\r\n]?/u, "") + .replace(/```$/u, "") + .trim(); + if (strippedFence !== normalized && strippedFence.length > 0) candidates.push({ source: "stripped_fence", text: strippedFence }); + const strippedLabel = normalized.replace(/^(?:json|JSON)\s*[:\n\r]\s*/u, "").trim(); + if (strippedLabel !== normalized && strippedLabel.length > 0) candidates.push({ source: "stripped_label", text: strippedLabel }); + for (const candidate of balancedJsonCandidates(normalized)) candidates.push({ source: "balanced_object", text: candidate }); + const seen = new Set(); + return candidates.filter((candidate) => { + const key = candidate.text; + if (key.length === 0 || seen.has(key)) return false; + seen.add(key); + return true; + }); +} + +function parseJudgeJson(text: string): ParsedJudgeJson { + let lastError = "no candidate JSON was found"; + for (const candidate of judgeJsonCandidates(text)) { + try { + return parseRecordJson(candidate.text, candidate.source); + } catch (error) { + lastError = error instanceof Error ? error.message : String(error); + } + } + throw new Error(`MiniMax judge did not return parseable JSON after denoise: ${lastError}; preview=${safePreview(text, 500)}`); } function normalizedDecision(value: unknown): JudgeDecision { @@ -732,7 +1259,63 @@ function normalizedDecision(value: unknown): JudgeDecision { } async function judgeTask(task: QueueTask, result: CodexRunResult): Promise { + if (explicitUserInterrupt(task, result)) return fallbackJudge(result); if (config.minimaxApiKey.length === 0) return fallbackJudge(result); + const messages: Array<{ role: "system" | "user" | "assistant"; content: string }> = [ + { role: "system", content: "You are a strict task-state classifier. Return compact JSON only. Never wrap JSON in Markdown fences." }, + { role: "user", content: judgePrompt(task, result) }, + ]; + try { + let lastParseError: string | null = null; + for (let repairAttempt = 0; repairAttempt <= config.judgeRepairAttempts; repairAttempt += 1) { + const response = await requestMiniMaxJudge(messages); + const preDenoiseContent = response.content; + try { + const parsedResult = parseJudgeJson(preDenoiseContent); + const parsed = parsedResult.value; + if (parsedResult.source !== "direct") { + logger("info", "judge_json_denoised", { taskId: task.id, parseSource: parsedResult.source, repairAttempt }); + } + const confidenceRaw = Number(parsed.confidence ?? 0.5); + return { + decision: normalizedDecision(parsed.decision), + confidence: Number.isFinite(confidenceRaw) ? Math.max(0, Math.min(1, confidenceRaw)) : 0.5, + reason: typeof parsed.reason === "string" ? parsed.reason : "MiniMax judge returned a decision.", + continuePrompt: typeof parsed.continuePrompt === "string" && parsed.continuePrompt.trim().length > 0 ? parsed.continuePrompt : undefined, + source: "minimax", + raw: { ...(parsed as Record), _parseSource: parsedResult.source, _repairAttempt: repairAttempt }, + }; + } catch (error) { + lastParseError = error instanceof Error ? error.message : String(error); + if (repairAttempt >= config.judgeRepairAttempts) throw new Error(lastParseError); + logger("warn", "judge_json_parse_retry", { + taskId: task.id, + repairAttempt: repairAttempt + 1, + maxRepairAttempts: config.judgeRepairAttempts, + error: safePreview(lastParseError, 800), + preDenoiseResponsePreview: safePreview(preDenoiseContent, 1200), + }); + messages.push({ role: "assistant", content: preDenoiseContent }); + messages.push({ + role: "user", + content: JSON.stringify({ + instruction: "Your previous judge answer could not be parsed as JSON even after cleanup. Return ONLY one raw JSON object. No Markdown fences, no prose, no comments.", + parseError: lastParseError, + requiredSchema: { decision: "complete|retry|fail", confidence: "0..1", reason: "short string", continuePrompt: "required when decision=retry unless no useful prompt is possible" }, + previousAnswerRaw: preDenoiseContent, + }), + }); + } + } + throw new Error(lastParseError ?? "MiniMax judge exhausted JSON repair attempts"); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + logger("warn", "judge_failed_fallback", { taskId: task.id, error: message }); + return fallbackJudge(result, message); + } +} + +async function requestMiniMaxJudge(messages: Array<{ role: "system" | "user" | "assistant"; content: string }>): Promise { const controller = new AbortController(); const timer = setTimeout(() => controller.abort(), config.judgeTimeoutMs); try { @@ -742,31 +1325,27 @@ async function judgeTask(task: QueueTask, result: CodexRunResult): Promise; - const first = Array.isArray(payload.choices) ? extractRecord(payload.choices[0]) : null; - const content = extractString(first?.message, "content") ?? rawText; - const parsed = parseJudgeJson(content); - const confidenceRaw = Number(parsed.confidence ?? 0.5); - return { - decision: normalizedDecision(parsed.decision), - confidence: Number.isFinite(confidenceRaw) ? Math.max(0, Math.min(1, confidenceRaw)) : 0.5, - reason: typeof parsed.reason === "string" ? parsed.reason : "MiniMax judge returned a decision.", - continuePrompt: typeof parsed.continuePrompt === "string" && parsed.continuePrompt.trim().length > 0 ? parsed.continuePrompt : undefined, - source: "minimax", - raw: parsed as JsonValue, - }; - } catch (error) { - logger("warn", "judge_failed_fallback", { taskId: task.id, error: error instanceof Error ? error.message : String(error) }); - return fallbackJudge(result); + let content = rawText; + try { + const payload = JSON.parse(rawText) as Record; + const first = Array.isArray(payload.choices) ? extractRecord(payload.choices[0]) : null; + const message = extractRecord(first?.message); + content = extractString(message, "content") + ?? extractString(payload, "content") + ?? extractString(payload, "reply") + ?? extractString(payload, "text") + ?? (Object.prototype.hasOwnProperty.call(payload, "decision") ? JSON.stringify(payload) : rawText); + } catch { + content = rawText; + } + return { rawText, content }; } finally { clearTimeout(timer); } @@ -788,13 +1367,13 @@ const defaultJudgeProbeCases: JudgeProbeCase[] = [ }, { id: "completed_but_plan_only", - prompt: "Create /workspace/tmp/judge_probe.txt containing exactly judge-probe-ok, then summarize the file path.", - finalResponse: "I can do that. Plan: create the file under /workspace/tmp and then summarize the path.", + prompt: "Create /root/unidesk/tmp/judge_probe.txt containing exactly judge-probe-ok, then summarize the file path.", + finalResponse: "I can do that. Plan: create the file under /root/unidesk/tmp and then summarize the path.", expected: "retry", terminalStatus: "completed", outputs: [ - { channel: "user", text: "Create /workspace/tmp/judge_probe.txt containing exactly judge-probe-ok, then summarize the file path.\n", method: "enqueue" }, - { channel: "assistant", text: "I can do that. Plan: create the file under /workspace/tmp and then summarize the path.", method: "item/agentMessage/delta" }, + { channel: "user", text: "Create /root/unidesk/tmp/judge_probe.txt containing exactly judge-probe-ok, then summarize the file path.\n", method: "enqueue" }, + { channel: "assistant", text: "I can do that. Plan: create the file under /root/unidesk/tmp and then summarize the path.", method: "item/agentMessage/delta" }, { channel: "system", text: "turn completed status=completed\n", method: "turn/completed" }, ], events: [{ at: nowIso(), method: "turn/completed", status: "completed" }], @@ -819,6 +1398,7 @@ const defaultJudgeProbeCases: JudgeProbeCase[] = [ finalResponse: "", expected: "fail", terminalStatus: "interrupted", + cancelRequested: true, outputs: [ { channel: "user", text: "Run a long shell command, then produce a report.\n", method: "enqueue" }, { channel: "system", text: "interrupt requested\n", method: "turn/interrupt" }, @@ -862,7 +1442,7 @@ function taskForJudgeProbe(probe: JudgeProbeCase): QueueTask { output: (probe.outputs ?? []).map(outputForProbe), events: probe.events ?? [], attempts: [], - cancelRequested: false, + cancelRequested: probe.cancelRequested ?? false, nextPrompt: null, nextMode: null, }; @@ -886,15 +1466,14 @@ function resultForJudgeProbe(probe: JudgeProbeCase, task: QueueTask): CodexRunRe } async function runJudgeProbe(): Promise { - const results = []; - for (const probe of defaultJudgeProbeCases) { + const results = await Promise.all(defaultJudgeProbeCases.map(async (probe) => { const task = taskForJudgeProbe(probe); const result = resultForJudgeProbe(probe, task); const startedAt = nowIso(); const finishedAt = nowIso(); task.attempts.push(attemptFromResult(task, "initial", startedAt, finishedAt, result)); const judge = await judgeTask(task, result); - results.push({ + return { id: probe.id, expected: probe.expected, decision: judge.decision, @@ -903,8 +1482,8 @@ async function runJudgeProbe(): Promise { source: judge.source, reason: judge.reason, continuePrompt: judge.continuePrompt ?? null, - }); - } + }; + })); const hits = results.filter((result) => result.hit).length; const hitRate = results.length === 0 ? 0 : hits / results.length; logger("info", "judge_probe_completed", { configured: config.minimaxApiKey.length > 0, model: config.minimaxModel, hits, total: results.length, hitRate }); @@ -940,11 +1519,27 @@ function retryPrompt(task: QueueTask, judge: JudgeResult): string { return [retryInstruction, "原始任务:", task.prompt].join("\n\n"); } +function queueActiveTasksForRestartRetry(reason: string, method: string): number { + let recovered = 0; + for (const task of state.tasks) { + if (task.status !== "running" && task.status !== "judging") continue; + task.status = "retry_wait"; + task.activeTurnId = null; + task.lastError = reason; + task.nextMode = "retry"; + task.nextPrompt = retryPrompt(task, { decision: "retry", confidence: 1, reason, source: "fallback" }); + task.updatedAt = nowIso(); + appendOutput(task, "system", `${reason}; task queued for retry\n`, method); + recovered += 1; + } + return recovered; +} + async function runTask(task: QueueTask): Promise { logger("info", "task_run_start", { taskId: task.id, maxAttempts: task.maxAttempts, model: task.model, promptPreview: safePreview(task.prompt, 240) }); task.startedAt ??= nowIso(); task.lastError = null; - while (task.attempts.length < task.maxAttempts && !task.cancelRequested) { + while (task.attempts.length < task.maxAttempts && !task.cancelRequested && !shutdownRequested) { const mode = task.nextMode ?? (task.attempts.length === 0 ? "initial" : "retry"); const prompt = task.nextPrompt ?? task.prompt; const startedAt = nowIso(); @@ -994,6 +1589,14 @@ async function runTask(task: QueueTask): Promise { persistState(); } + if (shutdownRequested) { + if (task.status === "running" || task.status === "judging") { + queueActiveTasksForRestartRetry("Service stopping while task was active", "shutdown"); + persistState(); + } + return; + } + if (task.cancelRequested) { task.status = "canceled"; task.finishedAt = nowIso(); @@ -1009,13 +1612,26 @@ async function runTask(task: QueueTask): Promise { } async function processQueue(): Promise { - if (processing) return; + if (processing || shutdownRequested) return; processing = true; try { while (true) { + if (shutdownRequested) break; const task = state.tasks.find((item) => item.status === "queued" || item.status === "retry_wait") ?? null; if (task === null) break; - await runTask(task); + try { + await runTask(task); + } catch (error) { + const message = error instanceof Error ? error.stack ?? error.message : String(error); + appendOutput(task, "error", `${message}\n`, "queue-loop"); + task.status = "failed"; + task.finishedAt = nowIso(); + task.activeTurnId = null; + task.lastError = safePreview(message, 2000); + task.updatedAt = nowIso(); + persistState(); + logger("error", "task_failed_by_queue_exception", { taskId: task.id, error: safePreview(message, 1000) }); + } } } finally { processing = false; @@ -1024,6 +1640,7 @@ async function processQueue(): Promise { } function scheduleQueue(): void { + if (shutdownRequested) return; void processQueue().catch((error) => { logger("error", "queue_loop_failed", { error: error instanceof Error ? error.stack ?? error.message : String(error) }); processing = false; @@ -1031,6 +1648,31 @@ function scheduleQueue(): void { }); } +function hasRunnableTask(): boolean { + return state.tasks.some((task) => task.status === "queued" || task.status === "retry_wait"); +} + +function installShutdownHandlers(): void { + const stop = (signal: NodeJS.Signals): void => { + if (shutdownRequested) process.exit(0); + shutdownRequested = true; + const recovered = queueActiveTasksForRestartRetry("Service stopping while task was active", "shutdown"); + if (activeRun !== null) activeRun.app.stop(); + persistState(); + logger("warn", "service_shutdown_requeued_active_tasks", { signal, recovered }); + process.exit(0); + }; + process.once("SIGTERM", stop); + process.once("SIGINT", stop); +} + +setInterval(() => { + if (!processing && hasRunnableTask()) { + logger("warn", "queue_watchdog_rescheduled", { runnable: state.tasks.filter((task) => task.status === "queued" || task.status === "retry_wait").length }); + scheduleQueue(); + } +}, 5000).unref?.(); + function jsonResponse(body: unknown, status = 200): Response { return new Response(JSON.stringify(body, null, 2), { status, @@ -1132,13 +1774,20 @@ async function route(req: Request): Promise { try { if (url.pathname === "/" || url.pathname === "/health") return jsonResponse({ ok: true, service: "codex-queue", queue: queueSummary(), startedAt: serviceStartedAt }); if (url.pathname === "/logs") return jsonResponse({ ok: true, logs: recentLogs.slice(-parseLimit(url)) }); + if (url.pathname === "/api/dev-ready" && req.method === "GET") return jsonResponse({ ok: true, devReady: collectDevReady() }); if (url.pathname === "/api/judge/probe" && (req.method === "GET" || req.method === "POST")) return runJudgeProbe(); if (url.pathname === "/api/tasks" && req.method === "GET") { const status = url.searchParams.get("status"); - const tasks = state.tasks.filter((task) => status === null || task.status === status).slice(-parseLimit(url)).reverse().map((task) => taskForResponse(task)); + const tasks = state.tasks.filter((task) => status === null || task.status === status).slice(-parseLimit(url)).reverse().map((task) => taskForListResponse(task)); return jsonResponse({ ok: true, queue: queueSummary(), tasks }); } if ((url.pathname === "/api/tasks" || url.pathname === "/api/tasks/batch") && req.method === "POST") return createTasks(req); + const transcriptMatch = url.pathname.match(/^\/api\/tasks\/([^/]+)\/transcript$/u); + if (transcriptMatch !== null && req.method === "GET") { + const task = findTask(decodeURIComponent(transcriptMatch[1] ?? "")); + if (task === null) return jsonResponse({ ok: false, error: "task not found" }, 404); + return transcriptChunkResponse(task, url); + } const match = url.pathname.match(/^\/api\/tasks\/([^/]+)(?:\/(retry|steer|interrupt))?$/u); if (match !== null) { const task = findTask(decodeURIComponent(match[1] ?? "")); @@ -1148,7 +1797,11 @@ async function route(req: Request): Promise { if (action === "steer" && req.method === "POST") return steerTask(task, req); if (action === "interrupt" && req.method === "POST") return interruptTask(task); if (action !== undefined) return jsonResponse({ ok: false, error: "not found" }, 404); - if (req.method === "GET") return jsonResponse({ ok: true, task: taskForResponse(task, true) }); + if (req.method === "GET") { + if (url.searchParams.get("meta") === "1") return jsonResponse({ ok: true, task: taskForMetaResponse(task) }); + const includeRaw = url.searchParams.get("raw") === "1" || url.searchParams.get("full") === "1"; + return jsonResponse({ ok: true, task: taskForResponse(task, true, includeRaw) }); + } if (req.method === "DELETE") return interruptTask(task); return jsonResponse({ ok: false, error: "method not allowed" }, 405); } @@ -1159,18 +1812,15 @@ async function route(req: Request): Promise { } } +installShutdownHandlers(); prepareCodexHome(); Bun.serve({ hostname: config.host, port: config.port, idleTimeout: 120, fetch: route }); logger("info", "service_started", { port: config.port, statePath: config.statePath, workdir: config.defaultWorkdir, defaultModel: config.defaultModel, judgeConfigured: config.minimaxApiKey.length > 0 }); -for (const task of state.tasks) { - if (task.status === "running" || task.status === "judging") { - task.status = "retry_wait"; - task.activeTurnId = null; - task.lastError = "Service restarted while task was active."; - task.nextMode = "retry"; - task.nextPrompt = retryPrompt(task, { decision: "retry", confidence: 1, reason: "Service restart", source: "fallback" }); - appendOutput(task, "system", "service restarted; task queued for retry\n", "startup"); - } +{ + const devReady = collectDevReady() as Record; + logger(devReady.ok === true ? "info" : "warn", "dev_ready_check", devReady); } +const startupRecovered = queueActiveTasksForRestartRetry("Service restarted while task was active", "startup"); +if (startupRecovered > 0) logger("warn", "startup_requeued_active_tasks", { recovered: startupRecovered }); persistState(); scheduleQueue();