diff --git a/src/components/microservices/code-queue/src/index.ts b/src/components/microservices/code-queue/src/index.ts index 8a14f7bd..b1df3600 100644 --- a/src/components/microservices/code-queue/src/index.ts +++ b/src/components/microservices/code-queue/src/index.ts @@ -1549,15 +1549,11 @@ async function deleteTaskFromDatabase(taskId: string): Promise { async function claimTaskInDatabase(task: QueueTask, expectedQueueId: string): Promise { if (!databaseReady) return true; - const claimed = await withTimeout( - sql.begin(async (client) => { - const timeout = `${config.databaseClaimTimeoutMs}ms`; - await client`SELECT set_config('statement_timeout', ${timeout}, true), set_config('lock_timeout', ${timeout}, true)`; - return await upsertTaskToDatabase(client, task, { claimQueueId: expectedQueueId }); - }), - config.databaseClaimTimeoutMs, - `database claim timed out after ${config.databaseClaimTimeoutMs}ms`, - ); + const timeout = `${config.databaseClaimTimeoutMs}ms`; + const claimed = await sql.begin(async (client) => { + await client`SELECT set_config('statement_timeout', ${timeout}, true), set_config('lock_timeout', ${timeout}, true)`; + return await upsertTaskToDatabase(client, task, { claimQueueId: expectedQueueId }); + }); if (claimed) return true; const databaseTask = await loadTaskFromDatabase(task.id); if (databaseTask !== null) reconcileHotTaskFromDatabase(databaseTask); @@ -2799,6 +2795,7 @@ configureSelfTests({ removeActiveRunSlotWaiter, resolveReasoningEffort, runDatabaseClaimMoveSelfTest, + taskIsRecoverableOrphanedActive, tasks: () => state.tasks, updateProcessingFlag, }); @@ -3338,10 +3335,8 @@ function taskHasLocalExecutionRecoveryClaim(task: QueueTask): boolean { } function activeTaskHasLocalRunSlotOrWaiter(task: QueueTask): boolean { - const queueId = queueIdOf(task); return activeRunForTask(task) !== null - || activeRunSlotReservations.has(queueId) - || activeRunSlotWaiters.some((waiter) => waiter.taskId === task.id || waiter.queueId === queueId); + || activeRunSlotWaiters.some((waiter) => waiter.taskId === task.id); } function orphanedActiveTaskAgeMs(task: QueueTask): number { @@ -3497,6 +3492,7 @@ async function runTask(task: QueueTask): Promise { } if (!claimed) { releaseRunSlot(); + await recoverOrphanedActiveTasks("Database claim found stale active task without local run", "database/claim-recovery"); return; } publishTaskOaEvent(task, "claim"); diff --git a/src/components/microservices/code-queue/src/self-tests.ts b/src/components/microservices/code-queue/src/self-tests.ts index 412c3017..c24b9339 100644 --- a/src/components/microservices/code-queue/src/self-tests.ts +++ b/src/components/microservices/code-queue/src/self-tests.ts @@ -28,6 +28,7 @@ export interface SelfTestsContext { removeActiveRunSlotWaiter: (waiter: ActiveRunSlotWaiter) => void; resolveReasoningEffort: (model: string, explicit?: string | null) => string | null; runDatabaseClaimMoveSelfTest?: () => Promise; + taskIsRecoverableOrphanedActive: (task: QueueTask, staleMs?: number) => boolean; tasks: () => QueueTask[]; updateProcessingFlag: () => void; } @@ -296,16 +297,23 @@ function runQueueOrderingSelfTest(): JsonValue { ctx().removeActiveRunSlotWaiter(secondWaiter); } } - try { - ctx().tasks().push(orphanRunning, queuedBehindOrphan); - const recovered = ctx().queueActiveTasksForRestartRetry("orphaned active task self-test", "self-test", { taskIds: new Set([orphanRunning.id]) }); - assertReferenceTest(recovered >= 1, "orphaned running task should be recovered for retry"); - assertReferenceTest(orphanRunning.status === "retry_wait", "orphaned running task should become retry_wait"); - assertReferenceTest(orphanRunning.activeTurnId === null, "orphaned running task should clear activeTurnId"); - assertReferenceTest(orphanRunning.lastError === "orphaned active task self-test", "orphaned running task should record recovery reason"); - assertReferenceTest(ctx().nextRunnableTaskFrom("queue_orphan_recovery")?.id === orphanRunning.id, "recovered orphan should be the next runnable queue head"); - } finally { - ctx().tasks().splice(0, ctx().tasks().length, ...beforeTasks); + { + const reservationMarker = "queue_orphan_recovery"; + const hadSameQueueReservation = ctx().activeRunSlotReservations.has(reservationMarker); + try { + ctx().tasks().push(orphanRunning, queuedBehindOrphan); + ctx().activeRunSlotReservations.add(reservationMarker); + assertReferenceTest(ctx().taskIsRecoverableOrphanedActive(orphanRunning, 0), "same-queue run slot reservation must not hide an orphaned task"); + const recovered = ctx().queueActiveTasksForRestartRetry("orphaned active task self-test", "self-test", { taskIds: new Set([orphanRunning.id]) }); + assertReferenceTest(recovered >= 1, "orphaned running task should be recovered for retry"); + assertReferenceTest(orphanRunning.status === "retry_wait", "orphaned running task should become retry_wait"); + assertReferenceTest(orphanRunning.activeTurnId === null, "orphaned running task should clear activeTurnId"); + assertReferenceTest(orphanRunning.lastError === "orphaned active task self-test", "orphaned running task should record recovery reason"); + assertReferenceTest(ctx().nextRunnableTaskFrom("queue_orphan_recovery")?.id === orphanRunning.id, "recovered orphan should be the next runnable queue head"); + } finally { + if (!hadSameQueueReservation) ctx().activeRunSlotReservations.delete(reservationMarker); + ctx().tasks().splice(0, ctx().tasks().length, ...beforeTasks); + } } return {