239 lines
14 KiB
TypeScript
239 lines
14 KiB
TypeScript
import { RunnerManagerApi, failureKindFromError, terminalStatusForFailure, errorMessage } from "./manager-api.js";
|
|
import { createBackendSession, runBackendTurn, type BackendAdapterOptions, type BackendSession } from "../backend/adapter.js";
|
|
import { materializeResourceBundle } from "./resource-bundle.js";
|
|
import type { BackendEvent, BackendProfile, CommandRecord, FailureKind, JsonRecord, RunRecord, RunnerRecord, TerminalStatus } from "../common/types.js";
|
|
import { AgentRunError } from "../common/errors.js";
|
|
|
|
/**
 * Options accepted by `runOnce`.
 *
 * Extends the backend adapter options; the whole object is also forwarded to
 * the backend session/turn calls.
 */
export interface RunnerOnceOptions extends BackendAdapterOptions {
  /** Value handed to the `RunnerManagerApi` constructor. */
  managerUrl: string;
  /** Id of the run to register against, claim and process. */
  runId: string;
  /** When set, forwarded to the first command poll only. */
  commandId?: string;
  /** When set, forwarded to runner registration. */
  runnerId?: string;
  /** Attempt id; when omitted, `attempt_<base36 timestamp>` is generated. */
  attemptId?: string;
  /** Lease duration in milliseconds used for claim and heartbeats; defaults to 60_000. */
  leaseMs?: number;
  /** When set, must equal the run's `backendProfile`, otherwise `runOnce` throws. */
  backendProfile?: BackendProfile;
  /** Placement reported at registration; defaults to "host-process". */
  placement?: "host-process" | "kubernetes-job";
  /** Source commit reported at registration; falls back to `AGENTRUN_SOURCE_COMMIT`, then "unknown". */
  sourceCommit?: string;
  /** When set, forwarded to runner registration. */
  jobName?: string;
  /** When set, forwarded to runner registration. */
  podName?: string;
  /** When set, forwarded to runner registration. */
  logPath?: string;
  /** How long the loop may see no selectable command before stopping; defaults to 120_000 ms. */
  idleTimeoutMs?: number;
  /** Delay between empty polls; defaults to 250 ms and is clamped to the range 50..2000 ms. */
  pollIntervalMs?: number;
  /** When exactly `true`, stop after the first command (or immediately if none is pending). */
  oneShot?: boolean;
}
|
|
|
|
/** Outcome recorded for a single processed command. */
interface CommandExecutionResult extends JsonRecord {
  /** Id of the command this outcome belongs to. */
  commandId: string;
  /** Terminal status that was reported for the command. */
  terminalStatus: TerminalStatus;
  /** Failure kind that was reported for the command, or `null` when none. */
  failureKind: FailureKind | null;
}
|
|
|
|
/**
 * Registers a runner for `options.runId`, claims the run and processes its
 * commands until the run is terminal, the loop has been idle for
 * `idleTimeoutMs`, or (with `oneShot`) a single command has been handled.
 *
 * Result shapes:
 * - run already terminal before registration: `{ terminalStatus, failureKind, run, skipped: "run-terminal" }`
 * - run observed terminal inside the loop: `{ ..., stopped: "run-terminal" }`
 * - nothing left to do: see `noPendingResult` (`stopped` is "one-shot-no-pending" or "idle-timeout")
 * - one-shot command handled: `{ ..., stopped: "one-shot" }`
 * - error inside the loop: the failure is appended as an event, reported as the
 *   run status, and returned instead of thrown
 *
 * @param options Runner configuration; see `RunnerOnceOptions`.
 * @returns A JSON-serialisable summary of what the runner did.
 * @throws AgentRunError ("schema-invalid") when `options.backendProfile` is set
 *   and differs from the run's profile.
 * @throws Whatever claim or the initial heartbeat threw; unless the failure
 *   kind is "runner-lease-conflict" it is first reported to the manager.
 */
