fix code queue read marker persistence
This commit is contained in:
@@ -1186,7 +1186,7 @@ function updateNextSeqFromTasks(): void {
|
||||
}
|
||||
|
||||
async function upsertTaskToDatabase(client: SqlExecutor, task: QueueTask): Promise<void> {
|
||||
await client`
|
||||
const rows = await client<Array<{ read_at: Date | string | null }>>`
|
||||
INSERT INTO unidesk_code_queue_tasks (
|
||||
id,
|
||||
queue_id,
|
||||
@@ -1269,15 +1269,32 @@ async function upsertTaskToDatabase(client: SqlExecutor, task: QueueTask): Promi
|
||||
updated_at = EXCLUDED.updated_at,
|
||||
started_at = EXCLUDED.started_at,
|
||||
finished_at = EXCLUDED.finished_at,
|
||||
read_at = EXCLUDED.read_at,
|
||||
read_at = CASE
|
||||
WHEN EXCLUDED.status IN ('queued', 'running', 'judging', 'retry_wait') THEN NULL
|
||||
WHEN unidesk_code_queue_tasks.read_at IS NOT NULL AND EXCLUDED.read_at IS NOT NULL THEN GREATEST(unidesk_code_queue_tasks.read_at, EXCLUDED.read_at)
|
||||
WHEN unidesk_code_queue_tasks.read_at IS NOT NULL THEN unidesk_code_queue_tasks.read_at
|
||||
ELSE EXCLUDED.read_at
|
||||
END,
|
||||
last_error = EXCLUDED.last_error,
|
||||
last_judge = EXCLUDED.last_judge,
|
||||
output_count = EXCLUDED.output_count,
|
||||
event_count = EXCLUDED.event_count,
|
||||
attempt_count = EXCLUDED.attempt_count,
|
||||
last_output_seq = EXCLUDED.last_output_seq,
|
||||
task_json = EXCLUDED.task_json
|
||||
task_json = jsonb_set(
|
||||
EXCLUDED.task_json,
|
||||
'{readAt}',
|
||||
CASE
|
||||
WHEN EXCLUDED.status IN ('queued', 'running', 'judging', 'retry_wait') THEN 'null'::jsonb
|
||||
WHEN unidesk_code_queue_tasks.read_at IS NOT NULL AND EXCLUDED.read_at IS NOT NULL THEN to_jsonb(GREATEST(unidesk_code_queue_tasks.read_at, EXCLUDED.read_at))
|
||||
WHEN unidesk_code_queue_tasks.read_at IS NOT NULL THEN to_jsonb(unidesk_code_queue_tasks.read_at)
|
||||
ELSE COALESCE(to_jsonb(EXCLUDED.read_at), 'null'::jsonb)
|
||||
END,
|
||||
true
|
||||
)
|
||||
RETURNING read_at
|
||||
`;
|
||||
task.readAt = timestampToIso(rows[0]?.read_at ?? null);
|
||||
}
|
||||
|
||||
async function upsertQueueToDatabase(client: SqlExecutor, queue: QueueRecord): Promise<void> {
|
||||
@@ -1325,6 +1342,8 @@ async function upsertWorkdirsToDatabase(records: WorkdirRecord[]): Promise<void>
|
||||
interface DatabaseTaskRow {
|
||||
id: string;
|
||||
updated_at: Date | string;
|
||||
status: TaskStatus;
|
||||
read_at: Date | string | null;
|
||||
task_json: unknown;
|
||||
}
|
||||
|
||||
@@ -1343,7 +1362,13 @@ function normalizeDatabaseTaskRows(rows: DatabaseTaskRow[], source: string): Que
|
||||
const tasks: QueueTask[] = [];
|
||||
for (const row of rows) {
|
||||
try {
|
||||
tasks.push(normalizeTask(row.task_json as QueueTask));
|
||||
const taskJson = row.task_json;
|
||||
if (typeof taskJson !== "object" || taskJson === null || Array.isArray(taskJson)) throw new Error("task_json is not an object");
|
||||
tasks.push(normalizeTask({
|
||||
...(taskJson as Record<string, unknown>),
|
||||
status: row.status,
|
||||
readAt: timestampToIso(row.read_at),
|
||||
} as unknown as QueueTask));
|
||||
} catch (error) {
|
||||
logger("warn", "database_task_row_ignored", { source, id: String(row.id), error: errorToJson(error) });
|
||||
}
|
||||
@@ -1353,11 +1378,13 @@ function normalizeDatabaseTaskRows(rows: DatabaseTaskRow[], source: string): Que
|
||||
|
||||
async function loadPrunedDatabaseTaskRows(where: "all" | "hot"): Promise<DatabaseTaskRow[]> {
|
||||
return await sql<DatabaseTaskRow[]>`
|
||||
SELECT id, updated_at, task_json
|
||||
SELECT id, updated_at, status, read_at, task_json
|
||||
FROM (
|
||||
SELECT
|
||||
id,
|
||||
updated_at,
|
||||
status,
|
||||
read_at,
|
||||
jsonb_set(
|
||||
jsonb_set(
|
||||
task_json,
|
||||
@@ -1407,8 +1434,7 @@ async function loadPrunedDatabaseTaskRows(where: "all" | "hot"): Promise<Databas
|
||||
END,
|
||||
true
|
||||
) AS task_json,
|
||||
created_at,
|
||||
status
|
||||
created_at
|
||||
FROM unidesk_code_queue_tasks
|
||||
) AS pruned_tasks
|
||||
WHERE ${where === "all"} OR status IN ('queued', 'running', 'judging', 'retry_wait')
|
||||
@@ -1449,11 +1475,13 @@ async function loadTasksFromDatabaseByIds(taskIds: string[]): Promise<QueueTask[
|
||||
const ids = Array.from(new Set(taskIds.map((id) => id.trim()).filter(Boolean)));
|
||||
if (ids.length === 0) return [];
|
||||
const rows = await sql<DatabaseTaskRow[]>`
|
||||
SELECT id, updated_at, task_json
|
||||
SELECT id, updated_at, status, read_at, task_json
|
||||
FROM (
|
||||
SELECT
|
||||
id,
|
||||
updated_at,
|
||||
status,
|
||||
read_at,
|
||||
task_json - 'output' - 'events' - 'attempts' - 'promptHistory' AS task_json
|
||||
FROM unidesk_code_queue_tasks
|
||||
WHERE id IN ${sql(ids)}
|
||||
@@ -1464,11 +1492,13 @@ async function loadTasksFromDatabaseByIds(taskIds: string[]): Promise<QueueTask[
|
||||
|
||||
async function loadTaskFromDatabase(taskId: string): Promise<QueueTask | null> {
|
||||
const rows = await sql<DatabaseTaskRow[]>`
|
||||
SELECT id, updated_at, task_json
|
||||
SELECT id, updated_at, status, read_at, task_json
|
||||
FROM (
|
||||
SELECT
|
||||
id,
|
||||
updated_at,
|
||||
status,
|
||||
read_at,
|
||||
jsonb_set(
|
||||
jsonb_set(
|
||||
task_json,
|
||||
|
||||
Reference in New Issue
Block a user