fix: guard code queue claim move race

This commit is contained in:
Codex
2026-05-17 06:54:30 +00:00
parent bf364baac8
commit b91fb1a2e1
4 changed files with 488 additions and 45 deletions
+1 -1
View File
@@ -26,7 +26,7 @@ UniDesk 的统一 CLI 入口是根目录 `scripts/cli.ts`,运行方式固定
- `codex task <taskId> --trace --tail|--from-start|--after-seq N|--before-seq N --limit N` 按页拉取 Code Queue 的逻辑 trace;响应会返回 `nextAfterSeq``previousBeforeSeq``hasMore``hasBefore` 和下一页/上一页命令,默认 `--trace` 取最新一页,需要完整 prompt/最后 response 时加 `--full`
- `codex output <taskId> --tail|--from-start|--after-seq N|--before-seq N --limit N [--full-text]` 按原始 output seq 分页读取底层记录;当 trace 行提示 `commandOmittedLines``bodyOmittedLines``rawSeqs` 时,用该命令按 seq 补取完整信息,默认仍有单条文本预览上限,显式 `--full-text` 才返回该页全文。
- `codex judge <taskId> --attempt N [--dry-run] [--include-prompt]` 通过 Code Queue 私有代理按指定 attempt 单步复现 judge;后端会从 PostgreSQL task JSON 与 output 归档重建该 attempt 在真实队列 worker 中的 `QueueTask`/`CodexRunResult`,再调用同一套 judge prompt builder 和 MiniMax 请求路径。默认会真实调用 MiniMax,`--dry-run` 只返回 prompt/payload 大小、attempt 窗口和重建来源诊断,`--include-prompt` 仅用于本地深度排查。
- Code Queue 多队列 lane 由 `codex` 命令命名空间管理:`queues` 列表、`queue create <queueId>` 创建、`queue merge <sourceQueueId> --into <targetQueueId>` 合并、`move <taskId> --queue <queueId>` 迁移;同一个 queue 内部串行执行,不同 queue 之间并行执行。合并会移动任务归属并自动删除源 queue 记录,只保留合并后的目标 queue;合并后的目标 queue 按任务原 `queueEnteredAt`/`createdAt` 时间顺序串行迁移 queued/retry_wait 任务后会立即调度目标 queue。
- Code Queue 多队列 lane 由 `codex` 命令命名空间管理:`queues` 列表、`queue create <queueId>` 创建、`queue merge <sourceQueueId> --into <targetQueueId>` 合并、`move <taskId> --queue <queueId>` 迁移;同一个 queue 内部串行执行,不同 queue 之间并行执行。迁移只允许尚未被 scheduler claim 的 `queued`/`retry_wait` 任务,必须满足 `startedAt=null``currentAttempt=0` 且没有 active thread/turn;已进入 `running`/`judging` 或已有 claim 标记的任务返回 409,不得被 move/merge 回写成 queued。合并会移动可迁移任务归属并自动删除源 queue 记录,只保留合并后的目标 queue;若 source 或 target queue 存在 active/claimed 任务,合并整体返回 409。合并后的目标 queue 按任务原 `queueEnteredAt`/`createdAt` 时间顺序串行,成功迁移 queued/retry_wait 任务后会立即调度目标 queue。
- `job list``job status` 查询 `.state/jobs/` 文件系统状态,是异步命令的可观测入口。
- `debug health``debug dispatch``debug task` 走真实内部 core、WebSocket、数据库、provider、系统指标、Docker 状态和 Host SSH 维护桥流程,只用于开发调试,不写入 `TEST.md` 的正式验收步骤。
- `e2e run [--only pattern[,pattern...]] [--skip pattern[,pattern...]]` 使用 publicHost 派生的公开 frontend/provider ingress URL,并通过 Docker 内网验证 core API、PostgreSQL、provider self-connection、系统指标曲线、Docker 状态快照、provider.upgrade 预检和 Playwright 前端页面,是交付前的自动化 E2E 门禁;CLI 默认输出 check 状态摘要,完整诊断写入 `resultPath`,日常迭代应优先用 `--only` / `--skip` 跑最小必要集合。
@@ -128,7 +128,7 @@ import {
readOaTraceStatsForTaskAttempts,
readOaTraceStatsForTasks,
} from "./oa-events";
import { configureSelfTests, runJudgeInfraSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest } from "./self-tests";
import { configureSelfTests, runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest } from "./self-tests";
import {
codexToolLifecycleStartedBeforeIn,
configureTaskView,
@@ -1185,7 +1185,12 @@ function updateNextSeqFromTasks(): void {
state.nextSeq = nextSeq;
}
async function upsertTaskToDatabase(client: SqlExecutor, task: QueueTask): Promise<void> {
interface UpsertTaskOptions {
claimQueueId?: string | null;
}
async function upsertTaskToDatabase(client: SqlExecutor, task: QueueTask, options: UpsertTaskOptions = {}): Promise<boolean> {
const claimQueueId = options.claimQueueId ?? null;
const rows = await client<Array<{ read_at: Date | string | null }>>`
INSERT INTO unidesk_code_queue_tasks (
id,
@@ -1292,9 +1297,62 @@ async function upsertTaskToDatabase(client: SqlExecutor, task: QueueTask): Promi
END,
true
)
WHERE (
${claimQueueId === null}
OR (
unidesk_code_queue_tasks.queue_id = ${claimQueueId}
AND (
(
unidesk_code_queue_tasks.status = 'queued'
AND unidesk_code_queue_tasks.started_at IS NULL
AND unidesk_code_queue_tasks.current_attempt = 0
AND unidesk_code_queue_tasks.codex_thread_id IS NULL
AND unidesk_code_queue_tasks.active_turn_id IS NULL
)
OR (
unidesk_code_queue_tasks.status = 'retry_wait'
AND unidesk_code_queue_tasks.active_turn_id IS NULL
)
)
)
)
AND NOT (
EXCLUDED.status IN ('queued', 'retry_wait')
AND EXCLUDED.started_at IS NULL
AND EXCLUDED.current_attempt = 0
AND EXCLUDED.codex_thread_id IS NULL
AND EXCLUDED.active_turn_id IS NULL
AND (
unidesk_code_queue_tasks.status IN ('running', 'judging')
OR unidesk_code_queue_tasks.started_at IS NOT NULL
OR unidesk_code_queue_tasks.current_attempt > 0
OR unidesk_code_queue_tasks.codex_thread_id IS NOT NULL
OR unidesk_code_queue_tasks.active_turn_id IS NOT NULL
)
)
RETURNING read_at
`;
if (rows.length === 0) {
const current = await client<DatabaseTaskStatusRow[]>`
SELECT id, queue_id, status, started_at, current_attempt, codex_thread_id, active_turn_id
FROM unidesk_code_queue_tasks
WHERE id = ${task.id}
LIMIT 1
`;
logger("warn", "database_task_stale_unclaimed_write_rejected", {
taskId: task.id,
attemptedQueueId: queueIdOf(task),
attemptedStatus: task.status,
attemptedStartedAt: task.startedAt,
attemptedCurrentAttempt: task.currentAttempt,
attemptedCodexThreadId: task.codexThreadId,
attemptedActiveTurnId: task.activeTurnId,
current: databaseStatusRowJson(current[0] ?? null),
});
return false;
}
task.readAt = timestampToIso(rows[0]?.read_at ?? null);
return true;
}
async function upsertQueueToDatabase(client: SqlExecutor, queue: QueueRecord): Promise<void> {
@@ -1347,6 +1405,16 @@ interface DatabaseTaskRow {
task_json: unknown;
}
interface DatabaseTaskStatusRow {
id: string;
queue_id: string;
status: TaskStatus;
started_at: Date | string | null;
current_attempt: number | string | null;
codex_thread_id: string | null;
active_turn_id: string | null;
}
interface DatabaseQueueRow {
id: string;
name: string;
@@ -1376,6 +1444,161 @@ function normalizeDatabaseTaskRows(rows: DatabaseTaskRow[], source: string): Que
return tasks.sort((left, right) => (timestampMs(left.createdAt) ?? 0) - (timestampMs(right.createdAt) ?? 0) || left.id.localeCompare(right.id));
}
function databaseStatusRowJson(row: DatabaseTaskStatusRow | null): JsonValue {
if (row === null) return null;
return {
id: row.id,
queueId: safeQueueId(row.queue_id),
status: row.status,
startedAt: timestampToIso(row.started_at),
currentAttempt: Number(row.current_attempt ?? 0),
codexThreadId: row.codex_thread_id,
activeTurnId: row.active_turn_id,
};
}
function taskIsUnclaimedMovable(task: QueueTask): boolean {
return (task.status === "queued" || task.status === "retry_wait")
&& task.startedAt === null
&& task.currentAttempt === 0
&& task.codexThreadId === null
&& task.activeTurnId === null;
}
function databaseTaskMoveBlocker(row: DatabaseTaskStatusRow | null): string {
if (row === null) return "task not found";
if (row.status !== "queued" && row.status !== "retry_wait") return `status=${row.status}`;
if (row.started_at !== null) return "task already has started_at";
if (Number(row.current_attempt ?? 0) !== 0) return `task already has current_attempt=${Number(row.current_attempt ?? 0)}`;
if (row.codex_thread_id !== null) return "task already has codex_thread_id";
if (row.active_turn_id !== null) return "task already has active_turn_id";
return "";
}
function taskMoveBlocker(task: QueueTask): string {
if (activeRunForTask(task) !== null) return "task has an active agent run";
if (processingQueues.has(queueIdOf(task))) return "queue processor is currently active";
if (activeRunSlotReservations.has(queueIdOf(task))) return "queue is reserving an active run slot";
if (activeRunSlotWaiters.some((waiter) => waiter.taskId === task.id || waiter.queueId === queueIdOf(task))) return "queue is waiting for an active run slot";
if (task.status !== "queued" && task.status !== "retry_wait") return `status=${task.status}`;
if (!taskIsUnclaimedMovable(task)) return "task has already been claimed";
return "";
}
function reconcileHotTaskFromDatabase(task: QueueTask): QueueTask {
const existing = findTask(task.id);
if (existing === null) return rememberHotTask(task);
if (activeRunForTask(existing) !== null) return existing;
Object.assign(existing, task);
return existing;
}
function taskHasClaimMarkers(task: QueueTask): boolean {
return task.status === "running"
|| task.status === "judging"
|| task.startedAt !== null
|| task.currentAttempt > 0
|| task.codexThreadId !== null
|| task.activeTurnId !== null;
}
function shouldPreferHotTaskOverDatabase(hotTask: QueueTask, databaseTask: QueueTask): boolean {
if (activeRunForTask(hotTask) !== null) return true;
if (taskIsUnclaimedMovable(hotTask) && taskHasClaimMarkers(databaseTask)) return false;
const hotUpdatedAt = timestampMs(hotTask.updatedAt) ?? 0;
const databaseUpdatedAt = timestampMs(databaseTask.updatedAt) ?? 0;
return hotUpdatedAt >= databaseUpdatedAt;
}
async function deleteTaskFromDatabase(taskId: string): Promise<void> {
if (!databaseReady) return;
await sql`
DELETE FROM unidesk_code_queue_tasks
WHERE id = ${taskId}
`;
}
async function claimTaskInDatabase(task: QueueTask, expectedQueueId: string): Promise<boolean> {
if (!databaseReady) return true;
const claimed = await sql.begin(async (client) => await upsertTaskToDatabase(client, task, { claimQueueId: expectedQueueId }));
if (claimed) return true;
const databaseTask = await loadTaskFromDatabase(task.id);
if (databaseTask !== null) reconcileHotTaskFromDatabase(databaseTask);
logger("warn", "task_claim_conflict", {
taskId: task.id,
expectedQueueId,
attemptedQueueId: queueIdOf(task),
attemptedStatus: task.status,
attemptedCurrentAttempt: task.currentAttempt,
});
return false;
}
async function runDatabaseClaimMoveSelfTest(): Promise<JsonValue | null> {
if (!databaseReady) return null;
const suffix = String(Date.now());
const taskId = `codex_claim_move_db_${suffix}`;
const queuedAt = nowIso();
const sourceQueueId = `claim_move_db_source_${suffix}`;
const targetQueueId = `claim_move_db_target_${suffix}`;
const before = state.tasks.slice();
const beforeQueues = state.queues.slice();
await deleteTaskFromDatabase(taskId);
try {
const queuedTask = normalizeTask({
...createTask({ prompt: "claim/move DB race self-test", queueId: sourceQueueId }),
id: taskId,
queueId: sourceQueueId,
queueEnteredAt: queuedAt,
createdAt: queuedAt,
updatedAt: queuedAt,
output: [],
});
await sql.begin(async (client) => {
await upsertQueueToDatabase(client, { id: sourceQueueId, name: sourceQueueId, createdAt: queuedAt, updatedAt: queuedAt });
await upsertTaskToDatabase(client, queuedTask);
});
const staleHotTask = normalizeTask(JSON.parse(JSON.stringify(queuedTask)) as QueueTask);
const claimedTask = normalizeTask(JSON.parse(JSON.stringify(queuedTask)) as QueueTask);
const claimedAt = nowIso();
claimedTask.status = "running";
claimedTask.startedAt = claimedAt;
claimedTask.currentAttempt = 1;
claimedTask.currentMode = "initial";
claimedTask.updatedAt = claimedAt;
const claimed = await claimTaskInDatabase(claimedTask, sourceQueueId);
if (!claimed) throw new Error("database claim self-test failed to claim queued task");
state.tasks.splice(0, state.tasks.length, staleHotTask);
const response = await moveTaskToQueue(staleHotTask, new Request(`http://code-queue.local/api/tasks/${taskId}/move`, {
method: "POST",
body: JSON.stringify({ queueId: targetQueueId }),
headers: { "content-type": "application/json" },
}));
const after = await loadTaskFromDatabase(taskId);
const body = await response.json() as Record<string, JsonValue>;
if (response.status !== 409) throw new Error(`database stale move should return 409, got ${response.status}`);
if (after === null) throw new Error("database self-test task disappeared after stale move");
if (after.status !== "running") throw new Error(`database self-test task status changed to ${after.status}`);
if (queueIdOf(after) !== sourceQueueId) throw new Error(`database self-test task queue changed to ${queueIdOf(after)}`);
if (after.currentAttempt !== 1 || after.startedAt === null) throw new Error("database self-test task claim markers were lost");
return {
ok: true,
taskId,
moveStatus: response.status,
databaseStatus: after.status,
databaseQueueId: queueIdOf(after),
currentAttempt: after.currentAttempt,
startedAt: after.startedAt,
response: body as JsonValue,
} as unknown as JsonValue;
} finally {
await deleteTaskFromDatabase(taskId);
await deleteDatabaseQueues([sourceQueueId, targetQueueId]);
state.tasks.splice(0, state.tasks.length, ...before);
state.queues.splice(0, state.queues.length, ...beforeQueues);
}
}
async function loadPrunedDatabaseTaskRows(where: "all" | "hot"): Promise<DatabaseTaskRow[]> {
return await sql<DatabaseTaskRow[]>`
SELECT id, updated_at, status, read_at, task_json
@@ -1674,12 +1897,21 @@ function rememberHotTask(task: QueueTask): QueueTask {
}
async function findTaskForRead(taskId: string): Promise<QueueTask | null> {
return findTask(taskId) ?? await loadTaskFromDatabase(taskId);
const hotTask = findTask(taskId);
if (!databaseReady) return hotTask;
const databaseTask = await loadTaskFromDatabase(taskId);
if (hotTask === null) return databaseTask;
if (databaseTask === null) return hotTask;
return shouldPreferHotTaskOverDatabase(hotTask, databaseTask) ? hotTask : databaseTask;
}
async function findTaskForMutation(taskId: string): Promise<QueueTask | null> {
const task = findTask(taskId) ?? await loadTaskFromDatabase(taskId);
return task === null ? null : rememberHotTask(task);
const hotTask = findTask(taskId);
if (!databaseReady) return hotTask;
const databaseTask = await loadTaskFromDatabase(taskId);
if (databaseTask === null) return hotTask;
if (hotTask === null) return rememberHotTask(databaseTask);
return shouldPreferHotTaskOverDatabase(hotTask, databaseTask) ? hotTask : reconcileHotTaskFromDatabase(databaseTask);
}
async function loadNextSeqFromDatabase(): Promise<number> {
@@ -1703,6 +1935,7 @@ async function flushDirtyTasksToDatabase(force = false): Promise<void> {
dirtyDatabaseTaskIds.clear();
dirtyDatabaseQueueIds.clear();
databaseFlushInFlight = true;
const rejectedTaskIds: string[] = [];
try {
await sql.begin(async (client) => {
for (const id of queueIds) {
@@ -1711,7 +1944,7 @@ async function flushDirtyTasksToDatabase(force = false): Promise<void> {
}
for (const id of ids) {
const task = state.tasks.find((item) => item.id === id);
if (task !== undefined) await upsertTaskToDatabase(client, task);
if (task !== undefined && !await upsertTaskToDatabase(client, task)) rejectedTaskIds.push(id);
}
});
databaseLastError = null;
@@ -1723,6 +1956,10 @@ async function flushDirtyTasksToDatabase(force = false): Promise<void> {
databaseFlushInFlight = false;
if (dirtyDatabaseTaskIds.size > 0 || dirtyDatabaseQueueIds.size > 0) scheduleDatabaseFlush();
}
for (const id of rejectedTaskIds) {
const databaseTask = await loadTaskFromDatabase(id);
if (databaseTask !== null) reconcileHotTaskFromDatabase(databaseTask);
}
}
async function initDatabasePersistence(): Promise<void> {
@@ -2446,6 +2683,7 @@ configureSelfTests({
defaultQueueId,
enqueueActiveRunSlotWaiter,
injectReferencedTaskContext,
moveTaskToQueueForTest: (task, req) => moveTaskToQueue(task, req, { bypassRoleCheck: true }),
nextRunnableTaskFrom,
normalizeTask,
nowIso,
@@ -2454,6 +2692,8 @@ configureSelfTests({
queuedStatusReason,
removeActiveRunSlotWaiter,
resolveReasoningEffort,
runDatabaseClaimMoveSelfTest,
tasks: () => state.tasks,
updateProcessingFlag,
});
@@ -2979,7 +3219,8 @@ function failTaskForFallbackRetryLimit(task: QueueTask, judge: JudgeResult | nul
}
async function runTask(task: QueueTask): Promise<void> {
logger("info", "task_processor_start", { taskId: task.id, queueId: queueIdOf(task), providerId: task.providerId, executionMode: task.executionMode, cwd: task.cwd, maxAttempts: task.maxAttempts, model: task.model, agentPort: codeAgentPortForModel(task.model), promptPreview: safePreview(task.prompt, 240) });
const claimQueueId = queueIdOf(task);
logger("info", "task_processor_start", { taskId: task.id, queueId: claimQueueId, providerId: task.providerId, executionMode: task.executionMode, cwd: task.cwd, maxAttempts: task.maxAttempts, model: task.model, agentPort: codeAgentPortForModel(task.model), promptPreview: safePreview(task.prompt, 240) });
if (task.status === "retry_wait" && task.lastJudge?.source === "fallback" && task.lastJudge.decision === "retry" && fallbackJudgeRetryCount(task) >= fallbackJudgeRetryLimit) {
failTaskForFallbackRetryLimit(task, task.lastJudge);
return;
@@ -3010,6 +3251,11 @@ async function runTask(task: QueueTask): Promise<void> {
task.readAt = null;
task.finishedAt = null;
task.updatedAt = startedAt;
if (!await claimTaskInDatabase(task, claimQueueId)) {
releaseRunSlot();
return;
}
publishTaskOaEvent(task, "claim");
logger("info", "task_run_start", { taskId: task.id, queueId: queueIdOf(task), attempt: task.currentAttempt, mode, providerId: task.providerId, executionMode: task.executionMode, cwd: task.cwd, maxAttempts: task.maxAttempts, model: task.model, agentPort: codeAgentPortForModel(task.model), freshRecovery: needsFreshRecoveryPrompt });
const attemptStartOutput = appendOutput(task, "system", `attempt ${task.currentAttempt}/${task.maxAttempts} queue=${queueIdOf(task)} provider=${task.providerId} executionMode=${task.executionMode} cwd=${task.cwd} mode=${mode} model=${task.model} port=${codeAgentPortForModel(task.model)}\n`, "queue");
@@ -3997,7 +4243,9 @@ function queueMergeBlocker(queueId: string): string | null {
if (activeRunSlotReservations.has(queueId)) return "queue is reserving an active run slot";
if (activeRunSlotWaiters.some((waiter) => waiter.queueId === queueId)) return "queue is waiting for an active run slot";
const activeTask = state.tasks.find((task) => queueIdOf(task) === queueId && (task.status === "running" || task.status === "judging"));
return activeTask === undefined ? null : `task ${activeTask.id} is ${activeTask.status}`;
if (activeTask !== undefined) return `task ${activeTask.id} is ${activeTask.status}`;
const claimedPendingTask = state.tasks.find((task) => queueIdOf(task) === queueId && (task.status === "queued" || task.status === "retry_wait") && !taskIsUnclaimedMovable(task));
return claimedPendingTask === undefined ? null : `task ${claimedPendingTask.id} has already been claimed`;
}
function parseSourceQueueIds(record: Record<string, unknown>, targetQueueId: string): string[] {
@@ -4020,27 +4268,117 @@ function parseSourceQueueIds(record: Record<string, unknown>, targetQueueId: str
return ids;
}
async function mergeDatabaseQueueTasks(sourceQueueIds: string[], targetQueueId: string): Promise<string[]> {
if (!databaseReady || sourceQueueIds.length === 0) return [];
const rows = await sql<Array<{ id: string }>>`
UPDATE unidesk_code_queue_tasks
SET
queue_id = ${targetQueueId},
task_json = jsonb_set(
jsonb_set(
task_json,
'{queueId}',
to_jsonb(${targetQueueId}::text),
async function mergeDatabaseQueueTasks(sourceQueueIds: string[], targetQueueId: string, mergedAt: string): Promise<{ movedTaskIds: string[]; blocker: DatabaseTaskStatusRow | null }> {
if (!databaseReady || sourceQueueIds.length === 0) return { movedTaskIds: [], blocker: null };
return await sql.begin(async (client) => {
const mergeQueueIds = Array.from(new Set([targetQueueId, ...sourceQueueIds]));
const lockedRows = await client<DatabaseTaskStatusRow[]>`
SELECT id, queue_id, status, started_at, current_attempt, codex_thread_id, active_turn_id
FROM unidesk_code_queue_tasks
WHERE queue_id IN ${client(mergeQueueIds)}
ORDER BY updated_at DESC, id DESC
FOR UPDATE
`;
const blocker = lockedRows.find((row) => {
return row.status === "running"
|| row.status === "judging"
|| (
(row.status === "queued" || row.status === "retry_wait")
&& (
row.started_at !== null
|| Number(row.current_attempt ?? 0) > 0
|| row.codex_thread_id !== null
|| row.active_turn_id !== null
)
);
}) ?? null;
if (blocker !== null) return { movedTaskIds: [], blocker };
const rows = await client<Array<{ id: string }>>`
UPDATE unidesk_code_queue_tasks
SET
queue_id = ${targetQueueId},
updated_at = ${mergedAt},
task_json = jsonb_set(
jsonb_set(
jsonb_set(
task_json,
'{queueId}',
to_jsonb(${targetQueueId}::text),
true
),
'{queueEnteredAt}',
to_jsonb(COALESCE(NULLIF(task_json->>'queueEnteredAt', ''), to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'))::text),
true
),
'{updatedAt}',
to_jsonb(${mergedAt}::text),
true
),
'{queueEnteredAt}',
to_jsonb(COALESCE(NULLIF(task_json->>'queueEnteredAt', ''), to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'))::text),
true
)
WHERE queue_id IN ${sql(sourceQueueIds)}
RETURNING id
`;
return rows.map((row) => row.id);
)
WHERE queue_id IN ${client(sourceQueueIds)}
AND (
status IN ('succeeded', 'failed', 'canceled')
OR (
status IN ('queued', 'retry_wait')
AND started_at IS NULL
AND current_attempt = 0
AND codex_thread_id IS NULL
AND active_turn_id IS NULL
)
)
RETURNING id
`;
return { movedTaskIds: rows.map((row) => row.id), blocker: null };
});
}
async function moveDatabaseTaskToQueue(taskId: string, targetQueueId: string, movedAt: string): Promise<{ ok: boolean; row: DatabaseTaskStatusRow | null; previousQueueId: string | null; blocker: string }> {
if (!databaseReady) return { ok: true, row: null, previousQueueId: null, blocker: "" };
return await sql.begin(async (client) => {
const rows = await client<DatabaseTaskStatusRow[]>`
SELECT id, queue_id, status, started_at, current_attempt, codex_thread_id, active_turn_id
FROM unidesk_code_queue_tasks
WHERE id = ${taskId}
LIMIT 1
FOR UPDATE
`;
const row = rows[0] ?? null;
const blocker = databaseTaskMoveBlocker(row);
if (blocker.length > 0) return { ok: false, row, previousQueueId: row === null ? null : safeQueueId(row.queue_id), blocker };
const previousQueueId = safeQueueId(row?.queue_id);
const updated = await client<DatabaseTaskStatusRow[]>`
UPDATE unidesk_code_queue_tasks
SET
queue_id = ${targetQueueId},
updated_at = ${movedAt},
task_json = jsonb_set(
jsonb_set(
jsonb_set(
task_json,
'{queueId}',
to_jsonb(${targetQueueId}::text),
true
),
'{queueEnteredAt}',
to_jsonb(${movedAt}::text),
true
),
'{updatedAt}',
to_jsonb(${movedAt}::text),
true
)
WHERE id = ${taskId}
AND status IN ('queued', 'retry_wait')
AND started_at IS NULL
AND current_attempt = 0
AND codex_thread_id IS NULL
AND active_turn_id IS NULL
RETURNING id, queue_id, status, started_at, current_attempt, codex_thread_id, active_turn_id
`;
const updatedRow = updated[0] ?? null;
return updatedRow === null
? { ok: false, row, previousQueueId, blocker: "conditional update matched no rows" }
: { ok: true, row: updatedRow, previousQueueId, blocker: "" };
});
}
async function deleteDatabaseQueues(queueIds: string[]): Promise<string[]> {
@@ -4097,6 +4435,17 @@ async function mergeQueues(targetQueueIdValue: string | null, req: Request): Pro
}
const mergedAt = nowIso();
const databaseMerge = await mergeDatabaseQueueTasks(sourceQueueIds, targetQueueId, mergedAt);
if (databaseMerge.blocker !== null) {
const blockerQueueId = safeQueueId(databaseMerge.blocker.queue_id);
const databaseTask = await loadTaskFromDatabase(databaseMerge.blocker.id);
if (databaseTask !== null) reconcileHotTaskFromDatabase(databaseTask);
return jsonResponse({
ok: false,
error: `cannot merge queue ${blockerQueueId}: task ${databaseMerge.blocker.id} is already claimed (${databaseTaskMoveBlocker(databaseMerge.blocker) || databaseMerge.blocker.status})`,
blocker: databaseStatusRowJson(databaseMerge.blocker),
}, 409);
}
const targetQueue = ensureQueue(targetQueueId);
const sourceQueues = sourceQueueIds.map((id) => queueSnapshot(id, mergedAt));
targetQueue.updatedAt = mergedAt;
@@ -4109,11 +4458,11 @@ async function mergeQueues(targetQueueIdValue: string | null, req: Request): Pro
if (!sourceSet.has(previousQueueId)) continue;
task.queueEnteredAt = taskQueueEnteredAt(task);
task.queueId = targetQueueId;
task.updatedAt = mergedAt;
hotMovedTasks.push(task);
markTaskDirty(task.id);
publishTaskOaEvent(task, "queue-merged");
}
const databaseMovedTaskIds = await mergeDatabaseQueueTasks(sourceQueueIds, targetQueueId);
const deletedSourceQueues = deleteQueuesFromState(sourceQueueIds);
const databaseDeletedQueueIds = await deleteDatabaseQueues(sourceQueueIds);
persistState(false);
@@ -4125,14 +4474,14 @@ async function mergeQueues(targetQueueIdValue: string | null, req: Request): Pro
sourceQueueIds,
deletedSourceQueueIds: deletedSourceQueues.map((queue) => queue.id),
hotMovedTaskCount: hotMovedTasks.length,
databaseMovedTaskCount: databaseReady ? databaseMovedTaskIds.length : null,
databaseMovedTaskCount: databaseReady ? databaseMerge.movedTaskIds.length : null,
databaseDeletedQueueIds: databaseReady ? databaseDeletedQueueIds : null,
});
for (const id of mergeQueueIds) mergingQueues.delete(id);
scheduleQueue(targetQueueId);
await flushDirtyTasksToDatabase(true);
const tasks = await loadAllTasksForRead();
const movedIdSet = new Set(databaseReady ? databaseMovedTaskIds : hotMovedTasks.map((task) => task.id));
const movedIdSet = new Set(databaseReady ? databaseMerge.movedTaskIds : hotMovedTasks.map((task) => task.id));
const orderedMovedTaskIds = tasks
.filter((task) => movedIdSet.has(task.id))
.sort(compareTaskQueueOrder)
@@ -4145,7 +4494,7 @@ async function mergeQueues(targetQueueIdValue: string | null, req: Request): Pro
ok: true,
targetQueueId,
sourceQueueIds,
mergedTaskCount: databaseReady ? databaseMovedTaskIds.length : hotMovedTasks.length,
mergedTaskCount: databaseReady ? databaseMerge.movedTaskIds.length : hotMovedTasks.length,
movedTaskIds: orderedMovedTaskIds.slice(0, 500),
targetTaskOrder: targetTaskOrder.slice(0, 500),
order: "merged tasks keep their original queueEnteredAt/createdAt ordering; source queue records are deleted after merge",
@@ -4161,17 +4510,37 @@ async function mergeQueues(targetQueueIdValue: string | null, req: Request): Pro
}
}
async function moveTaskToQueue(task: QueueTask, req: Request): Promise<Response> {
if (!serviceRoleAllowsWrite(config.serviceRole)) return readOnlyRejectResponse(req.method, `/api/tasks/${task.id}/move`);
if (task.status === "running" || task.status === "judging") {
return jsonResponse({ ok: false, error: `cannot move active task ${task.id} while status=${task.status}`, task: taskForResponse(task) }, 409);
}
async function moveTaskToQueue(task: QueueTask, req: Request, options: { bypassRoleCheck?: boolean } = {}): Promise<Response> {
if (options.bypassRoleCheck !== true && !serviceRoleAllowsWrite(config.serviceRole)) return readOnlyRejectResponse(req.method, `/api/tasks/${task.id}/move`);
const body = await readJson(req);
const record = typeof body === "object" && body !== null && !Array.isArray(body) ? body as Record<string, unknown> : {};
const queueId = normalizeQueueId(record.queueId ?? record.id);
const previousQueueId = queueIdOf(task);
const queue = ensureQueue(queueId);
const movedAt = nowIso();
const hotBlocker = taskMoveBlocker(task);
if (hotBlocker.length > 0) {
const databaseTask = databaseReady ? await loadTaskFromDatabase(task.id) : null;
if (databaseTask !== null) task = reconcileHotTaskFromDatabase(databaseTask);
return jsonResponse({
ok: false,
error: `cannot move task ${task.id}: ${hotBlocker}`,
task: taskForResponse(task),
databaseTask: databaseTask === null ? null : taskForResponse(databaseTask),
}, 409);
}
const databaseMove = await moveDatabaseTaskToQueue(task.id, queueId, movedAt);
if (!databaseMove.ok) {
const databaseTask = databaseReady ? await loadTaskFromDatabase(task.id) : null;
if (databaseTask !== null) task = reconcileHotTaskFromDatabase(databaseTask);
return jsonResponse({
ok: false,
error: `cannot move task ${task.id}: ${databaseMove.blocker}`,
blocker: databaseStatusRowJson(databaseMove.row),
task: taskForResponse(task),
databaseTask: databaseTask === null ? null : taskForResponse(databaseTask),
}, databaseMove.row === null ? 404 : 409);
}
const previousQueueId = databaseMove.previousQueueId ?? queueIdOf(task);
const queue = ensureQueue(queueId);
queue.updatedAt = movedAt;
markQueueDirty(queue.id);
task.queueId = queueId;
@@ -4301,6 +4670,7 @@ async function route(req: Request): Promise<Response> {
if (url.pathname === "/api/judge/probe" && (req.method === "GET" || req.method === "POST")) return await runJudgeProbe();
if (url.pathname === "/api/judge/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(runJudgeInfraSelfTest());
if (url.pathname === "/api/queue-order/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(runQueueOrderingSelfTest());
if (url.pathname === "/api/queue-claim-move/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(await runQueueClaimMoveSelfTest());
if (url.pathname === "/api/reference-injection/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(await runReferenceInjectionSelfTest());
if (url.pathname === "/api/trace-port/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(runTracePortSelfTest());
if (url.pathname === "/api/oa/backfill" && (req.method === "GET" || req.method === "POST")) return jsonResponse(await backfillOaTraceStats(url));
@@ -444,7 +444,8 @@ async function loadAllTasksForRead(): Promise<QueueTask[]> {
const tasks = await ctx().loadTasksFromDatabase("all");
const byId = new Map(tasks.map((task) => [task.id, task]));
for (const active of ctx().tasks()) {
byId.set(active.id, active);
const databaseTask = byId.get(active.id);
if (databaseTask === undefined || preferHotTaskForRead(active, databaseTask)) byId.set(active.id, active);
}
ctx().runGarbageCollection();
return Array.from(byId.values()).sort((left, right) => (timestampMs(left.createdAt) ?? 0) - (timestampMs(right.createdAt) ?? 0) || left.id.localeCompare(right.id));
@@ -581,6 +582,30 @@ function activePriority(task: QueueTask): number {
return statusRank[task.status] ?? 9;
}
function taskHasClaimMarkers(task: QueueTask): boolean {
return task.status === "running"
|| task.status === "judging"
|| task.startedAt !== null
|| task.currentAttempt > 0
|| task.codexThreadId !== null
|| task.activeTurnId !== null;
}
function taskIsUnclaimedQueued(task: QueueTask): boolean {
return (task.status === "queued" || task.status === "retry_wait")
&& task.startedAt === null
&& task.currentAttempt === 0
&& task.codexThreadId === null
&& task.activeTurnId === null;
}
function preferHotTaskForRead(hotTask: QueueTask, databaseTask: QueueTask): boolean {
const hotActiveRun = Array.from(ctx().activeRuns.values()).some((run) => run.taskId === hotTask.id);
if (hotActiveRun) return true;
if (taskIsUnclaimedQueued(hotTask) && taskHasClaimMarkers(databaseTask)) return false;
return taskUpdatedSortValue(hotTask) >= taskUpdatedSortValue(databaseTask);
}
function taskUpdatedSortValue(task: QueueTask): number {
const time = Date.parse(task.updatedAt || task.createdAt);
return Number.isFinite(time) ? time : 0;
@@ -1051,7 +1076,8 @@ async function databaseTasksOverviewResponse(url: URL): Promise<Response | null>
const byId = new Map<string, QueueTask>();
for (const task of loadedTasks) byId.set(task.id, task);
for (const task of ctx().tasks()) {
if (seenIds.has(task.id) || byId.has(task.id)) byId.set(task.id, task);
const databaseTask = byId.get(task.id);
if ((seenIds.has(task.id) || databaseTask !== undefined) && (databaseTask === undefined || preferHotTaskForRead(task, databaseTask))) byId.set(task.id, task);
}
const rowsSource = orderedIds
.map((id) => byId.get(id) ?? null)
@@ -1063,8 +1089,13 @@ async function databaseTasksOverviewResponse(url: URL): Promise<Response | null>
let selectedTask: QueueTask | null = null;
for (const id of selectedCandidates) {
if (id.length === 0) continue;
const candidate = ctx().tasks().find((task) => task.id === id)
?? await ctx().loadTaskFromDatabase(id);
const hotCandidate = ctx().tasks().find((task) => task.id === id) ?? null;
const databaseCandidate = await ctx().loadTaskFromDatabase(id);
const candidate = hotCandidate === null
? databaseCandidate
: databaseCandidate === null || preferHotTaskForRead(hotCandidate, databaseCandidate)
? hotCandidate
: databaseCandidate;
if (candidate !== null && taskMatchesQueueFilter(candidate, queueId)) {
selectedTask = candidate;
break;
@@ -17,6 +17,7 @@ export interface SelfTestsContext {
defaultQueueId: string;
enqueueActiveRunSlotWaiter: (task: QueueTask) => ActiveRunSlotWaiter;
injectReferencedTaskContext: (request: QueueTaskRequest, finder?: (id: string) => QueueTask | null | Promise<QueueTask | null>, injectedAt?: string) => Promise<QueueTaskRequest>;
moveTaskToQueueForTest: (task: QueueTask, req: Request) => Promise<Response>;
nextRunnableTaskFrom: (queueId: string, tasks?: QueueTask[]) => QueueTask | null;
normalizeTask: (task: QueueTask) => QueueTask;
nowIso: () => string;
@@ -25,6 +26,8 @@ export interface SelfTestsContext {
queuedStatusReason: (task: QueueTask, tasks?: QueueTask[]) => QueuedStatusReason | null;
removeActiveRunSlotWaiter: (waiter: ActiveRunSlotWaiter) => void;
resolveReasoningEffort: (model: string, explicit?: string | null) => string | null;
runDatabaseClaimMoveSelfTest?: () => Promise<JsonValue | null>;
tasks: () => QueueTask[];
updateProcessingFlag: () => void;
}
@@ -192,6 +195,45 @@ function queueOrderTestTask(id: string, status: TaskStatus, createdAt: string, q
return ctx().normalizeTask(task);
}
async function runQueueClaimMoveSelfTest(): Promise<JsonValue> {
const at = "2026-05-17T06:09:46.702Z";
const task = queueOrderTestTask("codex_claim_move_self_test", "running", at, at);
task.queueId = "claim_move_source";
task.startedAt = at;
task.currentAttempt = 1;
task.currentMode = "initial";
task.codexThreadId = "thread_claim_move_self_test";
task.activeTurnId = "turn_claim_move_self_test";
task.updatedAt = at;
const before = ctx().tasks().slice();
ctx().tasks().push(task);
try {
const response = await ctx().moveTaskToQueueForTest(task, new Request("http://code-queue.local/api/tasks/codex_claim_move_self_test/move", {
method: "POST",
body: JSON.stringify({ queueId: "claim_move_target" }),
headers: { "content-type": "application/json" },
}));
const body = await response.json() as Record<string, JsonValue>;
assertReferenceTest(response.status === 409, "moving a claimed/running task must return 409");
assertReferenceTest(task.queueId === "claim_move_source", "running task queueId must remain unchanged after rejected move");
assertReferenceTest(task.status === "running", "running task status must remain running after rejected move");
assertReferenceTest(task.currentAttempt === 1, "running task currentAttempt must remain claimed after rejected move");
const databaseRace = await ctx().runDatabaseClaimMoveSelfTest?.() ?? null;
return {
ok: true,
cases: [
{ name: "move_running_task_returns_409", ok: true, status: response.status },
{ name: "rejected_move_preserves_queue", ok: true, queueId: task.queueId },
{ name: "rejected_move_preserves_claim_markers", ok: true, status: task.status, currentAttempt: task.currentAttempt, startedAt: task.startedAt },
...(databaseRace === null ? [] : [{ name: "database_claim_blocks_stale_move", ok: true, result: databaseRace }]),
],
response: body as JsonValue,
};
} finally {
ctx().tasks().splice(0, ctx().tasks().length, ...before);
}
}
function runQueueOrderingSelfTest(): JsonValue {
const activeRetry = queueOrderTestTask("codex_4000_active", "retry_wait", "2026-05-11T09:00:00.000Z", "2026-05-11T09:00:00.000Z");
const movedOlderCreated = queueOrderTestTask("codex_3999_moved", "queued", "2026-05-11T08:00:00.000Z", "2026-05-11T08:00:00.000Z") as QueueTask & { queueEnteredAt?: string };
@@ -476,4 +518,4 @@ function runJudgeInfraSelfTest(): JsonValue {
};
}
export { runJudgeInfraSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest };
export { runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest };