Files
pikasTech-agentrun/src/runner/run-once.ts
T

126 lines
7.2 KiB
TypeScript

import { RunnerManagerApi, failureKindFromError, terminalStatusForFailure, errorMessage } from "./manager-api.js";
import { runBackendTurn, type BackendAdapterOptions } from "../backend/adapter.js";
import { materializeResourceBundle } from "./resource-bundle.js";
import type { BackendProfile, JsonRecord, RunRecord, RunnerRecord } from "../common/types.js";
import { AgentRunError } from "../common/errors.js";
export interface RunnerOnceOptions extends BackendAdapterOptions {
managerUrl: string;
runId: string;
commandId?: string;
runnerId?: string;
attemptId?: string;
leaseMs?: number;
backendProfile?: BackendProfile;
placement?: "host-process" | "kubernetes-job";
sourceCommit?: string;
jobName?: string;
podName?: string;
logPath?: string;
}
export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
const api = new RunnerManagerApi(options.managerUrl);
const targetRun = await api.client.get(`/api/v1/runs/${encodeURIComponent(options.runId)}`) as RunRecord;
if (isTerminalRun(targetRun)) return { terminalStatus: targetRun.terminalStatus, failureKind: targetRun.failureKind, run: targetRun, skipped: "run-terminal" } as JsonRecord;
if (options.backendProfile && options.backendProfile !== targetRun.backendProfile) {
throw new AgentRunError("schema-invalid", `runner backendProfile ${options.backendProfile} does not match run backendProfile ${targetRun.backendProfile}`, { httpStatus: 400 });
}
const leaseMs = options.leaseMs ?? 60_000;
const attemptId = options.attemptId ?? `attempt_${Date.now().toString(36)}`;
const runner = await api.register({
runId: options.runId,
attemptId,
backendProfile: targetRun.backendProfile,
placement: options.placement ?? "host-process",
sourceCommit: options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown",
...(options.runnerId ? { runnerId: options.runnerId } : {}),
...(options.jobName ? { jobName: options.jobName } : {}),
...(options.podName ? { podName: options.podName } : {}),
...(options.logPath ? { logPath: options.logPath } : {}),
}) as RunnerRecord;
let claimed: RunRecord;
try {
claimed = await api.claim(options.runId, runner.id, leaseMs);
await api.heartbeat(options.runId, runner.id, leaseMs);
} catch (error) {
const failureKind = failureKindFromError(error);
if (failureKind !== "runner-lease-conflict") {
await api.reportFailure(options.runId, { terminalStatus: terminalStatusForFailure(failureKind), failureKind, failureMessage: errorMessage(error) });
}
throw error;
}
const commandsResponse = await api.pollCommands(options.runId, { afterSeq: 0, limit: 20, ...(options.commandId ? { commandId: options.commandId } : {}) });
const command = commandsResponse.selected;
if (!command) {
await api.reportStatus(options.runId, { terminalStatus: "blocked", failureKind: "schema-invalid", failureMessage: "no pending turn command" });
return { runner, claimed, terminalStatus: "blocked", failureKind: "schema-invalid", polledCommands: commandsResponse.items.length };
}
await api.ackCommand(command.id);
const acked = await api.getCommand(options.runId, command.id);
if (acked.state === "cancelled") return await reportCancelled(api, options.runId, command.id, runner, claimed, "command cancelled before backend start");
await assertNotCancelled(api, options.runId, command.id);
const abortController = new AbortController();
const stopCancelWatch = watchCancellation(api, options.runId, command.id, abortController);
let workspacePath: string | undefined;
try {
const materialized = await materializeResourceBundle(claimed.resourceBundleRef ?? null, options.env ?? process.env);
if (materialized) {
workspacePath = materialized.workspacePath;
await api.appendEvent(options.runId, { type: "backend_status", payload: materialized.event });
}
await assertNotCancelled(api, options.runId, command.id);
const result = await runBackendTurn(claimed, command, { ...options, ...(workspacePath ? { workspacePath } : {}), abortSignal: abortController.signal });
for (const event of result.events) {
if (event.type !== "terminal_status") await api.appendEvent(options.runId, event);
}
await api.reportCommandStatus(command.id, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage });
const finalRun = await api.reportStatus(options.runId, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage, ...(result.threadId ? { threadId: result.threadId } : {}), ...(result.turnId ? { turnId: result.turnId } : {}) }) as RunRecord;
return { runner, commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind, run: finalRun } as JsonRecord;
} catch (error) {
const failureKind = failureKindFromError(error);
const terminalStatus = terminalStatusForFailure(failureKind);
const message = errorMessage(error);
await api.appendEvent(options.runId, { type: "error", payload: { failureKind, message, phase: "runner:execute", attemptId, runnerId: runner.id } });
await api.reportCommandStatus(command.id, { terminalStatus, failureKind, failureMessage: message });
const finalRun = await api.reportStatus(options.runId, { terminalStatus, failureKind, failureMessage: message }) as RunRecord;
return { runner, commandId: command.id, terminalStatus, failureKind, run: finalRun } as JsonRecord;
} finally {
stopCancelWatch();
}
}
function isTerminalRun(run: RunRecord): boolean {
return run.status === "completed" || run.status === "failed" || run.status === "blocked" || run.status === "cancelled";
}
async function assertNotCancelled(api: RunnerManagerApi, runId: string, commandId: string): Promise<void> {
const [run, command] = await Promise.all([api.getRun(runId), api.getCommand(runId, commandId)]);
if (run.status === "cancelled" || command.state === "cancelled") throw new AgentRunError("cancelled", "run or command was cancelled", { httpStatus: 409 });
}
function watchCancellation(api: RunnerManagerApi, runId: string, commandId: string, controller: AbortController): () => void {
let stopped = false;
const check = async (): Promise<void> => {
if (stopped || controller.signal.aborted) return;
try {
const [run, command] = await Promise.all([api.getRun(runId), api.getCommand(runId, commandId)]);
if (run.status === "cancelled" || command.state === "cancelled") controller.abort();
} catch {
// Cancellation polling must not hide the backend's own terminal result.
}
};
const timer = setInterval(() => { void check(); }, 2_000);
void check();
return () => {
stopped = true;
clearInterval(timer);
};
}
async function reportCancelled(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, claimed: RunRecord, message: string): Promise<JsonRecord> {
await api.reportCommandStatus(commandId, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message });
const finalRun = await api.reportStatus(runId, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message });
return { runner, commandId, claimed, terminalStatus: "cancelled", failureKind: "cancelled", run: finalRun };
}