140 lines
7.2 KiB
TypeScript
140 lines
7.2 KiB
TypeScript
import { AgentRunError } from "../common/errors.js";
|
|
import type { CommandRecord, CreateRunInput, JsonRecord, JsonValue, QueueAttemptRef, QueueDispatchResult, QueueTaskRecord, QueueTaskState, RunRecord } from "../common/types.js";
|
|
import { asRecord } from "../common/validation.js";
|
|
import type { AgentRunStore } from "./store.js";
|
|
import { isTerminalQueueTaskState } from "./store.js";
|
|
import { createKubernetesRunnerJob, type CreateRunnerJobInput, type RunnerJobDefaults } from "./kubernetes-runner-job.js";
|
|
|
|
/** Arguments accepted by {@link dispatchQueueTask}. */
export interface DispatchQueueTaskOptions {
  /** Identifier of the queue task to dispatch. */
  taskId: string;
  /** Store used to load the queue task and to create/update the Core run, command and runner job records. */
  store: AgentRunStore;
  /**
   * Caller-supplied overrides. Keys read by this module: `traceSink`, `idempotencyKey`,
   * `managerUrl`, `image`, `namespace`, `attemptId`, `runnerId`, `sourceCommit`,
   * `serviceAccountName` and `transientEnv`.
   */
  input: JsonRecord;
  /** Runner job defaults forwarded unchanged to `createKubernetesRunnerJob`. */
  defaults: RunnerJobDefaults;
}
|
|
|
|
/**
 * Dispatches a pending queue task into Core.
 *
 * Steps, in order:
 * 1. Create a run.
 * 2. Create a "turn" command carrying the task payload.
 * 3. Create a Kubernetes runner job.
 * 4. Record the resulting attempt on the queue task (state "running").
 * 5. Append a `queue-dispatched` backend_status event to the run.
 *
 * NOTE(review): the Core records are created before the queue task is updated.
 * If a later step throws, the task stays "pending" while the earlier records
 * already exist.
 *
 * @param options - store handle, queue task id, caller overrides and runner job defaults.
 * @returns the updated task, the created Core records, the attempt reference and
 *   CLI commands for polling progress.
 * @throws AgentRunError when the task is not dispatchable, or when the runner job
 *   response has no non-empty `attemptId`.
 */
export async function dispatchQueueTask(options: DispatchQueueTaskOptions): Promise<QueueDispatchResult> {
  const { store, taskId, input, defaults } = options;

  const task = await store.getQueueTask(taskId);
  assertDispatchable(task);

  // Core records for this attempt, created in dependency order.
  const run = await store.createRun(buildRunInput(task, input));
  const command = await store.createCommand(run.id, {
    type: "turn",
    payload: task.payload,
    idempotencyKey: `queue:${task.id}:command`,
  });
  const runnerJob = await createKubernetesRunnerJob({
    store,
    runId: run.id,
    input: buildRunnerJobInput(task, command.id, input),
    defaults,
  });

  const attemptId = stringFrom(runnerJob, "attemptId");

  // Prefer the runner job row for this attempt; otherwise fall back to the last listed row, if any.
  const jobRows = await store.listRunnerJobs(run.id, command.id);
  const matchingRow = jobRows.find((row) => row.attemptId === attemptId) ?? jobRows.at(-1);
  const runnerJobId = matchingRow?.id ?? null;

  const sessionId = run.sessionRef?.sessionId ?? null;
  const sessionPath = sessionId
    ? `/api/v1/sessions/${encodeURIComponent(sessionId)}`
    : null;

  const latestAttempt: QueueAttemptRef = {
    attemptId,
    state: "running",
    runId: run.id,
    commandId: command.id,
    runnerJobId,
    sessionId,
    sessionPath,
  };

  const updatedTask = await store.updateQueueTaskAttempt(task.id, { state: "running", latestAttempt, sessionPath });
  await store.appendEvent(run.id, "backend_status", {
    phase: "queue-dispatched",
    taskId: task.id,
    queue: task.queue,
    lane: task.lane,
    attemptId,
    runnerJobId,
    sessionPath,
  });

  const pollCommands = {
    queue: `./scripts/agentrun queue show ${task.id}`,
    run: `./scripts/agentrun runs show ${run.id}`,
    command: `./scripts/agentrun commands show ${command.id} --run-id ${run.id}`,
    events: `./scripts/agentrun runs events ${run.id} --after-seq 0 --limit 100`,
  };

  return {
    action: "queue-dispatch",
    mutation: true,
    task: updatedTask,
    run,
    command,
    runnerJob,
    latestAttempt,
    pollCommands,
  };
}
|
|
|
|
/**
 * Re-derives a queue task's state from the Core run and command referenced by
 * its latest attempt, then persists the refreshed attempt.
 *
 * A task already in a terminal state is returned as-is, without reading Core.
 *
 * @param store - store used to read the task, run and command and to write the update.
 * @param taskId - identifier of the queue task to refresh.
 * @returns the queue task record returned by `updateQueueTaskAttempt` (or the
 *   unchanged task when it is terminal).
 * @throws AgentRunError (409, "schema-invalid") when the latest attempt lacks a
 *   runId or commandId, or when the command does not belong to that run.
 */
