Bound provider egress tunnel lifecycle
This commit is contained in:
@@ -38,6 +38,6 @@ frontend Bun server 必须提供同源 `/api/frontend-performance`,记录 webu
|
||||
|
||||
性能优化必须先用这些指标锁定慢操作名称、路径、耗时和代理层级,再改后端查询或前后端通信策略;不得只凭主观体感改 UI。Code Queue 这类控制面页面出现 `core_proxy`、`GET /api/microservices/code-queue/proxy/api/tasks/overview`、`POST /api/microservices/code-queue/proxy/api/tasks/<id>/read` 等超过 1s 的慢操作时,应保留优化前后的性能面板证据,并同时记录 live API 耗时、容器内存、`/health` 存储摘要和是否仍通过 PostgreSQL/append-only archive 重建历史数据。短 TTL cache、warmup 或页面内存缓存只能作为重复请求抖动保护,性能证据必须证明数据库索引/聚合、分页和渐进式披露本身已把核心路径降到目标内,不能用长缓存遮蔽慢 SQL 或全量 JSON 物化。
|
||||
|
||||
当最近失败请求集中出现 frontend `core_proxy` 502/503/504,路径为 `/api/microservices/code-queue/proxy/...` 的 overview、trace 或 summary,且 k3s/k8s Pod 仍在运行时,必须先运行 `bun scripts/cli.ts microservice diagnostics code-queue`,区分 provider-gateway online、WebSocket HTTP tunnel、k3sctl-adapter、Kubernetes API service proxy 和目标 Service 五段状态。provider tunnel 类失败必须记录响应 body/headers 中的 `requestId`、`stage`、`failureReason`、`x-unidesk-request-id` 和 `x-unidesk-tunnel-error`;如需主动验证错误结构,运行 `bun scripts/cli.ts microservice tunnel-self-test code-queue`,该自测应返回预期失败但 `ok=true` 的诊断结果。随后再继续判断“Kubernetes API service proxy 不可达”“Code Queue 进程不可达”和“Code Queue event loop 被热路径同步工作饿死”。排障顺序是同时查看 `/api/frontend-performance`、`/api/performance`、`k3sctl-adapter` `/api/control-plane`、Kubernetes Pod `/live`、`/health`、overview/trace-step curl、`kubectl top pod` 或 Docker stats、容器 `RestartCount`/`OOMKilled` 和 Code Queue 日志;如果 Pod 内 `/health` 也超时,应优先检查实时 output 发布、archive 读取、transcript 构建、统计计算、启动维护、历史 OA backfill 和远程 Provider 准备/SSH 子进程是否阻塞 event loop,而不是先调整 frontend 渲染或代理超时。Code Queue 默认不得在启动时自动执行历史 OA backfill 或通知表索引维护;显式 backfill 必须作为运维动作记录,并在运行期间并发证明 `/live`、`/health` 与 `/api/tasks/overview` 仍快速返回。涉及 D601 等远程 Provider 时,还要检查 `runCodeQueueSsh`/开发容器准备是否仍存在同步子进程、无 timeout 的 SSH、无上限 stdout/stderr 或 stale TUN 重建等待;修复后必须在远程准备探针运行期间并发证明 Pod `/health` 与 `/api/tasks/overview` 仍快速返回。
|
||||
当最近失败请求集中出现 frontend `core_proxy` 502/503/504,路径为 `/api/microservices/code-queue/proxy/...` 的 overview、trace 或 summary,且 k3s/k8s Pod 仍在运行时,必须先运行 `bun scripts/cli.ts microservice diagnostics code-queue`,区分 provider-gateway online、WebSocket HTTP tunnel、k3sctl-adapter、Kubernetes API service proxy 和目标 Service 五段状态。provider tunnel 类失败必须记录响应 body/headers 中的 `requestId`、`stage`、`failureReason`、`x-unidesk-request-id` 和 `x-unidesk-tunnel-error`;如需主动验证错误结构,运行 `bun scripts/cli.ts microservice tunnel-self-test code-queue`,该自测应返回预期失败但 `ok=true` 的诊断结果。随后再继续判断“Kubernetes API service proxy 不可达”“Code Queue 进程不可达”和“Code Queue event loop 被热路径同步工作饿死”。如果 `debug health` 或 provider-gateway egress health 显示 `providerGatewayEgressProxyActiveTunnels` 持续偏高、`pendingTunnels` 非零或 `oldestTunnelAgeMs` 长时间增长,应先按 provider-gateway egress tunnel 生命周期排障,确认 `egress_tcp_open`、connect timeout、idle cleanup 与 core socket close 清理是否生效。排障顺序是同时查看 `/api/frontend-performance`、`/api/performance`、`k3sctl-adapter` `/api/control-plane`、Kubernetes Pod `/live`、`/health`、overview/trace-step curl、`kubectl top pod` 或 Docker stats、容器 `RestartCount`/`OOMKilled` 和 Code Queue 日志;如果 Pod 内 `/health` 也超时,应优先检查实时 output 发布、archive 读取、transcript 构建、统计计算、启动维护、历史 OA backfill 和远程 Provider 准备/SSH 子进程是否阻塞 event loop,而不是先调整 frontend 渲染或代理超时。Code Queue 默认不得在启动时自动执行历史 OA backfill 或通知表索引维护;显式 backfill 必须作为运维动作记录,并在运行期间并发证明 `/live`、`/health` 与 `/api/tasks/overview` 仍快速返回。涉及 D601 等远程 Provider 时,还要检查 `runCodeQueueSsh`/开发容器准备是否仍存在同步子进程、无 timeout 的 SSH、无上限 stdout/stderr 或 stale TUN 重建等待;修复后必须在远程准备探针运行期间并发证明 Pod `/health` 与 `/api/tasks/overview` 仍快速返回。
|
||||
|
||||
Code Queue task 明明产出最终回复却反复 `retry_wait` 时,应优先用任务详情里的 latest attempt 字段核查 `terminalStatus`、`transportClosedBeforeTerminal`、`appServerExitCode`、`finalResponseChars`、`judge.raw._safetyOverride` 和 attempt output。OpenCode 远程任务中,`opencode completed status=completed exit=0` 加当前 attempt 非空 assistant 输出应对应 `terminalStatus=completed`、`transportClosedBeforeTerminal=false`;如果因为缺少 `step_finish` 事件仍触发 `_safetyOverride=terminal_not_completed`,说明协议终态归一化有回归。相反,当前 attempt 没有最终 assistant response 时即使 tool/read/bash 证据完整,也必须 retry,不能用旧 `task.finalResponse` 或 reasoning/tool evidence 代替可见最终回复。
|
||||
|
||||
@@ -100,10 +100,12 @@ backend-core 必须把 provider WebSocket HTTP tunnel 的失败分类到响应 b
|
||||
|
||||
provider-gateway 可以提供 egress HTTP CONNECT 代理,用于让 Code Queue、Pipeline runner、target-side Docker build 等节点侧执行环境通过既有 provider WebSocket 通道出网。代理默认监听容器内 `0.0.0.0:18789`,节点部署必须只发布为宿主 loopback `127.0.0.1:18789->18789/tcp`,不得开放公网端口;普通 Docker 执行容器可通过同一私有 Docker network 访问 provider-gateway 容器名,k3s/k8s Pod 必须通过显式 Kubernetes Service 暴露同节点 provider-gateway 私有 endpoint,例如 D601 Code Queue 使用 selector 指向 hostNetwork 桥接 Pod 的 `d601-provider-egress-proxy.unidesk.svc.cluster.local:18789`,不得把固定 Docker bridge IP、手工 EndpointSlice 或该 egress Service 当作业务 HTTP 入口。代理只负责把本地 CONNECT/absolute HTTP 请求转换为 `egress_tcp_open`、`egress_tcp_data`、`egress_tcp_close` 消息;backend-core 在主 server 侧建立真实 TCP 连接并把数据回传,避免 D601 等计算节点本地网络不可达时卡死 Codex/Git/NPM/apt/Playwright。
|
||||
|
||||
该能力属于 provider-gateway 通道能力,register/heartbeat 的 `unideskCapabilities` 必须包含 `network.egress-proxy`,labels 必须上报 `providerGatewayEgressProxy*` 状态。不得再为某个用户服务单独注册伪 provider 来实现出网代理;否则节点列表会出现虚假 provider,且代理、统计、升级路径会形成多套通道。代理健康检查使用 `GET /__unidesk/egress-proxy/health`,返回 `connected`、`providerId`、`activeTunnels` 和监听端口;业务服务自己的 `/health` 应把该结果作为排障证据透出。
|
||||
该能力属于 provider-gateway 通道能力,register/heartbeat 的 `unideskCapabilities` 必须包含 `network.egress-proxy`,labels 必须上报 `providerGatewayEgressProxy*` 状态。不得再为某个用户服务单独注册伪 provider 来实现出网代理;否则节点列表会出现虚假 provider,且代理、统计、升级路径会形成多套通道。代理健康检查使用 `GET /__unidesk/egress-proxy/health`,返回 `connected`、`providerId`、`activeTunnels`、`pendingTunnels`、`oldestTunnelAgeMs`、`openTimeoutMs`、`idleTimeoutMs` 和监听端口;业务服务自己的 `/health` 应把该结果作为排障证据透出。
|
||||
|
||||
egress proxy 的长期边界是“统一 provider 通道,不引入第二控制面”。backend-core 只接受在线 provider socket 上的 `egress_tcp_*` 消息,并在该 socket 关闭时销毁全部对应 TCP relay;provider-gateway 只维护本地 HTTP proxy 与 WebSocket 消息映射,不保存业务状态,不参与任务调度、统计或节点注册以外的控制面。执行容器、用户服务、Pipeline runner 和 provider-side deploy build 不允许直接连接 backend-core provider ingress,也不允许携带 provider token 自行注册;需要出网时只能连接同节点 provider-gateway 的私有 proxy endpoint。当前 k3s/k8s Code Queue 通过 `d601-provider-egress-proxy` Kubernetes Service 连接 D601 provider-gateway egress endpoint,这是 Pod 内的出网入口,不是业务 HTTP 代理入口,也不能替代 Kubernetes API service proxy。部署构建同样不得新建 SSH SOCKS、公网 master proxy 或宿主全局代理;构建脚本只能把 provider-gateway WS egress 作为短生命周期环境变量和 Docker build-arg 注入,并配合目标节点本地 BuildKit/image cache 避免重复下载大依赖层。
|
||||
|
||||
egress tunnel 必须有生命周期边界:provider-gateway 发出 `egress_tcp_open` 后如果主 server 未在 `openTimeoutMs` 内返回 `egress_tcp_opened` 或 close,必须主动关闭本地 client 并向 core 发送 `egress_tcp_close`;provider-gateway 与 backend-core 都必须对长时间无数据的 relay 执行 idle 清理,避免 provider WebSocket 抖动、TCP connect 卡住或上游未关闭时留下 stale tunnel。排障时如果 `activeTunnels` 持续增长、`pendingTunnels` 非零或 `oldestTunnelAgeMs` 明显超过业务请求耗时,应先看 provider-gateway 与 backend-core egress 清理日志,再判断 Code Queue、PostgreSQL 或 OA Event Flow 本身是否慢。
|
||||
|
||||
故障语义必须显式,不允许静默 fallback。provider-gateway 到 backend-core 的 WebSocket 未连接时,本地 proxy 必须返回 503;执行容器不能自动绕过到 D601 本地直连公网、外部公共代理或主 server 公网 HTTP 端口。`NO_PROXY` 只用于 PostgreSQL、OA Event Flow、ClaudeQQ、frontend/backend-core 内网代理、provider-gateway health 等明确内网链路,不能把 GitHub、模型 API、npm registry 等外部目标加入绕过列表。`hyueapi.com` 是明确的模型 API 例外:该上游会拒绝 provider-gateway egress proxy 出口,Code Queue 必须用 `CODE_QUEUE_EGRESS_PROXY_NO_PROXY` / `NO_PROXY` 将 `hyueapi.com,.hyueapi.com` 配成直连,其它模型 API 仍不得默认绕过 proxy。验收必须同时证明 provider-gateway labels、业务服务 `/health` 和执行容器内 `curl -I https://...` 都走同一 proxy path,hyueapi 例外则以 Code Queue `/health.egressProxy.noProxy` 和目标任务成功完成作为证据。
|
||||
|
||||
## Gateway Version Metadata
|
||||
|
||||
@@ -23,6 +23,9 @@ function isValidEgressPort(port: number): boolean {
|
||||
return Number.isInteger(port) && port > 0 && port <= 65_535;
|
||||
}
|
||||
|
||||
const egressTcpConnectTimeoutMs = 15_000;
|
||||
const egressTcpIdleTimeoutMs = 600_000;
|
||||
|
||||
function sendEgressClose(provider: ProviderSocket, connectionId: string, error?: string): void {
|
||||
const message: CoreEgressTcpCloseMessage = error === undefined
|
||||
? { type: "egress_tcp_close", connectionId }
|
||||
@@ -30,13 +33,31 @@ function sendEgressClose(provider: ProviderSocket, connectionId: string, error?:
|
||||
wsSendJson(provider, message);
|
||||
}
|
||||
|
||||
function clearConnectionTimers(connection: EgressTcpConnection): void {
|
||||
if (connection.connectTimer !== null) clearTimeout(connection.connectTimer);
|
||||
if (connection.idleTimer !== null) clearTimeout(connection.idleTimer);
|
||||
connection.connectTimer = null;
|
||||
connection.idleTimer = null;
|
||||
}
|
||||
|
||||
function closeEgressTcpConnection(providerId: string, connectionId: string, error?: string): void {
|
||||
const key = egressTcpKey(providerId, connectionId);
|
||||
const connection = ctx.activeEgressTcpConnections.get(key);
|
||||
if (connection === undefined) return;
|
||||
ctx.activeEgressTcpConnections.delete(key);
|
||||
clearConnectionTimers(connection);
|
||||
connection.socket.destroy();
|
||||
if (error !== undefined) sendEgressClose(connection.provider, connectionId, error);
|
||||
if (error !== undefined) {
|
||||
logger("warn", "egress_tcp_connection_closed", { providerId, connectionId, error });
|
||||
sendEgressClose(connection.provider, connectionId, error);
|
||||
}
|
||||
}
|
||||
|
||||
function refreshConnectionIdle(connection: EgressTcpConnection): void {
|
||||
if (connection.idleTimer !== null) clearTimeout(connection.idleTimer);
|
||||
connection.idleTimer = setTimeout(() => {
|
||||
closeEgressTcpConnection(connection.providerId, connection.connectionId, "egress tcp idle timeout");
|
||||
}, egressTcpIdleTimeoutMs);
|
||||
}
|
||||
|
||||
export function handleEgressTcpOpen(ws: ProviderSocket, message: ProviderEgressTcpOpenMessage): void {
|
||||
@@ -49,13 +70,32 @@ export function handleEgressTcpOpen(ws: ProviderSocket, message: ProviderEgressT
|
||||
const key = egressTcpKey(message.providerId, message.connectionId);
|
||||
closeEgressTcpConnection(message.providerId, message.connectionId);
|
||||
const socket = connectTcp({ host, port });
|
||||
const connection: EgressTcpConnection = { providerId: message.providerId, connectionId: message.connectionId, socket, provider: ws };
|
||||
const connection: EgressTcpConnection = {
|
||||
providerId: message.providerId,
|
||||
connectionId: message.connectionId,
|
||||
socket,
|
||||
provider: ws,
|
||||
connectTimer: null,
|
||||
idleTimer: null,
|
||||
};
|
||||
ctx.activeEgressTcpConnections.set(key, connection);
|
||||
connection.connectTimer = setTimeout(() => {
|
||||
closeEgressTcpConnection(message.providerId, message.connectionId, "egress tcp connect timeout");
|
||||
}, egressTcpConnectTimeoutMs);
|
||||
refreshConnectionIdle(connection);
|
||||
socket.on("connect", () => {
|
||||
if (ctx.activeEgressTcpConnections.get(key) !== connection) return;
|
||||
if (connection.connectTimer !== null) {
|
||||
clearTimeout(connection.connectTimer);
|
||||
connection.connectTimer = null;
|
||||
}
|
||||
refreshConnectionIdle(connection);
|
||||
const opened: CoreEgressTcpOpenedMessage = { type: "egress_tcp_opened", connectionId: message.connectionId };
|
||||
wsSendJson(ws, opened);
|
||||
});
|
||||
socket.on("data", (chunk) => {
|
||||
if (ctx.activeEgressTcpConnections.get(key) !== connection) return;
|
||||
refreshConnectionIdle(connection);
|
||||
const data: CoreEgressTcpDataMessage = {
|
||||
type: "egress_tcp_data",
|
||||
connectionId: message.connectionId,
|
||||
@@ -67,11 +107,13 @@ export function handleEgressTcpOpen(ws: ProviderSocket, message: ProviderEgressT
|
||||
socket.on("close", () => {
|
||||
if (ctx.activeEgressTcpConnections.get(key) !== connection) return;
|
||||
ctx.activeEgressTcpConnections.delete(key);
|
||||
clearConnectionTimers(connection);
|
||||
sendEgressClose(ws, message.connectionId);
|
||||
});
|
||||
socket.on("error", (error) => {
|
||||
if (ctx.activeEgressTcpConnections.get(key) !== connection) return;
|
||||
ctx.activeEgressTcpConnections.delete(key);
|
||||
clearConnectionTimers(connection);
|
||||
sendEgressClose(ws, message.connectionId, error.message);
|
||||
});
|
||||
}
|
||||
@@ -79,6 +121,7 @@ export function handleEgressTcpOpen(ws: ProviderSocket, message: ProviderEgressT
|
||||
export function handleEgressTcpData(message: ProviderEgressTcpDataMessage): void {
|
||||
const connection = ctx.activeEgressTcpConnections.get(egressTcpKey(message.providerId, message.connectionId));
|
||||
if (connection === undefined) return;
|
||||
refreshConnectionIdle(connection);
|
||||
connection.socket.write(Buffer.from(message.data, message.encoding === "base64" ? "base64" : "utf8"));
|
||||
}
|
||||
|
||||
@@ -90,6 +133,7 @@ export function closeEgressTcpConnectionsForProvider(providerId: string): void {
|
||||
for (const [key, connection] of ctx.activeEgressTcpConnections) {
|
||||
if (connection.providerId !== providerId) continue;
|
||||
ctx.activeEgressTcpConnections.delete(key);
|
||||
clearConnectionTimers(connection);
|
||||
connection.socket.destroy();
|
||||
}
|
||||
}
|
||||
@@ -98,6 +142,7 @@ export function closeEgressTcpConnectionsForSocket(provider: ProviderSocket): vo
|
||||
for (const [key, connection] of ctx.activeEgressTcpConnections) {
|
||||
if (connection.provider !== provider) continue;
|
||||
ctx.activeEgressTcpConnections.delete(key);
|
||||
clearConnectionTimers(connection);
|
||||
connection.socket.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -166,6 +166,8 @@ export interface EgressTcpConnection {
|
||||
connectionId: string;
|
||||
socket: Socket;
|
||||
provider: ProviderSocket;
|
||||
connectTimer: ReturnType<typeof setTimeout> | null;
|
||||
idleTimer: ReturnType<typeof setTimeout> | null;
|
||||
}
|
||||
|
||||
export type HttpTunnelFailureReason =
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@unidesk/provider-gateway",
|
||||
"version": "0.2.21",
|
||||
"version": "0.2.22",
|
||||
"private": true,
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
|
||||
@@ -29,7 +29,12 @@ interface Tunnel {
|
||||
method: string;
|
||||
opened: boolean;
|
||||
pending: Buffer[];
|
||||
pendingBytes: number;
|
||||
closed: boolean;
|
||||
createdAt: number;
|
||||
lastActivityAt: number;
|
||||
openTimer: ReturnType<typeof setTimeout> | null;
|
||||
idleTimer: ReturnType<typeof setTimeout> | null;
|
||||
}
|
||||
|
||||
export interface ProviderEgressProxyHandle {
|
||||
@@ -111,6 +116,10 @@ function proxyUrlFor(host: string, port: number): string {
|
||||
return `http://${host}:${port}`;
|
||||
}
|
||||
|
||||
const tunnelOpenTimeoutMs = 15_000;
|
||||
const tunnelIdleTimeoutMs = 600_000;
|
||||
const maxPendingBytes = 4 * 1024 * 1024;
|
||||
|
||||
export function startProviderEgressProxy(options: ProviderEgressProxyOptions): ProviderEgressProxyHandle {
|
||||
const proxyUrl = proxyUrlFor(options.listenHost, options.listenPort);
|
||||
const tunnels = new Map<string, Tunnel>();
|
||||
@@ -118,25 +127,73 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
|
||||
|
||||
const send = (message: EgressToCoreMessage): boolean => options.sendToCore(message);
|
||||
|
||||
const status = (): Record<string, JsonValue> => ({
|
||||
enabled: true,
|
||||
providerId: options.providerId,
|
||||
connected: options.isCoreConnected(),
|
||||
proxyUrl,
|
||||
listenHost: options.listenHost,
|
||||
listenPort: options.listenPort,
|
||||
activeTunnels: tunnels.size,
|
||||
channel: "provider-gateway",
|
||||
});
|
||||
const status = (): Record<string, JsonValue> => {
|
||||
const now = Date.now();
|
||||
const tunnelList = Array.from(tunnels.values());
|
||||
const ages = tunnelList.map((tunnel) => now - tunnel.createdAt);
|
||||
return {
|
||||
enabled: true,
|
||||
providerId: options.providerId,
|
||||
connected: options.isCoreConnected(),
|
||||
proxyUrl,
|
||||
listenHost: options.listenHost,
|
||||
listenPort: options.listenPort,
|
||||
activeTunnels: tunnels.size,
|
||||
pendingTunnels: tunnelList.filter((tunnel) => !tunnel.opened).length,
|
||||
oldestTunnelAgeMs: ages.length > 0 ? Math.max(...ages) : 0,
|
||||
openTimeoutMs: tunnelOpenTimeoutMs,
|
||||
idleTimeoutMs: tunnelIdleTimeoutMs,
|
||||
maxPendingBytes,
|
||||
channel: "provider-gateway",
|
||||
};
|
||||
};
|
||||
|
||||
const clearTunnelTimers = (tunnel: Tunnel): void => {
|
||||
if (tunnel.openTimer !== null) clearTimeout(tunnel.openTimer);
|
||||
if (tunnel.idleTimer !== null) clearTimeout(tunnel.idleTimer);
|
||||
tunnel.openTimer = null;
|
||||
tunnel.idleTimer = null;
|
||||
};
|
||||
|
||||
const destroyTunnel = (tunnel: Tunnel, notifyCore: boolean, error?: string): void => {
|
||||
tunnels.delete(tunnel.id);
|
||||
tunnel.closed = true;
|
||||
clearTunnelTimers(tunnel);
|
||||
tunnel.pending.splice(0);
|
||||
tunnel.pendingBytes = 0;
|
||||
if (!tunnel.client.destroyed) tunnel.client.destroy();
|
||||
if (notifyCore) send({ type: "egress_tcp_close", providerId: options.providerId, connectionId: tunnel.id, at: nowIso() });
|
||||
if (error !== undefined) {
|
||||
options.logger("warn", "egress_proxy_tunnel_closed", {
|
||||
connectionId: tunnel.id,
|
||||
opened: tunnel.opened,
|
||||
ageMs: Date.now() - tunnel.createdAt,
|
||||
error,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
const closeTunnel = (id: string, error?: string): void => {
|
||||
const tunnel = tunnels.get(id);
|
||||
if (tunnel === undefined) return;
|
||||
tunnels.delete(id);
|
||||
tunnel.closed = true;
|
||||
if (!tunnel.client.destroyed) tunnel.client.destroy();
|
||||
send({ type: "egress_tcp_close", providerId: options.providerId, connectionId: id, at: nowIso() });
|
||||
if (error !== undefined) options.logger("warn", "egress_proxy_tunnel_closed", { connectionId: id, error });
|
||||
destroyTunnel(tunnel, true, error);
|
||||
};
|
||||
|
||||
const refreshTunnelIdle = (tunnel: Tunnel): void => {
|
||||
if (tunnel.closed) return;
|
||||
tunnel.lastActivityAt = Date.now();
|
||||
if (tunnel.idleTimer !== null) clearTimeout(tunnel.idleTimer);
|
||||
tunnel.idleTimer = setTimeout(() => closeTunnel(tunnel.id, "egress proxy idle timeout"), tunnelIdleTimeoutMs);
|
||||
};
|
||||
|
||||
const queuePendingChunk = (tunnel: Tunnel, chunk: Buffer): boolean => {
|
||||
if (tunnel.closed) return false;
|
||||
tunnel.pending.push(chunk);
|
||||
tunnel.pendingBytes += chunk.byteLength;
|
||||
refreshTunnelIdle(tunnel);
|
||||
if (tunnel.pendingBytes <= maxPendingBytes) return true;
|
||||
closeTunnel(tunnel.id, "egress proxy pending buffer exceeded");
|
||||
return false;
|
||||
};
|
||||
|
||||
const handleCoreMessage = (message: EgressFromCoreMessage): boolean => {
|
||||
@@ -144,24 +201,29 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
|
||||
if (message.type === "egress_tcp_opened") {
|
||||
if (tunnel === undefined || tunnel.closed) return true;
|
||||
tunnel.opened = true;
|
||||
if (tunnel.openTimer !== null) {
|
||||
clearTimeout(tunnel.openTimer);
|
||||
tunnel.openTimer = null;
|
||||
}
|
||||
refreshTunnelIdle(tunnel);
|
||||
if (tunnel.method === "CONNECT") {
|
||||
tunnel.client.write("HTTP/1.1 200 Connection Established\r\nProxy-Agent: UniDesk-ProviderGateway\r\n\r\n");
|
||||
}
|
||||
for (const chunk of tunnel.pending.splice(0)) {
|
||||
send({ type: "egress_tcp_data", providerId: options.providerId, connectionId: tunnel.id, data: chunk.toString("base64"), encoding: "base64", at: nowIso() });
|
||||
}
|
||||
tunnel.pendingBytes = 0;
|
||||
return true;
|
||||
}
|
||||
if (message.type === "egress_tcp_data") {
|
||||
if (tunnel === undefined || tunnel.client.destroyed) return true;
|
||||
refreshTunnelIdle(tunnel);
|
||||
tunnel.client.write(Buffer.from(message.data, message.encoding === "base64" ? "base64" : "utf8"));
|
||||
return true;
|
||||
}
|
||||
if (message.type === "egress_tcp_close") {
|
||||
if (tunnel !== undefined) {
|
||||
tunnels.delete(message.connectionId);
|
||||
tunnel.closed = true;
|
||||
if (!tunnel.client.destroyed) tunnel.client.destroy();
|
||||
destroyTunnel(tunnel, false, message.error);
|
||||
}
|
||||
if (message.error !== undefined && message.error.length > 0) {
|
||||
options.logger("warn", "egress_proxy_remote_close", { connectionId: message.connectionId, error: message.error });
|
||||
@@ -222,14 +284,28 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
|
||||
fail("400 Bad Request", "unsupported proxy target\n");
|
||||
return;
|
||||
}
|
||||
const tunnel: Tunnel = { id, client, method: parsed.method, opened: false, pending: [], closed: false };
|
||||
const createdAt = Date.now();
|
||||
const tunnel: Tunnel = {
|
||||
id,
|
||||
client,
|
||||
method: parsed.method,
|
||||
opened: false,
|
||||
pending: [],
|
||||
pendingBytes: 0,
|
||||
closed: false,
|
||||
createdAt,
|
||||
lastActivityAt: createdAt,
|
||||
openTimer: null,
|
||||
idleTimer: null,
|
||||
};
|
||||
tunnels.set(id, tunnel);
|
||||
client.on("data", (nextChunk) => {
|
||||
const nextBuffer = Buffer.isBuffer(nextChunk) ? nextChunk : Buffer.from(nextChunk);
|
||||
if (!tunnel.opened) {
|
||||
tunnel.pending.push(nextBuffer);
|
||||
queuePendingChunk(tunnel, nextBuffer);
|
||||
return;
|
||||
}
|
||||
refreshTunnelIdle(tunnel);
|
||||
send({ type: "egress_tcp_data", providerId: options.providerId, connectionId: id, data: nextBuffer.toString("base64"), encoding: "base64", at: nowIso() });
|
||||
});
|
||||
client.on("close", () => closeTunnel(id));
|
||||
@@ -238,10 +314,13 @@ export function startProviderEgressProxy(options: ProviderEgressProxyOptions): P
|
||||
if (!opened) {
|
||||
tunnels.delete(id);
|
||||
tunnel.closed = true;
|
||||
clearTunnelTimers(tunnel);
|
||||
fail("503 Service Unavailable", "provider-gateway core channel is not connected\n");
|
||||
return;
|
||||
}
|
||||
if (firstPayload !== null) tunnel.pending.push(firstPayload);
|
||||
tunnel.openTimer = setTimeout(() => closeTunnel(id, "egress proxy open timeout"), tunnelOpenTimeoutMs);
|
||||
refreshTunnelIdle(tunnel);
|
||||
if (firstPayload !== null) queuePendingChunk(tunnel, firstPayload);
|
||||
});
|
||||
client.on("error", () => undefined);
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user