fix code queue trace summary sync contract

This commit is contained in:
Codex
2026-05-20 04:58:58 +00:00
parent e9be28d6bb
commit 93e4630497
9 changed files with 879 additions and 63 deletions
@@ -0,0 +1,242 @@
import { configureTaskView, taskTraceSummaryFixtureResponse } from "../src/components/microservices/code-queue/src/task-view";
import { configureTaskOutput } from "../src/components/microservices/code-queue/src/task-output";
import { configureJudge } from "../src/components/microservices/code-queue/src/judge";
import type { OaTraceStepSummary } from "../src/components/microservices/code-queue/src/oa-events";
import type { JsonValue, PromptHistoryItem, QueueTask, QueuedStatusReason } from "../src/components/microservices/code-queue/src/types";
type JsonRecord = Record<string, unknown>;
/**
 * Fails the contract run when `condition` is falsy.
 *
 * @param condition - Value tested for truthiness.
 * @param message - Description of the expectation that was violated.
 * @param detail - Context serialized with `JSON.stringify` and appended to the error text.
 * @throws Error whose message is `<message>: <detail as JSON>`.
 */
function assertCondition(condition: unknown, message: string, detail: JsonRecord = {}): void {
  if (condition) return;
  // The detail is only serialized on the failure path.
  const serializedDetail = JSON.stringify(detail);
  throw new Error(`${message}: ${serializedDetail}`);
}
/**
 * Narrows an unknown value to a plain record.
 *
 * @param value - Anything.
 * @returns The same reference when `value` is a non-null, non-array object; otherwise `null`.
 */
function asRecord(value: unknown): JsonRecord | null {
  if (typeof value !== "object" || value === null) return null;
  if (Array.isArray(value)) return null;
  return value as JsonRecord;
}
/**
 * Fixture pager: ignores the URL and the limit and always returns every item
 * as a single "tail" page.
 *
 * @param items - Items to return as the page chunk (same array reference).
 * @param _url - Unused.
 * @param _limit - Unused.
 * @returns A page descriptor whose `nextAfterSeq` is the `seq` of the last item, or 0 when `items` is empty.
 */
function pageBySeq<T extends { seq: number }>(
  items: T[],
  _url: URL,
  _limit: number,
): { mode: "tail" | "after" | "before"; afterSeq: number; beforeSeq: number | null; nextAfterSeq: number; previousBeforeSeq: number | null; hasMore: boolean; hasBefore: boolean; chunk: T[] } {
  const newest = items.length > 0 ? items[items.length - 1] : undefined;
  const nextAfterSeq = newest?.seq ?? 0;
  return {
    mode: "tail",
    afterSeq: 0,
    beforeSeq: null,
    nextAfterSeq,
    previousBeforeSeq: null,
    hasMore: false,
    hasBefore: false,
    chunk: items,
  };
}
/**
 * Configures the task-output, judge, and task-view modules with deterministic
 * stubs for the trace-summary contract.
 *
 * Loggers and persistence hooks are no-ops, the clock is pinned to one ISO
 * timestamp, and paging is delegated to the local `pageBySeq` stub.
 */
function configureFixtureTaskView(): void {
  const noop = (): undefined => undefined;
  const fixedNowIso = (): string => "2026-05-19T00:10:00.000Z";
  const errorToJson = (error: unknown): JsonValue => (error instanceof Error ? { message: error.message } : String(error));

  configureTaskOutput({
    config: { maxInMemoryOutputRecords: 1000, outputArchiveDir: "/tmp/code-queue-trace-summary-contract/output" },
    allocateSeq: () => 1000,
    errorToJson,
    logger: noop,
    markTaskDirty: noop,
    nowIso: fixedNowIso,
    schedulePersistState: noop,
  });

  configureJudge({
    config: {
      minimaxApiKey: "",
      minimaxApiBase: "",
      minimaxModel: "minimax-m1",
      judgeTimeoutMs: 1000,
      judgeRepairAttempts: 0,
      judgeMaxTokens: 1000,
    },
    logger: noop,
    safePreview: (value: string, max = 300) => {
      if (value.length > max) return `${value.slice(0, max)}...`;
      return value;
    },
    userPromptForDisplay: (prompt: string) => prompt,
    taskFullOutput: (task: QueueTask) => task.output,
    taskReferenceIds: (task: QueueTask) => task.referenceTaskIds,
    extractRecord: (value: unknown) => asRecord(value),
    extractString: (value: unknown, key: string) => {
      const candidate = asRecord(value)?.[key];
      return typeof candidate === "string" ? candidate : null;
    },
    promptLineCount: (text: string) => (text.length > 0 ? text.split(/\r\n|\r|\n/u).length : 0),
    judgeFailRetryLimit: 3,
  });

  configureTaskView({
    config: { codexHome: "/tmp/code-queue-trace-summary-contract" },
    errorToJson,
    jsonResponse: (body: unknown, status = 200): Response => Response.json(body, { status }),
    logger: noop,
    mergePromptHistory: (items: PromptHistoryItem[]) => items,
    nowIso: fixedNowIso,
    outputPromptHistory: () => [],
    pageBySeq,
    parseLimit: () => 100,
    parseSeqParam: () => null,
    queueIdOf: (task: QueueTask) => task.queueId,
    queuedStatusReason: (): QueuedStatusReason | null => null,
    queuedTaskPromptEditable: () => false,
    taskQueueEnteredAt: (task: QueueTask) => task.queueEnteredAt,
  });
}
/**
 * Builds the queue task used by the trace-summary contract.
 *
 * The task is `running` on attempt 2 in `retry` mode, while its stored
 * `attempts` list only contains the completed attempt 1 (judged `retry`).
 * A fresh object is created on every call.
 */
function fixtureTask(): QueueTask {
  const enqueuedAt = "2026-05-19T00:00:00.000Z";
  const attempt1StartedAt = "2026-05-19T00:00:10.000Z";
  const attempt1JudgedAt = "2026-05-19T00:02:00.000Z";
  const promptText = "Trace summary contract fixture";
  const attempt1Response = "Attempt 1 response";
  return {
    id: "codex_trace_contract",
    queueId: "default",
    queueEnteredAt: enqueuedAt,
    prompt: promptText,
    basePrompt: promptText,
    referenceTaskIds: [],
    referenceInjection: null,
    providerId: "D601",
    cwd: "/workspace",
    model: "gpt-5.5",
    reasoningEffort: null,
    executionMode: "default",
    maxAttempts: 99,
    status: "running",
    createdAt: enqueuedAt,
    updatedAt: "2026-05-19T00:06:30.000Z",
    startedAt: enqueuedAt,
    finishedAt: null,
    readAt: null,
    currentAttempt: 2,
    currentMode: "retry",
    codexThreadId: "thread_trace_contract",
    activeTurnId: "turn_trace_contract",
    finalResponse: "",
    lastError: null,
    lastJudge: { decision: "retry", confidence: 1, reason: "attempt 1 asked for retry", source: "fallback" },
    judgeFailCount: 0,
    promptHistory: [],
    // Retained output only covers attempt 1 (seq 2-4) plus the enqueue line.
    output: [
      { seq: 1, at: enqueuedAt, channel: "user", text: promptText, method: "enqueue" },
      { seq: 2, at: attempt1StartedAt, channel: "system", text: "attempt 1 / 99", method: "queue" },
      { seq: 3, at: "2026-05-19T00:01:00.000Z", channel: "command", text: "rg trace-summary src/components/microservices/code-queue/src", method: "item/started", itemId: "call-1" },
      { seq: 4, at: attempt1JudgedAt, channel: "system", text: "judge=retry confidence=1 source=fallback: attempt 1 asked for retry", method: "judge" },
    ],
    events: [],
    attempts: [
      {
        index: 1,
        mode: "initial",
        startedAt: attempt1StartedAt,
        finishedAt: attempt1JudgedAt,
        terminalStatus: "completed",
        transportClosedBeforeTerminal: false,
        appServerExitCode: 0,
        appServerSignal: null,
        error: null,
        finalResponse: attempt1Response,
        finalResponsePreview: attempt1Response,
        finalResponseChars: 18,
        stderrTail: "",
        judge: { decision: "retry", confidence: 1, reason: "attempt 1 asked for retry", source: "fallback" },
        judgeAt: attempt1JudgedAt,
        judgeSeq: 4,
        outputStartSeq: 2,
        outputEndSeq: 4,
      },
    ],
    cancelRequested: false,
    nextPrompt: null,
    nextMode: null,
  };
}
/**
 * Returns the two OA trace steps (seq 20 and 21) recorded for attempt 2 of the
 * fixture task. Each call builds new objects and arrays.
 */
function attempt2Steps(): OaTraceStepSummary[] {
  // Fields that are identical for every attempt-2 step.
  const attempt2Scope = {
    scopeId: "task:codex_trace_contract:attempt:2",
    attemptIndex: 2,
    source: "oa-event-flow",
  } as const;
  return [
    {
      eventSequence: 20,
      seq: 20,
      at: "2026-05-19T00:06:00.000Z",
      kind: "ran",
      title: "Run",
      status: "item/started",
      summaryLines: ["attempt 2 / 99", "pnpm test"],
      rawSeqs: [20],
      ...attempt2Scope,
    },
    {
      eventSequence: 21,
      seq: 21,
      at: "2026-05-19T00:06:20.000Z",
      kind: "explored",
      title: "Read",
      status: "item/completed",
      summaryLines: ["src/components/microservices/code-queue/src/task-view.ts"],
      rawSeqs: [21],
      ...attempt2Scope,
    },
  ];
}
/**
 * Runs the trace-summary contract against the fixture task.
 *
 * The summary is built with no OA stats (`stats` and `taskStats` are null) but
 * with OA step lists present, then checked for the degraded raw-trace-fallback
 * markers at task level and on attempt 2.
 *
 * @returns A report with `ok: true`, the named checks, and the observed counts.
 * @throws Error from `assertCondition` on the first expectation that fails.
 */
