feat: preserve k3s proxy stage evidence

This commit is contained in:
Codex
2026-06-27 06:09:42 +00:00
parent 05fb46d40a
commit aec638441d
5 changed files with 451 additions and 16 deletions
+4 -3
View File
@@ -31,11 +31,11 @@ bun scripts/cli.ts platform-infra egress-proxy k3s-build-benchmark --targets D60
bun scripts/cli.ts platform-infra egress-proxy k3s-build-benchmark \
--targets D601,D518 --profile real-deps-500m --dry-run
# 启动 benchmarkfire-and-forget之后用 status/logs/traffic 短轮询
# 启动 benchmarkfire-and-forget同时启动 stage recorder 持久化阶段流量证据
bun scripts/cli.ts platform-infra egress-proxy k3s-build-benchmark \
--targets D601,D518 --profile real-deps-500m --confirm
# 状态表:看 APK/NPM/GO/GIT_MIRROR/REAL_DEPS、failure family 和 proxyserver 采样。
# 状态表:看 APK/NPM/GO/GIT_MIRROR/REAL_DEPS、STAGE_PROXY、failure family 和可选即时采样。
bun scripts/cli.ts platform-infra egress-proxy k3s-build-benchmark status \
--targets D601,D518 --profile real-deps-500m --traffic-sample-seconds 15
@@ -52,7 +52,7 @@ bun scripts/cli.ts platform-infra egress-proxy k3s-build-benchmark cleanup \
--targets D601,D518 --profile real-deps-500m --confirm
```
D601/D518 结果必须分表记录:`STATE`、Job/run、duration、`APK/NPM/GO/GIT_MIRROR/REAL_DEPS``TRAFFIC_WINDOW``TRAFFIC_RATE``PROXY_CUM``TOP_CLIENT``TOP_DEST``FAILURE`。D518 通过不代表 D601 通过;D601 只证明 k3s/containerd 走到 proxy 也不等于性能达标。未完成 500MiB+ 的真实 k3s image pull + apk/npm/go/git mirror 测试前,不关闭对应 issue,不合并标记为等待运行面验收的 PR。
D601/D518 结果必须分表记录:`STATE`、Job/run、duration、`APK/NPM/GO/GIT_MIRROR/REAL_DEPS``STAGE_PROXY``STAGE_PROXY_EVIDENCE``TRAFFIC_WINDOW``TRAFFIC_RATE``PROXY_CUM``TOP_CLIENT``TOP_DEST``FAILURE`。D518 通过不代表 D601 通过;D601 只证明 k3s/containerd 走到 proxy 也不等于性能达标。未完成 500MiB+ 的真实 k3s image pull + apk/npm/go/git mirror 测试前,不关闭对应 issue,不合并标记为等待运行面验收的 PR。
## Egress Proxy 运行面修复入口
@@ -70,6 +70,7 @@ D601 若需要让 `sub2api-egress-proxy` 绕开 pod overlay,可在 YAML 中显
- `TOP_CLIENT=10.42.0.1``TOP_DEST=registry-1.docker.io:443`k3s/containerd image pull 已从 proxyserver 视角可见。
- `TOP_DEST=dl-cdn.alpinelinux.org:443`Pod 内 `apk` 阶段已走 proxy。
- `registry.npmjs.org:443``proxy.golang.org:443``github.com:443`:分别对应 `npm install`、Go module 拉取和 Git mirror clone/sync。
- `STAGE_PROXY_EVIDENCE` 是 Job annotation 中持久化的阶段证据;最终 `succeeded` 后仍应保留 apk/npm/go/git-mirror 各阶段 top destination、window bytes、max rate 和 proxy cumulative。`TRAFFIC_*` 列只是 status 命令即时采样。
- `image-pull` failure 表示还卡在 kubelet/containerd 拉镜像;`apk-download``npm-download``go-download``git-mirror` 分别表示 Pod 内依赖阶段失败。
- proxy 窗口 `0 B/s` 但 active cumulative 增长很慢时,按性能不达标处理;先清理 Job,再继续查上游,不要让慢速 benchmark 长时间占用资源。
@@ -7,6 +7,7 @@ metadata:
- 1032
- 1048
- 1077
- 1110
profiles:
real-deps-500m:
+2 -2
View File
@@ -165,11 +165,11 @@ For an externally backed active target, client traffic reaches PK01 Caddy, PK01
When target-level `egressProxy.enabled=true`, the D601 target renders an in-cluster HTTP/mixed proxy client from the proxy source declared in YAML. The current mature external-egress shape is `sourceType: master-shadowsocks`: master Docker runs `shadowsocks-rust` from `config/platform-infra/sub2api-master-egress-proxy.compose.yaml`, while D601 runs `sing-box` to expose the ClusterIP proxy consumed by Sub2API and, when requested by YAML, the Codex account sentinel. A subscription-backed source is still just another YAML-declared source type; long-term prose must not duplicate the current endpoint, port, password, image tag, or health URL values from YAML/compose.
`platform-infra egress-proxy traffic --target <id> --sample-seconds <n>` is the proxyserver-side observation entry. It reads the sing-box Clash API through the proxy Pod loopback, reports current per-client rate plus bounded-window cumulative bytes, and includes proxy process cumulative bytes when sing-box reports them. Use this together with k3s CI/CD build benchmarks when proving proxy acceleration or diagnosing whether a workload actually traverses the proxy; client-side timings alone are not enough evidence.
`platform-infra egress-proxy traffic --target <id> --sample-seconds <n>` is the proxyserver-side observation entry. It reads the sing-box Clash API through the proxy Pod loopback, reports current per-client rate plus bounded-window cumulative bytes, and includes proxy process cumulative bytes when sing-box reports them. Use this together with k3s CI/CD build benchmarks when diagnosing whether a workload is currently traversing the proxy; client-side timings alone are not enough evidence.
The egress proxy Deployment may opt into `hostNetwork: true` per target via `config/platform-infra/sub2api.yaml` `targets[].egressProxy.hostNetwork`. When enabled, the manifest renders `hostNetwork: true`, `dnsPolicy: ClusterFirstWithHostNet`, and a RollingUpdate strategy of `maxSurge=0`/`maxUnavailable=1` so the sing-box client bypasses the pod overlay and connects the master upstream directly from the node network; this is the durable fix for a target whose pod-overlay path to the upstream is the throughput bottleneck. It is a per-target YAML decision, not a D601-only default: a target whose pod overlay is already fast enough must keep `hostNetwork: false`, and the `no-host-network` policy check only permits `hostNetwork: true` on the single YAML-declared egress proxy Deployment for a target whose `egressProxy.hostNetwork=true`. Do not generalize one target's hostNetwork experiment to other nodes, and do not leave a one-off `kubectl patch` as the final state; promote or demote hostNetwork only by editing the target YAML and running `platform-infra sub2api apply --target <id>`.
`platform-infra egress-proxy k3s-build-benchmark --targets <ids> --profile real-deps-500m` is the production-ready egress proxy throughput acceptance entry. The `real-deps-500m` profile in `config/platform-infra/egress-proxy-benchmarks.yaml` is the only acceptance profile: it renders one Job per target whose kubelet/containerd pulls remote `alpine`, `node` and `golang` images with `imagePullPolicy: Always`, then runs Pod-internal `apk add`, `npm install`, `go mod download` and `git clone --mirror` plus `remote update --prune` stages through the YAML-declared proxy env. Acceptance requires `STATE=succeeded`, `REAL_DEPS >= 500 MiB` (the profile's `realDeps.minProxyMiB`), image-pull plus apk/npm/go/git-mirror evidence, and a proxyserver-observed cumulative traffic above the profile minimum. Cloudflare synthetic downloads and curl-only probes are bypass diagnostics, never acceptance evidence. Status/logs/traffic are short-polled; a started benchmark is fire-and-forget and must be `cleanup`-ed when it stalls or after acceptance to release k3s resources. D601 and D518 must both pass the same profile: a single node passing does not close a cross-node proxy issue, and an optimization on one target must not regress the other.
`platform-infra egress-proxy k3s-build-benchmark --targets <ids> --profile real-deps-500m` is the production-ready egress proxy throughput acceptance entry. The `real-deps-500m` profile in `config/platform-infra/egress-proxy-benchmarks.yaml` is the only acceptance profile: it renders one Job per target whose kubelet/containerd pulls remote `alpine`, `node` and `golang` images with `imagePullPolicy: Always`, then runs Pod-internal `apk add`, `npm install`, `go mod download` and `git clone --mirror` plus `remote update --prune` stages through the YAML-declared proxy env. Acceptance requires `STATE=succeeded`, `REAL_DEPS >= 500 MiB` (the profile's `realDeps.minProxyMiB`), image-pull plus apk/npm/go/git-mirror evidence, and proxyserver-observed cumulative traffic above the profile minimum. The command starts a short-lived stage recorder with the benchmark Job; status reads its `STAGE_PROXY_EVIDENCE` from Job annotations so final `succeeded` rows can still show each dependency stage's top destination, window bytes, max rate and proxy cumulative even after the stage has ended. `TRAFFIC_*` columns are immediate diagnostics from the current status call, not the durable acceptance source. Cloudflare synthetic downloads and curl-only probes are bypass diagnostics, never acceptance evidence. Status/logs/traffic are short-polled; a started benchmark is fire-and-forget and must be `cleanup`-ed when it stalls or after acceptance to release k3s resources. D601 and D518 must both pass the same profile: a single node passing does not close a cross-node proxy issue, and an optimization on one target must not regress the other.
`platform-infra sub2api validate --target D601 --full` must prove the proxy Deployment/Service is ready and that an app pod can complete the YAML-declared health probe through the proxy. This target-level injection does not by itself bind manually created Sub2API accounts to that proxy; account tests and account-specific upstream transports still need a YAML-declared `manualAccounts.protected[].proxyBinding` when the account must avoid direct egress. Proxy credentials, subscription contents, and generated proxy configs are Secret material and must not be printed. If a direct D601-to-upstream TLS/SNI path is reset, do not leave a one-off plain HTTP CONNECT or JS proxy as the durable fix; use a mature encrypted proxy source, currently master `shadowsocks-rust` plus D601 `sing-box`, through YAML/compose.
@@ -18,6 +18,8 @@ If the Kubernetes image pull stage fails, the benchmark result is not an applica
`platform-infra egress-proxy k3s-build-benchmark` remains the single coordinator. It reads targets from `config/platform-infra/sub2api.yaml`, reads profiles from `config/platform-infra/egress-proxy-benchmarks.yaml`, renders one Job per target, and uses `trans <target.route> sh -- ...` as the bounded control path.
For `k3s-real-deps`, start also launches a short-lived stage recorder on the same k3s route. The recorder samples the target proxyserver through the proxy Pod loopback, associates traffic with the benchmark Pod IP and current dependency stage, and writes compact stage evidence into the benchmark Job annotation. It exits when the Job completes, fails, disappears, or reaches the benchmark deadline.
The `real-deps-500m` profile renders a multi-stage Kubernetes Job:
- `initContainer/apk-add`: image `docker.io/library/alpine:3.20`.
@@ -30,7 +32,7 @@ All dependency init containers receive the YAML-declared `sub2api-egress-proxy`
## Observability
The source of truth for traffic is `platform-infra egress-proxy traffic --target <node> --sample-seconds N`. Benchmark status may include this traffic sample. The final evidence table must include proxyserver window bytes/rate/cumulative bytes, top client, and top destination.
The source of truth for durable benchmark evidence is the `STAGE_PROXY_EVIDENCE` table rendered by `k3s-build-benchmark status/logs` from Job annotations. `platform-infra egress-proxy traffic --target <node> --sample-seconds N` remains an immediate diagnostic for the current proxyserver window. The final evidence table must include proxyserver window bytes/rate/cumulative bytes, top client, and top destination per dependency stage.
For image pull traffic, the observed proxy client may be the node/k3s/containerd path rather than the benchmark Pod IP. For `apk`, `npm`, `go`, and `git-mirror` stages, the observed proxy client should correspond to the benchmark Pod network path. This distinction must be preserved in issue evidence.
@@ -59,6 +61,6 @@ This benchmark must not:
- `bun scripts/cli.ts platform-infra egress-proxy k3s-build-benchmark --targets D601,D518 --profile real-deps-500m --dry-run` prints both node plans and the remote image set.
- `--confirm` creates one unique Job per node and returns immediately.
- `status --traffic-sample-seconds 15` reports Job state, `image-pull`/`apk`/`npm`/`go`/`git-mirror` failure family when applicable, and proxyserver traffic columns.
- D601 and D518 both have final rows with target, profile, job, state, duration, apk MiB, npm MiB, go MiB, git mirror MiB, proxy traffic window/rate/cumulative, top client, top destination, and failure family.
- `status --traffic-sample-seconds 15` reports Job state, `image-pull`/`apk`/`npm`/`go`/`git-mirror` failure family when applicable, durable `STAGE_PROXY_EVIDENCE`, and optional immediate proxyserver traffic columns.
- D601 and D518 both have final rows with target, profile, job, state, duration, apk MiB, npm MiB, go MiB, git mirror MiB, `STAGE_PROXY`, stage proxy window/rate/cumulative, top client, top destination, and failure family.
- Acceptance requires at least 500 MiB of proxyserver-observed traffic per successful node run. If a node cannot reach that point because image pull fails, the issue remains open until the k3s/containerd image pull proxy path is fixed or a blocker is explicitly documented.
@@ -11,6 +11,8 @@ import { resolveTarget } from "./platform-infra/manifest";
const BENCHMARK_CONFIG_PATH = "config/platform-infra/egress-proxy-benchmarks.yaml";
const BENCHMARK_APP = "unidesk-k3s-build-benchmark";
const STAGE_PROXY_EVIDENCE_ANNOTATION = "unidesk.ai/stage-proxy-evidence";
const REAL_DEPS_STAGE_ORDER = ["apk", "npm", "go", "git-mirror"];
type K3sBuildAction = "start" | "status" | "logs" | "cleanup";
type ImagePullPolicy = "Always" | "IfNotPresent" | "Never";
@@ -120,11 +122,25 @@ interface TargetStatus {
goMiB: number | null;
gitMirrorMiB: number | null;
realDepsMiB: number | null;
currentStage: string;
stageEvidence: readonly StageProxyEvidence[];
failureFamily: string;
logTail: string;
traffic?: TrafficSummary;
}
interface StageProxyEvidence {
stage: string;
topDestination: string;
topClient: string;
windowBytes: number;
maxRateBps: number;
proxyCumulativeBytes: number;
firstObservedAt: string;
lastObservedAt: string;
samples: number;
}
interface TrafficSummary {
ok: boolean;
reason: string;
@@ -249,7 +265,9 @@ function startRowDetail(row: TargetPlan & { started: boolean; state: string; job
if (!row.ok) return `${row.blocker}: ${row.detail}`;
if (row.started) {
const replaced = record(row.result).replaced;
return replaced === undefined || replaced === 0 ? "status/logs/traffic" : `status/logs/traffic replaced=${replaced}`;
const recorder = text(record(row.result).stageRecorder, "");
const prefix = replaced === undefined || replaced === 0 ? "status/logs/traffic" : `status/logs/traffic replaced=${replaced}`;
return recorder === "" || recorder === "disabled" ? prefix : `${prefix} stage-recorder=${recorder}`;
}
const result = record(row.result);
const stderr = text(result.stderrPreview, "");
@@ -275,6 +293,7 @@ function statusBenchmarks(plans: readonly TargetPlan[], options: K3sBuildBenchma
status.targetId,
status.profile,
status.state,
status.currentStage,
status.jobName,
status.durationSeconds === null ? "-" : `${status.durationSeconds}s`,
status.outputMiB === null ? "-" : `${status.outputMiB}MiB`,
@@ -284,6 +303,7 @@ function statusBenchmarks(plans: readonly TargetPlan[], options: K3sBuildBenchma
status.goMiB === null ? "-" : `${status.goMiB}MiB`,
status.gitMirrorMiB === null ? "-" : `${status.gitMirrorMiB}MiB`,
status.realDepsMiB === null ? "-" : `${status.realDepsMiB}MiB`,
stageEvidenceSummary(status.stageEvidence),
status.traffic === undefined ? "-" : bytes(status.traffic.windowBytes),
status.traffic === undefined ? "-" : rate(status.traffic.rateBps),
status.traffic === undefined ? "-" : bytes(status.traffic.processTotalBytes),
@@ -301,14 +321,15 @@ function statusBenchmarks(plans: readonly TargetPlan[], options: K3sBuildBenchma
renderedText: [
"PLATFORM-INFRA K3S BUILD BENCHMARK STATUS",
"",
...table(["TARGET", "PROFILE", "STATE", "JOB", "DURATION", "OUTPUT", "DOWNLOAD", "APK", "NPM", "GO", "GIT_MIRROR", "REAL_DEPS", "TRAFFIC_WINDOW", "TRAFFIC_RATE", "PROXY_CUM", "TOP_CLIENT", "TOP_DEST", "FAILURE"], rows),
...table(["TARGET", "PROFILE", "STATE", "STAGE", "JOB", "DURATION", "OUTPUT", "DOWNLOAD", "APK", "NPM", "GO", "GIT_MIRROR", "REAL_DEPS", "STAGE_PROXY", "TRAFFIC_WINDOW", "TRAFFIC_RATE", "PROXY_CUM", "TOP_CLIENT", "TOP_DEST", "FAILURE"], rows),
...stageEvidenceSections(statuses),
...logSections,
"",
"NEXT",
` bun scripts/cli.ts platform-infra egress-proxy k3s-build-benchmark status --targets ${statuses.map((status) => status.targetId).join(",")} --profile ${options.profile} --traffic-sample-seconds 15`,
` bun scripts/cli.ts platform-infra egress-proxy k3s-build-benchmark logs --targets ${statuses.map((status) => status.targetId).join(",")} --profile ${options.profile}`,
"",
"Disclosure: traffic columns are proxyserver-side samples only when --traffic-sample-seconds is set; Secret/proxy values are redacted.",
"Disclosure: traffic columns are proxyserver-side samples only when --traffic-sample-seconds is set; STAGE_PROXY_EVIDENCE is persisted by the benchmark stage recorder in Job annotations; Secret/proxy values are redacted.",
].join("\n"),
};
}
@@ -669,10 +690,28 @@ if ! command -v git >/dev/null 2>&1; then
apk add --no-cache git ca-certificates
fi
git --version
git -c "http.proxy=\${HTTP_PROXY:-}" -c "https.proxy=\${HTTPS_PROXY:-}" -c http.lowSpeedLimit=1024 -c http.lowSpeedTime=120 clone --mirror --filter=blob:none "$GIT_MIRROR_REMOTE" /work/git-mirror/repo.git
git -c "http.proxy=\${HTTP_PROXY:-}" -c "https.proxy=\${HTTPS_PROXY:-}" --git-dir=/work/git-mirror/repo.git remote update --prune
git -c "http.proxy=\${HTTP_PROXY:-}" -c "https.proxy=\${HTTPS_PROXY:-}" --git-dir=/work/git-mirror/repo.git remote -v
git -c "http.proxy=\${HTTP_PROXY:-}" -c "https.proxy=\${HTTPS_PROXY:-}" ls-remote --heads "$GIT_MIRROR_REMOTE" >/work/git-mirror/ls-remote-heads.txt
git_proxy() {
git -c "http.proxy=\${HTTP_PROXY:-}" -c "https.proxy=\${HTTPS_PROXY:-}" -c http.version=HTTP/1.1 -c http.lowSpeedLimit=1024 -c http.lowSpeedTime=120 "$@"
}
attempt=1
git_ok=0
while [ "$attempt" -le 3 ]; do
rm -rf /work/git-mirror/repo.git /work/git-mirror/ls-remote-heads.txt
if git_proxy clone --mirror --filter=blob:none "$GIT_MIRROR_REMOTE" /work/git-mirror/repo.git \\
&& git_proxy --git-dir=/work/git-mirror/repo.git remote update --prune \\
&& git_proxy --git-dir=/work/git-mirror/repo.git remote -v \\
&& git_proxy ls-remote --heads "$GIT_MIRROR_REMOTE" >/work/git-mirror/ls-remote-heads.txt; then
git_ok=1
break
fi
echo "git-mirror-attempt-failed attempt=$attempt remote=$GIT_MIRROR_REMOTE" >&2
attempt=$((attempt + 1))
sleep $((attempt * 5))
done
if [ "$git_ok" != "1" ]; then
echo "git-mirror-failed-after-retries remote=$GIT_MIRROR_REMOTE" >&2
exit 44
fi
git_mib="$(du -sk /work/git-mirror/repo.git /work/git-mirror/ls-remote-heads.txt 2>/dev/null | awk '{s+=$1} END {printf "%d", int((s+1023)/1024)}')"
printf 'gitMirrorMiB=%s\\n' "$git_mib" > /work/stages/git-mirror.env
printf 'UNIDESK_K3S_REAL_DEPS_STAGE {"stage":"git-mirror","ok":true,"image":"%s","remote":"%s","mirrorMiB":%s}\\n' "$GIT_MIRROR_IMAGE" "$GIT_MIRROR_REMOTE" "$git_mib"
@@ -855,6 +894,9 @@ function benchmarkLabels(target: Sub2ApiTargetConfig, profile: K3sBuildBenchmark
function startScript(manifest: Record<string, unknown>, target: Sub2ApiTargetConfig, profile: K3sBuildBenchmarkProfile, runId: string, jobName: string): string {
const yaml = `${JSON.stringify(manifest, null, 2)}\n`;
const encoded = Buffer.from(yaml, "utf8").toString("base64");
const proxy = target.egressProxy;
const proxySelector = proxy === null ? "" : `app.kubernetes.io/name=${proxy.deploymentName},app.kubernetes.io/component=egress-proxy`;
const recorderEnabled = profile.workload === "k3s-real-deps" && proxy !== null;
return `
set -eu
tmp="$(mktemp -d)"
@@ -872,7 +914,289 @@ if [ "$replaced" != "0" ]; then
kubectl -n ${shQuote(target.namespace)} delete jobs -l "$selector" --ignore-not-found >/dev/null 2>&1
fi
kubectl apply -f "$manifest" >/dev/null
printf '{"ok":true,"jobName":"%s","namespace":"%s","target":"%s","runId":"%s","profile":"%s","replaced":%s}\\n' ${shQuote(jobName)} ${shQuote(target.namespace)} ${shQuote(target.id)} ${shQuote(runId)} ${shQuote(profile.id)} "$replaced"
stage_recorder="disabled"
recorder_pid=""
if [ ${recorderEnabled ? "1" : "0"} = 1 ]; then
recorder_dir="/tmp/unidesk-k3s-build-benchmark"
mkdir -p "$recorder_dir"
recorder_script="$recorder_dir/${jobName}.stage-recorder.py"
recorder_log="$recorder_dir/${jobName}.stage-recorder.log"
cat > "$recorder_script" <<'PY'
${stageProxyRecorderPython()}
PY
nohup python3 "$recorder_script" ${shQuote(target.namespace)} ${shQuote(jobName)} ${shQuote(proxySelector)} ${shQuote(String(profile.timeoutSeconds))} 1 >"$recorder_log" 2>&1 &
recorder_pid="$!"
stage_recorder="started"
fi
python3 - ${shQuote(jobName)} ${shQuote(target.namespace)} ${shQuote(target.id)} ${shQuote(runId)} ${shQuote(profile.id)} "$replaced" "$stage_recorder" "$recorder_pid" <<'PY'
import json, sys
job_name, namespace, target, run_id, profile, replaced, stage_recorder, recorder_pid = sys.argv[1:9]
print(json.dumps({
"ok": True,
"jobName": job_name,
"namespace": namespace,
"target": target,
"runId": run_id,
"profile": profile,
"replaced": int(replaced),
"stageRecorder": stage_recorder,
"stageRecorderPid": recorder_pid or None,
}, ensure_ascii=False))
PY
`;
}
function stageProxyRecorderPython(): string {
return String.raw`from collections import Counter, defaultdict
import json
import subprocess
import sys
import time
namespace, job_name, proxy_selector, timeout_raw, interval_raw = sys.argv[1:6]
deadline = time.time() + int(timeout_raw) + 180
interval = max(1, int(interval_raw))
annotation_key = "unidesk.ai/stage-proxy-evidence"
stage_by_container = {
"apk-add": "apk",
"npm-install": "npm",
"go-download": "go",
"git-mirror": "git-mirror",
}
container_by_stage = [
("apk-add", "apk"),
("npm-install", "npm"),
("go-download", "go"),
("git-mirror", "git-mirror"),
]
stage_order = ["apk", "npm", "go", "git-mirror"]
previous = {}
evidence = {}
last_stage = None
def utc_now():
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
def run(args, timeout=10):
return subprocess.run(["kubectl", "-n", namespace, *args], text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout)
def get_json(args, timeout=10):
result = run(args, timeout=timeout)
if result.returncode != 0:
return None
try:
return json.loads(result.stdout or "{}")
except Exception:
return None
def number(value):
try:
return int(value)
except Exception:
return 0
def field(record, names, default=None):
if not isinstance(record, dict):
return default
for name in names:
value = record.get(name)
if value not in (None, ""):
return value
return default
def metadata(conn):
value = conn.get("metadata") if isinstance(conn, dict) else None
return value if isinstance(value, dict) else {}
def client_of(conn):
meta = metadata(conn)
value = field(meta, ["sourceIP", "source_ip", "source", "clientIP", "client_ip", "srcIP", "src_ip"])
if value is None:
value = field(conn, ["sourceIP", "source_ip", "source"])
text = str(value or "unknown")
if text.count(":") == 1 and text.rsplit(":", 1)[1].isdigit():
text = text.rsplit(":", 1)[0]
return text
def destination_of(conn):
meta = metadata(conn)
host = field(meta, ["host", "destinationIP", "destination_ip", "destination", "dstIP", "dst_ip"])
port = field(meta, ["destinationPort", "destination_port", "dstPort", "dst_port"])
if host is None:
host = field(conn, ["host", "destination"])
if host is None:
return "-"
host_text = str(host)
return f"{host_text}:{port}" if port not in (None, "") else host_text
def conn_id(conn):
value = field(conn, ["id", "uuid", "connId", "connectionId"])
if value not in (None, ""):
return str(value)
meta = metadata(conn)
return "|".join([
client_of(conn),
str(field(meta, ["sourcePort", "source_port"], "")),
destination_of(conn),
str(field(meta, ["network", "type"], "")),
])
def transfer_total(conn):
upload = number(field(conn, ["upload", "up", "uploadBytes", "upload_bytes"], 0))
download = number(field(conn, ["download", "down", "downloadBytes", "download_bytes"], 0))
return upload + download
def process_total(data):
if not isinstance(data, dict):
return 0
upload = number(field(data, ["uploadTotal", "upload_total", "upload"], 0))
download = number(field(data, ["downloadTotal", "download_total", "download"], 0))
return upload + download
def current_stage_and_pod():
job = get_json(["get", "job", job_name, "-o", "json"])
if not isinstance(job, dict):
return None, None, True
status = job.get("status") or {}
done = number(status.get("succeeded")) > 0 or number(status.get("failed")) > 0
pods = get_json(["get", "pods", "-l", "job-name=" + job_name, "-o", "json"])
items = pods.get("items", []) if isinstance(pods, dict) else []
items.sort(key=lambda item: item.get("metadata", {}).get("creationTimestamp", ""))
if not items:
return None, None, done
pod = items[-1]
pod_ip = (pod.get("status") or {}).get("podIP")
raw_statuses = (pod.get("status") or {}).get("initContainerStatuses") or []
statuses = {item.get("name"): item for item in raw_statuses if isinstance(item, dict)}
for container_name, stage in container_by_stage:
item = statuses.get(container_name)
if item is None:
return stage, pod_ip, done
state = item.get("state") or {}
terminated = state.get("terminated") or {}
if terminated and number(terminated.get("exitCode")) == 0:
continue
return stage, pod_ip, done
return None, pod_ip, done
def proxy_connections():
pods = get_json(["get", "pod", "-l", proxy_selector, "-o", "json"])
items = pods.get("items", []) if isinstance(pods, dict) else []
if not items:
return None
items.sort(key=lambda item: item.get("metadata", {}).get("creationTimestamp", ""))
pod_name = items[-1].get("metadata", {}).get("name")
if not pod_name:
return None
result = run(["exec", pod_name, "--", "sh", "-c", "if command -v wget >/dev/null 2>&1; then wget -qO- http://127.0.0.1:9090/connections; elif command -v curl >/dev/null 2>&1; then curl -fsS --max-time 3 http://127.0.0.1:9090/connections; else exit 127; fi"], timeout=8)
if result.returncode != 0:
return None
try:
parsed = json.loads(result.stdout or "{}")
except Exception:
return None
connections = parsed.get("connections") if isinstance(parsed, dict) else None
return {"data": parsed, "connections": connections if isinstance(connections, list) else []}
def ensure_stage(stage):
if stage not in evidence:
evidence[stage] = {
"stage": stage,
"firstObservedAt": utc_now(),
"lastObservedAt": utc_now(),
"samples": 0,
"windowBytes": 0,
"maxRateBps": 0,
"proxyCumulativeBytes": 0,
"destBytes": Counter(),
"clientBytes": Counter(),
}
return evidence[stage]
def compact_evidence(state):
stages = {}
for stage in stage_order:
item = evidence.get(stage)
if not item:
continue
top_destinations = [{"destination": name, "bytes": count} for name, count in item["destBytes"].most_common(3)]
top_clients = [{"client": name, "bytes": count} for name, count in item["clientBytes"].most_common(2)]
stages[stage] = {
"stage": stage,
"firstObservedAt": item["firstObservedAt"],
"lastObservedAt": item["lastObservedAt"],
"samples": item["samples"],
"windowBytes": item["windowBytes"],
"maxRateBps": item["maxRateBps"],
"proxyCumulativeBytes": item["proxyCumulativeBytes"],
"topDestination": top_destinations[0]["destination"] if top_destinations else "-",
"topClient": top_clients[0]["client"] if top_clients else "-",
"topDestinations": top_destinations,
"topClients": top_clients,
}
return {"version": 1, "state": state, "updatedAt": utc_now(), "stages": stages}
def patch_evidence(state):
payload = json.dumps(compact_evidence(state), ensure_ascii=False, separators=(",", ":"), sort_keys=True)
result = run(["annotate", "job", job_name, f"{annotation_key}={payload}", "--overwrite"], timeout=8)
return result.returncode == 0
def sample_stage(stage, pod_ip, snapshot, elapsed):
global previous
current = {}
for conn in snapshot["connections"]:
current[conn_id(conn)] = {
"client": client_of(conn),
"destination": destination_of(conn),
"total": transfer_total(conn),
}
total_delta = 0
stage_item = ensure_stage(stage)
for cid, curr in current.items():
if pod_ip and curr["client"] != pod_ip:
continue
prev = previous.get(cid)
delta = max(0, curr["total"] - (prev["total"] if prev else curr["total"]))
if delta <= 0:
continue
total_delta += delta
stage_item["destBytes"][curr["destination"]] += delta
stage_item["clientBytes"][curr["client"]] += delta
if total_delta > 0:
stage_item["samples"] += 1
stage_item["lastObservedAt"] = utc_now()
stage_item["windowBytes"] += total_delta
stage_item["maxRateBps"] = max(stage_item["maxRateBps"], int(total_delta / max(elapsed, interval, 1)))
stage_item["proxyCumulativeBytes"] = process_total(snapshot["data"])
previous = current
last_time = time.time()
last_patch = 0
while time.time() < deadline:
stage, pod_ip, done = current_stage_and_pod()
snapshot = proxy_connections()
now = time.time()
elapsed = now - last_time
if stage != last_stage:
previous = {}
last_stage = stage
last_time = now
if done:
break
time.sleep(interval)
continue
if stage and snapshot:
sample_stage(stage, pod_ip, snapshot, elapsed)
last_time = now
if evidence and now - last_patch >= max(interval, 3):
patch_evidence("running" if not done else "completed")
last_patch = now
if done:
break
time.sleep(interval)
if evidence:
patch_evidence("completed")
`;
}
@@ -920,20 +1244,37 @@ meta = job.get("metadata", {})
status = job.get("status", {})
job_name = meta.get("name") or "-"
labels = meta.get("labels", {})
annotations = meta.get("annotations", {}) or {}
pods_result = kubectl(["get", "pods", "-l", "job-name=" + job_name, "-o", "json"])
pods = json.loads(pods_result.stdout or "{}").get("items", []) if pods_result.returncode == 0 else []
pods.sort(key=lambda item: item.get("metadata", {}).get("creationTimestamp", ""))
pod_name = pods[-1].get("metadata", {}).get("name") if pods else None
waiting_reasons = []
container_names = []
stage_by_container = {"apk-add": "apk", "npm-install": "npm", "go-download": "go", "git-mirror": "git-mirror"}
container_by_stage = [("apk-add", "apk"), ("npm-install", "npm"), ("go-download", "go"), ("git-mirror", "git-mirror")]
current_stage = "-"
if pods:
pod_status = pods[-1].get("status", {}) or {}
status_groups = []
status_groups.extend((pod_status.get("initContainerStatuses") or []))
status_groups.extend((pod_status.get("containerStatuses") or []))
init_statuses = {item.get("name"): item for item in (pod_status.get("initContainerStatuses") or []) if isinstance(item, dict)}
for container_name, stage_name in container_by_stage:
item = init_statuses.get(container_name)
if item is None:
current_stage = stage_name
break
state_record = item.get("state") or {}
terminated = state_record.get("terminated") or {}
if terminated and int(terminated.get("exitCode") or 0) == 0:
continue
current_stage = stage_name
break
for container_status in status_groups:
container_name = container_status.get("name") or "container"
container_names.append(container_name)
stage_name = stage_by_container.get(container_name)
image_name = container_status.get("image") or "-"
state_record = container_status.get("state") or {}
waiting = (state_record.get("waiting") or {})
@@ -977,6 +1318,36 @@ for line in reversed(full_logs.splitlines()):
except Exception:
match = None
break
def parse_stage_evidence(raw):
if not raw:
return []
try:
parsed = json.loads(raw)
except Exception:
return []
stages = parsed.get("stages") if isinstance(parsed, dict) else None
if not isinstance(stages, dict):
return []
order = ["apk", "npm", "go", "git-mirror"]
rows = []
for stage in order:
item = stages.get(stage)
if not isinstance(item, dict):
continue
rows.append({
"stage": stage,
"topDestination": item.get("topDestination") or "-",
"topClient": item.get("topClient") or "-",
"windowBytes": int(item.get("windowBytes") or 0),
"maxRateBps": int(item.get("maxRateBps") or 0),
"proxyCumulativeBytes": int(item.get("proxyCumulativeBytes") or 0),
"firstObservedAt": item.get("firstObservedAt") or "-",
"lastObservedAt": item.get("lastObservedAt") or "-",
"samples": int(item.get("samples") or 0),
})
return rows
conditions = status.get("conditions") or []
failed = any(item.get("type") == "Failed" and item.get("status") == "True" for item in conditions)
succeeded = status.get("succeeded", 0) > 0
@@ -1017,9 +1388,11 @@ payload = {
"jobName": job_name,
"runId": labels.get("unidesk.ai/run-id") or "-",
"profile": labels.get("unidesk.ai/benchmark-profile") or "-",
"currentStage": current_stage,
"startedAt": status.get("startTime") or (match or {}).get("startedAt"),
"completedAt": status.get("completionTime") or (match or {}).get("completedAt"),
"result": match,
"stageEvidence": parse_stage_evidence(annotations.get(${shQuote(STAGE_PROXY_EVIDENCE_ANNOTATION)})),
"failureFamily": failure_family,
"logTail": "\\n".join(waiting_reasons + [logs])[-4000:],
}
@@ -1066,6 +1439,58 @@ function trafficSpec(target: Sub2ApiTargetConfig): EgressProxyTrafficSpec {
};
}
function stageEvidenceList(value: unknown): StageProxyEvidence[] {
return arrayRecords(value).map((item) => ({
stage: text(item.stage),
topDestination: text(item.topDestination),
topClient: text(item.topClient),
windowBytes: number(item.windowBytes),
maxRateBps: number(item.maxRateBps),
proxyCumulativeBytes: number(item.proxyCumulativeBytes),
firstObservedAt: text(item.firstObservedAt),
lastObservedAt: text(item.lastObservedAt),
samples: number(item.samples),
})).filter((item) => item.stage !== "-");
}
function stageEvidenceSummary(items: readonly StageProxyEvidence[]): string {
if (items.length === 0) return "-";
const byStage = new Map(items.map((item) => [item.stage, item]));
const visible = REAL_DEPS_STAGE_ORDER
.map((stage) => byStage.get(stage))
.filter((item): item is StageProxyEvidence => item !== undefined);
const git = byStage.get("git-mirror");
const suffix = git === undefined ? "" : ` git=${compactDestination(git.topDestination)} ${bytes(git.windowBytes)}`;
return `${visible.length}/${REAL_DEPS_STAGE_ORDER.length}${suffix}`;
}
function stageEvidenceSections(statuses: readonly TargetStatus[]): string[] {
const rows = statuses.flatMap((status) => status.stageEvidence.map((item) => [
status.targetId,
item.stage,
item.topDestination,
item.topClient,
bytes(item.windowBytes),
rate(item.maxRateBps),
bytes(item.proxyCumulativeBytes),
String(item.samples),
item.lastObservedAt,
]));
if (rows.length === 0) return [];
return [
"",
"STAGE_PROXY_EVIDENCE",
...table(["TARGET", "STAGE", "TOP_DEST", "TOP_CLIENT", "WINDOW", "MAX_RATE", "PROXY_CUM", "SAMPLES", "LAST_OBSERVED"], rows),
];
}
function compactDestination(value: string): string {
if (value === "-" || value.length <= 28) return value;
const withoutPort = value.replace(/:443$/u, "");
if (withoutPort.length <= 28) return withoutPort;
return `${withoutPort.slice(0, 25)}...`;
}
function normalizeStatus(plan: TargetPlan, parsed: unknown, result: CommandResult): TargetStatus {
if (typeof parsed !== "object" || parsed === null) {
const state = result.exitCode === 0 ? "transport-unparseable" : "transport-failed";
@@ -1087,6 +1512,8 @@ function normalizeStatus(plan: TargetPlan, parsed: unknown, result: CommandResul
goMiB: null,
gitMirrorMiB: null,
realDepsMiB: null,
currentStage: "-",
stageEvidence: [],
failureFamily: result.timedOut ? "transport-timeout" : state,
logTail: (result.stderr || result.stdout).slice(-4000),
};
@@ -1113,6 +1540,8 @@ function normalizeStatus(plan: TargetPlan, parsed: unknown, result: CommandResul
goMiB: nullableNumber(jobResult.goMiB),
gitMirrorMiB: nullableNumber(jobResult.gitMirrorMiB),
realDepsMiB: nullableNumber(jobResult.realDepsMiB),
currentStage: text(data.currentStage),
stageEvidence: stageEvidenceList(data.stageEvidence),
failureFamily: text(data.failureFamily, data.ok === true ? "none" : state === "running" || state === "pending" ? "in-progress" : text(data.reason, "unknown")),
logTail: text(data.logTail, result.stderr.slice(-2000)),
};
@@ -1138,6 +1567,8 @@ function blockedStatus(plan: TargetPlan, profile: string): TargetStatus {
goMiB: null,
gitMirrorMiB: null,
realDepsMiB: null,
currentStage: "-",
stageEvidence: [],
failureFamily: plan.blocker ?? "blocked",
logTail: plan.detail ?? "",
};