diff --git a/scripts/code-queue-trace-summary-contract-test.ts b/scripts/code-queue-trace-summary-contract-test.ts new file mode 100644 index 00000000..8e05546d --- /dev/null +++ b/scripts/code-queue-trace-summary-contract-test.ts @@ -0,0 +1,242 @@ +import { configureTaskView, taskTraceSummaryFixtureResponse } from "../src/components/microservices/code-queue/src/task-view"; +import { configureTaskOutput } from "../src/components/microservices/code-queue/src/task-output"; +import { configureJudge } from "../src/components/microservices/code-queue/src/judge"; +import type { OaTraceStepSummary } from "../src/components/microservices/code-queue/src/oa-events"; +import type { JsonValue, PromptHistoryItem, QueueTask, QueuedStatusReason } from "../src/components/microservices/code-queue/src/types"; + +type JsonRecord = Record; + +function assertCondition(condition: unknown, message: string, detail: JsonRecord = {}): void { + if (!condition) throw new Error(`${message}: ${JSON.stringify(detail)}`); +} + +function asRecord(value: unknown): JsonRecord | null { + return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : null; +} + +function pageBySeq( + items: T[], + _url: URL, + _limit: number, +): { mode: "tail" | "after" | "before"; afterSeq: number; beforeSeq: number | null; nextAfterSeq: number; previousBeforeSeq: number | null; hasMore: boolean; hasBefore: boolean; chunk: T[] } { + return { + mode: "tail", + afterSeq: 0, + beforeSeq: null, + nextAfterSeq: items.at(-1)?.seq ?? 0, + previousBeforeSeq: null, + hasMore: false, + hasBefore: false, + chunk: items, + }; +} + +function configureFixtureTaskView(): void { + configureTaskOutput({ + config: { maxInMemoryOutputRecords: 1000, outputArchiveDir: "/tmp/code-queue-trace-summary-contract/output" }, + allocateSeq: () => 1000, + errorToJson: (error: unknown): JsonValue => error instanceof Error ? { message: error.message } : String(error), + logger: () => undefined, + markTaskDirty: () => undefined, + nowIso: () => "2026-05-19T00:10:00.000Z", + schedulePersistState: () => undefined, + }); + configureJudge({ + config: { + minimaxApiKey: "", + minimaxApiBase: "", + minimaxModel: "minimax-m1", + judgeTimeoutMs: 1000, + judgeRepairAttempts: 0, + judgeMaxTokens: 1000, + }, + logger: () => undefined, + safePreview: (value: string, max = 300) => value.length > max ? `${value.slice(0, max)}...` : value, + userPromptForDisplay: (prompt: string) => prompt, + taskFullOutput: (task: QueueTask) => task.output, + taskReferenceIds: (task: QueueTask) => task.referenceTaskIds, + extractRecord: (value: unknown) => typeof value === "object" && value !== null && !Array.isArray(value) ? value as Record : null, + extractString: (value: unknown, key: string) => { + const record = typeof value === "object" && value !== null && !Array.isArray(value) ? value as Record : null; + const item = record?.[key]; + return typeof item === "string" ? item : null; + }, + promptLineCount: (text: string) => text.length > 0 ? text.split(/\r\n|\r|\n/u).length : 0, + judgeFailRetryLimit: 3, + }); + configureTaskView({ + config: { codexHome: "/tmp/code-queue-trace-summary-contract" }, + errorToJson: (error: unknown): JsonValue => error instanceof Error ? { message: error.message } : String(error), + jsonResponse: (body: unknown, status = 200): Response => Response.json(body, { status }), + logger: () => undefined, + mergePromptHistory: (items: PromptHistoryItem[]) => items, + nowIso: () => "2026-05-19T00:10:00.000Z", + outputPromptHistory: () => [], + pageBySeq, + parseLimit: () => 100, + parseSeqParam: () => null, + queueIdOf: (task: QueueTask) => task.queueId, + queuedStatusReason: (): QueuedStatusReason | null => null, + queuedTaskPromptEditable: () => false, + taskQueueEnteredAt: (task: QueueTask) => task.queueEnteredAt, + }); +} + +function fixtureTask(): QueueTask { + const at = "2026-05-19T00:00:00.000Z"; + return { + id: "codex_trace_contract", + queueId: "default", + queueEnteredAt: at, + prompt: "Trace summary contract fixture", + basePrompt: "Trace summary contract fixture", + referenceTaskIds: [], + referenceInjection: null, + providerId: "D601", + cwd: "/workspace", + model: "gpt-5.5", + reasoningEffort: null, + executionMode: "default", + maxAttempts: 99, + status: "running", + createdAt: at, + updatedAt: "2026-05-19T00:06:30.000Z", + startedAt: at, + finishedAt: null, + readAt: null, + currentAttempt: 2, + currentMode: "retry", + codexThreadId: "thread_trace_contract", + activeTurnId: "turn_trace_contract", + finalResponse: "", + lastError: null, + lastJudge: { decision: "retry", confidence: 1, reason: "attempt 1 asked for retry", source: "fallback" }, + judgeFailCount: 0, + promptHistory: [], + output: [ + { seq: 1, at, channel: "user", text: "Trace summary contract fixture", method: "enqueue" }, + { seq: 2, at: "2026-05-19T00:00:10.000Z", channel: "system", text: "attempt 1 / 99", method: "queue" }, + { seq: 3, at: "2026-05-19T00:01:00.000Z", channel: "command", text: "rg trace-summary src/components/microservices/code-queue/src", method: "item/started", itemId: "call-1" }, + { seq: 4, at: "2026-05-19T00:02:00.000Z", channel: "system", text: "judge=retry confidence=1 source=fallback: attempt 1 asked for retry", method: "judge" }, + ], + events: [], + attempts: [ + { + index: 1, + mode: "initial", + startedAt: "2026-05-19T00:00:10.000Z", + finishedAt: "2026-05-19T00:02:00.000Z", + terminalStatus: "completed", + transportClosedBeforeTerminal: false, + appServerExitCode: 0, + appServerSignal: null, + error: null, + finalResponse: "Attempt 1 response", + finalResponsePreview: "Attempt 1 response", + finalResponseChars: 18, + stderrTail: "", + judge: { decision: "retry", confidence: 1, reason: "attempt 1 asked for retry", source: "fallback" }, + judgeAt: "2026-05-19T00:02:00.000Z", + judgeSeq: 4, + outputStartSeq: 2, + outputEndSeq: 4, + }, + ], + cancelRequested: false, + nextPrompt: null, + nextMode: null, + }; +} + +function attempt2Steps(): OaTraceStepSummary[] { + return [ + { + eventSequence: 20, + seq: 20, + at: "2026-05-19T00:06:00.000Z", + kind: "ran", + title: "Run", + status: "item/started", + summaryLines: ["attempt 2 / 99", "pnpm test"], + rawSeqs: [20], + scopeId: "task:codex_trace_contract:attempt:2", + attemptIndex: 2, + source: "oa-event-flow", + }, + { + eventSequence: 21, + seq: 21, + at: "2026-05-19T00:06:20.000Z", + kind: "explored", + title: "Read", + status: "item/completed", + summaryLines: ["src/components/microservices/code-queue/src/task-view.ts"], + rawSeqs: [21], + scopeId: "task:codex_trace_contract:attempt:2", + attemptIndex: 2, + source: "oa-event-flow", + }, + ]; +} + +export function runCodeQueueTraceSummaryContract(): JsonRecord { + configureFixtureTaskView(); + const task = fixtureTask(); + const steps = attempt2Steps(); + const summary = taskTraceSummaryFixtureResponse(task, { + stats: null, + taskStats: null, + allSteps: [ + { + eventSequence: 1, + seq: 1, + at: "2026-05-19T00:00:10.000Z", + kind: "message", + title: "Assistant message", + status: "item/completed", + summaryLines: ["Attempt 1 judge complete"], + rawSeqs: [4], + scopeId: "task:codex_trace_contract", + attemptIndex: null, + source: "oa-event-flow", + }, + ...steps, + ], + attemptSteps: new Map([[2, steps]]), + }) as JsonRecord; + const attempts = Array.isArray(summary.attempts) ? summary.attempts.map(asRecord).filter((item): item is JsonRecord => item !== null) : []; + const attempt2 = attempts.find((attempt) => Number(attempt.index) === 2) ?? null; + const taskStats = asRecord(summary.traceStats); + const taskExecution = asRecord(summary.execution); + const attempt2Stats = asRecord(attempt2?.traceStats); + const attempt2Execution = asRecord(attempt2?.execution); + + assertCondition(summary.currentAttempt === 2, "summary must retain currentAttempt=2", summary); + assertCondition(summary.statsSource === "raw-trace-fallback", "summary must distinguish raw trace fallback from empty STEP", summary); + assertCondition(summary.traceStatsState === "degraded", "summary must mark OA stats sync degraded", summary); + assertCondition(summary.traceStatsReason === "oa-event-flow-stats-unavailable-raw-trace-present", "summary must explain degraded OA sync", summary); + assertCondition(taskStats?.source === "oa-event-flow" && taskStats?.sourceHint === "raw-trace-fallback", "summary must expose countable synthetic stats with source hint", taskStats ?? {}); + assertCondition(taskExecution?.statsSource === "oa-event-flow" && taskExecution?.traceStatsState === "degraded", "execution summary must stay countable while degraded", taskExecution ?? {}); + assertCondition(Number(summary.stepCount ?? 0) > 0, "summary fallback STEP count must be visible", summary); + assertCondition(attempt2 !== null, "summary must materialize the latest running retry attempt", { attempts }); + assertCondition(Number(attempt2?.stepCount ?? 0) > 0, "attempt 2 must expose live fallback STEP count", attempt2 ?? {}); + assertCondition(attempt2Stats?.source === "oa-event-flow" && attempt2Stats?.sourceHint === "raw-trace-fallback", "attempt 2 fallback stats must remain countable", attempt2Stats ?? {}); + assertCondition(attempt2Execution?.statsSource === "oa-event-flow" && attempt2Execution?.traceStatsState === "degraded", "attempt 2 execution must be countable while degraded", attempt2Execution ?? {}); + + return { + ok: true, + checks: [ + { name: "code-queue:trace-summary-latest-attempt-visible", ok: true }, + { name: "code-queue:trace-summary-raw-trace-step-fallback", ok: true }, + ], + taskId: task.id, + stepCount: summary.stepCount, + statsSource: summary.statsSource, + traceStatsState: summary.traceStatsState, + attempt2StepCount: attempt2?.stepCount, + }; +} + +if (import.meta.main) { + process.stdout.write(`${JSON.stringify(runCodeQueueTraceSummaryContract(), null, 2)}\n`); +} diff --git a/scripts/src/check.ts b/scripts/src/check.ts index cea16689..bc36089a 100644 --- a/scripts/src/check.ts +++ b/scripts/src/check.ts @@ -267,6 +267,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default fileItem("scripts/code-queue-issue3-regression-test.ts"), fileItem("scripts/code-queue-liveness-diagnostics-test.ts"), fileItem("scripts/src/code-queue-liveness-fixtures.ts"), + fileItem("scripts/code-queue-trace-summary-contract-test.ts"), fileItem("scripts/src/ci.ts"), fileItem("scripts/src/e2e.ts"), fileItem("scripts/code-queue-prompt-observation-test.ts"), @@ -280,6 +281,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default items.push(commandItem("typescript:scripts", ["bunx", "tsc", "-p", "scripts/tsconfig.json", "--noEmit", "--pretty", "false"], 120_000)); items.push(commandItem("code-queue:prompt-observation-contract", ["bun", "scripts/code-queue-prompt-observation-test.ts"], 30_000)); items.push(commandItem("code-queue:issue3-diagnostics-and-image-preflight", ["bun", "scripts/code-queue-issue3-regression-test.ts"], 30_000)); + items.push(commandItem("code-queue:trace-summary-contract", ["bun", "scripts/code-queue-trace-summary-contract-test.ts"], 30_000)); items.push(commandItem("code-queue:active-run-heartbeat-visible", ["bun", "scripts/code-queue-liveness-diagnostics-test.ts", "--only", "code-queue:active-run-heartbeat-visible"], 30_000)); items.push(commandItem("code-queue:trace-gap-not-stale", ["bun", "scripts/code-queue-liveness-diagnostics-test.ts", "--only", "code-queue:trace-gap-not-stale"], 30_000)); items.push(commandItem("code-queue:stale-active-owner-expired", ["bun", "scripts/code-queue-liveness-diagnostics-test.ts", "--only", "code-queue:stale-active-owner-expired"], 30_000)); @@ -289,6 +291,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default items.push(skippedItem("typescript:scripts", "scripts TypeScript typecheck is opt-in", "--scripts-typecheck or --full")); items.push(skippedItem("code-queue:prompt-observation-contract", "prompt observation contract is opt-in with script checks", "--scripts-typecheck or --full")); items.push(skippedItem("code-queue:issue3-diagnostics-and-image-preflight", "Code Queue issue #3 regression fixtures are opt-in with script checks", "--scripts-typecheck or --full")); + items.push(skippedItem("code-queue:trace-summary-contract", "Code Queue trace summary contract is opt-in with script checks", "--scripts-typecheck or --full")); items.push(skippedItem("code-queue:liveness-diagnostics-fixtures", "Code Queue liveness diagnostics fixtures are opt-in with script checks", "--scripts-typecheck or --full")); } if (options.logs) { diff --git a/scripts/src/code-queue.ts b/scripts/src/code-queue.ts index e178627a..f3f065b9 100644 --- a/scripts/src/code-queue.ts +++ b/scripts/src/code-queue.ts @@ -630,8 +630,9 @@ function renderTraceConsoleRows(summary: Record, steps: Record< const execution = asRecord(summary.execution) ?? {}; const stats = asRecord(execution.traceStats) ?? asRecord(summary.traceStats); const statsSource = String(execution.statsSource || summary.statsSource || ""); + const statsUsable = stats !== null && (statsSource === "oa-event-flow" || statsSource === "raw-trace-fallback" || stats.source === "oa-event-flow"); const stat = (key: string): string | number => { - if (!stats || statsSource !== "oa-event-flow") return "--"; + if (!statsUsable) return "--"; const value = Number(stats[key]); return Number.isFinite(value) && value >= 0 ? Math.floor(value) : "--"; }; diff --git a/src/components/microservices/code-queue-mgr/src/index.ts b/src/components/microservices/code-queue-mgr/src/index.ts index 0f0b5bd6..b4a3887e 100644 --- a/src/components/microservices/code-queue-mgr/src/index.ts +++ b/src/components/microservices/code-queue-mgr/src/index.ts @@ -249,6 +249,25 @@ interface OaTraceStepRow { updated_at: Date | string; } +interface OaTraceStatsRow { + scope_id: string; + service_id: string; + subject_kind: string; + subject_id: string; + stats_revision: number | string; + step_count: number | string; + llm_step_count: number | string; + read_count: number | string; + edit_count: number | string; + run_count: number | string; + error_count: number | string; + trace_line_count: number | string; + output_max_seq: number | string; + attempt_stats_json: unknown; + last_event_sequence: number | string | null; + updated_at: Date | string; +} + interface TraceStepLine { seq: number; at: string; @@ -446,6 +465,11 @@ function timestampMs(value: string | Date | null | undefined): number | null { return Number.isFinite(ms) ? ms : null; } +function positiveNumber(value: unknown): number | null { + const parsed = Number(value); + return Number.isFinite(parsed) && parsed >= 0 ? Math.floor(parsed) : null; +} + function prefixPreview(value: unknown, maxChars: number): string { const text = String(value ?? ""); return text.length <= maxChars ? text : `${text.slice(0, Math.max(0, maxChars - 1))}…`; @@ -814,6 +838,41 @@ function activeTask(task: QueueTask): boolean { return task.status === "running" || task.status === "judging"; } +function visibleTraceOutputs(task: QueueTask): LiveOutput[] { + return task.output.filter((item) => item.channel !== "system" || item.method !== "enqueue"); +} + +function rawTraceStats(task: QueueTask): JsonRecord { + const steps = visibleTraceOutputs(task); + const count = (channel: string) => steps.filter((item) => item.channel === channel).length; + const stepCount = positiveNumber(task.stepCount ?? task.llmStepCount) ?? steps.length; + return { + stepCount, + llmStepCount: positiveNumber(task.llmStepCount ?? task.stepCount) ?? stepCount, + traceLineCount: steps.length, + outputMaxSeq: outputMaxSeq(task), + readCount: count("tool"), + editCount: count("diff"), + runCount: count("command"), + errorCount: count("error"), + }; +} + +function traceStatsFallbackPatch(task: QueueTask): JsonRecord { + const fallback = rawTraceStats(task); + const rawTracePresent = Number(fallback.stepCount ?? 0) > 0 || Number(fallback.traceLineCount ?? 0) > 0 || Number(fallback.outputMaxSeq ?? 0) > 0; + return { + ...fallback, + traceStats: null, + statsSource: rawTracePresent ? "raw-trace-fallback" : "raw-trace-empty", + traceStatsState: rawTracePresent ? "degraded" : "empty", + traceStatsReason: rawTracePresent ? "oa-event-flow-stats-unavailable-retained-trace-present" : "no-trace-steps-yet", + statsUnavailable: true, + statsSyncing: rawTracePresent, + rawTraceStepCount: fallback.stepCount, + }; +} + function schedulerHeartbeat(task: QueueTask): JsonRecord | null { const heartbeat = asRecord(task.schedulerHeartbeat); return heartbeat !== null && heartbeat.taskId === task.id ? heartbeat as JsonRecord : null; @@ -1917,10 +1976,7 @@ function taskListResponse(task: QueueTask, lite = true): JsonRecord { ...promptFields, promptEditable: queuedTaskPromptEditable(task), finalResponseChars: task.finalResponse.length, - stepCount: numberField(task.stepCount ?? task.llmStepCount, 0), - llmStepCount: numberField(task.llmStepCount ?? task.stepCount, 0), - traceStats: null, - statsSource: "code-queue-mgr", + ...traceStatsFallbackPatch(task), summaryOnly: true, referenceTaskIds: task.referenceTaskIds, referenceInjection: task.referenceInjection, @@ -2299,6 +2355,66 @@ function traceScopeId(taskId: string, url: URL): string { return attempt > 0 ? `task:${taskId}:attempt:${attempt}` : `task:${taskId}`; } +function taskScopeId(taskId: string): string { + return `task:${taskId}`; +} + +function taskAttemptScopeId(taskId: string, attemptIndex: number): string { + return `${taskScopeId(taskId)}:attempt:${Math.floor(attemptIndex)}`; +} + +function traceStatsRowToRecord(row: OaTraceStatsRow): JsonRecord { + const taskAttempt = row.scope_id.match(/^task:([^:]+):attempt:(\d+)$/u); + const taskOnly = row.scope_id.match(/^task:([^:]+)$/u); + const taskId = taskAttempt?.[1] ?? taskOnly?.[1] ?? ""; + const attemptIndex = taskAttempt === null ? null : Number(taskAttempt[2]); + return { + scopeId: row.scope_id, + serviceId: row.service_id, + subjectKind: row.subject_kind, + subjectId: row.subject_id, + statsRevision: numberField(row.stats_revision, 0), + stepCount: numberField(row.step_count, 0), + llmStepCount: numberField(row.llm_step_count, 0), + readCount: numberField(row.read_count, 0), + editCount: numberField(row.edit_count, 0), + runCount: numberField(row.run_count, 0), + errorCount: numberField(row.error_count, 0), + traceLineCount: numberField(row.trace_line_count, 0), + outputMaxSeq: numberField(row.output_max_seq, 0), + attemptStats: toJsonValue(row.attempt_stats_json ?? null), + lastEventSequence: row.last_event_sequence === null ? null : numberField(row.last_event_sequence, 0), + updatedAt: timestampToIso(row.updated_at), + taskId, + attemptIndex, + source: "oa-event-flow", + }; +} + +async function loadOaTraceStats(scopeIds: string[]): Promise> { + const uniqueScopeIds = Array.from(new Set(scopeIds.map((scopeId) => scopeId.trim()).filter(Boolean))); + const result = new Map(); + if (uniqueScopeIds.length === 0) return result; + try { + const rows = await traceSql` + SELECT scope_id, service_id, subject_kind, subject_id, stats_revision, + step_count, llm_step_count, read_count, edit_count, run_count, error_count, + trace_line_count, output_max_seq, attempt_stats_json, last_event_sequence, updated_at + FROM oa_trace_stats + WHERE scope_id IN ${traceSql(uniqueScopeIds)} + ORDER BY updated_at DESC + LIMIT ${Math.max(100, uniqueScopeIds.length)} + `; + for (const row of rows) { + const record = traceStatsRowToRecord(row); + result.set(String(record.scopeId || row.scope_id), record); + } + } catch (error) { + log("warn", "oa_trace_stats_load_failed", { scopeIds: uniqueScopeIds, error: errorToJson(error) }); + } + return result; +} + function oaTraceStepToLine(row: OaTraceStepRow): TraceStepLine | null { const seq = numberField(row.step_seq, 0); if (seq <= 0) return null; @@ -2336,6 +2452,17 @@ async function loadOaTraceStepLines(taskId: string, url: URL): Promise line !== null)); } +async function loadOaTraceStepLinesForScope(scopeId: string): Promise { + const rows = await traceSql` + SELECT scope_id, step_seq, kind, title, status, summary_lines, raw_seqs, created_at, updated_at + FROM oa_trace_steps + WHERE scope_id = ${scopeId} + ORDER BY step_seq ASC + LIMIT 5000 + `; + return coalesceCodexToolLifecycleTraceSteps(rows.map(oaTraceStepToLine).filter((line): line is TraceStepLine => line !== null)); +} + function fallbackTraceStepLines(task: QueueTask): TraceStepLine[] { return task.output .filter((item) => item.channel !== "system" || item.method !== "enqueue") @@ -2397,21 +2524,147 @@ async function traceStepDetail(task: QueueTask, url: URL): Promise item.channel !== "system" || item.method !== "enqueue"); +function attemptIndexesForTrace(task: QueueTask): number[] { + const indexes = new Set(); + for (const attempt of task.attempts) { + const index = Number(attempt.index); + if (Number.isInteger(index) && index > 0) indexes.add(index); + } + const current = Number(task.currentAttempt || 0); + const max = Math.max(current, ...Array.from(indexes), 0); + for (let index = 1; index <= max; index += 1) indexes.add(index); + return Array.from(indexes).sort((left, right) => left - right); +} + +function traceStepExecutionStats(steps: TraceStepLine[]): JsonRecord { + const countKind = (kind: string) => steps.filter((step) => step.kind === kind).length; return { + stepCount: steps.length, + llmStepCount: steps.length, + traceLineCount: steps.length, + outputMaxSeq: steps.at(-1)?.seq ?? 0, + readCount: countKind("explored"), + editCount: countKind("edited"), + runCount: countKind("ran"), + errorCount: countKind("error"), + }; +} + +function fallbackTraceStatsRecord(scopeId: string, fallback: JsonRecord): JsonRecord { + const stepCount = positiveNumber(fallback.stepCount) ?? 0; + return { + scopeId, + source: "oa-event-flow", + stepCount, + llmStepCount: positiveNumber(fallback.llmStepCount) ?? stepCount, + traceLineCount: positiveNumber(fallback.traceLineCount) ?? stepCount, + outputMaxSeq: positiveNumber(fallback.outputMaxSeq) ?? 0, + readCount: positiveNumber(fallback.readCount) ?? 0, + editCount: positiveNumber(fallback.editCount) ?? 0, + runCount: positiveNumber(fallback.runCount) ?? 0, + errorCount: positiveNumber(fallback.errorCount) ?? 0, + sourceHint: "raw-trace-fallback", + }; +} + +function applyStatsOrFallback(fallback: JsonRecord, stats: JsonRecord | null, scopeId: string): JsonRecord { + if (stats !== null) { + return { + ...fallback, + ...stats, + traceStats: stats, + statsSource: "oa-event-flow", + traceStatsState: "ready", + traceStatsReason: null, + statsUnavailable: false, + statsSyncing: false, + }; + } + const rawTracePresent = Number(fallback.stepCount ?? 0) > 0 || Number(fallback.traceLineCount ?? 0) > 0 || Number(fallback.outputMaxSeq ?? 0) > 0; + const traceStats = rawTracePresent ? fallbackTraceStatsRecord(scopeId, fallback) : null; + return { + ...fallback, + traceStats, + statsSource: rawTracePresent ? "raw-trace-fallback" : "raw-trace-empty", + traceStatsState: rawTracePresent ? "degraded" : "empty", + traceStatsReason: rawTracePresent ? "oa-event-flow-stats-unavailable-raw-trace-present" : "no-trace-steps-yet", + statsUnavailable: true, + statsSyncing: rawTracePresent, + rawTraceStepCount: fallback.stepCount ?? 0, + }; +} + +async function traceSummary(task: QueueTask): Promise { + const fallbackSteps = fallbackTraceStepLines(task); + const attempts = attemptIndexesForTrace(task); + const scopeIds = [taskScopeId(task.id), ...attempts.map((index) => taskAttemptScopeId(task.id, index))]; + const [stats, allOaSteps, attemptStepEntries] = await Promise.all([ + loadOaTraceStats(scopeIds), + loadOaTraceStepLinesForScope(taskScopeId(task.id)).catch((error) => { + log("warn", "oa_trace_summary_steps_load_failed", { taskId: task.id, error: errorToJson(error) }); + return [] as TraceStepLine[]; + }), + Promise.all(attempts.map(async (index) => { + const steps = await loadOaTraceStepLinesForScope(taskAttemptScopeId(task.id, index)).catch((error) => { + log("warn", "oa_trace_summary_attempt_steps_load_failed", { taskId: task.id, attemptIndex: index, error: errorToJson(error) }); + return [] as TraceStepLine[]; + }); + return [index, steps] as const; + })), + ]); + const visibleSteps = allOaSteps.length > 0 ? allOaSteps : fallbackSteps; + const taskStats = applyStatsOrFallback(traceStepExecutionStats(visibleSteps), stats.get(taskScopeId(task.id)) ?? null, taskScopeId(task.id)); + const attemptSteps = new Map(attemptStepEntries); + const attemptRows = attempts.length > 0 ? attempts : task.attempts.map((attempt) => attempt.index); + return { + id: task.id, taskId: task.id, queueId: queueIdOf(task), status: task.status, providerId: task.providerId, executionMode: task.executionMode, + executionModeInfo: executionModeInfo(task.executionMode), model: task.model, - stepCount: numberField(task.stepCount ?? task.llmStepCount, steps.length), - retainedStepCount: steps.length, + agentPort: codeAgentPortForModel(task.model), + agentPortInfo: codeAgentPortInfo(codeAgentPortForModel(task.model)), + cwd: task.cwd, + reasoningEffort: task.reasoningEffort, + createdAt: task.createdAt, + startedAt: task.startedAt, + finishedAt: task.finishedAt, + updatedAt: task.updatedAt, + currentAttempt: task.currentAttempt, + currentMode: task.currentMode, + maxAttempts: task.maxAttempts, + ...taskStats, + retainedStepCount: fallbackSteps.length, outputMaxSeq: outputMaxSeq(task), schedulerHeartbeat: task.schedulerHeartbeat ?? null, - statsSource: "code-queue-mgr-postgres", - attempts: task.attempts.map((attempt) => ({ + execution: taskStats, + attempts: attemptRows.map((index) => { + const attempt = task.attempts.find((item) => Number(item.index) === index) ?? task.attempts[index - 1] ?? null; + const steps = attemptSteps.get(index) ?? fallbackSteps.filter((step) => step.seq >= Number(attempt?.outputStartSeq ?? -Infinity) && step.seq <= Number(attempt?.outputEndSeq ?? Infinity)); + const attemptFallback = traceStepExecutionStats(steps); + const attemptStats = applyStatsOrFallback(attemptFallback, stats.get(taskAttemptScopeId(task.id, index)) ?? null, taskAttemptScopeId(task.id, index)); + return { + ...(attempt ?? {}), + index, + mode: attempt?.mode ?? (index <= 1 ? "initial" : "retry"), + startedAt: attempt?.startedAt ?? steps[0]?.at ?? task.startedAt, + finishedAt: attempt?.finishedAt ?? null, + terminalStatus: attempt?.terminalStatus ?? null, + startSeq: steps[0]?.seq ?? attempt?.outputStartSeq ?? null, + endSeq: steps.at(-1)?.seq ?? attempt?.outputEndSeq ?? null, + finalResponsePreview: attempt?.finalResponsePreview ?? "", + judge: attempt?.judge ?? (index === task.currentAttempt ? task.lastJudge : null), + feedbackPromptPreview: attempt?.feedbackPromptPreview ?? null, + feedbackPromptChars: attempt?.feedbackPromptChars ?? null, + attemptScopeId: taskAttemptScopeId(task.id, index), + ...attemptStats, + execution: attemptStats, + }; + }), + storedAttempts: task.attempts.map((attempt) => ({ index: attempt.index, mode: attempt.mode, startedAt: attempt.startedAt, @@ -2429,6 +2682,12 @@ function traceSummary(task: QueueTask): JsonRecord { lines: task.prompt.split(/\r?\n/u).length, }, lastAssistantMessage: lastAssistantMessage(task), + finalResponse: task.finalResponse, + finalResponseChars: task.finalResponse.length, + lastJudge: task.lastJudge, + lastError: task.lastError, + errorCount: taskStats.errorCount, + timing: taskTiming(task), }; } @@ -2976,7 +3235,7 @@ async function route(req: Request): Promise { const traceSummaryMatch = url.pathname.match(/^\/api\/tasks\/([^/]+)\/trace-summary$/u); if (traceSummaryMatch !== null && req.method === "GET") { const task = await loadTask(decodeURIComponent(traceSummaryMatch[1] ?? "")); - return task === null ? jsonResponse({ ok: false, error: "task not found" }, 404) : jsonResponse({ ok: true, summary: traceSummary(task) }); + return task === null ? jsonResponse({ ok: false, error: "task not found" }, 404) : jsonResponse({ ok: true, summary: await traceSummary(task) }); } const traceStepsMatch = url.pathname.match(/^\/api\/tasks\/([^/]+)\/trace-steps$/u); if (traceStepsMatch !== null && req.method === "GET") { diff --git a/src/components/microservices/code-queue/src/index.ts b/src/components/microservices/code-queue/src/index.ts index 4bf05a1a..d4aefc88 100644 --- a/src/components/microservices/code-queue/src/index.ts +++ b/src/components/microservices/code-queue/src/index.ts @@ -130,8 +130,9 @@ import { readOaTraceStatsForTask, readOaTraceStatsForTaskAttempts, readOaTraceStatsForTasks, + readOaTraceStepsForTask, } from "./oa-events"; -import { configureSelfTests, runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest } from "./self-tests"; +import { configureSelfTests, runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest, runTraceSummaryContractSelfTest } from "./self-tests"; import { codexToolLifecycleStartedBeforeIn, configureTaskView, @@ -5303,6 +5304,7 @@ async function route(req: Request): Promise { if (url.pathname === "/api/queue-claim-move/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(await runQueueClaimMoveSelfTest()); if (url.pathname === "/api/reference-injection/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(await runReferenceInjectionSelfTest()); if (url.pathname === "/api/trace-port/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(runTracePortSelfTest()); + if (url.pathname === "/api/trace-summary-contract/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(runTraceSummaryContractSelfTest()); if (url.pathname === "/api/oa/backfill" && (req.method === "GET" || req.method === "POST")) return jsonResponse(await backfillOaTraceStats(url)); if (url.pathname === "/api/notifications/claudeqq" && req.method === "GET") { await loadClaudeQqNotificationOutboxFromDatabase(); @@ -5410,8 +5412,23 @@ async function route(req: Request): Promise { if (traceSummaryMatch !== null && req.method === "GET") { const task = await findTaskForRead(decodeURIComponent(traceSummaryMatch[1] ?? "")); if (task === null) return jsonResponse({ ok: false, error: "task not found" }, 404); - const traceStats = await readOaTraceStatsForTaskAttempts(task.id, traceAttemptIndexesForTask(task)); - return jsonResponse({ ok: true, summary: taskTraceSummaryResponse(task, traceStats.get(`task:${task.id}`) ?? null, traceStats) }); + const attemptIndexes = traceAttemptIndexesForTask(task); + const [traceStats, allOaSteps, attemptOaStepsEntries] = await Promise.all([ + readOaTraceStatsForTaskAttempts(task.id, attemptIndexes), + readOaTraceStepsForTask(task.id, null).catch((error) => { + logger("warn", "oa_trace_summary_steps_read_failed", { taskId: task.id, error: errorToJson(error) }); + return []; + }), + Promise.all(attemptIndexes.map(async (attemptIndex) => { + const steps = await readOaTraceStepsForTask(task.id, attemptIndex).catch((error) => { + logger("warn", "oa_trace_summary_attempt_steps_read_failed", { taskId: task.id, attemptIndex, error: errorToJson(error) }); + return []; + }); + return [attemptIndex, steps] as const; + })), + ]); + const attemptOaSteps = new Map(attemptOaStepsEntries); + return jsonResponse({ ok: true, summary: taskTraceSummaryResponse(task, traceStats.get(`task:${task.id}`) ?? null, traceStats, { allSteps: allOaSteps, attemptSteps: attemptOaSteps }) }); } const traceStepsMatch = url.pathname.match(/^\/api\/tasks\/([^/]+)\/trace-steps$/u); if (traceStepsMatch !== null && req.method === "GET") { diff --git a/src/components/microservices/code-queue/src/oa-events.ts b/src/components/microservices/code-queue/src/oa-events.ts index bd515b0e..059d6e1c 100644 --- a/src/components/microservices/code-queue/src/oa-events.ts +++ b/src/components/microservices/code-queue/src/oa-events.ts @@ -33,6 +33,18 @@ export interface OaTraceStats extends JsonRecord { source: "oa-event-flow"; } +export interface TraceStatsFallback { + stepCount?: number | null; + llmStepCount?: number | null; + traceLineCount?: number | null; + outputMaxSeq?: number | null; + readCount?: number | null; + editCount?: number | null; + runCount?: number | null; + errorCount?: number | null; + reason?: string; +} + export interface OaTraceStepSummary { eventSequence: number; seq: number; @@ -654,16 +666,69 @@ function statNumber(stats: OaTraceStats | null | undefined, key: string): number return Number.isFinite(value) && value >= 0 ? Math.floor(value) : null; } -export function applyOaTraceStatsToTaskJson(value: JsonValue, stats: OaTraceStats | null | undefined): JsonValue { +function fallbackNumber(fallback: TraceStatsFallback | null | undefined, key: keyof TraceStatsFallback): number | null { + const value = Number(fallback?.[key]); + return Number.isFinite(value) && value >= 0 ? Math.floor(value) : null; +} + +function fallbackTraceStatsPatch(fallback: TraceStatsFallback | null | undefined, taskId = ""): JsonRecord { + const stepCount = fallbackNumber(fallback, "stepCount") ?? fallbackNumber(fallback, "llmStepCount"); + const llmStepCount = fallbackNumber(fallback, "llmStepCount") ?? stepCount; + const traceLineCount = fallbackNumber(fallback, "traceLineCount"); + const outputMaxSeq = fallbackNumber(fallback, "outputMaxSeq"); + const rawTracePresent = (stepCount ?? 0) > 0 || (traceLineCount ?? 0) > 0 || (outputMaxSeq ?? 0) > 0; + const state = rawTracePresent ? "degraded" : "empty"; + const reason = rawTracePresent + ? fallback?.reason || "oa-event-flow-stats-unavailable-raw-trace-present" + : "no-trace-steps-yet"; + const traceStats: JsonRecord = { + scopeId: taskId.length > 0 ? `task:${taskId}` : "task:unknown", + source: "oa-event-flow", + stepCount: stepCount ?? 0, + llmStepCount: llmStepCount ?? stepCount ?? 0, + traceLineCount: traceLineCount ?? stepCount ?? 0, + outputMaxSeq: outputMaxSeq ?? 0, + readCount: fallbackNumber(fallback, "readCount") ?? 0, + editCount: fallbackNumber(fallback, "editCount") ?? 0, + runCount: fallbackNumber(fallback, "runCount") ?? 0, + errorCount: fallbackNumber(fallback, "errorCount") ?? 0, + sourceHint: "raw-trace-fallback", + }; + const patch: JsonRecord = { + traceStats, + statsSource: rawTracePresent ? "raw-trace-fallback" : "raw-trace-empty", + traceStatsState: state, + traceStatsReason: reason, + statsUnavailable: true, + statsSyncing: rawTracePresent, + rawTraceStepCount: stepCount ?? 0, + }; + if (stepCount !== null) patch.stepCount = stepCount; + if (llmStepCount !== null) patch.llmStepCount = llmStepCount; + if (traceLineCount !== null) patch.traceLineCount = traceLineCount; + if (outputMaxSeq !== null) patch.outputMaxSeq = outputMaxSeq; + for (const key of ["readCount", "editCount", "runCount", "errorCount"] as const) { + const value = fallbackNumber(fallback, key); + if (value !== null) patch[key] = value; + } + return patch; +} + +export function applyOaTraceStatsToTaskJson(value: JsonValue, stats: OaTraceStats | null | undefined, fallback: TraceStatsFallback | null = null): JsonValue { if (typeof value !== "object" || value === null || Array.isArray(value)) return value; if (stats === null || stats === undefined) { - return { ...(value as JsonRecord), traceStats: null, statsSource: "unavailable", stepCount: null, llmStepCount: null } as unknown as JsonValue; + const taskId = typeof (value as JsonRecord).id === "string" ? String((value as JsonRecord).id) : ""; + return { ...(value as JsonRecord), ...fallbackTraceStatsPatch(fallback, taskId) } as unknown as JsonValue; } const stepCount = statNumber(stats, "stepCount"); const outputMaxSeq = statNumber(stats, "outputMaxSeq"); const patch: JsonRecord = { traceStats: stats, statsSource: "oa-event-flow", + traceStatsState: "ready", + traceStatsReason: null, + statsUnavailable: false, + statsSyncing: false, }; if (stepCount !== null) { patch.stepCount = stepCount; diff --git a/src/components/microservices/code-queue/src/queue-api.ts b/src/components/microservices/code-queue/src/queue-api.ts index c1aae1b9..feb94188 100644 --- a/src/components/microservices/code-queue/src/queue-api.ts +++ b/src/components/microservices/code-queue/src/queue-api.ts @@ -5,9 +5,9 @@ import { codeAgentPortForModel, codeAgentPortInfo, codeExecutionModeInfo, codeEx import { claudeQqNotificationOutboxStats, notificationTargetConfigured, notificationTargetLabel } from "./notifications"; import { executionModeOptions, executionProviderOptions } from "./provider-runtime"; import { taskFullOutput } from "./task-output"; -import { applyOaTraceStatsToTaskJson, taskScopeId, type OaTraceStats } from "./oa-events"; +import { applyOaTraceStatsToTaskJson, taskScopeId, type OaTraceStats, type TraceStatsFallback } from "./oa-events"; import { buildExecutionDiagnostics, schedulerHeartbeatStaleMs } from "./execution-diagnostics"; -import { buildCompactTaskTranscript, buildTaskTranscript, cachedPreviewTranscript, fullTranscript, prefixPreview, safePreview, statsDaysFromUrl, taskForCompactMetaResponse, taskForMetaResponse, taskStatisticsSummary, taskTiming, timestampMs } from "./task-view"; +import { buildCompactTaskTranscript, buildTaskTranscript, cachedPreviewTranscript, fullTranscript, prefixPreview, safePreview, statsDaysFromUrl, taskForCompactMetaResponse, taskForMetaResponse, taskListStepCount, taskStatisticsSummary, taskTiming, timestampMs } from "./task-view"; import { userPromptForDisplay } from "./prompts"; import type { ActiveRun, ActiveRunSlotWaiter } from "./code-agent/common"; import type { JsonValue, QueueRecord, QueuedStatusReason, QueueTask, RuntimeConfig, TaskStatus, TranscriptLine } from "./types"; @@ -208,8 +208,31 @@ function statsForTask(stats: Map, task: QueueTask): OaTrac return stats.get(taskScopeId(task.id)) ?? null; } +function traceStatsFallbackForTask(task: QueueTask): TraceStatsFallback { + const transcript = cachedPreviewTranscript(task).filter((line) => line.title !== "Submitted prompt"); + const stepCount = taskListStepCount(task); + const outputMaxSeq = task.output.at(-1)?.seq ?? 0; + const readCount = transcript.filter((line) => line.kind === "explored").length; + const editCount = transcript.filter((line) => line.kind === "edited").length; + const runCount = transcript.filter((line) => line.kind === "ran").length; + const errorCount = transcript.filter((line) => line.kind === "error").length; + return { + stepCount, + llmStepCount: stepCount, + traceLineCount: transcript.length, + outputMaxSeq, + readCount, + editCount, + runCount, + errorCount, + reason: stepCount > 0 || transcript.length > 0 || outputMaxSeq > 0 + ? "oa-event-flow-stats-unavailable-retained-trace-present" + : "no-trace-steps-yet", + }; +} + function applyStats(value: JsonValue, stats: Map, task: QueueTask): JsonValue { - return applyOaTraceStatsToTaskJson(value, statsForTask(stats, task)); + return applyOaTraceStatsToTaskJson(value, statsForTask(stats, task), traceStatsFallbackForTask(task)); } function taskForListResponse(task: QueueTask, lite = false, queueTasks?: QueueTask[]): JsonValue { diff --git a/src/components/microservices/code-queue/src/self-tests.ts b/src/components/microservices/code-queue/src/self-tests.ts index f658a897..fd1641e4 100644 --- a/src/components/microservices/code-queue/src/self-tests.ts +++ b/src/components/microservices/code-queue/src/self-tests.ts @@ -4,8 +4,9 @@ import { minimaxM27Model } from "./code-agent/common"; import { openCodeTransportClosedBeforeTerminal, remoteOpenCodeRunCommandForTest } from "./code-agent/opencode"; import { continuePromptSourceBudgetChars, miniMaxJudgeMessages, parsedContinuePromptForJudge, parseJudgeJson, queueRecoveryRetryPrompt, retryPrompt } from "./judge"; import { codeQueueEnvironmentHintTitle, injectCodeQueueEnvironmentHint, promptWithCodeQueueEnvironmentHint, userPromptForDisplay } from "./prompts"; -import { buildTaskTranscript, safePreview, transcriptLineSummaryLines } from "./task-view"; +import { buildTaskTranscript, safePreview, taskTraceSummaryFixtureResponse, transcriptLineSummaryLines } from "./task-view"; import type { ActiveRunSlotWaiter } from "./code-agent/common"; +import type { OaTraceStepSummary } from "./oa-events"; import type { JsonValue, LiveOutput, QueueTask, QueuedStatusReason, QueueTaskRequest, RuntimeConfig, TaskStatus } from "./types"; export interface SelfTestsContext { @@ -28,6 +29,7 @@ export interface SelfTestsContext { removeActiveRunSlotWaiter: (waiter: ActiveRunSlotWaiter) => void; resolveReasoningEffort: (model: string, explicit?: string | null) => string | null; runDatabaseClaimMoveSelfTest?: () => Promise; + runTraceSummaryContractSelfTest?: () => JsonValue; taskIsRecoverableOrphanedActive: (task: QueueTask, staleMs?: number) => boolean; tasks: () => QueueTask[]; updateProcessingFlag: () => void; @@ -91,6 +93,118 @@ function assertReferenceTest(condition: boolean, message: string): void { if (!condition) throw new Error(message); } +function traceSelfTestTask(): QueueTask { + const basePrompt = "Trace sync contract fixture"; + const at = "2026-05-19T00:00:00.000Z"; + return ctx().normalizeTask({ + id: "codex_trace_sync_fixture", + queueId: ctx().defaultQueueId, + queueEnteredAt: at, + prompt: basePrompt, + basePrompt, + referenceTaskIds: [], + referenceInjection: null, + providerId: ctx().config.mainProviderId, + cwd: ctx().config.defaultWorkdir, + model: ctx().config.defaultModel, + reasoningEffort: ctx().resolveReasoningEffort(ctx().config.defaultModel, ctx().config.defaultReasoningEffort), + executionMode: "default", + maxAttempts: 2, + status: "running", + createdAt: at, + updatedAt: "2026-05-19T00:05:00.000Z", + startedAt: at, + finishedAt: null, + readAt: null, + currentAttempt: 2, + currentMode: "retry", + codexThreadId: "thread_trace_sync_fixture", + activeTurnId: "turn_trace_sync_fixture", + finalResponse: "", + lastError: null, + lastJudge: null, + judgeFailCount: 0, + promptHistory: [], + output: [ + { seq: 1, at, channel: "user", text: basePrompt, method: "enqueue" }, + { seq: 2, at: "2026-05-19T00:01:00.000Z", channel: "command", text: "attempt 1 / 2\nrg src/components/microservices/code-queue/src/task-view.ts", method: "item/started", itemId: "call-1" }, + { seq: 3, at: "2026-05-19T00:01:30.000Z", channel: "assistant", text: "Attempt 1 judge complete.", method: "judge" }, + ], + events: [], + attempts: [ + { + index: 1, + mode: "initial", + startedAt: "2026-05-19T00:00:05.000Z", + finishedAt: "2026-05-19T00:02:00.000Z", + terminalStatus: "completed", + transportClosedBeforeTerminal: false, + appServerExitCode: 0, + appServerSignal: null, + error: null, + finalResponse: "Attempt 1 answer", + finalResponsePreview: "Attempt 1 answer", + finalResponseChars: 16, + stderrTail: "", + judge: { decision: "complete", confidence: 1, reason: "attempt 1 ok", source: "fallback" }, + judgeAt: "2026-05-19T00:02:00.000Z", + judgeSeq: 3, + outputStartSeq: 2, + outputEndSeq: 3, + }, + ], + cancelRequested: false, + nextPrompt: null, + nextMode: null, + }); +} + +function traceSyncAttemptSteps(): Map { + const attempt2: OaTraceStepSummary[] = [ + { eventSequence: 20, seq: 20, at: "2026-05-19T00:06:00.000Z", kind: "ran", title: "Run", status: "item/started", summaryLines: ["attempt 2 / 2", "pnpm test"], rawSeqs: [20], scopeId: "task:codex_trace_sync_fixture:attempt:2", attemptIndex: 2, source: "oa-event-flow" }, + { eventSequence: 21, seq: 21, at: "2026-05-19T00:06:20.000Z", kind: "explored", title: "Read", status: "item/completed", summaryLines: ["src/components/microservices/code-queue/src/task-view.ts"], rawSeqs: [21], scopeId: "task:codex_trace_sync_fixture:attempt:2", attemptIndex: 2, source: "oa-event-flow" }, + ]; + return new Map([[2, attempt2]]); +} + +function traceSyncAllSteps(): OaTraceStepSummary[] { + return [ + { eventSequence: 1, seq: 1, at: "2026-05-19T00:00:10.000Z", kind: "message", title: "Assistant message", status: "item/completed", summaryLines: ["Judge complete for attempt 1"], rawSeqs: [3], scopeId: "task:codex_trace_sync_fixture", attemptIndex: null, source: "oa-event-flow" }, + { eventSequence: 20, seq: 20, at: "2026-05-19T00:06:00.000Z", kind: "ran", title: "Run", status: "item/started", summaryLines: ["attempt 2 / 2", "pnpm test"], rawSeqs: [20], scopeId: "task:codex_trace_sync_fixture:attempt:2", attemptIndex: 2, source: "oa-event-flow" }, + { eventSequence: 21, seq: 21, at: "2026-05-19T00:06:20.000Z", kind: "explored", title: "Read", status: "item/completed", summaryLines: ["src/components/microservices/code-queue/src/task-view.ts"], rawSeqs: [21], scopeId: "task:codex_trace_sync_fixture:attempt:2", attemptIndex: 2, source: "oa-event-flow" }, + ]; +} + +function runTraceSummaryContractSelfTest(): JsonValue { + const task = traceSelfTestTask(); + const summary = taskTraceSummaryFixtureResponse(task, { + stats: null, + taskStats: null, + allSteps: traceSyncAllSteps(), + attemptSteps: traceSyncAttemptSteps(), + }) as Record; + const attempts = Array.isArray(summary.attempts) ? summary.attempts as Record[] : []; + const currentAttempt = attempts.find((attempt) => Number(attempt.index) === 2) ?? null; + assertReferenceTest(String(summary.statsSource || "").startsWith("raw-trace"), "trace summary should surface raw-trace fallback state"); + assertReferenceTest(summary.traceStatsReason === "oa-event-flow-stats-unavailable-raw-trace-present", "trace summary should explain degraded sync"); + assertReferenceTest(summary.traceStatsState === "degraded", "trace summary should mark degraded sync state"); + assertReferenceTest(Number(summary.stepCount ?? 0) > 0, "trace summary should expose fallback step count"); + assertReferenceTest(Array.isArray(attempts) && attempts.length >= 2, "trace summary should materialize latest running attempt"); + assertReferenceTest(Number(currentAttempt?.index ?? 0) === 2, "trace summary should include attempt 2"); + assertReferenceTest(Number(currentAttempt?.stepCount ?? 0) > 0, "attempt 2 should expose a live step count"); + assertReferenceTest(String(currentAttempt?.statsSource || "") === "raw-trace-fallback" || String(currentAttempt?.statsSource || "") === "oa-event-flow", "attempt 2 should not be permanently unavailable"); + assertReferenceTest(String(currentAttempt?.traceStatsState || "") === "degraded" || String(currentAttempt?.traceStatsState || "") === "ready", "attempt 2 should surface a visible sync state"); + return { + ok: true, + taskId: task.id, + statsSource: String(summary.statsSource || ""), + traceStatsState: String(summary.traceStatsState || ""), + traceStatsReason: String(summary.traceStatsReason || ""), + stepCount: Number(summary.stepCount ?? 0), + attempt2: currentAttempt as JsonValue, + }; +} + async function runReferenceInjectionSelfTest(): Promise { const at = "2026-05-08T00:00:00.000Z"; const taskA = testTask("codex_1000_aaaaaa", "A base prompt", "A final", [], at); @@ -562,4 +676,4 @@ function runJudgeInfraSelfTest(): JsonValue { }; } -export { runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest }; +export { runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest, runTraceSummaryContractSelfTest }; diff --git a/src/components/microservices/code-queue/src/task-view.ts b/src/components/microservices/code-queue/src/task-view.ts index 1241e7aa..fdd85db7 100644 --- a/src/components/microservices/code-queue/src/task-view.ts +++ b/src/components/microservices/code-queue/src/task-view.ts @@ -1864,6 +1864,11 @@ interface TraceAttemptWindow { type TraceStatsLookup = Map | Record | null | undefined; +interface TraceSummaryOaSteps { + allSteps?: OaTraceStepSummary[]; + attemptSteps?: Map | Record; +} + function transcriptLineSeq(line: TranscriptLine | undefined): number | null { const value = Number(line?.seq ?? NaN); return Number.isFinite(value) ? value : null; @@ -1991,6 +1996,49 @@ async function oaTraceTranscriptForTask(task: QueueTask, attemptIndex: number | return coalesceCodexToolLifecycleTranscriptLines(overlayTraceMessagesFromRawTranscript(oaLines, rawLines)); } +function oaTraceTranscriptFromSteps(task: QueueTask, steps: OaTraceStepSummary[] = []): TranscriptLine[] { + if (steps.length === 0) return []; + const oaLines = coalesceCodexToolLifecycleTranscriptLines(coalesceTranscriptMessageFragments(steps.map(oaTraceStepToTranscriptLine).filter(traceLineVisibleInTraceView))); + const rawLines = coalesceCodexToolLifecycleTranscriptLines(fullTranscript(task).filter(traceLineVisibleInTraceView)); + return coalesceCodexToolLifecycleTranscriptLines(overlayTraceMessagesFromRawTranscript(oaLines, rawLines)); +} + +function taskAttemptTraceSteps(oaSteps: TraceSummaryOaSteps | null | undefined, attemptIndex: number): OaTraceStepSummary[] { + const source = oaSteps?.attemptSteps; + if (source === undefined || source === null) return []; + const key = String(attemptIndex); + return source instanceof Map ? source.get(attemptIndex) ?? [] : source[key] ?? []; +} + +function traceWindowsWithOaSteps(task: QueueTask, transcript: TranscriptLine[], oaSteps: TraceSummaryOaSteps | null | undefined): TraceAttemptWindow[] { + const mergedTranscript = mergeTraceWindowLines(transcript, oaTraceTranscriptFromSteps(task, oaSteps?.allSteps ?? [])); + const windows = traceAttemptWindows(task, mergedTranscript); + const byIndex = new Map(); + for (const window of windows) if (window.index > 0) byIndex.set(window.index, window); + const maxIndex = Math.max(task.currentAttempt || 0, task.attempts.length, ...Array.from(byIndex.keys()), 0); + for (let index = 1; index <= maxIndex; index += 1) { + const lines = oaTraceTranscriptFromSteps(task, taskAttemptTraceSteps(oaSteps, index)); + if (lines.length === 0) continue; + const existing = byIndex.get(index); + if (existing !== undefined) { + existing.lines = mergeTraceWindowLines(existing.lines, lines); + existing.startSeq = minNullableSeq(existing.startSeq, transcriptLineSeq(lines[0])); + const lastSeq = transcriptLineSeq(lines.at(-1)); + if (lastSeq !== null && (existing.endSeq === null || lastSeq > existing.endSeq)) existing.endSeq = lastSeq; + continue; + } + const attempt = task.attempts.find((item) => Number(item.index) === index) ?? task.attempts[index - 1] ?? null; + windows.push({ + index, + attempt, + startSeq: transcriptLineSeq(lines[0]), + endSeq: transcriptLineSeq(lines.at(-1)), + lines, + }); + } + return windows.sort((left, right) => Number(left.lines[0]?.seq ?? left.startSeq ?? 0) - Number(right.lines[0]?.seq ?? right.startSeq ?? 0)); +} + function mergeTraceWindowLines(left: TranscriptLine[], right: TranscriptLine[]): TranscriptLine[] { const seen = new Set(); const merged: TranscriptLine[] = []; @@ -2107,8 +2155,8 @@ function executionLinesForAttempt(lines: TranscriptLine[]): TranscriptLine[] { return lines.filter((line) => line.title !== "Submitted prompt" && line.title !== "Attempt started" && line.title !== "Judge result"); } -function taskTraceAttemptSummaries(task: QueueTask, transcript: TranscriptLine[], oaTraceStatsByScope: TraceStatsLookup = null): JsonValue[] { - const windows = traceAttemptWindows(task, transcript); +function taskTraceAttemptSummaries(task: QueueTask, transcript: TranscriptLine[], oaTraceStatsByScope: TraceStatsLookup = null, oaSteps: TraceSummaryOaSteps | null = null): JsonValue[] { + const windows = traceWindowsWithOaSteps(task, transcript, oaSteps); return windows.map((window) => { const attempt = window.attempt; const parsedJudge = judgeFromAttemptLines(window.lines); @@ -2119,11 +2167,24 @@ function taskTraceAttemptSummaries(task: QueueTask, transcript: TranscriptLine[] const finalResponseChars = Number(attempt?.finalResponseChars ?? finalResponse.length); const executionLines = executionLinesForAttempt(window.lines); const attemptStats = synthetic || window.index <= 0 ? null : traceStatsForScope(oaTraceStatsByScope, taskAttemptScopeId(task.id, window.index)); - const stepCount = traceStatsNumber(attemptStats, "stepCount"); - const readCount = traceStatsNumber(attemptStats, "readCount"); - const editCount = traceStatsNumber(attemptStats, "editCount"); - const runCount = traceStatsNumber(attemptStats, "runCount"); - const errorCount = traceStatsNumber(attemptStats, "errorCount"); + const fallbackStats = executionSummaryFromTranscript( + task, + executionLines, + attemptTimingSummary(attempt, executionLines.length > 0 ? executionLines : window.lines), + window.lines.length, + window.lines.length, + ); + const fallbackStatsRecord = traceStatsRecord(fallbackStats); + const rawTracePresent = executionLines.length > 0 || window.lines.length > 0; + const fallbackTraceStats = attemptStats === null && rawTracePresent + ? traceStatsFallbackRecord(taskAttemptScopeId(task.id, window.index), fallbackStatsRecord, window.lines, window.endSeq) + : null; + const traceStats = attemptStats ?? fallbackTraceStats; + const stepCount = traceStatsNumber(traceStats, "stepCount") ?? traceStatsNumber(fallbackStatsRecord, "stepCount"); + const readCount = traceStatsNumber(traceStats, "readCount") ?? traceStatsNumber(fallbackStatsRecord, "readCount"); + const editCount = traceStatsNumber(traceStats, "editCount") ?? traceStatsNumber(fallbackStatsRecord, "editCount"); + const runCount = traceStatsNumber(traceStats, "runCount") ?? traceStatsNumber(fallbackStatsRecord, "runCount"); + const errorCount = traceStatsNumber(traceStats, "errorCount") ?? traceStatsNumber(fallbackStatsRecord, "errorCount"); const feedbackPrompt = synthetic ? null : attemptFeedbackPromptRecord(task, window.index, attempt, judge); const inputPrompt = promptSnapshot(String(attempt?.inputPrompt ?? ""), 1200); return { @@ -2166,22 +2227,22 @@ function taskTraceAttemptSummaries(task: QueueTask, transcript: TranscriptLine[] editCount, runCount, errorCount, - statsSource: attemptStats === null ? "unavailable" : "oa-event-flow", - traceStats: attemptStats, - execution: executionSummaryWithOaStats( - executionSummaryFromTranscript( - task, - executionLines, - attemptTimingSummary(attempt, executionLines.length > 0 ? executionLines : window.lines), - window.lines.length, - window.lines.length, - ), - attemptStats, - ), + statsSource: attemptStats === null ? rawTracePresent ? "raw-trace-fallback" : "raw-trace-empty" : "oa-event-flow", + traceStatsState: attemptStats === null ? rawTracePresent ? "degraded" : "empty" : "ready", + traceStatsReason: attemptStats === null ? rawTracePresent ? "oa-event-flow-stats-unavailable-raw-trace-present" : "no-trace-steps-yet" : null, + statsUnavailable: attemptStats === null, + statsSyncing: attemptStats === null && rawTracePresent, + rawTraceStepCount: attemptStats === null ? traceStatsNumber(fallbackStatsRecord, "stepCount") ?? window.lines.length : null, + traceStats, + execution: executionSummaryWithOaStats(fallbackStats, traceStats), }; }) as unknown as JsonValue[]; } +function taskTraceSummaryFixtureResponse(task: QueueTask, options: { stats?: TraceStatsLookup; taskStats?: JsonValue | null; allSteps?: OaTraceStepSummary[]; attemptSteps?: Map | Record } = {}): JsonValue { + return taskTraceSummaryResponse(task, options.taskStats ?? null, options.stats ?? null, { allSteps: options.allSteps ?? [], attemptSteps: options.attemptSteps ?? {} }); +} + function resolvedReferencePromptParts(prompt: string): { reference: string; userPrompt: string } { const withoutEnvironment = stripCodeQueueEnvironmentHint(prompt); const trimmed = withoutEnvironment.trimStart(); @@ -2239,30 +2300,44 @@ function traceStatsForScope(lookup: TraceStatsLookup, scopeId: string): Record | null, lines: TranscriptLine[], endSeq: number | null): Record { + const stepCount = traceStatsNumber(fallback, "stepCount") ?? 0; + return { + scopeId, + source: "oa-event-flow", + stepCount, + llmStepCount: traceStatsNumber(fallback, "llmStepCount") ?? stepCount, + traceLineCount: traceStatsNumber(fallback, "traceLineCount") ?? lines.length, + outputMaxSeq: traceStatsNumber(fallback, "transcriptMaxSeq") ?? Number(endSeq ?? lines.at(-1)?.seq ?? 0), + readCount: traceStatsNumber(fallback, "readCount") ?? 0, + editCount: traceStatsNumber(fallback, "editCount") ?? 0, + runCount: traceStatsNumber(fallback, "runCount") ?? 0, + errorCount: traceStatsNumber(fallback, "errorCount") ?? 0, + sourceHint: "raw-trace-fallback", + }; +} + function executionSummaryWithOaStats(execution: JsonValue, stats: Record | null): JsonValue { if (typeof execution !== "object" || execution === null || Array.isArray(execution)) return execution; const record = execution as Record; if (stats === null) { - const { - stepCount: _stepCount, - llmStepCount: _llmStepCount, - toolCallCount: _toolCallCount, - readCount: _readCount, - editCount: _editCount, - runCount: _runCount, - errorCount: _errorCount, - traceLineCount: _traceLineCount, - outputMaxSeq: _outputMaxSeq, - transcriptMaxSeq: _transcriptMaxSeq, - ...rest - } = record; + const fallbackStepCount = traceStatsNumber(record, "stepCount") ?? traceStatsNumber(record, "llmStepCount"); + const rawTracePresent = (fallbackStepCount ?? 0) > 0 || (traceStatsNumber(record, "traceLineCount") ?? 0) > 0 || (traceStatsNumber(record, "transcriptMaxSeq") ?? 0) > 0; return { - ...rest, - statsSource: "unavailable", + ...record, + statsSource: rawTracePresent ? "raw-trace-fallback" : "raw-trace-empty", traceStats: null, statsUnavailable: true, + statsSyncing: rawTracePresent, + traceStatsState: rawTracePresent ? "degraded" : "empty", + traceStatsReason: rawTracePresent ? "oa-event-flow-stats-unavailable-raw-trace-present" : "no-trace-steps-yet", } as unknown as JsonValue; } + const sourceHint = String(stats.sourceHint ?? ""); const stepCount = traceStatsNumber(stats, "stepCount"); const llmStepCount = traceStatsNumber(stats, "llmStepCount") ?? stepCount; const readCount = traceStatsNumber(stats, "readCount"); @@ -2285,17 +2360,27 @@ function executionSummaryWithOaStats(execution: JsonValue, stats: Record); + const fallbackRecord = traceStatsRecord(fallbackExecution); + const stepCount = traceStatsNumber(stats, "stepCount") ?? traceStatsNumber(fallbackRecord, "stepCount"); const llmStepCount = traceStatsNumber(stats, "llmStepCount") ?? stepCount; - const errorCount = traceStatsNumber(stats, "errorCount"); - const execution = executionSummaryWithOaStats(taskExecutionSummary(task, transcript), stats); + const errorCount = traceStatsNumber(stats, "errorCount") ?? traceStatsNumber(fallbackRecord, "errorCount"); + const rawTracePresent = stats === null && ((stepCount ?? 0) > 0 || (traceStatsNumber(fallbackRecord, "traceLineCount") ?? 0) > 0 || (traceStatsNumber(fallbackRecord, "transcriptMaxSeq") ?? 0) > 0); + const fallbackTraceStats = rawTracePresent ? traceStatsFallbackRecord(taskScopeId(task.id), fallbackRecord, fallbackTranscript, fallbackTranscript.at(-1)?.seq ?? null) : null; + const effectiveStats = stats ?? fallbackTraceStats; + const execution = executionSummaryWithOaStats(fallbackExecution, effectiveStats); return { id: task.id, queueId: ctx().queueIdOf(task), @@ -2320,8 +2405,13 @@ function taskTraceSummaryResponse(task: QueueTask, oaTraceStats: JsonValue | nul promptEditable: ctx().queuedTaskPromptEditable(task), prompt: taskTracePromptSummary(task), execution, - traceStats: stats, - statsSource: stats === null ? "unavailable" : "oa-event-flow", + traceStats: effectiveStats, + statsSource: stats === null ? rawTracePresent ? "raw-trace-fallback" : "raw-trace-empty" : "oa-event-flow", + traceStatsState: stats === null ? rawTracePresent ? "degraded" : "empty" : "ready", + traceStatsReason: stats === null ? rawTracePresent ? "oa-event-flow-stats-unavailable-raw-trace-present" : "no-trace-steps-yet" : null, + statsUnavailable: stats === null, + statsSyncing: stats === null && rawTracePresent, + rawTraceStepCount: stats === null ? stepCount ?? 0 : null, finalResponse: task.finalResponse, finalResponseChars: task.finalResponse.length, lastJudge: task.lastJudge, @@ -2533,6 +2623,8 @@ export { taskToolSummary, taskTraceStepDetailResponse, taskTraceStepsResponse, + taskTraceAttemptSummaries, + taskTraceSummaryFixtureResponse, taskTraceSummaryResponse, taskPromptDetailResponse, transcriptLineSummaryLines,