From ebe506bdd01e8603f98c7a2b1ebddb9700111984 Mon Sep 17 00:00:00 2001 From: Codex Date: Sat, 16 May 2026 18:03:04 +0000 Subject: [PATCH] feat: tunnel k3s native service dataplane Resolve k3s Service ClusterIP through the control API, then reuse an SSH local forward for adapter data-plane requests so UI polling no longer depends on Kubernetes API service proxy. --- .../k3sctl-adapter/docker-compose.d601.yml | 3 + .../microservices/k3sctl-adapter/src/index.ts | 308 ++++++++++++++++-- 2 files changed, 280 insertions(+), 31 deletions(-) diff --git a/src/components/microservices/k3sctl-adapter/docker-compose.d601.yml b/src/components/microservices/k3sctl-adapter/docker-compose.d601.yml index 70da03d2..cd6b5771 100644 --- a/src/components/microservices/k3sctl-adapter/docker-compose.d601.yml +++ b/src/components/microservices/k3sctl-adapter/docker-compose.d601.yml @@ -32,8 +32,11 @@ services: K3SCTL_KUBE_API_REMOTE_HOST: "${K3SCTL_KUBE_API_REMOTE_HOST:-127.0.0.1}" K3SCTL_KUBE_API_REMOTE_PORT: "${K3SCTL_KUBE_API_REMOTE_PORT:-6443}" K3SCTL_NATIVE_SERVICE_PROXY_ENABLED: "${K3SCTL_NATIVE_SERVICE_PROXY_ENABLED:-true}" + K3SCTL_NATIVE_SERVICE_SSH_TUNNEL_ENABLED: "${K3SCTL_NATIVE_SERVICE_SSH_TUNNEL_ENABLED:-true}" K3SCTL_NATIVE_SERVICE_PROBE_TIMEOUT_MS: "${K3SCTL_NATIVE_SERVICE_PROBE_TIMEOUT_MS:-1200}" K3SCTL_NATIVE_SERVICE_FAILURE_COOLDOWN_MS: "${K3SCTL_NATIVE_SERVICE_FAILURE_COOLDOWN_MS:-10000}" + K3SCTL_NATIVE_SERVICE_RESOLUTION_TTL_MS: "${K3SCTL_NATIVE_SERVICE_RESOLUTION_TTL_MS:-30000}" + K3SCTL_NATIVE_SERVICE_TUNNEL_CONNECT_TIMEOUT_MS: "${K3SCTL_NATIVE_SERVICE_TUNNEL_CONNECT_TIMEOUT_MS:-3000}" K3SCTL_NATIVE_SERVICE_URL_CODE_QUEUE: "${K3SCTL_NATIVE_SERVICE_URL_CODE_QUEUE:-}" K3SCTL_NATIVE_SERVICE_URL_MDTODO: "${K3SCTL_NATIVE_SERVICE_URL_MDTODO:-}" K3SCTL_MANIFEST_PATHS: "${K3SCTL_MANIFEST_PATHS:-k3s/code-queue.k3s.json,k3s/mdtodo.k3s.json}" diff --git a/src/components/microservices/k3sctl-adapter/src/index.ts b/src/components/microservices/k3sctl-adapter/src/index.ts index 7eb8b3c1..e91639d2 100644 --- a/src/components/microservices/k3sctl-adapter/src/index.ts +++ b/src/components/microservices/k3sctl-adapter/src/index.ts @@ -1,5 +1,6 @@ import { createHourlyJsonlWriter, logRetentionBytesForService } from "../../../shared/src/rotating-jsonl"; import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs"; +import { createConnection, createServer } from "node:net"; import { dirname, isAbsolute, join } from "node:path"; import { tmpdir } from "node:os"; import { fileURLToPath } from "node:url"; @@ -45,8 +46,11 @@ interface RuntimeConfig { kubeconfigPath: string; kubeApiConnectHost: string; nativeServiceProxyEnabled: boolean; + nativeServiceSshTunnelEnabled: boolean; nativeServiceProbeTimeoutMs: number; nativeServiceFailureCooldownMs: number; + nativeServiceResolutionTtlMs: number; + nativeServiceTunnelConnectTimeoutMs: number; requestTimeoutMs: number; healthTimeoutMs: number; services: ManagedService[]; @@ -60,6 +64,21 @@ interface KubeApiClient { keyFile: string; } +interface NativeServiceEndpoint { + namespace: string; + serviceName: string; + servicePort: number; + clusterIP: string; +} + +interface NativeServiceTunnel { + endpoint: NativeServiceEndpoint; + localPort: number; + proc: ReturnType; + closed: boolean; + startedAt: number; +} + const recentLogs: JsonRecord[] = []; const startedAt = new Date().toISOString(); const adapterRoot = join(dirname(fileURLToPath(import.meta.url)), ".."); @@ -73,6 +92,8 @@ const logWriter = config.logFile : null; const kubeClient = loadKubeApiClient(); const nativeServiceFailures = new Map(); +const nativeServiceEndpointCache = new Map(); +const nativeServiceTunnels = new Map(); logWriter?.prune(); function envString(name: string, fallback: string): string { @@ -269,8 +290,11 @@ function readConfig(): RuntimeConfig { kubeconfigPath: envString("K3SCTL_KUBECONFIG_PATH", "/var/lib/unidesk/k3s/kubeconfig"), kubeApiConnectHost: envString("K3SCTL_KUBE_API_CONNECT_HOST", "host.docker.internal"), nativeServiceProxyEnabled: envBool("K3SCTL_NATIVE_SERVICE_PROXY_ENABLED", true), + nativeServiceSshTunnelEnabled: envBool("K3SCTL_NATIVE_SERVICE_SSH_TUNNEL_ENABLED", envBool("K3SCTL_KUBE_API_SSH_TUNNEL_ENABLED", false)), nativeServiceProbeTimeoutMs: Math.max(250, Math.min(5000, envNumber("K3SCTL_NATIVE_SERVICE_PROBE_TIMEOUT_MS", 1200))), nativeServiceFailureCooldownMs: Math.max(1000, Math.min(60_000, envNumber("K3SCTL_NATIVE_SERVICE_FAILURE_COOLDOWN_MS", 10_000))), + nativeServiceResolutionTtlMs: Math.max(1000, Math.min(300_000, envNumber("K3SCTL_NATIVE_SERVICE_RESOLUTION_TTL_MS", 30_000))), + nativeServiceTunnelConnectTimeoutMs: Math.max(250, Math.min(10_000, envNumber("K3SCTL_NATIVE_SERVICE_TUNNEL_CONNECT_TIMEOUT_MS", 3000))), requestTimeoutMs: Math.max(1000, Math.min(120_000, envNumber("K3SCTL_REQUEST_TIMEOUT_MS", 30_000))), healthTimeoutMs: Math.max(500, Math.min(30_000, envNumber("K3SCTL_HEALTH_TIMEOUT_MS", 2500))), services: mergeServices([...manifestServices, ...inlineServices]), @@ -398,9 +422,16 @@ function isKubernetesServiceRoute(service: ManagedService): boolean { return String(service.route.kind ?? "") === "kubernetes-service"; } +function nativeServiceRef(service: ManagedService): { namespace: string; serviceName: string; servicePort: number } { + return { + namespace: service.namespace, + serviceName: routeString(service, "serviceName", service.id), + servicePort: routeNumber(service, "servicePort", 80), + }; +} + function serviceProxyApiPath(service: ManagedService, targetPath: string): string { - const serviceName = routeString(service, "serviceName", service.id); - const servicePort = routeNumber(service, "servicePort", 80); + const { serviceName, servicePort } = nativeServiceRef(service); const safeTargetPath = targetPath.startsWith("/") ? targetPath : `/${targetPath}`; return `/api/v1/namespaces/${encodeURIComponent(service.namespace)}/services/${encodeURIComponent(`${serviceName}:${servicePort}`)}/proxy${safeTargetPath}`; } @@ -415,17 +446,22 @@ function nativeServiceFailureKey(service: ManagedService): string { return `${service.namespace}/${service.id}`; } -function nativeServiceUrl(service: ManagedService, targetPath: string, query = ""): URL { - const serviceName = routeString(service, "serviceName", service.id); - const servicePort = routeNumber(service, "servicePort", 80); +function nativeServiceOverrideBase(service: ManagedService): string | null { + return envOptionalString(`K3SCTL_NATIVE_SERVICE_URL_${service.id.toUpperCase().replace(/[^A-Z0-9]/gu, "_")}`); +} + +function nativeServiceUrlFromBase(base: string, targetPath: string, query = ""): URL { const safeTargetPath = targetPath.startsWith("/") ? targetPath : `/${targetPath}`; - const override = envOptionalString(`K3SCTL_NATIVE_SERVICE_URL_${service.id.toUpperCase().replace(/[^A-Z0-9]/gu, "_")}`); - const base = override ?? `http://${serviceName}.${service.namespace}.svc.cluster.local:${servicePort}`; const url = new URL(safeTargetPath, base.replace(/\/+$/u, "/")); url.search = query; return url; } +function nativeServiceDnsUrl(service: ManagedService, targetPath: string, query = ""): URL { + const { namespace, serviceName, servicePort } = nativeServiceRef(service); + return nativeServiceUrlFromBase(`http://${serviceName}.${namespace}.svc.cluster.local:${servicePort}`, targetPath, query); +} + function nativeServiceProxyUsable(service: ManagedService): boolean { if (!config.nativeServiceProxyEnabled) return false; const failedAt = nativeServiceFailures.get(nativeServiceFailureKey(service)); @@ -442,6 +478,200 @@ function rememberNativeServiceFailure(service: ManagedService, error: unknown): }); } +async function kubeApiJson(apiPath: string, timeoutMs: number): Promise { + if (kubeClient === null) throw new Error("kubernetes api client is not configured"); + const upstreamUrl = new URL(apiPath, kubeClient.serverUrl); + const args = kubeProxyCurlArgs(kubeClient, "GET", upstreamUrl, new Headers({ accept: "application/json" }), false, timeoutMs); + const proc = Bun.spawn(["curl", ...args], { stdout: "pipe", stderr: "pipe" }); + const [stdout, stderr, exitCode] = await Promise.all([ + new Response(proc.stdout).arrayBuffer(), + new Response(proc.stderr).text(), + proc.exited, + ]); + if (exitCode !== 0) throw new Error(`kubernetes api request failed: ${stderr.slice(0, 4000)}`); + const parsed = parseCurlHeaderBody(Buffer.from(stdout)); + if (parsed.status < 200 || parsed.status >= 300) throw new Error(`kubernetes api request returned ${parsed.status}: ${parsed.bodyText.slice(0, 4000)}`); + const value = JSON.parse(parsed.bodyText) as unknown; + if (typeof value !== "object" || value === null || Array.isArray(value)) throw new Error("kubernetes api response must be a JSON object"); + return value as JsonRecord; +} + +function serviceEndpointCacheKey(service: ManagedService): string { + const { namespace, serviceName, servicePort } = nativeServiceRef(service); + return `${namespace}/${serviceName}:${servicePort}`; +} + +function serviceTunnelKey(endpoint: NativeServiceEndpoint): string { + return `${endpoint.namespace}/${endpoint.serviceName}:${endpoint.servicePort}@${endpoint.clusterIP}`; +} + +async function resolveNativeServiceEndpoint(service: ManagedService): Promise { + const cacheKey = serviceEndpointCacheKey(service); + const cached = nativeServiceEndpointCache.get(cacheKey); + if (cached !== undefined && cached.expiresAt > Date.now()) return cached.endpoint; + + const { namespace, serviceName, servicePort } = nativeServiceRef(service); + const serviceJson = await kubeApiJson(`/api/v1/namespaces/${encodeURIComponent(namespace)}/services/${encodeURIComponent(serviceName)}`, config.nativeServiceProbeTimeoutMs); + const spec = typeof serviceJson.spec === "object" && serviceJson.spec !== null ? serviceJson.spec as JsonRecord : {}; + const clusterIP = typeof spec.clusterIP === "string" ? spec.clusterIP : ""; + if (clusterIP.length === 0 || clusterIP === "None") throw new Error(`kubernetes service ${namespace}/${serviceName} has no clusterIP`); + const endpoint = { namespace, serviceName, servicePort, clusterIP }; + nativeServiceEndpointCache.set(cacheKey, { endpoint, expiresAt: Date.now() + config.nativeServiceResolutionTtlMs }); + return endpoint; +} + +async function allocateLocalPort(): Promise { + return await new Promise((resolve, reject) => { + const server = createServer(); + server.once("error", reject); + server.listen(0, "127.0.0.1", () => { + const address = server.address(); + server.close((error) => { + if (error !== undefined) { + reject(error); + return; + } + if (typeof address === "object" && address !== null) resolve(address.port); + else reject(new Error("failed to allocate local port")); + }); + }); + }); +} + +async function waitForLocalTcp(port: number, timeoutMs: number): Promise { + const deadline = Date.now() + timeoutMs; + let lastError: unknown = null; + while (Date.now() < deadline) { + try { + await new Promise((resolve, reject) => { + const socket = createConnection({ host: "127.0.0.1", port }); + socket.setTimeout(Math.min(500, Math.max(50, deadline - Date.now()))); + socket.once("connect", () => { + socket.destroy(); + resolve(); + }); + socket.once("timeout", () => { + socket.destroy(); + reject(new Error("tcp connect timeout")); + }); + socket.once("error", reject); + }); + return; + } catch (error) { + lastError = error; + await Bun.sleep(50); + } + } + throw lastError instanceof Error ? lastError : new Error(`local tcp port ${port} did not become reachable`); +} + +async function ensureNativeServiceTunnel(endpoint: NativeServiceEndpoint): Promise { + const key = serviceTunnelKey(endpoint); + const existing = nativeServiceTunnels.get(key); + if (existing !== undefined && !existing.closed) return existing; + + const sshHost = envString("K3SCTL_KUBE_API_SSH_HOST", "host.docker.internal"); + const sshUser = envString("K3SCTL_KUBE_API_SSH_USER", "ubuntu"); + const sshKey = envString("K3SCTL_KUBE_API_SSH_KEY", "/run/host-ssh/id_ed25519"); + const localPort = await allocateLocalPort(); + const forward = `127.0.0.1:${localPort}:${endpoint.clusterIP}:${endpoint.servicePort}`; + mkdirSync("/tmp/k3sctl-ssh", { recursive: true }); + const proc = Bun.spawn([ + "ssh", + "-i", sshKey, + "-o", "BatchMode=yes", + "-o", "StrictHostKeyChecking=no", + "-o", "UserKnownHostsFile=/tmp/k3sctl-ssh/known_hosts", + "-o", "ExitOnForwardFailure=yes", + "-o", "ServerAliveInterval=15", + "-o", "ServerAliveCountMax=3", + "-N", + "-L", forward, + `${sshUser}@${sshHost}`, + ], { stdout: "ignore", stderr: "ignore" }); + const tunnel: NativeServiceTunnel = { endpoint, localPort, proc, closed: false, startedAt: Date.now() }; + nativeServiceTunnels.set(key, tunnel); + proc.exited.then((exitCode) => { + tunnel.closed = true; + if (nativeServiceTunnels.get(key) === tunnel) nativeServiceTunnels.delete(key); + log("warn", "native_service_ssh_tunnel_exited", { key, exitCode, localPort, serviceName: endpoint.serviceName, clusterIP: endpoint.clusterIP }); + }).catch((error) => { + tunnel.closed = true; + if (nativeServiceTunnels.get(key) === tunnel) nativeServiceTunnels.delete(key); + log("warn", "native_service_ssh_tunnel_failed", { key, localPort, error: errorToJson(error) }); + }); + try { + await waitForLocalTcp(localPort, config.nativeServiceTunnelConnectTimeoutMs); + log("info", "native_service_ssh_tunnel_ready", { key, localPort, serviceName: endpoint.serviceName, servicePort: endpoint.servicePort, clusterIP: endpoint.clusterIP }); + return tunnel; + } catch (error) { + tunnel.closed = true; + proc.kill(); + nativeServiceTunnels.delete(key); + throw error; + } +} + +async function fetchNativeServiceUrl( + service: ManagedService, + upstreamUrl: URL, + req: Request, + headers: Headers, + bodyText: string, + timeoutMs: number, + extraHeaders: Record = {}, +): Promise { + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), timeoutMs); + const startedAt = Date.now(); + try { + const upstream = await fetch(upstreamUrl, { + method: req.method, + headers, + body: bodyText.length > 0 ? bodyText : undefined, + signal: controller.signal, + }); + const body = await boundedText(upstream, 8 * 1024 * 1024); + return new Response(body.text, { + status: upstream.status, + headers: { + "content-type": upstream.headers.get("content-type") ?? "application/octet-stream", + "x-unidesk-proxy-mode": "kubernetes-native-service", + "x-unidesk-k3s-service": service.id, + "x-unidesk-k3s-service-url": upstreamUrl.origin, + "x-unidesk-upstream-duration-ms": String(Date.now() - startedAt), + "x-unidesk-response-truncated": body.truncated ? "true" : "false", + ...extraHeaders, + }, + }); + } finally { + clearTimeout(timer); + } +} + +async function nativeServiceSshTunnelResponse( + service: ManagedService, + req: Request, + headers: Headers, + bodyText: string, + targetPath: string, + query: string, + timeoutMs: number, +): Promise { + if (!config.nativeServiceSshTunnelEnabled) throw new Error("native service SSH tunnel is disabled"); + const endpoint = await resolveNativeServiceEndpoint(service); + const tunnel = await ensureNativeServiceTunnel(endpoint); + const upstreamUrl = nativeServiceUrlFromBase(`http://127.0.0.1:${tunnel.localPort}`, targetPath, query); + return await fetchNativeServiceUrl(service, upstreamUrl, req, headers, bodyText, timeoutMs, { + "x-unidesk-k3s-native-transport": "ssh-local-forward", + "x-unidesk-k3s-service-name": endpoint.serviceName, + "x-unidesk-k3s-service-cluster-ip": endpoint.clusterIP, + "x-unidesk-k3s-service-port": String(endpoint.servicePort), + "x-unidesk-k3s-service-url": `http://${endpoint.serviceName}.${endpoint.namespace}.svc.cluster.local:${endpoint.servicePort}`, + "x-unidesk-k3s-service-tunnel-port": String(tunnel.localPort), + }); +} + function kubernetesEndpointServiceRef(service: ManagedService, endpoint: ManagedEndpoint): { namespace: string; serviceRef: string } { const base = new URL(endpoint.baseUrl); if (base.protocol !== "kubernetes:") throw new Error(`endpoint ${endpoint.id} must use kubernetes:// baseUrl`); @@ -511,36 +741,33 @@ async function nativeServiceProxyResponse( timeoutMs: number, ): Promise { if (!nativeServiceProxyUsable(service)) return null; - const upstreamUrl = nativeServiceUrl(service, targetPath, query); const headers = forwardHeaders(req); const bodyText = req.method === "GET" || req.method === "HEAD" ? "" : await req.text(); - const controller = new AbortController(); - const timer = setTimeout(() => controller.abort(), timeoutMs); - const startedAt = Date.now(); + const override = nativeServiceOverrideBase(service); try { - const upstream = await fetch(upstreamUrl, { - method: req.method, - headers, - body: bodyText.length > 0 ? bodyText : undefined, - signal: controller.signal, - }); - const body = await boundedText(upstream, 8 * 1024 * 1024); - return new Response(body.text, { - status: upstream.status, - headers: { - "content-type": upstream.headers.get("content-type") ?? "application/octet-stream", - "x-unidesk-proxy-mode": "kubernetes-native-service", - "x-unidesk-k3s-service": service.id, - "x-unidesk-k3s-service-url": upstreamUrl.origin, - "x-unidesk-upstream-duration-ms": String(Date.now() - startedAt), - "x-unidesk-response-truncated": body.truncated ? "true" : "false", - }, - }); + if (override !== null) { + return await fetchNativeServiceUrl(service, nativeServiceUrlFromBase(override, targetPath, query), req, headers, bodyText, timeoutMs, { + "x-unidesk-k3s-native-transport": "override-url", + }); + } + if (config.nativeServiceSshTunnelEnabled) { + return await nativeServiceSshTunnelResponse(service, req, headers, bodyText, targetPath, query, timeoutMs); + } + try { + return await fetchNativeServiceUrl(service, nativeServiceDnsUrl(service, targetPath, query), req, headers, bodyText, timeoutMs, { + "x-unidesk-k3s-native-transport": "service-dns", + }); + } catch (dnsError) { + log("warn", "native_service_dns_failed", { + serviceId: service.id, + namespace: service.namespace, + error: errorToJson(dnsError), + }); + return await nativeServiceSshTunnelResponse(service, req, headers, bodyText, targetPath, query, timeoutMs); + } } catch (error) { rememberNativeServiceFailure(service, error); return null; - } finally { - clearTimeout(timer); } } @@ -850,6 +1077,25 @@ async function controlPlaneSnapshot(): Promise { nativeServiceProxy: { enabled: config.nativeServiceProxyEnabled, mode: "kubernetes-native-service", + sshTunnelEnabled: config.nativeServiceSshTunnelEnabled, + resolutionTtlMs: config.nativeServiceResolutionTtlMs, + tunnelConnectTimeoutMs: config.nativeServiceTunnelConnectTimeoutMs, + endpointCache: Array.from(nativeServiceEndpointCache.entries()).map(([key, item]) => ({ + key, + serviceName: item.endpoint.serviceName, + servicePort: item.endpoint.servicePort, + clusterIP: item.endpoint.clusterIP, + expiresInMs: Math.max(0, item.expiresAt - Date.now()), + })), + tunnels: Array.from(nativeServiceTunnels.entries()).map(([key, tunnel]) => ({ + key, + localPort: tunnel.localPort, + serviceName: tunnel.endpoint.serviceName, + servicePort: tunnel.endpoint.servicePort, + clusterIP: tunnel.endpoint.clusterIP, + closed: tunnel.closed, + ageMs: Math.max(0, Date.now() - tunnel.startedAt), + })), failureCooldownMs: config.nativeServiceFailureCooldownMs, failedServices: Array.from(nativeServiceFailures.entries()).map(([key, failedAt]) => ({ key,