import { codexTasksQueryForTest } from "./src/code-queue";
import {
  databaseStorageHealth,
  classifyTransientDatabaseError,
  restoreDirtyFlushBatch,
  takeDirtyFlushBatch,
} from "../src/components/microservices/code-queue/src/database-resilience";
import { classifyRunnerError } from "../src/components/microservices/code-queue/src/runner-error-classifier";

/*
 * Standalone regression checks for Code Queue Postgres resilience:
 *  - transient database error classification (socket.write null / CONNECTION_CLOSED),
 *  - dirty flush batch take/restore,
 *  - storage health signal shape,
 *  - the `codex tasks --view commander` infrastructure signal,
 *  - stale bad-resume runner error classification.
 * Every check throws on failure; when all pass, a JSON summary is written to stdout.
 */

/** Loosely-typed JSON object used to inspect values returned by the modules under test. */
type JsonRecord = Record<string, unknown>;

/**
 * Serialize assertion detail for an error message.
 * JSON.stringify can throw (circular structures, BigInt values); falling back to String()
 * keeps the original assertion message from being masked by a serialization error.
 */
function describeDetail(detail: unknown): string {
  try {
    return JSON.stringify(detail) ?? String(detail);
  } catch {
    return String(detail);
  }
}

/**
 * Throw an Error when `condition` is falsy.
 * @param condition - value that must be truthy; narrows the caller's types when it returns.
 * @param message - human-readable description of the expectation.
 * @param detail - extra context, serialized into the error message.
 * @throws Error with `${message}: ${serialized detail}` when the condition fails.
 */
function assertCondition(condition: unknown, message: string, detail: unknown = {}): asserts condition {
  if (!condition) throw new Error(`${message}: ${describeDetail(detail)}`);
}

/** Assert `value` is a non-null, non-array object and return it as a JsonRecord. */
function asRecord(value: unknown, label: string): JsonRecord {
  assertCondition(
    typeof value === "object" && value !== null && !Array.isArray(value),
    `${label} must be an object`,
    value,
  );
  return value as JsonRecord;
}

/** Assert `value` is an array and return it as unknown[]. */
function asArray(value: unknown, label: string): unknown[] {
  assertCondition(Array.isArray(value), `${label} must be an array`, value);
  return value as unknown[];
}

/**
 * Build a TypeError that mimics the postgres client failing with a null socket on write.
 * It has the "socket.write" message, a stack frame from node_modules/postgres/src/connection.js,
 * and code/errno set to CONNECTION_CLOSED.
 */
function postgresSocketWriteError(): Error {
  const error = new TypeError("null is not an object (evaluating 'socket.write')");
  error.stack = [
    "TypeError: null is not an object (evaluating 'socket.write')",
    "    at nextWrite (/app/node_modules/postgres/src/connection.js:255:22)",
    "    at /app/src/components/microservices/code-queue/src/index.ts:4290:15",
  ].join("\n");
  Object.assign(error, { code: "CONNECTION_CLOSED", errno: "CONNECTION_CLOSED" });
  return error;
}

/**
 * The socket.write-null error must be transient, retryable, an infrastructure blocker, and
 * must carry socket.write evidence. A bare CONNECTION_CLOSED code (no stack) must classify as
 * connection-closed and be retryable.
 */
function assertDatabaseClassification(): void {
  const classification = classifyTransientDatabaseError(postgresSocketWriteError());
  assertCondition(classification.transient === true, "socket.write null must be classified as transient database failure", classification);
  assertCondition(classification.retryable === true, "socket.write null must be retryable", classification);
  assertCondition(classification.kind === "socket-write-null", "socket.write null kind must be explicit", classification);
  assertCondition(classification.infrastructureBlocker === true, "socket.write null must be infrastructure-blocker", classification);
  assertCondition(
    classification.evidence.some((item) => String(item).includes("socket.write")),
    "classification should expose bounded socket.write evidence",
    classification,
  );

  // The stack may be stripped in production; the error code alone must still classify.
  const codeOnly = Object.assign(new Error("write CONNECTION_CLOSED"), { code: "CONNECTION_CLOSED" });
  const codeOnlyClassification = classifyTransientDatabaseError(codeOnly);
  assertCondition(
    codeOnlyClassification.kind === "connection-closed",
    "Postgres CONNECTION_CLOSED code should classify even if stack is stripped",
    codeOnlyClassification,
  );
  // The summary printed at the end claims code-only errors are retryable; verify it.
  assertCondition(
    codeOnlyClassification.retryable === true,
    "Postgres CONNECTION_CLOSED code-only errors must be retryable",
    codeOnlyClassification,
  );
}

/**
 * Taking a dirty flush batch empties the source sets and yields sorted task ids;
 * restoring the batch puts every task and queue id back.
 */
