Files
pikasTech-unidesk/scripts/code-queue-postgres-rotation-contract-test.ts
T
2026-05-23 13:33:07 +00:00

207 lines
11 KiB
TypeScript

import { codexTasksQueryForTest } from "./src/code-queue";
import { databaseStorageHealth, classifyTransientDatabaseError, restoreDirtyFlushBatch, takeDirtyFlushBatch } from "../src/components/microservices/code-queue/src/database-resilience";
import { classifyRunnerError } from "../src/components/microservices/code-queue/src/runner-error-classifier";
/** Loose string-keyed object shape used for JSON-like values inspected by these contract checks. */
type JsonRecord = { [key: string]: unknown };
/**
 * Throws an `Error` when `condition` is falsy; otherwise returns normally.
 *
 * The thrown message is `${message}: ${rendered detail}`, where the detail is rendered as JSON.
 *
 * @param condition - value whose truthiness decides whether the check passes.
 * @param message - human-readable description of the violated expectation.
 * @param detail - extra context appended to the message (defaults to `{}`).
 * @throws Error when `condition` is falsy.
 */
function assertCondition(condition: unknown, message: string, detail: unknown = {}): void {
  if (condition) return;
  throw new Error(`${message}: ${renderAssertionDetail(detail)}`);
}

/**
 * Serializes assertion detail for an error message without ever throwing.
 *
 * `JSON.stringify` raises a TypeError on circular structures and BigInt values. Letting that
 * escape would replace the real assertion failure with an unrelated error, so those cases fall
 * back to a bracketed marker. Error instances are expanded with `name` and `message`, because
 * those are not own enumerable properties and would otherwise be dropped from the JSON.
 */
function renderAssertionDetail(detail: unknown): string {
  try {
    const text = JSON.stringify(detail, (_key: string, value: unknown) =>
      value instanceof Error ? { ...value, name: value.name, message: value.message } : value,
    );
    // JSON.stringify yields undefined for `undefined`, functions and symbols; render that the
    // same way a template literal would.
    return text ?? "undefined";
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    return `[unserializable detail: ${reason}]`;
  }
}
/**
 * Checks that `value` is a non-null, non-array object and returns it typed as a JsonRecord.
 *
 * @param value - value to inspect.
 * @param label - name used in the failure message.
 * @throws Error (via assertCondition) when `value` is null, an array, or not an object.
 */
function asRecord(value: unknown, label: string): JsonRecord {
  // `typeof` reports both null and arrays as "object", so exclude them explicitly.
  const isObjectShaped = value !== null && typeof value === "object" && !Array.isArray(value);
  assertCondition(isObjectShaped, `${label} must be an object`, value);
  return value as JsonRecord;
}
/**
 * Checks that `value` is an array and returns it typed as `unknown[]`.
 *
 * @param value - value to inspect.
 * @param label - name used in the failure message.
 * @throws Error (via assertCondition) when `value` is not an array.
 */
function asArray(value: unknown, label: string): unknown[] {
  const isList = Array.isArray(value);
  assertCondition(isList, `${label} must be an array`, value);
  const list = value as unknown[];
  return list;
}
/**
 * Builds a synthetic TypeError that resembles a failure raised from the `postgres` driver's
 * `connection.js` when `socket.write` is called on null.
 *
 * The error carries a fixed three-line stack and enumerable `code`/`errno` fields, both set to
 * "CONNECTION_CLOSED".
 */
function postgresSocketWriteError(): Error {
  const headline = "null is not an object (evaluating 'socket.write')";
  const failure = new TypeError(headline);
  const frames = [
    `TypeError: ${headline}`,
    " at nextWrite (/app/node_modules/postgres/src/connection.js:255:22)",
    " at /app/src/components/microservices/code-queue/src/index.ts:4290:15",
  ];
  failure.stack = frames.join("\n");
  return Object.assign(failure, { code: "CONNECTION_CLOSED", errno: "CONNECTION_CLOSED" });
}
/**
 * Verifies classifyTransientDatabaseError for two inputs:
 * the full socket.write-null TypeError (with a driver stack), and a bare Error that only
 * carries the CONNECTION_CLOSED code.
 *
 * @throws Error (via assertCondition) when any checked classification field differs.
 */
