164 lines
6.8 KiB
JavaScript
164 lines
6.8 KiB
JavaScript
import { readFileSync } from "node:fs";
|
|
import { execFileSync } from "node:child_process";
|
|
import { existsSync } from "node:fs";
|
|
import https from "node:https";
|
|
|
|
const namespace = process.env.NAMESPACE || "";
|
|
const configMap = process.env.CONFIGMAP || "";
|
|
const followerId = process.env.FOLLOWER_ID || "";
|
|
const specRef = process.env.SPEC_REF || "";
|
|
const stateJson = Buffer.from(process.env.STATE_B64 || "", "base64").toString("utf8");
|
|
const tokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token";
|
|
const caPath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt";
|
|
const inCluster = Boolean(process.env.KUBERNETES_SERVICE_HOST && existsSync(tokenPath) && existsSync(caPath));
|
|
const host = process.env.KUBERNETES_SERVICE_HOST;
|
|
const port = Number(process.env.KUBERNETES_SERVICE_PORT || "443");
|
|
const token = inCluster ? readFileSync(tokenPath, "utf8").trim() : "";
|
|
const ca = inCluster ? readFileSync(caPath) : null;
|
|
|
|
function request(method, path, body, contentType = "application/json") {
|
|
return new Promise((resolve, reject) => {
|
|
const headers = { authorization: `Bearer ${token}` };
|
|
const payload = body === undefined ? null : typeof body === "string" ? body : JSON.stringify(body);
|
|
if (payload !== null) {
|
|
headers["content-type"] = contentType;
|
|
headers["content-length"] = Buffer.byteLength(payload);
|
|
}
|
|
const req = https.request({ host, port, path, method, ca, headers }, (res) => {
|
|
let text = "";
|
|
res.setEncoding("utf8");
|
|
res.on("data", (chunk) => { text += chunk; });
|
|
res.on("end", () => resolve({ status: res.statusCode || 0, text }));
|
|
});
|
|
req.on("error", reject);
|
|
if (payload !== null) req.write(payload);
|
|
req.end();
|
|
});
|
|
}
|
|
|
|
async function readConfigMap() {
|
|
if (!inCluster) return readConfigMapViaKubectl();
|
|
const result = await request("GET", `/api/v1/namespaces/${encodeURIComponent(namespace)}/configmaps/${encodeURIComponent(configMap)}`);
|
|
if (result.status === 404) return null;
|
|
if (result.status < 200 || result.status >= 300) throw new Error(result.text || `kube api GET configmap status ${result.status}`);
|
|
return JSON.parse(result.text);
|
|
}
|
|
|
|
async function ensureConfigMap() {
|
|
if (await readConfigMap() !== null) return;
|
|
const object = {
|
|
apiVersion: "v1",
|
|
kind: "ConfigMap",
|
|
metadata: { name: configMap, namespace },
|
|
data: { _createdAt: new Date().toISOString(), _specRef: specRef },
|
|
};
|
|
if (!inCluster) {
|
|
execFileSync("kubectl", ["-n", namespace, "apply", "-f", "-"], { input: JSON.stringify(object), encoding: "utf8", stdio: ["pipe", "pipe", "pipe"] });
|
|
return;
|
|
}
|
|
const result = await request("POST", `/api/v1/namespaces/${encodeURIComponent(namespace)}/configmaps`, object);
|
|
if (result.status === 409) return;
|
|
if (result.status < 200 || result.status >= 300) throw new Error(result.text || `kube api POST configmap status ${result.status}`);
|
|
}
|
|
|
|
function readConfigMapViaKubectl() {
|
|
try {
|
|
const stdout = execFileSync("kubectl", ["-n", namespace, "get", "configmap", configMap, "-o", "json"], { encoding: "utf8", stdio: ["ignore", "pipe", "pipe"] });
|
|
return JSON.parse(stdout);
|
|
} catch (error) {
|
|
const stderr = String(error?.stderr || error?.message || "");
|
|
if (/not found/i.test(stderr)) return null;
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
function stringOrNull(value) {
|
|
return typeof value === "string" && value.length > 0 ? value : null;
|
|
}
|
|
|
|
function numberOrNull(value) {
|
|
return typeof value === "number" && Number.isFinite(value) ? value : null;
|
|
}
|
|
|
|
function timestampMs(value) {
|
|
const text = stringOrNull(value);
|
|
if (text === null) return null;
|
|
const parsed = Date.parse(text);
|
|
return Number.isFinite(parsed) ? parsed : null;
|
|
}
|
|
|
|
function roundSeconds(value) {
|
|
return Math.round(value * 10) / 10;
|
|
}
|
|
|
|
function totalSecondsFromRange(startedAt, finishedAt) {
|
|
const startedMs = timestampMs(startedAt);
|
|
if (startedMs === null) return null;
|
|
const finishedMs = timestampMs(finishedAt) ?? Date.now();
|
|
return finishedMs >= startedMs ? roundSeconds((finishedMs - startedMs) / 1000) : null;
|
|
}
|
|
|
|
function terminalPhase(phase) {
|
|
return ["Succeeded", "Failed", "Blocked", "Skipped", "Noop"].includes(phase);
|
|
}
|
|
|
|
function preserveExistingTiming(state, existing) {
|
|
if (numberOrNull(state?.timings?.totalSeconds) !== null) return state;
|
|
const existingTimings = existing?.timings;
|
|
const sourceCommit = stringOrNull(existingTimings?.sourceCommit);
|
|
if (sourceCommit === null || sourceCommit !== stringOrNull(state?.source?.observedSha)) return state;
|
|
const startedAt = stringOrNull(existingTimings?.startedAt);
|
|
const existingFinishedAt = stringOrNull(existingTimings?.finishedAt);
|
|
const finishedAt = existingFinishedAt ?? (terminalPhase(state.phase) ? new Date().toISOString() : null);
|
|
const seconds = totalSecondsFromRange(startedAt, finishedAt) ?? numberOrNull(existingTimings?.totalSeconds);
|
|
if (seconds === null) return state;
|
|
const budgetSeconds = numberOrNull(state?.timings?.budgetSeconds);
|
|
return {
|
|
...state,
|
|
timings: {
|
|
...state.timings,
|
|
totalSeconds: seconds,
|
|
totalStatus: terminalPhase(state.phase) ? "completed" : String(state.phase || "recorded").toLowerCase(),
|
|
totalSource: stringOrNull(existingTimings?.totalSource) ?? "stored-state",
|
|
sourceCommit,
|
|
startedAt,
|
|
finishedAt,
|
|
overBudget: budgetSeconds === null ? null : seconds > budgetSeconds,
|
|
},
|
|
};
|
|
}
|
|
|
|
await ensureConfigMap();
|
|
const current = await readConfigMap();
|
|
const beforeResourceVersion = stringOrNull(current?.metadata?.resourceVersion);
|
|
const beforeUpdatedAt = stringOrNull(current?.data?._updatedAt);
|
|
const currentText = current?.data?.[followerId];
|
|
const existing = typeof currentText === "string" && currentText.length > 0 ? JSON.parse(currentText) : null;
|
|
const incomingState = JSON.parse(stateJson);
|
|
const state = preserveExistingTiming(incomingState, existing);
|
|
const patch = {
|
|
data: {
|
|
[followerId]: JSON.stringify(state),
|
|
_updatedAt: new Date().toISOString(),
|
|
_specRef: specRef,
|
|
},
|
|
};
|
|
if (inCluster) {
|
|
const patchResult = await request("PATCH", `/api/v1/namespaces/${encodeURIComponent(namespace)}/configmaps/${encodeURIComponent(configMap)}`, patch, "application/merge-patch+json");
|
|
if (patchResult.status < 200 || patchResult.status >= 300) throw new Error(patchResult.text || `kube api PATCH configmap status ${patchResult.status}`);
|
|
} else {
|
|
execFileSync("kubectl", ["-n", namespace, "patch", "configmap", configMap, "--type", "merge", "-p", JSON.stringify(patch)], { encoding: "utf8", stdio: ["ignore", "pipe", "pipe"] });
|
|
}
|
|
const updated = await readConfigMap();
|
|
process.stdout.write(JSON.stringify({
|
|
ok: true,
|
|
followerId,
|
|
preservedTiming: state !== incomingState,
|
|
beforeResourceVersion,
|
|
afterResourceVersion: stringOrNull(updated?.metadata?.resourceVersion),
|
|
beforeUpdatedAt,
|
|
afterUpdatedAt: stringOrNull(updated?.data?._updatedAt),
|
|
statusAuthority: "target-node-summary",
|
|
parsedDownstreamCliOutput: false,
|
|
}));
|