fix: support follower state helper planes
This commit is contained in:
@@ -113,6 +113,8 @@ The branch follower must not parse downstream CLI stdout/stderr, `kubectl` human
|
|||||||
|
|
||||||
In-cluster controller and native helper scripts must not require a `kubectl` binary in the image. Native helpers that read or write ConfigMaps, Jobs, PipelineRuns, Argo Applications, Pods or logs must use the serviceaccount token and Kubernetes HTTPS API directly, or a shared native helper that does the same. A missing `kubectl` binary is a product defect in the helper, not a node problem. Operator-side `kubectl` through the controlled CLI/trans boundary remains acceptable only as a transport/debug wrapper.
|
In-cluster controller and native helper scripts must not require a `kubectl` binary in the image. Native helpers that read or write ConfigMaps, Jobs, PipelineRuns, Argo Applications, Pods or logs must use the serviceaccount token and Kubernetes HTTPS API directly, or a shared native helper that does the same. A missing `kubectl` binary is a product defect in the helper, not a node problem. Operator-side `kubectl` through the controlled CLI/trans boundary remains acceptable only as a transport/debug wrapper.
|
||||||
|
|
||||||
|
Native helper scripts that are reused in both execution planes must make the plane explicit. Inside a Pod/Job they use serviceaccount HTTPS API; from the operator/trans boundary they may use the controlled `kubectl` transport. A helper must not assume serviceaccount files exist on the target node, and must not assume `kubectl` exists inside the controller image.
|
||||||
|
|
||||||
The controller automatic loop submits trigger work without a blocking wait; later loops close out via the native state objects above. Failed state must not dedupe a source commit forever: retries may reuse deterministic native objects for the same source commit, and a new compact observation should be able to move the follower back into triggering or closeout.
|
The controller automatic loop submits trigger work without a blocking wait; later loops close out via the native state objects above. Failed state must not dedupe a source commit forever: retries may reuse deterministic native objects for the same source commit, and a new compact observation should be able to move the follower back into triggering or closeout.
|
||||||
|
|
||||||
State ConfigMaps must stay bounded and human-queryable. Store compact summaries, stage refs, conditions, short messages, and drill-down object names; do not store full API payloads or long log dumps. Cleanup is an explicit operator operation for stale/broken state and must not be required for normal convergence.
|
State ConfigMaps must stay bounded and human-queryable. Store compact summaries, stage refs, conditions, short messages, and drill-down object names; do not store full API payloads or long log dumps. Cleanup is an explicit operator operation for stale/broken state and must not be required for normal convergence.
|
||||||
|
|||||||
@@ -1,4 +1,6 @@
|
|||||||
import { readFileSync } from "node:fs";
|
import { readFileSync } from "node:fs";
|
||||||
|
import { execFileSync } from "node:child_process";
|
||||||
|
import { existsSync } from "node:fs";
|
||||||
import https from "node:https";
|
import https from "node:https";
|
||||||
|
|
||||||
const namespace = process.env.NAMESPACE || "";
|
const namespace = process.env.NAMESPACE || "";
|
||||||
@@ -6,10 +8,13 @@ const configMap = process.env.CONFIGMAP || "";
|
|||||||
const followerId = process.env.FOLLOWER_ID || "";
|
const followerId = process.env.FOLLOWER_ID || "";
|
||||||
const specRef = process.env.SPEC_REF || "";
|
const specRef = process.env.SPEC_REF || "";
|
||||||
const stateJson = Buffer.from(process.env.STATE_B64 || "", "base64").toString("utf8");
|
const stateJson = Buffer.from(process.env.STATE_B64 || "", "base64").toString("utf8");
|
||||||
|
const tokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token";
|
||||||
|
const caPath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt";
|
||||||
|
const inCluster = Boolean(process.env.KUBERNETES_SERVICE_HOST && existsSync(tokenPath) && existsSync(caPath));
|
||||||
const host = process.env.KUBERNETES_SERVICE_HOST;
|
const host = process.env.KUBERNETES_SERVICE_HOST;
|
||||||
const port = Number(process.env.KUBERNETES_SERVICE_PORT || "443");
|
const port = Number(process.env.KUBERNETES_SERVICE_PORT || "443");
|
||||||
const token = readFileSync("/var/run/secrets/kubernetes.io/serviceaccount/token", "utf8").trim();
|
const token = inCluster ? readFileSync(tokenPath, "utf8").trim() : "";
|
||||||
const ca = readFileSync("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt");
|
const ca = inCluster ? readFileSync(caPath) : null;
|
||||||
|
|
||||||
function request(method, path, body, contentType = "application/json") {
|
function request(method, path, body, contentType = "application/json") {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
@@ -32,6 +37,7 @@ function request(method, path, body, contentType = "application/json") {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async function readConfigMap() {
|
async function readConfigMap() {
|
||||||
|
if (!inCluster) return readConfigMapViaKubectl();
|
||||||
const result = await request("GET", `/api/v1/namespaces/${encodeURIComponent(namespace)}/configmaps/${encodeURIComponent(configMap)}`);
|
const result = await request("GET", `/api/v1/namespaces/${encodeURIComponent(namespace)}/configmaps/${encodeURIComponent(configMap)}`);
|
||||||
if (result.status === 404) return null;
|
if (result.status === 404) return null;
|
||||||
if (result.status < 200 || result.status >= 300) throw new Error(result.text || `kube api GET configmap status ${result.status}`);
|
if (result.status < 200 || result.status >= 300) throw new Error(result.text || `kube api GET configmap status ${result.status}`);
|
||||||
@@ -46,11 +52,26 @@ async function ensureConfigMap() {
|
|||||||
metadata: { name: configMap, namespace },
|
metadata: { name: configMap, namespace },
|
||||||
data: { _createdAt: new Date().toISOString(), _specRef: specRef },
|
data: { _createdAt: new Date().toISOString(), _specRef: specRef },
|
||||||
};
|
};
|
||||||
|
if (!inCluster) {
|
||||||
|
execFileSync("kubectl", ["-n", namespace, "apply", "-f", "-"], { input: JSON.stringify(object), encoding: "utf8", stdio: ["pipe", "pipe", "pipe"] });
|
||||||
|
return;
|
||||||
|
}
|
||||||
const result = await request("POST", `/api/v1/namespaces/${encodeURIComponent(namespace)}/configmaps`, object);
|
const result = await request("POST", `/api/v1/namespaces/${encodeURIComponent(namespace)}/configmaps`, object);
|
||||||
if (result.status === 409) return;
|
if (result.status === 409) return;
|
||||||
if (result.status < 200 || result.status >= 300) throw new Error(result.text || `kube api POST configmap status ${result.status}`);
|
if (result.status < 200 || result.status >= 300) throw new Error(result.text || `kube api POST configmap status ${result.status}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function readConfigMapViaKubectl() {
|
||||||
|
try {
|
||||||
|
const stdout = execFileSync("kubectl", ["-n", namespace, "get", "configmap", configMap, "-o", "json"], { encoding: "utf8", stdio: ["ignore", "pipe", "pipe"] });
|
||||||
|
return JSON.parse(stdout);
|
||||||
|
} catch (error) {
|
||||||
|
const stderr = String(error?.stderr || error?.message || "");
|
||||||
|
if (/not found/i.test(stderr)) return null;
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
function stringOrNull(value) {
|
function stringOrNull(value) {
|
||||||
return typeof value === "string" && value.length > 0 ? value : null;
|
return typeof value === "string" && value.length > 0 ? value : null;
|
||||||
}
|
}
|
||||||
@@ -122,8 +143,12 @@ const patch = {
|
|||||||
_specRef: specRef,
|
_specRef: specRef,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
const patchResult = await request("PATCH", `/api/v1/namespaces/${encodeURIComponent(namespace)}/configmaps/${encodeURIComponent(configMap)}`, patch, "application/merge-patch+json");
|
if (inCluster) {
|
||||||
if (patchResult.status < 200 || patchResult.status >= 300) throw new Error(patchResult.text || `kube api PATCH configmap status ${patchResult.status}`);
|
const patchResult = await request("PATCH", `/api/v1/namespaces/${encodeURIComponent(namespace)}/configmaps/${encodeURIComponent(configMap)}`, patch, "application/merge-patch+json");
|
||||||
|
if (patchResult.status < 200 || patchResult.status >= 300) throw new Error(patchResult.text || `kube api PATCH configmap status ${patchResult.status}`);
|
||||||
|
} else {
|
||||||
|
execFileSync("kubectl", ["-n", namespace, "patch", "configmap", configMap, "--type", "merge", "-p", JSON.stringify(patch)], { encoding: "utf8", stdio: ["ignore", "pipe", "pipe"] });
|
||||||
|
}
|
||||||
const updated = await readConfigMap();
|
const updated = await readConfigMap();
|
||||||
process.stdout.write(JSON.stringify({
|
process.stdout.write(JSON.stringify({
|
||||||
ok: true,
|
ok: true,
|
||||||
|
|||||||
@@ -1,14 +1,19 @@
|
|||||||
import { readFileSync } from "node:fs";
|
import { readFileSync } from "node:fs";
|
||||||
|
import { execFileSync } from "node:child_process";
|
||||||
|
import { existsSync } from "node:fs";
|
||||||
import https from "node:https";
|
import https from "node:https";
|
||||||
|
|
||||||
const namespace = process.env.NAMESPACE || "";
|
const namespace = process.env.NAMESPACE || "";
|
||||||
const configMap = process.env.CONFIGMAP || "";
|
const configMap = process.env.CONFIGMAP || "";
|
||||||
const followerIds = parseFollowerIds(process.env.FOLLOWERS_JSON || "[]");
|
const followerIds = parseFollowerIds(process.env.FOLLOWERS_JSON || "[]");
|
||||||
const maxTimingStages = Number(process.env.MAX_TIMING_STAGES || "24");
|
const maxTimingStages = Number(process.env.MAX_TIMING_STAGES || "24");
|
||||||
|
const tokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token";
|
||||||
|
const caPath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt";
|
||||||
|
const inCluster = Boolean(process.env.KUBERNETES_SERVICE_HOST && existsSync(tokenPath) && existsSync(caPath));
|
||||||
const host = process.env.KUBERNETES_SERVICE_HOST;
|
const host = process.env.KUBERNETES_SERVICE_HOST;
|
||||||
const port = Number(process.env.KUBERNETES_SERVICE_PORT || "443");
|
const port = Number(process.env.KUBERNETES_SERVICE_PORT || "443");
|
||||||
const token = readFileSync("/var/run/secrets/kubernetes.io/serviceaccount/token", "utf8").trim();
|
const token = inCluster ? readFileSync(tokenPath, "utf8").trim() : "";
|
||||||
const ca = readFileSync("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt");
|
const ca = inCluster ? readFileSync(caPath) : null;
|
||||||
|
|
||||||
function parseFollowerIds(text) {
|
function parseFollowerIds(text) {
|
||||||
try {
|
try {
|
||||||
@@ -40,6 +45,7 @@ function request(method, path, body, contentType = "application/json") {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async function readConfigMap() {
|
async function readConfigMap() {
|
||||||
|
if (!inCluster) return readConfigMapViaKubectl();
|
||||||
try {
|
try {
|
||||||
const result = await request("GET", `/api/v1/namespaces/${encodeURIComponent(namespace)}/configmaps/${encodeURIComponent(configMap)}`);
|
const result = await request("GET", `/api/v1/namespaces/${encodeURIComponent(namespace)}/configmaps/${encodeURIComponent(configMap)}`);
|
||||||
if (result.status === 404) return { ok: true, present: false, object: null, error: result.text };
|
if (result.status === 404) return { ok: true, present: false, object: null, error: result.text };
|
||||||
@@ -50,6 +56,21 @@ async function readConfigMap() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function readConfigMapViaKubectl() {
|
||||||
|
try {
|
||||||
|
const stdout = execFileSync("kubectl", ["-n", namespace, "get", "configmap", configMap, "-o", "json"], {
|
||||||
|
encoding: "utf8",
|
||||||
|
maxBuffer: 16 * 1024 * 1024,
|
||||||
|
stdio: ["ignore", "pipe", "pipe"],
|
||||||
|
});
|
||||||
|
return { ok: true, present: true, object: JSON.parse(stdout), error: "" };
|
||||||
|
} catch (error) {
|
||||||
|
const stderr = String(error?.stderr || error?.message || "");
|
||||||
|
if (/not found/i.test(stderr)) return { ok: true, present: false, object: null, error: stderr };
|
||||||
|
return { ok: false, present: false, object: null, error: stderr || "kubectl configmap read failed" };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
function compactStateText(text) {
|
function compactStateText(text) {
|
||||||
if (typeof text !== "string" || text.length === 0) return null;
|
if (typeof text !== "string" || text.length === 0) return null;
|
||||||
let state;
|
let state;
|
||||||
|
|||||||
Reference in New Issue
Block a user