export function runCodeQueueTraceSummaryContract(): JsonRecord {
  configureFixtureTaskView();
  const task = fixtureTask();
  const retrySteps = attempt2Steps();
  const summary = taskTraceSummaryFixtureResponse(task, {
    stats: null,
    taskStats: null,
    allSteps: [
      {
        eventSequence: 1,
        seq: 1,
        at: "2026-05-19T00:00:10.000Z",
        kind: "message",
        title: "Assistant message",
        status: "item/completed",
        summaryLines: ["Attempt 1 judge complete"],
        rawSeqs: [4],
        scopeId: "task:codex_trace_contract",
        attemptIndex: null,
        source: "oa-event-flow",
      },
      ...retrySteps,
    ],
    attemptSteps: new Map([[2, retrySteps]]),
  }) as JsonRecord;

  // Keep only attempt entries that are plain records.
  const attempts: JsonRecord[] = [];
  if (Array.isArray(summary.attempts)) {
    for (const entry of summary.attempts) {
      const record = asRecord(entry);
      if (record !== null) attempts.push(record);
    }
  }
  const attempt2 = attempts.find((attempt) => Number(attempt.index) === 2) ?? null;

  const taskStats = asRecord(summary.traceStats);
  const taskExecution = asRecord(summary.execution);
  const attempt2Stats = asRecord(attempt2?.traceStats);
  const attempt2Execution = asRecord(attempt2?.execution);

  // Task-level expectations.
  assertCondition(summary.currentAttempt === 2, "summary must retain currentAttempt=2", summary);
  assertCondition(summary.statsSource === "raw-trace-fallback", "summary must distinguish raw trace fallback from empty STEP", summary);
  assertCondition(summary.traceStatsState === "degraded", "summary must mark OA stats sync degraded", summary);
  assertCondition(summary.traceStatsReason === "oa-event-flow-stats-unavailable-raw-trace-present", "summary must explain degraded OA sync", summary);
  assertCondition(
    taskStats?.source === "oa-event-flow" && taskStats?.sourceHint === "raw-trace-fallback",
    "summary must expose countable synthetic stats with source hint",
    taskStats ?? {},
  );
  assertCondition(
    taskExecution?.statsSource === "oa-event-flow" && taskExecution?.traceStatsState === "degraded",
    "execution summary must stay countable while degraded",
    taskExecution ?? {},
  );
  assertCondition(Number(summary.stepCount ?? 0) > 0, "summary fallback STEP count must be visible", summary);

  // Attempt-2 expectations.
  assertCondition(attempt2 !== null, "summary must materialize the latest running retry attempt", { attempts });
  assertCondition(Number(attempt2?.stepCount ?? 0) > 0, "attempt 2 must expose live fallback STEP count", attempt2 ?? {});
  assertCondition(
    attempt2Stats?.source === "oa-event-flow" && attempt2Stats?.sourceHint === "raw-trace-fallback",
    "attempt 2 fallback stats must remain countable",
    attempt2Stats ?? {},
  );
  assertCondition(
    attempt2Execution?.statsSource === "oa-event-flow" && attempt2Execution?.traceStatsState === "degraded",
    "attempt 2 execution must be countable while degraded",
    attempt2Execution ?? {},
  );

  return {
    ok: true,
    checks: [
      { name: "code-queue:trace-summary-latest-attempt-visible", ok: true },
      { name: "code-queue:trace-summary-raw-trace-step-fallback", ok: true },
    ],
    taskId: task.id,
    stepCount: summary.stepCount,
    statsSource: summary.statsSource,
    traceStatsState: summary.traceStatsState,
    attempt2StepCount: attempt2?.stepCount,
  };
}
// When this file is the entry script, run the contract and print the report as pretty JSON.
if (import.meta.main) {
  const report = runCodeQueueTraceSummaryContract();
  const rendered = JSON.stringify(report, null, 2);
  process.stdout.write(`${rendered}\n`);
}
+3
View File
@@ -267,6 +267,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default
fileItem("scripts/code-queue-issue3-regression-test.ts"),
fileItem("scripts/code-queue-liveness-diagnostics-test.ts"),
fileItem("scripts/src/code-queue-liveness-fixtures.ts"),
fileItem("scripts/code-queue-trace-summary-contract-test.ts"),
fileItem("scripts/src/ci.ts"),
fileItem("scripts/src/e2e.ts"),
fileItem("scripts/code-queue-prompt-observation-test.ts"),
@@ -280,6 +281,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default
items.push(commandItem("typescript:scripts", ["bunx", "tsc", "-p", "scripts/tsconfig.json", "--noEmit", "--pretty", "false"], 120_000));
items.push(commandItem("code-queue:prompt-observation-contract", ["bun", "scripts/code-queue-prompt-observation-test.ts"], 30_000));
items.push(commandItem("code-queue:issue3-diagnostics-and-image-preflight", ["bun", "scripts/code-queue-issue3-regression-test.ts"], 30_000));
items.push(commandItem("code-queue:trace-summary-contract", ["bun", "scripts/code-queue-trace-summary-contract-test.ts"], 30_000));
items.push(commandItem("code-queue:active-run-heartbeat-visible", ["bun", "scripts/code-queue-liveness-diagnostics-test.ts", "--only", "code-queue:active-run-heartbeat-visible"], 30_000));
items.push(commandItem("code-queue:trace-gap-not-stale", ["bun", "scripts/code-queue-liveness-diagnostics-test.ts", "--only", "code-queue:trace-gap-not-stale"], 30_000));
items.push(commandItem("code-queue:stale-active-owner-expired", ["bun", "scripts/code-queue-liveness-diagnostics-test.ts", "--only", "code-queue:stale-active-owner-expired"], 30_000));
@@ -289,6 +291,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default
items.push(skippedItem("typescript:scripts", "scripts TypeScript typecheck is opt-in", "--scripts-typecheck or --full"));
items.push(skippedItem("code-queue:prompt-observation-contract", "prompt observation contract is opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("code-queue:issue3-diagnostics-and-image-preflight", "Code Queue issue #3 regression fixtures are opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("code-queue:trace-summary-contract", "Code Queue trace summary contract is opt-in with script checks", "--scripts-typecheck or --full"));
items.push(skippedItem("code-queue:liveness-diagnostics-fixtures", "Code Queue liveness diagnostics fixtures are opt-in with script checks", "--scripts-typecheck or --full"));
}
if (options.logs) {
+2 -1
View File
@@ -630,8 +630,9 @@ function renderTraceConsoleRows(summary: Record<string, unknown>, steps: Record<
const execution = asRecord(summary.execution) ?? {};
const stats = asRecord(execution.traceStats) ?? asRecord(summary.traceStats);
const statsSource = String(execution.statsSource || summary.statsSource || "");
const statsUsable = stats !== null && (statsSource === "oa-event-flow" || statsSource === "raw-trace-fallback" || stats.source === "oa-event-flow");
const stat = (key: string): string | number => {
if (!stats || statsSource !== "oa-event-flow") return "--";
if (!statsUsable) return "--";
const value = Number(stats[key]);
return Number.isFinite(value) && value >= 0 ? Math.floor(value) : "--";
};
@@ -249,6 +249,25 @@ interface OaTraceStepRow {
updated_at: Date | string;
}
/**
 * Row shape for the columns that `loadOaTraceStats` selects from `oa_trace_stats`.
 *
 * Counter columns are typed `number | string`; `traceStatsRowToRecord` passes
 * each of them through `numberField(value, 0)` before exposing them.
 * NOTE(review): presumably the SQL driver can return numeric columns as strings — confirm.
 */
interface OaTraceStatsRow {
// Scope key; `traceStatsRowToRecord` parses `task:<id>` and `task:<id>:attempt:<n>` out of it.
scope_id: string;
service_id: string;
subject_kind: string;
subject_id: string;
stats_revision: number | string;
step_count: number | string;
llm_step_count: number | string;
read_count: number | string;
edit_count: number | string;
run_count: number | string;
error_count: number | string;
trace_line_count: number | string;
output_max_seq: number | string;
// Opaque JSON payload; converted with `toJsonValue` (null when absent).
attempt_stats_json: unknown;
// The only counter column that may be null; null is preserved in the converted record.
last_event_sequence: number | string | null;
updated_at: Date | string;
}
interface TraceStepLine {
seq: number;
at: string;
@@ -446,6 +465,11 @@ function timestampMs(value: string | Date | null | undefined): number | null {
return Number.isFinite(ms) ? ms : null;
}
/**
 * Parses a non-negative count from an untyped value.
 *
 * Despite the name, zero is accepted. Only numbers, bigints, and non-blank
 * numeric strings are considered. `null`, `undefined`, blank strings,
 * booleans, and objects are treated as "missing" and return `null`.
 * `Number()` would otherwise coerce most of them to 0 and prevent callers'
 * `??` fallbacks from running.
 *
 * @param value - Candidate count.
 * @returns The value floored to an integer, or `null` when it is missing, non-finite, or negative.
 */
function positiveNumber(value: unknown): number | null {
  if (typeof value === "string") {
    if (value.trim().length === 0) return null;
  } else if (typeof value !== "number" && typeof value !== "bigint") {
    return null;
  }
  const parsed = Number(value);
  return Number.isFinite(parsed) && parsed >= 0 ? Math.floor(parsed) : null;
}
function prefixPreview(value: unknown, maxChars: number): string {
const text = String(value ?? "");
return text.length <= maxChars ? text : `${text.slice(0, Math.max(0, maxChars - 1))}`;
@@ -814,6 +838,41 @@ function activeTask(task: QueueTask): boolean {
return task.status === "running" || task.status === "judging";
}
/**
 * Returns the task's output records, dropping only those whose channel is
 * "system" and whose method is "enqueue".
 */
function visibleTraceOutputs(task: QueueTask): LiveOutput[] {
  const isSystemEnqueue = (item: LiveOutput): boolean => item.channel === "system" && item.method === "enqueue";
  return task.output.filter((item) => !isSystemEnqueue(item));
}
/**
 * Derives trace counters directly from the task's retained output.
 *
 * `stepCount` and `llmStepCount` prefer the task's own fields (each falling
 * back to the other) and default to the number of visible output records.
 * The read/edit/run/error counters are the number of visible records on the
 * "tool", "diff", "command", and "error" channels.
 */
function rawTraceStats(task: QueueTask): JsonRecord {
  const visible = visibleTraceOutputs(task);
  const onChannel = (channel: string): number =>
    visible.reduce((total, item) => (item.channel === channel ? total + 1 : total), 0);
  const visibleCount = visible.length;
  const stepCount = positiveNumber(task.stepCount ?? task.llmStepCount) ?? visibleCount;
  const llmStepCount = positiveNumber(task.llmStepCount ?? task.stepCount) ?? stepCount;
  return {
    stepCount,
    llmStepCount,
    traceLineCount: visibleCount,
    outputMaxSeq: outputMaxSeq(task),
    readCount: onChannel("tool"),
    editCount: onChannel("diff"),
    runCount: onChannel("command"),
    errorCount: onChannel("error"),
  };
}
/**
 * Builds the stats fields for a task response when no OA stats are attached.
 *
 * The counters come from `rawTraceStats`. When any of stepCount,
 * traceLineCount, or outputMaxSeq is positive the patch is marked
 * "degraded" / "raw-trace-fallback"; otherwise it is "empty" / "raw-trace-empty".
 * `traceStats` is always null in this patch.
 */
function traceStatsFallbackPatch(task: QueueTask): JsonRecord {
  const fallback = rawTraceStats(task);
  const hasRawTrace = [fallback.stepCount, fallback.traceLineCount, fallback.outputMaxSeq]
    .some((value) => Number(value ?? 0) > 0);
  const [statsSource, traceStatsState, traceStatsReason] = hasRawTrace
    ? ["raw-trace-fallback", "degraded", "oa-event-flow-stats-unavailable-retained-trace-present"]
    : ["raw-trace-empty", "empty", "no-trace-steps-yet"];
  return {
    ...fallback,
    traceStats: null,
    statsSource,
    traceStatsState,
    traceStatsReason,
    statsUnavailable: true,
    statsSyncing: hasRawTrace,
    rawTraceStepCount: fallback.stepCount,
  };
}
function schedulerHeartbeat(task: QueueTask): JsonRecord | null {
const heartbeat = asRecord(task.schedulerHeartbeat);
return heartbeat !== null && heartbeat.taskId === task.id ? heartbeat as JsonRecord : null;
@@ -1917,10 +1976,7 @@ function taskListResponse(task: QueueTask, lite = true): JsonRecord {
...promptFields,
promptEditable: queuedTaskPromptEditable(task),
finalResponseChars: task.finalResponse.length,
stepCount: numberField(task.stepCount ?? task.llmStepCount, 0),
llmStepCount: numberField(task.llmStepCount ?? task.stepCount, 0),
traceStats: null,
statsSource: "code-queue-mgr",
...traceStatsFallbackPatch(task),
summaryOnly: true,
referenceTaskIds: task.referenceTaskIds,
referenceInjection: task.referenceInjection,
@@ -2299,6 +2355,66 @@ function traceScopeId(taskId: string, url: URL): string {
return attempt > 0 ? `task:${taskId}:attempt:${attempt}` : `task:${taskId}`;
}
/** Returns the task-level trace scope id, `task:<taskId>`. */
function taskScopeId(taskId: string): string {
  const prefix = "task:";
  return prefix.concat(taskId);
}
/**
 * Returns the attempt-level trace scope id, `task:<taskId>:attempt:<n>`,
 * where `n` is `attemptIndex` rounded down.
 */
function taskAttemptScopeId(taskId: string, attemptIndex: number): string {
  const taskScope = taskScopeId(taskId);
  const wholeAttempt = Math.floor(attemptIndex);
  return `${taskScope}:attempt:${wholeAttempt}`;
}
/**
 * Converts an `oa_trace_stats` row into the camelCase record used in responses.
 *
 * `taskId` and `attemptIndex` are parsed from `scope_id`: `task:<id>:attempt:<n>`
 * yields both, `task:<id>` yields only the task id (attemptIndex null), and any
 * other shape yields an empty task id. Counters are normalized with
 * `numberField(value, 0)`, and `source` is always "oa-event-flow".
 */
function traceStatsRowToRecord(row: OaTraceStatsRow): JsonRecord {
  const scopeId = row.scope_id;
  const attemptMatch = scopeId.match(/^task:([^:]+):attempt:(\d+)$/u);
  const taskMatch = scopeId.match(/^task:([^:]+)$/u);

  let attemptIndex: number | null = null;
  if (attemptMatch !== null) attemptIndex = Number(attemptMatch[2]);
  const taskId = attemptMatch?.[1] ?? taskMatch?.[1] ?? "";

  const count = (column: number | string) => numberField(column, 0);
  const lastEventSequence = row.last_event_sequence === null ? null : count(row.last_event_sequence);

  return {
    scopeId,
    serviceId: row.service_id,
    subjectKind: row.subject_kind,
    subjectId: row.subject_id,
    statsRevision: count(row.stats_revision),
    stepCount: count(row.step_count),
    llmStepCount: count(row.llm_step_count),
    readCount: count(row.read_count),
    editCount: count(row.edit_count),
    runCount: count(row.run_count),
    errorCount: count(row.error_count),
    traceLineCount: count(row.trace_line_count),
    outputMaxSeq: count(row.output_max_seq),
    attemptStats: toJsonValue(row.attempt_stats_json ?? null),
    lastEventSequence,
    updatedAt: timestampToIso(row.updated_at),
    taskId,
    attemptIndex,
    source: "oa-event-flow",
  };
}
/**
 * Loads OA trace stats for the given scope ids.
 *
 * Scope ids are trimmed, blank ones are dropped, and duplicates are removed
 * before querying. No query is issued when nothing remains.
 *
 * Rows come back newest-first (`ORDER BY updated_at DESC`). If more than one
 * row shares a scope id, the first (newest) one is kept so an older row can
 * never overwrite a newer one.
 *
 * @param scopeIds - Scope ids such as `task:<id>` or `task:<id>:attempt:<n>`.
 * @returns Map from scope id to the converted stats record. On failure a
 *   warning is logged and whatever was collected so far (usually nothing) is
 *   returned; this function does not throw.
 */
async function loadOaTraceStats(scopeIds: string[]): Promise<Map<string, JsonRecord>> {
  const uniqueScopeIds = Array.from(new Set(scopeIds.map((scopeId) => scopeId.trim()).filter(Boolean)));
  const result = new Map<string, JsonRecord>();
  if (uniqueScopeIds.length === 0) return result;
  try {
    const rows = await traceSql<OaTraceStatsRow[]>`
      SELECT scope_id, service_id, subject_kind, subject_id, stats_revision,
      step_count, llm_step_count, read_count, edit_count, run_count, error_count,
      trace_line_count, output_max_seq, attempt_stats_json, last_event_sequence, updated_at
      FROM oa_trace_stats
      WHERE scope_id IN ${traceSql(uniqueScopeIds)}
      ORDER BY updated_at DESC
      LIMIT ${Math.max(100, uniqueScopeIds.length)}
    `;
    for (const row of rows) {
      const record = traceStatsRowToRecord(row);
      const key = String(record.scopeId || row.scope_id);
      // Newest-first ordering: keep the first row seen for each scope.
      if (!result.has(key)) result.set(key, record);
    }
  } catch (error) {
    // Best-effort: callers treat a missing entry as "stats unavailable".
    log("warn", "oa_trace_stats_load_failed", { scopeIds: uniqueScopeIds, error: errorToJson(error) });
  }
  return result;
}
function oaTraceStepToLine(row: OaTraceStepRow): TraceStepLine | null {
const seq = numberField(row.step_seq, 0);
if (seq <= 0) return null;
@@ -2336,6 +2452,17 @@ async function loadOaTraceStepLines(taskId: string, url: URL): Promise<TraceStep
return coalesceCodexToolLifecycleTraceSteps(rows.map(oaTraceStepToLine).filter((line): line is TraceStepLine => line !== null));
}
/**
 * Loads the OA trace steps stored under one exact scope id.
 *
 * Rows are read from `oa_trace_steps` in ascending `step_seq` order, capped at
 * 5000 rows, converted with `oaTraceStepToLine` (rows that convert to null are
 * dropped), and then passed through `coalesceCodexToolLifecycleTraceSteps`.
 *
 * Query errors are not caught here; they reject the returned promise.
 *
 * @param scopeId - Scope to match exactly against `scope_id`.
 * @returns The converted and coalesced trace step lines.
 */
async function loadOaTraceStepLinesForScope(scopeId: string): Promise<TraceStepLine[]> {
const rows = await traceSql<OaTraceStepRow[]>`
SELECT scope_id, step_seq, kind, title, status, summary_lines, raw_seqs, created_at, updated_at
FROM oa_trace_steps
WHERE scope_id = ${scopeId}
ORDER BY step_seq ASC
LIMIT 5000
`;
// Drop rows that could not be converted before coalescing.
return coalesceCodexToolLifecycleTraceSteps(rows.map(oaTraceStepToLine).filter((line): line is TraceStepLine => line !== null));
}
function fallbackTraceStepLines(task: QueueTask): TraceStepLine[] {
return task.output
.filter((item) => item.channel !== "system" || item.method !== "enqueue")
@@ -2397,21 +2524,147 @@ async function traceStepDetail(task: QueueTask, url: URL): Promise<JsonRecord |
return { ok: true, taskId: task.id, source: "code-queue-mgr-postgres", step: item as unknown as JsonValue, rawOutput: item as unknown as JsonValue, line: payload };
}
function traceSummary(task: QueueTask): JsonRecord {
const steps = task.output.filter((item) => item.channel !== "system" || item.method !== "enqueue");
/**
 * Lists the attempt indexes a trace summary should cover, in ascending order.
 *
 * Starts from the positive integer indexes of the stored attempts, then fills
 * in every index from 1 up to the larger of `task.currentAttempt` and the
 * highest stored index, so an attempt that is running but not yet stored is
 * still included.
 */
function attemptIndexesForTrace(task: QueueTask): number[] {
  const storedIndexes = task.attempts
    .map((attempt) => Number(attempt.index))
    .filter((index) => Number.isInteger(index) && index > 0);
  const collected = new Set<number>(storedIndexes);
  const currentAttempt = Number(task.currentAttempt || 0);
  const highest = Math.max(currentAttempt, ...collected, 0);
  // A NaN `highest` (non-numeric currentAttempt) makes this loop a no-op.
  let index = 1;
  while (index <= highest) {
    collected.add(index);
    index += 1;
  }
  return [...collected].sort((a, b) => a - b);
}
/**
 * Computes counters from a list of trace step lines.
 *
 * The three step/line counts all equal `steps.length`, `outputMaxSeq` is the
 * `seq` of the last step (0 when the list is empty), and the read/edit/run/error
 * counters are the number of steps whose kind is "explored", "edited", "ran",
 * and "error".
 */
function traceStepExecutionStats(steps: TraceStepLine[]): JsonRecord {
  const ofKind = (kind: string): number =>
    steps.reduce((total, step) => (step.kind === kind ? total + 1 : total), 0);
  const total = steps.length;
  const lastStep = total > 0 ? steps[total - 1] : undefined;
  return {
    stepCount: total,
    llmStepCount: total,
    traceLineCount: total,
    outputMaxSeq: lastStep?.seq ?? 0,
    readCount: ofKind("explored"),
    editCount: ofKind("edited"),
    runCount: ofKind("ran"),
    errorCount: ofKind("error"),
  };
}
/**
 * Builds a synthetic stats record from fallback counters.
 *
 * The record keeps `source: "oa-event-flow"` and adds
 * `sourceHint: "raw-trace-fallback"` to mark it as synthetic. Missing or
 * invalid counters default to 0, except `llmStepCount` and `traceLineCount`,
 * which default to `stepCount`.
 */
function fallbackTraceStatsRecord(scopeId: string, fallback: JsonRecord): JsonRecord {
  const orZero = (value: unknown): number => positiveNumber(value) ?? 0;
  const stepCount = orZero(fallback.stepCount);
  const orStepCount = (value: unknown): number => positiveNumber(value) ?? stepCount;
  return {
    scopeId,
    source: "oa-event-flow",
    stepCount,
    llmStepCount: orStepCount(fallback.llmStepCount),
    traceLineCount: orStepCount(fallback.traceLineCount),
    outputMaxSeq: orZero(fallback.outputMaxSeq),
    readCount: orZero(fallback.readCount),
    editCount: orZero(fallback.editCount),
    runCount: orZero(fallback.runCount),
    errorCount: orZero(fallback.errorCount),
    sourceHint: "raw-trace-fallback",
  };
}
/**
 * Merges OA stats over fallback counters, or marks the result as fallback-only.
 *
 * - With `stats`: stats fields override the fallback and the state is "ready".
 * - Without `stats` but with raw trace present (any of stepCount,
 *   traceLineCount, outputMaxSeq positive): state is "degraded" and
 *   `traceStats` is a synthetic record from `fallbackTraceStatsRecord`.
 * - Otherwise: state is "empty" and `traceStats` is null.
 */
function applyStatsOrFallback(fallback: JsonRecord, stats: JsonRecord | null, scopeId: string): JsonRecord {
  if (stats === null) {
    const isPositive = (value: unknown): boolean => Number(value ?? 0) > 0;
    const hasRawTrace = isPositive(fallback.stepCount) || isPositive(fallback.traceLineCount) || isPositive(fallback.outputMaxSeq);
    if (hasRawTrace) {
      return {
        ...fallback,
        traceStats: fallbackTraceStatsRecord(scopeId, fallback),
        statsSource: "raw-trace-fallback",
        traceStatsState: "degraded",
        traceStatsReason: "oa-event-flow-stats-unavailable-raw-trace-present",
        statsUnavailable: true,
        statsSyncing: true,
        rawTraceStepCount: fallback.stepCount ?? 0,
      };
    }
    return {
      ...fallback,
      traceStats: null,
      statsSource: "raw-trace-empty",
      traceStatsState: "empty",
      traceStatsReason: "no-trace-steps-yet",
      statsUnavailable: true,
      statsSyncing: false,
      rawTraceStepCount: fallback.stepCount ?? 0,
    };
  }
  return {
    ...fallback,
    ...stats,
    traceStats: stats,
    statsSource: "oa-event-flow",
    traceStatsState: "ready",
    traceStatsReason: null,
    statsUnavailable: false,
    statsSyncing: false,
  };
}
async function traceSummary(task: QueueTask): Promise<JsonRecord> {
const fallbackSteps = fallbackTraceStepLines(task);
const attempts = attemptIndexesForTrace(task);
const scopeIds = [taskScopeId(task.id), ...attempts.map((index) => taskAttemptScopeId(task.id, index))];
const [stats, allOaSteps, attemptStepEntries] = await Promise.all([
loadOaTraceStats(scopeIds),
loadOaTraceStepLinesForScope(taskScopeId(task.id)).catch((error) => {
log("warn", "oa_trace_summary_steps_load_failed", { taskId: task.id, error: errorToJson(error) });
return [] as TraceStepLine[];
}),
Promise.all(attempts.map(async (index) => {
const steps = await loadOaTraceStepLinesForScope(taskAttemptScopeId(task.id, index)).catch((error) => {
log("warn", "oa_trace_summary_attempt_steps_load_failed", { taskId: task.id, attemptIndex: index, error: errorToJson(error) });
return [] as TraceStepLine[];
});
return [index, steps] as const;
})),
]);
const visibleSteps = allOaSteps.length > 0 ? allOaSteps : fallbackSteps;
const taskStats = applyStatsOrFallback(traceStepExecutionStats(visibleSteps), stats.get(taskScopeId(task.id)) ?? null, taskScopeId(task.id));
const attemptSteps = new Map(attemptStepEntries);
const attemptRows = attempts.length > 0 ? attempts : task.attempts.map((attempt) => attempt.index);
return {
id: task.id,
taskId: task.id,
queueId: queueIdOf(task),
status: task.status,
providerId: task.providerId,
executionMode: task.executionMode,
executionModeInfo: executionModeInfo(task.executionMode),
model: task.model,
stepCount: numberField(task.stepCount ?? task.llmStepCount, steps.length),
retainedStepCount: steps.length,
agentPort: codeAgentPortForModel(task.model),
agentPortInfo: codeAgentPortInfo(codeAgentPortForModel(task.model)),
cwd: task.cwd,
reasoningEffort: task.reasoningEffort,
createdAt: task.createdAt,
startedAt: task.startedAt,
finishedAt: task.finishedAt,
updatedAt: task.updatedAt,
currentAttempt: task.currentAttempt,
currentMode: task.currentMode,
maxAttempts: task.maxAttempts,
...taskStats,
retainedStepCount: fallbackSteps.length,
outputMaxSeq: outputMaxSeq(task),
schedulerHeartbeat: task.schedulerHeartbeat ?? null,
statsSource: "code-queue-mgr-postgres",
attempts: task.attempts.map((attempt) => ({
execution: taskStats,
attempts: attemptRows.map((index) => {
const attempt = task.attempts.find((item) => Number(item.index) === index) ?? task.attempts[index - 1] ?? null;
const steps = attemptSteps.get(index) ?? fallbackSteps.filter((step) => step.seq >= Number(attempt?.outputStartSeq ?? -Infinity) && step.seq <= Number(attempt?.outputEndSeq ?? Infinity));
const attemptFallback = traceStepExecutionStats(steps);
const attemptStats = applyStatsOrFallback(attemptFallback, stats.get(taskAttemptScopeId(task.id, index)) ?? null, taskAttemptScopeId(task.id, index));
return {
...(attempt ?? {}),
index,
mode: attempt?.mode ?? (index <= 1 ? "initial" : "retry"),
startedAt: attempt?.startedAt ?? steps[0]?.at ?? task.startedAt,
finishedAt: attempt?.finishedAt ?? null,
terminalStatus: attempt?.terminalStatus ?? null,
startSeq: steps[0]?.seq ?? attempt?.outputStartSeq ?? null,
endSeq: steps.at(-1)?.seq ?? attempt?.outputEndSeq ?? null,
finalResponsePreview: attempt?.finalResponsePreview ?? "",
judge: attempt?.judge ?? (index === task.currentAttempt ? task.lastJudge : null),
feedbackPromptPreview: attempt?.feedbackPromptPreview ?? null,
feedbackPromptChars: attempt?.feedbackPromptChars ?? null,
attemptScopeId: taskAttemptScopeId(task.id, index),
...attemptStats,
execution: attemptStats,
};
}),
storedAttempts: task.attempts.map((attempt) => ({
index: attempt.index,
mode: attempt.mode,
startedAt: attempt.startedAt,
@@ -2429,6 +2682,12 @@ function traceSummary(task: QueueTask): JsonRecord {
lines: task.prompt.split(/\r?\n/u).length,
},
lastAssistantMessage: lastAssistantMessage(task),
finalResponse: task.finalResponse,
finalResponseChars: task.finalResponse.length,
lastJudge: task.lastJudge,
lastError: task.lastError,
errorCount: taskStats.errorCount,
timing: taskTiming(task),
};
}
@@ -2976,7 +3235,7 @@ async function route(req: Request): Promise<Response> {
const traceSummaryMatch = url.pathname.match(/^\/api\/tasks\/([^/]+)\/trace-summary$/u);
if (traceSummaryMatch !== null && req.method === "GET") {
const task = await loadTask(decodeURIComponent(traceSummaryMatch[1] ?? ""));
return task === null ? jsonResponse({ ok: false, error: "task not found" }, 404) : jsonResponse({ ok: true, summary: traceSummary(task) });
return task === null ? jsonResponse({ ok: false, error: "task not found" }, 404) : jsonResponse({ ok: true, summary: await traceSummary(task) });
}
const traceStepsMatch = url.pathname.match(/^\/api\/tasks\/([^/]+)\/trace-steps$/u);
if (traceStepsMatch !== null && req.method === "GET") {
@@ -130,8 +130,9 @@ import {
readOaTraceStatsForTask,
readOaTraceStatsForTaskAttempts,
readOaTraceStatsForTasks,
readOaTraceStepsForTask,
} from "./oa-events";
import { configureSelfTests, runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest } from "./self-tests";
import { configureSelfTests, runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest, runTraceSummaryContractSelfTest } from "./self-tests";
import {
codexToolLifecycleStartedBeforeIn,
configureTaskView,
@@ -5303,6 +5304,7 @@ async function route(req: Request): Promise<Response> {
if (url.pathname === "/api/queue-claim-move/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(await runQueueClaimMoveSelfTest());
if (url.pathname === "/api/reference-injection/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(await runReferenceInjectionSelfTest());
if (url.pathname === "/api/trace-port/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(runTracePortSelfTest());
if (url.pathname === "/api/trace-summary-contract/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(runTraceSummaryContractSelfTest());
if (url.pathname === "/api/oa/backfill" && (req.method === "GET" || req.method === "POST")) return jsonResponse(await backfillOaTraceStats(url));
if (url.pathname === "/api/notifications/claudeqq" && req.method === "GET") {
await loadClaudeQqNotificationOutboxFromDatabase();
@@ -5410,8 +5412,23 @@ async function route(req: Request): Promise<Response> {
if (traceSummaryMatch !== null && req.method === "GET") {
const task = await findTaskForRead(decodeURIComponent(traceSummaryMatch[1] ?? ""));
if (task === null) return jsonResponse({ ok: false, error: "task not found" }, 404);
const traceStats = await readOaTraceStatsForTaskAttempts(task.id, traceAttemptIndexesForTask(task));
return jsonResponse({ ok: true, summary: taskTraceSummaryResponse(task, traceStats.get(`task:${task.id}`) ?? null, traceStats) });
const attemptIndexes = traceAttemptIndexesForTask(task);
const [traceStats, allOaSteps, attemptOaStepsEntries] = await Promise.all([
readOaTraceStatsForTaskAttempts(task.id, attemptIndexes),
readOaTraceStepsForTask(task.id, null).catch((error) => {
logger("warn", "oa_trace_summary_steps_read_failed", { taskId: task.id, error: errorToJson(error) });
return [];
}),
Promise.all(attemptIndexes.map(async (attemptIndex) => {
const steps = await readOaTraceStepsForTask(task.id, attemptIndex).catch((error) => {
logger("warn", "oa_trace_summary_attempt_steps_read_failed", { taskId: task.id, attemptIndex, error: errorToJson(error) });
return [];
});
return [attemptIndex, steps] as const;
})),
]);
const attemptOaSteps = new Map(attemptOaStepsEntries);
return jsonResponse({ ok: true, summary: taskTraceSummaryResponse(task, traceStats.get(`task:${task.id}`) ?? null, traceStats, { allSteps: allOaSteps, attemptSteps: attemptOaSteps }) });
}
const traceStepsMatch = url.pathname.match(/^\/api\/tasks\/([^/]+)\/trace-steps$/u);
if (traceStepsMatch !== null && req.method === "GET") {
@@ -33,6 +33,18 @@ export interface OaTraceStats extends JsonRecord {
source: "oa-event-flow";
}
/**
 * Counters that callers can supply for use when OA trace stats are missing.
 *
 * Every field is optional. `fallbackTraceStatsPatch` reads the counters through
 * `fallbackNumber` and uses them to build the fallback stats patch.
 */
export interface TraceStatsFallback {
stepCount?: number | null;
llmStepCount?: number | null;
traceLineCount?: number | null;
outputMaxSeq?: number | null;
readCount?: number | null;
editCount?: number | null;
runCount?: number | null;
errorCount?: number | null;
// Overrides the default degraded reason; only used when raw trace is present.
reason?: string;
}
export interface OaTraceStepSummary {
eventSequence: number;
seq: number;
@@ -654,16 +666,69 @@ function statNumber(stats: OaTraceStats | null | undefined, key: string): number
return Number.isFinite(value) && value >= 0 ? Math.floor(value) : null;
}
export function applyOaTraceStatsToTaskJson(value: JsonValue, stats: OaTraceStats | null | undefined): JsonValue {
/**
 * Reads one fallback counter as a non-negative integer.
 *
 * `TraceStatsFallback` fields may be `null` or absent; both mean "not
 * provided" and return `null`. Without this guard `Number(null)` is 0, so a
 * null `stepCount` would be read as 0 and the `stepCount ?? llmStepCount`
 * fallback would never run. Blank strings are rejected for the same reason.
 *
 * @param fallback - Fallback counters, or null/undefined when none were supplied.
 * @param key - Field to read.
 * @returns The field floored to an integer, or `null` when it is missing, non-finite, or negative.
 */
function fallbackNumber(fallback: TraceStatsFallback | null | undefined, key: keyof TraceStatsFallback): number | null {
  const raw: unknown = fallback?.[key];
  if (raw === null || raw === undefined) return null;
  if (typeof raw === "string" && raw.trim().length === 0) return null;
  const value = Number(raw);
  return Number.isFinite(value) && value >= 0 ? Math.floor(value) : null;
}
/**
 * Builds the stats fields applied to a task JSON when OA stats are missing.
 *
 * The patch always carries a synthetic `traceStats` record
 * (`source: "oa-event-flow"`, `sourceHint: "raw-trace-fallback"`). It is marked
 * "degraded" when any of stepCount, traceLineCount, or outputMaxSeq is
 * positive, and "empty" otherwise. Top-level counters are only written for
 * fields the fallback actually provided.
 *
 * @param fallback - Counters supplied by the caller, if any.
 * @param taskId - Used for the synthetic record's scope id; "task:unknown" when empty.
 */
function fallbackTraceStatsPatch(fallback: TraceStatsFallback | null | undefined, taskId = ""): JsonRecord {
  const read = (key: keyof TraceStatsFallback): number | null => fallbackNumber(fallback, key);

  const stepCount = read("stepCount") ?? read("llmStepCount");
  const llmStepCount = read("llmStepCount") ?? stepCount;
  const traceLineCount = read("traceLineCount");
  const outputMaxSeq = read("outputMaxSeq");
  const readCount = read("readCount");
  const editCount = read("editCount");
  const runCount = read("runCount");
  const errorCount = read("errorCount");

  const hasRawTrace = [stepCount, traceLineCount, outputMaxSeq].some((count) => (count ?? 0) > 0);
  let traceStatsReason = "no-trace-steps-yet";
  if (hasRawTrace) traceStatsReason = fallback?.reason || "oa-event-flow-stats-unavailable-raw-trace-present";

  const traceStats: JsonRecord = {
    scopeId: taskId.length > 0 ? `task:${taskId}` : "task:unknown",
    source: "oa-event-flow",
    stepCount: stepCount ?? 0,
    llmStepCount: llmStepCount ?? stepCount ?? 0,
    traceLineCount: traceLineCount ?? stepCount ?? 0,
    outputMaxSeq: outputMaxSeq ?? 0,
    readCount: readCount ?? 0,
    editCount: editCount ?? 0,
    runCount: runCount ?? 0,
    errorCount: errorCount ?? 0,
    sourceHint: "raw-trace-fallback",
  };

  const patch: JsonRecord = {
    traceStats,
    statsSource: hasRawTrace ? "raw-trace-fallback" : "raw-trace-empty",
    traceStatsState: hasRawTrace ? "degraded" : "empty",
    traceStatsReason,
    statsUnavailable: true,
    statsSyncing: hasRawTrace,
    rawTraceStepCount: stepCount ?? 0,
  };

  // Only counters the fallback provided overwrite the task's own fields.
  const providedCounts: Array<[string, number | null]> = [
    ["stepCount", stepCount],
    ["llmStepCount", llmStepCount],
    ["traceLineCount", traceLineCount],
    ["outputMaxSeq", outputMaxSeq],
    ["readCount", readCount],
    ["editCount", editCount],
    ["runCount", runCount],
    ["errorCount", errorCount],
  ];
  for (const [key, count] of providedCounts) {
    if (count !== null) patch[key] = count;
  }
  return patch;
}
export function applyOaTraceStatsToTaskJson(value: JsonValue, stats: OaTraceStats | null | undefined, fallback: TraceStatsFallback | null = null): JsonValue {
if (typeof value !== "object" || value === null || Array.isArray(value)) return value;
if (stats === null || stats === undefined) {
return { ...(value as JsonRecord), traceStats: null, statsSource: "unavailable", stepCount: null, llmStepCount: null } as unknown as JsonValue;
const taskId = typeof (value as JsonRecord).id === "string" ? String((value as JsonRecord).id) : "";
return { ...(value as JsonRecord), ...fallbackTraceStatsPatch(fallback, taskId) } as unknown as JsonValue;
}
const stepCount = statNumber(stats, "stepCount");
const outputMaxSeq = statNumber(stats, "outputMaxSeq");
const patch: JsonRecord = {
traceStats: stats,
statsSource: "oa-event-flow",
traceStatsState: "ready",
traceStatsReason: null,
statsUnavailable: false,
statsSyncing: false,
};
if (stepCount !== null) {
patch.stepCount = stepCount;
@@ -5,9 +5,9 @@ import { codeAgentPortForModel, codeAgentPortInfo, codeExecutionModeInfo, codeEx
import { claudeQqNotificationOutboxStats, notificationTargetConfigured, notificationTargetLabel } from "./notifications";
import { executionModeOptions, executionProviderOptions } from "./provider-runtime";
import { taskFullOutput } from "./task-output";
import { applyOaTraceStatsToTaskJson, taskScopeId, type OaTraceStats } from "./oa-events";
import { applyOaTraceStatsToTaskJson, taskScopeId, type OaTraceStats, type TraceStatsFallback } from "./oa-events";
import { buildExecutionDiagnostics, schedulerHeartbeatStaleMs } from "./execution-diagnostics";
import { buildCompactTaskTranscript, buildTaskTranscript, cachedPreviewTranscript, fullTranscript, prefixPreview, safePreview, statsDaysFromUrl, taskForCompactMetaResponse, taskForMetaResponse, taskStatisticsSummary, taskTiming, timestampMs } from "./task-view";
import { buildCompactTaskTranscript, buildTaskTranscript, cachedPreviewTranscript, fullTranscript, prefixPreview, safePreview, statsDaysFromUrl, taskForCompactMetaResponse, taskForMetaResponse, taskListStepCount, taskStatisticsSummary, taskTiming, timestampMs } from "./task-view";
import { userPromptForDisplay } from "./prompts";
import type { ActiveRun, ActiveRunSlotWaiter } from "./code-agent/common";
import type { JsonValue, QueueRecord, QueuedStatusReason, QueueTask, RuntimeConfig, TaskStatus, TranscriptLine } from "./types";
@@ -208,8 +208,31 @@ function statsForTask(stats: Map<string, OaTraceStats>, task: QueueTask): OaTrac
return stats.get(taskScopeId(task.id)) ?? null;
}
/**
 * Derives fallback trace counters for a task from its cached preview
 * transcript and retained output.
 *
 * Lines titled "Submitted prompt" are excluded before counting. The step
 * count comes from `taskListStepCount` and is reported as both `stepCount`
 * and `llmStepCount`.
 *
 * @param task - Task whose retained trace is summarised.
 * @returns Counters plus a reason string describing whether any trace exists.
 */
function traceStatsFallbackForTask(task: QueueTask): TraceStatsFallback {
  const traceLines = cachedPreviewTranscript(task).filter((line) => line.title !== "Submitted prompt");
  const stepCount = taskListStepCount(task);
  const outputMaxSeq = task.output.at(-1)?.seq ?? 0;

  // Tally every line kind in one pass, then pick out the kinds that are reported.
  const linesByKind = new Map<unknown, number>();
  for (const line of traceLines) {
    linesByKind.set(line.kind, (linesByKind.get(line.kind) ?? 0) + 1);
  }

  const tracePresent = stepCount > 0 || traceLines.length > 0 || outputMaxSeq > 0;
  return {
    stepCount,
    llmStepCount: stepCount,
    traceLineCount: traceLines.length,
    outputMaxSeq,
    readCount: linesByKind.get("explored") ?? 0,
    editCount: linesByKind.get("edited") ?? 0,
    runCount: linesByKind.get("ran") ?? 0,
    errorCount: linesByKind.get("error") ?? 0,
    reason: tracePresent
      ? "oa-event-flow-stats-unavailable-retained-trace-present"
      : "no-trace-steps-yet",
  };
}
function applyStats(value: JsonValue, stats: Map<string, OaTraceStats>, task: QueueTask): JsonValue {
return applyOaTraceStatsToTaskJson(value, statsForTask(stats, task));
return applyOaTraceStatsToTaskJson(value, statsForTask(stats, task), traceStatsFallbackForTask(task));
}
function taskForListResponse(task: QueueTask, lite = false, queueTasks?: QueueTask[]): JsonValue {
@@ -4,8 +4,9 @@ import { minimaxM27Model } from "./code-agent/common";
import { openCodeTransportClosedBeforeTerminal, remoteOpenCodeRunCommandForTest } from "./code-agent/opencode";
import { continuePromptSourceBudgetChars, miniMaxJudgeMessages, parsedContinuePromptForJudge, parseJudgeJson, queueRecoveryRetryPrompt, retryPrompt } from "./judge";
import { codeQueueEnvironmentHintTitle, injectCodeQueueEnvironmentHint, promptWithCodeQueueEnvironmentHint, userPromptForDisplay } from "./prompts";
import { buildTaskTranscript, safePreview, transcriptLineSummaryLines } from "./task-view";
import { buildTaskTranscript, safePreview, taskTraceSummaryFixtureResponse, transcriptLineSummaryLines } from "./task-view";
import type { ActiveRunSlotWaiter } from "./code-agent/common";
import type { OaTraceStepSummary } from "./oa-events";
import type { JsonValue, LiveOutput, QueueTask, QueuedStatusReason, QueueTaskRequest, RuntimeConfig, TaskStatus } from "./types";
export interface SelfTestsContext {
@@ -28,6 +29,7 @@ export interface SelfTestsContext {
removeActiveRunSlotWaiter: (waiter: ActiveRunSlotWaiter) => void;
resolveReasoningEffort: (model: string, explicit?: string | null) => string | null;
runDatabaseClaimMoveSelfTest?: () => Promise<JsonValue | null>;
runTraceSummaryContractSelfTest?: () => JsonValue;
taskIsRecoverableOrphanedActive: (task: QueueTask, staleMs?: number) => boolean;
tasks: () => QueueTask[];
updateProcessingFlag: () => void;
@@ -91,6 +93,118 @@ function assertReferenceTest(condition: boolean, message: string): void {
if (!condition) throw new Error(message);
}
/**
 * Builds the fixture task used by the trace-summary contract self-test.
 *
 * The task is "running" on attempt 2 of 2. Only attempt 1 is recorded in
 * `attempts`, and retained output covers seq 1-3.
 *
 * @returns The fixture after passing through the context's `normalizeTask`.
 */
function traceSelfTestTask(): QueueTask {
  const basePrompt = "Trace sync contract fixture";
  const createdAt = "2026-05-19T00:00:00.000Z";
  const attemptOneJudgedAt = "2026-05-19T00:02:00.000Z";
  const attemptOneAnswer = "Attempt 1 answer";

  return ctx().normalizeTask({
    // Identity and queue placement.
    id: "codex_trace_sync_fixture",
    queueId: ctx().defaultQueueId,
    queueEnteredAt: createdAt,
    prompt: basePrompt,
    basePrompt,
    referenceTaskIds: [],
    referenceInjection: null,

    // Execution settings taken from the configured defaults.
    providerId: ctx().config.mainProviderId,
    cwd: ctx().config.defaultWorkdir,
    model: ctx().config.defaultModel,
    reasoningEffort: ctx().resolveReasoningEffort(ctx().config.defaultModel, ctx().config.defaultReasoningEffort),
    executionMode: "default",
    maxAttempts: 2,

    // Lifecycle: still running, currently on the retry attempt.
    status: "running",
    createdAt,
    updatedAt: "2026-05-19T00:05:00.000Z",
    startedAt: createdAt,
    finishedAt: null,
    readAt: null,
    currentAttempt: 2,
    currentMode: "retry",
    codexThreadId: "thread_trace_sync_fixture",
    activeTurnId: "turn_trace_sync_fixture",
    finalResponse: "",
    lastError: null,
    lastJudge: null,
    judgeFailCount: 0,
    promptHistory: [],

    // Retained raw output: the prompt, one command and the attempt-1 judge note.
    output: [
      { seq: 1, at: createdAt, channel: "user", text: basePrompt, method: "enqueue" },
      {
        seq: 2,
        at: "2026-05-19T00:01:00.000Z",
        channel: "command",
        text: "attempt 1 / 2\nrg src/components/microservices/code-queue/src/task-view.ts",
        method: "item/started",
        itemId: "call-1",
      },
      { seq: 3, at: "2026-05-19T00:01:30.000Z", channel: "assistant", text: "Attempt 1 judge complete.", method: "judge" },
    ],
    events: [],

    // Only the finished first attempt has a record.
    attempts: [
      {
        index: 1,
        mode: "initial",
        startedAt: "2026-05-19T00:00:05.000Z",
        finishedAt: attemptOneJudgedAt,
        terminalStatus: "completed",
        transportClosedBeforeTerminal: false,
        appServerExitCode: 0,
        appServerSignal: null,
        error: null,
        finalResponse: attemptOneAnswer,
        finalResponsePreview: attemptOneAnswer,
        finalResponseChars: attemptOneAnswer.length,
        stderrTail: "",
        judge: { decision: "complete", confidence: 1, reason: "attempt 1 ok", source: "fallback" },
        judgeAt: attemptOneJudgedAt,
        judgeSeq: 3,
        outputStartSeq: 2,
        outputEndSeq: 3,
      },
    ],
    cancelRequested: false,
    nextPrompt: null,
    nextMode: null,
  });
}
/**
 * Fixture: OA trace steps grouped by attempt index for the trace-sync self-test.
 *
 * @returns A map holding two steps (a run and a read) under attempt 2.
 */
function traceSyncAttemptSteps(): Map<number, OaTraceStepSummary[]> {
  const attemptScope = "task:codex_trace_sync_fixture:attempt:2";

  const runStep: OaTraceStepSummary = {
    eventSequence: 20,
    seq: 20,
    at: "2026-05-19T00:06:00.000Z",
    kind: "ran",
    title: "Run",
    status: "item/started",
    summaryLines: ["attempt 2 / 2", "pnpm test"],
    rawSeqs: [20],
    scopeId: attemptScope,
    attemptIndex: 2,
    source: "oa-event-flow",
  };
  const readStep: OaTraceStepSummary = {
    eventSequence: 21,
    seq: 21,
    at: "2026-05-19T00:06:20.000Z",
    kind: "explored",
    title: "Read",
    status: "item/completed",
    summaryLines: ["src/components/microservices/code-queue/src/task-view.ts"],
    rawSeqs: [21],
    scopeId: attemptScope,
    attemptIndex: 2,
    source: "oa-event-flow",
  };

  const stepsByAttempt = new Map<number, OaTraceStepSummary[]>();
  stepsByAttempt.set(2, [runStep, readStep]);
  return stepsByAttempt;
}
/**
 * Fixture: the task-wide OA trace step list for the trace-sync self-test.
 *
 * @returns One task-scoped assistant message followed by the two attempt-2 steps.
 */
function traceSyncAllSteps(): OaTraceStepSummary[] {
  const attemptScope = "task:codex_trace_sync_fixture:attempt:2";

  const judgeMessage: OaTraceStepSummary = {
    eventSequence: 1,
    seq: 1,
    at: "2026-05-19T00:00:10.000Z",
    kind: "message",
    title: "Assistant message",
    status: "item/completed",
    summaryLines: ["Judge complete for attempt 1"],
    rawSeqs: [3],
    scopeId: "task:codex_trace_sync_fixture",
    attemptIndex: null,
    source: "oa-event-flow",
  };
  const runStep: OaTraceStepSummary = {
    eventSequence: 20,
    seq: 20,
    at: "2026-05-19T00:06:00.000Z",
    kind: "ran",
    title: "Run",
    status: "item/started",
    summaryLines: ["attempt 2 / 2", "pnpm test"],
    rawSeqs: [20],
    scopeId: attemptScope,
    attemptIndex: 2,
    source: "oa-event-flow",
  };
  const readStep: OaTraceStepSummary = {
    eventSequence: 21,
    seq: 21,
    at: "2026-05-19T00:06:20.000Z",
    kind: "explored",
    title: "Read",
    status: "item/completed",
    summaryLines: ["src/components/microservices/code-queue/src/task-view.ts"],
    rawSeqs: [21],
    scopeId: attemptScope,
    attemptIndex: 2,
    source: "oa-event-flow",
  };

  return [judgeMessage, runStep, readStep];
}
/**
 * Self-test for the trace-summary contract when OA event-flow stats are absent.
 *
 * Builds the fixture task, requests a summary with no stats but with fixture
 * OA steps, and checks that the summary and its attempt 2 entry report a
 * fallback state and a positive step count.
 *
 * @returns A small report of the observed summary fields.
 * @throws Error from `assertReferenceTest` when any expectation fails.
 */
function runTraceSummaryContractSelfTest(): JsonValue {
  const task = traceSelfTestTask();
  const summary = taskTraceSummaryFixtureResponse(task, {
    stats: null,
    taskStats: null,
    allSteps: traceSyncAllSteps(),
    attemptSteps: traceSyncAttemptSteps(),
  }) as Record<string, unknown>;

  const attempts = Array.isArray(summary.attempts) ? summary.attempts as Record<string, unknown>[] : [];
  const currentAttempt = attempts.find((attempt) => Number(attempt.index) === 2) ?? null;

  // Task-level expectations.
  const statsSource = String(summary.statsSource || "");
  const stepCount = Number(summary.stepCount ?? 0);
  assertReferenceTest(statsSource.startsWith("raw-trace"), "trace summary should surface raw-trace fallback state");
  assertReferenceTest(summary.traceStatsReason === "oa-event-flow-stats-unavailable-raw-trace-present", "trace summary should explain degraded sync");
  assertReferenceTest(summary.traceStatsState === "degraded", "trace summary should mark degraded sync state");
  assertReferenceTest(stepCount > 0, "trace summary should expose fallback step count");

  // Attempt-level expectations: the running attempt must be materialised.
  const attemptSource = String(currentAttempt?.statsSource || "");
  const attemptState = String(currentAttempt?.traceStatsState || "");
  assertReferenceTest(attempts.length >= 2, "trace summary should materialize latest running attempt");
  assertReferenceTest(Number(currentAttempt?.index ?? 0) === 2, "trace summary should include attempt 2");
  assertReferenceTest(Number(currentAttempt?.stepCount ?? 0) > 0, "attempt 2 should expose a live step count");
  assertReferenceTest(["raw-trace-fallback", "oa-event-flow"].includes(attemptSource), "attempt 2 should not be permanently unavailable");
  assertReferenceTest(["degraded", "ready"].includes(attemptState), "attempt 2 should surface a visible sync state");

  return {
    ok: true,
    taskId: task.id,
    statsSource,
    traceStatsState: String(summary.traceStatsState || ""),
    traceStatsReason: String(summary.traceStatsReason || ""),
    stepCount,
    attempt2: currentAttempt as JsonValue,
  };
}
async function runReferenceInjectionSelfTest(): Promise<JsonValue> {
const at = "2026-05-08T00:00:00.000Z";
const taskA = testTask("codex_1000_aaaaaa", "A base prompt", "A final", [], at);
@@ -562,4 +676,4 @@ function runJudgeInfraSelfTest(): JsonValue {
};
}
export { runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest };
export { runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest, runTraceSummaryContractSelfTest };
@@ -1864,6 +1864,11 @@ interface TraceAttemptWindow {
type TraceStatsLookup = Map<string, JsonValue> | Record<string, JsonValue> | null | undefined;
/**
 * Optional OA trace steps handed to the trace-summary builders so they can
 * supplement the transcript-derived attempt windows.
 */
interface TraceSummaryOaSteps {
/** Steps for the whole task; converted to transcript lines and merged with the task transcript. */
allSteps?: OaTraceStepSummary[];
/** Steps per attempt index: a Map keyed by the numeric index, or a record keyed by the index as a string. */
attemptSteps?: Map<number, OaTraceStepSummary[]> | Record<string, OaTraceStepSummary[]>;
}
function transcriptLineSeq(line: TranscriptLine | undefined): number | null {
const value = Number(line?.seq ?? NaN);
return Number.isFinite(value) ? value : null;
@@ -1991,6 +1996,49 @@ async function oaTraceTranscriptForTask(task: QueueTask, attemptIndex: number |
return coalesceCodexToolLifecycleTranscriptLines(overlayTraceMessagesFromRawTranscript(oaLines, rawLines));
}
/**
 * Turns OA trace steps into transcript lines and overlays them with the
 * task's raw transcript.
 *
 * Both sides are filtered with `traceLineVisibleInTraceView` and coalesced
 * before `overlayTraceMessagesFromRawTranscript` combines them; the combined
 * result is coalesced once more.
 *
 * @param task - Task whose full transcript supplies the raw lines.
 * @param steps - OA steps to convert; defaults to none.
 * @returns The overlaid transcript lines, or an empty array when no steps were given.
 */
function oaTraceTranscriptFromSteps(task: QueueTask, steps: OaTraceStepSummary[] = []): TranscriptLine[] {
  // Without steps there is nothing to overlay, so the raw transcript is not read.
  if (steps.length === 0) return [];

  const visibleStepLines = steps.map(oaTraceStepToTranscriptLine).filter(traceLineVisibleInTraceView);
  const oaLines = coalesceCodexToolLifecycleTranscriptLines(coalesceTranscriptMessageFragments(visibleStepLines));

  const visibleRawLines = fullTranscript(task).filter(traceLineVisibleInTraceView);
  const rawLines = coalesceCodexToolLifecycleTranscriptLines(visibleRawLines);

  const overlaid = overlayTraceMessagesFromRawTranscript(oaLines, rawLines);
  return coalesceCodexToolLifecycleTranscriptLines(overlaid);
}
/**
 * Looks up the OA trace steps recorded for one attempt.
 *
 * @param oaSteps - Optional step container; its `attemptSteps` may be a Map
 *   keyed by attempt number or a record keyed by the attempt number as a string.
 * @param attemptIndex - Attempt to look up.
 * @returns The steps for that attempt, or an empty array when none are recorded.
 */
function taskAttemptTraceSteps(oaSteps: TraceSummaryOaSteps | null | undefined, attemptIndex: number): OaTraceStepSummary[] {
  const stepsByAttempt = oaSteps?.attemptSteps;
  if (stepsByAttempt === undefined || stepsByAttempt === null) return [];
  if (stepsByAttempt instanceof Map) {
    return stepsByAttempt.get(attemptIndex) ?? [];
  }
  return stepsByAttempt[String(attemptIndex)] ?? [];
}
/**
 * Computes attempt windows for a task and folds per-attempt OA steps into them.
 *
 * Windows are first derived from the transcript merged with the task-wide OA
 * steps. Then, for every attempt index up to the highest one known, that
 * attempt's OA steps either extend the existing window or create a new one.
 *
 * @param task - Task whose attempts are being windowed.
 * @param transcript - Transcript lines to start from.
 * @param oaSteps - Optional task-wide and per-attempt OA steps.
 * @returns The windows ordered by the seq of their first line (falling back to `startSeq`, then 0).
 */
function traceWindowsWithOaSteps(task: QueueTask, transcript: TranscriptLine[], oaSteps: TraceSummaryOaSteps | null | undefined): TraceAttemptWindow[] {
  const taskWideLines = oaTraceTranscriptFromSteps(task, oaSteps?.allSteps ?? []);
  const windows = traceAttemptWindows(task, mergeTraceWindowLines(transcript, taskWideLines));

  // Index only the real attempts; windows with index <= 0 are never extended.
  const windowsByAttempt = new Map<number, TraceAttemptWindow>();
  for (const candidate of windows) {
    if (candidate.index > 0) windowsByAttempt.set(candidate.index, candidate);
  }

  const highestAttempt = Math.max(task.currentAttempt || 0, task.attempts.length, ...windowsByAttempt.keys(), 0);
  for (let attemptIndex = 1; attemptIndex <= highestAttempt; attemptIndex += 1) {
    const attemptLines = oaTraceTranscriptFromSteps(task, taskAttemptTraceSteps(oaSteps, attemptIndex));
    if (attemptLines.length === 0) continue;

    const known = windowsByAttempt.get(attemptIndex);
    if (known === undefined) {
      // No transcript-derived window: create one from the OA lines alone,
      // pairing it with the attempt record by index, else by position.
      const attempt = task.attempts.find((item) => Number(item.index) === attemptIndex) ?? task.attempts[attemptIndex - 1] ?? null;
      windows.push({
        index: attemptIndex,
        attempt,
        startSeq: transcriptLineSeq(attemptLines[0]),
        endSeq: transcriptLineSeq(attemptLines.at(-1)),
        lines: attemptLines,
      });
    } else {
      // Extend the existing window; its seq range may only widen.
      known.lines = mergeTraceWindowLines(known.lines, attemptLines);
      known.startSeq = minNullableSeq(known.startSeq, transcriptLineSeq(attemptLines[0]));
      const finalSeq = transcriptLineSeq(attemptLines.at(-1));
      const extendsEnd = finalSeq !== null && (known.endSeq === null || finalSeq > known.endSeq);
      if (extendsEnd) known.endSeq = finalSeq;
    }
  }

  const orderKey = (entry: TraceAttemptWindow): number => Number(entry.lines[0]?.seq ?? entry.startSeq ?? 0);
  return windows.sort((left, right) => orderKey(left) - orderKey(right));
}
function mergeTraceWindowLines(left: TranscriptLine[], right: TranscriptLine[]): TranscriptLine[] {
const seen = new Set<string>();
const merged: TranscriptLine[] = [];
@@ -2107,8 +2155,8 @@ function executionLinesForAttempt(lines: TranscriptLine[]): TranscriptLine[] {
return lines.filter((line) => line.title !== "Submitted prompt" && line.title !== "Attempt started" && line.title !== "Judge result");
}
function taskTraceAttemptSummaries(task: QueueTask, transcript: TranscriptLine[], oaTraceStatsByScope: TraceStatsLookup = null): JsonValue[] {
const windows = traceAttemptWindows(task, transcript);
function taskTraceAttemptSummaries(task: QueueTask, transcript: TranscriptLine[], oaTraceStatsByScope: TraceStatsLookup = null, oaSteps: TraceSummaryOaSteps | null = null): JsonValue[] {
const windows = traceWindowsWithOaSteps(task, transcript, oaSteps);
return windows.map((window) => {
const attempt = window.attempt;
const parsedJudge = judgeFromAttemptLines(window.lines);
@@ -2119,11 +2167,24 @@ function taskTraceAttemptSummaries(task: QueueTask, transcript: TranscriptLine[]
const finalResponseChars = Number(attempt?.finalResponseChars ?? finalResponse.length);
const executionLines = executionLinesForAttempt(window.lines);
const attemptStats = synthetic || window.index <= 0 ? null : traceStatsForScope(oaTraceStatsByScope, taskAttemptScopeId(task.id, window.index));
const stepCount = traceStatsNumber(attemptStats, "stepCount");
const readCount = traceStatsNumber(attemptStats, "readCount");
const editCount = traceStatsNumber(attemptStats, "editCount");
const runCount = traceStatsNumber(attemptStats, "runCount");
const errorCount = traceStatsNumber(attemptStats, "errorCount");
const fallbackStats = executionSummaryFromTranscript(
task,
executionLines,
attemptTimingSummary(attempt, executionLines.length > 0 ? executionLines : window.lines),
window.lines.length,
window.lines.length,
);
const fallbackStatsRecord = traceStatsRecord(fallbackStats);
const rawTracePresent = executionLines.length > 0 || window.lines.length > 0;
const fallbackTraceStats = attemptStats === null && rawTracePresent
? traceStatsFallbackRecord(taskAttemptScopeId(task.id, window.index), fallbackStatsRecord, window.lines, window.endSeq)
: null;
const traceStats = attemptStats ?? fallbackTraceStats;
const stepCount = traceStatsNumber(traceStats, "stepCount") ?? traceStatsNumber(fallbackStatsRecord, "stepCount");
const readCount = traceStatsNumber(traceStats, "readCount") ?? traceStatsNumber(fallbackStatsRecord, "readCount");
const editCount = traceStatsNumber(traceStats, "editCount") ?? traceStatsNumber(fallbackStatsRecord, "editCount");
const runCount = traceStatsNumber(traceStats, "runCount") ?? traceStatsNumber(fallbackStatsRecord, "runCount");
const errorCount = traceStatsNumber(traceStats, "errorCount") ?? traceStatsNumber(fallbackStatsRecord, "errorCount");
const feedbackPrompt = synthetic ? null : attemptFeedbackPromptRecord(task, window.index, attempt, judge);
const inputPrompt = promptSnapshot(String(attempt?.inputPrompt ?? ""), 1200);
return {
@@ -2166,22 +2227,22 @@ function taskTraceAttemptSummaries(task: QueueTask, transcript: TranscriptLine[]
editCount,
runCount,
errorCount,
statsSource: attemptStats === null ? "unavailable" : "oa-event-flow",
traceStats: attemptStats,
execution: executionSummaryWithOaStats(
executionSummaryFromTranscript(
task,
executionLines,
attemptTimingSummary(attempt, executionLines.length > 0 ? executionLines : window.lines),
window.lines.length,
window.lines.length,
),
attemptStats,
),
statsSource: attemptStats === null ? rawTracePresent ? "raw-trace-fallback" : "raw-trace-empty" : "oa-event-flow",
traceStatsState: attemptStats === null ? rawTracePresent ? "degraded" : "empty" : "ready",
traceStatsReason: attemptStats === null ? rawTracePresent ? "oa-event-flow-stats-unavailable-raw-trace-present" : "no-trace-steps-yet" : null,
statsUnavailable: attemptStats === null,
statsSyncing: attemptStats === null && rawTracePresent,
rawTraceStepCount: attemptStats === null ? traceStatsNumber(fallbackStatsRecord, "stepCount") ?? window.lines.length : null,
traceStats,
execution: executionSummaryWithOaStats(fallbackStats, traceStats),
};
}) as unknown as JsonValue[];
}
/**
 * Convenience wrapper around `taskTraceSummaryResponse` for fixtures and self-tests.
 *
 * @param task - Task to summarise.
 * @param options - Optional stats lookup, task-level stats and OA steps; each
 *   missing entry is replaced with `null`, an empty list or an empty record.
 * @returns Whatever `taskTraceSummaryResponse` produces for those inputs.
 */
function taskTraceSummaryFixtureResponse(task: QueueTask, options: { stats?: TraceStatsLookup; taskStats?: JsonValue | null; allSteps?: OaTraceStepSummary[]; attemptSteps?: Map<number, OaTraceStepSummary[]> | Record<string, OaTraceStepSummary[]> } = {}): JsonValue {
  const taskStats = options.taskStats ?? null;
  const statsByScope = options.stats ?? null;
  const oaSteps = {
    allSteps: options.allSteps ?? [],
    attemptSteps: options.attemptSteps ?? {},
  };
  return taskTraceSummaryResponse(task, taskStats, statsByScope, oaSteps);
}
function resolvedReferencePromptParts(prompt: string): { reference: string; userPrompt: string } {
const withoutEnvironment = stripCodeQueueEnvironmentHint(prompt);
const trimmed = withoutEnvironment.trimStart();
@@ -2239,30 +2300,44 @@ function traceStatsForScope(lookup: TraceStatsLookup, scopeId: string): Record<s
return traceStatsRecord(value);
}
/**
 * Forms the trace-stats scope id for a whole task.
 *
 * @param taskId - Task identifier.
 * @returns The id prefixed with `task:`.
 */
function taskScopeId(taskId: string): string {
  return "task:".concat(taskId);
}
/**
 * Builds a synthetic trace-stats record from transcript-derived counters.
 *
 * The record is tagged `sourceHint: "raw-trace-fallback"` so consumers can
 * tell it apart from real OA event-flow stats.
 *
 * @param scopeId - Scope the record is attributed to.
 * @param fallback - Transcript-derived counters, or `null` when unavailable.
 * @param lines - Transcript lines in scope; supply the default line count and last seq.
 * @param endSeq - Known final seq for the scope; preferred over the last line's seq.
 * @returns The stats record, with counters defaulting to 0 when absent.
 */
function traceStatsFallbackRecord(scopeId: string, fallback: Record<string, JsonValue> | null, lines: TranscriptLine[], endSeq: number | null): Record<string, JsonValue> {
  const stepCount = traceStatsNumber(fallback, "stepCount") ?? 0;
  const llmStepCount = traceStatsNumber(fallback, "llmStepCount") ?? stepCount;
  const traceLineCount = traceStatsNumber(fallback, "traceLineCount") ?? lines.length;
  // The counters name this value "transcriptMaxSeq"; without it, use the
  // window's end seq, then the last line's seq, then 0.
  const outputMaxSeq = traceStatsNumber(fallback, "transcriptMaxSeq") ?? Number(endSeq ?? lines.at(-1)?.seq ?? 0);
  const readCount = traceStatsNumber(fallback, "readCount") ?? 0;
  const editCount = traceStatsNumber(fallback, "editCount") ?? 0;
  const runCount = traceStatsNumber(fallback, "runCount") ?? 0;
  const errorCount = traceStatsNumber(fallback, "errorCount") ?? 0;

  return {
    scopeId,
    source: "oa-event-flow",
    stepCount,
    llmStepCount,
    traceLineCount,
    outputMaxSeq,
    readCount,
    editCount,
    runCount,
    errorCount,
    sourceHint: "raw-trace-fallback",
  };
}
function executionSummaryWithOaStats(execution: JsonValue, stats: Record<string, JsonValue> | null): JsonValue {
if (typeof execution !== "object" || execution === null || Array.isArray(execution)) return execution;
const record = execution as Record<string, JsonValue>;
if (stats === null) {
const {
stepCount: _stepCount,
llmStepCount: _llmStepCount,
toolCallCount: _toolCallCount,
readCount: _readCount,
editCount: _editCount,
runCount: _runCount,
errorCount: _errorCount,
traceLineCount: _traceLineCount,
outputMaxSeq: _outputMaxSeq,
transcriptMaxSeq: _transcriptMaxSeq,
...rest
} = record;
const fallbackStepCount = traceStatsNumber(record, "stepCount") ?? traceStatsNumber(record, "llmStepCount");
const rawTracePresent = (fallbackStepCount ?? 0) > 0 || (traceStatsNumber(record, "traceLineCount") ?? 0) > 0 || (traceStatsNumber(record, "transcriptMaxSeq") ?? 0) > 0;
return {
...rest,
statsSource: "unavailable",
...record,
statsSource: rawTracePresent ? "raw-trace-fallback" : "raw-trace-empty",
traceStats: null,
statsUnavailable: true,
statsSyncing: rawTracePresent,
traceStatsState: rawTracePresent ? "degraded" : "empty",
traceStatsReason: rawTracePresent ? "oa-event-flow-stats-unavailable-raw-trace-present" : "no-trace-steps-yet",
} as unknown as JsonValue;
}
const sourceHint = String(stats.sourceHint ?? "");
const stepCount = traceStatsNumber(stats, "stepCount");
const llmStepCount = traceStatsNumber(stats, "llmStepCount") ?? stepCount;
const readCount = traceStatsNumber(stats, "readCount");
@@ -2285,17 +2360,27 @@ function executionSummaryWithOaStats(execution: JsonValue, stats: Record<string,
...(outputMaxSeq === null ? {} : { outputMaxSeq, transcriptMaxSeq: outputMaxSeq }),
statsSource: "oa-event-flow",
traceStats: stats,
statsUnavailable: sourceHint === "raw-trace-fallback",
statsSyncing: sourceHint === "raw-trace-fallback",
traceStatsState: sourceHint === "raw-trace-fallback" ? "degraded" : "ready",
traceStatsReason: sourceHint === "raw-trace-fallback" ? "oa-event-flow-stats-unavailable-raw-trace-present" : null,
} as unknown as JsonValue;
}
function taskTraceSummaryResponse(task: QueueTask, oaTraceStats: JsonValue | null = null, oaTraceStatsByScope: TraceStatsLookup = null): JsonValue {
function taskTraceSummaryResponse(task: QueueTask, oaTraceStats: JsonValue | null = null, oaTraceStatsByScope: TraceStatsLookup = null, oaSteps: TraceSummaryOaSteps | null = null): JsonValue {
const transcript = cachedPreviewTranscript(task);
const attempts = taskTraceAttemptSummaries(task, transcript, oaTraceStatsByScope);
const attempts = taskTraceAttemptSummaries(task, transcript, oaTraceStatsByScope, oaSteps);
const stats = traceStatsRecord(oaTraceStats);
const stepCount = traceStatsNumber(stats, "stepCount");
const fallbackTranscript = mergeTraceWindowLines(transcript, oaTraceTranscriptFromSteps(task, oaSteps?.allSteps ?? []));
const fallbackExecution = executionSummaryFromTranscript(task, fallbackTranscript, taskTiming(task) as Record<string, JsonValue>);
const fallbackRecord = traceStatsRecord(fallbackExecution);
const stepCount = traceStatsNumber(stats, "stepCount") ?? traceStatsNumber(fallbackRecord, "stepCount");
const llmStepCount = traceStatsNumber(stats, "llmStepCount") ?? stepCount;
const errorCount = traceStatsNumber(stats, "errorCount");
const execution = executionSummaryWithOaStats(taskExecutionSummary(task, transcript), stats);
const errorCount = traceStatsNumber(stats, "errorCount") ?? traceStatsNumber(fallbackRecord, "errorCount");
const rawTracePresent = stats === null && ((stepCount ?? 0) > 0 || (traceStatsNumber(fallbackRecord, "traceLineCount") ?? 0) > 0 || (traceStatsNumber(fallbackRecord, "transcriptMaxSeq") ?? 0) > 0);
const fallbackTraceStats = rawTracePresent ? traceStatsFallbackRecord(taskScopeId(task.id), fallbackRecord, fallbackTranscript, fallbackTranscript.at(-1)?.seq ?? null) : null;
const effectiveStats = stats ?? fallbackTraceStats;
const execution = executionSummaryWithOaStats(fallbackExecution, effectiveStats);
return {
id: task.id,
queueId: ctx().queueIdOf(task),
@@ -2320,8 +2405,13 @@ function taskTraceSummaryResponse(task: QueueTask, oaTraceStats: JsonValue | nul
promptEditable: ctx().queuedTaskPromptEditable(task),
prompt: taskTracePromptSummary(task),
execution,
traceStats: stats,
statsSource: stats === null ? "unavailable" : "oa-event-flow",
traceStats: effectiveStats,
statsSource: stats === null ? rawTracePresent ? "raw-trace-fallback" : "raw-trace-empty" : "oa-event-flow",
traceStatsState: stats === null ? rawTracePresent ? "degraded" : "empty" : "ready",
traceStatsReason: stats === null ? rawTracePresent ? "oa-event-flow-stats-unavailable-raw-trace-present" : "no-trace-steps-yet" : null,
statsUnavailable: stats === null,
statsSyncing: stats === null && rawTracePresent,
rawTraceStepCount: stats === null ? stepCount ?? 0 : null,
finalResponse: task.finalResponse,
finalResponseChars: task.finalResponse.length,
lastJudge: task.lastJudge,
@@ -2533,6 +2623,8 @@ export {
taskToolSummary,
taskTraceStepDetailResponse,
taskTraceStepsResponse,
taskTraceAttemptSummaries,
taskTraceSummaryFixtureResponse,
taskTraceSummaryResponse,
taskPromptDetailResponse,
transcriptLineSummaryLines,