fix: recover orphaned code queue active tasks
This commit is contained in:
@@ -171,6 +171,7 @@ const maxTaskAttempts = 99;
|
||||
const referenceInjectionMaxRounds: number | null = null;
|
||||
const retryBackoffBaseMs = 1000;
|
||||
const retryBackoffMaxMs = 10 * 60 * 1000;
|
||||
const orphanedActiveTaskRecoveryStaleMs = 5 * 60 * 1000;
|
||||
const queueIdPattern = /^[A-Za-z0-9][A-Za-z0-9_.-]{0,63}$/u;
|
||||
const queueNameMaxLength = 80;
|
||||
const workdirMaxLength = 512;
|
||||
@@ -2792,6 +2793,7 @@ configureSelfTests({
|
||||
normalizeTask,
|
||||
nowIso,
|
||||
processingQueues,
|
||||
queueActiveTasksForRestartRetry,
|
||||
queueHeadTask,
|
||||
queuedStatusReason,
|
||||
removeActiveRunSlotWaiter,
|
||||
@@ -3335,6 +3337,24 @@ function taskHasLocalExecutionRecoveryClaim(task: QueueTask): boolean {
|
||||
return activeRunSlotReservations.has(queueIdOf(task));
|
||||
}
|
||||
|
||||
function activeTaskHasLocalRunSlotOrWaiter(task: QueueTask): boolean {
|
||||
const queueId = queueIdOf(task);
|
||||
return activeRunForTask(task) !== null
|
||||
|| activeRunSlotReservations.has(queueId)
|
||||
|| activeRunSlotWaiters.some((waiter) => waiter.taskId === task.id || waiter.queueId === queueId);
|
||||
}
|
||||
|
||||
function orphanedActiveTaskAgeMs(task: QueueTask): number {
|
||||
const updatedAt = timestampMs(task.updatedAt) ?? timestampMs(task.startedAt) ?? timestampMs(task.createdAt) ?? Date.now();
|
||||
return Math.max(0, Date.now() - updatedAt);
|
||||
}
|
||||
|
||||
function taskIsRecoverableOrphanedActive(task: QueueTask, staleMs = orphanedActiveTaskRecoveryStaleMs): boolean {
|
||||
if (task.status !== "running" && task.status !== "judging") return false;
|
||||
if (activeTaskHasLocalRunSlotOrWaiter(task)) return false;
|
||||
return orphanedActiveTaskAgeMs(task) >= staleMs;
|
||||
}
|
||||
|
||||
function clearInactiveActiveTurnIds(): number {
|
||||
let changed = 0;
|
||||
for (const task of state.tasks) {
|
||||
@@ -3347,12 +3367,13 @@ function clearInactiveActiveTurnIds(): number {
|
||||
return changed;
|
||||
}
|
||||
|
||||
function queueActiveTasksForRestartRetry(reason: string, method: string, options: { onlyActiveRuns?: boolean } = {}): number {
|
||||
function queueActiveTasksForRestartRetry(reason: string, method: string, options: { onlyActiveRuns?: boolean; taskIds?: Set<string> } = {}): number {
|
||||
if (!config.schedulerEnabled || !serviceRoleAllowsScheduler(config.serviceRole)) return 0;
|
||||
let recovered = 0;
|
||||
const recoveredTaskIds: string[] = [];
|
||||
for (const task of state.tasks) {
|
||||
if (task.status !== "running" && task.status !== "judging") continue;
|
||||
if (options.taskIds !== undefined && !options.taskIds.has(task.id)) continue;
|
||||
if (options.onlyActiveRuns === true && !taskHasLocalExecutionRecoveryClaim(task)) continue;
|
||||
task.status = "retry_wait";
|
||||
task.finishedAt = null;
|
||||
@@ -3382,6 +3403,27 @@ function queueActiveTasksForRestartRetry(reason: string, method: string, options
|
||||
return recovered;
|
||||
}
|
||||
|
||||
async function recoverOrphanedActiveTasks(reason: string, method: string): Promise<number> {
|
||||
const staleOrphanTaskIds = new Set(
|
||||
state.tasks
|
||||
.filter((task) => taskIsRecoverableOrphanedActive(task))
|
||||
.map((task) => task.id),
|
||||
);
|
||||
if (staleOrphanTaskIds.size === 0) return 0;
|
||||
const recovered = queueActiveTasksForRestartRetry(reason, method, { taskIds: staleOrphanTaskIds });
|
||||
if (recovered === 0) return 0;
|
||||
logger("warn", "scheduler_orphaned_active_tasks_requeued", {
|
||||
reason,
|
||||
recovered,
|
||||
taskIds: Array.from(staleOrphanTaskIds).sort(),
|
||||
dirtyTaskCount: dirtyDatabaseTaskIds.size,
|
||||
});
|
||||
await flushDirtyTasksToDatabase(true);
|
||||
logger("info", "scheduler_orphaned_active_tasks_flushed", { reason, recovered });
|
||||
scheduleQueue();
|
||||
return recovered;
|
||||
}
|
||||
|
||||
function failTaskForFallbackRetryLimit(task: QueueTask, judge: JudgeResult | null): void {
|
||||
const count = fallbackJudgeRetryCount(task);
|
||||
const reason = `Fallback/non-LLM judge retry limit reached (${count}/${fallbackJudgeRetryLimit}). ${judge?.reason ?? task.lastJudge?.reason ?? "MiniMax judge was unavailable."}`;
|
||||
@@ -3900,11 +3942,12 @@ async function refreshSchedulerTasksFromDatabase(reason: string): Promise<number
|
||||
if (!databaseReady || !config.schedulerEnabled) return 0;
|
||||
const tasks = await loadTasksFromDatabase("hot");
|
||||
const changed = mergeSchedulerDatabaseTasks(tasks);
|
||||
const recovered = await recoverOrphanedActiveTasks("Scheduler recovered database active task without local run", `scheduler/${reason}-recovery`);
|
||||
if (changed > 0) {
|
||||
logger("info", "scheduler_database_hot_tasks_refreshed", { reason, changed, loaded: tasks.length });
|
||||
scheduleQueue();
|
||||
}
|
||||
return changed;
|
||||
return changed + recovered;
|
||||
}
|
||||
|
||||
function startSchedulerDatabasePoller(): void {
|
||||
|
||||
@@ -22,6 +22,7 @@ export interface SelfTestsContext {
|
||||
normalizeTask: (task: QueueTask) => QueueTask;
|
||||
nowIso: () => string;
|
||||
processingQueues: Set<string>;
|
||||
queueActiveTasksForRestartRetry: (reason: string, method: string, options?: { onlyActiveRuns?: boolean; taskIds?: Set<string> }) => number;
|
||||
queueHeadTask: (queueId: string, tasks?: QueueTask[]) => QueueTask | null;
|
||||
queuedStatusReason: (task: QueueTask, tasks?: QueueTask[]) => QueuedStatusReason | null;
|
||||
removeActiveRunSlotWaiter: (waiter: ActiveRunSlotWaiter) => void;
|
||||
@@ -251,7 +252,13 @@ function runQueueOrderingSelfTest(): JsonValue {
|
||||
mergeTargetEarly.queueId = "queue_merge_target";
|
||||
mergeSourceMiddle.queueId = "queue_merge_target";
|
||||
mergeTargetLate.queueId = "queue_merge_target";
|
||||
const orphanRunning = queueOrderTestTask("codex_4400_orphan_running", "running", "2026-05-11T13:00:00.000Z", "2026-05-11T13:00:00.000Z");
|
||||
orphanRunning.queueId = "queue_orphan_recovery";
|
||||
orphanRunning.activeTurnId = null;
|
||||
const queuedBehindOrphan = queueOrderTestTask("codex_4401_queued", "queued", "2026-05-11T13:01:00.000Z", "2026-05-11T13:01:00.000Z");
|
||||
queuedBehindOrphan.queueId = "queue_orphan_recovery";
|
||||
const originalMaxActiveQueues = ctx().config.maxActiveQueues;
|
||||
const beforeTasks = ctx().tasks().slice();
|
||||
|
||||
assertReferenceTest(ctx().queueHeadTask("queue_order_test", blockedByRetry)?.id === activeRetry.id, "retry_wait head must keep blocking a moved older-created task");
|
||||
assertReferenceTest(ctx().nextRunnableTaskFrom("queue_order_test", blockedByRetry)?.id === activeRetry.id, "next runnable should be the retry_wait head");
|
||||
@@ -289,6 +296,17 @@ function runQueueOrderingSelfTest(): JsonValue {
|
||||
ctx().removeActiveRunSlotWaiter(secondWaiter);
|
||||
}
|
||||
}
|
||||
try {
|
||||
ctx().tasks().push(orphanRunning, queuedBehindOrphan);
|
||||
const recovered = ctx().queueActiveTasksForRestartRetry("orphaned active task self-test", "self-test", { taskIds: new Set([orphanRunning.id]) });
|
||||
assertReferenceTest(recovered >= 1, "orphaned running task should be recovered for retry");
|
||||
assertReferenceTest(orphanRunning.status === "retry_wait", "orphaned running task should become retry_wait");
|
||||
assertReferenceTest(orphanRunning.activeTurnId === null, "orphaned running task should clear activeTurnId");
|
||||
assertReferenceTest(orphanRunning.lastError === "orphaned active task self-test", "orphaned running task should record recovery reason");
|
||||
assertReferenceTest(ctx().nextRunnableTaskFrom("queue_orphan_recovery")?.id === orphanRunning.id, "recovered orphan should be the next runnable queue head");
|
||||
} finally {
|
||||
ctx().tasks().splice(0, ctx().tasks().length, ...beforeTasks);
|
||||
}
|
||||
|
||||
return {
|
||||
ok: true,
|
||||
@@ -301,6 +319,7 @@ function runQueueOrderingSelfTest(): JsonValue {
|
||||
{ name: "max_active_queues_zero_is_unbounded", ok: true },
|
||||
{ name: "idle_processing_queue_does_not_consume_active_run_slot", ok: true },
|
||||
{ name: "active_run_slot_waiters_are_fifo", ok: true },
|
||||
{ name: "orphaned_running_task_recovers_to_retry_wait", ok: true },
|
||||
],
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user