diff --git a/AGENTS.md b/AGENTS.md index 7d36c34f..10a1a4fd 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -27,8 +27,9 @@ UniDesk 是一个以主 server 为统一入口的分布式工作平台;本文 - `bun scripts/cli.ts config show`:校验并展示根目录 `config.json`,配置来源规则见 `docs/reference/config.md`。 - `bun scripts/cli.ts check [--full|--files|--scripts-typecheck|--components|--compose|--logs]`:默认只运行轻量配置和 TypeScript 语法检查;关键文件、`scripts/` 类型、组件类型、Docker Compose 和日志策略检查需显式开启,测试入口见 `TEST.md`。 - `bun scripts/cli.ts server start`:以异步 job 启动 database、backend-core、frontend、provider-gateway、code-queue-mgr 和主 server 用户服务,部署规则见 `docs/reference/deployment.md`。 -- `bun scripts/cli.ts server status`:查询固定端口、容器状态、健康检查和访问 URL,判定标准见 `docs/reference/deployment.md`。 -- `bun scripts/cli.ts server logs`:分页返回文件日志与 Docker 日志尾部,日志规则见 `docs/reference/observability.md`。 +- `bun scripts/cli.ts server status`:查询固定端口、swap 摘要、容器状态、健康检查和访问 URL,判定标准见 `docs/reference/deployment.md`。 +- `bun scripts/cli.ts server swap status|ensure [--path /swapfile] [--size 2GiB] [--dry-run]`:以 JSON 查看或幂等创建主 server swapfile,`ensure` 输出 before/after、动作、持久化状态和 degraded/failed 详情,规则见 `docs/reference/deployment.md`。 +- `bun scripts/cli.ts server logs [--tail-bytes N]`:分页返回文件日志与 Docker 日志尾部并带截断元数据,日志规则见 `docs/reference/observability.md`。 - `bun scripts/cli.ts server rebuild `:以 build-first、Compose lock、no-deps force-recreate 和 post-up validation 的异步 job 重建主 server Compose 内单个服务;Code Queue 执行面部署在 D601,规则见 `docs/reference/deployment.md`。 - `bun scripts/cli.ts provider attach [--master-server URL] [--up] [--force]`:在新增计算节点上生成两项配置的 provider-gateway 挂载包;默认只需要主 server URL(默认 `http://74.48.78.17/`)和唯一 Provider ID,生成的 Compose 固定 Docker socket、`pid: "host"`、`restart: always`、只读 `/workspace`、SSH 维护私钥挂载和 loopback egress proxy 端口,规则见 `docs/reference/provider-gateway.md`。 - `bun scripts/cli.ts ssh [ssh-like args...]`:通过 provider-gateway 的 Host SSH / WSL SSH 维护桥打开近似原生 ssh 的交互会话或远端命令,并在远端 PATH 注入 `apply_patch`、`glob` 与 `skill-discover`;`apply-patch`、`py`、`skills`、结构化 `find`、`glob` 和 `argv` 子命令用于避免远端补丁、Python stdin、skill 发现与常用只读命令的嵌套转义问题,使用规则见 `docs/reference/cli.md` 和 `docs/reference/provider-gateway.md`。 @@ -42,7 +43,7 @@ UniDesk 是一个以主 server 为统一入口的分布式工作平台;本文 - `bun scripts/cli.ts codex judge --attempt [--dry-run]`:按指定 task/attempt 用与队列 worker 相同的上下文构建和 MiniMax judge 调用路径单步复现完成判定;`--dry-run` 只输出 prompt/payload 诊断。 - `bun scripts/cli.ts codex interrupt|cancel `:通过 Code Queue 私有代理中断运行任务或取消 queued/retry_wait 任务,规则见 `docs/reference/cli.md`。 - `bun scripts/cli.ts server stop`:以异步 job 停止固定 Compose 项目中的全部 UniDesk 服务,停止后用 `server status` 复核。 -- `bun scripts/cli.ts job list` / `bun scripts/cli.ts job status latest`:查询 `.state/jobs/` 中的异步任务状态,job 机制见 `docs/reference/cli.md`。 +- `bun scripts/cli.ts job list [--limit N]` / `bun scripts/cli.ts job status latest [--tail-bytes N]`:分页查询 `.state/jobs/` 中的异步任务状态,状态输出只读日志尾部并保留完整日志路径,job 机制见 `docs/reference/cli.md`。 - `bun scripts/cli.ts debug health` / `bun scripts/cli.ts debug dispatch` / `bun scripts/cli.ts debug task`:通过 Docker 内网 core、真实 HTTP、WebSocket、系统指标、Docker 状态和 Host SSH 维护桥流程调试健康检查、任务下发与任务结果,调试规则见 `docs/reference/cli.md`。 - `bun scripts/cli.ts e2e run [--only pattern[,pattern...]] [--skip pattern[,pattern...]]`:支持按 check/prefix/wildcard 选择性执行公网 frontend/provider ingress、内网 core/database、provider-gateway 自接入与 Playwright 验证;日常迭代先跑当前问题对应的最小检查集,最终交付再跑全量回归,验收规则见 `docs/reference/e2e.md`。 diff --git a/TEST.md b/TEST.md index 6f49d204..61099dca 100644 --- a/TEST.md +++ b/TEST.md @@ -2,7 +2,7 @@ ## T1 CLI 可观测性与配置校验 -阅读 `AGENTS.md`(本项目 `AGENTS.md` 同时承担 `SKILL.md` 对 `scripts/cli.ts` 的解释职责),然后用 cli 手动测试以下内容:运行 `bun scripts/cli.ts help`、`bun scripts/cli.ts config show`、`bun scripts/cli.ts check`,确认每条命令都有 JSON 输出、失败时包含错误对象、`config.json` 是唯一配置来源,且默认 `check` 只执行轻量配置和 TypeScript 语法检查;需要覆盖关键文件、`scripts/` 类型、`src/components/` 类型、Docker Compose config 和日志策略时,显式运行 `bun scripts/cli.ts check --full`。 +阅读 `AGENTS.md`(本项目 `AGENTS.md` 同时承担 `SKILL.md` 对 `scripts/cli.ts` 的解释职责),然后用 cli 手动测试以下内容:运行 `bun scripts/cli.ts help`、`bun scripts/cli.ts config show`、`bun scripts/cli.ts check`,确认每条命令都有 JSON 输出、失败时包含错误对象、`config.json` 是唯一配置来源,且默认 `check` 只执行轻量配置和 TypeScript 语法检查;需要覆盖关键文件、`scripts/` 类型、`src/components/` 类型、Docker Compose config 和日志策略时,显式运行 `bun scripts/cli.ts check --full`。运行 `set -o pipefail; bun scripts/cli.ts server status | head -1`,确认下游 pipe 关闭时不会打印 Bun EPIPE stack trace。 ## T2 Docker 栈异步启动 @@ -10,7 +10,7 @@ ## T3 主 server 自接入 Provider Gateway -阅读 `AGENTS.md`(本项目 `AGENTS.md` 同时承担 `SKILL.md` 对 `scripts/cli.ts` 的解释职责),然后用 cli 手动测试以下内容:运行 `bun scripts/cli.ts server status` 和 `bun scripts/cli.ts debug health`,确认面向浏览器的公网入口只有 frontend 与 provider ingress,backend-core 显示为 Docker 内部端口,database/OA Event Flow 若因 D601 Code Queue 映射宿主端口也必须显示为受限宿主端口,且 `network.restrictedHostAccess.allowedSourceCidrs` 已生成来源限制,`/api/nodes` 中存在 `main-server` provider,状态为 `online`,`/api/nodes/system-status` 中存在 CPU/内存/硬盘采样,`/api/nodes/docker-status` 中存在 `main-server` 的 Docker 快照,且 provider 标签中能看到 Docker socket 可用性。 +阅读 `AGENTS.md`(本项目 `AGENTS.md` 同时承担 `SKILL.md` 对 `scripts/cli.ts` 的解释职责),然后用 cli 手动测试以下内容:运行 `bun scripts/cli.ts server status`、`bun scripts/cli.ts server swap status` 和 `bun scripts/cli.ts debug health`,确认 `server status` 包含 `swap` 摘要,`server swap status` 快速返回 total memory、active swaps、`/etc/fstab` 持久化状态和 warning;面向浏览器的公网入口只有 frontend 与 provider ingress,backend-core 显示为 Docker 内部端口,database/OA Event Flow 若因 D601 Code Queue 映射宿主端口也必须显示为受限宿主端口,且 `network.restrictedHostAccess.allowedSourceCidrs` 已生成来源限制,`/api/nodes` 中存在 `main-server` provider,状态为 `online`,`/api/nodes/system-status` 中存在 CPU/内存/硬盘采样,`/api/nodes/docker-status` 中存在 `main-server` 的 Docker 快照,且 provider 标签中能看到 Docker socket 可用性。若 `swap.warning` 非空,先运行 `bun scripts/cli.ts server swap ensure --dry-run` 审查动作,再谨慎执行 `bun scripts/cli.ts server swap ensure --size 2GiB`,确认输出包含 `before`/`after`、`actions`、`errors` 和 `status=ok|degraded`;已有 swap 时 ensure 必须 no-op。 ## T4 前端控制台连通 @@ -22,7 +22,7 @@ ## T6 日志第一现场验证 -阅读 `AGENTS.md`(本项目 `AGENTS.md` 同时承担 `SKILL.md` 对 `scripts/cli.ts` 的解释职责),然后用 cli 手动测试以下内容:运行 `bun scripts/cli.ts server logs --tail-bytes 20000`,实际读取输出中列出的 `logs/{YYYYMMDD}/` 文件,确认 backend-core、frontend、provider-gateway、database 都有实时日志;backend-core 与 Code Queue/Codex app-server 日志必须按 `logs/{YYYYMMDD}/{startStamp}_{YYYYMMDD}_{HH}_{service}.jsonl` 小时切片,默认日志族总量不得超过 `1GiB`,超过后会删除最旧切片;日志不得只有启动行,错误日志必须包含可定位的错误消息或 stack。 +阅读 `AGENTS.md`(本项目 `AGENTS.md` 同时承担 `SKILL.md` 对 `scripts/cli.ts` 的解释职责),然后用 cli 手动测试以下内容:运行 `bun scripts/cli.ts server logs --tail-bytes 20000`,确认输出包含 `policy`、每个日志文件的 `sizeBytes/tailBytes/truncated` 和 Docker logs 的 tail 元数据,实际读取输出中列出的 `logs/{YYYYMMDD}/` 文件,确认 backend-core、frontend、provider-gateway、database 都有实时日志;运行 `bun scripts/cli.ts job list --limit 5` 和 `bun scripts/cli.ts job status latest --tail-bytes 20000`,确认 job 列表分页、状态输出只含 stdout/stderr 尾部且保留完整日志路径;backend-core 与 Code Queue/Codex app-server 日志必须按 `logs/{YYYYMMDD}/{startStamp}_{YYYYMMDD}_{HH}_{service}.jsonl` 小时切片,默认日志族总量不得超过 `1GiB`,超过后会删除最旧切片;日志不得只有启动行,错误日志必须包含可定位的错误消息或 stack。 ## T7 停止与端口释放 diff --git a/docs/reference/cli.md b/docs/reference/cli.md index 696f7051..a95de5fc 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -10,8 +10,9 @@ UniDesk 的统一 CLI 入口是根目录 `scripts/cli.ts`,运行方式固定 - `check` 默认只执行轻量配置校验、Bun 版本检查和 Bun Transpiler 语法解析(覆盖 CLI 入口、主要 `scripts/` 模块和核心组件入口,不做类型推导);关键文件存在性、`scripts/` TypeScript 类型检查、`src/components/` TypeScript 类型检查、Docker Compose config 和日志轮转策略扫描默认不启用,分别通过 `--files`、`--scripts-typecheck`、`--components`、`--compose`、`--logs` 开启,或用 `--full` 一次性开启。 - `server start` 创建异步 job,在后台执行 Docker 构建和启动;命令本身只负责返回 job id、日志路径和启动命令。 - `server stop` 创建异步 job,在后台停止固定 Compose project 中的全部 UniDesk 服务。 -- `server status` 查询公开端口、受限宿主端口、内部端口、Compose 容器、core/frontend/provider/database 健康检查和访问 URL;D601 Code Queue 使用的 PostgreSQL/OA Event Flow host mapping 必须出现在受限宿主端口而不是无条件公开入口中。 -- `server logs` 返回 `logs/` 文件日志和 Docker 容器日志的尾部,默认限制输出大小,避免日志爆炸。 +- `server status` 查询公开端口、受限宿主端口、内部端口、主机 swap 摘要、Compose 容器、core/frontend/provider/database 健康检查和访问 URL;D601 Code Queue 使用的 PostgreSQL/OA Event Flow host mapping 必须出现在受限宿主端口而不是无条件公开入口中。低内存主 server 上 `swap.warning` 非空时,先执行 `server swap status` 或 `server swap ensure`。 +- `server swap status|ensure [--path /swapfile] [--size 2GiB] [--dry-run]` 是主 server swap 管理入口。`status` 仅读 `/proc/meminfo`、`/proc/swaps` 和 `/etc/fstab` 并返回 JSON;`ensure` 在已有任何 active swap 时只报告 no-op,在无 active swap 时创建固定 swapfile、`chmod 600`、`mkswap`、`swapon` 并尽量写入 `/etc/fstab`。输出必须包含 `before`、`after`、total memory、active swap、持久化状态、关键动作和错误详情;若 swap 已启用但 fstab 写入失败,状态为 `degraded`,调用者需按返回的 detail 修复持久化。 +- `server logs` 返回 `logs/` 文件日志和 Docker 容器日志的尾部,默认限制输出大小,避免日志爆炸。实现必须只读取文件末尾字节,不得为了 tail 先把巨大日志完整读入 CLI 内存。 - `server rebuild ` 创建异步 job,先构建目标服务镜像,随后在 `.state/locks/server-compose.lock` 串行保护下用 `--no-deps --force-recreate` 替换目标 service 并等待容器 `healthy/running`;该命令用于替代手工删除容器的兜底流程,其中 `todo-note`、`code-queue-mgr`、`project-manager`、`baidu-netdisk` 和 `oa-event-flow` 只重建主 server 承载的对应后端,不会重建或删除 database 命名卷。D601 Code Queue 执行面不由 `server rebuild` 管理。 - `provider attach [--master-server URL] [--up] [--force]` 在新计算节点生成两项配置的 provider-gateway 挂载包:`.state/provider-.env` 默认只包含 `UNIDESK_MASTER_SERVER` 与 `PROVIDER_ID`,`provider-.yml` 固定 Docker socket、`pid: "host"`、`restart: always`、只读 `/workspace` 和 SSH 维护私钥挂载;`--up` 会立即执行生成的 `docker compose up -d --build`。 - `ssh [ssh-like args...]` 通过 backend-core 内网 WebSocket broker 和 provider-gateway 的 Host SSH / WSL SSH 维护桥连接目标节点;无后续参数时进入远端登录 shell,有后续参数时按 ssh 远端命令体验执行并返回远端 exit code。 @@ -30,7 +31,7 @@ UniDesk 的统一 CLI 入口是根目录 `scripts/cli.ts`,运行方式固定 - `codex interrupt|cancel ` 通过 Code Queue 私有代理请求中断;running/judging 任务会请求 D601 当前 agent run 停止,queued/retry_wait 任务的取消也必须保持与 WebUI 相同代理路径,返回有界 task 摘要和后续查询命令。任何需要接触 active run 的动作仍属于 D601 执行面。 - Code Queue 多队列 lane 由 `codex` 命令命名空间管理:`queues` 列表、`queue create ` 创建、`queue merge --into ` 合并、`move --queue ` 迁移;这些队列管理入口默认由主 server `code-queue-mgr` 直管 PostgreSQL,仍通过稳定 `code-queue` 用户服务代理路径访问。同一个 queue 内部串行执行,不同 queue 之间并行执行。迁移只允许尚未被 scheduler claim 的 `queued`/`retry_wait` 任务,必须满足 `startedAt=null`、`currentAttempt=0` 且没有 active thread/turn;已进入 `running`/`judging` 或已有 claim 标记的任务返回 409,不得被 move/merge 回写成 queued。合并会移动可迁移任务归属并自动删除源 queue 记录,只保留合并后的目标 queue;若 source 或 target queue 存在 active/claimed 任务,合并整体返回 409。合并后的目标 queue 按任务原 `queueEnteredAt`/`createdAt` 时间顺序串行,成功迁移 queued/retry_wait 任务后由 D601 scheduler 轮询推进。 - 所有 `codex` 查询和管理命令必须走与 WebUI 相同的 backend-core 私有代理路径 `/api/microservices/code-queue/proxy/...`;CLI 不得为了提交、移动、中断、取消或队列管理直接调用 D601 内部 Service、数据库、pod curl 或 k3sctl scheduler 子服务。若该路径失败,应先修复 CLI/backend/provider tunnel 链路,而不是绕过控制面。 -- `job list` 与 `job status` 查询 `.state/jobs/` 文件系统状态,是异步命令的可观测入口。 +- `job list [--limit N] [--include-command]` 与 `job status [--tail-bytes N]` 查询 `.state/jobs/` 文件系统状态,是异步命令的可观测入口。`job list` 默认只返回最新 50 条摘要;`job status` 默认只返回 stdout/stderr 末尾 12000 字节,并带 `tailPolicy` 与完整日志路径。 - `debug health`、`debug dispatch` 与 `debug task` 走真实内部 core、WebSocket、数据库、provider、系统指标、Docker 状态和 Host SSH 维护桥流程,只用于开发调试,不写入 `TEST.md` 的正式验收步骤。 - `e2e run [--only pattern[,pattern...]] [--skip pattern[,pattern...]]` 使用 publicHost 派生的公开 frontend/provider ingress URL,并通过 Docker 内网验证 core API、PostgreSQL、provider self-connection、系统指标曲线、Docker 状态快照、provider.upgrade 预检和 Playwright 前端页面,是交付前的自动化 E2E 门禁;CLI 默认输出 check 状态摘要,完整诊断写入 `resultPath`,日常迭代应优先用 `--only` / `--skip` 跑最小必要集合。 @@ -46,7 +47,9 @@ UniDesk 的统一 CLI 入口是根目录 `scripts/cli.ts`,运行方式固定 每条命令的最外层 JSON 包含 `ok`、`command` 和 `data` 或 `error`。失败时 CLI 设置非零退出码,但仍然输出 JSON 错误对象;错误对象应包含 `name`、`message` 和可用的 `stack`。 -`microservice proxy` 是面向人工验证和受控调试的私有后端入口。默认 method 为 GET;使用 `--body-json JSON`、`--body-file path` 或 `--body-stdin` 时默认 method 切换为 POST,也可显式加 `--method POST|PUT|PATCH|DELETE`,但 GET/HEAD 不允许携带请求体。所有请求仍受 config 中的 `allowedMethods` 和 `allowedPathPrefixes` 限制。为了避免 Pipeline snapshot 这类超大业务 JSON 造成 CLI 输出爆炸,响应 body 超过默认阈值时会返回 `bodyOmitted=true`、`bodyPreview`、`bodyBytes` 和 `rawHint`;需要完整 body 时显式添加 `--raw`,或用 `--max-body-bytes ` 调整预览阈值。正式 frontend 展示仍应优先使用业务控件和 `__unideskArrayLimit` 这类展示级裁剪参数,而不是默认倾倒完整 JSON。 +诊断命令默认采用渐进披露:`server logs`、`job list/status`、`codex task/trace/output` 和 `microservice proxy` 都必须有默认条数、字节数或文本预览上限;用户显式传 `--limit`、`--tail-bytes`、`--full-text` 或 `--full` 才扩大单次输出。CLI stdout 遇到下游 pipe 关闭的 `EPIPE` 必须安静退出,不得打印 Bun stack trace。 + +`microservice proxy` 是面向人工验证和受控调试的私有后端入口。默认 method 为 GET;使用 `--body-json JSON`、`--body-file path` 或 `--body-stdin` 时默认 method 切换为 POST,也可显式加 `--method POST|PUT|PATCH|DELETE`,但 GET/HEAD 不允许携带请求体。所有请求仍受 config 中的 `allowedMethods` 和 `allowedPathPrefixes` 限制。为了避免 Pipeline snapshot 这类超大业务 JSON 造成 CLI 输出爆炸,响应 body 超过默认阈值时会返回 `bodyOmitted=true`、`bodyPreview`、`bodyBytes` 和 `rawHint`;`--raw` 仍受默认硬限额保护,需要完整 body 时显式添加 `--raw --full`,或用 `--max-body-bytes ` 调整预览阈值。正式 frontend 展示仍应优先使用业务控件和 `__unideskArrayLimit` 这类展示级裁剪参数,而不是默认倾倒完整 JSON。 `network perf` 用于生成组网性能前后对比数据。标准 Code Queue overview 读路径基准命令是 `bun scripts/cli.ts network perf --service code-queue --path /api/tasks/overview?limit=30 --count 30 --concurrency 1 --label before`,远程主 server 可用 `bun scripts/cli.ts --main-server-ip 74.48.78.17 network perf ...`。输出包含成功/失败数、状态码分布、`x-unidesk-cache`、`x-unidesk-proxy-mode`、`x-unidesk-upstream-proxy-mode` 分布和 min/p50/p90/p95/max;provider-gateway 长连接数据面验收应看到 `proxyModeCounts.provider-ws-http-tunnel`,adapter native Service 数据面验收应看到 upstream proxy mode 为 `kubernetes-native-service`,若出现 `kubernetes-api-service-proxy` 必须结合 `/api/control-plane.nativeServiceProxy.failedServices` 解释 fallback 原因。 diff --git a/docs/reference/deployment.md b/docs/reference/deployment.md index a81fc6e9..fc13417c 100644 --- a/docs/reference/deployment.md +++ b/docs/reference/deployment.md @@ -29,6 +29,14 @@ Compose v2 安装后仍然必须遵守 UniDesk 的服务控制入口:全栈生 版本化用户服务部署优先使用 `bun scripts/cli.ts deploy apply`。`deploy.json` 只声明服务 `id`、`repo` 和 `commitId`;目标节点、Dockerfile、Compose、Kubernetes manifest、健康检查和代理路径继续来自 `config.json` 与现有 manifest。主 server 直管微服务和内部 sidecar,例如 `code-queue-mgr`,也必须支持这一路径:`deploy apply --service code-queue-mgr` 从 `deploy.json` 指定 commit 导出源码、构建镜像、替换固定 Compose service 并验证运行中镜像/健康信息的 commit。部署必须遵循 target-side build:服务部署到哪台 target,就在哪台 target 从 remote commit 导出源码、一次性代理构建镜像并部署;不得把中心构建镜像作为默认分发路径,也不得用 `docker commit` 或脏 worktree 作为部署输入。完整规则见 `docs/reference/deploy.md`。 +## Main Server Swap + +主 server 可能运行在约 2 GiB 内存的小规格机器上,短时 Docker build、Codex/control-plane 调查和日志读取会触发 global OOM。主 server 必须通过 `bun scripts/cli.ts server swap status` 暴露当前 memory/swap 状态,并在 `server status` 的 `swap` 字段中给出同一摘要。 + +缺少 active swap 时,正式修复入口是 `bun scripts/cli.ts server swap ensure [--path /swapfile] [--size 2GiB]`。该命令必须幂等:已有任何 active swap 时只返回 no-op 状态;无 swap 时创建固定 swapfile、设置 `0600`、执行 `mkswap` 与 `swapon`,并尽量把 ` none swap sw 0 0` 写入 `/etc/fstab`。如果当前环境允许 `swapon` 但不允许写 `/etc/fstab`,命令返回 `status=degraded`,并在 JSON 的 `errors`/`actions` 中说明下一步;不得静默假装持久化完成。 + +swap 管理不能被强塞进所有热路径。`server start/status` 可以暴露 warning 或摘要,但不会自动创建 swap;需要变更主机 swap 时必须显式运行 `server swap ensure`,并用返回的 `before`/`after` 和 `fstab.persisted` 作为验收记录。 + ## Start And Stop `bun scripts/cli.ts server start` 与 `bun scripts/cli.ts server stop` 都是异步 job。启动 job 只执行固定 Compose project 的 `up -d --build --remove-orphans`,不得先 `down`,避免在 provider-gateway 旧容器或网络冲突时把长驻控制面容器先删掉又启动失败;停止 job 才允许执行 `down --remove-orphans`。启动和停止流程都禁止删除 Docker named volume。所有会改变主 server Compose 状态的 job 必须通过 `.state/locks/server-compose.lock` 串行化;连续 `server rebuild` 命令只代表连续创建异步 job,不能代表第一个 job 已结束,实际容器变更仍必须由 Compose lock 串行执行。 diff --git a/docs/reference/observability.md b/docs/reference/observability.md index a108f6a4..e2be8653 100644 --- a/docs/reference/observability.md +++ b/docs/reference/observability.md @@ -4,7 +4,7 @@ UniDesk 的可观测性优先级高于静默成功。CLI、服务日志、Docker ## CLI Logs -异步 job 的 stdout 和 stderr 位于 `.state/jobs/`。`job status` 会返回有限尾部,避免输出爆炸,同时保留完整日志文件路径便于继续排查。 +异步 job 的 stdout 和 stderr 位于 `.state/jobs/`。`job list` 默认只返回最新 50 条摘要;`job status` 会返回有限尾部,避免输出爆炸,同时保留完整日志文件路径便于继续排查。实现必须只读取日志尾部字节,不得先把完整 job 日志读入 CLI 内存。 ## Service Logs @@ -18,7 +18,13 @@ UniDesk 的可观测性优先级高于静默成功。CLI、服务日志、Docker ## Log Access -`bun scripts/cli.ts server logs` 同时读取文件日志和 Docker logs 尾部。文件日志是服务崩溃时的第一现场,Docker logs 是容器启动失败和 stdout/stderr 的辅助来源。 +`bun scripts/cli.ts server logs` 同时读取文件日志和 Docker logs 尾部。文件日志是服务崩溃时的第一现场,Docker logs 是容器启动失败和 stdout/stderr 的辅助来源。默认输出必须包含 tail 字节数、是否截断和完整文件路径;扩大读取范围只能通过显式 `--tail-bytes N`,且 CLI 会对单次 tail 设置硬上限。 + +## Diagnostic Output Limits + +所有诊断型 CLI 输出必须优先摘要化、尾部化或分页化,禁止默认倾倒大 JSON、全量日志、全量 trace 或 `.state`/`logs` 宽泛搜索结果。当前硬限额入口包括:`server logs` 默认 3000 bytes tail、`job list` 默认 50 条、`job status` 默认 12000 bytes tail、`codex task/trace/output` 默认分页与文本预览、`microservice proxy` 默认 body 预览且 `--raw` 仍受硬限额保护。确实需要完整响应时必须显式使用对应的 `--full`、`--full-text`、`--tail-bytes` 或 `--limit` 参数,并在验收记录中说明为什么需要扩大输出。 + +CLI 写 stdout/stderr 遇到下游 pipe 关闭的 `EPIPE` 必须安静退出,不能打印 Bun stack trace。常见验证命令是 `set -o pipefail; bun scripts/cli.ts server status | head -1`,应只看到第一行 JSON 而无额外错误噪声。 ## Task Liveness @@ -32,6 +38,6 @@ frontend Bun server 必须提供同源 `/api/frontend-performance`,记录 webu 性能优化必须先用这些指标锁定慢操作名称、路径、耗时和代理层级,再改后端查询或前后端通信策略;不得只凭主观体感改 UI。Code Queue 这类控制面页面出现 `core_proxy`、`GET /api/microservices/code-queue/proxy/api/tasks/overview`、`POST /api/microservices/code-queue/proxy/api/tasks//read` 等超过 1s 的慢操作时,应保留优化前后的性能面板证据,并同时记录 live API 耗时、容器内存、`/health` 存储摘要和是否仍通过 PostgreSQL/append-only archive 重建历史数据。短 TTL cache、warmup 或页面内存缓存只能作为重复请求抖动保护,性能证据必须证明数据库索引/聚合、分页和渐进式披露本身已把核心路径降到目标内,不能用长缓存遮蔽慢 SQL 或全量 JSON 物化。 -当最近失败请求集中出现 frontend `core_proxy` 502/503/504,路径为 `/api/microservices/code-queue/proxy/...` 的 overview、trace 或 summary,且 k3s/k8s Pod 仍在运行时,必须先运行 `bun scripts/cli.ts microservice diagnostics code-queue`,区分 provider-gateway online、WebSocket HTTP tunnel、k3sctl-adapter、Kubernetes API service proxy 和目标 Service 五段状态。provider tunnel 类失败必须记录响应 body/headers 中的 `requestId`、`stage`、`failureReason`、`x-unidesk-request-id` 和 `x-unidesk-tunnel-error`;如需主动验证错误结构,运行 `bun scripts/cli.ts microservice tunnel-self-test code-queue`,该自测应返回预期失败但 `ok=true` 的诊断结果。随后再继续判断“Kubernetes API service proxy 不可达”“Code Queue 进程不可达”和“Code Queue event loop 被热路径同步工作饿死”。排障顺序是同时查看 `/api/frontend-performance`、`/api/performance`、`k3sctl-adapter` `/api/control-plane`、Kubernetes Pod `/live`、`/health`、overview/trace-step curl、`kubectl top pod` 或 Docker stats、容器 `RestartCount`/`OOMKilled` 和 Code Queue 日志;如果 Pod 内 `/health` 也超时,应优先检查实时 output 发布、archive 读取、transcript 构建、统计计算、启动维护、历史 OA backfill 和远程 Provider 准备/SSH 子进程是否阻塞 event loop,而不是先调整 frontend 渲染或代理超时。Code Queue 默认不得在启动时自动执行历史 OA backfill 或通知表索引维护;显式 backfill 必须作为运维动作记录,并在运行期间并发证明 `/live`、`/health` 与 `/api/tasks/overview` 仍快速返回。涉及 D601 等远程 Provider 时,还要检查 `runCodeQueueSsh`/开发容器准备是否仍存在同步子进程、无 timeout 的 SSH、无上限 stdout/stderr 或 stale TUN 重建等待;修复后必须在远程准备探针运行期间并发证明 Pod `/health` 与 `/api/tasks/overview` 仍快速返回。 +当最近失败请求集中出现 frontend `core_proxy` 502/503/504,路径为 `/api/microservices/code-queue/proxy/...` 的 overview、trace 或 summary,且 k3s/k8s Pod 仍在运行时,必须先运行 `bun scripts/cli.ts microservice diagnostics code-queue`,区分 provider-gateway online、WebSocket HTTP tunnel、k3sctl-adapter、Kubernetes API service proxy 和目标 Service 五段状态。provider tunnel 类失败必须记录响应 body/headers 中的 `requestId`、`stage`、`failureReason`、`x-unidesk-request-id` 和 `x-unidesk-tunnel-error`;如需主动验证错误结构,运行 `bun scripts/cli.ts microservice tunnel-self-test code-queue`,该自测应返回预期失败但 `ok=true` 的诊断结果。随后再继续判断“Kubernetes API service proxy 不可达”“Code Queue 进程不可达”和“Code Queue event loop 被热路径同步工作饿死”。如果 `debug health` 或 provider-gateway egress health 显示 `providerGatewayEgressProxyActiveTunnels` 持续偏高、`pendingTunnels` 非零或 `oldestTunnelAgeMs` 长时间增长,应先按 provider-gateway egress tunnel 生命周期排障,确认 `egress_tcp_open`、connect timeout、idle cleanup 与 core socket close 清理是否生效。排障顺序是同时查看 `/api/frontend-performance`、`/api/performance`、`k3sctl-adapter` `/api/control-plane`、Kubernetes Pod `/live`、`/health`、overview/trace-step curl、`kubectl top pod` 或 Docker stats、容器 `RestartCount`/`OOMKilled` 和 Code Queue 日志;如果 Pod 内 `/health` 也超时,应优先检查实时 output 发布、archive 读取、transcript 构建、统计计算、启动维护、历史 OA backfill 和远程 Provider 准备/SSH 子进程是否阻塞 event loop,而不是先调整 frontend 渲染或代理超时。Code Queue 默认不得在启动时自动执行历史 OA backfill 或通知表索引维护;显式 backfill 必须作为运维动作记录,并在运行期间并发证明 `/live`、`/health` 与 `/api/tasks/overview` 仍快速返回。涉及 D601 等远程 Provider 时,还要检查 `runCodeQueueSsh`/开发容器准备是否仍存在同步子进程、无 timeout 的 SSH、无上限 stdout/stderr 或 stale TUN 重建等待;修复后必须在远程准备探针运行期间并发证明 Pod `/health` 与 `/api/tasks/overview` 仍快速返回。 Code Queue task 明明产出最终回复却反复 `retry_wait` 时,应优先用任务详情里的 latest attempt 字段核查 `terminalStatus`、`transportClosedBeforeTerminal`、`appServerExitCode`、`finalResponseChars`、`judge.raw._safetyOverride` 和 attempt output。OpenCode 远程任务中,`opencode completed status=completed exit=0` 加当前 attempt 非空 assistant 输出应对应 `terminalStatus=completed`、`transportClosedBeforeTerminal=false`;如果因为缺少 `step_finish` 事件仍触发 `_safetyOverride=terminal_not_completed`,说明协议终态归一化有回归。相反,当前 attempt 没有最终 assistant response 时即使 tool/read/bash 证据完整,也必须 retry,不能用旧 `task.finalResponse` 或 reasoning/tool evidence 代替可见最终回复。 diff --git a/docs/reference/provider-gateway.md b/docs/reference/provider-gateway.md index 3290151d..f20c2fee 100644 --- a/docs/reference/provider-gateway.md +++ b/docs/reference/provider-gateway.md @@ -100,10 +100,12 @@ backend-core 必须把 provider WebSocket HTTP tunnel 的失败分类到响应 b provider-gateway 可以提供 egress HTTP CONNECT 代理,用于让 Code Queue、Pipeline runner、target-side Docker build 等节点侧执行环境通过既有 provider WebSocket 通道出网。代理默认监听容器内 `0.0.0.0:18789`,节点部署必须只发布为宿主 loopback `127.0.0.1:18789->18789/tcp`,不得开放公网端口;普通 Docker 执行容器可通过同一私有 Docker network 访问 provider-gateway 容器名,k3s/k8s Pod 必须通过显式 Kubernetes Service 暴露同节点 provider-gateway 私有 endpoint,例如 D601 Code Queue 使用 selector 指向 hostNetwork 桥接 Pod 的 `d601-provider-egress-proxy.unidesk.svc.cluster.local:18789`,不得把固定 Docker bridge IP、手工 EndpointSlice 或该 egress Service 当作业务 HTTP 入口。代理只负责把本地 CONNECT/absolute HTTP 请求转换为 `egress_tcp_open`、`egress_tcp_data`、`egress_tcp_close` 消息;backend-core 在主 server 侧建立真实 TCP 连接并把数据回传,避免 D601 等计算节点本地网络不可达时卡死 Codex/Git/NPM/apt/Playwright。 -该能力属于 provider-gateway 通道能力,register/heartbeat 的 `unideskCapabilities` 必须包含 `network.egress-proxy`,labels 必须上报 `providerGatewayEgressProxy*` 状态。不得再为某个用户服务单独注册伪 provider 来实现出网代理;否则节点列表会出现虚假 provider,且代理、统计、升级路径会形成多套通道。代理健康检查使用 `GET /__unidesk/egress-proxy/health`,返回 `connected`、`providerId`、`activeTunnels` 和监听端口;业务服务自己的 `/health` 应把该结果作为排障证据透出。 +该能力属于 provider-gateway 通道能力,register/heartbeat 的 `unideskCapabilities` 必须包含 `network.egress-proxy`,labels 必须上报 `providerGatewayEgressProxy*` 状态。不得再为某个用户服务单独注册伪 provider 来实现出网代理;否则节点列表会出现虚假 provider,且代理、统计、升级路径会形成多套通道。代理健康检查使用 `GET /__unidesk/egress-proxy/health`,返回 `connected`、`providerId`、`activeTunnels`、`pendingTunnels`、`oldestTunnelAgeMs`、`openTimeoutMs`、`idleTimeoutMs` 和监听端口;业务服务自己的 `/health` 应把该结果作为排障证据透出。 egress proxy 的长期边界是“统一 provider 通道,不引入第二控制面”。backend-core 只接受在线 provider socket 上的 `egress_tcp_*` 消息,并在该 socket 关闭时销毁全部对应 TCP relay;provider-gateway 只维护本地 HTTP proxy 与 WebSocket 消息映射,不保存业务状态,不参与任务调度、统计或节点注册以外的控制面。执行容器、用户服务、Pipeline runner 和 provider-side deploy build 不允许直接连接 backend-core provider ingress,也不允许携带 provider token 自行注册;需要出网时只能连接同节点 provider-gateway 的私有 proxy endpoint。当前 k3s/k8s Code Queue 通过 `d601-provider-egress-proxy` Kubernetes Service 连接 D601 provider-gateway egress endpoint,这是 Pod 内的出网入口,不是业务 HTTP 代理入口,也不能替代 Kubernetes API service proxy。部署构建同样不得新建 SSH SOCKS、公网 master proxy 或宿主全局代理;构建脚本只能把 provider-gateway WS egress 作为短生命周期环境变量和 Docker build-arg 注入,并配合目标节点本地 BuildKit/image cache 避免重复下载大依赖层。 +egress tunnel 必须有生命周期边界:provider-gateway 发出 `egress_tcp_open` 后如果主 server 未在 `openTimeoutMs` 内返回 `egress_tcp_opened` 或 close,必须主动关闭本地 client 并向 core 发送 `egress_tcp_close`;provider-gateway 与 backend-core 都必须对长时间无数据的 relay 执行 idle 清理,避免 provider WebSocket 抖动、TCP connect 卡住或上游未关闭时留下 stale tunnel。排障时如果 `activeTunnels` 持续增长、`pendingTunnels` 非零或 `oldestTunnelAgeMs` 明显超过业务请求耗时,应先看 provider-gateway 与 backend-core egress 清理日志,再判断 Code Queue、PostgreSQL 或 OA Event Flow 本身是否慢。 + 故障语义必须显式,不允许静默 fallback。provider-gateway 到 backend-core 的 WebSocket 未连接时,本地 proxy 必须返回 503;执行容器不能自动绕过到 D601 本地直连公网、外部公共代理或主 server 公网 HTTP 端口。`NO_PROXY` 只用于 PostgreSQL、OA Event Flow、ClaudeQQ、frontend/backend-core 内网代理、provider-gateway health 等明确内网链路,不能把 GitHub、模型 API、npm registry 等外部目标加入绕过列表。`hyueapi.com` 是明确的模型 API 例外:该上游会拒绝 provider-gateway egress proxy 出口,Code Queue 必须用 `CODE_QUEUE_EGRESS_PROXY_NO_PROXY` / `NO_PROXY` 将 `hyueapi.com,.hyueapi.com` 配成直连,其它模型 API 仍不得默认绕过 proxy。验收必须同时证明 provider-gateway labels、业务服务 `/health` 和执行容器内 `curl -I https://...` 都走同一 proxy path,hyueapi 例外则以 Code Queue `/health.egressProxy.noProxy` 和目标任务成功完成作为证据。 ## Gateway Version Metadata diff --git a/scripts/cli.ts b/scripts/cli.ts index 2508cefe..8c9e698d 100644 --- a/scripts/cli.ts +++ b/scripts/cli.ts @@ -3,7 +3,7 @@ import { debugDispatch, debugHealth, debugTask, isDebugDispatchCommand, type Deb import { isRebuildableService, rebuildService, stackLogs, stackStatus, startStack, stopStack } from "./src/docker"; import { parseE2ERunOptions, runE2E } from "./src/e2e"; import { emitError, emitJson } from "./src/output"; -import { jobWithTail, listJobs, readJob, runJob } from "./src/jobs"; +import { jobWithTail, listJobs, listJobsSummary, readJob, runJob } from "./src/jobs"; import { parseCheckOptions, runChecks } from "./src/check"; import { runSsh } from "./src/ssh"; import { extractRemoteCliOptions, runRemoteCli } from "./src/remote"; @@ -15,6 +15,7 @@ import { runProviderCommand } from "./src/provider-attach"; import { runScheduleCommand } from "./src/schedules"; import { parseNetworkPerfOptions, runNetworkPerf } from "./src/network-perf"; import { runCiCommand } from "./src/ci"; +import { runSwapCommand } from "./src/swap"; const remoteOptions = extractRemoteCliOptions(process.argv.slice(2)); const args = remoteOptions.args; @@ -32,6 +33,7 @@ function help(): unknown { { command: "server start", description: "Fire-and-forget build/start for database, backend-core, frontend, provider gateway, and managed main-server user services." }, { command: "server stop", description: "Fire-and-forget docker-compose down for the fixed UniDesk stack." }, { command: "server status", description: "Show fixed ports, containers, service health, and public URLs." }, + { command: "server swap status|ensure [--path /swapfile] [--size 2GiB] [--dry-run]", description: "Inspect or idempotently create host swap for low-memory main-server operation." }, { command: "server logs [--tail-bytes N]", description: "Return bounded tails from file logs and docker logs." }, { command: "server rebuild ", description: "Build first, then serialize, force-recreate, and validate one Compose service." }, { command: "provider attach [--master-server URL] [--up] [--force]", description: "Generate the minimal external provider-gateway env/compose bundle; only master server URL and provider id are required." }, @@ -61,7 +63,7 @@ function help(): unknown { { command: "codex judge --attempt N [--dry-run] [--include-prompt]", description: "Replay one stored Code Queue attempt through the same judge context builder and MiniMax judge call path used by the live queue worker." }, { command: "codex interrupt|cancel ", description: "Request interrupt for a running Code Queue task, or cancel a queued/retry_wait task, through the same private proxy." }, { command: "codex (queues | queue create | queue merge --into | move --queue )", description: "List/create/merge Code Queue lanes and move a queued task; merge preserves task queue time order and deletes the source queue record." }, - { command: "job list", description: "List async jobs from .state/jobs." }, + { command: "job list [--limit N] [--include-command]", description: "List async jobs from .state/jobs with a bounded default page." }, { command: "job status [--tail-bytes N]", description: "Show job state with bounded stdout/stderr tails." }, { command: "debug health", description: "Probe internal core, nodes, system/Docker status, frontend, provider ingress, and public boundary." }, { command: "debug dispatch [providerId] [docker.ps|provider.upgrade|host.ssh|microservice.http|echo] [--wait-ms N]", description: "Submit a real internal-core dispatch request for CLI debugging." }, @@ -82,6 +84,10 @@ function numberOption(name: string, defaultValue: number): number { return value; } +function boundedNumberOption(name: string, defaultValue: number, maxValue: number): number { + return Math.min(numberOption(name, defaultValue), maxValue); +} + function stringOption(name: string): string | undefined { const index = args.indexOf(name); if (index === -1) return undefined; @@ -174,8 +180,15 @@ async function main(): Promise { emitJson(commandName, await stackStatus(config)); return; } + if (sub === "swap") { + const result = runSwapCommand(args.slice(2)); + const ok = (result as { ok?: unknown }).ok !== false; + emitJson(commandName, result, ok); + if (!ok) process.exitCode = 1; + return; + } if (sub === "logs") { - emitJson(commandName, stackLogs(config, numberOption("--tail-bytes", 3000))); + emitJson(commandName, stackLogs(config, boundedNumberOption("--tail-bytes", 3000, 500_000))); return; } if (sub === "rebuild") { @@ -229,12 +242,12 @@ async function main(): Promise { if (top === "job") { if (sub === "list") { - emitJson(commandName, { jobs: listJobs() }); + emitJson(commandName, listJobsSummary({ limit: boundedNumberOption("--limit", 50, 500), includeCommand: args.includes("--include-command") })); return; } if (sub === "status") { const id = third === "latest" || third === undefined ? latestJobId() : third; - emitJson(commandName, { job: jobWithTail(readJob(id), numberOption("--tail-bytes", 12000)) }); + emitJson(commandName, { job: jobWithTail(readJob(id), boundedNumberOption("--tail-bytes", 12000, 500_000)) }); return; } } diff --git a/scripts/src/command.ts b/scripts/src/command.ts index 6c58c153..5f7a2c1b 100644 --- a/scripts/src/command.ts +++ b/scripts/src/command.ts @@ -1,5 +1,5 @@ import { spawn, spawnSync } from "node:child_process"; -import { createWriteStream, existsSync, readFileSync } from "node:fs"; +import { closeSync, createWriteStream, existsSync, openSync, readSync, statSync } from "node:fs"; export interface CommandResult { command: string[]; @@ -56,6 +56,16 @@ export async function runCommandToFiles(command: string[], cwd: string, stdoutFi export function tailFile(path: string, maxBytes = 8192): string { if (!existsSync(path)) return ""; - const content = readFileSync(path); - return content.subarray(Math.max(0, content.length - maxBytes)).toString("utf8"); + const safeMaxBytes = Math.max(0, Math.floor(maxBytes)); + if (safeMaxBytes === 0) return ""; + const size = statSync(path).size; + const bytesToRead = Math.min(size, safeMaxBytes); + const buffer = Buffer.alloc(bytesToRead); + const fd = openSync(path, "r"); + try { + readSync(fd, buffer, 0, bytesToRead, size - bytesToRead); + } finally { + closeSync(fd); + } + return buffer.toString("utf8"); } diff --git a/scripts/src/docker.ts b/scripts/src/docker.ts index 35ab3c1e..0b7f02a1 100644 --- a/scripts/src/docker.ts +++ b/scripts/src/docker.ts @@ -1,8 +1,9 @@ -import { chmodSync, existsSync, mkdirSync, readFileSync, readdirSync, writeFileSync } from "node:fs"; +import { chmodSync, existsSync, mkdirSync, readFileSync, readdirSync, statSync, writeFileSync } from "node:fs"; import { basename, dirname, join, resolve } from "node:path"; import { commandOk, runCommand, tailFile } from "./command"; import { type UniDeskConfig, repoRoot, rootPath } from "./config"; import { startJob } from "./jobs"; +import { swapStatus } from "./swap"; export interface ComposeRuntimeEnv { envFile: string; @@ -436,6 +437,7 @@ export async function stackStatus(config: UniDeskConfig): Promise { const overview = dockerExecJson("unidesk-backend-core", "fetch('http://127.0.0.1:8080/api/overview').then(r=>r.json()).then(j=>console.log(JSON.stringify({ok:true,status:200,body:j}))).catch(e=>{console.log(JSON.stringify({ok:false,error:String(e)}));process.exit(1)})"); return { runtimeEnv, + swap: swapStatus(), publicPorts: fixedPorts(config), blockedPublicPorts: [ { name: "backend-core-rest", port: config.network.core.port, listening: isPortListening(config.network.core.port), expected: "not-listening" }, @@ -500,11 +502,37 @@ export function stackLogs(config: UniDeskConfig, tailBytes: number): unknown { const allFiles = listLogFiles(logRoot); const currentFiles = allFiles.filter((path) => basename(path).startsWith(runtimeEnv.logPrefix)); const selectedFiles = (currentFiles.length > 0 ? currentFiles : allFiles.slice(-12)).slice(-12); - const files = selectedFiles.map((path) => ({ path, name: basename(path), tail: tailFile(path, tailBytes) })); + const files = selectedFiles.map((path) => { + const sizeBytes = existsSync(path) ? statSync(path).size : 0; + const truncated = sizeBytes > tailBytes; + return { path, name: basename(path), sizeBytes, tailBytes, truncated, tail: tailFile(path, tailBytes) }; + }); const containerNames = ["unidesk-database", "unidesk-backend-core", "unidesk-frontend", "unidesk-provider-gateway-main", "todo-note-backend", "project-manager-backend", "baidu-netdisk-backend", "oa-event-flow-backend"]; const docker = containerNames.map((name) => { const result = runCommand(["docker", "logs", "--tail", "40", name], repoRoot); - return { name, exitCode: result.exitCode, stdoutTail: result.stdout.slice(-tailBytes), stderrTail: result.stderr.slice(-tailBytes) }; + return { + name, + exitCode: result.exitCode, + tailBytes, + stdoutBytes: Buffer.byteLength(result.stdout, "utf8"), + stderrBytes: Buffer.byteLength(result.stderr, "utf8"), + stdoutTruncated: Buffer.byteLength(result.stdout, "utf8") > tailBytes, + stderrTruncated: Buffer.byteLength(result.stderr, "utf8") > tailBytes, + stdoutTail: result.stdout.slice(-tailBytes), + stderrTail: result.stderr.slice(-tailBytes), + }; }); - return { logRoot, runtimeEnv, files, docker }; + return { + logRoot, + runtimeEnv, + policy: { + defaultTailBytes: 3000, + requestedTailBytes: tailBytes, + selectedFileLimit: 12, + dockerTailLines: 40, + disclosure: "server logs returns tails only; increase with --tail-bytes for a larger bounded tail, and inspect listed paths directly for full logs.", + }, + files, + docker, + }; } diff --git a/scripts/src/jobs.ts b/scripts/src/jobs.ts index ed87cc73..101ffcf4 100644 --- a/scripts/src/jobs.ts +++ b/scripts/src/jobs.ts @@ -1,5 +1,5 @@ import { spawn, spawnSync } from "node:child_process"; -import { existsSync, mkdirSync, readFileSync, readdirSync, writeFileSync } from "node:fs"; +import { existsSync, mkdirSync, readFileSync, readdirSync, statSync, writeFileSync } from "node:fs"; import { join } from "node:path"; import { repoRoot, rootPath } from "./config"; import { runCommandToFiles, tailFile } from "./command"; @@ -141,6 +141,70 @@ export async function runJob(id: string): Promise { return job; } -export function jobWithTail(job: JobRecord, maxBytes = 12000): JobRecord & { stdoutTail: string; stderrTail: string } { - return { ...job, stdoutTail: tailFile(job.stdoutFile, maxBytes), stderrTail: tailFile(job.stderrFile, maxBytes) }; +export function jobWithTail(job: JobRecord, maxBytes = 12000): JobRecord & { + tailPolicy: { + requestedTailBytes: number; + stdoutBytes: number; + stderrBytes: number; + stdoutTruncated: boolean; + stderrTruncated: boolean; + fullLogPaths: { stdoutFile: string; stderrFile: string }; + }; + stdoutTail: string; + stderrTail: string; +} { + const stdoutBytes = existsSync(job.stdoutFile) ? statSync(job.stdoutFile).size : 0; + const stderrBytes = existsSync(job.stderrFile) ? statSync(job.stderrFile).size : 0; + return { + ...job, + tailPolicy: { + requestedTailBytes: maxBytes, + stdoutBytes, + stderrBytes, + stdoutTruncated: stdoutBytes > maxBytes, + stderrTruncated: stderrBytes > maxBytes, + fullLogPaths: { stdoutFile: job.stdoutFile, stderrFile: job.stderrFile }, + }, + stdoutTail: tailFile(job.stdoutFile, maxBytes), + stderrTail: tailFile(job.stderrFile, maxBytes), + }; +} + +export interface JobListOptions { + limit?: number; + includeCommand?: boolean; +} + +export function listJobsSummary(options: JobListOptions = {}): unknown { + const limit = Math.max(1, Math.floor(options.limit ?? 50)); + const jobs = listJobs(); + const returned = jobs.slice(0, limit).map((job) => ({ + id: job.id, + name: job.name, + status: job.status, + runner: job.runner, + runnerPid: job.runnerPid ?? null, + runnerContainer: job.runnerContainer ?? null, + createdAt: job.createdAt, + startedAt: job.startedAt, + finishedAt: job.finishedAt, + exitCode: job.exitCode, + note: job.note, + stdoutFile: job.stdoutFile, + stderrFile: job.stderrFile, + ...(options.includeCommand === true ? { command: job.command, cwd: job.cwd } : {}), + })); + return { + jobs: returned, + total: jobs.length, + returned: returned.length, + limit, + truncated: jobs.length > returned.length, + disclosure: { + defaultLimit: 50, + nextCommand: jobs.length > returned.length ? `bun scripts/cli.ts job list --limit ${Math.min(jobs.length, limit * 2)}` : null, + includeCommandCommand: "bun scripts/cli.ts job list --include-command", + statusCommand: "bun scripts/cli.ts job status --tail-bytes 12000", + }, + }; } diff --git a/scripts/src/microservices.ts b/scripts/src/microservices.ts index 6488f190..b70155df 100644 --- a/scripts/src/microservices.ts +++ b/scripts/src/microservices.ts @@ -3,18 +3,51 @@ import { runCommand } from "./command"; import { type UniDeskConfig, repoRoot } from "./config"; import { jsonByteLength, previewJson } from "./preview"; -export function coreInternalFetch(path: string, init?: { method?: string; body?: unknown }): unknown { +export function coreInternalFetch(path: string, init?: { method?: string; body?: unknown; maxResponseBytes?: number }): unknown { if (!path.startsWith("/")) throw new Error("core internal path must start with /"); + const maxResponseBytes = Math.max(1024, Math.floor(init?.maxResponseBytes ?? 5_000_000)); const code = ` const res = await fetch(${JSON.stringify(`http://127.0.0.1:8080${path}`)}, ${JSON.stringify({ method: init?.method ?? "GET", headers: init?.body === undefined ? undefined : { "content-type": "application/json" }, body: init?.body === undefined ? undefined : JSON.stringify(init.body), })}); - const text = await res.text(); + const maxResponseBytes = ${JSON.stringify(maxResponseBytes)}; + const reader = res.body?.getReader(); + const chunks = []; + let bytes = 0; + let responseTruncated = false; + if (reader) { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (bytes + value.byteLength > maxResponseBytes) { + const keep = Math.max(0, maxResponseBytes - bytes); + if (keep > 0) { + chunks.push(value.slice(0, keep)); + bytes += keep; + } + responseTruncated = true; + try { await reader.cancel(); } catch {} + break; + } + chunks.push(value); + bytes += value.byteLength; + } + } + const buffer = new Uint8Array(bytes); + let offset = 0; + for (const chunk of chunks) { + buffer.set(chunk, offset); + offset += chunk.byteLength; + } + const text = new TextDecoder().decode(buffer); let body = null; - try { body = text ? JSON.parse(text) : null; } catch { body = { text }; } - console.log(JSON.stringify({ ok: res.ok, status: res.status, body })); + try { body = text && !responseTruncated ? JSON.parse(text) : null; } catch { body = { text }; } + if (responseTruncated) { + body = { _unideskResponseTruncated: true, maxResponseBytes, bytesRead: bytes, contentLength: res.headers.get("content-length"), textPreview: text }; + } + console.log(JSON.stringify({ ok: res.ok, status: res.status, responseTruncated, responseBytesRead: bytes, responseContentLength: res.headers.get("content-length"), body })); `; const result = runCommand(["docker", "exec", "unidesk-backend-core", "bun", "-e", code], repoRoot); if (result.exitCode !== 0) { @@ -51,6 +84,11 @@ function numberOption(args: string[], name: string, defaultValue: number): numbe return value; } +function cappedNumberOption(args: string[], name: string, defaultValue: number, maxValue: number): number { + const value = numberOption(args, name, defaultValue); + return Math.min(value, maxValue); +} + function stringOption(args: string[], name: string): string | undefined { const index = args.indexOf(name); if (index === -1) return undefined; @@ -95,13 +133,34 @@ function methodOption(args: string[], hasBody = false): string { } export function summarizeMicroserviceProxyResponse(response: unknown, args: string[]): unknown { - if (args.includes("--raw")) return response; - const maxBodyBytes = numberOption(args, "--max-body-bytes", 60_000); + const full = args.includes("--full"); + const raw = args.includes("--raw"); + const maxBodyBytes = full ? numberOption(args, "--max-body-bytes", 5_000_000) : cappedNumberOption(args, "--max-body-bytes", raw ? 120_000 : 60_000, 500_000); if (typeof response !== "object" || response === null || Array.isArray(response)) return response; const record = response as Record; if (!("body" in record)) return response; + if (record.responseTruncated === true) { + return { + ...record, + bodyOmitted: true, + bodyMaxBytes: maxBodyBytes, + rawHint: "The upstream response exceeded the CLI collection cap before JSON parsing; re-run with --raw --full and a specific --max-body-bytes only when the full body is required.", + }; + } const bodyBytes = jsonByteLength(record.body); - if (bodyBytes <= maxBodyBytes) return response; + if (bodyBytes <= maxBodyBytes) { + if (!raw || full) return response; + return { + ...record, + outputPolicy: { + rawRequested: true, + bounded: true, + maxBodyBytes, + bodyBytes, + fullCommand: "Re-run with --raw --full to allow the complete body.", + }, + }; + } const rest = { ...record }; delete rest.body; return { @@ -110,7 +169,9 @@ export function summarizeMicroserviceProxyResponse(response: unknown, args: stri bodyBytes, bodyMaxBytes: maxBodyBytes, bodyPreview: previewJson(record.body, { maxDepth: 3, maxArrayItems: 3, maxObjectKeys: 16, maxStringLength: 320 }), - rawHint: "Re-run with --raw for the full body, or add/tighten __unideskArrayLimit=: in the proxied path.", + rawHint: raw && !full + ? "The --raw response exceeded the default hard limit; re-run with --raw --full for the complete body, or add/tighten __unideskArrayLimit=: in the proxied path." + : "Re-run with --raw --full for the complete body, or add/tighten __unideskArrayLimit=: in the proxied path.", }; } @@ -137,7 +198,11 @@ export async function runMicroserviceCommand(_config: UniDeskConfig, args: strin const id = requireId(idArg, "microservice proxy"); const path = requireProxyPath(pathArg); const body = requestBodyOption(args); - return summarizeMicroserviceProxyResponse(coreInternalFetch(`/api/microservices/${encodeId(id)}/proxy${path}`, { method: methodOption(args, body !== undefined), body }), args); + const full = hasFlag(args, "--full"); + const raw = hasFlag(args, "--raw"); + const maxBodyBytes = full ? numberOption(args, "--max-body-bytes", 5_000_000) : cappedNumberOption(args, "--max-body-bytes", raw ? 120_000 : 60_000, 500_000); + const maxResponseBytes = full ? Math.min(Math.max(maxBodyBytes, 120_000), 5_000_000) : Math.min(Math.max(maxBodyBytes * 3, 240_000), 1_500_000); + return summarizeMicroserviceProxyResponse(coreInternalFetch(`/api/microservices/${encodeId(id)}/proxy${path}`, { method: methodOption(args, body !== undefined), body, maxResponseBytes }), args); } throw new Error("microservice command must be one of: list, status, health, diagnostics, tunnel-self-test, proxy"); } diff --git a/scripts/src/output.ts b/scripts/src/output.ts index addfde5c..f3427535 100644 --- a/scripts/src/output.ts +++ b/scripts/src/output.ts @@ -5,6 +5,20 @@ export interface JsonEnvelope { error?: unknown; } +function isEpipe(error: unknown): boolean { + return typeof error === "object" && error !== null && "code" in error && (error as { code?: unknown }).code === "EPIPE"; +} + +process.stdout.on("error", (error) => { + if (isEpipe(error)) process.exit(0); + throw error; +}); + +process.stderr.on("error", (error) => { + if (isEpipe(error)) process.exit(0); + throw error; +}); + export function emitJson(command: string, data: T, ok = true): void { const envelope: JsonEnvelope = { ok, command, data }; safeStdoutWrite(`${JSON.stringify(envelope, null, 2)}\n`); @@ -22,10 +36,7 @@ function safeStdoutWrite(text: string): void { try { process.stdout.write(text); } catch (error) { - if (typeof error === "object" && error !== null && "code" in error && (error as { code?: unknown }).code === "EPIPE") { - process.exitCode = 0; - return; - } + if (isEpipe(error)) process.exit(0); throw error; } } diff --git a/scripts/src/remote.ts b/scripts/src/remote.ts index d09b52a1..bfccb177 100644 --- a/scripts/src/remote.ts +++ b/scripts/src/remote.ts @@ -27,6 +27,9 @@ interface FetchJsonResult { status?: number; body?: unknown; error?: string; + responseTruncated?: boolean; + responseBytesRead?: number; + responseContentLength?: string | null; } const hostOptions = new Set(["--main-server-ip", "--main-server", "--server"]); @@ -172,19 +175,54 @@ function frontendBaseUrl(host: string, config: UniDeskConfig): string { return `http://${host}:${config.network.frontend.port}`; } -async function readJson(url: string, init?: RequestInit, timeoutMs = 8000): Promise { +async function readJson(url: string, init?: RequestInit, timeoutMs = 8000, maxResponseBytes = 5_000_000): Promise { const controller = new AbortController(); const timer = setTimeout(() => controller.abort(), timeoutMs); try { const res = await fetch(url, { ...init, signal: controller.signal }); - const text = await res.text(); + const reader = res.body?.getReader(); + const chunks: Uint8Array[] = []; + let bytes = 0; + let responseTruncated = false; + if (reader !== undefined) { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (bytes + value.byteLength > maxResponseBytes) { + const keep = Math.max(0, maxResponseBytes - bytes); + if (keep > 0) { + chunks.push(value.slice(0, keep)); + bytes += keep; + } + responseTruncated = true; + try { + await reader.cancel(); + } catch { + // Ignore cancel failures after the bounded preview has been collected. + } + break; + } + chunks.push(value); + bytes += value.byteLength; + } + } + const buffer = new Uint8Array(bytes); + let offset = 0; + for (const chunk of chunks) { + buffer.set(chunk, offset); + offset += chunk.byteLength; + } + const text = new TextDecoder().decode(buffer); let body: unknown = null; try { - body = text.length > 0 ? JSON.parse(text) : null; + body = text.length > 0 && !responseTruncated ? JSON.parse(text) : null; } catch { body = { text }; } - return { ok: res.ok, status: res.status, body }; + if (responseTruncated) { + body = { _unideskResponseTruncated: true, maxResponseBytes, bytesRead: bytes, contentLength: res.headers.get("content-length"), textPreview: text }; + } + return { ok: res.ok, status: res.status, body, responseTruncated, responseBytesRead: bytes, responseContentLength: res.headers.get("content-length") }; } catch (error) { return { ok: false, error: error instanceof Error ? error.message : String(error) }; } finally { @@ -208,11 +246,11 @@ async function loginFrontend(host: string, config: UniDeskConfig): Promise { +async function frontendJson(session: FrontendSession, path: string, init?: RequestInit, timeoutMs = 8000, maxResponseBytes = 5_000_000): Promise { const headers = new Headers(init?.headers); headers.set("cookie", session.cookie); if (init?.body !== undefined && !headers.has("content-type")) headers.set("content-type", "application/json"); - return readJson(`${session.baseUrl}${path}`, { ...init, headers }, timeoutMs); + return readJson(`${session.baseUrl}${path}`, { ...init, headers }, timeoutMs, maxResponseBytes); } function stringOption(args: string[], name: string): string | undefined { @@ -231,6 +269,10 @@ function numberOption(args: string[], name: string, defaultValue: number): numbe return value; } +function cappedNumberOption(args: string[], name: string, defaultValue: number, maxValue: number): number { + return Math.min(numberOption(args, name, defaultValue), maxValue); +} + function jsonOption(args: string[], name: string): Record | undefined { const raw = stringOption(args, name); if (raw === undefined) return undefined; @@ -462,7 +504,11 @@ async function remoteMicroservice(session: FrontendSession, args: string[]): Pro }; } if (action === "proxy" && id !== undefined && path !== undefined && path.startsWith("/")) { - const response = await frontendJson(session, `/api/microservices/${encodeURIComponent(id)}/proxy${path}`, undefined, 24_000); + const full = args.includes("--full"); + const raw = args.includes("--raw"); + const maxBodyBytes = full ? numberOption(args, "--max-body-bytes", 5_000_000) : cappedNumberOption(args, "--max-body-bytes", raw ? 120_000 : 60_000, 500_000); + const maxResponseBytes = full ? Math.min(Math.max(maxBodyBytes, 120_000), 5_000_000) : Math.min(Math.max(maxBodyBytes * 3, 240_000), 1_500_000); + const response = await frontendJson(session, `/api/microservices/${encodeURIComponent(id)}/proxy${path}`, undefined, 24_000, maxResponseBytes); return { transport: "frontend", response: summarizeMicroserviceProxyResponse(response, args), diff --git a/scripts/src/swap.ts b/scripts/src/swap.ts new file mode 100644 index 00000000..390c86db --- /dev/null +++ b/scripts/src/swap.ts @@ -0,0 +1,303 @@ +import { accessSync, constants, existsSync, readFileSync, statSync } from "node:fs"; +import { runCommand } from "./command"; +import { repoRoot } from "./config"; + +const defaultSwapPath = "/swapfile"; +const defaultSwapSizeBytes = 2 * 1024 * 1024 * 1024; + +export interface SwapArea { + filename: string; + type: string; + sizeBytes: number; + usedBytes: number; + priority: number | null; +} + +export interface SwapMemoryStatus { + totalBytes: number; + availableBytes: number | null; + swapTotalBytes: number; + swapFreeBytes: number; +} + +export interface SwapStatus { + memory: SwapMemoryStatus; + activeSwaps: SwapArea[]; + configuredPath: string; + configuredPathExists: boolean; + configuredPathMode: string | null; + configuredPathSizeBytes: number | null; + configuredPathActive: boolean; + fstab: { + path: string; + writable: boolean; + persisted: boolean; + matchingLine: string | null; + error: string | null; + }; + warning: string | null; +} + +export interface SwapEnsureResult { + ok: boolean; + status: "ok" | "degraded" | "failed"; + requested: { + path: string; + sizeBytes: number; + }; + before: SwapStatus; + after: SwapStatus; + actions: Array<{ action: string; ok: boolean; detail?: unknown }>; + errors: Array<{ action: string; message: string; detail?: unknown }>; +} + +function shellQuote(value: string): string { + return `'${value.replace(/'/g, `'\\''`)}'`; +} + +function parseByteCount(value: string): number { + const raw = value.trim(); + if (/^\d+$/u.test(raw)) return Number(raw); + const match = raw.match(/^([0-9]+(?:\.[0-9]+)?)([KMGTPE]?i?B?)$/iu); + if (!match) return 0; + const amount = Number(match[1]); + const unit = match[2].toUpperCase(); + const powers: Record = { + K: 1, + KB: 1, + KIB: 1, + M: 2, + MB: 2, + MIB: 2, + G: 3, + GB: 3, + GIB: 3, + T: 4, + TB: 4, + TIB: 4, + P: 5, + PB: 5, + PIB: 5, + E: 6, + EB: 6, + EIB: 6, + }; + return Math.round(amount * (1024 ** (powers[unit] ?? 0))); +} + +function parseMeminfo(): SwapMemoryStatus { + const raw = readFileSync("/proc/meminfo", "utf8"); + const values = new Map(); + for (const line of raw.split("\n")) { + const match = line.match(/^([^:]+):\s+(\d+)\s+kB/u); + if (match) values.set(match[1], Number(match[2]) * 1024); + } + return { + totalBytes: values.get("MemTotal") ?? 0, + availableBytes: values.get("MemAvailable") ?? null, + swapTotalBytes: values.get("SwapTotal") ?? 0, + swapFreeBytes: values.get("SwapFree") ?? 0, + }; +} + +function parseSwaps(): SwapArea[] { + if (!existsSync("/proc/swaps")) return []; + const lines = readFileSync("/proc/swaps", "utf8").trim().split("\n").slice(1); + return lines.map((line) => line.trim().split(/\s+/u)).filter((parts) => parts.length >= 5).map(([filename, type, sizeKiB, usedKiB, priority]) => ({ + filename, + type, + sizeBytes: Number(sizeKiB) * 1024, + usedBytes: Number(usedKiB) * 1024, + priority: Number.isFinite(Number(priority)) ? Number(priority) : null, + })); +} + +function fileMode(path: string): string | null { + if (!existsSync(path)) return null; + return (statSync(path).mode & 0o777).toString(8).padStart(3, "0"); +} + +function fstabStatus(path: string): SwapStatus["fstab"] { + const fstabPath = "/etc/fstab"; + try { + const raw = existsSync(fstabPath) ? readFileSync(fstabPath, "utf8") : ""; + let writable = false; + try { + accessSync(fstabPath, constants.W_OK); + writable = true; + } catch { + writable = false; + } + const matchingLine = raw.split("\n").find((line) => { + const trimmed = line.trim(); + if (trimmed.length === 0 || trimmed.startsWith("#")) return false; + const parts = trimmed.split(/\s+/u); + return parts[0] === path && parts[2] === "swap"; + }) ?? null; + return { + path: fstabPath, + writable, + persisted: matchingLine !== null, + matchingLine, + error: null, + }; + } catch (error) { + return { + path: fstabPath, + writable: false, + persisted: false, + matchingLine: null, + error: error instanceof Error ? error.message : String(error), + }; + } +} + +export function swapStatus(path = defaultSwapPath): SwapStatus { + const memory = parseMeminfo(); + const activeSwaps = parseSwaps(); + const configuredPathExists = existsSync(path); + const configuredPathSizeBytes = configuredPathExists ? statSync(path).size : null; + const configuredPathActive = activeSwaps.some((swap) => swap.filename === path); + const warning = memory.swapTotalBytes > 0 ? null : "swap is not active; low-memory main servers are at risk of global OOM during builds or diagnostics"; + return { + memory, + activeSwaps, + configuredPath: path, + configuredPathExists, + configuredPathMode: fileMode(path), + configuredPathSizeBytes, + configuredPathActive, + fstab: fstabStatus(path), + warning, + }; +} + +function pushAction( + actions: SwapEnsureResult["actions"], + errors: SwapEnsureResult["errors"], + action: string, + command: string[], +): boolean { + const result = runCommand(command, repoRoot, { timeoutMs: 120_000 }); + const ok = result.exitCode === 0; + const detail = { + command, + exitCode: result.exitCode, + stdoutTail: result.stdout.slice(-1200), + stderrTail: result.stderr.slice(-1200), + timedOut: result.timedOut, + }; + actions.push({ action, ok, detail }); + if (!ok) { + errors.push({ + action, + message: result.stderr.trim() || result.stdout.trim() || `command failed with exit code ${result.exitCode}`, + detail, + }); + } + return ok; +} + +function ensureFstabLine(path: string): { ok: boolean; action: string; detail: unknown } { + const line = `${path} none swap sw 0 0`; + const script = [ + "set -euo pipefail", + "touch /etc/fstab", + `grep -Eq '^${path.replace(/[.*+?^${}()|[\]\\]/g, "\\$&")}[[:space:]]+[^[:space:]]+[[:space:]]+swap[[:space:]]' /etc/fstab || printf '%s\\n' ${shellQuote(line)} >> /etc/fstab`, + ].join("\n"); + const result = runCommand(["bash", "-lc", script], repoRoot, { timeoutMs: 30_000 }); + return { + ok: result.exitCode === 0, + action: "persist-fstab", + detail: { + command: ["bash", "-lc", script], + exitCode: result.exitCode, + stdoutTail: result.stdout.slice(-1200), + stderrTail: result.stderr.slice(-1200), + timedOut: result.timedOut, + }, + }; +} + +function parseSizeOption(args: string[], defaultBytes: number): number { + const index = args.indexOf("--size"); + const raw = index === -1 ? undefined : args[index + 1]; + if (raw === undefined) return defaultBytes; + const bytes = parseByteCount(raw); + if (!Number.isFinite(bytes) || bytes <= 0) throw new Error("--size must be a positive byte count such as 2GiB or 4096M"); + return bytes; +} + +function parsePathOption(args: string[], defaultPath: string): string { + const index = args.indexOf("--path"); + if (index === -1) return defaultPath; + const raw = args[index + 1]; + if (raw === undefined || !raw.startsWith("/")) throw new Error("--path must be an absolute path"); + return raw; +} + +function hasFlag(args: string[], name: string): boolean { + return args.includes(name); +} + +export function runSwapCommand(args: string[]): unknown { + const [action = "status"] = args; + const path = parsePathOption(args, defaultSwapPath); + if (action === "status") return swapStatus(path); + if (action === "ensure") { + const sizeBytes = parseSizeOption(args, defaultSwapSizeBytes); + const dryRun = hasFlag(args, "--dry-run"); + const before = swapStatus(path); + const actions: SwapEnsureResult["actions"] = []; + const errors: SwapEnsureResult["errors"] = []; + if (before.memory.swapTotalBytes > 0) { + actions.push({ action: "noop-existing-swap", ok: true, detail: { activeSwaps: before.activeSwaps } }); + const after = swapStatus(path); + return { ok: true, status: "ok", requested: { path, sizeBytes }, before, after, actions, errors } satisfies SwapEnsureResult; + } + if (dryRun) { + actions.push({ action: "dry-run", ok: true, detail: { wouldCreate: path, sizeBytes, wouldPersistFstab: true } }); + const after = swapStatus(path); + return { ok: true, status: "degraded", requested: { path, sizeBytes }, before, after, actions, errors } satisfies SwapEnsureResult; + } + if (!existsSync(path)) { + const sizeMiB = Math.ceil(sizeBytes / 1024 / 1024); + const allocated = pushAction(actions, errors, "allocate-swapfile", ["fallocate", "-l", `${sizeMiB}M`, path]); + if (!allocated) pushAction(actions, errors, "allocate-swapfile-dd-fallback", ["dd", "if=/dev/zero", `of=${path}`, "bs=1M", `count=${sizeMiB}`, "status=none"]); + } else { + const existingBytes = statSync(path).size; + if (existingBytes < sizeBytes) { + const sizeMiB = Math.ceil(sizeBytes / 1024 / 1024); + const resized = pushAction(actions, errors, "resize-existing-swapfile", ["fallocate", "-l", `${sizeMiB}M`, path]); + if (!resized) pushAction(actions, errors, "resize-existing-swapfile-dd-fallback", ["dd", "if=/dev/zero", `of=${path}`, "bs=1M", `count=${sizeMiB}`, "status=none"]); + } else { + actions.push({ action: "reuse-existing-swapfile-path", ok: true, detail: { path, sizeBytes: existingBytes } }); + } + } + pushAction(actions, errors, "chmod-600", ["chmod", "600", path]); + pushAction(actions, errors, "mkswap", ["mkswap", path]); + pushAction(actions, errors, "swapon", ["swapon", path]); + const persist = ensureFstabLine(path); + actions.push({ action: persist.action, ok: persist.ok, detail: persist.detail }); + if (!persist.ok) { + errors.push({ + action: persist.action, + message: "swap is active but /etc/fstab could not be updated; rerun ensure as root or add the returned fstab line manually", + detail: persist.detail, + }); + } + const after = swapStatus(path); + const swapActive = after.memory.swapTotalBytes > 0; + const status = swapActive && after.fstab.persisted ? "ok" : swapActive ? "degraded" : "failed"; + return { + ok: status !== "failed", + status, + requested: { path, sizeBytes }, + before, + after, + actions, + errors, + } satisfies SwapEnsureResult; + } + throw new Error("server swap command must be one of: status, ensure"); +} diff --git a/src/components/backend-core/src/egress-tcp.ts b/src/components/backend-core/src/egress-tcp.ts index dedb45fd..49269144 100644 --- a/src/components/backend-core/src/egress-tcp.ts +++ b/src/components/backend-core/src/egress-tcp.ts @@ -23,6 +23,9 @@ function isValidEgressPort(port: number): boolean { return Number.isInteger(port) && port > 0 && port <= 65_535; } +const egressTcpConnectTimeoutMs = 15_000; +const egressTcpIdleTimeoutMs = 600_000; + function sendEgressClose(provider: ProviderSocket, connectionId: string, error?: string): void { const message: CoreEgressTcpCloseMessage = error === undefined ? { type: "egress_tcp_close", connectionId } @@ -30,13 +33,31 @@ function sendEgressClose(provider: ProviderSocket, connectionId: string, error?: wsSendJson(provider, message); } +function clearConnectionTimers(connection: EgressTcpConnection): void { + if (connection.connectTimer !== null) clearTimeout(connection.connectTimer); + if (connection.idleTimer !== null) clearTimeout(connection.idleTimer); + connection.connectTimer = null; + connection.idleTimer = null; +} + function closeEgressTcpConnection(providerId: string, connectionId: string, error?: string): void { const key = egressTcpKey(providerId, connectionId); const connection = ctx.activeEgressTcpConnections.get(key); if (connection === undefined) return; ctx.activeEgressTcpConnections.delete(key); + clearConnectionTimers(connection); connection.socket.destroy(); - if (error !== undefined) sendEgressClose(connection.provider, connectionId, error); + if (error !== undefined) { + logger("warn", "egress_tcp_connection_closed", { providerId, connectionId, error }); + sendEgressClose(connection.provider, connectionId, error); + } +} + +function refreshConnectionIdle(connection: EgressTcpConnection): void { + if (connection.idleTimer !== null) clearTimeout(connection.idleTimer); + connection.idleTimer = setTimeout(() => { + closeEgressTcpConnection(connection.providerId, connection.connectionId, "egress tcp idle timeout"); + }, egressTcpIdleTimeoutMs); } export function handleEgressTcpOpen(ws: ProviderSocket, message: ProviderEgressTcpOpenMessage): void { @@ -49,13 +70,32 @@ export function handleEgressTcpOpen(ws: ProviderSocket, message: ProviderEgressT const key = egressTcpKey(message.providerId, message.connectionId); closeEgressTcpConnection(message.providerId, message.connectionId); const socket = connectTcp({ host, port }); - const connection: EgressTcpConnection = { providerId: message.providerId, connectionId: message.connectionId, socket, provider: ws }; + const connection: EgressTcpConnection = { + providerId: message.providerId, + connectionId: message.connectionId, + socket, + provider: ws, + connectTimer: null, + idleTimer: null, + }; ctx.activeEgressTcpConnections.set(key, connection); + connection.connectTimer = setTimeout(() => { + closeEgressTcpConnection(message.providerId, message.connectionId, "egress tcp connect timeout"); + }, egressTcpConnectTimeoutMs); + refreshConnectionIdle(connection); socket.on("connect", () => { + if (ctx.activeEgressTcpConnections.get(key) !== connection) return; + if (connection.connectTimer !== null) { + clearTimeout(connection.connectTimer); + connection.connectTimer = null; + } + refreshConnectionIdle(connection); const opened: CoreEgressTcpOpenedMessage = { type: "egress_tcp_opened", connectionId: message.connectionId }; wsSendJson(ws, opened); }); socket.on("data", (chunk) => { + if (ctx.activeEgressTcpConnections.get(key) !== connection) return; + refreshConnectionIdle(connection); const data: CoreEgressTcpDataMessage = { type: "egress_tcp_data", connectionId: message.connectionId, @@ -67,11 +107,13 @@ export function handleEgressTcpOpen(ws: ProviderSocket, message: ProviderEgressT socket.on("close", () => { if (ctx.activeEgressTcpConnections.get(key) !== connection) return; ctx.activeEgressTcpConnections.delete(key); + clearConnectionTimers(connection); sendEgressClose(ws, message.connectionId); }); socket.on("error", (error) => { if (ctx.activeEgressTcpConnections.get(key) !== connection) return; ctx.activeEgressTcpConnections.delete(key); + clearConnectionTimers(connection); sendEgressClose(ws, message.connectionId, error.message); }); } @@ -79,6 +121,7 @@ export function handleEgressTcpOpen(ws: ProviderSocket, message: ProviderEgressT export function handleEgressTcpData(message: ProviderEgressTcpDataMessage): void { const connection = ctx.activeEgressTcpConnections.get(egressTcpKey(message.providerId, message.connectionId)); if (connection === undefined) return; + refreshConnectionIdle(connection); connection.socket.write(Buffer.from(message.data, message.encoding === "base64" ? "base64" : "utf8")); } @@ -90,6 +133,16 @@ export function closeEgressTcpConnectionsForProvider(providerId: string): void { for (const [key, connection] of ctx.activeEgressTcpConnections) { if (connection.providerId !== providerId) continue; ctx.activeEgressTcpConnections.delete(key); + clearConnectionTimers(connection); + connection.socket.destroy(); + } +} + +export function closeEgressTcpConnectionsForSocket(provider: ProviderSocket): void { + for (const [key, connection] of ctx.activeEgressTcpConnections) { + if (connection.provider !== provider) continue; + ctx.activeEgressTcpConnections.delete(key); + clearConnectionTimers(connection); connection.socket.destroy(); } } diff --git a/src/components/backend-core/src/microservice-proxy.ts b/src/components/backend-core/src/microservice-proxy.ts index b3ce67b0..9312eab2 100644 --- a/src/components/backend-core/src/microservice-proxy.ts +++ b/src/components/backend-core/src/microservice-proxy.ts @@ -381,6 +381,7 @@ function isK3sctlManagedMicroservice(service: MicroserviceConfig): boolean { function codeQueueK3sServiceIdForRequest(method: string, targetPath: string): string { const normalizedMethod = method.toUpperCase(); if (targetPath === "/" || targetPath === "/health" || targetPath === "/live" || targetPath === "/api/dev-ready") return "code-queue-scheduler"; + if (targetPath === "/api/queues" || targetPath === "/api/tasks/overview") return "code-queue-scheduler"; if (targetPath === "/api/oa/backfill" || targetPath === "/api/notifications/claudeqq/drain" || targetPath === "/api/notifications/claudeqq/backfill") return "code-queue-scheduler"; if (targetPath === "/api/judge/probe" || targetPath === "/api/judge/self-test" || targetPath === "/api/queue-order/self-test" || targetPath === "/api/reference-injection/self-test" || targetPath === "/api/trace-port/self-test") return "code-queue-scheduler"; if (/^\/api\/tasks\/[^/]+\/(?:steer|interrupt)$/u.test(targetPath)) return "code-queue-scheduler"; @@ -778,7 +779,8 @@ async function microserviceTunnelSelfTestResponse(service: MicroserviceConfig): const bodyRecord = isPlainRecord(body) ? body : {}; const hasRequestId = typeof bodyRecord.requestId === "string" && bodyRecord.requestId.length > 0; const hasStage = typeof bodyRecord.stage === "string" && bodyRecord.stage.length > 0; - const ok = response.status === 502 && hasRequestId && hasStage && headers.requestId === bodyRecord.requestId; + const expectedStatus = response.status === 502 || response.status === 504; + const ok = expectedStatus && hasRequestId && hasStage && headers.requestId === bodyRecord.requestId; return jsonResponse({ ok, serviceId: service.id, @@ -787,7 +789,7 @@ async function microserviceTunnelSelfTestResponse(service: MicroserviceConfig): expectedFailure: true, status: response.status, checks: { - expectedStatus: response.status === 502, + expectedStatus, bodyHasRequestId: hasRequestId, bodyHasStage: hasStage, headerHasRequestId: typeof headers.requestId === "string" && headers.requestId.length > 0, @@ -956,7 +958,7 @@ async function providerHttpTunnelMicroserviceResponse( const durationMs = Date.now() - startedAt; if (message === null) { attempts.push({ attempt, requestId, ok: false, reason: reason ?? "timeout", durationMs, timeoutMs }); - if (retryable && tunnelFailureRetryable(reason) && attempt < maxAttempts) { + if (retryable && reason !== "timeout" && tunnelFailureRetryable(reason) && attempt < maxAttempts) { logger("warn", "http_tunnel_retry", { providerId: service.providerId, serviceId: service.id, diff --git a/src/components/backend-core/src/types.ts b/src/components/backend-core/src/types.ts index a65cdbc0..48041f74 100644 --- a/src/components/backend-core/src/types.ts +++ b/src/components/backend-core/src/types.ts @@ -166,6 +166,8 @@ export interface EgressTcpConnection { connectionId: string; socket: Socket; provider: ProviderSocket; + connectTimer: ReturnType | null; + idleTimer: ReturnType | null; } export type HttpTunnelFailureReason = diff --git a/src/components/microservices/k3sctl-adapter/k3s/ci/unidesk-ci.pipeline.yaml b/src/components/microservices/k3sctl-adapter/k3s/ci/unidesk-ci.pipeline.yaml index d247e69c..7f37047d 100644 --- a/src/components/microservices/k3sctl-adapter/k3s/ci/unidesk-ci.pipeline.yaml +++ b/src/components/microservices/k3sctl-adapter/k3s/ci/unidesk-ci.pipeline.yaml @@ -270,6 +270,37 @@ spec: shift curl -fsS --cacert "$kube_ca" -H "Authorization: Bearer $kube_token" -X "$method" "$@" } + delete_if_exists() { + local path="$1" + local code + code="$(curl -sS -o /tmp/unidesk-ci-delete-response -w "%{http_code}" --cacert "$kube_ca" -H "Authorization: Bearer $kube_token" -X DELETE "$kube_api/$path")" + if [ "$code" = "200" ] || [ "$code" = "202" ] || [ "$code" = "404" ]; then + return 0 + fi + cat /tmp/unidesk-ci-delete-response >&2 + return 1 + } + wait_deleted() { + local path="$1" + local deadline=$((SECONDS + 120)) + local code + while [ "$SECONDS" -lt "$deadline" ]; do + code="$(curl -sS -o /tmp/unidesk-ci-get-response -w "%{http_code}" --cacert "$kube_ca" -H "Authorization: Bearer $kube_token" "$kube_api/$path")" + if [ "$code" = "404" ]; then + return 0 + fi + if [ "$code" != "200" ]; then + cat /tmp/unidesk-ci-get-response >&2 + return 1 + fi + sleep 2 + done + echo "timeout waiting for $path deletion" >&2 + return 1 + } + delete_if_exists "apis/apps/v1/namespaces/$kube_namespace/deployments/code-queue-ci-read" + delete_if_exists "api/v1/namespaces/$kube_namespace/services/code-queue-ci-read" + wait_deleted "apis/apps/v1/namespaces/$kube_namespace/deployments/code-queue-ci-read" cat >/tmp/code-queue-ci-read-deployment.yaml </dev/null - deadline=$((SECONDS + 180)) + deadline=$((SECONDS + 420)) while [ "$SECONDS" -lt "$deadline" ]; do status="$(kube GET "$kube_api/apis/apps/v1/namespaces/$kube_namespace/deployments/code-queue-ci-read")" replicas="$(printf '%s' "$status" | jq -r '.spec.replicas // 1')" diff --git a/src/components/provider-gateway/package.json b/src/components/provider-gateway/package.json index a21be47c..41c1b161 100644 --- a/src/components/provider-gateway/package.json +++ b/src/components/provider-gateway/package.json @@ -1,6 +1,6 @@ { "name": "@unidesk/provider-gateway", - "version": "0.2.21", + "version": "0.2.22", "private": true, "type": "module", "scripts": { diff --git a/src/components/provider-gateway/src/egress-proxy.ts b/src/components/provider-gateway/src/egress-proxy.ts index 278dd54e..848ff07d 100644 --- a/src/components/provider-gateway/src/egress-proxy.ts +++ b/src/components/provider-gateway/src/egress-proxy.ts @@ -29,7 +29,12 @@ interface Tunnel { method: string; opened: boolean; pending: Buffer[]; + pendingBytes: number; closed: boolean; + createdAt: number; + lastActivityAt: number; + openTimer: ReturnType | null; + idleTimer: ReturnType | null; } export interface ProviderEgressProxyHandle { @@ -111,6 +116,10 @@ function proxyUrlFor(host: string, port: number): string { return `http://${host}:${port}`; } +const tunnelOpenTimeoutMs = 15_000; +const tunnelIdleTimeoutMs = 600_000; +const maxPendingBytes = 4 * 1024 * 1024; + export function startProviderEgressProxy(options: ProviderEgressProxyOptions): ProviderEgressProxyHandle { const proxyUrl = proxyUrlFor(options.listenHost, options.listenPort); const tunnels = new Map(); @@ -118,25 +127,73 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P const send = (message: EgressToCoreMessage): boolean => options.sendToCore(message); - const status = (): Record => ({ - enabled: true, - providerId: options.providerId, - connected: options.isCoreConnected(), - proxyUrl, - listenHost: options.listenHost, - listenPort: options.listenPort, - activeTunnels: tunnels.size, - channel: "provider-gateway", - }); + const status = (): Record => { + const now = Date.now(); + const tunnelList = Array.from(tunnels.values()); + const ages = tunnelList.map((tunnel) => now - tunnel.createdAt); + return { + enabled: true, + providerId: options.providerId, + connected: options.isCoreConnected(), + proxyUrl, + listenHost: options.listenHost, + listenPort: options.listenPort, + activeTunnels: tunnels.size, + pendingTunnels: tunnelList.filter((tunnel) => !tunnel.opened).length, + oldestTunnelAgeMs: ages.length > 0 ? Math.max(...ages) : 0, + openTimeoutMs: tunnelOpenTimeoutMs, + idleTimeoutMs: tunnelIdleTimeoutMs, + maxPendingBytes, + channel: "provider-gateway", + }; + }; + + const clearTunnelTimers = (tunnel: Tunnel): void => { + if (tunnel.openTimer !== null) clearTimeout(tunnel.openTimer); + if (tunnel.idleTimer !== null) clearTimeout(tunnel.idleTimer); + tunnel.openTimer = null; + tunnel.idleTimer = null; + }; + + const destroyTunnel = (tunnel: Tunnel, notifyCore: boolean, error?: string): void => { + tunnels.delete(tunnel.id); + tunnel.closed = true; + clearTunnelTimers(tunnel); + tunnel.pending.splice(0); + tunnel.pendingBytes = 0; + if (!tunnel.client.destroyed) tunnel.client.destroy(); + if (notifyCore) send({ type: "egress_tcp_close", providerId: options.providerId, connectionId: tunnel.id, at: nowIso() }); + if (error !== undefined) { + options.logger("warn", "egress_proxy_tunnel_closed", { + connectionId: tunnel.id, + opened: tunnel.opened, + ageMs: Date.now() - tunnel.createdAt, + error, + }); + } + }; const closeTunnel = (id: string, error?: string): void => { const tunnel = tunnels.get(id); if (tunnel === undefined) return; - tunnels.delete(id); - tunnel.closed = true; - if (!tunnel.client.destroyed) tunnel.client.destroy(); - send({ type: "egress_tcp_close", providerId: options.providerId, connectionId: id, at: nowIso() }); - if (error !== undefined) options.logger("warn", "egress_proxy_tunnel_closed", { connectionId: id, error }); + destroyTunnel(tunnel, true, error); + }; + + const refreshTunnelIdle = (tunnel: Tunnel): void => { + if (tunnel.closed) return; + tunnel.lastActivityAt = Date.now(); + if (tunnel.idleTimer !== null) clearTimeout(tunnel.idleTimer); + tunnel.idleTimer = setTimeout(() => closeTunnel(tunnel.id, "egress proxy idle timeout"), tunnelIdleTimeoutMs); + }; + + const queuePendingChunk = (tunnel: Tunnel, chunk: Buffer): boolean => { + if (tunnel.closed) return false; + tunnel.pending.push(chunk); + tunnel.pendingBytes += chunk.byteLength; + refreshTunnelIdle(tunnel); + if (tunnel.pendingBytes <= maxPendingBytes) return true; + closeTunnel(tunnel.id, "egress proxy pending buffer exceeded"); + return false; }; const handleCoreMessage = (message: EgressFromCoreMessage): boolean => { @@ -144,24 +201,29 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P if (message.type === "egress_tcp_opened") { if (tunnel === undefined || tunnel.closed) return true; tunnel.opened = true; + if (tunnel.openTimer !== null) { + clearTimeout(tunnel.openTimer); + tunnel.openTimer = null; + } + refreshTunnelIdle(tunnel); if (tunnel.method === "CONNECT") { tunnel.client.write("HTTP/1.1 200 Connection Established\r\nProxy-Agent: UniDesk-ProviderGateway\r\n\r\n"); } for (const chunk of tunnel.pending.splice(0)) { send({ type: "egress_tcp_data", providerId: options.providerId, connectionId: tunnel.id, data: chunk.toString("base64"), encoding: "base64", at: nowIso() }); } + tunnel.pendingBytes = 0; return true; } if (message.type === "egress_tcp_data") { if (tunnel === undefined || tunnel.client.destroyed) return true; + refreshTunnelIdle(tunnel); tunnel.client.write(Buffer.from(message.data, message.encoding === "base64" ? "base64" : "utf8")); return true; } if (message.type === "egress_tcp_close") { if (tunnel !== undefined) { - tunnels.delete(message.connectionId); - tunnel.closed = true; - if (!tunnel.client.destroyed) tunnel.client.destroy(); + destroyTunnel(tunnel, false, message.error); } if (message.error !== undefined && message.error.length > 0) { options.logger("warn", "egress_proxy_remote_close", { connectionId: message.connectionId, error: message.error }); @@ -222,14 +284,28 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P fail("400 Bad Request", "unsupported proxy target\n"); return; } - const tunnel: Tunnel = { id, client, method: parsed.method, opened: false, pending: [], closed: false }; + const createdAt = Date.now(); + const tunnel: Tunnel = { + id, + client, + method: parsed.method, + opened: false, + pending: [], + pendingBytes: 0, + closed: false, + createdAt, + lastActivityAt: createdAt, + openTimer: null, + idleTimer: null, + }; tunnels.set(id, tunnel); client.on("data", (nextChunk) => { const nextBuffer = Buffer.isBuffer(nextChunk) ? nextChunk : Buffer.from(nextChunk); if (!tunnel.opened) { - tunnel.pending.push(nextBuffer); + queuePendingChunk(tunnel, nextBuffer); return; } + refreshTunnelIdle(tunnel); send({ type: "egress_tcp_data", providerId: options.providerId, connectionId: id, data: nextBuffer.toString("base64"), encoding: "base64", at: nowIso() }); }); client.on("close", () => closeTunnel(id)); @@ -238,10 +314,13 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P if (!opened) { tunnels.delete(id); tunnel.closed = true; + clearTunnelTimers(tunnel); fail("503 Service Unavailable", "provider-gateway core channel is not connected\n"); return; } - if (firstPayload !== null) tunnel.pending.push(firstPayload); + tunnel.openTimer = setTimeout(() => closeTunnel(id, "egress proxy open timeout"), tunnelOpenTimeoutMs); + refreshTunnelIdle(tunnel); + if (firstPayload !== null) queuePendingChunk(tunnel, firstPayload); }); client.on("error", () => undefined); });