From 8d297a98232cc2bcf750c546b10c0fa7fa6b4879 Mon Sep 17 00:00:00 2001 From: lyon Date: Sat, 20 Jun 2026 16:27:25 +0800 Subject: [PATCH] fix: protect runner cleanup artifacts --- src/mgr/kubernetes-runner-job.ts | 1 + src/mgr/runner-retention.ts | 40 +++++++++++++-- src/runner/k8s-job.ts | 35 +++++++++++-- src/selftest/cases/20-runner-k8s-job.ts | 66 +++++++++++++++++++++++-- 4 files changed, 130 insertions(+), 12 deletions(-) diff --git a/src/mgr/kubernetes-runner-job.ts b/src/mgr/kubernetes-runner-job.ts index 884fe43..65fb566 100644 --- a/src/mgr/kubernetes-runner-job.ts +++ b/src/mgr/kubernetes-runner-job.ts @@ -234,6 +234,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore; workReady: staticWorkReadyCapabilitySummary(), retention: { ttlSecondsAfterFinished: render.ttlSecondsAfterFinished, + ttlPolicy: render.ttlPolicy, runnerIdleTimeoutMs: render.runnerIdleTimeoutMs, preCreateCleanup: preCreateRetention, }, diff --git a/src/mgr/runner-retention.ts b/src/mgr/runner-retention.ts index abe942c..0e3527c 100644 --- a/src/mgr/runner-retention.ts +++ b/src/mgr/runner-retention.ts @@ -32,6 +32,7 @@ export interface RunnerRetentionSummary extends JsonRecord { orphanNonTerminalRunnerPodCountBefore: number; inactiveCandidateCount: number; protectedActiveRunnerCount: number; + protectedActiveRunners: JsonRecord[]; selectedRunnerCount: number; deletedRunnerJobCount: number; deletedRunnerPodCount: number; @@ -90,7 +91,8 @@ export async function enforceRunnerRetentionBeforeCreate(input: { store: AgentRu .filter((item) => !item.protectedActive) .sort(compareCandidates); const selected = candidates.slice(0, requiredDeleteCount); - const protectedActiveRunnerCount = before.jobs.filter((item) => item.protectedActive).length + before.pods.filter((item) => item.protectedActive).length; + const protectedActiveRunners = [...before.jobs, ...before.pods].filter((item) => item.protectedActive); + const protectedActiveRunnerCount = protectedActiveRunners.length; if (requiredDeleteCount > 0 && selected.length < requiredDeleteCount) { throw new AgentRunError("infra-failed", "runner retention cannot safely make room for a new runner", { httpStatus: 409, @@ -103,7 +105,9 @@ export async function enforceRunnerRetentionBeforeCreate(input: { store: AgentRu requiredDeleteCount, inactiveCandidateCount: candidates.length, protectedActiveRunnerCount, + protectedActiveRunners: protectedActiveRunners.map(protectedRunnerSummary), activeRunRisk: protectedActiveRunnerCount > 0, + cleanupPolicy: cleanupPolicySummary(), valuesPrinted: false, }, }); @@ -135,12 +139,14 @@ export async function enforceRunnerRetentionBeforeCreate(input: { store: AgentRu orphanNonTerminalRunnerPodCountBefore: before.orphanNonTerminalRunnerPodCount, inactiveCandidateCount: candidates.length, protectedActiveRunnerCount, + protectedActiveRunners: protectedActiveRunners.map(protectedRunnerSummary), selectedRunnerCount: selected.length, deletedRunnerJobCount, deletedRunnerPodCount, deletedAssociatedResourceCount, activeRunRisk: protectedActiveRunnerCount > 0, reason: requiredDeleteCount === 0 ? "within-limit" : "pre-create-retention", + cleanupPolicy: cleanupPolicySummary(), selected: selected.map((item) => ({ resourceKind: item.resourceKind, name: item.name, jobName: item.jobName, candidateKind: item.candidateKind, protectedActive: false, lastActiveAt: new Date(item.lastActiveAtMs).toISOString(), valuesPrinted: false })), valuesPrinted: false, } satisfies RunnerRetentionSummary; @@ -236,9 +242,9 @@ async function activityFor(store: AgentRunStore, input: { runId: string | null; try { command = await store.getCommand(input.commandId); } catch { command = null; } } const lastActiveAtMs = Math.max(input.createdAtMs, isoMs(run?.updatedAt), isoMs(command?.updatedAt), isoMs(run?.leaseExpiresAt)); - const active = run && !isTerminalRunStatus(run.status) && command && !isTerminalCommandState(command.state) && input.runnerId && run.claimedBy === input.runnerId && heartbeatFresh(run.leaseExpiresAt, input.activeHeartbeatMaxAgeMs); - if (active) { - return { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, protectedActive: true, protectedReason: "fresh-active-run", candidateKind: "inactive", lastActiveAtMs }; + if (run && command && !isTerminalRunStatus(run.status) && !isTerminalCommandState(command.state) && !input.terminal) { + const freshClaim = input.runnerId && run.claimedBy === input.runnerId && heartbeatFresh(run.leaseExpiresAt, input.activeHeartbeatMaxAgeMs); + return { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, protectedActive: true, protectedReason: freshClaim ? "fresh-active-run" : "nonterminal-runner-resource", candidateKind: "inactive", lastActiveAtMs }; } const candidateKind: CandidateKind = command && isTerminalCommandState(command.state) && !input.terminal ? "idle" : input.terminal || (run !== null && isTerminalRunStatus(run.status)) ? "terminal" : "inactive"; return { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, protectedActive: false, protectedReason: null, candidateKind, lastActiveAtMs }; @@ -292,6 +298,32 @@ function compareCandidates(left: RunnerResourceEntry, right: RunnerResourceEntry return left.name.localeCompare(right.name); } +function protectedRunnerSummary(item: RunnerResourceEntry): JsonRecord { + return { + resourceKind: item.resourceKind, + name: item.name, + jobName: item.jobName, + runId: item.runId, + commandId: item.commandId, + runnerId: item.runnerId, + protectedReason: item.protectedReason, + terminal: item.terminal, + podPhase: item.podPhase, + lastActiveAt: new Date(item.lastActiveAtMs).toISOString(), + valuesPrinted: false, + }; +} + +function cleanupPolicySummary(): JsonRecord { + return { + mode: "db-ledger-and-k8s-observation", + forceActive: false, + activeProtection: "protect non-terminal DB run/command while Kubernetes runner resource is non-terminal; fresh heartbeat is not required during manager rolling recovery", + deletionOrder: "idle, terminal, inactive by lastActiveAt", + valuesPrinted: false, + }; +} + function labelSelector(labels: Record): string { return Object.entries(labels).map(([key, value]) => `${key}=${value}`).join(","); } diff --git a/src/runner/k8s-job.ts b/src/runner/k8s-job.ts index 79cd717..7ba5f78 100644 --- a/src/runner/k8s-job.ts +++ b/src/runner/k8s-job.ts @@ -8,6 +8,7 @@ const defaultBootRepoUrl = "http://git-mirror-http.devops-infra.svc.cluster.loca const defaultResourceBinPath = "/usr/local/bin"; const defaultCodexShellSandbox = "danger-full-access"; const defaultRunnerIdleTimeoutMs = 600_000; +const minimumTerminalArtifactTtlSeconds = 7 * 24 * 60 * 60; const fallbackRunnerEgressProxyUrl = "http://g14-provider-egress-proxy.unidesk.svc.cluster.local:18789"; const defaultRunnerNoProxyItems = [ "localhost", @@ -138,6 +139,7 @@ export function renderRunnerJobDryRun(options: RunnerJobRenderOptions): JsonReco workReady: staticWorkReadyCapabilitySummary(), retention: { ttlSecondsAfterFinished: render.ttlSecondsAfterFinished, + ttlPolicy: render.ttlPolicy, runnerIdleTimeoutMs: render.runnerIdleTimeoutMs, }, pollCommands: { @@ -149,7 +151,7 @@ export function renderRunnerJobDryRun(options: RunnerJobRenderOptions): JsonReco }; } -export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { manifest: JsonRecord; namespace: string; jobName: string; runnerJobId: string; runnerId: string; attemptId: string; sourceCommit: string; serviceAccountName: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; warnings: string[]; ttlSecondsAfterFinished: number; runnerIdleTimeoutMs: number } { +export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { manifest: JsonRecord; namespace: string; jobName: string; runnerJobId: string; runnerId: string; attemptId: string; sourceCommit: string; serviceAccountName: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; warnings: string[]; ttlSecondsAfterFinished: number; ttlPolicy: JsonRecord; runnerIdleTimeoutMs: number } { const namespace = options.namespace ?? "agentrun-v01"; const attemptId = options.attemptId ?? `attempt_${Date.now().toString(36)}`; const runnerId = options.runnerId ?? `runner_${shortHash(`${options.run.id}:${attemptId}:${options.commandId}`)}`; @@ -158,13 +160,14 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani const serviceAccountName = options.serviceAccountName ?? "agentrun-v01-runner"; const jobNamePrefix = normalizeJobNamePrefix(options.jobNamePrefix ?? serviceAccountName); const lane = options.lane ?? process.env.AGENTRUN_LANE ?? "v0.1"; - const ttlSecondsAfterFinished = options.ttlSecondsAfterFinished ?? 86_400; + const warnings: string[] = []; + const ttlSecondsAfterFinished = normalizeTtlSecondsAfterFinished(options.ttlSecondsAfterFinished, warnings); + const ttlPolicy = terminalArtifactTtlPolicy(ttlSecondsAfterFinished); const runnerIdleTimeoutMs = normalizeRunnerIdleTimeoutMs(options.runnerIdleTimeoutMs); const jobName = `${jobNamePrefix}-${shortDnsHash(options.run.id, attemptId)}`; const secretRefs = credentialProjections(options.run, namespace); const toolCredentials = toolCredentialProjections(options.run, namespace); const sessionPvc = options.sessionPvc; - const warnings: string[] = []; if (secretRefs.length === 0) warnings.push("run executionPolicy.secretScope 未声明 provider SecretRef;runner 将按 secret-unavailable 上报,而不会降级直连外部凭据"); const env = runnerEnv(options, { namespace, jobName, runnerJobId, runnerId, attemptId, sourceCommit, secretRefs, toolCredentials, sessionPvc, runnerIdleTimeoutMs }); const manifest: JsonRecord = { @@ -178,6 +181,8 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani "agentrun.pikastech.local/run-id": options.run.id, "agentrun.pikastech.local/command-id": options.commandId, "agentrun.pikastech.local/dry-run-render": String(options.dryRun === true), + "agentrun.pikastech.local/terminal-artifact-ttl-policy": "terminal-outbox-reconcile-retention", + "agentrun.pikastech.local/terminal-artifact-ttl-seconds": String(ttlSecondsAfterFinished), }, }, spec: { @@ -189,6 +194,8 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani annotations: { "agentrun.pikastech.local/run-id": options.run.id, "agentrun.pikastech.local/command-id": options.commandId, + "agentrun.pikastech.local/terminal-artifact-ttl-policy": "terminal-outbox-reconcile-retention", + "agentrun.pikastech.local/terminal-artifact-ttl-seconds": String(ttlSecondsAfterFinished), }, }, spec: { @@ -229,7 +236,7 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani }, }, }; - return { manifest, namespace, jobName, runnerJobId, runnerId, attemptId, sourceCommit, serviceAccountName, secretRefs, toolCredentials, warnings, ttlSecondsAfterFinished, runnerIdleTimeoutMs }; + return { manifest, namespace, jobName, runnerJobId, runnerId, attemptId, sourceCommit, serviceAccountName, secretRefs, toolCredentials, warnings, ttlSecondsAfterFinished, ttlPolicy, runnerIdleTimeoutMs }; } function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string; jobName: string; runnerJobId: string; runnerId: string; attemptId: string; sourceCommit: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; sessionPvc: RunnerSessionPvcOptions | undefined; runnerIdleTimeoutMs: number }): JsonRecord[] { @@ -286,6 +293,26 @@ function normalizeRunnerIdleTimeoutMs(value: number | undefined): number { return value; } +function normalizeTtlSecondsAfterFinished(value: number | undefined, warnings: string[]): number { + if (value === undefined) return minimumTerminalArtifactTtlSeconds; + if (!Number.isInteger(value) || value <= 0) throw new Error("ttlSecondsAfterFinished must be a positive integer"); + if (value < minimumTerminalArtifactTtlSeconds) { + warnings.push(`ttlSecondsAfterFinished ${value} is below terminal artifact retention minimum ${minimumTerminalArtifactTtlSeconds}; raised to preserve runner logs for reconciler recovery`); + return minimumTerminalArtifactTtlSeconds; + } + return value; +} + +function terminalArtifactTtlPolicy(ttlSecondsAfterFinished: number): JsonRecord { + return { + mode: "terminal-outbox-reconcile-retention", + minSeconds: minimumTerminalArtifactTtlSeconds, + ttlSecondsAfterFinished, + reason: "preserve runner Job logs and termination metadata for manager rolling recovery before Kubernetes TTL cleanup", + valuesPrinted: false, + }; +} + function codexShellSandbox(policy: ExecutionPolicy): string { if (policy.sandbox === "workspace-write") return defaultCodexShellSandbox; return policy.sandbox; diff --git a/src/selftest/cases/20-runner-k8s-job.ts b/src/selftest/cases/20-runner-k8s-job.ts index 911f1f1..9d50fc6 100644 --- a/src/selftest/cases/20-runner-k8s-job.ts +++ b/src/selftest/cases/20-runner-k8s-job.ts @@ -4,6 +4,7 @@ import path from "node:path"; import { startManagerServer } from "../../mgr/server.js"; import { MemoryAgentRunStore } from "../../mgr/store.js"; import { ManagerClient } from "../../mgr/client.js"; +import { enforceRunnerRetentionBeforeCreate } from "../../mgr/runner-retention.js"; import { renderRunnerJobDryRun } from "../../runner/k8s-job.js"; import type { JsonRecord, RunRecord } from "../../common/types.js"; import { assertNoSecretLeak, createRunWithCommand, loadArtificerImageRef, type SelfTestCase } from "../harness.js"; @@ -44,7 +45,7 @@ const selfTest: SelfTestCase = async (context) => { }); assert.equal(rendered.dryRun, true); assert.equal(rendered.mutation, false); - assert.equal(((rendered.retention as JsonRecord).ttlSecondsAfterFinished), 86_400); + assert.equal(((rendered.retention as JsonRecord).ttlSecondsAfterFinished), 604_800); assert.equal((rendered.jobIdentity as { serviceAccountName?: string }).serviceAccountName, "agentrun-v01-runner"); assertWorkReadySummary(rendered.workReady as JsonRecord); assertRunnerJobUsesWritableCodexHome(rendered.manifest as JsonRecord, context.codexHome, "codex-0", "/var/run/agentrun/secrets/codex-0"); @@ -214,7 +215,7 @@ process.exit(1); assert.equal((((created as JsonRecord).envImage as JsonRecord).digestPinned), true); assert.equal(((((created as JsonRecord).envImage as JsonRecord).imageRef as JsonRecord).kind), "env-image-dockerfile"); assertWorkReadySummary((created as JsonRecord).workReady as JsonRecord); - assert.equal(((created as JsonRecord).retention as JsonRecord).ttlSecondsAfterFinished, 86_400); + assert.equal(((created as JsonRecord).retention as JsonRecord).ttlSecondsAfterFinished, 604_800); assert.deepEqual((((created as JsonRecord).transientEnv as JsonRecord).names) as string[], ["HWLAB_API_KEY", "HWLAB_RUNTIME_API_URL", "HWLAB_RUNTIME_WEB_URL", "HWLAB_RUNTIME_NAMESPACE", "HWLAB_RUNTIME_LANE", "HWLAB_RUNTIME_ENDPOINT_SOURCE", "HWLAB_RUNTIME_ENDPOINT_LOCKED", "HWLAB_CODE_AGENT_ASSEMBLED_RUNTIME", "UNIDESK_MAIN_SERVER_IP"]); const transientEnvSecret = (created as JsonRecord).transientEnvSecret as JsonRecord; assert.match(String(transientEnvSecret.name), /^agentrun-v01-runner-env-[a-f0-9]{20}$/u); @@ -229,7 +230,7 @@ process.exit(1); const ownerPatch = JSON.parse(await readFile(patchedTransientEnvSecret, "utf8")) as JsonRecord; assert.deepEqual((ownerPatch.args as string[]).slice(0, 3), ["patch", "secret", String(transientEnvSecret.name)]); const manifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord; - assert.equal((manifest.spec as JsonRecord).ttlSecondsAfterFinished, 86_400); + assert.equal((manifest.spec as JsonRecord).ttlSecondsAfterFinished, 604_800); assertRunnerJobUsesG14EgressProxy(manifest); assertRunnerJobUsesBoundedGitTransport({ manifest, gitTransport: (created as JsonRecord).gitTransport } as JsonRecord); assertRunnerJobUsesTransientEnvSecret(manifest, "HWLAB_API_KEY", String(transientEnvSecret.name)); @@ -377,6 +378,63 @@ process.exit(1); } finally { await new Promise((resolve) => serverWithRetention.server.close(() => resolve())); } + const protectedStore = new MemoryAgentRunStore(); + const protectedServer = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test", store: protectedStore }); + const protectedKubectl = path.join(context.tmp, "fake-kubectl-retention-protected.js"); + const protectedDeleted = path.join(context.tmp, "retention-protected-deleted.json"); + await writeFile(protectedDeleted, "[]\n"); + try { + const protectedClient = new ManagerClient(protectedServer.baseUrl); + const protectedItem = await createRunWithCommand(protectedClient, context, "protected active runner", "selftest-runner-retention-protected", 15_000); + protectedStore.claimRun(protectedItem.runId, "runner-protected", -60_000); + await writeFile(protectedKubectl, `#!/usr/bin/env bun +const args = Bun.argv.slice(2); +const deletedPath = ${JSON.stringify(protectedDeleted)}; +async function readDeleted() { try { return JSON.parse(await Bun.file(deletedPath).text()); } catch { return []; } } +async function writeDeleted(items) { await Bun.write(deletedPath, JSON.stringify(items, null, 2) + "\\n"); } +if (args[0] === "get" && args[1] === "jobs") { + console.log(JSON.stringify({ apiVersion: "v1", kind: "List", items: [{ + apiVersion: "batch/v1", + kind: "Job", + metadata: { + name: "agentrun-v02-runner-protected", + namespace: "agentrun-v02", + creationTimestamp: "2026-01-01T00:00:00Z", + labels: { "app.kubernetes.io/part-of": "agentrun", "app.kubernetes.io/name": "agentrun-runner", "app.kubernetes.io/component": "runner", "agentrun.pikastech.local/lane": "v0.2", "job-name": "agentrun-v02-runner-protected" }, + annotations: { "agentrun.pikastech.local/run-id": ${JSON.stringify(protectedItem.runId)}, "agentrun.pikastech.local/command-id": ${JSON.stringify(protectedItem.commandId)}, "agentrun.pikastech.local/runner-id": "runner-protected" }, + }, + status: {}, + }] })); + process.exit(0); +} +if (args[0] === "get" && args[1] === "pods") { console.log(JSON.stringify({ apiVersion: "v1", kind: "List", items: [] })); process.exit(0); } +if (args[0] === "delete") { const deleted = await readDeleted(); deleted.push({ resource: args[1], name: args[2], args }); await writeDeleted(deleted); console.log(String(args[1]) + "/" + String(args[2]) + " deleted"); process.exit(0); } +console.error("unsupported fake protected kubectl args: " + args.join(" ")); +process.exit(1); +`, "utf8"); + await chmod(protectedKubectl, 0o755); + await assert.rejects( + () => enforceRunnerRetentionBeforeCreate({ + store: protectedStore, + incomingRunnerCount: 1, + options: { + namespace: "agentrun-v02", + maxRunners: 1, + cleanupOrder: "oldest-inactive-last-active-first", + activeHeartbeatMaxAgeMs: 0, + matchLabels: { "app.kubernetes.io/part-of": "agentrun", "app.kubernetes.io/name": "agentrun-runner", "app.kubernetes.io/component": "runner" }, + jobNamePrefixes: ["agentrun-v02-runner"], + ageBasedCleanup: { enabled: false, maxAgeHours: null }, + kubectlCommand: protectedKubectl, + }, + }), + (error) => error instanceof Error && error.message.includes("runner retention cannot safely make room"), + ); + const protectedDeletedItems = JSON.parse(await readFile(protectedDeleted, "utf8")) as JsonRecord[]; + assert.equal(protectedDeletedItems.length, 0); + } finally { + await new Promise((resolve) => protectedServer.server.close(() => resolve())); + } const sessionRunRecord: RunRecord = { id: "run-selftest-session-pvc", tenantId: "unidesk", @@ -419,7 +477,7 @@ process.exit(1); assert.equal(envMap.get("AGENTRUN_SESSION_PVC_NAMESPACE"), "agentrun-v01"); assert.equal(envMap.get("AGENTRUN_SESSION_PVC_MOUNT_PATH"), "/home/agentrun/.codex-codex/sessions"); assert.equal(envMap.get("AGENTRUN_CODEX_ROLLOUT_SUBDIR"), "sessions"); - return { name: "runner-k8s-job", tests: ["runner-k8s-job-dry-run", "runner-k8s-job-codex-shell-sandbox-env", "runner-k8s-job-g14-egress-proxy-env", "runner-k8s-job-bounded-git-transport-env", "runner-k8s-job-deepseek-profile-dry-run", "runner-k8s-job-minimax-m3-profile-dry-run", "runner-k8s-job-dsflash-go-profile-dry-run", "runner-k8s-job-dsflash-go-legacy-secretref-normalized", "runner-k8s-job-create-api", "runner-k8s-job-retention-ttl", "runner-job-transient-env", "runner-job-transient-env-secretref", "runner-job-tool-credential-env", "runner-job-unidesk-ssh-tool-credential-env", "runner-job-tool-credential-volume", "runner-job-unidesk-ssh-endpoint-auto-env", "runner-job-unidesk-ssh-transient-env-denied", "runner-job-pre-create-retention", "runner-k8s-job-session-pvc-volume-and-env"] }; + return { name: "runner-k8s-job", tests: ["runner-k8s-job-dry-run", "runner-k8s-job-codex-shell-sandbox-env", "runner-k8s-job-g14-egress-proxy-env", "runner-k8s-job-bounded-git-transport-env", "runner-k8s-job-deepseek-profile-dry-run", "runner-k8s-job-minimax-m3-profile-dry-run", "runner-k8s-job-dsflash-go-profile-dry-run", "runner-k8s-job-dsflash-go-legacy-secretref-normalized", "runner-k8s-job-create-api", "runner-k8s-job-retention-ttl", "runner-job-transient-env", "runner-job-transient-env-secretref", "runner-job-tool-credential-env", "runner-job-unidesk-ssh-tool-credential-env", "runner-job-tool-credential-volume", "runner-job-unidesk-ssh-endpoint-auto-env", "runner-job-unidesk-ssh-transient-env-denied", "runner-job-pre-create-retention", "runner-job-pre-create-retention-protects-stale-active", "runner-k8s-job-session-pvc-volume-and-env"] }; } finally { await new Promise((resolve) => server.server.close(() => resolve())); }