Files
pikasTech-unidesk/scripts/src/hwlab-node-control-plane-runtime.ts
T
2026-06-29 08:13:34 +00:00

2617 lines
126 KiB
TypeScript

import { createHash } from "node:crypto";
import { rootPath } from "./config";
import { runCommand, type CommandResult } from "./command";
import {
HWLAB_NODE_CONTROL_PLANE_CONFIG_PATH,
type ArgoOptions,
type CiBuildBenchmarkCachePolicy,
type CiBuildBenchmarkProfileSpec,
type ControlPlaneEgressProxySpec,
type ControlPlaneGitMirrorGithubTransportSpec,
type ControlPlaneHostRouteEgressProxySpec,
type ControlPlaneK3sInstallSpec,
type ControlPlaneK3sNodeSpec,
type ControlPlaneNodeSpec,
type ControlPlaneRuntimeProxySpec,
type ControlPlaneTargetSpec,
type TektonInstallOptions,
type ToolsImageOptions,
} from "./hwlab-node-control-plane-model";
/**
 * Desired Argo CD objects for a control-plane target, in apply order:
 * the AppProject first, then the Application that references it.
 */
export function argoDesiredManifest(target: ControlPlaneTargetSpec): Record<string, unknown>[] {
  const project = argoProjectSkeleton(target);
  const application = argoApplicationSkeleton(target);
  return [project, application];
}
/**
 * Argo CD AppProject for a target.
 *
 * The project admits only the target's git-mirror read URL as a source and only the
 * in-cluster API server / runtime namespace as a destination; every resource kind is
 * whitelisted at both cluster and namespace scope.
 */
export function argoProjectSkeleton(target: ControlPlaneTargetSpec): Record<string, unknown> {
  const { argo, gitMirror, runtimeNamespace } = target;
  const inClusterDestination = { server: "https://kubernetes.default.svc", namespace: runtimeNamespace };
  const spec = {
    sourceRepos: [gitMirror.readUrl],
    destinations: [inClusterDestination],
    clusterResourceWhitelist: [{ group: "*", kind: "*" }],
    namespaceResourceWhitelist: [{ group: "*", kind: "*" }],
  };
  return {
    apiVersion: "argoproj.io/v1alpha1",
    kind: "AppProject",
    metadata: { name: argo.projectName, namespace: argo.namespace },
    spec,
  };
}
/**
 * Argo CD Application for a target.
 *
 * It tracks `target.gitops.branch` / `target.gitops.path` from the git-mirror read URL,
 * deploys into the runtime namespace of the in-cluster API server, and enables automated
 * sync with `prune` and `selfHeal`.
 */
export function argoApplicationSkeleton(target: ControlPlaneTargetSpec): Record<string, unknown> {
  const { argo, gitMirror, gitops, runtimeNamespace } = target;
  const source = { repoURL: gitMirror.readUrl, targetRevision: gitops.branch, path: gitops.path };
  const destination = { server: "https://kubernetes.default.svc", namespace: runtimeNamespace };
  return {
    apiVersion: "argoproj.io/v1alpha1",
    kind: "Application",
    metadata: { name: argo.applicationName, namespace: argo.namespace },
    spec: {
      project: argo.projectName,
      source,
      destination,
      syncPolicy: { automated: { prune: true, selfHeal: true } },
    },
  };
}
/**
 * Compact, secret-free overview of what the control plane would manage for one
 * node/target pair.
 *
 * The summary covers namespaces, branches, git-mirror URLs, Tekton pipeline/RBAC/install
 * settings and the Argo install settings. Nested helpers contribute the k3s node config
 * plan, the node egress proxy summary and the Tekton git workspace secret summary.
 */
export function planSummary(node: ControlPlaneNodeSpec, target: ControlPlaneTargetSpec): Record<string, unknown> {
  const { tekton, argo, gitMirror, gitops } = target;
  const tektonInstall = tekton.install;
  const argoInstall = argo.install;
  return {
    id: target.id,
    node: node.id,
    kubeRoute: node.kubeRoute,
    lane: target.lane,
    enabled: target.enabled,
    ciNamespace: target.ciNamespace,
    runtimeNamespace: target.runtimeNamespace,
    k3sNodeConfig: k3sNodeConfigPlan(node),
    registry: node.registry.endpoint,
    egressProxy: controlPlaneEgressProxySummary(node.egressProxy),
    sourceBranch: target.source.branch,
    gitopsBranch: gitops.branch,
    gitopsPath: gitops.path,
    gitMirrorNamespace: gitMirror.namespace,
    readUrl: gitMirror.readUrl,
    writeUrl: gitMirror.writeUrl,
    pipeline: tekton.pipelineName,
    pipelineRunPrefix: tekton.pipelineRunPrefix,
    serviceAccount: tekton.serviceAccountName,
    gitWorkspaceSecret: tektonGitWorkspaceSecretSummary(target),
    runtimeObserverRbac: tekton.runtimeObserverRbac,
    argoObserverRbac: tekton.argoObserverRbac,
    tektonInstall: {
      enabled: tektonInstall.enabled,
      version: tektonInstall.version,
      manifests: tektonInstall.manifests,
      requiredCrds: tektonInstall.requiredCrds,
      expectedDeploymentNamespaces: tektonInstall.expectedDeploymentNamespaces,
    },
    toolsImage: tekton.toolsImage,
    argoApplication: argo.applicationName,
    argoInstall: {
      enabled: argoInstall.enabled,
      version: argoInstall.version,
      manifestUrl: argoInstall.manifestUrl,
      imageRewrites: argoInstall.imageRewrites,
    },
  };
}
/**
 * Full "expected state" description for a node/target pair.
 *
 * It is richer than `planSummary`: it adds git-mirror sizing/ports, the effective git-mirror
 * egress proxy, the GitHub transport summary, the Tekton/Argo install readiness inputs and the
 * image policy for the tools image build. Optional Dockerfile/compose inputs are normalised
 * to `null`.
 */
export function expectedSummary(node: ControlPlaneNodeSpec, target: ControlPlaneTargetSpec): Record<string, unknown> {
  const { source, gitops, gitMirror, tekton, argo } = target;
  const tektonInstall = tekton.install;
  const argoInstall = argo.install;
  const tools = tekton.toolsImage;
  return {
    sourceRepo: source.repository,
    branch: source.branch,
    gitopsBranch: gitops.branch,
    runtimePath: gitops.path,
    runtimeNamespace: target.runtimeNamespace,
    namespace: target.ciNamespace,
    k3sNodeConfig: k3sNodeConfigPlan(node),
    gitMirror: {
      namespace: gitMirror.namespace,
      readUrl: gitMirror.readUrl,
      writeUrl: gitMirror.writeUrl,
      cachePvc: gitMirror.cachePvcName,
      cachePvcStorage: gitMirror.cachePvcStorage,
      cacheHostPath: gitMirror.cacheHostPath,
      servicePort: gitMirror.servicePort,
      readContainerPort: gitMirror.readContainerPort,
      writeContainerPort: gitMirror.writeContainerPort,
      deploymentReplicas: gitMirror.deploymentReplicas,
      syncConfigMap: gitMirror.syncConfigMapName,
      egressProxy: gitMirror.egressProxy,
      effectiveEgressProxy: gitMirrorEffectiveEgressProxySummary(node, target),
      githubTransport: gitMirrorGithubTransportSummary(gitMirror.githubTransport),
      statusSummaryKeys: ["localSource", "githubSource", "localGitops", "githubGitops", "pendingFlush", "flushNeeded", "githubInSync"],
    },
    pipeline: tekton.pipelineName,
    pipelineRunPrefix: tekton.pipelineRunPrefix,
    serviceAccount: tekton.serviceAccountName,
    gitWorkspaceSecret: tektonGitWorkspaceSecretSummary(target),
    runtimeObserverRbac: tekton.runtimeObserverRbac,
    argoObserverRbac: tekton.argoObserverRbac,
    tektonInstall: {
      enabled: tektonInstall.enabled,
      sourceKind: tektonInstall.sourceKind,
      version: tektonInstall.version,
      manifests: tektonInstall.manifests,
      requiredCrds: tektonInstall.requiredCrds,
      expectedDeploymentNamespaces: tektonInstall.expectedDeploymentNamespaces,
      readinessTimeoutSeconds: tektonInstall.readinessTimeoutSeconds,
    },
    toolsImage: tools,
    argoNamespace: argo.namespace,
    argoApplication: argo.applicationName,
    argoInstall: {
      enabled: argoInstall.enabled,
      sourceKind: argoInstall.sourceKind,
      version: argoInstall.version,
      manifestUrl: argoInstall.manifestUrl,
      preloadImages: argoInstall.preloadImages,
      imageRewrites: argoInstall.imageRewrites,
      requiredCrds: argoInstall.requiredCrds,
      expectedDeployments: argoInstall.expectedDeployments,
      expectedStatefulSets: argoInstall.expectedStatefulSets,
    },
    registry: node.registry.endpoint,
    imagePolicy: {
      noPrivateInputImages: true,
      buildInput: {
        sourceKind: tools.sourceKind,
        context: tools.context,
        dockerfile: tools.dockerfile ?? null,
        dockerfileInline: tools.dockerfileInline ?? null,
        composeFile: tools.composeFile ?? null,
        publicBaseImages: tools.publicBaseImages,
      },
      outputImage: tools.output,
    },
  };
}
/**
 * Summarise the managed k3s node configuration without printing its contents.
 *
 * Nodes without a `k3s` section report `{ managed: false }`. Managed nodes report the
 * service, drop-in path, node name, desired kubelet max pods, a short hash of the rendered
 * drop-in and the counts of ExecStartPre commands and server args.
 */
export function k3sNodeConfigPlan(node: ControlPlaneNodeSpec): Record<string, unknown> {
  const k3s = node.k3s;
  if (k3s === null) {
    return { managed: false };
  }
  const renderedDropIn = k3sDropInContent(k3s);
  const { serviceName, dropInPath, nodeStatusName, kubelet, execStartPre, serverArgs } = k3s;
  return {
    managed: true,
    serviceName,
    dropInPath,
    nodeStatusName,
    desiredMaxPods: kubelet.maxPods,
    dropInSha256: sha256Short(renderedDropIn),
    execStartPreCount: execStartPre.length,
    serverArgCount: serverArgs.length,
  };
}
/**
 * Describe a pending k3s install for a node/target pair without printing secret values.
 *
 * The expected binary hash is reported with a `sha256:` prefix. The service/node names and
 * the server arg count come from `node.k3s` and fall back to `null` / `0` when the node has
 * no k3s section.
 */
export function k3sInstallPlan(node: ControlPlaneNodeSpec, target: ControlPlaneTargetSpec, spec: ControlPlaneK3sInstallSpec): Record<string, unknown> {
  const k3s = node.k3s;
  const {
    version,
    channel,
    installScriptUrl,
    binaryUrl,
    sha256Url,
    expectedSha256,
    hostProxyConfigRef,
    proxyEnvPath,
    registriesYamlPath,
    state,
    localRegistry,
  } = spec;
  return {
    node: node.id,
    route: node.route,
    kubeRoute: node.kubeRoute,
    lane: target.lane,
    version,
    channel,
    installScriptUrl,
    binaryUrl,
    sha256Url,
    expectedSha256: `sha256:${expectedSha256}`,
    hostProxyConfigRef,
    proxyEnvPath,
    registriesYamlPath,
    state,
    localRegistry,
    serviceName: k3s?.serviceName ?? null,
    nodeStatusName: k3s?.nodeStatusName ?? null,
    serverArgCount: k3s?.serverArgs.length ?? 0,
    valuesPrinted: false,
  };
}
/**
 * Render the systemd drop-in that overrides the k3s service command line.
 *
 * Each `execStartPre` command becomes an `ExecStartPre=` line. An empty `ExecStart=` then
 * resets any previously defined ExecStart before the managed `/usr/local/bin/k3s <serverArgs>`
 * command is declared. Every argument goes through `systemdExecArg`; the text ends with a
 * newline.
 */
export function k3sDropInContent(spec: ControlPlaneK3sNodeSpec): string {
  const renderCommand = (argv: readonly string[]): string => argv.map(systemdExecArg).join(" ");
  const lines: string[] = [
    "# Managed by UniDesk. Source: config/hwlab-node-control-plane.yaml nodes.<node>.k3s",
    "[Service]",
  ];
  for (const command of spec.execStartPre) {
    lines.push(`ExecStartPre=${renderCommand(command)}`);
  }
  lines.push("ExecStart=");
  lines.push(`ExecStart=${renderCommand(["/usr/local/bin/k3s", ...spec.serverArgs])}`);
  lines.push("");
  return lines.join("\n");
}
/**
 * Render the base `k3s.service` unit file.
 *
 * Sections are emitted in the order [Unit], [Install], [Service], separated by blank lines.
 * The proxy env file is loaded optionally (`EnvironmentFile=-`). The br_netfilter and overlay
 * modules are loaded best-effort before start, and ExecStart runs `/usr/local/bin/k3s` with the
 * node's server args. The text ends with a newline.
 */
export function k3sServiceUnitContent(spec: ControlPlaneK3sNodeSpec, install: ControlPlaneK3sInstallSpec): string {
  const section = (header: string, entries: readonly string[]): string => [header, ...entries].join("\n");
  const execStart = ["/usr/local/bin/k3s", ...spec.serverArgs].map(systemdExecArg).join(" ");
  const unit = section("[Unit]", [
    "Description=Lightweight Kubernetes",
    "Documentation=https://k3s.io",
    "Wants=network-online.target",
    "After=network-online.target",
  ]);
  const installSection = section("[Install]", ["WantedBy=multi-user.target"]);
  const service = section("[Service]", [
    "Type=notify",
    `EnvironmentFile=-${install.proxyEnvPath}`,
    "KillMode=process",
    "Delegate=yes",
    "LimitNOFILE=1048576",
    "LimitNPROC=infinity",
    "LimitCORE=infinity",
    "TasksMax=infinity",
    "TimeoutStartSec=0",
    "Restart=always",
    "RestartSec=5s",
    "ExecStartPre=-/sbin/modprobe br_netfilter",
    "ExecStartPre=-/sbin/modprobe overlay",
    `ExecStart=${execStart}`,
  ]);
  return `${[unit, installSection, service].join("\n\n")}\n`;
}
/**
 * Render the k3s `registries.yaml` so containerd reaches the node-local registry over plain
 * HTTP, with TLS verification skipped.
 *
 * The registry host is derived from `install.localRegistry.bind`, the value the install runner
 * passes to `docker run -p`:
 * - `ip:hostPort:containerPort` -> `ip:hostPort`
 * - `hostPort:containerPort`    -> `127.0.0.1:hostPort` (docker publishes on all interfaces,
 *   so loopback reaches it)
 * - any other form is used as-is. NOTE(review): confirm the config model rejects forms such as
 *   `ip::containerPort`, whose host port is chosen by docker at run time.
 *
 * Nesting follows the K3s private-registry schema: mirrors.<host>.endpoint[] and
 * configs.<host>.tls.insecure_skip_verify.
 */
export function k3sRegistriesYaml(install: ControlPlaneK3sInstallSpec): string {
  const bindParts = install.localRegistry.bind.split(":");
  const endpoint = bindParts.length === 2 ? `127.0.0.1:${bindParts[0]}` : bindParts.slice(0, 2).join(":");
  return [
    "mirrors:",
    `  "${endpoint}":`,
    "    endpoint:",
    `      - "http://${endpoint}"`,
    "configs:",
    `  "${endpoint}":`,
    "    tls:",
    "      insecure_skip_verify: true",
    "",
  ].join("\n");
}
/**
 * Secret-free summary of a node egress proxy, or `null` when the node has none.
 *
 * The `host-route` mode reports where the host proxy env lives and how many no-proxy entries
 * exist. Every other mode reports the proxy service location and the source reference
 * metadata. Values are never included (`valuesPrinted: false`).
 */
export function controlPlaneEgressProxySummary(proxy: ControlPlaneEgressProxySpec | null): Record<string, unknown> | null {
  if (proxy === null) return null;
  switch (proxy.mode) {
    case "host-route":
      return {
        mode: proxy.mode,
        clientName: proxy.clientName,
        hostProxyConfigRef: proxy.hostProxyConfigRef,
        proxyEnvPath: proxy.proxyEnvPath,
        proxyUrl: proxy.proxyUrl,
        noProxyCount: proxy.noProxy.length,
        valuesPrinted: false,
      };
    default:
      return {
        mode: proxy.mode,
        clientName: proxy.clientName,
        namespace: proxy.namespace,
        serviceName: proxy.serviceName,
        port: proxy.port,
        sourceConfigRef: proxy.sourceConfigRef,
        sourceType: proxy.sourceType,
        sourceRef: proxy.sourceRef,
        sourceKey: proxy.sourceKey,
        sourceFingerprint: proxy.sourceFingerprint,
        preferredOutbound: proxy.preferredOutbound,
        valuesPrinted: false,
      };
  }
}
/**
 * No-proxy list for a host-route proxy.
 *
 * The list is loopback names plus the local registry on port 5000, followed by the configured
 * entries. Duplicates are dropped and first-seen order is kept.
 */
export function hostRouteNoProxy(proxy: ControlPlaneHostRouteEgressProxySpec): readonly string[] {
  const entries = new Set<string>(["localhost", "127.0.0.1", "::1", "127.0.0.1:5000", "localhost:5000"]);
  for (const entry of proxy.noProxy) {
    entries.add(entry);
  }
  return Array.from(entries);
}
/**
 * Resolve the runtime host-proxy settings used for Tekton/Argo workloads.
 *
 * A disabled spec yields an inert `host-route` description with no targets. An enabled spec
 * additionally carries the node proxy URL and the merged no-proxy list.
 *
 * @throws Error when the spec is enabled but the node's egress proxy is not `host-route`.
 */
export function runtimeHostProxyConfig(node: ControlPlaneNodeSpec, spec: ControlPlaneRuntimeProxySpec): Record<string, unknown> {
  if (spec.enabled) {
    const egress = node.egressProxy;
    if (egress?.mode !== "host-route") {
      throw new Error(`runtimeProxy enabled for ${node.id} requires nodes.${node.id}.egressProxy.mode=host-route`);
    }
    return {
      enabled: true,
      mode: spec.mode,
      configRef: spec.configRef,
      hostNetwork: spec.hostNetwork,
      injectEnv: spec.injectEnv,
      deployments: spec.deployments,
      statefulSets: spec.statefulSets,
      proxyUrl: egress.proxyUrl,
      noProxy: hostRouteNoProxy(egress),
      valuesPrinted: false,
    };
  }
  return {
    enabled: false,
    mode: "host-route",
    configRef: spec.configRef,
    hostNetwork: false,
    injectEnv: false,
    deployments: [],
    statefulSets: [],
    valuesPrinted: false,
  };
}
/**
 * Container env entries that route workload traffic through the node's host proxy.
 *
 * Returns an empty list unless the spec is both enabled and set to inject env. Otherwise it
 * returns HTTP_PROXY/HTTPS_PROXY/ALL_PROXY/NO_PROXY followed by their lower-case twins.
 *
 * @throws Error when env injection is requested but the node's egress proxy is not `host-route`.
 */
export function runtimeHostProxyEnv(node: ControlPlaneNodeSpec, spec: ControlPlaneRuntimeProxySpec): readonly Record<string, string>[] {
  if (!spec.enabled || !spec.injectEnv) return [];
  const egress = node.egressProxy;
  if (egress?.mode !== "host-route") {
    throw new Error(`runtimeProxy env for ${node.id} requires nodes.${node.id}.egressProxy.mode=host-route`);
  }
  const proxyUrl = egress.proxyUrl;
  const noProxy = hostRouteNoProxy(egress).join(",");
  const upperCase: Array<[string, string]> = [
    ["HTTP_PROXY", proxyUrl],
    ["HTTPS_PROXY", proxyUrl],
    ["ALL_PROXY", proxyUrl],
    ["NO_PROXY", noProxy],
  ];
  // Tools disagree on which spelling they read, so emit both: upper-case first, then lower-case.
  const lowerCase = upperCase.map(([name, value]): [string, string] => [name.toLowerCase(), value]);
  return [...upperCase, ...lowerCase].map(([name, value]) => ({ name, value }));
}
/**
 * Whether the runtime proxy part of a status report is satisfied.
 *
 * Anything other than an explicit `runtimeProxy.enabled === true` counts as ready. An enabled
 * proxy is ready only when its `ready` field is set.
 */
export function runtimeProxyReady(status: Record<string, unknown>): boolean {
  const proxy = record(status.runtimeProxy);
  if (proxy.enabled !== true) {
    return true;
  }
  return boolField(proxy, "ready");
}
/**
 * Derive the git-mirror pods' runtime proxy spec from the target's git-mirror egress config.
 *
 * Only a `host-route` config can enable it. The spec is enabled when either host networking or
 * env injection is requested, and it references `nodes.<node>.egressProxy`. Any other config
 * (including none) yields a disabled spec.
 */
export function gitMirrorRuntimeProxySpec(node: ControlPlaneNodeSpec, target: ControlPlaneTargetSpec): ControlPlaneRuntimeProxySpec {
  const config = target.gitMirror.egressProxy;
  if (config !== null && config.mode === "host-route") {
    const { podHostNetwork, injectPodEnv } = config;
    return {
      enabled: podHostNetwork || injectPodEnv,
      mode: "host-route",
      configRef: `nodes.${node.id}.egressProxy`,
      hostNetwork: podHostNetwork,
      injectEnv: injectPodEnv,
      deployments: [],
      statefulSets: [],
    };
  }
  return { enabled: false, mode: "host-route", configRef: null, hostNetwork: false, injectEnv: false, deployments: [], statefulSets: [] };
}
/**
 * Summarise how the git mirror reaches GitHub: directly, or through the node egress proxy.
 *
 * A missing or `direct` config reports `direct` and `required: false`. Proxied modes report
 * their pod networking flags, the node proxy summary and a `ready` flag:
 * - for `host-route`, ready means the node proxy is also `host-route`;
 * - for other modes, ready means the node has any proxy at all.
 */
export function gitMirrorEffectiveEgressProxySummary(node: ControlPlaneNodeSpec, target: ControlPlaneTargetSpec): Record<string, unknown> {
  const config = target.gitMirror.egressProxy;
  const transport = target.gitMirror.githubTransport.mode;
  if (config === null || config.mode === "direct") {
    return { mode: "direct", required: false, transport, valuesPrinted: false };
  }
  const proxy = node.egressProxy;
  const ready = config.mode === "host-route" ? proxy?.mode === "host-route" : proxy !== null;
  return {
    mode: config.mode,
    required: config.required,
    transport,
    podHostNetwork: config.podHostNetwork,
    injectPodEnv: config.injectPodEnv,
    ready,
    nodeProxy: controlPlaneEgressProxySummary(proxy),
    valuesPrinted: false,
  };
}
/**
 * Secret-free description of the git mirror's GitHub transport.
 *
 * HTTPS transports report the username and where the token comes from. SSH transports report
 * where the private key and known_hosts come from, and how they are encoded. Secret values are
 * never included.
 */
