Files
pikasTech-agentrun/src/mgr/server.ts
T
2026-06-11 21:00:29 +08:00

657 lines
42 KiB
TypeScript

import type { Server } from "node:http";
import { createServer } from "node:http";
import type { AddressInfo } from "node:net";
import type { AgentRunStore, ListQueueTasksInput, ListSessionsInput, SessionEventPageInput } from "./store.js";
import { openAgentRunStoreFromEnv } from "./store.js";
import { AgentRunError, errorToJson } from "../common/errors.js";
import { asRecord, validateBackendProfile, validateCreateCommand, validateCreateQueueTask, validateCreateRun, validateQueueTaskState, validateSessionListState } from "../common/validation.js";
import { isBackendProfile } from "../common/backend-profiles.js";
import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, QueueTaskRecord, RunEvent } from "../common/types.js";
import { createKubernetesRunnerJob } from "./kubernetes-runner-job.js";
import { dispatchQueueTask, refreshQueueTaskFromCore } from "./queue-dispatch.js";
import { buildRunResult } from "./result.js";
import { runnerJobStatusSummary } from "./runner-job-status.js";
import { createSessionPvc, deleteSessionPvc, getSessionPvcSummary, refreshSessionPvcSummary, runSessionStorageGc } from "./session-pvc.js";
import type { SessionPvcSummary } from "./session-pvc.js";
import type { SessionPvcOptions } from "./session-pvc.js";
import { assertManagerRequestAuthorized, managerAuthSummary, managerServerAuthConfigFromEnv, type ManagerAuthConfig } from "./auth.js";
import { getProviderProfileConfig, getProviderProfileValidation, listBackendCapabilities, listProviderProfiles, removeProviderProfile, setProviderProfileConfig, setProviderProfileCredential, showProviderProfile, validateProviderProfile } from "./provider-profiles.js";
import { listToolCredentials, setGithubSshToolCredential, showToolCredential } from "./tool-credentials.js";
import { aipodSpecFromInput, applyAipodSpec, deleteAipodSpec, listAipodSpecs, renderAipodSpecByName, showAipodSpec } from "../common/aipod-specs.js";
import { staticWorkReadyCapabilitySummary } from "../common/work-ready.js";
/**
 * Derives session PVC options from a defaults object.
 *
 * @param defaults Optional defaults; only `kubectlCommand` is read.
 * @returns `{ kubectlCommand }` when a non-empty command is configured, otherwise an empty options object.
 */
function pvcOptions(defaults: { kubectlCommand?: string } | undefined): SessionPvcOptions {
  const kubectlCommand = defaults?.kubectlCommand;
  // An empty string is treated the same as "not configured".
  if (!kubectlCommand) return {};
  return { kubectlCommand };
}
/**
 * Resolves the session PVC options used for a single request.
 *
 * Resolution rules:
 * - When the server-level options carry a `kubectlHandler`, the handler is used and the
 *   runner-job `kubectlCommand` is NOT consulted (same as before).
 * - Otherwise the runner-job `kubectlCommand` (if non-empty) is the starting point.
 * - In both cases any non-empty server-level `kubectlCommand`, `storageClassName` and `size`
 *   are applied on top. Previously these three fields were silently ignored unless a
 *   `kubectlHandler` was also supplied, so e.g. a configured storage class never took effect.
 *
 * @param serverDefaults Server-level session PVC options (may be undefined).
 * @param runnerJobDefaults Runner-job defaults; only `kubectlCommand` is read.
 * @returns The merged options object; empty when nothing is configured.
 */
function sessionPvcOptionsForRequest(serverDefaults: { kubectlHandler?: import("./session-pvc.js").KubectlHandler; kubectlCommand?: string; storageClassName?: string; size?: string } | undefined, runnerJobDefaults: { kubectlCommand?: string } | undefined): SessionPvcOptions {
  const opts: SessionPvcOptions = serverDefaults?.kubectlHandler
    ? { kubectlHandler: serverDefaults.kubectlHandler }
    : runnerJobDefaults?.kubectlCommand
      ? { kubectlCommand: runnerJobDefaults.kubectlCommand }
      : {};
  // Explicit server-level settings win over the runner-job fallback.
  if (serverDefaults?.kubectlCommand) opts.kubectlCommand = serverDefaults.kubectlCommand;
  if (serverDefaults?.storageClassName) opts.storageClassName = serverDefaults.storageClassName;
  if (serverDefaults?.size) opts.size = serverDefaults.size;
  return opts;
}
/**
 * Options accepted by `startManagerServer`. Every field is optional; the fallbacks
 * described below are the ones applied by the code in this file.
 */
export interface ManagerServerOptions {
/** Store backing the API routes; when omitted, `startManagerServer` opens one via `openAgentRunStoreFromEnv()`. */
store?: AgentRunStore;
/** Port passed to `server.listen`; 0 is used when omitted. */
port?: number;
/** Host passed to `server.listen`; "127.0.0.1" is used when omitted. */
host?: string;
/** Source commit reported by the health routes and forwarded to runner-job/dispatch defaults; falls back to `AGENTRUN_SOURCE_COMMIT`, then "unknown". */
sourceCommit?: string;
/** Defaults used when creating runner jobs and dispatching queue tasks. */
runnerJobDefaults?: {
/** Falls back to `AGENTRUN_RUNTIME_NAMESPACE`, then "agentrun-v01". */
namespace?: string;
/** Falls back to `AGENTRUN_INTERNAL_MGR_URL`, then `http://agentrun-mgr.<namespace>.svc.cluster.local:8080`. */
managerUrl?: string;
/** Falls back to `AGENTRUN_RUNNER_IMAGE`, then the empty string. */
image?: string;
/** Falls back to `AGENTRUN_ENV_IDENTITY`. */
envIdentity?: string;
/** Falls back to `AGENTRUN_ARTIFACT_CATALOG_FILE`. */
artifactCatalogFile?: string;
/** Falls back to `AGENTRUN_RUNNER_SERVICE_ACCOUNT`, then "agentrun-v01-runner". */
serviceAccountName?: string;
/** Forwarded to runner-job/dispatch defaults only when set; may also be used for session PVC operations. */
kubectlCommand?: string;
/** Forwarded to runner-job/dispatch defaults only when set. */
unideskSshEndpointEnv?: JsonRecord;
};
/** Options for session PVC operations; resolved per request by `sessionPvcOptionsForRequest`. */
sessionPvcOptions?: { kubectlHandler?: import("./session-pvc.js").KubectlHandler; kubectlCommand?: string; storageClassName?: string; size?: string };
/** Passed through to the provider-profile and backend-capability functions. */
providerProfileOptions?: { namespace?: string; kubectlCommand?: string };
/** Passed through to the tool-credential functions. */
toolCredentialOptions?: { namespace?: string; kubectlCommand?: string };
/** Directory handed to the aipod-spec functions; falls back to `AGENTRUN_AIPOD_SPEC_DIR`. */
aipodSpecDir?: string;
/** Request authorization config; when omitted, `managerServerAuthConfigFromEnv()` is used. */
auth?: ManagerAuthConfig;
}
/** Result of `startManagerServer`: the HTTP server plus what is needed to talk to it. */
export interface StartedManagerServer {
/** The Node HTTP server returned after `listen` has completed. */
server: Server;
/** `http://<address>:<port>` built from the address reported by `server.address()`. */
baseUrl: string;
/** The store the server routes operate on (the supplied one, or the one opened from the environment). */
store: AgentRunStore;
}
/**
 * Starts the manager HTTP server and resolves once it is listening.
 *
 * Each request gets a fresh trace id, is authorized via `assertManagerRequestAuthorized`,
 * has its body read and JSON-parsed, and is then handed to `route`. Successful results are
 * written as `{ ok: true, data, traceId }` with status 200; any thrown error is normalized
 * and written as `{ ok: false, failureKind, message, traceId, error }` with the error's HTTP status.
 *
 * @param options Server options; see `ManagerServerOptions` for the fallbacks applied here.
 * @returns The listening server, its base URL, and the store in use.
 * @throws Rejects when the server cannot start listening (e.g. the port is already in use).
 */
export async function startManagerServer(options: ManagerServerOptions = {}): Promise<StartedManagerServer> {
  const store = options.store ?? await openAgentRunStoreFromEnv();
  const sourceCommit = options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown";
  const runnerJobDefaults = options.runnerJobDefaults;
  const sessionPvcDefaults = options.sessionPvcOptions;
  const providerProfileDefaults = options.providerProfileOptions;
  const toolCredentialDefaults = options.toolCredentialOptions;
  const aipodSpecDir = options.aipodSpecDir ?? process.env.AGENTRUN_AIPOD_SPEC_DIR;
  const auth = options.auth ?? managerServerAuthConfigFromEnv();
  const authSummary = managerAuthSummary(auth);
  const server = createServer(async (req, res) => {
    const traceId = `trc_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`;
    try {
      const method = req.method ?? "GET";
      // The base is only needed to parse a relative request URL; its host is never used.
      const url = new URL(req.url ?? "/", "http://agentrun.local");
      assertManagerRequestAuthorized(req, url.pathname, auth);
      // Optional defaults are spread in only when present so `route` never sees explicit `undefined` values.
      const data = await route({ method, url, body: await readBody(req), store, sourceCommit, authSummary, ...(runnerJobDefaults ? { runnerJobDefaults } : {}), ...(sessionPvcDefaults ? { sessionPvcDefaults } : {}), ...(providerProfileDefaults ? { providerProfileDefaults } : {}), ...(toolCredentialDefaults ? { toolCredentialDefaults } : {}), ...(aipodSpecDir ? { aipodSpecDir } : {}) });
      writeJson(res, 200, { ok: true, data, traceId });
    } catch (error) {
      const agentError = normalizeError(error);
      writeJson(res, agentError.httpStatus, { ok: false, failureKind: agentError.failureKind, message: agentError.message, traceId, error: errorToJson(error) });
    }
  });
  await new Promise<void>((resolve, reject) => {
    // A failed bind (EADDRINUSE, EACCES, ...) is reported through the 'error' event, not the
    // listen callback; without this listener the promise would never settle and the event
    // would surface as an uncaught exception.
    const onListenError = (error: Error): void => reject(error);
    server.once("error", onListenError);
    server.listen(options.port ?? 0, options.host ?? "127.0.0.1", () => {
      // Only startup failures are ours to handle; later errors keep their original behaviour.
      server.off("error", onListenError);
      resolve();
    });
  });
  const address = server.address() as AddressInfo;
  // IPv6 literals must be bracketed inside a URL authority (e.g. http://[::1]:8080).
  const urlHost = address.address.includes(":") ? `[${address.address}]` : address.address;
  return { server, baseUrl: `http://${urlHost}:${address.port}`, store };
}
/**
 * Reads and JSON-parses the request body.
 *
 * @param req Incoming HTTP request.
 * @returns `null` for GET/HEAD requests (the stream is not consumed) and for bodies that are
 *   empty after trimming whitespace; otherwise the parsed JSON value.
 * @throws Whatever `JSON.parse` throws when the body is not valid JSON.
 */
async function readBody(req: import("node:http").IncomingMessage): Promise<unknown> {
  const bodyless = req.method === "GET" || req.method === "HEAD";
  if (bodyless) return null;
  const parts: Buffer[] = [];
  for await (const piece of req) {
    // Streams may yield strings when an encoding is set; normalize everything to Buffer.
    parts.push(Buffer.isBuffer(piece) ? piece : Buffer.from(piece));
  }
  const payload = Buffer.concat(parts).toString("utf8").trim();
  return payload.length === 0 ? null : (JSON.parse(payload) as unknown);
}
/**
 * Loads a queue task by id and returns it after a best-effort refresh.
 *
 * @param store Store to read the task from.
 * @param taskId Id of the queue task.
 * @returns The record produced by `refreshQueueTaskRecordForRead` for the loaded task.
 */
async function refreshQueueTaskForRead(store: AgentRunStore, taskId: string): Promise<QueueTaskRecord> {
  const stored = await store.getQueueTask(taskId);
  const refreshed = await refreshQueueTaskRecordForRead(store, stored);
  return refreshed;
}
/**
 * Refreshes every task on the page selected by `input`.
 * The page itself is not returned; callers list again afterwards to get the result.
 *
 * @param store Store to list tasks from.
 * @param input Page selection passed to `store.listQueueTasks`.
 */
async function refreshQueuePageForRead(store: AgentRunStore, input: ListQueueTasksInput): Promise<void> {
  const { items } = await store.listQueueTasks(input);
  await refreshQueueTaskRecordsForRead(store, items);
}
/**
 * Refreshes tasks currently in the "running" state (one page of at most 100),
 * optionally restricted to a single queue.
 *
 * @param store Store to list tasks from.
 * @param queue Queue name filter; an empty or missing value means no queue filter.
 */
async function refreshRunningQueueTasksForRead(store: AgentRunStore, queue?: string): Promise<void> {
  const filter: ListQueueTasksInput = queue
    ? { queue, state: "running", limit: 100 }
    : { state: "running", limit: 100 };
  const running = await store.listQueueTasks(filter);
  await refreshQueueTaskRecordsForRead(store, running.items);
}
/**
 * Refreshes all given tasks concurrently and waits for every refresh to settle.
 *
 * @param store Store handed to each per-task refresh.
 * @param tasks Task records to refresh.
 */
async function refreshQueueTaskRecordsForRead(store: AgentRunStore, tasks: QueueTaskRecord[]): Promise<void> {
  const pending = tasks.map((record) => refreshQueueTaskRecordForRead(store, record));
  await Promise.all(pending);
}
/**
 * Best-effort refresh of a single queue task before it is returned to a reader.
 *
 * @param store Store handed to `refreshQueueTaskFromCore`.
 * @param task The task record as currently stored.
 * @returns The refreshed record, or the original `task` when its latest attempt lacks a
 *   run id or command id, or when the refresh throws.
 */
async function refreshQueueTaskRecordForRead(store: AgentRunStore, task: QueueTaskRecord): Promise<QueueTaskRecord> {
  const attempt = task.latestAttempt;
  const hasRunAndCommand = Boolean(attempt?.runId && attempt.commandId);
  if (!hasRunAndCommand) return task;
  try {
    const refreshed = await refreshQueueTaskFromCore(store, task.id);
    return refreshed;
  } catch {
    // Deliberate: a failed refresh must not break the read; fall back to the stored record.
    return task;
  }
}
/**
 * Returns the store's queue-commander snapshot with each item enriched by a `supervisor` summary.
 *
 * Items that are not JSON records are passed through untouched; record items for which
 * `queueTaskSupervisor` yields nothing are passed through as records. The snapshot and every
 * enriched item are stamped with `valuesPrinted: false`.
 *
 * @param store Store providing the snapshot and run data.
 * @param queue Optional queue name forwarded to `store.queueCommander`.
 * @param readerId Reader id forwarded to `store.queueCommander` (may be null).
 */
async function queueCommanderForRead(store: AgentRunStore, queue: string | undefined, readerId: string | null): Promise<JsonValue> {
  const snapshot = (await store.queueCommander(queue, readerId)) as unknown as JsonRecord;
  const rawItems = Array.isArray(snapshot.items) ? snapshot.items : [];
  const pendingItems = rawItems.map(async (entry) => {
    const record = asJsonRecord(entry);
    if (record) {
      const supervisor = await queueTaskSupervisor(store, record);
      if (supervisor) return { ...record, supervisor, valuesPrinted: false };
      return record;
    }
    return entry as JsonValue;
  });
  const enrichedItems = await Promise.all(pendingItems);
  return { ...snapshot, items: enrichedItems, valuesPrinted: false } as JsonValue;
}
/**
 * Builds a compact supervisor summary for a queue task from the result of its latest attempt's run.
 *
 * @param store Store handed to `buildRunResult`.
 * @param task Queue task as a JSON record; only `latestAttempt.runId` / `latestAttempt.commandId` are read.
 * @returns `null` when the task has no latest-attempt run id; otherwise the compact summary.
 *   If building the result throws, a `{ phase: "unavailable", failureKind: "infra-failed", message }`
 *   record is returned instead of propagating the error.
 */
async function queueTaskSupervisor(store: AgentRunStore, task: JsonRecord): Promise<JsonRecord | null> {
  const latestAttempt = asJsonRecord(task.latestAttempt);
  const attemptRunId = stringJsonValue(latestAttempt?.runId);
  if (!attemptRunId) return null;
  try {
    const attemptCommandId = stringJsonValue(latestAttempt?.commandId) ?? undefined;
    const runResult = await buildRunResult(store, attemptRunId, attemptCommandId);
    const liveness = asJsonRecord(runResult.liveness);
    // Prefer the general last-activity record, falling back to the command-level one.
    const activity = asJsonRecord(liveness?.lastActivity ?? liveness?.lastCommandActivity);
    const budget = asJsonRecord(liveness?.timeoutBudget);
    // The classification may live on the result itself or under liveness.
    const classification = asJsonRecord(runResult.terminalClassification ?? liveness?.terminalClassification);
    const supervisor: JsonRecord = {
      runId: stringJsonValue(runResult.runId),
      commandId: stringJsonValue(runResult.commandId),
      status: stringJsonValue(runResult.status),
      terminalStatus: stringJsonValue(runResult.terminalStatus),
      failureKind: stringJsonValue(runResult.failureKind),
      diagnosis: asJsonRecord(runResult.diagnosis),
      terminalClassification: classification ? compactTerminalClassification(classification) : null,
      phase: stringJsonValue(liveness?.phase),
      active: liveness?.active === true,
      lastSeq: numberJsonValue(liveness?.lastSeq ?? runResult.lastSeq),
      lastActivity: activity ? compactActivity(activity) : null,
      timeoutBudget: budget ? compactTimeoutBudget(budget) : null,
      recoveryActions: compactRecoveryActions(liveness?.recoveryActions),
      valuesPrinted: false,
    };
    return supervisor;
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    return { phase: "unavailable", failureKind: "infra-failed", message, valuesPrinted: false };
  }
}
/**
 * Projects an activity record onto a fixed set of fields for supervisor output.
 *
 * Each field is passed through `stringJsonValue`, `numberJsonValue` or `boundedJsonString`
 * (helpers defined elsewhere in this file). `sourceSeq` falls back to the record's `seq`, and
 * `summary` goes through `boundedJsonString` with a bound of 180.
 *
 * @param activity Activity record to compact.
 * @returns The compact record, always carrying `valuesPrinted: false`.
 */
function compactActivity(activity: JsonRecord): JsonRecord {
  const { eventId, activityKind, type, status, toolName, itemId, ageMs, summary } = activity;
  const seqSource = activity.sourceSeq ?? activity.seq;
  const compact: JsonRecord = {
    sourceSeq: numberJsonValue(seqSource),
    eventId: stringJsonValue(eventId),
    activityKind: stringJsonValue(activityKind),
    type: stringJsonValue(type),
    status: stringJsonValue(status),
    toolName: stringJsonValue(toolName),
    itemId: stringJsonValue(itemId),
    ageMs: numberJsonValue(ageMs),
    summary: boundedJsonString(summary, 180),
    valuesPrinted: false,
  };
  return compact;
}
/**
 * Projects a timeout-budget record onto a fixed set of string and numeric fields.
 *
 * String fields go through `stringJsonValue` and numeric fields through `numberJsonValue`
 * (helpers defined elsewhere in this file).
 *
 * @param budget Timeout-budget record to compact.
 * @returns The compact record, always carrying `valuesPrinted: false`.
 */
function compactTimeoutBudget(budget: JsonRecord): JsonRecord {
  const text = (key: string) => stringJsonValue(budget[key]);
  const num = (key: string) => numberJsonValue(budget[key]);
  return {
    state: text("state"),
    timeoutKind: text("timeoutKind"),
    timeoutMs: num("timeoutMs"),
    elapsedMs: num("elapsedMs"),
    idleElapsedMs: num("idleElapsedMs"),
    remainingMs: num("remainingMs"),
    startedAt: text("startedAt"),
    idleStartedAt: text("idleStartedAt"),
    lastActivityAt: text("lastActivityAt"),
    lastActivitySeq: num("lastActivitySeq"),
    commandElapsedMs: num("commandElapsedMs"),
    runElapsedMs: num("runElapsedMs"),
    source: text("source"),
    valuesPrinted: false,
  };
}
/**
 * Projects a terminal-classification record onto a fixed set of fields.
 *
 * Boolean fields are `true` only when the source value is exactly `true`. The two free-text
 * fields (`providerInterruptionReason`, `reason`) go through `boundedJsonString` with a bound
 * of 240; other fields go through `stringJsonValue` / `numberJsonValue`.
 *
 * @param record Terminal-classification record to compact.
 * @returns The compact record, always carrying `valuesPrinted: false`.
 */
function compactTerminalClassification(record: JsonRecord): JsonRecord {
  const text = (key: string) => stringJsonValue(record[key]);
  const num = (key: string) => numberJsonValue(record[key]);
  const flag = (key: string): boolean => record[key] === true;
  const bounded = (key: string) => boundedJsonString(record[key], 240);
  return {
    category: text("category"),
    confidence: text("confidence"),
    providerEvidence: text("providerEvidence"),
    providerInterruption: text("providerInterruption"),
    providerInterruptionKnown: flag("providerInterruptionKnown"),
    providerInterruptionReason: bounded("providerInterruptionReason"),
    retryInterruptionObserved: flag("retryInterruptionObserved"),
    retryInterruptionSeq: num("retryInterruptionSeq"),
    retryInterruptionKind: text("retryInterruptionKind"),
    hardTimeout: flag("hardTimeout"),
    idleTimeout: flag("idleTimeout"),
    timeoutKind: text("timeoutKind"),
    transportDisconnectObserved: flag("transportDisconnectObserved"),
    transportDisconnectSeq: num("transportDisconnectSeq"),
    reason: bounded("reason"),
    valuesPrinted: false,
  };
}
/**
 * Compacts a list of recovery actions for supervisor output.
 *
 * Only the first five entries are kept. Entries that are not JSON records become
 * `{ action: "unknown" }`; record entries are projected onto a fixed set of fields, with
 * `command` and `hint` passed through `boundedJsonString` with a bound of 220.
 *
 * @param value Candidate list; anything that is not an array yields an empty list.
 * @returns The compacted entries, each carrying `valuesPrinted: false`.
 */
function compactRecoveryActions(value: JsonValue | undefined): JsonValue[] {
  if (!Array.isArray(value)) return [];
  const firstFive = value.slice(0, 5);
  return firstFive.map((entry) => {
    const action = asJsonRecord(entry);
    if (action) {
      const { action: actionName, reason, runId, commandId, sessionId, afterSeq, command, hint } = action;
      return {
        action: stringJsonValue(actionName),
        reason: stringJsonValue(reason),
        runId: stringJsonValue(runId),
        commandId: stringJsonValue(commandId),
        sessionId: stringJsonValue(sessionId),
        afterSeq: numberJsonValue(afterSeq),
        command: boundedJsonString(command, 220),
        hint: boundedJsonString(hint, 220),
        valuesPrinted: false,
      };
    }
    return { action: "unknown", valuesPrinted: false };
  });
}
async function route({ method, url, body, store, sourceCommit, authSummary, runnerJobDefaults, sessionPvcDefaults, providerProfileDefaults, toolCredentialDefaults, aipodSpecDir }: { method: string; url: URL; body: unknown; store: AgentRunStore; sourceCommit: string; authSummary?: JsonRecord; runnerJobDefaults?: NonNullable<ManagerServerOptions["runnerJobDefaults"]>; sessionPvcDefaults?: NonNullable<ManagerServerOptions["sessionPvcOptions"]>; providerProfileDefaults?: NonNullable<ManagerServerOptions["providerProfileOptions"]>; toolCredentialDefaults?: NonNullable<ManagerServerOptions["toolCredentialOptions"]>; aipodSpecDir?: string }): Promise<JsonValue> {
const path = url.pathname;
if (method === "GET" && (path === "/health" || path === "/health/live" || path === "/health/readiness")) {
const database = await store.health();
const ready = path === "/health/live" ? true : database.ready;
return { serviceId: "agentrun-mgr", live: true, ready, database, sourceCommit, auth: authSummary ?? null, runnerWorkReady: staticWorkReadyCapabilitySummary(), secretRefs: { databaseUrl: database.adapter === "postgres" ? "redacted" : "not-used", valuesPrinted: false } };
}
if (method === "GET" && path === "/api/v1/backends") return await listBackendCapabilities(providerProfileDefaults) as JsonValue;
if (method === "GET" && path === "/api/v1/tool-credentials") return await listToolCredentials(toolCredentialDefaults) as JsonValue;
const toolCredentialMatch = path.match(/^\/api\/v1\/tool-credentials\/([^/]+)$/u);
if (method === "GET" && toolCredentialMatch) return await showToolCredential(decodeURIComponent(toolCredentialMatch[1] ?? ""), toolCredentialDefaults) as JsonValue;
const toolCredentialGithubSshMatch = path.match(/^\/api\/v1\/tool-credentials\/github-ssh\/credential$/u);
if (method === "PUT" && toolCredentialGithubSshMatch) return await setGithubSshToolCredential(body ?? {}, toolCredentialDefaults) as JsonValue;
if (method === "GET" && path === "/api/v1/aipod-specs") return await listAipodSpecs(aipodSpecDir) as JsonValue;
if (method === "POST" && path === "/api/v1/aipod-specs") return await applyAipodSpec(body ?? {}, aipodSpecDir) as JsonValue;
const aipodSpecMatch = path.match(/^\/api\/v1\/aipod-specs\/([^/]+)$/u);
if (method === "GET" && aipodSpecMatch) return await showAipodSpec(decodeURIComponent(aipodSpecMatch[1] ?? ""), aipodSpecDir) as JsonValue;
if (method === "PUT" && aipodSpecMatch) return await applyNamedAipodSpec(decodeURIComponent(aipodSpecMatch[1] ?? ""), body, aipodSpecDir) as JsonValue;
if (method === "DELETE" && aipodSpecMatch) return await deleteAipodSpec(decodeURIComponent(aipodSpecMatch[1] ?? ""), aipodSpecDir) as JsonValue;
const aipodSpecRenderMatch = path.match(/^\/api\/v1\/aipod-specs\/([^/]+)\/render$/u);
if (method === "POST" && aipodSpecRenderMatch) return await renderAipodSpecByName(decodeURIComponent(aipodSpecRenderMatch[1] ?? ""), asRecord(body ?? {}, "aipodSpecRender"), aipodSpecDir) as JsonValue;
if (method === "GET" && path === "/api/v1/provider-profiles") return await listProviderProfiles(providerProfileDefaults) as JsonValue;
const providerProfileMatch = path.match(/^\/api\/v1\/provider-profiles\/([^/]+)$/u);
if (method === "GET" && providerProfileMatch) return await showProviderProfile(providerProfileMatch[1] ?? "", providerProfileDefaults) as JsonValue;
if (method === "DELETE" && providerProfileMatch) return await removeProviderProfile(providerProfileMatch[1] ?? "", providerProfileDefaults) as JsonValue;
const providerConfigMatch = path.match(/^\/api\/v1\/provider-profiles\/([^/]+)\/config$/u);
if (method === "GET" && providerConfigMatch) return await getProviderProfileConfig(providerConfigMatch[1] ?? "", providerProfileDefaults) as JsonValue;
if (method === "PUT" && providerConfigMatch) return await setProviderProfileConfig(providerConfigMatch[1] ?? "", body, providerProfileDefaults) as JsonValue;
const providerCredentialMatch = path.match(/^\/api\/v1\/provider-profiles\/([^/]+)\/credential$/u);
if (method === "PUT" && providerCredentialMatch) return await setProviderProfileCredential(providerCredentialMatch[1] ?? "", body, providerProfileDefaults) as JsonValue;
const providerValidationCreateMatch = path.match(/^\/api\/v1\/provider-profiles\/([^/]+)\/validate$/u);
if (method === "POST" && providerValidationCreateMatch) return await validateProviderProfile(providerValidationCreateMatch[1] ?? "", body, { store, sourceCommit, ...(providerProfileDefaults ?? {}), ...(runnerJobDefaults ? { runnerJobDefaults } : {}) }) as JsonValue;
const providerValidationShowMatch = path.match(/^\/api\/v1\/provider-profiles\/([^/]+)\/validations\/([^/]+)$/u);
if (method === "GET" && providerValidationShowMatch) return await getProviderProfileValidation(providerValidationShowMatch[1] ?? "", providerValidationShowMatch[2] ?? "", { store }) as JsonValue;
if (method === "GET" && path === "/api/v1/sessions") {
const input: ListSessionsInput = { limit: integerQuery(url, "limit", 50) };
const state = url.searchParams.get("state");
const backendProfile = url.searchParams.get("backendProfile") ?? url.searchParams.get("profile");
const readerId = url.searchParams.get("readerId");
const cursor = url.searchParams.get("cursor");
if (state) input.state = validateSessionListState(state);
if (backendProfile) input.backendProfile = validateBackendProfile(backendProfile);
if (readerId) input.readerId = readerId;
if (cursor) input.cursor = cursor;
return await store.listSessions(input) as unknown as JsonValue;
}
const sessionMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)$/u);
if (method === "GET" && sessionMatch) {
const summary = await store.getSessionSummary(sessionMatch[1] ?? "", url.searchParams.get("readerId"));
const runId = summary.activeRunId ?? summary.lastRunId;
if (!runId) return summary as unknown as JsonValue;
const commandId = summary.activeCommandId ?? summary.lastCommandId ?? undefined;
try {
const result = await buildRunResult(store, runId, commandId);
return {
...summary,
liveness: result.liveness ?? null,
supervisor: {
runId: result.runId,
commandId: result.commandId,
status: result.status,
terminalStatus: result.terminalStatus,
lastSeq: result.lastSeq,
liveness: result.liveness ?? null,
recoveryActions: typeof result.liveness === "object" && result.liveness !== null && !Array.isArray(result.liveness) ? (result.liveness as JsonRecord).recoveryActions ?? [] : [],
...(result.steerDelivery ? { steerDelivery: result.steerDelivery } : {}),
valuesPrinted: false,
},
} as unknown as JsonValue;
} catch (error) {
return {
...summary,
liveness: {
phase: "unavailable",
active: summary.active,
failureKind: "infra-failed",
message: error instanceof Error ? error.message : String(error),
valuesPrinted: false,
},
} as unknown as JsonValue;
}
}
const sessionTraceMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/trace$/u);
if (method === "GET" && sessionTraceMatch) {
const input: SessionEventPageInput = { afterSeq: integerQuery(url, "afterSeq", 0), limit: integerQuery(url, "limit", 100) };
const runId = url.searchParams.get("runId");
if (runId) input.runId = runId;
return await store.listSessionTrace(sessionTraceMatch[1] ?? "", input) as unknown as JsonValue;
}
const sessionOutputMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/output$/u);
if (method === "GET" && sessionOutputMatch) {
const input: SessionEventPageInput = { afterSeq: integerQuery(url, "afterSeq", 0), limit: integerQuery(url, "limit", 100) };
const runId = url.searchParams.get("runId");
if (runId) input.runId = runId;
return await store.listSessionOutput(sessionOutputMatch[1] ?? "", input) as unknown as JsonValue;
}
const sessionReadMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/read$/u);
if (method === "POST" && sessionReadMatch) {
const record = body === null ? {} : asRecord(body, "read");
const readerId = typeof record.readerId === "string" && record.readerId.trim().length > 0 ? record.readerId.trim() : "cli";
return await store.markSessionRead(sessionReadMatch[1] ?? "", readerId) as unknown as JsonValue;
}
const sessionControlMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/control$/u);
if (method === "POST" && sessionControlMatch) {
const record = asRecord(body ?? {}, "sessionControl");
const action = typeof record.action === "string" ? record.action : "";
const session = await store.getSessionSummary(sessionControlMatch[1] ?? "", typeof record.readerId === "string" ? record.readerId : null);
if (action === "read") return await store.markSessionRead(session.sessionId, typeof record.readerId === "string" && record.readerId.trim().length > 0 ? record.readerId.trim() : "cli") as unknown as JsonValue;
if (action === "cancel") {
const reason = typeof record.reason === "string" && record.reason.trim().length > 0 ? record.reason.trim() : undefined;
if (session.activeCommandId) return await store.cancelCommand(session.activeCommandId, reason) as unknown as JsonValue;
if (session.activeRunId) return await store.cancelRun(session.activeRunId, reason) as unknown as JsonValue;
throw new AgentRunError("schema-invalid", `session ${session.sessionId} has no active run or command`, { httpStatus: 409 });
}
throw new AgentRunError("schema-invalid", `session control action ${action} is not supported`, { httpStatus: 400 });
}
const sessionStorageMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/storage$/u);
if (method === "GET" && sessionStorageMatch) {
const summary = await getSessionPvcSummary({ store, sessionId: sessionStorageMatch[1] ?? "", options: sessionPvcOptionsForRequest(sessionPvcDefaults, runnerJobDefaults) });
return summary as unknown as JsonValue;
}
if (method === "DELETE" && sessionStorageMatch) {
return await deleteSessionPvc({ store, sessionId: sessionStorageMatch[1] ?? "", options: sessionPvcOptionsForRequest(sessionPvcDefaults, runnerJobDefaults) }) as unknown as JsonValue;
}
const sessionStorageRefreshMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/storage\/refresh$/u);
if (method === "POST" && sessionStorageRefreshMatch) {
const record = asRecord(body ?? {}, "sessionStorageRefresh");
const summary: SessionPvcSummary = {
pvcName: stringField(record, "pvcName"),
namespace: stringField(record, "namespace"),
pvcPhase: typeof record.pvcPhase === "string" ? record.pvcPhase : null,
storageSizeBytes: typeof record.storageSizeBytes === "number" ? record.storageSizeBytes : null,
storageFilesCount: typeof record.storageFilesCount === "number" ? record.storageFilesCount : null,
storageSha256: typeof record.storageSha256 === "string" ? record.storageSha256 : null,
storageUpdatedAt: typeof record.storageUpdatedAt === "string" ? record.storageUpdatedAt : new Date().toISOString(),
codexRolloutSubdir: typeof record.codexRolloutSubdir === "string" && record.codexRolloutSubdir.length > 0 ? record.codexRolloutSubdir : "sessions",
valuesPrinted: false,
};
const refreshed = await refreshSessionPvcSummary({ store, sessionId: sessionStorageRefreshMatch[1] ?? "", summary, options: sessionPvcOptionsForRequest(sessionPvcDefaults, runnerJobDefaults) });
return { action: "session-storage-refreshed", sessionId: refreshed.sessionId, summary: refreshed } as unknown as JsonValue;
}
if (method === "POST" && path === "/api/v1/sessions/storage/gc") {
const cycle = await runSessionStorageGc({ store, options: sessionPvcOptionsForRequest(sessionPvcDefaults, runnerJobDefaults) });
return cycle as unknown as JsonValue;
}
if (method === "POST" && path === "/api/v1/sessions") {
const record = asRecord(body ?? {}, "sessionCreate");
const sessionId = typeof record.sessionId === "string" && record.sessionId.trim().length > 0 ? record.sessionId.trim() : `sess_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`;
const tenantId = stringField(record, "tenantId");
const projectId = stringField(record, "projectId");
const backendProfileRaw = typeof record.backendProfile === "string" ? record.backendProfile : "codex";
if (!isBackendProfile(backendProfileRaw)) throw new AgentRunError("schema-invalid", `backendProfile ${backendProfileRaw} is not supported`, { httpStatus: 400 });
const conversationId = typeof record.conversationId === "string" ? record.conversationId : null;
const codexRolloutSubdir = typeof record.codexRolloutSubdir === "string" && record.codexRolloutSubdir.length > 0 ? record.codexRolloutSubdir : "sessions";
const expiresAt = typeof record.expiresAt === "string" ? record.expiresAt : new Date(Date.now() + 30 * 24 * 60 * 60 * 1000).toISOString();
const existing = await store.getSession(sessionId);
if (existing) {
if (existing.storageKind === "evicted") throw new AgentRunError("session-store-evicted", `session ${sessionId} storage has been evicted; create a new sessionId`, { httpStatus: 409 });
if (existing.storageKind === "none" || !existing.storagePvcName) {
const recovered = await createSessionPvc({ store, sessionId, options: { ...sessionPvcOptionsForRequest(sessionPvcDefaults, runnerJobDefaults), defaultCodexRolloutSubdir: codexRolloutSubdir } });
return { action: "session-storage-recovered", session: existing, pvc: recovered, valuesPrinted: false } as unknown as JsonValue;
}
return { action: "session-exists", session: existing, pvcName: existing.storagePvcName ?? null, pvcPhase: existing.storagePvcPhase ?? null, codexRolloutSubdir: existing.codexRolloutSubdir ?? "sessions", valuesPrinted: false } as unknown as JsonValue;
}
const now = new Date().toISOString();
const session = await store.upsertSession({
sessionId,
tenantId,
projectId,
backendProfile: backendProfileRaw as never,
conversationId,
threadId: null,
metadata: typeof record.metadata === "object" && record.metadata !== null && !Array.isArray(record.metadata) ? record.metadata as Record<string, JsonValue> : {},
expiresAt,
codexRolloutSubdir,
});
const pvc = await createSessionPvc({ store, sessionId, options: { ...sessionPvcOptionsForRequest(sessionPvcDefaults, runnerJobDefaults), defaultCodexRolloutSubdir: codexRolloutSubdir } });
return { action: "session-created", session, pvc, valuesPrinted: false } as unknown as JsonValue;
}
if (method === "POST" && path === "/api/v1/queue/tasks") return await store.createQueueTask(validateCreateQueueTask(body)) as unknown as JsonValue;
if (method === "GET" && path === "/api/v1/queue/tasks") {
const state = url.searchParams.get("state");
const listInput: ListQueueTasksInput = { updatedAfter: integerQuery(url, "updatedAfter", 0), limit: integerQuery(url, "limit", 50) };
const queue = url.searchParams.get("queue");
const cursor = url.searchParams.get("cursor");
if (queue) listInput.queue = queue;
if (state) listInput.state = validateQueueTaskState(state);
if (cursor) listInput.cursor = cursor;
await refreshQueuePageForRead(store, listInput);
return await store.listQueueTasks(listInput) as unknown as JsonValue;
}
const queueTaskMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)$/u);
if (method === "GET" && queueTaskMatch) return await refreshQueueTaskForRead(store, queueTaskMatch[1] ?? "") as unknown as JsonValue;
const queueTaskDispatchMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/dispatch$/u);
if (method === "POST" && queueTaskDispatchMatch) {
const namespace = runnerJobDefaults?.namespace ?? process.env.AGENTRUN_RUNTIME_NAMESPACE ?? "agentrun-v01";
return await dispatchQueueTask({
store,
taskId: queueTaskDispatchMatch[1] ?? "",
input: asRecord(body ?? {}, "queueDispatch"),
defaults: {
namespace,
managerUrl: runnerJobDefaults?.managerUrl ?? process.env.AGENTRUN_INTERNAL_MGR_URL ?? `http://agentrun-mgr.${namespace}.svc.cluster.local:8080`,
image: runnerJobDefaults?.image ?? process.env.AGENTRUN_RUNNER_IMAGE ?? "",
sourceCommit,
...optionalStringRecord("envIdentity", runnerJobDefaults?.envIdentity ?? process.env.AGENTRUN_ENV_IDENTITY),
...optionalStringRecord("artifactCatalogFile", runnerJobDefaults?.artifactCatalogFile ?? process.env.AGENTRUN_ARTIFACT_CATALOG_FILE),
serviceAccountName: runnerJobDefaults?.serviceAccountName ?? process.env.AGENTRUN_RUNNER_SERVICE_ACCOUNT ?? "agentrun-v01-runner",
...(runnerJobDefaults?.kubectlCommand ? { kubectlCommand: runnerJobDefaults.kubectlCommand } : {}),
...(runnerJobDefaults?.unideskSshEndpointEnv ? { unideskSshEndpointEnv: runnerJobDefaults.unideskSshEndpointEnv } : {}),
},
}) as unknown as JsonValue;
}
const queueTaskRefreshMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/refresh$/u);
if (method === "POST" && queueTaskRefreshMatch) return await refreshQueueTaskFromCore(store, queueTaskRefreshMatch[1] ?? "") as unknown as JsonValue;
const queueTaskCancelMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/cancel$/u);
if (method === "POST" && queueTaskCancelMatch) {
const record = body === null ? {} : asRecord(body, "cancel");
const reason = typeof record.reason === "string" && record.reason.trim().length > 0 ? record.reason.trim() : undefined;
return await store.cancelQueueTask(queueTaskCancelMatch[1] ?? "", reason) as unknown as JsonValue;
}
const queueTaskReadMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/read$/u);
if (method === "POST" && queueTaskReadMatch) {
const record = body === null ? {} : asRecord(body, "read");
const readerId = typeof record.readerId === "string" && record.readerId.trim().length > 0 ? record.readerId.trim() : "cli";
return await store.markQueueTaskRead(queueTaskReadMatch[1] ?? "", readerId) as unknown as JsonValue;
}
if (method === "GET" && path === "/api/v1/queue/stats") {
const queue = url.searchParams.get("queue") ?? undefined;
await refreshRunningQueueTasksForRead(store, queue);
return await store.queueStats(queue) as unknown as JsonValue;
}
if (method === "GET" && path === "/api/v1/queue/commander") {
const queue = url.searchParams.get("queue") ?? undefined;
await refreshRunningQueueTasksForRead(store, queue);
return await queueCommanderForRead(store, queue, url.searchParams.get("readerId"));
}
if (method === "POST" && path === "/api/v1/runs") return await store.createRun(validateCreateRun(body)) as unknown as JsonValue;
const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u);
if (method === "GET" && runMatch) return await store.getRun(runMatch[1] ?? "") as unknown as JsonValue;
const eventMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u);
if (method === "GET" && eventMatch) {
const afterSeq = integerQuery(url, "afterSeq", 0);
const limit = integerQuery(url, "limit", 100);
return { items: await store.listEvents(eventMatch[1] ?? "", afterSeq, limit) as unknown as JsonValue };
}
const runResultMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/result$/u);
if (method === "GET" && runResultMatch) return await buildRunResult(store, runResultMatch[1] ?? "", url.searchParams.get("commandId") ?? undefined) as JsonValue;
const runCancelMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/cancel$/u);
if (method === "POST" && runCancelMatch) {
const record = body === null ? {} : asRecord(body, "cancel");
const reason = typeof record.reason === "string" && record.reason.trim().length > 0 ? record.reason.trim() : undefined;
return await store.cancelRun(runCancelMatch[1] ?? "", reason) as unknown as JsonValue;
}
const commandCreateMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands$/u);
if (method === "POST" && commandCreateMatch) return await store.createCommand(commandCreateMatch[1] ?? "", validateCreateCommand(body)) as unknown as JsonValue;
if (method === "GET" && commandCreateMatch) return { items: await store.listCommands(commandCreateMatch[1] ?? "", integerQuery(url, "afterSeq", 0), integerQuery(url, "limit", 20)) as unknown as JsonValue };
const runnerJobMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/runner-jobs$/u);
if (method === "POST" && runnerJobMatch) {
const namespace = runnerJobDefaults?.namespace ?? process.env.AGENTRUN_RUNTIME_NAMESPACE ?? "agentrun-v01";
return await createKubernetesRunnerJob({
store,
runId: runnerJobMatch[1] ?? "",
input: asRecord(body ?? {}, "runnerJob") as never,
defaults: {
namespace,
managerUrl: runnerJobDefaults?.managerUrl ?? process.env.AGENTRUN_INTERNAL_MGR_URL ?? `http://agentrun-mgr.${namespace}.svc.cluster.local:8080`,
image: runnerJobDefaults?.image ?? process.env.AGENTRUN_RUNNER_IMAGE ?? "",
sourceCommit,
...optionalStringRecord("envIdentity", runnerJobDefaults?.envIdentity ?? process.env.AGENTRUN_ENV_IDENTITY),
...optionalStringRecord("artifactCatalogFile", runnerJobDefaults?.artifactCatalogFile ?? process.env.AGENTRUN_ARTIFACT_CATALOG_FILE),
serviceAccountName: runnerJobDefaults?.serviceAccountName ?? process.env.AGENTRUN_RUNNER_SERVICE_ACCOUNT ?? "agentrun-v01-runner",
...(runnerJobDefaults?.kubectlCommand ? { kubectlCommand: runnerJobDefaults.kubectlCommand } : {}),
...(runnerJobDefaults?.unideskSshEndpointEnv ? { unideskSshEndpointEnv: runnerJobDefaults.unideskSshEndpointEnv } : {}),
},
}) as unknown as JsonValue;
}
if (method === "GET" && runnerJobMatch) {
const runId = runnerJobMatch[1] ?? "";
const commandId = url.searchParams.get("commandId") ?? undefined;
const jobs = await store.listRunnerJobs(runId, commandId);
const events = await store.listEvents(runId, 0, 500);
return { items: jobs.map((job) => runnerJobStatusSummary(job, events)), count: jobs.length, lastSeq: events.at(-1)?.seq ?? 0 };
}
const runnerJobShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/runner-jobs\/([^/]+)$/u);
if (method === "GET" && runnerJobShowMatch) {
const runId = runnerJobShowMatch[1] ?? "";
const runnerJobId = runnerJobShowMatch[2] ?? "";
const jobs = await store.listRunnerJobs(runId);
const job = jobs.find((item) => item.id === runnerJobId);
if (!job) throw new AgentRunError("schema-invalid", `runner job ${runnerJobId} was not found`, { httpStatus: 404 });
return runnerJobStatusSummary(job, await store.listEvents(runId, 0, 500)) as JsonValue;
}
const commandShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)$/u);
if (method === "GET" && commandShowMatch) return await store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue;
const commandResultMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)\/result$/u);
if (method === "GET" && commandResultMatch) return await buildRunResult(store, commandResultMatch[1] ?? "", commandResultMatch[2] ?? "") as JsonValue;
if (method === "POST" && path === "/api/v1/runners/register") return await store.registerRunner(asRecord(body ?? {}, "runner")) as unknown as JsonValue;
const claimMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/claim$/u);
if (method === "POST" && claimMatch) {
const record = asRecord(body, "claim");
const runnerId = typeof record.runnerId === "string" ? record.runnerId : "";
if (runnerId.length === 0) throw new AgentRunError("schema-invalid", "runnerId is required", { httpStatus: 400 });
return await store.claimRun(claimMatch[1] ?? "", runnerId, numberField(record, "leaseMs", 60_000)) as unknown as JsonValue;
}
const leaseMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/lease$/u);
if (method === "PATCH" && leaseMatch) {
const record = asRecord(body, "lease");
const runnerId = typeof record.runnerId === "string" ? record.runnerId : "";
if (runnerId.length === 0) throw new AgentRunError("schema-invalid", "runnerId is required", { httpStatus: 400 });
return await store.heartbeat(leaseMatch[1] ?? "", runnerId, numberField(record, "leaseMs", 60_000)) as unknown as JsonValue;
}
const eventsAppendMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u);
if (method === "POST" && eventsAppendMatch) {
const record = asRecord(body, "event");
const type = typeof record.type === "string" ? record.type as RunEvent["type"] : "backend_status";
return await store.appendEvent(eventsAppendMatch[1] ?? "", type, asRecord(record.payload ?? {}, "event.payload")) as unknown as JsonValue;
}
const statusMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/status$/u);
if (method === "PATCH" && statusMatch) {
const record = asRecord(body, "status");
const terminalStatus = record.terminalStatus === "completed" || record.terminalStatus === "failed" || record.terminalStatus === "blocked" || record.terminalStatus === "cancelled" ? record.terminalStatus : "failed";
return await store.finishRun(statusMatch[1] ?? "", {
terminalStatus,
failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null,
failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null,
...(typeof record.threadId === "string" ? { threadId: record.threadId } : {}),
...(typeof record.turnId === "string" ? { turnId: record.turnId } : {}),
}) as unknown as JsonValue;
}
const ackMatch = path.match(/^\/api\/v1\/commands\/([^/]+)\/ack$/u);
if (method === "POST" && ackMatch) return await store.ackCommand(ackMatch[1] ?? "") as unknown as JsonValue;
const commandStatusMatch = path.match(/^\/api\/v1\/commands\/([^/]+)\/status$/u);
if (method === "PATCH" && commandStatusMatch) {
const record = asRecord(body, "commandStatus");
const terminalStatus = record.terminalStatus === "completed" || record.terminalStatus === "failed" || record.terminalStatus === "blocked" || record.terminalStatus === "cancelled" ? record.terminalStatus : "failed";
return await store.finishCommand(commandStatusMatch[1] ?? "", {
terminalStatus,
failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null,
failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null,
...(typeof record.threadId === "string" ? { threadId: record.threadId } : {}),
...(typeof record.turnId === "string" ? { turnId: record.turnId } : {}),
}) as unknown as JsonValue;
}
const commandCancelMatch = path.match(/^\/api\/v1\/commands\/([^/]+)\/cancel$/u);
if (method === "POST" && commandCancelMatch) {
const record = body === null ? {} : asRecord(body, "cancel");
const reason = typeof record.reason === "string" && record.reason.trim().length > 0 ? record.reason.trim() : undefined;
return await store.cancelCommand(commandCancelMatch[1] ?? "", reason) as unknown as JsonValue;
}
throw new AgentRunError("schema-invalid", `unsupported route ${method} ${path}`, { httpStatus: 404 });
}
/**
 * Builds an aipod spec from a request body and applies it, provided the
 * spec's own name agrees with the name taken from the URL.
 *
 * @param name - Spec name taken from the request URL.
 * @param body - Request body; `null`/`undefined` is replaced by an empty object.
 * @param dir - Optional value forwarded unchanged to `applyAipodSpec`.
 * @returns The value produced by `applyAipodSpec`.
 * @throws AgentRunError `schema-invalid` (HTTP 400) when `metadata.name` and
 *   the trimmed URL name differ after lower-casing both.
 */
