import type { Server } from "node:http"; import { createServer } from "node:http"; import type { AddressInfo } from "node:net"; import type { AgentRunStore, ListQueueTasksInput, ListSessionsInput, SessionEventPageInput } from "./store.js"; import { openAgentRunStoreFromEnv } from "./store.js"; import { AgentRunError, errorToJson } from "../common/errors.js"; import { asRecord, validateBackendProfile, validateCreateCommand, validateCreateQueueTask, validateCreateRun, validateQueueTaskState, validateSessionListState } from "../common/validation.js"; import { isBackendProfile } from "../common/backend-profiles.js"; import type { ApiErrorBody, ApiOkBody, CommandRecord, JsonRecord, JsonValue, QueueTaskRecord, RunEvent, RunRecord, SessionRecord } from "../common/types.js"; import { createKubernetesRunnerJob, type RunnerJobDefaults } from "./kubernetes-runner-job.js"; import type { RunnerRetentionOptions } from "./runner-retention.js"; import { dispatchQueueTask, refreshQueueTaskFromCore } from "./queue-dispatch.js"; import { buildRunResult } from "./result.js"; import { runnerJobStatusSummary } from "./runner-job-status.js"; import { createSessionPvc, deleteSessionPvc, getSessionPvcSummary, refreshSessionPvcSummary, runSessionStorageGc } from "./session-pvc.js"; import type { SessionPvcSummary } from "./session-pvc.js"; import type { SessionPvcOptions } from "./session-pvc.js"; import { assertManagerRequestAuthorized, managerAuthSummary, managerServerAuthConfigFromEnv, type ManagerAuthConfig } from "./auth.js"; import { getProviderProfileConfig, getProviderProfileValidation, listBackendCapabilities, listProviderProfiles, removeProviderProfile, setProviderProfileConfig, setProviderProfileCredential, showProviderProfile, validateProviderProfile } from "./provider-profiles.js"; import { listToolCredentials, setGithubSshToolCredential, showToolCredential } from "./tool-credentials.js"; import { aipodSpecFromInput, applyAipodSpec, deleteAipodSpec, listAipodSpecs, 
renderAipodSpecByName, showAipodSpec } from "../common/aipod-specs.js";
import { staticWorkReadyCapabilitySummary } from "../common/work-ready.js";
import { emitAgentRunOtelSpan } from "../common/otel-trace.js";

/**
 * Builds session-PVC options from an optional defaults object.
 *
 * @param defaults - Optional defaults; only `kubectlCommand` is read.
 * @returns `{ kubectlCommand }` when a non-empty command is configured, otherwise `{}`.
 */
function pvcOptions(defaults: { kubectlCommand?: string } | undefined): SessionPvcOptions {
  const kubectlCommand = defaults?.kubectlCommand;
  if (!kubectlCommand) return {};
  return { kubectlCommand };
}

/**
 * Chooses the session-PVC options for one request.
 *
 * When the server-level defaults carry a `kubectlHandler`, the result is built
 * from those defaults (copying only the non-empty optional fields). Otherwise
 * the result falls back to `pvcOptions(runnerJobDefaults)`.
 *
 * @param serverDefaults - Server-level session PVC defaults, if any.
 * @param runnerJobDefaults - Runner job defaults; only `kubectlCommand` is used as fallback.
 * @returns Options to pass to the session PVC helpers.
 */
function sessionPvcOptionsForRequest(
  serverDefaults: { kubectlHandler?: import("./session-pvc.js").KubectlHandler; kubectlCommand?: string; storageClassName?: string; size?: string } | undefined,
  runnerJobDefaults: { kubectlCommand?: string } | undefined,
): SessionPvcOptions {
  const kubectlHandler = serverDefaults?.kubectlHandler;
  if (!serverDefaults || !kubectlHandler) return pvcOptions(runnerJobDefaults);
  const { kubectlCommand, storageClassName, size } = serverDefaults;
  return {
    kubectlHandler,
    ...(kubectlCommand ? { kubectlCommand } : {}),
    ...(storageClassName ? { storageClassName } : {}),
    ...(size ? { size } : {}),
  };
}

/**
 * Resolves the effective runner-job defaults for a request.
 *
 * Each field is taken from `defaults` first, then from the matching
 * `AGENTRUN_*` environment variable, then from a built-in fallback where one
 * exists. Optional fields are only present in the result when a value resolved.
 *
 * @param defaults - Explicit runner-job defaults from the server options.
 * @param sourceCommit - Source commit identifier copied into the result.
 * @returns The resolved runner-job defaults.
 */
function runnerJobDefaultsForRequest(defaults: ManagerServerOptions["runnerJobDefaults"], sourceCommit: string): RunnerJobDefaults {
  const env = process.env;
  const namespace = defaults?.namespace ?? env.AGENTRUN_RUNTIME_NAMESPACE ?? "agentrun-v01";
  const serviceAccountName = defaults?.serviceAccountName ?? env.AGENTRUN_RUNNER_SERVICE_ACCOUNT ?? "agentrun-v01-runner";
  // The job name prefix falls back to the service account name.
  const jobNamePrefix = defaults?.jobNamePrefix ?? env.AGENTRUN_RUNNER_JOB_NAME_PREFIX ?? serviceAccountName;
  const lane = defaults?.lane ?? env.AGENTRUN_LANE ?? "v0.1";
  const retention = runnerRetentionOptionsForRequest(defaults?.retention, namespace, jobNamePrefix, defaults?.kubectlCommand);

  const managerUrl = defaults?.managerUrl ?? env.AGENTRUN_INTERNAL_MGR_URL ?? `http://agentrun-mgr.${namespace}.svc.cluster.local:8080`;
  const image = defaults?.image ?? env.AGENTRUN_RUNNER_IMAGE ?? "";
  // NOTE(review): the optional*Record helpers are defined elsewhere; presumably they
  // yield `{ [key]: value }` when a value is present and `{}` otherwise.
  const bootRepoUrl = optionalStringRecord("bootRepoUrl", defaults?.bootRepoUrl ?? env.AGENTRUN_BOOT_REPO_URL);
  const envIdentity = optionalStringRecord("envIdentity", defaults?.envIdentity ?? env.AGENTRUN_ENV_IDENTITY);
  const artifactCatalogFile = optionalStringRecord("artifactCatalogFile", defaults?.artifactCatalogFile ?? env.AGENTRUN_ARTIFACT_CATALOG_FILE);
  // An explicit idle timeout wins; the environment variable is only consulted without one.
  const runnerIdleTimeout = defaults?.runnerIdleTimeoutMs !== undefined
    ? { runnerIdleTimeoutMs: defaults.runnerIdleTimeoutMs }
    : optionalPositiveIntegerRecord("runnerIdleTimeoutMs", env.AGENTRUN_RUNNER_IDLE_TIMEOUT_MS);

  return {
    namespace,
    managerUrl,
    image,
    ...bootRepoUrl,
    sourceCommit,
    ...envIdentity,
    ...artifactCatalogFile,
    serviceAccountName,
    jobNamePrefix,
    lane,
    ...runnerIdleTimeout,
    ...(defaults?.kubectlCommand ? { kubectlCommand: defaults.kubectlCommand } : {}),
    ...(defaults?.unideskSshEndpointEnv ? { unideskSshEndpointEnv: defaults.unideskSshEndpointEnv } : {}),
    ...(retention ? { retention } : {}),
  };
}

/**
 * Resolves runner retention options, either from explicit defaults or from the
 * `AGENTRUN_RUNNER_RETENTION_*` environment variables.
 *
 * @param defaults - Explicit retention options; when given they are returned with
 *   `namespace` (and `kubectlCommand`, if non-empty) overridden.
 * @param namespace - Namespace written into the result.
 * @param jobNamePrefix - Used as the only job name prefix when the environment lists none.
 * @param kubectlCommand - Optional kubectl command copied into the result.
 * @returns The retention options, or `undefined` when no defaults are given and
 *   `AGENTRUN_RUNNER_RETENTION_MAX_RUNNERS` does not resolve to a value.
 * @throws AgentRunError (`schema-invalid`, HTTP 500) when the configured cleanup order
 *   is not `oldest-inactive-last-active-first`.
 */
function runnerRetentionOptionsForRequest(defaults: RunnerRetentionOptions | undefined, namespace: string, jobNamePrefix: string, kubectlCommand?: string): RunnerRetentionOptions | undefined {
  const env = process.env;
  const kubectlOverride = kubectlCommand ? { kubectlCommand } : {};
  if (defaults) return { ...defaults, namespace, ...kubectlOverride };

  const maxRunners = optionalPositiveInteger("AGENTRUN_RUNNER_RETENTION_MAX_RUNNERS", env.AGENTRUN_RUNNER_RETENTION_MAX_RUNNERS);
  if (maxRunners === undefined) return undefined;

  const activeHeartbeatMaxAgeMs = requiredPositiveInteger("AGENTRUN_RUNNER_RETENTION_ACTIVE_HEARTBEAT_MAX_AGE_MS", env.AGENTRUN_RUNNER_RETENTION_ACTIVE_HEARTBEAT_MAX_AGE_MS);
  const cleanupOrder = env.AGENTRUN_RUNNER_RETENTION_CLEANUP_ORDER ?? "oldest-inactive-last-active-first";
  if (cleanupOrder !== "oldest-inactive-last-active-first") {
    throw new AgentRunError("schema-invalid", "AGENTRUN_RUNNER_RETENTION_CLEANUP_ORDER is unsupported", { httpStatus: 500 });
  }

  const configuredPrefixes = stringListEnv("AGENTRUN_RUNNER_RETENTION_JOB_NAME_PREFIXES", env.AGENTRUN_RUNNER_RETENTION_JOB_NAME_PREFIXES);
  const matchLabels = jsonRecordEnv("AGENTRUN_RUNNER_RETENTION_MATCH_LABELS_JSON", env.AGENTRUN_RUNNER_RETENTION_MATCH_LABELS_JSON);
  const ageCleanupEnabled = booleanEnv("AGENTRUN_RUNNER_RETENTION_AGE_BASED_CLEANUP_ENABLED", env.AGENTRUN_RUNNER_RETENTION_AGE_BASED_CLEANUP_ENABLED ?? "false");
  const ageCleanupMaxHours = optionalPositiveInteger("AGENTRUN_RUNNER_RETENTION_AGE_BASED_MAX_AGE_HOURS", env.AGENTRUN_RUNNER_RETENTION_AGE_BASED_MAX_AGE_HOURS);

  return {
    namespace,
    maxRunners,
    cleanupOrder,
    activeHeartbeatMaxAgeMs,
    matchLabels,
    jobNamePrefixes: configuredPrefixes.length > 0 ? configuredPrefixes : [jobNamePrefix],
    ageBasedCleanup: {
      enabled: ageCleanupEnabled,
      maxAgeHours: ageCleanupMaxHours ?? null,
    },
    ...kubectlOverride,
  };
}

