diff --git a/config/platform-infra/sub2api-codex-pool.yaml b/config/platform-infra/sub2api-codex-pool.yaml new file mode 100644 index 00000000..4e3b32bb --- /dev/null +++ b/config/platform-infra/sub2api-codex-pool.yaml @@ -0,0 +1,6 @@ +pool: + groupName: unidesk-codex-pool + apiKeyName: unidesk-codex-pool-api-key + apiKeySecretName: sub2api-codex-pool-api-key + apiKeySecretKey: API_KEY + minOwnerBalanceUsd: 1000 diff --git a/scripts/gc-remote-growth-contract-test.ts b/scripts/gc-remote-growth-contract-test.ts new file mode 100644 index 00000000..db82d67d --- /dev/null +++ b/scripts/gc-remote-growth-contract-test.ts @@ -0,0 +1,97 @@ +const sourceText = await Bun.file(new URL("./src/gc-remote.ts", import.meta.url)).text(); + +function assertCondition(condition: unknown, message: string, detail: unknown = {}): void { + if (!condition) throw new Error(`${message}: ${JSON.stringify(detail)}`); +} + +function functionBody(name: string): string { + const marker = `def ${name}(`; + const start = sourceText.indexOf(marker); + if (start < 0) return ""; + const next = sourceText.indexOf("\ndef ", start + marker.length); + return sourceText.slice(start, next < 0 ? 
sourceText.length : next); +} + +assertCondition( + sourceText.includes('if (subaction === "snapshot" || subaction === "growth")') + && sourceText.includes('if (subaction === "trend")') + && sourceText.includes('supportedActions: ["plan", "snapshot", "trend", "run", "status"]'), + "gc remote must expose snapshot/growth and trend actions", +); + +assertCondition( + sourceText.includes("historyLimit: number") + && sourceText.includes("saveSnapshot: boolean") + && sourceText.includes("--history-limit") + && sourceText.includes("--no-save"), + "gc remote snapshot must expose bounded history and no-save options", +); + +const snapshotBody = functionBody("collect_growth_snapshot"); +assertCondition( + snapshotBody.includes('"action": "gc remote snapshot"') + && snapshotBody.includes('"diagnosticStateMutation"') + && snapshotBody.includes("disk_source_snapshot()") + && snapshotBody.includes("ci_storage_snapshot()") + && snapshotBody.includes("registry_growth_snapshot()") + && snapshotBody.includes("containerd_breakdown_snapshot()"), + "growth snapshot must include disk sources, CI PVC ownership, registry, containerd and diagnostic-state disclosure", + snapshotBody.slice(0, 1200), +); + +const storageBody = functionBody("ci_storage_snapshot"); +assertCondition( + storageBody.includes('"hwlab"') + && storageBody.includes('"agentrun"') + && storageBody.includes('"byOwnerGroup"') + && storageBody.includes("hwlab g14 control-plane cleanup-runs") + && storageBody.includes("agentrun v01 control-plane cleanup-runs") + && storageBody.includes("hostPath") + && storageBody.includes("activeMountPods") + && storageBody.includes("estimatedBytes") + && storageBody.includes('phase in set(["Succeeded", "Failed"])'), + "CI storage snapshot must keep HWLAB and AgentRun owner handoff plus reclaim visibility", + storageBody.slice(0, 1600), +); + +const registryBody = functionBody("registry_growth_snapshot"); +assertCondition( + registryBody.includes('"dryRun": "daily or before/after every 
v0.2 CI/CD burst"') + && registryBody.includes('"maintenanceRun": "weekly, or when root >=80%, or when registry growth exceeds the agreed daily threshold"') + && registryBody.includes("plan_registry_retention()") + && registryBody.includes("protected tags") + && registryBody.includes("newest N tags per repo"), + "registry growth snapshot must disclose dry-run cadence, maintenance trigger and retention protections", + registryBody, +); + +const containerdBody = functionBody("containerd_breakdown_snapshot"); +assertCondition( + containerdBody.includes('"state": "observation-only"') + && containerdBody.includes('"cleanupSupported": False') + && containerdBody.includes("reference-safe image/content classifier"), + "containerd section must stay observation-only until a safe classifier exists", + containerdBody, +); + +const trendBody = functionBody("growth_trend_payload"); +assertCondition( + trendBody.includes('"latestDelta"') + && trendBody.includes('"windowDelta"') + && trendBody.includes('"short-window-rate-noisy"') + && trendBody.includes('"topGrowingBytes"') + && trendBody.includes('"registryCounters"'), + "growth trend must expose latest/window delta and registry counters", + trendBody, +); + +console.log(JSON.stringify({ + ok: true, + checks: [ + "gc remote exposes snapshot/growth and trend actions", + "snapshot history is bounded and can be disabled with --no-save", + "snapshot includes source sizes, owner-aware CI PVCs, registry cadence and containerd observation", + "HWLAB and AgentRun retention handoff commands are explicit", + "trend includes latest/window deltas and registry counters", + ], +})); diff --git a/scripts/src/gc-remote.ts b/scripts/src/gc-remote.ts index a265c19a..b68f7333 100644 --- a/scripts/src/gc-remote.ts +++ b/scripts/src/gc-remote.ts @@ -25,6 +25,8 @@ interface RemoteGcOptions { limit: number; resultLimit: number; full: boolean; + historyLimit: number; + saveSnapshot: boolean; } const DEFAULT_REMOTE_OPTIONS: RemoteGcOptions = { @@ -47,6 
+49,8 @@ const DEFAULT_REMOTE_OPTIONS: RemoteGcOptions = { limit: 50, resultLimit: 50, full: false, + historyLimit: 12, + saveSnapshot: true, }; export async function runRemoteGcCommand(config: UniDeskConfig, providerId: string | undefined, action: string | undefined, args: string[]): Promise { @@ -54,12 +58,14 @@ export async function runRemoteGcCommand(config: UniDeskConfig, providerId: stri return { ok: false, error: "gc-remote-provider-required", - usage: "bun scripts/cli.ts gc remote plan|run|status [--confirm]", + usage: "bun scripts/cli.ts gc remote plan|snapshot|trend|run|status [--confirm]", }; } const subaction = action ?? "plan"; const options = parseRemoteGcOptions(args); if (subaction === "plan" || subaction === "dry-run") return await runRemoteGc(config, providerId, "plan", options); + if (subaction === "snapshot" || subaction === "growth") return await runRemoteGc(config, providerId, "snapshot", options); + if (subaction === "trend") return await runRemoteGc(config, providerId, "trend", options); if (subaction === "status") return await runRemoteGc(config, providerId, "status", options); if (subaction === "run") { if (!options.confirm) { @@ -79,7 +85,7 @@ export async function runRemoteGcCommand(config: UniDeskConfig, providerId: stri ok: false, error: "unsupported-gc-remote-action", action: subaction, - supportedActions: ["plan", "run", "status"], + supportedActions: ["plan", "snapshot", "trend", "run", "status"], }; } @@ -126,6 +132,10 @@ function parseRemoteGcOptions(args: string[]): RemoteGcOptions { const value = parseNonNegativeNumber(arg, args[++index]); if (!Number.isInteger(value) || value <= 0) throw new Error("--result-limit must be a positive integer"); options.resultLimit = Math.min(value, 5000); + } else if (arg === "--history-limit") { + const value = parseNonNegativeNumber(arg, args[++index]); + if (!Number.isInteger(value) || value <= 1) throw new Error("--history-limit must be an integer greater than 1"); + options.historyLimit = 
Math.min(value, 200); } else if (arg === "--no-journal") { options.journal = false; } else if (arg === "--no-docker-logs") { @@ -145,6 +155,8 @@ function parseRemoteGcOptions(args: string[]): RemoteGcOptions { options.registryGcOnly = true; } else if (arg === "--full" || arg === "--raw") { options.full = true; + } else if (arg === "--no-save" || arg === "--no-snapshot-save") { + options.saveSnapshot = false; } else { throw new Error(`unknown gc remote option: ${arg}`); } @@ -178,7 +190,7 @@ function parseSize(raw: string): number | null { return Number.isFinite(bytes) ? bytes : null; } -async function runRemoteGc(config: UniDeskConfig, providerId: string, action: "plan" | "run" | "status", options: RemoteGcOptions): Promise { +async function runRemoteGc(config: UniDeskConfig, providerId: string, action: "plan" | "snapshot" | "trend" | "run" | "status", options: RemoteGcOptions): Promise { const scriptConfig = Buffer.from(JSON.stringify({ providerId, action, options }), "utf8").toString("base64"); const result = await runSshCommandCapture(config, providerId, ["py"], remoteGcPython(scriptConfig)); if (result.exitCode !== 0) { @@ -210,6 +222,7 @@ async function runRemoteGc(config: UniDeskConfig, providerId: string, action: "p function remoteGcPython(configBase64: string): string { return String.raw` import base64 +import calendar import json import os import re @@ -281,6 +294,7 @@ REGISTRY_PROTECTED_TAGS = set([ EXPECTED_G14_NODE = "ubuntu-rog-zephyrus-g14-ga401iv-ga401iv" REMOTE_GC_JOB_DIR = "/tmp/unidesk-gc-remote/jobs" +REMOTE_GROWTH_SNAPSHOT_DIR = "/tmp/unidesk-gc-remote/growth-snapshots" REMOTE_STDOUT_JSON_LIMIT = 256 * 1024 def now_iso(): @@ -496,6 +510,391 @@ def du_size(path, timeout=20): except Exception: return path_size(path) +def safe_int(value, default=0): + try: + if value is None: + return default + return int(value) + except Exception: + return default + +def iso_to_epoch(value): + try: + return calendar.timegm(time.strptime(str(value), 
"%Y-%m-%dT%H:%M:%SZ")) + except Exception: + return None + +def growth_snapshot_path(): + os.makedirs(REMOTE_GROWTH_SNAPSHOT_DIR, exist_ok=True) + provider_slug = re.sub(r"[^A-Za-z0-9._-]+", "-", PROVIDER_ID.lower()).strip("-") or "provider" + return os.path.join(REMOTE_GROWTH_SNAPSHOT_DIR, "%s.jsonl" % provider_slug) + +def read_growth_snapshots(limit=None): + path = growth_snapshot_path() + if not os.path.isfile(path): + return [] + try: + with open(path, "r", encoding="utf-8") as handle: + lines = handle.readlines() + except OSError: + return [] + rows = [] + for line in lines[-max(1, int(limit or 200)):]: + line = line.strip() + if not line: + continue + try: + item = json.loads(line) + except Exception: + continue + if isinstance(item, dict): + rows.append(item) + return rows + +def append_growth_snapshot(snapshot): + path = growth_snapshot_path() + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, "a", encoding="utf-8") as handle: + handle.write(json.dumps(snapshot, ensure_ascii=False, sort_keys=True)) + handle.write("\n") + return path + +def source_size_item(source_id, label, path, cleanup_owner, timeout=20): + size = du_size(path, timeout) if os.path.exists(path) else None + return { + "id": source_id, + "label": label, + "path": path, + "exists": size is not None, + "sizeBytes": size, + "sizeHuman": fmt_bytes(size or 0), + "cleanupOwner": cleanup_owner, + } + +def disk_source_snapshot(): + sources = [ + source_size_item("hwlab-host-data", "HWLAB host data", "/var/lib/hwlab", "hwlab-registry-retention", 60), + source_size_item("hwlab-registry", "HWLAB registry", REGISTRY_ROOT, "gc-remote-hwlab-registry", 60), + source_size_item("k3s-storage", "k3s local-path storage", "/var/lib/rancher/k3s/storage", "owner-aware-pvc-retention", 45), + source_size_item("k3s-containerd", "k3s containerd", "/var/lib/rancher/k3s/agent/containerd", "observation-only", 45), + source_size_item("host-containerd", "host containerd", "/var/lib/containerd", 
"observation-only", 30), + source_size_item("kubelet", "kubelet state", "/var/lib/kubelet", "protected-runtime", 20), + source_size_item("var-log", "host logs", "/var/log", "gc-remote-logs-journald", 20), + source_size_item("tmp", "allowlisted tmp and other tmp", "/tmp", "gc-remote-tmp-allowlist", 20), + source_size_item("apt-cache", "apt archives", "/var/cache/apt/archives", "gc-remote-apt-cache", 10), + source_size_item("hwlab-v02-source", "HWLAB v0.2 source workspace", "/root/hwlab-v02", "protected-source", 20), + source_size_item("agentrun-source", "AgentRun source workspace", "/root/agentrun-v01", "protected-source", 20), + ] + return [item for item in sources if item.get("exists")] + +def containerd_breakdown_snapshot(): + rows = [ + source_size_item("k3s-containerd-content", "k3s containerd content store", "/var/lib/rancher/k3s/agent/containerd/io.containerd.content.v1.content", "observation-only", 30), + source_size_item("k3s-containerd-overlayfs", "k3s containerd overlay snapshots", "/var/lib/rancher/k3s/agent/containerd/io.containerd.snapshotter.v1.overlayfs", "observation-only", 30), + source_size_item("host-containerd-content", "host containerd content store", "/var/lib/containerd/io.containerd.content.v1.content", "observation-only", 20), + source_size_item("host-containerd-overlayfs", "host containerd overlay snapshots", "/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs", "observation-only", 20), + ] + rows = [item for item in rows if item.get("exists")] + return { + "state": "observation-only", + "cleanupSupported": False, + "reason": "containerd cleanup still requires a reference-safe image/content classifier; this snapshot only classifies growth sources", + "breakdown": rows, + } + +def pv_host_path(pv): + spec = (pv or {}).get("spec") or {} + host_path = (spec.get("hostPath") or {}).get("path") + if isinstance(host_path, str) and host_path: + return host_path + local_path = (spec.get("local") or {}).get("path") + if 
isinstance(local_path, str) and local_path: + return local_path + return None + +def pvc_owner_group(namespace, owner): + owner = str(owner or "") + if namespace == "agentrun-ci": + return "agentrun" + if namespace == "hwlab-ci": + if owner.startswith("agentrun-"): + return "agentrun" + return "hwlab" + if namespace.startswith("hwlab-"): + return "hwlab-runtime" + return "other" + +def ci_storage_snapshot(): + pv_data = kubectl_json(["get", "pv"], 30) or {} + pvc_data = kubectl_json(["get", "pvc", "-A"], 30) or {} + pod_data = kubectl_json(["get", "pod", "-A"], 30) or {} + pvs = {} + for pv in pv_data.get("items") or []: + meta = pv.get("metadata") or {} + name = meta.get("name") + if name: + pvs[name] = pv + mounts = {} + for pod in pod_data.get("items") or []: + meta = pod.get("metadata") or {} + ns = str(meta.get("namespace") or "") + pod_name = str(meta.get("name") or "") + phase = str(((pod.get("status") or {}).get("phase")) or "") + if phase in set(["Succeeded", "Failed"]): + continue + spec = pod.get("spec") or {} + for vol in spec.get("volumes") or []: + claim = (vol.get("persistentVolumeClaim") or {}).get("claimName") + if claim: + mounts.setdefault((ns, claim), []).append(pod_name) + rows = [] + for pvc in pvc_data.get("items") or []: + meta = pvc.get("metadata") or {} + spec = pvc.get("spec") or {} + status = pvc.get("status") or {} + ns = str(meta.get("namespace") or "") + name = str(meta.get("name") or "") + if ns not in set(["hwlab-ci", "agentrun-ci"]): + continue + volume = str(spec.get("volumeName") or "") + pv = pvs.get(volume) or {} + pv_spec = pv.get("spec") or {} + owner_refs = meta.get("ownerReferences") or [] + owner_kind = None + owner_name = None + if owner_refs: + owner_kind = owner_refs[0].get("kind") + owner_name = owner_refs[0].get("name") + host_path = pv_host_path(pv) + active = sorted(mounts.get((ns, name), [])) + estimated = du_size(host_path, 8) if host_path else None + rows.append({ + "namespace": ns, + "pvc": name, + "volume": 
volume or None, + "phase": status.get("phase"), + "ownerKind": owner_kind, + "owner": owner_name, + "ownerGroup": pvc_owner_group(ns, owner_name), + "storageClass": spec.get("storageClassName") or pv_spec.get("storageClassName"), + "reclaimPolicy": pv_spec.get("persistentVolumeReclaimPolicy"), + "hostPath": host_path, + "activeMountPods": active, + "estimatedBytes": estimated, + "estimatedHuman": fmt_bytes(estimated or 0), + }) + rows.sort(key=lambda item: safe_int(item.get("estimatedBytes")), reverse=True) + by_namespace = {} + by_owner_group = {} + for row in rows: + for bucket, key in [(by_namespace, row.get("namespace") or "unknown"), (by_owner_group, row.get("ownerGroup") or "unknown")]: + current = bucket.setdefault(key, {"count": 0, "estimatedBytes": 0, "activeMountCount": 0}) + current["count"] += 1 + current["estimatedBytes"] += safe_int(row.get("estimatedBytes")) + current["activeMountCount"] += len(row.get("activeMountPods") or []) + current["estimatedHuman"] = fmt_bytes(current["estimatedBytes"]) + return { + "scope": "hwlab-ci and agentrun-ci PVCs only", + "pvcCount": len(rows), + "estimatedBytes": sum(safe_int(row.get("estimatedBytes")) for row in rows), + "estimatedHuman": fmt_bytes(sum(safe_int(row.get("estimatedBytes")) for row in rows)), + "byNamespace": by_namespace, + "byOwnerGroup": by_owner_group, + "topPvcs": rows[:int(OPTIONS.get("limit") or 50)], + "handoff": { + "hwlab": { + "dryRun": "bun scripts/cli.ts hwlab g14 control-plane cleanup-runs --lane v02 --min-age-minutes 30 --limit 200 --dry-run", + "releasedPvs": "bun scripts/cli.ts hwlab g14 control-plane cleanup-released-pvs --lane all --limit 200 --dry-run", + }, + "agentrun": { + "dryRun": "bun scripts/cli.ts agentrun v01 control-plane cleanup-runs --min-age-minutes 30 --limit 200 --dry-run", + "releasedPvs": "bun scripts/cli.ts agentrun v01 control-plane cleanup-released-pvs --limit 200 --dry-run", + }, + }, + } + +def registry_growth_snapshot(): + summary = { + "path": REGISTRY_ROOT, 
+ "sizeBytes": du_size(REGISTRY_ROOT, 60) or 0, + } + summary["sizeHuman"] = fmt_bytes(summary["sizeBytes"]) + if OPTIONS.get("hwlabRegistry", False): + plan = plan_registry_retention() + retention = dict(plan.get("summary") or {}) + for key in ["registrySizeBytes", "estimatedReclaimBytes"]: + if key in retention: + retention[key.replace("Bytes", "Human")] = fmt_bytes(retention.get(key) or 0) + summary["retentionPlan"] = retention + else: + summary["retentionPlan"] = { + "skipped": True, + "reason": "rerun snapshot with --include-hwlab-registry to compute tag/revision retention counters", + } + summary["cadence"] = { + "dryRun": "daily or before/after every v0.2 CI/CD burst", + "maintenanceRun": "weekly, or when root >=80%, or when registry growth exceeds the agreed daily threshold", + "planCommand": "bun scripts/cli.ts gc remote %s plan --target-use-percent 70 --include-hwlab-registry --limit 50" % PROVIDER_ID, + "snapshotCommand": "bun scripts/cli.ts gc remote %s snapshot --include-hwlab-registry --history-limit 12" % PROVIDER_ID, + "runCommand": "bun scripts/cli.ts gc remote %s run --confirm --include-hwlab-registry --target-use-percent 70 --limit 50" % PROVIDER_ID, + "defaultRetention": { + "keepPerRepo": int(OPTIONS.get("registryKeepPerRepo") or 20), + "minAgeHours": float(OPTIONS.get("registryMinAgeHours") or 48), + "protects": ["current workload refs", "digest closure", "protected tags", "recent tags", "newest N tags per repo"], + }, + } + return summary + +def growth_watermark_policy(root_disk): + use_percent = root_disk.get("usePercent") if isinstance(root_disk, dict) else None + if use_percent is None: + state = "unknown" + action = "collect-snapshot" + elif use_percent < 75: + state = "healthy" + action = "observe-trend" + elif use_percent < 80: + state = "watch" + action = "run-dry-run-plan" + elif use_percent < 85: + state = "maintenance" + action = "schedule-owner-aware-retention" + else: + state = "emergency" + action = 
"restore-runtime-then-file-evidence" + return { + "state": state, + "recommendedAction": action, + "watermarks": [ + {"range": "<75%", "action": "trend only"}, + {"range": "75%-80%", "action": "run dry-run plan and identify source"}, + {"range": "80%-85%", "action": "small owner-aware retention run"}, + {"range": ">=85%", "action": "runtime recovery first, then root-cause growth source"}, + ], + "growthThresholdPolicy": "If bytes/day remains high for consecutive snapshots, act before 80%; exact threshold should be set from the first week of saved snapshots.", + } + +def snapshot_metric_map(snapshot): + metrics = {} + root = snapshot.get("rootDisk") or {} + if isinstance(root, dict) and root.get("usedBytes") is not None: + metrics["root.usedBytes"] = {"value": safe_int(root.get("usedBytes")), "unit": "bytes", "label": "root used bytes"} + for item in snapshot.get("sources") or []: + if not isinstance(item, dict) or item.get("sizeBytes") is None: + continue + key = "source.%s.sizeBytes" % item.get("id") + metrics[key] = {"value": safe_int(item.get("sizeBytes")), "unit": "bytes", "label": item.get("label") or item.get("id")} + storage = ((snapshot.get("ciStorage") or {}).get("byOwnerGroup") or {}) + for owner, value in storage.items(): + metrics["ciStorage.%s.estimatedBytes" % owner] = {"value": safe_int((value or {}).get("estimatedBytes")), "unit": "bytes", "label": "CI storage %s" % owner} + registry = snapshot.get("registry") or {} + retention = registry.get("retentionPlan") or {} + for key in ["totalTags", "totalRevisions", "deleteTags", "deleteRevisions", "estimatedReclaimBytes"]: + if key in retention and retention.get(key) is not None: + unit = "bytes" if key.endswith("Bytes") else "count" + metrics["registry.%s" % key] = {"value": safe_int(retention.get(key)), "unit": unit, "label": "registry %s" % key} + return metrics + +def delta_metric_rows(before, after): + before_metrics = snapshot_metric_map(before) + after_metrics = snapshot_metric_map(after) + 
before_ts = iso_to_epoch(before.get("observedAt")) + after_ts = iso_to_epoch(after.get("observedAt")) + seconds = (after_ts - before_ts) if before_ts is not None and after_ts is not None else None + rows = [] + for key in sorted(set(before_metrics.keys()) | set(after_metrics.keys())): + old = before_metrics.get(key, {"value": 0, "unit": (after_metrics.get(key) or {}).get("unit"), "label": key}) + new = after_metrics.get(key, {"value": 0, "unit": old.get("unit"), "label": old.get("label")}) + delta = safe_int(new.get("value")) - safe_int(old.get("value")) + row = { + "key": key, + "label": new.get("label") or old.get("label") or key, + "unit": new.get("unit") or old.get("unit"), + "before": old.get("value"), + "after": new.get("value"), + "delta": delta, + } + if row["unit"] == "bytes": + row["beforeHuman"] = fmt_bytes(row["before"] or 0) + row["afterHuman"] = fmt_bytes(row["after"] or 0) + row["deltaHuman"] = ("-" if delta < 0 else "") + fmt_bytes(abs(delta)) + if seconds and seconds > 0: + per_day = int(delta * 86400 / seconds) + row["perDayBytes"] = per_day + row["perDayHuman"] = ("-" if per_day < 0 else "") + fmt_bytes(abs(per_day)) + "/day" + rows.append(row) + rows.sort(key=lambda item: safe_int(item.get("delta")), reverse=True) + return {"durationSeconds": seconds, "metrics": rows} + +def growth_trend_payload(points): + points = [point for point in points if isinstance(point, dict)] + if len(points) < 2: + return { + "pointCount": len(points), + "state": "insufficient-history", + "message": "Run snapshot at least twice to compute deltas.", + } + latest_delta = delta_metric_rows(points[-2], points[-1]) + window_delta = delta_metric_rows(points[0], points[-1]) + def rate_warning(delta): + seconds = delta.get("durationSeconds") + if seconds is not None and seconds < 3600: + return { + "code": "short-window-rate-noisy", + "message": "Per-day rates from windows shorter than 1 hour are directional only; use daily snapshots for governance decisions.", + 
"durationSeconds": seconds, + } + return None + return { + "pointCount": len(points), + "oldestAt": points[0].get("observedAt"), + "latestAt": points[-1].get("observedAt"), + "latestDelta": { + "durationSeconds": latest_delta.get("durationSeconds"), + "rateWarning": rate_warning(latest_delta), + "topGrowingBytes": [row for row in latest_delta.get("metrics", []) if row.get("unit") == "bytes" and safe_int(row.get("delta")) > 0][:10], + "topShrinkingBytes": [row for row in reversed(latest_delta.get("metrics", [])) if row.get("unit") == "bytes" and safe_int(row.get("delta")) < 0][:10], + "registryCounters": [row for row in latest_delta.get("metrics", []) if str(row.get("key", "")).startswith("registry.") and row.get("unit") == "count"], + }, + "windowDelta": { + "durationSeconds": window_delta.get("durationSeconds"), + "rateWarning": rate_warning(window_delta), + "topGrowingBytes": [row for row in window_delta.get("metrics", []) if row.get("unit") == "bytes" and safe_int(row.get("delta")) > 0][:10], + "topShrinkingBytes": [row for row in reversed(window_delta.get("metrics", [])) if row.get("unit") == "bytes" and safe_int(row.get("delta")) < 0][:10], + "registryCounters": [row for row in window_delta.get("metrics", []) if str(row.get("key", "")).startswith("registry.") and row.get("unit") == "count"], + }, + } + +def collect_growth_snapshot(observed_at, preflight): + root_disk = df_snapshot() + sources = disk_source_snapshot() + ci_storage = ci_storage_snapshot() + registry = registry_growth_snapshot() + containerd = containerd_breakdown_snapshot() + return { + "ok": True, + "action": "gc remote snapshot", + "providerId": PROVIDER_ID, + "dryRun": True, + "mutation": False, + "diagnosticStateMutation": bool(OPTIONS.get("saveSnapshot", True)), + "observedAt": observed_at, + "rootDisk": root_disk, + "clusterPreflight": preflight, + "sources": sources, + "registry": registry, + "ciStorage": ci_storage, + "containerd": containerd, + "policy": 
growth_watermark_policy(root_disk or {}), + "commands": { + "snapshot": "bun scripts/cli.ts gc remote %s snapshot --include-hwlab-registry --history-limit %s" % (PROVIDER_ID, int(OPTIONS.get("historyLimit") or 12)), + "trend": "bun scripts/cli.ts gc remote %s trend --history-limit %s" % (PROVIDER_ID, int(OPTIONS.get("historyLimit") or 12)), + "registryPlan": "bun scripts/cli.ts gc remote %s plan --target-use-percent 70 --include-hwlab-registry --limit 50" % PROVIDER_ID, + "hwlabCiRetention": "bun scripts/cli.ts hwlab g14 control-plane cleanup-runs --lane v02 --min-age-minutes 30 --limit 200 --dry-run", + "agentrunRetention": "bun scripts/cli.ts agentrun v01 control-plane cleanup-runs --min-age-minutes 30 --limit 200 --dry-run", + }, + } + def allocated_file_size(path): try: stat = os.stat(path) @@ -1714,6 +2113,56 @@ def plan_payload(observed_at, preflight, protected, candidates, visible): def main(): observed_at = now_iso() preflight = cluster_preflight() + if ACTION == "trend": + history_limit = int(OPTIONS.get("historyLimit") or 12) + history = read_growth_snapshots(history_limit) + emit_json({ + "ok": True, + "action": "gc remote trend", + "providerId": PROVIDER_ID, + "dryRun": True, + "mutation": False, + "observedAt": observed_at, + "statePath": growth_snapshot_path(), + "historyLimit": history_limit, + "trend": growth_trend_payload(history), + "points": history if bool(OPTIONS.get("full")) else history[-min(len(history), 3):], + "returnedPointCount": min(len(history), 3) if not bool(OPTIONS.get("full")) else len(history), + "totalPointCount": len(history), + "next": { + "snapshot": "bun scripts/cli.ts gc remote %s snapshot --include-hwlab-registry --history-limit %s" % (PROVIDER_ID, history_limit), + }, + }, persist_large=True) + return 0 + if ACTION == "snapshot": + history_limit = int(OPTIONS.get("historyLimit") or 12) + snapshot = collect_growth_snapshot(observed_at, preflight) + state_path = growth_snapshot_path() + if bool(OPTIONS.get("saveSnapshot", 
True)): + state_path = append_growth_snapshot(snapshot) + history = read_growth_snapshots(history_limit) + if not bool(OPTIONS.get("saveSnapshot", True)): + history = history + [snapshot] + snapshot.update({ + "statePath": state_path, + "historyLimit": history_limit, + "saved": bool(OPTIONS.get("saveSnapshot", True)), + "trend": growth_trend_payload(history[-history_limit:]), + "history": { + "totalPointCount": len(read_growth_snapshots(1000000)) if bool(OPTIONS.get("saveSnapshot", True)) else len(history), + "returnedPointCount": len(history[-min(len(history), 3):]), + "recentPoints": history[-min(len(history), 3):] if bool(OPTIONS.get("full")) else [ + { + "observedAt": item.get("observedAt"), + "rootDisk": item.get("rootDisk"), + "sourceCount": len(item.get("sources") or []), + } + for item in history[-min(len(history), 3):] + ], + }, + }) + emit_json(snapshot, persist_large=True) + return 0 protected = collect_protected() candidates = collect_candidates(observed_at) visible = visible_items(candidates) diff --git a/scripts/src/platform-infra-sub2api-codex.ts b/scripts/src/platform-infra-sub2api-codex.ts index 4f88632b..83b4649c 100644 --- a/scripts/src/platform-infra-sub2api-codex.ts +++ b/scripts/src/platform-infra-sub2api-codex.ts @@ -3,6 +3,7 @@ import { existsSync, readFileSync, readdirSync } from "node:fs"; import { homedir } from "node:os"; import { join } from "node:path"; import type { UniDeskConfig } from "./config"; +import { rootPath } from "./config"; import { runSshCommandCapture, type SshCaptureResult } from "./ssh"; const g14K3sRoute = "G14:k3s"; @@ -11,10 +12,12 @@ const serviceName = "sub2api"; const serviceDns = `${serviceName}.${namespace}.svc.cluster.local:8080`; const fieldManager = "unidesk-platform-infra"; const appSecretName = "sub2api-secrets"; -const poolGroupName = "unidesk-codex-pool"; -const poolApiKeyName = "unidesk-codex-pool-api-key"; -const poolApiKeySecretName = "sub2api-codex-pool-api-key"; -const poolApiKeySecretKey = 
"API_KEY"; +const codexPoolConfigPath = rootPath("config", "platform-infra", "sub2api-codex-pool.yaml"); +const defaultPoolGroupName = "unidesk-codex-pool"; +const defaultPoolApiKeyName = "unidesk-codex-pool-api-key"; +const defaultPoolApiKeySecretName = "sub2api-codex-pool-api-key"; +const defaultPoolApiKeySecretKey = "API_KEY"; +const defaultMinOwnerBalanceUsd = 1000; interface DisclosureOptions { full: boolean; @@ -42,7 +45,16 @@ interface CodexProfile { error: string | null; } +interface CodexPoolConfig { + groupName: string; + apiKeyName: string; + apiKeySecretName: string; + apiKeySecretKey: string; + minOwnerBalanceUsd: number; +} + export function codexPoolHelp(): unknown { + const pool = readCodexPoolConfig(); return { command: "platform-infra sub2api codex-pool plan|sync|validate", output: "json", @@ -56,9 +68,10 @@ export function codexPoolHelp(): unknown { route: g14K3sRoute, namespace, serviceDns, - poolGroupName, - poolApiKeySecretName, - poolApiKeySecretKey, + configPath: codexPoolConfigPath, + poolGroupName: pool.groupName, + poolApiKeySecretName: pool.apiKeySecretName, + poolApiKeySecretKey: pool.apiKeySecretKey, secretValuesPrinted: false, }, }; @@ -97,6 +110,7 @@ function validateOptions(args: string[], booleanOptions: Set): void { } function codexPoolPlan(): Record { + const pool = readCodexPoolConfig(); const profiles = collectCodexProfiles(); const ok = profiles.length > 0 && profiles.every((profile) => profile.ok); return { @@ -109,11 +123,15 @@ function codexPoolPlan(): Record { valuesPrinted: false, }, target: poolTarget(), + config: { + path: codexPoolConfigPath, + pool, + }, profiles: profiles.map(redactProfile), decision: { accountType: "openai/apikey", - grouping: `All discovered Codex profiles are bound to one Sub2API group named ${poolGroupName}.`, - unifiedApiKey: `The client-facing API_KEY is controlled by k3s Secret ${namespace}/${poolApiKeySecretName}.${poolApiKeySecretKey}.`, + grouping: `All discovered Codex profiles are bound 
to one Sub2API group named ${pool.groupName}.`, + unifiedApiKey: `The client-facing API_KEY is controlled by k3s Secret ${namespace}/${pool.apiKeySecretName}.${pool.apiKeySecretKey}.`, idempotency: "sync reuses the group, account names, and k3s Secret when they already exist; credentials are updated from the current local Codex files.", configPolicy: "UniDesk-owned durable configuration remains YAML-first; local ~/.codex files and runtime Secrets are not committed.", }, @@ -124,6 +142,7 @@ function codexPoolPlan(): Record { } async function codexPoolSync(config: UniDeskConfig, options: SyncOptions): Promise> { + const pool = readCodexPoolConfig(); const profiles = collectCodexProfiles(); const planOk = profiles.length > 0 && profiles.every((profile) => profile.ok); if (!options.confirm || !planOk) { @@ -139,10 +158,11 @@ async function codexPoolSync(config: UniDeskConfig, options: SyncOptions): Promi const payload = { pool: { - groupName: poolGroupName, - apiKeyName: poolApiKeyName, - apiKeySecretName: poolApiKeySecretName, - apiKeySecretKey: poolApiKeySecretKey, + groupName: pool.groupName, + apiKeyName: pool.apiKeyName, + apiKeySecretName: pool.apiKeySecretName, + apiKeySecretKey: pool.apiKeySecretKey, + minOwnerBalanceUsd: pool.minOwnerBalanceUsd, }, profiles: profiles.map((profile) => ({ profile: profile.profile, @@ -158,7 +178,7 @@ async function codexPoolSync(config: UniDeskConfig, options: SyncOptions): Promi apiKeyFingerprint: fingerprint(profile.apiKey ?? 
/**
 * Loads the Codex pool settings from the YAML file at `codexPoolConfigPath`.
 *
 * When the file does not exist the built-in defaults are returned unchanged. When it
 * exists, each key under `pool` overrides its default; a key that is missing, blank, or
 * not of the expected type falls back to the default.
 *
 * Every string setting is validated before it is returned because the values are later
 * interpolated into the generated remote Python script as quoted literals.
 *
 * @returns The validated pool configuration.
 * @throws Error when the document or its `pool` entry is not a YAML object, or when any
 *   setting fails validation.
 */
function readCodexPoolConfig(): CodexPoolConfig {
  const defaults: CodexPoolConfig = {
    groupName: defaultPoolGroupName,
    apiKeyName: defaultPoolApiKeyName,
    apiKeySecretName: defaultPoolApiKeySecretName,
    apiKeySecretKey: defaultPoolApiKeySecretKey,
    minOwnerBalanceUsd: defaultMinOwnerBalanceUsd,
  };
  if (!existsSync(codexPoolConfigPath)) return defaults;

  const parsed: unknown = Bun.YAML.parse(readFileSync(codexPoolConfigPath, "utf8"));
  if (!isRecord(parsed)) throw new Error(`${codexPoolConfigPath} must contain a YAML object`);
  const pool = parsed.pool;
  if (!isRecord(pool)) throw new Error(`${codexPoolConfigPath}.pool must be a YAML object`);

  const config: CodexPoolConfig = {
    groupName: stringValue(pool.groupName) ?? defaults.groupName,
    apiKeyName: stringValue(pool.apiKeyName) ?? defaults.apiKeyName,
    apiKeySecretName: stringValue(pool.apiKeySecretName) ?? defaults.apiKeySecretName,
    apiKeySecretKey: stringValue(pool.apiKeySecretKey) ?? defaults.apiKeySecretKey,
    minOwnerBalanceUsd: numberValue(pool.minOwnerBalanceUsd) ?? defaults.minOwnerBalanceUsd,
  };

  validateKubernetesName(config.groupName, "pool.groupName", false);
  // apiKeyName is embedded in the remote script as POOL_API_KEY_NAME = "..."; restrict it
  // to the same character set as groupName so a quote or newline cannot alter the script.
  validateKubernetesName(config.apiKeyName, "pool.apiKeyName", false);
  validateKubernetesName(config.apiKeySecretName, "pool.apiKeySecretName", true);
  if (!/^[A-Za-z_][A-Za-z0-9_]*$/u.test(config.apiKeySecretKey)) {
    throw new Error(`${codexPoolConfigPath}.pool.apiKeySecretKey must be a valid secret key name`);
  }
  if (config.minOwnerBalanceUsd <= 0) {
    throw new Error(`${codexPoolConfigPath}.pool.minOwnerBalanceUsd must be > 0`);
  }
  return config;
}
/**
 * Type guard for plain key/value objects.
 *
 * @param value - Any value, typically the result of parsing YAML or JSON.
 * @returns `true` when `value` is a non-null object that is not an array, which lets
 *   callers read its properties as `unknown`.
 */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}
def ensure_pool_owner_balance(token, user_id):
    """Make sure the pool API key owner's balance is at least MIN_OWNER_BALANCE_USD.

    Reads the owner through get_admin_user. When the reported balance is below the
    minimum, the balance is set to the minimum through the admin balance endpoint.

    Args:
        token: admin bearer token used for the admin API calls.
        user_id: id of the user that owns the pool API key.

    Returns:
        dict with "action" ("kept-existing" or "set"), "userId", "balanceBefore",
        "balanceAfter" and "minimumBalanceUsd".

    Raises:
        RuntimeError: when user_id is None, so no request is made for a
            non-existent "/users/None" resource.
    """
    if user_id is None:
        raise RuntimeError("API key owner id is missing; cannot ensure owner balance")
    user = get_admin_user(token, user_id)
    # A missing or null balance is treated as zero.
    current = float(user.get("balance") or 0)
    if current >= MIN_OWNER_BALANCE_USD:
        return {
            "action": "kept-existing",
            "userId": user_id,
            "balanceBefore": current,
            "balanceAfter": current,
            "minimumBalanceUsd": MIN_OWNER_BALANCE_USD,
        }
    updated = ensure_success(curl_api("POST", f"/api/v1/admin/users/{user_id}/balance", bearer=token, payload={
        "balance": MIN_OWNER_BALANCE_USD,
        "operation": "set",
        "notes": "UniDesk Sub2API Codex pool internal API key bootstrap balance.",
    }), "set API key owner balance")
    # Fall back to the requested minimum when the response carries no usable balance.
    after = float(updated.get("balance") or MIN_OWNER_BALANCE_USD) if isinstance(updated, dict) else MIN_OWNER_BALANCE_USD
    return {
        "action": "set",
        "userId": user_id,
        "balanceBefore": current,
        "balanceAfter": after,
        "minimumBalanceUsd": MIN_OWNER_BALANCE_USD,
    }
api_key_result["userId"], "keyPreview": api_key_preview(api_key), "valuesPrinted": False, }, + "ownerBalance": owner_balance, "validation": {"gatewayModels": gateway}, } @@ -778,6 +879,11 @@ def run_validate(): api_key = decode_secret_value(POOL_API_KEY_SECRET_NAME, POOL_API_KEY_SECRET_KEY) if not api_key: raise RuntimeError(f"{POOL_API_KEY_SECRET_NAME}.{POOL_API_KEY_SECRET_KEY} missing") + admin_email, token = login() + key_item = next((item for item in list_user_keys(token) if item.get("key") == api_key), None) + owner_balance = None + if key_item is not None and key_item.get("user_id") is not None: + owner_balance = ensure_pool_owner_balance(token, key_item["user_id"]) gateway = validate_gateway(api_key) return { "ok": gateway["ok"] is True, @@ -785,11 +891,15 @@ def run_validate(): "namespace": NAMESPACE, "serviceDns": SERVICE_DNS, "appPod": APP_POD, + "admin": {"email": admin_email, "tokenPrinted": False}, "apiKey": { "secret": f"{NAMESPACE}/{POOL_API_KEY_SECRET_NAME}.{POOL_API_KEY_SECRET_KEY}", + "sub2apiId": key_item.get("id") if isinstance(key_item, dict) else None, + "userId": key_item.get("user_id") if isinstance(key_item, dict) else None, "keyPreview": api_key_preview(api_key), "valuesPrinted": False, }, + "ownerBalance": owner_balance, "validation": {"gatewayModels": gateway}, }