Files
pikasTech-unidesk/scripts/src/code-queue-liveness-fixtures.ts
T

260 lines
11 KiB
TypeScript

import { buildExecutionDiagnostics, buildSchedulerHeartbeat, staleRecoveryCandidate, taskHasTraceGapButFreshHeartbeat } from "../../src/components/microservices/code-queue/src/execution-diagnostics";
import type { ActiveRun } from "../../src/components/microservices/code-queue/src/code-agent/common";
import type { CodeQueueExecutionDiagnostics, QueueTask, SchedulerActiveRunHeartbeat, TaskStatus } from "../../src/components/microservices/code-queue/src/types";
/**
 * Identifiers of the Code Queue liveness fixture checks.
 *
 * Declared `as const` so the element type stays a union of these exact
 * string literals rather than widening to `string`.
 */
export const CODE_QUEUE_LIVENESS_CHECK_NAMES = [
  "code-queue:active-run-heartbeat-visible",
  "code-queue:trace-gap-not-stale",
  "code-queue:stale-active-owner-expired",
  "code-queue:control-plane-split-brain-diagnostics",
  "code-queue:oa-publisher-degraded-visible",
] as const;

/** Union of the literal names listed in `CODE_QUEUE_LIVENESS_CHECK_NAMES`. */
type CodeQueueLivenessCheckName = (typeof CODE_QUEUE_LIVENESS_CHECK_NAMES)[number];
/** Result record produced for a single liveness fixture check. */
interface FixtureCheck {
  /** Which of the known liveness checks this result belongs to. */
  name: CodeQueueLivenessCheckName;
  /** `true` when the check ran to completion without a failed assertion. */
  ok: boolean;
  /** Free-form supporting data for the result (diagnostic fields, or an error message). */
  detail: Record<string, unknown>;
}
// Fixed timestamps shared by every fixture so that results are deterministic.

/** Reference "current time" handed to the diagnostics helpers. */
const now = "2026-05-19T00:10:00.000Z";

/** Ten seconds before `now`; used as a recent heartbeat timestamp. */
const freshAt = "2026-05-19T00:09:50.000Z";

/** Thirty minutes before `now`; used as an old persisted-trace timestamp. */
const oldTraceAt = "2026-05-18T23:40:00.000Z";

/** Twenty minutes before `now`; used as an expired owner-heartbeat timestamp. */
const expiredAt = "2026-05-18T23:50:00.000Z";
/**
 * Throws when `condition` is falsy, attaching a JSON rendering of `detail`
 * to the error message.
 *
 * @param condition - Value that must be truthy for the assertion to pass.
 * @param message - Human-readable description of the violated expectation.
 * @param detail - Extra context appended to the message as JSON.
 * @throws Error with the text `"<message>: <detail>"` when `condition` is falsy.
 */
function assertCondition(condition: unknown, message: string, detail: Record<string, unknown> = {}): void {
  if (condition) return;
  let renderedDetail: string;
  try {
    renderedDetail = JSON.stringify(detail);
  } catch (error) {
    // JSON.stringify throws on circular references and BigInt values; without
    // this guard that TypeError would replace the assertion message entirely.
    const reason = error instanceof Error ? error.message : String(error);
    renderedDetail = `[unserializable detail: ${reason}]`;
  }
  throw new Error(`${message}: ${renderedDetail}`);
}
/**
 * Builds a fully populated queue task for use in fixture checks.
 *
 * @param id - Task identifier; also used to derive the prompt text.
 * @param status - Task status; drives `startedAt`, `currentAttempt` and `currentMode`.
 * @param heartbeat - Optional scheduler heartbeat whose thread id, turn id and
 *   output sequence are mirrored onto the task.
 * @returns A task whose remaining fields hold fixed fixture values.
 */
