From dd448c7218223f0275f3e3e9012b76693e1aa938 Mon Sep 17 00:00:00 2001 From: Codex Date: Sun, 17 May 2026 08:39:03 +0000 Subject: [PATCH] Bound provider egress tunnel lifecycle --- docs/reference/observability.md | 2 +- docs/reference/provider-gateway.md | 4 +- src/components/backend-core/src/egress-tcp.ts | 49 ++++++- src/components/backend-core/src/types.ts | 2 + src/components/provider-gateway/package.json | 2 +- .../provider-gateway/src/egress-proxy.ts | 121 +++++++++++++++--- 6 files changed, 154 insertions(+), 26 deletions(-) diff --git a/docs/reference/observability.md b/docs/reference/observability.md index e4edd79f..e2be8653 100644 --- a/docs/reference/observability.md +++ b/docs/reference/observability.md @@ -38,6 +38,6 @@ frontend Bun server 必须提供同源 `/api/frontend-performance`,记录 webu 性能优化必须先用这些指标锁定慢操作名称、路径、耗时和代理层级,再改后端查询或前后端通信策略;不得只凭主观体感改 UI。Code Queue 这类控制面页面出现 `core_proxy`、`GET /api/microservices/code-queue/proxy/api/tasks/overview`、`POST /api/microservices/code-queue/proxy/api/tasks//read` 等超过 1s 的慢操作时,应保留优化前后的性能面板证据,并同时记录 live API 耗时、容器内存、`/health` 存储摘要和是否仍通过 PostgreSQL/append-only archive 重建历史数据。短 TTL cache、warmup 或页面内存缓存只能作为重复请求抖动保护,性能证据必须证明数据库索引/聚合、分页和渐进式披露本身已把核心路径降到目标内,不能用长缓存遮蔽慢 SQL 或全量 JSON 物化。 -当最近失败请求集中出现 frontend `core_proxy` 502/503/504,路径为 `/api/microservices/code-queue/proxy/...` 的 overview、trace 或 summary,且 k3s/k8s Pod 仍在运行时,必须先运行 `bun scripts/cli.ts microservice diagnostics code-queue`,区分 provider-gateway online、WebSocket HTTP tunnel、k3sctl-adapter、Kubernetes API service proxy 和目标 Service 五段状态。provider tunnel 类失败必须记录响应 body/headers 中的 `requestId`、`stage`、`failureReason`、`x-unidesk-request-id` 和 `x-unidesk-tunnel-error`;如需主动验证错误结构,运行 `bun scripts/cli.ts microservice tunnel-self-test code-queue`,该自测应返回预期失败但 `ok=true` 的诊断结果。随后再继续判断“Kubernetes API service proxy 不可达”“Code Queue 进程不可达”和“Code Queue event loop 被热路径同步工作饿死”。排障顺序是同时查看 `/api/frontend-performance`、`/api/performance`、`k3sctl-adapter` `/api/control-plane`、Kubernetes Pod `/live`、`/health`、overview/trace-step curl、`kubectl top pod` 或 Docker stats、容器 `RestartCount`/`OOMKilled` 和 Code Queue 日志;如果 Pod 内 `/health` 也超时,应优先检查实时 output 发布、archive 读取、transcript 构建、统计计算、启动维护、历史 OA backfill 和远程 Provider 准备/SSH 子进程是否阻塞 event loop,而不是先调整 frontend 渲染或代理超时。Code Queue 默认不得在启动时自动执行历史 OA backfill 或通知表索引维护;显式 backfill 必须作为运维动作记录,并在运行期间并发证明 `/live`、`/health` 与 `/api/tasks/overview` 仍快速返回。涉及 D601 等远程 Provider 时,还要检查 `runCodeQueueSsh`/开发容器准备是否仍存在同步子进程、无 timeout 的 SSH、无上限 stdout/stderr 或 stale TUN 重建等待;修复后必须在远程准备探针运行期间并发证明 Pod `/health` 与 `/api/tasks/overview` 仍快速返回。 +当最近失败请求集中出现 frontend `core_proxy` 502/503/504,路径为 `/api/microservices/code-queue/proxy/...` 的 overview、trace 或 summary,且 k3s/k8s Pod 仍在运行时,必须先运行 `bun scripts/cli.ts microservice diagnostics code-queue`,区分 provider-gateway online、WebSocket HTTP tunnel、k3sctl-adapter、Kubernetes API service proxy 和目标 Service 五段状态。provider tunnel 类失败必须记录响应 body/headers 中的 `requestId`、`stage`、`failureReason`、`x-unidesk-request-id` 和 `x-unidesk-tunnel-error`;如需主动验证错误结构,运行 `bun scripts/cli.ts microservice tunnel-self-test code-queue`,该自测应返回预期失败但 `ok=true` 的诊断结果。随后再继续判断“Kubernetes API service proxy 不可达”“Code Queue 进程不可达”和“Code Queue event loop 被热路径同步工作饿死”。如果 `debug health` 或 provider-gateway egress health 显示 `providerGatewayEgressProxyActiveTunnels` 持续偏高、`pendingTunnels` 非零或 `oldestTunnelAgeMs` 长时间增长,应先按 provider-gateway egress tunnel 生命周期排障,确认 `egress_tcp_open`、connect timeout、idle cleanup 与 core socket close 清理是否生效。排障顺序是同时查看 `/api/frontend-performance`、`/api/performance`、`k3sctl-adapter` `/api/control-plane`、Kubernetes Pod `/live`、`/health`、overview/trace-step curl、`kubectl top pod` 或 Docker stats、容器 `RestartCount`/`OOMKilled` 和 Code Queue 日志;如果 Pod 内 `/health` 也超时,应优先检查实时 output 发布、archive 读取、transcript 构建、统计计算、启动维护、历史 OA backfill 和远程 Provider 准备/SSH 子进程是否阻塞 event loop,而不是先调整 frontend 渲染或代理超时。Code Queue 默认不得在启动时自动执行历史 OA backfill 或通知表索引维护;显式 backfill 必须作为运维动作记录,并在运行期间并发证明 `/live`、`/health` 与 `/api/tasks/overview` 仍快速返回。涉及 D601 等远程 Provider 时,还要检查 `runCodeQueueSsh`/开发容器准备是否仍存在同步子进程、无 timeout 的 SSH、无上限 stdout/stderr 或 stale TUN 重建等待;修复后必须在远程准备探针运行期间并发证明 Pod `/health` 与 `/api/tasks/overview` 仍快速返回。 Code Queue task 明明产出最终回复却反复 `retry_wait` 时,应优先用任务详情里的 latest attempt 字段核查 `terminalStatus`、`transportClosedBeforeTerminal`、`appServerExitCode`、`finalResponseChars`、`judge.raw._safetyOverride` 和 attempt output。OpenCode 远程任务中,`opencode completed status=completed exit=0` 加当前 attempt 非空 assistant 输出应对应 `terminalStatus=completed`、`transportClosedBeforeTerminal=false`;如果因为缺少 `step_finish` 事件仍触发 `_safetyOverride=terminal_not_completed`,说明协议终态归一化有回归。相反,当前 attempt 没有最终 assistant response 时即使 tool/read/bash 证据完整,也必须 retry,不能用旧 `task.finalResponse` 或 reasoning/tool evidence 代替可见最终回复。 diff --git a/docs/reference/provider-gateway.md b/docs/reference/provider-gateway.md index 3290151d..f20c2fee 100644 --- a/docs/reference/provider-gateway.md +++ b/docs/reference/provider-gateway.md @@ -100,10 +100,12 @@ backend-core 必须把 provider WebSocket HTTP tunnel 的失败分类到响应 b provider-gateway 可以提供 egress HTTP CONNECT 代理,用于让 Code Queue、Pipeline runner、target-side Docker build 等节点侧执行环境通过既有 provider WebSocket 通道出网。代理默认监听容器内 `0.0.0.0:18789`,节点部署必须只发布为宿主 loopback `127.0.0.1:18789->18789/tcp`,不得开放公网端口;普通 Docker 执行容器可通过同一私有 Docker network 访问 provider-gateway 容器名,k3s/k8s Pod 必须通过显式 Kubernetes Service 暴露同节点 provider-gateway 私有 endpoint,例如 D601 Code Queue 使用 selector 指向 hostNetwork 桥接 Pod 的 `d601-provider-egress-proxy.unidesk.svc.cluster.local:18789`,不得把固定 Docker bridge IP、手工 EndpointSlice 或该 egress Service 当作业务 HTTP 入口。代理只负责把本地 CONNECT/absolute HTTP 请求转换为 `egress_tcp_open`、`egress_tcp_data`、`egress_tcp_close` 消息;backend-core 在主 server 侧建立真实 TCP 连接并把数据回传,避免 D601 等计算节点本地网络不可达时卡死 Codex/Git/NPM/apt/Playwright。 -该能力属于 provider-gateway 通道能力,register/heartbeat 的 `unideskCapabilities` 必须包含 `network.egress-proxy`,labels 必须上报 `providerGatewayEgressProxy*` 状态。不得再为某个用户服务单独注册伪 provider 来实现出网代理;否则节点列表会出现虚假 provider,且代理、统计、升级路径会形成多套通道。代理健康检查使用 `GET /__unidesk/egress-proxy/health`,返回 `connected`、`providerId`、`activeTunnels` 和监听端口;业务服务自己的 `/health` 应把该结果作为排障证据透出。 +该能力属于 provider-gateway 通道能力,register/heartbeat 的 `unideskCapabilities` 必须包含 `network.egress-proxy`,labels 必须上报 `providerGatewayEgressProxy*` 状态。不得再为某个用户服务单独注册伪 provider 来实现出网代理;否则节点列表会出现虚假 provider,且代理、统计、升级路径会形成多套通道。代理健康检查使用 `GET /__unidesk/egress-proxy/health`,返回 `connected`、`providerId`、`activeTunnels`、`pendingTunnels`、`oldestTunnelAgeMs`、`openTimeoutMs`、`idleTimeoutMs` 和监听端口;业务服务自己的 `/health` 应把该结果作为排障证据透出。 egress proxy 的长期边界是“统一 provider 通道,不引入第二控制面”。backend-core 只接受在线 provider socket 上的 `egress_tcp_*` 消息,并在该 socket 关闭时销毁全部对应 TCP relay;provider-gateway 只维护本地 HTTP proxy 与 WebSocket 消息映射,不保存业务状态,不参与任务调度、统计或节点注册以外的控制面。执行容器、用户服务、Pipeline runner 和 provider-side deploy build 不允许直接连接 backend-core provider ingress,也不允许携带 provider token 自行注册;需要出网时只能连接同节点 provider-gateway 的私有 proxy endpoint。当前 k3s/k8s Code Queue 通过 `d601-provider-egress-proxy` Kubernetes Service 连接 D601 provider-gateway egress endpoint,这是 Pod 内的出网入口,不是业务 HTTP 代理入口,也不能替代 Kubernetes API service proxy。部署构建同样不得新建 SSH SOCKS、公网 master proxy 或宿主全局代理;构建脚本只能把 provider-gateway WS egress 作为短生命周期环境变量和 Docker build-arg 注入,并配合目标节点本地 BuildKit/image cache 避免重复下载大依赖层。 +egress tunnel 必须有生命周期边界:provider-gateway 发出 `egress_tcp_open` 后如果主 server 未在 `openTimeoutMs` 内返回 `egress_tcp_opened` 或 close,必须主动关闭本地 client 并向 core 发送 `egress_tcp_close`;provider-gateway 与 backend-core 都必须对长时间无数据的 relay 执行 idle 清理,避免 provider WebSocket 抖动、TCP connect 卡住或上游未关闭时留下 stale tunnel。排障时如果 `activeTunnels` 持续增长、`pendingTunnels` 非零或 `oldestTunnelAgeMs` 明显超过业务请求耗时,应先看 provider-gateway 与 backend-core egress 清理日志,再判断 Code Queue、PostgreSQL 或 OA Event Flow 本身是否慢。 + 故障语义必须显式,不允许静默 fallback。provider-gateway 到 backend-core 的 WebSocket 未连接时,本地 proxy 必须返回 503;执行容器不能自动绕过到 D601 本地直连公网、外部公共代理或主 server 公网 HTTP 端口。`NO_PROXY` 只用于 PostgreSQL、OA Event Flow、ClaudeQQ、frontend/backend-core 内网代理、provider-gateway health 等明确内网链路,不能把 GitHub、模型 API、npm registry 等外部目标加入绕过列表。`hyueapi.com` 是明确的模型 API 例外:该上游会拒绝 provider-gateway egress proxy 出口,Code Queue 必须用 `CODE_QUEUE_EGRESS_PROXY_NO_PROXY` / `NO_PROXY` 将 `hyueapi.com,.hyueapi.com` 配成直连,其它模型 API 仍不得默认绕过 proxy。验收必须同时证明 provider-gateway labels、业务服务 `/health` 和执行容器内 `curl -I https://...` 都走同一 proxy path,hyueapi 例外则以 Code Queue `/health.egressProxy.noProxy` 和目标任务成功完成作为证据。 ## Gateway Version Metadata diff --git a/src/components/backend-core/src/egress-tcp.ts b/src/components/backend-core/src/egress-tcp.ts index dedb45fd..b9cad80d 100644 --- a/src/components/backend-core/src/egress-tcp.ts +++ b/src/components/backend-core/src/egress-tcp.ts @@ -23,6 +23,9 @@ function isValidEgressPort(port: number): boolean { return Number.isInteger(port) && port > 0 && port <= 65_535; } +const egressTcpConnectTimeoutMs = 15_000; +const egressTcpIdleTimeoutMs = 600_000; + function sendEgressClose(provider: ProviderSocket, connectionId: string, error?: string): void { const message: CoreEgressTcpCloseMessage = error === undefined ? { type: "egress_tcp_close", connectionId } @@ -30,13 +33,31 @@ function sendEgressClose(provider: ProviderSocket, connectionId: string, error?: wsSendJson(provider, message); } +function clearConnectionTimers(connection: EgressTcpConnection): void { + if (connection.connectTimer !== null) clearTimeout(connection.connectTimer); + if (connection.idleTimer !== null) clearTimeout(connection.idleTimer); + connection.connectTimer = null; + connection.idleTimer = null; +} + function closeEgressTcpConnection(providerId: string, connectionId: string, error?: string): void { const key = egressTcpKey(providerId, connectionId); const connection = ctx.activeEgressTcpConnections.get(key); if (connection === undefined) return; ctx.activeEgressTcpConnections.delete(key); + clearConnectionTimers(connection); connection.socket.destroy(); - if (error !== undefined) sendEgressClose(connection.provider, connectionId, error); + if (error !== undefined) { + logger("warn", "egress_tcp_connection_closed", { providerId, connectionId, error }); + sendEgressClose(connection.provider, connectionId, error); + } +} + +function refreshConnectionIdle(connection: EgressTcpConnection): void { + if (connection.idleTimer !== null) clearTimeout(connection.idleTimer); + connection.idleTimer = setTimeout(() => { + closeEgressTcpConnection(connection.providerId, connection.connectionId, "egress tcp idle timeout"); + }, egressTcpIdleTimeoutMs); } export function handleEgressTcpOpen(ws: ProviderSocket, message: ProviderEgressTcpOpenMessage): void { @@ -49,13 +70,32 @@ export function handleEgressTcpOpen(ws: ProviderSocket, message: ProviderEgressT const key = egressTcpKey(message.providerId, message.connectionId); closeEgressTcpConnection(message.providerId, message.connectionId); const socket = connectTcp({ host, port }); - const connection: EgressTcpConnection = { providerId: message.providerId, connectionId: message.connectionId, socket, provider: ws }; + const connection: EgressTcpConnection = { + providerId: message.providerId, + connectionId: message.connectionId, + socket, + provider: ws, + connectTimer: null, + idleTimer: null, + }; ctx.activeEgressTcpConnections.set(key, connection); + connection.connectTimer = setTimeout(() => { + closeEgressTcpConnection(message.providerId, message.connectionId, "egress tcp connect timeout"); + }, egressTcpConnectTimeoutMs); + refreshConnectionIdle(connection); socket.on("connect", () => { + if (ctx.activeEgressTcpConnections.get(key) !== connection) return; + if (connection.connectTimer !== null) { + clearTimeout(connection.connectTimer); + connection.connectTimer = null; + } + refreshConnectionIdle(connection); const opened: CoreEgressTcpOpenedMessage = { type: "egress_tcp_opened", connectionId: message.connectionId }; wsSendJson(ws, opened); }); socket.on("data", (chunk) => { + if (ctx.activeEgressTcpConnections.get(key) !== connection) return; + refreshConnectionIdle(connection); const data: CoreEgressTcpDataMessage = { type: "egress_tcp_data", connectionId: message.connectionId, @@ -67,11 +107,13 @@ export function handleEgressTcpOpen(ws: ProviderSocket, message: ProviderEgressT socket.on("close", () => { if (ctx.activeEgressTcpConnections.get(key) !== connection) return; ctx.activeEgressTcpConnections.delete(key); + clearConnectionTimers(connection); sendEgressClose(ws, message.connectionId); }); socket.on("error", (error) => { if (ctx.activeEgressTcpConnections.get(key) !== connection) return; ctx.activeEgressTcpConnections.delete(key); + clearConnectionTimers(connection); sendEgressClose(ws, message.connectionId, error.message); }); } @@ -79,6 +121,7 @@ export function handleEgressTcpOpen(ws: ProviderSocket, message: ProviderEgressT export function handleEgressTcpData(message: ProviderEgressTcpDataMessage): void { const connection = ctx.activeEgressTcpConnections.get(egressTcpKey(message.providerId, message.connectionId)); if (connection === undefined) return; + refreshConnectionIdle(connection); connection.socket.write(Buffer.from(message.data, message.encoding === "base64" ? "base64" : "utf8")); } @@ -90,6 +133,7 @@ export function closeEgressTcpConnectionsForProvider(providerId: string): void { for (const [key, connection] of ctx.activeEgressTcpConnections) { if (connection.providerId !== providerId) continue; ctx.activeEgressTcpConnections.delete(key); + clearConnectionTimers(connection); connection.socket.destroy(); } } @@ -98,6 +142,7 @@ export function closeEgressTcpConnectionsForSocket(provider: ProviderSocket): vo for (const [key, connection] of ctx.activeEgressTcpConnections) { if (connection.provider !== provider) continue; ctx.activeEgressTcpConnections.delete(key); + clearConnectionTimers(connection); connection.socket.destroy(); } } diff --git a/src/components/backend-core/src/types.ts b/src/components/backend-core/src/types.ts index f887c182..c3c34480 100644 --- a/src/components/backend-core/src/types.ts +++ b/src/components/backend-core/src/types.ts @@ -166,6 +166,8 @@ export interface EgressTcpConnection { connectionId: string; socket: Socket; provider: ProviderSocket; + connectTimer: ReturnType | null; + idleTimer: ReturnType | null; } export type HttpTunnelFailureReason = diff --git a/src/components/provider-gateway/package.json b/src/components/provider-gateway/package.json index a21be47c..41c1b161 100644 --- a/src/components/provider-gateway/package.json +++ b/src/components/provider-gateway/package.json @@ -1,6 +1,6 @@ { "name": "@unidesk/provider-gateway", - "version": "0.2.21", + "version": "0.2.22", "private": true, "type": "module", "scripts": { diff --git a/src/components/provider-gateway/src/egress-proxy.ts b/src/components/provider-gateway/src/egress-proxy.ts index 278dd54e..848ff07d 100644 --- a/src/components/provider-gateway/src/egress-proxy.ts +++ b/src/components/provider-gateway/src/egress-proxy.ts @@ -29,7 +29,12 @@ interface Tunnel { method: string; opened: boolean; pending: Buffer[]; + pendingBytes: number; closed: boolean; + createdAt: number; + lastActivityAt: number; + openTimer: ReturnType | null; + idleTimer: ReturnType | null; } export interface ProviderEgressProxyHandle { @@ -111,6 +116,10 @@ function proxyUrlFor(host: string, port: number): string { return `http://${host}:${port}`; } +const tunnelOpenTimeoutMs = 15_000; +const tunnelIdleTimeoutMs = 600_000; +const maxPendingBytes = 4 * 1024 * 1024; + export function startProviderEgressProxy(options: ProviderEgressProxyOptions): ProviderEgressProxyHandle { const proxyUrl = proxyUrlFor(options.listenHost, options.listenPort); const tunnels = new Map(); @@ -118,25 +127,73 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P const send = (message: EgressToCoreMessage): boolean => options.sendToCore(message); - const status = (): Record => ({ - enabled: true, - providerId: options.providerId, - connected: options.isCoreConnected(), - proxyUrl, - listenHost: options.listenHost, - listenPort: options.listenPort, - activeTunnels: tunnels.size, - channel: "provider-gateway", - }); + const status = (): Record => { + const now = Date.now(); + const tunnelList = Array.from(tunnels.values()); + const ages = tunnelList.map((tunnel) => now - tunnel.createdAt); + return { + enabled: true, + providerId: options.providerId, + connected: options.isCoreConnected(), + proxyUrl, + listenHost: options.listenHost, + listenPort: options.listenPort, + activeTunnels: tunnels.size, + pendingTunnels: tunnelList.filter((tunnel) => !tunnel.opened).length, + oldestTunnelAgeMs: ages.length > 0 ? Math.max(...ages) : 0, + openTimeoutMs: tunnelOpenTimeoutMs, + idleTimeoutMs: tunnelIdleTimeoutMs, + maxPendingBytes, + channel: "provider-gateway", + }; + }; + + const clearTunnelTimers = (tunnel: Tunnel): void => { + if (tunnel.openTimer !== null) clearTimeout(tunnel.openTimer); + if (tunnel.idleTimer !== null) clearTimeout(tunnel.idleTimer); + tunnel.openTimer = null; + tunnel.idleTimer = null; + }; + + const destroyTunnel = (tunnel: Tunnel, notifyCore: boolean, error?: string): void => { + tunnels.delete(tunnel.id); + tunnel.closed = true; + clearTunnelTimers(tunnel); + tunnel.pending.splice(0); + tunnel.pendingBytes = 0; + if (!tunnel.client.destroyed) tunnel.client.destroy(); + if (notifyCore) send({ type: "egress_tcp_close", providerId: options.providerId, connectionId: tunnel.id, at: nowIso() }); + if (error !== undefined) { + options.logger("warn", "egress_proxy_tunnel_closed", { + connectionId: tunnel.id, + opened: tunnel.opened, + ageMs: Date.now() - tunnel.createdAt, + error, + }); + } + }; const closeTunnel = (id: string, error?: string): void => { const tunnel = tunnels.get(id); if (tunnel === undefined) return; - tunnels.delete(id); - tunnel.closed = true; - if (!tunnel.client.destroyed) tunnel.client.destroy(); - send({ type: "egress_tcp_close", providerId: options.providerId, connectionId: id, at: nowIso() }); - if (error !== undefined) options.logger("warn", "egress_proxy_tunnel_closed", { connectionId: id, error }); + destroyTunnel(tunnel, true, error); + }; + + const refreshTunnelIdle = (tunnel: Tunnel): void => { + if (tunnel.closed) return; + tunnel.lastActivityAt = Date.now(); + if (tunnel.idleTimer !== null) clearTimeout(tunnel.idleTimer); + tunnel.idleTimer = setTimeout(() => closeTunnel(tunnel.id, "egress proxy idle timeout"), tunnelIdleTimeoutMs); + }; + + const queuePendingChunk = (tunnel: Tunnel, chunk: Buffer): boolean => { + if (tunnel.closed) return false; + tunnel.pending.push(chunk); + tunnel.pendingBytes += chunk.byteLength; + refreshTunnelIdle(tunnel); + if (tunnel.pendingBytes <= maxPendingBytes) return true; + closeTunnel(tunnel.id, "egress proxy pending buffer exceeded"); + return false; }; const handleCoreMessage = (message: EgressFromCoreMessage): boolean => { @@ -144,24 +201,29 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P if (message.type === "egress_tcp_opened") { if (tunnel === undefined || tunnel.closed) return true; tunnel.opened = true; + if (tunnel.openTimer !== null) { + clearTimeout(tunnel.openTimer); + tunnel.openTimer = null; + } + refreshTunnelIdle(tunnel); if (tunnel.method === "CONNECT") { tunnel.client.write("HTTP/1.1 200 Connection Established\r\nProxy-Agent: UniDesk-ProviderGateway\r\n\r\n"); } for (const chunk of tunnel.pending.splice(0)) { send({ type: "egress_tcp_data", providerId: options.providerId, connectionId: tunnel.id, data: chunk.toString("base64"), encoding: "base64", at: nowIso() }); } + tunnel.pendingBytes = 0; return true; } if (message.type === "egress_tcp_data") { if (tunnel === undefined || tunnel.client.destroyed) return true; + refreshTunnelIdle(tunnel); tunnel.client.write(Buffer.from(message.data, message.encoding === "base64" ? "base64" : "utf8")); return true; } if (message.type === "egress_tcp_close") { if (tunnel !== undefined) { - tunnels.delete(message.connectionId); - tunnel.closed = true; - if (!tunnel.client.destroyed) tunnel.client.destroy(); + destroyTunnel(tunnel, false, message.error); } if (message.error !== undefined && message.error.length > 0) { options.logger("warn", "egress_proxy_remote_close", { connectionId: message.connectionId, error: message.error }); @@ -222,14 +284,28 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P fail("400 Bad Request", "unsupported proxy target\n"); return; } - const tunnel: Tunnel = { id, client, method: parsed.method, opened: false, pending: [], closed: false }; + const createdAt = Date.now(); + const tunnel: Tunnel = { + id, + client, + method: parsed.method, + opened: false, + pending: [], + pendingBytes: 0, + closed: false, + createdAt, + lastActivityAt: createdAt, + openTimer: null, + idleTimer: null, + }; tunnels.set(id, tunnel); client.on("data", (nextChunk) => { const nextBuffer = Buffer.isBuffer(nextChunk) ? nextChunk : Buffer.from(nextChunk); if (!tunnel.opened) { - tunnel.pending.push(nextBuffer); + queuePendingChunk(tunnel, nextBuffer); return; } + refreshTunnelIdle(tunnel); send({ type: "egress_tcp_data", providerId: options.providerId, connectionId: id, data: nextBuffer.toString("base64"), encoding: "base64", at: nowIso() }); }); client.on("close", () => closeTunnel(id)); @@ -238,10 +314,13 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P if (!opened) { tunnels.delete(id); tunnel.closed = true; + clearTunnelTimers(tunnel); fail("503 Service Unavailable", "provider-gateway core channel is not connected\n"); return; } - if (firstPayload !== null) tunnel.pending.push(firstPayload); + tunnel.openTimer = setTimeout(() => closeTunnel(id, "egress proxy open timeout"), tunnelOpenTimeoutMs); + refreshTunnelIdle(tunnel); + if (firstPayload !== null) queuePendingChunk(tunnel, firstPayload); }); client.on("error", () => undefined); });