/** Options accepted by `startManagerServer`; every field is optional. */
export interface ManagerServerOptions {
  /** Store to use; when omitted the server opens one via `openAgentRunStoreFromEnv()`. */
  store?: AgentRunStore;
  /** Port to listen on; the server passes `0` to `listen` when omitted. */
  port?: number;
  /** Host to bind; defaults to `127.0.0.1`. */
  host?: string;
  /** Source commit reported by the server; falls back to `AGENTRUN_SOURCE_COMMIT`, then `"unknown"`. */
  sourceCommit?: string;
  /** Explicit runner-job defaults; unset fields are resolved by `runnerJobDefaultsForRequest`. */
  runnerJobDefaults?: {
    namespace?: string;
    managerUrl?: string;
    image?: string;
    bootRepoUrl?: string;
    envIdentity?: string;
    artifactCatalogFile?: string;
    serviceAccountName?: string;
    jobNamePrefix?: string;
    lane?: string;
    runnerIdleTimeoutMs?: number;
    kubectlCommand?: string;
    unideskSshEndpointEnv?: JsonRecord;
    retention?: RunnerRetentionOptions;
  };
  /** Server-level session PVC defaults; see `sessionPvcOptionsForRequest`. */
  sessionPvcOptions?: { kubectlHandler?: import("./session-pvc.js").KubectlHandler; kubectlCommand?: string; storageClassName?: string; size?: string };
  /** Defaults forwarded to the provider-profile helpers. */
  providerProfileOptions?: { namespace?: string; kubectlCommand?: string };
  /** Defaults forwarded to the tool-credential helpers. */
  toolCredentialOptions?: { namespace?: string; kubectlCommand?: string };
  /** Directory for aipod specs; falls back to `AGENTRUN_AIPOD_SPEC_DIR`. */
  aipodSpecDir?: string;
  /** Auth configuration; falls back to `managerServerAuthConfigFromEnv()`. */
  auth?: ManagerAuthConfig;
}

/** Handle returned by `startManagerServer`. */
export interface StartedManagerServer {
  /** The listening HTTP server. */
  server: Server;
  /** Base URL built from the bound address and port. */
  baseUrl: string;
  /** The store the server is using. */
  store: AgentRunStore;
}

/**
 * Starts the manager HTTP server and resolves once it is listening.
 *
 * @param options - Server options; defaults to `{}`.
 */
export async function startManagerServer(options:
ManagerServerOptions = {}): Promise<StartedManagerServer> {
  const store = options.store ?? await openAgentRunStoreFromEnv();
  const sourceCommit = options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown";
  const runnerJobDefaults = options.runnerJobDefaults;
  const sessionPvcDefaults = options.sessionPvcOptions;
  const providerProfileDefaults = options.providerProfileOptions;
  const toolCredentialDefaults = options.toolCredentialOptions;
  const aipodSpecDir = options.aipodSpecDir ?? process.env.AGENTRUN_AIPOD_SPEC_DIR;
  const auth = options.auth ?? managerServerAuthConfigFromEnv();
  const authSummary = managerAuthSummary(auth);

  const server = createServer(async (req, res) => {
    // Per-request trace id, echoed in both success and error envelopes.
    const traceId = `trc_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`;
    try {
      const method = req.method ?? "GET";
      const url = new URL(req.url ?? "/", "http://agentrun.local");
      // Authorization is checked before the request body is read.
      assertManagerRequestAuthorized(req, url.pathname, auth);
      const data = await route({
        method,
        url,
        body: await readBody(req),
        store,
        sourceCommit,
        authSummary,
        // Optional defaults are only passed through when they are set.
        ...(runnerJobDefaults ? { runnerJobDefaults } : {}),
        ...(sessionPvcDefaults ? { sessionPvcDefaults } : {}),
        ...(providerProfileDefaults ? { providerProfileDefaults } : {}),
        ...(toolCredentialDefaults ? { toolCredentialDefaults } : {}),
        ...(aipodSpecDir ? { aipodSpecDir } : {}),
      });
      writeJson(res, 200, { ok: true, data, traceId });
    } catch (error) {
      const agentError = normalizeError(error);
      writeJson(res, agentError.httpStatus, { ok: false, failureKind: agentError.failureKind, message: agentError.message, traceId, error: errorToJson(error) });
    }
  });

  // Reject when binding fails (e.g. EADDRINUSE) instead of leaving the promise
  // pending with an unhandled 'error' event on the server.
  await new Promise<void>((resolve, reject) => {
    const onListenError = (error: Error): void => reject(error);
    server.once("error", onListenError);
    server.listen(options.port ?? 0, options.host ?? "127.0.0.1", () => {
      server.off("error", onListenError);
      resolve();
    });
  });

  const address = server.address() as AddressInfo;
  // IPv6 literals must be bracketed to form a valid URL authority.
  const urlHost = address.address.includes(":") ? `[${address.address}]` : address.address;
  return { server, baseUrl: `http://${urlHost}:${address.port}`, store };
}

/**
 * Reads and JSON-parses the request body.
 *
 * @param req - Incoming HTTP request.
 * @returns `null` for GET/HEAD requests and for bodies that are empty after trimming;
 *   otherwise the parsed JSON value.
 * @throws SyntaxError when the body is not valid JSON (propagated from `JSON.parse`).
 */
async function readBody(req: import("node:http").IncomingMessage): Promise<unknown> {
  if (req.method === "GET" || req.method === "HEAD") return null;
  const chunks: Buffer[] = [];
  for await (const chunk of req) chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
  const text = Buffer.concat(chunks).toString("utf8").trim();
  if (text.length === 0) return null;
  return JSON.parse(text) as unknown;
}

/**
 * Loads one queue task, refreshes it, and returns the enriched read view.
 *
 * @param store - Store to read from.
 * @param taskId - Identifier of the queue task.
 * @returns The task record enriched by `enrichQueueTaskForRead`.
 */
async function refreshQueueTaskForRead(store: AgentRunStore, taskId: string): Promise<JsonRecord> {
  const task = await store.getQueueTask(taskId);
  const refreshed = await refreshQueueTaskRecordForRead(store, task);
  return await enrichQueueTaskForRead(store, refreshed as unknown as JsonRecord);
}

/**
 * Refreshes every task on one page of the queue listing.
 *
 * @param store - Store to read from.
 * @param input - Listing input selecting the page.
 */
async function refreshQueuePageForRead(store: AgentRunStore, input: ListQueueTasksInput): Promise<void> {
  const page = await store.listQueueTasks(input);
  await refreshQueueTaskRecordsForRead(store, page.items);
}

/**
 * Refreshes tasks listed in state `running` (at most 100), optionally limited to one queue.
 *
 * @param store - Store to read from.
 * @param queue - Optional queue name filter.
 */
async function refreshRunningQueueTasksForRead(store: AgentRunStore, queue?: string): Promise<void> {
  const page = await store.listQueueTasks({
    ...(queue ?
{ queue } : {}),
    state: "running",
    limit: 100,
  });
  await refreshQueueTaskRecordsForRead(store, page.items);
}

/**
 * Refreshes all given queue tasks concurrently.
 *
 * @param store - Store to read from.
 * @param tasks - Tasks to refresh.
 */
async function refreshQueueTaskRecordsForRead(store: AgentRunStore, tasks: QueueTaskRecord[]): Promise<void> {
  await Promise.all(tasks.map((task) => refreshQueueTaskRecordForRead(store, task)));
}

/**
 * Refreshes one queue task via `refreshQueueTaskFromCore`, best effort.
 *
 * @param store - Store to read from.
 * @param task - Task to refresh.
 * @returns The refreshed task, or the input task unchanged when its latest attempt
 *   lacks a run id or command id, or when the refresh throws.
 */
async function refreshQueueTaskRecordForRead(store: AgentRunStore, task: QueueTaskRecord): Promise<QueueTaskRecord> {
  if (!task.latestAttempt?.runId || !task.latestAttempt.commandId) return task;
  try {
    return await refreshQueueTaskFromCore(store, task.id);
  } catch {
    // Deliberate best effort: a failed refresh must not fail the read.
    return task;
  }
}

/**
 * Builds the queue commander snapshot for a reader, with every item enriched.
 *
 * When `readerId` is set, the unfiltered snapshot is also consulted and any task
 * not already present is added if it has an active session.
 *
 * @param store - Store to read from.
 * @param queue - Optional queue name.
 * @param readerId - Reader the snapshot is filtered for, or `null`.
 * @returns The snapshot with `items`, `activeSessionCount` and `valuesPrinted: false`.
 */
async function queueCommanderForRead(store: AgentRunStore, queue: string | undefined, readerId: string | null): Promise<JsonValue> {
  const snapshot = await store.queueCommander(queue, readerId) as unknown as JsonRecord;
  const items = Array.isArray(snapshot.items) ? snapshot.items : [];
  const enrichedItems = await Promise.all(items.map(async (item) => {
    const task = asJsonRecord(item);
    return task ? await enrichQueueTaskForRead(store, task) as JsonValue : item as JsonValue;
  }));

  // Index by task id; items without a string id are not carried into the result.
  const byId = new Map<string, JsonValue>();
  for (const item of enrichedItems) {
    const task = asJsonRecord(item);
    const id = stringJsonValue(task?.id);
    if (id) byId.set(id, item);
  }

  if (readerId) {
    const unfiltered = await store.queueCommander(queue, null) as unknown as JsonRecord;
    const unfilteredItems = Array.isArray(unfiltered.items) ? unfiltered.items : [];
    for (const item of unfilteredItems) {
      const task = asJsonRecord(item);
      const id = stringJsonValue(task?.id);
      if (!task || !id || byId.has(id)) continue;
      const enriched = await enrichQueueTaskForRead(store, task);
      if (queueTaskHasActiveSession(enriched)) byId.set(id, enriched as unknown as JsonValue);
    }
  }

  const visibleItems = Array.from(byId.values());
  const activeSessionCount = visibleItems.filter((item) => queueTaskHasActiveSession(asJsonRecord(item))).length;
  return { ...snapshot, items: visibleItems, activeSessionCount, valuesPrinted: false } as JsonValue;
}

/**
 * Adds active-session and supervisor details to a queue task record.
 *
 * @param store - Store to read from.
 * @param task - Task record to enrich.
 * @returns A copy of `task` with `activeSession` and `supervisor` when available,
 *   `active: true` / `attentionState: "active-session"` when the session is active,
 *   and `valuesPrinted: false`.
 */
async function enrichQueueTaskForRead(store: AgentRunStore, task: JsonRecord): Promise<JsonRecord> {
  const activeSession = await queueTaskActiveSession(store, task);
  const supervisor = await queueTaskSupervisor(store, task, activeSession);
  const activeBySession = activeSession?.active === true
    || stringJsonValue(activeSession?.activeRunId) !== null
    || stringJsonValue(activeSession?.activeCommandId) !== null;
  return {
    ...task,
    ...(activeSession ? { activeSession } : {}),
    ...(supervisor ? { supervisor } : {}),
    ...(activeBySession ? { active: true, attentionState: "active-session" } : {}),
    valuesPrinted: false,
  };
}

/**
 * Builds a compact supervisor summary for a queue task from its run result.
 *
 * The run and command ids come from the active session when present, otherwise
 * from the task's latest attempt.
 *
 * @param store - Store to read from.
 * @param task - Task record.
 * @param activeSession - Active session summary for the task, or `null`.
 * @returns The supervisor summary, `null` when no run id can be determined, or an
 *   `unavailable` / `infra-failed` record when building the run result throws.
 */
