From 345b1cb7bb9639fd48d7bb2b3f07225cfa5aa1c6 Mon Sep 17 00:00:00 2001 From: Codex Date: Sun, 5 Jul 2026 08:08:59 +0000 Subject: [PATCH] fix: install JD01 GC growth policy --- config/unidesk-cli.yaml | 16 + docs/reference/gc.md | 2 + scripts/src/gc-remote-policy-runner.py | 477 +++++++++++++++++++++++++ scripts/src/gc-remote-runner.py | 196 +++++++--- scripts/src/gc-remote.ts | 14 +- 5 files changed, 646 insertions(+), 59 deletions(-) create mode 100644 scripts/src/gc-remote-policy-runner.py diff --git a/config/unidesk-cli.yaml b/config/unidesk-cli.yaml index d5319700..0bd980e4 100644 --- a/config/unidesk-cli.yaml +++ b/config/unidesk-cli.yaml @@ -152,21 +152,37 @@ gc: enabled: true overlaySnapshotsRoot: /var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots contentBlobRoot: /var/lib/containerd/io.containerd.content.v1.content/blobs/sha256 + maxDeletePerRun: 100 localPathStorage: enabled: true root: /var/lib/rancher/k3s/storage orphanDirPrefixes: - pvc- orphanMinAgeMinutes: 0 + maxDeletePerRun: 100 policyTimer: enabled: true name: unidesk-jd01-low-risk-gc onCalendar: daily randomizedDelaySec: 20min + stateDir: /var/lib/unidesk-gc + configDir: /etc/unidesk-gc journalTargetBytes: 256MiB tmpMinAgeHours: 24 includeAptCache: true includeToolCaches: false + includeWebObserveArtifacts: true + includeK3sImageCache: true + includeHostContainerdCache: true + includeLocalPathOrphans: true + agentrunSessionPvcs: + enabled: true + namespace: agentrun-v02 + prefixes: + - agentrun-v01-session- + - agentrun-v02-session- + - agentrun-jd01-v02-session- + maxDeletePerRun: 100 policyTimer: journald: systemMaxUse: 512MiB diff --git a/docs/reference/gc.md b/docs/reference/gc.md index a892a5b7..4a77fc80 100644 --- a/docs/reference/gc.md +++ b/docs/reference/gc.md @@ -101,6 +101,8 @@ JD01 host containerd 只能通过 `gc remote JD01 plan|run --include-host-contai 当 host containerd 的 `ctr` 元数据中 images、containers、tasks、leases、snapshots 和 content 全为空,但 YAML allowlist 下仍残留 overlay 
snapshot 目录或 content blob 文件时,才能把它们分类为 orphan state。orphan state 清理仍必须通过 `--include-host-containerd-cache` 的 plan/run,执行前重新检查元数据为空、路径在 YAML root 下、名称匹配受控形态、无 symlink、无打开 fd/cwd;不得删除 metadata DB 或扩大到 containerd root。 +JD01 增长降速 policy 由 `gc remote JD01 policy plan|install|status` 管理。policy install 会把原生 Python runner 和 JSON config 写入 YAML 指定的目标路径,再由 systemd timer 周期触发;runner 不读取 host worktree,也不依赖临时 CLI 输出解析。启用阶段必须逐项来自 `config/unidesk-cli.yaml#gc.remote.targets.JD01.policyTimer`:低风险 journal/apt/tmp、可选 tool cache、Web observe stale artifact、AgentRun session PVC、k3s CRI image prune、host containerd orphan state 和 local-path orphan。每个中风险阶段仍使用各自的 owner-aware 保护条件,失败时记录到 policy state,不得扩大成 raw `rm -rf`、raw kubectl 或 containerd metadata 删除。 + JD01 Web observe artifact 是一等 GC 对象。state root 必须来自 YAML;候选按 run 聚合并读取 `manifest.json`、`heartbeat.json`、`pid`、report sha 和 top files。年龄判定以 manifest/heartbeat 的 started/completed/updated 字段、pid 存活和打开 fd 检查为准,不以目录 mtime 为唯一依据,因为手动 GC 或目录遍历可能刷新 mtime。active run、pid alive、open fd、未生成必要 report 的 run 均为 protected。safe 候选只覆盖超过 YAML retention 且可重建的 raw samples、browser-process、network/trace、screenshot 等大 artifact;长期保留 report summary、report json/md、最终截图或诊断摘要由 YAML cap/retention 策略控制。 JD01 Chrome 内存治理应优先管理 observer runner 生命周期,而不是孤立清理 Chrome 进程。Web probe sentinel 和 quick-verify 启动 observer 后,所有终态路径(成功、blocked、失败、timeout、异常)都必须执行 YAML 控制的 `web-probe observe stop`/force stop 流程,并验证对应 runner/Chrome process tree 退出;observe runner 自身也必须从 scenario/YAML 获得最大运行时长或 max samples 兜底,即使调用方退出也会停止采样并关闭 browser。browser freeze policy 只能作为异常保护,不替代正常任务生命周期结束后的 stop。 diff --git a/scripts/src/gc-remote-policy-runner.py b/scripts/src/gc-remote-policy-runner.py new file mode 100644 index 00000000..038934fc --- /dev/null +++ b/scripts/src/gc-remote-policy-runner.py @@ -0,0 +1,477 @@ +#!/usr/bin/env python3 +import json +import os +import re +import shutil +import subprocess +import sys +import time +from datetime import datetime, timezone + + +def now_iso(): + return 
def command(args, timeout=60):
    """Run *args* without a shell and return a bounded result summary.

    Args:
        args: Argument vector passed straight to ``subprocess.run``.
        timeout: Seconds to wait before the child is killed.

    Returns:
        dict with ``exitCode``, ``elapsedMs``, ``stdoutTail`` and
        ``stderrTail`` (each tail holds at most the last 1200 characters).
        A timeout yields ``exitCode`` 124 plus ``timedOut: True``. A program
        that cannot be started (missing binary, permission denied) yields
        ``exitCode`` 127 with the OS error text in ``stderrTail`` instead of
        raising, so one absent tool cannot abort the calling stage.
    """
    started = time.time()

    def elapsed_ms():
        return int((time.time() - started) * 1000)

    def tail(value):
        # TimeoutExpired can carry bytes even when text mode was requested;
        # decode instead of throwing the partial output away.
        if isinstance(value, bytes):
            value = value.decode("utf-8", "replace")
        return value[-1200:] if isinstance(value, str) else ""

    try:
        result = subprocess.run(
            args,
            text=True,
            # Undecodable tool output must not turn into UnicodeDecodeError.
            errors="replace",
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        return {
            "exitCode": 124,
            "elapsedMs": elapsed_ms(),
            "stdoutTail": tail(exc.stdout),
            "stderrTail": tail(exc.stderr),
            "timedOut": True,
        }
    except OSError as exc:
        # 127 mirrors the shell convention for "command not found".
        return {
            "exitCode": 127,
            "elapsedMs": elapsed_ms(),
            "stdoutTail": "",
            "stderrTail": str(exc)[-1200:],
        }
    return {
        "exitCode": result.returncode,
        "elapsedMs": elapsed_ms(),
        "stdoutTail": tail(result.stdout),
        "stderrTail": tail(result.stderr),
    }
def write_state(config, payload):
    """Atomically persist *payload* as JSON at ``config["statePath"]``.

    Does nothing when no state path is configured. The payload is written to
    a sibling temp file and moved into place with ``os.replace`` so readers
    never see a half-written file.

    Args:
        config: Policy config dict; only ``statePath`` is read.
        payload: JSON-serialisable run summary.

    Raises:
        OSError: the directory or file cannot be written.
        TypeError / ValueError: the payload is not JSON-serialisable.
    """
    state_path = config.get("statePath")
    if not state_path:
        return
    parent = os.path.dirname(state_path)
    # A bare file name has no directory part; os.makedirs("") would raise.
    if parent:
        os.makedirs(parent, mode=0o755, exist_ok=True)
    tmp = "%s.tmp.%d" % (state_path, os.getpid())
    try:
        with open(tmp, "w", encoding="utf-8") as handle:
            json.dump(payload, handle, ensure_ascii=False, sort_keys=True)
            handle.write("\n")
        os.replace(tmp, state_path)
    except Exception:
        # Do not leave a stale temp file next to the state file on failure.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise
def run_tmp(config):
    """Remove stale, allowlisted direct children of ``/tmp``.

    An entry is removed only when its name starts with one of
    ``tmpPrefixAllowlist``, its full path is not in ``tmpExactProtect``, its
    mtime is older than ``tmpMinAgeHours`` and no process holds it open.

    Args:
        config: Policy config dict.

    Returns:
        Stage result dict with ``deletedCount`` and ``reclaimedBytes``; only
        entries that are really gone afterwards are counted.
    """
    # Drop empty or non-string prefixes: "" would match every entry in /tmp.
    prefixes = tuple(
        item
        for item in (config.get("tmpPrefixAllowlist") or [])
        if isinstance(item, str) and item
    )
    protected = set(config.get("tmpExactProtect") or [])
    raw_age = config.get("tmpMinAgeHours")
    try:
        # Only a missing value falls back to 24h; an explicit 0 is honoured
        # (the installer accepts a minimum of 0.0 for this setting).
        min_age_hours = 24.0 if raw_age is None else max(0.0, float(raw_age))
    except (TypeError, ValueError):
        min_age_hours = 24.0
    cutoff = time.time() - min_age_hours * 3600.0
    deleted = 0
    reclaimed = 0
    if not prefixes or not os.path.isdir("/tmp"):
        return {"id": "tmp-allowlist", "ok": True, "deletedCount": 0, "reclaimedBytes": 0}
    for name in os.listdir("/tmp"):
        path = os.path.join("/tmp", name)
        if path in protected or not name.startswith(prefixes):
            continue
        try:
            stat = os.lstat(path)
        except OSError:
            continue
        if stat.st_mtime >= cutoff or path_has_open_fd(path):
            continue
        before = path_size(path)
        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path, ignore_errors=True)
        else:
            try:
                os.unlink(path)
            except OSError:
                # One undeletable entry must not abort the whole stage.
                continue
        # rmtree(ignore_errors=True) hides failures; verify before counting.
        if os.path.lexists(path):
            continue
        deleted += 1
        reclaimed += before
    return {"id": "tmp-allowlist", "ok": True, "deletedCount": deleted, "reclaimedBytes": reclaimed}