function assertDatabaseClassification(): void {
  const classification = classifyTransientDatabaseError(postgresSocketWriteError());
  assertCondition(classification.transient === true, "socket.write null must be classified as transient database failure", classification);
  assertCondition(classification.retryable === true, "socket.write null must be retryable", classification);
  assertCondition(classification.kind === "socket-write-null", "socket.write null kind must be explicit", classification);
  assertCondition(classification.infrastructureBlocker === true, "socket.write null must be infrastructure-blocker", classification);
  assertCondition(classification.evidence.some((item) => String(item).includes("socket.write")), "classification should expose bounded socket.write evidence", classification);

  const codeOnly = Object.assign(new Error("write CONNECTION_CLOSED"), { code: "CONNECTION_CLOSED" });
  // Classify once and reuse the result for every code-only assertion.
  const codeOnlyClassification = classifyTransientDatabaseError(codeOnly);
  assertCondition(codeOnlyClassification.kind === "connection-closed", "Postgres CONNECTION_CLOSED code should classify even if stack is stripped", codeOnlyClassification);
  // The script's printed summary claims code-only CONNECTION_CLOSED errors are retryable;
  // assert it so that claim is actually checked.
  assertCondition(codeOnlyClassification.retryable === true, "Postgres CONNECTION_CLOSED code-only errors must be retryable", codeOnlyClassification);
}
/**
 * Exercises takeDirtyFlushBatch / restoreDirtyFlushBatch.
 *
 * Taking a batch must drain both source sets and return task ids in sorted order. Restoring
 * the batch must put every task and queue id back so a failed flush can be retried.
 *
 * @throws Error (via assertCondition) on any violated expectation.
 */
function assertDirtyBatchRestore(): void {
  const dirtyTasks = new Set(["task-b", "task-a"]);
  const dirtyQueues = new Set(["queue-b", "queue-a"]);
  const snapshot = () => ({ taskIds: Array.from(dirtyTasks), queueIds: Array.from(dirtyQueues) });

  const batch = takeDirtyFlushBatch(dirtyTasks, dirtyQueues, "2026-05-23T00:00:00.000Z");
  const drained = dirtyTasks.size === 0 && dirtyQueues.size === 0;
  assertCondition(drained, "taking a dirty flush batch should clear source sets", snapshot());

  const expectedOrder = JSON.stringify(["task-a", "task-b"]);
  assertCondition(JSON.stringify(batch.taskIds) === expectedOrder, "dirty task ids should be sorted for deterministic flush", batch);

  restoreDirtyFlushBatch(batch, dirtyTasks, dirtyQueues);
  const restored =
    ["task-a", "task-b"].every((id) => dirtyTasks.has(id)) &&
    ["queue-a", "queue-b"].every((id) => dirtyQueues.has(id));
  assertCondition(restored, "failed dirty flush must restore all ids for retry", snapshot());
}
/**
 * Feeds databaseStorageHealth a state with a transient socket.write failure and checks the
 * result.
 *
 * The result must be degraded and marked infrastructure-blocker. It must also expose exactly
 * one signal that points the commander at Code Queue infrastructure.
 *
 * @returns the validated health record, for reuse by callers.
 * @throws Error (via assertCondition/asRecord/asArray) on any violated expectation.
 */