async function queueTaskSupervisor(store: AgentRunStore, task: JsonRecord, activeSession: JsonRecord | null = null): Promise<JsonRecord | null> {
  const attempt = asJsonRecord(task.latestAttempt);
  const attemptRunId = stringJsonValue(attempt?.runId);
  const attemptCommandId = stringJsonValue(attempt?.commandId);
  const activeRunId = stringJsonValue(activeSession?.activeRunId);
  const activeCommandId = stringJsonValue(activeSession?.activeCommandId);
  const runId = activeRunId ?? attemptRunId;
  if (!runId) return null;
  const commandId = activeCommandId ?? attemptCommandId ?? undefined;
  try {
    const result = await buildRunResult(store, runId, commandId);
    const liveness = asJsonRecord(result.liveness);
    const lastActivity = asJsonRecord(liveness?.lastActivity ?? liveness?.lastCommandActivity);
    const timeoutBudget = asJsonRecord(liveness?.timeoutBudget);
    const terminalClassification = asJsonRecord(result.terminalClassification ?? liveness?.terminalClassification);
    const lease = asJsonRecord(liveness?.lease);
    return {
      source: activeRunId ? "active-session" : "latest-attempt",
      attemptRunId,
      attemptCommandId,
      activeSession,
      runId: stringJsonValue(result.runId),
      commandId: stringJsonValue(result.commandId),
      executionState: stringJsonValue(activeSession?.executionState),
      activeRunId,
      activeCommandId,
      status: stringJsonValue(result.status),
      terminalStatus: stringJsonValue(result.terminalStatus),
      failureKind: stringJsonValue(result.failureKind),
      diagnosis: asJsonRecord(result.diagnosis),
      terminalClassification: terminalClassification ? compactTerminalClassification(terminalClassification) : null,
      phase: stringJsonValue(liveness?.phase),
      active: liveness?.active === true,
      lastSeq: numberJsonValue(liveness?.lastSeq ?? result.lastSeq),
      lastEventAt: stringJsonValue(liveness?.lastEventAt),
      lastEventAgeMs: numberJsonValue(liveness?.lastEventAgeMs),
      lastActivity: lastActivity ? compactActivity(lastActivity) : null,
      timeoutBudget: timeoutBudget ? compactTimeoutBudget(timeoutBudget) : null,
      lease: lease ? compactLease(lease) : null,
      leaseRemainingMs: numberJsonValue(lease?.leaseRemainingMs),
      leaseExpired: lease?.leaseExpired === true,
      recoveryActions: compactRecoveryActions(liveness?.recoveryActions),
      valuesPrinted: false,
    };
  } catch (error) {
    // A failing run-result lookup is reported in-band rather than failing the read.
    return {
      phase: "unavailable",
      failureKind: "infra-failed",
      message: error instanceof Error ?
error.message : String(error), valuesPrinted: false };
  }
}

/**
 * Looks up the session referenced by a queue task and summarizes it if it is active.
 *
 * A session counts as active when its summary has `active === true`, an active run
 * id, an active command id, or an `executionState` of `"running"`.
 *
 * @param store - Store to read from.
 * @param task - Task record.
 * @returns The compact active-session summary, or `null` when the task references no
 *   session, the session is not active, or the lookup throws.
 */
async function queueTaskActiveSession(store: AgentRunStore, task: JsonRecord): Promise<JsonRecord | null> {
  const sessionId = queueTaskSessionId(task);
  if (!sessionId) return null;
  try {
    const session = await store.getSessionSummary(sessionId, null) as unknown as JsonRecord;
    const activeRunId = stringJsonValue(session.activeRunId);
    const activeCommandId = stringJsonValue(session.activeCommandId);
    const active = session.active === true
      || activeRunId !== null
      || activeCommandId !== null
      || stringJsonValue(session.executionState) === "running";
    if (!active) return null;
    return {
      sessionId,
      // Fall back to the REST path of the session when the summary carries none.
      sessionPath: stringJsonValue(session.sessionPath) ?? `/api/v1/sessions/${sessionId}`,
      executionState: stringJsonValue(session.executionState),
      attentionState: stringJsonValue(session.attentionState),
      active,
      activeRunId,
      activeCommandId,
      lastRunId: stringJsonValue(session.lastRunId),
      lastCommandId: stringJsonValue(session.lastCommandId),
      terminalStatus: stringJsonValue(session.terminalStatus),
      failureKind: stringJsonValue(session.failureKind),
      lastActivityAt: stringJsonValue(session.lastActivityAt),
      updatedAt: stringJsonValue(session.updatedAt),
      valuesPrinted: false,
    };
  } catch {
    // Best effort: an unreadable session is treated the same as no active session.
    return null;
  }
}

/**
 * Extracts the session id a queue task refers to.
 *
 * @param task - Task record.
 * @returns `task.sessionRef.sessionId` if present, else `task.latestAttempt.sessionId`, else `null`.
 */
function queueTaskSessionId(task: JsonRecord): string | null {
  return stringJsonValue(asJsonRecord(task.sessionRef)?.sessionId)
    ?? stringJsonValue(asJsonRecord(task.latestAttempt)?.sessionId)
    ?? null;
}

/**
 * Tells whether an enriched queue task has an active session.
 *
 * @param task - Enriched task record, or `null`.
 * @returns `true` when `activeSession.active` is `true`, or the supervisor's source is
 *   `"active-session"`, or the supervisor reports `active === true`.
 */
function queueTaskHasActiveSession(task: JsonRecord | null): boolean {
  const activeSession = asJsonRecord(task?.activeSession);
  const supervisor = asJsonRecord(task?.supervisor);
  return activeSession?.active === true
    || supervisor?.source === "active-session"
    || supervisor?.active === true;
}

/**
 * Reduces an activity record to a fixed set of fields.
 *
 * @param activity - Activity record to compact.
 * @returns The compact record; `sourceSeq` falls back to `activity.seq`.
 */
function compactActivity(activity: JsonRecord): JsonRecord {
  return {
    sourceSeq: numberJsonValue(activity.sourceSeq ??
activity.seq),
    eventId: stringJsonValue(activity.eventId),
    activityKind: stringJsonValue(activity.activityKind),
    type: stringJsonValue(activity.type),
    status: stringJsonValue(activity.status),
    toolName: stringJsonValue(activity.toolName),
    itemId: stringJsonValue(activity.itemId),
    ageMs: numberJsonValue(activity.ageMs),
    // The summary text is passed through boundedJsonString with a limit of 180.
    summary: boundedJsonString(activity.summary, 180),
    valuesPrinted: false,
  };
}

/**
 * Reduces a timeout-budget record to a fixed set of string and number fields.
 *
 * @param budget - Timeout budget record to compact.
 * @returns The compact record, always with `valuesPrinted: false`.
 */
function compactTimeoutBudget(budget: JsonRecord): JsonRecord {
  const text = (key: string) => stringJsonValue(budget[key]);
  const count = (key: string) => numberJsonValue(budget[key]);
  return {
    state: text("state"),
    timeoutKind: text("timeoutKind"),
    timeoutMs: count("timeoutMs"),
    elapsedMs: count("elapsedMs"),
    idleElapsedMs: count("idleElapsedMs"),
    remainingMs: count("remainingMs"),
    startedAt: text("startedAt"),
    idleStartedAt: text("idleStartedAt"),
    lastActivityAt: text("lastActivityAt"),
    lastActivitySeq: count("lastActivitySeq"),
    commandElapsedMs: count("commandElapsedMs"),
    runElapsedMs: count("runElapsedMs"),
    source: text("source"),
    valuesPrinted: false,
  };
}

/**
 * Reduces a lease record to its claim holder, expiry, and remaining time.
 *
 * @param lease - Lease record to compact.
 * @returns The compact record; `leaseExpired` is `true` only when the input flag is exactly `true`.
 */
function compactLease(lease: JsonRecord): JsonRecord {
  const { claimedBy, leaseExpiresAt, leaseExpired, leaseRemainingMs } = lease;
  return {
    claimedBy: stringJsonValue(claimedBy),
    leaseExpiresAt: stringJsonValue(leaseExpiresAt),
    leaseExpired: leaseExpired === true,
    leaseRemainingMs: numberJsonValue(leaseRemainingMs),
    valuesPrinted: false,
  };
}

/**
 * Reduces a terminal-classification record to a fixed set of fields.
 *
 * @param record - Terminal classification record to compact.
 * @returns The compact record; boolean fields are `true` only when the input is exactly `true`.
 */
function compactTerminalClassification(record: JsonRecord): JsonRecord {
  return {
    category: stringJsonValue(record.category),
    confidence: stringJsonValue(record.confidence),
    providerEvidence: stringJsonValue(record.providerEvidence),
    providerInterruption: stringJsonValue(record.providerInterruption),
    providerInterruptionKnown: record.providerInterruptionKnown === true,
    providerInterruptionReason: boundedJsonString(record.providerInterruptionReason, 240),
    retryInterruptionObserved: record.retryInterruptionObserved === true,
    retryInterruptionSeq:
numberJsonValue(record.retryInterruptionSeq),
    retryInterruptionKind: stringJsonValue(record.retryInterruptionKind),
    hardTimeout: record.hardTimeout === true,
    idleTimeout: record.idleTimeout === true,
    timeoutKind: stringJsonValue(record.timeoutKind),
    transportDisconnectObserved: record.transportDisconnectObserved === true,
    transportDisconnectSeq: numberJsonValue(record.transportDisconnectSeq),
    reason: boundedJsonString(record.reason, 240),
    valuesPrinted: false,
  };
}

/**
 * Compacts a list of recovery actions, keeping at most the first five.
 *
 * @param value - Candidate list of recovery actions.
 * @returns `[]` when `value` is not an array; otherwise one compact record per kept
 *   entry, with `{ action: "unknown" }` standing in for entries that are not records.
 */
function compactRecoveryActions(value: JsonValue | undefined): JsonValue[] {
  if (!Array.isArray(value)) return [];

  const compactOne = (item: JsonValue): JsonValue => {
    const entry = asJsonRecord(item);
    if (!entry) return { action: "unknown", valuesPrinted: false };
    return {
      action: stringJsonValue(entry.action),
      operation: stringJsonValue(entry.operation),
      resourceKind: stringJsonValue(entry.resourceKind),
      resourceName: stringJsonValue(entry.resourceName),
      reason: stringJsonValue(entry.reason),
      reasonHint: boundedJsonString(entry.reasonHint, 220),
      reasonRequired: entry.reasonRequired === true,
      inputKind: stringJsonValue(entry.inputKind),
      runId: stringJsonValue(entry.runId),
      commandId: stringJsonValue(entry.commandId),
      runnerJobId: stringJsonValue(entry.runnerJobId),
      sessionId: stringJsonValue(entry.sessionId),
      afterSeq: numberJsonValue(entry.afterSeq),
      limit: numberJsonValue(entry.limit),
      failureMessage: boundedJsonString(entry.failureMessage, 220),
      valuesPrinted: false,
    };
  };

  const firstFive = value.slice(0, 5);
  return firstFive.map((item) => compactOne(item));
}