def active_claims(namespace):
    """Map each PVC claim name in *namespace* to the pods still mounting it.

    Pods whose phase is ``Succeeded`` or ``Failed`` are ignored.

    Args:
        namespace: Kubernetes namespace to inspect.

    Returns:
        dict of claim name -> list of pod names that reference the claim.

    Raises:
        RuntimeError: the pod list could not be obtained or parsed. Failing
            closed matters here: an empty mapping would make every session
            PVC look unused and eligible for deletion. ``main`` records the
            exception as a failed stage.
    """
    pods, result = run_json(["kubectl", "-n", namespace, "get", "pod", "-o", "json"], 30)
    if not isinstance(pods, dict):
        raise RuntimeError(
            "pod-list-unavailable namespace=%s exitCode=%s"
            % (namespace, (result or {}).get("exitCode"))
        )
    claims = {}
    for pod in pods.get("items") or []:
        phase = (pod.get("status") or {}).get("phase")
        if phase in ("Succeeded", "Failed"):
            continue
        pod_name = (pod.get("metadata") or {}).get("name") or ""
        for volume in (pod.get("spec") or {}).get("volumes") or []:
            claim = (volume.get("persistentVolumeClaim") or {}).get("claimName")
            if claim:
                claims.setdefault(claim, []).append(pod_name)
    return claims
