diff --git a/.agents/skills/unidesk-ops/SKILL.md b/.agents/skills/unidesk-ops/SKILL.md index e400820a..3a4d3035 100644 --- a/.agents/skills/unidesk-ops/SKILL.md +++ b/.agents/skills/unidesk-ops/SKILL.md @@ -31,11 +31,11 @@ bun scripts/cli.ts platform-infra egress-proxy k3s-build-benchmark --targets D60 bun scripts/cli.ts platform-infra egress-proxy k3s-build-benchmark \ --targets D601,D518 --profile real-deps-500m --dry-run -# 启动 benchmark:fire-and-forget,之后用 status/logs/traffic 短轮询。 +# 启动 benchmark:fire-and-forget,同时启动 stage recorder 持久化阶段流量证据。 bun scripts/cli.ts platform-infra egress-proxy k3s-build-benchmark \ --targets D601,D518 --profile real-deps-500m --confirm -# 状态表:看 APK/NPM/GO/GIT_MIRROR/REAL_DEPS、failure family 和 proxyserver 采样。 +# 状态表:看 APK/NPM/GO/GIT_MIRROR/REAL_DEPS、STAGE_PROXY、failure family 和可选即时采样。 bun scripts/cli.ts platform-infra egress-proxy k3s-build-benchmark status \ --targets D601,D518 --profile real-deps-500m --traffic-sample-seconds 15 @@ -52,7 +52,7 @@ bun scripts/cli.ts platform-infra egress-proxy k3s-build-benchmark cleanup \ --targets D601,D518 --profile real-deps-500m --confirm ``` -D601/D518 结果必须分表记录:`STATE`、Job/run、duration、`APK/NPM/GO/GIT_MIRROR/REAL_DEPS`、`TRAFFIC_WINDOW`、`TRAFFIC_RATE`、`PROXY_CUM`、`TOP_CLIENT`、`TOP_DEST` 和 `FAILURE`。D518 通过不代表 D601 通过;D601 只证明 k3s/containerd 走到 proxy 也不等于性能达标。未完成 500MiB+ 的真实 k3s image pull + apk/npm/go/git mirror 测试前,不关闭对应 issue,不合并标记为等待运行面验收的 PR。 +D601/D518 结果必须分表记录:`STATE`、Job/run、duration、`APK/NPM/GO/GIT_MIRROR/REAL_DEPS`、`STAGE_PROXY`、`STAGE_PROXY_EVIDENCE`、`TRAFFIC_WINDOW`、`TRAFFIC_RATE`、`PROXY_CUM`、`TOP_CLIENT`、`TOP_DEST` 和 `FAILURE`。D518 通过不代表 D601 通过;D601 只证明 k3s/containerd 走到 proxy 也不等于性能达标。未完成 500MiB+ 的真实 k3s image pull + apk/npm/go/git mirror 测试前,不关闭对应 issue,不合并标记为等待运行面验收的 PR。 ## Egress Proxy 运行面修复入口 @@ -70,6 +70,7 @@ D601 若需要让 `sub2api-egress-proxy` 绕开 pod overlay,可在 YAML 中显 - `TOP_CLIENT=10.42.0.1` 且 `TOP_DEST=registry-1.docker.io:443`:k3s/containerd image pull 已从 proxyserver 视角可见。 - 
`TOP_DEST=dl-cdn.alpinelinux.org:443`:Pod 内 `apk` 阶段已走 proxy。 - `registry.npmjs.org:443`、`proxy.golang.org:443`、`github.com:443`:分别对应 `npm install`、Go module 拉取和 Git mirror clone/sync。 +- `STAGE_PROXY_EVIDENCE` 是 Job annotation 中持久化的阶段证据;最终 `succeeded` 后仍应保留 apk/npm/go/git-mirror 各阶段 top destination、window bytes、max rate 和 proxy cumulative。`TRAFFIC_*` 列只是 status 命令即时采样。 - `image-pull` failure 表示还卡在 kubelet/containerd 拉镜像;`apk-download`、`npm-download`、`go-download`、`git-mirror` 分别表示 Pod 内依赖阶段失败。 - proxy 窗口 `0 B/s` 但 active cumulative 增长很慢时,按性能不达标处理;先清理 Job,再继续查上游,不要让慢速 benchmark 长时间占用资源。 diff --git a/config/platform-infra/egress-proxy-benchmarks.yaml b/config/platform-infra/egress-proxy-benchmarks.yaml index 58a0a8eb..bc1480f6 100644 --- a/config/platform-infra/egress-proxy-benchmarks.yaml +++ b/config/platform-infra/egress-proxy-benchmarks.yaml @@ -7,6 +7,7 @@ metadata: - 1032 - 1048 - 1077 + - 1110 profiles: real-deps-500m: diff --git a/docs/reference/platform-infra.md b/docs/reference/platform-infra.md index d8111df8..508866be 100644 --- a/docs/reference/platform-infra.md +++ b/docs/reference/platform-infra.md @@ -165,11 +165,11 @@ For an externally backed active target, client traffic reaches PK01 Caddy, PK01 When target-level `egressProxy.enabled=true`, the D601 target renders an in-cluster HTTP/mixed proxy client from the proxy source declared in YAML. The current mature external-egress shape is `sourceType: master-shadowsocks`: master Docker runs `shadowsocks-rust` from `config/platform-infra/sub2api-master-egress-proxy.compose.yaml`, while D601 runs `sing-box` to expose the ClusterIP proxy consumed by Sub2API and, when requested by YAML, the Codex account sentinel. A subscription-backed source is still just another YAML-declared source type; long-term prose must not duplicate the current endpoint, port, password, image tag, or health URL values from YAML/compose. 
-`platform-infra egress-proxy traffic --target <target> --sample-seconds <seconds>` is the proxyserver-side observation entry. It reads the sing-box Clash API through the proxy Pod loopback, reports current per-client rate plus bounded-window cumulative bytes, and includes proxy process cumulative bytes when sing-box reports them. Use this together with k3s CI/CD build benchmarks when proving proxy acceleration or diagnosing whether a workload actually traverses the proxy; client-side timings alone are not enough evidence. +`platform-infra egress-proxy traffic --target <target> --sample-seconds <seconds>` is the proxyserver-side observation entry. It reads the sing-box Clash API through the proxy Pod loopback, reports current per-client rate plus bounded-window cumulative bytes, and includes proxy process cumulative bytes when sing-box reports them. Use this together with k3s CI/CD build benchmarks when diagnosing whether a workload is currently traversing the proxy; client-side timings alone are not enough evidence. The egress proxy Deployment may opt into `hostNetwork: true` per target via `config/platform-infra/sub2api.yaml` `targets[].egressProxy.hostNetwork`. When enabled, the manifest renders `hostNetwork: true`, `dnsPolicy: ClusterFirstWithHostNet`, and a RollingUpdate strategy of `maxSurge=0`/`maxUnavailable=1` so the sing-box client bypasses the pod overlay and connects the master upstream directly from the node network; this is the durable fix for a target whose pod-overlay path to the upstream is the throughput bottleneck. It is a per-target YAML decision, not a D601-only default: a target whose pod overlay is already fast enough must keep `hostNetwork: false`, and the `no-host-network` policy check only permits `hostNetwork: true` on the single YAML-declared egress proxy Deployment for a target whose `egressProxy.hostNetwork=true`. 
Do not generalize one target's hostNetwork experiment to other nodes, and do not leave a one-off `kubectl patch` as the final state; promote or demote hostNetwork only by editing the target YAML and running `platform-infra sub2api apply --target <target>`. -`platform-infra egress-proxy k3s-build-benchmark --targets <targets> --profile real-deps-500m` is the production-ready egress proxy throughput acceptance entry. The `real-deps-500m` profile in `config/platform-infra/egress-proxy-benchmarks.yaml` is the only acceptance profile: it renders one Job per target whose kubelet/containerd pulls remote `alpine`, `node` and `golang` images with `imagePullPolicy: Always`, then runs Pod-internal `apk add`, `npm install`, `go mod download` and `git clone --mirror` plus `remote update --prune` stages through the YAML-declared proxy env. Acceptance requires `STATE=succeeded`, `REAL_DEPS >= 500 MiB` (the profile's `realDeps.minProxyMiB`), image-pull plus apk/npm/go/git-mirror evidence, and a proxyserver-observed cumulative traffic above the profile minimum. Cloudflare synthetic downloads and curl-only probes are bypass diagnostics, never acceptance evidence. Status/logs/traffic are short-polled; a started benchmark is fire-and-forget and must be `cleanup`-ed when it stalls or after acceptance to release k3s resources. D601 and D518 must both pass the same profile: a single node passing does not close a cross-node proxy issue, and an optimization on one target must not regress the other. +`platform-infra egress-proxy k3s-build-benchmark --targets <targets> --profile real-deps-500m` is the production-ready egress proxy throughput acceptance entry. 
The `real-deps-500m` profile in `config/platform-infra/egress-proxy-benchmarks.yaml` is the only acceptance profile: it renders one Job per target whose kubelet/containerd pulls remote `alpine`, `node` and `golang` images with `imagePullPolicy: Always`, then runs Pod-internal `apk add`, `npm install`, `go mod download` and `git clone --mirror` plus `remote update --prune` stages through the YAML-declared proxy env. Acceptance requires `STATE=succeeded`, `REAL_DEPS >= 500 MiB` (the profile's `realDeps.minProxyMiB`), image-pull plus apk/npm/go/git-mirror evidence, and proxyserver-observed cumulative traffic above the profile minimum. The command starts a short-lived stage recorder with the benchmark Job; status reads its `STAGE_PROXY_EVIDENCE` from Job annotations so final `succeeded` rows can still show each dependency stage's top destination, window bytes, max rate and proxy cumulative even after the stage has ended. `TRAFFIC_*` columns are immediate diagnostics from the current status call, not the durable acceptance source. Cloudflare synthetic downloads and curl-only probes are bypass diagnostics, never acceptance evidence. Status/logs/traffic are short-polled; a started benchmark is fire-and-forget and must be `cleanup`-ed when it stalls or after acceptance to release k3s resources. D601 and D518 must both pass the same profile: a single node passing does not close a cross-node proxy issue, and an optimization on one target must not regress the other. `platform-infra sub2api validate --target D601 --full` must prove the proxy Deployment/Service is ready and that an app pod can complete the YAML-declared health probe through the proxy. This target-level injection does not by itself bind manually created Sub2API accounts to that proxy; account tests and account-specific upstream transports still need a YAML-declared `manualAccounts.protected[].proxyBinding` when the account must avoid direct egress. 
Proxy credentials, subscription contents, and generated proxy configs are Secret material and must not be printed. If a direct D601-to-upstream TLS/SNI path is reset, do not leave a one-off plain HTTP CONNECT or JS proxy as the durable fix; use a mature encrypted proxy source, currently master `shadowsocks-rust` plus D601 `sing-box`, through YAML/compose. diff --git a/project-management/PJ2026-01/specs/PJ2026-01060310-real-k3s-deps-proxy-benchmark.md b/project-management/PJ2026-01/specs/PJ2026-01060310-real-k3s-deps-proxy-benchmark.md index 3a5520f4..0e8c025c 100644 --- a/project-management/PJ2026-01/specs/PJ2026-01060310-real-k3s-deps-proxy-benchmark.md +++ b/project-management/PJ2026-01/specs/PJ2026-01060310-real-k3s-deps-proxy-benchmark.md @@ -18,6 +18,8 @@ If the Kubernetes image pull stage fails, the benchmark result is not an applica `platform-infra egress-proxy k3s-build-benchmark` remains the single coordinator. It reads targets from `config/platform-infra/sub2api.yaml`, reads profiles from `config/platform-infra/egress-proxy-benchmarks.yaml`, renders one Job per target, and uses `trans sh -- ...` as the bounded control path. +For `k3s-real-deps`, start also launches a short-lived stage recorder on the same k3s route. The recorder samples the target proxyserver through the proxy Pod loopback, associates traffic with the benchmark Pod IP and current dependency stage, and writes compact stage evidence into the benchmark Job annotation. It exits when the Job completes, fails, disappears, or reaches the benchmark deadline. + The `real-deps-500m` profile renders a multi-stage Kubernetes Job: - `initContainer/apk-add`: image `docker.io/library/alpine:3.20`. @@ -30,7 +32,7 @@ All dependency init containers receive the YAML-declared `sub2api-egress-proxy` ## Observability -The source of truth for traffic is `platform-infra egress-proxy traffic --target --sample-seconds N`. Benchmark status may include this traffic sample. 
The final evidence table must include proxyserver window bytes/rate/cumulative bytes, top client, and top destination. +The source of truth for durable benchmark evidence is the `STAGE_PROXY_EVIDENCE` table rendered by `k3s-build-benchmark status/logs` from Job annotations. `platform-infra egress-proxy traffic --target --sample-seconds N` remains an immediate diagnostic for the current proxyserver window. The final evidence table must include proxyserver window bytes/rate/cumulative bytes, top client, and top destination per dependency stage. For image pull traffic, the observed proxy client may be the node/k3s/containerd path rather than the benchmark Pod IP. For `apk`, `npm`, `go`, and `git-mirror` stages, the observed proxy client should correspond to the benchmark Pod network path. This distinction must be preserved in issue evidence. @@ -59,6 +61,6 @@ This benchmark must not: - `bun scripts/cli.ts platform-infra egress-proxy k3s-build-benchmark --targets D601,D518 --profile real-deps-500m --dry-run` prints both node plans and the remote image set. - `--confirm` creates one unique Job per node and returns immediately. -- `status --traffic-sample-seconds 15` reports Job state, `image-pull`/`apk`/`npm`/`go`/`git-mirror` failure family when applicable, and proxyserver traffic columns. -- D601 and D518 both have final rows with target, profile, job, state, duration, apk MiB, npm MiB, go MiB, git mirror MiB, proxy traffic window/rate/cumulative, top client, top destination, and failure family. +- `status --traffic-sample-seconds 15` reports Job state, `image-pull`/`apk`/`npm`/`go`/`git-mirror` failure family when applicable, durable `STAGE_PROXY_EVIDENCE`, and optional immediate proxyserver traffic columns. +- D601 and D518 both have final rows with target, profile, job, state, duration, apk MiB, npm MiB, go MiB, git mirror MiB, `STAGE_PROXY`, stage proxy window/rate/cumulative, top client, top destination, and failure family. 
- Acceptance requires at least 500 MiB of proxyserver-observed traffic per successful node run. If a node cannot reach that point because image pull fails, the issue remains open until the k3s/containerd image pull proxy path is fixed or a blocker is explicitly documented. diff --git a/scripts/src/platform-infra-k3s-build-benchmark.ts b/scripts/src/platform-infra-k3s-build-benchmark.ts index c6111974..0d0f6146 100644 --- a/scripts/src/platform-infra-k3s-build-benchmark.ts +++ b/scripts/src/platform-infra-k3s-build-benchmark.ts @@ -11,6 +11,8 @@ import { resolveTarget } from "./platform-infra/manifest"; const BENCHMARK_CONFIG_PATH = "config/platform-infra/egress-proxy-benchmarks.yaml"; const BENCHMARK_APP = "unidesk-k3s-build-benchmark"; +const STAGE_PROXY_EVIDENCE_ANNOTATION = "unidesk.ai/stage-proxy-evidence"; +const REAL_DEPS_STAGE_ORDER = ["apk", "npm", "go", "git-mirror"]; type K3sBuildAction = "start" | "status" | "logs" | "cleanup"; type ImagePullPolicy = "Always" | "IfNotPresent" | "Never"; @@ -120,11 +122,25 @@ interface TargetStatus { goMiB: number | null; gitMirrorMiB: number | null; realDepsMiB: number | null; + currentStage: string; + stageEvidence: readonly StageProxyEvidence[]; failureFamily: string; logTail: string; traffic?: TrafficSummary; } +interface StageProxyEvidence { + stage: string; + topDestination: string; + topClient: string; + windowBytes: number; + maxRateBps: number; + proxyCumulativeBytes: number; + firstObservedAt: string; + lastObservedAt: string; + samples: number; +} + interface TrafficSummary { ok: boolean; reason: string; @@ -249,7 +265,9 @@ function startRowDetail(row: TargetPlan & { started: boolean; state: string; job if (!row.ok) return `${row.blocker}: ${row.detail}`; if (row.started) { const replaced = record(row.result).replaced; - return replaced === undefined || replaced === 0 ? 
"status/logs/traffic" : `status/logs/traffic replaced=${replaced}`; + const recorder = text(record(row.result).stageRecorder, ""); + const prefix = replaced === undefined || replaced === 0 ? "status/logs/traffic" : `status/logs/traffic replaced=${replaced}`; + return recorder === "" || recorder === "disabled" ? prefix : `${prefix} stage-recorder=${recorder}`; } const result = record(row.result); const stderr = text(result.stderrPreview, ""); @@ -275,6 +293,7 @@ function statusBenchmarks(plans: readonly TargetPlan[], options: K3sBuildBenchma status.targetId, status.profile, status.state, + status.currentStage, status.jobName, status.durationSeconds === null ? "-" : `${status.durationSeconds}s`, status.outputMiB === null ? "-" : `${status.outputMiB}MiB`, @@ -284,6 +303,7 @@ function statusBenchmarks(plans: readonly TargetPlan[], options: K3sBuildBenchma status.goMiB === null ? "-" : `${status.goMiB}MiB`, status.gitMirrorMiB === null ? "-" : `${status.gitMirrorMiB}MiB`, status.realDepsMiB === null ? "-" : `${status.realDepsMiB}MiB`, + stageEvidenceSummary(status.stageEvidence), status.traffic === undefined ? "-" : bytes(status.traffic.windowBytes), status.traffic === undefined ? "-" : rate(status.traffic.rateBps), status.traffic === undefined ? 
"-" : bytes(status.traffic.processTotalBytes), @@ -301,14 +321,15 @@ function statusBenchmarks(plans: readonly TargetPlan[], options: K3sBuildBenchma renderedText: [ "PLATFORM-INFRA K3S BUILD BENCHMARK STATUS", "", - ...table(["TARGET", "PROFILE", "STATE", "JOB", "DURATION", "OUTPUT", "DOWNLOAD", "APK", "NPM", "GO", "GIT_MIRROR", "REAL_DEPS", "TRAFFIC_WINDOW", "TRAFFIC_RATE", "PROXY_CUM", "TOP_CLIENT", "TOP_DEST", "FAILURE"], rows), + ...table(["TARGET", "PROFILE", "STATE", "STAGE", "JOB", "DURATION", "OUTPUT", "DOWNLOAD", "APK", "NPM", "GO", "GIT_MIRROR", "REAL_DEPS", "STAGE_PROXY", "TRAFFIC_WINDOW", "TRAFFIC_RATE", "PROXY_CUM", "TOP_CLIENT", "TOP_DEST", "FAILURE"], rows), + ...stageEvidenceSections(statuses), ...logSections, "", "NEXT", ` bun scripts/cli.ts platform-infra egress-proxy k3s-build-benchmark status --targets ${statuses.map((status) => status.targetId).join(",")} --profile ${options.profile} --traffic-sample-seconds 15`, ` bun scripts/cli.ts platform-infra egress-proxy k3s-build-benchmark logs --targets ${statuses.map((status) => status.targetId).join(",")} --profile ${options.profile}`, "", - "Disclosure: traffic columns are proxyserver-side samples only when --traffic-sample-seconds is set; Secret/proxy values are redacted.", + "Disclosure: traffic columns are proxyserver-side samples only when --traffic-sample-seconds is set; STAGE_PROXY_EVIDENCE is persisted by the benchmark stage recorder in Job annotations; Secret/proxy values are redacted.", ].join("\n"), }; } @@ -669,10 +690,28 @@ if ! 
command -v git >/dev/null 2>&1; then apk add --no-cache git ca-certificates fi git --version -git -c "http.proxy=\${HTTP_PROXY:-}" -c "https.proxy=\${HTTPS_PROXY:-}" -c http.lowSpeedLimit=1024 -c http.lowSpeedTime=120 clone --mirror --filter=blob:none "$GIT_MIRROR_REMOTE" /work/git-mirror/repo.git -git -c "http.proxy=\${HTTP_PROXY:-}" -c "https.proxy=\${HTTPS_PROXY:-}" --git-dir=/work/git-mirror/repo.git remote update --prune -git -c "http.proxy=\${HTTP_PROXY:-}" -c "https.proxy=\${HTTPS_PROXY:-}" --git-dir=/work/git-mirror/repo.git remote -v -git -c "http.proxy=\${HTTP_PROXY:-}" -c "https.proxy=\${HTTPS_PROXY:-}" ls-remote --heads "$GIT_MIRROR_REMOTE" >/work/git-mirror/ls-remote-heads.txt +git_proxy() { + git -c "http.proxy=\${HTTP_PROXY:-}" -c "https.proxy=\${HTTPS_PROXY:-}" -c http.version=HTTP/1.1 -c http.lowSpeedLimit=1024 -c http.lowSpeedTime=120 "$@" +} +attempt=1 +git_ok=0 +while [ "$attempt" -le 3 ]; do + rm -rf /work/git-mirror/repo.git /work/git-mirror/ls-remote-heads.txt + if git_proxy clone --mirror --filter=blob:none "$GIT_MIRROR_REMOTE" /work/git-mirror/repo.git \\ + && git_proxy --git-dir=/work/git-mirror/repo.git remote update --prune \\ + && git_proxy --git-dir=/work/git-mirror/repo.git remote -v \\ + && git_proxy ls-remote --heads "$GIT_MIRROR_REMOTE" >/work/git-mirror/ls-remote-heads.txt; then + git_ok=1 + break + fi + echo "git-mirror-attempt-failed attempt=$attempt remote=$GIT_MIRROR_REMOTE" >&2 + attempt=$((attempt + 1)) + if [ "$attempt" -le 3 ]; then sleep $((attempt * 5)); fi # back off only when another attempt will follow +done +if [ "$git_ok" != "1" ]; then + echo "git-mirror-failed-after-retries remote=$GIT_MIRROR_REMOTE" >&2 + exit 44 +fi git_mib="$(du -sk /work/git-mirror/repo.git /work/git-mirror/ls-remote-heads.txt 2>/dev/null | awk '{s+=$1} END {printf "%d", int((s+1023)/1024)}')" printf 'gitMirrorMiB=%s\\n' "$git_mib" > /work/stages/git-mirror.env printf 'UNIDESK_K3S_REAL_DEPS_STAGE {"stage":"git-mirror","ok":true,"image":"%s","remote":"%s","mirrorMiB":%s}\\n' "$GIT_MIRROR_IMAGE" "$GIT_MIRROR_REMOTE" "$git_mib" 
@@ -855,6 +894,9 @@ function benchmarkLabels(target: Sub2ApiTargetConfig, profile: K3sBuildBenchmark function startScript(manifest: Record, target: Sub2ApiTargetConfig, profile: K3sBuildBenchmarkProfile, runId: string, jobName: string): string { const yaml = `${JSON.stringify(manifest, null, 2)}\n`; const encoded = Buffer.from(yaml, "utf8").toString("base64"); + const proxy = target.egressProxy; + const proxySelector = proxy === null ? "" : `app.kubernetes.io/name=${proxy.deploymentName},app.kubernetes.io/component=egress-proxy`; + const recorderEnabled = profile.workload === "k3s-real-deps" && proxy !== null; return ` set -eu tmp="$(mktemp -d)" @@ -872,7 +914,289 @@ if [ "$replaced" != "0" ]; then kubectl -n ${shQuote(target.namespace)} delete jobs -l "$selector" --ignore-not-found >/dev/null 2>&1 fi kubectl apply -f "$manifest" >/dev/null -printf '{"ok":true,"jobName":"%s","namespace":"%s","target":"%s","runId":"%s","profile":"%s","replaced":%s}\\n' ${shQuote(jobName)} ${shQuote(target.namespace)} ${shQuote(target.id)} ${shQuote(runId)} ${shQuote(profile.id)} "$replaced" +stage_recorder="disabled" +recorder_pid="" +if [ ${recorderEnabled ? "1" : "0"} = 1 ]; then + recorder_dir="/tmp/unidesk-k3s-build-benchmark" + mkdir -p "$recorder_dir" + recorder_script="$recorder_dir/${jobName}.stage-recorder.py" + recorder_log="$recorder_dir/${jobName}.stage-recorder.log" + cat > "$recorder_script" <<'PY' +${stageProxyRecorderPython()} +PY + nohup python3 "$recorder_script" ${shQuote(target.namespace)} ${shQuote(jobName)} ${shQuote(proxySelector)} ${shQuote(String(profile.timeoutSeconds))} 1 >"$recorder_log" 2>&1 & + recorder_pid="$!" 
+ stage_recorder="started" +fi +python3 - ${shQuote(jobName)} ${shQuote(target.namespace)} ${shQuote(target.id)} ${shQuote(runId)} ${shQuote(profile.id)} "$replaced" "$stage_recorder" "$recorder_pid" <<'PY' +import json, sys +job_name, namespace, target, run_id, profile, replaced, stage_recorder, recorder_pid = sys.argv[1:9] +print(json.dumps({ + "ok": True, + "jobName": job_name, + "namespace": namespace, + "target": target, + "runId": run_id, + "profile": profile, + "replaced": int(replaced), + "stageRecorder": stage_recorder, + "stageRecorderPid": recorder_pid or None, +}, ensure_ascii=False)) +PY +`; +} + +function stageProxyRecorderPython(): string { + return String.raw`from collections import Counter, defaultdict +import json +import subprocess +import sys +import time + +namespace, job_name, proxy_selector, timeout_raw, interval_raw = sys.argv[1:6] +deadline = time.time() + int(timeout_raw) + 180 +interval = max(1, int(interval_raw)) +annotation_key = "unidesk.ai/stage-proxy-evidence" +stage_by_container = { + "apk-add": "apk", + "npm-install": "npm", + "go-download": "go", + "git-mirror": "git-mirror", +} +container_by_stage = [ + ("apk-add", "apk"), + ("npm-install", "npm"), + ("go-download", "go"), + ("git-mirror", "git-mirror"), +] +stage_order = ["apk", "npm", "go", "git-mirror"] +previous = {} +evidence = {} +last_stage = None + +def utc_now(): + return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + +def run(args, timeout=10): + return subprocess.run(["kubectl", "-n", namespace, *args], text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout) + +def get_json(args, timeout=10): + result = run(args, timeout=timeout) + if result.returncode != 0: + return None + try: + return json.loads(result.stdout or "{}") + except Exception: + return None + +def number(value): + try: + return int(value) + except Exception: + return 0 + +def field(record, names, default=None): + if not isinstance(record, dict): + return default + for name in 
names: + value = record.get(name) + if value not in (None, ""): + return value + return default + +def metadata(conn): + value = conn.get("metadata") if isinstance(conn, dict) else None + return value if isinstance(value, dict) else {} + +def client_of(conn): + meta = metadata(conn) + value = field(meta, ["sourceIP", "source_ip", "source", "clientIP", "client_ip", "srcIP", "src_ip"]) + if value is None: + value = field(conn, ["sourceIP", "source_ip", "source"]) + text = str(value or "unknown") + if text.count(":") == 1 and text.rsplit(":", 1)[1].isdigit(): + text = text.rsplit(":", 1)[0] + return text + +def destination_of(conn): + meta = metadata(conn) + host = field(meta, ["host", "destinationIP", "destination_ip", "destination", "dstIP", "dst_ip"]) + port = field(meta, ["destinationPort", "destination_port", "dstPort", "dst_port"]) + if host is None: + host = field(conn, ["host", "destination"]) + if host is None: + return "-" + host_text = str(host) + return f"{host_text}:{port}" if port not in (None, "") else host_text + +def conn_id(conn): + value = field(conn, ["id", "uuid", "connId", "connectionId"]) + if value not in (None, ""): + return str(value) + meta = metadata(conn) + return "|".join([ + client_of(conn), + str(field(meta, ["sourcePort", "source_port"], "")), + destination_of(conn), + str(field(meta, ["network", "type"], "")), + ]) + +def transfer_total(conn): + upload = number(field(conn, ["upload", "up", "uploadBytes", "upload_bytes"], 0)) + download = number(field(conn, ["download", "down", "downloadBytes", "download_bytes"], 0)) + return upload + download + +def process_total(data): + if not isinstance(data, dict): + return 0 + upload = number(field(data, ["uploadTotal", "upload_total", "upload"], 0)) + download = number(field(data, ["downloadTotal", "download_total", "download"], 0)) + return upload + download + +def current_stage_and_pod(): + job = get_json(["get", "job", job_name, "-o", "json"]) + if not isinstance(job, dict): + return None, 
None, True + status = job.get("status") or {} + done = number(status.get("succeeded")) > 0 or number(status.get("failed")) > 0 + pods = get_json(["get", "pods", "-l", "job-name=" + job_name, "-o", "json"]) + items = pods.get("items", []) if isinstance(pods, dict) else [] + items.sort(key=lambda item: item.get("metadata", {}).get("creationTimestamp", "")) + if not items: + return None, None, done + pod = items[-1] + pod_ip = (pod.get("status") or {}).get("podIP") + raw_statuses = (pod.get("status") or {}).get("initContainerStatuses") or [] + statuses = {item.get("name"): item for item in raw_statuses if isinstance(item, dict)} + for container_name, stage in container_by_stage: + item = statuses.get(container_name) + if item is None: + return stage, pod_ip, done + state = item.get("state") or {} + terminated = state.get("terminated") or {} + if terminated and number(terminated.get("exitCode")) == 0: + continue + return stage, pod_ip, done + return None, pod_ip, done + +def proxy_connections(): + pods = get_json(["get", "pod", "-l", proxy_selector, "-o", "json"]) + items = pods.get("items", []) if isinstance(pods, dict) else [] + if not items: + return None + items.sort(key=lambda item: item.get("metadata", {}).get("creationTimestamp", "")) + pod_name = items[-1].get("metadata", {}).get("name") + if not pod_name: + return None + result = run(["exec", pod_name, "--", "sh", "-c", "if command -v wget >/dev/null 2>&1; then wget -qO- http://127.0.0.1:9090/connections; elif command -v curl >/dev/null 2>&1; then curl -fsS --max-time 3 http://127.0.0.1:9090/connections; else exit 127; fi"], timeout=8) + if result.returncode != 0: + return None + try: + parsed = json.loads(result.stdout or "{}") + except Exception: + return None + connections = parsed.get("connections") if isinstance(parsed, dict) else None + return {"data": parsed, "connections": connections if isinstance(connections, list) else []} + +def ensure_stage(stage): + if stage not in evidence: + evidence[stage] = { 
+ "stage": stage, + "firstObservedAt": utc_now(), + "lastObservedAt": utc_now(), + "samples": 0, + "windowBytes": 0, + "maxRateBps": 0, + "proxyCumulativeBytes": 0, + "destBytes": Counter(), + "clientBytes": Counter(), + } + return evidence[stage] + +def compact_evidence(state): + stages = {} + for stage in stage_order: + item = evidence.get(stage) + if not item: + continue + top_destinations = [{"destination": name, "bytes": count} for name, count in item["destBytes"].most_common(3)] + top_clients = [{"client": name, "bytes": count} for name, count in item["clientBytes"].most_common(2)] + stages[stage] = { + "stage": stage, + "firstObservedAt": item["firstObservedAt"], + "lastObservedAt": item["lastObservedAt"], + "samples": item["samples"], + "windowBytes": item["windowBytes"], + "maxRateBps": item["maxRateBps"], + "proxyCumulativeBytes": item["proxyCumulativeBytes"], + "topDestination": top_destinations[0]["destination"] if top_destinations else "-", + "topClient": top_clients[0]["client"] if top_clients else "-", + "topDestinations": top_destinations, + "topClients": top_clients, + } + return {"version": 1, "state": state, "updatedAt": utc_now(), "stages": stages} + +def patch_evidence(state): + payload = json.dumps(compact_evidence(state), ensure_ascii=False, separators=(",", ":"), sort_keys=True) + result = run(["annotate", "job", job_name, f"{annotation_key}={payload}", "--overwrite"], timeout=8) + return result.returncode == 0 + +def sample_stage(stage, pod_ip, snapshot, elapsed): + global previous + current = {} + for conn in snapshot["connections"]: + current[conn_id(conn)] = { + "client": client_of(conn), + "destination": destination_of(conn), + "total": transfer_total(conn), + } + total_delta = 0 + stage_item = ensure_stage(stage) + for cid, curr in current.items(): + if pod_ip and curr["client"] != pod_ip: + continue + prev = previous.get(cid) + delta = max(0, curr["total"] - (prev["total"] if prev else curr["total"])) + if delta <= 0: + continue + 
total_delta += delta + stage_item["destBytes"][curr["destination"]] += delta + stage_item["clientBytes"][curr["client"]] += delta + if total_delta > 0: + stage_item["samples"] += 1 + stage_item["lastObservedAt"] = utc_now() + stage_item["windowBytes"] += total_delta + stage_item["maxRateBps"] = max(stage_item["maxRateBps"], int(total_delta / max(elapsed, interval, 1))) + stage_item["proxyCumulativeBytes"] = process_total(snapshot["data"]) + previous = current + +last_time = time.time() +last_patch = 0 +while time.time() < deadline: + stage, pod_ip, done = current_stage_and_pod() + snapshot = proxy_connections() + now = time.time() + elapsed = now - last_time + if stage != last_stage: + previous = {} + last_stage = stage + last_time = now + if done: + break + time.sleep(interval) + continue + if stage and snapshot: + sample_stage(stage, pod_ip, snapshot, elapsed) + last_time = now + if evidence and now - last_patch >= max(interval, 3): + patch_evidence("running" if not done else "completed") + last_patch = now + if done: + break + time.sleep(interval) + +if evidence: + patch_evidence("completed") `; } @@ -920,20 +1244,37 @@ meta = job.get("metadata", {}) status = job.get("status", {}) job_name = meta.get("name") or "-" labels = meta.get("labels", {}) +annotations = meta.get("annotations", {}) or {} pods_result = kubectl(["get", "pods", "-l", "job-name=" + job_name, "-o", "json"]) pods = json.loads(pods_result.stdout or "{}").get("items", []) if pods_result.returncode == 0 else [] pods.sort(key=lambda item: item.get("metadata", {}).get("creationTimestamp", "")) pod_name = pods[-1].get("metadata", {}).get("name") if pods else None waiting_reasons = [] container_names = [] +stage_by_container = {"apk-add": "apk", "npm-install": "npm", "go-download": "go", "git-mirror": "git-mirror"} +container_by_stage = [("apk-add", "apk"), ("npm-install", "npm"), ("go-download", "go"), ("git-mirror", "git-mirror")] +current_stage = "-" if pods: pod_status = pods[-1].get("status", {}) 
or {} status_groups = [] status_groups.extend((pod_status.get("initContainerStatuses") or [])) status_groups.extend((pod_status.get("containerStatuses") or [])) + init_statuses = {item.get("name"): item for item in (pod_status.get("initContainerStatuses") or []) if isinstance(item, dict)} + for container_name, stage_name in container_by_stage: + item = init_statuses.get(container_name) + if item is None: + current_stage = stage_name + break + state_record = item.get("state") or {} + terminated = state_record.get("terminated") or {} + if terminated and int(terminated.get("exitCode") or 0) == 0: + continue + current_stage = stage_name + break for container_status in status_groups: container_name = container_status.get("name") or "container" container_names.append(container_name) + stage_name = stage_by_container.get(container_name) image_name = container_status.get("image") or "-" state_record = container_status.get("state") or {} waiting = (state_record.get("waiting") or {}) @@ -977,6 +1318,36 @@ for line in reversed(full_logs.splitlines()): except Exception: match = None break + +def parse_stage_evidence(raw): + if not raw: + return [] + try: + parsed = json.loads(raw) + except Exception: + return [] + stages = parsed.get("stages") if isinstance(parsed, dict) else None + if not isinstance(stages, dict): + return [] + order = ["apk", "npm", "go", "git-mirror"] + rows = [] + for stage in order: + item = stages.get(stage) + if not isinstance(item, dict): + continue + rows.append({ + "stage": stage, + "topDestination": item.get("topDestination") or "-", + "topClient": item.get("topClient") or "-", + "windowBytes": int(item.get("windowBytes") or 0), + "maxRateBps": int(item.get("maxRateBps") or 0), + "proxyCumulativeBytes": int(item.get("proxyCumulativeBytes") or 0), + "firstObservedAt": item.get("firstObservedAt") or "-", + "lastObservedAt": item.get("lastObservedAt") or "-", + "samples": int(item.get("samples") or 0), + }) + return rows + conditions = 
status.get("conditions") or [] failed = any(item.get("type") == "Failed" and item.get("status") == "True" for item in conditions) succeeded = status.get("succeeded", 0) > 0 @@ -1017,9 +1388,11 @@ payload = { "jobName": job_name, "runId": labels.get("unidesk.ai/run-id") or "-", "profile": labels.get("unidesk.ai/benchmark-profile") or "-", + "currentStage": current_stage, "startedAt": status.get("startTime") or (match or {}).get("startedAt"), "completedAt": status.get("completionTime") or (match or {}).get("completedAt"), "result": match, + "stageEvidence": parse_stage_evidence(annotations.get(${shQuote(STAGE_PROXY_EVIDENCE_ANNOTATION)})), "failureFamily": failure_family, "logTail": "\\n".join(waiting_reasons + [logs])[-4000:], } @@ -1066,6 +1439,58 @@ function trafficSpec(target: Sub2ApiTargetConfig): EgressProxyTrafficSpec { }; } +function stageEvidenceList(value: unknown): StageProxyEvidence[] { + return arrayRecords(value).map((item) => ({ + stage: text(item.stage), + topDestination: text(item.topDestination), + topClient: text(item.topClient), + windowBytes: number(item.windowBytes), + maxRateBps: number(item.maxRateBps), + proxyCumulativeBytes: number(item.proxyCumulativeBytes), + firstObservedAt: text(item.firstObservedAt), + lastObservedAt: text(item.lastObservedAt), + samples: number(item.samples), + })).filter((item) => item.stage !== "-"); +} + +function stageEvidenceSummary(items: readonly StageProxyEvidence[]): string { + if (items.length === 0) return "-"; + const byStage = new Map(items.map((item) => [item.stage, item])); + const visible = REAL_DEPS_STAGE_ORDER + .map((stage) => byStage.get(stage)) + .filter((item): item is StageProxyEvidence => item !== undefined); + const git = byStage.get("git-mirror"); + const suffix = git === undefined ? 
"" : ` git=${compactDestination(git.topDestination)} ${bytes(git.windowBytes)}`; + return `${visible.length}/${REAL_DEPS_STAGE_ORDER.length}${suffix}`; +} + +function stageEvidenceSections(statuses: readonly TargetStatus[]): string[] { + const rows = statuses.flatMap((status) => status.stageEvidence.map((item) => [ + status.targetId, + item.stage, + item.topDestination, + item.topClient, + bytes(item.windowBytes), + rate(item.maxRateBps), + bytes(item.proxyCumulativeBytes), + String(item.samples), + item.lastObservedAt, + ])); + if (rows.length === 0) return []; + return [ + "", + "STAGE_PROXY_EVIDENCE", + ...table(["TARGET", "STAGE", "TOP_DEST", "TOP_CLIENT", "WINDOW", "MAX_RATE", "PROXY_CUM", "SAMPLES", "LAST_OBSERVED"], rows), + ]; +} + +function compactDestination(value: string): string { + if (value === "-" || value.length <= 28) return value; + const withoutPort = value.replace(/:443$/u, ""); + if (withoutPort.length <= 28) return withoutPort; + return `${withoutPort.slice(0, 25)}...`; +} + function normalizeStatus(plan: TargetPlan, parsed: unknown, result: CommandResult): TargetStatus { if (typeof parsed !== "object" || parsed === null) { const state = result.exitCode === 0 ? "transport-unparseable" : "transport-failed"; @@ -1087,6 +1512,8 @@ function normalizeStatus(plan: TargetPlan, parsed: unknown, result: CommandResul goMiB: null, gitMirrorMiB: null, realDepsMiB: null, + currentStage: "-", + stageEvidence: [], failureFamily: result.timedOut ? "transport-timeout" : state, logTail: (result.stderr || result.stdout).slice(-4000), }; @@ -1113,6 +1540,8 @@ function normalizeStatus(plan: TargetPlan, parsed: unknown, result: CommandResul goMiB: nullableNumber(jobResult.goMiB), gitMirrorMiB: nullableNumber(jobResult.gitMirrorMiB), realDepsMiB: nullableNumber(jobResult.realDepsMiB), + currentStage: text(data.currentStage), + stageEvidence: stageEvidenceList(data.stageEvidence), failureFamily: text(data.failureFamily, data.ok === true ? 
"none" : state === "running" || state === "pending" ? "in-progress" : text(data.reason, "unknown")), logTail: text(data.logTail, result.stderr.slice(-2000)), }; @@ -1138,6 +1567,8 @@ function blockedStatus(plan: TargetPlan, profile: string): TargetStatus { goMiB: null, gitMirrorMiB: null, realDepsMiB: null, + currentStage: "-", + stageEvidence: [], failureFamily: plan.blocker ?? "blocked", logTail: plan.detail ?? "", };