Fix Code Queue WebSearch trace coalescing

This commit is contained in:
Codex
2026-05-17 01:41:19 +00:00
parent 1e0140fdcd
commit fd77f74909
4 changed files with 189 additions and 12 deletions
@@ -130,8 +130,10 @@ import {
} from "./oa-events";
import { configureSelfTests, runJudgeInfraSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest } from "./self-tests";
import {
codexToolLifecycleStartedBeforeIn,
configureTaskView,
formatCommandOutput,
isCodexToolLifecycleOutput,
lastAssistantMessage,
promptLineCount,
recordNumberField,
@@ -1018,6 +1020,7 @@ function outputStartsTraceStepInHistory(outputs: LiveOutput[], output: LiveOutpu
if (output.channel === "user" && output.method === "enqueue") return false;
if (isOpenCodeStepBoundaryMethod(output.method)) return false;
if (output.channel === "system") return false;
if (codexToolLifecycleStartedBeforeIn(outputs, output)) return false;
if (output.channel === "diff" || output.channel === "tool" || output.channel === "error" || output.channel === "assistant" || output.channel === "reasoning") return true;
if (output.channel === "user") return true;
if (output.channel !== "command") return true;
@@ -1088,9 +1091,18 @@ function recordTaskOutputMetrics(task: QueueTask, output: LiveOutput, op: "set"
/**
 * Tells whether an output refreshes a trace step that has already been
 * published instead of opening a new one.
 *
 * Assistant, reasoning and diff outputs always qualify. Any other output
 * qualifies only when it is a Codex tool lifecycle output whose method is
 * `item/completed`.
 *
 * @param output The live output being classified.
 * @returns `true` when the output should be published as an update.
 */
function outputUpdatesExistingTraceStep(output: LiveOutput): boolean {
  switch (output.channel) {
    case "assistant":
    case "reasoning":
    case "diff":
      return true;
    default:
      // Only the completion half of a tool lifecycle updates an existing step.
      return isCodexToolLifecycleOutput(output) && output.method === "item/completed";
  }
}
/**
 * Builds the output that should be projected into the trace for `output`.
 *
 * Outputs other than a Codex tool `item/completed` with a string item id are
 * returned untouched. For a completion, the task's full output is searched for
 * `item/started` lifecycle outputs with the same item id; when one exists, a
 * copy of the completion is returned that carries the lowest-seq start's `seq`
 * and a `rawSeqs` pair of `[start seq, completion seq]`.
 *
 * @param task The task whose full output is searched for the start marker.
 * @param output The output about to be published.
 * @returns The original output, or a re-sequenced copy of it.
 */
function traceStepOutputForProjection(task: QueueTask, output: LiveOutput): LiveOutput {
  if (!isCodexToolLifecycleOutput(output)) return output;
  if (output.method !== "item/completed") return output;
  if (typeof output.itemId !== "string") return output;

  const matchingStarts = taskFullOutput(task).filter((candidate) => {
    if (candidate === output) return false;
    if (!isCodexToolLifecycleOutput(candidate)) return false;
    return candidate.itemId === output.itemId && candidate.method === "item/started";
  });
  // `filter` produced a fresh array, so sorting it in place leaves the task output untouched.
  matchingStarts.sort((left, right) => Number(left.seq) - Number(right.seq));
  const started = matchingStarts[0];
  if (started === undefined) return output;

  return {
    ...output,
    seq: started.seq,
    at: output.at,
    itemId: output.itemId,
    rawSeqs: [started.seq, output.seq],
  } as LiveOutput;
}
function errorToJson(error: unknown): JsonValue {
if (error instanceof Error) return { name: error.name, message: error.message, stack: error.stack ?? null };
return String(error);
@@ -2298,8 +2310,9 @@ configureTaskOutput({
onOutputAppended: (task, output, op) => {
const archiveOp = op === "append" ? "append" : "set";
const stepChanged = recordTaskOutputMetrics(task, output, archiveOp);
if (stepChanged) publishCodeQueueTraceStep(task, queueIdOf(task), output, taskOutputMaxSeq(task));
else if (archiveOp === "append" && outputUpdatesExistingTraceStep(output)) publishCodeQueueTraceStep(task, queueIdOf(task), output, taskOutputMaxSeq(task), null, String(output.text || "").length);
const projectionOutput = traceStepOutputForProjection(task, output);
if (stepChanged) publishCodeQueueTraceStep(task, queueIdOf(task), projectionOutput, taskOutputMaxSeq(task));
else if ((archiveOp === "append" || output.method === "item/completed") && outputUpdatesExistingTraceStep(output)) publishCodeQueueTraceStep(task, queueIdOf(task), projectionOutput, taskOutputMaxSeq(task), null, String(output.text || "").length);
if (archiveOp === "append" && !outputCanChangeStepCount(output)) return;
publishTaskOaEvent(task, "output", { onlyStepChange: archiveOp === "append", stepChanged });
},
@@ -4162,9 +4175,14 @@ async function backfillOaTraceStats(url: URL): Promise<JsonValue> {
const attemptBySeq = outputAttemptIndexMap(output);
if (includeSteps) {
for (const item of output) {
if (!outputStartsTraceStepInHistory(output, item)) continue;
publishCodeQueueTraceStep(task, queueId, item, outputMaxSeq, attemptBySeq.get(item.seq) ?? null);
stepEventCount += 1;
const projectionOutput = traceStepOutputForProjection(task, item);
if (outputStartsTraceStepInHistory(output, item)) {
publishCodeQueueTraceStep(task, queueId, projectionOutput, outputMaxSeq, attemptBySeq.get(item.seq) ?? null);
stepEventCount += 1;
} else if (outputUpdatesExistingTraceStep(item)) {
publishCodeQueueTraceStep(task, queueId, projectionOutput, outputMaxSeq, attemptBySeq.get(projectionOutput.seq) ?? attemptBySeq.get(item.seq) ?? null, String(item.text || "").length);
stepEventCount += 1;
}
}
}
publishCodeQueueTraceStatsSnapshot(task, queueId, "backfill", traceStats.stepCount, outputMaxSeq, traceStats);
@@ -198,7 +198,7 @@ function normalizeCommandText(text: string): string {
/**
 * Classifies command or tool text as a read, edit or run trace step.
 *
 * Edit keywords are tested first, then read keywords (which include
 * `webSearch`); anything else is a run. Matching is case-insensitive.
 *
 * NOTE(review): because edit keywords win, web-search text that contains a
 * word such as "create" or "edit" is classified as an edit. The transcript
 * classifier tests `webSearch` before anything else — confirm which order is
 * intended here.
 *
 * @param command The command text to classify.
 * @returns `"edit"`, `"read"` or `"run"`.
 */
function commandKind(command: string): "read" | "edit" | "run" {
  if (/\b(apply_patch|git apply|cat >|tee .*<<|sed -i|python3? .*write_text|write|patch|edit|delete|create)\b/iu.test(command)) return "edit";
  // A single read check: the older pattern without `webSearch` was a strict subset of this one.
  if (/\b(rg|grep|find|ls|cat|sed -n|tail|head|git status|git diff|ps|read|glob|search|view|webSearch)\b/iu.test(command)) return "read";
  return "run";
}
@@ -240,6 +240,7 @@ export function outputTraceKind(output: LiveOutput): "read" | "edit" | "run" | "
if (output.channel === "assistant" || output.channel === "user" || output.channel === "reasoning") return "message";
if (output.channel === "tool") {
const record = openCodeToolRecord(output);
if (record === null) return commandKind(normalizeCommandText(output.text));
const part = record?.part && typeof record.part === "object" && !Array.isArray(record.part) ? record.part as Record<string, unknown> : null;
const state = part?.state && typeof part.state === "object" && !Array.isArray(part.state) ? part.state as Record<string, unknown> : null;
const input = state?.input && typeof state.input === "object" && !Array.isArray(state.input) ? state.input as Record<string, unknown> : null;
@@ -388,7 +389,7 @@ export function publishCodeQueueTraceStep(task: QueueTask, queueId: string, outp
title: outputTitle(output, kind),
status: task.status,
summaryLines: outputSummaryLines(output),
rawSeqs: [output.seq],
rawSeqs: outputRawSeqs(output),
},
});
}
@@ -519,6 +520,14 @@ function numberList(value: unknown, fallback: number): number[] {
return values.length > 0 ? values : [fallback];
}
/**
 * Returns the raw sequence numbers a trace step for `output` should reference.
 *
 * When the output carries a `rawSeqs` array, every entry that converts to a
 * finite number is floored and collected without duplicates, in first-seen
 * order. If that yields nothing (or `rawSeqs` is not an array), the output's
 * own `seq` is used.
 *
 * @param output The output being published.
 * @returns A non-empty list of sequence numbers.
 */
function outputRawSeqs(output: LiveOutput): number[] {
  const candidate = (output as LiveOutput & { rawSeqs?: unknown }).rawSeqs;
  if (!Array.isArray(candidate)) return [output.seq];

  const unique = new Set<number>();
  for (const entry of candidate) {
    const numeric = Number(entry);
    if (Number.isFinite(numeric)) unique.add(Math.floor(numeric));
  }
  return unique.size > 0 ? Array.from(unique) : [output.seq];
}
function commandLifecycleStatus(payload: JsonRecord, title: string, summaryLines: string[]): string {
const source = [title, ...summaryLines].join("\n");
const status = /\bstatus=([A-Za-z0-9_-]+)/u.exec(source)?.[1];
@@ -552,6 +561,24 @@ function traceStepFromEvent(event: unknown): OaTraceStepSummary | null {
};
}
/**
 * Ranks how far along its lifecycle a trace step appears to be, judged from
 * its title, status and summary lines joined by newlines.
 *
 * @param step The trace step summary to inspect.
 * @returns `2` when the text mentions completion, `1` when it only mentions a
 *   start / in-progress state, `0` otherwise.
 */
function traceStepLifecycleRank(step: OaTraceStepSummary): number {
  const haystack = [step.title, step.status, ...step.summaryLines].join("\n");
  const completedPattern = /\bitem\/completed\b|status=completed\b|\bcompleted\b/iu;
  const startedPattern = /\bitem\/started\b|status=inProgress\b|\binProgress\b/iu;
  // Completion is tested first, so text mentioning both states ranks as completed.
  if (completedPattern.test(haystack)) return 2;
  return startedPattern.test(haystack) ? 1 : 0;
}
/**
 * Combines two trace step summaries that share a sequence number.
 *
 * The summary with the higher lifecycle rank supplies the field values, with
 * ties going to `incoming`. Fields present only on `existing` are kept,
 * `eventSequence` is the larger of the two, and `rawSeqs` is the
 * de-duplicated union with `existing`'s entries first.
 *
 * @param existing The summary already stored, if any.
 * @param incoming The summary read from the newest event.
 * @returns `incoming` when nothing was stored, otherwise the merged summary.
 */
function mergeOaTraceStepSummary(existing: OaTraceStepSummary | undefined, incoming: OaTraceStepSummary): OaTraceStepSummary {
  if (existing === undefined) return incoming;

  const incomingWins = traceStepLifecycleRank(incoming) >= traceStepLifecycleRank(existing);
  const preferred = incomingWins ? incoming : existing;

  const combinedRawSeqs = new Set(existing.rawSeqs);
  for (const seq of incoming.rawSeqs) combinedRawSeqs.add(seq);

  return {
    ...existing,
    ...preferred,
    eventSequence: Math.max(existing.eventSequence, incoming.eventSequence),
    rawSeqs: Array.from(combinedRawSeqs),
  };
}
function eventNextAfterSeq(body: Record<string, unknown>, events: unknown[], fallback: number): number {
const bodyNext = Number(body.nextAfterSeq);
const eventNext = events.reduce<number>((max, event) => {
@@ -609,7 +636,7 @@ export async function readOaTraceStepsForTask(taskId: string, attemptIndex: numb
const events = Array.isArray(body.events) ? body.events : [];
for (const event of events) {
const step = traceStepFromEvent(event);
if (step !== null) bySeq.set(step.seq, { ...(bySeq.get(step.seq) ?? {}), ...step });
if (step !== null) bySeq.set(step.seq, mergeOaTraceStepSummary(bySeq.get(step.seq), step));
}
const nextAfterSeq = eventNextAfterSeq(body, events, afterSeq);
if (events.length < traceStepReadPageLimit || nextAfterSeq <= afterSeq) break;
@@ -346,6 +346,17 @@ function runTracePortSelfTest(): JsonValue {
assertReferenceTest(!transcript.some((line) => line.status === "opencode/step-start" || line.status === "opencode/step-finish"), "opencode step boundaries should stay out of trace");
assertReferenceTest(!transcript.some((line) => String(line.bodyPreview || "").includes("<think>hidden reasoning</think>")), "reasoning-only opencode assistant text should not duplicate reasoning");
const codexWebSearchTask = testTask("codex_5004_web_search", "codex web search prompt", "", [], "2026-05-12T00:01:00.000Z");
codexWebSearchTask.output = [
{ seq: 30, at: "2026-05-12T00:01:00.000Z", channel: "tool", method: "item/started", itemId: "ws_trace", text: "item/started: webSearch\n" },
{ seq: 31, at: "2026-05-12T00:01:01.000Z", channel: "tool", method: "item/completed", itemId: "ws_trace", text: "item/completed: webSearch status=completed\n" },
];
const webSearchTranscript = buildTaskTranscript(codexWebSearchTask, 20, 0);
const webSearchLines = webSearchTranscript.filter((line) => line.rawSeqs.includes(30) || line.rawSeqs.includes(31));
assertReferenceTest(webSearchLines.length === 1, "codex WebSearch start/completed lifecycle should coalesce into one trace line");
assertReferenceTest(webSearchLines[0]?.kind === "explored", "codex WebSearch should count as an explored/read trace line");
assertReferenceTest([30, 31].every((seq) => webSearchLines[0]?.rawSeqs.includes(seq)), "codex WebSearch trace line should preserve lifecycle raw seqs");
const codexTask = testTask("codex_5002_interleaved_command", "codex command prompt", "", [], "2026-05-12T00:02:00.000Z");
codexTask.output = [
{ seq: 10, at: "2026-05-12T00:02:00.000Z", channel: "command", method: "item/started", itemId: "call_long", text: "item/started: /bin/bash -lc \"python3 - <<'PY'\\nprint('hello')\\nPY\" status=inProgress\n" },
@@ -404,6 +415,7 @@ function runTracePortSelfTest(): JsonValue {
{ name: "reasoning_duplicate_filtered", ok: true },
{ name: "interleaved_command_output_single_trace_line", ok: true, rawSeqs: longCommand?.rawSeqs ?? [] },
{ name: "interleaved_command_summary_has_command", ok: true, summaryLines: longCommand ? transcriptLineSummaryLines(longCommand) : [] },
{ name: "codex_web_search_lifecycle_coalesced", ok: true, rawSeqs: webSearchLines[0]?.rawSeqs ?? [] },
{ name: "message_fragments_coalesced", ok: true, assistantRawSeqs: assistantMessages[0]?.rawSeqs ?? [], reasoningRawSeqs: reasoningMessages[0]?.rawSeqs ?? [] },
{ name: "duration_preserved", ok: true, durationMs: explored?.durationMs ?? null },
{ name: "remote_opencode_exec_includes_binary", ok: true },
@@ -50,6 +50,18 @@ export interface TaskViewContext {
taskQueueEnteredAt: (task: QueueTask) => string;
}
/**
 * Tells whether an output is a start or completion marker of a Codex tool
 * call: a `tool` channel output with a non-empty string item id, an
 * `item/started` or `item/completed` method, and text that names
 * `webSearch`, `mcpToolCall` or `dynamicToolCall`.
 *
 * @param output The output (or the relevant subset of its fields) to test.
 * @returns `true` when the output is such a lifecycle marker.
 */
function isCodexToolLifecycleOutput(output: Pick<LiveOutput, "channel" | "method" | "itemId" | "text">): boolean {
  if (output.channel !== "tool" || typeof output.itemId !== "string" || output.itemId.length === 0) return false;
  const method = String(output.method || "");
  if (method !== "item/started" && method !== "item/completed") return false;
  return /\b(?:webSearch|mcpToolCall|dynamicToolCall)\b/u.test(String(output.text || ""));
}

/**
 * Tells whether `outputs` contains an `item/started` lifecycle marker with the
 * same item id ahead of `output`.
 *
 * Only entries positioned before `output` in `outputs` are considered, so a
 * start marker is not suppressed by a later start that reuses its item id and
 * a completion is not attributed to a start that comes after it. When `output`
 * is not an element of `outputs`, every entry is treated as preceding it.
 *
 * @param outputs The output history, assumed to be in append order.
 * @param output The output being classified.
 * @returns `true` when a matching start marker precedes `output`.
 */
function codexToolLifecycleStartedBeforeIn(outputs: Pick<LiveOutput, "channel" | "method" | "itemId" | "text">[], output: Pick<LiveOutput, "channel" | "method" | "itemId" | "text">): boolean {
  if (!isCodexToolLifecycleOutput(output)) return false;
  const position = outputs.indexOf(output);
  const preceding = position === -1 ? outputs : outputs.slice(0, position);
  return preceding.some((item) => isCodexToolLifecycleOutput(item) && item.itemId === output.itemId && item.method === "item/started");
}
const judgeFailRetryLimit = 3;
const transcriptCache = new Map<string, { signature: string; previewTranscript?: TranscriptLine[]; fullTranscript?: TranscriptLine[] }>();
const codexSessionPathCache = new Map<string, string>();
@@ -783,6 +795,7 @@ function overlayTraceMessagesFromRawTranscript(oaLines: TranscriptLine[], rawLin
function commandKind(command: string): TranscriptKind {
if (/\bwebSearch\b/u.test(command)) return "explored";
if (/\b(apply_patch|git apply|cat >|tee .*<<|sed -i|python3? .*write_text)\b/u.test(command)) return "edited";
if (/\b(rg|grep|find|ls|cat|sed -n|tail|head|git status|git diff|ps)\b/u.test(command)) return "explored";
return "ran";
@@ -1102,7 +1115,9 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0,
? codexSessionFileChangesByCallId(task)
: new Map<string, SessionFileChange>();
type ActiveMessage = { seq: number; at: string; title: string; status?: string; body: string; rawSeqs: number[] };
type ActiveCodexTool = { seq: number; at: string; text: string; status?: string; rawSeqs: number[]; itemId?: string };
let activeMessage: ActiveMessage | null = null;
const activeCodexToolsByItemId = new Map<string, ActiveCodexTool>();
const flushMessage = (): void => {
if (activeMessage === null) return;
@@ -1123,6 +1138,21 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0,
activeMessage = { seq: item.seq, at: item.at, title, status, body, rawSeqs: [item.seq] };
};
/**
 * Extracts the `status=<value>` token (if present) from a tool lifecycle
 * output and returns it together with the output text stripped of trailing
 * whitespace.
 */
const parseCodexToolLifecycle = (item: LiveOutput): { status: string | undefined; text: string } => {
  const statusMatch = /\bstatus=([A-Za-z0-9_-]+)/u.exec(item.text);
  const trimmedText = String(item.text || "").trimEnd();
  return { status: statusMatch?.[1], text: trimmedText };
};
/**
 * Turns an accumulated tool lifecycle record into a transcript line whose kind
 * and title are both derived from the record's text.
 */
const codexToolLifecycleLine = (tool: ActiveCodexTool): TranscriptLine =>
  transcriptLine(commandKind(tool.text), tool.at, tool.seq, shortCommandTitle(tool.text), tool.rawSeqs, "", tool.text, tool.status, fullText);
/**
 * Emits the transcript line for a tool lifecycle record and stops tracking the
 * record, provided it is still the one registered under its item id.
 */
const flushCodexTool = (tool: ActiveCodexTool): void => {
  entries.push(codexToolLifecycleLine(tool));
  const { itemId } = tool;
  if (itemId === undefined) return;
  // Leave the map alone when a newer record has already replaced this one.
  if (activeCodexToolsByItemId.get(itemId) === tool) activeCodexToolsByItemId.delete(itemId);
};
for (const item of outputItems) {
if (initialPrompt !== null && item.channel === "user" && item.method === "enqueue") continue;
if (item.channel === "user" && item.method === "turn/steer" && promptHistorySeqs.has(item.seq)) continue;
@@ -1191,7 +1221,37 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0,
if (item.channel !== "assistant" && item.channel !== "reasoning") flushMessage();
flushCommand();
if (item.channel === "diff") {
if (isCodexToolLifecycleOutput(item)) {
const parsed = parseCodexToolLifecycle(item);
const itemId = item.itemId || "";
const existing = activeCodexToolsByItemId.get(itemId);
if (item.method === "item/started") {
if (existing !== undefined) flushCodexTool(existing);
activeCodexToolsByItemId.set(itemId, {
seq: item.seq,
at: item.at,
text: parsed.text,
status: parsed.status ?? item.method,
rawSeqs: [item.seq],
itemId,
});
} else if (existing !== undefined) {
existing.at = item.at;
existing.status = parsed.status ?? existing.status;
existing.text = parsed.text.length > 0 ? parsed.text : existing.text;
pushUniqueRawSeq(existing.rawSeqs, item.seq);
flushCodexTool(existing);
} else {
entries.push(codexToolLifecycleLine({
seq: item.seq,
at: item.at,
text: parsed.text,
status: parsed.status ?? item.method,
rawSeqs: [item.seq],
itemId,
}));
}
} else if (item.channel === "diff") {
const text = fileChangeTextWithInlinePatch(item, fileChangeInputs);
entries.push(transcriptLine("edited", item.at, item.seq, "Edited files", [item.seq], text, "", item.method, fullText));
} else if (item.channel === "error") {
@@ -1221,6 +1281,9 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0,
for (const command of Array.from(activeCommandsByItemId.values()).sort((left, right) => left.seq - right.seq)) {
flushCommand(command);
}
for (const tool of Array.from(activeCodexToolsByItemId.values()).sort((left, right) => left.seq - right.seq)) {
flushCodexTool(tool);
}
return boundedTranscript(coalesceTranscriptMessageFragments(entries), limit);
}
@@ -1863,12 +1926,67 @@ function oaTraceStepToTranscriptLine(step: OaTraceStepSummary): TranscriptLine {
};
}
/**
 * Tells whether a transcript line represents a Codex tool lifecycle event.
 *
 * A line qualifies when all of the following hold:
 * - its kind is `explored` or `ran`;
 * - its status is `item/started` / `item/completed`, or its command preview,
 *   body preview or title begins with an `item/started:` / `item/completed:`
 *   prefix;
 * - its combined text names `webSearch`, `mcpToolCall` or `dynamicToolCall`.
 *
 * The prefix is tested against each field separately. Testing the
 * newline-joined text with a `^` anchor (no multiline flag) could only ever
 * match at the start of the command preview, so a marker carried by the body
 * preview or the title was missed.
 *
 * @param line The transcript line to classify.
 * @returns `true` when the line is a tool lifecycle line.
 */
function isCodexToolLifecycleTranscriptLine(line: TranscriptLine): boolean {
  if (line.kind !== "explored" && line.kind !== "ran") return false;

  const status = String(line.status || "");
  const hasLifecycleStatus = status === "item/started" || status === "item/completed";
  const hasLifecyclePrefix = [line.commandPreview, line.bodyPreview, line.title]
    .some((field) => /^item\/(?:started|completed):/u.test(String(field ?? "")));
  if (!hasLifecycleStatus && !hasLifecyclePrefix) return false;

  const text = `${line.commandPreview ?? ""}\n${line.bodyPreview ?? ""}\n${line.title}`;
  return /\b(?:webSearch|mcpToolCall|dynamicToolCall)\b/u.test(text);
}
/**
 * Collapses a run of tool lifecycle transcript lines into a single line.
 *
 * A group of at most one line is returned as its first element. Otherwise the
 * merged line starts from the first line's fields and takes `seq`, `at`,
 * `status` and the previews from the last line where it has them. Kind and
 * title are recomputed from the chosen command text, the omitted-line counts
 * of the first and last line are added together, and `rawSeqs` is the
 * de-duplicated union across every line in the group.
 *
 * @param group Consecutive lifecycle lines belonging together.
 * @returns The merged transcript line.
 */
function mergeCodexToolLifecycleGroup(group: TranscriptLine[]): TranscriptLine {
  if (group.length <= 1) return group[0];

  const head = group[0];
  const tail = group.at(-1) || head;

  const mergedRawSeqs: number[] = [];
  for (const member of group) {
    // Lines without a rawSeqs array contribute their own seq.
    const memberSeqs = Array.isArray(member.rawSeqs) ? member.rawSeqs : [member.seq];
    for (const seq of memberSeqs) pushUniqueRawSeq(mergedRawSeqs, Number(seq));
  }

  const command = String(tail.commandPreview || head.commandPreview || tail.bodyPreview || head.bodyPreview || tail.title || head.title || "");
  const tailSeq = Number(tail.seq);
  const omittedCommandLines = Number(head.commandOmittedLines || 0) + Number(tail.commandOmittedLines || 0);
  const omittedBodyLines = Number(head.bodyOmittedLines || 0) + Number(tail.bodyOmittedLines || 0);

  return {
    ...head,
    seq: Number.isFinite(tailSeq) ? tailSeq : Number(head.seq),
    at: tail.at || head.at,
    kind: commandKind(command),
    title: shortCommandTitle(command) || String(tail.title || head.title || "WebSearch"),
    status: tail.status || head.status,
    commandPreview: command || undefined,
    // A zero total is reported as "no omitted lines" (undefined).
    commandOmittedLines: omittedCommandLines || undefined,
    bodyPreview: tail.bodyPreview || head.bodyPreview,
    bodyOmittedLines: omittedBodyLines || undefined,
    rawSeqs: mergedRawSeqs,
  };
}
/**
 * Merges adjacent tool lifecycle lines of a transcript into single lines.
 *
 * The input is copied and sorted, then walked in order. Lifecycle lines are
 * gathered into a pending group: a start marker closes any group already
 * pending before joining a new one, and a completion marker closes the group
 * right after joining it. Any non-lifecycle line closes the pending group and
 * is passed through unchanged.
 *
 * @param lines Transcript lines in any order; the array is not mutated.
 * @returns The sorted transcript with lifecycle groups merged.
 */
function coalesceCodexToolLifecycleTranscriptLines(lines: TranscriptLine[]): TranscriptLine[] {
  const startedPrefix = /^item\/started:/u;
  const completedPrefix = /^item\/completed:/u;
  const result: TranscriptLine[] = [];
  let pending: TranscriptLine[] = [];

  const emitPending = (): void => {
    if (pending.length === 0) return;
    result.push(mergeCodexToolLifecycleGroup(pending));
    pending = [];
  };

  for (const row of sortTranscript([...lines])) {
    if (!isCodexToolLifecycleTranscriptLine(row)) {
      emitPending();
      result.push(row);
      continue;
    }
    const marker = String(row.commandPreview || row.bodyPreview || "");
    const opensGroup = row.status === "item/started" || startedPrefix.test(marker);
    const closesGroup = row.status === "item/completed" || completedPrefix.test(marker);
    if (opensGroup && pending.length > 0) emitPending();
    pending.push(row);
    if (closesGroup) emitPending();
  }

  emitPending();
  return result;
}
async function oaTraceTranscriptForTask(task: QueueTask, attemptIndex: number | null): Promise<TranscriptLine[]> {
const taskId = task.id;
const steps = await readOaTraceStepsForTask(taskId, attemptIndex);
const oaLines = coalesceTranscriptMessageFragments(steps.map(oaTraceStepToTranscriptLine).filter(traceLineVisibleInTraceView));
const rawLines = fullTranscript(task).filter(traceLineVisibleInTraceView);
return overlayTraceMessagesFromRawTranscript(oaLines, rawLines);
const oaLines = coalesceCodexToolLifecycleTranscriptLines(coalesceTranscriptMessageFragments(steps.map(oaTraceStepToTranscriptLine).filter(traceLineVisibleInTraceView)));
const rawLines = coalesceCodexToolLifecycleTranscriptLines(fullTranscript(task).filter(traceLineVisibleInTraceView));
return coalesceCodexToolLifecycleTranscriptLines(overlayTraceMessagesFromRawTranscript(oaLines, rawLines));
}
function mergeTraceWindowLines(left: TranscriptLine[], right: TranscriptLine[]): TranscriptLine[] {
@@ -2384,6 +2502,8 @@ export {
buildCompactTaskTranscript,
buildTaskTranscript,
cachedPreviewTranscript,
codexToolLifecycleStartedBeforeIn,
isCodexToolLifecycleOutput,
formatCommandOutput,
fullTranscript,
lastAssistantMessage,