207 lines
11 KiB
TypeScript
207 lines
11 KiB
TypeScript
import { codexTasksQueryForTest } from "./src/code-queue";
|
|
import { databaseStorageHealth, classifyTransientDatabaseError, restoreDirtyFlushBatch, takeDirtyFlushBatch } from "../src/components/microservices/code-queue/src/database-resilience";
|
|
import { classifyRunnerError } from "../src/components/microservices/code-queue/src/runner-error-classifier";
|
|
|
|
/** Loosely typed object shape: string keys mapped to values that must be narrowed before use. */
type JsonRecord = Record<string, unknown>;
|
|
|
|
/**
 * Renders an assertion detail payload for inclusion in an error message.
 *
 * `JSON.stringify` throws on circular references and `bigint` values and returns
 * `undefined` for functions and symbols. In those cases the value is rendered with
 * `String()` instead, so a serialization problem never replaces the real assertion
 * failure.
 *
 * @param detail - Arbitrary diagnostic payload.
 * @returns A printable representation of `detail`.
 */
function describeAssertionDetail(detail: unknown): string {
  try {
    const rendered: string | undefined = JSON.stringify(detail);
    if (rendered !== undefined) return rendered;
  } catch {
    // Not JSON-serializable (circular structure, bigint, throwing toJSON): fall through.
  }
  try {
    return String(detail);
  } catch {
    // Objects without a usable toString/valueOf cannot be coerced; use the type tag instead.
    return Object.prototype.toString.call(detail);
  }
}

/**
 * Throws when `condition` is falsy.
 *
 * @param condition - Value that must be truthy for the check to pass.
 * @param message - Human-readable description of the expectation.
 * @param detail - Diagnostic payload appended to the message; defaults to `{}`.
 * @throws Error with the text `<message>: <rendered detail>` when `condition` is falsy.
 */
function assertCondition(condition: unknown, message: string, detail: unknown = {}): void {
  if (condition) return;
  throw new Error(`${message}: ${describeAssertionDetail(detail)}`);
}
|
|
|
|
/**
 * Treats `value` as a JsonRecord after checking that it is a non-null,
 * non-array object.
 *
 * @param value - Candidate value to narrow.
 * @param label - Name used in the failure message (`<label> must be an object`).
 * @returns The same value, typed as a JsonRecord.
 * @throws Error (via assertCondition) when the value is not a non-array object.
 */
function asRecord(value: unknown, label: string): JsonRecord {
  const isNonArrayObject = value !== null && typeof value === "object" && !Array.isArray(value);
  assertCondition(isNonArrayObject, `${label} must be an object`, value);
  return value as JsonRecord;
}
|
|
|
|
/**
 * Treats `value` as an array after checking it with `Array.isArray`.
 *
 * @param value - Candidate value to narrow.
 * @param label - Name used in the failure message (`<label> must be an array`).
 * @returns The same value, typed as `unknown[]`.
 * @throws Error (via assertCondition) when the value is not an array.
 */
function asArray(value: unknown, label: string): unknown[] {
  const isList = Array.isArray(value);
  assertCondition(isList, `${label} must be an array`, value);
  return value as unknown[];
}
|
|
|
|
/**
 * Builds a synthetic TypeError that imitates a null `socket.write` failure:
 * a fixed message, a fixed three-line stack, and `code`/`errno` both set to
 * "CONNECTION_CLOSED".
 *
 * @returns A fresh error instance on every call.
 */
function postgresSocketWriteError(): Error {
  const message = "null is not an object (evaluating 'socket.write')";
  const closedCode = "CONNECTION_CLOSED";
  const stackFrames = [
    " at nextWrite (/app/node_modules/postgres/src/connection.js:255:22)",
    " at /app/src/components/microservices/code-queue/src/index.ts:4290:15",
  ];

  const failure = Object.assign(new TypeError(message), { code: closedCode, errno: closedCode });
  // Overwrite the runtime-generated stack so the fixture is deterministic.
  failure.stack = [`TypeError: ${message}`, ...stackFrames].join("\n");
  return failure;
}
|
|
|
|
/**
 * Checks how classifyTransientDatabaseError classifies two errors:
 *
 * 1. The synthetic socket.write TypeError from postgresSocketWriteError, which must be
 *    transient, retryable, of kind "socket-write-null", an infrastructure blocker, and
 *    must carry at least one evidence item mentioning "socket.write".
 * 2. A plain Error that carries only `code: "CONNECTION_CLOSED"`, which must be of kind
 *    "connection-closed" and retryable.
 *
 * The retryable assertion for the code-only error backs the
 * "CONNECTION_CLOSED code-only errors are retryable" entry in the printed check list;
 * it was previously reported without being verified.
 *
 * @throws Error (via assertCondition) on the first expectation that does not hold.
 */
