fix: add AgentRun runner retention cleanup

This commit is contained in:
Codex
2026-06-18 14:26:43 +00:00
parent dc19a07478
commit 424f7fe492
5 changed files with 607 additions and 2 deletions
+29
View File
@@ -154,6 +154,20 @@ controlPlane:
apiKeySecretRef:
name: agentrun-v01-api-key
key: HWLAB_API_KEY
retention:
maxRunners: 20
cleanupOrder: oldest-inactive-last-active-first
activeHeartbeatMaxAgeMs: 900000
selectors:
matchLabels:
app.kubernetes.io/part-of: agentrun
app.kubernetes.io/name: agentrun-runner
app.kubernetes.io/component: runner
jobNamePrefixes:
- agentrun-v01-runner
ageBasedCleanup:
enabled: false
maxAgeHours: 48
localPostgres:
enabled: true
serviceName: agentrun-v01-postgres
@@ -311,6 +325,21 @@ controlPlane:
- sub2api-egress-proxy.platform-infra
- sub2api-egress-proxy.platform-infra.svc
- sub2api-egress-proxy.platform-infra.svc.cluster.local
retention:
maxRunners: 20
cleanupOrder: oldest-inactive-last-active-first
activeHeartbeatMaxAgeMs: 900000
selectors:
matchLabels:
app.kubernetes.io/part-of: agentrun
app.kubernetes.io/name: agentrun-runner
app.kubernetes.io/component: runner
jobNamePrefixes:
- agentrun-v02-runner
- agentrun-v01-runner
ageBasedCleanup:
enabled: false
maxAgeHours: 48
localPostgres:
enabled: false
gitMirror:
+5 -1
View File
@@ -71,6 +71,8 @@ bun scripts/cli.ts agentrun control-plane trigger-current --dry-run
bun scripts/cli.ts agentrun control-plane trigger-current --confirm
bun scripts/cli.ts agentrun control-plane refresh --dry-run
bun scripts/cli.ts agentrun control-plane refresh --confirm
bun scripts/cli.ts agentrun control-plane cleanup-runners --node D601 --lane v02 --dry-run
bun scripts/cli.ts agentrun control-plane cleanup-runners --node D601 --lane v02 --confirm
bun scripts/cli.ts agentrun control-plane cleanup-runs --min-age-minutes 30 --limit 200 --dry-run
bun scripts/cli.ts agentrun control-plane cleanup-runs --min-age-minutes 30 --limit 200 --confirm
bun scripts/cli.ts agentrun control-plane cleanup-released-pvs --limit 200 --dry-run
@@ -102,7 +104,9 @@ Provider credential Secret 的 `auth.json` 和 `config.toml` 也必须按 lane
AgentRun resource/session client policy 也由 `config/agentrun.yaml` 声明。`client.sessionPolicy` 是 `agentrun send session/...` 和相关 session payload 生成的默认 `tenantId`、`projectId`、`providerId`、`backendProfile`、`workspaceRef` 和 execution policy 来源;lane `secrets[].providerCredential.profile` 声明 provider credential Secret 归属,UniDesk CLI 只按 YAML 聚合 Secret name/key,不再用代码拼接 provider Secret 名称。只读入口 `bun scripts/cli.ts agentrun explain session-policy` 用于查看当前默认 lane、session policy、实际 executionPolicy payload 和 provider credential binding 来源;输出只能包含 Secret metadata、key 名和 `valuesPrinted=false`,不得打印 Secret value。
`cleanup-runs` 是 AgentRun `v0.1` 完成态 CI workspace retention 入口,只清理 `agentrun-ci` namespace 中超过 `--min-age-minutes` 的 `agentrun-v01-ci-*` PipelineRun,通过 Tekton ownerRef 释放临时 workspace PVC。dry-run 必须披露候选 PipelineRun、owned PVC、active mount 保护、local-path 实际估算 bytes 和 confirm 命令。默认保护最新完成的 PipelineRun,保留当前 CI/CD 状态证据。`cleanup-released-pvs` 是二次回收入口,只处理 `agentrun-ci`、`local-path`、`Delete` reclaim policy 的 `Released` PV;它不触碰 `agentrun-v01` runtime namespace、业务 PVC、Secret、registry storage 或 GitOps desired state。磁盘治理和 G14 safe-stop 规则见 `docs/reference/gc.md`。
`cleanup-runners` 是 AgentRun runtime runner retention 入口,只清理 YAML 选中 lane 的 runtime namespace 中匹配 `deployment.runner.retention.selectors` 的 runner Job/Pod。runner 上限、最后活跃排序策略、active heartbeat 窗口、Job name prefix 和是否启用 age-based cleanup 都以 `config/agentrun.yaml` 为唯一真相;命令行不得覆盖这些数值。dry-run 必须披露清理前 runner Job 数、runner 非终态 Pod 数、按最后活跃时间排序的 inactive 候选、selected runner Job、manager facts 可用性和 active run 风险;confirm 只能删除 selected runner Job,并重新统计清理后 runner Job/Pod 数。manager facts 不可用时,只允许清理终态或无活动 Pod 的安全候选,并保留风险字段,不能把 Kubernetes 创建时间冒充为完整最后活跃事实。
`cleanup-runs` 是 AgentRun `v0.1` 完成态 CI workspace retention 入口,只清理 `agentrun-ci` namespace 中超过 `--min-age-minutes` 的 `agentrun-v01-ci-*` PipelineRun,通过 Tekton ownerRef 释放临时 workspace PVC。dry-run 必须披露候选 PipelineRun、owned PVC、active mount 保护、local-path 实际估算 bytes 和 confirm 命令。默认保护最新完成的 PipelineRun,保留当前 CI/CD 状态证据。`cleanup-released-pvs` 是二次回收入口,只处理 `agentrun-ci`、`local-path`、`Delete` reclaim policy 的 `Released` PV;它不触碰 AgentRun runtime namespace、业务 PVC、Secret、registry storage 或 GitOps desired state。磁盘治理和 G14 safe-stop 规则见 `docs/reference/gc.md`。
Runner 持久化、空闲退出窗口和 session PVC 相关运维参数的唯一归属是 UniDesk `config/agentrun.yaml` 中目标 lane 的 `deployment.runner.*` 配置;不要在 HWLAB 仓库新增运维 YAML,也不要让 AgentRun service repo 的 `deploy.json` 或 Kubernetes runtime 状态反向成为配置真相。Manager Deployment 需要把 YAML 渲染为 `AGENTRUN_RUNNER_IDLE_TIMEOUT_MS` 等 manager env,但这只能证明控制面配置已到 manager;关闭 HWLAB/AgentRun 长会话或 runner 持久化问题前,还必须通过原入口创建新 turn,并检查新建 runner Job 的 env、session PVC 和 `AGENTRUN_SOURCE_COMMIT`。AgentRun manager 内所有创建 runner Job 的路径,包括 `/api/v1/runs/:runId/runner-jobs`、session send 和 queue dispatch,都必须复用同一 runner defaults helper;新增 `deployment.runner.*` 字段时禁止在某条 route 手写一份 defaults。
+72
View File
@@ -106,6 +106,7 @@ export interface AgentRunLaneSpec {
readonly apiKeySecretRef: { readonly name: string; readonly key: string };
readonly egressProxyUrl: string | null;
readonly noProxyExtra: readonly string[];
readonly retention: AgentRunRunnerRetentionSpec;
};
readonly localPostgres: {
readonly enabled: boolean;
@@ -174,6 +175,20 @@ export interface AgentRunImageBuildSpec {
readonly pollSeconds: number;
}
/**
 * Retention policy for runtime runner Jobs of one lane, parsed from the
 * `deployment.runner.retention` section of the lane configuration.
 */
export interface AgentRunRunnerRetentionSpec {
/** Positive integer; the cleanup planner selects inactive runner Jobs only for the count exceeding this limit. */
readonly maxRunners: number;
/** Ordering policy for cleanup candidates; a single literal value is currently accepted. */
readonly cleanupOrder: "oldest-inactive-last-active-first";
/** Positive integer (milliseconds); a runner whose heartbeat is at most this old is treated as active by the planner. */
readonly activeHeartbeatMaxAgeMs: number;
/** Which Jobs in the runtime namespace count as runner Jobs. */
readonly selectors: {
/** Labels a Job must carry (every entry must match exactly); at least one label is required by the parser. */
readonly matchLabels: Readonly<Record<string, string>>;
/** Job name prefixes; the planner matches a Job whose name equals a prefix or starts with the prefix followed by "-". */
readonly jobNamePrefixes: readonly string[];
};
/** Optional additional cleanup of inactive runners by last-active age. */
readonly ageBasedCleanup: {
/** Whether age-based selection is switched on. */
readonly enabled: boolean;
/** Age threshold in hours, or null when the configuration does not set one. */
readonly maxAgeHours: number | null;
};
}
export interface AgentRunLaneTarget {
readonly configPath: string;
readonly spec: AgentRunLaneSpec;
@@ -289,6 +304,7 @@ export function agentRunLaneSummary(spec: AgentRunLaneSpec): Record<string, unkn
apiKeySecretRef: spec.deployment.runner.apiKeySecretRef,
egressProxyUrl: spec.deployment.runner.egressProxyUrl,
noProxyExtra: spec.deployment.runner.noProxyExtra,
retention: spec.deployment.runner.retention,
},
localPostgres: spec.deployment.localPostgres,
},
@@ -525,17 +541,46 @@ function parseDeployment(input: Record<string, unknown>, path: string): AgentRun
apiKeySecretRef: parseSecretRef(recordField(runner, "apiKeySecretRef", `${path}.runner`), `${path}.runner.apiKeySecretRef`),
egressProxyUrl: optionalStringField(runner, "egressProxyUrl", `${path}.runner`) ?? null,
noProxyExtra: optionalStringArrayField(runner, "noProxyExtra", `${path}.runner`),
retention: parseRunnerRetention(recordField(runner, "retention", `${path}.runner`), `${path}.runner.retention`),
},
localPostgres: parseLocalPostgres(localPostgres, `${path}.localPostgres`),
};
}
/**
 * Parses and validates the `retention` section of a lane's runner deployment config.
 *
 * @param input - The raw `retention` record read from the configuration.
 * @param path - Dotted configuration path of `input`, used as the prefix of error messages.
 * @returns The validated retention spec; `ageBasedCleanup.maxAgeHours` is null when not configured.
 * @throws Error when a field is missing or malformed, or when age-based cleanup is enabled
 *   without a `maxAgeHours` threshold.
 */