export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
  const api = new RunnerManagerApi(options.managerUrl);
  const targetRun = await api.getRun(options.runId);
  if (isTerminalRun(targetRun)) {
    return { terminalStatus: targetRun.terminalStatus, failureKind: targetRun.failureKind, run: targetRun, skipped: "run-terminal" } as JsonRecord;
  }
  if (options.backendProfile && options.backendProfile !== targetRun.backendProfile) {
    throw new AgentRunError(
      "schema-invalid",
      `runner backendProfile ${options.backendProfile} does not match run backendProfile ${targetRun.backendProfile}`,
      { httpStatus: 400 },
    );
  }

  const leaseMs = options.leaseMs ?? 60_000;
  const attemptId = options.attemptId ?? `attempt_${Date.now().toString(36)}`;
  const runner = await api.register({
    runId: options.runId,
    attemptId,
    backendProfile: targetRun.backendProfile,
    placement: options.placement ?? "host-process",
    sourceCommit: options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown",
    // Optional identity fields are only sent when they carry a value.
    ...(options.runnerId ? { runnerId: options.runnerId } : {}),
    ...(options.jobName ? { jobName: options.jobName } : {}),
    ...(options.podName ? { podName: options.podName } : {}),
    ...(options.logPath ? { logPath: options.logPath } : {}),
  }) as RunnerRecord;

  let claimed: RunRecord;
  try {
    claimed = await api.claim(options.runId, runner.id, leaseMs);
    await api.heartbeat(options.runId, runner.id, leaseMs);
  } catch (error) {
    const failureKind = failureKindFromError(error);
    // A lease conflict is not reported as a run failure; everything else is.
    if (failureKind !== "runner-lease-conflict") {
      try {
        await api.reportFailure(options.runId, { terminalStatus: terminalStatusForFailure(failureKind), failureKind, failureMessage: errorMessage(error) });
      } catch {
        // Reporting is best-effort: the caller must still see the original claim/heartbeat error.
      }
    }
    throw error;
  }

  const stopHeartbeat = startHeartbeat(api, options.runId, runner.id, leaseMs);
  const idleTimeoutMs = options.idleTimeoutMs ?? 120_000;
  const pollIntervalMs = normalizePollIntervalMs(options.pollIntervalMs);
  const commandResults: CommandExecutionResult[] = [];
  let workspacePath: string | undefined;
  let materializationAttempted = false;
  let materializationFailure: { failureKind: FailureKind; terminalStatus: TerminalStatus; message: string } | null = null;
  let backendSession: BackendSession | null = null;

  try {
    let idleSince = Date.now();
    let firstPoll = true;
    while (true) {
      const currentRun = await api.getRun(options.runId);
      if (isTerminalRun(currentRun)) {
        return { runner, claimed, terminalStatus: currentRun.terminalStatus, failureKind: currentRun.failureKind, run: currentRun, commandsProcessed: commandResults.length, commandResults, stopped: "run-terminal" } as JsonRecord;
      }

      // The explicit command id is only requested on the very first poll.
      const commandsResponse = await api.pollCommands(options.runId, { afterSeq: 0, limit: 20, ...(firstPoll && options.commandId ? { commandId: options.commandId } : {}) });
      firstPoll = false;
      const command = commandsResponse.selected;
      if (!command) {
        if (options.oneShot === true) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "one-shot-no-pending");
        if (Date.now() - idleSince >= idleTimeoutMs) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "idle-timeout");
        await sleep(pollIntervalMs);
        continue;
      }

      // The resource bundle is materialized at most once, lazily, when the first command arrives.
      if (!materializationAttempted) {
        materializationAttempted = true;
        try {
          const materialized = await materializeResourceBundle(claimed.resourceBundleRef ?? null, options.env ?? process.env);
          if (materialized) {
            workspacePath = materialized.workspacePath;
            await api.appendEvent(options.runId, { type: "backend_status", payload: { ...materialized.event, commandId: command.id, attemptId, runnerId: runner.id } });
          }
        } catch (error) {
          const failureKind = failureKindFromError(error);
          materializationFailure = { failureKind, terminalStatus: terminalStatusForFailure(failureKind), message: errorMessage(error) };
        }
      }

      let result: CommandExecutionResult;
      if (materializationFailure) {
        // A failed materialization is remembered and fails every command that reaches this point.
        result = await reportCommandFailure(api, options.runId, command.id, runner, attemptId, materializationFailure, "runner:resource-bundle");
      } else {
        // The backend session is created on first use and reused for later commands.
        backendSession ??= createBackendSession(currentRun, options);
        result = await executeCommand(api, options, command, runner, attemptId, workspacePath, backendSession);
      }
      commandResults.push(result);

      if (options.oneShot === true) {
        const run = await api.reportStatus(options.runId, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: null });
        return { runner, commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind, run, commandsProcessed: commandResults.length, commandResults, stopped: "one-shot" } as JsonRecord;
      }

      // Restart the idle clock only once the command has finished, so time spent
      // executing a long command is not counted as idle time.
      idleSince = Date.now();
    }
  } catch (error) {
    const failureKind = failureKindFromError(error);
    const terminalStatus = terminalStatusForFailure(failureKind);
    const message = errorMessage(error);
    await api.appendEvent(options.runId, { type: "error", payload: { failureKind, message, phase: "runner:loop", attemptId, runnerId: runner.id } });
    const finalRun = await api.reportStatus(options.runId, { terminalStatus, failureKind, failureMessage: message }) as RunRecord;
    return { runner, terminalStatus, failureKind, run: finalRun, commandsProcessed: commandResults.length, commandResults } as JsonRecord;
  } finally {
    if (backendSession) {
      try {
        const closeEvents = await backendSession.close();
        for (const event of closeEvents) await api.appendEvent(options.runId, annotateCommandEvent(event, "runner-shutdown", attemptId, runner.id));
      } catch {
        // Runner shutdown must not hide the terminal result already reported for the command.
      }
    }
    stopHeartbeat();
  }
}
|
|
|
|
/**
 * Acknowledges one command, runs it as a backend turn and reports the outcome.
 *
 * Steps: ack the command, re-read it (a command already cancelled is reported
 * via `reportCancelled` without starting the backend), assert that neither run
 * nor command is cancelled, then run the turn while a cancellation watcher can
 * abort it. Backend events are appended with command/attempt/runner ids, and
 * the command status is reported. Errors raised while running the turn or
 * reporting its result are converted into a reported command failure.
 *
 * @param backendSession Session to run the turn on; when `null` the turn is run via `runBackendTurn`.
 * @returns The terminal status and failure kind recorded for the command.
 * @throws Errors from the ack, the re-read and `assertNotCancelled` propagate to
 *   the caller, because they happen before the guarded section.
 */
