diff --git a/src/components/frontend/src/code-queue.tsx b/src/components/frontend/src/code-queue.tsx index 5e5e2e92..fdd77029 100644 --- a/src/components/frontend/src/code-queue.tsx +++ b/src/components/frontend/src/code-queue.tsx @@ -921,6 +921,64 @@ function coalesceFileChangeTraceSteps(steps: any[]): any[] { return merged; } +function traceStepMessageMergeKey(step: any): string { + if (String(step?.kind || "") !== "message") return ""; + const title = String(step?.title || "").trim().toLowerCase(); + if (title !== "assistant message" && title !== "reasoning") return ""; + return `${title}:${String(step?.status || "")}`; +} + +function mergeMessageTraceStepGroup(group: any[]): any { + if (group.length <= 1) return group[0]; + const first = group[0]; + const last = group.at(-1) || first; + const rawSeqs = group.flatMap((step) => Array.isArray(step?.rawSeqs) ? step.rawSeqs : [step?.seq]).filter((seq) => seq !== undefined); + const seenRawSeqs: any[] = []; + for (const seq of rawSeqs) if (!seenRawSeqs.includes(seq)) seenRawSeqs.push(seq); + const summaryLines = group + .flatMap(traceStepSummaryLines) + .filter((line) => line.trim().length > 0); + return { + ...first, + seq: traceStepSeqValue(last) || traceStepSeqValue(first), + at: last?.at || first?.at, + summaryLines: summaryLines.length > 0 ? [summaryLines.at(-1) || summaryLines[0]] : [], + rawSeqs: seenRawSeqs, + }; +} + +function coalesceMessageTraceSteps(steps: any[]): any[] { + const rows = Array.isArray(steps) ? steps : []; + const merged: any[] = []; + let group: any[] = []; + let groupKey = ""; + const flush = () => { + if (group.length > 0) merged.push(mergeMessageTraceStepGroup(group)); + group = []; + groupKey = ""; + }; + for (const step of rows) { + const key = traceStepMessageMergeKey(step); + if (key.length > 0 && key === groupKey) { + group.push(step); + continue; + } + flush(); + if (key.length > 0) { + group = [step]; + groupKey = key; + } else { + merged.push(step); + } + } + flush(); + return merged; +} + +function coalesceTraceSteps(steps: any[]): any[] { + return coalesceMessageTraceSteps(coalesceFileChangeTraceSteps(steps)); +} + function canonicalExecutionSummary(execution: AnyRecord): AnyRecord { return { ...execution }; } @@ -1638,7 +1696,7 @@ function ProgressivePromptBlock({ task, loading, onLoadPromptPart, testId = "cod } function ProgressiveExecutionSummary({ task, attempt, attemptIndex, loading, onLoadSteps, onLoadStep, testId = "codex-execution-summary" }: AnyRecord) { - const steps = coalesceFileChangeTraceSteps(taskTraceSteps(task, attemptIndex)); + const steps = coalesceTraceSteps(taskTraceSteps(task, attemptIndex)); const execution = canonicalExecutionSummary(attemptExecutionSummary(task, attempt)); const stats = attempt ? attemptOaTraceStats(task, attempt) : taskOaTraceStats(task); const stepDetails = taskTraceStepDetails(task); diff --git a/src/components/microservices/code-queue/src/index.ts b/src/components/microservices/code-queue/src/index.ts index d5246889..031dcfcf 100644 --- a/src/components/microservices/code-queue/src/index.ts +++ b/src/components/microservices/code-queue/src/index.ts @@ -1010,6 +1010,11 @@ function recordTaskOutputMetrics(task: QueueTask, output: LiveOutput, op: "set" return true; } +function outputUpdatesExistingTraceStep(output: LiveOutput): boolean { + if (output.channel === "assistant" || output.channel === "reasoning" || output.channel === "diff") return true; + return false; +} + function errorToJson(error: unknown): JsonValue { if (error instanceof Error) return { name: error.name, message: error.message, stack: error.stack ?? null }; return String(error); @@ -2144,6 +2149,7 @@ configureTaskOutput({ const archiveOp = op === "append" ? "append" : "set"; const stepChanged = recordTaskOutputMetrics(task, output, archiveOp); if (stepChanged) publishCodeQueueTraceStep(task, queueIdOf(task), output, taskOutputMaxSeq(task)); + else if (archiveOp === "append" && outputUpdatesExistingTraceStep(output)) publishCodeQueueTraceStep(task, queueIdOf(task), output, taskOutputMaxSeq(task), null, String(output.text || "").length); if (archiveOp === "append" && !outputCanChangeStepCount(output)) return; publishTaskOaEvent(task, "output", { onlyStepChange: archiveOp === "append", stepChanged }); }, diff --git a/src/components/microservices/code-queue/src/oa-events.ts b/src/components/microservices/code-queue/src/oa-events.ts index 56b43506..0a968530 100644 --- a/src/components/microservices/code-queue/src/oa-events.ts +++ b/src/components/microservices/code-queue/src/oa-events.ts @@ -358,12 +358,12 @@ export function publishCodeQueueTraceStatsSnapshot( }); } -export function publishCodeQueueTraceStep(task: QueueTask, queueId: string, output: LiveOutput, outputMaxSeq: number, attemptIndexOverride: number | null = null): void { +export function publishCodeQueueTraceStep(task: QueueTask, queueId: string, output: LiveOutput, outputMaxSeq: number, attemptIndexOverride: number | null = null, revision = 0): void { const kind = outputTraceKind(output); const attemptIndex = taskOutputAttemptIndex(task, output, attemptIndexOverride); const attemptScopeId = attemptIndex === null ? null : taskAttemptScopeId(task.id, attemptIndex); postOaEvent({ - eventId: `code-queue:trace-step-created:${task.id}:${output.seq}`, + eventId: `code-queue:trace-step-created:${task.id}:${output.seq}:${Math.max(0, Math.floor(revision))}`, type: "trace-step-created", createdAt: output.at || ctx().nowIso(), sourceKind: "service", diff --git a/src/components/microservices/code-queue/src/self-tests.ts b/src/components/microservices/code-queue/src/self-tests.ts index bcc885e9..15fd566c 100644 --- a/src/components/microservices/code-queue/src/self-tests.ts +++ b/src/components/microservices/code-queue/src/self-tests.ts @@ -367,6 +367,22 @@ function runTracePortSelfTest(): JsonValue { assertReferenceTest(["first line", "second line", "third line"].every((part) => String(longCommand.bodyPreview || "").includes(part)), "interleaved command trace line should aggregate all output chunks"); assertReferenceTest(transcriptLineSummaryLines(longCommand).some((line) => line.includes("$ python3 - <<")), "interleaved command summary should expose the command before expansion"); + const fragmentedMessageTask = testTask("codex_5003_fragmented_message", "message fragments prompt", "", [], "2026-05-12T00:03:00.000Z"); + fragmentedMessageTask.output = [ + { seq: 20, at: "2026-05-12T00:03:00.000Z", channel: "assistant", method: "item/agentMessage/delta", itemId: "msg_a", text: "Hello " }, + { seq: 21, at: "2026-05-12T00:03:01.000Z", channel: "assistant", method: "item/agentMessage/delta", itemId: "msg_a", text: "world.\n" }, + { seq: 22, at: "2026-05-12T00:03:02.000Z", channel: "reasoning", method: "item/reasoning/summaryTextDelta", itemId: "rsn_a", text: "Thinking " }, + { seq: 23, at: "2026-05-12T00:03:03.000Z", channel: "reasoning", method: "item/reasoning/summaryTextDelta", itemId: "rsn_a", text: "done.\n" }, + ]; + const fragmentedTranscript = buildTaskTranscript(fragmentedMessageTask, 20, 0); + const assistantMessages = fragmentedTranscript.filter((line) => line.title === "Assistant message"); + const reasoningMessages = fragmentedTranscript.filter((line) => line.title === "Reasoning"); + assertReferenceTest(assistantMessages.length === 1, "assistant message deltas should coalesce into one trace line"); + assertReferenceTest(String(assistantMessages[0]?.bodyPreview || "").includes("Hello world."), "assistant message trace line should contain the joined assistant text"); + assertReferenceTest([20, 21].every((seq) => assistantMessages[0]?.rawSeqs.includes(seq)), "assistant message trace line should preserve raw seqs"); + assertReferenceTest(reasoningMessages.length === 1, "reasoning deltas should coalesce into one trace line"); + assertReferenceTest(String(reasoningMessages[0]?.bodyPreview || "").includes("Thinking done."), "reasoning trace line should contain the joined reasoning text"); + const remoteTask = testTask("codex_5001_remote_opencode", "remote command prompt", "", [], "2026-05-12T00:01:00.000Z"); remoteTask.providerId = "D601"; remoteTask.cwd = "/home/ubuntu"; @@ -388,6 +404,7 @@ function runTracePortSelfTest(): JsonValue { { name: "reasoning_duplicate_filtered", ok: true }, { name: "interleaved_command_output_single_trace_line", ok: true, rawSeqs: longCommand?.rawSeqs ?? [] }, { name: "interleaved_command_summary_has_command", ok: true, summaryLines: longCommand ? transcriptLineSummaryLines(longCommand) : [] }, + { name: "message_fragments_coalesced", ok: true, assistantRawSeqs: assistantMessages[0]?.rawSeqs ?? [], reasoningRawSeqs: reasoningMessages[0]?.rawSeqs ?? [] }, { name: "duration_preserved", ok: true, durationMs: explored?.durationMs ?? null }, { name: "remote_opencode_exec_includes_binary", ok: true }, { name: "opencode_exit0_final_without_step_finish_is_terminal", ok: true }, diff --git a/src/components/microservices/code-queue/src/task-view.ts b/src/components/microservices/code-queue/src/task-view.ts index 235c600f..3467d123 100644 --- a/src/components/microservices/code-queue/src/task-view.ts +++ b/src/components/microservices/code-queue/src/task-view.ts @@ -688,6 +688,71 @@ function transcriptLine(kind: TranscriptKind, at: string, seq: number, title: st }; } +function pushUniqueRawSeq(rawSeqs: number[], seq: number): void { + if (!rawSeqs.includes(seq)) rawSeqs.push(seq); +} + +function messageFragmentMergeKey(line: TranscriptLine): string | null { + if (line.kind !== "message") return null; + const title = String(line.title || "").trim().toLowerCase(); + if (title !== "assistant message" && title !== "reasoning") return null; + return `${title}:${String(line.status || "")}`; +} + +function appendMessageText(left: string, right: string): string { + if (left.length === 0) return right; + if (right.length === 0) return left; + if (!/\s$/u.test(left) && !/^\s|^[,.;:!?)}\]]/u.test(right) && /[A-Za-z0-9]/u.test(left.at(-1) || "") && /[A-Za-z0-9]/u.test(right[0] || "")) return `${left} ${right}`; + return `${left}${right}`; +} + +function mergeTranscriptMessageGroup(group: TranscriptLine[]): TranscriptLine { + const first = group[0]; + const last = group[group.length - 1] || first; + const body = group.reduce((text, line) => appendMessageText(text, String(line.bodyPreview || "")), ""); + const rawSeqs: number[] = []; + for (const line of group) { + for (const seq of Array.isArray(line.rawSeqs) ? line.rawSeqs : []) pushUniqueRawSeq(rawSeqs, seq); + } + return { + ...first, + seq: Number.isFinite(Number(last.seq)) ? Number(last.seq) : Number(first.seq), + at: last.at || first.at, + bodyPreview: body.length > 0 ? body : undefined, + bodyOmittedLines: group.reduce((sum, line) => sum + Number(line.bodyOmittedLines || 0), 0) || undefined, + rawSeqs, + }; +} + +function coalesceTranscriptMessageFragments(entries: TranscriptLine[]): TranscriptLine[] { + const rows = sortTranscript([...entries]); + const merged: TranscriptLine[] = []; + let group: TranscriptLine[] = []; + let groupKey: string | null = null; + const flush = () => { + if (group.length === 0) return; + merged.push(group.length === 1 ? group[0] : mergeTranscriptMessageGroup(group)); + group = []; + groupKey = null; + }; + for (const line of rows) { + const key = messageFragmentMergeKey(line); + if (key !== null && key === groupKey) { + group.push(line); + continue; + } + flush(); + if (key !== null) { + group = [line]; + groupKey = key; + } else { + merged.push(line); + } + } + flush(); + return merged; +} + function commandKind(command: string): TranscriptKind { if (/\b(apply_patch|git apply|cat >|tee .*<<|sed -i|python3? .*write_text)\b/u.test(command)) return "edited"; @@ -1008,11 +1073,34 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0, const fileChangeInputs = outputItems.some((item) => item.channel === "diff" && item.method === "item/fileChange/outputDelta" && typeof item.itemId === "string") ? codexSessionFileChangesByCallId(task) : new Map(); + type ActiveMessage = { seq: number; at: string; title: string; status?: string; body: string; rawSeqs: number[] }; + let activeMessage: ActiveMessage | null = null; + + const flushMessage = (): void => { + if (activeMessage === null) return; + entries.push(transcriptLine("message", activeMessage.at, activeMessage.seq, activeMessage.title, activeMessage.rawSeqs, activeMessage.body, "", activeMessage.status, fullText)); + activeMessage = null; + }; + + const appendMessage = (item: LiveOutput, title: string, body: string, status?: string): void => { + if (body.length === 0) return; + if (activeMessage !== null && activeMessage.title === title && activeMessage.status === status) { + activeMessage.body += body; + activeMessage.at = item.at; + activeMessage.seq = item.seq; + pushUniqueRawSeq(activeMessage.rawSeqs, item.seq); + return; + } + flushMessage(); + activeMessage = { seq: item.seq, at: item.at, title, status, body, rawSeqs: [item.seq] }; + }; + for (const item of outputItems) { if (initialPrompt !== null && item.channel === "user" && item.method === "enqueue") continue; if (item.channel === "user" && item.method === "turn/steer" && promptHistorySeqs.has(item.seq)) continue; if (isOpenCodeStepBoundaryMethod(item.method)) continue; if (item.channel === "command" && item.method === "item/started") { + flushMessage(); const parsed = parseCommandLine(item.text); const groupedCommand = itemIdCommand(item, parsed); if (groupedCommand !== null) { @@ -1037,6 +1125,7 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0, continue; } if (item.channel === "command" && item.method === "item/commandExecution/outputDelta") { + flushMessage(); const groupedCommand = itemIdCommand(item); if (groupedCommand !== null) { groupedCommand.body += item.text; @@ -1051,6 +1140,7 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0, continue; } if (item.channel === "command" && item.method === "item/completed") { + flushMessage(); const parsed = parseCommandLine(item.text); const groupedCommand = itemIdCommand(item, parsed); if (groupedCommand !== null) { @@ -1071,6 +1161,7 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0, continue; } + if (item.channel !== "assistant" && item.channel !== "reasoning") flushMessage(); flushCommand(); if (item.channel === "diff") { const text = fileChangeTextWithInlinePatch(item, fileChangeInputs); @@ -1079,9 +1170,9 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0, entries.push(transcriptLine("error", item.at, item.seq, "Error", [item.seq], item.text, "", item.method, fullText)); } else if (item.channel === "assistant") { const body = String(item.method || "").startsWith("opencode/") ? openCodeVisibleAssistantText(item.text) : item.text; - if (body.length > 0) entries.push(transcriptLine("message", item.at, item.seq, "Assistant message", [item.seq], body, "", item.method, fullText)); + appendMessage(item, "Assistant message", body, item.method); } else if (item.channel === "reasoning") { - entries.push(transcriptLine("message", item.at, item.seq, "Reasoning", [item.seq], item.text, "", item.method, fullText)); + appendMessage(item, "Reasoning", item.text, item.method); } else if (item.channel === "user") { entries.push(transcriptLine("message", item.at, item.seq, item.method === "enqueue" ? "Submitted prompt" : "User prompt", [item.seq], item.text, "", item.method, fullText)); } else if (item.channel === "tool" && String(item.method || "").startsWith("opencode/")) { @@ -1097,11 +1188,12 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0, entries.push(transcriptLine("system", item.at, item.seq, title, [item.seq], item.text, "", item.method, fullText)); } } + flushMessage(); flushCommand(); for (const command of Array.from(activeCommandsByItemId.values()).sort((left, right) => left.seq - right.seq)) { flushCommand(command); } - return boundedTranscript(entries, limit); + return boundedTranscript(coalesceTranscriptMessageFragments(entries), limit); } function buildCompactTaskTranscript(task: QueueTask, limit = 12, rawOutputWindow = 24): TranscriptLine[] { @@ -1745,7 +1837,7 @@ function oaTraceStepToTranscriptLine(step: OaTraceStepSummary): TranscriptLine { async function oaTraceTranscriptForTask(taskId: string, attemptIndex: number | null): Promise { const steps = await readOaTraceStepsForTask(taskId, attemptIndex); - return steps.map(oaTraceStepToTranscriptLine).filter(traceLineVisibleInTraceView); + return coalesceTranscriptMessageFragments(steps.map(oaTraceStepToTranscriptLine).filter(traceLineVisibleInTraceView)); } function mergeTraceWindowLines(left: TranscriptLine[], right: TranscriptLine[]): TranscriptLine[] {