async function route({ method, url, body, store, sourceCommit, authSummary, runnerJobDefaults, sessionPvcDefaults, providerProfileDefaults, toolCredentialDefaults, aipodSpecDir }: { method: string; url: URL; body: unknown; store: AgentRunStore; sourceCommit: string; authSummary?: JsonRecord; runnerJobDefaults?: NonNullable; sessionPvcDefaults?: NonNullable; providerProfileDefaults?: NonNullable; toolCredentialDefaults?: NonNullable; aipodSpecDir?: string }): Promise { const path = url.pathname; if (method === "GET" && (path
=== "/health" || path === "/health/live" || path === "/health/readiness")) { const database = await store.health(); const ready = path === "/health/live" ? true : database.ready; return { serviceId: "agentrun-mgr", live: true, ready, database, sourceCommit, auth: authSummary ?? null, runnerWorkReady: staticWorkReadyCapabilitySummary(), secretRefs: { databaseUrl: database.adapter === "postgres" ? "redacted" : "not-used", valuesPrinted: false } }; } if (method === "GET" && path === "/api/v1/backends") return await listBackendCapabilities(providerProfileDefaults) as JsonValue; if (method === "GET" && path === "/api/v1/tool-credentials") return await listToolCredentials(toolCredentialDefaults) as JsonValue; const toolCredentialMatch = path.match(/^\/api\/v1\/tool-credentials\/([^/]+)$/u); if (method === "GET" && toolCredentialMatch) return await showToolCredential(decodeURIComponent(toolCredentialMatch[1] ?? ""), toolCredentialDefaults) as JsonValue; const toolCredentialGithubSshMatch = path.match(/^\/api\/v1\/tool-credentials\/github-ssh\/credential$/u); if (method === "PUT" && toolCredentialGithubSshMatch) return await setGithubSshToolCredential(body ?? {}, toolCredentialDefaults) as JsonValue; if (method === "GET" && path === "/api/v1/aipod-specs") return await listAipodSpecs(aipodSpecDir) as JsonValue; if (method === "POST" && path === "/api/v1/aipod-specs") return await applyAipodSpec(body ?? {}, aipodSpecDir) as JsonValue; const aipodSpecMatch = path.match(/^\/api\/v1\/aipod-specs\/([^/]+)$/u); if (method === "GET" && aipodSpecMatch) return await showAipodSpec(decodeURIComponent(aipodSpecMatch[1] ?? ""), aipodSpecDir) as JsonValue; if (method === "PUT" && aipodSpecMatch) return await applyNamedAipodSpec(decodeURIComponent(aipodSpecMatch[1] ?? ""), body, aipodSpecDir) as JsonValue; if (method === "DELETE" && aipodSpecMatch) return await deleteAipodSpec(decodeURIComponent(aipodSpecMatch[1] ?? 
""), aipodSpecDir) as JsonValue; const aipodSpecRenderMatch = path.match(/^\/api\/v1\/aipod-specs\/([^/]+)\/render$/u); if (method === "POST" && aipodSpecRenderMatch) return await renderAipodSpecByName(decodeURIComponent(aipodSpecRenderMatch[1] ?? ""), asRecord(body ?? {}, "aipodSpecRender"), aipodSpecDir) as JsonValue; if (method === "GET" && path === "/api/v1/provider-profiles") return await listProviderProfiles(providerProfileDefaults) as JsonValue; const providerProfileMatch = path.match(/^\/api\/v1\/provider-profiles\/([^/]+)$/u); if (method === "GET" && providerProfileMatch) return await showProviderProfile(providerProfileMatch[1] ?? "", providerProfileDefaults) as JsonValue; if (method === "DELETE" && providerProfileMatch) return await removeProviderProfile(providerProfileMatch[1] ?? "", providerProfileDefaults) as JsonValue; const providerConfigMatch = path.match(/^\/api\/v1\/provider-profiles\/([^/]+)\/config$/u); if (method === "GET" && providerConfigMatch) return await getProviderProfileConfig(providerConfigMatch[1] ?? "", providerProfileDefaults) as JsonValue; if (method === "PUT" && providerConfigMatch) return await setProviderProfileConfig(providerConfigMatch[1] ?? "", body, providerProfileDefaults) as JsonValue; const providerCredentialMatch = path.match(/^\/api\/v1\/provider-profiles\/([^/]+)\/credential$/u); if (method === "PUT" && providerCredentialMatch) return await setProviderProfileCredential(providerCredentialMatch[1] ?? "", body, providerProfileDefaults) as JsonValue; const providerValidationCreateMatch = path.match(/^\/api\/v1\/provider-profiles\/([^/]+)\/validate$/u); if (method === "POST" && providerValidationCreateMatch) return await validateProviderProfile(providerValidationCreateMatch[1] ?? "", body, { store, sourceCommit, ...(providerProfileDefaults ?? {}), ...(runnerJobDefaults ? 
{ runnerJobDefaults } : {}) }) as JsonValue; const providerValidationShowMatch = path.match(/^\/api\/v1\/provider-profiles\/([^/]+)\/validations\/([^/]+)$/u); if (method === "GET" && providerValidationShowMatch) return await getProviderProfileValidation(providerValidationShowMatch[1] ?? "", providerValidationShowMatch[2] ?? "", { store }) as JsonValue; if (method === "GET" && path === "/api/v1/sessions") { const input: ListSessionsInput = { limit: integerQuery(url, "limit", 50) }; const state = url.searchParams.get("state"); const backendProfile = url.searchParams.get("backendProfile") ?? url.searchParams.get("profile"); const readerId = url.searchParams.get("readerId"); const cursor = url.searchParams.get("cursor"); if (state) input.state = validateSessionListState(state); if (backendProfile) input.backendProfile = validateBackendProfile(backendProfile); if (readerId) input.readerId = readerId; if (cursor) input.cursor = cursor; return await store.listSessions(input) as unknown as JsonValue; } const sessionMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)$/u); if (method === "GET" && sessionMatch) { const summary = await store.getSessionSummary(sessionMatch[1] ?? "", url.searchParams.get("readerId")); const runId = summary.activeRunId ?? summary.lastRunId; if (!runId) return summary as unknown as JsonValue; const commandId = summary.activeCommandId ?? summary.lastCommandId ?? undefined; try { const result = await buildRunResult(store, runId, commandId); const liveness = asJsonRecord(result.liveness); const lastActivity = asJsonRecord(liveness?.lastActivity ?? liveness?.lastCommandActivity); const timeoutBudget = asJsonRecord(liveness?.timeoutBudget); const terminalClassification = asJsonRecord(result.terminalClassification ?? liveness?.terminalClassification); return { ...summary, liveness: result.liveness ?? 
null, supervisor: { sessionId: summary.sessionId, executionState: summary.executionState, attentionState: summary.attentionState, active: summary.active, activeRunId: summary.activeRunId, activeCommandId: summary.activeCommandId, lastRunId: summary.lastRunId, lastCommandId: summary.lastCommandId, runId: result.runId, commandId: result.commandId, status: result.status, terminalStatus: result.terminalStatus, failureKind: result.failureKind, terminalClassification: terminalClassification ? compactTerminalClassification(terminalClassification) : null, phase: stringJsonValue(liveness?.phase), lastEventAgeMs: numberJsonValue(liveness?.lastEventAgeMs), lastActivity: lastActivity ? compactActivity(lastActivity) : null, timeoutBudget: timeoutBudget ? compactTimeoutBudget(timeoutBudget) : null, lastSeq: result.lastSeq, liveness: result.liveness ?? null, recoveryActions: compactRecoveryActions(liveness?.recoveryActions), ...(result.steerDelivery ? { steerDelivery: result.steerDelivery } : {}), valuesPrinted: false, }, } as unknown as JsonValue; } catch (error) { return { ...summary, liveness: { phase: "unavailable", active: summary.active, failureKind: "infra-failed", message: error instanceof Error ? error.message : String(error), valuesPrinted: false, }, } as unknown as JsonValue; } } const sessionTraceMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/trace$/u); if (method === "GET" && sessionTraceMatch) { const input: SessionEventPageInput = { afterSeq: integerQuery(url, "afterSeq", 0), limit: integerQuery(url, "limit", 100) }; const runId = url.searchParams.get("runId"); if (runId) input.runId = runId; return await store.listSessionTrace(sessionTraceMatch[1] ?? 
"", input) as unknown as JsonValue; } const sessionOutputMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/output$/u); if (method === "GET" && sessionOutputMatch) { const input: SessionEventPageInput = { afterSeq: integerQuery(url, "afterSeq", 0), limit: integerQuery(url, "limit", 100) }; const runId = url.searchParams.get("runId"); if (runId) input.runId = runId; return await store.listSessionOutput(sessionOutputMatch[1] ?? "", input) as unknown as JsonValue; } const sessionSendMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/send$/u); if (method === "POST" && sessionSendMatch) { return await sendToSession({ store, sessionId: sessionSendMatch[1] ?? "", body, sourceCommit, runnerJobDefaults }) as JsonValue; } const sessionReadMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/read$/u); if (method === "POST" && sessionReadMatch) { const record = body === null ? {} : asRecord(body, "read"); const readerId = typeof record.readerId === "string" && record.readerId.trim().length > 0 ? record.readerId.trim() : "cli"; return await store.markSessionRead(sessionReadMatch[1] ?? "", readerId) as unknown as JsonValue; } const sessionControlMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/control$/u); if (method === "POST" && sessionControlMatch) { const record = asRecord(body ?? {}, "sessionControl"); const action = typeof record.action === "string" ? record.action : ""; const session = await store.getSessionSummary(sessionControlMatch[1] ?? "", typeof record.readerId === "string" ? record.readerId : null); if (action === "read") return await store.markSessionRead(session.sessionId, typeof record.readerId === "string" && record.readerId.trim().length > 0 ? record.readerId.trim() : "cli") as unknown as JsonValue; if (action === "cancel") { const reason = typeof record.reason === "string" && record.reason.trim().length > 0 ? 
record.reason.trim() : undefined; if (session.activeCommandId) return await store.cancelCommand(session.activeCommandId, reason) as unknown as JsonValue; if (session.activeRunId) return await store.cancelRun(session.activeRunId, reason) as unknown as JsonValue; throw new AgentRunError("schema-invalid", `session ${session.sessionId} has no active run or command`, { httpStatus: 409 }); } throw new AgentRunError("schema-invalid", `session control action ${action} is not supported`, { httpStatus: 400 }); } const sessionStorageMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/storage$/u); if (method === "GET" && sessionStorageMatch) { const summary = await getSessionPvcSummary({ store, sessionId: sessionStorageMatch[1] ?? "", options: sessionPvcOptionsForRequest(sessionPvcDefaults, runnerJobDefaults) }); return summary as unknown as JsonValue; } if (method === "DELETE" && sessionStorageMatch) { return await deleteSessionPvc({ store, sessionId: sessionStorageMatch[1] ?? "", options: sessionPvcOptionsForRequest(sessionPvcDefaults, runnerJobDefaults) }) as unknown as JsonValue; } const sessionStorageRefreshMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/storage\/refresh$/u); if (method === "POST" && sessionStorageRefreshMatch) { const record = asRecord(body ?? {}, "sessionStorageRefresh"); const summary: SessionPvcSummary = { pvcName: stringField(record, "pvcName"), namespace: stringField(record, "namespace"), pvcPhase: typeof record.pvcPhase === "string" ? record.pvcPhase : null, storageSizeBytes: typeof record.storageSizeBytes === "number" ? record.storageSizeBytes : null, storageFilesCount: typeof record.storageFilesCount === "number" ? record.storageFilesCount : null, storageSha256: typeof record.storageSha256 === "string" ? record.storageSha256 : null, storageUpdatedAt: typeof record.storageUpdatedAt === "string" ? 
record.storageUpdatedAt : new Date().toISOString(), codexRolloutSubdir: typeof record.codexRolloutSubdir === "string" && record.codexRolloutSubdir.length > 0 ? record.codexRolloutSubdir : "sessions", valuesPrinted: false, }; const refreshed = await refreshSessionPvcSummary({ store, sessionId: sessionStorageRefreshMatch[1] ?? "", summary, options: sessionPvcOptionsForRequest(sessionPvcDefaults, runnerJobDefaults) }); return { action: "session-storage-refreshed", sessionId: refreshed.sessionId, summary: refreshed } as unknown as JsonValue; } if (method === "POST" && path === "/api/v1/sessions/storage/gc") { const cycle = await runSessionStorageGc({ store, options: sessionPvcOptionsForRequest(sessionPvcDefaults, runnerJobDefaults) }); return cycle as unknown as JsonValue; } if (method === "POST" && path === "/api/v1/sessions") { const record = asRecord(body ?? {}, "sessionCreate"); const sessionId = typeof record.sessionId === "string" && record.sessionId.trim().length > 0 ? record.sessionId.trim() : `sess_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`; const tenantId = stringField(record, "tenantId"); const projectId = stringField(record, "projectId"); const backendProfileRaw = typeof record.backendProfile === "string" ? record.backendProfile : "codex"; if (!isBackendProfile(backendProfileRaw)) throw new AgentRunError("schema-invalid", `backendProfile ${backendProfileRaw} is not supported`, { httpStatus: 400 }); const conversationId = typeof record.conversationId === "string" ? record.conversationId : null; const codexRolloutSubdir = typeof record.codexRolloutSubdir === "string" && record.codexRolloutSubdir.length > 0 ? record.codexRolloutSubdir : "sessions"; const expiresAt = typeof record.expiresAt === "string" ? 
record.expiresAt : new Date(Date.now() + 30 * 24 * 60 * 60 * 1000).toISOString(); const existing = await store.getSession(sessionId); if (existing) { if (existing.storageKind === "evicted") throw new AgentRunError("session-store-evicted", `session ${sessionId} storage has been evicted; create a new sessionId`, { httpStatus: 409 }); if (existing.storageKind === "none" || !existing.storagePvcName) { const recovered = await createSessionPvc({ store, sessionId, options: { ...sessionPvcOptionsForRequest(sessionPvcDefaults, runnerJobDefaults), defaultCodexRolloutSubdir: codexRolloutSubdir } }); return { action: "session-storage-recovered", session: existing, pvc: recovered, valuesPrinted: false } as unknown as JsonValue; } return { action: "session-exists", session: existing, pvcName: existing.storagePvcName ?? null, pvcPhase: existing.storagePvcPhase ?? null, codexRolloutSubdir: existing.codexRolloutSubdir ?? "sessions", valuesPrinted: false } as unknown as JsonValue; } const now = new Date().toISOString(); const session = await store.upsertSession({ sessionId, tenantId, projectId, backendProfile: backendProfileRaw as never, conversationId, threadId: null, metadata: typeof record.metadata === "object" && record.metadata !== null && !Array.isArray(record.metadata) ? 
record.metadata as Record : {}, expiresAt, codexRolloutSubdir, }); const pvc = await createSessionPvc({ store, sessionId, options: { ...sessionPvcOptionsForRequest(sessionPvcDefaults, runnerJobDefaults), defaultCodexRolloutSubdir: codexRolloutSubdir } }); return { action: "session-created", session, pvc, valuesPrinted: false } as unknown as JsonValue; } if (method === "POST" && path === "/api/v1/queue/tasks") return await store.createQueueTask(validateCreateQueueTask(body)) as unknown as JsonValue; if (method === "GET" && path === "/api/v1/queue/tasks") { const state = url.searchParams.get("state"); const listInput: ListQueueTasksInput = { updatedAfter: integerQuery(url, "updatedAfter", 0), limit: integerQuery(url, "limit", 50) }; const queue = url.searchParams.get("queue"); const cursor = url.searchParams.get("cursor"); if (queue) listInput.queue = queue; if (state) listInput.state = validateQueueTaskState(state); if (cursor) listInput.cursor = cursor; await refreshQueuePageForRead(store, listInput); return await store.listQueueTasks(listInput) as unknown as JsonValue; } const queueTaskMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)$/u); if (method === "GET" && queueTaskMatch) return await refreshQueueTaskForRead(store, queueTaskMatch[1] ?? "") as unknown as JsonValue; const queueTaskDispatchMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/dispatch$/u); if (method === "POST" && queueTaskDispatchMatch) { return await dispatchQueueTask({ store, taskId: queueTaskDispatchMatch[1] ?? "", input: asRecord(body ?? {}, "queueDispatch"), defaults: runnerJobDefaultsForRequest(runnerJobDefaults, sourceCommit), }) as unknown as JsonValue; } const queueTaskRefreshMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/refresh$/u); if (method === "POST" && queueTaskRefreshMatch) return await refreshQueueTaskFromCore(store, queueTaskRefreshMatch[1] ?? 
"") as unknown as JsonValue; const queueTaskCancelMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/cancel$/u); if (method === "POST" && queueTaskCancelMatch) { const record = body === null ? {} : asRecord(body, "cancel"); const reason = typeof record.reason === "string" && record.reason.trim().length > 0 ? record.reason.trim() : undefined; return await store.cancelQueueTask(queueTaskCancelMatch[1] ?? "", reason) as unknown as JsonValue; } const queueTaskReadMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/read$/u); if (method === "POST" && queueTaskReadMatch) { const record = body === null ? {} : asRecord(body, "read"); const readerId = typeof record.readerId === "string" && record.readerId.trim().length > 0 ? record.readerId.trim() : "cli"; return await store.markQueueTaskRead(queueTaskReadMatch[1] ?? "", readerId) as unknown as JsonValue; } if (method === "GET" && path === "/api/v1/queue/stats") { const queue = url.searchParams.get("queue") ?? undefined; await refreshRunningQueueTasksForRead(store, queue); return await store.queueStats(queue) as unknown as JsonValue; } if (method === "GET" && path === "/api/v1/queue/commander") { const queue = url.searchParams.get("queue") ?? undefined; await refreshRunningQueueTasksForRead(store, queue); return await queueCommanderForRead(store, queue, url.searchParams.get("readerId")); } if (method === "POST" && path === "/api/v1/runs") { const startedAt = Date.now(); const run = await store.createRun(validateCreateRun(body)); void emitAgentRunOtelSpan("run_created", run, process.env, { startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/runs", "http.status_code": 200, backendProfile: run.backendProfile, providerId: run.providerId } }); return run as unknown as JsonValue; } const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u); if (method === "GET" && runMatch) return await store.getRun(runMatch[1] ?? 
"") as unknown as JsonValue; const eventMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u); if (method === "GET" && eventMatch) { const startedAt = Date.now(); const runId = eventMatch[1] ?? ""; const afterSeq = integerQuery(url, "afterSeq", 0); const limit = integerQuery(url, "limit", 100); const run = await store.getRun(runId); const items = await store.listEvents(runId, afterSeq, limit); void emitAgentRunOtelSpan("projection_sync", run, process.env, { startTimeMs: startedAt, kind: 2, attributes: { "http.method": "GET", "http.route": "/api/v1/runs/:runId/events", "http.status_code": 200, afterSeq, limit, eventCount: items.length } }); return { items: items as unknown as JsonValue }; } const runResultMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/result$/u); if (method === "GET" && runResultMatch) { const startedAt = Date.now(); const runId = runResultMatch[1] ?? ""; const run = await store.getRun(runId); const commandId = url.searchParams.get("commandId") ?? undefined; const command = commandId ? await store.getCommand(commandId) : null; const result = await buildRunResult(store, runId, commandId) as JsonValue; void emitAgentRunOtelSpan("command_result", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "GET", "http.route": "/api/v1/runs/:runId/result", "http.status_code": 200, terminalStatus: typeof result === "object" && result !== null && !Array.isArray(result) ? (result as JsonRecord).terminalStatus ?? null : null } }); return result; } const runCancelMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/cancel$/u); if (method === "POST" && runCancelMatch) { const record = body === null ? {} : asRecord(body, "cancel"); const reason = typeof record.reason === "string" && record.reason.trim().length > 0 ? record.reason.trim() : undefined; return await store.cancelRun(runCancelMatch[1] ?? 
"", reason) as unknown as JsonValue; } const commandCreateMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands$/u); if (method === "POST" && commandCreateMatch) { const startedAt = Date.now(); const runId = commandCreateMatch[1] ?? ""; const command = await store.createCommand(runId, validateCreateCommand(body)); const run = await store.getRun(runId); void emitAgentRunOtelSpan("command_created", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/runs/:runId/commands", "http.status_code": 200, commandType: command.type, commandState: command.state } }); return command as unknown as JsonValue; } if (method === "GET" && commandCreateMatch) return { items: await store.listCommands(commandCreateMatch[1] ?? "", integerQuery(url, "afterSeq", 0), integerQuery(url, "limit", 20)) as unknown as JsonValue }; const runnerJobMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/runner-jobs$/u); if (method === "POST" && runnerJobMatch) { const startedAt = Date.now(); const runId = runnerJobMatch[1] ?? ""; const runnerJob = await createKubernetesRunnerJob({ store, runId, input: asRecord(body ?? {}, "runnerJob") as never, defaults: runnerJobDefaultsForRequest(runnerJobDefaults, sourceCommit), }) as unknown as JsonRecord; const run = await store.getRun(runId); const command = typeof runnerJob.commandId === "string" ? await store.getCommand(runnerJob.commandId) : null; void emitAgentRunOtelSpan("runner_job_created", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/runs/:runId/runner-jobs", "http.status_code": 200, jobName: typeof runnerJob.jobName === "string" ? runnerJob.jobName : null, namespace: typeof runnerJob.namespace === "string" ? runnerJob.namespace : null } }); return runnerJob as unknown as JsonValue; } if (method === "GET" && runnerJobMatch) { const runId = runnerJobMatch[1] ?? 
""; const commandId = url.searchParams.get("commandId") ?? undefined; const jobs = await store.listRunnerJobs(runId, commandId); const events = await store.listEvents(runId, 0, 500); return { items: jobs.map((job) => runnerJobStatusSummary(job, events)), count: jobs.length, lastSeq: events.at(-1)?.seq ?? 0 }; } const runnerJobShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/runner-jobs\/([^/]+)$/u); if (method === "GET" && runnerJobShowMatch) { const runId = runnerJobShowMatch[1] ?? ""; const runnerJobId = runnerJobShowMatch[2] ?? ""; const jobs = await store.listRunnerJobs(runId); const job = jobs.find((item) => item.id === runnerJobId); if (!job) throw new AgentRunError("schema-invalid", `runner job ${runnerJobId} was not found`, { httpStatus: 404 }); return runnerJobStatusSummary(job, await store.listEvents(runId, 0, 500)) as JsonValue; } const commandShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)$/u); if (method === "GET" && commandShowMatch) return await store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue; const commandResultMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)\/result$/u); if (method === "GET" && commandResultMatch) { const startedAt = Date.now(); const runId = commandResultMatch[1] ?? ""; const commandId = commandResultMatch[2] ?? ""; const [run, command] = await Promise.all([store.getRun(runId), store.getCommand(commandId)]); const result = await buildRunResult(store, runId, commandId) as JsonValue; void emitAgentRunOtelSpan("command_result", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "GET", "http.route": "/api/v1/runs/:runId/commands/:commandId/result", "http.status_code": 200, terminalStatus: typeof result === "object" && result !== null && !Array.isArray(result) ? (result as JsonRecord).terminalStatus ?? 
null : null } }); return result; } if (method === "POST" && path === "/api/v1/runners/register") return await store.registerRunner(asRecord(body ?? {}, "runner")) as unknown as JsonValue; const claimMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/claim$/u); if (method === "POST" && claimMatch) { const record = asRecord(body, "claim"); const runnerId = typeof record.runnerId === "string" ? record.runnerId : ""; if (runnerId.length === 0) throw new AgentRunError("schema-invalid", "runnerId is required", { httpStatus: 400 }); return await store.claimRun(claimMatch[1] ?? "", runnerId, numberField(record, "leaseMs", 60_000)) as unknown as JsonValue; } const leaseMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/lease$/u); if (method === "PATCH" && leaseMatch) { const record = asRecord(body, "lease"); const runnerId = typeof record.runnerId === "string" ? record.runnerId : ""; if (runnerId.length === 0) throw new AgentRunError("schema-invalid", "runnerId is required", { httpStatus: 400 }); return await store.heartbeat(leaseMatch[1] ?? "", runnerId, numberField(record, "leaseMs", 60_000)) as unknown as JsonValue; } const eventsAppendMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u); if (method === "POST" && eventsAppendMatch) { const startedAt = Date.now(); const runId = eventsAppendMatch[1] ?? ""; const record = asRecord(body, "event"); const type = typeof record.type === "string" ? record.type as RunEvent["type"] : "backend_status"; const payload = asRecord(record.payload ?? 
{}, "event.payload"); const [run, event] = await Promise.all([ store.getRun(runId), store.appendEvent(runId, type, payload), ]); emitRunEventOtelSpan(type, payload, run, startedAt); return event as unknown as JsonValue; } const statusMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/status$/u); if (method === "PATCH" && statusMatch) { const record = asRecord(body, "status"); const terminalStatus = record.terminalStatus === "completed" || record.terminalStatus === "failed" || record.terminalStatus === "blocked" || record.terminalStatus === "cancelled" ? record.terminalStatus : "failed"; return await store.finishRun(statusMatch[1] ?? "", { terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null, failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null, ...(typeof record.threadId === "string" ? { threadId: record.threadId } : {}), ...(typeof record.turnId === "string" ? { turnId: record.turnId } : {}), }) as unknown as JsonValue; } const ackMatch = path.match(/^\/api\/v1\/commands\/([^/]+)\/ack$/u); if (method === "POST" && ackMatch) return await store.ackCommand(ackMatch[1] ?? "") as unknown as JsonValue; const commandStatusMatch = path.match(/^\/api\/v1\/commands\/([^/]+)\/status$/u); if (method === "PATCH" && commandStatusMatch) { const record = asRecord(body, "commandStatus"); const terminalStatus = record.terminalStatus === "completed" || record.terminalStatus === "failed" || record.terminalStatus === "blocked" || record.terminalStatus === "cancelled" ? record.terminalStatus : "failed"; return await store.finishCommand(commandStatusMatch[1] ?? "", { terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null, failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null, ...(typeof record.threadId === "string" ? { threadId: record.threadId } : {}), ...(typeof record.turnId === "string" ? 
{ turnId: record.turnId } : {}), }) as unknown as JsonValue; } const commandCancelMatch = path.match(/^\/api\/v1\/commands\/([^/]+)\/cancel$/u); if (method === "POST" && commandCancelMatch) { const record = body === null ? {} : asRecord(body, "cancel"); const reason = typeof record.reason === "string" && record.reason.trim().length > 0 ? record.reason.trim() : undefined; return await store.cancelCommand(commandCancelMatch[1] ?? "", reason) as unknown as JsonValue; } throw new AgentRunError("schema-invalid", `unsupported route ${method} ${path}`, { httpStatus: 404 }); }