async function applyNamedAipodSpec(name: string, body: unknown, dir?: string): Promise<JsonValue> {
  const spec = aipodSpecFromInput(body ?? {}, `api:${name}`);
  const metadataName = spec.metadata.name.toLowerCase();
  const urlName = name.trim().toLowerCase();
  if (metadataName === urlName) {
    return await applyAipodSpec(spec, dir) as JsonValue;
  }
  // The error details echo both names exactly as received, without normalization.
  throw new AgentRunError("schema-invalid", "aipod-spec URL name must match metadata.name", {
    httpStatus: 400,
    details: { urlName: name, metadataName: spec.metadata.name, valuesPrinted: false },
  });
}
/**
 * Reads a non-negative integer query parameter from `url`.
 *
 * @param url - Request URL whose search parameters are inspected.
 * @param key - Name of the query parameter.
 * @param fallback - Value returned when the parameter is absent, blank, or
 *   does not convert to a non-negative integer.
 * @returns The parsed integer, or `fallback`.
 */
function integerQuery(url: URL, key: string, fallback: number): number {
  const raw = url.searchParams.get(key);
  // Number(null) and Number("") both evaluate to 0, so a missing or blank
  // parameter must be rejected before conversion; otherwise it would pass the
  // integer check below and masquerade as an explicit 0 instead of `fallback`.
  if (raw === null || raw.trim().length === 0) return fallback;
  const value = Number(raw);
  return Number.isInteger(value) && value >= 0 ? value : fallback;
}
/**
 * Returns `record[key]` when it is a finite number, otherwise `fallback`.
 *
 * @param record - Object to read from.
 * @param key - Property name to look up.
 * @param fallback - Value used when the property is missing, not a number,
 *   or is `NaN`/`Infinity`.
 */