export function gitMirrorGithubTransportSummary(transport: ControlPlaneGitMirrorGithubTransportSpec): Record<string, unknown> {
  if (transport.mode !== "ssh") {
    return {
      mode: "https",
      username: transport.username,
      tokenSecretName: transport.tokenSecretName,
      tokenSecretKey: transport.tokenSecretKey,
      tokenSourceRef: transport.tokenSourceRef,
      tokenSourceKey: transport.tokenSourceKey,
      valuesPrinted: false,
    };
  }
  return {
    mode: "ssh",
    privateKeySecretKey: transport.privateKeySecretKey,
    privateKeySourceRef: transport.privateKeySourceRef,
    privateKeySourceKey: transport.privateKeySourceKey,
    privateKeySourceEncoding: transport.privateKeySourceEncoding,
    knownHostsSecretKey: transport.knownHostsSecretKey,
    knownHostsSourceRef: transport.knownHostsSourceRef,
    knownHostsSourceKey: transport.knownHostsSourceKey,
    knownHostsSourceEncoding: transport.knownHostsSourceEncoding,
    valuesPrinted: false,
  };
}
/**
 * Secret-free description of the Tekton git workspace secret.
 *
 * The secret's own name/namespace/keys are always reported. The source references for the
 * private key and known_hosts are copied from the git-mirror GitHub transport when it is SSH,
 * and are `null` otherwise.
 */
export function tektonGitWorkspaceSecretSummary(target: ControlPlaneTargetSpec): Record<string, unknown> {
  const secret = target.tekton.gitWorkspaceSecret;
  const transport = target.gitMirror.githubTransport;
  const ssh = transport.mode === "ssh" ? transport : null;
  return {
    name: secret.name,
    namespace: secret.namespace,
    sourceRefFrom: secret.sourceRefFrom,
    privateKeySecretKey: secret.privateKeySecretKey,
    privateKeySourceRef: ssh === null ? null : ssh.privateKeySourceRef,
    privateKeySourceKey: ssh === null ? null : ssh.privateKeySourceKey,
    knownHostsSecretKey: secret.knownHostsSecretKey,
    knownHostsSourceRef: ssh === null ? null : ssh.knownHostsSourceRef,
    knownHostsSourceKey: ssh === null ? null : ssh.knownHostsSourceKey,
    valuesPrinted: false,
  };
}
/**
 * Encode one argv element as a single word of a systemd `ExecStart=` / `ExecStartPre=` line.
 *
 * systemd post-processes every word of an Exec line, quoted or not:
 * - `%` introduces a unit specifier (`%n`, `%h`, ...); a literal percent must be written `%%`.
 * - `$NAME` / `${NAME}` is replaced by an environment variable; a literal dollar must be
 *   written `$$`. A backslash before `$` is not a recognised escape: systemd keeps the
 *   backslash and still expands the variable.
 * - Inside double quotes only C-style escapes (backslash, double quote, \n, \r, ...) are
 *   recognised. Backticks are not special and must not be backslash-escaped.
 *
 * Values made only of neutral characters are emitted bare, with `%` doubled. Everything else is
 * double-quoted, with backslash, double quote, newline and carriage return escaped, and `$`
 * doubled.
 *
 * @param value - one unsplit argument.
 * @returns the argument encoded as exactly one systemd command-line word.
 */
export function systemdExecArg(value: string): string {
  // Specifier escaping is needed whether or not the word ends up quoted.
  const literalPercent = value.replaceAll("%", "%%");
  if (/^[A-Za-z0-9_@%+=:,./-]+$/u.test(value)) return literalPercent;
  const escaped = literalPercent
    .replaceAll("\\", "\\\\")
    .replaceAll("\"", "\\\"")
    // A raw line break would end the unit-file line and corrupt the command.
    .replaceAll("\n", "\\n")
    .replaceAll("\r", "\\r")
    // Use a function replacer: the replacement *string* "$$" would collapse back to one "$".
    .replaceAll("$", () => "$$");
  return `"${escaped}"`;
}
/**
 * Build the shell script that submits a background k3s install on the node and prints a
 * one-line JSON receipt.
 *
 * Behaviour of the generated script:
 * - If `<install.state.dir>/pid` names a live process, it prints
 *   `{"ok": true, "alreadyRunning": true, "pid": ..., "status": <status file JSON or parseError>}`
 *   and exits 0 without starting another runner.
 * - Otherwise it:
 *   - decodes the runner produced by `k3sInstallRunnerScript` into
 *     `<state.dir>/install-runner.sh` (mode 0700);
 *   - seeds `install.state.statusPath` with phase "submitted";
 *   - starts the runner with `nohup sh`, sending output to `install.state.logPath`;
 *   - records the background PID in `<state.dir>/pid`;
 *   - prints `{"ok": true, "submitted": true, ...}`.
 *
 * NOTE(review): `node.id` and `target.lane` are interpolated unescaped into an unquoted
 * `<<JSON` heredoc, so they are assumed to contain no double quotes, backslashes, `$` or
 * backticks.
 *
 * @throws Error when `node.k3s` is null.
 */
export function k3sInstallSubmitScript(node: ControlPlaneNodeSpec, target: ControlPlaneTargetSpec, install: ControlPlaneK3sInstallSpec): string {
if (node.k3s === null) throw new Error("k3sInstallSubmitScript requires node.k3s");
// The runner is embedded base64-encoded and decoded on the node (`base64 -d`), so its text
// is never interpreted by this submit script's shell.
const runner = k3sInstallRunnerScript(node, target, install);
const encoded = Buffer.from(runner, "utf8").toString("base64");
return `
set -eu
state_dir=${shQuote(install.state.dir)}
runner="$state_dir/install-runner.sh"
pid_file="$state_dir/pid"
status_file=${shQuote(install.state.statusPath)}
mkdir -p "$state_dir"
if [ -s "$pid_file" ]; then
pid="$(cat "$pid_file" 2>/dev/null || true)"
if [ -n "$pid" ] && kill -0 "$pid" >/dev/null 2>&1; then
python3 - "$pid" "$status_file" <<'PY'
import json, pathlib, sys
status = None
path = pathlib.Path(sys.argv[2])
if path.exists():
try:
status = json.loads(path.read_text())
except Exception as exc:
status = {"parseError": str(exc)}
print(json.dumps({"ok": True, "alreadyRunning": True, "pid": sys.argv[1], "status": status, "valuesPrinted": False}, ensure_ascii=False))
PY
exit 0
fi
fi
printf %s ${shQuote(encoded)} | base64 -d >"$runner"
chmod 0700 "$runner"
cat >"$status_file" <<JSON
{"ok":false,"phase":"submitted","node":"${node.id}","lane":"${target.lane}","updatedAt":"$(date -u +%Y-%m-%dT%H:%M:%SZ)","valuesPrinted":false}
JSON
nohup sh "$runner" >${shQuote(install.state.logPath)} 2>&1 &
pid=$!
printf '%s\\n' "$pid" >"$pid_file"
python3 - "$pid" "$state_dir" "$status_file" <<'PY'
import json, sys
print(json.dumps({"ok": True, "submitted": True, "pid": sys.argv[1], "stateDir": sys.argv[2], "statusPath": sys.argv[3], "valuesPrinted": False}, ensure_ascii=False))
PY
`;
}
/**
 * Build the k3s install runner: a `#!/bin/sh` script, started in the background by
 * `k3sInstallSubmitScript`, that installs k3s on the node and writes progress to
 * `install.state.statusPath` (one JSON object per write, via `log_status`).
 *
 * Sequence and status phases:
 * 1. "starting": create the state, binary and config directories. Exit 41 ("proxy env
 *    missing") if `install.proxyEnvPath` is absent or empty; otherwise source it and export
 *    the proxy variables.
 * 2. Probe https://www.gstatic.com/generate_204 through `$HTTPS_PROXY`. On success write
 *    "proxy-ready".
 * 3. If docker exists, make sure the local registry container is running: leave it if already
 *    running, start it if it exists, otherwise `docker run` it. All of this is best effort
 *    (errors ignored).
 * 4. "downloading" (always written). Only when /usr/local/bin/k3s does not already hash to
 *    `install.expectedSha256`:
 *    - fetch the upstream sha256 list and exit 44 if the expected hash is not listed;
 *    - download the binary to a temp file and exit 42 on hash mismatch;
 *    - otherwise chmod 0755 the temp file and `mv` it into place.
 * 5. Symlink kubectl/crictl/ctr to k3s. Write registries.yaml, /etc/systemd/system/k3s.service
 *    and the `node.k3s.dropInPath` drop-in; each is embedded base64-encoded.
 * 6. "systemd": daemon-reload and `enable --now` the service. Then poll node Ready every 5s,
 *    up to 120 times: "succeeded" with exit 0, or "failed" with exit 43 on timeout.
 *
 * NOTE(review): the script runs with `set -eu`. A failing proxy-probe or download curl, or an
 * env file that does not define HTTPS_PROXY, exits without writing a "failed" phase, so the
 * status file keeps its previous phase and only the log shows the error.
 * NOTE(review): the unit is always written to /etc/systemd/system/k3s.service, but
 * `node.k3s.serviceName` is what gets enabled. These only agree when serviceName is "k3s" —
 * confirm.
 * NOTE(review): `systemctl enable --now` does not restart an already-active service, so on an
 * upgrade a running k3s may keep the old binary/drop-in until it is restarted — verify
 * intended.
 * NOTE(review): the sha list is saved as `sha256sum-amd64.txt`, which presumably assumes an
 * amd64 node — confirm against `install.sha256Url`.
 * NOTE(review): the embedded Python uses `datetime.datetime.utcnow()`, deprecated since
 * Python 3.12.
 *
 * @throws Error when `node.k3s` is null.
 */
export function k3sInstallRunnerScript(node: ControlPlaneNodeSpec, target: ControlPlaneTargetSpec, install: ControlPlaneK3sInstallSpec): string {
if (node.k3s === null) throw new Error("k3sInstallRunnerScript requires node.k3s");
// Generated config files are embedded as base64 and decoded on the node with `base64 -d`.
const unit = Buffer.from(k3sServiceUnitContent(node.k3s, install), "utf8").toString("base64");
const dropIn = Buffer.from(k3sDropInContent(node.k3s), "utf8").toString("base64");
const registries = Buffer.from(k3sRegistriesYaml(install), "utf8").toString("base64");
return `#!/bin/sh
set -eu
state_dir=${shQuote(install.state.dir)}
status_file=${shQuote(install.state.statusPath)}
proxy_env=${shQuote(install.proxyEnvPath)}
registries_yaml=${shQuote(install.registriesYamlPath)}
binary_url=${shQuote(install.binaryUrl)}
sha256_url=${shQuote(install.sha256Url)}
expected_sha=${shQuote(install.expectedSha256)}
version=${shQuote(install.version)}
node_name=${shQuote(node.k3s.nodeStatusName)}
service_name=${shQuote(node.k3s.serviceName)}
registry_name=${shQuote(install.localRegistry.containerName)}
registry_image=${shQuote(install.localRegistry.image)}
registry_bind=${shQuote(install.localRegistry.bind)}
log_status() {
phase="$1"; ok="$2"; message="$3"
python3 - "$status_file" "$phase" "$ok" "$message" <<'PY'
import json, pathlib, sys, datetime
path = pathlib.Path(sys.argv[1])
payload = {
"ok": sys.argv[3] == "true",
"phase": sys.argv[2],
"message": sys.argv[4],
"updatedAt": datetime.datetime.utcnow().replace(microsecond=0).isoformat() + "Z",
"node": "${node.id}",
"lane": "${target.lane}",
"valuesPrinted": False,
}
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, ensure_ascii=False) + "\\n")
PY
}
log_status starting false "k3s install started"
mkdir -p "$state_dir" /usr/local/bin /etc/rancher/k3s /etc/systemd/system/k3s.service.d
if [ ! -s "$proxy_env" ]; then
log_status failed false "proxy env missing"
exit 41
fi
. "$proxy_env"
export HTTP_PROXY HTTPS_PROXY ALL_PROXY NO_PROXY http_proxy https_proxy all_proxy no_proxy
curl -fsS --max-time 20 --proxy "$HTTPS_PROXY" -o /dev/null https://www.gstatic.com/generate_204
log_status proxy-ready false "host proxy probe passed"
if command -v docker >/dev/null 2>&1; then
if docker ps --format '{{.Names}}' | grep -Fx "$registry_name" >/dev/null 2>&1; then
:
elif docker ps -a --format '{{.Names}}' | grep -Fx "$registry_name" >/dev/null 2>&1; then
docker start "$registry_name" >/dev/null 2>&1 || true
else
docker run -d --restart unless-stopped --name "$registry_name" -p "$registry_bind" "$registry_image" >/dev/null 2>&1 || true
fi
fi
log_status downloading false "downloading k3s binary through host proxy"
current_sha="$(sha256sum /usr/local/bin/k3s 2>/dev/null | cut -d' ' -f1 || true)"
if [ "$current_sha" != "$expected_sha" ]; then
sha_file="$state_dir/sha256sum-amd64.txt"
curl -fL --connect-timeout ${install.downloads.connectTimeoutSeconds} --max-time ${install.downloads.maxTimeSeconds} --retry ${install.downloads.retry} --retry-delay ${install.downloads.retryDelaySeconds} -o "$sha_file" "$sha256_url"
if ! grep -F "$expected_sha" "$sha_file" >/dev/null 2>&1; then
log_status failed false "expected k3s sha256 missing from upstream sha256 file"
exit 44
fi
tmp_bin=/usr/local/bin/k3s.tmp.$$
curl -fL --connect-timeout ${install.downloads.connectTimeoutSeconds} --max-time ${install.downloads.maxTimeSeconds} --retry ${install.downloads.retry} --retry-delay ${install.downloads.retryDelaySeconds} -o "$tmp_bin" "$binary_url"
actual_sha=$(sha256sum "$tmp_bin" | cut -d' ' -f1)
if [ "$actual_sha" != "$expected_sha" ]; then
rm -f "$tmp_bin"
log_status failed false "k3s binary sha256 mismatch"
exit 42
fi
chmod 0755 "$tmp_bin"
mv -f "$tmp_bin" /usr/local/bin/k3s
fi
ln -sf /usr/local/bin/k3s /usr/local/bin/kubectl
ln -sf /usr/local/bin/k3s /usr/local/bin/crictl
ln -sf /usr/local/bin/k3s /usr/local/bin/ctr
printf %s ${shQuote(registries)} | base64 -d >"$registries_yaml"
printf %s ${shQuote(unit)} | base64 -d >/etc/systemd/system/k3s.service
printf %s ${shQuote(dropIn)} | base64 -d >${shQuote(node.k3s.dropInPath)}
log_status systemd false "starting k3s service"
systemctl daemon-reload
systemctl enable --now "$service_name"
for _ in $(seq 1 120); do
if /usr/local/bin/kubectl get node "$node_name" >/tmp/unidesk-k3s-node.out 2>/tmp/unidesk-k3s-node.err; then
if /usr/local/bin/kubectl get node "$node_name" -o jsonpath='{.status.conditions[?(@.type=="Ready")].status}' 2>/dev/null | grep -Fx True >/dev/null 2>&1; then
log_status succeeded true "k3s node ready"
exit 0
fi
fi
sleep 5
done
log_status failed false "timed out waiting for k3s node ready"
exit 43
`;
}
/**
 * Build a read-only shell probe that reports k3s install progress and node health as one
 * JSON line.
 *
 * The probe runs with `set +e` and gathers:
 * - whether the runner PID recorded in `<install.state.dir>/pid` is alive;
 * - the runner's status file (or a parse error with its raw tail);
 * - `systemctl is-active` for the k3s service;
 * - whether /usr/local/bin/k3s hashes to `install.expectedSha256`;
 * - a proxy probe through the node proxy env file;
 * - whether the local registry container is running;
 * - node Ready plus pod capacity / allocatable;
 * - the size and the last `tailLines` lines of the install log.
 *
 * @param tailLines - number of trailing log lines to include. Non-finite and negative values
 *   are treated as 0 and fractions are truncated; 0 yields an empty `logTail`.
 * @throws Error when `node.k3s` is null.
 */