export async function refreshQueueTaskFromCore(store: AgentRunStore, taskId: string): Promise<QueueTaskRecord> {
  const task = await store.getQueueTask(taskId);
  if (isTerminalQueueTaskState(task.state)) {
    return task;
  }

  const attempt = task.latestAttempt;
  if (!attempt?.runId || !attempt.commandId) {
    throw new AgentRunError("schema-invalid", `queue task ${taskId} has no Core attempt to refresh`, { httpStatus: 409 });
  }

  // The two lookups are independent, so fetch them concurrently.
  const [run, command] = await Promise.all([
    store.getRun(attempt.runId),
    store.getCommand(attempt.commandId),
  ]);
  if (command.runId !== run.id) {
    throw new AgentRunError("schema-invalid", `queue task ${taskId} attempt references mismatched run and command`, { httpStatus: 409 });
  }

  const state = queueStateFromCore(run, command);

  // Prefer the session Core reports now; otherwise keep what the attempt already recorded.
  const sessionId = run.sessionRef?.sessionId ?? attempt.sessionId ?? null;
  const sessionPath = sessionId
    ? `/api/v1/sessions/${encodeURIComponent(sessionId)}`
    : attempt.sessionPath ?? null;

  const refreshedAttempt: QueueAttemptRef = { ...attempt, state, sessionId, sessionPath };
  return store.updateQueueTaskAttempt(task.id, {
    state,
    latestAttempt: refreshedAttempt,
    sessionPath,
  });
}
|
|
|
|
/**
 * Throws unless the queue task is pending and carries the fields needed to
 * create a Core run.
 *
 * @param task - the queue task about to be dispatched.
 * @throws AgentRunError
 *   - 409 when the task is terminal: code "cancelled" for cancelled tasks,
 *     otherwise "schema-invalid".
 *   - 409 "schema-invalid" when the task is not pending.
 *   - 400 "schema-invalid" when workspaceRef, providerId or executionPolicy is
 *     missing (checked in that order).
 */
function assertDispatchable(task: QueueTaskRecord): void {
  const { id, state } = task;

  if (isTerminalQueueTaskState(state)) {
    const code = state === "cancelled" ? "cancelled" : "schema-invalid";
    throw new AgentRunError(code, `queue task ${id} is already terminal: ${state}`, { httpStatus: 409 });
  }

  if (state !== "pending") {
    throw new AgentRunError("schema-invalid", `queue task ${id} is not pending: ${state}`, { httpStatus: 409 });
  }

  // The first missing field, in this order, determines the error message.
  const requiredFields: ReadonlyArray<readonly [unknown, string]> = [
    [task.workspaceRef, "queue dispatch requires workspaceRef"],
    [task.providerId, "queue dispatch requires providerId"],
    [task.executionPolicy, "queue dispatch requires executionPolicy"],
  ];
  for (const [fieldValue, message] of requiredFields) {
    if (!fieldValue) {
      throw new AgentRunError("schema-invalid", message, { httpStatus: 400 });
    }
  }
}
|
|
|
|
/**
 * Maps a Core run/command pair onto a queue task state.
 *
 * Precedence:
 * 1. A completed, failed or cancelled command state wins, whatever the run status.
 * 2. Otherwise the run status is mapped (completed, failed, blocked, cancelled).
 * 3. Anything else is reported as "running".
 */
