fix: protect runner cleanup artifacts

This commit is contained in:
lyon
2026-06-20 16:27:25 +08:00
parent f95822e8ba
commit 8d297a9823
4 changed files with 130 additions and 12 deletions
+1
View File
@@ -234,6 +234,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
workReady: staticWorkReadyCapabilitySummary(),
retention: {
ttlSecondsAfterFinished: render.ttlSecondsAfterFinished,
ttlPolicy: render.ttlPolicy,
runnerIdleTimeoutMs: render.runnerIdleTimeoutMs,
preCreateCleanup: preCreateRetention,
},
+36 -4
View File
@@ -32,6 +32,7 @@ export interface RunnerRetentionSummary extends JsonRecord {
orphanNonTerminalRunnerPodCountBefore: number;
inactiveCandidateCount: number;
protectedActiveRunnerCount: number;
protectedActiveRunners: JsonRecord[];
selectedRunnerCount: number;
deletedRunnerJobCount: number;
deletedRunnerPodCount: number;
@@ -90,7 +91,8 @@ export async function enforceRunnerRetentionBeforeCreate(input: { store: AgentRu
.filter((item) => !item.protectedActive)
.sort(compareCandidates);
const selected = candidates.slice(0, requiredDeleteCount);
const protectedActiveRunnerCount = before.jobs.filter((item) => item.protectedActive).length + before.pods.filter((item) => item.protectedActive).length;
const protectedActiveRunners = [...before.jobs, ...before.pods].filter((item) => item.protectedActive);
const protectedActiveRunnerCount = protectedActiveRunners.length;
if (requiredDeleteCount > 0 && selected.length < requiredDeleteCount) {
throw new AgentRunError("infra-failed", "runner retention cannot safely make room for a new runner", {
httpStatus: 409,
@@ -103,7 +105,9 @@ export async function enforceRunnerRetentionBeforeCreate(input: { store: AgentRu
requiredDeleteCount,
inactiveCandidateCount: candidates.length,
protectedActiveRunnerCount,
protectedActiveRunners: protectedActiveRunners.map(protectedRunnerSummary),
activeRunRisk: protectedActiveRunnerCount > 0,
cleanupPolicy: cleanupPolicySummary(),
valuesPrinted: false,
},
});
@@ -135,12 +139,14 @@ export async function enforceRunnerRetentionBeforeCreate(input: { store: AgentRu
orphanNonTerminalRunnerPodCountBefore: before.orphanNonTerminalRunnerPodCount,
inactiveCandidateCount: candidates.length,
protectedActiveRunnerCount,
protectedActiveRunners: protectedActiveRunners.map(protectedRunnerSummary),
selectedRunnerCount: selected.length,
deletedRunnerJobCount,
deletedRunnerPodCount,
deletedAssociatedResourceCount,
activeRunRisk: protectedActiveRunnerCount > 0,
reason: requiredDeleteCount === 0 ? "within-limit" : "pre-create-retention",
cleanupPolicy: cleanupPolicySummary(),
selected: selected.map((item) => ({ resourceKind: item.resourceKind, name: item.name, jobName: item.jobName, candidateKind: item.candidateKind, protectedActive: false, lastActiveAt: new Date(item.lastActiveAtMs).toISOString(), valuesPrinted: false })),
valuesPrinted: false,
} satisfies RunnerRetentionSummary;
@@ -236,9 +242,9 @@ async function activityFor(store: AgentRunStore, input: { runId: string | null;
try { command = await store.getCommand(input.commandId); } catch { command = null; }
}
const lastActiveAtMs = Math.max(input.createdAtMs, isoMs(run?.updatedAt), isoMs(command?.updatedAt), isoMs(run?.leaseExpiresAt));
const active = run && !isTerminalRunStatus(run.status) && command && !isTerminalCommandState(command.state) && input.runnerId && run.claimedBy === input.runnerId && heartbeatFresh(run.leaseExpiresAt, input.activeHeartbeatMaxAgeMs);
if (active) {
return { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, protectedActive: true, protectedReason: "fresh-active-run", candidateKind: "inactive", lastActiveAtMs };
if (run && command && !isTerminalRunStatus(run.status) && !isTerminalCommandState(command.state) && !input.terminal) {
const freshClaim = input.runnerId && run.claimedBy === input.runnerId && heartbeatFresh(run.leaseExpiresAt, input.activeHeartbeatMaxAgeMs);
return { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, protectedActive: true, protectedReason: freshClaim ? "fresh-active-run" : "nonterminal-runner-resource", candidateKind: "inactive", lastActiveAtMs };
}
const candidateKind: CandidateKind = command && isTerminalCommandState(command.state) && !input.terminal ? "idle" : input.terminal || (run !== null && isTerminalRunStatus(run.status)) ? "terminal" : "inactive";
return { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, protectedActive: false, protectedReason: null, candidateKind, lastActiveAtMs };
@@ -292,6 +298,32 @@ function compareCandidates(left: RunnerResourceEntry, right: RunnerResourceEntry
return left.name.localeCompare(right.name);
}
function protectedRunnerSummary(item: RunnerResourceEntry): JsonRecord {
return {
resourceKind: item.resourceKind,
name: item.name,
jobName: item.jobName,
runId: item.runId,
commandId: item.commandId,
runnerId: item.runnerId,
protectedReason: item.protectedReason,
terminal: item.terminal,
podPhase: item.podPhase,
lastActiveAt: new Date(item.lastActiveAtMs).toISOString(),
valuesPrinted: false,
};
}
function cleanupPolicySummary(): JsonRecord {
return {
mode: "db-ledger-and-k8s-observation",
forceActive: false,
activeProtection: "protect non-terminal DB run/command while Kubernetes runner resource is non-terminal; fresh heartbeat is not required during manager rolling recovery",
deletionOrder: "idle, terminal, inactive by lastActiveAt",
valuesPrinted: false,
};
}
function labelSelector(labels: Record<string, string>): string {
return Object.entries(labels).map(([key, value]) => `${key}=${value}`).join(",");
}
+31 -4
View File
@@ -8,6 +8,7 @@ const defaultBootRepoUrl = "http://git-mirror-http.devops-infra.svc.cluster.loca
const defaultResourceBinPath = "/usr/local/bin";
const defaultCodexShellSandbox = "danger-full-access";
const defaultRunnerIdleTimeoutMs = 600_000;
const minimumTerminalArtifactTtlSeconds = 7 * 24 * 60 * 60;
const fallbackRunnerEgressProxyUrl = "http://g14-provider-egress-proxy.unidesk.svc.cluster.local:18789";
const defaultRunnerNoProxyItems = [
"localhost",
@@ -138,6 +139,7 @@ export function renderRunnerJobDryRun(options: RunnerJobRenderOptions): JsonReco
workReady: staticWorkReadyCapabilitySummary(),
retention: {
ttlSecondsAfterFinished: render.ttlSecondsAfterFinished,
ttlPolicy: render.ttlPolicy,
runnerIdleTimeoutMs: render.runnerIdleTimeoutMs,
},
pollCommands: {
@@ -149,7 +151,7 @@ export function renderRunnerJobDryRun(options: RunnerJobRenderOptions): JsonReco
};
}
export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { manifest: JsonRecord; namespace: string; jobName: string; runnerJobId: string; runnerId: string; attemptId: string; sourceCommit: string; serviceAccountName: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; warnings: string[]; ttlSecondsAfterFinished: number; runnerIdleTimeoutMs: number } {
export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { manifest: JsonRecord; namespace: string; jobName: string; runnerJobId: string; runnerId: string; attemptId: string; sourceCommit: string; serviceAccountName: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; warnings: string[]; ttlSecondsAfterFinished: number; ttlPolicy: JsonRecord; runnerIdleTimeoutMs: number } {
const namespace = options.namespace ?? "agentrun-v01";
const attemptId = options.attemptId ?? `attempt_${Date.now().toString(36)}`;
const runnerId = options.runnerId ?? `runner_${shortHash(`${options.run.id}:${attemptId}:${options.commandId}`)}`;
@@ -158,13 +160,14 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani
const serviceAccountName = options.serviceAccountName ?? "agentrun-v01-runner";
const jobNamePrefix = normalizeJobNamePrefix(options.jobNamePrefix ?? serviceAccountName);
const lane = options.lane ?? process.env.AGENTRUN_LANE ?? "v0.1";
const ttlSecondsAfterFinished = options.ttlSecondsAfterFinished ?? 86_400;
const warnings: string[] = [];
const ttlSecondsAfterFinished = normalizeTtlSecondsAfterFinished(options.ttlSecondsAfterFinished, warnings);
const ttlPolicy = terminalArtifactTtlPolicy(ttlSecondsAfterFinished);
const runnerIdleTimeoutMs = normalizeRunnerIdleTimeoutMs(options.runnerIdleTimeoutMs);
const jobName = `${jobNamePrefix}-${shortDnsHash(options.run.id, attemptId)}`;
const secretRefs = credentialProjections(options.run, namespace);
const toolCredentials = toolCredentialProjections(options.run, namespace);
const sessionPvc = options.sessionPvc;
const warnings: string[] = [];
if (secretRefs.length === 0) warnings.push("run executionPolicy.secretScope 未声明 provider SecretRefrunner 将按 secret-unavailable 上报,而不会降级直连外部凭据");
const env = runnerEnv(options, { namespace, jobName, runnerJobId, runnerId, attemptId, sourceCommit, secretRefs, toolCredentials, sessionPvc, runnerIdleTimeoutMs });
const manifest: JsonRecord = {
@@ -178,6 +181,8 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani
"agentrun.pikastech.local/run-id": options.run.id,
"agentrun.pikastech.local/command-id": options.commandId,
"agentrun.pikastech.local/dry-run-render": String(options.dryRun === true),
"agentrun.pikastech.local/terminal-artifact-ttl-policy": "terminal-outbox-reconcile-retention",
"agentrun.pikastech.local/terminal-artifact-ttl-seconds": String(ttlSecondsAfterFinished),
},
},
spec: {
@@ -189,6 +194,8 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani
annotations: {
"agentrun.pikastech.local/run-id": options.run.id,
"agentrun.pikastech.local/command-id": options.commandId,
"agentrun.pikastech.local/terminal-artifact-ttl-policy": "terminal-outbox-reconcile-retention",
"agentrun.pikastech.local/terminal-artifact-ttl-seconds": String(ttlSecondsAfterFinished),
},
},
spec: {
@@ -229,7 +236,7 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani
},
},
};
return { manifest, namespace, jobName, runnerJobId, runnerId, attemptId, sourceCommit, serviceAccountName, secretRefs, toolCredentials, warnings, ttlSecondsAfterFinished, runnerIdleTimeoutMs };
return { manifest, namespace, jobName, runnerJobId, runnerId, attemptId, sourceCommit, serviceAccountName, secretRefs, toolCredentials, warnings, ttlSecondsAfterFinished, ttlPolicy, runnerIdleTimeoutMs };
}
function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string; jobName: string; runnerJobId: string; runnerId: string; attemptId: string; sourceCommit: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; sessionPvc: RunnerSessionPvcOptions | undefined; runnerIdleTimeoutMs: number }): JsonRecord[] {
@@ -286,6 +293,26 @@ function normalizeRunnerIdleTimeoutMs(value: number | undefined): number {
return value;
}
function normalizeTtlSecondsAfterFinished(value: number | undefined, warnings: string[]): number {
if (value === undefined) return minimumTerminalArtifactTtlSeconds;
if (!Number.isInteger(value) || value <= 0) throw new Error("ttlSecondsAfterFinished must be a positive integer");
if (value < minimumTerminalArtifactTtlSeconds) {
warnings.push(`ttlSecondsAfterFinished ${value} is below terminal artifact retention minimum ${minimumTerminalArtifactTtlSeconds}; raised to preserve runner logs for reconciler recovery`);
return minimumTerminalArtifactTtlSeconds;
}
return value;
}
function terminalArtifactTtlPolicy(ttlSecondsAfterFinished: number): JsonRecord {
return {
mode: "terminal-outbox-reconcile-retention",
minSeconds: minimumTerminalArtifactTtlSeconds,
ttlSecondsAfterFinished,
reason: "preserve runner Job logs and termination metadata for manager rolling recovery before Kubernetes TTL cleanup",
valuesPrinted: false,
};
}
function codexShellSandbox(policy: ExecutionPolicy): string {
if (policy.sandbox === "workspace-write") return defaultCodexShellSandbox;
return policy.sandbox;
+62 -4
View File
@@ -4,6 +4,7 @@ import path from "node:path";
import { startManagerServer } from "../../mgr/server.js";
import { MemoryAgentRunStore } from "../../mgr/store.js";
import { ManagerClient } from "../../mgr/client.js";
import { enforceRunnerRetentionBeforeCreate } from "../../mgr/runner-retention.js";
import { renderRunnerJobDryRun } from "../../runner/k8s-job.js";
import type { JsonRecord, RunRecord } from "../../common/types.js";
import { assertNoSecretLeak, createRunWithCommand, loadArtificerImageRef, type SelfTestCase } from "../harness.js";
@@ -44,7 +45,7 @@ const selfTest: SelfTestCase = async (context) => {
});
assert.equal(rendered.dryRun, true);
assert.equal(rendered.mutation, false);
assert.equal(((rendered.retention as JsonRecord).ttlSecondsAfterFinished), 86_400);
assert.equal(((rendered.retention as JsonRecord).ttlSecondsAfterFinished), 604_800);
assert.equal((rendered.jobIdentity as { serviceAccountName?: string }).serviceAccountName, "agentrun-v01-runner");
assertWorkReadySummary(rendered.workReady as JsonRecord);
assertRunnerJobUsesWritableCodexHome(rendered.manifest as JsonRecord, context.codexHome, "codex-0", "/var/run/agentrun/secrets/codex-0");
@@ -214,7 +215,7 @@ process.exit(1);
assert.equal((((created as JsonRecord).envImage as JsonRecord).digestPinned), true);
assert.equal(((((created as JsonRecord).envImage as JsonRecord).imageRef as JsonRecord).kind), "env-image-dockerfile");
assertWorkReadySummary((created as JsonRecord).workReady as JsonRecord);
assert.equal(((created as JsonRecord).retention as JsonRecord).ttlSecondsAfterFinished, 86_400);
assert.equal(((created as JsonRecord).retention as JsonRecord).ttlSecondsAfterFinished, 604_800);
assert.deepEqual((((created as JsonRecord).transientEnv as JsonRecord).names) as string[], ["HWLAB_API_KEY", "HWLAB_RUNTIME_API_URL", "HWLAB_RUNTIME_WEB_URL", "HWLAB_RUNTIME_NAMESPACE", "HWLAB_RUNTIME_LANE", "HWLAB_RUNTIME_ENDPOINT_SOURCE", "HWLAB_RUNTIME_ENDPOINT_LOCKED", "HWLAB_CODE_AGENT_ASSEMBLED_RUNTIME", "UNIDESK_MAIN_SERVER_IP"]);
const transientEnvSecret = (created as JsonRecord).transientEnvSecret as JsonRecord;
assert.match(String(transientEnvSecret.name), /^agentrun-v01-runner-env-[a-f0-9]{20}$/u);
@@ -229,7 +230,7 @@ process.exit(1);
const ownerPatch = JSON.parse(await readFile(patchedTransientEnvSecret, "utf8")) as JsonRecord;
assert.deepEqual((ownerPatch.args as string[]).slice(0, 3), ["patch", "secret", String(transientEnvSecret.name)]);
const manifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord;
assert.equal((manifest.spec as JsonRecord).ttlSecondsAfterFinished, 86_400);
assert.equal((manifest.spec as JsonRecord).ttlSecondsAfterFinished, 604_800);
assertRunnerJobUsesG14EgressProxy(manifest);
assertRunnerJobUsesBoundedGitTransport({ manifest, gitTransport: (created as JsonRecord).gitTransport } as JsonRecord);
assertRunnerJobUsesTransientEnvSecret(manifest, "HWLAB_API_KEY", String(transientEnvSecret.name));
@@ -377,6 +378,63 @@ process.exit(1);
} finally {
await new Promise<void>((resolve) => serverWithRetention.server.close(() => resolve()));
}
const protectedStore = new MemoryAgentRunStore();
const protectedServer = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test", store: protectedStore });
const protectedKubectl = path.join(context.tmp, "fake-kubectl-retention-protected.js");
const protectedDeleted = path.join(context.tmp, "retention-protected-deleted.json");
await writeFile(protectedDeleted, "[]\n");
try {
const protectedClient = new ManagerClient(protectedServer.baseUrl);
const protectedItem = await createRunWithCommand(protectedClient, context, "protected active runner", "selftest-runner-retention-protected", 15_000);
protectedStore.claimRun(protectedItem.runId, "runner-protected", -60_000);
await writeFile(protectedKubectl, `#!/usr/bin/env bun
const args = Bun.argv.slice(2);
const deletedPath = ${JSON.stringify(protectedDeleted)};
async function readDeleted() { try { return JSON.parse(await Bun.file(deletedPath).text()); } catch { return []; } }
async function writeDeleted(items) { await Bun.write(deletedPath, JSON.stringify(items, null, 2) + "\\n"); }
if (args[0] === "get" && args[1] === "jobs") {
console.log(JSON.stringify({ apiVersion: "v1", kind: "List", items: [{
apiVersion: "batch/v1",
kind: "Job",
metadata: {
name: "agentrun-v02-runner-protected",
namespace: "agentrun-v02",
creationTimestamp: "2026-01-01T00:00:00Z",
labels: { "app.kubernetes.io/part-of": "agentrun", "app.kubernetes.io/name": "agentrun-runner", "app.kubernetes.io/component": "runner", "agentrun.pikastech.local/lane": "v0.2", "job-name": "agentrun-v02-runner-protected" },
annotations: { "agentrun.pikastech.local/run-id": ${JSON.stringify(protectedItem.runId)}, "agentrun.pikastech.local/command-id": ${JSON.stringify(protectedItem.commandId)}, "agentrun.pikastech.local/runner-id": "runner-protected" },
},
status: {},
}] }));
process.exit(0);
}
if (args[0] === "get" && args[1] === "pods") { console.log(JSON.stringify({ apiVersion: "v1", kind: "List", items: [] })); process.exit(0); }
if (args[0] === "delete") { const deleted = await readDeleted(); deleted.push({ resource: args[1], name: args[2], args }); await writeDeleted(deleted); console.log(String(args[1]) + "/" + String(args[2]) + " deleted"); process.exit(0); }
console.error("unsupported fake protected kubectl args: " + args.join(" "));
process.exit(1);
`, "utf8");
await chmod(protectedKubectl, 0o755);
await assert.rejects(
() => enforceRunnerRetentionBeforeCreate({
store: protectedStore,
incomingRunnerCount: 1,
options: {
namespace: "agentrun-v02",
maxRunners: 1,
cleanupOrder: "oldest-inactive-last-active-first",
activeHeartbeatMaxAgeMs: 0,
matchLabels: { "app.kubernetes.io/part-of": "agentrun", "app.kubernetes.io/name": "agentrun-runner", "app.kubernetes.io/component": "runner" },
jobNamePrefixes: ["agentrun-v02-runner"],
ageBasedCleanup: { enabled: false, maxAgeHours: null },
kubectlCommand: protectedKubectl,
},
}),
(error) => error instanceof Error && error.message.includes("runner retention cannot safely make room"),
);
const protectedDeletedItems = JSON.parse(await readFile(protectedDeleted, "utf8")) as JsonRecord[];
assert.equal(protectedDeletedItems.length, 0);
} finally {
await new Promise<void>((resolve) => protectedServer.server.close(() => resolve()));
}
const sessionRunRecord: RunRecord = {
id: "run-selftest-session-pvc",
tenantId: "unidesk",
@@ -419,7 +477,7 @@ process.exit(1);
assert.equal(envMap.get("AGENTRUN_SESSION_PVC_NAMESPACE"), "agentrun-v01");
assert.equal(envMap.get("AGENTRUN_SESSION_PVC_MOUNT_PATH"), "/home/agentrun/.codex-codex/sessions");
assert.equal(envMap.get("AGENTRUN_CODEX_ROLLOUT_SUBDIR"), "sessions");
return { name: "runner-k8s-job", tests: ["runner-k8s-job-dry-run", "runner-k8s-job-codex-shell-sandbox-env", "runner-k8s-job-g14-egress-proxy-env", "runner-k8s-job-bounded-git-transport-env", "runner-k8s-job-deepseek-profile-dry-run", "runner-k8s-job-minimax-m3-profile-dry-run", "runner-k8s-job-dsflash-go-profile-dry-run", "runner-k8s-job-dsflash-go-legacy-secretref-normalized", "runner-k8s-job-create-api", "runner-k8s-job-retention-ttl", "runner-job-transient-env", "runner-job-transient-env-secretref", "runner-job-tool-credential-env", "runner-job-unidesk-ssh-tool-credential-env", "runner-job-tool-credential-volume", "runner-job-unidesk-ssh-endpoint-auto-env", "runner-job-unidesk-ssh-transient-env-denied", "runner-job-pre-create-retention", "runner-k8s-job-session-pvc-volume-and-env"] };
return { name: "runner-k8s-job", tests: ["runner-k8s-job-dry-run", "runner-k8s-job-codex-shell-sandbox-env", "runner-k8s-job-g14-egress-proxy-env", "runner-k8s-job-bounded-git-transport-env", "runner-k8s-job-deepseek-profile-dry-run", "runner-k8s-job-minimax-m3-profile-dry-run", "runner-k8s-job-dsflash-go-profile-dry-run", "runner-k8s-job-dsflash-go-legacy-secretref-normalized", "runner-k8s-job-create-api", "runner-k8s-job-retention-ttl", "runner-job-transient-env", "runner-job-transient-env-secretref", "runner-job-tool-credential-env", "runner-job-unidesk-ssh-tool-credential-env", "runner-job-tool-credential-volume", "runner-job-unidesk-ssh-endpoint-auto-env", "runner-job-unidesk-ssh-transient-env-denied", "runner-job-pre-create-retention", "runner-job-pre-create-retention-protects-stale-active", "runner-k8s-job-session-pvc-volume-and-env"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}