fix: bound remote gc stdout
This commit is contained in:
@@ -70,6 +70,7 @@ Registry 执行必须以远端异步 job 完成,并具备以下维护保护:
|
||||
- 缩容 registry 后运行官方 `registry:2.8.3` garbage-collect pod。
|
||||
- finally 阶段删除 GC pod、恢复 registry replicas、等待 rollout、恢复 CronJob suspend 状态。
|
||||
- 状态查询使用 `gc remote G14 status --job-id <id>`,不使用长 SSH 会话等待。
|
||||
- `gc remote` 的 stdout 是有界 JSON:当 `--full` 或大量候选导致结果过大时,完整结果会写入远端 `/tmp/unidesk-gc-remote/jobs/<job>.json`,stdout 只返回摘要、`jobId`、`statePath` 和 `statusCommand`,再用 `gc remote G14 status --job-id <job>` 渐进查询,避免输出爆炸被误判为 JSON 失败。
|
||||
|
||||
## Safe Stop Line
|
||||
|
||||
|
||||
@@ -276,6 +276,7 @@ REGISTRY_PROTECTED_TAGS = set([
|
||||
|
||||
EXPECTED_G14_NODE = "ubuntu-rog-zephyrus-g14-ga401iv-ga401iv"
|
||||
# Directory under which job_paths() places per-job files such as "<job_id>.log".
REMOTE_GC_JOB_DIR = "/tmp/unidesk-gc-remote/jobs"
|
||||
# Largest UTF-8 encoded size, in bytes (256 KiB), of a JSON payload that
# emit_json() prints to stdout in full; larger payloads are compacted first.
REMOTE_STDOUT_JSON_LIMIT = 256 * 1024
|
||||
|
||||
def now_iso():
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    utc_now = time.gmtime()
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", utc_now)
|
||||
@@ -315,6 +316,9 @@ def job_paths(job_id):
|
||||
"log": os.path.join(REMOTE_GC_JOB_DIR, "%s.log" % job_id),
|
||||
}
|
||||
|
||||
def status_command(job_id):
    """Build the CLI command line that queries the status of job *job_id*.

    The provider is taken from the module-level ``PROVIDER_ID``.
    """
    template = "bun scripts/cli.ts gc remote %s status --job-id %s"
    return template % (PROVIDER_ID, job_id)
|
||||
|
||||
def write_json_atomic(path, payload):
|
||||
tmp = "%s.tmp.%s" % (path, os.getpid())
|
||||
with open(tmp, "w", encoding="utf-8") as handle:
|
||||
@@ -333,6 +337,89 @@ def read_file_tail(path, limit=12000):
|
||||
except OSError:
|
||||
return ""
|
||||
|
||||
def stdout_page(items):
    """Return at most one stdout page worth of *items*.

    Non-list values are returned unchanged. For a list, the page size is read
    from ``OPTIONS["resultLimit"]``, then ``OPTIONS["limit"]``, then defaults
    to 50; the value is clamped to the range 1..100 before slicing.
    """
    if not isinstance(items, list):
        return items
    default_limit = 50
    raw_limit = OPTIONS.get("resultLimit") or OPTIONS.get("limit") or default_limit
    try:
        limit = int(raw_limit)
    except (TypeError, ValueError, OverflowError):
        # Only the failures int() raises for bad option values (wrong type,
        # unparseable text / NaN, infinity) fall back to the default page
        # size; unrelated errors are no longer swallowed.
        limit = default_limit
    limit = max(1, min(limit, 100))
    return items[:limit]
|
||||
|
||||
def _page_section_for_stdout(compact, payload, key, returned_key, omitted_key):
    """Copy one stdout page of ``payload[key]`` into *compact* with its counts."""
    items = payload.get(key) or []
    page = stdout_page(items)
    # stdout_page() hands non-list values back unchanged; treat a value that
    # has no length as a single entry instead of crashing the stdout path.
    returned = len(page) if hasattr(page, "__len__") else 1
    compact[key] = page
    compact[returned_key] = returned
    compact[omitted_key] = max(0, len(items) - returned) if isinstance(items, list) else 0


def compact_payload_for_stdout(payload, full_size_bytes, job_id=None, paths=None):
    """Build the reduced payload printed when the full JSON is too large.

    Args:
        payload: The full result dict.
        full_size_bytes: Encoded size of the full payload, reported back under
            ``output.fullResultBytes``.
        job_id: Optional job id; when truthy, ``jobId``, ``statePath``,
            ``statusCommand`` and a ``fullResult`` pointer are added.
        paths: Optional dict with a ``"state"`` entry; when absent the state
            path is taken from ``payload["statePath"]``.

    Returns:
        A new dict holding a fixed set of summary keys copied from *payload*,
        one page of ``results`` / ``candidates`` with returned/omitted counts,
        ``protected`` as-is, and the last 12000 characters of ``logTail``.
    """
    compact = {
        "ok": payload.get("ok", True),
        "action": payload.get("action") or "gc remote",
        "providerId": payload.get("providerId") or PROVIDER_ID,
        "output": {
            "truncated": True,
            "reason": "stdout-size-guard",
            "fullResultBytes": full_size_bytes,
        },
    }
    passthrough_keys = (
        "dryRun", "mutation", "observedAt", "status", "kind", "mode",
        "startedAt", "finishedAt", "error", "message", "options",
        "diskBefore", "diskAfter", "clusterPreflight", "clusterAfter",
        "summary", "policy",
    )
    for key in passthrough_keys:
        if key in payload:
            compact[key] = payload[key]
    if job_id:
        state_path = paths["state"] if paths else payload.get("statePath")
        command = status_command(job_id)
        compact["jobId"] = job_id
        compact["statePath"] = state_path
        compact["statusCommand"] = command
        compact["fullResult"] = {
            "jobId": job_id,
            "statePath": state_path,
            "statusCommand": command,
        }
        compact["output"]["fullResultJobId"] = job_id
    paged_sections = (
        ("results", "returnedResultCount", "omittedResultCount"),
        ("candidates", "returnedCandidateCount", "omittedCandidateCount"),
    )
    for key, returned_key, omitted_key in paged_sections:
        if key in payload:
            _page_section_for_stdout(compact, payload, key, returned_key, omitted_key)
    if "protected" in payload:
        compact["protected"] = payload["protected"]
    if "logTail" in payload:
        # Keep only the end of the log, where the most recent output is.
        compact["logTail"] = str(payload.get("logTail") or "")[-12000:]
    return compact
|
||||
|
||||
def _render_compact_within_limit(compact, limit):
    """Serialize *compact*, dropping bulky sections until it fits *limit* bytes.

    The compact payload still carries unbounded sections (for example
    ``protected`` or up to a page of large result items), so it can exceed the
    stdout limit on its own. Sections are removed in a fixed order and each
    removal is recorded under ``output.droppedKeys``. If the payload is still
    too large after every listed section is gone, the last rendering is
    returned as a best effort.
    """
    text = json.dumps(compact, ensure_ascii=False, indent=2)
    if len(text.encode("utf-8")) <= limit:
        return text
    slim = dict(compact)
    slim["output"] = dict(compact.get("output") or {})
    dropped = []
    slim["output"]["droppedKeys"] = dropped
    paged_counts = {
        "results": ("returnedResultCount", "omittedResultCount"),
        "candidates": ("returnedCandidateCount", "omittedCandidateCount"),
    }
    droppable = (
        "logTail", "protected", "candidates", "results",
        "clusterAfter", "clusterPreflight", "policy", "options",
        "diskAfter", "diskBefore", "summary",
    )
    for key in droppable:
        if key not in slim:
            continue
        del slim[key]
        dropped.append(key)
        if key in paged_counts:
            returned_key, omitted_key = paged_counts[key]
            returned = slim.get(returned_key)
            omitted = slim.get(omitted_key)
            if isinstance(returned, int) and isinstance(omitted, int):
                # The whole page is gone, so its items now count as omitted.
                slim[omitted_key] = omitted + returned
                slim[returned_key] = 0
        text = json.dumps(slim, ensure_ascii=False, indent=2)
        if len(text.encode("utf-8")) <= limit:
            break
    return text


def emit_json(payload, persist_large=True):
    """Print *payload* as JSON on stdout, bounded by REMOTE_STDOUT_JSON_LIMIT.

    A payload whose indented JSON fits the limit is printed unchanged.
    Otherwise a compact summary is printed instead. When *persist_large* is
    true, the full payload (annotated with ``jobId``, ``statePath``,
    ``statusCommand`` and ``outputPersistedAt``) is first written to the job
    state file; a job id is generated if the payload has none. When
    *persist_large* is false nothing is written, and the state path is only
    derived if the payload already carries a ``jobId``.

    Args:
        payload: Result dict to emit.
        persist_large: Whether an oversized payload is written to the job
            state file before the compact summary is printed.
    """
    raw = json.dumps(payload, ensure_ascii=False, indent=2)
    full_size = len(raw.encode("utf-8"))
    if full_size <= REMOTE_STDOUT_JSON_LIMIT:
        print(raw)
        return
    job_id = str(payload.get("jobId") or "")
    paths = None
    if persist_large:
        if not job_id:
            provider_slug = re.sub(r"[^A-Za-z0-9._-]+", "-", PROVIDER_ID.lower()).strip("-") or "provider"
            job_id = "%s-gc-output-%s-%s" % (provider_slug, int(time.time()), os.getpid())
        paths = job_paths(job_id)
        # Work on a copy so the caller's dict is not mutated.
        payload = dict(payload)
        payload.update({
            "jobId": job_id,
            "statePath": paths["state"],
            "statusCommand": status_command(job_id),
            "outputPersistedAt": now_iso(),
        })
        write_json_atomic(paths["state"], payload)
    elif job_id:
        paths = job_paths(job_id)
    compact = compact_payload_for_stdout(payload, full_size, job_id or None, paths)
    # The compact form is not guaranteed to be small; enforce the limit on it too.
    print(_render_compact_within_limit(compact, REMOTE_STDOUT_JSON_LIMIT))
|
||||
|
||||
def remote_gc_job_status():
|
||||
job_id = job_id_or_none()
|
||||
if not job_id:
|
||||
@@ -1560,16 +1647,16 @@ def main():
|
||||
candidates = collect_candidates(observed_at)
|
||||
visible = visible_items(candidates)
|
||||
if ACTION == "plan":
|
||||
print(json.dumps(plan_payload(observed_at, preflight, protected, candidates, visible), ensure_ascii=False, indent=2))
|
||||
emit_json(plan_payload(observed_at, preflight, protected, candidates, visible), persist_large=True)
|
||||
return 0
|
||||
if ACTION == "status":
|
||||
print(json.dumps(remote_gc_job_status(), ensure_ascii=False, indent=2))
|
||||
emit_json(remote_gc_job_status(), persist_large=False)
|
||||
return 0
|
||||
if ACTION != "run":
|
||||
print(json.dumps({"ok": False, "error": "unsupported-remote-gc-action", "action": ACTION}, ensure_ascii=False, indent=2))
|
||||
emit_json({"ok": False, "error": "unsupported-remote-gc-action", "action": ACTION}, persist_large=False)
|
||||
return 0
|
||||
if PROVIDER_ID.upper() == "G14" and not preflight.get("ok"):
|
||||
print(json.dumps({
|
||||
emit_json({
|
||||
"ok": False,
|
||||
"error": "gc-remote-g14-preflight-failed",
|
||||
"action": "gc remote run",
|
||||
@@ -1578,7 +1665,7 @@ def main():
|
||||
"mutation": False,
|
||||
"clusterPreflight": preflight,
|
||||
"plan": plan_payload(observed_at, preflight, protected, candidates, visible),
|
||||
}, ensure_ascii=False, indent=2))
|
||||
}, persist_large=True)
|
||||
return 0
|
||||
disk_before = df_snapshot()
|
||||
results = []
|
||||
@@ -1624,7 +1711,7 @@ def main():
|
||||
"results": returned,
|
||||
"protected": protected,
|
||||
}
|
||||
print(json.dumps(payload, ensure_ascii=False, indent=2))
|
||||
emit_json(payload, persist_large=True)
|
||||
return 0
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user