fix: guard code queue manager moves

This commit is contained in:
Codex
2026-05-17 11:48:07 +00:00
parent a24f8b9968
commit 00e5766794
3 changed files with 405 additions and 39 deletions
+1
View File
@@ -157,6 +157,7 @@ Baidu Netdisk 在 UniDesk 语境中按纯后端服务管理:不得暴露百度
- Orchestrator:稳定 `code-queue` ID 的控制/读取路径由 backend-core 分流到 `deployment.mode=internal-sidecar``code-queue-mgr`D601 执行面仍登记为 `deployment.mode=k3sctl-managed``deployment.adapterServiceId=k3sctl-adapter``deployment.k3sServiceId=code-queue``backend.proxyMode=k3sctl-adapter-http``backend.nodeBaseUrl=k3s://code-queue`。对外登记的 `code-queue` ID 保持稳定,frontend/CLI 不需要知道内部拆分。
- Direct path ban`code-queue` 不得再登记 `http://code-queue:4222``http://host.docker.internal:4222`、NodePort 或 provider-gateway `microservice.http` 作为业务代理目标;frontend 也不得使用旧 `/api/code-queue-direct` 兼容别名作为 Code Queue 页面数据源。provider-gateway 只允许用于维护 D601/D518、部署 adapter、部署 k3s/k8s 节点或诊断节点本机容器。
- Claim/move consistencymaster `code-queue-mgr` 和 D601 scheduler 都必须以 PostgreSQL 状态为权威;move 只允许未 claim 的 `queued`/`retry_wait` 行,merge 在 source/target queue 存在 `running`/`judging` 或 claim marker 时整体返回 409。稳定 `code-queue` 代理可用 `/api/queue-claim-move/self-test` 验证 claimed task move 会被拒绝且数据库仍保持 running/source queue 状态。
- D601 Service boundaryD601 内部可以继续保留 `code-queue-read``code-queue-write``code-queue-scheduler` 三个 Kubernetes Service 作为执行面兼容和过渡对象,但普通提交、queue CRUD、history、readAt 和轻量 overview 不得依赖 `code-queue-write` 或 D601 egress 可用;`code-queue-write` 不 ready 时,主 server `code-queue-mgr` 仍应保证 CLI/WebUI 的提交、列表和历史读取可用。需要 active run、dev-container、judge 或执行面健康的路径才进入 D601 scheduler。
- 服务拆分语义:`code-queue-read` 只承载 GET/HEAD 查询、overview、任务详情、Trace/output/transcript、统计和只读健康,可多副本滚动更新;它必须设置 `CODE_QUEUE_SERVICE_ROLE=read``CODE_QUEUE_SCHEDULER_ENABLED=false`,且不得接受入队、queue 变更、已读、重试、移动、追加 prompt 或打断这类 mutation。`code-queue-write` 承载入队、queue 创建/合并/更新、已读、手动重试、移动等命令写入,初期保持单副本和 `CODE_QUEUE_SERVICE_ROLE=write`,只把命令和任务状态写入 PostgreSQL,不启动 agent 子进程。`code-queue-scheduler` 是唯一拥有 scheduler 和 active run 的执行服务,设置 `CODE_QUEUE_SERVICE_ROLE=scheduler``CODE_QUEUE_SCHEDULER_ENABLED=true`,负责从 PostgreSQL 热任务集轮询新写入任务、推进队列、启动 Codex/OpenCode、处理 running task 的 steer/interrupt、发送终态通知和暴露执行端 `/health`。普通 Service 负载均衡不得把 mutation 打到 read,也不得把 running task 控制打到 write。
- 实例语义:D601 是当前唯一 active 执行节点,`code-queue-scheduler` 以一个 scheduler Pod 承载长生命周期 Codex/OpenCode 子进程并轮询主 PostgreSQL 中由 `code-queue-mgr` 写入的 queued/retry_wait 任务。D518 不属于当前 Code Queue k3s 拓扑;在没有原生 k3s-agent 与稳定 Kubernetes 网络前,不得把 D518 写回 `expectedNodeIds` 或恢复 `code-queue-d518` standby。D601 scheduler 默认关闭 `CODE_QUEUE_STARTUP_OA_BACKFILL_ENABLED`;历史 OA Trace/STEP 回填必须通过显式 `/api/oa/backfill` 运维动作触发,不能在每次 Pod 重启时自动批量发布旧事件。
@@ -205,6 +205,21 @@ interface TaskRow {
task_json: unknown;
}
type TaskStatusRow = Pick<TaskRow, "id" | "queue_id" | "status" | "started_at" | "current_attempt" | "codex_thread_id" | "active_turn_id">;
type TaskJsonRow = Pick<TaskRow, "task_json"> & Partial<Pick<TaskRow,
"id"
| "queue_id"
| "status"
| "current_attempt"
| "current_mode"
| "codex_thread_id"
| "active_turn_id"
| "updated_at"
| "started_at"
| "finished_at"
| "read_at"
>>;
interface QueueRow {
id: string;
name: string;
@@ -988,8 +1003,58 @@ function queueIdOf(task: QueueTask): string {
return safeQueueId(task.queueId);
}
function rowToTask(row: Pick<TaskRow, "task_json">): QueueTask {
return normalizeTask(row.task_json);
function rowToTask(row: TaskJsonRow): QueueTask {
const task = normalizeTask(row.task_json);
if (typeof row.id === "string" && row.id.length > 0) task.id = row.id;
if (typeof row.queue_id === "string" && row.queue_id.length > 0) task.queueId = safeQueueId(row.queue_id);
if (typeof row.status === "string" && row.status.length > 0) task.status = row.status as TaskStatus;
if (row.current_attempt !== undefined) task.currentAttempt = Number(row.current_attempt ?? 0);
if (row.current_mode !== undefined) task.currentMode = row.current_mode ?? null;
if (row.codex_thread_id !== undefined) task.codexThreadId = row.codex_thread_id ?? null;
if (row.active_turn_id !== undefined) task.activeTurnId = row.active_turn_id ?? null;
if (row.updated_at !== undefined) task.updatedAt = timestampToIso(row.updated_at) ?? task.updatedAt;
if (row.started_at !== undefined) task.startedAt = timestampToIso(row.started_at);
if (row.finished_at !== undefined) task.finishedAt = timestampToIso(row.finished_at);
if (row.read_at !== undefined) task.readAt = timestampToIso(row.read_at);
return task;
}
function taskStatusRowJson(row: TaskStatusRow | null): JsonValue {
if (row === null) return null;
return {
id: row.id,
queueId: safeQueueId(row.queue_id),
status: row.status,
startedAt: timestampToIso(row.started_at),
currentAttempt: Number(row.current_attempt ?? 0),
codexThreadId: row.codex_thread_id,
activeTurnId: row.active_turn_id,
};
}
function taskIsUnclaimedMovable(task: QueueTask): boolean {
return (task.status === "queued" || task.status === "retry_wait")
&& task.startedAt === null
&& task.currentAttempt === 0
&& task.codexThreadId === null
&& task.activeTurnId === null;
}
function taskStatusMoveBlocker(row: TaskStatusRow | null): string {
if (row === null) return "task not found";
if (row.status !== "queued" && row.status !== "retry_wait") return `status=${row.status}`;
if (row.started_at !== null) return "task already has started_at";
const currentAttempt = Number(row.current_attempt ?? 0);
if (currentAttempt !== 0) return `task already has current_attempt=${currentAttempt}`;
if (row.codex_thread_id !== null) return "task already has codex_thread_id";
if (row.active_turn_id !== null) return "task already has active_turn_id";
return "";
}
function queueMergeBlocker(row: TaskStatusRow): string {
if (row.status === "running" || row.status === "judging") return `task ${row.id} is ${row.status}`;
if ((row.status === "queued" || row.status === "retry_wait") && taskStatusMoveBlocker(row).length > 0) return `task ${row.id} has already been claimed`;
return "";
}
function queueRowToRecord(row: QueueRow): QueueRecord {
@@ -1086,6 +1151,7 @@ async function upsertQueue(queue: QueueRecord): Promise<void> {
}
async function upsertTask(client: SqlExecutor, task: QueueTask): Promise<void> {
const taskHasClaim = hasClaimMarkers(task);
await client`
INSERT INTO unidesk_code_queue_tasks (
id, queue_id, status, provider_id, execution_mode, model, cwd, prompt, base_prompt,
@@ -1132,6 +1198,26 @@ async function upsertTask(client: SqlExecutor, task: QueueTask): Promise<void> {
last_output_seq = EXCLUDED.last_output_seq,
task_json = EXCLUDED.task_json
WHERE unidesk_code_queue_tasks.status NOT IN ('running', 'judging')
AND (
${taskHasClaim}
OR unidesk_code_queue_tasks.started_at IS NULL
OR unidesk_code_queue_tasks.started_at = EXCLUDED.started_at
)
AND (
${taskHasClaim}
OR unidesk_code_queue_tasks.current_attempt = 0
OR unidesk_code_queue_tasks.current_attempt = EXCLUDED.current_attempt
)
AND (
${taskHasClaim}
OR unidesk_code_queue_tasks.codex_thread_id IS NULL
OR unidesk_code_queue_tasks.codex_thread_id = EXCLUDED.codex_thread_id
)
AND (
${taskHasClaim}
OR unidesk_code_queue_tasks.active_turn_id IS NULL
OR unidesk_code_queue_tasks.active_turn_id = EXCLUDED.active_turn_id
)
`;
}
@@ -1150,8 +1236,20 @@ async function loadQueues(): Promise<QueueRecord[]> {
}
async function loadTask(taskId: string): Promise<QueueTask | null> {
const rows = await traceSql<Array<Pick<TaskRow, "task_json">>>`
SELECT task_json
const rows = await traceSql<TaskJsonRow[]>`
SELECT
id,
queue_id,
status,
current_attempt,
current_mode,
codex_thread_id,
active_turn_id,
updated_at,
started_at,
finished_at,
read_at,
task_json
FROM unidesk_code_queue_tasks
WHERE id = ${taskId}
LIMIT 1
@@ -1166,8 +1264,21 @@ async function loadTasksForList(url: URL): Promise<{ tasks: QueueTask[]; total:
const queueIdRaw = url.searchParams.get("queueId");
const queueId = queueIdRaw === null || queueIdRaw.length === 0 ? null : safeQueueId(queueIdRaw);
const search = String(url.searchParams.get("q") ?? url.searchParams.get("search") ?? "").trim();
const rows = await traceSql<Array<Pick<TaskRow, "task_json"> & { total_count: string | number }>>`
SELECT task_json, COUNT(*) OVER() AS total_count
const rows = await traceSql<Array<TaskJsonRow & { total_count: string | number }>>`
SELECT
id,
queue_id,
status,
current_attempt,
current_mode,
codex_thread_id,
active_turn_id,
updated_at,
started_at,
finished_at,
read_at,
task_json,
COUNT(*) OVER() AS total_count
FROM unidesk_code_queue_tasks
WHERE (${status === null} OR status = ${status})
AND (${queueId === null} OR queue_id = ${queueId})
@@ -1185,8 +1296,20 @@ async function loadTasksForList(url: URL): Promise<{ tasks: QueueTask[]; total:
async function loadTasksByIds(ids: string[]): Promise<QueueTask[]> {
const unique = Array.from(new Set(ids.map((id) => id.trim()).filter(Boolean)));
if (unique.length === 0) return [];
const rows = await traceSql<Array<Pick<TaskRow, "task_json">>>`
SELECT task_json
const rows = await traceSql<TaskJsonRow[]>`
SELECT
id,
queue_id,
status,
current_attempt,
current_mode,
codex_thread_id,
active_turn_id,
updated_at,
started_at,
finished_at,
read_at,
task_json
FROM unidesk_code_queue_tasks
WHERE id IN ${traceSql(unique)}
`;
@@ -1307,8 +1430,20 @@ async function queueSummary(tasks?: QueueTask[], queueRecords?: QueueRecord[]):
}
async function loadAllTasksLite(): Promise<QueueTask[]> {
const rows = await traceSql<Array<Pick<TaskRow, "task_json">>>`
SELECT task_json
const rows = await traceSql<TaskJsonRow[]>`
SELECT
id,
queue_id,
status,
current_attempt,
current_mode,
codex_thread_id,
active_turn_id,
updated_at,
started_at,
finished_at,
read_at,
task_json - 'output' - 'events' AS task_json
FROM unidesk_code_queue_tasks
ORDER BY created_at ASC, id ASC
LIMIT 2000
@@ -1838,24 +1973,54 @@ async function mergeQueues(targetQueueIdValue: string | null, req: Request): Pro
.filter((id) => id !== targetQueueId);
if (sourceQueueIds.length === 0) return jsonResponse({ ok: false, error: "sourceQueueId is required" }, 400);
const updatedAt = nowIso();
await mgrSql.begin(async (client) => {
const result = await mgrSql.begin(async (client): Promise<{ movedTaskIds: string[]; blocker: TaskStatusRow | null }> => {
const mergeQueueIds = Array.from(new Set([targetQueueId, ...sourceQueueIds]));
const lockedRows = await client<TaskStatusRow[]>`
SELECT id, queue_id, status, started_at, current_attempt, codex_thread_id, active_turn_id
FROM unidesk_code_queue_tasks
WHERE queue_id IN ${client(mergeQueueIds)}
ORDER BY updated_at DESC, id DESC
FOR UPDATE
`;
const blocker = lockedRows.find((row) => queueMergeBlocker(row).length > 0) ?? null;
if (blocker !== null) return { movedTaskIds: [], blocker };
await client`
INSERT INTO unidesk_code_queue_queues (id, name, created_at, updated_at)
VALUES (${targetQueueId}, ${targetQueueId}, ${updatedAt}, ${updatedAt})
ON CONFLICT (id) DO UPDATE SET updated_at = EXCLUDED.updated_at
`;
await client`
const movedRows = await client<Array<{ id: string }>>`
UPDATE unidesk_code_queue_tasks
SET
queue_id = ${targetQueueId},
updated_at = ${updatedAt},
task_json = jsonb_set(jsonb_set(task_json, '{queueId}', to_jsonb(${targetQueueId}::text), true), '{updatedAt}', to_jsonb(${updatedAt}::text), true)
WHERE queue_id IN ${client(sourceQueueIds)}
AND status NOT IN ('running', 'judging')
AND (
status IN ('succeeded', 'failed', 'canceled')
OR (
status IN ('queued', 'retry_wait')
AND started_at IS NULL
AND current_attempt = 0
AND codex_thread_id IS NULL
AND active_turn_id IS NULL
)
)
RETURNING id
`;
await client`DELETE FROM unidesk_code_queue_queues WHERE id IN ${client(sourceQueueIds)}`;
return { movedTaskIds: movedRows.map((row) => row.id), blocker: null };
});
return jsonResponse({ ok: true, targetQueueId, sourceQueueIds, queue: await queueSummary() }, 202);
if (result.blocker !== null) {
const blocker = result.blocker;
const queueId = safeQueueId(blocker.queue_id);
return jsonResponse({
ok: false,
error: `cannot merge queue ${queueId}: ${queueMergeBlocker(blocker)}`,
blocker: taskStatusRowJson(blocker),
}, 409);
}
return jsonResponse({ ok: true, targetQueueId, sourceQueueIds, movedTaskIds: result.movedTaskIds, queue: await queueSummary() }, 202);
}
async function markTaskRead(taskId: string): Promise<Response> {
@@ -1900,32 +2065,155 @@ async function moveTask(taskId: string, req: Request): Promise<Response> {
const body = asRecord(await readJson(req)) ?? {};
const queueId = normalizeQueueId(body.queueId ?? body.id);
const movedAt = nowIso();
const task = await mgrSql.begin(async (client): Promise<QueueTask | null> => {
const result = await mgrSql.begin(async (client): Promise<{ task: QueueTask | null; blocker: string; row: TaskStatusRow | null }> => {
const rows = await client<Array<TaskJsonRow & TaskStatusRow>>`
SELECT
id,
queue_id,
status,
current_attempt,
current_mode,
codex_thread_id,
active_turn_id,
updated_at,
started_at,
finished_at,
read_at,
task_json
FROM unidesk_code_queue_tasks
WHERE id = ${taskId}
FOR UPDATE
`;
const row = rows[0] ?? null;
const blocker = taskStatusMoveBlocker(row);
if (row === null || blocker.length > 0) return { task: row === null ? null : rowToTask(row), blocker, row };
await client`
INSERT INTO unidesk_code_queue_queues (id, name, created_at, updated_at)
VALUES (${queueId}, ${queueId}, ${movedAt}, ${movedAt})
ON CONFLICT (id) DO UPDATE SET updated_at = EXCLUDED.updated_at
`;
const rows = await client<Array<Pick<TaskRow, "task_json">>>`
SELECT task_json
FROM unidesk_code_queue_tasks
WHERE id = ${taskId}
FOR UPDATE
`;
if (rows[0] === undefined) return null;
const nextTask = rowToTask(rows[0]);
if (nextTask.status === "running" || nextTask.status === "judging") return nextTask;
const previousQueueId = safeQueueId(row.queue_id);
const nextTask = rowToTask(row);
nextTask.queueId = queueId;
nextTask.queueEnteredAt = movedAt;
nextTask.updatedAt = movedAt;
nextTask.output.push({ seq: outputMaxSeq(nextTask) + 1, at: movedAt, channel: "system", text: `moved to queue=${queueId}\n`, method: "queue/move" });
nextTask.output.push({ seq: outputMaxSeq(nextTask) + 1, at: movedAt, channel: "system", text: `moved from queue=${previousQueueId} to queue=${queueId}\n`, method: "queue/move" });
nextTask.outputMaxSeq = outputMaxSeq(nextTask);
await upsertTask(client, nextTask);
return nextTask;
const updated = await client<TaskStatusRow[]>`
UPDATE unidesk_code_queue_tasks
SET
queue_id = ${queueId},
updated_at = ${movedAt},
output_count = ${nextTask.output.length},
event_count = ${nextTask.events.length},
attempt_count = ${nextTask.attempts.length},
last_output_seq = ${outputMaxSeq(nextTask)},
task_json = ${client.json(taskJson(nextTask) as unknown as postgres.JSONValue)}
WHERE id = ${taskId}
AND status IN ('queued', 'retry_wait')
AND started_at IS NULL
AND current_attempt = 0
AND codex_thread_id IS NULL
AND active_turn_id IS NULL
RETURNING id, queue_id, status, started_at, current_attempt, codex_thread_id, active_turn_id
`;
if (updated[0] === undefined) return { task: rowToTask(row), blocker: "conditional update matched no rows", row };
return { task: nextTask, blocker: "", row: updated[0] };
});
if (task === null) return jsonResponse({ ok: false, error: "task not found" }, 404);
if (task.status === "running" || task.status === "judging") return jsonResponse({ ok: false, error: `cannot move active task while status=${task.status}`, task: taskListResponse(task) }, 409);
return jsonResponse({ ok: true, task: taskListResponse(task, false), queue: await queueSummary() }, 202);
if (result.task === null) return jsonResponse({ ok: false, error: "task not found" }, 404);
if (result.blocker.length > 0) {
return jsonResponse({
ok: false,
error: `cannot move task ${taskId}: ${result.blocker}`,
blocker: taskStatusRowJson(result.row),
task: taskListResponse(result.task),
}, 409);
}
return jsonResponse({ ok: true, task: taskListResponse(result.task, false), blocker: taskStatusRowJson(result.row), queue: await queueSummary() }, 202);
}
async function runQueueClaimMoveSelfTest(): Promise<JsonRecord> {
const suffix = String(Date.now());
const taskId = `codex_mgr_claim_move_${suffix}`;
const sourceQueueId = `mgr_claim_move_source_${suffix}`;
const targetQueueId = `mgr_claim_move_target_${suffix}`;
const queuedAt = nowIso();
await mgrSql`DELETE FROM unidesk_code_queue_tasks WHERE id = ${taskId}`;
try {
const queuedTask = normalizeTask(await createTaskFromRequest({
prompt: "Code Queue manager claim/move race self-test",
queueId: sourceQueueId,
maxAttempts: 1,
}));
queuedTask.id = taskId;
queuedTask.queueId = sourceQueueId;
queuedTask.queueEnteredAt = queuedAt;
queuedTask.createdAt = queuedAt;
queuedTask.updatedAt = queuedAt;
await mgrSql.begin(async (client) => {
await client`
INSERT INTO unidesk_code_queue_queues (id, name, created_at, updated_at)
VALUES (${sourceQueueId}, ${sourceQueueId}, ${queuedAt}, ${queuedAt})
ON CONFLICT (id) DO UPDATE SET updated_at = EXCLUDED.updated_at
`;
await upsertTask(client, queuedTask);
});
const claimedAt = nowIso();
const claimedTask = normalizeTask(JSON.parse(JSON.stringify(queuedTask)) as QueueTask);
claimedTask.status = "running";
claimedTask.startedAt = claimedAt;
claimedTask.currentAttempt = 1;
claimedTask.currentMode = "initial";
claimedTask.codexThreadId = `thread_${suffix}`;
claimedTask.activeTurnId = `turn_${suffix}`;
claimedTask.updatedAt = claimedAt;
await mgrSql.begin(async (client) => {
await upsertTask(client, claimedTask);
});
const moveResponse = await moveTask(taskId, new Request(`http://code-queue-mgr.local/api/tasks/${taskId}/move`, {
method: "POST",
body: JSON.stringify({ queueId: targetQueueId }),
headers: { "content-type": "application/json" },
}));
const moveBody = await moveResponse.json() as JsonRecord;
const rows = await mgrSql<Array<TaskJsonRow & TaskStatusRow>>`
SELECT id, queue_id, status, current_attempt, current_mode, codex_thread_id, active_turn_id, updated_at, started_at, finished_at, read_at, task_json
FROM unidesk_code_queue_tasks
WHERE id = ${taskId}
LIMIT 1
`;
const after = rows[0] === undefined ? null : rowToTask(rows[0]);
const ok = moveResponse.status === 409
&& after !== null
&& after.status === "running"
&& queueIdOf(after) === sourceQueueId
&& after.currentAttempt === 1
&& after.startedAt !== null
&& after.codexThreadId !== null
&& after.activeTurnId !== null;
if (!ok) {
throw new Error(`claim/move self-test failed: moveStatus=${moveResponse.status}, after=${JSON.stringify(after === null ? null : taskListResponse(after))}`);
}
return {
ok: true,
taskId,
sourceQueueId,
targetQueueId,
moveStatus: moveResponse.status,
databaseStatus: after.status,
databaseQueueId: queueIdOf(after),
currentAttempt: after.currentAttempt,
startedAt: after.startedAt,
codexThreadId: after.codexThreadId,
activeTurnId: after.activeTurnId,
moveResponse: moveBody,
};
} finally {
await mgrSql`DELETE FROM unidesk_code_queue_tasks WHERE id = ${taskId}`;
await mgrSql`DELETE FROM unidesk_code_queue_queues WHERE id IN ${mgrSql([sourceQueueId, targetQueueId])}`;
}
}
async function retryTask(taskId: string, req: Request): Promise<Response> {
@@ -1933,8 +2221,20 @@ async function retryTask(taskId: string, req: Request): Promise<Response> {
const explicitPrompt = typeof body.prompt === "string" ? body.prompt.trim() : typeof body.continuePrompt === "string" ? body.continuePrompt.trim() : "";
const queuedAt = nowIso();
const result = await mgrSql.begin(async (client): Promise<{ task: QueueTask | null; changed: boolean }> => {
const rows = await client<Array<Pick<TaskRow, "task_json">>>`
SELECT task_json
const rows = await client<Array<TaskJsonRow & TaskStatusRow>>`
SELECT
id,
queue_id,
status,
current_attempt,
current_mode,
codex_thread_id,
active_turn_id,
updated_at,
started_at,
finished_at,
read_at,
task_json
FROM unidesk_code_queue_tasks
WHERE id = ${taskId}
FOR UPDATE
@@ -1972,8 +2272,20 @@ async function editTask(taskId: string, req: Request): Promise<Response> {
const body = asRecord(await readJson(req)) ?? {};
const editedAt = nowIso();
const result = await mgrSql.begin(async (client): Promise<{ task: QueueTask | null; changed: boolean }> => {
const rows = await client<Array<Pick<TaskRow, "task_json">>>`
SELECT task_json
const rows = await client<Array<TaskJsonRow & TaskStatusRow>>`
SELECT
id,
queue_id,
status,
current_attempt,
current_mode,
codex_thread_id,
active_turn_id,
updated_at,
started_at,
finished_at,
read_at,
task_json
FROM unidesk_code_queue_tasks
WHERE id = ${taskId}
FOR UPDATE
@@ -2092,12 +2404,16 @@ async function route(req: Request): Promise<Response> {
noDockerSocket: true,
},
endpoints: {
control: ["/api/queues", "/api/tasks", "/api/tasks/:id/(move|retry|read|edit)"],
control: ["/api/queues", "/api/tasks", "/api/tasks/:id/(move|retry|read|edit)", "/api/queue-claim-move/self-test"],
traceRead: ["/api/tasks/overview", "/api/tasks/:id/summary", "/api/tasks/:id/trace-summary", "/api/tasks/:id/trace-steps", "/api/tasks/:id/output"],
},
}, schemaReady ? 200 : 503);
}
if (url.pathname === "/logs" && req.method === "GET") return jsonResponse({ ok: true, logs: recentLogs.slice(-100) });
if (url.pathname === "/api/queue-claim-move/self-test" && (req.method === "POST" || req.method === "GET")) {
if (!schemaReady) return jsonResponse({ ok: false, error: "code-queue-mgr database schema is not ready", schemaLastError }, 503);
return jsonResponse(await runQueueClaimMoveSelfTest(), 200);
}
const activeControl = routeActiveControl(url.pathname, req.method);
if (activeControl !== null) return jsonResponse(activeControl.body, activeControl.status);
if (url.pathname === "/api/queues" && req.method === "GET") {
@@ -1411,9 +1411,16 @@ async function upsertWorkdirsToDatabase(records: WorkdirRecord[]): Promise<void>
interface DatabaseTaskRow {
id: string;
queue_id: string;
updated_at: Date | string;
status: TaskStatus;
read_at: Date | string | null;
current_attempt: number | string | null;
current_mode: RunMode | null;
codex_thread_id: string | null;
active_turn_id: string | null;
started_at: Date | string | null;
finished_at: Date | string | null;
task_json: unknown;
}
@@ -1446,7 +1453,16 @@ function normalizeDatabaseTaskRows(rows: DatabaseTaskRow[], source: string): Que
if (typeof taskJson !== "object" || taskJson === null || Array.isArray(taskJson)) throw new Error("task_json is not an object");
tasks.push(normalizeTask({
...(taskJson as Record<string, unknown>),
id: row.id,
queueId: safeQueueId(row.queue_id),
status: row.status,
currentAttempt: Number(row.current_attempt ?? 0),
currentMode: row.current_mode,
codexThreadId: row.codex_thread_id,
activeTurnId: row.active_turn_id,
updatedAt: timestampToIso(row.updated_at) ?? nowIso(),
startedAt: timestampToIso(row.started_at),
finishedAt: timestampToIso(row.finished_at),
readAt: timestampToIso(row.read_at),
} as unknown as QueueTask));
} catch (error) {
@@ -1622,20 +1638,39 @@ async function runDatabaseClaimMoveSelfTest(): Promise<JsonValue | null> {
async function loadPrunedDatabaseTaskRows(where: "all" | "hot"): Promise<DatabaseTaskRow[]> {
if (where === "hot") {
return await sql<DatabaseTaskRow[]>`
SELECT id, updated_at, status, read_at, task_json - 'output' - 'events' AS task_json
SELECT
id,
queue_id,
updated_at,
status,
read_at,
current_attempt,
current_mode,
codex_thread_id,
active_turn_id,
started_at,
finished_at,
task_json - 'output' - 'events' AS task_json
FROM unidesk_code_queue_tasks
WHERE status IN ('queued', 'running', 'judging', 'retry_wait')
ORDER BY created_at ASC, id ASC
`;
}
return await sql<DatabaseTaskRow[]>`
SELECT id, updated_at, status, read_at, task_json
SELECT id, queue_id, updated_at, status, read_at, current_attempt, current_mode, codex_thread_id, active_turn_id, started_at, finished_at, task_json
FROM (
SELECT
id,
queue_id,
updated_at,
status,
read_at,
current_attempt,
current_mode,
codex_thread_id,
active_turn_id,
started_at,
finished_at,
jsonb_set(
jsonb_set(
task_json,
@@ -1726,13 +1761,20 @@ async function loadTasksFromDatabaseByIds(taskIds: string[]): Promise<QueueTask[
const ids = Array.from(new Set(taskIds.map((id) => id.trim()).filter(Boolean)));
if (ids.length === 0) return [];
const rows = await sql<DatabaseTaskRow[]>`
SELECT id, updated_at, status, read_at, task_json
SELECT id, queue_id, updated_at, status, read_at, current_attempt, current_mode, codex_thread_id, active_turn_id, started_at, finished_at, task_json
FROM (
SELECT
id,
queue_id,
updated_at,
status,
read_at,
current_attempt,
current_mode,
codex_thread_id,
active_turn_id,
started_at,
finished_at,
task_json - 'output' - 'events' - 'attempts' - 'promptHistory' AS task_json
FROM unidesk_code_queue_tasks
WHERE id IN ${sql(ids)}
@@ -1743,13 +1785,20 @@ async function loadTasksFromDatabaseByIds(taskIds: string[]): Promise<QueueTask[
async function loadTaskFromDatabase(taskId: string): Promise<QueueTask | null> {
const rows = await sql<DatabaseTaskRow[]>`
SELECT id, updated_at, status, read_at, task_json
SELECT id, queue_id, updated_at, status, read_at, current_attempt, current_mode, codex_thread_id, active_turn_id, started_at, finished_at, task_json
FROM (
SELECT
id,
queue_id,
updated_at,
status,
read_at,
current_attempt,
current_mode,
codex_thread_id,
active_turn_id,
started_at,
finished_at,
jsonb_set(
jsonb_set(
task_json,