From 9d46ca2531ddee753db3e7bf0af5edd5cbe69e60 Mon Sep 17 00:00:00 2001 From: Codex Date: Tue, 19 May 2026 03:32:16 +0000 Subject: [PATCH] fix(code-queue): add issue 3 gateway diagnostics --- docs/reference/codex-deploy.md | 8 +- docs/reference/deploy.md | 4 + scripts/code-queue-issue3-regression-test.ts | 106 ++++++ scripts/src/check.ts | 3 + scripts/src/deploy.ts | 104 +++++- .../backend-core/src/microservice_proxy.rs | 316 +++++++++++++++++- .../code-queue-mgr/src-rs/main.rs | 225 ++++++++++++- .../microservices/code-queue-mgr/src/index.ts | 112 +++++++ .../code-queue/src/code-agent/codex.ts | 15 +- .../code-queue/src/code-agent/common.ts | 4 + .../code-queue/src/code-agent/opencode.ts | 13 +- .../code-queue/src/execution-diagnostics.ts | 221 ++++++++++++ .../microservices/code-queue/src/index.ts | 157 ++++++++- .../microservices/code-queue/src/queue-api.ts | 22 +- .../microservices/code-queue/src/task-view.ts | 3 + .../microservices/code-queue/src/types.ts | 64 ++++ .../k3sctl-adapter/k3s/code-queue.k3s.json | 82 ++++- .../microservices/k3sctl-adapter/src/index.ts | 74 +++- 18 files changed, 1494 insertions(+), 39 deletions(-) create mode 100644 scripts/code-queue-issue3-regression-test.ts create mode 100644 src/components/microservices/code-queue/src/execution-diagnostics.ts diff --git a/docs/reference/codex-deploy.md b/docs/reference/codex-deploy.md index bb45d7be..2aa531d2 100644 --- a/docs/reference/codex-deploy.md +++ b/docs/reference/codex-deploy.md @@ -20,8 +20,8 @@ bun scripts/cli.ts codex deploy 2. 在 D601 的 deploy cache 中通过本机 provider-gateway WS egress proxy 执行 `git fetch` remote,并用 `git archive ` 导出 tracked files 到一次性 export 目录;不得让 D601 直连 GitHub,也不得临时创建 SSH SOCKS、公网 master proxy 或 backend-core/provider-ingress fallback。 3. 用 `rsync --delete` 同步导出的 repo 到 `/home/ubuntu/cq-deploy`,保留 `.state/`、`logs/`、`.git/`、`node_modules/` 和 `dist/`。 4. 在 D601 用目标 Docker daemon 的本地 BuildKit builder 构建 `unidesk-code-queue:d601`,复用 D601 上已有基础镜像、inline cache 和 Code Queue build-base;provider-gateway WS egress 是唯一允许的构建代理通道,只作为本次 build 的环境变量与 build-arg 注入,并配合本次 build 的 `--network host` 让 RUN 阶段访问 D601 宿主 loopback proxy,不能污染 D601 宿主 Docker/HTTP proxy 配置,不能新建 SSH SOCKS、公网 master proxy 或直连 fallback。 -5. `docker save` 镜像并导入原生 k3s containerd:`docker save unidesk-code-queue:d601 | sudo ctr --address /run/k3s/containerd/containerd.sock -n k8s.io images import -`。 -6. `kubectl apply -f src/components/microservices/k3sctl-adapter/k3s/code-queue.k8s.yaml`,其中包含 Code Queue 和 `d601-tcp-egress-gateway`。 +5. `docker save` 镜像并导入原生 k3s containerd:`docker save unidesk-code-queue:d601 | sudo ctr --address /run/k3s/containerd/containerd.sock -n k8s.io images import -`。导入后必须用同一个 containerd socket 验证 `unidesk-code-queue:d601` tag 存在;D601 Docker daemon 的本地 tag 不是 k3s containerd 的 source of truth。 +6. `kubectl apply -f src/components/microservices/k3sctl-adapter/k3s/code-queue.k8s.yaml`,其中包含 Code Queue、`d601-provider-egress-proxy` 和 `d601-tcp-egress-gateway`。apply 后必须验证 `code-queue`、`code-queue-read`、`code-queue-write`、`d601-provider-egress-proxy`、`d601-tcp-egress-gateway` 这些 Deployment 的 container image 都是 `unidesk-code-queue:d601`,不能让 kubelet 回退到 Docker Hub 或其他外部 registry。 7. 将解析后的 40 位 remote commit 写入 `deployment/code-queue` 的 `CODE_QUEUE_DEPLOY_COMMIT` / `CODE_QUEUE_DEPLOY_REQUESTED_COMMIT`,并记录到 Deployment annotation。 8. `kubectl -n unidesk rollout restart deployment/d601-tcp-egress-gateway deployment/code-queue` 并等待 rollout 完成。 9. 通过 backend-core 的真实微服务代理读取 Code Queue `/health`,强制校验 `deploy.commit` 等于本次解析出的 remote commit;如果健康的是旧服务或旧 Pod,job 必须失败。 @@ -37,7 +37,9 @@ bun scripts/cli.ts microservice health code-queue bun scripts/cli.ts microservice proxy code-queue '/api/tasks/overview?limit=5&transcriptLimit=1&compact=1&afterSeq=0&preferId=' ``` -D601 原生 k3s 的人工诊断必须显式使用 host kubeconfig:`KUBECONFIG=/etc/rancher/k3s/k3s.yaml kubectl -n unidesk ...`。D601 上的默认 `kubectl` context 可能指向 Docker Desktop 或其他本地集群,不能作为 UniDesk Code Queue 部署是否 ready 的证据。部署后直接查 k3s 时,至少确认 `deployment/code-queue`、`code-queue-read`、`code-queue-write`、`d601-provider-egress-proxy` 和 `d601-tcp-egress-gateway` ready,Pod 环境中的 `UNIDESK_DEPLOY_REQUESTED_COMMIT`/`CODE_QUEUE_DEPLOY_REQUESTED_COMMIT` 等于期望 commit,并且 scheduler `/health` 暴露 PostgreSQL ready、`storage.lastError=null`、egress proxy connected 和 MiniMax `NO_PROXY` 例外。 +D601 原生 k3s 的人工诊断必须显式使用 host kubeconfig:`KUBECONFIG=/etc/rancher/k3s/k3s.yaml kubectl -n unidesk ...`。D601 上的默认 `kubectl` context 可能指向 Docker Desktop 或其他本地集群,不能作为 UniDesk Code Queue 部署是否 ready 的证据。部署后直接查 k3s 时,至少确认 `deployment/code-queue`、`code-queue-read`、`code-queue-write`、`d601-provider-egress-proxy` 和 `d601-tcp-egress-gateway` ready,两个 egress Service 的 Endpoints 非空,Pod 环境中的 `UNIDESK_DEPLOY_REQUESTED_COMMIT`/`CODE_QUEUE_DEPLOY_REQUESTED_COMMIT` 等于期望 commit,并且 scheduler `/health` 暴露 PostgreSQL ready、`storage.lastError=null`、egress proxy connected、stale active/retry_wait reconcile 状态和 MiniMax `NO_PROXY` 例外。 + +`bun scripts/cli.ts microservice diagnostics code-queue` 必须作为 Code Queue 的长期诊断入口。它必须显式报告 `d601-provider-egress-proxy` 和 `d601-tcp-egress-gateway` 的 Deployment available、Endpoint non-empty、scheduler 到 PostgreSQL route、以及 stale active/retry_wait reconcile;任何一项失败都应让诊断结果 degraded/failing,而不是只显示 scheduler HTTP healthy。 ## Boundaries diff --git a/docs/reference/deploy.md b/docs/reference/deploy.md index 7b4f7628..32f278e4 100644 --- a/docs/reference/deploy.md +++ b/docs/reference/deploy.md @@ -163,6 +163,10 @@ The reconciler selects the executor from `config.json`: - Control bridges that UniDesk needs in order to inspect or repair an orchestrator must stay in this direct class. In particular, `k3sctl-adapter` is a UniDesk-managed bridge to native k3s and must remain outside k3s; Docker packaging on Docker Desktop/WSL must create an explicit host-local bridge, currently an adapter-container SSH local tunnel, to reach `/etc/rancher/k3s/k3s.yaml` and WSL `127.0.0.1:6443`. - `deployment.mode=k3sctl-managed`: the target behavior is to build on the active control target, verify native k3s on the host OS/WSL distro, import the image into native k3s/containerd, apply the existing Kubernetes manifest, stamp the Deployment and wait for rollout. On D601, persistent dev apply is currently allowed only for `backend-core` and `frontend` in `unidesk-dev`; normal production services still cannot use a maintenance-channel direct rollout. The executor must use the native kubeconfig and containerd socket, for example `/etc/rancher/k3s/k3s.yaml` and `/run/k3s/containerd/containerd.sock`; running k3s itself in Docker is forbidden for both control-plane and worker nodes. A `rancher/k3s` image or legacy container may only be used as a temporary artifact source during migration, and any active containerized k3s control plane must be stopped before verification succeeds. The executor must preload a valid `rancher/mirrored-pause:3.6` sandbox image into native k3s containerd through the provider-gateway one-shot egress path, verify its entrypoint is `/pause`, and reject fake or sleep-based replacement images. Code Queue's k3s migration executor must also stop/remove the legacy direct Docker `code-queue-backend` after k3s rollout, so there is never a second scheduler running beside the native k3s scheduler. +D601 Docker local images are not the source of truth for k3s runtime availability. For Code Queue, the deploy gate must verify `unidesk-code-queue:d601` exists in native k3s containerd after import with `ctr --address /run/k3s/containerd/containerd.sock -n k8s.io images ls`, and it must fail before rollout if the tag is missing. The same gate must verify every production Code Queue Deployment that uses the image (`code-queue`, `code-queue-read`, `code-queue-write`, `d601-provider-egress-proxy`, `d601-tcp-egress-gateway`) still references exactly `unidesk-code-queue:d601`; otherwise kubelet may attempt an external registry pull and leave base gateways in `ImagePullBackOff`. + +Code Queue health and diagnostics must cover its k3s dependencies, not only scheduler HTTP health. `bun scripts/cli.ts microservice diagnostics code-queue` and the `/health` aggregation must mark the service degraded/failing when `d601-provider-egress-proxy` or `d601-tcp-egress-gateway` Deployment availability or Endpoint readiness is missing, when the scheduler reports `storage.lastError` or PostgreSQL route failure through `d601-tcp-egress-gateway.unidesk.svc.cluster.local:15432`, or when stale active/retry_wait reconcile reports recoverable active tasks without a local run. + Existing service-specific commands such as Code Queue deploy are disabled as direct D601 deploy paths. Their build/import/rollout semantics should converge later into one controlled target-side deployment path instead of keeping parallel implementations. Decision Center is a standard `k3sctl-managed` service in this model, but D601 maintenance-channel direct apply must not deploy it. Future controlled CD for Decision Center should build `src/components/microservices/decision-center/Dockerfile` on D601, import `unidesk-decision-center:d601` into native k3s containerd, apply `src/components/microservices/k3sctl-adapter/k3s/decision-center.k8s.yaml`, stamp the Deployment, and verify health through `/api/microservices/decision-center/health`. It must not add a main-server Compose service, NodePort, hostPort, or provider-gateway direct HTTP backend for Decision Center. diff --git a/scripts/code-queue-issue3-regression-test.ts b/scripts/code-queue-issue3-regression-test.ts new file mode 100644 index 00000000..2227a380 --- /dev/null +++ b/scripts/code-queue-issue3-regression-test.ts @@ -0,0 +1,106 @@ +import { readFileSync } from "node:fs"; +import { codeQueueContainerdImagePreflight, codeQueueManifestImagePreflight } from "./src/deploy"; + +type JsonRecord = Record; + +function assertCondition(condition: unknown, message: string, detail: JsonRecord = {}): void { + if (!condition) throw new Error(`${message}: ${JSON.stringify(detail)}`); +} + +function gatewayCheck(diagnostics: JsonRecord): JsonRecord { + const checks = diagnostics.checks as JsonRecord | undefined; + const deployment = checks?.deployment as JsonRecord | undefined; + const endpoint = checks?.endpoint as JsonRecord | undefined; + const target = checks?.targetService as JsonRecord | undefined; + const deploymentAvailable = deployment?.available === true; + const endpointNonEmpty = Number(endpoint?.readyAddressCount ?? 0) > 0; + return { + ok: diagnostics.ok === true && deploymentAvailable && endpointNonEmpty && target?.ok === true, + deploymentAvailable, + endpointNonEmpty, + targetServiceOk: target?.ok === true, + }; +} + +function schedulerStorageCheck(schedulerHealth: JsonRecord): JsonRecord { + const queue = schedulerHealth.queue as JsonRecord | undefined; + const storage = queue?.storage as JsonRecord | undefined; + const lastError = storage?.lastError ?? null; + return { + ok: storage?.postgresReady === true && lastError === null, + postgresReady: storage?.postgresReady === true, + lastError, + }; +} + +function staleReconcileCheck(schedulerHealth: JsonRecord): JsonRecord { + const queue = schedulerHealth.queue as JsonRecord | undefined; + const reconcile = queue?.reconcile as JsonRecord | undefined; + const recoverable = Number(reconcile?.recoverableOrphanedActiveTaskCount ?? queue?.orphanedActiveTaskCount ?? 0); + return { + ok: recoverable === 0, + recoverableOrphanedActiveTaskCount: recoverable, + retryWaitTaskCount: Number(reconcile?.retryWaitTaskCount ?? 0), + }; +} + +function runIssue3Regression(): JsonRecord { + const missingEndpointGateway = gatewayCheck({ + ok: false, + checks: { + deployment: { ok: true, available: true, availableReplicas: 1 }, + endpoint: { ok: false, readyAddressCount: 0 }, + targetService: { ok: false }, + }, + }); + assertCondition(missingEndpointGateway.ok === false, "microservice:code-queue-egress-gateway-health must fail on empty endpoint", missingEndpointGateway); + assertCondition(missingEndpointGateway.deploymentAvailable === true, "gateway deployment availability should be reported separately", missingEndpointGateway); + assertCondition(missingEndpointGateway.endpointNonEmpty === false, "gateway endpoint non-empty check should be explicit", missingEndpointGateway); + + const storageFailure = schedulerStorageCheck({ + queue: { + storage: { + postgresReady: false, + lastError: "CONNECT_TIMEOUT d601-tcp-egress-gateway.unidesk.svc.cluster.local:15432", + }, + }, + }); + assertCondition(storageFailure.ok === false, "diagnostics must fail when scheduler storage.lastError reports PostgreSQL route failure", storageFailure); + assertCondition(String(storageFailure.lastError).includes("CONNECT_TIMEOUT"), "storage failure detail should preserve route error", storageFailure); + + const staleReconcile = staleReconcileCheck({ + queue: { + orphanedActiveTaskCount: 1, + reconcile: { + recoverableOrphanedActiveTaskCount: 1, + retryWaitTaskCount: 2, + }, + }, + }); + assertCondition(staleReconcile.ok === false, "code-queue:stale-active-reconcile must fail while recoverable active tasks remain", staleReconcile); + assertCondition(staleReconcile.retryWaitTaskCount === 2, "retry_wait reconcile count should be visible", staleReconcile); + + const missingImage = codeQueueContainerdImagePreflight("docker.io/library/busybox:latest\n", "unidesk-code-queue:d601"); + assertCondition(missingImage.ok === false, "code-queue:containerd-image-preflight must fail when k3s containerd lacks unidesk-code-queue:d601", missingImage); + const presentImage = codeQueueContainerdImagePreflight("docker.io/library/busybox:latest\nunidesk-code-queue:d601 application/vnd.oci.image.manifest.v1+json\n", "unidesk-code-queue:d601"); + assertCondition(presentImage.ok === true, "code-queue:containerd-image-preflight should pass when the tag is present", presentImage); + + const manifestPath = "src/components/microservices/k3sctl-adapter/k3s/code-queue.k8s.yaml"; + const manifest = readFileSync(manifestPath, "utf8"); + const manifestImage = codeQueueManifestImagePreflight(manifest, "unidesk-code-queue:d601"); + assertCondition(manifestImage.ok === true, "Code Queue k8s manifest deployments must all use unidesk-code-queue:d601", manifestImage); + + return { + ok: true, + checks: [ + { name: "microservice:code-queue-egress-gateway-health", ok: true }, + { name: "microservice:code-queue-postgres-route-health", ok: true }, + { name: "code-queue:containerd-image-preflight", ok: true }, + { name: "code-queue:stale-active-reconcile", ok: true }, + ], + }; +} + +if (import.meta.main) { + process.stdout.write(`${JSON.stringify(runIssue3Regression(), null, 2)}\n`); +} diff --git a/scripts/src/check.ts b/scripts/src/check.ts index 01551816..731eef6b 100644 --- a/scripts/src/check.ts +++ b/scripts/src/check.ts @@ -236,6 +236,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default fileItem("src/components/microservices/code-queue-mgr/src/index.ts"), fileItem("src/components/microservices/code-queue-mgr/src/prompt-observation.ts"), fileItem("scripts/src/deploy.ts"), + fileItem("scripts/code-queue-issue3-regression-test.ts"), fileItem("scripts/src/ci.ts"), fileItem("scripts/src/e2e.ts"), fileItem("scripts/code-queue-prompt-observation-test.ts"), @@ -248,9 +249,11 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default if (options.scriptsTypecheck) { items.push(commandItem("typescript:scripts", ["bunx", "tsc", "-p", "scripts/tsconfig.json", "--noEmit", "--pretty", "false"], 120_000)); items.push(commandItem("code-queue:prompt-observation-contract", ["bun", "scripts/code-queue-prompt-observation-test.ts"], 30_000)); + items.push(commandItem("code-queue:issue3-diagnostics-and-image-preflight", ["bun", "scripts/code-queue-issue3-regression-test.ts"], 30_000)); } else { items.push(skippedItem("typescript:scripts", "scripts TypeScript typecheck is opt-in", "--scripts-typecheck or --full")); items.push(skippedItem("code-queue:prompt-observation-contract", "prompt observation contract is opt-in with script checks", "--scripts-typecheck or --full")); + items.push(skippedItem("code-queue:issue3-diagnostics-and-image-preflight", "Code Queue issue #3 regression fixtures are opt-in with script checks", "--scripts-typecheck or --full")); } if (options.logs) { items.push(unifiedLogRotationItem()); diff --git a/scripts/src/deploy.ts b/scripts/src/deploy.ts index 1ccb19e1..db176170 100644 --- a/scripts/src/deploy.ts +++ b/scripts/src/deploy.ts @@ -853,6 +853,40 @@ function buildImageTag(service: UniDeskMicroserviceConfig): string { return `unidesk-${service.id}:${service.providerId.toLowerCase()}`; } +export function codeQueueContainerdImagePreflight(imageListText: string, expectedImage: string): { ok: boolean; expectedImage: string; matchedLine: string | null; error: string | null } { + const matchedLine = imageListText + .split(/\r?\n/u) + .map((line) => line.trim()) + .find((line) => line.length > 0 && line.includes(expectedImage)) ?? null; + return matchedLine === null + ? { + ok: false, + expectedImage, + matchedLine: null, + error: `native k3s containerd is missing required image tag: ${expectedImage}`, + } + : { ok: true, expectedImage, matchedLine, error: null }; +} + +export function codeQueueManifestImagePreflight(manifestText: string, expectedImage: string): { ok: boolean; expectedImage: string; objects: Array<{ kind: string; name: string; images: string[]; ok: boolean }>; errors: string[] } { + const requiredObjectNames = new Set(["code-queue", "code-queue-read", "code-queue-write", "d601-provider-egress-proxy", "d601-tcp-egress-gateway"]); + const objects: Array<{ kind: string; name: string; images: string[]; ok: boolean }> = []; + for (const documentText of manifestText.split(/^---\s*$/mu)) { + const kind = documentText.match(/^kind:\s*(\S+)\s*$/mu)?.[1] ?? ""; + if (kind !== "Deployment") continue; + const deploymentName = documentText.match(/^metadata:\s*$[\s\S]*?^\s{2}name:\s*([A-Za-z0-9_.-]+)\s*$/mu)?.[1] ?? ""; + if (!requiredObjectNames.has(deploymentName)) continue; + const images = Array.from(documentText.matchAll(/^\s*image:\s*(?:"([^"]+)"|([^\s#]+))/gmu)).map((match) => match[1] ?? match[2] ?? ""); + objects.push({ kind, name: deploymentName, images, ok: images.length > 0 && images.every((image) => image === expectedImage) }); + } + const presentNames = new Set(objects.map((object) => object.name)); + const errors = [ + ...Array.from(requiredObjectNames).filter((name) => !presentNames.has(name)).map((name) => `required deployment missing from manifest: ${name}`), + ...objects.filter((object) => !object.ok).map((object) => `deployment ${object.name} must use only ${expectedImage}; found ${object.images.join(",") || ""}`), + ]; + return { ok: errors.length === 0, expectedImage, objects, errors }; +} + function directComposeFile(service: UniDeskMicroserviceConfig): string { return targetIsMain(service) ? rootPath("docker-compose.yml") @@ -1554,17 +1588,54 @@ function ensureNativeK3sScript(): string { function importK3sImageScript(service: UniDeskMicroserviceConfig): string { const image = buildImageTag(service); const archive = `/tmp/unidesk-${safeId(service.id)}-k3s-image.tar`; + const manifest = `${targetWorkDir(service)}/${k8sManifestPath(service)}`; return [ "set -euo pipefail", ...rootAccessPrelude(), `image=${shellQuote(image)}`, `archive=${shellQuote(archive)}`, + `manifest=${shellQuote(manifest)}`, "docker image inspect \"$image\" >/dev/null", + "if [ -f \"$manifest\" ]; then", + " bad_manifest_images=$(python3 - \"$manifest\" \"$image\" <<'PY'", + "import re, sys", + "path, expected = sys.argv[1], sys.argv[2]", + "required = {'code-queue','code-queue-read','code-queue-write','d601-provider-egress-proxy','d601-tcp-egress-gateway'}", + "text = open(path, encoding='utf-8').read()", + "bad = []", + "seen = set()", + "for doc in re.split(r'(?m)^---\\s*$', text):", + " if not re.search(r'(?m)^kind:\\s*Deployment\\s*$', doc):", + " continue", + " match = re.search(r'(?ms)^metadata:\\s*$.*?^ name:\\s*([A-Za-z0-9_.-]+)\\s*$', doc)", + " name = match.group(1) if match else ''", + " if name not in required:", + " continue", + " seen.add(name)", + " images = re.findall(r'(?m)^\\s*image:\\s*\"?([^\"\\s#]+)\"?', doc)", + " if not images or any(image != expected for image in images):", + " found = ','.join(images) if images else ''", + " bad.append(f'{name}:{found}')", + "missing = sorted(required - seen)", + "bad.extend(f'{name}:' for name in missing)", + "print('\\n'.join(bad))", + "PY", + " )", + " if [ -n \"$bad_manifest_images\" ]; then", + " printf 'code_queue_manifest_image_preflight_failed image=%s\\n%s\\n' \"$image\" \"$bad_manifest_images\" >&2", + " exit 1", + " fi", + " echo code_queue_manifest_image_preflight=ok image=$image", + "fi", "rm -f \"$archive\"", "docker save \"$image\" -o \"$archive\"", `root_exec ctr --address ${shellQuote(nativeK3sCtrAddress)} -n k8s.io images import "$archive"`, "rm -f \"$archive\"", - `root_exec ctr --address ${shellQuote(nativeK3sCtrAddress)} -n k8s.io images ls | grep -F "$image" || true`, + `if ! root_exec ctr --address ${shellQuote(nativeK3sCtrAddress)} -n k8s.io images ls | grep -F "$image"; then`, + " printf 'native_k3s_containerd_image_missing=%s\\n' \"$image\" >&2", + " exit 1", + "fi", + "printf 'native_k3s_containerd_image_present=%s\\n' \"$image\"", ].join("\n"); } @@ -1621,6 +1692,35 @@ function applyK8sScript(service: UniDeskMicroserviceConfig): string { ].filter(Boolean).join("\n"); } +function verifyK8sImagesScript(service: UniDeskMicroserviceConfig): string { + const namespace = k8sNamespaceForService(service); + const image = buildImageTag(service); + const deployments = k8sDeploymentsForService(service); + return [ + "set -euo pipefail", + `image=${shellQuote(image)}`, + `namespace=${shellQuote(namespace)}`, + `deployments=(${deployments.map(shellQuote).join(" ")})`, + "for deployment in \"${deployments[@]}\"; do", + ` actual=$(KUBECONFIG=${shellQuote(k8sKubeconfig)} kubectl -n "$namespace" get deployment "$deployment" -o jsonpath='{range .spec.template.spec.containers[*]}{.image}{\"\\n\"}{end}')`, + " if [ -z \"$actual\" ]; then", + " echo \"deployment_image_missing=$deployment\" >&2", + " exit 1", + " fi", + " while IFS= read -r actual_image; do", + " [ -z \"$actual_image\" ] && continue", + " if [ \"$actual_image\" != \"$image\" ]; then", + " printf 'deployment_image_mismatch deployment=%s expected=%s actual=%s\\n' \"$deployment\" \"$image\" \"$actual_image\" >&2", + " exit 1", + " fi", + " done < `deployment/${name}`); const namespace = k8sNamespaceForService(service); @@ -2223,6 +2323,8 @@ async function applyOneService(config: UniDeskConfig, service: UniDeskMicroservi if (!pushStep(steps, imageImport)) return { ok: false, serviceId: service.id, startedAt, finishedAt: nowIso(), resolvedCommit, before, steps }; const apply = await step(config, service, "kubectl-apply", applyK8sScript(service), targetWorkDir(service), 60_000, false); if (!pushStep(steps, apply)) return { ok: false, serviceId: service.id, startedAt, finishedAt: nowIso(), resolvedCommit, before, steps }; + const k8sImages = await step(config, service, "k8s-image-preflight", verifyK8sImagesScript(service), targetWorkDir(service), 60_000, false); + if (!pushStep(steps, k8sImages)) return { ok: false, serviceId: service.id, startedAt, finishedAt: nowIso(), resolvedCommit, before, steps }; const stamp = await step(config, service, "stamp-deploy-commit", stampK8sScript(service, desired, resolvedCommit), targetWorkDir(service), 60_000, false); if (!pushStep(steps, stamp)) return { ok: false, serviceId: service.id, startedAt, finishedAt: nowIso(), resolvedCommit, before, steps }; const rollout = await step(config, service, "rollout", rolloutK8sScript(service), targetWorkDir(service), 240_000, true); diff --git a/src/components/backend-core/src/microservice_proxy.rs b/src/components/backend-core/src/microservice_proxy.rs index c557a67e..f47c54be 100644 --- a/src/components/backend-core/src/microservice_proxy.rs +++ b/src/components/backend-core/src/microservice_proxy.rs @@ -149,6 +149,168 @@ fn code_queue_k3s_service_id_for_request(method: &Method, target_path: &str) -> } } +fn json_bool(value: &Value, path: &[&str]) -> Option { + let mut current = value; + for key in path { + current = current.get(*key)?; + } + current.as_bool() +} + +fn json_i64(value: &Value, path: &[&str]) -> Option { + let mut current = value; + for key in path { + current = current.get(*key)?; + } + current.as_i64() +} + +fn code_queue_route_postgres_check(scheduler_body: &Value) -> Value { + let storage = scheduler_body + .get("queue") + .and_then(|queue| queue.get("storage")) + .unwrap_or(&Value::Null); + let postgres_ready = storage + .get("postgresReady") + .and_then(Value::as_bool) + .unwrap_or(false); + let last_error = storage.get("lastError").cloned().unwrap_or(Value::Null); + json!({ + "ok": postgres_ready && last_error.is_null(), + "route": "d601-tcp-egress-gateway.unidesk.svc.cluster.local:15432", + "postgresReady": postgres_ready, + "lastError": last_error, + "source": "scheduler.health.queue.storage" + }) +} + +fn code_queue_reconcile_check(scheduler_body: &Value) -> Value { + let queue = scheduler_body.get("queue").unwrap_or(&Value::Null); + let reconcile = queue.get("reconcile").unwrap_or(&Value::Null); + let orphaned_active_count = queue + .get("orphanedActiveTaskCount") + .and_then(Value::as_i64) + .unwrap_or(0); + let active_run_slot_waiters = queue + .get("activeRunSlotWaiters") + .and_then(Value::as_array) + .map(|items| items.len() as i64) + .unwrap_or(0); + let recoverable_orphaned_active_count = reconcile + .get("recoverableOrphanedActiveTaskCount") + .and_then(Value::as_i64) + .unwrap_or(orphaned_active_count); + let retry_wait_count = reconcile + .get("retryWaitTaskCount") + .and_then(Value::as_i64) + .or_else(|| { + queue + .get("counts") + .and_then(|counts| counts.get("retry_wait")) + .and_then(Value::as_i64) + }) + .unwrap_or(0); + let active_without_local_run = recoverable_orphaned_active_count > 0; + json!({ + "ok": !active_without_local_run, + "status": if active_without_local_run { "recovery-required" } else { "idle-or-reconciled" }, + "orphanedActiveTaskCount": orphaned_active_count, + "recoverableOrphanedActiveTaskCount": recoverable_orphaned_active_count, + "retryWaitTaskCount": retry_wait_count, + "activeRunSlotWaiterCount": active_run_slot_waiters, + "lastRunAt": reconcile.get("lastRunAt").cloned().unwrap_or(Value::Null), + "lastReason": reconcile.get("lastReason").cloned().unwrap_or(Value::Null), + "lastRecovered": reconcile.get("lastRecovered").cloned().unwrap_or(Value::Null), + "source": "scheduler.health.queue.reconcile" + }) +} + +fn code_queue_dependency_check(service_id: &str, diagnostics: &Value) -> Value { + let checks = diagnostics.get("checks").unwrap_or(&Value::Null); + let deployment_available = json_bool(checks, &["deployment", "available"]).unwrap_or(false); + let endpoint_non_empty = json_i64(checks, &["endpoint", "readyAddressCount"]) + .map(|count| count > 0) + .unwrap_or_else(|| json_bool(checks, &["endpoint", "ok"]).unwrap_or(false)); + let target_ok = json_bool(checks, &["targetService", "ok"]) + .or_else(|| json_bool(checks, &["kubernetesApiServiceProxy", "ok"])) + .or_else(|| json_bool(checks, &["managedService", "ok"])) + .unwrap_or(false); + let ok = diagnostics + .get("ok") + .and_then(Value::as_bool) + .unwrap_or(false) + && deployment_available + && endpoint_non_empty + && target_ok; + json!({ + "ok": ok, + "serviceId": service_id, + "deploymentAvailable": deployment_available, + "endpointNonEmpty": endpoint_non_empty, + "targetServiceOk": target_ok, + "deployment": compact_json(checks.get("deployment").unwrap_or(&json!({ "ok": false, "skipped": true }))), + "endpoint": compact_json(checks.get("endpoint").unwrap_or(&json!({ "ok": false, "skipped": true }))), + "targetService": compact_json(checks.get("targetService").unwrap_or(&json!({ "ok": false, "skipped": true }))), + "managedService": compact_json(checks.get("managedService").unwrap_or(&json!({ "ok": false, "skipped": true }))), + "kubernetesApiServiceProxy": compact_json(checks.get("kubernetesApiServiceProxy").unwrap_or(&json!({ "ok": false, "skipped": true }))) + }) +} + +async fn code_queue_adapter_diagnostics( + state: &Arc, + service: &MicroserviceConfig, + k3s_service_id: &str, +) -> Value { + let adapter_service_id = service + .deployment + .adapter_service_id + .as_deref() + .unwrap_or("k3sctl-adapter"); + let Some(adapter) = service_by_id(state, adapter_service_id) else { + return json!({ + "ok": false, + "serviceId": k3s_service_id, + "error": format!("k3sctl adapter microservice not found: {adapter_service_id}") + }); + }; + let adapter_path = format!( + "/api/services/{}/diagnostics", + urlencoding_like(k3s_service_id) + ); + let response = fetch_microservice_upstream_response( + state, + &adapter, + &Method::GET, + &adapter_path, + &ProxyOptions { + query: String::new(), + json_array_limits: json!({}), + }, + &json!({ "accept": "application/json" }), + String::new(), + ) + .await; + let status = response.status().as_u16(); + let (status2, content_type, body_text) = response_to_text(response).await.unwrap_or(( + status, + "application/json".to_string(), + "{}".to_string(), + )); + let body: Value = serde_json::from_str(&body_text) + .unwrap_or_else(|_| json!({ "text": truncate_text(&body_text, 4000) })); + if (200..300).contains(&status2) && body.get("ok").and_then(Value::as_bool) == Some(true) { + body + } else { + json!({ + "ok": false, + "serviceId": k3s_service_id, + "status": status2, + "contentType": content_type, + "body": body + }) + } +} + fn code_queue_scheduler_only_path(method: &Method, target_path: &str) -> bool { code_queue_k3s_service_id_for_request(method, target_path) == "code-queue-scheduler" && !(target_path == "/" || target_path == "/health") @@ -191,7 +353,10 @@ fn code_queue_fallback_workdirs_response( Some(response_with_body( 200, "application/json; charset=utf-8", - Body::from(serde_json::to_string(&fallback).unwrap_or_else(|_| "{\"ok\":true,\"workdirs\":[]}".to_string())), + Body::from( + serde_json::to_string(&fallback) + .unwrap_or_else(|_| "{\"ok\":true,\"workdirs\":[]}".to_string()), + ), )) } @@ -977,12 +1142,18 @@ async fn fetch_microservice_upstream_response( ) .await; if method == Method::GET && target_path == "/api/workdirs" { - let (status, content_type, response_body) = response_to_text(response).await.unwrap_or(( - 502, - "text/plain".to_string(), - "failed to read code-queue workdirs response".to_string(), - )); - if let Some(fallback) = code_queue_fallback_workdirs_response(service, status, &content_type, &response_body) { + let (status, content_type, response_body) = + response_to_text(response).await.unwrap_or(( + 502, + "text/plain".to_string(), + "failed to read code-queue workdirs response".to_string(), + )); + if let Some(fallback) = code_queue_fallback_workdirs_response( + service, + status, + &content_type, + &response_body, + ) { return fallback; } return response_with_body(status, &content_type, Body::from(response_body)); @@ -1522,15 +1693,63 @@ async fn code_queue_health_response( .unwrap_or_else(|_| json!({ "text": truncate_text(&scheduler_text, 4000) })); let scheduler_healthy = (200..300).contains(&scheduler_status) && scheduler_body.get("ok").and_then(Value::as_bool) != Some(false); - let status = if mgr_healthy { 200 } else { 503 }; + let scheduler_diagnostics = + code_queue_adapter_diagnostics(state, service, "code-queue-scheduler").await; + let provider_egress_diagnostics = + code_queue_adapter_diagnostics(state, service, "d601-provider-egress-proxy").await; + let tcp_egress_diagnostics = + code_queue_adapter_diagnostics(state, service, "d601-tcp-egress-gateway").await; + let scheduler_dependency = + code_queue_dependency_check("code-queue-scheduler", &scheduler_diagnostics); + let provider_egress = + code_queue_dependency_check("d601-provider-egress-proxy", &provider_egress_diagnostics); + let tcp_egress = + code_queue_dependency_check("d601-tcp-egress-gateway", &tcp_egress_diagnostics); + let postgres_route = code_queue_route_postgres_check(&scheduler_body); + let stale_reconcile = code_queue_reconcile_check(&scheduler_body); + let dependencies_ok = scheduler_dependency + .get("ok") + .and_then(Value::as_bool) + .unwrap_or(false) + && provider_egress + .get("ok") + .and_then(Value::as_bool) + .unwrap_or(false) + && tcp_egress + .get("ok") + .and_then(Value::as_bool) + .unwrap_or(false) + && postgres_route + .get("ok") + .and_then(Value::as_bool) + .unwrap_or(false) + && stale_reconcile + .get("ok") + .and_then(Value::as_bool) + .unwrap_or(false); + let ok = mgr_healthy && scheduler_healthy && dependencies_ok; + let status = if ok { 200 } else { 503 }; let body = json!({ - "ok": mgr_healthy, + "ok": ok, "service": "code-queue", "role": "hybrid-master-control-plane", + "status": if ok { "healthy" } else { "degraded" }, "checkedAt": crate::json_util::now_iso(), "manager": { "ok": mgr_healthy, "status": mgr_status, "body": mgr_body, "timedOut": false }, "trace": { "ok": mgr_healthy, "source": "code-queue-mgr", "readonlyUserConfigured": trace_body.get("readonlyUserConfigured").cloned().unwrap_or(Value::Null), "body": trace_body }, "scheduler": { "ok": scheduler_healthy, "status": scheduler_status, "body": scheduler_body, "timedOut": false, "requiredFor": ["running task steer", "running task interrupt", "scheduler claim/runner execution", "dev-container control"] }, + "checks": { + "scheduler": scheduler_dependency, + "d601ProviderEgressProxy": provider_egress, + "d601TcpEgressGateway": tcp_egress, + "postgresRoute": postgres_route, + "staleActiveReconcile": stale_reconcile + }, + "diagnostics": { + "scheduler": scheduler_diagnostics, + "d601ProviderEgressProxy": provider_egress_diagnostics, + "d601TcpEgressGateway": tcp_egress_diagnostics + }, }); if head_only { response_with_body(status, "application/json; charset=utf-8", Body::empty()) @@ -1698,8 +1917,73 @@ async fn k3sctl_managed_diagnostics_response( .get("checks") .cloned() .unwrap_or_else(|| json!({})); - let ok = - (200..300).contains(&status2) && provider_online && proxy_mode == "provider-ws-http-tunnel"; + let mut code_queue_checks = Value::Null; + let mut code_queue_dependencies = Value::Null; + let mut code_queue_adapter = Value::Null; + let mut code_queue_ok = true; + if service.id == "code-queue" { + let scheduler_body = adapter_body + .get("checks") + .and_then(|checks| checks.get("targetService")) + .and_then(|target| target.get("body")) + .cloned() + .unwrap_or(Value::Null); + let scheduler_diagnostics = + code_queue_adapter_diagnostics(state, service, "code-queue-scheduler").await; + let provider_egress_diagnostics = + code_queue_adapter_diagnostics(state, service, "d601-provider-egress-proxy").await; + let tcp_egress_diagnostics = + code_queue_adapter_diagnostics(state, service, "d601-tcp-egress-gateway").await; + let scheduler_dependency = + code_queue_dependency_check("code-queue-scheduler", &scheduler_diagnostics); + let provider_egress = + code_queue_dependency_check("d601-provider-egress-proxy", &provider_egress_diagnostics); + let tcp_egress = + code_queue_dependency_check("d601-tcp-egress-gateway", &tcp_egress_diagnostics); + let postgres_route = code_queue_route_postgres_check(&scheduler_body); + let stale_reconcile = code_queue_reconcile_check(&scheduler_body); + code_queue_ok = scheduler_dependency + .get("ok") + .and_then(Value::as_bool) + .unwrap_or(false) + && provider_egress + .get("ok") + .and_then(Value::as_bool) + .unwrap_or(false) + && tcp_egress + .get("ok") + .and_then(Value::as_bool) + .unwrap_or(false) + && postgres_route + .get("ok") + .and_then(Value::as_bool) + .unwrap_or(false) + && stale_reconcile + .get("ok") + .and_then(Value::as_bool) + .unwrap_or(false); + code_queue_checks = json!({ + "scheduler": scheduler_dependency, + "d601ProviderEgressProxy": provider_egress, + "d601TcpEgressGateway": tcp_egress, + "postgresRoute": postgres_route, + "staleActiveReconcile": stale_reconcile + }); + code_queue_dependencies = json!({ + "postgresRoute": "d601-tcp-egress-gateway.unidesk.svc.cluster.local:15432", + "requiredDeployments": ["d601-tcp-egress-gateway", "d601-provider-egress-proxy", "code-queue"], + "requiredEndpoints": ["d601-tcp-egress-gateway", "d601-provider-egress-proxy", "code-queue-scheduler"] + }); + code_queue_adapter = json!({ + "scheduler": scheduler_diagnostics, + "d601ProviderEgressProxy": provider_egress_diagnostics, + "d601TcpEgressGateway": tcp_egress_diagnostics + }); + } + let ok = (200..300).contains(&status2) + && provider_online + && proxy_mode == "provider-ws-http-tunnel" + && code_queue_ok; json_response( json!({ "ok": ok, @@ -1715,10 +1999,18 @@ async fn k3sctl_managed_diagnostics_response( "kubernetesApiServiceProxy": compact_json(adapter_checks.get("kubernetesApiServiceProxy").unwrap_or(&json!({ "ok": false, "skipped": true }))), "targetService": compact_json(adapter_checks.get("targetService").or_else(|| adapter_checks.get("managedService")).unwrap_or(&json!({ "ok": false, "skipped": true }))), }, + "codeQueue": { + "ok": code_queue_ok, + "checks": code_queue_checks, + "dependencies": code_queue_dependencies, + "adapter": code_queue_adapter + }, "adapter": adapter_body, }), - if (200..300).contains(&status2) { + if ok { 200 + } else if (200..300).contains(&status2) { + 503 } else { status2 }, diff --git a/src/components/microservices/code-queue-mgr/src-rs/main.rs b/src/components/microservices/code-queue-mgr/src-rs/main.rs index 85e8496f..aa40f4f1 100644 --- a/src/components/microservices/code-queue-mgr/src-rs/main.rs +++ b/src/components/microservices/code-queue-mgr/src-rs/main.rs @@ -1,9 +1,9 @@ #![recursion_limit = "512"] -use chrono::{SecondsFormat, Utc}; +use chrono::{DateTime, SecondsFormat, Utc}; use postgres::{Client, NoTls}; use serde_json::{json, Map, Value}; -use std::collections::VecDeque; +use std::collections::{HashSet, VecDeque}; use std::env; use std::error::Error; use std::fs::{create_dir_all, OpenOptions}; @@ -19,6 +19,7 @@ const MAX_LIST_LIMIT: i64 = 200; const MAX_OUTPUT_PAGE_LIMIT: usize = 500; const MAX_PREVIEW_CHARS: usize = 6000; const WORKDIR_MAX_CHARS: usize = 512; +const SCHEDULER_HEARTBEAT_STALE_MS: i64 = 5 * 60 * 1000; #[derive(Clone)] struct Config { @@ -556,6 +557,10 @@ fn terminal_status(status: &str) -> bool { matches!(status, "succeeded" | "failed" | "canceled") } +fn active_status(status: &str) -> bool { + matches!(status, "running" | "judging") +} + fn queued_prompt_editable(task: &TaskMeta) -> bool { task.status == "queued" && task.started_at.is_none() @@ -636,6 +641,188 @@ fn output_max_seq(task: &TaskMeta) -> i64 { task.last_output_seq.max(from_output) } +fn task_scheduler_heartbeat(task: &TaskMeta) -> Option { + task.task_json + .as_ref() + .and_then(|value| value.get("schedulerHeartbeat")) + .filter(|value| value.is_object()) + .cloned() +} + +fn parse_iso_ms(value: Option<&str>) -> Option { + value + .and_then(|text| DateTime::parse_from_rfc3339(text).ok()) + .map(|date| date.timestamp_millis()) +} + +fn heartbeat_string_field(heartbeat: &Value, key: &str) -> Option { + heartbeat.get(key).and_then(Value::as_str).map(|value| value.to_string()) +} + +fn heartbeat_task_id(heartbeat: &Value) -> String { + heartbeat_string_field(heartbeat, "taskId").unwrap_or_default() +} + +fn heartbeat_fresh(heartbeat: &Value, now_ms: i64) -> bool { + parse_iso_ms(heartbeat.get("lastLocalHeartbeatAt").and_then(Value::as_str)) + .map(|at| now_ms.saturating_sub(at) < SCHEDULER_HEARTBEAT_STALE_MS) + .unwrap_or(false) +} + +fn max_timestamp(values: Vec>) -> Value { + let mut best: Option<(String, i64)> = None; + for value in values.into_iter().flatten() { + let Some(ms) = parse_iso_ms(Some(&value)) else { continue }; + if best.as_ref().map(|(_, best_ms)| ms >= *best_ms).unwrap_or(true) { + best = Some((value, ms)); + } + } + best.map(|(value, _)| json!(value)).unwrap_or(Value::Null) +} + +fn task_has_trace_gap(task: &TaskMeta, heartbeat: &Value, now_ms: i64) -> bool { + let trace_ms = parse_iso_ms(heartbeat.get("lastPersistedTraceAt").and_then(Value::as_str)); + let output_seq = heartbeat.get("outputMaxSeq").and_then(Value::as_i64).unwrap_or_else(|| output_max_seq(task)); + trace_ms + .map(|at| now_ms.saturating_sub(at) >= SCHEDULER_HEARTBEAT_STALE_MS && output_seq > 0) + .unwrap_or(false) +} + +fn execution_diagnostics_from_tasks(tasks: &[TaskMeta], now: &str) -> Value { + let now_ms = parse_iso_ms(Some(now)).unwrap_or_else(|| Utc::now().timestamp_millis()); + let mut database_active_task_ids: Vec = tasks.iter().filter(|task| active_status(&task.status)).map(|task| task.id.clone()).collect(); + database_active_task_ids.sort(); + + let mut active_heartbeats: Vec<(String, Value)> = tasks + .iter() + .filter_map(|task| task_scheduler_heartbeat(task).map(|heartbeat| (task.id.clone(), heartbeat))) + .collect(); + active_heartbeats.sort_by(|left, right| left.0.cmp(&right.0)); + + let mut active_heartbeat_task_ids: Vec = active_heartbeats.iter().map(|(_, heartbeat)| heartbeat_task_id(heartbeat)).filter(|id| !id.is_empty()).collect(); + active_heartbeat_task_ids.sort(); + active_heartbeat_task_ids.dedup(); + + let mut heartbeat_fresh_task_ids: Vec = active_heartbeats + .iter() + .filter(|(_, heartbeat)| heartbeat_fresh(heartbeat, now_ms)) + .map(|(_, heartbeat)| heartbeat_task_id(heartbeat)) + .filter(|id| !id.is_empty()) + .collect(); + heartbeat_fresh_task_ids.sort(); + heartbeat_fresh_task_ids.dedup(); + + let mut heartbeat_expired_task_ids: Vec = active_heartbeats + .iter() + .filter(|(task_id, heartbeat)| { + tasks.iter().any(|task| &task.id == task_id && active_status(&task.status)) + && !heartbeat_fresh(heartbeat, now_ms) + }) + .map(|(_, heartbeat)| heartbeat_task_id(heartbeat)) + .filter(|id| !id.is_empty()) + .collect(); + heartbeat_expired_task_ids.sort(); + heartbeat_expired_task_ids.dedup(); + + let heartbeat_task_set: HashSet = active_heartbeat_task_ids.iter().cloned().collect(); + let heartbeat_fresh_set: HashSet = heartbeat_fresh_task_ids.iter().cloned().collect(); + let heartbeat_expired_set: HashSet = heartbeat_expired_task_ids.iter().cloned().collect(); + let heartbeat_missing_task_ids: Vec = database_active_task_ids + .iter() + .filter(|task_id| !heartbeat_task_set.contains(*task_id)) + .cloned() + .collect(); + let stale_recovery_candidate_task_ids: Vec = database_active_task_ids + .iter() + .filter(|task_id| heartbeat_expired_set.contains(*task_id)) + .cloned() + .collect(); + let mut trace_gap_task_ids: Vec = active_heartbeats + .iter() + .filter(|(task_id, heartbeat)| { + tasks.iter().find(|task| &task.id == task_id).map(|task| task_has_trace_gap(task, heartbeat, now_ms)).unwrap_or(false) + }) + .map(|(_, heartbeat)| heartbeat_task_id(heartbeat)) + .filter(|id| !id.is_empty()) + .collect(); + trace_gap_task_ids.sort(); + trace_gap_task_ids.dedup(); + let trace_gap_not_stale_task_ids: Vec = trace_gap_task_ids + .iter() + .filter(|task_id| heartbeat_fresh_set.contains(*task_id)) + .cloned() + .collect(); + + let split_brain = !database_active_task_ids.is_empty() && !heartbeat_fresh_task_ids.is_empty(); + let stale_active = !stale_recovery_candidate_task_ids.is_empty(); + let degraded = split_brain + || stale_active + || !heartbeat_missing_task_ids.is_empty() + || !heartbeat_expired_task_ids.is_empty() + || !trace_gap_task_ids.is_empty(); + let state = if split_brain { + "split-brain" + } else if stale_active { + "stale-active" + } else if degraded { + "degraded" + } else { + "healthy" + }; + let mut reasons = Vec::new(); + if split_brain { + reasons.push("postgres control-plane has database-active tasks while its local active slots are empty, but scheduler heartbeat is fresh"); + } + if stale_active { + reasons.push("owner heartbeat is expired and scheduler has no local active run for at least one database-active task"); + } + if !heartbeat_missing_task_ids.is_empty() { + reasons.push("database-active tasks are missing scheduler-owned heartbeat"); + } + if !trace_gap_not_stale_task_ids.is_empty() { + reasons.push("trace progress is stale while scheduler heartbeat is fresh; this is a trace gap, not stale active"); + } + + json!({ + "state": state, + "health": state, + "degraded": degraded, + "splitBrain": split_brain, + "executionStateSource": "postgres-control-plane", + "controlPlane": "master-code-queue-mgr", + "databaseActiveTaskIds": database_active_task_ids, + "databaseActiveTaskCount": database_active_task_ids.len(), + "schedulerActiveTaskIds": [], + "schedulerActiveTaskCount": 0, + "schedulerActiveRunSlotCount": 0, + "schedulerActiveQueueIds": [], + "schedulerProcessingQueueIds": [], + "schedulerOrphanedActiveTaskIds": database_active_task_ids, + "schedulerOrphanedActiveTaskCount": database_active_task_ids.len(), + "activeHeartbeatTaskIds": active_heartbeat_task_ids, + "activeHeartbeatCount": active_heartbeat_task_ids.len(), + "heartbeatFreshTaskIds": heartbeat_fresh_task_ids, + "heartbeatExpiredTaskIds": heartbeat_expired_task_ids, + "heartbeatMissingTaskIds": heartbeat_missing_task_ids, + "staleRecoveryCandidateTaskIds": stale_recovery_candidate_task_ids, + "traceGapTaskIds": trace_gap_task_ids, + "traceGapNotStaleTaskIds": trace_gap_not_stale_task_ids, + "schedulerHeartbeatStaleMs": SCHEDULER_HEARTBEAT_STALE_MS, + "now": now, + "lastSchedulerHeartbeatAt": max_timestamp(active_heartbeats.iter().map(|(_, heartbeat)| heartbeat_string_field(heartbeat, "lastLocalHeartbeatAt")).collect()), + "lastObservedAgentEventAt": max_timestamp(active_heartbeats.iter().map(|(_, heartbeat)| heartbeat_string_field(heartbeat, "lastObservedAgentEventAt")).collect()), + "lastPersistedTraceAt": max_timestamp(active_heartbeats.iter().map(|(_, heartbeat)| heartbeat_string_field(heartbeat, "lastPersistedTraceAt")).collect()), + "oaPublisher": Value::Null, + "reasons": reasons, + "guidance": [ + "Do not use master activeRunSlotCount=0 alone to declare a task stuck.", + "Check PostgreSQL task status, scheduler active runs/slots, scheduler heartbeat, and trace/OA publisher progress together.", + "Trace gap with a fresh scheduler heartbeat must not trigger stale retry.", + "Stale recovery is allowed only when the scheduler has no local active run and the owner heartbeat is expired." + ] + }) +} + fn last_assistant_message(task: &TaskMeta) -> Value { let final_text = final_response(task); if !final_text.trim().is_empty() { @@ -724,6 +911,7 @@ fn task_list_response(_state: &AppState, task: &TaskMeta, lite: bool) -> Value { "currentMode": task.current_mode, "judgeFailCount": task.task_json.as_ref().and_then(|value| value.get("judgeFailCount")).and_then(Value::as_i64).unwrap_or(0), "judgeFailRetryLimit": 3, + "schedulerHeartbeat": task_scheduler_heartbeat(task).unwrap_or(Value::Null), "codexThreadId": task.codex_thread_id, "activeTurnId": task.active_turn_id, "lastError": task.last_error, @@ -936,7 +1124,6 @@ fn queue_summary(state: &AppState) -> Result { let mut counts = Map::new(); let mut total = 0_i64; let mut unread_terminal_total = 0_i64; - let mut database_active_task_ids = Vec::new(); for row in rows { let id = safe_queue_id(row.get::<_, String>("id").as_str()); let status_counts = json!({ @@ -956,9 +1143,6 @@ fn queue_summary(state: &AppState) -> Result { let unread_terminal = row.get::<_, i64>("unread_terminal"); total += queue_total; unread_terminal_total += unread_terminal; - if let Some(active) = row.get::<_, Option>("active_task_id") { - database_active_task_ids.push(active); - } queues.push(json!({ "id": id, "name": row.get::<_, String>("name"), @@ -972,6 +1156,30 @@ fn queue_summary(state: &AppState) -> Result { "updatedAt": row.get::<_, Option>("updated_at") })); } + let active_rows = client + .query( + " + SELECT id, queue_id, status, provider_id, execution_mode, model, cwd, prompt, base_prompt, + reference_task_ids, reference_injection, reasoning_effort, max_attempts, current_attempt, + current_mode, codex_thread_id, active_turn_id, + to_json(created_at)#>>'{}' AS created_at, + to_json(updated_at)#>>'{}' AS updated_at, + to_json(started_at)#>>'{}' AS started_at, + to_json(finished_at)#>>'{}' AS finished_at, + to_json(read_at)#>>'{}' AS read_at, + last_error, last_judge, output_count, event_count, attempt_count, last_output_seq, + task_json - 'output' - 'events' - 'promptHistory' AS task_json + FROM unidesk_code_queue_tasks + WHERE status IN ('running', 'judging') + ORDER BY updated_at DESC, id ASC + ", + &[], + ) + .map_err(|error| error.to_string())?; + let active_tasks: Vec = active_rows.iter().map(|row| row_to_task(row, true)).collect(); + let mut database_active_task_ids: Vec = active_tasks.iter().map(|task| task.id.clone()).collect(); + database_active_task_ids.sort(); + let execution_diagnostics = execution_diagnostics_from_tasks(&active_tasks, &now_iso()); if !queues.iter().any(|queue| queue.get("id").and_then(Value::as_str) == Some(DEFAULT_QUEUE_ID)) { let now = now_iso(); queues.insert(0, json!({ @@ -1009,6 +1217,8 @@ fn queue_summary(state: &AppState) -> Result { "databaseActiveTaskIds": database_active_task_ids, "databaseActiveTaskCount": database_active_task_ids.len(), "executionStateSource": "postgres-control-plane", + "executionDiagnostics": execution_diagnostics, + "schedulerHeartbeatStaleMs": SCHEDULER_HEARTBEAT_STALE_MS, "orphanedActiveTaskIds": [], "orphanedActiveTaskCount": 0, "processing": false, @@ -1793,6 +2003,7 @@ fn task_summary(task: &TaskMeta) -> Value { "currentMode": task.current_mode, "judgeFailCount": task.task_json.as_ref().and_then(|value| value.get("judgeFailCount")).and_then(Value::as_i64).unwrap_or(0), "judgeFailRetryLimit": 3, + "schedulerHeartbeat": task_scheduler_heartbeat(task).unwrap_or(Value::Null), "codexThreadId": task.codex_thread_id, "activeTurnId": task.active_turn_id, "createdAt": task.created_at, @@ -1835,6 +2046,7 @@ fn trace_summary(task: &TaskMeta) -> Value { "stepCount": task.task_json.as_ref().and_then(|value| value.get("stepCount")).and_then(Value::as_i64).unwrap_or(outputs.len() as i64), "retainedStepCount": outputs.len(), "outputMaxSeq": output_max_seq(task), + "schedulerHeartbeat": task_scheduler_heartbeat(task).unwrap_or(Value::Null), "statsSource": "code-queue-mgr-rust-postgres", "attempts": attempts_array(task), "prompt": { "initialPreview": preview(&task.prompt, 1600), "basePreview": preview(&task.base_prompt, 1600), "chars": task.prompt.chars().count(), "lines": task.prompt.lines().count() }, @@ -2144,6 +2356,7 @@ fn health(state: &AppState) -> (Value, u16) { "schemaReady": ready, "schemaLastError": last_error, "taskCount": task_count, + "queue": if ready { queue_summary(state).unwrap_or(Value::Null) } else { Value::Null }, "resourceBudget": { "targetMemoryMb": 100, "mgrPoolMax": state.config.mgr_pool_max, diff --git a/src/components/microservices/code-queue-mgr/src/index.ts b/src/components/microservices/code-queue-mgr/src/index.ts index c0a364ec..6b99282b 100644 --- a/src/components/microservices/code-queue-mgr/src/index.ts +++ b/src/components/microservices/code-queue-mgr/src/index.ts @@ -166,6 +166,7 @@ interface QueueTask { currentMode: RunMode | null; codexThreadId: string | null; activeTurnId: string | null; + schedulerHeartbeat?: JsonRecord | null; finalResponse: string; stepCount?: number; llmStepCount?: number; @@ -287,6 +288,7 @@ const workdirMaxLength = 512; const defaultCodeModels = ["gpt-5.5", "gpt-5.4-mini", "gpt-5.4", "minimax-m2.7"]; const codeExecutionModes: CodeExecutionMode[] = ["default", "windows-native"]; const codexStatsTimeZone = "Asia/Shanghai"; +const schedulerHeartbeatStaleMs = 5 * 60 * 1000; const codexStatsDateFormatter = new Intl.DateTimeFormat("en-CA", { timeZone: codexStatsTimeZone, year: "numeric", @@ -808,6 +810,108 @@ function outputMaxSeq(task: QueueTask): number { return Math.max(0, numberField(task.outputMaxSeq, 0), ...outputSeq, ...promptSeq, ...attemptSeq); } +function activeTask(task: QueueTask): boolean { + return task.status === "running" || task.status === "judging"; +} + +function schedulerHeartbeat(task: QueueTask): JsonRecord | null { + const heartbeat = asRecord(task.schedulerHeartbeat); + return heartbeat !== null && heartbeat.taskId === task.id ? heartbeat as JsonRecord : null; +} + +function heartbeatTimestamp(heartbeat: JsonRecord | null, key: string): string | null { + const value = heartbeat?.[key]; + return typeof value === "string" && timestampMs(value) !== null ? value : null; +} + +function maxTimestamp(values: Array): string | null { + let best: string | null = null; + let bestMs = -Infinity; + for (const value of values) { + const ms = timestampMs(value ?? null); + if (ms === null || ms < bestMs) continue; + best = value ?? null; + bestMs = ms; + } + return best; +} + +function heartbeatFresh(heartbeat: JsonRecord | null, nowMs: number): boolean { + const atMs = timestampMs(heartbeatTimestamp(heartbeat, "lastLocalHeartbeatAt")); + return atMs !== null && nowMs - atMs < schedulerHeartbeatStaleMs; +} + +function traceGap(task: QueueTask, heartbeat: JsonRecord | null, nowMs: number): boolean { + const traceAt = timestampMs(heartbeatTimestamp(heartbeat, "lastPersistedTraceAt")); + return traceAt !== null && nowMs - traceAt >= schedulerHeartbeatStaleMs && outputMaxSeq(task) > 0; +} + +function controlPlaneExecutionDiagnostics(tasks: QueueTask[], now = nowIso()): JsonRecord { + const nowMs = timestampMs(now) ?? Date.now(); + const databaseActiveTaskIds = tasks.filter(activeTask).map((task) => task.id).sort(); + const heartbeatRows = tasks.map((task) => ({ task, heartbeat: schedulerHeartbeat(task) })).filter((row): row is { task: QueueTask; heartbeat: JsonRecord } => row.heartbeat !== null); + const activeHeartbeatTaskIds = heartbeatRows.map((row) => String(row.heartbeat.taskId ?? row.task.id)).sort(); + const heartbeatFreshTaskIds = heartbeatRows.filter((row) => heartbeatFresh(row.heartbeat, nowMs)).map((row) => String(row.heartbeat.taskId ?? row.task.id)).sort(); + const heartbeatFreshSet = new Set(heartbeatFreshTaskIds); + const heartbeatExpiredTaskIds = heartbeatRows.filter((row) => activeTask(row.task) && !heartbeatFreshSet.has(row.task.id)).map((row) => row.task.id).sort(); + const activeHeartbeatSet = new Set(activeHeartbeatTaskIds); + const heartbeatMissingTaskIds = databaseActiveTaskIds.filter((taskId) => !activeHeartbeatSet.has(taskId)); + const staleRecoveryCandidateTaskIds = heartbeatExpiredTaskIds.slice(); + const traceGapTaskIds = heartbeatRows.filter((row) => traceGap(row.task, row.heartbeat, nowMs)).map((row) => row.task.id).sort(); + const traceGapNotStaleTaskIds = traceGapTaskIds.filter((taskId) => heartbeatFreshSet.has(taskId)); + const splitBrain = databaseActiveTaskIds.length > 0 && heartbeatFreshTaskIds.length > 0; + const staleActive = staleRecoveryCandidateTaskIds.length > 0; + const degraded = splitBrain + || staleActive + || heartbeatMissingTaskIds.length > 0 + || heartbeatExpiredTaskIds.length > 0 + || traceGapTaskIds.length > 0; + const state = splitBrain ? "split-brain" : staleActive ? "stale-active" : degraded ? "degraded" : "healthy"; + const reasons: string[] = []; + if (splitBrain) reasons.push("postgres control-plane has database-active tasks while its local active slots are empty, but scheduler heartbeat is fresh"); + if (staleActive) reasons.push("owner heartbeat is expired and scheduler has no local active run for at least one database-active task"); + if (heartbeatMissingTaskIds.length > 0) reasons.push("database-active tasks are missing scheduler-owned heartbeat"); + if (traceGapNotStaleTaskIds.length > 0) reasons.push("trace progress is stale while scheduler heartbeat is fresh; this is a trace gap, not stale active"); + return { + state, + health: state, + degraded, + splitBrain, + executionStateSource: "postgres-control-plane", + controlPlane: "master-code-queue-mgr", + databaseActiveTaskIds, + databaseActiveTaskCount: databaseActiveTaskIds.length, + schedulerActiveTaskIds: [], + schedulerActiveTaskCount: 0, + schedulerActiveRunSlotCount: 0, + schedulerActiveQueueIds: [], + schedulerProcessingQueueIds: [], + schedulerOrphanedActiveTaskIds: databaseActiveTaskIds, + schedulerOrphanedActiveTaskCount: databaseActiveTaskIds.length, + activeHeartbeatTaskIds, + activeHeartbeatCount: activeHeartbeatTaskIds.length, + heartbeatFreshTaskIds, + heartbeatExpiredTaskIds, + heartbeatMissingTaskIds, + staleRecoveryCandidateTaskIds, + traceGapTaskIds, + traceGapNotStaleTaskIds, + schedulerHeartbeatStaleMs, + now, + lastSchedulerHeartbeatAt: maxTimestamp(heartbeatRows.map((row) => heartbeatTimestamp(row.heartbeat, "lastLocalHeartbeatAt"))), + lastObservedAgentEventAt: maxTimestamp(heartbeatRows.map((row) => heartbeatTimestamp(row.heartbeat, "lastObservedAgentEventAt"))), + lastPersistedTraceAt: maxTimestamp(heartbeatRows.map((row) => heartbeatTimestamp(row.heartbeat, "lastPersistedTraceAt"))), + oaPublisher: null, + reasons, + guidance: [ + "Do not use master activeRunSlotCount=0 alone to declare a task stuck.", + "Check PostgreSQL task status, scheduler active runs/slots, scheduler heartbeat, and trace/OA publisher progress together.", + "Trace gap with a fresh scheduler heartbeat must not trigger stale retry.", + "Stale recovery is allowed only when the scheduler has no local active run and the owner heartbeat is expired.", + ], + }; +} + function normalizeOutput(value: unknown): LiveOutput[] { if (!Array.isArray(value)) return []; return value.flatMap((item): LiveOutput[] => { @@ -876,6 +980,7 @@ function normalizeTask(value: unknown): QueueTask { currentMode: record.currentMode === "initial" || record.currentMode === "retry" ? record.currentMode : null, codexThreadId: typeof record.codexThreadId === "string" ? record.codexThreadId : null, activeTurnId: typeof record.activeTurnId === "string" ? record.activeTurnId : null, + schedulerHeartbeat: asRecord(record.schedulerHeartbeat) as JsonRecord | null, finalResponse: typeof record.finalResponse === "string" ? record.finalResponse : "", lastError: typeof record.lastError === "string" ? record.lastError : null, lastJudge: toJsonValue(record.lastJudge ?? null), @@ -1153,6 +1258,7 @@ async function createTaskFromRequest(value: unknown): Promise { currentMode: null, codexThreadId: null, activeTurnId: null, + schedulerHeartbeat: null, finalResponse: "", stepCount: 0, llmStepCount: 0, @@ -1581,6 +1687,8 @@ async function queueSummary(tasks?: QueueTask[], queueRecords?: QueueRecord[]): databaseActiveTaskIds, databaseActiveTaskCount: databaseActiveTaskIds.length, executionStateSource: "postgres-control-plane", + executionDiagnostics: controlPlaneExecutionDiagnostics(taskRows), + schedulerHeartbeatStaleMs, orphanedActiveTaskIds: [], orphanedActiveTaskCount: 0, processing: false, @@ -1817,6 +1925,7 @@ function taskListResponse(task: QueueTask, lite = true): JsonRecord { currentMode: task.currentMode, judgeFailCount: task.judgeFailCount, judgeFailRetryLimit: 3, + schedulerHeartbeat: task.schedulerHeartbeat ?? null, codexThreadId: task.codexThreadId, activeTurnId: task.activeTurnId, lastError: task.lastError, @@ -2280,6 +2389,7 @@ function traceSummary(task: QueueTask): JsonRecord { stepCount: numberField(task.stepCount ?? task.llmStepCount, steps.length), retainedStepCount: steps.length, outputMaxSeq: outputMaxSeq(task), + schedulerHeartbeat: task.schedulerHeartbeat ?? null, statsSource: "code-queue-mgr-postgres", attempts: task.attempts.map((attempt) => ({ index: attempt.index, @@ -2766,6 +2876,7 @@ async function route(req: Request): Promise { try { if (url.pathname === "/" || url.pathname === "/health" || url.pathname === "/live") { const taskCount = schemaReady ? Number((await traceSql>`SELECT COUNT(*) AS count FROM unidesk_code_queue_tasks`)[0]?.count ?? 0) : null; + const queue = schemaReady ? await queueSummary() : null; return jsonResponse({ ok: schemaReady, service: "code-queue-mgr", @@ -2780,6 +2891,7 @@ async function route(req: Request): Promise { schemaReady, schemaLastError, taskCount, + queue, resourceBudget: { targetMemoryMb: 100, mgrPoolMax: config.mgrDatabasePoolMax, diff --git a/src/components/microservices/code-queue/src/code-agent/codex.ts b/src/components/microservices/code-queue/src/code-agent/codex.ts index e83c9a2a..ecf1ae34 100644 --- a/src/components/microservices/code-queue/src/code-agent/codex.ts +++ b/src/components/microservices/code-queue/src/code-agent/codex.ts @@ -20,6 +20,7 @@ export interface CodexPortContext { queueIdOf: (task: QueueTask) => string; recordNumberField: (record: Record | null, keys: string[]) => number | null; recordStringField: (record: Record | null, keys: string[]) => string; + refreshSchedulerHeartbeat: (task: QueueTask, options?: { observedAgentEvent?: boolean; persistedTrace?: boolean }) => void; remoteAppServerCommand: (task: QueueTask) => string; windowsNativeAppServerCommand: (task: QueueTask) => string; resolveReasoningEffort: (model: string, explicit?: string | null) => string | null; @@ -322,16 +323,19 @@ function hasRecentCommandOutputDelta(task: QueueTask, itemId: string | undefined function handleNotification(task: QueueTask, message: Record, terminal: (status: TerminalStatus, error: string | null) => void): void { const method = typeof message.method === "string" ? message.method : "unknown"; const params = extractRecord(message.params); + ctx().refreshSchedulerHeartbeat(task, { observedAgentEvent: true }); ctx().addEvent(task, eventSummary(message)); if (method === "thread/started") { const threadId = extractString(extractRecord(params?.thread), "id"); if (threadId !== null) task.codexThreadId = threadId; + ctx().refreshSchedulerHeartbeat(task, { observedAgentEvent: true }); ctx().appendOutput(task, "system", `thread started ${threadId ?? "unknown"}\n`, method); return; } if (method === "turn/started") { const turnId = extractString(extractRecord(params?.turn), "id"); task.activeTurnId = turnId; + ctx().refreshSchedulerHeartbeat(task, { observedAgentEvent: true }); ctx().appendOutput(task, "system", `turn started ${turnId ?? "unknown"}\n`, method); return; } @@ -383,6 +387,7 @@ function handleNotification(task: QueueTask, message: Record, t const status = terminalStatus(String(turn?.status ?? "failed")); const error = extractRecord(turn?.error); task.activeTurnId = null; + ctx().refreshSchedulerHeartbeat(task, { observedAgentEvent: true }); ctx().appendOutput(task, status === "completed" ? "system" : "error", `turn completed status=${status ?? "unknown"}\n`, method); terminal(status, typeof error?.message === "string" ? error.message : null); } @@ -416,11 +421,17 @@ export async function runCodexTurn(task: QueueTask, prompt: string): Promise { if (terminalSeen) return; diff --git a/src/components/microservices/code-queue/src/code-agent/common.ts b/src/components/microservices/code-queue/src/code-agent/common.ts index 01677b88..1e3001bf 100644 --- a/src/components/microservices/code-queue/src/code-agent/common.ts +++ b/src/components/microservices/code-queue/src/code-agent/common.ts @@ -17,6 +17,10 @@ export interface ActiveRun { port: CodeAgentPortKind; threadId: string | null; turnId: string | null; + startedAt: string; + lastLocalHeartbeatAt: string; + lastObservedAgentEventAt: string | null; + lastPersistedTraceAt: string | null; } export interface ActiveRunSlotWaiter { diff --git a/src/components/microservices/code-queue/src/code-agent/opencode.ts b/src/components/microservices/code-queue/src/code-agent/opencode.ts index a4c810ad..de68bbb8 100644 --- a/src/components/microservices/code-queue/src/code-agent/opencode.ts +++ b/src/components/microservices/code-queue/src/code-agent/opencode.ts @@ -25,6 +25,7 @@ export interface OpenCodePortContext { providerIsMain: (providerId: string) => boolean; queueIdOf: (task: QueueTask) => string; recordStringField: (record: Record | null, keys: string[], max?: number) => string; + refreshSchedulerHeartbeat: (task: QueueTask, options?: { observedAgentEvent?: boolean; persistedTrace?: boolean }) => void; remoteHostWorkdirForTask: (task: QueueTask) => string; safePreview: (value: string, max?: number) => string; shellQuote: (value: string) => string; @@ -238,6 +239,7 @@ class OpenCodeRunClient implements CodeAgentClient { const trimmed = String(line).trim(); if (trimmed.length === 0) continue; this.lastActivityAt = Date.now(); + ctx().refreshSchedulerHeartbeat(this.task, { observedAgentEvent: true }); this.handleLine(trimmed); } } catch (error) { @@ -295,7 +297,12 @@ class OpenCodeRunClient implements CodeAgentClient { this.sessionId = sessionId; if (this.task.codexThreadId === null) this.task.codexThreadId = sessionId; const run = ctx().activeRuns.get(ctx().queueIdOf(this.task)); - if (run?.app === this) run.threadId = sessionId; + if (run?.app === this) { + run.threadId = sessionId; + run.lastLocalHeartbeatAt = ctx().nowIso(); + run.lastObservedAgentEventAt = run.lastLocalHeartbeatAt; + } + ctx().refreshSchedulerHeartbeat(this.task, { observedAgentEvent: true }); if (this.sessionAnnounced) return; const resumed = this.task.currentMode === "retry" || this.task.attempts.length > 0; ctx().appendOutput(this.task, "system", `opencode session ${resumed ? "resumed" : "started"} ${sessionId}\n`, resumed ? "opencode/session-resume" : "opencode/session-start"); @@ -363,8 +370,10 @@ async function runOpenCodeTurnOnce(task: QueueTask, prompt: string): Promise { const idleMs = Date.now() - app.lastActivityAt; diff --git a/src/components/microservices/code-queue/src/execution-diagnostics.ts b/src/components/microservices/code-queue/src/execution-diagnostics.ts new file mode 100644 index 00000000..76cc7ada --- /dev/null +++ b/src/components/microservices/code-queue/src/execution-diagnostics.ts @@ -0,0 +1,221 @@ +// 重构前 index.ts 只读参考:commit 6a04144d3f5103014f75b637d7e6bc2f45bf007f,blob 56e590c1a6b5ca7ad128bf2c992f60e46c355a58;可用 `git show 6a04144d3f5103014f75b637d7e6bc2f45bf007f:src/components/microservices/code-queue/src/index.ts` 查看。 + +import type { ActiveRun } from "./code-agent/common"; +import type { CodeQueueExecutionDiagnostics, JsonValue, QueueTask, SchedulerActiveRunHeartbeat } from "./types"; + +export const schedulerHeartbeatStaleMs = 5 * 60 * 1000; + +interface ExecutionDiagnosticsInput { + now?: string; + heartbeatStaleMs?: number; + controlPlane: string; + executionStateSource: "scheduler-execution-plane" | "postgres-control-plane"; + tasks: QueueTask[]; + activeRuns?: Iterable; + activeRunSlotCount?: number; + activeQueueIds?: string[]; + processingQueueIds?: string[]; + orphanedActiveTaskIds?: string[]; + oaPublisher?: JsonValue | null; +} + +interface StaleRecoveryInput { + now?: string; + heartbeatStaleMs?: number; + task: QueueTask; + localActive: boolean; +} + +function timestampMs(value: string | null | undefined): number | null { + if (typeof value !== "string" || value.length === 0) return null; + const time = Date.parse(value); + return Number.isFinite(time) ? time : null; +} + +function maxTimestamp(values: Array): string | null { + let best: string | null = null; + let bestMs = -Infinity; + for (const value of values) { + const ms = timestampMs(value); + if (ms === null || ms < bestMs) continue; + best = value ?? null; + bestMs = ms; + } + return best; +} + +function activeTask(task: QueueTask): boolean { + return task.status === "running" || task.status === "judging"; +} + +function taskHeartbeat(task: QueueTask): SchedulerActiveRunHeartbeat | null { + const heartbeat = task.schedulerHeartbeat; + if (heartbeat === undefined || heartbeat === null || heartbeat.taskId !== task.id) return null; + return heartbeat; +} + +function heartbeatFresh(heartbeat: SchedulerActiveRunHeartbeat | null, nowMs: number, staleMs: number): boolean { + const atMs = timestampMs(heartbeat?.lastLocalHeartbeatAt); + return atMs !== null && nowMs - atMs < staleMs; +} + +function outputMaxSeq(task: QueueTask): number { + const values = [ + Number(task.outputMaxSeq ?? 0), + ...((Array.isArray(task.output) ? task.output : []).map((item) => Number(item.seq ?? 0))), + ...((Array.isArray(task.attempts) ? task.attempts : []).map((item) => Number(item.outputEndSeq ?? 0))), + ].filter((value) => Number.isFinite(value) && value >= 0); + return values.length > 0 ? Math.max(...values.map((value) => Math.floor(value))) : 0; +} + +export function staleRecoveryCandidate(input: StaleRecoveryInput): { allowed: boolean; reason: string; heartbeatFresh: boolean; heartbeatAgeMs: number | null } { + if (!activeTask(input.task)) return { allowed: false, reason: "not-db-active", heartbeatFresh: false, heartbeatAgeMs: null }; + if (input.localActive) return { allowed: false, reason: "scheduler-local-active", heartbeatFresh: true, heartbeatAgeMs: 0 }; + const nowMs = timestampMs(input.now) ?? Date.now(); + const heartbeat = taskHeartbeat(input.task); + if (heartbeat === null) return { allowed: false, reason: "owner-heartbeat-missing", heartbeatFresh: false, heartbeatAgeMs: null }; + const heartbeatMs = timestampMs(heartbeat.lastLocalHeartbeatAt); + if (heartbeatMs === null) return { allowed: false, reason: "owner-heartbeat-missing", heartbeatFresh: false, heartbeatAgeMs: null }; + const heartbeatAgeMs = Math.max(0, nowMs - heartbeatMs); + const fresh = heartbeatAgeMs < (input.heartbeatStaleMs ?? schedulerHeartbeatStaleMs); + if (fresh) return { allowed: false, reason: "owner-heartbeat-fresh", heartbeatFresh: true, heartbeatAgeMs }; + return { allowed: true, reason: "owner-heartbeat-expired", heartbeatFresh: false, heartbeatAgeMs }; +} + +export function taskHasTraceGapButFreshHeartbeat(task: QueueTask, now: string, heartbeatStaleMs = schedulerHeartbeatStaleMs): boolean { + if (!activeTask(task)) return false; + const heartbeat = taskHeartbeat(task); + if (!heartbeatFresh(heartbeat, timestampMs(now) ?? Date.now(), heartbeatStaleMs)) return false; + const traceAt = timestampMs(heartbeat?.lastPersistedTraceAt); + if (traceAt === null) return false; + return (timestampMs(now) ?? Date.now()) - traceAt >= heartbeatStaleMs; +} + +export function buildSchedulerHeartbeat(task: QueueTask, run: ActiveRun | null, options: { + now: string; + owner: string; + schedulerInstance: string; + agentPort: string; + lastObservedAgentEventAt?: string | null; + lastPersistedTraceAt?: string | null; +}): SchedulerActiveRunHeartbeat { + const previous = taskHeartbeat(task); + return { + taskId: task.id, + queueId: task.queueId, + attempt: Number.isInteger(task.currentAttempt) && task.currentAttempt > 0 ? task.currentAttempt : 0, + activeTurnId: run?.turnId ?? task.activeTurnId ?? null, + codexThreadId: run?.threadId ?? task.codexThreadId ?? null, + owner: options.owner, + schedulerInstance: options.schedulerInstance, + executionPlane: "scheduler-execution-plane", + agentPort: run?.port ?? options.agentPort, + status: task.status, + lastLocalHeartbeatAt: options.now, + lastObservedAgentEventAt: options.lastObservedAgentEventAt ?? previous?.lastObservedAgentEventAt ?? null, + lastPersistedTraceAt: options.lastPersistedTraceAt ?? previous?.lastPersistedTraceAt ?? null, + outputMaxSeq: outputMaxSeq(task), + source: "scheduler", + }; +} + +export function buildExecutionDiagnostics(input: ExecutionDiagnosticsInput): CodeQueueExecutionDiagnostics { + const now = input.now ?? new Date().toISOString(); + const nowMs = timestampMs(now) ?? Date.now(); + const staleMs = input.heartbeatStaleMs ?? schedulerHeartbeatStaleMs; + const databaseActiveTaskIds = input.tasks.filter(activeTask).map((task) => task.id).sort(); + const activeRunRows = Array.from(input.activeRuns ?? []); + const schedulerActiveTaskIds = Array.from(new Set(activeRunRows.map((run) => run.taskId).filter(Boolean))).sort(); + const schedulerActiveTaskSet = new Set(schedulerActiveTaskIds); + const activeHeartbeats = input.tasks + .map((task) => taskHeartbeat(task)) + .filter((heartbeat): heartbeat is SchedulerActiveRunHeartbeat => heartbeat !== null); + const heartbeatFreshTaskIds = activeHeartbeats + .filter((heartbeat) => heartbeatFresh(heartbeat, nowMs, staleMs)) + .map((heartbeat) => heartbeat.taskId) + .sort(); + const heartbeatFreshSet = new Set(heartbeatFreshTaskIds); + const heartbeatExpiredTaskIds = activeHeartbeats + .filter((heartbeat) => { + const heartbeatTask = input.tasks.find((task) => task.id === heartbeat.taskId); + return heartbeatTask !== undefined && activeTask(heartbeatTask) && !heartbeatFreshSet.has(heartbeat.taskId); + }) + .map((heartbeat) => heartbeat.taskId) + .sort(); + const activeHeartbeatTaskIds = activeHeartbeats.map((heartbeat) => heartbeat.taskId).sort(); + const activeHeartbeatSet = new Set(activeHeartbeatTaskIds); + const heartbeatMissingTaskIds = databaseActiveTaskIds.filter((taskId) => !activeHeartbeatSet.has(taskId)); + const staleRecoveryCandidateTaskIds = input.tasks + .filter((task) => staleRecoveryCandidate({ task, now, heartbeatStaleMs: staleMs, localActive: schedulerActiveTaskSet.has(task.id) }).allowed) + .map((task) => task.id) + .sort(); + const traceGapTaskIds = activeHeartbeats + .filter((heartbeat) => { + const traceAt = timestampMs(heartbeat.lastPersistedTraceAt); + return traceAt !== null && nowMs - traceAt >= staleMs; + }) + .map((heartbeat) => heartbeat.taskId) + .sort(); + const traceGapNotStaleTaskIds = traceGapTaskIds.filter((taskId) => heartbeatFreshSet.has(taskId)); + const schedulerLocalMissingIds = databaseActiveTaskIds.filter((taskId) => !schedulerActiveTaskSet.has(taskId)); + const splitBrain = input.executionStateSource === "postgres-control-plane" + && databaseActiveTaskIds.length > 0 + && schedulerActiveTaskIds.length === 0 + && heartbeatFreshTaskIds.length > 0; + const staleActive = staleRecoveryCandidateTaskIds.length > 0; + const degraded = splitBrain + || staleActive + || heartbeatMissingTaskIds.length > 0 + || heartbeatExpiredTaskIds.length > 0 + || schedulerLocalMissingIds.length > 0 + || traceGapTaskIds.length > 0 + || (typeof input.oaPublisher === "object" && input.oaPublisher !== null && !Array.isArray(input.oaPublisher) && (Number((input.oaPublisher as Record).pending ?? 0) > 0 || (input.oaPublisher as Record).lastError !== null)); + const state = splitBrain ? "split-brain" : staleActive ? "stale-active" : degraded ? "degraded" : "healthy"; + const lastSchedulerHeartbeatAt = maxTimestamp(activeHeartbeats.map((heartbeat) => heartbeat.lastLocalHeartbeatAt)); + const lastObservedAgentEventAt = maxTimestamp(activeHeartbeats.map((heartbeat) => heartbeat.lastObservedAgentEventAt)); + const lastPersistedTraceAt = maxTimestamp(activeHeartbeats.map((heartbeat) => heartbeat.lastPersistedTraceAt)); + const reasons: string[] = []; + if (splitBrain) reasons.push("postgres control-plane has database-active tasks while its local active slots are empty, but scheduler heartbeat is fresh"); + if (staleActive) reasons.push("owner heartbeat is expired and scheduler has no local active run for at least one database-active task"); + if (heartbeatMissingTaskIds.length > 0) reasons.push("database-active tasks are missing scheduler-owned heartbeat"); + if (traceGapNotStaleTaskIds.length > 0) reasons.push("trace progress is stale while scheduler heartbeat is fresh; this is a trace gap, not stale active"); + const guidance = [ + "Do not use master activeRunSlotCount=0 alone to declare a task stuck.", + "Check PostgreSQL task status, scheduler active runs/slots, scheduler heartbeat, and trace/OA publisher progress together.", + "Trace gap with a fresh scheduler heartbeat must not trigger stale retry.", + "Stale recovery is allowed only when the scheduler has no local active run and the owner heartbeat is expired.", + ]; + return { + state, + health: state, + degraded, + splitBrain, + executionStateSource: input.executionStateSource, + controlPlane: input.controlPlane, + databaseActiveTaskIds, + databaseActiveTaskCount: databaseActiveTaskIds.length, + schedulerActiveTaskIds, + schedulerActiveTaskCount: schedulerActiveTaskIds.length, + schedulerActiveRunSlotCount: input.activeRunSlotCount ?? schedulerActiveTaskIds.length, + schedulerActiveQueueIds: (input.activeQueueIds ?? activeRunRows.map((run) => run.queueId)).slice().sort(), + schedulerProcessingQueueIds: (input.processingQueueIds ?? []).slice().sort(), + schedulerOrphanedActiveTaskIds: (input.orphanedActiveTaskIds ?? schedulerLocalMissingIds).slice().sort(), + schedulerOrphanedActiveTaskCount: (input.orphanedActiveTaskIds ?? schedulerLocalMissingIds).length, + activeHeartbeatTaskIds, + activeHeartbeatCount: activeHeartbeatTaskIds.length, + heartbeatFreshTaskIds, + heartbeatExpiredTaskIds, + heartbeatMissingTaskIds, + staleRecoveryCandidateTaskIds, + traceGapTaskIds, + traceGapNotStaleTaskIds, + schedulerHeartbeatStaleMs: staleMs, + now, + lastSchedulerHeartbeatAt, + lastObservedAgentEventAt, + lastPersistedTraceAt, + oaPublisher: input.oaPublisher ?? null, + reasons, + guidance, + }; +} diff --git a/src/components/microservices/code-queue/src/index.ts b/src/components/microservices/code-queue/src/index.ts index dc300f24..414b6cf0 100644 --- a/src/components/microservices/code-queue/src/index.ts +++ b/src/components/microservices/code-queue/src/index.ts @@ -25,6 +25,7 @@ import type { RunMode, RuntimeConfig, CodeQueueServiceRole, + SchedulerActiveRunHeartbeat, TaskStatus, WorkdirRecord, } from "./types"; @@ -114,6 +115,7 @@ import { taskForListResponse, transcriptChunkResponse, } from "./queue-api"; +import { buildExecutionDiagnostics, buildSchedulerHeartbeat, schedulerHeartbeatStaleMs, staleRecoveryCandidate } from "./execution-diagnostics"; import { ReferenceTaskLookupError, configureReferences, injectReferencedTaskContext, taskReferenceIds } from "./references"; import { applyOaTraceStatsToTaskJson, @@ -207,6 +209,14 @@ let shutdownRequested = false; let serviceReady = false; let sql: SqlClient = createSqlClient(); let sqlRotationInFlight: Promise | null = null; +const schedulerReconcileAudit = { + lastRunAt: null as string | null, + lastReason: null as string | null, + lastLoaded: 0, + lastChanged: 0, + lastRecovered: 0, + lastError: null as string | null, +}; function createSqlClient(): SqlClient { return postgres(config.databaseUrl, { @@ -966,6 +976,30 @@ function taskCanHaveActiveTurn(status: TaskStatus): boolean { return status === "running" || status === "judging"; } +function normalizeSchedulerHeartbeat(task: QueueTask): SchedulerActiveRunHeartbeat | null { + const heartbeat = task.schedulerHeartbeat; + if (heartbeat === null || heartbeat === undefined || typeof heartbeat !== "object") return null; + if (heartbeat.taskId !== task.id) return null; + if (typeof heartbeat.lastLocalHeartbeatAt !== "string" || timestampMs(heartbeat.lastLocalHeartbeatAt) === null) return null; + return { + taskId: task.id, + queueId: safeQueueId(heartbeat.queueId), + attempt: Number.isInteger(heartbeat.attempt) && heartbeat.attempt >= 0 ? heartbeat.attempt : Math.max(0, Number(task.currentAttempt || 0)), + activeTurnId: typeof heartbeat.activeTurnId === "string" && heartbeat.activeTurnId.length > 0 ? heartbeat.activeTurnId : null, + codexThreadId: typeof heartbeat.codexThreadId === "string" && heartbeat.codexThreadId.length > 0 ? heartbeat.codexThreadId : null, + owner: typeof heartbeat.owner === "string" && heartbeat.owner.length > 0 ? heartbeat.owner : config.instanceId, + schedulerInstance: typeof heartbeat.schedulerInstance === "string" && heartbeat.schedulerInstance.length > 0 ? heartbeat.schedulerInstance : config.instanceId, + executionPlane: "scheduler-execution-plane", + agentPort: typeof heartbeat.agentPort === "string" && heartbeat.agentPort.length > 0 ? heartbeat.agentPort : codeAgentPortForModel(task.model), + status: taskCanHaveActiveTurn(heartbeat.status) ? heartbeat.status : task.status, + lastLocalHeartbeatAt: heartbeat.lastLocalHeartbeatAt, + lastObservedAgentEventAt: typeof heartbeat.lastObservedAgentEventAt === "string" && timestampMs(heartbeat.lastObservedAgentEventAt) !== null ? heartbeat.lastObservedAgentEventAt : null, + lastPersistedTraceAt: typeof heartbeat.lastPersistedTraceAt === "string" && timestampMs(heartbeat.lastPersistedTraceAt) !== null ? heartbeat.lastPersistedTraceAt : null, + outputMaxSeq: Number.isInteger(heartbeat.outputMaxSeq) && heartbeat.outputMaxSeq >= 0 ? heartbeat.outputMaxSeq : taskViewOutputMaxSeq(task), + source: "scheduler", + }; +} + function normalizeTask(task: QueueTask): QueueTask { task.queueId = safeQueueId(task.queueId); task.queueEnteredAt = taskQueueEnteredAt(task); @@ -996,6 +1030,8 @@ function normalizeTask(task: QueueTask): QueueTask { task.stepCount = stepCount; task.llmStepCount = stepCount; task.outputMaxSeq = Math.max(normalizeTaskMetric(task.outputMaxSeq) ?? 0, taskViewOutputMaxSeq(task)); + task.schedulerHeartbeat = normalizeSchedulerHeartbeat(task); + if (!taskCanHaveActiveTurn(task.status)) task.schedulerHeartbeat = null; pruneTaskHotState(task); return task; } @@ -1044,6 +1080,29 @@ function publishQueueEvent(reason: string, queueId = ""): void { publishCodeQueueQueueUpdated(queueId, reason); } +function refreshSchedulerHeartbeat(task: QueueTask, options: { observedAgentEvent?: boolean; persistedTrace?: boolean } = {}): void { + if (!taskCanHaveActiveTurn(task.status)) { + task.schedulerHeartbeat = null; + return; + } + const run = activeRunForTask(task); + const now = nowIso(); + if (run !== null) { + run.lastLocalHeartbeatAt = now; + if (options.observedAgentEvent === true) run.lastObservedAgentEventAt = now; + if (options.persistedTrace === true) run.lastPersistedTraceAt = now; + } + const previous = normalizeSchedulerHeartbeat(task); + task.schedulerHeartbeat = buildSchedulerHeartbeat(task, run, { + now, + owner: config.instanceId, + schedulerInstance: config.instanceId, + agentPort: codeAgentPortForModel(task.model), + lastObservedAgentEventAt: options.observedAgentEvent === true ? now : run?.lastObservedAgentEventAt ?? previous?.lastObservedAgentEventAt ?? null, + lastPersistedTraceAt: options.persistedTrace === true ? now : run?.lastPersistedTraceAt ?? previous?.lastPersistedTraceAt ?? null, + }); +} + function isOpenCodeStepBoundaryMethod(method: string | undefined): boolean { return method === "opencode/step-start" || method === "opencode/step-finish"; } @@ -1228,6 +1287,8 @@ function markTaskDirty(taskId: string): void { } function persistTaskState(task: QueueTask): void { + if (taskCanHaveActiveTurn(task.status)) refreshSchedulerHeartbeat(task); + else task.schedulerHeartbeat = null; markTaskDirty(task.id); persistState(false); publishTaskOaEvent(task, "persist"); @@ -2567,6 +2628,7 @@ function appendPromptHistory(task: QueueTask, output: LiveOutput | null, method: function addEvent(task: QueueTask, event: CodexEventSummary): void { task.events.push(event); if (config.maxInMemoryEventRecords > 0 && task.events.length > config.maxInMemoryEventRecords) task.events.splice(0, task.events.length - config.maxInMemoryEventRecords); + refreshSchedulerHeartbeat(task, { observedAgentEvent: true }); markTaskDirty(task.id); publishTaskOaEvent(task, "agent-event", { onlyStepChange: true }); } @@ -2757,6 +2819,7 @@ configureTaskOutput({ markTaskDirty, nowIso, onOutputAppended: (task, output, op) => { + refreshSchedulerHeartbeat(task, { persistedTrace: true }); const archiveOp = op === "append" ? "append" : "set"; const stepChanged = recordTaskOutputMetrics(task, output, archiveOp); const projectionOutput = traceStepOutputForProjection(task, output); @@ -2832,12 +2895,14 @@ configureQueueApi({ parseTextLimit, processing: () => processing, processingQueues, + oaEventPublisherStatus: () => oaEventPublisherStatus() as JsonValue, queueHeadTask, queueIdOf, queues: () => state.queues, queueTaskIsRunnable, queuedStatusReason, queuedTaskPromptEditable, + reconcileStatus: schedulerReconcileStatus, runGarbageCollection, safeQueueId, safeQueueName, @@ -2908,6 +2973,7 @@ configureCodexPort({ queueIdOf, recordNumberField, recordStringField, + refreshSchedulerHeartbeat, remoteAppServerCommand, windowsNativeAppServerCommand, resolveReasoningEffort, @@ -2932,6 +2998,7 @@ configureOpenCodePort({ providerIsMain, queueIdOf, recordStringField, + refreshSchedulerHeartbeat, remoteHostWorkdirForTask, safePreview, shellQuote, @@ -3433,7 +3500,41 @@ function orphanedActiveTaskAgeMs(task: QueueTask): number { function taskIsRecoverableOrphanedActive(task: QueueTask, staleMs = orphanedActiveTaskRecoveryStaleMs): boolean { if (task.status !== "running" && task.status !== "judging") return false; if (activeTaskHasLocalRunSlotOrWaiter(task)) return false; - return orphanedActiveTaskAgeMs(task) >= staleMs; + const decision = staleRecoveryCandidate({ task, localActive: false, heartbeatStaleMs: staleMs, now: nowIso() }); + if (decision.allowed) return true; + if (decision.reason === "owner-heartbeat-missing") return orphanedActiveTaskAgeMs(task) >= staleMs; + return false; +} + +function schedulerReconcileStatus(tasks: QueueTask[] = state.tasks): JsonValue { + const orphanedActiveTasks = tasks + .filter((task) => (task.status === "running" || task.status === "judging") && !activeTaskHasLocalRunSlotOrWaiter(task)) + .sort((left, right) => left.id.localeCompare(right.id)); + const recoverableOrphanedActiveTasks = orphanedActiveTasks + .filter((task) => taskIsRecoverableOrphanedActive(task)) + .sort((left, right) => left.id.localeCompare(right.id)); + const retryWaitTaskIds = tasks + .filter((task) => task.status === "retry_wait") + .map((task) => task.id) + .sort(); + return { + ok: recoverableOrphanedActiveTasks.length === 0, + schedulerEnabled: config.schedulerEnabled, + pollIntervalMs: config.schedulerPollIntervalMs, + staleActiveRecoveryStaleMs: orphanedActiveTaskRecoveryStaleMs, + orphanedActiveTaskCount: orphanedActiveTasks.length, + orphanedActiveTaskIds: orphanedActiveTasks.map((task) => task.id), + recoverableOrphanedActiveTaskCount: recoverableOrphanedActiveTasks.length, + recoverableOrphanedActiveTaskIds: recoverableOrphanedActiveTasks.map((task) => task.id), + retryWaitTaskCount: retryWaitTaskIds.length, + retryWaitTaskIds, + lastRunAt: schedulerReconcileAudit.lastRunAt, + lastReason: schedulerReconcileAudit.lastReason, + lastLoaded: schedulerReconcileAudit.lastLoaded, + lastChanged: schedulerReconcileAudit.lastChanged, + lastRecovered: schedulerReconcileAudit.lastRecovered, + lastError: schedulerReconcileAudit.lastError, + }; } function clearInactiveActiveTurnIds(): number { @@ -3485,6 +3586,8 @@ function queueActiveTasksForRestartRetry(reason: string, method: string, options } async function recoverOrphanedActiveTasks(reason: string, method: string, options: { taskIds?: Set; staleMs?: number } = {}): Promise { + schedulerReconcileAudit.lastRunAt = nowIso(); + schedulerReconcileAudit.lastReason = reason; const staleMs = options.staleMs ?? orphanedActiveTaskRecoveryStaleMs; const staleOrphanTaskIds = new Set( state.tasks @@ -3492,8 +3595,13 @@ async function recoverOrphanedActiveTasks(reason: string, method: string, option .filter((task) => taskIsRecoverableOrphanedActive(task, staleMs)) .map((task) => task.id), ); - if (staleOrphanTaskIds.size === 0) return 0; + if (staleOrphanTaskIds.size === 0) { + schedulerReconcileAudit.lastRecovered = 0; + schedulerReconcileAudit.lastError = null; + return 0; + } const recovered = queueActiveTasksForRestartRetry(reason, method, { taskIds: staleOrphanTaskIds }); + schedulerReconcileAudit.lastRecovered = recovered; if (recovered === 0) return 0; logger("warn", "scheduler_orphaned_active_tasks_requeued", { reason, @@ -3502,6 +3610,7 @@ async function recoverOrphanedActiveTasks(reason: string, method: string, option dirtyTaskCount: dirtyDatabaseTaskIds.size, }); await flushDirtyTasksToDatabase(true); + schedulerReconcileAudit.lastError = null; logger("info", "scheduler_orphaned_active_tasks_flushed", { reason, recovered }); scheduleQueue(); return recovered; @@ -3567,6 +3676,7 @@ async function runTask(task: QueueTask): Promise { task.readAt = null; task.finishedAt = null; task.updatedAt = startedAt; + refreshSchedulerHeartbeat(task); let claimed = false; try { claimed = await claimTaskInDatabase(task, claimQueueId); @@ -4053,14 +4163,25 @@ function mergeSchedulerDatabaseTasks(tasks: QueueTask[]): number { async function refreshSchedulerTasksFromDatabase(reason: string): Promise { if (!databaseReady || !config.schedulerEnabled) return 0; - const tasks = await loadTasksFromDatabase("hot"); - const changed = mergeSchedulerDatabaseTasks(tasks); - const recovered = await recoverOrphanedActiveTasks("Scheduler recovered database active task without local run", `scheduler/${reason}-recovery`); - if (changed > 0) { - logger("info", "scheduler_database_hot_tasks_refreshed", { reason, changed, loaded: tasks.length }); - scheduleQueue(); + schedulerReconcileAudit.lastRunAt = nowIso(); + schedulerReconcileAudit.lastReason = reason; + try { + const tasks = await loadTasksFromDatabase("hot"); + const changed = mergeSchedulerDatabaseTasks(tasks); + const recovered = await recoverOrphanedActiveTasks("Scheduler recovered database active task without local run", `scheduler/${reason}-recovery`); + schedulerReconcileAudit.lastLoaded = tasks.length; + schedulerReconcileAudit.lastChanged = changed; + schedulerReconcileAudit.lastRecovered = recovered; + schedulerReconcileAudit.lastError = null; + if (changed > 0) { + logger("info", "scheduler_database_hot_tasks_refreshed", { reason, changed, loaded: tasks.length }); + scheduleQueue(); + } + return changed + recovered; + } catch (error) { + schedulerReconcileAudit.lastError = databaseErrorMessage(error); + throw error; } - return changed + recovered; } function startSchedulerDatabasePoller(): void { @@ -4081,6 +4202,23 @@ function activeRunForTask(task: QueueTask): ActiveRun | null { return Array.from(activeRuns.values()).find((run) => run.taskId === task.id) ?? null; } +function executionDiagnosticsForTasks(tasks: QueueTask[] = state.tasks): JsonValue { + return buildExecutionDiagnostics({ + controlPlane: "D601-code-queue-scheduler", + executionStateSource: "scheduler-execution-plane", + tasks, + activeRuns: activeRuns.values(), + activeRunSlotCount: activeRunSlotCount(), + activeQueueIds: activeRunSlotQueueIds(), + processingQueueIds: Array.from(processingQueues), + orphanedActiveTaskIds: tasks + .filter((task) => (task.status === "running" || task.status === "judging") && activeRunForTask(task) === null) + .map((task) => task.id), + oaPublisher: oaEventPublisherStatus() as JsonValue, + heartbeatStaleMs: schedulerHeartbeatStaleMs, + }) as unknown as JsonValue; +} + function installShutdownHandlers(): void { const stop = (signal: NodeJS.Signals): void => { if (shutdownRequested) process.exit(0); @@ -5053,6 +5191,7 @@ async function route(req: Request): Promise { schedulerEnabled: config.schedulerEnabled, schedulerPollIntervalMs: config.schedulerPollIntervalMs, queue: queueSummary(false, state.tasks), + executionDiagnostics: executionDiagnosticsForTasks(state.tasks), egressProxy: await providerGatewayEgressProxyStatus(), oaEventPublisher: oaEventPublisherStatus(), startedAt: serviceStartedAt, diff --git a/src/components/microservices/code-queue/src/queue-api.ts b/src/components/microservices/code-queue/src/queue-api.ts index 64d83743..c1aae1b9 100644 --- a/src/components/microservices/code-queue/src/queue-api.ts +++ b/src/components/microservices/code-queue/src/queue-api.ts @@ -6,13 +6,14 @@ import { claudeQqNotificationOutboxStats, notificationTargetConfigured, notifica import { executionModeOptions, executionProviderOptions } from "./provider-runtime"; import { taskFullOutput } from "./task-output"; import { applyOaTraceStatsToTaskJson, taskScopeId, type OaTraceStats } from "./oa-events"; +import { buildExecutionDiagnostics, schedulerHeartbeatStaleMs } from "./execution-diagnostics"; import { buildCompactTaskTranscript, buildTaskTranscript, cachedPreviewTranscript, fullTranscript, prefixPreview, safePreview, statsDaysFromUrl, taskForCompactMetaResponse, taskForMetaResponse, taskStatisticsSummary, taskTiming, timestampMs } from "./task-view"; import { userPromptForDisplay } from "./prompts"; import type { ActiveRun, ActiveRunSlotWaiter } from "./code-agent/common"; import type { JsonValue, QueueRecord, QueuedStatusReason, QueueTask, RuntimeConfig, TaskStatus, TranscriptLine } from "./types"; export interface QueueApiContext { - config: Pick; + config: Pick; activeRunSlotQueueIds: () => string[]; activeRunSlotWaiterSummaries: () => JsonValue[]; activeRuns: Map; @@ -34,12 +35,14 @@ export interface QueueApiContext { parseTextLimit: (url: URL) => number; processing: () => boolean; processingQueues: Set; + oaEventPublisherStatus: () => JsonValue; queueHeadTask: (queueId: string, tasks?: QueueTask[]) => QueueTask | null; queueIdOf: (task: QueueTask) => string; queues: () => QueueRecord[]; queueTaskIsRunnable: (task: QueueTask) => boolean; queuedStatusReason: (task: QueueTask, tasks?: QueueTask[]) => QueuedStatusReason | null; queuedTaskPromptEditable: (task: QueueTask) => boolean; + reconcileStatus: (tasks: QueueTask[]) => JsonValue; runGarbageCollection: () => void; safeQueueId: (value: unknown) => string; safeQueueName: (value: unknown, queueId: string) => string; @@ -258,6 +261,7 @@ function taskForListResponse(task: QueueTask, lite = false, queueTasks?: QueueTa currentMode: task.currentMode, judgeFailCount: task.judgeFailCount, judgeFailRetryLimit: ctx().judgeFailRetryLimit, + schedulerHeartbeat: task.schedulerHeartbeat ?? null, codexThreadId: task.codexThreadId, activeTurnId: task.activeTurnId, lastError: task.lastError, @@ -314,6 +318,7 @@ function taskForListResponse(task: QueueTask, lite = false, queueTasks?: QueueTa currentMode: task.currentMode, judgeFailCount: task.judgeFailCount, judgeFailRetryLimit: ctx().judgeFailRetryLimit, + schedulerHeartbeat: task.schedulerHeartbeat ?? null, codexThreadId: task.codexThreadId, activeTurnId: task.activeTurnId, lastError: task.lastError, @@ -410,6 +415,18 @@ function queueSummary(includeDevReady = true, tasks: QueueTask[] = ctx().tasks() const activeTaskIds = Array.from(activeTaskIdSet).sort(); const activeTaskId = activeTaskIds[0] ?? null; const queues = perQueueSummaries(tasks, queueRecords); + const executionDiagnostics = buildExecutionDiagnostics({ + controlPlane: "D601-code-queue-scheduler", + executionStateSource: "scheduler-execution-plane", + tasks, + activeRuns: ctx().activeRuns.values(), + activeRunSlotCount: activeRunSlots.length, + activeQueueIds: activeRunSlots, + processingQueueIds: Array.from(ctx().processingQueues), + orphanedActiveTaskIds, + oaPublisher: ctx().oaEventPublisherStatus(), + heartbeatStaleMs: schedulerHeartbeatStaleMs, + }); const summary: Record = { total: tasks.length, defaultQueueId: ctx().defaultQueueId, @@ -424,6 +441,8 @@ function queueSummary(includeDevReady = true, tasks: QueueTask[] = ctx().tasks() orphanedActiveTaskIds, orphanedActiveTaskCount: orphanedActiveTaskIds.length, processing: ctx().processing(), + executionDiagnostics: executionDiagnostics as unknown as JsonValue, + schedulerHeartbeatStaleMs, counts, unreadTerminal, judgeConfigured: ctx().config.minimaxApiKey.length > 0, @@ -489,6 +508,7 @@ function queueSummary(includeDevReady = true, tasks: QueueTask[] = ctx().tasks() lastError: ctx().codexSqliteLogExporter().lastError, }, }, + reconcile: ctx().reconcileStatus(tasks), }; if (includeDevReady) summary.devReady = ctx().collectDevReady(); return summary; diff --git a/src/components/microservices/code-queue/src/task-view.ts b/src/components/microservices/code-queue/src/task-view.ts index acb6ed50..1241e7aa 100644 --- a/src/components/microservices/code-queue/src/task-view.ts +++ b/src/components/microservices/code-queue/src/task-view.ts @@ -1463,6 +1463,7 @@ function taskForMetaResponse(task: QueueTask): JsonValue { judgeFailRetryLimit, codexThreadId: task.codexThreadId, activeTurnId: task.activeTurnId, + schedulerHeartbeat: task.schedulerHeartbeat ?? null, finalResponse: task.finalResponse, lastError: task.lastError, lastJudge: task.lastJudge, @@ -1533,6 +1534,7 @@ function taskForCompactMetaResponse(task: QueueTask): JsonValue { judgeFailRetryLimit, codexThreadId: task.codexThreadId, activeTurnId: task.activeTurnId, + schedulerHeartbeat: task.schedulerHeartbeat ?? null, finalResponse: prefixPreview(task.finalResponse, 1200), lastError: task.lastError, lastJudge: task.lastJudge, @@ -2312,6 +2314,7 @@ function taskTraceSummaryResponse(task: QueueTask, oaTraceStats: JsonValue | nul updatedAt: task.updatedAt, currentAttempt: task.currentAttempt, maxAttempts: task.maxAttempts, + schedulerHeartbeat: task.schedulerHeartbeat ?? null, stepCount, llmStepCount, promptEditable: ctx().queuedTaskPromptEditable(task), diff --git a/src/components/microservices/code-queue/src/types.ts b/src/components/microservices/code-queue/src/types.ts index b704e299..644bf163 100644 --- a/src/components/microservices/code-queue/src/types.ts +++ b/src/components/microservices/code-queue/src/types.ts @@ -30,6 +30,69 @@ export type TerminalStatus = "completed" | "interrupted" | "failed" | null; export type TranscriptKind = "ran" | "explored" | "edited" | "plan" | "message" | "system" | "error"; +export type CodeQueueExecutionPlane = "scheduler-execution-plane" | "postgres-control-plane"; + +export type CodeQueueExecutionHealth = "healthy" | "degraded" | "split-brain" | "stale-active"; + +export type CodeQueueRecoveryCandidateReason = + | "scheduler-local-active" + | "owner-heartbeat-fresh" + | "owner-heartbeat-expired" + | "owner-heartbeat-missing" + | "not-db-active"; + +export interface SchedulerActiveRunHeartbeat { + taskId: string; + queueId: string; + attempt: number; + activeTurnId: string | null; + codexThreadId: string | null; + owner: string; + schedulerInstance: string; + executionPlane: "scheduler-execution-plane"; + agentPort: string; + status: TaskStatus; + lastLocalHeartbeatAt: string; + lastObservedAgentEventAt: string | null; + lastPersistedTraceAt: string | null; + outputMaxSeq: number; + source: "scheduler"; +} + +export interface CodeQueueExecutionDiagnostics { + state: CodeQueueExecutionHealth; + health: CodeQueueExecutionHealth; + degraded: boolean; + splitBrain: boolean; + executionStateSource: CodeQueueExecutionPlane; + controlPlane: string; + databaseActiveTaskIds: string[]; + databaseActiveTaskCount: number; + schedulerActiveTaskIds: string[]; + schedulerActiveTaskCount: number; + schedulerActiveRunSlotCount: number; + schedulerActiveQueueIds: string[]; + schedulerProcessingQueueIds: string[]; + schedulerOrphanedActiveTaskIds: string[]; + schedulerOrphanedActiveTaskCount: number; + activeHeartbeatTaskIds: string[]; + activeHeartbeatCount: number; + heartbeatFreshTaskIds: string[]; + heartbeatExpiredTaskIds: string[]; + heartbeatMissingTaskIds: string[]; + staleRecoveryCandidateTaskIds: string[]; + traceGapTaskIds: string[]; + traceGapNotStaleTaskIds: string[]; + schedulerHeartbeatStaleMs: number; + now: string; + lastSchedulerHeartbeatAt: string | null; + lastObservedAgentEventAt: string | null; + lastPersistedTraceAt: string | null; + oaPublisher: JsonValue | null; + reasons: string[]; + guidance: string[]; +} + export interface RuntimeConfig { host: string; port: number; @@ -330,6 +393,7 @@ export interface QueueTask { currentMode: RunMode | null; codexThreadId: string | null; activeTurnId: string | null; + schedulerHeartbeat?: SchedulerActiveRunHeartbeat | null; finalResponse: string; stepCount?: number; llmStepCount?: number; diff --git a/src/components/microservices/k3sctl-adapter/k3s/code-queue.k3s.json b/src/components/microservices/k3sctl-adapter/k3s/code-queue.k3s.json index 1dd94aff..486c2e49 100644 --- a/src/components/microservices/k3sctl-adapter/k3s/code-queue.k3s.json +++ b/src/components/microservices/k3sctl-adapter/k3s/code-queue.k3s.json @@ -16,7 +16,8 @@ "route": { "kind": "kubernetes-service", "serviceName": "code-queue-scheduler", - "servicePort": 4222 + "servicePort": 4222, + "deploymentName": "code-queue" }, "activeInstanceId": "D601", "singleWriter": true, @@ -127,7 +128,8 @@ "route": { "kind": "kubernetes-service", "serviceName": "code-queue-scheduler", - "servicePort": 4222 + "servicePort": 4222, + "deploymentName": "code-queue" }, "activeInstanceId": "D601-scheduler", "singleWriter": true, @@ -146,5 +148,81 @@ ], "requireAllInstancesHealthy": false } + }, + { + "apiVersion": "unidesk.ai/k3s/v1", + "kind": "ManagedKubernetesService", + "metadata": { + "name": "d601-provider-egress-proxy", + "namespace": "unidesk" + }, + "spec": { + "adapterServiceId": "k3sctl-adapter", + "controlPlane": { + "type": "kubernetes", + "cluster": "unidesk-k3s", + "context": "unidesk-k3s" + }, + "route": { + "kind": "kubernetes-service", + "serviceName": "d601-provider-egress-proxy", + "servicePort": 18789, + "deploymentName": "d601-provider-egress-proxy" + }, + "activeInstanceId": "D601-provider-egress", + "singleWriter": false, + "expectedNodeIds": [ + "D601" + ], + "instances": [ + { + "id": "D601-provider-egress", + "nodeId": "D601", + "role": "primary", + "baseUrl": "kubernetes://unidesk/services/d601-provider-egress-proxy:18789", + "healthPath": "/__unidesk/egress-proxy/health", + "healthMode": "pod-ready" + } + ], + "requireAllInstancesHealthy": false + } + }, + { + "apiVersion": "unidesk.ai/k3s/v1", + "kind": "ManagedKubernetesService", + "metadata": { + "name": "d601-tcp-egress-gateway", + "namespace": "unidesk" + }, + "spec": { + "adapterServiceId": "k3sctl-adapter", + "controlPlane": { + "type": "kubernetes", + "cluster": "unidesk-k3s", + "context": "unidesk-k3s" + }, + "route": { + "kind": "kubernetes-service", + "serviceName": "d601-tcp-egress-gateway", + "servicePort": 18080, + "deploymentName": "d601-tcp-egress-gateway" + }, + "activeInstanceId": "D601-tcp-egress", + "singleWriter": false, + "expectedNodeIds": [ + "D601" + ], + "instances": [ + { + "id": "D601-tcp-egress", + "nodeId": "D601", + "role": "primary", + "baseUrl": "kubernetes://unidesk/services/d601-tcp-egress-gateway:18080", + "healthPath": "/health", + "healthMode": "service-proxy" + } + ], + "requireAllInstancesHealthy": false + } } ] diff --git a/src/components/microservices/k3sctl-adapter/src/index.ts b/src/components/microservices/k3sctl-adapter/src/index.ts index 1000dfac..e8a9d17f 100644 --- a/src/components/microservices/k3sctl-adapter/src/index.ts +++ b/src/components/microservices/k3sctl-adapter/src/index.ts @@ -430,6 +430,10 @@ function nativeServiceRef(service: ManagedService): { namespace: string; service }; } +function deploymentName(service: ManagedService): string { + return routeString(service, "deploymentName", service.id); +} + function serviceProxyApiPath(service: ManagedService, targetPath: string): string { const { serviceName, servicePort } = nativeServiceRef(service); const safeTargetPath = targetPath.startsWith("/") ? targetPath : `/${targetPath}`; @@ -922,6 +926,7 @@ async function probeEndpoint(endpoint: ManagedEndpoint): Promise { async function probeKubernetesServiceActive(service: ManagedService): Promise { const endpoint = activeEndpoint(service); + if (endpoint.healthMode === "pod-ready") return await probeKubernetesPodReady(service, endpoint); return probeKubernetesEndpoint(service, endpoint, true); } @@ -995,6 +1000,68 @@ function podSummary(item: unknown): JsonRecord { }; } +function conditionTrue(item: unknown, type: string): boolean { + const conditions = jsonAtPath(item, "status.conditions"); + return Array.isArray(conditions) && conditions.some((condition) => { + const record = typeof condition === "object" && condition !== null ? condition as Record : {}; + return record.type === type && record.status === "True"; + }); +} + +function integerAtPath(item: unknown, path: string): number { + const value = jsonAtPath(item, path); + return typeof value === "number" && Number.isFinite(value) ? Math.floor(value) : 0; +} + +async function deploymentAvailabilityCheck(service: ManagedService): Promise { + const name = deploymentName(service); + const namespace = service.namespace; + try { + const deployment = await kubeApiJson(`/apis/apps/v1/namespaces/${encodeURIComponent(namespace)}/deployments/${encodeURIComponent(name)}`, config.healthTimeoutMs); + const desiredReplicas = Math.max(1, integerAtPath(deployment, "spec.replicas")); + const availableReplicas = integerAtPath(deployment, "status.availableReplicas"); + const readyReplicas = integerAtPath(deployment, "status.readyReplicas"); + const updatedReplicas = integerAtPath(deployment, "status.updatedReplicas"); + const unavailableReplicas = integerAtPath(deployment, "status.unavailableReplicas"); + const available = conditionTrue(deployment, "Available") || availableReplicas >= desiredReplicas; + return { + ok: available, + name, + namespace, + available, + desiredReplicas, + availableReplicas, + readyReplicas, + updatedReplicas, + unavailableReplicas, + }; + } catch (error) { + return { ok: false, name, namespace, available: false, error: errorToJson(error) }; + } +} + +async function endpointNonEmptyCheck(service: ManagedService): Promise { + const { namespace, serviceName } = nativeServiceRef(service); + try { + const endpoints = await kubeApiJson(`/api/v1/namespaces/${encodeURIComponent(namespace)}/endpoints/${encodeURIComponent(serviceName)}`, config.healthTimeoutMs); + const subsets = Array.isArray(endpoints.subsets) ? endpoints.subsets : []; + const addresses = subsets.flatMap((subset) => { + const record = typeof subset === "object" && subset !== null ? subset as Record : {}; + return Array.isArray(record.addresses) ? record.addresses : []; + }); + const readyAddressCount = addresses.length; + return { + ok: readyAddressCount > 0, + serviceName, + namespace, + readyAddressCount, + subsetCount: subsets.length, + }; + } catch (error) { + return { ok: false, serviceName, namespace, readyAddressCount: 0, error: errorToJson(error) }; + } +} + async function probeKubernetesPodReady(service: ManagedService, endpoint: ManagedEndpoint): Promise { const checkedAt = new Date().toISOString(); const { namespace } = kubernetesEndpointServiceRef(service, endpoint); @@ -1195,6 +1262,8 @@ async function serviceDiagnostics(service: ManagedService): Promise status: "unhealthy", error: errorToJson(error), } satisfies JsonRecord)); + const deployment = await deploymentAvailabilityCheck(service); + const endpoint = await endpointNonEmptyCheck(service); const kubernetesApiServiceProxyOk = kubernetesApiServiceProxy.ok === true; const targetServiceOk = targetService.ok === true; const checks = { @@ -1212,8 +1281,11 @@ async function serviceDiagnostics(service: ManagedService): Promise }, targetService, managedService, + deployment, + endpoint, } satisfies Record; - const ok = kubernetesApiServiceProxyOk && targetServiceOk; + const managedServiceOk = managedService.ok === true; + const ok = kubernetesApiServiceProxyOk && (targetServiceOk || managedServiceOk) && deployment.ok === true && endpoint.ok === true; return { ok, service: "k3sctl-adapter",