/**
 * Applies an aipod spec submitted under a name taken from the request URL.
 *
 * The body is parsed with `aipodSpecFromInput`; the request is rejected when
 * the parsed `metadata.name` differs from the URL name (compared
 * case-insensitively, with the URL name trimmed first).
 *
 * @param name - Spec name from the URL.
 * @param body - Raw request body; `null`/`undefined` is treated as `{}`.
 * @param dir - Optional directory forwarded unchanged to `applyAipodSpec`.
 * @returns The value `applyAipodSpec` resolves to.
 * @throws AgentRunError `schema-invalid` (HTTP 400) when the names disagree.
 */
async function applyNamedAipodSpec(name: string, body: unknown, dir?: string): Promise<JsonValue> {
  const spec = aipodSpecFromInput(body ?? {}, `api:${name}`);
  const urlName = name.trim().toLowerCase();
  if (spec.metadata.name.toLowerCase() !== urlName) {
    throw new AgentRunError("schema-invalid", "aipod-spec URL name must match metadata.name", {
      httpStatus: 400,
      details: { urlName: name, metadataName: spec.metadata.name, valuesPrinted: false },
    });
  }
  return await applyAipodSpec(spec, dir) as JsonValue;
}

/**
 * Handles `POST /api/v1/sessions/:sessionId/send`.
 *
 * Two outcomes are possible:
 * - **steer**: the session has an active receivable turn (see
 *   `activeReceivableCommand`), so a `steer` command is created on that run.
 * - **turn**: otherwise a new run and a `turn` command are created and, unless
 *   the body sets `createRunnerJob: false`, a Kubernetes runner job as well.
 *
 * With `dryRun: true` in the body the function returns a plan from
 * `sessionSendPlan` before any `createRun` / `createCommand` /
 * `createKubernetesRunnerJob` call is made.
 *
 * @param input.store - Store used for session lookup and run/command creation.
 * @param input.sessionId - Session id from the URL.
 * @param input.body - Raw request body; `null`/`undefined` is treated as `{}`.
 * @param input.sourceCommit - Forwarded to `runnerJobDefaultsForRequest`.
 * @param input.runnerJobDefaults - Optional defaults forwarded to `runnerJobDefaultsForRequest`.
 * @returns The dry-run plan or the `session-send` response record.
 * @throws AgentRunError `schema-invalid` (HTTP 400) from `sessionSendPayload`
 *   when neither a payload object nor a prompt/message/text string is present.
 */