function assertDatabaseClassification(): void {
  const classification = classifyTransientDatabaseError(postgresSocketWriteError());
  assertCondition(classification.transient === true, "socket.write null must be classified as transient database failure", classification);
  assertCondition(classification.retryable === true, "socket.write null must be retryable", classification);
  assertCondition(classification.kind === "socket-write-null", "socket.write null kind must be explicit", classification);
  assertCondition(classification.infrastructureBlocker === true, "socket.write null must be infrastructure-blocker", classification);
  assertCondition(classification.evidence.some((item) => String(item).includes("socket.write")), "classification should expose bounded socket.write evidence", classification);

  const codeOnly = Object.assign(new Error("write CONNECTION_CLOSED"), { code: "CONNECTION_CLOSED" });
  // Classify once and reuse the result for both the checks and the failure detail.
  const codeOnlyClassification = classifyTransientDatabaseError(codeOnly);
  assertCondition(codeOnlyClassification.kind === "connection-closed", "Postgres CONNECTION_CLOSED code should classify even if stack is stripped", codeOnlyClassification);
  assertCondition(codeOnlyClassification.retryable === true, "Postgres CONNECTION_CLOSED code-only error must be retryable", codeOnlyClassification);
}
|
|
|
|
/**
 * Exercises takeDirtyFlushBatch / restoreDirtyFlushBatch on two small id sets.
 *
 * Expectations checked, in order:
 * - taking a batch empties both source sets;
 * - the batch's task ids come back as ["task-a", "task-b"];
 * - restoring the batch puts every task and queue id back into the sets.
 *
 * @throws Error (via assertCondition) on the first expectation that does not hold.
 */
function assertDirtyBatchRestore(): void {
  const dirtyTasks = new Set(["task-b", "task-a"]);
  const dirtyQueues = new Set(["queue-b", "queue-a"]);
  // Snapshot of the current set contents, used as failure detail.
  const currentIds = (): JsonRecord => ({
    taskIds: Array.from(dirtyTasks),
    queueIds: Array.from(dirtyQueues),
  });

  const batch = takeDirtyFlushBatch(dirtyTasks, dirtyQueues, "2026-05-23T00:00:00.000Z");

  const sourcesEmptied = dirtyTasks.size === 0 && dirtyQueues.size === 0;
  assertCondition(sourcesEmptied, "taking a dirty flush batch should clear source sets", currentIds());

  const expectedTaskOrder = JSON.stringify(["task-a", "task-b"]);
  assertCondition(JSON.stringify(batch.taskIds) === expectedTaskOrder, "dirty task ids should be sorted for deterministic flush", batch);

  restoreDirtyFlushBatch(batch, dirtyTasks, dirtyQueues);

  const tasksRestored = ["task-a", "task-b"].every((id) => dirtyTasks.has(id));
  const queuesRestored = ["queue-a", "queue-b"].every((id) => dirtyQueues.has(id));
  assertCondition(tasksRestored && queuesRestored, "failed dirty flush must restore all ids for retry", currentIds());
}
|
|
|
|
/**
 * Calls databaseStorageHealth with a fixed snapshot (Postgres ready, dirty ids pending,
 * the synthetic socket.write error as last error, one consecutive flush failure) and
 * checks the reported health.
 *
 * Expectations checked, in order: status "degraded", infrastructureBlocker true,
 * lastErrorKind "socket-write-null", exactly one signal, that signal's category is
 * "infrastructure-blocker", and its commanderAction mentions "Code Queue infrastructure".
 *
 * @returns The validated health record, which fixtureResponse embeds in its overview payload.
 * @throws Error (via assertCondition) on the first expectation that does not hold.
 */
