fix code queue trace message fragments

This commit is contained in:
Codex
2026-05-16 15:58:30 +00:00
parent 9fad202b96
commit 6263f83926
5 changed files with 180 additions and 7 deletions
+59 -1
View File
@@ -921,6 +921,64 @@ function coalesceFileChangeTraceSteps(steps: any[]): any[] {
return merged;
}
function traceStepMessageMergeKey(step: any): string {
if (String(step?.kind || "") !== "message") return "";
const title = String(step?.title || "").trim().toLowerCase();
if (title !== "assistant message" && title !== "reasoning") return "";
return `${title}:${String(step?.status || "")}`;
}
function mergeMessageTraceStepGroup(group: any[]): any {
if (group.length <= 1) return group[0];
const first = group[0];
const last = group.at(-1) || first;
const rawSeqs = group.flatMap((step) => Array.isArray(step?.rawSeqs) ? step.rawSeqs : [step?.seq]).filter((seq) => seq !== undefined);
const seenRawSeqs: any[] = [];
for (const seq of rawSeqs) if (!seenRawSeqs.includes(seq)) seenRawSeqs.push(seq);
const summaryLines = group
.flatMap(traceStepSummaryLines)
.filter((line) => line.trim().length > 0);
return {
...first,
seq: traceStepSeqValue(last) || traceStepSeqValue(first),
at: last?.at || first?.at,
summaryLines: summaryLines.length > 0 ? [summaryLines.at(-1) || summaryLines[0]] : [],
rawSeqs: seenRawSeqs,
};
}
function coalesceMessageTraceSteps(steps: any[]): any[] {
const rows = Array.isArray(steps) ? steps : [];
const merged: any[] = [];
let group: any[] = [];
let groupKey = "";
const flush = () => {
if (group.length > 0) merged.push(mergeMessageTraceStepGroup(group));
group = [];
groupKey = "";
};
for (const step of rows) {
const key = traceStepMessageMergeKey(step);
if (key.length > 0 && key === groupKey) {
group.push(step);
continue;
}
flush();
if (key.length > 0) {
group = [step];
groupKey = key;
} else {
merged.push(step);
}
}
flush();
return merged;
}
function coalesceTraceSteps(steps: any[]): any[] {
return coalesceMessageTraceSteps(coalesceFileChangeTraceSteps(steps));
}
function canonicalExecutionSummary(execution: AnyRecord): AnyRecord {
return { ...execution };
}
@@ -1638,7 +1696,7 @@ function ProgressivePromptBlock({ task, loading, onLoadPromptPart, testId = "cod
}
function ProgressiveExecutionSummary({ task, attempt, attemptIndex, loading, onLoadSteps, onLoadStep, testId = "codex-execution-summary" }: AnyRecord) {
const steps = coalesceFileChangeTraceSteps(taskTraceSteps(task, attemptIndex));
const steps = coalesceTraceSteps(taskTraceSteps(task, attemptIndex));
const execution = canonicalExecutionSummary(attemptExecutionSummary(task, attempt));
const stats = attempt ? attemptOaTraceStats(task, attempt) : taskOaTraceStats(task);
const stepDetails = taskTraceStepDetails(task);
@@ -1010,6 +1010,11 @@ function recordTaskOutputMetrics(task: QueueTask, output: LiveOutput, op: "set"
return true;
}
function outputUpdatesExistingTraceStep(output: LiveOutput): boolean {
if (output.channel === "assistant" || output.channel === "reasoning" || output.channel === "diff") return true;
return false;
}
function errorToJson(error: unknown): JsonValue {
if (error instanceof Error) return { name: error.name, message: error.message, stack: error.stack ?? null };
return String(error);
@@ -2144,6 +2149,7 @@ configureTaskOutput({
const archiveOp = op === "append" ? "append" : "set";
const stepChanged = recordTaskOutputMetrics(task, output, archiveOp);
if (stepChanged) publishCodeQueueTraceStep(task, queueIdOf(task), output, taskOutputMaxSeq(task));
else if (archiveOp === "append" && outputUpdatesExistingTraceStep(output)) publishCodeQueueTraceStep(task, queueIdOf(task), output, taskOutputMaxSeq(task), null, String(output.text || "").length);
if (archiveOp === "append" && !outputCanChangeStepCount(output)) return;
publishTaskOaEvent(task, "output", { onlyStepChange: archiveOp === "append", stepChanged });
},
@@ -358,12 +358,12 @@ export function publishCodeQueueTraceStatsSnapshot(
});
}
export function publishCodeQueueTraceStep(task: QueueTask, queueId: string, output: LiveOutput, outputMaxSeq: number, attemptIndexOverride: number | null = null): void {
export function publishCodeQueueTraceStep(task: QueueTask, queueId: string, output: LiveOutput, outputMaxSeq: number, attemptIndexOverride: number | null = null, revision = 0): void {
const kind = outputTraceKind(output);
const attemptIndex = taskOutputAttemptIndex(task, output, attemptIndexOverride);
const attemptScopeId = attemptIndex === null ? null : taskAttemptScopeId(task.id, attemptIndex);
postOaEvent({
eventId: `code-queue:trace-step-created:${task.id}:${output.seq}`,
eventId: `code-queue:trace-step-created:${task.id}:${output.seq}:${Math.max(0, Math.floor(revision))}`,
type: "trace-step-created",
createdAt: output.at || ctx().nowIso(),
sourceKind: "service",
@@ -367,6 +367,22 @@ function runTracePortSelfTest(): JsonValue {
assertReferenceTest(["first line", "second line", "third line"].every((part) => String(longCommand.bodyPreview || "").includes(part)), "interleaved command trace line should aggregate all output chunks");
assertReferenceTest(transcriptLineSummaryLines(longCommand).some((line) => line.includes("$ python3 - <<")), "interleaved command summary should expose the command before expansion");
const fragmentedMessageTask = testTask("codex_5003_fragmented_message", "message fragments prompt", "", [], "2026-05-12T00:03:00.000Z");
fragmentedMessageTask.output = [
{ seq: 20, at: "2026-05-12T00:03:00.000Z", channel: "assistant", method: "item/agentMessage/delta", itemId: "msg_a", text: "Hello " },
{ seq: 21, at: "2026-05-12T00:03:01.000Z", channel: "assistant", method: "item/agentMessage/delta", itemId: "msg_a", text: "world.\n" },
{ seq: 22, at: "2026-05-12T00:03:02.000Z", channel: "reasoning", method: "item/reasoning/summaryTextDelta", itemId: "rsn_a", text: "Thinking " },
{ seq: 23, at: "2026-05-12T00:03:03.000Z", channel: "reasoning", method: "item/reasoning/summaryTextDelta", itemId: "rsn_a", text: "done.\n" },
];
const fragmentedTranscript = buildTaskTranscript(fragmentedMessageTask, 20, 0);
const assistantMessages = fragmentedTranscript.filter((line) => line.title === "Assistant message");
const reasoningMessages = fragmentedTranscript.filter((line) => line.title === "Reasoning");
assertReferenceTest(assistantMessages.length === 1, "assistant message deltas should coalesce into one trace line");
assertReferenceTest(String(assistantMessages[0]?.bodyPreview || "").includes("Hello world."), "assistant message trace line should contain the joined assistant text");
assertReferenceTest([20, 21].every((seq) => assistantMessages[0]?.rawSeqs.includes(seq)), "assistant message trace line should preserve raw seqs");
assertReferenceTest(reasoningMessages.length === 1, "reasoning deltas should coalesce into one trace line");
assertReferenceTest(String(reasoningMessages[0]?.bodyPreview || "").includes("Thinking done."), "reasoning trace line should contain the joined reasoning text");
const remoteTask = testTask("codex_5001_remote_opencode", "remote command prompt", "", [], "2026-05-12T00:01:00.000Z");
remoteTask.providerId = "D601";
remoteTask.cwd = "/home/ubuntu";
@@ -388,6 +404,7 @@ function runTracePortSelfTest(): JsonValue {
{ name: "reasoning_duplicate_filtered", ok: true },
{ name: "interleaved_command_output_single_trace_line", ok: true, rawSeqs: longCommand?.rawSeqs ?? [] },
{ name: "interleaved_command_summary_has_command", ok: true, summaryLines: longCommand ? transcriptLineSummaryLines(longCommand) : [] },
{ name: "message_fragments_coalesced", ok: true, assistantRawSeqs: assistantMessages[0]?.rawSeqs ?? [], reasoningRawSeqs: reasoningMessages[0]?.rawSeqs ?? [] },
{ name: "duration_preserved", ok: true, durationMs: explored?.durationMs ?? null },
{ name: "remote_opencode_exec_includes_binary", ok: true },
{ name: "opencode_exit0_final_without_step_finish_is_terminal", ok: true },
@@ -688,6 +688,71 @@ function transcriptLine(kind: TranscriptKind, at: string, seq: number, title: st
};
}
function pushUniqueRawSeq(rawSeqs: number[], seq: number): void {
if (!rawSeqs.includes(seq)) rawSeqs.push(seq);
}
function messageFragmentMergeKey(line: TranscriptLine): string | null {
if (line.kind !== "message") return null;
const title = String(line.title || "").trim().toLowerCase();
if (title !== "assistant message" && title !== "reasoning") return null;
return `${title}:${String(line.status || "")}`;
}
function appendMessageText(left: string, right: string): string {
if (left.length === 0) return right;
if (right.length === 0) return left;
if (!/\s$/u.test(left) && !/^\s|^[,.;:!?)}\]]/u.test(right) && /[A-Za-z0-9]/u.test(left.at(-1) || "") && /[A-Za-z0-9]/u.test(right[0] || "")) return `${left} ${right}`;
return `${left}${right}`;
}
function mergeTranscriptMessageGroup(group: TranscriptLine[]): TranscriptLine {
const first = group[0];
const last = group[group.length - 1] || first;
const body = group.reduce((text, line) => appendMessageText(text, String(line.bodyPreview || "")), "");
const rawSeqs: number[] = [];
for (const line of group) {
for (const seq of Array.isArray(line.rawSeqs) ? line.rawSeqs : []) pushUniqueRawSeq(rawSeqs, seq);
}
return {
...first,
seq: Number.isFinite(Number(last.seq)) ? Number(last.seq) : Number(first.seq),
at: last.at || first.at,
bodyPreview: body.length > 0 ? body : undefined,
bodyOmittedLines: group.reduce((sum, line) => sum + Number(line.bodyOmittedLines || 0), 0) || undefined,
rawSeqs,
};
}
function coalesceTranscriptMessageFragments(entries: TranscriptLine[]): TranscriptLine[] {
const rows = sortTranscript([...entries]);
const merged: TranscriptLine[] = [];
let group: TranscriptLine[] = [];
let groupKey: string | null = null;
const flush = () => {
if (group.length === 0) return;
merged.push(group.length === 1 ? group[0] : mergeTranscriptMessageGroup(group));
group = [];
groupKey = null;
};
for (const line of rows) {
const key = messageFragmentMergeKey(line);
if (key !== null && key === groupKey) {
group.push(line);
continue;
}
flush();
if (key !== null) {
group = [line];
groupKey = key;
} else {
merged.push(line);
}
}
flush();
return merged;
}
function commandKind(command: string): TranscriptKind {
if (/\b(apply_patch|git apply|cat >|tee .*<<|sed -i|python3? .*write_text)\b/u.test(command)) return "edited";
@@ -1008,11 +1073,34 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0,
const fileChangeInputs = outputItems.some((item) => item.channel === "diff" && item.method === "item/fileChange/outputDelta" && typeof item.itemId === "string")
? codexSessionFileChangesByCallId(task)
: new Map<string, SessionFileChange>();
type ActiveMessage = { seq: number; at: string; title: string; status?: string; body: string; rawSeqs: number[] };
let activeMessage: ActiveMessage | null = null;
const flushMessage = (): void => {
if (activeMessage === null) return;
entries.push(transcriptLine("message", activeMessage.at, activeMessage.seq, activeMessage.title, activeMessage.rawSeqs, activeMessage.body, "", activeMessage.status, fullText));
activeMessage = null;
};
const appendMessage = (item: LiveOutput, title: string, body: string, status?: string): void => {
if (body.length === 0) return;
if (activeMessage !== null && activeMessage.title === title && activeMessage.status === status) {
activeMessage.body += body;
activeMessage.at = item.at;
activeMessage.seq = item.seq;
pushUniqueRawSeq(activeMessage.rawSeqs, item.seq);
return;
}
flushMessage();
activeMessage = { seq: item.seq, at: item.at, title, status, body, rawSeqs: [item.seq] };
};
for (const item of outputItems) {
if (initialPrompt !== null && item.channel === "user" && item.method === "enqueue") continue;
if (item.channel === "user" && item.method === "turn/steer" && promptHistorySeqs.has(item.seq)) continue;
if (isOpenCodeStepBoundaryMethod(item.method)) continue;
if (item.channel === "command" && item.method === "item/started") {
flushMessage();
const parsed = parseCommandLine(item.text);
const groupedCommand = itemIdCommand(item, parsed);
if (groupedCommand !== null) {
@@ -1037,6 +1125,7 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0,
continue;
}
if (item.channel === "command" && item.method === "item/commandExecution/outputDelta") {
flushMessage();
const groupedCommand = itemIdCommand(item);
if (groupedCommand !== null) {
groupedCommand.body += item.text;
@@ -1051,6 +1140,7 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0,
continue;
}
if (item.channel === "command" && item.method === "item/completed") {
flushMessage();
const parsed = parseCommandLine(item.text);
const groupedCommand = itemIdCommand(item, parsed);
if (groupedCommand !== null) {
@@ -1071,6 +1161,7 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0,
continue;
}
if (item.channel !== "assistant" && item.channel !== "reasoning") flushMessage();
flushCommand();
if (item.channel === "diff") {
const text = fileChangeTextWithInlinePatch(item, fileChangeInputs);
@@ -1079,9 +1170,9 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0,
entries.push(transcriptLine("error", item.at, item.seq, "Error", [item.seq], item.text, "", item.method, fullText));
} else if (item.channel === "assistant") {
const body = String(item.method || "").startsWith("opencode/") ? openCodeVisibleAssistantText(item.text) : item.text;
if (body.length > 0) entries.push(transcriptLine("message", item.at, item.seq, "Assistant message", [item.seq], body, "", item.method, fullText));
appendMessage(item, "Assistant message", body, item.method);
} else if (item.channel === "reasoning") {
entries.push(transcriptLine("message", item.at, item.seq, "Reasoning", [item.seq], item.text, "", item.method, fullText));
appendMessage(item, "Reasoning", item.text, item.method);
} else if (item.channel === "user") {
entries.push(transcriptLine("message", item.at, item.seq, item.method === "enqueue" ? "Submitted prompt" : "User prompt", [item.seq], item.text, "", item.method, fullText));
} else if (item.channel === "tool" && String(item.method || "").startsWith("opencode/")) {
@@ -1097,11 +1188,12 @@ function buildTaskTranscript(task: QueueTask, limit = 180, rawOutputWindow = 0,
entries.push(transcriptLine("system", item.at, item.seq, title, [item.seq], item.text, "", item.method, fullText));
}
}
flushMessage();
flushCommand();
for (const command of Array.from(activeCommandsByItemId.values()).sort((left, right) => left.seq - right.seq)) {
flushCommand(command);
}
return boundedTranscript(entries, limit);
return boundedTranscript(coalesceTranscriptMessageFragments(entries), limit);
}
function buildCompactTaskTranscript(task: QueueTask, limit = 12, rawOutputWindow = 24): TranscriptLine[] {
@@ -1745,7 +1837,7 @@ function oaTraceStepToTranscriptLine(step: OaTraceStepSummary): TranscriptLine {
async function oaTraceTranscriptForTask(taskId: string, attemptIndex: number | null): Promise<TranscriptLine[]> {
const steps = await readOaTraceStepsForTask(taskId, attemptIndex);
return steps.map(oaTraceStepToTranscriptLine).filter(traceLineVisibleInTraceView);
return coalesceTranscriptMessageFragments(steps.map(oaTraceStepToTranscriptLine).filter(traceLineVisibleInTraceView));
}
function mergeTraceWindowLines(left: TranscriptLine[], right: TranscriptLine[]): TranscriptLine[] {