Merge pull request #194 from pikasTech/fix/issue-192-runner-retention
fix: 创建 runner 前执行 retention 收敛
This commit is contained in:
@@ -11,6 +11,8 @@ import { staticWorkReadyCapabilitySummary } from "../common/work-ready.js";
|
||||
import { resolveRunnerEnvImage } from "../common/env-image-ref.js";
|
||||
import { ensureSessionPvc } from "./session-pvc.js";
|
||||
import { gitTransportSummary } from "../common/git-transport.js";
|
||||
import { enforceRunnerRetentionBeforeCreate } from "./runner-retention.js";
|
||||
import type { RunnerRetentionOptions, RunnerRetentionSummary } from "./runner-retention.js";
|
||||
|
||||
const reusableCredentialEnvNames = new Set([
|
||||
"AUTH_PASSWORD",
|
||||
@@ -44,9 +46,12 @@ export interface RunnerJobDefaults {
|
||||
envIdentity?: string;
|
||||
artifactCatalogFile?: string;
|
||||
serviceAccountName?: string;
|
||||
jobNamePrefix?: string;
|
||||
lane?: string;
|
||||
runnerIdleTimeoutMs?: number;
|
||||
kubectlCommand?: string;
|
||||
unideskSshEndpointEnv?: JsonRecord;
|
||||
retention?: RunnerRetentionOptions;
|
||||
}
|
||||
|
||||
export interface CreateRunnerJobInput extends JsonRecord {
|
||||
@@ -83,12 +88,14 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
|
||||
const managerUrl = optionalString(options.input.managerUrl) ?? options.defaults.managerUrl;
|
||||
const sourceCommit = optionalString(options.input.sourceCommit) ?? options.defaults.sourceCommit;
|
||||
const serviceAccountName = optionalString(options.input.serviceAccountName) ?? options.defaults.serviceAccountName;
|
||||
const jobNamePrefix = options.defaults.jobNamePrefix;
|
||||
const lane = options.defaults.lane;
|
||||
const idempotencyKey = optionalString(options.input.idempotencyKey);
|
||||
const transientEnv = assembleToolContextTransientEnv(run.executionPolicy, transientEnvField(options.input.transientEnv), options.defaults);
|
||||
const attemptId = optionalString(options.input.attemptId) ?? `attempt_${Date.now().toString(36)}`;
|
||||
const runnerId = optionalString(options.input.runnerId);
|
||||
const runnerIdleTimeoutMs = optionalPositiveInteger(options.input.runnerIdleTimeoutMs, "runnerIdleTimeoutMs") ?? options.defaults.runnerIdleTimeoutMs;
|
||||
const transientEnvSecretName = transientEnv.length > 0 ? transientEnvSecretNameForRun(run.id, commandId, attemptId) : null;
|
||||
const transientEnvSecretName = transientEnv.length > 0 ? transientEnvSecretNameForRun(run.id, commandId, attemptId, jobNamePrefix) : null;
|
||||
const renderTransientEnv = transientEnvSecretName ? transientEnvWithSecretRefs(transientEnv, transientEnvSecretName) : transientEnv;
|
||||
const normalizedPayload = {
|
||||
commandId,
|
||||
@@ -110,6 +117,10 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
|
||||
}
|
||||
if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${run.id} is already terminal: ${run.status}`, { httpStatus: 409 });
|
||||
if (isTerminalCommandState(command.state) || command.state !== "pending") throw new AgentRunError(command.state === "cancelled" ? "cancelled" : "schema-invalid", `command ${commandId} is not pending: ${command.state}`, { httpStatus: 409 });
|
||||
let preCreateRetention: RunnerRetentionSummary | null = null;
|
||||
if (options.defaults.retention) {
|
||||
preCreateRetention = await enforceRunnerRetentionBeforeCreate({ store: options.store, options: { ...options.defaults.retention, namespace }, incomingRunnerCount: 1 });
|
||||
}
|
||||
let sessionPvc: RunnerSessionPvcOptions | undefined;
|
||||
let sessionPvcSummary: JsonRecord | null = null;
|
||||
if (run.sessionRef?.sessionId) {
|
||||
@@ -156,6 +167,8 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
|
||||
transientEnv: renderTransientEnv,
|
||||
...(runnerIdleTimeoutMs !== undefined ? { runnerIdleTimeoutMs } : {}),
|
||||
...(serviceAccountName ? { serviceAccountName } : {}),
|
||||
...(jobNamePrefix ? { jobNamePrefix } : {}),
|
||||
...(lane ? { lane } : {}),
|
||||
...(sessionPvc ? { sessionPvc } : {}),
|
||||
};
|
||||
const render = renderRunnerJobManifest({ ...renderOptions, attemptId, ...(runnerId ? { runnerId } : {}) });
|
||||
@@ -165,7 +178,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
|
||||
let created: JsonRecord | null = null;
|
||||
try {
|
||||
if (transientEnvSecretName) {
|
||||
await kubectlCreate(transientEnvSecretManifest({ namespace: render.namespace, name: transientEnvSecretName, runId: run.id, commandId, attemptId: render.attemptId, runnerId: render.runnerId, jobName: render.jobName, items: transientEnv }), kubectlCommand, "runner transient env secret");
|
||||
await kubectlCreate(transientEnvSecretManifest({ namespace: render.namespace, name: transientEnvSecretName, runId: run.id, commandId, attemptId: render.attemptId, runnerId: render.runnerId, jobName: render.jobName, lane: lane ?? "v0.1", items: transientEnv }), kubectlCommand, "runner transient env secret");
|
||||
transientEnvSecretCreated = true;
|
||||
}
|
||||
created = await kubectlCreate(render.manifest, kubectlCommand, "runner job");
|
||||
@@ -220,6 +233,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
|
||||
retention: {
|
||||
ttlSecondsAfterFinished: render.ttlSecondsAfterFinished,
|
||||
runnerIdleTimeoutMs: render.runnerIdleTimeoutMs,
|
||||
preCreateCleanup: preCreateRetention,
|
||||
},
|
||||
pollActions: [
|
||||
runnerJobActionDescriptor({ action: "inspect-run", operation: "describe", resourceKind: "run", resourceName: run.id, runId: run.id }),
|
||||
@@ -263,6 +277,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
|
||||
envImage,
|
||||
gitTransport: gitTransportSummary(),
|
||||
workReady: staticWorkReadyCapabilitySummary(),
|
||||
retention: preCreateRetention,
|
||||
toolCredentials: summarizeToolCredentials(render.toolCredentials, render.namespace),
|
||||
sessionPvc: sessionPvcSummary,
|
||||
sessionRef: summarizeSessionRef(run.sessionRef ?? null),
|
||||
@@ -357,11 +372,12 @@ function transientEnvWithSecretRefs(items: RunnerTransientEnv[], secretName: str
|
||||
return items.map((item) => ({ ...item, secretRef: { name: secretName, key: item.name } }));
|
||||
}
|
||||
|
||||
function transientEnvSecretNameForRun(runId: string, commandId: string, attemptId: string): string {
|
||||
return `agentrun-v01-runner-env-${stableHash({ runId, commandId, attemptId }).slice(0, 20)}`;
|
||||
function transientEnvSecretNameForRun(runId: string, commandId: string, attemptId: string, jobNamePrefix: string | undefined): string {
|
||||
const prefix = (jobNamePrefix ?? "agentrun-v01-runner").toLowerCase().replace(/[^a-z0-9-]+/gu, "-").replace(/^-+|-+$/gu, "") || "agentrun-runner";
|
||||
return `${prefix.slice(0, 36).replace(/-+$/u, "")}-env-${stableHash({ runId, commandId, attemptId }).slice(0, 20)}`;
|
||||
}
|
||||
|
||||
function transientEnvSecretManifest(options: { namespace: string; name: string; runId: string; commandId: string; attemptId: string; runnerId: string; jobName: string; items: RunnerTransientEnv[] }): JsonRecord {
|
||||
function transientEnvSecretManifest(options: { namespace: string; name: string; runId: string; commandId: string; attemptId: string; runnerId: string; jobName: string; lane: string; items: RunnerTransientEnv[] }): JsonRecord {
|
||||
const stringData: JsonRecord = {};
|
||||
for (const item of options.items) stringData[item.name] = item.value;
|
||||
return {
|
||||
@@ -374,7 +390,7 @@ function transientEnvSecretManifest(options: { namespace: string; name: string;
|
||||
"app.kubernetes.io/name": "agentrun-runner",
|
||||
"app.kubernetes.io/component": "runner-transient-env",
|
||||
"app.kubernetes.io/part-of": "agentrun",
|
||||
"agentrun.pikastech.local/lane": "v0.1",
|
||||
"agentrun.pikastech.local/lane": options.lane,
|
||||
"agentrun.pikastech.local/run-hash": stableHash(options.runId).slice(0, 12),
|
||||
"agentrun.pikastech.local/runner-job": options.jobName,
|
||||
},
|
||||
|
||||
@@ -0,0 +1,364 @@
|
||||
import { spawn } from "node:child_process";
|
||||
import { AgentRunError } from "../common/errors.js";
|
||||
import { redactJson, redactText } from "../common/redaction.js";
|
||||
import type { CommandRecord, JsonRecord, JsonValue, RunRecord } from "../common/types.js";
|
||||
import { stableHash } from "../common/validation.js";
|
||||
import { isTerminalCommandState, isTerminalRunStatus } from "./store.js";
|
||||
import type { AgentRunStore } from "./store.js";
|
||||
|
||||
export interface RunnerRetentionOptions {
|
||||
namespace: string;
|
||||
maxRunners: number;
|
||||
cleanupOrder: "oldest-inactive-last-active-first";
|
||||
activeHeartbeatMaxAgeMs: number;
|
||||
matchLabels: Record<string, string>;
|
||||
jobNamePrefixes: string[];
|
||||
ageBasedCleanup: {
|
||||
enabled: boolean;
|
||||
maxAgeHours: number | null;
|
||||
};
|
||||
kubectlCommand?: string;
|
||||
}
|
||||
|
||||
export interface RunnerRetentionSummary extends JsonRecord {
|
||||
enabled: boolean;
|
||||
namespace: string;
|
||||
maxRunners: number;
|
||||
incomingRunnerCount: number;
|
||||
liveRunnerCountBefore: number;
|
||||
liveRunnerCountAfter: number;
|
||||
runnerJobCountBefore: number;
|
||||
nonTerminalRunnerPodCountBefore: number;
|
||||
orphanNonTerminalRunnerPodCountBefore: number;
|
||||
inactiveCandidateCount: number;
|
||||
protectedActiveRunnerCount: number;
|
||||
selectedRunnerCount: number;
|
||||
deletedRunnerJobCount: number;
|
||||
deletedRunnerPodCount: number;
|
||||
deletedAssociatedResourceCount: number;
|
||||
activeRunRisk: boolean;
|
||||
reason: string;
|
||||
valuesPrinted: false;
|
||||
}
|
||||
|
||||
type CandidateKind = "idle" | "terminal" | "inactive";
|
||||
|
||||
interface K8sObject {
|
||||
kind?: string;
|
||||
metadata?: {
|
||||
name?: string;
|
||||
namespace?: string;
|
||||
creationTimestamp?: string;
|
||||
labels?: Record<string, string>;
|
||||
annotations?: Record<string, string>;
|
||||
ownerReferences?: Array<{ kind?: string; name?: string }>;
|
||||
};
|
||||
status?: JsonRecord;
|
||||
}
|
||||
|
||||
interface RunnerResourceEntry {
|
||||
resourceKind: "job" | "pod";
|
||||
name: string;
|
||||
jobName: string;
|
||||
runId: string | null;
|
||||
commandId: string | null;
|
||||
runnerId: string | null;
|
||||
createdAtMs: number;
|
||||
terminal: boolean;
|
||||
podPhase: string | null;
|
||||
protectedActive: boolean;
|
||||
protectedReason: string | null;
|
||||
candidateKind: CandidateKind;
|
||||
lastActiveAtMs: number;
|
||||
}
|
||||
|
||||
interface Snapshot {
|
||||
jobs: RunnerResourceEntry[];
|
||||
pods: RunnerResourceEntry[];
|
||||
liveRunnerCount: number;
|
||||
runnerJobCount: number;
|
||||
nonTerminalRunnerPodCount: number;
|
||||
orphanNonTerminalRunnerPodCount: number;
|
||||
}
|
||||
|
||||
export async function enforceRunnerRetentionBeforeCreate(input: { store: AgentRunStore; options: RunnerRetentionOptions; incomingRunnerCount?: number }): Promise<RunnerRetentionSummary> {
|
||||
const incomingRunnerCount = input.incomingRunnerCount ?? 1;
|
||||
const before = await runnerSnapshot(input.store, input.options);
|
||||
const targetBeforeCreate = Math.max(0, input.options.maxRunners - incomingRunnerCount);
|
||||
const requiredDeleteCount = Math.max(0, before.liveRunnerCount - targetBeforeCreate);
|
||||
const candidates = [...before.jobs, ...before.pods]
|
||||
.filter((item) => !item.protectedActive)
|
||||
.sort(compareCandidates);
|
||||
const selected = candidates.slice(0, requiredDeleteCount);
|
||||
const protectedActiveRunnerCount = before.jobs.filter((item) => item.protectedActive).length + before.pods.filter((item) => item.protectedActive).length;
|
||||
if (requiredDeleteCount > 0 && selected.length < requiredDeleteCount) {
|
||||
throw new AgentRunError("infra-failed", "runner retention cannot safely make room for a new runner", {
|
||||
httpStatus: 409,
|
||||
details: {
|
||||
reason: "runner-retention-no-safe-candidate",
|
||||
namespace: input.options.namespace,
|
||||
maxRunners: input.options.maxRunners,
|
||||
incomingRunnerCount,
|
||||
liveRunnerCountBefore: before.liveRunnerCount,
|
||||
requiredDeleteCount,
|
||||
inactiveCandidateCount: candidates.length,
|
||||
protectedActiveRunnerCount,
|
||||
activeRunRisk: protectedActiveRunnerCount > 0,
|
||||
valuesPrinted: false,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
let deletedRunnerJobCount = 0;
|
||||
let deletedRunnerPodCount = 0;
|
||||
let deletedAssociatedResourceCount = 0;
|
||||
for (const item of selected) {
|
||||
if (item.resourceKind === "job") {
|
||||
await kubectlRun(input.options.kubectlCommand ?? "kubectl", ["delete", "job", item.name, "-n", input.options.namespace, "--ignore-not-found=true"]);
|
||||
deletedRunnerJobCount++;
|
||||
deletedAssociatedResourceCount += await deleteAssociatedResources(input.options, item.jobName);
|
||||
} else {
|
||||
await kubectlRun(input.options.kubectlCommand ?? "kubectl", ["delete", "pod", item.name, "-n", input.options.namespace, "--ignore-not-found=true"]);
|
||||
deletedRunnerPodCount++;
|
||||
}
|
||||
}
|
||||
const after = selected.length > 0 ? await runnerSnapshot(input.store, input.options) : before;
|
||||
return {
|
||||
enabled: true,
|
||||
namespace: input.options.namespace,
|
||||
maxRunners: input.options.maxRunners,
|
||||
incomingRunnerCount,
|
||||
liveRunnerCountBefore: before.liveRunnerCount,
|
||||
liveRunnerCountAfter: after.liveRunnerCount,
|
||||
runnerJobCountBefore: before.runnerJobCount,
|
||||
nonTerminalRunnerPodCountBefore: before.nonTerminalRunnerPodCount,
|
||||
orphanNonTerminalRunnerPodCountBefore: before.orphanNonTerminalRunnerPodCount,
|
||||
inactiveCandidateCount: candidates.length,
|
||||
protectedActiveRunnerCount,
|
||||
selectedRunnerCount: selected.length,
|
||||
deletedRunnerJobCount,
|
||||
deletedRunnerPodCount,
|
||||
deletedAssociatedResourceCount,
|
||||
activeRunRisk: protectedActiveRunnerCount > 0,
|
||||
reason: requiredDeleteCount === 0 ? "within-limit" : "pre-create-retention",
|
||||
selected: selected.map((item) => ({ resourceKind: item.resourceKind, name: item.name, jobName: item.jobName, candidateKind: item.candidateKind, protectedActive: false, lastActiveAt: new Date(item.lastActiveAtMs).toISOString(), valuesPrinted: false })),
|
||||
valuesPrinted: false,
|
||||
} satisfies RunnerRetentionSummary;
|
||||
}
|
||||
|
||||
async function runnerSnapshot(store: AgentRunStore, options: RunnerRetentionOptions): Promise<Snapshot> {
|
||||
const [jobObjects, podObjects] = await Promise.all([
|
||||
kubectlGetList(options, "jobs", labelSelector(options.matchLabels)),
|
||||
kubectlGetList(options, "pods", labelSelector(options.matchLabels)),
|
||||
]);
|
||||
const jobs: RunnerResourceEntry[] = [];
|
||||
for (const item of jobObjects) {
|
||||
const name = objectName(item);
|
||||
if (!name || !matchesJobPrefix(name, options.jobNamePrefixes)) continue;
|
||||
jobs.push(await jobEntry(store, item, options));
|
||||
}
|
||||
const jobNames = new Set(jobs.map((item) => item.jobName));
|
||||
const pods: RunnerResourceEntry[] = [];
|
||||
let nonTerminalRunnerPodCount = 0;
|
||||
let orphanNonTerminalRunnerPodCount = 0;
|
||||
for (const item of podObjects) {
|
||||
const name = objectName(item);
|
||||
if (!name) continue;
|
||||
const jobName = podJobName(item) ?? name;
|
||||
if (!matchesJobPrefix(jobName, options.jobNamePrefixes)) continue;
|
||||
const terminal = podTerminal(item);
|
||||
if (!terminal) nonTerminalRunnerPodCount++;
|
||||
if (jobNames.has(jobName)) continue;
|
||||
if (!terminal) orphanNonTerminalRunnerPodCount++;
|
||||
pods.push(await podEntry(store, item, options, jobName));
|
||||
}
|
||||
return {
|
||||
jobs,
|
||||
pods,
|
||||
liveRunnerCount: jobs.length + pods.filter((item) => !item.terminal).length,
|
||||
runnerJobCount: jobs.length,
|
||||
nonTerminalRunnerPodCount,
|
||||
orphanNonTerminalRunnerPodCount,
|
||||
};
|
||||
}
|
||||
|
||||
async function jobEntry(store: AgentRunStore, item: K8sObject, options: RunnerRetentionOptions): Promise<RunnerResourceEntry> {
|
||||
const name = objectName(item) ?? "unknown";
|
||||
const annotations = item.metadata?.annotations ?? {};
|
||||
const activity = await activityFor(store, {
|
||||
runId: annotations["agentrun.pikastech.local/run-id"] ?? null,
|
||||
commandId: annotations["agentrun.pikastech.local/command-id"] ?? null,
|
||||
runnerId: annotations["agentrun.pikastech.local/runner-id"] ?? null,
|
||||
terminal: jobTerminal(item),
|
||||
createdAtMs: objectCreatedAtMs(item),
|
||||
activeHeartbeatMaxAgeMs: options.activeHeartbeatMaxAgeMs,
|
||||
});
|
||||
return {
|
||||
resourceKind: "job",
|
||||
name,
|
||||
jobName: name,
|
||||
createdAtMs: objectCreatedAtMs(item),
|
||||
terminal: jobTerminal(item),
|
||||
podPhase: null,
|
||||
...activity,
|
||||
};
|
||||
}
|
||||
|
||||
async function podEntry(store: AgentRunStore, item: K8sObject, options: RunnerRetentionOptions, jobName: string): Promise<RunnerResourceEntry> {
|
||||
const name = objectName(item) ?? "unknown";
|
||||
const annotations = item.metadata?.annotations ?? {};
|
||||
const activity = await activityFor(store, {
|
||||
runId: annotations["agentrun.pikastech.local/run-id"] ?? null,
|
||||
commandId: annotations["agentrun.pikastech.local/command-id"] ?? null,
|
||||
runnerId: annotations["agentrun.pikastech.local/runner-id"] ?? null,
|
||||
terminal: podTerminal(item),
|
||||
createdAtMs: objectCreatedAtMs(item),
|
||||
activeHeartbeatMaxAgeMs: options.activeHeartbeatMaxAgeMs,
|
||||
});
|
||||
return {
|
||||
resourceKind: "pod",
|
||||
name,
|
||||
jobName,
|
||||
createdAtMs: objectCreatedAtMs(item),
|
||||
terminal: podTerminal(item),
|
||||
podPhase: stringPath(item, ["status", "phase"]),
|
||||
...activity,
|
||||
};
|
||||
}
|
||||
|
||||
async function activityFor(store: AgentRunStore, input: { runId: string | null; commandId: string | null; runnerId: string | null; terminal: boolean; createdAtMs: number; activeHeartbeatMaxAgeMs: number }): Promise<Pick<RunnerResourceEntry, "runId" | "commandId" | "runnerId" | "protectedActive" | "protectedReason" | "candidateKind" | "lastActiveAtMs">> {
|
||||
let run: RunRecord | null = null;
|
||||
let command: CommandRecord | null = null;
|
||||
if (input.runId) {
|
||||
try { run = await store.getRun(input.runId); } catch { run = null; }
|
||||
}
|
||||
if (input.commandId) {
|
||||
try { command = await store.getCommand(input.commandId); } catch { command = null; }
|
||||
}
|
||||
const lastActiveAtMs = Math.max(input.createdAtMs, isoMs(run?.updatedAt), isoMs(command?.updatedAt), isoMs(run?.leaseExpiresAt));
|
||||
const active = run && !isTerminalRunStatus(run.status) && command && !isTerminalCommandState(command.state) && input.runnerId && run.claimedBy === input.runnerId && heartbeatFresh(run.leaseExpiresAt, input.activeHeartbeatMaxAgeMs);
|
||||
if (active) {
|
||||
return { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, protectedActive: true, protectedReason: "fresh-active-run", candidateKind: "inactive", lastActiveAtMs };
|
||||
}
|
||||
const candidateKind: CandidateKind = command && isTerminalCommandState(command.state) && !input.terminal ? "idle" : input.terminal || (run !== null && isTerminalRunStatus(run.status)) ? "terminal" : "inactive";
|
||||
return { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, protectedActive: false, protectedReason: null, candidateKind, lastActiveAtMs };
|
||||
}
|
||||
|
||||
async function kubectlGetList(options: RunnerRetentionOptions, resource: string, selector: string): Promise<K8sObject[]> {
|
||||
const args = ["get", resource, "-n", options.namespace];
|
||||
if (selector) args.push("-l", selector);
|
||||
args.push("-o", "json");
|
||||
const result = await kubectlRun(options.kubectlCommand ?? "kubectl", args);
|
||||
if (result.code !== 0) throw new AgentRunError("infra-failed", `kubectl get ${resource} failed with code ${result.code}`, { httpStatus: 502, details: redactJson({ stderr: redactText(result.stderr.slice(-2000)), stdout: redactText(result.stdout.slice(-1000)), valuesPrinted: false }) });
|
||||
const parsed = parseJsonObject(result.stdout, `kubectl get ${resource}`);
|
||||
const items = parsed.items;
|
||||
return Array.isArray(items) ? items.filter((entry) => typeof entry === "object" && entry !== null && !Array.isArray(entry)).map((entry) => entry as unknown as K8sObject) : [];
|
||||
}
|
||||
|
||||
async function deleteAssociatedResources(options: RunnerRetentionOptions, jobName: string): Promise<number> {
|
||||
let deleted = 0;
|
||||
const selector = `agentrun.pikastech.local/runner-job=${jobName}`;
|
||||
for (const resource of ["secret", "pvc"] as const) {
|
||||
const result = await kubectlRun(options.kubectlCommand ?? "kubectl", ["delete", resource, "-n", options.namespace, "-l", selector, "--ignore-not-found=true"]);
|
||||
if (result.code !== 0) throw new AgentRunError("infra-failed", `kubectl delete associated ${resource} failed with code ${result.code}`, { httpStatus: 502, details: redactJson({ stderr: redactText(result.stderr.slice(-2000)), stdout: redactText(result.stdout.slice(-1000)), valuesPrinted: false }) });
|
||||
deleted += countDeletedLines(result.stdout);
|
||||
}
|
||||
return deleted;
|
||||
}
|
||||
|
||||
async function kubectlRun(kubectlCommand: string, args: string[]): Promise<{ code: number | null; signal: NodeJS.Signals | null; stdout: string; stderr: string }> {
|
||||
const child = spawn(kubectlCommand, args, { stdio: ["ignore", "pipe", "pipe"] });
|
||||
let stdout = "";
|
||||
let stderr = "";
|
||||
child.stdout.setEncoding("utf8");
|
||||
child.stderr.setEncoding("utf8");
|
||||
child.stdout.on("data", (chunk) => { stdout += String(chunk); });
|
||||
child.stderr.on("data", (chunk) => { stderr += String(chunk); });
|
||||
const result = await new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve, reject) => {
|
||||
child.on("error", reject);
|
||||
child.on("close", (code, signal) => resolve({ code, signal }));
|
||||
}).catch((error: unknown) => {
|
||||
throw new AgentRunError("infra-failed", `failed to start kubectl: ${error instanceof Error ? error.message : String(error)}`, { httpStatus: 503 });
|
||||
});
|
||||
return { ...result, stdout: redactText(stdout), stderr: redactText(stderr) };
|
||||
}
|
||||
|
||||
function compareCandidates(left: RunnerResourceEntry, right: RunnerResourceEntry): number {
|
||||
const priority = (kind: CandidateKind): number => kind === "idle" ? 0 : kind === "terminal" ? 1 : 2;
|
||||
const byKind = priority(left.candidateKind) - priority(right.candidateKind);
|
||||
if (byKind !== 0) return byKind;
|
||||
const byActive = left.lastActiveAtMs - right.lastActiveAtMs;
|
||||
if (byActive !== 0) return byActive;
|
||||
return left.name.localeCompare(right.name);
|
||||
}
|
||||
|
||||
function labelSelector(labels: Record<string, string>): string {
|
||||
return Object.entries(labels).map(([key, value]) => `${key}=${value}`).join(",");
|
||||
}
|
||||
|
||||
function matchesJobPrefix(name: string, prefixes: string[]): boolean {
|
||||
return prefixes.length === 0 || prefixes.some((prefix) => name === prefix || name.startsWith(`${prefix}-`));
|
||||
}
|
||||
|
||||
function podJobName(item: K8sObject): string | null {
|
||||
const labels = item.metadata?.labels ?? {};
|
||||
if (labels["job-name"]) return labels["job-name"];
|
||||
return item.metadata?.ownerReferences?.find((owner) => owner.kind === "Job")?.name ?? null;
|
||||
}
|
||||
|
||||
function jobTerminal(item: K8sObject): boolean {
|
||||
const conditions = item.status?.conditions;
|
||||
if (!Array.isArray(conditions)) return false;
|
||||
return conditions.some((condition) => {
|
||||
const record = condition as JsonRecord;
|
||||
return (record.type === "Complete" || record.type === "Failed") && record.status === "True";
|
||||
});
|
||||
}
|
||||
|
||||
function podTerminal(item: K8sObject): boolean {
|
||||
const phase = stringPath(item, ["status", "phase"]);
|
||||
return phase === "Succeeded" || phase === "Failed";
|
||||
}
|
||||
|
||||
function heartbeatFresh(leaseExpiresAt: string | null, activeHeartbeatMaxAgeMs: number): boolean {
|
||||
const leaseMs = isoMs(leaseExpiresAt);
|
||||
if (leaseMs <= 0) return false;
|
||||
return leaseMs + activeHeartbeatMaxAgeMs >= Date.now();
|
||||
}
|
||||
|
||||
function objectName(item: K8sObject): string | null {
|
||||
return typeof item.metadata?.name === "string" && item.metadata.name.length > 0 ? item.metadata.name : null;
|
||||
}
|
||||
|
||||
function objectCreatedAtMs(item: K8sObject): number {
|
||||
return isoMs(item.metadata?.creationTimestamp) || 0;
|
||||
}
|
||||
|
||||
function isoMs(value: string | null | undefined): number {
|
||||
if (!value) return 0;
|
||||
const parsed = Date.parse(value);
|
||||
return Number.isFinite(parsed) ? parsed : 0;
|
||||
}
|
||||
|
||||
function stringPath(value: unknown, path: string[]): string | null {
|
||||
let current: unknown = value;
|
||||
for (const key of path) {
|
||||
if (typeof current !== "object" || current === null || Array.isArray(current)) return null;
|
||||
current = (current as Record<string, unknown>)[key];
|
||||
}
|
||||
return typeof current === "string" ? current : null;
|
||||
}
|
||||
|
||||
function parseJsonObject(text: string, label: string): JsonRecord {
|
||||
try {
|
||||
const parsed = JSON.parse(text) as JsonValue;
|
||||
if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) return parsed;
|
||||
} catch (error) {
|
||||
throw new AgentRunError("infra-failed", `${label} returned invalid JSON: ${error instanceof Error ? error.message : String(error)}`, { httpStatus: 502, details: { stdoutHash: stableHash(text), valuesPrinted: false } });
|
||||
}
|
||||
throw new AgentRunError("infra-failed", `${label} returned non-object JSON`, { httpStatus: 502, details: { stdoutHash: stableHash(text), valuesPrinted: false } });
|
||||
}
|
||||
|
||||
function countDeletedLines(stdout: string): number {
|
||||
return stdout.split(/\r?\n/u).filter((line) => /\sdeleted$/u.test(line.trim())).length;
|
||||
}
|
||||
+72
-3
@@ -8,6 +8,7 @@ import { asRecord, validateBackendProfile, validateCreateCommand, validateCreate
|
||||
import { isBackendProfile } from "../common/backend-profiles.js";
|
||||
import type { ApiErrorBody, ApiOkBody, CommandRecord, JsonRecord, JsonValue, QueueTaskRecord, RunEvent, RunRecord, SessionRecord } from "../common/types.js";
|
||||
import { createKubernetesRunnerJob, type RunnerJobDefaults } from "./kubernetes-runner-job.js";
|
||||
import type { RunnerRetentionOptions } from "./runner-retention.js";
|
||||
import { dispatchQueueTask, refreshQueueTaskFromCore } from "./queue-dispatch.js";
|
||||
import { buildRunResult } from "./result.js";
|
||||
import { runnerJobStatusSummary } from "./runner-job-status.js";
|
||||
@@ -37,6 +38,10 @@ function sessionPvcOptionsForRequest(serverDefaults: { kubectlHandler?: import("
|
||||
|
||||
function runnerJobDefaultsForRequest(defaults: ManagerServerOptions["runnerJobDefaults"], sourceCommit: string): RunnerJobDefaults {
|
||||
const namespace = defaults?.namespace ?? process.env.AGENTRUN_RUNTIME_NAMESPACE ?? "agentrun-v01";
|
||||
const serviceAccountName = defaults?.serviceAccountName ?? process.env.AGENTRUN_RUNNER_SERVICE_ACCOUNT ?? "agentrun-v01-runner";
|
||||
const jobNamePrefix = defaults?.jobNamePrefix ?? process.env.AGENTRUN_RUNNER_JOB_NAME_PREFIX ?? serviceAccountName;
|
||||
const lane = defaults?.lane ?? process.env.AGENTRUN_LANE ?? "v0.1";
|
||||
const retention = runnerRetentionOptionsForRequest(defaults?.retention, namespace, jobNamePrefix, defaults?.kubectlCommand);
|
||||
return {
|
||||
namespace,
|
||||
managerUrl: defaults?.managerUrl ?? process.env.AGENTRUN_INTERNAL_MGR_URL ?? `http://agentrun-mgr.${namespace}.svc.cluster.local:8080`,
|
||||
@@ -45,10 +50,36 @@ function runnerJobDefaultsForRequest(defaults: ManagerServerOptions["runnerJobDe
|
||||
sourceCommit,
|
||||
...optionalStringRecord("envIdentity", defaults?.envIdentity ?? process.env.AGENTRUN_ENV_IDENTITY),
|
||||
...optionalStringRecord("artifactCatalogFile", defaults?.artifactCatalogFile ?? process.env.AGENTRUN_ARTIFACT_CATALOG_FILE),
|
||||
serviceAccountName: defaults?.serviceAccountName ?? process.env.AGENTRUN_RUNNER_SERVICE_ACCOUNT ?? "agentrun-v01-runner",
|
||||
serviceAccountName,
|
||||
jobNamePrefix,
|
||||
lane,
|
||||
...(defaults?.runnerIdleTimeoutMs !== undefined ? { runnerIdleTimeoutMs: defaults.runnerIdleTimeoutMs } : optionalPositiveIntegerRecord("runnerIdleTimeoutMs", process.env.AGENTRUN_RUNNER_IDLE_TIMEOUT_MS)),
|
||||
...(defaults?.kubectlCommand ? { kubectlCommand: defaults.kubectlCommand } : {}),
|
||||
...(defaults?.unideskSshEndpointEnv ? { unideskSshEndpointEnv: defaults.unideskSshEndpointEnv } : {}),
|
||||
...(retention ? { retention } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function runnerRetentionOptionsForRequest(defaults: RunnerRetentionOptions | undefined, namespace: string, jobNamePrefix: string, kubectlCommand?: string): RunnerRetentionOptions | undefined {
|
||||
if (defaults) return { ...defaults, namespace, ...(kubectlCommand ? { kubectlCommand } : {}) };
|
||||
const maxRunners = optionalPositiveInteger("AGENTRUN_RUNNER_RETENTION_MAX_RUNNERS", process.env.AGENTRUN_RUNNER_RETENTION_MAX_RUNNERS);
|
||||
if (maxRunners === undefined) return undefined;
|
||||
const activeHeartbeatMaxAgeMs = requiredPositiveInteger("AGENTRUN_RUNNER_RETENTION_ACTIVE_HEARTBEAT_MAX_AGE_MS", process.env.AGENTRUN_RUNNER_RETENTION_ACTIVE_HEARTBEAT_MAX_AGE_MS);
|
||||
const cleanupOrder = process.env.AGENTRUN_RUNNER_RETENTION_CLEANUP_ORDER ?? "oldest-inactive-last-active-first";
|
||||
if (cleanupOrder !== "oldest-inactive-last-active-first") throw new AgentRunError("schema-invalid", "AGENTRUN_RUNNER_RETENTION_CLEANUP_ORDER is unsupported", { httpStatus: 500 });
|
||||
const jobNamePrefixes = stringListEnv("AGENTRUN_RUNNER_RETENTION_JOB_NAME_PREFIXES", process.env.AGENTRUN_RUNNER_RETENTION_JOB_NAME_PREFIXES);
|
||||
return {
|
||||
namespace,
|
||||
maxRunners,
|
||||
cleanupOrder,
|
||||
activeHeartbeatMaxAgeMs,
|
||||
matchLabels: jsonRecordEnv("AGENTRUN_RUNNER_RETENTION_MATCH_LABELS_JSON", process.env.AGENTRUN_RUNNER_RETENTION_MATCH_LABELS_JSON),
|
||||
jobNamePrefixes: jobNamePrefixes.length > 0 ? jobNamePrefixes : [jobNamePrefix],
|
||||
ageBasedCleanup: {
|
||||
enabled: booleanEnv("AGENTRUN_RUNNER_RETENTION_AGE_BASED_CLEANUP_ENABLED", process.env.AGENTRUN_RUNNER_RETENTION_AGE_BASED_CLEANUP_ENABLED ?? "false"),
|
||||
maxAgeHours: optionalPositiveInteger("AGENTRUN_RUNNER_RETENTION_AGE_BASED_MAX_AGE_HOURS", process.env.AGENTRUN_RUNNER_RETENTION_AGE_BASED_MAX_AGE_HOURS) ?? null,
|
||||
},
|
||||
...(kubectlCommand ? { kubectlCommand } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -65,9 +96,12 @@ export interface ManagerServerOptions {
|
||||
envIdentity?: string;
|
||||
artifactCatalogFile?: string;
|
||||
serviceAccountName?: string;
|
||||
jobNamePrefix?: string;
|
||||
lane?: string;
|
||||
runnerIdleTimeoutMs?: number;
|
||||
kubectlCommand?: string;
|
||||
unideskSshEndpointEnv?: JsonRecord;
|
||||
retention?: RunnerRetentionOptions;
|
||||
};
|
||||
sessionPvcOptions?: { kubectlHandler?: import("./session-pvc.js").KubectlHandler; kubectlCommand?: string; storageClassName?: string; size?: string };
|
||||
providerProfileOptions?: { namespace?: string; kubectlCommand?: string };
|
||||
@@ -926,10 +960,45 @@ function optionalStringRecord(key: string, value: unknown): JsonRecord {
|
||||
}
|
||||
|
||||
function optionalPositiveIntegerRecord(key: string, value: unknown): JsonRecord {
|
||||
if (value === undefined || value === null || value === "") return {};
|
||||
const parsed = optionalPositiveInteger(key, value);
|
||||
if (parsed === undefined) return {};
|
||||
return { [key]: parsed };
|
||||
}
|
||||
|
||||
function optionalPositiveInteger(key: string, value: unknown): number | undefined {
|
||||
if (value === undefined || value === null || value === "") return undefined;
|
||||
const parsed = Number(value);
|
||||
if (!Number.isInteger(parsed) || parsed <= 0) throw new AgentRunError("schema-invalid", `${key} must be a positive integer`, { httpStatus: 400 });
|
||||
return { [key]: parsed };
|
||||
return parsed;
|
||||
}
|
||||
|
||||
function requiredPositiveInteger(key: string, value: unknown): number {
|
||||
const parsed = optionalPositiveInteger(key, value);
|
||||
if (parsed === undefined) throw new AgentRunError("schema-invalid", `${key} is required when runner retention is enabled`, { httpStatus: 500 });
|
||||
return parsed;
|
||||
}
|
||||
|
||||
function jsonRecordEnv(key: string, value: unknown): Record<string, string> {
|
||||
if (typeof value !== "string" || value.trim().length === 0) throw new AgentRunError("schema-invalid", `${key} is required when runner retention is enabled`, { httpStatus: 500 });
|
||||
const parsed = JSON.parse(value) as unknown;
|
||||
if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) throw new AgentRunError("schema-invalid", `${key} must be a JSON object`, { httpStatus: 500 });
|
||||
const out: Record<string, string> = {};
|
||||
for (const [entryKey, entryValue] of Object.entries(parsed)) {
|
||||
if (typeof entryValue !== "string" || entryValue.length === 0) throw new AgentRunError("schema-invalid", `${key}.${entryKey} must be a non-empty string`, { httpStatus: 500 });
|
||||
out[entryKey] = entryValue;
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
function stringListEnv(key: string, value: unknown): string[] {
|
||||
if (typeof value !== "string") throw new AgentRunError("schema-invalid", `${key} is required when runner retention is enabled`, { httpStatus: 500 });
|
||||
return value.split(",").map((item) => item.trim()).filter((item) => item.length > 0);
|
||||
}
|
||||
|
||||
function booleanEnv(key: string, value: string): boolean {
|
||||
if (value === "true") return true;
|
||||
if (value === "false") return false;
|
||||
throw new AgentRunError("schema-invalid", `${key} must be true or false`, { httpStatus: 500 });
|
||||
}
|
||||
|
||||
function normalizeError(error: unknown): AgentRunError {
|
||||
|
||||
+16
-5
@@ -50,6 +50,8 @@ export interface RunnerJobRenderOptions {
|
||||
runnerId?: string;
|
||||
sourceCommit?: string;
|
||||
serviceAccountName?: string;
|
||||
jobNamePrefix?: string;
|
||||
lane?: string;
|
||||
imagePullPolicy?: string;
|
||||
backoffLimit?: number;
|
||||
ttlSecondsAfterFinished?: number;
|
||||
@@ -152,9 +154,11 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani
|
||||
const runnerId = options.runnerId ?? `runner_${shortHash(`${options.run.id}:${attemptId}:${options.commandId}`)}`;
|
||||
const sourceCommit = options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown";
|
||||
const serviceAccountName = options.serviceAccountName ?? "agentrun-v01-runner";
|
||||
const jobNamePrefix = normalizeJobNamePrefix(options.jobNamePrefix ?? serviceAccountName);
|
||||
const lane = options.lane ?? process.env.AGENTRUN_LANE ?? "v0.1";
|
||||
const ttlSecondsAfterFinished = options.ttlSecondsAfterFinished ?? 86_400;
|
||||
const runnerIdleTimeoutMs = normalizeRunnerIdleTimeoutMs(options.runnerIdleTimeoutMs);
|
||||
const jobName = `agentrun-v01-runner-${shortDnsHash(options.run.id, attemptId)}`;
|
||||
const jobName = `${jobNamePrefix}-${shortDnsHash(options.run.id, attemptId)}`;
|
||||
const secretRefs = credentialProjections(options.run, namespace);
|
||||
const toolCredentials = toolCredentialProjections(options.run, namespace);
|
||||
const sessionPvc = options.sessionPvc;
|
||||
@@ -167,7 +171,7 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani
|
||||
metadata: {
|
||||
name: jobName,
|
||||
namespace,
|
||||
labels: labels(options.run, jobName),
|
||||
labels: labels(options.run, jobName, lane),
|
||||
annotations: {
|
||||
"agentrun.pikastech.local/run-id": options.run.id,
|
||||
"agentrun.pikastech.local/command-id": options.commandId,
|
||||
@@ -179,7 +183,7 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani
|
||||
ttlSecondsAfterFinished,
|
||||
template: {
|
||||
metadata: {
|
||||
labels: labels(options.run, jobName),
|
||||
labels: labels(options.run, jobName, lane),
|
||||
annotations: {
|
||||
"agentrun.pikastech.local/run-id": options.run.id,
|
||||
"agentrun.pikastech.local/command-id": options.commandId,
|
||||
@@ -483,17 +487,24 @@ function defaultRuntimeHome(profile: string): string {
|
||||
return `/home/agentrun/.codex-${sanitizeVolumeName(profile)}`;
|
||||
}
|
||||
|
||||
function labels(run: RunRecord, jobName: string): JsonRecord {
|
||||
function labels(run: RunRecord, jobName: string, lane: string): JsonRecord {
|
||||
return {
|
||||
"app.kubernetes.io/name": "agentrun-runner",
|
||||
"app.kubernetes.io/component": "runner",
|
||||
"app.kubernetes.io/part-of": "agentrun",
|
||||
"agentrun.pikastech.local/lane": "v0.1",
|
||||
"agentrun.pikastech.local/lane": lane,
|
||||
"agentrun.pikastech.local/run-hash": shortHash(run.id),
|
||||
"job-name": jobName,
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeJobNamePrefix(value: string): string {
|
||||
const normalized = value.toLowerCase().replace(/[^a-z0-9-]+/gu, "-").replace(/^-+|-+$/gu, "");
|
||||
if (!normalized) throw new Error("jobNamePrefix must contain at least one DNS label character");
|
||||
if (normalized.length > 46) return normalized.slice(0, 46).replace(/-+$/u, "") || "agentrun-runner";
|
||||
return normalized;
|
||||
}
|
||||
|
||||
function shortDnsHash(...parts: string[]): string {
|
||||
return shortHash(parts.join(":"));
|
||||
}
|
||||
|
||||
@@ -265,6 +265,118 @@ process.exit(1);
|
||||
} finally {
|
||||
await new Promise<void>((resolve) => serverWithKubectl.server.close(() => resolve()));
|
||||
}
|
||||
const retentionKubectl = path.join(context.tmp, "fake-kubectl-retention.js");
|
||||
const retentionCreatedManifest = path.join(context.tmp, "created-retention-runner-job.json");
|
||||
const retentionDeleted = path.join(context.tmp, "retention-deleted.json");
|
||||
await writeFile(retentionDeleted, "[]\n");
|
||||
await writeFile(retentionKubectl, `#!/usr/bin/env bun
|
||||
const args = Bun.argv.slice(2);
|
||||
const deletedPath = ${JSON.stringify(retentionDeleted)};
|
||||
async function readDeleted() {
|
||||
try { return JSON.parse(await Bun.file(deletedPath).text()); } catch { return []; }
|
||||
}
|
||||
async function writeDeleted(items) { await Bun.write(deletedPath, JSON.stringify(items, null, 2) + "\\n"); }
|
||||
async function readStdin() {
|
||||
const chunks = [];
|
||||
for await (const chunk of Bun.stdin.stream()) chunks.push(chunk);
|
||||
return Buffer.concat(chunks.map((chunk) => Buffer.from(chunk))).toString("utf8");
|
||||
}
|
||||
function runnerJob(name, createdAt, commandId) {
|
||||
return {
|
||||
apiVersion: "batch/v1",
|
||||
kind: "Job",
|
||||
metadata: {
|
||||
name,
|
||||
namespace: "agentrun-v02",
|
||||
creationTimestamp: createdAt,
|
||||
labels: { "app.kubernetes.io/part-of": "agentrun", "app.kubernetes.io/name": "agentrun-runner", "app.kubernetes.io/component": "runner", "agentrun.pikastech.local/lane": "v0.2", "job-name": name },
|
||||
annotations: { "agentrun.pikastech.local/run-id": "run-old", "agentrun.pikastech.local/command-id": commandId, "agentrun.pikastech.local/runner-id": "runner-old" },
|
||||
},
|
||||
status: {},
|
||||
};
|
||||
}
|
||||
if (args[0] === "get" && args[1] === "jobs") {
|
||||
const deleted = await readDeleted();
|
||||
const deletedJobs = new Set(deleted.filter((item) => item.resource === "job").map((item) => item.name));
|
||||
const items = [
|
||||
runnerJob("agentrun-v02-runner-old-a", "2026-01-01T00:00:00Z", "cmd-old-a"),
|
||||
runnerJob("agentrun-v02-runner-old-b", "2026-01-01T00:01:00Z", "cmd-old-b"),
|
||||
].filter((item) => !deletedJobs.has(item.metadata.name));
|
||||
console.log(JSON.stringify({ apiVersion: "v1", kind: "List", items }));
|
||||
process.exit(0);
|
||||
}
|
||||
if (args[0] === "get" && args[1] === "pods") {
|
||||
console.log(JSON.stringify({ apiVersion: "v1", kind: "List", items: [] }));
|
||||
process.exit(0);
|
||||
}
|
||||
if (args[0] === "delete") {
|
||||
const deleted = await readDeleted();
|
||||
const resource = args[1];
|
||||
const name = args[2] && !args[2].startsWith("-") ? args[2] : args.join(" ");
|
||||
deleted.push({ resource, name, args });
|
||||
await writeDeleted(deleted);
|
||||
console.log(String(resource) + "/" + String(name).replace(/[^a-z0-9-]+/g, "-") + " deleted");
|
||||
process.exit(0);
|
||||
}
|
||||
if (args[0] === "create") {
|
||||
const text = await readStdin();
|
||||
const manifest = JSON.parse(text);
|
||||
if (manifest.kind === "Job") await Bun.write(${JSON.stringify(retentionCreatedManifest)}, text);
|
||||
console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kind, metadata: { uid: "retention-job-uid", resourceVersion: "1", name: manifest.metadata.name, namespace: manifest.metadata.namespace } }));
|
||||
process.exit(0);
|
||||
}
|
||||
console.error("unsupported fake retention kubectl args: " + args.join(" "));
|
||||
process.exit(1);
|
||||
`);
|
||||
await chmod(retentionKubectl, 0o755);
|
||||
const serverWithRetention = await startManagerServer({
|
||||
port: 0,
|
||||
host: "127.0.0.1",
|
||||
sourceCommit: "self-test",
|
||||
store: new MemoryAgentRunStore(),
|
||||
runnerJobDefaults: {
|
||||
namespace: "agentrun-v02",
|
||||
managerUrl: "http://agentrun-mgr.agentrun-v02.svc.cluster.local:8080",
|
||||
image: "127.0.0.1:5000/agentrun/agentrun-mgr@sha256:1111111111111111111111111111111111111111111111111111111111111111",
|
||||
serviceAccountName: "agentrun-v02-runner",
|
||||
jobNamePrefix: "agentrun-v02-runner",
|
||||
lane: "v0.2",
|
||||
kubectlCommand: retentionKubectl,
|
||||
retention: {
|
||||
namespace: "agentrun-v02",
|
||||
maxRunners: 1,
|
||||
cleanupOrder: "oldest-inactive-last-active-first",
|
||||
activeHeartbeatMaxAgeMs: 900_000,
|
||||
matchLabels: { "app.kubernetes.io/part-of": "agentrun", "app.kubernetes.io/name": "agentrun-runner", "app.kubernetes.io/component": "runner" },
|
||||
jobNamePrefixes: ["agentrun-v02-runner"],
|
||||
ageBasedCleanup: { enabled: false, maxAgeHours: null },
|
||||
},
|
||||
},
|
||||
});
|
||||
try {
|
||||
const retentionClient = new ManagerClient(serverWithRetention.baseUrl);
|
||||
const retentionItem = await createRunWithCommand(retentionClient, context, "retention job create", "selftest-runner-retention", 15_000);
|
||||
const retentionCreated = await retentionClient.post(`/api/v1/runs/${retentionItem.runId}/runner-jobs`, { commandId: retentionItem.commandId, attemptId: "attempt_selftest_retention" }) as JsonRecord;
|
||||
const retention = (retentionCreated.retention as JsonRecord).preCreateCleanup as JsonRecord;
|
||||
assert.equal(retention.liveRunnerCountBefore, 2);
|
||||
assert.equal(retention.selectedRunnerCount, 2);
|
||||
assert.equal(retention.deletedRunnerJobCount, 2);
|
||||
assert.equal(retention.liveRunnerCountAfter, 0);
|
||||
assert.equal(retention.maxRunners, 1);
|
||||
assert.equal(retention.valuesPrinted, false);
|
||||
assert.match(String(retentionCreated.jobName), /^agentrun-v02-runner-[a-f0-9]{12}$/u);
|
||||
const retentionManifest = JSON.parse(await readFile(retentionCreatedManifest, "utf8")) as JsonRecord;
|
||||
const metadata = retentionManifest.metadata as JsonRecord;
|
||||
const labels = metadata.labels as JsonRecord;
|
||||
assert.equal(labels["agentrun.pikastech.local/lane"], "v0.2");
|
||||
assert.match(String(metadata.name), /^agentrun-v02-runner-[a-f0-9]{12}$/u);
|
||||
const deletedItems = JSON.parse(await readFile(retentionDeleted, "utf8")) as JsonRecord[];
|
||||
assert.equal(deletedItems.filter((item) => item.resource === "job").length, 2);
|
||||
assert.ok(deletedItems.some((item) => String((item.args as string[]).join(" ")).includes("agentrun.pikastech.local/runner-job=agentrun-v02-runner-old-a")), "old runner associated resources should be selected by runner-job label");
|
||||
assertNoSecretLeak(retentionCreated);
|
||||
} finally {
|
||||
await new Promise<void>((resolve) => serverWithRetention.server.close(() => resolve()));
|
||||
}
|
||||
const sessionRunRecord: RunRecord = {
|
||||
id: "run-selftest-session-pvc",
|
||||
tenantId: "unidesk",
|
||||
@@ -307,7 +419,7 @@ process.exit(1);
|
||||
assert.equal(envMap.get("AGENTRUN_SESSION_PVC_NAMESPACE"), "agentrun-v01");
|
||||
assert.equal(envMap.get("AGENTRUN_SESSION_PVC_MOUNT_PATH"), "/home/agentrun/.codex-codex/sessions");
|
||||
assert.equal(envMap.get("AGENTRUN_CODEX_ROLLOUT_SUBDIR"), "sessions");
|
||||
return { name: "runner-k8s-job", tests: ["runner-k8s-job-dry-run", "runner-k8s-job-codex-shell-sandbox-env", "runner-k8s-job-g14-egress-proxy-env", "runner-k8s-job-bounded-git-transport-env", "runner-k8s-job-deepseek-profile-dry-run", "runner-k8s-job-minimax-m3-profile-dry-run", "runner-k8s-job-dsflash-go-profile-dry-run", "runner-k8s-job-dsflash-go-legacy-secretref-normalized", "runner-k8s-job-create-api", "runner-k8s-job-retention-ttl", "runner-job-transient-env", "runner-job-transient-env-secretref", "runner-job-tool-credential-env", "runner-job-unidesk-ssh-tool-credential-env", "runner-job-tool-credential-volume", "runner-job-unidesk-ssh-endpoint-auto-env", "runner-job-unidesk-ssh-transient-env-denied", "runner-k8s-job-session-pvc-volume-and-env"] };
|
||||
return { name: "runner-k8s-job", tests: ["runner-k8s-job-dry-run", "runner-k8s-job-codex-shell-sandbox-env", "runner-k8s-job-g14-egress-proxy-env", "runner-k8s-job-bounded-git-transport-env", "runner-k8s-job-deepseek-profile-dry-run", "runner-k8s-job-minimax-m3-profile-dry-run", "runner-k8s-job-dsflash-go-profile-dry-run", "runner-k8s-job-dsflash-go-legacy-secretref-normalized", "runner-k8s-job-create-api", "runner-k8s-job-retention-ttl", "runner-job-transient-env", "runner-job-transient-env-secretref", "runner-job-tool-credential-env", "runner-job-unidesk-ssh-tool-credential-env", "runner-job-tool-credential-volume", "runner-job-unidesk-ssh-endpoint-auto-env", "runner-job-unidesk-ssh-transient-env-denied", "runner-job-pre-create-retention", "runner-k8s-job-session-pvc-volume-and-env"] };
|
||||
} finally {
|
||||
await new Promise<void>((resolve) => server.server.close(() => resolve()));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user