def table_data_lines(stdout, header_prefix):
    """Return the stripped, non-empty data rows of a tabular CLI output.

    Args:
        stdout: Raw command output (``None`` is treated as empty).
        header_prefix: Text the header row starts with, e.g. ``"KEY"``. An
            empty or ``None`` prefix means there is no header row to drop.

    Returns:
        list of stripped lines, excluding blank lines and header rows.
    """
    rows = []
    for raw in str(stdout or "").splitlines():
        line = raw.strip()
        if not line:
            continue
        # Compare the stripped line so an indented header is still dropped,
        # and never let an empty prefix swallow every row.
        if header_prefix and line.startswith(header_prefix):
            continue
        rows.append(line)
    return rows
def direct_child_rows(root, predicate):
    """List direct, non-symlink children of *root* accepted by *predicate*.

    Args:
        root: Directory to scan; a missing, non-directory or symlinked root
            yields no rows.
        predicate: ``predicate(name, path)`` returning True for children that
            should be reported.

    Returns:
        list of ``{"name", "path", "sizeBytes"}`` dicts, largest first.
    """
    rows = []
    if not root or not os.path.isdir(root) or os.path.islink(root):
        return rows
    real_root = os.path.realpath(os.path.abspath(root))
    for name in sorted(os.listdir(real_root)):
        candidate = os.path.join(real_root, name)
        # Reject symlinks before resolving. After realpath() the path is
        # never a link, so the callers' own islink() guard cannot see one,
        # and a link's name could be paired with another entry's path.
        if os.path.islink(candidate):
            continue
        path = os.path.realpath(candidate)
        if os.path.dirname(path) != real_root or not predicate(name, path):
            continue
        rows.append({"name": name, "path": path, "sizeBytes": path_size(path)})
    rows.sort(key=lambda item: item["sizeBytes"], reverse=True)
    return rows
