fix: stabilize code queue retry claims

This commit is contained in:
Codex
2026-05-17 10:05:47 +00:00
parent f775990c90
commit 3d2af9ff86
7 changed files with 816 additions and 152 deletions
+1 -1
View File
@@ -28,7 +28,7 @@ UniDesk 的统一 CLI 入口是根目录 `scripts/cli.ts`,运行方式固定
- `codex output <taskId> --tail|--from-start|--after-seq N|--before-seq N --limit N [--full-text]` 按原始 output seq 分页读取底层记录;当 trace 行提示 `commandOmittedLines``bodyOmittedLines``rawSeqs` 时,用该命令按 seq 补取完整信息,默认仍有单条文本预览上限,显式 `--full-text` 才返回该页全文。
- `codex judge <taskId> --attempt N [--dry-run] [--include-prompt]` 通过 Code Queue 私有代理按指定 attempt 单步复现 judge;这是执行面诊断入口,仍依赖 D601 scheduler/runner 侧的真实 judge builder、MiniMax 调用路径和执行环境。默认会真实调用 MiniMax,`--dry-run` 只返回 prompt/payload 大小、attempt 窗口和重建来源诊断,`--include-prompt` 仅用于本地深度排查。
- `codex interrupt|cancel <taskId>` 通过 Code Queue 私有代理请求中断;running/judging 任务会请求 D601 当前 agent run 停止,queued/retry_wait 任务的取消也必须保持与 WebUI 相同代理路径,返回有界 task 摘要和后续查询命令。任何需要接触 active run 的动作仍属于 D601 执行面。
- Code Queue 多队列 lane 由 `codex` 命令命名空间管理:`queues` 列表、`queue create <queueId>` 创建、`queue merge <sourceQueueId> --into <targetQueueId>` 合并、`move <taskId> --queue <queueId>` 迁移;这些队列管理入口默认由主 server `code-queue-mgr` 直管 PostgreSQL,仍通过稳定 `code-queue` 用户服务代理路径访问。同一个 queue 内部串行执行,不同 queue 之间并行执行。合并会移动任务归属并自动删除源 queue 记录,只保留合并后的目标 queue;合并后的目标 queue 按任务原 `queueEnteredAt`/`createdAt` 时间顺序串行迁移 queued/retry_wait 任务后由 D601 scheduler 轮询推进。
- Code Queue 多队列 lane 由 `codex` 命令命名空间管理:`queues` 列表、`queue create <queueId>` 创建、`queue merge <sourceQueueId> --into <targetQueueId>` 合并、`move <taskId> --queue <queueId>` 迁移;这些队列管理入口默认由主 server `code-queue-mgr` 直管 PostgreSQL,仍通过稳定 `code-queue` 用户服务代理路径访问。同一个 queue 内部串行执行,不同 queue 之间并行执行。迁移只允许尚未被 scheduler claim 的 `queued`/`retry_wait` 任务,必须满足 `startedAt=null``currentAttempt=0` 且没有 active thread/turn;已进入 `running`/`judging` 或已有 claim 标记的任务返回 409,不得被 move/merge 回写成 queued。合并会移动可迁移任务归属并自动删除源 queue 记录,只保留合并后的目标 queue;若 source 或 target queue 存在 active/claimed 任务,合并整体返回 409。合并后的目标 queue 按任务原 `queueEnteredAt`/`createdAt` 时间顺序串行,成功迁移 queued/retry_wait 任务后由 D601 scheduler 轮询推进。
- 所有 `codex` 查询和管理命令必须走与 WebUI 相同的 backend-core 私有代理路径 `/api/microservices/code-queue/proxy/...`;CLI 不得为了提交、移动、中断、取消或队列管理直接调用 D601 内部 Service、数据库、pod curl 或 k3sctl scheduler 子服务。若该路径失败,应先修复 CLI/backend/provider tunnel 链路,而不是绕过控制面。
- `job list``job status` 查询 `.state/jobs/` 文件系统状态,是异步命令的可观测入口。
- `debug health``debug dispatch``debug task` 走真实内部 core、WebSocket、数据库、provider、系统指标、Docker 状态和 Host SSH 维护桥流程,只用于开发调试,不写入 `TEST.md` 的正式验收步骤。
@@ -583,6 +583,14 @@ function terminalTask(task: QueueTask): boolean {
return task.status === "succeeded" || task.status === "failed" || task.status === "canceled";
}
function hasClaimMarkers(task: QueueTask): boolean {
return task.startedAt !== null
|| task.currentAttempt > 0
|| task.codexThreadId !== null
|| task.activeTurnId !== null
|| task.attempts.length > 0;
}
function terminalTaskUnread(task: QueueTask): boolean {
return terminalTask(task) && task.readAt === null;
}
@@ -1462,8 +1470,8 @@ function taskListResponse(task: QueueTask, lite = true): JsonRecord {
reasoningEffort: task.reasoningEffort,
maxAttempts: task.maxAttempts,
status: task.status,
queuedReason: task.status === "queued" ? { code: "mgr-visible", label: "QUEUED", message: "Task is visible from master code-queue-mgr; D601 scheduler will pick it up from PostgreSQL." } : null,
queuedReasonLabel: task.status === "queued" ? "QUEUED" : null,
queuedReason: (task.status === "queued" || task.status === "retry_wait") ? { code: "mgr-visible", label: task.status === "retry_wait" ? "RETRY" : "QUEUED", message: "Task is visible from master code-queue-mgr; D601 scheduler will pick it up from PostgreSQL." } : null,
queuedReasonLabel: task.status === "retry_wait" ? "RETRY" : task.status === "queued" ? "QUEUED" : null,
createdAt: task.createdAt,
updatedAt: task.updatedAt,
startedAt: task.startedAt,
@@ -1915,7 +1923,7 @@ async function retryTask(taskId: string, req: Request): Promise<Response> {
if (rows[0] === undefined) return { task: null, changed: false };
const nextTask = rowToTask(rows[0]);
if (!terminalTask(nextTask)) return { task: nextTask, changed: false };
nextTask.status = "queued";
nextTask.status = hasClaimMarkers(nextTask) ? "retry_wait" : "queued";
nextTask.finishedAt = null;
nextTask.readAt = null;
nextTask.cancelRequested = false;
@@ -128,10 +128,12 @@ import {
readOaTraceStatsForTaskAttempts,
readOaTraceStatsForTasks,
} from "./oa-events";
import { configureSelfTests, runJudgeInfraSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest } from "./self-tests";
import { configureSelfTests, runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest } from "./self-tests";
import {
codexToolLifecycleStartedBeforeIn,
configureTaskView,
formatCommandOutput,
isCodexToolLifecycleOutput,
lastAssistantMessage,
promptLineCount,
recordNumberField,
@@ -281,6 +283,10 @@ function serviceRoleAllowsScheduler(role: CodeQueueServiceRole): boolean {
return role === "combined" || role === "scheduler";
}
function serviceRoleReadOnly(role: CodeQueueServiceRole): boolean {
return role === "read";
}
function envList(name: string, fallback: string[]): string[] {
const raw = process.env[name];
const source = raw === undefined || raw.length === 0 ? fallback.join(",") : raw;
@@ -1018,6 +1024,7 @@ function outputStartsTraceStepInHistory(outputs: LiveOutput[], output: LiveOutpu
if (output.channel === "user" && output.method === "enqueue") return false;
if (isOpenCodeStepBoundaryMethod(output.method)) return false;
if (output.channel === "system") return false;
if (codexToolLifecycleStartedBeforeIn(outputs, output)) return false;
if (output.channel === "diff" || output.channel === "tool" || output.channel === "error" || output.channel === "assistant" || output.channel === "reasoning") return true;
if (output.channel === "user") return true;
if (output.channel !== "command") return true;
@@ -1088,9 +1095,18 @@ function recordTaskOutputMetrics(task: QueueTask, output: LiveOutput, op: "set"
function outputUpdatesExistingTraceStep(output: LiveOutput): boolean {
if (output.channel === "assistant" || output.channel === "reasoning" || output.channel === "diff") return true;
if (isCodexToolLifecycleOutput(output) && output.method === "item/completed") return true;
return false;
}
function traceStepOutputForProjection(task: QueueTask, output: LiveOutput): LiveOutput {
if (!isCodexToolLifecycleOutput(output) || output.method !== "item/completed" || typeof output.itemId !== "string") return output;
const started = taskFullOutput(task)
.filter((item) => item !== output && isCodexToolLifecycleOutput(item) && item.itemId === output.itemId && item.method === "item/started")
.sort((left, right) => Number(left.seq) - Number(right.seq))[0];
return started === undefined ? output : { ...output, seq: started.seq, at: output.at, itemId: output.itemId, rawSeqs: [started.seq, output.seq] } as LiveOutput;
}
function errorToJson(error: unknown): JsonValue {
if (error instanceof Error) return { name: error.name, message: error.message, stack: error.stack ?? null };
return String(error);
@@ -1146,6 +1162,7 @@ function runGarbageCollection(): void {
}
function scheduleDatabaseFlush(delayMs = config.databaseFlushIntervalMs): void {
if (serviceRoleReadOnly(config.serviceRole)) return;
if (!databaseReady || (dirtyDatabaseTaskIds.size === 0 && dirtyDatabaseQueueIds.size === 0) || shutdownRequested) return;
if (databaseFlushTimer !== null) return;
databaseFlushTimer = setTimeout(() => {
@@ -1173,8 +1190,13 @@ function updateNextSeqFromTasks(): void {
state.nextSeq = nextSeq;
}
async function upsertTaskToDatabase(client: SqlExecutor, task: QueueTask): Promise<void> {
await client`
interface UpsertTaskOptions {
claimQueueId?: string | null;
}
async function upsertTaskToDatabase(client: SqlExecutor, task: QueueTask, options: UpsertTaskOptions = {}): Promise<boolean> {
const claimQueueId = options.claimQueueId ?? null;
const rows = await client<Array<{ read_at: Date | string | null }>>`
INSERT INTO unidesk_code_queue_tasks (
id,
queue_id,
@@ -1257,15 +1279,85 @@ async function upsertTaskToDatabase(client: SqlExecutor, task: QueueTask): Promi
updated_at = EXCLUDED.updated_at,
started_at = EXCLUDED.started_at,
finished_at = EXCLUDED.finished_at,
read_at = EXCLUDED.read_at,
read_at = CASE
WHEN EXCLUDED.status IN ('queued', 'running', 'judging', 'retry_wait') THEN NULL
WHEN unidesk_code_queue_tasks.read_at IS NOT NULL AND EXCLUDED.read_at IS NOT NULL THEN GREATEST(unidesk_code_queue_tasks.read_at, EXCLUDED.read_at)
WHEN unidesk_code_queue_tasks.read_at IS NOT NULL THEN unidesk_code_queue_tasks.read_at
ELSE EXCLUDED.read_at
END,
last_error = EXCLUDED.last_error,
last_judge = EXCLUDED.last_judge,
output_count = EXCLUDED.output_count,
event_count = EXCLUDED.event_count,
attempt_count = EXCLUDED.attempt_count,
last_output_seq = EXCLUDED.last_output_seq,
task_json = EXCLUDED.task_json
task_json = jsonb_set(
EXCLUDED.task_json,
'{readAt}',
CASE
WHEN EXCLUDED.status IN ('queued', 'running', 'judging', 'retry_wait') THEN 'null'::jsonb
WHEN unidesk_code_queue_tasks.read_at IS NOT NULL AND EXCLUDED.read_at IS NOT NULL THEN to_jsonb(GREATEST(unidesk_code_queue_tasks.read_at, EXCLUDED.read_at))
WHEN unidesk_code_queue_tasks.read_at IS NOT NULL THEN to_jsonb(unidesk_code_queue_tasks.read_at)
ELSE COALESCE(to_jsonb(EXCLUDED.read_at), 'null'::jsonb)
END,
true
)
WHERE (
${claimQueueId === null}
OR (
unidesk_code_queue_tasks.queue_id = ${claimQueueId}
AND (
(
unidesk_code_queue_tasks.status = 'queued'
AND unidesk_code_queue_tasks.started_at IS NULL
AND unidesk_code_queue_tasks.current_attempt = 0
AND unidesk_code_queue_tasks.codex_thread_id IS NULL
AND unidesk_code_queue_tasks.active_turn_id IS NULL
)
OR (
unidesk_code_queue_tasks.status = 'retry_wait'
AND unidesk_code_queue_tasks.active_turn_id IS NULL
)
)
)
)
AND NOT (
EXCLUDED.status IN ('queued', 'retry_wait')
AND EXCLUDED.started_at IS NULL
AND EXCLUDED.current_attempt = 0
AND EXCLUDED.codex_thread_id IS NULL
AND EXCLUDED.active_turn_id IS NULL
AND (
unidesk_code_queue_tasks.status IN ('running', 'judging')
OR unidesk_code_queue_tasks.started_at IS NOT NULL
OR unidesk_code_queue_tasks.current_attempt > 0
OR unidesk_code_queue_tasks.codex_thread_id IS NOT NULL
OR unidesk_code_queue_tasks.active_turn_id IS NOT NULL
)
)
RETURNING read_at
`;
if (rows.length === 0) {
const current = await client<DatabaseTaskStatusRow[]>`
SELECT id, queue_id, status, started_at, current_attempt, codex_thread_id, active_turn_id
FROM unidesk_code_queue_tasks
WHERE id = ${task.id}
LIMIT 1
`;
logger("warn", "database_task_stale_unclaimed_write_rejected", {
taskId: task.id,
attemptedQueueId: queueIdOf(task),
attemptedStatus: task.status,
attemptedStartedAt: task.startedAt,
attemptedCurrentAttempt: task.currentAttempt,
attemptedCodexThreadId: task.codexThreadId,
attemptedActiveTurnId: task.activeTurnId,
current: databaseStatusRowJson(current[0] ?? null),
});
return false;
}
task.readAt = timestampToIso(rows[0]?.read_at ?? null);
return true;
}
async function upsertQueueToDatabase(client: SqlExecutor, queue: QueueRecord): Promise<void> {
@@ -1313,9 +1405,21 @@ async function upsertWorkdirsToDatabase(records: WorkdirRecord[]): Promise<void>
interface DatabaseTaskRow {
id: string;
updated_at: Date | string;
status: TaskStatus;
read_at: Date | string | null;
task_json: unknown;
}
interface DatabaseTaskStatusRow {
id: string;
queue_id: string;
status: TaskStatus;
started_at: Date | string | null;
current_attempt: number | string | null;
codex_thread_id: string | null;
active_turn_id: string | null;
}
interface DatabaseQueueRow {
id: string;
name: string;
@@ -1331,7 +1435,13 @@ function normalizeDatabaseTaskRows(rows: DatabaseTaskRow[], source: string): Que
const tasks: QueueTask[] = [];
for (const row of rows) {
try {
tasks.push(normalizeTask(row.task_json as QueueTask));
const taskJson = row.task_json;
if (typeof taskJson !== "object" || taskJson === null || Array.isArray(taskJson)) throw new Error("task_json is not an object");
tasks.push(normalizeTask({
...(taskJson as Record<string, unknown>),
status: row.status,
readAt: timestampToIso(row.read_at),
} as unknown as QueueTask));
} catch (error) {
logger("warn", "database_task_row_ignored", { source, id: String(row.id), error: errorToJson(error) });
}
@@ -1339,13 +1449,170 @@ function normalizeDatabaseTaskRows(rows: DatabaseTaskRow[], source: string): Que
return tasks.sort((left, right) => (timestampMs(left.createdAt) ?? 0) - (timestampMs(right.createdAt) ?? 0) || left.id.localeCompare(right.id));
}
function databaseStatusRowJson(row: DatabaseTaskStatusRow | null): JsonValue {
if (row === null) return null;
return {
id: row.id,
queueId: safeQueueId(row.queue_id),
status: row.status,
startedAt: timestampToIso(row.started_at),
currentAttempt: Number(row.current_attempt ?? 0),
codexThreadId: row.codex_thread_id,
activeTurnId: row.active_turn_id,
};
}
function taskIsUnclaimedMovable(task: QueueTask): boolean {
return (task.status === "queued" || task.status === "retry_wait")
&& task.startedAt === null
&& task.currentAttempt === 0
&& task.codexThreadId === null
&& task.activeTurnId === null;
}
function databaseTaskMoveBlocker(row: DatabaseTaskStatusRow | null): string {
if (row === null) return "task not found";
if (row.status !== "queued" && row.status !== "retry_wait") return `status=${row.status}`;
if (row.started_at !== null) return "task already has started_at";
if (Number(row.current_attempt ?? 0) !== 0) return `task already has current_attempt=${Number(row.current_attempt ?? 0)}`;
if (row.codex_thread_id !== null) return "task already has codex_thread_id";
if (row.active_turn_id !== null) return "task already has active_turn_id";
return "";
}
function taskMoveBlocker(task: QueueTask): string {
if (activeRunForTask(task) !== null) return "task has an active agent run";
if (processingQueues.has(queueIdOf(task))) return "queue processor is currently active";
if (activeRunSlotReservations.has(queueIdOf(task))) return "queue is reserving an active run slot";
if (activeRunSlotWaiters.some((waiter) => waiter.taskId === task.id || waiter.queueId === queueIdOf(task))) return "queue is waiting for an active run slot";
if (task.status !== "queued" && task.status !== "retry_wait") return `status=${task.status}`;
if (!taskIsUnclaimedMovable(task)) return "task has already been claimed";
return "";
}
function reconcileHotTaskFromDatabase(task: QueueTask): QueueTask {
const existing = findTask(task.id);
if (existing === null) return rememberHotTask(task);
if (activeRunForTask(existing) !== null) return existing;
Object.assign(existing, task);
return existing;
}
function taskHasClaimMarkers(task: QueueTask): boolean {
return task.status === "running"
|| task.status === "judging"
|| task.startedAt !== null
|| task.currentAttempt > 0
|| task.codexThreadId !== null
|| task.activeTurnId !== null;
}
function shouldPreferHotTaskOverDatabase(hotTask: QueueTask, databaseTask: QueueTask): boolean {
if (activeRunForTask(hotTask) !== null) return true;
if (taskIsUnclaimedMovable(hotTask) && taskHasClaimMarkers(databaseTask)) return false;
const hotUpdatedAt = timestampMs(hotTask.updatedAt) ?? 0;
const databaseUpdatedAt = timestampMs(databaseTask.updatedAt) ?? 0;
return hotUpdatedAt >= databaseUpdatedAt;
}
async function deleteTaskFromDatabase(taskId: string): Promise<void> {
if (!databaseReady) return;
await sql`
DELETE FROM unidesk_code_queue_tasks
WHERE id = ${taskId}
`;
}
async function claimTaskInDatabase(task: QueueTask, expectedQueueId: string): Promise<boolean> {
if (!databaseReady) return true;
const claimed = await sql.begin(async (client) => await upsertTaskToDatabase(client, task, { claimQueueId: expectedQueueId }));
if (claimed) return true;
const databaseTask = await loadTaskFromDatabase(task.id);
if (databaseTask !== null) reconcileHotTaskFromDatabase(databaseTask);
logger("warn", "task_claim_conflict", {
taskId: task.id,
expectedQueueId,
attemptedQueueId: queueIdOf(task),
attemptedStatus: task.status,
attemptedCurrentAttempt: task.currentAttempt,
});
return false;
}
async function runDatabaseClaimMoveSelfTest(): Promise<JsonValue | null> {
if (!databaseReady) return null;
const suffix = String(Date.now());
const taskId = `codex_claim_move_db_${suffix}`;
const queuedAt = nowIso();
const sourceQueueId = `claim_move_db_source_${suffix}`;
const targetQueueId = `claim_move_db_target_${suffix}`;
const before = state.tasks.slice();
const beforeQueues = state.queues.slice();
await deleteTaskFromDatabase(taskId);
try {
const queuedTask = normalizeTask({
...createTask({ prompt: "claim/move DB race self-test", queueId: sourceQueueId }),
id: taskId,
queueId: sourceQueueId,
queueEnteredAt: queuedAt,
createdAt: queuedAt,
updatedAt: queuedAt,
output: [],
});
await sql.begin(async (client) => {
await upsertQueueToDatabase(client, { id: sourceQueueId, name: sourceQueueId, createdAt: queuedAt, updatedAt: queuedAt });
await upsertTaskToDatabase(client, queuedTask);
});
const staleHotTask = normalizeTask(JSON.parse(JSON.stringify(queuedTask)) as QueueTask);
const claimedTask = normalizeTask(JSON.parse(JSON.stringify(queuedTask)) as QueueTask);
const claimedAt = nowIso();
claimedTask.status = "running";
claimedTask.startedAt = claimedAt;
claimedTask.currentAttempt = 1;
claimedTask.currentMode = "initial";
claimedTask.updatedAt = claimedAt;
const claimed = await claimTaskInDatabase(claimedTask, sourceQueueId);
if (!claimed) throw new Error("database claim self-test failed to claim queued task");
state.tasks.splice(0, state.tasks.length, staleHotTask);
const response = await moveTaskToQueue(staleHotTask, new Request(`http://code-queue.local/api/tasks/${taskId}/move`, {
method: "POST",
body: JSON.stringify({ queueId: targetQueueId }),
headers: { "content-type": "application/json" },
}), { bypassRoleCheck: true });
const after = await loadTaskFromDatabase(taskId);
const body = await response.json() as Record<string, JsonValue>;
if (response.status !== 409) throw new Error(`database stale move should return 409, got ${response.status}`);
if (after === null) throw new Error("database self-test task disappeared after stale move");
if (after.status !== "running") throw new Error(`database self-test task status changed to ${after.status}`);
if (queueIdOf(after) !== sourceQueueId) throw new Error(`database self-test task queue changed to ${queueIdOf(after)}`);
if (after.currentAttempt !== 1 || after.startedAt === null) throw new Error("database self-test task claim markers were lost");
return {
ok: true,
taskId,
moveStatus: response.status,
databaseStatus: after.status,
databaseQueueId: queueIdOf(after),
currentAttempt: after.currentAttempt,
startedAt: after.startedAt,
response: body as JsonValue,
} as unknown as JsonValue;
} finally {
await deleteTaskFromDatabase(taskId);
await deleteDatabaseQueues([sourceQueueId, targetQueueId]);
state.tasks.splice(0, state.tasks.length, ...before);
state.queues.splice(0, state.queues.length, ...beforeQueues);
}
}
async function loadPrunedDatabaseTaskRows(where: "all" | "hot"): Promise<DatabaseTaskRow[]> {
return await sql<DatabaseTaskRow[]>`
SELECT id, updated_at, task_json
SELECT id, updated_at, status, read_at, task_json
FROM (
SELECT
id,
updated_at,
status,
read_at,
jsonb_set(
jsonb_set(
task_json,
@@ -1395,8 +1662,7 @@ async function loadPrunedDatabaseTaskRows(where: "all" | "hot"): Promise<Databas
END,
true
) AS task_json,
created_at,
status
created_at
FROM unidesk_code_queue_tasks
) AS pruned_tasks
WHERE ${where === "all"} OR status IN ('queued', 'running', 'judging', 'retry_wait')
@@ -1437,11 +1703,13 @@ async function loadTasksFromDatabaseByIds(taskIds: string[]): Promise<QueueTask[
const ids = Array.from(new Set(taskIds.map((id) => id.trim()).filter(Boolean)));
if (ids.length === 0) return [];
const rows = await sql<DatabaseTaskRow[]>`
SELECT id, updated_at, task_json
SELECT id, updated_at, status, read_at, task_json
FROM (
SELECT
id,
updated_at,
status,
read_at,
task_json - 'output' - 'events' - 'attempts' - 'promptHistory' AS task_json
FROM unidesk_code_queue_tasks
WHERE id IN ${sql(ids)}
@@ -1452,11 +1720,13 @@ async function loadTasksFromDatabaseByIds(taskIds: string[]): Promise<QueueTask[
async function loadTaskFromDatabase(taskId: string): Promise<QueueTask | null> {
const rows = await sql<DatabaseTaskRow[]>`
SELECT id, updated_at, task_json
SELECT id, updated_at, status, read_at, task_json
FROM (
SELECT
id,
updated_at,
status,
read_at,
jsonb_set(
jsonb_set(
task_json,
@@ -1636,9 +1906,9 @@ async function findTaskForRead(taskId: string): Promise<QueueTask | null> {
if (!databaseReady) return hotTask;
try {
const databaseTask = await loadTaskFromDatabase(taskId);
if (hotTask === null) return databaseTask;
if (databaseTask === null) return hotTask;
if (hotTask !== null && (timestampMs(hotTask.updatedAt) ?? 0) > (timestampMs(databaseTask.updatedAt) ?? 0)) return hotTask;
return databaseTask;
return shouldPreferHotTaskOverDatabase(hotTask, databaseTask) ? hotTask : databaseTask;
} catch (error) {
databaseLastError = databaseErrorMessage(error);
logger("warn", "read_database_fallback", { taskId, error: errorToJson(error) });
@@ -1649,8 +1919,16 @@ async function findTaskForRead(taskId: string): Promise<QueueTask | null> {
async function findTaskForMutation(taskId: string): Promise<QueueTask | null> {
const hotTask = findTask(taskId);
if (!databaseReady) return hotTask;
const task = hotTask ?? await loadTaskFromDatabase(taskId);
return task === null ? null : rememberHotTask(task);
try {
const databaseTask = await loadTaskFromDatabase(taskId);
if (databaseTask === null) return hotTask;
if (hotTask === null) return rememberHotTask(databaseTask);
return shouldPreferHotTaskOverDatabase(hotTask, databaseTask) ? hotTask : reconcileHotTaskFromDatabase(databaseTask);
} catch (error) {
databaseLastError = databaseErrorMessage(error);
logger("warn", "mutation_database_fallback", { taskId, error: errorToJson(error) });
return hotTask;
}
}
async function loadNextSeqFromDatabase(): Promise<number> {
@@ -1663,6 +1941,11 @@ async function loadNextSeqFromDatabase(): Promise<number> {
}
async function flushDirtyTasksToDatabase(force = false): Promise<void> {
if (serviceRoleReadOnly(config.serviceRole)) {
dirtyDatabaseTaskIds.clear();
dirtyDatabaseQueueIds.clear();
return;
}
if (!databaseReady) return;
if (databaseFlushInFlight && !force) {
scheduleDatabaseFlush();
@@ -1674,6 +1957,7 @@ async function flushDirtyTasksToDatabase(force = false): Promise<void> {
dirtyDatabaseTaskIds.clear();
dirtyDatabaseQueueIds.clear();
databaseFlushInFlight = true;
const rejectedTaskIds: string[] = [];
try {
await sql.begin(async (client) => {
for (const id of queueIds) {
@@ -1682,7 +1966,7 @@ async function flushDirtyTasksToDatabase(force = false): Promise<void> {
}
for (const id of ids) {
const task = state.tasks.find((item) => item.id === id);
if (task !== undefined) await upsertTaskToDatabase(client, task);
if (task !== undefined && !await upsertTaskToDatabase(client, task)) rejectedTaskIds.push(id);
}
});
databaseLastError = null;
@@ -1694,93 +1978,99 @@ async function flushDirtyTasksToDatabase(force = false): Promise<void> {
databaseFlushInFlight = false;
if (dirtyDatabaseTaskIds.size > 0 || dirtyDatabaseQueueIds.size > 0) scheduleDatabaseFlush();
}
for (const id of rejectedTaskIds) {
const databaseTask = await loadTaskFromDatabase(id);
if (databaseTask !== null) reconcileHotTaskFromDatabase(databaseTask);
}
}
async function initDatabasePersistence(): Promise<void> {
logger("info", "database_persistence_init_start", { databaseUrl: redactDatabaseUrl(config.databaseUrl) });
await sql`
CREATE TABLE IF NOT EXISTS unidesk_code_queue_tasks (
id TEXT PRIMARY KEY,
queue_id TEXT NOT NULL DEFAULT 'default',
status TEXT NOT NULL,
provider_id TEXT NOT NULL DEFAULT 'main-server',
execution_mode TEXT NOT NULL DEFAULT 'default',
model TEXT NOT NULL,
cwd TEXT NOT NULL,
prompt TEXT NOT NULL,
base_prompt TEXT NOT NULL DEFAULT '',
reference_task_ids JSONB NOT NULL DEFAULT '[]'::jsonb,
reference_injection JSONB,
reasoning_effort TEXT,
max_attempts INTEGER NOT NULL,
current_attempt INTEGER NOT NULL DEFAULT 0,
current_mode TEXT,
codex_thread_id TEXT,
active_turn_id TEXT,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL,
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
read_at TIMESTAMPTZ,
last_error TEXT,
last_judge JSONB,
output_count INTEGER NOT NULL DEFAULT 0,
event_count INTEGER NOT NULL DEFAULT 0,
attempt_count INTEGER NOT NULL DEFAULT 0,
last_output_seq BIGINT NOT NULL DEFAULT 0,
task_json JSONB NOT NULL
)
`;
await sql`
CREATE TABLE IF NOT EXISTS unidesk_code_queue_queues (
id TEXT PRIMARY KEY,
name TEXT NOT NULL DEFAULT '',
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL
)
`;
await sql`
CREATE TABLE IF NOT EXISTS unidesk_code_queue_workdirs (
provider_id TEXT NOT NULL,
execution_mode TEXT NOT NULL DEFAULT 'default',
path TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (provider_id, execution_mode, path)
)
`;
await sql`
CREATE TABLE IF NOT EXISTS unidesk_code_queue_notifications (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL,
dedup_key TEXT NOT NULL,
target TEXT NOT NULL,
message TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL,
attempts INTEGER NOT NULL DEFAULT 0,
next_attempt_at TIMESTAMPTZ NOT NULL,
last_error TEXT,
sent_at TIMESTAMPTZ
)
`;
await sql`ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS queue_id TEXT NOT NULL DEFAULT 'default'`;
await sql`ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS provider_id TEXT NOT NULL DEFAULT 'main-server'`;
await sql`ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS execution_mode TEXT NOT NULL DEFAULT 'default'`;
await sql`ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS base_prompt TEXT NOT NULL DEFAULT ''`;
await sql`ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS reference_task_ids JSONB NOT NULL DEFAULT '[]'::jsonb`;
await sql`ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS reference_injection JSONB`;
await sql`ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS read_at TIMESTAMPTZ`;
await sql`
UPDATE unidesk_code_queue_tasks
SET read_at = NULLIF(task_json->>'readAt', '')::timestamptz
WHERE read_at IS NULL
AND status IN ('succeeded', 'failed', 'canceled')
AND COALESCE(task_json->>'readAt', '') <> ''
AND (task_json->>'readAt') ~ '^\\d{4}-\\d{2}-\\d{2}T'
`;
await sql`ALTER TABLE unidesk_code_queue_queues ADD COLUMN IF NOT EXISTS name TEXT NOT NULL DEFAULT ''`;
await sql`ALTER TABLE unidesk_code_queue_workdirs ADD COLUMN IF NOT EXISTS execution_mode TEXT NOT NULL DEFAULT 'default'`;
if (!serviceRoleReadOnly(config.serviceRole)) {
await sql`
CREATE TABLE IF NOT EXISTS unidesk_code_queue_tasks (
id TEXT PRIMARY KEY,
queue_id TEXT NOT NULL DEFAULT 'default',
status TEXT NOT NULL,
provider_id TEXT NOT NULL DEFAULT 'main-server',
execution_mode TEXT NOT NULL DEFAULT 'default',
model TEXT NOT NULL,
cwd TEXT NOT NULL,
prompt TEXT NOT NULL,
base_prompt TEXT NOT NULL DEFAULT '',
reference_task_ids JSONB NOT NULL DEFAULT '[]'::jsonb,
reference_injection JSONB,
reasoning_effort TEXT,
max_attempts INTEGER NOT NULL,
current_attempt INTEGER NOT NULL DEFAULT 0,
current_mode TEXT,
codex_thread_id TEXT,
active_turn_id TEXT,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL,
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
read_at TIMESTAMPTZ,
last_error TEXT,
last_judge JSONB,
output_count INTEGER NOT NULL DEFAULT 0,
event_count INTEGER NOT NULL DEFAULT 0,
attempt_count INTEGER NOT NULL DEFAULT 0,
last_output_seq BIGINT NOT NULL DEFAULT 0,
task_json JSONB NOT NULL
)
`;
await sql`
CREATE TABLE IF NOT EXISTS unidesk_code_queue_queues (
id TEXT PRIMARY KEY,
name TEXT NOT NULL DEFAULT '',
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL
)
`;
await sql`
CREATE TABLE IF NOT EXISTS unidesk_code_queue_workdirs (
provider_id TEXT NOT NULL,
execution_mode TEXT NOT NULL DEFAULT 'default',
path TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (provider_id, execution_mode, path)
)
`;
await sql`
CREATE TABLE IF NOT EXISTS unidesk_code_queue_notifications (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL,
dedup_key TEXT NOT NULL,
target TEXT NOT NULL,
message TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL,
attempts INTEGER NOT NULL DEFAULT 0,
next_attempt_at TIMESTAMPTZ NOT NULL,
last_error TEXT,
sent_at TIMESTAMPTZ
)
`;
await sql`ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS queue_id TEXT NOT NULL DEFAULT 'default'`;
await sql`ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS provider_id TEXT NOT NULL DEFAULT 'main-server'`;
await sql`ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS execution_mode TEXT NOT NULL DEFAULT 'default'`;
await sql`ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS base_prompt TEXT NOT NULL DEFAULT ''`;
await sql`ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS reference_task_ids JSONB NOT NULL DEFAULT '[]'::jsonb`;
await sql`ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS reference_injection JSONB`;
await sql`ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS read_at TIMESTAMPTZ`;
await sql`
UPDATE unidesk_code_queue_tasks
SET read_at = NULLIF(task_json->>'readAt', '')::timestamptz
WHERE read_at IS NULL
AND status IN ('succeeded', 'failed', 'canceled')
AND COALESCE(task_json->>'readAt', '') <> ''
AND (task_json->>'readAt') ~ '^\\d{4}-\\d{2}-\\d{2}T'
`;
await sql`ALTER TABLE unidesk_code_queue_queues ADD COLUMN IF NOT EXISTS name TEXT NOT NULL DEFAULT ''`;
await sql`ALTER TABLE unidesk_code_queue_workdirs ADD COLUMN IF NOT EXISTS execution_mode TEXT NOT NULL DEFAULT 'default'`;
}
const countRows = await sql<Array<{ count: string | number }>>`SELECT COUNT(*) AS count FROM unidesk_code_queue_tasks`;
const hotTasks = await loadTasksFromDatabase("hot");
@@ -1825,7 +2115,7 @@ async function initDatabasePersistence(): Promise<void> {
}
}
ensureDefaultWorkdirRecords();
await upsertWorkdirsToDatabase(sortedWorkdirRecords());
if (!serviceRoleReadOnly(config.serviceRole)) await upsertWorkdirsToDatabase(sortedWorkdirRecords());
databaseReady = true;
if (config.serviceRole === "combined" || config.serviceRole === "scheduler") scheduleStartupDatabaseMaintenance();
runGarbageCollection();
@@ -2311,8 +2601,9 @@ configureTaskOutput({
onOutputAppended: (task, output, op) => {
const archiveOp = op === "append" ? "append" : "set";
const stepChanged = recordTaskOutputMetrics(task, output, archiveOp);
if (stepChanged) publishCodeQueueTraceStep(task, queueIdOf(task), output, taskOutputMaxSeq(task));
else if (archiveOp === "append" && outputUpdatesExistingTraceStep(output)) publishCodeQueueTraceStep(task, queueIdOf(task), output, taskOutputMaxSeq(task), null, String(output.text || "").length);
const projectionOutput = traceStepOutputForProjection(task, output);
if (stepChanged) publishCodeQueueTraceStep(task, queueIdOf(task), projectionOutput, taskOutputMaxSeq(task));
else if ((archiveOp === "append" || output.method === "item/completed") && outputUpdatesExistingTraceStep(output)) publishCodeQueueTraceStep(task, queueIdOf(task), projectionOutput, taskOutputMaxSeq(task), null, String(output.text || "").length);
if (archiveOp === "append" && !outputCanChangeStepCount(output)) return;
publishTaskOaEvent(task, "output", { onlyStepChange: archiveOp === "append", stepChanged });
},
@@ -2416,6 +2707,7 @@ configureSelfTests({
defaultQueueId,
enqueueActiveRunSlotWaiter,
injectReferencedTaskContext,
moveTaskToQueueForTest: (task, req) => moveTaskToQueue(task, req, { bypassRoleCheck: true }),
nextRunnableTaskFrom,
normalizeTask,
nowIso,
@@ -2424,6 +2716,8 @@ configureSelfTests({
queuedStatusReason,
removeActiveRunSlotWaiter,
resolveReasoningEffort,
runDatabaseClaimMoveSelfTest,
tasks: () => state.tasks,
updateProcessingFlag,
});
@@ -2957,7 +3251,8 @@ function failTaskForFallbackRetryLimit(task: QueueTask, judge: JudgeResult | nul
}
async function runTask(task: QueueTask): Promise<void> {
logger("info", "task_processor_start", { taskId: task.id, queueId: queueIdOf(task), providerId: task.providerId, executionMode: task.executionMode, cwd: task.cwd, maxAttempts: task.maxAttempts, model: task.model, agentPort: codeAgentPortForModel(task.model), promptPreview: safePreview(task.prompt, 240) });
const claimQueueId = queueIdOf(task);
logger("info", "task_processor_start", { taskId: task.id, queueId: claimQueueId, providerId: task.providerId, executionMode: task.executionMode, cwd: task.cwd, maxAttempts: task.maxAttempts, model: task.model, agentPort: codeAgentPortForModel(task.model), promptPreview: safePreview(task.prompt, 240) });
if (task.status === "retry_wait" && task.lastJudge?.source === "fallback" && task.lastJudge.decision === "retry" && fallbackJudgeRetryCount(task) >= fallbackJudgeRetryLimit) {
failTaskForFallbackRetryLimit(task, task.lastJudge);
return;
@@ -2988,6 +3283,11 @@ async function runTask(task: QueueTask): Promise<void> {
task.readAt = null;
task.finishedAt = null;
task.updatedAt = startedAt;
if (!await claimTaskInDatabase(task, claimQueueId)) {
releaseRunSlot();
return;
}
publishTaskOaEvent(task, "claim");
logger("info", "task_run_start", { taskId: task.id, queueId: queueIdOf(task), attempt: task.currentAttempt, mode, providerId: task.providerId, executionMode: task.executionMode, cwd: task.cwd, maxAttempts: task.maxAttempts, model: task.model, agentPort: codeAgentPortForModel(task.model), freshRecovery: needsFreshRecoveryPrompt });
const attemptStartOutput = appendOutput(task, "system", `attempt ${task.currentAttempt}/${task.maxAttempts} queue=${queueIdOf(task)} provider=${task.providerId} executionMode=${task.executionMode} cwd=${task.cwd} mode=${mode} model=${task.model} port=${codeAgentPortForModel(task.model)}\n`, "queue");
@@ -4011,7 +4311,9 @@ function queueMergeBlocker(queueId: string): string | null {
if (activeRunSlotReservations.has(queueId)) return "queue is reserving an active run slot";
if (activeRunSlotWaiters.some((waiter) => waiter.queueId === queueId)) return "queue is waiting for an active run slot";
const activeTask = state.tasks.find((task) => queueIdOf(task) === queueId && (task.status === "running" || task.status === "judging"));
return activeTask === undefined ? null : `task ${activeTask.id} is ${activeTask.status}`;
if (activeTask !== undefined) return `task ${activeTask.id} is ${activeTask.status}`;
const claimedPendingTask = state.tasks.find((task) => queueIdOf(task) === queueId && (task.status === "queued" || task.status === "retry_wait") && !taskIsUnclaimedMovable(task));
return claimedPendingTask === undefined ? null : `task ${claimedPendingTask.id} has already been claimed`;
}
function parseSourceQueueIds(record: Record<string, unknown>, targetQueueId: string): string[] {
@@ -4034,27 +4336,117 @@ function parseSourceQueueIds(record: Record<string, unknown>, targetQueueId: str
return ids;
}
async function mergeDatabaseQueueTasks(sourceQueueIds: string[], targetQueueId: string): Promise<string[]> {
if (!databaseReady || sourceQueueIds.length === 0) return [];
const rows = await sql<Array<{ id: string }>>`
UPDATE unidesk_code_queue_tasks
SET
queue_id = ${targetQueueId},
task_json = jsonb_set(
jsonb_set(
task_json,
'{queueId}',
to_jsonb(${targetQueueId}::text),
async function mergeDatabaseQueueTasks(sourceQueueIds: string[], targetQueueId: string, mergedAt: string): Promise<{ movedTaskIds: string[]; blocker: DatabaseTaskStatusRow | null }> {
if (!databaseReady || sourceQueueIds.length === 0) return { movedTaskIds: [], blocker: null };
return await sql.begin(async (client) => {
const mergeQueueIds = Array.from(new Set([targetQueueId, ...sourceQueueIds]));
const lockedRows = await client<DatabaseTaskStatusRow[]>`
SELECT id, queue_id, status, started_at, current_attempt, codex_thread_id, active_turn_id
FROM unidesk_code_queue_tasks
WHERE queue_id IN ${client(mergeQueueIds)}
ORDER BY updated_at DESC, id DESC
FOR UPDATE
`;
const blocker = lockedRows.find((row) => {
return row.status === "running"
|| row.status === "judging"
|| (
(row.status === "queued" || row.status === "retry_wait")
&& (
row.started_at !== null
|| Number(row.current_attempt ?? 0) > 0
|| row.codex_thread_id !== null
|| row.active_turn_id !== null
)
);
}) ?? null;
if (blocker !== null) return { movedTaskIds: [], blocker };
const rows = await client<Array<{ id: string }>>`
UPDATE unidesk_code_queue_tasks
SET
queue_id = ${targetQueueId},
updated_at = ${mergedAt},
task_json = jsonb_set(
jsonb_set(
jsonb_set(
task_json,
'{queueId}',
to_jsonb(${targetQueueId}::text),
true
),
'{queueEnteredAt}',
to_jsonb(COALESCE(NULLIF(task_json->>'queueEnteredAt', ''), to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'))::text),
true
),
'{updatedAt}',
to_jsonb(${mergedAt}::text),
true
),
'{queueEnteredAt}',
to_jsonb(COALESCE(NULLIF(task_json->>'queueEnteredAt', ''), to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'))::text),
true
)
WHERE queue_id IN ${sql(sourceQueueIds)}
RETURNING id
`;
return rows.map((row) => row.id);
)
WHERE queue_id IN ${client(sourceQueueIds)}
AND (
status IN ('succeeded', 'failed', 'canceled')
OR (
status IN ('queued', 'retry_wait')
AND started_at IS NULL
AND current_attempt = 0
AND codex_thread_id IS NULL
AND active_turn_id IS NULL
)
)
RETURNING id
`;
return { movedTaskIds: rows.map((row) => row.id), blocker: null };
});
}
async function moveDatabaseTaskToQueue(taskId: string, targetQueueId: string, movedAt: string): Promise<{ ok: boolean; row: DatabaseTaskStatusRow | null; previousQueueId: string | null; blocker: string }> {
if (!databaseReady) return { ok: true, row: null, previousQueueId: null, blocker: "" };
return await sql.begin(async (client) => {
const rows = await client<DatabaseTaskStatusRow[]>`
SELECT id, queue_id, status, started_at, current_attempt, codex_thread_id, active_turn_id
FROM unidesk_code_queue_tasks
WHERE id = ${taskId}
LIMIT 1
FOR UPDATE
`;
const row = rows[0] ?? null;
const blocker = databaseTaskMoveBlocker(row);
if (blocker.length > 0) return { ok: false, row, previousQueueId: row === null ? null : safeQueueId(row.queue_id), blocker };
const previousQueueId = safeQueueId(row?.queue_id);
const updated = await client<DatabaseTaskStatusRow[]>`
UPDATE unidesk_code_queue_tasks
SET
queue_id = ${targetQueueId},
updated_at = ${movedAt},
task_json = jsonb_set(
jsonb_set(
jsonb_set(
task_json,
'{queueId}',
to_jsonb(${targetQueueId}::text),
true
),
'{queueEnteredAt}',
to_jsonb(${movedAt}::text),
true
),
'{updatedAt}',
to_jsonb(${movedAt}::text),
true
)
WHERE id = ${taskId}
AND status IN ('queued', 'retry_wait')
AND started_at IS NULL
AND current_attempt = 0
AND codex_thread_id IS NULL
AND active_turn_id IS NULL
RETURNING id, queue_id, status, started_at, current_attempt, codex_thread_id, active_turn_id
`;
const updatedRow = updated[0] ?? null;
return updatedRow === null
? { ok: false, row, previousQueueId, blocker: "conditional update matched no rows" }
: { ok: true, row: updatedRow, previousQueueId, blocker: "" };
});
}
async function deleteDatabaseQueues(queueIds: string[]): Promise<string[]> {
@@ -4114,6 +4506,17 @@ async function mergeQueues(targetQueueIdValue: string | null, req: Request): Pro
}
const mergedAt = nowIso();
const databaseMerge = await mergeDatabaseQueueTasks(sourceQueueIds, targetQueueId, mergedAt);
if (databaseMerge.blocker !== null) {
const blockerQueueId = safeQueueId(databaseMerge.blocker.queue_id);
const databaseTask = await loadTaskFromDatabase(databaseMerge.blocker.id);
if (databaseTask !== null) reconcileHotTaskFromDatabase(databaseTask);
return jsonResponse({
ok: false,
error: `cannot merge queue ${blockerQueueId}: task ${databaseMerge.blocker.id} is already claimed (${databaseTaskMoveBlocker(databaseMerge.blocker) || databaseMerge.blocker.status})`,
blocker: databaseStatusRowJson(databaseMerge.blocker),
}, 409);
}
const targetQueue = ensureQueue(targetQueueId);
const sourceQueues = sourceQueueIds.map((id) => queueSnapshot(id, mergedAt));
targetQueue.updatedAt = mergedAt;
@@ -4126,11 +4529,11 @@ async function mergeQueues(targetQueueIdValue: string | null, req: Request): Pro
if (!sourceSet.has(previousQueueId)) continue;
task.queueEnteredAt = taskQueueEnteredAt(task);
task.queueId = targetQueueId;
task.updatedAt = mergedAt;
hotMovedTasks.push(task);
markTaskDirty(task.id);
publishTaskOaEvent(task, "queue-merged");
}
const databaseMovedTaskIds = await mergeDatabaseQueueTasks(sourceQueueIds, targetQueueId);
const deletedSourceQueues = deleteQueuesFromState(sourceQueueIds);
const databaseDeletedQueueIds = await deleteDatabaseQueues(sourceQueueIds);
persistState(false);
@@ -4142,14 +4545,14 @@ async function mergeQueues(targetQueueIdValue: string | null, req: Request): Pro
sourceQueueIds,
deletedSourceQueueIds: deletedSourceQueues.map((queue) => queue.id),
hotMovedTaskCount: hotMovedTasks.length,
databaseMovedTaskCount: databaseReady ? databaseMovedTaskIds.length : null,
databaseMovedTaskCount: databaseReady ? databaseMerge.movedTaskIds.length : null,
databaseDeletedQueueIds: databaseReady ? databaseDeletedQueueIds : null,
});
for (const id of mergeQueueIds) mergingQueues.delete(id);
scheduleQueue(targetQueueId);
await flushDirtyTasksToDatabase(true);
const tasks = await loadAllTasksForRead();
const movedIdSet = new Set(databaseReady ? databaseMovedTaskIds : hotMovedTasks.map((task) => task.id));
const movedIdSet = new Set(databaseReady ? databaseMerge.movedTaskIds : hotMovedTasks.map((task) => task.id));
const orderedMovedTaskIds = tasks
.filter((task) => movedIdSet.has(task.id))
.sort(compareTaskQueueOrder)
@@ -4162,7 +4565,7 @@ async function mergeQueues(targetQueueIdValue: string | null, req: Request): Pro
ok: true,
targetQueueId,
sourceQueueIds,
mergedTaskCount: databaseReady ? databaseMovedTaskIds.length : hotMovedTasks.length,
mergedTaskCount: databaseReady ? databaseMerge.movedTaskIds.length : hotMovedTasks.length,
movedTaskIds: orderedMovedTaskIds.slice(0, 500),
targetTaskOrder: targetTaskOrder.slice(0, 500),
order: "merged tasks keep their original queueEnteredAt/createdAt ordering; source queue records are deleted after merge",
@@ -4178,19 +4581,39 @@ async function mergeQueues(targetQueueIdValue: string | null, req: Request): Pro
}
}
async function moveTaskToQueue(task: QueueTask, req: Request): Promise<Response> {
if (!serviceRoleAllowsWrite(config.serviceRole)) return readOnlyRejectResponse(req.method, `/api/tasks/${task.id}/move`);
async function moveTaskToQueue(task: QueueTask, req: Request, options: { bypassRoleCheck?: boolean } = {}): Promise<Response> {
if (options.bypassRoleCheck !== true && !serviceRoleAllowsWrite(config.serviceRole)) return readOnlyRejectResponse(req.method, `/api/tasks/${task.id}/move`);
const notReady = requireDatabaseReadyForWrite(req.method, `/api/tasks/${task.id}/move`);
if (notReady !== null) return notReady;
if (task.status === "running" || task.status === "judging") {
return jsonResponse({ ok: false, error: `cannot move active task ${task.id} while status=${task.status}`, task: taskForResponse(task) }, 409);
}
const body = await readJson(req);
const record = typeof body === "object" && body !== null && !Array.isArray(body) ? body as Record<string, unknown> : {};
const queueId = normalizeQueueId(record.queueId ?? record.id);
const previousQueueId = queueIdOf(task);
const queue = ensureQueue(queueId);
const movedAt = nowIso();
const hotBlocker = taskMoveBlocker(task);
if (hotBlocker.length > 0) {
const databaseTask = databaseReady ? await loadTaskFromDatabase(task.id) : null;
if (databaseTask !== null) task = reconcileHotTaskFromDatabase(databaseTask);
return jsonResponse({
ok: false,
error: `cannot move task ${task.id}: ${hotBlocker}`,
task: taskForResponse(task),
databaseTask: databaseTask === null ? null : taskForResponse(databaseTask),
}, 409);
}
const databaseMove = await moveDatabaseTaskToQueue(task.id, queueId, movedAt);
if (!databaseMove.ok) {
const databaseTask = databaseReady ? await loadTaskFromDatabase(task.id) : null;
if (databaseTask !== null) task = reconcileHotTaskFromDatabase(databaseTask);
return jsonResponse({
ok: false,
error: `cannot move task ${task.id}: ${databaseMove.blocker}`,
blocker: databaseStatusRowJson(databaseMove.row),
task: taskForResponse(task),
databaseTask: databaseTask === null ? null : taskForResponse(databaseTask),
}, databaseMove.row === null ? 404 : 409);
}
const previousQueueId = databaseMove.previousQueueId ?? queueIdOf(task);
const queue = ensureQueue(queueId);
queue.updatedAt = movedAt;
markQueueDirty(queue.id);
task.queueId = queueId;
@@ -4224,9 +4647,14 @@ async function backfillOaTraceStats(url: URL): Promise<JsonValue> {
const attemptBySeq = outputAttemptIndexMap(output);
if (includeSteps) {
for (const item of output) {
if (!outputStartsTraceStepInHistory(output, item)) continue;
publishCodeQueueTraceStep(task, queueId, item, outputMaxSeq, attemptBySeq.get(item.seq) ?? null);
stepEventCount += 1;
const projectionOutput = traceStepOutputForProjection(task, item);
if (outputStartsTraceStepInHistory(output, item)) {
publishCodeQueueTraceStep(task, queueId, projectionOutput, outputMaxSeq, attemptBySeq.get(item.seq) ?? null);
stepEventCount += 1;
} else if (outputUpdatesExistingTraceStep(item)) {
publishCodeQueueTraceStep(task, queueId, projectionOutput, outputMaxSeq, attemptBySeq.get(projectionOutput.seq) ?? attemptBySeq.get(item.seq) ?? null, String(item.text || "").length);
stepEventCount += 1;
}
}
}
publishCodeQueueTraceStatsSnapshot(task, queueId, "backfill", traceStats.stepCount, outputMaxSeq, traceStats);
@@ -4315,6 +4743,7 @@ async function route(req: Request): Promise<Response> {
if (url.pathname === "/api/judge/probe" && (req.method === "GET" || req.method === "POST")) return await runJudgeProbe();
if (url.pathname === "/api/judge/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(runJudgeInfraSelfTest());
if (url.pathname === "/api/queue-order/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(runQueueOrderingSelfTest());
if (url.pathname === "/api/queue-claim-move/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(await runQueueClaimMoveSelfTest());
if (url.pathname === "/api/reference-injection/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(await runReferenceInjectionSelfTest());
if (url.pathname === "/api/trace-port/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(runTracePortSelfTest());
if (url.pathname === "/api/oa/backfill" && (req.method === "GET" || req.method === "POST")) return jsonResponse(await backfillOaTraceStats(url));
@@ -198,7 +198,7 @@ function normalizeCommandText(text: string): string {
function commandKind(command: string): "read" | "edit" | "run" {
if (/\b(apply_patch|git apply|cat >|tee .*<<|sed -i|python3? .*write_text|write|patch|edit|delete|create)\b/iu.test(command)) return "edit";
if (/\b(rg|grep|find|ls|cat|sed -n|tail|head|git status|git diff|ps|read|glob|search|view)\b/iu.test(command)) return "read";
if (/\b(rg|grep|find|ls|cat|sed -n|tail|head|git status|git diff|ps|read|glob|search|view|webSearch)\b/iu.test(command)) return "read";
return "run";
}
@@ -240,6 +240,7 @@ export function outputTraceKind(output: LiveOutput): "read" | "edit" | "run" | "
if (output.channel === "assistant" || output.channel === "user" || output.channel === "reasoning") return "message";
if (output.channel === "tool") {
const record = openCodeToolRecord(output);
if (record === null) return commandKind(normalizeCommandText(output.text));
const part = record?.part && typeof record.part === "object" && !Array.isArray(record.part) ? record.part as Record<string, unknown> : null;
const state = part?.state && typeof part.state === "object" && !Array.isArray(part.state) ? part.state as Record<string, unknown> : null;
const input = state?.input && typeof state.input === "object" && !Array.isArray(state.input) ? state.input as Record<string, unknown> : null;
@@ -388,7 +389,7 @@ export function publishCodeQueueTraceStep(task: QueueTask, queueId: string, outp
title: outputTitle(output, kind),
status: task.status,
summaryLines: outputSummaryLines(output),
rawSeqs: [output.seq],
rawSeqs: outputRawSeqs(output),
},
});
}
@@ -519,6 +520,14 @@ function numberList(value: unknown, fallback: number): number[] {
return values.length > 0 ? values : [fallback];
}
function outputRawSeqs(output: LiveOutput): number[] {
const rawSeqs = (output as LiveOutput & { rawSeqs?: unknown }).rawSeqs;
const values = Array.isArray(rawSeqs)
? rawSeqs.map((item) => Number(item)).filter((item) => Number.isFinite(item)).map((item) => Math.floor(item))
: [];
return values.length > 0 ? Array.from(new Set(values)) : [output.seq];
}
function commandLifecycleStatus(payload: JsonRecord, title: string, summaryLines: string[]): string {
const source = [title, ...summaryLines].join("\n");
const status = /\bstatus=([A-Za-z0-9_-]+)/u.exec(source)?.[1];
@@ -552,6 +561,24 @@ function traceStepFromEvent(event: unknown): OaTraceStepSummary | null {
};
}
function traceStepLifecycleRank(step: OaTraceStepSummary): number {
const source = [step.title, step.status, ...step.summaryLines].join("\n");
if (/\bitem\/completed\b|status=completed\b|\bcompleted\b/iu.test(source)) return 2;
if (/\bitem\/started\b|status=inProgress\b|\binProgress\b/iu.test(source)) return 1;
return 0;
}
function mergeOaTraceStepSummary(existing: OaTraceStepSummary | undefined, incoming: OaTraceStepSummary): OaTraceStepSummary {
if (existing === undefined) return incoming;
const selected = traceStepLifecycleRank(incoming) >= traceStepLifecycleRank(existing) ? incoming : existing;
return {
...existing,
...selected,
eventSequence: Math.max(existing.eventSequence, incoming.eventSequence),
rawSeqs: Array.from(new Set([...existing.rawSeqs, ...incoming.rawSeqs])),
};
}
function eventNextAfterSeq(body: Record<string, unknown>, events: unknown[], fallback: number): number {
const bodyNext = Number(body.nextAfterSeq);
const eventNext = events.reduce<number>((max, event) => {
@@ -609,7 +636,7 @@ export async function readOaTraceStepsForTask(taskId: string, attemptIndex: numb
const events = Array.isArray(body.events) ? body.events : [];
for (const event of events) {
const step = traceStepFromEvent(event);
if (step !== null) bySeq.set(step.seq, { ...(bySeq.get(step.seq) ?? {}), ...step });
if (step !== null) bySeq.set(step.seq, mergeOaTraceStepSummary(bySeq.get(step.seq), step));
}
const nextAfterSeq = eventNextAfterSeq(body, events, afterSeq);
if (events.length < traceStepReadPageLimit || nextAfterSeq <= afterSeq) break;
@@ -497,7 +497,8 @@ async function loadAllTasksForRead(): Promise<QueueTask[]> {
const tasks = await ctx().loadTasksFromDatabase("all");
const byId = new Map(tasks.map((task) => [task.id, task]));
for (const active of ctx().tasks()) {
mergeReadTaskByFreshness(byId, active);
const databaseTask = byId.get(active.id);
if (databaseTask === undefined || preferHotTaskForRead(active, databaseTask)) byId.set(active.id, active);
}
ctx().runGarbageCollection();
return Array.from(byId.values()).sort((left, right) => (timestampMs(left.createdAt) ?? 0) - (timestampMs(right.createdAt) ?? 0) || left.id.localeCompare(right.id));
@@ -652,6 +653,30 @@ function activePriority(task: QueueTask): number {
return statusRank[task.status] ?? 9;
}
function taskHasClaimMarkers(task: QueueTask): boolean {
return task.status === "running"
|| task.status === "judging"
|| task.startedAt !== null
|| task.currentAttempt > 0
|| task.codexThreadId !== null
|| task.activeTurnId !== null;
}
function taskIsUnclaimedQueued(task: QueueTask): boolean {
return (task.status === "queued" || task.status === "retry_wait")
&& task.startedAt === null
&& task.currentAttempt === 0
&& task.codexThreadId === null
&& task.activeTurnId === null;
}
function preferHotTaskForRead(hotTask: QueueTask, databaseTask: QueueTask): boolean {
const hotActiveRun = Array.from(ctx().activeRuns.values()).some((run) => run.taskId === hotTask.id);
if (hotActiveRun) return true;
if (taskIsUnclaimedQueued(hotTask) && taskHasClaimMarkers(databaseTask)) return false;
return taskUpdatedSortValue(hotTask) >= taskUpdatedSortValue(databaseTask);
}
function taskUpdatedSortValue(task: QueueTask): number {
const time = Date.parse(task.updatedAt || task.createdAt);
return Number.isFinite(time) ? time : 0;
@@ -1137,7 +1162,8 @@ async function databaseTasksOverviewResponse(url: URL): Promise<Response | null>
const byId = new Map<string, QueueTask>();
for (const task of loadedTasks) byId.set(task.id, task);
for (const task of ctx().tasks()) {
if (seenIds.has(task.id) || byId.has(task.id)) mergeReadTaskByFreshness(byId, task);
const databaseTask = byId.get(task.id);
if ((seenIds.has(task.id) || databaseTask !== undefined) && (databaseTask === undefined || preferHotTaskForRead(task, databaseTask))) byId.set(task.id, task);
}
const rowsSource = orderedIds
.map((id) => byId.get(id) ?? null)
@@ -17,6 +17,7 @@ export interface SelfTestsContext {
defaultQueueId: string;
enqueueActiveRunSlotWaiter: (task: QueueTask) => ActiveRunSlotWaiter;
injectReferencedTaskContext: (request: QueueTaskRequest, finder?: (id: string) => QueueTask | null | Promise<QueueTask | null>, injectedAt?: string) => Promise<QueueTaskRequest>;
moveTaskToQueueForTest: (task: QueueTask, req: Request) => Promise<Response>;
nextRunnableTaskFrom: (queueId: string, tasks?: QueueTask[]) => QueueTask | null;
normalizeTask: (task: QueueTask) => QueueTask;
nowIso: () => string;
@@ -25,6 +26,8 @@ export interface SelfTestsContext {
queuedStatusReason: (task: QueueTask, tasks?: QueueTask[]) => QueuedStatusReason | null;
removeActiveRunSlotWaiter: (waiter: ActiveRunSlotWaiter) => void;
resolveReasoningEffort: (model: string, explicit?: string | null) => string | null;
runDatabaseClaimMoveSelfTest?: () => Promise<JsonValue | null>;
tasks: () => QueueTask[];
updateProcessingFlag: () => void;
}
@@ -192,6 +195,45 @@ function queueOrderTestTask(id: string, status: TaskStatus, createdAt: string, q
return ctx().normalizeTask(task);
}
async function runQueueClaimMoveSelfTest(): Promise<JsonValue> {
const at = "2026-05-17T06:09:46.702Z";
const task = queueOrderTestTask("codex_claim_move_self_test", "running", at, at);
task.queueId = "claim_move_source";
task.startedAt = at;
task.currentAttempt = 1;
task.currentMode = "initial";
task.codexThreadId = "thread_claim_move_self_test";
task.activeTurnId = "turn_claim_move_self_test";
task.updatedAt = at;
const before = ctx().tasks().slice();
ctx().tasks().push(task);
try {
const response = await ctx().moveTaskToQueueForTest(task, new Request("http://code-queue.local/api/tasks/codex_claim_move_self_test/move", {
method: "POST",
body: JSON.stringify({ queueId: "claim_move_target" }),
headers: { "content-type": "application/json" },
}));
const body = await response.json() as Record<string, JsonValue>;
assertReferenceTest(response.status === 409, "moving a claimed/running task must return 409");
assertReferenceTest(task.queueId === "claim_move_source", "running task queueId must remain unchanged after rejected move");
assertReferenceTest(task.status === "running", "running task status must remain running after rejected move");
assertReferenceTest(task.currentAttempt === 1, "running task currentAttempt must remain claimed after rejected move");
const databaseRace = await ctx().runDatabaseClaimMoveSelfTest?.() ?? null;
return {
ok: true,
cases: [
{ name: "move_running_task_returns_409", ok: true, status: response.status },
{ name: "rejected_move_preserves_queue", ok: true, queueId: task.queueId },
{ name: "rejected_move_preserves_claim_markers", ok: true, status: task.status, currentAttempt: task.currentAttempt, startedAt: task.startedAt },
...(databaseRace === null ? [] : [{ name: "database_claim_blocks_stale_move", ok: true, result: databaseRace }]),
],
response: body as JsonValue,
};
} finally {
ctx().tasks().splice(0, ctx().tasks().length, ...before);
}
}
function runQueueOrderingSelfTest(): JsonValue {
const activeRetry = queueOrderTestTask("codex_4000_active", "retry_wait", "2026-05-11T09:00:00.000Z", "2026-05-11T09:00:00.000Z");
const movedOlderCreated = queueOrderTestTask("codex_3999_moved", "queued", "2026-05-11T08:00:00.000Z", "2026-05-11T08:00:00.000Z") as QueueTask & { queueEnteredAt?: string };
@@ -346,6 +388,17 @@ function runTracePortSelfTest(): JsonValue {
assertReferenceTest(!transcript.some((line) => line.status === "opencode/step-start" || line.status === "opencode/step-finish"), "opencode step boundaries should stay out of trace");
assertReferenceTest(!transcript.some((line) => String(line.bodyPreview || "").includes("<think>hidden reasoning</think>")), "reasoning-only opencode assistant text should not duplicate reasoning");
const codexWebSearchTask = testTask("codex_5004_web_search", "codex web search prompt", "", [], "2026-05-12T00:01:00.000Z");
codexWebSearchTask.output = [
{ seq: 30, at: "2026-05-12T00:01:00.000Z", channel: "tool", method: "item/started", itemId: "ws_trace", text: "item/started: webSearch\n" },
{ seq: 31, at: "2026-05-12T00:01:01.000Z", channel: "tool", method: "item/completed", itemId: "ws_trace", text: "item/completed: webSearch status=completed\n" },
];
const webSearchTranscript = buildTaskTranscript(codexWebSearchTask, 20, 0);
const webSearchLines = webSearchTranscript.filter((line) => line.rawSeqs.includes(30) || line.rawSeqs.includes(31));
assertReferenceTest(webSearchLines.length === 1, "codex WebSearch start/completed lifecycle should coalesce into one trace line");
assertReferenceTest(webSearchLines[0]?.kind === "explored", "codex WebSearch should count as an explored/read trace line");
assertReferenceTest([30, 31].every((seq) => webSearchLines[0]?.rawSeqs.includes(seq)), "codex WebSearch trace line should preserve lifecycle raw seqs");
const codexTask = testTask("codex_5002_interleaved_command", "codex command prompt", "", [], "2026-05-12T00:02:00.000Z");
codexTask.output = [
{ seq: 10, at: "2026-05-12T00:02:00.000Z", channel: "command", method: "item/started", itemId: "call_long", text: "item/started: /bin/bash -lc \"python3 - <<'PY'\\nprint('hello')\\nPY\" status=inProgress\n" },
@@ -404,6 +457,7 @@ function runTracePortSelfTest(): JsonValue {
{ name: "reasoning_duplicate_filtered", ok: true },
{ name: "interleaved_command_output_single_trace_line", ok: true, rawSeqs: longCommand?.rawSeqs ?? [] },
{ name: "interleaved_command_summary_has_command", ok: true, summaryLines: longCommand ? transcriptLineSummaryLines(longCommand) : [] },
{ name: "codex_web_search_lifecycle_coalesced", ok: true, rawSeqs: webSearchLines[0]?.rawSeqs ?? [] },
{ name: "message_fragments_coalesced", ok: true, assistantRawSeqs: assistantMessages[0]?.rawSeqs ?? [], reasoningRawSeqs: reasoningMessages[0]?.rawSeqs ?? [] },
{ name: "duration_preserved", ok: true, durationMs: explored?.durationMs ?? null },
{ name: "remote_opencode_exec_includes_binary", ok: true },
@@ -464,4 +518,4 @@ function runJudgeInfraSelfTest(): JsonValue {
};
}
export { runJudgeInfraSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest };
export { runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest };
@@ -50,6 +50,18 @@ export interface TaskViewContext {
taskQueueEnteredAt: (task: QueueTask) => string;
}
function isCodexToolLifecycleOutput(output: Pick<LiveOutput, "channel" | "method" | "itemId" | "text">): boolean {
if (output.channel !== "tool" || typeof output.itemId !== "string" || output.itemId.length === 0) return false;
const method = String(output.method || "");
if (method !== "item/started" && method !== "item/completed") return false;
return /\b(?:webSearch|mcpToolCall|dynamicToolCall)\b/u.test(String(output.text || ""));
}
function codexToolLifecycleStartedBeforeIn(outputs: Pick<LiveOutput, "channel" | "method" | "itemId" | "text">[], output: Pick<LiveOutput, "channel" | "method" | "itemId" | "text">): boolean {
if (!isCodexToolLifecycleOutput(output)) return false;
return outputs.some((item) => item !== output && isCodexToolLifecycleOutput(item) && item.itemId === output.itemId && item.method === "item/started");
}
const judgeFailRetryLimit = 3;
const transcriptCache = new Map<string, { signature: string; previewTranscript?: TranscriptLine[]; fullTranscript?: TranscriptLine[] }>();
const codexSessionPathCache = new Map<string, string>();
@@ -783,6 +795,7 @@ function overlayTraceMessagesFromRawTranscript(oaLines: TranscriptLine[], rawLin
function commandKind(command: string): TranscriptKind {
if (/\bwebSearch\b/u.test(command)) return "explored";
if (/\b(apply_patch|git apply|cat >|tee .*<<|sed -i|python3? .*write_text)\b/u.test(command)) return "edited";
if (/\b(rg|grep|find|ls|cat|sed -n|tail|head|git status|git diff|ps)\b/u.test(command)) return "explored";
return "ran";
@@ -1102,7 +1115,9 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0,
? codexSessionFileChangesByCallId(task)
: new Map<string, SessionFileChange>();
type ActiveMessage = { seq: number; at: string; title: string; status?: string; body: string; rawSeqs: number[] };
type ActiveCodexTool = { seq: number; at: string; text: string; status?: string; rawSeqs: number[]; itemId?: string };
let activeMessage: ActiveMessage | null = null;
const activeCodexToolsByItemId = new Map<string, ActiveCodexTool>();
const flushMessage = (): void => {
if (activeMessage === null) return;
@@ -1123,6 +1138,21 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0,
activeMessage = { seq: item.seq, at: item.at, title, status, body, rawSeqs: [item.seq] };
};
const parseCodexToolLifecycle = (item: LiveOutput): { status: string | undefined; text: string } => {
const status = /\bstatus=([A-Za-z0-9_-]+)/u.exec(item.text)?.[1];
return { status, text: String(item.text || "").trimEnd() };
};
const codexToolLifecycleLine = (tool: ActiveCodexTool): TranscriptLine => {
const kind = commandKind(tool.text);
return transcriptLine(kind, tool.at, tool.seq, shortCommandTitle(tool.text), tool.rawSeqs, "", tool.text, tool.status, fullText);
};
const flushCodexTool = (tool: ActiveCodexTool): void => {
entries.push(codexToolLifecycleLine(tool));
if (tool.itemId !== undefined && activeCodexToolsByItemId.get(tool.itemId) === tool) activeCodexToolsByItemId.delete(tool.itemId);
};
for (const item of outputItems) {
if (initialPrompt !== null && item.channel === "user" && item.method === "enqueue") continue;
if (item.channel === "user" && item.method === "turn/steer" && promptHistorySeqs.has(item.seq)) continue;
@@ -1191,7 +1221,37 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0,
if (item.channel !== "assistant" && item.channel !== "reasoning") flushMessage();
flushCommand();
if (item.channel === "diff") {
if (isCodexToolLifecycleOutput(item)) {
const parsed = parseCodexToolLifecycle(item);
const itemId = item.itemId || "";
const existing = activeCodexToolsByItemId.get(itemId);
if (item.method === "item/started") {
if (existing !== undefined) flushCodexTool(existing);
activeCodexToolsByItemId.set(itemId, {
seq: item.seq,
at: item.at,
text: parsed.text,
status: parsed.status ?? item.method,
rawSeqs: [item.seq],
itemId,
});
} else if (existing !== undefined) {
existing.at = item.at;
existing.status = parsed.status ?? existing.status;
existing.text = parsed.text.length > 0 ? parsed.text : existing.text;
pushUniqueRawSeq(existing.rawSeqs, item.seq);
flushCodexTool(existing);
} else {
entries.push(codexToolLifecycleLine({
seq: item.seq,
at: item.at,
text: parsed.text,
status: parsed.status ?? item.method,
rawSeqs: [item.seq],
itemId,
}));
}
} else if (item.channel === "diff") {
const text = fileChangeTextWithInlinePatch(item, fileChangeInputs);
entries.push(transcriptLine("edited", item.at, item.seq, "Edited files", [item.seq], text, "", item.method, fullText));
} else if (item.channel === "error") {
@@ -1221,6 +1281,9 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0,
for (const command of Array.from(activeCommandsByItemId.values()).sort((left, right) => left.seq - right.seq)) {
flushCommand(command);
}
for (const tool of Array.from(activeCodexToolsByItemId.values()).sort((left, right) => left.seq - right.seq)) {
flushCodexTool(tool);
}
return boundedTranscript(coalesceTranscriptMessageFragments(entries), limit);
}
@@ -1863,12 +1926,67 @@ function oaTraceStepToTranscriptLine(step: OaTraceStepSummary): TranscriptLine {
};
}
function isCodexToolLifecycleTranscriptLine(line: TranscriptLine): boolean {
const text = `${line.commandPreview ?? ""}\n${line.bodyPreview ?? ""}\n${line.title}`;
const status = String(line.status || "");
return (line.kind === "explored" || line.kind === "ran")
&& (status === "item/started" || status === "item/completed" || /^item\/(?:started|completed):/u.test(text))
&& /\b(?:webSearch|mcpToolCall|dynamicToolCall)\b/u.test(text);
}
function mergeCodexToolLifecycleGroup(group: TranscriptLine[]): TranscriptLine {
if (group.length <= 1) return group[0];
const first = group[0];
const last = group.at(-1) || first;
const rawSeqs: number[] = [];
for (const line of group) {
for (const seq of Array.isArray(line.rawSeqs) ? line.rawSeqs : [line.seq]) pushUniqueRawSeq(rawSeqs, Number(seq));
}
const command = String(last.commandPreview || first.commandPreview || last.bodyPreview || first.bodyPreview || last.title || first.title || "");
return {
...first,
seq: Number.isFinite(Number(last.seq)) ? Number(last.seq) : Number(first.seq),
at: last.at || first.at,
kind: commandKind(command),
title: shortCommandTitle(command) || String(last.title || first.title || "WebSearch"),
status: last.status || first.status,
commandPreview: command || undefined,
commandOmittedLines: Number(first.commandOmittedLines || 0) + Number(last.commandOmittedLines || 0) || undefined,
bodyPreview: last.bodyPreview || first.bodyPreview,
bodyOmittedLines: Number(first.bodyOmittedLines || 0) + Number(last.bodyOmittedLines || 0) || undefined,
rawSeqs,
};
}
function coalesceCodexToolLifecycleTranscriptLines(lines: TranscriptLine[]): TranscriptLine[] {
const rows = sortTranscript([...lines]);
const merged: TranscriptLine[] = [];
let group: TranscriptLine[] = [];
const flush = () => {
if (group.length > 0) merged.push(mergeCodexToolLifecycleGroup(group));
group = [];
};
for (const line of rows) {
if (isCodexToolLifecycleTranscriptLine(line)) {
const text = String(line.commandPreview || line.bodyPreview || "");
if ((line.status === "item/started" || /^item\/started:/u.test(text)) && group.length > 0) flush();
group.push(line);
if (line.status === "item/completed" || /^item\/completed:/u.test(text)) flush();
continue;
}
flush();
merged.push(line);
}
flush();
return merged;
}
async function oaTraceTranscriptForTask(task: QueueTask, attemptIndex: number | null): Promise<TranscriptLine[]> {
const taskId = task.id;
const steps = await readOaTraceStepsForTask(taskId, attemptIndex);
const oaLines = coalesceTranscriptMessageFragments(steps.map(oaTraceStepToTranscriptLine).filter(traceLineVisibleInTraceView));
const rawLines = fullTranscript(task).filter(traceLineVisibleInTraceView);
return overlayTraceMessagesFromRawTranscript(oaLines, rawLines);
const oaLines = coalesceCodexToolLifecycleTranscriptLines(coalesceTranscriptMessageFragments(steps.map(oaTraceStepToTranscriptLine).filter(traceLineVisibleInTraceView)));
const rawLines = coalesceCodexToolLifecycleTranscriptLines(fullTranscript(task).filter(traceLineVisibleInTraceView));
return coalesceCodexToolLifecycleTranscriptLines(overlayTraceMessagesFromRawTranscript(oaLines, rawLines));
}
function mergeTraceWindowLines(left: TranscriptLine[], right: TranscriptLine[]): TranscriptLine[] {
@@ -2384,6 +2502,8 @@ export {
buildCompactTaskTranscript,
buildTaskTranscript,
cachedPreviewTranscript,
codexToolLifecycleStartedBeforeIn,
isCodexToolLifecycleOutput,
formatCommandOutput,
fullTranscript,
lastAssistantMessage,