async function sendToSession(input: { store: AgentRunStore; sessionId: string; body: unknown; sourceCommit: string; runnerJobDefaults?: ManagerServerOptions["runnerJobDefaults"] }): Promise<JsonRecord> {
  const startedAt = Date.now();
  const { store, sessionId } = input;
  const record = asRecord(input.body ?? {}, "sessionSend");
  const dryRun = record.dryRun === true;
  const existing = await store.getSession(sessionId);
  const active = existing ? await activeReceivableCommand(store, existing) : null;
  const payload = sessionSendPayload(record);
  // `commandIdempotencyKey` wins over the generic `idempotencyKey` when both are sent.
  const commandIdempotencyKey = optionalString(record.commandIdempotencyKey) ?? optionalString(record.idempotencyKey);
  // Leading attributes shared by every span emitted from this handler.
  const routeAttributes = {
    "http.method": "POST",
    "http.route": "/api/v1/sessions/:sessionId/send",
    "http.status_code": 200,
  } as const;

  if (active) {
    const steerBody: JsonRecord = { type: "steer", payload };
    if (commandIdempotencyKey) steerBody.idempotencyKey = commandIdempotencyKey;
    const request = {
      method: "POST",
      path: `/api/v1/runs/${active.run.id}/commands`,
      commandType: "steer",
      payloadBytes: jsonByteLength(payload),
      valuesPrinted: false,
    };
    if (dryRun) return sessionSendPlan(sessionId, "steer", active, request, null);

    const command = await store.createCommand(active.run.id, validateCreateCommand(steerBody));
    // Span emission is fire-and-forget; the response does not wait for it.
    void emitAgentRunOtelSpan("command_created", active.run, process.env, {
      command,
      startTimeMs: startedAt,
      kind: 2,
      attributes: { ...routeAttributes, decision: "steer", commandType: command.type, commandState: command.state },
    });
    void emitAgentRunOtelSpan("session_send", active.run, process.env, {
      command,
      startTimeMs: startedAt,
      kind: 2,
      attributes: { ...routeAttributes, decision: "steer", reusedActiveRun: true },
    });
    return sessionSendResponse({ sessionId, decision: "steer", run: active.run, command, runnerJob: null, activeBefore: active, dryRun: false });
  }

  // No receivable turn: start a fresh run. `run` takes precedence over `runBase`.
  const runRecord = asRecord(record.run ?? record.runBase ?? null, "sessionSend.run");
  const runBody = sessionSendRunBody(sessionId, runRecord);
  const turnBody: JsonRecord = { type: "turn", payload };
  if (commandIdempotencyKey) turnBody.idempotencyKey = commandIdempotencyKey;
  const runnerJobBody = record.runnerJob === undefined || record.runnerJob === null ? {} : asRecord(record.runnerJob, "sessionSend.runnerJob");
  // A runner job is created unless the caller explicitly opts out with `false`.
  const createRunnerJob = record.createRunnerJob !== false;
  const request = {
    method: "POST",
    path: `/api/v1/sessions/${sessionId}/send`,
    commandType: "turn",
    runBytes: jsonByteLength(runBody),
    payloadBytes: jsonByteLength(payload),
    createRunnerJob,
    runnerJobBytes: createRunnerJob ? jsonByteLength(runnerJobBody) : 0,
    valuesPrinted: false,
  };
  if (dryRun) return sessionSendPlan(sessionId, "turn", active, request, runBody);

  const run = await store.createRun(validateCreateRun(runBody));
  void emitAgentRunOtelSpan("run_created", run, process.env, {
    startTimeMs: startedAt,
    kind: 2,
    attributes: { ...routeAttributes, decision: "turn", backendProfile: run.backendProfile, providerId: run.providerId },
  });

  const command = await store.createCommand(run.id, validateCreateCommand(turnBody));
  void emitAgentRunOtelSpan("command_created", run, process.env, {
    command,
    startTimeMs: startedAt,
    kind: 2,
    attributes: { ...routeAttributes, decision: "turn", commandType: command.type, commandState: command.state },
  });

  let runnerJob: JsonValue = null;
  if (createRunnerJob) {
    runnerJob = await createKubernetesRunnerJob({
      store,
      runId: run.id,
      input: { ...runnerJobBody, commandId: command.id } as never,
      defaults: runnerJobDefaultsForRequest(input.runnerJobDefaults, input.sourceCommit),
    }) as unknown as JsonValue;
    const runnerJobRecord = asJsonRecord(runnerJob);
    void emitAgentRunOtelSpan("runner_job_created", run, process.env, {
      command,
      startTimeMs: startedAt,
      kind: 2,
      attributes: { ...routeAttributes, decision: "turn", jobName: stringJsonValue(runnerJobRecord?.jobName), namespace: stringJsonValue(runnerJobRecord?.namespace) },
    });
  }

  void emitAgentRunOtelSpan("session_send", run, process.env, {
    command,
    startTimeMs: startedAt,
    kind: 2,
    attributes: { ...routeAttributes, decision: "turn", createRunnerJob },
  });
  return sessionSendResponse({ sessionId, decision: "turn", run, command, runnerJob, activeBefore: active, dryRun: false });
}

/**
 * Resolves the session's active run/command pair when it can accept a steer.
 *
 * Returns `null` unless all of the following hold: the session records both an
 * active run id and an active command id; the run references this session and
 * the command belongs to that run; neither is terminal; the command is a
 * `turn` in state `acknowledged`; the run lease (if any) has not expired; and
 * the run status is `claimed` or `running`.
 *
 * Because an expired lease yields `null`, `leaseExpired` is always `false` in
 * a non-null result. A `leaseExpiresAt` that `Date.parse` cannot read compares
 * as not expired.
 *
 * @param store - Store used to load the run and the command.
 * @param session - Session whose active pointers are inspected.
 * @returns The run, command, a fixed reason string and the lease flag, or `null`.
 */