function assertDirtyBatchRestore(): void {
  const taskIds = new Set(["task-b", "task-a"]);
  const queueIds = new Set(["queue-b", "queue-a"]);
  const batch = takeDirtyFlushBatch(taskIds, queueIds, "2026-05-23T00:00:00.000Z");
  assertCondition(taskIds.size === 0 && queueIds.size === 0, "taking a dirty flush batch should clear source sets", {
    taskIds: Array.from(taskIds),
    queueIds: Array.from(queueIds),
  });
  assertCondition(
    JSON.stringify(batch.taskIds) === JSON.stringify(["task-a", "task-b"]),
    "dirty task ids should be sorted for deterministic flush",
    batch,
  );
  restoreDirtyFlushBatch(batch, taskIds, queueIds);
  assertCondition(
    taskIds.has("task-a") && taskIds.has("task-b") && queueIds.has("queue-a") && queueIds.has("queue-b"),
    "failed dirty flush must restore all ids for retry",
    { taskIds: Array.from(taskIds), queueIds: Array.from(queueIds) },
  );
}

/**
 * Storage health built after a transient flush failure must be degraded, flagged as an
 * infrastructure blocker, expose the error kind, and carry exactly one infrastructure signal.
 * @returns the validated health record (reused by the fixture overview response).
 */
function assertStorageHealth(): JsonRecord {
  const health = asRecord(
    databaseStorageHealth({
      postgresReady: true,
      dirtyTaskCount: 2,
      dirtyQueueCount: 1,
      lastError: postgresSocketWriteError(),
      flushInFlight: false,
      clientRotationCount: 2,
      lastClientRotationAt: "2026-05-23T00:00:01.000Z",
      lastClientRotationReason: "flush-dirty-tasks",
      transientUncaughtSuppressedCount: 1,
      lastTransientUncaughtSuppressedAt: "2026-05-23T00:00:02.000Z",
      consecutiveFlushFailures: 1,
      lastFlushFailureAt: "2026-05-23T00:00:03.000Z",
      nextFlushRetryAt: "2026-05-23T00:00:04.000Z",
    }),
    "database storage health",
  );
  assertCondition(health.status === "degraded", "storage health should be degraded after transient flush failure", health);
  assertCondition(health.infrastructureBlocker === true, "storage health should mark infrastructure-blocker", health);
  assertCondition(health.lastErrorKind === "socket-write-null", "storage health should expose transient kind", health);

  const signals = asArray(health.signals, "health.signals");
  assertCondition(signals.length === 1, "storage health should include one bounded actionable signal", health);
  const signal = asRecord(signals[0], "health.signals[0]");
  assertCondition(signal.category === "infrastructure-blocker", "storage signal should be an infrastructure-blocker", signal);
  assertCondition(
    String(signal.commanderAction ?? "").includes("Code Queue infrastructure"),
    "storage signal should tell commander this is infrastructure",
    signal,
  );
  return health;
}

/**
 * Fake HTTP responder passed to codexTasksQueryForTest, returning `{ ok, status, body }`.
 * - Paths containing "/summary" return a retry_wait task summary for the id after "/api/tasks/".
 * - Otherwise the path must be the Code Queue overview proxy path; the overview embeds a storage
 *   health record produced (and validated) by assertStorageHealth().
 * NOTE(review): presumably codexTasksQueryForTest calls this once per API path it needs; confirm.
 * @throws Error for any other path.
 */
