fix: reconciler emits terminal otel spans (#231)
This commit is contained in:
@@ -2,6 +2,7 @@ import { spawn } from "node:child_process";
|
||||
import { AgentRunError } from "../common/errors.js";
|
||||
import { redactJson, redactText } from "../common/redaction.js";
|
||||
import type { BackendEvent, CommandRecord, EventType, FailureKind, JsonRecord, JsonValue, RunnerJobRecord, TerminalStatus } from "../common/types.js";
|
||||
import { emitAgentRunOtelSpan } from "../common/otel-trace.js";
|
||||
import { nowIso, stableHash } from "../common/validation.js";
|
||||
import type { AgentRunStore } from "./store.js";
|
||||
import { isTerminalCommandState, isTerminalRunStatus } from "./store.js";
|
||||
@@ -143,9 +144,14 @@ async function recoverTerminalOutboxIfNeeded(store: AgentRunStore, job: RunnerJo
|
||||
valuesPrinted: false,
|
||||
};
|
||||
}
|
||||
const beforeCommand = await getCommandOrNull(store, job.commandId);
|
||||
const eventReplay = await replayTerminalOutboxEvents(store, search.outbox);
|
||||
const nextCommand = await store.finishCommand(job.commandId, search.outbox.report);
|
||||
const runRecovery = await recoverTerminalOutboxRun(store, search.outbox);
|
||||
const otel = await emitTerminalOutboxOtelSpans(store, search.outbox, nextCommand, {
|
||||
commandTransitioned: beforeCommand === null || (!isTerminalCommandState(beforeCommand.state) && isTerminalCommandState(nextCommand.state)),
|
||||
runTransitioned: stringValue(runRecovery.state) === "terminal-outbox-replayed",
|
||||
});
|
||||
return {
|
||||
state: "recovered",
|
||||
terminalReportState: "terminal-outbox-replayed",
|
||||
@@ -162,6 +168,7 @@ async function recoverTerminalOutboxIfNeeded(store: AgentRunStore, job: RunnerJo
|
||||
parseFailedCount: search.parseFailedCount,
|
||||
eventReplay,
|
||||
runRecovery,
|
||||
otel,
|
||||
valuesPrinted: false,
|
||||
};
|
||||
} catch (error) {
|
||||
@@ -176,6 +183,51 @@ async function recoverTerminalOutboxIfNeeded(store: AgentRunStore, job: RunnerJo
|
||||
}
|
||||
}
|
||||
|
||||
async function getCommandOrNull(store: AgentRunStore, commandId: string): Promise<CommandRecord | null> {
|
||||
try {
|
||||
return await store.getCommand(commandId);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
async function emitTerminalOutboxOtelSpans(store: AgentRunStore, outbox: TerminalOutboxRecord, command: CommandRecord, state: { commandTransitioned: boolean; runTransitioned: boolean }): Promise<JsonRecord> {
|
||||
if (!state.commandTransitioned && !state.runTransitioned) return { emitted: false, reason: "already-terminal", valuesPrinted: false };
|
||||
const run = await store.getRun(outbox.runId);
|
||||
const common = {
|
||||
source: "manager-reconciler",
|
||||
phase: "command-terminal",
|
||||
outboxKey: outbox.outboxKey,
|
||||
outboxPhase: outbox.phase,
|
||||
terminalStatus: outbox.report.terminalStatus,
|
||||
failureKind: outbox.report.failureKind,
|
||||
commandId: outbox.commandId,
|
||||
attemptId: outbox.attemptId,
|
||||
runnerId: outbox.runnerId,
|
||||
runnerJobId: outbox.runnerJobId,
|
||||
terminalRun: outbox.terminalRun === true,
|
||||
commandTransitioned: state.commandTransitioned,
|
||||
runTransitioned: state.runTransitioned,
|
||||
};
|
||||
const commandSpan = await emitAgentRunOtelSpan("runner_command_terminal", run, process.env, {
|
||||
command,
|
||||
kind: 2,
|
||||
status: outbox.report.terminalStatus === "failed" || outbox.report.terminalStatus === "blocked" ? "error" : "ok",
|
||||
error: outbox.report.terminalStatus === "failed" || outbox.report.terminalStatus === "blocked" ? outbox.report.failureMessage ?? outbox.report.failureKind ?? "terminal failure" : undefined,
|
||||
attributes: common,
|
||||
});
|
||||
const terminalSpan = command.type === "turn"
|
||||
? await emitAgentRunOtelSpan(`runner_terminal.${outbox.report.terminalStatus}`, run, process.env, {
|
||||
command,
|
||||
kind: 2,
|
||||
status: outbox.report.terminalStatus === "failed" || outbox.report.terminalStatus === "blocked" ? "error" : "ok",
|
||||
error: outbox.report.terminalStatus === "failed" || outbox.report.terminalStatus === "blocked" ? outbox.report.failureMessage ?? outbox.report.failureKind ?? "terminal failure" : undefined,
|
||||
attributes: { ...common, eventType: "terminal_status" },
|
||||
})
|
||||
: { skipped: true, reason: "non-turn-command", valuesPrinted: false };
|
||||
return { emitted: true, commandSpan, terminalSpan, valuesPrinted: false };
|
||||
}
|
||||
|
||||
async function replayTerminalOutboxEvents(store: AgentRunStore, outbox: TerminalOutboxRecord): Promise<JsonRecord> {
|
||||
const existing = await store.listEvents(outbox.runId, 0, 1_000);
|
||||
const existingKeys = new Set(existing.map((event) => eventContentKey({ type: event.type, payload: event.payload })));
|
||||
|
||||
Reference in New Issue
Block a user