merge: codex deploy fallback
# Conflicts: # AGENTS.md # TEST.md # config.json # deploy.json # docs/reference/cli.md # docs/reference/microservices.md # docs/reference/observability.md # scripts/cli.ts # scripts/src/microservices.ts # src/components/backend-core/src/microservice-proxy.ts # src/components/microservices/code-queue/src/index.ts # src/components/microservices/code-queue/src/queue-api.ts
This commit is contained in:
@@ -27,8 +27,9 @@ UniDesk 是一个以主 server 为统一入口的分布式工作平台;本文
|
||||
- `bun scripts/cli.ts config show`:校验并展示根目录 `config.json`,配置来源规则见 `docs/reference/config.md`。
|
||||
- `bun scripts/cli.ts check [--full|--files|--scripts-typecheck|--components|--compose|--logs]`:默认只运行轻量配置和 TypeScript 语法检查;关键文件、`scripts/` 类型、组件类型、Docker Compose 和日志策略检查需显式开启,测试入口见 `TEST.md`。
|
||||
- `bun scripts/cli.ts server start`:以异步 job 启动 database、backend-core、frontend、provider-gateway、code-queue-mgr 和主 server 用户服务,部署规则见 `docs/reference/deployment.md`。
|
||||
- `bun scripts/cli.ts server status`:查询固定端口、容器状态、健康检查和访问 URL,判定标准见 `docs/reference/deployment.md`。
|
||||
- `bun scripts/cli.ts server logs`:分页返回文件日志与 Docker 日志尾部,日志规则见 `docs/reference/observability.md`。
|
||||
- `bun scripts/cli.ts server status`:查询固定端口、swap 摘要、容器状态、健康检查和访问 URL,判定标准见 `docs/reference/deployment.md`。
|
||||
- `bun scripts/cli.ts server swap status|ensure [--path /swapfile] [--size 2GiB] [--dry-run]`:以 JSON 查看或幂等创建主 server swapfile,`ensure` 输出 before/after、动作、持久化状态和 degraded/failed 详情,规则见 `docs/reference/deployment.md`。
|
||||
- `bun scripts/cli.ts server logs [--tail-bytes N]`:分页返回文件日志与 Docker 日志尾部并带截断元数据,日志规则见 `docs/reference/observability.md`。
|
||||
- `bun scripts/cli.ts server rebuild <backend-core|frontend|provider-gateway|todo-note|code-queue-mgr|project-manager|baidu-netdisk|oa-event-flow>`:以 build-first、Compose lock、no-deps force-recreate 和 post-up validation 的异步 job 重建主 server Compose 内单个服务;Code Queue 执行面部署在 D601,规则见 `docs/reference/deployment.md`。
|
||||
- `bun scripts/cli.ts provider attach <providerId> [--master-server URL] [--up] [--force]`:在新增计算节点上生成两项配置的 provider-gateway 挂载包;默认只需要主 server URL(默认 `http://74.48.78.17/`)和唯一 Provider ID,生成的 Compose 固定 Docker socket、`pid: "host"`、`restart: always`、只读 `/workspace`、SSH 维护私钥挂载和 loopback egress proxy 端口,规则见 `docs/reference/provider-gateway.md`。
|
||||
- `bun scripts/cli.ts ssh <providerId> [ssh-like args...]`:通过 provider-gateway 的 Host SSH / WSL SSH 维护桥打开近似原生 ssh 的交互会话或远端命令,并在远端 PATH 注入 `apply_patch`、`glob` 与 `skill-discover`;`apply-patch`、`py`、`skills`、结构化 `find`、`glob` 和 `argv` 子命令用于避免远端补丁、Python stdin、skill 发现与常用只读命令的嵌套转义问题,使用规则见 `docs/reference/cli.md` 和 `docs/reference/provider-gateway.md`。
|
||||
@@ -42,7 +43,7 @@ UniDesk 是一个以主 server 为统一入口的分布式工作平台;本文
|
||||
- `bun scripts/cli.ts codex judge <taskId> --attempt <n> [--dry-run]`:按指定 task/attempt 用与队列 worker 相同的上下文构建和 MiniMax judge 调用路径单步复现完成判定;`--dry-run` 只输出 prompt/payload 诊断。
|
||||
- `bun scripts/cli.ts codex interrupt|cancel <taskId>`:通过 Code Queue 私有代理中断运行任务或取消 queued/retry_wait 任务,规则见 `docs/reference/cli.md`。
|
||||
- `bun scripts/cli.ts server stop`:以异步 job 停止固定 Compose 项目中的全部 UniDesk 服务,停止后用 `server status` 复核。
|
||||
- `bun scripts/cli.ts job list` / `bun scripts/cli.ts job status latest`:查询 `.state/jobs/` 中的异步任务状态,job 机制见 `docs/reference/cli.md`。
|
||||
- `bun scripts/cli.ts job list [--limit N]` / `bun scripts/cli.ts job status latest [--tail-bytes N]`:分页查询 `.state/jobs/` 中的异步任务状态,状态输出只读日志尾部并保留完整日志路径,job 机制见 `docs/reference/cli.md`。
|
||||
- `bun scripts/cli.ts debug health` / `bun scripts/cli.ts debug dispatch` / `bun scripts/cli.ts debug task`:通过 Docker 内网 core、真实 HTTP、WebSocket、系统指标、Docker 状态和 Host SSH 维护桥流程调试健康检查、任务下发与任务结果,调试规则见 `docs/reference/cli.md`。
|
||||
- `bun scripts/cli.ts e2e run [--only pattern[,pattern...]] [--skip pattern[,pattern...]]`:支持按 check/prefix/wildcard 选择性执行公网 frontend/provider ingress、内网 core/database、provider-gateway 自接入与 Playwright 验证;日常迭代先跑当前问题对应的最小检查集,最终交付再跑全量回归,验收规则见 `docs/reference/e2e.md`。
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
## T1 CLI 可观测性与配置校验
|
||||
|
||||
阅读 `AGENTS.md`(本项目 `AGENTS.md` 同时承担 `SKILL.md` 对 `scripts/cli.ts` 的解释职责),然后用 cli 手动测试以下内容:运行 `bun scripts/cli.ts help`、`bun scripts/cli.ts config show`、`bun scripts/cli.ts check`,确认每条命令都有 JSON 输出、失败时包含错误对象、`config.json` 是唯一配置来源,且默认 `check` 只执行轻量配置和 TypeScript 语法检查;需要覆盖关键文件、`scripts/` 类型、`src/components/` 类型、Docker Compose config 和日志策略时,显式运行 `bun scripts/cli.ts check --full`。
|
||||
阅读 `AGENTS.md`(本项目 `AGENTS.md` 同时承担 `SKILL.md` 对 `scripts/cli.ts` 的解释职责),然后用 cli 手动测试以下内容:运行 `bun scripts/cli.ts help`、`bun scripts/cli.ts config show`、`bun scripts/cli.ts check`,确认每条命令都有 JSON 输出、失败时包含错误对象、`config.json` 是唯一配置来源,且默认 `check` 只执行轻量配置和 TypeScript 语法检查;需要覆盖关键文件、`scripts/` 类型、`src/components/` 类型、Docker Compose config 和日志策略时,显式运行 `bun scripts/cli.ts check --full`。运行 `set -o pipefail; bun scripts/cli.ts server status | head -1`,确认下游 pipe 关闭时不会打印 Bun EPIPE stack trace。
|
||||
|
||||
## T2 Docker 栈异步启动
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
|
||||
## T3 主 server 自接入 Provider Gateway
|
||||
|
||||
阅读 `AGENTS.md`(本项目 `AGENTS.md` 同时承担 `SKILL.md` 对 `scripts/cli.ts` 的解释职责),然后用 cli 手动测试以下内容:运行 `bun scripts/cli.ts server status` 和 `bun scripts/cli.ts debug health`,确认面向浏览器的公网入口只有 frontend 与 provider ingress,backend-core 显示为 Docker 内部端口,database/OA Event Flow 若因 D601 Code Queue 映射宿主端口也必须显示为受限宿主端口,且 `network.restrictedHostAccess.allowedSourceCidrs` 已生成来源限制,`/api/nodes` 中存在 `main-server` provider,状态为 `online`,`/api/nodes/system-status` 中存在 CPU/内存/硬盘采样,`/api/nodes/docker-status` 中存在 `main-server` 的 Docker 快照,且 provider 标签中能看到 Docker socket 可用性。
|
||||
阅读 `AGENTS.md`(本项目 `AGENTS.md` 同时承担 `SKILL.md` 对 `scripts/cli.ts` 的解释职责),然后用 cli 手动测试以下内容:运行 `bun scripts/cli.ts server status`、`bun scripts/cli.ts server swap status` 和 `bun scripts/cli.ts debug health`,确认 `server status` 包含 `swap` 摘要,`server swap status` 快速返回 total memory、active swaps、`/etc/fstab` 持久化状态和 warning;面向浏览器的公网入口只有 frontend 与 provider ingress,backend-core 显示为 Docker 内部端口,database/OA Event Flow 若因 D601 Code Queue 映射宿主端口也必须显示为受限宿主端口,且 `network.restrictedHostAccess.allowedSourceCidrs` 已生成来源限制,`/api/nodes` 中存在 `main-server` provider,状态为 `online`,`/api/nodes/system-status` 中存在 CPU/内存/硬盘采样,`/api/nodes/docker-status` 中存在 `main-server` 的 Docker 快照,且 provider 标签中能看到 Docker socket 可用性。若 `swap.warning` 非空,先运行 `bun scripts/cli.ts server swap ensure --dry-run` 审查动作,再谨慎执行 `bun scripts/cli.ts server swap ensure --size 2GiB`,确认输出包含 `before`/`after`、`actions`、`errors` 和 `status=ok|degraded`;已有 swap 时 ensure 必须 no-op。
|
||||
|
||||
## T4 前端控制台连通
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
|
||||
## T6 日志第一现场验证
|
||||
|
||||
阅读 `AGENTS.md`(本项目 `AGENTS.md` 同时承担 `SKILL.md` 对 `scripts/cli.ts` 的解释职责),然后用 cli 手动测试以下内容:运行 `bun scripts/cli.ts server logs --tail-bytes 20000`,实际读取输出中列出的 `logs/{YYYYMMDD}/` 文件,确认 backend-core、frontend、provider-gateway、database 都有实时日志;backend-core 与 Code Queue/Codex app-server 日志必须按 `logs/{YYYYMMDD}/{startStamp}_{YYYYMMDD}_{HH}_{service}.jsonl` 小时切片,默认日志族总量不得超过 `1GiB`,超过后会删除最旧切片;日志不得只有启动行,错误日志必须包含可定位的错误消息或 stack。
|
||||
阅读 `AGENTS.md`(本项目 `AGENTS.md` 同时承担 `SKILL.md` 对 `scripts/cli.ts` 的解释职责),然后用 cli 手动测试以下内容:运行 `bun scripts/cli.ts server logs --tail-bytes 20000`,确认输出包含 `policy`、每个日志文件的 `sizeBytes/tailBytes/truncated` 和 Docker logs 的 tail 元数据,实际读取输出中列出的 `logs/{YYYYMMDD}/` 文件,确认 backend-core、frontend、provider-gateway、database 都有实时日志;运行 `bun scripts/cli.ts job list --limit 5` 和 `bun scripts/cli.ts job status latest --tail-bytes 20000`,确认 job 列表分页、状态输出只含 stdout/stderr 尾部且保留完整日志路径;backend-core 与 Code Queue/Codex app-server 日志必须按 `logs/{YYYYMMDD}/{startStamp}_{YYYYMMDD}_{HH}_{service}.jsonl` 小时切片,默认日志族总量不得超过 `1GiB`,超过后会删除最旧切片;日志不得只有启动行,错误日志必须包含可定位的错误消息或 stack。
|
||||
|
||||
## T7 停止与端口释放
|
||||
|
||||
|
||||
@@ -10,8 +10,9 @@ UniDesk 的统一 CLI 入口是根目录 `scripts/cli.ts`,运行方式固定
|
||||
- `check` 默认只执行轻量配置校验、Bun 版本检查和 Bun Transpiler 语法解析(覆盖 CLI 入口、主要 `scripts/` 模块和核心组件入口,不做类型推导);关键文件存在性、`scripts/` TypeScript 类型检查、`src/components/` TypeScript 类型检查、Docker Compose config 和日志轮转策略扫描默认不启用,分别通过 `--files`、`--scripts-typecheck`、`--components`、`--compose`、`--logs` 开启,或用 `--full` 一次性开启。
|
||||
- `server start` 创建异步 job,在后台执行 Docker 构建和启动;命令本身只负责返回 job id、日志路径和启动命令。
|
||||
- `server stop` 创建异步 job,在后台停止固定 Compose project 中的全部 UniDesk 服务。
|
||||
- `server status` 查询公开端口、受限宿主端口、内部端口、Compose 容器、core/frontend/provider/database 健康检查和访问 URL;D601 Code Queue 使用的 PostgreSQL/OA Event Flow host mapping 必须出现在受限宿主端口而不是无条件公开入口中。
|
||||
- `server logs` 返回 `logs/` 文件日志和 Docker 容器日志的尾部,默认限制输出大小,避免日志爆炸。
|
||||
- `server status` 查询公开端口、受限宿主端口、内部端口、主机 swap 摘要、Compose 容器、core/frontend/provider/database 健康检查和访问 URL;D601 Code Queue 使用的 PostgreSQL/OA Event Flow host mapping 必须出现在受限宿主端口而不是无条件公开入口中。低内存主 server 上 `swap.warning` 非空时,先执行 `server swap status` 或 `server swap ensure`。
|
||||
- `server swap status|ensure [--path /swapfile] [--size 2GiB] [--dry-run]` 是主 server swap 管理入口。`status` 仅读 `/proc/meminfo`、`/proc/swaps` 和 `/etc/fstab` 并返回 JSON;`ensure` 在已有任何 active swap 时只报告 no-op,在无 active swap 时创建固定 swapfile、`chmod 600`、`mkswap`、`swapon` 并尽量写入 `/etc/fstab`。输出必须包含 `before`、`after`、total memory、active swap、持久化状态、关键动作和错误详情;若 swap 已启用但 fstab 写入失败,状态为 `degraded`,调用者需按返回的 detail 修复持久化。
|
||||
- `server logs` 返回 `logs/` 文件日志和 Docker 容器日志的尾部,默认限制输出大小,避免日志爆炸。实现必须只读取文件末尾字节,不得为了 tail 先把巨大日志完整读入 CLI 内存。
|
||||
- `server rebuild <backend-core|frontend|provider-gateway|todo-note|code-queue-mgr|project-manager|baidu-netdisk|oa-event-flow>` 创建异步 job,先构建目标服务镜像,随后在 `.state/locks/server-compose.lock` 串行保护下用 `--no-deps --force-recreate` 替换目标 service 并等待容器 `healthy/running`;该命令用于替代手工删除容器的兜底流程,其中 `todo-note`、`code-queue-mgr`、`project-manager`、`baidu-netdisk` 和 `oa-event-flow` 只重建主 server 承载的对应后端,不会重建或删除 database 命名卷。D601 Code Queue 执行面不由 `server rebuild` 管理。
|
||||
- `provider attach <providerId> [--master-server URL] [--up] [--force]` 在新计算节点生成两项配置的 provider-gateway 挂载包:`.state/provider-<ID>.env` 默认只包含 `UNIDESK_MASTER_SERVER` 与 `PROVIDER_ID`,`provider-<ID>.yml` 固定 Docker socket、`pid: "host"`、`restart: always`、只读 `/workspace` 和 SSH 维护私钥挂载;`--up` 会立即执行生成的 `docker compose up -d --build`。
|
||||
- `ssh <providerId> [ssh-like args...]` 通过 backend-core 内网 WebSocket broker 和 provider-gateway 的 Host SSH / WSL SSH 维护桥连接目标节点;无后续参数时进入远端登录 shell,有后续参数时按 ssh 远端命令体验执行并返回远端 exit code。
|
||||
@@ -30,7 +31,7 @@ UniDesk 的统一 CLI 入口是根目录 `scripts/cli.ts`,运行方式固定
|
||||
- `codex interrupt|cancel <taskId>` 通过 Code Queue 私有代理请求中断;running/judging 任务会请求 D601 当前 agent run 停止,queued/retry_wait 任务的取消也必须保持与 WebUI 相同代理路径,返回有界 task 摘要和后续查询命令。任何需要接触 active run 的动作仍属于 D601 执行面。
|
||||
- Code Queue 多队列 lane 由 `codex` 命令命名空间管理:`queues` 列表、`queue create <queueId>` 创建、`queue merge <sourceQueueId> --into <targetQueueId>` 合并、`move <taskId> --queue <queueId>` 迁移;这些队列管理入口默认由主 server `code-queue-mgr` 直管 PostgreSQL,仍通过稳定 `code-queue` 用户服务代理路径访问。同一个 queue 内部串行执行,不同 queue 之间并行执行。迁移只允许尚未被 scheduler claim 的 `queued`/`retry_wait` 任务,必须满足 `startedAt=null`、`currentAttempt=0` 且没有 active thread/turn;已进入 `running`/`judging` 或已有 claim 标记的任务返回 409,不得被 move/merge 回写成 queued。合并会移动可迁移任务归属并自动删除源 queue 记录,只保留合并后的目标 queue;若 source 或 target queue 存在 active/claimed 任务,合并整体返回 409。合并后的目标 queue 按任务原 `queueEnteredAt`/`createdAt` 时间顺序串行,成功迁移 queued/retry_wait 任务后由 D601 scheduler 轮询推进。
|
||||
- 所有 `codex` 查询和管理命令必须走与 WebUI 相同的 backend-core 私有代理路径 `/api/microservices/code-queue/proxy/...`;CLI 不得为了提交、移动、中断、取消或队列管理直接调用 D601 内部 Service、数据库、pod curl 或 k3sctl scheduler 子服务。若该路径失败,应先修复 CLI/backend/provider tunnel 链路,而不是绕过控制面。
|
||||
- `job list` 与 `job status` 查询 `.state/jobs/` 文件系统状态,是异步命令的可观测入口。
|
||||
- `job list [--limit N] [--include-command]` 与 `job status <jobId|latest> [--tail-bytes N]` 查询 `.state/jobs/` 文件系统状态,是异步命令的可观测入口。`job list` 默认只返回最新 50 条摘要;`job status` 默认只返回 stdout/stderr 末尾 12000 字节,并带 `tailPolicy` 与完整日志路径。
|
||||
- `debug health`、`debug dispatch` 与 `debug task` 走真实内部 core、WebSocket、数据库、provider、系统指标、Docker 状态和 Host SSH 维护桥流程,只用于开发调试,不写入 `TEST.md` 的正式验收步骤。
|
||||
- `e2e run [--only pattern[,pattern...]] [--skip pattern[,pattern...]]` 使用 publicHost 派生的公开 frontend/provider ingress URL,并通过 Docker 内网验证 core API、PostgreSQL、provider self-connection、系统指标曲线、Docker 状态快照、provider.upgrade 预检和 Playwright 前端页面,是交付前的自动化 E2E 门禁;CLI 默认输出 check 状态摘要,完整诊断写入 `resultPath`,日常迭代应优先用 `--only` / `--skip` 跑最小必要集合。
|
||||
|
||||
@@ -46,7 +47,9 @@ UniDesk 的统一 CLI 入口是根目录 `scripts/cli.ts`,运行方式固定
|
||||
|
||||
每条命令的最外层 JSON 包含 `ok`、`command` 和 `data` 或 `error`。失败时 CLI 设置非零退出码,但仍然输出 JSON 错误对象;错误对象应包含 `name`、`message` 和可用的 `stack`。
|
||||
|
||||
`microservice proxy` 是面向人工验证和受控调试的私有后端入口。默认 method 为 GET;使用 `--body-json JSON`、`--body-file path` 或 `--body-stdin` 时默认 method 切换为 POST,也可显式加 `--method POST|PUT|PATCH|DELETE`,但 GET/HEAD 不允许携带请求体。所有请求仍受 config 中的 `allowedMethods` 和 `allowedPathPrefixes` 限制。为了避免 Pipeline snapshot 这类超大业务 JSON 造成 CLI 输出爆炸,响应 body 超过默认阈值时会返回 `bodyOmitted=true`、`bodyPreview`、`bodyBytes` 和 `rawHint`;需要完整 body 时显式添加 `--raw`,或用 `--max-body-bytes <N>` 调整预览阈值。正式 frontend 展示仍应优先使用业务控件和 `__unideskArrayLimit` 这类展示级裁剪参数,而不是默认倾倒完整 JSON。
|
||||
诊断命令默认采用渐进披露:`server logs`、`job list/status`、`codex task/trace/output` 和 `microservice proxy` 都必须有默认条数、字节数或文本预览上限;用户显式传 `--limit`、`--tail-bytes`、`--full-text` 或 `--full` 才扩大单次输出。CLI stdout 遇到下游 pipe 关闭的 `EPIPE` 必须安静退出,不得打印 Bun stack trace。
|
||||
|
||||
`microservice proxy` 是面向人工验证和受控调试的私有后端入口。默认 method 为 GET;使用 `--body-json JSON`、`--body-file path` 或 `--body-stdin` 时默认 method 切换为 POST,也可显式加 `--method POST|PUT|PATCH|DELETE`,但 GET/HEAD 不允许携带请求体。所有请求仍受 config 中的 `allowedMethods` 和 `allowedPathPrefixes` 限制。为了避免 Pipeline snapshot 这类超大业务 JSON 造成 CLI 输出爆炸,响应 body 超过默认阈值时会返回 `bodyOmitted=true`、`bodyPreview`、`bodyBytes` 和 `rawHint`;`--raw` 仍受默认硬限额保护,需要完整 body 时显式添加 `--raw --full`,或用 `--max-body-bytes <N>` 调整预览阈值。正式 frontend 展示仍应优先使用业务控件和 `__unideskArrayLimit` 这类展示级裁剪参数,而不是默认倾倒完整 JSON。
|
||||
|
||||
`network perf` 用于生成组网性能前后对比数据。标准 Code Queue overview 读路径基准命令是 `bun scripts/cli.ts network perf --service code-queue --path /api/tasks/overview?limit=30 --count 30 --concurrency 1 --label before`,远程主 server 可用 `bun scripts/cli.ts --main-server-ip 74.48.78.17 network perf ...`。输出包含成功/失败数、状态码分布、`x-unidesk-cache`、`x-unidesk-proxy-mode`、`x-unidesk-upstream-proxy-mode` 分布和 min/p50/p90/p95/max;provider-gateway 长连接数据面验收应看到 `proxyModeCounts.provider-ws-http-tunnel`,adapter native Service 数据面验收应看到 upstream proxy mode 为 `kubernetes-native-service`,若出现 `kubernetes-api-service-proxy` 必须结合 `/api/control-plane.nativeServiceProxy.failedServices` 解释 fallback 原因。
|
||||
|
||||
|
||||
@@ -29,6 +29,14 @@ Compose v2 安装后仍然必须遵守 UniDesk 的服务控制入口:全栈生
|
||||
|
||||
版本化用户服务部署优先使用 `bun scripts/cli.ts deploy apply`。`deploy.json` 只声明服务 `id`、`repo` 和 `commitId`;目标节点、Dockerfile、Compose、Kubernetes manifest、健康检查和代理路径继续来自 `config.json` 与现有 manifest。主 server 直管微服务和内部 sidecar,例如 `code-queue-mgr`,也必须支持这一路径:`deploy apply --service code-queue-mgr` 从 `deploy.json` 指定 commit 导出源码、构建镜像、替换固定 Compose service 并验证运行中镜像/健康信息的 commit。部署必须遵循 target-side build:服务部署到哪台 target,就在哪台 target 从 remote commit 导出源码、一次性代理构建镜像并部署;不得把中心构建镜像作为默认分发路径,也不得用 `docker commit` 或脏 worktree 作为部署输入。完整规则见 `docs/reference/deploy.md`。
|
||||
|
||||
## Main Server Swap
|
||||
|
||||
主 server 可能运行在约 2 GiB 内存的小规格机器上,短时 Docker build、Codex/control-plane 调查和日志读取会触发 global OOM。主 server 必须通过 `bun scripts/cli.ts server swap status` 暴露当前 memory/swap 状态,并在 `server status` 的 `swap` 字段中给出同一摘要。
|
||||
|
||||
缺少 active swap 时,正式修复入口是 `bun scripts/cli.ts server swap ensure [--path /swapfile] [--size 2GiB]`。该命令必须幂等:已有任何 active swap 时只返回 no-op 状态;无 swap 时创建固定 swapfile、设置 `0600`、执行 `mkswap` 与 `swapon`,并尽量把 `<path> none swap sw 0 0` 写入 `/etc/fstab`。如果当前环境允许 `swapon` 但不允许写 `/etc/fstab`,命令返回 `status=degraded`,并在 JSON 的 `errors`/`actions` 中说明下一步;不得静默假装持久化完成。
|
||||
|
||||
swap 管理不能被强塞进所有热路径。`server start/status` 可以暴露 warning 或摘要,但不会自动创建 swap;需要变更主机 swap 时必须显式运行 `server swap ensure`,并用返回的 `before`/`after` 和 `fstab.persisted` 作为验收记录。
|
||||
|
||||
## Start And Stop
|
||||
|
||||
`bun scripts/cli.ts server start` 与 `bun scripts/cli.ts server stop` 都是异步 job。启动 job 只执行固定 Compose project 的 `up -d --build --remove-orphans`,不得先 `down`,避免在 provider-gateway 旧容器或网络冲突时把长驻控制面容器先删掉又启动失败;停止 job 才允许执行 `down --remove-orphans`。启动和停止流程都禁止删除 Docker named volume。所有会改变主 server Compose 状态的 job 必须通过 `.state/locks/server-compose.lock` 串行化;连续 `server rebuild` 命令只代表连续创建异步 job,不能代表第一个 job 已结束,实际容器变更仍必须由 Compose lock 串行执行。
|
||||
|
||||
@@ -4,7 +4,7 @@ UniDesk 的可观测性优先级高于静默成功。CLI、服务日志、Docker
|
||||
|
||||
## CLI Logs
|
||||
|
||||
异步 job 的 stdout 和 stderr 位于 `.state/jobs/`。`job status` 会返回有限尾部,避免输出爆炸,同时保留完整日志文件路径便于继续排查。
|
||||
异步 job 的 stdout 和 stderr 位于 `.state/jobs/`。`job list` 默认只返回最新 50 条摘要;`job status` 会返回有限尾部,避免输出爆炸,同时保留完整日志文件路径便于继续排查。实现必须只读取日志尾部字节,不得先把完整 job 日志读入 CLI 内存。
|
||||
|
||||
## Service Logs
|
||||
|
||||
@@ -18,7 +18,13 @@ UniDesk 的可观测性优先级高于静默成功。CLI、服务日志、Docker
|
||||
|
||||
## Log Access
|
||||
|
||||
`bun scripts/cli.ts server logs` 同时读取文件日志和 Docker logs 尾部。文件日志是服务崩溃时的第一现场,Docker logs 是容器启动失败和 stdout/stderr 的辅助来源。
|
||||
`bun scripts/cli.ts server logs` 同时读取文件日志和 Docker logs 尾部。文件日志是服务崩溃时的第一现场,Docker logs 是容器启动失败和 stdout/stderr 的辅助来源。默认输出必须包含 tail 字节数、是否截断和完整文件路径;扩大读取范围只能通过显式 `--tail-bytes N`,且 CLI 会对单次 tail 设置硬上限。
|
||||
|
||||
## Diagnostic Output Limits
|
||||
|
||||
所有诊断型 CLI 输出必须优先摘要化、尾部化或分页化,禁止默认倾倒大 JSON、全量日志、全量 trace 或 `.state`/`logs` 宽泛搜索结果。当前硬限额入口包括:`server logs` 默认 3000 bytes tail、`job list` 默认 50 条、`job status` 默认 12000 bytes tail、`codex task/trace/output` 默认分页与文本预览、`microservice proxy` 默认 body 预览且 `--raw` 仍受硬限额保护。确实需要完整响应时必须显式使用对应的 `--full`、`--full-text`、`--tail-bytes` 或 `--limit` 参数,并在验收记录中说明为什么需要扩大输出。
|
||||
|
||||
CLI 写 stdout/stderr 遇到下游 pipe 关闭的 `EPIPE` 必须安静退出,不能打印 Bun stack trace。常见验证命令是 `set -o pipefail; bun scripts/cli.ts server status | head -1`,应只看到第一行 JSON 而无额外错误噪声。
|
||||
|
||||
## Task Liveness
|
||||
|
||||
@@ -32,6 +38,6 @@ frontend Bun server 必须提供同源 `/api/frontend-performance`,记录 webu
|
||||
|
||||
性能优化必须先用这些指标锁定慢操作名称、路径、耗时和代理层级,再改后端查询或前后端通信策略;不得只凭主观体感改 UI。Code Queue 这类控制面页面出现 `core_proxy`、`GET /api/microservices/code-queue/proxy/api/tasks/overview`、`POST /api/microservices/code-queue/proxy/api/tasks/<id>/read` 等超过 1s 的慢操作时,应保留优化前后的性能面板证据,并同时记录 live API 耗时、容器内存、`/health` 存储摘要和是否仍通过 PostgreSQL/append-only archive 重建历史数据。短 TTL cache、warmup 或页面内存缓存只能作为重复请求抖动保护,性能证据必须证明数据库索引/聚合、分页和渐进式披露本身已把核心路径降到目标内,不能用长缓存遮蔽慢 SQL 或全量 JSON 物化。
|
||||
|
||||
当最近失败请求集中出现 frontend `core_proxy` 502/503/504,路径为 `/api/microservices/code-queue/proxy/...` 的 overview、trace 或 summary,且 k3s/k8s Pod 仍在运行时,必须先运行 `bun scripts/cli.ts microservice diagnostics code-queue`,区分 provider-gateway online、WebSocket HTTP tunnel、k3sctl-adapter、Kubernetes API service proxy 和目标 Service 五段状态。provider tunnel 类失败必须记录响应 body/headers 中的 `requestId`、`stage`、`failureReason`、`x-unidesk-request-id` 和 `x-unidesk-tunnel-error`;如需主动验证错误结构,运行 `bun scripts/cli.ts microservice tunnel-self-test code-queue`,该自测应返回预期失败但 `ok=true` 的诊断结果。随后再继续判断“Kubernetes API service proxy 不可达”“Code Queue 进程不可达”和“Code Queue event loop 被热路径同步工作饿死”。排障顺序是同时查看 `/api/frontend-performance`、`/api/performance`、`k3sctl-adapter` `/api/control-plane`、Kubernetes Pod `/live`、`/health`、overview/trace-step curl、`kubectl top pod` 或 Docker stats、容器 `RestartCount`/`OOMKilled` 和 Code Queue 日志;如果 Pod 内 `/health` 也超时,应优先检查实时 output 发布、archive 读取、transcript 构建、统计计算、启动维护、历史 OA backfill 和远程 Provider 准备/SSH 子进程是否阻塞 event loop,而不是先调整 frontend 渲染或代理超时。Code Queue 默认不得在启动时自动执行历史 OA backfill 或通知表索引维护;显式 backfill 必须作为运维动作记录,并在运行期间并发证明 `/live`、`/health` 与 `/api/tasks/overview` 仍快速返回。涉及 D601 等远程 Provider 时,还要检查 `runCodeQueueSsh`/开发容器准备是否仍存在同步子进程、无 timeout 的 SSH、无上限 stdout/stderr 或 stale TUN 重建等待;修复后必须在远程准备探针运行期间并发证明 Pod `/health` 与 `/api/tasks/overview` 仍快速返回。
|
||||
当最近失败请求集中出现 frontend `core_proxy` 502/503/504,路径为 `/api/microservices/code-queue/proxy/...` 的 overview、trace 或 summary,且 k3s/k8s Pod 仍在运行时,必须先运行 `bun scripts/cli.ts microservice diagnostics code-queue`,区分 provider-gateway online、WebSocket HTTP tunnel、k3sctl-adapter、Kubernetes API service proxy 和目标 Service 五段状态。provider tunnel 类失败必须记录响应 body/headers 中的 `requestId`、`stage`、`failureReason`、`x-unidesk-request-id` 和 `x-unidesk-tunnel-error`;如需主动验证错误结构,运行 `bun scripts/cli.ts microservice tunnel-self-test code-queue`,该自测应返回预期失败但 `ok=true` 的诊断结果。随后再继续判断“Kubernetes API service proxy 不可达”“Code Queue 进程不可达”和“Code Queue event loop 被热路径同步工作饿死”。如果 `debug health` 或 provider-gateway egress health 显示 `providerGatewayEgressProxyActiveTunnels` 持续偏高、`pendingTunnels` 非零或 `oldestTunnelAgeMs` 长时间增长,应先按 provider-gateway egress tunnel 生命周期排障,确认 `egress_tcp_open`、connect timeout、idle cleanup 与 core socket close 清理是否生效。排障顺序是同时查看 `/api/frontend-performance`、`/api/performance`、`k3sctl-adapter` `/api/control-plane`、Kubernetes Pod `/live`、`/health`、overview/trace-step curl、`kubectl top pod` 或 Docker stats、容器 `RestartCount`/`OOMKilled` 和 Code Queue 日志;如果 Pod 内 `/health` 也超时,应优先检查实时 output 发布、archive 读取、transcript 构建、统计计算、启动维护、历史 OA backfill 和远程 Provider 准备/SSH 子进程是否阻塞 event loop,而不是先调整 frontend 渲染或代理超时。Code Queue 默认不得在启动时自动执行历史 OA backfill 或通知表索引维护;显式 backfill 必须作为运维动作记录,并在运行期间并发证明 `/live`、`/health` 与 `/api/tasks/overview` 仍快速返回。涉及 D601 等远程 Provider 时,还要检查 `runCodeQueueSsh`/开发容器准备是否仍存在同步子进程、无 timeout 的 SSH、无上限 stdout/stderr 或 stale TUN 重建等待;修复后必须在远程准备探针运行期间并发证明 Pod `/health` 与 `/api/tasks/overview` 仍快速返回。
|
||||
|
||||
Code Queue task 明明产出最终回复却反复 `retry_wait` 时,应优先用任务详情里的 latest attempt 字段核查 `terminalStatus`、`transportClosedBeforeTerminal`、`appServerExitCode`、`finalResponseChars`、`judge.raw._safetyOverride` 和 attempt output。OpenCode 远程任务中,`opencode completed status=completed exit=0` 加当前 attempt 非空 assistant 输出应对应 `terminalStatus=completed`、`transportClosedBeforeTerminal=false`;如果因为缺少 `step_finish` 事件仍触发 `_safetyOverride=terminal_not_completed`,说明协议终态归一化有回归。相反,当前 attempt 没有最终 assistant response 时即使 tool/read/bash 证据完整,也必须 retry,不能用旧 `task.finalResponse` 或 reasoning/tool evidence 代替可见最终回复。
|
||||
|
||||
@@ -100,10 +100,12 @@ backend-core 必须把 provider WebSocket HTTP tunnel 的失败分类到响应 b
|
||||
|
||||
provider-gateway 可以提供 egress HTTP CONNECT 代理,用于让 Code Queue、Pipeline runner、target-side Docker build 等节点侧执行环境通过既有 provider WebSocket 通道出网。代理默认监听容器内 `0.0.0.0:18789`,节点部署必须只发布为宿主 loopback `127.0.0.1:18789->18789/tcp`,不得开放公网端口;普通 Docker 执行容器可通过同一私有 Docker network 访问 provider-gateway 容器名,k3s/k8s Pod 必须通过显式 Kubernetes Service 暴露同节点 provider-gateway 私有 endpoint,例如 D601 Code Queue 使用 selector 指向 hostNetwork 桥接 Pod 的 `d601-provider-egress-proxy.unidesk.svc.cluster.local:18789`,不得把固定 Docker bridge IP、手工 EndpointSlice 或该 egress Service 当作业务 HTTP 入口。代理只负责把本地 CONNECT/absolute HTTP 请求转换为 `egress_tcp_open`、`egress_tcp_data`、`egress_tcp_close` 消息;backend-core 在主 server 侧建立真实 TCP 连接并把数据回传,避免 D601 等计算节点本地网络不可达时卡死 Codex/Git/NPM/apt/Playwright。
|
||||
|
||||
该能力属于 provider-gateway 通道能力,register/heartbeat 的 `unideskCapabilities` 必须包含 `network.egress-proxy`,labels 必须上报 `providerGatewayEgressProxy*` 状态。不得再为某个用户服务单独注册伪 provider 来实现出网代理;否则节点列表会出现虚假 provider,且代理、统计、升级路径会形成多套通道。代理健康检查使用 `GET /__unidesk/egress-proxy/health`,返回 `connected`、`providerId`、`activeTunnels` 和监听端口;业务服务自己的 `/health` 应把该结果作为排障证据透出。
|
||||
该能力属于 provider-gateway 通道能力,register/heartbeat 的 `unideskCapabilities` 必须包含 `network.egress-proxy`,labels 必须上报 `providerGatewayEgressProxy*` 状态。不得再为某个用户服务单独注册伪 provider 来实现出网代理;否则节点列表会出现虚假 provider,且代理、统计、升级路径会形成多套通道。代理健康检查使用 `GET /__unidesk/egress-proxy/health`,返回 `connected`、`providerId`、`activeTunnels`、`pendingTunnels`、`oldestTunnelAgeMs`、`openTimeoutMs`、`idleTimeoutMs` 和监听端口;业务服务自己的 `/health` 应把该结果作为排障证据透出。
|
||||
|
||||
egress proxy 的长期边界是“统一 provider 通道,不引入第二控制面”。backend-core 只接受在线 provider socket 上的 `egress_tcp_*` 消息,并在该 socket 关闭时销毁全部对应 TCP relay;provider-gateway 只维护本地 HTTP proxy 与 WebSocket 消息映射,不保存业务状态,不参与任务调度、统计或节点注册以外的控制面。执行容器、用户服务、Pipeline runner 和 provider-side deploy build 不允许直接连接 backend-core provider ingress,也不允许携带 provider token 自行注册;需要出网时只能连接同节点 provider-gateway 的私有 proxy endpoint。当前 k3s/k8s Code Queue 通过 `d601-provider-egress-proxy` Kubernetes Service 连接 D601 provider-gateway egress endpoint,这是 Pod 内的出网入口,不是业务 HTTP 代理入口,也不能替代 Kubernetes API service proxy。部署构建同样不得新建 SSH SOCKS、公网 master proxy 或宿主全局代理;构建脚本只能把 provider-gateway WS egress 作为短生命周期环境变量和 Docker build-arg 注入,并配合目标节点本地 BuildKit/image cache 避免重复下载大依赖层。
|
||||
|
||||
egress tunnel 必须有生命周期边界:provider-gateway 发出 `egress_tcp_open` 后如果主 server 未在 `openTimeoutMs` 内返回 `egress_tcp_opened` 或 close,必须主动关闭本地 client 并向 core 发送 `egress_tcp_close`;provider-gateway 与 backend-core 都必须对长时间无数据的 relay 执行 idle 清理,避免 provider WebSocket 抖动、TCP connect 卡住或上游未关闭时留下 stale tunnel。排障时如果 `activeTunnels` 持续增长、`pendingTunnels` 非零或 `oldestTunnelAgeMs` 明显超过业务请求耗时,应先看 provider-gateway 与 backend-core egress 清理日志,再判断 Code Queue、PostgreSQL 或 OA Event Flow 本身是否慢。
|
||||
|
||||
故障语义必须显式,不允许静默 fallback。provider-gateway 到 backend-core 的 WebSocket 未连接时,本地 proxy 必须返回 503;执行容器不能自动绕过到 D601 本地直连公网、外部公共代理或主 server 公网 HTTP 端口。`NO_PROXY` 只用于 PostgreSQL、OA Event Flow、ClaudeQQ、frontend/backend-core 内网代理、provider-gateway health 等明确内网链路,不能把 GitHub、模型 API、npm registry 等外部目标加入绕过列表。`hyueapi.com` 是明确的模型 API 例外:该上游会拒绝 provider-gateway egress proxy 出口,Code Queue 必须用 `CODE_QUEUE_EGRESS_PROXY_NO_PROXY` / `NO_PROXY` 将 `hyueapi.com,.hyueapi.com` 配成直连,其它模型 API 仍不得默认绕过 proxy。验收必须同时证明 provider-gateway labels、业务服务 `/health` 和执行容器内 `curl -I https://...` 都走同一 proxy path,hyueapi 例外则以 Code Queue `/health.egressProxy.noProxy` 和目标任务成功完成作为证据。
|
||||
|
||||
## Gateway Version Metadata
|
||||
|
||||
+18
-5
@@ -3,7 +3,7 @@ import { debugDispatch, debugHealth, debugTask, isDebugDispatchCommand, type Deb
|
||||
import { isRebuildableService, rebuildService, stackLogs, stackStatus, startStack, stopStack } from "./src/docker";
|
||||
import { parseE2ERunOptions, runE2E } from "./src/e2e";
|
||||
import { emitError, emitJson } from "./src/output";
|
||||
import { jobWithTail, listJobs, readJob, runJob } from "./src/jobs";
|
||||
import { jobWithTail, listJobs, listJobsSummary, readJob, runJob } from "./src/jobs";
|
||||
import { parseCheckOptions, runChecks } from "./src/check";
|
||||
import { runSsh } from "./src/ssh";
|
||||
import { extractRemoteCliOptions, runRemoteCli } from "./src/remote";
|
||||
@@ -15,6 +15,7 @@ import { runProviderCommand } from "./src/provider-attach";
|
||||
import { runScheduleCommand } from "./src/schedules";
|
||||
import { parseNetworkPerfOptions, runNetworkPerf } from "./src/network-perf";
|
||||
import { runCiCommand } from "./src/ci";
|
||||
import { runSwapCommand } from "./src/swap";
|
||||
|
||||
const remoteOptions = extractRemoteCliOptions(process.argv.slice(2));
|
||||
const args = remoteOptions.args;
|
||||
@@ -32,6 +33,7 @@ function help(): unknown {
|
||||
{ command: "server start", description: "Fire-and-forget build/start for database, backend-core, frontend, provider gateway, and managed main-server user services." },
|
||||
{ command: "server stop", description: "Fire-and-forget docker-compose down for the fixed UniDesk stack." },
|
||||
{ command: "server status", description: "Show fixed ports, containers, service health, and public URLs." },
|
||||
{ command: "server swap status|ensure [--path /swapfile] [--size 2GiB] [--dry-run]", description: "Inspect or idempotently create host swap for low-memory main-server operation." },
|
||||
{ command: "server logs [--tail-bytes N]", description: "Return bounded tails from file logs and docker logs." },
|
||||
{ command: "server rebuild <backend-core|frontend|provider-gateway|todo-note|code-queue-mgr|project-manager|baidu-netdisk|oa-event-flow>", description: "Build first, then serialize, force-recreate, and validate one Compose service." },
|
||||
{ command: "provider attach <providerId> [--master-server URL] [--up] [--force]", description: "Generate the minimal external provider-gateway env/compose bundle; only master server URL and provider id are required." },
|
||||
@@ -61,7 +63,7 @@ function help(): unknown {
|
||||
{ command: "codex judge <taskId> --attempt N [--dry-run] [--include-prompt]", description: "Replay one stored Code Queue attempt through the same judge context builder and MiniMax judge call path used by the live queue worker." },
|
||||
{ command: "codex interrupt|cancel <taskId>", description: "Request interrupt for a running Code Queue task, or cancel a queued/retry_wait task, through the same private proxy." },
|
||||
{ command: "codex (queues | queue create <queueId> | queue merge <sourceQueueId> --into <targetQueueId> | move <taskId> --queue <queueId>)", description: "List/create/merge Code Queue lanes and move a queued task; merge preserves task queue time order and deletes the source queue record." },
|
||||
{ command: "job list", description: "List async jobs from .state/jobs." },
|
||||
{ command: "job list [--limit N] [--include-command]", description: "List async jobs from .state/jobs with a bounded default page." },
|
||||
{ command: "job status <jobId|latest> [--tail-bytes N]", description: "Show job state with bounded stdout/stderr tails." },
|
||||
{ command: "debug health", description: "Probe internal core, nodes, system/Docker status, frontend, provider ingress, and public boundary." },
|
||||
{ command: "debug dispatch [providerId] [docker.ps|provider.upgrade|host.ssh|microservice.http|echo] [--wait-ms N]", description: "Submit a real internal-core dispatch request for CLI debugging." },
|
||||
@@ -82,6 +84,10 @@ function numberOption(name: string, defaultValue: number): number {
|
||||
return value;
|
||||
}
|
||||
|
||||
function boundedNumberOption(name: string, defaultValue: number, maxValue: number): number {
|
||||
return Math.min(numberOption(name, defaultValue), maxValue);
|
||||
}
|
||||
|
||||
function stringOption(name: string): string | undefined {
|
||||
const index = args.indexOf(name);
|
||||
if (index === -1) return undefined;
|
||||
@@ -174,8 +180,15 @@ async function main(): Promise<void> {
|
||||
emitJson(commandName, await stackStatus(config));
|
||||
return;
|
||||
}
|
||||
if (sub === "swap") {
|
||||
const result = runSwapCommand(args.slice(2));
|
||||
const ok = (result as { ok?: unknown }).ok !== false;
|
||||
emitJson(commandName, result, ok);
|
||||
if (!ok) process.exitCode = 1;
|
||||
return;
|
||||
}
|
||||
if (sub === "logs") {
|
||||
emitJson(commandName, stackLogs(config, numberOption("--tail-bytes", 3000)));
|
||||
emitJson(commandName, stackLogs(config, boundedNumberOption("--tail-bytes", 3000, 500_000)));
|
||||
return;
|
||||
}
|
||||
if (sub === "rebuild") {
|
||||
@@ -229,12 +242,12 @@ async function main(): Promise<void> {
|
||||
|
||||
if (top === "job") {
|
||||
if (sub === "list") {
|
||||
emitJson(commandName, { jobs: listJobs() });
|
||||
emitJson(commandName, listJobsSummary({ limit: boundedNumberOption("--limit", 50, 500), includeCommand: args.includes("--include-command") }));
|
||||
return;
|
||||
}
|
||||
if (sub === "status") {
|
||||
const id = third === "latest" || third === undefined ? latestJobId() : third;
|
||||
emitJson(commandName, { job: jobWithTail(readJob(id), numberOption("--tail-bytes", 12000)) });
|
||||
emitJson(commandName, { job: jobWithTail(readJob(id), boundedNumberOption("--tail-bytes", 12000, 500_000)) });
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
+13
-3
@@ -1,5 +1,5 @@
|
||||
import { spawn, spawnSync } from "node:child_process";
|
||||
import { createWriteStream, existsSync, readFileSync } from "node:fs";
|
||||
import { closeSync, createWriteStream, existsSync, openSync, readSync, statSync } from "node:fs";
|
||||
|
||||
export interface CommandResult {
|
||||
command: string[];
|
||||
@@ -56,6 +56,16 @@ export async function runCommandToFiles(command: string[], cwd: string, stdoutFi
|
||||
|
||||
export function tailFile(path: string, maxBytes = 8192): string {
|
||||
if (!existsSync(path)) return "";
|
||||
const content = readFileSync(path);
|
||||
return content.subarray(Math.max(0, content.length - maxBytes)).toString("utf8");
|
||||
const safeMaxBytes = Math.max(0, Math.floor(maxBytes));
|
||||
if (safeMaxBytes === 0) return "";
|
||||
const size = statSync(path).size;
|
||||
const bytesToRead = Math.min(size, safeMaxBytes);
|
||||
const buffer = Buffer.alloc(bytesToRead);
|
||||
const fd = openSync(path, "r");
|
||||
try {
|
||||
readSync(fd, buffer, 0, bytesToRead, size - bytesToRead);
|
||||
} finally {
|
||||
closeSync(fd);
|
||||
}
|
||||
return buffer.toString("utf8");
|
||||
}
|
||||
|
||||
+32
-4
@@ -1,8 +1,9 @@
|
||||
import { chmodSync, existsSync, mkdirSync, readFileSync, readdirSync, writeFileSync } from "node:fs";
|
||||
import { chmodSync, existsSync, mkdirSync, readFileSync, readdirSync, statSync, writeFileSync } from "node:fs";
|
||||
import { basename, dirname, join, resolve } from "node:path";
|
||||
import { commandOk, runCommand, tailFile } from "./command";
|
||||
import { type UniDeskConfig, repoRoot, rootPath } from "./config";
|
||||
import { startJob } from "./jobs";
|
||||
import { swapStatus } from "./swap";
|
||||
|
||||
export interface ComposeRuntimeEnv {
|
||||
envFile: string;
|
||||
@@ -436,6 +437,7 @@ export async function stackStatus(config: UniDeskConfig): Promise<unknown> {
|
||||
const overview = dockerExecJson("unidesk-backend-core", "fetch('http://127.0.0.1:8080/api/overview').then(r=>r.json()).then(j=>console.log(JSON.stringify({ok:true,status:200,body:j}))).catch(e=>{console.log(JSON.stringify({ok:false,error:String(e)}));process.exit(1)})");
|
||||
return {
|
||||
runtimeEnv,
|
||||
swap: swapStatus(),
|
||||
publicPorts: fixedPorts(config),
|
||||
blockedPublicPorts: [
|
||||
{ name: "backend-core-rest", port: config.network.core.port, listening: isPortListening(config.network.core.port), expected: "not-listening" },
|
||||
@@ -500,11 +502,37 @@ export function stackLogs(config: UniDeskConfig, tailBytes: number): unknown {
|
||||
const allFiles = listLogFiles(logRoot);
|
||||
const currentFiles = allFiles.filter((path) => basename(path).startsWith(runtimeEnv.logPrefix));
|
||||
const selectedFiles = (currentFiles.length > 0 ? currentFiles : allFiles.slice(-12)).slice(-12);
|
||||
const files = selectedFiles.map((path) => ({ path, name: basename(path), tail: tailFile(path, tailBytes) }));
|
||||
const files = selectedFiles.map((path) => {
|
||||
const sizeBytes = existsSync(path) ? statSync(path).size : 0;
|
||||
const truncated = sizeBytes > tailBytes;
|
||||
return { path, name: basename(path), sizeBytes, tailBytes, truncated, tail: tailFile(path, tailBytes) };
|
||||
});
|
||||
const containerNames = ["unidesk-database", "unidesk-backend-core", "unidesk-frontend", "unidesk-provider-gateway-main", "todo-note-backend", "project-manager-backend", "baidu-netdisk-backend", "oa-event-flow-backend"];
|
||||
const docker = containerNames.map((name) => {
|
||||
const result = runCommand(["docker", "logs", "--tail", "40", name], repoRoot);
|
||||
return { name, exitCode: result.exitCode, stdoutTail: result.stdout.slice(-tailBytes), stderrTail: result.stderr.slice(-tailBytes) };
|
||||
return {
|
||||
name,
|
||||
exitCode: result.exitCode,
|
||||
tailBytes,
|
||||
stdoutBytes: Buffer.byteLength(result.stdout, "utf8"),
|
||||
stderrBytes: Buffer.byteLength(result.stderr, "utf8"),
|
||||
stdoutTruncated: Buffer.byteLength(result.stdout, "utf8") > tailBytes,
|
||||
stderrTruncated: Buffer.byteLength(result.stderr, "utf8") > tailBytes,
|
||||
stdoutTail: result.stdout.slice(-tailBytes),
|
||||
stderrTail: result.stderr.slice(-tailBytes),
|
||||
};
|
||||
});
|
||||
return { logRoot, runtimeEnv, files, docker };
|
||||
return {
|
||||
logRoot,
|
||||
runtimeEnv,
|
||||
policy: {
|
||||
defaultTailBytes: 3000,
|
||||
requestedTailBytes: tailBytes,
|
||||
selectedFileLimit: 12,
|
||||
dockerTailLines: 40,
|
||||
disclosure: "server logs returns tails only; increase with --tail-bytes for a larger bounded tail, and inspect listed paths directly for full logs.",
|
||||
},
|
||||
files,
|
||||
docker,
|
||||
};
|
||||
}
|
||||
|
||||
+67
-3
@@ -1,5 +1,5 @@
|
||||
import { spawn, spawnSync } from "node:child_process";
|
||||
import { existsSync, mkdirSync, readFileSync, readdirSync, writeFileSync } from "node:fs";
|
||||
import { existsSync, mkdirSync, readFileSync, readdirSync, statSync, writeFileSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
import { repoRoot, rootPath } from "./config";
|
||||
import { runCommandToFiles, tailFile } from "./command";
|
||||
@@ -141,6 +141,70 @@ export async function runJob(id: string): Promise<JobRecord> {
|
||||
return job;
|
||||
}
|
||||
|
||||
export function jobWithTail(job: JobRecord, maxBytes = 12000): JobRecord & { stdoutTail: string; stderrTail: string } {
|
||||
return { ...job, stdoutTail: tailFile(job.stdoutFile, maxBytes), stderrTail: tailFile(job.stderrFile, maxBytes) };
|
||||
export function jobWithTail(job: JobRecord, maxBytes = 12000): JobRecord & {
|
||||
tailPolicy: {
|
||||
requestedTailBytes: number;
|
||||
stdoutBytes: number;
|
||||
stderrBytes: number;
|
||||
stdoutTruncated: boolean;
|
||||
stderrTruncated: boolean;
|
||||
fullLogPaths: { stdoutFile: string; stderrFile: string };
|
||||
};
|
||||
stdoutTail: string;
|
||||
stderrTail: string;
|
||||
} {
|
||||
const stdoutBytes = existsSync(job.stdoutFile) ? statSync(job.stdoutFile).size : 0;
|
||||
const stderrBytes = existsSync(job.stderrFile) ? statSync(job.stderrFile).size : 0;
|
||||
return {
|
||||
...job,
|
||||
tailPolicy: {
|
||||
requestedTailBytes: maxBytes,
|
||||
stdoutBytes,
|
||||
stderrBytes,
|
||||
stdoutTruncated: stdoutBytes > maxBytes,
|
||||
stderrTruncated: stderrBytes > maxBytes,
|
||||
fullLogPaths: { stdoutFile: job.stdoutFile, stderrFile: job.stderrFile },
|
||||
},
|
||||
stdoutTail: tailFile(job.stdoutFile, maxBytes),
|
||||
stderrTail: tailFile(job.stderrFile, maxBytes),
|
||||
};
|
||||
}
|
||||
|
||||
export interface JobListOptions {
|
||||
limit?: number;
|
||||
includeCommand?: boolean;
|
||||
}
|
||||
|
||||
export function listJobsSummary(options: JobListOptions = {}): unknown {
|
||||
const limit = Math.max(1, Math.floor(options.limit ?? 50));
|
||||
const jobs = listJobs();
|
||||
const returned = jobs.slice(0, limit).map((job) => ({
|
||||
id: job.id,
|
||||
name: job.name,
|
||||
status: job.status,
|
||||
runner: job.runner,
|
||||
runnerPid: job.runnerPid ?? null,
|
||||
runnerContainer: job.runnerContainer ?? null,
|
||||
createdAt: job.createdAt,
|
||||
startedAt: job.startedAt,
|
||||
finishedAt: job.finishedAt,
|
||||
exitCode: job.exitCode,
|
||||
note: job.note,
|
||||
stdoutFile: job.stdoutFile,
|
||||
stderrFile: job.stderrFile,
|
||||
...(options.includeCommand === true ? { command: job.command, cwd: job.cwd } : {}),
|
||||
}));
|
||||
return {
|
||||
jobs: returned,
|
||||
total: jobs.length,
|
||||
returned: returned.length,
|
||||
limit,
|
||||
truncated: jobs.length > returned.length,
|
||||
disclosure: {
|
||||
defaultLimit: 50,
|
||||
nextCommand: jobs.length > returned.length ? `bun scripts/cli.ts job list --limit ${Math.min(jobs.length, limit * 2)}` : null,
|
||||
includeCommandCommand: "bun scripts/cli.ts job list --include-command",
|
||||
statusCommand: "bun scripts/cli.ts job status <jobId> --tail-bytes 12000",
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@@ -3,18 +3,51 @@ import { runCommand } from "./command";
|
||||
import { type UniDeskConfig, repoRoot } from "./config";
|
||||
import { jsonByteLength, previewJson } from "./preview";
|
||||
|
||||
export function coreInternalFetch(path: string, init?: { method?: string; body?: unknown }): unknown {
|
||||
export function coreInternalFetch(path: string, init?: { method?: string; body?: unknown; maxResponseBytes?: number }): unknown {
|
||||
if (!path.startsWith("/")) throw new Error("core internal path must start with /");
|
||||
const maxResponseBytes = Math.max(1024, Math.floor(init?.maxResponseBytes ?? 5_000_000));
|
||||
const code = `
|
||||
const res = await fetch(${JSON.stringify(`http://127.0.0.1:8080${path}`)}, ${JSON.stringify({
|
||||
method: init?.method ?? "GET",
|
||||
headers: init?.body === undefined ? undefined : { "content-type": "application/json" },
|
||||
body: init?.body === undefined ? undefined : JSON.stringify(init.body),
|
||||
})});
|
||||
const text = await res.text();
|
||||
const maxResponseBytes = ${JSON.stringify(maxResponseBytes)};
|
||||
const reader = res.body?.getReader();
|
||||
const chunks = [];
|
||||
let bytes = 0;
|
||||
let responseTruncated = false;
|
||||
if (reader) {
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
if (bytes + value.byteLength > maxResponseBytes) {
|
||||
const keep = Math.max(0, maxResponseBytes - bytes);
|
||||
if (keep > 0) {
|
||||
chunks.push(value.slice(0, keep));
|
||||
bytes += keep;
|
||||
}
|
||||
responseTruncated = true;
|
||||
try { await reader.cancel(); } catch {}
|
||||
break;
|
||||
}
|
||||
chunks.push(value);
|
||||
bytes += value.byteLength;
|
||||
}
|
||||
}
|
||||
const buffer = new Uint8Array(bytes);
|
||||
let offset = 0;
|
||||
for (const chunk of chunks) {
|
||||
buffer.set(chunk, offset);
|
||||
offset += chunk.byteLength;
|
||||
}
|
||||
const text = new TextDecoder().decode(buffer);
|
||||
let body = null;
|
||||
try { body = text ? JSON.parse(text) : null; } catch { body = { text }; }
|
||||
console.log(JSON.stringify({ ok: res.ok, status: res.status, body }));
|
||||
try { body = text && !responseTruncated ? JSON.parse(text) : null; } catch { body = { text }; }
|
||||
if (responseTruncated) {
|
||||
body = { _unideskResponseTruncated: true, maxResponseBytes, bytesRead: bytes, contentLength: res.headers.get("content-length"), textPreview: text };
|
||||
}
|
||||
console.log(JSON.stringify({ ok: res.ok, status: res.status, responseTruncated, responseBytesRead: bytes, responseContentLength: res.headers.get("content-length"), body }));
|
||||
`;
|
||||
const result = runCommand(["docker", "exec", "unidesk-backend-core", "bun", "-e", code], repoRoot);
|
||||
if (result.exitCode !== 0) {
|
||||
@@ -51,6 +84,11 @@ function numberOption(args: string[], name: string, defaultValue: number): numbe
|
||||
return value;
|
||||
}
|
||||
|
||||
function cappedNumberOption(args: string[], name: string, defaultValue: number, maxValue: number): number {
|
||||
const value = numberOption(args, name, defaultValue);
|
||||
return Math.min(value, maxValue);
|
||||
}
|
||||
|
||||
function stringOption(args: string[], name: string): string | undefined {
|
||||
const index = args.indexOf(name);
|
||||
if (index === -1) return undefined;
|
||||
@@ -95,13 +133,34 @@ function methodOption(args: string[], hasBody = false): string {
|
||||
}
|
||||
|
||||
export function summarizeMicroserviceProxyResponse(response: unknown, args: string[]): unknown {
|
||||
if (args.includes("--raw")) return response;
|
||||
const maxBodyBytes = numberOption(args, "--max-body-bytes", 60_000);
|
||||
const full = args.includes("--full");
|
||||
const raw = args.includes("--raw");
|
||||
const maxBodyBytes = full ? numberOption(args, "--max-body-bytes", 5_000_000) : cappedNumberOption(args, "--max-body-bytes", raw ? 120_000 : 60_000, 500_000);
|
||||
if (typeof response !== "object" || response === null || Array.isArray(response)) return response;
|
||||
const record = response as Record<string, unknown>;
|
||||
if (!("body" in record)) return response;
|
||||
if (record.responseTruncated === true) {
|
||||
return {
|
||||
...record,
|
||||
bodyOmitted: true,
|
||||
bodyMaxBytes: maxBodyBytes,
|
||||
rawHint: "The upstream response exceeded the CLI collection cap before JSON parsing; re-run with --raw --full and a specific --max-body-bytes only when the full body is required.",
|
||||
};
|
||||
}
|
||||
const bodyBytes = jsonByteLength(record.body);
|
||||
if (bodyBytes <= maxBodyBytes) return response;
|
||||
if (bodyBytes <= maxBodyBytes) {
|
||||
if (!raw || full) return response;
|
||||
return {
|
||||
...record,
|
||||
outputPolicy: {
|
||||
rawRequested: true,
|
||||
bounded: true,
|
||||
maxBodyBytes,
|
||||
bodyBytes,
|
||||
fullCommand: "Re-run with --raw --full to allow the complete body.",
|
||||
},
|
||||
};
|
||||
}
|
||||
const rest = { ...record };
|
||||
delete rest.body;
|
||||
return {
|
||||
@@ -110,7 +169,9 @@ export function summarizeMicroserviceProxyResponse(response: unknown, args: stri
|
||||
bodyBytes,
|
||||
bodyMaxBytes: maxBodyBytes,
|
||||
bodyPreview: previewJson(record.body, { maxDepth: 3, maxArrayItems: 3, maxObjectKeys: 16, maxStringLength: 320 }),
|
||||
rawHint: "Re-run with --raw for the full body, or add/tighten __unideskArrayLimit=<path>:<limit> in the proxied path.",
|
||||
rawHint: raw && !full
|
||||
? "The --raw response exceeded the default hard limit; re-run with --raw --full for the complete body, or add/tighten __unideskArrayLimit=<path>:<limit> in the proxied path."
|
||||
: "Re-run with --raw --full for the complete body, or add/tighten __unideskArrayLimit=<path>:<limit> in the proxied path.",
|
||||
};
|
||||
}
|
||||
|
||||
@@ -137,7 +198,11 @@ export async function runMicroserviceCommand(_config: UniDeskConfig, args: strin
|
||||
const id = requireId(idArg, "microservice proxy");
|
||||
const path = requireProxyPath(pathArg);
|
||||
const body = requestBodyOption(args);
|
||||
return summarizeMicroserviceProxyResponse(coreInternalFetch(`/api/microservices/${encodeId(id)}/proxy${path}`, { method: methodOption(args, body !== undefined), body }), args);
|
||||
const full = hasFlag(args, "--full");
|
||||
const raw = hasFlag(args, "--raw");
|
||||
const maxBodyBytes = full ? numberOption(args, "--max-body-bytes", 5_000_000) : cappedNumberOption(args, "--max-body-bytes", raw ? 120_000 : 60_000, 500_000);
|
||||
const maxResponseBytes = full ? Math.min(Math.max(maxBodyBytes, 120_000), 5_000_000) : Math.min(Math.max(maxBodyBytes * 3, 240_000), 1_500_000);
|
||||
return summarizeMicroserviceProxyResponse(coreInternalFetch(`/api/microservices/${encodeId(id)}/proxy${path}`, { method: methodOption(args, body !== undefined), body, maxResponseBytes }), args);
|
||||
}
|
||||
throw new Error("microservice command must be one of: list, status, health, diagnostics, tunnel-self-test, proxy");
|
||||
}
|
||||
|
||||
+15
-4
@@ -5,6 +5,20 @@ export interface JsonEnvelope<T> {
|
||||
error?: unknown;
|
||||
}
|
||||
|
||||
function isEpipe(error: unknown): boolean {
|
||||
return typeof error === "object" && error !== null && "code" in error && (error as { code?: unknown }).code === "EPIPE";
|
||||
}
|
||||
|
||||
process.stdout.on("error", (error) => {
|
||||
if (isEpipe(error)) process.exit(0);
|
||||
throw error;
|
||||
});
|
||||
|
||||
process.stderr.on("error", (error) => {
|
||||
if (isEpipe(error)) process.exit(0);
|
||||
throw error;
|
||||
});
|
||||
|
||||
export function emitJson<T>(command: string, data: T, ok = true): void {
|
||||
const envelope: JsonEnvelope<T> = { ok, command, data };
|
||||
safeStdoutWrite(`${JSON.stringify(envelope, null, 2)}\n`);
|
||||
@@ -22,10 +36,7 @@ function safeStdoutWrite(text: string): void {
|
||||
try {
|
||||
process.stdout.write(text);
|
||||
} catch (error) {
|
||||
if (typeof error === "object" && error !== null && "code" in error && (error as { code?: unknown }).code === "EPIPE") {
|
||||
process.exitCode = 0;
|
||||
return;
|
||||
}
|
||||
if (isEpipe(error)) process.exit(0);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
+53
-7
@@ -27,6 +27,9 @@ interface FetchJsonResult {
|
||||
status?: number;
|
||||
body?: unknown;
|
||||
error?: string;
|
||||
responseTruncated?: boolean;
|
||||
responseBytesRead?: number;
|
||||
responseContentLength?: string | null;
|
||||
}
|
||||
|
||||
const hostOptions = new Set(["--main-server-ip", "--main-server", "--server"]);
|
||||
@@ -172,19 +175,54 @@ function frontendBaseUrl(host: string, config: UniDeskConfig): string {
|
||||
return `http://${host}:${config.network.frontend.port}`;
|
||||
}
|
||||
|
||||
async function readJson(url: string, init?: RequestInit, timeoutMs = 8000): Promise<FetchJsonResult> {
|
||||
async function readJson(url: string, init?: RequestInit, timeoutMs = 8000, maxResponseBytes = 5_000_000): Promise<FetchJsonResult> {
|
||||
const controller = new AbortController();
|
||||
const timer = setTimeout(() => controller.abort(), timeoutMs);
|
||||
try {
|
||||
const res = await fetch(url, { ...init, signal: controller.signal });
|
||||
const text = await res.text();
|
||||
const reader = res.body?.getReader();
|
||||
const chunks: Uint8Array[] = [];
|
||||
let bytes = 0;
|
||||
let responseTruncated = false;
|
||||
if (reader !== undefined) {
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
if (bytes + value.byteLength > maxResponseBytes) {
|
||||
const keep = Math.max(0, maxResponseBytes - bytes);
|
||||
if (keep > 0) {
|
||||
chunks.push(value.slice(0, keep));
|
||||
bytes += keep;
|
||||
}
|
||||
responseTruncated = true;
|
||||
try {
|
||||
await reader.cancel();
|
||||
} catch {
|
||||
// Ignore cancel failures after the bounded preview has been collected.
|
||||
}
|
||||
break;
|
||||
}
|
||||
chunks.push(value);
|
||||
bytes += value.byteLength;
|
||||
}
|
||||
}
|
||||
const buffer = new Uint8Array(bytes);
|
||||
let offset = 0;
|
||||
for (const chunk of chunks) {
|
||||
buffer.set(chunk, offset);
|
||||
offset += chunk.byteLength;
|
||||
}
|
||||
const text = new TextDecoder().decode(buffer);
|
||||
let body: unknown = null;
|
||||
try {
|
||||
body = text.length > 0 ? JSON.parse(text) : null;
|
||||
body = text.length > 0 && !responseTruncated ? JSON.parse(text) : null;
|
||||
} catch {
|
||||
body = { text };
|
||||
}
|
||||
return { ok: res.ok, status: res.status, body };
|
||||
if (responseTruncated) {
|
||||
body = { _unideskResponseTruncated: true, maxResponseBytes, bytesRead: bytes, contentLength: res.headers.get("content-length"), textPreview: text };
|
||||
}
|
||||
return { ok: res.ok, status: res.status, body, responseTruncated, responseBytesRead: bytes, responseContentLength: res.headers.get("content-length") };
|
||||
} catch (error) {
|
||||
return { ok: false, error: error instanceof Error ? error.message : String(error) };
|
||||
} finally {
|
||||
@@ -208,11 +246,11 @@ async function loginFrontend(host: string, config: UniDeskConfig): Promise<Front
|
||||
return { baseUrl, cookie };
|
||||
}
|
||||
|
||||
async function frontendJson(session: FrontendSession, path: string, init?: RequestInit, timeoutMs = 8000): Promise<FetchJsonResult> {
|
||||
async function frontendJson(session: FrontendSession, path: string, init?: RequestInit, timeoutMs = 8000, maxResponseBytes = 5_000_000): Promise<FetchJsonResult> {
|
||||
const headers = new Headers(init?.headers);
|
||||
headers.set("cookie", session.cookie);
|
||||
if (init?.body !== undefined && !headers.has("content-type")) headers.set("content-type", "application/json");
|
||||
return readJson(`${session.baseUrl}${path}`, { ...init, headers }, timeoutMs);
|
||||
return readJson(`${session.baseUrl}${path}`, { ...init, headers }, timeoutMs, maxResponseBytes);
|
||||
}
|
||||
|
||||
function stringOption(args: string[], name: string): string | undefined {
|
||||
@@ -231,6 +269,10 @@ function numberOption(args: string[], name: string, defaultValue: number): numbe
|
||||
return value;
|
||||
}
|
||||
|
||||
function cappedNumberOption(args: string[], name: string, defaultValue: number, maxValue: number): number {
|
||||
return Math.min(numberOption(args, name, defaultValue), maxValue);
|
||||
}
|
||||
|
||||
function jsonOption(args: string[], name: string): Record<string, unknown> | undefined {
|
||||
const raw = stringOption(args, name);
|
||||
if (raw === undefined) return undefined;
|
||||
@@ -462,7 +504,11 @@ async function remoteMicroservice(session: FrontendSession, args: string[]): Pro
|
||||
};
|
||||
}
|
||||
if (action === "proxy" && id !== undefined && path !== undefined && path.startsWith("/")) {
|
||||
const response = await frontendJson(session, `/api/microservices/${encodeURIComponent(id)}/proxy${path}`, undefined, 24_000);
|
||||
const full = args.includes("--full");
|
||||
const raw = args.includes("--raw");
|
||||
const maxBodyBytes = full ? numberOption(args, "--max-body-bytes", 5_000_000) : cappedNumberOption(args, "--max-body-bytes", raw ? 120_000 : 60_000, 500_000);
|
||||
const maxResponseBytes = full ? Math.min(Math.max(maxBodyBytes, 120_000), 5_000_000) : Math.min(Math.max(maxBodyBytes * 3, 240_000), 1_500_000);
|
||||
const response = await frontendJson(session, `/api/microservices/${encodeURIComponent(id)}/proxy${path}`, undefined, 24_000, maxResponseBytes);
|
||||
return {
|
||||
transport: "frontend",
|
||||
response: summarizeMicroserviceProxyResponse(response, args),
|
||||
|
||||
@@ -0,0 +1,303 @@
|
||||
import { accessSync, constants, existsSync, readFileSync, statSync } from "node:fs";
|
||||
import { runCommand } from "./command";
|
||||
import { repoRoot } from "./config";
|
||||
|
||||
const defaultSwapPath = "/swapfile";
|
||||
const defaultSwapSizeBytes = 2 * 1024 * 1024 * 1024;
|
||||
|
||||
export interface SwapArea {
|
||||
filename: string;
|
||||
type: string;
|
||||
sizeBytes: number;
|
||||
usedBytes: number;
|
||||
priority: number | null;
|
||||
}
|
||||
|
||||
export interface SwapMemoryStatus {
|
||||
totalBytes: number;
|
||||
availableBytes: number | null;
|
||||
swapTotalBytes: number;
|
||||
swapFreeBytes: number;
|
||||
}
|
||||
|
||||
export interface SwapStatus {
|
||||
memory: SwapMemoryStatus;
|
||||
activeSwaps: SwapArea[];
|
||||
configuredPath: string;
|
||||
configuredPathExists: boolean;
|
||||
configuredPathMode: string | null;
|
||||
configuredPathSizeBytes: number | null;
|
||||
configuredPathActive: boolean;
|
||||
fstab: {
|
||||
path: string;
|
||||
writable: boolean;
|
||||
persisted: boolean;
|
||||
matchingLine: string | null;
|
||||
error: string | null;
|
||||
};
|
||||
warning: string | null;
|
||||
}
|
||||
|
||||
export interface SwapEnsureResult {
|
||||
ok: boolean;
|
||||
status: "ok" | "degraded" | "failed";
|
||||
requested: {
|
||||
path: string;
|
||||
sizeBytes: number;
|
||||
};
|
||||
before: SwapStatus;
|
||||
after: SwapStatus;
|
||||
actions: Array<{ action: string; ok: boolean; detail?: unknown }>;
|
||||
errors: Array<{ action: string; message: string; detail?: unknown }>;
|
||||
}
|
||||
|
||||
function shellQuote(value: string): string {
|
||||
return `'${value.replace(/'/g, `'\\''`)}'`;
|
||||
}
|
||||
|
||||
function parseByteCount(value: string): number {
|
||||
const raw = value.trim();
|
||||
if (/^\d+$/u.test(raw)) return Number(raw);
|
||||
const match = raw.match(/^([0-9]+(?:\.[0-9]+)?)([KMGTPE]?i?B?)$/iu);
|
||||
if (!match) return 0;
|
||||
const amount = Number(match[1]);
|
||||
const unit = match[2].toUpperCase();
|
||||
const powers: Record<string, number> = {
|
||||
K: 1,
|
||||
KB: 1,
|
||||
KIB: 1,
|
||||
M: 2,
|
||||
MB: 2,
|
||||
MIB: 2,
|
||||
G: 3,
|
||||
GB: 3,
|
||||
GIB: 3,
|
||||
T: 4,
|
||||
TB: 4,
|
||||
TIB: 4,
|
||||
P: 5,
|
||||
PB: 5,
|
||||
PIB: 5,
|
||||
E: 6,
|
||||
EB: 6,
|
||||
EIB: 6,
|
||||
};
|
||||
return Math.round(amount * (1024 ** (powers[unit] ?? 0)));
|
||||
}
|
||||
|
||||
function parseMeminfo(): SwapMemoryStatus {
|
||||
const raw = readFileSync("/proc/meminfo", "utf8");
|
||||
const values = new Map<string, number>();
|
||||
for (const line of raw.split("\n")) {
|
||||
const match = line.match(/^([^:]+):\s+(\d+)\s+kB/u);
|
||||
if (match) values.set(match[1], Number(match[2]) * 1024);
|
||||
}
|
||||
return {
|
||||
totalBytes: values.get("MemTotal") ?? 0,
|
||||
availableBytes: values.get("MemAvailable") ?? null,
|
||||
swapTotalBytes: values.get("SwapTotal") ?? 0,
|
||||
swapFreeBytes: values.get("SwapFree") ?? 0,
|
||||
};
|
||||
}
|
||||
|
||||
function parseSwaps(): SwapArea[] {
|
||||
if (!existsSync("/proc/swaps")) return [];
|
||||
const lines = readFileSync("/proc/swaps", "utf8").trim().split("\n").slice(1);
|
||||
return lines.map((line) => line.trim().split(/\s+/u)).filter((parts) => parts.length >= 5).map(([filename, type, sizeKiB, usedKiB, priority]) => ({
|
||||
filename,
|
||||
type,
|
||||
sizeBytes: Number(sizeKiB) * 1024,
|
||||
usedBytes: Number(usedKiB) * 1024,
|
||||
priority: Number.isFinite(Number(priority)) ? Number(priority) : null,
|
||||
}));
|
||||
}
|
||||
|
||||
function fileMode(path: string): string | null {
|
||||
if (!existsSync(path)) return null;
|
||||
return (statSync(path).mode & 0o777).toString(8).padStart(3, "0");
|
||||
}
|
||||
|
||||
function fstabStatus(path: string): SwapStatus["fstab"] {
|
||||
const fstabPath = "/etc/fstab";
|
||||
try {
|
||||
const raw = existsSync(fstabPath) ? readFileSync(fstabPath, "utf8") : "";
|
||||
let writable = false;
|
||||
try {
|
||||
accessSync(fstabPath, constants.W_OK);
|
||||
writable = true;
|
||||
} catch {
|
||||
writable = false;
|
||||
}
|
||||
const matchingLine = raw.split("\n").find((line) => {
|
||||
const trimmed = line.trim();
|
||||
if (trimmed.length === 0 || trimmed.startsWith("#")) return false;
|
||||
const parts = trimmed.split(/\s+/u);
|
||||
return parts[0] === path && parts[2] === "swap";
|
||||
}) ?? null;
|
||||
return {
|
||||
path: fstabPath,
|
||||
writable,
|
||||
persisted: matchingLine !== null,
|
||||
matchingLine,
|
||||
error: null,
|
||||
};
|
||||
} catch (error) {
|
||||
return {
|
||||
path: fstabPath,
|
||||
writable: false,
|
||||
persisted: false,
|
||||
matchingLine: null,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
export function swapStatus(path = defaultSwapPath): SwapStatus {
|
||||
const memory = parseMeminfo();
|
||||
const activeSwaps = parseSwaps();
|
||||
const configuredPathExists = existsSync(path);
|
||||
const configuredPathSizeBytes = configuredPathExists ? statSync(path).size : null;
|
||||
const configuredPathActive = activeSwaps.some((swap) => swap.filename === path);
|
||||
const warning = memory.swapTotalBytes > 0 ? null : "swap is not active; low-memory main servers are at risk of global OOM during builds or diagnostics";
|
||||
return {
|
||||
memory,
|
||||
activeSwaps,
|
||||
configuredPath: path,
|
||||
configuredPathExists,
|
||||
configuredPathMode: fileMode(path),
|
||||
configuredPathSizeBytes,
|
||||
configuredPathActive,
|
||||
fstab: fstabStatus(path),
|
||||
warning,
|
||||
};
|
||||
}
|
||||
|
||||
function pushAction(
|
||||
actions: SwapEnsureResult["actions"],
|
||||
errors: SwapEnsureResult["errors"],
|
||||
action: string,
|
||||
command: string[],
|
||||
): boolean {
|
||||
const result = runCommand(command, repoRoot, { timeoutMs: 120_000 });
|
||||
const ok = result.exitCode === 0;
|
||||
const detail = {
|
||||
command,
|
||||
exitCode: result.exitCode,
|
||||
stdoutTail: result.stdout.slice(-1200),
|
||||
stderrTail: result.stderr.slice(-1200),
|
||||
timedOut: result.timedOut,
|
||||
};
|
||||
actions.push({ action, ok, detail });
|
||||
if (!ok) {
|
||||
errors.push({
|
||||
action,
|
||||
message: result.stderr.trim() || result.stdout.trim() || `command failed with exit code ${result.exitCode}`,
|
||||
detail,
|
||||
});
|
||||
}
|
||||
return ok;
|
||||
}
|
||||
|
||||
function ensureFstabLine(path: string): { ok: boolean; action: string; detail: unknown } {
|
||||
const line = `${path} none swap sw 0 0`;
|
||||
const script = [
|
||||
"set -euo pipefail",
|
||||
"touch /etc/fstab",
|
||||
`grep -Eq '^${path.replace(/[.*+?^${}()|[\]\\]/g, "\\$&")}[[:space:]]+[^[:space:]]+[[:space:]]+swap[[:space:]]' /etc/fstab || printf '%s\\n' ${shellQuote(line)} >> /etc/fstab`,
|
||||
].join("\n");
|
||||
const result = runCommand(["bash", "-lc", script], repoRoot, { timeoutMs: 30_000 });
|
||||
return {
|
||||
ok: result.exitCode === 0,
|
||||
action: "persist-fstab",
|
||||
detail: {
|
||||
command: ["bash", "-lc", script],
|
||||
exitCode: result.exitCode,
|
||||
stdoutTail: result.stdout.slice(-1200),
|
||||
stderrTail: result.stderr.slice(-1200),
|
||||
timedOut: result.timedOut,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function parseSizeOption(args: string[], defaultBytes: number): number {
|
||||
const index = args.indexOf("--size");
|
||||
const raw = index === -1 ? undefined : args[index + 1];
|
||||
if (raw === undefined) return defaultBytes;
|
||||
const bytes = parseByteCount(raw);
|
||||
if (!Number.isFinite(bytes) || bytes <= 0) throw new Error("--size must be a positive byte count such as 2GiB or 4096M");
|
||||
return bytes;
|
||||
}
|
||||
|
||||
function parsePathOption(args: string[], defaultPath: string): string {
|
||||
const index = args.indexOf("--path");
|
||||
if (index === -1) return defaultPath;
|
||||
const raw = args[index + 1];
|
||||
if (raw === undefined || !raw.startsWith("/")) throw new Error("--path must be an absolute path");
|
||||
return raw;
|
||||
}
|
||||
|
||||
function hasFlag(args: string[], name: string): boolean {
|
||||
return args.includes(name);
|
||||
}
|
||||
|
||||
export function runSwapCommand(args: string[]): unknown {
|
||||
const [action = "status"] = args;
|
||||
const path = parsePathOption(args, defaultSwapPath);
|
||||
if (action === "status") return swapStatus(path);
|
||||
if (action === "ensure") {
|
||||
const sizeBytes = parseSizeOption(args, defaultSwapSizeBytes);
|
||||
const dryRun = hasFlag(args, "--dry-run");
|
||||
const before = swapStatus(path);
|
||||
const actions: SwapEnsureResult["actions"] = [];
|
||||
const errors: SwapEnsureResult["errors"] = [];
|
||||
if (before.memory.swapTotalBytes > 0) {
|
||||
actions.push({ action: "noop-existing-swap", ok: true, detail: { activeSwaps: before.activeSwaps } });
|
||||
const after = swapStatus(path);
|
||||
return { ok: true, status: "ok", requested: { path, sizeBytes }, before, after, actions, errors } satisfies SwapEnsureResult;
|
||||
}
|
||||
if (dryRun) {
|
||||
actions.push({ action: "dry-run", ok: true, detail: { wouldCreate: path, sizeBytes, wouldPersistFstab: true } });
|
||||
const after = swapStatus(path);
|
||||
return { ok: true, status: "degraded", requested: { path, sizeBytes }, before, after, actions, errors } satisfies SwapEnsureResult;
|
||||
}
|
||||
if (!existsSync(path)) {
|
||||
const sizeMiB = Math.ceil(sizeBytes / 1024 / 1024);
|
||||
const allocated = pushAction(actions, errors, "allocate-swapfile", ["fallocate", "-l", `${sizeMiB}M`, path]);
|
||||
if (!allocated) pushAction(actions, errors, "allocate-swapfile-dd-fallback", ["dd", "if=/dev/zero", `of=${path}`, "bs=1M", `count=${sizeMiB}`, "status=none"]);
|
||||
} else {
|
||||
const existingBytes = statSync(path).size;
|
||||
if (existingBytes < sizeBytes) {
|
||||
const sizeMiB = Math.ceil(sizeBytes / 1024 / 1024);
|
||||
const resized = pushAction(actions, errors, "resize-existing-swapfile", ["fallocate", "-l", `${sizeMiB}M`, path]);
|
||||
if (!resized) pushAction(actions, errors, "resize-existing-swapfile-dd-fallback", ["dd", "if=/dev/zero", `of=${path}`, "bs=1M", `count=${sizeMiB}`, "status=none"]);
|
||||
} else {
|
||||
actions.push({ action: "reuse-existing-swapfile-path", ok: true, detail: { path, sizeBytes: existingBytes } });
|
||||
}
|
||||
}
|
||||
pushAction(actions, errors, "chmod-600", ["chmod", "600", path]);
|
||||
pushAction(actions, errors, "mkswap", ["mkswap", path]);
|
||||
pushAction(actions, errors, "swapon", ["swapon", path]);
|
||||
const persist = ensureFstabLine(path);
|
||||
actions.push({ action: persist.action, ok: persist.ok, detail: persist.detail });
|
||||
if (!persist.ok) {
|
||||
errors.push({
|
||||
action: persist.action,
|
||||
message: "swap is active but /etc/fstab could not be updated; rerun ensure as root or add the returned fstab line manually",
|
||||
detail: persist.detail,
|
||||
});
|
||||
}
|
||||
const after = swapStatus(path);
|
||||
const swapActive = after.memory.swapTotalBytes > 0;
|
||||
const status = swapActive && after.fstab.persisted ? "ok" : swapActive ? "degraded" : "failed";
|
||||
return {
|
||||
ok: status !== "failed",
|
||||
status,
|
||||
requested: { path, sizeBytes },
|
||||
before,
|
||||
after,
|
||||
actions,
|
||||
errors,
|
||||
} satisfies SwapEnsureResult;
|
||||
}
|
||||
throw new Error("server swap command must be one of: status, ensure");
|
||||
}
|
||||
@@ -23,6 +23,9 @@ function isValidEgressPort(port: number): boolean {
|
||||
return Number.isInteger(port) && port > 0 && port <= 65_535;
|
||||
}
|
||||
|
||||
const egressTcpConnectTimeoutMs = 15_000;
|
||||
const egressTcpIdleTimeoutMs = 600_000;
|
||||
|
||||
function sendEgressClose(provider: ProviderSocket, connectionId: string, error?: string): void {
|
||||
const message: CoreEgressTcpCloseMessage = error === undefined
|
||||
? { type: "egress_tcp_close", connectionId }
|
||||
@@ -30,13 +33,31 @@ function sendEgressClose(provider: ProviderSocket, connectionId: string, error?:
|
||||
wsSendJson(provider, message);
|
||||
}
|
||||
|
||||
function clearConnectionTimers(connection: EgressTcpConnection): void {
|
||||
if (connection.connectTimer !== null) clearTimeout(connection.connectTimer);
|
||||
if (connection.idleTimer !== null) clearTimeout(connection.idleTimer);
|
||||
connection.connectTimer = null;
|
||||
connection.idleTimer = null;
|
||||
}
|
||||
|
||||
function closeEgressTcpConnection(providerId: string, connectionId: string, error?: string): void {
|
||||
const key = egressTcpKey(providerId, connectionId);
|
||||
const connection = ctx.activeEgressTcpConnections.get(key);
|
||||
if (connection === undefined) return;
|
||||
ctx.activeEgressTcpConnections.delete(key);
|
||||
clearConnectionTimers(connection);
|
||||
connection.socket.destroy();
|
||||
if (error !== undefined) sendEgressClose(connection.provider, connectionId, error);
|
||||
if (error !== undefined) {
|
||||
logger("warn", "egress_tcp_connection_closed", { providerId, connectionId, error });
|
||||
sendEgressClose(connection.provider, connectionId, error);
|
||||
}
|
||||
}
|
||||
|
||||
function refreshConnectionIdle(connection: EgressTcpConnection): void {
|
||||
if (connection.idleTimer !== null) clearTimeout(connection.idleTimer);
|
||||
connection.idleTimer = setTimeout(() => {
|
||||
closeEgressTcpConnection(connection.providerId, connection.connectionId, "egress tcp idle timeout");
|
||||
}, egressTcpIdleTimeoutMs);
|
||||
}
|
||||
|
||||
export function handleEgressTcpOpen(ws: ProviderSocket, message: ProviderEgressTcpOpenMessage): void {
|
||||
@@ -49,13 +70,32 @@ export function handleEgressTcpOpen(ws: ProviderSocket, message: ProviderEgressT
|
||||
const key = egressTcpKey(message.providerId, message.connectionId);
|
||||
closeEgressTcpConnection(message.providerId, message.connectionId);
|
||||
const socket = connectTcp({ host, port });
|
||||
const connection: EgressTcpConnection = { providerId: message.providerId, connectionId: message.connectionId, socket, provider: ws };
|
||||
const connection: EgressTcpConnection = {
|
||||
providerId: message.providerId,
|
||||
connectionId: message.connectionId,
|
||||
socket,
|
||||
provider: ws,
|
||||
connectTimer: null,
|
||||
idleTimer: null,
|
||||
};
|
||||
ctx.activeEgressTcpConnections.set(key, connection);
|
||||
connection.connectTimer = setTimeout(() => {
|
||||
closeEgressTcpConnection(message.providerId, message.connectionId, "egress tcp connect timeout");
|
||||
}, egressTcpConnectTimeoutMs);
|
||||
refreshConnectionIdle(connection);
|
||||
socket.on("connect", () => {
|
||||
if (ctx.activeEgressTcpConnections.get(key) !== connection) return;
|
||||
if (connection.connectTimer !== null) {
|
||||
clearTimeout(connection.connectTimer);
|
||||
connection.connectTimer = null;
|
||||
}
|
||||
refreshConnectionIdle(connection);
|
||||
const opened: CoreEgressTcpOpenedMessage = { type: "egress_tcp_opened", connectionId: message.connectionId };
|
||||
wsSendJson(ws, opened);
|
||||
});
|
||||
socket.on("data", (chunk) => {
|
||||
if (ctx.activeEgressTcpConnections.get(key) !== connection) return;
|
||||
refreshConnectionIdle(connection);
|
||||
const data: CoreEgressTcpDataMessage = {
|
||||
type: "egress_tcp_data",
|
||||
connectionId: message.connectionId,
|
||||
@@ -67,11 +107,13 @@ export function handleEgressTcpOpen(ws: ProviderSocket, message: ProviderEgressT
|
||||
socket.on("close", () => {
|
||||
if (ctx.activeEgressTcpConnections.get(key) !== connection) return;
|
||||
ctx.activeEgressTcpConnections.delete(key);
|
||||
clearConnectionTimers(connection);
|
||||
sendEgressClose(ws, message.connectionId);
|
||||
});
|
||||
socket.on("error", (error) => {
|
||||
if (ctx.activeEgressTcpConnections.get(key) !== connection) return;
|
||||
ctx.activeEgressTcpConnections.delete(key);
|
||||
clearConnectionTimers(connection);
|
||||
sendEgressClose(ws, message.connectionId, error.message);
|
||||
});
|
||||
}
|
||||
@@ -79,6 +121,7 @@ export function handleEgressTcpOpen(ws: ProviderSocket, message: ProviderEgressT
|
||||
export function handleEgressTcpData(message: ProviderEgressTcpDataMessage): void {
|
||||
const connection = ctx.activeEgressTcpConnections.get(egressTcpKey(message.providerId, message.connectionId));
|
||||
if (connection === undefined) return;
|
||||
refreshConnectionIdle(connection);
|
||||
connection.socket.write(Buffer.from(message.data, message.encoding === "base64" ? "base64" : "utf8"));
|
||||
}
|
||||
|
||||
@@ -90,6 +133,16 @@ export function closeEgressTcpConnectionsForProvider(providerId: string): void {
|
||||
for (const [key, connection] of ctx.activeEgressTcpConnections) {
|
||||
if (connection.providerId !== providerId) continue;
|
||||
ctx.activeEgressTcpConnections.delete(key);
|
||||
clearConnectionTimers(connection);
|
||||
connection.socket.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
export function closeEgressTcpConnectionsForSocket(provider: ProviderSocket): void {
|
||||
for (const [key, connection] of ctx.activeEgressTcpConnections) {
|
||||
if (connection.provider !== provider) continue;
|
||||
ctx.activeEgressTcpConnections.delete(key);
|
||||
clearConnectionTimers(connection);
|
||||
connection.socket.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -381,6 +381,7 @@ function isK3sctlManagedMicroservice(service: MicroserviceConfig): boolean {
|
||||
function codeQueueK3sServiceIdForRequest(method: string, targetPath: string): string {
|
||||
const normalizedMethod = method.toUpperCase();
|
||||
if (targetPath === "/" || targetPath === "/health" || targetPath === "/live" || targetPath === "/api/dev-ready") return "code-queue-scheduler";
|
||||
if (targetPath === "/api/queues" || targetPath === "/api/tasks/overview") return "code-queue-scheduler";
|
||||
if (targetPath === "/api/oa/backfill" || targetPath === "/api/notifications/claudeqq/drain" || targetPath === "/api/notifications/claudeqq/backfill") return "code-queue-scheduler";
|
||||
if (targetPath === "/api/judge/probe" || targetPath === "/api/judge/self-test" || targetPath === "/api/queue-order/self-test" || targetPath === "/api/reference-injection/self-test" || targetPath === "/api/trace-port/self-test") return "code-queue-scheduler";
|
||||
if (/^\/api\/tasks\/[^/]+\/(?:steer|interrupt)$/u.test(targetPath)) return "code-queue-scheduler";
|
||||
@@ -778,7 +779,8 @@ async function microserviceTunnelSelfTestResponse(service: MicroserviceConfig):
|
||||
const bodyRecord = isPlainRecord(body) ? body : {};
|
||||
const hasRequestId = typeof bodyRecord.requestId === "string" && bodyRecord.requestId.length > 0;
|
||||
const hasStage = typeof bodyRecord.stage === "string" && bodyRecord.stage.length > 0;
|
||||
const ok = response.status === 502 && hasRequestId && hasStage && headers.requestId === bodyRecord.requestId;
|
||||
const expectedStatus = response.status === 502 || response.status === 504;
|
||||
const ok = expectedStatus && hasRequestId && hasStage && headers.requestId === bodyRecord.requestId;
|
||||
return jsonResponse({
|
||||
ok,
|
||||
serviceId: service.id,
|
||||
@@ -787,7 +789,7 @@ async function microserviceTunnelSelfTestResponse(service: MicroserviceConfig):
|
||||
expectedFailure: true,
|
||||
status: response.status,
|
||||
checks: {
|
||||
expectedStatus: response.status === 502,
|
||||
expectedStatus,
|
||||
bodyHasRequestId: hasRequestId,
|
||||
bodyHasStage: hasStage,
|
||||
headerHasRequestId: typeof headers.requestId === "string" && headers.requestId.length > 0,
|
||||
@@ -956,7 +958,7 @@ async function providerHttpTunnelMicroserviceResponse(
|
||||
const durationMs = Date.now() - startedAt;
|
||||
if (message === null) {
|
||||
attempts.push({ attempt, requestId, ok: false, reason: reason ?? "timeout", durationMs, timeoutMs });
|
||||
if (retryable && tunnelFailureRetryable(reason) && attempt < maxAttempts) {
|
||||
if (retryable && reason !== "timeout" && tunnelFailureRetryable(reason) && attempt < maxAttempts) {
|
||||
logger("warn", "http_tunnel_retry", {
|
||||
providerId: service.providerId,
|
||||
serviceId: service.id,
|
||||
|
||||
@@ -166,6 +166,8 @@ export interface EgressTcpConnection {
|
||||
connectionId: string;
|
||||
socket: Socket;
|
||||
provider: ProviderSocket;
|
||||
connectTimer: ReturnType<typeof setTimeout> | null;
|
||||
idleTimer: ReturnType<typeof setTimeout> | null;
|
||||
}
|
||||
|
||||
export type HttpTunnelFailureReason =
|
||||
|
||||
@@ -270,6 +270,37 @@ spec:
|
||||
shift
|
||||
curl -fsS --cacert "$kube_ca" -H "Authorization: Bearer $kube_token" -X "$method" "$@"
|
||||
}
|
||||
delete_if_exists() {
|
||||
local path="$1"
|
||||
local code
|
||||
code="$(curl -sS -o /tmp/unidesk-ci-delete-response -w "%{http_code}" --cacert "$kube_ca" -H "Authorization: Bearer $kube_token" -X DELETE "$kube_api/$path")"
|
||||
if [ "$code" = "200" ] || [ "$code" = "202" ] || [ "$code" = "404" ]; then
|
||||
return 0
|
||||
fi
|
||||
cat /tmp/unidesk-ci-delete-response >&2
|
||||
return 1
|
||||
}
|
||||
wait_deleted() {
|
||||
local path="$1"
|
||||
local deadline=$((SECONDS + 120))
|
||||
local code
|
||||
while [ "$SECONDS" -lt "$deadline" ]; do
|
||||
code="$(curl -sS -o /tmp/unidesk-ci-get-response -w "%{http_code}" --cacert "$kube_ca" -H "Authorization: Bearer $kube_token" "$kube_api/$path")"
|
||||
if [ "$code" = "404" ]; then
|
||||
return 0
|
||||
fi
|
||||
if [ "$code" != "200" ]; then
|
||||
cat /tmp/unidesk-ci-get-response >&2
|
||||
return 1
|
||||
fi
|
||||
sleep 2
|
||||
done
|
||||
echo "timeout waiting for $path deletion" >&2
|
||||
return 1
|
||||
}
|
||||
delete_if_exists "apis/apps/v1/namespaces/$kube_namespace/deployments/code-queue-ci-read"
|
||||
delete_if_exists "api/v1/namespaces/$kube_namespace/services/code-queue-ci-read"
|
||||
wait_deleted "apis/apps/v1/namespaces/$kube_namespace/deployments/code-queue-ci-read"
|
||||
cat >/tmp/code-queue-ci-read-deployment.yaml <<YAML
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
@@ -287,6 +318,8 @@ spec:
|
||||
matchLabels:
|
||||
app.kubernetes.io/name: code-queue
|
||||
app.kubernetes.io/component: ci-read
|
||||
strategy:
|
||||
type: Recreate
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
@@ -382,6 +415,13 @@ spec:
|
||||
periodSeconds: 5
|
||||
timeoutSeconds: 3
|
||||
failureThreshold: 20
|
||||
startupProbe:
|
||||
httpGet:
|
||||
path: /live
|
||||
port: http
|
||||
periodSeconds: 5
|
||||
timeoutSeconds: 3
|
||||
failureThreshold: 84
|
||||
livenessProbe:
|
||||
httpGet:
|
||||
path: /live
|
||||
@@ -396,11 +436,17 @@ spec:
|
||||
limits:
|
||||
memory: 1Gi
|
||||
volumeMounts:
|
||||
- name: source
|
||||
mountPath: /app
|
||||
- name: state
|
||||
mountPath: /var/lib/unidesk/code-queue-ci
|
||||
- name: logs
|
||||
mountPath: /var/log/unidesk
|
||||
volumes:
|
||||
- name: source
|
||||
hostPath:
|
||||
path: "$(workspaces.source.path)/repo"
|
||||
type: Directory
|
||||
- name: state
|
||||
emptyDir: {}
|
||||
- name: logs
|
||||
@@ -434,7 +480,7 @@ spec:
|
||||
-H "Content-Type: application/apply-patch+yaml" \
|
||||
--data-binary @/tmp/code-queue-ci-read-01 \
|
||||
"$kube_api/api/v1/namespaces/$kube_namespace/services/code-queue-ci-read?fieldManager=unidesk-ci&force=true" >/dev/null
|
||||
deadline=$((SECONDS + 180))
|
||||
deadline=$((SECONDS + 420))
|
||||
while [ "$SECONDS" -lt "$deadline" ]; do
|
||||
status="$(kube GET "$kube_api/apis/apps/v1/namespaces/$kube_namespace/deployments/code-queue-ci-read")"
|
||||
replicas="$(printf '%s' "$status" | jq -r '.spec.replicas // 1')"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@unidesk/provider-gateway",
|
||||
"version": "0.2.21",
|
||||
"version": "0.2.22",
|
||||
"private": true,
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
|
||||
@@ -29,7 +29,12 @@ interface Tunnel {
|
||||
method: string;
|
||||
opened: boolean;
|
||||
pending: Buffer[];
|
||||
pendingBytes: number;
|
||||
closed: boolean;
|
||||
createdAt: number;
|
||||
lastActivityAt: number;
|
||||
openTimer: ReturnType<typeof setTimeout> | null;
|
||||
idleTimer: ReturnType<typeof setTimeout> | null;
|
||||
}
|
||||
|
||||
export interface ProviderEgressProxyHandle {
|
||||
@@ -111,6 +116,10 @@ function proxyUrlFor(host: string, port: number): string {
|
||||
return `http://${host}:${port}`;
|
||||
}
|
||||
|
||||
const tunnelOpenTimeoutMs = 15_000;
|
||||
const tunnelIdleTimeoutMs = 600_000;
|
||||
const maxPendingBytes = 4 * 1024 * 1024;
|
||||
|
||||
export function startProviderEgressProxy(options: ProviderEgressProxyOptions): ProviderEgressProxyHandle {
|
||||
const proxyUrl = proxyUrlFor(options.listenHost, options.listenPort);
|
||||
const tunnels = new Map<string, Tunnel>();
|
||||
@@ -118,25 +127,73 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
|
||||
|
||||
const send = (message: EgressToCoreMessage): boolean => options.sendToCore(message);
|
||||
|
||||
const status = (): Record<string, JsonValue> => ({
|
||||
enabled: true,
|
||||
providerId: options.providerId,
|
||||
connected: options.isCoreConnected(),
|
||||
proxyUrl,
|
||||
listenHost: options.listenHost,
|
||||
listenPort: options.listenPort,
|
||||
activeTunnels: tunnels.size,
|
||||
channel: "provider-gateway",
|
||||
});
|
||||
const status = (): Record<string, JsonValue> => {
|
||||
const now = Date.now();
|
||||
const tunnelList = Array.from(tunnels.values());
|
||||
const ages = tunnelList.map((tunnel) => now - tunnel.createdAt);
|
||||
return {
|
||||
enabled: true,
|
||||
providerId: options.providerId,
|
||||
connected: options.isCoreConnected(),
|
||||
proxyUrl,
|
||||
listenHost: options.listenHost,
|
||||
listenPort: options.listenPort,
|
||||
activeTunnels: tunnels.size,
|
||||
pendingTunnels: tunnelList.filter((tunnel) => !tunnel.opened).length,
|
||||
oldestTunnelAgeMs: ages.length > 0 ? Math.max(...ages) : 0,
|
||||
openTimeoutMs: tunnelOpenTimeoutMs,
|
||||
idleTimeoutMs: tunnelIdleTimeoutMs,
|
||||
maxPendingBytes,
|
||||
channel: "provider-gateway",
|
||||
};
|
||||
};
|
||||
|
||||
const clearTunnelTimers = (tunnel: Tunnel): void => {
|
||||
if (tunnel.openTimer !== null) clearTimeout(tunnel.openTimer);
|
||||
if (tunnel.idleTimer !== null) clearTimeout(tunnel.idleTimer);
|
||||
tunnel.openTimer = null;
|
||||
tunnel.idleTimer = null;
|
||||
};
|
||||
|
||||
const destroyTunnel = (tunnel: Tunnel, notifyCore: boolean, error?: string): void => {
|
||||
tunnels.delete(tunnel.id);
|
||||
tunnel.closed = true;
|
||||
clearTunnelTimers(tunnel);
|
||||
tunnel.pending.splice(0);
|
||||
tunnel.pendingBytes = 0;
|
||||
if (!tunnel.client.destroyed) tunnel.client.destroy();
|
||||
if (notifyCore) send({ type: "egress_tcp_close", providerId: options.providerId, connectionId: tunnel.id, at: nowIso() });
|
||||
if (error !== undefined) {
|
||||
options.logger("warn", "egress_proxy_tunnel_closed", {
|
||||
connectionId: tunnel.id,
|
||||
opened: tunnel.opened,
|
||||
ageMs: Date.now() - tunnel.createdAt,
|
||||
error,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
const closeTunnel = (id: string, error?: string): void => {
|
||||
const tunnel = tunnels.get(id);
|
||||
if (tunnel === undefined) return;
|
||||
tunnels.delete(id);
|
||||
tunnel.closed = true;
|
||||
if (!tunnel.client.destroyed) tunnel.client.destroy();
|
||||
send({ type: "egress_tcp_close", providerId: options.providerId, connectionId: id, at: nowIso() });
|
||||
if (error !== undefined) options.logger("warn", "egress_proxy_tunnel_closed", { connectionId: id, error });
|
||||
destroyTunnel(tunnel, true, error);
|
||||
};
|
||||
|
||||
const refreshTunnelIdle = (tunnel: Tunnel): void => {
|
||||
if (tunnel.closed) return;
|
||||
tunnel.lastActivityAt = Date.now();
|
||||
if (tunnel.idleTimer !== null) clearTimeout(tunnel.idleTimer);
|
||||
tunnel.idleTimer = setTimeout(() => closeTunnel(tunnel.id, "egress proxy idle timeout"), tunnelIdleTimeoutMs);
|
||||
};
|
||||
|
||||
const queuePendingChunk = (tunnel: Tunnel, chunk: Buffer): boolean => {
|
||||
if (tunnel.closed) return false;
|
||||
tunnel.pending.push(chunk);
|
||||
tunnel.pendingBytes += chunk.byteLength;
|
||||
refreshTunnelIdle(tunnel);
|
||||
if (tunnel.pendingBytes <= maxPendingBytes) return true;
|
||||
closeTunnel(tunnel.id, "egress proxy pending buffer exceeded");
|
||||
return false;
|
||||
};
|
||||
|
||||
const handleCoreMessage = (message: EgressFromCoreMessage): boolean => {
|
||||
@@ -144,24 +201,29 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
|
||||
if (message.type === "egress_tcp_opened") {
|
||||
if (tunnel === undefined || tunnel.closed) return true;
|
||||
tunnel.opened = true;
|
||||
if (tunnel.openTimer !== null) {
|
||||
clearTimeout(tunnel.openTimer);
|
||||
tunnel.openTimer = null;
|
||||
}
|
||||
refreshTunnelIdle(tunnel);
|
||||
if (tunnel.method === "CONNECT") {
|
||||
tunnel.client.write("HTTP/1.1 200 Connection Established\r\nProxy-Agent: UniDesk-ProviderGateway\r\n\r\n");
|
||||
}
|
||||
for (const chunk of tunnel.pending.splice(0)) {
|
||||
send({ type: "egress_tcp_data", providerId: options.providerId, connectionId: tunnel.id, data: chunk.toString("base64"), encoding: "base64", at: nowIso() });
|
||||
}
|
||||
tunnel.pendingBytes = 0;
|
||||
return true;
|
||||
}
|
||||
if (message.type === "egress_tcp_data") {
|
||||
if (tunnel === undefined || tunnel.client.destroyed) return true;
|
||||
refreshTunnelIdle(tunnel);
|
||||
tunnel.client.write(Buffer.from(message.data, message.encoding === "base64" ? "base64" : "utf8"));
|
||||
return true;
|
||||
}
|
||||
if (message.type === "egress_tcp_close") {
|
||||
if (tunnel !== undefined) {
|
||||
tunnels.delete(message.connectionId);
|
||||
tunnel.closed = true;
|
||||
if (!tunnel.client.destroyed) tunnel.client.destroy();
|
||||
destroyTunnel(tunnel, false, message.error);
|
||||
}
|
||||
if (message.error !== undefined && message.error.length > 0) {
|
||||
options.logger("warn", "egress_proxy_remote_close", { connectionId: message.connectionId, error: message.error });
|
||||
@@ -222,14 +284,28 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
|
||||
fail("400 Bad Request", "unsupported proxy target\n");
|
||||
return;
|
||||
}
|
||||
const tunnel: Tunnel = { id, client, method: parsed.method, opened: false, pending: [], closed: false };
|
||||
const createdAt = Date.now();
|
||||
const tunnel: Tunnel = {
|
||||
id,
|
||||
client,
|
||||
method: parsed.method,
|
||||
opened: false,
|
||||
pending: [],
|
||||
pendingBytes: 0,
|
||||
closed: false,
|
||||
createdAt,
|
||||
lastActivityAt: createdAt,
|
||||
openTimer: null,
|
||||
idleTimer: null,
|
||||
};
|
||||
tunnels.set(id, tunnel);
|
||||
client.on("data", (nextChunk) => {
|
||||
const nextBuffer = Buffer.isBuffer(nextChunk) ? nextChunk : Buffer.from(nextChunk);
|
||||
if (!tunnel.opened) {
|
||||
tunnel.pending.push(nextBuffer);
|
||||
queuePendingChunk(tunnel, nextBuffer);
|
||||
return;
|
||||
}
|
||||
refreshTunnelIdle(tunnel);
|
||||
send({ type: "egress_tcp_data", providerId: options.providerId, connectionId: id, data: nextBuffer.toString("base64"), encoding: "base64", at: nowIso() });
|
||||
});
|
||||
client.on("close", () => closeTunnel(id));
|
||||
@@ -238,10 +314,13 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
|
||||
if (!opened) {
|
||||
tunnels.delete(id);
|
||||
tunnel.closed = true;
|
||||
clearTunnelTimers(tunnel);
|
||||
fail("503 Service Unavailable", "provider-gateway core channel is not connected\n");
|
||||
return;
|
||||
}
|
||||
if (firstPayload !== null) tunnel.pending.push(firstPayload);
|
||||
tunnel.openTimer = setTimeout(() => closeTunnel(id, "egress proxy open timeout"), tunnelOpenTimeoutMs);
|
||||
refreshTunnelIdle(tunnel);
|
||||
if (firstPayload !== null) queuePendingChunk(tunnel, firstPayload);
|
||||
});
|
||||
client.on("error", () => undefined);
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user