diff --git a/project-management/PJ2026-01/specs/PJ2026-010605-observability-monitoring.md b/project-management/PJ2026-01/specs/PJ2026-010605-observability-monitoring.md index 7adab3d5..fb8dcf51 100644 --- a/project-management/PJ2026-01/specs/PJ2026-010605-observability-monitoring.md +++ b/project-management/PJ2026-01/specs/PJ2026-010605-observability-monitoring.md @@ -289,6 +289,10 @@ Web 哨兵的首条生产 canary 应覆盖 `workbench-dsflash-go-tool-call-10x` provider egress tunnel 生命周期至少应记录:provider proxy 收到 client 请求、发出 `egress_tcp_open`、core 确认 opened、core connect failed/timeout、remote close、本地 open timeout、client close、pending buffer 超限和 idle close。日志和受控摘要必须包含 `providerId`、`connectionId`、目标 `host`/`port`、`method`、`ageMs`、`pendingBytes` 和低基数 failureKind;不得记录 TCP payload、PostgreSQL DSN 密码、HTTP Authorization、Secret、token 或完整业务正文。 +针对 GitHub release、k3s installer、dependency pull 等大二进制下载,egress tunnel 观测与控制面观测必须分离:provider labels 至少分别暴露 `host.ssh.tcp-pool` readiness 与 egress active/pending/opened/stale tunnel 数,CLI 必须提供独立的 egress drill-down。egress drill-down 只能展示目标 host/port、路径摘要和 query key 摘要,不得展示 URL credential、header、token 或 query value;同时必须保留最近关闭 tunnel 的有界生命周期摘要,用于确认中断下载后的自动回收。 + +backend-core 向 provider WebSocket 写入大流量 `egress_tcp_data` 时,必须避免单个下载流无界挤占 control/dispatch 消息;实现可以使用独立通道、优先级队列、节流/yield 或等价 backpressure 策略,但验收必须证明大文件下载期间 `trans argv true` 仍可在可接受时间内返回。下载中断、provider 断开或发送通道关闭时,core 侧必须有有界时间内的 read/write loop 退出日志与 active connection drain 证据。 + `server start|stop|rebuild|restart` 等创建 `.state/jobs` 的命令默认必须返回 async job 摘要,而不是完整 JSON payload、完整 command、完整 runtime env 或 stdout/stderr。摘要至少包含 job id、目标 service、当前状态、stdout/stderr 路径、`job status` 查询命令和必要的 `server logs`/`server status` 后续命令。完整结构只能通过 `--full`/`--raw` 或 `job status ` 渐进展开。`job` 与常见自然输入 `jobs`、`status|get|read` 应指向同一受控状态入口,避免调用者因为别名缺失退回手工文件读取。 #### 6.9.1 目标架构图 diff --git a/scripts/cli.ts b/scripts/cli.ts index d56cc853..242d3a56 100644 --- a/scripts/cli.ts +++ b/scripts/cli.ts @@ -1,7 +1,7 @@ // SPEC: 
PJ2026-01060509 出站诊断 draft-2026-06-26-p8-egress-job-friction. // UniDesk CLI dispatcher with bounded server lifecycle and job drill-down output. import { readConfig } from "./src/config"; -import { debugDispatch, debugHealth, debugSshPool, debugTask, isDebugDispatchCommand, type DebugDispatchCommand } from "./src/debug"; +import { debugDispatch, debugEgressProxy, debugHealth, debugSshPool, debugTask, isDebugDispatchCommand, type DebugDispatchCommand } from "./src/debug"; import { isRebuildableService, isRestartableService, rebuildService, restartService, stackLogs, stackStatus, startStack, stopStack, unsupportedRebuildService, unsupportedRestartService } from "./src/docker"; import { emitError, emitJson, emitText, isRenderedCliResult } from "./src/output"; import { cancelJob, jobWithTail, listJobs, listJobsSummary, readJob, renderJobLaunchSummary, renderJobStatusSummary, runJob } from "./src/jobs"; @@ -650,6 +650,15 @@ async function main(): Promise { if (!ok) process.exitCode = 1; return; } + if (sub === "egress-proxy") { + const providerId = third ?? ""; + if (providerId.length === 0) throw new Error("debug egress-proxy requires providerId"); + const result = await debugEgressProxy(config, providerId); + const ok = (result as { ok?: unknown }).ok !== false; + emitJson(commandName, result, ok); + if (!ok) process.exitCode = 1; + return; + } if (sub === "dispatch") { const providerId = isDebugDispatchCommand(third) ? config.providerGateway.id : third ?? config.providerGateway.id; const commandArg = isDebugDispatchCommand(third) ? 
/**
 * Build the `debug egress-proxy <providerId>` drill-down from the provider's
 * node labels as reported by internal core (`/api/nodes`).
 *
 * The result keeps egress tunnel activity separate from SSH data-pool
 * readiness. Tunnel detail arrays are passed through exactly as the provider
 * published them in its labels; this function does no redaction of its own.
 *
 * @param _config   Unused; kept for signature parity with the other debug commands.
 * @param providerId Provider whose labels should be inspected.
 * @returns A JSON-serialisable summary. `ok` is false when the provider is not
 *          found, the egress proxy is disabled or disconnected, or any tunnel
 *          is reported stale.
 */
