import { readFileSync } from "node:fs"; import { execFileSync } from "node:child_process"; import { existsSync } from "node:fs"; import https from "node:https"; const namespace = process.env.NAMESPACE || ""; const configMap = process.env.CONFIGMAP || ""; const followerId = process.env.FOLLOWER_ID || ""; const specRef = process.env.SPEC_REF || ""; const stateJson = Buffer.from(process.env.STATE_B64 || "", "base64").toString("utf8"); const tokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"; const caPath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"; const inCluster = Boolean(process.env.KUBERNETES_SERVICE_HOST && existsSync(tokenPath) && existsSync(caPath)); const host = process.env.KUBERNETES_SERVICE_HOST; const port = Number(process.env.KUBERNETES_SERVICE_PORT || "443"); const token = inCluster ? readFileSync(tokenPath, "utf8").trim() : ""; const ca = inCluster ? readFileSync(caPath) : null; function request(method, path, body, contentType = "application/json") { return new Promise((resolve, reject) => { const headers = { authorization: `Bearer ${token}` }; const payload = body === undefined ? null : typeof body === "string" ? body : JSON.stringify(body); if (payload !== null) { headers["content-type"] = contentType; headers["content-length"] = Buffer.byteLength(payload); } const req = https.request({ host, port, path, method, ca, headers }, (res) => { let text = ""; res.setEncoding("utf8"); res.on("data", (chunk) => { text += chunk; }); res.on("end", () => resolve({ status: res.statusCode || 0, text })); }); req.on("error", reject); if (payload !== null) req.write(payload); req.end(); }); } async function readConfigMap() { if (!inCluster) return readConfigMapViaKubectl(); const result = await request("GET", `/api/v1/namespaces/${encodeURIComponent(namespace)}/configmaps/${encodeURIComponent(configMap)}`); if (result.status === 404) return null; if (result.status < 200 || result.status >= 300) throw new Error(result.text || `kube api GET configmap status ${result.status}`); return JSON.parse(result.text); } async function ensureConfigMap() { if (await readConfigMap() !== null) return; const object = { apiVersion: "v1", kind: "ConfigMap", metadata: { name: configMap, namespace }, data: { _createdAt: new Date().toISOString(), _specRef: specRef }, }; if (!inCluster) { execFileSync("kubectl", ["-n", namespace, "apply", "-f", "-"], { input: JSON.stringify(object), encoding: "utf8", stdio: ["pipe", "pipe", "pipe"] }); return; } const result = await request("POST", `/api/v1/namespaces/${encodeURIComponent(namespace)}/configmaps`, object); if (result.status === 409) return; if (result.status < 200 || result.status >= 300) throw new Error(result.text || `kube api POST configmap status ${result.status}`); } function readConfigMapViaKubectl() { try { const stdout = execFileSync("kubectl", ["-n", namespace, "get", "configmap", configMap, "-o", "json"], { encoding: "utf8", stdio: ["ignore", "pipe", "pipe"] }); return JSON.parse(stdout); } catch (error) { const stderr = String(error?.stderr || error?.message || ""); if (/not found/i.test(stderr)) return null; throw error; } } function stringOrNull(value) { return typeof value === "string" && value.length > 0 ? value : null; } function numberOrNull(value) { return typeof value === "number" && Number.isFinite(value) ? value : null; } function timestampMs(value) { const text = stringOrNull(value); if (text === null) return null; const parsed = Date.parse(text); return Number.isFinite(parsed) ? parsed : null; } function roundSeconds(value) { return Math.round(value * 10) / 10; } function totalSecondsFromRange(startedAt, finishedAt) { const startedMs = timestampMs(startedAt); if (startedMs === null) return null; const finishedMs = timestampMs(finishedAt) ?? Date.now(); return finishedMs >= startedMs ? roundSeconds((finishedMs - startedMs) / 1000) : null; } function terminalPhase(phase) { return ["Succeeded", "Failed", "Blocked", "Skipped", "Noop"].includes(phase); } function preserveExistingTiming(state, existing) { if (numberOrNull(state?.timings?.totalSeconds) !== null) return state; const existingTimings = existing?.timings; const sourceCommit = stringOrNull(existingTimings?.sourceCommit); if (sourceCommit === null || sourceCommit !== stringOrNull(state?.source?.observedSha)) return state; const startedAt = stringOrNull(existingTimings?.startedAt); const existingFinishedAt = stringOrNull(existingTimings?.finishedAt); const finishedAt = existingFinishedAt ?? (terminalPhase(state.phase) ? new Date().toISOString() : null); const seconds = totalSecondsFromRange(startedAt, finishedAt) ?? numberOrNull(existingTimings?.totalSeconds); if (seconds === null) return state; const budgetSeconds = numberOrNull(state?.timings?.budgetSeconds); return { ...state, timings: { ...state.timings, totalSeconds: seconds, totalStatus: terminalPhase(state.phase) ? "completed" : String(state.phase || "recorded").toLowerCase(), totalSource: stringOrNull(existingTimings?.totalSource) ?? "stored-state", sourceCommit, startedAt, finishedAt, overBudget: budgetSeconds === null ? null : seconds > budgetSeconds, }, }; } function recordOrNull(value) { return typeof value === "object" && value !== null && !Array.isArray(value) ? value : null; } function sameObservedSource(state, existing) { const incomingCommit = stringOrNull(state?.source?.observedSha) ?? stringOrNull(state?.command?.sourceCommit) ?? stringOrNull(state?.command?.payload?.agentrun?.sourceCommit); const existingCommit = stringOrNull(existing?.source?.observedSha) ?? stringOrNull(existing?.command?.sourceCommit) ?? stringOrNull(existing?.command?.payload?.agentrun?.sourceCommit); return incomingCommit !== null && incomingCommit === existingCommit; } function hasAgentRunTriggerEvidence(value) { const agentrun = recordOrNull(value); return agentrun !== null && ( recordOrNull(agentrun.ciConsumption) !== null || recordOrNull(agentrun.reusePlan) !== null || recordOrNull(agentrun.imageBuild) !== null || recordOrNull(agentrun.gitopsPublish) !== null ); } function preserveAgentRunTriggerEvidence(state, existing) { if (!sameObservedSource(state, existing)) return state; const existingAgentrun = recordOrNull(existing?.command?.payload?.agentrun); if (!hasAgentRunTriggerEvidence(existingAgentrun)) return state; const command = recordOrNull(state.command); if (command === null) return state; const payload = recordOrNull(command.payload); if (payload === null) return state; const incomingAgentrun = recordOrNull(payload.agentrun) ?? {}; const preservedKeys = [ "configPath", "sourceCommit", "reuseConfig", "reusePlan", "ciConsumption", "gitMirrorSync", "imageBuild", "gitopsPublish", "gitMirrorFlush", ]; let changed = false; const mergedAgentrun = { ...incomingAgentrun }; for (const key of preservedKeys) { if (mergedAgentrun[key] === undefined || mergedAgentrun[key] === null) { const existingValue = existingAgentrun[key]; if (existingValue !== undefined && existingValue !== null) { mergedAgentrun[key] = existingValue; changed = true; } } } if (!changed) return state; return { ...state, command: { ...command, payload: { ...payload, agentrun: mergedAgentrun, }, }, }; } await ensureConfigMap(); const current = await readConfigMap(); const beforeResourceVersion = stringOrNull(current?.metadata?.resourceVersion); const beforeUpdatedAt = stringOrNull(current?.data?._updatedAt); const currentText = current?.data?.[followerId]; const existing = typeof currentText === "string" && currentText.length > 0 ? JSON.parse(currentText) : null; const incomingState = JSON.parse(stateJson); const state = preserveAgentRunTriggerEvidence(preserveExistingTiming(incomingState, existing), existing); const patch = { data: { [followerId]: JSON.stringify(state), _updatedAt: new Date().toISOString(), _specRef: specRef, }, }; if (inCluster) { const patchResult = await request("PATCH", `/api/v1/namespaces/${encodeURIComponent(namespace)}/configmaps/${encodeURIComponent(configMap)}`, patch, "application/merge-patch+json"); if (patchResult.status < 200 || patchResult.status >= 300) throw new Error(patchResult.text || `kube api PATCH configmap status ${patchResult.status}`); } else { execFileSync("kubectl", ["-n", namespace, "patch", "configmap", configMap, "--type", "merge", "-p", JSON.stringify(patch)], { encoding: "utf8", stdio: ["ignore", "pipe", "pipe"] }); } const updated = await readConfigMap(); process.stdout.write(JSON.stringify({ ok: true, followerId, preservedTiming: state !== incomingState, beforeResourceVersion, afterResourceVersion: stringOrNull(updated?.metadata?.resourceVersion), beforeUpdatedAt, afterUpdatedAt: stringOrNull(updated?.data?._updatedAt), statusAuthority: "target-node-summary", parsedDownstreamCliOutput: false, }));