function fixtureTask(id: string, status: TaskStatus, heartbeat: SchedulerActiveRunHeartbeat | null = null): QueueTask {
  const fixtureEpoch = "2026-05-19T00:00:00.000Z";
  const promptText = `${id} prompt`;
  const isQueued = status === "queued";
  // Only tasks that are currently executing carry a start timestamp.
  const hasStarted = status === "running" || status === "judging";

  const task: QueueTask = {
    id,
    queueId: "default",
    queueEnteredAt: fixtureEpoch,
    prompt: promptText,
    basePrompt: promptText,
    referenceTaskIds: [],
    referenceInjection: null,
    providerId: "D601",
    cwd: "/workspace",
    model: "gpt-5.5",
    reasoningEffort: null,
    executionMode: "default",
    maxAttempts: 99,
    status,
    createdAt: fixtureEpoch,
    updatedAt: fixtureEpoch,
    startedAt: hasStarted ? fixtureEpoch : null,
    finishedAt: null,
    readAt: null,
    // A queued task has not attempted anything yet; every other status is on attempt 1.
    currentAttempt: isQueued ? 0 : 1,
    currentMode: isQueued ? null : "initial",
    codexThreadId: heartbeat?.codexThreadId ?? null,
    activeTurnId: heartbeat?.activeTurnId ?? null,
    schedulerHeartbeat: heartbeat,
    finalResponse: "",
    outputMaxSeq: heartbeat?.outputMaxSeq ?? 0,
    lastError: null,
    lastJudge: null,
    judgeFailCount: 0,
    promptHistory: [],
    output: [],
    events: [],
    attempts: [],
    cancelRequested: false,
    nextPrompt: null,
    nextMode: null,
  };
  return task;
}
/**
 * Builds a scheduler heartbeat fixture for a running task.
 *
 * @param taskId - Task the heartbeat belongs to.
 * @param at - Timestamp applied to all three "last ..." fields by default.
 * @param overrides - Fields that replace the defaults; applied last.
 * @returns The default heartbeat merged with `overrides`.
 */
function heartbeat(taskId: string, at: string, overrides: Partial<SchedulerActiveRunHeartbeat> = {}): SchedulerActiveRunHeartbeat {
  const defaults: SchedulerActiveRunHeartbeat = {
    taskId,
    queueId: "default",
    attempt: 1,
    activeTurnId: "turn_fixture",
    codexThreadId: "thread_fixture",
    owner: "D601",
    schedulerInstance: "code-queue-scheduler-fixture",
    executionPlane: "scheduler-execution-plane",
    agentPort: "codex",
    status: "running",
    lastLocalHeartbeatAt: at,
    lastObservedAgentEventAt: at,
    lastPersistedTraceAt: at,
    outputMaxSeq: 10,
    source: "scheduler",
  };
  // Caller-supplied fields win over the defaults.
  return { ...defaults, ...overrides };
}
/**
 * Builds an in-memory active-run fixture for the given task.
 *
 * @param taskId - Task the run is executing.
 * @param queueId - Queue the run belongs to; defaults to `"default"`.
 * @returns A run whose three "last ..." timestamps are all `freshAt` and whose
 *   `app.stop` is a no-op.
 */
function activeRun(taskId: string, queueId = "default"): ActiveRun {
  // The fixture never stops anything, so `stop` just returns undefined.
  const noopStop = () => undefined;
  const run: ActiveRun = {
    taskId,
    queueId,
    app: { stop: noopStop },
    port: "codex",
    threadId: "thread_fixture",
    turnId: "turn_fixture",
    startedAt: "2026-05-19T00:00:00.000Z",
    lastLocalHeartbeatAt: freshAt,
    lastObservedAgentEventAt: freshAt,
    lastPersistedTraceAt: freshAt,
  };
  return run;
}
/**
 * Builds execution diagnostics as seen from the scheduler's control plane.
 *
 * @param tasks - Tasks to report on.
 * @param activeRuns - Runs the scheduler holds locally; defaults to none.
 * @param oaPublisher - Publisher state passed through to the diagnostics
 *   builder unchanged; defaults to `null`.
 * @returns The diagnostics computed by `buildExecutionDiagnostics` at the fixed `now`.
 */
