fix: harden code queue postgres rotation

This commit is contained in:
Codex
2026-05-23 13:32:39 +00:00
parent ed513628e6
commit bb5351cc94
7 changed files with 597 additions and 37 deletions
@@ -0,0 +1,206 @@
import { codexTasksQueryForTest } from "./src/code-queue";
import { databaseStorageHealth, classifyTransientDatabaseError, restoreDirtyFlushBatch, takeDirtyFlushBatch } from "../src/components/microservices/code-queue/src/database-resilience";
import { classifyRunnerError } from "../src/components/microservices/code-queue/src/runner-error-classifier";
type JsonRecord = Record<string, unknown>;
function assertCondition(condition: unknown, message: string, detail: unknown = {}): void {
if (!condition) throw new Error(`${message}: ${JSON.stringify(detail)}`);
}
function asRecord(value: unknown, label: string): JsonRecord {
assertCondition(typeof value === "object" && value !== null && !Array.isArray(value), `${label} must be an object`, value);
return value as JsonRecord;
}
function asArray(value: unknown, label: string): unknown[] {
assertCondition(Array.isArray(value), `${label} must be an array`, value);
return value as unknown[];
}
function postgresSocketWriteError(): Error {
const error = new TypeError("null is not an object (evaluating 'socket.write')");
error.stack = [
"TypeError: null is not an object (evaluating 'socket.write')",
" at nextWrite (/app/node_modules/postgres/src/connection.js:255:22)",
" at /app/src/components/microservices/code-queue/src/index.ts:4290:15",
].join("\n");
Object.assign(error, { code: "CONNECTION_CLOSED", errno: "CONNECTION_CLOSED" });
return error;
}
function assertDatabaseClassification(): void {
const classification = classifyTransientDatabaseError(postgresSocketWriteError());
assertCondition(classification.transient === true, "socket.write null must be classified as transient database failure", classification);
assertCondition(classification.retryable === true, "socket.write null must be retryable", classification);
assertCondition(classification.kind === "socket-write-null", "socket.write null kind must be explicit", classification);
assertCondition(classification.infrastructureBlocker === true, "socket.write null must be infrastructure-blocker", classification);
assertCondition(classification.evidence.some((item) => String(item).includes("socket.write")), "classification should expose bounded socket.write evidence", classification);
const codeOnly = Object.assign(new Error("write CONNECTION_CLOSED"), { code: "CONNECTION_CLOSED" });
assertCondition(classifyTransientDatabaseError(codeOnly).kind === "connection-closed", "Postgres CONNECTION_CLOSED code should classify even if stack is stripped", classifyTransientDatabaseError(codeOnly));
}
function assertDirtyBatchRestore(): void {
const taskIds = new Set(["task-b", "task-a"]);
const queueIds = new Set(["queue-b", "queue-a"]);
const batch = takeDirtyFlushBatch(taskIds, queueIds, "2026-05-23T00:00:00.000Z");
assertCondition(taskIds.size === 0 && queueIds.size === 0, "taking a dirty flush batch should clear source sets", { taskIds: Array.from(taskIds), queueIds: Array.from(queueIds) });
assertCondition(JSON.stringify(batch.taskIds) === JSON.stringify(["task-a", "task-b"]), "dirty task ids should be sorted for deterministic flush", batch);
restoreDirtyFlushBatch(batch, taskIds, queueIds);
assertCondition(taskIds.has("task-a") && taskIds.has("task-b") && queueIds.has("queue-a") && queueIds.has("queue-b"), "failed dirty flush must restore all ids for retry", {
taskIds: Array.from(taskIds),
queueIds: Array.from(queueIds),
});
}
function assertStorageHealth(): JsonRecord {
const health = asRecord(databaseStorageHealth({
postgresReady: true,
dirtyTaskCount: 2,
dirtyQueueCount: 1,
lastError: postgresSocketWriteError(),
flushInFlight: false,
clientRotationCount: 2,
lastClientRotationAt: "2026-05-23T00:00:01.000Z",
lastClientRotationReason: "flush-dirty-tasks",
transientUncaughtSuppressedCount: 1,
lastTransientUncaughtSuppressedAt: "2026-05-23T00:00:02.000Z",
consecutiveFlushFailures: 1,
lastFlushFailureAt: "2026-05-23T00:00:03.000Z",
nextFlushRetryAt: "2026-05-23T00:00:04.000Z",
}), "database storage health");
assertCondition(health.status === "degraded", "storage health should be degraded after transient flush failure", health);
assertCondition(health.infrastructureBlocker === true, "storage health should mark infrastructure-blocker", health);
assertCondition(health.lastErrorKind === "socket-write-null", "storage health should expose transient kind", health);
const signals = asArray(health.signals, "health.signals");
assertCondition(signals.length === 1, "storage health should include one bounded actionable signal", health);
const signal = asRecord(signals[0], "health.signals[0]");
assertCondition(signal.category === "infrastructure-blocker", "storage signal should be an infrastructure-blocker", signal);
assertCondition(String(signal.commanderAction ?? "").includes("Code Queue infrastructure"), "storage signal should tell commander this is infrastructure", signal);
return health;
}
function fixtureResponse(path: string): JsonRecord {
if (path.includes("/summary")) {
const taskId = decodeURIComponent(path.split("/api/tasks/")[1]?.split("/")[0] ?? "unknown");
return {
ok: true,
status: 200,
body: {
ok: true,
summary: {
id: taskId,
queueId: "default",
status: "retry_wait",
currentAttempt: 2,
maxAttempts: 99,
prompt: "UniDesk#20 Code Queue scheduler Postgres connection rotation crash",
basePrompt: "UniDesk#20 Code Queue scheduler Postgres connection rotation crash",
lastError: "database flush degraded after CONNECTION_CLOSED socket.write null",
lastAssistantMessage: {
at: "2026-05-23T00:00:00.000Z",
seq: 44,
source: "transcript",
text: "Blocked by Postgres CONNECTION_CLOSED while flushing dirty tasks.",
},
},
},
};
}
assertCondition(path.startsWith("/api/microservices/code-queue/proxy/api/tasks/overview"), "unexpected Code Queue path", { path });
const health = assertStorageHealth();
return {
ok: true,
status: 200,
body: {
ok: true,
queue: {
counts: { retry_wait: 1, queued: 0, running: 0, judging: 0 },
storage: {
postgresReady: true,
dirtyTaskCount: 2,
dirtyQueueCount: 1,
lastError: "CONNECTION_CLOSED null socket.write",
health,
},
executionDiagnostics: {
state: "degraded",
degraded: true,
effectiveLiveness: "degraded",
recommendedAction: "observe-degraded",
databaseActiveTaskCount: 0,
schedulerActiveRunSlotCount: 0,
activeHeartbeatCount: 0,
heartbeatRiskTaskIds: [],
staleRecoveryCandidateTaskIds: [],
traceGapTaskIds: [],
},
},
pagination: {
limit: 20,
returned: 1,
total: 1,
hasMore: false,
nextBeforeId: null,
includeActive: true,
},
tasks: [
{
id: "codex_pg_rotation",
queueId: "default",
status: "retry_wait",
currentAttempt: 2,
updatedAt: "2026-05-23T00:00:10.000Z",
prompt: "UniDesk#20 Code Queue scheduler Postgres connection rotation crash",
basePrompt: "UniDesk#20 Code Queue scheduler Postgres connection rotation crash",
displayPrompt: "UniDesk#20 Code Queue scheduler Postgres connection rotation crash",
lastError: "database flush degraded after CONNECTION_CLOSED socket.write null",
},
],
},
};
}
function assertCommanderInfrastructureSignal(): void {
const result = asRecord(codexTasksQueryForTest(["--view", "commander", "--limit", "20"], fixtureResponse), "commander result");
const commander = asRecord(result.commander, "result.commander");
const infrastructure = asRecord(commander.infrastructure, "commander.infrastructure");
assertCondition(infrastructure.infrastructureBlocker === true, "commander view should surface storage issue as infrastructure-blocker", infrastructure);
assertCondition(infrastructure.status === "degraded", "commander infrastructure status should be degraded", infrastructure);
assertCondition(String(infrastructure.actionable ?? "").includes("Code Queue infrastructure-blocker"), "commander infrastructure signal should be actionable", infrastructure);
const signals = asArray(infrastructure.signals, "commander.infrastructure.signals");
assertCondition(signals.length === 1, "commander infrastructure signals should be bounded", infrastructure);
const signal = asRecord(signals[0], "commander.infrastructure.signals[0]");
assertCondition(signal.category === "infrastructure-blocker", "commander signal category should be infrastructure-blocker", signal);
const riskCounts = asRecord(commander.riskCounts, "commander.riskCounts");
assertCondition(riskCounts.infrastructureBlocker === 1, "commander risk counts should expose infrastructure blocker", riskCounts);
const classification = asRecord(commander.classification, "commander.classification");
const byCategory = asRecord(classification.byCategory, "commander.classification.byCategory");
assertCondition(Number(byCategory["infrastructure-blocker"] ?? 0) >= 1, "task classification should identify postgres scheduler crash as infrastructure-blocker", classification);
}
function assertStaleBadResumeClassification(): void {
const classification = classifyRunnerError("app-server error: no rollout found for thread id thread_bad_resume_123", "D601");
assertCondition(classification.staleBadResume === true, "no rollout found for thread id should be stale bad-resume", classification);
assertCondition(classification.failureKind === "stale-bad-resume-thread-rollout-missing", "bad resume failure kind should be explicit", classification);
assertCondition(classification.retryable === true, "bad resume should be retryable after superseding stale thread", classification);
assertCondition(classification.disposition === "service-degraded", "bad resume should be service-degraded, not business failure", classification);
}
assertDatabaseClassification();
assertDirtyBatchRestore();
assertStorageHealth();
assertCommanderInfrastructureSignal();
assertStaleBadResumeClassification();
process.stdout.write(`${JSON.stringify({
ok: true,
checks: [
"CONNECTION_CLOSED socket.write null is classified as transient postgres infrastructure",
"CONNECTION_CLOSED code-only errors are retryable",
"dirty flush batch restore keeps task/queue ids dirty for retry",
"storage health emits bounded infrastructure-blocker signal",
"codex tasks --view commander surfaces the storage signal",
"stale bad-resume no rollout found for thread id is retryable and superseded",
],
}, null, 2)}\n`);
+4
View File
@@ -50,6 +50,7 @@ const syntaxFiles = [
"scripts/microservice-health-output-contract-test.ts",
"scripts/code-queue-supervisor-disclosure-contract-test.ts",
"scripts/code-queue-commander-view-contract-test.ts",
"scripts/code-queue-postgres-rotation-contract-test.ts",
"scripts/ssh-argv-guidance-contract-test.ts",
"src/components/frontend/src/index.ts",
"src/components/frontend/src/app.tsx",
@@ -356,6 +357,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default
fileItem("scripts/code-queue-gh-auth-redaction-contract-test.ts"),
fileItem("scripts/code-queue-supervisor-disclosure-contract-test.ts"),
fileItem("scripts/code-queue-commander-view-contract-test.ts"),
fileItem("scripts/code-queue-postgres-rotation-contract-test.ts"),
fileItem("scripts/host-codex-commander-skeleton-contract-test.ts"),
fileItem("scripts/host-codex-commander-no-daemon-smoke-contract-test.ts"),
fileItem("scripts/host-codex-commander-prompt-lint-contract-test.ts"),
@@ -410,6 +412,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default
items.push(commandItem("code-queue:gh-auth-redaction-contract", ["bun", "scripts/code-queue-gh-auth-redaction-contract-test.ts"], 30_000));
items.push(commandItem("code-queue:supervisor-disclosure-contract", ["bun", "scripts/code-queue-supervisor-disclosure-contract-test.ts"], 30_000));
items.push(commandItem("code-queue:commander-view-contract", ["bun", "scripts/code-queue-commander-view-contract-test.ts"], 30_000));
items.push(commandItem("code-queue:postgres-rotation-contract", ["bun", "scripts/code-queue-postgres-rotation-contract-test.ts"], 30_000));
items.push(commandItem("host-codex-commander:skeleton-contract", ["bun", "scripts/host-codex-commander-skeleton-contract-test.ts"], 30_000));
items.push(commandItem("host-codex-commander:no-daemon-smoke-contract", ["bun", "scripts/host-codex-commander-no-daemon-smoke-contract-test.ts"], 30_000));
items.push(commandItem("host-codex-commander:prompt-lint-contract", ["bun", "scripts/host-codex-commander-prompt-lint-contract-test.ts"], 30_000));
@@ -452,6 +455,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default
items.push(skippedItem("code-queue:gh-auth-redaction-contract", "Code Queue GitHub auth output redaction contract is opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("code-queue:supervisor-disclosure-contract", "Code Queue supervisor disclosure contract is opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("code-queue:commander-view-contract", "Code Queue commander view contract is opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("code-queue:postgres-rotation-contract", "Code Queue postgres rotation crash contract is opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("host-codex-commander:skeleton-contract", "host Codex commander skeleton contract is opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("host-codex-commander:no-daemon-smoke-contract", "host Codex commander no-daemon smoke contract is opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("host-codex-commander:prompt-lint-contract", "host Codex commander prompt boundary lint contract is opt-in with script checks", "--scripts-typecheck or --full"));
+46 -1
View File
@@ -3861,12 +3861,54 @@ function blockerLikeFinalResponseSignals(task: Record<string, unknown>, summary:
if (pattern.test(text) && !signals.includes(id)) signals.push(id);
};
add("blocked", /\b(blocked|blocker|cannot proceed|can't proceed|stuck|waiting for commander|needs authorization|need authorization|requires approval|permission denied)\b|||||/iu);
add("infra-auth-network", /\b(auth|token|credential|github transient|dns|rate limit|429|timeout|timed out|unreachable|offline|proxy|tunnel|provider unavailable)\b|||||/iu);
add("infra-auth-network", /\b(auth|token|credential|github transient|dns|rate limit|429|timeout|timed out|unreachable|offline|proxy|tunnel|provider unavailable|postgres|database|connection_closed|socket\.write|null is not an object)\b||||||/iu);
add("merge-or-test-failure", /\b(conflict|merge failed|tests? failed|typecheck failed|syntax failed|build failed|ci failed|e2e failed)\b||||/iu);
add("not-deployed", /\b(not deployed|not rebuilt|not rolled out|deploy skipped|rollout skipped|needs rollout|requires deploy)\b|||线|/iu);
return signals;
}
function commanderInfrastructureSignals(rawQueue: Record<string, unknown>): Record<string, unknown> {
const storage = asRecord(rawQueue.storage) ?? {};
const health = asRecord(storage.health) ?? {};
const rawSignals = asArray(health.signals).map((item) => asRecord(item)).filter((item): item is Record<string, unknown> => item !== null);
const boundedSignals = rawSignals.slice(0, 3).map((signal) => ({
id: asString(signal.id) || "code-queue-infrastructure-blocker",
category: asString(signal.category) || "infrastructure-blocker",
severity: asString(signal.severity) || "high",
retryable: asBoolean(signal.retryable),
bounded: signal.bounded !== false,
message: boundedInlineString(asString(signal.message), 240).text,
evidence: stringList(signal.evidence).slice(0, diagnosticsReasonPreviewLimit),
commanderAction: boundedInlineString(asString(signal.commanderAction), 260).text,
source: asString(signal.source) || "code-queue",
}));
const storageDegraded = health.degraded === true || storage.postgresReady === false || storage.lastError !== null && storage.lastError !== undefined;
return {
infrastructureBlocker: storageDegraded || boundedSignals.some((signal) => signal.category === "infrastructure-blocker"),
status: health.status ?? (storageDegraded ? "degraded" : "ready"),
source: "queue.storage.health",
signalCount: rawSignals.length,
omittedSignalCount: Math.max(0, rawSignals.length - boundedSignals.length),
bounded: true,
signals: boundedSignals,
storage: {
postgresReady: storage.postgresReady ?? health.postgresReady ?? null,
dirtyTaskCount: storage.dirtyTaskCount ?? health.dirtyTaskCount ?? null,
dirtyQueueCount: storage.dirtyQueueCount ?? health.dirtyQueueCount ?? null,
consecutiveFlushFailures: health.consecutiveFlushFailures ?? null,
lastFlushFailureAt: health.lastFlushFailureAt ?? null,
nextFlushRetryAt: health.nextFlushRetryAt ?? null,
clientRotationCount: health.clientRotationCount ?? null,
lastClientRotationAt: health.lastClientRotationAt ?? null,
lastErrorKind: health.lastErrorKind ?? null,
lastErrorTransient: health.lastErrorTransient ?? null,
},
actionable: storageDegraded
? "Treat as Code Queue infrastructure-blocker; inspect storage health/logs and wait for bounded dirty-flush retry before duplicating or canceling business tasks."
: "No Code Queue storage infrastructure blocker reported in this overview page.",
};
}
function commanderAttentionReasons(
task: Record<string, unknown>,
summary: Record<string, unknown> | null,
@@ -4123,6 +4165,7 @@ function codexTasksCommanderResult(
const rawQueue = asRecord(taskPage.queue) ?? {};
const rawDiagnostics = asRecord(rawQueue.executionDiagnostics) ?? {};
const diagnostics = supervisorExecutionDiagnostics(rawDiagnostics);
const infrastructure = commanderInfrastructureSignals(rawQueue);
const activity = compactCodeQueueActivity(rawQueue, diagnostics, { snapshotRole: "supervisor-poll" });
const commanderConcurrency = asRecord(activity.commanderConcurrency) ?? {};
const runningTasks = sortRunningWatchTasks(allTasks);
@@ -4219,6 +4262,7 @@ function codexTasksCommanderResult(
riskCounts: {
attention: attentionCounts(attentionItems, returnedAttention),
activeRisks: activeRiskTasks.length,
infrastructureBlocker: infrastructure.infrastructureBlocker === true ? 1 : 0,
heartbeatRiskTaskIds: stringList(rawDiagnostics.heartbeatRiskTaskIds).length,
staleRecoveryCandidateTaskIds: stringList(rawDiagnostics.staleRecoveryCandidateTaskIds).length,
traceGapTaskIds: stringList(rawDiagnostics.traceGapTaskIds).length,
@@ -4227,6 +4271,7 @@ function codexTasksCommanderResult(
},
highPriorityIssues: commanderHighPriorityIssues(allTasks, summaries),
classification: commanderClassificationCounts(allTasks, summaries),
infrastructure,
executionDiagnostics: diagnostics,
degraded,
commands: {
@@ -0,0 +1,190 @@
import type { JsonValue } from "./types";
export type DatabaseTransientKind =
| "deadlock"
| "connection-closed"
| "connection-reset"
| "connection-timeout"
| "connection-terminated"
| "socket-write-null"
| "unknown";
export interface DatabaseTransientClassification {
transient: boolean;
retryable: boolean;
kind: DatabaseTransientKind | null;
source: "postgres-deadlock" | "postgres-client" | "unknown";
infrastructureBlocker: boolean;
message: string;
evidence: string[];
recommendedAction: string;
}
export interface DirtyFlushBatch {
taskIds: string[];
queueIds: string[];
takenAt: string;
}
export interface DatabaseStorageHealthInput {
postgresReady: boolean;
dirtyTaskCount: number;
dirtyQueueCount: number;
lastError: unknown;
flushInFlight: boolean;
clientRotationCount: number;
lastClientRotationAt: string | null;
lastClientRotationReason: string | null;
transientUncaughtSuppressedCount: number;
lastTransientUncaughtSuppressedAt: string | null;
consecutiveFlushFailures: number;
lastFlushFailureAt: string | null;
nextFlushRetryAt: string | null;
}
function errorField(error: unknown, key: "code" | "errno"): string {
if (typeof error !== "object" || error === null) return "";
const value = (error as Record<string, unknown>)[key];
return typeof value === "string" ? value : "";
}
export function databaseErrorText(error: unknown): string {
if (error instanceof Error) return `${error.name}: ${error.message}\n${error.stack ?? ""}\n${errorField(error, "code")}\n${errorField(error, "errno")}`;
return String(error);
}
function collectEvidence(text: string, patterns: RegExp[], limit = 5): string[] {
const evidence: string[] = [];
for (const pattern of patterns) {
for (const match of text.matchAll(pattern)) {
const value = String(match[0] ?? "").replace(/\s+/gu, " ").trim();
if (value.length > 0 && !evidence.includes(value)) evidence.push(value.slice(0, 220));
if (evidence.length >= limit) return evidence;
}
}
return evidence;
}
export function classifyTransientDatabaseError(error: unknown): DatabaseTransientClassification {
const text = databaseErrorText(error);
const lower = text.toLowerCase();
const deadlock = /deadlock detected/iu.test(text);
const code = `${errorField(error, "code")}\n${errorField(error, "errno")}`;
const postgresClientCode = /CONNECTION_CLOSED|CONNECTION_DESTROYED|CONNECTION_ENDED|CONNECT_TIMEOUT/iu.test(code);
const postgresClient = postgresClientCode || /node_modules\/postgres|postgres(?:@\d+(?:\.\d+){0,2})?\/src|postgres\/src|d601-tcp-egress-gateway|:15432|unidesk_code_queue|set_config/iu.test(text);
const combined = `${text}\n${code}`;
const socketWriteNull = /socket\.write|null is not an object|cannot read propert(?:y|ies) ['"]?write|evaluating ['"]socket\.write/iu.test(combined);
const connectionClosed = /CONNECTION_CLOSED|socket connection was closed|connection closed/iu.test(combined);
const connectionReset = /ECONNRESET|EPIPE|broken pipe/iu.test(combined);
const connectionTimeout = /ETIMEDOUT|CONNECT_TIMEOUT|Connection timeout|connect timed out/iu.test(combined);
const connectionTerminated = /Connection terminated|CONNECTION_DESTROYED|CONNECTION_ENDED/iu.test(combined);
const transient = deadlock || (postgresClient && (socketWriteNull || connectionClosed || connectionReset || connectionTimeout || connectionTerminated));
const kind: DatabaseTransientKind | null = deadlock
? "deadlock"
: socketWriteNull && postgresClient
? "socket-write-null"
: connectionClosed && postgresClient
? "connection-closed"
: connectionReset && postgresClient
? "connection-reset"
: connectionTimeout && postgresClient
? "connection-timeout"
: connectionTerminated && postgresClient
? "connection-terminated"
: transient
? "unknown"
: null;
const evidence = collectEvidence(lower, [
/postgres(?:@\d+(?:\.\d+){0,2})?\/src\/connection\.js:\d+/giu,
/node_modules\/postgres\/src\/connection\.js:\d+/giu,
/connection_closed/giu,
/socket\.write/giu,
/null is not an object/giu,
/d601-tcp-egress-gateway[^\s'"]*/giu,
/:15432/giu,
/deadlock detected/giu,
/econnreset|epipe|etimedout|connect_timeout/giu,
]);
return {
transient,
retryable: transient,
kind,
source: deadlock ? "postgres-deadlock" : postgresClient ? "postgres-client" : "unknown",
infrastructureBlocker: transient,
message: transient
? "PostgreSQL client connection was transiently closed; rotate the client, keep dirty writes queued, and retry without crashing the scheduler."
: "Error is not recognized as a transient PostgreSQL client failure.",
evidence,
recommendedAction: transient
? "Treat as Code Queue infrastructure, not task/business failure; inspect scheduler storage health and let bounded dirty-flush retry run before duplicating work."
: "Handle through the caller's normal error path.",
};
}
export function isTransientDatabaseError(error: unknown): boolean {
return classifyTransientDatabaseError(error).transient;
}
export function takeDirtyFlushBatch(taskIds: Set<string>, queueIds: Set<string>, nowIso: string): DirtyFlushBatch {
const batch = {
taskIds: Array.from(taskIds).sort(),
queueIds: Array.from(queueIds).sort(),
takenAt: nowIso,
};
taskIds.clear();
queueIds.clear();
return batch;
}
export function restoreDirtyFlushBatch(batch: DirtyFlushBatch, taskIds: Set<string>, queueIds: Set<string>): void {
for (const taskId of batch.taskIds) taskIds.add(taskId);
for (const queueId of batch.queueIds) queueIds.add(queueId);
}
export function databaseStorageHealth(input: DatabaseStorageHealthInput): JsonValue {
const classification = input.lastError === null ? null : classifyTransientDatabaseError(input.lastError);
const dirtyCount = Math.max(0, input.dirtyTaskCount) + Math.max(0, input.dirtyQueueCount);
const infrastructureBlocker = input.postgresReady !== true || input.lastError !== null || input.consecutiveFlushFailures > 0;
const status = input.postgresReady !== true
? "unavailable"
: infrastructureBlocker
? "degraded"
: input.flushInFlight || dirtyCount > 0
? "recovering"
: "ready";
const signal = infrastructureBlocker
? [{
id: classification?.kind === "socket-write-null" ? "postgres-client-socket-write-null" : "postgres-connection-rotation",
category: "infrastructure-blocker",
severity: input.postgresReady !== true ? "critical" : "high",
retryable: classification?.retryable ?? true,
bounded: true,
message: classification?.message ?? "Code Queue PostgreSQL storage is not ready or has pending dirty writes after a flush failure.",
evidence: classification?.evidence ?? [],
commanderAction: classification?.recommendedAction ?? "Inspect Code Queue storage health and scheduler logs before treating affected tasks as failed.",
source: "code-queue.storage.postgres",
} as Record<string, JsonValue>]
: [];
return {
status,
degraded: infrastructureBlocker,
infrastructureBlocker,
postgresReady: input.postgresReady,
dirtyTaskCount: input.dirtyTaskCount,
dirtyQueueCount: input.dirtyQueueCount,
dirtyCount,
flushInFlight: input.flushInFlight,
consecutiveFlushFailures: input.consecutiveFlushFailures,
lastFlushFailureAt: input.lastFlushFailureAt,
nextFlushRetryAt: input.nextFlushRetryAt,
clientRotationCount: input.clientRotationCount,
lastClientRotationAt: input.lastClientRotationAt,
lastClientRotationReason: input.lastClientRotationReason,
transientUncaughtSuppressedCount: input.transientUncaughtSuppressedCount,
lastTransientUncaughtSuppressedAt: input.lastTransientUncaughtSuppressedAt,
lastErrorKind: classification?.kind ?? null,
lastErrorTransient: classification?.transient ?? false,
lastErrorMessage: input.lastError === null ? null : databaseErrorText(input.lastError).split(/\r?\n/u)[0]?.slice(0, 500) ?? null,
signals: signal as unknown as JsonValue,
};
}
@@ -119,6 +119,7 @@ import {
} from "./queue-api";
import { buildExecutionDiagnostics, buildSchedulerHeartbeat, normalizeSchedulerHeartbeatStaleMs, schedulerHeartbeatStaleMs, schedulerHeartbeatStaleMsMax, staleRecoveryCandidate } from "./execution-diagnostics";
import { ReferenceTaskLookupError, configureReferences, injectReferencedTaskContext, taskReferenceIds } from "./references";
import { classifyTransientDatabaseError, databaseStorageHealth, isTransientDatabaseError, restoreDirtyFlushBatch, takeDirtyFlushBatch } from "./database-resilience";
import {
applyOaTraceStatsToTaskJson,
configureOaEvents,
@@ -236,8 +237,17 @@ function createSqlClient(): SqlClient {
let databaseReady = false;
let databaseLastError: string | null = null;
let databaseLastErrorDetail: unknown = null;
let databaseFlushTimer: ReturnType<typeof setTimeout> | null = null;
let databaseFlushInFlight = false;
let databaseFlushFailureCount = 0;
let databaseLastFlushFailureAt: string | null = null;
let databaseNextFlushRetryAt: string | null = null;
let databaseClientRotationCount = 0;
let databaseLastClientRotationAt: string | null = null;
let databaseLastClientRotationReason: string | null = null;
let databaseTransientUncaughtSuppressedCount = 0;
let databaseLastTransientUncaughtSuppressedAt: string | null = null;
const dirtyDatabaseTaskIds = new Set<string>();
const dirtyDatabaseQueueIds = new Set<string>();
const workdirRecords = new Map<string, WorkdirRecord>();
@@ -1220,29 +1230,27 @@ function errorToJson(error: unknown): JsonValue {
return String(error);
}
function transientDatabaseErrorMessage(error: unknown): string {
const message = error instanceof Error ? error.message : String(error);
const stack = error instanceof Error ? error.stack ?? "" : "";
return `${message}\n${stack}`;
}
function isTransientDatabaseError(error: unknown): boolean {
const text = transientDatabaseErrorMessage(error);
if (/deadlock detected/iu.test(text)) return true;
const looksLikePostgresClient = /node_modules\/postgres|postgres\/src|d601-tcp-egress-gateway|:15432|unidesk_code_queue|set_config/iu.test(text);
if (!looksLikePostgresClient) return false;
return /CONNECTION_CLOSED|ECONNRESET|EPIPE|ETIMEDOUT|Connection terminated|socket connection was closed|socket\.write|null is not an object/iu.test(text);
}
function transientDatabaseRetryDelayMs(attempt: number): number {
return Math.min(2000, 150 * Math.max(1, attempt));
}
function scheduleDatabaseFlushRetry(delayMs: number): void {
if (serviceRoleReadOnly(config.serviceRole)) return;
if (!databaseReady || shutdownRequested) return;
if (dirtyDatabaseTaskIds.size === 0 && dirtyDatabaseQueueIds.size === 0) return;
if (databaseFlushTimer !== null) return;
const boundedDelayMs = Math.max(100, Math.min(30_000, Math.floor(delayMs)));
scheduleDatabaseFlush(boundedDelayMs, { retry: true });
}
async function rotateSqlClientAfterTransientError(reason: string, error: unknown): Promise<void> {
databaseLastError = databaseErrorMessage(error);
setDatabaseLastError(error);
if (sqlRotationInFlight !== null) return sqlRotationInFlight;
const previous = sql;
sql = createSqlClient();
databaseClientRotationCount += 1;
databaseLastClientRotationAt = nowIso();
databaseLastClientRotationReason = reason;
sqlRotationInFlight = previous.end({ timeout: 1 })
.catch((closeError) => logger("warn", "database_client_close_failed", { reason, error: errorToJson(closeError) }))
.finally(() => {
@@ -1259,7 +1267,8 @@ async function withTransientDatabaseRetries<T>(operation: string, action: () =>
return await action();
} catch (error) {
lastError = error;
if (!isTransientDatabaseError(error) || attempt >= attempts) throw error;
const classification = classifyTransientDatabaseError(error);
if (!classification.transient || attempt >= attempts) throw error;
logger("warn", "database_transient_retry", { operation, attempt, attempts, delayMs: transientDatabaseRetryDelayMs(attempt), error: errorToJson(error) });
await rotateSqlClientAfterTransientError(operation, error);
await Bun.sleep(transientDatabaseRetryDelayMs(attempt));
@@ -1291,6 +1300,16 @@ function databaseErrorMessage(error: unknown): string {
return String(error);
}
function setDatabaseLastError(error: unknown): void {
databaseLastError = databaseErrorMessage(error);
databaseLastErrorDetail = error;
}
function clearDatabaseLastError(): void {
databaseLastError = null;
databaseLastErrorDetail = null;
}
function markTaskDirty(taskId: string): void {
dirtyDatabaseTaskIds.add(taskId);
scheduleDatabaseFlush();
@@ -1319,14 +1338,15 @@ function runGarbageCollection(): void {
if (typeof gc === "function") gc(true);
}
function scheduleDatabaseFlush(delayMs = config.databaseFlushIntervalMs): void {
function scheduleDatabaseFlush(delayMs = config.databaseFlushIntervalMs, options: { retry?: boolean } = {}): void {
if (serviceRoleReadOnly(config.serviceRole)) return;
if (!databaseReady || (dirtyDatabaseTaskIds.size === 0 && dirtyDatabaseQueueIds.size === 0) || shutdownRequested) return;
if (databaseFlushTimer !== null) return;
if (options.retry === true) databaseNextFlushRetryAt = new Date(Date.now() + Math.max(0, delayMs)).toISOString();
databaseFlushTimer = setTimeout(() => {
databaseFlushTimer = null;
void flushDirtyTasksToDatabase().catch((error) => {
databaseLastError = databaseErrorMessage(error);
setDatabaseLastError(error);
logger("error", "database_flush_failed", { error: errorToJson(error), dirtyTaskCount: dirtyDatabaseTaskIds.size });
});
}, delayMs);
@@ -2105,7 +2125,7 @@ function scheduleStartupDatabaseMaintenance(): void {
durationMs: Math.round(performance.now() - started),
});
})().catch((error) => {
databaseLastError = databaseErrorMessage(error);
setDatabaseLastError(error);
logger("warn", "database_startup_maintenance_failed", { error: errorToJson(error) });
});
}, 1000).unref?.();
@@ -2129,7 +2149,7 @@ async function findTaskForRead(taskId: string): Promise<QueueTask | null> {
if (databaseTask === null) return hotTask;
return shouldPreferHotTaskOverDatabase(hotTask, databaseTask) ? hotTask : databaseTask;
} catch (error) {
databaseLastError = databaseErrorMessage(error);
setDatabaseLastError(error);
logger("warn", "read_database_fallback", { taskId, error: errorToJson(error) });
return hotTask;
}
@@ -2144,7 +2164,7 @@ async function findTaskForMutation(taskId: string): Promise<QueueTask | null> {
if (hotTask === null) return rememberHotTask(databaseTask);
return shouldPreferHotTaskOverDatabase(hotTask, databaseTask) ? hotTask : reconcileHotTaskFromDatabase(databaseTask);
} catch (error) {
databaseLastError = databaseErrorMessage(error);
setDatabaseLastError(error);
logger("warn", "mutation_database_fallback", { taskId, error: errorToJson(error) });
return hotTask;
}
@@ -2170,11 +2190,10 @@ async function flushDirtyTasksToDatabase(force = false): Promise<void> {
scheduleDatabaseFlush();
return;
}
const ids = Array.from(dirtyDatabaseTaskIds).sort();
const queueIds = Array.from(dirtyDatabaseQueueIds).sort();
const batch = takeDirtyFlushBatch(dirtyDatabaseTaskIds, dirtyDatabaseQueueIds, nowIso());
const ids = batch.taskIds;
const queueIds = batch.queueIds;
if (ids.length === 0 && queueIds.length === 0) return;
dirtyDatabaseTaskIds.clear();
dirtyDatabaseQueueIds.clear();
databaseFlushInFlight = true;
let rejectedTaskIds: string[] = [];
try {
@@ -2190,10 +2209,32 @@ async function flushDirtyTasksToDatabase(force = false): Promise<void> {
}
return currentRejectedTaskIds;
}));
databaseLastError = null;
clearDatabaseLastError();
databaseFlushFailureCount = 0;
databaseLastFlushFailureAt = null;
databaseNextFlushRetryAt = null;
} catch (error) {
for (const id of ids) dirtyDatabaseTaskIds.add(id);
for (const id of queueIds) dirtyDatabaseQueueIds.add(id);
restoreDirtyFlushBatch(batch, dirtyDatabaseTaskIds, dirtyDatabaseQueueIds);
setDatabaseLastError(error);
databaseFlushFailureCount += 1;
databaseLastFlushFailureAt = nowIso();
const classification = classifyTransientDatabaseError(error);
if (classification.transient) {
await rotateSqlClientAfterTransientError("flush-dirty-tasks-final", error);
const retryDelayMs = Math.min(30_000, Math.max(config.databaseFlushIntervalMs, transientDatabaseRetryDelayMs(databaseFlushFailureCount + 1) * Math.max(1, databaseFlushFailureCount)));
logger("error", "database_flush_degraded_retry_scheduled", {
operation: "flush-dirty-tasks",
force,
failureCount: databaseFlushFailureCount,
retryDelayMs,
dirtyTaskCount: dirtyDatabaseTaskIds.size,
dirtyQueueCount: dirtyDatabaseQueueIds.size,
classification: classification as unknown as JsonValue,
error: errorToJson(error),
});
scheduleDatabaseFlushRetry(retryDelayMs);
return;
}
throw error;
} finally {
databaseFlushInFlight = false;
@@ -2368,7 +2409,7 @@ async function initDatabasePersistenceWithRetry(): Promise<void> {
await initDatabasePersistence();
return;
} catch (error) {
databaseLastError = databaseErrorMessage(error);
setDatabaseLastError(error);
const elapsedMs = Date.now() - started;
logger("warn", "database_persistence_init_retry", { attempt, elapsedMs, error: errorToJson(error) });
if (elapsedMs > 90_000 && !reportedLongWait) {
@@ -2923,7 +2964,23 @@ configureQueueApi({
databaseLastError: () => databaseLastError,
databaseReady: () => databaseReady,
defaultQueueId,
dirtyDatabaseQueueCount: () => dirtyDatabaseQueueIds.size,
dirtyDatabaseTaskCount: () => dirtyDatabaseTaskIds.size,
databaseStorageHealth: () => databaseStorageHealth({
postgresReady: databaseReady,
dirtyTaskCount: dirtyDatabaseTaskIds.size,
dirtyQueueCount: dirtyDatabaseQueueIds.size,
lastError: databaseLastErrorDetail ?? databaseLastError,
flushInFlight: databaseFlushInFlight,
clientRotationCount: databaseClientRotationCount,
lastClientRotationAt: databaseLastClientRotationAt,
lastClientRotationReason: databaseLastClientRotationReason,
transientUncaughtSuppressedCount: databaseTransientUncaughtSuppressedCount,
lastTransientUncaughtSuppressedAt: databaseLastTransientUncaughtSuppressedAt,
consecutiveFlushFailures: databaseFlushFailureCount,
lastFlushFailureAt: databaseLastFlushFailureAt,
nextFlushRetryAt: databaseNextFlushRetryAt,
}),
jsonResponse,
judgeFailRetryLimit,
loadQueuesFromDatabase,
@@ -3489,9 +3546,24 @@ function latestAttemptExternalProvider429(task: QueueTask): boolean {
return latestAttemptRunnerErrorClassification(task)?.externalProvider429 === true;
}
function latestAttemptStaleBadResume(task: QueueTask): boolean {
return latestAttemptRunnerErrorClassification(task)?.staleBadResume === true;
}
function clearStaleBadResumeThread(task: QueueTask): boolean {
if (!latestAttemptStaleBadResume(task) || task.codexThreadId === null) return false;
const previousThreadId = task.codexThreadId;
task.codexThreadId = null;
task.activeTurnId = null;
appendOutput(task, "system", `stale bad-resume thread ${previousThreadId} superseded; next retry will start a fresh Codex thread because app-server reported no rollout found for thread id\n`, "thread/resume-superseded");
logger("warn", "stale_bad_resume_thread_superseded", { taskId: task.id, previousThreadId });
return true;
}
function retryBackoffEvidence(task: QueueTask, completedAttempts: number, reason: string): JsonValue {
const runnerClassification = latestAttemptRunnerErrorClassification(task);
const externalProvider429 = runnerClassification?.externalProvider429 === true;
const staleBadResume = runnerClassification?.staleBadResume === true;
const baseDelayMs = retryBackoffMs(completedAttempts);
const jittered = externalProvider429
? retryBackoffWithJitterMs(task, completedAttempts, 30_000)
@@ -3505,9 +3577,12 @@ function retryBackoffEvidence(task: QueueTask, completedAttempts: number, reason
delayMs: jittered.delayMs,
maxDelayMs: retryBackoffMaxMs,
externalProvider429,
staleBadResume,
failureKind: runnerClassification?.failureKind ?? null,
runnerDisposition: runnerClassification?.disposition ?? null,
commanderBenefit: "fewer failed runners and clearer retry/backoff evidence",
commanderBenefit: staleBadResume
? "stale resume thread is superseded once and retried on a fresh Codex thread"
: "fewer failed runners and clearer retry/backoff evidence",
};
}
@@ -4072,6 +4147,7 @@ async function runTask(task: QueueTask): Promise<void> {
const nextPrompt = judgeFailContinuationPrompt(task, judge, task.judgeFailCount);
task.nextPrompt = nextPrompt;
task.nextMode = "retry";
clearStaleBadResumeThread(task);
setAttemptFeedbackPrompt(latestAttempt, nextPrompt, "judge-fail-retry", task.attempts.length + 1);
task.updatedAt = nowIso();
appendOutput(task, "system", `judge=fail treated as retry (${task.judgeFailCount}/${judgeFailRetryLimit}); appending continuation prompt to existing session\n`, "queue");
@@ -4135,6 +4211,7 @@ async function runTask(task: QueueTask): Promise<void> {
const nextPrompt = retryPrompt(task, judge);
task.nextPrompt = nextPrompt;
task.nextMode = "retry";
clearStaleBadResumeThread(task);
setAttemptFeedbackPrompt(latestAttempt, nextPrompt, judge.continuePrompt?.trim() ? "judge-continue-prompt" : "judge-retry-generated", task.attempts.length + 1);
task.updatedAt = nowIso();
persistTaskState(task);
@@ -4516,7 +4593,7 @@ function startSchedulerDatabasePoller(): void {
const interval = setInterval(() => {
if (!serviceReady || shutdownRequested) return;
void refreshSchedulerTasksFromDatabase("poll").catch((error) => {
databaseLastError = databaseErrorMessage(error);
setDatabaseLastError(error);
logger("warn", "scheduler_database_poll_failed", { error: errorToJson(error) });
});
}, config.schedulerPollIntervalMs);
@@ -4572,8 +4649,17 @@ function installShutdownHandlers(): void {
process.once("SIGINT", stop);
process.on("uncaughtException", (error) => {
if (isTransientDatabaseError(error)) {
databaseTransientUncaughtSuppressedCount += 1;
databaseLastTransientUncaughtSuppressedAt = nowIso();
void rotateSqlClientAfterTransientError("uncaught-exception", error).catch((closeError) => logger("warn", "database_client_rotate_uncaught_failed", { error: errorToJson(closeError) }));
logger("error", "transient_database_uncaught_exception_suppressed", { error: errorToJson(error) });
if (dirtyDatabaseTaskIds.size > 0 || dirtyDatabaseQueueIds.size > 0) scheduleDatabaseFlushRetry(config.databaseFlushIntervalMs);
logger("error", "transient_database_uncaught_exception_suppressed", {
suppressedCount: databaseTransientUncaughtSuppressedCount,
dirtyTaskCount: dirtyDatabaseTaskIds.size,
dirtyQueueCount: dirtyDatabaseQueueIds.size,
classification: classifyTransientDatabaseError(error) as unknown as JsonValue,
error: errorToJson(error),
});
return;
}
logger("error", "uncaught_exception", { error: errorToJson(error) });
@@ -4581,8 +4667,17 @@ function installShutdownHandlers(): void {
});
process.on("unhandledRejection", (reason) => {
if (isTransientDatabaseError(reason)) {
databaseTransientUncaughtSuppressedCount += 1;
databaseLastTransientUncaughtSuppressedAt = nowIso();
void rotateSqlClientAfterTransientError("unhandled-rejection", reason).catch((error) => logger("warn", "database_client_rotate_rejection_failed", { error: errorToJson(error) }));
logger("error", "transient_database_unhandled_rejection_suppressed", { error: errorToJson(reason) });
if (dirtyDatabaseTaskIds.size > 0 || dirtyDatabaseQueueIds.size > 0) scheduleDatabaseFlushRetry(config.databaseFlushIntervalMs);
logger("error", "transient_database_unhandled_rejection_suppressed", {
suppressedCount: databaseTransientUncaughtSuppressedCount,
dirtyTaskCount: dirtyDatabaseTaskIds.size,
dirtyQueueCount: dirtyDatabaseQueueIds.size,
classification: classifyTransientDatabaseError(reason) as unknown as JsonValue,
error: errorToJson(reason),
});
return;
}
logger("error", "unhandled_rejection", { error: errorToJson(reason) });
@@ -5965,7 +6060,7 @@ async function startDatabaseBackedRuntime(): Promise<void> {
persistState();
serviceReady = true;
void refreshSchedulerTasksFromDatabase("startup").catch((error) => {
databaseLastError = databaseErrorMessage(error);
setDatabaseLastError(error);
logger("warn", "scheduler_database_startup_refresh_failed", { error: errorToJson(error) });
});
startSchedulerDatabasePoller();
@@ -5979,6 +6074,6 @@ async function startDatabaseBackedRuntime(): Promise<void> {
void initDatabasePersistenceWithRetry()
.then(() => startDatabaseBackedRuntime())
.catch((error) => {
databaseLastError = databaseErrorMessage(error);
setDatabaseLastError(error);
logger("error", "database_persistence_init_failed", { error: errorToJson(error) });
});
@@ -24,7 +24,9 @@ export interface QueueApiContext {
databaseLastError: () => string | null;
databaseReady: () => boolean;
defaultQueueId: string;
dirtyDatabaseQueueCount: () => number;
dirtyDatabaseTaskCount: () => number;
databaseStorageHealth: () => JsonValue;
jsonResponse: (body: unknown, status?: number) => Response;
judgeFailRetryLimit: number;
loadQueuesFromDatabase: () => Promise<QueueRecord[]>;
@@ -526,7 +528,9 @@ function queueSummary(includeDevReady = true, tasks: QueueTask[] = ctx().tasks()
postgresConfigured: true,
postgresReady: ctx().databaseReady(),
dirtyTaskCount: ctx().dirtyDatabaseTaskCount(),
dirtyQueueCount: ctx().dirtyDatabaseQueueCount(),
lastError: ctx().databaseLastError(),
health: ctx().databaseStorageHealth(),
outputArchiveDir: ctx().config.outputArchiveDir,
inMemoryOutputRecords: ctx().config.maxInMemoryOutputRecords,
inMemoryEventRecords: ctx().config.maxInMemoryEventRecords,
@@ -24,6 +24,7 @@ export interface RunnerErrorClassification {
globalBlocker: boolean;
singlePathObservation: boolean;
externalProvider429: boolean;
staleBadResume: boolean;
reason: string;
evidence: string[];
recommendedTriageCommand: string;
@@ -117,6 +118,12 @@ export function classifyRunnerError(message: string, providerId?: string | null)
/exceeded retry limit/gu,
/tokens per min|requests per min/gu,
]);
const staleBadResumeEvidence = collectEvidence(normalized, [
/no rollout found for thread id [a-z0-9._:-]+/gu,
/no rollout found for thread id/gu,
/rollout .*not found .*thread/gu,
/missing .*rollout .*thread/gu,
]);
const runnerLocalEvidence = collectEvidence(normalized, [
/spawn .*enoent/gu,
/command not found/gu,
@@ -133,8 +140,16 @@ export function classifyRunnerError(message: string, providerId?: string | null)
let reason = "The runner failure needs provider triage before it can be treated as a global blocker.";
let evidence: string[] = [];
let externalProvider429 = false;
let staleBadResume = false;
if (externalProvider429Evidence.length > 0) {
if (staleBadResumeEvidence.length > 0) {
scope = "scheduler";
disposition = "service-degraded";
failureKind = "stale-bad-resume-thread-rollout-missing";
staleBadResume = true;
reason = "Codex app-server rejected resume because the saved thread id has no rollout; the scheduler should stop reusing that stale resume thread and reopen bounded recovery on a fresh thread.";
evidence = staleBadResumeEvidence;
} else if (externalProvider429Evidence.length > 0) {
scope = "external-provider";
disposition = "retryable-transient";
failureKind = "external-provider-rate-limit";
@@ -187,6 +202,7 @@ export function classifyRunnerError(message: string, providerId?: string | null)
globalBlocker: false,
singlePathObservation: true,
externalProvider429,
staleBadResume,
reason,
evidence,
recommendedTriageCommand: providerTriageCommand(providerId, raw, scope),