async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, command: CommandRecord, runner: RunnerRecord, attemptId: string, workspacePath: string | undefined, backendSession: BackendSession | null): Promise<CommandExecutionResult> {
  const runId = options.runId;
  const commandId = command.id;

  await api.ackCommand(commandId);
  const acked = await api.getCommand(runId, commandId);
  if (acked.state === "cancelled") {
    return await reportCancelled(api, runId, commandId, runner, attemptId, "command cancelled before backend start");
  }
  await assertNotCancelled(api, runId, commandId);

  const abortController = new AbortController();
  const stopCancelWatch = watchCancellation(api, runId, commandId, abortController);
  try {
    const latestRun = await api.getRun(runId);
    // The workspace path is only passed on when one was materialized.
    const backendOptions = { ...options, ...(workspacePath ? { workspacePath } : {}), abortSignal: abortController.signal };
    const result = backendSession
      ? await backendSession.runTurn(latestRun, command, backendOptions)
      : await runBackendTurn(latestRun, command, backendOptions);
    for (const event of result.events) {
      await api.appendEvent(runId, annotateCommandEvent(event, commandId, attemptId, runner.id));
    }
    await api.reportCommandStatus(commandId, {
      terminalStatus: result.terminalStatus,
      failureKind: result.failureKind,
      failureMessage: result.failureMessage,
      ...(result.threadId ? { threadId: result.threadId } : {}),
      ...(result.turnId ? { turnId: result.turnId } : {}),
    });
    return { commandId, terminalStatus: result.terminalStatus, failureKind: result.failureKind } as CommandExecutionResult;
  } catch (error) {
    const failureKind = failureKindFromError(error);
    const failure = { failureKind, terminalStatus: terminalStatusForFailure(failureKind), message: errorMessage(error) };
    return await reportCommandFailure(api, runId, commandId, runner, attemptId, failure, "runner:execute");
  } finally {
    stopCancelWatch();
  }
}
|
|
|
|
/**
 * Tells whether a run has reached one of the statuses this runner treats as
 * final: "completed", "failed", "blocked" or "cancelled".
 *
 * @param run Run record whose `status` is inspected.
 * @returns `true` for the four final statuses, `false` for anything else.
 */
