diff --git a/src/mgr/runner-reconciler.ts b/src/mgr/runner-reconciler.ts index 7553b70..1d6721e 100644 --- a/src/mgr/runner-reconciler.ts +++ b/src/mgr/runner-reconciler.ts @@ -2,6 +2,7 @@ import { spawn } from "node:child_process"; import { AgentRunError } from "../common/errors.js"; import { redactJson, redactText } from "../common/redaction.js"; import type { BackendEvent, CommandRecord, EventType, FailureKind, JsonRecord, JsonValue, RunnerJobRecord, TerminalStatus } from "../common/types.js"; +import { emitAgentRunOtelSpan } from "../common/otel-trace.js"; import { nowIso, stableHash } from "../common/validation.js"; import type { AgentRunStore } from "./store.js"; import { isTerminalCommandState, isTerminalRunStatus } from "./store.js"; @@ -143,9 +144,14 @@ async function recoverTerminalOutboxIfNeeded(store: AgentRunStore, job: RunnerJo valuesPrinted: false, }; } + const beforeCommand = await getCommandOrNull(store, job.commandId); const eventReplay = await replayTerminalOutboxEvents(store, search.outbox); const nextCommand = await store.finishCommand(job.commandId, search.outbox.report); const runRecovery = await recoverTerminalOutboxRun(store, search.outbox); + const otel = await emitTerminalOutboxOtelSpans(store, search.outbox, nextCommand, { + commandTransitioned: beforeCommand === null || (!isTerminalCommandState(beforeCommand.state) && isTerminalCommandState(nextCommand.state)), + runTransitioned: stringValue(runRecovery.state) === "terminal-outbox-replayed", + }); return { state: "recovered", terminalReportState: "terminal-outbox-replayed", @@ -162,6 +168,7 @@ async function recoverTerminalOutboxIfNeeded(store: AgentRunStore, job: RunnerJo parseFailedCount: search.parseFailedCount, eventReplay, runRecovery, + otel, valuesPrinted: false, }; } catch (error) { @@ -176,6 +183,51 @@ async function recoverTerminalOutboxIfNeeded(store: AgentRunStore, job: RunnerJo } } +async function getCommandOrNull(store: AgentRunStore, commandId: string): Promise { + try { + return await store.getCommand(commandId); + } catch { + return null; + } +} + +async function emitTerminalOutboxOtelSpans(store: AgentRunStore, outbox: TerminalOutboxRecord, command: CommandRecord, state: { commandTransitioned: boolean; runTransitioned: boolean }): Promise { + if (!state.commandTransitioned && !state.runTransitioned) return { emitted: false, reason: "already-terminal", valuesPrinted: false }; + const run = await store.getRun(outbox.runId); + const common = { + source: "manager-reconciler", + phase: "command-terminal", + outboxKey: outbox.outboxKey, + outboxPhase: outbox.phase, + terminalStatus: outbox.report.terminalStatus, + failureKind: outbox.report.failureKind, + commandId: outbox.commandId, + attemptId: outbox.attemptId, + runnerId: outbox.runnerId, + runnerJobId: outbox.runnerJobId, + terminalRun: outbox.terminalRun === true, + commandTransitioned: state.commandTransitioned, + runTransitioned: state.runTransitioned, + }; + const commandSpan = await emitAgentRunOtelSpan("runner_command_terminal", run, process.env, { + command, + kind: 2, + status: outbox.report.terminalStatus === "failed" || outbox.report.terminalStatus === "blocked" ? "error" : "ok", + error: outbox.report.terminalStatus === "failed" || outbox.report.terminalStatus === "blocked" ? outbox.report.failureMessage ?? outbox.report.failureKind ?? "terminal failure" : undefined, + attributes: common, + }); + const terminalSpan = command.type === "turn" + ? await emitAgentRunOtelSpan(`runner_terminal.${outbox.report.terminalStatus}`, run, process.env, { + command, + kind: 2, + status: outbox.report.terminalStatus === "failed" || outbox.report.terminalStatus === "blocked" ? "error" : "ok", + error: outbox.report.terminalStatus === "failed" || outbox.report.terminalStatus === "blocked" ? outbox.report.failureMessage ?? outbox.report.failureKind ?? "terminal failure" : undefined, + attributes: { ...common, eventType: "terminal_status" }, + }) + : { skipped: true, reason: "non-turn-command", valuesPrinted: false }; + return { emitted: true, commandSpan, terminalSpan, valuesPrinted: false }; +} + async function replayTerminalOutboxEvents(store: AgentRunStore, outbox: TerminalOutboxRecord): Promise { const existing = await store.listEvents(outbox.runId, 0, 1_000); const existingKeys = new Set(existing.map((event) => eventContentKey({ type: event.type, payload: event.payload })));