function assertStorageHealth(): JsonRecord {
  const reported = databaseStorageHealth({
    postgresReady: true,
    dirtyTaskCount: 2,
    dirtyQueueCount: 1,
    lastError: postgresSocketWriteError(),
    flushInFlight: false,
    clientRotationCount: 2,
    lastClientRotationAt: "2026-05-23T00:00:01.000Z",
    lastClientRotationReason: "flush-dirty-tasks",
    transientUncaughtSuppressedCount: 1,
    lastTransientUncaughtSuppressedAt: "2026-05-23T00:00:02.000Z",
    consecutiveFlushFailures: 1,
    lastFlushFailureAt: "2026-05-23T00:00:03.000Z",
    nextFlushRetryAt: "2026-05-23T00:00:04.000Z",
  });
  const health = asRecord(reported, "database storage health");

  // Top-level fields that must match exactly: [field, expected value, failure message].
  const expectedFields: Array<[string, unknown, string]> = [
    ["status", "degraded", "storage health should be degraded after transient flush failure"],
    ["infrastructureBlocker", true, "storage health should mark infrastructure-blocker"],
    ["lastErrorKind", "socket-write-null", "storage health should expose transient kind"],
  ];
  for (const [field, expected, failureMessage] of expectedFields) {
    assertCondition(health[field] === expected, failureMessage, health);
  }

  const signalList = asArray(health.signals, "health.signals");
  assertCondition(signalList.length === 1, "storage health should include one bounded actionable signal", health);

  const onlySignal = asRecord(signalList[0], "health.signals[0]");
  assertCondition(onlySignal.category === "infrastructure-blocker", "storage signal should be an infrastructure-blocker", onlySignal);

  const commanderAction = String(onlySignal.commanderAction ?? "");
  assertCondition(commanderAction.includes("Code Queue infrastructure"), "storage signal should tell commander this is infrastructure", onlySignal);

  return health;
}
|
|
|
|
/**
 * Returns a canned response for the two request paths this script knows about.
 * It is handed to codexTasksQueryForTest as its second argument (presumably
 * standing in for real HTTP calls — confirm against that function).
 *
 * - Paths containing "/summary": a task summary whose id is the URL-decoded segment
 *   after "/api/tasks/" ("unknown" when that segment is absent).
 * - Paths starting with "/api/microservices/code-queue/proxy/api/tasks/overview":
 *   a queue overview with one retry_wait task and the health record produced by
 *   assertStorageHealth.
 *
 * @param path - Request path being served.
 * @returns An `{ ok, status, body }` record.
 * @throws Error (via assertCondition) for any other path.
 */
function fixtureResponse(path: string): JsonRecord {
  const crashPrompt = "UniDesk#20 Code Queue scheduler Postgres connection rotation crash";
  const flushError = "database flush degraded after CONNECTION_CLOSED socket.write null";

  if (!path.includes("/summary")) {
    assertCondition(path.startsWith("/api/microservices/code-queue/proxy/api/tasks/overview"), "unexpected Code Queue path", { path });
    const health = assertStorageHealth();

    const storage = {
      postgresReady: true,
      dirtyTaskCount: 2,
      dirtyQueueCount: 1,
      lastError: "CONNECTION_CLOSED null socket.write",
      health,
    };
    const executionDiagnostics = {
      state: "degraded",
      degraded: true,
      effectiveLiveness: "degraded",
      recommendedAction: "observe-degraded",
      databaseActiveTaskCount: 0,
      schedulerActiveRunSlotCount: 0,
      activeHeartbeatCount: 0,
      heartbeatRiskTaskIds: [],
      staleRecoveryCandidateTaskIds: [],
      traceGapTaskIds: [],
    };
    const pagination = {
      limit: 20,
      returned: 1,
      total: 1,
      hasMore: false,
      nextBeforeId: null,
      includeActive: true,
    };
    const retryingTask = {
      id: "codex_pg_rotation",
      queueId: "default",
      status: "retry_wait",
      currentAttempt: 2,
      updatedAt: "2026-05-23T00:00:10.000Z",
      prompt: crashPrompt,
      basePrompt: crashPrompt,
      displayPrompt: crashPrompt,
      lastError: flushError,
    };

    return {
      ok: true,
      status: 200,
      body: {
        ok: true,
        queue: {
          counts: { retry_wait: 1, queued: 0, running: 0, judging: 0 },
          storage,
          executionDiagnostics,
        },
        pagination,
        tasks: [retryingTask],
      },
    };
  }

  // Summary request: the task id is the path segment right after "/api/tasks/".
  const [, afterTasksPrefix] = path.split("/api/tasks/");
  const encodedTaskId = afterTasksPrefix?.split("/")[0] ?? "unknown";
  const taskId = decodeURIComponent(encodedTaskId);

  const lastAssistantMessage = {
    at: "2026-05-23T00:00:00.000Z",
    seq: 44,
    source: "transcript",
    text: "Blocked by Postgres CONNECTION_CLOSED while flushing dirty tasks.",
  };
  const summary = {
    id: taskId,
    queueId: "default",
    status: "retry_wait",
    currentAttempt: 2,
    maxAttempts: 99,
    prompt: crashPrompt,
    basePrompt: crashPrompt,
    lastError: flushError,
    lastAssistantMessage,
  };

  return {
    ok: true,
    status: 200,
    body: { ok: true, summary },
  };
}
|
|
|
|
/**
 * Runs codexTasksQueryForTest with `--view commander --limit 20` against
 * fixtureResponse and checks the `commander` section of the result.
 *
 * Expectations checked, in order:
 * - `commander.infrastructure`: infrastructureBlocker is true, status is "degraded",
 *   `actionable` mentions "Code Queue infrastructure-blocker", and there is exactly
 *   one signal whose category is "infrastructure-blocker";
 * - `commander.riskCounts.infrastructureBlocker` equals 1;
 * - `commander.classification.byCategory["infrastructure-blocker"]` is at least 1.
 *
 * @throws Error (via assertCondition) on the first expectation that does not hold.
 */