function schedulerDiagnostics(tasks: QueueTask[], activeRuns: ActiveRun[] = [], oaPublisher: unknown = null): CodeQueueExecutionDiagnostics {
  const locallyActiveTaskIds = new Set(activeRuns.map((run) => run.taskId));
  // Orphaned: the task says running/judging but no local run backs it.
  const orphanedTasks = tasks.filter((task) => {
    const markedActive = task.status === "running" || task.status === "judging";
    return markedActive && !locallyActiveTaskIds.has(task.id);
  });
  const orphanedActiveTaskIds = orphanedTasks.map((task) => task.id);
  const activeQueueIds = activeRuns.map((run) => run.queueId);

  return buildExecutionDiagnostics({
    now,
    controlPlane: "D601-code-queue-scheduler",
    executionStateSource: "scheduler-execution-plane",
    tasks,
    activeRuns,
    activeRunSlotCount: activeRuns.length,
    activeQueueIds,
    processingQueueIds: [],
    orphanedActiveTaskIds,
    // The fixture passes loosely typed publisher state; the cast bypasses the builder's parameter type.
    oaPublisher: oaPublisher as never,
  });
}
/**
 * Check: a running task with a local active run and a freshly built scheduler
 * heartbeat must show up in the diagnostics' active-heartbeat and
 * scheduler-active task id lists, and the last scheduler heartbeat timestamp
 * must equal `freshAt`.
 *
 * @returns A passing result carrying the relevant diagnostics fields.
 * @throws Error when any of the expectations is not met.
 */
function checkActiveRunHeartbeatVisible(): FixtureCheck {
  const task = fixtureTask("codex_fixture_active_1", "running");
  const run = activeRun(task.id);
  // The task starts without a heartbeat; attach one built from the task and its run.
  const builtHeartbeat = buildSchedulerHeartbeat(task, run, {
    now: freshAt,
    owner: "D601",
    schedulerInstance: "code-queue-scheduler-fixture",
    agentPort: "codex",
    lastObservedAgentEventAt: freshAt,
    lastPersistedTraceAt: freshAt,
  });
  task.schedulerHeartbeat = builtHeartbeat;

  const diagnostics = schedulerDiagnostics([task], [run]);
  const failureDetail = diagnostics as unknown as Record<string, unknown>;

  const heartbeatListed = diagnostics.activeHeartbeatTaskIds.includes(task.id);
  assertCondition(heartbeatListed, "active heartbeat task id must be visible", failureDetail);

  const schedulerListed = diagnostics.schedulerActiveTaskIds.includes(task.id);
  assertCondition(schedulerListed, "scheduler active task id must be visible", failureDetail);

  const heartbeatSurfaced = diagnostics.lastSchedulerHeartbeatAt === freshAt;
  assertCondition(heartbeatSurfaced, "last scheduler heartbeat must be surfaced", failureDetail);

  const result: FixtureCheck = {
    name: "code-queue:active-run-heartbeat-visible",
    ok: true,
    detail: {
      state: diagnostics.state,
      schedulerActiveTaskIds: diagnostics.schedulerActiveTaskIds,
      activeHeartbeatTaskIds: diagnostics.activeHeartbeatTaskIds,
      lastSchedulerHeartbeatAt: diagnostics.lastSchedulerHeartbeatAt,
      heartbeat: task.schedulerHeartbeat,
    },
  };
  return result;
}
/**
 * Check: a running task whose persisted trace is old (`oldTraceAt`) but whose
 * heartbeat is recent (`freshAt`) must be classified as a trace gap, must not
 * be allowed into stale recovery, and must be listed under
 * `traceGapNotStaleTaskIds` rather than `staleRecoveryCandidateTaskIds`.
 *
 * @returns A passing result carrying the recovery decision and both id lists.
 * @throws Error when any of the expectations is not met.
 */