export function k3sInstallStatusScript(node: ControlPlaneNodeSpec, target: ControlPlaneTargetSpec, install: ControlPlaneK3sInstallSpec, tailLines: number): string {
  if (node.k3s === null) throw new Error("k3sInstallStatusScript requires node.k3s");
  // Normalise before the value reaches Python: int("NaN") would crash the probe, and a negative
  // count would turn lines[-n:] into "drop the first n lines".
  const tail = Number.isFinite(tailLines) ? Math.max(0, Math.trunc(tailLines)) : 0;
  return `
set +e
status_path=${shQuote(install.state.statusPath)}
log_path=${shQuote(install.state.logPath)}
pid_file=${shQuote(`${install.state.dir}/pid`)}
service_name=${shQuote(node.k3s.serviceName)}
node_name=${shQuote(node.k3s.nodeStatusName)}
expected_sha=${shQuote(install.expectedSha256)}
proxy_env=${shQuote(install.proxyEnvPath)}
registry_name=${shQuote(install.localRegistry.containerName)}
running=false
pid=
if [ -s "$pid_file" ]; then
  pid="$(cat "$pid_file" 2>/dev/null || true)"
  if [ -n "$pid" ] && kill -0 "$pid" >/dev/null 2>&1; then running=true; fi
fi
service_active="$(systemctl is-active "$service_name" 2>/dev/null || true)"
binary_sha="$(sha256sum /usr/local/bin/k3s 2>/dev/null | cut -d' ' -f1 || true)"
proxy_probe=false
if [ -s "$proxy_env" ]; then
  . "$proxy_env"
  curl -fsS --max-time 20 --proxy "$HTTPS_PROXY" -o /dev/null https://www.gstatic.com/generate_204 >/dev/null 2>&1 && proxy_probe=true
fi
registry_running=false
if command -v docker >/dev/null 2>&1; then
  docker ps --format '{{.Names}}' 2>/dev/null | grep -Fx "$registry_name" >/dev/null 2>&1 && registry_running=true
fi
node_ready=false
node_capacity=
node_allocatable=
if command -v kubectl >/dev/null 2>&1; then
  ready="$(kubectl get node "$node_name" -o jsonpath='{.status.conditions[?(@.type=="Ready")].status}' 2>/dev/null || true)"
  [ "$ready" = "True" ] && node_ready=true
  node_capacity="$(kubectl get node "$node_name" -o jsonpath='{.status.capacity.pods}' 2>/dev/null || true)"
  node_allocatable="$(kubectl get node "$node_name" -o jsonpath='{.status.allocatable.pods}' 2>/dev/null || true)"
fi
python3 - "$status_path" "$log_path" "$running" "$pid" "$service_active" "$binary_sha" "$expected_sha" "$proxy_probe" "$registry_running" "$node_ready" "$node_capacity" "$node_allocatable" ${shQuote(String(tail))} <<'PY'
import json, pathlib, sys
status_path = pathlib.Path(sys.argv[1])
log_path = pathlib.Path(sys.argv[2])
status = None
if status_path.exists():
    try:
        status = json.loads(status_path.read_text())
    except Exception as exc:
        status = {"parseError": str(exc), "rawTail": status_path.read_text(errors="replace")[-1000:]}
tail_lines = int(sys.argv[13])
log_tail = ""
# lines[-0:] is the whole list in Python, so a zero tail must be excluded explicitly.
if tail_lines > 0 and log_path.exists():
    log_tail = "\\n".join(log_path.read_text(errors="replace").splitlines()[-tail_lines:])
binary_sha = sys.argv[6]
payload = {
    "node": ${JSON.stringify(node.id)},
    "lane": ${JSON.stringify(target.lane)},
    "running": sys.argv[3] == "true",
    "pid": sys.argv[4] or None,
    "status": status,
    "checks": {
        "serviceActive": sys.argv[5] == "active",
        "binarySha256Ok": binary_sha == sys.argv[7],
        "binarySha256Prefix": binary_sha[:12] if binary_sha else "",
        "proxyProbe": sys.argv[8] == "true",
        "registryRunning": sys.argv[9] == "true",
        "nodeReady": sys.argv[10] == "true",
        "capacityPods": int(sys.argv[11]) if sys.argv[11].isdigit() else None,
        "allocatablePods": int(sys.argv[12]) if sys.argv[12].isdigit() else None,
    },
    "logBytes": log_path.stat().st_size if log_path.exists() else 0,
    "logTail": log_tail,
    "valuesPrinted": False,
}
print(json.dumps(payload, ensure_ascii=False))
PY
`;
}
export function statusScript(nodeSpec: ControlPlaneNodeSpec, target: ControlPlaneTargetSpec): string {
const tektonRequiredCrds = shellJsonArray(target.tekton.install.requiredCrds);
const tektonDeploymentNamespaces = shellJsonArray(target.tekton.install.expectedDeploymentNamespaces);
const requiredCrds = shellJsonArray(target.argo.install.requiredCrds);
const argoDeployments = shellJsonArray(target.argo.install.expectedDeployments);
const argoStatefulSets = shellJsonArray(target.argo.install.expectedStatefulSets);
const tektonRuntimeProxy = JSON.stringify(runtimeHostProxyConfig(nodeSpec, target.tekton.install.runtimeProxy));
const argoRuntimeProxy = JSON.stringify(runtimeHostProxyConfig(nodeSpec, target.argo.install.runtimeProxy));
const k3s = nodeSpec.k3s;
const k3sDropIn = k3s === null ? "" : k3sDropInContent(k3s);
const gitMirrorEgressProxyJson = JSON.stringify(gitMirrorEffectiveEgressProxySummary(nodeSpec, target));
return `
set +e
node=${shQuote(target.node)}
lane=${shQuote(target.lane)}
ci_ns=${shQuote(target.ciNamespace)}
runtime_ns=${shQuote(target.runtimeNamespace)}
gitmirror_ns=${shQuote(target.gitMirror.namespace)}
read_deploy=${shQuote(target.gitMirror.serviceReadName)}
write_deploy=${shQuote(target.gitMirror.serviceWriteName)}
read_svc=${shQuote(target.gitMirror.serviceReadName)}
write_svc=${shQuote(target.gitMirror.serviceWriteName)}
cache_pvc=${shQuote(target.gitMirror.cachePvcName)}
cache_host_path=${shQuote(target.gitMirror.cacheHostPath ?? "")}
github_transport_mode=${shQuote(target.gitMirror.githubTransport.mode)}
github_ssh_secret=${shQuote(target.gitMirror.githubTransport.mode === "ssh" ? target.gitMirror.secretName : "")}
github_ssh_private_key=${shQuote(target.gitMirror.githubTransport.mode === "ssh" ? target.gitMirror.githubTransport.privateKeySecretKey : "")}
github_ssh_private_source_ref=${shQuote(target.gitMirror.githubTransport.mode === "ssh" ? target.gitMirror.githubTransport.privateKeySourceRef : "")}
github_ssh_private_source_key=${shQuote(target.gitMirror.githubTransport.mode === "ssh" ? target.gitMirror.githubTransport.privateKeySourceKey : "")}
github_ssh_known_hosts_key=${shQuote(target.gitMirror.githubTransport.mode === "ssh" ? target.gitMirror.githubTransport.knownHostsSecretKey ?? "" : "")}
github_ssh_known_hosts_source_ref=${shQuote(target.gitMirror.githubTransport.mode === "ssh" ? target.gitMirror.githubTransport.knownHostsSourceRef ?? "" : "")}
github_ssh_known_hosts_source_key=${shQuote(target.gitMirror.githubTransport.mode === "ssh" ? target.gitMirror.githubTransport.knownHostsSourceKey ?? "" : "")}
github_token_secret=${shQuote(target.gitMirror.githubTransport.mode === "https" ? target.gitMirror.githubTransport.tokenSecretName : "")}
github_token_key=${shQuote(target.gitMirror.githubTransport.mode === "https" ? target.gitMirror.githubTransport.tokenSecretKey : "")}
github_token_source_ref=${shQuote(target.gitMirror.githubTransport.mode === "https" ? target.gitMirror.githubTransport.tokenSourceRef : "")}
github_token_source_key=${shQuote(target.gitMirror.githubTransport.mode === "https" ? target.gitMirror.githubTransport.tokenSourceKey : "")}
gitmirror_egress_proxy_json=${shQuote(gitMirrorEgressProxyJson)}
pipeline=${shQuote(target.tekton.pipelineName)}
service_account=${shQuote(target.tekton.serviceAccountName)}
runtime_observer_role=${shQuote(target.tekton.runtimeObserverRbac.roleName)}
runtime_observer_rolebinding=${shQuote(target.tekton.runtimeObserverRbac.roleBindingName)}
ci_git_secret=${shQuote(target.tekton.gitWorkspaceSecret.name)}
ci_git_private_key=${shQuote(target.tekton.gitWorkspaceSecret.privateKeySecretKey)}
ci_git_known_hosts_key=${shQuote(target.tekton.gitWorkspaceSecret.knownHostsSecretKey)}
ci_git_source_ref_from=${shQuote(target.tekton.gitWorkspaceSecret.sourceRefFrom)}
ci_git_private_source_ref=${shQuote(target.gitMirror.githubTransport.mode === "ssh" ? target.gitMirror.githubTransport.privateKeySourceRef : "")}
ci_git_private_source_key=${shQuote(target.gitMirror.githubTransport.mode === "ssh" ? target.gitMirror.githubTransport.privateKeySourceKey : "")}
ci_git_known_hosts_source_ref=${shQuote(target.gitMirror.githubTransport.mode === "ssh" ? target.gitMirror.githubTransport.knownHostsSourceRef ?? "" : "")}
ci_git_known_hosts_source_key=${shQuote(target.gitMirror.githubTransport.mode === "ssh" ? target.gitMirror.githubTransport.knownHostsSourceKey ?? "" : "")}
argo_ns=${shQuote(target.argo.namespace)}
argo_project=${shQuote(target.argo.projectName)}
argo_app=${shQuote(target.argo.applicationName)}
argo_observer_role=${shQuote(target.tekton.argoObserverRbac.roleName)}
argo_observer_rolebinding=${shQuote(target.tekton.argoObserverRbac.roleBindingName)}
registry=${shQuote(nodeSpec.registry.endpoint)}
tools_image=${shQuote(target.tekton.toolsImage.output)}
tekton_required_crds_json=${shQuote(tektonRequiredCrds)}
tekton_deployment_namespaces_json=${shQuote(tektonDeploymentNamespaces)}
required_crds_json=${shQuote(requiredCrds)}
argo_deployments_json=${shQuote(argoDeployments)}
argo_statefulsets_json=${shQuote(argoStatefulSets)}
tekton_runtime_proxy_json=${shQuote(tektonRuntimeProxy)}
argo_runtime_proxy_json=${shQuote(argoRuntimeProxy)}
k3s_managed=${k3s === null ? "false" : "true"}
k3s_service=${shQuote(k3s?.serviceName ?? "")}
k3s_dropin=${shQuote(k3s?.dropInPath ?? "")}
k3s_node=${shQuote(k3s?.nodeStatusName ?? "")}
k3s_desired_max_pods=${shQuote(String(k3s?.kubelet.maxPods ?? ""))}
k3s_expected_sha=${shQuote(k3s === null ? "" : sha256Short(k3sDropIn))}
exists_ns() { kubectl get ns "$1" >/dev/null 2>&1 && printf true || printf false; }
exists_res() { kubectl -n "$1" get "$2" "$3" >/dev/null 2>&1 && printf true || printf false; }
deploy_ready() { desired=$(kubectl -n "$1" get deploy "$2" -o 'jsonpath={.spec.replicas}' 2>/dev/null || true); ready=$(kubectl -n "$1" get deploy "$2" -o 'jsonpath={.status.readyReplicas}' 2>/dev/null || true); [ -n "$desired" ] && [ "$desired" -gt 0 ] 2>/dev/null && [ "\${ready:-0}" = "$desired" ] && printf true || printf false; }
sts_ready() { desired=$(kubectl -n "$1" get statefulset "$2" -o 'jsonpath={.spec.replicas}' 2>/dev/null || true); ready=$(kubectl -n "$1" get statefulset "$2" -o 'jsonpath={.status.readyReplicas}' 2>/dev/null || true); [ -n "$desired" ] && [ "$desired" -gt 0 ] 2>/dev/null && [ "\${ready:-0}" = "$desired" ] && printf true || printf false; }
endpoint_ready() { endpoints=$(kubectl -n "$1" get endpoints "$2" -o 'jsonpath={.subsets[*].addresses[*].ip}' 2>/dev/null || true); [ -n "$endpoints" ] && printf true || printf false; }
can_i_runtime() { kubectl auth can-i "$1" "$2" --as="system:serviceaccount:$ci_ns:$service_account" -n "$runtime_ns" >/dev/null 2>&1 && printf true || printf false; }
can_i_argo() { kubectl auth can-i "$1" "$2" --as="system:serviceaccount:$ci_ns:$service_account" -n "$argo_ns" >/dev/null 2>&1 && printf true || printf false; }
runtime_observer_role_exists=$(exists_res "$runtime_ns" role "$runtime_observer_role")
runtime_observer_rolebinding_exists=$(exists_res "$runtime_ns" rolebinding "$runtime_observer_rolebinding")
runtime_observer_can_list_deployments=$(can_i_runtime list deployments.apps)
runtime_observer_can_list_statefulsets=$(can_i_runtime list statefulsets.apps)
runtime_observer_ready=false
if [ "$runtime_observer_role_exists" = true ] && [ "$runtime_observer_rolebinding_exists" = true ] && [ "$runtime_observer_can_list_deployments" = true ] && [ "$runtime_observer_can_list_statefulsets" = true ]; then runtime_observer_ready=true; fi
argo_observer_role_exists=$(exists_res "$argo_ns" role "$argo_observer_role")
argo_observer_rolebinding_exists=$(exists_res "$argo_ns" rolebinding "$argo_observer_rolebinding")
argo_observer_can_get_application=$(can_i_argo get applications.argoproj.io)
argo_observer_ready=false
if [ "$argo_observer_role_exists" = true ] && [ "$argo_observer_rolebinding_exists" = true ] && [ "$argo_observer_can_get_application" = true ]; then argo_observer_ready=true; fi
github_transport_json=$(python3 - "$github_transport_mode" "$gitmirror_ns" "$github_ssh_secret" "$github_ssh_private_key" "$github_ssh_private_source_ref" "$github_ssh_private_source_key" "$github_ssh_known_hosts_key" "$github_ssh_known_hosts_source_ref" "$github_ssh_known_hosts_source_key" "$github_token_secret" "$github_token_key" "$github_token_source_ref" "$github_token_source_key" <<'PY'
import hashlib, json, subprocess, sys
mode, namespace, ssh_secret, ssh_private_key, ssh_private_source_ref, ssh_private_source_key, ssh_known_hosts_key, ssh_known_hosts_source_ref, ssh_known_hosts_source_key, token_secret, token_key, token_source_ref, token_source_key = sys.argv[1:14]
def read_secret(name):
proc = subprocess.run(["kubectl", "-n", namespace, "get", "secret", name, "-o", "json"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if proc.returncode != 0:
return False, {}, {}
try:
obj = json.loads(proc.stdout)
except Exception:
obj = {}
return True, obj.get("data") or {}, obj.get("metadata", {}).get("annotations") or {}
def fingerprint(value):
return "sha256:" + hashlib.sha256(value.encode()).hexdigest()[:16] if value else None
if mode == "ssh":
exists, data, annotations = read_secret(ssh_secret)
private_encoded = data.get(ssh_private_key) if isinstance(data, dict) else None
known_hosts_encoded = data.get(ssh_known_hosts_key) if ssh_known_hosts_key and isinstance(data, dict) else None
private_present = isinstance(private_encoded, str) and len(private_encoded) > 0
known_hosts_expected = bool(ssh_known_hosts_key)
known_hosts_present = isinstance(known_hosts_encoded, str) and len(known_hosts_encoded) > 0
print(json.dumps({
"mode": mode,
"required": True,
"ready": exists and private_present and (not known_hosts_expected or known_hosts_present),
"secretName": ssh_secret,
"privateKeySecretKey": ssh_private_key,
"privateKeySourceRef": ssh_private_source_ref,
"privateKeySourceKey": ssh_private_source_key,
"privateKeySecretExists": exists,
"privateKeyPresent": private_present,
"privateKeyBytes": len(private_encoded) if private_present else 0,
"privateKeyFingerprint": annotations.get("unidesk.ai/private-key-fingerprint") or fingerprint(private_encoded),
"knownHostsSecretKey": ssh_known_hosts_key or None,
"knownHostsSourceRef": ssh_known_hosts_source_ref or None,
"knownHostsSourceKey": ssh_known_hosts_source_key or None,
"knownHostsPresent": (known_hosts_present if known_hosts_expected else None),
"knownHostsBytes": (len(known_hosts_encoded) if known_hosts_present else 0) if known_hosts_expected else None,
"knownHostsFingerprint": annotations.get("unidesk.ai/known-hosts-fingerprint") or fingerprint(known_hosts_encoded),
"valuesPrinted": False,
}))
raise SystemExit(0)
if mode != "https":
print(json.dumps({"mode": mode, "required": True, "ready": False, "valuesPrinted": False}))
raise SystemExit(0)
exists, data, _ = read_secret(token_secret)
encoded = data.get(token_key) if isinstance(data, dict) else None
present = isinstance(encoded, str) and len(encoded) > 0
print(json.dumps({"mode": mode, "required": True, "ready": exists and present, "tokenSecretName": token_secret, "tokenSecretKey": token_key, "tokenSourceRef": token_source_ref, "tokenSourceKey": token_source_key, "tokenSecretExists": exists, "tokenKeyPresent": present, "tokenKeyBytes": len(encoded) if present else 0, "tokenFingerprint": fingerprint(encoded), "valuesPrinted": False}))
PY
)
ci_git_workspace_json=$(python3 - "$ci_ns" "$ci_git_secret" "$ci_git_private_key" "$ci_git_known_hosts_key" "$ci_git_source_ref_from" "$ci_git_private_source_ref" "$ci_git_private_source_key" "$ci_git_known_hosts_source_ref" "$ci_git_known_hosts_source_key" <<'PY'
import hashlib, json, subprocess, sys
namespace, secret, private_key, known_hosts_key, source_ref_from, private_source_ref, private_source_key, known_hosts_source_ref, known_hosts_source_key = sys.argv[1:10]
proc = subprocess.run(["kubectl", "-n", namespace, "get", "secret", secret, "-o", "json"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
exists = proc.returncode == 0
obj = {}
if exists:
try:
obj = json.loads(proc.stdout)
except Exception:
obj = {}
data = obj.get("data") or {}
annotations = obj.get("metadata", {}).get("annotations") or {}
def fingerprint(value):
return "sha256:" + hashlib.sha256(value.encode()).hexdigest()[:16] if value else None
private_encoded = data.get(private_key) if isinstance(data, dict) else None
known_hosts_encoded = data.get(known_hosts_key) if isinstance(data, dict) else None
private_present = isinstance(private_encoded, str) and len(private_encoded) > 0
known_hosts_present = isinstance(known_hosts_encoded, str) and len(known_hosts_encoded) > 0
print(json.dumps({
"required": True,
"ready": exists and private_present and known_hosts_present,
"namespace": namespace,
"secretName": secret,
"sourceRefFrom": source_ref_from,
"privateKeySecretKey": private_key,
"privateKeySourceRef": private_source_ref,
"privateKeySourceKey": private_source_key,
"privateKeySecretExists": exists,
"privateKeyPresent": private_present,
"privateKeyBytes": len(private_encoded) if private_present else 0,
"privateKeyFingerprint": annotations.get("unidesk.ai/private-key-fingerprint") or fingerprint(private_encoded),
"knownHostsSecretKey": known_hosts_key,
"knownHostsSourceRef": known_hosts_source_ref,
"knownHostsSourceKey": known_hosts_source_key,
"knownHostsPresent": known_hosts_present,
"knownHostsBytes": len(known_hosts_encoded) if known_hosts_present else 0,
"knownHostsFingerprint": annotations.get("unidesk.ai/known-hosts-fingerprint") or fingerprint(known_hosts_encoded),
"valuesPrinted": False,
}))
PY
)
registry_ready=false
if command -v curl >/dev/null 2>&1; then curl -fsS --max-time 3 "http://$registry/v2/" >/tmp/hwlab-registry.out 2>/tmp/hwlab-registry.err && registry_ready=true; fi
tools_repo_tag=\${tools_image#\${registry}/}
tools_repo=\${tools_repo_tag%:*}
tools_tag=\${tools_repo_tag##*:}
tools_image_ready=false
manifest_accept='application/vnd.oci.image.manifest.v1+json, application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.index.v1+json, application/vnd.docker.distribution.manifest.list.v2+json'
if [ "$tools_repo" != "$tools_repo_tag" ] && command -v curl >/dev/null 2>&1; then curl -fsS --max-time 5 -H "Accept: $manifest_accept" "http://$registry/v2/$tools_repo/manifests/$tools_tag" >/tmp/hwlab-tools-image.out 2>/tmp/hwlab-tools-image.err && tools_image_ready=true; fi
cache_host_path_ready=false
if [ -n "$cache_host_path" ] && kubectl -n "$gitmirror_ns" exec deploy/"$read_deploy" -- sh -lc 'test -d /cache' >/dev/null 2>&1; then cache_host_path_ready=true; fi
k3s_fragment=$(python3 - "$k3s_managed" "$k3s_service" "$k3s_dropin" "$k3s_node" "$k3s_desired_max_pods" "$k3s_expected_sha" <<'PY'
import hashlib, json, re, subprocess, sys
managed = sys.argv[1] == "true"
service, dropin, node_name, desired_raw, expected_sha = sys.argv[2:7]
def run(args):
return subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
def to_int(value):
try:
return int(value)
except Exception:
return None
if not managed:
print(json.dumps({"managed": False, "ready": True}))
raise SystemExit(0)
desired = to_int(desired_raw)
node_json = run(["kubectl", "get", "node", node_name, "-o", "json"])
capacity = None
allocatable = None
node_ready = False
if node_json.returncode == 0:
data = json.loads(node_json.stdout)
capacity = to_int(data.get("status", {}).get("capacity", {}).get("pods"))
allocatable = to_int(data.get("status", {}).get("allocatable", {}).get("pods"))
for condition in data.get("status", {}).get("conditions", []):
if condition.get("type") == "Ready":
node_ready = condition.get("status") == "True"
unit = run(["systemctl", "cat", service])
unit_text = unit.stdout if unit.returncode == 0 else ""
dropin_read = run(["cat", dropin])
dropin_exists = dropin_read.returncode == 0
dropin_text = dropin_read.stdout if dropin_exists else ""
dropin_sha = "sha256:" + hashlib.sha256(dropin_text.encode()).hexdigest() if dropin_exists else None
matches = re.findall(r"max-pods=([0-9]+)", unit_text + "\\n" + dropin_text)
configured = to_int(matches[-1]) if matches else None
dropin_matches = dropin_sha == expected_sha
ready = dropin_matches and capacity == desired and allocatable == desired
source = "managed-dropin" if dropin_matches else ("systemd-or-config" if configured is not None else "kubelet-default")
print(json.dumps({
"managed": True,
"ready": ready,
"serviceName": service,
"dropInPath": dropin,
"dropInExists": dropin_exists,
"dropInSha256": dropin_sha,
"expectedDropInSha256": expected_sha,
"dropInMatches": dropin_matches,
"configuredMaxPods": configured,
"desiredMaxPods": desired,
"liveNodeName": node_name,
"liveCapacityPods": capacity,
"liveAllocatablePods": allocatable,
"nodeReady": node_ready,
"restartRequired": not ready,
"source": source,
"unitReadable": unit.returncode == 0,
}))
PY
)
python3 - "$tekton_required_crds_json" "$tekton_deployment_namespaces_json" "$tekton_runtime_proxy_json" <<'PY' >/tmp/hwlab-node-tekton-status-fragments.json
import json, subprocess, sys
required_crds=json.loads(sys.argv[1])
namespaces=json.loads(sys.argv[2])
runtime_proxy=json.loads(sys.argv[3])
def run(args):
return subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
def exists(args):
return run(args).returncode == 0
def env_map(container):
result={}
for item in container.get("env") or []:
name=item.get("name")
if isinstance(name, str) and "value" in item:
result[name]=str(item.get("value") or "")
return result
def runtime_proxy_workload(kind, namespace, name, cfg):
proc=run(["kubectl", "-n", namespace, "get", kind, name, "-o", "json"])
if proc.returncode != 0:
return {"kind": kind, "namespace": namespace, "name": name, "exists": False, "ready": False}
obj=json.loads(proc.stdout or "{}")
template=obj.get("spec", {}).get("template", {})
spec=template.get("spec", {})
annotations=template.get("metadata", {}).get("annotations") or {}
containers=spec.get("containers") or []
expected_env={
"HTTP_PROXY": str(cfg.get("proxyUrl") or ""),
"HTTPS_PROXY": str(cfg.get("proxyUrl") or ""),
"ALL_PROXY": str(cfg.get("proxyUrl") or ""),
"NO_PROXY": ",".join([str(item) for item in cfg.get("noProxy") or []]),
"http_proxy": str(cfg.get("proxyUrl") or ""),
"https_proxy": str(cfg.get("proxyUrl") or ""),
"all_proxy": str(cfg.get("proxyUrl") or ""),
"no_proxy": ",".join([str(item) for item in cfg.get("noProxy") or []]),
}
env_matches=True
if cfg.get("injectEnv"):
env_matches=bool(containers) and all(all(env_map(container).get(key) == value for key, value in expected_env.items()) for container in containers)
host_network_matches=(not cfg.get("hostNetwork")) or spec.get("hostNetwork") is True
dns_policy_matches=(not cfg.get("hostNetwork")) or spec.get("dnsPolicy") == "ClusterFirstWithHostNet"
annotation_matches=annotations.get("unidesk.ai/runtime-proxy") == "host-route" and annotations.get("unidesk.ai/runtime-proxy-config-ref") == str(cfg.get("configRef") or "")
return {
"kind": kind,
"namespace": namespace,
"name": name,
"exists": True,
"ready": host_network_matches and dns_policy_matches and env_matches and annotation_matches,
"hostNetwork": spec.get("hostNetwork") is True,
"hostNetworkMatches": host_network_matches,
"dnsPolicy": spec.get("dnsPolicy") or None,
"dnsPolicyMatches": dns_policy_matches,
"injectEnv": bool(cfg.get("injectEnv")),
"envMatches": env_matches,
"annotationMatches": annotation_matches,
}
def runtime_proxy_status(cfg, namespace_values):
if not cfg.get("enabled"):
return {"enabled": False, "ready": True, "mode": cfg.get("mode") or "host-route", "workloads": [], "valuesPrinted": False}
workloads=[]
missing=[]
for kind, names in (("deployment", cfg.get("deployments") or []), ("statefulset", cfg.get("statefulSets") or [])):
for name in [str(item) for item in names]:
found=[]
for namespace in namespace_values:
proc=run(["kubectl", "-n", namespace, "get", kind, name, "-o", "name"])
if proc.returncode == 0:
found.append(namespace)
if not found:
missing.append({"kind": kind, "name": name})
workloads.append({"kind": kind, "name": name, "exists": False, "ready": False})
else:
workloads.extend(runtime_proxy_workload(kind, namespace, name, cfg) for namespace in found)
return {
"enabled": True,
"mode": cfg.get("mode") or "host-route",
"configRef": cfg.get("configRef"),
"hostNetwork": bool(cfg.get("hostNetwork")),
"injectEnv": bool(cfg.get("injectEnv")),
"selectedDeployments": cfg.get("deployments") or [],
"selectedStatefulSets": cfg.get("statefulSets") or [],
"missing": missing,
"workloads": workloads,
"ready": len(missing) == 0 and all(item.get("ready") for item in workloads),
"valuesPrinted": False,
}
def namespace_deployments(namespace):
proc = run(["kubectl", "-n", namespace, "get", "deploy", "-o", "json"])
if proc.returncode != 0:
return {"namespace": namespace, "namespaceExists": False, "deployments": [], "ready": False}
try:
data=json.loads(proc.stdout)
except Exception:
return {"namespace": namespace, "namespaceExists": True, "deployments": [], "ready": False}
deployments=[]
for item in data.get("items", []):
desired=int(item.get("spec", {}).get("replicas") or 0)
ready=int(item.get("status", {}).get("readyReplicas") or 0)
deployments.append({"name": item.get("metadata", {}).get("name"), "desired": desired, "readyReplicas": ready, "ready": desired > 0 and ready == desired})
return {"namespace": namespace, "namespaceExists": True, "deployments": deployments, "ready": len(deployments) > 0 and all(item["ready"] for item in deployments)}
crds=[{"name": name, "exists": exists(["kubectl", "get", "crd", name])} for name in required_crds]
deployment_namespaces=[namespace_deployments(ns) for ns in namespaces]
print(json.dumps({"crds": crds, "deploymentNamespaces": deployment_namespaces, "crdsReady": all(item["exists"] for item in crds), "deploymentsReady": all(item["ready"] for item in deployment_namespaces) if deployment_namespaces else True, "runtimeProxy": runtime_proxy_status(runtime_proxy, namespaces)}))
PY
tekton_fragment=$(cat /tmp/hwlab-node-tekton-status-fragments.json 2>/dev/null || printf '{}')
tekton_installed=$(python3 - "$tekton_fragment" <<'PY'
import json, sys
data=json.loads(sys.argv[1] or '{}')
print('true' if data.get('crdsReady') and data.get('deploymentsReady') else 'false')
PY
)
python3 - "$required_crds_json" "$argo_deployments_json" "$argo_statefulsets_json" "$argo_runtime_proxy_json" <<'PY' >/tmp/hwlab-node-status-fragments.json
import json, subprocess, sys
required_crds=json.loads(sys.argv[1])
deployments=json.loads(sys.argv[2])
statefulsets=json.loads(sys.argv[3])
runtime_proxy=json.loads(sys.argv[4])
ns="${target.argo.namespace}"
def run(args):
return subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
def exists(args):
return run(args).returncode == 0
def env_map(container):
result={}
for item in container.get("env") or []:
name=item.get("name")
if isinstance(name, str) and "value" in item:
result[name]=str(item.get("value") or "")
return result
def runtime_proxy_workload(kind, namespace, name, cfg):
proc=run(["kubectl", "-n", namespace, "get", kind, name, "-o", "json"])
if proc.returncode != 0:
return {"kind": kind, "namespace": namespace, "name": name, "exists": False, "ready": False}
obj=json.loads(proc.stdout or "{}")
template=obj.get("spec", {}).get("template", {})
spec=template.get("spec", {})
annotations=template.get("metadata", {}).get("annotations") or {}
containers=spec.get("containers") or []
expected_env={
"HTTP_PROXY": str(cfg.get("proxyUrl") or ""),
"HTTPS_PROXY": str(cfg.get("proxyUrl") or ""),
"ALL_PROXY": str(cfg.get("proxyUrl") or ""),
"NO_PROXY": ",".join([str(item) for item in cfg.get("noProxy") or []]),
"http_proxy": str(cfg.get("proxyUrl") or ""),
"https_proxy": str(cfg.get("proxyUrl") or ""),
"all_proxy": str(cfg.get("proxyUrl") or ""),
"no_proxy": ",".join([str(item) for item in cfg.get("noProxy") or []]),
}
env_matches=True
if cfg.get("injectEnv"):
env_matches=bool(containers) and all(all(env_map(container).get(key) == value for key, value in expected_env.items()) for container in containers)
host_network_matches=(not cfg.get("hostNetwork")) or spec.get("hostNetwork") is True
dns_policy_matches=(not cfg.get("hostNetwork")) or spec.get("dnsPolicy") == "ClusterFirstWithHostNet"
annotation_matches=annotations.get("unidesk.ai/runtime-proxy") == "host-route" and annotations.get("unidesk.ai/runtime-proxy-config-ref") == str(cfg.get("configRef") or "")
return {
"kind": kind,
"namespace": namespace,
"name": name,
"exists": True,
"ready": host_network_matches and dns_policy_matches and env_matches and annotation_matches,
"hostNetwork": spec.get("hostNetwork") is True,
"hostNetworkMatches": host_network_matches,
"dnsPolicy": spec.get("dnsPolicy") or None,
"dnsPolicyMatches": dns_policy_matches,
"injectEnv": bool(cfg.get("injectEnv")),
"envMatches": env_matches,
"annotationMatches": annotation_matches,
}
def runtime_proxy_status(cfg):
if not cfg.get("enabled"):
return {"enabled": False, "ready": True, "mode": cfg.get("mode") or "host-route", "workloads": [], "valuesPrinted": False}
workloads=[]
missing=[]
for kind, names in (("deployment", cfg.get("deployments") or []), ("statefulset", cfg.get("statefulSets") or [])):
for name in [str(item) for item in names]:
proc=run(["kubectl", "-n", ns, "get", kind, name, "-o", "name"])
if proc.returncode != 0:
missing.append({"kind": kind, "name": name})
workloads.append({"kind": kind, "name": name, "exists": False, "ready": False})
else:
workloads.append(runtime_proxy_workload(kind, ns, name, cfg))
return {
"enabled": True,
"mode": cfg.get("mode") or "host-route",
"configRef": cfg.get("configRef"),
"hostNetwork": bool(cfg.get("hostNetwork")),
"injectEnv": bool(cfg.get("injectEnv")),
"selectedDeployments": cfg.get("deployments") or [],
"selectedStatefulSets": cfg.get("statefulSets") or [],
"missing": missing,
"workloads": workloads,
"ready": len(missing) == 0 and all(item.get("ready") for item in workloads),
"valuesPrinted": False,
}
def ready(kind, name):
data = run(["kubectl", "-n", ns, "get", kind, name, "-o", "json"])
if data.returncode != 0:
return {"name": name, "exists": False, "ready": False, "desired": None, "readyReplicas": None}
obj=json.loads(data.stdout)
desired=int(obj.get("spec", {}).get("replicas") or 0)
ready_replicas=int(obj.get("status", {}).get("readyReplicas") or 0)
return {"name": name, "exists": True, "ready": desired > 0 and ready_replicas == desired, "desired": desired, "readyReplicas": ready_replicas}
crds=[{"name": name, "exists": exists(["kubectl", "get", "crd", name])} for name in required_crds]
deploy=[ready("deployment", name) for name in deployments]
sts=[ready("statefulset", name) for name in statefulsets]
print(json.dumps({"crds": crds, "deployments": deploy, "statefulSets": sts, "crdsReady": all(item["exists"] for item in crds), "deploymentsReady": all(item["ready"] for item in deploy) if deploy else True, "statefulSetsReady": all(item["ready"] for item in sts) if sts else True, "runtimeProxy": runtime_proxy_status(runtime_proxy)}))
PY
argo_fragment=$(cat /tmp/hwlab-node-status-fragments.json 2>/dev/null || printf '{}')
cat <<JSON
{"observedAt":"$(date -u +%Y-%m-%dT%H:%M:%SZ)","node":"$node","lane":"$lane","components":{"k3sNodeConfig":$k3s_fragment,"tekton":{"installed":$tekton_installed,"controllerReady":$(deploy_ready tekton-pipelines tekton-pipelines-controller),"webhookReady":$(deploy_ready tekton-pipelines tekton-pipelines-webhook),"install":$tekton_fragment},"ciNamespace":{"name":"$ci_ns","exists":$(exists_ns "$ci_ns"),"serviceAccountExists":$(exists_res "$ci_ns" serviceaccount "$service_account"),"pipelineExists":$(exists_res "$ci_ns" pipeline "$pipeline"),"gitWorkspaceSecret":$ci_git_workspace_json},"gitMirror":{"namespace":"$gitmirror_ns","namespaceExists":$(exists_ns "$gitmirror_ns"),"readDeploymentReady":$(deploy_ready "$gitmirror_ns" "$read_deploy"),"writeDeploymentReady":$(deploy_ready "$gitmirror_ns" "$write_deploy"),"readServiceExists":$(exists_res "$gitmirror_ns" service "$read_svc"),"writeServiceExists":$(exists_res "$gitmirror_ns" service "$write_svc"),"readEndpointsReady":$(endpoint_ready "$gitmirror_ns" "$read_svc"),"writeEndpointsReady":$(endpoint_ready "$gitmirror_ns" "$write_svc"),"cachePvcExists":$(exists_res "$gitmirror_ns" pvc "$cache_pvc"),"cacheHostPath":"$cache_host_path","cacheHostPathReady":$cache_host_path_ready,"egressProxy":$gitmirror_egress_proxy_json,"githubTransport":$github_transport_json,"summary":{"localSource":null,"githubSource":null,"localGitops":null,"githubGitops":null,"pendingFlush":null,"flushNeeded":null,"githubInSync":null}},"argo":{"namespace":"$argo_ns","namespaceExists":$(exists_ns "$argo_ns"),"installed":$(kubectl get crd applications.argoproj.io appprojects.argoproj.io >/dev/null 2>&1 && printf true || printf false),"projectExists":$(kubectl -n "$argo_ns" get appproject "$argo_project" >/dev/null 2>&1 && printf true || printf false),"applicationExists":$(kubectl -n "$argo_ns" get application "$argo_app" >/dev/null 2>&1 && printf true || printf 
false),"argoObserverRbac":{"roleName":"$argo_observer_role","roleExists":$argo_observer_role_exists,"roleBindingName":"$argo_observer_rolebinding","roleBindingExists":$argo_observer_rolebinding_exists,"serviceAccountNamespace":"$ci_ns","serviceAccountName":"$service_account","canGetApplication":$argo_observer_can_get_application,"ready":$argo_observer_ready},"install":$argo_fragment},"registry":{"endpoint":"$registry","ready":$registry_ready,"toolsImage":"$tools_image","toolsImageReady":$tools_image_ready},"runtimeNamespace":{"name":"$runtime_ns","exists":$(exists_ns "$runtime_ns"),"runtimeObserverRbac":{"roleName":"$runtime_observer_role","roleExists":$runtime_observer_role_exists,"roleBindingName":"$runtime_observer_rolebinding","roleBindingExists":$runtime_observer_rolebinding_exists,"serviceAccountNamespace":"$ci_ns","serviceAccountName":"$service_account","canListDeployments":$runtime_observer_can_list_deployments,"canListStatefulSets":$runtime_observer_can_list_statefulsets,"ready":$runtime_observer_ready}}}}
JSON
`;
}
/**
 * Builds the node-side shell script that server-side applies the rendered control-plane manifest and then
 * runs the k3s node-config fragment produced by `k3sApplyScriptFragment`.
 *
 * The manifest is shipped base64-encoded so arbitrary YAML survives shell quoting. The apply uses
 * `--server-side --force-conflicts` with the target's field manager. The k3s fragment always runs, even
 * when the kubectl apply fails. The script prints one JSON object containing the k3s report and a
 * truncated (last 2000 chars) stdout/stderr preview of the kubectl apply. It exits with the kubectl apply
 * exit code when that is non-zero, otherwise with the k3s fragment's `k3s_rc`.
 *
 * @param yaml Rendered manifest YAML to apply.
 * @param node Node spec; only `node.k3s` is read here.
 * @param target Control-plane target, used for the field-manager name and the k3s fragment.
 * @returns Shell script text to execute on the node.
 */
