diff --git a/docs/reference/microservices.md b/docs/reference/microservices.md index 0fb63560..562a6655 100644 --- a/docs/reference/microservices.md +++ b/docs/reference/microservices.md @@ -157,6 +157,7 @@ Baidu Netdisk 在 UniDesk 语境中按纯后端服务管理:不得暴露百度 - Orchestrator:稳定 `code-queue` ID 的控制/读取路径由 backend-core 分流到 `deployment.mode=internal-sidecar` 的 `code-queue-mgr`;D601 执行面仍登记为 `deployment.mode=k3sctl-managed`,`deployment.adapterServiceId=k3sctl-adapter`,`deployment.k3sServiceId=code-queue`,`backend.proxyMode=k3sctl-adapter-http`,`backend.nodeBaseUrl=k3s://code-queue`。对外登记的 `code-queue` ID 保持稳定,frontend/CLI 不需要知道内部拆分。 - Direct path ban:`code-queue` 不得再登记 `http://code-queue:4222`、`http://host.docker.internal:4222`、NodePort 或 provider-gateway `microservice.http` 作为业务代理目标;frontend 也不得使用旧 `/api/code-queue-direct` 兼容别名作为 Code Queue 页面数据源。provider-gateway 只允许用于维护 D601/D518、部署 adapter、部署 k3s/k8s 节点或诊断节点本机容器。 +- Claim/move consistency:master `code-queue-mgr` 和 D601 scheduler 都必须以 PostgreSQL 状态为权威;move 只允许未 claim 的 `queued`/`retry_wait` 行,merge 在 source/target queue 存在 `running`/`judging` 或 claim marker 时整体返回 409。稳定 `code-queue` 代理可用 `/api/queue-claim-move/self-test` 验证 claimed task move 会被拒绝且数据库仍保持 running/source queue 状态。 - D601 Service boundary:D601 内部可以继续保留 `code-queue-read`、`code-queue-write` 和 `code-queue-scheduler` 三个 Kubernetes Service 作为执行面兼容和过渡对象,但普通提交、queue CRUD、history、readAt 和轻量 overview 不得依赖 `code-queue-write` 或 D601 egress 可用;`code-queue-write` 不 ready 时,主 server `code-queue-mgr` 仍应保证 CLI/WebUI 的提交、列表和历史读取可用。需要 active run、dev-container、judge 或执行面健康的路径才进入 D601 scheduler。 - 服务拆分语义:`code-queue-read` 只承载 GET/HEAD 查询、overview、任务详情、Trace/output/transcript、统计和只读健康,可多副本滚动更新;它必须设置 `CODE_QUEUE_SERVICE_ROLE=read` 与 `CODE_QUEUE_SCHEDULER_ENABLED=false`,且不得接受入队、queue 变更、已读、重试、移动、追加 prompt 或打断这类 mutation。`code-queue-write` 承载入队、queue 创建/合并/更新、已读、手动重试、移动等命令写入,初期保持单副本和 `CODE_QUEUE_SERVICE_ROLE=write`,只把命令和任务状态写入 PostgreSQL,不启动 agent 子进程。`code-queue-scheduler` 是唯一拥有 scheduler 和 active run 的执行服务,设置 `CODE_QUEUE_SERVICE_ROLE=scheduler` 与 `CODE_QUEUE_SCHEDULER_ENABLED=true`,负责从 PostgreSQL 热任务集轮询新写入任务、推进队列、启动 Codex/OpenCode、处理 running task 的 steer/interrupt、发送终态通知和暴露执行端 `/health`。普通 Service 负载均衡不得把 mutation 打到 read,也不得把 running task 控制打到 write。 - 实例语义:D601 是当前唯一 active 执行节点,`code-queue-scheduler` 以一个 scheduler Pod 承载长生命周期 Codex/OpenCode 子进程并轮询主 PostgreSQL 中由 `code-queue-mgr` 写入的 queued/retry_wait 任务。D518 不属于当前 Code Queue k3s 拓扑;在没有原生 k3s-agent 与稳定 Kubernetes 网络前,不得把 D518 写回 `expectedNodeIds` 或恢复 `code-queue-d518` standby。D601 scheduler 默认关闭 `CODE_QUEUE_STARTUP_OA_BACKFILL_ENABLED`;历史 OA Trace/STEP 回填必须通过显式 `/api/oa/backfill` 运维动作触发,不能在每次 Pod 重启时自动批量发布旧事件。 diff --git a/src/components/microservices/code-queue-mgr/src/index.ts b/src/components/microservices/code-queue-mgr/src/index.ts index 659d9c56..2ccc625c 100644 --- a/src/components/microservices/code-queue-mgr/src/index.ts +++ b/src/components/microservices/code-queue-mgr/src/index.ts @@ -205,6 +205,21 @@ interface TaskRow { task_json: unknown; } +type TaskStatusRow = Pick; +type TaskJsonRow = Pick & Partial>; + interface QueueRow { id: string; name: string; @@ -988,8 +1003,58 @@ function queueIdOf(task: QueueTask): string { return safeQueueId(task.queueId); } -function rowToTask(row: Pick): QueueTask { - return normalizeTask(row.task_json); +function rowToTask(row: TaskJsonRow): QueueTask { + const task = normalizeTask(row.task_json); + if (typeof row.id === "string" && row.id.length > 0) task.id = row.id; + if (typeof row.queue_id === "string" && row.queue_id.length > 0) task.queueId = safeQueueId(row.queue_id); + if (typeof row.status === "string" && row.status.length > 0) task.status = row.status as TaskStatus; + if (row.current_attempt !== undefined) task.currentAttempt = Number(row.current_attempt ?? 0); + if (row.current_mode !== undefined) task.currentMode = row.current_mode ?? null; + if (row.codex_thread_id !== undefined) task.codexThreadId = row.codex_thread_id ?? null; + if (row.active_turn_id !== undefined) task.activeTurnId = row.active_turn_id ?? null; + if (row.updated_at !== undefined) task.updatedAt = timestampToIso(row.updated_at) ?? task.updatedAt; + if (row.started_at !== undefined) task.startedAt = timestampToIso(row.started_at); + if (row.finished_at !== undefined) task.finishedAt = timestampToIso(row.finished_at); + if (row.read_at !== undefined) task.readAt = timestampToIso(row.read_at); + return task; +} + +function taskStatusRowJson(row: TaskStatusRow | null): JsonValue { + if (row === null) return null; + return { + id: row.id, + queueId: safeQueueId(row.queue_id), + status: row.status, + startedAt: timestampToIso(row.started_at), + currentAttempt: Number(row.current_attempt ?? 0), + codexThreadId: row.codex_thread_id, + activeTurnId: row.active_turn_id, + }; +} + +function taskIsUnclaimedMovable(task: QueueTask): boolean { + return (task.status === "queued" || task.status === "retry_wait") + && task.startedAt === null + && task.currentAttempt === 0 + && task.codexThreadId === null + && task.activeTurnId === null; +} + +function taskStatusMoveBlocker(row: TaskStatusRow | null): string { + if (row === null) return "task not found"; + if (row.status !== "queued" && row.status !== "retry_wait") return `status=${row.status}`; + if (row.started_at !== null) return "task already has started_at"; + const currentAttempt = Number(row.current_attempt ?? 0); + if (currentAttempt !== 0) return `task already has current_attempt=${currentAttempt}`; + if (row.codex_thread_id !== null) return "task already has codex_thread_id"; + if (row.active_turn_id !== null) return "task already has active_turn_id"; + return ""; +} + +function queueMergeBlocker(row: TaskStatusRow): string { + if (row.status === "running" || row.status === "judging") return `task ${row.id} is ${row.status}`; + if ((row.status === "queued" || row.status === "retry_wait") && taskStatusMoveBlocker(row).length > 0) return `task ${row.id} has already been claimed`; + return ""; } function queueRowToRecord(row: QueueRow): QueueRecord { @@ -1086,6 +1151,7 @@ async function upsertQueue(queue: QueueRecord): Promise { } async function upsertTask(client: SqlExecutor, task: QueueTask): Promise { + const taskHasClaim = hasClaimMarkers(task); await client` INSERT INTO unidesk_code_queue_tasks ( id, queue_id, status, provider_id, execution_mode, model, cwd, prompt, base_prompt, @@ -1132,6 +1198,26 @@ async function upsertTask(client: SqlExecutor, task: QueueTask): Promise { last_output_seq = EXCLUDED.last_output_seq, task_json = EXCLUDED.task_json WHERE unidesk_code_queue_tasks.status NOT IN ('running', 'judging') + AND ( + ${taskHasClaim} + OR unidesk_code_queue_tasks.started_at IS NULL + OR unidesk_code_queue_tasks.started_at = EXCLUDED.started_at + ) + AND ( + ${taskHasClaim} + OR unidesk_code_queue_tasks.current_attempt = 0 + OR unidesk_code_queue_tasks.current_attempt = EXCLUDED.current_attempt + ) + AND ( + ${taskHasClaim} + OR unidesk_code_queue_tasks.codex_thread_id IS NULL + OR unidesk_code_queue_tasks.codex_thread_id = EXCLUDED.codex_thread_id + ) + AND ( + ${taskHasClaim} + OR unidesk_code_queue_tasks.active_turn_id IS NULL + OR unidesk_code_queue_tasks.active_turn_id = EXCLUDED.active_turn_id + ) `; } @@ -1150,8 +1236,20 @@ async function loadQueues(): Promise { } async function loadTask(taskId: string): Promise { - const rows = await traceSql>>` - SELECT task_json + const rows = await traceSql` + SELECT + id, + queue_id, + status, + current_attempt, + current_mode, + codex_thread_id, + active_turn_id, + updated_at, + started_at, + finished_at, + read_at, + task_json FROM unidesk_code_queue_tasks WHERE id = ${taskId} LIMIT 1 @@ -1166,8 +1264,21 @@ async function loadTasksForList(url: URL): Promise<{ tasks: QueueTask[]; total: const queueIdRaw = url.searchParams.get("queueId"); const queueId = queueIdRaw === null || queueIdRaw.length === 0 ? null : safeQueueId(queueIdRaw); const search = String(url.searchParams.get("q") ?? url.searchParams.get("search") ?? "").trim(); - const rows = await traceSql & { total_count: string | number }>>` - SELECT task_json, COUNT(*) OVER() AS total_count + const rows = await traceSql>` + SELECT + id, + queue_id, + status, + current_attempt, + current_mode, + codex_thread_id, + active_turn_id, + updated_at, + started_at, + finished_at, + read_at, + task_json, + COUNT(*) OVER() AS total_count FROM unidesk_code_queue_tasks WHERE (${status === null} OR status = ${status}) AND (${queueId === null} OR queue_id = ${queueId}) @@ -1185,8 +1296,20 @@ async function loadTasksForList(url: URL): Promise<{ tasks: QueueTask[]; total: async function loadTasksByIds(ids: string[]): Promise { const unique = Array.from(new Set(ids.map((id) => id.trim()).filter(Boolean))); if (unique.length === 0) return []; - const rows = await traceSql>>` - SELECT task_json + const rows = await traceSql` + SELECT + id, + queue_id, + status, + current_attempt, + current_mode, + codex_thread_id, + active_turn_id, + updated_at, + started_at, + finished_at, + read_at, + task_json FROM unidesk_code_queue_tasks WHERE id IN ${traceSql(unique)} `; @@ -1307,8 +1430,20 @@ async function queueSummary(tasks?: QueueTask[], queueRecords?: QueueRecord[]): } async function loadAllTasksLite(): Promise { - const rows = await traceSql>>` - SELECT task_json + const rows = await traceSql` + SELECT + id, + queue_id, + status, + current_attempt, + current_mode, + codex_thread_id, + active_turn_id, + updated_at, + started_at, + finished_at, + read_at, + task_json - 'output' - 'events' AS task_json FROM unidesk_code_queue_tasks ORDER BY created_at ASC, id ASC LIMIT 2000 @@ -1838,24 +1973,54 @@ async function mergeQueues(targetQueueIdValue: string | null, req: Request): Pro .filter((id) => id !== targetQueueId); if (sourceQueueIds.length === 0) return jsonResponse({ ok: false, error: "sourceQueueId is required" }, 400); const updatedAt = nowIso(); - await mgrSql.begin(async (client) => { + const result = await mgrSql.begin(async (client): Promise<{ movedTaskIds: string[]; blocker: TaskStatusRow | null }> => { + const mergeQueueIds = Array.from(new Set([targetQueueId, ...sourceQueueIds])); + const lockedRows = await client` + SELECT id, queue_id, status, started_at, current_attempt, codex_thread_id, active_turn_id + FROM unidesk_code_queue_tasks + WHERE queue_id IN ${client(mergeQueueIds)} + ORDER BY updated_at DESC, id DESC + FOR UPDATE + `; + const blocker = lockedRows.find((row) => queueMergeBlocker(row).length > 0) ?? null; + if (blocker !== null) return { movedTaskIds: [], blocker }; await client` INSERT INTO unidesk_code_queue_queues (id, name, created_at, updated_at) VALUES (${targetQueueId}, ${targetQueueId}, ${updatedAt}, ${updatedAt}) ON CONFLICT (id) DO UPDATE SET updated_at = EXCLUDED.updated_at `; - await client` + const movedRows = await client>` UPDATE unidesk_code_queue_tasks SET queue_id = ${targetQueueId}, updated_at = ${updatedAt}, task_json = jsonb_set(jsonb_set(task_json, '{queueId}', to_jsonb(${targetQueueId}::text), true), '{updatedAt}', to_jsonb(${updatedAt}::text), true) WHERE queue_id IN ${client(sourceQueueIds)} - AND status NOT IN ('running', 'judging') + AND ( + status IN ('succeeded', 'failed', 'canceled') + OR ( + status IN ('queued', 'retry_wait') + AND started_at IS NULL + AND current_attempt = 0 + AND codex_thread_id IS NULL + AND active_turn_id IS NULL + ) + ) + RETURNING id `; await client`DELETE FROM unidesk_code_queue_queues WHERE id IN ${client(sourceQueueIds)}`; + return { movedTaskIds: movedRows.map((row) => row.id), blocker: null }; }); - return jsonResponse({ ok: true, targetQueueId, sourceQueueIds, queue: await queueSummary() }, 202); + if (result.blocker !== null) { + const blocker = result.blocker; + const queueId = safeQueueId(blocker.queue_id); + return jsonResponse({ + ok: false, + error: `cannot merge queue ${queueId}: ${queueMergeBlocker(blocker)}`, + blocker: taskStatusRowJson(blocker), + }, 409); + } + return jsonResponse({ ok: true, targetQueueId, sourceQueueIds, movedTaskIds: result.movedTaskIds, queue: await queueSummary() }, 202); } async function markTaskRead(taskId: string): Promise { @@ -1900,32 +2065,155 @@ async function moveTask(taskId: string, req: Request): Promise { const body = asRecord(await readJson(req)) ?? {}; const queueId = normalizeQueueId(body.queueId ?? body.id); const movedAt = nowIso(); - const task = await mgrSql.begin(async (client): Promise => { + const result = await mgrSql.begin(async (client): Promise<{ task: QueueTask | null; blocker: string; row: TaskStatusRow | null }> => { + const rows = await client>` + SELECT + id, + queue_id, + status, + current_attempt, + current_mode, + codex_thread_id, + active_turn_id, + updated_at, + started_at, + finished_at, + read_at, + task_json + FROM unidesk_code_queue_tasks + WHERE id = ${taskId} + FOR UPDATE + `; + const row = rows[0] ?? null; + const blocker = taskStatusMoveBlocker(row); + if (row === null || blocker.length > 0) return { task: row === null ? null : rowToTask(row), blocker, row }; await client` INSERT INTO unidesk_code_queue_queues (id, name, created_at, updated_at) VALUES (${queueId}, ${queueId}, ${movedAt}, ${movedAt}) ON CONFLICT (id) DO UPDATE SET updated_at = EXCLUDED.updated_at `; - const rows = await client>>` - SELECT task_json - FROM unidesk_code_queue_tasks - WHERE id = ${taskId} - FOR UPDATE - `; - if (rows[0] === undefined) return null; - const nextTask = rowToTask(rows[0]); - if (nextTask.status === "running" || nextTask.status === "judging") return nextTask; + const previousQueueId = safeQueueId(row.queue_id); + const nextTask = rowToTask(row); nextTask.queueId = queueId; nextTask.queueEnteredAt = movedAt; nextTask.updatedAt = movedAt; - nextTask.output.push({ seq: outputMaxSeq(nextTask) + 1, at: movedAt, channel: "system", text: `moved to queue=${queueId}\n`, method: "queue/move" }); + nextTask.output.push({ seq: outputMaxSeq(nextTask) + 1, at: movedAt, channel: "system", text: `moved from queue=${previousQueueId} to queue=${queueId}\n`, method: "queue/move" }); nextTask.outputMaxSeq = outputMaxSeq(nextTask); - await upsertTask(client, nextTask); - return nextTask; + const updated = await client` + UPDATE unidesk_code_queue_tasks + SET + queue_id = ${queueId}, + updated_at = ${movedAt}, + output_count = ${nextTask.output.length}, + event_count = ${nextTask.events.length}, + attempt_count = ${nextTask.attempts.length}, + last_output_seq = ${outputMaxSeq(nextTask)}, + task_json = ${client.json(taskJson(nextTask) as unknown as postgres.JSONValue)} + WHERE id = ${taskId} + AND status IN ('queued', 'retry_wait') + AND started_at IS NULL + AND current_attempt = 0 + AND codex_thread_id IS NULL + AND active_turn_id IS NULL + RETURNING id, queue_id, status, started_at, current_attempt, codex_thread_id, active_turn_id + `; + if (updated[0] === undefined) return { task: rowToTask(row), blocker: "conditional update matched no rows", row }; + return { task: nextTask, blocker: "", row: updated[0] }; }); - if (task === null) return jsonResponse({ ok: false, error: "task not found" }, 404); - if (task.status === "running" || task.status === "judging") return jsonResponse({ ok: false, error: `cannot move active task while status=${task.status}`, task: taskListResponse(task) }, 409); - return jsonResponse({ ok: true, task: taskListResponse(task, false), queue: await queueSummary() }, 202); + if (result.task === null) return jsonResponse({ ok: false, error: "task not found" }, 404); + if (result.blocker.length > 0) { + return jsonResponse({ + ok: false, + error: `cannot move task ${taskId}: ${result.blocker}`, + blocker: taskStatusRowJson(result.row), + task: taskListResponse(result.task), + }, 409); + } + return jsonResponse({ ok: true, task: taskListResponse(result.task, false), blocker: taskStatusRowJson(result.row), queue: await queueSummary() }, 202); +} + +async function runQueueClaimMoveSelfTest(): Promise { + const suffix = String(Date.now()); + const taskId = `codex_mgr_claim_move_${suffix}`; + const sourceQueueId = `mgr_claim_move_source_${suffix}`; + const targetQueueId = `mgr_claim_move_target_${suffix}`; + const queuedAt = nowIso(); + await mgrSql`DELETE FROM unidesk_code_queue_tasks WHERE id = ${taskId}`; + try { + const queuedTask = normalizeTask(await createTaskFromRequest({ + prompt: "Code Queue manager claim/move race self-test", + queueId: sourceQueueId, + maxAttempts: 1, + })); + queuedTask.id = taskId; + queuedTask.queueId = sourceQueueId; + queuedTask.queueEnteredAt = queuedAt; + queuedTask.createdAt = queuedAt; + queuedTask.updatedAt = queuedAt; + await mgrSql.begin(async (client) => { + await client` + INSERT INTO unidesk_code_queue_queues (id, name, created_at, updated_at) + VALUES (${sourceQueueId}, ${sourceQueueId}, ${queuedAt}, ${queuedAt}) + ON CONFLICT (id) DO UPDATE SET updated_at = EXCLUDED.updated_at + `; + await upsertTask(client, queuedTask); + }); + + const claimedAt = nowIso(); + const claimedTask = normalizeTask(JSON.parse(JSON.stringify(queuedTask)) as QueueTask); + claimedTask.status = "running"; + claimedTask.startedAt = claimedAt; + claimedTask.currentAttempt = 1; + claimedTask.currentMode = "initial"; + claimedTask.codexThreadId = `thread_${suffix}`; + claimedTask.activeTurnId = `turn_${suffix}`; + claimedTask.updatedAt = claimedAt; + await mgrSql.begin(async (client) => { + await upsertTask(client, claimedTask); + }); + + const moveResponse = await moveTask(taskId, new Request(`http://code-queue-mgr.local/api/tasks/${taskId}/move`, { + method: "POST", + body: JSON.stringify({ queueId: targetQueueId }), + headers: { "content-type": "application/json" }, + })); + const moveBody = await moveResponse.json() as JsonRecord; + const rows = await mgrSql>` + SELECT id, queue_id, status, current_attempt, current_mode, codex_thread_id, active_turn_id, updated_at, started_at, finished_at, read_at, task_json + FROM unidesk_code_queue_tasks + WHERE id = ${taskId} + LIMIT 1 + `; + const after = rows[0] === undefined ? null : rowToTask(rows[0]); + const ok = moveResponse.status === 409 + && after !== null + && after.status === "running" + && queueIdOf(after) === sourceQueueId + && after.currentAttempt === 1 + && after.startedAt !== null + && after.codexThreadId !== null + && after.activeTurnId !== null; + if (!ok) { + throw new Error(`claim/move self-test failed: moveStatus=${moveResponse.status}, after=${JSON.stringify(after === null ? null : taskListResponse(after))}`); + } + return { + ok: true, + taskId, + sourceQueueId, + targetQueueId, + moveStatus: moveResponse.status, + databaseStatus: after.status, + databaseQueueId: queueIdOf(after), + currentAttempt: after.currentAttempt, + startedAt: after.startedAt, + codexThreadId: after.codexThreadId, + activeTurnId: after.activeTurnId, + moveResponse: moveBody, + }; + } finally { + await mgrSql`DELETE FROM unidesk_code_queue_tasks WHERE id = ${taskId}`; + await mgrSql`DELETE FROM unidesk_code_queue_queues WHERE id IN ${mgrSql([sourceQueueId, targetQueueId])}`; + } } async function retryTask(taskId: string, req: Request): Promise { @@ -1933,8 +2221,20 @@ async function retryTask(taskId: string, req: Request): Promise { const explicitPrompt = typeof body.prompt === "string" ? body.prompt.trim() : typeof body.continuePrompt === "string" ? body.continuePrompt.trim() : ""; const queuedAt = nowIso(); const result = await mgrSql.begin(async (client): Promise<{ task: QueueTask | null; changed: boolean }> => { - const rows = await client>>` - SELECT task_json + const rows = await client>` + SELECT + id, + queue_id, + status, + current_attempt, + current_mode, + codex_thread_id, + active_turn_id, + updated_at, + started_at, + finished_at, + read_at, + task_json FROM unidesk_code_queue_tasks WHERE id = ${taskId} FOR UPDATE @@ -1972,8 +2272,20 @@ async function editTask(taskId: string, req: Request): Promise { const body = asRecord(await readJson(req)) ?? {}; const editedAt = nowIso(); const result = await mgrSql.begin(async (client): Promise<{ task: QueueTask | null; changed: boolean }> => { - const rows = await client>>` - SELECT task_json + const rows = await client>` + SELECT + id, + queue_id, + status, + current_attempt, + current_mode, + codex_thread_id, + active_turn_id, + updated_at, + started_at, + finished_at, + read_at, + task_json FROM unidesk_code_queue_tasks WHERE id = ${taskId} FOR UPDATE @@ -2092,12 +2404,16 @@ async function route(req: Request): Promise { noDockerSocket: true, }, endpoints: { - control: ["/api/queues", "/api/tasks", "/api/tasks/:id/(move|retry|read|edit)"], + control: ["/api/queues", "/api/tasks", "/api/tasks/:id/(move|retry|read|edit)", "/api/queue-claim-move/self-test"], traceRead: ["/api/tasks/overview", "/api/tasks/:id/summary", "/api/tasks/:id/trace-summary", "/api/tasks/:id/trace-steps", "/api/tasks/:id/output"], }, }, schemaReady ? 200 : 503); } if (url.pathname === "/logs" && req.method === "GET") return jsonResponse({ ok: true, logs: recentLogs.slice(-100) }); + if (url.pathname === "/api/queue-claim-move/self-test" && (req.method === "POST" || req.method === "GET")) { + if (!schemaReady) return jsonResponse({ ok: false, error: "code-queue-mgr database schema is not ready", schemaLastError }, 503); + return jsonResponse(await runQueueClaimMoveSelfTest(), 200); + } const activeControl = routeActiveControl(url.pathname, req.method); if (activeControl !== null) return jsonResponse(activeControl.body, activeControl.status); if (url.pathname === "/api/queues" && req.method === "GET") { diff --git a/src/components/microservices/code-queue/src/index.ts b/src/components/microservices/code-queue/src/index.ts index 61358d21..f6159ff1 100644 --- a/src/components/microservices/code-queue/src/index.ts +++ b/src/components/microservices/code-queue/src/index.ts @@ -1411,9 +1411,16 @@ async function upsertWorkdirsToDatabase(records: WorkdirRecord[]): Promise interface DatabaseTaskRow { id: string; + queue_id: string; updated_at: Date | string; status: TaskStatus; read_at: Date | string | null; + current_attempt: number | string | null; + current_mode: RunMode | null; + codex_thread_id: string | null; + active_turn_id: string | null; + started_at: Date | string | null; + finished_at: Date | string | null; task_json: unknown; } @@ -1446,7 +1453,16 @@ function normalizeDatabaseTaskRows(rows: DatabaseTaskRow[], source: string): Que if (typeof taskJson !== "object" || taskJson === null || Array.isArray(taskJson)) throw new Error("task_json is not an object"); tasks.push(normalizeTask({ ...(taskJson as Record), + id: row.id, + queueId: safeQueueId(row.queue_id), status: row.status, + currentAttempt: Number(row.current_attempt ?? 0), + currentMode: row.current_mode, + codexThreadId: row.codex_thread_id, + activeTurnId: row.active_turn_id, + updatedAt: timestampToIso(row.updated_at) ?? nowIso(), + startedAt: timestampToIso(row.started_at), + finishedAt: timestampToIso(row.finished_at), readAt: timestampToIso(row.read_at), } as unknown as QueueTask)); } catch (error) { @@ -1622,20 +1638,39 @@ async function runDatabaseClaimMoveSelfTest(): Promise { async function loadPrunedDatabaseTaskRows(where: "all" | "hot"): Promise { if (where === "hot") { return await sql` - SELECT id, updated_at, status, read_at, task_json - 'output' - 'events' AS task_json + SELECT + id, + queue_id, + updated_at, + status, + read_at, + current_attempt, + current_mode, + codex_thread_id, + active_turn_id, + started_at, + finished_at, + task_json - 'output' - 'events' AS task_json FROM unidesk_code_queue_tasks WHERE status IN ('queued', 'running', 'judging', 'retry_wait') ORDER BY created_at ASC, id ASC `; } return await sql` - SELECT id, updated_at, status, read_at, task_json + SELECT id, queue_id, updated_at, status, read_at, current_attempt, current_mode, codex_thread_id, active_turn_id, started_at, finished_at, task_json FROM ( SELECT id, + queue_id, updated_at, status, read_at, + current_attempt, + current_mode, + codex_thread_id, + active_turn_id, + started_at, + finished_at, jsonb_set( jsonb_set( task_json, @@ -1726,13 +1761,20 @@ async function loadTasksFromDatabaseByIds(taskIds: string[]): Promise id.trim()).filter(Boolean))); if (ids.length === 0) return []; const rows = await sql` - SELECT id, updated_at, status, read_at, task_json + SELECT id, queue_id, updated_at, status, read_at, current_attempt, current_mode, codex_thread_id, active_turn_id, started_at, finished_at, task_json FROM ( SELECT id, + queue_id, updated_at, status, read_at, + current_attempt, + current_mode, + codex_thread_id, + active_turn_id, + started_at, + finished_at, task_json - 'output' - 'events' - 'attempts' - 'promptHistory' AS task_json FROM unidesk_code_queue_tasks WHERE id IN ${sql(ids)} @@ -1743,13 +1785,20 @@ async function loadTasksFromDatabaseByIds(taskIds: string[]): Promise { const rows = await sql` - SELECT id, updated_at, status, read_at, task_json + SELECT id, queue_id, updated_at, status, read_at, current_attempt, current_mode, codex_thread_id, active_turn_id, started_at, finished_at, task_json FROM ( SELECT id, + queue_id, updated_at, status, read_at, + current_attempt, + current_mode, + codex_thread_id, + active_turn_id, + started_at, + finished_at, jsonb_set( jsonb_set( task_json,