merge: code queue manager websearch coalesce

This commit is contained in:
Codex
2026-05-17 12:17:18 +00:00
2 changed files with 264 additions and 21 deletions
+1 -1
View File
@@ -54,7 +54,7 @@
{
"id": "code-queue-mgr",
"repo": "https://github.com/pikasTech/unidesk",
"commitId": "97a647bc8d819cf25691764736d3b001ef1b066e"
"commitId": "b0f76056f86dda9dcd43b5ef76d16cc7e990786f"
},
{
"id": "mdtodo",
@@ -227,6 +227,34 @@ interface QueueRow {
updated_at: Date | string;
}
interface OaTraceStepRow {
scope_id: string;
step_seq: number | string;
kind: string;
title: string | null;
status: string | null;
summary_lines: unknown;
raw_seqs: unknown;
created_at: Date | string;
updated_at: Date | string;
}
interface TraceStepLine {
seq: number;
at: string;
kind: string;
label: string;
title: string;
status: string | null;
summaryLines: string[];
commandPreview?: string;
commandOmittedLines?: number;
bodyPreview?: string;
bodyOmittedLines?: number;
rawSeqs: number[];
source: string;
}
interface HttpErrorDetail {
status: number;
body: JsonRecord;
@@ -665,6 +693,19 @@ function normalizeOutput(value: unknown): LiveOutput[] {
}).sort((left, right) => left.seq - right.seq);
}
function stringList(value: unknown): string[] {
if (Array.isArray(value)) return value.map((item) => String(item ?? "")).filter((item) => item.trim().length > 0);
if (typeof value === "string" && value.trim().length > 0) return [value];
return [];
}
function numberList(value: unknown, fallback: number): number[] {
const values = Array.isArray(value) ? value : [];
const numbers = values.map((item) => Number(item)).filter(Number.isFinite).map((item) => Math.floor(item));
if (numbers.length === 0 && Number.isFinite(fallback)) numbers.push(Math.floor(fallback));
return Array.from(new Set(numbers)).sort((left, right) => left - right);
}
function normalizeTask(value: unknown): QueueTask {
const record = asRecord(value) ?? {};
const status = String(record.status || "queued") as TaskStatus;
@@ -1803,6 +1844,43 @@ function outputToTranscriptLine(item: LiveOutput, fullText: boolean): JsonRecord
};
}
function outputToTraceStepLine(item: LiveOutput): TraceStepLine {
const body = safePreview(item.text, 1600);
return {
seq: item.seq,
at: item.at,
kind: transcriptKind(item),
label: item.channel,
title: item.method ?? item.channel,
status: item.method ?? null,
summaryLines: body.length > 0 ? [body] : [],
commandPreview: item.channel === "command" ? body : undefined,
commandOmittedLines: 0,
bodyPreview: item.channel === "command" ? "" : body,
bodyOmittedLines: 0,
rawSeqs: [item.seq],
source: "code-queue-mgr-postgres",
};
}
function traceStepJson(line: TraceStepLine): JsonRecord {
return {
seq: line.seq,
at: line.at,
kind: line.kind,
label: line.label,
title: line.title,
status: line.status,
summaryLines: line.summaryLines,
commandPreview: line.commandPreview ?? null,
commandOmittedLines: line.commandOmittedLines ?? 0,
bodyPreview: line.bodyPreview ?? "",
bodyOmittedLines: line.bodyOmittedLines ?? 0,
rawSeqs: line.rawSeqs,
source: line.source,
};
}
function transcriptPage(task: QueueTask, url: URL): JsonRecord {
const limit = parseLimit(url);
const afterSeq = numberField(url.searchParams.get("afterSeq"), 0);
@@ -1841,14 +1919,174 @@ function transcriptPage(task: QueueTask, url: URL): JsonRecord {
};
}
function traceSteps(task: QueueTask, url: URL): JsonRecord {
function oaTraceStepKind(kind: string): string {
const normalized = String(kind || "").trim().toLowerCase();
if (normalized === "read" || normalized === "explored" || normalized === "explore") return "explored";
if (normalized === "edit" || normalized === "edited") return "edited";
if (normalized === "run" || normalized === "ran" || normalized === "command") return "ran";
if (normalized === "error" || normalized === "failed") return "error";
if (normalized === "message" || normalized === "assistant" || normalized === "user") return "message";
return "system";
}
function traceStepLifecycleMethod(line: TraceStepLine): "item/started" | "item/completed" | null {
const status = String(line.status || "").trim();
if (status === "item/started" || status === "item/completed") return status;
const text = [line.title, ...line.summaryLines, line.commandPreview, line.bodyPreview].map((item) => String(item || "")).join("\n");
if (/\bitem\/started\b/u.test(text)) return "item/started";
if (/\bitem\/completed\b/u.test(text)) return "item/completed";
return null;
}
function traceStepLifecycleTool(line: TraceStepLine): string | null {
const text = [line.title, ...line.summaryLines, line.commandPreview, line.bodyPreview].map((item) => String(item || "")).join("\n");
const match = /\b(webSearch|mcpToolCall|dynamicToolCall)\b/u.exec(text);
return match?.[1] ?? null;
}
function traceStepIsCodexToolLifecycle(line: TraceStepLine): boolean {
return traceStepLifecycleMethod(line) !== null && traceStepLifecycleTool(line) !== null;
}
function pushUniqueNumber(values: number[], value: number): void {
if (Number.isFinite(value) && !values.includes(Math.floor(value))) values.push(Math.floor(value));
}
function pushUniqueString(values: string[], value: string): void {
const text = value.trim();
if (text.length > 0 && !values.includes(text)) values.push(text);
}
function traceStepCommandKind(text: string, fallback: string): string {
if (/\bwebSearch\b/u.test(text)) return "explored";
if (/\b(?:mcpToolCall|dynamicToolCall)\b/u.test(text)) return "ran";
return fallback;
}
function shortTraceStepTitle(text: string, fallback: string): string {
const normalized = text.replace(/\s+/gu, " ").trim();
return normalized.length > 0 ? safePreview(normalized, 140) : fallback;
}
function mergeCodexToolLifecycleTraceSteps(group: TraceStepLine[]): TraceStepLine {
if (group.length <= 1) return group[0] as TraceStepLine;
const first = group[0] as TraceStepLine;
const last = group.at(-1) ?? first;
const rawSeqs: number[] = [];
const summaryLines: string[] = [];
for (const line of group) {
for (const seq of line.rawSeqs.length > 0 ? line.rawSeqs : [line.seq]) pushUniqueNumber(rawSeqs, seq);
for (const summaryLine of line.summaryLines) pushUniqueString(summaryLines, summaryLine);
}
rawSeqs.sort((left, right) => left - right);
const commandText = String(last.commandPreview || first.commandPreview || last.bodyPreview || first.bodyPreview || last.title || first.title || "");
const kind = traceStepCommandKind(commandText, last.kind || first.kind);
return {
...first,
at: last.at || first.at,
kind,
title: shortTraceStepTitle(commandText, last.title || first.title || "Trace step"),
status: last.status || first.status,
summaryLines: summaryLines.length > 0 ? summaryLines : first.summaryLines,
commandPreview: commandText.length > 0 ? commandText : first.commandPreview,
commandOmittedLines: Number(first.commandOmittedLines || 0) + Number(last.commandOmittedLines || 0),
bodyPreview: last.bodyPreview || first.bodyPreview,
bodyOmittedLines: Number(first.bodyOmittedLines || 0) + Number(last.bodyOmittedLines || 0),
rawSeqs,
};
}
function coalesceCodexToolLifecycleTraceSteps(lines: TraceStepLine[]): TraceStepLine[] {
const rows = [...lines].sort((left, right) => left.seq - right.seq);
const merged: TraceStepLine[] = [];
let group: TraceStepLine[] = [];
const flush = (): void => {
if (group.length > 0) merged.push(mergeCodexToolLifecycleTraceSteps(group));
group = [];
};
for (const line of rows) {
if (traceStepIsCodexToolLifecycle(line)) {
const method = traceStepLifecycleMethod(line);
const tool = traceStepLifecycleTool(line);
const groupTool = group.length === 0 ? null : traceStepLifecycleTool(group[0] as TraceStepLine);
if (method === "item/started" && group.length > 0) flush();
if (group.length > 0 && groupTool !== null && tool !== groupTool) flush();
group.push(line);
if (method === "item/completed") flush();
continue;
}
flush();
merged.push(line);
}
flush();
return merged;
}
function traceScopeId(taskId: string, url: URL): string {
const attempt = numberField(url.searchParams.get("attempt"), 0);
return attempt > 0 ? `task:${taskId}:attempt:${attempt}` : `task:${taskId}`;
}
function oaTraceStepToLine(row: OaTraceStepRow): TraceStepLine | null {
const seq = numberField(row.step_seq, 0);
if (seq <= 0) return null;
const kind = oaTraceStepKind(row.kind);
const title = String(row.title || (kind === "explored" ? "Read" : kind === "edited" ? "Edit" : kind === "ran" ? "Run" : "Trace step"));
const summaryLines = stringList(row.summary_lines);
const summaryText = summaryLines.join("\n").trimEnd();
const commandText = summaryText.startsWith("item/") ? summaryText : "";
return {
seq,
at: timestampToIso(row.updated_at) ?? timestampToIso(row.created_at) ?? nowIso(),
kind,
label: row.kind,
title,
status: row.status ?? null,
summaryLines,
commandPreview: commandText.length > 0 ? commandText : undefined,
commandOmittedLines: 0,
bodyPreview: commandText.length > 0 ? "" : summaryText,
bodyOmittedLines: 0,
rawSeqs: numberList(row.raw_seqs, seq),
source: "code-queue-mgr-oa-trace-steps",
};
}
async function loadOaTraceStepLines(taskId: string, url: URL): Promise<TraceStepLine[]> {
const scopeId = traceScopeId(taskId, url);
const rows = await traceSql<OaTraceStepRow[]>`
SELECT scope_id, step_seq, kind, title, status, summary_lines, raw_seqs, created_at, updated_at
FROM oa_trace_steps
WHERE scope_id = ${scopeId}
ORDER BY step_seq ASC
LIMIT 5000
`;
return coalesceCodexToolLifecycleTraceSteps(rows.map(oaTraceStepToLine).filter((line): line is TraceStepLine => line !== null));
}
function fallbackTraceStepLines(task: QueueTask): TraceStepLine[] {
return task.output
.filter((item) => item.channel !== "system" || item.method !== "enqueue")
.sort((left, right) => left.seq - right.seq)
.map(outputToTraceStepLine);
}
async function traceStepLines(task: QueueTask, url: URL): Promise<{ source: string; steps: TraceStepLine[] }> {
try {
const oaSteps = await loadOaTraceStepLines(task.id, url);
if (oaSteps.length > 0) return { source: "code-queue-mgr-oa-trace-steps", steps: oaSteps };
} catch (error) {
log("warn", "oa_trace_steps_load_failed", { taskId: task.id, scopeId: traceScopeId(task.id, url), error: errorToJson(error) });
}
return { source: "code-queue-mgr-postgres", steps: fallbackTraceStepLines(task) };
}
async function traceSteps(task: QueueTask, url: URL): Promise<JsonRecord> {
const limit = parseLimit(url);
const afterSeq = numberField(url.searchParams.get("afterSeq"), 0);
const beforeSeqRaw = url.searchParams.get("beforeSeq");
const tail = url.searchParams.get("tail") === "1";
const visible = task.output
.filter((item) => item.channel !== "system" || item.method !== "enqueue")
.sort((left, right) => left.seq - right.seq);
const { source, steps: visible } = await traceStepLines(task, url);
let rows = visible;
if (beforeSeqRaw !== null) rows = visible.filter((item) => item.seq < numberField(beforeSeqRaw, Number.MAX_SAFE_INTEGER)).slice(-limit);
else if (tail) rows = visible.slice(-limit);
@@ -1858,7 +2096,7 @@ function traceSteps(task: QueueTask, url: URL): JsonRecord {
return {
ok: true,
taskId: task.id,
source: "code-queue-mgr-postgres",
source,
total: visible.length,
returned: rows.length,
limit,
@@ -1868,20 +2106,25 @@ function traceSteps(task: QueueTask, url: URL): JsonRecord {
previousBeforeSeq: firstSeq,
hasMore: lastSeq !== null && visible.some((item) => item.seq > lastSeq),
hasBefore: firstSeq !== null && visible.some((item) => item.seq < firstSeq),
steps: rows.map((item) => ({
seq: item.seq,
at: item.at,
kind: item.channel === "command" ? "ran" : item.channel === "diff" ? "edited" : item.channel === "tool" ? "explored" : item.channel === "error" ? "error" : "message",
label: item.channel,
title: item.method ?? item.channel,
status: item.method ?? null,
bodyPreview: safePreview(item.text, 1600),
bodyOmittedLines: 0,
rawSeqs: [item.seq],
})),
steps: rows.map(traceStepJson),
};
}
async function traceStepDetail(task: QueueTask, url: URL): Promise<JsonRecord | null> {
const seq = numberField(url.searchParams.get("seq"), 0);
const { source, steps } = await traceStepLines(task, url);
const line = steps.find((item) => item.seq === seq || item.rawSeqs.includes(seq)) ?? null;
if (line !== null) {
const payload = traceStepJson(line);
return { ok: true, taskId: task.id, source, step: payload, rawOutput: payload, line: payload };
}
const item = task.output.find((output) => output.seq === seq) ?? null;
if (item === null) return null;
const fallback = outputToTraceStepLine(item);
const payload = traceStepJson(fallback);
return { ok: true, taskId: task.id, source: "code-queue-mgr-postgres", step: item as unknown as JsonValue, rawOutput: item as unknown as JsonValue, line: payload };
}
function traceSummary(task: QueueTask): JsonRecord {
const steps = task.output.filter((item) => item.channel !== "system" || item.method !== "enqueue");
return {
@@ -2459,14 +2702,14 @@ async function route(req: Request): Promise<Response> {
const traceStepsMatch = url.pathname.match(/^\/api\/tasks\/([^/]+)\/trace-steps$/u);
if (traceStepsMatch !== null && req.method === "GET") {
const task = await loadTask(decodeURIComponent(traceStepsMatch[1] ?? ""));
return task === null ? jsonResponse({ ok: false, error: "task not found" }, 404) : jsonResponse(traceSteps(task, url));
return task === null ? jsonResponse({ ok: false, error: "task not found" }, 404) : jsonResponse(await traceSteps(task, url));
}
const traceStepMatch = url.pathname.match(/^\/api\/tasks\/([^/]+)\/trace-step$/u);
if (traceStepMatch !== null && req.method === "GET") {
const task = await loadTask(decodeURIComponent(traceStepMatch[1] ?? ""));
const seq = numberField(url.searchParams.get("seq"), 0);
const item = task?.output.find((output) => output.seq === seq) ?? null;
return task === null || item === null ? jsonResponse({ ok: false, error: "trace step not found" }, 404) : jsonResponse({ ok: true, taskId: task.id, step: item, rawOutput: item });
if (task === null) return jsonResponse({ ok: false, error: "trace step not found" }, 404);
const detail = await traceStepDetail(task, url);
return detail === null ? jsonResponse({ ok: false, error: "trace step not found" }, 404) : jsonResponse(detail);
}
const summaryMatch = url.pathname.match(/^\/api\/tasks\/([^/]+)\/summary$/u);
if (summaryMatch !== null && req.method === "GET") {