diff --git a/deploy.json b/deploy.json index 112d6f40..5995f40a 100644 --- a/deploy.json +++ b/deploy.json @@ -54,7 +54,7 @@ { "id": "code-queue-mgr", "repo": "https://github.com/pikasTech/unidesk", - "commitId": "97a647bc8d819cf25691764736d3b001ef1b066e" + "commitId": "b0f76056f86dda9dcd43b5ef76d16cc7e990786f" }, { "id": "mdtodo", diff --git a/src/components/microservices/code-queue-mgr/src/index.ts b/src/components/microservices/code-queue-mgr/src/index.ts index 2ccc625c..daf56e86 100644 --- a/src/components/microservices/code-queue-mgr/src/index.ts +++ b/src/components/microservices/code-queue-mgr/src/index.ts @@ -227,6 +227,34 @@ interface QueueRow { updated_at: Date | string; } +interface OaTraceStepRow { + scope_id: string; + step_seq: number | string; + kind: string; + title: string | null; + status: string | null; + summary_lines: unknown; + raw_seqs: unknown; + created_at: Date | string; + updated_at: Date | string; +} + +interface TraceStepLine { + seq: number; + at: string; + kind: string; + label: string; + title: string; + status: string | null; + summaryLines: string[]; + commandPreview?: string; + commandOmittedLines?: number; + bodyPreview?: string; + bodyOmittedLines?: number; + rawSeqs: number[]; + source: string; +} + interface HttpErrorDetail { status: number; body: JsonRecord; @@ -665,6 +693,19 @@ function normalizeOutput(value: unknown): LiveOutput[] { }).sort((left, right) => left.seq - right.seq); } +function stringList(value: unknown): string[] { + if (Array.isArray(value)) return value.map((item) => String(item ?? "")).filter((item) => item.trim().length > 0); + if (typeof value === "string" && value.trim().length > 0) return [value]; + return []; +} + +function numberList(value: unknown, fallback: number): number[] { + const values = Array.isArray(value) ? value : []; + const numbers = values.map((item) => Number(item)).filter(Number.isFinite).map((item) => Math.floor(item)); + if (numbers.length === 0 && Number.isFinite(fallback)) numbers.push(Math.floor(fallback)); + return Array.from(new Set(numbers)).sort((left, right) => left - right); +} + function normalizeTask(value: unknown): QueueTask { const record = asRecord(value) ?? {}; const status = String(record.status || "queued") as TaskStatus; @@ -1803,6 +1844,43 @@ function outputToTranscriptLine(item: LiveOutput, fullText: boolean): JsonRecord }; } +function outputToTraceStepLine(item: LiveOutput): TraceStepLine { + const body = safePreview(item.text, 1600); + return { + seq: item.seq, + at: item.at, + kind: transcriptKind(item), + label: item.channel, + title: item.method ?? item.channel, + status: item.method ?? null, + summaryLines: body.length > 0 ? [body] : [], + commandPreview: item.channel === "command" ? body : undefined, + commandOmittedLines: 0, + bodyPreview: item.channel === "command" ? "" : body, + bodyOmittedLines: 0, + rawSeqs: [item.seq], + source: "code-queue-mgr-postgres", + }; +} + +function traceStepJson(line: TraceStepLine): JsonRecord { + return { + seq: line.seq, + at: line.at, + kind: line.kind, + label: line.label, + title: line.title, + status: line.status, + summaryLines: line.summaryLines, + commandPreview: line.commandPreview ?? null, + commandOmittedLines: line.commandOmittedLines ?? 0, + bodyPreview: line.bodyPreview ?? "", + bodyOmittedLines: line.bodyOmittedLines ?? 0, + rawSeqs: line.rawSeqs, + source: line.source, + }; +} + function transcriptPage(task: QueueTask, url: URL): JsonRecord { const limit = parseLimit(url); const afterSeq = numberField(url.searchParams.get("afterSeq"), 0); @@ -1841,14 +1919,174 @@ function transcriptPage(task: QueueTask, url: URL): JsonRecord { }; } -function traceSteps(task: QueueTask, url: URL): JsonRecord { +function oaTraceStepKind(kind: string): string { + const normalized = String(kind || "").trim().toLowerCase(); + if (normalized === "read" || normalized === "explored" || normalized === "explore") return "explored"; + if (normalized === "edit" || normalized === "edited") return "edited"; + if (normalized === "run" || normalized === "ran" || normalized === "command") return "ran"; + if (normalized === "error" || normalized === "failed") return "error"; + if (normalized === "message" || normalized === "assistant" || normalized === "user") return "message"; + return "system"; +} + +function traceStepLifecycleMethod(line: TraceStepLine): "item/started" | "item/completed" | null { + const status = String(line.status || "").trim(); + if (status === "item/started" || status === "item/completed") return status; + const text = [line.title, ...line.summaryLines, line.commandPreview, line.bodyPreview].map((item) => String(item || "")).join("\n"); + if (/\bitem\/started\b/u.test(text)) return "item/started"; + if (/\bitem\/completed\b/u.test(text)) return "item/completed"; + return null; +} + +function traceStepLifecycleTool(line: TraceStepLine): string | null { + const text = [line.title, ...line.summaryLines, line.commandPreview, line.bodyPreview].map((item) => String(item || "")).join("\n"); + const match = /\b(webSearch|mcpToolCall|dynamicToolCall)\b/u.exec(text); + return match?.[1] ?? null; +} + +function traceStepIsCodexToolLifecycle(line: TraceStepLine): boolean { + return traceStepLifecycleMethod(line) !== null && traceStepLifecycleTool(line) !== null; +} + +function pushUniqueNumber(values: number[], value: number): void { + if (Number.isFinite(value) && !values.includes(Math.floor(value))) values.push(Math.floor(value)); +} + +function pushUniqueString(values: string[], value: string): void { + const text = value.trim(); + if (text.length > 0 && !values.includes(text)) values.push(text); +} + +function traceStepCommandKind(text: string, fallback: string): string { + if (/\bwebSearch\b/u.test(text)) return "explored"; + if (/\b(?:mcpToolCall|dynamicToolCall)\b/u.test(text)) return "ran"; + return fallback; +} + +function shortTraceStepTitle(text: string, fallback: string): string { + const normalized = text.replace(/\s+/gu, " ").trim(); + return normalized.length > 0 ? safePreview(normalized, 140) : fallback; +} + +function mergeCodexToolLifecycleTraceSteps(group: TraceStepLine[]): TraceStepLine { + if (group.length <= 1) return group[0] as TraceStepLine; + const first = group[0] as TraceStepLine; + const last = group.at(-1) ?? first; + const rawSeqs: number[] = []; + const summaryLines: string[] = []; + for (const line of group) { + for (const seq of line.rawSeqs.length > 0 ? line.rawSeqs : [line.seq]) pushUniqueNumber(rawSeqs, seq); + for (const summaryLine of line.summaryLines) pushUniqueString(summaryLines, summaryLine); + } + rawSeqs.sort((left, right) => left - right); + const commandText = String(last.commandPreview || first.commandPreview || last.bodyPreview || first.bodyPreview || last.title || first.title || ""); + const kind = traceStepCommandKind(commandText, last.kind || first.kind); + return { + ...first, + at: last.at || first.at, + kind, + title: shortTraceStepTitle(commandText, last.title || first.title || "Trace step"), + status: last.status || first.status, + summaryLines: summaryLines.length > 0 ? summaryLines : first.summaryLines, + commandPreview: commandText.length > 0 ? commandText : first.commandPreview, + commandOmittedLines: Number(first.commandOmittedLines || 0) + Number(last.commandOmittedLines || 0), + bodyPreview: last.bodyPreview || first.bodyPreview, + bodyOmittedLines: Number(first.bodyOmittedLines || 0) + Number(last.bodyOmittedLines || 0), + rawSeqs, + }; +} + +function coalesceCodexToolLifecycleTraceSteps(lines: TraceStepLine[]): TraceStepLine[] { + const rows = [...lines].sort((left, right) => left.seq - right.seq); + const merged: TraceStepLine[] = []; + let group: TraceStepLine[] = []; + const flush = (): void => { + if (group.length > 0) merged.push(mergeCodexToolLifecycleTraceSteps(group)); + group = []; + }; + for (const line of rows) { + if (traceStepIsCodexToolLifecycle(line)) { + const method = traceStepLifecycleMethod(line); + const tool = traceStepLifecycleTool(line); + const groupTool = group.length === 0 ? null : traceStepLifecycleTool(group[0] as TraceStepLine); + if (method === "item/started" && group.length > 0) flush(); + if (group.length > 0 && groupTool !== null && tool !== groupTool) flush(); + group.push(line); + if (method === "item/completed") flush(); + continue; + } + flush(); + merged.push(line); + } + flush(); + return merged; +} + +function traceScopeId(taskId: string, url: URL): string { + const attempt = numberField(url.searchParams.get("attempt"), 0); + return attempt > 0 ? `task:${taskId}:attempt:${attempt}` : `task:${taskId}`; +} + +function oaTraceStepToLine(row: OaTraceStepRow): TraceStepLine | null { + const seq = numberField(row.step_seq, 0); + if (seq <= 0) return null; + const kind = oaTraceStepKind(row.kind); + const title = String(row.title || (kind === "explored" ? "Read" : kind === "edited" ? "Edit" : kind === "ran" ? "Run" : "Trace step")); + const summaryLines = stringList(row.summary_lines); + const summaryText = summaryLines.join("\n").trimEnd(); + const commandText = summaryText.startsWith("item/") ? summaryText : ""; + return { + seq, + at: timestampToIso(row.updated_at) ?? timestampToIso(row.created_at) ?? nowIso(), + kind, + label: row.kind, + title, + status: row.status ?? null, + summaryLines, + commandPreview: commandText.length > 0 ? commandText : undefined, + commandOmittedLines: 0, + bodyPreview: commandText.length > 0 ? "" : summaryText, + bodyOmittedLines: 0, + rawSeqs: numberList(row.raw_seqs, seq), + source: "code-queue-mgr-oa-trace-steps", + }; +} + +async function loadOaTraceStepLines(taskId: string, url: URL): Promise { + const scopeId = traceScopeId(taskId, url); + const rows = await traceSql` + SELECT scope_id, step_seq, kind, title, status, summary_lines, raw_seqs, created_at, updated_at + FROM oa_trace_steps + WHERE scope_id = ${scopeId} + ORDER BY step_seq ASC + LIMIT 5000 + `; + return coalesceCodexToolLifecycleTraceSteps(rows.map(oaTraceStepToLine).filter((line): line is TraceStepLine => line !== null)); +} + +function fallbackTraceStepLines(task: QueueTask): TraceStepLine[] { + return task.output + .filter((item) => item.channel !== "system" || item.method !== "enqueue") + .sort((left, right) => left.seq - right.seq) + .map(outputToTraceStepLine); +} + +async function traceStepLines(task: QueueTask, url: URL): Promise<{ source: string; steps: TraceStepLine[] }> { + try { + const oaSteps = await loadOaTraceStepLines(task.id, url); + if (oaSteps.length > 0) return { source: "code-queue-mgr-oa-trace-steps", steps: oaSteps }; + } catch (error) { + log("warn", "oa_trace_steps_load_failed", { taskId: task.id, scopeId: traceScopeId(task.id, url), error: errorToJson(error) }); + } + return { source: "code-queue-mgr-postgres", steps: fallbackTraceStepLines(task) }; +} + +async function traceSteps(task: QueueTask, url: URL): Promise { const limit = parseLimit(url); const afterSeq = numberField(url.searchParams.get("afterSeq"), 0); const beforeSeqRaw = url.searchParams.get("beforeSeq"); const tail = url.searchParams.get("tail") === "1"; - const visible = task.output - .filter((item) => item.channel !== "system" || item.method !== "enqueue") - .sort((left, right) => left.seq - right.seq); + const { source, steps: visible } = await traceStepLines(task, url); let rows = visible; if (beforeSeqRaw !== null) rows = visible.filter((item) => item.seq < numberField(beforeSeqRaw, Number.MAX_SAFE_INTEGER)).slice(-limit); else if (tail) rows = visible.slice(-limit); @@ -1858,7 +2096,7 @@ function traceSteps(task: QueueTask, url: URL): JsonRecord { return { ok: true, taskId: task.id, - source: "code-queue-mgr-postgres", + source, total: visible.length, returned: rows.length, limit, @@ -1868,20 +2106,25 @@ function traceSteps(task: QueueTask, url: URL): JsonRecord { previousBeforeSeq: firstSeq, hasMore: lastSeq !== null && visible.some((item) => item.seq > lastSeq), hasBefore: firstSeq !== null && visible.some((item) => item.seq < firstSeq), - steps: rows.map((item) => ({ - seq: item.seq, - at: item.at, - kind: item.channel === "command" ? "ran" : item.channel === "diff" ? "edited" : item.channel === "tool" ? "explored" : item.channel === "error" ? "error" : "message", - label: item.channel, - title: item.method ?? item.channel, - status: item.method ?? null, - bodyPreview: safePreview(item.text, 1600), - bodyOmittedLines: 0, - rawSeqs: [item.seq], - })), + steps: rows.map(traceStepJson), }; } +async function traceStepDetail(task: QueueTask, url: URL): Promise { + const seq = numberField(url.searchParams.get("seq"), 0); + const { source, steps } = await traceStepLines(task, url); + const line = steps.find((item) => item.seq === seq || item.rawSeqs.includes(seq)) ?? null; + if (line !== null) { + const payload = traceStepJson(line); + return { ok: true, taskId: task.id, source, step: payload, rawOutput: payload, line: payload }; + } + const item = task.output.find((output) => output.seq === seq) ?? null; + if (item === null) return null; + const fallback = outputToTraceStepLine(item); + const payload = traceStepJson(fallback); + return { ok: true, taskId: task.id, source: "code-queue-mgr-postgres", step: item as unknown as JsonValue, rawOutput: item as unknown as JsonValue, line: payload }; +} + function traceSummary(task: QueueTask): JsonRecord { const steps = task.output.filter((item) => item.channel !== "system" || item.method !== "enqueue"); return { @@ -2459,14 +2702,14 @@ async function route(req: Request): Promise { const traceStepsMatch = url.pathname.match(/^\/api\/tasks\/([^/]+)\/trace-steps$/u); if (traceStepsMatch !== null && req.method === "GET") { const task = await loadTask(decodeURIComponent(traceStepsMatch[1] ?? "")); - return task === null ? jsonResponse({ ok: false, error: "task not found" }, 404) : jsonResponse(traceSteps(task, url)); + return task === null ? jsonResponse({ ok: false, error: "task not found" }, 404) : jsonResponse(await traceSteps(task, url)); } const traceStepMatch = url.pathname.match(/^\/api\/tasks\/([^/]+)\/trace-step$/u); if (traceStepMatch !== null && req.method === "GET") { const task = await loadTask(decodeURIComponent(traceStepMatch[1] ?? "")); - const seq = numberField(url.searchParams.get("seq"), 0); - const item = task?.output.find((output) => output.seq === seq) ?? null; - return task === null || item === null ? jsonResponse({ ok: false, error: "trace step not found" }, 404) : jsonResponse({ ok: true, taskId: task.id, step: item, rawOutput: item }); + if (task === null) return jsonResponse({ ok: false, error: "trace step not found" }, 404); + const detail = await traceStepDetail(task, url); + return detail === null ? jsonResponse({ ok: false, error: "trace step not found" }, 404) : jsonResponse(detail); } const summaryMatch = url.pathname.match(/^\/api\/tasks\/([^/]+)\/summary$/u); if (summaryMatch !== null && req.method === "GET") {