feat: add code queue execution plane inspection

Codex
2026-05-24 05:55:06 +00:00
parent 13644484a4
commit 63d77a3790
8 changed files with 1221 additions and 4 deletions
+1 -1
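@@ -68,7 +68,7 @@ UniDesk is a distributed work platform with the main server as its single entry point; this document
- `bun scripts/cli.ts hwlab cd audit --env dev` / `status|preflight|apply --dry-run`: HWLAB DEV CD commander-side wrapper. It calls the HWLAB repo-owned `scripts/dev-cd-apply.mjs` over D601 provider SSH and forces the native k3s kubeconfig. After CD recovery, `audit` classifies, read-only, blockers in control-plane, SecretRef, registry, Lease, artifact/workload, 16666/16667, and DB/runtime durability. A real apply only exposes the host-commander-only command shape. Rules are in `docs/reference/hwlab.md`.
- `bun scripts/cli.ts ci install/status/run/publish-backend-core/publish-user-service/run-dev-e2e/logs`: installs and runs Tekton CI on D601 native k3s. It supports per-commit checks, the Code Queue read-only performance gate, `CI.json` catalog-driven commit-pinned image publishing for backend-core and user-service, and manually triggered temporary-namespace e2e for `origin/master:deploy.json#environments.dev`. The catalog/producer/consumer split is described in `docs/reference/cicd-standardization.md`; the Git-controlled runner, short launcher, and no-CD boundary of `run-dev-e2e` are described in `docs/reference/dev-ci-runner.md`; Tekton rules are in `docs/reference/ci.md`.
- `bun scripts/cli.ts codex deploy <commitId>`: the legacy Code Queue compatibility deploy entry point is disabled because it would bypass the controlled deployment boundary and deploy Code Queue by connecting directly to D601; rules are in `docs/reference/codex-deploy.md`.
- `bun scripts/cli.ts codex prompt-lint [prompt|--prompt-file path|--prompt-stdin]` / `codex submit [prompt] [--prompt-file path|--prompt-stdin] [--queue <id>]` / `codex pr-preflight [--remote]`: `prompt-lint` dry-run checks the runner prompt's DEV test authorization tier (`read-only`/`live-read`/`live-mutating`) before dispatch/steer and does not echo the prompt. `submit --dry-run` also returns the MiniMax/GPT/human routing suggestion, that lint result, and the requested/effective execution mode. A successful real submit returns only the write confirmation, task id, service-level runnerPermissions, and follow-up inspection commands, without echoing the prompt. `pr-preflight` checks, read-only, the GitHub token, egress, and PR capability of the D601 scheduler/runner and must be used before PR-type dispatch. Rules are in `docs/reference/cli.md` and `docs/reference/code-queue-supervision.md`.
- `bun scripts/cli.ts codex prompt-lint [prompt|--prompt-file path|--prompt-stdin]` / `codex submit [prompt] [--prompt-file path|--prompt-stdin] [--queue <id>]` / `codex execution-plane [--full|--raw]` / `codex pr-preflight [--remote]`: `prompt-lint` dry-run checks the runner prompt's DEV test authorization tier (`read-only`/`live-read`/`live-mutating`) before dispatch/steer and does not echo the prompt. `submit --dry-run` also returns the MiniMax/GPT/human routing suggestion, that lint result, and the requested/effective execution mode. A successful real submit returns only the write confirmation, task id, service-level runnerPermissions, and follow-up inspection commands, without echoing the prompt. `execution-plane` compares, read-only, the formal Code Queue execution plane on D601 native k3s, legacy Compose residue, and commit/digest/worktree/probe drift. `pr-preflight` checks, read-only, the GitHub token, egress, and PR capability of the D601 scheduler/runner and must be used before PR-type dispatch. Rules are in `docs/reference/cli.md` and `docs/reference/code-queue-supervision.md`.
- `bun scripts/cli.ts codex task <taskId>`: queries the default review summary by Code Queue task ID, returning only the original prompt, the final response, the last error, and progressive-disclosure commands. `codex tasks --view commander` is the recommended polling entry point for the host commander; by default it shows, in bounded form, the exact active runner count, queued/retry_wait, terminal-unread, active risks, classification, and drill-down commands. `--view supervisor|full`, `codex output`, and large `--limit` values remain bounded by default; full content requires explicit `--full`/`--full-text`/paged expansion. `codex queues [--full] [--limit N] [--page N|--offset N]` outputs a paged, low-noise queue summary by default; the complete upstream is available only explicitly through the raw command.
- `bun scripts/cli.ts codex unread [--repo owner/name] [--issue N] [--limit N]`: summarizes, read-only, the completed-unread backlog and gives repo/issue/status/queue counts and drill-down/read commands. Bulk mark-as-read requires an explicit `codex unread mark-read ... --confirm`; rules are in `docs/reference/cli.md`.
- `bun scripts/cli.ts codex judge <taskId> --attempt <n> [--dry-run]`: reproduces the completion judgment in a single step for the given task/attempt, using the same context construction and MiniMax judge call path as the queue worker; `--dry-run` only outputs prompt/payload diagnostics.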
+1
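@@ -280,6 +280,7 @@ replacement runner is used only when the direction is clearly wrong, the quality is unacceptable, the original task
- `bun scripts/cli.ts codex tasks --view commander --limit N`: the recommended entry point for host commander polling. The output is a bounded action map and must directly show `activeRunners.count`, the count source, split-brain/heartbeat handling, exact queued/retry_wait counts, the terminal-unread total and number of omitted rows, the active risk count, stale/heartbeat/trace gaps, awaiting terminal/judge cases where `finalResponse` has appeared but the task is still non-terminal, blocker-like final responses, hits on HWLAB#7/#99/#116/#164/#317 and UniDesk#20/#118, task classification, and next-step drill-down commands. By default it must not output the full prompt, full final response, raw output, full trace, or raw overview; details may only be expanded progressively by task id using `codex task`, `codex task --trace`, `codex output`, `codex read`, or the `rawOverview` command.
- `bun scripts/cli.ts codex tasks --view supervisor --limit N`: shows the default low-noise supervision view, including `activeRunning`, running, completed-unread, a few recently completed, queued/runnable, activity, commanderConcurrency, execution diagnostics, task classification, and next-step drill-down commands. Default rows keep only the task id, queue, a short prompt/body preview, and the original character count. `--limit` is a scan/paging budget, not a switch for returning dozens of fat rows. The CLI effective limit has a safety cap of 100, and the output must use `filters.requestedLimit`, `filters.effectiveLimit`, `filters.limitCapped`, `source.requestedLimit`, and `source.effectiveLimit` to distinguish the user request, the CLI cap, and the overview source fetch budget; for example, `--limit 260` should explicitly show requested=260, effective=100, source=200. `running.returned` is only the number of low-noise rows returned. `show/detail/trace/output/full/read` live in the section template so they are not repeated for every task; expand by taskId when more is needed. Right after running `codex submit`, you can also first read `submitted.taskStates[]`, `queue.countContext`, `queue.activity.effectiveActiveTaskCount`, and `queue.stateDisclosure` from the submit response; if an id preview has `idsUnavailable=true`, do not treat it as an empty queue, and continue checking via `queue.listPreviewPolicy.rawCommand` or this supervisor command.
- `bun scripts/cli.ts codex queues`: defaults to the commander-first queue situation summary; `--commander` is the explicit synonym switch. The head of the output always uses `.data.queues.commander` and first gives `activeRunnerCount`, `source`, `target=15`, `slotDeficit`, `queuedCount`, `runningTasks`, `heartbeat.fresh`, `heartbeat.risk`, `heartbeat.staleRecoveryCandidates`, a small page of active/runnable queues, and drill-down commands. The historical queue item list is kept in `.data.queues.items[]`, but only as paged secondary rows. Add `--full` for the full queue row view; `--full` is still paged by default, so continue expanding progressively with `--limit N`, `--page N`, and `--offset N`. Both summary and full read queue rows from the stable JSON path `.data.queues.items[]`, and read global active counts and execution diagnostics from `.data.queues.commander`, `.data.queues.commanderConcurrency`, `.data.queues.activity`, `.data.queues.counts`, and `.data.queues.executionDiagnostics`; the complete upstream is available only explicitly through the raw command in the output. If `/api/queues` returns no task row, `runningTasks.items[].name` will be `null` and `nameSource=not-returned-by-api-queues`; in that case expand via the returned `codex task <taskId>` or the supervisor command, and do not assume the task has no name.
- `bun scripts/cli.ts codex execution-plane [--full|--raw]`: read-only inspection of the formal execution plane in the `unidesk` namespace of D601 native k3s. The command forces `KUBECONFIG=/etc/rancher/k3s/k3s.yaml` and confirms node `d601`. By default it returns, with low noise, `summary.formalExecutionPlane`, `summary.deploymentDrift`, `summary.deprecatedComposeResidual`, `executionPlane.deployments[]`, `drift.status`, `residual.status`, and `judgeProbe.behaviorVersion`. It compares three Deployments: `code-queue` must be scheduler, `code-queue-read` must be read, and `code-queue-write` must be write. It also compares the deployment env/annotation commit, the Pod `imageID` digest, the host `/home/ubuntu/cq-deploy` HEAD, and `behaviorVersion=code-queue-judge-probe:v1` from `/api/judge/probe`. Any commit/digest/worktree/probe mismatch, or a missing comparable marker, must be reported as `deployment-drift` and must not be written as healthy. When a legacy Docker Compose `code-queue-backend` or a legacy `127.0.0.1:4222` listener is detected, it must output `deprecated-compose-residual`. By default it does not print the full Kubernetes Deployment JSON, the full set of environment variables, SecretRef values, raw judge probe results, or command stdout; use `--full` to expand item by item, and `--raw` for the safely trimmed raw observation object.
- `bun scripts/cli.ts codex unread --limit N`: shows the default triage of the completed-unread review backlog, summarized by repo, issue, status, and queue, with bounded compact rows for the latest tasks. Default rows contain only the task id, status, queue, issues, updatedAt/finishedAt, and one `nextStep`; they do not repeat per-task `show/detail/trace/output/read` commands and do not output the raw prompt, final response, trace, or output. Full per-task commands must be expanded explicitly with `codex unread --full`, `codex unread --view full`, `codex unread list`, or single-task `codex task <taskId>`/`codex read <taskId>`; the default output must keep the one-time template commands and paging commands.
- `bun scripts/cli.ts codex unread mark-read --repo owner/name --issue N --limit N --confirm`: the bulk mark-as-read entry point. Both `mark-read` and `--confirm` must be explicit; otherwise it fails with a structured error and does not POST `/read`.
- `bun scripts/cli.ts codex tasks --unread --limit N`: compatibility view of the completed-unread review backlog; `--unread` and `--unread-only` are equivalent and must not be silently ignored.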
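The `execution-plane` rule documented in this hunk (any commit/digest/worktree/probe mismatch, or a missing comparable marker, must be reported as `deployment-drift`) can be illustrated with a minimal sketch. This is not the implementation from the commit: the `Markers` shape, the `classifyDrift` and `allPresentAndEqual` helpers, and the `"no-drift"` status name are hypothetical and exist only for illustration. Only the `deployment-drift` code and the `code-queue-judge-probe:v1` version string come from the text above.

```typescript
// Hypothetical marker shape for illustration; not the types used in the commit.
interface Markers {
  deploymentCommits: Array<string | null>; // env/annotation commits, one per Deployment
  podDigests: Array<string | null>;        // sha256 digests taken from each Pod imageID
  worktreeHead: string | null;             // HEAD of the host deploy worktree
  probeBehaviorVersion: string | null;     // behaviorVersion reported by the judge probe
}

// "no-drift" is an assumed name; the source only names "deployment-drift".
type DriftStatus = "no-drift" | "deployment-drift";

const EXPECTED_PROBE_VERSION = "code-queue-judge-probe:v1";

// True only when the list is non-empty, every value is present, and all values match.
function allPresentAndEqual(values: Array<string | null>): boolean {
  if (values.length === 0) return false;
  const first = values[0];
  return first !== null && first !== undefined && values.every((value) => value === first);
}

// A missing marker counts as drift, so the result is never "healthy by omission".
function classifyDrift(markers: Markers): DriftStatus {
  const commitsAgree = allPresentAndEqual([...markers.deploymentCommits, markers.worktreeHead]);
  const digestsAgree = allPresentAndEqual(markers.podDigests);
  const probeAgrees = markers.probeBehaviorVersion === EXPECTED_PROBE_VERSION;
  return commitsAgree && digestsAgree && probeAgrees ? "no-drift" : "deployment-drift";
}
```

The design point the sketch tries to capture is that `null` is treated as a mismatch rather than skipped, which is what prevents an unreadable worktree or an unreachable probe from being reported as healthy.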
@@ -0,0 +1,196 @@
import {
expectedJudgeProbeBehaviorVersion,
runCodeQueueExecutionPlaneForTest,
type CodeQueueExecutionPlaneObservation,
} from "./src/code-queue-execution-plane";
type JsonRecord = Record<string, unknown>;

// Throws with the message plus a JSON dump of `detail` when the condition is falsy.
function assertCondition(condition: unknown, message: string, detail: JsonRecord = {}): void {
  if (!condition) throw new Error(`${message}: ${JSON.stringify(detail)}`);
}
const commit = "62c613eefc84292fe1874a837685b073ac6c7295";
const otherCommit = "0c3cdb4ee06a23361ed511a2da033d67b53d16f4";
const digest = `sha256:${"1".repeat(64)}`;
const otherDigest = `sha256:${"2".repeat(64)}`;
function deployment(name: string, role: "scheduler" | "read" | "write", schedulerEnabled: "true" | "false", overrides: Partial<CodeQueueExecutionPlaneObservation["deployments"][number]> = {}): CodeQueueExecutionPlaneObservation["deployments"][number] {
return {
name,
namespace: "unidesk",
observed: true,
expectedRole: role,
expectedSchedulerEnabled: schedulerEnabled,
labels: {
app: "code-queue",
component: role === "scheduler" ? null : role,
deploymentMode: "k3sctl-managed",
instanceId: role === "scheduler" ? "D601" : `D601-${role}`,
},
annotations: {
deployRef: "origin/master:deploy.json#environments.prod.services.code-queue",
deployCommit: commit,
deployRequestedCommit: commit,
imageSource: "deploy-env-commit",
},
replicas: {
desired: role === "read" ? 2 : 1,
ready: role === "read" ? 2 : 1,
available: role === "read" ? 2 : 1,
updated: role === "read" ? 2 : 1,
},
nodeSelector: "D601",
image: "unidesk-code-queue:d601",
env: {
serviceRole: role,
schedulerEnabled,
unideskDeployCommit: commit,
unideskDeployRequestedCommit: commit,
codeQueueDeployCommit: commit,
codeQueueDeployRequestedCommit: commit,
},
repoHostPath: "/home/ubuntu/cq-deploy",
error: null,
...overrides,
};
}
function healthyObservation(overrides: Partial<CodeQueueExecutionPlaneObservation> = {}): CodeQueueExecutionPlaneObservation {
const base: CodeQueueExecutionPlaneObservation = {
checkedAt: "2026-05-24T00:00:00.000Z",
namespace: "unidesk",
kubeconfig: "/etc/rancher/k3s/k3s.yaml",
worktreePath: "/home/ubuntu/cq-deploy",
guard: {
status: "pass",
refusal: false,
refusalSignals: [],
kubeconfig: "/etc/rancher/k3s/k3s.yaml",
expectedKubeconfig: "/etc/rancher/k3s/k3s.yaml",
currentContext: "default",
apiServer: "https://127.0.0.1:6443",
nodeNames: ["d601"],
nodeCount: 1,
requiredNodeName: "d601",
requiredNodePresent: true,
commandsOk: true,
summary: "D601 native k3s guard passed with explicit KUBECONFIG.",
},
deployments: [
deployment("code-queue", "scheduler", "true"),
deployment("code-queue-read", "read", "false"),
deployment("code-queue-write", "write", "false"),
],
pods: [
{ name: "code-queue-aaa", instanceId: "D601", component: null, nodeName: "d601", phase: "Running", ready: true, imageID: `registry/unidesk-code-queue@${digest}`, digest },
{ name: "code-queue-read-aaa", instanceId: "D601-read", component: "read", nodeName: "d601", phase: "Running", ready: true, imageID: `registry/unidesk-code-queue@${digest}`, digest },
{ name: "code-queue-write-aaa", instanceId: "D601-write", component: "write", nodeName: "d601", phase: "Running", ready: true, imageID: `registry/unidesk-code-queue@${digest}`, digest },
],
services: [
{ name: "code-queue", observed: true, type: "ClusterIP", clusterIP: "10.43.0.1", ports: ["http:4222->http"], selector: { app: "code-queue", component: null, instanceId: "D601" }, error: null },
{ name: "code-queue-read", observed: true, type: "ClusterIP", clusterIP: "10.43.0.2", ports: ["http:4222->http"], selector: { app: "code-queue", component: "read", instanceId: null }, error: null },
{ name: "code-queue-write", observed: true, type: "ClusterIP", clusterIP: "10.43.0.3", ports: ["http:4222->http"], selector: { app: "code-queue", component: "write", instanceId: null }, error: null },
],
worktree: {
path: "/home/ubuntu/cq-deploy",
ok: true,
head: commit,
error: null,
},
residual: {
composeBackend: { ok: true, present: false, containers: [], error: null },
loopbackPort4222: { ok: true, present: false, listeners: [], error: null },
},
judgeProbe: {
ok: true,
attempted: true,
behaviorVersion: expectedJudgeProbeBehaviorVersion,
expectedBehaviorVersion: expectedJudgeProbeBehaviorVersion,
configured: true,
model: "minimax-m2.7",
hits: 8,
total: 8,
hitRate: 1,
serviceProxyPath: "/api/v1/namespaces/unidesk/services/code-queue/proxy/api/judge/probe",
error: null,
raw: { ok: true, behaviorVersion: expectedJudgeProbeBehaviorVersion, results: [{ id: "bounded" }] },
},
commandDiagnostics: {},
};
return { ...base, ...overrides };
}
async function checkHealthyNoDrift(): Promise<void> {
  const result = await runCodeQueueExecutionPlaneForTest([], healthyObservation());
  assertCondition(result.ok === true, "healthy fixture should pass", result);
  assertCondition((result.summary as JsonRecord).deploymentDrift === false, "healthy fixture should not report deployment drift", result);
  assertCondition((result.summary as JsonRecord).deprecatedComposeResidual === false, "healthy fixture should not report residual compose", result);
}

async function checkDeploymentDrift(): Promise<void> {
  const obs = healthyObservation({
    deployments: [
      deployment("code-queue", "scheduler", "true", { env: { ...deployment("code-queue", "scheduler", "true").env, codeQueueDeployCommit: otherCommit } }),
      deployment("code-queue-read", "read", "false"),
      deployment("code-queue-write", "write", "false"),
    ],
    pods: [
      { name: "code-queue-aaa", instanceId: "D601", component: null, nodeName: "d601", phase: "Running", ready: true, imageID: `registry/unidesk-code-queue@${digest}`, digest },
      { name: "code-queue-read-aaa", instanceId: "D601-read", component: "read", nodeName: "d601", phase: "Running", ready: true, imageID: `registry/unidesk-code-queue@${otherDigest}`, digest: otherDigest },
    ],
    judgeProbe: {
      ...healthyObservation().judgeProbe,
      behaviorVersion: "legacy",
    },
  });
  const result = await runCodeQueueExecutionPlaneForTest([], obs);
  const drift = result.drift as JsonRecord;
  assertCondition(result.ok === false, "drift fixture should fail", result);
  assertCondition(drift.status === "deployment-drift", "drift status should be deployment-drift", result);
  assertCondition(JSON.stringify(drift).includes("deployment-drift"), "drift signal code should be visible", result);
}

async function checkDeprecatedComposeResidual(): Promise<void> {
  const result = await runCodeQueueExecutionPlaneForTest([], healthyObservation({
    residual: {
      composeBackend: { ok: true, present: true, containers: [{ name: "code-queue-backend", status: "Up 3 days", image: "unidesk-code-queue:old" }], error: null },
      loopbackPort4222: { ok: true, present: true, listeners: [{ localAddress: "127.0.0.1:4222", process: "users:((\"bun\",pid=1,fd=12))", line: "LISTEN 0 128 127.0.0.1:4222 0.0.0.0:* users:((\"bun\",pid=1,fd=12))" }], error: null },
    },
  }));
  const residual = result.residual as JsonRecord;
  assertCondition(result.ok === false, "residual fixture should fail", result);
  assertCondition(residual.status === "deprecated-compose-residual", "residual status should be explicit", result);
  assertCondition(JSON.stringify(result.blockers).includes("deprecated-compose-residual"), "residual blocker code should be visible", result);
}

async function checkProgressiveDisclosure(): Promise<void> {
  const compact = await runCodeQueueExecutionPlaneForTest([], healthyObservation());
  assertCondition(!("details" in compact), "default output should omit details", compact);
  assertCondition(!("rawObservation" in compact), "default output should omit raw observation", compact);
  const full = await runCodeQueueExecutionPlaneForTest(["--full"], healthyObservation());
  assertCondition("details" in full, "--full should include details", full);
  assertCondition(!("rawObservation" in full), "--full should still omit raw observation", full);
  const raw = await runCodeQueueExecutionPlaneForTest(["--raw"], healthyObservation());
  assertCondition("details" in raw && "rawObservation" in raw, "--raw should include details and raw observation", raw);
}

async function main(): Promise<void> {
  const checks = [
    ["code-queue:execution-plane-healthy-no-drift", checkHealthyNoDrift],
    ["code-queue:execution-plane-deployment-drift", checkDeploymentDrift],
    ["code-queue:execution-plane-deprecated-compose-residual", checkDeprecatedComposeResidual],
    ["code-queue:execution-plane-progressive-disclosure", checkProgressiveDisclosure],
  ] as const;
  const results = [];
  for (const [name, check] of checks) {
    await check();
    results.push({ name, ok: true });
  }
  process.stdout.write(`${JSON.stringify({ ok: true, results }, null, 2)}\n`);
}

if (import.meta.main) {
  await main();
}
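The progressive-disclosure contract checked by the test above (default output omits `details` and `rawObservation`, `--full` adds `details`, `--raw` adds both) could be satisfied by output shaping along the following lines. This is a sketch under stated assumptions, not the code from `code-queue-execution-plane.ts`: `CompactReport`, `ShapedReport`, `parseDisclosureFlags`, and `shapeReport` are hypothetical names. The one behavior taken from the commit is that `--raw` implies `--full`.

```typescript
// Hypothetical types for illustration only.
interface DisclosureFlags {
  full: boolean;
  raw: boolean;
}

interface CompactReport {
  ok: boolean;
  summary: Record<string, unknown>;
}

type ShapedReport = CompactReport & { details?: unknown; rawObservation?: unknown };

// Mirrors the option rule visible in the commit: --raw implies --full.
function parseDisclosureFlags(args: string[]): DisclosureFlags {
  const raw = args.includes("--raw");
  return { full: raw || args.includes("--full"), raw };
}

// Keys are added only when requested, so `"details" in report` is false by default
// rather than being present with an undefined value.
function shapeReport(compact: CompactReport, details: unknown, rawObservation: unknown, flags: DisclosureFlags): ShapedReport {
  const shaped: ShapedReport = { ...compact };
  if (flags.full) shaped.details = details;
  if (flags.raw) shaped.rawObservation = rawObservation;
  return shaped;
}
```

Omitting the keys entirely, instead of setting them to `null`, is what makes the `in` checks used by the contract test meaningful.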
+5
@@ -20,6 +20,7 @@ const syntaxFiles = [
"scripts/src/artifact-registry.ts",
"scripts/src/auth-broker.ts",
"scripts/src/code-queue.ts",
"scripts/src/code-queue-execution-plane.ts",
"scripts/src/command.ts",
"scripts/src/d601-k3s-guard.ts",
"scripts/src/hwlab-cd.ts",
@@ -48,6 +49,7 @@ const syntaxFiles = [
"scripts/code-queue-submit-summary-contract-test.ts",
"scripts/code-queue-cli-read-terminal-contract-test.ts",
"scripts/code-queue-gh-auth-redaction-contract-test.ts",
"scripts/code-queue-execution-plane-contract-test.ts",
"scripts/d601-recovery-guardrails-contract-test.ts",
"scripts/hwlab-cd-wrapper-contract-test.ts",
"scripts/code-queue-queues-shape-contract-test.ts",
@@ -359,6 +361,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default
fileItem("scripts/code-queue-submit-summary-contract-test.ts"),
fileItem("scripts/code-queue-submit-routing-contract-test.ts"),
fileItem("scripts/code-queue-gh-auth-redaction-contract-test.ts"),
fileItem("scripts/code-queue-execution-plane-contract-test.ts"),
fileItem("scripts/code-queue-queues-shape-contract-test.ts"),
fileItem("scripts/code-queue-supervisor-disclosure-contract-test.ts"),
fileItem("scripts/code-queue-commander-view-contract-test.ts"),
@@ -416,6 +419,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default
items.push(commandItem("code-queue:submit-summary-contract", ["bun", "scripts/code-queue-submit-summary-contract-test.ts"], 30_000));
items.push(commandItem("code-queue:submit-routing-contract", ["bun", "scripts/code-queue-submit-routing-contract-test.ts"], 30_000));
items.push(commandItem("code-queue:gh-auth-redaction-contract", ["bun", "scripts/code-queue-gh-auth-redaction-contract-test.ts"], 30_000));
items.push(commandItem("code-queue:execution-plane-contract", ["bun", "scripts/code-queue-execution-plane-contract-test.ts"], 30_000));
items.push(commandItem("code-queue:queues-shape-contract", ["bun", "scripts/code-queue-queues-shape-contract-test.ts"], 30_000));
items.push(commandItem("code-queue:supervisor-disclosure-contract", ["bun", "scripts/code-queue-supervisor-disclosure-contract-test.ts"], 30_000));
items.push(commandItem("code-queue:commander-view-contract", ["bun", "scripts/code-queue-commander-view-contract-test.ts"], 30_000));
@@ -461,6 +465,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default
items.push(skippedItem("code-queue:submit-summary-contract", "Code Queue submit summary contract is opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("code-queue:submit-routing-contract", "Code Queue submit routing contract is opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("code-queue:gh-auth-redaction-contract", "Code Queue GitHub auth output redaction contract is opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("code-queue:execution-plane-contract", "Code Queue execution plane drift contract is opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("code-queue:queues-shape-contract", "Code Queue queues shape contract is opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("code-queue:supervisor-disclosure-contract", "Code Queue supervisor disclosure contract is opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("code-queue:commander-view-contract", "Code Queue commander view contract is opt-in with script checks", "--scripts-typecheck or --full"));
+997
@@ -0,0 +1,997 @@
import { runCommand, type CommandResult } from "./command";
import { repoRoot } from "./config";
import { classifyD601K3sTarget, d601NativeKubeconfig, d601RequiredNodeName, type D601K3sGuardClassification } from "./d601-k3s-guard";
type PlaneStatus = "ready" | "blocked" | "degraded";
type SignalSeverity = "blocker" | "warning";
interface ExecutionPlaneOptions {
namespace: string;
kubeconfig: string;
worktreePath: string;
full: boolean;
raw: boolean;
skipProbe: boolean;
timeoutMs: number;
}
interface ProbeResult {
ok: boolean;
command: string[];
exitCode: number | null;
stdout: string;
stderr: string;
timedOut: boolean;
}
interface DeploymentObservation {
name: string;
namespace: string;
observed: boolean;
expectedRole: "scheduler" | "read" | "write";
expectedSchedulerEnabled: string;
labels: {
app: string | null;
component: string | null;
deploymentMode: string | null;
instanceId: string | null;
};
annotations: {
deployRef: string | null;
deployCommit: string | null;
deployRequestedCommit: string | null;
imageSource: string | null;
};
replicas: {
desired: number | null;
ready: number;
available: number;
updated: number;
};
nodeSelector: string | null;
image: string | null;
env: {
serviceRole: string | null;
schedulerEnabled: string | null;
unideskDeployCommit: string | null;
unideskDeployRequestedCommit: string | null;
codeQueueDeployCommit: string | null;
codeQueueDeployRequestedCommit: string | null;
};
repoHostPath: string | null;
error: string | null;
}
interface PodObservation {
name: string;
instanceId: string | null;
component: string | null;
nodeName: string | null;
phase: string | null;
ready: boolean;
imageID: string | null;
digest: string | null;
}
interface ServiceObservation {
name: string;
observed: boolean;
type: string | null;
clusterIP: string | null;
ports: string[];
selector: {
app: string | null;
component: string | null;
instanceId: string | null;
};
error: string | null;
}
interface WorktreeObservation {
path: string;
ok: boolean;
head: string | null;
error: string | null;
}
interface ResidualObservation {
composeBackend: {
ok: boolean;
present: boolean;
containers: Array<{ name: string; status: string | null; image: string | null }>;
error: string | null;
};
loopbackPort4222: {
ok: boolean;
present: boolean;
listeners: Array<{ localAddress: string; process: string | null; line: string }>;
error: string | null;
};
}
interface JudgeProbeObservation {
ok: boolean;
attempted: boolean;
behaviorVersion: string | null;
expectedBehaviorVersion: string;
configured: boolean | null;
model: string | null;
hits: number | null;
total: number | null;
hitRate: number | null;
serviceProxyPath: string | null;
error: string | null;
raw: unknown;
}
export interface CodeQueueExecutionPlaneObservation {
checkedAt: string;
namespace: string;
kubeconfig: string;
worktreePath: string;
guard: D601K3sGuardClassification;
deployments: DeploymentObservation[];
pods: PodObservation[];
services: ServiceObservation[];
worktree: WorktreeObservation;
residual: ResidualObservation;
judgeProbe: JudgeProbeObservation;
commandDiagnostics: Record<string, unknown>;
}
interface ExecutionPlaneCollector {
collect(options: ExecutionPlaneOptions): Promise<CodeQueueExecutionPlaneObservation> | CodeQueueExecutionPlaneObservation;
}
interface DriftSignal {
code: string;
severity: SignalSeverity;
message: string;
expected?: unknown;
observed?: unknown;
field?: string;
}
const expectedNamespace = "unidesk";
const expectedWorktreePath = "/home/ubuntu/cq-deploy";
export const expectedJudgeProbeBehaviorVersion = "code-queue-judge-probe:v1";
const expectedDeployments = [
{ name: "code-queue", role: "scheduler" as const, schedulerEnabled: "true" },
{ name: "code-queue-read", role: "read" as const, schedulerEnabled: "false" },
{ name: "code-queue-write", role: "write" as const, schedulerEnabled: "false" },
];
function nowIso(): string {
return new Date().toISOString();
}
// Returns true when the exact flag token appears anywhere in args.
function hasFlag(args: string[], name: string): boolean {
  return args.includes(name);
}

// Returns the token that follows the first matching option name, or undefined when the option is absent.
// Throws when the option is present but its value is missing or looks like another option.
function optionValue(args: string[], names: string[]): string | undefined {
  for (let index = 0; index < args.length; index += 1) {
    if (!names.includes(args[index] ?? "")) continue;
    const value = args[index + 1];
    if (value === undefined || value.startsWith("--")) throw new Error(`${args[index]} requires a value`);
    return value;
  }
  return undefined;
}

// Parses a positive integer and clamps it to max. The error text names --timeout-ms,
// the option this helper is used for in parseExecutionPlaneOptions.
function parsePositiveInteger(value: string | undefined, fallback: number, max: number): number {
  if (value === undefined) return fallback;
  const parsed = Number(value);
  if (!Number.isInteger(parsed) || parsed <= 0) throw new Error("--timeout-ms must be a positive integer");
  return Math.min(parsed, max);
}

function parseExecutionPlaneOptions(args: string[]): ExecutionPlaneOptions {
  const knownFlags = new Set(["--full", "--raw", "--skip-probe"]);
  const knownValues = new Set(["--namespace", "--kubeconfig", "--worktree", "--timeout-ms"]);
  // Reject positional arguments and unknown options before reading any values.
  for (let index = 0; index < args.length; index += 1) {
    const arg = args[index] ?? "";
    if (!arg.startsWith("--")) throw new Error(`codex execution-plane does not accept positional argument: ${arg}`);
    if (knownFlags.has(arg)) continue;
    if (knownValues.has(arg)) {
      // Skip the value token that belongs to this option.
      index += 1;
      continue;
    }
    throw new Error(`unknown codex execution-plane option: ${arg}`);
  }
  const raw = hasFlag(args, "--raw");
  return {
    namespace: optionValue(args, ["--namespace"]) ?? expectedNamespace,
    kubeconfig: optionValue(args, ["--kubeconfig"]) ?? d601NativeKubeconfig,
    worktreePath: optionValue(args, ["--worktree"]) ?? expectedWorktreePath,
    // --raw implies --full.
    full: raw || hasFlag(args, "--full"),
    raw,
    skipProbe: hasFlag(args, "--skip-probe"),
    // Defaults to 15 s and is capped at 60 s.
    timeoutMs: parsePositiveInteger(optionValue(args, ["--timeout-ms"]), 15_000, 60_000),
  };
}
function firstLine(text: string): string | null {
const line = text.split(/\r?\n/u).map((item) => item.trim()).find((item) => item.length > 0);
return line ?? null;
}
function lines(text: string): string[] {
return text.split(/\r?\n/u).map((item) => item.trim()).filter(Boolean);
}
function nullable(value: string | undefined): string | null {
const trimmed = (value ?? "").trim();
return trimmed.length === 0 ? null : trimmed;
}
function numberOrNull(value: string | undefined): number | null {
const trimmed = (value ?? "").trim();
if (trimmed.length === 0) return null;
const parsed = Number(trimmed);
return Number.isFinite(parsed) ? parsed : null;
}
function boolFromString(value: string | null): boolean {
return value === "true";
}
// Normalizes a CommandResult; a probe is ok only on exit code 0 without a timeout.
function commandProbe(result: CommandResult): ProbeResult {
  return {
    ok: result.exitCode === 0 && !result.timedOut,
    command: result.command,
    exitCode: result.exitCode,
    stdout: result.stdout,
    stderr: result.stderr,
    timedOut: result.timedOut,
  };
}

// Returns a one-line error for a failed probe (first stderr line, then first stdout line, then the exit code).
function safeError(probe: ProbeResult): string | null {
  if (probe.ok) return null;
  const text = firstLine(probe.stderr) ?? firstLine(probe.stdout);
  return text ?? `exitCode=${probe.exitCode ?? "null"}`;
}

// Runs kubectl with KUBECONFIG set explicitly from the options.
function runKubectl(args: string[], options: ExecutionPlaneOptions): ProbeResult {
  return commandProbe(runCommand(["kubectl", ...args], repoRoot, {
    timeoutMs: options.timeoutMs,
    env: { ...process.env, KUBECONFIG: options.kubeconfig },
  }));
}
function collectGuard(options: ExecutionPlaneOptions): { guard: D601K3sGuardClassification; diagnostics: Record<string, unknown> } {
const context = runKubectl(["config", "current-context"], options);
const server = runKubectl(["config", "view", "--minify", "-o", "jsonpath={.clusters[0].cluster.server}"], options);
const nodes = runKubectl(["get", "nodes", "-o", "jsonpath={range .items[*]}{.metadata.name}{\"\\n\"}{end}"], options);
const combinedText = [context.stdout, context.stderr, server.stdout, server.stderr, nodes.stdout, nodes.stderr].join("\n");
const guard = classifyD601K3sTarget({
kubeconfig: options.kubeconfig,
expectedKubeconfig: d601NativeKubeconfig,
currentContext: firstLine(context.stdout),
apiServer: firstLine(server.stdout),
nodeNames: lines(nodes.stdout),
commandsOk: context.ok && server.ok && nodes.ok,
combinedText,
});
return {
guard,
diagnostics: {
context: compactProbe(context),
server: compactProbe(server),
nodes: compactProbe(nodes),
},
};
}
function compactProbe(probe: ProbeResult): Record<string, unknown> {
return {
ok: probe.ok,
exitCode: probe.exitCode,
timedOut: probe.timedOut,
stdoutPreview: firstLine(probe.stdout),
stderrPreview: firstLine(probe.stderr),
};
}
function deploymentJsonPath(): string {
return [
"{.metadata.name}",
"{\"\\t\"}{.metadata.namespace}",
"{\"\\t\"}{.metadata.labels.app\\.kubernetes\\.io/name}",
"{\"\\t\"}{.metadata.labels.app\\.kubernetes\\.io/component}",
"{\"\\t\"}{.metadata.labels.unidesk\\.ai/deployment-mode}",
"{\"\\t\"}{.metadata.labels.unidesk\\.ai/instance-id}",
"{\"\\t\"}{.metadata.annotations.unidesk\\.ai/deploy-ref}",
"{\"\\t\"}{.metadata.annotations.unidesk\\.ai/deploy-commit}",
"{\"\\t\"}{.metadata.annotations.unidesk\\.ai/deploy-requested-commit}",
"{\"\\t\"}{.metadata.annotations.unidesk\\.ai/image-source}",
"{\"\\t\"}{.status.readyReplicas}",
"{\"\\t\"}{.status.availableReplicas}",
"{\"\\t\"}{.status.updatedReplicas}",
"{\"\\t\"}{.spec.replicas}",
"{\"\\t\"}{.spec.template.spec.nodeSelector.unidesk\\.ai/node-id}",
"{\"\\t\"}{.spec.template.spec.containers[?(@.name==\"code-queue\")].image}",
"{\"\\t\"}{.spec.template.spec.containers[?(@.name==\"code-queue\")].env[?(@.name==\"CODE_QUEUE_SERVICE_ROLE\")].value}",
"{\"\\t\"}{.spec.template.spec.containers[?(@.name==\"code-queue\")].env[?(@.name==\"CODE_QUEUE_SCHEDULER_ENABLED\")].value}",
"{\"\\t\"}{.spec.template.spec.containers[?(@.name==\"code-queue\")].env[?(@.name==\"UNIDESK_DEPLOY_COMMIT\")].value}",
"{\"\\t\"}{.spec.template.spec.containers[?(@.name==\"code-queue\")].env[?(@.name==\"UNIDESK_DEPLOY_REQUESTED_COMMIT\")].value}",
"{\"\\t\"}{.spec.template.spec.containers[?(@.name==\"code-queue\")].env[?(@.name==\"CODE_QUEUE_DEPLOY_COMMIT\")].value}",
"{\"\\t\"}{.spec.template.spec.containers[?(@.name==\"code-queue\")].env[?(@.name==\"CODE_QUEUE_DEPLOY_REQUESTED_COMMIT\")].value}",
"{\"\\t\"}{.spec.template.spec.volumes[?(@.name==\"repo\")].hostPath.path}",
].join("");
}
function parseDeployment(fields: string[], expected: typeof expectedDeployments[number], namespace: string, error: string | null): DeploymentObservation {
return {
name: nullable(fields[0]) ?? expected.name,
namespace: nullable(fields[1]) ?? namespace,
observed: error === null && nullable(fields[0]) !== null,
expectedRole: expected.role,
expectedSchedulerEnabled: expected.schedulerEnabled,
labels: {
app: nullable(fields[2]),
component: nullable(fields[3]),
deploymentMode: nullable(fields[4]),
instanceId: nullable(fields[5]),
},
annotations: {
deployRef: nullable(fields[6]),
deployCommit: nullable(fields[7]),
deployRequestedCommit: nullable(fields[8]),
imageSource: nullable(fields[9]),
},
replicas: {
ready: numberOrNull(fields[10]) ?? 0,
available: numberOrNull(fields[11]) ?? 0,
updated: numberOrNull(fields[12]) ?? 0,
desired: numberOrNull(fields[13]),
},
nodeSelector: nullable(fields[14]),
image: nullable(fields[15]),
env: {
serviceRole: nullable(fields[16]),
schedulerEnabled: nullable(fields[17]),
unideskDeployCommit: nullable(fields[18]),
unideskDeployRequestedCommit: nullable(fields[19]),
codeQueueDeployCommit: nullable(fields[20]),
codeQueueDeployRequestedCommit: nullable(fields[21]),
},
repoHostPath: nullable(fields[22]),
error,
};
}
// Reads each expected Deployment as one tab-separated jsonpath row.
function collectDeployments(options: ExecutionPlaneOptions): DeploymentObservation[] {
  return expectedDeployments.map((expected) => {
    const probe = runKubectl(["-n", options.namespace, "get", "deployment", expected.name, "-o", `jsonpath=${deploymentJsonPath()}`], options);
    const error = safeError(probe);
    // trimEnd drops trailing empty fields; parseDeployment maps the missing ones to null.
    const fields = probe.ok ? probe.stdout.trimEnd().split("\t") : [];
    return parseDeployment(fields, expected, options.namespace, error);
  });
}

// Lists code-queue Pods, one tab-separated row per Pod; returns an empty list when kubectl fails.
function collectPods(options: ExecutionPlaneOptions): PodObservation[] {
  const path = [
    "{range .items[*]}",
    "{.metadata.name}{\"\\t\"}",
    "{.metadata.labels.unidesk\\.ai/instance-id}{\"\\t\"}",
    "{.metadata.labels.app\\.kubernetes\\.io/component}{\"\\t\"}",
    "{.spec.nodeName}{\"\\t\"}",
    "{.status.phase}{\"\\t\"}",
    "{.status.containerStatuses[?(@.name==\"code-queue\")].ready}{\"\\t\"}",
    "{.status.containerStatuses[?(@.name==\"code-queue\")].imageID}{\"\\n\"}",
    "{end}",
  ].join("");
  const probe = runKubectl(["-n", options.namespace, "get", "pods", "-l", "app.kubernetes.io/name=code-queue", "-o", `jsonpath=${path}`], options);
  if (!probe.ok) return [];
  return lines(probe.stdout).map((line) => {
    const fields = line.split("\t");
    const imageID = nullable(fields[6]);
    return {
      name: nullable(fields[0]) ?? "unknown",
      instanceId: nullable(fields[1]),
      component: nullable(fields[2]),
      nodeName: nullable(fields[3]),
      phase: nullable(fields[4]),
      ready: boolFromString(nullable(fields[5])),
      imageID,
      digest: digestFromText(imageID),
    };
  });
}
function serviceJsonPath(): string {
return [
"{.metadata.name}",
"{\"\\t\"}{.spec.type}",
"{\"\\t\"}{.spec.clusterIP}",
"{\"\\t\"}{range .spec.ports[*]}{.name}:{.port}->{.targetPort}{\",\"}{end}",
"{\"\\t\"}{.spec.selector.app\\.kubernetes\\.io/name}",
"{\"\\t\"}{.spec.selector.app\\.kubernetes\\.io/component}",
"{\"\\t\"}{.spec.selector.unidesk\\.ai/instance-id}",
].join("");
}
function collectServices(options: ExecutionPlaneOptions): ServiceObservation[] {
return expectedDeployments.map((expected) => {
const probe = runKubectl(["-n", options.namespace, "get", "service", expected.name, "-o", `jsonpath=${serviceJsonPath()}`], options);
const error = safeError(probe);
const fields = probe.ok ? probe.stdout.trimEnd().split("\t") : [];
return {
name: nullable(fields[0]) ?? expected.name,
observed: error === null && nullable(fields[0]) !== null,
type: nullable(fields[1]),
clusterIP: nullable(fields[2]),
ports: (nullable(fields[3]) ?? "").split(",").map((item) => item.trim()).filter(Boolean),
selector: {
app: nullable(fields[4]),
component: nullable(fields[5]),
instanceId: nullable(fields[6]),
},
error,
};
});
}
function collectWorktree(options: ExecutionPlaneOptions): WorktreeObservation {
const probe = commandProbe(runCommand(["git", "-C", options.worktreePath, "rev-parse", "HEAD"], repoRoot, { timeoutMs: 5_000 }));
return {
path: options.worktreePath,
ok: probe.ok,
head: probe.ok ? firstLine(probe.stdout) : null,
error: safeError(probe),
};
}
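// Looks for leftovers of the deprecated Docker Compose deployment:
// code-queue-backend containers and loopback listeners on port 4222.
function collectResidual(): ResidualObservation {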
const docker = commandProbe(runCommand(["docker", "ps", "-a", "--filter", "name=code-queue-backend", "--format", "{{.Names}}\t{{.Status}}\t{{.Image}}"], repoRoot, { timeoutMs: 8_000 }));
const containers = docker.ok
? lines(docker.stdout).map((line) => {
const fields = line.split("\t");
return { name: fields[0] ?? "unknown", status: nullable(fields[1]), image: nullable(fields[2]) };
}).filter((item) => item.name.includes("code-queue-backend"))
: [];
const ss = commandProbe(runCommand(["ss", "-H", "-ltnp"], repoRoot, { timeoutMs: 8_000 }));
const listeners = ss.ok
? (lines(ss.stdout)
.filter((line) => line.includes(":4222"))
.map((line) => {
const fields = line.split(/\s+/u);
const localAddress = fields[3] ?? fields[2] ?? "";
const processText = fields.find((field) => field.includes("users:")) ?? null;
return { localAddress, process: processText, line };
})
// Compare the full address so that a port such as 127.0.0.1:42220 is not reported as a residual listener.
.filter((item) => item.localAddress === "127.0.0.1:4222" || item.localAddress === "[::1]:4222"))
: [];
return {
composeBackend: {
ok: docker.ok,
present: containers.length > 0,
containers,
error: safeError(docker),
},
loopbackPort4222: {
ok: ss.ok,
present: listeners.length > 0,
listeners,
error: safeError(ss),
},
};
}
function parseJson(text: string): unknown {
try {
return JSON.parse(text) as unknown;
} catch {
return null;
}
}
function asRecord(value: unknown): Record<string, unknown> | null {
return typeof value === "object" && value !== null && !Array.isArray(value) ? value as Record<string, unknown> : null;
}
function stringValue(value: unknown): string | null {
return typeof value === "string" && value.trim().length > 0 ? value.trim() : null;
}
function numberValue(value: unknown): number | null {
return typeof value === "number" && Number.isFinite(value) ? value : null;
}
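// Reads /api/judge/probe through the Kubernetes API service proxy, trying the default,
// numeric-port, and named-port service paths in order and using the first one kubectl can read.
function collectJudgeProbe(options: ExecutionPlaneOptions): JudgeProbeObservation {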
if (options.skipProbe) {
return {
ok: false,
attempted: false,
behaviorVersion: null,
expectedBehaviorVersion: expectedJudgeProbeBehaviorVersion,
configured: null,
model: null,
hits: null,
total: null,
hitRate: null,
serviceProxyPath: null,
error: "--skip-probe requested",
raw: null,
};
}
const paths = [
`/api/v1/namespaces/${options.namespace}/services/code-queue/proxy/api/judge/probe`,
`/api/v1/namespaces/${options.namespace}/services/code-queue:4222/proxy/api/judge/probe`,
`/api/v1/namespaces/${options.namespace}/services/code-queue:http/proxy/api/judge/probe`,
];
for (const path of paths) {
const probe = runKubectl(["--request-timeout=15s", "get", "--raw", path], options);
if (!probe.ok) continue;
const raw = parseJson(probe.stdout);
const record = asRecord(raw);
if (record === null) {
return {
ok: false,
attempted: true,
behaviorVersion: null,
expectedBehaviorVersion: expectedJudgeProbeBehaviorVersion,
configured: null,
model: null,
hits: null,
total: null,
hitRate: null,
serviceProxyPath: path,
error: "judge probe returned non-JSON response",
raw: probe.stdout.slice(0, 2000),
};
}
return {
ok: record.ok === true,
attempted: true,
behaviorVersion: stringValue(record.behaviorVersion ?? asRecord(record.behavior)?.version ?? record.version),
expectedBehaviorVersion: expectedJudgeProbeBehaviorVersion,
configured: typeof record.configured === "boolean" ? record.configured : null,
model: stringValue(record.model),
hits: numberValue(record.hits),
total: numberValue(record.total),
hitRate: numberValue(record.hitRate),
serviceProxyPath: path,
error: record.ok === true ? null : stringValue(record.error) ?? "judge probe returned ok=false",
raw,
};
}
return {
ok: false,
attempted: true,
behaviorVersion: null,
expectedBehaviorVersion: expectedJudgeProbeBehaviorVersion,
configured: null,
model: null,
hits: null,
total: null,
hitRate: null,
serviceProxyPath: null,
error: "judge probe could not be read through Kubernetes API service proxy",
raw: null,
};
}
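// Extracts the first sha256 digest from an image reference or pod imageID, lowercased; null when none is present.
function digestFromText(value: string | null): string | null {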
if (value === null) return null;
const match = value.match(/sha256:[0-9a-f]{64}/iu);
return match?.[0].toLowerCase() ?? null;
}
class LiveExecutionPlaneCollector implements ExecutionPlaneCollector {
collect(options: ExecutionPlaneOptions): CodeQueueExecutionPlaneObservation {
const { guard, diagnostics } = collectGuard(options);
const residual = collectResidual();
const worktree = collectWorktree(options);
if (guard.status !== "pass") {
return {
checkedAt: nowIso(),
namespace: options.namespace,
kubeconfig: options.kubeconfig,
worktreePath: options.worktreePath,
guard,
deployments: [],
pods: [],
services: [],
worktree,
residual,
judgeProbe: {
ok: false,
attempted: false,
behaviorVersion: null,
expectedBehaviorVersion: expectedJudgeProbeBehaviorVersion,
configured: null,
model: null,
hits: null,
total: null,
hitRate: null,
serviceProxyPath: null,
error: "D601 k3s guard did not pass; runtime probe skipped",
raw: null,
},
commandDiagnostics: { guard: diagnostics },
};
}
return {
checkedAt: nowIso(),
namespace: options.namespace,
kubeconfig: options.kubeconfig,
worktreePath: options.worktreePath,
guard,
deployments: collectDeployments(options),
pods: collectPods(options),
services: collectServices(options),
worktree,
residual,
judgeProbe: collectJudgeProbe(options),
commandDiagnostics: { guard: diagnostics },
};
}
}
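// Normalizes a commit marker to lowercase hex; anything other than 7-40 hex characters is treated as missing.
function safeCommit(value: string | null): string | null {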
if (value === null) return null;
const trimmed = value.trim().toLowerCase();
if (!/^[0-9a-f]{7,40}$/u.test(trimmed)) return null;
return trimmed;
}
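// Two commits match when they are equal or when one (at least 7 characters long) is a prefix of the other,
// so abbreviated and full SHAs of the same commit compare as equal.
function commitMatches(left: string, right: string): boolean {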
const a = left.toLowerCase();
const b = right.toLowerCase();
return a === b || (a.length >= 7 && b.startsWith(a)) || (b.length >= 7 && a.startsWith(b));
}
function uniqueStrings(values: Array<string | null>): string[] {
return [...new Set(values.filter((value): value is string => value !== null && value.length > 0))];
}
function deploymentCommitMarkers(deployment: DeploymentObservation): Record<string, string | null> {
return {
annotationDeployCommit: safeCommit(deployment.annotations.deployCommit),
annotationDeployRequestedCommit: safeCommit(deployment.annotations.deployRequestedCommit),
unideskDeployCommit: safeCommit(deployment.env.unideskDeployCommit),
unideskDeployRequestedCommit: safeCommit(deployment.env.unideskDeployRequestedCommit),
codeQueueDeployCommit: safeCommit(deployment.env.codeQueueDeployCommit),
codeQueueDeployRequestedCommit: safeCommit(deployment.env.codeQueueDeployRequestedCommit),
};
}
function deploymentFormalSignals(deployment: DeploymentObservation): DriftSignal[] {
const signals: DriftSignal[] = [];
if (!deployment.observed) {
signals.push({ code: "execution-plane-deployment-missing", severity: "blocker", message: `${deployment.name} deployment is missing`, expected: deployment.name, observed: deployment.error });
return signals;
}
if (deployment.namespace !== expectedNamespace) {
signals.push({ code: "execution-plane-namespace-mismatch", severity: "blocker", message: `${deployment.name} is not in ${expectedNamespace}`, expected: expectedNamespace, observed: deployment.namespace });
}
if (deployment.labels.app !== "code-queue") {
signals.push({ code: "execution-plane-app-label-mismatch", severity: "blocker", message: `${deployment.name} app label is not code-queue`, expected: "code-queue", observed: deployment.labels.app });
}
if (deployment.labels.deploymentMode !== "k3sctl-managed") {
signals.push({ code: "execution-plane-mode-mismatch", severity: "blocker", message: `${deployment.name} is not k3sctl-managed`, expected: "k3sctl-managed", observed: deployment.labels.deploymentMode });
}
if (deployment.expectedRole !== "scheduler" && deployment.labels.component !== deployment.expectedRole) {
signals.push({ code: "execution-plane-component-mismatch", severity: "blocker", message: `${deployment.name} component label does not match role`, expected: deployment.expectedRole, observed: deployment.labels.component });
}
if (deployment.nodeSelector !== "D601" && deployment.nodeSelector !== "d601") {
signals.push({ code: "execution-plane-node-selector-mismatch", severity: "blocker", message: `${deployment.name} is not pinned to D601`, expected: "D601", observed: deployment.nodeSelector });
}
if (deployment.env.serviceRole !== deployment.expectedRole) {
signals.push({ code: "execution-plane-role-mismatch", severity: "blocker", message: `${deployment.name} CODE_QUEUE_SERVICE_ROLE mismatch`, expected: deployment.expectedRole, observed: deployment.env.serviceRole });
}
if (deployment.env.schedulerEnabled !== deployment.expectedSchedulerEnabled) {
signals.push({ code: "execution-plane-scheduler-flag-mismatch", severity: "blocker", message: `${deployment.name} CODE_QUEUE_SCHEDULER_ENABLED mismatch`, expected: deployment.expectedSchedulerEnabled, observed: deployment.env.schedulerEnabled });
}
if (deployment.image === null || !deployment.image.includes("unidesk-code-queue")) {
signals.push({ code: "execution-plane-image-mismatch", severity: "blocker", message: `${deployment.name} image is not a Code Queue image`, expected: "unidesk-code-queue", observed: deployment.image });
}
if (deployment.replicas.ready <= 0) {
signals.push({ code: "execution-plane-no-ready-replica", severity: "blocker", message: `${deployment.name} has no ready replicas`, expected: "readyReplicas > 0", observed: deployment.replicas.ready });
}
return signals;
}
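// Compares deploy commit markers, the mounted worktree HEAD, repo hostPath mounts, pod image digests,
// and the judge probe behavior version; every mismatch is reported under the single "deployment-drift" code.
function driftSignals(observation: CodeQueueExecutionPlaneObservation): DriftSignal[] {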
const signals: DriftSignal[] = [];
const commitValues = observation.deployments.flatMap((deployment) => Object.values(deploymentCommitMarkers(deployment)));
const uniqueCommits = uniqueStrings(commitValues);
const worktreeHead = safeCommit(observation.worktree.head);
if (uniqueCommits.length === 0) {
signals.push({
code: "deployment-drift",
severity: "blocker",
field: "deployment.commitMarkers",
message: "Code Queue deployments do not expose comparable deploy commit markers",
expected: "UNIDESK_DEPLOY_COMMIT or CODE_QUEUE_DEPLOY_COMMIT",
observed: "missing",
});
}
if (uniqueCommits.length > 1) {
signals.push({
code: "deployment-drift",
severity: "blocker",
field: "deployment.commitMarkers",
message: "Code Queue deployment commit markers disagree",
observed: uniqueCommits,
});
}
for (const deployment of observation.deployments) {
if (uniqueStrings(Object.values(deploymentCommitMarkers(deployment))).length === 0) {
signals.push({
code: "deployment-drift",
severity: "blocker",
field: `deployment.${deployment.name}.commitMarkers`,
message: `${deployment.name} does not expose comparable deploy commit markers`,
expected: "UNIDESK_DEPLOY_COMMIT or CODE_QUEUE_DEPLOY_COMMIT",
observed: "missing",
});
}
}
if (worktreeHead === null) {
signals.push({
code: "deployment-drift",
severity: "blocker",
field: "mountedWorktree.head",
message: "Mounted Code Queue worktree HEAD could not be observed",
expected: observation.worktreePath,
observed: observation.worktree.error,
});
} else if (uniqueCommits.length > 0 && !uniqueCommits.some((commit) => commitMatches(commit, worktreeHead))) {
signals.push({
code: "deployment-drift",
severity: "blocker",
field: "mountedWorktree.head",
message: "Mounted worktree HEAD does not match deployment commit markers",
expected: uniqueCommits,
observed: worktreeHead,
});
}
const repoHostPaths = uniqueStrings(observation.deployments.map((deployment) => deployment.repoHostPath));
if (repoHostPaths.length !== 1 || repoHostPaths[0] !== observation.worktreePath) {
signals.push({
code: "deployment-drift",
severity: "blocker",
field: "deployment.repoHostPath",
message: "Code Queue deployments do not all mount only the expected worktree path",
expected: observation.worktreePath,
observed: repoHostPaths,
});
}
const podDigests = uniqueStrings(observation.pods.map((pod) => pod.digest));
if (podDigests.length === 0) {
signals.push({
code: "deployment-drift",
severity: "blocker",
field: "artifact.digest",
message: "No running Code Queue pod image digest was observed",
expected: "sha256 digest from pod status imageID",
observed: "missing",
});
} else if (podDigests.length > 1) {
signals.push({
code: "deployment-drift",
severity: "blocker",
field: "artifact.digest",
message: "Code Queue pod artifact digests disagree",
observed: podDigests,
});
}
if (!observation.judgeProbe.attempted) {
signals.push({
code: "deployment-drift",
severity: "warning",
field: "judgeProbe.behaviorVersion",
message: "/api/judge/probe behavior version was not checked",
expected: expectedJudgeProbeBehaviorVersion,
observed: observation.judgeProbe.error,
});
} else if (observation.judgeProbe.behaviorVersion !== expectedJudgeProbeBehaviorVersion) {
signals.push({
code: "deployment-drift",
severity: "blocker",
field: "judgeProbe.behaviorVersion",
message: "/api/judge/probe behavior version does not match the repo contract",
expected: expectedJudgeProbeBehaviorVersion,
observed: observation.judgeProbe.behaviorVersion,
});
} else if (!observation.judgeProbe.ok) {
signals.push({
code: "deployment-drift",
severity: "blocker",
field: "judgeProbe.ok",
message: "/api/judge/probe returned a non-ready result",
expected: true,
observed: observation.judgeProbe.error,
});
}
return signals;
}
function residualSignals(observation: CodeQueueExecutionPlaneObservation): DriftSignal[] {
const signals: DriftSignal[] = [];
if (observation.residual.composeBackend.present || observation.residual.loopbackPort4222.present) {
signals.push({
code: "deprecated-compose-residual",
severity: "blocker",
message: "Deprecated Docker Compose code-queue-backend or old 127.0.0.1:4222 listener is still present",
observed: {
composeBackendPresent: observation.residual.composeBackend.present,
loopbackPort4222Present: observation.residual.loopbackPort4222.present,
},
});
}
if (!observation.residual.composeBackend.ok) {
signals.push({
code: "deprecated-compose-residual-observation-gap",
severity: "warning",
message: "Could not observe Docker Compose residual state",
observed: observation.residual.composeBackend.error,
});
}
if (!observation.residual.loopbackPort4222.ok) {
signals.push({
code: "deprecated-port-observation-gap",
severity: "warning",
message: "Could not observe local listener state for 127.0.0.1:4222",
observed: observation.residual.loopbackPort4222.error,
});
}
return signals;
}
function compactDeployment(deployment: DeploymentObservation): Record<string, unknown> {
return {
name: deployment.name,
observed: deployment.observed,
role: deployment.env.serviceRole,
expectedRole: deployment.expectedRole,
schedulerEnabled: deployment.env.schedulerEnabled,
expectedSchedulerEnabled: deployment.expectedSchedulerEnabled,
readyReplicas: deployment.replicas.ready,
image: deployment.image,
nodeSelector: deployment.nodeSelector,
deployCommit: deployment.env.codeQueueDeployCommit ?? deployment.env.unideskDeployCommit ?? deployment.annotations.deployCommit,
requestedCommit: deployment.env.codeQueueDeployRequestedCommit ?? deployment.env.unideskDeployRequestedCommit ?? deployment.annotations.deployRequestedCommit,
repoHostPath: deployment.repoHostPath,
error: deployment.error,
};
}
function compactJudgeProbe(probe: JudgeProbeObservation, full: boolean): Record<string, unknown> {
return {
ok: probe.ok,
attempted: probe.attempted,
behaviorVersion: probe.behaviorVersion,
expectedBehaviorVersion: probe.expectedBehaviorVersion,
configured: probe.configured,
model: probe.model,
hits: probe.hits,
total: probe.total,
hitRate: probe.hitRate,
serviceProxyPath: probe.serviceProxyPath,
error: probe.error,
...(full ? { raw: probe.raw } : {}),
};
}
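// Turns an observation into the CLI result: any blocker yields "blocked", warnings alone yield "degraded",
// otherwise "ready". Details are attached only with --full and the raw observation only with --raw.
function evaluateObservation(observation: CodeQueueExecutionPlaneObservation, options: ExecutionPlaneOptions): Record<string, unknown> {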
const formalSignals = observation.deployments.flatMap(deploymentFormalSignals);
const deploymentDriftSignals = driftSignals(observation);
const deprecatedResidualSignals = residualSignals(observation);
const guardSignals: DriftSignal[] = observation.guard.status === "pass"
? []
: [{
code: "d601-k3s-guard-blocked",
severity: "blocker",
message: observation.guard.summary,
expected: { kubeconfig: d601NativeKubeconfig, node: d601RequiredNodeName },
observed: { status: observation.guard.status, nodes: observation.guard.nodeNames },
}];
const signals = [...guardSignals, ...formalSignals, ...deploymentDriftSignals, ...deprecatedResidualSignals];
const blockers = signals.filter((signal) => signal.severity === "blocker");
const warnings = signals.filter((signal) => signal.severity === "warning");
const status: PlaneStatus = blockers.length > 0 ? "blocked" : warnings.length > 0 ? "degraded" : "ready";
const formal = formalSignals.length === 0 && observation.deployments.length === expectedDeployments.length;
const uniqueDigests = uniqueStrings(observation.pods.map((pod) => pod.digest));
const uniqueCommits = uniqueStrings(observation.deployments.flatMap((deployment) => Object.values(deploymentCommitMarkers(deployment))));
const result: Record<string, unknown> = {
ok: status === "ready",
surface: "code-queue-execution-plane",
checkedAt: observation.checkedAt,
devTestClass: "live-read",
mutation: false,
namespace: observation.namespace,
kubeconfig: observation.kubeconfig,
status,
runnerDisposition: status === "ready" ? "ready" : "infra-blocked",
summary: {
formalExecutionPlane: formal,
deploymentDrift: deploymentDriftSignals.some((signal) => signal.severity === "blocker"),
deprecatedComposeResidual: deprecatedResidualSignals.some((signal) => signal.code === "deprecated-compose-residual"),
blockerCount: blockers.length,
warningCount: warnings.length,
deploymentsObserved: observation.deployments.filter((deployment) => deployment.observed).length,
readyPodCount: observation.pods.filter((pod) => pod.ready).length,
artifactDigest: uniqueDigests.length === 1 ? uniqueDigests[0] : null,
deploymentCommit: uniqueCommits.length === 1 ? uniqueCommits[0] : null,
mountedWorktreeHead: observation.worktree.head,
judgeProbeBehaviorVersion: observation.judgeProbe.behaviorVersion,
},
guard: {
status: observation.guard.status,
kubeconfig: observation.guard.kubeconfig,
nodeNames: observation.guard.nodeNames,
requiredNodeName: observation.guard.requiredNodeName,
requiredNodePresent: observation.guard.requiredNodePresent,
summary: observation.guard.summary,
},
executionPlane: {
formal,
expectedDeployments: expectedDeployments.map((deployment) => deployment.name),
deployments: observation.deployments.map(compactDeployment),
},
drift: {
status: deploymentDriftSignals.some((signal) => signal.severity === "blocker") ? "deployment-drift" : "none",
signalCount: deploymentDriftSignals.length,
signals: deploymentDriftSignals.slice(0, options.full ? undefined : 5),
omittedSignals: options.full ? 0 : Math.max(0, deploymentDriftSignals.length - 5),
comparedFields: ["deployment env/annotation commit", "artifact digest", "mounted worktree HEAD", "/api/judge/probe behaviorVersion"],
},
residual: {
status: deprecatedResidualSignals.some((signal) => signal.code === "deprecated-compose-residual") ? "deprecated-compose-residual" : "none",
composeBackendPresent: observation.residual.composeBackend.present,
loopbackPort4222Present: observation.residual.loopbackPort4222.present,
containers: observation.residual.composeBackend.containers,
listeners: observation.residual.loopbackPort4222.listeners,
},
judgeProbe: compactJudgeProbe(observation.judgeProbe, options.full),
blockers,
warnings,
commands: {
refresh: "bun scripts/cli.ts codex execution-plane",
full: "bun scripts/cli.ts codex execution-plane --full",
raw: "bun scripts/cli.ts codex execution-plane --raw",
},
disclosure: {
defaultView: "compact-low-noise",
fullDetailOmitted: !options.full,
rawOmitted: !options.raw,
expandWith: "bun scripts/cli.ts codex execution-plane --full",
rawWith: "bun scripts/cli.ts codex execution-plane --raw",
secretValuesPrinted: false,
rawKubernetesDeploymentJsonPrinted: false,
},
};
if (options.full) {
result.details = {
deployments: observation.deployments,
pods: observation.pods,
services: observation.services,
worktree: observation.worktree,
commandDiagnostics: observation.commandDiagnostics,
};
}
if (options.raw) result.rawObservation = observation;
return result;
}
export async function runCodeQueueExecutionPlane(args: string[], collector: ExecutionPlaneCollector = new LiveExecutionPlaneCollector()): Promise<Record<string, unknown>> {
const options = parseExecutionPlaneOptions(args);
const observation = await collector.collect(options);
return evaluateObservation(observation, options);
}
export async function runCodeQueueExecutionPlaneForTest(args: string[], observation: CodeQueueExecutionPlaneObservation): Promise<Record<string, unknown>> {
return runCodeQueueExecutionPlane(args, { collect: () => observation });
}
+3 -1
@@ -1,6 +1,7 @@
import { mkdirSync, readFileSync, rmSync, statSync, writeFileSync } from "node:fs";
import { runCommand } from "./command";
import { type UniDeskConfig, repoRoot, rootPath } from "./config";
import { runCodeQueueExecutionPlane } from "./code-queue-execution-plane";
import { coreInternalFetch } from "./microservices";
import { previewJson } from "./preview";
import { createResumeId, type ResumeDeliveryState } from "../../src/components/microservices/code-queue/src/resume-confirmation";
@@ -8373,6 +8374,7 @@ export async function runCodeQueueCommand(config: UniDeskConfig, args: string[])
return codeQueueDevReady();
}
if (action === "skills-sync") return codeQueueSkillsSync(args.slice(1));
if (action === "execution-plane" || action === "exec-plane" || action === "runtime-plane") return await runCodeQueueExecutionPlane(args.slice(1));
if (action === "pr-preflight" || action === "runtime-preflight") return codeQueuePrPreflight(args.slice(1), { config });
if (action === "output") {
const taskId = requireTaskId(taskIdArg, "codex output");
@@ -8416,5 +8418,5 @@ export async function runCodeQueueCommand(config: UniDeskConfig, args: string[])
const taskId = requireTaskId(taskIdArg, `codex ${action}`);
return codexSteerTraceConfirm(taskId, args.slice(2));
}
-throw new Error("codex command must be one of: prompt-lint, submit, enqueue, task, summary, show, tasks, overview, unread, terminal-unread, output, judge, read, mark-read, dev-ready, health, skills-sync, pr-preflight, runtime-preflight, queues, queue list, queue create, queue merge, move, steer, resume, steer-confirm, interrupt, cancel");
+throw new Error("codex command must be one of: prompt-lint, submit, enqueue, task, summary, show, tasks, overview, unread, terminal-unread, output, judge, read, mark-read, dev-ready, health, skills-sync, execution-plane, exec-plane, runtime-plane, pr-preflight, runtime-preflight, queues, queue list, queue create, queue merge, move, steer, resume, steer-confirm, interrupt, cancel");
}
+12 -2
@@ -57,6 +57,7 @@ export function rootHelp(): unknown {
{ command: "codex prompt-lint [prompt|--prompt-file path|--prompt-stdin]", description: "Dry-run lint a runner prompt for DEV test class read-only/live-read/live-mutating authorization without echoing prompt text or touching live services." },
{ command: "codex submit [prompt] [--prompt-file path|--prompt-stdin] [--queue queueId] [--provider-id id] [--cwd path] [--model model] [--execution-mode mode] [--max-attempts N] [--reference-task-id id] [--dry-run]", description: "Submit a Code Queue task through backend-core -> code-queue proxy; --dry-run shows the structured request, routing recommendation, and prompt live-authorization lint while real success only confirms the write and task id." },
{ command: "codex skills-sync --dry-run [--full]", description: "Inspect the controlled runner skills hostPath lifecycle contract without copying files, restarting services, reading secrets, or mutating live runner paths." },
{ command: "codex execution-plane [--full|--raw]", description: "Read-only D601 native k3s Code Queue execution-plane drift inspection; compares formal deployments, deprecated Compose residuals, commit markers, pod digest, mounted worktree HEAD, and judge probe behavior version." },
{ command: "codex pr-preflight [--remote] [--push-dry-run --push-dry-run-ref refs/heads/probe/<name>] [--pr-create-dry-run --pr-create-dry-run-head <head>] [--issue N] [--full|--raw]", description: "Read-only PR admission check with compact commander output by default; use --full or --raw to expand the full runtime preflight, tool, and observation payload." },
{ command: "codex task <taskId> [--detail] [--trace --tail|--from-start|--after-seq N|--before-seq N --limit N] [--full]", description: "Fetch the bounded review view by default; --detail is still capped, while --full/trace/output explicitly expand evidence." },
{ command: "codex tasks [--view commander|supervisor|full] [--queue id] [--status status[,status]] [--unread|--unread-only] [--limit N] [--before-id id]", description: "Show Code Queue task state with progressive disclosure; --view commander is the recommended bounded host-commander loop, supervisor keeps compact sections, and full returns detailed rows." },
@@ -264,7 +265,7 @@ function scheduleHelp(): unknown {
function codexHelp(): unknown {
return {
-command: "codex deploy|prompt-lint|submit|task|tasks|unread|output|read|dev-ready|skills-sync|pr-preflight|judge|steer|resume|interrupt|cancel|queues|queue|move",
+command: "codex deploy|prompt-lint|submit|task|tasks|unread|output|read|dev-ready|skills-sync|execution-plane|pr-preflight|judge|steer|resume|interrupt|cancel|queues|queue|move",
output: "json",
usage: [
"bun scripts/cli.ts codex deploy <commitId> # disabled legacy deployment entry",
@@ -280,6 +281,7 @@ function codexHelp(): unknown {
"bun scripts/cli.ts codex read <taskId>",
"bun scripts/cli.ts codex dev-ready",
"bun scripts/cli.ts codex skills-sync --dry-run [--full]",
"bun scripts/cli.ts codex execution-plane [--full|--raw]",
"bun scripts/cli.ts codex pr-preflight [--remote] [--push-dry-run --push-dry-run-ref refs/heads/probe/<name>] [--pr-create-dry-run --pr-create-dry-run-head <head>] [--issue N] [--full|--raw]",
"bun scripts/cli.ts codex judge <taskId> --attempt N [--dry-run] [--include-prompt]",
"bun scripts/cli.ts codex steer <taskId> [prompt|--prompt-file path|--prompt-stdin] [--steer-id id] [--dry-run] [--no-retry|--retry-attempts N] [--full|--raw]",
@@ -322,6 +324,14 @@ function codexHelp(): unknown {
fields: ["activeRunnerCount", "source", "target=15", "slotDeficit", "runningTasks", "heartbeat.fresh", "heartbeat.risk", "heartbeat.staleRecoveryCandidates", "queuedCount"],
rule: "Use data.queues.commander.activeRunnerCount and slotDeficit for quick capacity decisions; activeQueueIds are scheduler-local and can be empty during split-brain live.",
},
executionPlane: {
command: "bun scripts/cli.ts codex execution-plane",
mutation: false,
kubeconfig: "/etc/rancher/k3s/k3s.yaml",
namespace: "unidesk",
defaultPolicy: "compact drift/residual summary; deployments, pods, services and probe detail require --full, raw sanitized observations require --raw",
blockers: ["deployment-drift", "deprecated-compose-residual", "d601-k3s-guard-blocked"],
},
examples: {
promptLint: "bun scripts/cli.ts codex prompt-lint --prompt-file /tmp/code-queue-prompt.md",
stdin: [
@@ -334,7 +344,7 @@ function codexHelp(): unknown {
},
disclosure: {
defaultPolicy: "low-noise JSON by default; write commands confirm persistence, list/detail/output commands return bounded summaries with drill-down commands",
-expand: ["codex task <taskId> --full", "codex task <taskId> --trace --limit N", "codex output <taskId> --after-seq N --limit N --full-text", "codex tasks --view full --limit N", "codex skills-sync --dry-run --full"],
+expand: ["codex task <taskId> --full", "codex task <taskId> --trace --limit N", "codex output <taskId> --after-seq N --limit N --full-text", "codex tasks --view full --limit N", "codex skills-sync --dry-run --full", "codex execution-plane --full"],
},
activityFields: {
path: "data.queues.activity and data.supervisor.activity",
@@ -177,6 +177,7 @@ const firstPaintOverviewWarmUrl = "http://code-queue.local/api/tasks/overview?li
let firstPaintOverviewWarmInFlight: Promise<void> | null = null;
const judgeFailRetryLimit = 3;
const fallbackJudgeRetryLimit = 3;
const judgeProbeBehaviorVersion = "code-queue-judge-probe:v1";
const maxTaskAttempts = 99;
const referenceInjectionMaxRounds: number | null = null;
const retryBackoffBaseMs = 1000;
@@ -3286,6 +3287,11 @@ async function runJudgeProbe(): Promise<Response> {
logger("info", "judge_probe_completed", { configured: config.minimaxApiKey.length > 0, model: config.minimaxModel, hits, total: results.length, hitRate });
return jsonResponse({
ok: true,
behaviorVersion: judgeProbeBehaviorVersion,
behavior: {
version: judgeProbeBehaviorVersion,
contract: "defaultJudgeProbeCases decision/hit contract",
},
configured: config.minimaxApiKey.length > 0,
model: config.minimaxModel,
hits,