function isTerminalRun(run: RunRecord): boolean {
  switch (run.status) {
    case "completed":
    case "failed":
    case "blocked":
    case "cancelled":
      return true;
    default:
      return false;
  }
}
|
|
|
|
/**
 * Fetches the run and the command concurrently and throws when either one is
 * cancelled.
 *
 * @throws AgentRunError ("cancelled", httpStatus 409) when the run's status or
 *   the command's state is "cancelled".
 */
async function assertNotCancelled(api: RunnerManagerApi, runId: string, commandId: string): Promise<void> {
  const [run, command] = await Promise.all([api.getRun(runId), api.getCommand(runId, commandId)]);
  const cancelled = run.status === "cancelled" || command.state === "cancelled";
  if (cancelled) {
    throw new AgentRunError("cancelled", "run or command was cancelled", { httpStatus: 409 });
  }
}
|
|
|
|
/**
 * Polls the run and the command, once immediately and then every 2 seconds,
 * and aborts `controller` as soon as either is cancelled.
 *
 * Polling errors are ignored. A check that is still in flight when the watcher
 * is stopped no longer aborts the controller afterwards.
 *
 * @returns A function that stops the watcher and clears its timer.
 */
function watchCancellation(api: RunnerManagerApi, runId: string, commandId: string, controller: AbortController): () => void {
  let stopped = false;

  const check = async (): Promise<void> => {
    if (stopped || controller.signal.aborted) return;
    try {
      const [run, command] = await Promise.all([api.getRun(runId), api.getCommand(runId, commandId)]);
      // The watcher may have been stopped while the requests were in flight.
      if (stopped) return;
      if (run.status === "cancelled" || command.state === "cancelled") controller.abort();
    } catch {
      // Cancellation polling must not hide the backend's own terminal result.
    }
  };

  const timer = setInterval(() => {
    void check();
  }, 2_000);
  void check();

  return () => {
    stopped = true;
    clearInterval(timer);
  };
}
|
|
|
|
/**
 * Sends a heartbeat for the runner on a timer until the returned function is
 * called.
 *
 * The interval is one third of `leaseMs` (rounded down) but never below
 * 1000 ms. A non-finite `leaseMs` falls back to 1000 ms, because a NaN delay
 * would otherwise be coerced by the timer into a 1 ms interval. Heartbeat
 * errors are ignored here.
 *
 * @returns A function that stops further heartbeats and clears the timer.
 */
function startHeartbeat(api: RunnerManagerApi, runId: string, runnerId: string, leaseMs: number): () => void {
  const minIntervalMs = 1_000;
  const intervalMs = Number.isFinite(leaseMs) ? Math.max(minIntervalMs, Math.floor(leaseMs / 3)) : minIntervalMs;
  let stopped = false;

  const beat = async (): Promise<void> => {
    if (stopped) return;
    try {
      await api.heartbeat(runId, runnerId, leaseMs);
    } catch {
      // The next manager call will surface lease or run-terminal details.
    }
  };

  const timer = setInterval(() => {
    void beat();
  }, intervalMs);

  return () => {
    stopped = true;
    clearInterval(timer);
  };
}
|
|
|
|
/**
 * Returns a copy of `event` whose payload additionally carries `commandId`,
 * `attemptId` and `runnerId`. Existing payload keys with those names are
 * overwritten; the input event is not modified.
 */