function assertCommanderInfrastructureSignal(): void {
  const queryResult = asRecord(codexTasksQueryForTest(["--view", "commander", "--limit", "20"], fixtureResponse), "commander result");
  const commander = asRecord(queryResult.commander, "result.commander");

  // Infrastructure section.
  const infra = asRecord(commander.infrastructure, "commander.infrastructure");
  assertCondition(infra.infrastructureBlocker === true, "commander view should surface storage issue as infrastructure-blocker", infra);
  assertCondition(infra.status === "degraded", "commander infrastructure status should be degraded", infra);
  const actionableText = String(infra.actionable ?? "");
  assertCondition(actionableText.includes("Code Queue infrastructure-blocker"), "commander infrastructure signal should be actionable", infra);

  const infraSignals = asArray(infra.signals, "commander.infrastructure.signals");
  assertCondition(infraSignals.length === 1, "commander infrastructure signals should be bounded", infra);
  const firstSignal = asRecord(infraSignals[0], "commander.infrastructure.signals[0]");
  assertCondition(firstSignal.category === "infrastructure-blocker", "commander signal category should be infrastructure-blocker", firstSignal);

  // Aggregated risk counters.
  const risks = asRecord(commander.riskCounts, "commander.riskCounts");
  assertCondition(risks.infrastructureBlocker === 1, "commander risk counts should expose infrastructure blocker", risks);

  // Per-task classification totals.
  const taskClassification = asRecord(commander.classification, "commander.classification");
  const categoryTotals = asRecord(taskClassification.byCategory, "commander.classification.byCategory");
  const blockerTotal = Number(categoryTotals["infrastructure-blocker"] ?? 0);
  assertCondition(blockerTotal >= 1, "task classification should identify postgres scheduler crash as infrastructure-blocker", taskClassification);
}
|
|
|
|
/**
 * Checks how classifyRunnerError classifies a "no rollout found for thread id" message
 * (called with "D601" as its second argument).
 *
 * The result must have staleBadResume true, failureKind
 * "stale-bad-resume-thread-rollout-missing", retryable true, and disposition
 * "service-degraded".
 *
 * @throws Error (via assertCondition) on the first expectation that does not hold.
 */
function assertStaleBadResumeClassification(): void {
  const runnerMessage = "app-server error: no rollout found for thread id thread_bad_resume_123";
  const verdict = classifyRunnerError(runnerMessage, "D601");

  assertCondition(verdict.staleBadResume === true, "no rollout found for thread id should be stale bad-resume", verdict);
  assertCondition(verdict.failureKind === "stale-bad-resume-thread-rollout-missing", "bad resume failure kind should be explicit", verdict);
  assertCondition(verdict.retryable === true, "bad resume should be retryable after superseding stale thread", verdict);
  assertCondition(verdict.disposition === "service-degraded", "bad resume should be service-degraded, not business failure", verdict);
}
|
|
|
|
// Run every check in order. Each one throws on failure, so the summary below is
// only printed when all of them pass.
const checkRunners: Array<() => unknown> = [
  assertDatabaseClassification,
  assertDirtyBatchRestore,
  assertStorageHealth,
  assertCommanderInfrastructureSignal,
  assertStaleBadResumeClassification,
];
for (const runCheck of checkRunners) {
  runCheck();
}

// Human-readable list of what was verified, emitted as pretty-printed JSON on stdout.
const passedChecks = [
  "CONNECTION_CLOSED socket.write null is classified as transient postgres infrastructure",
  "CONNECTION_CLOSED code-only errors are retryable",
  "dirty flush batch restore keeps task/queue ids dirty for retry",
  "storage health emits bounded infrastructure-blocker signal",
  "codex tasks --view commander surfaces the storage signal",
  "stale bad-resume no rollout found for thread id is retryable and superseded",
];
const report = JSON.stringify({ ok: true, checks: passedChecks }, null, 2);
process.stdout.write(`${report}\n`);
|