From 909e8b31f8a5eaaf89fdd1dae9233b066e0302b0 Mon Sep 17 00:00:00 2001 From: lyon Date: Sat, 20 Jun 2026 16:12:25 +0800 Subject: [PATCH] fix: recover terminal runner outbox --- src/mgr/runner-reconciler.ts | 296 ++++++++++++++++++++++++++++++++++- src/runner/run-once.ts | 163 ++++++++++++++----- 2 files changed, 421 insertions(+), 38 deletions(-) diff --git a/src/mgr/runner-reconciler.ts b/src/mgr/runner-reconciler.ts index fa33a70..22c66ce 100644 --- a/src/mgr/runner-reconciler.ts +++ b/src/mgr/runner-reconciler.ts @@ -1,7 +1,7 @@ import { spawn } from "node:child_process"; import { AgentRunError } from "../common/errors.js"; import { redactJson, redactText } from "../common/redaction.js"; -import type { CommandRecord, FailureKind, JsonRecord, JsonValue, RunnerJobRecord, TerminalStatus } from "../common/types.js"; +import type { BackendEvent, CommandRecord, EventType, FailureKind, JsonRecord, JsonValue, RunnerJobRecord, TerminalStatus } from "../common/types.js"; import { nowIso, stableHash } from "../common/validation.js"; import type { AgentRunStore } from "./store.js"; import { isTerminalCommandState, isTerminalRunStatus } from "./store.js"; @@ -37,6 +37,35 @@ interface K8sList { items?: K8sObject[]; } +interface CommandTerminalReport { + terminalStatus: TerminalStatus; + failureKind: FailureKind | null; + failureMessage: string | null; + threadId?: string; + turnId?: string; +} + +interface TerminalOutboxRecord { + outboxKey: string; + phase: string | null; + runId: string; + commandId: string; + runnerId: string; + runnerJobId: string | null; + attemptId: string; + report: CommandTerminalReport; + terminalRun: boolean; + events: BackendEvent[]; +} + +interface TerminalOutboxSearch { + outbox: TerminalOutboxRecord | null; + lineCount: number; + candidateCount: number; + parseFailedCount: number; + stdoutHash: string; +} + export async function reconcileRunnerJobsOnce(input: RunnerReconcilerOptions): Promise { const limit = clampLimit(input.limit ?? 20); const jobs = await input.store.listRunnerJobsForReconciliation(limit); @@ -45,7 +74,9 @@ export async function reconcileRunnerJobsOnce(input: RunnerReconcilerOptions): P let runClosureCount = 0; for (const job of jobs) { - const observation = await observeRunnerJob(job, { ...(input.namespace ? { namespace: input.namespace } : {}), ...(input.kubectlCommand ? { kubectlCommand: input.kubectlCommand } : {}) }); + let observation = await observeRunnerJob(job, { ...(input.namespace ? { namespace: input.namespace } : {}), ...(input.kubectlCommand ? { kubectlCommand: input.kubectlCommand } : {}) }); + const terminalOutbox = await recoverTerminalOutboxIfNeeded(input.store, job, observation, { ...(input.namespace ? { namespace: input.namespace } : {}), ...(input.kubectlCommand ? { kubectlCommand: input.kubectlCommand } : {}) }); + if (terminalOutbox) observation = mergeTerminalOutboxObservation(observation, terminalOutbox); await input.store.updateRunnerJobResult(job.id, { observation }); if (stringValue(observation.category) === "runner-job-observe-failed") observeFailedCount++; const runClosure = await closeOpenRunWhenCommandTerminal(input.store, job); @@ -59,6 +90,7 @@ export async function reconcileRunnerJobsOnce(input: RunnerReconcilerOptions): P observedRunnerPhase: stringValue(observation.observedRunnerPhase) ?? "unknown", terminalReportState: stringValue(observation.terminalReportState) ?? "unknown", runReportState: stringValue(observation.runReportState) ?? "unknown", + terminalOutboxState: terminalOutboxState(observation), runClosure, valuesPrinted: false, }); @@ -77,6 +109,114 @@ export async function reconcileRunnerJobsOnce(input: RunnerReconcilerOptions): P }; } +async function recoverTerminalOutboxIfNeeded(store: AgentRunStore, job: RunnerJobRecord, observation: JsonRecord, input: { namespace?: string; kubectlCommand?: string }): Promise { + const phase = stringValue(observation.observedRunnerPhase); + if (phase !== "k8s:succeeded" && phase !== "k8s:failed") return null; + try { + const command = await store.getCommand(job.commandId); + if (isTerminalCommandState(command.state)) { + return { state: "command-terminal", terminalReportState: "terminal-command-already-recorded", commandState: command.state, valuesPrinted: false }; + } + const namespace = input.namespace ?? job.namespace; + const kubectlCommand = input.kubectlCommand ?? "kubectl"; + const logs = await getRunnerJobLogs(kubectlCommand, namespace, job.jobName); + if (logs.code !== 0) { + return { + state: "logs-unavailable", + terminalReportState: "terminal-outbox-unavailable", + runReportState: "not-updated", + failureKind: "infra-failed", + failureMessage: redactText(logs.stderr || `kubectl logs failed with code ${logs.code}`).slice(0, 300), + valuesPrinted: false, + }; + } + const search = findLatestTerminalOutbox(logs.stdout, job); + if (!search.outbox) { + return { + state: "not-found", + terminalReportState: "terminal-outbox-missing", + runReportState: "not-updated", + lineCount: search.lineCount, + candidateCount: search.candidateCount, + parseFailedCount: search.parseFailedCount, + stdoutHash: search.stdoutHash, + valuesPrinted: false, + }; + } + const eventReplay = await replayTerminalOutboxEvents(store, search.outbox); + const nextCommand = await store.finishCommand(job.commandId, search.outbox.report); + const runRecovery = await recoverTerminalOutboxRun(store, search.outbox); + return { + state: "recovered", + terminalReportState: "terminal-outbox-replayed", + runReportState: stringValue(runRecovery.state) ?? "not-updated", + outboxKey: search.outbox.outboxKey, + phase: search.outbox.phase, + commandState: nextCommand.state, + terminalStatus: search.outbox.report.terminalStatus, + failureKind: search.outbox.report.failureKind, + failureMessageHash: search.outbox.report.failureMessage ? stableHash({ message: search.outbox.report.failureMessage }) : null, + terminalRun: search.outbox.terminalRun, + lineCount: search.lineCount, + candidateCount: search.candidateCount, + parseFailedCount: search.parseFailedCount, + eventReplay, + runRecovery, + valuesPrinted: false, + }; + } catch (error) { + return { + state: "recovery-failed", + terminalReportState: "terminal-outbox-recovery-failed", + runReportState: "not-updated", + failureKind: "infra-failed", + failureMessage: error instanceof Error ? redactText(error.message).slice(0, 300) : "unknown terminal outbox recovery failure", + valuesPrinted: false, + }; + } +} + +async function replayTerminalOutboxEvents(store: AgentRunStore, outbox: TerminalOutboxRecord): Promise { + const existing = await store.listEvents(outbox.runId, 0, 1_000); + const existingKeys = new Set(existing.map((event) => eventContentKey({ type: event.type, payload: event.payload }))); + let appendedCount = 0; + let skippedCount = 0; + for (const event of outbox.events) { + const key = eventContentKey(event); + if (existingKeys.has(key)) { + skippedCount++; + continue; + } + await store.appendEvent(outbox.runId, event.type, event.payload); + existingKeys.add(key); + appendedCount++; + } + return { eventCount: outbox.events.length, appendedCount, skippedCount, valuesPrinted: false }; +} + +async function recoverTerminalOutboxRun(store: AgentRunStore, outbox: TerminalOutboxRecord): Promise { + if (outbox.terminalRun !== true) return { state: "not-requested", valuesPrinted: false }; + const run = await store.getRun(outbox.runId); + if (isTerminalRunStatus(run.status)) return { state: "already-terminal", runStatus: run.status, valuesPrinted: false }; + const next = await store.finishRun(outbox.runId, outbox.report); + return { state: "terminal-outbox-replayed", runStatus: next.status, terminalStatus: outbox.report.terminalStatus, valuesPrinted: false }; +} + +function mergeTerminalOutboxObservation(observation: JsonRecord, terminalOutbox: JsonRecord): JsonRecord { + return { + ...observation, + terminalOutbox, + terminalReportState: stringValue(terminalOutbox.terminalReportState) ?? stringValue(observation.terminalReportState) ?? "unknown", + runReportState: stringValue(terminalOutbox.runReportState) ?? stringValue(observation.runReportState) ?? "unknown", + evidenceLevel: terminalOutbox.state === "recovered" ? "high" : observation.evidenceLevel ?? "medium", + valuesPrinted: false, + }; +} + +function terminalOutboxState(observation: JsonRecord): string { + return stringValue(recordValue(observation.terminalOutbox)?.state) ?? "not-checked"; +} + export function startRunnerJobReconciler(input: RunnerReconcilerLoopOptions): () => void { const intervalMs = Math.max(1_000, input.intervalMs); let stopped = false; @@ -256,6 +396,158 @@ async function getK8sList(kubectlCommand: string, resource: string, namespace: s return Array.isArray(parsed.items) ? parsed.items.filter((item) => typeof item === "object" && item !== null && !Array.isArray(item)) : []; } +async function getRunnerJobLogs(kubectlCommand: string, namespace: string, jobName: string): Promise<{ code: number | null; signal: NodeJS.Signals | null; stdout: string; stderr: string }> { + return await kubectlRun(kubectlCommand, ["logs", `job/${jobName}`, "-n", namespace, "--tail=2000"]); +} + +function findLatestTerminalOutbox(stdout: string, job: RunnerJobRecord): TerminalOutboxSearch { + let outbox: TerminalOutboxRecord | null = null; + let lineCount = 0; + let candidateCount = 0; + let parseFailedCount = 0; + for (const line of stdout.split(/\r?\n/u)) { + if (line.trim().length === 0) continue; + lineCount++; + if (!line.includes("terminal.outbox")) continue; + let parsed: unknown; + try { + parsed = JSON.parse(line); + } catch { + parseFailedCount++; + continue; + } + const candidate = terminalOutboxFromRecord(parsed); + if (!candidate) continue; + candidateCount++; + if (matchesTerminalOutbox(candidate, job)) outbox = candidate; + } + return { outbox, lineCount, candidateCount, parseFailedCount, stdoutHash: stableHash({ stdoutTail: stdout.slice(-20_000), stdoutLength: stdout.length }) }; +} + +function terminalOutboxFromRecord(value: unknown): TerminalOutboxRecord | null { + const record = recordValue(value); + if (!record || record.label !== "terminal.outbox") return null; + const outboxKey = stringValue(record.outboxKey); + const runId = stringValue(record.runId); + const commandId = stringValue(record.commandId); + const runnerId = stringValue(record.runnerId); + const attemptId = stringValue(record.attemptId); + const reportRecord = recordValue(record.report); + if (!outboxKey || !runId || !commandId || !runnerId || !attemptId || !reportRecord) return null; + const report = terminalReportFromRecord(reportRecord); + if (!report) return null; + const events = Array.isArray(record.events) ? record.events.map(eventFromRecord).filter((event): event is BackendEvent => event !== null) : []; + return { + outboxKey, + phase: stringValue(record.phase), + runId, + commandId, + runnerId, + runnerJobId: stringValue(record.runnerJobId), + attemptId, + report, + terminalRun: record.terminalRun === true, + events, + }; +} + +function terminalReportFromRecord(record: JsonRecord): CommandTerminalReport | null { + const terminalStatus = terminalStatusValue(record.terminalStatus); + const failureKind = failureKindValue(record.failureKind); + if (!terminalStatus || failureKind === undefined) return null; + const threadId = stringValue(record.threadId); + const turnId = stringValue(record.turnId); + return { + terminalStatus, + failureKind, + failureMessage: nullableString(record.failureMessage), + ...(threadId ? { threadId } : {}), + ...(turnId ? { turnId } : {}), + }; +} + +function eventFromRecord(value: unknown): BackendEvent | null { + const record = recordValue(value); + if (!record) return null; + const type = eventTypeValue(record.type); + const payload = recordValue(record.payload); + if (!type || !payload) return null; + return { type, payload }; +} + +function matchesTerminalOutbox(outbox: TerminalOutboxRecord, job: RunnerJobRecord): boolean { + if (outbox.runId !== job.runId || outbox.commandId !== job.commandId) return false; + if (outbox.runnerJobId && outbox.runnerJobId !== job.id) return false; + return true; +} + +function eventContentKey(event: BackendEvent): string { + return stableHash({ type: event.type, payload: stripTerminalOutboxEventPayload(event.payload) }); +} + +function stripTerminalOutboxEventPayload(payload: JsonRecord): JsonRecord { + const next: JsonRecord = {}; + for (const [key, value] of Object.entries(payload)) { + if (key === "terminalOutboxKey" || key === "terminalOutboxIndex") continue; + next[key] = value; + } + return next; +} + +function recordValue(value: unknown): JsonRecord | null { + return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : null; +} + +const eventTypes = new Set(["backend_status", "assistant_message", "tool_call", "command_output", "diff", "error", "terminal_status"]); +const terminalStatuses = new Set(["completed", "failed", "blocked", "cancelled"]); +const failureKinds = new Set([ + "auth-missing", + "auth-failed", + "schema-invalid", + "tenant-policy-denied", + "secret-unavailable", + "prompt-unavailable", + "prompt-too-large", + "required-skill-unavailable", + "skill-unavailable", + "runner-lease-conflict", + "backend-failed", + "backend-protocol-error", + "backend-spawn-failed", + "backend-json-parse-error", + "backend-response-invalid", + "thread-resume-failed", + "backend-timeout", + "provider-auth-failed", + "provider-rate-limited", + "provider-stream-disconnected", + "provider-http-error", + "provider-invalid-tool-call", + "provider-compact-unsupported", + "provider-unavailable", + "infra-failed", + "session-store-evicted", + "cancelled", +]); + +function eventTypeValue(value: JsonValue | undefined): EventType | null { + return typeof value === "string" && eventTypes.has(value) ? value as EventType : null; +} + +function terminalStatusValue(value: JsonValue | undefined): TerminalStatus | null { + return typeof value === "string" && terminalStatuses.has(value) ? value as TerminalStatus : null; +} + +function failureKindValue(value: JsonValue | undefined): FailureKind | null | undefined { + if (value === null) return null; + if (typeof value === "string" && failureKinds.has(value)) return value as FailureKind; + return undefined; +} + +function nullableString(value: JsonValue | undefined): string | null { + return typeof value === "string" ? value : null; +} + async function kubectlRun(kubectlCommand: string, args: string[]): Promise<{ code: number | null; signal: NodeJS.Signals | null; stdout: string; stderr: string }> { const child = spawn(kubectlCommand, args, { stdio: ["ignore", "pipe", "pipe"] }); let stdout = ""; diff --git a/src/runner/run-once.ts b/src/runner/run-once.ts index ada8911..8f4c478 100644 --- a/src/runner/run-once.ts +++ b/src/runner/run-once.ts @@ -6,6 +6,7 @@ import { materializeResourceBundle } from "./resource-bundle.js"; import type { BackendEvent, BackendProfile, CommandRecord, FailureKind, InitialPromptAssembly, JsonRecord, RunRecord, RunnerRecord, TerminalStatus } from "../common/types.js"; import { AgentRunError } from "../common/errors.js"; import { smokeBundledWorkReadyCapabilities, smokeImageWorkReadyCapabilities } from "../common/work-ready.js"; +import { stableHash } from "../common/validation.js"; export interface RunnerOnceOptions extends BackendAdapterOptions { managerUrl: string; @@ -41,10 +42,25 @@ interface RunnerFailure { details?: JsonRecord | null; } +interface CommandTerminalReport { + terminalStatus: TerminalStatus; + failureKind: FailureKind | null; + failureMessage: string | null; + threadId?: string; + turnId?: string; +} + interface RunnerLogSink { write(label: string, payload: JsonRecord): Promise; } +class TerminalOutboxReportError extends Error { + constructor(readonly cause: unknown) { + super(`terminal report failed after durable outbox write: ${errorMessage(cause)}`); + this.name = "TerminalOutboxReportError"; + } +} + export async function runOnce(options: RunnerOnceOptions): Promise { const runnerLog = await createRunnerLogSink(options.logPath); await runnerLog.write("runner.starting", { runId: options.runId, commandId: options.commandId ?? null, runnerJobId: options.runnerJobId ?? null, valuesPrinted: false }); @@ -114,7 +130,7 @@ export async function runOnce(options: RunnerOnceOptions): Promise { firstPoll = false; const command = commandsResponse.selected; if (!command) { - await failPendingSteerWithoutActiveTurn(api, options.runId, commandsResponse.items, runner.id, attemptId); + await failPendingSteerWithoutActiveTurn(api, options.runId, commandsResponse.items, runner.id, attemptId, runnerLog, options.runnerJobId); if (options.oneShot === true) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "one-shot-no-pending"); if (Date.now() - idleSince >= idleTimeoutMs) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "idle-timeout"); await sleep(pollIntervalMs); @@ -143,7 +159,7 @@ export async function runOnce(options: RunnerOnceOptions): Promise { } const result = materializationFailure - ? await reportCommandFailure(api, options.runId, command.id, runner, attemptId, materializationFailure, "runner:resource-bundle", { terminalRun: true }) + ? await reportCommandFailure(api, options.runId, command.id, runner, attemptId, materializationFailure, "runner:resource-bundle", runnerLog, { terminalRun: true, ...(options.runnerJobId ? { runnerJobId: options.runnerJobId } : {}) }) : await executeCommand(api, withResourceAssembly(options, resourceEnv, initialPrompt), command, runner, attemptId, workspacePath, backendSession ?? (backendSession = createBackendSession(currentRun, withResourceAssembly(options, resourceEnv, initialPrompt))), runnerLog); commandResults.push(result); idleSince = Date.now(); @@ -227,7 +243,7 @@ function pathDelimiter(): string { async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, command: CommandRecord, runner: RunnerRecord, attemptId: string, workspacePath: string | undefined, backendSession: BackendSession | null, runnerLog: RunnerLogSink): Promise { await api.ackCommand(command.id); const acked = await api.getCommand(options.runId, command.id); - if (acked.state === "cancelled") return await reportCancelled(api, options.runId, command.id, runner, attemptId, "command cancelled before backend start"); + if (acked.state === "cancelled") return await reportCancelled(api, options.runId, command.id, runner, attemptId, "command cancelled before backend start", runnerLog, options.runnerJobId); await assertNotCancelled(api, options.runId, command.id); await api.appendEvent(options.runId, { type: "backend_status", payload: { phase: "backend-turn-started", commandId: command.id, attemptId, runnerId: runner.id, backendProfile: options.backendProfile ?? null, workspaceReady: Boolean(workspacePath) } }); await runnerLog.write("command.started", { runId: options.runId, commandId: command.id, runnerId: runner.id, runnerJobId: options.runnerJobId ?? null, attemptId, backendProfile: options.backendProfile ?? null, valuesPrinted: false }); @@ -235,6 +251,7 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, const stopCancelWatch = watchCancellation(api, options.runId, command.id, abortController); const backendProgress = startBackendProgress(); let stopSteerWatch: (() => void) | undefined; + const terminalOutboxEvents: BackendEvent[] = []; try { const latestRun = await api.getRun(options.runId); const backendOptions = { @@ -254,10 +271,14 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, }, onEvent: async (event: BackendEvent) => { await runnerLog.write("backend.event", runnerLogEventSummary(event, options.runId, command.id, attemptId, runner.id, options.runnerJobId ?? null)); + if (shouldKeepTerminalOutboxEvent(event)) { + terminalOutboxEvents.push(event); + return; + } await appendBestEffort(api, options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id)); }, onActiveTurn: (control: BackendActiveTurnControl) => { - stopSteerWatch = startActiveTurnCommandWatch(api, options.runId, command.id, attemptId, runner.id, control, options.pollIntervalMs); + stopSteerWatch = startActiveTurnCommandWatch(api, options.runId, command.id, attemptId, runner.id, control, options.pollIntervalMs, runnerLog, options.runnerJobId); return () => { stopSteerWatch?.(); stopSteerWatch = undefined; @@ -265,16 +286,26 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, }, }; const result = backendSession ? await backendSession.runTurn(latestRun, command, backendOptions) : await runBackendTurn(latestRun, command, backendOptions); - for (const event of result.events) { - await api.appendEvent(options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id)); - } - await api.reportCommandStatus(command.id, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage, ...(result.threadId ? { threadId: result.threadId } : {}), ...(result.turnId ? { turnId: result.turnId } : {}) }); + for (const event of result.events) if (shouldKeepTerminalOutboxEvent(event)) terminalOutboxEvents.push(event); + const report: CommandTerminalReport = { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage, ...(result.threadId ? { threadId: result.threadId } : {}), ...(result.turnId ? { turnId: result.turnId } : {}) }; + await reportTerminalCommand(api, { + runId: options.runId, + commandId: command.id, + runnerId: runner.id, + runnerJobId: options.runnerJobId ?? null, + attemptId, + report, + events: terminalOutboxEvents, + phase: "runner:execute", + runnerLog, + }); await runnerLog.write("command.terminal", { runId: options.runId, commandId: command.id, runnerId: runner.id, runnerJobId: options.runnerJobId ?? null, attemptId, terminalStatus: result.terminalStatus, failureKind: result.failureKind, valuesPrinted: false }); return { commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind } as CommandExecutionResult; } catch (error) { + if (error instanceof TerminalOutboxReportError) throw error; const failureKind = failureKindFromError(error); const failure = { failureKind, terminalStatus: terminalStatusForFailure(failureKind), message: errorMessage(error) }; - return await reportCommandFailure(api, options.runId, command.id, runner, attemptId, failure, "runner:execute"); + return await reportCommandFailure(api, options.runId, command.id, runner, attemptId, failure, "runner:execute", runnerLog, { ...(options.runnerJobId ? { runnerJobId: options.runnerJobId } : {}), events: terminalOutboxEvents }); } finally { stopSteerWatch?.(); const progressSummary = backendProgress.stop(); @@ -283,7 +314,7 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, } } -function startActiveTurnCommandWatch(api: RunnerManagerApi, runId: string, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl, pollIntervalMs: number | undefined): () => void { +function startActiveTurnCommandWatch(api: RunnerManagerApi, runId: string, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl, pollIntervalMs: number | undefined, runnerLog: RunnerLogSink, runnerJobId: string | undefined): () => void { let stopped = false; let polling = false; const seen = new Set(); @@ -296,8 +327,8 @@ function startActiveTurnCommandWatch(api: RunnerManagerApi, runId: string, targe const pendingControlCommands = commands.filter((item) => (item.type === "steer" || item.type === "interrupt") && item.state === "pending" && !seen.has(item.id)); for (const controlCommand of pendingControlCommands) { seen.add(controlCommand.id); - if (controlCommand.type === "interrupt") await handleInterruptCommand(api, runId, controlCommand, targetCommandId, attemptId, runnerId, control); - else await handleSteerCommand(api, runId, controlCommand, targetCommandId, attemptId, runnerId, control); + if (controlCommand.type === "interrupt") await handleInterruptCommand(api, runId, controlCommand, targetCommandId, attemptId, runnerId, control, runnerLog, runnerJobId); + else await handleSteerCommand(api, runId, controlCommand, targetCommandId, attemptId, runnerId, control, runnerLog, runnerJobId); } } catch { // The active backend turn remains authoritative; missed control commands stay pending for the next poll. @@ -313,32 +344,32 @@ function startActiveTurnCommandWatch(api: RunnerManagerApi, runId: string, targe }; } -async function handleSteerCommand(api: RunnerManagerApi, runId: string, command: CommandRecord, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl): Promise { +async function handleSteerCommand(api: RunnerManagerApi, runId: string, command: CommandRecord, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl, runnerLog: RunnerLogSink, runnerJobId: string | undefined): Promise { const acked = await api.ackCommand(command.id); if (acked.state === "cancelled") { - await api.reportCommandStatus(command.id, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: "steer command cancelled before delivery", threadId: control.threadId, turnId: control.turnId }); + await reportTerminalCommand(api, { runId, commandId: command.id, runnerId, runnerJobId: runnerJobId ?? null, attemptId, report: { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: "steer command cancelled before delivery", threadId: control.threadId, turnId: control.turnId }, phase: "runner:steer:cancelled", runnerLog }); return; } const prompt = steerPrompt(command.payload); if (!prompt) { - await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: "blocked", failureKind: "schema-invalid", message: "steer command payload requires a non-empty prompt, message, or text" }, "runner:steer:payload", control); + await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: "blocked", failureKind: "schema-invalid", message: "steer command payload requires a non-empty prompt, message, or text" }, "runner:steer:payload", runnerLog, runnerJobId, control); return; } await appendBestEffort(api, runId, { type: "backend_status", payload: { phase: "steer-command-acknowledged", commandId: command.id, commandType: "steer", targetCommandId, deliveryState: "acknowledged-by-runner", backendAccepted: false, targetEffect: "not-yet-observed", attemptId, runnerId, threadId: control.threadId, turnId: control.turnId } }); try { await control.steer(prompt); await appendBestEffort(api, runId, { type: "backend_status", payload: { phase: "turn/steer:completed", commandId: command.id, commandType: "steer", targetCommandId, deliveryState: "forwarded-to-backend", backendAccepted: true, targetEffect: "not-guaranteed", semantics: "turn/steer RPC returned; target turn liveness must be read from the target command result", attemptId, runnerId, threadId: control.threadId, turnId: control.turnId } }); - await api.reportCommandStatus(command.id, { terminalStatus: "completed", failureKind: null, failureMessage: null, threadId: control.threadId, turnId: control.turnId }); + await reportTerminalCommand(api, { runId, commandId: command.id, runnerId, runnerJobId: runnerJobId ?? null, attemptId, report: { terminalStatus: "completed", failureKind: null, failureMessage: null, threadId: control.threadId, turnId: control.turnId }, phase: "runner:steer", runnerLog }); } catch (error) { const failureKind = failureKindFromError(error); - await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: terminalStatusForFailure(failureKind), failureKind, message: errorMessage(error) }, "runner:steer", control); + await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: terminalStatusForFailure(failureKind), failureKind, message: errorMessage(error) }, "runner:steer", runnerLog, runnerJobId, control); } } -async function handleInterruptCommand(api: RunnerManagerApi, runId: string, command: CommandRecord, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl): Promise { +async function handleInterruptCommand(api: RunnerManagerApi, runId: string, command: CommandRecord, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl, runnerLog: RunnerLogSink, runnerJobId: string | undefined): Promise { const acked = await api.ackCommand(command.id); if (acked.state === "cancelled") { - await api.reportCommandStatus(command.id, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: "interrupt command cancelled before delivery", threadId: control.threadId, turnId: control.turnId }); + await reportTerminalCommand(api, { runId, commandId: command.id, runnerId, runnerJobId: runnerJobId ?? null, attemptId, report: { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: "interrupt command cancelled before delivery", threadId: control.threadId, turnId: control.turnId }, phase: "runner:interrupt:cancelled", runnerLog }); return; } const reason = interruptReason(command.payload); @@ -346,24 +377,24 @@ async function handleInterruptCommand(api: RunnerManagerApi, runId: string, comm try { await control.interrupt(); await appendBestEffort(api, runId, { type: "backend_status", payload: { phase: "turn/interrupt:completed", commandId: command.id, commandType: "interrupt", targetCommandId, attemptId, runnerId, threadId: control.threadId, turnId: control.turnId, reason } }); - await api.reportCommandStatus(command.id, { terminalStatus: "completed", failureKind: null, failureMessage: null, threadId: control.threadId, turnId: control.turnId }); + await reportTerminalCommand(api, { runId, commandId: command.id, runnerId, runnerJobId: runnerJobId ?? null, attemptId, report: { terminalStatus: "completed", failureKind: null, failureMessage: null, threadId: control.threadId, turnId: control.turnId }, phase: "runner:interrupt", runnerLog }); } catch (error) { const failureKind = failureKindFromError(error); - await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: terminalStatusForFailure(failureKind), failureKind, message: errorMessage(error) }, "runner:interrupt", control); + await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: terminalStatusForFailure(failureKind), failureKind, message: errorMessage(error) }, "runner:interrupt", runnerLog, runnerJobId, control); } } -async function failPendingSteerWithoutActiveTurn(api: RunnerManagerApi, runId: string, commands: CommandRecord[], runnerId: string, attemptId: string): Promise { +async function failPendingSteerWithoutActiveTurn(api: RunnerManagerApi, runId: string, commands: CommandRecord[], runnerId: string, attemptId: string, runnerLog: RunnerLogSink, runnerJobId: string | undefined): Promise { for (const command of commands.filter((item) => (item.type === "steer" || item.type === "interrupt") && item.state === "pending")) { const acked = await api.ackCommand(command.id); if (acked.state === "cancelled") continue; - await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: "blocked", failureKind: "schema-invalid", message: `${command.type} command requires an active turn` }, `runner:${command.type}:no-active-turn`); + await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: "blocked", failureKind: "schema-invalid", message: `${command.type} command requires an active turn` }, `runner:${command.type}:no-active-turn`, runnerLog, runnerJobId); } } -async function reportNonTerminalCommandFailure(api: RunnerManagerApi, runId: string, commandId: string, attemptId: string, runnerId: string, failure: { terminalStatus: TerminalStatus; failureKind: FailureKind; message: string }, phase: string, control?: BackendActiveTurnControl): Promise { - await appendBestEffort(api, runId, { type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase, commandId, attemptId, runnerId, ...(control ? { threadId: control.threadId, turnId: control.turnId } : {}) } }); - await api.reportCommandStatus(commandId, { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, failureMessage: failure.message, ...(control ? { threadId: control.threadId, turnId: control.turnId } : {}) }); +async function reportNonTerminalCommandFailure(api: RunnerManagerApi, runId: string, commandId: string, attemptId: string, runnerId: string, failure: { terminalStatus: TerminalStatus; failureKind: FailureKind; message: string }, phase: string, runnerLog: RunnerLogSink, runnerJobId: string | undefined, control?: BackendActiveTurnControl): Promise { + const event: BackendEvent = { type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase, commandId, attemptId, runnerId, ...(control ? { threadId: control.threadId, turnId: control.turnId } : {}) } }; + await reportTerminalCommand(api, { runId, commandId, runnerId, runnerJobId: runnerJobId ?? null, attemptId, report: { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, failureMessage: failure.message, ...(control ? { threadId: control.threadId, turnId: control.turnId } : {}) }, events: [event], phase, runnerLog }); } function failureDetailsFromError(error: unknown): JsonRecord | null { @@ -529,20 +560,80 @@ function annotateCommandEvent(event: BackendEvent, commandId: string, attemptId: return { ...event, payload: { ...event.payload, commandId, attemptId, runnerId } }; } -async function reportCommandFailure(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, attemptId: string, failure: RunnerFailure, phase: string, options: { terminalRun?: boolean } = {}): Promise { +async function reportTerminalCommand(api: RunnerManagerApi, input: { runId: string; commandId: string; runnerId: string; runnerJobId: string | null; attemptId: string; report: CommandTerminalReport; events?: BackendEvent[]; phase: string; runnerLog: RunnerLogSink; terminalRun?: boolean }): Promise { + const outboxKey = terminalOutboxKey(input); + const events = (input.events ?? []).map((event, index) => annotateTerminalOutboxEvent(event, input.commandId, input.attemptId, input.runnerId, outboxKey, index)); + const outboxPayload: JsonRecord = { + schema: "agentrun-terminal-outbox-v1", + outboxKey, + phase: input.phase, + runId: input.runId, + commandId: input.commandId, + runnerId: input.runnerId, + runnerJobId: input.runnerJobId, + attemptId: input.attemptId, + report: terminalReportJson(input.report), + terminalRun: input.terminalRun === true, + events: events.map((event) => ({ type: event.type, payload: event.payload })), + eventCount: events.length, + valuesPrinted: false, + }; + await writeTerminalOutbox(input.runnerLog, outboxPayload); + for (const event of events) await appendBestEffort(api, input.runId, event); + try { + await api.reportCommandStatus(input.commandId, input.report); + if (input.terminalRun === true) await api.reportStatus(input.runId, input.report); + } catch (error) { + throw new TerminalOutboxReportError(error); + } + await input.runnerLog.write("terminal.outbox.ack", { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, runnerJobId: input.runnerJobId, attemptId: input.attemptId, outboxKey, valuesPrinted: false }); +} + +function terminalOutboxKey(input: { runId: string; commandId: string; runnerId: string; attemptId: string; phase: string; report: CommandTerminalReport; terminalRun?: boolean }): string { + return stableHash({ schema: "agentrun-terminal-outbox-v1", runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, attemptId: input.attemptId, phase: input.phase, terminalRun: input.terminalRun === true, report: terminalReportJson(input.report) }); +} + +function terminalReportJson(report: CommandTerminalReport): JsonRecord { + return { terminalStatus: report.terminalStatus, failureKind: report.failureKind, failureMessage: report.failureMessage, threadId: report.threadId ?? null, turnId: report.turnId ?? null }; +} + +function annotateTerminalOutboxEvent(event: BackendEvent, commandId: string, attemptId: string, runnerId: string, outboxKey: string, index: number): BackendEvent { + const annotated = annotateCommandEvent(event, commandId, attemptId, runnerId); + return { ...annotated, payload: { ...annotated.payload, terminalOutboxKey: outboxKey, terminalOutboxIndex: index } }; +} + +async function writeTerminalOutbox(runnerLog: RunnerLogSink, payload: JsonRecord): Promise { + await runnerLog.write("terminal.outbox", payload); + try { + process.stdout.write(JSON.stringify({ at: new Date().toISOString(), label: "terminal.outbox", ...payload, valuesPrinted: false }) + "\n"); + } catch { + // Kubernetes stdout is the recovery path; if it is unavailable, the file log remains diagnostic evidence. + } +} + +function shouldKeepTerminalOutboxEvent(event: BackendEvent): boolean { + if (event.type === "terminal_status") return true; + if (event.type !== "assistant_message") return false; + return event.payload.final === true || typeof event.payload.text === "string"; +} + +async function reportCommandFailure(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, attemptId: string, failure: RunnerFailure, phase: string, runnerLog: RunnerLogSink, options: { terminalRun?: boolean; runnerJobId?: string; events?: BackendEvent[] } = {}): Promise { const details = failure.details ? { details: failure.details } : {}; - await api.appendEvent(runId, { type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase, commandId, attemptId, runnerId: runner.id, ...details } }); - await api.appendEvent(runId, { type: "terminal_status", payload: { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, message: failure.message, commandId, attemptId, runnerId: runner.id, ...details } }); - await api.reportCommandStatus(commandId, { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, failureMessage: failure.message }); - if (options.terminalRun === true) await api.reportStatus(runId, { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, failureMessage: failure.message }); + const events: BackendEvent[] = [ + ...(options.events ?? []), + { type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase, commandId, attemptId, runnerId: runner.id, ...details } }, + { type: "terminal_status", payload: { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, message: failure.message, commandId, attemptId, runnerId: runner.id, ...details } }, + ]; + await reportTerminalCommand(api, { runId, commandId, runnerId: runner.id, runnerJobId: options.runnerJobId ?? null, attemptId, report: { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, failureMessage: failure.message }, events, phase, runnerLog, terminalRun: options.terminalRun === true }); return { commandId, terminalStatus: failure.terminalStatus, failureKind: failure.failureKind } as CommandExecutionResult; } -async function reportCancelled(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, attemptId: string, message: string): Promise { - await api.reportCommandStatus(commandId, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message }); - await api.reportStatus(runId, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message }); - await api.appendEvent(runId, { type: "backend_status", payload: { phase: "turn-cancelled", commandId, attemptId, runnerId: runner.id, failureKind: "cancelled", message } }); - await api.appendEvent(runId, { type: "terminal_status", payload: { terminalStatus: "cancelled", failureKind: "cancelled", message, commandId, attemptId, runnerId: runner.id } }); +async function reportCancelled(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, attemptId: string, message: string, runnerLog: RunnerLogSink, runnerJobId: string | undefined): Promise { + const events: BackendEvent[] = [ + { type: "backend_status", payload: { phase: "turn-cancelled", commandId, attemptId, runnerId: runner.id, failureKind: "cancelled", message } }, + { type: "terminal_status", payload: { terminalStatus: "cancelled", failureKind: "cancelled", message, commandId, attemptId, runnerId: runner.id } }, + ]; + await reportTerminalCommand(api, { runId, commandId, runnerId: runner.id, runnerJobId: runnerJobId ?? null, attemptId, report: { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message }, events, phase: "runner:cancelled", runnerLog, terminalRun: true }); return { commandId, terminalStatus: "cancelled", failureKind: "cancelled" }; }