From bb5351cc94057b59aa0c4c579945ea48682a2528 Mon Sep 17 00:00:00 2001 From: Codex Date: Sat, 23 May 2026 13:32:39 +0000 Subject: [PATCH] fix: harden code queue postgres rotation --- ...e-queue-postgres-rotation-contract-test.ts | 206 ++++++++++++++++++ scripts/src/check.ts | 4 + scripts/src/code-queue.ts | 47 +++- .../code-queue/src/database-resilience.ts | 190 ++++++++++++++++ .../microservices/code-queue/src/index.ts | 165 +++++++++++--- .../microservices/code-queue/src/queue-api.ts | 4 + .../code-queue/src/runner-error-classifier.ts | 18 +- 7 files changed, 597 insertions(+), 37 deletions(-) create mode 100644 scripts/code-queue-postgres-rotation-contract-test.ts create mode 100644 src/components/microservices/code-queue/src/database-resilience.ts diff --git a/scripts/code-queue-postgres-rotation-contract-test.ts b/scripts/code-queue-postgres-rotation-contract-test.ts new file mode 100644 index 00000000..96b34b2e --- /dev/null +++ b/scripts/code-queue-postgres-rotation-contract-test.ts @@ -0,0 +1,206 @@ +import { codexTasksQueryForTest } from "./src/code-queue"; +import { databaseStorageHealth, classifyTransientDatabaseError, restoreDirtyFlushBatch, takeDirtyFlushBatch } from "../src/components/microservices/code-queue/src/database-resilience"; +import { classifyRunnerError } from "../src/components/microservices/code-queue/src/runner-error-classifier"; + +type JsonRecord = Record; + +function assertCondition(condition: unknown, message: string, detail: unknown = {}): void { + if (!condition) throw new Error(`${message}: ${JSON.stringify(detail)}`); +} + +function asRecord(value: unknown, label: string): JsonRecord { + assertCondition(typeof value === "object" && value !== null && !Array.isArray(value), `${label} must be an object`, value); + return value as JsonRecord; +} + +function asArray(value: unknown, label: string): unknown[] { + assertCondition(Array.isArray(value), `${label} must be an array`, value); + return value as unknown[]; +} + +function 
postgresSocketWriteError(): Error { + const error = new TypeError("null is not an object (evaluating 'socket.write')"); + error.stack = [ + "TypeError: null is not an object (evaluating 'socket.write')", + " at nextWrite (/app/node_modules/postgres/src/connection.js:255:22)", + " at /app/src/components/microservices/code-queue/src/index.ts:4290:15", + ].join("\n"); + Object.assign(error, { code: "CONNECTION_CLOSED", errno: "CONNECTION_CLOSED" }); + return error; +} + +function assertDatabaseClassification(): void { + const classification = classifyTransientDatabaseError(postgresSocketWriteError()); + assertCondition(classification.transient === true, "socket.write null must be classified as transient database failure", classification); + assertCondition(classification.retryable === true, "socket.write null must be retryable", classification); + assertCondition(classification.kind === "socket-write-null", "socket.write null kind must be explicit", classification); + assertCondition(classification.infrastructureBlocker === true, "socket.write null must be infrastructure-blocker", classification); + assertCondition(classification.evidence.some((item) => String(item).includes("socket.write")), "classification should expose bounded socket.write evidence", classification); + const codeOnly = Object.assign(new Error("write CONNECTION_CLOSED"), { code: "CONNECTION_CLOSED" }); + assertCondition(classifyTransientDatabaseError(codeOnly).kind === "connection-closed", "Postgres CONNECTION_CLOSED code should classify even if stack is stripped", classifyTransientDatabaseError(codeOnly)); +} + +function assertDirtyBatchRestore(): void { + const taskIds = new Set(["task-b", "task-a"]); + const queueIds = new Set(["queue-b", "queue-a"]); + const batch = takeDirtyFlushBatch(taskIds, queueIds, "2026-05-23T00:00:00.000Z"); + assertCondition(taskIds.size === 0 && queueIds.size === 0, "taking a dirty flush batch should clear source sets", { taskIds: Array.from(taskIds), queueIds: 
Array.from(queueIds) }); + assertCondition(JSON.stringify(batch.taskIds) === JSON.stringify(["task-a", "task-b"]), "dirty task ids should be sorted for deterministic flush", batch); + restoreDirtyFlushBatch(batch, taskIds, queueIds); + assertCondition(taskIds.has("task-a") && taskIds.has("task-b") && queueIds.has("queue-a") && queueIds.has("queue-b"), "failed dirty flush must restore all ids for retry", { + taskIds: Array.from(taskIds), + queueIds: Array.from(queueIds), + }); +} + +function assertStorageHealth(): JsonRecord { + const health = asRecord(databaseStorageHealth({ + postgresReady: true, + dirtyTaskCount: 2, + dirtyQueueCount: 1, + lastError: postgresSocketWriteError(), + flushInFlight: false, + clientRotationCount: 2, + lastClientRotationAt: "2026-05-23T00:00:01.000Z", + lastClientRotationReason: "flush-dirty-tasks", + transientUncaughtSuppressedCount: 1, + lastTransientUncaughtSuppressedAt: "2026-05-23T00:00:02.000Z", + consecutiveFlushFailures: 1, + lastFlushFailureAt: "2026-05-23T00:00:03.000Z", + nextFlushRetryAt: "2026-05-23T00:00:04.000Z", + }), "database storage health"); + assertCondition(health.status === "degraded", "storage health should be degraded after transient flush failure", health); + assertCondition(health.infrastructureBlocker === true, "storage health should mark infrastructure-blocker", health); + assertCondition(health.lastErrorKind === "socket-write-null", "storage health should expose transient kind", health); + const signals = asArray(health.signals, "health.signals"); + assertCondition(signals.length === 1, "storage health should include one bounded actionable signal", health); + const signal = asRecord(signals[0], "health.signals[0]"); + assertCondition(signal.category === "infrastructure-blocker", "storage signal should be an infrastructure-blocker", signal); + assertCondition(String(signal.commanderAction ?? 
"").includes("Code Queue infrastructure"), "storage signal should tell commander this is infrastructure", signal); + return health; +} + +function fixtureResponse(path: string): JsonRecord { + if (path.includes("/summary")) { + const taskId = decodeURIComponent(path.split("/api/tasks/")[1]?.split("/")[0] ?? "unknown"); + return { + ok: true, + status: 200, + body: { + ok: true, + summary: { + id: taskId, + queueId: "default", + status: "retry_wait", + currentAttempt: 2, + maxAttempts: 99, + prompt: "UniDesk#20 Code Queue scheduler Postgres connection rotation crash", + basePrompt: "UniDesk#20 Code Queue scheduler Postgres connection rotation crash", + lastError: "database flush degraded after CONNECTION_CLOSED socket.write null", + lastAssistantMessage: { + at: "2026-05-23T00:00:00.000Z", + seq: 44, + source: "transcript", + text: "Blocked by Postgres CONNECTION_CLOSED while flushing dirty tasks.", + }, + }, + }, + }; + } + assertCondition(path.startsWith("/api/microservices/code-queue/proxy/api/tasks/overview"), "unexpected Code Queue path", { path }); + const health = assertStorageHealth(); + return { + ok: true, + status: 200, + body: { + ok: true, + queue: { + counts: { retry_wait: 1, queued: 0, running: 0, judging: 0 }, + storage: { + postgresReady: true, + dirtyTaskCount: 2, + dirtyQueueCount: 1, + lastError: "CONNECTION_CLOSED null socket.write", + health, + }, + executionDiagnostics: { + state: "degraded", + degraded: true, + effectiveLiveness: "degraded", + recommendedAction: "observe-degraded", + databaseActiveTaskCount: 0, + schedulerActiveRunSlotCount: 0, + activeHeartbeatCount: 0, + heartbeatRiskTaskIds: [], + staleRecoveryCandidateTaskIds: [], + traceGapTaskIds: [], + }, + }, + pagination: { + limit: 20, + returned: 1, + total: 1, + hasMore: false, + nextBeforeId: null, + includeActive: true, + }, + tasks: [ + { + id: "codex_pg_rotation", + queueId: "default", + status: "retry_wait", + currentAttempt: 2, + updatedAt: "2026-05-23T00:00:10.000Z", + 
prompt: "UniDesk#20 Code Queue scheduler Postgres connection rotation crash", + basePrompt: "UniDesk#20 Code Queue scheduler Postgres connection rotation crash", + displayPrompt: "UniDesk#20 Code Queue scheduler Postgres connection rotation crash", + lastError: "database flush degraded after CONNECTION_CLOSED socket.write null", + }, + ], + }, + }; +} + +function assertCommanderInfrastructureSignal(): void { + const result = asRecord(codexTasksQueryForTest(["--view", "commander", "--limit", "20"], fixtureResponse), "commander result"); + const commander = asRecord(result.commander, "result.commander"); + const infrastructure = asRecord(commander.infrastructure, "commander.infrastructure"); + assertCondition(infrastructure.infrastructureBlocker === true, "commander view should surface storage issue as infrastructure-blocker", infrastructure); + assertCondition(infrastructure.status === "degraded", "commander infrastructure status should be degraded", infrastructure); + assertCondition(String(infrastructure.actionable ?? 
"").includes("Code Queue infrastructure-blocker"), "commander infrastructure signal should be actionable", infrastructure); + const signals = asArray(infrastructure.signals, "commander.infrastructure.signals"); + assertCondition(signals.length === 1, "commander infrastructure signals should be bounded", infrastructure); + const signal = asRecord(signals[0], "commander.infrastructure.signals[0]"); + assertCondition(signal.category === "infrastructure-blocker", "commander signal category should be infrastructure-blocker", signal); + const riskCounts = asRecord(commander.riskCounts, "commander.riskCounts"); + assertCondition(riskCounts.infrastructureBlocker === 1, "commander risk counts should expose infrastructure blocker", riskCounts); + const classification = asRecord(commander.classification, "commander.classification"); + const byCategory = asRecord(classification.byCategory, "commander.classification.byCategory"); + assertCondition(Number(byCategory["infrastructure-blocker"] ?? 0) >= 1, "task classification should identify postgres scheduler crash as infrastructure-blocker", classification); +} + +function assertStaleBadResumeClassification(): void { + const classification = classifyRunnerError("app-server error: no rollout found for thread id thread_bad_resume_123", "D601"); + assertCondition(classification.staleBadResume === true, "no rollout found for thread id should be stale bad-resume", classification); + assertCondition(classification.failureKind === "stale-bad-resume-thread-rollout-missing", "bad resume failure kind should be explicit", classification); + assertCondition(classification.retryable === true, "bad resume should be retryable after superseding stale thread", classification); + assertCondition(classification.disposition === "service-degraded", "bad resume should be service-degraded, not business failure", classification); +} + +assertDatabaseClassification(); +assertDirtyBatchRestore(); +assertStorageHealth(); 
+assertCommanderInfrastructureSignal(); +assertStaleBadResumeClassification(); + +process.stdout.write(`${JSON.stringify({ + ok: true, + checks: [ + "CONNECTION_CLOSED socket.write null is classified as transient postgres infrastructure", + "CONNECTION_CLOSED code-only errors are retryable", + "dirty flush batch restore keeps task/queue ids dirty for retry", + "storage health emits bounded infrastructure-blocker signal", + "codex tasks --view commander surfaces the storage signal", + "stale bad-resume no rollout found for thread id is retryable and superseded", + ], +}, null, 2)}\n`); diff --git a/scripts/src/check.ts b/scripts/src/check.ts index f37cd20f..bdd75ba6 100644 --- a/scripts/src/check.ts +++ b/scripts/src/check.ts @@ -50,6 +50,7 @@ const syntaxFiles = [ "scripts/microservice-health-output-contract-test.ts", "scripts/code-queue-supervisor-disclosure-contract-test.ts", "scripts/code-queue-commander-view-contract-test.ts", + "scripts/code-queue-postgres-rotation-contract-test.ts", "scripts/ssh-argv-guidance-contract-test.ts", "src/components/frontend/src/index.ts", "src/components/frontend/src/app.tsx", @@ -356,6 +357,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default fileItem("scripts/code-queue-gh-auth-redaction-contract-test.ts"), fileItem("scripts/code-queue-supervisor-disclosure-contract-test.ts"), fileItem("scripts/code-queue-commander-view-contract-test.ts"), + fileItem("scripts/code-queue-postgres-rotation-contract-test.ts"), fileItem("scripts/host-codex-commander-skeleton-contract-test.ts"), fileItem("scripts/host-codex-commander-no-daemon-smoke-contract-test.ts"), fileItem("scripts/host-codex-commander-prompt-lint-contract-test.ts"), @@ -410,6 +412,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default items.push(commandItem("code-queue:gh-auth-redaction-contract", ["bun", "scripts/code-queue-gh-auth-redaction-contract-test.ts"], 30_000)); 
items.push(commandItem("code-queue:supervisor-disclosure-contract", ["bun", "scripts/code-queue-supervisor-disclosure-contract-test.ts"], 30_000)); items.push(commandItem("code-queue:commander-view-contract", ["bun", "scripts/code-queue-commander-view-contract-test.ts"], 30_000)); + items.push(commandItem("code-queue:postgres-rotation-contract", ["bun", "scripts/code-queue-postgres-rotation-contract-test.ts"], 30_000)); items.push(commandItem("host-codex-commander:skeleton-contract", ["bun", "scripts/host-codex-commander-skeleton-contract-test.ts"], 30_000)); items.push(commandItem("host-codex-commander:no-daemon-smoke-contract", ["bun", "scripts/host-codex-commander-no-daemon-smoke-contract-test.ts"], 30_000)); items.push(commandItem("host-codex-commander:prompt-lint-contract", ["bun", "scripts/host-codex-commander-prompt-lint-contract-test.ts"], 30_000)); @@ -452,6 +455,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default items.push(skippedItem("code-queue:gh-auth-redaction-contract", "Code Queue GitHub auth output redaction contract is opt-in with script checks", "--scripts-typecheck or --full")); items.push(skippedItem("code-queue:supervisor-disclosure-contract", "Code Queue supervisor disclosure contract is opt-in with script checks", "--scripts-typecheck or --full")); items.push(skippedItem("code-queue:commander-view-contract", "Code Queue commander view contract is opt-in with script checks", "--scripts-typecheck or --full")); + items.push(skippedItem("code-queue:postgres-rotation-contract", "Code Queue postgres rotation crash contract is opt-in with script checks", "--scripts-typecheck or --full")); items.push(skippedItem("host-codex-commander:skeleton-contract", "host Codex commander skeleton contract is opt-in with script checks", "--scripts-typecheck or --full")); items.push(skippedItem("host-codex-commander:no-daemon-smoke-contract", "host Codex commander no-daemon smoke contract is opt-in with script checks", 
"--scripts-typecheck or --full")); items.push(skippedItem("host-codex-commander:prompt-lint-contract", "host Codex commander prompt boundary lint contract is opt-in with script checks", "--scripts-typecheck or --full")); diff --git a/scripts/src/code-queue.ts b/scripts/src/code-queue.ts index 0dc84256..e67cdbba 100644 --- a/scripts/src/code-queue.ts +++ b/scripts/src/code-queue.ts @@ -3861,12 +3861,54 @@ function blockerLikeFinalResponseSignals(task: Record, summary: if (pattern.test(text) && !signals.includes(id)) signals.push(id); }; add("blocked", /\b(blocked|blocker|cannot proceed|can't proceed|stuck|waiting for commander|needs authorization|need authorization|requires approval|permission denied)\b|阻塞|卡住|等待授权|需要授权|权限不足/iu); - add("infra-auth-network", /\b(auth|token|credential|github transient|dns|rate limit|429|timeout|timed out|unreachable|offline|proxy|tunnel|provider unavailable)\b|鉴权|令牌|网络|超时|不可达/iu); + add("infra-auth-network", /\b(auth|token|credential|github transient|dns|rate limit|429|timeout|timed out|unreachable|offline|proxy|tunnel|provider unavailable|postgres|database|connection_closed|socket\.write|null is not an object)\b|鉴权|令牌|网络|超时|不可达|数据库/iu); add("merge-or-test-failure", /\b(conflict|merge failed|tests? failed|typecheck failed|syntax failed|build failed|ci failed|e2e failed)\b|冲突|测试失败|构建失败|检查失败/iu); add("not-deployed", /\b(not deployed|not rebuilt|not rolled out|deploy skipped|rollout skipped|needs rollout|requires deploy)\b|未部署|未重建|未上线|需要部署/iu); return signals; } +function commanderInfrastructureSignals(rawQueue: Record): Record { + const storage = asRecord(rawQueue.storage) ?? {}; + const health = asRecord(storage.health) ?? 
{}; + const rawSignals = asArray(health.signals).map((item) => asRecord(item)).filter((item): item is Record<string, unknown> => item !== null); + const boundedSignals = rawSignals.slice(0, 3).map((signal) => ({ + id: asString(signal.id) || "code-queue-infrastructure-blocker", + category: asString(signal.category) || "infrastructure-blocker", + severity: asString(signal.severity) || "high", + retryable: asBoolean(signal.retryable), + bounded: signal.bounded !== false, + message: boundedInlineString(asString(signal.message), 240).text, + evidence: stringList(signal.evidence).slice(0, diagnosticsReasonPreviewLimit), + commanderAction: boundedInlineString(asString(signal.commanderAction), 260).text, + source: asString(signal.source) || "code-queue", + })); + /* Single verdict shared by infrastructureBlocker, the status fallback and the actionable text, so a reported blocker signal is never paired with a "ready" status or a "no blocker" message. */ const infrastructureBlocker = health.degraded === true || storage.postgresReady === false || (storage.lastError !== null && storage.lastError !== undefined) || boundedSignals.some((signal) => signal.category === "infrastructure-blocker"); + return { + infrastructureBlocker, + status: health.status ?? (infrastructureBlocker ? "degraded" : "ready"), + source: "queue.storage.health", + signalCount: rawSignals.length, + omittedSignalCount: Math.max(0, rawSignals.length - boundedSignals.length), + bounded: true, + signals: boundedSignals, + storage: { + postgresReady: storage.postgresReady ?? health.postgresReady ?? null, + dirtyTaskCount: storage.dirtyTaskCount ?? health.dirtyTaskCount ?? null, + dirtyQueueCount: storage.dirtyQueueCount ?? health.dirtyQueueCount ?? null, + consecutiveFlushFailures: health.consecutiveFlushFailures ?? null, + lastFlushFailureAt: health.lastFlushFailureAt ?? null, + nextFlushRetryAt: health.nextFlushRetryAt ?? null, + clientRotationCount: health.clientRotationCount ?? null, + lastClientRotationAt: health.lastClientRotationAt ?? null, + lastErrorKind: health.lastErrorKind ?? null, + lastErrorTransient: health.lastErrorTransient ?? null, + }, + actionable: infrastructureBlocker + ? 
"Treat as Code Queue infrastructure-blocker; inspect storage health/logs and wait for bounded dirty-flush retry before duplicating or canceling business tasks." + : "No Code Queue storage infrastructure blocker reported in this overview page.", + }; +} + function commanderAttentionReasons( task: Record, summary: Record | null, @@ -4123,6 +4165,7 @@ function codexTasksCommanderResult( const rawQueue = asRecord(taskPage.queue) ?? {}; const rawDiagnostics = asRecord(rawQueue.executionDiagnostics) ?? {}; const diagnostics = supervisorExecutionDiagnostics(rawDiagnostics); + const infrastructure = commanderInfrastructureSignals(rawQueue); const activity = compactCodeQueueActivity(rawQueue, diagnostics, { snapshotRole: "supervisor-poll" }); const commanderConcurrency = asRecord(activity.commanderConcurrency) ?? {}; const runningTasks = sortRunningWatchTasks(allTasks); @@ -4219,6 +4262,7 @@ function codexTasksCommanderResult( riskCounts: { attention: attentionCounts(attentionItems, returnedAttention), activeRisks: activeRiskTasks.length, + infrastructureBlocker: infrastructure.infrastructureBlocker === true ? 
1 : 0, heartbeatRiskTaskIds: stringList(rawDiagnostics.heartbeatRiskTaskIds).length, staleRecoveryCandidateTaskIds: stringList(rawDiagnostics.staleRecoveryCandidateTaskIds).length, traceGapTaskIds: stringList(rawDiagnostics.traceGapTaskIds).length, @@ -4227,6 +4271,7 @@ function codexTasksCommanderResult( }, highPriorityIssues: commanderHighPriorityIssues(allTasks, summaries), classification: commanderClassificationCounts(allTasks, summaries), + infrastructure, executionDiagnostics: diagnostics, degraded, commands: { diff --git a/src/components/microservices/code-queue/src/database-resilience.ts b/src/components/microservices/code-queue/src/database-resilience.ts new file mode 100644 index 00000000..291ed5fd --- /dev/null +++ b/src/components/microservices/code-queue/src/database-resilience.ts @@ -0,0 +1,190 @@ +import type { JsonValue } from "./types"; + +export type DatabaseTransientKind = + | "deadlock" + | "connection-closed" + | "connection-reset" + | "connection-timeout" + | "connection-terminated" + | "socket-write-null" + | "unknown"; + +export interface DatabaseTransientClassification { + transient: boolean; + retryable: boolean; + kind: DatabaseTransientKind | null; + source: "postgres-deadlock" | "postgres-client" | "unknown"; + infrastructureBlocker: boolean; + message: string; + evidence: string[]; + recommendedAction: string; +} + +export interface DirtyFlushBatch { + taskIds: string[]; + queueIds: string[]; + takenAt: string; +} + +export interface DatabaseStorageHealthInput { + postgresReady: boolean; + dirtyTaskCount: number; + dirtyQueueCount: number; + lastError: unknown; + flushInFlight: boolean; + clientRotationCount: number; + lastClientRotationAt: string | null; + lastClientRotationReason: string | null; + transientUncaughtSuppressedCount: number; + lastTransientUncaughtSuppressedAt: string | null; + consecutiveFlushFailures: number; + lastFlushFailureAt: string | null; + nextFlushRetryAt: string | null; +} + +function errorField(error: 
unknown, key: "code" | "errno"): string { + if (typeof error !== "object" || error === null) return ""; + const value = (error as Record<string, unknown>)[key]; + return typeof value === "string" ? value : ""; +} + +/** Flattens an error into searchable text. For Error instances: name, message, stack, then the string-valued `code` and `errno` fields, newline separated. Anything else is passed through String(). */ export function databaseErrorText(error: unknown): string { + if (error instanceof Error) return `${error.name}: ${error.message}\n${error.stack ?? ""}\n${errorField(error, "code")}\n${errorField(error, "errno")}`; + return String(error); +} + +/** Collects distinct regex matches from `text`, whitespace-normalized and capped at 220 characters each, returning as soon as `limit` entries are held. Patterns must carry the `g` flag because `matchAll` rejects non-global regexes. */ function collectEvidence(text: string, patterns: RegExp[], limit = 5): string[] { + const evidence: string[] = []; + for (const pattern of patterns) { + for (const match of text.matchAll(pattern)) { + /* Truncate before the duplicate check so long matches sharing a 220-character prefix are stored once. */ const value = String(match[0] ?? "").replace(/\s+/gu, " ").trim().slice(0, 220); + if (value.length > 0 && !evidence.includes(value)) evidence.push(value); + if (evidence.length >= limit) return evidence; + } + } + return evidence; +} + +export function classifyTransientDatabaseError(error: unknown): DatabaseTransientClassification { + const text = databaseErrorText(error); + const lower = text.toLowerCase(); + const deadlock = /deadlock detected/iu.test(text); + const code = `${errorField(error, "code")}\n${errorField(error, "errno")}`; + const postgresClientCode = /CONNECTION_CLOSED|CONNECTION_DESTROYED|CONNECTION_ENDED|CONNECT_TIMEOUT/iu.test(code); + const postgresClient = postgresClientCode || /node_modules\/postgres|postgres(?:@\d+(?:\.\d+){0,2})?\/src|postgres\/src|d601-tcp-egress-gateway|:15432|unidesk_code_queue|set_config/iu.test(text); + const combined = `${text}\n${code}`; + const socketWriteNull = /socket\.write|null is not an object|cannot read propert(?:y|ies) ['"]?write|evaluating ['"]socket\.write/iu.test(combined); + const connectionClosed = /CONNECTION_CLOSED|socket connection was closed|connection closed/iu.test(combined); + const connectionReset = /ECONNRESET|EPIPE|broken pipe/iu.test(combined); + const connectionTimeout = /ETIMEDOUT|CONNECT_TIMEOUT|Connection timeout|connect timed out/iu.test(combined); + const 
connectionTerminated = /Connection terminated|CONNECTION_DESTROYED|CONNECTION_ENDED/iu.test(combined); + const transient = deadlock || (postgresClient && (socketWriteNull || connectionClosed || connectionReset || connectionTimeout || connectionTerminated)); + const kind: DatabaseTransientKind | null = deadlock + ? "deadlock" + : socketWriteNull && postgresClient + ? "socket-write-null" + : connectionClosed && postgresClient + ? "connection-closed" + : connectionReset && postgresClient + ? "connection-reset" + : connectionTimeout && postgresClient + ? "connection-timeout" + : connectionTerminated && postgresClient + ? "connection-terminated" + : transient + ? "unknown" + : null; + const evidence = collectEvidence(lower, [ + /postgres(?:@\d+(?:\.\d+){0,2})?\/src\/connection\.js:\d+/giu, + /node_modules\/postgres\/src\/connection\.js:\d+/giu, + /connection_closed/giu, + /socket\.write/giu, + /null is not an object/giu, + /d601-tcp-egress-gateway[^\s'"]*/giu, + /:15432/giu, + /deadlock detected/giu, + /econnreset|epipe|etimedout|connect_timeout/giu, + ]); + return { + transient, + retryable: transient, + kind, + source: deadlock ? "postgres-deadlock" : postgresClient ? "postgres-client" : "unknown", + infrastructureBlocker: transient, + message: transient + ? "PostgreSQL client connection was transiently closed; rotate the client, keep dirty writes queued, and retry without crashing the scheduler." + : "Error is not recognized as a transient PostgreSQL client failure.", + evidence, + recommendedAction: transient + ? "Treat as Code Queue infrastructure, not task/business failure; inspect scheduler storage health and let bounded dirty-flush retry run before duplicating work." 
+ : "Handle through the caller's normal error path.", + }; +} + +/** Boolean shortcut for `classifyTransientDatabaseError(error).transient`. */ export function isTransientDatabaseError(error: unknown): boolean { + return classifyTransientDatabaseError(error).transient; +} + +/** Moves every id out of both dirty sets into a sorted snapshot stamped with `nowIso`; both sets are empty when this returns. */ export function takeDirtyFlushBatch(taskIds: Set<string>, queueIds: Set<string>, nowIso: string): DirtyFlushBatch { + const batch = { + taskIds: Array.from(taskIds).sort(), + queueIds: Array.from(queueIds).sort(), + takenAt: nowIso, + }; + taskIds.clear(); + queueIds.clear(); + return batch; +} + +/** Adds the ids of a batch back into the dirty sets; ids already present in the sets are kept. */ export function restoreDirtyFlushBatch(batch: DirtyFlushBatch, taskIds: Set<string>, queueIds: Set<string>): void { + for (const taskId of batch.taskIds) taskIds.add(taskId); + for (const queueId of batch.queueIds) queueIds.add(queueId); +} + +/** Builds the JSON storage-health snapshot. Status is "unavailable" when Postgres is not ready, "degraded" when a last error or consecutive flush failures are recorded, "recovering" while a flush is in flight or dirty ids remain, otherwise "ready". At most one infrastructure-blocker signal is emitted; classifier wording is used only for transient errors, and the storage-specific fallback text is used in every other case. */ export function databaseStorageHealth(input: DatabaseStorageHealthInput): JsonValue { + const classification = input.lastError === null ? null : classifyTransientDatabaseError(input.lastError); + const dirtyCount = Math.max(0, input.dirtyTaskCount) + Math.max(0, input.dirtyQueueCount); + const infrastructureBlocker = input.postgresReady !== true || input.lastError !== null || input.consecutiveFlushFailures > 0; + const status = input.postgresReady !== true + ? "unavailable" + : infrastructureBlocker + ? "degraded" + : input.flushInFlight || dirtyCount > 0 + ? "recovering" + : "ready"; + const signal = infrastructureBlocker + ? [{ + id: classification?.kind === "socket-write-null" ? "postgres-client-socket-write-null" : "postgres-connection-rotation", + category: "infrastructure-blocker", + severity: input.postgresReady !== true ? "critical" : "high", + retryable: classification?.retryable ?? true, + bounded: true, + message: (classification?.transient === true ? classification.message : null) ?? "Code Queue PostgreSQL storage is not ready or has pending dirty writes after a flush failure.", + evidence: classification?.evidence ?? [], + commanderAction: (classification?.transient === true ? classification.recommendedAction : null) ?? 
"Inspect Code Queue storage health and scheduler logs before treating affected tasks as failed.", + source: "code-queue.storage.postgres", + } as Record] + : []; + return { + status, + degraded: infrastructureBlocker, + infrastructureBlocker, + postgresReady: input.postgresReady, + dirtyTaskCount: input.dirtyTaskCount, + dirtyQueueCount: input.dirtyQueueCount, + dirtyCount, + flushInFlight: input.flushInFlight, + consecutiveFlushFailures: input.consecutiveFlushFailures, + lastFlushFailureAt: input.lastFlushFailureAt, + nextFlushRetryAt: input.nextFlushRetryAt, + clientRotationCount: input.clientRotationCount, + lastClientRotationAt: input.lastClientRotationAt, + lastClientRotationReason: input.lastClientRotationReason, + transientUncaughtSuppressedCount: input.transientUncaughtSuppressedCount, + lastTransientUncaughtSuppressedAt: input.lastTransientUncaughtSuppressedAt, + lastErrorKind: classification?.kind ?? null, + lastErrorTransient: classification?.transient ?? false, + lastErrorMessage: input.lastError === null ? null : databaseErrorText(input.lastError).split(/\r?\n/u)[0]?.slice(0, 500) ?? 
null, + signals: signal as unknown as JsonValue, + }; +} diff --git a/src/components/microservices/code-queue/src/index.ts b/src/components/microservices/code-queue/src/index.ts index 3e274f9e..cc2d1c1e 100644 --- a/src/components/microservices/code-queue/src/index.ts +++ b/src/components/microservices/code-queue/src/index.ts @@ -119,6 +119,7 @@ import { } from "./queue-api"; import { buildExecutionDiagnostics, buildSchedulerHeartbeat, normalizeSchedulerHeartbeatStaleMs, schedulerHeartbeatStaleMs, schedulerHeartbeatStaleMsMax, staleRecoveryCandidate } from "./execution-diagnostics"; import { ReferenceTaskLookupError, configureReferences, injectReferencedTaskContext, taskReferenceIds } from "./references"; +import { classifyTransientDatabaseError, databaseStorageHealth, isTransientDatabaseError, restoreDirtyFlushBatch, takeDirtyFlushBatch } from "./database-resilience"; import { applyOaTraceStatsToTaskJson, configureOaEvents, @@ -236,8 +237,17 @@ function createSqlClient(): SqlClient { let databaseReady = false; let databaseLastError: string | null = null; +let databaseLastErrorDetail: unknown = null; let databaseFlushTimer: ReturnType | null = null; let databaseFlushInFlight = false; +let databaseFlushFailureCount = 0; +let databaseLastFlushFailureAt: string | null = null; +let databaseNextFlushRetryAt: string | null = null; +let databaseClientRotationCount = 0; +let databaseLastClientRotationAt: string | null = null; +let databaseLastClientRotationReason: string | null = null; +let databaseTransientUncaughtSuppressedCount = 0; +let databaseLastTransientUncaughtSuppressedAt: string | null = null; const dirtyDatabaseTaskIds = new Set(); const dirtyDatabaseQueueIds = new Set(); const workdirRecords = new Map(); @@ -1220,29 +1230,27 @@ function errorToJson(error: unknown): JsonValue { return String(error); } -function transientDatabaseErrorMessage(error: unknown): string { - const message = error instanceof Error ? 
error.message : String(error); - const stack = error instanceof Error ? error.stack ?? "" : ""; - return `${message}\n${stack}`; -} - -function isTransientDatabaseError(error: unknown): boolean { - const text = transientDatabaseErrorMessage(error); - if (/deadlock detected/iu.test(text)) return true; - const looksLikePostgresClient = /node_modules\/postgres|postgres\/src|d601-tcp-egress-gateway|:15432|unidesk_code_queue|set_config/iu.test(text); - if (!looksLikePostgresClient) return false; - return /CONNECTION_CLOSED|ECONNRESET|EPIPE|ETIMEDOUT|Connection terminated|socket connection was closed|socket\.write|null is not an object/iu.test(text); -} - function transientDatabaseRetryDelayMs(attempt: number): number { return Math.min(2000, 150 * Math.max(1, attempt)); } +function scheduleDatabaseFlushRetry(delayMs: number): void { + if (serviceRoleReadOnly(config.serviceRole)) return; + if (!databaseReady || shutdownRequested) return; + if (dirtyDatabaseTaskIds.size === 0 && dirtyDatabaseQueueIds.size === 0) return; + if (databaseFlushTimer !== null) return; + const boundedDelayMs = Math.max(100, Math.min(30_000, Math.floor(delayMs))); + scheduleDatabaseFlush(boundedDelayMs, { retry: true }); +} + async function rotateSqlClientAfterTransientError(reason: string, error: unknown): Promise { - databaseLastError = databaseErrorMessage(error); + setDatabaseLastError(error); if (sqlRotationInFlight !== null) return sqlRotationInFlight; const previous = sql; sql = createSqlClient(); + databaseClientRotationCount += 1; + databaseLastClientRotationAt = nowIso(); + databaseLastClientRotationReason = reason; sqlRotationInFlight = previous.end({ timeout: 1 }) .catch((closeError) => logger("warn", "database_client_close_failed", { reason, error: errorToJson(closeError) })) .finally(() => { @@ -1259,7 +1267,8 @@ async function withTransientDatabaseRetries(operation: string, action: () => return await action(); } catch (error) { lastError = error; - if 
(!isTransientDatabaseError(error) || attempt >= attempts) throw error; + const classification = classifyTransientDatabaseError(error); + if (!classification.transient || attempt >= attempts) throw error; logger("warn", "database_transient_retry", { operation, attempt, attempts, delayMs: transientDatabaseRetryDelayMs(attempt), error: errorToJson(error) }); await rotateSqlClientAfterTransientError(operation, error); await Bun.sleep(transientDatabaseRetryDelayMs(attempt)); @@ -1291,6 +1300,16 @@ function databaseErrorMessage(error: unknown): string { return String(error); } +function setDatabaseLastError(error: unknown): void { + databaseLastError = databaseErrorMessage(error); + databaseLastErrorDetail = error; +} + +function clearDatabaseLastError(): void { + databaseLastError = null; + databaseLastErrorDetail = null; +} + function markTaskDirty(taskId: string): void { dirtyDatabaseTaskIds.add(taskId); scheduleDatabaseFlush(); @@ -1319,14 +1338,15 @@ function runGarbageCollection(): void { if (typeof gc === "function") gc(true); } -function scheduleDatabaseFlush(delayMs = config.databaseFlushIntervalMs): void { +function scheduleDatabaseFlush(delayMs = config.databaseFlushIntervalMs, options: { retry?: boolean } = {}): void { if (serviceRoleReadOnly(config.serviceRole)) return; if (!databaseReady || (dirtyDatabaseTaskIds.size === 0 && dirtyDatabaseQueueIds.size === 0) || shutdownRequested) return; if (databaseFlushTimer !== null) return; + if (options.retry === true) databaseNextFlushRetryAt = new Date(Date.now() + Math.max(0, delayMs)).toISOString(); databaseFlushTimer = setTimeout(() => { databaseFlushTimer = null; void flushDirtyTasksToDatabase().catch((error) => { - databaseLastError = databaseErrorMessage(error); + setDatabaseLastError(error); logger("error", "database_flush_failed", { error: errorToJson(error), dirtyTaskCount: dirtyDatabaseTaskIds.size }); }); }, delayMs); @@ -2105,7 +2125,7 @@ function scheduleStartupDatabaseMaintenance(): void { durationMs: 
Math.round(performance.now() - started), }); })().catch((error) => { - databaseLastError = databaseErrorMessage(error); + setDatabaseLastError(error); logger("warn", "database_startup_maintenance_failed", { error: errorToJson(error) }); }); }, 1000).unref?.(); @@ -2129,7 +2149,7 @@ async function findTaskForRead(taskId: string): Promise { if (databaseTask === null) return hotTask; return shouldPreferHotTaskOverDatabase(hotTask, databaseTask) ? hotTask : databaseTask; } catch (error) { - databaseLastError = databaseErrorMessage(error); + setDatabaseLastError(error); logger("warn", "read_database_fallback", { taskId, error: errorToJson(error) }); return hotTask; } @@ -2144,7 +2164,7 @@ async function findTaskForMutation(taskId: string): Promise { if (hotTask === null) return rememberHotTask(databaseTask); return shouldPreferHotTaskOverDatabase(hotTask, databaseTask) ? hotTask : reconcileHotTaskFromDatabase(databaseTask); } catch (error) { - databaseLastError = databaseErrorMessage(error); + setDatabaseLastError(error); logger("warn", "mutation_database_fallback", { taskId, error: errorToJson(error) }); return hotTask; } @@ -2170,11 +2190,10 @@ async function flushDirtyTasksToDatabase(force = false): Promise { scheduleDatabaseFlush(); return; } - const ids = Array.from(dirtyDatabaseTaskIds).sort(); - const queueIds = Array.from(dirtyDatabaseQueueIds).sort(); + const batch = takeDirtyFlushBatch(dirtyDatabaseTaskIds, dirtyDatabaseQueueIds, nowIso()); + const ids = batch.taskIds; + const queueIds = batch.queueIds; if (ids.length === 0 && queueIds.length === 0) return; - dirtyDatabaseTaskIds.clear(); - dirtyDatabaseQueueIds.clear(); databaseFlushInFlight = true; let rejectedTaskIds: string[] = []; try { @@ -2190,10 +2209,32 @@ async function flushDirtyTasksToDatabase(force = false): Promise { } return currentRejectedTaskIds; })); - databaseLastError = null; + clearDatabaseLastError(); + databaseFlushFailureCount = 0; + databaseLastFlushFailureAt = null; + 
databaseNextFlushRetryAt = null; } catch (error) { - for (const id of ids) dirtyDatabaseTaskIds.add(id); - for (const id of queueIds) dirtyDatabaseQueueIds.add(id); + restoreDirtyFlushBatch(batch, dirtyDatabaseTaskIds, dirtyDatabaseQueueIds); + setDatabaseLastError(error); + databaseFlushFailureCount += 1; + databaseLastFlushFailureAt = nowIso(); + const classification = classifyTransientDatabaseError(error); + if (classification.transient) { + await rotateSqlClientAfterTransientError("flush-dirty-tasks-final", error); + const retryDelayMs = Math.min(30_000, Math.max(config.databaseFlushIntervalMs, transientDatabaseRetryDelayMs(databaseFlushFailureCount + 1) * Math.max(1, databaseFlushFailureCount))); + logger("error", "database_flush_degraded_retry_scheduled", { + operation: "flush-dirty-tasks", + force, + failureCount: databaseFlushFailureCount, + retryDelayMs, + dirtyTaskCount: dirtyDatabaseTaskIds.size, + dirtyQueueCount: dirtyDatabaseQueueIds.size, + classification: classification as unknown as JsonValue, + error: errorToJson(error), + }); + scheduleDatabaseFlushRetry(retryDelayMs); + return; + } throw error; } finally { databaseFlushInFlight = false; @@ -2368,7 +2409,7 @@ async function initDatabasePersistenceWithRetry(): Promise { await initDatabasePersistence(); return; } catch (error) { - databaseLastError = databaseErrorMessage(error); + setDatabaseLastError(error); const elapsedMs = Date.now() - started; logger("warn", "database_persistence_init_retry", { attempt, elapsedMs, error: errorToJson(error) }); if (elapsedMs > 90_000 && !reportedLongWait) { @@ -2923,7 +2964,23 @@ configureQueueApi({ databaseLastError: () => databaseLastError, databaseReady: () => databaseReady, defaultQueueId, + dirtyDatabaseQueueCount: () => dirtyDatabaseQueueIds.size, dirtyDatabaseTaskCount: () => dirtyDatabaseTaskIds.size, + databaseStorageHealth: () => databaseStorageHealth({ + postgresReady: databaseReady, + dirtyTaskCount: dirtyDatabaseTaskIds.size, + dirtyQueueCount: 
dirtyDatabaseQueueIds.size, + lastError: databaseLastErrorDetail ?? databaseLastError, + flushInFlight: databaseFlushInFlight, + clientRotationCount: databaseClientRotationCount, + lastClientRotationAt: databaseLastClientRotationAt, + lastClientRotationReason: databaseLastClientRotationReason, + transientUncaughtSuppressedCount: databaseTransientUncaughtSuppressedCount, + lastTransientUncaughtSuppressedAt: databaseLastTransientUncaughtSuppressedAt, + consecutiveFlushFailures: databaseFlushFailureCount, + lastFlushFailureAt: databaseLastFlushFailureAt, + nextFlushRetryAt: databaseNextFlushRetryAt, + }), jsonResponse, judgeFailRetryLimit, loadQueuesFromDatabase, @@ -3489,9 +3546,24 @@ function latestAttemptExternalProvider429(task: QueueTask): boolean { return latestAttemptRunnerErrorClassification(task)?.externalProvider429 === true; } +function latestAttemptStaleBadResume(task: QueueTask): boolean { + return latestAttemptRunnerErrorClassification(task)?.staleBadResume === true; +} + +function clearStaleBadResumeThread(task: QueueTask): boolean { + if (!latestAttemptStaleBadResume(task) || task.codexThreadId === null) return false; + const previousThreadId = task.codexThreadId; + task.codexThreadId = null; + task.activeTurnId = null; + appendOutput(task, "system", `stale bad-resume thread ${previousThreadId} superseded; next retry will start a fresh Codex thread because app-server reported no rollout found for thread id\n`, "thread/resume-superseded"); + logger("warn", "stale_bad_resume_thread_superseded", { taskId: task.id, previousThreadId }); + return true; +} + function retryBackoffEvidence(task: QueueTask, completedAttempts: number, reason: string): JsonValue { const runnerClassification = latestAttemptRunnerErrorClassification(task); const externalProvider429 = runnerClassification?.externalProvider429 === true; + const staleBadResume = runnerClassification?.staleBadResume === true; const baseDelayMs = retryBackoffMs(completedAttempts); const jittered = 
externalProvider429 ? retryBackoffWithJitterMs(task, completedAttempts, 30_000) @@ -3505,9 +3577,12 @@ function retryBackoffEvidence(task: QueueTask, completedAttempts: number, reason delayMs: jittered.delayMs, maxDelayMs: retryBackoffMaxMs, externalProvider429, + staleBadResume, failureKind: runnerClassification?.failureKind ?? null, runnerDisposition: runnerClassification?.disposition ?? null, - commanderBenefit: "fewer failed runners and clearer retry/backoff evidence", + commanderBenefit: staleBadResume + ? "stale resume thread is superseded once and retried on a fresh Codex thread" + : "fewer failed runners and clearer retry/backoff evidence", }; } @@ -4072,6 +4147,7 @@ async function runTask(task: QueueTask): Promise { const nextPrompt = judgeFailContinuationPrompt(task, judge, task.judgeFailCount); task.nextPrompt = nextPrompt; task.nextMode = "retry"; + clearStaleBadResumeThread(task); setAttemptFeedbackPrompt(latestAttempt, nextPrompt, "judge-fail-retry", task.attempts.length + 1); task.updatedAt = nowIso(); appendOutput(task, "system", `judge=fail treated as retry (${task.judgeFailCount}/${judgeFailRetryLimit}); appending continuation prompt to existing session\n`, "queue"); @@ -4135,6 +4211,7 @@ async function runTask(task: QueueTask): Promise { const nextPrompt = retryPrompt(task, judge); task.nextPrompt = nextPrompt; task.nextMode = "retry"; + clearStaleBadResumeThread(task); setAttemptFeedbackPrompt(latestAttempt, nextPrompt, judge.continuePrompt?.trim() ? 
"judge-continue-prompt" : "judge-retry-generated", task.attempts.length + 1); task.updatedAt = nowIso(); persistTaskState(task); @@ -4516,7 +4593,7 @@ function startSchedulerDatabasePoller(): void { const interval = setInterval(() => { if (!serviceReady || shutdownRequested) return; void refreshSchedulerTasksFromDatabase("poll").catch((error) => { - databaseLastError = databaseErrorMessage(error); + setDatabaseLastError(error); logger("warn", "scheduler_database_poll_failed", { error: errorToJson(error) }); }); }, config.schedulerPollIntervalMs); @@ -4572,8 +4649,17 @@ function installShutdownHandlers(): void { process.once("SIGINT", stop); process.on("uncaughtException", (error) => { if (isTransientDatabaseError(error)) { + databaseTransientUncaughtSuppressedCount += 1; + databaseLastTransientUncaughtSuppressedAt = nowIso(); void rotateSqlClientAfterTransientError("uncaught-exception", error).catch((closeError) => logger("warn", "database_client_rotate_uncaught_failed", { error: errorToJson(closeError) })); - logger("error", "transient_database_uncaught_exception_suppressed", { error: errorToJson(error) }); + if (dirtyDatabaseTaskIds.size > 0 || dirtyDatabaseQueueIds.size > 0) scheduleDatabaseFlushRetry(config.databaseFlushIntervalMs); + logger("error", "transient_database_uncaught_exception_suppressed", { + suppressedCount: databaseTransientUncaughtSuppressedCount, + dirtyTaskCount: dirtyDatabaseTaskIds.size, + dirtyQueueCount: dirtyDatabaseQueueIds.size, + classification: classifyTransientDatabaseError(error) as unknown as JsonValue, + error: errorToJson(error), + }); return; } logger("error", "uncaught_exception", { error: errorToJson(error) }); @@ -4581,8 +4667,17 @@ function installShutdownHandlers(): void { }); process.on("unhandledRejection", (reason) => { if (isTransientDatabaseError(reason)) { + databaseTransientUncaughtSuppressedCount += 1; + databaseLastTransientUncaughtSuppressedAt = nowIso(); void 
rotateSqlClientAfterTransientError("unhandled-rejection", reason).catch((error) => logger("warn", "database_client_rotate_rejection_failed", { error: errorToJson(error) })); - logger("error", "transient_database_unhandled_rejection_suppressed", { error: errorToJson(reason) }); + if (dirtyDatabaseTaskIds.size > 0 || dirtyDatabaseQueueIds.size > 0) scheduleDatabaseFlushRetry(config.databaseFlushIntervalMs); + logger("error", "transient_database_unhandled_rejection_suppressed", { + suppressedCount: databaseTransientUncaughtSuppressedCount, + dirtyTaskCount: dirtyDatabaseTaskIds.size, + dirtyQueueCount: dirtyDatabaseQueueIds.size, + classification: classifyTransientDatabaseError(reason) as unknown as JsonValue, + error: errorToJson(reason), + }); return; } logger("error", "unhandled_rejection", { error: errorToJson(reason) }); @@ -5965,7 +6060,7 @@ async function startDatabaseBackedRuntime(): Promise { persistState(); serviceReady = true; void refreshSchedulerTasksFromDatabase("startup").catch((error) => { - databaseLastError = databaseErrorMessage(error); + setDatabaseLastError(error); logger("warn", "scheduler_database_startup_refresh_failed", { error: errorToJson(error) }); }); startSchedulerDatabasePoller(); @@ -5979,6 +6074,6 @@ async function startDatabaseBackedRuntime(): Promise { void initDatabasePersistenceWithRetry() .then(() => startDatabaseBackedRuntime()) .catch((error) => { - databaseLastError = databaseErrorMessage(error); + setDatabaseLastError(error); logger("error", "database_persistence_init_failed", { error: errorToJson(error) }); }); diff --git a/src/components/microservices/code-queue/src/queue-api.ts b/src/components/microservices/code-queue/src/queue-api.ts index 04dc6086..69af9486 100644 --- a/src/components/microservices/code-queue/src/queue-api.ts +++ b/src/components/microservices/code-queue/src/queue-api.ts @@ -24,7 +24,9 @@ export interface QueueApiContext { databaseLastError: () => string | null; databaseReady: () => boolean; 
defaultQueueId: string; + dirtyDatabaseQueueCount: () => number; dirtyDatabaseTaskCount: () => number; + databaseStorageHealth: () => JsonValue; jsonResponse: (body: unknown, status?: number) => Response; judgeFailRetryLimit: number; loadQueuesFromDatabase: () => Promise; @@ -526,7 +528,9 @@ function queueSummary(includeDevReady = true, tasks: QueueTask[] = ctx().tasks() postgresConfigured: true, postgresReady: ctx().databaseReady(), dirtyTaskCount: ctx().dirtyDatabaseTaskCount(), + dirtyQueueCount: ctx().dirtyDatabaseQueueCount(), lastError: ctx().databaseLastError(), + health: ctx().databaseStorageHealth(), outputArchiveDir: ctx().config.outputArchiveDir, inMemoryOutputRecords: ctx().config.maxInMemoryOutputRecords, inMemoryEventRecords: ctx().config.maxInMemoryEventRecords, diff --git a/src/components/microservices/code-queue/src/runner-error-classifier.ts b/src/components/microservices/code-queue/src/runner-error-classifier.ts index 0b69b2df..1fd451c8 100644 --- a/src/components/microservices/code-queue/src/runner-error-classifier.ts +++ b/src/components/microservices/code-queue/src/runner-error-classifier.ts @@ -24,6 +24,7 @@ export interface RunnerErrorClassification { globalBlocker: boolean; singlePathObservation: boolean; externalProvider429: boolean; + staleBadResume: boolean; reason: string; evidence: string[]; recommendedTriageCommand: string; @@ -117,6 +118,12 @@ export function classifyRunnerError(message: string, providerId?: string | null) /exceeded retry limit/gu, /tokens per min|requests per min/gu, ]); + const staleBadResumeEvidence = collectEvidence(normalized, [ + /no rollout found for thread id [a-z0-9._:-]+/gu, + /no rollout found for thread id/gu, + /rollout .*not found .*thread/gu, + /missing .*rollout .*thread/gu, + ]); const runnerLocalEvidence = collectEvidence(normalized, [ /spawn .*enoent/gu, /command not found/gu, @@ -133,8 +140,16 @@ export function classifyRunnerError(message: string, providerId?: string | null) let reason = "The 
runner failure needs provider triage before it can be treated as a global blocker."; let evidence: string[] = []; let externalProvider429 = false; + let staleBadResume = false; - if (externalProvider429Evidence.length > 0) { + if (staleBadResumeEvidence.length > 0) { + scope = "scheduler"; + disposition = "service-degraded"; + failureKind = "stale-bad-resume-thread-rollout-missing"; + staleBadResume = true; + reason = "Codex app-server rejected resume because the saved thread id has no rollout; the scheduler should stop reusing that stale resume thread and reopen bounded recovery on a fresh thread."; + evidence = staleBadResumeEvidence; + } else if (externalProvider429Evidence.length > 0) { scope = "external-provider"; disposition = "retryable-transient"; failureKind = "external-provider-rate-limit"; @@ -187,6 +202,7 @@ export function classifyRunnerError(message: string, providerId?: string | null) globalBlocker: false, singlePathObservation: true, externalProvider429, + staleBadResume, reason, evidence, recommendedTriageCommand: providerTriageCommand(providerId, raw, scope),