function queueStateFromCore(run: RunRecord, command: CommandRecord): QueueTaskState {
  switch (command.state) {
    case "completed":
      return "completed";
    case "failed":
      return "failed";
    case "cancelled":
      return "cancelled";
  }

  switch (run.status) {
    case "completed":
      return "completed";
    case "failed":
      return "failed";
    case "blocked":
      return "blocked";
    case "cancelled":
      return "cancelled";
    default:
      return "running";
  }
}
|
|
|
|
/**
 * Builds the Core `createRun` input from a queue task.
 *
 * The trace sink is `input.traceSink` when the caller supplied one. Otherwise
 * it is a `{ kind: "queue", taskId, queue, lane }` record describing the task.
 *
 * @throws AgentRunError (400, "schema-invalid") when workspaceRef, providerId
 *   or executionPolicy is missing. This is checked here independently of
 *   assertDispatchable.
 */
function buildRunInput(task: QueueTaskRecord, input: JsonRecord): CreateRunInput {
  const { workspaceRef, providerId, executionPolicy } = task;
  if (!workspaceRef || !providerId || !executionPolicy) {
    throw new AgentRunError("schema-invalid", "queue dispatch task is missing runtime fields", { httpStatus: 400 });
  }

  const suppliedTraceSink = input.traceSink;
  const fallbackTraceSink = { kind: "queue", taskId: task.id, queue: task.queue, lane: task.lane };
  const traceSink = suppliedTraceSink !== undefined ? suppliedTraceSink : fallbackTraceSink;

  return {
    tenantId: task.tenantId,
    projectId: task.projectId,
    workspaceRef,
    sessionRef: task.sessionRef,
    resourceBundleRef: task.resourceBundleRef,
    providerId,
    backendProfile: task.backendProfile,
    executionPolicy,
    traceSink: traceSink as JsonValue,
  };
}
|
|
|
|
/**
 * Builds the runner job request for a dispatched queue task.
 *
 * - The idempotency key defaults to `queue:<taskId>:runner-job` unless the
 *   input supplies a non-blank string.
 * - Whitelisted string fields are copied (trimmed) only when non-blank.
 * - `transientEnv`, when present, must be an array; each entry is validated
 *   through `asRecord`.
 *
 * @throws AgentRunError (400, "schema-invalid") when `transientEnv` is present
 *   but not an array.
 */
function buildRunnerJobInput(task: QueueTaskRecord, commandId: string, input: JsonRecord): CreateRunnerJobInput {
  const jobInput: CreateRunnerJobInput = {
    commandId,
    idempotencyKey: optionalString(input.idempotencyKey) ?? `queue:${task.id}:runner-job`,
  };

  // Same key on input and output. The order is kept so the resulting object's key order is stable.
  const passthroughKeys = [
    "managerUrl",
    "image",
    "namespace",
    "attemptId",
    "runnerId",
    "sourceCommit",
    "serviceAccountName",
  ] as const;
  for (const key of passthroughKeys) {
    const cleaned = optionalString(input[key]);
    if (cleaned) {
      jobInput[key] = cleaned as never;
    }
  }

  const { transientEnv } = input;
  if (transientEnv !== undefined) {
    if (!Array.isArray(transientEnv)) {
      throw new AgentRunError("schema-invalid", "transientEnv must be an array", { httpStatus: 400 });
    }
    jobInput.transientEnv = transientEnv.map((entry, position) => asRecord(entry, `transientEnv[${position}]`));
  }

  return jobInput;
}
|
|
|
|
/**
 * Reads a required non-empty string field from a runner job response.
 *
 * @throws AgentRunError (502, "infra-failed") when the field is absent, is
 *   not a string, or is the empty string.
 */
function stringFrom(record: JsonRecord, key: string): string {
  const candidate = record[key];
  if (typeof candidate === "string" && candidate !== "") {
    return candidate;
  }
  throw new AgentRunError("infra-failed", `runner job response missing ${key}`, { httpStatus: 502 });
}
|
|
|
|
/**
 * Normalizes an optional string input.
 *
 * @returns the trimmed string when `value` is a string with non-whitespace
 *   content; otherwise `undefined`. Non-strings also yield `undefined`.
 */
function optionalString(value: unknown): string | undefined {
  if (typeof value !== "string") {
    return undefined;
  }
  const trimmed = value.trim();
  return trimmed === "" ? undefined : trimmed;
}
|