function fixtureResponse(path: string): JsonRecord {
  if (path.includes("/summary")) {
    const taskId = decodeURIComponent(path.split("/api/tasks/")[1]?.split("/")[0] ?? "unknown");
    return {
      ok: true,
      status: 200,
      body: {
        ok: true,
        summary: {
          id: taskId,
          queueId: "default",
          status: "retry_wait",
          currentAttempt: 2,
          maxAttempts: 99,
          prompt: "UniDesk#20 Code Queue scheduler Postgres connection rotation crash",
          basePrompt: "UniDesk#20 Code Queue scheduler Postgres connection rotation crash",
          lastError: "database flush degraded after CONNECTION_CLOSED socket.write null",
          lastAssistantMessage: {
            at: "2026-05-23T00:00:00.000Z",
            seq: 44,
            source: "transcript",
            text: "Blocked by Postgres CONNECTION_CLOSED while flushing dirty tasks.",
          },
        },
      },
    };
  }

  assertCondition(path.startsWith("/api/microservices/code-queue/proxy/api/tasks/overview"), "unexpected Code Queue path", { path });
  const health = assertStorageHealth();
  return {
    ok: true,
    status: 200,
    body: {
      ok: true,
      queue: {
        counts: { retry_wait: 1, queued: 0, running: 0, judging: 0 },
        storage: {
          postgresReady: true,
          dirtyTaskCount: 2,
          dirtyQueueCount: 1,
          lastError: "CONNECTION_CLOSED null socket.write",
          health,
        },
        executionDiagnostics: {
          state: "degraded",
          degraded: true,
          effectiveLiveness: "degraded",
          recommendedAction: "observe-degraded",
          databaseActiveTaskCount: 0,
          schedulerActiveRunSlotCount: 0,
          activeHeartbeatCount: 0,
          heartbeatRiskTaskIds: [],
          staleRecoveryCandidateTaskIds: [],
          traceGapTaskIds: [],
        },
      },
      pagination: {
        limit: 20,
        returned: 1,
        total: 1,
        hasMore: false,
        nextBeforeId: null,
        includeActive: true,
      },
      tasks: [
        {
          id: "codex_pg_rotation",
          queueId: "default",
          status: "retry_wait",
          currentAttempt: 2,
          updatedAt: "2026-05-23T00:00:10.000Z",
          prompt: "UniDesk#20 Code Queue scheduler Postgres connection rotation crash",
          basePrompt: "UniDesk#20 Code Queue scheduler Postgres connection rotation crash",
          displayPrompt: "UniDesk#20 Code Queue scheduler Postgres connection rotation crash",
          lastError: "database flush degraded after CONNECTION_CLOSED socket.write null",
        },
      ],
    },
  };
}

/**
 * Run `codex tasks --view commander --limit 20` against the fixture and require the commander view
 * to surface the degraded storage as one actionable infrastructure-blocker signal. Its risk counts
 * and task classification must also reflect that blocker.
 */
function assertCommanderInfrastructureSignal(): void {
  const result = asRecord(codexTasksQueryForTest(["--view", "commander", "--limit", "20"], fixtureResponse), "commander result");
  const commander = asRecord(result.commander, "result.commander");

  const infrastructure = asRecord(commander.infrastructure, "commander.infrastructure");
  assertCondition(infrastructure.infrastructureBlocker === true, "commander view should surface storage issue as infrastructure-blocker", infrastructure);
  assertCondition(infrastructure.status === "degraded", "commander infrastructure status should be degraded", infrastructure);
  assertCondition(
    String(infrastructure.actionable ?? "").includes("Code Queue infrastructure-blocker"),
    "commander infrastructure signal should be actionable",
    infrastructure,
  );

  const signals = asArray(infrastructure.signals, "commander.infrastructure.signals");
  assertCondition(signals.length === 1, "commander infrastructure signals should be bounded", infrastructure);
  const signal = asRecord(signals[0], "commander.infrastructure.signals[0]");
  assertCondition(signal.category === "infrastructure-blocker", "commander signal category should be infrastructure-blocker", signal);

  const riskCounts = asRecord(commander.riskCounts, "commander.riskCounts");
  assertCondition(riskCounts.infrastructureBlocker === 1, "commander risk counts should expose infrastructure blocker", riskCounts);

  const classification = asRecord(commander.classification, "commander.classification");
  const byCategory = asRecord(classification.byCategory, "commander.classification.byCategory");
  assertCondition(
    Number(byCategory["infrastructure-blocker"] ?? 0) >= 1,
    "task classification should identify postgres scheduler crash as infrastructure-blocker",
    classification,
  );
}

/**
 * "no rollout found for thread id" must classify as a stale bad-resume failure that is retryable
 * and reported as service-degraded rather than a business failure.
 */
function assertStaleBadResumeClassification(): void {
  const classification = classifyRunnerError("app-server error: no rollout found for thread id thread_bad_resume_123", "D601");
  assertCondition(classification.staleBadResume === true, "no rollout found for thread id should be stale bad-resume", classification);
  assertCondition(classification.failureKind === "stale-bad-resume-thread-rollout-missing", "bad resume failure kind should be explicit", classification);
  assertCondition(classification.retryable === true, "bad resume should be retryable after superseding stale thread", classification);
  assertCondition(classification.disposition === "service-degraded", "bad resume should be service-degraded, not business failure", classification);
}

assertDatabaseClassification();
assertDirtyBatchRestore();
assertStorageHealth();
assertCommanderInfrastructureSignal();
assertStaleBadResumeClassification();

process.stdout.write(`${JSON.stringify({
  ok: true,
  checks: [
    "CONNECTION_CLOSED socket.write null is classified as transient postgres infrastructure",
    "CONNECTION_CLOSED code-only errors are retryable",
    "dirty flush batch restore keeps task/queue ids dirty for retry",
    "storage health emits bounded infrastructure-blocker signal",
    "codex tasks --view commander surfaces the storage signal",
    "stale bad-resume no rollout found for thread id is retryable and superseded",
  ],
}, null, 2)}\n`);