216209ca95
Co-authored-by: AgentRun Codex <agentrun-codex@users.noreply.github.com>
169 lines
8.8 KiB
TypeScript
169 lines
8.8 KiB
TypeScript
import { AgentRunError } from "../common/errors.js";
|
|
import type { CommandRecord, CreateRunInput, JsonRecord, JsonValue, QueueAttemptRef, QueueDispatchResult, QueueTaskRecord, QueueTaskState, RunRecord } from "../common/types.js";
|
|
import { asRecord } from "../common/validation.js";
|
|
import type { AgentRunStore } from "./store.js";
|
|
import { isTerminalQueueTaskState } from "./store.js";
|
|
import { createKubernetesRunnerJob, type CreateRunnerJobInput, type RunnerJobDefaults } from "./kubernetes-runner-job.js";
|
|
|
|
/**
 * Arguments accepted by `dispatchQueueTask`.
 */
export interface DispatchQueueTaskOptions {
  /** Store used to load the queue task and to persist the run, command, attempt, and event. */
  store: AgentRunStore;

  /** Identifier of the queue task to dispatch. */
  taskId: string;

  /**
   * Caller-supplied dispatch input. Keys read by this module include `traceSink`,
   * `idempotencyKey`, `managerUrl`, `image`, `namespace`, `attemptId`, `runnerId`,
   * `sourceCommit`, `serviceAccountName`, `imageRef`, and `transientEnv`.
   */
  input: JsonRecord;

  /** Runner job defaults, forwarded unchanged to `createKubernetesRunnerJob`. */
  defaults: RunnerJobDefaults;
}
|
|
|
|
/**
 * Dispatches a pending queue task.
 *
 * Steps, in order: load the task and verify it is dispatchable, create a run,
 * create a `turn` command carrying the task payload, create a Kubernetes runner
 * job, record the attempt on the task with state `running`, and append a
 * `backend_status` event with phase `queue-dispatched` to the run.
 *
 * @param options - Store, task id, caller input, and runner job defaults.
 * @returns The dispatch result, including the updated task, the created run and
 *   command, the runner job response, and descriptors for follow-up poll actions.
 * @throws AgentRunError when the task is not dispatchable or when the runner job
 *   response carries no `attemptId`.
 */
export async function dispatchQueueTask(options: DispatchQueueTaskOptions): Promise<QueueDispatchResult> {
  const { store, taskId, input, defaults } = options;

  const task = await store.getQueueTask(taskId);
  assertDispatchable(task);

  const run = await store.createRun(buildRunInput(task, input));
  const command = await store.createCommand(run.id, {
    type: "turn",
    payload: task.payload,
    idempotencyKey: `queue:${task.id}:command`,
  });
  const runnerJob = await createKubernetesRunnerJob({
    store,
    runId: run.id,
    input: buildRunnerJobInput(task, command.id, input),
    defaults,
  });

  const attemptId = stringFrom(runnerJob, "attemptId");

  // Prefer the stored job whose attempt matches; otherwise fall back to the last listed job.
  const storedJobs = await store.listRunnerJobs(run.id, command.id);
  const matchedJob = storedJobs.find((job) => job.attemptId === attemptId) ?? storedJobs.at(-1) ?? null;
  const runnerJobId = matchedJob?.id ?? null;

  const sessionId = run.sessionRef?.sessionId ?? null;
  const sessionPath = sessionId ? `/api/v1/sessions/${encodeURIComponent(sessionId)}` : null;

  const latestAttempt: QueueAttemptRef = {
    attemptId,
    state: "running",
    runId: run.id,
    commandId: command.id,
    runnerJobId,
    sessionId,
    sessionPath,
  };

  const updatedTask = await store.updateQueueTaskAttempt(task.id, {
    state: "running",
    latestAttempt,
    sessionPath,
  });
  await store.appendEvent(run.id, "backend_status", {
    phase: "queue-dispatched",
    taskId: task.id,
    queue: task.queue,
    lane: task.lane,
    attemptId,
    runnerJobId,
    sessionPath,
  });

  const pollActions = [
    dispatchActionDescriptor({
      action: "inspect-task",
      operation: "describe",
      resourceKind: "task",
      resourceName: task.id,
    }),
    dispatchActionDescriptor({
      action: "inspect-run",
      operation: "describe",
      resourceKind: "run",
      resourceName: run.id,
      runId: run.id,
    }),
    dispatchActionDescriptor({
      action: "inspect-command",
      operation: "describe",
      resourceKind: "command",
      resourceName: command.id,
      runId: run.id,
      commandId: command.id,
    }),
    dispatchActionDescriptor({
      action: "poll-events",
      operation: "events",
      resourceKind: "run",
      resourceName: run.id,
      runId: run.id,
      commandId: command.id,
      afterSeq: 0,
      limit: 100,
    }),
  ];

  return {
    action: "queue-dispatch",
    mutation: true,
    task: updatedTask,
    run,
    command,
    runnerJob,
    envImage: jsonRecordOrNull(runnerJob.envImage),
    workReady: jsonRecordOrNull(runnerJob.workReady),
    latestAttempt,
    pollActions,
  };
}
|
|
|
|
/**
 * Builds the JSON descriptor for one follow-up action returned by a dispatch.
 *
 * `runId` and `commandId` are always present and default to `null`. `afterSeq`
 * and `limit` are included only when the caller supplied them (an explicit
 * `null` is kept). `valuesPrinted` is always `false`.
 *
 * @param input - Action name, operation, target resource, and optional paging hints.
 * @returns The descriptor as a plain JSON record.
 */
