diff --git a/src/mgr/kubernetes-runner-job.ts b/src/mgr/kubernetes-runner-job.ts index 72a7554..50c549a 100644 --- a/src/mgr/kubernetes-runner-job.ts +++ b/src/mgr/kubernetes-runner-job.ts @@ -11,6 +11,8 @@ import { staticWorkReadyCapabilitySummary } from "../common/work-ready.js"; import { resolveRunnerEnvImage } from "../common/env-image-ref.js"; import { ensureSessionPvc } from "./session-pvc.js"; import { gitTransportSummary } from "../common/git-transport.js"; +import { enforceRunnerRetentionBeforeCreate } from "./runner-retention.js"; +import type { RunnerRetentionOptions, RunnerRetentionSummary } from "./runner-retention.js"; const reusableCredentialEnvNames = new Set([ "AUTH_PASSWORD", @@ -44,9 +46,12 @@ export interface RunnerJobDefaults { envIdentity?: string; artifactCatalogFile?: string; serviceAccountName?: string; + jobNamePrefix?: string; + lane?: string; runnerIdleTimeoutMs?: number; kubectlCommand?: string; unideskSshEndpointEnv?: JsonRecord; + retention?: RunnerRetentionOptions; } export interface CreateRunnerJobInput extends JsonRecord { @@ -83,12 +88,14 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore; const managerUrl = optionalString(options.input.managerUrl) ?? options.defaults.managerUrl; const sourceCommit = optionalString(options.input.sourceCommit) ?? options.defaults.sourceCommit; const serviceAccountName = optionalString(options.input.serviceAccountName) ?? options.defaults.serviceAccountName; + const jobNamePrefix = options.defaults.jobNamePrefix; + const lane = options.defaults.lane; const idempotencyKey = optionalString(options.input.idempotencyKey); const transientEnv = assembleToolContextTransientEnv(run.executionPolicy, transientEnvField(options.input.transientEnv), options.defaults); const attemptId = optionalString(options.input.attemptId) ?? `attempt_${Date.now().toString(36)}`; const runnerId = optionalString(options.input.runnerId); const runnerIdleTimeoutMs = optionalPositiveInteger(options.input.runnerIdleTimeoutMs, "runnerIdleTimeoutMs") ?? options.defaults.runnerIdleTimeoutMs; - const transientEnvSecretName = transientEnv.length > 0 ? transientEnvSecretNameForRun(run.id, commandId, attemptId) : null; + const transientEnvSecretName = transientEnv.length > 0 ? transientEnvSecretNameForRun(run.id, commandId, attemptId, jobNamePrefix) : null; const renderTransientEnv = transientEnvSecretName ? transientEnvWithSecretRefs(transientEnv, transientEnvSecretName) : transientEnv; const normalizedPayload = { commandId, @@ -110,6 +117,10 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore; } if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${run.id} is already terminal: ${run.status}`, { httpStatus: 409 }); if (isTerminalCommandState(command.state) || command.state !== "pending") throw new AgentRunError(command.state === "cancelled" ? "cancelled" : "schema-invalid", `command ${commandId} is not pending: ${command.state}`, { httpStatus: 409 }); + let preCreateRetention: RunnerRetentionSummary | null = null; + if (options.defaults.retention) { + preCreateRetention = await enforceRunnerRetentionBeforeCreate({ store: options.store, options: { ...options.defaults.retention, namespace }, incomingRunnerCount: 1 }); + } let sessionPvc: RunnerSessionPvcOptions | undefined; let sessionPvcSummary: JsonRecord | null = null; if (run.sessionRef?.sessionId) { @@ -156,6 +167,8 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore; transientEnv: renderTransientEnv, ...(runnerIdleTimeoutMs !== undefined ? { runnerIdleTimeoutMs } : {}), ...(serviceAccountName ? { serviceAccountName } : {}), + ...(jobNamePrefix ? { jobNamePrefix } : {}), + ...(lane ? { lane } : {}), ...(sessionPvc ? { sessionPvc } : {}), }; const render = renderRunnerJobManifest({ ...renderOptions, attemptId, ...(runnerId ? { runnerId } : {}) }); @@ -165,7 +178,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore; let created: JsonRecord | null = null; try { if (transientEnvSecretName) { - await kubectlCreate(transientEnvSecretManifest({ namespace: render.namespace, name: transientEnvSecretName, runId: run.id, commandId, attemptId: render.attemptId, runnerId: render.runnerId, jobName: render.jobName, items: transientEnv }), kubectlCommand, "runner transient env secret"); + await kubectlCreate(transientEnvSecretManifest({ namespace: render.namespace, name: transientEnvSecretName, runId: run.id, commandId, attemptId: render.attemptId, runnerId: render.runnerId, jobName: render.jobName, lane: lane ?? "v0.1", items: transientEnv }), kubectlCommand, "runner transient env secret"); transientEnvSecretCreated = true; } created = await kubectlCreate(render.manifest, kubectlCommand, "runner job"); @@ -220,6 +233,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore; retention: { ttlSecondsAfterFinished: render.ttlSecondsAfterFinished, runnerIdleTimeoutMs: render.runnerIdleTimeoutMs, + preCreateCleanup: preCreateRetention, }, pollActions: [ runnerJobActionDescriptor({ action: "inspect-run", operation: "describe", resourceKind: "run", resourceName: run.id, runId: run.id }), @@ -263,6 +277,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore; envImage, gitTransport: gitTransportSummary(), workReady: staticWorkReadyCapabilitySummary(), + retention: preCreateRetention, toolCredentials: summarizeToolCredentials(render.toolCredentials, render.namespace), sessionPvc: sessionPvcSummary, sessionRef: summarizeSessionRef(run.sessionRef ?? null), @@ -357,11 +372,12 @@ function transientEnvWithSecretRefs(items: RunnerTransientEnv[], secretName: str return items.map((item) => ({ ...item, secretRef: { name: secretName, key: item.name } })); } -function transientEnvSecretNameForRun(runId: string, commandId: string, attemptId: string): string { - return `agentrun-v01-runner-env-${stableHash({ runId, commandId, attemptId }).slice(0, 20)}`; +function transientEnvSecretNameForRun(runId: string, commandId: string, attemptId: string, jobNamePrefix: string | undefined): string { + const prefix = (jobNamePrefix ?? "agentrun-v01-runner").toLowerCase().replace(/[^a-z0-9-]+/gu, "-").replace(/^-+|-+$/gu, "") || "agentrun-runner"; + return `${prefix.slice(0, 36).replace(/-+$/u, "")}-env-${stableHash({ runId, commandId, attemptId }).slice(0, 20)}`; } -function transientEnvSecretManifest(options: { namespace: string; name: string; runId: string; commandId: string; attemptId: string; runnerId: string; jobName: string; items: RunnerTransientEnv[] }): JsonRecord { +function transientEnvSecretManifest(options: { namespace: string; name: string; runId: string; commandId: string; attemptId: string; runnerId: string; jobName: string; lane: string; items: RunnerTransientEnv[] }): JsonRecord { const stringData: JsonRecord = {}; for (const item of options.items) stringData[item.name] = item.value; return { @@ -374,7 +390,7 @@ function transientEnvSecretManifest(options: { namespace: string; name: string; "app.kubernetes.io/name": "agentrun-runner", "app.kubernetes.io/component": "runner-transient-env", "app.kubernetes.io/part-of": "agentrun", - "agentrun.pikastech.local/lane": "v0.1", + "agentrun.pikastech.local/lane": options.lane, "agentrun.pikastech.local/run-hash": stableHash(options.runId).slice(0, 12), "agentrun.pikastech.local/runner-job": options.jobName, }, diff --git a/src/mgr/runner-retention.ts b/src/mgr/runner-retention.ts new file mode 100644 index 0000000..4fd3bd3 --- /dev/null +++ b/src/mgr/runner-retention.ts @@ -0,0 +1,364 @@ +import { spawn } from "node:child_process"; +import { AgentRunError } from "../common/errors.js"; +import { redactJson, redactText } from "../common/redaction.js"; +import type { CommandRecord, JsonRecord, JsonValue, RunRecord } from "../common/types.js"; +import { stableHash } from "../common/validation.js"; +import { isTerminalCommandState, isTerminalRunStatus } from "./store.js"; +import type { AgentRunStore } from "./store.js"; + +export interface RunnerRetentionOptions { + namespace: string; + maxRunners: number; + cleanupOrder: "oldest-inactive-last-active-first"; + activeHeartbeatMaxAgeMs: number; + matchLabels: Record; + jobNamePrefixes: string[]; + ageBasedCleanup: { + enabled: boolean; + maxAgeHours: number | null; + }; + kubectlCommand?: string; +} + +export interface RunnerRetentionSummary extends JsonRecord { + enabled: boolean; + namespace: string; + maxRunners: number; + incomingRunnerCount: number; + liveRunnerCountBefore: number; + liveRunnerCountAfter: number; + runnerJobCountBefore: number; + nonTerminalRunnerPodCountBefore: number; + orphanNonTerminalRunnerPodCountBefore: number; + inactiveCandidateCount: number; + protectedActiveRunnerCount: number; + selectedRunnerCount: number; + deletedRunnerJobCount: number; + deletedRunnerPodCount: number; + deletedAssociatedResourceCount: number; + activeRunRisk: boolean; + reason: string; + valuesPrinted: false; +} + +type CandidateKind = "idle" | "terminal" | "inactive"; + +interface K8sObject { + kind?: string; + metadata?: { + name?: string; + namespace?: string; + creationTimestamp?: string; + labels?: Record; + annotations?: Record; + ownerReferences?: Array<{ kind?: string; name?: string }>; + }; + status?: JsonRecord; +} + +interface RunnerResourceEntry { + resourceKind: "job" | "pod"; + name: string; + jobName: string; + runId: string | null; + commandId: string | null; + runnerId: string | null; + createdAtMs: number; + terminal: boolean; + podPhase: string | null; + protectedActive: boolean; + protectedReason: string | null; + candidateKind: CandidateKind; + lastActiveAtMs: number; +} + +interface Snapshot { + jobs: RunnerResourceEntry[]; + pods: RunnerResourceEntry[]; + liveRunnerCount: number; + runnerJobCount: number; + nonTerminalRunnerPodCount: number; + orphanNonTerminalRunnerPodCount: number; +} + +export async function enforceRunnerRetentionBeforeCreate(input: { store: AgentRunStore; options: RunnerRetentionOptions; incomingRunnerCount?: number }): Promise { + const incomingRunnerCount = input.incomingRunnerCount ?? 1; + const before = await runnerSnapshot(input.store, input.options); + const targetBeforeCreate = Math.max(0, input.options.maxRunners - incomingRunnerCount); + const requiredDeleteCount = Math.max(0, before.liveRunnerCount - targetBeforeCreate); + const candidates = [...before.jobs, ...before.pods] + .filter((item) => !item.protectedActive) + .sort(compareCandidates); + const selected = candidates.slice(0, requiredDeleteCount); + const protectedActiveRunnerCount = before.jobs.filter((item) => item.protectedActive).length + before.pods.filter((item) => item.protectedActive).length; + if (requiredDeleteCount > 0 && selected.length < requiredDeleteCount) { + throw new AgentRunError("infra-failed", "runner retention cannot safely make room for a new runner", { + httpStatus: 409, + details: { + reason: "runner-retention-no-safe-candidate", + namespace: input.options.namespace, + maxRunners: input.options.maxRunners, + incomingRunnerCount, + liveRunnerCountBefore: before.liveRunnerCount, + requiredDeleteCount, + inactiveCandidateCount: candidates.length, + protectedActiveRunnerCount, + activeRunRisk: protectedActiveRunnerCount > 0, + valuesPrinted: false, + }, + }); + } + + let deletedRunnerJobCount = 0; + let deletedRunnerPodCount = 0; + let deletedAssociatedResourceCount = 0; + for (const item of selected) { + if (item.resourceKind === "job") { + await kubectlRun(input.options.kubectlCommand ?? "kubectl", ["delete", "job", item.name, "-n", input.options.namespace, "--ignore-not-found=true"]); + deletedRunnerJobCount++; + deletedAssociatedResourceCount += await deleteAssociatedResources(input.options, item.jobName); + } else { + await kubectlRun(input.options.kubectlCommand ?? "kubectl", ["delete", "pod", item.name, "-n", input.options.namespace, "--ignore-not-found=true"]); + deletedRunnerPodCount++; + } + } + const after = selected.length > 0 ? await runnerSnapshot(input.store, input.options) : before; + return { + enabled: true, + namespace: input.options.namespace, + maxRunners: input.options.maxRunners, + incomingRunnerCount, + liveRunnerCountBefore: before.liveRunnerCount, + liveRunnerCountAfter: after.liveRunnerCount, + runnerJobCountBefore: before.runnerJobCount, + nonTerminalRunnerPodCountBefore: before.nonTerminalRunnerPodCount, + orphanNonTerminalRunnerPodCountBefore: before.orphanNonTerminalRunnerPodCount, + inactiveCandidateCount: candidates.length, + protectedActiveRunnerCount, + selectedRunnerCount: selected.length, + deletedRunnerJobCount, + deletedRunnerPodCount, + deletedAssociatedResourceCount, + activeRunRisk: protectedActiveRunnerCount > 0, + reason: requiredDeleteCount === 0 ? "within-limit" : "pre-create-retention", + selected: selected.map((item) => ({ resourceKind: item.resourceKind, name: item.name, jobName: item.jobName, candidateKind: item.candidateKind, protectedActive: false, lastActiveAt: new Date(item.lastActiveAtMs).toISOString(), valuesPrinted: false })), + valuesPrinted: false, + } satisfies RunnerRetentionSummary; +} + +async function runnerSnapshot(store: AgentRunStore, options: RunnerRetentionOptions): Promise { + const [jobObjects, podObjects] = await Promise.all([ + kubectlGetList(options, "jobs", labelSelector(options.matchLabels)), + kubectlGetList(options, "pods", labelSelector(options.matchLabels)), + ]); + const jobs: RunnerResourceEntry[] = []; + for (const item of jobObjects) { + const name = objectName(item); + if (!name || !matchesJobPrefix(name, options.jobNamePrefixes)) continue; + jobs.push(await jobEntry(store, item, options)); + } + const jobNames = new Set(jobs.map((item) => item.jobName)); + const pods: RunnerResourceEntry[] = []; + let nonTerminalRunnerPodCount = 0; + let orphanNonTerminalRunnerPodCount = 0; + for (const item of podObjects) { + const name = objectName(item); + if (!name) continue; + const jobName = podJobName(item) ?? name; + if (!matchesJobPrefix(jobName, options.jobNamePrefixes)) continue; + const terminal = podTerminal(item); + if (!terminal) nonTerminalRunnerPodCount++; + if (jobNames.has(jobName)) continue; + if (!terminal) orphanNonTerminalRunnerPodCount++; + pods.push(await podEntry(store, item, options, jobName)); + } + return { + jobs, + pods, + liveRunnerCount: jobs.length + pods.filter((item) => !item.terminal).length, + runnerJobCount: jobs.length, + nonTerminalRunnerPodCount, + orphanNonTerminalRunnerPodCount, + }; +} + +async function jobEntry(store: AgentRunStore, item: K8sObject, options: RunnerRetentionOptions): Promise { + const name = objectName(item) ?? "unknown"; + const annotations = item.metadata?.annotations ?? {}; + const activity = await activityFor(store, { + runId: annotations["agentrun.pikastech.local/run-id"] ?? null, + commandId: annotations["agentrun.pikastech.local/command-id"] ?? null, + runnerId: annotations["agentrun.pikastech.local/runner-id"] ?? null, + terminal: jobTerminal(item), + createdAtMs: objectCreatedAtMs(item), + activeHeartbeatMaxAgeMs: options.activeHeartbeatMaxAgeMs, + }); + return { + resourceKind: "job", + name, + jobName: name, + createdAtMs: objectCreatedAtMs(item), + terminal: jobTerminal(item), + podPhase: null, + ...activity, + }; +} + +async function podEntry(store: AgentRunStore, item: K8sObject, options: RunnerRetentionOptions, jobName: string): Promise { + const name = objectName(item) ?? "unknown"; + const annotations = item.metadata?.annotations ?? {}; + const activity = await activityFor(store, { + runId: annotations["agentrun.pikastech.local/run-id"] ?? null, + commandId: annotations["agentrun.pikastech.local/command-id"] ?? null, + runnerId: annotations["agentrun.pikastech.local/runner-id"] ?? null, + terminal: podTerminal(item), + createdAtMs: objectCreatedAtMs(item), + activeHeartbeatMaxAgeMs: options.activeHeartbeatMaxAgeMs, + }); + return { + resourceKind: "pod", + name, + jobName, + createdAtMs: objectCreatedAtMs(item), + terminal: podTerminal(item), + podPhase: stringPath(item, ["status", "phase"]), + ...activity, + }; +} + +async function activityFor(store: AgentRunStore, input: { runId: string | null; commandId: string | null; runnerId: string | null; terminal: boolean; createdAtMs: number; activeHeartbeatMaxAgeMs: number }): Promise> { + let run: RunRecord | null = null; + let command: CommandRecord | null = null; + if (input.runId) { + try { run = await store.getRun(input.runId); } catch { run = null; } + } + if (input.commandId) { + try { command = await store.getCommand(input.commandId); } catch { command = null; } + } + const lastActiveAtMs = Math.max(input.createdAtMs, isoMs(run?.updatedAt), isoMs(command?.updatedAt), isoMs(run?.leaseExpiresAt)); + const active = run && !isTerminalRunStatus(run.status) && command && !isTerminalCommandState(command.state) && input.runnerId && run.claimedBy === input.runnerId && heartbeatFresh(run.leaseExpiresAt, input.activeHeartbeatMaxAgeMs); + if (active) { + return { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, protectedActive: true, protectedReason: "fresh-active-run", candidateKind: "inactive", lastActiveAtMs }; + } + const candidateKind: CandidateKind = command && isTerminalCommandState(command.state) && !input.terminal ? "idle" : input.terminal || (run !== null && isTerminalRunStatus(run.status)) ? "terminal" : "inactive"; + return { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, protectedActive: false, protectedReason: null, candidateKind, lastActiveAtMs }; +} + +async function kubectlGetList(options: RunnerRetentionOptions, resource: string, selector: string): Promise { + const args = ["get", resource, "-n", options.namespace]; + if (selector) args.push("-l", selector); + args.push("-o", "json"); + const result = await kubectlRun(options.kubectlCommand ?? "kubectl", args); + if (result.code !== 0) throw new AgentRunError("infra-failed", `kubectl get ${resource} failed with code ${result.code}`, { httpStatus: 502, details: redactJson({ stderr: redactText(result.stderr.slice(-2000)), stdout: redactText(result.stdout.slice(-1000)), valuesPrinted: false }) }); + const parsed = parseJsonObject(result.stdout, `kubectl get ${resource}`); + const items = parsed.items; + return Array.isArray(items) ? items.filter((entry) => typeof entry === "object" && entry !== null && !Array.isArray(entry)).map((entry) => entry as unknown as K8sObject) : []; +} + +async function deleteAssociatedResources(options: RunnerRetentionOptions, jobName: string): Promise { + let deleted = 0; + const selector = `agentrun.pikastech.local/runner-job=${jobName}`; + for (const resource of ["secret", "pvc"] as const) { + const result = await kubectlRun(options.kubectlCommand ?? "kubectl", ["delete", resource, "-n", options.namespace, "-l", selector, "--ignore-not-found=true"]); + if (result.code !== 0) throw new AgentRunError("infra-failed", `kubectl delete associated ${resource} failed with code ${result.code}`, { httpStatus: 502, details: redactJson({ stderr: redactText(result.stderr.slice(-2000)), stdout: redactText(result.stdout.slice(-1000)), valuesPrinted: false }) }); + deleted += countDeletedLines(result.stdout); + } + return deleted; +} + +async function kubectlRun(kubectlCommand: string, args: string[]): Promise<{ code: number | null; signal: NodeJS.Signals | null; stdout: string; stderr: string }> { + const child = spawn(kubectlCommand, args, { stdio: ["ignore", "pipe", "pipe"] }); + let stdout = ""; + let stderr = ""; + child.stdout.setEncoding("utf8"); + child.stderr.setEncoding("utf8"); + child.stdout.on("data", (chunk) => { stdout += String(chunk); }); + child.stderr.on("data", (chunk) => { stderr += String(chunk); }); + const result = await new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve, reject) => { + child.on("error", reject); + child.on("close", (code, signal) => resolve({ code, signal })); + }).catch((error: unknown) => { + throw new AgentRunError("infra-failed", `failed to start kubectl: ${error instanceof Error ? error.message : String(error)}`, { httpStatus: 503 }); + }); + return { ...result, stdout: redactText(stdout), stderr: redactText(stderr) }; +} + +function compareCandidates(left: RunnerResourceEntry, right: RunnerResourceEntry): number { + const priority = (kind: CandidateKind): number => kind === "idle" ? 0 : kind === "terminal" ? 1 : 2; + const byKind = priority(left.candidateKind) - priority(right.candidateKind); + if (byKind !== 0) return byKind; + const byActive = left.lastActiveAtMs - right.lastActiveAtMs; + if (byActive !== 0) return byActive; + return left.name.localeCompare(right.name); +} + +function labelSelector(labels: Record): string { + return Object.entries(labels).map(([key, value]) => `${key}=${value}`).join(","); +} + +function matchesJobPrefix(name: string, prefixes: string[]): boolean { + return prefixes.length === 0 || prefixes.some((prefix) => name === prefix || name.startsWith(`${prefix}-`)); +} + +function podJobName(item: K8sObject): string | null { + const labels = item.metadata?.labels ?? {}; + if (labels["job-name"]) return labels["job-name"]; + return item.metadata?.ownerReferences?.find((owner) => owner.kind === "Job")?.name ?? null; +} + +function jobTerminal(item: K8sObject): boolean { + const conditions = item.status?.conditions; + if (!Array.isArray(conditions)) return false; + return conditions.some((condition) => { + const record = condition as JsonRecord; + return (record.type === "Complete" || record.type === "Failed") && record.status === "True"; + }); +} + +function podTerminal(item: K8sObject): boolean { + const phase = stringPath(item, ["status", "phase"]); + return phase === "Succeeded" || phase === "Failed"; +} + +function heartbeatFresh(leaseExpiresAt: string | null, activeHeartbeatMaxAgeMs: number): boolean { + const leaseMs = isoMs(leaseExpiresAt); + if (leaseMs <= 0) return false; + return leaseMs + activeHeartbeatMaxAgeMs >= Date.now(); +} + +function objectName(item: K8sObject): string | null { + return typeof item.metadata?.name === "string" && item.metadata.name.length > 0 ? item.metadata.name : null; +} + +function objectCreatedAtMs(item: K8sObject): number { + return isoMs(item.metadata?.creationTimestamp) || 0; +} + +function isoMs(value: string | null | undefined): number { + if (!value) return 0; + const parsed = Date.parse(value); + return Number.isFinite(parsed) ? parsed : 0; +} + +function stringPath(value: unknown, path: string[]): string | null { + let current: unknown = value; + for (const key of path) { + if (typeof current !== "object" || current === null || Array.isArray(current)) return null; + current = (current as Record)[key]; + } + return typeof current === "string" ? current : null; +} + +function parseJsonObject(text: string, label: string): JsonRecord { + try { + const parsed = JSON.parse(text) as JsonValue; + if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) return parsed; + } catch (error) { + throw new AgentRunError("infra-failed", `${label} returned invalid JSON: ${error instanceof Error ? error.message : String(error)}`, { httpStatus: 502, details: { stdoutHash: stableHash(text), valuesPrinted: false } }); + } + throw new AgentRunError("infra-failed", `${label} returned non-object JSON`, { httpStatus: 502, details: { stdoutHash: stableHash(text), valuesPrinted: false } }); +} + +function countDeletedLines(stdout: string): number { + return stdout.split(/\r?\n/u).filter((line) => /\sdeleted$/u.test(line.trim())).length; +} diff --git a/src/mgr/server.ts b/src/mgr/server.ts index 8889e59..830b6dc 100644 --- a/src/mgr/server.ts +++ b/src/mgr/server.ts @@ -8,6 +8,7 @@ import { asRecord, validateBackendProfile, validateCreateCommand, validateCreate import { isBackendProfile } from "../common/backend-profiles.js"; import type { ApiErrorBody, ApiOkBody, CommandRecord, JsonRecord, JsonValue, QueueTaskRecord, RunEvent, RunRecord, SessionRecord } from "../common/types.js"; import { createKubernetesRunnerJob, type RunnerJobDefaults } from "./kubernetes-runner-job.js"; +import type { RunnerRetentionOptions } from "./runner-retention.js"; import { dispatchQueueTask, refreshQueueTaskFromCore } from "./queue-dispatch.js"; import { buildRunResult } from "./result.js"; import { runnerJobStatusSummary } from "./runner-job-status.js"; @@ -37,6 +38,10 @@ function sessionPvcOptionsForRequest(serverDefaults: { kubectlHandler?: import(" function runnerJobDefaultsForRequest(defaults: ManagerServerOptions["runnerJobDefaults"], sourceCommit: string): RunnerJobDefaults { const namespace = defaults?.namespace ?? process.env.AGENTRUN_RUNTIME_NAMESPACE ?? "agentrun-v01"; + const serviceAccountName = defaults?.serviceAccountName ?? process.env.AGENTRUN_RUNNER_SERVICE_ACCOUNT ?? "agentrun-v01-runner"; + const jobNamePrefix = defaults?.jobNamePrefix ?? process.env.AGENTRUN_RUNNER_JOB_NAME_PREFIX ?? serviceAccountName; + const lane = defaults?.lane ?? process.env.AGENTRUN_LANE ?? "v0.1"; + const retention = runnerRetentionOptionsForRequest(defaults?.retention, namespace, jobNamePrefix, defaults?.kubectlCommand); return { namespace, managerUrl: defaults?.managerUrl ?? process.env.AGENTRUN_INTERNAL_MGR_URL ?? `http://agentrun-mgr.${namespace}.svc.cluster.local:8080`, @@ -45,10 +50,36 @@ function runnerJobDefaultsForRequest(defaults: ManagerServerOptions["runnerJobDe sourceCommit, ...optionalStringRecord("envIdentity", defaults?.envIdentity ?? process.env.AGENTRUN_ENV_IDENTITY), ...optionalStringRecord("artifactCatalogFile", defaults?.artifactCatalogFile ?? process.env.AGENTRUN_ARTIFACT_CATALOG_FILE), - serviceAccountName: defaults?.serviceAccountName ?? process.env.AGENTRUN_RUNNER_SERVICE_ACCOUNT ?? "agentrun-v01-runner", + serviceAccountName, + jobNamePrefix, + lane, ...(defaults?.runnerIdleTimeoutMs !== undefined ? { runnerIdleTimeoutMs: defaults.runnerIdleTimeoutMs } : optionalPositiveIntegerRecord("runnerIdleTimeoutMs", process.env.AGENTRUN_RUNNER_IDLE_TIMEOUT_MS)), ...(defaults?.kubectlCommand ? { kubectlCommand: defaults.kubectlCommand } : {}), ...(defaults?.unideskSshEndpointEnv ? { unideskSshEndpointEnv: defaults.unideskSshEndpointEnv } : {}), + ...(retention ? { retention } : {}), + }; +} + +function runnerRetentionOptionsForRequest(defaults: RunnerRetentionOptions | undefined, namespace: string, jobNamePrefix: string, kubectlCommand?: string): RunnerRetentionOptions | undefined { + if (defaults) return { ...defaults, namespace, ...(kubectlCommand ? { kubectlCommand } : {}) }; + const maxRunners = optionalPositiveInteger("AGENTRUN_RUNNER_RETENTION_MAX_RUNNERS", process.env.AGENTRUN_RUNNER_RETENTION_MAX_RUNNERS); + if (maxRunners === undefined) return undefined; + const activeHeartbeatMaxAgeMs = requiredPositiveInteger("AGENTRUN_RUNNER_RETENTION_ACTIVE_HEARTBEAT_MAX_AGE_MS", process.env.AGENTRUN_RUNNER_RETENTION_ACTIVE_HEARTBEAT_MAX_AGE_MS); + const cleanupOrder = process.env.AGENTRUN_RUNNER_RETENTION_CLEANUP_ORDER ?? "oldest-inactive-last-active-first"; + if (cleanupOrder !== "oldest-inactive-last-active-first") throw new AgentRunError("schema-invalid", "AGENTRUN_RUNNER_RETENTION_CLEANUP_ORDER is unsupported", { httpStatus: 500 }); + const jobNamePrefixes = stringListEnv("AGENTRUN_RUNNER_RETENTION_JOB_NAME_PREFIXES", process.env.AGENTRUN_RUNNER_RETENTION_JOB_NAME_PREFIXES); + return { + namespace, + maxRunners, + cleanupOrder, + activeHeartbeatMaxAgeMs, + matchLabels: jsonRecordEnv("AGENTRUN_RUNNER_RETENTION_MATCH_LABELS_JSON", process.env.AGENTRUN_RUNNER_RETENTION_MATCH_LABELS_JSON), + jobNamePrefixes: jobNamePrefixes.length > 0 ? jobNamePrefixes : [jobNamePrefix], + ageBasedCleanup: { + enabled: booleanEnv("AGENTRUN_RUNNER_RETENTION_AGE_BASED_CLEANUP_ENABLED", process.env.AGENTRUN_RUNNER_RETENTION_AGE_BASED_CLEANUP_ENABLED ?? "false"), + maxAgeHours: optionalPositiveInteger("AGENTRUN_RUNNER_RETENTION_AGE_BASED_MAX_AGE_HOURS", process.env.AGENTRUN_RUNNER_RETENTION_AGE_BASED_MAX_AGE_HOURS) ?? null, + }, + ...(kubectlCommand ? { kubectlCommand } : {}), }; } @@ -65,9 +96,12 @@ export interface ManagerServerOptions { envIdentity?: string; artifactCatalogFile?: string; serviceAccountName?: string; + jobNamePrefix?: string; + lane?: string; runnerIdleTimeoutMs?: number; kubectlCommand?: string; unideskSshEndpointEnv?: JsonRecord; + retention?: RunnerRetentionOptions; }; sessionPvcOptions?: { kubectlHandler?: import("./session-pvc.js").KubectlHandler; kubectlCommand?: string; storageClassName?: string; size?: string }; providerProfileOptions?: { namespace?: string; kubectlCommand?: string }; @@ -926,10 +960,45 @@ function optionalStringRecord(key: string, value: unknown): JsonRecord { } function optionalPositiveIntegerRecord(key: string, value: unknown): JsonRecord { - if (value === undefined || value === null || value === "") return {}; + const parsed = optionalPositiveInteger(key, value); + if (parsed === undefined) return {}; + return { [key]: parsed }; +} + +function optionalPositiveInteger(key: string, value: unknown): number | undefined { + if (value === undefined || value === null || value === "") return undefined; const parsed = Number(value); if (!Number.isInteger(parsed) || parsed <= 0) throw new AgentRunError("schema-invalid", `${key} must be a positive integer`, { httpStatus: 400 }); - return { [key]: parsed }; + return parsed; +} + +function requiredPositiveInteger(key: string, value: unknown): number { + const parsed = optionalPositiveInteger(key, value); + if (parsed === undefined) throw new AgentRunError("schema-invalid", `${key} is required when runner retention is enabled`, { httpStatus: 500 }); + return parsed; +} + +function jsonRecordEnv(key: string, value: unknown): Record { + if (typeof value !== "string" || value.trim().length === 0) throw new AgentRunError("schema-invalid", `${key} is required when runner retention is enabled`, { httpStatus: 500 }); + const parsed = JSON.parse(value) as unknown; + if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) throw new AgentRunError("schema-invalid", `${key} must be a JSON object`, { httpStatus: 500 }); + const out: Record = {}; + for (const [entryKey, entryValue] of Object.entries(parsed)) { + if (typeof entryValue !== "string" || entryValue.length === 0) throw new AgentRunError("schema-invalid", `${key}.${entryKey} must be a non-empty string`, { httpStatus: 500 }); + out[entryKey] = entryValue; + } + return out; +} + +function stringListEnv(key: string, value: unknown): string[] { + if (typeof value !== "string") throw new AgentRunError("schema-invalid", `${key} is required when runner retention is enabled`, { httpStatus: 500 }); + return value.split(",").map((item) => item.trim()).filter((item) => item.length > 0); +} + +function booleanEnv(key: string, value: string): boolean { + if (value === "true") return true; + if (value === "false") return false; + throw new AgentRunError("schema-invalid", `${key} must be true or false`, { httpStatus: 500 }); } function normalizeError(error: unknown): AgentRunError { diff --git a/src/runner/k8s-job.ts b/src/runner/k8s-job.ts index cea28bc..d5d711f 100644 --- a/src/runner/k8s-job.ts +++ b/src/runner/k8s-job.ts @@ -50,6 +50,8 @@ export interface RunnerJobRenderOptions { runnerId?: string; sourceCommit?: string; serviceAccountName?: string; + jobNamePrefix?: string; + lane?: string; imagePullPolicy?: string; backoffLimit?: number; ttlSecondsAfterFinished?: number; @@ -152,9 +154,11 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani const runnerId = options.runnerId ?? `runner_${shortHash(`${options.run.id}:${attemptId}:${options.commandId}`)}`; const sourceCommit = options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown"; const serviceAccountName = options.serviceAccountName ?? "agentrun-v01-runner"; + const jobNamePrefix = normalizeJobNamePrefix(options.jobNamePrefix ?? serviceAccountName); + const lane = options.lane ?? process.env.AGENTRUN_LANE ?? "v0.1"; const ttlSecondsAfterFinished = options.ttlSecondsAfterFinished ?? 86_400; const runnerIdleTimeoutMs = normalizeRunnerIdleTimeoutMs(options.runnerIdleTimeoutMs); - const jobName = `agentrun-v01-runner-${shortDnsHash(options.run.id, attemptId)}`; + const jobName = `${jobNamePrefix}-${shortDnsHash(options.run.id, attemptId)}`; const secretRefs = credentialProjections(options.run, namespace); const toolCredentials = toolCredentialProjections(options.run, namespace); const sessionPvc = options.sessionPvc; @@ -167,7 +171,7 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani metadata: { name: jobName, namespace, - labels: labels(options.run, jobName), + labels: labels(options.run, jobName, lane), annotations: { "agentrun.pikastech.local/run-id": options.run.id, "agentrun.pikastech.local/command-id": options.commandId, @@ -179,7 +183,7 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani ttlSecondsAfterFinished, template: { metadata: { - labels: labels(options.run, jobName), + labels: labels(options.run, jobName, lane), annotations: { "agentrun.pikastech.local/run-id": options.run.id, "agentrun.pikastech.local/command-id": options.commandId, @@ -483,17 +487,24 @@ function defaultRuntimeHome(profile: string): string { return `/home/agentrun/.codex-${sanitizeVolumeName(profile)}`; } -function labels(run: RunRecord, jobName: string): JsonRecord { +function labels(run: RunRecord, jobName: string, lane: string): JsonRecord { return { "app.kubernetes.io/name": "agentrun-runner", "app.kubernetes.io/component": "runner", "app.kubernetes.io/part-of": "agentrun", - "agentrun.pikastech.local/lane": "v0.1", + "agentrun.pikastech.local/lane": lane, "agentrun.pikastech.local/run-hash": shortHash(run.id), "job-name": jobName, }; } +function normalizeJobNamePrefix(value: string): string { + const normalized = value.toLowerCase().replace(/[^a-z0-9-]+/gu, "-").replace(/^-+|-+$/gu, ""); + if (!normalized) throw new Error("jobNamePrefix must contain at least one DNS label character"); + if (normalized.length > 46) return normalized.slice(0, 46).replace(/-+$/u, "") || "agentrun-runner"; + return normalized; +} + function shortDnsHash(...parts: string[]): string { return shortHash(parts.join(":")); } diff --git a/src/selftest/cases/20-runner-k8s-job.ts b/src/selftest/cases/20-runner-k8s-job.ts index 778fc0c..9c9f86d 100644 --- a/src/selftest/cases/20-runner-k8s-job.ts +++ b/src/selftest/cases/20-runner-k8s-job.ts @@ -265,6 +265,118 @@ process.exit(1); } finally { await new Promise((resolve) => serverWithKubectl.server.close(() => resolve())); } + const retentionKubectl = path.join(context.tmp, "fake-kubectl-retention.js"); + const retentionCreatedManifest = path.join(context.tmp, "created-retention-runner-job.json"); + const retentionDeleted = path.join(context.tmp, "retention-deleted.json"); + await writeFile(retentionDeleted, "[]\n"); + await writeFile(retentionKubectl, `#!/usr/bin/env bun +const args = Bun.argv.slice(2); +const deletedPath = ${JSON.stringify(retentionDeleted)}; +async function readDeleted() { + try { return JSON.parse(await Bun.file(deletedPath).text()); } catch { return []; } +} +async function writeDeleted(items) { await Bun.write(deletedPath, JSON.stringify(items, null, 2) + "\\n"); } +async function readStdin() { + const chunks = []; + for await (const chunk of Bun.stdin.stream()) chunks.push(chunk); + return Buffer.concat(chunks.map((chunk) => Buffer.from(chunk))).toString("utf8"); +} +function runnerJob(name, createdAt, commandId) { + return { + apiVersion: "batch/v1", + kind: "Job", + metadata: { + name, + namespace: "agentrun-v02", + creationTimestamp: createdAt, + labels: { "app.kubernetes.io/part-of": "agentrun", "app.kubernetes.io/name": "agentrun-runner", "app.kubernetes.io/component": "runner", "agentrun.pikastech.local/lane": "v0.2", "job-name": name }, + annotations: { "agentrun.pikastech.local/run-id": "run-old", "agentrun.pikastech.local/command-id": commandId, "agentrun.pikastech.local/runner-id": "runner-old" }, + }, + status: {}, + }; +} +if (args[0] === "get" && args[1] === "jobs") { + const deleted = await readDeleted(); + const deletedJobs = new Set(deleted.filter((item) => item.resource === "job").map((item) => item.name)); + const items = [ + runnerJob("agentrun-v02-runner-old-a", "2026-01-01T00:00:00Z", "cmd-old-a"), + runnerJob("agentrun-v02-runner-old-b", "2026-01-01T00:01:00Z", "cmd-old-b"), + ].filter((item) => !deletedJobs.has(item.metadata.name)); + console.log(JSON.stringify({ apiVersion: "v1", kind: "List", items })); + process.exit(0); +} +if (args[0] === "get" && args[1] === "pods") { + console.log(JSON.stringify({ apiVersion: "v1", kind: "List", items: [] })); + process.exit(0); +} +if (args[0] === "delete") { + const deleted = await readDeleted(); + const resource = args[1]; + const name = args[2] && !args[2].startsWith("-") ? args[2] : args.join(" "); + deleted.push({ resource, name, args }); + await writeDeleted(deleted); + console.log(String(resource) + "/" + String(name).replace(/[^a-z0-9-]+/g, "-") + " deleted"); + process.exit(0); +} +if (args[0] === "create") { + const text = await readStdin(); + const manifest = JSON.parse(text); + if (manifest.kind === "Job") await Bun.write(${JSON.stringify(retentionCreatedManifest)}, text); + console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kind, metadata: { uid: "retention-job-uid", resourceVersion: "1", name: manifest.metadata.name, namespace: manifest.metadata.namespace } })); + process.exit(0); +} +console.error("unsupported fake retention kubectl args: " + args.join(" ")); +process.exit(1); +`); + await chmod(retentionKubectl, 0o755); + const serverWithRetention = await startManagerServer({ + port: 0, + host: "127.0.0.1", + sourceCommit: "self-test", + store: new MemoryAgentRunStore(), + runnerJobDefaults: { + namespace: "agentrun-v02", + managerUrl: "http://agentrun-mgr.agentrun-v02.svc.cluster.local:8080", + image: "127.0.0.1:5000/agentrun/agentrun-mgr@sha256:1111111111111111111111111111111111111111111111111111111111111111", + serviceAccountName: "agentrun-v02-runner", + jobNamePrefix: "agentrun-v02-runner", + lane: "v0.2", + kubectlCommand: retentionKubectl, + retention: { + namespace: "agentrun-v02", + maxRunners: 1, + cleanupOrder: "oldest-inactive-last-active-first", + activeHeartbeatMaxAgeMs: 900_000, + matchLabels: { "app.kubernetes.io/part-of": "agentrun", "app.kubernetes.io/name": "agentrun-runner", "app.kubernetes.io/component": "runner" }, + jobNamePrefixes: ["agentrun-v02-runner"], + ageBasedCleanup: { enabled: false, maxAgeHours: null }, + }, + }, + }); + try { + const retentionClient = new ManagerClient(serverWithRetention.baseUrl); + const retentionItem = await createRunWithCommand(retentionClient, context, "retention job create", "selftest-runner-retention", 15_000); + const retentionCreated = await retentionClient.post(`/api/v1/runs/${retentionItem.runId}/runner-jobs`, { commandId: retentionItem.commandId, attemptId: "attempt_selftest_retention" }) as JsonRecord; + const retention = (retentionCreated.retention as JsonRecord).preCreateCleanup as JsonRecord; + assert.equal(retention.liveRunnerCountBefore, 2); + assert.equal(retention.selectedRunnerCount, 2); + assert.equal(retention.deletedRunnerJobCount, 2); + assert.equal(retention.liveRunnerCountAfter, 0); + assert.equal(retention.maxRunners, 1); + assert.equal(retention.valuesPrinted, false); + assert.match(String(retentionCreated.jobName), /^agentrun-v02-runner-[a-f0-9]{12}$/u); + const retentionManifest = JSON.parse(await readFile(retentionCreatedManifest, "utf8")) as JsonRecord; + const metadata = retentionManifest.metadata as JsonRecord; + const labels = metadata.labels as JsonRecord; + assert.equal(labels["agentrun.pikastech.local/lane"], "v0.2"); + assert.match(String(metadata.name), /^agentrun-v02-runner-[a-f0-9]{12}$/u); + const deletedItems = JSON.parse(await readFile(retentionDeleted, "utf8")) as JsonRecord[]; + assert.equal(deletedItems.filter((item) => item.resource === "job").length, 2); + assert.ok(deletedItems.some((item) => String((item.args as string[]).join(" ")).includes("agentrun.pikastech.local/runner-job=agentrun-v02-runner-old-a")), "old runner associated resources should be selected by runner-job label"); + assertNoSecretLeak(retentionCreated); + } finally { + await new Promise((resolve) => serverWithRetention.server.close(() => resolve())); + } const sessionRunRecord: RunRecord = { id: "run-selftest-session-pvc", tenantId: "unidesk", @@ -307,7 +419,7 @@ process.exit(1); assert.equal(envMap.get("AGENTRUN_SESSION_PVC_NAMESPACE"), "agentrun-v01"); assert.equal(envMap.get("AGENTRUN_SESSION_PVC_MOUNT_PATH"), "/home/agentrun/.codex-codex/sessions"); assert.equal(envMap.get("AGENTRUN_CODEX_ROLLOUT_SUBDIR"), "sessions"); - return { name: "runner-k8s-job", tests: ["runner-k8s-job-dry-run", "runner-k8s-job-codex-shell-sandbox-env", "runner-k8s-job-g14-egress-proxy-env", "runner-k8s-job-bounded-git-transport-env", "runner-k8s-job-deepseek-profile-dry-run", "runner-k8s-job-minimax-m3-profile-dry-run", "runner-k8s-job-dsflash-go-profile-dry-run", "runner-k8s-job-dsflash-go-legacy-secretref-normalized", "runner-k8s-job-create-api", "runner-k8s-job-retention-ttl", "runner-job-transient-env", "runner-job-transient-env-secretref", "runner-job-tool-credential-env", "runner-job-unidesk-ssh-tool-credential-env", "runner-job-tool-credential-volume", "runner-job-unidesk-ssh-endpoint-auto-env", "runner-job-unidesk-ssh-transient-env-denied", "runner-k8s-job-session-pvc-volume-and-env"] }; + return { name: "runner-k8s-job", tests: ["runner-k8s-job-dry-run", "runner-k8s-job-codex-shell-sandbox-env", "runner-k8s-job-g14-egress-proxy-env", "runner-k8s-job-bounded-git-transport-env", "runner-k8s-job-deepseek-profile-dry-run", "runner-k8s-job-minimax-m3-profile-dry-run", "runner-k8s-job-dsflash-go-profile-dry-run", "runner-k8s-job-dsflash-go-legacy-secretref-normalized", "runner-k8s-job-create-api", "runner-k8s-job-retention-ttl", "runner-job-transient-env", "runner-job-transient-env-secretref", "runner-job-tool-credential-env", "runner-job-unidesk-ssh-tool-credential-env", "runner-job-tool-credential-volume", "runner-job-unidesk-ssh-endpoint-auto-env", "runner-job-unidesk-ssh-transient-env-denied", "runner-job-pre-create-retention", "runner-k8s-job-session-pvc-volume-and-env"] }; } finally { await new Promise((resolve) => server.server.close(() => resolve())); }