fix: add JD01 GC retention controls

This commit is contained in:
Codex
2026-07-05 04:18:22 +00:00
parent e956a0ec2a
commit ab3566435c
18 changed files with 2205 additions and 1240 deletions
+1 -1
View File
@@ -4,7 +4,7 @@ UniDesk 是一个以主 server 为统一入口的分布式工作平台。本文
## P0: 文件体积与脚本分流 ## P0: 文件体积与脚本分流
- P0: 任何源码/CLI 文件超过 3000 行必须先按职责拆分再继续,禁止继续追加绕过 - P0: 任何源码/CLI 文件超过 3000 行必须先按职责差分拆到 2000 行以下再继续,禁止卡在 3000 行边界反复触发
- P0: 禁止把 shell/Node/Python 等脚本作为大段字符串内嵌;脚本必须放入原生后缀文件(如 `.sh`/`.mjs`/`.py`)并从文件加载。 - P0: 禁止把 shell/Node/Python 等脚本作为大段字符串内嵌;脚本必须放入原生后缀文件(如 `.sh`/`.mjs`/`.py`)并从文件加载。
## P0: 主 worktree 同步提交第一原则 ## P0: 主 worktree 同步提交第一原则
+8 -1
View File
@@ -719,7 +719,7 @@ controlPlane:
- hyueapi.com - hyueapi.com
- .hyueapi.com - .hyueapi.com
retention: retention:
maxRunners: 20 maxRunners: 3
cleanupOrder: oldest-inactive-last-active-first cleanupOrder: oldest-inactive-last-active-first
activeHeartbeatMaxAgeMs: 900000 activeHeartbeatMaxAgeMs: 900000
selectors: selectors:
@@ -734,6 +734,13 @@ controlPlane:
ageBasedCleanup: ageBasedCleanup:
enabled: false enabled: false
maxAgeHours: 48 maxAgeHours: 48
sessionPvcRetention:
enabled: true
prefixes:
- agentrun-v01-session-
- agentrun-v02-session-
- agentrun-jd01-v02-session-
maxDeletePerRun: 1000
cancelLifecycle: cancelLifecycle:
deliveryMode: manager-epoch deliveryMode: manager-epoch
gracefulAbortMs: 15000 gracefulAbortMs: 15000
+24 -1
View File
@@ -133,8 +133,31 @@ gc:
hwlabNode: JD01 hwlabNode: JD01
hwlabLane: v03 hwlabLane: v03
agentrunNode: JD01 agentrunNode: JD01
agentrunLane: v02 agentrunLane: jd01-v02
limit: 80 limit: 80
containerdImageCache:
enabled: true
runtimeEndpoint: unix:///run/k3s/containerd/containerd.sock
namespace: k8s.io
ciNamespaces:
- hwlab-ci
- agentrun-ci
hostContainerdCache:
enabled: true
root: /var/lib/containerd
address: /run/containerd/containerd.sock
namespaces:
- default
orphanCleanup:
enabled: true
overlaySnapshotsRoot: /var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots
contentBlobRoot: /var/lib/containerd/io.containerd.content.v1.content/blobs/sha256
localPathStorage:
enabled: true
root: /var/lib/rancher/k3s/storage
orphanDirPrefixes:
- pvc-
orphanMinAgeMinutes: 0
policyTimer: policyTimer:
enabled: true enabled: true
name: unidesk-jd01-low-risk-gc name: unidesk-jd01-low-risk-gc
+2
View File
@@ -14,6 +14,8 @@ Local worktrees, D601 runtime files, copied scripts, copied images, ad-hoc Kuber
When stable release lanes such as `release/v1` are enabled, the desired-state ref must be explicit in the command, job log and deploy output. Until that support exists, commands that are documented to read `origin/master:deploy.json` must keep doing so and must not silently switch to another branch or a dirty manifest. When stable release lanes such as `release/v1` are enabled, the desired-state ref must be explicit in the command, job log and deploy output. Until that support exists, commands that are documented to read `origin/master:deploy.json` must keep doing so and must not silently switch to another branch or a dirty manifest.
Source and CLI files must not be kept near the 3000-line split boundary. Once a file exceeds 3000 lines, split it by responsibility until the original file is below 2000 lines before continuing feature or fix work. Do not make token-preserving micro-edits that leave the file just under or exactly at 3000 lines; that only guarantees the next small change will trigger the same split problem again.
## Prohibited Deployment Truth ## Prohibited Deployment Truth
The following practices are not acceptable as the long-term or hidden source of a working environment: The following practices are not acceptable as the long-term or hidden source of a working environment:
+8
View File
@@ -93,6 +93,14 @@ JD01 远端 plan 必须适配短连接:`snapshot` 和轻量 `plan` 返回有
JD01 PVC 归因必须按 YAML 配置的 namespace 集合读取 k8s API,不得复用 G14 专属 namespace 硬编码。报告至少包含 namespace、PVC、PV、host path、requested size、estimated actual bytes、active mount pods、owner/session/PipelineRun/runId、phase 和 reclaim policy。默认只做 plan 和归因;删除 PVC/PV、local-path host path、k3s storage、containerd snapshot/blob 或 workload 对象必须通过对应高层 retention 子命令和 GitOps/运行面 owner 判定,不能由 remote GC 扩大成 raw `kubectl delete` 或 host path 删除。 JD01 PVC 归因必须按 YAML 配置的 namespace 集合读取 k8s API,不得复用 G14 专属 namespace 硬编码。报告至少包含 namespace、PVC、PV、host path、requested size、estimated actual bytes、active mount pods、owner/session/PipelineRun/runId、phase 和 reclaim policy。默认只做 plan 和归因;删除 PVC/PV、local-path host path、k3s storage、containerd snapshot/blob 或 workload 对象必须通过对应高层 retention 子命令和 GitOps/运行面 owner 判定,不能由 remote GC 扩大成 raw `kubectl delete` 或 host path 删除。
JD01/AgentRun 这类 PVC retention 确认入口必须适配短连接:确认步骤只提交经过 plan 选中的 Kubernetes 删除请求并快速返回,不能等待 local-path PV 后端同步回收完成;收敛状态通过下一次 dry-run、`gc remote JD01 status` 或专用 status 子命令查询。若一次提交在 transport 窗口内仍不稳定,应降低 YAML/CLI 批量,而不是改成手工 raw kubectl 或 host path 删除。
JD01 local-path storage 中没有 PV 引用的 orphan 目录只能通过 `gc remote JD01 plan|run --include-local-path-orphans` 进入候选。该入口必须从 YAML 读取 storage root、目录前缀 allowlist 和年龄策略,只允许删除 root 的直接子目录,且执行前重新确认无 PV 引用、无 symlink、无打开 fd/cwd;不得把它扩大成通用 `/var/lib/rancher/k3s/storage` 清空或 raw host path 删除。
JD01 host containerd 只能通过 `gc remote JD01 plan|run --include-host-containerd-cache` 进入候选。该入口必须从 YAML 读取 containerd root、socket address 和 namespace allowlist;只有 host containerd 目标 namespace 中没有 task/container 时才允许执行 `ctr images prune --all`,不得直接删除 `/var/lib/containerd` 下的 content、snapshot 或 metadata 路径。
当 host containerd 的 `ctr` 元数据中 images、containers、tasks、leases、snapshots 和 content 全为空,但 YAML allowlist 下仍残留 overlay snapshot 目录或 content blob 文件时,才能把它们分类为 orphan state。orphan state 清理仍必须通过 `--include-host-containerd-cache` 的 plan/run,执行前重新检查元数据为空、路径在 YAML root 下、名称匹配受控形态、无 symlink、无打开 fd/cwd;不得删除 metadata DB 或扩大到 containerd root。
JD01 Web observe artifact 是一等 GC 对象。state root 必须来自 YAML;候选按 run 聚合并读取 `manifest.json``heartbeat.json``pid`、report sha 和 top files。年龄判定以 manifest/heartbeat 的 started/completed/updated 字段、pid 存活和打开 fd 检查为准,不以目录 mtime 为唯一依据,因为手动 GC 或目录遍历可能刷新 mtime。active run、pid alive、open fd、未生成必要 report 的 run 均为 protected。safe 候选只覆盖超过 YAML retention 且可重建的 raw samples、browser-process、network/trace、screenshot 等大 artifact;长期保留 report summary、report json/md、最终截图或诊断摘要由 YAML cap/retention 策略控制。 JD01 Web observe artifact 是一等 GC 对象。state root 必须来自 YAML;候选按 run 聚合并读取 `manifest.json``heartbeat.json``pid`、report sha 和 top files。年龄判定以 manifest/heartbeat 的 started/completed/updated 字段、pid 存活和打开 fd 检查为准,不以目录 mtime 为唯一依据,因为手动 GC 或目录遍历可能刷新 mtime。active run、pid alive、open fd、未生成必要 report 的 run 均为 protected。safe 候选只覆盖超过 YAML retention 且可重建的 raw samples、browser-process、network/trace、screenshot 等大 artifact;长期保留 report summary、report json/md、最终截图或诊断摘要由 YAML cap/retention 策略控制。
JD01 Chrome 内存治理应优先管理 observer runner 生命周期,而不是孤立清理 Chrome 进程。Web probe sentinel 和 quick-verify 启动 observer 后,所有终态路径(成功、blocked、失败、timeout、异常)都必须执行 YAML 控制的 `web-probe observe stop`/force stop 流程,并验证对应 runner/Chrome process tree 退出;observe runner 自身也必须从 scenario/YAML 获得最大运行时长或 max samples 兜底,即使调用方退出也会停止采样并关闭 browser。browser freeze policy 只能作为异常保护,不替代正常任务生命周期结束后的 stop。 JD01 Chrome 内存治理应优先管理 observer runner 生命周期,而不是孤立清理 Chrome 进程。Web probe sentinel 和 quick-verify 启动 observer 后,所有终态路径(成功、blocked、失败、timeout、异常)都必须执行 YAML 控制的 `web-probe observe stop`/force stop 流程,并验证对应 runner/Chrome process tree 退出;observe runner 自身也必须从 scenario/YAML 获得最大运行时长或 max samples 兜底,即使调用方退出也会停止采样并关闭 browser。browser freeze policy 只能作为异常保护,不替代正常任务生命周期结束后的 stop。
+18
View File
@@ -223,6 +223,11 @@ export interface AgentRunRunnerRetentionSpec {
readonly enabled: boolean; readonly enabled: boolean;
readonly maxAgeHours: number | null; readonly maxAgeHours: number | null;
}; };
readonly sessionPvcRetention: {
readonly enabled: boolean;
readonly prefixes: readonly string[];
readonly maxDeletePerRun: number;
};
} }
export type AgentRunCancelLifecycleStage = "accepted" | "persisted" | "delivered" | "aborting" | "terminalized" | "fenced" | "late-write-rejected"; export type AgentRunCancelLifecycleStage = "accepted" | "persisted" | "delivered" | "aborting" | "terminalized" | "fenced" | "late-write-rejected";
@@ -701,6 +706,14 @@ function parseCancelLifecycleStages(input: unknown, path: string): readonly Agen
function parseRunnerRetention(input: Record<string, unknown>, path: string): AgentRunRunnerRetentionSpec { function parseRunnerRetention(input: Record<string, unknown>, path: string): AgentRunRunnerRetentionSpec {
const selectors = recordField(input, "selectors", path); const selectors = recordField(input, "selectors", path);
const ageBasedCleanup = recordField(input, "ageBasedCleanup", path); const ageBasedCleanup = recordField(input, "ageBasedCleanup", path);
const sessionPvcRetentionRaw = input.sessionPvcRetention;
const sessionPvcRetention = typeof sessionPvcRetentionRaw === "object" && sessionPvcRetentionRaw !== null && !Array.isArray(sessionPvcRetentionRaw)
? sessionPvcRetentionRaw as Record<string, unknown>
: {};
const sessionPvcPrefixes = sessionPvcRetention.prefixes === undefined ? [] : stringArrayField(sessionPvcRetention, "prefixes", `${path}.sessionPvcRetention`);
for (const [index, prefix] of sessionPvcPrefixes.entries()) {
if (!/^[a-z0-9]([-a-z0-9]*[a-z0-9-])?$/u.test(prefix)) throw new Error(`${path}.sessionPvcRetention.prefixes[${index}] must be a lowercase Kubernetes PVC name prefix`);
}
return { return {
maxRunners: positiveIntegerField(input, "maxRunners", path), maxRunners: positiveIntegerField(input, "maxRunners", path),
cleanupOrder: enumField(input, "cleanupOrder", path, ["oldest-inactive-last-active-first"]), cleanupOrder: enumField(input, "cleanupOrder", path, ["oldest-inactive-last-active-first"]),
@@ -716,6 +729,11 @@ function parseRunnerRetention(input: Record<string, unknown>, path: string): Age
enabled: booleanField(ageBasedCleanup, "enabled", `${path}.ageBasedCleanup`), enabled: booleanField(ageBasedCleanup, "enabled", `${path}.ageBasedCleanup`),
maxAgeHours: optionalPositiveIntegerField(ageBasedCleanup, "maxAgeHours", `${path}.ageBasedCleanup`) ?? null, maxAgeHours: optionalPositiveIntegerField(ageBasedCleanup, "maxAgeHours", `${path}.ageBasedCleanup`) ?? null,
}, },
sessionPvcRetention: {
enabled: sessionPvcRetention.enabled === undefined ? false : booleanField(sessionPvcRetention, "enabled", `${path}.sessionPvcRetention`),
prefixes: sessionPvcPrefixes,
maxDeletePerRun: optionalPositiveIntegerField(sessionPvcRetention, "maxDeletePerRun", `${path}.sessionPvcRetention`) ?? 100,
},
}; };
} }
@@ -0,0 +1,102 @@
import { execFileSync, spawnSync } from "node:child_process";
function runJson(args) {
return JSON.parse(execFileSync("kubectl", args, { encoding: "utf8", maxBuffer: 32 * 1024 * 1024 }));
}
function duBytes(path) {
if (!path || !path.startsWith("/var/lib/rancher/k3s/storage/")) return null;
const result = spawnSync("du", ["-sb", path], { encoding: "utf8", timeout: 8000 });
if (result.status !== 0) return null;
const value = Number(result.stdout.trim().split(/\s+/u)[0]);
return Number.isFinite(value) ? value : null;
}
const namespace = process.env.NAMESPACE;
const confirm = process.env.CONFIRM === "true";
const enabled = process.env.ENABLED === "true";
const limit = Math.max(1, Math.min(Number(process.env.LIMIT || "100"), 1000));
const prefixes = JSON.parse(Buffer.from(process.env.PREFIXES_JSON_B64 || "W10=", "base64").toString("utf8"));
if (!enabled) {
console.log(JSON.stringify({ ok: false, error: "session-pvc-retention-disabled", selectedPvcCount: 0, mutation: false }));
process.exit(0);
}
if (!namespace || !Array.isArray(prefixes) || prefixes.length === 0) throw new Error("session PVC cleanup requires namespace and YAML prefixes");
const pvData = runJson(["get", "pv", "-o", "json"]);
const pvcData = runJson(["-n", namespace, "get", "pvc", "-o", "json"]);
const podData = runJson(["-n", namespace, "get", "pod", "-o", "json"]);
const pvs = new Map((pvData.items || []).map((pv) => [pv.metadata?.name, pv]));
const activeClaims = new Map();
for (const pod of podData.items || []) {
const phase = pod.status?.phase;
if (phase === "Succeeded" || phase === "Failed") continue;
for (const volume of pod.spec?.volumes || []) {
const claim = volume.persistentVolumeClaim?.claimName;
if (!claim) continue;
const list = activeClaims.get(claim) || [];
list.push(pod.metadata?.name);
activeClaims.set(claim, list);
}
}
const candidates = [];
const protectedRows = [];
for (const pvc of pvcData.items || []) {
const name = pvc.metadata?.name || "";
const matchedPrefix = prefixes.find((prefix) => name.startsWith(prefix));
if (!matchedPrefix) continue;
const activeMountPods = activeClaims.get(name) || [];
const pv = pvs.get(pvc.spec?.volumeName);
const storageClass = pvc.spec?.storageClassName || pv?.spec?.storageClassName || null;
const reclaimPolicy = pv?.spec?.persistentVolumeReclaimPolicy || null;
const hostPath = pv?.spec?.hostPath?.path || pv?.spec?.local?.path || null;
const row = {
namespace,
pvc: name,
volume: pvc.spec?.volumeName || null,
matchedPrefix,
phase: pvc.status?.phase || null,
pvPhase: pv?.status?.phase || null,
storageClass,
reclaimPolicy,
activeMountCount: activeMountPods.length,
activeMountPods: activeMountPods.slice(0, 5),
estimatedBytes: duBytes(hostPath),
};
if (activeMountPods.length > 0 || storageClass !== "local-path" || reclaimPolicy !== "Delete") {
protectedRows.push({ ...row, reason: activeMountPods.length > 0 ? "active-mount" : "not-local-path-delete" });
} else {
candidates.push(row);
}
}
candidates.sort((a, b) => (b.estimatedBytes || 0) - (a.estimatedBytes || 0));
const selected = candidates.slice(0, limit);
const result = {
ok: true,
planKind: "agentrun-session-pvc-retention",
namespace,
dryRun: !confirm,
mutation: confirm,
criteria: { prefixes, storageClass: "local-path", reclaimPolicy: "Delete", requireNoActiveMount: true, limit },
candidatePvcCount: candidates.length,
selectedPvcCount: selected.length,
protectedPvcCount: protectedRows.length,
estimatedReclaimBytes: selected.reduce((sum, item) => sum + (item.estimatedBytes || 0), 0),
selectedPreview: selected.slice(0, 12),
protectedPreview: protectedRows.slice(0, 12),
deletedPvcCount: 0,
valuesPrinted: false,
};
if (confirm && selected.length > 0) {
for (let index = 0; index < selected.length; index += 50) {
execFileSync("kubectl", ["-n", namespace, "delete", "pvc", "--wait=false", ...selected.slice(index, index + 50).map((item) => item.pvc)], { encoding: "utf8", maxBuffer: 1024 * 1024 });
}
result.deletedPvcCount = selected.length;
result.deleteMode = "submit-only-wait-false";
}
console.log(JSON.stringify(result));
+13 -1
View File
@@ -33,7 +33,7 @@ import {
} from "../agentrun-manifests"; } from "../agentrun-manifests";
import { sha256Fingerprint } from "../platform-infra-ops-library"; import { sha256Fingerprint } from "../platform-infra-ops-library";
import type { CleanupReleasedPvOptions, CleanupRunnersOptions, CleanupRunsOptions, ConfirmOptions, GitMirrorOptions, LaneConfirmOptions, RefreshOptions, SecretSyncOptions, StatusOptions } from "./options"; import type { CleanupReleasedPvOptions, CleanupRunnersOptions, CleanupRunsOptions, CleanupSessionPvcsOptions, ConfirmOptions, GitMirrorOptions, LaneConfirmOptions, RefreshOptions, SecretSyncOptions, StatusOptions } from "./options";
import { agentRunControlPlaneStatusCommand } from "./public-exposure"; import { agentRunControlPlaneStatusCommand } from "./public-exposure";
import { applyYamlScript, manifestObjectRef, yamlLaneGitMirrorStatusScript } from "./secrets"; import { applyYamlScript, manifestObjectRef, yamlLaneGitMirrorStatusScript } from "./secrets";
import { compactAgentRunLaneStatusTarget, compactLaneSecretsStatus } from "./trigger"; import { compactAgentRunLaneStatusTarget, compactLaneSecretsStatus } from "./trigger";
@@ -193,6 +193,18 @@ export function parseCleanupReleasedPvOptions(args: string[]): CleanupReleasedPv
}; };
} }
export function parseCleanupSessionPvcsOptions(args: string[]): CleanupSessionPvcsOptions {
validateOptions(args, new Set(["--confirm", "--dry-run"]), new Set(["--limit", "--timeout-seconds", "--node", "--lane"]));
const base = parseConfirmOptions(args);
return {
...base,
node: optionValue(args, "--node") ?? null,
lane: optionValue(args, "--lane") ?? null,
limit: positiveIntegerOption(args, "--limit", 100, 1000),
timeoutSeconds: positiveIntegerOption(args, "--timeout-seconds", 180, 900),
};
}
export function validateOptions(args: string[], booleanOptions: Set<string>, valueOptions: Set<string>): void { export function validateOptions(args: string[], booleanOptions: Set<string>, valueOptions: Set<string>): void {
for (let index = 0; index < args.length; index += 1) { for (let index = 0; index < args.length; index += 1) {
const arg = args[index]; const arg = args[index];
+7 -3
View File
@@ -34,7 +34,7 @@ import {
import { sha256Fingerprint } from "../platform-infra-ops-library"; import { sha256Fingerprint } from "../platform-infra-ops-library";
import type { AgentRunResourceVerb, AgentRunRestCompatGroup } from "./utils"; import type { AgentRunResourceVerb, AgentRunRestCompatGroup } from "./utils";
import { controlPlaneApply, controlPlanePlan, parseCleanupReleasedPvOptions, parseCleanupRunnersOptions, parseCleanupRunsOptions, parseConfirmOptions, parseGitMirrorOptions, parseLaneConfirmOptions, parseRefreshOptions, parseSecretSyncOptions, status } from "./control-plane"; import { controlPlaneApply, controlPlanePlan, parseCleanupReleasedPvOptions, parseCleanupRunnersOptions, parseCleanupRunsOptions, parseCleanupSessionPvcsOptions, parseConfirmOptions, parseGitMirrorOptions, parseLaneConfirmOptions, parseRefreshOptions, parseSecretSyncOptions, status } from "./control-plane";
import { gitMirrorStatus } from "./git-mirror"; import { gitMirrorStatus } from "./git-mirror";
import { agentRunExplain, isRecord, parseGitMirrorStatusOptions, parseStatusOptions, parseTriggerOptions } from "./options"; import { agentRunExplain, isRecord, parseGitMirrorStatusOptions, parseStatusOptions, parseTriggerOptions } from "./options";
import { renderAgentRunControlPlaneActionSummary, renderAgentRunControlPlanePlanSummary, renderAgentRunControlPlaneStatusSummary } from "./public-exposure"; import { renderAgentRunControlPlaneActionSummary, renderAgentRunControlPlanePlanSummary, renderAgentRunControlPlaneStatusSummary } from "./public-exposure";
@@ -43,7 +43,7 @@ import { agentRunGetKindHelp, runAgentRunResourceCommand } from "./resource-acti
import { runAgentRunRestCompatCommand, runGitMirrorJob, startAsyncAgentRunJob } from "./rest-bridge"; import { runAgentRunRestCompatCommand, runGitMirrorJob, startAsyncAgentRunJob } from "./rest-bridge";
import { exposeAgentRun, restartYamlLane, secretSync, triggerCurrent } from "./trigger"; import { exposeAgentRun, restartYamlLane, secretSync, triggerCurrent } from "./trigger";
import { unsupported } from "./utils"; import { unsupported } from "./utils";
import { cleanupReleasedPvs, cleanupRunners, cleanupRuns, refresh } from "./yaml-lane"; import { cleanupReleasedPvs, cleanupRunners, cleanupRuns, cleanupSessionPvcs, refresh } from "./yaml-lane";
export function agentRunHelp(): unknown { export function agentRunHelp(): unknown {
return { return {
@@ -143,6 +143,9 @@ export async function runAgentRunCommand(config: UniDeskConfig | null, args: str
return options.full || options.raw ? result : renderAgentRunControlPlaneActionSummary(result, "AGENTRUN RUNNER CLEANUP"); return options.full || options.raw ? result : renderAgentRunControlPlaneActionSummary(result, "AGENTRUN RUNNER CLEANUP");
} }
if (action === "cleanup-runs") return await cleanupRuns(config, parseCleanupRunsOptions(actionArgs)); if (action === "cleanup-runs") return await cleanupRuns(config, parseCleanupRunsOptions(actionArgs));
if (action === "cleanup-session-pvcs") {
return await cleanupSessionPvcs(config, parseCleanupSessionPvcsOptions(actionArgs));
}
if (action === "cleanup-released-pvs") return await cleanupReleasedPvs(config, parseCleanupReleasedPvOptions(actionArgs)); if (action === "cleanup-released-pvs") return await cleanupReleasedPvs(config, parseCleanupReleasedPvOptions(actionArgs));
} }
if (group === "git-mirror") { if (group === "git-mirror") {
@@ -271,7 +274,7 @@ export function agentRunHelpText(args: string[]): string {
return [ return [
"Usage: bun scripts/cli.ts agentrun control-plane <action> [options]", "Usage: bun scripts/cli.ts agentrun control-plane <action> [options]",
"", "",
"Actions: plan, apply, status, secret-sync, expose, trigger-current, refresh, cleanup-runners, cleanup-runs, cleanup-released-pvs", "Actions: plan, apply, status, secret-sync, expose, trigger-current, refresh, cleanup-runners, cleanup-runs, cleanup-session-pvcs, cleanup-released-pvs",
"Examples:", "Examples:",
" bun scripts/cli.ts agentrun control-plane plan --node D601 --lane v02", " bun scripts/cli.ts agentrun control-plane plan --node D601 --lane v02",
" bun scripts/cli.ts agentrun control-plane apply --node D601 --lane v02 --dry-run", " bun scripts/cli.ts agentrun control-plane apply --node D601 --lane v02 --dry-run",
@@ -283,6 +286,7 @@ export function agentRunHelpText(args: string[]): string {
" bun scripts/cli.ts agentrun control-plane expose --dry-run", " bun scripts/cli.ts agentrun control-plane expose --dry-run",
" bun scripts/cli.ts agentrun control-plane trigger-current --dry-run", " bun scripts/cli.ts agentrun control-plane trigger-current --dry-run",
" bun scripts/cli.ts agentrun control-plane cleanup-runners --node D601 --lane v02 --dry-run", " bun scripts/cli.ts agentrun control-plane cleanup-runners --node D601 --lane v02 --dry-run",
" bun scripts/cli.ts agentrun control-plane cleanup-session-pvcs --node JD01 --lane jd01-v02 --dry-run",
" bun scripts/cli.ts agentrun control-plane cleanup-runs --min-age-minutes 30 --limit 200 --dry-run", " bun scripts/cli.ts agentrun control-plane cleanup-runs --min-age-minutes 30 --limit 200 --dry-run",
].join("\n"); ].join("\n");
} }
+7
View File
@@ -265,6 +265,13 @@ export interface CleanupReleasedPvOptions extends ConfirmOptions {
timeoutSeconds: number; timeoutSeconds: number;
} }
export interface CleanupSessionPvcsOptions extends ConfirmOptions {
node: string | null;
lane: string | null;
limit: number;
timeoutSeconds: number;
}
export interface DisclosureOptions { export interface DisclosureOptions {
full: boolean; full: boolean;
raw: boolean; raw: boolean;
+50 -1
View File
@@ -35,7 +35,7 @@ import {
} from "../agentrun-manifests"; } from "../agentrun-manifests";
import { sha256Fingerprint } from "../platform-infra-ops-library"; import { sha256Fingerprint } from "../platform-infra-ops-library";
import type { CleanupReleasedPvOptions, CleanupRunnersOptions, CleanupRunsOptions, RefreshOptions } from "./options"; import type { CleanupReleasedPvOptions, CleanupRunnersOptions, CleanupRunsOptions, CleanupSessionPvcsOptions, RefreshOptions } from "./options";
import { cleanupReleasedPvsFinalizeNodeScript, cleanupReleasedPvsPlanNodeScript, cleanupRunnersFinalizeNodeScript, cleanupRunsFinalizeNodeScript, cleanupRunsPlanNodeScript, refreshYamlLaneScript } from "./git-mirror"; import { cleanupReleasedPvsFinalizeNodeScript, cleanupReleasedPvsPlanNodeScript, cleanupRunnersFinalizeNodeScript, cleanupRunsFinalizeNodeScript, cleanupRunsPlanNodeScript, refreshYamlLaneScript } from "./git-mirror";
import { cleanupRunnersFactsNodeScript, cleanupRunnersPlanNodeScript, collectLaneSecretSources, createYamlLaneJobScript, yamlLaneGitopsPublishJobManifest, yamlLaneGitopsPublishPayloadFromProbe, yamlLaneJobProbeScript } from "./secrets"; import { cleanupRunnersFactsNodeScript, cleanupRunnersPlanNodeScript, collectLaneSecretSources, createYamlLaneJobScript, yamlLaneGitopsPublishJobManifest, yamlLaneGitopsPublishPayloadFromProbe, yamlLaneJobProbeScript } from "./secrets";
import { capture, captureJsonPayload, compactCapture, progressEvent, shQuote, sleep, stringOrNull } from "./utils"; import { capture, captureJsonPayload, compactCapture, progressEvent, shQuote, sleep, stringOrNull } from "./utils";
@@ -204,6 +204,55 @@ export async function cleanupReleasedPvs(config: UniDeskConfig, options: Cleanup
}; };
} }
export async function cleanupSessionPvcs(config: UniDeskConfig, options: CleanupSessionPvcsOptions): Promise<Record<string, unknown>> {
const { configPath, spec } = resolveAgentRunLaneTarget(options);
const result = await capture(config, spec.nodeKubeRoute, ["sh", "--", cleanupSessionPvcsScript(options, spec)]);
const payload = captureJsonPayload(result);
const ok = result.exitCode === 0 && payload.ok !== false;
const base = {
...payload,
ok,
command: "agentrun control-plane cleanup-session-pvcs",
configPath,
target: { node: spec.nodeId, lane: spec.lane, namespace: spec.runtime.namespace },
mode: options.dryRun || !options.confirm ? "dry-run" : "confirmed-cleanup",
namespace: spec.runtime.namespace,
retention: spec.deployment.runner.retention.sessionPvcRetention,
probe: result.exitCode === 0 ? undefined : compactCapture(result, { full: true, stdoutTailChars: 3000, stderrTailChars: 3000 }),
};
if (options.dryRun || !options.confirm) {
return { ...base, dryRun: true, mutation: false, next: { confirm: `bun scripts/cli.ts agentrun control-plane cleanup-session-pvcs --node ${spec.nodeId} --lane ${spec.lane} --limit ${options.limit} --confirm` } };
}
return {
...base,
dryRun: false,
mutation: true,
followUp: {
dryRun: `bun scripts/cli.ts agentrun control-plane cleanup-session-pvcs --node ${spec.nodeId} --lane ${spec.lane} --limit ${options.limit} --dry-run`,
diskPressure: `bun scripts/cli.ts gc remote ${spec.nodeId} status --limit 20`,
},
};
}
export function cleanupSessionPvcsScript(options: CleanupSessionPvcsOptions, spec: AgentRunLaneSpec): string {
const retention = spec.deployment.runner.retention.sessionPvcRetention;
const script = readFileSync(rootPath("scripts/src/agentrun/cleanup-session-pvcs.mjs"), "utf8");
return [
"set -eu",
`namespace=${shQuote(spec.runtime.namespace)}`,
`confirm=${options.confirm && !options.dryRun ? "true" : "false"}`,
`limit=${String(Math.min(options.limit, retention.maxDeletePerRun))}`,
`enabled=${retention.enabled ? "true" : "false"}`,
`prefixes_json_b64=${shQuote(Buffer.from(JSON.stringify(retention.prefixes), "utf8").toString("base64"))}`,
"tmp_dir=$(mktemp -d)",
"trap 'rm -rf \"$tmp_dir\"' EXIT",
"cat > \"$tmp_dir/cleanup-session-pvcs.mjs\" <<'NODE'",
script,
"NODE",
"env NAMESPACE=\"$namespace\" CONFIRM=\"$confirm\" LIMIT=\"$limit\" ENABLED=\"$enabled\" PREFIXES_JSON_B64=\"$prefixes_json_b64\" node \"$tmp_dir/cleanup-session-pvcs.mjs\"",
].join("\n");
}
export function cleanupRunnersScript(options: CleanupRunnersOptions, spec: AgentRunLaneSpec): string { export function cleanupRunnersScript(options: CleanupRunnersOptions, spec: AgentRunLaneSpec): string {
const retention = spec.deployment.runner.retention; const retention = spec.deployment.runner.retention;
const matchLabelsB64 = Buffer.from(JSON.stringify(retention.selectors.matchLabels), "utf8").toString("base64"); const matchLabelsB64 = Buffer.from(JSON.stringify(retention.selectors.matchLabels), "utf8").toString("base64");
+386
View File
@@ -0,0 +1,386 @@
def k3s_crictl_base():
endpoint = str(CONTAINERD_CONFIG.get("runtimeEndpoint") or "unix:///run/k3s/containerd/containerd.sock")
return ["crictl", "--runtime-endpoint", endpoint]
def shell_single_quote(value):
return "'" + str(value).replace("'", "'\"'\"'") + "'"
def k3s_crictl_json(args, timeout=30):
result = command(k3s_crictl_base() + args + ["-o", "json"], timeout)
if result["exitCode"] != 0:
return None, result
try:
return json.loads(result["stdout"] or "{}"), result
except Exception:
return None, result
def ci_activity_snapshot_for_prune():
namespaces = config_list(CONTAINERD_CONFIG, "ciNamespaces", ["hwlab-ci", "agentrun-ci"])
active = []
commands = []
for namespace in namespaces:
result = command(["sh", "-lc", "KUBECONFIG=/etc/rancher/k3s/k3s.yaml kubectl get pipelinerun,taskrun,job -n %s --no-headers 2>/dev/null | awk '$2 != \"True\" && $2 != \"False\" && $2 != \"Complete\" && $2 != \"Failed\" {print}' | head -20" % shell_single_quote(namespace)], 15)
commands.append({"namespace": namespace, "command": bounded(result)})
for line in (result.get("stdout") or "").splitlines():
if line.strip():
active.append({"namespace": namespace, "line": line.strip()})
return {"ok": True, "activeCount": len(active), "activePreview": active[:20], "commands": commands}
def compact_ci_activity(activity):
return {
"ok": activity.get("ok"),
"activeCount": activity.get("activeCount"),
"activePreview": activity.get("activePreview") or [],
}
def compact_image_ref(ref):
ref = str(ref or "")
return ref if len(ref) <= 120 else ref[:117] + "..."
def k3s_cri_image_rows():
images, image_cmd = k3s_crictl_json(["images"], 45)
containers, container_cmd = k3s_crictl_json(["ps", "-a"], 30)
if images is None:
return None, {"ok": False, "reason": "crictl-images-failed", "command": bounded(image_cmd)}
if containers is None:
return None, {"ok": False, "reason": "crictl-ps-failed", "command": bounded(container_cmd)}
used = set()
for container in containers.get("containers") or []:
for key in ["imageRef", "image", "imageId"]:
value = container.get(key)
if isinstance(value, str) and value:
used.add(value)
image = container.get("image") or {}
if isinstance(image, dict):
for key in ["image", "annotations", "userSpecifiedImage"]:
value = image.get(key)
if isinstance(value, str) and value:
used.add(value)
rows = []
for image in images.get("images") or []:
refs = []
for key in ["repoTags", "repoDigests"]:
value = image.get(key)
if isinstance(value, list):
refs.extend([str(item) for item in value if item])
image_id = str(image.get("id") or "")
pinned = bool(image.get("pinned"))
size = safe_int(image.get("size_") or image.get("size") or 0)
in_use = pinned or image_id in used or any(ref in used for ref in refs)
rows.append({"id": image_id, "refs": refs, "sizeBytes": size, "inUse": in_use, "pinned": pinned})
return rows, {"ok": True, "imageCommand": bounded(image_cmd), "containerCommand": bounded(container_cmd)}
def k3s_image_cache_candidate():
if not config_bool(CONTAINERD_CONFIG, "enabled", False):
return {
"id": "k3s-cri-image-prune:disabled",
"kind": "k3s-cri-image-prune-disabled",
"risk": "blocked",
"description": "K3s CRI image prune is disabled in YAML",
"estimatedReclaimBytes": 0,
"configSource": "config/unidesk-cli.yaml#gc.remote.targets.%s.containerdImageCache.enabled" % PROVIDER_ID,
}
activity = ci_activity_snapshot_for_prune()
if int(activity.get("activeCount") or 0) > 0:
return {
"id": "k3s-cri-image-prune:ci-active",
"kind": "k3s-cri-image-prune-blocked",
"risk": "blocked",
"description": "K3s CRI image prune is blocked while CI workloads are active",
"estimatedReclaimBytes": 0,
"ciActivity": compact_ci_activity(activity),
}
rows, meta = k3s_cri_image_rows()
if rows is None:
return {
"id": "k3s-cri-image-prune:unavailable",
"kind": "k3s-cri-image-prune-unavailable",
"risk": "blocked",
"description": "K3s CRI image list is unavailable",
"estimatedReclaimBytes": 0,
"diagnostic": meta,
}
unused = [row for row in rows if not row.get("inUse")]
estimated = sum(safe_int(row.get("sizeBytes")) for row in unused)
if estimated <= 0:
return None
return {
"id": "k3s-cri-image-prune:unused",
"kind": "k3s-cri-image-prune",
"risk": "medium",
"description": "Prune unused k3s CRI images through crictl rmi --prune; no containerd paths are deleted directly",
"sizeBytes": estimated,
"estimatedReclaimBytes": estimated,
"imageCount": len(rows),
"unusedImageCount": len(unused),
"unusedPreview": [{"id": row.get("id"), "refs": [compact_image_ref(ref) for ref in (row.get("refs") or [])[:2]], "sizeBytes": row.get("sizeBytes")} for row in unused[:3]],
"ciActivity": compact_ci_activity(activity),
"action": {"command": k3s_crictl_base() + ["rmi", "--prune"], "mode": "cri-unused-images-only"},
}
def execute_k3s_image_cache_prune():
activity = ci_activity_snapshot_for_prune()
if int(activity.get("activeCount") or 0) > 0:
raise RuntimeError("refusing k3s image prune while CI workloads are active")
before = du_size("/var/lib/rancher/k3s/agent/containerd", 45) or 0
result = command(k3s_crictl_base() + ["rmi", "--prune"], 300)
if result["exitCode"] != 0:
raise RuntimeError((result["stderr"] or result["stdout"] or "crictl rmi --prune failed").strip())
after = du_size("/var/lib/rancher/k3s/agent/containerd", 45) or 0
return {"reclaimedBytes": max(0, before - after), "commandOutput": bounded(result), "ciActivity": compact_ci_activity(activity)}
def host_ctr_base(namespace=None):
address = config_str(HOST_CONTAINERD_CONFIG, "address", "")
args = ["ctr"]
if address:
args.extend(["--address", address])
if namespace:
args.extend(["-n", namespace])
return args
def host_ctr(args, timeout=30, namespace=None):
return command(host_ctr_base(namespace) + args, timeout)
def host_containerd_namespaces():
configured = config_list(HOST_CONTAINERD_CONFIG, "namespaces", [])
if configured:
return configured, {"source": "yaml", "command": None}
result = host_ctr(["namespaces", "list", "-q"], 20)
if result["exitCode"] != 0:
return [], {"source": "ctr", "command": bounded(result), "error": "ctr-namespaces-failed"}
return [line.strip() for line in (result.get("stdout") or "").splitlines() if line.strip()], {"source": "ctr", "command": bounded(result)}
def host_containerd_activity():
if not config_bool(HOST_CONTAINERD_CONFIG, "enabled", False):
return {"ok": False, "reason": "host-containerd-cache-disabled", "activeCount": 0}
root = config_str(HOST_CONTAINERD_CONFIG, "root", "")
if not root or not os.path.isdir(root):
return {"ok": False, "reason": "host-containerd-root-unavailable", "root": root, "activeCount": 0}
namespaces, namespace_meta = host_containerd_namespaces()
active = []
commands = []
for namespace in namespaces:
task_result = host_ctr(["tasks", "list", "-q"], 20, namespace)
container_result = host_ctr(["containers", "list", "-q"], 20, namespace)
image_result = host_ctr(["images", "list", "-q"], 20, namespace)
lease_result = host_ctr(["leases", "list", "-q"], 20, namespace)
snapshot_result = host_ctr(["snapshots", "list"], 20, namespace)
content_result = host_ctr(["content", "list"], 20, namespace)
snapshot_lines = table_data_lines(snapshot_result.get("stdout") or "", "KEY")
content_lines = table_data_lines(content_result.get("stdout") or "", "DIGEST")
commands.append({
"namespace": namespace,
"tasks": bounded(task_result),
"containers": bounded(container_result),
"images": bounded(image_result),
"leases": bounded(lease_result),
"snapshots": bounded(snapshot_result),
"content": bounded(content_result),
})
for kind, result in [("task", task_result), ("container", container_result), ("lease", lease_result)]:
if result["exitCode"] != 0:
active.append({"namespace": namespace, "kind": kind, "state": "unknown", "reason": "ctr-list-failed"})
continue
for line in (result.get("stdout") or "").splitlines():
if line.strip():
active.append({"namespace": namespace, "kind": kind, "name": line.strip()})
if snapshot_result["exitCode"] != 0:
active.append({"namespace": namespace, "kind": "snapshot", "state": "unknown", "reason": "ctr-list-failed"})
for line in snapshot_lines:
active.append({"namespace": namespace, "kind": "snapshot", "name": line.split()[0] if line.split() else line})
if content_result["exitCode"] != 0:
active.append({"namespace": namespace, "kind": "content", "state": "unknown", "reason": "ctr-list-failed"})
for line in content_lines:
active.append({"namespace": namespace, "kind": "content", "name": line.split()[0] if line.split() else line})
return {
"ok": True,
"root": root,
"namespaces": namespaces,
"namespaceMeta": namespace_meta,
"activeCount": len(active),
"activePreview": active[:20],
"commands": commands,
}
def compact_host_containerd_activity(activity):
return {
"ok": activity.get("ok"),
"reason": activity.get("reason"),
"root": activity.get("root"),
"namespaces": activity.get("namespaces"),
"activeCount": activity.get("activeCount"),
"activePreview": activity.get("activePreview") or [],
}
def table_data_lines(stdout, header_prefix):
lines = [line.strip() for line in str(stdout or "").splitlines() if line.strip()]
return [line for line in lines if not line.startswith(header_prefix)]
def host_containerd_orphan_config():
value = HOST_CONTAINERD_CONFIG.get("orphanCleanup") if isinstance(HOST_CONTAINERD_CONFIG, dict) else None
return value if isinstance(value, dict) else {}
def direct_child_paths(root, predicate):
if not root or not os.path.isdir(root) or os.path.islink(root):
return []
rows = []
for name in sorted(os.listdir(root)):
path = os.path.realpath(os.path.abspath(os.path.join(root, name)))
if os.path.dirname(path) != os.path.realpath(os.path.abspath(root)):
continue
if not predicate(name, path):
continue
rows.append({"name": name, "path": path, "estimatedReclaimBytes": du_size(path, 10) or path_size(path)})
return rows
def host_containerd_orphan_rows(activity):
cfg = host_containerd_orphan_config()
if not config_bool(cfg, "enabled", False):
return [], {"ok": False, "reason": "host-containerd-orphan-cleanup-disabled"}
if not activity.get("ok") or int(activity.get("activeCount") or 0) > 0:
return [], {"ok": False, "reason": "host-containerd-metadata-not-empty", "activity": compact_host_containerd_activity(activity)}
overlay_root = os.path.realpath(os.path.abspath(config_str(cfg, "overlaySnapshotsRoot", "")))
content_root = os.path.realpath(os.path.abspath(config_str(cfg, "contentBlobRoot", "")))
root = os.path.realpath(os.path.abspath(config_str(HOST_CONTAINERD_CONFIG, "root", "")))
if not root or not overlay_root.startswith(root.rstrip("/") + "/") or not content_root.startswith(root.rstrip("/") + "/"):
return [], {"ok": False, "reason": "host-containerd-orphan-root-outside-containerd-root", "root": root}
open_roots = []
for candidate_root in [overlay_root, content_root]:
if os.path.exists(candidate_root) and path_has_open_fd(candidate_root):
open_roots.append(candidate_root)
if open_roots:
return [], {"ok": False, "reason": "host-containerd-orphan-root-open-fd", "openRoots": open_roots}
overlay_rows = direct_child_paths(overlay_root, lambda name, path: os.path.isdir(path) and not os.path.islink(path) and re.match(r"^[0-9]+$", name) is not None)
content_rows = direct_child_paths(content_root, lambda name, path: os.path.isfile(path) and not os.path.islink(path) and re.match(r"^[0-9a-f]{64}$", name) is not None)
safe_rows = []
for kind, rows in [("overlay-snapshot-dir", overlay_rows), ("content-blob-file", content_rows)]:
for row in rows:
safe_rows.append({**row, "kind": kind})
safe_rows.sort(key=lambda item: safe_int(item.get("estimatedReclaimBytes")), reverse=True)
return safe_rows, {
"ok": True,
"root": root,
"overlaySnapshotsRoot": overlay_root,
"contentBlobRoot": content_root,
"overlayCandidateCount": len(overlay_rows),
"contentCandidateCount": len(content_rows),
"protectedCount": 0,
"protectedPreview": [],
}
def host_containerd_orphan_candidate(activity):
rows, meta = host_containerd_orphan_rows(activity)
if not meta.get("ok"):
return None
limit = int(OPTIONS.get("limit") or 50)
selected = rows[:limit]
estimated = sum(safe_int(row.get("estimatedReclaimBytes")) for row in selected)
if estimated <= 0:
return None
return {
"id": "host-containerd-orphan-state:delete",
"kind": "host-containerd-orphan-state-delete",
"risk": "medium",
"description": "Delete YAML-allowlisted host containerd orphan snapshot/content files only when ctr metadata has no tasks, containers, leases, images, snapshots or content",
"path": meta.get("root"),
"sizeBytes": estimated,
"estimatedReclaimBytes": estimated,
"orphanCount": len(rows),
"selectedOrphanCount": len(selected),
"overlayCandidateCount": meta.get("overlayCandidateCount"),
"contentCandidateCount": meta.get("contentCandidateCount"),
"protectedCount": meta.get("protectedCount"),
"selectedPreview": [{"kind": row.get("kind"), "name": row.get("name"), "estimatedReclaimBytes": row.get("estimatedReclaimBytes")} for row in selected[:8]],
"protectedPreview": meta.get("protectedPreview"),
"action": {"op": "remove-yaml-allowlisted-host-containerd-orphans", "limit": limit},
}
def host_containerd_cache_candidate():
activity = host_containerd_activity()
if not activity.get("ok"):
return {
"id": "host-containerd-cache:unavailable",
"kind": "host-containerd-cache-unavailable",
"risk": "blocked",
"description": "Host containerd cache cleanup is disabled or unavailable by YAML",
"estimatedReclaimBytes": 0,
"diagnostic": compact_host_containerd_activity(activity),
}
if int(activity.get("activeCount") or 0) > 0:
return {
"id": "host-containerd-cache:active",
"kind": "host-containerd-cache-blocked",
"risk": "blocked",
"description": "Host containerd cache prune is blocked while host containerd tasks or containers exist",
"estimatedReclaimBytes": 0,
"activity": compact_host_containerd_activity(activity),
}
orphan = host_containerd_orphan_candidate(activity)
if orphan:
return orphan
root = activity.get("root") or ""
size = du_size(root, 45) or 0
if size <= 0:
return None
return {
"id": "host-containerd-cache:prune-unused",
"kind": "host-containerd-cache-prune",
"risk": "medium",
"description": "Prune host containerd images in YAML-selected namespaces only when no host containerd tasks or containers exist",
"path": root,
"sizeBytes": size,
"estimatedReclaimBytes": size,
"activity": compact_host_containerd_activity(activity),
"action": {"command": "ctr images prune --all per namespace", "mode": "host-containerd-unused-images-only"},
}
def execute_host_containerd_cache_prune():
activity = host_containerd_activity()
if not activity.get("ok"):
raise RuntimeError("host containerd cache cleanup unavailable: %s" % activity.get("reason"))
if int(activity.get("activeCount") or 0) > 0:
raise RuntimeError("refusing host containerd prune while tasks or containers exist")
root = activity.get("root") or ""
before = du_size(root, 45) or 0
results = []
for namespace in activity.get("namespaces") or []:
result = host_ctr(["images", "prune", "--all"], 300, namespace)
results.append({"namespace": namespace, "imagesPrune": bounded(result)})
if result["exitCode"] != 0:
raise RuntimeError("host containerd image prune failed in namespace %s: %s" % (namespace, (result.get("stderr") or result.get("stdout") or "").strip()))
after = du_size(root, 45) or 0
return {
"reclaimedBytes": max(0, before - after),
"activity": compact_host_containerd_activity(activity),
"commandResults": results[:8],
}
def execute_host_containerd_orphan_cleanup():
activity = host_containerd_activity()
rows, meta = host_containerd_orphan_rows(activity)
if not meta.get("ok"):
raise RuntimeError("host containerd orphan cleanup unavailable: %s" % meta.get("reason"))
for root_path in [meta.get("overlaySnapshotsRoot"), meta.get("contentBlobRoot")]:
if root_path and os.path.exists(root_path) and path_has_open_fd(root_path):
raise RuntimeError("refusing host containerd orphan cleanup with open fd/cwd under root: %s" % root_path)
limit = int(OPTIONS.get("limit") or 50)
selected = rows[:limit]
reclaimed = 0
deleted = []
for row in selected:
path = row.get("path")
before = du_size(path, 10) or path_size(path)
if row.get("kind") == "overlay-snapshot-dir":
shutil.rmtree(path, ignore_errors=True)
elif row.get("kind") == "content-blob-file":
os.unlink(path)
else:
raise RuntimeError("unsupported host containerd orphan kind: %s" % row.get("kind"))
reclaimed += before
deleted.append({"kind": row.get("kind"), "name": row.get("name"), "reclaimedBytes": before})
return {
"reclaimedBytes": reclaimed,
"deletedOrphanCount": len(deleted),
"deletedPreview": deleted[:12],
"root": meta.get("root"),
}
+292
View File
@@ -0,0 +1,292 @@
def registry_growth_snapshot():
summary = {
"path": REGISTRY_ROOT,
"sizeBytes": du_size(REGISTRY_ROOT, 60) or 0,
}
summary["sizeHuman"] = fmt_bytes(summary["sizeBytes"])
if OPTIONS.get("hwlabRegistry", False):
plan = plan_registry_retention()
retention = dict(plan.get("summary") or {})
for key in ["registrySizeBytes", "estimatedReclaimBytes"]:
if key in retention:
retention[key.replace("Bytes", "Human")] = fmt_bytes(retention.get(key) or 0)
summary["retentionPlan"] = retention
else:
summary["retentionPlan"] = {
"skipped": True,
"reason": "rerun snapshot with --include-hwlab-registry to compute tag/revision retention counters",
}
summary["cadence"] = {
"dryRun": "daily or before/after every v0.2 CI/CD burst",
"maintenanceRun": "weekly, or when root >=80%, or when registry growth exceeds the agreed daily threshold",
"planCommand": "bun scripts/cli.ts gc remote %s plan --target-use-percent 70 --include-hwlab-registry --limit 50" % PROVIDER_ID,
"snapshotCommand": "bun scripts/cli.ts gc remote %s snapshot --include-hwlab-registry --history-limit 12" % PROVIDER_ID,
"runCommand": "bun scripts/cli.ts gc remote %s run --confirm --include-hwlab-registry --target-use-percent 70 --limit 50" % PROVIDER_ID,
"defaultRetention": {
"keepPerRepo": int(OPTIONS.get("registryKeepPerRepo") or 20),
"minAgeHours": float(OPTIONS.get("registryMinAgeHours") or 48),
"protects": ["current workload refs", "digest closure", "protected tags", "recent tags", "newest N tags per repo"],
},
}
return summary
def growth_watermark_policy(root_disk):
use_percent = root_disk.get("usePercent") if isinstance(root_disk, dict) else None
if use_percent is None:
state = "unknown"
action = "collect-snapshot"
elif use_percent < 75:
state = "healthy"
action = "observe-trend"
elif use_percent < 80:
state = "watch"
action = "run-dry-run-plan"
elif use_percent < 85:
state = "maintenance"
action = "schedule-owner-aware-retention"
else:
state = "emergency"
action = "restore-runtime-then-file-evidence"
return {
"state": state,
"recommendedAction": action,
"watermarks": [
{"range": "<75%", "action": "trend only"},
{"range": "75%-80%", "action": "run dry-run plan and identify source"},
{"range": "80%-85%", "action": "small owner-aware retention run"},
{"range": ">=85%", "action": "runtime recovery first, then root-cause growth source"},
],
"growthThresholdPolicy": "If bytes/day remains high for consecutive snapshots, act before 80%; exact threshold should be set from the first week of saved snapshots.",
}
def snapshot_metric_map(snapshot):
metrics = {}
root = snapshot.get("rootDisk") or {}
if isinstance(root, dict) and root.get("usedBytes") is not None:
metrics["root.usedBytes"] = {"value": safe_int(root.get("usedBytes")), "unit": "bytes", "label": "root used bytes"}
for item in snapshot.get("sources") or []:
if not isinstance(item, dict) or item.get("sizeBytes") is None:
continue
key = "source.%s.sizeBytes" % item.get("id")
metrics[key] = {"value": safe_int(item.get("sizeBytes")), "unit": "bytes", "label": item.get("label") or item.get("id")}
storage = ((snapshot.get("ciStorage") or {}).get("byOwnerGroup") or {})
if not storage:
storage = ((snapshot.get("pvcAttribution") or {}).get("byOwnerGroup") or {})
for owner, value in storage.items():
metrics["ciStorage.%s.estimatedBytes" % owner] = {"value": safe_int((value or {}).get("estimatedBytes")), "unit": "bytes", "label": "CI storage %s" % owner}
memory = snapshot.get("memoryPressure") or {}
memory_summary = memory.get("summary") or {}
if memory_summary.get("matchedRssBytes") is not None:
metrics["memoryPressure.matchedRssBytes"] = {"value": safe_int(memory_summary.get("matchedRssBytes")), "unit": "bytes", "label": "matched observer/chrome RSS"}
if memory_summary.get("observeStateBytes") is not None:
metrics["memoryPressure.observeStateBytes"] = {"value": safe_int(memory_summary.get("observeStateBytes")), "unit": "bytes", "label": "web observe state bytes"}
for key in ["matchedProcessCount", "activeObserverSignals", "staleObserverSignals"]:
if memory_summary.get(key) is not None:
metrics["memoryPressure.%s" % key] = {"value": safe_int(memory_summary.get(key)), "unit": "count", "label": "memory pressure %s" % key}
registry = snapshot.get("registry") or {}
retention = registry.get("retentionPlan") or {}
for key in ["totalTags", "totalRevisions", "deleteTags", "deleteRevisions", "estimatedReclaimBytes"]:
if key in retention and retention.get(key) is not None:
unit = "bytes" if key.endswith("Bytes") else "count"
metrics["registry.%s" % key] = {"value": safe_int(retention.get(key)), "unit": unit, "label": "registry %s" % key}
return metrics
def delta_metric_rows(before, after):
before_metrics = snapshot_metric_map(before)
after_metrics = snapshot_metric_map(after)
before_ts = iso_to_epoch(before.get("observedAt"))
after_ts = iso_to_epoch(after.get("observedAt"))
seconds = (after_ts - before_ts) if before_ts is not None and after_ts is not None else None
rows = []
for key in sorted(set(before_metrics.keys()) | set(after_metrics.keys())):
old = before_metrics.get(key, {"value": 0, "unit": (after_metrics.get(key) or {}).get("unit"), "label": key})
new = after_metrics.get(key, {"value": 0, "unit": old.get("unit"), "label": old.get("label")})
delta = safe_int(new.get("value")) - safe_int(old.get("value"))
row = {
"key": key,
"label": new.get("label") or old.get("label") or key,
"unit": new.get("unit") or old.get("unit"),
"before": old.get("value"),
"after": new.get("value"),
"delta": delta,
}
if row["unit"] == "bytes":
row["beforeHuman"] = fmt_bytes(row["before"] or 0)
row["afterHuman"] = fmt_bytes(row["after"] or 0)
row["deltaHuman"] = ("-" if delta < 0 else "") + fmt_bytes(abs(delta))
if seconds and seconds > 0:
per_day = int(delta * 86400 / seconds)
row["perDayBytes"] = per_day
row["perDayHuman"] = ("-" if per_day < 0 else "") + fmt_bytes(abs(per_day)) + "/day"
rows.append(row)
rows.sort(key=lambda item: safe_int(item.get("delta")), reverse=True)
return {"durationSeconds": seconds, "metrics": rows}
def growth_trend_payload(points):
points = [point for point in points if isinstance(point, dict)]
if len(points) < 2:
return {
"pointCount": len(points),
"state": "insufficient-history",
"message": "Run snapshot at least twice to compute deltas.",
}
latest_delta = delta_metric_rows(points[-2], points[-1])
window_delta = delta_metric_rows(points[0], points[-1])
def rate_warning(delta):
seconds = delta.get("durationSeconds")
if seconds is not None and seconds < 3600:
return {
"code": "short-window-rate-noisy",
"message": "Per-day rates from windows shorter than 1 hour are directional only; use daily snapshots for governance decisions.",
"durationSeconds": seconds,
}
return None
return {
"pointCount": len(points),
"oldestAt": points[0].get("observedAt"),
"latestAt": points[-1].get("observedAt"),
"latestDelta": {
"durationSeconds": latest_delta.get("durationSeconds"),
"rateWarning": rate_warning(latest_delta),
"topGrowingBytes": [row for row in latest_delta.get("metrics", []) if row.get("unit") == "bytes" and safe_int(row.get("delta")) > 0][:10],
"topShrinkingBytes": [row for row in reversed(latest_delta.get("metrics", [])) if row.get("unit") == "bytes" and safe_int(row.get("delta")) < 0][:10],
"registryCounters": [row for row in latest_delta.get("metrics", []) if str(row.get("key", "")).startswith("registry.") and row.get("unit") == "count"],
},
"windowDelta": {
"durationSeconds": window_delta.get("durationSeconds"),
"rateWarning": rate_warning(window_delta),
"topGrowingBytes": [row for row in window_delta.get("metrics", []) if row.get("unit") == "bytes" and safe_int(row.get("delta")) > 0][:10],
"topShrinkingBytes": [row for row in reversed(window_delta.get("metrics", [])) if row.get("unit") == "bytes" and safe_int(row.get("delta")) < 0][:10],
"registryCounters": [row for row in window_delta.get("metrics", []) if str(row.get("key", "")).startswith("registry.") and row.get("unit") == "count"],
},
}
def compact_metric_rows(rows, limit=3):
compact = []
for row in (rows or [])[:limit]:
compact.append({
"key": row.get("key"),
"label": row.get("label"),
"unit": row.get("unit"),
"delta": row.get("delta"),
"deltaHuman": row.get("deltaHuman"),
"perDayHuman": row.get("perDayHuman"),
})
return compact
def compact_trend_payload(payload):
if payload.get("state") == "insufficient-history":
return payload
latest = payload.get("latestDelta") or {}
window = payload.get("windowDelta") or {}
return {
"pointCount": payload.get("pointCount"),
"oldestAt": payload.get("oldestAt"),
"latestAt": payload.get("latestAt"),
"latestDelta": {
"durationSeconds": latest.get("durationSeconds"),
"rateWarning": latest.get("rateWarning"),
"topGrowingBytes": compact_metric_rows(latest.get("topGrowingBytes") or [], 1),
"topShrinkingBytes": compact_metric_rows(latest.get("topShrinkingBytes") or [], 1),
"registryCounters": compact_metric_rows(latest.get("registryCounters") or [], 1),
},
"windowDelta": {
"durationSeconds": window.get("durationSeconds"),
"rateWarning": window.get("rateWarning"),
"topGrowingBytes": compact_metric_rows(window.get("topGrowingBytes") or [], 1),
"topShrinkingBytes": compact_metric_rows(window.get("topShrinkingBytes") or [], 1),
"registryCounters": compact_metric_rows(window.get("registryCounters") or [], 1),
},
"fullDisclosure": "rerun trend --full for all metric rows",
}
def compact_growth_point(item):
registry = item.get("registry") or {}
retention = registry.get("retentionPlan") or {}
ci_storage = item.get("ciStorage") or {}
containerd = item.get("containerd") or {}
memory = item.get("memoryPressure") or {}
memory_summary = memory.get("summary") or {}
observe = (memory.get("webObserve") or {})
return {
"observedAt": item.get("observedAt"),
"rootDisk": item.get("rootDisk"),
"sourceCount": len(item.get("sources") or []),
"registry": {
"sizeBytes": registry.get("sizeBytes"),
"sizeHuman": registry.get("sizeHuman"),
"totalTags": retention.get("totalTags"),
"totalRevisions": retention.get("totalRevisions"),
"deleteTags": retention.get("deleteTags"),
"deleteRevisions": retention.get("deleteRevisions"),
"estimatedReclaimBytes": retention.get("estimatedReclaimBytes"),
"estimatedReclaimHuman": retention.get("estimatedReclaimHuman"),
},
"ciStorage": {
"pvcCount": ci_storage.get("pvcCount"),
"estimatedBytes": ci_storage.get("estimatedBytes"),
"estimatedHuman": ci_storage.get("estimatedHuman"),
"byOwnerGroup": ci_storage.get("byOwnerGroup"),
},
"containerd": {
"state": containerd.get("state"),
"cleanupSupported": containerd.get("cleanupSupported"),
},
"memoryPressure": {
"matchedProcessCount": memory_summary.get("matchedProcessCount"),
"matchedRssBytes": memory_summary.get("matchedRssBytes"),
"matchedRssHuman": memory_summary.get("matchedRssHuman"),
"activeObserverSignals": memory_summary.get("activeObserverSignals"),
"staleObserverSignals": memory_summary.get("staleObserverSignals"),
"observeStateBytes": memory_summary.get("observeStateBytes"),
"observeStateHuman": memory_summary.get("observeStateHuman"),
"webObserveRootCount": observe.get("rootCount"),
},
}
def collect_growth_snapshot(observed_at, preflight):
root_disk = df_snapshot()
sources = disk_source_snapshot()
ci_storage = ci_storage_snapshot()
memory_pressure = collect_memory_pressure()
compact_pvc = compact_pvc_attribution(ci_storage)
if bool(OPTIONS.get("full")):
public_pvc = ci_storage
public_memory = memory_pressure
else:
public_pvc = compact_ci_storage_summary(ci_storage)
public_memory = compact_memory_summary(memory_pressure)
registry = registry_growth_snapshot()
containerd = containerd_breakdown_snapshot()
commands = {
"snapshot": "bun scripts/cli.ts gc remote %s snapshot --include-hwlab-registry --history-limit %s" % (PROVIDER_ID, int(OPTIONS.get("historyLimit") or 12)),
"trend": "bun scripts/cli.ts gc remote %s trend --history-limit %s" % (PROVIDER_ID, int(OPTIONS.get("historyLimit") or 12)),
"registryPlan": "bun scripts/cli.ts gc remote %s plan --target-use-percent 70 --include-hwlab-registry --limit 50" % PROVIDER_ID,
"hwlabCiRetention": ((ci_storage.get("handoff") or {}).get("hwlab") or {}).get("dryRun"),
"agentrunRetention": ((ci_storage.get("handoff") or {}).get("agentrun") or {}).get("dryRun"),
"remotePolicy": "bun scripts/cli.ts gc remote %s policy plan" % PROVIDER_ID,
}
if not bool(OPTIONS.get("full")):
commands = {
"trend": "bun scripts/cli.ts gc remote %s trend --history-limit %s" % (PROVIDER_ID, int(OPTIONS.get("historyLimit") or 12)),
"status": "bun scripts/cli.ts gc remote %s status --limit %s" % (PROVIDER_ID, int(OPTIONS.get("limit") or 50)),
"full": "bun scripts/cli.ts gc remote %s snapshot --full --no-save" % PROVIDER_ID,
}
return {
"ok": True,
"action": "gc remote snapshot",
"providerId": PROVIDER_ID,
"dryRun": True,
"mutation": False,
"diagnosticStateMutation": bool(OPTIONS.get("saveSnapshot", True)),
"observedAt": observed_at,
"rootDisk": root_disk,
"clusterPreflight": preflight,
"sources": sources,
"registry": registry,
"pvcAttribution": public_pvc,
"memoryPressure": public_memory,
"containerd": containerd,
"policy": growth_watermark_policy(root_disk or {}),
"commands": commands,
}
+383
View File
@@ -0,0 +1,383 @@
def pv_host_path(pv):
spec = (pv or {}).get("spec") or {}
host_path = (spec.get("hostPath") or {}).get("path")
if isinstance(host_path, str) and host_path:
return host_path
local_path = (spec.get("local") or {}).get("path")
if isinstance(local_path, str) and local_path:
return local_path
return None
def pvc_owner_group(namespace, owner):
owner = str(owner or "")
if namespace == "agentrun-ci":
return "agentrun"
if namespace == "hwlab-ci":
if owner.startswith("agentrun-"):
return "agentrun"
return "hwlab"
if namespace.startswith("hwlab-"):
return "hwlab-runtime"
return "other"
def parse_k8s_quantity(value):
if value is None:
return None
raw = str(value).strip()
match = re.match(r"^([0-9]+(?:\.[0-9]+)?)(Ki|Mi|Gi|Ti|K|M|G|T)?$", raw)
if not match:
return None
multiplier = {
None: 1,
"K": 1000,
"M": 1000**2,
"G": 1000**3,
"T": 1000**4,
"Ki": 1024,
"Mi": 1024**2,
"Gi": 1024**3,
"Ti": 1024**4,
}.get(match.group(2), 1)
return int(float(match.group(1)) * multiplier)
def metadata_owner(meta):
refs = meta.get("ownerReferences") or []
if refs:
first = refs[0] or {}
return first.get("kind"), first.get("name"), [{"kind": item.get("kind"), "name": item.get("name")} for item in refs[:5]]
labels = meta.get("labels") or {}
annotations = meta.get("annotations") or {}
for key in [
"tekton.dev/pipelineRun",
"tekton.dev/taskRun",
"agentrun.unidesk/run-id",
"hwlab.unidesk/run-id",
"app.kubernetes.io/instance",
]:
value = labels.get(key) or annotations.get(key)
if value:
return "Label", value, []
return None, None, []
def ci_storage_snapshot():
namespaces = set(config_list(PVC_CONFIG, "namespaces", ["hwlab-ci", "agentrun-ci"]))
candidate_namespaces = set(config_list(PVC_CONFIG, "candidateNamespaces", []))
hwlab_node = config_str(PVC_CONFIG, "hwlabNode", PROVIDER_ID)
hwlab_lane = config_str(PVC_CONFIG, "hwlabLane", "v03")
agentrun_node = config_str(PVC_CONFIG, "agentrunNode", PROVIDER_ID)
agentrun_lane = config_str(PVC_CONFIG, "agentrunLane", "v02")
limit = config_int(PVC_CONFIG, "limit", int(OPTIONS.get("limit") or 50), minimum=1, maximum=5000)
pv_data = kubectl_json(["get", "pv"], 30) or {}
pvc_data = kubectl_json(["get", "pvc", "-A"], 30) or {}
pod_data = kubectl_json(["get", "pod", "-A"], 30) or {}
pvs = {}
for pv in pv_data.get("items") or []:
meta = pv.get("metadata") or {}
name = meta.get("name")
if name:
pvs[name] = pv
mounts = {}
for pod in pod_data.get("items") or []:
meta = pod.get("metadata") or {}
ns = str(meta.get("namespace") or "")
pod_name = str(meta.get("name") or "")
phase = str(((pod.get("status") or {}).get("phase")) or "")
if phase in set(["Succeeded", "Failed"]):
continue
spec = pod.get("spec") or {}
for vol in spec.get("volumes") or []:
claim = (vol.get("persistentVolumeClaim") or {}).get("claimName")
if claim:
mounts.setdefault((ns, claim), []).append(pod_name)
rows = []
for pvc in pvc_data.get("items") or []:
meta = pvc.get("metadata") or {}
spec = pvc.get("spec") or {}
status = pvc.get("status") or {}
ns = str(meta.get("namespace") or "")
name = str(meta.get("name") or "")
if ns not in namespaces:
continue
volume = str(spec.get("volumeName") or "")
pv = pvs.get(volume) or {}
pv_spec = pv.get("spec") or {}
pv_meta = pv.get("metadata") or {}
owner_kind, owner_name, owner_refs = metadata_owner(meta)
requested = parse_k8s_quantity((((spec.get("resources") or {}).get("requests") or {}).get("storage")))
host_path = pv_host_path(pv)
active = sorted(mounts.get((ns, name), []))
estimated = du_size(host_path, 8) if host_path else None
candidate_reasons = []
if not active:
candidate_reasons.append("no-active-mount-observed")
if status.get("phase") != "Bound":
candidate_reasons.append("pvc-not-bound")
if (pv.get("status") or {}).get("phase") == "Released":
candidate_reasons.append("pv-released")
review_candidate = ns in candidate_namespaces and len(candidate_reasons) > 0
rows.append({
"namespace": ns,
"pvc": name,
"volume": volume or None,
"phase": status.get("phase"),
"pvPhase": (pv.get("status") or {}).get("phase"),
"ownerKind": owner_kind,
"owner": owner_name,
"ownerRefs": owner_refs,
"ownerGroup": pvc_owner_group(ns, owner_name),
"storageClass": spec.get("storageClassName") or pv_spec.get("storageClassName"),
"reclaimPolicy": pv_spec.get("persistentVolumeReclaimPolicy"),
"requestedBytes": requested,
"requestedHuman": fmt_bytes(requested or 0),
"hostPath": host_path,
"pvCreatedAt": (pv_meta.get("creationTimestamp") if isinstance(pv_meta, dict) else None),
"pvcCreatedAt": meta.get("creationTimestamp"),
"activeMountPods": active,
"estimatedBytes": estimated,
"estimatedHuman": fmt_bytes(estimated or 0),
"reviewCandidate": review_candidate,
"reviewReasons": candidate_reasons,
"dryRunOnly": True,
})
rows.sort(key=lambda item: safe_int(item.get("estimatedBytes")), reverse=True)
by_namespace = {}
by_owner_group = {}
for row in rows:
for bucket, key in [(by_namespace, row.get("namespace") or "unknown"), (by_owner_group, row.get("ownerGroup") or "unknown")]:
current = bucket.setdefault(key, {"count": 0, "estimatedBytes": 0, "activeMountCount": 0})
current["count"] += 1
current["estimatedBytes"] += safe_int(row.get("estimatedBytes"))
current["activeMountCount"] += len(row.get("activeMountPods") or [])
current["estimatedHuman"] = fmt_bytes(current["estimatedBytes"])
review_candidates = [row for row in rows if row.get("reviewCandidate")]
return {
"scope": "YAML-configured PVC namespaces",
"configSource": "config/unidesk-cli.yaml#gc.remote.targets.%s.pvcAttribution" % PROVIDER_ID,
"namespaces": sorted(namespaces),
"candidateNamespaces": sorted(candidate_namespaces),
"pvcCount": len(rows),
"reviewCandidateCount": len(review_candidates),
"estimatedBytes": sum(safe_int(row.get("estimatedBytes")) for row in rows),
"estimatedHuman": fmt_bytes(sum(safe_int(row.get("estimatedBytes")) for row in rows)),
"requestedBytes": sum(safe_int(row.get("requestedBytes")) for row in rows),
"requestedHuman": fmt_bytes(sum(safe_int(row.get("requestedBytes")) for row in rows)),
"byNamespace": by_namespace,
"byOwnerGroup": by_owner_group,
"topPvcs": rows[:limit],
"reviewCandidates": review_candidates[:limit],
"handoff": {
"hwlab": {
"dryRun": "bun scripts/cli.ts hwlab nodes control-plane cleanup-runs --node %s --lane %s --min-age-minutes 30 --limit 200 --dry-run" % (hwlab_node, hwlab_lane),
"releasedPvs": "bun scripts/cli.ts hwlab nodes control-plane cleanup-released-pvs --node %s --lane %s --limit 200 --dry-run" % (hwlab_node, hwlab_lane),
},
"agentrun": {
"dryRun": "bun scripts/cli.ts agentrun control-plane cleanup-runs --node %s --lane %s --min-age-minutes 30 --limit 200 --dry-run" % (agentrun_node, agentrun_lane),
"releasedPvs": "bun scripts/cli.ts agentrun control-plane cleanup-released-pvs --node %s --lane %s --limit 200 --dry-run" % (agentrun_node, agentrun_lane),
},
},
"policy": "analysis-only; remote GC never deletes PVC/PV/local-path data and only hands off to owner-aware retention commands",
}
def compact_pvc_row(row):
return {
"namespace": row.get("namespace"),
"pvc": row.get("pvc"),
"phase": row.get("phase"),
"pvPhase": row.get("pvPhase"),
"ownerKind": row.get("ownerKind"),
"owner": row.get("owner"),
"ownerGroup": row.get("ownerGroup"),
"estimatedBytes": row.get("estimatedBytes"),
"estimatedHuman": row.get("estimatedHuman"),
"activeMountCount": len(row.get("activeMountPods") or []),
"reviewCandidate": row.get("reviewCandidate"),
"reviewReasons": row.get("reviewReasons"),
}
def compact_pvc_attribution(payload):
if bool(OPTIONS.get("full")):
return payload
top = payload.get("topPvcs") or []
review = payload.get("reviewCandidates") or []
compact_top = [compact_pvc_row(row) for row in top[:8] if isinstance(row, dict)]
return {
"configSource": payload.get("configSource"),
"candidateNamespaces": payload.get("candidateNamespaces"),
"pvcCount": payload.get("pvcCount"),
"reviewCandidateCount": payload.get("reviewCandidateCount"),
"estimatedBytes": payload.get("estimatedBytes"),
"estimatedHuman": payload.get("estimatedHuman"),
"byNamespace": payload.get("byNamespace"),
"byOwnerGroup": payload.get("byOwnerGroup"),
"topPvcs": compact_top,
"reviewCandidates": [compact_pvc_row(row) for row in review[:2] if isinstance(row, dict)],
"handoff": payload.get("handoff"),
"compacted": True,
"fullDisclosure": "rerun with --full for hostPath, creation timestamps and complete row details",
}
def compact_ci_storage_summary(payload):
return {
"scope": payload.get("scope"),
"configSource": payload.get("configSource"),
"pvcCount": payload.get("pvcCount"),
"reviewCandidateCount": payload.get("reviewCandidateCount"),
"estimatedBytes": payload.get("estimatedBytes"),
"estimatedHuman": payload.get("estimatedHuman"),
"requestedBytes": payload.get("requestedBytes"),
"requestedHuman": payload.get("requestedHuman"),
"compacted": True,
"fullDisclosure": "use pvcAttribution or --full for row-level details",
}
def local_path_storage_root():
root = config_str(LOCAL_PATH_CONFIG, "root", "")
if not root:
return ""
return os.path.realpath(os.path.abspath(root))
def local_path_orphan_prefixes():
return config_list(LOCAL_PATH_CONFIG, "orphanDirPrefixes", [])
def is_direct_local_path_child(root, path):
resolved = os.path.realpath(os.path.abspath(path))
return os.path.dirname(resolved) == root and resolved.startswith(root.rstrip("/") + "/")
def local_path_referenced_paths(root):
pv_data = kubectl_json(["get", "pv"], 30) or {}
referenced = set()
for pv in pv_data.get("items") or []:
host_path = pv_host_path(pv)
if not host_path:
continue
resolved = os.path.realpath(os.path.abspath(host_path))
if resolved == root or resolved.startswith(root.rstrip("/") + "/"):
referenced.add(resolved)
return referenced
def assert_local_path_orphan(path, referenced=None):
root = local_path_storage_root()
if not root:
raise RuntimeError("localPathStorage.root is not configured")
prefixes = local_path_orphan_prefixes()
resolved = os.path.realpath(os.path.abspath(path))
name = os.path.basename(resolved)
if not is_direct_local_path_child(root, resolved):
raise RuntimeError("refusing to remove local-path orphan outside configured direct storage root: %s" % path)
if os.path.islink(path) or not os.path.isdir(resolved):
raise RuntimeError("refusing to remove non-directory or symlink local-path orphan: %s" % path)
if not prefixes or not any(name.startswith(prefix) for prefix in prefixes):
raise RuntimeError("refusing to remove local-path orphan outside YAML prefix allowlist: %s" % path)
refs = referenced if referenced is not None else local_path_referenced_paths(root)
for ref in refs:
if resolved == ref or ref.startswith(resolved.rstrip("/") + "/") or resolved.startswith(ref.rstrip("/") + "/"):
raise RuntimeError("refusing to remove local-path path still referenced by PV: %s" % path)
if path_has_open_fd(resolved):
raise RuntimeError("refusing to remove local-path orphan with open fd/cwd reference: %s" % path)
return resolved
def local_path_orphan_rows():
if not config_bool(LOCAL_PATH_CONFIG, "enabled", False):
return [], {"ok": False, "reason": "local-path-orphan-cleanup-disabled"}
root = local_path_storage_root()
prefixes = local_path_orphan_prefixes()
if not root or not os.path.isdir(root) or os.path.islink(root):
return [], {"ok": False, "reason": "local-path-root-unavailable", "root": root}
if not prefixes:
return [], {"ok": False, "reason": "local-path-prefix-allowlist-empty", "root": root}
referenced = local_path_referenced_paths(root)
min_age_minutes = config_float(LOCAL_PATH_CONFIG, "orphanMinAgeMinutes", 0.0, minimum=0.0)
cutoff = time.time() - min_age_minutes * 60.0
rows = []
protected = []
for name in sorted(os.listdir(root)):
path = os.path.join(root, name)
resolved = os.path.realpath(os.path.abspath(path))
try:
stat = os.lstat(path)
except OSError:
continue
if not os.path.isdir(path) or os.path.islink(path) or not any(name.startswith(prefix) for prefix in prefixes):
continue
row = {"path": resolved, "name": name, "sizeBytes": 0, "estimatedReclaimBytes": 0}
if not is_direct_local_path_child(root, resolved):
protected.append({**row, "reason": "not-direct-child"})
continue
if stat.st_mtime >= cutoff:
protected.append({**row, "reason": "younger-than-min-age"})
continue
referenced_by = [ref for ref in referenced if resolved == ref or ref.startswith(resolved.rstrip("/") + "/") or resolved.startswith(ref.rstrip("/") + "/")]
if referenced_by:
protected.append({**row, "reason": "pv-referenced", "referencedCount": len(referenced_by)})
continue
if path_has_open_fd(resolved):
protected.append({**row, "reason": "open-fd"})
continue
size = du_size(resolved, 10) or path_size(resolved)
rows.append({**row, "sizeBytes": size, "estimatedReclaimBytes": size})
rows.sort(key=lambda item: safe_int(item.get("estimatedReclaimBytes")), reverse=True)
return rows, {
"ok": True,
"root": root,
"prefixes": prefixes,
"referencedPathCount": len(referenced),
"protectedCount": len(protected),
"protectedPreview": protected[:8],
"minAgeMinutes": min_age_minutes,
}
def local_path_orphan_candidate():
rows, meta = local_path_orphan_rows()
if not meta.get("ok"):
return {
"id": "k3s-local-path-orphans:unavailable",
"kind": "k3s-local-path-orphans-unavailable",
"risk": "blocked",
"description": "K3s local-path orphan cleanup is unavailable or disabled by YAML",
"estimatedReclaimBytes": 0,
"diagnostic": meta,
}
limit = int(OPTIONS.get("limit") or 50)
selected = rows[:limit]
estimated = sum(safe_int(row.get("estimatedReclaimBytes")) for row in selected)
if estimated <= 0:
return None
return {
"id": "k3s-local-path-orphans:delete",
"kind": "k3s-local-path-orphans-delete",
"risk": "medium",
"description": "Delete YAML-allowlisted k3s local-path storage directories that no PV references and no process has open",
"path": meta.get("root"),
"sizeBytes": estimated,
"estimatedReclaimBytes": estimated,
"orphanCount": len(rows),
"selectedOrphanCount": len(selected),
"protectedCount": meta.get("protectedCount"),
"referencedPathCount": meta.get("referencedPathCount"),
"selectedPreview": [{"name": row.get("name"), "path": row.get("path"), "estimatedReclaimBytes": row.get("estimatedReclaimBytes")} for row in selected[:8]],
"protectedPreview": meta.get("protectedPreview"),
"action": {"op": "rm-recursive", "allowlist": "yaml-local-path-orphan", "root": meta.get("root"), "limit": limit},
}
def execute_local_path_orphan_cleanup():
rows, meta = local_path_orphan_rows()
if not meta.get("ok"):
raise RuntimeError("local-path orphan cleanup unavailable: %s" % meta.get("reason"))
limit = int(OPTIONS.get("limit") or 50)
selected = rows[:limit]
referenced = local_path_referenced_paths(local_path_storage_root())
reclaimed = 0
deleted = []
for row in selected:
path = assert_local_path_orphan(row.get("path"), referenced)
before = du_size(path, 10) or path_size(path)
shutil.rmtree(path, ignore_errors=True)
reclaimed += before
deleted.append({"name": row.get("name"), "path": path, "reclaimedBytes": before})
return {
"reclaimedBytes": reclaimed,
"deletedOrphanCount": len(deleted),
"deletedPreview": deleted[:12],
"root": meta.get("root"),
"protectedCount": meta.get("protectedCount"),
}
+677
View File
@@ -0,0 +1,677 @@
def active_hwlab_ci_writes():
result = command(["sh", "-lc", "KUBECONFIG=/etc/rancher/k3s/k3s.yaml kubectl get pipelinerun,taskrun -n hwlab-ci --no-headers 2>/dev/null | awk '$2 != \"True\" && $2 != \"False\" {print}' | head -40"], 15)
lines = [line for line in (result.get("stdout") or "").splitlines() if line.strip()]
return {"ok": result["exitCode"] == 0, "activeCount": len(lines), "activePreview": lines, "command": bounded(result)}
def active_hwlab_ci_jobs():
result = command(["sh", "-lc", "KUBECONFIG=/etc/rancher/k3s/k3s.yaml kubectl get jobs -n hwlab-ci --no-headers 2>/dev/null | awk '$2 != \"Complete\" && $2 != \"Failed\" {print}' | head -40"], 15)
lines = [line for line in (result.get("stdout") or "").splitlines() if line.strip()]
return {"ok": result["exitCode"] == 0, "activeCount": len(lines), "activePreview": lines, "command": bounded(result)}
def wait_no_active_hwlab_ci(timeout=180):
deadline = time.time() + timeout
last = None
while time.time() < deadline:
writes = active_hwlab_ci_writes()
jobs = active_hwlab_ci_jobs()
last = {"writes": writes, "jobs": jobs}
if writes.get("ok") and jobs.get("ok") and int(writes.get("activeCount") or 0) == 0 and int(jobs.get("activeCount") or 0) == 0:
return {"ok": True, "last": last}
time.sleep(5)
return {"ok": False, "last": last}
def kubectl_json(args, timeout=20):
result = command(["env", "KUBECONFIG=/etc/rancher/k3s/k3s.yaml", "kubectl"] + args + ["-o", "json"], timeout)
if result["exitCode"] != 0:
return None
try:
return json.loads(result["stdout"] or "{}")
except Exception:
return None
def kctl(args, timeout=30):
return command(["env", "KUBECONFIG=/etc/rancher/k3s/k3s.yaml", "kubectl"] + args, timeout)
def workload_image_refs():
result = command(["sh", "-lc", "KUBECONFIG=/etc/rancher/k3s/k3s.yaml kubectl get deploy,sts,ds,pod -A -o jsonpath='{range .items[*]}{range .spec.containers[*]}{.image}{\"\\n\"}{end}{range .spec.initContainers[*]}{.image}{\"\\n\"}{end}{range .spec.template.spec.containers[*]}{.image}{\"\\n\"}{end}{range .spec.template.spec.initContainers[*]}{.image}{\"\\n\"}{end}{end}' 2>/dev/null | sort -u"], 30)
refs = set()
digests = set()
for image in (result.get("stdout") or "").splitlines():
image = image.strip()
if not image.startswith("127.0.0.1:5000/"):
continue
ref = image.split("127.0.0.1:5000/", 1)[1]
if "@sha256:" in ref:
repo, digest = ref.split("@", 1)
refs.add((repo, "@" + digest))
digests.add("sha256:" + digest.split(":", 1)[1])
elif ":" in ref:
repo, tag = ref.rsplit(":", 1)
refs.add((repo, tag))
return refs, digests, bounded(result)
def registry_request(method, path, headers=None, timeout=20):
url = "http://127.0.0.1:5000" + path
req = urllib.request.Request(url, method=method, headers=headers or {})
with urllib.request.urlopen(req, timeout=timeout) as response:
body = response.read()
return {"status": response.status, "headers": dict(response.headers), "body": body.decode("utf-8", errors="replace")}
def registry_tag_rows():
rows = []
root = REGISTRY_REPOSITORY_ROOT
if not os.path.isdir(root):
return rows
for repo_root, dirs, files in os.walk(root):
if os.path.basename(repo_root) != "tags":
continue
rel = os.path.relpath(repo_root, root)
suffix = "/_manifests/tags"
if not rel.endswith(suffix):
continue
repo = rel[:-len(suffix)]
try:
tags = os.listdir(repo_root)
except OSError:
continue
for tag in sorted(tags):
link = os.path.join(repo_root, tag, "current", "link")
if not os.path.isfile(link):
continue
try:
with open(link, "r", encoding="utf-8") as handle:
digest = handle.read().strip()
stat = os.stat(link)
except OSError:
continue
rows.append({
"repo": repo,
"tag": tag,
"digest": digest,
"mtime": stat.st_mtime,
"mtimeIso": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(stat.st_mtime)),
"path": os.path.join(repo_root, tag),
})
return rows
def registry_revision_rows():
rows = []
root = REGISTRY_REPOSITORY_ROOT
if not os.path.isdir(root):
return rows
for repo_root, dirs, files in os.walk(root):
if os.path.basename(repo_root) != "sha256":
continue
rel = os.path.relpath(repo_root, root)
suffix = "/_manifests/revisions/sha256"
if not rel.endswith(suffix):
continue
repo = rel[:-len(suffix)]
try:
revisions = os.listdir(repo_root)
except OSError:
continue
for digest_hex in sorted(revisions):
path = os.path.join(repo_root, digest_hex)
link = os.path.join(path, "link")
if not os.path.isfile(link):
continue
try:
with open(link, "r", encoding="utf-8") as handle:
digest = handle.read().strip()
stat = os.stat(link)
except OSError:
continue
rows.append({
"repo": repo,
"digest": digest,
"mtime": stat.st_mtime,
"mtimeIso": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(stat.st_mtime)),
"path": path,
})
return rows
def registry_retention_repo(repo):
return repo.startswith("hwlab/hwlab-") or repo.startswith("hwlab/cache/hwlab-")
def registry_digest_hex(digest):
if not isinstance(digest, str) or not digest.startswith("sha256:"):
return None
value = digest.split(":", 1)[1]
if re.match(r"^[0-9a-f]{64}$", value) is None:
return None
return value
def registry_blob_data_path(digest):
value = registry_digest_hex(digest)
if value is None:
return None
return os.path.join(REGISTRY_ROOT, "docker/registry/v2/blobs/sha256", value[:2], value, "data")
_manifest_cache = {}
def registry_manifest_json(digest):
if digest in _manifest_cache:
return _manifest_cache[digest]
path = registry_blob_data_path(digest)
if path is None or not os.path.isfile(path):
_manifest_cache[digest] = None
return None
try:
with open(path, "rb") as handle:
data = handle.read(8 * 1024 * 1024)
value = json.loads(data.decode("utf-8"))
except Exception:
value = None
_manifest_cache[digest] = value
return value
def registry_manifest_refs(digest):
manifest = registry_manifest_json(digest)
if not isinstance(manifest, dict):
return set()
refs = set()
config = manifest.get("config") or {}
config_digest = config.get("digest")
if isinstance(config_digest, str) and registry_digest_hex(config_digest) is not None:
refs.add(config_digest)
for item in manifest.get("layers") or []:
item_digest = (item or {}).get("digest")
if isinstance(item_digest, str) and registry_digest_hex(item_digest) is not None:
refs.add(item_digest)
for item in manifest.get("manifests") or []:
item_digest = (item or {}).get("digest")
if isinstance(item_digest, str) and registry_digest_hex(item_digest) is not None:
refs.add(item_digest)
return refs
def registry_digest_closure(seed):
seen = set()
stack = list(seed)
while stack:
digest = stack.pop()
if digest in seen or registry_digest_hex(digest) is None:
continue
seen.add(digest)
for child in registry_manifest_refs(digest):
if child not in seen:
stack.append(child)
return seen
def registry_blob_size(digest):
path = registry_blob_data_path(digest)
if path is None or not os.path.isfile(path):
return 0
try:
return int(os.lstat(path).st_blocks) * 512
except OSError:
return 0
def estimate_registry_reclaim(delete_manifest_digests, kept_manifest_digests):
deleted = registry_digest_closure(delete_manifest_digests)
kept = registry_digest_closure(kept_manifest_digests)
reclaim = deleted - kept
return sum(registry_blob_size(digest) for digest in reclaim)
def plan_registry_retention():
keep_per_repo = int(OPTIONS.get("registryKeepPerRepo") if OPTIONS.get("registryKeepPerRepo") is not None else 5)
min_age_hours = float(OPTIONS.get("registryMinAgeHours") if OPTIONS.get("registryMinAgeHours") is not None else 48)
cutoff = time.time() - min_age_hours * 3600
refs, digests, refs_command = workload_image_refs()
rows = registry_tag_rows()
revision_rows = registry_revision_rows()
by_repo = {}
for row in rows:
by_repo.setdefault(row["repo"], []).append(row)
keep = set()
keep_reasons = {}
for repo, items in by_repo.items():
items.sort(key=lambda item: item["mtime"], reverse=True)
for row in items[:keep_per_repo]:
key = (row["repo"], row["tag"])
keep.add(key)
keep_reasons[key] = "latest-per-repo"
for row in items:
key = (row["repo"], row["tag"])
if row["tag"] in REGISTRY_PROTECTED_TAGS:
keep.add(key)
keep_reasons[key] = "protected-tag"
if key in refs:
keep.add(key)
keep_reasons[key] = "workload-tag-ref"
if row["digest"] in digests:
keep.add(key)
keep_reasons[key] = "workload-digest-ref"
if row["repo"].startswith("hwlab/cache/"):
keep.add(key)
keep_reasons[key] = "cache-repo"
if row["mtime"] >= cutoff:
keep.add(key)
keep_reasons[key] = "recent-tag"
delete_rows = []
kept_count = 0
delete_by_repo = {}
keep_by_repo = {}
kept_digests = set()
for row in rows:
key = (row["repo"], row["tag"])
should_delete = (
key not in keep
and row["repo"].startswith("hwlab/hwlab-")
and re.match(r"^[0-9a-f]{7,40}$", row["tag"]) is not None
)
if should_delete:
delete_rows.append(row)
delete_by_repo[row["repo"]] = delete_by_repo.get(row["repo"], 0) + 1
else:
kept_count += 1
kept_digests.add(row["digest"])
keep_by_repo[row["repo"]] = keep_by_repo.get(row["repo"], 0) + 1
protected_digests = kept_digests | digests
protected_digests.update(row["digest"] for row in revision_rows if not registry_retention_repo(row["repo"]))
protected_digests = registry_digest_closure(protected_digests)
delete_revision_rows = []
revision_delete_by_repo = {}
for row in revision_rows:
if not registry_retention_repo(row["repo"]):
continue
if row["digest"] in protected_digests:
continue
delete_revision_rows.append(row)
revision_delete_by_repo[row["repo"]] = revision_delete_by_repo.get(row["repo"], 0) + 1
kept_revision_digests = set(row["digest"] for row in revision_rows if row not in delete_revision_rows)
delete_revision_digests = set(row["digest"] for row in delete_revision_rows)
deletable_manifests = {}
for row in delete_rows:
if row["digest"] in kept_digests:
continue
deletable_manifests.setdefault(row["repo"], set()).add(row["digest"])
for row in delete_revision_rows:
deletable_manifests.setdefault(row["repo"], set()).add(row["digest"])
deletable_manifest_count = sum(len(items) for items in deletable_manifests.values())
registry_size = du_size(REGISTRY_ROOT, 30) or 0
estimate = estimate_registry_reclaim(delete_revision_digests, kept_revision_digests)
return {
"tagRows": rows,
"revisionRows": revision_rows,
"deleteRows": delete_rows,
"deleteRevisionRows": delete_revision_rows,
"summary": {
"totalTags": len(rows),
"totalRevisions": len(revision_rows),
"repoCount": len(by_repo),
"keepPerRepo": keep_per_repo,
"minAgeHours": min_age_hours,
"protectedWorkloadRefs": len(refs),
"protectedDigestRefs": len(digests),
"protectedDigestClosure": len(protected_digests),
"keptTags": kept_count,
"deleteTags": len(delete_rows),
"deleteManifests": deletable_manifest_count,
"deleteRevisions": len(delete_revision_rows),
"deleteByRepo": delete_by_repo,
"revisionDeleteByRepo": revision_delete_by_repo,
"keepByRepo": keep_by_repo,
"registrySizeBytes": registry_size,
"estimatedReclaimBytes": estimate,
},
"deleteManifestsByRepo": {repo: sorted(list(digests)) for repo, digests in deletable_manifests.items()},
"refsCommand": refs_command,
}
def registry_deployment_preflight():
dep = kubectl_json(["-n", "hwlab-ci", "get", "deploy", "hwlab-registry"], 20)
if not dep:
return {"ok": False, "reason": "registry-deployment-missing"}
spec = ((dep.get("spec") or {}).get("template") or {}).get("spec") or {}
containers = spec.get("containers") or []
volumes = spec.get("volumes") or []
registry_container = next((item for item in containers if item.get("name") == "registry"), containers[0] if containers else {})
mounts = registry_container.get("volumeMounts") or []
has_host_path = any(((vol.get("hostPath") or {}).get("path") == REGISTRY_ROOT and vol.get("name") == "storage") for vol in volumes)
has_mount = any((mount.get("name") == "storage" and mount.get("mountPath") == "/var/lib/registry") for mount in mounts)
image = str(registry_container.get("image") or "")
ok = bool(has_host_path and has_mount and image.startswith("registry:") and spec.get("hostNetwork") is True)
return {
"ok": ok,
"reason": "ok" if ok else "unexpected-registry-deployment-shape",
"image": image,
"hostNetwork": spec.get("hostNetwork"),
"hasExpectedHostPath": has_host_path,
"hasExpectedMount": has_mount,
"replicas": (dep.get("spec") or {}).get("replicas"),
"readyReplicas": (dep.get("status") or {}).get("readyReplicas"),
}
def cronjob_suspend_states(names):
states = {}
for name in names:
data = kubectl_json(["-n", "hwlab-ci", "get", "cronjob", name], 15)
if data:
states[name] = bool(((data.get("spec") or {}).get("suspend")) is True)
return states
def patch_cronjob_suspend(name, suspend):
payload = json.dumps({"spec": {"suspend": bool(suspend)}})
return kctl(["-n", "hwlab-ci", "patch", "cronjob", name, "--type=merge", "-p", payload], 30)
def wait_registry_pod_count(target, timeout=90):
deadline = time.time() + timeout
last = None
while time.time() < deadline:
result = kctl(["-n", "hwlab-ci", "get", "pods", "-l", "app.kubernetes.io/name=hwlab-registry", "--no-headers"], 20)
last = bounded(result)
lines = [line for line in (result.get("stdout") or "").splitlines() if line.strip()]
active = []
for line in lines:
parts = line.split()
status = parts[2] if len(parts) >= 3 else ""
if status in set(["Completed", "Error", "Failed", "Succeeded"]):
continue
active.append(line)
if len(active) == target:
return {"ok": True, "lines": active, "allLines": lines, "last": last}
time.sleep(2)
return {"ok": False, "lines": [], "last": last}
def wait_pod_terminal(name, timeout=900):
deadline = time.time() + timeout
last = None
while time.time() < deadline:
data = kubectl_json(["-n", "hwlab-ci", "get", "pod", name], 20)
if data:
phase = ((data.get("status") or {}).get("phase")) or ""
last = {"phase": phase}
if phase == "Succeeded":
return {"ok": True, "phase": phase}
if phase == "Failed":
return {"ok": False, "phase": phase}
time.sleep(3)
return {"ok": False, "phase": "Timeout", "last": last}
def execute_registry_retention():
if PROVIDER_ID.upper() != "G14":
raise RuntimeError("HWLAB registry retention is only supported on G14")
deployment = registry_deployment_preflight()
if not deployment.get("ok"):
raise RuntimeError("registry deployment preflight failed: %s" % deployment.get("reason"))
plan = plan_registry_retention()
delete_rows = plan.get("deleteRows") or []
delete_revision_rows = plan.get("deleteRevisionRows") or []
delete_manifests = plan.get("deleteManifestsByRepo") or {}
if not delete_rows and not delete_revision_rows:
return {"reclaimedBytes": 0, "commandOutput": {"message": "no registry tags or revisions matched conservative retention", "registryPlan": plan.get("summary")}}
if not delete_manifests:
return {"reclaimedBytes": 0, "commandOutput": {"message": "matched manifests are still referenced by retained manifests; registry GC would not reclaim blobs", "registryPlan": plan.get("summary")}}
cronjobs = ["hwlab-g14-branch-poller", "hwlab-v02-branch-poller"]
original_crons = cronjob_suspend_states(cronjobs)
before = du_size(REGISTRY_ROOT, 60) or 0
gc_name = "hwlab-registry-gc-%s" % int(time.time())
steps = []
try:
for name in original_crons:
result = patch_cronjob_suspend(name, True)
steps.append({"step": "suspend-cronjob", "name": name, "result": bounded(result)})
if result["exitCode"] != 0:
raise RuntimeError("failed to suspend cronjob %s" % name)
idle_after_suspend = wait_no_active_hwlab_ci(180)
steps.append({"step": "idle-after-suspend", "result": idle_after_suspend})
if not idle_after_suspend.get("ok"):
raise RuntimeError("refusing registry maintenance because hwlab-ci did not become idle after suspend")
deleted_manifests = []
for repo, digests in delete_manifests.items():
encoded_repo = "/".join(urllib.parse.quote(part, safe="") for part in repo.split("/"))
for digest in digests:
try:
result = registry_request("DELETE", "/v2/%s/manifests/%s" % (encoded_repo, urllib.parse.quote(digest, safe=":")), {"Accept": "application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.manifest.v1+json"})
deleted_manifests.append({"repo": repo, "digest": digest, "status": result.get("status")})
except urllib.error.HTTPError as exc:
if exc.code == 404:
deleted_manifests.append({"repo": repo, "digest": digest, "status": 404})
else:
raise
steps.append({"step": "registry-api-delete-manifests", "count": len(deleted_manifests), "preview": deleted_manifests[:20]})
scale_down = kctl(["-n", "hwlab-ci", "scale", "deploy", "hwlab-registry", "--replicas=0"], 60)
steps.append({"step": "scale-registry-down", "result": bounded(scale_down)})
if scale_down["exitCode"] != 0:
raise RuntimeError("failed to scale registry down")
waited_down = wait_registry_pod_count(0, 120)
steps.append({"step": "wait-registry-down", "result": waited_down})
if not waited_down.get("ok"):
raise RuntimeError("registry pod did not scale down")
deleted = []
for row in delete_rows:
path = os.path.abspath(str(row.get("path") or ""))
if not path.startswith(REGISTRY_REPOSITORY_ROOT + "/") or "/_manifests/tags/" not in path:
raise RuntimeError("refusing unexpected registry tag path: %s" % path)
if not re.match(r"^[0-9a-f]{7,40}$", str(row.get("tag") or "")):
raise RuntimeError("refusing unexpected registry tag name: %s" % row.get("tag"))
if os.path.isdir(path) and not os.path.islink(path):
shutil.rmtree(path)
deleted.append({"repo": row.get("repo"), "tag": row.get("tag"), "digest": row.get("digest")})
steps.append({"step": "delete-tag-directories", "count": len(deleted)})
deleted_revisions = []
for row in delete_revision_rows:
path = os.path.abspath(str(row.get("path") or ""))
digest_hex = registry_digest_hex(str(row.get("digest") or ""))
if digest_hex is None:
raise RuntimeError("refusing unexpected registry revision digest: %s" % row.get("digest"))
if not path.startswith(REGISTRY_REPOSITORY_ROOT + "/") or "/_manifests/revisions/sha256/" not in path:
raise RuntimeError("refusing unexpected registry revision path: %s" % path)
if os.path.basename(path) != digest_hex:
raise RuntimeError("refusing registry revision path/digest mismatch: %s" % path)
if os.path.isdir(path) and not os.path.islink(path):
shutil.rmtree(path)
deleted_revisions.append({"repo": row.get("repo"), "digest": row.get("digest")})
steps.append({"step": "delete-revision-directories", "count": len(deleted_revisions)})
overrides = {
"apiVersion": "v1",
"spec": {
"restartPolicy": "Never",
"containers": [{
"name": "registry-gc",
"image": "registry:2.8.3",
"command": ["registry", "garbage-collect", "/etc/docker/registry/config.yml"],
"volumeMounts": [{"name": "storage", "mountPath": "/var/lib/registry"}],
}],
"volumes": [{"name": "storage", "hostPath": {"path": REGISTRY_ROOT, "type": "DirectoryOrCreate"}}],
},
}
run_gc = kctl(["-n", "hwlab-ci", "run", gc_name, "--restart=Never", "--image=registry:2.8.3", "--overrides=%s" % json.dumps(overrides)], 60)
steps.append({"step": "start-registry-gc-pod", "result": bounded(run_gc), "pod": gc_name})
if run_gc["exitCode"] != 0:
raise RuntimeError("failed to start registry GC pod")
waited_gc = wait_pod_terminal(gc_name, 900)
steps.append({"step": "wait-registry-gc", "result": waited_gc})
logs = kctl(["-n", "hwlab-ci", "logs", gc_name], 120)
steps.append({"step": "registry-gc-logs", "result": bounded(logs)})
if not waited_gc.get("ok"):
raise RuntimeError("registry GC pod did not complete successfully")
finally:
cleanup_gc = kctl(["-n", "hwlab-ci", "delete", "pod", gc_name, "--ignore-not-found=true"], 60)
steps.append({"step": "delete-registry-gc-pod", "result": bounded(cleanup_gc)})
scale_up = kctl(["-n", "hwlab-ci", "scale", "deploy", "hwlab-registry", "--replicas=%s" % int(deployment.get("replicas") or 1)], 60)
steps.append({"step": "scale-registry-up", "result": bounded(scale_up)})
rollout = kctl(["-n", "hwlab-ci", "rollout", "status", "deploy/hwlab-registry", "--timeout=180s"], 200)
steps.append({"step": "wait-registry-rollout", "result": bounded(rollout)})
for name, was_suspended in original_crons.items():
restore = patch_cronjob_suspend(name, was_suspended)
steps.append({"step": "restore-cronjob", "name": name, "suspend": was_suspended, "result": bounded(restore)})
after = du_size(REGISTRY_ROOT, 60) or 0
return {
"reclaimedBytes": max(0, before - after),
"commandOutput": {
"registryPlan": plan.get("summary"),
"deletedTagCount": len(delete_rows),
"deletedRevisionCount": len(delete_revision_rows),
"deletedManifestCount": sum(len(items) for items in delete_manifests.values()),
"diskBeforeBytes": before,
"diskAfterBytes": after,
"steps": steps[-12:],
},
}
def execute_registry_garbage_collect_only():
if PROVIDER_ID.upper() != "G14":
raise RuntimeError("HWLAB registry garbage-collect is only supported on G14")
deployment = registry_deployment_preflight()
if not deployment.get("ok"):
raise RuntimeError("registry deployment preflight failed: %s" % deployment.get("reason"))
cronjobs = ["hwlab-g14-branch-poller", "hwlab-v02-branch-poller"]
original_crons = cronjob_suspend_states(cronjobs)
before = du_size(REGISTRY_ROOT, 60) or 0
gc_name = "hwlab-registry-gc-%s" % int(time.time())
steps = []
try:
for name in original_crons:
result = patch_cronjob_suspend(name, True)
steps.append({"step": "suspend-cronjob", "name": name, "result": bounded(result)})
if result["exitCode"] != 0:
raise RuntimeError("failed to suspend cronjob %s" % name)
idle_after_suspend = wait_no_active_hwlab_ci(180)
steps.append({"step": "idle-after-suspend", "result": idle_after_suspend})
if not idle_after_suspend.get("ok"):
raise RuntimeError("refusing registry maintenance because hwlab-ci did not become idle after suspend")
scale_down = kctl(["-n", "hwlab-ci", "scale", "deploy", "hwlab-registry", "--replicas=0"], 60)
steps.append({"step": "scale-registry-down", "result": bounded(scale_down)})
if scale_down["exitCode"] != 0:
raise RuntimeError("failed to scale registry down")
waited_down = wait_registry_pod_count(0, 120)
steps.append({"step": "wait-registry-down", "result": waited_down})
if not waited_down.get("ok"):
raise RuntimeError("registry pod did not scale down")
overrides = {
"apiVersion": "v1",
"spec": {
"restartPolicy": "Never",
"containers": [{
"name": "registry-gc",
"image": "registry:2.8.3",
"command": ["registry", "garbage-collect", "/etc/docker/registry/config.yml"],
"volumeMounts": [{"name": "storage", "mountPath": "/var/lib/registry"}],
}],
"volumes": [{"name": "storage", "hostPath": {"path": REGISTRY_ROOT, "type": "DirectoryOrCreate"}}],
},
}
run_gc = kctl(["-n", "hwlab-ci", "run", gc_name, "--restart=Never", "--image=registry:2.8.3", "--overrides=%s" % json.dumps(overrides)], 60)
steps.append({"step": "start-registry-gc-pod", "result": bounded(run_gc), "pod": gc_name})
if run_gc["exitCode"] != 0:
raise RuntimeError("failed to start registry GC pod")
waited_gc = wait_pod_terminal(gc_name, 900)
steps.append({"step": "wait-registry-gc", "result": waited_gc})
logs = kctl(["-n", "hwlab-ci", "logs", gc_name], 120)
steps.append({"step": "registry-gc-logs", "result": bounded(logs)})
if not waited_gc.get("ok"):
raise RuntimeError("registry GC pod did not complete successfully")
finally:
cleanup_gc = kctl(["-n", "hwlab-ci", "delete", "pod", gc_name, "--ignore-not-found=true"], 60)
steps.append({"step": "delete-registry-gc-pod", "result": bounded(cleanup_gc)})
scale_up = kctl(["-n", "hwlab-ci", "scale", "deploy", "hwlab-registry", "--replicas=%s" % int(deployment.get("replicas") or 1)], 60)
steps.append({"step": "scale-registry-up", "result": bounded(scale_up)})
rollout = kctl(["-n", "hwlab-ci", "rollout", "status", "deploy/hwlab-registry", "--timeout=180s"], 200)
steps.append({"step": "wait-registry-rollout", "result": bounded(rollout)})
for name, was_suspended in original_crons.items():
restore = patch_cronjob_suspend(name, was_suspended)
steps.append({"step": "restore-cronjob", "name": name, "suspend": was_suspended, "result": bounded(restore)})
after = du_size(REGISTRY_ROOT, 60) or 0
return {
"reclaimedBytes": max(0, before - after),
"commandOutput": {
"message": "official registry garbage-collect only; no additional tag deletion",
"diskBeforeBytes": before,
"diskAfterBytes": after,
"steps": steps[-12:],
},
}
def start_registry_retention_job(mode):
job_id = "g14-registry-%s-%s" % (int(time.time()), os.getpid())
paths = job_paths(job_id)
started_at = now_iso()
initial = {
"ok": True,
"action": "gc remote status",
"providerId": PROVIDER_ID,
"jobId": job_id,
"status": "running",
"kind": "hwlab-registry-retention-gc" if mode == "retention" else "hwlab-registry-garbage-collect",
"mode": mode,
"startedAt": started_at,
"statePath": paths["state"],
"logPath": paths["log"],
"options": OPTIONS,
}
write_json_atomic(paths["state"], initial)
pid = os.fork()
if pid != 0:
return {
"status": "started",
"reclaimedBytes": None,
"commandOutput": {
"jobId": job_id,
"pid": pid,
"statePath": paths["state"],
"logPath": paths["log"],
"statusCommand": "bun scripts/cli.ts gc remote %s status --job-id %s" % (PROVIDER_ID, job_id),
"message": "registry retention GC is running as a detached remote job",
},
}
try:
os.setsid()
except Exception:
pass
try:
devnull = os.open(os.devnull, os.O_RDONLY)
os.dup2(devnull, 0)
os.close(devnull)
except Exception:
pass
try:
log_handle = open(paths["log"], "a", encoding="utf-8", buffering=1)
os.dup2(log_handle.fileno(), 1)
os.dup2(log_handle.fileno(), 2)
except Exception:
log_handle = None
try:
print("[%s] starting HWLAB registry %s job %s" % (now_iso(), mode, job_id), flush=True)
result = execute_registry_retention() if mode == "retention" else execute_registry_garbage_collect_only()
payload = dict(initial)
payload.update({
"status": "succeeded",
"finishedAt": now_iso(),
"result": result,
"diskAfter": df_snapshot(),
"clusterAfter": cluster_preflight(),
})
write_json_atomic(paths["state"], payload)
print("[%s] completed HWLAB registry %s job %s" % (now_iso(), mode, job_id), flush=True)
os._exit(0)
except Exception as exc:
payload = dict(initial)
payload.update({
"ok": False,
"status": "failed",
"finishedAt": now_iso(),
"error": str(exc),
"diskAfter": df_snapshot(),
"clusterAfter": cluster_preflight(),
})
try:
write_json_atomic(paths["state"], payload)
except Exception:
pass
print("[%s] failed HWLAB registry %s job %s: %s" % (now_iso(), mode, job_id, exc), flush=True)
os._exit(1)
finally:
try:
if log_handle:
log_handle.close()
except Exception:
pass
File diff suppressed because it is too large Load Diff
+57
View File
@@ -0,0 +1,57 @@
def configured_observe_roots():
roots = config_list(MEMORY_CONFIG, "observeStateRoots", config_list(MEMORY_CONFIG, "webObserveRoots", []))
return [os.path.abspath(item) for item in roots if isinstance(item, str) and item.startswith("/")]
def is_direct_observe_run_path(path):
resolved = os.path.abspath(path)
for root in configured_observe_roots():
if os.path.dirname(resolved) == root and resolved.startswith(root.rstrip("/") + "/"):
return True
return False
def path_has_open_fd(path):
resolved = os.path.realpath(path)
prefix = resolved.rstrip("/") + "/"
proc_root = "/proc"
try:
pids = [name for name in os.listdir(proc_root) if name.isdigit()]
except OSError:
return True
for pid in pids:
base = os.path.join(proc_root, pid)
for name in ["cwd", "root"]:
try:
target = os.path.realpath(os.readlink(os.path.join(base, name)))
except OSError:
continue
if target == resolved or target.startswith(prefix):
return True
fd_dir = os.path.join(base, "fd")
try:
fds = os.listdir(fd_dir)
except OSError:
continue
for fd in fds:
try:
target = os.path.realpath(os.readlink(os.path.join(fd_dir, fd)))
except OSError:
continue
if target == resolved or target.startswith(prefix):
return True
return False
def assert_web_observe_candidate(path):
resolved = os.path.abspath(path)
if not is_direct_observe_run_path(resolved):
raise RuntimeError("refusing to remove web-observe path outside configured direct run roots: %s" % path)
if os.path.islink(resolved) or not os.path.isdir(resolved):
raise RuntimeError("refusing to remove non-directory or symlink web-observe path: %s" % path)
stale_hours = config_float(MEMORY_CONFIG, "staleRunMaxAgeHours", 6.0, minimum=0.0)
record = observe_run_record(resolved, stale_hours)
if record.get("pidAlive"):
raise RuntimeError("refusing to remove active web-observe run with live pid: %s" % path)
if not record.get("staleSignal"):
raise RuntimeError("refusing to remove web-observe run without stale signal: %s" % path)
if path_has_open_fd(resolved):
raise RuntimeError("refusing to remove web-observe run with open fd/cwd reference: %s" % path)
return record
+61 -1
View File
@@ -18,6 +18,10 @@ interface RemoteGcOptions {
tmp: boolean; tmp: boolean;
tmpMinAgeHours: number; tmpMinAgeHours: number;
toolCaches: boolean; toolCaches: boolean;
webObserveArtifacts: boolean;
k3sImageCache: boolean;
hostContainerdCache: boolean;
localPathOrphans: boolean;
aptCache: boolean; aptCache: boolean;
coreDumps: boolean; coreDumps: boolean;
coreDumpMinAgeHours: number; coreDumpMinAgeHours: number;
@@ -45,6 +49,10 @@ const DEFAULT_REMOTE_OPTIONS: RemoteGcOptions = {
tmp: true, tmp: true,
tmpMinAgeHours: 24, tmpMinAgeHours: 24,
toolCaches: false, toolCaches: false,
webObserveArtifacts: false,
k3sImageCache: false,
hostContainerdCache: false,
localPathOrphans: false,
aptCache: true, aptCache: true,
coreDumps: true, coreDumps: true,
coreDumpMinAgeHours: 1, coreDumpMinAgeHours: 1,
@@ -63,6 +71,16 @@ const GC_CONFIG_RELATIVE_PATH = "config/unidesk-cli.yaml";
const GC_REMOTE_CONFIG_REF = `${GC_CONFIG_RELATIVE_PATH}#gc.remote.targets`; const GC_REMOTE_CONFIG_REF = `${GC_CONFIG_RELATIVE_PATH}#gc.remote.targets`;
const GC_REMOTE_RUNNER_RELATIVE_PATH = "scripts/src/gc-remote-runner.py"; const GC_REMOTE_RUNNER_RELATIVE_PATH = "scripts/src/gc-remote-runner.py";
const GC_REMOTE_RUNNER_CONFIG_PLACEHOLDER = "__UNIDESK_GC_REMOTE_CONFIG_BASE64__"; const GC_REMOTE_RUNNER_CONFIG_PLACEHOLDER = "__UNIDESK_GC_REMOTE_CONFIG_BASE64__";
const GC_REMOTE_WEB_OBSERVE_RELATIVE_PATH = "scripts/src/gc-remote-web-observe.py";
const GC_REMOTE_WEB_OBSERVE_PLACEHOLDER = "# __UNIDESK_GC_REMOTE_WEB_OBSERVE_HELPERS__";
const GC_REMOTE_CONTAINERD_RELATIVE_PATH = "scripts/src/gc-remote-containerd.py";
const GC_REMOTE_CONTAINERD_PLACEHOLDER = "# __UNIDESK_GC_REMOTE_CONTAINERD_HELPERS__";
const GC_REMOTE_PVC_RELATIVE_PATH = "scripts/src/gc-remote-pvc.py";
const GC_REMOTE_PVC_PLACEHOLDER = "# __UNIDESK_GC_REMOTE_PVC_HELPERS__";
const GC_REMOTE_GROWTH_RELATIVE_PATH = "scripts/src/gc-remote-growth.py";
const GC_REMOTE_GROWTH_PLACEHOLDER = "# __UNIDESK_GC_REMOTE_GROWTH_HELPERS__";
const GC_REMOTE_REGISTRY_RELATIVE_PATH = "scripts/src/gc-remote-registry.py";
const GC_REMOTE_REGISTRY_PLACEHOLDER = "# __UNIDESK_GC_REMOTE_REGISTRY_HELPERS__";
export async function runRemoteGcCommand(config: UniDeskConfig, providerId: string | undefined, action: string | undefined, args: string[]): Promise<unknown> { export async function runRemoteGcCommand(config: UniDeskConfig, providerId: string | undefined, action: string | undefined, args: string[]): Promise<unknown> {
if (providerId === undefined || providerId.length === 0) { if (providerId === undefined || providerId.length === 0) {
@@ -186,6 +204,22 @@ function parseRemoteGcOptions(args: string[]): RemoteGcOptions {
options.toolCaches = true; options.toolCaches = true;
} else if (arg === "--no-tool-caches") { } else if (arg === "--no-tool-caches") {
options.toolCaches = false; options.toolCaches = false;
} else if (arg === "--include-web-observe-artifacts") {
options.webObserveArtifacts = true;
} else if (arg === "--no-web-observe-artifacts") {
options.webObserveArtifacts = false;
} else if (arg === "--include-k3s-image-cache") {
options.k3sImageCache = true;
} else if (arg === "--no-k3s-image-cache") {
options.k3sImageCache = false;
} else if (arg === "--include-host-containerd-cache") {
options.hostContainerdCache = true;
} else if (arg === "--no-host-containerd-cache") {
options.hostContainerdCache = false;
} else if (arg === "--include-local-path-orphans") {
options.localPathOrphans = true;
} else if (arg === "--no-local-path-orphans") {
options.localPathOrphans = false;
} else if (arg === "--no-apt-cache") { } else if (arg === "--no-apt-cache") {
options.aptCache = false; options.aptCache = false;
} else if (arg === "--no-core-dumps") { } else if (arg === "--no-core-dumps") {
@@ -295,5 +329,31 @@ function remoteGcPython(configBase64: string): string {
if (!template.includes(GC_REMOTE_RUNNER_CONFIG_PLACEHOLDER)) { if (!template.includes(GC_REMOTE_RUNNER_CONFIG_PLACEHOLDER)) {
throw new Error(`${GC_REMOTE_RUNNER_RELATIVE_PATH} missing ${GC_REMOTE_RUNNER_CONFIG_PLACEHOLDER}`); throw new Error(`${GC_REMOTE_RUNNER_RELATIVE_PATH} missing ${GC_REMOTE_RUNNER_CONFIG_PLACEHOLDER}`);
} }
return template.replace(GC_REMOTE_RUNNER_CONFIG_PLACEHOLDER, configBase64); if (!template.includes(GC_REMOTE_WEB_OBSERVE_PLACEHOLDER)) {
throw new Error(`${GC_REMOTE_RUNNER_RELATIVE_PATH} missing ${GC_REMOTE_WEB_OBSERVE_PLACEHOLDER}`);
}
if (!template.includes(GC_REMOTE_CONTAINERD_PLACEHOLDER)) {
throw new Error(`${GC_REMOTE_RUNNER_RELATIVE_PATH} missing ${GC_REMOTE_CONTAINERD_PLACEHOLDER}`);
}
if (!template.includes(GC_REMOTE_PVC_PLACEHOLDER)) {
throw new Error(`${GC_REMOTE_RUNNER_RELATIVE_PATH} missing ${GC_REMOTE_PVC_PLACEHOLDER}`);
}
if (!template.includes(GC_REMOTE_GROWTH_PLACEHOLDER)) {
throw new Error(`${GC_REMOTE_RUNNER_RELATIVE_PATH} missing ${GC_REMOTE_GROWTH_PLACEHOLDER}`);
}
if (!template.includes(GC_REMOTE_REGISTRY_PLACEHOLDER)) {
throw new Error(`${GC_REMOTE_RUNNER_RELATIVE_PATH} missing ${GC_REMOTE_REGISTRY_PLACEHOLDER}`);
}
const webObserveHelpers = readFileSync(rootPath(GC_REMOTE_WEB_OBSERVE_RELATIVE_PATH), "utf8");
const containerdHelpers = readFileSync(rootPath(GC_REMOTE_CONTAINERD_RELATIVE_PATH), "utf8");
const pvcHelpers = readFileSync(rootPath(GC_REMOTE_PVC_RELATIVE_PATH), "utf8");
const growthHelpers = readFileSync(rootPath(GC_REMOTE_GROWTH_RELATIVE_PATH), "utf8");
const registryHelpers = readFileSync(rootPath(GC_REMOTE_REGISTRY_RELATIVE_PATH), "utf8");
return template
.replace(GC_REMOTE_WEB_OBSERVE_PLACEHOLDER, webObserveHelpers.trimEnd())
.replace(GC_REMOTE_CONTAINERD_PLACEHOLDER, containerdHelpers.trimEnd())
.replace(GC_REMOTE_PVC_PLACEHOLDER, pvcHelpers.trimEnd())
.replace(GC_REMOTE_GROWTH_PLACEHOLDER, growthHelpers.trimEnd())
.replace(GC_REMOTE_REGISTRY_PLACEHOLDER, registryHelpers.trimEnd())
.replace(GC_REMOTE_RUNNER_CONFIG_PLACEHOLDER, configBase64);
} }