diff --git a/docs/reference/microservices.md b/docs/reference/microservices.md index 7ee4aa4e..d4408cb6 100644 --- a/docs/reference/microservices.md +++ b/docs/reference/microservices.md @@ -132,7 +132,7 @@ Baidu Netdisk 在 UniDesk 语境中按纯后端服务管理:不得暴露百度 - 主服务依赖映射:D601 Code Queue 仍以主 PostgreSQL 为权威数据库,`DATABASE_URL` 必须指向主 server 受限端口映射;`OA_EVENT_FLOW_BASE_URL` 必须指向主 server OA Event Flow 受限端口映射;`CODE_QUEUE_NOTIFY_CLAUDEQQ_BASE_URL` 在 D601 上直接使用本机 ClaudeQQ 映射 `http://host.docker.internal:3290`。这些端口映射只服务 D601 运行时,必须用防火墙或等价策略限制来源,不得成为浏览器或任意公网客户端入口。 - 默认出网代理:D601 `code-queue-backend` 必须默认把 `HTTP_PROXY`、`HTTPS_PROXY` 和 `ALL_PROXY` 注入给 Codex/OpenCode、`git`、`curl`、`npm` 等任务子进程;代理上游必须是 D601 provider-gateway 暴露在 provider-gateway Docker 网络内的 egress HTTP CONNECT 端口,而不是 Code Queue 自建伪 provider WebSocket 或交互 shell 临时 `export`。Code Queue Compose 必须加入 provider-gateway 网络,并通过 `CODE_QUEUE_EGRESS_PROXY_URL` 指向 `http://unidesk-provider-gateway-D601:18789`;provider-gateway 再复用已注册的 provider WebSocket 通道,把 TCP open/data/close 消息转发给主 server backend-core 出网,不依赖 D601 本地直连公网。`NO_PROXY` 必须覆盖 `localhost`、`127.0.0.1`、`host.docker.internal`、provider-gateway 容器名、主 server 地址和 UniDesk 内部服务名,避免 PostgreSQL、OA Event Flow、ClaudeQQ、microservice health 等内网链路绕远或递归进入代理;`/health` 必须暴露 egress proxy 的 `enabled`、`connected`、`proxyUrl`、`channel=provider-gateway` 和上游 provider-gateway health,作为 Codex 网络卡死排障的第一证据。远程开发/执行容器不得只依赖这些环境变量,必须在容器网络层用 TUN 默认路由和 OUTPUT 防火墙强制外网流量只能经 master TUN 出口。 - 出网代理无 fallback 纪律:Code Queue 的运行时配置只允许一个默认出网路径,即 provider-gateway egress proxy;不得在代码中同时保留 Code Queue 自建 WebSocket proxy、临时 shell proxy、D601 本地直连公网、主 server direct HTTP proxy 等隐式分支。Compose 层必须显式设置大小写 `HTTP_PROXY`、`HTTPS_PROXY`、`ALL_PROXY` 和 `NO_PROXY`,服务启动后再把同一组变量写入 `process.env`,确保 service 自检、Codex/OpenCode app-server、任务 shell、`git`、`curl`、`npm` 使用一致路径。任何新增网络 fallback 都必须先进入本参考文档并配套 `/health` 可见状态,否则视为残留旧路径。 -- 上线纪律:Code Queue 相关的前端或后端改进必须在同一任务内正式上线并验证公网 frontend 或 live API,不能只停留在源码、构建产物或“后续再上线”。重建 `frontend` 只替换无状态 WebUI 容器,不会触碰 D601 `code-queue-backend`、PostgreSQL 队列或运行中 Codex thread,不能以“可能影响长期任务”为由延迟前端上线;`code-queue-backend` 本身带有 restart-recovery,允许按 D601 Compose 重启/替换,停止、重启或重建后必须从持久化状态恢复运行中和排队任务。修改 Code Queue 自身时不得等待当前 Code Queue task 结束、等待 queue idle 或等待 `0 running` 后才重启;这会等待自己退出形成自锁。应通过 D601 上的 `~/cq-deploy` symlink 执行 `cd ~/cq-deploy && CODE_QUEUE_ENV_FILE=.state/code-queue-d601.env docker compose -f src/components/microservices/code-queue/docker-compose.d601.yml up -d --build --force-recreate code-queue` 或等价 build-first 路径,并用恢复后的 live API 或公网 frontend 证明任务和队列仍可读可继续;不要在 provider-gateway Host SSH 命令中使用 `/home/ubuntu/unidesk-code-queue-deploy` 全路径触发 provider-gateway 自保护误判。 +- 上线纪律:Code Queue 相关的前端或后端改进必须在同一任务内正式上线并验证公网 frontend 或 live API,不能只停留在源码、构建产物或“后续再上线”。重建 `frontend` 只替换无状态 WebUI 容器,不会触碰 D601 `code-queue-backend`、PostgreSQL 队列或运行中 Codex thread,不能以“可能影响长期任务”为由延迟前端上线;`code-queue-backend` 本身带有 restart-recovery,允许按 D601 Compose 重启/替换,停止、重启或重建后必须从持久化状态恢复运行中和排队任务。修改 Code Queue 自身时不得等待当前 Code Queue task 结束、等待 queue idle 或等待 `0 running` 后才重启;这会等待自己退出形成自锁。应通过 D601 上的 `~/cq-deploy` symlink 执行 `cd ~/cq-deploy && CODE_QUEUE_ENV_FILE=.state/code-queue-d601.env docker compose -f src/components/microservices/code-queue/docker-compose.d601.yml up -d --build --force-recreate code-queue` 或等价 build-first 路径,并用恢复后的 live API 或公网 frontend 证明任务和队列仍可读可继续;不要在 provider-gateway Host SSH 命令中使用 `/home/ubuntu/unidesk-code-queue-deploy` 全路径触发 provider-gateway 自保护误判。该命令必须由 D601 provider SSH 在宿主侧 detached 调度,不能从正在运行的 `code-queue-backend` 容器里以前台方式执行会重建自身的 Compose 命令;否则发起命令的容器被替换时可能中断远程任务并留下 `Created`/`Exited` 的半完成容器。若 registry metadata 临时不可达且只是恢复已构建镜像,可用同一路径的 `--no-build --force-recreate` 先恢复服务,再完成源码 build 验证。 - 更名与灾备恢复:旧版 Codex 队列服务名只允许作为兼容诊断和一次性迁移来源;`code-queue-backend` 容器自身 `/health` 正常但 `microservice health code-queue` 返回 `microservice not found`、或服务目录仍只出现旧服务 ID 时,优先判定为 backend-core 仍加载旧 `MICROSERVICES_JSON`,必须刷新 `.state/docker-compose.env` 并显式重建/重建替换 `backend-core`,随后用 `microservice list` 验证 `id=code-queue`、`nodeBaseUrl=http://code-queue:4222` 和容器摘要。若更名后 `unidesk_code_queue_*` 为空而历史 `unidesk_codex_queue_*` 表仍有队列数据,恢复前必须先停止 `code-queue-backend`,备份 `.state/code-queue` 与当前 `unidesk_code_queue_*` 表,再把历史本地状态目录合并到 `.state/code-queue/`,并用 `docker exec -i unidesk-database psql ...` 这类保持 stdin 的方式把 `unidesk_codex_queue_tasks`、`unidesk_codex_queue_queues` 和 `unidesk_codex_queue_notifications` 迁移到对应 `unidesk_code_queue_*` 表;不得在确认 `/api/tasks`、`/api/queues` 和 output archive 可读前删除历史本地状态目录或旧 PostgreSQL 表。迁移完成后只允许在 D601 用 `docker compose -f src/components/microservices/code-queue/docker-compose.d601.yml up -d --build code-queue` 启动目标服务;禁止再通过主 server Compose 启动旧 `code-queue` service。 - Codex 认证:容器只从 D601 的 `/home/ubuntu/.codex/config.toml` 同步 Codex provider 配置到 D601 `.state/code-queue/codex-home`,并通过 D601 `.state/code-queue-d601.env` 透传 `OPENAI_API_KEY`、`CRS_OAI_KEY` 等 provider 所需变量;这些 provider 环境变量不得写入仓库,必须由 D601 Compose env-file 注入,确保容器重建和重启后不会丢失认证。新增 provider 的 `env_key` 时必须增加同类运行时透传和 Compose env 持久化,禁止把 Codex 或 MiniMax 密钥写入仓库文件。Code Queue 容器必须只读挂载 D601 host 的 SSH 目录到 `/root/.ssh`(默认 `/home/ubuntu/.ssh`),让容器内 `git push`、`ssh -T git@github.com` 与 host 使用同一套 GitHub SSH key/known_hosts;不得把私钥复制进镜像或仓库。 - Develop-ready 镜像:Code Queue 镜像必须在启动前预装 UniDesk/Pipeline 调试所需工具,至少包含 `codex`、`bun`、`node`、`npm`/`npx`、`git`、`rg`、`curl`、`python3`/`pip3`、`docker`、`docker compose`、`docker-compose`、`jq`、`ssh`、`rsync`、`make`、`gcc`/`g++`、`iptables`、`tar`、`gzip` 和 `unzip`;不得依赖 Codex 任务运行时再 `apt-get install` 这些基础环境。 diff --git a/src/components/microservices/code-queue/src/index.ts b/src/components/microservices/code-queue/src/index.ts index 4babfe01..cd0ca975 100644 --- a/src/components/microservices/code-queue/src/index.ts +++ b/src/components/microservices/code-queue/src/index.ts @@ -116,6 +116,7 @@ import { ReferenceTaskLookupError, configureReferences, injectReferencedTaskCont import { applyOaTraceStatsToTaskJson, configureOaEvents, + oaEventPublisherStatus, publishCodeQueueJudgeEvent, publishCodeQueueQueueUpdated, publishCodeQueueTaskUpdated, @@ -3789,6 +3790,7 @@ async function route(req: Request): Promise { service: "code-queue", queue: await queueSummaryForHealth(false), egressProxy: await providerGatewayEgressProxyStatus(), + oaEventPublisher: oaEventPublisherStatus(), startedAt: serviceStartedAt, }); if (url.pathname === "/logs") return jsonResponse({ ok: true, logs: recentLogs.slice(-parseLimit(url)) }); diff --git a/src/components/microservices/code-queue/src/judge.ts b/src/components/microservices/code-queue/src/judge.ts index 37107d30..6fcefa1e 100644 --- a/src/components/microservices/code-queue/src/judge.ts +++ b/src/components/microservices/code-queue/src/judge.ts @@ -129,8 +129,8 @@ function fallbackJudge(result: CodexRunResult, minimaxError?: string | JudgeFail } export const retryInstruction = "这是同一个 Codex thread 的 continuation,不是新任务。请基于上文继续完成原始任务;只做最小必要状态核查,避免从头重新摸索、避免重复已经完成的修改。"; -const codeQueueD601DeployCommand = "bun scripts/cli.ts ssh D601 'cd ~/cq-deploy && CODE_QUEUE_ENV_FILE=.state/code-queue-d601.env docker compose -f src/components/microservices/code-queue/docker-compose.d601.yml up -d --build --force-recreate code-queue'"; -const codeQueueRestartSafetyGuidance = `Code Queue 服务具备 restart-recovery,允许在任何时候重启、重建或替换 D601 的 \`code-queue-backend\`;当当前任务修改 Code Queue 自身时,禁止等待当前 Code Queue task 退出或等待队列归零后再重启,因为这会等待自己退出形成自锁。正确做法是在 D601 直接执行 \`${codeQueueD601DeployCommand}\` 或等价 no-deps force-recreate,并在恢复后用 live health/task 查询验证。`; +const codeQueueD601DeployCommand = "bun scripts/cli.ts ssh D601 \"nohup bash -lc 'cd ~/cq-deploy && CODE_QUEUE_ENV_FILE=.state/code-queue-d601.env docker compose -f src/components/microservices/code-queue/docker-compose.d601.yml up -d --build --force-recreate code-queue > /tmp/unidesk-code-queue-redeploy.log 2>&1' >/tmp/unidesk-code-queue-redeploy-launch.log 2>&1 & echo scheduled:/tmp/unidesk-code-queue-redeploy.log\""; +const codeQueueRestartSafetyGuidance = `Code Queue 服务具备 restart-recovery,允许在任何时候重启、重建或替换 D601 的 \`code-queue-backend\`;当当前任务修改 Code Queue 自身时,禁止等待当前 Code Queue task 退出或等待队列归零后再重启,因为这会等待自己退出形成自锁。正确做法是通过 D601 provider SSH 在宿主侧 detached 调度 \`${codeQueueD601DeployCommand}\` 或等价 detached no-deps force-recreate;禁止从正在运行的 \`code-queue-backend\` 容器里以前台方式执行会重建自身的 compose 命令,否则发起命令的容器被替换时可能留下 Created/Exited 的半完成状态。调度后在恢复的 live health/task 查询中验证。`; export function explicitUserInterrupt(task: QueueTask, result: CodexRunResult): boolean { return result.terminalStatus === "interrupted" diff --git a/src/components/microservices/code-queue/src/oa-events.ts b/src/components/microservices/code-queue/src/oa-events.ts index edfae6ae..1ec9d917 100644 --- a/src/components/microservices/code-queue/src/oa-events.ts +++ b/src/components/microservices/code-queue/src/oa-events.ts @@ -23,6 +23,11 @@ interface OaEventEnvelope { payload: JsonRecord; } +interface QueuedOaEvent { + event: OaEventEnvelope; + attempts: number; +} + export interface OaTraceStats extends JsonRecord { scopeId: string; source: "oa-event-flow"; @@ -42,8 +47,22 @@ export interface OaTraceStepSummary { source: "oa-event-flow"; } -const postTimeoutMs = 2500; +const postTimeoutMs = 10000; +const publishConcurrency = 4; +const maxQueuedEvents = 20_000; +const maxPublishAttempts = 5; let context: OaEventContext | null = null; +let queuedEvents: QueuedOaEvent[] = []; +let activePublishes = 0; +let totalEnqueued = 0; +let totalSucceeded = 0; +let totalFailed = 0; +let totalRetried = 0; +let totalAbandoned = 0; +let totalDropped = 0; +let lastEnqueuedAt: string | null = null; +let lastSucceededAt: string | null = null; +let lastError: JsonRecord | null = null; export function configureOaEvents(runtimeContext: OaEventContext): void { const baseUrl = runtimeContext.baseUrl.replace(/\/+$/u, ""); @@ -78,22 +97,91 @@ function taskTags(task: QueueTask, queueId: string, extra: string[] = [], attemp return tags; } -function postOaEvent(event: OaEventEnvelope): void { +export function oaEventPublisherStatus(): JsonRecord { + return { + pending: queuedEvents.length, + active: activePublishes, + concurrency: publishConcurrency, + maxQueuedEvents, + maxPublishAttempts, + totalEnqueued, + totalSucceeded, + totalFailed, + totalRetried, + totalAbandoned, + totalDropped, + lastEnqueuedAt, + lastSucceededAt, + lastError, + }; +} + +async function sendOaEvent(item: QueuedOaEvent): Promise { const runtime = ctx(); + const event = item.event; const controller = new AbortController(); const timer = setTimeout(() => controller.abort(), postTimeoutMs); - void fetch(`${runtime.baseUrl}/api/events`, { - method: "POST", - headers: { "content-type": "application/json" }, - body: JSON.stringify(event), - signal: controller.signal, - }).then(async (response) => { + try { + const response = await fetch(`${runtime.baseUrl}/api/events`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(event), + signal: controller.signal, + }); if (!response.ok) { - runtime.logger("warn", "oa_event_publish_failed", { eventId: event.eventId, type: event.type, status: response.status, body: safePreview(await response.text(), 1000) }); + totalFailed += 1; + lastError = { eventId: event.eventId, type: event.type, attempt: item.attempts, status: response.status, body: safePreview(await response.text(), 1000) }; + runtime.logger("warn", "oa_event_publish_failed", lastError); + return false; } - }).catch((error) => { - runtime.logger("warn", "oa_event_publish_error", { eventId: event.eventId, type: event.type, error: error instanceof Error ? error.message : String(error) }); - }).finally(() => clearTimeout(timer)); + totalSucceeded += 1; + lastSucceededAt = runtime.nowIso(); + return true; + } catch (error) { + totalFailed += 1; + lastError = { eventId: event.eventId, type: event.type, attempt: item.attempts, error: error instanceof Error ? error.message : String(error) }; + runtime.logger("warn", "oa_event_publish_error", lastError); + return false; + } finally { + clearTimeout(timer); + } +} + +function pumpOaEventQueue(): void { + while (activePublishes < publishConcurrency && queuedEvents.length > 0) { + const item = queuedEvents.shift(); + if (item === undefined) return; + activePublishes += 1; + void sendOaEvent(item).then((ok) => { + if (!ok) { + if (item.attempts < maxPublishAttempts) { + totalRetried += 1; + queuedEvents.push({ event: item.event, attempts: item.attempts + 1 }); + } else { + totalAbandoned += 1; + lastError = { eventId: item.event.eventId, type: item.event.type, attempt: item.attempts, error: "oa event publish attempts exhausted" }; + ctx().logger("error", "oa_event_publish_abandoned", lastError); + } + } + }).finally(() => { + activePublishes = Math.max(0, activePublishes - 1); + pumpOaEventQueue(); + }); + } +} + +function postOaEvent(event: OaEventEnvelope): void { + const runtime = ctx(); + totalEnqueued += 1; + lastEnqueuedAt = runtime.nowIso(); + if (queuedEvents.length >= maxQueuedEvents) { + const dropped = queuedEvents.shift(); + totalDropped += 1; + lastError = { eventId: dropped?.event.eventId ?? "", type: dropped?.event.type ?? "", error: "oa event publish queue overflow" }; + runtime.logger("error", "oa_event_publish_queue_overflow", { ...lastError, pending: queuedEvents.length, maxQueuedEvents }); + } + queuedEvents.push({ event, attempts: 1 }); + pumpOaEventQueue(); } function normalizeCommandText(text: string): string {