export async function debugEgressProxy(_config: UniDeskConfig, providerId: string): Promise<unknown> {
  const nodesResponse = await coreInternalFetch("/api/nodes");
  const body = recordValue(recordValue(nodesResponse).body);
  const node = arrayValue(body.nodes)
    .map((item) => recordValue(item))
    .find((item) => item.providerId === providerId) ?? null;
  if (node === null) {
    // The raw fetch result is included so the caller can tell "unknown provider"
    // apart from "the nodes endpoint itself failed".
    return {
      ok: false,
      providerId,
      degradedReason: "provider-not-found",
      nodesFetch: nodesResponse,
      next: { fullHealth: "bun scripts/cli.ts debug health" },
    };
  }

  const labels = recordValue(node.labels);

  // Normalise every numeric label exactly once. Previously `ok` and
  // `classification` were derived from the raw Number(...) result while the
  // emitted counts were finite-normalised, so a non-numeric label produced
  // `ok: false` together with `staleTunnels: 0` and "egress-proxy-ready".
  const finiteCount = (value: unknown): number => {
    const parsed = Number(value ?? 0);
    return Number.isFinite(parsed) ? parsed : 0;
  };
  const active = finiteCount(labels.providerGatewayEgressProxyActiveTunnels);
  const pending = finiteCount(labels.providerGatewayEgressProxyPendingTunnels);
  const opened = finiteCount(labels.providerGatewayEgressProxyOpenedTunnels);
  const stale = finiteCount(labels.providerGatewayEgressProxyStaleTunnels);
  const sshReady = finiteCount(labels.providerGatewaySshDataPoolReady);

  const enabled = labels.providerGatewayEgressProxy === true;
  const connected = labels.providerGatewayEgressProxyConnected === true;
  const activeTunnelDetails = arrayValue(labels.providerGatewayEgressProxyActiveTunnelDetails);
  const recentClosedTunnels = arrayValue(labels.providerGatewayEgressProxyRecentClosedTunnels);
  const ok = enabled && connected && stale === 0;

  // Most severe condition wins: disabled > disconnected > stale > active > ready.
  let classification: string;
  if (!enabled) classification = "egress-proxy-disabled";
  else if (!connected) classification = "egress-proxy-core-channel-disconnected";
  else if (stale > 0) classification = "egress-tunnel-stale";
  else if (active > 0) classification = "egress-tunnel-active";
  else classification = "egress-proxy-ready";

  return {
    ok,
    providerId,
    node: {
      providerId: node.providerId,
      name: node.name,
      status: node.status,
      lastHeartbeat: node.lastHeartbeat ?? null,
      updatedAt: node.updatedAt ?? null,
    },
    egressProxy: {
      enabled,
      connected,
      port: labels.providerGatewayEgressProxyPort ?? null,
      activeTunnels: active,
      pendingTunnels: pending,
      openedTunnels: opened,
      staleTunnels: stale,
      oldestTunnelAgeMs: labels.providerGatewayEgressProxyOldestTunnelAgeMs ?? null,
      policy: {
        openTimeoutMs: labels.providerGatewayEgressProxyOpenTimeoutMs ?? null,
        idleTimeoutMs: labels.providerGatewayEgressProxyIdleTimeoutMs ?? null,
        maxTunnelAgeMs: labels.providerGatewayEgressProxyMaxTunnelAgeMs ?? null,
        staleTunnelIdleMs: labels.providerGatewayEgressProxyStaleTunnelIdleMs ?? null,
        maxPendingBytes: labels.providerGatewayEgressProxyMaxPendingBytes ?? null,
      },
      activeTunnelDetails,
      activeTunnelDetailsCount: activeTunnelDetails.length,
      recentClosedTunnels,
      recentClosedTunnelCount: recentClosedTunnels.length,
    },
    controlPlaneSeparation: {
      sshDataTransport: stringValue(labels.providerGatewaySshDataTransport),
      sshDataPoolReady: sshReady,
      sshDataPoolDesired: labels.providerGatewaySshDataPoolDesired ?? null,
      sshDataPoolClaimed: labels.providerGatewaySshDataPoolClaimed ?? null,
      note: "SSH/control readiness is reported separately from egress download tunnel activity.",
    },
    classification,
    outputPolicy: {
      redaction: "activeTunnelDetails expose target host/port and path/query-key summary only; URL credentials, headers, tokens and query values are not emitted.",
      boundedRecentClosedTunnels: true,
    },
    next: {
      smoke: `trans ${providerId} argv true`,
      fullHealth: "bun scripts/cli.ts debug health",
    },
  };
}
/**
 * Help payload for the `debug` command group.
 *
 * Usage lines name the required positional argument explicitly: the CLI
 * rejects `debug egress-proxy` without a providerId, so a usage string that
 * ends in a bare trailing space is misleading.
 */