function parseRunnerRetention(input: Record<string, unknown>, path: string): AgentRunRunnerRetentionSpec {
  const selectors = recordField(input, "selectors", path);
  const ageBasedCleanup = recordField(input, "ageBasedCleanup", path);
  const retention: AgentRunRunnerRetentionSpec = {
    maxRunners: positiveIntegerField(input, "maxRunners", path),
    cleanupOrder: enumField(input, "cleanupOrder", path, ["oldest-inactive-last-active-first"]),
    activeHeartbeatMaxAgeMs: positiveIntegerField(input, "activeHeartbeatMaxAgeMs", path),
    selectors: {
      matchLabels: kubernetesLabelRecordField(recordField(selectors, "matchLabels", `${path}.selectors`), `${path}.selectors.matchLabels`),
      jobNamePrefixes: stringArrayField(selectors, "jobNamePrefixes", `${path}.selectors`).map((prefix, index) => {
        validateKubernetesNamePrefix(prefix, `${path}.selectors.jobNamePrefixes[${index}]`);
        return prefix;
      }),
    },
    ageBasedCleanup: {
      enabled: booleanField(ageBasedCleanup, "enabled", `${path}.ageBasedCleanup`),
      maxAgeHours: optionalPositiveIntegerField(ageBasedCleanup, "maxAgeHours", `${path}.ageBasedCleanup`) ?? null,
    },
  };
  // The cleanup planner only applies age-based selection when a max age is present, so
  // "enabled: true" without a threshold would silently do nothing. Reject it up front.
  if (retention.ageBasedCleanup.enabled && retention.ageBasedCleanup.maxAgeHours === null) {
    throw new Error(`${path}.ageBasedCleanup.maxAgeHours is required when ${path}.ageBasedCleanup.enabled is true`);
  }
  return retention;
}
/**
 * Reads a required integer field and insists that it is strictly greater than zero.
 *
 * @param input - Record that holds the field.
 * @param key - Name of the field inside `input`.
 * @param path - Dotted configuration path of `input`, used in error messages.
 * @returns The integer value of the field.
 * @throws Error when the value is zero or negative (other failures come from `integerField`).
 */
function positiveIntegerField(input: Record<string, unknown>, key: string, path: string): number {
  const parsed = integerField(input, key, path);
  if (parsed <= 0) {
    throw new Error(`${path}.${key} must be a positive integer`);
  }
  return parsed;
}
/**
 * Reads an optional integer field; when present it must be strictly greater than zero.
 *
 * @param input - Record that may hold the field.
 * @param key - Name of the field inside `input`.
 * @param path - Dotted configuration path of `input`, used in error messages.
 * @returns The integer value, or `undefined` when `optionalIntegerField` reports the field as absent.
 * @throws Error when the field is set to zero or a negative integer.
 */
