From 28cc2af1213c6bbda636a5fd10393bdc70120560 Mon Sep 17 00:00:00 2001 From: Codex Date: Sat, 16 May 2026 16:03:53 +0000 Subject: [PATCH] feat: add provider websocket http data plane --- bun.lock | 22 ++ docs/reference/cli.md | 2 + docs/reference/microservices.md | 2 +- docs/reference/provider-gateway.md | 4 +- package.json | 4 +- scripts/cli.ts | 7 + scripts/src/network-perf.ts | 196 ++++++++++++++++++ scripts/src/remote.ts | 15 +- src/components/backend-core/src/context.ts | 2 + src/components/backend-core/src/index.ts | 6 + .../backend-core/src/microservice-proxy.ts | 91 +++++++- .../backend-core/src/provider-registry.ts | 16 ++ src/components/backend-core/src/types.ts | 7 + .../microservices/code-queue/Dockerfile | 4 + .../k3sctl-adapter/docker-compose.d601.yml | 5 + .../microservices/k3sctl-adapter/src/index.ts | 120 ++++++++++- src/components/provider-gateway/package.json | 2 +- src/components/provider-gateway/src/index.ts | 40 +++- src/components/shared/src/index.ts | 22 +- 19 files changed, 545 insertions(+), 22 deletions(-) create mode 100644 scripts/src/network-perf.ts diff --git a/bun.lock b/bun.lock index f119ac77..ad3dfd39 100644 --- a/bun.lock +++ b/bun.lock @@ -8,7 +8,9 @@ "@types/bun": "latest", "@types/node": "latest", "playwright": "^1.59.1", + "postgres": "^3.4.9", "typescript": "latest", + "xlsx": "^0.18.5", }, }, }, @@ -17,16 +19,36 @@ "@types/node": ["@types/node@25.6.0", "", { "dependencies": { "undici-types": "~7.19.0" } }, "sha512-+qIYRKdNYJwY3vRCZMdJbPLJAtGjQBudzZzdzwQYkEPQd+PJGixUL5QfvCLDaULoLv+RhT3LDkwEfKaAkgSmNQ=="], + "adler-32": ["adler-32@1.3.1", "", {}, "sha512-ynZ4w/nUUv5rrsR8UUGoe1VC9hZj6V5hU9Qw1HlMDJGEJw5S7TfTErWTjMys6M7vr0YWcPqs3qAr4ss0nDfP+A=="], + "bun-types": ["bun-types@1.3.13", "", { "dependencies": { "@types/node": "*" } }, "sha512-QXKeHLlOLqQX9LgYaHJfzdBaV21T63HhFJnvuRCcjZiaUDpbs5ED1MgxbMra71CsryN/1dAoXuJJJwIv/2drVA=="], + "cfb": ["cfb@1.2.2", "", { "dependencies": { "adler-32": "~1.3.0", "crc-32": "~1.2.0" } }, "sha512-KfdUZsSOw19/ObEWasvBP/Ac4reZvAGauZhs6S/gqNhXhI7cKwvlH7ulj+dOEYnca4bm4SGo8C1bTAQvnTjgQA=="], + + "codepage": ["codepage@1.15.0", "", {}, "sha512-3g6NUTPd/YtuuGrhMnOMRjFc+LJw/bnMp3+0r/Wcz3IXUuCosKRJvMphm5+Q+bvTVGcJJuRvVLuYba+WojaFaA=="], + + "crc-32": ["crc-32@1.2.2", "", { "bin": { "crc32": "bin/crc32.njs" } }, "sha512-ROmzCKrTnOwybPcJApAA6WBWij23HVfGVNKqqrZpuyZOHqK2CwHSvpGuyt/UNNvaIjEd8X5IFGp4Mh+Ie1IHJQ=="], + + "frac": ["frac@1.1.2", "", {}, "sha512-w/XBfkibaTl3YDqASwfDUqkna4Z2p9cFSr1aHDt0WoMTECnRfBOv2WArlZILlqgWlmdIlALXGpM2AOhEk5W3IA=="], + "fsevents": ["fsevents@2.3.2", "", { "os": "darwin" }, "sha512-xiqMQR4xAeHTuB9uWm+fFRcIOgKBMiOBP+eXiyT7jsgVCq1bkVygt00oASowB7EdtpOHaaPgKt812P9ab+DDKA=="], "playwright": ["playwright@1.59.1", "", { "dependencies": { "playwright-core": "1.59.1" }, "optionalDependencies": { "fsevents": "2.3.2" }, "bin": { "playwright": "cli.js" } }, "sha512-C8oWjPR3F81yljW9o5OxcWzfh6avkVwDD2VYdwIGqTkl+OGFISgypqzfu7dOe4QNLL2aqcWBmI3PMtLIK233lw=="], "playwright-core": ["playwright-core@1.59.1", "", { "bin": { "playwright-core": "cli.js" } }, "sha512-HBV/RJg81z5BiiZ9yPzIiClYV/QMsDCKUyogwH9p3MCP6IYjUFu/MActgYAvK0oWyV9NlwM3GLBjADyWgydVyg=="], + "postgres": ["postgres@3.4.9", "", {}, "sha512-GD3qdB0x1z9xgFI6cdRD6xu2Sp2WCOEoe3mtnyB5Ee0XrrL5Pe+e4CCnJrRMnL1zYtRDZmQQVbvOttLnKDLnaw=="], + + "ssf": ["ssf@0.11.2", "", { "dependencies": { "frac": "~1.1.2" } }, "sha512-+idbmIXoYET47hH+d7dfm2epdOMUDjqcB4648sTZ+t2JwoyBFL/insLfB/racrDmsKB3diwsDA696pZMieAC5g=="], + "typescript": ["typescript@6.0.3", "", { "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" } }, "sha512-y2TvuxSZPDyQakkFRPZHKFm+KKVqIisdg9/CZwm9ftvKXLP8NRWj38/ODjNbr43SsoXqNuAisEf1GdCxqWcdBw=="], "undici-types": ["undici-types@7.19.2", "", {}, "sha512-qYVnV5OEm2AW8cJMCpdV20CDyaN3g0AjDlOGf1OW4iaDEx8MwdtChUp4zu4H0VP3nDRF/8RKWH+IPp9uW0YGZg=="], + + "wmf": ["wmf@1.0.2", "", {}, "sha512-/p9K7bEh0Dj6WbXg4JG0xvLQmIadrner1bi45VMJTfnbVHsc7yIajZyoSoK60/dtVBs12Fm6WkUI5/3WAVsNMw=="], + + "word": ["word@0.3.0", "", {}, "sha512-OELeY0Q61OXpdUfTp+oweA/vtLVg5VDOXh+3he3PNzLGG/y0oylSOC1xRVj0+l4vQ3tj/bB1HVHv1ocXkQceFA=="], + + "xlsx": ["xlsx@0.18.5", "", { "dependencies": { "adler-32": "~1.3.0", "cfb": "~1.2.1", "codepage": "~1.15.0", "crc-32": "~1.2.1", "ssf": "~0.11.2", "wmf": "~1.0.1", "word": "~0.3.0" }, "bin": { "xlsx": "bin/xlsx.njs" } }, "sha512-dmg3LCjBPHZnQp5/F/+nnTa+miPJxUXB6vtk42YjBBKayDNagxGEeIdWApkYPOf3Z3pm3k62Knjzp7lMeTEtFQ=="], } } diff --git a/docs/reference/cli.md b/docs/reference/cli.md index 7aa16b03..94d1d5fd 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -44,6 +44,8 @@ UniDesk 的统一 CLI 入口是根目录 `scripts/cli.ts`,运行方式固定 `microservice proxy` 是面向人工验证的私有后端读取入口。正式写入型用户服务操作由 frontend 同源代理或 E2E 直接调用 backend-core 完成,并由 config 中的 `allowedMethods` 限制;CLI `proxy` 默认仍作为 GET/HEAD 读取验证入口,必要时可显式加 `--method POST|PUT|PATCH|DELETE` 调用无需自定义请求体的受控调试/自测端点,例如 `bun scripts/cli.ts microservice proxy baidu-netdisk /api/self-test --method POST --raw`。为了避免 Pipeline snapshot 这类超大业务 JSON 造成 CLI 输出爆炸,响应 body 超过默认阈值时会返回 `bodyOmitted=true`、`bodyPreview`、`bodyBytes` 和 `rawHint`;需要完整 body 时显式添加 `--raw`,或用 `--max-body-bytes ` 调整预览阈值。正式 frontend 展示仍应优先使用业务控件和 `__unideskArrayLimit` 这类展示级裁剪参数,而不是默认倾倒完整 JSON。 +`network perf` 用于生成组网性能前后对比数据。标准 Code Queue overview 读路径基准命令是 `bun scripts/cli.ts network perf --service code-queue --path /api/tasks/overview?limit=30 --count 30 --concurrency 1 --label before`,远程主 server 可用 `bun scripts/cli.ts --main-server-ip 74.48.78.17 network perf ...`。输出包含成功/失败数、状态码分布、`x-unidesk-cache`、`x-unidesk-proxy-mode`、`x-unidesk-upstream-proxy-mode` 分布和 min/p50/p90/p95/max;provider-gateway 长连接数据面验收应看到 `proxyModeCounts.provider-ws-http-tunnel`,adapter native Service 数据面验收应看到 upstream proxy mode 为 `kubernetes-native-service`,若出现 `kubernetes-api-service-proxy` 必须结合 `/api/control-plane.nativeServiceProxy.failedServices` 解释 fallback 原因。 + ## Debug Contract `debug` 子命令必须复用真实模块与真实端点,禁止维护平行实现。`debug health` 会摘要展示 `/api/nodes/system-status` 和 `/api/nodes/docker-status`,避免输出完整快照造成信息爆炸。`debug dispatch` 会在 backend-core 容器内调用内部 `/api/dispatch`,core 再通过 WebSocket 将 `docker.ps`、`provider.upgrade`、`host.ssh`、`microservice.http` 或 `echo` 任务下发给 provider gateway,因此它可以验证核心调度闭环,同时不需要公开 core REST API。`provider.upgrade` 默认使用 `mode: "plan"` 预检;需要验证一键升级时必须显式加 `--mode schedule`,并通过 `--wait-ms` 或 `debug task` 确认任务进入 `succeeded`、result 中包含 updater 容器信息和 `policy: "always-enabled"`。`host.ssh` 默认使用 `mode: "probe"` 做短超时维护桥自检;需要执行明确命令时使用 `--ssh-command` 进入 `mode: "exec"`,并配合 `--wait-ms` 和 `debug task` 查看 stdout、stderr、exitCode 与 probeLine。`microservice.http` 只用于开发调试 provider-gateway 私有 HTTP 代理,正式用户入口应使用 `microservice` CLI 或 frontend 的用户服务页面。 diff --git a/docs/reference/microservices.md b/docs/reference/microservices.md index c2583025..7bf61175 100644 --- a/docs/reference/microservices.md +++ b/docs/reference/microservices.md @@ -181,7 +181,7 @@ Baidu Netdisk 在 UniDesk 语境中按纯后端服务管理:不得暴露百度 - ClaudeQQ 通知:Code Queue 在 D601 上通过 `CODE_QUEUE_NOTIFY_CLAUDEQQ_BASE_URL=http://host.docker.internal:3290` 直接调用本机 ClaudeQQ 后端 `POST /api/push/text`,在每个任务进入 `succeeded`、`failed` 或 `canceled` 终态后向配置目标发送最终 response,并附带 task id、queue、状态、模型、attempt、当前 running/queued/retry_wait 数和任务总耗时;当所有 queue 进入 `0 running / 0 queued` 空闲态时,必须单独发送一次空闲提醒。通知由 `CODE_QUEUE_NOTIFY_CLAUDEQQ_ENABLED` 控制,目标由 `CODE_QUEUE_NOTIFY_CLAUDEQQ_TARGET_TYPE=private|group`、`CODE_QUEUE_NOTIFY_CLAUDEQQ_USER_ID`、`CODE_QUEUE_NOTIFY_CLAUDEQQ_GROUP_ID` 配置,默认私聊 `645275593`;代理基址、最终 response 最大字符数、单次超时和发送尝试次数分别由 `CODE_QUEUE_NOTIFY_CLAUDEQQ_BASE_URL`、`CODE_QUEUE_NOTIFY_CLAUDEQQ_MAX_RESPONSE_CHARS`、`CODE_QUEUE_NOTIFY_CLAUDEQQ_TIMEOUT_MS` 和 `CODE_QUEUE_NOTIFY_CLAUDEQQ_SEND_ATTEMPTS` 配置。任务终态和队列空闲通知必须先写入 PostgreSQL outbox 表 `unidesk_code_queue_notifications` 再异步发送;不得使用 `.state/code-queue/claudeqq-notifications.json`、`CODE_QUEUE_NOTIFY_CLAUDEQQ_OUTBOX_PATH` 或任何本地 JSON 作为通知权威存储。发送失败、NapCat 离线、代理 502 或容器重启时不能丢通知,必须按 `CODE_QUEUE_NOTIFY_CLAUDEQQ_RETRY_INTERVAL_MS` 指数退避重试并跨进程/容器重启保留。`/health` 的 `queue.notifications.claudeqq` 必须暴露非敏感配置、目标配置状态和 PostgreSQL outbox 统计;`GET /api/notifications/claudeqq` 返回 outbox 明细,`POST /api/notifications/claudeqq/drain` 手动触发发送,`POST /api/notifications/claudeqq/backfill` 可按 `since` 补入某次故障窗口内已终态任务,确保 QQ/NapCat 超时或离线不会让任务完成通知永久丢失。 - OA 接入:Code Queue 后端通过 D601 env-file 中的 `OA_EVENT_FLOW_BASE_URL` 指向主 server OA Event Flow 受限端口映射,发布每个 TraceView 可见执行行的 `trace-step-created`、幂等种子/乱序校正用 `trace-stats-snapshot`、`task-updated` 和 queue 事件;服务启动或手动 backfill 时必须用相同 `eventId` 幂等回放历史 TraceView 可见执行行,避免历史任务停留在旧 STEP 统计口径。前端通过 `oa-event-flow` 的 `service:code-queue` tag stream 更新 STEP 和 Trace Summary,Code Queue 私有 SSE 不再作为刷新权威。`STEP` 表示 TraceView 可见且非 system 的执行行数;system 行可保留在任务原始输出/数据库中,但默认不展示、不计入 STEP,工具调用数必须由 `readCount+editCount+runCount` 展示,不能复用 `stepCount`。 - 代理路径:只允许 `/health`、`/logs` 和 `/api/` 前缀;允许方法为 `GET`、`HEAD`、`POST`、`DELETE`、`PATCH`。Code Queue 只在 Compose 内网暴露 `4222/tcp`,不得映射或开放到公网。 -- UniDesk 前端:`用户服务 / Code Queue` React 页面负责展示队列卡片、任务 ID、复制任务 ID、引用按钮、任务耗时、默认模型、模型下拉、执行 Provider 下拉、执行模式下拉、Provider/模式对应默认工作目录、显式入队份数、引用任务 ID、清空输入、创建成功提示、MiniMax judge 状态、Codex CLI-like 输出流、attempt 终态、追加 prompt、打断和手动重试控件;选择 `windows-native` 时应优先切到支持 Windows 原生 Codex 的非主 server Provider,并把工作目录提示切到 `/mnt/` 默认路径;整个 agent loop 消息流统一命名为专有名词 `Trace`,`Trace` 包含 assistant message、user prompt、system event 和 tool call;Code Queue 与 Pipeline/OpenCode messages 必须共用 `src/components/frontend/src/trace.tsx` 的 Trace 公共组件、统一 Trace item 接口和 codex/opencode port 适配层;连续 read/edit/run 工具调用只是在 Trace 内折叠为可展开工具调用组,汇总格式至少包含 `xx read, xx edit, xx run`,并展示读取文件、编辑文件、运行命令和耗时摘要;最近 3 个工具调用保持展开,工具调用内容不得自动换行且必须在工具调用块内部横向滚动,工具调用组展开后不得再增加额外左侧缩进;message 与 prompt 必须自动换行,普通 message 不显示左侧项目符号缩进且永不折叠;点击队列卡片引用按钮必须自动把该任务 ID 写入提交表单的引用任务 ID 输入框;引用任务 ID 创建新任务时必须自动注入 `bun scripts/cli.ts codex task ` 的提示,让 Codex 读取初始 prompt、最后消息和工具摘要后继续;连续执行同一 prompt 应使用 `入队份数` 一次性生成多条队列任务,而不是依赖快速连点按钮;左侧 queue/session 卡片的 `QUEUED` 状态必须显示原因,例如 `QUEUED(PREV TASK)`、`QUEUED(MEM LIMIT)`、`QUEUED(ACTIVE LIMIT)`;原始任务 JSON 只能通过显式 `查看原始JSON` 打开。 +- UniDesk 前端:`用户服务 / Code Queue` React 页面负责展示队列卡片、任务 ID、复制任务 ID、引用按钮、任务耗时、默认模型、模型下拉、执行 Provider 下拉、执行模式下拉、Provider/模式对应默认工作目录、工作目录下拉菜单、新建工作目录选项、删除已保存工作目录选项、显式入队份数、引用任务 ID、清空输入、创建成功提示、MiniMax judge 状态、Codex CLI-like 输出流、attempt 终态、追加 prompt、打断和手动重试控件;工作目录选项由 Code Queue 后端持久化到 PostgreSQL,前端不得用 localStorage/sessionStorage/IndexedDB 保存,删除只移除下拉菜单选项,不递归删除磁盘项目目录;选择 `windows-native` 时应优先切到支持 Windows 原生 Codex 的非主 server Provider,并把工作目录提示切到 `/mnt/` 默认路径;整个 agent loop 消息流统一命名为专有名词 `Trace`,`Trace` 包含 assistant message、user prompt、system event 和 tool call;Code Queue 与 Pipeline/OpenCode messages 必须共用 `src/components/frontend/src/trace.tsx` 的 Trace 公共组件、统一 Trace item 接口和 codex/opencode port 适配层;连续 read/edit/run 工具调用只是在 Trace 内折叠为可展开工具调用组,汇总格式至少包含 `xx read, xx edit, xx run`,并展示读取文件、编辑文件、运行命令和耗时摘要;最近 3 个工具调用保持展开,工具调用内容不得自动换行且必须在工具调用块内部横向滚动,工具调用组展开后不得再增加额外左侧缩进;message 与 prompt 必须自动换行,普通 message 不显示左侧项目符号缩进且永不折叠;点击队列卡片引用按钮必须自动把该任务 ID 写入提交表单的引用任务 ID 输入框;引用任务 ID 创建新任务时必须自动注入 `bun scripts/cli.ts codex task ` 的提示,让 Codex 读取初始 prompt、最后消息和工具摘要后继续;连续执行同一 prompt 应使用 `入队份数` 一次性生成多条队列任务,而不是依赖快速连点按钮;左侧 queue/session 卡片的 `QUEUED` 状态必须显示原因,例如 `QUEUED(PREV TASK)`、`QUEUED(MEM LIMIT)`、`QUEUED(ACTIVE LIMIT)`;原始任务 JSON 只能通过显式 `查看原始JSON` 打开。 ### MDTODO k3s-Managed diff --git a/docs/reference/provider-gateway.md b/docs/reference/provider-gateway.md index 69a12e07..ba24bb1f 100644 --- a/docs/reference/provider-gateway.md +++ b/docs/reference/provider-gateway.md @@ -86,7 +86,9 @@ provider ingress 是唯一允许公网暴露的 provider 连接接口,当前 ## User Service HTTP Proxy -`microservice.http` 是 provider-gateway 给 `deployment.mode=unidesk-direct` 用户服务使用的私有后端访问能力。backend-core 通过真实 WebSocket dispatch 下发目标 service id、节点本机 `targetBaseUrl`、path、query、method、request body、timeout 和可选 JSON 数组裁剪参数;provider-gateway 支持 `GET`、`HEAD`、`POST`、`PUT`、`PATCH`、`DELETE`,但最终允许方法必须由每个用户服务的 `backend.allowedMethods` 显式配置。provider-gateway 只允许访问 `http://127.0.0.1`、`http://localhost`、`http://host.docker.internal` 这些节点本地地址;主 server 内置 Todo Note 后端可使用 Compose 服务名 `http://todo-note:4211`。`deployment.mode=k3sctl-managed` 的 Code Queue 不得通过 provider-gateway `microservice.http` 直连业务容器,正式路径只能是 backend-core -> `k3sctl-adapter` -> Kubernetes API service proxy -> k3s/k8s Service。该能力不打开 provider-gateway 入站端口,也不替代业务仓库自身 Dockerfile/docker-compose。 +`microservice.http` 是 provider-gateway 给 `deployment.mode=unidesk-direct` 用户服务使用的私有后端访问能力。新 provider 必须同时声明 `microservice.http.tunnel`:backend-core 对 UI 高频读请求优先复用既有 provider WebSocket 发送 `http_tunnel_request` 并等待 `http_tunnel_response`,不再为每个轮询创建 `unidesk_tasks` 调度记录;旧 provider 未声明该能力时才回落到原 `dispatch` 任务路径。响应头会标记 `x-unidesk-proxy-mode=provider-ws-http-tunnel` 或旧 `provider-task`,用于性能验收。 + +backend-core 下发目标 service id、节点本机 `targetBaseUrl`、path、query、method、request body、timeout 和可选 JSON 数组裁剪参数;provider-gateway 支持 `GET`、`HEAD`、`POST`、`PUT`、`PATCH`、`DELETE`,但最终允许方法必须由每个用户服务的 `backend.allowedMethods` 显式配置。provider-gateway 只允许访问 `http://127.0.0.1`、`http://localhost`、`http://host.docker.internal` 这些节点本地地址;主 server 内置 Todo Note 后端可使用 Compose 服务名 `http://todo-note:4211`。`deployment.mode=k3sctl-managed` 的 Code Queue 不得通过 provider-gateway 直连业务容器,正式路径只能是 backend-core -> provider WebSocket HTTP tunnel -> `k3sctl-adapter` -> Kubernetes native Service/DNS,必要时显式 fallback 到 Kubernetes API service proxy -> k3s/k8s Service。该能力不打开 provider-gateway 入站端口,也不替代业务仓库自身 Dockerfile/docker-compose。 超大 JSON 响应可以使用 `jsonArrayLimits` 在 provider-gateway 返回前裁剪指定数组,并在响应体中写入 `_unidesk.arrayLimits` 元数据,便于 UniDesk frontend 预览列表而不展示裸 JSON。长期应优先推动业务后端提供分页 API;裁剪只是 UniDesk 集成层的展示保护。 diff --git a/package.json b/package.json index ce121d6f..472fb9ff 100644 --- a/package.json +++ b/package.json @@ -12,6 +12,8 @@ "@types/bun": "latest", "@types/node": "latest", "playwright": "^1.59.1", - "typescript": "latest" + "postgres": "^3.4.9", + "typescript": "latest", + "xlsx": "^0.18.5" } } diff --git a/scripts/cli.ts b/scripts/cli.ts index 8b562c9d..664ba8af 100644 --- a/scripts/cli.ts +++ b/scripts/cli.ts @@ -12,6 +12,7 @@ import { runCodeQueueCommand } from "./src/code-queue"; import { runCodeQueueDeployCompatCommand, runDeployCommand } from "./src/deploy"; import { runProviderCommand } from "./src/provider-attach"; import { runScheduleCommand } from "./src/schedules"; +import { parseNetworkPerfOptions, runNetworkPerf } from "./src/network-perf"; const remoteOptions = extractRemoteCliOptions(process.argv.slice(2)); const args = remoteOptions.args; @@ -56,6 +57,7 @@ function help(): unknown { { command: "debug health", description: "Probe internal core, nodes, system/Docker status, frontend, provider ingress, and public boundary." }, { command: "debug dispatch [providerId] [docker.ps|provider.upgrade|host.ssh|microservice.http|echo] [--wait-ms N]", description: "Submit a real internal-core dispatch request for CLI debugging." }, { command: "debug task ", description: "Read a dispatched task record from internal core for CLI debugging." }, + { command: "network perf [--service code-queue --path /api/tasks/overview?limit=30 --count N --concurrency N --label before|after]", description: "Benchmark frontend -> backend-core -> provider/adapter user-service networking and report latency/proxy-mode distributions." }, { command: "e2e run [--only pattern[,pattern...]] [--skip pattern[,pattern...]]", description: "Run selected public/internal/Playwright E2E checks; use --only for focused iteration and rerun without filters for final regression." }, ], }; @@ -240,6 +242,11 @@ async function main(): Promise { } } + if (top === "network" && sub === "perf") { + emitJson(commandName, await runNetworkPerf(parseNetworkPerfOptions(config, args.slice(2)))); + return; + } + if (top === "e2e" && sub === "run") { const result = await runE2E(config, parseE2ERunOptions(args.slice(2))); const ok = (result as { ok?: unknown }).ok === true; diff --git a/scripts/src/network-perf.ts b/scripts/src/network-perf.ts new file mode 100644 index 00000000..78aec599 --- /dev/null +++ b/scripts/src/network-perf.ts @@ -0,0 +1,196 @@ +import { type UniDeskConfig } from "./config"; + +export interface NetworkPerfOptions { + baseUrl: string; + username: string; + password: string; + serviceId: string; + path: string; + count: number; + concurrency: number; + timeoutMs: number; + cacheBust: boolean; + label: string; +} + +interface NetworkPerfSample { + index: number; + ok: boolean; + status: number; + durationMs: number; + cache: string; + proxyMode: string; + upstreamProxyMode: string; + responseTruncated: string; + bytes: number; + error: string | null; +} + +function argValue(args: string[], name: string): string | null { + const index = args.indexOf(name); + if (index === -1) return null; + const value = args[index + 1]; + if (value === undefined || value.startsWith("--")) throw new Error(`${name} requires a value`); + return value; +} + +function numberArg(args: string[], name: string, fallback: number): number { + const raw = argValue(args, name); + if (raw === null) return fallback; + const value = Number(raw); + if (!Number.isInteger(value) || value <= 0) throw new Error(`${name} must be a positive integer`); + return value; +} + +function normalizeBaseUrl(value: string): string { + return value.replace(/\/+$/u, ""); +} + +function frontendBaseUrl(host: string, config: UniDeskConfig): string { + if (host.startsWith("http://") || host.startsWith("https://")) return normalizeBaseUrl(host); + if (/:\d+$/u.test(host)) return `http://${host}`; + return `http://${host}:${config.network.frontend.port}`; +} + +export function parseNetworkPerfOptions(config: UniDeskConfig, args: string[], fallbackHost?: string): NetworkPerfOptions { + const baseUrl = argValue(args, "--url") ?? (fallbackHost === undefined + ? `http://${config.network.publicHost}:${config.network.frontend.port}` + : frontendBaseUrl(fallbackHost, config)); + return { + baseUrl: normalizeBaseUrl(baseUrl), + username: argValue(args, "--username") ?? config.auth.username, + password: argValue(args, "--password") ?? config.auth.password, + serviceId: argValue(args, "--service") ?? argValue(args, "--service-id") ?? "code-queue", + path: argValue(args, "--path") ?? "/api/tasks/overview?limit=30", + count: numberArg(args, "--count", 20), + concurrency: numberArg(args, "--concurrency", 1), + timeoutMs: numberArg(args, "--timeout-ms", 45_000), + cacheBust: !args.includes("--no-cache-bust"), + label: argValue(args, "--label") ?? "network-perf", + }; +} + +async function login(options: NetworkPerfOptions): Promise { + const response = await fetch(`${options.baseUrl}/login`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ username: options.username, password: options.password }), + signal: AbortSignal.timeout(options.timeoutMs), + }); + const text = await response.text(); + if (!response.ok) throw new Error(`login failed: HTTP ${response.status} ${text.slice(0, 300)}`); + const cookie = response.headers.get("set-cookie")?.split(";", 1)[0] ?? ""; + if (cookie.length === 0) throw new Error("login response did not set a session cookie"); + return cookie; +} + +function sampleUrl(options: NetworkPerfOptions, index: number): string { + const [pathOnly = "/", query = ""] = options.path.split("?", 2); + const params = new URLSearchParams(query); + if (options.cacheBust) params.set("__networkPerf", `${Date.now()}-${index}`); + const search = params.toString(); + return `${options.baseUrl}/api/microservices/${encodeURIComponent(options.serviceId)}/proxy${pathOnly}${search.length > 0 ? `?${search}` : ""}`; +} + +async function runOne(options: NetworkPerfOptions, cookie: string, index: number): Promise { + const started = performance.now(); + try { + const response = await fetch(sampleUrl(options, index), { + headers: { + accept: "application/json", + cookie, + }, + signal: AbortSignal.timeout(options.timeoutMs), + }); + const bodyText = await response.text(); + return { + index, + ok: response.ok, + status: response.status, + durationMs: Math.round((performance.now() - started) * 10) / 10, + cache: response.headers.get("x-unidesk-cache") ?? "", + proxyMode: response.headers.get("x-unidesk-proxy-mode") ?? "", + upstreamProxyMode: response.headers.get("x-unidesk-upstream-proxy-mode") ?? "", + responseTruncated: response.headers.get("x-unidesk-response-truncated") ?? "", + bytes: bodyText.length, + error: null, + }; + } catch (error) { + return { + index, + ok: false, + status: 0, + durationMs: Math.round((performance.now() - started) * 10) / 10, + cache: "", + proxyMode: "", + upstreamProxyMode: "", + responseTruncated: "", + bytes: 0, + error: error instanceof Error ? error.message : String(error), + }; + } +} + +function percentile(values: number[], percentileValue: number): number { + if (values.length === 0) return 0; + const sorted = values.slice().sort((left, right) => left - right); + if (percentileValue <= 0) return sorted[0] ?? 0; + if (percentileValue >= 100) return sorted[sorted.length - 1] ?? 0; + const index = Math.min(sorted.length - 1, Math.max(0, Math.ceil((percentileValue / 100) * sorted.length) - 1)); + return sorted[index] ?? 0; +} + +function countBy(samples: NetworkPerfSample[], key: keyof NetworkPerfSample): Record { + const counts: Record = {}; + for (const sample of samples) { + const value = String(sample[key] ?? ""); + counts[value] = (counts[value] ?? 0) + 1; + } + return counts; +} + +export async function runNetworkPerf(options: NetworkPerfOptions): Promise> { + const cookie = await login(options); + const samples: NetworkPerfSample[] = []; + let nextIndex = 0; + const workers = Array.from({ length: Math.min(options.concurrency, options.count) }, async () => { + while (nextIndex < options.count) { + const index = nextIndex; + nextIndex += 1; + samples.push(await runOne(options, cookie, index)); + } + }); + const startedAt = Date.now(); + await Promise.all(workers); + samples.sort((left, right) => left.index - right.index); + const durations = samples.map((sample) => sample.durationMs); + const successfulDurations = samples.filter((sample) => sample.ok).map((sample) => sample.durationMs); + return { + ok: samples.every((sample) => sample.ok), + label: options.label, + measuredAt: new Date(startedAt).toISOString(), + baseUrl: options.baseUrl, + serviceId: options.serviceId, + path: options.path, + count: options.count, + concurrency: options.concurrency, + timeoutMs: options.timeoutMs, + cacheBust: options.cacheBust, + successCount: samples.filter((sample) => sample.ok).length, + failureCount: samples.filter((sample) => !sample.ok).length, + statusCounts: countBy(samples, "status"), + cacheCounts: countBy(samples, "cache"), + proxyModeCounts: countBy(samples, "proxyMode"), + upstreamProxyModeCounts: countBy(samples, "upstreamProxyMode"), + durationMs: { + min: percentile(durations, 0), + p50: percentile(durations, 50), + p90: percentile(durations, 90), + p95: percentile(durations, 95), + max: percentile(durations, 100), + successfulP50: percentile(successfulDurations, 50), + successfulP95: percentile(successfulDurations, 95), + }, + samples, + }; +} diff --git a/scripts/src/remote.ts b/scripts/src/remote.ts index bd26deb7..55843035 100644 --- a/scripts/src/remote.ts +++ b/scripts/src/remote.ts @@ -2,6 +2,7 @@ import { spawn } from "node:child_process"; import { type UniDeskConfig } from "./config"; import { type DebugDispatchCommand, isDebugDispatchCommand } from "./debug"; import { summarizeMicroserviceProxyResponse } from "./microservices"; +import { parseNetworkPerfOptions, runNetworkPerf } from "./network-perf"; import { isSshSkillDiscoveryArgs, parseSshArgs } from "./ssh"; import { codexJudgeQueryAsync, codexOutputQueryAsync, codexTaskQueryAsync } from "./code-queue"; @@ -495,6 +496,14 @@ async function remoteCodeQueue(session: FrontendSession, args: string[]): Promis }; } +async function remoteNetworkPerf(options: RemoteCliOptions, config: UniDeskConfig, args: string[]): Promise { + if (options.host === null) throw new Error("network perf requires --main-server-ip when using remote frontend transport"); + return { + transport: "frontend", + result: await runNetworkPerf(parseNetworkPerfOptions(config, args.slice(2), options.host)), + }; +} + async function runRemoteSshOverFrontend(session: FrontendSession, providerId: string | undefined, args: string[]): Promise { if (!providerId) throw new Error("remote ssh requires provider id, for example: bun scripts/cli.ts --main-server-ip 74.48.78.17 ssh D601 hostname"); const parsed = parseSshArgs(args); @@ -549,7 +558,7 @@ async function runRemoteCliOverFrontend(options: RemoteCliOptions, config: UniDe emitRemoteJson(name, { transport: "frontend", baseUrl: session.baseUrl, - commands: ["debug health", "debug dispatch", "debug task", "ssh ", "ssh skills", "microservice list", "microservice status ", "microservice health ", "microservice proxy ", "codex task ", "codex judge --attempt N"], + commands: ["debug health", "debug dispatch", "debug task", "ssh ", "ssh skills", "microservice list", "microservice status ", "microservice health ", "microservice proxy ", "codex task ", "codex judge --attempt N", "network perf"], }); return 0; } @@ -573,6 +582,10 @@ async function runRemoteCliOverFrontend(options: RemoteCliOptions, config: UniDe emitRemoteJson(name, await remoteCodeQueue(session, args)); return 0; } + if (top === "network" && sub === "perf") { + emitRemoteJson(name, await remoteNetworkPerf(options, config, args)); + return 0; + } if (top === "ssh") { return await runRemoteSshOverFrontend(session, sub, args.slice(2)); } diff --git a/src/components/backend-core/src/context.ts b/src/components/backend-core/src/context.ts index 2ff76254..e2d3dce5 100644 --- a/src/components/backend-core/src/context.ts +++ b/src/components/backend-core/src/context.ts @@ -1,6 +1,7 @@ import type { JsonValue } from "../../shared/src/index"; import type { EgressTcpConnection, + HttpTunnelWaiter, LoggerFn, MicroserviceAvailabilityEntry, MicroserviceProxyCacheEntry, @@ -22,6 +23,7 @@ export const ctx = { activeProviders: new Map(), activeSshClients: new Map(), activeEgressTcpConnections: new Map(), + httpTunnelWaiters: new Map(), taskTerminalWaiters: new Map>(), microserviceProxyCache: new Map(), microserviceProxyRefreshes: new Map>(), diff --git a/src/components/backend-core/src/index.ts b/src/components/backend-core/src/index.ts index f7208f82..bf4a0ca8 100644 --- a/src/components/backend-core/src/index.ts +++ b/src/components/backend-core/src/index.ts @@ -172,6 +172,12 @@ const providerServer = Bun.serve({ logger("warn", "provider_socket_close", { providerId: providerId ?? null }); if (providerId !== undefined) { closeEgressTcpConnectionsForProvider(providerId); + for (const [requestId, waiter] of ctx.httpTunnelWaiters) { + if (requestId.startsWith(`${providerId}:`)) { + ctx.httpTunnelWaiters.delete(requestId); + waiter(null); + } + } if (ctx.activeProviders.get(providerId) !== ws) { logger("info", "provider_socket_close_ignored_replaced", { providerId }); return; diff --git a/src/components/backend-core/src/microservice-proxy.ts b/src/components/backend-core/src/microservice-proxy.ts index e2e2973f..c607fe03 100644 --- a/src/components/backend-core/src/microservice-proxy.ts +++ b/src/components/backend-core/src/microservice-proxy.ts @@ -202,11 +202,15 @@ function responseFromMicroserviceResult(task: RawTaskRow | null): Response { if (task === null) return jsonResponse({ ok: false, error: "microservice proxy task missing" }, 502); if (task.status !== "succeeded") return jsonResponse({ ok: false, error: "microservice proxy task failed", task }, 502); const result = dockerStatusRecord(task.result); + return responseFromProviderMicroserviceResult(result, "provider-task"); +} + +function responseFromProviderMicroserviceResult(result: Record, proxyMode: string): Response { const status = Number(result.status); const contentType = typeof result.contentType === "string" ? result.contentType : "application/json; charset=utf-8"; const bodyText = typeof result.bodyText === "string" ? result.bodyText : ""; if (!Number.isInteger(status) || status < 100 || status > 599) { - return jsonResponse({ ok: false, error: "microservice proxy returned invalid upstream status", task }, 502); + return jsonResponse({ ok: false, error: "microservice proxy returned invalid upstream status", result }, 502); } if (result.truncated === true && contentTypeIsJson(contentType)) { try { @@ -215,8 +219,6 @@ function responseFromMicroserviceResult(task: RawTaskRow | null): Response { return jsonResponse({ ok: false, error: "microservice proxy response was truncated before a JSON boundary", - providerId: task.provider_id, - command: task.command, upstreamStatus: status, upstreamBodyBytes: result.upstreamBodyBytes ?? null, returnedBodyBytes: result.returnedBodyBytes ?? bodyText.length, @@ -229,6 +231,8 @@ function responseFromMicroserviceResult(task: RawTaskRow | null): Response { status, headers: { "content-type": contentType, + "x-unidesk-proxy-mode": proxyMode, + "x-unidesk-upstream-proxy-mode": typeof result.proxyMode === "string" ? result.proxyMode : "", "x-unidesk-response-truncated": result.truncated === true ? "true" : "false", }, }); @@ -607,6 +611,84 @@ async function k3sctlAdapterMicroserviceResponse( return fetchMicroserviceUpstreamResponse(adapter, method, adapterTargetPath, proxyOptions, requestHeaders, bodyText, abortSignal); } +function providerHttpTunnelRequestId(providerId: string): string { + return `${providerId}:http_${Date.now()}_${Math.random().toString(16).slice(2)}`; +} + +async function waitForProviderHttpTunnelResponse( + providerId: string, + requestId: string, + timeoutMs: number, + abortSignal?: AbortSignal, +): Promise<{ providerId: string; requestId: string; ok: boolean; result: JsonValue } | null> { + return await new Promise((resolve) => { + let settled = false; + let abortHandler: (() => void) | null = null; + const timer = setTimeout(() => settle(null), Math.max(1, timeoutMs)); + const settle = (message: { providerId: string; requestId: string; ok: boolean; result: JsonValue } | null): void => { + if (settled) return; + settled = true; + clearTimeout(timer); + if (abortHandler !== null) abortSignal?.removeEventListener("abort", abortHandler); + ctx.httpTunnelWaiters.delete(requestId); + resolve(message); + }; + abortHandler = () => settle(null); + if (abortSignal !== undefined) { + if (abortSignal.aborted) { + settle(null); + return; + } + abortSignal.addEventListener("abort", abortHandler, { once: true }); + } + ctx.httpTunnelWaiters.set(requestId, (message) => { + if (message !== null && message.providerId !== providerId) { + logger("warn", "http_tunnel_provider_mismatch", { requestId, expectedProviderId: providerId, actualProviderId: message.providerId }); + settle(null); + return; + } + settle(message); + }); + }); +} + +async function providerHttpTunnelMicroserviceResponse( + service: MicroserviceConfig, + method: string, + targetPath: string, + proxyOptions: { query: string; jsonArrayLimits: Record }, + requestHeaders: Record, + bodyText: string, + abortSignal?: AbortSignal, +): Promise { + const socket = ctx.activeProviders.get(service.providerId); + if (socket === undefined) return jsonResponse({ ok: false, error: `provider is offline: ${service.providerId}` }, 503); + const requestId = providerHttpTunnelRequestId(service.providerId); + const timeoutMs = service.backend.timeoutMs + 3000; + const waiter = waitForProviderHttpTunnelResponse(service.providerId, requestId, timeoutMs, abortSignal); + socket.send(JSON.stringify({ + type: "http_tunnel_request", + requestId, + payload: { + source: "microservice-frontend-proxy", + serviceId: service.id, + method, + targetBaseUrl: service.backend.nodeBaseUrl, + path: targetPath, + query: proxyOptions.query, + requestHeaders, + bodyText, + jsonArrayLimits: proxyOptions.jsonArrayLimits, + timeoutMs: service.backend.timeoutMs, + cacheTtlMs: providerMicroserviceCacheTtlMs(service.id, targetPath), + }, + })); + const message = await waiter; + if (message === null) return jsonResponse({ ok: false, error: "provider HTTP tunnel timed out or disconnected", providerId: service.providerId, requestId }, 504); + if (!message.ok) return jsonResponse({ ok: false, error: "provider HTTP tunnel failed", providerId: service.providerId, requestId, result: message.result }, 502); + return responseFromProviderMicroserviceResult(dockerStatusRecord(message.result), "provider-ws-http-tunnel"); +} + async function fetchMicroserviceUpstreamResponse( service: MicroserviceConfig, method: string, @@ -625,6 +707,9 @@ async function fetchMicroserviceUpstreamResponse( if (!(await providerSupports(service.providerId, "microservice.http"))) { return jsonResponse({ ok: false, error: `provider does not declare microservice.http capability: ${service.providerId}` }, 409); } + if (await providerSupports(service.providerId, "microservice.http.tunnel")) { + return providerHttpTunnelMicroserviceResponse(service, method, targetPath, proxyOptions, requestHeaders, bodyText, abortSignal); + } const { taskId, providerOnline } = await createAndSendTask(service.providerId, "microservice.http", { source: "microservice-frontend-proxy", serviceId: service.id, diff --git a/src/components/backend-core/src/provider-registry.ts b/src/components/backend-core/src/provider-registry.ts index 35818509..81fdfb78 100644 --- a/src/components/backend-core/src/provider-registry.ts +++ b/src/components/backend-core/src/provider-registry.ts @@ -81,6 +81,22 @@ export async function handleProviderMessage(ws: ProviderSocket, raw: string | Bu return; } + if (message.type === "http_tunnel_response") { + const waiter = ctx.httpTunnelWaiters.get(message.requestId); + if (waiter === undefined) { + logger("warn", "http_tunnel_response_without_waiter", { providerId: message.providerId, requestId: message.requestId }); + return; + } + ctx.httpTunnelWaiters.delete(message.requestId); + waiter({ + providerId: message.providerId, + requestId: message.requestId, + ok: message.ok, + result: message.result, + }); + return; + } + if (message.type === "egress_tcp_open") { handleEgressTcpOpen(ws, message); return; diff --git a/src/components/backend-core/src/types.ts b/src/components/backend-core/src/types.ts index c6e38786..681160b9 100644 --- a/src/components/backend-core/src/types.ts +++ b/src/components/backend-core/src/types.ts @@ -188,4 +188,11 @@ export interface MicroserviceAvailabilityEntry { probe: Record; } +export type HttpTunnelWaiter = (message: { + providerId: string; + requestId: string; + ok: boolean; + result: JsonValue; +} | null) => void; + export type LoggerFn = (level: "debug" | "info" | "warn" | "error", message: string, data?: JsonValue) => void; diff --git a/src/components/microservices/code-queue/Dockerfile b/src/components/microservices/code-queue/Dockerfile index 75c9735e..ce1f5add 100644 --- a/src/components/microservices/code-queue/Dockerfile +++ b/src/components/microservices/code-queue/Dockerfile @@ -46,7 +46,11 @@ RUN (command -v codex >/dev/null 2>&1 && command -v opencode >/dev/null 2>&1 && WORKDIR /app/src/components/microservices/code-queue COPY src/components/microservices/code-queue/package.json ./package.json RUN test -d node_modules/typescript || bun install +WORKDIR /app +COPY package.json /app/package.json +RUN bun install COPY src/components/shared /app/src/components/shared +WORKDIR /app/src/components/microservices/code-queue COPY src/components/microservices/code-queue/tsconfig.json ./tsconfig.json COPY src/components/microservices/code-queue/src ./src diff --git a/src/components/microservices/k3sctl-adapter/docker-compose.d601.yml b/src/components/microservices/k3sctl-adapter/docker-compose.d601.yml index 2d7787cc..70da03d2 100644 --- a/src/components/microservices/k3sctl-adapter/docker-compose.d601.yml +++ b/src/components/microservices/k3sctl-adapter/docker-compose.d601.yml @@ -31,6 +31,11 @@ services: K3SCTL_KUBE_API_LOCAL_PORT: "${K3SCTL_KUBE_API_LOCAL_PORT:-6443}" K3SCTL_KUBE_API_REMOTE_HOST: "${K3SCTL_KUBE_API_REMOTE_HOST:-127.0.0.1}" K3SCTL_KUBE_API_REMOTE_PORT: "${K3SCTL_KUBE_API_REMOTE_PORT:-6443}" + K3SCTL_NATIVE_SERVICE_PROXY_ENABLED: "${K3SCTL_NATIVE_SERVICE_PROXY_ENABLED:-true}" + K3SCTL_NATIVE_SERVICE_PROBE_TIMEOUT_MS: "${K3SCTL_NATIVE_SERVICE_PROBE_TIMEOUT_MS:-1200}" + K3SCTL_NATIVE_SERVICE_FAILURE_COOLDOWN_MS: "${K3SCTL_NATIVE_SERVICE_FAILURE_COOLDOWN_MS:-10000}" + K3SCTL_NATIVE_SERVICE_URL_CODE_QUEUE: "${K3SCTL_NATIVE_SERVICE_URL_CODE_QUEUE:-}" + K3SCTL_NATIVE_SERVICE_URL_MDTODO: "${K3SCTL_NATIVE_SERVICE_URL_MDTODO:-}" K3SCTL_MANIFEST_PATHS: "${K3SCTL_MANIFEST_PATHS:-k3s/code-queue.k3s.json,k3s/mdtodo.k3s.json}" K3SCTL_SERVICES_JSON: "${K3SCTL_SERVICES_JSON:-[]}" UNIDESK_LOG_RETENTION_BYTES: "${UNIDESK_LOG_RETENTION_BYTES:-512MiB}" diff --git a/src/components/microservices/k3sctl-adapter/src/index.ts b/src/components/microservices/k3sctl-adapter/src/index.ts index cdc0ee92..0389bfe1 100644 --- a/src/components/microservices/k3sctl-adapter/src/index.ts +++ b/src/components/microservices/k3sctl-adapter/src/index.ts @@ -44,6 +44,9 @@ interface RuntimeConfig { kubeApiProxyEnabled: boolean; kubeconfigPath: string; kubeApiConnectHost: string; + nativeServiceProxyEnabled: boolean; + nativeServiceProbeTimeoutMs: number; + nativeServiceFailureCooldownMs: number; requestTimeoutMs: number; healthTimeoutMs: number; services: ManagedService[]; @@ -69,6 +72,7 @@ const logWriter = config.logFile }) : null; const kubeClient = loadKubeApiClient(); +const nativeServiceFailures = new Map(); logWriter?.prune(); function envString(name: string, fallback: string): string { @@ -92,6 +96,11 @@ function envBool(name: string, fallback: boolean): boolean { return fallback; } +function envOptionalString(name: string): string | null { + const value = process.env[name]; + return value === undefined || value.trim().length === 0 ? null : value.trim(); +} + function asRecord(value: unknown, path: string): Record { if (typeof value !== "object" || value === null || Array.isArray(value)) throw new Error(`${path} must be an object`); return value as Record; @@ -259,6 +268,9 @@ function readConfig(): RuntimeConfig { kubeApiProxyEnabled: envBool("K3SCTL_KUBE_API_PROXY_ENABLED", true), kubeconfigPath: envString("K3SCTL_KUBECONFIG_PATH", "/var/lib/unidesk/k3s/kubeconfig"), kubeApiConnectHost: envString("K3SCTL_KUBE_API_CONNECT_HOST", "host.docker.internal"), + nativeServiceProxyEnabled: envBool("K3SCTL_NATIVE_SERVICE_PROXY_ENABLED", true), + nativeServiceProbeTimeoutMs: Math.max(250, Math.min(5000, envNumber("K3SCTL_NATIVE_SERVICE_PROBE_TIMEOUT_MS", 1200))), + nativeServiceFailureCooldownMs: Math.max(1000, Math.min(60_000, envNumber("K3SCTL_NATIVE_SERVICE_FAILURE_COOLDOWN_MS", 10_000))), requestTimeoutMs: Math.max(1000, Math.min(120_000, envNumber("K3SCTL_REQUEST_TIMEOUT_MS", 30_000))), healthTimeoutMs: Math.max(500, Math.min(30_000, envNumber("K3SCTL_HEALTH_TIMEOUT_MS", 2500))), services: mergeServices([...manifestServices, ...inlineServices]), @@ -399,6 +411,37 @@ function endpointProxyApiPath(service: ManagedService, endpoint: ManagedEndpoint return `/api/v1/namespaces/${encodeURIComponent(namespace)}/services/${encodeURIComponent(serviceRef)}/proxy${safeTargetPath}`; } +function nativeServiceFailureKey(service: ManagedService): string { + return `${service.namespace}/${service.id}`; +} + +function nativeServiceUrl(service: ManagedService, targetPath: string, query = ""): URL { + const serviceName = routeString(service, "serviceName", service.id); + const servicePort = routeNumber(service, "servicePort", 80); + const safeTargetPath = targetPath.startsWith("/") ? targetPath : `/${targetPath}`; + const override = envOptionalString(`K3SCTL_NATIVE_SERVICE_URL_${service.id.toUpperCase().replace(/[^A-Z0-9]/gu, "_")}`); + const base = override ?? `http://${serviceName}.${service.namespace}.svc.cluster.local:${servicePort}`; + const url = new URL(safeTargetPath, base.replace(/\/+$/u, "/")); + url.search = query; + return url; +} + +function nativeServiceProxyUsable(service: ManagedService): boolean { + if (!config.nativeServiceProxyEnabled) return false; + const failedAt = nativeServiceFailures.get(nativeServiceFailureKey(service)); + return failedAt === undefined || Date.now() - failedAt >= config.nativeServiceFailureCooldownMs; +} + +function rememberNativeServiceFailure(service: ManagedService, error: unknown): void { + nativeServiceFailures.set(nativeServiceFailureKey(service), Date.now()); + log("warn", "native_service_proxy_failed", { + serviceId: service.id, + namespace: service.namespace, + cooldownMs: config.nativeServiceFailureCooldownMs, + error: errorToJson(error), + }); +} + function kubernetesEndpointServiceRef(service: ManagedService, endpoint: ManagedEndpoint): { namespace: string; serviceRef: string } { const base = new URL(endpoint.baseUrl); if (base.protocol !== "kubernetes:") throw new Error(`endpoint ${endpoint.id} must use kubernetes:// baseUrl`); @@ -460,6 +503,48 @@ async function kubeApiServiceProxyResponse( return kubeApiProxyResponse(service, req, serviceProxyApiPath(service, targetPath), query, timeoutMs); } +async function nativeServiceProxyResponse( + service: ManagedService, + req: Request, + targetPath: string, + query: string, + timeoutMs: number, +): Promise { + if (!nativeServiceProxyUsable(service)) return null; + const upstreamUrl = nativeServiceUrl(service, targetPath, query); + const headers = forwardHeaders(req); + const bodyText = req.method === "GET" || req.method === "HEAD" ? "" : await req.text(); + const controller = new AbortController(); + const nativeTimeoutMs = Math.min(timeoutMs, config.nativeServiceProbeTimeoutMs); + const timer = setTimeout(() => controller.abort(), nativeTimeoutMs); + const startedAt = Date.now(); + try { + const upstream = await fetch(upstreamUrl, { + method: req.method, + headers, + body: bodyText.length > 0 ? bodyText : undefined, + signal: controller.signal, + }); + const body = await boundedText(upstream, 8 * 1024 * 1024); + return new Response(body.text, { + status: upstream.status, + headers: { + "content-type": upstream.headers.get("content-type") ?? "application/octet-stream", + "x-unidesk-proxy-mode": "kubernetes-native-service", + "x-unidesk-k3s-service": service.id, + "x-unidesk-k3s-service-url": upstreamUrl.origin, + "x-unidesk-upstream-duration-ms": String(Date.now() - startedAt), + "x-unidesk-response-truncated": body.truncated ? "true" : "false", + }, + }); + } catch (error) { + rememberNativeServiceFailure(service, error); + return null; + } finally { + clearTimeout(timer); + } +} + async function kubeApiEndpointProxyResponse( service: ManagedService, endpoint: ManagedEndpoint, @@ -574,15 +659,13 @@ async function probeKubernetesServiceActive(service: ManagedService): Promise { if (!active && endpoint.healthMode === "pod-ready") return await probeKubernetesPodReady(service, endpoint); const checkedAt = new Date().toISOString(); - const response = active - ? await kubeApiServiceProxyResponse( - service, - new Request("http://k3sctl-adapter.local/health", { method: "GET", headers: { accept: "application/json" } }), - endpoint.healthPath, - "", - config.healthTimeoutMs, - ) - : await kubeApiEndpointProxyResponse( + let response: Response; + if (active) { + const request = new Request("http://k3sctl-adapter.local/health", { method: "GET", headers: { accept: "application/json" } }); + response = await nativeServiceProxyResponse(service, request.clone(), endpoint.healthPath, "", config.healthTimeoutMs) + ?? await kubeApiServiceProxyResponse(service, request, endpoint.healthPath, "", config.healthTimeoutMs); + } else { + response = await kubeApiEndpointProxyResponse( service, endpoint, new Request("http://k3sctl-adapter.local/health", { method: "GET", headers: { accept: "application/json" } }), @@ -590,6 +673,7 @@ async function probeKubernetesEndpoint(service: ManagedService, endpoint: Manage "", config.healthTimeoutMs, ); + } const contentType = response.headers.get("content-type") ?? "application/octet-stream"; const bodyText = await response.text(); let body: JsonValue = bodyText.slice(0, 2000); @@ -605,7 +689,7 @@ async function probeKubernetesEndpoint(service: ManagedService, endpoint: Manage baseUrl: endpoint.baseUrl, healthPath: endpoint.healthPath, healthMode: endpoint.healthMode, - proxyMode: "kubernetes-api-service-proxy", + proxyMode: response.headers.get("x-unidesk-proxy-mode") ?? "kubernetes-api-service-proxy", route: service.route, healthy: response.ok, status: response.ok ? "healthy" : "unhealthy", @@ -761,7 +845,19 @@ async function controlPlaneSnapshot(): Promise { manifestPaths: config.manifestPaths, managedServicesHealthy, noFallback: true, - runtimePath: "frontend -> backend-core -> k3sctl-adapter -> kubernetes api service proxy -> k3s service", + runtimePath: config.nativeServiceProxyEnabled + ? "frontend -> backend-core -> provider websocket HTTP tunnel -> k3sctl-adapter -> kubernetes native service/DNS -> k3s service" + : "frontend -> backend-core -> k3sctl-adapter -> kubernetes api service proxy -> k3s service", + nativeServiceProxy: { + enabled: config.nativeServiceProxyEnabled, + mode: "kubernetes-native-service", + failureCooldownMs: config.nativeServiceFailureCooldownMs, + failedServices: Array.from(nativeServiceFailures.entries()).map(([key, failedAt]) => ({ + key, + failedAt: new Date(failedAt).toISOString(), + retryAfterMs: Math.max(0, config.nativeServiceFailureCooldownMs - (Date.now() - failedAt)), + })), + }, kubeApiProxy: { enabled: config.kubeApiProxyEnabled, configured: kubeClient !== null, @@ -786,6 +882,8 @@ function forwardHeaders(request: Request): Headers { async function proxyToService(service: ManagedService, req: Request, targetPath: string, query: string): Promise { if (isKubernetesServiceRoute(service)) { + const native = await nativeServiceProxyResponse(service, req.clone(), targetPath, query, config.requestTimeoutMs); + if (native !== null) return native; return kubeApiServiceProxyResponse(service, req, targetPath, query, config.requestTimeoutMs); } log("error", "k3sctl_route_not_kubernetes_service", { serviceId: service.id, route: service.route, noFallback: true }); diff --git a/src/components/provider-gateway/package.json b/src/components/provider-gateway/package.json index 5e436141..d132e823 100644 --- a/src/components/provider-gateway/package.json +++ b/src/components/provider-gateway/package.json @@ -1,6 +1,6 @@ { "name": "@unidesk/provider-gateway", - "version": "0.2.19", + "version": "0.2.20", "private": true, "type": "module", "scripts": { diff --git a/src/components/provider-gateway/src/index.ts b/src/components/provider-gateway/src/index.ts index a7f12bbd..262e8d24 100644 --- a/src/components/provider-gateway/src/index.ts +++ b/src/components/provider-gateway/src/index.ts @@ -5,6 +5,7 @@ import { type CoreEgressTcpDataMessage, type CoreEgressTcpOpenedMessage, type CoreDispatchMessage, + type CoreHttpTunnelRequestMessage, type CoreHostSshCloseMessage, type CoreHostSshEofMessage, type CoreHostSshInputMessage, @@ -561,7 +562,7 @@ function sendJsonOk(value: unknown): boolean { } function sendRegister(): void { - const capabilities = ["heartbeat", "system.status", "docker.status", "docker.ps", "provider.upgrade", "microservice.http", "microservice.http.cache", "echo"]; + const capabilities = ["heartbeat", "system.status", "docker.status", "docker.ps", "provider.upgrade", "microservice.http", "microservice.http.cache", "microservice.http.tunnel", "echo"]; if (isHostSshConfigured()) capabilities.push("host.ssh"); if (config.egressProxyEnabled) capabilities.push("network.egress-proxy"); sendJson({ @@ -1989,6 +1990,7 @@ async function runMicroserviceHttp(payload: Record): Promise< truncated: bounded.truncated, transform: transformed.transform, upstreamDurationMs: Date.now() - requestStartedAt, + proxyMode: "provider-gateway-http-fetch", }; })(); if (cacheable) microserviceHttpInFlight.set(cacheKey, requestPromise); @@ -2053,6 +2055,38 @@ async function handleDispatch(message: CoreDispatchMessage): Promise { } } +async function handleHttpTunnelRequest(message: CoreHttpTunnelRequestMessage): Promise { + const startedAt = Date.now(); + try { + const result = await runMicroserviceHttp(message.payload); + sendJson({ + type: "http_tunnel_response", + providerId: config.providerId, + requestId: message.requestId, + ok: (result as { ok?: unknown }).ok === true, + result, + at: new Date().toISOString(), + }); + logger("debug", "http_tunnel_completed", { + requestId: message.requestId, + serviceId: typeof message.payload.serviceId === "string" ? message.payload.serviceId : "", + durationMs: Date.now() - startedAt, + ok: (result as { ok?: unknown }).ok === true, + }); + } catch (error) { + const text = error instanceof Error ? `${error.name}: ${error.message}` : String(error); + logger("error", "http_tunnel_failed", { requestId: message.requestId, error: text }); + sendJson({ + type: "http_tunnel_response", + providerId: config.providerId, + requestId: message.requestId, + ok: false, + result: { ok: false, error: text }, + at: new Date().toISOString(), + }); + } +} + function handleMessage(raw: MessageEvent): void { try { const parsed = JSON.parse(raw.data) as { type?: unknown }; @@ -2060,6 +2094,10 @@ function handleMessage(raw: MessageEvent): void { handleDispatch(parsed as CoreDispatchMessage).catch((error) => logger("error", "dispatch_handler_failed", { error: String(error) })); return; } + if (parsed.type === "http_tunnel_request") { + handleHttpTunnelRequest(parsed as CoreHttpTunnelRequestMessage).catch((error) => logger("error", "http_tunnel_handler_failed", { error: String(error) })); + return; + } if (parsed.type === "host_ssh_open") { startHostSshSession(parsed as CoreHostSshOpenMessage); return; diff --git a/src/components/shared/src/index.ts b/src/components/shared/src/index.ts index fe3c1bec..c82191c8 100644 --- a/src/components/shared/src/index.ts +++ b/src/components/shared/src/index.ts @@ -244,6 +244,21 @@ export interface ProviderEgressTcpCloseMessage { at: string; } +export interface CoreHttpTunnelRequestMessage { + type: "http_tunnel_request"; + requestId: string; + payload: Record; +} + +export interface ProviderHttpTunnelResponseMessage { + type: "http_tunnel_response"; + providerId: string; + requestId: string; + ok: boolean; + result: JsonValue; + at: string; +} + export interface CoreEgressTcpOpenedMessage { type: "egress_tcp_opened"; connectionId: string; @@ -274,10 +289,12 @@ export type ProviderToCoreMessage = | ProviderHostSshErrorMessage | ProviderEgressTcpOpenMessage | ProviderEgressTcpDataMessage - | ProviderEgressTcpCloseMessage; + | ProviderEgressTcpCloseMessage + | ProviderHttpTunnelResponseMessage; export type CoreToProviderMessage = | CoreDispatchMessage + | CoreHttpTunnelRequestMessage | CoreHostSshOpenMessage | CoreHostSshInputMessage | CoreHostSshResizeMessage @@ -369,7 +386,8 @@ export function isProviderToCoreMessage(value: unknown): value is ProviderToCore msg.type === "host_ssh_error" || msg.type === "egress_tcp_open" || msg.type === "egress_tcp_data" || - msg.type === "egress_tcp_close" + msg.type === "egress_tcp_close" || + msg.type === "http_tunnel_response" ) && typeof msg.providerId === "string" && msg.providerId.length > 0