function debugHelp(): unknown {
  return {
    command: "debug health|ssh-pool|egress-proxy|dispatch|task",
    output: "json",
    usage: [
      "bun scripts/cli.ts debug health",
      "bun scripts/cli.ts debug ssh-pool <providerId>",
      "bun scripts/cli.ts debug egress-proxy <providerId>",
      "bun scripts/cli.ts debug dispatch [providerId] [docker.ps|provider.upgrade|host.ssh|microservice.http|echo] [--wait-ms N]",
      "bun scripts/cli.ts debug task <taskId>",
    ],
    description: "Debug the real core/provider/dispatch paths. ssh-pool and egress-proxy are separate so SSH/control readiness is not conflated with long download tunnel activity; do not use debug commands as formal TEST.md acceptance steps.",
  };
}
null, + staleTunnels: labels.providerGatewayEgressProxyStaleTunnels ?? null, + oldestTunnelAgeMs: labels.providerGatewayEgressProxyOldestTunnelAgeMs ?? null, + }, capabilities, }); } diff --git a/src/components/backend-core/src/egress_tcp.rs b/src/components/backend-core/src/egress_tcp.rs index b0fd41f6..9605b88e 100644 --- a/src/components/backend-core/src/egress_tcp.rs +++ b/src/components/backend-core/src/egress_tcp.rs @@ -1,7 +1,7 @@ // SPEC: PJ2026-01060509 出站诊断 draft-2026-06-26-p8-egress-job-friction. // Backend-core side of provider egress TCP bridge with stage logs. use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use anyhow::bail; use axum::extract::ws::Message; @@ -16,9 +16,16 @@ use crate::state::{AppState, ProviderConnection}; pub struct EgressTcpConnection { pub provider_id: String, pub connection_id: String, + pub target_host: String, + pub target_port: i64, + pub created_at: Instant, pub writer: mpsc::UnboundedSender>, } +const EGRESS_TCP_READ_CHUNK_BYTES: usize = 16 * 1024; +const EGRESS_TCP_COOPERATIVE_YIELD_BYTES: u64 = 128 * 1024; +const EGRESS_TCP_COOPERATIVE_YIELD_DELAY_MS: u64 = 1; + fn egress_tcp_key(provider_id: &str, connection_id: &str) -> String { format!("{provider_id}:{connection_id}") } @@ -178,11 +185,14 @@ pub async fn handle_egress_tcp_open( EgressTcpConnection { provider_id: provider_id.clone(), connection_id: connection_id.clone(), + target_host: host.clone(), + target_port: port, + created_at: Instant::now(), writer: tx, }, ); let _ = provider.sender.send(Message::Text( - json!({ "type": "egress_tcp_opened", "connectionId": connection_id }).to_string(), + json!({ "type": "egress_tcp_opened", "connectionId": connection_id.as_str() }).to_string(), )); state.log( "info", @@ -200,15 +210,51 @@ pub async fn handle_egress_tcp_open( let provider = provider.clone(); let provider_id = provider_id.clone(); let connection_id = connection_id.clone(); + let host = host.clone(); async move { - let mut buffer = 
vec![0_u8; 16 * 1024]; + let mut buffer = vec![0_u8; EGRESS_TCP_READ_CHUNK_BYTES]; + let mut bytes_to_provider: u64 = 0; + let mut bytes_since_yield: u64 = 0; + let mut chunks_to_provider: u64 = 0; loop { match reader.read(&mut buffer).await { Ok(0) => break, Ok(size) => { let data = base64::engine::general_purpose::STANDARD.encode(&buffer[..size]); - let _ = provider.sender.send(Message::Text(json!({ "type": "egress_tcp_data", "connectionId": connection_id, "data": data, "encoding": "base64" }).to_string())); + let sent = provider.sender.send(Message::Text( + json!({ + "type": "egress_tcp_data", + "connectionId": connection_id.as_str(), + "data": data, + "encoding": "base64" + }) + .to_string(), + )); + if sent.is_err() { + state.log( + "warn", + "egress_tcp_provider_channel_closed", + Some(json!({ + "providerId": provider_id.as_str(), + "connectionId": connection_id.as_str(), + "targetHost": host.as_str(), + "targetPort": port, + "failureKind": "provider-channel-closed", + "bytesToProvider": bytes_to_provider, + "chunksToProvider": chunks_to_provider + })), + ); + break; + } + bytes_to_provider += size as u64; + bytes_since_yield += size as u64; + chunks_to_provider += 1; + if bytes_since_yield >= EGRESS_TCP_COOPERATIVE_YIELD_BYTES { + bytes_since_yield = 0; + tokio::task::yield_now().await; + tokio::time::sleep(Duration::from_millis(EGRESS_TCP_COOPERATIVE_YIELD_DELAY_MS)).await; + } } Err(error) => { let error_text = error.to_string(); @@ -232,6 +278,19 @@ pub async fn handle_egress_tcp_open( .lock() .await .remove(&egress_tcp_key(&provider_id, &connection_id)); + state.log( + "info", + "egress_tcp_read_loop_closed", + Some(json!({ + "providerId": provider_id.as_str(), + "connectionId": connection_id.as_str(), + "targetHost": host.as_str(), + "targetPort": port, + "bytesToProvider": bytes_to_provider, + "chunksToProvider": chunks_to_provider, + "yieldEveryBytes": EGRESS_TCP_COOPERATIVE_YIELD_BYTES + })), + ); send_egress_close(&provider, &connection_id, None); } 
}); @@ -239,8 +298,13 @@ pub async fn handle_egress_tcp_open( let state = state.clone(); let provider_id = provider_id.clone(); let connection_id = connection_id.clone(); + let host = host.clone(); async move { + let mut bytes_from_provider: u64 = 0; + let mut chunks_from_provider: u64 = 0; while let Some(data) = rx.recv().await { + bytes_from_provider += data.len() as u64; + chunks_from_provider += 1; if let Err(error) = writer.write_all(&data).await { state.log( "warn", @@ -260,6 +324,18 @@ pub async fn handle_egress_tcp_open( .lock() .await .remove(&egress_tcp_key(&provider_id, &connection_id)); + state.log( + "info", + "egress_tcp_write_loop_closed", + Some(json!({ + "providerId": provider_id.as_str(), + "connectionId": connection_id.as_str(), + "targetHost": host.as_str(), + "targetPort": port, + "bytesFromProvider": bytes_from_provider, + "chunksFromProvider": chunks_from_provider + })), + ); } }); Ok(()) @@ -313,12 +389,12 @@ pub async fn close_egress_tcp_connection( connection_id: &str, error: Option<&str>, ) { - let removed = state + let removed_connection = state .active_egress_tcp .lock() .await - .remove(&egress_tcp_key(provider_id, connection_id)) - .is_some(); + .remove(&egress_tcp_key(provider_id, connection_id)); + let removed = removed_connection.is_some(); if removed || error.is_some() { state.log( if error.is_some() { "warn" } else { "info" }, @@ -327,6 +403,9 @@ pub async fn close_egress_tcp_connection( "providerId": provider_id, "connectionId": connection_id, "removed": removed, + "targetHost": removed_connection.as_ref().map(|connection| connection.target_host.as_str()), + "targetPort": removed_connection.as_ref().map(|connection| connection.target_port), + "ageMs": removed_connection.as_ref().map(|connection| connection.created_at.elapsed().as_millis() as u64), "failureKind": error.map(egress_failure_kind).unwrap_or("normal-close"), "error": error.unwrap_or("") })), @@ -336,5 +415,18 @@ pub async fn close_egress_tcp_connection( pub async fn 
close_egress_tcp_connections_for_provider(state: &Arc, provider_id: &str) { let mut connections = state.active_egress_tcp.lock().await; + let before = connections.len(); connections.retain(|_, connection| connection.provider_id != provider_id); + let removed = before.saturating_sub(connections.len()); + if removed > 0 { + state.log( + "warn", + "egress_tcp_provider_connections_drained", + Some(json!({ + "providerId": provider_id, + "removed": removed, + "failureKind": "provider-disconnected" + })), + ); + } } diff --git a/src/components/provider-gateway/package.json b/src/components/provider-gateway/package.json index 84ad108c..bafc518c 100644 --- a/src/components/provider-gateway/package.json +++ b/src/components/provider-gateway/package.json @@ -1,6 +1,6 @@ { "name": "@unidesk/provider-gateway", - "version": "0.2.30", + "version": "0.2.31", "private": true, "type": "module", "scripts": { diff --git a/src/components/provider-gateway/src/egress-proxy.ts b/src/components/provider-gateway/src/egress-proxy.ts index 209d2b67..4ea7b22b 100644 --- a/src/components/provider-gateway/src/egress-proxy.ts +++ b/src/components/provider-gateway/src/egress-proxy.ts @@ -20,15 +20,22 @@ interface ProviderEgressProxyOptions { providerId: string; listenHost: string; listenPort: number; + openTimeoutMs: number; + idleTimeoutMs: number; + maxTunnelAgeMs: number; + staleTunnelIdleMs: number; + maxPendingBytes: number; sendToCore: (message: EgressToCoreMessage) => boolean; isCoreConnected: () => boolean; logger: Logger; + onStatusChanged?: () => void; } interface Tunnel { id: string; client: Socket; method: string; + targetPathSummary: string | null; opened: boolean; pending: Buffer[]; pendingBytes: number; @@ -37,8 +44,28 @@ interface Tunnel { lastActivityAt: number; targetHost: string; targetPort: number; + bytesFromClient: number; + bytesToClient: number; openTimer: ReturnType | null; idleTimer: ReturnType | null; + maxAgeTimer: ReturnType | null; +} + +interface ClosedTunnelSummary 
/**
 * Produce a bounded, value-free summary of an HTTP proxy target for logs and
 * status labels: the pathname followed by the sorted, de-duplicated query
 * parameter *names*. Query values, URL credentials and the fragment are never
 * part of the result.
 *
 * @param url           Parsed absolute request target.
 * @param maxQueryKeys  Maximum number of query keys listed before ",..." is appended.
 * @param maxPathChars  Maximum pathname length kept before "..." is appended.
 * @param maxKeyChars   Maximum length kept per query key before "..." is appended.
 * @returns e.g. "/releases/download?sig,ts" or "/" for an empty path.
 */
function summarizeHttpTargetPath(url: URL, maxQueryKeys = 8, maxPathChars = 256, maxKeyChars = 64): string {
  // The summary is attached to every heartbeat and close event, so very long
  // paths or key names are clipped rather than copied verbatim.
  const clip = (text: string, limit: number): string => (text.length > limit ? `${text.slice(0, limit)}...` : text);
  const path = clip(url.pathname || "/", maxPathChars);
  const queryKeys = [...new Set(url.searchParams.keys())].filter((key) => key.length > 0).sort();
  if (queryKeys.length === 0) return path;
  const listed = queryKeys
    .slice(0, maxQueryKeys)
    .map((key) => clip(key, maxKeyChars))
    .join(",");
  const overflow = queryKeys.length > maxQueryKeys ? ",..." : "";
  return `${path}?${listed}${overflow}`;
}
/**
 * Classify a tunnel for status reporting.
 *
 * A tunnel that has not been opened yet is "pending". An opened tunnel is
 * "stale" once the time since its last recorded activity reaches
 * `staleTunnelIdleMs`, and "open" otherwise.
 *
 * @param tunnel            Tunnel to classify; only `opened` and `lastActivityAt` are read.
 * @param now               Current time in epoch milliseconds.
 * @param staleTunnelIdleMs Idle duration at or above which an opened tunnel counts as stale.
 */
function tunnelState(tunnel: Tunnel, now: number, staleTunnelIdleMs: number): "pending" | "open" | "stale" {
  if (tunnel.opened) {
    const idleForMs = now - tunnel.lastActivityAt;
    if (idleForMs >= staleTunnelIdleMs) {
      return "stale";
    }
    return "open";
  }
  return "pending";
}
Math.max(...ages) : 0, - openTimeoutMs: tunnelOpenTimeoutMs, - idleTimeoutMs: tunnelIdleTimeoutMs, - maxPendingBytes, + openTimeoutMs: options.openTimeoutMs, + idleTimeoutMs: options.idleTimeoutMs, + maxTunnelAgeMs: options.maxTunnelAgeMs, + staleTunnelIdleMs: options.staleTunnelIdleMs, + maxPendingBytes: options.maxPendingBytes, + activeTunnelDetails: details, + activeTunnelDetailLimit: 12, + activeTunnelDetailsTruncated: tunnelList.length > details.length, + recentClosedTunnels, channel: "provider-gateway", }; }; @@ -167,31 +232,59 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P const clearTunnelTimers = (tunnel: Tunnel): void => { if (tunnel.openTimer !== null) clearTimeout(tunnel.openTimer); if (tunnel.idleTimer !== null) clearTimeout(tunnel.idleTimer); + if (tunnel.maxAgeTimer !== null) clearTimeout(tunnel.maxAgeTimer); tunnel.openTimer = null; tunnel.idleTimer = null; + tunnel.maxAgeTimer = null; }; const destroyTunnel = (tunnel: Tunnel, notifyCore: boolean, error?: string): void => { tunnels.delete(tunnel.id); tunnel.closed = true; + const closedAt = nowIso(); + const ageMs = Date.now() - tunnel.createdAt; + const idleMs = Date.now() - tunnel.lastActivityAt; + const pendingBytes = tunnel.pendingBytes; clearTunnelTimers(tunnel); tunnel.pending.splice(0); tunnel.pendingBytes = 0; if (!tunnel.client.destroyed) tunnel.client.destroy(); if (notifyCore) send({ type: "egress_tcp_close", providerId: options.providerId, connectionId: tunnel.id, at: nowIso() }); + const failureKind = egressFailureKind(error); + recentClosedTunnels.unshift({ + connectionId: tunnel.id, + method: tunnel.method, + targetHost: tunnel.targetHost, + targetPort: tunnel.targetPort, + targetPathSummary: tunnel.targetPathSummary, + opened: tunnel.opened, + ageMs, + idleMs, + closedAt, + failureKind, + closeReason: error ?? 
null, + bytesFromClient: tunnel.bytesFromClient, + bytesToClient: tunnel.bytesToClient, + }); + recentClosedTunnels.splice(16); const event = { providerId: options.providerId, connectionId: tunnel.id, method: tunnel.method, targetHost: tunnel.targetHost, targetPort: tunnel.targetPort, + targetPathSummary: tunnel.targetPathSummary, opened: tunnel.opened, - ageMs: Date.now() - tunnel.createdAt, - pendingBytes: tunnel.pendingBytes, - failureKind: egressFailureKind(error), + ageMs, + idleMs, + pendingBytes, + bytesFromClient: tunnel.bytesFromClient, + bytesToClient: tunnel.bytesToClient, + failureKind, ...(error === undefined ? {} : { error }), }; options.logger(error === undefined ? "info" : "warn", "egress_proxy_tunnel_closed", event); + options.onStatusChanged?.(); }; const closeTunnel = (id: string, error?: string): void => { @@ -204,15 +297,16 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P if (tunnel.closed) return; tunnel.lastActivityAt = Date.now(); if (tunnel.idleTimer !== null) clearTimeout(tunnel.idleTimer); - tunnel.idleTimer = setTimeout(() => closeTunnel(tunnel.id, "egress proxy idle timeout"), tunnelIdleTimeoutMs); + tunnel.idleTimer = setTimeout(() => closeTunnel(tunnel.id, "egress proxy idle timeout"), options.idleTimeoutMs); }; const queuePendingChunk = (tunnel: Tunnel, chunk: Buffer): boolean => { if (tunnel.closed) return false; tunnel.pending.push(chunk); tunnel.pendingBytes += chunk.byteLength; + tunnel.bytesFromClient += chunk.byteLength; refreshTunnelIdle(tunnel); - if (tunnel.pendingBytes <= maxPendingBytes) return true; + if (tunnel.pendingBytes <= options.maxPendingBytes) return true; closeTunnel(tunnel.id, "egress proxy pending buffer exceeded"); return false; }; @@ -233,6 +327,7 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P method: tunnel.method, targetHost: tunnel.targetHost, targetPort: tunnel.targetPort, + targetPathSummary: tunnel.targetPathSummary, ageMs: 
Date.now() - tunnel.createdAt, pendingBytes: tunnel.pendingBytes, }); @@ -243,12 +338,15 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P send({ type: "egress_tcp_data", providerId: options.providerId, connectionId: tunnel.id, data: chunk.toString("base64"), encoding: "base64", at: nowIso() }); } tunnel.pendingBytes = 0; + options.onStatusChanged?.(); return true; } if (message.type === "egress_tcp_data") { if (tunnel === undefined || tunnel.client.destroyed) return true; refreshTunnelIdle(tunnel); - tunnel.client.write(Buffer.from(message.data, message.encoding === "base64" ? "base64" : "utf8")); + const payload = Buffer.from(message.data, message.encoding === "base64" ? "base64" : "utf8"); + tunnel.bytesToClient += payload.byteLength; + tunnel.client.write(payload); return true; } if (message.type === "egress_tcp_close") { @@ -307,6 +405,7 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P const id = safeConnectionId(); let firstPayload: Buffer | null = null; let target: { host: string; port: number } | null = null; + let targetPathSummary: string | null = null; if (parsed.method === "CONNECT") { target = parseConnectTarget(parsed.target); firstPayload = rest.length > 0 ? 
rest : null; @@ -314,6 +413,7 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P const httpTarget = httpRequestTarget(parsed.target); if (httpTarget !== null) { target = { host: httpTarget.host, port: httpTarget.port }; + targetPathSummary = httpTarget.pathSummary; firstPayload = Buffer.concat([rewriteAbsoluteHttpRequest(head, httpTarget.rewrittenTarget), rest]); } } @@ -326,6 +426,7 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P id, client, method: parsed.method, + targetPathSummary, opened: false, pending: [], pendingBytes: 0, @@ -334,8 +435,11 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P lastActivityAt: createdAt, targetHost: target.host, targetPort: target.port, + bytesFromClient: 0, + bytesToClient: 0, openTimer: null, idleTimer: null, + maxAgeTimer: null, }; tunnels.set(id, tunnel); options.logger("info", "egress_proxy_tunnel_open_request", { @@ -344,8 +448,10 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P method: parsed.method, targetHost: target.host, targetPort: target.port, + targetPathSummary, firstPayloadBytes: firstPayload === null ? 0 : firstPayload.byteLength, }); + options.onStatusChanged?.(); client.on("data", (nextChunk) => { const nextBuffer = Buffer.isBuffer(nextChunk) ? 
nextChunk : Buffer.from(nextChunk); if (!tunnel.opened) { @@ -353,6 +459,7 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P return; } refreshTunnelIdle(tunnel); + tunnel.bytesFromClient += nextBuffer.byteLength; send({ type: "egress_tcp_data", providerId: options.providerId, connectionId: id, data: nextBuffer.toString("base64"), encoding: "base64", at: nowIso() }); }); client.on("close", () => closeTunnel(id)); @@ -368,12 +475,15 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P method: parsed.method, targetHost: target.host, targetPort: target.port, + targetPathSummary, failureKind: "core-channel-not-connected", }); fail("503 Service Unavailable", "provider-gateway core channel is not connected\n"); + options.onStatusChanged?.(); return; } - tunnel.openTimer = setTimeout(() => closeTunnel(id, "egress proxy open timeout"), tunnelOpenTimeoutMs); + tunnel.openTimer = setTimeout(() => closeTunnel(id, "egress proxy open timeout"), options.openTimeoutMs); + tunnel.maxAgeTimer = setTimeout(() => closeTunnel(id, "egress proxy max tunnel age exceeded"), options.maxTunnelAgeMs); refreshTunnelIdle(tunnel); if (firstPayload !== null) queuePendingChunk(tunnel, firstPayload); }); diff --git a/src/components/provider-gateway/src/index.ts b/src/components/provider-gateway/src/index.ts index a7f52e17..04389c7e 100644 --- a/src/components/provider-gateway/src/index.ts +++ b/src/components/provider-gateway/src/index.ts @@ -54,6 +54,11 @@ interface RuntimeConfig { egressProxyEnabled: boolean; egressProxyListenHost: string; egressProxyPort: number; + egressProxyOpenTimeoutMs: number; + egressProxyIdleTimeoutMs: number; + egressProxyMaxTunnelAgeMs: number; + egressProxyStaleTunnelIdleMs: number; + egressProxyMaxPendingBytes: number; logFile: string; } @@ -534,6 +539,11 @@ function readConfig(): RuntimeConfig { egressProxyEnabled: readBooleanEnv("PROVIDER_EGRESS_PROXY_ENABLED", true, 
"UNIDESK_PROVIDER_EGRESS_PROXY_ENABLED"), egressProxyListenHost: readOptionalStringEnv("PROVIDER_EGRESS_PROXY_LISTEN_HOST", "UNIDESK_PROVIDER_EGRESS_PROXY_LISTEN_HOST") ?? "0.0.0.0", egressProxyPort: readNumberEnv("PROVIDER_EGRESS_PROXY_PORT", 18789, "UNIDESK_PROVIDER_EGRESS_PROXY_PORT"), + egressProxyOpenTimeoutMs: Math.max(1_000, Math.min(120_000, Math.floor(readNumberEnv("PROVIDER_EGRESS_PROXY_OPEN_TIMEOUT_MS", 15_000, "UNIDESK_PROVIDER_EGRESS_PROXY_OPEN_TIMEOUT_MS")))), + egressProxyIdleTimeoutMs: Math.max(30_000, Math.min(3_600_000, Math.floor(readNumberEnv("PROVIDER_EGRESS_PROXY_IDLE_TIMEOUT_MS", 600_000, "UNIDESK_PROVIDER_EGRESS_PROXY_IDLE_TIMEOUT_MS")))), + egressProxyMaxTunnelAgeMs: Math.max(60_000, Math.min(14_400_000, Math.floor(readNumberEnv("PROVIDER_EGRESS_PROXY_MAX_TUNNEL_AGE_MS", 3_600_000, "UNIDESK_PROVIDER_EGRESS_PROXY_MAX_TUNNEL_AGE_MS")))), + egressProxyStaleTunnelIdleMs: Math.max(10_000, Math.min(600_000, Math.floor(readNumberEnv("PROVIDER_EGRESS_PROXY_STALE_TUNNEL_IDLE_MS", 120_000, "UNIDESK_PROVIDER_EGRESS_PROXY_STALE_TUNNEL_IDLE_MS")))), + egressProxyMaxPendingBytes: Math.max(64 * 1024, Math.min(64 * 1024 * 1024, Math.floor(readNumberEnv("PROVIDER_EGRESS_PROXY_MAX_PENDING_BYTES", 4 * 1024 * 1024, "UNIDESK_PROVIDER_EGRESS_PROXY_MAX_PENDING_BYTES")))), logFile: readOptionalStringEnv("LOG_FILE") ?? `/var/log/unidesk/provider-gateway-${providerId}.jsonl`, }; } @@ -587,6 +597,17 @@ function currentLabels(): ProviderLabels { providerGatewayEgressProxyPort: config.egressProxyPort, providerGatewayEgressProxyConnected: egressProxyStatus === null ? false : egressProxyStatus.connected === true, providerGatewayEgressProxyActiveTunnels: egressProxyStatus === null ? 0 : egressProxyStatus.activeTunnels ?? 0, + providerGatewayEgressProxyPendingTunnels: egressProxyStatus === null ? 0 : egressProxyStatus.pendingTunnels ?? 0, + providerGatewayEgressProxyOpenedTunnels: egressProxyStatus === null ? 0 : egressProxyStatus.openedTunnels ?? 
0, + providerGatewayEgressProxyStaleTunnels: egressProxyStatus === null ? 0 : egressProxyStatus.staleTunnels ?? 0, + providerGatewayEgressProxyOldestTunnelAgeMs: egressProxyStatus === null ? 0 : egressProxyStatus.oldestTunnelAgeMs ?? 0, + providerGatewayEgressProxyOpenTimeoutMs: egressProxyStatus === null ? config.egressProxyOpenTimeoutMs : egressProxyStatus.openTimeoutMs ?? config.egressProxyOpenTimeoutMs, + providerGatewayEgressProxyIdleTimeoutMs: egressProxyStatus === null ? config.egressProxyIdleTimeoutMs : egressProxyStatus.idleTimeoutMs ?? config.egressProxyIdleTimeoutMs, + providerGatewayEgressProxyMaxTunnelAgeMs: egressProxyStatus === null ? config.egressProxyMaxTunnelAgeMs : egressProxyStatus.maxTunnelAgeMs ?? config.egressProxyMaxTunnelAgeMs, + providerGatewayEgressProxyStaleTunnelIdleMs: egressProxyStatus === null ? config.egressProxyStaleTunnelIdleMs : egressProxyStatus.staleTunnelIdleMs ?? config.egressProxyStaleTunnelIdleMs, + providerGatewayEgressProxyMaxPendingBytes: egressProxyStatus === null ? config.egressProxyMaxPendingBytes : egressProxyStatus.maxPendingBytes ?? config.egressProxyMaxPendingBytes, + providerGatewayEgressProxyActiveTunnelDetails: egressProxyStatus === null ? [] : egressProxyStatus.activeTunnelDetails ?? [], + providerGatewayEgressProxyRecentClosedTunnels: egressProxyStatus === null ? [] : egressProxyStatus.recentClosedTunnels ?? 
[], providerGatewaySshDataTransport: "tcp-pool", providerGatewaySshDataHost: config.providerDataHost, providerGatewaySshDataPort: config.providerDataPort, @@ -2706,9 +2727,15 @@ if (config.egressProxyEnabled) { providerId: config.providerId, listenHost: config.egressProxyListenHost, listenPort: config.egressProxyPort, + openTimeoutMs: config.egressProxyOpenTimeoutMs, + idleTimeoutMs: config.egressProxyIdleTimeoutMs, + maxTunnelAgeMs: config.egressProxyMaxTunnelAgeMs, + staleTunnelIdleMs: config.egressProxyStaleTunnelIdleMs, + maxPendingBytes: config.egressProxyMaxPendingBytes, isCoreConnected: () => socket?.readyState === WebSocket.OPEN, sendToCore: (message) => sendJsonOk(message), logger, + onStatusChanged: () => sendHeartbeat(), }); }