feat: add platform infra codex pool config

This commit is contained in:
Codex
2026-06-09 03:04:44 +00:00
parent 97f19d462f
commit c586918dba
4 changed files with 693 additions and 31 deletions
@@ -0,0 +1,6 @@
pool:
groupName: unidesk-codex-pool
apiKeyName: unidesk-codex-pool-api-key
apiKeySecretName: sub2api-codex-pool-api-key
apiKeySecretKey: API_KEY
minOwnerBalanceUsd: 1000
+97
View File
@@ -0,0 +1,97 @@
const sourceText = await Bun.file(new URL("./src/gc-remote.ts", import.meta.url)).text();
function assertCondition(condition: unknown, message: string, detail: unknown = {}): void {
if (!condition) throw new Error(`${message}: ${JSON.stringify(detail)}`);
}
function functionBody(name: string): string {
const marker = `def ${name}(`;
const start = sourceText.indexOf(marker);
if (start < 0) return "";
const next = sourceText.indexOf("\ndef ", start + marker.length);
return sourceText.slice(start, next < 0 ? sourceText.length : next);
}
assertCondition(
sourceText.includes('if (subaction === "snapshot" || subaction === "growth")')
&& sourceText.includes('if (subaction === "trend")')
&& sourceText.includes('supportedActions: ["plan", "snapshot", "trend", "run", "status"]'),
"gc remote must expose snapshot/growth and trend actions",
);
assertCondition(
sourceText.includes("historyLimit: number")
&& sourceText.includes("saveSnapshot: boolean")
&& sourceText.includes("--history-limit")
&& sourceText.includes("--no-save"),
"gc remote snapshot must expose bounded history and no-save options",
);
const snapshotBody = functionBody("collect_growth_snapshot");
assertCondition(
snapshotBody.includes('"action": "gc remote snapshot"')
&& snapshotBody.includes('"diagnosticStateMutation"')
&& snapshotBody.includes("disk_source_snapshot()")
&& snapshotBody.includes("ci_storage_snapshot()")
&& snapshotBody.includes("registry_growth_snapshot()")
&& snapshotBody.includes("containerd_breakdown_snapshot()"),
"growth snapshot must include disk sources, CI PVC ownership, registry, containerd and diagnostic-state disclosure",
snapshotBody.slice(0, 1200),
);
const storageBody = functionBody("ci_storage_snapshot");
assertCondition(
storageBody.includes('"hwlab"')
&& storageBody.includes('"agentrun"')
&& storageBody.includes('"byOwnerGroup"')
&& storageBody.includes("hwlab g14 control-plane cleanup-runs")
&& storageBody.includes("agentrun v01 control-plane cleanup-runs")
&& storageBody.includes("hostPath")
&& storageBody.includes("activeMountPods")
&& storageBody.includes("estimatedBytes")
&& storageBody.includes('phase in set(["Succeeded", "Failed"])'),
"CI storage snapshot must keep HWLAB and AgentRun owner handoff plus reclaim visibility",
storageBody.slice(0, 1600),
);
const registryBody = functionBody("registry_growth_snapshot");
assertCondition(
registryBody.includes('"dryRun": "daily or before/after every v0.2 CI/CD burst"')
&& registryBody.includes('"maintenanceRun": "weekly, or when root >=80%, or when registry growth exceeds the agreed daily threshold"')
&& registryBody.includes("plan_registry_retention()")
&& registryBody.includes("protected tags")
&& registryBody.includes("newest N tags per repo"),
"registry growth snapshot must disclose dry-run cadence, maintenance trigger and retention protections",
registryBody,
);
const containerdBody = functionBody("containerd_breakdown_snapshot");
assertCondition(
containerdBody.includes('"state": "observation-only"')
&& containerdBody.includes('"cleanupSupported": False')
&& containerdBody.includes("reference-safe image/content classifier"),
"containerd section must stay observation-only until a safe classifier exists",
containerdBody,
);
const trendBody = functionBody("growth_trend_payload");
assertCondition(
trendBody.includes('"latestDelta"')
&& trendBody.includes('"windowDelta"')
&& trendBody.includes('"short-window-rate-noisy"')
&& trendBody.includes('"topGrowingBytes"')
&& trendBody.includes('"registryCounters"'),
"growth trend must expose latest/window delta and registry counters",
trendBody,
);
console.log(JSON.stringify({
ok: true,
checks: [
"gc remote exposes snapshot/growth and trend actions",
"snapshot history is bounded and can be disabled with --no-save",
"snapshot includes source sizes, owner-aware CI PVCs, registry cadence and containerd observation",
"HWLAB and AgentRun retention handoff commands are explicit",
"trend includes latest/window deltas and registry counters",
],
}));
+452 -3
View File
@@ -25,6 +25,8 @@ interface RemoteGcOptions {
limit: number;
resultLimit: number;
full: boolean;
historyLimit: number;
saveSnapshot: boolean;
}
const DEFAULT_REMOTE_OPTIONS: RemoteGcOptions = {
@@ -47,6 +49,8 @@ const DEFAULT_REMOTE_OPTIONS: RemoteGcOptions = {
limit: 50,
resultLimit: 50,
full: false,
historyLimit: 12,
saveSnapshot: true,
};
export async function runRemoteGcCommand(config: UniDeskConfig, providerId: string | undefined, action: string | undefined, args: string[]): Promise<unknown> {
@@ -54,12 +58,14 @@ export async function runRemoteGcCommand(config: UniDeskConfig, providerId: stri
return {
ok: false,
error: "gc-remote-provider-required",
usage: "bun scripts/cli.ts gc remote <providerId> plan|run|status [--confirm]",
usage: "bun scripts/cli.ts gc remote <providerId> plan|snapshot|trend|run|status [--confirm]",
};
}
const subaction = action ?? "plan";
const options = parseRemoteGcOptions(args);
if (subaction === "plan" || subaction === "dry-run") return await runRemoteGc(config, providerId, "plan", options);
if (subaction === "snapshot" || subaction === "growth") return await runRemoteGc(config, providerId, "snapshot", options);
if (subaction === "trend") return await runRemoteGc(config, providerId, "trend", options);
if (subaction === "status") return await runRemoteGc(config, providerId, "status", options);
if (subaction === "run") {
if (!options.confirm) {
@@ -79,7 +85,7 @@ export async function runRemoteGcCommand(config: UniDeskConfig, providerId: stri
ok: false,
error: "unsupported-gc-remote-action",
action: subaction,
supportedActions: ["plan", "run", "status"],
supportedActions: ["plan", "snapshot", "trend", "run", "status"],
};
}
@@ -126,6 +132,10 @@ function parseRemoteGcOptions(args: string[]): RemoteGcOptions {
const value = parseNonNegativeNumber(arg, args[++index]);
if (!Number.isInteger(value) || value <= 0) throw new Error("--result-limit must be a positive integer");
options.resultLimit = Math.min(value, 5000);
} else if (arg === "--history-limit") {
const value = parseNonNegativeNumber(arg, args[++index]);
if (!Number.isInteger(value) || value <= 1) throw new Error("--history-limit must be an integer greater than 1");
options.historyLimit = Math.min(value, 200);
} else if (arg === "--no-journal") {
options.journal = false;
} else if (arg === "--no-docker-logs") {
@@ -145,6 +155,8 @@ function parseRemoteGcOptions(args: string[]): RemoteGcOptions {
options.registryGcOnly = true;
} else if (arg === "--full" || arg === "--raw") {
options.full = true;
} else if (arg === "--no-save" || arg === "--no-snapshot-save") {
options.saveSnapshot = false;
} else {
throw new Error(`unknown gc remote option: ${arg}`);
}
@@ -178,7 +190,7 @@ function parseSize(raw: string): number | null {
return Number.isFinite(bytes) ? bytes : null;
}
async function runRemoteGc(config: UniDeskConfig, providerId: string, action: "plan" | "run" | "status", options: RemoteGcOptions): Promise<unknown> {
async function runRemoteGc(config: UniDeskConfig, providerId: string, action: "plan" | "snapshot" | "trend" | "run" | "status", options: RemoteGcOptions): Promise<unknown> {
const scriptConfig = Buffer.from(JSON.stringify({ providerId, action, options }), "utf8").toString("base64");
const result = await runSshCommandCapture(config, providerId, ["py"], remoteGcPython(scriptConfig));
if (result.exitCode !== 0) {
@@ -210,6 +222,7 @@ async function runRemoteGc(config: UniDeskConfig, providerId: string, action: "p
function remoteGcPython(configBase64: string): string {
return String.raw`
import base64
import calendar
import json
import os
import re
@@ -281,6 +294,7 @@ REGISTRY_PROTECTED_TAGS = set([
EXPECTED_G14_NODE = "ubuntu-rog-zephyrus-g14-ga401iv-ga401iv"
REMOTE_GC_JOB_DIR = "/tmp/unidesk-gc-remote/jobs"
REMOTE_GROWTH_SNAPSHOT_DIR = "/tmp/unidesk-gc-remote/growth-snapshots"
REMOTE_STDOUT_JSON_LIMIT = 256 * 1024
def now_iso():
@@ -496,6 +510,391 @@ def du_size(path, timeout=20):
except Exception:
return path_size(path)
def safe_int(value, default=0):
try:
if value is None:
return default
return int(value)
except Exception:
return default
def iso_to_epoch(value):
try:
return calendar.timegm(time.strptime(str(value), "%Y-%m-%dT%H:%M:%SZ"))
except Exception:
return None
def growth_snapshot_path():
os.makedirs(REMOTE_GROWTH_SNAPSHOT_DIR, exist_ok=True)
provider_slug = re.sub(r"[^A-Za-z0-9._-]+", "-", PROVIDER_ID.lower()).strip("-") or "provider"
return os.path.join(REMOTE_GROWTH_SNAPSHOT_DIR, "%s.jsonl" % provider_slug)
def read_growth_snapshots(limit=None):
path = growth_snapshot_path()
if not os.path.isfile(path):
return []
try:
with open(path, "r", encoding="utf-8") as handle:
lines = handle.readlines()
except OSError:
return []
rows = []
for line in lines[-max(1, int(limit or 200)):]:
line = line.strip()
if not line:
continue
try:
item = json.loads(line)
except Exception:
continue
if isinstance(item, dict):
rows.append(item)
return rows
def append_growth_snapshot(snapshot):
path = growth_snapshot_path()
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "a", encoding="utf-8") as handle:
handle.write(json.dumps(snapshot, ensure_ascii=False, sort_keys=True))
handle.write("\n")
return path
def source_size_item(source_id, label, path, cleanup_owner, timeout=20):
size = du_size(path, timeout) if os.path.exists(path) else None
return {
"id": source_id,
"label": label,
"path": path,
"exists": size is not None,
"sizeBytes": size,
"sizeHuman": fmt_bytes(size or 0),
"cleanupOwner": cleanup_owner,
}
def disk_source_snapshot():
sources = [
source_size_item("hwlab-host-data", "HWLAB host data", "/var/lib/hwlab", "hwlab-registry-retention", 60),
source_size_item("hwlab-registry", "HWLAB registry", REGISTRY_ROOT, "gc-remote-hwlab-registry", 60),
source_size_item("k3s-storage", "k3s local-path storage", "/var/lib/rancher/k3s/storage", "owner-aware-pvc-retention", 45),
source_size_item("k3s-containerd", "k3s containerd", "/var/lib/rancher/k3s/agent/containerd", "observation-only", 45),
source_size_item("host-containerd", "host containerd", "/var/lib/containerd", "observation-only", 30),
source_size_item("kubelet", "kubelet state", "/var/lib/kubelet", "protected-runtime", 20),
source_size_item("var-log", "host logs", "/var/log", "gc-remote-logs-journald", 20),
source_size_item("tmp", "allowlisted tmp and other tmp", "/tmp", "gc-remote-tmp-allowlist", 20),
source_size_item("apt-cache", "apt archives", "/var/cache/apt/archives", "gc-remote-apt-cache", 10),
source_size_item("hwlab-v02-source", "HWLAB v0.2 source workspace", "/root/hwlab-v02", "protected-source", 20),
source_size_item("agentrun-source", "AgentRun source workspace", "/root/agentrun-v01", "protected-source", 20),
]
return [item for item in sources if item.get("exists")]
def containerd_breakdown_snapshot():
rows = [
source_size_item("k3s-containerd-content", "k3s containerd content store", "/var/lib/rancher/k3s/agent/containerd/io.containerd.content.v1.content", "observation-only", 30),
source_size_item("k3s-containerd-overlayfs", "k3s containerd overlay snapshots", "/var/lib/rancher/k3s/agent/containerd/io.containerd.snapshotter.v1.overlayfs", "observation-only", 30),
source_size_item("host-containerd-content", "host containerd content store", "/var/lib/containerd/io.containerd.content.v1.content", "observation-only", 20),
source_size_item("host-containerd-overlayfs", "host containerd overlay snapshots", "/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs", "observation-only", 20),
]
rows = [item for item in rows if item.get("exists")]
return {
"state": "observation-only",
"cleanupSupported": False,
"reason": "containerd cleanup still requires a reference-safe image/content classifier; this snapshot only classifies growth sources",
"breakdown": rows,
}
def pv_host_path(pv):
spec = (pv or {}).get("spec") or {}
host_path = (spec.get("hostPath") or {}).get("path")
if isinstance(host_path, str) and host_path:
return host_path
local_path = (spec.get("local") or {}).get("path")
if isinstance(local_path, str) and local_path:
return local_path
return None
def pvc_owner_group(namespace, owner):
owner = str(owner or "")
if namespace == "agentrun-ci":
return "agentrun"
if namespace == "hwlab-ci":
if owner.startswith("agentrun-"):
return "agentrun"
return "hwlab"
if namespace.startswith("hwlab-"):
return "hwlab-runtime"
return "other"
def ci_storage_snapshot():
pv_data = kubectl_json(["get", "pv"], 30) or {}
pvc_data = kubectl_json(["get", "pvc", "-A"], 30) or {}
pod_data = kubectl_json(["get", "pod", "-A"], 30) or {}
pvs = {}
for pv in pv_data.get("items") or []:
meta = pv.get("metadata") or {}
name = meta.get("name")
if name:
pvs[name] = pv
mounts = {}
for pod in pod_data.get("items") or []:
meta = pod.get("metadata") or {}
ns = str(meta.get("namespace") or "")
pod_name = str(meta.get("name") or "")
phase = str(((pod.get("status") or {}).get("phase")) or "")
if phase in set(["Succeeded", "Failed"]):
continue
spec = pod.get("spec") or {}
for vol in spec.get("volumes") or []:
claim = (vol.get("persistentVolumeClaim") or {}).get("claimName")
if claim:
mounts.setdefault((ns, claim), []).append(pod_name)
rows = []
for pvc in pvc_data.get("items") or []:
meta = pvc.get("metadata") or {}
spec = pvc.get("spec") or {}
status = pvc.get("status") or {}
ns = str(meta.get("namespace") or "")
name = str(meta.get("name") or "")
if ns not in set(["hwlab-ci", "agentrun-ci"]):
continue
volume = str(spec.get("volumeName") or "")
pv = pvs.get(volume) or {}
pv_spec = pv.get("spec") or {}
owner_refs = meta.get("ownerReferences") or []
owner_kind = None
owner_name = None
if owner_refs:
owner_kind = owner_refs[0].get("kind")
owner_name = owner_refs[0].get("name")
host_path = pv_host_path(pv)
active = sorted(mounts.get((ns, name), []))
estimated = du_size(host_path, 8) if host_path else None
rows.append({
"namespace": ns,
"pvc": name,
"volume": volume or None,
"phase": status.get("phase"),
"ownerKind": owner_kind,
"owner": owner_name,
"ownerGroup": pvc_owner_group(ns, owner_name),
"storageClass": spec.get("storageClassName") or pv_spec.get("storageClassName"),
"reclaimPolicy": pv_spec.get("persistentVolumeReclaimPolicy"),
"hostPath": host_path,
"activeMountPods": active,
"estimatedBytes": estimated,
"estimatedHuman": fmt_bytes(estimated or 0),
})
rows.sort(key=lambda item: safe_int(item.get("estimatedBytes")), reverse=True)
by_namespace = {}
by_owner_group = {}
for row in rows:
for bucket, key in [(by_namespace, row.get("namespace") or "unknown"), (by_owner_group, row.get("ownerGroup") or "unknown")]:
current = bucket.setdefault(key, {"count": 0, "estimatedBytes": 0, "activeMountCount": 0})
current["count"] += 1
current["estimatedBytes"] += safe_int(row.get("estimatedBytes"))
current["activeMountCount"] += len(row.get("activeMountPods") or [])
current["estimatedHuman"] = fmt_bytes(current["estimatedBytes"])
return {
"scope": "hwlab-ci and agentrun-ci PVCs only",
"pvcCount": len(rows),
"estimatedBytes": sum(safe_int(row.get("estimatedBytes")) for row in rows),
"estimatedHuman": fmt_bytes(sum(safe_int(row.get("estimatedBytes")) for row in rows)),
"byNamespace": by_namespace,
"byOwnerGroup": by_owner_group,
"topPvcs": rows[:int(OPTIONS.get("limit") or 50)],
"handoff": {
"hwlab": {
"dryRun": "bun scripts/cli.ts hwlab g14 control-plane cleanup-runs --lane v02 --min-age-minutes 30 --limit 200 --dry-run",
"releasedPvs": "bun scripts/cli.ts hwlab g14 control-plane cleanup-released-pvs --lane all --limit 200 --dry-run",
},
"agentrun": {
"dryRun": "bun scripts/cli.ts agentrun v01 control-plane cleanup-runs --min-age-minutes 30 --limit 200 --dry-run",
"releasedPvs": "bun scripts/cli.ts agentrun v01 control-plane cleanup-released-pvs --limit 200 --dry-run",
},
},
}
def registry_growth_snapshot():
summary = {
"path": REGISTRY_ROOT,
"sizeBytes": du_size(REGISTRY_ROOT, 60) or 0,
}
summary["sizeHuman"] = fmt_bytes(summary["sizeBytes"])
if OPTIONS.get("hwlabRegistry", False):
plan = plan_registry_retention()
retention = dict(plan.get("summary") or {})
for key in ["registrySizeBytes", "estimatedReclaimBytes"]:
if key in retention:
retention[key.replace("Bytes", "Human")] = fmt_bytes(retention.get(key) or 0)
summary["retentionPlan"] = retention
else:
summary["retentionPlan"] = {
"skipped": True,
"reason": "rerun snapshot with --include-hwlab-registry to compute tag/revision retention counters",
}
summary["cadence"] = {
"dryRun": "daily or before/after every v0.2 CI/CD burst",
"maintenanceRun": "weekly, or when root >=80%, or when registry growth exceeds the agreed daily threshold",
"planCommand": "bun scripts/cli.ts gc remote %s plan --target-use-percent 70 --include-hwlab-registry --limit 50" % PROVIDER_ID,
"snapshotCommand": "bun scripts/cli.ts gc remote %s snapshot --include-hwlab-registry --history-limit 12" % PROVIDER_ID,
"runCommand": "bun scripts/cli.ts gc remote %s run --confirm --include-hwlab-registry --target-use-percent 70 --limit 50" % PROVIDER_ID,
"defaultRetention": {
"keepPerRepo": int(OPTIONS.get("registryKeepPerRepo") or 20),
"minAgeHours": float(OPTIONS.get("registryMinAgeHours") or 48),
"protects": ["current workload refs", "digest closure", "protected tags", "recent tags", "newest N tags per repo"],
},
}
return summary
def growth_watermark_policy(root_disk):
use_percent = root_disk.get("usePercent") if isinstance(root_disk, dict) else None
if use_percent is None:
state = "unknown"
action = "collect-snapshot"
elif use_percent < 75:
state = "healthy"
action = "observe-trend"
elif use_percent < 80:
state = "watch"
action = "run-dry-run-plan"
elif use_percent < 85:
state = "maintenance"
action = "schedule-owner-aware-retention"
else:
state = "emergency"
action = "restore-runtime-then-file-evidence"
return {
"state": state,
"recommendedAction": action,
"watermarks": [
{"range": "<75%", "action": "trend only"},
{"range": "75%-80%", "action": "run dry-run plan and identify source"},
{"range": "80%-85%", "action": "small owner-aware retention run"},
{"range": ">=85%", "action": "runtime recovery first, then root-cause growth source"},
],
"growthThresholdPolicy": "If bytes/day remains high for consecutive snapshots, act before 80%; exact threshold should be set from the first week of saved snapshots.",
}
def snapshot_metric_map(snapshot):
metrics = {}
root = snapshot.get("rootDisk") or {}
if isinstance(root, dict) and root.get("usedBytes") is not None:
metrics["root.usedBytes"] = {"value": safe_int(root.get("usedBytes")), "unit": "bytes", "label": "root used bytes"}
for item in snapshot.get("sources") or []:
if not isinstance(item, dict) or item.get("sizeBytes") is None:
continue
key = "source.%s.sizeBytes" % item.get("id")
metrics[key] = {"value": safe_int(item.get("sizeBytes")), "unit": "bytes", "label": item.get("label") or item.get("id")}
storage = ((snapshot.get("ciStorage") or {}).get("byOwnerGroup") or {})
for owner, value in storage.items():
metrics["ciStorage.%s.estimatedBytes" % owner] = {"value": safe_int((value or {}).get("estimatedBytes")), "unit": "bytes", "label": "CI storage %s" % owner}
registry = snapshot.get("registry") or {}
retention = registry.get("retentionPlan") or {}
for key in ["totalTags", "totalRevisions", "deleteTags", "deleteRevisions", "estimatedReclaimBytes"]:
if key in retention and retention.get(key) is not None:
unit = "bytes" if key.endswith("Bytes") else "count"
metrics["registry.%s" % key] = {"value": safe_int(retention.get(key)), "unit": unit, "label": "registry %s" % key}
return metrics
def delta_metric_rows(before, after):
before_metrics = snapshot_metric_map(before)
after_metrics = snapshot_metric_map(after)
before_ts = iso_to_epoch(before.get("observedAt"))
after_ts = iso_to_epoch(after.get("observedAt"))
seconds = (after_ts - before_ts) if before_ts is not None and after_ts is not None else None
rows = []
for key in sorted(set(before_metrics.keys()) | set(after_metrics.keys())):
old = before_metrics.get(key, {"value": 0, "unit": (after_metrics.get(key) or {}).get("unit"), "label": key})
new = after_metrics.get(key, {"value": 0, "unit": old.get("unit"), "label": old.get("label")})
delta = safe_int(new.get("value")) - safe_int(old.get("value"))
row = {
"key": key,
"label": new.get("label") or old.get("label") or key,
"unit": new.get("unit") or old.get("unit"),
"before": old.get("value"),
"after": new.get("value"),
"delta": delta,
}
if row["unit"] == "bytes":
row["beforeHuman"] = fmt_bytes(row["before"] or 0)
row["afterHuman"] = fmt_bytes(row["after"] or 0)
row["deltaHuman"] = ("-" if delta < 0 else "") + fmt_bytes(abs(delta))
if seconds and seconds > 0:
per_day = int(delta * 86400 / seconds)
row["perDayBytes"] = per_day
row["perDayHuman"] = ("-" if per_day < 0 else "") + fmt_bytes(abs(per_day)) + "/day"
rows.append(row)
rows.sort(key=lambda item: safe_int(item.get("delta")), reverse=True)
return {"durationSeconds": seconds, "metrics": rows}
def growth_trend_payload(points):
points = [point for point in points if isinstance(point, dict)]
if len(points) < 2:
return {
"pointCount": len(points),
"state": "insufficient-history",
"message": "Run snapshot at least twice to compute deltas.",
}
latest_delta = delta_metric_rows(points[-2], points[-1])
window_delta = delta_metric_rows(points[0], points[-1])
def rate_warning(delta):
seconds = delta.get("durationSeconds")
if seconds is not None and seconds < 3600:
return {
"code": "short-window-rate-noisy",
"message": "Per-day rates from windows shorter than 1 hour are directional only; use daily snapshots for governance decisions.",
"durationSeconds": seconds,
}
return None
return {
"pointCount": len(points),
"oldestAt": points[0].get("observedAt"),
"latestAt": points[-1].get("observedAt"),
"latestDelta": {
"durationSeconds": latest_delta.get("durationSeconds"),
"rateWarning": rate_warning(latest_delta),
"topGrowingBytes": [row for row in latest_delta.get("metrics", []) if row.get("unit") == "bytes" and safe_int(row.get("delta")) > 0][:10],
"topShrinkingBytes": [row for row in reversed(latest_delta.get("metrics", [])) if row.get("unit") == "bytes" and safe_int(row.get("delta")) < 0][:10],
"registryCounters": [row for row in latest_delta.get("metrics", []) if str(row.get("key", "")).startswith("registry.") and row.get("unit") == "count"],
},
"windowDelta": {
"durationSeconds": window_delta.get("durationSeconds"),
"rateWarning": rate_warning(window_delta),
"topGrowingBytes": [row for row in window_delta.get("metrics", []) if row.get("unit") == "bytes" and safe_int(row.get("delta")) > 0][:10],
"topShrinkingBytes": [row for row in reversed(window_delta.get("metrics", [])) if row.get("unit") == "bytes" and safe_int(row.get("delta")) < 0][:10],
"registryCounters": [row for row in window_delta.get("metrics", []) if str(row.get("key", "")).startswith("registry.") and row.get("unit") == "count"],
},
}
def collect_growth_snapshot(observed_at, preflight):
root_disk = df_snapshot()
sources = disk_source_snapshot()
ci_storage = ci_storage_snapshot()
registry = registry_growth_snapshot()
containerd = containerd_breakdown_snapshot()
return {
"ok": True,
"action": "gc remote snapshot",
"providerId": PROVIDER_ID,
"dryRun": True,
"mutation": False,
"diagnosticStateMutation": bool(OPTIONS.get("saveSnapshot", True)),
"observedAt": observed_at,
"rootDisk": root_disk,
"clusterPreflight": preflight,
"sources": sources,
"registry": registry,
"ciStorage": ci_storage,
"containerd": containerd,
"policy": growth_watermark_policy(root_disk or {}),
"commands": {
"snapshot": "bun scripts/cli.ts gc remote %s snapshot --include-hwlab-registry --history-limit %s" % (PROVIDER_ID, int(OPTIONS.get("historyLimit") or 12)),
"trend": "bun scripts/cli.ts gc remote %s trend --history-limit %s" % (PROVIDER_ID, int(OPTIONS.get("historyLimit") or 12)),
"registryPlan": "bun scripts/cli.ts gc remote %s plan --target-use-percent 70 --include-hwlab-registry --limit 50" % PROVIDER_ID,
"hwlabCiRetention": "bun scripts/cli.ts hwlab g14 control-plane cleanup-runs --lane v02 --min-age-minutes 30 --limit 200 --dry-run",
"agentrunRetention": "bun scripts/cli.ts agentrun v01 control-plane cleanup-runs --min-age-minutes 30 --limit 200 --dry-run",
},
}
def allocated_file_size(path):
try:
stat = os.stat(path)
@@ -1714,6 +2113,56 @@ def plan_payload(observed_at, preflight, protected, candidates, visible):
def main():
observed_at = now_iso()
preflight = cluster_preflight()
if ACTION == "trend":
history_limit = int(OPTIONS.get("historyLimit") or 12)
history = read_growth_snapshots(history_limit)
emit_json({
"ok": True,
"action": "gc remote trend",
"providerId": PROVIDER_ID,
"dryRun": True,
"mutation": False,
"observedAt": observed_at,
"statePath": growth_snapshot_path(),
"historyLimit": history_limit,
"trend": growth_trend_payload(history),
"points": history if bool(OPTIONS.get("full")) else history[-min(len(history), 3):],
"returnedPointCount": min(len(history), 3) if not bool(OPTIONS.get("full")) else len(history),
"totalPointCount": len(history),
"next": {
"snapshot": "bun scripts/cli.ts gc remote %s snapshot --include-hwlab-registry --history-limit %s" % (PROVIDER_ID, history_limit),
},
}, persist_large=True)
return 0
if ACTION == "snapshot":
history_limit = int(OPTIONS.get("historyLimit") or 12)
snapshot = collect_growth_snapshot(observed_at, preflight)
state_path = growth_snapshot_path()
if bool(OPTIONS.get("saveSnapshot", True)):
state_path = append_growth_snapshot(snapshot)
history = read_growth_snapshots(history_limit)
if not bool(OPTIONS.get("saveSnapshot", True)):
history = history + [snapshot]
snapshot.update({
"statePath": state_path,
"historyLimit": history_limit,
"saved": bool(OPTIONS.get("saveSnapshot", True)),
"trend": growth_trend_payload(history[-history_limit:]),
"history": {
"totalPointCount": len(read_growth_snapshots(1000000)) if bool(OPTIONS.get("saveSnapshot", True)) else len(history),
"returnedPointCount": len(history[-min(len(history), 3):]),
"recentPoints": history[-min(len(history), 3):] if bool(OPTIONS.get("full")) else [
{
"observedAt": item.get("observedAt"),
"rootDisk": item.get("rootDisk"),
"sourceCount": len(item.get("sources") or []),
}
for item in history[-min(len(history), 3):]
],
},
})
emit_json(snapshot, persist_large=True)
return 0
protected = collect_protected()
candidates = collect_candidates(observed_at)
visible = visible_items(candidates)
+138 -28
View File
@@ -3,6 +3,7 @@ import { existsSync, readFileSync, readdirSync } from "node:fs";
import { homedir } from "node:os";
import { join } from "node:path";
import type { UniDeskConfig } from "./config";
import { rootPath } from "./config";
import { runSshCommandCapture, type SshCaptureResult } from "./ssh";
const g14K3sRoute = "G14:k3s";
@@ -11,10 +12,12 @@ const serviceName = "sub2api";
const serviceDns = `${serviceName}.${namespace}.svc.cluster.local:8080`;
const fieldManager = "unidesk-platform-infra";
const appSecretName = "sub2api-secrets";
const poolGroupName = "unidesk-codex-pool";
const poolApiKeyName = "unidesk-codex-pool-api-key";
const poolApiKeySecretName = "sub2api-codex-pool-api-key";
const poolApiKeySecretKey = "API_KEY";
const codexPoolConfigPath = rootPath("config", "platform-infra", "sub2api-codex-pool.yaml");
const defaultPoolGroupName = "unidesk-codex-pool";
const defaultPoolApiKeyName = "unidesk-codex-pool-api-key";
const defaultPoolApiKeySecretName = "sub2api-codex-pool-api-key";
const defaultPoolApiKeySecretKey = "API_KEY";
const defaultMinOwnerBalanceUsd = 1000;
interface DisclosureOptions {
full: boolean;
@@ -42,7 +45,16 @@ interface CodexProfile {
error: string | null;
}
interface CodexPoolConfig {
groupName: string;
apiKeyName: string;
apiKeySecretName: string;
apiKeySecretKey: string;
minOwnerBalanceUsd: number;
}
export function codexPoolHelp(): unknown {
const pool = readCodexPoolConfig();
return {
command: "platform-infra sub2api codex-pool plan|sync|validate",
output: "json",
@@ -56,9 +68,10 @@ export function codexPoolHelp(): unknown {
route: g14K3sRoute,
namespace,
serviceDns,
poolGroupName,
poolApiKeySecretName,
poolApiKeySecretKey,
configPath: codexPoolConfigPath,
poolGroupName: pool.groupName,
poolApiKeySecretName: pool.apiKeySecretName,
poolApiKeySecretKey: pool.apiKeySecretKey,
secretValuesPrinted: false,
},
};
@@ -97,6 +110,7 @@ function validateOptions(args: string[], booleanOptions: Set<string>): void {
}
function codexPoolPlan(): Record<string, unknown> {
const pool = readCodexPoolConfig();
const profiles = collectCodexProfiles();
const ok = profiles.length > 0 && profiles.every((profile) => profile.ok);
return {
@@ -109,11 +123,15 @@ function codexPoolPlan(): Record<string, unknown> {
valuesPrinted: false,
},
target: poolTarget(),
config: {
path: codexPoolConfigPath,
pool,
},
profiles: profiles.map(redactProfile),
decision: {
accountType: "openai/apikey",
grouping: `All discovered Codex profiles are bound to one Sub2API group named ${poolGroupName}.`,
unifiedApiKey: `The client-facing API_KEY is controlled by k3s Secret ${namespace}/${poolApiKeySecretName}.${poolApiKeySecretKey}.`,
grouping: `All discovered Codex profiles are bound to one Sub2API group named ${pool.groupName}.`,
unifiedApiKey: `The client-facing API_KEY is controlled by k3s Secret ${namespace}/${pool.apiKeySecretName}.${pool.apiKeySecretKey}.`,
idempotency: "sync reuses the group, account names, and k3s Secret when they already exist; credentials are updated from the current local Codex files.",
configPolicy: "UniDesk-owned durable configuration remains YAML-first; local ~/.codex files and runtime Secrets are not committed.",
},
@@ -124,6 +142,7 @@ function codexPoolPlan(): Record<string, unknown> {
}
async function codexPoolSync(config: UniDeskConfig, options: SyncOptions): Promise<Record<string, unknown>> {
const pool = readCodexPoolConfig();
const profiles = collectCodexProfiles();
const planOk = profiles.length > 0 && profiles.every((profile) => profile.ok);
if (!options.confirm || !planOk) {
@@ -139,10 +158,11 @@ async function codexPoolSync(config: UniDeskConfig, options: SyncOptions): Promi
const payload = {
pool: {
groupName: poolGroupName,
apiKeyName: poolApiKeyName,
apiKeySecretName: poolApiKeySecretName,
apiKeySecretKey: poolApiKeySecretKey,
groupName: pool.groupName,
apiKeyName: pool.apiKeyName,
apiKeySecretName: pool.apiKeySecretName,
apiKeySecretKey: pool.apiKeySecretKey,
minOwnerBalanceUsd: pool.minOwnerBalanceUsd,
},
profiles: profiles.map((profile) => ({
profile: profile.profile,
@@ -158,7 +178,7 @@ async function codexPoolSync(config: UniDeskConfig, options: SyncOptions): Promi
apiKeyFingerprint: fingerprint(profile.apiKey ?? ""),
})),
};
const result = await capture(config, g14K3sRoute, ["script"], syncScript(payload));
const result = await capture(config, g14K3sRoute, ["script"], syncScript(payload, pool));
const parsed = parseJsonOutput(result.stdout);
if (options.raw) {
return {
@@ -183,7 +203,8 @@ async function codexPoolSync(config: UniDeskConfig, options: SyncOptions): Promi
}
async function codexPoolValidate(config: UniDeskConfig, options: DisclosureOptions): Promise<Record<string, unknown>> {
const result = await capture(config, g14K3sRoute, ["script"], validateScript());
const pool = readCodexPoolConfig();
const result = await capture(config, g14K3sRoute, ["script"], validateScript(pool));
const parsed = parseJsonOutput(result.stdout);
if (options.raw) {
return {
@@ -274,6 +295,35 @@ function collectCodexProfiles(): CodexProfile[] {
});
}
function readCodexPoolConfig(): CodexPoolConfig {
const defaults: CodexPoolConfig = {
groupName: defaultPoolGroupName,
apiKeyName: defaultPoolApiKeyName,
apiKeySecretName: defaultPoolApiKeySecretName,
apiKeySecretKey: defaultPoolApiKeySecretKey,
minOwnerBalanceUsd: defaultMinOwnerBalanceUsd,
};
if (!existsSync(codexPoolConfigPath)) return defaults;
const parsed = Bun.YAML.parse(readFileSync(codexPoolConfigPath, "utf8")) as unknown;
if (!isRecord(parsed)) throw new Error(`${codexPoolConfigPath} must contain a YAML object`);
const pool = parsed.pool;
if (!isRecord(pool)) throw new Error(`${codexPoolConfigPath}.pool must be a YAML object`);
const config: CodexPoolConfig = {
groupName: stringValue(pool.groupName) ?? defaults.groupName,
apiKeyName: stringValue(pool.apiKeyName) ?? defaults.apiKeyName,
apiKeySecretName: stringValue(pool.apiKeySecretName) ?? defaults.apiKeySecretName,
apiKeySecretKey: stringValue(pool.apiKeySecretKey) ?? defaults.apiKeySecretKey,
minOwnerBalanceUsd: numberValue(pool.minOwnerBalanceUsd) ?? defaults.minOwnerBalanceUsd,
};
validateKubernetesName(config.groupName, "pool.groupName", false);
validateKubernetesName(config.apiKeySecretName, "pool.apiKeySecretName", true);
if (!/^[A-Za-z_][A-Za-z0-9_]*$/u.test(config.apiKeySecretKey)) {
throw new Error(`${codexPoolConfigPath}.pool.apiKeySecretKey must be a valid secret key name`);
}
if (config.minOwnerBalanceUsd <= 0) throw new Error(`${codexPoolConfigPath}.pool.minOwnerBalanceUsd must be > 0`);
return config;
}
function readAuthAPIKey(authPath: string): { apiKey: string | null; shape: string } {
if (!existsSync(authPath)) return { apiKey: null, shape: "missing" };
const parsed = JSON.parse(readFileSync(authPath, "utf8")) as unknown;
@@ -321,15 +371,16 @@ function redactProfile(profile: CodexProfile): Record<string, unknown> {
};
}
function poolTarget(): Record<string, unknown> {
function poolTarget(pool = readCodexPoolConfig()): Record<string, unknown> {
return {
route: g14K3sRoute,
namespace,
service: serviceName,
serviceDns,
groupName: poolGroupName,
apiKeyName: poolApiKeyName,
apiKeySecret: `${namespace}/${poolApiKeySecretName}.${poolApiKeySecretKey}`,
configPath: codexPoolConfigPath,
groupName: pool.groupName,
apiKeyName: pool.apiKeyName,
apiKeySecret: `${namespace}/${pool.apiKeySecretName}.${pool.apiKeySecretKey}`,
valuesPrinted: false,
};
}
@@ -351,6 +402,20 @@ function stringValue(value: unknown): string | null {
return typeof value === "string" && value.trim().length > 0 ? value.trim() : null;
}
function numberValue(value: unknown): number | null {
if (typeof value === "number" && Number.isFinite(value)) return value;
if (typeof value === "string" && value.trim().length > 0) {
const parsed = Number(value);
return Number.isFinite(parsed) ? parsed : null;
}
return null;
}
function validateKubernetesName(value: string, key: string, strictDns: boolean): void {
const pattern = strictDns ? /^[a-z0-9]([-a-z0-9]*[a-z0-9])?$/u : /^[A-Za-z0-9]([A-Za-z0-9._-]*[A-Za-z0-9])?$/u;
if (!pattern.test(value)) throw new Error(`${codexPoolConfigPath}.${key} has an unsupported format`);
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
@@ -359,16 +424,16 @@ function fingerprint(value: string): string {
return createHash("sha256").update(value).digest("hex").slice(0, 12);
}
function syncScript(payload: unknown): string {
function syncScript(payload: unknown, pool: CodexPoolConfig): string {
const encoded = Buffer.from(JSON.stringify(payload), "utf8").toString("base64");
return remotePythonScript("sync", encoded);
return remotePythonScript("sync", encoded, pool);
}
function validateScript(): string {
return remotePythonScript("validate", "");
function validateScript(pool: CodexPoolConfig): string {
return remotePythonScript("validate", "", pool);
}
function remotePythonScript(mode: "sync" | "validate", encodedPayload: string): string {
function remotePythonScript(mode: "sync" | "validate", encodedPayload: string, pool: CodexPoolConfig): string {
return `
set -u
python3 - <<'PY'
@@ -386,10 +451,11 @@ SERVICE_NAME = "${serviceName}"
SERVICE_DNS = "${serviceDns}"
FIELD_MANAGER = "${fieldManager}"
APP_SECRET_NAME = "${appSecretName}"
POOL_GROUP_NAME = "${poolGroupName}"
POOL_API_KEY_NAME = "${poolApiKeyName}"
POOL_API_KEY_SECRET_NAME = "${poolApiKeySecretName}"
POOL_API_KEY_SECRET_KEY = "${poolApiKeySecretKey}"
POOL_GROUP_NAME = "${pool.groupName}"
POOL_API_KEY_NAME = "${pool.apiKeyName}"
POOL_API_KEY_SECRET_NAME = "${pool.apiKeySecretName}"
POOL_API_KEY_SECRET_KEY = "${pool.apiKeySecretKey}"
MIN_OWNER_BALANCE_USD = ${JSON.stringify(pool.minOwnerBalanceUsd)}
MODE = "${mode}"
PAYLOAD_B64 = "${encodedPayload}"
@@ -706,6 +772,38 @@ def ensure_sub2api_api_key(token, api_key, group_id):
"id": existing.get("id") if isinstance(existing, dict) else None,
"name": existing.get("name") if isinstance(existing, dict) else POOL_API_KEY_NAME,
"groupId": existing.get("group_id") if isinstance(existing, dict) else group_id,
"userId": existing.get("user_id") if isinstance(existing, dict) else None,
}
def get_admin_user(token, user_id):
data = ensure_success(curl_api("GET", f"/api/v1/admin/users/{user_id}", bearer=token), "get API key owner")
if not isinstance(data, dict):
raise RuntimeError("API key owner response is not an object")
return data
def ensure_pool_owner_balance(token, user_id):
user = get_admin_user(token, user_id)
current = float(user.get("balance") or 0)
if current >= MIN_OWNER_BALANCE_USD:
return {
"action": "kept-existing",
"userId": user_id,
"balanceBefore": current,
"balanceAfter": current,
"minimumBalanceUsd": MIN_OWNER_BALANCE_USD,
}
updated = ensure_success(curl_api("POST", f"/api/v1/admin/users/{user_id}/balance", bearer=token, payload={
"balance": MIN_OWNER_BALANCE_USD,
"operation": "set",
"notes": "UniDesk Sub2API Codex pool internal API key bootstrap balance.",
}), "set API key owner balance")
after = float(updated.get("balance") or MIN_OWNER_BALANCE_USD) if isinstance(updated, dict) else MIN_OWNER_BALANCE_USD
return {
"action": "set",
"userId": user_id,
"balanceBefore": current,
"balanceAfter": after,
"minimumBalanceUsd": MIN_OWNER_BALANCE_USD,
}
def validate_gateway(api_key):
@@ -744,6 +842,7 @@ def run_sync():
account_results = ensure_accounts(token, profiles, group_id)
api_key, secret_action, secret_apply_stdout = ensure_api_key_secret(group_id)
api_key_result = ensure_sub2api_api_key(token, api_key, group_id)
owner_balance = ensure_pool_owner_balance(token, api_key_result["userId"])
gateway = validate_gateway(api_key)
return {
"ok": gateway["ok"] is True,
@@ -768,9 +867,11 @@ def run_sync():
"sub2apiAction": api_key_result["action"],
"sub2apiId": api_key_result["id"],
"groupId": api_key_result["groupId"],
"userId": api_key_result["userId"],
"keyPreview": api_key_preview(api_key),
"valuesPrinted": False,
},
"ownerBalance": owner_balance,
"validation": {"gatewayModels": gateway},
}
@@ -778,6 +879,11 @@ def run_validate():
api_key = decode_secret_value(POOL_API_KEY_SECRET_NAME, POOL_API_KEY_SECRET_KEY)
if not api_key:
raise RuntimeError(f"{POOL_API_KEY_SECRET_NAME}.{POOL_API_KEY_SECRET_KEY} missing")
admin_email, token = login()
key_item = next((item for item in list_user_keys(token) if item.get("key") == api_key), None)
owner_balance = None
if key_item is not None and key_item.get("user_id") is not None:
owner_balance = ensure_pool_owner_balance(token, key_item["user_id"])
gateway = validate_gateway(api_key)
return {
"ok": gateway["ok"] is True,
@@ -785,11 +891,15 @@ def run_validate():
"namespace": NAMESPACE,
"serviceDns": SERVICE_DNS,
"appPod": APP_POD,
"admin": {"email": admin_email, "tokenPrinted": False},
"apiKey": {
"secret": f"{NAMESPACE}/{POOL_API_KEY_SECRET_NAME}.{POOL_API_KEY_SECRET_KEY}",
"sub2apiId": key_item.get("id") if isinstance(key_item, dict) else None,
"userId": key_item.get("user_id") if isinstance(key_item, dict) else None,
"keyPreview": api_key_preview(api_key),
"valuesPrinted": False,
},
"ownerBalance": owner_balance,
"validation": {"gatewayModels": gateway},
}