def run_local_path_orphans(config):
    """Delete local-path provisioner directories that no PV references.

    A candidate must be a real (non-symlink) directory directly under the
    configured root, match one of ``orphanDirPrefixes``, not be the
    ``hostPath``/``local`` path of any PersistentVolume, and have no open
    file descriptors. At most ``maxDeletePerRun`` directories (clamped to
    1..1000) are removed per run, largest first.

    Args:
        config: Policy config dict; reads the ``localPathStorage`` section.

    Returns:
        Stage result dict. ``ok`` is False when the root/prefix config is
        unusable or the PV list cannot be read.
    """
    stage = "local-path-orphans"
    cfg = config.get("localPathStorage") or {}
    if not cfg.get("enabled"):
        return {"id": stage, "ok": True, "skipped": True}
    raw_root = cfg.get("root")
    # Empty prefixes would match every directory under the root.
    prefixes = tuple(
        item
        for item in (cfg.get("orphanDirPrefixes") or [])
        if isinstance(item, str) and item
    )
    # Validate the raw value: os.path.abspath("") is the current working
    # directory, so an unset root must be rejected before it is resolved.
    if not isinstance(raw_root, str) or not os.path.isabs(raw_root) or not prefixes:
        return {"id": stage, "ok": False, "error": "root-or-prefix-missing"}
    root = os.path.realpath(raw_root)
    if not os.path.isdir(root):
        return {"id": stage, "ok": False, "error": "root-or-prefix-missing"}
    pv_data, pv_cmd = run_json(["kubectl", "get", "pv", "-o", "json"], 30)
    if pv_data is None:
        return {"id": stage, "ok": False, "pvCommand": pv_cmd}
    referenced = set()
    for pv in pv_data.get("items") or []:
        spec = pv.get("spec") or {}
        pv_path = (spec.get("hostPath") or {}).get("path") or (spec.get("local") or {}).get("path")
        if pv_path:
            referenced.add(os.path.realpath(os.path.abspath(pv_path)))
    # NOTE(review): orphanMinAgeMinutes sits beside orphanDirPrefixes in the
    # YAML and is presumably a minimum directory age; 0 or absent keeps the
    # previous behaviour. Confirm the intended semantics.
    min_age_seconds = max(0, safe_int(cfg.get("orphanMinAgeMinutes"), 0)) * 60
    newest_allowed = time.time() - min_age_seconds
    rows = []
    for name in os.listdir(root):
        if not name.startswith(prefixes):
            continue
        candidate = os.path.join(root, name)
        # Check for symlinks before resolving; after realpath() the link is
        # invisible and its target would be listed a second time.
        if os.path.islink(candidate) or not os.path.isdir(candidate):
            continue
        path = os.path.realpath(candidate)
        if os.path.dirname(path) != root or path in referenced:
            continue
        if min_age_seconds:
            try:
                modified = os.lstat(path).st_mtime
            except OSError:
                continue
            if modified > newest_allowed:
                continue
        # The /proc scan is the expensive check, so it runs last.
        if path_has_open_fd(path):
            continue
        rows.append({"name": name, "path": path, "sizeBytes": path_size(path)})
    rows.sort(key=lambda item: item["sizeBytes"], reverse=True)
    limit = max(1, min(safe_int(cfg.get("maxDeletePerRun"), 100), 1000))
    reclaimed = 0
    deleted = 0
    for row in rows[:limit]:
        shutil.rmtree(row["path"], ignore_errors=True)
        # rmtree(ignore_errors=True) hides failures; count only what is gone.
        if os.path.lexists(row["path"]):
            continue
        reclaimed += row["sizeBytes"]
        deleted += 1
    return {"id": stage, "ok": True, "deletedCount": deleted, "candidateCount": len(rows), "reclaimedBytes": reclaimed}
