fix: recover code queue stale active tasks
This commit is contained in:
@@ -1549,15 +1549,11 @@ async function deleteTaskFromDatabase(taskId: string): Promise<void> {
|
||||
|
||||
async function claimTaskInDatabase(task: QueueTask, expectedQueueId: string): Promise<boolean> {
|
||||
if (!databaseReady) return true;
|
||||
const claimed = await withTimeout(
|
||||
sql.begin(async (client) => {
|
||||
const timeout = `${config.databaseClaimTimeoutMs}ms`;
|
||||
await client`SELECT set_config('statement_timeout', ${timeout}, true), set_config('lock_timeout', ${timeout}, true)`;
|
||||
return await upsertTaskToDatabase(client, task, { claimQueueId: expectedQueueId });
|
||||
}),
|
||||
config.databaseClaimTimeoutMs,
|
||||
`database claim timed out after ${config.databaseClaimTimeoutMs}ms`,
|
||||
);
|
||||
const timeout = `${config.databaseClaimTimeoutMs}ms`;
|
||||
const claimed = await sql.begin(async (client) => {
|
||||
await client`SELECT set_config('statement_timeout', ${timeout}, true), set_config('lock_timeout', ${timeout}, true)`;
|
||||
return await upsertTaskToDatabase(client, task, { claimQueueId: expectedQueueId });
|
||||
});
|
||||
if (claimed) return true;
|
||||
const databaseTask = await loadTaskFromDatabase(task.id);
|
||||
if (databaseTask !== null) reconcileHotTaskFromDatabase(databaseTask);
|
||||
@@ -2799,6 +2795,7 @@ configureSelfTests({
|
||||
removeActiveRunSlotWaiter,
|
||||
resolveReasoningEffort,
|
||||
runDatabaseClaimMoveSelfTest,
|
||||
taskIsRecoverableOrphanedActive,
|
||||
tasks: () => state.tasks,
|
||||
updateProcessingFlag,
|
||||
});
|
||||
@@ -3338,10 +3335,8 @@ function taskHasLocalExecutionRecoveryClaim(task: QueueTask): boolean {
|
||||
}
|
||||
|
||||
/**
 * Reports whether this process still holds local execution state for `task`.
 *
 * A task counts as locally owned when either:
 *  - `activeRunForTask(task)` returns a non-null run, or
 *  - some entry in `activeRunSlotWaiters` has a `taskId` equal to `task.id`.
 *
 * Queue-level state (`activeRunSlotReservations`, or a waiter that merely
 * shares the task's queue id) is deliberately NOT consulted: a reservation
 * or waiter belonging to the same queue must not hide an orphaned task from
 * recovery.
 *
 * @param task - The queue task to inspect.
 * @returns `true` if a local run or a waiter for this exact task exists.
 */
function activeTaskHasLocalRunSlotOrWaiter(task: QueueTask): boolean {
  // Only task-scoped evidence counts; matching on queue id alone would let an
  // unrelated reservation in the same queue mask a stale active task.
  return activeRunForTask(task) !== null
    || activeRunSlotWaiters.some((waiter) => waiter.taskId === task.id);
}
|
||||
|
||||
function orphanedActiveTaskAgeMs(task: QueueTask): number {
|
||||
@@ -3497,6 +3492,7 @@ async function runTask(task: QueueTask): Promise<void> {
|
||||
}
|
||||
if (!claimed) {
|
||||
releaseRunSlot();
|
||||
await recoverOrphanedActiveTasks("Database claim found stale active task without local run", "database/claim-recovery");
|
||||
return;
|
||||
}
|
||||
publishTaskOaEvent(task, "claim");
|
||||
|
||||
@@ -28,6 +28,7 @@ export interface SelfTestsContext {
|
||||
removeActiveRunSlotWaiter: (waiter: ActiveRunSlotWaiter) => void;
|
||||
resolveReasoningEffort: (model: string, explicit?: string | null) => string | null;
|
||||
runDatabaseClaimMoveSelfTest?: () => Promise<JsonValue | null>;
|
||||
taskIsRecoverableOrphanedActive: (task: QueueTask, staleMs?: number) => boolean;
|
||||
tasks: () => QueueTask[];
|
||||
updateProcessingFlag: () => void;
|
||||
}
|
||||
@@ -296,16 +297,23 @@ function runQueueOrderingSelfTest(): JsonValue {
|
||||
ctx().removeActiveRunSlotWaiter(secondWaiter);
|
||||
}
|
||||
}
|
||||
try {
|
||||
ctx().tasks().push(orphanRunning, queuedBehindOrphan);
|
||||
const recovered = ctx().queueActiveTasksForRestartRetry("orphaned active task self-test", "self-test", { taskIds: new Set([orphanRunning.id]) });
|
||||
assertReferenceTest(recovered >= 1, "orphaned running task should be recovered for retry");
|
||||
assertReferenceTest(orphanRunning.status === "retry_wait", "orphaned running task should become retry_wait");
|
||||
assertReferenceTest(orphanRunning.activeTurnId === null, "orphaned running task should clear activeTurnId");
|
||||
assertReferenceTest(orphanRunning.lastError === "orphaned active task self-test", "orphaned running task should record recovery reason");
|
||||
assertReferenceTest(ctx().nextRunnableTaskFrom("queue_orphan_recovery")?.id === orphanRunning.id, "recovered orphan should be the next runnable queue head");
|
||||
} finally {
|
||||
ctx().tasks().splice(0, ctx().tasks().length, ...beforeTasks);
|
||||
{
|
||||
const reservationMarker = "queue_orphan_recovery";
|
||||
const hadSameQueueReservation = ctx().activeRunSlotReservations.has(reservationMarker);
|
||||
try {
|
||||
ctx().tasks().push(orphanRunning, queuedBehindOrphan);
|
||||
ctx().activeRunSlotReservations.add(reservationMarker);
|
||||
assertReferenceTest(ctx().taskIsRecoverableOrphanedActive(orphanRunning, 0), "same-queue run slot reservation must not hide an orphaned task");
|
||||
const recovered = ctx().queueActiveTasksForRestartRetry("orphaned active task self-test", "self-test", { taskIds: new Set([orphanRunning.id]) });
|
||||
assertReferenceTest(recovered >= 1, "orphaned running task should be recovered for retry");
|
||||
assertReferenceTest(orphanRunning.status === "retry_wait", "orphaned running task should become retry_wait");
|
||||
assertReferenceTest(orphanRunning.activeTurnId === null, "orphaned running task should clear activeTurnId");
|
||||
assertReferenceTest(orphanRunning.lastError === "orphaned active task self-test", "orphaned running task should record recovery reason");
|
||||
assertReferenceTest(ctx().nextRunnableTaskFrom("queue_orphan_recovery")?.id === orphanRunning.id, "recovered orphan should be the next runnable queue head");
|
||||
} finally {
|
||||
if (!hadSameQueueReservation) ctx().activeRunSlotReservations.delete(reservationMarker);
|
||||
ctx().tasks().splice(0, ctx().tasks().length, ...beforeTasks);
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
|
||||
Reference in New Issue
Block a user