export function applyScript(yaml: string, node: ControlPlaneNodeSpec, target: ControlPlaneTargetSpec): string {
  const encoded = Buffer.from(yaml, "utf8").toString("base64");
  // Both temp files (the decoded manifest and the k3s fragment's mktemp report) are deleted once the
  // report has been printed, so repeated applies do not accumulate files under /tmp.
  return `
set +e
manifest=$(mktemp /tmp/hwlab-node-infra.XXXXXX.yaml)
printf %s ${shQuote(encoded)} | base64 -d >"$manifest"
field_manager=${shQuote(controlPlaneFieldManager(target))}
kubectl apply --server-side --force-conflicts --field-manager="$field_manager" -f "$manifest" >/tmp/hwlab-node-infra-apply.out 2>/tmp/hwlab-node-infra-apply.err
kubectl_rc=$?
${k3sApplyScriptFragment(node.k3s, target)}
python3 - "$kubectl_rc" "$k3s_report_file" <<'PY'
import json, pathlib, sys
k3s_report = {}
try:
    k3s_report = json.loads(pathlib.Path(sys.argv[2]).read_text(errors='replace'))
except Exception as exc:
    k3s_report = {"managed": None, "ok": False, "parseError": str(exc)}
out=pathlib.Path('/tmp/hwlab-node-infra-apply.out').read_text(errors='replace') if pathlib.Path('/tmp/hwlab-node-infra-apply.out').exists() else ''
err=pathlib.Path('/tmp/hwlab-node-infra-apply.err').read_text(errors='replace') if pathlib.Path('/tmp/hwlab-node-infra-apply.err').exists() else ''
print(json.dumps({'k3sNodeConfig': k3s_report, 'kubernetesApply': {'applyExitCode': int(sys.argv[1]), 'stdoutPreview': out[-2000:], 'stderrPreview': err[-2000:], 'runtimeRolloutTriggered': False, 'pk01Touched': False}}, ensure_ascii=False))
PY
rm -f "$manifest" "$k3s_report_file"
if [ "$kubectl_rc" != 0 ]; then exit "$kubectl_rc"; fi
exit "$k3s_rc"
`;
}
/**
 * Returns the server-side-apply field manager name for one node/lane control plane.
 * The format is `unidesk-hwlab-<node, lowercased>-<lane>-control-plane`.
 */
export function controlPlaneFieldManager(target: ControlPlaneTargetSpec): string {
  const { node, lane } = target;
  const nodeSlug = node.toLowerCase();
  return `unidesk-hwlab-${nodeSlug}-${lane}-control-plane`;
}
/**
 * Builds the shell fragment that converges the k3s kubelet systemd drop-in on the node.
 *
 * Contract with the surrounding script: the fragment always sets two shell variables.
 * - `k3s_report_file`: a mktemp path holding one JSON report.
 * - `k3s_rc`: 0 on success or noop, otherwise the failing step's exit code.
 *
 * Modes:
 * - `spec === null`: unmanaged. It writes `{"managed":false,"ok":true,"mutation":false}` and sets `k3s_rc=0`.
 * - noop: the on-disk drop-in sha matches the expected sha, and the node's live `capacity.pods` and
 *   `allocatable.pods` both equal `spec.kubelet.maxPods`.
 * - kubernetes-job: renders a privileged Job (hostPID, hostNetwork, host `/` mounted at `/host`). The Job
 *   writes the drop-in, runs `daemon-reload`, and restarts the service when the drop-in changed or the
 *   pod capacity did not match. The Job is pinned to `spec.nodeStatusName` via `nodeName`, because it
 *   acts on whichever host runs the pod. This mode is asynchronous: the report sets `completionPending`
 *   and includes the kubectl wait/logs commands.
 * - docker-host-fallback: used only when `kubectl apply` of the Job fails and `docker` exists. The same
 *   host script runs synchronously via `docker run --privileged --pid=host -v /:/host`.
 *
 * NOTE(review): `k3s_expected_sha` (from `sha256Short`) is compared against `sha256sum` output formatted
 * as `sha256:<full hex>`. This assumes `sha256Short` returns that same format; confirm.
 *
 * @param spec Managed k3s node settings, or null when k3s config is not managed for this node.
 * @param target Control-plane target; supplies the CI namespace, tools image and lane.
 * @returns Shell fragment text.
 */
export function k3sApplyScriptFragment(spec: ControlPlaneK3sNodeSpec | null, target: ControlPlaneTargetSpec): string {
  if (spec === null) {
    return `
k3s_report_file=$(mktemp /tmp/hwlab-node-k3s.XXXXXX.json)
printf '{"managed":false,"ok":true,"mutation":false}\\n' >"$k3s_report_file"
k3s_rc=0
`;
  }
  const content = k3sDropInContent(spec);
  const encoded = Buffer.from(content, "utf8").toString("base64");
  // "$k3s_node" is passed to the Job renderer so the pod is bound to the node whose capacity was checked;
  // without it the scheduler could place the host-mutating pod on any node.
  return `
k3s_report_file=$(mktemp /tmp/hwlab-node-k3s.XXXXXX.json)
k3s_service=${shQuote(spec.serviceName)}
k3s_dropin=${shQuote(spec.dropInPath)}
k3s_node=${shQuote(spec.nodeStatusName)}
k3s_namespace=${shQuote(target.ciNamespace)}
k3s_image=${shQuote(target.tekton.toolsImage.output)}
k3s_desired_max_pods=${shQuote(String(spec.kubelet.maxPods))}
k3s_expected_sha=${shQuote(sha256Short(content))}
k3s_before_capacity=$(kubectl get node "$k3s_node" -o 'jsonpath={.status.capacity.pods}' 2>/dev/null || true)
k3s_before_allocatable=$(kubectl get node "$k3s_node" -o 'jsonpath={.status.allocatable.pods}' 2>/dev/null || true)
capacity_restart=false
if [ "$k3s_before_capacity" != "$k3s_desired_max_pods" ] || [ "$k3s_before_allocatable" != "$k3s_desired_max_pods" ]; then capacity_restart=true; fi
k3s_current_dropin_sha=
if [ -f "$k3s_dropin" ]; then k3s_current_dropin_sha=$(sha256sum "$k3s_dropin" | awk '{print "sha256:"$1}'); fi
if [ "$k3s_current_dropin_sha" = "$k3s_expected_sha" ] && [ "$capacity_restart" = false ]; then
python3 - "$k3s_current_dropin_sha" "$k3s_expected_sha" "$k3s_service" "$k3s_dropin" "$k3s_node" "$k3s_desired_max_pods" "$k3s_before_capacity" "$k3s_before_allocatable" <<'PY' >"$k3s_report_file"
import json, sys
dropin_sha, expected_sha, service, dropin, node_name, desired, before_capacity, before_allocatable = sys.argv[1:9]
print(json.dumps({
    "managed": True,
    "ok": True,
    "mutation": False,
    "applyMode": "noop",
    "completionPending": False,
    "serviceName": service,
    "dropInPath": dropin,
    "dropInSha256": dropin_sha,
    "expectedDropInSha256": expected_sha,
    "dropInMatches": dropin_sha == expected_sha,
    "nodeName": node_name,
    "desiredMaxPods": int(desired),
    "beforeCapacityPods": int(before_capacity) if before_capacity.isdigit() else None,
    "beforeAllocatablePods": int(before_allocatable) if before_allocatable.isdigit() else None,
}, ensure_ascii=False))
PY
k3s_rc=0
else
k3s_job="hwlab-node-k3s-config-$(date +%s)"
k3s_job_manifest=$(mktemp /tmp/hwlab-node-k3s-job.XXXXXX.json)
k3s_host_script=$(mktemp /tmp/hwlab-node-k3s-host.XXXXXX.sh)
k3s_job_apply_stdout=/tmp/hwlab-node-k3s-job-apply.out
k3s_job_apply_stderr=/tmp/hwlab-node-k3s-job-apply.err
k3s_docker_stdout=/tmp/hwlab-node-k3s-docker.out
k3s_docker_stderr=/tmp/hwlab-node-k3s-docker.err
k3s_host_report="/tmp/$k3s_job-report.json"
rm -f "$k3s_host_report"
python3 - "$k3s_job_manifest" "$k3s_host_script" "$k3s_job" "$k3s_namespace" "$k3s_image" "$k3s_dropin" ${shQuote(encoded)} "$k3s_service" "$k3s_desired_max_pods" "$k3s_expected_sha" "$capacity_restart" "$k3s_host_report" "$k3s_node" <<'PY'
import json, os, shlex, sys
manifest_path, host_script_path, job, namespace, image, dropin, encoded, service, desired, expected_sha, capacity_restart, report_path = sys.argv[1:13]
node_name = sys.argv[13] if len(sys.argv) > 13 else ""
script = f"""#!/bin/sh
set -eu
expected=/tmp/unidesk-k3s-dropin.conf
printf %s {shlex.quote(encoded)} | base64 -d > "$expected"
host_dropin=/host{shlex.quote(dropin)}
host_report=/host{shlex.quote(report_path)}
mkdir -p "$(dirname "$host_dropin")"
before_sha=
if [ -f "$host_dropin" ]; then before_sha=$(sha256sum "$host_dropin" | awk '{{print "sha256:"$1}}'); fi
changed=false
if ! cmp -s "$expected" "$host_dropin" 2>/dev/null; then
cp "$expected" "$host_dropin"
chown 0:0 "$host_dropin" 2>/dev/null || true
chmod 0644 "$host_dropin"
changed=true
fi
nsenter_path=$(command -v nsenter || true)
host_systemctl() {{
if command -v chroot >/dev/null 2>&1 && [ -x /host/usr/bin/systemctl ]; then
chroot /host /usr/bin/systemctl "$@"
return $?
fi
if [ -n "$nsenter_path" ]; then
"$nsenter_path" -t 1 -m -u -i -n -p -- /usr/bin/systemctl "$@"
return $?
fi
return 127
}}
daemon_reload_rc=0
restart_rc=0
restarted=false
if command -v chroot >/dev/null 2>&1 || [ -n "$nsenter_path" ]; then
host_systemctl daemon-reload || daemon_reload_rc=$?
if [ "$changed" = true ] || [ {shlex.quote(capacity_restart)} = true ]; then
restarted=true
host_systemctl restart {shlex.quote(service)} || restart_rc=$?
fi
else
daemon_reload_rc=127
restart_rc=127
fi
after_sha=
if [ -f "$host_dropin" ]; then after_sha=$(sha256sum "$host_dropin" | awk '{{print "sha256:"$1}}'); fi
service_active=unknown
if command -v chroot >/dev/null 2>&1 || [ -n "$nsenter_path" ]; then service_active=$(host_systemctl is-active {shlex.quote(service)} 2>/dev/null || true); fi
python3 - "$changed" "$restarted" "$daemon_reload_rc" "$restart_rc" "$before_sha" "$after_sha" "$service_active" "$nsenter_path" <<'REPORT' >"$host_report"
import json, sys
changed, restarted = sys.argv[1] == "true", sys.argv[2] == "true"
daemon_reload_rc, restart_rc = int(sys.argv[3] or "0"), int(sys.argv[4] or "0")
print(json.dumps({{
    "jobChanged": changed,
    "jobRestarted": restarted,
    "daemonReloadExitCode": daemon_reload_rc,
    "restartExitCode": restart_rc,
    "beforeDropInSha256": sys.argv[5] or None,
    "dropInSha256": sys.argv[6] or None,
    "expectedDropInSha256": {json.dumps(expected_sha)},
    "dropInMatches": sys.argv[6] == {json.dumps(expected_sha)},
    "serviceActiveText": sys.argv[7] or None,
    "nsenterPresent": bool(sys.argv[8]),
}}))
REPORT
chmod 0644 "$host_report" 2>/dev/null || true
cat "$host_report"
"""
with open(host_script_path, "w", encoding="utf-8") as handle:
    handle.write(script)
os.chmod(host_script_path, 0o755)
manifest = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"name": job, "namespace": namespace, "labels": {"app.kubernetes.io/part-of": "hwlab-node-control-plane", "unidesk.ai/operation": "k3s-node-config"}},
    "spec": {
        "backoffLimit": 0,
        "ttlSecondsAfterFinished": 300,
        "template": {
            "metadata": {"labels": {"app.kubernetes.io/part-of": "hwlab-node-control-plane", "unidesk.ai/operation": "k3s-node-config"}},
            "spec": {
                "restartPolicy": "Never",
                "hostPID": True,
                "hostNetwork": True,
                **({"nodeName": node_name} if node_name else {}),
                "containers": [{
                    "name": "apply-k3s-node-config",
                    "image": image,
                    "imagePullPolicy": "IfNotPresent",
                    "securityContext": {"privileged": True},
                    "command": ["/bin/sh", "-lc", script],
                    "volumeMounts": [{"name": "host-root", "mountPath": "/host"}],
                }],
                "volumes": [{"name": "host-root", "hostPath": {"path": "/", "type": "Directory"}}],
            },
        },
    },
}
with open(manifest_path, "w", encoding="utf-8") as handle:
    json.dump(manifest, handle)
PY
k3s_render_rc=$?
if [ "$k3s_render_rc" != 0 ]; then
python3 - "$k3s_render_rc" "$k3s_expected_sha" "$k3s_service" "$k3s_dropin" "$k3s_node" "$k3s_desired_max_pods" <<'PY' >"$k3s_report_file"
import json, sys
render_rc = int(sys.argv[1] or "1")
expected_sha, service, dropin, node_name, desired = sys.argv[2:7]
print(json.dumps({
    "managed": True,
    "ok": False,
    "mutation": False,
    "renderExitCode": render_rc,
    "serviceName": service,
    "dropInPath": dropin,
    "expectedDropInSha256": expected_sha,
    "nodeName": node_name,
    "desiredMaxPods": int(desired),
}, ensure_ascii=False))
PY
k3s_rc=$k3s_render_rc
else
kubectl apply -f "$k3s_job_manifest" >"$k3s_job_apply_stdout" 2>"$k3s_job_apply_stderr"
k3s_job_apply_rc=$?
k3s_apply_mode=kubernetes-job
k3s_docker_rc=127
if [ "$k3s_job_apply_rc" != 0 ] && command -v docker >/dev/null 2>&1; then
k3s_apply_mode=docker-host-fallback
docker run --rm --privileged --pid=host --network=host -v /:/host --entrypoint /bin/sh "$k3s_image" "/host$k3s_host_script" >"$k3s_docker_stdout" 2>"$k3s_docker_stderr"
k3s_docker_rc=$?
fi
k3s_submit_rc=$k3s_job_apply_rc
if [ "$k3s_job_apply_rc" != 0 ] && [ "$k3s_docker_rc" = 0 ]; then k3s_submit_rc=0; fi
python3 - "$k3s_submit_rc" "$k3s_job_apply_rc" "$k3s_docker_rc" "$k3s_apply_mode" "$k3s_before_capacity" "$k3s_before_allocatable" "$k3s_expected_sha" "$k3s_service" "$k3s_dropin" "$k3s_node" "$k3s_desired_max_pods" "$k3s_job" "$k3s_namespace" "$k3s_host_report" "$k3s_job_apply_stdout" "$k3s_job_apply_stderr" "$k3s_docker_stdout" "$k3s_docker_stderr" <<'PY' >"$k3s_report_file"
import json, pathlib, sys
submit_rc, job_apply_rc, docker_rc = [int(value or "0") for value in sys.argv[1:4]]
apply_mode = sys.argv[4]
before_capacity, before_allocatable = sys.argv[5:7]
expected_sha, service, dropin, node_name, desired, job_name, namespace, host_report = sys.argv[7:15]
def read(path):
    return pathlib.Path(path).read_text(errors='replace') if pathlib.Path(path).exists() else ''
try:
    host_report_data = json.loads(read(host_report) or "{}")
except Exception:
    host_report_data = {}
apply_ok = submit_rc == 0
print(json.dumps({
    "managed": True,
    "ok": apply_ok,
    "mutation": apply_ok,
    "completionPending": apply_ok and apply_mode == "kubernetes-job",
    "applyMode": apply_mode,
    "jobName": job_name,
    "namespace": namespace,
    "jobApplyExitCode": job_apply_rc,
    "dockerFallbackExitCode": docker_rc,
    "serviceName": service,
    "dropInPath": dropin,
    "dropInSha256": host_report_data.get("dropInSha256"),
    "expectedDropInSha256": expected_sha,
    "dropInMatches": host_report_data.get("dropInSha256") == expected_sha if host_report_data else None,
    "daemonReloadExitCode": host_report_data.get("daemonReloadExitCode"),
    "restartExitCode": host_report_data.get("restartExitCode"),
    "serviceActive": host_report_data.get("serviceActiveText") == "active" if host_report_data else None,
    "nodeName": node_name,
    "desiredMaxPods": int(desired),
    "beforeCapacityPods": int(before_capacity) if before_capacity.isdigit() else None,
    "beforeAllocatablePods": int(before_allocatable) if before_allocatable.isdigit() else None,
    "hostReportPath": host_report,
    "statusCommand": f"bun scripts/cli.ts hwlab nodes control-plane infra status --node {node_name.upper()} --lane ${target.lane}",
    "jobCompletionCommand": f"kubectl -n {namespace} wait --for=condition=complete job/{job_name} --timeout=120s",
    "jobLogsCommand": f"kubectl -n {namespace} logs job/{job_name} --tail=120",
    "jobApplyStdoutPreview": read(sys.argv[15])[-1000:],
    "jobApplyStderrPreview": read(sys.argv[16])[-1000:],
    "dockerStdoutPreview": read(sys.argv[17])[-1000:],
    "dockerStderrPreview": read(sys.argv[18])[-1000:],
}, ensure_ascii=False))
PY
k3s_rc=$k3s_submit_rc
fi
rm -f "$k3s_job_manifest" "$k3s_host_script"
fi
`;
}
/**
 * Probe the node-local registry and the configured tools image through the node's kube route.
 *
 * Runs `registryStatusScript` remotely and parses its JSON output. Any non-object output is treated
 * as an empty status, so both readiness flags then read as not ready.
 *
 * @param node - control-plane node whose registry endpoint and kube route are used.
 * @param target - target whose `tekton.toolsImage.output` is the image reference to check.
 * @param timeoutSeconds - timeout forwarded to the remote command runner.
 * @returns the two readiness flags plus the parsed status and a compact command result for reporting.
 */