--- a/scripts/src/gc-remote-runner.py +++ b/scripts/src/gc-remote-runner.py @@ -22,6 +22,7 @@ CONTAINERD_CONFIG = REMOTE_TARGET.get("containerdImageCache") if isinstance(REMO HOST_CONTAINERD_CONFIG = REMOTE_TARGET.get("hostContainerdCache") if isinstance(REMOTE_TARGET.get("hostContainerdCache"), dict) else {} LOCAL_PATH_CONFIG = REMOTE_TARGET.get("localPathStorage") if isinstance(REMOTE_TARGET.get("localPathStorage"), dict) else {} POLICY_TIMER_CONFIG = REMOTE_TARGET.get("policyTimer") if isinstance(REMOTE_TARGET.get("policyTimer"), dict) else {} +POLICY_RUNNER_SOURCE = base64.b64decode("__UNIDESK_GC_REMOTE_POLICY_RUNNER_BASE64__").decode("utf-8") TMP_PREFIX_ALLOWLIST = [ "hwlab-agent-", @@ -1523,61 +1524,52 @@ def render_remote_policy(): tmp_min_age_hours = config_float(POLICY_TIMER_CONFIG, "tmpMinAgeHours", float(OPTIONS.get("tmpMinAgeHours") or 24), minimum=0.0) include_apt_cache = config_bool(POLICY_TIMER_CONFIG, "includeAptCache", bool(OPTIONS.get("aptCache", True))) include_tool_caches = config_bool(POLICY_TIMER_CONFIG, "includeToolCaches", False) - script_path = "/usr/local/sbin/%s.sh" % unit_name + include_web_observe = config_bool(POLICY_TIMER_CONFIG, "includeWebObserveArtifacts", False) + include_k3s_images = config_bool(POLICY_TIMER_CONFIG, "includeK3sImageCache", False) + include_host_containerd = config_bool(POLICY_TIMER_CONFIG, "includeHostContainerdCache", False) + include_local_path_orphans = config_bool(POLICY_TIMER_CONFIG, "includeLocalPathOrphans", False) + state_dir = config_str(POLICY_TIMER_CONFIG, "stateDir", "/var/lib/unidesk-gc") + config_dir = config_str(POLICY_TIMER_CONFIG, "configDir", "/etc/unidesk-gc") + script_path = "/usr/local/sbin/%s.py" % unit_name + config_path = os.path.join(config_dir, "%s.json" % unit_name) + state_path = os.path.join(state_dir, "%s.last.json" % unit_name) service_path = "/etc/systemd/system/%s.service" % unit_name timer_path = "/etc/systemd/system/%s.timer" % unit_name - tool_paths = [item["path"] for item in 
TOOL_CACHE_ALLOWLIST] if include_tool_caches else [] - script = "\n".join([ - "#!/bin/sh", - "set -eu", - "umask 077", - "journalctl --vacuum-size=%s >/dev/null 2>&1 || true" % int(journal_target), - "apt-get clean >/dev/null 2>&1 || true" if include_apt_cache else ": apt cache disabled by YAML", - "python3 - <<'PY'", - "import json, os, shutil, time", - "prefixes = json.loads(%r)" % json.dumps(TMP_PREFIX_ALLOWLIST), - "protected = set(json.loads(%r))" % json.dumps(sorted(TMP_EXACT_PROTECT)), - "tool_paths = json.loads(%r)" % json.dumps(tool_paths), - "cutoff = time.time() - float(%r) * 3600.0" % tmp_min_age_hours, - "for name in os.listdir('/tmp'):", - " path = os.path.join('/tmp', name)", - " if path in protected or not any(name.startswith(prefix) for prefix in prefixes):", - " continue", - " try:", - " stat = os.lstat(path)", - " except OSError:", - " continue", - " if stat.st_mtime >= cutoff:", - " continue", - " if os.path.isdir(path) and not os.path.islink(path):", - " shutil.rmtree(path, ignore_errors=True)", - " elif os.path.exists(path):", - " try:", - " os.unlink(path)", - " except FileNotFoundError:", - " pass", - "for path in tool_paths:", - " resolved = os.path.abspath(path)", - " if resolved != path or os.path.islink(resolved) or resolved in ['/', '/root', '/root/.npm', '/root/.bun']:", - " continue", - " if os.path.isdir(resolved):", - " shutil.rmtree(resolved, ignore_errors=True)", - " elif os.path.exists(resolved):", - " try:", - " os.unlink(resolved)", - " except FileNotFoundError:", - " pass", - "PY", - "", - ]) + policy_config = { + "providerId": PROVIDER_ID, + "unitName": unit_name, + "statePath": state_path, + "journalTargetBytes": int(journal_target), + "tmpMinAgeHours": tmp_min_age_hours, + "tmpPrefixAllowlist": TMP_PREFIX_ALLOWLIST, + "tmpExactProtect": sorted(TMP_EXACT_PROTECT), + "includeAptCache": include_apt_cache, + "toolCachePaths": [item["path"] for item in TOOL_CACHE_ALLOWLIST] if include_tool_caches else [], + "webObserve": 
{"enabled": include_web_observe, **MEMORY_CONFIG}, + "k3sImageCache": {"enabled": include_k3s_images, **CONTAINERD_CONFIG}, + "hostContainerdCache": {"enabled": include_host_containerd, **HOST_CONTAINERD_CONFIG}, + "localPathStorage": {"enabled": include_local_path_orphans, **LOCAL_PATH_CONFIG}, + "agentrunSessionPvcs": POLICY_TIMER_CONFIG.get("agentrunSessionPvcs") if isinstance(POLICY_TIMER_CONFIG.get("agentrunSessionPvcs"), dict) else {"enabled": False}, + } + stages = [ + {"id": "journal", "enabled": True, "risk": "low", "mode": "journalctl-vacuum"}, + {"id": "apt-cache", "enabled": include_apt_cache, "risk": "low", "mode": "apt-get-clean"}, + {"id": "tmp-allowlist", "enabled": True, "risk": "low", "mode": "direct-child-prefix-retention"}, + {"id": "tool-caches", "enabled": include_tool_caches, "risk": "medium", "mode": "fixed-path-rebuildable-cache"}, + {"id": "web-observe-artifacts", "enabled": include_web_observe, "risk": "medium", "mode": "manifest-heartbeat-pid-openfd-stale-run"}, + {"id": "agentrun-session-pvcs", "enabled": bool(policy_config["agentrunSessionPvcs"].get("enabled")), "risk": "medium", "mode": "kubectl-delete-pvc-wait-false-owner-aware"}, + {"id": "k3s-image-cache", "enabled": include_k3s_images, "risk": "medium", "mode": "crictl-rmi-prune-ci-idle"}, + {"id": "host-containerd-orphans", "enabled": include_host_containerd, "risk": "medium", "mode": "ctr-metadata-empty-yaml-orphan-state"}, + {"id": "local-path-orphans", "enabled": include_local_path_orphans, "risk": "medium", "mode": "pv-unreferenced-direct-child"}, + ] service = "\n".join([ "[Unit]", - "Description=UniDesk remote low-risk GC for %s" % PROVIDER_ID, + "Description=UniDesk remote growth-slowdown GC for %s" % PROVIDER_ID, "Documentation=config/unidesk-cli.yaml#gc.remote.targets.%s.policyTimer" % PROVIDER_ID, "", "[Service]", "Type=oneshot", - "ExecStart=%s" % script_path, + "ExecStart=/usr/bin/python3 %s %s" % (script_path, config_path), "Nice=10", "IOSchedulingClass=best-effort", 
"IOSchedulingPriority=7", @@ -1599,6 +1591,10 @@ def render_remote_policy(): return { "unitName": unit_name, "scriptPath": script_path, + "configPath": config_path, + "statePath": state_path, + "configDir": config_dir, + "stateDir": state_dir, "servicePath": service_path, "timerPath": timer_path, "onCalendar": on_calendar, @@ -1608,7 +1604,13 @@ def render_remote_policy(): "tmpMinAgeHours": tmp_min_age_hours, "includeAptCache": include_apt_cache, "includeToolCaches": include_tool_caches, - "script": script, + "includeWebObserveArtifacts": include_web_observe, + "includeK3sImageCache": include_k3s_images, + "includeHostContainerdCache": include_host_containerd, + "includeLocalPathOrphans": include_local_path_orphans, + "stages": stages, + "policyConfig": policy_config, + "script": POLICY_RUNNER_SOURCE, "service": service, "timer": timer, } @@ -1624,30 +1626,106 @@ def remote_policy_plan_payload(observed_at): "observedAt": observed_at, "configSource": "config/unidesk-cli.yaml#gc.remote.targets.%s.policyTimer" % PROVIDER_ID, "enabled": config_bool(POLICY_TIMER_CONFIG, "enabled", False), - "timer": {key: rendered.get(key) for key in ["unitName", "scriptPath", "servicePath", "timerPath", "onCalendar", "randomizedDelaySec", "journalTargetBytes", "journalTarget", "tmpMinAgeHours", "includeAptCache", "includeToolCaches"]}, - "scriptPreview": "\n".join(rendered["script"].splitlines()[:20]), - "servicePreview": rendered["service"], - "timerPreview": rendered["timer"], + "timer": {key: rendered.get(key) for key in ["unitName", "scriptPath", "configPath", "statePath", "servicePath", "timerPath", "onCalendar", "randomizedDelaySec", "journalTargetBytes", "journalTarget", "tmpMinAgeHours", "includeAptCache", "includeToolCaches", "includeWebObserveArtifacts", "includeK3sImageCache", "includeHostContainerdCache", "includeLocalPathOrphans"]}, + "stages": rendered.get("stages"), + "servicePreview": rendered["service"] if bool(OPTIONS.get("full")) else None, + "timerPreview": 
rendered["timer"] if bool(OPTIONS.get("full")) else None, "installCommand": "bun scripts/cli.ts gc remote %s policy install --confirm" % PROVIDER_ID, + "statusCommand": "bun scripts/cli.ts gc remote %s policy status" % PROVIDER_ID, "policy": { - "risk": "low", + "risk": "low-to-medium-owner-aware", "neverTouches": [ - "k3s runtime directories", - "PVC/PV/local-path data", + "k3s runtime metadata unless a dedicated CRI prune stage is enabled", + "active PVC/PV/local-path data", "Docker images, containers, volumes or Docker build cache", "Secret/auth/config state", "active Web observe runners or Chrome processes", ], - "toolCaches": "disabled unless config/unidesk-cli.yaml enables includeToolCaches for this remote target", + "sourceOfTruth": "config/unidesk-cli.yaml#gc.remote.targets.%s.policyTimer" % PROVIDER_ID, }, } +def remote_policy_status_payload(observed_at): + rendered = render_remote_policy() + timer = command(["systemctl", "show", "%s.timer" % rendered["unitName"], "--property=LoadState,ActiveState,SubState,NextElapseUSecRealtime,LastTriggerUSec"], 10) + service = command(["systemctl", "show", "%s.service" % rendered["unitName"], "--property=LoadState,ActiveState,SubState,Result,ExecMainStatus"], 10) + last = None + try: + with open(rendered["statePath"], "r", encoding="utf-8") as handle: + last = json.load(handle) + except Exception: + last = None + if last is not None and not bool(OPTIONS.get("full")): + last = compact_policy_last_run(last) + return { + "ok": True, + "action": "gc remote policy status", + "providerId": PROVIDER_ID, + "dryRun": True, + "mutation": False, + "observedAt": observed_at, + "timer": {key: rendered.get(key) for key in ["unitName", "scriptPath", "configPath", "statePath", "servicePath", "timerPath", "onCalendar", "randomizedDelaySec"]}, + "systemd": { + "timer": bounded(timer), + "service": bounded(service), + }, + "lastRun": last, + "next": { + "plan": "bun scripts/cli.ts gc remote %s policy plan" % PROVIDER_ID, + "install": "bun 
def compact_policy_last_run(last):
    """Reduce a persisted policy run payload to its summary fields.

    Command output tails and unknown keys are dropped; keys whose value is
    ``None`` are omitted.

    Args:
        last: Parsed content of the policy state file. Anything other than a
            dict (for example a hand-edited or corrupted file) is treated as
            an empty run instead of raising.

    Returns:
        dict with the present top-level summary keys plus ``results``, a list
        of compacted per-stage dicts.
    """
    if not isinstance(last, dict):
        return {"results": []}
    stage_keys = (
        "id",
        "ok",
        "skipped",
        "reason",
        "error",
        "deletedCount",
        "deletedPvcCount",
        "protectedCount",
        "candidateCount",
        "reclaimedBytes",
    )
    command_keys = ("exitCode", "timedOut", "elapsedMs")
    summary_keys = ("ok", "observedAt", "providerId", "unitName", "resultCount", "failedCount", "reclaimedBytes")
    raw_results = last.get("results")
    results = []
    # A non-iterable "results" value must not crash the status command.
    for item in raw_results if isinstance(raw_results, (list, tuple)) else []:
        if not isinstance(item, dict):
            continue
        compact = {key: item[key] for key in stage_keys if item.get(key) is not None}
        command_payload = item.get("command")
        if isinstance(command_payload, dict):
            compact["command"] = {
                key: command_payload[key]
                for key in command_keys
                if command_payload.get(key) is not None
            }
        results.append(compact)
    payload = {key: last[key] for key in summary_keys if last.get(key) is not None}
    payload["results"] = results
    return payload
% PROVIDER_ID, - "timer": {key: rendered.get(key) for key in ["unitName", "scriptPath", "servicePath", "timerPath", "onCalendar", "randomizedDelaySec", "journalTargetBytes", "journalTarget", "tmpMinAgeHours", "includeAptCache", "includeToolCaches"]}, + "timer": {key: rendered.get(key) for key in ["unitName", "scriptPath", "configPath", "statePath", "servicePath", "timerPath", "onCalendar", "randomizedDelaySec", "journalTargetBytes", "journalTarget", "tmpMinAgeHours", "includeAptCache", "includeToolCaches", "includeWebObserveArtifacts", "includeK3sImageCache", "includeHostContainerdCache", "includeLocalPathOrphans"]}, + "stages": rendered.get("stages"), "systemd": { "daemonReload": bounded(daemon), "enableNow": bounded(enable), @@ -1692,6 +1771,9 @@ def main(): if ACTION == "policy-install": emit_json(remote_policy_install_payload(observed_at), persist_large=False) return 0 + if ACTION == "policy-status": + emit_json(remote_policy_status_payload(observed_at), persist_large=False) + return 0 if ACTION == "trend": history_limit = int(OPTIONS.get("historyLimit") or 12) history = read_growth_snapshots(history_limit) diff --git a/scripts/src/gc-remote.ts b/scripts/src/gc-remote.ts index a39ca35a..c9ec2a6a 100644 --- a/scripts/src/gc-remote.ts +++ b/scripts/src/gc-remote.ts @@ -5,7 +5,7 @@ import { type UniDeskConfig, rootPath } from "./config"; import { remoteGcDegradedFailure } from "./gc-remote-degraded"; import { runSshCommandCapture } from "./ssh"; -type RemoteGcAction = "plan" | "snapshot" | "trend" | "run" | "status" | "policy-plan" | "policy-install"; +type RemoteGcAction = "plan" | "snapshot" | "trend" | "run" | "status" | "policy-plan" | "policy-install" | "policy-status"; interface RemoteGcOptions { confirm: boolean; @@ -81,6 +81,8 @@ const GC_REMOTE_GROWTH_RELATIVE_PATH = "scripts/src/gc-remote-growth.py"; const GC_REMOTE_GROWTH_PLACEHOLDER = "# __UNIDESK_GC_REMOTE_GROWTH_HELPERS__"; const GC_REMOTE_REGISTRY_RELATIVE_PATH = "scripts/src/gc-remote-registry.py"; 
const GC_REMOTE_REGISTRY_PLACEHOLDER = "# __UNIDESK_GC_REMOTE_REGISTRY_HELPERS__"; +const GC_REMOTE_POLICY_RUNNER_RELATIVE_PATH = "scripts/src/gc-remote-policy-runner.py"; +const GC_REMOTE_POLICY_RUNNER_PLACEHOLDER = "__UNIDESK_GC_REMOTE_POLICY_RUNNER_BASE64__"; export async function runRemoteGcCommand(config: UniDeskConfig, providerId: string | undefined, action: string | undefined, args: string[]): Promise { if (providerId === undefined || providerId.length === 0) { @@ -111,11 +113,14 @@ export async function runRemoteGcCommand(config: UniDeskConfig, providerId: stri } return await runRemoteGc(config, providerId, "policy-install", options); } + if (policyAction === "status") { + return await runRemoteGc(config, providerId, "policy-status", options); + } return { ok: false, error: "unsupported-gc-remote-policy-action", action: policyAction, - supportedActions: ["plan", "render", "dry-run", "install"], + supportedActions: ["plan", "render", "dry-run", "install", "status"], }; } const options = parseRemoteGcOptions(args); @@ -344,16 +349,21 @@ function remoteGcPython(configBase64: string): string { if (!template.includes(GC_REMOTE_REGISTRY_PLACEHOLDER)) { throw new Error(`${GC_REMOTE_RUNNER_RELATIVE_PATH} missing ${GC_REMOTE_REGISTRY_PLACEHOLDER}`); } + if (!template.includes(GC_REMOTE_POLICY_RUNNER_PLACEHOLDER)) { + throw new Error(`${GC_REMOTE_RUNNER_RELATIVE_PATH} missing ${GC_REMOTE_POLICY_RUNNER_PLACEHOLDER}`); + } const webObserveHelpers = readFileSync(rootPath(GC_REMOTE_WEB_OBSERVE_RELATIVE_PATH), "utf8"); const containerdHelpers = readFileSync(rootPath(GC_REMOTE_CONTAINERD_RELATIVE_PATH), "utf8"); const pvcHelpers = readFileSync(rootPath(GC_REMOTE_PVC_RELATIVE_PATH), "utf8"); const growthHelpers = readFileSync(rootPath(GC_REMOTE_GROWTH_RELATIVE_PATH), "utf8"); const registryHelpers = readFileSync(rootPath(GC_REMOTE_REGISTRY_RELATIVE_PATH), "utf8"); + const policyRunner = readFileSync(rootPath(GC_REMOTE_POLICY_RUNNER_RELATIVE_PATH), "utf8"); return template 
.replace(GC_REMOTE_WEB_OBSERVE_PLACEHOLDER, webObserveHelpers.trimEnd()) .replace(GC_REMOTE_CONTAINERD_PLACEHOLDER, containerdHelpers.trimEnd()) .replace(GC_REMOTE_PVC_PLACEHOLDER, pvcHelpers.trimEnd()) .replace(GC_REMOTE_GROWTH_PLACEHOLDER, growthHelpers.trimEnd()) .replace(GC_REMOTE_REGISTRY_PLACEHOLDER, registryHelpers.trimEnd()) + .replace(GC_REMOTE_POLICY_RUNNER_PLACEHOLDER, Buffer.from(policyRunner, "utf8").toString("base64")) .replace(GC_REMOTE_RUNNER_CONFIG_PLACEHOLDER, configBase64); }