function dispatchActionDescriptor(input: { action: string; operation: string; resourceKind: string; resourceName: string; runId?: string | null; commandId?: string | null; afterSeq?: number | null; limit?: number | null }): JsonRecord {
  const { action, operation, resourceKind, resourceName, afterSeq, limit } = input;

  // Keys are inserted in the same order the serialized descriptor exposes them.
  const descriptor: JsonRecord = {
    action,
    operation,
    resourceKind,
    resourceName,
    runId: input.runId ?? null,
    commandId: input.commandId ?? null,
  };
  if (afterSeq !== undefined) {
    descriptor.afterSeq = afterSeq;
  }
  if (limit !== undefined) {
    descriptor.limit = limit;
  }
  descriptor.valuesPrinted = false;

  return descriptor;
}
|
|
|
|
/**
 * Re-derives a queue task's state from the run and command of its latest attempt.
 *
 * A task that is already terminal is returned as-is without touching the store
 * again. Otherwise the run and command are loaded, the queue state is derived
 * from them, and the task's attempt is rewritten with the new state and the
 * session id/path.
 *
 * @param store - Store used to read the task, run, and command and to write the update.
 * @param taskId - Identifier of the queue task to refresh.
 * @returns The unchanged terminal task, or the task as returned by the store update.
 * @throws AgentRunError (`schema-invalid`, HTTP 409) when the task has no attempt
 *   with both a run id and a command id, or when the command belongs to another run.
 */
export async function refreshQueueTaskFromCore(store: AgentRunStore, taskId: string): Promise<QueueTaskRecord> {
  const task = await store.getQueueTask(taskId);
  if (isTerminalQueueTaskState(task.state)) {
    return task;
  }

  const previousAttempt = task.latestAttempt;
  if (!previousAttempt?.runId || !previousAttempt.commandId) {
    throw new AgentRunError("schema-invalid", `queue task ${taskId} has no Core attempt to refresh`, { httpStatus: 409 });
  }

  const [run, command] = await Promise.all([
    store.getRun(previousAttempt.runId),
    store.getCommand(previousAttempt.commandId),
  ]);
  if (command.runId !== run.id) {
    throw new AgentRunError("schema-invalid", `queue task ${taskId} attempt references mismatched run and command`, { httpStatus: 409 });
  }

  const state = queueStateFromCore(run, command);

  // The run's session wins; otherwise keep whatever the previous attempt recorded.
  const sessionId = run.sessionRef?.sessionId ?? previousAttempt.sessionId ?? null;
  const recordedSessionPath = previousAttempt.sessionPath ?? null;
  const sessionPath = sessionId ? `/api/v1/sessions/${encodeURIComponent(sessionId)}` : recordedSessionPath;

  const nextAttempt: QueueAttemptRef = { ...previousAttempt, state, sessionId, sessionPath };
  return store.updateQueueTaskAttempt(task.id, { state, latestAttempt: nextAttempt, sessionPath });
}
|
|
|
|
/**
 * Throws unless the task can be dispatched.
 *
 * Checks run in this order: the task must not be terminal (HTTP 409, error code
 * `cancelled` for a cancelled task and `schema-invalid` otherwise), it must be
 * `pending` (HTTP 409), and it must carry `workspaceRef`, `providerId`, and
 * `executionPolicy` (HTTP 400 each).
 *
 * @param task - The queue task to validate.
 * @throws AgentRunError on the first failed check.
 */
