feat: add provider ssh bridge

This commit is contained in:
Codex
2026-05-05 01:24:32 +00:00
parent 2d6829ed44
commit f6d0bd1e3b
18 changed files with 1014 additions and 34 deletions
+3 -2
View File
@@ -10,9 +10,10 @@ UniDesk 是一个以主 server 为统一入口的分布式工作平台;本文
- `bun scripts/cli.ts server start`:以异步 job 启动 database、backend-core、frontend、provider-gateway,部署规则见 `docs/reference/deployment.md`
- `bun scripts/cli.ts server status`:查询固定端口、容器状态、健康检查和访问 URL,判定标准见 `docs/reference/deployment.md`
- `bun scripts/cli.ts server logs`:分页返回文件日志与 Docker 日志尾部,日志规则见 `docs/reference/observability.md`
- `bun scripts/cli.ts ssh <providerId> [ssh-like args...]`:通过 provider-gateway 的 Host SSH / WSL SSH 维护桥打开近似原生 ssh 的交互会话或远端命令,使用规则见 `docs/reference/cli.md``docs/reference/provider-gateway.md`
- `bun scripts/cli.ts server stop`:以异步 job 停止固定 Compose 项目中的全部 UniDesk 服务,停止后用 `server status` 复核。
- `bun scripts/cli.ts job list` / `bun scripts/cli.ts job status latest`:查询 `.state/jobs/` 中的异步任务状态,job 机制见 `docs/reference/cli.md`
- `bun scripts/cli.ts debug health` / `bun scripts/cli.ts debug dispatch`:通过 Docker 内网 core、真实 HTTP、WebSocket、系统指标Docker 状态流程调试健康检查任务下发,调试规则见 `docs/reference/cli.md`
- `bun scripts/cli.ts debug health` / `bun scripts/cli.ts debug dispatch` / `bun scripts/cli.ts debug task`:通过 Docker 内网 core、真实 HTTP、WebSocket、系统指标Docker 状态和 Host SSH 维护桥流程调试健康检查任务下发与任务结果,调试规则见 `docs/reference/cli.md`
- `bun scripts/cli.ts e2e run`:验证公网 frontend/provider ingress、内网 core/database、provider-gateway 自接入、资源指标曲线、Docker 状态快照、provider.upgrade 预检和 Playwright 登录页面,验收规则见 `docs/reference/e2e.md`
## Runtime
@@ -20,7 +21,7 @@ UniDesk 是一个以主 server 为统一入口的分布式工作平台;本文
- `bun`TypeScript 运行时固定使用 Bun,组件入口和 CLI 都直接运行 `.ts` 文件,约束见 `docs/reference/config.md`
- `docker-compose.yml`:主 server 统一编排 core、frontend、database 和本机 provider gateway,且只公开 frontend/provider ingress,服务拓扑见 `docs/reference/deployment.md`
- `src/components/frontend`:前端源码固定使用 TypeScript + React,采用高信息密度工业控制台设计,资源节点含任务管理器风格资源监控与 Docker Desktop 风格状态页,界面规则见 `docs/reference/frontend.md`
- `src/components/provider-gateway`:当前主 server `74.48.78.17` 也作为 provider gateway 接入 UniDesk,外部节点通过 `ws://74.48.78.17:18082/ws/provider` 接入,部署与 Playwright 公网前端验证方法见 `docs/reference/provider-gateway.md`
- `src/components/provider-gateway`:当前主 server `74.48.78.17` 也作为 provider gateway 接入 UniDesk,外部节点通过 `ws://74.48.78.17:18082/ws/provider` 接入,并可配置维护专用 Host SSH / WSL SSH 桥,部署与 Playwright 公网前端验证方法见 `docs/reference/provider-gateway.md`
- `docs/reference/e2e.md`:交付前必须执行的自测门禁、Playwright 登录与 JSON 展示断言、数据库命名卷持久化要求。
## Architecture Docs
+4
View File
@@ -67,3 +67,7 @@
## T16 任务历史诊断信息
阅读 `AGENTS.md`(本项目 `AGENTS.md` 同时承担 `SKILL.md``scripts/cli.ts` 的解释职责),然后用 cli 手动测试以下内容:运行 `bun scripts/cli.ts e2e run`,确认 `frontend:task-history-diagnostics` passed;再登录 frontend,进入 `任务调度 / 任务历史`,确认每个任务行都能看到状态、任务命令和 id、Provider、任务耗时、载荷摘要、诊断信息、更新时间和 `查看原始JSON` 按钮。失败任务必须在默认表格中显示失败原因以及 exit code、timeout、previous status 等关键字段,完整 result 只能点击 `查看原始JSON` 查看。
## T17 Provider Gateway Host SSH / WSL SSH 维护桥
阅读 `AGENTS.md`(本项目 `AGENTS.md` 同时承担 `SKILL.md``scripts/cli.ts` 的解释职责),然后用 cli 手动测试以下内容:确认目标 provider-gateway 已只读挂载维护私钥目录到 `/run/host-ssh`,目标宿主或 WSL 的 sshd 已启动且 `authorized_keys` 包含对应公钥;运行 `bun scripts/cli.ts debug dispatch main-server host.ssh --wait-ms 15000`,再运行 `bun scripts/cli.ts debug task latest`,确认任务通过真实 WebSocket 下发、状态为 `succeeded`、result 中 `probeLine` 包含 `UNIDESK_SSH_TEST``exitCode` 为 0、`hostSshKeyPresent` 为 true。随后运行 `bun scripts/cli.ts ssh main-server hostname`,确认输出是远端 hostname 且进程 exit code 为 0;再用 `printf 'pwd\nexit\n' | bun scripts/cli.ts ssh main-server` 验证无命令参数时能进入并退出远端登录 shell。对 D518 这类无公网 SSH 的 WSL 节点,使用同一命令替换 Provider ID 为 `D518`,必要时先用 debug dispatch 加 `--cwd /home/ubuntu` 覆盖远端工作目录,只能通过 provider-gateway 自连维护桥验证,不得把主 server 直连节点公网 22 端口作为通过标准。
+1
View File
@@ -130,6 +130,7 @@ services:
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- ${UNIDESK_PROVIDER_UPGRADE_HOST_PROJECT_ROOT}:${UNIDESK_PROVIDER_UPGRADE_WORKSPACE_PATH}:ro
- ${UNIDESK_HOST_SSH_KEY_DIR}:/run/host-ssh:ro
- ${UNIDESK_LOG_DIR}:/var/log/unidesk
extra_hosts:
- "host.docker.internal:host-gateway"
+1 -1
View File
@@ -48,7 +48,7 @@
- The main server never initiates connections to nodes, perfectly adapting to environments without public IP and behind NAT
- Interaction with Local Execution Environment
- The primary path for automated task dispatching and execution is via the local Docker socket
- Access to the local environment via WSL SSH is reserved solely as an auxiliary path for emergency maintenance and troubleshooting
- Access to the local environment via WSL SSH is reserved solely as an auxiliary path for emergency maintenance and troubleshooting, exposed only as bounded `host.ssh` probe/exec tasks
- Automating task deployment or dispatching through the WSL SSH channel is forbidden
- Connection Management
- When registering, a node carries an authentication token to verify its identity and declares resources such as GPU/CPU
+11 -2
View File
@@ -11,8 +11,9 @@ UniDesk 的统一 CLI 入口是根目录 `scripts/cli.ts`,运行方式固定
- `server stop` 创建异步 job,在后台停止固定 Compose project 中的全部 UniDesk 服务。
- `server status` 查询公开端口、内部端口、Compose 容器、core/frontend/provider/database 健康检查和访问 URL。
- `server logs` 返回 `logs/` 文件日志和 Docker 容器日志的尾部,默认限制输出大小,避免日志爆炸。
- `ssh <providerId> [ssh-like args...]` 通过 backend-core 内网 WebSocket broker 和 provider-gateway 的 Host SSH / WSL SSH 维护桥连接目标节点;无后续参数时进入远端登录 shell,有后续参数时按 ssh 远端命令体验执行并返回远端 exit code。
- `job list``job status` 查询 `.state/jobs/` 文件系统状态,是异步命令的可观测入口。
- `debug health``debug dispatch` 走真实内部 core、WebSocket、数据库、provider、系统指标Docker 状态流程,只用于开发调试,不写入 `TEST.md` 的正式验收步骤。
- `debug health``debug dispatch``debug task` 走真实内部 core、WebSocket、数据库、provider、系统指标Docker 状态和 Host SSH 维护桥流程,只用于开发调试,不写入 `TEST.md` 的正式验收步骤。
- `e2e run` 使用 publicHost 派生的公开 frontend/provider ingress URL,并通过 Docker 内网验证 core API、PostgreSQL、provider self-connection、系统指标曲线、Docker 状态快照、provider.upgrade 预检和 Playwright 前端页面,是交付前的自动化 E2E 门禁。
## Async Job State
@@ -25,4 +26,12 @@ UniDesk 的统一 CLI 入口是根目录 `scripts/cli.ts`,运行方式固定
## Debug Contract
`debug` 子命令必须复用真实模块与真实端点,禁止维护平行实现。`debug health` 会摘要展示 `/api/nodes/system-status``/api/nodes/docker-status`,避免输出完整快照造成信息爆炸。`debug dispatch` 会在 backend-core 容器内调用内部 `/api/dispatch`core 再通过 WebSocket 将 `docker.ps``provider.upgrade``echo` 任务下发给 provider gateway,因此它可以验证核心调度闭环,同时不需要公开 core REST API。
`debug` 子命令必须复用真实模块与真实端点,禁止维护平行实现。`debug health` 会摘要展示 `/api/nodes/system-status``/api/nodes/docker-status`,避免输出完整快照造成信息爆炸。`debug dispatch` 会在 backend-core 容器内调用内部 `/api/dispatch`core 再通过 WebSocket 将 `docker.ps``provider.upgrade``host.ssh``echo` 任务下发给 provider gateway,因此它可以验证核心调度闭环,同时不需要公开 core REST API。`host.ssh` 默认使用 `mode: "probe"` 做短超时维护桥自检;需要执行明确命令时使用 `--ssh-command` 进入 `mode: "exec"`,并配合 `--wait-ms``debug task` 查看 stdout、stderr、exitCode 与 probeLine。
## SSH Command
`ssh <providerId> [ssh-like args...]` 是面向人的终端透传入口,不包装 JSON 输出。CLI 会在宿主机启动一个 `docker exec -i unidesk-backend-core bun -e ...` brokerbroker 只连接 backend-core 的 Docker 内网 `/ws/ssh`core 再把 stdin/stdout/stderr 流量通过目标 provider 的既有 WebSocket 转发到 provider-gatewayprovider-gateway 最终执行维护用 `ssh -tt` 连接宿主或 WSL sshd。该入口不新增 core 公网端口,不暴露 database,也不改变 frontend/provider ingress 之外的公网边界。
`bun scripts/cli.ts ssh D518` 应表现为登录 D518 WSL 的 shell`bun scripts/cli.ts ssh D518 hostname` 应像 `ssh D518 hostname` 一样只输出远端命令结果并返回远端 exit code。Provider ID 前的目标选择由 UniDesk 节点清单决定,`-p``-i``-l``-o` 等传统 ssh 传输参数由 provider-gateway 部署配置统一管理,CLI 会兼容性消费这些参数但不会覆盖节点侧维护桥配置。
core 只允许声明了 `host.ssh` capability 的 provider 使用 `ssh` 透传或 `host.ssh` dispatch;旧 provider 不支持该能力时必须快速失败并输出错误,不能把未知命令误判成 `echo` 成功。
+4
View File
@@ -18,6 +18,10 @@ TypeScript 运行时固定为 Bun。根目录 CLI、backend-core、frontend 和
`providerGateway.metrics.diskPath` 指定资源监控页的硬盘采样路径,默认是 `/``providerGateway.upgrade` 定义远程升级 provider-gateway 所需的 Compose project、service、仓库挂载路径、派生 env 文件和 updater runner 镜像;这些字段由 CLI 写入 `.state/docker-compose.env`provider-gateway 只通过 WebSocket 接受 `provider.upgrade` 调度,不从隐藏环境或默认值静默补齐。
## SSH Forwarding
`sshForwarding` 定义 provider-gateway 维护专用 Host SSH / WSL SSH 桥的显式配置。CLI 会把 `sshForwarding.keyDir` 写入 `.state/docker-compose.env``UNIDESK_HOST_SSH_KEY_DIR`,Compose 将该目录只读挂载到 provider-gateway 的 `/run/host-ssh`,并把 `sshForwarding.host``sshForwarding.port``sshForwarding.user` 映射为 `HOST_SSH_HOST``HOST_SSH_PORT``HOST_SSH_USER`。目录中必须存在 `id_ed25519` 私钥且权限收紧,provider-gateway 才会把 `hostSshKeyPresent` 上报为 true,并允许 `host.ssh` 维护探测;该桥只用于故障诊断和 WSL 维护,不替代 Docker socket 调度。
## Compose Env Generation
Docker Compose 本身不读取 JSON,因此 CLI 会从 `config.json` 生成 `.state/docker-compose.env`。该文件是派生状态,不应手写;如需改端口、token、provider 标签、登录凭据或主机名,应修改 `config.json` 后重新运行 CLI。CLI 会在保留当前日志前缀的同时刷新新增配置键,避免旧 env 文件遗漏字段。
+1 -1
View File
@@ -7,7 +7,7 @@
- `database` 使用 `postgres:16-alpine`,数据保存到 named volume `unidesk_pgdata_10gb`,初始化 SQL 位于 `src/components/database/init/`
- `backend-core` 是无状态核心服务,提供 Docker 内网 REST API、provider ingress WebSocket、任务调度入口和数据库访问层。
- `frontend` 是唯一公开 Web 控制台,提供登录、从 TSX 转译出的 React 应用资产和到 backend-core 的同源代理。
- `provider-gateway` 是当前主 server 的本机计算节点代理,通过 WebSocket 主动连到 provider ingress,挂载 `/var/run/docker.sock` 作为自动任务执行主路径,并周期性上报系统资源指标与 Docker daemon 状态。
- `provider-gateway` 是当前主 server 的本机计算节点代理,通过 WebSocket 主动连到 provider ingress,挂载 `/var/run/docker.sock` 作为自动任务执行主路径,并周期性上报系统资源指标与 Docker daemon 状态;维护用 Host SSH / WSL SSH 私钥目录只读挂载到 `/run/host-ssh`,不得作为自动任务调度主路径
## Public Exposure Boundary
+9 -3
View File
@@ -10,7 +10,7 @@ Provider Gateway 是计算节点侧容器。它只主动连出到主 server 暴
当前主 server 公网 IP 是 `74.48.78.17``config.json` 中的 `network.publicHost` 必须保持为该地址;公网 frontend 入口是 `http://74.48.78.17:18081/`provider gateway 对外接入入口是 `ws://74.48.78.17:18082/ws/provider`provider ingress 健康检查是 `http://74.48.78.17:18082/health`。主 server 本机 provider 由根目录 `docker-compose.yml``provider-gateway` 服务启动,容器内使用 Docker 内网地址 `ws://backend-core:8081/ws/provider` 自接入;外部计算节点部署 provider-gateway 时必须改用公网 provider ingress URL,并复用 `config.json` / `.state/docker-compose.env` 中的 provider token、心跳间隔和重连参数。
计算节点部署 provider-gateway 的最小方法是:准备可运行 `unidesk_provider-gateway` 镜像的 Docker 环境,为节点分配唯一 `PROVIDER_ID` 与可读 `PROVIDER_NAME`,设置 `PROVIDER_SERVER_URL=ws://74.48.78.17:18082/ws/provider``PROVIDER_TOKEN``PROVIDER_LABELS_JSON``HEARTBEAT_INTERVAL_MS``RECONNECT_BASE_MS``RECONNECT_MAX_MS`,并挂载 `/var/run/docker.sock:/var/run/docker.sock` 作为 Docker 状态采集、任务执行和远程升级的唯一自动化通道。需要支持 `provider.upgrade` 的节点还必须设置 `PROVIDER_UPGRADE_*` 环境变量,把节点上的 UniDesk 仓库只读挂载到 `PROVIDER_UPGRADE_WORKSPACE_PATH`,并确保升级命令只重建 `provider-gateway` service,不影响 database、backend-core、frontend。
计算节点部署 provider-gateway 的最小方法是:准备可运行 `unidesk_provider-gateway` 镜像的 Docker 环境,为节点分配唯一 `PROVIDER_ID` 与可读 `PROVIDER_NAME`,设置 `PROVIDER_SERVER_URL=ws://74.48.78.17:18082/ws/provider``PROVIDER_TOKEN``PROVIDER_LABELS_JSON``HEARTBEAT_INTERVAL_MS``RECONNECT_BASE_MS``RECONNECT_MAX_MS`,并挂载 `/var/run/docker.sock:/var/run/docker.sock` 作为 Docker 状态采集、任务执行和远程升级的唯一自动化通道。需要支持 `provider.upgrade` 的节点还必须设置 `PROVIDER_UPGRADE_*` 环境变量,把节点上的 UniDesk 仓库只读挂载到 `PROVIDER_UPGRADE_WORKSPACE_PATH`,并确保升级命令只重建 `provider-gateway` service,不影响 database、backend-core、frontend。需要维护桥连接 WSL 的节点必须额外设置 `HOST_SSH_HOST=host.docker.internal``HOST_SSH_PORT=22``HOST_SSH_USER=<WSL 用户>``HOST_SSH_KEY=/run/host-ssh/id_ed25519``HOST_REMOTE_CWD=/home/<WSL 用户>`,并把只含维护私钥的宿主目录只读挂载到 `/run/host-ssh`
## WSL Compute Node Deployment
@@ -36,7 +36,7 @@ provider-gateway 部署是否成功必须以 UniDesk frontend 中可见的 Provi
WSL 节点还应补充一次真实调度验证:向该 `PROVIDER_ID` 下发 `docker.ps`,任务必须从 `dispatched` 进入 `succeeded`,并在结果中看到 WSL Docker daemon 返回的容器列表;对于容器化运行的 provider-gateway,列表中通常应包含 `unidesk-provider-gateway-<PROVIDER_ID>`。这一步可以同时证明 provider WebSocket、服务端任务路由、节点侧 Docker socket 和结果回传链路都已贯通。
自动化验证必须使用 Playwright 访问公网 frontend,而不是在容器内直接调 core API 代替浏览器验收。标准命令是 `bun scripts/cli.ts e2e run`;该命令会让 Playwright 打开公网 `http://74.48.78.17:18081/`、登录、抓取页面中的 Provider 信息和 `查看原始JSON` 内容,并检查 Provider 自接入、资源指标、Docker 状态和 `provider.upgrade` 预检。外部新增节点的人工验收应复用同一套前端路径:先确认 Provider 信息出现在节点清单,再确认资源监控和 Docker 状态页面有该节点的数据,最后通过任务调度向该 Provider 下发 `echo``docker.ps` 并在任务历史中查看耗时、状态和失败原因。
自动化验证必须使用 Playwright 访问公网 frontend,而不是在容器内直接调 core API 代替浏览器验收。标准命令是 `bun scripts/cli.ts e2e run`;该命令会让 Playwright 打开公网 `http://74.48.78.17:18081/`、登录、抓取页面中的 Provider 信息和 `查看原始JSON` 内容,并检查 Provider 自接入、资源指标、Docker 状态和 `provider.upgrade` 预检。外部新增节点的人工验收应复用同一套前端路径:先确认 Provider 信息出现在节点清单,再确认资源监控和 Docker 状态页面有该节点的数据,最后通过任务调度向该 Provider 下发 `echo``docker.ps` 或维护专用 `host.ssh` probe,并在任务历史中查看耗时、状态、stdout/stderr 摘要和失败原因。
## Provider Ingress
@@ -60,4 +60,10 @@ backend-core 可以通过真实 WebSocket 调度向在线 provider 下发 `provi
## Host SSH Maintenance Bridge
宿主 SSH 转发只作为应急维护辅助路径,不用于自动任务调度。实现参考 `../web-terminal` 的经验:容器内使用只读挂载的私钥,通过 `ssh -tt` 主动连接宿主 sshd,并设置 `StrictHostKeyChecking=accept-new``ServerAliveInterval``ServerAliveCountMax`本仓库保留 `src/components/provider-gateway/scripts/host-ssh-shell.sh` 作为维护桥接脚本,默认 Compose 不挂载私钥,避免把 SSH 路径误用为调度通道
宿主 SSH / WSL SSH 转发只作为应急维护辅助路径,不用于自动计算任务调度。实现参考 `../web-terminal` 的经验:容器内使用只读挂载的私钥,主动连接宿主或 WSL sshd,并设置 `BatchMode=yes``StrictHostKeyChecking=accept-new``ServerAliveInterval=20``ServerAliveCountMax=3`主 server Compose 会把 `config.json``sshForwarding.keyDir` 只读挂载为 `/run/host-ssh`provider 标签会上报 `hostSshConfigured``hostSshKeyPresent``hostSshTarget`,便于在前端节点清单确认维护桥是否具备条件
维护桥通过真实 WebSocket dispatch 暴露为 `host.ssh` 命令。默认 payload 使用 `mode: "probe"`,远端只执行一个短命令并返回 `UNIDESK_SSH_TEST user=... host=... bridge=host.ssh cwd=...`;需要人工诊断时可以显式使用 `mode: "exec"``command` 字段执行有界命令。所有 `host.ssh` 执行都必须有超时,stdout/stderr 在 task result 中截断展示;自动升级和普通任务仍必须使用 Docker socket 与 `provider.upgrade`,不得把 WSL SSH 维护桥当成调度通道。
面向人的终端入口是 `bun scripts/cli.ts ssh <PROVIDER_ID> [ssh-like args...]`。无后续参数时打开远端登录 shell,有后续参数时执行远端命令并返回远端 exit code;该入口走 backend-core 内网 `/ws/ssh` broker 和 provider 既有 WebSocket,不新增公网 core 端口。传统 ssh 传输参数由 provider-gateway 环境变量统一控制,CLI 只负责把 Provider ID 后的远端命令和终端 stdin/stdout/stderr 透传过去。
验证 WSL SSH 桥时,先在目标 WSL 中启动 sshd 并确保维护公钥写入目标用户的 `authorized_keys`,再确认目标 provider 注册 labels 中 `unideskCapabilities` 包含 `host.ssh`。运行 `bun scripts/cli.ts debug dispatch <PROVIDER_ID> host.ssh --wait-ms 15000` 后,结果应在 `debug task latest` 或前端任务历史中显示 `status: succeeded``probeLine``UNIDESK_SSH_TEST``exitCode: 0`,并且目标节点 labels 中 `hostSshKeyPresent` 为 true;随后运行 `bun scripts/cli.ts ssh <PROVIDER_ID> hostname` 验证近似原生 ssh 的远端命令体验。如果 D518 这类 WSL 节点没有公网 SSH 入口,也必须通过这个 provider-gateway 自连维护桥完成验证,而不是要求主 server 直接连节点公网 22 端口;旧版 provider 未声明 `host.ssh` 时必须先升级 provider-gateway,否则 core 会拒绝 SSH 透传。
+1 -1
View File
@@ -58,7 +58,7 @@
- package.json
- tsconfig.json
- Dockerfile
- src/index.ts (WebSocket client, heartbeat, system/Docker telemetry, Docker adapter, provider.upgrade handler)
- src/index.ts (WebSocket client, heartbeat, system/Docker telemetry, Docker adapter, provider.upgrade handler, Host SSH / WSL SSH maintenance probe)
- scripts/host-ssh-shell.sh (Optional maintenance-only SSH bridge)
- database/ (PostgreSQL initialization and configuration)
- config/postgresql.conf
+54 -5
View File
@@ -1,10 +1,11 @@
import { readConfig } from "./src/config";
import { debugDispatch, debugHealth } from "./src/debug";
import { debugDispatch, debugHealth, debugTask, isDebugDispatchCommand, type DebugDispatchCommand } from "./src/debug";
import { stackLogs, stackStatus, startStack, stopStack } from "./src/docker";
import { runE2E } from "./src/e2e";
import { emitError, emitJson } from "./src/output";
import { jobWithTail, listJobs, readJob, runJob } from "./src/jobs";
import { runChecks } from "./src/check";
import { runSsh } from "./src/ssh";
const args = process.argv.slice(2);
const commandName = args.join(" ") || "help";
@@ -21,10 +22,12 @@ function help(): unknown {
{ command: "server stop", description: "Fire-and-forget docker-compose down for the fixed UniDesk stack." },
{ command: "server status", description: "Show fixed ports, containers, service health, and public URLs." },
{ command: "server logs [--tail-bytes N]", description: "Return bounded tails from file logs and docker logs." },
{ command: "ssh <providerId> [ssh-like args...]", description: "Open a Host SSH / WSL SSH maintenance session through the provider-gateway bridge." },
{ command: "job list", description: "List async jobs from .state/jobs." },
{ command: "job status <jobId|latest> [--tail-bytes N]", description: "Show job state with bounded stdout/stderr tails." },
{ command: "debug health", description: "Probe internal core, nodes, system/Docker status, frontend, provider ingress, and public boundary." },
{ command: "debug dispatch [providerId] [docker.ps|provider.upgrade|echo]", description: "Submit a real internal-core dispatch request for CLI debugging." },
{ command: "debug dispatch [providerId] [docker.ps|provider.upgrade|host.ssh|echo] [--wait-ms N]", description: "Submit a real internal-core dispatch request for CLI debugging." },
{ command: "debug task <taskId|latest>", description: "Read a dispatched task record from internal core for CLI debugging." },
{ command: "e2e run", description: "Run public frontend/provider, internal core/database, and Playwright login E2E checks." },
],
};
@@ -39,6 +42,41 @@ function numberOption(name: string, defaultValue: number): number {
return value;
}
function stringOption(name: string): string | undefined {
const index = args.indexOf(name);
if (index === -1) return undefined;
const raw = args[index + 1];
if (raw === undefined || raw.length === 0) throw new Error(`${name} requires a non-empty value`);
return raw;
}
function jsonOption(name: string): Record<string, unknown> | undefined {
const raw = stringOption(name);
if (raw === undefined) return undefined;
const parsed = JSON.parse(raw) as unknown;
if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) throw new Error(`${name} must be a JSON object`);
return parsed as Record<string, unknown>;
}
function dispatchPayload(command: DebugDispatchCommand): Record<string, unknown> {
const explicit = jsonOption("--payload-json") ?? {};
if (command === "provider.upgrade") {
return { source: "cli-debug", mode: stringOption("--mode") ?? stringOption("--upgrade-mode") ?? "plan", ...explicit };
}
if (command === "host.ssh") {
const sshCommand = stringOption("--ssh-command");
return {
source: "cli-debug",
mode: sshCommand === undefined ? "probe" : "exec",
...(sshCommand === undefined ? {} : { command: sshCommand }),
...(stringOption("--cwd") === undefined ? {} : { cwd: stringOption("--cwd") }),
...(args.includes("--timeout-ms") ? { timeoutMs: numberOption("--timeout-ms", 8000) } : {}),
...explicit,
};
}
return { source: "cli-debug", ...explicit };
}
function latestJobId(): string {
const jobs = listJobs();
if (jobs.length === 0) throw new Error("No jobs found");
@@ -60,6 +98,12 @@ async function main(): Promise<void> {
const config = readConfig();
if (top === "ssh") {
const exitCode = await runSsh(config, sub ?? "", args.slice(2));
process.exitCode = exitCode;
return;
}
if (top === "config" && sub === "show") {
emitJson(commandName, { config });
return;
@@ -109,9 +153,14 @@ async function main(): Promise<void> {
return;
}
if (sub === "dispatch") {
const providerId = third ?? config.providerGateway.id;
const dispatchCommand = fourth === "docker.ps" || fourth === "provider.upgrade" || fourth === "echo" ? fourth : "docker.ps";
emitJson(commandName, await debugDispatch(config, providerId, dispatchCommand));
const providerId = isDebugDispatchCommand(third) ? config.providerGateway.id : third ?? config.providerGateway.id;
const commandArg = isDebugDispatchCommand(third) ? third : fourth;
const dispatchCommand = isDebugDispatchCommand(commandArg) ? commandArg : "docker.ps";
emitJson(commandName, await debugDispatch(config, providerId, dispatchCommand, dispatchPayload(dispatchCommand), numberOption("--wait-ms", 0)));
return;
}
if (sub === "task") {
emitJson(commandName, await debugTask(config, third ?? "latest"));
return;
}
}
+42 -5
View File
@@ -1,6 +1,13 @@
import { runCommand } from "./command";
import { type UniDeskConfig, repoRoot } from "./config";
export const dispatchCommands = ["docker.ps", "provider.upgrade", "host.ssh", "echo"] as const;
export type DebugDispatchCommand = typeof dispatchCommands[number];
export function isDebugDispatchCommand(value: unknown): value is DebugDispatchCommand {
return dispatchCommands.includes(value as DebugDispatchCommand);
}
async function readJson(url: string, init?: RequestInit): Promise<unknown> {
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), 5000);
@@ -134,9 +141,39 @@ export async function debugHealth(config: UniDeskConfig): Promise<unknown> {
};
}
export async function debugDispatch(config: UniDeskConfig, providerId: string, command: "docker.ps" | "provider.upgrade" | "echo"): Promise<unknown> {
return coreInternalFetch("/api/dispatch", {
method: "POST",
body: { providerId, command, payload: command === "provider.upgrade" ? { source: "cli-debug", mode: "plan" } : { source: "cli-debug" } },
});
async function waitForTask(taskId: string, timeoutMs: number): Promise<unknown> {
const started = Date.now();
let latest: unknown = null;
while (Date.now() - started < timeoutMs) {
latest = coreInternalFetch("/api/tasks?limit=100");
const tasks = (latest as { body?: { tasks?: Array<{ id?: string; status?: string; result?: unknown }> } }).body?.tasks ?? [];
const task = tasks.find((item) => item.id === taskId);
if (task?.status === "succeeded" || task?.status === "failed") return { ok: true, task };
await Bun.sleep(500);
}
return { ok: false, timeoutMs, latest };
}
export async function debugTask(_config: UniDeskConfig, taskId: string): Promise<unknown> {
const tasksResponse = coreInternalFetch("/api/tasks?limit=100");
const tasks = (tasksResponse as { body?: { tasks?: Array<{ id?: string }> } }).body?.tasks ?? [];
const task = taskId === "latest" ? tasks[0] : tasks.find((item) => item.id === taskId);
return { tasksResponse, taskId, task: task ?? null };
}
export async function debugDispatch(
config: UniDeskConfig,
providerId: string,
command: DebugDispatchCommand,
payload?: Record<string, unknown>,
waitMs = 0,
): Promise<unknown> {
const dispatchPayload = payload ?? (command === "provider.upgrade" ? { source: "cli-debug", mode: "plan" } : { source: "cli-debug" });
const dispatch = coreInternalFetch("/api/dispatch", {
method: "POST",
body: { providerId, command, payload: dispatchPayload },
});
const taskId = (dispatch as { body?: { taskId?: string } }).body?.taskId ?? "";
const wait = waitMs > 0 && taskId.length > 0 ? await waitForTask(taskId, waitMs) : null;
return { dispatch, wait };
}
+1
View File
@@ -95,6 +95,7 @@ export function writeComposeEnv(config: UniDeskConfig, freshLogPrefix: boolean):
UNIDESK_PROVIDER_UPGRADE_RUNNER_IMAGE: config.providerGateway.upgrade.runnerImage,
UNIDESK_LOG_DIR: logDir,
UNIDESK_LOG_PREFIX: logPrefix,
UNIDESK_HOST_SSH_KEY_DIR: config.sshForwarding.keyDir,
UNIDESK_HOST_SSH_HOST: config.sshForwarding.host,
UNIDESK_HOST_SSH_PORT: String(config.sshForwarding.port),
UNIDESK_HOST_SSH_USER: config.sshForwarding.user,
+2
View File
@@ -331,6 +331,7 @@ async function frontendCheck(config: UniDeskConfig, urls: PublicUrls, checks: E2
await page.getByRole("button", { name: /资源节点/ }).click();
await page.getByRole("button", { name: /资源监控/ }).click();
await page.waitForSelector('[data-testid="node-monitor-page"]', { timeout: 10000 });
await page.locator('[data-testid="node-monitor-page"]').getByRole("button", { name: new RegExp(config.providerGateway.id) }).click();
await page.waitForSelector('[data-testid="metric-chart-cpu"]', { timeout: 10000 });
await page.waitForSelector('[data-testid="metric-chart-memory"]', { timeout: 10000 });
await page.waitForSelector('[data-testid="metric-chart-disk"]', { timeout: 10000 });
@@ -344,6 +345,7 @@ async function frontendCheck(config: UniDeskConfig, urls: PublicUrls, checks: E2
const upgradeControlText = await page.locator('[data-testid="provider-upgrade-control"]').innerText({ timeout: 5000 });
await page.getByRole("button", { name: /Docker 状态/ }).click();
await page.waitForSelector('[data-testid="docker-status-page"]', { timeout: 10000 });
await page.locator('[data-testid="docker-status-page"]').getByRole("button", { name: new RegExp(config.providerGateway.id) }).click();
await page.waitForSelector('[data-testid="docker-container-table"]', { timeout: 10000 });
await page.waitForSelector('[data-testid="database-volume-card"]', { timeout: 10000 });
await page.waitForFunction(() => {
+186
View File
@@ -0,0 +1,186 @@
import { spawn } from "node:child_process";
import { type UniDeskConfig, repoRoot } from "./config";
interface ParsedSshArgs {
remoteCommand: string | null;
}
const sshOptionsWithValue = new Set([
"-B", "-b", "-c", "-D", "-E", "-e", "-F", "-I", "-i", "-J", "-L", "-l", "-m", "-O", "-o", "-p", "-Q", "-R", "-S", "-W", "-w",
]);
function parseSshArgs(args: string[]): ParsedSshArgs {
const remote: string[] = [];
let remoteStarted = false;
for (let index = 0; index < args.length; index += 1) {
const arg = args[index] ?? "";
if (remoteStarted) {
remote.push(arg);
continue;
}
if (arg === "--") {
remoteStarted = true;
continue;
}
if (arg.startsWith("-") && arg !== "-") {
if (sshOptionsWithValue.has(arg) && index + 1 < args.length) index += 1;
continue;
}
remoteStarted = true;
remote.push(arg);
}
return { remoteCommand: remote.length === 0 ? null : remote.join(" ") };
}
function brokerSource(): string {
return String.raw`
const open = JSON.parse(process.argv[2] || process.argv[1] || "{}");
const token = process.env.PROVIDER_TOKEN || "";
const url = "ws://127.0.0.1:8080/ws/ssh?token=" + encodeURIComponent(token);
const ws = new WebSocket(url);
let exitCode = 255;
let canSend = false;
let opened = false;
const pending = [];
const openTimer = setTimeout(() => {
if (opened) return;
process.stderr.write("unidesk ssh bridge timed out waiting for provider session\n");
try { ws.close(); } catch {}
process.exit(255);
}, Number(open.openTimeoutMs || 15000));
function send(value) {
const text = JSON.stringify(value);
if (!canSend || ws.readyState !== WebSocket.OPEN) {
pending.push(text);
return;
}
ws.send(text);
}
function flush() {
while (pending.length > 0 && ws.readyState === WebSocket.OPEN) {
ws.send(pending.shift());
}
}
function decodeData(data) {
return typeof data === "string" ? data : Buffer.from(data).toString("utf8");
}
ws.addEventListener("open", () => {
canSend = true;
send({
type: "ssh.open",
providerId: open.providerId,
command: open.command || undefined,
cwd: open.cwd || undefined,
cols: open.cols || 100,
rows: open.rows || 30,
});
flush();
});
ws.addEventListener("message", (event) => {
const message = JSON.parse(decodeData(event.data));
if (message.type === "ssh.data") {
opened = true;
const chunk = Buffer.from(message.data || "", "base64");
if (message.stream === "stderr") process.stderr.write(chunk);
else process.stdout.write(chunk);
return;
}
if (message.type === "ssh.opened") {
opened = true;
clearTimeout(openTimer);
return;
}
if (message.type === "ssh.dispatched") return;
if (message.type === "ssh.error") {
clearTimeout(openTimer);
process.stderr.write(String(message.message || "ssh bridge error") + "\n");
exitCode = 255;
ws.close();
return;
}
if (message.type === "ssh.exit") {
clearTimeout(openTimer);
exitCode = Number.isInteger(message.exitCode) ? message.exitCode : 255;
ws.close();
}
});
ws.addEventListener("close", () => {
process.exit(exitCode);
});
ws.addEventListener("error", () => {
process.stderr.write("unidesk ssh bridge websocket error\n");
process.exit(255);
});
process.stdin.on("data", (chunk) => {
send({ type: "ssh.input", data: Buffer.from(chunk).toString("base64"), encoding: "base64" });
flush();
});
process.stdin.on("end", () => {
send({ type: "ssh.eof" });
flush();
});
`;
}
function terminalSize(): { cols: number; rows: number } {
return {
cols: Number(process.stdout.columns) > 0 ? Number(process.stdout.columns) : 100,
rows: Number(process.stdout.rows) > 0 ? Number(process.stdout.rows) : 30,
};
}
export async function runSsh(config: UniDeskConfig, providerId: string, args: string[]): Promise<number> {
if (!providerId) throw new Error("ssh requires provider id, for example: bun scripts/cli.ts ssh D518");
const parsed = parseSshArgs(args);
const size = terminalSize();
const payload = {
providerId,
command: parsed.remoteCommand,
cols: size.cols,
rows: size.rows,
};
const child = spawn("docker", [
"exec",
"-i",
"unidesk-backend-core",
"bun",
"-e",
brokerSource(),
"--",
JSON.stringify(payload),
], {
cwd: repoRoot,
stdio: ["pipe", "pipe", "pipe"],
});
const rawMode = parsed.remoteCommand === null && process.stdin.isTTY;
if (rawMode) process.stdin.setRawMode(true);
process.stdin.resume();
process.stdin.pipe(child.stdin);
child.stdout.pipe(process.stdout);
child.stderr.pipe(process.stderr);
return await new Promise<number>((resolve) => {
const restore = (): void => {
process.stdin.unpipe(child.stdin);
if (rawMode) process.stdin.setRawMode(false);
};
child.on("error", (error) => {
restore();
process.stderr.write(`unidesk ssh failed to start broker: ${error.message}\n`);
resolve(255);
});
child.on("close", (code) => {
restore();
resolve(code ?? 255);
});
});
}
+234 -6
View File
@@ -8,9 +8,19 @@ import {
type ApiNodeDockerStatus,
type ApiNodeSystemStatus,
type ApiTask,
type CoreHostSshCloseMessage,
type CoreHostSshEofMessage,
type CoreHostSshInputMessage,
type CoreHostSshOpenMessage,
type CoreHostSshResizeMessage,
type CoreDispatchMessage,
type JsonValue,
type ProviderHostSshDataMessage,
type ProviderHostSshErrorMessage,
type ProviderHostSshExitMessage,
type ProviderHostSshOpenedMessage,
type ProviderLabels,
isProviderDispatchCommand,
type ProviderToCoreMessage,
isProviderToCoreMessage,
} from "../../shared/src/index";
@@ -27,6 +37,8 @@ interface RuntimeConfig {
interface WsData {
providerId?: string;
channel?: "provider" | "ssh";
sessionId?: string;
}
type ProviderSocket = ServerWebSocket<WsData>;
@@ -35,6 +47,7 @@ type SqlClient = ReturnType<typeof postgres>;
const recentLogs: unknown[] = [];
const activeProviders = new Map<string, ProviderSocket>();
const activeSshClients = new Map<string, ProviderSocket>();
const serviceStartedAt = new Date();
const config = readConfig();
const logger = createLogger("backend-core", config.logFile);
@@ -281,11 +294,30 @@ async function upsertNodeOnline(providerId: string, name: string, labels: Provid
async function touchHeartbeat(providerId: string, labels: ProviderLabels): Promise<void> {
await sql`
UPDATE unidesk_nodes
SET labels = ${sql.json(labels)}, status = 'online', last_heartbeat = now(), updated_at = now()
SET labels = unidesk_nodes.labels || ${sql.json(labels)}, status = 'online', last_heartbeat = now(), updated_at = now()
WHERE provider_id = ${providerId}
`;
}
async function providerCapabilities(providerId: string): Promise<string[]> {
if (!dbReady) return [];
const rows = await sql<Array<{ labels: unknown }>>`
SELECT labels
FROM unidesk_nodes
WHERE provider_id = ${providerId}
LIMIT 1
`;
const labels = rows[0]?.labels;
if (typeof labels !== "object" || labels === null || Array.isArray(labels)) return [];
const capabilities = (labels as Record<string, unknown>).unideskCapabilities;
return Array.isArray(capabilities) ? capabilities.filter((item): item is string => typeof item === "string") : [];
}
async function providerSupports(providerId: string, capability: string): Promise<boolean> {
const capabilities = await providerCapabilities(providerId);
return capabilities.includes(capability);
}
async function upsertDockerStatus(providerId: string, status: JsonValue, collectedAt: string): Promise<void> {
await sql`
INSERT INTO unidesk_node_docker_status (provider_id, status, collected_at, updated_at)
@@ -415,17 +447,75 @@ function parseMessage(raw: string | Buffer): ProviderToCoreMessage {
return parsed;
}
function safeSessionId(): string {
return `ssh_${Date.now()}_${Math.random().toString(16).slice(2)}`;
}
function wsSendJson(ws: ProviderSocket, value: unknown): void {
if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify(value));
}
function sshClientFor(sessionId: string): ProviderSocket | null {
return activeSshClients.get(sessionId) ?? null;
}
function providerForSsh(providerId: string): ProviderSocket | null {
return activeProviders.get(providerId) ?? null;
}
function closeSshClient(sessionId: string, code = 1000, reason = "ssh session closed"): void {
const client = sshClientFor(sessionId);
activeSshClients.delete(sessionId);
if (client !== null && client.readyState === WebSocket.OPEN) client.close(code, reason);
}
function forwardSshProviderMessage(
message: ProviderHostSshOpenedMessage | ProviderHostSshDataMessage | ProviderHostSshExitMessage | ProviderHostSshErrorMessage,
): void {
const client = sshClientFor(message.sessionId);
if (client === null) {
logger("warn", "ssh_client_missing", { providerId: message.providerId, sessionId: message.sessionId, type: message.type });
return;
}
if (message.type === "host_ssh_opened") {
wsSendJson(client, { type: "ssh.opened", providerId: message.providerId, sessionId: message.sessionId });
return;
}
if (message.type === "host_ssh_data") {
wsSendJson(client, { type: "ssh.data", stream: message.stream, data: message.data, encoding: message.encoding });
return;
}
if (message.type === "host_ssh_exit") {
wsSendJson(client, { type: "ssh.exit", exitCode: message.exitCode, signal: message.signal });
setTimeout(() => closeSshClient(message.sessionId), 50);
return;
}
wsSendJson(client, { type: "ssh.error", message: message.message });
setTimeout(() => closeSshClient(message.sessionId, 1011, "ssh session error"), 50);
}
async function handleProviderMessage(ws: ProviderSocket, raw: string | Buffer): Promise<void> {
const message = parseMessage(raw);
ws.data.providerId = message.providerId;
activeProviders.set(message.providerId, ws);
if (
message.type === "host_ssh_opened" ||
message.type === "host_ssh_data" ||
message.type === "host_ssh_exit" ||
message.type === "host_ssh_error"
) {
forwardSshProviderMessage(message);
return;
}
if (message.type === "register") {
await upsertNodeOnline(message.providerId, message.name, message.labels);
const labels = { ...message.labels, unideskCapabilities: message.capabilities };
await upsertNodeOnline(message.providerId, message.name, labels);
await recordEvent("provider_registered", message.providerId, {
providerId: message.providerId,
name: message.name,
labels: message.labels,
labels,
capabilities: message.capabilities,
});
ws.send(JSON.stringify({ type: "ack", requestId: "register", ok: true, message: "registered" }));
@@ -648,11 +738,17 @@ async function getOverview(): Promise<JsonValue> {
async function dispatchTask(req: Request): Promise<Response> {
const body = (await req.json()) as { providerId?: unknown; command?: unknown; payload?: unknown };
const providerId = typeof body.providerId === "string" ? body.providerId : "";
const command = body.command === "docker.ps" || body.command === "provider.upgrade" || body.command === "echo" ? body.command : "echo";
const command = isProviderDispatchCommand(body.command) ? body.command : null;
const payload = typeof body.payload === "object" && body.payload !== null ? (body.payload as Record<string, JsonValue>) : {};
if (!providerId) {
return jsonResponse({ ok: false, error: "providerId is required" }, 400);
}
if (command === null) {
return jsonResponse({ ok: false, error: "command must be one of docker.ps, provider.upgrade, host.ssh, echo" }, 400);
}
if (command === "host.ssh" && !(await providerSupports(providerId, "host.ssh"))) {
return jsonResponse({ ok: false, error: `provider does not declare host.ssh capability: ${providerId}` }, 409);
}
const taskId = `task_${Date.now()}_${Math.random().toString(16).slice(2)}`;
await sql`
INSERT INTO unidesk_tasks (id, provider_id, command, status, payload, result)
@@ -674,11 +770,113 @@ async function dispatchTask(req: Request): Promise<Response> {
return jsonResponse({ ok: true, taskId, status: "dispatched", providerOnline: true });
}
async function route(req: Request): Promise<Response> {
function numberFromUnknown(value: unknown, fallback: number, min: number, max: number): number {
const parsed = typeof value === "number" ? value : typeof value === "string" ? Number(value) : fallback;
if (!Number.isFinite(parsed)) return fallback;
return Math.max(min, Math.min(max, Math.floor(parsed)));
}
async function handleSshClientMessage(ws: ProviderSocket, raw: string | Buffer): Promise<void> {
const text = typeof raw === "string" ? raw : raw.toString("utf8");
const message = JSON.parse(text) as { type?: unknown; providerId?: unknown; cwd?: unknown; command?: unknown; data?: unknown; encoding?: unknown; cols?: unknown; rows?: unknown };
if (message.type === "ssh.open") {
const providerId = typeof message.providerId === "string" ? message.providerId : "";
if (providerId.length === 0) {
wsSendJson(ws, { type: "ssh.error", message: "providerId is required" });
return;
}
const provider = providerForSsh(providerId);
if (provider === null) {
wsSendJson(ws, { type: "ssh.error", message: `provider is not online: ${providerId}` });
return;
}
if (!(await providerSupports(providerId, "host.ssh"))) {
wsSendJson(ws, { type: "ssh.error", message: `provider does not declare host.ssh capability: ${providerId}` });
return;
}
const sessionId = safeSessionId();
ws.data.channel = "ssh";
ws.data.providerId = providerId;
ws.data.sessionId = sessionId;
activeSshClients.set(sessionId, ws);
const openMessage: CoreHostSshOpenMessage = {
type: "host_ssh_open",
sessionId,
cols: numberFromUnknown(message.cols, 100, 20, 300),
rows: numberFromUnknown(message.rows, 30, 8, 120),
};
if (typeof message.cwd === "string" && message.cwd.length > 0) openMessage.cwd = message.cwd;
if (typeof message.command === "string" && message.command.length > 0) openMessage.command = message.command;
provider.send(JSON.stringify(openMessage));
wsSendJson(ws, { type: "ssh.dispatched", providerId, sessionId });
logger("info", "ssh_session_dispatched", { providerId, sessionId, hasCommand: typeof openMessage.command === "string" });
return;
}
const sessionId = ws.data.sessionId;
const providerId = ws.data.providerId;
if (sessionId === undefined || providerId === undefined) {
if (message.type === "ssh.eof" || message.type === "ssh.close") return;
wsSendJson(ws, { type: "ssh.error", message: "ssh session is not open" });
return;
}
const provider = providerForSsh(providerId);
if (provider === null) {
wsSendJson(ws, { type: "ssh.error", message: `provider went offline: ${providerId}` });
return;
}
if (message.type === "ssh.input") {
if (typeof message.data !== "string" || message.encoding !== "base64") {
wsSendJson(ws, { type: "ssh.error", message: "ssh.input requires base64 data" });
return;
}
const inputMessage: CoreHostSshInputMessage = { type: "host_ssh_input", sessionId, data: message.data, encoding: "base64" };
provider.send(JSON.stringify(inputMessage));
return;
}
if (message.type === "ssh.resize") {
const resizeMessage: CoreHostSshResizeMessage = {
type: "host_ssh_resize",
sessionId,
cols: numberFromUnknown(message.cols, 100, 20, 300),
rows: numberFromUnknown(message.rows, 30, 8, 120),
};
provider.send(JSON.stringify(resizeMessage));
return;
}
if (message.type === "ssh.eof") {
const eofMessage: CoreHostSshEofMessage = { type: "host_ssh_eof", sessionId };
provider.send(JSON.stringify(eofMessage));
return;
}
if (message.type === "ssh.close") {
const closeMessage: CoreHostSshCloseMessage = { type: "host_ssh_close", sessionId };
provider.send(JSON.stringify(closeMessage));
closeSshClient(sessionId);
return;
}
wsSendJson(ws, { type: "ssh.error", message: `unsupported ssh client message: ${String(message.type)}` });
}
async function sshRoute(req: Request, server: Server<WsData>): Promise<Response | undefined> {
const url = new URL(req.url);
const token = url.searchParams.get("token") ?? req.headers.get("x-provider-token");
if (token !== config.providerToken) return jsonResponse({ ok: false, error: "invalid ssh bridge token" }, 401);
const upgraded = server.upgrade(req, { data: { channel: "ssh" } satisfies WsData });
return upgraded ? undefined : jsonResponse({ ok: false, error: "websocket upgrade failed" }, 400);
}
async function route(req: Request, server: Server<WsData>): Promise<Response | undefined> {
const url = new URL(req.url);
if (req.method === "OPTIONS") return jsonResponse({ ok: true });
try {
if (url.pathname === "/ws/ssh") return sshRoute(req, server);
if (url.pathname === "/" || url.pathname === "/health") {
return jsonResponse({ ok: true, service: "unidesk-core", dbReady, startedAt: serviceStartedAt.toISOString() });
}
@@ -714,7 +912,7 @@ async function providerRoute(req: Request, server: Server<WsData>): Promise<Resp
await recordEvent("provider_auth_failed", "unknown", { remote: req.headers.get("x-forwarded-for") ?? null });
return jsonResponse({ ok: false, error: "invalid provider token" }, 401);
}
const upgraded = server.upgrade(req, { data: {} satisfies WsData });
const upgraded = server.upgrade(req, { data: { channel: "provider" } satisfies WsData });
return upgraded ? undefined : jsonResponse({ ok: false, error: "websocket upgrade failed" }, 400);
}
@@ -733,6 +931,36 @@ const apiServer = Bun.serve<WsData>({
port: config.port,
hostname: "0.0.0.0",
fetch: route,
websocket: {
open(ws) {
if (ws.data.channel === "ssh") logger("info", "ssh_client_open");
},
message(ws, raw) {
if (ws.data.channel !== "ssh") {
ws.close(1008, "unsupported websocket channel");
return;
}
handleSshClientMessage(ws, raw).catch((error) => {
logger("error", "ssh_client_message_failed", { providerId: ws.data.providerId ?? null, sessionId: ws.data.sessionId ?? null, error: errorToJson(error) });
wsSendJson(ws, { type: "ssh.error", message: error instanceof Error ? error.message : String(error) });
});
},
close(ws) {
if (ws.data.channel !== "ssh") return;
const { sessionId, providerId } = ws.data;
logger("warn", "ssh_client_close", { providerId: providerId ?? null, sessionId: sessionId ?? null });
if (sessionId !== undefined) {
activeSshClients.delete(sessionId);
if (providerId !== undefined) {
const provider = providerForSsh(providerId);
if (provider !== null) {
const closeMessage: CoreHostSshCloseMessage = { type: "host_ssh_close", sessionId };
provider.send(JSON.stringify(closeMessage));
}
}
}
},
},
});
const providerServer = Bun.serve<WsData>({
+1
View File
@@ -835,6 +835,7 @@ function DispatchPage({ nodes, onDispatched, onRaw }: AnyRecord) {
)),
h("label", null, "Command", h("select", { value: command, onChange: (event: any) => setCommand(event.target.value) },
h("option", { value: "docker.ps" }, "docker.ps"),
h("option", { value: "host.ssh" }, "host.ssh"),
h("option", { value: "echo" }, "echo"),
)),
h("label", null, "来源", h("input", { value: source, onChange: (event: any) => setSource(event.target.value) })),
+361 -4
View File
@@ -2,6 +2,11 @@ import { appendFileSync, existsSync, mkdirSync, readFileSync } from "node:fs";
import { dirname } from "node:path";
import {
type CoreDispatchMessage,
type CoreHostSshCloseMessage,
type CoreHostSshEofMessage,
type CoreHostSshInputMessage,
type CoreHostSshOpenMessage,
type CoreHostSshResizeMessage,
type DockerContainerSummary,
type DockerImageSummary,
type DockerNetworkSummary,
@@ -33,6 +38,12 @@ interface RuntimeConfig {
upgradeComposeProject: string;
upgradeService: string;
upgradeRunnerImage: string;
hostSshHost: string | null;
hostSshPort: number | null;
hostSshUser: string | null;
hostSshKey: string | null;
hostRemoteCwd: string | null;
hostLoginShell: string | null;
logFile: string;
}
@@ -49,6 +60,18 @@ let previousCpuSample: { idle: number; total: number } | null = null;
let reconnectAttempt = 0;
let stopping = false;
interface HostSshSession {
proc: ReturnType<typeof Bun.spawn>;
openedAt: number;
}
interface HostSshStdin {
write(chunk: Uint8Array): unknown;
end(): unknown;
}
const hostSshSessions = new Map<string, HostSshSession>();
function requiredEnv(name: string): string {
const value = process.env[name];
if (value === undefined || value.length === 0) {
@@ -66,6 +89,22 @@ function readNumberEnv(name: string): number {
return parsed;
}
function readOptionalStringEnv(name: string): string | null {
const value = process.env[name];
if (value === undefined || value.length === 0) return null;
return value;
}
function readOptionalNumberEnv(name: string): number | null {
const raw = readOptionalStringEnv(name);
if (raw === null) return null;
const parsed = Number(raw);
if (!Number.isFinite(parsed) || parsed <= 0) {
throw new Error(`Environment variable ${name} must be a positive number, got ${raw}`);
}
return parsed;
}
function readBooleanEnv(name: string): boolean {
const raw = requiredEnv(name);
if (raw === "true") return true;
@@ -93,6 +132,12 @@ function readConfig(): RuntimeConfig {
upgradeComposeProject: requiredEnv("PROVIDER_UPGRADE_COMPOSE_PROJECT"),
upgradeService: requiredEnv("PROVIDER_UPGRADE_SERVICE"),
upgradeRunnerImage: requiredEnv("PROVIDER_UPGRADE_RUNNER_IMAGE"),
hostSshHost: readOptionalStringEnv("HOST_SSH_HOST"),
hostSshPort: readOptionalNumberEnv("HOST_SSH_PORT"),
hostSshUser: readOptionalStringEnv("HOST_SSH_USER"),
hostSshKey: readOptionalStringEnv("HOST_SSH_KEY"),
hostRemoteCwd: readOptionalStringEnv("HOST_REMOTE_CWD"),
hostLoginShell: readOptionalStringEnv("HOST_LOGIN_SHELL"),
logFile: requiredEnv("LOG_FILE"),
};
}
@@ -121,9 +166,13 @@ function withToken(rawUrl: string, token: string): string {
}
function currentLabels(): ProviderLabels {
const hostSshConfigured = isHostSshConfigured();
return {
...config.labels,
dockerSocketPresent: existsSync(config.dockerSocketPath),
hostSshConfigured,
hostSshKeyPresent: config.hostSshKey !== null && existsSync(config.hostSshKey),
hostSshTarget: hostSshConfigured ? `${config.hostSshUser}@${config.hostSshHost}:${config.hostSshPort}` : "not-configured",
runtime: "bun",
gatewayUptimeSeconds: Math.floor((Date.now() - startedAt.getTime()) / 1000),
};
@@ -135,13 +184,15 @@ function sendJson(value: unknown): void {
}
function sendRegister(): void {
const capabilities = ["heartbeat", "system.status", "docker.status", "docker.ps", "provider.upgrade", "echo"];
if (isHostSshConfigured()) capabilities.push("host.ssh");
sendJson({
type: "register",
providerId: config.providerId,
name: config.providerName,
labels: currentLabels(),
startedAt: startedAt.toISOString(),
capabilities: ["heartbeat", "system.status", "docker.status", "docker.ps", "provider.upgrade", "echo"],
capabilities,
});
}
@@ -209,19 +260,23 @@ async function sendTaskStatus(taskId: string, status: ProviderTaskStatusMessage[
});
}
async function runProcessCommand(command: string, args: string[], timeoutMs = 6000): Promise<{ ok: boolean; stdout: string; stderr: string; exitCode: number }> {
async function runProcessCommand(command: string, args: string[], timeoutMs = 6000): Promise<{ ok: boolean; stdout: string; stderr: string; exitCode: number; timedOut: boolean }> {
const proc = Bun.spawn([command, ...args], {
stdout: "pipe",
stderr: "pipe",
});
const timeout = setTimeout(() => proc.kill("SIGKILL"), timeoutMs);
let timedOut = false;
const timeout = setTimeout(() => {
timedOut = true;
proc.kill("SIGKILL");
}, timeoutMs);
const [stdout, stderr, exitCode] = await Promise.all([
new Response(proc.stdout).text(),
new Response(proc.stderr).text(),
proc.exited,
]);
clearTimeout(timeout);
return { ok: exitCode === 0, stdout, stderr, exitCode };
return { ok: exitCode === 0 && !timedOut, stdout, stderr, exitCode, timedOut };
}
async function runDockerCommand(args: string[], timeoutMs = 6000): Promise<{ ok: boolean; stdout: string; stderr: string; exitCode: number }> {
@@ -508,6 +563,274 @@ function shellQuote(value: string): string {
return `'${value.replace(/'/g, `'\\''`)}'`;
}
function isHostSshConfigured(): boolean {
return config.hostSshHost !== null && config.hostSshPort !== null && config.hostSshUser !== null && config.hostSshKey !== null;
}
function missingHostSshFields(): string[] {
const missing: string[] = [];
if (config.hostSshHost === null) missing.push("HOST_SSH_HOST");
if (config.hostSshPort === null) missing.push("HOST_SSH_PORT");
if (config.hostSshUser === null) missing.push("HOST_SSH_USER");
if (config.hostSshKey === null) missing.push("HOST_SSH_KEY");
return missing;
}
function payloadString(payload: Record<string, JsonValue>, key: string): string | null {
const value = payload[key];
return typeof value === "string" && value.length > 0 ? value : null;
}
function payloadTimeoutMs(payload: Record<string, JsonValue>): number {
const raw = payload.timeoutMs;
const value = typeof raw === "number" ? raw : typeof raw === "string" ? Number(raw) : 8000;
if (!Number.isFinite(value) || value <= 0) return 8000;
return Math.max(1000, Math.min(15_000, Math.floor(value)));
}
function truncateText(value: string, maxLength = 4000): string {
return value.length > maxLength ? `${value.slice(0, maxLength)}...<truncated:${value.length}>` : value;
}
function safeKnownHostsName(providerId: string): string {
return providerId.replace(/[^a-zA-Z0-9_.-]/g, "-").slice(0, 80) || "provider";
}
function defaultHostSshProbeCommand(): string {
return "printf 'UNIDESK_SSH_TEST user=%s host=%s bridge=%s cwd=%s\\n' \"$(whoami)\" \"$(hostname)\" \"${UNIDESK_BRIDGE:-}\" \"$(pwd)\"";
}
async function runHostSsh(payload: Record<string, JsonValue>): Promise<JsonValue> {
if (!isHostSshConfigured()) {
throw new Error(`host SSH bridge is not configured; missing ${missingHostSshFields().join(", ")}`);
}
const host = config.hostSshHost ?? "";
const port = config.hostSshPort ?? 22;
const user = config.hostSshUser ?? "";
const key = config.hostSshKey ?? "";
if (!existsSync(key)) {
throw new Error(`host SSH key is not mounted at ${key}`);
}
const mode = payload.mode === "exec" ? "exec" : "probe";
const timeoutMs = payloadTimeoutMs(payload);
const requestedCommand = payloadString(payload, "command");
const command = mode === "exec" && requestedCommand !== null ? requestedCommand : defaultHostSshProbeCommand();
if (command.length > 4000) {
throw new Error(`host SSH command is too long: ${command.length} bytes`);
}
const cwd = payloadString(payload, "cwd") ?? config.hostRemoteCwd;
const scriptParts = [
"set -e",
cwd === null ? null : `cd ${shellQuote(cwd)}`,
"export UNIDESK_BRIDGE=host.ssh",
`export UNIDESK_PROVIDER_ID=${shellQuote(config.providerId)}`,
config.hostLoginShell === null
? command
: `${shellQuote(config.hostLoginShell)} -lc ${shellQuote(command)}`,
].filter((part): part is string => part !== null);
const remoteScript = scriptParts.join("; ");
const result = await runProcessCommand("ssh", [
"-T",
"-i",
key,
"-p",
String(port),
"-o",
"BatchMode=yes",
"-o",
"StrictHostKeyChecking=accept-new",
"-o",
`UserKnownHostsFile=/tmp/unidesk-host-known-hosts-${safeKnownHostsName(config.providerId)}`,
"-o",
"ServerAliveInterval=20",
"-o",
"ServerAliveCountMax=3",
`${user}@${host}`,
remoteScript,
], timeoutMs);
const stdout = truncateText(result.stdout);
const stderr = truncateText(result.stderr);
const probeLine = stdout.split("\n").find((line) => line.includes("UNIDESK_SSH_TEST")) ?? "";
return {
ok: result.ok,
mode,
host,
port,
user,
cwd: cwd ?? "",
timeoutMs,
timedOut: result.timedOut,
exitCode: result.exitCode,
command: truncateText(command, 1000),
stdout,
stderr,
probeLine,
sshKeyPresent: true,
};
}
function hostSshRemoteScript(command: string | null, cwd: string | null, cols?: number, rows?: number): string {
const fallbackCwd = config.hostRemoteCwd ?? `/home/${config.hostSshUser ?? "root"}`;
const requestedCwd = cwd ?? fallbackCwd;
const loginShell = config.hostLoginShell ?? "/bin/bash";
const resize = Number.isFinite(cols) && Number.isFinite(rows)
? `stty rows ${Math.max(8, Math.min(120, Math.floor(rows ?? 30)))} cols ${Math.max(20, Math.min(300, Math.floor(cols ?? 100)))} 2>/dev/null || true`
: "true";
const enterCwd = `cd ${shellQuote(requestedCwd)} 2>/dev/null || cd ${shellQuote(fallbackCwd)} 2>/dev/null || cd`;
const exports = `export UNIDESK_BRIDGE=host.ssh; export UNIDESK_PROVIDER_ID=${shellQuote(config.providerId)}`;
const execPart = command === null || command.length === 0
? `exec ${shellQuote(loginShell)} -l`
: `${shellQuote(loginShell)} -lc ${shellQuote(command)}`;
return `${enterCwd}; ${exports}; ${resize}; ${execPart}`;
}
function hostSshArgs(remoteScript: string): string[] {
if (!isHostSshConfigured()) {
throw new Error(`host SSH bridge is not configured; missing ${missingHostSshFields().join(", ")}`);
}
const key = config.hostSshKey ?? "";
if (!existsSync(key)) {
throw new Error(`host SSH key is not mounted at ${key}`);
}
return [
"-tt",
"-i",
key,
"-p",
String(config.hostSshPort ?? 22),
"-o",
"BatchMode=yes",
"-o",
"StrictHostKeyChecking=accept-new",
"-o",
`UserKnownHostsFile=/tmp/unidesk-host-known-hosts-${safeKnownHostsName(config.providerId)}`,
"-o",
"ServerAliveInterval=20",
"-o",
"ServerAliveCountMax=3",
`${config.hostSshUser}@${config.hostSshHost}`,
remoteScript,
];
}
async function pumpHostSshOutput(sessionId: string, streamName: "stdout" | "stderr", stream: ReadableStream<Uint8Array> | null): Promise<void> {
if (stream === null) return;
const reader = stream.getReader();
while (true) {
const chunk = await reader.read();
if (chunk.done) return;
sendJson({
type: "host_ssh_data",
providerId: config.providerId,
sessionId,
stream: streamName,
data: Buffer.from(chunk.value).toString("base64"),
encoding: "base64",
at: new Date().toISOString(),
});
}
}
function sendHostSshError(sessionId: string, error: unknown): void {
const message = error instanceof Error ? error.message : String(error);
logger("error", "host_ssh_session_error", { sessionId, error: message });
sendJson({
type: "host_ssh_error",
providerId: config.providerId,
sessionId,
message,
at: new Date().toISOString(),
});
}
function startHostSshSession(message: CoreHostSshOpenMessage): void {
if (hostSshSessions.has(message.sessionId)) {
sendHostSshError(message.sessionId, `host SSH session already exists: ${message.sessionId}`);
return;
}
try {
const remoteScript = hostSshRemoteScript(message.command ?? null, message.cwd ?? null, message.cols, message.rows);
const proc = Bun.spawn(["ssh", ...hostSshArgs(remoteScript)], {
stdin: "pipe",
stdout: "pipe",
stderr: "pipe",
});
hostSshSessions.set(message.sessionId, { proc, openedAt: Date.now() });
sendJson({
type: "host_ssh_opened",
providerId: config.providerId,
sessionId: message.sessionId,
at: new Date().toISOString(),
});
const stdoutDone = pumpHostSshOutput(message.sessionId, "stdout", proc.stdout);
const stderrDone = pumpHostSshOutput(message.sessionId, "stderr", proc.stderr);
Promise.all([stdoutDone, stderrDone, proc.exited])
.then(([, , exitCode]) => {
hostSshSessions.delete(message.sessionId);
sendJson({
type: "host_ssh_exit",
providerId: config.providerId,
sessionId: message.sessionId,
exitCode,
signal: null,
at: new Date().toISOString(),
});
})
.catch((error) => {
hostSshSessions.delete(message.sessionId);
sendHostSshError(message.sessionId, error);
});
logger("info", "host_ssh_session_started", { sessionId: message.sessionId, hasCommand: typeof message.command === "string", cwd: message.cwd ?? null });
} catch (error) {
sendHostSshError(message.sessionId, error);
}
}
function writeHostSshInput(message: CoreHostSshInputMessage): void {
const session = hostSshSessions.get(message.sessionId);
if (session === undefined) {
sendHostSshError(message.sessionId, `unknown host SSH session: ${message.sessionId}`);
return;
}
try {
const stdin = session.proc.stdin as HostSshStdin | undefined;
if (stdin === undefined) throw new Error("host SSH stdin is not available");
stdin.write(Buffer.from(message.data, "base64"));
} catch (error) {
sendHostSshError(message.sessionId, error);
}
}
function resizeHostSshSession(message: CoreHostSshResizeMessage): void {
logger("debug", "host_ssh_resize_requested", { sessionId: message.sessionId, cols: message.cols, rows: message.rows });
}
function eofHostSshSession(message: CoreHostSshEofMessage): void {
const session = hostSshSessions.get(message.sessionId);
if (session === undefined) return;
try {
const stdin = session.proc.stdin as HostSshStdin | undefined;
if (stdin !== undefined) stdin.end();
} catch (error) {
sendHostSshError(message.sessionId, error);
}
}
function closeHostSshSession(message: CoreHostSshCloseMessage): void {
const session = hostSshSessions.get(message.sessionId);
if (session === undefined) return;
hostSshSessions.delete(message.sessionId);
session.proc.kill("SIGTERM");
setTimeout(() => {
try {
session.proc.kill("SIGKILL");
} catch {
/* process may already be gone */
}
}, 1000);
}
function safeDockerName(value: string): string {
return value.replace(/[^a-zA-Z0-9_.-]/g, "-").slice(0, 80);
}
@@ -603,6 +926,15 @@ async function handleDispatch(message: CoreDispatchMessage): Promise<void> {
await sendTaskStatus(message.taskId, "succeeded", "provider upgrade command completed", result);
return;
}
if (message.command === "host.ssh") {
const result = await runHostSsh(message.payload);
if ((result as { ok?: unknown }).ok !== true) {
await sendTaskStatus(message.taskId, "failed", "host SSH command failed", result);
return;
}
await sendTaskStatus(message.taskId, "succeeded", "host SSH command completed", result);
return;
}
await sendTaskStatus(message.taskId, "succeeded", "echo completed", { echo: message.payload });
} catch (error) {
const text = error instanceof Error ? `${error.name}: ${error.message}` : String(error);
@@ -618,6 +950,26 @@ function handleMessage(raw: MessageEvent<string>): void {
handleDispatch(parsed as CoreDispatchMessage).catch((error) => logger("error", "dispatch_handler_failed", { error: String(error) }));
return;
}
if (parsed.type === "host_ssh_open") {
startHostSshSession(parsed as CoreHostSshOpenMessage);
return;
}
if (parsed.type === "host_ssh_input") {
writeHostSshInput(parsed as CoreHostSshInputMessage);
return;
}
if (parsed.type === "host_ssh_resize") {
resizeHostSshSession(parsed as CoreHostSshResizeMessage);
return;
}
if (parsed.type === "host_ssh_eof") {
eofHostSshSession(parsed as CoreHostSshEofMessage);
return;
}
if (parsed.type === "host_ssh_close") {
closeHostSshSession(parsed as CoreHostSshCloseMessage);
return;
}
logger("debug", "core_message", parsed as JsonValue);
} catch (error) {
logger("error", "core_message_parse_failed", { error: String(error) });
@@ -676,6 +1028,11 @@ process.on("SIGTERM", () => {
if (heartbeatTimer !== null) clearInterval(heartbeatTimer);
if (systemStatusTimer !== null) clearInterval(systemStatusTimer);
if (dockerStatusTimer !== null) clearInterval(dockerStatusTimer);
for (const [sessionId, session] of hostSshSessions) {
logger("warn", "host_ssh_session_terminated_by_shutdown", { sessionId });
session.proc.kill("SIGTERM");
}
hostSshSessions.clear();
socket?.close(1000, "provider shutdown");
process.exit(0);
});
+98 -4
View File
@@ -106,13 +106,48 @@ export interface ProviderTaskStatusMessage {
result?: JsonValue;
}
export type ProviderDispatchCommand = "docker.ps" | "provider.upgrade" | "host.ssh" | "echo";
export interface CoreDispatchMessage {
type: "dispatch";
taskId: string;
command: "docker.ps" | "provider.upgrade" | "echo";
command: ProviderDispatchCommand;
payload: Record<string, JsonValue>;
}
export interface CoreHostSshOpenMessage {
type: "host_ssh_open";
sessionId: string;
cwd?: string;
command?: string;
cols?: number;
rows?: number;
}
export interface CoreHostSshInputMessage {
type: "host_ssh_input";
sessionId: string;
data: string;
encoding: "base64";
}
export interface CoreHostSshResizeMessage {
type: "host_ssh_resize";
sessionId: string;
cols: number;
rows: number;
}
export interface CoreHostSshCloseMessage {
type: "host_ssh_close";
sessionId: string;
}
export interface CoreHostSshEofMessage {
type: "host_ssh_eof";
sessionId: string;
}
export interface CoreAcknowledgeMessage {
type: "ack";
requestId: string;
@@ -120,14 +155,59 @@ export interface CoreAcknowledgeMessage {
message: string;
}
export interface ProviderHostSshOpenedMessage {
type: "host_ssh_opened";
providerId: string;
sessionId: string;
at: string;
}
export interface ProviderHostSshDataMessage {
type: "host_ssh_data";
providerId: string;
sessionId: string;
stream: "stdout" | "stderr";
data: string;
encoding: "base64";
at: string;
}
export interface ProviderHostSshExitMessage {
type: "host_ssh_exit";
providerId: string;
sessionId: string;
exitCode: number | null;
signal: string | null;
at: string;
}
export interface ProviderHostSshErrorMessage {
type: "host_ssh_error";
providerId: string;
sessionId: string;
message: string;
at: string;
}
export type ProviderToCoreMessage =
| ProviderRegisterMessage
| ProviderHeartbeatMessage
| ProviderSystemStatusMessage
| ProviderDockerStatusMessage
| ProviderTaskStatusMessage;
| ProviderTaskStatusMessage
| ProviderHostSshOpenedMessage
| ProviderHostSshDataMessage
| ProviderHostSshExitMessage
| ProviderHostSshErrorMessage;
export type CoreToProviderMessage = CoreDispatchMessage | CoreAcknowledgeMessage;
export type CoreToProviderMessage =
| CoreDispatchMessage
| CoreHostSshOpenMessage
| CoreHostSshInputMessage
| CoreHostSshResizeMessage
| CoreHostSshCloseMessage
| CoreHostSshEofMessage
| CoreAcknowledgeMessage;
export interface ApiNode {
providerId: string;
@@ -190,11 +270,25 @@ export function parseJsonObject(value: string, name: string): Record<string, Jso
return parsed as Record<string, JsonValue>;
}
export function isProviderDispatchCommand(value: unknown): value is ProviderDispatchCommand {
return value === "docker.ps" || value === "provider.upgrade" || value === "host.ssh" || value === "echo";
}
export function isProviderToCoreMessage(value: unknown): value is ProviderToCoreMessage {
if (typeof value !== "object" || value === null || !("type" in value)) return false;
const msg = value as { type?: unknown; providerId?: unknown };
return (
(msg.type === "register" || msg.type === "heartbeat" || msg.type === "system_status" || msg.type === "docker_status" || msg.type === "task_status") &&
(
msg.type === "register" ||
msg.type === "heartbeat" ||
msg.type === "system_status" ||
msg.type === "docker_status" ||
msg.type === "task_status" ||
msg.type === "host_ssh_opened" ||
msg.type === "host_ssh_data" ||
msg.type === "host_ssh_exit" ||
msg.type === "host_ssh_error"
) &&
typeof msg.providerId === "string" &&
msg.providerId.length > 0
);