fix: isolate provider egress tunnel diagnostics

This commit is contained in:
Codex
2026-06-28 16:15:41 +00:00
parent d225fd1a61
commit 4a636abb73
9 changed files with 368 additions and 25 deletions
@@ -289,6 +289,10 @@ Web 哨兵的首条生产 canary 应覆盖 `workbench-dsflash-go-tool-call-10x`
provider egress tunnel 生命周期至少应记录:provider proxy 收到 client 请求、发出 `egress_tcp_open`、core 确认 opened、core connect failed/timeout、remote close、本地 open timeout、client close、pending buffer 超限和 idle close。日志和受控摘要必须包含 `providerId``connectionId`、目标 `host`/`port``method``ageMs``pendingBytes` 和低基数 failureKind;不得记录 TCP payload、PostgreSQL DSN 密码、HTTP Authorization、Secret、token 或完整业务正文。
针对 GitHub release、k3s installer、dependency pull 等大二进制下载,egress tunnel 观测与控制面观测必须分离:provider labels 至少分别暴露 `host.ssh.tcp-pool` readiness 与 egress active/pending/opened/stale tunnel 数,CLI 必须提供独立的 egress drill-down。egress drill-down 只能展示目标 host/port、路径摘要和 query key 摘要,不得展示 URL credential、header、token 或 query value;同时必须保留最近关闭 tunnel 的有界生命周期摘要,用于确认中断下载后的自动回收。
backend-core 向 provider WebSocket 写入大流量 `egress_tcp_data` 时,必须避免单个下载流无界挤占 control/dispatch 消息;实现可以使用独立通道、优先级队列、节流/yield 或等价 backpressure 策略,但验收必须证明大文件下载期间 `trans <provider> argv true` 仍可在可接受时间内返回。下载中断、provider 断开或发送通道关闭时,core 侧必须有有界时间内的 read/write loop 退出日志与 active connection drain 证据。
`server start|stop|rebuild|restart` 等创建 `.state/jobs` 的命令默认必须返回 async job 摘要,而不是完整 JSON payload、完整 command、完整 runtime env 或 stdout/stderr。摘要至少包含 job id、目标 service、当前状态、stdout/stderr 路径、`job status` 查询命令和必要的 `server logs`/`server status` 后续命令。完整结构只能通过 `--full`/`--raw``job status <id>` 渐进展开。`job` 与常见自然输入 `jobs``status|get|read` 应指向同一受控状态入口,避免调用者因为别名缺失退回手工文件读取。
#### 6.9.1 目标架构图
+10 -1
View File
@@ -1,7 +1,7 @@
// SPEC: PJ2026-01060509 出站诊断 draft-2026-06-26-p8-egress-job-friction.
// UniDesk CLI dispatcher with bounded server lifecycle and job drill-down output.
import { readConfig } from "./src/config";
import { debugDispatch, debugHealth, debugSshPool, debugTask, isDebugDispatchCommand, type DebugDispatchCommand } from "./src/debug";
import { debugDispatch, debugEgressProxy, debugHealth, debugSshPool, debugTask, isDebugDispatchCommand, type DebugDispatchCommand } from "./src/debug";
import { isRebuildableService, isRestartableService, rebuildService, restartService, stackLogs, stackStatus, startStack, stopStack, unsupportedRebuildService, unsupportedRestartService } from "./src/docker";
import { emitError, emitJson, emitText, isRenderedCliResult } from "./src/output";
import { cancelJob, jobWithTail, listJobs, listJobsSummary, readJob, renderJobLaunchSummary, renderJobStatusSummary, runJob } from "./src/jobs";
@@ -650,6 +650,15 @@ async function main(): Promise<void> {
if (!ok) process.exitCode = 1;
return;
}
if (sub === "egress-proxy") {
const providerId = third ?? "";
if (providerId.length === 0) throw new Error("debug egress-proxy requires providerId");
const result = await debugEgressProxy(config, providerId);
const ok = (result as { ok?: unknown }).ok !== false;
emitJson(commandName, result, ok);
if (!ok) process.exitCode = 1;
return;
}
if (sub === "dispatch") {
const providerId = isDebugDispatchCommand(third) ? config.providerGateway.id : third ?? config.providerGateway.id;
const commandArg = isDebugDispatchCommand(third) ? third : fourth;
+85
View File
@@ -199,6 +199,91 @@ export async function debugSshPool(_config: UniDeskConfig, providerId: string):
};
}
export async function debugEgressProxy(_config: UniDeskConfig, providerId: string): Promise<unknown> {
const nodesResponse = await coreInternalFetch("/api/nodes");
const body = recordValue(recordValue(nodesResponse).body);
const nodes = arrayValue(body.nodes);
const node = nodes
.map((item) => recordValue(item))
.find((item) => item.providerId === providerId) ?? null;
if (node === null) {
return {
ok: false,
providerId,
degradedReason: "provider-not-found",
nodesFetch: nodesResponse,
next: { fullHealth: "bun scripts/cli.ts debug health" },
};
}
const labels = recordValue(node.labels);
const active = Number(labels.providerGatewayEgressProxyActiveTunnels ?? 0);
const pending = Number(labels.providerGatewayEgressProxyPendingTunnels ?? 0);
const opened = Number(labels.providerGatewayEgressProxyOpenedTunnels ?? 0);
const stale = Number(labels.providerGatewayEgressProxyStaleTunnels ?? 0);
const enabled = labels.providerGatewayEgressProxy === true;
const connected = labels.providerGatewayEgressProxyConnected === true;
const sshReady = Number(labels.providerGatewaySshDataPoolReady ?? 0);
const activeTunnelDetails = arrayValue(labels.providerGatewayEgressProxyActiveTunnelDetails);
const recentClosedTunnels = arrayValue(labels.providerGatewayEgressProxyRecentClosedTunnels);
const ok = enabled && connected && stale === 0;
return {
ok,
providerId,
node: {
providerId: node.providerId,
name: node.name,
status: node.status,
lastHeartbeat: node.lastHeartbeat ?? null,
updatedAt: node.updatedAt ?? null,
},
egressProxy: {
enabled,
connected,
port: labels.providerGatewayEgressProxyPort ?? null,
activeTunnels: Number.isFinite(active) ? active : 0,
pendingTunnels: Number.isFinite(pending) ? pending : 0,
openedTunnels: Number.isFinite(opened) ? opened : 0,
staleTunnels: Number.isFinite(stale) ? stale : 0,
oldestTunnelAgeMs: labels.providerGatewayEgressProxyOldestTunnelAgeMs ?? null,
policy: {
openTimeoutMs: labels.providerGatewayEgressProxyOpenTimeoutMs ?? null,
idleTimeoutMs: labels.providerGatewayEgressProxyIdleTimeoutMs ?? null,
maxTunnelAgeMs: labels.providerGatewayEgressProxyMaxTunnelAgeMs ?? null,
staleTunnelIdleMs: labels.providerGatewayEgressProxyStaleTunnelIdleMs ?? null,
maxPendingBytes: labels.providerGatewayEgressProxyMaxPendingBytes ?? null,
},
activeTunnelDetails,
activeTunnelDetailsCount: activeTunnelDetails.length,
recentClosedTunnels,
recentClosedTunnelCount: recentClosedTunnels.length,
},
controlPlaneSeparation: {
sshDataTransport: stringValue(labels.providerGatewaySshDataTransport),
sshDataPoolReady: Number.isFinite(sshReady) ? sshReady : 0,
sshDataPoolDesired: labels.providerGatewaySshDataPoolDesired ?? null,
sshDataPoolClaimed: labels.providerGatewaySshDataPoolClaimed ?? null,
note: "SSH/control readiness is reported separately from egress download tunnel activity.",
},
classification: !enabled
? "egress-proxy-disabled"
: !connected
? "egress-proxy-core-channel-disconnected"
: stale > 0
? "egress-tunnel-stale"
: active > 0
? "egress-tunnel-active"
: "egress-proxy-ready",
outputPolicy: {
redaction: "activeTunnelDetails expose target host/port and path/query-key summary only; URL credentials, headers, tokens and query values are not emitted.",
boundedRecentClosedTunnels: true,
},
next: {
smoke: `trans ${providerId} argv true`,
fullHealth: "bun scripts/cli.ts debug health",
},
};
}
async function waitForTask(taskId: string, timeoutMs: number): Promise<unknown> {
const started = Date.now();
let latest: unknown = null;
+4 -2
View File
@@ -95,6 +95,7 @@ export function rootHelp(): unknown {
{ command: "job cancel <jobId>", description: "Cancel a queued/running async job through the .state/jobs control entry and keep a terminal canceled record." },
{ command: "debug health", description: "Probe internal core, nodes, system/Docker status, frontend, provider ingress, and public boundary." },
{ command: "debug ssh-pool <providerId>", description: "Show bounded host.ssh.tcp-pool labels for one provider, including ready/claimed/desired/lastError." },
{ command: "debug egress-proxy <providerId>", description: "Show provider-gateway egress proxy tunnel counts, stale tunnel diagnosis, active target summaries, and recent closed tunnel lifecycle without URL credential leakage." },
{ command: "debug dispatch [providerId] [docker.ps|provider.upgrade|host.ssh|microservice.http|echo] [--wait-ms N]", description: "Submit a real internal-core dispatch request for CLI debugging." },
{ command: "debug task <taskId|latest>", description: "Read a dispatched task record from internal core for CLI debugging." },
{ command: "network perf [--service code-queue --path /api/tasks/overview?limit=30 --count N --concurrency N --label before|after]", description: "Benchmark frontend -> backend-core -> provider/adapter user-service networking and report latency/proxy-mode distributions." },
@@ -539,15 +540,16 @@ function jobHelp(): unknown {
function debugHelp(): unknown {
return {
command: "debug health|ssh-pool|dispatch|task",
command: "debug health|ssh-pool|egress-proxy|dispatch|task",
output: "json",
usage: [
"bun scripts/cli.ts debug health",
"bun scripts/cli.ts debug ssh-pool <providerId>",
"bun scripts/cli.ts debug egress-proxy <providerId>",
"bun scripts/cli.ts debug dispatch [providerId] [docker.ps|provider.upgrade|host.ssh|microservice.http|echo] [--wait-ms N]",
"bun scripts/cli.ts debug task <taskId|latest>",
],
description: "Debug the real core/provider/dispatch paths. ssh-pool returns bounded host.ssh.tcp-pool labels for one provider; do not use debug commands as formal TEST.md acceptance steps.",
description: "Debug the real core/provider/dispatch paths. ssh-pool and egress-proxy are separate so SSH/control readiness is not conflated with long download tunnel activity; do not use debug commands as formal TEST.md acceptance steps.",
};
}
+14
View File
@@ -154,6 +154,20 @@ function providerGatewaySignal(debug: unknown, providerId: string): ProviderTria
providerGatewayVersion: labels.providerGatewayVersion ?? null,
hostSshConfigured: labels.hostSshConfigured ?? null,
hostSshKeyPresent: labels.hostSshKeyPresent ?? null,
sshDataPool: {
transport: labels.providerGatewaySshDataTransport ?? null,
desired: labels.providerGatewaySshDataPoolDesired ?? null,
ready: labels.providerGatewaySshDataPoolReady ?? null,
claimed: labels.providerGatewaySshDataPoolClaimed ?? null,
},
egressProxy: {
enabled: labels.providerGatewayEgressProxy ?? null,
connected: labels.providerGatewayEgressProxyConnected ?? null,
activeTunnels: labels.providerGatewayEgressProxyActiveTunnels ?? null,
pendingTunnels: labels.providerGatewayEgressProxyPendingTunnels ?? null,
staleTunnels: labels.providerGatewayEgressProxyStaleTunnels ?? null,
oldestTunnelAgeMs: labels.providerGatewayEgressProxyOldestTunnelAgeMs ?? null,
},
capabilities,
});
}
+99 -7
View File
@@ -1,7 +1,7 @@
// SPEC: PJ2026-01060509 出站诊断 draft-2026-06-26-p8-egress-job-friction.
// Backend-core side of provider egress TCP bridge with stage logs.
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use anyhow::bail;
use axum::extract::ws::Message;
@@ -16,9 +16,16 @@ use crate::state::{AppState, ProviderConnection};
pub struct EgressTcpConnection {
pub provider_id: String,
pub connection_id: String,
pub target_host: String,
pub target_port: i64,
pub created_at: Instant,
pub writer: mpsc::UnboundedSender<Vec<u8>>,
}
const EGRESS_TCP_READ_CHUNK_BYTES: usize = 16 * 1024;
const EGRESS_TCP_COOPERATIVE_YIELD_BYTES: u64 = 128 * 1024;
const EGRESS_TCP_COOPERATIVE_YIELD_DELAY_MS: u64 = 1;
fn egress_tcp_key(provider_id: &str, connection_id: &str) -> String {
format!("{provider_id}:{connection_id}")
}
@@ -178,11 +185,14 @@ pub async fn handle_egress_tcp_open(
EgressTcpConnection {
provider_id: provider_id.clone(),
connection_id: connection_id.clone(),
target_host: host.clone(),
target_port: port,
created_at: Instant::now(),
writer: tx,
},
);
let _ = provider.sender.send(Message::Text(
json!({ "type": "egress_tcp_opened", "connectionId": connection_id }).to_string(),
json!({ "type": "egress_tcp_opened", "connectionId": connection_id.as_str() }).to_string(),
));
state.log(
"info",
@@ -200,15 +210,51 @@ pub async fn handle_egress_tcp_open(
let provider = provider.clone();
let provider_id = provider_id.clone();
let connection_id = connection_id.clone();
let host = host.clone();
async move {
let mut buffer = vec![0_u8; 16 * 1024];
let mut buffer = vec![0_u8; EGRESS_TCP_READ_CHUNK_BYTES];
let mut bytes_to_provider: u64 = 0;
let mut bytes_since_yield: u64 = 0;
let mut chunks_to_provider: u64 = 0;
loop {
match reader.read(&mut buffer).await {
Ok(0) => break,
Ok(size) => {
let data =
base64::engine::general_purpose::STANDARD.encode(&buffer[..size]);
let _ = provider.sender.send(Message::Text(json!({ "type": "egress_tcp_data", "connectionId": connection_id, "data": data, "encoding": "base64" }).to_string()));
let sent = provider.sender.send(Message::Text(
json!({
"type": "egress_tcp_data",
"connectionId": connection_id.as_str(),
"data": data,
"encoding": "base64"
})
.to_string(),
));
if sent.is_err() {
state.log(
"warn",
"egress_tcp_provider_channel_closed",
Some(json!({
"providerId": provider_id.as_str(),
"connectionId": connection_id.as_str(),
"targetHost": host.as_str(),
"targetPort": port,
"failureKind": "provider-channel-closed",
"bytesToProvider": bytes_to_provider,
"chunksToProvider": chunks_to_provider
})),
);
break;
}
bytes_to_provider += size as u64;
bytes_since_yield += size as u64;
chunks_to_provider += 1;
if bytes_since_yield >= EGRESS_TCP_COOPERATIVE_YIELD_BYTES {
bytes_since_yield = 0;
tokio::task::yield_now().await;
tokio::time::sleep(Duration::from_millis(EGRESS_TCP_COOPERATIVE_YIELD_DELAY_MS)).await;
}
}
Err(error) => {
let error_text = error.to_string();
@@ -232,6 +278,19 @@ pub async fn handle_egress_tcp_open(
.lock()
.await
.remove(&egress_tcp_key(&provider_id, &connection_id));
state.log(
"info",
"egress_tcp_read_loop_closed",
Some(json!({
"providerId": provider_id.as_str(),
"connectionId": connection_id.as_str(),
"targetHost": host.as_str(),
"targetPort": port,
"bytesToProvider": bytes_to_provider,
"chunksToProvider": chunks_to_provider,
"yieldEveryBytes": EGRESS_TCP_COOPERATIVE_YIELD_BYTES
})),
);
send_egress_close(&provider, &connection_id, None);
}
});
@@ -239,8 +298,13 @@ pub async fn handle_egress_tcp_open(
let state = state.clone();
let provider_id = provider_id.clone();
let connection_id = connection_id.clone();
let host = host.clone();
async move {
let mut bytes_from_provider: u64 = 0;
let mut chunks_from_provider: u64 = 0;
while let Some(data) = rx.recv().await {
bytes_from_provider += data.len() as u64;
chunks_from_provider += 1;
if let Err(error) = writer.write_all(&data).await {
state.log(
"warn",
@@ -260,6 +324,18 @@ pub async fn handle_egress_tcp_open(
.lock()
.await
.remove(&egress_tcp_key(&provider_id, &connection_id));
state.log(
"info",
"egress_tcp_write_loop_closed",
Some(json!({
"providerId": provider_id.as_str(),
"connectionId": connection_id.as_str(),
"targetHost": host.as_str(),
"targetPort": port,
"bytesFromProvider": bytes_from_provider,
"chunksFromProvider": chunks_from_provider
})),
);
}
});
Ok(())
@@ -313,12 +389,12 @@ pub async fn close_egress_tcp_connection(
connection_id: &str,
error: Option<&str>,
) {
let removed = state
let removed_connection = state
.active_egress_tcp
.lock()
.await
.remove(&egress_tcp_key(provider_id, connection_id))
.is_some();
.remove(&egress_tcp_key(provider_id, connection_id));
let removed = removed_connection.is_some();
if removed || error.is_some() {
state.log(
if error.is_some() { "warn" } else { "info" },
@@ -327,6 +403,9 @@ pub async fn close_egress_tcp_connection(
"providerId": provider_id,
"connectionId": connection_id,
"removed": removed,
"targetHost": removed_connection.as_ref().map(|connection| connection.target_host.as_str()),
"targetPort": removed_connection.as_ref().map(|connection| connection.target_port),
"ageMs": removed_connection.as_ref().map(|connection| connection.created_at.elapsed().as_millis() as u64),
"failureKind": error.map(egress_failure_kind).unwrap_or("normal-close"),
"error": error.unwrap_or("")
})),
@@ -336,5 +415,18 @@ pub async fn close_egress_tcp_connection(
pub async fn close_egress_tcp_connections_for_provider(state: &Arc<AppState>, provider_id: &str) {
let mut connections = state.active_egress_tcp.lock().await;
let before = connections.len();
connections.retain(|_, connection| connection.provider_id != provider_id);
let removed = before.saturating_sub(connections.len());
if removed > 0 {
state.log(
"warn",
"egress_tcp_provider_connections_drained",
Some(json!({
"providerId": provider_id,
"removed": removed,
"failureKind": "provider-disconnected"
})),
);
}
}
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@unidesk/provider-gateway",
"version": "0.2.30",
"version": "0.2.31",
"private": true,
"type": "module",
"scripts": {
@@ -20,15 +20,22 @@ interface ProviderEgressProxyOptions {
providerId: string;
listenHost: string;
listenPort: number;
openTimeoutMs: number;
idleTimeoutMs: number;
maxTunnelAgeMs: number;
staleTunnelIdleMs: number;
maxPendingBytes: number;
sendToCore: (message: EgressToCoreMessage) => boolean;
isCoreConnected: () => boolean;
logger: Logger;
onStatusChanged?: () => void;
}
interface Tunnel {
id: string;
client: Socket;
method: string;
targetPathSummary: string | null;
opened: boolean;
pending: Buffer[];
pendingBytes: number;
@@ -37,8 +44,28 @@ interface Tunnel {
lastActivityAt: number;
targetHost: string;
targetPort: number;
bytesFromClient: number;
bytesToClient: number;
openTimer: ReturnType<typeof setTimeout> | null;
idleTimer: ReturnType<typeof setTimeout> | null;
maxAgeTimer: ReturnType<typeof setTimeout> | null;
}
interface ClosedTunnelSummary {
[key: string]: JsonValue;
connectionId: string;
method: string;
targetHost: string;
targetPort: number;
targetPathSummary: string | null;
opened: boolean;
ageMs: number;
idleMs: number;
closedAt: string;
failureKind: string;
closeReason: string | null;
bytesFromClient: number;
bytesToClient: number;
}
export interface ProviderEgressProxyHandle {
@@ -77,7 +104,13 @@ function parseHttpProxyRequest(headerText: string): { method: string; target: st
return { method: match[1] ?? "", target: match[2] ?? "", version: match[3] ?? "HTTP/1.1" };
}
function httpRequestTarget(rawTarget: string): { host: string; port: number; rewrittenTarget: string } | null {
function summarizeHttpTargetPath(url: URL): string {
const queryKeys = Array.from(new Set(Array.from(url.searchParams.keys()).filter(Boolean))).sort();
if (queryKeys.length === 0) return url.pathname || "/";
return `${url.pathname || "/"}?${queryKeys.slice(0, 8).join(",")}${queryKeys.length > 8 ? ",..." : ""}`;
}
function httpRequestTarget(rawTarget: string): { host: string; port: number; rewrittenTarget: string; pathSummary: string } | null {
try {
const url = new URL(rawTarget);
if (url.protocol !== "http:") return null;
@@ -85,6 +118,7 @@ function httpRequestTarget(rawTarget: string): { host: string; port: number; rew
host: url.hostname,
port: url.port.length > 0 ? Number(url.port) : 80,
rewrittenTarget: `${url.pathname}${url.search}`,
pathSummary: summarizeHttpTargetPath(url),
};
} catch {
return null;
@@ -125,6 +159,7 @@ function egressFailureKind(error: string | undefined): string {
const normalized = error.toLowerCase();
if (normalized.includes("open timeout")) return "open-timeout";
if (normalized.includes("idle timeout")) return "idle-timeout";
if (normalized.includes("max tunnel age")) return "max-age-exceeded";
if (normalized.includes("pending buffer")) return "pending-buffer-exceeded";
if (normalized.includes("not connected")) return "core-channel-not-connected";
if (normalized.includes("timeout")) return "timeout";
@@ -132,13 +167,15 @@ function egressFailureKind(error: string | undefined): string {
return "remote-or-socket-error";
}
const tunnelOpenTimeoutMs = 15_000;
const tunnelIdleTimeoutMs = 600_000;
const maxPendingBytes = 4 * 1024 * 1024;
function tunnelState(tunnel: Tunnel, now: number, staleTunnelIdleMs: number): "pending" | "open" | "stale" {
if (!tunnel.opened) return "pending";
return now - tunnel.lastActivityAt >= staleTunnelIdleMs ? "stale" : "open";
}
export function startProviderEgressProxy(options: ProviderEgressProxyOptions): ProviderEgressProxyHandle {
const proxyUrl = proxyUrlFor(options.listenHost, options.listenPort);
const tunnels = new Map<string, Tunnel>();
const recentClosedTunnels: ClosedTunnelSummary[] = [];
let closed = false;
const send = (message: EgressToCoreMessage): boolean => options.sendToCore(message);
@@ -147,6 +184,26 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
const now = Date.now();
const tunnelList = Array.from(tunnels.values());
const ages = tunnelList.map((tunnel) => now - tunnel.createdAt);
const details = tunnelList
.sort((left, right) => left.createdAt - right.createdAt)
.slice(0, 12)
.map((tunnel) => {
const idleMs = now - tunnel.lastActivityAt;
return {
connectionId: tunnel.id,
method: tunnel.method,
targetHost: tunnel.targetHost,
targetPort: tunnel.targetPort,
targetPathSummary: tunnel.targetPathSummary,
state: tunnelState(tunnel, now, options.staleTunnelIdleMs),
opened: tunnel.opened,
ageMs: now - tunnel.createdAt,
idleMs,
pendingBytes: tunnel.pendingBytes,
bytesFromClient: tunnel.bytesFromClient,
bytesToClient: tunnel.bytesToClient,
};
});
return {
enabled: true,
providerId: options.providerId,
@@ -156,10 +213,18 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
listenPort: options.listenPort,
activeTunnels: tunnels.size,
pendingTunnels: tunnelList.filter((tunnel) => !tunnel.opened).length,
openedTunnels: tunnelList.filter((tunnel) => tunnel.opened).length,
staleTunnels: tunnelList.filter((tunnel) => tunnelState(tunnel, now, options.staleTunnelIdleMs) === "stale").length,
oldestTunnelAgeMs: ages.length > 0 ? Math.max(...ages) : 0,
openTimeoutMs: tunnelOpenTimeoutMs,
idleTimeoutMs: tunnelIdleTimeoutMs,
maxPendingBytes,
openTimeoutMs: options.openTimeoutMs,
idleTimeoutMs: options.idleTimeoutMs,
maxTunnelAgeMs: options.maxTunnelAgeMs,
staleTunnelIdleMs: options.staleTunnelIdleMs,
maxPendingBytes: options.maxPendingBytes,
activeTunnelDetails: details,
activeTunnelDetailLimit: 12,
activeTunnelDetailsTruncated: tunnelList.length > details.length,
recentClosedTunnels,
channel: "provider-gateway",
};
};
@@ -167,31 +232,59 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
const clearTunnelTimers = (tunnel: Tunnel): void => {
if (tunnel.openTimer !== null) clearTimeout(tunnel.openTimer);
if (tunnel.idleTimer !== null) clearTimeout(tunnel.idleTimer);
if (tunnel.maxAgeTimer !== null) clearTimeout(tunnel.maxAgeTimer);
tunnel.openTimer = null;
tunnel.idleTimer = null;
tunnel.maxAgeTimer = null;
};
const destroyTunnel = (tunnel: Tunnel, notifyCore: boolean, error?: string): void => {
tunnels.delete(tunnel.id);
tunnel.closed = true;
const closedAt = nowIso();
const ageMs = Date.now() - tunnel.createdAt;
const idleMs = Date.now() - tunnel.lastActivityAt;
const pendingBytes = tunnel.pendingBytes;
clearTunnelTimers(tunnel);
tunnel.pending.splice(0);
tunnel.pendingBytes = 0;
if (!tunnel.client.destroyed) tunnel.client.destroy();
if (notifyCore) send({ type: "egress_tcp_close", providerId: options.providerId, connectionId: tunnel.id, at: nowIso() });
const failureKind = egressFailureKind(error);
recentClosedTunnels.unshift({
connectionId: tunnel.id,
method: tunnel.method,
targetHost: tunnel.targetHost,
targetPort: tunnel.targetPort,
targetPathSummary: tunnel.targetPathSummary,
opened: tunnel.opened,
ageMs,
idleMs,
closedAt,
failureKind,
closeReason: error ?? null,
bytesFromClient: tunnel.bytesFromClient,
bytesToClient: tunnel.bytesToClient,
});
recentClosedTunnels.splice(16);
const event = {
providerId: options.providerId,
connectionId: tunnel.id,
method: tunnel.method,
targetHost: tunnel.targetHost,
targetPort: tunnel.targetPort,
targetPathSummary: tunnel.targetPathSummary,
opened: tunnel.opened,
ageMs: Date.now() - tunnel.createdAt,
pendingBytes: tunnel.pendingBytes,
failureKind: egressFailureKind(error),
ageMs,
idleMs,
pendingBytes,
bytesFromClient: tunnel.bytesFromClient,
bytesToClient: tunnel.bytesToClient,
failureKind,
...(error === undefined ? {} : { error }),
};
options.logger(error === undefined ? "info" : "warn", "egress_proxy_tunnel_closed", event);
options.onStatusChanged?.();
};
const closeTunnel = (id: string, error?: string): void => {
@@ -204,15 +297,16 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
if (tunnel.closed) return;
tunnel.lastActivityAt = Date.now();
if (tunnel.idleTimer !== null) clearTimeout(tunnel.idleTimer);
tunnel.idleTimer = setTimeout(() => closeTunnel(tunnel.id, "egress proxy idle timeout"), tunnelIdleTimeoutMs);
tunnel.idleTimer = setTimeout(() => closeTunnel(tunnel.id, "egress proxy idle timeout"), options.idleTimeoutMs);
};
const queuePendingChunk = (tunnel: Tunnel, chunk: Buffer): boolean => {
if (tunnel.closed) return false;
tunnel.pending.push(chunk);
tunnel.pendingBytes += chunk.byteLength;
tunnel.bytesFromClient += chunk.byteLength;
refreshTunnelIdle(tunnel);
if (tunnel.pendingBytes <= maxPendingBytes) return true;
if (tunnel.pendingBytes <= options.maxPendingBytes) return true;
closeTunnel(tunnel.id, "egress proxy pending buffer exceeded");
return false;
};
@@ -233,6 +327,7 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
method: tunnel.method,
targetHost: tunnel.targetHost,
targetPort: tunnel.targetPort,
targetPathSummary: tunnel.targetPathSummary,
ageMs: Date.now() - tunnel.createdAt,
pendingBytes: tunnel.pendingBytes,
});
@@ -243,12 +338,15 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
send({ type: "egress_tcp_data", providerId: options.providerId, connectionId: tunnel.id, data: chunk.toString("base64"), encoding: "base64", at: nowIso() });
}
tunnel.pendingBytes = 0;
options.onStatusChanged?.();
return true;
}
if (message.type === "egress_tcp_data") {
if (tunnel === undefined || tunnel.client.destroyed) return true;
refreshTunnelIdle(tunnel);
tunnel.client.write(Buffer.from(message.data, message.encoding === "base64" ? "base64" : "utf8"));
const payload = Buffer.from(message.data, message.encoding === "base64" ? "base64" : "utf8");
tunnel.bytesToClient += payload.byteLength;
tunnel.client.write(payload);
return true;
}
if (message.type === "egress_tcp_close") {
@@ -307,6 +405,7 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
const id = safeConnectionId();
let firstPayload: Buffer | null = null;
let target: { host: string; port: number } | null = null;
let targetPathSummary: string | null = null;
if (parsed.method === "CONNECT") {
target = parseConnectTarget(parsed.target);
firstPayload = rest.length > 0 ? rest : null;
@@ -314,6 +413,7 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
const httpTarget = httpRequestTarget(parsed.target);
if (httpTarget !== null) {
target = { host: httpTarget.host, port: httpTarget.port };
targetPathSummary = httpTarget.pathSummary;
firstPayload = Buffer.concat([rewriteAbsoluteHttpRequest(head, httpTarget.rewrittenTarget), rest]);
}
}
@@ -326,6 +426,7 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
id,
client,
method: parsed.method,
targetPathSummary,
opened: false,
pending: [],
pendingBytes: 0,
@@ -334,8 +435,11 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
lastActivityAt: createdAt,
targetHost: target.host,
targetPort: target.port,
bytesFromClient: 0,
bytesToClient: 0,
openTimer: null,
idleTimer: null,
maxAgeTimer: null,
};
tunnels.set(id, tunnel);
options.logger("info", "egress_proxy_tunnel_open_request", {
@@ -344,8 +448,10 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
method: parsed.method,
targetHost: target.host,
targetPort: target.port,
targetPathSummary,
firstPayloadBytes: firstPayload === null ? 0 : firstPayload.byteLength,
});
options.onStatusChanged?.();
client.on("data", (nextChunk) => {
const nextBuffer = Buffer.isBuffer(nextChunk) ? nextChunk : Buffer.from(nextChunk);
if (!tunnel.opened) {
@@ -353,6 +459,7 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
return;
}
refreshTunnelIdle(tunnel);
tunnel.bytesFromClient += nextBuffer.byteLength;
send({ type: "egress_tcp_data", providerId: options.providerId, connectionId: id, data: nextBuffer.toString("base64"), encoding: "base64", at: nowIso() });
});
client.on("close", () => closeTunnel(id));
@@ -368,12 +475,15 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
method: parsed.method,
targetHost: target.host,
targetPort: target.port,
targetPathSummary,
failureKind: "core-channel-not-connected",
});
fail("503 Service Unavailable", "provider-gateway core channel is not connected\n");
options.onStatusChanged?.();
return;
}
tunnel.openTimer = setTimeout(() => closeTunnel(id, "egress proxy open timeout"), tunnelOpenTimeoutMs);
tunnel.openTimer = setTimeout(() => closeTunnel(id, "egress proxy open timeout"), options.openTimeoutMs);
tunnel.maxAgeTimer = setTimeout(() => closeTunnel(id, "egress proxy max tunnel age exceeded"), options.maxTunnelAgeMs);
refreshTunnelIdle(tunnel);
if (firstPayload !== null) queuePendingChunk(tunnel, firstPayload);
});
@@ -54,6 +54,11 @@ interface RuntimeConfig {
egressProxyEnabled: boolean;
egressProxyListenHost: string;
egressProxyPort: number;
egressProxyOpenTimeoutMs: number;
egressProxyIdleTimeoutMs: number;
egressProxyMaxTunnelAgeMs: number;
egressProxyStaleTunnelIdleMs: number;
egressProxyMaxPendingBytes: number;
logFile: string;
}
@@ -534,6 +539,11 @@ function readConfig(): RuntimeConfig {
egressProxyEnabled: readBooleanEnv("PROVIDER_EGRESS_PROXY_ENABLED", true, "UNIDESK_PROVIDER_EGRESS_PROXY_ENABLED"),
egressProxyListenHost: readOptionalStringEnv("PROVIDER_EGRESS_PROXY_LISTEN_HOST", "UNIDESK_PROVIDER_EGRESS_PROXY_LISTEN_HOST") ?? "0.0.0.0",
egressProxyPort: readNumberEnv("PROVIDER_EGRESS_PROXY_PORT", 18789, "UNIDESK_PROVIDER_EGRESS_PROXY_PORT"),
egressProxyOpenTimeoutMs: Math.max(1_000, Math.min(120_000, Math.floor(readNumberEnv("PROVIDER_EGRESS_PROXY_OPEN_TIMEOUT_MS", 15_000, "UNIDESK_PROVIDER_EGRESS_PROXY_OPEN_TIMEOUT_MS")))),
egressProxyIdleTimeoutMs: Math.max(30_000, Math.min(3_600_000, Math.floor(readNumberEnv("PROVIDER_EGRESS_PROXY_IDLE_TIMEOUT_MS", 600_000, "UNIDESK_PROVIDER_EGRESS_PROXY_IDLE_TIMEOUT_MS")))),
egressProxyMaxTunnelAgeMs: Math.max(60_000, Math.min(14_400_000, Math.floor(readNumberEnv("PROVIDER_EGRESS_PROXY_MAX_TUNNEL_AGE_MS", 3_600_000, "UNIDESK_PROVIDER_EGRESS_PROXY_MAX_TUNNEL_AGE_MS")))),
egressProxyStaleTunnelIdleMs: Math.max(10_000, Math.min(600_000, Math.floor(readNumberEnv("PROVIDER_EGRESS_PROXY_STALE_TUNNEL_IDLE_MS", 120_000, "UNIDESK_PROVIDER_EGRESS_PROXY_STALE_TUNNEL_IDLE_MS")))),
egressProxyMaxPendingBytes: Math.max(64 * 1024, Math.min(64 * 1024 * 1024, Math.floor(readNumberEnv("PROVIDER_EGRESS_PROXY_MAX_PENDING_BYTES", 4 * 1024 * 1024, "UNIDESK_PROVIDER_EGRESS_PROXY_MAX_PENDING_BYTES")))),
logFile: readOptionalStringEnv("LOG_FILE") ?? `/var/log/unidesk/provider-gateway-${providerId}.jsonl`,
};
}
@@ -587,6 +597,17 @@ function currentLabels(): ProviderLabels {
providerGatewayEgressProxyPort: config.egressProxyPort,
providerGatewayEgressProxyConnected: egressProxyStatus === null ? false : egressProxyStatus.connected === true,
providerGatewayEgressProxyActiveTunnels: egressProxyStatus === null ? 0 : egressProxyStatus.activeTunnels ?? 0,
providerGatewayEgressProxyPendingTunnels: egressProxyStatus === null ? 0 : egressProxyStatus.pendingTunnels ?? 0,
providerGatewayEgressProxyOpenedTunnels: egressProxyStatus === null ? 0 : egressProxyStatus.openedTunnels ?? 0,
providerGatewayEgressProxyStaleTunnels: egressProxyStatus === null ? 0 : egressProxyStatus.staleTunnels ?? 0,
providerGatewayEgressProxyOldestTunnelAgeMs: egressProxyStatus === null ? 0 : egressProxyStatus.oldestTunnelAgeMs ?? 0,
providerGatewayEgressProxyOpenTimeoutMs: egressProxyStatus === null ? config.egressProxyOpenTimeoutMs : egressProxyStatus.openTimeoutMs ?? config.egressProxyOpenTimeoutMs,
providerGatewayEgressProxyIdleTimeoutMs: egressProxyStatus === null ? config.egressProxyIdleTimeoutMs : egressProxyStatus.idleTimeoutMs ?? config.egressProxyIdleTimeoutMs,
providerGatewayEgressProxyMaxTunnelAgeMs: egressProxyStatus === null ? config.egressProxyMaxTunnelAgeMs : egressProxyStatus.maxTunnelAgeMs ?? config.egressProxyMaxTunnelAgeMs,
providerGatewayEgressProxyStaleTunnelIdleMs: egressProxyStatus === null ? config.egressProxyStaleTunnelIdleMs : egressProxyStatus.staleTunnelIdleMs ?? config.egressProxyStaleTunnelIdleMs,
providerGatewayEgressProxyMaxPendingBytes: egressProxyStatus === null ? config.egressProxyMaxPendingBytes : egressProxyStatus.maxPendingBytes ?? config.egressProxyMaxPendingBytes,
providerGatewayEgressProxyActiveTunnelDetails: egressProxyStatus === null ? [] : egressProxyStatus.activeTunnelDetails ?? [],
providerGatewayEgressProxyRecentClosedTunnels: egressProxyStatus === null ? [] : egressProxyStatus.recentClosedTunnels ?? [],
providerGatewaySshDataTransport: "tcp-pool",
providerGatewaySshDataHost: config.providerDataHost,
providerGatewaySshDataPort: config.providerDataPort,
@@ -2706,9 +2727,15 @@ if (config.egressProxyEnabled) {
providerId: config.providerId,
listenHost: config.egressProxyListenHost,
listenPort: config.egressProxyPort,
openTimeoutMs: config.egressProxyOpenTimeoutMs,
idleTimeoutMs: config.egressProxyIdleTimeoutMs,
maxTunnelAgeMs: config.egressProxyMaxTunnelAgeMs,
staleTunnelIdleMs: config.egressProxyStaleTunnelIdleMs,
maxPendingBytes: config.egressProxyMaxPendingBytes,
isCoreConnected: () => socket?.readyState === WebSocket.OPEN,
sendToCore: (message) => sendJsonOk(message),
logger,
onStatusChanged: () => sendHeartbeat(),
});
}