export function toolsImageStatus(node: ControlPlaneNodeSpec, target: ControlPlaneTargetSpec, timeoutSeconds: number): {
  registryReady: boolean;
  toolsImageReady: boolean;
  result: Record<string, unknown>;
} {
  const probeScript = registryStatusScript(node.registry.endpoint, target.tekton.toolsImage.output);
  const commandResult = runTransK3s(node.kubeRoute, probeScript, timeoutSeconds);
  const decoded = parseRemoteJson(commandResult.stdout);
  const status: Record<string, unknown> = decoded !== null && typeof decoded === "object"
    ? (decoded as Record<string, unknown>)
    : {};
  const registryReady = boolField(status, "registryReady");
  const toolsImageReady = boolField(status, "toolsImageReady");
  return {
    registryReady,
    toolsImageReady,
    result: { status, command: compactCommandResult(commandResult) },
  };
}
/**
 * Suggest the next operator command for `infra apply`, based on registry / tools-image readiness.
 *
 * - Registry not reachable: only the status command, with the registry blocker.
 * - Registry ready but tools image missing: the status command plus the bootstrap apply command and a
 *   build hint.
 * - Both ready: just the confirmed apply command.
 */
export function applyNext(node: ControlPlaneNodeSpec, target: ControlPlaneTargetSpec, imageStatus: { registryReady: boolean; toolsImageReady: boolean }): Record<string, unknown> {
  const scopeArgs = `--node ${node.id} --lane ${target.lane}`;
  const statusCommand = `bun scripts/cli.ts hwlab nodes control-plane infra status ${scopeArgs}`;
  const confirmedApply = `bun scripts/cli.ts hwlab nodes control-plane infra apply ${scopeArgs} --confirm`;
  const { registryReady, toolsImageReady } = imageStatus;
  if (registryReady && toolsImageReady) return { apply: confirmedApply };
  if (registryReady) {
    // Registry is up, so the only thing missing is the tools image itself.
    return {
      status: statusCommand,
      blockedBy: "tools-image-missing",
      applyBootstrap: confirmedApply,
      buildToolsImage: "准备受控 D601 tools-image build/publish 入口后提升 control-plane readiness。",
    };
  }
  return { status: statusCommand, blockedBy: "node-local-registry-not-ready" };
}
/**
 * Summarize what an operator should do next after an `infra status` probe.
 *
 * Every response carries the `status` and `dryRun` commands. Failing readiness checks are collected in
 * priority order into `blockers`, and `blockedBy` names the first of them. For the Tekton and Argo CD
 * install chains only the first failing stage is reported. Hints for building the tools image,
 * installing Tekton / Argo CD and applying the k3s node config are attached when relevant. The
 * confirmed apply command is reported as `applyBootstrap` when bootstrap objects are missing, and as
 * `reapplyBootstrap` otherwise.
 */
export function statusNext(
  node: ControlPlaneNodeSpec,
  target: ControlPlaneTargetSpec,
  registry: Record<string, unknown>,
  gitMirror: Record<string, unknown>,
  tekton: Record<string, unknown>,
  argo: Record<string, unknown>,
  ciNamespace: Record<string, unknown>,
  runtimeNamespace: Record<string, unknown>,
  k3sNodeConfig: Record<string, unknown>,
): Record<string, unknown> {
  const scopeArgs = `--node ${node.id} --lane ${target.lane}`;
  const confirmedApply = `bun scripts/cli.ts hwlab nodes control-plane infra apply ${scopeArgs} --confirm`;
  // Label of the first failing check. Checks are thunks, so later stages are not evaluated once one fails.
  const firstUnmet = (checks: ReadonlyArray<readonly [() => boolean, string]>): string | undefined =>
    checks.find(([passes]) => !passes())?.[1];

  const workspaceSecretReady = boolField(record(ciNamespace.gitWorkspaceSecret), "ready");
  // The git-mirror cache counts as present when either its PVC exists or its host path is ready.
  const gitMirrorCacheReady = boolField(gitMirror, "cachePvcExists") || boolField(gitMirror, "cacheHostPathReady");
  const bootstrapMissing = !(
    boolField(ciNamespace, "exists")
    && workspaceSecretReady
    && boolField(gitMirror, "namespaceExists")
    && boolField(gitMirror, "readServiceExists")
    && boolField(gitMirror, "writeServiceExists")
    && gitMirrorCacheReady
  );
  const k3sConfigPending = node.k3s !== null && !boolField(k3sNodeConfig, "ready");
  const toolsImageReady = boolField(registry, "toolsImageReady");
  const tektonInstalled = boolField(tekton, "installed");
  const argoInstalled = boolField(argo, "installed");
  const githubTransport = record(gitMirror.githubTransport);
  const tektonInstall = record(tekton.install);
  const argoInstall = record(argo.install);

  const tektonBlocker = firstUnmet([
    [() => tektonInstalled, "tekton-not-installed"],
    [() => boolField(tektonInstall, "crdsReady"), "tekton-crds-not-ready"],
    [() => boolField(tektonInstall, "deploymentsReady"), "tekton-deployments-not-ready"],
    [() => runtimeProxyReady(tektonInstall), "tekton-runtime-proxy-not-ready"],
  ]);
  const argoBlocker = firstUnmet([
    [() => argoInstalled, "argocd-not-installed"],
    [() => boolField(argoInstall, "crdsReady"), "argocd-crds-not-ready"],
    [() => boolField(argoInstall, "deploymentsReady"), "argocd-deployments-not-ready"],
    [() => boolField(argoInstall, "statefulSetsReady"), "argocd-statefulsets-not-ready"],
    [() => runtimeProxyReady(argoInstall), "argocd-runtime-proxy-not-ready"],
    [() => boolField(argo, "projectExists"), "argocd-project-missing"],
    [() => boolField(argo, "applicationExists"), "argocd-application-missing"],
  ]);

  const blockers = [
    k3sConfigPending ? "k3s-node-config-not-applied" : undefined,
    boolField(registry, "ready") ? undefined : "node-local-registry-not-ready",
    toolsImageReady ? undefined : "tools-image-missing",
    workspaceSecretReady ? undefined : "ci-git-workspace-secret-not-ready",
    boolField(runtimeNamespace, "exists") ? undefined : "runtime-namespace-missing",
    boolField(record(runtimeNamespace.runtimeObserverRbac), "ready") ? undefined : "runtime-observer-rbac-not-ready",
    bootstrapMissing ? "control-plane-bootstrap-missing" : undefined,
    // The GitHub token secret only matters when the mirror's GitHub transport is marked required.
    githubTransport.required === true && !boolField(githubTransport, "ready") ? "git-mirror-github-token-secret-not-ready" : undefined,
    tektonBlocker,
    argoBlocker,
    boolField(record(argo.argoObserverRbac), "ready") ? undefined : "argocd-observer-rbac-not-ready",
  ].filter((label): label is string => label !== undefined);

  const next: Record<string, unknown> = {
    status: `bun scripts/cli.ts hwlab nodes control-plane infra status ${scopeArgs}`,
    dryRun: `bun scripts/cli.ts hwlab nodes control-plane infra apply ${scopeArgs} --dry-run`,
  };
  if (blockers.length > 0) Object.assign(next, { blockedBy: blockers[0], blockers });
  if (!toolsImageReady) next.buildToolsImage = "准备受控 D601 tools-image build/publish 入口后提升 control-plane readiness。";
  if (!tektonInstalled) next.installTekton = `bun scripts/cli.ts hwlab nodes control-plane infra tekton apply ${scopeArgs} --confirm`;
  if (!argoInstalled) next.installArgo = "准备受控 D601 Argo CD 安装入口后再进入 runtime rollout。";
  if (k3sConfigPending) next.applyK3sNodeConfig = confirmedApply;
  next[bootstrapMissing ? "applyBootstrap" : "reapplyBootstrap"] = confirmedApply;
  return next;
}
/**
 * Build a remote shell script that reports node-local registry and tools-image readiness as JSON.
 *
 * The script runs with `set +e` so a failed probe never aborts it. It reports:
 * - `registryReady`: true when `curl` can GET `http://<registry>/v2/` within 3s.
 * - `toolsImageReady`: true when the image's manifest can be fetched within 5s. The Accept header
 *   covers OCI/Docker manifests and manifest lists.
 *
 * The repository and tag are derived by stripping a leading `<registry>/` from the image reference
 * and splitting on the last `:`. An image without a `:` is never reported ready. Both flags stay
 * false when `curl` is not installed. Probe output goes to fixed files under /tmp.
 *
 * NOTE(review): a digest reference (`repo@sha256:…`) would be split at the digest's `:` and probed
 * with the wrong path. Also, `registry` and `toolsImage` are interpolated into the JSON without
 * escaping, which assumes they contain no `"` or `\` characters.
 *
 * @param registryEndpoint - registry host[:port], probed over plain HTTP.
 * @param toolsImage - full image reference, expected to start with `<registryEndpoint>/`.
 * @returns POSIX shell script text that prints one JSON object with `registry`, `toolsImage`,
 *   `registryReady` and `toolsImageReady`.
 */
export function registryStatusScript(registryEndpoint: string, toolsImage: string): string {
  return `
set +e
registry=${shQuote(registryEndpoint)}
tools_image=${shQuote(toolsImage)}
registry_ready=false
if command -v curl >/dev/null 2>&1; then curl -fsS --max-time 3 "http://$registry/v2/" >/tmp/hwlab-registry.out 2>/tmp/hwlab-registry.err && registry_ready=true; fi
tools_repo_tag=\${tools_image#\${registry}/}
tools_repo=\${tools_repo_tag%:*}
tools_tag=\${tools_repo_tag##*:}
tools_image_ready=false
manifest_accept='application/vnd.oci.image.manifest.v1+json, application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.index.v1+json, application/vnd.docker.distribution.manifest.list.v2+json'
if [ "$tools_repo" != "$tools_repo_tag" ] && command -v curl >/dev/null 2>&1; then curl -fsS --max-time 5 -H "Accept: $manifest_accept" "http://$registry/v2/$tools_repo/manifests/$tools_tag" >/tmp/hwlab-tools-image.out 2>/tmp/hwlab-tools-image.err && tools_image_ready=true; fi
cat <<JSON
{"registry":"$registry","toolsImage":"$tools_image","registryReady":$registry_ready,"toolsImageReady":$tools_image_ready}
JSON
`;
}
/**
 * Render the inline tools-image Dockerfile configured on the target.
 *
 * @param target - target whose `tekton.toolsImage.dockerfileInline.lines` hold the Dockerfile.
 * @returns the lines joined with `\n` plus a trailing newline.
 * @throws Error when the target has no inline Dockerfile configured.
 */
export function toolsImageDockerfile(target: ControlPlaneTargetSpec): string {
  const { dockerfileInline } = target.tekton.toolsImage;
  if (dockerfileInline === undefined) {
    throw new Error(`targets.${target.id}.tekton.toolsImage.dockerfileInline is required for D601 node-local tools-image build`);
  }
  return dockerfileInline.lines.join("\n").concat("\n");
}
/**
 * Build the remote shell script that launches the D601 tools-image build as a detached background job.
 *
 * The generated script writes `job.sh` into the job state directory and starts it with `nohup`. The
 * job does the following:
 * - decodes the inline Dockerfile and runs `docker build`;
 * - smoke-tests the image by running the bundled tool versions (node, npm, bun, git, python3, docker, ssh);
 * - pushes the image and records the image id / repo digests.
 *
 * Progress is recorded in `status.json`, with output in `job.log`. If the PID recorded by a previous
 * job is still alive, no new job is started.
 *
 * All config-supplied values on the `docker build` line are shell-quoted. job.sh re-parses that line,
 * so an unquoted build arg containing spaces or shell metacharacters would be word-split or
 * reinterpreted.
 *
 * @param node - control-plane node. When it has an egress proxy, the proxy variables are forwarded
 *   as build args.
 * @param target - control-plane target providing the tools-image spec (output tag, build args, network).
 * @param dockerfile - Dockerfile contents, embedded base64-encoded into the job script.
 * @returns POSIX shell script text that prints a one-line JSON summary of whether the job was started.
 */
export function toolsImageBuildStartScript(node: ControlPlaneNodeSpec, target: ControlPlaneTargetSpec, dockerfile: string): string {
  const stateDir = remoteJobStateDir(target, "tools-image");
  const dockerfileEncoded = Buffer.from(dockerfile, "utf8").toString("base64");
  const buildArgs = Object.entries(target.tekton.toolsImage.buildArgs).flatMap(([key, value]) => ["--build-arg", shQuote(`${key}=${value}`)]);
  // A bare `--build-arg NAME` makes docker take NAME's value from the build environment. The proxy
  // variables are presumably exported by proxyExportBlock(node) inside run_job (TODO confirm).
  const proxyArgs = node.egressProxy === null
    ? []
    : ["--build-arg", "HTTP_PROXY", "--build-arg", "HTTPS_PROXY", "--build-arg", "ALL_PROXY", "--build-arg", "NO_PROXY", "--build-arg", "http_proxy", "--build-arg", "https_proxy", "--build-arg", "all_proxy", "--build-arg", "no_proxy"];
  const networkArgs = target.tekton.toolsImage.buildNetwork === null ? [] : ["--network", shQuote(target.tekton.toolsImage.buildNetwork)];
  // $dockerfile, $image and $context_dir are job.sh variables, so they are emitted as double-quoted expansions.
  const dockerBuildArgs = [...networkArgs, ...buildArgs, ...proxyArgs, "-f", '"$dockerfile"', "-t", '"$image"', '"$context_dir"'].join(" ");
  return `
set -eu
state_dir=${shQuote(stateDir)}
mkdir -p "$state_dir"
if [ -s "$state_dir/pid" ] && kill -0 "$(cat "$state_dir/pid")" >/dev/null 2>&1; then
  printf '{"started":false,"reason":"job-already-running","pid":%s,"stateDir":"%s"}\\n' "$(cat "$state_dir/pid")" "$state_dir"
  exit 0
fi
cat >"$state_dir/job.sh" <<'JOB'
#!/bin/sh
set -eu
state_dir=${shQuote(stateDir)}
image=${shQuote(target.tekton.toolsImage.output)}
context_dir="$state_dir/context"
dockerfile="$state_dir/${target.tekton.toolsImage.dockerfileInline?.filename ?? "Dockerfile"}"
log="$state_dir/job.log"
status="$state_dir/status.json"
write_status() {
  state="$1"; shift
  message="$1"; shift || true
  python3 - "$status" "$state" "$message" "$image" <<'PY'
import json, pathlib, sys, time
path=pathlib.Path(sys.argv[1])
payload={"state":sys.argv[2],"message":sys.argv[3],"image":sys.argv[4],"updatedAt":time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())}
path.write_text(json.dumps(payload, ensure_ascii=False) + "\\n")
PY
}
run_job() {
write_status running starting
rm -rf "$context_dir"
mkdir -p "$context_dir" || return "$?"
printf %s ${shQuote(dockerfileEncoded)} | base64 -d >"$dockerfile" || return "$?"
${proxyExportBlock(node)}
docker build ${dockerBuildArgs} || return "$?"
docker run --rm "$image" sh -lc 'node --version && npm --version && bun --version && git --version && python3 --version && docker --version && ssh -V' || return "$?"
docker push "$image" || return "$?"
image_id="$(docker image inspect "$image" --format '{{.Id}}' 2>/dev/null || true)"
digest="$(docker image inspect "$image" --format '{{join .RepoDigests ","}}' 2>/dev/null || true)"
python3 - "$status" "$image" "$image_id" "$digest" <<'PY'
import json, pathlib, sys, time
path=pathlib.Path(sys.argv[1])
path.write_text(json.dumps({"state":"succeeded","message":"image-built-and-pushed","image":sys.argv[2],"imageId":sys.argv[3] or None,"repoDigests":[item for item in sys.argv[4].split(",") if item],"updatedAt":time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())}, ensure_ascii=False) + "\\n")
PY
}
run_job >>"$log" 2>&1 || {
  rc=$?
  write_status failed "exit-$rc"
  exit "$rc"
}
JOB
chmod +x "$state_dir/job.sh"
: >"$state_dir/job.log"
nohup "$state_dir/job.sh" >/dev/null 2>&1 &
pid=$!
printf '%s' "$pid" >"$state_dir/pid"
printf '{"started":true,"pid":%s,"stateDir":"%s","statusCommand":"bun scripts/cli.ts hwlab nodes control-plane infra tools-image status --node %s --lane %s","logsCommand":"bun scripts/cli.ts hwlab nodes control-plane infra tools-image logs --node %s --lane %s"}\\n' "$pid" "$state_dir" ${shQuote(node.id)} ${shQuote(target.lane)} ${shQuote(node.id)} ${shQuote(target.lane)}
`;
}
/**
 * Build the remote shell script that launches the Tekton install as a detached background job.
 *
 * The generated job does the following:
 * - downloads each configured manifest URL with curl and applies it with `kubectl apply --field-manager`;
 * - reconciles runtime-proxy env vars, pod-template annotations and hostNetwork/dnsPolicy on the
 *   deployments and statefulsets in the expected namespaces;
 * - waits, up to the configured readiness timeout, for the required CRDs and for every deployment in
 *   those namespaces to have all desired replicas ready.
 *
 * Progress is recorded in `status.json`, with output in `job.log`, under the job state directory. If
 * the PID recorded by a previous job is still alive, no new job is started.
 *
 * The job body runs inside a `run_job` function whose steps end in `|| return "$?"`. POSIX sh ignores
 * `set -e` inside a command that is the left operand of `||`. In the previous `{ …; } || { … }` form,
 * a failing install/readiness step fell through to `write_status succeeded`, and the job exited 0.
 *
 * @param node - control-plane node; its egress proxy settings are exported for the install commands.
 * @param target - target providing `tekton.install` (manifests, CRDs, namespaces, runtime proxy, timeout).
 * @returns POSIX shell script text that prints a one-line JSON summary of whether the job was started.
 */
