fix: patch follower timing atomically

This commit is contained in:
Codex
2026-07-03 17:32:12 +00:00
parent 72f6a7a0e4
commit babcb7b364
3 changed files with 116 additions and 116 deletions
@@ -0,0 +1,108 @@
import { execFileSync } from "node:child_process";
const namespace = process.env.NAMESPACE || "";
const configMap = process.env.CONFIGMAP || "";
const followerId = process.env.FOLLOWER_ID || "";
const specRef = process.env.SPEC_REF || "";
const stateJson = Buffer.from(process.env.STATE_B64 || "", "base64").toString("utf8");
function kubectl(args, input) {
return execFileSync("kubectl", ["-n", namespace, ...args], {
input,
encoding: "utf8",
stdio: ["pipe", "pipe", "pipe"],
});
}
function readConfigMap() {
try {
return JSON.parse(kubectl(["get", "configmap", configMap, "-o", "json"]));
} catch (error) {
const stderr = String(error?.stderr || error?.message || "");
if (/not found/i.test(stderr)) return null;
throw error;
}
}
function ensureConfigMap() {
if (readConfigMap() !== null) return;
const object = {
apiVersion: "v1",
kind: "ConfigMap",
metadata: { name: configMap, namespace },
data: { _createdAt: new Date().toISOString(), _specRef: specRef },
};
kubectl(["apply", "-f", "-"], JSON.stringify(object));
}
function stringOrNull(value) {
return typeof value === "string" && value.length > 0 ? value : null;
}
function numberOrNull(value) {
return typeof value === "number" && Number.isFinite(value) ? value : null;
}
function timestampMs(value) {
const text = stringOrNull(value);
if (text === null) return null;
const parsed = Date.parse(text);
return Number.isFinite(parsed) ? parsed : null;
}
function roundSeconds(value) {
return Math.round(value * 10) / 10;
}
function totalSecondsFromRange(startedAt, finishedAt) {
const startedMs = timestampMs(startedAt);
if (startedMs === null) return null;
const finishedMs = timestampMs(finishedAt) ?? Date.now();
return finishedMs >= startedMs ? roundSeconds((finishedMs - startedMs) / 1000) : null;
}
function terminalPhase(phase) {
return ["Succeeded", "Failed", "Blocked", "Skipped", "Noop"].includes(phase);
}
function preserveExistingTiming(state, existing) {
if (numberOrNull(state?.timings?.totalSeconds) !== null) return state;
const existingTimings = existing?.timings;
const sourceCommit = stringOrNull(existingTimings?.sourceCommit);
if (sourceCommit === null || sourceCommit !== stringOrNull(state?.source?.observedSha)) return state;
const startedAt = stringOrNull(existingTimings?.startedAt);
const existingFinishedAt = stringOrNull(existingTimings?.finishedAt);
const finishedAt = existingFinishedAt ?? (terminalPhase(state.phase) ? new Date().toISOString() : null);
const seconds = totalSecondsFromRange(startedAt, finishedAt) ?? numberOrNull(existingTimings?.totalSeconds);
if (seconds === null) return state;
const budgetSeconds = numberOrNull(state?.timings?.budgetSeconds);
return {
...state,
timings: {
...state.timings,
totalSeconds: seconds,
totalStatus: terminalPhase(state.phase) ? "completed" : String(state.phase || "recorded").toLowerCase(),
totalSource: stringOrNull(existingTimings?.totalSource) ?? "stored-state",
sourceCommit,
startedAt,
finishedAt,
overBudget: budgetSeconds === null ? null : seconds > budgetSeconds,
},
};
}
ensureConfigMap();
const current = readConfigMap();
const currentText = current?.data?.[followerId];
const existing = typeof currentText === "string" && currentText.length > 0 ? JSON.parse(currentText) : null;
const incomingState = JSON.parse(stateJson);
const state = preserveExistingTiming(incomingState, existing);
const patch = {
data: {
[followerId]: JSON.stringify(state),
_updatedAt: new Date().toISOString(),
_specRef: specRef,
},
};
kubectl(["patch", "configmap", configMap, "--type", "merge", "-p", JSON.stringify(patch)]);
process.stdout.write(JSON.stringify({ ok: true, followerId, preservedTiming: state !== incomingState, statusAuthority: "target-node-summary", parsedDownstreamCliOutput: false }));
@@ -1,16 +0,0 @@
import { execFileSync } from "node:child_process";
const namespace = process.env.NAMESPACE || "";
const configMap = process.env.CONFIGMAP || "";
const followerId = process.env.FOLLOWER_ID || "";
try {
const raw = execFileSync("kubectl", ["-n", namespace, "get", "configmap", configMap, "-o", "json"], {
encoding: "utf8",
stdio: ["ignore", "pipe", "pipe"],
});
const data = JSON.parse(raw).data || {};
process.stdout.write(data[followerId] || "{}");
} catch {
process.stdout.write("{}");
}
+8 -100
View File
@@ -2399,114 +2399,22 @@ function roundSeconds(value: number): number {
}
function writeFollowerState(registry: BranchFollowerRegistry, state: FollowerState, options: ParsedOptions): CommandResult {
const stateForWrite = preserveWriteTimeTotalTiming(registry, state, options);
const json = JSON.stringify(compactFollowerStateForConfigMap(stateForWrite));
const dataPatch = JSON.stringify({ data: { [state.id]: json, _updatedAt: new Date().toISOString(), _specRef: SPEC_REF } });
if (options.inCluster) {
const patchBase64 = Buffer.from(dataPatch, "utf8").toString("base64");
const createBase64 = Buffer.from(JSON.stringify({
metadata: {
name: registry.controller.stateConfigMapName,
namespace: registry.controller.namespace,
},
data: {
_createdAt: new Date().toISOString(),
_specRef: SPEC_REF,
},
}), "utf8").toString("base64");
const script = [
"set -eu",
`PATCH_B64=${shQuote(patchBase64)}`,
`CREATE_B64=${shQuote(createBase64)}`,
`NAMESPACE=${shQuote(registry.controller.namespace)}`,
`CONFIGMAP=${shQuote(registry.controller.stateConfigMapName)}`,
"export PATCH_B64 CREATE_B64 NAMESPACE CONFIGMAP",
"node <<'NODE_KUBE_PATCH'",
"import { readFileSync } from 'node:fs';",
"import https from 'node:https';",
"const host = process.env.KUBERNETES_SERVICE_HOST;",
"const port = Number(process.env.KUBERNETES_SERVICE_PORT || '443');",
"const namespace = process.env.NAMESPACE;",
"const name = process.env.CONFIGMAP;",
"const token = readFileSync('/var/run/secrets/kubernetes.io/serviceaccount/token', 'utf8').trim();",
"const ca = readFileSync('/var/run/secrets/kubernetes.io/serviceaccount/ca.crt');",
"const patch = Buffer.from(process.env.PATCH_B64 || '', 'base64').toString('utf8');",
"const create = Buffer.from(process.env.CREATE_B64 || '', 'base64').toString('utf8');",
"function request(method, path, body, contentType) {",
" return new Promise((resolve, reject) => {",
" const req = https.request({ host, port, path, method, ca, headers: { authorization: `Bearer ${token}`, ...(body ? { 'content-type': contentType || 'application/json', 'content-length': Buffer.byteLength(body) } : {}) } }, (res) => {",
" let text = '';",
" res.setEncoding('utf8');",
" res.on('data', (chunk) => { text += chunk; });",
" res.on('end', () => resolve({ status: res.statusCode || 0, text }));",
" });",
" req.on('error', reject);",
" if (body) req.write(body);",
" req.end();",
" });",
"}",
"const base = `/api/v1/namespaces/${namespace}/configmaps`;",
"let result = await request('PATCH', `${base}/${name}`, patch, 'application/merge-patch+json');",
"if (result.status === 404) {",
" const created = await request('POST', base, create, 'application/json');",
" if (created.status < 200 || created.status >= 300) { process.stderr.write(created.text); process.exit(1); }",
" result = await request('PATCH', `${base}/${name}`, patch, 'application/merge-patch+json');",
"}",
"if (result.status < 200 || result.status >= 300) { process.stderr.write(result.text); process.exit(1); }",
"NODE_KUBE_PATCH",
].join("\n");
return runKubeScript(registry, options, script, "", 10_000);
}
const script = [
"set -eu",
`kubectl -n ${shQuote(registry.controller.namespace)} create configmap ${shQuote(registry.controller.stateConfigMapName)} --from-literal=_createdAt="$(date -Iseconds)" --dry-run=client -o yaml | kubectl apply -f - >/dev/null`,
`kubectl -n ${shQuote(registry.controller.namespace)} patch configmap ${shQuote(registry.controller.stateConfigMapName)} --type merge -p ${shQuote(dataPatch)} >/dev/null`,
].join("\n");
return runKubeScript(registry, options, script, "", 10_000);
}
function preserveWriteTimeTotalTiming(registry: BranchFollowerRegistry, state: FollowerState, options: ParsedOptions): FollowerState {
if (state.timings.totalSeconds !== null) return state;
const existing = readExistingFollowerStateForWrite(registry, state.id, options);
const existingTimings = asOptionalRecord(existing?.timings);
if (existingTimings === null) return state;
const sourceCommit = stringOrNull(existingTimings.sourceCommit);
if (sourceCommit === null || sourceCommit !== state.source.observedSha) return state;
const startedAt = stringOrNull(existingTimings.startedAt);
const existingFinishedAt = stringOrNull(existingTimings.finishedAt);
const finishedAt = existingFinishedAt ?? (terminalPhase(state.phase) ? new Date().toISOString() : null);
const seconds = totalSecondsFromRange(startedAt, finishedAt) ?? numberOrNull(existingTimings.totalSeconds);
if (seconds === null) return state;
return {
...state,
timings: {
...state.timings,
totalSeconds: seconds,
totalStatus: terminalPhase(state.phase) ? "completed" : state.phase.toLowerCase(),
totalSource: stringOrNull(existingTimings.totalSource) ?? "stored-state",
sourceCommit,
startedAt,
finishedAt,
overBudget: seconds > state.timings.budgetSeconds,
},
};
}
function readExistingFollowerStateForWrite(registry: BranchFollowerRegistry, followerId: string, options: ParsedOptions): Record<string, unknown> | null {
const stateJson = JSON.stringify(compactFollowerStateForConfigMap(state));
const script = [
"set -eu",
`NAMESPACE=${shQuote(registry.controller.namespace)}`,
`CONFIGMAP=${shQuote(registry.controller.stateConfigMapName)}`,
`FOLLOWER_ID=${shQuote(followerId)}`,
"export NAMESPACE CONFIGMAP FOLLOWER_ID",
`FOLLOWER_ID=${shQuote(state.id)}`,
`SPEC_REF=${shQuote(SPEC_REF)}`,
`STATE_B64=${shQuote(Buffer.from(stateJson, "utf8").toString("base64"))}`,
"export NAMESPACE CONFIGMAP FOLLOWER_ID SPEC_REF STATE_B64",
"tmpdir=$(mktemp -d)",
"cleanup() { rm -rf \"$tmpdir\"; }",
"trap cleanup EXIT INT TERM",
nativeCicdScriptLoadShell(["read-follower-state.mjs"]),
"node \"$tmpdir/read-follower-state.mjs\"",
nativeCicdScriptLoadShell(["patch-follower-state.mjs"]),
"node \"$tmpdir/patch-follower-state.mjs\"",
].join("\n");
const result = runKubeScript(registry, options, script, "", 10_000);
return result.exitCode === 0 ? parseJsonObject(result.stdout) : null;
return runKubeScript(registry, options, script, "", 10_000);
}
function runControllerReconcileJob(registry: BranchFollowerRegistry, options: ParsedOptions, mode: { dryRun: boolean; wait: boolean; recordState: boolean }): Record<string, unknown> {