feat: add egress proxy traffic sampling

This commit is contained in:
Codex
2026-06-26 12:11:38 +00:00
parent 8dd1e336aa
commit a75c2da56b
7 changed files with 496 additions and 5 deletions
@@ -118,7 +118,7 @@ bun scripts/cli.ts hwlab nodes control-plane runtime-image preload --node G14 --
G14 v0.3 的 Tekton/BuildKit base image 也走 `config/hwlab-node-lanes.yaml``baseImageSource` 是公开来源,`baseImage` 是 node-local registry 目标。缺失 base image 时先用 `runtime-image status` 判断 `registryTagPresent`,再用 `preload --confirm` seed;不要手工 `docker tag/push``trigger-current` 后若 PipelineRun 已越过 base image 阶段但卡在某个 service build task,按 TaskRun 单独提 issue/修复,不把它并回 base-image preload 问题。长期边界见 `docs/reference/g14.md`
D601/v03 env-reuse service build task 失败时,先看 `build-<service>` TaskRun 的 `step-publish` 日志;Debian apt、npm、Go module 等外部依赖下载通过 lane YAML 注入 egress proxy 后可能出现 502、reset 或超时。先用 `platform-infra sub2api status|validate` 区分 proxy 整体故障和单个上游 transientproxy 健康但单次下载 transient 时可以受控 `trigger-current --rerun`,重复失败应修 HWLAB `scripts/artifact-publish.mjs` / envRecipe 的有限 retry 后合并发布,不手工 patch pod 或裸删 PipelineRun。若 Pod 内 unset `HTTP_PROXY/HTTPS_PROXY/ALL_PROXY` 后外部 registry/DNS 不可达,说明该 lane 的外部依赖下载依赖 egress proxy;此时 npm/Bun retry 只能降噪,根因仍是 proxy upstream 或 catalog/plan 误触发的过量 build。
D601/v03 env-reuse service build task 失败时,先看 `build-<service>` TaskRun 的 `step-publish` 日志;Debian apt、npm、Go module 等外部依赖下载通过 lane YAML 注入 egress proxy 后可能出现 502、reset 或超时。先用 `platform-infra sub2api status|validate` 区分 proxy 整体故障和单个上游 transientproxy 健康但单次下载 transient 时可以受控 `trigger-current --rerun`,重复失败应修 HWLAB `scripts/artifact-publish.mjs` / envRecipe 的有限 retry 后合并发布,不手工 patch pod 或裸删 PipelineRun。若 Pod 内 unset `HTTP_PROXY/HTTPS_PROXY/ALL_PROXY` 后外部 registry/DNS 不可达,说明该 lane 的外部依赖下载依赖 egress proxy;此时 npm/Bun retry 只能降噪,根因仍是 proxy upstream 或 catalog/plan 误触发的过量 build。凡是为证明 proxy 加速 CI/CD 而跑测速,必须同时采集 `platform-infra egress-proxy traffic --target <id>` 的 proxyserver 侧每客户端速率和窗口累计流量;只贴 PipelineRun 总耗时或 client-side benchmark 不能证明 workload 确实走了 proxy。
---
+2
View File
@@ -165,6 +165,8 @@ For an externally backed active target, client traffic reaches PK01 Caddy, PK01
When target-level `egressProxy.enabled=true`, the D601 target renders an in-cluster HTTP/mixed proxy client from the proxy source declared in YAML. The current mature external-egress shape is `sourceType: master-shadowsocks`: master Docker runs `shadowsocks-rust` from `config/platform-infra/sub2api-master-egress-proxy.compose.yaml`, while D601 runs `sing-box` to expose the ClusterIP proxy consumed by Sub2API and, when requested by YAML, the Codex account sentinel. A subscription-backed source is still just another YAML-declared source type; long-term prose must not duplicate the current endpoint, port, password, image tag, or health URL values from YAML/compose.
`platform-infra egress-proxy traffic --target <id> --sample-seconds <n>` is the proxyserver-side observation entry. It reads the sing-box Clash API through the proxy Pod loopback, reports current per-client rate plus bounded-window cumulative bytes, and includes proxy process cumulative bytes when sing-box reports them. Use this together with k3s CI/CD build benchmarks when proving proxy acceleration or diagnosing whether a workload actually traverses the proxy; client-side timings alone are not enough evidence.
`platform-infra sub2api validate --target D601 --full` must prove the proxy Deployment/Service is ready and that an app pod can complete the YAML-declared health probe through the proxy. This target-level injection does not by itself bind manually created Sub2API accounts to that proxy; account tests and account-specific upstream transports still need a YAML-declared `manualAccounts.protected[].proxyBinding` when the account must avoid direct egress. Proxy credentials, subscription contents, and generated proxy configs are Secret material and must not be printed. If a direct D601-to-upstream TLS/SNI path is reset, do not leave a one-off plain HTTP CONNECT or JS proxy as the durable fix; use a mature encrypted proxy source, currently master `shadowsocks-rust` plus D601 `sing-box`, through YAML/compose.
Adding, removing, exposing, validating, and configuring local Codex consumers are daily operations covered by `$unidesk-sub2api`. The development rule is that ordinary pool membership changes stay YAML-only and do not add code or CI/CD. Code changes are only appropriate when UniDesk needs to render or validate a Sub2API capability that already exists upstream, such as account-level WebSocket mode or per-account upstream User-Agent. If Sub2API itself does not support a desired behavior, do not magic-patch it through UniDesk scripts, Kubernetes hotfixes, local forks, or hidden compatibility paths; either leave the behavior unsupported or pursue it upstream as an explicit Sub2API feature.
+322
View File
@@ -0,0 +1,322 @@
import type { CommandResult } from "./command";
export interface EgressProxyTrafficSpec {
scope: "platform-infra";
targetId: string;
route: string;
namespace: string;
deploymentName: string;
serviceName: string;
port: number;
sourceType: string;
sourceRef: string;
sourceConfigRef: string | null;
sourceFingerprint: string | null;
}
export function egressProxyTrafficScript(spec: EgressProxyTrafficSpec, sampleSeconds: number): string {
return `
set +e
namespace=${shQuote(spec.namespace)}
deployment=${shQuote(spec.deploymentName)}
service_name=${shQuote(spec.serviceName)}
service_port=${shQuote(String(spec.port))}
target_id=${shQuote(spec.targetId)}
sample_seconds=${shQuote(String(sampleSeconds))}
source_type=${shQuote(spec.sourceType)}
source_ref=${shQuote(spec.sourceRef)}
source_config_ref=${shQuote(spec.sourceConfigRef ?? "")}
source_fingerprint=${shQuote(spec.sourceFingerprint ?? "")}
selector="app.kubernetes.io/name=$deployment,app.kubernetes.io/component=egress-proxy"
pod="$(kubectl -n "$namespace" get pod -l "$selector" -o jsonpath='{.items[0].metadata.name}' 2>/dev/null)"
if [ -z "$pod" ]; then
python3 - "$target_id" "$namespace" "$deployment" "$service_name" "$service_port" "$source_type" "$source_ref" "$source_config_ref" "$source_fingerprint" <<'PY'
import json, sys
target, namespace, deployment, service_name, service_port, source_type, source_ref, source_config_ref, source_fingerprint = sys.argv[1:10]
print(json.dumps({
"ok": False,
"reason": "egress-proxy-pod-missing",
"target": target,
"namespace": namespace,
"deployment": deployment,
"serviceName": service_name,
"servicePort": int(service_port),
"source": {"sourceType": source_type, "sourceRef": source_ref, "sourceConfigRef": source_config_ref or None, "sourceFingerprint": source_fingerprint or None, "valuesPrinted": False},
"next": {"status": f"bun scripts/cli.ts platform-infra sub2api status --target {target} --full"},
}, ensure_ascii=False))
PY
exit 0
fi
tmp="$(mktemp -d)"
trap 'rm -rf "$tmp"' EXIT
fetch_one() {
path="$1"
kubectl -n "$namespace" exec "$pod" -- sh -c 'if command -v wget >/dev/null 2>&1; then wget -qO- http://127.0.0.1:9090/connections; elif command -v curl >/dev/null 2>&1; then curl -fsS --max-time 3 http://127.0.0.1:9090/connections; else exit 127; fi' >"$path" 2>"$path.err"
return "$?"
}
idx=0
while [ "$idx" -le "$sample_seconds" ]; do
snapshot="$tmp/snapshot-$idx.json"
date +%s%3N >"$tmp/snapshot-$idx.ms" 2>/dev/null || python3 - <<'PY' >"$tmp/snapshot-$idx.ms"
import time
print(int(time.time() * 1000))
PY
fetch_one "$snapshot"
printf '%s' "$?" >"$tmp/snapshot-$idx.rc"
if [ "$idx" -lt "$sample_seconds" ]; then sleep 1; fi
idx=$((idx + 1))
done
python3 - "$tmp" "$target_id" "$namespace" "$deployment" "$pod" "$service_name" "$service_port" "$sample_seconds" "$source_type" "$source_ref" "$source_config_ref" "$source_fingerprint" <<'PY'
from collections import Counter, defaultdict
import glob
import json
import pathlib
import sys
tmp = pathlib.Path(sys.argv[1])
target, namespace, deployment, pod, service_name, service_port = sys.argv[2:8]
sample_seconds = int(sys.argv[8])
source_type, source_ref, source_config_ref, source_fingerprint = sys.argv[9:13]
def read_text(path, limit=2000):
try:
return pathlib.Path(path).read_text(errors="replace")[-limit:]
except FileNotFoundError:
return ""
def number(value):
try:
return int(value)
except Exception:
return 0
def field(record, names, default=None):
if not isinstance(record, dict):
return default
for name in names:
value = record.get(name)
if value not in (None, ""):
return value
return default
def metadata(conn):
value = conn.get("metadata") if isinstance(conn, dict) else None
return value if isinstance(value, dict) else {}
def client_of(conn):
meta = metadata(conn)
value = field(meta, ["sourceIP", "source_ip", "source", "clientIP", "client_ip", "srcIP", "src_ip"])
if value is None:
value = field(conn, ["sourceIP", "source_ip", "source"])
text = str(value or "unknown")
if text.count(":") == 1 and text.rsplit(":", 1)[1].isdigit():
text = text.rsplit(":", 1)[0]
return text
def destination_of(conn):
meta = metadata(conn)
host = field(meta, ["host", "destinationIP", "destination_ip", "destination", "dstIP", "dst_ip"])
port = field(meta, ["destinationPort", "destination_port", "dstPort", "dst_port"])
if host is None:
host = field(conn, ["host", "destination"])
if host is None:
return "-"
host_text = str(host)
return f"{host_text}:{port}" if port not in (None, "") else host_text
def conn_id(conn):
value = field(conn, ["id", "uuid", "connId", "connectionId"])
if value not in (None, ""):
return str(value)
meta = metadata(conn)
return "|".join([
client_of(conn),
str(field(meta, ["sourcePort", "source_port"], "")),
destination_of(conn),
str(field(meta, ["network", "type"], "")),
])
def upload_of(conn):
return number(field(conn, ["upload", "up", "uploadBytes", "upload_bytes"], 0))
def download_of(conn):
return number(field(conn, ["download", "down", "downloadBytes", "download_bytes"], 0))
def connections(data):
items = data.get("connections") if isinstance(data, dict) else None
return items if isinstance(items, list) else []
snapshots = []
fetch_failures = []
for json_path in sorted(glob.glob(str(tmp / "snapshot-*.json")), key=lambda path: int(path.rsplit("-", 1)[1].split(".", 1)[0])):
index = int(json_path.rsplit("-", 1)[1].split(".", 1)[0])
rc = read_text(tmp / f"snapshot-{index}.rc", 200).strip()
raw = read_text(json_path, 8_000)
ms = number(read_text(tmp / f"snapshot-{index}.ms", 200).strip())
if rc != "0":
fetch_failures.append({"index": index, "exitCode": number(rc), "stderrTail": read_text(f"{json_path}.err", 1200)})
continue
try:
parsed = json.loads(raw)
except Exception as exc:
fetch_failures.append({"index": index, "exitCode": 0, "parseError": str(exc), "stdoutTail": raw[-1200:]})
continue
if isinstance(parsed, dict):
snapshots.append({"index": index, "ms": ms, "data": parsed, "connections": connections(parsed)})
if not snapshots:
print(json.dumps({
"ok": False,
"reason": "clash-api-unavailable",
"target": target,
"namespace": namespace,
"deployment": deployment,
"pod": pod,
"controller": "127.0.0.1:9090",
"fetchFailures": fetch_failures[-3:],
"source": {"sourceType": source_type, "sourceRef": source_ref, "sourceConfigRef": source_config_ref or None, "sourceFingerprint": source_fingerprint or None, "valuesPrinted": False},
"next": {"apply": f"bun scripts/cli.ts platform-infra sub2api apply --target {target} --confirm --wait"},
}, ensure_ascii=False))
sys.exit(0)
first, last = snapshots[0], snapshots[-1]
duration = max(0.001, (last["ms"] - first["ms"]) / 1000.0)
clients = defaultdict(lambda: {
"client": "",
"windowUploadBytes": 0,
"windowDownloadBytes": 0,
"activeUploadBytes": 0,
"activeDownloadBytes": 0,
"activeConnections": 0,
"destinations": Counter(),
})
previous = {}
for snap_index, snap in enumerate(snapshots):
current = {}
for conn in snap["connections"]:
cid = conn_id(conn)
current[cid] = {
"client": client_of(conn),
"destination": destination_of(conn),
"upload": upload_of(conn),
"download": download_of(conn),
}
if snap_index > 0:
for cid, curr in current.items():
client = curr["client"]
clients[client]["client"] = client
prev = previous.get(cid)
if prev is None:
du = max(0, curr["upload"])
dd = max(0, curr["download"])
else:
du = max(0, curr["upload"] - prev["upload"])
dd = max(0, curr["download"] - prev["download"])
clients[client]["windowUploadBytes"] += du
clients[client]["windowDownloadBytes"] += dd
if curr["destination"] != "-":
clients[client]["destinations"][curr["destination"]] += 1
previous = current
for conn in last["connections"]:
client = client_of(conn)
clients[client]["client"] = client
clients[client]["activeConnections"] += 1
clients[client]["activeUploadBytes"] += upload_of(conn)
clients[client]["activeDownloadBytes"] += download_of(conn)
dest = destination_of(conn)
if dest != "-":
clients[client]["destinations"][dest] += 1
def top_level_total(data, names):
return number(field(data, names, 0))
first_data, last_data = first["data"], last["data"]
process_upload_first = top_level_total(first_data, ["uploadTotal", "upload_total", "upload"])
process_download_first = top_level_total(first_data, ["downloadTotal", "download_total", "download"])
process_upload_last = top_level_total(last_data, ["uploadTotal", "upload_total", "upload"])
process_download_last = top_level_total(last_data, ["downloadTotal", "download_total", "download"])
rows = []
for client, values in clients.items():
upload = values["windowUploadBytes"]
download = values["windowDownloadBytes"]
active_upload = values["activeUploadBytes"]
active_download = values["activeDownloadBytes"]
rows.append({
"client": client,
"activeConnections": values["activeConnections"],
"windowUploadBytes": upload,
"windowDownloadBytes": download,
"windowTotalBytes": upload + download,
"uploadBps": upload / duration,
"downloadBps": download / duration,
"totalBps": (upload + download) / duration,
"activeUploadBytes": active_upload,
"activeDownloadBytes": active_download,
"activeTotalBytes": active_upload + active_download,
"topDestinations": [{"destination": name, "samples": count} for name, count in values["destinations"].most_common(5)],
})
rows.sort(key=lambda item: (item["totalBps"], item["windowTotalBytes"], item["activeTotalBytes"]), reverse=True)
window_upload = sum(item["windowUploadBytes"] for item in rows)
window_download = sum(item["windowDownloadBytes"] for item in rows)
active_upload = sum(item["activeUploadBytes"] for item in rows)
active_download = sum(item["activeDownloadBytes"] for item in rows)
process_window_upload = max(0, process_upload_last - process_upload_first)
process_window_download = max(0, process_download_last - process_download_first)
payload = {
"ok": True,
"target": target,
"namespace": namespace,
"deployment": deployment,
"pod": pod,
"serviceName": service_name,
"servicePort": int(service_port),
"controller": "127.0.0.1:9090",
"sampleSecondsRequested": sample_seconds,
"sampleSecondsActual": duration,
"snapshots": len(snapshots),
"fetchFailures": fetch_failures[-3:],
"source": {"sourceType": source_type, "sourceRef": source_ref, "sourceConfigRef": source_config_ref or None, "sourceFingerprint": source_fingerprint or None, "valuesPrinted": False},
"totals": {
"activeConnections": len(last["connections"]),
"clientWindowUploadBytes": window_upload,
"clientWindowDownloadBytes": window_download,
"clientWindowTotalBytes": window_upload + window_download,
"clientUploadBps": window_upload / duration,
"clientDownloadBps": window_download / duration,
"clientTotalBps": (window_upload + window_download) / duration,
"clientActiveUploadBytes": active_upload,
"clientActiveDownloadBytes": active_download,
"clientActiveTotalBytes": active_upload + active_download,
"processUploadTotalBytes": process_upload_last,
"processDownloadTotalBytes": process_download_last,
"processTotalBytes": process_upload_last + process_download_last,
"processWindowUploadBytes": process_window_upload,
"processWindowDownloadBytes": process_window_download,
"processWindowTotalBytes": process_window_upload + process_window_download,
"processWindowBps": (process_window_upload + process_window_download) / duration,
},
"clients": rows,
"disclosure": "Per-client cumulative bytes are measured over this bounded sample window from proxy API connection counters; proxy process totals are included when the API reports them.",
}
print(json.dumps(payload, ensure_ascii=False))
PY
`;
}
export function egressProxyTrafficCompactResult(result: CommandResult): Record<string, unknown> {
return {
exitCode: result.exitCode,
stdoutBytes: Buffer.byteLength(result.stdout, "utf8"),
stderrBytes: Buffer.byteLength(result.stderr, "utf8"),
stdoutTail: result.stdout.slice(-2000),
stderrTail: result.stderr.slice(-2000),
timedOut: result.timedOut,
};
}
function shQuote(value: string): string {
return `'${value.replaceAll("'", "'\"'\"'")}'`;
}
+1
View File
@@ -656,6 +656,7 @@ function platformInfraHelpSummary(): unknown {
"bun scripts/cli.ts platform-infra sub2api codex-pool sentinel-report",
"bun scripts/cli.ts platform-infra egress-proxy benchmark --target D601 --profile no-mirror --confirm",
"bun scripts/cli.ts platform-infra egress-proxy benchmark-status --target D601 --profile no-mirror",
"bun scripts/cli.ts platform-infra egress-proxy traffic --target D601 --sample-seconds 15",
"bun scripts/cli.ts platform-infra langbot plan",
"bun scripts/cli.ts platform-infra langbot apply --confirm",
"bun scripts/cli.ts platform-infra langbot status",
+164 -4
View File
@@ -3,10 +3,12 @@ import { rootPath } from "./config";
import { runCommand } from "./command";
import type { RenderedCliResult } from "./output";
import { egressBenchmarkCompactResult, egressBenchmarkDryRun, egressBenchmarkStartScript, egressBenchmarkStatusScript, type EgressBenchmarkSpec } from "./egress-proxy-benchmark";
import { egressProxyTrafficCompactResult, egressProxyTrafficScript, type EgressProxyTrafficSpec } from "./egress-proxy-traffic";
import { readSub2ApiConfig } from "./platform-infra/config";
import { resolveTarget } from "./platform-infra/manifest";
type BenchmarkAction = "benchmark" | "benchmark-status" | "benchmark-logs";
type EgressProxyAction = BenchmarkAction | "traffic";
interface BenchmarkOptions {
action: BenchmarkAction;
@@ -20,8 +22,22 @@ interface BenchmarkOptions {
tailLines: number;
}
interface TrafficOptions {
action: "traffic";
targetId: string;
sampleSeconds: number;
timeoutSeconds: number;
limit: number;
}
type EgressProxyOptions = BenchmarkOptions | TrafficOptions;
export async function runPlatformInfraEgressProxyCommand(_config: UniDeskConfig, args: string[]): Promise<Record<string, unknown> | RenderedCliResult> {
const options = parseBenchmarkOptions(args);
const options = parseEgressProxyOptions(args);
if (options.action === "traffic") {
const spec = platformTrafficSpec(options);
return proxyTraffic(spec, options);
}
const spec = platformBenchmarkSpec(options);
if (options.action === "benchmark-status" || options.action === "benchmark-logs") return benchmarkStatus(spec, options);
if (options.dryRun) {
@@ -48,6 +64,27 @@ export async function runPlatformInfraEgressProxyCommand(_config: UniDeskConfig,
});
}
function proxyTraffic(spec: EgressProxyTrafficSpec, options: TrafficOptions): RenderedCliResult {
const result = runTrans(spec.route, egressProxyTrafficScript(spec, options.sampleSeconds), options.timeoutSeconds);
const parsed = parseJson(result.stdout);
const data = typeof parsed === "object" && parsed !== null
? parsed as Record<string, unknown>
: { ok: false, reason: "traffic-output-unparseable", stdoutPreview: result.stdout.slice(0, 2000) };
return renderTraffic({
...data,
ok: result.exitCode === 0 && data.ok !== false,
command: "platform-infra egress-proxy traffic",
mode: "traffic",
target: spec.targetId,
namespace: spec.namespace,
serviceName: spec.serviceName,
servicePort: spec.port,
sampleSecondsRequested: options.sampleSeconds,
limit: options.limit,
result: egressProxyTrafficCompactResult(result),
});
}
function benchmarkStatus(spec: EgressBenchmarkSpec, options: BenchmarkOptions): RenderedCliResult {
const result = runTrans(spec.route, egressBenchmarkStatusScript(spec, options.tailLines), options.timeoutSeconds);
const parsed = parseJson(result.stdout);
@@ -88,14 +125,44 @@ function platformBenchmarkSpec(options: BenchmarkOptions): EgressBenchmarkSpec {
};
}
function parseBenchmarkOptions(args: string[]): BenchmarkOptions {
function platformTrafficSpec(options: TrafficOptions): EgressProxyTrafficSpec {
const sub2api = readSub2ApiConfig();
const target = resolveTarget(sub2api, options.targetId);
const proxy = target.egressProxy;
if (proxy === null || !proxy.enabled) throw new Error(`target ${target.id} has no enabled egressProxy`);
return {
scope: "platform-infra",
targetId: target.id,
route: target.route,
namespace: target.namespace,
deploymentName: proxy.deploymentName,
serviceName: proxy.serviceName,
port: proxy.listenPort,
sourceType: proxy.sourceType,
sourceRef: proxy.sourceRef,
sourceConfigRef: proxy.sourceConfigRef,
sourceFingerprint: proxy.sourceFingerprint,
};
}
function parseEgressProxyOptions(args: string[]): EgressProxyOptions {
const actionRaw = args[0] ?? "benchmark";
if (actionRaw !== "benchmark" && actionRaw !== "benchmark-status" && actionRaw !== "benchmark-logs") {
throw new Error("platform-infra egress-proxy usage: benchmark|benchmark-status|benchmark-logs --target D601|D518 --profile no-mirror [--dry-run|--confirm]");
if (!isEgressProxyAction(actionRaw)) {
throw new Error("platform-infra egress-proxy usage: benchmark|benchmark-status|benchmark-logs|traffic --target D601|D518 [--profile no-mirror] [--sample-seconds N]");
}
const action = actionRaw;
const rest = args.slice(1);
const targetId = option(rest, "--target") ?? "D601";
if (action === "traffic") {
const sampleSeconds = positiveIntOption(rest, "--sample-seconds", 5, 45);
return {
action,
targetId,
sampleSeconds,
timeoutSeconds: positiveIntOption(rest, "--timeout-seconds", Math.min(60, sampleSeconds + 20), 60),
limit: positiveIntOption(rest, "--limit", 50, 200),
};
}
const profileRaw = option(rest, "--profile") ?? "no-mirror";
if (profileRaw !== "no-mirror") throw new Error("--profile currently supports no-mirror");
const confirm = rest.includes("--confirm");
@@ -114,6 +181,10 @@ function parseBenchmarkOptions(args: string[]): BenchmarkOptions {
};
}
function isEgressProxyAction(value: string): value is EgressProxyAction {
return value === "benchmark" || value === "benchmark-status" || value === "benchmark-logs" || value === "traffic";
}
function runTrans(route: string, script: string, timeoutSeconds: number) {
return runCommand(["/root/.local/bin/trans", route, "sh", "--", script], rootPath(), { timeoutMs: timeoutSeconds * 1000 });
}
@@ -186,6 +257,68 @@ function renderBenchmark(result: Record<string, unknown>): RenderedCliResult {
return { ok: result.ok !== false, command: text(result.command, "platform-infra egress-proxy benchmark"), renderedText: lines.join("\n"), contentType: "text/plain" };
}
function renderTraffic(result: Record<string, unknown>): RenderedCliResult {
const totals = record(result.totals);
const source = record(result.source);
const next = record(result.next);
const clients = arrayRecords(result.clients).slice(0, Number(result.limit ?? 50));
const allClientCount = arrayRecords(result.clients).length;
const ok = result.ok !== false;
const target = text(result.target);
const reason = text(result.reason, ok ? "ok" : "failed");
const rows = clients.map((client) => {
const destinations = arrayRecords(client.topDestinations)
.map((item) => `${text(item.destination)}(${text(item.samples)})`)
.join(", ");
return [
text(client.client),
text(client.activeConnections, "0"),
rate(client.totalBps),
rate(client.uploadBps),
rate(client.downloadBps),
bytes(client.windowTotalBytes),
bytes(client.activeTotalBytes),
destinations || "-",
];
});
const lines = [
"PLATFORM-INFRA EGRESS-PROXY TRAFFIC",
"",
...table(["TARGET", "STATUS", "POD", "DURATION", "SNAPSHOTS", "ACTIVE"], [[
target,
ok ? "ok" : reason,
text(result.pod),
`${numberText(result.sampleSecondsActual, result.sampleSecondsRequested)}s`,
text(result.snapshots, "0"),
text(totals.activeConnections, "0"),
]]),
"",
"TOTALS",
...table(["SCOPE", "RATE", "WINDOW", "ACTIVE/CUMULATIVE"], [
["per-client", rate(totals.clientTotalBps), bytes(totals.clientWindowTotalBytes), bytes(totals.clientActiveTotalBytes)],
["process", rate(totals.processWindowBps), bytes(totals.processWindowTotalBytes), bytes(totals.processTotalBytes)],
]),
"",
rows.length === 0 ? "CLIENTS\n-" : [
"CLIENTS",
...table(["CLIENT", "CONNS", "TOTAL/s", "UP/s", "DOWN/s", "WINDOW", "ACTIVE_TOTAL", "TOP_DESTINATIONS"], rows),
].join("\n"),
allClientCount > clients.length ? `\nDisclosure: ${allClientCount - clients.length} more clients hidden; rerun with --limit ${Math.min(200, allClientCount)}.` : "",
"",
"SOURCE",
` target=${target} namespace=${text(result.namespace)} service=${text(result.serviceName)}:${text(result.servicePort)} controller=${text(result.controller)}`,
` sourceType=${text(source.sourceType)} sourceRef=${text(source.sourceRef)} sourceFingerprint=${text(source.sourceFingerprint)} valuesPrinted=false`,
"",
"NEXT",
` ${text(next.apply, "")}`,
` ${text(next.status, `bun scripts/cli.ts platform-infra sub2api status --target ${target} --full`)}`,
` bun scripts/cli.ts platform-infra egress-proxy traffic --target ${target} --sample-seconds 15`,
"",
"Disclosure: proxy API is read through pod loopback only. Per-client cumulative bytes are for the sample window; process cumulative is since proxy process start when reported by sing-box.",
].filter((line) => line !== " ");
return { ok, command: "platform-infra egress-proxy traffic", renderedText: lines.join("\n"), contentType: "text/plain" };
}
function table(headers: string[], rows: string[][]): string[] {
const widths = headers.map((header, index) => Math.max(header.length, ...rows.map((row) => row[index]?.length ?? 0)));
const render = (row: string[]) => row.map((cell, index) => cell.padEnd(widths[index] ?? cell.length)).join(" ").trimEnd();
@@ -196,7 +329,34 @@ function record(value: unknown): Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value) ? value as Record<string, unknown> : {};
}
function arrayRecords(value: unknown): Array<Record<string, unknown>> {
return Array.isArray(value) ? value.map(record) : [];
}
function text(value: unknown, fallback = "-"): string {
if (value === undefined || value === null || value === "") return fallback;
return String(value);
}
function numberText(value: unknown, fallback: unknown): string {
const number = typeof value === "number" && Number.isFinite(value) ? value : Number(fallback);
if (!Number.isFinite(number)) return text(value);
return number.toFixed(number >= 10 ? 0 : 1).replace(/\.0$/u, "");
}
function bytes(value: unknown): string {
const number = Number(value);
if (!Number.isFinite(number) || number <= 0) return "0 B";
const units = ["B", "KiB", "MiB", "GiB", "TiB"];
let scaled = number;
let unit = 0;
while (scaled >= 1024 && unit < units.length - 1) {
scaled /= 1024;
unit += 1;
}
return `${scaled >= 10 || unit === 0 ? scaled.toFixed(0) : scaled.toFixed(1)} ${units[unit]}`;
}
function rate(value: unknown): string {
return `${bytes(value)}/s`;
}
+1
View File
@@ -327,6 +327,7 @@ export function platformInfraHelp(): unknown {
"bun scripts/cli.ts platform-infra sub2api codex-pool trace --request-id <requestId>",
"bun scripts/cli.ts platform-infra sub2api codex-pool sentinel-image status",
"bun scripts/cli.ts platform-infra sub2api codex-pool sentinel-probe --account unidesk-codex-hy --confirm",
"bun scripts/cli.ts platform-infra egress-proxy traffic --target D601 --sample-seconds 15",
"bun scripts/cli.ts platform-infra langbot plan [--target G14]",
"bun scripts/cli.ts platform-infra langbot apply [--target G14] --confirm",
"bun scripts/cli.ts platform-infra langbot status [--target G14] [--full|--raw]",
@@ -170,6 +170,11 @@ export function renderSingBoxMasterShadowsocksConfig(proxy: Sub2ApiEgressProxyCo
export function renderSingBoxProxyConfig(outbound: Record<string, unknown>, proxy: Sub2ApiEgressProxyConfig): string {
const config = stripUndefined({
log: { level: "info", timestamp: true },
experimental: {
clash_api: {
external_controller: "127.0.0.1:9090",
},
},
inbounds: [
{
type: "mixed",