function optionalPositiveIntegerField(input: Record<string, unknown>, key: string, path: string): number | undefined {
  const parsed = optionalIntegerField(input, key, path);
  if (parsed !== undefined && parsed <= 0) {
    throw new Error(`${path}.${key} must be a positive integer when set`);
  }
  return parsed;
}
function parseLocalPostgres(input: Record<string, unknown>, path: string): AgentRunLaneSpec["deployment"]["localPostgres"] {
const enabled = booleanField(input, "enabled", path);
if (!enabled) {
@@ -690,6 +735,19 @@ function stringRecordField(obj: Record<string, unknown>, path: string): Readonly
return result;
}
/**
 * Converts a raw record into a validated, non-empty map of Kubernetes labels.
 *
 * Each key is checked as a label key; each value must be a non-blank string, is
 * trimmed, and is then checked as a label value.
 *
 * @param obj - Raw label record from the configuration.
 * @param path - Dotted configuration path of `obj`, used in error messages.
 * @returns A new record holding the trimmed label values.
 * @throws Error when a key or value is invalid, or when no label is declared.
 */
function kubernetesLabelRecordField(obj: Record<string, unknown>, path: string): Readonly<Record<string, string>> {
  const labels: Record<string, string> = {};
  const entries = Object.entries(obj);
  for (const [key, raw] of entries) {
    validateKubernetesLabelKey(key, `${path}.${key}`);
    // Non-string values are folded into the "empty" case so both share one error.
    const normalized = typeof raw === "string" ? raw.trim() : "";
    if (normalized.length === 0) throw new Error(`${path}.${key} must be a non-empty string`);
    validateKubernetesLabelValue(normalized, `${path}.${key}`);
    labels[key] = normalized;
  }
  if (entries.length === 0) throw new Error(`${path} must declare at least one label`);
  return labels;
}
function enumField<T extends string>(obj: Record<string, unknown>, key: string, path: string, values: readonly T[]): T {
const value = stringField(obj, key, path);
if (!values.includes(value as T)) throw new Error(`${path}.${key} must be one of ${values.join(", ")}`);
@@ -738,3 +796,17 @@ function urlField(obj: Record<string, unknown>, key: string, path: string): stri
/**
 * Ensures `value` consists only of ASCII letters, digits, ".", "_" and "-".
 *
 * @param value - Identifier to check.
 * @param path - Configuration path used as the prefix of the error message
 *   (the offending value is appended after a dot).
 * @throws Error when `value` is empty or contains any other character.
 */
function validateSimpleId(value: string, path: string): void {
  const simpleIdPattern = /^[A-Za-z0-9._-]+$/u;
  if (simpleIdPattern.test(value)) return;
  throw new Error(`${path}.${value} must use a simple id`);
}
/**
 * Ensures `value` can serve as a Kubernetes object name prefix: lowercase
 * alphanumerics and "-", starting and ending with an alphanumeric character.
 *
 * @param value - Prefix to check.
 * @param path - Configuration path reported in the error message.
 * @throws Error when `value` does not match that shape.
 */
function validateKubernetesNamePrefix(value: string, path: string): void {
  const namePrefixPattern = /^[a-z0-9]([-a-z0-9]*[a-z0-9])?$/u;
  if (namePrefixPattern.test(value)) return;
  throw new Error(`${path} must be a Kubernetes name prefix`);
}
/**
 * Ensures `value` is a syntactically valid Kubernetes label key.
 *
 * A key is an optional prefix and a name separated by a single "/". The name
 * must be 1-63 characters, start and end with an alphanumeric character, and
 * may contain "-", "_" and "." in between. When a prefix is present it must be
 * a non-empty DNS subdomain (lowercase labels joined by ".") of at most 253
 * characters.
 *
 * @param value - Label key to check.
 * @param path - Configuration path reported in the error message.
 * @throws Error when the key has more than one "/", or the name or prefix is invalid.
 */
function validateKubernetesLabelKey(value: string, path: string): void {
  const message = `${path} must be a Kubernetes label key`;
  const parts = value.split("/");
  if (parts.length > 2) throw new Error(message);
  const hasPrefix = parts.length === 2;
  const name = hasPrefix ? parts[1] : parts[0];
  if (!name || !/^[A-Za-z0-9]([A-Za-z0-9_.-]{0,61}[A-Za-z0-9])?$/u.test(name)) throw new Error(message);
  if (hasPrefix) {
    // Previously the prefix was ignored, so keys such as "/name" slipped through.
    const prefix = parts[0] ?? "";
    const dnsSubdomainPattern = /^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$/u;
    if (prefix.length === 0 || prefix.length > 253 || !dnsSubdomainPattern.test(prefix)) throw new Error(message);
  }
}
/**
 * Ensures `value` is a non-empty Kubernetes label value: 1-63 characters,
 * alphanumeric at both ends, with "-", "_" and "." allowed in between.
 *
 * @param value - Label value to check.
 * @param path - Configuration path reported in the error message.
 * @throws Error when `value` does not match that shape (including the empty string).
 */
function validateKubernetesLabelValue(value: string, path: string): void {
  const labelValuePattern = /^[A-Za-z0-9]([A-Za-z0-9_.-]{0,61}[A-Za-z0-9])?$/u;
  if (labelValuePattern.test(value)) return;
  throw new Error(`${path} must be a Kubernetes label value`);
}
+7
View File
@@ -438,6 +438,13 @@ function managerEnv(spec: AgentRunLaneSpec, sourceCommit: string, imageRef: stri
{ name: "AGENTRUN_RUNNER_IMAGE", value: imageRef },
{ name: "AGENTRUN_RUNNER_SERVICE_ACCOUNT", value: spec.deployment.runner.serviceAccount },
{ name: "AGENTRUN_RUNNER_IDLE_TIMEOUT_MS", value: String(spec.deployment.runner.idleTimeoutMs) },
{ name: "AGENTRUN_RUNNER_RETENTION_MAX_RUNNERS", value: String(spec.deployment.runner.retention.maxRunners) },
{ name: "AGENTRUN_RUNNER_RETENTION_CLEANUP_ORDER", value: spec.deployment.runner.retention.cleanupOrder },
{ name: "AGENTRUN_RUNNER_RETENTION_ACTIVE_HEARTBEAT_MAX_AGE_MS", value: String(spec.deployment.runner.retention.activeHeartbeatMaxAgeMs) },
{ name: "AGENTRUN_RUNNER_RETENTION_MATCH_LABELS_JSON", value: JSON.stringify(spec.deployment.runner.retention.selectors.matchLabels) },
{ name: "AGENTRUN_RUNNER_RETENTION_JOB_NAME_PREFIXES", value: spec.deployment.runner.retention.selectors.jobNamePrefixes.join(",") },
{ name: "AGENTRUN_RUNNER_RETENTION_AGE_BASED_CLEANUP_ENABLED", value: String(spec.deployment.runner.retention.ageBasedCleanup.enabled) },
...(spec.deployment.runner.retention.ageBasedCleanup.maxAgeHours === null ? [] : [{ name: "AGENTRUN_RUNNER_RETENTION_AGE_BASED_MAX_AGE_HOURS", value: String(spec.deployment.runner.retention.ageBasedCleanup.maxAgeHours) }]),
...(spec.deployment.runner.egressProxyUrl === null ? [] : [{ name: "AGENTRUN_RUNNER_EGRESS_PROXY_URL", value: spec.deployment.runner.egressProxyUrl }]),
...(spec.deployment.runner.noProxyExtra.length === 0 ? [] : [{ name: "AGENTRUN_RUNNER_NO_PROXY_EXTRA", value: spec.deployment.runner.noProxyExtra.join(",") }]),
{ name: "AGENTRUN_API_KEY", valueFrom: { secretKeyRef: spec.deployment.manager.apiKeySecretRef } },
+494 -1
View File
@@ -65,6 +65,8 @@ export function agentRunHelp(): unknown {
"bun scripts/cli.ts agentrun control-plane trigger-current --confirm",
"bun scripts/cli.ts agentrun control-plane refresh --dry-run",
"bun scripts/cli.ts agentrun control-plane refresh --confirm",
"bun scripts/cli.ts agentrun control-plane cleanup-runners --node D601 --lane v02 --dry-run",
"bun scripts/cli.ts agentrun control-plane cleanup-runners --node D601 --lane v02 --confirm",
"bun scripts/cli.ts agentrun control-plane cleanup-runs --min-age-minutes 30 --limit 200 --dry-run",
"bun scripts/cli.ts agentrun control-plane cleanup-runs --min-age-minutes 30 --limit 200 --confirm",
"bun scripts/cli.ts agentrun control-plane cleanup-released-pvs --limit 200 --dry-run",
@@ -99,6 +101,7 @@ export async function runAgentRunCommand(config: UniDeskConfig | null, args: str
if (action === "expose") return await exposeAgentRun(config, parseConfirmOptions(actionArgs));
if (action === "trigger-current") return await triggerCurrent(config, parseTriggerOptions(actionArgs));
if (action === "refresh") return await refresh(config, parseRefreshOptions(actionArgs));
if (action === "cleanup-runners") return await cleanupRunners(config, parseCleanupRunnersOptions(actionArgs));
if (action === "cleanup-runs") return await cleanupRuns(config, parseCleanupRunsOptions(actionArgs));
if (action === "cleanup-released-pvs") return await cleanupReleasedPvs(config, parseCleanupReleasedPvOptions(actionArgs));
}
@@ -227,7 +230,7 @@ function agentRunHelpText(args: string[]): string {
return [
"Usage: bun scripts/cli.ts agentrun control-plane <action> [options]",
"",
"Actions: plan, apply, status, secret-sync, expose, trigger-current, refresh, cleanup-runs, cleanup-released-pvs",
"Actions: plan, apply, status, secret-sync, expose, trigger-current, refresh, cleanup-runners, cleanup-runs, cleanup-released-pvs",
"Examples:",
" bun scripts/cli.ts agentrun control-plane plan --node D601 --lane v02",
" bun scripts/cli.ts agentrun control-plane apply --node D601 --lane v02 --dry-run",
@@ -238,6 +241,7 @@ function agentRunHelpText(args: string[]): string {
" bun scripts/cli.ts agentrun control-plane status --pipeline-run agentrun-vNN-ci-<short-sha>",
" bun scripts/cli.ts agentrun control-plane expose --dry-run",
" bun scripts/cli.ts agentrun control-plane trigger-current --dry-run",
" bun scripts/cli.ts agentrun control-plane cleanup-runners --node D601 --lane v02 --dry-run",
" bun scripts/cli.ts agentrun control-plane cleanup-runs --min-age-minutes 30 --limit 200 --dry-run",
].join("\n");
}
@@ -1669,6 +1673,12 @@ interface GitMirrorOptions extends ConfirmOptions {
wait: boolean;
}
/** Parsed command-line options of `agentrun control-plane cleanup-runners`. */
interface CleanupRunnersOptions extends ConfirmOptions {
/** Value of `--node`, or null when the flag is not given. */
node: string | null;
/** Value of `--lane`, or null when the flag is not given. */
lane: string | null;
/** Value of `--timeout-seconds`; used as the `kubectl delete job --timeout` (in seconds) during a confirmed cleanup. */
timeoutSeconds: number;
}
interface CleanupRunsOptions extends ConfirmOptions {
node: string | null;
lane: string | null;
@@ -1924,6 +1934,17 @@ function parseGitMirrorOptions(args: string[]): GitMirrorOptions {
};
}
/**
 * Parses the arguments of `agentrun control-plane cleanup-runners`.
 *
 * Only `--confirm` / `--dry-run` flags and the `--timeout-seconds`, `--node`
 * and `--lane` value options are accepted. Retention limits are deliberately
 * not exposed here.
 *
 * @param args - Raw arguments following the action name.
 * @returns The confirm/dry-run options extended with node, lane and timeout.
 */
function parseCleanupRunnersOptions(args: string[]): CleanupRunnersOptions {
  const flagOptions = new Set(["--confirm", "--dry-run"]);
  const valueOptions = new Set(["--timeout-seconds", "--node", "--lane"]);
  validateOptions(args, flagOptions, valueOptions);
  const confirmOptions = parseConfirmOptions(args);
  const node = optionValue(args, "--node") ?? null;
  const lane = optionValue(args, "--lane") ?? null;
  const timeoutSeconds = positiveIntegerOption(args, "--timeout-seconds", 180, 600);
  return { ...confirmOptions, node, lane, timeoutSeconds };
}
function parseCleanupRunsOptions(args: string[]): CleanupRunsOptions {
validateOptions(args, new Set(["--confirm", "--dry-run"]), new Set(["--min-age-minutes", "--limit", "--timeout-seconds", "--node", "--lane"]));
const base = parseConfirmOptions(args);
@@ -2882,6 +2903,43 @@ async function refreshYamlLane(config: UniDeskConfig, options: RefreshOptions):
};
}
/**
 * Runs the runner retention cleanup for the lane selected by `options` and
 * returns a report of what was planned or deleted.
 *
 * The generated shell script is executed through the lane's kube route. Its
 * JSON output is merged into the report together with the lane summary and
 * the retention policy that was applied. Anything other than an explicit
 * `--confirm` without `--dry-run` is reported as a dry run.
 *
 * @param config - UniDesk configuration handed to `capture`.
 * @param options - Parsed `cleanup-runners` options.
 * @returns The report; `next.confirm` is set for dry runs, `followUp` after a confirmed cleanup.
 */
async function cleanupRunners(config: UniDeskConfig, options: CleanupRunnersOptions): Promise<Record<string, unknown>> {
  const { configPath, spec } = resolveAgentRunLaneTarget(options);
  const script = cleanupRunnersScript(options, spec);
  const result = await capture(config, spec.nodeKubeRoute, ["sh", "--", script]);
  const payload = captureJsonPayload(result);
  const isDryRun = options.dryRun || !options.confirm;
  const succeeded = result.exitCode === 0 && payload.ok !== false;
  const report = {
    ...payload,
    ok: succeeded,
    command: "agentrun control-plane cleanup-runners",
    configPath,
    target: agentRunLaneSummary(spec),
    mode: isDryRun ? "dry-run" : "confirmed-cleanup",
    namespace: spec.runtime.namespace,
    retention: spec.deployment.runner.retention,
    probe: compactCapture(result, { full: result.exitCode !== 0, stdoutTailChars: 3000, stderrTailChars: 3000 }),
  };
  if (!isDryRun) {
    return {
      ...report,
      dryRun: false,
      mutation: true,
      followUp: {
        dryRun: `bun scripts/cli.ts agentrun control-plane cleanup-runners --node ${spec.nodeId} --lane ${spec.lane} --dry-run`,
        status: `bun scripts/cli.ts agentrun control-plane status --node ${spec.nodeId} --lane ${spec.lane}`,
      },
    };
  }
  return {
    ...report,
    dryRun: true,
    mutation: false,
    next: {
      confirm: `bun scripts/cli.ts agentrun control-plane cleanup-runners --node ${spec.nodeId} --lane ${spec.lane} --confirm`,
    },
  };
}
async function cleanupRuns(config: UniDeskConfig, options: CleanupRunsOptions): Promise<Record<string, unknown>> {
const { configPath, spec } = resolveAgentRunLaneTarget(options);
const result = await capture(config, spec.nodeKubeRoute, ["sh", "--", cleanupRunsScript(options, spec.ci.namespace, spec.ci.pipelineRunPrefix)]);
@@ -2958,6 +3016,61 @@ async function cleanupReleasedPvs(config: UniDeskConfig, options: CleanupRelease
};
}
/**
 * Builds the POSIX shell script that plans and, when confirmed, performs the runner retention cleanup.
 *
 * The script lists Jobs and Pods of the runtime namespace, asks the manager
 * deployment for runner facts, computes a plan with `node`, and either prints
 * the plan (dry run) or deletes the selected Jobs and prints a final report.
 *
 * @param options - Parsed `cleanup-runners` options (confirm/dry-run and delete timeout).
 * @param spec - Lane spec providing the runtime namespace, manager deployment and retention policy.
 * @returns The script text, one array entry per line joined with newlines.
 */
function cleanupRunnersScript(options: CleanupRunnersOptions, spec: AgentRunLaneSpec): string {
const retention = spec.deployment.runner.retention;
// The selector JSON is carried as base64 so it needs no further shell escaping.
const matchLabelsB64 = Buffer.from(JSON.stringify(retention.selectors.matchLabels), "utf8").toString("base64");
const jobNamePrefixesB64 = Buffer.from(JSON.stringify(retention.selectors.jobNamePrefixes), "utf8").toString("base64");
return [
"set -eu",
// Shell variables carrying the lane target and the retention policy.
`namespace=${shQuote(spec.runtime.namespace)}`,
`manager_deployment=${shQuote(spec.runtime.managerDeployment)}`,
`max_runners=${String(retention.maxRunners)}`,
`cleanup_order=${shQuote(retention.cleanupOrder)}`,
`active_heartbeat_max_age_ms=${String(retention.activeHeartbeatMaxAgeMs)}`,
`age_based_cleanup_enabled=${retention.ageBasedCleanup.enabled ? "true" : "false"}`,
`age_based_max_age_hours=${retention.ageBasedCleanup.maxAgeHours === null ? "" : String(retention.ageBasedCleanup.maxAgeHours)}`,
`timeout_seconds=${String(options.timeoutSeconds)}`,
`match_labels_json_b64=${shQuote(matchLabelsB64)}`,
`job_name_prefixes_json_b64=${shQuote(jobNamePrefixesB64)}`,
"match_labels_json=$(printf '%s' \"$match_labels_json_b64\" | base64 -d)",
"job_name_prefixes_json=$(printf '%s' \"$job_name_prefixes_json_b64\" | base64 -d)",
// Scratch directory for intermediate JSON files; removed when the script exits.
"tmp_dir=$(mktemp -d)",
"trap 'rm -rf \"$tmp_dir\"' EXIT",
// Snapshot of all Jobs and Pods in the namespace before any deletion.
"kubectl -n \"$namespace\" get job -o json > \"$tmp_dir/jobs.json\"",
"kubectl -n \"$namespace\" get pod -o json > \"$tmp_dir/pods.json\"",
// Collect runner facts inside the manager deployment: the facts script is streamed over stdin,
// written to /tmp and executed with bun. "set +e" lets a failing exec be recorded instead of aborting.
"facts_exit=0",
"set +e",
"kubectl -n \"$namespace\" exec -i deploy/\"$manager_deployment\" -- env RETENTION_NAMESPACE=\"$namespace\" sh -lc 'cat >/tmp/agentrun-runner-retention.mjs && bun /tmp/agentrun-runner-retention.mjs' > \"$tmp_dir/runner-facts.json\" 2> \"$tmp_dir/runner-facts.err\" <<'NODE'",
cleanupRunnersFactsNodeScript(),
"NODE",
"facts_exit=$?",
"set -e",
// When the exec itself failed, replace the facts file with an explicit "unavailable" stub.
"if [ \"$facts_exit\" -ne 0 ]; then",
"  printf '%s\\n' '{\"ok\":false,\"items\":[],\"failureKind\":\"manager-facts-unavailable\",\"valuesPrinted\":false}' > \"$tmp_dir/runner-facts.json\"",
"fi",
// Compute the retention plan with node; the policy and file locations are passed via environment variables.
"MATCH_LABELS_JSON=\"$match_labels_json\" JOB_NAME_PREFIXES_JSON=\"$job_name_prefixes_json\" MAX_RUNNERS=\"$max_runners\" CLEANUP_ORDER=\"$cleanup_order\" ACTIVE_HEARTBEAT_MAX_AGE_MS=\"$active_heartbeat_max_age_ms\" AGE_BASED_CLEANUP_ENABLED=\"$age_based_cleanup_enabled\" AGE_BASED_MAX_AGE_HOURS=\"$age_based_max_age_hours\" FACTS_EXIT=\"$facts_exit\" TMP_DIR=\"$tmp_dir\" NAMESPACE=\"$namespace\" node <<'NODE' > \"$tmp_dir/plan.json\"",
cleanupRunnersPlanNodeScript(),
"NODE",
// Anything other than an explicit confirm without dry-run prints the plan and stops here.
"if [ " + shQuote(options.confirm && !options.dryRun ? "true" : "false") + " != true ]; then",
"  cat \"$tmp_dir/plan.json\"",
"  exit 0",
"fi",
// Write the selected Job names, one per line, for xargs.
"node -e 'const fs=require(\"node:fs\"); const plan=JSON.parse(fs.readFileSync(process.argv[1],\"utf8\")); const names=Array.isArray(plan.selectedRunnerJobs)?plan.selectedRunnerJobs:[]; fs.writeFileSync(process.argv[2], names.join(\"\\n\") + (names.length>0?\"\\n\":\"\"));' \"$tmp_dir/plan.json\" \"$tmp_dir/selected-jobs.txt\"",
// Delete only the selected Jobs; a failing delete is recorded in delete_exit rather than aborting the script.
"delete_exit=0",
"if [ -s \"$tmp_dir/selected-jobs.txt\" ]; then",
"  xargs -r kubectl -n \"$namespace\" delete job --ignore-not-found=true --wait=true --timeout=\"${timeout_seconds}s\" < \"$tmp_dir/selected-jobs.txt\" > \"$tmp_dir/delete.out\" 2> \"$tmp_dir/delete.err\" || delete_exit=$?",
"else",
"  : > \"$tmp_dir/delete.out\"",
"  : > \"$tmp_dir/delete.err\"",
"fi",
// Snapshot Jobs and Pods again and let the finalize script produce the report.
"kubectl -n \"$namespace\" get job -o json > \"$tmp_dir/jobs-after.json\"",
"kubectl -n \"$namespace\" get pod -o json > \"$tmp_dir/pods-after.json\"",
"DELETE_EXIT=\"$delete_exit\" TMP_DIR=\"$tmp_dir\" node <<'NODE'",
cleanupRunnersFinalizeNodeScript(),
"NODE",
].join("\n");
}
function cleanupRunsScript(options: CleanupRunsOptions, namespace: string, pipelineRunPrefix: string): string {
return [
"set -eu",
@@ -4085,6 +4198,386 @@ function manifestObjectRef(object: Record<string, unknown>): Record<string, unkn
};
}
/**
 * Returns the JavaScript source of the "runner facts" probe.
 *
 * The script queries `agentrun_runner_jobs` (joined with runners and runs) for
 * the namespace given by `RETENTION_NAMESPACE` (falling back to
 * `AGENTRUN_RUNTIME_NAMESPACE`) using the `DATABASE_URL` connection string,
 * and prints exactly one JSON document: `ok: true` with the fact items, or
 * `ok: false` with `failureKind: "manager-runner-facts-query-failed"`.
 *
 * The connection pool is closed in a `finally` block so that a failing query
 * does not leave open connections behind.
 *
 * @returns The script text (uses top-level `await`).
 */
function cleanupRunnersFactsNodeScript(): string {
  return String.raw`
function iso(value) {
  if (value instanceof Date) return value.toISOString();
  if (typeof value === "string" && value.length > 0) {
    const ms = Date.parse(value);
    return Number.isFinite(ms) ? new Date(ms).toISOString() : null;
  }
  return null;
}
let pool = null;
try {
  const { Pool } = await import("pg");
  const namespace = process.env.RETENTION_NAMESPACE || process.env.AGENTRUN_RUNTIME_NAMESPACE || "";
  pool = new Pool({ connectionString: process.env.DATABASE_URL });
  const sql = [
    "SELECT",
    "  j.id, j.run_id, j.command_id, j.attempt_id, j.runner_id, j.namespace, j.job_name, j.created_at, j.updated_at,",
    "  r.registered_at AS runner_registered_at, r.heartbeat_at AS runner_heartbeat_at,",
    "  rr.status AS run_status, rr.terminal_status AS run_terminal_status, rr.failure_kind AS run_failure_kind, rr.updated_at AS run_updated_at",
    "FROM agentrun_runner_jobs j",
    "LEFT JOIN agentrun_runners r ON r.id = j.runner_id",
    "LEFT JOIN agentrun_runs rr ON rr.id = j.run_id",
    "WHERE j.namespace = $1",
    "ORDER BY COALESCE(r.heartbeat_at, j.updated_at, j.created_at) ASC",
  ].join("\n");
  const result = await pool.query(sql, [namespace]);
  console.log(JSON.stringify({
    ok: true,
    itemCount: result.rows.length,
    items: result.rows.map((row) => ({
      id: row.id,
      runId: row.run_id,
      commandId: row.command_id,
      attemptId: row.attempt_id,
      runnerId: row.runner_id,
      namespace: row.namespace,
      jobName: row.job_name,
      runnerJobCreatedAt: iso(row.created_at),
      runnerJobUpdatedAt: iso(row.updated_at),
      runnerRegisteredAt: iso(row.runner_registered_at),
      runnerHeartbeatAt: iso(row.runner_heartbeat_at),
      runStatus: row.run_status || null,
      runTerminalStatus: row.run_terminal_status || null,
      runFailureKind: row.run_failure_kind || null,
      runUpdatedAt: iso(row.run_updated_at),
      valuesPrinted: false,
    })),
    valuesPrinted: false,
  }));
} catch (error) {
  console.log(JSON.stringify({
    ok: false,
    itemCount: 0,
    items: [],
    failureKind: "manager-runner-facts-query-failed",
    message: error instanceof Error ? error.message : String(error),
    valuesPrinted: false,
  }));
} finally {
  // Close the pool on every path; closing is best-effort because the single
  // JSON result has already been printed at this point.
  if (pool !== null) {
    try {
      await pool.end();
    } catch {
      // ignore: nothing useful can be reported after the result was emitted
    }
  }
}
`;
}
/**
 * Returns the Node.js (CommonJS) source that computes the runner retention plan.
 *
 * Inputs are read from `TMP_DIR` (`jobs.json`, `pods.json`, `runner-facts.json`)
 * and from environment variables carrying the retention policy. The script
 * prints one JSON document describing every matched runner Job, the inactive
 * candidates ordered by oldest last activity, and the Jobs selected for deletion.
 *
 * Classification order per Job: terminal, fresh heartbeat (protected), stale
 * heartbeat, all non-terminal pods terminating, active pod without heartbeat
 * (protected), otherwise inactive. Jobs are selected from the inactive
 * candidates for the count above `MAX_RUNNERS`, plus (when age-based cleanup
 * is enabled with a max age) every inactive candidate at least that old.
 *
 * A Job counts as terminal from its pod counters only when it has no active
 * pods: `status.failed` / `status.succeeded` count pods, so a Job that is
 * retrying after a failed pod must not be treated as finished.
 *
 * @returns The script text.
 */
function cleanupRunnersPlanNodeScript(): string {
  return String.raw`
const fs = require("node:fs");
const path = require("node:path");
const tmp = process.env.TMP_DIR;
const namespace = process.env.NAMESPACE;
const maxRunners = Number(process.env.MAX_RUNNERS || 0);
const cleanupOrder = process.env.CLEANUP_ORDER || "";
const activeHeartbeatMaxAgeMs = Number(process.env.ACTIVE_HEARTBEAT_MAX_AGE_MS || 0);
const ageBasedCleanupEnabled = process.env.AGE_BASED_CLEANUP_ENABLED === "true";
const ageBasedMaxAgeHours = process.env.AGE_BASED_MAX_AGE_HOURS ? Number(process.env.AGE_BASED_MAX_AGE_HOURS) : null;
const matchLabels = JSON.parse(process.env.MATCH_LABELS_JSON || "{}");
const jobNamePrefixes = JSON.parse(process.env.JOB_NAME_PREFIXES_JSON || "[]");
const now = Date.now();
function readJson(name) {
  return JSON.parse(fs.readFileSync(path.join(tmp, name), "utf8"));
}
function dateMs(value) {
  const ms = Date.parse(value || "");
  return Number.isFinite(ms) ? ms : null;
}
function ageMs(value) {
  const ms = dateMs(value);
  return ms === null ? null : Math.max(0, now - ms);
}
function labelsOf(item) {
  return item?.metadata?.labels && typeof item.metadata.labels === "object" ? item.metadata.labels : {};
}
function annotationsOf(item) {
  return item?.metadata?.annotations && typeof item.metadata.annotations === "object" ? item.metadata.annotations : {};
}
function matchesLabels(labels) {
  return Object.entries(matchLabels).every(([key, value]) => labels?.[key] === value);
}
function matchesPrefix(name) {
  return jobNamePrefixes.length === 0 || jobNamePrefixes.some((prefix) => name === prefix || name.startsWith(prefix + "-"));
}
function terminalRunStatus(value) {
  return value === "completed" || value === "failed" || value === "blocked" || value === "cancelled";
}
function jobCondition(job, type) {
  const conditions = Array.isArray(job?.status?.conditions) ? job.status.conditions : [];
  return conditions.find((entry) => entry?.type === type && entry?.status === "True") || null;
}
function isTerminalJob(job) {
  if (jobCondition(job, "Complete") !== null || jobCondition(job, "Failed") !== null) return true;
  // status.succeeded / status.failed count pods, not the Job outcome. A Job that
  // still has active pods (for example while retrying) is not terminal.
  if (Number(job?.status?.active || 0) > 0) return false;
  return Number(job?.status?.succeeded || 0) > 0 || Number(job?.status?.failed || 0) > 0;
}
function isTerminalPod(pod) {
  const phase = pod?.status?.phase || "";
  return phase === "Succeeded" || phase === "Failed";
}
function jobNameForPod(pod) {
  const labels = labelsOf(pod);
  if (typeof labels["job-name"] === "string") return labels["job-name"];
  const owner = (Array.isArray(pod?.metadata?.ownerReferences) ? pod.metadata.ownerReferences : []).find((entry) => entry?.kind === "Job" && typeof entry?.name === "string");
  return owner?.name || null;
}
function preferredLastActiveAt(fact, job) {
  return fact?.runnerHeartbeatAt
    || fact?.runUpdatedAt
    || fact?.runnerJobUpdatedAt
    || job?.status?.completionTime
    || job?.status?.startTime
    || job?.metadata?.creationTimestamp
    || null;
}
function sortOldestLastActive(left, right) {
  const leftMs = dateMs(left.lastActiveAt) ?? dateMs(left.createdAt) ?? Number.MAX_SAFE_INTEGER;
  const rightMs = dateMs(right.lastActiveAt) ?? dateMs(right.createdAt) ?? Number.MAX_SAFE_INTEGER;
  if (leftMs !== rightMs) return leftMs - rightMs;
  return String(left.name).localeCompare(String(right.name));
}
function countNonTerminalPods(pods) {
  return pods.filter((pod) => !isTerminalPod(pod)).length;
}
const jobs = readJson("jobs.json");
const pods = readJson("pods.json");
const facts = readJson("runner-facts.json");
const factItems = Array.isArray(facts.items) ? facts.items : [];
const factByJob = new Map(factItems.map((item) => [item.jobName, item]));
const allPods = Array.isArray(pods.items) ? pods.items : [];
const podsByJob = new Map();
for (const pod of allPods) {
  const jobName = jobNameForPod(pod);
  if (!jobName) continue;
  const entry = podsByJob.get(jobName) || [];
  entry.push(pod);
  podsByJob.set(jobName, entry);
}
const matchedJobs = (Array.isArray(jobs.items) ? jobs.items : [])
  .map((job) => {
    const name = job?.metadata?.name || "";
    return { job, name, labels: labelsOf(job), annotations: annotationsOf(job) };
  })
  .filter((item) => item.name && matchesLabels(item.labels) && matchesPrefix(item.name));
const runnerJobs = matchedJobs.map(({ job, name, labels, annotations }) => {
  const fact = factByJob.get(name) || null;
  const jobPods = podsByJob.get(name) || [];
  const nonTerminalPods = jobPods.filter((pod) => !isTerminalPod(pod));
  const terminatingPods = nonTerminalPods.filter((pod) => typeof pod?.metadata?.deletionTimestamp === "string");
  const terminal = isTerminalJob(job) || terminalRunStatus(fact?.runStatus) || terminalRunStatus(fact?.runTerminalStatus);
  const heartbeatAgeMs = ageMs(fact?.runnerHeartbeatAt);
  const heartbeatFresh = heartbeatAgeMs !== null && heartbeatAgeMs <= activeHeartbeatMaxAgeMs;
  const hasActivePod = Number(job?.status?.active || 0) > 0 || nonTerminalPods.length > 0;
  const lastActiveAt = preferredLastActiveAt(fact, job);
  let inactive = false;
  let protectedActive = false;
  let classification = "unknown";
  if (terminal) {
    inactive = true;
    classification = "terminal";
  } else if (heartbeatFresh) {
    protectedActive = true;
    classification = "active-fresh-heartbeat";
  } else if (heartbeatAgeMs !== null) {
    inactive = true;
    classification = "inactive-stale-heartbeat";
  } else if (terminatingPods.length > 0 && nonTerminalPods.length === terminatingPods.length) {
    inactive = true;
    classification = "inactive-terminating";
  } else if (hasActivePod) {
    protectedActive = true;
    classification = facts.ok === true ? "active-no-heartbeat-row" : "active-unverified-manager-facts";
  } else {
    inactive = true;
    classification = "inactive-no-active-pod";
  }
  return {
    name,
    namespace,
    createdAt: job?.metadata?.creationTimestamp || null,
    lastActiveAt,
    lastActiveAgeMs: ageMs(lastActiveAt),
    lastActiveSource: fact?.runnerHeartbeatAt ? "runner-heartbeat" : fact?.runUpdatedAt ? "run-updated" : fact?.runnerJobUpdatedAt ? "runner-job-updated" : job?.status?.completionTime ? "job-completion" : job?.status?.startTime ? "job-start" : "job-created",
    runId: annotations["agentrun.pikastech.local/run-id"] || fact?.runId || null,
    commandId: annotations["agentrun.pikastech.local/command-id"] || fact?.commandId || null,
    runnerJobId: fact?.id || null,
    runnerId: fact?.runnerId || null,
    runStatus: fact?.runStatus || null,
    runTerminalStatus: fact?.runTerminalStatus || null,
    jobStatus: {
      active: Number(job?.status?.active || 0),
      succeeded: Number(job?.status?.succeeded || 0),
      failed: Number(job?.status?.failed || 0),
      ready: Number(job?.status?.ready || 0),
      terminating: Number(job?.status?.terminating || 0),
    },
    podCount: jobPods.length,
    nonTerminalPodCount: nonTerminalPods.length,
    terminatingPodCount: terminatingPods.length,
    inactive,
    protectedActive,
    classification,
    labels,
    valuesPrinted: false,
  };
});
const runnerJobCount = runnerJobs.length;
const inactiveCandidates = runnerJobs.filter((item) => item.inactive).sort(sortOldestLastActive);
const overLimitCount = Math.max(0, runnerJobCount - maxRunners);
const selectedByName = new Map();
const selectionReasons = new Map();
for (const item of inactiveCandidates.slice(0, overLimitCount)) {
  selectedByName.set(item.name, item);
  selectionReasons.set(item.name, "over-max-runners");
}
if (ageBasedCleanupEnabled && ageBasedMaxAgeHours !== null) {
  const maxAgeMs = ageBasedMaxAgeHours * 3600 * 1000;
  for (const item of inactiveCandidates) {
    const itemAgeMs = item.lastActiveAgeMs;
    if (itemAgeMs !== null && itemAgeMs >= maxAgeMs) {
      selectedByName.set(item.name, item);
      if (!selectionReasons.has(item.name)) selectionReasons.set(item.name, "age-based-cleanup");
    }
  }
}
const selected = inactiveCandidates
  .filter((item) => selectedByName.has(item.name))
  .map((item) => ({ ...item, selectionReason: selectionReasons.get(item.name) || "selected" }));
const matchedPodNames = new Set(runnerJobs.map((item) => item.name));
const matchedPods = allPods.filter((pod) => {
  const jobName = jobNameForPod(pod);
  return jobName !== null && matchedPodNames.has(jobName);
});
const remainingAfterSelection = runnerJobCount - selected.length;
console.log(JSON.stringify({
  ok: true,
  planKind: "agentrun-runtime-runner-retention",
  generatedAt: new Date(now).toISOString(),
  namespace,
  criteria: {
    maxRunners,
    cleanupOrder,
    activeHeartbeatMaxAgeMs,
    selectors: { matchLabels, jobNamePrefixes },
    ageBasedCleanup: { enabled: ageBasedCleanupEnabled, maxAgeHours: ageBasedMaxAgeHours },
  },
  managerFacts: {
    ok: facts.ok === true,
    factCount: factItems.length,
    factsExit: Number(process.env.FACTS_EXIT || 0),
    failureKind: facts.failureKind || null,
    message: facts.ok === true ? null : facts.message || null,
    valuesPrinted: false,
  },
  runnerJobCount,
  nonTerminalRunnerPodCount: countNonTerminalPods(matchedPods),
  overLimitCount,
  inactiveCandidateCount: inactiveCandidates.length,
  protectedActiveRunnerCount: runnerJobs.filter((item) => item.protectedActive).length,
  activeRunRisk: remainingAfterSelection > maxRunners,
  remainingRunnerJobCountAfterSelection: remainingAfterSelection,
  runnerJobs: runnerJobs.sort(sortOldestLastActive),
  candidates: inactiveCandidates,
  selected,
  selectedRunnerJobs: selected.map((item) => item.name),
  selectedRunnerJobCount: selected.length,
  valuesPrinted: false,
}));
`;
}
/**
 * Builds the source text of the Node.js "finalize" script for runner-job cleanup.
 *
 * The returned script (not this function) does the following when executed:
 * - reads `TMP_DIR` and `DELETE_EXIT` from the environment (`DELETE_EXIT` defaults to 0);
 * - loads `plan.json`, `jobs-after.json` and `pods-after.json` from `TMP_DIR`;
 * - re-applies the plan's `criteria.selectors` (`matchLabels`, `jobNamePrefixes`) to the
 *   jobs in `jobs-after.json` to find the runner jobs that still exist;
 * - splits `plan.selectedRunnerJobs` into names still present
 *   (`remainingSelectedRunnerJobs`) and names no longer present (`deletedRunnerJobs`);
 * - prints one JSON object to stdout: the plan fields, overridden/extended with `ok`,
 *   `deletion` (exit code plus the last 3000 characters of `delete.out` / `delete.err`,
 *   or an empty string when a file cannot be read), the deleted/remaining lists with
 *   their counts, and an `after` summary (runner job count, non-terminal pod count,
 *   count over `criteria.maxRunners`).
 *
 * In the emitted report, `ok` reflects only `DELETE_EXIT === 0`; selected jobs that are
 * still present are reported separately and do not flip `ok`.
 *
 * NOTE(review): presumably the caller runs this with `node` after the delete step has
 * written `delete.out` / `delete.err` and the `*-after.json` snapshots — confirm against
 * the caller, which is not visible here.
 *
 * @returns The script source as a string.
 */
function cleanupRunnersFinalizeNodeScript(): string {
// String.raw leaves escape sequences unprocessed; the template has no interpolations, so the text below is the script as written.
return String.raw`
const fs = require("node:fs");
const path = require("node:path");
const tmp = process.env.TMP_DIR;
const deleteExit = Number(process.env.DELETE_EXIT || 0);
const plan = JSON.parse(fs.readFileSync(path.join(tmp, "plan.json"), "utf8"));
const jobsAfter = JSON.parse(fs.readFileSync(path.join(tmp, "jobs-after.json"), "utf8"));
const podsAfter = JSON.parse(fs.readFileSync(path.join(tmp, "pods-after.json"), "utf8"));
const matchLabels = plan.criteria?.selectors?.matchLabels || {};
const jobNamePrefixes = plan.criteria?.selectors?.jobNamePrefixes || [];
function labelsOf(item) {
return item?.metadata?.labels && typeof item.metadata.labels === "object" ? item.metadata.labels : {};
}
function matchesLabels(labels) {
return Object.entries(matchLabels).every(([key, value]) => labels?.[key] === value);
}
function matchesPrefix(name) {
return jobNamePrefixes.length === 0 || jobNamePrefixes.some((prefix) => name === prefix || name.startsWith(prefix + "-"));
}
function isTerminalPod(pod) {
const phase = pod?.status?.phase || "";
return phase === "Succeeded" || phase === "Failed";
}
function jobNameForPod(pod) {
const labels = labelsOf(pod);
if (typeof labels["job-name"] === "string") return labels["job-name"];
const owner = (Array.isArray(pod?.metadata?.ownerReferences) ? pod.metadata.ownerReferences : []).find((entry) => entry?.kind === "Job" && typeof entry?.name === "string");
return owner?.name || null;
}
function tail(name) {
try {
const text = fs.readFileSync(path.join(tmp, name), "utf8");
return text.length > 3000 ? text.slice(-3000) : text;
} catch {
return "";
}
}
const selected = new Set(Array.isArray(plan.selectedRunnerJobs) ? plan.selectedRunnerJobs : []);
const remainingJobs = (Array.isArray(jobsAfter.items) ? jobsAfter.items : [])
.map((job) => ({ name: job?.metadata?.name || "", labels: labelsOf(job) }))
.filter((item) => item.name && matchesLabels(item.labels) && matchesPrefix(item.name));
const remainingJobNames = new Set(remainingJobs.map((item) => item.name));
const matchedPods = (Array.isArray(podsAfter.items) ? podsAfter.items : []).filter((pod) => {
const jobName = jobNameForPod(pod);
return jobName !== null && remainingJobNames.has(jobName);
});
const remainingSelectedRunnerJobs = Array.from(selected).filter((name) => remainingJobNames.has(name));
const deletedRunnerJobs = Array.from(selected).filter((name) => !remainingJobNames.has(name));
console.log(JSON.stringify({
...plan,
ok: deleteExit === 0,
deletion: { exitCode: deleteExit, stdoutTail: tail("delete.out"), stderrTail: tail("delete.err") },
deletedRunnerJobs,
deletedRunnerJobCount: deletedRunnerJobs.length,
remainingSelectedRunnerJobs,
remainingSelectedRunnerJobCount: remainingSelectedRunnerJobs.length,
after: {
runnerJobCount: remainingJobs.length,
nonTerminalRunnerPodCount: matchedPods.filter((pod) => !isTerminalPod(pod)).length,
overLimitCount: Math.max(0, remainingJobs.length - Number(plan.criteria?.maxRunners || 0)),
},
valuesPrinted: false,
}));
`;
}
function cleanupRunsPlanNodeScript(): string {
return String.raw`
const fs = require("node:fs");