function checkTraceGapNotStale(): FixtureCheck {
  const taskId = "codex_fixture_trace_gap_1";
  const task = fixtureTask(taskId, "running", heartbeat(taskId, freshAt, {
    lastPersistedTraceAt: oldTraceAt,
    outputMaxSeq: 89112,
  }));

  // No local active run is supplied: the task is known only through its heartbeat.
  const decision = staleRecoveryCandidate({ task, localActive: false, now });
  const hasTraceGap = taskHasTraceGapButFreshHeartbeat(task, now);
  const diagnostics = schedulerDiagnostics([task], []);
  const failureDetail = diagnostics as unknown as Record<string, unknown>;

  assertCondition(hasTraceGap, "trace gap with fresh owner heartbeat should be classified", { decision, diagnostics });

  const retryBlocked = decision.allowed === false && decision.reason === "owner-heartbeat-fresh";
  assertCondition(retryBlocked, "fresh heartbeat must block stale retry", { decision });

  const listedAsTraceGap = diagnostics.traceGapNotStaleTaskIds.includes(task.id);
  assertCondition(listedAsTraceGap, "diagnostics must expose trace gap as not stale", failureDetail);

  const listedAsStale = diagnostics.staleRecoveryCandidateTaskIds.includes(task.id);
  assertCondition(!listedAsStale, "trace gap must not enter stale recovery candidates", failureDetail);

  const result: FixtureCheck = {
    name: "code-queue:trace-gap-not-stale",
    ok: true,
    detail: {
      decision,
      traceGapNotStaleTaskIds: diagnostics.traceGapNotStaleTaskIds,
      staleRecoveryCandidateTaskIds: diagnostics.staleRecoveryCandidateTaskIds,
    },
  };
  return result;
}
/**
 * Check: a running task whose heartbeat timestamps are all `expiredAt` and
 * that has no local active run must be allowed into stale recovery with
 * reason `"owner-heartbeat-expired"`, and the diagnostics must report state
 * `"stale-active"` with the task listed as a stale recovery candidate.
 *
 * @returns A passing result carrying the decision, state and candidate list.
 * @throws Error when any of the expectations is not met.
 */
function checkStaleActiveOwnerExpired(): FixtureCheck {
  const taskId = "codex_fixture_stale_1";
  const task = fixtureTask(taskId, "running", heartbeat(taskId, expiredAt, {
    lastObservedAgentEventAt: expiredAt,
    lastPersistedTraceAt: expiredAt,
  }));

  const decision = staleRecoveryCandidate({ task, localActive: false, now });
  const diagnostics = schedulerDiagnostics([task], []);
  const failureDetail = diagnostics as unknown as Record<string, unknown>;

  const recoveryAllowed = decision.allowed === true && decision.reason === "owner-heartbeat-expired";
  assertCondition(recoveryAllowed, "expired owner heartbeat should be the stale recovery gate", { decision });

  const markedStale = diagnostics.state === "stale-active";
  assertCondition(markedStale, "diagnostics must mark stale active only after owner heartbeat expiry", failureDetail);

  const listedAsCandidate = diagnostics.staleRecoveryCandidateTaskIds.includes(task.id);
  assertCondition(listedAsCandidate, "expired owner heartbeat should create a stale candidate", failureDetail);

  const result: FixtureCheck = {
    name: "code-queue:stale-active-owner-expired",
    ok: true,
    detail: {
      decision,
      state: diagnostics.state,
      staleRecoveryCandidateTaskIds: diagnostics.staleRecoveryCandidateTaskIds,
    },
  };
  return result;
}
/**
 * Check: diagnostics built for the `"master-code-queue-mgr"` control plane
 * from the `"postgres-control-plane"` state source, with no active runs but
 * one running task carrying a recent heartbeat, must report `"split-brain"`
 * with zero scheduler run slots and exactly one database-active task.
 *
 * @returns A passing result carrying the split-brain related diagnostics fields.
 * @throws Error when any of the expectations is not met.
 */
function checkControlPlaneSplitBrainDiagnostics(): FixtureCheck {
  const taskId = "codex_fixture_split_1";
  const task = fixtureTask(taskId, "running", heartbeat(taskId, freshAt));

  // Called directly rather than through schedulerDiagnostics because this
  // fixture needs a different control plane and state source.
  const diagnostics = buildExecutionDiagnostics({
    now,
    controlPlane: "master-code-queue-mgr",
    executionStateSource: "postgres-control-plane",
    tasks: [task],
    activeRuns: [],
    activeRunSlotCount: 0,
    oaPublisher: null,
  });
  const failureDetail = diagnostics as unknown as Record<string, unknown>;

  const reportsSplitBrain = diagnostics.state === "split-brain" && diagnostics.splitBrain === true;
  assertCondition(reportsSplitBrain, "master postgres-control-plane must report split-brain when DB active has fresh scheduler heartbeat", failureDetail);

  const divergencePreserved = diagnostics.schedulerActiveRunSlotCount === 0 && diagnostics.databaseActiveTaskCount === 1;
  assertCondition(divergencePreserved, "split-brain fixture should preserve the exact control-plane divergence", failureDetail);

  const result: FixtureCheck = {
    name: "code-queue:control-plane-split-brain-diagnostics",
    ok: true,
    detail: {
      state: diagnostics.state,
      splitBrain: diagnostics.splitBrain,
      executionStateSource: diagnostics.executionStateSource,
      databaseActiveTaskIds: diagnostics.databaseActiveTaskIds,
      schedulerActiveRunSlotCount: diagnostics.schedulerActiveRunSlotCount,
      heartbeatFreshTaskIds: diagnostics.heartbeatFreshTaskIds,
    },
  };
  return result;
}
/**
 * Check: with no tasks and no active runs, an OA publisher state that has
 * pending items and a last error must put the diagnostics into state
 * `"degraded"`, and the diagnostics must hand back the very same publisher
 * object that was passed in.
 *
 * @returns A passing result carrying the state, degraded flag and publisher detail.
 * @throws Error when any of the expectations is not met.
 */