function assertStorageHealth(): JsonRecord {
  const health = asRecord(
    databaseStorageHealth({
      postgresReady: true,
      dirtyTaskCount: 2,
      dirtyQueueCount: 1,
      lastError: postgresSocketWriteError(),
      flushInFlight: false,
      clientRotationCount: 2,
      lastClientRotationAt: "2026-05-23T00:00:01.000Z",
      lastClientRotationReason: "flush-dirty-tasks",
      transientUncaughtSuppressedCount: 1,
      lastTransientUncaughtSuppressedAt: "2026-05-23T00:00:02.000Z",
      consecutiveFlushFailures: 1,
      lastFlushFailureAt: "2026-05-23T00:00:03.000Z",
      nextFlushRetryAt: "2026-05-23T00:00:04.000Z",
    }),
    "database storage health",
  );

  // Each check is evaluated lazily, in order, so the first failure stops the rest.
  const topLevelChecks: ReadonlyArray<readonly [() => boolean, string]> = [
    [() => health.status === "degraded", "storage health should be degraded after transient flush failure"],
    [() => health.infrastructureBlocker === true, "storage health should mark infrastructure-blocker"],
    [() => health.lastErrorKind === "socket-write-null", "storage health should expose transient kind"],
  ];
  for (const [holds, message] of topLevelChecks) {
    assertCondition(holds(), message, health);
  }

  const signals = asArray(health.signals, "health.signals");
  assertCondition(signals.length === 1, "storage health should include one bounded actionable signal", health);
  const [firstSignal] = signals;
  const signal = asRecord(firstSignal, "health.signals[0]");
  assertCondition(signal.category === "infrastructure-blocker", "storage signal should be an infrastructure-blocker", signal);
  const commanderAction = String(signal.commanderAction ?? "");
  assertCondition(commanderAction.includes("Code Queue infrastructure"), "storage signal should tell commander this is infrastructure", signal);
  return health;
}
/**
 * Fake HTTP transport handed to codexTasksQueryForTest.
 *
 * - A path containing "/summary" gets a single-task summary. The task id is the
 *   URI-decoded segment after "/api/tasks/", or "unknown" when that segment is absent.
 * - Every other path must be the Code Queue overview endpoint. It returns one retry_wait task
 *   plus queue storage diagnostics. The diagnostics' `health` comes from assertStorageHealth(),
 *   so the storage-health assertions run again on every overview request.
 *
 * @param path - request path issued by the code under test.
 * @returns a response envelope `{ ok, status, body }`.
 * @throws Error (via assertCondition) for a non-summary path outside the overview endpoint.
 */
function fixtureResponse(path: string): JsonRecord {
  const prompt = "UniDesk#20 Code Queue scheduler Postgres connection rotation crash";
  const flushError = "database flush degraded after CONNECTION_CLOSED socket.write null";

  if (path.includes("/summary")) {
    const afterTasks = path.split("/api/tasks/")[1];
    const encodedId = afterTasks === undefined ? undefined : afterTasks.split("/")[0];
    return {
      ok: true,
      status: 200,
      body: {
        ok: true,
        summary: {
          id: decodeURIComponent(encodedId ?? "unknown"),
          queueId: "default",
          status: "retry_wait",
          currentAttempt: 2,
          maxAttempts: 99,
          prompt,
          basePrompt: prompt,
          lastError: flushError,
          lastAssistantMessage: {
            at: "2026-05-23T00:00:00.000Z",
            seq: 44,
            source: "transcript",
            text: "Blocked by Postgres CONNECTION_CLOSED while flushing dirty tasks.",
          },
        },
      },
    };
  }

  assertCondition(path.startsWith("/api/microservices/code-queue/proxy/api/tasks/overview"), "unexpected Code Queue path", { path });
  const health = assertStorageHealth();
  return {
    ok: true,
    status: 200,
    body: {
      ok: true,
      queue: {
        counts: { retry_wait: 1, queued: 0, running: 0, judging: 0 },
        storage: {
          postgresReady: true,
          dirtyTaskCount: 2,
          dirtyQueueCount: 1,
          lastError: "CONNECTION_CLOSED null socket.write",
          health,
        },
        executionDiagnostics: {
          state: "degraded",
          degraded: true,
          effectiveLiveness: "degraded",
          recommendedAction: "observe-degraded",
          databaseActiveTaskCount: 0,
          schedulerActiveRunSlotCount: 0,
          activeHeartbeatCount: 0,
          heartbeatRiskTaskIds: [],
          staleRecoveryCandidateTaskIds: [],
          traceGapTaskIds: [],
        },
      },
      pagination: {
        limit: 20,
        returned: 1,
        total: 1,
        hasMore: false,
        nextBeforeId: null,
        includeActive: true,
      },
      tasks: [
        {
          id: "codex_pg_rotation",
          queueId: "default",
          status: "retry_wait",
          currentAttempt: 2,
          updatedAt: "2026-05-23T00:00:10.000Z",
          prompt,
          basePrompt: prompt,
          displayPrompt: prompt,
          lastError: flushError,
        },
      ],
    },
  };
}
/**
 * Runs the `codex tasks --view commander` query against fixtureResponse and checks the
 * commander output.
 *
 * The storage degradation must surface as a single infrastructure-blocker signal. It must be
 * counted in riskCounts and in the per-category task classification.
 *
 * @throws Error (via assertCondition/asRecord/asArray) on any violated expectation.
 */
