fix: enforce runner retention before job create

This commit is contained in:
lyon
2026-06-19 22:35:55 +08:00
parent 56ecb493ef
commit 1be0e8e02f
5 changed files with 587 additions and 15 deletions
+22 -6
View File
@@ -11,6 +11,8 @@ import { staticWorkReadyCapabilitySummary } from "../common/work-ready.js";
import { resolveRunnerEnvImage } from "../common/env-image-ref.js";
import { ensureSessionPvc } from "./session-pvc.js";
import { gitTransportSummary } from "../common/git-transport.js";
import { enforceRunnerRetentionBeforeCreate } from "./runner-retention.js";
import type { RunnerRetentionOptions, RunnerRetentionSummary } from "./runner-retention.js";
const reusableCredentialEnvNames = new Set([
"AUTH_PASSWORD",
@@ -44,9 +46,12 @@ export interface RunnerJobDefaults {
envIdentity?: string;
artifactCatalogFile?: string;
serviceAccountName?: string;
jobNamePrefix?: string;
lane?: string;
runnerIdleTimeoutMs?: number;
kubectlCommand?: string;
unideskSshEndpointEnv?: JsonRecord;
retention?: RunnerRetentionOptions;
}
export interface CreateRunnerJobInput extends JsonRecord {
@@ -83,12 +88,14 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
const managerUrl = optionalString(options.input.managerUrl) ?? options.defaults.managerUrl;
const sourceCommit = optionalString(options.input.sourceCommit) ?? options.defaults.sourceCommit;
const serviceAccountName = optionalString(options.input.serviceAccountName) ?? options.defaults.serviceAccountName;
const jobNamePrefix = options.defaults.jobNamePrefix;
const lane = options.defaults.lane;
const idempotencyKey = optionalString(options.input.idempotencyKey);
const transientEnv = assembleToolContextTransientEnv(run.executionPolicy, transientEnvField(options.input.transientEnv), options.defaults);
const attemptId = optionalString(options.input.attemptId) ?? `attempt_${Date.now().toString(36)}`;
const runnerId = optionalString(options.input.runnerId);
const runnerIdleTimeoutMs = optionalPositiveInteger(options.input.runnerIdleTimeoutMs, "runnerIdleTimeoutMs") ?? options.defaults.runnerIdleTimeoutMs;
const transientEnvSecretName = transientEnv.length > 0 ? transientEnvSecretNameForRun(run.id, commandId, attemptId) : null;
const transientEnvSecretName = transientEnv.length > 0 ? transientEnvSecretNameForRun(run.id, commandId, attemptId, jobNamePrefix) : null;
const renderTransientEnv = transientEnvSecretName ? transientEnvWithSecretRefs(transientEnv, transientEnvSecretName) : transientEnv;
const normalizedPayload = {
commandId,
@@ -110,6 +117,10 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
}
if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${run.id} is already terminal: ${run.status}`, { httpStatus: 409 });
if (isTerminalCommandState(command.state) || command.state !== "pending") throw new AgentRunError(command.state === "cancelled" ? "cancelled" : "schema-invalid", `command ${commandId} is not pending: ${command.state}`, { httpStatus: 409 });
let preCreateRetention: RunnerRetentionSummary | null = null;
if (options.defaults.retention) {
preCreateRetention = await enforceRunnerRetentionBeforeCreate({ store: options.store, options: { ...options.defaults.retention, namespace }, incomingRunnerCount: 1 });
}
let sessionPvc: RunnerSessionPvcOptions | undefined;
let sessionPvcSummary: JsonRecord | null = null;
if (run.sessionRef?.sessionId) {
@@ -156,6 +167,8 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
transientEnv: renderTransientEnv,
...(runnerIdleTimeoutMs !== undefined ? { runnerIdleTimeoutMs } : {}),
...(serviceAccountName ? { serviceAccountName } : {}),
...(jobNamePrefix ? { jobNamePrefix } : {}),
...(lane ? { lane } : {}),
...(sessionPvc ? { sessionPvc } : {}),
};
const render = renderRunnerJobManifest({ ...renderOptions, attemptId, ...(runnerId ? { runnerId } : {}) });
@@ -165,7 +178,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
let created: JsonRecord | null = null;
try {
if (transientEnvSecretName) {
await kubectlCreate(transientEnvSecretManifest({ namespace: render.namespace, name: transientEnvSecretName, runId: run.id, commandId, attemptId: render.attemptId, runnerId: render.runnerId, jobName: render.jobName, items: transientEnv }), kubectlCommand, "runner transient env secret");
await kubectlCreate(transientEnvSecretManifest({ namespace: render.namespace, name: transientEnvSecretName, runId: run.id, commandId, attemptId: render.attemptId, runnerId: render.runnerId, jobName: render.jobName, lane: lane ?? "v0.1", items: transientEnv }), kubectlCommand, "runner transient env secret");
transientEnvSecretCreated = true;
}
created = await kubectlCreate(render.manifest, kubectlCommand, "runner job");
@@ -220,6 +233,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
retention: {
ttlSecondsAfterFinished: render.ttlSecondsAfterFinished,
runnerIdleTimeoutMs: render.runnerIdleTimeoutMs,
preCreateCleanup: preCreateRetention,
},
pollActions: [
runnerJobActionDescriptor({ action: "inspect-run", operation: "describe", resourceKind: "run", resourceName: run.id, runId: run.id }),
@@ -263,6 +277,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
envImage,
gitTransport: gitTransportSummary(),
workReady: staticWorkReadyCapabilitySummary(),
retention: preCreateRetention,
toolCredentials: summarizeToolCredentials(render.toolCredentials, render.namespace),
sessionPvc: sessionPvcSummary,
sessionRef: summarizeSessionRef(run.sessionRef ?? null),
@@ -357,11 +372,12 @@ function transientEnvWithSecretRefs(items: RunnerTransientEnv[], secretName: str
return items.map((item) => ({ ...item, secretRef: { name: secretName, key: item.name } }));
}
function transientEnvSecretNameForRun(runId: string, commandId: string, attemptId: string): string {
return `agentrun-v01-runner-env-${stableHash({ runId, commandId, attemptId }).slice(0, 20)}`;
function transientEnvSecretNameForRun(runId: string, commandId: string, attemptId: string, jobNamePrefix: string | undefined): string {
const prefix = (jobNamePrefix ?? "agentrun-v01-runner").toLowerCase().replace(/[^a-z0-9-]+/gu, "-").replace(/^-+|-+$/gu, "") || "agentrun-runner";
return `${prefix.slice(0, 36).replace(/-+$/u, "")}-env-${stableHash({ runId, commandId, attemptId }).slice(0, 20)}`;
}
function transientEnvSecretManifest(options: { namespace: string; name: string; runId: string; commandId: string; attemptId: string; runnerId: string; jobName: string; items: RunnerTransientEnv[] }): JsonRecord {
function transientEnvSecretManifest(options: { namespace: string; name: string; runId: string; commandId: string; attemptId: string; runnerId: string; jobName: string; lane: string; items: RunnerTransientEnv[] }): JsonRecord {
const stringData: JsonRecord = {};
for (const item of options.items) stringData[item.name] = item.value;
return {
@@ -374,7 +390,7 @@ function transientEnvSecretManifest(options: { namespace: string; name: string;
"app.kubernetes.io/name": "agentrun-runner",
"app.kubernetes.io/component": "runner-transient-env",
"app.kubernetes.io/part-of": "agentrun",
"agentrun.pikastech.local/lane": "v0.1",
"agentrun.pikastech.local/lane": options.lane,
"agentrun.pikastech.local/run-hash": stableHash(options.runId).slice(0, 12),
"agentrun.pikastech.local/runner-job": options.jobName,
},
+364
View File
@@ -0,0 +1,364 @@
import { spawn } from "node:child_process";
import { AgentRunError } from "../common/errors.js";
import { redactJson, redactText } from "../common/redaction.js";
import type { CommandRecord, JsonRecord, JsonValue, RunRecord } from "../common/types.js";
import { stableHash } from "../common/validation.js";
import { isTerminalCommandState, isTerminalRunStatus } from "./store.js";
import type { AgentRunStore } from "./store.js";
/** Configuration consumed by the pre-create runner retention pass in this module. */
export interface RunnerRetentionOptions {
// Namespace passed as `-n` to every kubectl get/delete call issued by this module.
namespace: string;
// Upper bound on live runners; the pass targets `maxRunners - incomingRunnerCount` before a create.
maxRunners: number;
// Only one ordering is representable. NOTE(review): this field is not read by the code visible in this module.
cleanupOrder: "oldest-inactive-last-active-first";
// Grace period in milliseconds added to a run's `leaseExpiresAt` when deciding whether its heartbeat is still fresh.
activeHeartbeatMaxAgeMs: number;
// Joined into a `key=value,...` label selector for the kubectl job and pod listings.
matchLabels: Record<string, string>;
// A job name matches when it equals a prefix or starts with `<prefix>-`; an empty list matches every name.
jobNamePrefixes: string[];
// NOTE(review): not read by the code visible in this module — confirm where age-based cleanup is applied.
ageBasedCleanup: {
enabled: boolean;
maxAgeHours: number | null;
};
// kubectl executable to spawn; callers in this module fall back to "kubectl" when omitted.
kubectlCommand?: string;
}
/** Result of one retention pass, as returned by `enforceRunnerRetentionBeforeCreate`. */
export interface RunnerRetentionSummary extends JsonRecord {
enabled: boolean;
namespace: string;
maxRunners: number;
// Number of runners the caller is about to create.
incomingRunnerCount: number;
// Live count from the snapshot taken before any deletion.
liveRunnerCountBefore: number;
// Live count from a second snapshot taken after deletions (equals the "before" value when nothing was selected).
liveRunnerCountAfter: number;
runnerJobCountBefore: number;
nonTerminalRunnerPodCountBefore: number;
orphanNonTerminalRunnerPodCountBefore: number;
// Number of unprotected jobs/pods that were eligible for deletion.
inactiveCandidateCount: number;
// Number of jobs/pods shielded from deletion because their run looked active.
protectedActiveRunnerCount: number;
// Number of candidates actually chosen for deletion.
selectedRunnerCount: number;
deletedRunnerJobCount: number;
deletedRunnerPodCount: number;
// Secrets and PVCs removed alongside deleted jobs.
deletedAssociatedResourceCount: number;
// True when at least one protected (active) runner existed during the pass.
activeRunRisk: boolean;
// "within-limit" when nothing had to be deleted, otherwise "pre-create-retention".
reason: string;
valuesPrinted: false;
}
// Classification of a runner resource for cleanup ordering (see `activityFor` and `compareCandidates`):
// "idle" = its command is in a terminal state while the Kubernetes resource is not terminal,
// "terminal" = the resource itself or its run is terminal, "inactive" = everything else.
// Candidates are deleted in the order idle, terminal, inactive.
type CandidateKind = "idle" | "terminal" | "inactive";
/** The subset of a Kubernetes object (Job or Pod) that this module reads from `kubectl get -o json` items. */
interface K8sObject {
kind?: string;
metadata?: {
name?: string;
namespace?: string;
// Parsed with Date.parse to obtain the creation time in milliseconds.
creationTimestamp?: string;
// The "job-name" label is the first source for a pod's owning job name.
labels?: Record<string, string>;
// The agentrun.pikastech.local/run-id, command-id and runner-id annotations are read from here.
annotations?: Record<string, string>;
// Fallback source for a pod's owning job name (first owner with kind "Job").
ownerReferences?: Array<{ kind?: string; name?: string }>;
};
// `status.conditions` (jobs) and `status.phase` (pods) are inspected to detect terminal resources.
status?: JsonRecord;
}
/** One runner job or orphan runner pod, enriched with activity information from the store. */
interface RunnerResourceEntry {
resourceKind: "job" | "pod";
// Kubernetes object name ("unknown" when the object carried no usable name).
name: string;
// For jobs this equals `name`; for pods it is the owning job name, or the pod name when none is found.
jobName: string;
// Identifiers read from the object's annotations; null when the annotation is absent.
runId: string | null;
commandId: string | null;
runnerId: string | null;
// Creation time in epoch milliseconds; 0 when the timestamp is missing or unparsable.
createdAtMs: number;
// Whether the Kubernetes resource itself has finished (job condition or pod phase).
terminal: boolean;
// Pod `status.phase`; always null for job entries.
podPhase: string | null;
// True when the resource must not be deleted because its run looks active.
protectedActive: boolean;
protectedReason: string | null;
candidateKind: CandidateKind;
// Latest of creation time, run/command `updatedAt` and run `leaseExpiresAt`, in epoch milliseconds.
lastActiveAtMs: number;
}
/** Point-in-time view of runner resources in the namespace, produced by `runnerSnapshot`. */
interface Snapshot {
// Jobs that matched the label selector and a configured name prefix.
jobs: RunnerResourceEntry[];
// Pods whose job name matched a prefix but whose job is not in `jobs` (orphans), terminal or not.
pods: RunnerResourceEntry[];
// `jobs.length` plus the number of non-terminal entries in `pods`.
liveRunnerCount: number;
runnerJobCount: number;
// All prefix-matching non-terminal pods, including pods that belong to a listed job.
nonTerminalRunnerPodCount: number;
// Prefix-matching non-terminal pods whose job is not listed.
orphanNonTerminalRunnerPodCount: number;
}
/**
 * Frees capacity for incoming runners before a new runner job is created.
 *
 * Takes a snapshot of runner jobs and orphan runner pods, computes how many live runners must be removed so
 * that `maxRunners - incomingRunnerCount` is not exceeded, and deletes that many unprotected resources in
 * `compareCandidates` order. Deleting a job also deletes the secrets and PVCs labelled with that job name.
 *
 * @param input.store Store used to look up run/command activity for each resource.
 * @param input.options Retention configuration (namespace, limit, selectors, kubectl executable).
 * @param input.incomingRunnerCount Number of runners about to be created; defaults to 1.
 * @returns A summary of what was counted, selected and deleted.
 * @throws AgentRunError `infra-failed` with HTTP 409 when there are fewer unprotected candidates than
 *   required (nothing is deleted in that case), and `infra-failed` with HTTP 502 when a kubectl call
 *   exits with a non-zero code.
 */
export async function enforceRunnerRetentionBeforeCreate(input: { store: AgentRunStore; options: RunnerRetentionOptions; incomingRunnerCount?: number }): Promise<RunnerRetentionSummary> {
  const incomingRunnerCount = input.incomingRunnerCount ?? 1;
  const before = await runnerSnapshot(input.store, input.options);
  const targetBeforeCreate = Math.max(0, input.options.maxRunners - incomingRunnerCount);
  const requiredDeleteCount = Math.max(0, before.liveRunnerCount - targetBeforeCreate);
  // NOTE(review): terminal orphan pods are candidates here although they are not part of liveRunnerCount,
  // so selecting one does not actually lower the live count — confirm whether that is intended.
  const candidates = [...before.jobs, ...before.pods]
    .filter((item) => !item.protectedActive)
    .sort(compareCandidates);
  const selected = candidates.slice(0, requiredDeleteCount);
  const protectedActiveRunnerCount = before.jobs.filter((item) => item.protectedActive).length + before.pods.filter((item) => item.protectedActive).length;
  if (requiredDeleteCount > 0 && selected.length < requiredDeleteCount) {
    // Refuse before deleting anything: partial cleanup would not make room anyway.
    throw new AgentRunError("infra-failed", "runner retention cannot safely make room for a new runner", {
      httpStatus: 409,
      details: {
        reason: "runner-retention-no-safe-candidate",
        namespace: input.options.namespace,
        maxRunners: input.options.maxRunners,
        incomingRunnerCount,
        liveRunnerCountBefore: before.liveRunnerCount,
        requiredDeleteCount,
        inactiveCandidateCount: candidates.length,
        protectedActiveRunnerCount,
        activeRunRisk: protectedActiveRunnerCount > 0,
        valuesPrinted: false,
      },
    });
  }
  let deletedRunnerJobCount = 0;
  let deletedRunnerPodCount = 0;
  let deletedAssociatedResourceCount = 0;
  for (const item of selected) {
    // A failed delete now aborts the pass instead of being counted as a successful deletion.
    await deleteRunnerResource(input.options, item.resourceKind, item.name);
    if (item.resourceKind === "job") {
      deletedRunnerJobCount++;
      deletedAssociatedResourceCount += await deleteAssociatedResources(input.options, item.jobName);
    } else {
      deletedRunnerPodCount++;
    }
  }
  // Re-list only when something was deleted; otherwise the first snapshot is still accurate.
  const after = selected.length > 0 ? await runnerSnapshot(input.store, input.options) : before;
  return {
    enabled: true,
    namespace: input.options.namespace,
    maxRunners: input.options.maxRunners,
    incomingRunnerCount,
    liveRunnerCountBefore: before.liveRunnerCount,
    liveRunnerCountAfter: after.liveRunnerCount,
    runnerJobCountBefore: before.runnerJobCount,
    nonTerminalRunnerPodCountBefore: before.nonTerminalRunnerPodCount,
    orphanNonTerminalRunnerPodCountBefore: before.orphanNonTerminalRunnerPodCount,
    inactiveCandidateCount: candidates.length,
    protectedActiveRunnerCount,
    selectedRunnerCount: selected.length,
    deletedRunnerJobCount,
    deletedRunnerPodCount,
    deletedAssociatedResourceCount,
    activeRunRisk: protectedActiveRunnerCount > 0,
    reason: requiredDeleteCount === 0 ? "within-limit" : "pre-create-retention",
    selected: selected.map((item) => ({ resourceKind: item.resourceKind, name: item.name, jobName: item.jobName, candidateKind: item.candidateKind, protectedActive: false, lastActiveAt: new Date(item.lastActiveAtMs).toISOString(), valuesPrinted: false })),
    valuesPrinted: false,
  } satisfies RunnerRetentionSummary;
}

/** Deletes one runner job or pod and throws when kubectl exits non-zero (a missing object is not an error). */
async function deleteRunnerResource(options: RunnerRetentionOptions, resourceKind: "job" | "pod", name: string): Promise<void> {
  const result = await kubectlRun(options.kubectlCommand ?? "kubectl", ["delete", resourceKind, name, "-n", options.namespace, "--ignore-not-found=true"]);
  if (result.code !== 0) {
    throw new AgentRunError("infra-failed", `kubectl delete ${resourceKind} failed with code ${result.code}`, {
      httpStatus: 502,
      details: redactJson({ stderr: redactText(result.stderr.slice(-2000)), stdout: redactText(result.stdout.slice(-1000)), valuesPrinted: false }),
    });
  }
}
/**
 * Lists runner jobs and pods in the configured namespace and classifies them.
 *
 * Jobs and pods are fetched in parallel with the same label selector. Jobs are kept when their name
 * matches a configured prefix. Pods are matched on their owning job name (falling back to the pod name);
 * only pods whose job is not among the listed jobs end up in `pods`.
 */
async function runnerSnapshot(store: AgentRunStore, options: RunnerRetentionOptions): Promise<Snapshot> {
const [jobObjects, podObjects] = await Promise.all([
kubectlGetList(options, "jobs", labelSelector(options.matchLabels)),
kubectlGetList(options, "pods", labelSelector(options.matchLabels)),
]);
const jobs: RunnerResourceEntry[] = [];
for (const item of jobObjects) {
const name = objectName(item);
// Skip unnamed objects and jobs outside the configured name prefixes.
if (!name || !matchesJobPrefix(name, options.jobNamePrefixes)) continue;
jobs.push(await jobEntry(store, item, options));
}
const jobNames = new Set(jobs.map((item) => item.jobName));
const pods: RunnerResourceEntry[] = [];
let nonTerminalRunnerPodCount = 0;
let orphanNonTerminalRunnerPodCount = 0;
for (const item of podObjects) {
const name = objectName(item);
if (!name) continue;
// A pod without a job label or Job owner is treated as its own "job" for prefix matching.
const jobName = podJobName(item) ?? name;
if (!matchesJobPrefix(jobName, options.jobNamePrefixes)) continue;
const terminal = podTerminal(item);
// Counted before the orphan check, so this total includes pods that belong to listed jobs.
if (!terminal) nonTerminalRunnerPodCount++;
// Pods of a listed job are represented by that job's entry and are not tracked separately.
if (jobNames.has(jobName)) continue;
if (!terminal) orphanNonTerminalRunnerPodCount++;
pods.push(await podEntry(store, item, options, jobName));
}
return {
jobs,
pods,
// Every listed job counts as live (terminal or not); orphan pods count only while non-terminal.
liveRunnerCount: jobs.length + pods.filter((item) => !item.terminal).length,
runnerJobCount: jobs.length,
nonTerminalRunnerPodCount,
orphanNonTerminalRunnerPodCount,
};
}
/**
 * Builds the retention entry for a runner Job: reads the run/command/runner annotations, derives the
 * job's terminal state and creation time, and merges in the activity classification from the store.
 */
async function jobEntry(store: AgentRunStore, item: K8sObject, options: RunnerRetentionOptions): Promise<RunnerResourceEntry> {
  const jobName = objectName(item) ?? "unknown";
  const terminal = jobTerminal(item);
  const createdAtMs = objectCreatedAtMs(item);
  const annotations = item.metadata?.annotations ?? {};
  const activity = await activityFor(store, {
    runId: annotations["agentrun.pikastech.local/run-id"] ?? null,
    commandId: annotations["agentrun.pikastech.local/command-id"] ?? null,
    runnerId: annotations["agentrun.pikastech.local/runner-id"] ?? null,
    terminal,
    createdAtMs,
    activeHeartbeatMaxAgeMs: options.activeHeartbeatMaxAgeMs,
  });
  // For a job, the resource name and the job name are the same thing.
  return {
    resourceKind: "job",
    name: jobName,
    jobName,
    createdAtMs,
    terminal,
    podPhase: null,
    ...activity,
  };
}
/**
 * Builds the retention entry for a runner Pod that is tracked on its own (its job is not listed).
 * `jobName` is supplied by the caller because it may have been derived from a label, an owner
 * reference, or the pod name itself.
 */
async function podEntry(store: AgentRunStore, item: K8sObject, options: RunnerRetentionOptions, jobName: string): Promise<RunnerResourceEntry> {
  const podName = objectName(item) ?? "unknown";
  const terminal = podTerminal(item);
  const createdAtMs = objectCreatedAtMs(item);
  const annotations = item.metadata?.annotations ?? {};
  const activity = await activityFor(store, {
    runId: annotations["agentrun.pikastech.local/run-id"] ?? null,
    commandId: annotations["agentrun.pikastech.local/command-id"] ?? null,
    runnerId: annotations["agentrun.pikastech.local/runner-id"] ?? null,
    terminal,
    createdAtMs,
    activeHeartbeatMaxAgeMs: options.activeHeartbeatMaxAgeMs,
  });
  const podPhase = stringPath(item, ["status", "phase"]);
  return {
    resourceKind: "pod",
    name: podName,
    jobName,
    createdAtMs,
    terminal,
    podPhase,
    ...activity,
  };
}
/**
 * Looks up the run and command behind a runner resource and decides whether the resource is protected
 * (its run looks active) or, if not, which cleanup class it belongs to.
 *
 * A resource is protected only when all of the following hold: the run exists and is not terminal, the
 * command exists and is not terminal, a runner id is present and equals `run.claimedBy`, and the run's
 * lease is still fresh according to `heartbeatFresh`.
 */
async function activityFor(store: AgentRunStore, input: { runId: string | null; commandId: string | null; runnerId: string | null; terminal: boolean; createdAtMs: number; activeHeartbeatMaxAgeMs: number }): Promise<Pick<RunnerResourceEntry, "runId" | "commandId" | "runnerId" | "protectedActive" | "protectedReason" | "candidateKind" | "lastActiveAtMs">> {
let run: RunRecord | null = null;
let command: CommandRecord | null = null;
// NOTE(review): every store error is treated as "record not found". A transient store failure therefore
// makes the resource look unprotected and eligible for deletion — confirm this is acceptable.
if (input.runId) {
try { run = await store.getRun(input.runId); } catch { run = null; }
}
if (input.commandId) {
try { command = await store.getCommand(input.commandId); } catch { command = null; }
}
// Missing or unparsable timestamps contribute 0, so the creation time acts as the floor.
const lastActiveAtMs = Math.max(input.createdAtMs, isoMs(run?.updatedAt), isoMs(command?.updatedAt), isoMs(run?.leaseExpiresAt));
const active = run && !isTerminalRunStatus(run.status) && command && !isTerminalCommandState(command.state) && input.runnerId && run.claimedBy === input.runnerId && heartbeatFresh(run.leaseExpiresAt, input.activeHeartbeatMaxAgeMs);
if (active) {
// candidateKind is set to "inactive" here, but protected entries are filtered out before candidates are sorted.
return { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, protectedActive: true, protectedReason: "fresh-active-run", candidateKind: "inactive", lastActiveAtMs };
}
// "idle": command finished but the Kubernetes resource is still running; "terminal": resource or run finished; else "inactive".
const candidateKind: CandidateKind = command && isTerminalCommandState(command.state) && !input.terminal ? "idle" : input.terminal || (run !== null && isTerminalRunStatus(run.status)) ? "terminal" : "inactive";
return { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, protectedActive: false, protectedReason: null, candidateKind, lastActiveAtMs };
}
/**
 * Runs `kubectl get <resource> -n <namespace> [-l <selector>] -o json` and returns the plain-object
 * entries of the response's `items` array.
 *
 * @throws AgentRunError `infra-failed` (502) when kubectl exits non-zero or prints output that is not a JSON object.
 */
async function kubectlGetList(options: RunnerRetentionOptions, resource: string, selector: string): Promise<K8sObject[]> {
  // An empty selector means "no -l flag" rather than an empty label query.
  const selectorArgs = selector ? ["-l", selector] : [];
  const kubectlArgs = ["get", resource, "-n", options.namespace, ...selectorArgs, "-o", "json"];
  const outcome = await kubectlRun(options.kubectlCommand ?? "kubectl", kubectlArgs);
  if (outcome.code !== 0) {
    throw new AgentRunError("infra-failed", `kubectl get ${resource} failed with code ${outcome.code}`, { httpStatus: 502, details: redactJson({ stderr: redactText(outcome.stderr.slice(-2000)), stdout: redactText(outcome.stdout.slice(-1000)), valuesPrinted: false }) });
  }
  const { items } = parseJsonObject(outcome.stdout, `kubectl get ${resource}`);
  if (!Array.isArray(items)) return [];
  const objects: K8sObject[] = [];
  for (const entry of items) {
    // Keep only plain objects; primitives, nulls and nested arrays are dropped.
    if (typeof entry === "object" && entry !== null && !Array.isArray(entry)) objects.push(entry as unknown as K8sObject);
  }
  return objects;
}
/**
 * Deletes the secrets and then the PVCs labelled `agentrun.pikastech.local/runner-job=<jobName>`.
 *
 * @returns The number of "... deleted" lines kubectl printed across both deletions.
 * @throws AgentRunError `infra-failed` (502) when either kubectl call exits non-zero.
 */
async function deleteAssociatedResources(options: RunnerRetentionOptions, jobName: string): Promise<number> {
  const kubectl = options.kubectlCommand ?? "kubectl";
  const selector = `agentrun.pikastech.local/runner-job=${jobName}`;
  const resources = ["secret", "pvc"] as const;
  let total = 0;
  for (const resource of resources) {
    const { code, stdout, stderr } = await kubectlRun(kubectl, ["delete", resource, "-n", options.namespace, "-l", selector, "--ignore-not-found=true"]);
    if (code !== 0) {
      throw new AgentRunError("infra-failed", `kubectl delete associated ${resource} failed with code ${code}`, { httpStatus: 502, details: redactJson({ stderr: redactText(stderr.slice(-2000)), stdout: redactText(stdout.slice(-1000)), valuesPrinted: false }) });
    }
    total += countDeletedLines(stdout);
  }
  return total;
}
/**
 * Spawns kubectl with the given arguments, collects stdout/stderr as UTF-8 text and resolves once the
 * process has closed. A non-zero exit code is returned, not thrown; callers decide how to handle it.
 *
 * @returns Exit code, terminating signal, and redacted stdout/stderr.
 * @throws AgentRunError `infra-failed` (503) when the child process emits an "error" event.
 */
async function kubectlRun(kubectlCommand: string, args: string[]): Promise<{ code: number | null; signal: NodeJS.Signals | null; stdout: string; stderr: string }> {
  const child = spawn(kubectlCommand, args, { stdio: ["ignore", "pipe", "pipe"] });
  const captured = { stdout: "", stderr: "" };
  child.stdout.setEncoding("utf8");
  child.stderr.setEncoding("utf8");
  child.stdout.on("data", (chunk) => { captured.stdout += String(chunk); });
  child.stderr.on("data", (chunk) => { captured.stderr += String(chunk); });
  let exit: { code: number | null; signal: NodeJS.Signals | null };
  try {
    exit = await new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve, reject) => {
      child.on("error", reject);
      child.on("close", (code, signal) => resolve({ code, signal }));
    });
  } catch (error: unknown) {
    throw new AgentRunError("infra-failed", `failed to start kubectl: ${error instanceof Error ? error.message : String(error)}`, { httpStatus: 503 });
  }
  // Output is redacted once here so no caller sees raw kubectl text.
  return { ...exit, stdout: redactText(captured.stdout), stderr: redactText(captured.stderr) };
}
/**
 * Sort comparator for deletion candidates: idle before terminal before inactive, then least recently
 * active first, then by name so the order is deterministic.
 */
function compareCandidates(left: RunnerResourceEntry, right: RunnerResourceEntry): number {
  const kindRank: Record<CandidateKind, number> = { idle: 0, terminal: 1, inactive: 2 };
  const kindDelta = kindRank[left.candidateKind] - kindRank[right.candidateKind];
  if (kindDelta !== 0) return kindDelta;
  const activityDelta = left.lastActiveAtMs - right.lastActiveAtMs;
  return activityDelta !== 0 ? activityDelta : left.name.localeCompare(right.name);
}
/** Renders a label map as a comma-separated `key=value` selector; an empty map yields an empty string. */
function labelSelector(labels: Record<string, string>): string {
  const pairs: string[] = [];
  for (const [key, value] of Object.entries(labels)) {
    pairs.push(`${key}=${value}`);
  }
  return pairs.join(",");
}
/**
 * Tells whether `name` belongs to one of the configured prefixes: it must equal a prefix or start
 * with `<prefix>-`. An empty prefix list accepts every name.
 */
function matchesJobPrefix(name: string, prefixes: string[]): boolean {
  if (prefixes.length === 0) return true;
  for (const prefix of prefixes) {
    if (name === prefix || name.startsWith(`${prefix}-`)) return true;
  }
  return false;
}
/**
 * Resolves the job a pod belongs to: the non-empty `job-name` label wins, otherwise the name of the
 * first owner reference of kind "Job". Returns null when neither is available.
 */
function podJobName(item: K8sObject): string | null {
  const fromLabel = (item.metadata?.labels ?? {})["job-name"];
  if (fromLabel) return fromLabel;
  for (const owner of item.metadata?.ownerReferences ?? []) {
    // Only the first Job owner is considered, even if it has no name.
    if (owner.kind === "Job") return owner.name ?? null;
  }
  return null;
}
/** A job is terminal when `status.conditions` contains a "Complete" or "Failed" condition with status "True". */
function jobTerminal(item: K8sObject): boolean {
  const conditions = item.status?.conditions;
  if (!Array.isArray(conditions)) return false;
  for (const condition of conditions) {
    const record = condition as JsonRecord;
    const finished = record.type === "Complete" || record.type === "Failed";
    if (finished && record.status === "True") return true;
  }
  return false;
}
/** A pod is terminal when its `status.phase` is "Succeeded" or "Failed". */
function podTerminal(item: K8sObject): boolean {
  switch (stringPath(item, ["status", "phase"])) {
    case "Succeeded":
    case "Failed":
      return true;
    default:
      return false;
  }
}
/**
 * Reports whether a lease still counts as fresh: the lease expiry plus the allowed grace period must
 * not lie in the past. A missing or unparsable expiry is never fresh.
 */
function heartbeatFresh(leaseExpiresAt: string | null, activeHeartbeatMaxAgeMs: number): boolean {
  const leaseMs = isoMs(leaseExpiresAt);
  return leaseMs > 0 && leaseMs + activeHeartbeatMaxAgeMs >= Date.now();
}
/** Returns the object's `metadata.name` when it is a non-empty string, otherwise null. */
function objectName(item: K8sObject): string | null {
  const name = item.metadata?.name;
  if (typeof name !== "string" || name.length === 0) return null;
  return name;
}
/** Creation time of the object in epoch milliseconds, or 0 when the timestamp is missing or unparsable. */
function objectCreatedAtMs(item: K8sObject): number {
  const createdAtMs = isoMs(item.metadata?.creationTimestamp);
  return createdAtMs ? createdAtMs : 0;
}
/** Parses a timestamp string into epoch milliseconds; empty, missing or unparsable input yields 0. */
function isoMs(value: string | null | undefined): number {
  if (value === null || value === undefined || value === "") return 0;
  const ms = Date.parse(value);
  if (!Number.isFinite(ms)) return 0;
  return ms;
}
/**
 * Walks `path` through nested plain objects and returns the value found there when it is a string.
 * Returns null as soon as a step hits a non-object (including arrays and null) or when the final
 * value is not a string.
 */
function stringPath(value: unknown, path: string[]): string | null {
  let node: unknown = value;
  for (const segment of path) {
    const traversable = typeof node === "object" && node !== null && !Array.isArray(node);
    if (!traversable) return null;
    node = (node as Record<string, unknown>)[segment];
  }
  if (typeof node !== "string") return null;
  return node;
}
/**
 * Parses `text` as JSON and returns it only when the top-level value is a plain object.
 *
 * @throws AgentRunError `infra-failed` (502) for invalid JSON and for valid JSON that is not an object.
 *   Only a hash of the text is attached to the error, never the text itself.
 */
function parseJsonObject(text: string, label: string): JsonRecord {
  let parsed: JsonValue;
  try {
    parsed = JSON.parse(text) as JsonValue;
  } catch (error) {
    throw new AgentRunError("infra-failed", `${label} returned invalid JSON: ${error instanceof Error ? error.message : String(error)}`, { httpStatus: 502, details: { stdoutHash: stableHash(text), valuesPrinted: false } });
  }
  if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) return parsed;
  throw new AgentRunError("infra-failed", `${label} returned non-object JSON`, { httpStatus: 502, details: { stdoutHash: stableHash(text), valuesPrinted: false } });
}
/** Counts the output lines that, once trimmed, end in whitespace followed by "deleted". */
function countDeletedLines(stdout: string): number {
  let count = 0;
  for (const line of stdout.split(/\r?\n/u)) {
    if (/\sdeleted$/u.test(line.trim())) count += 1;
  }
  return count;
}
+72 -3
View File
@@ -8,6 +8,7 @@ import { asRecord, validateBackendProfile, validateCreateCommand, validateCreate
import { isBackendProfile } from "../common/backend-profiles.js";
import type { ApiErrorBody, ApiOkBody, CommandRecord, JsonRecord, JsonValue, QueueTaskRecord, RunEvent, RunRecord, SessionRecord } from "../common/types.js";
import { createKubernetesRunnerJob, type RunnerJobDefaults } from "./kubernetes-runner-job.js";
import type { RunnerRetentionOptions } from "./runner-retention.js";
import { dispatchQueueTask, refreshQueueTaskFromCore } from "./queue-dispatch.js";
import { buildRunResult } from "./result.js";
import { runnerJobStatusSummary } from "./runner-job-status.js";
@@ -37,6 +38,10 @@ function sessionPvcOptionsForRequest(serverDefaults: { kubectlHandler?: import("
function runnerJobDefaultsForRequest(defaults: ManagerServerOptions["runnerJobDefaults"], sourceCommit: string): RunnerJobDefaults {
const namespace = defaults?.namespace ?? process.env.AGENTRUN_RUNTIME_NAMESPACE ?? "agentrun-v01";
const serviceAccountName = defaults?.serviceAccountName ?? process.env.AGENTRUN_RUNNER_SERVICE_ACCOUNT ?? "agentrun-v01-runner";
const jobNamePrefix = defaults?.jobNamePrefix ?? process.env.AGENTRUN_RUNNER_JOB_NAME_PREFIX ?? serviceAccountName;
const lane = defaults?.lane ?? process.env.AGENTRUN_LANE ?? "v0.1";
const retention = runnerRetentionOptionsForRequest(defaults?.retention, namespace, jobNamePrefix, defaults?.kubectlCommand);
return {
namespace,
managerUrl: defaults?.managerUrl ?? process.env.AGENTRUN_INTERNAL_MGR_URL ?? `http://agentrun-mgr.${namespace}.svc.cluster.local:8080`,
@@ -45,10 +50,36 @@ function runnerJobDefaultsForRequest(defaults: ManagerServerOptions["runnerJobDe
sourceCommit,
...optionalStringRecord("envIdentity", defaults?.envIdentity ?? process.env.AGENTRUN_ENV_IDENTITY),
...optionalStringRecord("artifactCatalogFile", defaults?.artifactCatalogFile ?? process.env.AGENTRUN_ARTIFACT_CATALOG_FILE),
serviceAccountName: defaults?.serviceAccountName ?? process.env.AGENTRUN_RUNNER_SERVICE_ACCOUNT ?? "agentrun-v01-runner",
serviceAccountName,
jobNamePrefix,
lane,
...(defaults?.runnerIdleTimeoutMs !== undefined ? { runnerIdleTimeoutMs: defaults.runnerIdleTimeoutMs } : optionalPositiveIntegerRecord("runnerIdleTimeoutMs", process.env.AGENTRUN_RUNNER_IDLE_TIMEOUT_MS)),
...(defaults?.kubectlCommand ? { kubectlCommand: defaults.kubectlCommand } : {}),
...(defaults?.unideskSshEndpointEnv ? { unideskSshEndpointEnv: defaults.unideskSshEndpointEnv } : {}),
...(retention ? { retention } : {}),
};
}
/**
 * Resolves runner retention options for a request.
 *
 * Explicit `defaults` win and are only overlaid with the request's namespace and kubectl command.
 * Otherwise the options are read from `AGENTRUN_RUNNER_RETENTION_*` environment variables; retention
 * stays disabled (undefined) when `AGENTRUN_RUNNER_RETENTION_MAX_RUNNERS` is not set.
 *
 * @throws AgentRunError `schema-invalid` when an environment variable is missing or malformed.
 */
function runnerRetentionOptionsForRequest(defaults: RunnerRetentionOptions | undefined, namespace: string, jobNamePrefix: string, kubectlCommand?: string): RunnerRetentionOptions | undefined {
  const kubectlOverride = kubectlCommand ? { kubectlCommand } : {};
  if (defaults) return { ...defaults, namespace, ...kubectlOverride };
  const env = process.env;
  const maxRunners = optionalPositiveInteger("AGENTRUN_RUNNER_RETENTION_MAX_RUNNERS", env.AGENTRUN_RUNNER_RETENTION_MAX_RUNNERS);
  if (maxRunners === undefined) return undefined;
  const activeHeartbeatMaxAgeMs = requiredPositiveInteger("AGENTRUN_RUNNER_RETENTION_ACTIVE_HEARTBEAT_MAX_AGE_MS", env.AGENTRUN_RUNNER_RETENTION_ACTIVE_HEARTBEAT_MAX_AGE_MS);
  const cleanupOrder = env.AGENTRUN_RUNNER_RETENTION_CLEANUP_ORDER ?? "oldest-inactive-last-active-first";
  if (cleanupOrder !== "oldest-inactive-last-active-first") {
    throw new AgentRunError("schema-invalid", "AGENTRUN_RUNNER_RETENTION_CLEANUP_ORDER is unsupported", { httpStatus: 500 });
  }
  const configuredPrefixes = stringListEnv("AGENTRUN_RUNNER_RETENTION_JOB_NAME_PREFIXES", env.AGENTRUN_RUNNER_RETENTION_JOB_NAME_PREFIXES);
  const matchLabels = jsonRecordEnv("AGENTRUN_RUNNER_RETENTION_MATCH_LABELS_JSON", env.AGENTRUN_RUNNER_RETENTION_MATCH_LABELS_JSON);
  const ageBasedEnabled = booleanEnv("AGENTRUN_RUNNER_RETENTION_AGE_BASED_CLEANUP_ENABLED", env.AGENTRUN_RUNNER_RETENTION_AGE_BASED_CLEANUP_ENABLED ?? "false");
  const ageBasedMaxAgeHours = optionalPositiveInteger("AGENTRUN_RUNNER_RETENTION_AGE_BASED_MAX_AGE_HOURS", env.AGENTRUN_RUNNER_RETENTION_AGE_BASED_MAX_AGE_HOURS) ?? null;
  return {
    namespace,
    maxRunners,
    cleanupOrder,
    activeHeartbeatMaxAgeMs,
    matchLabels,
    // An empty prefix list falls back to the prefix used for newly created jobs.
    jobNamePrefixes: configuredPrefixes.length > 0 ? configuredPrefixes : [jobNamePrefix],
    ageBasedCleanup: {
      enabled: ageBasedEnabled,
      maxAgeHours: ageBasedMaxAgeHours,
    },
    ...kubectlOverride,
  };
}
@@ -65,9 +96,12 @@ export interface ManagerServerOptions {
envIdentity?: string;
artifactCatalogFile?: string;
serviceAccountName?: string;
jobNamePrefix?: string;
lane?: string;
runnerIdleTimeoutMs?: number;
kubectlCommand?: string;
unideskSshEndpointEnv?: JsonRecord;
retention?: RunnerRetentionOptions;
};
sessionPvcOptions?: { kubectlHandler?: import("./session-pvc.js").KubectlHandler; kubectlCommand?: string; storageClassName?: string; size?: string };
providerProfileOptions?: { namespace?: string; kubectlCommand?: string };
@@ -926,10 +960,45 @@ function optionalStringRecord(key: string, value: unknown): JsonRecord {
}
function optionalPositiveIntegerRecord(key: string, value: unknown): JsonRecord {
if (value === undefined || value === null || value === "") return {};
const parsed = optionalPositiveInteger(key, value);
if (parsed === undefined) return {};
return { [key]: parsed };
}
function optionalPositiveInteger(key: string, value: unknown): number | undefined {
if (value === undefined || value === null || value === "") return undefined;
const parsed = Number(value);
if (!Number.isInteger(parsed) || parsed <= 0) throw new AgentRunError("schema-invalid", `${key} must be a positive integer`, { httpStatus: 400 });
return { [key]: parsed };
return parsed;
}
/**
 * Like `optionalPositiveInteger`, but an absent value is an error.
 *
 * @throws AgentRunError `schema-invalid` (500) when the value is missing; validation errors from
 *   `optionalPositiveInteger` propagate unchanged.
 */
function requiredPositiveInteger(key: string, value: unknown): number {
  const parsed = optionalPositiveInteger(key, value);
  if (parsed !== undefined) return parsed;
  throw new AgentRunError("schema-invalid", `${key} is required when runner retention is enabled`, { httpStatus: 500 });
}
/**
 * Parses an environment value that must hold a JSON object whose values are all non-empty strings.
 *
 * @param key Name of the environment variable, used in error messages.
 * @param value Raw environment value.
 * @returns The parsed string-to-string record.
 * @throws AgentRunError `schema-invalid` (500) when the value is missing or blank, is not valid JSON,
 *   is not a JSON object, or contains an entry that is not a non-empty string.
 */
function jsonRecordEnv(key: string, value: unknown): Record<string, string> {
  if (typeof value !== "string" || value.trim().length === 0) throw new AgentRunError("schema-invalid", `${key} is required when runner retention is enabled`, { httpStatus: 500 });
  let parsed: unknown;
  try {
    parsed = JSON.parse(value);
  } catch {
    // Report malformed JSON as a configuration error like every other check here, instead of leaking a
    // raw SyntaxError. The parser message is left out because it can quote parts of the value.
    throw new AgentRunError("schema-invalid", `${key} must be valid JSON`, { httpStatus: 500 });
  }
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) throw new AgentRunError("schema-invalid", `${key} must be a JSON object`, { httpStatus: 500 });
  const out: Record<string, string> = {};
  for (const [entryKey, entryValue] of Object.entries(parsed)) {
    if (typeof entryValue !== "string" || entryValue.length === 0) throw new AgentRunError("schema-invalid", `${key}.${entryKey} must be a non-empty string`, { httpStatus: 500 });
    out[entryKey] = entryValue;
  }
  return out;
}
/**
 * Splits a comma-separated environment value into trimmed, non-empty items.
 *
 * @throws AgentRunError `schema-invalid` (500) when the value is not a string (e.g. the variable is unset).
 */
function stringListEnv(key: string, value: unknown): string[] {
  if (typeof value !== "string") {
    throw new AgentRunError("schema-invalid", `${key} is required when runner retention is enabled`, { httpStatus: 500 });
  }
  const items: string[] = [];
  for (const raw of value.split(",")) {
    const trimmed = raw.trim();
    if (trimmed.length > 0) items.push(trimmed);
  }
  return items;
}
/**
 * Converts the exact strings "true" and "false" to booleans.
 *
 * @throws AgentRunError `schema-invalid` (500) for any other value.
 */
function booleanEnv(key: string, value: string): boolean {
  switch (value) {
    case "true":
      return true;
    case "false":
      return false;
    default:
      throw new AgentRunError("schema-invalid", `${key} must be true or false`, { httpStatus: 500 });
  }
}
function normalizeError(error: unknown): AgentRunError {
+16 -5
View File
@@ -50,6 +50,8 @@ export interface RunnerJobRenderOptions {
runnerId?: string;
sourceCommit?: string;
serviceAccountName?: string;
jobNamePrefix?: string;
lane?: string;
imagePullPolicy?: string;
backoffLimit?: number;
ttlSecondsAfterFinished?: number;
@@ -152,9 +154,11 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani
const runnerId = options.runnerId ?? `runner_${shortHash(`${options.run.id}:${attemptId}:${options.commandId}`)}`;
const sourceCommit = options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown";
const serviceAccountName = options.serviceAccountName ?? "agentrun-v01-runner";
const jobNamePrefix = normalizeJobNamePrefix(options.jobNamePrefix ?? serviceAccountName);
const lane = options.lane ?? process.env.AGENTRUN_LANE ?? "v0.1";
const ttlSecondsAfterFinished = options.ttlSecondsAfterFinished ?? 86_400;
const runnerIdleTimeoutMs = normalizeRunnerIdleTimeoutMs(options.runnerIdleTimeoutMs);
const jobName = `agentrun-v01-runner-${shortDnsHash(options.run.id, attemptId)}`;
const jobName = `${jobNamePrefix}-${shortDnsHash(options.run.id, attemptId)}`;
const secretRefs = credentialProjections(options.run, namespace);
const toolCredentials = toolCredentialProjections(options.run, namespace);
const sessionPvc = options.sessionPvc;
@@ -167,7 +171,7 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani
metadata: {
name: jobName,
namespace,
labels: labels(options.run, jobName),
labels: labels(options.run, jobName, lane),
annotations: {
"agentrun.pikastech.local/run-id": options.run.id,
"agentrun.pikastech.local/command-id": options.commandId,
@@ -179,7 +183,7 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani
ttlSecondsAfterFinished,
template: {
metadata: {
labels: labels(options.run, jobName),
labels: labels(options.run, jobName, lane),
annotations: {
"agentrun.pikastech.local/run-id": options.run.id,
"agentrun.pikastech.local/command-id": options.commandId,
@@ -483,17 +487,24 @@ function defaultRuntimeHome(profile: string): string {
return `/home/agentrun/.codex-${sanitizeVolumeName(profile)}`;
}
/**
 * Builds the label record used for the runner Job metadata and its pod template.
 *
 * The record contains the fixed app.kubernetes.io name/component/part-of labels,
 * the lane label, a short hash of the run id, and the job name.
 *
 * NOTE(review): this excerpt carries both the two-argument and the
 * three-argument signature, and both a literal "v0.1" lane entry and a
 * `lane`-driven one. The call sites in this excerpt pass `lane` as a third
 * argument, so the three-argument / `lane` form appears to be the current one;
 * confirm against the real file.
 */
function labels(run: RunRecord, jobName: string): JsonRecord {
function labels(run: RunRecord, jobName: string, lane: string): JsonRecord {
return {
"app.kubernetes.io/name": "agentrun-runner",
"app.kubernetes.io/component": "runner",
"app.kubernetes.io/part-of": "agentrun",
"agentrun.pikastech.local/lane": "v0.1",
"agentrun.pikastech.local/lane": lane,
"agentrun.pikastech.local/run-hash": shortHash(run.id),
"job-name": jobName,
};
}
/**
 * Turns an arbitrary string into a prefix usable at the front of a job name.
 *
 * Steps, in order:
 *  1. lower-case the input;
 *  2. collapse every run of characters outside `[a-z0-9-]` into a single "-";
 *  3. strip leading and trailing dashes;
 *  4. cap the result at 46 characters, dropping any dash left dangling at the
 *     cut point.
 *
 * @param value - Raw prefix candidate.
 * @returns The normalized prefix, at most 46 characters long.
 * @throws Error when nothing usable remains after normalization.
 */
function normalizeJobNamePrefix(value: string): string {
  const maxPrefixLength = 46;
  const dashed = value.toLowerCase().replace(/[^a-z0-9-]+/gu, "-");
  const trimmed = dashed.replace(/^-+|-+$/gu, "");

  if (trimmed.length === 0) {
    throw new Error("jobNamePrefix must contain at least one DNS label character");
  }
  if (trimmed.length <= maxPrefixLength) {
    return trimmed;
  }

  // Cutting mid-string can leave a trailing dash; remove it so the prefix
  // still ends on an alphanumeric character.
  const truncated = trimmed.slice(0, maxPrefixLength).replace(/-+$/u, "");
  return truncated || "agentrun-runner";
}
/**
 * Derives the hash suffix for a job name from one or more identifying parts.
 *
 * The parts are joined with ":" and the joined string is passed to `shortHash`.
 *
 * @param parts - Strings that together identify the job (e.g. run id and attempt id).
 * @returns Whatever `shortHash` returns for the joined string.
 */
function shortDnsHash(...parts: string[]): string {
  const joined = parts.join(":");
  return shortHash(joined);
}
+113 -1
View File
@@ -265,6 +265,118 @@ process.exit(1);
} finally {
await new Promise<void>((resolve) => serverWithKubectl.server.close(() => resolve()));
}
const retentionKubectl = path.join(context.tmp, "fake-kubectl-retention.js");
const retentionCreatedManifest = path.join(context.tmp, "created-retention-runner-job.json");
const retentionDeleted = path.join(context.tmp, "retention-deleted.json");
await writeFile(retentionDeleted, "[]\n");
await writeFile(retentionKubectl, `#!/usr/bin/env bun
const args = Bun.argv.slice(2);
const deletedPath = ${JSON.stringify(retentionDeleted)};
async function readDeleted() {
try { return JSON.parse(await Bun.file(deletedPath).text()); } catch { return []; }
}
async function writeDeleted(items) { await Bun.write(deletedPath, JSON.stringify(items, null, 2) + "\\n"); }
async function readStdin() {
const chunks = [];
for await (const chunk of Bun.stdin.stream()) chunks.push(chunk);
return Buffer.concat(chunks.map((chunk) => Buffer.from(chunk))).toString("utf8");
}
function runnerJob(name, createdAt, commandId) {
return {
apiVersion: "batch/v1",
kind: "Job",
metadata: {
name,
namespace: "agentrun-v02",
creationTimestamp: createdAt,
labels: { "app.kubernetes.io/part-of": "agentrun", "app.kubernetes.io/name": "agentrun-runner", "app.kubernetes.io/component": "runner", "agentrun.pikastech.local/lane": "v0.2", "job-name": name },
annotations: { "agentrun.pikastech.local/run-id": "run-old", "agentrun.pikastech.local/command-id": commandId, "agentrun.pikastech.local/runner-id": "runner-old" },
},
status: {},
};
}
if (args[0] === "get" && args[1] === "jobs") {
const deleted = await readDeleted();
const deletedJobs = new Set(deleted.filter((item) => item.resource === "job").map((item) => item.name));
const items = [
runnerJob("agentrun-v02-runner-old-a", "2026-01-01T00:00:00Z", "cmd-old-a"),
runnerJob("agentrun-v02-runner-old-b", "2026-01-01T00:01:00Z", "cmd-old-b"),
].filter((item) => !deletedJobs.has(item.metadata.name));
console.log(JSON.stringify({ apiVersion: "v1", kind: "List", items }));
process.exit(0);
}
if (args[0] === "get" && args[1] === "pods") {
console.log(JSON.stringify({ apiVersion: "v1", kind: "List", items: [] }));
process.exit(0);
}
if (args[0] === "delete") {
const deleted = await readDeleted();
const resource = args[1];
const name = args[2] && !args[2].startsWith("-") ? args[2] : args.join(" ");
deleted.push({ resource, name, args });
await writeDeleted(deleted);
console.log(String(resource) + "/" + String(name).replace(/[^a-z0-9-]+/g, "-") + " deleted");
process.exit(0);
}
if (args[0] === "create") {
const text = await readStdin();
const manifest = JSON.parse(text);
if (manifest.kind === "Job") await Bun.write(${JSON.stringify(retentionCreatedManifest)}, text);
console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kind, metadata: { uid: "retention-job-uid", resourceVersion: "1", name: manifest.metadata.name, namespace: manifest.metadata.namespace } }));
process.exit(0);
}
console.error("unsupported fake retention kubectl args: " + args.join(" "));
process.exit(1);
`);
await chmod(retentionKubectl, 0o755);
const serverWithRetention = await startManagerServer({
port: 0,
host: "127.0.0.1",
sourceCommit: "self-test",
store: new MemoryAgentRunStore(),
runnerJobDefaults: {
namespace: "agentrun-v02",
managerUrl: "http://agentrun-mgr.agentrun-v02.svc.cluster.local:8080",
image: "127.0.0.1:5000/agentrun/agentrun-mgr@sha256:1111111111111111111111111111111111111111111111111111111111111111",
serviceAccountName: "agentrun-v02-runner",
jobNamePrefix: "agentrun-v02-runner",
lane: "v0.2",
kubectlCommand: retentionKubectl,
retention: {
namespace: "agentrun-v02",
maxRunners: 1,
cleanupOrder: "oldest-inactive-last-active-first",
activeHeartbeatMaxAgeMs: 900_000,
matchLabels: { "app.kubernetes.io/part-of": "agentrun", "app.kubernetes.io/name": "agentrun-runner", "app.kubernetes.io/component": "runner" },
jobNamePrefixes: ["agentrun-v02-runner"],
ageBasedCleanup: { enabled: false, maxAgeHours: null },
},
},
});
try {
const retentionClient = new ManagerClient(serverWithRetention.baseUrl);
const retentionItem = await createRunWithCommand(retentionClient, context, "retention job create", "selftest-runner-retention", 15_000);
const retentionCreated = await retentionClient.post(`/api/v1/runs/${retentionItem.runId}/runner-jobs`, { commandId: retentionItem.commandId, attemptId: "attempt_selftest_retention" }) as JsonRecord;
const retention = (retentionCreated.retention as JsonRecord).preCreateCleanup as JsonRecord;
assert.equal(retention.liveRunnerCountBefore, 2);
assert.equal(retention.selectedRunnerCount, 2);
assert.equal(retention.deletedRunnerJobCount, 2);
assert.equal(retention.liveRunnerCountAfter, 0);
assert.equal(retention.maxRunners, 1);
assert.equal(retention.valuesPrinted, false);
assert.match(String(retentionCreated.jobName), /^agentrun-v02-runner-[a-f0-9]{12}$/u);
const retentionManifest = JSON.parse(await readFile(retentionCreatedManifest, "utf8")) as JsonRecord;
const metadata = retentionManifest.metadata as JsonRecord;
const labels = metadata.labels as JsonRecord;
assert.equal(labels["agentrun.pikastech.local/lane"], "v0.2");
assert.match(String(metadata.name), /^agentrun-v02-runner-[a-f0-9]{12}$/u);
const deletedItems = JSON.parse(await readFile(retentionDeleted, "utf8")) as JsonRecord[];
assert.equal(deletedItems.filter((item) => item.resource === "job").length, 2);
assert.ok(deletedItems.some((item) => String((item.args as string[]).join(" ")).includes("agentrun.pikastech.local/runner-job=agentrun-v02-runner-old-a")), "old runner associated resources should be selected by runner-job label");
assertNoSecretLeak(retentionCreated);
} finally {
await new Promise<void>((resolve) => serverWithRetention.server.close(() => resolve()));
}
const sessionRunRecord: RunRecord = {
id: "run-selftest-session-pvc",
tenantId: "unidesk",
@@ -307,7 +419,7 @@ process.exit(1);
assert.equal(envMap.get("AGENTRUN_SESSION_PVC_NAMESPACE"), "agentrun-v01");
assert.equal(envMap.get("AGENTRUN_SESSION_PVC_MOUNT_PATH"), "/home/agentrun/.codex-codex/sessions");
assert.equal(envMap.get("AGENTRUN_CODEX_ROLLOUT_SUBDIR"), "sessions");
return { name: "runner-k8s-job", tests: ["runner-k8s-job-dry-run", "runner-k8s-job-codex-shell-sandbox-env", "runner-k8s-job-g14-egress-proxy-env", "runner-k8s-job-bounded-git-transport-env", "runner-k8s-job-deepseek-profile-dry-run", "runner-k8s-job-minimax-m3-profile-dry-run", "runner-k8s-job-dsflash-go-profile-dry-run", "runner-k8s-job-dsflash-go-legacy-secretref-normalized", "runner-k8s-job-create-api", "runner-k8s-job-retention-ttl", "runner-job-transient-env", "runner-job-transient-env-secretref", "runner-job-tool-credential-env", "runner-job-unidesk-ssh-tool-credential-env", "runner-job-tool-credential-volume", "runner-job-unidesk-ssh-endpoint-auto-env", "runner-job-unidesk-ssh-transient-env-denied", "runner-k8s-job-session-pvc-volume-and-env"] };
return { name: "runner-k8s-job", tests: ["runner-k8s-job-dry-run", "runner-k8s-job-codex-shell-sandbox-env", "runner-k8s-job-g14-egress-proxy-env", "runner-k8s-job-bounded-git-transport-env", "runner-k8s-job-deepseek-profile-dry-run", "runner-k8s-job-minimax-m3-profile-dry-run", "runner-k8s-job-dsflash-go-profile-dry-run", "runner-k8s-job-dsflash-go-legacy-secretref-normalized", "runner-k8s-job-create-api", "runner-k8s-job-retention-ttl", "runner-job-transient-env", "runner-job-transient-env-secretref", "runner-job-tool-credential-env", "runner-job-unidesk-ssh-tool-credential-env", "runner-job-tool-credential-volume", "runner-job-unidesk-ssh-endpoint-auto-env", "runner-job-unidesk-ssh-transient-env-denied", "runner-job-pre-create-retention", "runner-k8s-job-session-pvc-volume-and-env"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}