function numberField(record: JsonRecord, key: string, fallback: number): number {
  const candidate = record[key];
  if (typeof candidate !== "number") return fallback;
  if (!Number.isFinite(candidate)) return fallback;
  return candidate;
}
/**
 * Narrows an arbitrary value to a plain JSON record.
 *
 * @param value - Any value.
 * @returns The same reference when `value` is a non-null, non-array object;
 *   `null` for everything else.
 */
function asJsonRecord(value: unknown): JsonRecord | null {
  if (value === null || typeof value !== "object") return null;
  if (Array.isArray(value)) return null;
  return value as JsonRecord;
}
/**
 * Returns `value` when it is a non-empty string, otherwise `null`.
 *
 * @param value - JSON value to inspect (may be `undefined`).
 */
function stringJsonValue(value: JsonValue | undefined): string | null {
  if (typeof value !== "string") return null;
  return value.length === 0 ? null : value;
}
/**
 * Returns `value` when it is a finite number, otherwise `null`.
 *
 * @param value - JSON value to inspect (may be `undefined`).
 */
function numberJsonValue(value: JsonValue | undefined): number | null {
  if (typeof value !== "number") return null;
  if (!Number.isFinite(value)) return null;
  return value;
}
/**
 * Produces a single-line, length-bounded copy of a string value.
 *
 * Runs of whitespace are collapsed to one space and the ends are trimmed.
 * Text longer than `limit` is cut and suffixed with `...` so that the result
 * is at most `limit` characters long.
 *
 * @param value - JSON value to inspect (may be `undefined`).
 * @param limit - Maximum length of the returned string.
 * @returns The normalized (possibly truncated) text, or `null` when `value`
 *   is not a string or contains nothing but whitespace.
 */
