From 857b4bc2989889956ae90ea7c1891c001b7594be Mon Sep 17 00:00:00 2001 From: Codex Date: Sun, 17 May 2026 02:31:51 +0000 Subject: [PATCH] fix code queue read marker persistence --- .../microservices/code-queue/src/index.ts | 48 +++++++++++++++---- 1 file changed, 39 insertions(+), 9 deletions(-) diff --git a/src/components/microservices/code-queue/src/index.ts b/src/components/microservices/code-queue/src/index.ts index 297dca54..fe758aa6 100644 --- a/src/components/microservices/code-queue/src/index.ts +++ b/src/components/microservices/code-queue/src/index.ts @@ -1186,7 +1186,7 @@ function updateNextSeqFromTasks(): void { } async function upsertTaskToDatabase(client: SqlExecutor, task: QueueTask): Promise { - await client` + const rows = await client>` INSERT INTO unidesk_code_queue_tasks ( id, queue_id, @@ -1269,15 +1269,32 @@ async function upsertTaskToDatabase(client: SqlExecutor, task: QueueTask): Promi updated_at = EXCLUDED.updated_at, started_at = EXCLUDED.started_at, finished_at = EXCLUDED.finished_at, - read_at = EXCLUDED.read_at, + read_at = CASE + WHEN EXCLUDED.status IN ('queued', 'running', 'judging', 'retry_wait') THEN NULL + WHEN unidesk_code_queue_tasks.read_at IS NOT NULL AND EXCLUDED.read_at IS NOT NULL THEN GREATEST(unidesk_code_queue_tasks.read_at, EXCLUDED.read_at) + WHEN unidesk_code_queue_tasks.read_at IS NOT NULL THEN unidesk_code_queue_tasks.read_at + ELSE EXCLUDED.read_at + END, last_error = EXCLUDED.last_error, last_judge = EXCLUDED.last_judge, output_count = EXCLUDED.output_count, event_count = EXCLUDED.event_count, attempt_count = EXCLUDED.attempt_count, last_output_seq = EXCLUDED.last_output_seq, - task_json = EXCLUDED.task_json + task_json = jsonb_set( + EXCLUDED.task_json, + '{readAt}', + CASE + WHEN EXCLUDED.status IN ('queued', 'running', 'judging', 'retry_wait') THEN 'null'::jsonb + WHEN unidesk_code_queue_tasks.read_at IS NOT NULL AND EXCLUDED.read_at IS NOT NULL THEN to_jsonb(GREATEST(unidesk_code_queue_tasks.read_at, EXCLUDED.read_at)) + WHEN unidesk_code_queue_tasks.read_at IS NOT NULL THEN to_jsonb(unidesk_code_queue_tasks.read_at) + ELSE COALESCE(to_jsonb(EXCLUDED.read_at), 'null'::jsonb) + END, + true + ) + RETURNING read_at `; + task.readAt = timestampToIso(rows[0]?.read_at ?? null); } async function upsertQueueToDatabase(client: SqlExecutor, queue: QueueRecord): Promise { @@ -1325,6 +1342,8 @@ async function upsertWorkdirsToDatabase(records: WorkdirRecord[]): Promise interface DatabaseTaskRow { id: string; updated_at: Date | string; + status: TaskStatus; + read_at: Date | string | null; task_json: unknown; } @@ -1343,7 +1362,13 @@ function normalizeDatabaseTaskRows(rows: DatabaseTaskRow[], source: string): Que const tasks: QueueTask[] = []; for (const row of rows) { try { - tasks.push(normalizeTask(row.task_json as QueueTask)); + const taskJson = row.task_json; + if (typeof taskJson !== "object" || taskJson === null || Array.isArray(taskJson)) throw new Error("task_json is not an object"); + tasks.push(normalizeTask({ + ...(taskJson as Record), + status: row.status, + readAt: timestampToIso(row.read_at), + } as unknown as QueueTask)); } catch (error) { logger("warn", "database_task_row_ignored", { source, id: String(row.id), error: errorToJson(error) }); } @@ -1353,11 +1378,13 @@ function normalizeDatabaseTaskRows(rows: DatabaseTaskRow[], source: string): Que async function loadPrunedDatabaseTaskRows(where: "all" | "hot"): Promise { return await sql` - SELECT id, updated_at, task_json + SELECT id, updated_at, status, read_at, task_json FROM ( SELECT id, updated_at, + status, + read_at, jsonb_set( jsonb_set( task_json, @@ -1407,8 +1434,7 @@ async function loadPrunedDatabaseTaskRows(where: "all" | "hot"): Promise id.trim()).filter(Boolean))); if (ids.length === 0) return []; const rows = await sql` - SELECT id, updated_at, task_json + SELECT id, updated_at, status, read_at, task_json FROM ( SELECT id, updated_at, + status, + read_at, task_json - 'output' - 'events' - 'attempts' - 'promptHistory' AS task_json FROM unidesk_code_queue_tasks WHERE id IN ${sql(ids)} @@ -1464,11 +1492,13 @@ async function loadTasksFromDatabaseByIds(taskIds: string[]): Promise { const rows = await sql` - SELECT id, updated_at, task_json + SELECT id, updated_at, status, read_at, task_json FROM ( SELECT id, updated_at, + status, + read_at, jsonb_set( jsonb_set( task_json,