Files
pikasTech-unidesk/src/components/microservices/k3sctl-adapter/src/index.ts
T
2026-05-17 06:55:04 +00:00

1302 lines
55 KiB
TypeScript

import { createHourlyJsonlWriter, logRetentionBytesForService } from "../../../shared/src/rotating-jsonl";
import { chmodSync, existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs";
import { createConnection, createServer } from "node:net";
import { dirname, isAbsolute, join } from "node:path";
import { tmpdir } from "node:os";
import { fileURLToPath } from "node:url";
// Any value that can be represented in JSON (recursive definition).
type JsonValue = string | number | boolean | null | JsonValue[] | { [key: string]: JsonValue };
// A JSON object: string keys mapped to JSON values.
type JsonRecord = Record<string, JsonValue>;
// Role label of a service instance; normalizeRole maps any unrecognised string to "worker".
type InstanceRole = "primary" | "standby" | "worker";
// Health-check mode label of an endpoint; normalizeHealthMode maps any unrecognised string to "service-proxy".
type EndpointHealthMode = "service-proxy" | "pod-ready";
// One instance of a managed service, as produced by parseEndpoint from a manifest `spec.instances` entry.
interface ManagedEndpoint {
// Required, non-empty instance identifier.
id: string;
// Node the instance runs on; parseEndpoint falls back to `id` when the manifest omits it.
nodeId: string;
// Instance role; parseEndpoint defaults it to "primary" for id "D601" and "standby" otherwise.
role: InstanceRole;
// Base URL with trailing slashes removed. kubernetesEndpointServiceRef requires the
// form kubernetes://<namespace>/services/<service>:<port>.
baseUrl: string;
// Health-check path; defaults to "/health".
healthPath: string;
// Health-check mode; defaults to "service-proxy".
healthMode: EndpointHealthMode;
}
// A service parsed from a ManagedKubernetesService manifest (see parseManagedKubernetesManifest).
interface ManagedService {
// Service identifier, taken from the manifest's `metadata.name`.
id: string;
// Kubernetes namespace: `metadata.namespace`, else `spec.namespace`, else "unidesk".
namespace: string;
// Manifest kind; the parser only accepts "ManagedKubernetesService".
kind: string;
// Free-form JSON object copied from `spec.controlPlane` (empty object when absent).
controlPlane: JsonRecord;
// JSON object copied from `spec.route`; its `kind` must be "kubernetes-service".
// `serviceName` and `servicePort` are read from it by nativeServiceRef.
route: JsonRecord;
// Id of the active instance; the parser guarantees it matches one entry of `endpoints`.
activeInstanceId: string;
// From `spec.singleWriter`; defaults to true.
singleWriter: boolean;
// From `spec.requireAllInstancesHealthy`; defaults to false.
requireAllInstancesHealthy: boolean;
// From `spec.expectedNodeIds`; defaults to the nodeId of every endpoint.
expectedNodeIds: string[];
// Parsed instances; the parser rejects an empty list.
endpoints: ManagedEndpoint[];
}
// Process-wide configuration assembled once by readConfig from environment variables and manifests.
interface RuntimeConfig {
// HOST (default "0.0.0.0").
host: string;
// PORT (default 4266).
port: number;
// LOG_FILE; base path handed to the hourly JSONL writer.
logFile: string;
// Comma-separated K3SCTL_MANIFEST_PATHS, split and trimmed.
manifestPaths: string[];
// K3SCTL_CLUSTER_ID (default "unidesk-k3s").
clusterId: string;
// K3SCTL_NODE_ID (default "D601").
nodeId: string;
// K3SCTL_KUBECTL_ENABLED (default false).
kubectlEnabled: boolean;
// K3SCTL_KUBECTL_CONTEXT (default empty string).
kubectlContext: string;
// K3SCTL_KUBE_API_PROXY_ENABLED (default true); when false loadKubeApiClient returns null.
kubeApiProxyEnabled: boolean;
// K3SCTL_KUBECONFIG_PATH; the kubeconfig read by loadKubeApiClient.
kubeconfigPath: string;
// K3SCTL_KUBE_API_CONNECT_HOST; used for curl --connect-to when the API server URL is loopback.
kubeApiConnectHost: string;
// K3SCTL_NATIVE_SERVICE_PROXY_ENABLED (default true).
nativeServiceProxyEnabled: boolean;
// K3SCTL_NATIVE_SERVICE_SSH_TUNNEL_ENABLED, falling back to K3SCTL_KUBE_API_SSH_TUNNEL_ENABLED (default false).
nativeServiceSshTunnelEnabled: boolean;
// Timeout for the Kubernetes Service lookup, clamped to 250..5000 ms.
nativeServiceProbeTimeoutMs: number;
// How long a service stays excluded from native proxying after a failure, clamped to 1000..60000 ms.
nativeServiceFailureCooldownMs: number;
// Lifetime of a cached Service resolution, clamped to 1000..300000 ms.
nativeServiceResolutionTtlMs: number;
// How long to wait for an SSH tunnel's local port to accept connections, clamped to 250..10000 ms.
nativeServiceTunnelConnectTimeoutMs: number;
// K3SCTL_REQUEST_TIMEOUT_MS, clamped to 1000..120000 ms.
requestTimeoutMs: number;
// K3SCTL_HEALTH_TIMEOUT_MS, clamped to 500..30000 ms.
healthTimeoutMs: number;
// Manifest-file services followed by inline K3SCTL_SERVICES_JSON services, with duplicates rejected.
services: ManagedService[];
}
// Connection material for the Kubernetes API server, built by loadKubeApiClient from the kubeconfig.
interface KubeApiClient {
// API server URL taken from the kubeconfig `server` entry.
serverUrl: URL;
// Host substituted via curl --connect-to when serverUrl points at 127.0.0.1 or localhost.
connectHost: string;
// Path of the file holding the decoded `certificate-authority-data`.
caFile: string;
// Path of the file holding the decoded `client-certificate-data`.
certFile: string;
// Path of the file holding the decoded `client-key-data`.
keyFile: string;
}
// A Kubernetes Service resolved to its cluster IP (see resolveNativeServiceEndpoint).
interface NativeServiceEndpoint {
// Namespace of the Service.
namespace: string;
// Name of the Service.
serviceName: string;
// Port of the Service that requests are forwarded to.
servicePort: number;
// `spec.clusterIP` reported by the API server; never empty and never "None".
clusterIP: string;
}
// Book-keeping for one `ssh -L` local forward created by ensureNativeServiceTunnel.
interface NativeServiceTunnel {
// The Service the tunnel forwards to.
endpoint: NativeServiceEndpoint;
// Local 127.0.0.1 port the forward listens on.
localPort: number;
// The spawned ssh process.
proc: ReturnType<typeof Bun.spawn>;
// Set to true once the ssh process exited or the tunnel failed to become reachable.
closed: boolean;
// Date.now() timestamp taken when the tunnel record was created.
startedAt: number;
}
// Module-level state. The order matters: readConfig() runs before the log writer is
// created, and loadKubeApiClient() (which logs) runs after both exist.

// In-memory ring of the most recent log records (log() caps it at 500 entries).
const recentLogs: JsonRecord[] = [];
// ISO timestamp of module load.
const startedAt = new Date().toISOString();
// Parent directory of the directory containing this module; used to resolve relative manifest paths.
const adapterRoot = join(dirname(fileURLToPath(import.meta.url)), "..");
const config = readConfig();
// File logging is only set up when a log file path is configured.
const logWriter = config.logFile
? createHourlyJsonlWriter({
baseLogFile: config.logFile,
service: "k3sctl-adapter",
maxBytes: logRetentionBytesForService("k3sctl-adapter"),
})
: null;
// null when the Kubernetes API proxy is disabled or the kubeconfig could not be loaded.
const kubeClient = loadKubeApiClient();
// "<namespace>/<serviceId>" -> Date.now() of the last native proxy failure.
const nativeServiceFailures = new Map<string, number>();
// "<namespace>/<serviceName>:<port>" -> resolved endpoint plus its expiry timestamp.
const nativeServiceEndpointCache = new Map<string, { endpoint: NativeServiceEndpoint; expiresAt: number }>();
// serviceTunnelKey(endpoint) -> SSH tunnel record.
const nativeServiceTunnels = new Map<string, NativeServiceTunnel>();
logWriter?.prune();
/**
 * Reads an environment variable as a string.
 *
 * @param name Environment variable name.
 * @param fallback Value used when the variable is unset or the empty string.
 * @returns The raw (untrimmed) variable value, or `fallback`.
 */
function envString(name: string, fallback: string): string {
  const raw = process.env[name];
  if (raw !== undefined && raw.length > 0) return raw;
  return fallback;
}
/**
 * Reads an environment variable as a positive integer.
 *
 * @param name Environment variable name.
 * @param fallback Value used when the variable is unset, blank, not a finite
 *   number, or does not floor to a value greater than zero.
 * @returns The floored value, or `fallback`.
 */
function envNumber(name: string, fallback: number): number {
  const raw = process.env[name];
  if (raw === undefined || raw.trim().length === 0) return fallback;
  // Floor BEFORE the positivity check: otherwise an input such as "0.5" passes
  // `> 0` and then floors to 0, yielding a non-positive port/timeout.
  const value = Math.floor(Number(raw));
  return Number.isFinite(value) && value > 0 ? value : fallback;
}
/**
 * Reads an environment variable as a boolean flag.
 *
 * Accepts (case-insensitively, after trimming) "1", "true", "yes", "on" as true
 * and "0", "false", "no", "off" as false.
 *
 * @param name Environment variable name.
 * @param fallback Value used when the variable is unset, blank, or unrecognised.
 */
function envBool(name: string, fallback: boolean): boolean {
  const raw = process.env[name];
  if (raw === undefined || raw.trim().length === 0) return fallback;
  switch (raw.trim().toLowerCase()) {
    case "1":
    case "true":
    case "yes":
    case "on":
      return true;
    case "0":
    case "false":
    case "no":
    case "off":
      return false;
    default:
      return fallback;
  }
}
/**
 * Reads an optional environment variable.
 *
 * @param name Environment variable name.
 * @returns The trimmed value, or `null` when the variable is unset or blank.
 */
function envOptionalString(name: string): string | null {
  const trimmed = process.env[name]?.trim();
  if (trimmed === undefined || trimmed.length === 0) return null;
  return trimmed;
}
/**
 * Asserts that a value is a plain (non-null, non-array) object.
 *
 * @param value Value to check.
 * @param path Location label used in the error message.
 * @returns The same value, typed as a record.
 * @throws Error when `value` is not an object, is null, or is an array.
 */
function asRecord(value: unknown, path: string): Record<string, unknown> {
  const isPlainObject = typeof value === "object" && value !== null && !Array.isArray(value);
  if (isPlainObject) return value as Record<string, unknown>;
  throw new Error(`${path} must be an object`);
}
/**
 * Reads a required, non-empty string property.
 *
 * @param value Record to read from.
 * @param key Property name.
 * @param path Location label of the record, used in the error message.
 * @throws Error when the property is missing, not a string, or empty.
 */
function stringField(value: Record<string, unknown>, key: string, path: string): string {
  const candidate = value[key];
  if (typeof candidate === "string" && candidate.length > 0) return candidate;
  throw new Error(`${path}.${key} must be a non-empty string`);
}
/**
 * Reads an optional string property.
 *
 * @param value Record to read from.
 * @param key Property name.
 * @param fallback Returned when the property is undefined, null, or the empty string.
 * @throws Error when the property is present but not a string.
 */
function optionalStringField(value: Record<string, unknown>, key: string, fallback: string): string {
  const candidate = value[key];
  const absent = candidate === undefined || candidate === null || candidate === "";
  if (absent) return fallback;
  if (typeof candidate === "string") return candidate;
  throw new Error(`${key} must be a string`);
}
/**
 * Reads an optional boolean property.
 *
 * @param value Record to read from.
 * @param key Property name.
 * @param fallback Returned when the property is undefined or null.
 * @throws Error when the property is present but not a boolean.
 */
function optionalBoolField(value: Record<string, unknown>, key: string, fallback: boolean): boolean {
  const candidate = value[key];
  if (typeof candidate === "boolean") return candidate;
  if (candidate === undefined || candidate === null) return fallback;
  throw new Error(`${key} must be a boolean`);
}
/**
 * Type guard: reports whether a value consists only of JSON-compatible data
 * (null, strings, numbers, booleans, and arrays/objects of those, recursively).
 */
function isJsonValue(value: unknown): value is JsonValue {
  switch (typeof value) {
    case "string":
    case "number":
    case "boolean":
      return true;
    case "object": {
      if (value === null) return true;
      if (Array.isArray(value)) return value.every(isJsonValue);
      return Object.values(value as Record<string, unknown>).every(isJsonValue);
    }
    default:
      // undefined, functions, symbols and bigints are not JSON values.
      return false;
  }
}
/**
 * Validates an optional manifest section as a JSON object.
 *
 * @param value Raw manifest value.
 * @param path Location label used in the error message.
 * @returns An empty object when `value` is undefined or null, otherwise `value` itself.
 * @throws Error when `value` is not an object, is an array, or contains non-JSON data.
 */
function manifestJsonRecord(value: unknown, path: string): JsonRecord {
  if (value === undefined || value === null) return {};
  const isObject = typeof value === "object" && !Array.isArray(value);
  if (isObject && isJsonValue(value)) return value as JsonRecord;
  throw new Error(`${path} must be a JSON object`);
}
/**
 * Reads an optional array-of-strings property.
 *
 * @param value Record to read from.
 * @param key Property name.
 * @param fallback Returned when the property is undefined or null.
 * @returns The property value itself (not a copy), or `fallback`.
 * @throws Error when the property is not an array or holds a non-string or empty-string item.
 */
function stringArrayField(value: Record<string, unknown>, key: string, fallback: string[]): string[] {
  const candidate = value[key];
  if (candidate === undefined || candidate === null) return fallback;
  const valid = Array.isArray(candidate) && candidate.every((item) => typeof item === "string" && item.length > 0);
  if (!valid) {
    throw new Error(`${key} must be an array of non-empty strings`);
  }
  return candidate as string[];
}
/** Maps a raw role string to an InstanceRole; anything unrecognised becomes "worker". */
function normalizeRole(value: string): InstanceRole {
  switch (value) {
    case "primary":
    case "standby":
    case "worker":
      return value;
    default:
      return "worker";
  }
}
/** Maps a raw health-mode string to an EndpointHealthMode; anything unrecognised becomes "service-proxy". */
function normalizeHealthMode(value: string): EndpointHealthMode {
  switch (value) {
    case "service-proxy":
    case "pod-ready":
      return value;
    default:
      return "service-proxy";
  }
}
/**
 * Parses one instance entry of a manifest into a ManagedEndpoint.
 *
 * @param value Raw instance entry.
 * @param index Position of the entry in its array (used in error messages).
 * @param ownerPath Location label of the containing array.
 * @throws Error when the entry is not an object or `id`/`baseUrl` is missing or empty,
 *   or when an optional field has the wrong type.
 */
function parseEndpoint(value: unknown, index: number, ownerPath = "endpoint"): ManagedEndpoint {
  const location = `${ownerPath}[${index}]`;
  const raw = asRecord(value, location);
  const id = stringField(raw, "id", location);
  const nodeId = optionalStringField(raw, "nodeId", id);
  // Instance "D601" is treated as primary unless the manifest says otherwise.
  const defaultRole = id === "D601" ? "primary" : "standby";
  const role = normalizeRole(optionalStringField(raw, "role", defaultRole));
  // Trailing slashes are dropped so the base URL has a canonical form.
  const baseUrl = stringField(raw, "baseUrl", location).replace(/\/+$/u, "");
  const healthPath = optionalStringField(raw, "healthPath", "/health");
  const healthMode = normalizeHealthMode(optionalStringField(raw, "healthMode", "service-proxy"));
  return { id, nodeId, role, baseUrl, healthPath, healthMode };
}
// Parses and validates one ManagedKubernetesService manifest into a ManagedService.
//
// value:     raw manifest (expected to be an object with `metadata` and `spec`).
// index:     position of the manifest in its source list; only used in error messages.
// ownerPath: label of the source list; only used in error messages.
//
// Throws an Error naming the offending path when the manifest is malformed. The
// checks run in the order written below, which decides which error surfaces first.
function parseManagedKubernetesManifest(value: unknown, index: number, ownerPath = "manifests"): ManagedService {
const path = `${ownerPath}[${index}]`;
const manifest = asRecord(value, path);
const metadata = asRecord(manifest.metadata, `${path}.metadata`);
const spec = asRecord(manifest.spec, `${path}.spec`);
// A missing `kind` is treated as ManagedKubernetesService.
const kind = optionalStringField(manifest, "kind", "ManagedKubernetesService");
const serviceId = stringField(metadata, "name", `${path}.metadata`);
if (kind !== "ManagedKubernetesService") throw new Error(`${path}.kind must be ManagedKubernetesService; direct ManagedHttpService manifests are not allowed in pure k3s mode`);
const instancesRaw = spec.instances;
if (!Array.isArray(instancesRaw) || instancesRaw.length === 0) throw new Error(`${path}.spec.instances must be a non-empty array`);
const endpoints = instancesRaw.map((endpoint, endpointIndex) => parseEndpoint(endpoint, endpointIndex, `${path}.spec.instances`));
// The first instance is the active one unless the manifest names another.
const activeInstanceId = optionalStringField(spec, "activeInstanceId", endpoints[0]?.id ?? serviceId);
if (!endpoints.some((endpoint) => endpoint.id === activeInstanceId)) throw new Error(`${path}.spec.activeInstanceId must match one instance id`);
const controlPlane = manifestJsonRecord(spec.controlPlane, `${path}.spec.controlPlane`);
const route = manifestJsonRecord(spec.route, `${path}.spec.route`);
// Only Kubernetes Service routes are accepted; a missing route fails this check too.
if (String(route.kind ?? "") !== "kubernetes-service") throw new Error(`${path}.spec.route.kind must be kubernetes-service`);
return {
id: serviceId,
// Namespace precedence: metadata.namespace, then spec.namespace, then "unidesk".
namespace: optionalStringField(metadata, "namespace", optionalStringField(spec, "namespace", "unidesk")),
kind,
controlPlane,
route,
activeInstanceId,
singleWriter: optionalBoolField(spec, "singleWriter", true),
requireAllInstancesHealthy: optionalBoolField(spec, "requireAllInstancesHealthy", false),
// Defaults to the node ids of the declared instances.
expectedNodeIds: stringArrayField(spec, "expectedNodeIds", endpoints.map((endpoint) => endpoint.nodeId)),
endpoints,
};
}
/**
 * Parses one service declaration, accepting only manifest-shaped objects.
 *
 * An object counts as a manifest when it has a string `kind`, or any `metadata`
 * or `spec` property; everything else is rejected.
 *
 * @param value Raw declaration.
 * @param index Position in the source list (used in error messages).
 * @param ownerPath Label of the source list (used in error messages).
 * @throws Error when `value` is not an object or is not manifest-shaped.
 */
function parseServiceOrManifest(value: unknown, index: number, ownerPath = "services"): ManagedService {
  const location = `${ownerPath}[${index}]`;
  const item = asRecord(value, location);
  const looksLikeManifest = typeof item.kind === "string" || item.metadata !== undefined || item.spec !== undefined;
  if (!looksLikeManifest) {
    throw new Error(`${location} must be a ManagedKubernetesService manifest; static HTTP service declarations are not allowed in pure k3s mode`);
  }
  return parseManagedKubernetesManifest(item, index, ownerPath);
}
/**
 * Parses a JSON array of service manifests.
 *
 * @param raw JSON text; blank text is treated as an empty array.
 * @param ownerPath Label of the source, used in every error message produced here
 *   and by the per-item parser.
 * @returns One ManagedService per array item.
 * @throws SyntaxError when `raw` is not valid JSON; Error when it is not an array
 *   or an item is invalid.
 */
function parseServices(raw: string, ownerPath = "K3SCTL_SERVICES_JSON"): ManagedService[] {
  const value: unknown = raw.trim().length === 0 ? [] : JSON.parse(raw);
  // Report the caller-supplied source label rather than a hard-coded variable name,
  // matching the labels used for the per-item errors.
  if (!Array.isArray(value)) throw new Error(`${ownerPath} must be an array`);
  return value.map((item, index) => parseServiceOrManifest(item, index, ownerPath));
}
/** Splits a comma-separated path list, trimming each entry and dropping blank ones. */
function manifestPaths(raw: string): string[] {
  const result: string[] = [];
  for (const piece of raw.split(",")) {
    const trimmed = piece.trim();
    if (trimmed.length > 0) result.push(trimmed);
  }
  return result;
}
/**
 * Resolves a manifest path.
 *
 * Absolute paths are returned unchanged. A relative path is resolved against the
 * adapter root when a file exists there; otherwise it is returned as given.
 */
function resolveManifestPath(path: string): string {
  if (!isAbsolute(path)) {
    const candidate = join(adapterRoot, path);
    return existsSync(candidate) ? candidate : path;
  }
  return path;
}
/**
 * Loads services from manifest files.
 *
 * Each file must contain JSON: either one manifest object or an array of them.
 * Files are processed in order and the first read/parse/validation error propagates.
 *
 * @param paths Manifest paths (resolved via resolveManifestPath).
 * @returns All parsed services, in file order then in-file order.
 */
function readManifestServices(paths: string[]): ManagedService[] {
  return paths.flatMap((path) => {
    const contents = readFileSync(resolveManifestPath(path), "utf8");
    const parsed: unknown = JSON.parse(contents);
    const records: unknown[] = Array.isArray(parsed) ? parsed : [parsed];
    // Errors are labelled with the path as configured, not the resolved path.
    return records.map((record, index) => parseServiceOrManifest(record, index, `manifest:${path}`));
  });
}
/**
 * Verifies that no two services share the same "<namespace>/<id>" key.
 *
 * @param services Services to check.
 * @returns The same array instance, unchanged.
 * @throws Error naming the first duplicated key.
 */
function mergeServices(services: ManagedService[]): ManagedService[] {
  const keys = new Set<string>();
  services.forEach(({ namespace, id }) => {
    const key = `${namespace}/${id}`;
    if (keys.has(key)) throw new Error(`duplicate k3s managed service: ${key}`);
    keys.add(key);
  });
  return services;
}
/**
 * Builds the runtime configuration from environment variables and manifest files.
 *
 * Inline services (K3SCTL_SERVICES_JSON) are parsed before the manifest files are
 * read; in the resulting list the manifest services come first. Numeric timeouts
 * are clamped to fixed minimum/maximum bounds.
 *
 * @throws Error when a manifest is unreadable or invalid, or a service is declared twice.
 */
function readConfig(): RuntimeConfig {
  // Reads a positive integer and clamps it into [min, max].
  const clampedEnvNumber = (name: string, fallback: number, min: number, max: number): number =>
    Math.max(min, Math.min(max, envNumber(name, fallback)));

  const defaultManifests = "k3s/code-queue.k3s.json,k3s/mdtodo.k3s.json,k3s/claudeqq.k3s.json,k3s/decision-center.k3s.json";
  const paths = manifestPaths(envString("K3SCTL_MANIFEST_PATHS", defaultManifests));
  const inlineServices = parseServices(envString("K3SCTL_SERVICES_JSON", "[]"));
  const manifestServices = readManifestServices(paths);
  // The kube-API tunnel flag acts as the default for the native-service tunnel flag.
  const tunnelDefault = envBool("K3SCTL_KUBE_API_SSH_TUNNEL_ENABLED", false);

  return {
    host: envString("HOST", "0.0.0.0"),
    port: envNumber("PORT", 4266),
    logFile: envString("LOG_FILE", "/var/log/unidesk/k3sctl-adapter.jsonl"),
    manifestPaths: paths,
    clusterId: envString("K3SCTL_CLUSTER_ID", "unidesk-k3s"),
    nodeId: envString("K3SCTL_NODE_ID", "D601"),
    kubectlEnabled: envBool("K3SCTL_KUBECTL_ENABLED", false),
    kubectlContext: envString("K3SCTL_KUBECTL_CONTEXT", ""),
    kubeApiProxyEnabled: envBool("K3SCTL_KUBE_API_PROXY_ENABLED", true),
    kubeconfigPath: envString("K3SCTL_KUBECONFIG_PATH", "/var/lib/unidesk/k3s/kubeconfig"),
    kubeApiConnectHost: envString("K3SCTL_KUBE_API_CONNECT_HOST", "host.docker.internal"),
    nativeServiceProxyEnabled: envBool("K3SCTL_NATIVE_SERVICE_PROXY_ENABLED", true),
    nativeServiceSshTunnelEnabled: envBool("K3SCTL_NATIVE_SERVICE_SSH_TUNNEL_ENABLED", tunnelDefault),
    nativeServiceProbeTimeoutMs: clampedEnvNumber("K3SCTL_NATIVE_SERVICE_PROBE_TIMEOUT_MS", 1200, 250, 5000),
    nativeServiceFailureCooldownMs: clampedEnvNumber("K3SCTL_NATIVE_SERVICE_FAILURE_COOLDOWN_MS", 10_000, 1000, 60_000),
    nativeServiceResolutionTtlMs: clampedEnvNumber("K3SCTL_NATIVE_SERVICE_RESOLUTION_TTL_MS", 30_000, 1000, 300_000),
    nativeServiceTunnelConnectTimeoutMs: clampedEnvNumber("K3SCTL_NATIVE_SERVICE_TUNNEL_CONNECT_TIMEOUT_MS", 3000, 250, 10_000),
    requestTimeoutMs: clampedEnvNumber("K3SCTL_REQUEST_TIMEOUT_MS", 30_000, 1000, 120_000),
    healthTimeoutMs: clampedEnvNumber("K3SCTL_HEALTH_TIMEOUT_MS", 2500, 500, 30_000),
    services: mergeServices([...manifestServices, ...inlineServices]),
  };
}
/**
 * Emits one structured log record.
 *
 * The record is kept in the in-memory recent-log buffer (at most 500 entries),
 * appended to the JSONL log file when a writer is configured, and printed to the
 * console method matching the level.
 *
 * @param level Severity.
 * @param event Event name.
 * @param detail Extra fields merged into the record (they may override the defaults).
 */
function log(level: "debug" | "info" | "warn" | "error", event: string, detail: JsonRecord = {}): void {
  const timestamp = new Date().toISOString();
  const record: JsonRecord = { at: timestamp, service: "k3sctl-adapter", level, event, ...detail };
  recentLogs.push(record);
  const overflow = recentLogs.length - 500;
  if (overflow > 0) recentLogs.splice(0, overflow);
  try {
    logWriter?.appendJson(record, new Date(String(record.at)));
  } catch {
    // A failing log file must never take the proxy down.
  }
  const line = JSON.stringify(record);
  switch (level) {
    case "error":
      console.error(line);
      break;
    case "warn":
      console.warn(line);
      break;
    default:
      console.log(line);
  }
}
/**
 * Extracts the first `key: value` scalar from kubeconfig text using a line-based match
 * (this is not a YAML parser).
 *
 * @param text Kubeconfig file contents.
 * @param key Key to look for; it is matched literally at any indentation.
 * @returns The whitespace-free value with one pair of surrounding single or double
 *   quotes removed, or "" when the key is absent.
 */
function kubeconfigScalar(text: string, key: string): string {
  // Escape regex metacharacters so the key is matched literally.
  const escaped = key.replace(/[.*+?^${}()|[\]\\]/gu, "\\$&");
  const match = text.match(new RegExp(`^\\s*${escaped}:\\s*([^\\s]+)\\s*$`, "mu"));
  const raw = match?.[1] ?? "";
  // YAML allows scalars to be quoted; without unquoting, a quoted server URL or
  // base64 blob would be handed on with the quote characters still attached.
  const quoted = raw.match(/^(["'])(.*)\1$/u);
  return quoted?.[2] ?? raw;
}
/**
 * Decodes a base64 value and writes it to `<dir>/<name>` readable by the owner only.
 *
 * @param dir Target directory (must already exist).
 * @param name File name inside `dir`.
 * @param base64Value Base64-encoded file contents.
 * @returns The path of the written file.
 * @throws Any file-system error raised while writing or changing permissions.
 */
function writeKubeSecretFile(dir: string, name: string, base64Value: string): string {
  const path = join(dir, name);
  writeFileSync(path, Buffer.from(base64Value, "base64"), { mode: 0o600 });
  // The `mode` option only applies when the file is newly created. A file left over
  // from an earlier run keeps its old permissions, so tighten them explicitly.
  chmodSync(path, 0o600);
  return path;
}
// Builds the Kubernetes API client from the configured kubeconfig.
//
// Returns null (after logging) when the API proxy is disabled, the kubeconfig file
// does not exist, or anything goes wrong while reading it. On success the CA,
// client certificate and client key are decoded into files in a per-cluster
// directory under the OS temp dir.
function loadKubeApiClient(): KubeApiClient | null {
if (!config.kubeApiProxyEnabled) return null;
if (!existsSync(config.kubeconfigPath)) {
log("warn", "kubeconfig_missing", { path: config.kubeconfigPath });
return null;
}
try {
const raw = readFileSync(config.kubeconfigPath, "utf8");
// Each lookup returns the FIRST matching key in the file.
// NOTE(review): presumably the kubeconfig holds a single cluster and user - confirm.
const server = kubeconfigScalar(raw, "server");
const ca = kubeconfigScalar(raw, "certificate-authority-data");
const cert = kubeconfigScalar(raw, "client-certificate-data");
const key = kubeconfigScalar(raw, "client-key-data");
if (server.length === 0 || ca.length === 0 || cert.length === 0 || key.length === 0) throw new Error("kubeconfig must include server, CA, client certificate, and client key data");
const dir = join(tmpdir(), `unidesk-k3sctl-kube-${config.clusterId}`);
mkdirSync(dir, { recursive: true, mode: 0o700 });
const client: KubeApiClient = {
// new URL throws on an invalid server value; the catch below turns that into null.
serverUrl: new URL(server),
connectHost: config.kubeApiConnectHost,
caFile: writeKubeSecretFile(dir, "ca.crt", ca),
certFile: writeKubeSecretFile(dir, "client.crt", cert),
keyFile: writeKubeSecretFile(dir, "client.key", key),
};
log("info", "kube_api_client_loaded", { kubeconfigPath: config.kubeconfigPath, serverHost: client.serverUrl.hostname, connectHost: client.connectHost });
return client;
} catch (error) {
// Failures are logged rather than rethrown; callers see a null client.
log("error", "kube_api_client_failed", { kubeconfigPath: config.kubeconfigPath, error: errorToJson(error) });
return null;
}
}
/**
 * Creates a JSON HTTP response.
 *
 * @param body Value serialised with JSON.stringify.
 * @param status HTTP status code (default 200).
 * @param headers Extra headers; they are applied after, and may override, the
 *   default JSON content type.
 */
function jsonResponse(body: unknown, status = 200, headers: Record<string, string> = {}): Response {
  const payload = JSON.stringify(body);
  const mergedHeaders: Record<string, string> = Object.assign(
    { "content-type": "application/json; charset=utf-8" },
    headers,
  );
  return new Response(payload, { status, headers: mergedHeaders });
}
/**
 * Converts a thrown value into a JSON-serialisable record.
 *
 * @returns `{ name, message, stack }` for Error instances (stack is "" when absent),
 *   otherwise `{ message }` holding the stringified value.
 */
function errorToJson(error: unknown): JsonRecord {
  if (!(error instanceof Error)) return { message: String(error) };
  const { name, message, stack } = error;
  return { name, message, stack: stack ?? "" };
}
/**
 * Looks up a configured service by id.
 *
 * The namespace is not considered, so the first service with a matching id wins.
 *
 * @returns The service, or null when no configured service has that id.
 */
function serviceById(id: string): ManagedService | null {
  for (const candidate of config.services) {
    if (candidate.id === id) return candidate;
  }
  return null;
}
/**
 * Returns the endpoint whose id equals the service's `activeInstanceId`.
 *
 * @throws Error when no endpoint of the service has that id.
 */
function activeEndpoint(service: ManagedService): ManagedEndpoint {
  for (const candidate of service.endpoints) {
    if (candidate.id === service.activeInstanceId) return candidate;
  }
  throw new Error(`active endpoint not found for service ${service.id}: ${service.activeInstanceId}`);
}
/**
 * Resolves a target path against an endpoint's base URL and applies a query string.
 *
 * Resolution follows standard URL rules, so a path starting with "/" replaces the
 * base URL's own path.
 *
 * @param endpoint Endpoint providing the base URL.
 * @param targetPath Path (or relative reference) to resolve.
 * @param query Query string assigned to the result, replacing any query in `targetPath`.
 * @returns The resulting URL as a string.
 */
function endpointUrl(endpoint: ManagedEndpoint, targetPath: string, query = ""): string {
  const resolved = new URL(targetPath, new URL(endpoint.baseUrl));
  resolved.search = query;
  return resolved.href;
}
/**
 * Reads a response body as UTF-8 text, keeping at most `maxBytes` bytes.
 *
 * @param response Response whose body is consumed.
 * @param maxBytes Upper bound on the number of body bytes kept (default 1,000,000).
 * @returns The decoded text and whether the body was cut off. A response without
 *   a body yields `{ text: "", truncated: false }`.
 */
async function boundedText(response: Response, maxBytes = 1_000_000): Promise<{ text: string; truncated: boolean }> {
  const reader = response.body?.getReader();
  if (reader === undefined) return { text: "", truncated: false };
  const chunks: Uint8Array[] = [];
  let total = 0;
  let truncated = false;
  while (true) {
    const item = await reader.read();
    if (item.done) break;
    total += item.value.byteLength;
    if (total <= maxBytes) {
      chunks.push(item.value);
    } else {
      truncated = true;
      // Keep only the part of this chunk that still fits under the limit.
      const remaining = Math.max(0, maxBytes - (total - item.value.byteLength));
      if (remaining > 0) chunks.push(item.value.slice(0, remaining));
      break;
    }
  }
  if (truncated) {
    // Stop the upstream transfer: without cancelling, the rest of an oversized body
    // keeps its stream (and the underlying connection) open after we stop reading.
    try {
      await reader.cancel();
    } catch {
      // Best effort: the data collected so far is still returned.
    }
  }
  return { text: Buffer.concat(chunks).toString("utf8"), truncated };
}
/**
 * Reads a string setting from a service's route record.
 *
 * @returns The value when it is a non-empty string, otherwise `fallback`.
 */
function routeString(service: ManagedService, key: string, fallback: string): string {
  const candidate = service.route[key];
  if (typeof candidate !== "string" || candidate.length === 0) return fallback;
  return candidate;
}
/**
 * Reads a positive integer setting from a service's route record.
 *
 * @returns The floored value when it is a finite number that floors to more than
 *   zero, otherwise `fallback`.
 */
function routeNumber(service: ManagedService, key: string, fallback: number): number {
  const value = service.route[key];
  if (typeof value !== "number") return fallback;
  // Floor BEFORE the positivity check so a fraction such as 0.5 cannot become 0
  // (which would, for example, yield service port 0).
  const floored = Math.floor(value);
  return Number.isFinite(floored) && floored > 0 ? floored : fallback;
}
/** Reports whether the service's route kind, converted to a string, is "kubernetes-service". */
function isKubernetesServiceRoute(service: ManagedService): boolean {
  const routeKind = service.route.kind;
  const asText = String(routeKind ?? "");
  return asText === "kubernetes-service";
}
/**
 * Derives the Kubernetes Service coordinates of a managed service.
 *
 * The service name defaults to the managed service id and the port to 80 when the
 * route record does not provide usable values.
 */
function nativeServiceRef(service: ManagedService): { namespace: string; serviceName: string; servicePort: number } {
  const { namespace } = service;
  const serviceName = routeString(service, "serviceName", service.id);
  const servicePort = routeNumber(service, "servicePort", 80);
  return { namespace, serviceName, servicePort };
}
/**
 * Builds the Kubernetes API "service proxy" path for a managed service.
 *
 * @param service Service whose namespace, name and port are used.
 * @param targetPath Path inside the service; a leading "/" is added when missing.
 *   The path is appended as-is, without encoding or normalisation.
 * @returns `/api/v1/namespaces/<ns>/services/<name>:<port>/proxy<targetPath>`.
 */
function serviceProxyApiPath(service: ManagedService, targetPath: string): string {
  const ref = nativeServiceRef(service);
  const namespaceSegment = encodeURIComponent(service.namespace);
  const serviceSegment = encodeURIComponent(`${ref.serviceName}:${ref.servicePort}`);
  const suffix = targetPath.startsWith("/") ? targetPath : `/${targetPath}`;
  return `/api/v1/namespaces/${namespaceSegment}/services/${serviceSegment}/proxy${suffix}`;
}
/**
 * Builds the Kubernetes API "service proxy" path for one endpoint of a service.
 *
 * The namespace and `<service>:<port>` reference come from the endpoint's
 * kubernetes:// base URL.
 *
 * @param targetPath Path inside the service; a leading "/" is added when missing.
 * @throws Error when the endpoint's base URL is not a valid kubernetes:// reference.
 */
function endpointProxyApiPath(service: ManagedService, endpoint: ManagedEndpoint, targetPath: string): string {
  const ref = kubernetesEndpointServiceRef(service, endpoint);
  const namespaceSegment = encodeURIComponent(ref.namespace);
  const serviceSegment = encodeURIComponent(ref.serviceRef);
  const suffix = targetPath.startsWith("/") ? targetPath : `/${targetPath}`;
  return `/api/v1/namespaces/${namespaceSegment}/services/${serviceSegment}/proxy${suffix}`;
}
/** Key under which native proxy failures of a service are recorded: "<namespace>/<id>". */
function nativeServiceFailureKey(service: ManagedService): string {
  const { namespace, id } = service;
  return [namespace, id].join("/");
}
/**
 * Returns the per-service upstream override, if one is configured.
 *
 * The variable name is `K3SCTL_NATIVE_SERVICE_URL_<ID>`, where `<ID>` is the service
 * id upper-cased with every character outside A-Z and 0-9 replaced by "_".
 *
 * @returns The trimmed override value, or null when unset or blank.
 */
function nativeServiceOverrideBase(service: ManagedService): string | null {
  const suffix = service.id.toUpperCase().replace(/[^A-Z0-9]/gu, "_");
  const variableName = `K3SCTL_NATIVE_SERVICE_URL_${suffix}`;
  return envOptionalString(variableName);
}
/**
 * Builds the upstream URL for a request path on a given base URL.
 *
 * The target path is always treated as a path on the base URL's host, and it
 * replaces any path the base URL carries.
 *
 * @param base Upstream base URL, e.g. "http://svc.ns.svc.cluster.local:80".
 * @param targetPath Request path; a leading "/" is added when missing.
 * @param query Query string assigned to the result, replacing any query in `targetPath`.
 * @throws TypeError when `base` is not a valid URL.
 */
function nativeServiceUrlFromBase(base: string, targetPath: string, query = ""): URL {
  // Collapse the leading run of "/" and "\" to a single "/". Tabs and newlines are
  // included because the URL parser strips them. Left as-is, a path such as
  // "//other-host/x" or "/\other-host/x" would resolve as a network-path reference
  // and replace the upstream host with one chosen by the caller.
  const safeTargetPath = `/${targetPath.replace(/^[/\\\t\n\r]+/u, "")}`;
  const url = new URL(safeTargetPath, base.replace(/\/+$/u, "/"));
  url.search = query;
  return url;
}
/**
 * Builds the in-cluster DNS URL of a service:
 * `http://<name>.<namespace>.svc.cluster.local:<port><targetPath>` plus the query.
 */
function nativeServiceDnsUrl(service: ManagedService, targetPath: string, query = ""): URL {
  const ref = nativeServiceRef(service);
  const dnsBase = `http://${ref.serviceName}.${ref.namespace}.svc.cluster.local:${ref.servicePort}`;
  return nativeServiceUrlFromBase(dnsBase, targetPath, query);
}
/**
 * Reports whether native proxying may currently be attempted for a service.
 *
 * It is unusable when native proxying is disabled, or when the service failed
 * more recently than the configured failure cooldown.
 */
function nativeServiceProxyUsable(service: ManagedService): boolean {
  if (!config.nativeServiceProxyEnabled) return false;
  const lastFailure = nativeServiceFailures.get(nativeServiceFailureKey(service));
  if (lastFailure === undefined) return true;
  const elapsedMs = Date.now() - lastFailure;
  return elapsedMs >= config.nativeServiceFailureCooldownMs;
}
/**
 * Records a native proxy failure for a service (starting its cooldown) and logs it.
 *
 * @param service Service that failed.
 * @param error The failure, included in the log record.
 */
function rememberNativeServiceFailure(service: ManagedService, error: unknown): void {
  const failedAt = Date.now();
  nativeServiceFailures.set(nativeServiceFailureKey(service), failedAt);
  const detail: JsonRecord = {
    serviceId: service.id,
    namespace: service.namespace,
    cooldownMs: config.nativeServiceFailureCooldownMs,
    error: errorToJson(error),
  };
  log("warn", "native_service_proxy_failed", detail);
}
/**
 * Performs a GET against the Kubernetes API via curl and returns the JSON object body.
 *
 * @param apiPath API path resolved against the configured server URL.
 * @param timeoutMs Request timeout handed to curl.
 * @throws Error when no client is configured, curl exits non-zero, the status is
 *   outside 2xx, or the body is not a JSON object; SyntaxError on invalid JSON.
 */
async function kubeApiJson(apiPath: string, timeoutMs: number): Promise<JsonRecord> {
  const client = kubeClient;
  if (client === null) throw new Error("kubernetes api client is not configured");
  const target = new URL(apiPath, client.serverUrl);
  const acceptJson = new Headers({ accept: "application/json" });
  const curlArgs = kubeProxyCurlArgs(client, "GET", target, acceptJson, false, timeoutMs);
  const curl = Bun.spawn(["curl", ...curlArgs], { stdout: "pipe", stderr: "pipe" });
  // Drain both pipes while waiting for the process to exit.
  const [rawOutput, errorOutput, exitCode] = await Promise.all([
    new Response(curl.stdout).arrayBuffer(),
    new Response(curl.stderr).text(),
    curl.exited,
  ]);
  if (exitCode !== 0) throw new Error(`kubernetes api request failed: ${errorOutput.slice(0, 4000)}`);
  const { status, bodyText } = parseCurlHeaderBody(Buffer.from(rawOutput));
  if (status < 200 || status >= 300) {
    throw new Error(`kubernetes api request returned ${status}: ${bodyText.slice(0, 4000)}`);
  }
  const payload: unknown = JSON.parse(bodyText);
  if (typeof payload !== "object" || payload === null || Array.isArray(payload)) {
    throw new Error("kubernetes api response must be a JSON object");
  }
  return payload as JsonRecord;
}
/** Cache key of a service's resolved endpoint: "<namespace>/<serviceName>:<servicePort>". */
function serviceEndpointCacheKey(service: ManagedService): string {
  const ref = nativeServiceRef(service);
  return `${ref.namespace}/${ref.serviceName}:${ref.servicePort}`;
}
/** Registry key of an SSH tunnel: "<namespace>/<serviceName>:<servicePort>@<clusterIP>". */
function serviceTunnelKey(endpoint: NativeServiceEndpoint): string {
  const { namespace, serviceName, servicePort, clusterIP } = endpoint;
  return `${namespace}/${serviceName}:${servicePort}@${clusterIP}`;
}
/**
 * Resolves a managed service to its Kubernetes Service cluster IP.
 *
 * A cached resolution is returned until it expires; otherwise the Service object
 * is fetched from the Kubernetes API and the result cached for the configured TTL.
 *
 * @throws Error when the API request fails or the Service has no cluster IP
 *   (missing, empty, or "None").
 */
async function resolveNativeServiceEndpoint(service: ManagedService): Promise<NativeServiceEndpoint> {
  const cacheKey = serviceEndpointCacheKey(service);
  const hit = nativeServiceEndpointCache.get(cacheKey);
  if (hit !== undefined && hit.expiresAt > Date.now()) return hit.endpoint;

  const ref = nativeServiceRef(service);
  const apiPath = `/api/v1/namespaces/${encodeURIComponent(ref.namespace)}/services/${encodeURIComponent(ref.serviceName)}`;
  const serviceJson = await kubeApiJson(apiPath, config.nativeServiceProbeTimeoutMs);

  const rawSpec = serviceJson.spec;
  const spec = (typeof rawSpec === "object" && rawSpec !== null ? rawSpec : {}) as JsonRecord;
  const rawClusterIP = spec.clusterIP;
  const clusterIP = typeof rawClusterIP === "string" ? rawClusterIP : "";
  if (clusterIP.length === 0 || clusterIP === "None") {
    throw new Error(`kubernetes service ${ref.namespace}/${ref.serviceName} has no clusterIP`);
  }

  const endpoint: NativeServiceEndpoint = { ...ref, clusterIP };
  const expiresAt = Date.now() + config.nativeServiceResolutionTtlMs;
  nativeServiceEndpointCache.set(cacheKey, { endpoint, expiresAt });
  return endpoint;
}
/**
 * Finds a currently free TCP port on 127.0.0.1.
 *
 * A throwaway server is bound to port 0 so the OS picks a port, then closed again.
 * The port is not reserved: another process could claim it before the caller does.
 *
 * @returns The port number the OS assigned.
 */
async function allocateLocalPort(): Promise<number> {
  return await new Promise<number>((resolve, reject) => {
    const probe = createServer();
    probe.once("error", reject);
    probe.listen(0, "127.0.0.1", () => {
      // Capture the address while the server is still listening.
      const bound = probe.address();
      probe.close((closeError) => {
        if (closeError !== undefined) {
          reject(closeError);
        } else if (typeof bound === "object" && bound !== null) {
          resolve(bound.port);
        } else {
          reject(new Error("failed to allocate local port"));
        }
      });
    });
  });
}
// Waits until a TCP connection to 127.0.0.1:<port> succeeds.
//
// port:      local port to probe.
// timeoutMs: overall time budget. Connection attempts are retried, with a 50 ms
//            pause after each failure, until the deadline passes.
//
// Resolves once a connection is established. Rejects with the last connection error,
// or with a generic "did not become reachable" error when no attempt produced an
// Error (for example when the deadline had already passed before the first attempt).
async function waitForLocalTcp(port: number, timeoutMs: number): Promise<void> {
const deadline = Date.now() + timeoutMs;
let lastError: unknown = null;
while (Date.now() < deadline) {
try {
await new Promise<void>((resolve, reject) => {
const socket = createConnection({ host: "127.0.0.1", port });
// Per-attempt timeout: the time left until the deadline, kept between 50 and 500 ms.
socket.setTimeout(Math.min(500, Math.max(50, deadline - Date.now())));
socket.once("connect", () => {
// The connection is only a reachability probe; close it straight away.
socket.destroy();
resolve();
});
socket.once("timeout", () => {
socket.destroy();
reject(new Error("tcp connect timeout"));
});
socket.once("error", reject);
});
return;
} catch (error) {
lastError = error;
await Bun.sleep(50);
}
}
throw lastError instanceof Error ? lastError : new Error(`local tcp port ${port} did not become reachable`);
}
// Returns an SSH local-forward tunnel to the given Service endpoint, starting one
// when no open tunnel is registered for it.
//
// The tunnel is an `ssh -N -L 127.0.0.1:<localPort>:<clusterIP>:<servicePort>` process
// connecting to K3SCTL_KUBE_API_SSH_USER@K3SCTL_KUBE_API_SSH_HOST with the key at
// K3SCTL_KUBE_API_SSH_KEY.
//
// Rejects when the local port does not accept connections within the configured
// tunnel connect timeout; the ssh process is killed in that case.
async function ensureNativeServiceTunnel(endpoint: NativeServiceEndpoint): Promise<NativeServiceTunnel> {
const key = serviceTunnelKey(endpoint);
const existing = nativeServiceTunnels.get(key);
// NOTE(review): a tunnel is registered below BEFORE it is confirmed reachable, so a
// concurrent caller can receive a tunnel that is still starting - confirm this is acceptable.
if (existing !== undefined && !existing.closed) return existing;
const sshHost = envString("K3SCTL_KUBE_API_SSH_HOST", "host.docker.internal");
const sshUser = envString("K3SCTL_KUBE_API_SSH_USER", "ubuntu");
const sshKey = envString("K3SCTL_KUBE_API_SSH_KEY", "/run/host-ssh/id_ed25519");
const localPort = await allocateLocalPort();
const forward = `127.0.0.1:${localPort}:${endpoint.clusterIP}:${endpoint.servicePort}`;
// Directory for the dedicated known_hosts file used by the ssh invocation below.
mkdirSync("/tmp/k3sctl-ssh", { recursive: true });
const proc = Bun.spawn([
"ssh",
"-i", sshKey,
"-o", "BatchMode=yes",
// NOTE(review): host key checking is disabled, so the SSH host is not authenticated - confirm intended.
"-o", "StrictHostKeyChecking=no",
"-o", "UserKnownHostsFile=/tmp/k3sctl-ssh/known_hosts",
// Make ssh exit instead of lingering when the forward cannot be set up.
"-o", "ExitOnForwardFailure=yes",
"-o", "ServerAliveInterval=15",
"-o", "ServerAliveCountMax=3",
"-N",
"-L", forward,
`${sshUser}@${sshHost}`,
], { stdout: "ignore", stderr: "ignore" });
const tunnel: NativeServiceTunnel = { endpoint, localPort, proc, closed: false, startedAt: Date.now() };
nativeServiceTunnels.set(key, tunnel);
// When ssh exits, mark the tunnel closed and unregister it - but only if the
// registry still points at this tunnel and not at a newer replacement.
proc.exited.then((exitCode) => {
tunnel.closed = true;
if (nativeServiceTunnels.get(key) === tunnel) nativeServiceTunnels.delete(key);
log("warn", "native_service_ssh_tunnel_exited", { key, exitCode, localPort, serviceName: endpoint.serviceName, clusterIP: endpoint.clusterIP });
}).catch((error) => {
tunnel.closed = true;
if (nativeServiceTunnels.get(key) === tunnel) nativeServiceTunnels.delete(key);
log("warn", "native_service_ssh_tunnel_failed", { key, localPort, error: errorToJson(error) });
});
try {
await waitForLocalTcp(localPort, config.nativeServiceTunnelConnectTimeoutMs);
log("info", "native_service_ssh_tunnel_ready", { key, localPort, serviceName: endpoint.serviceName, servicePort: endpoint.servicePort, clusterIP: endpoint.clusterIP });
return tunnel;
} catch (error) {
// The forward never became reachable: tear the process down and forget the tunnel.
tunnel.closed = true;
proc.kill();
nativeServiceTunnels.delete(key);
throw error;
}
}
/**
 * Forwards a request to an upstream URL and wraps the (size-bounded) answer.
 *
 * The upstream body is read as text up to 8 MiB. Only the upstream status and
 * content type are carried over; diagnostic `x-unidesk-*` headers are added.
 *
 * @param service Service the request belongs to (reported in a response header).
 * @param upstreamUrl Fully resolved upstream URL.
 * @param req Original request; only its method is used here.
 * @param headers Headers sent upstream.
 * @param bodyText Request body; an empty string means "no body".
 * @param timeoutMs Time after which the upstream fetch is aborted.
 * @param extraHeaders Additional response headers; they override the defaults.
 * @throws Whatever `fetch` throws, including the abort error on timeout.
 */
async function fetchNativeServiceUrl(
  service: ManagedService,
  upstreamUrl: URL,
  req: Request,
  headers: Headers,
  bodyText: string,
  timeoutMs: number,
  extraHeaders: Record<string, string> = {},
): Promise<Response> {
  const abort = new AbortController();
  const deadline = setTimeout(() => abort.abort(), timeoutMs);
  const began = Date.now();
  try {
    const hasBody = bodyText.length > 0;
    const upstream = await fetch(upstreamUrl, {
      method: req.method,
      headers,
      body: hasBody ? bodyText : undefined,
      signal: abort.signal,
    });
    const { text, truncated } = await boundedText(upstream, 8 * 1024 * 1024);
    const responseHeaders: Record<string, string> = {
      "content-type": upstream.headers.get("content-type") ?? "application/octet-stream",
      "x-unidesk-proxy-mode": "kubernetes-native-service",
      "x-unidesk-k3s-service": service.id,
      "x-unidesk-k3s-service-url": upstreamUrl.origin,
      "x-unidesk-upstream-duration-ms": String(Date.now() - began),
      "x-unidesk-response-truncated": truncated ? "true" : "false",
      ...extraHeaders,
    };
    return new Response(text, { status: upstream.status, headers: responseHeaders });
  } finally {
    // Always disarm the abort timer, whether the fetch succeeded or threw.
    clearTimeout(deadline);
  }
}
/**
 * Forwards a request to a service through an SSH local-forward tunnel.
 *
 * The Service's cluster IP is resolved, a tunnel is obtained (or started), and the
 * request is sent to the tunnel's local port. The response headers describe the
 * tunnel and report the Service's in-cluster DNS URL as the service URL.
 *
 * @throws Error when SSH tunnelling is disabled, or when resolution, tunnel setup,
 *   or the upstream fetch fails.
 */
async function nativeServiceSshTunnelResponse(
  service: ManagedService,
  req: Request,
  headers: Headers,
  bodyText: string,
  targetPath: string,
  query: string,
  timeoutMs: number,
): Promise<Response> {
  if (!config.nativeServiceSshTunnelEnabled) throw new Error("native service SSH tunnel is disabled");
  const endpoint = await resolveNativeServiceEndpoint(service);
  const tunnel = await ensureNativeServiceTunnel(endpoint);
  const localBase = `http://127.0.0.1:${tunnel.localPort}`;
  const upstreamUrl = nativeServiceUrlFromBase(localBase, targetPath, query);
  const tunnelHeaders: Record<string, string> = {
    "x-unidesk-k3s-native-transport": "ssh-local-forward",
    "x-unidesk-k3s-service-name": endpoint.serviceName,
    "x-unidesk-k3s-service-cluster-ip": endpoint.clusterIP,
    "x-unidesk-k3s-service-port": String(endpoint.servicePort),
    "x-unidesk-k3s-service-url": `http://${endpoint.serviceName}.${endpoint.namespace}.svc.cluster.local:${endpoint.servicePort}`,
    "x-unidesk-k3s-service-tunnel-port": String(tunnel.localPort),
  };
  return await fetchNativeServiceUrl(service, upstreamUrl, req, headers, bodyText, timeoutMs, tunnelHeaders);
}
/**
 * Extracts the namespace and `<service>:<port>` reference from an endpoint's
 * `kubernetes://<namespace>/services/<service>:<port>` base URL.
 *
 * The namespace falls back to the service's namespace when the URL has no host part.
 *
 * @throws Error when the URL does not use the kubernetes: scheme or its path is
 *   not exactly `/services/<ref>`; TypeError when the base URL cannot be parsed.
 */
function kubernetesEndpointServiceRef(service: ManagedService, endpoint: ManagedEndpoint): { namespace: string; serviceRef: string } {
  const parsed = new URL(endpoint.baseUrl);
  if (parsed.protocol !== "kubernetes:") throw new Error(`endpoint ${endpoint.id} must use kubernetes:// baseUrl`);
  const segments = parsed.pathname.split("/").filter(Boolean);
  const [collection, serviceRef] = segments;
  const wellFormed = segments.length === 2 && collection === "services" && serviceRef !== undefined && serviceRef.length > 0;
  if (!wellFormed) {
    throw new Error(`endpoint ${endpoint.id} baseUrl must be kubernetes://<namespace>/services/<service>:<port>`);
  }
  return { namespace: parsed.hostname || service.namespace, serviceRef };
}
/**
 * Builds the curl argument list for a Kubernetes API request.
 *
 * The response headers are written to stdout ahead of the body (`--dump-header -`),
 * redirects are followed, and the client certificate files are used for TLS.
 *
 * @param client Client providing the certificate files and connect host.
 * @param method HTTP method.
 * @param url Request URL (always the last argument).
 * @param headers Request headers, passed as `--header` arguments.
 * @param hasBody When true, curl reads the request body from stdin.
 * @param timeoutMs Timeout, rounded up to whole seconds (at least 1).
 */
function kubeProxyCurlArgs(client: KubeApiClient, method: string, url: URL, headers: Headers, hasBody: boolean, timeoutMs: number): string[] {
  const maxTimeSeconds = Math.max(1, Math.ceil(timeoutMs / 1000));
  const args: string[] = [
    "-sS",
    "--show-error",
    "--location",
    "--max-time", String(maxTimeSeconds),
    "--request", method,
    "--cacert", client.caFile,
    "--cert", client.certFile,
    "--key", client.keyFile,
    "--dump-header", "-",
  ];
  // A loopback API server address is re-routed to the configured connect host,
  // while the URL (and therefore the TLS server name) stays unchanged.
  const isLoopback = url.hostname === "127.0.0.1" || url.hostname === "localhost";
  if (isLoopback && client.connectHost.length > 0) {
    const defaultPort = url.protocol === "https:" ? "443" : "80";
    const port = url.port || defaultPort;
    args.push("--connect-to", `${url.hostname}:${port}:${client.connectHost}:${port}`);
  }
  headers.forEach((value, name) => {
    args.push("--header", `${name}: ${value}`);
  });
  if (hasBody) args.push("--data-binary", "@-");
  args.push(url.toString());
  return args;
}
/**
 * Splits curl output produced with `--dump-header -` into status, content type and body.
 *
 * curl writes one header block per response it receives. Interim 1xx blocks, and
 * the blocks of redirects it followed, come before the final response's headers
 * and are skipped here.
 *
 * @param output Raw curl stdout (headers followed by the body), decoded as UTF-8.
 * @returns The final response's status (502 when no status line or header
 *   separator is found), its content type (default "application/octet-stream"),
 *   and the body text.
 */
function parseCurlHeaderBody(output: Buffer): { status: number; contentType: string; bodyText: string } {
  const text = output.toString("utf8");
  const separator = text.indexOf("\r\n\r\n") >= 0 ? "\r\n\r\n" : "\n\n";
  const index = text.indexOf(separator);
  if (index < 0) return { status: 502, contentType: "text/plain; charset=utf-8", bodyText: text };
  let headerText = text.slice(0, index);
  let bodyText = text.slice(index + separator.length);
  const interimStatus = /^HTTP\/\d(?:\.\d)?\s+1\d\d\b/mu;
  const redirectStatus = /^HTTP\/\d(?:\.\d)?\s+3\d\d\b/mu;
  const startsWithStatusLine = /^HTTP\/\d(?:\.\d)?\s+\d{3}\b/u;
  // Skip interim 1xx blocks. A 3xx block is skipped only when another status line
  // follows it directly, which means curl followed the redirect; a 3xx that was
  // not followed stays the final response.
  while (
    interimStatus.test(headerText)
    || (redirectStatus.test(headerText) && startsWithStatusLine.test(bodyText))
  ) {
    const nextIndex = bodyText.indexOf(separator);
    if (nextIndex < 0) break;
    headerText = bodyText.slice(0, nextIndex);
    bodyText = bodyText.slice(nextIndex + separator.length);
  }
  const status = Number(headerText.match(/^HTTP\/\d(?:\.\d)?\s+(\d+)/mu)?.[1] ?? 502);
  const contentType = headerText.match(/^content-type:\s*(.+)$/imu)?.[1]?.trim() || "application/octet-stream";
  return { status: Number.isFinite(status) ? status : 502, contentType, bodyText };
}
/**
 * Produces a loggable preview of a response body.
 *
 * @param bodyText Body as text.
 * @param contentType Content type; when it mentions "json" (case-insensitive) the
 *   body is parsed.
 * @returns The parsed JSON value, or the first 2000 characters of the text when
 *   the body is not JSON or fails to parse.
 */
function bodyPreview(bodyText: string, contentType: string): JsonValue {
  const textPreview = (): string => bodyText.slice(0, 2000);
  if (!contentType.toLowerCase().includes("json")) return textPreview();
  try {
    return JSON.parse(bodyText) as JsonValue;
  } catch {
    return textPreview();
  }
}
/**
 * Summarises a probe response as a JSON record.
 *
 * @param response Probe response (status and headers are read).
 * @param bodyText Already-read response body.
 * @param startedAt Date.now() timestamp taken when the probe started.
 * @returns Health verdict ("healthy" for a 2xx response), upstream status, content
 *   type, proxy mode header, elapsed milliseconds and a body preview.
 */
function responseProbeRecord(response: Response, bodyText: string, startedAt: number): JsonRecord {
  const healthy = response.ok;
  const contentType = response.headers.get("content-type") ?? "application/octet-stream";
  const proxyMode = response.headers.get("x-unidesk-proxy-mode") ?? "";
  const record: JsonRecord = {
    ok: healthy,
    status: healthy ? "healthy" : "unhealthy",
    upstreamStatus: response.status,
    contentType,
    proxyMode,
    durationMs: Date.now() - startedAt,
    body: bodyPreview(bodyText, contentType),
  };
  return record;
}
/**
 * Proxies a request to a managed service through the Kubernetes API service proxy.
 *
 * @param targetPath Path inside the service.
 * @param query Query string for the upstream request.
 * @param timeoutMs Upstream timeout.
 */
async function kubeApiServiceProxyResponse(
  service: ManagedService,
  req: Request,
  targetPath: string,
  query: string,
  timeoutMs: number,
): Promise<Response> {
  const apiPath = serviceProxyApiPath(service, targetPath);
  return kubeApiProxyResponse(service, req, apiPath, query, timeoutMs);
}
/**
 * Probes a service path through the Kubernetes API service proxy.
 *
 * Never rejects: any failure is reported as an unhealthy record carrying the error.
 *
 * @param service Service to probe.
 * @param targetPath Path inside the service.
 * @param timeoutMs Upstream timeout.
 * @returns A probe record (see responseProbeRecord) or an unhealthy error record.
 */
async function kubeApiServiceProxyProbe(service: ManagedService, targetPath: string, timeoutMs: number): Promise<JsonRecord> {
  const began = Date.now();
  try {
    // A synthetic GET request stands in for a client request.
    const probeRequest = new Request("http://k3sctl-adapter.local/diagnostics", {
      method: "GET",
      headers: { accept: "application/json" },
    });
    const upstream = await kubeApiServiceProxyResponse(service, probeRequest, targetPath, "", timeoutMs);
    const upstreamBody = await upstream.text();
    return responseProbeRecord(upstream, upstreamBody, began);
  } catch (error) {
    const failure: JsonRecord = {
      ok: false,
      status: "unhealthy",
      upstreamStatus: null,
      proxyMode: "kubernetes-api-service-proxy",
      durationMs: Date.now() - began,
      error: errorToJson(error),
    };
    return failure;
  }
}
// Tries to serve a request by talking to the service directly, without the
// Kubernetes API proxy.
//
// Transports are tried in this order:
//   1. the per-service override URL, when one is configured;
//   2. the SSH tunnel, when SSH tunnelling is enabled;
//   3. the in-cluster Service DNS name, falling back to the SSH tunnel when that fails.
//
// Returns null when native proxying is currently unusable for the service, or when
// the selected transport failed (the failure is recorded and starts the cooldown).
//
// NOTE(review): the request body is consumed here via req.text(). A caller that
// falls back to another transport after a null result cannot read the same Request
// body again - confirm callers pass a clone or a bodiless request.
async function nativeServiceProxyResponse(
service: ManagedService,
req: Request,
targetPath: string,
query: string,
timeoutMs: number,
): Promise<Response | null> {
if (!nativeServiceProxyUsable(service)) return null;
const headers = forwardHeaders(req);
// GET and HEAD requests are forwarded without a body.
const bodyText = req.method === "GET" || req.method === "HEAD" ? "" : await req.text();
const override = nativeServiceOverrideBase(service);
try {
if (override !== null) {
return await fetchNativeServiceUrl(service, nativeServiceUrlFromBase(override, targetPath, query), req, headers, bodyText, timeoutMs, {
"x-unidesk-k3s-native-transport": "override-url",
});
}
if (config.nativeServiceSshTunnelEnabled) {
return await nativeServiceSshTunnelResponse(service, req, headers, bodyText, targetPath, query, timeoutMs);
}
try {
return await fetchNativeServiceUrl(service, nativeServiceDnsUrl(service, targetPath, query), req, headers, bodyText, timeoutMs, {
"x-unidesk-k3s-native-transport": "service-dns",
});
} catch (dnsError) {
log("warn", "native_service_dns_failed", {
serviceId: service.id,
namespace: service.namespace,
error: errorToJson(dnsError),
});
// SSH tunnelling is disabled on this path, so this call throws and the outer
// catch records the failure; the DNS error itself was logged just above.
return await nativeServiceSshTunnelResponse(service, req, headers, bodyText, targetPath, query, timeoutMs);
}
} catch (error) {
rememberNativeServiceFailure(service, error);
return null;
}
}
/**
 * Proxies a request to one specific endpoint of a managed service through the
 * Kubernetes API.
 *
 * The API path is derived via endpointProxyApiPath, then the call is
 * delegated to kubeApiProxyResponse.
 *
 * @param service Managed service that owns the endpoint.
 * @param endpoint Endpoint to reach.
 * @param req Incoming request to forward.
 * @param targetPath Path on the target endpoint.
 * @param query Query string to attach to the upstream URL.
 * @param timeoutMs Timeout budget handed to the proxy call.
 */
async function kubeApiEndpointProxyResponse(
  service: ManagedService,
  endpoint: ManagedEndpoint,
  req: Request,
  targetPath: string,
  query: string,
  timeoutMs: number,
): Promise<Response> {
  const apiPath = endpointProxyApiPath(service, endpoint, targetPath);
  return kubeApiProxyResponse(service, req, apiPath, query, timeoutMs);
}
/**
 * Forwards a request to the Kubernetes API by spawning curl and converts the
 * curl output back into a Response.
 *
 * @param service Managed service the call is made on behalf of (used for logging and response headers).
 * @param req Incoming request; its method, forwardable headers and body are relayed.
 * @param apiPath Kubernetes API path, resolved against the configured server URL.
 * @param query Query string assigned to the upstream URL's search component.
 * @param timeoutMs Timeout budget passed to kubeProxyCurlArgs.
 * @returns The upstream response. A 502 JSON response is returned when no
 *   kube client is configured or when curl exits with a non-zero code.
 */
async function kubeApiProxyResponse(
  service: ManagedService,
  req: Request,
  apiPath: string,
  query: string,
  timeoutMs: number,
): Promise<Response> {
  if (kubeClient === null) {
    return jsonResponse({ ok: false, error: "kubernetes api proxy is not configured", serviceId: service.id, kubeconfigPath: config.kubeconfigPath, noFallback: true }, 502);
  }
  const upstreamUrl = new URL(apiPath, kubeClient.serverUrl);
  upstreamUrl.search = query;
  const headers = forwardHeaders(req);
  const bodyText = req.method === "GET" || req.method === "HEAD" ? "" : await req.text();
  const hasBody = bodyText.length > 0;
  const args = kubeProxyCurlArgs(kubeClient, req.method, upstreamUrl, headers, hasBody, timeoutMs);
  const proc = Bun.spawn(["curl", ...args], {
    stdin: hasBody ? "pipe" : "ignore",
    stdout: "pipe",
    stderr: "pipe",
  });
  if (hasBody) {
    // The request body is streamed to curl over stdin.
    proc.stdin?.write(bodyText);
    proc.stdin?.end();
  }
  const [stdout, stderr, exitCode] = await Promise.all([
    new Response(proc.stdout).arrayBuffer(),
    new Response(proc.stderr).text(),
    proc.exited,
  ]);
  if (exitCode !== 0) {
    log("error", "kube_api_proxy_failed", { serviceId: service.id, apiPath, exitCode, stderr: stderr.slice(0, 2000), noFallback: true });
    return jsonResponse({ ok: false, error: "kubernetes api service proxy failed", serviceId: service.id, detail: stderr.slice(0, 4000), noFallback: true }, 502);
  }
  const parsed = parseCurlHeaderBody(Buffer.from(stdout));
  // The Fetch standard forbids a non-null body together with a null-body
  // status (204, 205, 304): the Response constructor throws a TypeError even
  // for an empty string. Pass null so such upstream replies are relayed
  // instead of surfacing as an internal error.
  const isNullBodyStatus = parsed.status === 204 || parsed.status === 205 || parsed.status === 304;
  return new Response(isNullBodyStatus ? null : parsed.bodyText, {
    status: parsed.status,
    headers: {
      "content-type": parsed.contentType,
      "x-unidesk-proxy-mode": "kubernetes-api-service-proxy",
      "x-unidesk-k3s-service": service.id,
      "x-unidesk-response-truncated": "false",
    },
  });
}
/**
 * Probes an endpoint's health path directly with fetch.
 *
 * The request is aborted after config.healthTimeoutMs. The body is reported
 * as parsed JSON when it parses, otherwise as a 2000-character text preview.
 * Any thrown error (including the abort) is reported as an unhealthy record
 * carrying the error message.
 *
 * @param endpoint Endpoint to probe.
 * @returns A health record for the endpoint.
 */
async function probeEndpoint(endpoint: ManagedEndpoint): Promise<JsonRecord> {
  const abort = new AbortController();
  const deadline = setTimeout(() => abort.abort(), config.healthTimeoutMs);
  const checkedAt = new Date().toISOString();
  const identity = {
    id: endpoint.id,
    nodeId: endpoint.nodeId,
    role: endpoint.role,
    baseUrl: endpoint.baseUrl,
    healthPath: endpoint.healthPath,
  };
  try {
    const upstream = await fetch(endpointUrl(endpoint, endpoint.healthPath), {
      method: "GET",
      headers: { accept: "application/json" },
      signal: abort.signal,
    });
    const contentType = upstream.headers.get("content-type") ?? "application/octet-stream";
    const rawBody = await upstream.text();
    let body: JsonValue;
    try {
      body = JSON.parse(rawBody) as JsonValue;
    } catch {
      // Non-JSON health endpoints are reported as a text preview.
      body = rawBody.slice(0, 2000);
    }
    const isHealthy = upstream.ok;
    return {
      ...identity,
      healthy: isHealthy,
      status: isHealthy ? "healthy" : "unhealthy",
      upstreamStatus: upstream.status,
      contentType,
      checkedAt,
      body,
    };
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    return {
      ...identity,
      healthy: false,
      status: "unhealthy",
      upstreamStatus: null,
      contentType: null,
      checkedAt,
      error: message,
    };
  } finally {
    clearTimeout(deadline);
  }
}
/**
 * Probes the active endpoint of a managed service, using the "active" probing
 * mode of probeKubernetesEndpoint.
 *
 * @param service Managed service whose active endpoint is probed.
 */
async function probeKubernetesServiceActive(service: ManagedService): Promise<JsonRecord> {
  return probeKubernetesEndpoint(service, activeEndpoint(service), true);
}
/**
 * Probes the health path of a managed service endpoint.
 *
 * - Non-active endpoints whose healthMode is "pod-ready" are delegated to
 *   probeKubernetesPodReady.
 * - The active endpoint is probed over the native service path first, then
 *   through the Kubernetes API service proxy when the native path yields null.
 * - Other endpoints are probed through the Kubernetes API endpoint proxy.
 *
 * @param service Managed service that owns the endpoint.
 * @param endpoint Endpoint to probe.
 * @param active Whether the endpoint is probed as the service's active instance.
 * @returns A health record; the body is parsed JSON when possible, otherwise
 *   a 2000-character text preview.
 */
async function probeKubernetesEndpoint(service: ManagedService, endpoint: ManagedEndpoint, active = false): Promise<JsonRecord> {
  const usePodReadiness = !active && endpoint.healthMode === "pod-ready";
  if (usePodReadiness) return await probeKubernetesPodReady(service, endpoint);

  const checkedAt = new Date().toISOString();
  const probeRequest = new Request("http://k3sctl-adapter.local/health", {
    method: "GET",
    headers: { accept: "application/json" },
  });
  const { healthPath } = endpoint;
  const timeoutMs = config.healthTimeoutMs;

  let upstream: Response;
  if (!active) {
    upstream = await kubeApiEndpointProxyResponse(service, endpoint, probeRequest, healthPath, "", timeoutMs);
  } else {
    const native = await nativeServiceProxyResponse(service, probeRequest.clone(), healthPath, "", timeoutMs);
    upstream = native ?? await kubeApiServiceProxyResponse(service, probeRequest, healthPath, "", timeoutMs);
  }

  const contentType = upstream.headers.get("content-type") ?? "application/octet-stream";
  const rawBody = await upstream.text();
  let body: JsonValue;
  try {
    body = JSON.parse(rawBody) as JsonValue;
  } catch {
    // The health endpoint may answer with plain text.
    body = rawBody.slice(0, 2000);
  }
  const isHealthy = upstream.ok;
  return {
    id: endpoint.id,
    nodeId: endpoint.nodeId,
    role: endpoint.role,
    baseUrl: endpoint.baseUrl,
    healthPath: endpoint.healthPath,
    healthMode: endpoint.healthMode,
    proxyMode: upstream.headers.get("x-unidesk-proxy-mode") ?? "kubernetes-api-service-proxy",
    route: service.route,
    healthy: isHealthy,
    status: isHealthy ? "healthy" : "unhealthy",
    upstreamStatus: upstream.status,
    contentType,
    checkedAt,
    body,
  };
}
/**
 * Reads a nested property using a dot-separated path.
 *
 * @param value Root value to walk.
 * @param path Dot-separated property names, e.g. "status.conditions".
 * @returns The value found at the path, or undefined as soon as a
 *   non-object (or null) is encountered along the way.
 */
function jsonAtPath(value: unknown, path: string): unknown {
  let cursor: unknown = value;
  for (const segment of path.split(".")) {
    if (typeof cursor !== "object" || cursor === null) return undefined;
    cursor = (cursor as Record<string, unknown>)[segment];
  }
  return cursor;
}
/**
 * Tells whether a pod object reports readiness.
 *
 * @param item Pod object (untyped JSON).
 * @returns true when status.conditions is an array containing an entry with
 *   type "Ready" and status "True"; false otherwise.
 */
function podReady(item: unknown): boolean {
  const conditions = jsonAtPath(item, "status.conditions");
  if (!Array.isArray(conditions)) return false;
  for (const entry of conditions) {
    // Non-object entries can never match.
    if (typeof entry !== "object" || entry === null) continue;
    const record = entry as Record<string, unknown>;
    if (record.type === "Ready" && record.status === "True") return true;
  }
  return false;
}
/**
 * Reduces a pod object to the fields reported in health output.
 *
 * @param item Pod object (untyped JSON).
 * @returns name, nodeName, phase and podIP (each an empty string when the
 *   source field is missing or not a string) plus the readiness flag.
 */
function podSummary(item: unknown): JsonRecord {
  const textAt = (path: string): string => {
    const found = jsonAtPath(item, path);
    return typeof found === "string" ? found : "";
  };
  return {
    name: textAt("metadata.name"),
    nodeName: textAt("spec.nodeName"),
    phase: textAt("status.phase"),
    podIP: textAt("status.podIP"),
    ready: podReady(item),
  };
}
/**
 * Determines endpoint health from pod readiness.
 *
 * Lists pods in the endpoint's namespace through the Kubernetes API, filtered
 * by the app.kubernetes.io/name and unidesk.ai/instance-id labels. The
 * endpoint is healthy when the list call succeeded and at least one listed
 * pod is ready.
 *
 * @param service Managed service that owns the endpoint.
 * @param endpoint Endpoint whose pods are inspected.
 * @returns A health record whose body is a pod summary, or a 2000-character
 *   text preview when the API response could not be interpreted.
 */
async function probeKubernetesPodReady(service: ManagedService, endpoint: ManagedEndpoint): Promise<JsonRecord> {
  const checkedAt = new Date().toISOString();
  const { namespace } = kubernetesEndpointServiceRef(service, endpoint);

  const selectorParams = new URLSearchParams();
  selectorParams.set("labelSelector", `app.kubernetes.io/name=${service.id},unidesk.ai/instance-id=${endpoint.id}`);
  const listRequest = new Request("http://k3sctl-adapter.local/api/pods", {
    method: "GET",
    headers: { accept: "application/json" },
  });
  const upstream = await kubeApiProxyResponse(
    service,
    listRequest,
    `/api/v1/namespaces/${encodeURIComponent(namespace)}/pods`,
    `?${selectorParams.toString()}`,
    config.healthTimeoutMs,
  );

  const contentType = upstream.headers.get("content-type") ?? "application/octet-stream";
  const rawBody = await upstream.text();
  let body: JsonValue = rawBody.slice(0, 2000);
  let pods: JsonRecord[] = [];
  try {
    const listing = JSON.parse(rawBody) as JsonRecord;
    const items = Array.isArray(listing.items) ? listing.items : [];
    const summaries = items.map((item) => podSummary(item));
    // Only commit once the whole listing was summarised successfully.
    pods = summaries;
    body = { itemCount: items.length, pods };
  } catch {
    // Leave the raw text preview in place.
  }

  const anyPodReady = pods.some((pod) => pod.ready === true);
  const healthy = upstream.ok && anyPodReady;
  return {
    id: endpoint.id,
    nodeId: endpoint.nodeId,
    role: endpoint.role,
    baseUrl: endpoint.baseUrl,
    healthPath: endpoint.healthPath,
    healthMode: endpoint.healthMode,
    proxyMode: "kubernetes-api-pod-readiness",
    route: service.route,
    healthy,
    status: healthy ? "healthy" : "unhealthy",
    upstreamStatus: upstream.status,
    contentType,
    checkedAt,
    body,
  };
}
/**
 * Computes the aggregated health status of a managed service.
 *
 * For kubernetes-service routes every endpoint is probed in parallel (the
 * active instance through the active probe). Any other route produces a
 * single synthetic "invalid-route" instance record.
 *
 * The service is healthy when the active instance is healthy and, if
 * requireAllInstancesHealthy is set, every instance is healthy and every
 * expected node has at least one healthy instance.
 *
 * @param service Managed service to evaluate.
 * @returns Status record including topology details and per-instance results.
 */
async function serviceStatus(service: ManagedService): Promise<JsonRecord> {
  let instances: JsonRecord[];
  if (isKubernetesServiceRoute(service)) {
    instances = await Promise.all(
      service.endpoints.map((endpoint) => {
        if (endpoint.id === service.activeInstanceId) return probeKubernetesServiceActive(service);
        return probeKubernetesEndpoint(service, endpoint);
      }),
    );
  } else {
    const current = activeEndpoint(service);
    instances = [{
      id: service.activeInstanceId,
      nodeId: current.nodeId,
      role: current.role,
      baseUrl: current.baseUrl,
      healthPath: current.healthPath,
      healthy: false,
      status: "invalid-route",
      upstreamStatus: null,
      contentType: null,
      checkedAt: new Date().toISOString(),
      error: "k3s managed service route must be kubernetes-service",
      noFallback: true,
    }];
  }

  const active = instances.find((instance) => instance.id === service.activeInstanceId) ?? null;
  const activeHealthy = active !== null && active.healthy === true;
  const allInstancesHealthy = instances.every((instance) => instance.healthy === true);

  // Nodes count as present only when they host a healthy instance.
  const expectedNodeIds = service.expectedNodeIds;
  const healthyNodes = new Set<string>();
  for (const instance of instances) {
    if (instance.healthy === true) healthyNodes.add(String(instance.nodeId));
  }
  const presentNodeIds = [...healthyNodes];
  const missingNodeIds = expectedNodeIds.filter((nodeId) => !healthyNodes.has(nodeId));
  const topologyComplete = missingNodeIds.length === 0;
  const topologyHealthy = topologyComplete && allInstancesHealthy;

  const requiredTopologyHealthy = service.requireAllInstancesHealthy ? topologyHealthy : true;
  const healthy = activeHealthy && requiredTopologyHealthy;

  let status: string;
  if (!healthy) {
    status = "unhealthy";
  } else if (topologyComplete) {
    status = "healthy";
  } else {
    status = "degraded";
  }

  return {
    id: service.id,
    namespace: service.namespace,
    kind: service.kind,
    controlPlane: service.controlPlane,
    route: service.route,
    activeInstanceId: service.activeInstanceId,
    singleWriter: service.singleWriter,
    expectedNodeIds,
    presentNodeIds,
    missingNodeIds,
    topologyComplete,
    topologyHealthy,
    servingHealthy: activeHealthy,
    healthy,
    status,
    active,
    instances,
  };
}
/**
 * Captures the cluster node list by running `kubectl get nodes -o json`.
 *
 * @returns `{ enabled: false }` when kubectl is disabled in the config;
 *   otherwise a record with `ok`, and either the node count plus the first
 *   20 node objects, or failure details (exit code and stderr, or a JSON
 *   parse error with a stdout preview).
 */
async function kubectlSnapshot(): Promise<JsonValue> {
  if (!config.kubectlEnabled) return { enabled: false };

  const contextArgs = config.kubectlContext.length > 0 ? ["--context", config.kubectlContext] : [];
  const command = ["kubectl", ...contextArgs, "get", "nodes", "-o", "json"];
  const child = Bun.spawn(command, { stdout: "pipe", stderr: "pipe" });
  const [stdout, stderr, exitCode] = await Promise.all([
    new Response(child.stdout).text(),
    new Response(child.stderr).text(),
    child.exited,
  ]);

  if (exitCode !== 0) {
    return { enabled: true, ok: false, exitCode, stderr: stderr.slice(0, 4000) };
  }
  try {
    const listing = JSON.parse(stdout) as JsonRecord;
    const nodes = Array.isArray(listing.items) ? listing.items : [];
    return { enabled: true, ok: true, nodeCount: nodes.length, nodes: nodes.slice(0, 20) as JsonValue };
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    return { enabled: true, ok: false, error: message, stdout: stdout.slice(0, 4000) };
  }
}
/**
 * Builds the full control-plane snapshot.
 *
 * Combines the status of every managed service, the current native service
 * proxy state (endpoint cache, tunnels, remembered failures), the Kubernetes
 * API proxy configuration and the kubectl node snapshot.
 *
 * @returns The snapshot record; `ok` is always true, overall service health
 *   is reported in `managedServicesHealthy`.
 */
async function controlPlaneSnapshot(): Promise<JsonRecord> {
  const services = await Promise.all(config.services.map(serviceStatus));
  const managedServicesHealthy = services.every((service) => service.healthy === true);

  const runtimePath = config.nativeServiceProxyEnabled
    ? "frontend -> backend-core -> provider websocket HTTP tunnel -> k3sctl-adapter -> kubernetes native service/DNS -> k3s service"
    : "frontend -> backend-core -> k3sctl-adapter -> kubernetes api service proxy -> k3s service";

  const endpointCache: JsonValue = Array.from(nativeServiceEndpointCache.entries()).map(([key, item]) => ({
    key,
    serviceName: item.endpoint.serviceName,
    servicePort: item.endpoint.servicePort,
    clusterIP: item.endpoint.clusterIP,
    expiresInMs: Math.max(0, item.expiresAt - Date.now()),
  }));
  const tunnels: JsonValue = Array.from(nativeServiceTunnels.entries()).map(([key, tunnel]) => ({
    key,
    localPort: tunnel.localPort,
    serviceName: tunnel.endpoint.serviceName,
    servicePort: tunnel.endpoint.servicePort,
    clusterIP: tunnel.endpoint.clusterIP,
    closed: tunnel.closed,
    ageMs: Math.max(0, Date.now() - tunnel.startedAt),
  }));
  const failedServices: JsonValue = Array.from(nativeServiceFailures.entries()).map(([key, failedAt]) => ({
    key,
    failedAt: new Date(failedAt).toISOString(),
    retryAfterMs: Math.max(0, config.nativeServiceFailureCooldownMs - (Date.now() - failedAt)),
  }));

  const snapshot: JsonRecord = {
    ok: true,
    service: "k3sctl-adapter",
    clusterId: config.clusterId,
    nodeId: config.nodeId,
    startedAt,
    manifestPaths: config.manifestPaths,
    managedServicesHealthy,
    noFallback: true,
    runtimePath,
    nativeServiceProxy: {
      enabled: config.nativeServiceProxyEnabled,
      mode: "kubernetes-native-service",
      sshTunnelEnabled: config.nativeServiceSshTunnelEnabled,
      resolutionTtlMs: config.nativeServiceResolutionTtlMs,
      tunnelConnectTimeoutMs: config.nativeServiceTunnelConnectTimeoutMs,
      endpointCache,
      tunnels,
      failureCooldownMs: config.nativeServiceFailureCooldownMs,
      failedServices,
    },
    kubeApiProxy: {
      enabled: config.kubeApiProxyEnabled,
      configured: kubeClient !== null,
      kubeconfigPath: config.kubeconfigPath,
      connectHost: config.kubeApiConnectHost,
      serverHost: kubeClient?.serverUrl.hostname ?? null,
      mode: "kubernetes-api-service-proxy",
    },
    services,
  };
  // The kubectl snapshot is gathered last and appended as the final key.
  snapshot.kubectl = await kubectlSnapshot();
  return snapshot;
}
/**
 * Runs the diagnostics checks for one managed service.
 *
 * Three checks are performed sequentially against the active endpoint's
 * health path, with the health timeout clamped to 500..5000 ms:
 *   1. a probe through the Kubernetes API service proxy;
 *   2. a probe of the target service (native path first, API proxy as fallback);
 *   3. the aggregated managed service status.
 *
 * Failures inside checks 2 and 3 are captured as unhealthy records rather
 * than thrown.
 *
 * @param service Managed service to diagnose.
 * @returns Diagnostics record; `ok` requires both the API service proxy
 *   probe and the target service probe to succeed.
 */
async function serviceDiagnostics(service: ManagedService): Promise<JsonRecord> {
  const checkedAt = new Date().toISOString();
  const healthPath = activeEndpoint(service).healthPath;
  const healthTimeoutMs = Math.max(500, Math.min(config.healthTimeoutMs, 5000));

  // Check 1: Kubernetes API service proxy.
  const kubernetesApiServiceProxy = await kubeApiServiceProxyProbe(service, healthPath, healthTimeoutMs);

  // Check 2: target service, native path preferred.
  const targetServiceStartedAt = Date.now();
  let targetService: JsonRecord;
  try {
    const probeRequest = new Request("http://k3sctl-adapter.local/diagnostics", {
      method: "GET",
      headers: { accept: "application/json" },
    });
    const native = await nativeServiceProxyResponse(service, probeRequest.clone(), healthPath, "", healthTimeoutMs);
    const upstream = native ?? await kubeApiServiceProxyResponse(service, probeRequest, healthPath, "", healthTimeoutMs);
    const upstreamText = await upstream.text();
    targetService = responseProbeRecord(upstream, upstreamText, targetServiceStartedAt);
  } catch (error) {
    targetService = {
      ok: false,
      status: "unhealthy",
      upstreamStatus: null,
      proxyMode: "k3sctl-service-health",
      durationMs: Date.now() - targetServiceStartedAt,
      error: errorToJson(error),
    };
  }

  // Check 3: aggregated managed service status.
  let managedService: JsonRecord;
  try {
    const status = await serviceStatus(service);
    managedService = {
      ok: status.healthy === true,
      status: String(status.status ?? ""),
      servingHealthy: status.servingHealthy === true,
      topologyComplete: status.topologyComplete === true,
      activeInstanceId: String(status.activeInstanceId ?? ""),
      active: status.active ?? null,
      missingNodeIds: Array.isArray(status.missingNodeIds) ? status.missingNodeIds as JsonValue : [],
    };
  } catch (error) {
    managedService = {
      ok: false,
      status: "unhealthy",
      error: errorToJson(error),
    };
  }

  const checks = {
    k3sctlAdapter: {
      ok: true,
      nodeId: config.nodeId,
      clusterId: config.clusterId,
      startedAt,
    },
    kubernetesApiServiceProxy: {
      ...kubernetesApiServiceProxy,
      configured: kubeClient !== null,
      kubeconfigPath: config.kubeconfigPath,
      connectHost: config.kubeApiConnectHost,
    },
    targetService,
    managedService,
  } satisfies Record<string, JsonValue>;

  const ok = kubernetesApiServiceProxy.ok === true && targetService.ok === true;
  return {
    ok,
    service: "k3sctl-adapter",
    serviceId: service.id,
    namespace: service.namespace,
    checkedAt,
    healthPath,
    route: service.route,
    noFallback: true,
    checks,
  };
}
/**
 * Selects the request headers that are relayed upstream.
 *
 * Only accept, content-type and x-requested-with are copied; everything else
 * is dropped.
 *
 * @param request Incoming request.
 * @returns A new Headers object holding the copied headers that were present.
 */
function forwardHeaders(request: Request): Headers {
  const forwarded = new Headers();
  ["accept", "content-type", "x-requested-with"].forEach((headerName) => {
    const headerValue = request.headers.get(headerName);
    if (headerValue === null) return;
    forwarded.set(headerName, headerValue);
  });
  return forwarded;
}
/**
 * Proxies a client request to a managed service.
 *
 * Services whose route is not a kubernetes-service route are rejected with a
 * 500 JSON response. Otherwise the native service path is tried first and the
 * Kubernetes API service proxy is used when the native path yields null.
 *
 * @param service Managed service to reach.
 * @param req Incoming client request.
 * @param targetPath Path on the target service.
 * @param query Query string to forward.
 */
async function proxyToService(service: ManagedService, req: Request, targetPath: string, query: string): Promise<Response> {
  if (!isKubernetesServiceRoute(service)) {
    log("error", "k3sctl_route_not_kubernetes_service", { serviceId: service.id, route: service.route, noFallback: true });
    return jsonResponse({ ok: false, error: "k3s managed service route must be kubernetes-service", serviceId: service.id, route: service.route, noFallback: true }, 500);
  }
  // A clone goes to the native path so the original request stays usable for the fallback.
  const native = await nativeServiceProxyResponse(service, req.clone(), targetPath, query, config.requestTimeoutMs);
  if (native === null) {
    return kubeApiServiceProxyResponse(service, req, targetPath, query, config.requestTimeoutMs);
  }
  return native;
}
/**
 * Resolves the service id path segment of a request to a managed service.
 *
 * @param segment Percent-encoded service id captured from the URL path.
 * @returns The managed service, or a ready-to-send error response: 400 when
 *   the segment is not valid percent-encoding, 404 when no such service exists.
 */
function resolveManagedServiceSegment(segment: string | undefined): ManagedService | Response {
  let serviceId: string;
  try {
    serviceId = decodeURIComponent(segment ?? "");
  } catch {
    // decodeURIComponent throws URIError on malformed escapes such as "%zz";
    // that is a client error, not an internal failure.
    return jsonResponse({ ok: false, error: "invalid service id encoding" }, 400);
  }
  return serviceById(serviceId) ?? jsonResponse({ ok: false, error: "managed service not found" }, 404);
}

/**
 * HTTP entry point: dispatches a request to the matching handler.
 *
 * Routes:
 *   - OPTIONS (any path): `{ ok: true }`
 *   - `/` and `/health`: adapter health summary
 *   - GET `/logs`: the last 100 recent log entries
 *   - GET `/api/services`: status of every managed service
 *   - GET `/api/control-plane`: full control-plane snapshot
 *   - GET/HEAD `/api/services/:id/health`: service status (200 or 503)
 *   - GET/HEAD `/api/services/:id/diagnostics`: service diagnostics (200 or 503)
 *   - any method `/api/services/:id/proxy/...`: proxied to the service
 *
 * Anything else yields 404. Errors thrown by a handler are logged and
 * reported as a 500 JSON response.
 *
 * @param req Incoming request.
 */
async function route(req: Request): Promise<Response> {
  const url = new URL(req.url);
  if (req.method === "OPTIONS") return jsonResponse({ ok: true });
  try {
    if (url.pathname === "/" || url.pathname === "/health") {
      // NOTE(review): controlPlaneSnapshot also gathers the kubectl snapshot,
      // although only managedServicesHealthy is used here.
      const snapshot = await controlPlaneSnapshot();
      return jsonResponse({
        ok: true,
        service: "k3sctl-adapter",
        clusterId: config.clusterId,
        nodeId: config.nodeId,
        startedAt,
        managedServiceCount: config.services.length,
        managedServicesHealthy: snapshot.managedServicesHealthy,
        kubeApiProxyConfigured: kubeClient !== null,
        noFallback: true,
      });
    }
    if (url.pathname === "/logs" && req.method === "GET") return jsonResponse({ ok: true, logs: recentLogs.slice(-100) });
    if (url.pathname === "/api/services" && req.method === "GET") {
      return jsonResponse({ ok: true, clusterId: config.clusterId, services: await Promise.all(config.services.map(serviceStatus)) });
    }
    if (url.pathname === "/api/control-plane" && req.method === "GET") return jsonResponse(await controlPlaneSnapshot());

    const isReadMethod = req.method === "GET" || req.method === "HEAD";

    const healthMatch = url.pathname.match(/^\/api\/services\/([^/]+)\/health$/u);
    if (healthMatch !== null && isReadMethod) {
      const service = resolveManagedServiceSegment(healthMatch[1]);
      if (service instanceof Response) return service;
      const status = await serviceStatus(service);
      const statusCode = status.healthy === true ? 200 : 503;
      return req.method === "HEAD"
        ? new Response(null, { status: statusCode })
        : jsonResponse({ ok: status.healthy === true, managedService: status }, statusCode);
    }

    const diagnosticsMatch = url.pathname.match(/^\/api\/services\/([^/]+)\/diagnostics$/u);
    if (diagnosticsMatch !== null && isReadMethod) {
      const service = resolveManagedServiceSegment(diagnosticsMatch[1]);
      if (service instanceof Response) return service;
      const diagnostics = await serviceDiagnostics(service);
      const statusCode = diagnostics.ok === true ? 200 : 503;
      return req.method === "HEAD"
        ? new Response(null, { status: statusCode })
        : jsonResponse(diagnostics, statusCode);
    }

    const proxyMatch = url.pathname.match(/^\/api\/services\/([^/]+)\/proxy(\/.*)$/u);
    if (proxyMatch !== null) {
      const service = resolveManagedServiceSegment(proxyMatch[1]);
      if (service instanceof Response) return service;
      const targetPath = proxyMatch[2] ?? "/";
      return await proxyToService(service, req, targetPath, url.search);
    }

    return jsonResponse({ ok: false, error: "not found" }, 404);
  } catch (error) {
    log("error", "request_failed", { path: url.pathname, error: errorToJson(error) });
    return jsonResponse({ ok: false, error: error instanceof Error ? error.message : String(error) }, 500);
  }
}
// Start the HTTP listener with `route` as the request handler, then log the startup.
Bun.serve({
  hostname: config.host,
  port: config.port,
  idleTimeout: 120,
  fetch: route,
});
log("info", "service_started", {
  port: config.port,
  clusterId: config.clusterId,
  nodeId: config.nodeId,
  managedServiceCount: config.services.length,
});