fix: remove kubectl from follower state helpers

This commit is contained in:
Codex
2026-07-03 18:04:58 +00:00
parent c5f665726d
commit c94f83d3f0
6 changed files with 190 additions and 63 deletions
@@ -0,0 +1,50 @@
import { readFileSync } from "node:fs";
import https from "node:https";
const namespace = process.env.NAMESPACE || "";
const configMap = process.env.CONFIGMAP || "";
const patch = JSON.parse(Buffer.from(process.env.PATCH_B64 || "", "base64").toString("utf8"));
const host = process.env.KUBERNETES_SERVICE_HOST;
const port = Number(process.env.KUBERNETES_SERVICE_PORT || "443");
const token = readFileSync("/var/run/secrets/kubernetes.io/serviceaccount/token", "utf8").trim();
const ca = readFileSync("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt");
function request(method, path, body, contentType = "application/json") {
return new Promise((resolve, reject) => {
const headers = { authorization: `Bearer ${token}` };
const payload = body === undefined ? null : typeof body === "string" ? body : JSON.stringify(body);
if (payload !== null) {
headers["content-type"] = contentType;
headers["content-length"] = Buffer.byteLength(payload);
}
const req = https.request({ host, port, path, method, ca, headers }, (res) => {
let text = "";
res.setEncoding("utf8");
res.on("data", (chunk) => { text += chunk; });
res.on("end", () => resolve({ status: res.statusCode || 0, text }));
});
req.on("error", reject);
if (payload !== null) req.write(payload);
req.end();
});
}
const path = `/api/v1/namespaces/${encodeURIComponent(namespace)}/configmaps/${encodeURIComponent(configMap)}`;
const before = await request("GET", path);
if (before.status === 404) {
process.stdout.write(JSON.stringify({ ok: true, present: false, patched: false, reason: "state-configmap-not-found", parsedDownstreamCliOutput: false }));
process.exit(0);
}
if (before.status < 200 || before.status >= 300) throw new Error(before.text || `kube api GET configmap status ${before.status}`);
const beforeObject = JSON.parse(before.text);
const result = await request("PATCH", path, patch, "application/merge-patch+json");
if (result.status < 200 || result.status >= 300) throw new Error(result.text || `kube api PATCH configmap status ${result.status}`);
const afterObject = JSON.parse(result.text);
process.stdout.write(JSON.stringify({
ok: true,
present: true,
patched: true,
beforeResourceVersion: beforeObject?.metadata?.resourceVersion || null,
afterResourceVersion: afterObject?.metadata?.resourceVersion || null,
parsedDownstreamCliOutput: false,
}));
+38 -21
View File
@@ -1,38 +1,54 @@
import { execFileSync } from "node:child_process";
import { readFileSync } from "node:fs";
import https from "node:https";
const namespace = process.env.NAMESPACE || "";
const configMap = process.env.CONFIGMAP || "";
const followerId = process.env.FOLLOWER_ID || "";
const specRef = process.env.SPEC_REF || "";
const stateJson = Buffer.from(process.env.STATE_B64 || "", "base64").toString("utf8");
const host = process.env.KUBERNETES_SERVICE_HOST;
const port = Number(process.env.KUBERNETES_SERVICE_PORT || "443");
const token = readFileSync("/var/run/secrets/kubernetes.io/serviceaccount/token", "utf8").trim();
const ca = readFileSync("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt");
function kubectl(args, input) {
return execFileSync("kubectl", ["-n", namespace, ...args], {
input,
encoding: "utf8",
stdio: ["pipe", "pipe", "pipe"],
function request(method, path, body, contentType = "application/json") {
return new Promise((resolve, reject) => {
const headers = { authorization: `Bearer ${token}` };
const payload = body === undefined ? null : typeof body === "string" ? body : JSON.stringify(body);
if (payload !== null) {
headers["content-type"] = contentType;
headers["content-length"] = Buffer.byteLength(payload);
}
const req = https.request({ host, port, path, method, ca, headers }, (res) => {
let text = "";
res.setEncoding("utf8");
res.on("data", (chunk) => { text += chunk; });
res.on("end", () => resolve({ status: res.statusCode || 0, text }));
});
req.on("error", reject);
if (payload !== null) req.write(payload);
req.end();
});
}
function readConfigMap() {
try {
return JSON.parse(kubectl(["get", "configmap", configMap, "-o", "json"]));
} catch (error) {
const stderr = String(error?.stderr || error?.message || "");
if (/not found/i.test(stderr)) return null;
throw error;
}
async function readConfigMap() {
const result = await request("GET", `/api/v1/namespaces/${encodeURIComponent(namespace)}/configmaps/${encodeURIComponent(configMap)}`);
if (result.status === 404) return null;
if (result.status < 200 || result.status >= 300) throw new Error(result.text || `kube api GET configmap status ${result.status}`);
return JSON.parse(result.text);
}
function ensureConfigMap() {
if (readConfigMap() !== null) return;
async function ensureConfigMap() {
if (await readConfigMap() !== null) return;
const object = {
apiVersion: "v1",
kind: "ConfigMap",
metadata: { name: configMap, namespace },
data: { _createdAt: new Date().toISOString(), _specRef: specRef },
};
kubectl(["apply", "-f", "-"], JSON.stringify(object));
const result = await request("POST", `/api/v1/namespaces/${encodeURIComponent(namespace)}/configmaps`, object);
if (result.status === 409) return;
if (result.status < 200 || result.status >= 300) throw new Error(result.text || `kube api POST configmap status ${result.status}`);
}
function stringOrNull(value) {
@@ -91,8 +107,8 @@ function preserveExistingTiming(state, existing) {
};
}
ensureConfigMap();
const current = readConfigMap();
await ensureConfigMap();
const current = await readConfigMap();
const beforeResourceVersion = stringOrNull(current?.metadata?.resourceVersion);
const beforeUpdatedAt = stringOrNull(current?.data?._updatedAt);
const currentText = current?.data?.[followerId];
@@ -106,8 +122,9 @@ const patch = {
_specRef: specRef,
},
};
kubectl(["patch", "configmap", configMap, "--type", "merge", "-p", JSON.stringify(patch)]);
const updated = readConfigMap();
const patchResult = await request("PATCH", `/api/v1/namespaces/${encodeURIComponent(namespace)}/configmaps/${encodeURIComponent(configMap)}`, patch, "application/merge-patch+json");
if (patchResult.status < 200 || patchResult.status >= 300) throw new Error(patchResult.text || `kube api PATCH configmap status ${patchResult.status}`);
const updated = await readConfigMap();
process.stdout.write(JSON.stringify({
ok: true,
followerId,
+33 -12
View File
@@ -1,9 +1,14 @@
import { execFileSync } from "node:child_process";
import { readFileSync } from "node:fs";
import https from "node:https";
const namespace = process.env.NAMESPACE || "";
const configMap = process.env.CONFIGMAP || "";
const followerIds = parseFollowerIds(process.env.FOLLOWERS_JSON || "[]");
const maxTimingStages = Number(process.env.MAX_TIMING_STAGES || "24");
const host = process.env.KUBERNETES_SERVICE_HOST;
const port = Number(process.env.KUBERNETES_SERVICE_PORT || "443");
const token = readFileSync("/var/run/secrets/kubernetes.io/serviceaccount/token", "utf8").trim();
const ca = readFileSync("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt");
function parseFollowerIds(text) {
try {
@@ -14,18 +19,34 @@ function parseFollowerIds(text) {
}
}
function kubectlConfigMap() {
try {
const stdout = execFileSync("kubectl", ["-n", namespace, "get", "configmap", configMap, "-o", "json"], {
encoding: "utf8",
maxBuffer: 16 * 1024 * 1024,
stdio: ["ignore", "pipe", "pipe"],
function request(method, path, body, contentType = "application/json") {
return new Promise((resolve, reject) => {
const headers = { authorization: `Bearer ${token}` };
const payload = body === undefined ? null : typeof body === "string" ? body : JSON.stringify(body);
if (payload !== null) {
headers["content-type"] = contentType;
headers["content-length"] = Buffer.byteLength(payload);
}
const req = https.request({ host, port, path, method, ca, headers }, (res) => {
let text = "";
res.setEncoding("utf8");
res.on("data", (chunk) => { text += chunk; });
res.on("end", () => resolve({ status: res.statusCode || 0, text }));
});
return { ok: true, present: true, object: JSON.parse(stdout), error: "" };
req.on("error", reject);
if (payload !== null) req.write(payload);
req.end();
});
}
async function readConfigMap() {
try {
const result = await request("GET", `/api/v1/namespaces/${encodeURIComponent(namespace)}/configmaps/${encodeURIComponent(configMap)}`);
if (result.status === 404) return { ok: true, present: false, object: null, error: result.text };
if (result.status < 200 || result.status >= 300) return { ok: false, present: false, object: null, error: result.text || `kube api GET configmap status ${result.status}` };
return { ok: true, present: true, object: JSON.parse(result.text), error: "" };
} catch (error) {
const stderr = String(error?.stderr || error?.message || "");
if (/not found/i.test(stderr)) return { ok: true, present: false, object: null, error: stderr };
return { ok: false, present: false, object: null, error: stderr || "kubectl configmap read failed" };
return { ok: false, present: false, object: null, error: error?.message || String(error) };
}
}
@@ -139,7 +160,7 @@ function numberOrNull(value) {
return typeof value === "number" && Number.isFinite(value) ? value : null;
}
const result = kubectlConfigMap();
const result = await readConfigMap();
const errors = [];
const stateByFollower = {};
const valueBytes = {};
+63 -30
View File
@@ -1857,8 +1857,9 @@ function mergeFollowerStatus(
function readK8sState(registry: BranchFollowerRegistry, options: ParsedOptions): K8sStateRead {
const errors: string[] = [];
const stateResult = kubeConfigMapFollowerState(registry, options);
const deploymentResult = kubeJson(registry, options, `kubectl -n ${shQuote(registry.controller.namespace)} get deploy ${shQuote(registry.controller.deploymentName)} -o json`, 10_000);
const leaseResult = kubeJson(registry, options, `kubectl -n ${shQuote(registry.controller.namespace)} get lease ${shQuote(registry.controller.leaseName)} -o json`, 10_000);
const namespace = registry.controller.namespace;
const deploymentResult = kubeJson(registry, options, `kubectl -n ${shQuote(namespace)} get deploy ${shQuote(registry.controller.deploymentName)} -o json`, 10_000, `/apis/apps/v1/namespaces/${encodeURIComponent(namespace)}/deployments/${encodeURIComponent(registry.controller.deploymentName)}`);
const leaseResult = kubeJson(registry, options, `kubectl -n ${shQuote(namespace)} get lease ${shQuote(registry.controller.leaseName)} -o json`, 10_000, `/apis/coordination.k8s.io/v1/namespaces/${encodeURIComponent(namespace)}/leases/${encodeURIComponent(registry.controller.leaseName)}`);
const podSelector = labelSelector(registry.controller.labels);
const podsResult = kubePodList(registry, options, podSelector);
if (!stateResult.ok) errors.push(`state configmap: ${stateResult.error}`);
@@ -1937,31 +1938,54 @@ function stateWriteSummary(followerId: string, result: CommandResult): Record<st
function removeFollowerStateKeys(registry: BranchFollowerRegistry, options: ParsedOptions, ids: string[]): CommandResult {
const patch = JSON.stringify({ data: Object.fromEntries(ids.map((id) => [id, null])) });
const script = [
"set -eu",
"tmpdir=$(mktemp -d)",
"cleanup() { rm -rf \"$tmpdir\"; }",
"trap cleanup EXIT INT TERM",
`NAMESPACE=${shQuote(registry.controller.namespace)}`,
`CONFIGMAP=${shQuote(registry.controller.stateConfigMapName)}`,
`PATCH=${shQuote(patch)}`,
"export NAMESPACE CONFIGMAP PATCH",
"if ! kubectl -n \"$NAMESPACE\" get configmap \"$CONFIGMAP\" >/dev/null 2>\"$tmpdir/error\"; then",
" if grep -qi 'not found' \"$tmpdir/error\"; then",
" printf '{\"ok\":true,\"present\":false,\"patched\":false,\"reason\":\"state-configmap-not-found\",\"parsedDownstreamCliOutput\":false}'",
" exit 0",
" fi",
" cat \"$tmpdir/error\" >&2",
" exit 1",
"fi",
"kubectl -n \"$NAMESPACE\" patch configmap \"$CONFIGMAP\" --type merge -p \"$PATCH\" >/dev/null",
"printf '{\"ok\":true,\"present\":true,\"patched\":true,\"parsedDownstreamCliOutput\":false}'",
].join("\n");
const script = options.inCluster
? [
"set -eu",
"tmpdir=$(mktemp -d)",
"cleanup() { rm -rf \"$tmpdir\"; }",
"trap cleanup EXIT INT TERM",
nativeCicdScriptLoadShell(["patch-configmap-data.mjs"]),
`NAMESPACE=${shQuote(registry.controller.namespace)}`,
`CONFIGMAP=${shQuote(registry.controller.stateConfigMapName)}`,
`PATCH_B64=${shQuote(Buffer.from(patch, "utf8").toString("base64"))}`,
"export NAMESPACE CONFIGMAP PATCH_B64",
"node \"$tmpdir/patch-configmap-data.mjs\"",
].join("\n")
: [
"set -eu",
"tmpdir=$(mktemp -d)",
"cleanup() { rm -rf \"$tmpdir\"; }",
"trap cleanup EXIT INT TERM",
`NAMESPACE=${shQuote(registry.controller.namespace)}`,
`CONFIGMAP=${shQuote(registry.controller.stateConfigMapName)}`,
`PATCH=${shQuote(patch)}`,
"export NAMESPACE CONFIGMAP PATCH",
"if ! kubectl -n \"$NAMESPACE\" get configmap \"$CONFIGMAP\" >/dev/null 2>\"$tmpdir/error\"; then",
" if grep -qi 'not found' \"$tmpdir/error\"; then",
" printf '{\"ok\":true,\"present\":false,\"patched\":false,\"reason\":\"state-configmap-not-found\",\"parsedDownstreamCliOutput\":false}'",
" exit 0",
" fi",
" cat \"$tmpdir/error\" >&2",
" exit 1",
"fi",
"kubectl -n \"$NAMESPACE\" patch configmap \"$CONFIGMAP\" --type merge -p \"$PATCH\" >/dev/null",
"printf '{\"ok\":true,\"present\":true,\"patched\":true,\"parsedDownstreamCliOutput\":false}'",
].join("\n");
return runKubeScript(registry, options, script, "", 10_000);
}
function kubeJson(registry: BranchFollowerRegistry, options: ParsedOptions, command: string, timeoutMs: number): { ok: boolean; value: Record<string, unknown> | null; error: string } {
const result = runKubeScript(registry, options, `set -eu\n${command}`, "", timeoutMs);
function kubeJson(registry: BranchFollowerRegistry, options: ParsedOptions, command: string, timeoutMs: number, inClusterPath?: string): { ok: boolean; value: Record<string, unknown> | null; error: string } {
const script = options.inCluster && inClusterPath !== undefined
? [
"set -eu",
"tmpdir=$(mktemp -d)",
"cleanup() { rm -rf \"$tmpdir\"; }",
"trap cleanup EXIT INT TERM",
nativeCicdScriptLoadShell(["kube-get.mjs"]),
`node "$tmpdir/kube-get.mjs" ${shQuote(inClusterPath)}`,
].join("\n")
: `set -eu\n${command}`;
const result = runKubeScript(registry, options, script, "", timeoutMs);
const value = result.exitCode === 0 ? parseJsonObject(result.stdout) : null;
return {
ok: result.exitCode === 0 && value !== null,
@@ -1972,12 +1996,21 @@ function kubeJson(registry: BranchFollowerRegistry, options: ParsedOptions, comm
function kubePodList(registry: BranchFollowerRegistry, options: ParsedOptions, selector: string): { ok: boolean; value: Record<string, unknown> | null; error: string } {
const command = `kubectl -n ${shQuote(registry.controller.namespace)} get pods -l ${shQuote(selector)} -o name`;
const result = runKubeScript(registry, options, `set -eu\n${command}`, "", 10_000);
const names = result.stdout
.split(/\r?\n/u)
.map((line) => line.trim())
.filter((line) => line.length > 0)
.map((line) => line.replace(/^pod\//u, ""));
const script = options.inCluster
? [
"set -eu",
"tmpdir=$(mktemp -d)",
"cleanup() { rm -rf \"$tmpdir\"; }",
"trap cleanup EXIT INT TERM",
nativeCicdScriptLoadShell(["kube-get.mjs"]),
`node "$tmpdir/kube-get.mjs" ${shQuote(`/api/v1/namespaces/${encodeURIComponent(registry.controller.namespace)}/pods?labelSelector=${encodeURIComponent(selector)}`)}`,
].join("\n")
: `set -eu\n${command}`;
const result = runKubeScript(registry, options, script, "", 10_000);
const parsed = options.inCluster && result.exitCode === 0 ? parseJsonObject(result.stdout) : null;
const names = options.inCluster
? arrayRecords(parsed?.items).map((item) => stringOrNull(asOptionalRecord(item.metadata)?.name)).filter((name): name is string => name !== null)
: result.stdout.split(/\r?\n/u).map((line) => line.trim()).filter((line) => line.length > 0).map((line) => line.replace(/^pod\//u, ""));
return {
ok: result.exitCode === 0,
value: result.exitCode === 0 ? { items: names.map((name) => ({ metadata: { name } })) } : null,