feat: tunnel k3s native service dataplane

Resolve the k3s Service ClusterIP through the control API, then reuse an SSH local forward for adapter data-plane requests so that UI polling no longer depends on the Kubernetes API service proxy.
This commit is contained in:
Codex
2026-05-16 18:03:04 +00:00
parent 8ab0762ed6
commit ebe506bdd0
2 changed files with 280 additions and 31 deletions
@@ -32,8 +32,11 @@ services:
K3SCTL_KUBE_API_REMOTE_HOST: "${K3SCTL_KUBE_API_REMOTE_HOST:-127.0.0.1}"
K3SCTL_KUBE_API_REMOTE_PORT: "${K3SCTL_KUBE_API_REMOTE_PORT:-6443}"
K3SCTL_NATIVE_SERVICE_PROXY_ENABLED: "${K3SCTL_NATIVE_SERVICE_PROXY_ENABLED:-true}"
K3SCTL_NATIVE_SERVICE_SSH_TUNNEL_ENABLED: "${K3SCTL_NATIVE_SERVICE_SSH_TUNNEL_ENABLED:-true}"
K3SCTL_NATIVE_SERVICE_PROBE_TIMEOUT_MS: "${K3SCTL_NATIVE_SERVICE_PROBE_TIMEOUT_MS:-1200}"
K3SCTL_NATIVE_SERVICE_FAILURE_COOLDOWN_MS: "${K3SCTL_NATIVE_SERVICE_FAILURE_COOLDOWN_MS:-10000}"
K3SCTL_NATIVE_SERVICE_RESOLUTION_TTL_MS: "${K3SCTL_NATIVE_SERVICE_RESOLUTION_TTL_MS:-30000}"
K3SCTL_NATIVE_SERVICE_TUNNEL_CONNECT_TIMEOUT_MS: "${K3SCTL_NATIVE_SERVICE_TUNNEL_CONNECT_TIMEOUT_MS:-3000}"
K3SCTL_NATIVE_SERVICE_URL_CODE_QUEUE: "${K3SCTL_NATIVE_SERVICE_URL_CODE_QUEUE:-}"
K3SCTL_NATIVE_SERVICE_URL_MDTODO: "${K3SCTL_NATIVE_SERVICE_URL_MDTODO:-}"
K3SCTL_MANIFEST_PATHS: "${K3SCTL_MANIFEST_PATHS:-k3s/code-queue.k3s.json,k3s/mdtodo.k3s.json}"
@@ -1,5 +1,6 @@
import { createHourlyJsonlWriter, logRetentionBytesForService } from "../../../shared/src/rotating-jsonl";
import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs";
import { createConnection, createServer } from "node:net";
import { dirname, isAbsolute, join } from "node:path";
import { tmpdir } from "node:os";
import { fileURLToPath } from "node:url";
@@ -45,8 +46,11 @@ interface RuntimeConfig {
kubeconfigPath: string;
kubeApiConnectHost: string;
nativeServiceProxyEnabled: boolean;
nativeServiceSshTunnelEnabled: boolean;
nativeServiceProbeTimeoutMs: number;
nativeServiceFailureCooldownMs: number;
nativeServiceResolutionTtlMs: number;
nativeServiceTunnelConnectTimeoutMs: number;
requestTimeoutMs: number;
healthTimeoutMs: number;
services: ManagedService[];
@@ -60,6 +64,21 @@ interface KubeApiClient {
keyFile: string;
}
/** Resolved coordinates of a Kubernetes Service used as the target of native data-plane requests. */
interface NativeServiceEndpoint {
/** Namespace of the Service. */
namespace: string;
/** Name of the Service object. */
serviceName: string;
/** Service port that requests are forwarded to. */
servicePort: number;
/** ClusterIP read from the Service's `spec.clusterIP`. */
clusterIP: string;
}
/** Bookkeeping for one `ssh -L` local forward that points at a Service ClusterIP. */
interface NativeServiceTunnel {
/** Service endpoint the forward targets. */
endpoint: NativeServiceEndpoint;
/** Port on 127.0.0.1 that the ssh process listens on. */
localPort: number;
/** Handle of the spawned ssh process. */
proc: ReturnType<typeof Bun.spawn>;
/** Set to true once the ssh process has exited or the tunnel failed to start. */
closed: boolean;
/** `Date.now()` value (milliseconds) captured when the tunnel record was created. */
startedAt: number;
}
// In-memory list of log records — presumably a buffer of recently emitted entries; confirm against log().
const recentLogs: JsonRecord[] = [];
// ISO-8601 timestamp captured when this module is loaded.
const startedAt = new Date().toISOString();
// Parent directory of the directory that contains this module file.
const adapterRoot = join(dirname(fileURLToPath(import.meta.url)), "..");
@@ -73,6 +92,8 @@ const logWriter = config.logFile
: null;
// Kubernetes API client settings; kubeApiJson() treats a null value as "not configured".
const kubeClient = loadKubeApiClient();
// Keyed by nativeServiceFailureKey(); the number is read back as `failedAt` — presumably a Date.now() timestamp, confirm against rememberNativeServiceFailure().
const nativeServiceFailures = new Map<string, number>();
// Resolved Service endpoints keyed by serviceEndpointCacheKey(); an entry is reused while `expiresAt` is still in the future.
const nativeServiceEndpointCache = new Map<string, { endpoint: NativeServiceEndpoint; expiresAt: number }>();
// SSH local forwards keyed by serviceTunnelKey().
const nativeServiceTunnels = new Map<string, NativeServiceTunnel>();
logWriter?.prune();
function envString(name: string, fallback: string): string {
@@ -269,8 +290,11 @@ function readConfig(): RuntimeConfig {
kubeconfigPath: envString("K3SCTL_KUBECONFIG_PATH", "/var/lib/unidesk/k3s/kubeconfig"),
kubeApiConnectHost: envString("K3SCTL_KUBE_API_CONNECT_HOST", "host.docker.internal"),
nativeServiceProxyEnabled: envBool("K3SCTL_NATIVE_SERVICE_PROXY_ENABLED", true),
nativeServiceSshTunnelEnabled: envBool("K3SCTL_NATIVE_SERVICE_SSH_TUNNEL_ENABLED", envBool("K3SCTL_KUBE_API_SSH_TUNNEL_ENABLED", false)),
nativeServiceProbeTimeoutMs: Math.max(250, Math.min(5000, envNumber("K3SCTL_NATIVE_SERVICE_PROBE_TIMEOUT_MS", 1200))),
nativeServiceFailureCooldownMs: Math.max(1000, Math.min(60_000, envNumber("K3SCTL_NATIVE_SERVICE_FAILURE_COOLDOWN_MS", 10_000))),
nativeServiceResolutionTtlMs: Math.max(1000, Math.min(300_000, envNumber("K3SCTL_NATIVE_SERVICE_RESOLUTION_TTL_MS", 30_000))),
nativeServiceTunnelConnectTimeoutMs: Math.max(250, Math.min(10_000, envNumber("K3SCTL_NATIVE_SERVICE_TUNNEL_CONNECT_TIMEOUT_MS", 3000))),
requestTimeoutMs: Math.max(1000, Math.min(120_000, envNumber("K3SCTL_REQUEST_TIMEOUT_MS", 30_000))),
healthTimeoutMs: Math.max(500, Math.min(30_000, envNumber("K3SCTL_HEALTH_TIMEOUT_MS", 2500))),
services: mergeServices([...manifestServices, ...inlineServices]),
@@ -398,9 +422,16 @@ function isKubernetesServiceRoute(service: ManagedService): boolean {
return String(service.route.kind ?? "") === "kubernetes-service";
}
/**
 * Collect the Kubernetes Service coordinates of a managed service.
 *
 * The namespace comes straight from the service; name and port are read from the
 * route via `routeString` / `routeNumber`, with the service id and 80 passed as
 * their fallback arguments.
 */
function nativeServiceRef(service: ManagedService): { namespace: string; serviceName: string; servicePort: number } {
  const namespace = service.namespace;
  const serviceName = routeString(service, "serviceName", service.id);
  const servicePort = routeNumber(service, "servicePort", 80);
  return { namespace, serviceName, servicePort };
}
/**
 * Build the Kubernetes API path that proxies a request to a Service:
 * `/api/v1/namespaces/<ns>/services/<name>:<port>/proxy<targetPath>`.
 *
 * @param service Managed service whose route names the Service and port.
 * @param targetPath Path on the Service; a leading `/` is added when missing.
 * @returns The API-server-relative proxy path.
 */
function serviceProxyApiPath(service: ManagedService, targetPath: string): string {
  // Name and port are declared exactly once, via the shared helper, so every
  // caller agrees on the route defaults.
  const { serviceName, servicePort } = nativeServiceRef(service);
  const safeTargetPath = targetPath.startsWith("/") ? targetPath : `/${targetPath}`;
  return `/api/v1/namespaces/${encodeURIComponent(service.namespace)}/services/${encodeURIComponent(`${serviceName}:${servicePort}`)}/proxy${safeTargetPath}`;
}
@@ -415,17 +446,22 @@ function nativeServiceFailureKey(service: ManagedService): string {
return `${service.namespace}/${service.id}`;
}
function nativeServiceUrl(service: ManagedService, targetPath: string, query = ""): URL {
const serviceName = routeString(service, "serviceName", service.id);
const servicePort = routeNumber(service, "servicePort", 80);
/**
 * Look up the per-service base-URL override.
 *
 * The variable name is `K3SCTL_NATIVE_SERVICE_URL_` followed by the upper-cased
 * service id with every character outside `A-Z0-9` replaced by `_`.
 */
function nativeServiceOverrideBase(service: ManagedService): string | null {
  const envSuffix = service.id.toUpperCase().replace(/[^A-Z0-9]/gu, "_");
  return envOptionalString(`K3SCTL_NATIVE_SERVICE_URL_${envSuffix}`);
}
/**
 * Join a base URL, a target path and a query string into an upstream URL.
 *
 * @param base Absolute base URL; a run of trailing slashes is collapsed to one.
 * @param targetPath Path on the upstream; a leading `/` is added when missing.
 * @param query Query string assigned to `url.search` (empty string clears it).
 * @returns The resolved upstream URL.
 * @throws TypeError when `base` is not a valid absolute URL.
 */
function nativeServiceUrlFromBase(base: string, targetPath: string, query = ""): URL {
  const safeTargetPath = targetPath.startsWith("/") ? targetPath : `/${targetPath}`;
  // The target path is always absolute, so it replaces any path component of
  // `base` instead of being appended to it.
  const url = new URL(safeTargetPath, base.replace(/\/+$/u, "/"));
  url.search = query;
  return url;
}
/**
 * Build the upstream URL that addresses a service by its in-cluster DNS name,
 * `http://<serviceName>.<namespace>.svc.cluster.local:<servicePort>`.
 */
function nativeServiceDnsUrl(service: ManagedService, targetPath: string, query = ""): URL {
  const ref = nativeServiceRef(service);
  const clusterDnsBase = `http://${ref.serviceName}.${ref.namespace}.svc.cluster.local:${ref.servicePort}`;
  return nativeServiceUrlFromBase(clusterDnsBase, targetPath, query);
}
function nativeServiceProxyUsable(service: ManagedService): boolean {
if (!config.nativeServiceProxyEnabled) return false;
const failedAt = nativeServiceFailures.get(nativeServiceFailureKey(service));
@@ -442,6 +478,200 @@ function rememberNativeServiceFailure(service: ManagedService, error: unknown):
});
}
/**
 * GET a Kubernetes API path through `curl` and return the body as a JSON object.
 *
 * @param apiPath Path resolved against the configured API server URL.
 * @param timeoutMs Timeout handed to `kubeProxyCurlArgs`.
 * @throws Error when no client is configured, curl exits non-zero, the HTTP
 *   status is outside 2xx, or the body is not a JSON object.
 */
async function kubeApiJson(apiPath: string, timeoutMs: number): Promise<JsonRecord> {
  if (kubeClient === null) throw new Error("kubernetes api client is not configured");

  const target = new URL(apiPath, kubeClient.serverUrl);
  const requestHeaders = new Headers({ accept: "application/json" });
  const curlArgs = kubeProxyCurlArgs(kubeClient, "GET", target, requestHeaders, false, timeoutMs);
  const curl = Bun.spawn(["curl", ...curlArgs], { stdout: "pipe", stderr: "pipe" });

  // Drain both pipes while waiting for the exit code.
  const [rawOutput, errorOutput, exitCode] = await Promise.all([
    new Response(curl.stdout).arrayBuffer(),
    new Response(curl.stderr).text(),
    curl.exited,
  ]);
  if (exitCode !== 0) {
    throw new Error(`kubernetes api request failed: ${errorOutput.slice(0, 4000)}`);
  }

  const reply = parseCurlHeaderBody(Buffer.from(rawOutput));
  if (reply.status < 200 || reply.status >= 300) {
    throw new Error(`kubernetes api request returned ${reply.status}: ${reply.bodyText.slice(0, 4000)}`);
  }

  const payload = JSON.parse(reply.bodyText) as unknown;
  const isPlainObject = typeof payload === "object" && payload !== null && !Array.isArray(payload);
  if (!isPlainObject) throw new Error("kubernetes api response must be a JSON object");
  return payload as JsonRecord;
}
/** Cache key for a service's resolved endpoint: `<namespace>/<serviceName>:<servicePort>`. */
function serviceEndpointCacheKey(service: ManagedService): string {
  const ref = nativeServiceRef(service);
  return `${ref.namespace}/${ref.serviceName}:${ref.servicePort}`;
}
/** Tunnel map key: `<namespace>/<serviceName>:<servicePort>@<clusterIP>`. */
function serviceTunnelKey(endpoint: NativeServiceEndpoint): string {
  const { namespace, serviceName, servicePort, clusterIP } = endpoint;
  return `${namespace}/${serviceName}:${servicePort}@${clusterIP}`;
}
/**
 * Resolve a managed service to its Service ClusterIP, using the endpoint cache
 * while an entry has not expired.
 *
 * @throws Error when the Service has no ClusterIP (missing, empty or `"None"`),
 *   or when the Kubernetes API lookup fails.
 */
async function resolveNativeServiceEndpoint(service: ManagedService): Promise<NativeServiceEndpoint> {
  const cacheKey = serviceEndpointCacheKey(service);
  const hit = nativeServiceEndpointCache.get(cacheKey);
  if (hit !== undefined && hit.expiresAt > Date.now()) return hit.endpoint;

  const { namespace, serviceName, servicePort } = nativeServiceRef(service);
  const apiPath = `/api/v1/namespaces/${encodeURIComponent(namespace)}/services/${encodeURIComponent(serviceName)}`;
  const serviceObject = await kubeApiJson(apiPath, config.nativeServiceProbeTimeoutMs);

  // Tolerate a missing or non-object spec by treating it as empty.
  const spec = typeof serviceObject.spec === "object" && serviceObject.spec !== null ? serviceObject.spec as JsonRecord : {};
  const clusterIP = typeof spec.clusterIP === "string" ? spec.clusterIP : "";
  if (clusterIP.length === 0 || clusterIP === "None") {
    throw new Error(`kubernetes service ${namespace}/${serviceName} has no clusterIP`);
  }

  const endpoint = { namespace, serviceName, servicePort, clusterIP };
  const expiresAt = Date.now() + config.nativeServiceResolutionTtlMs;
  nativeServiceEndpointCache.set(cacheKey, { endpoint, expiresAt });
  return endpoint;
}
/**
 * Ask the OS for a free TCP port on 127.0.0.1 by binding port 0 on a throwaway
 * server, closing it, and returning the port it was given.
 */
async function allocateLocalPort(): Promise<number> {
  return await new Promise<number>((resolve, reject) => {
    const probe = createServer();
    probe.once("error", reject);
    probe.listen(0, "127.0.0.1", () => {
      // Read the bound address before closing the server.
      const bound = probe.address();
      probe.close((closeError) => {
        if (closeError !== undefined) {
          reject(closeError);
        } else if (typeof bound === "object" && bound !== null) {
          resolve(bound.port);
        } else {
          reject(new Error("failed to allocate local port"));
        }
      });
    });
  });
}
/**
 * Poll 127.0.0.1:`port` until a TCP connection succeeds or `timeoutMs` elapses.
 *
 * Each attempt has its own socket timeout (50–500 ms, bounded by the time left)
 * and failed attempts are followed by a 50 ms pause.
 *
 * @throws The last connection error, or a generic error when no attempt ran.
 */
async function waitForLocalTcp(port: number, timeoutMs: number): Promise<void> {
  const deadline = Date.now() + timeoutMs;

  // One connect attempt; resolves on connect, rejects on timeout or socket error.
  const attemptConnect = (): Promise<void> =>
    new Promise<void>((resolve, reject) => {
      const socket = createConnection({ host: "127.0.0.1", port });
      socket.setTimeout(Math.min(500, Math.max(50, deadline - Date.now())));
      socket.once("connect", () => {
        socket.destroy();
        resolve();
      });
      socket.once("timeout", () => {
        socket.destroy();
        reject(new Error("tcp connect timeout"));
      });
      socket.once("error", reject);
    });

  let lastFailure: unknown = null;
  while (Date.now() < deadline) {
    try {
      await attemptConnect();
      return;
    } catch (failure) {
      lastFailure = failure;
      await Bun.sleep(50);
    }
  }

  if (lastFailure instanceof Error) throw lastFailure;
  throw new Error(`local tcp port ${port} did not become reachable`);
}
/** Tunnel start-ups that are still in progress, keyed by serviceTunnelKey(), so concurrent callers share one ssh process. */
const nativeServiceTunnelStarts = new Map<string, Promise<NativeServiceTunnel>>();

/**
 * Spawn an `ssh -N -L` local forward to the endpoint's ClusterIP and wait until
 * its local port accepts connections.
 *
 * The tunnel is registered in `nativeServiceTunnels` as soon as ssh is spawned
 * and removed again when ssh exits or the port never becomes reachable.
 */
async function startNativeServiceTunnel(key: string, endpoint: NativeServiceEndpoint): Promise<NativeServiceTunnel> {
  const sshHost = envString("K3SCTL_KUBE_API_SSH_HOST", "host.docker.internal");
  const sshUser = envString("K3SCTL_KUBE_API_SSH_USER", "ubuntu");
  const sshKey = envString("K3SCTL_KUBE_API_SSH_KEY", "/run/host-ssh/id_ed25519");
  const localPort = await allocateLocalPort();
  const forward = `127.0.0.1:${localPort}:${endpoint.clusterIP}:${endpoint.servicePort}`;
  mkdirSync("/tmp/k3sctl-ssh", { recursive: true });
  // NOTE(review): StrictHostKeyChecking=no disables host key verification for the SSH host — confirm this is acceptable.
  const proc = Bun.spawn([
    "ssh",
    "-i", sshKey,
    "-o", "BatchMode=yes",
    "-o", "StrictHostKeyChecking=no",
    "-o", "UserKnownHostsFile=/tmp/k3sctl-ssh/known_hosts",
    "-o", "ExitOnForwardFailure=yes",
    "-o", "ServerAliveInterval=15",
    "-o", "ServerAliveCountMax=3",
    "-N",
    "-L", forward,
    `${sshUser}@${sshHost}`,
  ], { stdout: "ignore", stderr: "ignore" });
  const tunnel: NativeServiceTunnel = { endpoint, localPort, proc, closed: false, startedAt: Date.now() };
  nativeServiceTunnels.set(key, tunnel);

  // Mark the tunnel dead and unregister it, but only if the map still points at
  // this tunnel, so a newer tunnel stored under the same key is left alone.
  const retire = (): void => {
    tunnel.closed = true;
    if (nativeServiceTunnels.get(key) === tunnel) nativeServiceTunnels.delete(key);
  };

  proc.exited.then((exitCode) => {
    retire();
    log("warn", "native_service_ssh_tunnel_exited", { key, exitCode, localPort, serviceName: endpoint.serviceName, clusterIP: endpoint.clusterIP });
  }).catch((error) => {
    retire();
    log("warn", "native_service_ssh_tunnel_failed", { key, localPort, error: errorToJson(error) });
  });

  try {
    await waitForLocalTcp(localPort, config.nativeServiceTunnelConnectTimeoutMs);
    log("info", "native_service_ssh_tunnel_ready", { key, localPort, serviceName: endpoint.serviceName, servicePort: endpoint.servicePort, clusterIP: endpoint.clusterIP });
    return tunnel;
  } catch (error) {
    retire();
    proc.kill();
    throw error;
  }
}

/**
 * Return a ready SSH local forward for the endpoint, starting one when needed.
 *
 * Concurrent callers for the same endpoint share a single start-up: they all
 * wait for the same ssh process to become reachable instead of each spawning
 * their own or receiving a tunnel whose local port is not listening yet.
 *
 * @param endpoint Resolved Service endpoint to forward to.
 * @returns A tunnel whose local port has accepted a TCP connection.
 * @throws The start-up error when ssh never becomes reachable within
 *   `config.nativeServiceTunnelConnectTimeoutMs`.
 */
async function ensureNativeServiceTunnel(endpoint: NativeServiceEndpoint): Promise<NativeServiceTunnel> {
  const key = serviceTunnelKey(endpoint);

  // A start-up in progress wins over the map entry: the tunnel is registered
  // before its port is reachable, so the entry alone does not mean "ready".
  const starting = nativeServiceTunnelStarts.get(key);
  if (starting !== undefined) return await starting;

  const existing = nativeServiceTunnels.get(key);
  if (existing !== undefined && !existing.closed) return existing;

  // No await between the checks above and this registration, so exactly one
  // caller per key becomes the starter.
  const start = startNativeServiceTunnel(key, endpoint);
  nativeServiceTunnelStarts.set(key, start);
  try {
    return await start;
  } finally {
    if (nativeServiceTunnelStarts.get(key) === start) nativeServiceTunnelStarts.delete(key);
  }
}
/**
 * Forward a request to `upstreamUrl` and wrap the upstream reply in a new
 * Response that carries the proxy's diagnostic headers.
 *
 * @param service Managed service the request belongs to (its id is reported in a header).
 * @param upstreamUrl Fully resolved upstream URL.
 * @param req Incoming request; only its method is used here.
 * @param headers Headers to send upstream.
 * @param bodyText Request body; an empty string sends no body.
 * @param timeoutMs Abort the upstream request after this many milliseconds.
 * @param extraHeaders Additional response headers; they override the defaults on key collision.
 * @returns The upstream status and content type with the body text read through `boundedText`.
 * @throws Whatever `fetch` / `boundedText` throw, including the abort error on timeout.
 */
async function fetchNativeServiceUrl(
  service: ManagedService,
  upstreamUrl: URL,
  req: Request,
  headers: Headers,
  bodyText: string,
  timeoutMs: number,
  extraHeaders: Record<string, string> = {},
): Promise<Response> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  const startedAt = Date.now();
  try {
    const upstream = await fetch(upstreamUrl, {
      method: req.method,
      headers,
      body: bodyText.length > 0 ? bodyText : undefined,
      signal: controller.signal,
    });
    // Presumably caps the body at 8 MiB and reports whether it was cut — confirm against boundedText().
    const body = await boundedText(upstream, 8 * 1024 * 1024);
    // The Response constructor rejects a non-null body (even "") for null-body
    // statuses, so a 204/205/304 from upstream must be relayed with a null body.
    const isNullBodyStatus = upstream.status === 204 || upstream.status === 205 || upstream.status === 304;
    return new Response(isNullBodyStatus ? null : body.text, {
      status: upstream.status,
      headers: {
        "content-type": upstream.headers.get("content-type") ?? "application/octet-stream",
        "x-unidesk-proxy-mode": "kubernetes-native-service",
        "x-unidesk-k3s-service": service.id,
        "x-unidesk-k3s-service-url": upstreamUrl.origin,
        "x-unidesk-upstream-duration-ms": String(Date.now() - startedAt),
        "x-unidesk-response-truncated": body.truncated ? "true" : "false",
        ...extraHeaders,
      },
    });
  } finally {
    clearTimeout(timer);
  }
}
/**
 * Serve a request through an SSH local forward to the service's ClusterIP.
 *
 * Resolves the Service endpoint, makes sure a tunnel exists for it, and sends
 * the request to the tunnel's local port. The response headers describe the
 * transport, the Service coordinates and the tunnel port.
 *
 * @throws Error when the SSH tunnel transport is disabled, or when resolution,
 *   tunnel start-up or the upstream request fails.
 */
async function nativeServiceSshTunnelResponse(
  service: ManagedService,
  req: Request,
  headers: Headers,
  bodyText: string,
  targetPath: string,
  query: string,
  timeoutMs: number,
): Promise<Response> {
  if (!config.nativeServiceSshTunnelEnabled) {
    throw new Error("native service SSH tunnel is disabled");
  }

  const endpoint = await resolveNativeServiceEndpoint(service);
  const { localPort } = await ensureNativeServiceTunnel(endpoint);
  const tunnelUrl = nativeServiceUrlFromBase(`http://127.0.0.1:${localPort}`, targetPath, query);

  const { namespace, serviceName, servicePort, clusterIP } = endpoint;
  const transportHeaders: Record<string, string> = {
    "x-unidesk-k3s-native-transport": "ssh-local-forward",
    "x-unidesk-k3s-service-name": serviceName,
    "x-unidesk-k3s-service-cluster-ip": clusterIP,
    "x-unidesk-k3s-service-port": String(servicePort),
    // Report the in-cluster DNS URL instead of the loopback tunnel origin.
    "x-unidesk-k3s-service-url": `http://${serviceName}.${namespace}.svc.cluster.local:${servicePort}`,
    "x-unidesk-k3s-service-tunnel-port": String(localPort),
  };
  return await fetchNativeServiceUrl(service, tunnelUrl, req, headers, bodyText, timeoutMs, transportHeaders);
}
function kubernetesEndpointServiceRef(service: ManagedService, endpoint: ManagedEndpoint): { namespace: string; serviceRef: string } {
const base = new URL(endpoint.baseUrl);
if (base.protocol !== "kubernetes:") throw new Error(`endpoint ${endpoint.id} must use kubernetes:// baseUrl`);
@@ -511,36 +741,33 @@ async function nativeServiceProxyResponse(
timeoutMs: number,
): Promise<Response | null> {
if (!nativeServiceProxyUsable(service)) return null;
const upstreamUrl = nativeServiceUrl(service, targetPath, query);
const headers = forwardHeaders(req);
const bodyText = req.method === "GET" || req.method === "HEAD" ? "" : await req.text();
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), timeoutMs);
const startedAt = Date.now();
const override = nativeServiceOverrideBase(service);
try {
const upstream = await fetch(upstreamUrl, {
method: req.method,
headers,
body: bodyText.length > 0 ? bodyText : undefined,
signal: controller.signal,
});
const body = await boundedText(upstream, 8 * 1024 * 1024);
return new Response(body.text, {
status: upstream.status,
headers: {
"content-type": upstream.headers.get("content-type") ?? "application/octet-stream",
"x-unidesk-proxy-mode": "kubernetes-native-service",
"x-unidesk-k3s-service": service.id,
"x-unidesk-k3s-service-url": upstreamUrl.origin,
"x-unidesk-upstream-duration-ms": String(Date.now() - startedAt),
"x-unidesk-response-truncated": body.truncated ? "true" : "false",
},
});
if (override !== null) {
return await fetchNativeServiceUrl(service, nativeServiceUrlFromBase(override, targetPath, query), req, headers, bodyText, timeoutMs, {
"x-unidesk-k3s-native-transport": "override-url",
});
}
if (config.nativeServiceSshTunnelEnabled) {
return await nativeServiceSshTunnelResponse(service, req, headers, bodyText, targetPath, query, timeoutMs);
}
try {
return await fetchNativeServiceUrl(service, nativeServiceDnsUrl(service, targetPath, query), req, headers, bodyText, timeoutMs, {
"x-unidesk-k3s-native-transport": "service-dns",
});
} catch (dnsError) {
log("warn", "native_service_dns_failed", {
serviceId: service.id,
namespace: service.namespace,
error: errorToJson(dnsError),
});
return await nativeServiceSshTunnelResponse(service, req, headers, bodyText, targetPath, query, timeoutMs);
}
} catch (error) {
rememberNativeServiceFailure(service, error);
return null;
} finally {
clearTimeout(timer);
}
}
@@ -850,6 +1077,25 @@ async function controlPlaneSnapshot(): Promise<JsonRecord> {
nativeServiceProxy: {
enabled: config.nativeServiceProxyEnabled,
mode: "kubernetes-native-service",
sshTunnelEnabled: config.nativeServiceSshTunnelEnabled,
resolutionTtlMs: config.nativeServiceResolutionTtlMs,
tunnelConnectTimeoutMs: config.nativeServiceTunnelConnectTimeoutMs,
endpointCache: Array.from(nativeServiceEndpointCache.entries()).map(([key, item]) => ({
key,
serviceName: item.endpoint.serviceName,
servicePort: item.endpoint.servicePort,
clusterIP: item.endpoint.clusterIP,
expiresInMs: Math.max(0, item.expiresAt - Date.now()),
})),
tunnels: Array.from(nativeServiceTunnels.entries()).map(([key, tunnel]) => ({
key,
localPort: tunnel.localPort,
serviceName: tunnel.endpoint.serviceName,
servicePort: tunnel.endpoint.servicePort,
clusterIP: tunnel.endpoint.clusterIP,
closed: tunnel.closed,
ageMs: Math.max(0, Date.now() - tunnel.startedAt),
})),
failureCooldownMs: config.nativeServiceFailureCooldownMs,
failedServices: Array.from(nativeServiceFailures.entries()).map(([key, failedAt]) => ({
key,