async function activeReceivableCommand(store: AgentRunStore, session: SessionRecord): Promise<{ run: RunRecord; command: CommandRecord; reason: string; leaseExpired: boolean } | null> {
  const { activeRunId, activeCommandId } = session;
  if (!activeRunId || !activeCommandId) return null;

  const [run, command] = await Promise.all([store.getRun(activeRunId), store.getCommand(activeCommandId)]);
  const belongsToSession = run.sessionRef?.sessionId === session.sessionId && command.runId === run.id;
  if (!belongsToSession) return null;
  if (runIsTerminal(run) || commandIsTerminal(command)) return null;
  if (command.type !== "turn") return null;

  const leaseExpired = run.leaseExpiresAt ? Date.parse(run.leaseExpiresAt) <= Date.now() : false;
  if (leaseExpired) return null;
  if (command.state !== "acknowledged") return null;
  if (run.status !== "claimed" && run.status !== "running") return null;

  return { run, command, reason: "active-turn-running", leaseExpired };
}

/**
 * Extracts the command payload from a send request body.
 *
 * An object-valued `payload` is returned as is. Otherwise the first non-blank
 * string among `prompt`, `message` and `text` is wrapped as `{ prompt }`.
 *
 * @param record - Parsed request body.
 * @returns The payload record to attach to the command.
 * @throws AgentRunError `schema-invalid` (HTTP 400) when none of the fields is usable.
 */
function sessionSendPayload(record: JsonRecord): JsonRecord {
  const explicitPayload = asJsonRecord(record.payload);
  if (explicitPayload) return explicitPayload;

  const prompt = optionalString(record.prompt) ?? optionalString(record.message) ?? optionalString(record.text);
  if (prompt) return { prompt };

  throw new AgentRunError("schema-invalid", "sessions send requires payload or non-empty prompt/message/text", { httpStatus: 400 });
}

/**
 * Builds the create-run body for a send, pinning it to the target session.
 *
 * The caller's `sessionRef` fields are kept, `sessionId` is overwritten with
 * the URL session id, and `metadata` is forced to an object.
 *
 * @param sessionId - Session id that the run must reference.
 * @param runRecord - Caller-supplied run fields.
 * @returns A copy of `runRecord` with the normalized `sessionRef`.
 */
function sessionSendRunBody(sessionId: string, runRecord: JsonRecord): JsonRecord {
  const callerRef = asJsonRecord(runRecord.sessionRef) ?? {};
  const metadata = asJsonRecord(callerRef.metadata) ?? {};
  const sessionRef = { ...callerRef, sessionId, metadata };
  return { ...runRecord, sessionRef };
}

/**
 * Builds the dry-run plan record (`action: "session-send-plan"`, `dryRun: true`,
 * `mutation: false`) describing the steer-or-turn decision for a send request.
 */
function sessionSendPlan(sessionId: string, decision: "steer" | "turn", active: Awaited<ReturnType<typeof activeReceivableCommand>>, request: JsonRecord, runBody: JsonRecord | null): JsonRecord { return { action: "session-send-plan", dryRun: true, mutation: false, sessionId, decision, internalCommandType: decision, activeBefore: active ? activeBeforeSummary(active) : null, request, ...(runBody ?
{ run: { bodyBytes: jsonByteLength(runBody), sessionRef: summarizeSendSessionRef(runBody), valuesPrinted: false } } : {}), next: { confirm: managerActionDescriptor({ action: "send-session", operation: "send", resourceKind: "session", resourceName: sessionId, sessionId, inputKind: "prompt" }), note: "Remove --dry-run to perform the mutation. Manager will decide internal steer vs turn from durable session state." }, valuesPrinted: false, }; }

/**
 * Builds the response record for a send that actually mutated state.
 *
 * Alongside the run, command and optional runner job, the record carries a
 * fixed list of follow-up action descriptors (`pollActions`): inspect, poll
 * trace, poll output, read and cancel.
 *
 * @param input.sessionId - Session the send targeted.
 * @param input.decision - Whether the send steered an active run or started a turn.
 * @param input.run - Run the command was created on.
 * @param input.command - Command that was created.
 * @param input.runnerJob - Runner job result, or `null` when none was created.
 * @param input.activeBefore - Active run/command found before the send, if any.
 * @param input.dryRun - Always `false` for this response shape.
 * @returns The `session-send` response record.
 */
function sessionSendResponse(input: { sessionId: string; decision: "steer" | "turn"; run: RunRecord; command: CommandRecord; runnerJob: JsonValue; activeBefore: Awaited<ReturnType<typeof activeReceivableCommand>>; dryRun: false }): JsonRecord {
  const { sessionId, run, command } = input;
  // Resource coordinates shared by every session-scoped follow-up action.
  const sessionTarget = { resourceKind: "session", resourceName: sessionId, sessionId };
  const pollActions = [
    managerActionDescriptor({ action: "inspect-session", operation: "describe", ...sessionTarget, readerId: "cli" }),
    managerActionDescriptor({ action: "poll-trace", operation: "events", resourceKind: "run", resourceName: run.id, runId: run.id, commandId: command.id, sessionId, afterSeq: 0, limit: 100 }),
    managerActionDescriptor({ action: "poll-output", operation: "logs", ...sessionTarget, runId: run.id, commandId: command.id, afterSeq: 0, limit: 100 }),
    managerActionDescriptor({ action: "read-session", operation: "read", ...sessionTarget, readerId: "cli" }),
    managerActionDescriptor({ action: "cancel-session", operation: "cancel", ...sessionTarget, reasonRequired: true }),
  ];
  return {
    action: "session-send",
    dryRun: input.dryRun,
    mutation: true,
    sessionId,
    decision: input.decision,
    internalCommandType: command.type,
    run: run as unknown as JsonRecord,
    command: command as unknown as JsonRecord,
    runnerJob: input.runnerJob,
    activeBefore: input.activeBefore ? activeBeforeSummary(input.activeBefore) : null,
    pollActions,
    valuesPrinted: false,
  };
}

/**
 * Normalizes a follow-up action into a descriptor record.
 *
 * `runId`, `commandId` and `sessionId` are always present (defaulting to
 * `null`). `afterSeq` and `limit` are included whenever they are not
 * `undefined` (so an explicit `null` is kept); `readerId` and `inputKind` only
 * when truthy; `reasonRequired` only when exactly `true`.
 *
 * @param input - Action name, operation, target resource and optional cursors/flags.
 * @returns The descriptor record, always ending with `valuesPrinted: false`.
 */
function managerActionDescriptor(input: { action: string; operation: string; resourceKind: string; resourceName: string; runId?: string | null; commandId?: string | null; sessionId?: string | null; afterSeq?: number | null; limit?: number | null; readerId?: string | null; reasonRequired?: boolean; inputKind?: string | null }): JsonRecord {
  const descriptor: JsonRecord = {
    action: input.action,
    operation: input.operation,
    resourceKind: input.resourceKind,
    resourceName: input.resourceName,
    runId: input.runId ?? null,
    commandId: input.commandId ?? null,
    sessionId: input.sessionId ?? null,
  };
  // Optional keys are appended in a fixed order so serialized output is stable.
  if (input.afterSeq !== undefined) descriptor.afterSeq = input.afterSeq;
  if (input.limit !== undefined) descriptor.limit = input.limit;
  if (input.readerId) descriptor.readerId = input.readerId;
  if (input.reasonRequired === true) descriptor.reasonRequired = true;
  if (input.inputKind) descriptor.inputKind = input.inputKind;
  descriptor.valuesPrinted = false;
  return descriptor;
}

/**
 * Summarizes the active run/command pair that existed before a send.
 *
 * @param active - Non-null result of `activeReceivableCommand`.
 * @returns Ids, command state, run status and lease details of that pair.
 */
function activeBeforeSummary(active: NonNullable<Awaited<ReturnType<typeof activeReceivableCommand>>>): JsonRecord {
  const { run, command } = active;
  return {
    runId: run.id,
    commandId: command.id,
    commandState: command.state,
    runStatus: run.status,
    leaseExpiresAt: run.leaseExpiresAt,
    leaseExpired: active.leaseExpired,
    reason: active.reason,
    valuesPrinted: false,
  };
}

/**
 * Summarizes the `sessionRef` of a run body for a dry-run plan: the id fields
 * are reported, while metadata is reduced to its sorted key names.
 */
function summarizeSendSessionRef(runBody: JsonRecord): JsonRecord { const ref = asJsonRecord(runBody.sessionRef) ?? {}; return { sessionId: optionalString(ref.sessionId), conversationId: optionalString(ref.conversationId), threadId: optionalString(ref.threadId), metadataKeys: Object.keys(asJsonRecord(ref.metadata) ??
{}).sort(), valuesPrinted: false }; }

/**
 * Reports whether a run has reached a final status
 * (`completed`, `failed`, `blocked` or `cancelled`).
 *
 * @param run - Run to inspect.
 * @returns `true` when no further progress is expected on the run.
 */
function runIsTerminal(run: RunRecord): boolean {
  switch (run.status) {
    case "completed":
    case "failed":
    case "blocked":
    case "cancelled":
      return true;
    default:
      return false;
  }
}

/**
 * Reports whether a command has reached a final state
 * (`completed`, `failed` or `cancelled`).
 *
 * Unlike `runIsTerminal`, `blocked` is not counted as terminal here.
 *
 * @param command - Command to inspect.
 * @returns `true` when the command is in one of the final states.
 */
function commandIsTerminal(command: CommandRecord): boolean {
  switch (command.state) {
    case "completed":
    case "failed":
    case "cancelled":
      return true;
    default:
      return false;
  }
}

/**
 * Emits an OTel span for an appended run event, when the event maps to one.
 *
 * The span name comes from `runEventOtelSpanName`; events without a name are
 * ignored. The span is marked as an error for `error` events and for terminal
 * statuses `failed` / `blocked`, in which case the error text is the bounded
 * payload message, else the failure kind, else the span name.
 *
 * @param type - Event type that was appended.
 * @param payload - Event payload; only string/boolean fields listed below are read.
 * @param run - Run the event belongs to.
 * @param startedAt - Request start time in epoch milliseconds.
 */
function emitRunEventOtelSpan(type: RunEvent["type"], payload: JsonRecord, run: RunRecord, startedAt: number): void {
  const phase = stringJsonValue(payload.phase);
  const terminalStatus = stringJsonValue(payload.terminalStatus);
  const failureKind = stringJsonValue(payload.failureKind);

  const spanName = runEventOtelSpanName(type, phase, terminalStatus, failureKind);
  if (!spanName) return;

  // Messages are whitespace-normalized and capped at 300 characters before export.
  const message = boundedJsonString(payload.message, 300);
  const failed = type === "error" || terminalStatus === "failed" || terminalStatus === "blocked";
  const willRetry = typeof payload.willRetry === "boolean" ? payload.willRetry : null;

  // Fire-and-forget: the caller does not wait for span export.
  void emitAgentRunOtelSpan(spanName, run, process.env, {
    startTimeMs: startedAt,
    kind: 2,
    status: failed ? "error" : "ok",
    error: failed ? message ?? failureKind ?? spanName : undefined,
    attributes: {
      "http.method": "POST",
      "http.route": "/api/v1/runs/:runId/events",
      "http.status_code": 200,
      eventType: type,
      phase,
      terminalStatus,
      failureKind,
      commandId: stringJsonValue(payload.commandId),
      attemptId: stringJsonValue(payload.attemptId),
      runnerId: stringJsonValue(payload.runnerId),
      threadId: stringJsonValue(payload.threadId),
      turnId: stringJsonValue(payload.turnId),
      willRetry,
      message,
    },
  });
}

/**
 * Maps a run event to an OTel span name, or `null` when the event should not
 * produce a span.
 */
