diff --git a/docs/reference/cli.md b/docs/reference/cli.md index 49caaad6..0165f35c 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -26,7 +26,7 @@ UniDesk 的统一 CLI 入口是根目录 `scripts/cli.ts`,运行方式固定 - `codex task --trace --tail|--from-start|--after-seq N|--before-seq N --limit N` 按页拉取 Code Queue 的逻辑 trace;响应会返回 `nextAfterSeq`、`previousBeforeSeq`、`hasMore`、`hasBefore` 和下一页/上一页命令,默认 `--trace` 取最新一页,需要完整 prompt/最后 response 时加 `--full`。 - `codex output --tail|--from-start|--after-seq N|--before-seq N --limit N [--full-text]` 按原始 output seq 分页读取底层记录;当 trace 行提示 `commandOmittedLines`、`bodyOmittedLines` 或 `rawSeqs` 时,用该命令按 seq 补取完整信息,默认仍有单条文本预览上限,显式 `--full-text` 才返回该页全文。 - `codex judge --attempt N [--dry-run] [--include-prompt]` 通过 Code Queue 私有代理按指定 attempt 单步复现 judge;后端会从 PostgreSQL task JSON 与 output 归档重建该 attempt 在真实队列 worker 中的 `QueueTask`/`CodexRunResult`,再调用同一套 judge prompt builder 和 MiniMax 请求路径。默认会真实调用 MiniMax,`--dry-run` 只返回 prompt/payload 大小、attempt 窗口和重建来源诊断,`--include-prompt` 仅用于本地深度排查。 -- Code Queue 多队列 lane 由 `codex` 命令命名空间管理:`queues` 列表、`queue create ` 创建、`queue merge --into ` 合并、`move --queue ` 迁移;同一个 queue 内部串行执行,不同 queue 之间并行执行。合并会移动任务归属并自动删除源 queue 记录,只保留合并后的目标 queue;合并后的目标 queue 按任务原 `queueEnteredAt`/`createdAt` 时间顺序串行。迁移 queued/retry_wait 任务后会立即调度目标 queue。 +- Code Queue 多队列 lane 由 `codex` 命令命名空间管理:`queues` 列表、`queue create ` 创建、`queue merge --into ` 合并、`move --queue ` 迁移;同一个 queue 内部串行执行,不同 queue 之间并行执行。迁移只允许尚未被 scheduler claim 的 `queued`/`retry_wait` 任务,必须满足 `startedAt=null`、`currentAttempt=0` 且没有 active thread/turn;已进入 `running`/`judging` 或已有 claim 标记的任务返回 409,不得被 move/merge 回写成 queued。合并会移动可迁移任务归属并自动删除源 queue 记录,只保留合并后的目标 queue;若 source 或 target queue 存在 active/claimed 任务,合并整体返回 409。合并后的目标 queue 按任务原 `queueEnteredAt`/`createdAt` 时间顺序串行,成功迁移 queued/retry_wait 任务后会立即调度目标 queue。 - `job list` 与 `job status` 查询 `.state/jobs/` 文件系统状态,是异步命令的可观测入口。 - `debug health`、`debug dispatch` 与 `debug task` 走真实内部 core、WebSocket、数据库、provider、系统指标、Docker 状态和 Host SSH 维护桥流程,只用于开发调试,不写入 `TEST.md` 的正式验收步骤。 - `e2e run [--only pattern[,pattern...]] [--skip pattern[,pattern...]]` 使用 publicHost 派生的公开 frontend/provider ingress URL,并通过 Docker 内网验证 core API、PostgreSQL、provider self-connection、系统指标曲线、Docker 状态快照、provider.upgrade 预检和 Playwright 前端页面,是交付前的自动化 E2E 门禁;CLI 默认输出 check 状态摘要,完整诊断写入 `resultPath`,日常迭代应优先用 `--only` / `--skip` 跑最小必要集合。 diff --git a/src/components/microservices/code-queue/src/index.ts b/src/components/microservices/code-queue/src/index.ts index fe758aa6..571186ba 100644 --- a/src/components/microservices/code-queue/src/index.ts +++ b/src/components/microservices/code-queue/src/index.ts @@ -128,7 +128,7 @@ import { readOaTraceStatsForTaskAttempts, readOaTraceStatsForTasks, } from "./oa-events"; -import { configureSelfTests, runJudgeInfraSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest } from "./self-tests"; +import { configureSelfTests, runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest } from "./self-tests"; import { codexToolLifecycleStartedBeforeIn, configureTaskView, @@ -1185,7 +1185,12 @@ function updateNextSeqFromTasks(): void { state.nextSeq = nextSeq; } -async function upsertTaskToDatabase(client: SqlExecutor, task: QueueTask): Promise { +interface UpsertTaskOptions { + claimQueueId?: string | null; +} + +async function upsertTaskToDatabase(client: SqlExecutor, task: QueueTask, options: UpsertTaskOptions = {}): Promise { + const claimQueueId = options.claimQueueId ?? null; const rows = await client>` INSERT INTO unidesk_code_queue_tasks ( id, @@ -1292,9 +1297,62 @@ async function upsertTaskToDatabase(client: SqlExecutor, task: QueueTask): Promi END, true ) + WHERE ( + ${claimQueueId === null} + OR ( + unidesk_code_queue_tasks.queue_id = ${claimQueueId} + AND ( + ( + unidesk_code_queue_tasks.status = 'queued' + AND unidesk_code_queue_tasks.started_at IS NULL + AND unidesk_code_queue_tasks.current_attempt = 0 + AND unidesk_code_queue_tasks.codex_thread_id IS NULL + AND unidesk_code_queue_tasks.active_turn_id IS NULL + ) + OR ( + unidesk_code_queue_tasks.status = 'retry_wait' + AND unidesk_code_queue_tasks.active_turn_id IS NULL + ) + ) + ) + ) + AND NOT ( + EXCLUDED.status IN ('queued', 'retry_wait') + AND EXCLUDED.started_at IS NULL + AND EXCLUDED.current_attempt = 0 + AND EXCLUDED.codex_thread_id IS NULL + AND EXCLUDED.active_turn_id IS NULL + AND ( + unidesk_code_queue_tasks.status IN ('running', 'judging') + OR unidesk_code_queue_tasks.started_at IS NOT NULL + OR unidesk_code_queue_tasks.current_attempt > 0 + OR unidesk_code_queue_tasks.codex_thread_id IS NOT NULL + OR unidesk_code_queue_tasks.active_turn_id IS NOT NULL + ) + ) RETURNING read_at `; + if (rows.length === 0) { + const current = await client` + SELECT id, queue_id, status, started_at, current_attempt, codex_thread_id, active_turn_id + FROM unidesk_code_queue_tasks + WHERE id = ${task.id} + LIMIT 1 + `; + logger("warn", "database_task_stale_unclaimed_write_rejected", { + taskId: task.id, + attemptedQueueId: queueIdOf(task), + attemptedStatus: task.status, + attemptedStartedAt: task.startedAt, + attemptedCurrentAttempt: task.currentAttempt, + attemptedCodexThreadId: task.codexThreadId, + attemptedActiveTurnId: task.activeTurnId, + current: databaseStatusRowJson(current[0] ?? null), + }); + return false; + } task.readAt = timestampToIso(rows[0]?.read_at ?? null); + return true; } async function upsertQueueToDatabase(client: SqlExecutor, queue: QueueRecord): Promise { @@ -1347,6 +1405,16 @@ interface DatabaseTaskRow { task_json: unknown; } +interface DatabaseTaskStatusRow { + id: string; + queue_id: string; + status: TaskStatus; + started_at: Date | string | null; + current_attempt: number | string | null; + codex_thread_id: string | null; + active_turn_id: string | null; +} + interface DatabaseQueueRow { id: string; name: string; @@ -1376,6 +1444,161 @@ function normalizeDatabaseTaskRows(rows: DatabaseTaskRow[], source: string): Que return tasks.sort((left, right) => (timestampMs(left.createdAt) ?? 0) - (timestampMs(right.createdAt) ?? 0) || left.id.localeCompare(right.id)); } +function databaseStatusRowJson(row: DatabaseTaskStatusRow | null): JsonValue { + if (row === null) return null; + return { + id: row.id, + queueId: safeQueueId(row.queue_id), + status: row.status, + startedAt: timestampToIso(row.started_at), + currentAttempt: Number(row.current_attempt ?? 0), + codexThreadId: row.codex_thread_id, + activeTurnId: row.active_turn_id, + }; +} + +function taskIsUnclaimedMovable(task: QueueTask): boolean { + return (task.status === "queued" || task.status === "retry_wait") + && task.startedAt === null + && task.currentAttempt === 0 + && task.codexThreadId === null + && task.activeTurnId === null; +} + +function databaseTaskMoveBlocker(row: DatabaseTaskStatusRow | null): string { + if (row === null) return "task not found"; + if (row.status !== "queued" && row.status !== "retry_wait") return `status=${row.status}`; + if (row.started_at !== null) return "task already has started_at"; + if (Number(row.current_attempt ?? 0) !== 0) return `task already has current_attempt=${Number(row.current_attempt ?? 0)}`; + if (row.codex_thread_id !== null) return "task already has codex_thread_id"; + if (row.active_turn_id !== null) return "task already has active_turn_id"; + return ""; +} + +function taskMoveBlocker(task: QueueTask): string { + if (activeRunForTask(task) !== null) return "task has an active agent run"; + if (processingQueues.has(queueIdOf(task))) return "queue processor is currently active"; + if (activeRunSlotReservations.has(queueIdOf(task))) return "queue is reserving an active run slot"; + if (activeRunSlotWaiters.some((waiter) => waiter.taskId === task.id || waiter.queueId === queueIdOf(task))) return "queue is waiting for an active run slot"; + if (task.status !== "queued" && task.status !== "retry_wait") return `status=${task.status}`; + if (!taskIsUnclaimedMovable(task)) return "task has already been claimed"; + return ""; +} + +function reconcileHotTaskFromDatabase(task: QueueTask): QueueTask { + const existing = findTask(task.id); + if (existing === null) return rememberHotTask(task); + if (activeRunForTask(existing) !== null) return existing; + Object.assign(existing, task); + return existing; +} + +function taskHasClaimMarkers(task: QueueTask): boolean { + return task.status === "running" + || task.status === "judging" + || task.startedAt !== null + || task.currentAttempt > 0 + || task.codexThreadId !== null + || task.activeTurnId !== null; +} + +function shouldPreferHotTaskOverDatabase(hotTask: QueueTask, databaseTask: QueueTask): boolean { + if (activeRunForTask(hotTask) !== null) return true; + if (taskIsUnclaimedMovable(hotTask) && taskHasClaimMarkers(databaseTask)) return false; + const hotUpdatedAt = timestampMs(hotTask.updatedAt) ?? 0; + const databaseUpdatedAt = timestampMs(databaseTask.updatedAt) ?? 0; + return hotUpdatedAt >= databaseUpdatedAt; +} + +async function deleteTaskFromDatabase(taskId: string): Promise { + if (!databaseReady) return; + await sql` + DELETE FROM unidesk_code_queue_tasks + WHERE id = ${taskId} + `; +} + +async function claimTaskInDatabase(task: QueueTask, expectedQueueId: string): Promise { + if (!databaseReady) return true; + const claimed = await sql.begin(async (client) => await upsertTaskToDatabase(client, task, { claimQueueId: expectedQueueId })); + if (claimed) return true; + const databaseTask = await loadTaskFromDatabase(task.id); + if (databaseTask !== null) reconcileHotTaskFromDatabase(databaseTask); + logger("warn", "task_claim_conflict", { + taskId: task.id, + expectedQueueId, + attemptedQueueId: queueIdOf(task), + attemptedStatus: task.status, + attemptedCurrentAttempt: task.currentAttempt, + }); + return false; +} + +async function runDatabaseClaimMoveSelfTest(): Promise { + if (!databaseReady) return null; + const suffix = String(Date.now()); + const taskId = `codex_claim_move_db_${suffix}`; + const queuedAt = nowIso(); + const sourceQueueId = `claim_move_db_source_${suffix}`; + const targetQueueId = `claim_move_db_target_${suffix}`; + const before = state.tasks.slice(); + const beforeQueues = state.queues.slice(); + await deleteTaskFromDatabase(taskId); + try { + const queuedTask = normalizeTask({ + ...createTask({ prompt: "claim/move DB race self-test", queueId: sourceQueueId }), + id: taskId, + queueId: sourceQueueId, + queueEnteredAt: queuedAt, + createdAt: queuedAt, + updatedAt: queuedAt, + output: [], + }); + await sql.begin(async (client) => { + await upsertQueueToDatabase(client, { id: sourceQueueId, name: sourceQueueId, createdAt: queuedAt, updatedAt: queuedAt }); + await upsertTaskToDatabase(client, queuedTask); + }); + const staleHotTask = normalizeTask(JSON.parse(JSON.stringify(queuedTask)) as QueueTask); + const claimedTask = normalizeTask(JSON.parse(JSON.stringify(queuedTask)) as QueueTask); + const claimedAt = nowIso(); + claimedTask.status = "running"; + claimedTask.startedAt = claimedAt; + claimedTask.currentAttempt = 1; + claimedTask.currentMode = "initial"; + claimedTask.updatedAt = claimedAt; + const claimed = await claimTaskInDatabase(claimedTask, sourceQueueId); + if (!claimed) throw new Error("database claim self-test failed to claim queued task"); + state.tasks.splice(0, state.tasks.length, staleHotTask); + const response = await moveTaskToQueue(staleHotTask, new Request(`http://code-queue.local/api/tasks/${taskId}/move`, { + method: "POST", + body: JSON.stringify({ queueId: targetQueueId }), + headers: { "content-type": "application/json" }, + })); + const after = await loadTaskFromDatabase(taskId); + const body = await response.json() as Record; + if (response.status !== 409) throw new Error(`database stale move should return 409, got ${response.status}`); + if (after === null) throw new Error("database self-test task disappeared after stale move"); + if (after.status !== "running") throw new Error(`database self-test task status changed to ${after.status}`); + if (queueIdOf(after) !== sourceQueueId) throw new Error(`database self-test task queue changed to ${queueIdOf(after)}`); + if (after.currentAttempt !== 1 || after.startedAt === null) throw new Error("database self-test task claim markers were lost"); + return { + ok: true, + taskId, + moveStatus: response.status, + databaseStatus: after.status, + databaseQueueId: queueIdOf(after), + currentAttempt: after.currentAttempt, + startedAt: after.startedAt, + response: body as JsonValue, + } as unknown as JsonValue; + } finally { + await deleteTaskFromDatabase(taskId); + await deleteDatabaseQueues([sourceQueueId, targetQueueId]); + state.tasks.splice(0, state.tasks.length, ...before); + state.queues.splice(0, state.queues.length, ...beforeQueues); + } +} + async function loadPrunedDatabaseTaskRows(where: "all" | "hot"): Promise { return await sql` SELECT id, updated_at, status, read_at, task_json @@ -1674,12 +1897,21 @@ function rememberHotTask(task: QueueTask): QueueTask { } async function findTaskForRead(taskId: string): Promise { - return findTask(taskId) ?? await loadTaskFromDatabase(taskId); + const hotTask = findTask(taskId); + if (!databaseReady) return hotTask; + const databaseTask = await loadTaskFromDatabase(taskId); + if (hotTask === null) return databaseTask; + if (databaseTask === null) return hotTask; + return shouldPreferHotTaskOverDatabase(hotTask, databaseTask) ? hotTask : databaseTask; } async function findTaskForMutation(taskId: string): Promise { - const task = findTask(taskId) ?? await loadTaskFromDatabase(taskId); - return task === null ? null : rememberHotTask(task); + const hotTask = findTask(taskId); + if (!databaseReady) return hotTask; + const databaseTask = await loadTaskFromDatabase(taskId); + if (databaseTask === null) return hotTask; + if (hotTask === null) return rememberHotTask(databaseTask); + return shouldPreferHotTaskOverDatabase(hotTask, databaseTask) ? hotTask : reconcileHotTaskFromDatabase(databaseTask); } async function loadNextSeqFromDatabase(): Promise { @@ -1703,6 +1935,7 @@ async function flushDirtyTasksToDatabase(force = false): Promise { dirtyDatabaseTaskIds.clear(); dirtyDatabaseQueueIds.clear(); databaseFlushInFlight = true; + const rejectedTaskIds: string[] = []; try { await sql.begin(async (client) => { for (const id of queueIds) { @@ -1711,7 +1944,7 @@ async function flushDirtyTasksToDatabase(force = false): Promise { } for (const id of ids) { const task = state.tasks.find((item) => item.id === id); - if (task !== undefined) await upsertTaskToDatabase(client, task); + if (task !== undefined && !await upsertTaskToDatabase(client, task)) rejectedTaskIds.push(id); } }); databaseLastError = null; @@ -1723,6 +1956,10 @@ async function flushDirtyTasksToDatabase(force = false): Promise { databaseFlushInFlight = false; if (dirtyDatabaseTaskIds.size > 0 || dirtyDatabaseQueueIds.size > 0) scheduleDatabaseFlush(); } + for (const id of rejectedTaskIds) { + const databaseTask = await loadTaskFromDatabase(id); + if (databaseTask !== null) reconcileHotTaskFromDatabase(databaseTask); + } } async function initDatabasePersistence(): Promise { @@ -2446,6 +2683,7 @@ configureSelfTests({ defaultQueueId, enqueueActiveRunSlotWaiter, injectReferencedTaskContext, + moveTaskToQueueForTest: (task, req) => moveTaskToQueue(task, req, { bypassRoleCheck: true }), nextRunnableTaskFrom, normalizeTask, nowIso, @@ -2454,6 +2692,8 @@ configureSelfTests({ queuedStatusReason, removeActiveRunSlotWaiter, resolveReasoningEffort, + runDatabaseClaimMoveSelfTest, + tasks: () => state.tasks, updateProcessingFlag, }); @@ -2979,7 +3219,8 @@ function failTaskForFallbackRetryLimit(task: QueueTask, judge: JudgeResult | nul } async function runTask(task: QueueTask): Promise { - logger("info", "task_processor_start", { taskId: task.id, queueId: queueIdOf(task), providerId: task.providerId, executionMode: task.executionMode, cwd: task.cwd, maxAttempts: task.maxAttempts, model: task.model, agentPort: codeAgentPortForModel(task.model), promptPreview: safePreview(task.prompt, 240) }); + const claimQueueId = queueIdOf(task); + logger("info", "task_processor_start", { taskId: task.id, queueId: claimQueueId, providerId: task.providerId, executionMode: task.executionMode, cwd: task.cwd, maxAttempts: task.maxAttempts, model: task.model, agentPort: codeAgentPortForModel(task.model), promptPreview: safePreview(task.prompt, 240) }); if (task.status === "retry_wait" && task.lastJudge?.source === "fallback" && task.lastJudge.decision === "retry" && fallbackJudgeRetryCount(task) >= fallbackJudgeRetryLimit) { failTaskForFallbackRetryLimit(task, task.lastJudge); return; @@ -3010,6 +3251,11 @@ async function runTask(task: QueueTask): Promise { task.readAt = null; task.finishedAt = null; task.updatedAt = startedAt; + if (!await claimTaskInDatabase(task, claimQueueId)) { + releaseRunSlot(); + return; + } + publishTaskOaEvent(task, "claim"); logger("info", "task_run_start", { taskId: task.id, queueId: queueIdOf(task), attempt: task.currentAttempt, mode, providerId: task.providerId, executionMode: task.executionMode, cwd: task.cwd, maxAttempts: task.maxAttempts, model: task.model, agentPort: codeAgentPortForModel(task.model), freshRecovery: needsFreshRecoveryPrompt }); const attemptStartOutput = appendOutput(task, "system", `attempt ${task.currentAttempt}/${task.maxAttempts} queue=${queueIdOf(task)} provider=${task.providerId} executionMode=${task.executionMode} cwd=${task.cwd} mode=${mode} model=${task.model} port=${codeAgentPortForModel(task.model)}\n`, "queue"); @@ -3997,7 +4243,9 @@ function queueMergeBlocker(queueId: string): string | null { if (activeRunSlotReservations.has(queueId)) return "queue is reserving an active run slot"; if (activeRunSlotWaiters.some((waiter) => waiter.queueId === queueId)) return "queue is waiting for an active run slot"; const activeTask = state.tasks.find((task) => queueIdOf(task) === queueId && (task.status === "running" || task.status === "judging")); - return activeTask === undefined ? null : `task ${activeTask.id} is ${activeTask.status}`; + if (activeTask !== undefined) return `task ${activeTask.id} is ${activeTask.status}`; + const claimedPendingTask = state.tasks.find((task) => queueIdOf(task) === queueId && (task.status === "queued" || task.status === "retry_wait") && !taskIsUnclaimedMovable(task)); + return claimedPendingTask === undefined ? null : `task ${claimedPendingTask.id} has already been claimed`; } function parseSourceQueueIds(record: Record, targetQueueId: string): string[] { @@ -4020,27 +4268,117 @@ function parseSourceQueueIds(record: Record, targetQueueId: str return ids; } -async function mergeDatabaseQueueTasks(sourceQueueIds: string[], targetQueueId: string): Promise { - if (!databaseReady || sourceQueueIds.length === 0) return []; - const rows = await sql>` - UPDATE unidesk_code_queue_tasks - SET - queue_id = ${targetQueueId}, - task_json = jsonb_set( - jsonb_set( - task_json, - '{queueId}', - to_jsonb(${targetQueueId}::text), +async function mergeDatabaseQueueTasks(sourceQueueIds: string[], targetQueueId: string, mergedAt: string): Promise<{ movedTaskIds: string[]; blocker: DatabaseTaskStatusRow | null }> { + if (!databaseReady || sourceQueueIds.length === 0) return { movedTaskIds: [], blocker: null }; + return await sql.begin(async (client) => { + const mergeQueueIds = Array.from(new Set([targetQueueId, ...sourceQueueIds])); + const lockedRows = await client` + SELECT id, queue_id, status, started_at, current_attempt, codex_thread_id, active_turn_id + FROM unidesk_code_queue_tasks + WHERE queue_id IN ${client(mergeQueueIds)} + ORDER BY updated_at DESC, id DESC + FOR UPDATE + `; + const blocker = lockedRows.find((row) => { + return row.status === "running" + || row.status === "judging" + || ( + (row.status === "queued" || row.status === "retry_wait") + && ( + row.started_at !== null + || Number(row.current_attempt ?? 0) > 0 + || row.codex_thread_id !== null + || row.active_turn_id !== null + ) + ); + }) ?? null; + if (blocker !== null) return { movedTaskIds: [], blocker }; + const rows = await client>` + UPDATE unidesk_code_queue_tasks + SET + queue_id = ${targetQueueId}, + updated_at = ${mergedAt}, + task_json = jsonb_set( + jsonb_set( + jsonb_set( + task_json, + '{queueId}', + to_jsonb(${targetQueueId}::text), + true + ), + '{queueEnteredAt}', + to_jsonb(COALESCE(NULLIF(task_json->>'queueEnteredAt', ''), to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'))::text), + true + ), + '{updatedAt}', + to_jsonb(${mergedAt}::text), true - ), - '{queueEnteredAt}', - to_jsonb(COALESCE(NULLIF(task_json->>'queueEnteredAt', ''), to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'))::text), - true - ) - WHERE queue_id IN ${sql(sourceQueueIds)} - RETURNING id - `; - return rows.map((row) => row.id); + ) + WHERE queue_id IN ${client(sourceQueueIds)} + AND ( + status IN ('succeeded', 'failed', 'canceled') + OR ( + status IN ('queued', 'retry_wait') + AND started_at IS NULL + AND current_attempt = 0 + AND codex_thread_id IS NULL + AND active_turn_id IS NULL + ) + ) + RETURNING id + `; + return { movedTaskIds: rows.map((row) => row.id), blocker: null }; + }); +} + +async function moveDatabaseTaskToQueue(taskId: string, targetQueueId: string, movedAt: string): Promise<{ ok: boolean; row: DatabaseTaskStatusRow | null; previousQueueId: string | null; blocker: string }> { + if (!databaseReady) return { ok: true, row: null, previousQueueId: null, blocker: "" }; + return await sql.begin(async (client) => { + const rows = await client` + SELECT id, queue_id, status, started_at, current_attempt, codex_thread_id, active_turn_id + FROM unidesk_code_queue_tasks + WHERE id = ${taskId} + LIMIT 1 + FOR UPDATE + `; + const row = rows[0] ?? null; + const blocker = databaseTaskMoveBlocker(row); + if (blocker.length > 0) return { ok: false, row, previousQueueId: row === null ? null : safeQueueId(row.queue_id), blocker }; + const previousQueueId = safeQueueId(row?.queue_id); + const updated = await client` + UPDATE unidesk_code_queue_tasks + SET + queue_id = ${targetQueueId}, + updated_at = ${movedAt}, + task_json = jsonb_set( + jsonb_set( + jsonb_set( + task_json, + '{queueId}', + to_jsonb(${targetQueueId}::text), + true + ), + '{queueEnteredAt}', + to_jsonb(${movedAt}::text), + true + ), + '{updatedAt}', + to_jsonb(${movedAt}::text), + true + ) + WHERE id = ${taskId} + AND status IN ('queued', 'retry_wait') + AND started_at IS NULL + AND current_attempt = 0 + AND codex_thread_id IS NULL + AND active_turn_id IS NULL + RETURNING id, queue_id, status, started_at, current_attempt, codex_thread_id, active_turn_id + `; + const updatedRow = updated[0] ?? null; + return updatedRow === null + ? { ok: false, row, previousQueueId, blocker: "conditional update matched no rows" } + : { ok: true, row: updatedRow, previousQueueId, blocker: "" }; + }); } async function deleteDatabaseQueues(queueIds: string[]): Promise { @@ -4097,6 +4435,17 @@ async function mergeQueues(targetQueueIdValue: string | null, req: Request): Pro } const mergedAt = nowIso(); + const databaseMerge = await mergeDatabaseQueueTasks(sourceQueueIds, targetQueueId, mergedAt); + if (databaseMerge.blocker !== null) { + const blockerQueueId = safeQueueId(databaseMerge.blocker.queue_id); + const databaseTask = await loadTaskFromDatabase(databaseMerge.blocker.id); + if (databaseTask !== null) reconcileHotTaskFromDatabase(databaseTask); + return jsonResponse({ + ok: false, + error: `cannot merge queue ${blockerQueueId}: task ${databaseMerge.blocker.id} is already claimed (${databaseTaskMoveBlocker(databaseMerge.blocker) || databaseMerge.blocker.status})`, + blocker: databaseStatusRowJson(databaseMerge.blocker), + }, 409); + } const targetQueue = ensureQueue(targetQueueId); const sourceQueues = sourceQueueIds.map((id) => queueSnapshot(id, mergedAt)); targetQueue.updatedAt = mergedAt; @@ -4109,11 +4458,11 @@ async function mergeQueues(targetQueueIdValue: string | null, req: Request): Pro if (!sourceSet.has(previousQueueId)) continue; task.queueEnteredAt = taskQueueEnteredAt(task); task.queueId = targetQueueId; + task.updatedAt = mergedAt; hotMovedTasks.push(task); markTaskDirty(task.id); publishTaskOaEvent(task, "queue-merged"); } - const databaseMovedTaskIds = await mergeDatabaseQueueTasks(sourceQueueIds, targetQueueId); const deletedSourceQueues = deleteQueuesFromState(sourceQueueIds); const databaseDeletedQueueIds = await deleteDatabaseQueues(sourceQueueIds); persistState(false); @@ -4125,14 +4474,14 @@ async function mergeQueues(targetQueueIdValue: string | null, req: Request): Pro sourceQueueIds, deletedSourceQueueIds: deletedSourceQueues.map((queue) => queue.id), hotMovedTaskCount: hotMovedTasks.length, - databaseMovedTaskCount: databaseReady ? databaseMovedTaskIds.length : null, + databaseMovedTaskCount: databaseReady ? databaseMerge.movedTaskIds.length : null, databaseDeletedQueueIds: databaseReady ? databaseDeletedQueueIds : null, }); for (const id of mergeQueueIds) mergingQueues.delete(id); scheduleQueue(targetQueueId); await flushDirtyTasksToDatabase(true); const tasks = await loadAllTasksForRead(); - const movedIdSet = new Set(databaseReady ? databaseMovedTaskIds : hotMovedTasks.map((task) => task.id)); + const movedIdSet = new Set(databaseReady ? databaseMerge.movedTaskIds : hotMovedTasks.map((task) => task.id)); const orderedMovedTaskIds = tasks .filter((task) => movedIdSet.has(task.id)) .sort(compareTaskQueueOrder) @@ -4145,7 +4494,7 @@ async function mergeQueues(targetQueueIdValue: string | null, req: Request): Pro ok: true, targetQueueId, sourceQueueIds, - mergedTaskCount: databaseReady ? databaseMovedTaskIds.length : hotMovedTasks.length, + mergedTaskCount: databaseReady ? databaseMerge.movedTaskIds.length : hotMovedTasks.length, movedTaskIds: orderedMovedTaskIds.slice(0, 500), targetTaskOrder: targetTaskOrder.slice(0, 500), order: "merged tasks keep their original queueEnteredAt/createdAt ordering; source queue records are deleted after merge", @@ -4161,17 +4510,37 @@ async function mergeQueues(targetQueueIdValue: string | null, req: Request): Pro } } -async function moveTaskToQueue(task: QueueTask, req: Request): Promise { - if (!serviceRoleAllowsWrite(config.serviceRole)) return readOnlyRejectResponse(req.method, `/api/tasks/${task.id}/move`); - if (task.status === "running" || task.status === "judging") { - return jsonResponse({ ok: false, error: `cannot move active task ${task.id} while status=${task.status}`, task: taskForResponse(task) }, 409); - } +async function moveTaskToQueue(task: QueueTask, req: Request, options: { bypassRoleCheck?: boolean } = {}): Promise { + if (options.bypassRoleCheck !== true && !serviceRoleAllowsWrite(config.serviceRole)) return readOnlyRejectResponse(req.method, `/api/tasks/${task.id}/move`); const body = await readJson(req); const record = typeof body === "object" && body !== null && !Array.isArray(body) ? body as Record : {}; const queueId = normalizeQueueId(record.queueId ?? record.id); - const previousQueueId = queueIdOf(task); - const queue = ensureQueue(queueId); const movedAt = nowIso(); + const hotBlocker = taskMoveBlocker(task); + if (hotBlocker.length > 0) { + const databaseTask = databaseReady ? await loadTaskFromDatabase(task.id) : null; + if (databaseTask !== null) task = reconcileHotTaskFromDatabase(databaseTask); + return jsonResponse({ + ok: false, + error: `cannot move task ${task.id}: ${hotBlocker}`, + task: taskForResponse(task), + databaseTask: databaseTask === null ? null : taskForResponse(databaseTask), + }, 409); + } + const databaseMove = await moveDatabaseTaskToQueue(task.id, queueId, movedAt); + if (!databaseMove.ok) { + const databaseTask = databaseReady ? await loadTaskFromDatabase(task.id) : null; + if (databaseTask !== null) task = reconcileHotTaskFromDatabase(databaseTask); + return jsonResponse({ + ok: false, + error: `cannot move task ${task.id}: ${databaseMove.blocker}`, + blocker: databaseStatusRowJson(databaseMove.row), + task: taskForResponse(task), + databaseTask: databaseTask === null ? null : taskForResponse(databaseTask), + }, databaseMove.row === null ? 404 : 409); + } + const previousQueueId = databaseMove.previousQueueId ?? queueIdOf(task); + const queue = ensureQueue(queueId); queue.updatedAt = movedAt; markQueueDirty(queue.id); task.queueId = queueId; @@ -4301,6 +4670,7 @@ async function route(req: Request): Promise { if (url.pathname === "/api/judge/probe" && (req.method === "GET" || req.method === "POST")) return await runJudgeProbe(); if (url.pathname === "/api/judge/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(runJudgeInfraSelfTest()); if (url.pathname === "/api/queue-order/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(runQueueOrderingSelfTest()); + if (url.pathname === "/api/queue-claim-move/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(await runQueueClaimMoveSelfTest()); if (url.pathname === "/api/reference-injection/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(await runReferenceInjectionSelfTest()); if (url.pathname === "/api/trace-port/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(runTracePortSelfTest()); if (url.pathname === "/api/oa/backfill" && (req.method === "GET" || req.method === "POST")) return jsonResponse(await backfillOaTraceStats(url)); diff --git a/src/components/microservices/code-queue/src/queue-api.ts b/src/components/microservices/code-queue/src/queue-api.ts index 82a99661..442c6ed9 100644 --- a/src/components/microservices/code-queue/src/queue-api.ts +++ b/src/components/microservices/code-queue/src/queue-api.ts @@ -444,7 +444,8 @@ async function loadAllTasksForRead(): Promise { const tasks = await ctx().loadTasksFromDatabase("all"); const byId = new Map(tasks.map((task) => [task.id, task])); for (const active of ctx().tasks()) { - byId.set(active.id, active); + const databaseTask = byId.get(active.id); + if (databaseTask === undefined || preferHotTaskForRead(active, databaseTask)) byId.set(active.id, active); } ctx().runGarbageCollection(); return Array.from(byId.values()).sort((left, right) => (timestampMs(left.createdAt) ?? 0) - (timestampMs(right.createdAt) ?? 0) || left.id.localeCompare(right.id)); @@ -581,6 +582,30 @@ function activePriority(task: QueueTask): number { return statusRank[task.status] ?? 9; } +function taskHasClaimMarkers(task: QueueTask): boolean { + return task.status === "running" + || task.status === "judging" + || task.startedAt !== null + || task.currentAttempt > 0 + || task.codexThreadId !== null + || task.activeTurnId !== null; +} + +function taskIsUnclaimedQueued(task: QueueTask): boolean { + return (task.status === "queued" || task.status === "retry_wait") + && task.startedAt === null + && task.currentAttempt === 0 + && task.codexThreadId === null + && task.activeTurnId === null; +} + +function preferHotTaskForRead(hotTask: QueueTask, databaseTask: QueueTask): boolean { + const hotActiveRun = Array.from(ctx().activeRuns.values()).some((run) => run.taskId === hotTask.id); + if (hotActiveRun) return true; + if (taskIsUnclaimedQueued(hotTask) && taskHasClaimMarkers(databaseTask)) return false; + return taskUpdatedSortValue(hotTask) >= taskUpdatedSortValue(databaseTask); +} + function taskUpdatedSortValue(task: QueueTask): number { const time = Date.parse(task.updatedAt || task.createdAt); return Number.isFinite(time) ? time : 0; @@ -1051,7 +1076,8 @@ async function databaseTasksOverviewResponse(url: URL): Promise const byId = new Map(); for (const task of loadedTasks) byId.set(task.id, task); for (const task of ctx().tasks()) { - if (seenIds.has(task.id) || byId.has(task.id)) byId.set(task.id, task); + const databaseTask = byId.get(task.id); + if ((seenIds.has(task.id) || databaseTask !== undefined) && (databaseTask === undefined || preferHotTaskForRead(task, databaseTask))) byId.set(task.id, task); } const rowsSource = orderedIds .map((id) => byId.get(id) ?? null) @@ -1063,8 +1089,13 @@ async function databaseTasksOverviewResponse(url: URL): Promise let selectedTask: QueueTask | null = null; for (const id of selectedCandidates) { if (id.length === 0) continue; - const candidate = ctx().tasks().find((task) => task.id === id) - ?? await ctx().loadTaskFromDatabase(id); + const hotCandidate = ctx().tasks().find((task) => task.id === id) ?? null; + const databaseCandidate = await ctx().loadTaskFromDatabase(id); + const candidate = hotCandidate === null + ? databaseCandidate + : databaseCandidate === null || preferHotTaskForRead(hotCandidate, databaseCandidate) + ? hotCandidate + : databaseCandidate; if (candidate !== null && taskMatchesQueueFilter(candidate, queueId)) { selectedTask = candidate; break; diff --git a/src/components/microservices/code-queue/src/self-tests.ts b/src/components/microservices/code-queue/src/self-tests.ts index 76c9e44e..82d40cc9 100644 --- a/src/components/microservices/code-queue/src/self-tests.ts +++ b/src/components/microservices/code-queue/src/self-tests.ts @@ -17,6 +17,7 @@ export interface SelfTestsContext { defaultQueueId: string; enqueueActiveRunSlotWaiter: (task: QueueTask) => ActiveRunSlotWaiter; injectReferencedTaskContext: (request: QueueTaskRequest, finder?: (id: string) => QueueTask | null | Promise, injectedAt?: string) => Promise; + moveTaskToQueueForTest: (task: QueueTask, req: Request) => Promise; nextRunnableTaskFrom: (queueId: string, tasks?: QueueTask[]) => QueueTask | null; normalizeTask: (task: QueueTask) => QueueTask; nowIso: () => string; @@ -25,6 +26,8 @@ export interface SelfTestsContext { queuedStatusReason: (task: QueueTask, tasks?: QueueTask[]) => QueuedStatusReason | null; removeActiveRunSlotWaiter: (waiter: ActiveRunSlotWaiter) => void; resolveReasoningEffort: (model: string, explicit?: string | null) => string | null; + runDatabaseClaimMoveSelfTest?: () => Promise; + tasks: () => QueueTask[]; updateProcessingFlag: () => void; } @@ -192,6 +195,45 @@ function queueOrderTestTask(id: string, status: TaskStatus, createdAt: string, q return ctx().normalizeTask(task); } +async function runQueueClaimMoveSelfTest(): Promise { + const at = "2026-05-17T06:09:46.702Z"; + const task = queueOrderTestTask("codex_claim_move_self_test", "running", at, at); + task.queueId = "claim_move_source"; + task.startedAt = at; + task.currentAttempt = 1; + task.currentMode = "initial"; + task.codexThreadId = "thread_claim_move_self_test"; + task.activeTurnId = "turn_claim_move_self_test"; + task.updatedAt = at; + const before = ctx().tasks().slice(); + ctx().tasks().push(task); + try { + const response = await ctx().moveTaskToQueueForTest(task, new Request("http://code-queue.local/api/tasks/codex_claim_move_self_test/move", { + method: "POST", + body: JSON.stringify({ queueId: "claim_move_target" }), + headers: { "content-type": "application/json" }, + })); + const body = await response.json() as Record; + assertReferenceTest(response.status === 409, "moving a claimed/running task must return 409"); + assertReferenceTest(task.queueId === "claim_move_source", "running task queueId must remain unchanged after rejected move"); + assertReferenceTest(task.status === "running", "running task status must remain running after rejected move"); + assertReferenceTest(task.currentAttempt === 1, "running task currentAttempt must remain claimed after rejected move"); + const databaseRace = await ctx().runDatabaseClaimMoveSelfTest?.() ?? null; + return { + ok: true, + cases: [ + { name: "move_running_task_returns_409", ok: true, status: response.status }, + { name: "rejected_move_preserves_queue", ok: true, queueId: task.queueId }, + { name: "rejected_move_preserves_claim_markers", ok: true, status: task.status, currentAttempt: task.currentAttempt, startedAt: task.startedAt }, + ...(databaseRace === null ? [] : [{ name: "database_claim_blocks_stale_move", ok: true, result: databaseRace }]), + ], + response: body as JsonValue, + }; + } finally { + ctx().tasks().splice(0, ctx().tasks().length, ...before); + } +} + function runQueueOrderingSelfTest(): JsonValue { const activeRetry = queueOrderTestTask("codex_4000_active", "retry_wait", "2026-05-11T09:00:00.000Z", "2026-05-11T09:00:00.000Z"); const movedOlderCreated = queueOrderTestTask("codex_3999_moved", "queued", "2026-05-11T08:00:00.000Z", "2026-05-11T08:00:00.000Z") as QueueTask & { queueEnteredAt?: string }; @@ -476,4 +518,4 @@ function runJudgeInfraSelfTest(): JsonValue { }; } -export { runJudgeInfraSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest }; +export { runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest };