fix: recover terminal runner outbox

This commit is contained in:
lyon
2026-06-20 16:12:25 +08:00
parent 11ac140e90
commit 909e8b31f8
2 changed files with 421 additions and 38 deletions
+294 -2
View File
@@ -1,7 +1,7 @@
import { spawn } from "node:child_process";
import { AgentRunError } from "../common/errors.js";
import { redactJson, redactText } from "../common/redaction.js";
import type { CommandRecord, FailureKind, JsonRecord, JsonValue, RunnerJobRecord, TerminalStatus } from "../common/types.js";
import type { BackendEvent, CommandRecord, EventType, FailureKind, JsonRecord, JsonValue, RunnerJobRecord, TerminalStatus } from "../common/types.js";
import { nowIso, stableHash } from "../common/validation.js";
import type { AgentRunStore } from "./store.js";
import { isTerminalCommandState, isTerminalRunStatus } from "./store.js";
@@ -37,6 +37,35 @@ interface K8sList {
items?: K8sObject[];
}
interface CommandTerminalReport {
terminalStatus: TerminalStatus;
failureKind: FailureKind | null;
failureMessage: string | null;
threadId?: string;
turnId?: string;
}
interface TerminalOutboxRecord {
outboxKey: string;
phase: string | null;
runId: string;
commandId: string;
runnerId: string;
runnerJobId: string | null;
attemptId: string;
report: CommandTerminalReport;
terminalRun: boolean;
events: BackendEvent[];
}
interface TerminalOutboxSearch {
outbox: TerminalOutboxRecord | null;
lineCount: number;
candidateCount: number;
parseFailedCount: number;
stdoutHash: string;
}
export async function reconcileRunnerJobsOnce(input: RunnerReconcilerOptions): Promise<JsonRecord> {
const limit = clampLimit(input.limit ?? 20);
const jobs = await input.store.listRunnerJobsForReconciliation(limit);
@@ -45,7 +74,9 @@ export async function reconcileRunnerJobsOnce(input: RunnerReconcilerOptions): P
let runClosureCount = 0;
for (const job of jobs) {
const observation = await observeRunnerJob(job, { ...(input.namespace ? { namespace: input.namespace } : {}), ...(input.kubectlCommand ? { kubectlCommand: input.kubectlCommand } : {}) });
let observation = await observeRunnerJob(job, { ...(input.namespace ? { namespace: input.namespace } : {}), ...(input.kubectlCommand ? { kubectlCommand: input.kubectlCommand } : {}) });
const terminalOutbox = await recoverTerminalOutboxIfNeeded(input.store, job, observation, { ...(input.namespace ? { namespace: input.namespace } : {}), ...(input.kubectlCommand ? { kubectlCommand: input.kubectlCommand } : {}) });
if (terminalOutbox) observation = mergeTerminalOutboxObservation(observation, terminalOutbox);
await input.store.updateRunnerJobResult(job.id, { observation });
if (stringValue(observation.category) === "runner-job-observe-failed") observeFailedCount++;
const runClosure = await closeOpenRunWhenCommandTerminal(input.store, job);
@@ -59,6 +90,7 @@ export async function reconcileRunnerJobsOnce(input: RunnerReconcilerOptions): P
observedRunnerPhase: stringValue(observation.observedRunnerPhase) ?? "unknown",
terminalReportState: stringValue(observation.terminalReportState) ?? "unknown",
runReportState: stringValue(observation.runReportState) ?? "unknown",
terminalOutboxState: terminalOutboxState(observation),
runClosure,
valuesPrinted: false,
});
@@ -77,6 +109,114 @@ export async function reconcileRunnerJobsOnce(input: RunnerReconcilerOptions): P
};
}
async function recoverTerminalOutboxIfNeeded(store: AgentRunStore, job: RunnerJobRecord, observation: JsonRecord, input: { namespace?: string; kubectlCommand?: string }): Promise<JsonRecord | null> {
const phase = stringValue(observation.observedRunnerPhase);
if (phase !== "k8s:succeeded" && phase !== "k8s:failed") return null;
try {
const command = await store.getCommand(job.commandId);
if (isTerminalCommandState(command.state)) {
return { state: "command-terminal", terminalReportState: "terminal-command-already-recorded", commandState: command.state, valuesPrinted: false };
}
const namespace = input.namespace ?? job.namespace;
const kubectlCommand = input.kubectlCommand ?? "kubectl";
const logs = await getRunnerJobLogs(kubectlCommand, namespace, job.jobName);
if (logs.code !== 0) {
return {
state: "logs-unavailable",
terminalReportState: "terminal-outbox-unavailable",
runReportState: "not-updated",
failureKind: "infra-failed",
failureMessage: redactText(logs.stderr || `kubectl logs failed with code ${logs.code}`).slice(0, 300),
valuesPrinted: false,
};
}
const search = findLatestTerminalOutbox(logs.stdout, job);
if (!search.outbox) {
return {
state: "not-found",
terminalReportState: "terminal-outbox-missing",
runReportState: "not-updated",
lineCount: search.lineCount,
candidateCount: search.candidateCount,
parseFailedCount: search.parseFailedCount,
stdoutHash: search.stdoutHash,
valuesPrinted: false,
};
}
const eventReplay = await replayTerminalOutboxEvents(store, search.outbox);
const nextCommand = await store.finishCommand(job.commandId, search.outbox.report);
const runRecovery = await recoverTerminalOutboxRun(store, search.outbox);
return {
state: "recovered",
terminalReportState: "terminal-outbox-replayed",
runReportState: stringValue(runRecovery.state) ?? "not-updated",
outboxKey: search.outbox.outboxKey,
phase: search.outbox.phase,
commandState: nextCommand.state,
terminalStatus: search.outbox.report.terminalStatus,
failureKind: search.outbox.report.failureKind,
failureMessageHash: search.outbox.report.failureMessage ? stableHash({ message: search.outbox.report.failureMessage }) : null,
terminalRun: search.outbox.terminalRun,
lineCount: search.lineCount,
candidateCount: search.candidateCount,
parseFailedCount: search.parseFailedCount,
eventReplay,
runRecovery,
valuesPrinted: false,
};
} catch (error) {
return {
state: "recovery-failed",
terminalReportState: "terminal-outbox-recovery-failed",
runReportState: "not-updated",
failureKind: "infra-failed",
failureMessage: error instanceof Error ? redactText(error.message).slice(0, 300) : "unknown terminal outbox recovery failure",
valuesPrinted: false,
};
}
}
async function replayTerminalOutboxEvents(store: AgentRunStore, outbox: TerminalOutboxRecord): Promise<JsonRecord> {
const existing = await store.listEvents(outbox.runId, 0, 1_000);
const existingKeys = new Set(existing.map((event) => eventContentKey({ type: event.type, payload: event.payload })));
let appendedCount = 0;
let skippedCount = 0;
for (const event of outbox.events) {
const key = eventContentKey(event);
if (existingKeys.has(key)) {
skippedCount++;
continue;
}
await store.appendEvent(outbox.runId, event.type, event.payload);
existingKeys.add(key);
appendedCount++;
}
return { eventCount: outbox.events.length, appendedCount, skippedCount, valuesPrinted: false };
}
async function recoverTerminalOutboxRun(store: AgentRunStore, outbox: TerminalOutboxRecord): Promise<JsonRecord> {
if (outbox.terminalRun !== true) return { state: "not-requested", valuesPrinted: false };
const run = await store.getRun(outbox.runId);
if (isTerminalRunStatus(run.status)) return { state: "already-terminal", runStatus: run.status, valuesPrinted: false };
const next = await store.finishRun(outbox.runId, outbox.report);
return { state: "terminal-outbox-replayed", runStatus: next.status, terminalStatus: outbox.report.terminalStatus, valuesPrinted: false };
}
function mergeTerminalOutboxObservation(observation: JsonRecord, terminalOutbox: JsonRecord): JsonRecord {
return {
...observation,
terminalOutbox,
terminalReportState: stringValue(terminalOutbox.terminalReportState) ?? stringValue(observation.terminalReportState) ?? "unknown",
runReportState: stringValue(terminalOutbox.runReportState) ?? stringValue(observation.runReportState) ?? "unknown",
evidenceLevel: terminalOutbox.state === "recovered" ? "high" : observation.evidenceLevel ?? "medium",
valuesPrinted: false,
};
}
function terminalOutboxState(observation: JsonRecord): string {
return stringValue(recordValue(observation.terminalOutbox)?.state) ?? "not-checked";
}
export function startRunnerJobReconciler(input: RunnerReconcilerLoopOptions): () => void {
const intervalMs = Math.max(1_000, input.intervalMs);
let stopped = false;
@@ -256,6 +396,158 @@ async function getK8sList(kubectlCommand: string, resource: string, namespace: s
return Array.isArray(parsed.items) ? parsed.items.filter((item) => typeof item === "object" && item !== null && !Array.isArray(item)) : [];
}
async function getRunnerJobLogs(kubectlCommand: string, namespace: string, jobName: string): Promise<{ code: number | null; signal: NodeJS.Signals | null; stdout: string; stderr: string }> {
return await kubectlRun(kubectlCommand, ["logs", `job/${jobName}`, "-n", namespace, "--tail=2000"]);
}
function findLatestTerminalOutbox(stdout: string, job: RunnerJobRecord): TerminalOutboxSearch {
let outbox: TerminalOutboxRecord | null = null;
let lineCount = 0;
let candidateCount = 0;
let parseFailedCount = 0;
for (const line of stdout.split(/\r?\n/u)) {
if (line.trim().length === 0) continue;
lineCount++;
if (!line.includes("terminal.outbox")) continue;
let parsed: unknown;
try {
parsed = JSON.parse(line);
} catch {
parseFailedCount++;
continue;
}
const candidate = terminalOutboxFromRecord(parsed);
if (!candidate) continue;
candidateCount++;
if (matchesTerminalOutbox(candidate, job)) outbox = candidate;
}
return { outbox, lineCount, candidateCount, parseFailedCount, stdoutHash: stableHash({ stdoutTail: stdout.slice(-20_000), stdoutLength: stdout.length }) };
}
function terminalOutboxFromRecord(value: unknown): TerminalOutboxRecord | null {
const record = recordValue(value);
if (!record || record.label !== "terminal.outbox") return null;
const outboxKey = stringValue(record.outboxKey);
const runId = stringValue(record.runId);
const commandId = stringValue(record.commandId);
const runnerId = stringValue(record.runnerId);
const attemptId = stringValue(record.attemptId);
const reportRecord = recordValue(record.report);
if (!outboxKey || !runId || !commandId || !runnerId || !attemptId || !reportRecord) return null;
const report = terminalReportFromRecord(reportRecord);
if (!report) return null;
const events = Array.isArray(record.events) ? record.events.map(eventFromRecord).filter((event): event is BackendEvent => event !== null) : [];
return {
outboxKey,
phase: stringValue(record.phase),
runId,
commandId,
runnerId,
runnerJobId: stringValue(record.runnerJobId),
attemptId,
report,
terminalRun: record.terminalRun === true,
events,
};
}
function terminalReportFromRecord(record: JsonRecord): CommandTerminalReport | null {
const terminalStatus = terminalStatusValue(record.terminalStatus);
const failureKind = failureKindValue(record.failureKind);
if (!terminalStatus || failureKind === undefined) return null;
const threadId = stringValue(record.threadId);
const turnId = stringValue(record.turnId);
return {
terminalStatus,
failureKind,
failureMessage: nullableString(record.failureMessage),
...(threadId ? { threadId } : {}),
...(turnId ? { turnId } : {}),
};
}
function eventFromRecord(value: unknown): BackendEvent | null {
const record = recordValue(value);
if (!record) return null;
const type = eventTypeValue(record.type);
const payload = recordValue(record.payload);
if (!type || !payload) return null;
return { type, payload };
}
function matchesTerminalOutbox(outbox: TerminalOutboxRecord, job: RunnerJobRecord): boolean {
if (outbox.runId !== job.runId || outbox.commandId !== job.commandId) return false;
if (outbox.runnerJobId && outbox.runnerJobId !== job.id) return false;
return true;
}
function eventContentKey(event: BackendEvent): string {
return stableHash({ type: event.type, payload: stripTerminalOutboxEventPayload(event.payload) });
}
function stripTerminalOutboxEventPayload(payload: JsonRecord): JsonRecord {
const next: JsonRecord = {};
for (const [key, value] of Object.entries(payload)) {
if (key === "terminalOutboxKey" || key === "terminalOutboxIndex") continue;
next[key] = value;
}
return next;
}
function recordValue(value: unknown): JsonRecord | null {
return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : null;
}
const eventTypes = new Set<string>(["backend_status", "assistant_message", "tool_call", "command_output", "diff", "error", "terminal_status"]);
const terminalStatuses = new Set<string>(["completed", "failed", "blocked", "cancelled"]);
const failureKinds = new Set<string>([
"auth-missing",
"auth-failed",
"schema-invalid",
"tenant-policy-denied",
"secret-unavailable",
"prompt-unavailable",
"prompt-too-large",
"required-skill-unavailable",
"skill-unavailable",
"runner-lease-conflict",
"backend-failed",
"backend-protocol-error",
"backend-spawn-failed",
"backend-json-parse-error",
"backend-response-invalid",
"thread-resume-failed",
"backend-timeout",
"provider-auth-failed",
"provider-rate-limited",
"provider-stream-disconnected",
"provider-http-error",
"provider-invalid-tool-call",
"provider-compact-unsupported",
"provider-unavailable",
"infra-failed",
"session-store-evicted",
"cancelled",
]);
function eventTypeValue(value: JsonValue | undefined): EventType | null {
return typeof value === "string" && eventTypes.has(value) ? value as EventType : null;
}
function terminalStatusValue(value: JsonValue | undefined): TerminalStatus | null {
return typeof value === "string" && terminalStatuses.has(value) ? value as TerminalStatus : null;
}
function failureKindValue(value: JsonValue | undefined): FailureKind | null | undefined {
if (value === null) return null;
if (typeof value === "string" && failureKinds.has(value)) return value as FailureKind;
return undefined;
}
function nullableString(value: JsonValue | undefined): string | null {
return typeof value === "string" ? value : null;
}
async function kubectlRun(kubectlCommand: string, args: string[]): Promise<{ code: number | null; signal: NodeJS.Signals | null; stdout: string; stderr: string }> {
const child = spawn(kubectlCommand, args, { stdio: ["ignore", "pipe", "pipe"] });
let stdout = "";
+127 -36
View File
@@ -6,6 +6,7 @@ import { materializeResourceBundle } from "./resource-bundle.js";
import type { BackendEvent, BackendProfile, CommandRecord, FailureKind, InitialPromptAssembly, JsonRecord, RunRecord, RunnerRecord, TerminalStatus } from "../common/types.js";
import { AgentRunError } from "../common/errors.js";
import { smokeBundledWorkReadyCapabilities, smokeImageWorkReadyCapabilities } from "../common/work-ready.js";
import { stableHash } from "../common/validation.js";
export interface RunnerOnceOptions extends BackendAdapterOptions {
managerUrl: string;
@@ -41,10 +42,25 @@ interface RunnerFailure {
details?: JsonRecord | null;
}
interface CommandTerminalReport {
terminalStatus: TerminalStatus;
failureKind: FailureKind | null;
failureMessage: string | null;
threadId?: string;
turnId?: string;
}
interface RunnerLogSink {
write(label: string, payload: JsonRecord): Promise<void>;
}
class TerminalOutboxReportError extends Error {
constructor(readonly cause: unknown) {
super(`terminal report failed after durable outbox write: ${errorMessage(cause)}`);
this.name = "TerminalOutboxReportError";
}
}
export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
const runnerLog = await createRunnerLogSink(options.logPath);
await runnerLog.write("runner.starting", { runId: options.runId, commandId: options.commandId ?? null, runnerJobId: options.runnerJobId ?? null, valuesPrinted: false });
@@ -114,7 +130,7 @@ export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
firstPoll = false;
const command = commandsResponse.selected;
if (!command) {
await failPendingSteerWithoutActiveTurn(api, options.runId, commandsResponse.items, runner.id, attemptId);
await failPendingSteerWithoutActiveTurn(api, options.runId, commandsResponse.items, runner.id, attemptId, runnerLog, options.runnerJobId);
if (options.oneShot === true) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "one-shot-no-pending");
if (Date.now() - idleSince >= idleTimeoutMs) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "idle-timeout");
await sleep(pollIntervalMs);
@@ -143,7 +159,7 @@ export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
}
const result = materializationFailure
? await reportCommandFailure(api, options.runId, command.id, runner, attemptId, materializationFailure, "runner:resource-bundle", { terminalRun: true })
? await reportCommandFailure(api, options.runId, command.id, runner, attemptId, materializationFailure, "runner:resource-bundle", runnerLog, { terminalRun: true, ...(options.runnerJobId ? { runnerJobId: options.runnerJobId } : {}) })
: await executeCommand(api, withResourceAssembly(options, resourceEnv, initialPrompt), command, runner, attemptId, workspacePath, backendSession ?? (backendSession = createBackendSession(currentRun, withResourceAssembly(options, resourceEnv, initialPrompt))), runnerLog);
commandResults.push(result);
idleSince = Date.now();
@@ -227,7 +243,7 @@ function pathDelimiter(): string {
async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, command: CommandRecord, runner: RunnerRecord, attemptId: string, workspacePath: string | undefined, backendSession: BackendSession | null, runnerLog: RunnerLogSink): Promise<CommandExecutionResult> {
await api.ackCommand(command.id);
const acked = await api.getCommand(options.runId, command.id);
if (acked.state === "cancelled") return await reportCancelled(api, options.runId, command.id, runner, attemptId, "command cancelled before backend start");
if (acked.state === "cancelled") return await reportCancelled(api, options.runId, command.id, runner, attemptId, "command cancelled before backend start", runnerLog, options.runnerJobId);
await assertNotCancelled(api, options.runId, command.id);
await api.appendEvent(options.runId, { type: "backend_status", payload: { phase: "backend-turn-started", commandId: command.id, attemptId, runnerId: runner.id, backendProfile: options.backendProfile ?? null, workspaceReady: Boolean(workspacePath) } });
await runnerLog.write("command.started", { runId: options.runId, commandId: command.id, runnerId: runner.id, runnerJobId: options.runnerJobId ?? null, attemptId, backendProfile: options.backendProfile ?? null, valuesPrinted: false });
@@ -235,6 +251,7 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions,
const stopCancelWatch = watchCancellation(api, options.runId, command.id, abortController);
const backendProgress = startBackendProgress();
let stopSteerWatch: (() => void) | undefined;
const terminalOutboxEvents: BackendEvent[] = [];
try {
const latestRun = await api.getRun(options.runId);
const backendOptions = {
@@ -254,10 +271,14 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions,
},
onEvent: async (event: BackendEvent) => {
await runnerLog.write("backend.event", runnerLogEventSummary(event, options.runId, command.id, attemptId, runner.id, options.runnerJobId ?? null));
if (shouldKeepTerminalOutboxEvent(event)) {
terminalOutboxEvents.push(event);
return;
}
await appendBestEffort(api, options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id));
},
onActiveTurn: (control: BackendActiveTurnControl) => {
stopSteerWatch = startActiveTurnCommandWatch(api, options.runId, command.id, attemptId, runner.id, control, options.pollIntervalMs);
stopSteerWatch = startActiveTurnCommandWatch(api, options.runId, command.id, attemptId, runner.id, control, options.pollIntervalMs, runnerLog, options.runnerJobId);
return () => {
stopSteerWatch?.();
stopSteerWatch = undefined;
@@ -265,16 +286,26 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions,
},
};
const result = backendSession ? await backendSession.runTurn(latestRun, command, backendOptions) : await runBackendTurn(latestRun, command, backendOptions);
for (const event of result.events) {
await api.appendEvent(options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id));
}
await api.reportCommandStatus(command.id, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage, ...(result.threadId ? { threadId: result.threadId } : {}), ...(result.turnId ? { turnId: result.turnId } : {}) });
for (const event of result.events) if (shouldKeepTerminalOutboxEvent(event)) terminalOutboxEvents.push(event);
const report: CommandTerminalReport = { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage, ...(result.threadId ? { threadId: result.threadId } : {}), ...(result.turnId ? { turnId: result.turnId } : {}) };
await reportTerminalCommand(api, {
runId: options.runId,
commandId: command.id,
runnerId: runner.id,
runnerJobId: options.runnerJobId ?? null,
attemptId,
report,
events: terminalOutboxEvents,
phase: "runner:execute",
runnerLog,
});
await runnerLog.write("command.terminal", { runId: options.runId, commandId: command.id, runnerId: runner.id, runnerJobId: options.runnerJobId ?? null, attemptId, terminalStatus: result.terminalStatus, failureKind: result.failureKind, valuesPrinted: false });
return { commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind } as CommandExecutionResult;
} catch (error) {
if (error instanceof TerminalOutboxReportError) throw error;
const failureKind = failureKindFromError(error);
const failure = { failureKind, terminalStatus: terminalStatusForFailure(failureKind), message: errorMessage(error) };
return await reportCommandFailure(api, options.runId, command.id, runner, attemptId, failure, "runner:execute");
return await reportCommandFailure(api, options.runId, command.id, runner, attemptId, failure, "runner:execute", runnerLog, { ...(options.runnerJobId ? { runnerJobId: options.runnerJobId } : {}), events: terminalOutboxEvents });
} finally {
stopSteerWatch?.();
const progressSummary = backendProgress.stop();
@@ -283,7 +314,7 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions,
}
}
function startActiveTurnCommandWatch(api: RunnerManagerApi, runId: string, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl, pollIntervalMs: number | undefined): () => void {
function startActiveTurnCommandWatch(api: RunnerManagerApi, runId: string, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl, pollIntervalMs: number | undefined, runnerLog: RunnerLogSink, runnerJobId: string | undefined): () => void {
let stopped = false;
let polling = false;
const seen = new Set<string>();
@@ -296,8 +327,8 @@ function startActiveTurnCommandWatch(api: RunnerManagerApi, runId: string, targe
const pendingControlCommands = commands.filter((item) => (item.type === "steer" || item.type === "interrupt") && item.state === "pending" && !seen.has(item.id));
for (const controlCommand of pendingControlCommands) {
seen.add(controlCommand.id);
if (controlCommand.type === "interrupt") await handleInterruptCommand(api, runId, controlCommand, targetCommandId, attemptId, runnerId, control);
else await handleSteerCommand(api, runId, controlCommand, targetCommandId, attemptId, runnerId, control);
if (controlCommand.type === "interrupt") await handleInterruptCommand(api, runId, controlCommand, targetCommandId, attemptId, runnerId, control, runnerLog, runnerJobId);
else await handleSteerCommand(api, runId, controlCommand, targetCommandId, attemptId, runnerId, control, runnerLog, runnerJobId);
}
} catch {
// The active backend turn remains authoritative; missed control commands stay pending for the next poll.
@@ -313,32 +344,32 @@ function startActiveTurnCommandWatch(api: RunnerManagerApi, runId: string, targe
};
}
async function handleSteerCommand(api: RunnerManagerApi, runId: string, command: CommandRecord, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl): Promise<void> {
async function handleSteerCommand(api: RunnerManagerApi, runId: string, command: CommandRecord, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl, runnerLog: RunnerLogSink, runnerJobId: string | undefined): Promise<void> {
const acked = await api.ackCommand(command.id);
if (acked.state === "cancelled") {
await api.reportCommandStatus(command.id, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: "steer command cancelled before delivery", threadId: control.threadId, turnId: control.turnId });
await reportTerminalCommand(api, { runId, commandId: command.id, runnerId, runnerJobId: runnerJobId ?? null, attemptId, report: { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: "steer command cancelled before delivery", threadId: control.threadId, turnId: control.turnId }, phase: "runner:steer:cancelled", runnerLog });
return;
}
const prompt = steerPrompt(command.payload);
if (!prompt) {
await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: "blocked", failureKind: "schema-invalid", message: "steer command payload requires a non-empty prompt, message, or text" }, "runner:steer:payload", control);
await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: "blocked", failureKind: "schema-invalid", message: "steer command payload requires a non-empty prompt, message, or text" }, "runner:steer:payload", runnerLog, runnerJobId, control);
return;
}
await appendBestEffort(api, runId, { type: "backend_status", payload: { phase: "steer-command-acknowledged", commandId: command.id, commandType: "steer", targetCommandId, deliveryState: "acknowledged-by-runner", backendAccepted: false, targetEffect: "not-yet-observed", attemptId, runnerId, threadId: control.threadId, turnId: control.turnId } });
try {
await control.steer(prompt);
await appendBestEffort(api, runId, { type: "backend_status", payload: { phase: "turn/steer:completed", commandId: command.id, commandType: "steer", targetCommandId, deliveryState: "forwarded-to-backend", backendAccepted: true, targetEffect: "not-guaranteed", semantics: "turn/steer RPC returned; target turn liveness must be read from the target command result", attemptId, runnerId, threadId: control.threadId, turnId: control.turnId } });
await api.reportCommandStatus(command.id, { terminalStatus: "completed", failureKind: null, failureMessage: null, threadId: control.threadId, turnId: control.turnId });
await reportTerminalCommand(api, { runId, commandId: command.id, runnerId, runnerJobId: runnerJobId ?? null, attemptId, report: { terminalStatus: "completed", failureKind: null, failureMessage: null, threadId: control.threadId, turnId: control.turnId }, phase: "runner:steer", runnerLog });
} catch (error) {
const failureKind = failureKindFromError(error);
await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: terminalStatusForFailure(failureKind), failureKind, message: errorMessage(error) }, "runner:steer", control);
await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: terminalStatusForFailure(failureKind), failureKind, message: errorMessage(error) }, "runner:steer", runnerLog, runnerJobId, control);
}
}
async function handleInterruptCommand(api: RunnerManagerApi, runId: string, command: CommandRecord, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl): Promise<void> {
async function handleInterruptCommand(api: RunnerManagerApi, runId: string, command: CommandRecord, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl, runnerLog: RunnerLogSink, runnerJobId: string | undefined): Promise<void> {
const acked = await api.ackCommand(command.id);
if (acked.state === "cancelled") {
await api.reportCommandStatus(command.id, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: "interrupt command cancelled before delivery", threadId: control.threadId, turnId: control.turnId });
await reportTerminalCommand(api, { runId, commandId: command.id, runnerId, runnerJobId: runnerJobId ?? null, attemptId, report: { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: "interrupt command cancelled before delivery", threadId: control.threadId, turnId: control.turnId }, phase: "runner:interrupt:cancelled", runnerLog });
return;
}
const reason = interruptReason(command.payload);
@@ -346,24 +377,24 @@ async function handleInterruptCommand(api: RunnerManagerApi, runId: string, comm
try {
await control.interrupt();
await appendBestEffort(api, runId, { type: "backend_status", payload: { phase: "turn/interrupt:completed", commandId: command.id, commandType: "interrupt", targetCommandId, attemptId, runnerId, threadId: control.threadId, turnId: control.turnId, reason } });
await api.reportCommandStatus(command.id, { terminalStatus: "completed", failureKind: null, failureMessage: null, threadId: control.threadId, turnId: control.turnId });
await reportTerminalCommand(api, { runId, commandId: command.id, runnerId, runnerJobId: runnerJobId ?? null, attemptId, report: { terminalStatus: "completed", failureKind: null, failureMessage: null, threadId: control.threadId, turnId: control.turnId }, phase: "runner:interrupt", runnerLog });
} catch (error) {
const failureKind = failureKindFromError(error);
await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: terminalStatusForFailure(failureKind), failureKind, message: errorMessage(error) }, "runner:interrupt", control);
await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: terminalStatusForFailure(failureKind), failureKind, message: errorMessage(error) }, "runner:interrupt", runnerLog, runnerJobId, control);
}
}
async function failPendingSteerWithoutActiveTurn(api: RunnerManagerApi, runId: string, commands: CommandRecord[], runnerId: string, attemptId: string): Promise<void> {
async function failPendingSteerWithoutActiveTurn(api: RunnerManagerApi, runId: string, commands: CommandRecord[], runnerId: string, attemptId: string, runnerLog: RunnerLogSink, runnerJobId: string | undefined): Promise<void> {
for (const command of commands.filter((item) => (item.type === "steer" || item.type === "interrupt") && item.state === "pending")) {
const acked = await api.ackCommand(command.id);
if (acked.state === "cancelled") continue;
await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: "blocked", failureKind: "schema-invalid", message: `${command.type} command requires an active turn` }, `runner:${command.type}:no-active-turn`);
await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: "blocked", failureKind: "schema-invalid", message: `${command.type} command requires an active turn` }, `runner:${command.type}:no-active-turn`, runnerLog, runnerJobId);
}
}
async function reportNonTerminalCommandFailure(api: RunnerManagerApi, runId: string, commandId: string, attemptId: string, runnerId: string, failure: { terminalStatus: TerminalStatus; failureKind: FailureKind; message: string }, phase: string, control?: BackendActiveTurnControl): Promise<void> {
await appendBestEffort(api, runId, { type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase, commandId, attemptId, runnerId, ...(control ? { threadId: control.threadId, turnId: control.turnId } : {}) } });
await api.reportCommandStatus(commandId, { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, failureMessage: failure.message, ...(control ? { threadId: control.threadId, turnId: control.turnId } : {}) });
async function reportNonTerminalCommandFailure(api: RunnerManagerApi, runId: string, commandId: string, attemptId: string, runnerId: string, failure: { terminalStatus: TerminalStatus; failureKind: FailureKind; message: string }, phase: string, runnerLog: RunnerLogSink, runnerJobId: string | undefined, control?: BackendActiveTurnControl): Promise<void> {
const event: BackendEvent = { type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase, commandId, attemptId, runnerId, ...(control ? { threadId: control.threadId, turnId: control.turnId } : {}) } };
await reportTerminalCommand(api, { runId, commandId, runnerId, runnerJobId: runnerJobId ?? null, attemptId, report: { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, failureMessage: failure.message, ...(control ? { threadId: control.threadId, turnId: control.turnId } : {}) }, events: [event], phase, runnerLog });
}
function failureDetailsFromError(error: unknown): JsonRecord | null {
@@ -529,20 +560,80 @@ function annotateCommandEvent(event: BackendEvent, commandId: string, attemptId:
return { ...event, payload: { ...event.payload, commandId, attemptId, runnerId } };
}
async function reportCommandFailure(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, attemptId: string, failure: RunnerFailure, phase: string, options: { terminalRun?: boolean } = {}): Promise<CommandExecutionResult> {
async function reportTerminalCommand(api: RunnerManagerApi, input: { runId: string; commandId: string; runnerId: string; runnerJobId: string | null; attemptId: string; report: CommandTerminalReport; events?: BackendEvent[]; phase: string; runnerLog: RunnerLogSink; terminalRun?: boolean }): Promise<void> {
const outboxKey = terminalOutboxKey(input);
const events = (input.events ?? []).map((event, index) => annotateTerminalOutboxEvent(event, input.commandId, input.attemptId, input.runnerId, outboxKey, index));
const outboxPayload: JsonRecord = {
schema: "agentrun-terminal-outbox-v1",
outboxKey,
phase: input.phase,
runId: input.runId,
commandId: input.commandId,
runnerId: input.runnerId,
runnerJobId: input.runnerJobId,
attemptId: input.attemptId,
report: terminalReportJson(input.report),
terminalRun: input.terminalRun === true,
events: events.map((event) => ({ type: event.type, payload: event.payload })),
eventCount: events.length,
valuesPrinted: false,
};
await writeTerminalOutbox(input.runnerLog, outboxPayload);
for (const event of events) await appendBestEffort(api, input.runId, event);
try {
await api.reportCommandStatus(input.commandId, input.report);
if (input.terminalRun === true) await api.reportStatus(input.runId, input.report);
} catch (error) {
throw new TerminalOutboxReportError(error);
}
await input.runnerLog.write("terminal.outbox.ack", { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, runnerJobId: input.runnerJobId, attemptId: input.attemptId, outboxKey, valuesPrinted: false });
}
function terminalOutboxKey(input: { runId: string; commandId: string; runnerId: string; attemptId: string; phase: string; report: CommandTerminalReport; terminalRun?: boolean }): string {
return stableHash({ schema: "agentrun-terminal-outbox-v1", runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, attemptId: input.attemptId, phase: input.phase, terminalRun: input.terminalRun === true, report: terminalReportJson(input.report) });
}
function terminalReportJson(report: CommandTerminalReport): JsonRecord {
return { terminalStatus: report.terminalStatus, failureKind: report.failureKind, failureMessage: report.failureMessage, threadId: report.threadId ?? null, turnId: report.turnId ?? null };
}
function annotateTerminalOutboxEvent(event: BackendEvent, commandId: string, attemptId: string, runnerId: string, outboxKey: string, index: number): BackendEvent {
const annotated = annotateCommandEvent(event, commandId, attemptId, runnerId);
return { ...annotated, payload: { ...annotated.payload, terminalOutboxKey: outboxKey, terminalOutboxIndex: index } };
}
async function writeTerminalOutbox(runnerLog: RunnerLogSink, payload: JsonRecord): Promise<void> {
await runnerLog.write("terminal.outbox", payload);
try {
process.stdout.write(JSON.stringify({ at: new Date().toISOString(), label: "terminal.outbox", ...payload, valuesPrinted: false }) + "\n");
} catch {
// Kubernetes stdout is the recovery path; if it is unavailable, the file log remains diagnostic evidence.
}
}
function shouldKeepTerminalOutboxEvent(event: BackendEvent): boolean {
if (event.type === "terminal_status") return true;
if (event.type !== "assistant_message") return false;
return event.payload.final === true || typeof event.payload.text === "string";
}
async function reportCommandFailure(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, attemptId: string, failure: RunnerFailure, phase: string, runnerLog: RunnerLogSink, options: { terminalRun?: boolean; runnerJobId?: string; events?: BackendEvent[] } = {}): Promise<CommandExecutionResult> {
const details = failure.details ? { details: failure.details } : {};
await api.appendEvent(runId, { type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase, commandId, attemptId, runnerId: runner.id, ...details } });
await api.appendEvent(runId, { type: "terminal_status", payload: { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, message: failure.message, commandId, attemptId, runnerId: runner.id, ...details } });
await api.reportCommandStatus(commandId, { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, failureMessage: failure.message });
if (options.terminalRun === true) await api.reportStatus(runId, { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, failureMessage: failure.message });
const events: BackendEvent[] = [
...(options.events ?? []),
{ type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase, commandId, attemptId, runnerId: runner.id, ...details } },
{ type: "terminal_status", payload: { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, message: failure.message, commandId, attemptId, runnerId: runner.id, ...details } },
];
await reportTerminalCommand(api, { runId, commandId, runnerId: runner.id, runnerJobId: options.runnerJobId ?? null, attemptId, report: { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, failureMessage: failure.message }, events, phase, runnerLog, terminalRun: options.terminalRun === true });
return { commandId, terminalStatus: failure.terminalStatus, failureKind: failure.failureKind } as CommandExecutionResult;
}
async function reportCancelled(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, attemptId: string, message: string): Promise<CommandExecutionResult> {
await api.reportCommandStatus(commandId, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message });
await api.reportStatus(runId, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message });
await api.appendEvent(runId, { type: "backend_status", payload: { phase: "turn-cancelled", commandId, attemptId, runnerId: runner.id, failureKind: "cancelled", message } });
await api.appendEvent(runId, { type: "terminal_status", payload: { terminalStatus: "cancelled", failureKind: "cancelled", message, commandId, attemptId, runnerId: runner.id } });
async function reportCancelled(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, attemptId: string, message: string, runnerLog: RunnerLogSink, runnerJobId: string | undefined): Promise<CommandExecutionResult> {
const events: BackendEvent[] = [
{ type: "backend_status", payload: { phase: "turn-cancelled", commandId, attemptId, runnerId: runner.id, failureKind: "cancelled", message } },
{ type: "terminal_status", payload: { terminalStatus: "cancelled", failureKind: "cancelled", message, commandId, attemptId, runnerId: runner.id } },
];
await reportTerminalCommand(api, { runId, commandId, runnerId: runner.id, runnerJobId: runnerJobId ?? null, attemptId, report: { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message }, events, phase: "runner:cancelled", runnerLog, terminalRun: true });
return { commandId, terminalStatus: "cancelled", failureKind: "cancelled" };
}