function runEventOtelSpanName(type: RunEvent["type"], phase: string | null, terminalStatus: string | null, failureKind: string | null): string | null { if (type === "error") return failureKind ? `runner_error.${otelNamePart(failureKind)}` : "runner_error"; if (type === "terminal_status") return terminalStatus ?
`runner_terminal.${otelNamePart(terminalStatus)}` : "runner_terminal"; if (type !== "backend_status") return null; if (!phase) return null; if (phase === "cancel-requested" || phase === "turn-cancelled" || phase === "command-terminal") return `runner_${otelNamePart(phase)}`; if (phase.startsWith("runner-claim-") || phase.startsWith("turn/interrupt:")) return `runner_${otelNamePart(phase)}`; return null; }

/**
 * Turns an arbitrary string into a span-name segment: lower-cased, with every
 * run of characters outside `a-z0-9` replaced by `_` and leading/trailing
 * underscores removed.
 *
 * @param value - Raw text such as a phase or failure kind.
 * @returns The normalized segment, or `"unknown"` when nothing remains.
 */
function otelNamePart(value: string): string {
  const slug = value
    .toLowerCase()
    .replace(/[^a-z0-9]+/gu, "_")
    .replace(/^_+|_+$/gu, "");
  return slug || "unknown";
}

/**
 * Returns the trimmed string, or `null` for non-strings and blank strings.
 *
 * @param value - Candidate JSON value.
 * @returns Trimmed text or `null`.
 */
function optionalString(value: JsonValue | undefined): string | null {
  if (typeof value !== "string") return null;
  const trimmed = value.trim();
  return trimmed.length > 0 ? trimmed : null;
}

/**
 * Measures the UTF-8 byte size of a value's JSON serialization.
 *
 * @param value - Value to serialize; `null`/`undefined` serialize as `null`.
 * @returns Byte length of the JSON text.
 */
function jsonByteLength(value: unknown): number {
  const serialized = JSON.stringify(value ?? null);
  return Buffer.byteLength(serialized, "utf8");
}

/**
 * Reads a non-negative integer query parameter.
 *
 * @param url - Request URL whose search parameters are consulted.
 * @param key - Parameter name.
 * @param fallback - Value returned when the parameter is absent, blank, not an
 *   integer, or negative.
 * @returns The parsed integer or `fallback`.
 */
function integerQuery(url: URL, key: string, fallback: number): number {
  const raw = url.searchParams.get(key);
  // `Number(null)` and `Number("")` are both 0, so an absent or blank parameter
  // has to be rejected before conversion; otherwise the fallback is never used
  // and e.g. a missing `limit` silently becomes 0.
  if (raw === null || raw.trim().length === 0) return fallback;
  const value = Number(raw);
  return Number.isInteger(value) && value >= 0 ? value : fallback;
}

/**
 * Reads a finite numeric field from a record.
 *
 * @param record - Source record.
 * @param key - Field name.
 * @param fallback - Value returned when the field is missing, not a number, or not finite.
 * @returns The field value or `fallback`.
 */
function numberField(record: JsonRecord, key: string, fallback: number): number {
  const candidate = record[key];
  if (typeof candidate !== "number") return fallback;
  return Number.isFinite(candidate) ? candidate : fallback;
}

/**
 * Narrows a value to a plain JSON object.
 *
 * @param value - Any value.
 * @returns The same reference typed as a record, or `null` for primitives, `null` and arrays.
 */
function asJsonRecord(value: unknown): JsonRecord | null {
  const isObject = typeof value === "object" && value !== null;
  if (!isObject || Array.isArray(value)) return null;
  return value as JsonRecord;
}

/**
 * Returns the value when it is a non-empty string (no trimming), else `null`.
 *
 * @param value - Candidate JSON value.
 * @returns The original string or `null`.
 */
function stringJsonValue(value: JsonValue | undefined): string | null {
  if (typeof value !== "string") return null;
  return value.length > 0 ? value : null;
}

/**
 * Returns the value when it is a finite number, else `null`.
 *
 * @param value - Candidate JSON value.
 * @returns The number or `null`.
 */
function numberJsonValue(value: JsonValue | undefined): number | null {
  if (typeof value !== "number") return null;
  return Number.isFinite(value) ? value : null;
}

/**
 * Collapses whitespace runs in a string to single spaces, trims it, and
 * truncates it with a trailing `...` when it exceeds `limit` characters.
 * Non-strings and empty strings yield `null`.
 */
function boundedJsonString(value: JsonValue | undefined, limit: number): string | null { if (typeof value !== "string" || value.length === 0) return null; const normalized = value.replace(/\s+/gu, " ").trim(); return normalized.length > limit ?
`${normalized.slice(0, Math.max(0, limit - 3))}...` : normalized; }

/**
 * Reads a required, non-blank string field from a record.
 *
 * @param record - Source record.
 * @param key - Field name; also used in the error message.
 * @returns The trimmed field value.
 * @throws AgentRunError `schema-invalid` (HTTP 400) when the field is missing, not a string, or blank.
 */
function stringField(record: JsonRecord, key: string): string {
  const value = record[key];
  const trimmed = typeof value === "string" ? value.trim() : "";
  if (trimmed.length === 0) throw new AgentRunError("schema-invalid", `${key} is required`, { httpStatus: 400 });
  return trimmed;
}

/**
 * Wraps a non-blank string as a single-key record, for spreading into a
 * larger object.
 *
 * @param key - Key to use in the result.
 * @param value - Candidate value.
 * @returns `{ [key]: trimmed }`, or `{}` when the value is not a non-blank string.
 */
function optionalStringRecord(key: string, value: unknown): JsonRecord {
  if (typeof value !== "string") return {};
  const trimmed = value.trim();
  return trimmed.length > 0 ? { [key]: trimmed } : {};
}

/**
 * Wraps an optional positive integer as a single-key record, for spreading
 * into a larger object.
 *
 * @param key - Key to use in the result and in error messages.
 * @param value - Candidate value, validated by `optionalPositiveInteger`.
 * @returns `{ [key]: n }`, or `{}` when the value is absent.
 * @throws AgentRunError `schema-invalid` (HTTP 400) when present but not a positive integer.
 */
function optionalPositiveIntegerRecord(key: string, value: unknown): JsonRecord {
  const parsed = optionalPositiveInteger(key, value);
  return parsed === undefined ? {} : { [key]: parsed };
}

/**
 * Parses an optional positive integer.
 *
 * `undefined`, `null` and the empty string count as absent. Anything else is
 * converted with `Number(...)` and must come out as an integer greater than 0.
 *
 * @param key - Name used in the error message.
 * @param value - Candidate value.
 * @returns The parsed integer, or `undefined` when absent.
 * @throws AgentRunError `schema-invalid` (HTTP 400) when present but not a positive integer.
 */
function optionalPositiveInteger(key: string, value: unknown): number | undefined {
  const absent = value === undefined || value === null || value === "";
  if (absent) return undefined;
  const parsed = Number(value);
  if (Number.isInteger(parsed) && parsed > 0) return parsed;
  throw new AgentRunError("schema-invalid", `${key} must be a positive integer`, { httpStatus: 400 });
}

/**
 * Parses a positive integer that must be present.
 *
 * @param key - Name used in the error message.
 * @param value - Candidate value, validated by `optionalPositiveInteger`.
 * @returns The parsed integer.
 * @throws AgentRunError `schema-invalid` — HTTP 500 when absent, HTTP 400 when malformed.
 */
function requiredPositiveInteger(key: string, value: unknown): number {
  const parsed = optionalPositiveInteger(key, value);
  if (parsed !== undefined) return parsed;
  throw new AgentRunError("schema-invalid", `${key} is required when runner retention is enabled`, { httpStatus: 500 });
}

/**
 * Parses a required setting holding a JSON object whose values are all
 * non-empty strings.
 *
 * @param key - Setting name used in error messages.
 * @param value - Raw setting value; must be a non-blank string of JSON.
 * @returns A fresh record copied from the parsed object.
 * @throws AgentRunError `schema-invalid` (HTTP 500) when the value is missing,
 *   is not valid JSON, is not a JSON object, or has a non-string/empty entry.
 */
function jsonRecordEnv(key: string, value: unknown): Record<string, string> {
  if (typeof value !== "string" || value.trim().length === 0) {
    throw new AgentRunError("schema-invalid", `${key} is required when runner retention is enabled`, { httpStatus: 500 });
  }

  let parsed: unknown;
  try {
    parsed = JSON.parse(value);
  } catch {
    // Malformed JSON is reported like every other bad shape here, instead of
    // letting the parser's SyntaxError escape as a generic failure whose
    // message may quote the raw setting.
    throw new AgentRunError("schema-invalid", `${key} must be a JSON object`, { httpStatus: 500 });
  }
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    throw new AgentRunError("schema-invalid", `${key} must be a JSON object`, { httpStatus: 500 });
  }

  const out: Record<string, string> = {};
  for (const [entryKey, entryValue] of Object.entries(parsed)) {
    if (typeof entryValue !== "string" || entryValue.length === 0) {
      throw new AgentRunError("schema-invalid", `${key}.${entryKey} must be a non-empty string`, { httpStatus: 500 });
    }
    out[entryKey] = entryValue;
  }
  return out;
}

/**
 * Parses a required comma-separated setting into trimmed, non-empty items.
 *
 * @param key - Setting name used in the error message.
 * @param value - Raw setting value; must be a string (an empty string yields `[]`).
 * @returns The list of items in their original order.
 * @throws AgentRunError `schema-invalid` (HTTP 500) when the value is not a string.
 */
function stringListEnv(key: string, value: unknown): string[] {
  if (typeof value !== "string") {
    throw new AgentRunError("schema-invalid", `${key} is required when runner retention is enabled`, { httpStatus: 500 });
  }
  const items: string[] = [];
  for (const piece of value.split(",")) {
    const item = piece.trim();
    if (item.length > 0) items.push(item);
  }
  return items;
}

/**
 * Parses a strict boolean setting: only the exact strings `"true"` and
 * `"false"` are accepted.
 *
 * @param key - Setting name used in the error message.
 * @param value - Raw setting value.
 * @returns The parsed boolean.
 * @throws AgentRunError `schema-invalid` (HTTP 500) for any other string.
 */
function booleanEnv(key: string, value: string): boolean {
  switch (value) {
    case "true":
      return true;
    case "false":
      return false;
    default:
      throw new AgentRunError("schema-invalid", `${key} must be true or false`, { httpStatus: 500 });
  }
}

/**
 * Converts any thrown value into an `AgentRunError`.
 *
 * @param error - Value caught from a `try` block.
 * @returns The same instance when it already is an `AgentRunError`; otherwise a
 *   new `infra-failed` error (HTTP 500) carrying the original message or the
 *   value's string form.
 */
function normalizeError(error: unknown): AgentRunError {
  if (error instanceof AgentRunError) return error;
  const message = error instanceof Error ? error.message : String(error);
  return new AgentRunError("infra-failed", message, { httpStatus: 500 });
}

/**
 * Writes a JSON response body followed by a newline and ends the response.
 *
 * @param res - Node HTTP response to write to.
 * @param statusCode - HTTP status code to send.
 * @param body - Success or error envelope to serialize.
 */
function writeJson(res: import("node:http").ServerResponse, statusCode: number, body: ApiOkBody | ApiErrorBody): void {
  const text = `${JSON.stringify(body)}\n`;
  // Content-Length counts bytes, not UTF-16 code units.
  const headers = {
    "content-type": "application/json; charset=utf-8",
    "content-length": Buffer.byteLength(text),
  };
  res.writeHead(statusCode, headers);
  res.end(text);
}