function checkOaPublisherDegradedVisible(): FixtureCheck {
  const oaPublisher = { pending: 3, lastError: "fixture OA publish retry", lastPublishedAt: null };
  const diagnostics = schedulerDiagnostics([], [], oaPublisher);
  const failureDetail = diagnostics as unknown as Record<string, unknown>;

  const isDegraded = diagnostics.state === "degraded";
  assertCondition(isDegraded, "OA publisher pending/lastError must degrade diagnostics", failureDetail);

  // Identity comparison: the publisher object must be passed through, not copied.
  const publisherPassedThrough = diagnostics.oaPublisher === oaPublisher;
  assertCondition(publisherPassedThrough, "OA publisher detail must remain visible", failureDetail);

  const result: FixtureCheck = {
    name: "code-queue:oa-publisher-degraded-visible",
    ok: true,
    detail: {
      state: diagnostics.state,
      degraded: diagnostics.degraded,
      oaPublisher: diagnostics.oaPublisher as Record<string, unknown>,
    },
  };
  return result;
}
/**
 * Runs the Code Queue liveness fixture checks and collects their results.
 *
 * A check that throws is recorded as a failure (and as an `ok: false` entry in
 * `checks`) instead of aborting the remaining checks.
 *
 * @param only - Optional list of check names to run. Names are trimmed and
 *   blank entries are ignored; when no usable name remains, every check runs.
 *   Names that match no known check are skipped.
 * @returns `ok` is `true` when no check failed; `checks` holds one entry per
 *   executed check; `failures` holds the name and error message of each failed check.
 * @throws Error when `only` selects names but none of them matches a known check.
 */
export function runCodeQueueLivenessFixtureChecks(only: string[] = []): { ok: boolean; checks: FixtureCheck[]; failures: Array<{ name: string; error: string }> } {
  // Trim before storing: filtering on the trimmed length alone would keep
  // padded names (e.g. from a comma-and-space separated list) that can never match.
  const selected = new Set(only.map((name) => name.trim()).filter((name) => name.length > 0));
  const runners: Array<[CodeQueueLivenessCheckName, () => FixtureCheck]> = [
    ["code-queue:active-run-heartbeat-visible", checkActiveRunHeartbeatVisible],
    ["code-queue:trace-gap-not-stale", checkTraceGapNotStale],
    ["code-queue:stale-active-owner-expired", checkStaleActiveOwnerExpired],
    ["code-queue:control-plane-split-brain-diagnostics", checkControlPlaneSplitBrainDiagnostics],
    ["code-queue:oa-publisher-degraded-visible", checkOaPublisherDegradedVisible],
  ];
  const checks: FixtureCheck[] = [];
  const failures: Array<{ name: string; error: string }> = [];
  for (const [name, run] of runners) {
    // An empty selection means "run everything".
    if (selected.size > 0 && !selected.has(name)) continue;
    try {
      checks.push(run());
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      failures.push({ name, error: message });
      checks.push({ name, ok: false, detail: { error: message } });
    }
  }
  // Reaching here with nothing executed means the caller's filter matched no check.
  if (checks.length === 0) throw new Error(`no Code Queue liveness fixture checks matched: ${Array.from(selected).join(", ")}`);
  return { ok: failures.length === 0, checks, failures };
}