function assertDispatchable(task: QueueTaskRecord): void {
  if (isTerminalQueueTaskState(task.state)) {
    const code = task.state === "cancelled" ? "cancelled" : "schema-invalid";
    throw new AgentRunError(code, `queue task ${task.id} is already terminal: ${task.state}`, { httpStatus: 409 });
  }

  if (task.state !== "pending") {
    throw new AgentRunError("schema-invalid", `queue task ${task.id} is not pending: ${task.state}`, { httpStatus: 409 });
  }

  if (!task.workspaceRef) {
    throw new AgentRunError("schema-invalid", "queue dispatch requires workspaceRef", { httpStatus: 400 });
  }

  if (!task.providerId) {
    throw new AgentRunError("schema-invalid", "queue dispatch requires providerId", { httpStatus: 400 });
  }

  if (!task.executionPolicy) {
    throw new AgentRunError("schema-invalid", "queue dispatch requires executionPolicy", { httpStatus: 400 });
  }
}
|
|
|
|
/**
 * Derives the queue task state from a run and its command.
 *
 * A terminal run status takes precedence. Otherwise a `completed`, `failed`, or
 * `cancelled` command maps to the same queue state, and any other command state
 * is reported as `running`.
 *
 * @param run - The run of the attempt being refreshed.
 * @param command - The command of the attempt being refreshed.
 * @returns The queue task state implied by the run and command.
 */
function queueStateFromCore(run: RunRecord, command: CommandRecord): QueueTaskState {
  const stateFromRun = queueTerminalStateFromRun(run);
  if (stateFromRun) {
    return stateFromRun;
  }

  switch (command.state) {
    case "completed":
      return "completed";
    case "failed":
      return "failed";
    case "cancelled":
      return "cancelled";
    default:
      return "running";
  }
}
|
|
|
|
/**
 * Maps a run status of `completed`, `failed`, `blocked`, or `cancelled` to the
 * queue task state of the same name.
 *
 * @param run - The run whose status is inspected.
 * @returns The matching queue task state, or `null` for any other run status.
 */
function queueTerminalStateFromRun(run: RunRecord): QueueTaskState | null {
  switch (run.status) {
    case "completed":
      return "completed";
    case "failed":
      return "failed";
    case "blocked":
      return "blocked";
    case "cancelled":
      return "cancelled";
    default:
      return null;
  }
}
|
|
|
|
/**
 * Builds the run-creation input for a queue task.
 *
 * Tenant, project, workspace, session, resource bundle, provider, backend
 * profile, and execution policy are copied from the task. The trace sink is the
 * caller's `input.traceSink` whenever that key is not `undefined` (an explicit
 * `null` is kept); otherwise a `queue` sink naming the task, queue, and lane.
 *
 * @param task - The queue task being dispatched.
 * @param input - Caller-supplied dispatch input; only `traceSink` is read here.
 * @returns The input to pass to the store's run creation.
 * @throws AgentRunError (`schema-invalid`, HTTP 400) when the task lacks
 *   `workspaceRef`, `providerId`, or `executionPolicy`.
 */