function boundedJsonString(value: JsonValue | undefined, limit: number): string | null {
  if (typeof value !== "string") return null;
  const normalized = value.replace(/\s+/gu, " ").trim();
  // A whitespace-only input collapses to "", so it is reported as null exactly
  // like a literally empty string rather than leaking an empty result.
  if (normalized.length === 0) return null;
  if (normalized.length > limit) {
    if (limit < 3) {
      // There is no room for the ellipsis; hard-cut so the bound still holds.
      const head = normalized.slice(0, Math.max(0, limit));
      return head.length > 0 ? head : null;
    }
    return `${normalized.slice(0, limit - 3)}...`;
  }
  return normalized;
}
/**
 * Reads a required string property and returns it trimmed.
 *
 * @param record - Object to read from.
 * @param key - Property name to look up; also used in the error message.
 * @returns The trimmed string value.
 * @throws AgentRunError `schema-invalid` (HTTP 400) when the property is not
 *   a string or is empty after trimming.
 */
function stringField(record: JsonRecord, key: string): string {
  const raw = record[key];
  // Non-string values are folded into the empty case so one check covers both.
  const trimmed = typeof raw === "string" ? raw.trim() : "";
  if (trimmed.length === 0) {
    throw new AgentRunError("schema-invalid", `${key} is required`, { httpStatus: 400 });
  }
  return trimmed;
}
/**
 * Builds a one-entry record suitable for object spreading.
 *
 * @param key - Property name for the resulting entry.
 * @param value - Candidate value.
 * @returns `{ [key]: trimmedValue }` when `value` is a string with
 *   non-whitespace content; an empty object otherwise.
 */
function optionalStringRecord(key: string, value: unknown): JsonRecord {
  if (typeof value !== "string") return {};
  const trimmed = value.trim();
  if (trimmed.length === 0) return {};
  return { [key]: trimmed };
}
/**
 * Converts any thrown value into an `AgentRunError`.
 *
 * @param error - The caught value.
 * @returns `error` itself when it already is an `AgentRunError`; otherwise a
 *   new `infra-failed` error (HTTP 500) whose message is `error.message` for
 *   `Error` instances and `String(error)` for anything else.
 */
function normalizeError(error: unknown): AgentRunError {
  if (error instanceof AgentRunError) {
    return error;
  }
  let message: string;
  if (error instanceof Error) {
    message = error.message;
  } else {
    message = String(error);
  }
  return new AgentRunError("infra-failed", message, { httpStatus: 500 });
}
/**
 * Serializes `body` as newline-terminated JSON and sends it as the response.
 *
 * @param res - HTTP response to write to.
 * @param statusCode - HTTP status code for the response.
 * @param body - Payload to serialize with `JSON.stringify`.
 */
function writeJson(res: import("node:http").ServerResponse, statusCode: number, body: ApiOkBody | ApiErrorBody): void {
  const payload = JSON.stringify(body) + "\n";
  const headers = {
    "content-type": "application/json; charset=utf-8",
    // Byte length rather than character count, since the payload may contain
    // multi-byte characters.
    "content-length": Buffer.byteLength(payload),
  };
  res.writeHead(statusCode, headers);
  res.end(payload);
}