function assertCommanderInfrastructureSignal(): void {
  const result = asRecord(codexTasksQueryForTest(["--view", "commander", "--limit", "20"], fixtureResponse), "commander result");
  const commander = asRecord(result.commander, "result.commander");
  const infrastructure = asRecord(commander.infrastructure, "commander.infrastructure");

  // Lazily evaluated so later checks never run once an earlier one fails.
  const infrastructureChecks: ReadonlyArray<readonly [() => boolean, string]> = [
    [() => infrastructure.infrastructureBlocker === true, "commander view should surface storage issue as infrastructure-blocker"],
    [() => infrastructure.status === "degraded", "commander infrastructure status should be degraded"],
    [() => String(infrastructure.actionable ?? "").includes("Code Queue infrastructure-blocker"), "commander infrastructure signal should be actionable"],
  ];
  for (const [holds, message] of infrastructureChecks) {
    assertCondition(holds(), message, infrastructure);
  }

  const signals = asArray(infrastructure.signals, "commander.infrastructure.signals");
  assertCondition(signals.length === 1, "commander infrastructure signals should be bounded", infrastructure);
  const [firstSignal] = signals;
  const signal = asRecord(firstSignal, "commander.infrastructure.signals[0]");
  assertCondition(signal.category === "infrastructure-blocker", "commander signal category should be infrastructure-blocker", signal);

  const riskCounts = asRecord(commander.riskCounts, "commander.riskCounts");
  assertCondition(riskCounts.infrastructureBlocker === 1, "commander risk counts should expose infrastructure blocker", riskCounts);

  const classification = asRecord(commander.classification, "commander.classification");
  const byCategory = asRecord(classification.byCategory, "commander.classification.byCategory");
  const blockerTaskCount = Number(byCategory["infrastructure-blocker"] ?? 0);
  assertCondition(blockerTaskCount >= 1, "task classification should identify postgres scheduler crash as infrastructure-blocker", classification);
}
/**
 * Checks how classifyRunnerError handles a "no rollout found for thread id" runner error
 * (with "D601" as the second argument).
 *
 * The error must be flagged as a stale bad resume with an explicit failure kind. It must be
 * retryable, with a service-degraded disposition.
 *
 * @throws Error (via assertCondition) on the first mismatching field.
 */
function assertStaleBadResumeClassification(): void {
  const verdict = classifyRunnerError("app-server error: no rollout found for thread id thread_bad_resume_123", "D601");
  const expectations: ReadonlyArray<readonly [() => boolean, string]> = [
    [() => verdict.staleBadResume === true, "no rollout found for thread id should be stale bad-resume"],
    [() => verdict.failureKind === "stale-bad-resume-thread-rollout-missing", "bad resume failure kind should be explicit"],
    [() => verdict.retryable === true, "bad resume should be retryable after superseding stale thread"],
    [() => verdict.disposition === "service-degraded", "bad resume should be service-degraded, not business failure"],
  ];
  for (const [holds, message] of expectations) {
    assertCondition(holds(), message, verdict);
  }
}
// Run every contract check in order; the first failing check throws and aborts the script
// before the success summary is written.
const contractChecks: ReadonlyArray<() => unknown> = [
  assertDatabaseClassification,
  assertDirtyBatchRestore,
  assertStorageHealth,
  assertCommanderInfrastructureSignal,
  assertStaleBadResumeClassification,
];
for (const runCheck of contractChecks) {
  runCheck();
}

const checkSummary = [
  "CONNECTION_CLOSED socket.write null is classified as transient postgres infrastructure",
  "CONNECTION_CLOSED code-only errors are retryable",
  "dirty flush batch restore keeps task/queue ids dirty for retry",
  "storage health emits bounded infrastructure-blocker signal",
  "codex tasks --view commander surfaces the storage signal",
  "stale bad-resume no rollout found for thread id is retryable and superseded",
];
process.stdout.write(`${JSON.stringify({ ok: true, checks: checkSummary }, null, 2)}\n`);