Stabilize provider HTTP tunnel diagnostics

This commit is contained in:
Codex
2026-05-17 06:53:03 +00:00
parent aaf6e74aa4
commit bf364baac8
12 changed files with 518 additions and 50 deletions
+1 -1
View File
@@ -113,7 +113,7 @@ bun scripts/cli.ts ssh D601 glob --root /home/ubuntu/pikapython --pattern '**/*-
`--main-server-ip` 是一个全局前缀,必须放在需要透传的命令同一次调用中,例如 `bun scripts/cli.ts --main-server-ip 74.48.78.17 debug health`。默认传输是公网 frontend:本地 CLI 读取本仓库 `config.json` 中的 frontend 登录账号密码,登录 `http://<ip>:<frontendPort>/` 获取 HttpOnly session cookie,然后通过 frontend 的 `/api/*` 同源代理访问 backend-core 内网 API;因此计算节点只需要能访问公网 frontend,不需要主 server SSH key,也不需要打开 backend-core REST API 或 PostgreSQL 端口。
默认 frontend 传输支持 `debug health``debug dispatch``debug task``microservice list/status/health/proxy``decision upload/list/show/health``codex task <taskId>``codex output <taskId>``codex judge <taskId> --attempt N``ssh <PROVIDER_ID> <remote-command>`。其中 `ssh` 的 remote frontend 传输使用 `host.ssh` dispatch 执行有界远端命令,适合 `ssh D601 hostname``ssh D601 skills` 这类自测;交互式登录 shell 仍应在主 server 本机 CLI 使用,或显式切换到旧 SSH 传输后在主 server 上执行。frontend 远程透传不会流式转发本地 stdin,因此 `ssh py < script.py``ssh apply-patch < patch.diff` 这类 stdin-backed helper 必须在主 server 本机运行,或显式切换到 `--main-server-transport ssh`。若确实需要旧行为,可使用 `--main-server-key <key>``--main-server-transport ssh`,这时 CLI 会通过 SSH 登录主 server 的 `--main-server-root` 目录执行同一个 `bun scripts/cli.ts <command>`
默认 frontend 传输支持 `debug health``debug dispatch``debug task``microservice list/status/health/diagnostics/tunnel-self-test/proxy``decision upload/list/show/health``codex task <taskId>``codex output <taskId>``codex judge <taskId> --attempt N``ssh <PROVIDER_ID> <remote-command>`。其中 `ssh` 的 remote frontend 传输使用 `host.ssh` dispatch 执行有界远端命令,适合 `ssh D601 hostname``ssh D601 skills` 这类自测;交互式登录 shell 仍应在主 server 本机 CLI 使用,或显式切换到旧 SSH 传输后在主 server 上执行。frontend 远程透传不会流式转发本地 stdin,因此 `ssh py < script.py``ssh apply-patch < patch.diff` 这类 stdin-backed helper 必须在主 server 本机运行,或显式切换到 `--main-server-transport ssh`。若确实需要旧行为,可使用 `--main-server-key <key>``--main-server-transport ssh`,这时 CLI 会通过 SSH 登录主 server 的 `--main-server-root` 目录执行同一个 `bun scripts/cli.ts <command>`
计算节点可以用该入口测试自身的远程升级闭环,而不需要在计算节点公开 core REST API 或 database。标准顺序是:先运行 `bun scripts/cli.ts --main-server-ip 74.48.78.17 debug health` 确认主 server 看到当前 Provider 在线,且该 Provider labels 中 `unideskCapabilities` 包含 `host.ssh``hostSshConfigured=true``hostSshKeyPresent=true`;再运行 `bun scripts/cli.ts --main-server-ip 74.48.78.17 debug dispatch <PROVIDER_ID> provider.upgrade --mode schedule --wait-ms 15000` 触发真实 `provider.upgrade`;随后再次运行 `debug health` 确认节点重新上线;最后运行 `bun scripts/cli.ts --main-server-ip 74.48.78.17 debug dispatch <PROVIDER_ID> host.ssh --wait-ms 15000``bun scripts/cli.ts --main-server-ip 74.48.78.17 ssh <PROVIDER_ID> hostname` 验证 SSH 透传能力。provider-gateway 新部署或升级后没有完成这组 remote CLI 自测,不能视为交付完成。
+2
View File
@@ -316,6 +316,8 @@ ClaudeQQ 的业务源码和持久化数据仍在 D601,但正式运行由 k3s
- `bun scripts/cli.ts microservice health oa-event-flow``bun scripts/cli.ts microservice proxy oa-event-flow /api/diagnostics --raw``bun scripts/cli.ts microservice proxy oa-event-flow '/api/events?tags=service:code-queue&limit=20' --raw`:验证统一 OA 事件流、事件表、tag 查询和统计中心。
- `bun scripts/cli.ts microservice health k3sctl-adapter``bun scripts/cli.ts microservice proxy k3sctl-adapter /api/control-plane --raw`:验证 D601 `unidesk-k3s` 控制面 adapter、manifest、D601 scheduler/read/write 实例状态、`presentNodeIds` 包含 `D601``missingNodeIds=[]` 和 no-fallback 运行路径。
- `bun scripts/cli.ts microservice health code-queue``bun scripts/cli.ts microservice proxy code-queue /api/tasks/overview`:验证 Code Queue 经过 backend-core -> k3sctl-adapter -> k3s Service proxy 的单一路径,其中 `/health` 指向 `code-queue-scheduler`overview/详情只读请求指向 `code-queue-read`,写入类请求指向 `code-queue-write`;输出不得出现 `serviceId=code-queue` 的 provider-gateway `microservice.http` 业务代理任务,写入、追加 prompt、打断和 readAt/未读状态都必须由 backend 写入 PostgreSQLfrontend 不得用本地存储伪造成功状态。
- `bun scripts/cli.ts microservice diagnostics code-queue`:拆分 k3sctl-managed 链路健康,返回 `providerGateway``httpTunnel``k3sctlAdapter``kubernetesApiServiceProxy``targetService` 五段状态。该命令仍通过 backend-core 用户服务代理访问,不允许浏览器或 CLI 绕到 k3s、NodePort、Pod IP 或 D601 本机业务端口。
- `bun scripts/cli.ts microservice tunnel-self-test code-queue`:触发一次预期失败的 provider HTTP tunnel 请求,用于确认失败响应包含 `requestId``stage``x-unidesk-request-id``x-unidesk-tunnel-error`;该自测只访问 provider 侧无效 loopback 端口,不创建 Code Queue 队列,也不绕过正式 backend-core 入口。
- `bun scripts/cli.ts microservice health filebrowser``bun scripts/cli.ts microservice health filebrowser-d601``bun scripts/cli.ts microservice proxy filebrowser / --max-body-bytes 2000`:验证 D518 主 File Browser 和 D601 备用 File Browser 私有代理链路;浏览器 WebUI 必须通过 `/api/microservices/filebrowser/proxy/``/api/microservices/filebrowser-d601/proxy/` 访问,不得直接开放 `4251` 公网端口。
- `bun scripts/cli.ts --main-server-ip 74.48.78.17 microservice health findjob`:在计算节点或其他非主 server 主机上通过公网 frontend remote CLI 进行同一验证,不需要主 server SSH key。
+1 -1
View File
@@ -32,6 +32,6 @@ frontend Bun server 必须提供同源 `/api/frontend-performance`,记录 webu
性能优化必须先用这些指标锁定慢操作名称、路径、耗时和代理层级,再改后端查询或前后端通信策略;不得只凭主观体感改 UI。Code Queue 这类控制面页面出现 `core_proxy``GET /api/microservices/code-queue/proxy/api/tasks/overview``POST /api/microservices/code-queue/proxy/api/tasks/<id>/read` 等超过 1s 的慢操作时,应保留优化前后的性能面板证据,并同时记录 live API 耗时、容器内存、`/health` 存储摘要和是否仍通过 PostgreSQL/append-only archive 重建历史数据。短 TTL cache、warmup 或页面内存缓存只能作为重复请求抖动保护,性能证据必须证明数据库索引/聚合、分页和渐进式披露本身已把核心路径降到目标内,不能用长缓存遮蔽慢 SQL 或全量 JSON 物化。
当最近失败请求集中出现 frontend `core_proxy` 502,路径为 `/api/microservices/code-queue/proxy/...` 的 overview、trace 或 summary,且 k3s/k8s Pod 仍在运行时,必须区分“Kubernetes API service proxy 不可达”“Code Queue 进程不可达”和“Code Queue event loop 被热路径同步工作饿死”。排障顺序是同时查看 `/api/frontend-performance``/api/performance``k3sctl-adapter` `/api/control-plane`、Kubernetes Pod `/live``/health`、overview/trace-step curl、`kubectl top pod` 或 Docker stats、容器 `RestartCount`/`OOMKilled` 和 Code Queue 日志;如果 Pod 内 `/health` 也超时,应优先检查实时 output 发布、archive 读取、transcript 构建、统计计算、启动维护、历史 OA backfill 和远程 Provider 准备/SSH 子进程是否阻塞 event loop,而不是先调整 frontend 渲染或代理超时。Code Queue 默认不得在启动时自动执行历史 OA backfill 或通知表索引维护;显式 backfill 必须作为运维动作记录,并在运行期间并发证明 `/live``/health``/api/tasks/overview` 仍快速返回。涉及 D601 等远程 Provider 时,还要检查 `runCodeQueueSsh`/开发容器准备是否仍存在同步子进程、无 timeout 的 SSH、无上限 stdout/stderr 或 stale TUN 重建等待;修复后必须在远程准备探针运行期间并发证明 Pod `/health``/api/tasks/overview` 仍快速返回。
当最近失败请求集中出现 frontend `core_proxy` 502/503/504,路径为 `/api/microservices/code-queue/proxy/...` 的 overview、trace 或 summary,且 k3s/k8s Pod 仍在运行时,必须先运行 `bun scripts/cli.ts microservice diagnostics code-queue`,区分 provider-gateway online、WebSocket HTTP tunnel、k3sctl-adapter、Kubernetes API service proxy 和目标 Service 五段状态。provider tunnel 类失败必须记录响应 body/headers 中的 `requestId``stage``failureReason``x-unidesk-request-id``x-unidesk-tunnel-error`;如需主动验证错误结构,运行 `bun scripts/cli.ts microservice tunnel-self-test code-queue`,该自测应返回预期失败但 `ok=true` 的诊断结果。随后再继续判断“Kubernetes API service proxy 不可达”“Code Queue 进程不可达”和“Code Queue event loop 被热路径同步工作饿死”。排障顺序是同时查看 `/api/frontend-performance``/api/performance``k3sctl-adapter` `/api/control-plane`、Kubernetes Pod `/live``/health`、overview/trace-step curl、`kubectl top pod` 或 Docker stats、容器 `RestartCount`/`OOMKilled` 和 Code Queue 日志;如果 Pod 内 `/health` 也超时,应优先检查实时 output 发布、archive 读取、transcript 构建、统计计算、启动维护、历史 OA backfill 和远程 Provider 准备/SSH 子进程是否阻塞 event loop,而不是先调整 frontend 渲染或代理超时。Code Queue 默认不得在启动时自动执行历史 OA backfill 或通知表索引维护;显式 backfill 必须作为运维动作记录,并在运行期间并发证明 `/live``/health``/api/tasks/overview` 仍快速返回。涉及 D601 等远程 Provider 时,还要检查 `runCodeQueueSsh`/开发容器准备是否仍存在同步子进程、无 timeout 的 SSH、无上限 stdout/stderr 或 stale TUN 重建等待;修复后必须在远程准备探针运行期间并发证明 Pod `/health``/api/tasks/overview` 仍快速返回。
Code Queue task 明明产出最终回复却反复 `retry_wait` 时,应优先用任务详情里的 latest attempt 字段核查 `terminalStatus``transportClosedBeforeTerminal``appServerExitCode``finalResponseChars``judge.raw._safetyOverride` 和 attempt output。OpenCode 远程任务中,`opencode completed status=completed exit=0` 加当前 attempt 非空 assistant 输出应对应 `terminalStatus=completed``transportClosedBeforeTerminal=false`;如果因为缺少 `step_finish` 事件仍触发 `_safetyOverride=terminal_not_completed`,说明协议终态归一化有回归。相反,当前 attempt 没有最终 assistant response 时即使 tool/read/bash 证据完整,也必须 retry,不能用旧 `task.finalResponse` 或 reasoning/tool evidence 代替可见最终回复。
+2
View File
@@ -92,6 +92,8 @@ provider ingress 是唯一允许公网暴露的 provider 连接接口,当前
backend-core 下发目标 service id、节点本机 `targetBaseUrl`、path、query、method、request body、timeout 和可选 JSON 数组裁剪参数;provider-gateway 支持 `GET``HEAD``POST``PUT``PATCH``DELETE`,但最终允许方法必须由每个用户服务的 `backend.allowedMethods` 显式配置。provider-gateway 只允许访问 `http://127.0.0.1``http://localhost``http://host.docker.internal` 这些节点本地地址;主 server 内置 Todo Note 后端可使用 Compose 服务名 `http://todo-note:4211``deployment.mode=k3sctl-managed` 的 Code Queue 不得通过 provider-gateway 直连业务容器,正式路径只能是 backend-core -> provider WebSocket HTTP tunnel -> `k3sctl-adapter` -> Kubernetes native Service/DNS,必要时显式 fallback 到 Kubernetes API service proxy -> k3s/k8s Service。该能力不打开 provider-gateway 入站端口,也不替代业务仓库自身 Dockerfile/docker-compose。
backend-core 必须把 provider WebSocket HTTP tunnel 的失败分类到响应 body 和 headers:失败响应至少包含 `requestId``providerId``serviceId``stage``failureReason` 或 provider result,并带 `x-unidesk-request-id``x-unidesk-tunnel-error``GET`/`HEAD` 非 stream 请求允许短超时分层重试;`POST``PATCH``PUT``DELETE` 这类可能产生副作用的请求不得自动重复。Provider 重连时 backend-core 必须先确认 close 事件来自当前 active socket,旧 socket 被新 socket 替换后的迟到 close 不得清理新连接上的 tunnel waiter,也不得把节点误标 offline。
超大 JSON 响应可以使用 `jsonArrayLimits` 在 provider-gateway 返回前裁剪指定数组,并在响应体中写入 `_unidesk.arrayLimits` 元数据,便于 UniDesk frontend 预览列表而不展示裸 JSON。长期应优先推动业务后端提供分页 API;裁剪只是 UniDesk 集成层的展示保护。
## Egress Proxy
+2
View File
@@ -45,6 +45,8 @@ function help(): unknown {
{ command: "microservice list", description: "List UniDesk-managed user services and their provider/runtime mapping." },
{ command: "microservice status <id>", description: "Show one user service config, repository reference, backend mapping, and runtime status." },
{ command: "microservice health <id>", description: "Probe one user service through backend-core -> provider-gateway HTTP proxy." },
{ command: "microservice diagnostics <id>", description: "Split k3sctl-managed proxy health into provider-gateway, HTTP tunnel, adapter, Kubernetes API service proxy, and target Service checks." },
{ command: "microservice tunnel-self-test <id>", description: "Trigger an expected provider HTTP tunnel failure and verify requestId/stage diagnostics are returned." },
{ command: "microservice proxy <id> <path> [--method GET|POST|PUT|PATCH|DELETE] [--raw] [--max-body-bytes N]", description: "Access a private user-service backend path through the same frontend-only proxy used by WebUI; large bodies are summarized unless --raw is set." },
{ command: "decision upload <markdown-file> [--title text] [--type meeting|decision] [--level G0|G1|G2|G3|P0|P1|P2|P3|none] [--status active|blocked|parked|done] [--linked-goal-id id] [--evidence url]", description: "Upload a meeting note or decision record through backend-core -> decision-center user-service proxy." },
{ command: "decision list [--type ...] [--status ...] [--level ...] [--linked-goal-id id] [--limit N]", description: "List Decision Center records through the user-service proxy." },
+9 -1
View File
@@ -95,10 +95,18 @@ export async function runMicroserviceCommand(_config: UniDeskConfig, args: strin
const id = requireId(idArg, "microservice health");
return coreInternalFetch(`/api/microservices/${encodeId(id)}/health`);
}
if (action === "diagnostics") {
const id = requireId(idArg, "microservice diagnostics");
return coreInternalFetch(`/api/microservices/${encodeId(id)}/diagnostics`);
}
if (action === "tunnel-self-test") {
const id = requireId(idArg, "microservice tunnel-self-test");
return coreInternalFetch(`/api/microservices/${encodeId(id)}/tunnel-self-test`);
}
if (action === "proxy") {
const id = requireId(idArg, "microservice proxy");
const path = requireProxyPath(pathArg);
return summarizeMicroserviceProxyResponse(coreInternalFetch(`/api/microservices/${encodeId(id)}/proxy${path}`, { method: methodOption(args) }), args);
}
throw new Error("microservice command must be one of: list, status, health, proxy");
throw new Error("microservice command must be one of: list, status, health, diagnostics, tunnel-self-test, proxy");
}
+3 -3
View File
@@ -455,7 +455,7 @@ async function remoteMicroservice(session: FrontendSession, args: string[]): Pro
if (action === "list") {
return { transport: "frontend", response: await frontendJson(session, "/api/microservices", undefined, 12_000) };
}
if ((action === "status" || action === "health") && id !== undefined) {
if ((action === "status" || action === "health" || action === "diagnostics" || action === "tunnel-self-test") && id !== undefined) {
return {
transport: "frontend",
response: await frontendJson(session, `/api/microservices/${encodeURIComponent(id)}/${action}`, undefined, 18_000),
@@ -468,7 +468,7 @@ async function remoteMicroservice(session: FrontendSession, args: string[]): Pro
response: summarizeMicroserviceProxyResponse(response, args),
};
}
throw new Error("remote microservice command must be: microservice list | status <id> | health <id> | proxy <id> <path>");
throw new Error("remote microservice command must be: microservice list | status <id> | health <id> | diagnostics <id> | tunnel-self-test <id> | proxy <id> <path>");
}
async function remoteCodeQueue(session: FrontendSession, args: string[]): Promise<unknown> {
@@ -559,7 +559,7 @@ async function runRemoteCliOverFrontend(options: RemoteCliOptions, config: UniDe
emitRemoteJson(name, {
transport: "frontend",
baseUrl: session.baseUrl,
commands: ["debug health", "debug dispatch", "debug task", "ssh <providerId> <command>", "ssh <providerId> skills", "microservice list", "microservice status <id>", "microservice health <id>", "microservice proxy <id> <path>", "decision upload <markdown-file>", "decision list", "decision show <id>", "codex task <taskId>", "codex judge <taskId> --attempt N", "network perf"],
commands: ["debug health", "debug dispatch", "debug task", "ssh <providerId> <command>", "ssh <providerId> skills", "microservice list", "microservice status <id>", "microservice health <id>", "microservice diagnostics <id>", "microservice tunnel-self-test <id>", "microservice proxy <id> <path>", "decision upload <markdown-file>", "decision list", "decision show <id>", "codex task <taskId>", "codex judge <taskId> --attempt N", "network perf"],
});
return 0;
}
@@ -93,3 +93,11 @@ export function closeEgressTcpConnectionsForProvider(providerId: string): void {
connection.socket.destroy();
}
}
export function closeEgressTcpConnectionsForSocket(provider: ProviderSocket): void {
for (const [key, connection] of ctx.activeEgressTcpConnections) {
if (connection.provider !== provider) continue;
ctx.activeEgressTcpConnections.delete(key);
connection.socket.destroy();
}
}
+7 -6
View File
@@ -9,7 +9,7 @@ import { recordRequestPerformance, withPerformanceOperation, getPerformance } fr
import { handleProviderMessage, markProviderOffline, markStaleProvidersOffline } from "./provider-registry";
import { markStaleTasksFailed, dispatchTask } from "./task-dispatcher";
import { handleSshClientMessage, sshRoute } from "./ssh-bridge";
import { closeEgressTcpConnectionsForProvider } from "./egress-tcp";
import { closeEgressTcpConnectionsForProvider, closeEgressTcpConnectionsForSocket } from "./egress-tcp";
import { scheduledTaskRoute, runDueScheduledTasks, recoverScheduledRuns } from "./scheduler";
import { microserviceRoute, getMicroservices } from "./microservice-proxy";
import { getOverview, codexQueueLoadTest } from "./overview";
@@ -171,17 +171,18 @@ const providerServer = Bun.serve<WsData>({
const providerId = ws.data.providerId;
logger("warn", "provider_socket_close", { providerId: providerId ?? null });
if (providerId !== undefined) {
if (ctx.activeProviders.get(providerId) !== ws) {
closeEgressTcpConnectionsForSocket(ws);
logger("info", "provider_socket_close_ignored_replaced", { providerId });
return;
}
closeEgressTcpConnectionsForProvider(providerId);
for (const [requestId, waiter] of ctx.httpTunnelWaiters) {
if (requestId.startsWith(`${providerId}:`)) {
ctx.httpTunnelWaiters.delete(requestId);
waiter(null);
waiter(null, "provider-disconnected");
}
}
if (ctx.activeProviders.get(providerId) !== ws) {
logger("info", "provider_socket_close_ignored_replaced", { providerId });
return;
}
markProviderOffline(providerId).catch((error) => logger("error", "provider_offline_mark_failed", { providerId, error: errorToJson(error) }));
}
},
@@ -1,6 +1,6 @@
import type { JsonValue } from "../../shared/src/index";
import { ctx, config, logger } from "./context";
import type { MicroserviceConfig, MicroserviceProxyCacheEntry, MicroserviceHealthAssessment, MicroserviceAvailabilityEntry, RawTaskRow } from "./types";
import type { HttpTunnelFailureReason, MicroserviceConfig, MicroserviceProxyCacheEntry, MicroserviceHealthAssessment, MicroserviceAvailabilityEntry, RawTaskRow } from "./types";
import { jsonResponse, errorToJson, compactJson, isPlainRecord, truncateText } from "./http";
import { createAndSendTask, waitForTaskTerminal, providerSupports } from "./task-dispatcher";
import { getNodes, getNodeDockerStatuses } from "./db";
@@ -12,6 +12,7 @@ import { getNodes, getNodeDockerStatuses } from "./db";
const microserviceProxyMaxBodyTextLength = 8 * 1024 * 1024;
const microserviceAvailabilityTtlMs = 30_000;
const codeQueueOverviewPathFallbackStaleMs = 30_000;
const providerHttpTunnelMaxAttempts = 3;
const microserviceForwardRequestHeaders = [
"accept",
"content-type",
@@ -456,6 +457,13 @@ function responseFromMicroserviceCache(entry: MicroserviceProxyCacheEntry, state
});
}
function isMicroserviceTransientFailureResponse(response: Response): boolean {
if (response.status !== 502 && response.status !== 503 && response.status !== 504) return false;
return response.headers.get("x-unidesk-transient-error") === "true"
|| response.headers.get("x-unidesk-tunnel-error") !== null
|| response.headers.get("x-unidesk-upstream-proxy-mode") === "provider-gateway-http-fetch";
}
function readMicroserviceCache(key: string): Response | null {
const entry = ctx.microserviceProxyCache.get(key);
if (entry === undefined) return null;
@@ -626,43 +634,248 @@ async function k3sctlAdapterMicroserviceResponse(
return fetchMicroserviceUpstreamResponse(adapter, method, adapterTargetPath, proxyOptions, requestHeaders, bodyText, abortSignal);
}
async function k3sctlManagedDiagnosticsResponse(service: MicroserviceConfig): Promise<Response> {
const adapterServiceId = service.deployment.adapterServiceId ?? "k3sctl-adapter";
const adapter = microserviceById(adapterServiceId);
const checkedAt = new Date().toISOString();
const providerId = adapter?.providerId ?? service.providerId;
const providerOnline = ctx.activeProviders.has(providerId);
const providerTunnelCapable = await providerSupports(providerId, "microservice.http.tunnel");
if (adapter === null) {
return jsonResponse({
ok: false,
serviceId: service.id,
checkedAt,
requestPath: "/diagnostics",
checks: {
providerGateway: { ok: providerOnline, providerId, online: providerOnline },
httpTunnel: { ok: providerTunnelCapable, providerId, capable: providerTunnelCapable },
k3sctlAdapter: { ok: false, serviceId: adapterServiceId, error: `k3sctl adapter microservice not found: ${adapterServiceId}` },
kubernetesApiServiceProxy: { ok: false, skipped: true },
targetService: { ok: false, skipped: true },
},
}, 502);
}
const k3sServiceId = service.id === "code-queue"
? codeQueueK3sServiceIdForRequest("GET", service.backend.healthPath)
: service.deployment.k3sServiceId ?? service.id;
const adapterPath = `/api/services/${encodeURIComponent(k3sServiceId)}/diagnostics`;
const response = await fetchMicroserviceUpstreamResponse(
adapter,
"GET",
adapterPath,
{ query: "", jsonArrayLimits: {} },
{ accept: "application/json" },
"",
);
const contentType = response.headers.get("content-type") ?? "application/json; charset=utf-8";
const bodyText = await response.text();
let adapterBody: JsonValue = bodyText;
try {
adapterBody = JSON.parse(bodyText) as JsonValue;
} catch {
adapterBody = bodyText.slice(0, 4000);
}
const bodyRecord = isPlainRecord(adapterBody) ? adapterBody : {};
const adapterChecks = isPlainRecord(bodyRecord.checks) ? bodyRecord.checks : {};
const checks = {
providerGateway: {
ok: providerOnline,
providerId,
online: providerOnline,
activeSocketCount: ctx.activeProviders.size,
},
httpTunnel: {
ok: response.ok && response.headers.get("x-unidesk-proxy-mode") === "provider-ws-http-tunnel",
providerId,
capable: providerTunnelCapable,
requestId: response.headers.get("x-unidesk-request-id") ?? null,
attempts: response.headers.get("x-unidesk-http-tunnel-attempts") ?? null,
upstreamProxyMode: response.headers.get("x-unidesk-upstream-proxy-mode") ?? null,
proxyStatus: response.status,
},
k3sctlAdapter: {
ok: response.ok,
serviceId: adapter.id,
providerId: adapter.providerId,
status: response.status,
contentType,
},
kubernetesApiServiceProxy: compactJson(adapterChecks.kubernetesApiServiceProxy ?? { ok: false, skipped: true }),
targetService: compactJson(adapterChecks.targetService ?? adapterChecks.managedService ?? { ok: false, skipped: true }),
} satisfies Record<string, JsonValue>;
const httpTunnelCheck = checks.httpTunnel as Record<string, JsonValue>;
return jsonResponse({
ok: response.ok && providerOnline && httpTunnelCheck.ok === true,
serviceId: service.id,
k3sServiceId,
checkedAt,
path: service.backend.healthPath,
chain: "CLI/frontend -> backend-core -> provider-gateway HTTP tunnel -> k3sctl-adapter -> Kubernetes API service proxy -> k3s Service",
checks,
adapter: adapterBody,
}, response.ok ? 200 : response.status);
}
async function microserviceTunnelSelfTestResponse(service: MicroserviceConfig): Promise<Response> {
const tunnelService = isK3sctlManagedMicroservice(service)
? microserviceById(service.deployment.adapterServiceId ?? "k3sctl-adapter")
: service;
if (tunnelService === null) {
return jsonResponse({ ok: false, serviceId: service.id, error: "tunnel service not found", adapterServiceId: service.deployment.adapterServiceId ?? null }, 502);
}
if (!(await providerSupports(tunnelService.providerId, "microservice.http.tunnel"))) {
return jsonResponse({
ok: false,
serviceId: service.id,
providerId: tunnelService.providerId,
error: `provider does not declare microservice.http.tunnel capability: ${tunnelService.providerId}`,
}, 409);
}
const probeService = {
...tunnelService,
backend: {
...tunnelService.backend,
nodeBaseUrl: "http://127.0.0.1:1",
timeoutMs: 1000,
},
};
const response = await providerHttpTunnelMicroserviceResponse(
probeService,
"GET",
"/",
{ query: "", jsonArrayLimits: {} },
{ accept: "application/json" },
"",
);
const headers = {
requestId: response.headers.get("x-unidesk-request-id"),
tunnelError: response.headers.get("x-unidesk-tunnel-error"),
providerId: response.headers.get("x-unidesk-provider-id"),
serviceId: response.headers.get("x-unidesk-service-id"),
transient: response.headers.get("x-unidesk-transient-error"),
};
const bodyText = await response.text();
let body: JsonValue = bodyText.slice(0, 4000);
try {
body = JSON.parse(bodyText) as JsonValue;
} catch {
// Keep bounded text body for malformed JSON diagnostics.
}
const bodyRecord = isPlainRecord(body) ? body : {};
const hasRequestId = typeof bodyRecord.requestId === "string" && bodyRecord.requestId.length > 0;
const hasStage = typeof bodyRecord.stage === "string" && bodyRecord.stage.length > 0;
const ok = response.status === 502 && hasRequestId && hasStage && headers.requestId === bodyRecord.requestId;
return jsonResponse({
ok,
serviceId: service.id,
tunnelServiceId: tunnelService.id,
providerId: tunnelService.providerId,
expectedFailure: true,
status: response.status,
checks: {
expectedStatus: response.status === 502,
bodyHasRequestId: hasRequestId,
bodyHasStage: hasStage,
headerHasRequestId: typeof headers.requestId === "string" && headers.requestId.length > 0,
headerHasTunnelError: typeof headers.tunnelError === "string" && headers.tunnelError.length > 0,
},
headers,
body,
}, ok ? 200 : 502);
}
function providerHttpTunnelRequestId(providerId: string): string {
return `${providerId}:http_${Date.now()}_${Math.random().toString(16).slice(2)}`;
}
function canRetryProviderHttpTunnel(method: string, targetPath: string): boolean {
const normalizedMethod = method.toUpperCase();
if (normalizedMethod !== "GET" && normalizedMethod !== "HEAD") return false;
return !targetPath.endsWith("/stream");
}
function providerHttpTunnelWaitMs(service: MicroserviceConfig, attempt: number, retryable: boolean): number {
const baseTimeoutMs = Math.max(1000, service.backend.timeoutMs);
if (!retryable) return baseTimeoutMs + 3000;
if (attempt === 1) return Math.min(baseTimeoutMs + 3000, Math.max(5000, Math.floor(baseTimeoutMs * 0.45)));
if (attempt === 2) return Math.min(baseTimeoutMs + 3000, Math.max(6000, Math.floor(baseTimeoutMs * 0.7)));
return baseTimeoutMs + 3000;
}
function tunnelErrorBody(
service: MicroserviceConfig,
requestId: string,
error: string,
stage: string,
status: number,
extra: Record<string, JsonValue> = {},
): Response {
const response = jsonResponse({
ok: false,
error,
stage,
providerId: service.providerId,
serviceId: service.id,
requestId,
...extra,
}, status);
response.headers.set("x-unidesk-request-id", requestId);
response.headers.set("x-unidesk-provider-id", service.providerId);
response.headers.set("x-unidesk-service-id", service.id);
response.headers.set("x-unidesk-tunnel-error", stage);
if (status === 502 || status === 503 || status === 504) response.headers.set("x-unidesk-transient-error", "true");
return response;
}
function providerHttpTunnelFailureStatus(reason: HttpTunnelFailureReason | null): number {
if (reason === "aborted") return 499;
if (reason === "provider-disconnected") return 503;
if (reason === "send-failed") return 502;
return 504;
}
function tunnelFailureRetryable(reason: HttpTunnelFailureReason | null): boolean {
return reason === "timeout" || reason === "provider-disconnected" || reason === "send-failed";
}
async function waitForProviderHttpTunnelResponse(
providerId: string,
requestId: string,
timeoutMs: number,
abortSignal?: AbortSignal,
): Promise<{ providerId: string; requestId: string; ok: boolean; result: JsonValue } | null> {
): Promise<{ message: { providerId: string; requestId: string; ok: boolean; result: JsonValue } | null; reason: HttpTunnelFailureReason | null }> {
return await new Promise((resolve) => {
let settled = false;
let abortHandler: (() => void) | null = null;
const timer = setTimeout(() => settle(null), Math.max(1, timeoutMs));
const settle = (message: { providerId: string; requestId: string; ok: boolean; result: JsonValue } | null): void => {
const timer = setTimeout(() => settle(null, "timeout"), Math.max(1, timeoutMs));
const settle = (
message: { providerId: string; requestId: string; ok: boolean; result: JsonValue } | null,
reason: HttpTunnelFailureReason | null = null,
): void => {
if (settled) return;
settled = true;
clearTimeout(timer);
if (abortHandler !== null) abortSignal?.removeEventListener("abort", abortHandler);
ctx.httpTunnelWaiters.delete(requestId);
resolve(message);
resolve({ message, reason });
};
abortHandler = () => settle(null);
abortHandler = () => settle(null, "aborted");
if (abortSignal !== undefined) {
if (abortSignal.aborted) {
settle(null);
settle(null, "aborted");
return;
}
abortSignal.addEventListener("abort", abortHandler, { once: true });
}
ctx.httpTunnelWaiters.set(requestId, (message) => {
ctx.httpTunnelWaiters.set(requestId, (message, reason) => {
if (message !== null && message.providerId !== providerId) {
logger("warn", "http_tunnel_provider_mismatch", { requestId, expectedProviderId: providerId, actualProviderId: message.providerId });
settle(null);
settle(null, "provider-mismatch");
return;
}
settle(message);
settle(message, reason ?? null);
});
});
}
@@ -676,32 +889,116 @@ async function providerHttpTunnelMicroserviceResponse(
bodyText: string,
abortSignal?: AbortSignal,
): Promise<Response> {
const socket = ctx.activeProviders.get(service.providerId);
if (socket === undefined) return jsonResponse({ ok: false, error: `provider is offline: ${service.providerId}` }, 503);
const requestId = providerHttpTunnelRequestId(service.providerId);
const timeoutMs = service.backend.timeoutMs + 3000;
const waiter = waitForProviderHttpTunnelResponse(service.providerId, requestId, timeoutMs, abortSignal);
socket.send(JSON.stringify({
type: "http_tunnel_request",
requestId,
payload: {
source: "microservice-frontend-proxy",
serviceId: service.id,
method,
targetBaseUrl: service.backend.nodeBaseUrl,
path: targetPath,
query: proxyOptions.query,
requestHeaders,
bodyText,
jsonArrayLimits: proxyOptions.jsonArrayLimits,
timeoutMs: service.backend.timeoutMs,
cacheTtlMs: providerMicroserviceCacheTtlMs(service.id, targetPath),
},
}));
const message = await waiter;
if (message === null) return jsonResponse({ ok: false, error: "provider HTTP tunnel timed out or disconnected", providerId: service.providerId, requestId }, 504);
if (!message.ok) return jsonResponse({ ok: false, error: "provider HTTP tunnel failed", providerId: service.providerId, requestId, result: message.result }, 502);
return responseFromProviderMicroserviceResult(dockerStatusRecord(message.result), "provider-ws-http-tunnel");
const retryable = canRetryProviderHttpTunnel(method, targetPath);
const maxAttempts = retryable ? providerHttpTunnelMaxAttempts : 1;
const attempts: JsonValue[] = [];
let lastRequestId = "";
for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
const socket = ctx.activeProviders.get(service.providerId);
const requestId = providerHttpTunnelRequestId(service.providerId);
lastRequestId = requestId;
if (socket === undefined) {
attempts.push({ attempt, requestId, ok: false, reason: "provider-offline" });
return tunnelErrorBody(service, requestId, `provider is offline: ${service.providerId}`, "provider-gateway-online", 503, {
retryable,
attempts,
});
}
const timeoutMs = providerHttpTunnelWaitMs(service, attempt, retryable);
const startedAt = Date.now();
const waiter = waitForProviderHttpTunnelResponse(service.providerId, requestId, timeoutMs, abortSignal);
try {
socket.send(JSON.stringify({
type: "http_tunnel_request",
requestId,
payload: {
source: "microservice-frontend-proxy",
serviceId: service.id,
method,
targetBaseUrl: service.backend.nodeBaseUrl,
path: targetPath,
query: proxyOptions.query,
requestHeaders,
bodyText,
jsonArrayLimits: proxyOptions.jsonArrayLimits,
timeoutMs: service.backend.timeoutMs,
cacheTtlMs: providerMicroserviceCacheTtlMs(service.id, targetPath),
},
}));
} catch (error) {
ctx.httpTunnelWaiters.get(requestId)?.(null, "send-failed");
const durationMs = Date.now() - startedAt;
attempts.push({ attempt, requestId, ok: false, reason: "send-failed", durationMs, error: errorToJson(error) });
if (attempt < maxAttempts) {
logger("warn", "http_tunnel_send_retry", { providerId: service.providerId, serviceId: service.id, requestId, attempt, maxAttempts, error: errorToJson(error) });
await Bun.sleep(Math.min(500, 75 * attempt));
continue;
}
return tunnelErrorBody(service, requestId, "provider HTTP tunnel send failed", "http-tunnel-send", 502, {
retryable,
attempts,
detail: errorToJson(error),
});
}
const { message, reason } = await waiter;
const durationMs = Date.now() - startedAt;
if (message === null) {
attempts.push({ attempt, requestId, ok: false, reason: reason ?? "timeout", durationMs, timeoutMs });
if (retryable && tunnelFailureRetryable(reason) && attempt < maxAttempts) {
logger("warn", "http_tunnel_retry", {
providerId: service.providerId,
serviceId: service.id,
requestId,
attempt,
maxAttempts,
reason: reason ?? "timeout",
durationMs,
timeoutMs,
});
await Bun.sleep(Math.min(750, 100 * attempt));
continue;
}
return tunnelErrorBody(
service,
requestId,
"provider HTTP tunnel timed out or disconnected",
reason === "provider-disconnected" ? "http-tunnel-provider-disconnected" : reason === "aborted" ? "client-aborted" : "http-tunnel-wait",
providerHttpTunnelFailureStatus(reason),
{ retryable, attempts, timeoutMs, failureReason: reason ?? "timeout" },
);
}
attempts.push({ attempt, requestId, ok: message.ok, durationMs });
if (!message.ok) {
const result = dockerStatusRecord(message.result);
const resultError = typeof result.error === "string" ? result.error : "provider HTTP tunnel failed";
logger("warn", "http_tunnel_provider_error", {
providerId: service.providerId,
serviceId: service.id,
requestId,
attempt,
maxAttempts,
durationMs,
result: compactJson(result),
});
if (retryable && attempt < maxAttempts) {
await Bun.sleep(Math.min(750, 100 * attempt));
continue;
}
return tunnelErrorBody(service, requestId, "provider HTTP tunnel failed", "provider-gateway-http-fetch", 502, {
retryable,
attempts,
result: message.result,
providerError: resultError,
});
}
const response = responseFromProviderMicroserviceResult(dockerStatusRecord(message.result), "provider-ws-http-tunnel");
response.headers.set("x-unidesk-request-id", requestId);
response.headers.set("x-unidesk-http-tunnel-attempt", String(attempt));
response.headers.set("x-unidesk-http-tunnel-attempts", String(attempts.length));
response.headers.set("x-unidesk-provider-id", service.providerId);
return response;
}
return tunnelErrorBody(service, lastRequestId, "provider HTTP tunnel exhausted attempts", "http-tunnel-wait", 504, { retryable, attempts });
}
async function fetchMicroserviceUpstreamResponse(
@@ -901,14 +1198,29 @@ export async function microserviceRoute(req: Request, url: URL): Promise<Respons
? "/"
: suffix.startsWith(`${proxyPrefix}/`)
? `/${suffix.slice(proxyPrefix.length + 1)}`
: suffix === "diagnostics"
? "/diagnostics"
: suffix === "tunnel-self-test"
? "/tunnel-self-test"
: "";
if (targetPath.length === 0) return jsonResponse({ ok: false, error: "microservice route must be /status, /health, or /proxy/<path>" }, 404);
if (targetPath.length === 0) return jsonResponse({ ok: false, error: "microservice route must be /status, /health, /diagnostics, /tunnel-self-test, or /proxy/<path>" }, 404);
if (suffix === "health" && method !== "GET" && method !== "HEAD") {
return jsonResponse({ ok: false, error: "microservice health only supports GET/HEAD" }, 405);
}
if (suffix === "diagnostics" && method !== "GET" && method !== "HEAD") {
return jsonResponse({ ok: false, error: "microservice diagnostics only supports GET/HEAD" }, 405);
}
if (suffix === "tunnel-self-test" && method !== "GET" && method !== "HEAD") {
return jsonResponse({ ok: false, error: "microservice tunnel self-test only supports GET/HEAD" }, 405);
}
if (!isMicroserviceMethodAllowed(service, method)) {
return jsonResponse({ ok: false, error: "microservice method is not allowed", serviceId, method, allowedMethods: service.backend.allowedMethods }, 405);
}
if (suffix === "diagnostics") {
if (!isK3sctlManagedMicroservice(service)) return strictMicroserviceHealthResponse(service, method === "HEAD");
return k3sctlManagedDiagnosticsResponse(service);
}
if (suffix === "tunnel-self-test") return microserviceTunnelSelfTestResponse(service);
if (!isMicroservicePathAllowed(service, targetPath)) {
return jsonResponse({ ok: false, error: "microservice path is not allowed", serviceId, targetPath }, 403);
}
@@ -951,6 +1263,14 @@ export async function microserviceRoute(req: Request, url: URL): Promise<Respons
}
}
const response = await fetchMicroserviceUpstreamResponse(service, method, targetPath, proxyOptions, requestHeaders, bodyText, req.signal);
if ((method === "GET" || method === "HEAD") && isMicroserviceTransientFailureResponse(response)) {
const stale = readStaleMicroserviceCache(cacheKey) ?? readMicroservicePathFallback(service, method, targetPath);
if (stale !== null) {
stale.headers.set("x-unidesk-cache", "stale-on-transient-failure");
stale.headers.set("x-unidesk-stale-reason", String(response.status));
return stale;
}
}
if ((method === "GET" || method === "HEAD") && cacheTtlMs > 0) {
const snapshot = await cacheableResponseSnapshot(response);
rememberMicroserviceCache(cacheKey, cacheTtlMs, snapshot);
+8 -1
View File
@@ -168,6 +168,13 @@ export interface EgressTcpConnection {
provider: ProviderSocket;
}
export type HttpTunnelFailureReason =
| "timeout"
| "aborted"
| "provider-disconnected"
| "provider-mismatch"
| "send-failed";
export interface MicroserviceProxyCacheEntry {
expiresAt: number;
staleExpiresAt: number;
@@ -193,6 +200,6 @@ export type HttpTunnelWaiter = (message: {
requestId: string;
ok: boolean;
result: JsonValue;
} | null) => void;
} | null, reason?: HttpTunnelFailureReason) => void;
export type LoggerFn = (level: "debug" | "info" | "warn" | "error", message: string, data?: JsonValue) => void;
@@ -723,6 +723,30 @@ function parseCurlHeaderBody(output: Buffer): { status: number; contentType: str
return { status: Number.isFinite(status) ? status : 502, contentType, bodyText };
}
function bodyPreview(bodyText: string, contentType: string): JsonValue {
if (contentType.toLowerCase().includes("json")) {
try {
return JSON.parse(bodyText) as JsonValue;
} catch {
return bodyText.slice(0, 2000);
}
}
return bodyText.slice(0, 2000);
}
function responseProbeRecord(response: Response, bodyText: string, startedAt: number): JsonRecord {
const contentType = response.headers.get("content-type") ?? "application/octet-stream";
return {
ok: response.ok,
status: response.ok ? "healthy" : "unhealthy",
upstreamStatus: response.status,
contentType,
proxyMode: response.headers.get("x-unidesk-proxy-mode") ?? "",
durationMs: Date.now() - startedAt,
body: bodyPreview(bodyText, contentType),
};
}
async function kubeApiServiceProxyResponse(
service: ManagedService,
req: Request,
@@ -733,6 +757,25 @@ async function kubeApiServiceProxyResponse(
return kubeApiProxyResponse(service, req, serviceProxyApiPath(service, targetPath), query, timeoutMs);
}
async function kubeApiServiceProxyProbe(service: ManagedService, targetPath: string, timeoutMs: number): Promise<JsonRecord> {
const startedAt = Date.now();
try {
const request = new Request("http://k3sctl-adapter.local/diagnostics", { method: "GET", headers: { accept: "application/json" } });
const response = await kubeApiServiceProxyResponse(service, request, targetPath, "", timeoutMs);
const bodyText = await response.text();
return responseProbeRecord(response, bodyText, startedAt);
} catch (error) {
return {
ok: false,
status: "unhealthy",
upstreamStatus: null,
proxyMode: "kubernetes-api-service-proxy",
durationMs: Date.now() - startedAt,
error: errorToJson(error),
};
}
}
async function nativeServiceProxyResponse(
service: ManagedService,
req: Request,
@@ -1116,6 +1159,74 @@ async function controlPlaneSnapshot(): Promise<JsonRecord> {
};
}
async function serviceDiagnostics(service: ManagedService): Promise<JsonRecord> {
const checkedAt = new Date().toISOString();
const healthPath = activeEndpoint(service).healthPath;
const healthTimeoutMs = Math.max(500, Math.min(config.healthTimeoutMs, 5000));
const kubernetesApiServiceProxy = await kubeApiServiceProxyProbe(service, healthPath, healthTimeoutMs);
const targetServiceStartedAt = Date.now();
let targetService: JsonRecord;
try {
const nativeRequest = new Request("http://k3sctl-adapter.local/diagnostics", { method: "GET", headers: { accept: "application/json" } });
const native = await nativeServiceProxyResponse(service, nativeRequest.clone(), healthPath, "", healthTimeoutMs);
const response = native ?? await kubeApiServiceProxyResponse(service, nativeRequest, healthPath, "", healthTimeoutMs);
const bodyText = await response.text();
targetService = responseProbeRecord(response, bodyText, targetServiceStartedAt);
} catch (error) {
targetService = {
ok: false,
status: "unhealthy",
upstreamStatus: null,
proxyMode: "k3sctl-service-health",
durationMs: Date.now() - targetServiceStartedAt,
error: errorToJson(error),
};
}
const managedService = await serviceStatus(service).then((status) => ({
ok: status.healthy === true,
status: String(status.status ?? ""),
servingHealthy: status.servingHealthy === true,
topologyComplete: status.topologyComplete === true,
activeInstanceId: String(status.activeInstanceId ?? ""),
active: status.active ?? null,
missingNodeIds: Array.isArray(status.missingNodeIds) ? status.missingNodeIds as JsonValue : [],
} satisfies JsonRecord)).catch((error) => ({
ok: false,
status: "unhealthy",
error: errorToJson(error),
} satisfies JsonRecord));
const kubernetesApiServiceProxyOk = kubernetesApiServiceProxy.ok === true;
const targetServiceOk = targetService.ok === true;
const checks = {
k3sctlAdapter: {
ok: true,
nodeId: config.nodeId,
clusterId: config.clusterId,
startedAt,
},
kubernetesApiServiceProxy: {
...kubernetesApiServiceProxy,
configured: kubeClient !== null,
kubeconfigPath: config.kubeconfigPath,
connectHost: config.kubeApiConnectHost,
},
targetService,
managedService,
} satisfies Record<string, JsonValue>;
const ok = kubernetesApiServiceProxyOk && targetServiceOk;
return {
ok,
service: "k3sctl-adapter",
serviceId: service.id,
namespace: service.namespace,
checkedAt,
healthPath,
route: service.route,
noFallback: true,
checks,
};
}
function forwardHeaders(request: Request): Headers {
const headers = new Headers();
for (const name of ["accept", "content-type", "x-requested-with"]) {
@@ -1165,6 +1276,13 @@ async function route(req: Request): Promise<Response> {
const status = await serviceStatus(service);
return req.method === "HEAD" ? new Response(null, { status: status.healthy === true ? 200 : 503 }) : jsonResponse({ ok: status.healthy === true, managedService: status }, status.healthy === true ? 200 : 503);
}
const diagnosticsMatch = url.pathname.match(/^\/api\/services\/([^/]+)\/diagnostics$/u);
if (diagnosticsMatch !== null && (req.method === "GET" || req.method === "HEAD")) {
const service = serviceById(decodeURIComponent(diagnosticsMatch[1] ?? ""));
if (service === null) return jsonResponse({ ok: false, error: "managed service not found" }, 404);
const diagnostics = await serviceDiagnostics(service);
return req.method === "HEAD" ? new Response(null, { status: diagnostics.ok === true ? 200 : 503 }) : jsonResponse(diagnostics, diagnostics.ok === true ? 200 : 503);
}
const proxyMatch = url.pathname.match(/^\/api\/services\/([^/]+)\/proxy(\/.*)$/u);
if (proxyMatch !== null) {
const service = serviceById(decodeURIComponent(proxyMatch[1] ?? ""));