export function tektonInstallApplyStartScript(node: ControlPlaneNodeSpec, target: ControlPlaneTargetSpec): string {
  const stateDir = remoteJobStateDir(target, "tekton");
  const manifestsEncoded = Buffer.from(JSON.stringify(target.tekton.install.manifests), "utf8").toString("base64");
  const crdsEncoded = Buffer.from(JSON.stringify(target.tekton.install.requiredCrds), "utf8").toString("base64");
  const namespacesEncoded = Buffer.from(JSON.stringify(target.tekton.install.expectedDeploymentNamespaces), "utf8").toString("base64");
  const runtimeProxyEncoded = Buffer.from(JSON.stringify(runtimeHostProxyConfig(node, target.tekton.install.runtimeProxy)), "utf8").toString("base64");
  return `
set -eu
state_dir=${shQuote(stateDir)}
mkdir -p "$state_dir"
if [ -s "$state_dir/pid" ] && kill -0 "$(cat "$state_dir/pid")" >/dev/null 2>&1; then
  printf '{"started":false,"reason":"job-already-running","pid":%s,"stateDir":"%s"}\\n' "$(cat "$state_dir/pid")" "$state_dir"
  exit 0
fi
cat >"$state_dir/job.sh" <<'JOB'
#!/bin/sh
set -eu
state_dir=${shQuote(stateDir)}
field_manager=${shQuote(target.tekton.install.fieldManager)}
readiness_timeout=${shQuote(String(target.tekton.install.readinessTimeoutSeconds))}
log="$state_dir/job.log"
status="$state_dir/status.json"
manifests_json="$state_dir/manifests.json"
crds_json="$state_dir/required-crds.json"
namespaces_json="$state_dir/deployment-namespaces.json"
runtime_proxy_json="$state_dir/runtime-proxy.json"
write_status() {
  state="$1"; shift
  message="$1"; shift || true
  python3 - "$status" "$state" "$message" <<'PY'
import json, pathlib, sys, time
path=pathlib.Path(sys.argv[1])
path.write_text(json.dumps({"state":sys.argv[2],"message":sys.argv[3],"updatedAt":time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())}, ensure_ascii=False) + "\\n")
PY
}
run_job() {
write_status running starting
${proxyExportBlock(node)}
printf %s ${shQuote(manifestsEncoded)} | base64 -d >"$manifests_json" || return "$?"
printf %s ${shQuote(crdsEncoded)} | base64 -d >"$crds_json" || return "$?"
printf %s ${shQuote(namespacesEncoded)} | base64 -d >"$namespaces_json" || return "$?"
printf %s ${shQuote(runtimeProxyEncoded)} | base64 -d >"$runtime_proxy_json" || return "$?"
python3 - "$manifests_json" "$crds_json" "$namespaces_json" "$runtime_proxy_json" "$field_manager" "$readiness_timeout" "$state_dir" <<'PY' || return "$?"
import json, pathlib, subprocess, sys, time
manifests=json.loads(pathlib.Path(sys.argv[1]).read_text())
required_crds=json.loads(pathlib.Path(sys.argv[2]).read_text())
namespaces=json.loads(pathlib.Path(sys.argv[3]).read_text())
runtime_proxy=json.loads(pathlib.Path(sys.argv[4]).read_text())
field_manager=sys.argv[5]
timeout=int(sys.argv[6])
state_dir=pathlib.Path(sys.argv[7])
def run(args):
    print(json.dumps({"event":"tekton-install-command","argv":args[:2] + ["..."] if len(args) > 2 else args}, ensure_ascii=False), flush=True)
    subprocess.run(args, check=True)
def run_checked(args, allow_no_resources=False):
    print(json.dumps({"event":"tekton-runtime-proxy-command","argv":args[:4] + ["..."] if len(args) > 4 else args}, ensure_ascii=False), flush=True)
    proc=subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    text=(proc.stdout or "") + (proc.stderr or "")
    if proc.returncode != 0:
        if allow_no_resources and "No resources found" in text:
            return ""
        sys.stderr.write(text[-2000:])
        raise SystemExit(proc.returncode)
    return proc.stdout
def names_for(kind, namespace):
    stdout=run_checked(["kubectl", "-n", namespace, "get", kind, "-o", "json"], allow_no_resources=True)
    if not stdout:
        return []
    data=json.loads(stdout or "{}")
    return [item.get("metadata", {}).get("name") for item in data.get("items", []) if item.get("metadata", {}).get("name")]
def runtime_proxy_env_args(cfg):
    proxy=str(cfg.get("proxyUrl") or "")
    no_proxy=",".join([str(item) for item in cfg.get("noProxy") or []])
    return [
        "HTTP_PROXY=" + proxy,
        "HTTPS_PROXY=" + proxy,
        "ALL_PROXY=" + proxy,
        "NO_PROXY=" + no_proxy,
        "http_proxy=" + proxy,
        "https_proxy=" + proxy,
        "all_proxy=" + proxy,
        "no_proxy=" + no_proxy,
    ]
def apply_runtime_proxy(cfg):
    if not cfg.get("enabled"):
        return
    selected={
        "deployment": set(str(item) for item in cfg.get("deployments") or []),
        "statefulset": set(str(item) for item in cfg.get("statefulSets") or []),
    }
    observed={"deployment": set(), "statefulset": set()}
    remove_env=["HTTP_PROXY-","HTTPS_PROXY-","ALL_PROXY-","NO_PROXY-","http_proxy-","https_proxy-","all_proxy-","no_proxy-"]
    for namespace in namespaces:
        for kind in ["deployment", "statefulset"]:
            for name in names_for(kind, namespace):
                observed[kind].add(name)
                resource=f"{kind}/{name}"
                enabled_for_workload=name in selected[kind]
                if enabled_for_workload and cfg.get("injectEnv"):
                    run_checked(["kubectl", "-n", namespace, "set", "env", resource, *runtime_proxy_env_args(cfg)])
                else:
                    run_checked(["kubectl", "-n", namespace, "set", "env", resource, *remove_env])
                patch={"spec":{"template":{"metadata":{"annotations":{
                    "unidesk.ai/runtime-proxy": "host-route" if enabled_for_workload else None,
                    "unidesk.ai/runtime-proxy-config-ref": str(cfg.get("configRef") or "") if enabled_for_workload else None,
                }},"spec":{
                    "hostNetwork": bool(enabled_for_workload and cfg.get("hostNetwork")),
                    "dnsPolicy": "ClusterFirstWithHostNet" if enabled_for_workload and cfg.get("hostNetwork") else "ClusterFirst",
                }}}}
                run_checked(["kubectl", "-n", namespace, "patch", kind, name, "--type", "merge", "-p", json.dumps(patch)])
    missing=[]
    for kind, selected_names in selected.items():
        missing.extend([f"{kind}/{name}" for name in sorted(selected_names.difference(observed[kind]))])
    if missing:
        raise SystemExit("runtime proxy selected missing workload: " + ",".join(missing))
for manifest in manifests:
    name=manifest["name"]
    url=manifest["url"]
    dest=state_dir / f"{name}.yaml"
    run(["curl", "-fsSL", "--connect-timeout", "20", "--max-time", "180", url, "-o", str(dest)])
    run(["kubectl", "apply", "--field-manager", field_manager, "-f", str(dest)])
apply_runtime_proxy(runtime_proxy)
deadline=time.time()+timeout
while True:
    missing=[name for name in required_crds if subprocess.run(["kubectl", "get", "crd", name], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode != 0]
    if not missing:
        break
    if time.time() >= deadline:
        raise SystemExit(f"tekton CRDs not ready: {','.join(missing)}")
    time.sleep(5)
for namespace in namespaces:
    while True:
        proc=subprocess.run(["kubectl", "-n", namespace, "get", "deploy", "-o", "json"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        if proc.returncode == 0:
            data=json.loads(proc.stdout)
            deployments=data.get("items", [])
            ready=True
            for item in deployments:
                desired=int(item.get("spec", {}).get("replicas") or 0)
                ready_replicas=int(item.get("status", {}).get("readyReplicas") or 0)
                if desired <= 0 or ready_replicas != desired:
                    ready=False
                    break
            if deployments and ready:
                break
        if time.time() >= deadline:
            stderr=proc.stderr[-500:] if proc.returncode != 0 else ""
            raise SystemExit(f"tekton deployments not ready in namespace {namespace}; stderr={stderr}")
        time.sleep(5)
PY
write_status succeeded tekton-install-applied
}
run_job >>"$log" 2>&1 || {
  rc=$?
  write_status failed "exit-$rc"
  exit "$rc"
}
JOB
chmod +x "$state_dir/job.sh"
: >"$state_dir/job.log"
nohup "$state_dir/job.sh" >/dev/null 2>&1 &
pid=$!
printf '%s' "$pid" >"$state_dir/pid"
printf '{"started":true,"pid":%s,"stateDir":"%s","statusCommand":"bun scripts/cli.ts hwlab nodes control-plane infra tekton status --node %s --lane %s","logsCommand":"bun scripts/cli.ts hwlab nodes control-plane infra tekton logs --node %s --lane %s"}\\n' "$pid" "$state_dir" ${shQuote(node.id)} ${shQuote(target.lane)} ${shQuote(node.id)} ${shQuote(target.lane)}
`;
}
/**
 * Build the remote shell script that launches the Argo CD install as a detached background job.
 *
 * The generated job does the following:
 * - ensures the Argo namespace exists;
 * - mirrors the rewritten images (pull, tag, push) and checks that preloaded `127.0.0.1:5000/` images exist;
 * - downloads the install manifest, rewrites image references and `imagePullPolicy: Always`, and
 *   applies the result server-side;
 * - reconciles runtime-proxy env vars, annotations and hostNetwork on the namespace's deployments
 *   and statefulsets;
 * - waits for the Application/AppProject CRDs, then applies the desired AppProject/Application YAML.
 *
 * Progress is recorded in `status.json`, with output in `job.log`. If the PID recorded by a previous
 * job is still alive, no new job is started.
 *
 * The job body runs inside a `run_job` function whose steps end in `|| return "$?"`. In the previous
 * form, `|| exit "$?"` inside the `{ …; } || { … }` group terminated job.sh before the failure handler
 * could record `failed`, which left `status.json` at `running`. Several python steps (including the
 * runtime-proxy reconciliation) had no failure check at all, and `set -e` does not apply in that
 * context.
 *
 * @param node - control-plane node; its egress proxy settings are exported for the install commands.
 * @param target - target providing `argo.namespace` and `argo.install` (manifest URL, image rewrites,
 *   preload images, pull policy, runtime proxy, readiness timeout).
 * @param desiredYaml - AppProject/Application YAML applied once the Argo CD CRDs exist.
 * @returns POSIX shell script text that prints a one-line JSON summary of whether the job was started.
 */
export function argoApplyStartScript(node: ControlPlaneNodeSpec, target: ControlPlaneTargetSpec, desiredYaml: string): string {
  const stateDir = remoteJobStateDir(target, "argo");
  const desiredEncoded = Buffer.from(desiredYaml, "utf8").toString("base64");
  const rewritesEncoded = Buffer.from(JSON.stringify(target.argo.install.imageRewrites), "utf8").toString("base64");
  const preloadEncoded = Buffer.from(JSON.stringify(target.argo.install.preloadImages), "utf8").toString("base64");
  const runtimeProxyEncoded = Buffer.from(JSON.stringify(runtimeHostProxyConfig(node, target.argo.install.runtimeProxy)), "utf8").toString("base64");
  return `
set -eu
state_dir=${shQuote(stateDir)}
mkdir -p "$state_dir"
if [ -s "$state_dir/pid" ] && kill -0 "$(cat "$state_dir/pid")" >/dev/null 2>&1; then
  printf '{"started":false,"reason":"job-already-running","pid":%s,"stateDir":"%s"}\\n' "$(cat "$state_dir/pid")" "$state_dir"
  exit 0
fi
cat >"$state_dir/job.sh" <<'JOB'
#!/bin/sh
set -eu
state_dir=${shQuote(stateDir)}
namespace=${shQuote(target.argo.namespace)}
manifest_url=${shQuote(target.argo.install.manifestUrl)}
field_manager=${shQuote(target.argo.install.fieldManager)}
readiness_timeout=${shQuote(String(target.argo.install.readinessTimeoutSeconds))}
log="$state_dir/job.log"
status="$state_dir/status.json"
install_yaml="$state_dir/install.yaml"
rendered_yaml="$state_dir/install.rendered.yaml"
desired_yaml="$state_dir/desired.yaml"
write_status() {
  state="$1"; shift
  message="$1"; shift || true
  python3 - "$status" "$state" "$message" <<'PY'
import json, pathlib, sys, time
path=pathlib.Path(sys.argv[1])
path.write_text(json.dumps({"state":sys.argv[2],"message":sys.argv[3],"updatedAt":time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())}, ensure_ascii=False) + "\\n")
PY
}
run_job() {
write_status running starting
${proxyExportBlock(node)}
printf %s ${shQuote(desiredEncoded)} | base64 -d >"$desired_yaml" || return "$?"
printf %s ${shQuote(rewritesEncoded)} | base64 -d >"$state_dir/image-rewrites.json" || return "$?"
printf %s ${shQuote(preloadEncoded)} | base64 -d >"$state_dir/preload-images.json" || return "$?"
printf %s ${shQuote(runtimeProxyEncoded)} | base64 -d >"$state_dir/runtime-proxy.json" || return "$?"
kubectl create namespace "$namespace" --dry-run=client -o yaml | kubectl apply --server-side --field-manager="$field_manager" -f - || return "$?"
python3 - "$state_dir/preload-images.json" "$state_dir/image-rewrites.json" <<'PY' >"$state_dir/pull-images.sh" || return "$?"
import json, pathlib, shlex, sys
preload=json.loads(pathlib.Path(sys.argv[1]).read_text())
rewrites=json.loads(pathlib.Path(sys.argv[2]).read_text())
print("#!/bin/sh")
print("set -eu")
seen=set()
for item in rewrites:
    pull=item["pullImage"]
    target=item["target"]
    if target in seen:
        continue
    seen.add(target)
    print("docker pull " + shlex.quote(pull))
    print("docker tag " + shlex.quote(pull) + " " + shlex.quote(target))
    print("docker push " + shlex.quote(target))
for image in preload:
    if image not in seen and image.startswith("127.0.0.1:5000/"):
        print("docker image inspect " + shlex.quote(image) + " >/dev/null")
PY
chmod +x "$state_dir/pull-images.sh" || return "$?"
"$state_dir/pull-images.sh" || return "$?"
curl -fsSL --max-time 60 "$manifest_url" >"$install_yaml" || return "$?"
python3 - "$install_yaml" "$state_dir/image-rewrites.json" "$rendered_yaml" ${shQuote(target.argo.install.imagePullPolicy)} <<'PY' || return "$?"
import json, pathlib, sys
text=pathlib.Path(sys.argv[1]).read_text()
rewrites=json.loads(pathlib.Path(sys.argv[2]).read_text())
for item in rewrites:
    text=text.replace(item["source"], item["target"])
policy=sys.argv[4]
text=text.replace("imagePullPolicy: Always", "imagePullPolicy: " + policy)
pathlib.Path(sys.argv[3]).write_text(text)
PY
kubectl apply --server-side --field-manager="$field_manager" -n "$namespace" -f "$rendered_yaml" || return "$?"
python3 - "$state_dir/runtime-proxy.json" "$namespace" <<'PY' || return "$?"
import json, pathlib, subprocess, sys
cfg=json.loads(pathlib.Path(sys.argv[1]).read_text())
namespace=sys.argv[2]
if not cfg.get("enabled"):
    raise SystemExit(0)
def run(args, allow_no_resources=False):
    print(json.dumps({"event":"argocd-runtime-proxy-command","argv":args[:4] + ["..."] if len(args) > 4 else args}, ensure_ascii=False), flush=True)
    proc=subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    text=(proc.stdout or "") + (proc.stderr or "")
    if proc.returncode != 0:
        if allow_no_resources and "No resources found" in text:
            return
        sys.stderr.write(text[-2000:])
        raise SystemExit(proc.returncode)
def names_for(kind):
    proc=subprocess.run(["kubectl", "-n", namespace, "get", kind, "-o", "json"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    text=(proc.stdout or "") + (proc.stderr or "")
    if proc.returncode != 0:
        if "No resources found" in text:
            return []
        sys.stderr.write(text[-2000:])
        raise SystemExit(proc.returncode)
    data=json.loads(proc.stdout or "{}")
    return [item.get("metadata", {}).get("name") for item in data.get("items", []) if item.get("metadata", {}).get("name")]
def env_args():
    proxy=str(cfg.get("proxyUrl") or "")
    no_proxy=",".join([str(item) for item in cfg.get("noProxy") or []])
    return [
        "HTTP_PROXY=" + proxy,
        "HTTPS_PROXY=" + proxy,
        "ALL_PROXY=" + proxy,
        "NO_PROXY=" + no_proxy,
        "http_proxy=" + proxy,
        "https_proxy=" + proxy,
        "all_proxy=" + proxy,
        "no_proxy=" + no_proxy,
    ]
env_remove_args=["HTTP_PROXY-","HTTPS_PROXY-","ALL_PROXY-","NO_PROXY-","http_proxy-","https_proxy-","all_proxy-","no_proxy-"]
workloads={
    "deployment": set(str(item) for item in cfg.get("deployments") or []),
    "statefulset": set(str(item) for item in cfg.get("statefulSets") or []),
}
for kind in ["deployment", "statefulset"]:
    selected=workloads[kind]
    all_names=names_for(kind)
    missing=sorted(selected.difference(all_names))
    if missing:
        raise SystemExit(f"runtime proxy selected missing {kind}: {','.join(missing)}")
    for name in all_names:
        resource=f"{kind}/{name}"
        enabled_for_workload=name in selected
        if enabled_for_workload and cfg.get("injectEnv"):
            run(["kubectl", "-n", namespace, "set", "env", resource, *env_args()])
        else:
            run(["kubectl", "-n", namespace, "set", "env", resource, *env_remove_args])
        patch={"spec":{"template":{"metadata":{"annotations":{
            "unidesk.ai/runtime-proxy": "host-route" if enabled_for_workload else None,
            "unidesk.ai/runtime-proxy-config-ref": str(cfg.get("configRef") or "") if enabled_for_workload else None,
        }},"spec":{
            "hostNetwork": bool(enabled_for_workload and cfg.get("hostNetwork")),
            "dnsPolicy": "ClusterFirstWithHostNet" if enabled_for_workload and cfg.get("hostNetwork") else "ClusterFirst",
        }}}}
        run(["kubectl", "-n", namespace, "patch", kind, name, "--type", "merge", "-p", json.dumps(patch)])
PY
deadline=$(( $(date +%s) + readiness_timeout ))
while [ "$(date +%s)" -lt "$deadline" ]; do
  kubectl get crd applications.argoproj.io appprojects.argoproj.io >/dev/null 2>&1 && break
  sleep 5
done
kubectl get crd applications.argoproj.io appprojects.argoproj.io >/dev/null || return "$?"
kubectl apply --server-side --force-conflicts --field-manager="$field_manager" -f "$desired_yaml" || return "$?"
write_status succeeded argocd-install-applied
}
run_job >>"$log" 2>&1 || {
  rc=$?
  write_status failed "exit-$rc"
  exit "$rc"
}
JOB
chmod +x "$state_dir/job.sh"
: >"$state_dir/job.log"
nohup "$state_dir/job.sh" >/dev/null 2>&1 &
pid=$!
printf '%s' "$pid" >"$state_dir/pid"
printf '{"started":true,"pid":%s,"stateDir":"%s","statusCommand":"bun scripts/cli.ts hwlab nodes control-plane infra argo status --node %s --lane %s","logsCommand":"bun scripts/cli.ts hwlab nodes control-plane infra argo logs --node %s --lane %s"}\\n' "$pid" "$state_dir" ${shQuote(node.id)} ${shQuote(target.lane)} ${shQuote(node.id)} ${shQuote(target.lane)}
`;
}
/**
 * Build a remote shell script that starts a CI build benchmark PipelineRun for one profile.
 *
 * The generated script does the following:
 * 1. Reads `pipelineRun` from the profile's `status.json`, if present.
 * 2. If that PipelineRun's `Succeeded` condition status is non-empty and neither `True` nor `False`,
 *    prints an `already-running` JSON summary and exits 0 without creating anything.
 * 3. Otherwise writes the manifest to `<stateDir>/<pipelineRun>.json` and checks that the Pipeline exists.
 *    When it does, the script runs `kubectl create` on the manifest.
 * 4. Writes the create/check output to `create.log`, records a JSON payload in `status.json`, prints
 *    that payload, and exits with the create (or pipeline-check) exit code.
 *
 * NOTE(review): when the previous PipelineRun has no `Succeeded` condition yet, or no longer exists,
 * `previous_status` is empty and a new run is created.
 *
 * @param target - target providing the CI namespace, node and lane.
 * @param profile - benchmark profile; its `profile` name selects the state directory.
 * @param manifest - PipelineRun manifest object, serialized as JSON and passed to `kubectl create`.
 * @param pipelineName - Pipeline that must exist before the run is created.
 * @param pipelineRun - name of the PipelineRun being created, recorded in `status.json`.
 * @param sourceCommit - source commit recorded in the status payload.
 * @param catalogPath - catalog path recorded in the status payload.
 * @returns POSIX shell script text.
 */
