diff --git a/scripts/native/cicd/patch-follower-state.mjs b/scripts/native/cicd/patch-follower-state.mjs new file mode 100644 index 00000000..4fecfe5b --- /dev/null +++ b/scripts/native/cicd/patch-follower-state.mjs @@ -0,0 +1,108 @@ +import { execFileSync } from "node:child_process"; + +const namespace = process.env.NAMESPACE || ""; +const configMap = process.env.CONFIGMAP || ""; +const followerId = process.env.FOLLOWER_ID || ""; +const specRef = process.env.SPEC_REF || ""; +const stateJson = Buffer.from(process.env.STATE_B64 || "", "base64").toString("utf8"); + +function kubectl(args, input) { + return execFileSync("kubectl", ["-n", namespace, ...args], { + input, + encoding: "utf8", + stdio: ["pipe", "pipe", "pipe"], + }); +} + +function readConfigMap() { + try { + return JSON.parse(kubectl(["get", "configmap", configMap, "-o", "json"])); + } catch (error) { + const stderr = String(error?.stderr || error?.message || ""); + if (/not found/i.test(stderr)) return null; + throw error; + } +} + +function ensureConfigMap() { + if (readConfigMap() !== null) return; + const object = { + apiVersion: "v1", + kind: "ConfigMap", + metadata: { name: configMap, namespace }, + data: { _createdAt: new Date().toISOString(), _specRef: specRef }, + }; + kubectl(["apply", "-f", "-"], JSON.stringify(object)); +} + +function stringOrNull(value) { + return typeof value === "string" && value.length > 0 ? value : null; +} + +function numberOrNull(value) { + return typeof value === "number" && Number.isFinite(value) ? value : null; +} + +function timestampMs(value) { + const text = stringOrNull(value); + if (text === null) return null; + const parsed = Date.parse(text); + return Number.isFinite(parsed) ? parsed : null; +} + +function roundSeconds(value) { + return Math.round(value * 10) / 10; +} + +function totalSecondsFromRange(startedAt, finishedAt) { + const startedMs = timestampMs(startedAt); + if (startedMs === null) return null; + const finishedMs = timestampMs(finishedAt) ?? Date.now(); + return finishedMs >= startedMs ? roundSeconds((finishedMs - startedMs) / 1000) : null; +} + +function terminalPhase(phase) { + return ["Succeeded", "Failed", "Blocked", "Skipped", "Noop"].includes(phase); +} + +function preserveExistingTiming(state, existing) { + if (numberOrNull(state?.timings?.totalSeconds) !== null) return state; + const existingTimings = existing?.timings; + const sourceCommit = stringOrNull(existingTimings?.sourceCommit); + if (sourceCommit === null || sourceCommit !== stringOrNull(state?.source?.observedSha)) return state; + const startedAt = stringOrNull(existingTimings?.startedAt); + const existingFinishedAt = stringOrNull(existingTimings?.finishedAt); + const finishedAt = existingFinishedAt ?? (terminalPhase(state.phase) ? new Date().toISOString() : null); + const seconds = totalSecondsFromRange(startedAt, finishedAt) ?? numberOrNull(existingTimings?.totalSeconds); + if (seconds === null) return state; + const budgetSeconds = numberOrNull(state?.timings?.budgetSeconds); + return { + ...state, + timings: { + ...state.timings, + totalSeconds: seconds, + totalStatus: terminalPhase(state.phase) ? "completed" : String(state.phase || "recorded").toLowerCase(), + totalSource: stringOrNull(existingTimings?.totalSource) ?? "stored-state", + sourceCommit, + startedAt, + finishedAt, + overBudget: budgetSeconds === null ? null : seconds > budgetSeconds, + }, + }; +} + +ensureConfigMap(); +const current = readConfigMap(); +const currentText = current?.data?.[followerId]; +const existing = typeof currentText === "string" && currentText.length > 0 ? JSON.parse(currentText) : null; +const incomingState = JSON.parse(stateJson); +const state = preserveExistingTiming(incomingState, existing); +const patch = { + data: { + [followerId]: JSON.stringify(state), + _updatedAt: new Date().toISOString(), + _specRef: specRef, + }, +}; +kubectl(["patch", "configmap", configMap, "--type", "merge", "-p", JSON.stringify(patch)]); +process.stdout.write(JSON.stringify({ ok: true, followerId, preservedTiming: state !== incomingState, statusAuthority: "target-node-summary", parsedDownstreamCliOutput: false })); diff --git a/scripts/native/cicd/read-follower-state.mjs b/scripts/native/cicd/read-follower-state.mjs deleted file mode 100644 index 6928d65d..00000000 --- a/scripts/native/cicd/read-follower-state.mjs +++ /dev/null @@ -1,16 +0,0 @@ -import { execFileSync } from "node:child_process"; - -const namespace = process.env.NAMESPACE || ""; -const configMap = process.env.CONFIGMAP || ""; -const followerId = process.env.FOLLOWER_ID || ""; - -try { - const raw = execFileSync("kubectl", ["-n", namespace, "get", "configmap", configMap, "-o", "json"], { - encoding: "utf8", - stdio: ["ignore", "pipe", "pipe"], - }); - const data = JSON.parse(raw).data || {}; - process.stdout.write(data[followerId] || "{}"); -} catch { - process.stdout.write("{}"); -} diff --git a/scripts/src/cicd.ts b/scripts/src/cicd.ts index 0b83c8cf..d58c12b0 100644 --- a/scripts/src/cicd.ts +++ b/scripts/src/cicd.ts @@ -2399,114 +2399,22 @@ function roundSeconds(value: number): number { } function writeFollowerState(registry: BranchFollowerRegistry, state: FollowerState, options: ParsedOptions): CommandResult { - const stateForWrite = preserveWriteTimeTotalTiming(registry, state, options); - const json = JSON.stringify(compactFollowerStateForConfigMap(stateForWrite)); - const dataPatch = JSON.stringify({ data: { [state.id]: json, _updatedAt: new Date().toISOString(), _specRef: SPEC_REF } }); - if (options.inCluster) { - const patchBase64 = Buffer.from(dataPatch, "utf8").toString("base64"); - const createBase64 = Buffer.from(JSON.stringify({ - metadata: { - name: registry.controller.stateConfigMapName, - namespace: registry.controller.namespace, - }, - data: { - _createdAt: new Date().toISOString(), - _specRef: SPEC_REF, - }, - }), "utf8").toString("base64"); - const script = [ - "set -eu", - `PATCH_B64=${shQuote(patchBase64)}`, - `CREATE_B64=${shQuote(createBase64)}`, - `NAMESPACE=${shQuote(registry.controller.namespace)}`, - `CONFIGMAP=${shQuote(registry.controller.stateConfigMapName)}`, - "export PATCH_B64 CREATE_B64 NAMESPACE CONFIGMAP", - "node <<'NODE_KUBE_PATCH'", - "import { readFileSync } from 'node:fs';", - "import https from 'node:https';", - "const host = process.env.KUBERNETES_SERVICE_HOST;", - "const port = Number(process.env.KUBERNETES_SERVICE_PORT || '443');", - "const namespace = process.env.NAMESPACE;", - "const name = process.env.CONFIGMAP;", - "const token = readFileSync('/var/run/secrets/kubernetes.io/serviceaccount/token', 'utf8').trim();", - "const ca = readFileSync('/var/run/secrets/kubernetes.io/serviceaccount/ca.crt');", - "const patch = Buffer.from(process.env.PATCH_B64 || '', 'base64').toString('utf8');", - "const create = Buffer.from(process.env.CREATE_B64 || '', 'base64').toString('utf8');", - "function request(method, path, body, contentType) {", - " return new Promise((resolve, reject) => {", - " const req = https.request({ host, port, path, method, ca, headers: { authorization: `Bearer ${token}`, ...(body ? { 'content-type': contentType || 'application/json', 'content-length': Buffer.byteLength(body) } : {}) } }, (res) => {", - " let text = '';", - " res.setEncoding('utf8');", - " res.on('data', (chunk) => { text += chunk; });", - " res.on('end', () => resolve({ status: res.statusCode || 0, text }));", - " });", - " req.on('error', reject);", - " if (body) req.write(body);", - " req.end();", - " });", - "}", - "const base = `/api/v1/namespaces/${namespace}/configmaps`;", - "let result = await request('PATCH', `${base}/${name}`, patch, 'application/merge-patch+json');", - "if (result.status === 404) {", - " const created = await request('POST', base, create, 'application/json');", - " if (created.status < 200 || created.status >= 300) { process.stderr.write(created.text); process.exit(1); }", - " result = await request('PATCH', `${base}/${name}`, patch, 'application/merge-patch+json');", - "}", - "if (result.status < 200 || result.status >= 300) { process.stderr.write(result.text); process.exit(1); }", - "NODE_KUBE_PATCH", - ].join("\n"); - return runKubeScript(registry, options, script, "", 10_000); - } - const script = [ - "set -eu", - `kubectl -n ${shQuote(registry.controller.namespace)} create configmap ${shQuote(registry.controller.stateConfigMapName)} --from-literal=_createdAt="$(date -Iseconds)" --dry-run=client -o yaml | kubectl apply -f - >/dev/null`, - `kubectl -n ${shQuote(registry.controller.namespace)} patch configmap ${shQuote(registry.controller.stateConfigMapName)} --type merge -p ${shQuote(dataPatch)} >/dev/null`, - ].join("\n"); - return runKubeScript(registry, options, script, "", 10_000); -} - -function preserveWriteTimeTotalTiming(registry: BranchFollowerRegistry, state: FollowerState, options: ParsedOptions): FollowerState { - if (state.timings.totalSeconds !== null) return state; - const existing = readExistingFollowerStateForWrite(registry, state.id, options); - const existingTimings = asOptionalRecord(existing?.timings); - if (existingTimings === null) return state; - const sourceCommit = stringOrNull(existingTimings.sourceCommit); - if (sourceCommit === null || sourceCommit !== state.source.observedSha) return state; - const startedAt = stringOrNull(existingTimings.startedAt); - const existingFinishedAt = stringOrNull(existingTimings.finishedAt); - const finishedAt = existingFinishedAt ?? (terminalPhase(state.phase) ? new Date().toISOString() : null); - const seconds = totalSecondsFromRange(startedAt, finishedAt) ?? numberOrNull(existingTimings.totalSeconds); - if (seconds === null) return state; - return { - ...state, - timings: { - ...state.timings, - totalSeconds: seconds, - totalStatus: terminalPhase(state.phase) ? "completed" : state.phase.toLowerCase(), - totalSource: stringOrNull(existingTimings.totalSource) ?? "stored-state", - sourceCommit, - startedAt, - finishedAt, - overBudget: seconds > state.timings.budgetSeconds, - }, - }; -} - -function readExistingFollowerStateForWrite(registry: BranchFollowerRegistry, followerId: string, options: ParsedOptions): Record | null { + const stateJson = JSON.stringify(compactFollowerStateForConfigMap(state)); const script = [ "set -eu", `NAMESPACE=${shQuote(registry.controller.namespace)}`, `CONFIGMAP=${shQuote(registry.controller.stateConfigMapName)}`, - `FOLLOWER_ID=${shQuote(followerId)}`, - "export NAMESPACE CONFIGMAP FOLLOWER_ID", + `FOLLOWER_ID=${shQuote(state.id)}`, + `SPEC_REF=${shQuote(SPEC_REF)}`, + `STATE_B64=${shQuote(Buffer.from(stateJson, "utf8").toString("base64"))}`, + "export NAMESPACE CONFIGMAP FOLLOWER_ID SPEC_REF STATE_B64", "tmpdir=$(mktemp -d)", "cleanup() { rm -rf \"$tmpdir\"; }", "trap cleanup EXIT INT TERM", - nativeCicdScriptLoadShell(["read-follower-state.mjs"]), - "node \"$tmpdir/read-follower-state.mjs\"", + nativeCicdScriptLoadShell(["patch-follower-state.mjs"]), + "node \"$tmpdir/patch-follower-state.mjs\"", ].join("\n"); - const result = runKubeScript(registry, options, script, "", 10_000); - return result.exitCode === 0 ? parseJsonObject(result.stdout) : null; + return runKubeScript(registry, options, script, "", 10_000); } function runControllerReconcileJob(registry: BranchFollowerRegistry, options: ParsedOptions, mode: { dryRun: boolean; wait: boolean; recordState: boolean }): Record {