fix: install JD01 GC growth policy

This commit is contained in:
Codex
2026-07-05 08:08:59 +00:00
parent 496b5fc729
commit 345b1cb7bb
5 changed files with 646 additions and 59 deletions
+16
View File
@@ -152,21 +152,37 @@ gc:
enabled: true
overlaySnapshotsRoot: /var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots
contentBlobRoot: /var/lib/containerd/io.containerd.content.v1.content/blobs/sha256
maxDeletePerRun: 100
localPathStorage:
enabled: true
root: /var/lib/rancher/k3s/storage
orphanDirPrefixes:
- pvc-
orphanMinAgeMinutes: 0
maxDeletePerRun: 100
policyTimer:
enabled: true
name: unidesk-jd01-low-risk-gc
onCalendar: daily
randomizedDelaySec: 20min
stateDir: /var/lib/unidesk-gc
configDir: /etc/unidesk-gc
journalTargetBytes: 256MiB
tmpMinAgeHours: 24
includeAptCache: true
includeToolCaches: false
includeWebObserveArtifacts: true
includeK3sImageCache: true
includeHostContainerdCache: true
includeLocalPathOrphans: true
agentrunSessionPvcs:
enabled: true
namespace: agentrun-v02
prefixes:
- agentrun-v01-session-
- agentrun-v02-session-
- agentrun-jd01-v02-session-
maxDeletePerRun: 100
policyTimer:
journald:
systemMaxUse: 512MiB
+2
View File
@@ -101,6 +101,8 @@ JD01 host containerd 只能通过 `gc remote JD01 plan|run --include-host-contai
当 host containerd 的 `ctr` 元数据中 images、containers、tasks、leases、snapshots 和 content 全为空,但 YAML allowlist 下仍残留 overlay snapshot 目录或 content blob 文件时,才能把它们分类为 orphan state。orphan state 清理仍必须通过 `--include-host-containerd-cache` 的 plan/run,执行前重新检查元数据为空、路径在 YAML root 下、名称匹配受控形态、无 symlink、无打开 fd/cwd;不得删除 metadata DB 或扩大到 containerd root。
JD01 增长降速 policy 由 `gc remote JD01 policy plan|install|status` 管理。policy install 会把原生 Python runner 和 JSON config 写入 YAML 指定的目标路径,再由 systemd timer 周期触发;runner 不读取 host worktree,也不依赖临时 CLI 输出解析。启用阶段必须逐项来自 `config/unidesk-cli.yaml#gc.remote.targets.JD01.policyTimer`:低风险 journal/apt/tmp、可选 tool cache、Web observe stale artifact、AgentRun session PVC、k3s CRI image prune、host containerd orphan state 和 local-path orphan。每个中风险阶段仍使用各自的 owner-aware 保护条件,失败时记录到 policy state,不得扩大成 raw `rm -rf`、raw kubectl 或 containerd metadata 删除。
JD01 Web observe artifact 是一等 GC 对象。state root 必须来自 YAML;候选按 run 聚合并读取 `manifest.json``heartbeat.json``pid`、report sha 和 top files。年龄判定以 manifest/heartbeat 的 started/completed/updated 字段、pid 存活和打开 fd 检查为准,不以目录 mtime 为唯一依据,因为手动 GC 或目录遍历可能刷新 mtime。active run、pid alive、open fd、未生成必要 report 的 run 均为 protected。safe 候选只覆盖超过 YAML retention 且可重建的 raw samples、browser-process、network/trace、screenshot 等大 artifact;长期保留 report summary、report json/md、最终截图或诊断摘要由 YAML cap/retention 策略控制。
JD01 Chrome 内存治理应优先管理 observer runner 生命周期,而不是孤立清理 Chrome 进程。Web probe sentinel 和 quick-verify 启动 observer 后,所有终态路径(成功、blocked、失败、timeout、异常)都必须执行 YAML 控制的 `web-probe observe stop`/force stop 流程,并验证对应 runner/Chrome process tree 退出;observe runner 自身也必须从 scenario/YAML 获得最大运行时长或 max samples 兜底,即使调用方退出也会停止采样并关闭 browser。browser freeze policy 只能作为异常保护,不替代正常任务生命周期结束后的 stop。
+477
View File
@@ -0,0 +1,477 @@
#!/usr/bin/env python3
import json
import os
import re
import shutil
import subprocess
import sys
import time
from datetime import datetime, timezone
def now_iso():
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def command(args, timeout=60):
started = time.time()
try:
result = subprocess.run(args, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout)
return {
"exitCode": result.returncode,
"elapsedMs": int((time.time() - started) * 1000),
"stdoutTail": result.stdout[-1200:],
"stderrTail": result.stderr[-1200:],
}
except subprocess.TimeoutExpired as exc:
return {
"exitCode": 124,
"elapsedMs": int((time.time() - started) * 1000),
"stdoutTail": (exc.stdout or "")[-1200:] if isinstance(exc.stdout, str) else "",
"stderrTail": (exc.stderr or "")[-1200:] if isinstance(exc.stderr, str) else "",
"timedOut": True,
}
def load_config(path):
with open(path, "r", encoding="utf-8") as handle:
return json.load(handle)
def safe_int(value, default=0):
try:
return int(value)
except Exception:
return default
def du_size(path):
result = command(["du", "-sb", path], 15)
if result["exitCode"] != 0:
return 0
try:
return int((result["stdoutTail"] or "0").split()[0])
except Exception:
return 0
def path_size(path):
try:
stat = os.lstat(path)
except OSError:
return 0
if os.path.isdir(path) and not os.path.islink(path):
return du_size(path)
return int(getattr(stat, "st_blocks", 0)) * 512
def path_has_open_fd(path):
resolved = os.path.realpath(path)
prefix = resolved.rstrip("/") + "/"
try:
pids = [name for name in os.listdir("/proc") if name.isdigit()]
except OSError:
return True
for pid in pids:
base = os.path.join("/proc", pid)
for name in ["cwd", "root"]:
try:
target = os.path.realpath(os.readlink(os.path.join(base, name)))
except OSError:
continue
if target == resolved or target.startswith(prefix):
return True
fd_dir = os.path.join(base, "fd")
try:
fds = os.listdir(fd_dir)
except OSError:
continue
for fd in fds:
try:
target = os.path.realpath(os.readlink(os.path.join(fd_dir, fd)))
except OSError:
continue
if target == resolved or target.startswith(prefix):
return True
return False
def run_json(args, timeout=45):
started = time.time()
try:
process = subprocess.run(args, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout)
except subprocess.TimeoutExpired as exc:
return None, {
"exitCode": 124,
"elapsedMs": int((time.time() - started) * 1000),
"stdoutTail": (exc.stdout or "")[-1200:] if isinstance(exc.stdout, str) else "",
"stderrTail": (exc.stderr or "")[-1200:] if isinstance(exc.stderr, str) else "",
"timedOut": True,
}
result = {
"exitCode": process.returncode,
"elapsedMs": int((time.time() - started) * 1000),
"stdoutTail": process.stdout[-1200:],
"stderrTail": process.stderr[-1200:],
}
if process.returncode != 0:
return None, result
try:
return json.loads(process.stdout or "{}"), result
except Exception:
return None, result
def write_state(config, payload):
state_path = config.get("statePath")
if not state_path:
return
os.makedirs(os.path.dirname(state_path), mode=0o755, exist_ok=True)
tmp = "%s.tmp.%d" % (state_path, os.getpid())
with open(tmp, "w", encoding="utf-8") as handle:
json.dump(payload, handle, ensure_ascii=False, sort_keys=True)
handle.write("\n")
os.replace(tmp, state_path)
def run_journal(config):
target = safe_int(config.get("journalTargetBytes"), 268435456)
result = command(["journalctl", "--vacuum-size=%d" % target], 30)
return {"id": "journal", "ok": result["exitCode"] == 0, "command": result}
def run_apt(config):
if not config.get("includeAptCache"):
return {"id": "apt-cache", "ok": True, "skipped": True}
before = du_size("/var/cache/apt/archives")
result = command(["apt-get", "clean"], 30)
after = du_size("/var/cache/apt/archives")
return {"id": "apt-cache", "ok": result["exitCode"] == 0, "reclaimedBytes": max(0, before - after), "command": result}
def run_tmp(config):
prefixes = list(config.get("tmpPrefixAllowlist") or [])
protected = set(config.get("tmpExactProtect") or [])
cutoff = time.time() - float(config.get("tmpMinAgeHours") or 24) * 3600.0
deleted = 0
reclaimed = 0
if not os.path.isdir("/tmp"):
return {"id": "tmp-allowlist", "ok": True, "deletedCount": 0, "reclaimedBytes": 0}
for name in os.listdir("/tmp"):
path = os.path.join("/tmp", name)
if path in protected or not any(name.startswith(prefix) for prefix in prefixes):
continue
try:
stat = os.lstat(path)
except OSError:
continue
if stat.st_mtime >= cutoff or path_has_open_fd(path):
continue
before = path_size(path)
if os.path.isdir(path) and not os.path.islink(path):
shutil.rmtree(path, ignore_errors=True)
else:
try:
os.unlink(path)
except FileNotFoundError:
pass
deleted += 1
reclaimed += before
return {"id": "tmp-allowlist", "ok": True, "deletedCount": deleted, "reclaimedBytes": reclaimed}
def run_tool_caches(config):
paths = list(config.get("toolCachePaths") or [])
reclaimed = 0
deleted = 0
for path in paths:
resolved = os.path.abspath(path)
if resolved != path or os.path.islink(resolved) or resolved in ["/", "/root", "/root/.npm", "/root/.bun"]:
continue
before = path_size(resolved)
if os.path.isdir(resolved):
shutil.rmtree(resolved, ignore_errors=True)
elif os.path.exists(resolved):
try:
os.unlink(resolved)
except FileNotFoundError:
pass
else:
continue
deleted += 1
reclaimed += before
return {"id": "tool-caches", "ok": True, "deletedCount": deleted, "reclaimedBytes": reclaimed}
def pid_alive(pid):
try:
value = int(pid)
except Exception:
return False
return value > 0 and os.path.exists("/proc/%d" % value)
def read_json_file(path):
try:
with open(path, "r", encoding="utf-8") as handle:
return json.load(handle)
except Exception:
return {}
def run_web_observe(config):
cfg = config.get("webObserve") or {}
if not cfg.get("enabled"):
return {"id": "web-observe-artifacts", "ok": True, "skipped": True}
roots = [os.path.abspath(item) for item in (cfg.get("observeStateRoots") or cfg.get("webObserveRoots") or []) if isinstance(item, str) and item.startswith("/")]
stale_hours = float(cfg.get("staleRunMaxAgeHours") or 6)
cutoff = time.time() - stale_hours * 3600.0
deleted = 0
reclaimed = 0
protected = 0
for root in roots:
if not os.path.isdir(root):
continue
for name in os.listdir(root):
path = os.path.abspath(os.path.join(root, name))
if os.path.dirname(path) != root or os.path.islink(path) or not os.path.isdir(path):
continue
manifest = read_json_file(os.path.join(path, "manifest.json"))
heartbeat = read_json_file(os.path.join(path, "heartbeat.json"))
pid = manifest.get("pid") or heartbeat.get("pid") or read_json_file(os.path.join(path, "pid")).get("pid")
try:
stat = os.lstat(path)
except OSError:
continue
completed = bool(manifest.get("completedAt") or heartbeat.get("completedAt") or manifest.get("status") in ["completed", "failed", "blocked"])
stale = completed or stat.st_mtime < cutoff
if pid_alive(pid) or not stale or path_has_open_fd(path):
protected += 1
continue
before = path_size(path)
shutil.rmtree(path, ignore_errors=True)
deleted += 1
reclaimed += before
return {"id": "web-observe-artifacts", "ok": True, "deletedCount": deleted, "protectedCount": protected, "reclaimedBytes": reclaimed}
def active_claims(namespace):
pods, _ = run_json(["kubectl", "-n", namespace, "get", "pod", "-o", "json"], 30)
claims = {}
for pod in (pods or {}).get("items") or []:
if ((pod.get("status") or {}).get("phase")) in ["Succeeded", "Failed"]:
continue
pod_name = ((pod.get("metadata") or {}).get("name")) or ""
for volume in ((pod.get("spec") or {}).get("volumes") or []):
claim = (volume.get("persistentVolumeClaim") or {}).get("claimName")
if claim:
claims.setdefault(claim, []).append(pod_name)
return claims
def run_session_pvcs(config):
cfg = config.get("agentrunSessionPvcs") or {}
if not cfg.get("enabled"):
return {"id": "agentrun-session-pvcs", "ok": True, "skipped": True}
namespace = cfg.get("namespace")
prefixes = list(cfg.get("prefixes") or [])
limit = max(1, min(safe_int(cfg.get("maxDeletePerRun"), 100), 1000))
if not namespace or not prefixes:
return {"id": "agentrun-session-pvcs", "ok": False, "error": "namespace-or-prefixes-missing"}
pv_data, pv_cmd = run_json(["kubectl", "get", "pv", "-o", "json"], 30)
pvc_data, pvc_cmd = run_json(["kubectl", "-n", namespace, "get", "pvc", "-o", "json"], 30)
if pv_data is None or pvc_data is None:
return {"id": "agentrun-session-pvcs", "ok": False, "pvCommand": pv_cmd, "pvcCommand": pvc_cmd}
pvs = {((pv.get("metadata") or {}).get("name")): pv for pv in pv_data.get("items") or []}
active = active_claims(namespace)
selected = []
protected = 0
for pvc in pvc_data.get("items") or []:
name = ((pvc.get("metadata") or {}).get("name")) or ""
if not any(name.startswith(prefix) for prefix in prefixes):
continue
pv = pvs.get((pvc.get("spec") or {}).get("volumeName")) or {}
storage_class = (pvc.get("spec") or {}).get("storageClassName") or (pv.get("spec") or {}).get("storageClassName")
reclaim_policy = (pv.get("spec") or {}).get("persistentVolumeReclaimPolicy")
if active.get(name) or storage_class != "local-path" or reclaim_policy != "Delete":
protected += 1
continue
selected.append(name)
selected = selected[:limit]
if selected:
result = command(["kubectl", "-n", namespace, "delete", "pvc", "--wait=false"] + selected, 45)
ok = result["exitCode"] == 0
else:
result = None
ok = True
return {"id": "agentrun-session-pvcs", "ok": ok, "deletedPvcCount": len(selected), "protectedCount": protected, "command": result}
def ci_active(config):
namespaces = list((config.get("k3sImageCache") or {}).get("ciNamespaces") or [])
active = []
for namespace in namespaces:
result = command(["sh", "-lc", "KUBECONFIG=/etc/rancher/k3s/k3s.yaml kubectl get pipelinerun,taskrun,job -n %s --no-headers 2>/dev/null | awk '$2 != \"True\" && $2 != \"False\" && $2 != \"Complete\" && $2 != \"Failed\" {print}' | head -20" % namespace], 15)
for line in (result.get("stdoutTail") or "").splitlines():
if line.strip():
active.append({"namespace": namespace, "line": line.strip()})
return active
def run_k3s_images(config):
cfg = config.get("k3sImageCache") or {}
if not cfg.get("enabled"):
return {"id": "k3s-image-cache", "ok": True, "skipped": True}
active = ci_active(config)
if active:
return {"id": "k3s-image-cache", "ok": True, "skipped": True, "reason": "ci-active", "activePreview": active[:5]}
endpoint = cfg.get("runtimeEndpoint") or "unix:///run/k3s/containerd/containerd.sock"
before = du_size("/var/lib/rancher/k3s/agent/containerd")
result = command(["crictl", "--runtime-endpoint", endpoint, "rmi", "--prune"], 300)
after = du_size("/var/lib/rancher/k3s/agent/containerd")
return {"id": "k3s-image-cache", "ok": result["exitCode"] == 0, "reclaimedBytes": max(0, before - after), "command": result}
def table_data_lines(stdout, header_prefix):
return [line.strip() for line in str(stdout or "").splitlines() if line.strip() and not line.startswith(header_prefix)]
def host_containerd_empty(config):
cfg = config.get("hostContainerdCache") or {}
address = cfg.get("address") or "/run/containerd/containerd.sock"
namespaces = cfg.get("namespaces") or ["default"]
active = []
for namespace in namespaces:
base = ["ctr", "--address", address, "-n", namespace]
for kind, args, header in [
("image", ["images", "list", "-q"], ""),
("container", ["containers", "list", "-q"], ""),
("task", ["tasks", "list", "-q"], ""),
("lease", ["leases", "list", "-q"], ""),
("snapshot", ["snapshots", "list"], "KEY"),
("content", ["content", "list"], "DIGEST"),
]:
result = command(base + args, 20)
if result["exitCode"] != 0:
active.append({"namespace": namespace, "kind": kind, "state": "unknown"})
continue
lines = table_data_lines(result.get("stdoutTail") or "", header) if header else [line for line in (result.get("stdoutTail") or "").splitlines() if line.strip()]
for line in lines:
active.append({"namespace": namespace, "kind": kind, "name": line.split()[0] if line.split() else line})
return active
def direct_child_rows(root, predicate):
rows = []
if not root or not os.path.isdir(root) or os.path.islink(root):
return rows
real_root = os.path.realpath(os.path.abspath(root))
for name in sorted(os.listdir(real_root)):
path = os.path.realpath(os.path.abspath(os.path.join(real_root, name)))
if os.path.dirname(path) != real_root or not predicate(name, path):
continue
rows.append({"name": name, "path": path, "sizeBytes": path_size(path)})
rows.sort(key=lambda item: item["sizeBytes"], reverse=True)
return rows
def run_host_containerd(config):
cfg = config.get("hostContainerdCache") or {}
if not cfg.get("enabled"):
return {"id": "host-containerd-orphans", "ok": True, "skipped": True}
active = host_containerd_empty(config)
if active:
return {"id": "host-containerd-orphans", "ok": True, "skipped": True, "reason": "metadata-not-empty", "activePreview": active[:8]}
orphan = cfg.get("orphanCleanup") or {}
if not orphan.get("enabled"):
return {"id": "host-containerd-orphans", "ok": True, "skipped": True, "reason": "orphan-cleanup-disabled"}
overlay_root = os.path.realpath(os.path.abspath(orphan.get("overlaySnapshotsRoot") or ""))
content_root = os.path.realpath(os.path.abspath(orphan.get("contentBlobRoot") or ""))
root = os.path.realpath(os.path.abspath(cfg.get("root") or ""))
if not root or not overlay_root.startswith(root + "/") or not content_root.startswith(root + "/"):
return {"id": "host-containerd-orphans", "ok": False, "error": "orphan-root-outside-containerd-root"}
for root_path in [overlay_root, content_root]:
if os.path.exists(root_path) and path_has_open_fd(root_path):
return {"id": "host-containerd-orphans", "ok": True, "skipped": True, "reason": "open-fd", "root": root_path}
rows = direct_child_rows(overlay_root, lambda name, path: os.path.isdir(path) and not os.path.islink(path) and re.match(r"^[0-9]+$", name) is not None)
rows += direct_child_rows(content_root, lambda name, path: os.path.isfile(path) and not os.path.islink(path) and re.match(r"^[0-9a-f]{64}$", name) is not None)
rows.sort(key=lambda item: item["sizeBytes"], reverse=True)
limit = max(1, min(safe_int(orphan.get("maxDeletePerRun"), 100), 1000))
reclaimed = 0
deleted = 0
for row in rows[:limit]:
before = path_size(row["path"])
if os.path.isdir(row["path"]):
shutil.rmtree(row["path"], ignore_errors=True)
else:
try:
os.unlink(row["path"])
except FileNotFoundError:
continue
reclaimed += before
deleted += 1
return {"id": "host-containerd-orphans", "ok": True, "deletedCount": deleted, "candidateCount": len(rows), "reclaimedBytes": reclaimed}
def run_local_path_orphans(config):
cfg = config.get("localPathStorage") or {}
if not cfg.get("enabled"):
return {"id": "local-path-orphans", "ok": True, "skipped": True}
root = os.path.realpath(os.path.abspath(cfg.get("root") or ""))
prefixes = list(cfg.get("orphanDirPrefixes") or [])
if not root or not prefixes or not os.path.isdir(root):
return {"id": "local-path-orphans", "ok": False, "error": "root-or-prefix-missing"}
pv_data, pv_cmd = run_json(["kubectl", "get", "pv", "-o", "json"], 30)
if pv_data is None:
return {"id": "local-path-orphans", "ok": False, "pvCommand": pv_cmd}
referenced = set()
for pv in pv_data.get("items") or []:
spec = pv.get("spec") or {}
path = ((spec.get("hostPath") or {}).get("path")) or ((spec.get("local") or {}).get("path"))
if path:
referenced.add(os.path.realpath(os.path.abspath(path)))
rows = []
for name in os.listdir(root):
path = os.path.realpath(os.path.abspath(os.path.join(root, name)))
if os.path.dirname(path) != root or not any(name.startswith(prefix) for prefix in prefixes) or path in referenced or path_has_open_fd(path):
continue
if os.path.isdir(path) and not os.path.islink(path):
rows.append({"name": name, "path": path, "sizeBytes": path_size(path)})
rows.sort(key=lambda item: item["sizeBytes"], reverse=True)
limit = max(1, min(safe_int(cfg.get("maxDeletePerRun"), 100), 1000))
reclaimed = 0
deleted = 0
for row in rows[:limit]:
before = path_size(row["path"])
shutil.rmtree(row["path"], ignore_errors=True)
reclaimed += before
deleted += 1
return {"id": "local-path-orphans", "ok": True, "deletedCount": deleted, "candidateCount": len(rows), "reclaimedBytes": reclaimed}
def main():
config_path = sys.argv[1] if len(sys.argv) > 1 else ""
config = load_config(config_path)
results = []
for fn in [run_journal, run_apt, run_tmp, run_tool_caches, run_web_observe, run_session_pvcs, run_k3s_images, run_host_containerd, run_local_path_orphans]:
try:
results.append(fn(config))
except Exception as exc:
results.append({"id": getattr(fn, "__name__", "stage"), "ok": False, "error": str(exc)})
payload = {
"ok": all(item.get("ok") for item in results),
"observedAt": now_iso(),
"providerId": config.get("providerId"),
"unitName": config.get("unitName"),
"resultCount": len(results),
"failedCount": len([item for item in results if not item.get("ok")]),
"reclaimedBytes": sum(safe_int(item.get("reclaimedBytes")) for item in results),
"results": results,
}
write_state(config, payload)
print(json.dumps(payload, ensure_ascii=False))
return 0 if payload["ok"] else 1
if __name__ == "__main__":
raise SystemExit(main())
+139 -57
View File
@@ -22,6 +22,7 @@ CONTAINERD_CONFIG = REMOTE_TARGET.get("containerdImageCache") if isinstance(REMO
HOST_CONTAINERD_CONFIG = REMOTE_TARGET.get("hostContainerdCache") if isinstance(REMOTE_TARGET.get("hostContainerdCache"), dict) else {}
LOCAL_PATH_CONFIG = REMOTE_TARGET.get("localPathStorage") if isinstance(REMOTE_TARGET.get("localPathStorage"), dict) else {}
POLICY_TIMER_CONFIG = REMOTE_TARGET.get("policyTimer") if isinstance(REMOTE_TARGET.get("policyTimer"), dict) else {}
POLICY_RUNNER_SOURCE = base64.b64decode("__UNIDESK_GC_REMOTE_POLICY_RUNNER_BASE64__").decode("utf-8")
TMP_PREFIX_ALLOWLIST = [
"hwlab-agent-",
@@ -1523,61 +1524,52 @@ def render_remote_policy():
tmp_min_age_hours = config_float(POLICY_TIMER_CONFIG, "tmpMinAgeHours", float(OPTIONS.get("tmpMinAgeHours") or 24), minimum=0.0)
include_apt_cache = config_bool(POLICY_TIMER_CONFIG, "includeAptCache", bool(OPTIONS.get("aptCache", True)))
include_tool_caches = config_bool(POLICY_TIMER_CONFIG, "includeToolCaches", False)
script_path = "/usr/local/sbin/%s.sh" % unit_name
include_web_observe = config_bool(POLICY_TIMER_CONFIG, "includeWebObserveArtifacts", False)
include_k3s_images = config_bool(POLICY_TIMER_CONFIG, "includeK3sImageCache", False)
include_host_containerd = config_bool(POLICY_TIMER_CONFIG, "includeHostContainerdCache", False)
include_local_path_orphans = config_bool(POLICY_TIMER_CONFIG, "includeLocalPathOrphans", False)
state_dir = config_str(POLICY_TIMER_CONFIG, "stateDir", "/var/lib/unidesk-gc")
config_dir = config_str(POLICY_TIMER_CONFIG, "configDir", "/etc/unidesk-gc")
script_path = "/usr/local/sbin/%s.py" % unit_name
config_path = os.path.join(config_dir, "%s.json" % unit_name)
state_path = os.path.join(state_dir, "%s.last.json" % unit_name)
service_path = "/etc/systemd/system/%s.service" % unit_name
timer_path = "/etc/systemd/system/%s.timer" % unit_name
tool_paths = [item["path"] for item in TOOL_CACHE_ALLOWLIST] if include_tool_caches else []
script = "\n".join([
"#!/bin/sh",
"set -eu",
"umask 077",
"journalctl --vacuum-size=%s >/dev/null 2>&1 || true" % int(journal_target),
"apt-get clean >/dev/null 2>&1 || true" if include_apt_cache else ": apt cache disabled by YAML",
"python3 - <<'PY'",
"import json, os, shutil, time",
"prefixes = json.loads(%r)" % json.dumps(TMP_PREFIX_ALLOWLIST),
"protected = set(json.loads(%r))" % json.dumps(sorted(TMP_EXACT_PROTECT)),
"tool_paths = json.loads(%r)" % json.dumps(tool_paths),
"cutoff = time.time() - float(%r) * 3600.0" % tmp_min_age_hours,
"for name in os.listdir('/tmp'):",
" path = os.path.join('/tmp', name)",
" if path in protected or not any(name.startswith(prefix) for prefix in prefixes):",
" continue",
" try:",
" stat = os.lstat(path)",
" except OSError:",
" continue",
" if stat.st_mtime >= cutoff:",
" continue",
" if os.path.isdir(path) and not os.path.islink(path):",
" shutil.rmtree(path, ignore_errors=True)",
" elif os.path.exists(path):",
" try:",
" os.unlink(path)",
" except FileNotFoundError:",
" pass",
"for path in tool_paths:",
" resolved = os.path.abspath(path)",
" if resolved != path or os.path.islink(resolved) or resolved in ['/', '/root', '/root/.npm', '/root/.bun']:",
" continue",
" if os.path.isdir(resolved):",
" shutil.rmtree(resolved, ignore_errors=True)",
" elif os.path.exists(resolved):",
" try:",
" os.unlink(resolved)",
" except FileNotFoundError:",
" pass",
"PY",
"",
])
policy_config = {
"providerId": PROVIDER_ID,
"unitName": unit_name,
"statePath": state_path,
"journalTargetBytes": int(journal_target),
"tmpMinAgeHours": tmp_min_age_hours,
"tmpPrefixAllowlist": TMP_PREFIX_ALLOWLIST,
"tmpExactProtect": sorted(TMP_EXACT_PROTECT),
"includeAptCache": include_apt_cache,
"toolCachePaths": [item["path"] for item in TOOL_CACHE_ALLOWLIST] if include_tool_caches else [],
"webObserve": {"enabled": include_web_observe, **MEMORY_CONFIG},
"k3sImageCache": {"enabled": include_k3s_images, **CONTAINERD_CONFIG},
"hostContainerdCache": {"enabled": include_host_containerd, **HOST_CONTAINERD_CONFIG},
"localPathStorage": {"enabled": include_local_path_orphans, **LOCAL_PATH_CONFIG},
"agentrunSessionPvcs": POLICY_TIMER_CONFIG.get("agentrunSessionPvcs") if isinstance(POLICY_TIMER_CONFIG.get("agentrunSessionPvcs"), dict) else {"enabled": False},
}
stages = [
{"id": "journal", "enabled": True, "risk": "low", "mode": "journalctl-vacuum"},
{"id": "apt-cache", "enabled": include_apt_cache, "risk": "low", "mode": "apt-get-clean"},
{"id": "tmp-allowlist", "enabled": True, "risk": "low", "mode": "direct-child-prefix-retention"},
{"id": "tool-caches", "enabled": include_tool_caches, "risk": "medium", "mode": "fixed-path-rebuildable-cache"},
{"id": "web-observe-artifacts", "enabled": include_web_observe, "risk": "medium", "mode": "manifest-heartbeat-pid-openfd-stale-run"},
{"id": "agentrun-session-pvcs", "enabled": bool(policy_config["agentrunSessionPvcs"].get("enabled")), "risk": "medium", "mode": "kubectl-delete-pvc-wait-false-owner-aware"},
{"id": "k3s-image-cache", "enabled": include_k3s_images, "risk": "medium", "mode": "crictl-rmi-prune-ci-idle"},
{"id": "host-containerd-orphans", "enabled": include_host_containerd, "risk": "medium", "mode": "ctr-metadata-empty-yaml-orphan-state"},
{"id": "local-path-orphans", "enabled": include_local_path_orphans, "risk": "medium", "mode": "pv-unreferenced-direct-child"},
]
service = "\n".join([
"[Unit]",
"Description=UniDesk remote low-risk GC for %s" % PROVIDER_ID,
"Description=UniDesk remote growth-slowdown GC for %s" % PROVIDER_ID,
"Documentation=config/unidesk-cli.yaml#gc.remote.targets.%s.policyTimer" % PROVIDER_ID,
"",
"[Service]",
"Type=oneshot",
"ExecStart=%s" % script_path,
"ExecStart=/usr/bin/python3 %s %s" % (script_path, config_path),
"Nice=10",
"IOSchedulingClass=best-effort",
"IOSchedulingPriority=7",
@@ -1599,6 +1591,10 @@ def render_remote_policy():
return {
"unitName": unit_name,
"scriptPath": script_path,
"configPath": config_path,
"statePath": state_path,
"configDir": config_dir,
"stateDir": state_dir,
"servicePath": service_path,
"timerPath": timer_path,
"onCalendar": on_calendar,
@@ -1608,7 +1604,13 @@ def render_remote_policy():
"tmpMinAgeHours": tmp_min_age_hours,
"includeAptCache": include_apt_cache,
"includeToolCaches": include_tool_caches,
"script": script,
"includeWebObserveArtifacts": include_web_observe,
"includeK3sImageCache": include_k3s_images,
"includeHostContainerdCache": include_host_containerd,
"includeLocalPathOrphans": include_local_path_orphans,
"stages": stages,
"policyConfig": policy_config,
"script": POLICY_RUNNER_SOURCE,
"service": service,
"timer": timer,
}
@@ -1624,30 +1626,106 @@ def remote_policy_plan_payload(observed_at):
"observedAt": observed_at,
"configSource": "config/unidesk-cli.yaml#gc.remote.targets.%s.policyTimer" % PROVIDER_ID,
"enabled": config_bool(POLICY_TIMER_CONFIG, "enabled", False),
"timer": {key: rendered.get(key) for key in ["unitName", "scriptPath", "servicePath", "timerPath", "onCalendar", "randomizedDelaySec", "journalTargetBytes", "journalTarget", "tmpMinAgeHours", "includeAptCache", "includeToolCaches"]},
"scriptPreview": "\n".join(rendered["script"].splitlines()[:20]),
"servicePreview": rendered["service"],
"timerPreview": rendered["timer"],
"timer": {key: rendered.get(key) for key in ["unitName", "scriptPath", "configPath", "statePath", "servicePath", "timerPath", "onCalendar", "randomizedDelaySec", "journalTargetBytes", "journalTarget", "tmpMinAgeHours", "includeAptCache", "includeToolCaches", "includeWebObserveArtifacts", "includeK3sImageCache", "includeHostContainerdCache", "includeLocalPathOrphans"]},
"stages": rendered.get("stages"),
"servicePreview": rendered["service"] if bool(OPTIONS.get("full")) else None,
"timerPreview": rendered["timer"] if bool(OPTIONS.get("full")) else None,
"installCommand": "bun scripts/cli.ts gc remote %s policy install --confirm" % PROVIDER_ID,
"statusCommand": "bun scripts/cli.ts gc remote %s policy status" % PROVIDER_ID,
"policy": {
"risk": "low",
"risk": "low-to-medium-owner-aware",
"neverTouches": [
"k3s runtime directories",
"PVC/PV/local-path data",
"k3s runtime metadata unless a dedicated CRI prune stage is enabled",
"active PVC/PV/local-path data",
"Docker images, containers, volumes or Docker build cache",
"Secret/auth/config state",
"active Web observe runners or Chrome processes",
],
"toolCaches": "disabled unless config/unidesk-cli.yaml enables includeToolCaches for this remote target",
"sourceOfTruth": "config/unidesk-cli.yaml#gc.remote.targets.%s.policyTimer" % PROVIDER_ID,
},
}
def remote_policy_status_payload(observed_at):
rendered = render_remote_policy()
timer = command(["systemctl", "show", "%s.timer" % rendered["unitName"], "--property=LoadState,ActiveState,SubState,NextElapseUSecRealtime,LastTriggerUSec"], 10)
service = command(["systemctl", "show", "%s.service" % rendered["unitName"], "--property=LoadState,ActiveState,SubState,Result,ExecMainStatus"], 10)
last = None
try:
with open(rendered["statePath"], "r", encoding="utf-8") as handle:
last = json.load(handle)
except Exception:
last = None
if last is not None and not bool(OPTIONS.get("full")):
last = compact_policy_last_run(last)
return {
"ok": True,
"action": "gc remote policy status",
"providerId": PROVIDER_ID,
"dryRun": True,
"mutation": False,
"observedAt": observed_at,
"timer": {key: rendered.get(key) for key in ["unitName", "scriptPath", "configPath", "statePath", "servicePath", "timerPath", "onCalendar", "randomizedDelaySec"]},
"systemd": {
"timer": bounded(timer),
"service": bounded(service),
},
"lastRun": last,
"next": {
"plan": "bun scripts/cli.ts gc remote %s policy plan" % PROVIDER_ID,
"install": "bun scripts/cli.ts gc remote %s policy install --confirm" % PROVIDER_ID,
"disk": "bun scripts/cli.ts gc remote %s status --limit 20" % PROVIDER_ID,
},
}
def compact_policy_last_run(last):
results = []
for item in last.get("results") or []:
if not isinstance(item, dict):
continue
compact = {
key: item.get(key)
for key in [
"id",
"ok",
"skipped",
"reason",
"error",
"deletedCount",
"deletedPvcCount",
"protectedCount",
"candidateCount",
"reclaimedBytes",
]
if item.get(key) is not None
}
command_payload = item.get("command") if isinstance(item.get("command"), dict) else None
if command_payload is not None:
compact["command"] = {
key: command_payload.get(key)
for key in ["exitCode", "timedOut", "elapsedMs"]
if command_payload.get(key) is not None
}
results.append(compact)
payload = {
key: last.get(key)
for key in ["ok", "observedAt", "providerId", "unitName", "resultCount", "failedCount", "reclaimedBytes"]
if last.get(key) is not None
}
payload["results"] = results
return payload
def remote_policy_install_payload(observed_at):
rendered = render_remote_policy()
try:
os.makedirs(rendered["configDir"], mode=0o755, exist_ok=True)
os.makedirs(rendered["stateDir"], mode=0o755, exist_ok=True)
with open(rendered["scriptPath"], "w", encoding="utf-8") as handle:
handle.write(rendered["script"])
os.chmod(rendered["scriptPath"], 0o755)
with open(rendered["configPath"], "w", encoding="utf-8") as handle:
json.dump(rendered["policyConfig"], handle, sort_keys=True, separators=(",", ":"))
handle.write("\n")
os.chmod(rendered["configPath"], 0o644)
with open(rendered["servicePath"], "w", encoding="utf-8") as handle:
handle.write(rendered["service"])
with open(rendered["timerPath"], "w", encoding="utf-8") as handle:
@@ -1675,7 +1753,8 @@ def remote_policy_install_payload(observed_at):
"mutation": True,
"observedAt": observed_at,
"configSource": "config/unidesk-cli.yaml#gc.remote.targets.%s.policyTimer" % PROVIDER_ID,
"timer": {key: rendered.get(key) for key in ["unitName", "scriptPath", "servicePath", "timerPath", "onCalendar", "randomizedDelaySec", "journalTargetBytes", "journalTarget", "tmpMinAgeHours", "includeAptCache", "includeToolCaches"]},
"timer": {key: rendered.get(key) for key in ["unitName", "scriptPath", "configPath", "statePath", "servicePath", "timerPath", "onCalendar", "randomizedDelaySec", "journalTargetBytes", "journalTarget", "tmpMinAgeHours", "includeAptCache", "includeToolCaches", "includeWebObserveArtifacts", "includeK3sImageCache", "includeHostContainerdCache", "includeLocalPathOrphans"]},
"stages": rendered.get("stages"),
"systemd": {
"daemonReload": bounded(daemon),
"enableNow": bounded(enable),
@@ -1692,6 +1771,9 @@ def main():
if ACTION == "policy-install":
emit_json(remote_policy_install_payload(observed_at), persist_large=False)
return 0
if ACTION == "policy-status":
emit_json(remote_policy_status_payload(observed_at), persist_large=False)
return 0
if ACTION == "trend":
history_limit = int(OPTIONS.get("historyLimit") or 12)
history = read_growth_snapshots(history_limit)
+12 -2
View File
@@ -5,7 +5,7 @@ import { type UniDeskConfig, rootPath } from "./config";
import { remoteGcDegradedFailure } from "./gc-remote-degraded";
import { runSshCommandCapture } from "./ssh";
type RemoteGcAction = "plan" | "snapshot" | "trend" | "run" | "status" | "policy-plan" | "policy-install";
type RemoteGcAction = "plan" | "snapshot" | "trend" | "run" | "status" | "policy-plan" | "policy-install" | "policy-status";
interface RemoteGcOptions {
confirm: boolean;
@@ -81,6 +81,8 @@ const GC_REMOTE_GROWTH_RELATIVE_PATH = "scripts/src/gc-remote-growth.py";
const GC_REMOTE_GROWTH_PLACEHOLDER = "# __UNIDESK_GC_REMOTE_GROWTH_HELPERS__";
const GC_REMOTE_REGISTRY_RELATIVE_PATH = "scripts/src/gc-remote-registry.py";
const GC_REMOTE_REGISTRY_PLACEHOLDER = "# __UNIDESK_GC_REMOTE_REGISTRY_HELPERS__";
const GC_REMOTE_POLICY_RUNNER_RELATIVE_PATH = "scripts/src/gc-remote-policy-runner.py";
const GC_REMOTE_POLICY_RUNNER_PLACEHOLDER = "__UNIDESK_GC_REMOTE_POLICY_RUNNER_BASE64__";
export async function runRemoteGcCommand(config: UniDeskConfig, providerId: string | undefined, action: string | undefined, args: string[]): Promise<unknown> {
if (providerId === undefined || providerId.length === 0) {
@@ -111,11 +113,14 @@ export async function runRemoteGcCommand(config: UniDeskConfig, providerId: stri
}
return await runRemoteGc(config, providerId, "policy-install", options);
}
if (policyAction === "status") {
return await runRemoteGc(config, providerId, "policy-status", options);
}
return {
ok: false,
error: "unsupported-gc-remote-policy-action",
action: policyAction,
supportedActions: ["plan", "render", "dry-run", "install"],
supportedActions: ["plan", "render", "dry-run", "install", "status"],
};
}
const options = parseRemoteGcOptions(args);
@@ -344,16 +349,21 @@ function remoteGcPython(configBase64: string): string {
if (!template.includes(GC_REMOTE_REGISTRY_PLACEHOLDER)) {
throw new Error(`${GC_REMOTE_RUNNER_RELATIVE_PATH} missing ${GC_REMOTE_REGISTRY_PLACEHOLDER}`);
}
if (!template.includes(GC_REMOTE_POLICY_RUNNER_PLACEHOLDER)) {
throw new Error(`${GC_REMOTE_RUNNER_RELATIVE_PATH} missing ${GC_REMOTE_POLICY_RUNNER_PLACEHOLDER}`);
}
const webObserveHelpers = readFileSync(rootPath(GC_REMOTE_WEB_OBSERVE_RELATIVE_PATH), "utf8");
const containerdHelpers = readFileSync(rootPath(GC_REMOTE_CONTAINERD_RELATIVE_PATH), "utf8");
const pvcHelpers = readFileSync(rootPath(GC_REMOTE_PVC_RELATIVE_PATH), "utf8");
const growthHelpers = readFileSync(rootPath(GC_REMOTE_GROWTH_RELATIVE_PATH), "utf8");
const registryHelpers = readFileSync(rootPath(GC_REMOTE_REGISTRY_RELATIVE_PATH), "utf8");
const policyRunner = readFileSync(rootPath(GC_REMOTE_POLICY_RUNNER_RELATIVE_PATH), "utf8");
return template
.replace(GC_REMOTE_WEB_OBSERVE_PLACEHOLDER, webObserveHelpers.trimEnd())
.replace(GC_REMOTE_CONTAINERD_PLACEHOLDER, containerdHelpers.trimEnd())
.replace(GC_REMOTE_PVC_PLACEHOLDER, pvcHelpers.trimEnd())
.replace(GC_REMOTE_GROWTH_PLACEHOLDER, growthHelpers.trimEnd())
.replace(GC_REMOTE_REGISTRY_PLACEHOLDER, registryHelpers.trimEnd())
.replace(GC_REMOTE_POLICY_RUNNER_PLACEHOLDER, Buffer.from(policyRunner, "utf8").toString("base64"))
.replace(GC_REMOTE_RUNNER_CONFIG_PLACEHOLDER, configBase64);
}