feat: route code queue egress through provider gateway

Codex
2026-05-15 02:51:49 +00:00
parent c206f13216
commit a7e9ecc32f
10 changed files with 602 additions and 6 deletions
+2
@@ -130,6 +130,8 @@ In the UniDesk context, Baidu Netdisk is managed as a pure backend service: it must not expose Baidu
- Code reference: `https://github.com/pikasTech/unidesk` and `repository.commitId` in the configuration; the service source lives in `src/components/microservices/code-queue` and is a UniDesk-owned control-plane component.
- Deployment reference: `src/components/microservices/code-queue/docker-compose.d601.yml` in the UniDesk repository; the Dockerfile is `src/components/microservices/code-queue/Dockerfile` and the container name is `code-queue-backend`; the root `docker-compose.yml` of the main server no longer contains a `code-queue` service.
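- Main-service dependency mapping: D601 Code Queue still uses the main PostgreSQL as its authoritative database. `DATABASE_URL` must point to the main server's restricted port mapping; `OA_EVENT_FLOW_BASE_URL` must point to the main server's restricted OA Event Flow port mapping; `CODE_QUEUE_NOTIFY_CLAUDEQQ_BASE_URL` on D601 uses the local ClaudeQQ mapping `http://host.docker.internal:3290` directly. These port mappings serve only the D601 runtime; their sources must be restricted by a firewall or an equivalent policy, and they must not become an entry point for browsers or arbitrary public clients.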
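- Default egress proxy: D601 `code-queue-backend` must by default inject `HTTP_PROXY`, `HTTPS_PROXY`, and `ALL_PROXY` into task subprocesses such as Codex/OpenCode, `git`, `curl`, and `npm`. The proxy upstream must be the egress HTTP CONNECT port that the D601 provider-gateway exposes inside the provider-gateway Docker network, not a pseudo-provider WebSocket built by Code Queue or an ad hoc `export` in an interactive shell. The Code Queue Compose file must join the provider-gateway network and point `CODE_QUEUE_EGRESS_PROXY_URL` at `http://unidesk-provider-gateway-D601:18789`; provider-gateway then reuses the registered provider WebSocket channel to forward TCP open/data/close messages to backend-core on the main server for egress, without relying on a direct public connection from D601. `NO_PROXY` must cover `localhost`, `127.0.0.1`, `host.docker.internal`, the provider-gateway container name, the main server address, and UniDesk internal service names, so that intranet links such as PostgreSQL, OA Event Flow, ClaudeQQ, and microservice health neither detour through nor recurse into the proxy. `/health` must expose the egress proxy's `enabled`, `connected`, `proxyUrl`, and `channel=provider-gateway` fields plus the upstream provider-gateway health, as the first piece of evidence when troubleshooting Codex network hangs.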
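- No-fallback discipline for the egress proxy: the Code Queue runtime configuration allows exactly one default egress path, the provider-gateway egress proxy. The code must not also keep implicit branches such as a Code Queue self-built WebSocket proxy, an ad hoc shell proxy, a direct public connection from D601, or a direct HTTP proxy on the main server. The Compose layer must explicitly set both the uppercase and lowercase forms of `HTTP_PROXY`, `HTTPS_PROXY`, `ALL_PROXY`, and `NO_PROXY`; after startup the service writes the same set of variables into `process.env`, so that the service self-check, the Codex/OpenCode app-server, task shells, `git`, `curl`, and `npm` all use the same path. Any new network fallback must first be added to this reference document together with state visible in `/health`; otherwise it is treated as a leftover legacy path.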
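- Release discipline: frontend or backend improvements related to Code Queue must be formally released within the same task and verified against the public frontend or the live API; they must not stop at source code, build artifacts, or "release later". Rebuilding `frontend` only replaces the stateless WebUI container and does not touch D601 `code-queue-backend`, the PostgreSQL queue, or running Codex threads, so "it might affect long-running tasks" is not a valid reason to delay a frontend release. `code-queue-backend` itself has restart-recovery and may be restarted or replaced via the D601 Compose file; after a stop, restart, or rebuild it must recover running and queued tasks from persisted state. When modifying Code Queue itself, do not wait for the current Code Queue task to finish, for the queue to become idle, or for `0 running` before restarting; doing so means waiting for the task to exit itself, which is a self-deadlock. Run D601 `docker compose -f src/components/microservices/code-queue/docker-compose.d601.yml up -d --build --force-recreate code-queue` or an equivalent build-first path directly, and use the recovered live API or public frontend to prove that tasks and queues are still readable and can continue.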
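- Renaming and disaster recovery: the old Codex queue service name is allowed only as a compatibility diagnostic and a one-time migration source. When the `code-queue-backend` container's own `/health` is healthy but `microservice health code-queue` returns `microservice not found`, or the service catalog still shows only the old service ID, first conclude that backend-core is still loading an old `MICROSERVICES_JSON`; refresh `.state/docker-compose.env` and explicitly rebuild or recreate `backend-core`, then use `microservice list` to verify `id=code-queue`, `nodeBaseUrl=http://host.docker.internal:4222`, and the container summary. If, after renaming, the `unidesk_code_queue_*` tables are empty while the historical `unidesk_codex_queue_*` tables still hold queue data, then before recovery you must first stop `code-queue-backend`, back up `.state/code-queue` and the current `unidesk_code_queue_*` tables, merge the historical local state directory into `.state/code-queue/`, and migrate `unidesk_codex_queue_tasks`, `unidesk_codex_queue_queues`, and `unidesk_codex_queue_notifications` into the corresponding `unidesk_code_queue_*` tables using a stdin-preserving form such as `docker exec -i unidesk-database psql ...`. Do not delete the historical local state directory or the old PostgreSQL tables before confirming that `/api/tasks`, `/api/queues`, and the output archive are readable. After migration, the target service may be started only on D601 with `docker compose -f src/components/microservices/code-queue/docker-compose.d601.yml up -d --build code-queue`; starting the old `code-queue` service through the main server Compose is prohibited.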
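- Codex authentication: the container syncs the Codex provider configuration only from D601's `/home/ubuntu/.codex/config.toml` into D601 `.state/code-queue/codex-home`, and passes through the variables that providers need, such as `OPENAI_API_KEY` and `CRS_OAI_KEY`, via D601 `.state/code-queue-d601.env`. These provider environment variables must not be written to the repository; they must be injected through the D601 Compose env-file so that authentication survives container rebuilds and restarts. When a new provider `env_key` is added, the same runtime passthrough and Compose env persistence must be added; writing Codex or MiniMax keys into repository files is prohibited. The Code Queue container must mount the D601 host's SSH directory read-only at `/root/.ssh` (default `/home/ubuntu/.ssh`), so that `git push` and `ssh -T git@github.com` inside the container use the same GitHub SSH key and known_hosts as the host; private keys must not be copied into the image or the repository.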
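
The proxy variable rules in the list above can be illustrated with a minimal TypeScript sketch. `buildProxyEnv` is a hypothetical helper that does not appear in this commit; it only shows the idea of deriving the uppercase and lowercase variables from one proxy URL so that every subprocess sees the same path.

```typescript
// Hypothetical helper (not part of this commit): derive one consistent
// proxy environment for task subprocesses from a single proxy URL.
type ProxyEnv = Record<string, string>;

function buildProxyEnv(proxyUrl: string, noProxy: string[]): ProxyEnv {
  const env: ProxyEnv = {};
  for (const name of ["HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY"]) {
    // Tools differ in which case they read, so both forms are set.
    env[name] = proxyUrl;
    env[name.toLowerCase()] = proxyUrl;
  }
  const bypass = noProxy.join(",");
  env["NO_PROXY"] = bypass;
  env["no_proxy"] = bypass;
  return env;
}

const proxyEnv = buildProxyEnv(
  "http://unidesk-provider-gateway-D601:18789",
  ["localhost", "127.0.0.1", "host.docker.internal"],
);
```

Building all eight variables from a single source makes it harder for one of them to drift to a different path, which is the point of the no-fallback rule.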
+10
@@ -90,6 +90,16 @@ provider ingress is the only provider connection interface allowed to be exposed to the public network; currently
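Very large JSON responses can use `jsonArrayLimits` to trim specified arrays before provider-gateway returns them, writing `_unidesk.arrayLimits` metadata into the response body so that the UniDesk frontend can preview lists without showing raw JSON. In the long term, business backends should preferably provide paginated APIs; trimming is only a display safeguard in the UniDesk integration layer.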
## Egress Proxy
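provider-gateway can provide an egress HTTP CONNECT proxy inside the node-local Docker network, letting node-side execution containers such as Code Queue and Pipeline runner reach the outside through the existing provider WebSocket channel. The proxy listens on `0.0.0.0:18789` inside the container by default and needs no public port; consumers must join provider-gateway's Docker network and point `HTTP_PROXY`, `HTTPS_PROXY`, and `ALL_PROXY` at the provider-gateway container name. The proxy only converts local CONNECT/absolute HTTP requests into `egress_tcp_open`, `egress_tcp_data`, and `egress_tcp_close` messages; backend-core establishes the real TCP connection on the main server side and relays the data back, which keeps Codex/Git/NPM from hanging when the local network of compute nodes such as D601 is unreachable.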
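This capability is a provider-gateway channel capability: the `unideskCapabilities` in register/heartbeat must include `network.egress-proxy`, and labels must report the `providerGatewayEgressProxy*` state. Do not register a separate pseudo provider for an individual user service to implement an egress proxy; otherwise the node list shows fake providers, and proxying, statistics, and upgrade paths split into multiple channels. The proxy health check uses `GET /__unidesk/egress-proxy/health`, which returns `connected`, `providerId`, `activeTunnels`, and the listening port; a business service's own `/health` should surface this result as troubleshooting evidence.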
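The long-term boundary of the egress proxy is "one unified provider channel, no second control plane". backend-core accepts `egress_tcp_*` messages only on online provider sockets and destroys all corresponding TCP relays when that socket closes; provider-gateway only maintains the mapping between the local HTTP proxy and WebSocket messages, keeps no business state, and takes no part in any control plane other than task scheduling, statistics, or node registration. Execution containers, user services, and Pipeline runner may not connect directly to the backend-core provider ingress, nor register themselves with a provider token; when they need egress they may only connect to the proxy endpoint that the same-node provider-gateway exposes inside the private Docker network.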
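Failure semantics must be explicit; silent fallback is not allowed. When the WebSocket from provider-gateway to backend-core is not connected, the local proxy must return 503; execution containers must not automatically bypass it via a direct public connection from D601, an external public proxy, or the main server's public HTTP port. `NO_PROXY` is only for clearly internal links such as PostgreSQL, OA Event Flow, ClaudeQQ, the frontend/backend-core intranet proxy, and provider-gateway health; external targets such as GitHub, model APIs, and the npm registry must not be added to the bypass list. Acceptance must prove at the same time that the provider-gateway labels, the business service's `/health`, and `curl -I https://...` inside the execution container all go through the same proxy path.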
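
As a rough illustration of how raw TCP bytes travel inside `egress_tcp_data` messages on the provider WebSocket, the sketch below encodes a chunk as base64 and decodes it again. The field names follow the message shape used in this commit for the core-to-provider direction; `encodeChunk` and `decodeChunk` are hypothetical names introduced only for this example.

```typescript
import { Buffer } from "node:buffer";

// Shape assumed from this commit's core-to-provider data message.
interface EgressTcpDataMessage {
  type: "egress_tcp_data";
  connectionId: string;
  data: string;
  encoding: "base64";
}

// Hypothetical helper: wrap raw bytes so they survive JSON transport.
function encodeChunk(connectionId: string, chunk: Uint8Array): EgressTcpDataMessage {
  return {
    type: "egress_tcp_data",
    connectionId,
    data: Buffer.from(chunk).toString("base64"),
    encoding: "base64",
  };
}

// Hypothetical helper: recover the original bytes on the receiving side.
function decodeChunk(message: EgressTcpDataMessage): Buffer {
  return Buffer.from(message.data, "base64");
}

const wire = JSON.stringify(encodeChunk("egress_demo", Buffer.from("GET / HTTP/1.1\r\n\r\n", "latin1")));
const roundTripped = decodeChunk(JSON.parse(wire) as EgressTcpDataMessage).toString("latin1");
```

Base64 keeps arbitrary binary payloads, such as TLS records inside a CONNECT tunnel, intact across the JSON text frames, at the cost of roughly one third more bytes on the wire.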
## Gateway Version Metadata
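provider-gateway must read its version number from its own `package.json` and report `providerGatewayName`, `providerGatewayVersion`, `providerGatewayStartedAt`, and `providerGatewayUpgradePolicy` in register and heartbeat labels. backend-core merges these labels into `unidesk_nodes.labels`, and the frontend shows them in the node list, resource monitoring, and `资源节点 / 网关版本` (Resource Nodes / Gateway Version). Old nodes that lack these fields may only be shown as version unknown; guessed values must not be substituted. `unideskCapabilities`, `hostSshConfigured`, `hostSshKeyPresent`, and `hostSshTarget` are also the data source for the WebUI operations-availability badges, which directly show each compute node's SSH passthrough availability and remote-update availability.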
+112
@@ -2,6 +2,7 @@ import type { Server, ServerWebSocket } from "bun";
import { createHash } from "node:crypto";
import { createReadStream } from "node:fs";
import { mkdir, rm, stat } from "node:fs/promises";
import { connect as connectTcp, type Socket } from "node:net";
import { basename, dirname, relative, resolve, posix as pathPosix } from "node:path";
import postgres from "postgres";
import { createHourlyJsonlWriter, logRetentionBytesForService } from "../../shared/src/rotating-jsonl";
@@ -17,7 +18,13 @@ import {
type CoreHostSshOpenMessage,
type CoreHostSshResizeMessage,
type CoreDispatchMessage,
type CoreEgressTcpCloseMessage,
type CoreEgressTcpDataMessage,
type CoreEgressTcpOpenedMessage,
type JsonValue,
type ProviderEgressTcpCloseMessage,
type ProviderEgressTcpDataMessage,
type ProviderEgressTcpOpenMessage,
type ProviderHostSshDataMessage,
type ProviderHostSshErrorMessage,
type ProviderHostSshExitMessage,
@@ -178,6 +185,13 @@ type ScheduleAction = DispatchScheduleAction | PgdataBackupScheduleAction;
type TaskTerminalWaiter = (task: RawTaskRow | null) => void;
interface EgressTcpConnection {
providerId: string;
connectionId: string;
socket: Socket;
provider: ProviderSocket;
}
interface MicroserviceProxyCacheEntry {
expiresAt: number;
staleExpiresAt: number;
@@ -201,6 +215,7 @@ interface MicroserviceAvailabilityEntry {
const recentLogs: unknown[] = [];
const activeProviders = new Map<string, ProviderSocket>();
const activeSshClients = new Map<string, ProviderSocket>();
const activeEgressTcpConnections = new Map<string, EgressTcpConnection>();
const serviceStartedAt = new Date();
const config = readConfig();
const logger = createLogger("backend-core", config.logFile);
@@ -977,6 +992,89 @@ function wsSendJson(ws: ProviderSocket, value: unknown): void {
if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify(value));
}
function egressTcpKey(providerId: string, connectionId: string): string {
return `${providerId}:${connectionId}`;
}
function isValidEgressHost(host: string): boolean {
return host.length > 0 && host.length <= 253 && !/[\s/\\:@\0]/u.test(host);
}
function isValidEgressPort(port: number): boolean {
return Number.isInteger(port) && port > 0 && port <= 65_535;
}
function sendEgressClose(provider: ProviderSocket, connectionId: string, error?: string): void {
const message: CoreEgressTcpCloseMessage = error === undefined
? { type: "egress_tcp_close", connectionId }
: { type: "egress_tcp_close", connectionId, error };
wsSendJson(provider, message);
}
function closeEgressTcpConnection(providerId: string, connectionId: string, error?: string): void {
const key = egressTcpKey(providerId, connectionId);
const connection = activeEgressTcpConnections.get(key);
if (connection === undefined) return;
activeEgressTcpConnections.delete(key);
connection.socket.destroy();
if (error !== undefined) sendEgressClose(connection.provider, connectionId, error);
}
function handleEgressTcpOpen(ws: ProviderSocket, message: ProviderEgressTcpOpenMessage): void {
const host = message.host.trim();
const port = Number(message.port);
if (!isValidEgressHost(host) || !isValidEgressPort(port)) {
sendEgressClose(ws, message.connectionId, "invalid egress target");
return;
}
const key = egressTcpKey(message.providerId, message.connectionId);
closeEgressTcpConnection(message.providerId, message.connectionId);
const socket = connectTcp({ host, port });
const connection: EgressTcpConnection = { providerId: message.providerId, connectionId: message.connectionId, socket, provider: ws };
activeEgressTcpConnections.set(key, connection);
socket.on("connect", () => {
const opened: CoreEgressTcpOpenedMessage = { type: "egress_tcp_opened", connectionId: message.connectionId };
wsSendJson(ws, opened);
});
socket.on("data", (chunk) => {
const data: CoreEgressTcpDataMessage = {
type: "egress_tcp_data",
connectionId: message.connectionId,
data: chunk.toString("base64"),
encoding: "base64",
};
wsSendJson(ws, data);
});
socket.on("close", () => {
if (activeEgressTcpConnections.get(key) !== connection) return;
activeEgressTcpConnections.delete(key);
sendEgressClose(ws, message.connectionId);
});
socket.on("error", (error) => {
if (activeEgressTcpConnections.get(key) !== connection) return;
activeEgressTcpConnections.delete(key);
sendEgressClose(ws, message.connectionId, error.message);
});
}
function handleEgressTcpData(message: ProviderEgressTcpDataMessage): void {
const connection = activeEgressTcpConnections.get(egressTcpKey(message.providerId, message.connectionId));
if (connection === undefined) return;
connection.socket.write(Buffer.from(message.data, message.encoding === "base64" ? "base64" : "utf8"));
}
function handleEgressTcpClose(message: ProviderEgressTcpCloseMessage): void {
closeEgressTcpConnection(message.providerId, message.connectionId);
}
function closeEgressTcpConnectionsForProvider(providerId: string): void {
for (const [key, connection] of activeEgressTcpConnections) {
if (connection.providerId !== providerId) continue;
activeEgressTcpConnections.delete(key);
connection.socket.destroy();
}
}
function sshClientFor(sessionId: string): ProviderSocket | null {
return activeSshClients.get(sessionId) ?? null;
}
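
To make the target validation in this hunk easier to reason about, the sketch below copies `isValidEgressHost` and `isValidEgressPort` as they appear in the diff and evaluates them on a few sample inputs. The sample values are illustrative assumptions, not taken from the commit.

```typescript
// Copied from the diff above for a standalone demonstration.
function isValidEgressHost(host: string): boolean {
  return host.length > 0 && host.length <= 253 && !/[\s/\\:@\0]/u.test(host);
}

function isValidEgressPort(port: number): boolean {
  return Number.isInteger(port) && port > 0 && port <= 65_535;
}

// Illustrative inputs (assumptions chosen for this example).
const samples = {
  hostname: isValidEgressHost("github.com"),      // plain hostname
  ipv6: isValidEgressHost("::1"),                 // contains ':'
  userinfo: isValidEgressHost("user@github.com"), // contains '@'
  path: isValidEgressHost("github.com/x"),        // contains '/'
  port443: isValidEgressPort(443),
  port0: isValidEgressPort(0),
  portNaN: isValidEgressPort(Number("abc")),
};
```

As the diff reads, any host containing `:` is rejected, so an IPv6 literal that the gateway side has unwrapped from `[::1]:port` would be answered with `invalid egress target`. Whether that is intended is not stated in the commit.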
@@ -1031,6 +1129,19 @@ async function handleProviderMessage(ws: ProviderSocket, raw: string | Buffer):
return;
}
if (message.type === "egress_tcp_open") {
handleEgressTcpOpen(ws, message);
return;
}
if (message.type === "egress_tcp_data") {
handleEgressTcpData(message);
return;
}
if (message.type === "egress_tcp_close") {
handleEgressTcpClose(message);
return;
}
if (message.type === "register") {
const labels = { ...message.labels, unideskCapabilities: message.capabilities };
await upsertNodeOnline(message.providerId, message.name, labels);
@@ -3499,6 +3610,7 @@ const providerServer = Bun.serve<WsData>({
const providerId = ws.data.providerId;
logger("warn", "provider_socket_close", { providerId: providerId ?? null });
if (providerId !== undefined) {
closeEgressTcpConnectionsForProvider(providerId);
if (activeProviders.get(providerId) !== ws) {
logger("info", "provider_socket_close_ignored_replaced", { providerId });
return;
@@ -40,6 +40,17 @@ services:
CODE_QUEUE_DEV_CONTAINER_DEFAULT_PROVIDER_ID: "${CODE_QUEUE_DEV_CONTAINER_DEFAULT_PROVIDER_ID:-D601}"
CODE_QUEUE_DEV_CONTAINER_IMAGE: "${CODE_QUEUE_DEV_CONTAINER_IMAGE:-}"
CODE_QUEUE_DEV_CONTAINER_WORKDIR: "${CODE_QUEUE_DEV_CONTAINER_WORKDIR:-/home/ubuntu}"
CODE_QUEUE_EGRESS_PROXY_ENABLED: "${CODE_QUEUE_EGRESS_PROXY_ENABLED:-true}"
CODE_QUEUE_EGRESS_PROXY_URL: "${CODE_QUEUE_EGRESS_PROXY_URL:-http://unidesk-provider-gateway-D601:18789}"
CODE_QUEUE_EGRESS_PROXY_NO_PROXY: "${CODE_QUEUE_EGRESS_PROXY_NO_PROXY:-localhost,127.0.0.1,::1,host.docker.internal,unidesk-provider-gateway-D601,74.48.78.17,backend-core,oa-event-flow,database}"
HTTP_PROXY: "${CODE_QUEUE_EGRESS_PROXY_URL:-http://unidesk-provider-gateway-D601:18789}"
HTTPS_PROXY: "${CODE_QUEUE_EGRESS_PROXY_URL:-http://unidesk-provider-gateway-D601:18789}"
ALL_PROXY: "${CODE_QUEUE_EGRESS_PROXY_URL:-http://unidesk-provider-gateway-D601:18789}"
http_proxy: "${CODE_QUEUE_EGRESS_PROXY_URL:-http://unidesk-provider-gateway-D601:18789}"
https_proxy: "${CODE_QUEUE_EGRESS_PROXY_URL:-http://unidesk-provider-gateway-D601:18789}"
all_proxy: "${CODE_QUEUE_EGRESS_PROXY_URL:-http://unidesk-provider-gateway-D601:18789}"
NO_PROXY: "${CODE_QUEUE_EGRESS_PROXY_NO_PROXY:-localhost,127.0.0.1,::1,host.docker.internal,unidesk-provider-gateway-D601,74.48.78.17,backend-core,oa-event-flow,database}"
no_proxy: "${CODE_QUEUE_EGRESS_PROXY_NO_PROXY:-localhost,127.0.0.1,::1,host.docker.internal,unidesk-provider-gateway-D601,74.48.78.17,backend-core,oa-event-flow,database}"
CODE_QUEUE_WINDOWS_NATIVE_CODEX_DEFAULT_WORKDIR: "${CODE_QUEUE_WINDOWS_NATIVE_CODEX_DEFAULT_WORKDIR:-/mnt/f/Work/ConStart}"
CODE_QUEUE_WINDOWS_NATIVE_CODEX_BRIDGE_DIR: "${CODE_QUEUE_WINDOWS_NATIVE_CODEX_BRIDGE_DIR:-/home/ubuntu/.unidesk/code-queue/windows-native-codex}"
CODE_QUEUE_WINDOWS_NATIVE_CODEX_COMMAND: "${CODE_QUEUE_WINDOWS_NATIVE_CODEX_COMMAND:-codex app-server --listen stdio://}"
@@ -66,8 +77,16 @@ services:
      - ${CODE_QUEUE_STATE_DIR:-../../../../.state/code-queue}:/var/lib/unidesk/code-queue
    extra_hosts:
      - "host.docker.internal:host-gateway"
    networks:
      - default
      - provider-gateway
    healthcheck:
      test: ["CMD-SHELL", "curl -fsS --max-time 2 http://127.0.0.1:4222/health >/dev/null"]
      interval: 5s
      timeout: 3s
      retries: 20
networks:
  provider-gateway:
    external: true
    name: ${CODE_QUEUE_PROVIDER_GATEWAY_NETWORK:-unidesk-provider-d601_default}
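
The `NO_PROXY` default in this Compose file can be sanity-checked with a small TypeScript sketch. `bypassesProxy` is a hypothetical helper; it implements a simplified exact-or-suffix match, and real clients such as `curl`, `git`, and `npm` each apply their own `NO_PROXY` rules, so this is only an approximation for review purposes.

```typescript
// Hypothetical helper: simplified NO_PROXY check (exact or dot-suffix match).
// Real tools differ in details such as CIDR, wildcard, and port handling.
function bypassesProxy(host: string, noProxy: string): boolean {
  const target = host.toLowerCase();
  return noProxy
    .split(",")
    .map((entry) => entry.trim().toLowerCase())
    .filter((entry) => entry.length > 0)
    .some((entry) => target === entry || target.endsWith(`.${entry.replace(/^\./u, "")}`));
}

// Default value taken from the Compose file above.
const noProxy = "localhost,127.0.0.1,::1,host.docker.internal,unidesk-provider-gateway-D601,74.48.78.17,backend-core,oa-event-flow,database";

const checks = {
  database: bypassesProxy("database", noProxy),
  gateway: bypassesProxy("unidesk-provider-gateway-D601", noProxy),
  github: bypassesProxy("github.com", noProxy),
  npm: bypassesProxy("registry.npmjs.org", noProxy),
};
```

Internal names should bypass the proxy while external targets such as GitHub and the npm registry should not, which matches the documented rule that external targets must never be added to the bypass list.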
@@ -169,6 +169,18 @@ const config = readConfig();
const logger = createLogger("code-queue", config.logFile);
configureOaEvents({ baseUrl: config.oaEventFlowBaseUrl, logger, nowIso });
const providerGatewayEgressProxy = configureProviderGatewayEgressProxy();
if (providerGatewayEgressProxy.enabled) {
const proxyEnv = providerGatewayEgressProxy.proxyUrl;
process.env.HTTP_PROXY = proxyEnv;
process.env.HTTPS_PROXY = proxyEnv;
process.env.http_proxy = proxyEnv;
process.env.https_proxy = proxyEnv;
process.env.ALL_PROXY = proxyEnv;
process.env.all_proxy = proxyEnv;
process.env.NO_PROXY = providerGatewayEgressProxy.noProxy;
process.env.no_proxy = providerGatewayEgressProxy.noProxy;
}
const state = emptyState();
let processing = false;
const processingQueues = new Set<string>();
@@ -283,6 +295,9 @@ function readConfig(): RuntimeConfig {
const mainProviderId = envString("CODE_QUEUE_MAIN_PROVIDER_ID", "D601");
const devContainerDefaultProviderId = envString("CODE_QUEUE_DEV_CONTAINER_DEFAULT_PROVIDER_ID", "D601");
const remoteDefaultWorkdir = envString("CODE_QUEUE_REMOTE_WORKDIR", "/home/ubuntu");
const defaultWorkdir = envString("CODE_QUEUE_WORKDIR", "/workspace");
const devContainerMasterHost = envString("CODE_QUEUE_DEV_CONTAINER_MASTER_HOST", "74.48.78.17");
const defaultProviderGatewayProxyHost = `unidesk-provider-gateway-${mainProviderId}`;
const executionProviderIds = Array.from(new Set([
mainProviderId,
...envList("CODE_QUEUE_EXECUTION_PROVIDER_IDS", [devContainerDefaultProviderId]),
@@ -293,7 +308,7 @@ function readConfig(): RuntimeConfig {
dataDir,
outputArchiveDir: envString("CODE_QUEUE_OUTPUT_ARCHIVE_DIR", resolve(dataDir, "output-archive")),
logFile: envString("LOG_FILE", "/var/log/unidesk/code-queue.jsonl"),
defaultWorkdir: envString("CODE_QUEUE_WORKDIR", "/workspace"),
defaultWorkdir,
mainProviderId,
remoteDefaultWorkdir,
executionProviderIds,
@@ -339,7 +354,21 @@ function readConfig(): RuntimeConfig {
maxInMemoryOutputRecords: envNonNegativeNumber("CODE_QUEUE_IN_MEMORY_OUTPUT_RECORDS", 10),
maxInMemoryEventRecords: envNonNegativeNumber("CODE_QUEUE_IN_MEMORY_EVENT_RECORDS", 10),
maxActiveQueues: Math.max(0, Math.min(32, envNumber("CODE_QUEUE_MAX_ACTIVE_QUEUES", 0))),
devContainerMasterHost: envString("CODE_QUEUE_DEV_CONTAINER_MASTER_HOST", "74.48.78.17"),
egressProxyEnabled: envBool("CODE_QUEUE_EGRESS_PROXY_ENABLED", true),
egressProxyUrl: envString("CODE_QUEUE_EGRESS_PROXY_URL", `http://${defaultProviderGatewayProxyHost}:18789`).replace(/\/+$/u, ""),
egressProxyNoProxy: envString("CODE_QUEUE_EGRESS_PROXY_NO_PROXY", [
"localhost",
"127.0.0.1",
"::1",
"host.docker.internal",
defaultProviderGatewayProxyHost,
devContainerMasterHost,
"74.48.78.17",
"backend-core",
"oa-event-flow",
"database",
].join(",")),
devContainerMasterHost,
devContainerDefaultProviderId,
devContainerImage: envString("CODE_QUEUE_DEV_CONTAINER_IMAGE", ""),
devContainerWorkdir: envString("CODE_QUEUE_DEV_CONTAINER_WORKDIR", remoteDefaultWorkdir),
@@ -351,6 +380,48 @@ function readConfig(): RuntimeConfig {
};
}
function configureProviderGatewayEgressProxy(): { enabled: boolean; proxyUrl: string; noProxy: string; channel: string } {
if (!config.egressProxyEnabled) {
return { enabled: false, proxyUrl: "", noProxy: config.egressProxyNoProxy, channel: "provider-gateway" };
}
return {
enabled: true,
proxyUrl: config.egressProxyUrl,
noProxy: config.egressProxyNoProxy,
channel: "provider-gateway",
};
}
async function providerGatewayEgressProxyStatus(): Promise<JsonValue> {
if (!providerGatewayEgressProxy.enabled) return { enabled: false, channel: "provider-gateway" };
const base = {
enabled: true,
channel: providerGatewayEgressProxy.channel,
proxyUrl: providerGatewayEgressProxy.proxyUrl,
noProxy: providerGatewayEgressProxy.noProxy,
};
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), 600);
try {
const url = new URL(providerGatewayEgressProxy.proxyUrl);
url.pathname = "/__unidesk/egress-proxy/health";
url.search = "";
const response = await fetch(url, { signal: controller.signal });
const bodyText = await response.text();
let upstream: JsonValue = bodyText.slice(0, 1000);
try {
upstream = JSON.parse(bodyText) as JsonValue;
} catch {
// Keep the bounded body text as evidence if the proxy returned a non-JSON failure page.
}
return { ...base, connected: response.ok, status: response.status, upstream };
} catch (error) {
return { ...base, connected: false, error: error instanceof Error ? error.message : String(error) };
} finally {
clearTimeout(timer);
}
}
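
The health probe above derives its URL from the configured proxy URL by overwriting the path and clearing the query. A minimal sketch of just that step follows; `egressHealthUrl` is a hypothetical name and `gateway.example` is a placeholder host, neither of which appears in the commit.

```typescript
// Hypothetical helper mirroring the URL derivation in providerGatewayEgressProxyStatus.
function egressHealthUrl(proxyUrl: string): string {
  const url = new URL(proxyUrl);
  url.pathname = "/__unidesk/egress-proxy/health";
  url.search = ""; // drop any query string carried by the configured URL
  return url.toString();
}

// Placeholder host used only for illustration.
const healthUrl = egressHealthUrl("http://gateway.example:18789/?debug=1");
```

Because the WHATWG `URL` parser lowercases hostnames for `http:` URLs, a mixed-case container name in the configured value will appear in lowercase in the derived URL.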
function createLogger(service: string, logFile: string) {
const writer = createHourlyJsonlWriter({
baseLogFile: logFile,
@@ -3689,7 +3760,13 @@ async function route(req: Request): Promise<Response> {
const url = new URL(req.url);
if (req.method === "OPTIONS") return jsonResponse({ ok: true });
try {
if (url.pathname === "/" || url.pathname === "/health") return jsonResponse({ ok: true, service: "code-queue", queue: await queueSummaryForHealth(false), startedAt: serviceStartedAt });
if (url.pathname === "/" || url.pathname === "/health") return jsonResponse({
ok: true,
service: "code-queue",
queue: await queueSummaryForHealth(false),
egressProxy: await providerGatewayEgressProxyStatus(),
startedAt: serviceStartedAt,
});
if (url.pathname === "/logs") return jsonResponse({ ok: true, logs: recentLogs.slice(-parseLimit(url)) });
if (url.pathname === "/api/events" && req.method === "GET") return jsonResponse({ ok: false, error: "Code Queue private SSE was removed; subscribe to oa-event-flow /api/events/stream with service:code-queue tags." }, 410);
if (url.pathname === "/api/dev-ready" && req.method === "GET") return jsonResponse({ ok: true, devReady: collectDevReady() });
@@ -80,6 +80,9 @@ export interface RuntimeConfig {
maxInMemoryOutputRecords: number;
maxInMemoryEventRecords: number;
maxActiveQueues: number;
egressProxyEnabled: boolean;
egressProxyUrl: string;
egressProxyNoProxy: string;
devContainerMasterHost: string;
devContainerDefaultProviderId: string;
devContainerImage: string;
+1 -1
@@ -1,6 +1,6 @@
{
"name": "@unidesk/provider-gateway",
"version": "0.2.17",
"version": "0.2.18",
"private": true,
"type": "module",
"scripts": {
@@ -0,0 +1,268 @@
import { createServer, type Server, type Socket } from "node:net";
import type {
CoreEgressTcpCloseMessage,
CoreEgressTcpDataMessage,
CoreEgressTcpOpenedMessage,
JsonValue,
ProviderEgressTcpCloseMessage,
ProviderEgressTcpDataMessage,
ProviderEgressTcpOpenMessage,
} from "../../shared/src/index";
type Logger = (level: "debug" | "info" | "warn" | "error", message: string, data?: JsonValue) => void;
type EgressToCoreMessage = ProviderEgressTcpOpenMessage | ProviderEgressTcpDataMessage | ProviderEgressTcpCloseMessage;
type EgressFromCoreMessage = CoreEgressTcpOpenedMessage | CoreEgressTcpDataMessage | CoreEgressTcpCloseMessage;
interface ProviderEgressProxyOptions {
providerId: string;
listenHost: string;
listenPort: number;
sendToCore: (message: EgressToCoreMessage) => boolean;
isCoreConnected: () => boolean;
logger: Logger;
}
interface Tunnel {
id: string;
client: Socket;
method: string;
opened: boolean;
pending: Buffer[];
closed: boolean;
}
export interface ProviderEgressProxyHandle {
status: () => Record<string, JsonValue>;
handleCoreMessage: (message: EgressFromCoreMessage) => boolean;
closeAll: (error?: string) => void;
close: () => void;
}
function nowIso(): string {
return new Date().toISOString();
}
function safeConnectionId(): string {
return `egress_${Date.now()}_${Math.random().toString(16).slice(2)}`;
}
function parseConnectTarget(target: string): { host: string; port: number } | null {
const trimmed = target.trim();
if (trimmed.startsWith("[")) {
const end = trimmed.indexOf("]");
if (end <= 1 || trimmed[end + 1] !== ":") return null;
const port = Number(trimmed.slice(end + 2));
return Number.isInteger(port) ? { host: trimmed.slice(1, end), port } : null;
}
const index = trimmed.lastIndexOf(":");
if (index <= 0) return null;
const port = Number(trimmed.slice(index + 1));
return Number.isInteger(port) ? { host: trimmed.slice(0, index), port } : null;
}
function parseHttpProxyRequest(headerText: string): { method: string; target: string; version: string } | null {
const line = headerText.split(/\r?\n/u)[0] ?? "";
const match = /^([A-Z]+)\s+(\S+)\s+(HTTP\/\d(?:\.\d)?)$/u.exec(line);
if (match === null) return null;
return { method: match[1] ?? "", target: match[2] ?? "", version: match[3] ?? "HTTP/1.1" };
}
function httpRequestTarget(rawTarget: string): { host: string; port: number; rewrittenTarget: string } | null {
try {
const url = new URL(rawTarget);
if (url.protocol !== "http:") return null;
return {
host: url.hostname,
port: url.port.length > 0 ? Number(url.port) : 80,
rewrittenTarget: `${url.pathname}${url.search}`,
};
} catch {
return null;
}
}
function rewriteAbsoluteHttpRequest(header: Buffer, rewrittenTarget: string): Buffer {
const text = header.toString("latin1");
const lines = text.split(/\r\n/u);
const first = lines[0] ?? "";
const match = /^([A-Z]+)\s+\S+\s+(HTTP\/\d(?:\.\d)?)$/u.exec(first);
if (match !== null) lines[0] = `${match[1]} ${rewrittenTarget} ${match[2]}`;
return Buffer.from(lines.join("\r\n"), "latin1");
}
function response(status: string, body: string, contentType = "text/plain; charset=utf-8"): string {
return [
`HTTP/1.1 ${status}`,
`Content-Type: ${contentType}`,
"Cache-Control: no-store",
"Connection: close",
`Content-Length: ${Buffer.byteLength(body)}`,
"",
body,
].join("\r\n");
}
function jsonResponse(value: JsonValue): string {
return response("200 OK", `${JSON.stringify(value)}\n`, "application/json; charset=utf-8");
}
function proxyUrlFor(host: string, port: number): string {
return `http://${host}:${port}`;
}
export function startProviderEgressProxy(options: ProviderEgressProxyOptions): ProviderEgressProxyHandle {
const proxyUrl = proxyUrlFor(options.listenHost, options.listenPort);
const tunnels = new Map<string, Tunnel>();
let closed = false;
const send = (message: EgressToCoreMessage): boolean => options.sendToCore(message);
const status = (): Record<string, JsonValue> => ({
enabled: true,
providerId: options.providerId,
connected: options.isCoreConnected(),
proxyUrl,
listenHost: options.listenHost,
listenPort: options.listenPort,
activeTunnels: tunnels.size,
channel: "provider-gateway",
});
const closeTunnel = (id: string, error?: string): void => {
const tunnel = tunnels.get(id);
if (tunnel === undefined) return;
tunnels.delete(id);
tunnel.closed = true;
if (!tunnel.client.destroyed) tunnel.client.destroy();
send({ type: "egress_tcp_close", providerId: options.providerId, connectionId: id, at: nowIso() });
if (error !== undefined) options.logger("warn", "egress_proxy_tunnel_closed", { connectionId: id, error });
};
const handleCoreMessage = (message: EgressFromCoreMessage): boolean => {
const tunnel = tunnels.get(message.connectionId);
if (message.type === "egress_tcp_opened") {
if (tunnel === undefined || tunnel.closed) return true;
tunnel.opened = true;
if (tunnel.method === "CONNECT") {
tunnel.client.write("HTTP/1.1 200 Connection Established\r\nProxy-Agent: UniDesk-ProviderGateway\r\n\r\n");
}
for (const chunk of tunnel.pending.splice(0)) {
send({ type: "egress_tcp_data", providerId: options.providerId, connectionId: tunnel.id, data: chunk.toString("base64"), encoding: "base64", at: nowIso() });
}
return true;
}
if (message.type === "egress_tcp_data") {
if (tunnel === undefined || tunnel.client.destroyed) return true;
tunnel.client.write(Buffer.from(message.data, message.encoding === "base64" ? "base64" : "utf8"));
return true;
}
if (message.type === "egress_tcp_close") {
if (tunnel !== undefined) {
tunnels.delete(message.connectionId);
tunnel.closed = true;
if (!tunnel.client.destroyed) tunnel.client.destroy();
}
if (message.error !== undefined && message.error.length > 0) {
options.logger("warn", "egress_proxy_remote_close", { connectionId: message.connectionId, error: message.error });
}
return true;
}
return false;
};
const server: Server = createServer((client) => {
let header = Buffer.alloc(0);
let settled = false;
const fail = (statusCode: string, text: string): void => {
if (!client.destroyed) client.end(response(statusCode, text));
};
client.on("data", (chunk) => {
const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
if (settled) return;
header = Buffer.concat([header, buffer]);
const headerEnd = header.indexOf("\r\n\r\n");
if (headerEnd < 0) {
if (header.length > 64 * 1024) {
settled = true;
fail("431 Request Header Fields Too Large", "proxy request header too large\n");
}
return;
}
settled = true;
const head = header.subarray(0, headerEnd + 4);
const rest = header.subarray(headerEnd + 4);
const parsed = parseHttpProxyRequest(head.toString("latin1"));
if (parsed === null) {
fail("400 Bad Request", "invalid proxy request\n");
return;
}
if (parsed.method === "GET" && (parsed.target === "/health" || parsed.target === "/__unidesk/egress-proxy/health")) {
client.end(jsonResponse({ ok: true, service: "provider-gateway-egress-proxy", ...status() }));
return;
}
if (!options.isCoreConnected()) {
fail("503 Service Unavailable", "provider-gateway core channel is not connected\n");
return;
}
const id = safeConnectionId();
let firstPayload: Buffer | null = null;
let target: { host: string; port: number } | null = null;
if (parsed.method === "CONNECT") {
target = parseConnectTarget(parsed.target);
firstPayload = rest.length > 0 ? rest : null;
} else {
const httpTarget = httpRequestTarget(parsed.target);
if (httpTarget !== null) {
target = { host: httpTarget.host, port: httpTarget.port };
firstPayload = Buffer.concat([rewriteAbsoluteHttpRequest(head, httpTarget.rewrittenTarget), rest]);
}
}
if (target === null || target.port <= 0 || target.port > 65_535) {
fail("400 Bad Request", "unsupported proxy target\n");
return;
}
const tunnel: Tunnel = { id, client, method: parsed.method, opened: false, pending: [], closed: false };
tunnels.set(id, tunnel);
client.on("data", (nextChunk) => {
const nextBuffer = Buffer.isBuffer(nextChunk) ? nextChunk : Buffer.from(nextChunk);
if (!tunnel.opened) {
tunnel.pending.push(nextBuffer);
return;
}
send({ type: "egress_tcp_data", providerId: options.providerId, connectionId: id, data: nextBuffer.toString("base64"), encoding: "base64", at: nowIso() });
});
client.on("close", () => closeTunnel(id));
client.on("error", (error) => closeTunnel(id, error.message));
const opened = send({ type: "egress_tcp_open", providerId: options.providerId, connectionId: id, host: target.host, port: target.port, at: nowIso() });
if (!opened) {
tunnels.delete(id);
tunnel.closed = true;
fail("503 Service Unavailable", "provider-gateway core channel is not connected\n");
return;
}
if (firstPayload !== null) tunnel.pending.push(firstPayload);
});
client.on("error", () => undefined);
});
server.listen(options.listenPort, options.listenHost, () => {
options.logger("info", "egress_proxy_listening", status());
});
server.on("error", (error) => {
options.logger("error", "egress_proxy_listen_failed", { proxyUrl, error: error.message });
});
return {
status,
handleCoreMessage,
closeAll: (error?: string) => {
for (const id of Array.from(tunnels.keys())) closeTunnel(id, error);
},
close: () => {
closed = true;
for (const id of Array.from(tunnels.keys())) closeTunnel(id, "egress proxy stopping");
server.close();
},
};
}
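
To show how CONNECT targets are split into host and port, the sketch below copies `parseConnectTarget` from this file and runs it on a few inputs. The inputs are illustrative assumptions chosen for this example.

```typescript
// Copied from the file above for a standalone demonstration.
function parseConnectTarget(target: string): { host: string; port: number } | null {
  const trimmed = target.trim();
  if (trimmed.startsWith("[")) {
    // Bracketed IPv6 literal such as [::1]:8443.
    const end = trimmed.indexOf("]");
    if (end <= 1 || trimmed[end + 1] !== ":") return null;
    const port = Number(trimmed.slice(end + 2));
    return Number.isInteger(port) ? { host: trimmed.slice(1, end), port } : null;
  }
  const index = trimmed.lastIndexOf(":");
  if (index <= 0) return null;
  const port = Number(trimmed.slice(index + 1));
  return Number.isInteger(port) ? { host: trimmed.slice(0, index), port } : null;
}

// Illustrative inputs (assumptions chosen for this example).
const parsed = {
  plain: parseConnectTarget("github.com:443"),
  ipv6: parseConnectTarget("[::1]:8443"),
  missingPort: parseConnectTarget("github.com"),
  emptyPort: parseConnectTarget("github.com:"),
};
```

An empty port parses as `0` because `Number("")` is `0`; the parser accepts it, and the later `target.port <= 0` check in the connection handler is what turns it into a `400 Bad Request`.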
@@ -1,6 +1,9 @@
import { spawnSync } from "node:child_process";
import { existsSync, readFileSync, readdirSync } from "node:fs";
import {
type CoreEgressTcpCloseMessage,
type CoreEgressTcpDataMessage,
type CoreEgressTcpOpenedMessage,
type CoreDispatchMessage,
type CoreHostSshCloseMessage,
type CoreHostSshEofMessage,
@@ -20,6 +23,7 @@ import {
parseJsonObject,
} from "../../shared/src/index";
import { createHourlyJsonlWriter, logRetentionBytesForService } from "../../shared/src/rotating-jsonl";
import { startProviderEgressProxy, type ProviderEgressProxyHandle } from "./egress-proxy";
interface RuntimeConfig {
serverUrl: string;
@@ -45,6 +49,9 @@ interface RuntimeConfig {
hostSshKey: string | null;
hostRemoteCwd: string | null;
hostLoginShell: string | null;
egressProxyEnabled: boolean;
egressProxyListenHost: string;
egressProxyPort: number;
logFile: string;
}
@@ -114,6 +121,7 @@ interface DockerContainerInspectDetails {
const hostSshSessions = new Map<string, HostSshSession>();
const microserviceHttpCache = new Map<string, MicroserviceHttpCacheEntry>();
const microserviceHttpInFlight = new Map<string, Promise<JsonValue>>();
let providerEgressProxy: ProviderEgressProxyHandle | null = null;
const gatewayMetadata = readGatewayMetadata();
const defaultMasterServer = "http://74.48.78.17/";
const defaultProviderToken = "unidesk-dev-token-change-me";
@@ -196,6 +204,15 @@ function readOptionalNumberEnv(name: string, ...aliases: string[]): number | nul
return parsed;
}
function readBooleanEnv(name: string, fallback: boolean, ...aliases: string[]): boolean {
const raw = readEnv(name, ...aliases);
if (raw === null || raw.trim().length === 0) return fallback;
const value = raw.trim().toLowerCase();
if (value === "1" || value === "true" || value === "yes" || value === "on") return true;
if (value === "0" || value === "false" || value === "no" || value === "off") return false;
return fallback;
}
function providerServerUrlFromMaster(rawMaster: string): string {
const raw = rawMaster.trim() || defaultMasterServer;
const withScheme = /^[a-z][a-z0-9+.-]*:\/\//iu.test(raw) ? raw : `http://${raw}`;
@@ -462,6 +479,9 @@ function readConfig(): RuntimeConfig {
hostSshKey: readOptionalStringEnv("HOST_SSH_KEY") ?? (mountedHostSshKey ? defaultHostSshKey : null),
hostRemoteCwd: readOptionalStringEnv("HOST_REMOTE_CWD") ?? defaultHostRemoteCwd(hostSshUser),
hostLoginShell: readOptionalStringEnv("HOST_LOGIN_SHELL") ?? (hostSshUser === null ? null : "/bin/bash"),
egressProxyEnabled: readBooleanEnv("PROVIDER_EGRESS_PROXY_ENABLED", true, "UNIDESK_PROVIDER_EGRESS_PROXY_ENABLED"),
egressProxyListenHost: readOptionalStringEnv("PROVIDER_EGRESS_PROXY_LISTEN_HOST", "UNIDESK_PROVIDER_EGRESS_PROXY_LISTEN_HOST") ?? "0.0.0.0",
egressProxyPort: readNumberEnv("PROVIDER_EGRESS_PROXY_PORT", 18789, "UNIDESK_PROVIDER_EGRESS_PROXY_PORT"),
logFile: readOptionalStringEnv("LOG_FILE") ?? `/var/log/unidesk/provider-gateway-${providerId}.jsonl`,
};
}
@@ -497,6 +517,7 @@ function withToken(rawUrl: string, token: string): string {
function currentLabels(): ProviderLabels {
const hostSshConfigured = isHostSshConfigured();
const containerGuard = refreshSelfContainerGuard();
const egressProxyStatus = providerEgressProxy?.status() ?? null;
return {
...config.labels,
dockerSocketPresent: existsSync(config.dockerSocketPath),
@@ -509,6 +530,10 @@ function currentLabels(): ProviderLabels {
providerGatewayStartedAt: startedAt.toISOString(),
providerGatewayUpgradePolicy: "always-enabled",
providerGatewayMicroserviceHttpCache: true,
providerGatewayEgressProxy: config.egressProxyEnabled,
providerGatewayEgressProxyPort: config.egressProxyPort,
providerGatewayEgressProxyConnected: egressProxyStatus === null ? false : egressProxyStatus.connected === true,
providerGatewayEgressProxyActiveTunnels: egressProxyStatus === null ? 0 : egressProxyStatus.activeTunnels ?? 0,
providerGatewayDockerRestartGuard: true,
providerGatewayContainerId: containerGuard.containerId,
providerGatewayContainerName: containerGuard.containerName,
@@ -529,9 +554,16 @@ function sendJson(value: unknown): void {
socket.send(JSON.stringify(value));
}
function sendJsonOk(value: unknown): boolean {
if (!socket || socket.readyState !== WebSocket.OPEN) return false;
socket.send(JSON.stringify(value));
return true;
}
function sendRegister(): void {
const capabilities = ["heartbeat", "system.status", "docker.status", "docker.ps", "provider.upgrade", "microservice.http", "microservice.http.cache", "echo"];
if (isHostSshConfigured()) capabilities.push("host.ssh");
if (config.egressProxyEnabled) capabilities.push("network.egress-proxy");
sendJson({
type: "register",
providerId: config.providerId,
@@ -2043,6 +2075,14 @@ function handleMessage(raw: MessageEvent<string>): void {
closeHostSshSession(parsed as CoreHostSshCloseMessage);
return;
}
if (
parsed.type === "egress_tcp_opened" ||
parsed.type === "egress_tcp_data" ||
parsed.type === "egress_tcp_close"
) {
providerEgressProxy?.handleCoreMessage(parsed as CoreEgressTcpOpenedMessage | CoreEgressTcpDataMessage | CoreEgressTcpCloseMessage);
return;
}
logger("debug", "core_message", parsed as JsonValue);
} catch (error) {
logger("error", "core_message_parse_failed", { error: String(error) });
@@ -2135,6 +2175,7 @@ function connect(): void {
socket.addEventListener("close", (event) => {
logger("warn", "connect_close", { code: event.code, reason: event.reason });
clearConnectionTimers();
providerEgressProxy?.closeAll("provider-gateway core socket closed");
scheduleReconnect();
});
socket.addEventListener("error", () => {
@@ -2153,8 +2194,20 @@ process.on("SIGTERM", () => {
session.proc.kill("SIGTERM");
}
hostSshSessions.clear();
providerEgressProxy?.close();
socket?.close(1000, "provider shutdown");
process.exit(0);
});
if (config.egressProxyEnabled) {
providerEgressProxy = startProviderEgressProxy({
providerId: config.providerId,
listenHost: config.egressProxyListenHost,
listenPort: config.egressProxyPort,
isCoreConnected: () => socket?.readyState === WebSocket.OPEN,
sendToCore: (message) => sendJsonOk(message),
logger,
});
}
connect();
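Reviewer note: the `egress_tcp_data` messages in this commit carry TCP payloads as base64 strings inside JSON frames on the provider WebSocket. The following is a minimal sketch of that framing, assuming a Node.js runtime with `Buffer` available. `EgressTcpDataFrame`, `encodeChunk`, and `decodeChunk` are hypothetical names used only for illustration; they are not taken from `egress-proxy.ts`, whose actual implementation may differ.

```typescript
import { Buffer } from "node:buffer";

// Hypothetical frame shape mirroring the core-to-provider egress_tcp_data
// message fields in this diff (type, connectionId, data, encoding).
interface EgressTcpDataFrame {
  type: "egress_tcp_data";
  connectionId: string;
  data: string;
  encoding: "base64";
}

// Wrap one raw TCP chunk in a JSON-safe frame. Base64 keeps arbitrary
// bytes intact when the frame is serialized with JSON.stringify.
function encodeChunk(connectionId: string, chunk: Uint8Array): EgressTcpDataFrame {
  return {
    type: "egress_tcp_data",
    connectionId,
    data: Buffer.from(chunk).toString("base64"),
    encoding: "base64",
  };
}

// Recover the raw bytes from a received frame before writing them to the
// local client socket.
function decodeChunk(frame: EgressTcpDataFrame): Uint8Array {
  return new Uint8Array(Buffer.from(frame.data, "base64"));
}
```

Base64 inflates each payload by about one third, so a tunnel built this way trades bandwidth for the simplicity of reusing the existing JSON message channel.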
+54 -2
@@ -219,6 +219,49 @@ export interface ProviderHostSshErrorMessage {
at: string;
}
export interface ProviderEgressTcpOpenMessage {
type: "egress_tcp_open";
providerId: string;
connectionId: string;
host: string;
port: number;
at: string;
}
export interface ProviderEgressTcpDataMessage {
type: "egress_tcp_data";
providerId: string;
connectionId: string;
data: string;
encoding: "base64";
at: string;
}
export interface ProviderEgressTcpCloseMessage {
type: "egress_tcp_close";
providerId: string;
connectionId: string;
at: string;
}
export interface CoreEgressTcpOpenedMessage {
type: "egress_tcp_opened";
connectionId: string;
}
export interface CoreEgressTcpDataMessage {
type: "egress_tcp_data";
connectionId: string;
data: string;
encoding: "base64";
}
export interface CoreEgressTcpCloseMessage {
type: "egress_tcp_close";
connectionId: string;
error?: string;
}
export type ProviderToCoreMessage =
| ProviderRegisterMessage
| ProviderHeartbeatMessage
@@ -228,7 +271,10 @@ export type ProviderToCoreMessage =
| ProviderHostSshOpenedMessage
| ProviderHostSshDataMessage
| ProviderHostSshExitMessage
| ProviderHostSshErrorMessage;
| ProviderHostSshErrorMessage
| ProviderEgressTcpOpenMessage
| ProviderEgressTcpDataMessage
| ProviderEgressTcpCloseMessage;
export type CoreToProviderMessage =
| CoreDispatchMessage
@@ -237,6 +283,9 @@ export type CoreToProviderMessage =
| CoreHostSshResizeMessage
| CoreHostSshCloseMessage
| CoreHostSshEofMessage
| CoreEgressTcpOpenedMessage
| CoreEgressTcpDataMessage
| CoreEgressTcpCloseMessage
| CoreAcknowledgeMessage;
export interface ApiNode {
@@ -317,7 +366,10 @@ export function isProviderToCoreMessage(value: unknown): value is ProviderToCore
msg.type === "host_ssh_opened" ||
msg.type === "host_ssh_data" ||
msg.type === "host_ssh_exit" ||
msg.type === "host_ssh_error"
msg.type === "host_ssh_error" ||
msg.type === "egress_tcp_open" ||
msg.type === "egress_tcp_data" ||
msg.type === "egress_tcp_close"
) &&
typeof msg.providerId === "string" &&
msg.providerId.length > 0