export function ciBuildBenchmarkStartScript(
  target: ControlPlaneTargetSpec,
  profile: CiBuildBenchmarkProfileSpec,
  manifest: Record<string, unknown>,
  pipelineName: string,
  pipelineRun: string,
  sourceCommit: string,
  catalogPath: string,
): string {
  const stateDir = ciBuildBenchmarkStateDir(target, profile.profile);
  // Base64 lets the JSON manifest pass through the shell script unchanged by quoting rules.
  const manifestB64 = Buffer.from(JSON.stringify(manifest), "utf8").toString("base64");
  return `
set -eu
state_dir=${shQuote(stateDir)}
status_file="$state_dir/status.json"
ns=${shQuote(target.ciNamespace)}
profile=${shQuote(profile.profile)}
pipeline=${shQuote(pipelineName)}
pipeline_run=${shQuote(pipelineRun)}
source_commit=${shQuote(sourceCommit)}
catalog_path=${shQuote(catalogPath)}
mkdir -p "$state_dir"
previous_run=
if [ -s "$status_file" ]; then
  previous_run=$(python3 - "$status_file" <<'PY' || true
import json, sys
try:
    data=json.load(open(sys.argv[1], encoding="utf-8"))
    print(data.get("pipelineRun") or "")
except Exception:
    print("")
PY
)
fi
if [ -n "$previous_run" ]; then
  previous_status=$(kubectl -n "$ns" get pipelinerun "$previous_run" -o 'jsonpath={.status.conditions[?(@.type=="Succeeded")].status}' 2>/dev/null || true)
  if [ -n "$previous_status" ] && [ "$previous_status" != "True" ] && [ "$previous_status" != "False" ]; then
    python3 - "$state_dir" "$previous_run" "$profile" ${shQuote(target.node)} ${shQuote(target.lane)} <<'PY'
import json, sys
state_dir, previous_run, profile, node, lane = sys.argv[1:6]
print(json.dumps({
    "started": False,
    "state": "already-running",
    "pipelineRun": previous_run,
    "stateDir": state_dir,
    "statusCommand": f"bun scripts/cli.ts hwlab nodes control-plane infra ci-build-benchmark status --node {node} --lane {lane} --profile {profile}",
    "logsCommand": f"bun scripts/cli.ts hwlab nodes control-plane infra ci-build-benchmark logs --node {node} --lane {lane} --profile {profile}",
}, ensure_ascii=False))
PY
    exit 0
  fi
fi
manifest_path="$state_dir/$pipeline_run.json"
printf '%s' ${shQuote(manifestB64)} | base64 -d >"$manifest_path"
set +e
pipeline_check=$(kubectl -n "$ns" get pipeline "$pipeline" -o name 2>&1)
pipeline_check_rc=$?
create_output=
create_rc=0
if [ "$pipeline_check_rc" = 0 ]; then
  create_output=$(kubectl create -f "$manifest_path" 2>&1)
  create_rc=$?
else
  create_output="$pipeline_check"
  create_rc="$pipeline_check_rc"
fi
printf '%s\\n' "$create_output" >"$state_dir/create.log"
python3 - "$status_file" "$state_dir" "$pipeline_run" "$source_commit" "$profile" "$catalog_path" "$create_rc" "$create_output" ${shQuote(target.node)} ${shQuote(target.lane)} <<'PY'
import datetime, json, sys
status_file, state_dir, pipeline_run, source_commit, profile, catalog_path, rc_raw, output, node, lane = sys.argv[1:11]
rc=int(rc_raw or "0")
payload={
    "started": rc == 0,
    "state": "started" if rc == 0 else "failed",
    "pipelineRun": pipeline_run,
    "sourceCommit": source_commit,
    "profile": profile,
    "catalogPath": catalog_path,
    "stateDir": state_dir,
    "createdAt": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    "exitCode": rc,
    "createOutputTail": output[-2000:],
    "statusCommand": f"bun scripts/cli.ts hwlab nodes control-plane infra ci-build-benchmark status --node {node} --lane {lane} --profile {profile}",
    "logsCommand": f"bun scripts/cli.ts hwlab nodes control-plane infra ci-build-benchmark logs --node {node} --lane {lane} --profile {profile}",
}
open(status_file, "w", encoding="utf-8").write(json.dumps(payload, ensure_ascii=False))
print(json.dumps(payload, ensure_ascii=False))
PY
exit "$create_rc"
`;
}
/**
 * Builds the shell script that reports the state of a CI build benchmark run on the node.
 *
 * The script runs with `set +e`, so individual command failures do not abort it. It only issues
 * `kubectl get` / `kubectl logs` against the cluster. Steps:
 * 1. Take the PipelineRun name from `<stateDir>/status.json` (its `pipelineRun` field). When that is
 *    absent or empty, fall back to the most recently created PipelineRun in `target.ciNamespace`
 *    labelled `unidesk.ai/benchmark=ci-build` and `unidesk.ai/benchmark-profile=<profile>`.
 * 2. Dump the PipelineRun, its TaskRuns and its Pods (selected by `tekton.dev/pipelineRun`) as JSON
 *    into a temp dir. When `includeLogs` is true, also dump all container logs (`--tail=<tailLines>`).
 * 3. Print a single JSON object with `stateDir`, `status` (parsed status.json or null), `pipelineRunName`,
 *    `state`, `pipelineRun`, `taskRuns`, `pods`, `errors` (tails of the kubectl stderr captures) and
 *    `logTail`, then delete the temp dir.
 *
 * `state` values:
 * - `not-started`: no run name could be resolved.
 * - `missing`: a name was resolved but the PipelineRun JSON could not be read.
 * - `pending`: the run has no Succeeded status yet.
 * - `succeeded` / `failed`: the Succeeded condition is True / False.
 * - `running`: any other non-empty Succeeded status.
 *
 * @param target control-plane target; supplies `ciNamespace` and the state-dir location
 * @param profile benchmark profile whose runs are inspected
 * @param tailLines per-container log line limit; the returned `logTail` is additionally capped to
 *   `min(6000, max(2000, tailLines * 80))` characters
 * @param includeLogs whether to fetch pod logs at all
 * @returns the script text. NOTE(review): presumably executed remotely via `sh --` (e.g. `runTransK3s`); confirm at the caller.
 */
export function ciBuildBenchmarkStatusScript(target: ControlPlaneTargetSpec, profile: CiBuildBenchmarkProfileSpec, tailLines: number, includeLogs: boolean): string {
  // Resolves to /tmp/unidesk-hwlab-node-control-plane/<target.id>/ci-build-benchmark-<profile>.
  const stateDir = ciBuildBenchmarkStateDir(target, profile.profile);
  return `
set +e
state_dir=${shQuote(stateDir)}
status_file="$state_dir/status.json"
ns=${shQuote(target.ciNamespace)}
profile=${shQuote(profile.profile)}
tail_lines=${shQuote(String(tailLines))}
include_logs=${includeLogs ? "true" : "false"}
tmp_dir=$(mktemp -d)
pipeline_run=
if [ -s "$status_file" ]; then
pipeline_run=$(python3 - "$status_file" <<'PY' || true
import json, sys
try:
    data=json.load(open(sys.argv[1], encoding="utf-8"))
    print(data.get("pipelineRun") or "")
except Exception:
    print("")
PY
)
fi
if [ -z "$pipeline_run" ]; then
pipeline_run=$(kubectl -n "$ns" get pipelinerun -l "unidesk.ai/benchmark=ci-build,unidesk.ai/benchmark-profile=$profile" -o 'jsonpath={range .items[*]}{.metadata.creationTimestamp}{" "}{.metadata.name}{"\\n"}{end}' 2>/dev/null | sort | tail -n 1 | awk '{print $2}')
fi
if [ -n "$pipeline_run" ]; then
kubectl -n "$ns" get pipelinerun "$pipeline_run" -o json >"$tmp_dir/pipelinerun.json" 2>"$tmp_dir/pipelinerun.err"
kubectl -n "$ns" get taskrun -l "tekton.dev/pipelineRun=$pipeline_run" -o json >"$tmp_dir/taskruns.json" 2>"$tmp_dir/taskruns.err"
kubectl -n "$ns" get pod -l "tekton.dev/pipelineRun=$pipeline_run" -o json >"$tmp_dir/pods.json" 2>"$tmp_dir/pods.err"
if [ "$include_logs" = true ]; then
kubectl -n "$ns" logs -l "tekton.dev/pipelineRun=$pipeline_run" --all-containers --tail="$tail_lines" --prefix=true >"$tmp_dir/logs.txt" 2>"$tmp_dir/logs.err" || true
fi
fi
python3 - "$state_dir" "$status_file" "$tmp_dir" "$pipeline_run" "$include_logs" "$tail_lines" <<'PY'
import json, pathlib, sys
state_dir=pathlib.Path(sys.argv[1])
status_path=pathlib.Path(sys.argv[2])
tmp_dir=pathlib.Path(sys.argv[3])
pipeline_run=sys.argv[4]
include_logs=sys.argv[5] == "true"
tail_lines=int(sys.argv[6])
log_tail_limit=min(6000, max(2000, tail_lines * 80))
def read_json(path):
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        return None
def read_text(path, limit=4000):
    try:
        return path.read_text(encoding="utf-8", errors="replace")[-limit:]
    except Exception:
        return ""
def succeeded_condition(obj):
    for cond in obj.get("status", {}).get("conditions", []) or []:
        if cond.get("type") == "Succeeded":
            return cond
    return {}
status=None
if status_path.exists():
    status=read_json(status_path)
pr=read_json(tmp_dir / "pipelinerun.json") if pipeline_run else None
trs=read_json(tmp_dir / "taskruns.json") if pipeline_run else None
pods=read_json(tmp_dir / "pods.json") if pipeline_run else None
pr_cond=succeeded_condition(pr or {})
task_runs=[]
for item in (trs or {}).get("items", []) or []:
    cond=succeeded_condition(item)
    labels=item.get("metadata", {}).get("labels", {}) or {}
    task_runs.append({
        "name": item.get("metadata", {}).get("name"),
        "pipelineTask": labels.get("tekton.dev/pipelineTask"),
        "status": cond.get("status"),
        "reason": cond.get("reason"),
        "message": cond.get("message"),
        "startTime": item.get("status", {}).get("startTime"),
        "completionTime": item.get("status", {}).get("completionTime"),
        "podName": item.get("status", {}).get("podName"),
    })
pod_rows=[]
for item in (pods or {}).get("items", []) or []:
    phase=item.get("status", {}).get("phase")
    pod_rows.append({
        "name": item.get("metadata", {}).get("name"),
        "phase": phase,
        "startTime": item.get("status", {}).get("startTime"),
    })
state="not-started"
if pr:
    status_value=pr_cond.get("status")
    if status_value == "True":
        state="succeeded"
    elif status_value == "False":
        state="failed"
    elif status_value:
        state="running"
    else:
        state="pending"
elif pipeline_run:
    state="missing"
payload={
    "stateDir": str(state_dir),
    "status": status,
    "pipelineRunName": pipeline_run or None,
    "state": state,
    "pipelineRun": None if not pr else {
        "name": pr.get("metadata", {}).get("name"),
        "status": pr_cond.get("status"),
        "reason": pr_cond.get("reason"),
        "message": pr_cond.get("message"),
        "createdAt": pr.get("metadata", {}).get("creationTimestamp"),
        "startTime": pr.get("status", {}).get("startTime"),
        "completionTime": pr.get("status", {}).get("completionTime"),
        "sourceCommit": (pr.get("metadata", {}).get("labels", {}) or {}).get("hwlab.pikastech.local/source-commit"),
        "catalogPath": (pr.get("metadata", {}).get("annotations", {}) or {}).get("unidesk.ai/catalog-path"),
    },
    "taskRuns": task_runs,
    "pods": pod_rows,
    "errors": {
        "pipelinerun": read_text(tmp_dir / "pipelinerun.err"),
        "taskruns": read_text(tmp_dir / "taskruns.err"),
        "pods": read_text(tmp_dir / "pods.err"),
        "logs": read_text(tmp_dir / "logs.err") if include_logs else "",
    },
    "logTail": read_text(tmp_dir / "logs.txt", log_tail_limit) if include_logs else "",
}
print(json.dumps(payload, ensure_ascii=False))
PY
rm -rf "$tmp_dir"
`;
}
/**
 * Builds the read-only shell script that reports on a detached remote infra job.
 *
 * The script reads `pid`, `status.json` and `job.log` from `remoteJobStateDir(target, name)`.
 * `running` is true only when the pid file is non-empty and `kill -0` succeeds for that pid. The pid
 * value is reported even when the process is no longer alive.
 *
 * It prints one JSON object with these fields:
 * - `stateDir`, `pid`, `running`.
 * - `status`: the parsed `status.json`, or `{parseError, raw}` (the last 1000 characters) when the file
 *   is not valid JSON.
 * - `logBytes`: the size of `job.log`.
 * - `logTail`: the last `tailLines` lines of `job.log`.
 *
 * @param target control-plane target selecting the state directory
 * @param name which remote job to inspect
 * @param tailLines number of trailing log lines to return. Callers should pass a positive value, because
 *   `lines[-0:]` in the embedded Python would return the whole log for 0.
 * @returns the script text; `remoteJobLogs` runs it through `runTransK3s`
 */
export function remoteJobStatusScript(target: ControlPlaneTargetSpec, name: "tools-image" | "tekton" | "argo", tailLines: number): string {
  const stateDir = remoteJobStateDir(target, name);
  return `
set +e
state_dir=${shQuote(stateDir)}
status_file="$state_dir/status.json"
log_file="$state_dir/job.log"
pid_file="$state_dir/pid"
running=false
pid=null
if [ -s "$pid_file" ]; then
pid_raw="$(cat "$pid_file" 2>/dev/null || true)"
if [ -n "$pid_raw" ] && kill -0 "$pid_raw" >/dev/null 2>&1; then running=true; pid="$pid_raw"; else pid="$pid_raw"; fi
fi
python3 - "$state_dir" "$status_file" "$log_file" "$running" "$pid" ${shQuote(String(tailLines))} <<'PY'
import json, pathlib, sys
state_dir=pathlib.Path(sys.argv[1])
status_path=pathlib.Path(sys.argv[2])
log_path=pathlib.Path(sys.argv[3])
running=sys.argv[4] == "true"
pid=None if sys.argv[5] in ("", "null") else sys.argv[5]
tail_lines=int(sys.argv[6])
status=None
if status_path.exists():
    try:
        status=json.loads(status_path.read_text())
    except Exception as error:
        status={"parseError": str(error), "raw": status_path.read_text(errors="replace")[-1000:]}
log_tail=""
if log_path.exists():
    lines=log_path.read_text(errors="replace").splitlines()
    log_tail="\\n".join(lines[-tail_lines:])
print(json.dumps({"stateDir": str(state_dir), "pid": pid, "running": running, "status": status, "logBytes": log_path.stat().st_size if log_path.exists() else 0, "logTail": log_tail}, ensure_ascii=False))
PY
`;
}
/**
 * Reports the state of a detached remote infra job (`tools-image`, `tekton` or `argo`) over the node's kube route.
 *
 * This never mutates anything (`mutation: false`). When stdout does not parse into a JSON object,
 * `job` carries the first 2000 characters of stdout instead.
 */
export function remoteJobLogs(node: ControlPlaneNodeSpec, target: ControlPlaneTargetSpec, name: "tools-image" | "tekton" | "argo", options: ToolsImageOptions | TektonInstallOptions | ArgoOptions): Record<string, unknown> {
  const statusScript = remoteJobStatusScript(target, name, options.tailLines);
  const commandResult = runTransK3s(node.kubeRoute, statusScript, options.timeoutSeconds);
  const parsedJob = parseRemoteJson(commandResult.stdout);
  const job = typeof parsedJob === "object" && parsedJob !== null
    ? parsedJob
    : { stdoutPreview: commandResult.stdout.slice(0, 2000) };
  return {
    ok: commandResult.exitCode === 0,
    command: `hwlab nodes control-plane infra ${name} logs`,
    configPath: HWLAB_NODE_CONTROL_PLANE_CONFIG_PATH,
    node: node.id,
    lane: target.lane,
    mutation: false,
    job,
    result: compactCommandResult(commandResult),
  };
}
/**
 * Reduces manifest objects to `{kind, namespace, name}` triples for reporting.
 * Missing values are reported as `null`.
 */
export function manifestObjectSummary(manifest: readonly Record<string, unknown>[]): Record<string, unknown>[] {
  const summaries: Record<string, unknown>[] = [];
  for (const object of manifest) {
    const meta = record(object.metadata);
    summaries.push({
      kind: object.kind ?? null,
      namespace: meta.namespace ?? null,
      name: meta.name ?? null,
    });
  }
  return summaries;
}
/** Runs `script` under `sh` through the `trans` helper on the given kube route, with a timeout in seconds. */
export function runTransK3s(kubeRoute: string, script: string, timeoutSeconds: number): CommandResult {
  const argv = ["/root/.local/bin/trans", kubeRoute, "sh", "--", script];
  const timeoutMs = timeoutSeconds * 1000;
  return runCommand(argv, rootPath(), { timeoutMs });
}
/** Runs `script` under `sh` on a host route via the repo's `scripts/ssh-cli.ts`, with a timeout in seconds. */
export function runTransHost(route: string, script: string, timeoutSeconds: number): CommandResult {
  const argv = ["bun", "scripts/ssh-cli.ts", "ssh", route, "sh", "--", script];
  const timeoutMs = timeoutSeconds * 1000;
  return runCommand(argv, rootPath(), { timeoutMs });
}
/**
 * Renders the shell snippet that exports HTTP(S)/ALL proxy variables (both cases) and NO_PROXY for the node.
 *
 * Behavior by proxy mode:
 * - No egress proxy: a no-op `:` line.
 * - `host-route`: the snippet requires and sources `proxyEnvPath`, then points the proxy variables at `proxyUrl`.
 * - Otherwise: the snippet looks up the ClusterIP of the configured proxy Service and uses `http://<ip>:<port>`.
 *
 * Either snippet exits with code 41 when its prerequisite is missing. Loopback addresses and the local
 * `:5000` registry addresses are always excluded from proxying.
 */
export function proxyExportBlock(node: ControlPlaneNodeSpec): string {
  const proxy = node.egressProxy;
  if (proxy === null) {
    return " : # no egress proxy configured\n";
  }
  const noProxy = [...new Set(["localhost", "127.0.0.1", "::1", "127.0.0.1:5000", "localhost:5000", ...proxy.noProxy])];
  // Every variable after HTTP_PROXY is derived from it, so both modes share this tail verbatim.
  const derivedExports = `export HTTPS_PROXY="$HTTP_PROXY"
export ALL_PROXY="$HTTP_PROXY"
export http_proxy="$HTTP_PROXY"
export https_proxy="$HTTP_PROXY"
export all_proxy="$HTTP_PROXY"
export NO_PROXY=${shQuote(noProxy.join(","))}
export no_proxy="$NO_PROXY"
`;
  if (proxy.mode === "host-route") {
    return `
if [ ! -s ${shQuote(proxy.proxyEnvPath)} ]; then echo "host route egress proxy env missing: ${proxy.proxyEnvPath}" >&2; exit 41; fi
. ${shQuote(proxy.proxyEnvPath)}
export HTTP_PROXY="${proxy.proxyUrl}"
${derivedExports}`;
  }
  return `
proxy_ip="$(kubectl -n ${shQuote(proxy.namespace)} get svc ${shQuote(proxy.serviceName)} -o 'jsonpath={.spec.clusterIP}' 2>/dev/null || true)"
if [ -z "$proxy_ip" ]; then echo "egress proxy service missing: ${proxy.namespace}/${proxy.serviceName}" >&2; exit 41; fi
export HTTP_PROXY="http://$proxy_ip:${proxy.port}"
${derivedExports}`;
}
/** Node-local directory holding the pid, status.json and job.log of a detached remote infra job. */
export function remoteJobStateDir(target: ControlPlaneTargetSpec, name: "tools-image" | "tekton" | "argo"): string {
  const stateRoot = "/tmp/unidesk-hwlab-node-control-plane";
  return `${stateRoot}/${target.id}/${name}`;
}
/** Serializes a string list as a JSON array literal (a defensive copy is taken first). */
export function shellJsonArray(items: readonly string[]): string {
  const copy = Array.from(items);
  return JSON.stringify(copy);
}
/**
 * Extracts the JSON payload printed by a remote helper script from its raw stdout.
 *
 * Strategies, tried in order:
 * 1. Parse the whole trimmed output.
 * 2. Parse the span from the first `{` to the last `}`. This recovers a payload surrounded by plain noise.
 * 3. Scan lines from last to first and parse the first one that looks like a JSON object.
 *
 * Strategy 3 recovers the single-line `json.dumps(...)` payloads the remote scripts print when earlier
 * noise (e.g. a warning) itself contains braces, which makes strategy 2's slice unparseable.
 *
 * @param text raw stdout from the remote command
 * @returns the parsed value, or `null` when the output is empty or nothing parses
 */
export function parseRemoteJson(text: string): unknown {
  const trimmed = text.trim();
  if (trimmed.length === 0) return null;
  try {
    return JSON.parse(trimmed);
  } catch {
    // Not pure JSON; fall through to the recovery strategies.
  }
  const start = trimmed.indexOf("{");
  const end = trimmed.lastIndexOf("}");
  if (start >= 0 && end > start) {
    try {
      return JSON.parse(trimmed.slice(start, end + 1));
    } catch {
      // Noise between the braces; try line-by-line below.
    }
  }
  const lines = trimmed.split(/\r?\n/u);
  for (let index = lines.length - 1; index >= 0; index -= 1) {
    const line = (lines[index] ?? "").trim();
    if (!line.startsWith("{") || !line.endsWith("}")) continue;
    try {
      return JSON.parse(line);
    } catch {
      // Keep scanning earlier lines.
    }
  }
  return null;
}
/** Node-local directory holding a CI build benchmark profile's status.json and create.log. */
export function ciBuildBenchmarkStateDir(target: ControlPlaneTargetSpec, profile: string): string {
  const stateRoot = "/tmp/unidesk-hwlab-node-control-plane";
  const leaf = `ci-build-benchmark-${profile}`;
  return `${stateRoot}/${target.id}/${leaf}`;
}
/**
 * Decides whether a benchmark job currently looks healthy.
 *
 * - A failed PipelineRun (`status === "False"`) is never OK.
 * - A run that has not completed is OK for now.
 * - A succeeded run is OK only if every expected service has a `build-<service>` TaskRun and the
 *   profile's cache policy was not violated.
 */
export function ciBuildBenchmarkLiveOk(job: Record<string, unknown>, expectedServices: readonly string[], profile: CiBuildBenchmarkProfileSpec): boolean {
  const runStatus = renderCell(record(job.pipelineRun).status, "");
  if (runStatus !== "True") {
    return runStatus !== "False";
  }
  const builtTasks = new Set(ciBuildBenchmarkTaskRunRecords(job).map((task) => task.pipelineTask));
  const everyServiceBuilt = expectedServices.every((service) => builtTasks.has(`build-${service}`));
  return everyServiceBuilt && ciBuildBenchmarkPolicyOk(job, profile.cachePolicy);
}
/**
 * Builds timing table rows for a benchmark job.
 *
 * A `pipeline-total` row comes first when PipelineRun data is present, followed by one row per
 * TaskRun ordered by start time. Each row has task, status, duration, start and end columns.
 */
export function ciBuildBenchmarkTaskRows(job: Record<string, unknown>): Record<string, string>[] {
  const timingRow = (task: string, status: unknown, startTime: unknown, completionTime: unknown): Record<string, string> => ({
    task,
    status: ciBuildBenchmarkStatusText(status),
    duration: durationBetweenIso(startTime, completionTime),
    start: shortIsoTime(startTime),
    end: shortIsoTime(completionTime),
  });
  const run = record(job.pipelineRun);
  const totalRows = Object.keys(run).length > 0
    ? [timingRow("pipeline-total", run.status, run.startTime, run.completionTime)]
    : [];
  const byStart = ciBuildBenchmarkTaskRunRecords(job).sort((a, b) => renderCell(a.startTime, "").localeCompare(renderCell(b.startTime, "")));
  const taskRows = byStart.map((task) => timingRow(renderCell(task.pipelineTask ?? task.name), task.status, task.startTime, task.completionTime));
  return [...totalRows, ...taskRows];
}
/**
 * Builds one row per expected service, based on its `build-<service>` TaskRun.
 *
 * When the TaskRun is absent, the row status is `missing` once the pipeline is terminal and `pending`
 * otherwise. A missing task in a succeeded pipeline is labelled `cache-hit-forbidden`. NOTE(review):
 * this presumably treats a skipped build as artifact reuse; confirm.
 *
 * Failed TaskRuns get their reason and message classified into a failure family.
 */
export function ciBuildBenchmarkServiceRows(job: Record<string, unknown>, servicesValue: unknown): Record<string, string>[] {
  const services = ciBuildBenchmarkExpectedServices(servicesValue);
  if (services.length === 0) return [];
  const runStatus = record(job.pipelineRun).status;
  const runFinished = runStatus === "True" || runStatus === "False";
  const taskRuns = ciBuildBenchmarkTaskRunRecords(job);
  const rowFor = (service: string): Record<string, string> => {
    const expectedTask = `build-${service}`;
    const task = taskRuns.find((candidate) => candidate.pipelineTask === expectedTask);
    if (task === undefined) {
      return {
        service,
        task: expectedTask,
        status: runFinished ? "missing" : "pending",
        duration: "-",
        failure: runStatus === "True" ? "cache-hit-forbidden" : "-",
      };
    }
    const status = ciBuildBenchmarkStatusText(task.status);
    let failure = "-";
    if (task.status === "False") {
      failure = classifyCiBuildBenchmarkFailure(`${renderCell(task.reason, "")}\n${renderCell(task.message, "")}`);
    }
    return {
      service,
      task: renderCell(task.pipelineTask ?? task.name),
      status,
      duration: durationBetweenIso(task.startTime, task.completionTime),
      failure,
    };
  };
  return services.map((service) => rowFor(service));
}
/**
 * Aggregates failure families into `{family, count, scope}` rows.
 *
 * Sources:
 * - per-service failures;
 * - a failed PipelineRun's reason and message;
 * - cache-policy violations found in the log tail.
 *
 * Families appear in first-seen order. Scopes are de-duplicated and comma-joined.
 */