function annotateCommandEvent(event: BackendEvent, commandId: string, attemptId: string, runnerId: string): BackendEvent {
  const payload = { ...event.payload, commandId, attemptId, runnerId };
  return { ...event, payload };
}
|
|
|
|
/**
 * Records a failed command with the manager.
 *
 * In order: appends an "error" event (with `phase`), appends a
 * "terminal_status" event, reports the command status, then reports the run
 * status — all carrying the same terminal status, failure kind and message.
 *
 * @param phase Label stored in the error event to say where the failure happened.
 * @returns The command id with the reported terminal status and failure kind.
 */
async function reportCommandFailure(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, attemptId: string, failure: { terminalStatus: TerminalStatus; failureKind: FailureKind; message: string }, phase: string): Promise<CommandExecutionResult> {
  const { terminalStatus, failureKind, message } = failure;
  const ids = { commandId, attemptId, runnerId: runner.id };
  const status = { terminalStatus, failureKind, failureMessage: message };

  await api.appendEvent(runId, { type: "error", payload: { failureKind, message, phase, ...ids } });
  await api.appendEvent(runId, { type: "terminal_status", payload: { terminalStatus, failureKind, message, ...ids } });
  await api.reportCommandStatus(commandId, status);
  await api.reportStatus(runId, { ...status });

  return { commandId, terminalStatus, failureKind } as CommandExecutionResult;
}
|
|
|
|
/**
 * Records a cancelled command with the manager.
 *
 * In order: reports the command status, reports the run status, appends a
 * "backend_status" event with phase "turn-cancelled", then appends a
 * "terminal_status" event — all with terminal status and failure kind
 * "cancelled" and the given message.
 *
 * @returns The command id with terminal status and failure kind "cancelled".
 */
async function reportCancelled(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, attemptId: string, message: string): Promise<CommandExecutionResult> {
  const ids = { commandId, attemptId, runnerId: runner.id };

  await api.reportCommandStatus(commandId, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message });
  await api.reportStatus(runId, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message });
  await api.appendEvent(runId, { type: "backend_status", payload: { phase: "turn-cancelled", ...ids, failureKind: "cancelled", message } });
  await api.appendEvent(runId, { type: "terminal_status", payload: { terminalStatus: "cancelled", failureKind: "cancelled", message, ...ids } });

  return { commandId, terminalStatus: "cancelled", failureKind: "cancelled" };
}
|
|
|
|
/**
 * Builds the summary returned when the loop stops because no command is
 * selectable.
 *
 * If at least one command was processed, the last command's terminal status
 * and failure kind are used. Otherwise the summary carries terminal status
 * "blocked" with failure kind "schema-invalid" and zero processed commands.
 *
 * @param polledCommands Number of items returned by the last poll.
 * @param stopped Reason label stored in the summary.
 */
function noPendingResult(runner: RunnerRecord, claimed: RunRecord, commandResults: CommandExecutionResult[], polledCommands: number, stopped: string): JsonRecord {
  const last = commandResults.length > 0 ? commandResults[commandResults.length - 1] : undefined;
  if (last) {
    return { runner, claimed, terminalStatus: last.terminalStatus, failureKind: last.failureKind, commandsProcessed: commandResults.length, commandResults, polledCommands, stopped };
  }
  return { runner, claimed, terminalStatus: "blocked", failureKind: "schema-invalid", commandsProcessed: 0, commandResults, polledCommands, stopped };
}
|
|
|
|
/**
 * Returns a promise that resolves after `ms` milliseconds.
 *
 * @param ms Delay handed to `setTimeout`.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });
}
|
|
|
|
/**
 * Turns the configured poll interval into a usable delay.
 *
 * @param value Requested interval in milliseconds, if any.
 * @returns 250 when `value` is missing or not a finite number; otherwise the
 *   value rounded down and clamped to the range 50..2000.
 */
function normalizePollIntervalMs(value: number | undefined): number {
  const defaultMs = 250;
  const minMs = 50;
  const maxMs = 2_000;
  if (value === undefined || !Number.isFinite(value)) return defaultMs;
  return Math.min(maxMs, Math.max(minMs, Math.floor(value)));
}
|