From fd77f74909d6267a8720308ad56e192dce2a44cf Mon Sep 17 00:00:00 2001 From: Codex Date: Sun, 17 May 2026 01:41:19 +0000 Subject: [PATCH] Fix Code Queue WebSearch trace coalescing --- .../microservices/code-queue/src/index.ts | 28 +++- .../microservices/code-queue/src/oa-events.ts | 33 ++++- .../code-queue/src/self-tests.ts | 12 ++ .../microservices/code-queue/src/task-view.ts | 128 +++++++++++++++++- 4 files changed, 189 insertions(+), 12 deletions(-) diff --git a/src/components/microservices/code-queue/src/index.ts b/src/components/microservices/code-queue/src/index.ts index b044bae6..297dca54 100644 --- a/src/components/microservices/code-queue/src/index.ts +++ b/src/components/microservices/code-queue/src/index.ts @@ -130,8 +130,10 @@ import { } from "./oa-events"; import { configureSelfTests, runJudgeInfraSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest } from "./self-tests"; import { + codexToolLifecycleStartedBeforeIn, configureTaskView, formatCommandOutput, + isCodexToolLifecycleOutput, lastAssistantMessage, promptLineCount, recordNumberField, @@ -1018,6 +1020,7 @@ function outputStartsTraceStepInHistory(outputs: LiveOutput[], output: LiveOutpu if (output.channel === "user" && output.method === "enqueue") return false; if (isOpenCodeStepBoundaryMethod(output.method)) return false; if (output.channel === "system") return false; + if (codexToolLifecycleStartedBeforeIn(outputs, output)) return false; if (output.channel === "diff" || output.channel === "tool" || output.channel === "error" || output.channel === "assistant" || output.channel === "reasoning") return true; if (output.channel === "user") return true; if (output.channel !== "command") return true; @@ -1088,9 +1091,18 @@ function recordTaskOutputMetrics(task: QueueTask, output: LiveOutput, op: "set" function outputUpdatesExistingTraceStep(output: LiveOutput): boolean { if (output.channel === "assistant" || output.channel === "reasoning" || output.channel === "diff") return true; + if (isCodexToolLifecycleOutput(output) && output.method === "item/completed") return true; return false; } +function traceStepOutputForProjection(task: QueueTask, output: LiveOutput): LiveOutput { + if (!isCodexToolLifecycleOutput(output) || output.method !== "item/completed" || typeof output.itemId !== "string") return output; + const started = taskFullOutput(task) + .filter((item) => item !== output && isCodexToolLifecycleOutput(item) && item.itemId === output.itemId && item.method === "item/started") + .sort((left, right) => Number(left.seq) - Number(right.seq))[0]; + return started === undefined ? output : { ...output, seq: started.seq, at: output.at, itemId: output.itemId, rawSeqs: [started.seq, output.seq] } as LiveOutput; +} + function errorToJson(error: unknown): JsonValue { if (error instanceof Error) return { name: error.name, message: error.message, stack: error.stack ?? null }; return String(error); @@ -2298,8 +2310,9 @@ configureTaskOutput({ onOutputAppended: (task, output, op) => { const archiveOp = op === "append" ? "append" : "set"; const stepChanged = recordTaskOutputMetrics(task, output, archiveOp); - if (stepChanged) publishCodeQueueTraceStep(task, queueIdOf(task), output, taskOutputMaxSeq(task)); - else if (archiveOp === "append" && outputUpdatesExistingTraceStep(output)) publishCodeQueueTraceStep(task, queueIdOf(task), output, taskOutputMaxSeq(task), null, String(output.text || "").length); + const projectionOutput = traceStepOutputForProjection(task, output); + if (stepChanged) publishCodeQueueTraceStep(task, queueIdOf(task), projectionOutput, taskOutputMaxSeq(task)); + else if ((archiveOp === "append" || output.method === "item/completed") && outputUpdatesExistingTraceStep(output)) publishCodeQueueTraceStep(task, queueIdOf(task), projectionOutput, taskOutputMaxSeq(task), null, String(output.text || "").length); if (archiveOp === "append" && !outputCanChangeStepCount(output)) return; publishTaskOaEvent(task, "output", { onlyStepChange: archiveOp === "append", stepChanged }); }, @@ -4162,9 +4175,14 @@ async function backfillOaTraceStats(url: URL): Promise { const attemptBySeq = outputAttemptIndexMap(output); if (includeSteps) { for (const item of output) { - if (!outputStartsTraceStepInHistory(output, item)) continue; - publishCodeQueueTraceStep(task, queueId, item, outputMaxSeq, attemptBySeq.get(item.seq) ?? null); - stepEventCount += 1; + const projectionOutput = traceStepOutputForProjection(task, item); + if (outputStartsTraceStepInHistory(output, item)) { + publishCodeQueueTraceStep(task, queueId, projectionOutput, outputMaxSeq, attemptBySeq.get(item.seq) ?? null); + stepEventCount += 1; + } else if (outputUpdatesExistingTraceStep(item)) { + publishCodeQueueTraceStep(task, queueId, projectionOutput, outputMaxSeq, attemptBySeq.get(projectionOutput.seq) ?? attemptBySeq.get(item.seq) ?? null, String(item.text || "").length); + stepEventCount += 1; + } } } publishCodeQueueTraceStatsSnapshot(task, queueId, "backfill", traceStats.stepCount, outputMaxSeq, traceStats); diff --git a/src/components/microservices/code-queue/src/oa-events.ts b/src/components/microservices/code-queue/src/oa-events.ts index 0a968530..bd515b0e 100644 --- a/src/components/microservices/code-queue/src/oa-events.ts +++ b/src/components/microservices/code-queue/src/oa-events.ts @@ -198,7 +198,7 @@ function normalizeCommandText(text: string): string { function commandKind(command: string): "read" | "edit" | "run" { if (/\b(apply_patch|git apply|cat >|tee .*<<|sed -i|python3? .*write_text|write|patch|edit|delete|create)\b/iu.test(command)) return "edit"; - if (/\b(rg|grep|find|ls|cat|sed -n|tail|head|git status|git diff|ps|read|glob|search|view)\b/iu.test(command)) return "read"; + if (/\b(rg|grep|find|ls|cat|sed -n|tail|head|git status|git diff|ps|read|glob|search|view|webSearch)\b/iu.test(command)) return "read"; return "run"; } @@ -240,6 +240,7 @@ export function outputTraceKind(output: LiveOutput): "read" | "edit" | "run" | " if (output.channel === "assistant" || output.channel === "user" || output.channel === "reasoning") return "message"; if (output.channel === "tool") { const record = openCodeToolRecord(output); + if (record === null) return commandKind(normalizeCommandText(output.text)); const part = record?.part && typeof record.part === "object" && !Array.isArray(record.part) ? record.part as Record : null; const state = part?.state && typeof part.state === "object" && !Array.isArray(part.state) ? part.state as Record : null; const input = state?.input && typeof state.input === "object" && !Array.isArray(state.input) ? state.input as Record : null; @@ -388,7 +389,7 @@ export function publishCodeQueueTraceStep(task: QueueTask, queueId: string, outp title: outputTitle(output, kind), status: task.status, summaryLines: outputSummaryLines(output), - rawSeqs: [output.seq], + rawSeqs: outputRawSeqs(output), }, }); } @@ -519,6 +520,14 @@ function numberList(value: unknown, fallback: number): number[] { return values.length > 0 ? values : [fallback]; } +function outputRawSeqs(output: LiveOutput): number[] { + const rawSeqs = (output as LiveOutput & { rawSeqs?: unknown }).rawSeqs; + const values = Array.isArray(rawSeqs) + ? rawSeqs.map((item) => Number(item)).filter((item) => Number.isFinite(item)).map((item) => Math.floor(item)) + : []; + return values.length > 0 ? Array.from(new Set(values)) : [output.seq]; +} + function commandLifecycleStatus(payload: JsonRecord, title: string, summaryLines: string[]): string { const source = [title, ...summaryLines].join("\n"); const status = /\bstatus=([A-Za-z0-9_-]+)/u.exec(source)?.[1]; @@ -552,6 +561,24 @@ function traceStepFromEvent(event: unknown): OaTraceStepSummary | null { }; } +function traceStepLifecycleRank(step: OaTraceStepSummary): number { + const source = [step.title, step.status, ...step.summaryLines].join("\n"); + if (/\bitem\/completed\b|status=completed\b|\bcompleted\b/iu.test(source)) return 2; + if (/\bitem\/started\b|status=inProgress\b|\binProgress\b/iu.test(source)) return 1; + return 0; +} + +function mergeOaTraceStepSummary(existing: OaTraceStepSummary | undefined, incoming: OaTraceStepSummary): OaTraceStepSummary { + if (existing === undefined) return incoming; + const selected = traceStepLifecycleRank(incoming) >= traceStepLifecycleRank(existing) ? incoming : existing; + return { + ...existing, + ...selected, + eventSequence: Math.max(existing.eventSequence, incoming.eventSequence), + rawSeqs: Array.from(new Set([...existing.rawSeqs, ...incoming.rawSeqs])), + }; +} + function eventNextAfterSeq(body: Record, events: unknown[], fallback: number): number { const bodyNext = Number(body.nextAfterSeq); const eventNext = events.reduce((max, event) => { @@ -609,7 +636,7 @@ export async function readOaTraceStepsForTask(taskId: string, attemptIndex: numb const events = Array.isArray(body.events) ? body.events : []; for (const event of events) { const step = traceStepFromEvent(event); - if (step !== null) bySeq.set(step.seq, { ...(bySeq.get(step.seq) ?? {}), ...step }); + if (step !== null) bySeq.set(step.seq, mergeOaTraceStepSummary(bySeq.get(step.seq), step)); } const nextAfterSeq = eventNextAfterSeq(body, events, afterSeq); if (events.length < traceStepReadPageLimit || nextAfterSeq <= afterSeq) break; diff --git a/src/components/microservices/code-queue/src/self-tests.ts b/src/components/microservices/code-queue/src/self-tests.ts index 15fd566c..76c9e44e 100644 --- a/src/components/microservices/code-queue/src/self-tests.ts +++ b/src/components/microservices/code-queue/src/self-tests.ts @@ -346,6 +346,17 @@ function runTracePortSelfTest(): JsonValue { assertReferenceTest(!transcript.some((line) => line.status === "opencode/step-start" || line.status === "opencode/step-finish"), "opencode step boundaries should stay out of trace"); assertReferenceTest(!transcript.some((line) => String(line.bodyPreview || "").includes("hidden reasoning")), "reasoning-only opencode assistant text should not duplicate reasoning"); + const codexWebSearchTask = testTask("codex_5004_web_search", "codex web search prompt", "", [], "2026-05-12T00:01:00.000Z"); + codexWebSearchTask.output = [ + { seq: 30, at: "2026-05-12T00:01:00.000Z", channel: "tool", method: "item/started", itemId: "ws_trace", text: "item/started: webSearch\n" }, + { seq: 31, at: "2026-05-12T00:01:01.000Z", channel: "tool", method: "item/completed", itemId: "ws_trace", text: "item/completed: webSearch status=completed\n" }, + ]; + const webSearchTranscript = buildTaskTranscript(codexWebSearchTask, 20, 0); + const webSearchLines = webSearchTranscript.filter((line) => line.rawSeqs.includes(30) || line.rawSeqs.includes(31)); + assertReferenceTest(webSearchLines.length === 1, "codex WebSearch start/completed lifecycle should coalesce into one trace line"); + assertReferenceTest(webSearchLines[0]?.kind === "explored", "codex WebSearch should count as an explored/read trace line"); + assertReferenceTest([30, 31].every((seq) => webSearchLines[0]?.rawSeqs.includes(seq)), "codex WebSearch trace line should preserve lifecycle raw seqs"); + const codexTask = testTask("codex_5002_interleaved_command", "codex command prompt", "", [], "2026-05-12T00:02:00.000Z"); codexTask.output = [ { seq: 10, at: "2026-05-12T00:02:00.000Z", channel: "command", method: "item/started", itemId: "call_long", text: "item/started: /bin/bash -lc \"python3 - <<'PY'\\nprint('hello')\\nPY\" status=inProgress\n" }, @@ -404,6 +415,7 @@ function runTracePortSelfTest(): JsonValue { { name: "reasoning_duplicate_filtered", ok: true }, { name: "interleaved_command_output_single_trace_line", ok: true, rawSeqs: longCommand?.rawSeqs ?? [] }, { name: "interleaved_command_summary_has_command", ok: true, summaryLines: longCommand ? transcriptLineSummaryLines(longCommand) : [] }, + { name: "codex_web_search_lifecycle_coalesced", ok: true, rawSeqs: webSearchLines[0]?.rawSeqs ?? [] }, { name: "message_fragments_coalesced", ok: true, assistantRawSeqs: assistantMessages[0]?.rawSeqs ?? [], reasoningRawSeqs: reasoningMessages[0]?.rawSeqs ?? [] }, { name: "duration_preserved", ok: true, durationMs: explored?.durationMs ?? null }, { name: "remote_opencode_exec_includes_binary", ok: true }, diff --git a/src/components/microservices/code-queue/src/task-view.ts b/src/components/microservices/code-queue/src/task-view.ts index 2d552688..acb6ed50 100644 --- a/src/components/microservices/code-queue/src/task-view.ts +++ b/src/components/microservices/code-queue/src/task-view.ts @@ -50,6 +50,18 @@ export interface TaskViewContext { taskQueueEnteredAt: (task: QueueTask) => string; } +function isCodexToolLifecycleOutput(output: Pick): boolean { + if (output.channel !== "tool" || typeof output.itemId !== "string" || output.itemId.length === 0) return false; + const method = String(output.method || ""); + if (method !== "item/started" && method !== "item/completed") return false; + return /\b(?:webSearch|mcpToolCall|dynamicToolCall)\b/u.test(String(output.text || "")); +} + +function codexToolLifecycleStartedBeforeIn(outputs: Pick[], output: Pick): boolean { + if (!isCodexToolLifecycleOutput(output)) return false; + return outputs.some((item) => item !== output && isCodexToolLifecycleOutput(item) && item.itemId === output.itemId && item.method === "item/started"); +} + const judgeFailRetryLimit = 3; const transcriptCache = new Map(); const codexSessionPathCache = new Map(); @@ -783,6 +795,7 @@ function overlayTraceMessagesFromRawTranscript(oaLines: TranscriptLine[], rawLin function commandKind(command: string): TranscriptKind { + if (/\bwebSearch\b/u.test(command)) return "explored"; if (/\b(apply_patch|git apply|cat >|tee .*<<|sed -i|python3? .*write_text)\b/u.test(command)) return "edited"; if (/\b(rg|grep|find|ls|cat|sed -n|tail|head|git status|git diff|ps)\b/u.test(command)) return "explored"; return "ran"; @@ -1102,7 +1115,9 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0, ? codexSessionFileChangesByCallId(task) : new Map(); type ActiveMessage = { seq: number; at: string; title: string; status?: string; body: string; rawSeqs: number[] }; + type ActiveCodexTool = { seq: number; at: string; text: string; status?: string; rawSeqs: number[]; itemId?: string }; let activeMessage: ActiveMessage | null = null; + const activeCodexToolsByItemId = new Map(); const flushMessage = (): void => { if (activeMessage === null) return; @@ -1123,6 +1138,21 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0, activeMessage = { seq: item.seq, at: item.at, title, status, body, rawSeqs: [item.seq] }; }; + const parseCodexToolLifecycle = (item: LiveOutput): { status: string | undefined; text: string } => { + const status = /\bstatus=([A-Za-z0-9_-]+)/u.exec(item.text)?.[1]; + return { status, text: String(item.text || "").trimEnd() }; + }; + + const codexToolLifecycleLine = (tool: ActiveCodexTool): TranscriptLine => { + const kind = commandKind(tool.text); + return transcriptLine(kind, tool.at, tool.seq, shortCommandTitle(tool.text), tool.rawSeqs, "", tool.text, tool.status, fullText); + }; + + const flushCodexTool = (tool: ActiveCodexTool): void => { + entries.push(codexToolLifecycleLine(tool)); + if (tool.itemId !== undefined && activeCodexToolsByItemId.get(tool.itemId) === tool) activeCodexToolsByItemId.delete(tool.itemId); + }; + for (const item of outputItems) { if (initialPrompt !== null && item.channel === "user" && item.method === "enqueue") continue; if (item.channel === "user" && item.method === "turn/steer" && promptHistorySeqs.has(item.seq)) continue; @@ -1191,7 +1221,37 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0, if (item.channel !== "assistant" && item.channel !== "reasoning") flushMessage(); flushCommand(); - if (item.channel === "diff") { + if (isCodexToolLifecycleOutput(item)) { + const parsed = parseCodexToolLifecycle(item); + const itemId = item.itemId || ""; + const existing = activeCodexToolsByItemId.get(itemId); + if (item.method === "item/started") { + if (existing !== undefined) flushCodexTool(existing); + activeCodexToolsByItemId.set(itemId, { + seq: item.seq, + at: item.at, + text: parsed.text, + status: parsed.status ?? item.method, + rawSeqs: [item.seq], + itemId, + }); + } else if (existing !== undefined) { + existing.at = item.at; + existing.status = parsed.status ?? existing.status; + existing.text = parsed.text.length > 0 ? parsed.text : existing.text; + pushUniqueRawSeq(existing.rawSeqs, item.seq); + flushCodexTool(existing); + } else { + entries.push(codexToolLifecycleLine({ + seq: item.seq, + at: item.at, + text: parsed.text, + status: parsed.status ?? item.method, + rawSeqs: [item.seq], + itemId, + })); + } + } else if (item.channel === "diff") { const text = fileChangeTextWithInlinePatch(item, fileChangeInputs); entries.push(transcriptLine("edited", item.at, item.seq, "Edited files", [item.seq], text, "", item.method, fullText)); } else if (item.channel === "error") { @@ -1221,6 +1281,9 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0, for (const command of Array.from(activeCommandsByItemId.values()).sort((left, right) => left.seq - right.seq)) { flushCommand(command); } + for (const tool of Array.from(activeCodexToolsByItemId.values()).sort((left, right) => left.seq - right.seq)) { + flushCodexTool(tool); + } return boundedTranscript(coalesceTranscriptMessageFragments(entries), limit); } @@ -1863,12 +1926,67 @@ function oaTraceStepToTranscriptLine(step: OaTraceStepSummary): TranscriptLine { }; } +function isCodexToolLifecycleTranscriptLine(line: TranscriptLine): boolean { + const text = `${line.commandPreview ?? ""}\n${line.bodyPreview ?? ""}\n${line.title}`; + const status = String(line.status || ""); + return (line.kind === "explored" || line.kind === "ran") + && (status === "item/started" || status === "item/completed" || /^item\/(?:started|completed):/u.test(text)) + && /\b(?:webSearch|mcpToolCall|dynamicToolCall)\b/u.test(text); +} + +function mergeCodexToolLifecycleGroup(group: TranscriptLine[]): TranscriptLine { + if (group.length <= 1) return group[0]; + const first = group[0]; + const last = group.at(-1) || first; + const rawSeqs: number[] = []; + for (const line of group) { + for (const seq of Array.isArray(line.rawSeqs) ? line.rawSeqs : [line.seq]) pushUniqueRawSeq(rawSeqs, Number(seq)); + } + const command = String(last.commandPreview || first.commandPreview || last.bodyPreview || first.bodyPreview || last.title || first.title || ""); + return { + ...first, + seq: Number.isFinite(Number(last.seq)) ? Number(last.seq) : Number(first.seq), + at: last.at || first.at, + kind: commandKind(command), + title: shortCommandTitle(command) || String(last.title || first.title || "WebSearch"), + status: last.status || first.status, + commandPreview: command || undefined, + commandOmittedLines: Number(first.commandOmittedLines || 0) + Number(last.commandOmittedLines || 0) || undefined, + bodyPreview: last.bodyPreview || first.bodyPreview, + bodyOmittedLines: Number(first.bodyOmittedLines || 0) + Number(last.bodyOmittedLines || 0) || undefined, + rawSeqs, + }; +} + +function coalesceCodexToolLifecycleTranscriptLines(lines: TranscriptLine[]): TranscriptLine[] { + const rows = sortTranscript([...lines]); + const merged: TranscriptLine[] = []; + let group: TranscriptLine[] = []; + const flush = () => { + if (group.length > 0) merged.push(mergeCodexToolLifecycleGroup(group)); + group = []; + }; + for (const line of rows) { + if (isCodexToolLifecycleTranscriptLine(line)) { + const text = String(line.commandPreview || line.bodyPreview || ""); + if ((line.status === "item/started" || /^item\/started:/u.test(text)) && group.length > 0) flush(); + group.push(line); + if (line.status === "item/completed" || /^item\/completed:/u.test(text)) flush(); + continue; + } + flush(); + merged.push(line); + } + flush(); + return merged; +} + async function oaTraceTranscriptForTask(task: QueueTask, attemptIndex: number | null): Promise { const taskId = task.id; const steps = await readOaTraceStepsForTask(taskId, attemptIndex); - const oaLines = coalesceTranscriptMessageFragments(steps.map(oaTraceStepToTranscriptLine).filter(traceLineVisibleInTraceView)); - const rawLines = fullTranscript(task).filter(traceLineVisibleInTraceView); - return overlayTraceMessagesFromRawTranscript(oaLines, rawLines); + const oaLines = coalesceCodexToolLifecycleTranscriptLines(coalesceTranscriptMessageFragments(steps.map(oaTraceStepToTranscriptLine).filter(traceLineVisibleInTraceView))); + const rawLines = coalesceCodexToolLifecycleTranscriptLines(fullTranscript(task).filter(traceLineVisibleInTraceView)); + return coalesceCodexToolLifecycleTranscriptLines(overlayTraceMessagesFromRawTranscript(oaLines, rawLines)); } function mergeTraceWindowLines(left: TranscriptLine[], right: TranscriptLine[]): TranscriptLine[] { @@ -2384,6 +2502,8 @@ export { buildCompactTaskTranscript, buildTaskTranscript, cachedPreviewTranscript, + codexToolLifecycleStartedBeforeIn, + isCodexToolLifecycleOutput, formatCommandOutput, fullTranscript, lastAssistantMessage,