export function ciBuildBenchmarkFailureRows(job: Record<string, unknown>, serviceRows: readonly Record<string, string>[], benchmark: Record<string, unknown>): Record<string, string>[] {
  const tally = new Map<string, { count: number; scopes: string[] }>();
  const note = (family: string, scope: string): void => {
    if (family === "-" || family.length === 0) return;
    let entry = tally.get(family);
    if (entry === undefined) {
      entry = { count: 0, scopes: [] };
      tally.set(family, entry);
    }
    entry.count += 1;
    if (!entry.scopes.includes(scope)) entry.scopes.push(scope);
  };
  for (const row of serviceRows) {
    note(row.failure, row.service);
  }
  const run = record(job.pipelineRun);
  if (run.status === "False") {
    const runFamily = classifyCiBuildBenchmarkFailure(`${renderCell(run.reason, "")}\n${renderCell(run.message, "")}`);
    note(runFamily, "pipeline");
  }
  const policy = record(benchmark.cachePolicy);
  if (policy.forbidBuildkitCache === true && ciBuildBenchmarkLogHasBuildkitCache(job)) {
    note("cache-hit-forbidden", "buildkit-cache");
  }
  if (policy.forbidGitopsCatalogReuse === true && ciBuildBenchmarkLogHasReuse(job)) {
    note("cache-hit-forbidden", "artifact-reuse");
  }
  return Array.from(tally, ([family, entry]) => ({ family, count: String(entry.count), scope: entry.scopes.join(",") }));
}
/** True when the log tail shows no cache usage that the profile's cache policy forbids. */
export function ciBuildBenchmarkPolicyOk(job: Record<string, unknown>, cachePolicy: CiBuildBenchmarkCachePolicy): boolean {
  const buildkitViolated = () => cachePolicy.forbidBuildkitCache && ciBuildBenchmarkLogHasBuildkitCache(job);
  const reuseViolated = () => cachePolicy.forbidGitopsCatalogReuse && ciBuildBenchmarkLogHasReuse(job);
  return !buildkitViolated() && !reuseViolated();
}
/**
 * Detects BuildKit cache usage in the job's `logTail`.
 *
 * Matches a non-empty `"buildkitCacheRef"` value, `--import-cache` / `--export-cache` flags, or a
 * "writing cache image manifest" line. A missing or non-string `logTail` counts as no usage.
 */
export function ciBuildBenchmarkLogHasBuildkitCache(job: Record<string, unknown>): boolean {
  const { logTail } = job;
  if (typeof logTail !== "string") return false;
  const cachePattern = /"buildkitCacheRef"\s*:\s*"[^"]+"|--import-cache|--export-cache|writing cache image manifest/iu;
  return cachePattern.test(logTail);
}
/**
 * Detects GitOps catalog / artifact reuse in the job's `logTail`.
 *
 * Matches a `"reusedFrom"` string value, or a `"status": "reused"` entry. A `reusedFrom` value that is
 * empty (`""`) or starts with `null` is not treated as reuse. This mirrors the buildkit check, which also
 * requires a non-empty value. A missing or non-string `logTail` counts as no reuse.
 */
export function ciBuildBenchmarkLogHasReuse(job: Record<string, unknown>): boolean {
  const logTail = typeof job.logTail === "string" ? job.logTail : "";
  return /"reusedFrom"\s*:\s*"(?!null|")|"status"\s*:\s*"reused"/iu.test(logTail);
}
/** Returns `job.taskRuns` as records (non-objects become `{}`), or `[]` when it is not an array. */
export function ciBuildBenchmarkTaskRunRecords(job: Record<string, unknown>): Record<string, unknown>[] {
  const { taskRuns } = job;
  if (!Array.isArray(taskRuns)) return [];
  return taskRuns.map((entry) => record(entry));
}
/** Keeps only the non-empty string entries of `value`; returns `[]` when `value` is not an array. */
export function ciBuildBenchmarkExpectedServices(value: unknown): string[] {
  if (!Array.isArray(value)) return [];
  const services: string[] = [];
  for (const entry of value) {
    if (typeof entry === "string" && entry.length > 0) services.push(entry);
  }
  return services;
}
/**
 * Maps a Tekton Succeeded-condition status (`True` / `False` / `Unknown`) to display text.
 * Any other value is rendered as-is, with `pending` used for empty values.
 */
export function ciBuildBenchmarkStatusText(value: unknown): string {
  switch (value) {
    case "True":
      return "succeeded";
    case "False":
      return "failed";
    case "Unknown":
      return "running";
    default:
      return renderCell(value, "pending");
  }
}
/**
 * Ordered (pattern, family) rules for classifying benchmark failure text. The first match wins.
 *
 * NOTE(review): the "pull access denied" alternative of the image-policy rule can never win, because
 * the earlier auth rule already matches "denied".
 */
const ciBuildBenchmarkFailureRules: ReadonlyArray<readonly [RegExp, string]> = [
  [/cache-hit-forbidden|reused-from|reuse/i, "cache-hit-forbidden"],
  [/no such host|could not resolve|enotfound|dns/i, "dns"],
  [/429|rate limit|too many requests|toomanyrequests/i, "rate-limit"],
  [/tls|certificate|x509|timeout|timed out|i\/o timeout/i, "tls-timeout"],
  [/proxy|connect|connection reset|connection refused|econn/i, "proxy-connect"],
  [/unauthorized|authentication required|permission denied|forbidden|denied/i, "auth"],
  [/imagepullbackoff|errimagepull|imagepolicy|pull access denied/i, "image-policy"],
  [/push|registry|blob upload|manifest invalid|manifest unknown/i, "registry-push"],
];

/**
 * Classifies failure text (e.g. reason + message) into a coarse failure family.
 *
 * The first matching rule in `ciBuildBenchmarkFailureRules` wins. Text matching no rule is `unknown`
 * when blank and `build-script` otherwise.
 */
export function classifyCiBuildBenchmarkFailure(text: string): string {
  const rule = ciBuildBenchmarkFailureRules.find(([pattern]) => pattern.test(text));
  if (rule !== undefined) return rule[1];
  return text.toLowerCase().trim().length === 0 ? "unknown" : "build-script";
}
/**
 * Formats the elapsed time between two ISO timestamps.
 *
 * A missing or empty end timestamp means "until now". Returns `-` for a missing or unparseable start,
 * an unparseable end, or an end earlier than the start.
 */
export function durationBetweenIso(startValue: unknown, endValue: unknown): string {
  if (typeof startValue !== "string" || startValue === "") return "-";
  const startMs = Date.parse(startValue);
  if (!Number.isFinite(startMs)) return "-";
  let endMs: number;
  if (typeof endValue === "string" && endValue !== "") {
    endMs = Date.parse(endValue);
  } else {
    endMs = Date.now();
  }
  const invalidEnd = !Number.isFinite(endMs) || endMs < startMs;
  return invalidEnd ? "-" : formatDurationMs(endMs - startMs);
}
/**
 * Formats milliseconds as `<m>m<ss>s` when at least a minute, otherwise as `<s>s`.
 * The value is rounded to whole seconds first; there is no hours unit.
 */
export function formatDurationMs(ms: number): string {
  const totalSeconds = Math.round(ms / 1000);
  const wholeMinutes = Math.floor(totalSeconds / 60);
  if (wholeMinutes > 0) {
    const leftover = String(totalSeconds % 60).padStart(2, "0");
    return `${wholeMinutes}m${leftover}s`;
  }
  return `${totalSeconds}s`;
}
/**
 * Drops the leading `YYYY-MM-DDT` date part of an ISO timestamp, keeping the time and zone suffix.
 *
 * Returns `-` for missing, empty or non-string values. Strings without that prefix are returned unchanged.
 * (The previous trailing `.replace(/Z$/u, "Z")` was a no-op and has been removed.)
 */
export function shortIsoTime(value: unknown): string {
  if (typeof value !== "string" || value.length === 0) return "-";
  return value.replace(/^\d{4}-\d{2}-\d{2}T/u, "");
}
/** Abbreviates a full 40-hex-digit commit SHA to its lowercase 12-character prefix; other values pass through. */
export function shortDisplay(value: string): string {
  const isFullSha = /^[0-9a-f]{40}$/iu.test(value);
  if (!isFullSha) return value;
  return value.slice(0, 12).toLowerCase();
}
/**
 * Requires a DNS-label style profile name: lowercase alphanumerics and inner dashes, at most 63 characters.
 * @throws Error naming `path` when the value does not qualify
 */
export function validateBenchmarkProfileName(value: string, path: string): void {
  const isDnsLabel = value.length <= 63 && /^[a-z0-9]([-a-z0-9]*[a-z0-9])?$/u.test(value);
  if (!isDnsLabel) {
    throw new Error(`${path} must be a DNS-label style benchmark profile`);
  }
}
/**
 * Validates a catalog path template.
 *
 * The template must contain `{profile}` and `{pipelineRun}` and be a single-line relative path. After
 * substituting sample values it must have no empty or `..` segments and must end in `.json`.
 * Checks run in that order, so the first failing rule determines the error.
 *
 * @throws Error naming `path` and the violated rule
 */
export function validateBenchmarkCatalogPathTemplate(value: string, path: string): void {
  const hasPlaceholders = ["{profile}", "{pipelineRun}"].every((token) => value.includes(token));
  if (!hasPlaceholders) throw new Error(`${path} must include {profile} and {pipelineRun}`);
  const isRelativeSingleLine = !value.startsWith("/") && !/[\r\n]/u.test(value);
  if (!isRelativeSingleLine) throw new Error(`${path} must be a relative repo path template`);
  const sample = value.replace(/\{profile\}/gu, "profile").replace(/\{pipelineRun\}/gu, "pipeline-run");
  const hasBadSegment = sample.split("/").some((segment) => segment.length === 0 || segment === "..");
  if (hasBadSegment) throw new Error(`${path} must not contain empty or parent path segments`);
  if (!sample.endsWith(".json")) throw new Error(`${path} must render to a JSON catalog path`);
}
/**
 * Narrows `value` to a plain (non-array, non-null) object.
 * @throws Error `<path> must be an object` otherwise
 */
export function asRecord(value: unknown, path: string): Record<string, unknown> {
  const isPlainObject = typeof value === "object" && value !== null && !Array.isArray(value);
  if (!isPlainObject) {
    throw new Error(`${path} must be an object`);
  }
  return value as Record<string, unknown>;
}
/** Lenient counterpart of `asRecord`: returns plain objects as-is and a fresh `{}` for anything else. */
export function record(value: unknown): Record<string, unknown> {
  if (typeof value !== "object" || value === null || Array.isArray(value)) {
    return {};
  }
  return value as Record<string, unknown>;
}
/**
 * Renders a space-separated, left-aligned text table.
 * Each column is as wide as its widest cell or header, and trailing whitespace is trimmed from every line.
 */
export function renderTable(headers: string[], rows: string[][]): string[] {
  const widths = headers.map((header, column) =>
    rows.reduce((widest, row) => Math.max(widest, row[column]?.length ?? 0), header.length),
  );
  const formatRow = (cells: string[]): string =>
    cells.map((cell, column) => cell.padEnd(widths[column] ?? cell.length)).join(" ").trimEnd();
  return [headers, ...rows].map((cells) => formatRow(cells));
}
/** Stringifies a table cell; `undefined`, `null` and `""` become `fallback` (default `-`). */
export function renderCell(value: unknown, fallback = "-"): string {
  const isBlank = value === undefined || value === null || value === "";
  return isBlank ? fallback : String(value);
}
/** Classifies a command string as `status` (ends with ` status` or ` logs`) or `benchmark`. */
export function optionsModeFromCommand(command: unknown): string {
  const text = String(command ?? "");
  const isStatusLike = [" status", " logs"].some((suffix) => text.endsWith(suffix));
  return isStatusLike ? "status" : "benchmark";
}
/**
 * Reads a required non-empty string field.
 * @throws Error `<path>.<key> must be a non-empty string`
 */
export function stringField(obj: Record<string, unknown>, key: string, path: string): string {
  const candidate = obj[key];
  if (typeof candidate === "string" && candidate.length > 0) return candidate;
  throw new Error(`${path}.${key} must be a non-empty string`);
}
/**
 * Reads an optional string field.
 * An absent (`undefined`) field yields `undefined`; any present value must be a non-empty string.
 * @throws Error `<path>.<key> must be a non-empty string`
 */
export function optionalStringField(obj: Record<string, unknown>, key: string, path: string): string | undefined {
  const candidate = obj[key];
  if (candidate === undefined) return undefined;
  if (typeof candidate === "string" && candidate.length > 0) return candidate;
  throw new Error(`${path}.${key} must be a non-empty string`);
}
/**
 * Reads a required absolute path field.
 * Values that do not start with `/`, or that contain a NUL byte or any `..` sequence, are rejected.
 * @throws Error from `stringField`, or `<path>.<key> must be an absolute path without '..'`
 */
export function absoluteConfigPathField(obj: Record<string, unknown>, key: string, path: string): string {
  const candidate = stringField(obj, key, path);
  const isSafeAbsolute = candidate.startsWith("/") && !candidate.includes("\0") && !candidate.includes("..");
  if (!isSafeAbsolute) throw new Error(`${path}.${key} must be an absolute path without '..'`);
  return candidate;
}
/**
 * Requires a lowercase alphanumeric-and-dash name of at most 253 characters.
 * Dots are not accepted.
 * @throws Error `<path> must be a Kubernetes resource name`
 */
export function validateKubernetesName(value: string, path: string): void {
  const wellFormed = value.length <= 253 && /^[a-z0-9]([-a-z0-9]*[a-z0-9])?$/u.test(value);
  if (!wellFormed) {
    throw new Error(`${path} must be a Kubernetes resource name`);
  }
}
/**
 * Requires a Secret data key made of letters, digits, `.`, `_` and `-`.
 * @throws Error `<path> must be a Kubernetes Secret key`
 */
export function validateSecretKey(value: string, path: string): void {
  const allowed = /^[A-Za-z0-9._-]+$/u;
  if (allowed.test(value)) return;
  throw new Error(`${path} must be a Kubernetes Secret key`);
}
/**
 * Requires an upper-case environment variable name: `A-Z`, digits and `_`, not starting with a digit.
 *
 * A leading digit is rejected because neither POSIX shell (`export 1X=...` fails) nor Kubernetes EnvVar
 * names accept it. This also matches the leading-character rule `stringRecordField` applies to its keys.
 *
 * @throws Error `<path> must be an env key`
 */
export function validateEnvKey(value: string, path: string): void {
  if (!/^[A-Z_][A-Z0-9_]*$/u.test(value)) throw new Error(`${path} must be an env key`);
}
/**
 * Requires a git-ref-like string made of letters, digits, `_`, `.`, `/` and `-`, with no `..` sequence.
 * @throws Error `<path> has an unsupported sourceRef format`
 */
export function validateSourceRef(value: string, path: string): void {
  const wellFormed = /^[A-Za-z0-9_./-]+$/u.test(value) && !value.includes("..");
  if (!wellFormed) {
    throw new Error(`${path} has an unsupported sourceRef format`);
  }
}
/**
 * Reads a required positive-integer field.
 * @throws Error `<path>.<key> must be a positive integer`
 */
export function numberField(obj: Record<string, unknown>, key: string, path: string): number {
  const candidate = obj[key];
  if (typeof candidate === "number" && Number.isInteger(candidate) && candidate > 0) return candidate;
  throw new Error(`${path}.${key} must be a positive integer`);
}
/**
 * Reads a required positive-integer config field; same contract as `numberField`.
 * @throws Error `<path>.<key> must be a positive integer`
 */
export function positiveConfigIntegerField(obj: Record<string, unknown>, key: string, path: string): number {
  const candidate = obj[key];
  const isPositiveInteger = typeof candidate === "number" && Number.isInteger(candidate) && candidate > 0;
  if (!isPositiveInteger) {
    throw new Error(`${path}.${key} must be a positive integer`);
  }
  return candidate as number;
}
/**
 * Reads a required non-negative integer field (zero allowed).
 * @throws Error `<path>.<key> must be a non-negative integer`
 */
export function nonNegativeIntegerField(obj: Record<string, unknown>, key: string, path: string): number {
  const candidate = obj[key];
  if (typeof candidate === "number" && Number.isInteger(candidate) && candidate >= 0) return candidate;
  throw new Error(`${path}.${key} must be a non-negative integer`);
}
/**
 * Reads a required array-of-integers field and returns a shallow copy.
 * @throws Error `<path>.<key> must be an array of integers`
 */
export function numberArrayField(obj: Record<string, unknown>, key: string, path: string): number[] {
  const candidate = obj[key];
  const valid = Array.isArray(candidate) && candidate.every((item) => typeof item === "number" && Number.isInteger(item));
  if (!valid) {
    throw new Error(`${path}.${key} must be an array of integers`);
  }
  return [...(candidate as number[])];
}
/**
 * Reads a required array-of-non-empty-strings field and returns a shallow copy.
 * @throws Error `<path>.<key> must be an array of non-empty strings`
 */
export function stringArrayField(obj: Record<string, unknown>, key: string, path: string): string[] {
  const candidate = obj[key];
  const valid = Array.isArray(candidate) && candidate.every((item) => typeof item === "string" && item.length > 0);
  if (!valid) {
    throw new Error(`${path}.${key} must be an array of non-empty strings`);
  }
  return [...(candidate as string[])];
}
/**
 * Validates that every entry of `obj` maps an identifier-like key to a non-empty string, and returns a copy.
 * Keys must start with a letter or underscore, followed by letters, digits or underscores.
 * @throws Error `<path>.<key> has an unsupported key format` or `<path>.<key> must be a non-empty string`
 */
export function stringRecordField(obj: Record<string, unknown>, path: string): Record<string, string> {
  const validated: Record<string, string> = {};
  for (const key of Object.keys(obj)) {
    const entry = obj[key];
    if (!/^[A-Za-z_][A-Za-z0-9_]*$/u.test(key)) {
      throw new Error(`${path}.${key} has an unsupported key format`);
    }
    if (typeof entry !== "string" || entry.length === 0) {
      throw new Error(`${path}.${key} must be a non-empty string`);
    }
    validated[key] = entry;
  }
  return validated;
}
/**
 * Reads a required boolean field.
 * @throws Error `<path>.<key> must be a boolean`
 */
export function booleanField(obj: Record<string, unknown>, key: string, path: string): boolean {
  const candidate = obj[key];
  if (typeof candidate === "boolean") return candidate;
  throw new Error(`${path}.${key} must be a boolean`);
}
/** Lenient boolean read: true only when the field is exactly `true`. */
export function boolField(obj: Record<string, unknown>, key: string): boolean {
  const flag = obj[key];
  return flag === true;
}
/** Returns `value` when it is a finite number, otherwise `null`. */
export function numberValue(value: unknown): number | null {
  if (typeof value !== "number" || !Number.isFinite(value)) return null;
  return value;
}
/**
 * Returns the argument following the first occurrence of the `name` flag.
 * @throws Error `<name> is required` when the flag is absent, or `<name> requires a value` when the
 *   next argument is missing, empty, or another `--` flag
 */
export function requiredOption(args: string[], name: string): string {
  const position = args.indexOf(name);
  if (position < 0) throw new Error(`${name} is required`);
  const next = args[position + 1];
  if (next === undefined || next === "" || next.startsWith("--")) {
    throw new Error(`${name} requires a value`);
  }
  return next;
}
/**
 * Returns the argument following the `name` flag, or `null` when the flag is absent.
 * @throws Error `<name> requires a value` when the flag is present without a usable value
 */
export function optionValue(args: string[], name: string): string | null {
  const position = args.indexOf(name);
  if (position < 0) return null;
  const next = args[position + 1];
  if (next === undefined || next === "" || next.startsWith("--")) {
    throw new Error(`${name} requires a value`);
  }
  return next;
}
/**
 * Parses an optional positive-integer CLI flag, clamped to `maxValue`; returns `defaultValue` when absent.
 * @throws Error `<name> must be a positive integer` for non-integer or non-positive input
 */
export function positiveIntegerOption(args: string[], name: string, defaultValue: number, maxValue: number): number {
  const text = optionValue(args, name);
  if (text === null) return defaultValue;
  const parsed = Number(text);
  const isPositiveInteger = Number.isInteger(parsed) && parsed > 0;
  if (!isPositiveInteger) {
    throw new Error(`${name} must be a positive integer`);
  }
  return Math.min(parsed, maxValue);
}
/**
 * Summarizes a command result for JSON reports.
 * Reports the exit code and timeout flag, the UTF-8 byte sizes of stdout and stderr, and the last
 * 2000 characters of each stream.
 */
export function compactCommandResult(result: CommandResult): Record<string, unknown> {
  const { exitCode, timedOut, stdout, stderr } = result;
  const tailChars = 2000;
  return {
    exitCode,
    timedOut,
    stdoutBytes: Buffer.byteLength(stdout),
    stderrBytes: Buffer.byteLength(stderr),
    stdoutTail: stdout.slice(-tailChars),
    stderrTail: stderr.slice(-tailChars),
  };
}
/**
 * Quotes a string as a single POSIX shell word.
 * The value is wrapped in single quotes, and each embedded `'` becomes `'"'"'`.
 */
export function shQuote(value: string): string {
  const escaped = value.split("'").join(`'"'"'`);
  return "'" + escaped + "'";
}
/**
 * Requires `value` to parse as a URL with the `https:` scheme.
 * @throws Error `<path> must be a valid URL` or `<path> must use https://`
 */
export function validateHttpsUrl(value: string, path: string): void {
  let protocol: string;
  try {
    protocol = new URL(value).protocol;
  } catch {
    throw new Error(`${path} must be a valid URL`);
  }
  if (protocol === "https:") return;
  throw new Error(`${path} must use https://`);
}
/**
 * Accepts loopback (`127.0.0.1` or `localhost`) or an RFC 1918 private IPv4 address as a host-route proxy host.
 * The accepted private ranges are 10.0.0.0/8, 172.16.0.0/12 and 192.168.0.0/16.
 *
 * The host must be a strict dotted quad of 1–3 decimal digits per octet, each octet at most 255.
 * The previous `Number(part)` parsing also accepted malformed hosts such as "10..0.1" (an empty part
 * counts as 0), " 10.0.0.1", "0x0a.0.0.1" and "1e1.0.0.1".
 */
export function hostRouteProxyHostAllowed(value: string): boolean {
  if (value === "127.0.0.1" || value === "localhost") return true;
  const match = /^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/u.exec(value);
  if (match === null) return false;
  const octets = match.slice(1).map((part) => Number(part));
  if (octets.some((octet) => octet > 255)) return false;
  const [first = -1, second = -1] = octets;
  return first === 10
    || (first === 172 && second >= 16 && second <= 31)
    || (first === 192 && second === 168);
}
/**
 * Hashes `text` with SHA-256 and returns `sha256:<hex>`.
 * Despite the name, this is the full 64-hex-digit digest, not a truncated prefix.
 */
export function sha256Short(text: string): string {
  const hexDigest = createHash("sha256").update(text).digest("hex");
  return "sha256:" + hexDigest;
}