function buildRunInput(task: QueueTaskRecord, input: JsonRecord): CreateRunInput {
  const { workspaceRef, providerId, executionPolicy } = task;
  if (!workspaceRef || !providerId || !executionPolicy) {
    throw new AgentRunError("schema-invalid", "queue dispatch task is missing runtime fields", { httpStatus: 400 });
  }

  const callerTraceSink = input.traceSink;
  const traceSink =
    callerTraceSink !== undefined
      ? callerTraceSink
      : { kind: "queue", taskId: task.id, queue: task.queue, lane: task.lane };

  return {
    tenantId: task.tenantId,
    projectId: task.projectId,
    workspaceRef,
    sessionRef: task.sessionRef,
    resourceBundleRef: task.resourceBundleRef,
    providerId,
    backendProfile: task.backendProfile,
    executionPolicy,
    traceSink: traceSink as JsonValue,
  };
}
|
|
|
|
/**
 * Builds the runner job input for a queue task's command.
 *
 * - `idempotencyKey` is the caller's non-blank string, else `queue:<taskId>:runner-job`.
 * - `managerUrl`, `image`, `namespace`, `attemptId`, `runnerId`, `sourceCommit`,
 *   and `serviceAccountName` are copied (trimmed) only when the caller supplied
 *   a non-blank string.
 * - `imageRef` comes from `input.imageRef` when it is an object, else from
 *   `task.metadata.aipodImageRef` when that is an object.
 * - `transientEnv`, when present, must be an array; each element is validated
 *   with `asRecord`.
 *
 * @param task - The queue task being dispatched.
 * @param commandId - Identifier of the command the runner job will execute.
 * @param input - Caller-supplied dispatch input.
 * @returns The input to pass to `createKubernetesRunnerJob`.
 * @throws AgentRunError (`schema-invalid`, HTTP 400) when `transientEnv` is not an array.
 */
function buildRunnerJobInput(task: QueueTaskRecord, commandId: string, input: JsonRecord): CreateRunnerJobInput {
  const jobInput: CreateRunnerJobInput = {
    commandId,
    idempotencyKey: optionalString(input.idempotencyKey) ?? `queue:${task.id}:runner-job`,
  };

  // Each of these is read from `input` and written to the same key on the job input.
  const passthroughKeys = [
    "managerUrl",
    "image",
    "namespace",
    "attemptId",
    "runnerId",
    "sourceCommit",
    "serviceAccountName",
  ] as const;
  for (const key of passthroughKeys) {
    const text = optionalString(input[key]);
    if (text) {
      jobInput[key] = text as never;
    }
  }

  const imageRef = jsonRecordOrNull(input.imageRef) ?? jsonRecordOrNull(task.metadata.aipodImageRef);
  if (imageRef) {
    jobInput.imageRef = imageRef;
  }

  const transientEnv = input.transientEnv;
  if (transientEnv !== undefined) {
    if (!Array.isArray(transientEnv)) {
      throw new AgentRunError("schema-invalid", "transientEnv must be an array", { httpStatus: 400 });
    }
    jobInput.transientEnv = transientEnv.map((entry, position) => asRecord(entry, `transientEnv[${position}]`));
  }

  return jobInput;
}
|
|
|
|
/**
 * Reads a required, non-empty string field from a runner job response.
 *
 * @param record - The record to read from.
 * @param key - Name of the field to read.
 * @returns The field value.
 * @throws AgentRunError (`infra-failed`, HTTP 502) when the field is missing,
 *   not a string, or the empty string.
 */
function stringFrom(record: JsonRecord, key: string): string {
  const candidate = record[key];
  if (typeof candidate === "string" && candidate.length > 0) {
    return candidate;
  }
  throw new AgentRunError("infra-failed", `runner job response missing ${key}`, { httpStatus: 502 });
}
|
|
|
|
/**
 * Returns the value as a JSON record when it is a non-null, non-array object.
 *
 * @param value - Any value.
 * @returns The same object reference typed as a record, or `null` for
 *   primitives, `null`, `undefined`, and arrays.
 */
function jsonRecordOrNull(value: unknown): JsonRecord | null {
  const isRecordLike = typeof value === "object" && value !== null && !Array.isArray(value);
  return isRecordLike ? (value as JsonRecord) : null;
}
|
|
|
|
/**
 * Normalizes an optional string input.
 *
 * @param value - Any value.
 * @returns The trimmed string when `value` is a string with non-whitespace
 *   content; otherwise `undefined`.
 */
function optionalString(value: unknown): string | undefined {
  if (typeof value !== "string") {
    return undefined;
  }
  const trimmed = value.trim();
  return trimmed.length > 0 ? trimmed : undefined;
}
|