diff --git a/src/components/microservices/code-queue/src/index.ts b/src/components/microservices/code-queue/src/index.ts index f6159ff1..8a14f7bd 100644 --- a/src/components/microservices/code-queue/src/index.ts +++ b/src/components/microservices/code-queue/src/index.ts @@ -171,6 +171,7 @@ const maxTaskAttempts = 99; const referenceInjectionMaxRounds: number | null = null; const retryBackoffBaseMs = 1000; const retryBackoffMaxMs = 10 * 60 * 1000; +/** Minimum age in ms (5 minutes) before an active task with no local run slot or waiter counts as orphaned; used as the default staleMs of taskIsRecoverableOrphanedActive. */ const orphanedActiveTaskRecoveryStaleMs = 5 * 60 * 1000; const queueIdPattern = /^[A-Za-z0-9][A-Za-z0-9_.-]{0,63}$/u; const queueNameMaxLength = 80; const workdirMaxLength = 512; @@ -2792,6 +2793,7 @@ configureSelfTests({ normalizeTask, nowIso, processingQueues, + queueActiveTasksForRestartRetry, queueHeadTask, queuedStatusReason, removeActiveRunSlotWaiter, @@ -3335,6 +3337,24 @@ function taskHasLocalExecutionRecoveryClaim(task: QueueTask): boolean { return activeRunSlotReservations.has(queueIdOf(task)); } +/** Reports whether scheduler state visible here still references the task: activeRunForTask(task) is non-null, activeRunSlotReservations has its queue id, or some activeRunSlotWaiters entry matches the task id or the queue id. */ function activeTaskHasLocalRunSlotOrWaiter(task: QueueTask): boolean { + const queueId = queueIdOf(task); + return activeRunForTask(task) !== null + || activeRunSlotReservations.has(queueId) + || activeRunSlotWaiters.some((waiter) => waiter.taskId === task.id || waiter.queueId === queueId); +} + +/** Milliseconds elapsed since the first usable timestamp among updatedAt, startedAt and createdAt (in that order). When timestampMs yields null or undefined for all three, Date.now() is used, so the reported age is approximately 0. The result is clamped to be non-negative. */ function orphanedActiveTaskAgeMs(task: QueueTask): number { + const updatedAt = timestampMs(task.updatedAt) ?? timestampMs(task.startedAt) ?? timestampMs(task.createdAt) ?? 
Date.now(); + return Math.max(0, Date.now() - updatedAt); +} + +/** True only for a task whose status is running or judging, for which activeTaskHasLocalRunSlotOrWaiter is false, and whose orphanedActiveTaskAgeMs is at least staleMs. */ function taskIsRecoverableOrphanedActive(task: QueueTask, staleMs = orphanedActiveTaskRecoveryStaleMs): boolean { + if (task.status !== "running" && task.status !== "judging") return false; + if (activeTaskHasLocalRunSlotOrWaiter(task)) return false; + return orphanedActiveTaskAgeMs(task) >= staleMs; +} + function clearInactiveActiveTurnIds(): number { let changed = 0; for (const task of state.tasks) { @@ -3347,12 +3367,13 @@ function clearInactiveActiveTurnIds(): number { return changed; } -function queueActiveTasksForRestartRetry(reason: string, method: string, options: { onlyActiveRuns?: boolean } = {}): number { +/** Moves running or judging tasks to retry_wait and returns the recovered counter; returns 0 immediately when config.schedulerEnabled is false or serviceRoleAllowsScheduler rejects config.serviceRole. options.taskIds, when given, limits recovery to those ids; options.onlyActiveRuns limits it to tasks accepted by taskHasLocalExecutionRecoveryClaim. Only part of the body is visible in this hunk. */ function queueActiveTasksForRestartRetry(reason: string, method: string, options: { onlyActiveRuns?: boolean; taskIds?: Set } = {}): number { if (!config.schedulerEnabled || !serviceRoleAllowsScheduler(config.serviceRole)) return 0; let recovered = 0; const recoveredTaskIds: string[] = []; for (const task of state.tasks) { if (task.status !== "running" && task.status !== "judging") continue; + /* Skip tasks outside the optional taskIds allow-list. */ if (options.taskIds !== undefined && !options.taskIds.has(task.id)) continue; if (options.onlyActiveRuns === true && !taskHasLocalExecutionRecoveryClaim(task)) continue; task.status = "retry_wait"; task.finishedAt = null; @@ -3382,6 +3403,27 @@ function queueActiveTasksForRestartRetry(reason: string, method: string, options return recovered; } +/** Requeues stale orphaned active tasks. Collects the ids of tasks accepted by taskIsRecoverableOrphanedActive, passes them to queueActiveTasksForRestartRetry, and when at least one was recovered logs a warning, awaits flushDirtyTasksToDatabase(true), logs the flush and calls scheduleQueue(). Resolves to the recovered count, or 0 when no task was stale or none was recovered. NOTE(review): the warning logs every candidate id, which may exceed recovered if queueActiveTasksForRestartRetry skips some; confirm. */ async function recoverOrphanedActiveTasks(reason: string, method: string): Promise { + const staleOrphanTaskIds = new Set( + state.tasks + .filter((task) => taskIsRecoverableOrphanedActive(task)) + .map((task) => task.id), + ); + if (staleOrphanTaskIds.size === 0) return 0; + const recovered = queueActiveTasksForRestartRetry(reason, method, { taskIds: staleOrphanTaskIds }); + if (recovered === 0) return 0; + logger("warn", "scheduler_orphaned_active_tasks_requeued", { + reason, + recovered, + taskIds: Array.from(staleOrphanTaskIds).sort(), + dirtyTaskCount: 
dirtyDatabaseTaskIds.size, + }); + await flushDirtyTasksToDatabase(true); + logger("info", "scheduler_orphaned_active_tasks_flushed", { reason, recovered }); + scheduleQueue(); + return recovered; +} + function failTaskForFallbackRetryLimit(task: QueueTask, judge: JudgeResult | null): void { const count = fallbackJudgeRetryCount(task); const reason = `Fallback/non-LLM judge retry limit reached (${count}/${fallbackJudgeRetryLimit}). ${judge?.reason ?? task.lastJudge?.reason ?? "MiniMax judge was unavailable."}`; @@ -3900,11 +3942,12 @@ async function refreshSchedulerTasksFromDatabase(reason: string): Promise 0) { logger("info", "scheduler_database_hot_tasks_refreshed", { reason, changed, loaded: tasks.length }); scheduleQueue(); } - return changed; + return changed + recovered; } function startSchedulerDatabasePoller(): void { diff --git a/src/components/microservices/code-queue/src/self-tests.ts b/src/components/microservices/code-queue/src/self-tests.ts index 82d40cc9..412c3017 100644 --- a/src/components/microservices/code-queue/src/self-tests.ts +++ b/src/components/microservices/code-queue/src/self-tests.ts @@ -22,6 +22,7 @@ export interface SelfTestsContext { normalizeTask: (task: QueueTask) => QueueTask; nowIso: () => string; processingQueues: Set; + queueActiveTasksForRestartRetry: (reason: string, method: string, options?: { onlyActiveRuns?: boolean; taskIds?: Set }) => number; queueHeadTask: (queueId: string, tasks?: QueueTask[]) => QueueTask | null; queuedStatusReason: (task: QueueTask, tasks?: QueueTask[]) => QueuedStatusReason | null; removeActiveRunSlotWaiter: (waiter: ActiveRunSlotWaiter) => void; @@ -251,7 +252,13 @@ function runQueueOrderingSelfTest(): JsonValue { mergeTargetEarly.queueId = "queue_merge_target"; mergeSourceMiddle.queueId = "queue_merge_target"; mergeTargetLate.queueId = "queue_merge_target"; + /* Fixtures for the orphan recovery check: a running task with activeTurnId null and a queued task placed in the same queue. */ const orphanRunning = queueOrderTestTask("codex_4400_orphan_running", "running", "2026-05-11T13:00:00.000Z", "2026-05-11T13:00:00.000Z"); + 
orphanRunning.queueId = "queue_orphan_recovery"; + orphanRunning.activeTurnId = null; + const queuedBehindOrphan = queueOrderTestTask("codex_4401_queued", "queued", "2026-05-11T13:01:00.000Z", "2026-05-11T13:01:00.000Z"); + queuedBehindOrphan.queueId = "queue_orphan_recovery"; const originalMaxActiveQueues = ctx().config.maxActiveQueues; + /* Copy of the current task list; the orphan recovery check restores it in its finally block. */ const beforeTasks = ctx().tasks().slice(); assertReferenceTest(ctx().queueHeadTask("queue_order_test", blockedByRetry)?.id === activeRetry.id, "retry_wait head must keep blocking a moved older-created task"); assertReferenceTest(ctx().nextRunnableTaskFrom("queue_order_test", blockedByRetry)?.id === activeRetry.id, "next runnable should be the retry_wait head"); @@ -289,6 +296,17 @@ function runQueueOrderingSelfTest(): JsonValue { ctx().removeActiveRunSlotWaiter(secondWaiter); } } + /* Orphan recovery check: push the fixtures, recover only the orphan by id, and verify it becomes the retry_wait head of its queue. NOTE(review): queueActiveTasksForRestartRetry returns 0 when the scheduler is disabled, which would fail the first assertion; confirm self-tests always run with the scheduler enabled. */ try { + ctx().tasks().push(orphanRunning, queuedBehindOrphan); + const recovered = ctx().queueActiveTasksForRestartRetry("orphaned active task self-test", "self-test", { taskIds: new Set([orphanRunning.id]) }); + assertReferenceTest(recovered >= 1, "orphaned running task should be recovered for retry"); + assertReferenceTest(orphanRunning.status === "retry_wait", "orphaned running task should become retry_wait"); + assertReferenceTest(orphanRunning.activeTurnId === null, "orphaned running task should clear activeTurnId"); + assertReferenceTest(orphanRunning.lastError === "orphaned active task self-test", "orphaned running task should record recovery reason"); + assertReferenceTest(ctx().nextRunnableTaskFrom("queue_orphan_recovery")?.id === orphanRunning.id, "recovered orphan should be the next runnable queue head"); + } finally { + ctx().tasks().splice(0, ctx().tasks().length, ...beforeTasks); + } return { ok: true, @@ -301,6 +319,7 @@ function runQueueOrderingSelfTest(): JsonValue { { name: "max_active_queues_zero_is_unbounded", ok: true }, { name: "idle_processing_queue_does_not_consume_active_run_slot", ok: true }, { name: 
"active_run_slot_waiters_are_fifo", ok: true }, + { name: "orphaned_running_task_recovers_to_retry_wait", ok: true }, ], }; }