fix: add branch follower state cleanup

This commit is contained in:
Codex
2026-07-03 08:26:05 +00:00
parent 9fb35baf9c
commit 1ff91c7a69
+196 -20
View File
@@ -31,7 +31,7 @@ const SPEC_REF = "PJ2026-01060703";
const SPEC_VERSION = "draft-2026-07-03-p0-branch-follower";
type OutputMode = "human" | "json" | "yaml";
type BranchFollowerAction = "help" | "plan" | "apply" | "status" | "run-once" | "events" | "logs";
type BranchFollowerAction = "help" | "plan" | "apply" | "status" | "run-once" | "cleanup-state" | "events" | "logs";
type BranchFollowerPhase =
| "Observed"
| "Noop"
@@ -209,6 +209,7 @@ interface AdapterSummary {
lastTriggeredSha: string | null;
lastSucceededSha: string | null;
pipelineRun: string | null;
pipelineRunPresent: boolean | null;
inFlightJob: string | null;
aligned: boolean | null;
phase: BranchFollowerPhase;
@@ -286,9 +287,16 @@ interface K8sStateRead {
errors: string[];
}
/** Result of reading per-follower state entries from the controller state ConfigMap. */
interface K8sFollowerStateRead {
// True when no read or parse errors were collected (also true when the ConfigMap is absent).
ok: boolean;
// Parsed JSON state objects keyed by follower id; followers with no or empty value are omitted.
stateByFollower: Record<string, Record<string, unknown>>;
// Whether the state ConfigMap was found by at least one successful key read.
present: boolean;
// Collected error messages joined with "; "; empty string when there are none.
error: string;
}
export function cicdHelp(): unknown {
return {
command: "cicd branch-follower plan|apply|status|run-once|events|logs",
command: "cicd branch-follower plan|apply|status|run-once|cleanup-state|events|logs",
output: "text by default; use --json, --raw, or -o json|yaml for machine output",
usage: [
"bun scripts/cli.ts cicd branch-follower plan",
@@ -297,6 +305,7 @@ export function cicdHelp(): unknown {
"bun scripts/cli.ts cicd branch-follower status --live",
"bun scripts/cli.ts cicd branch-follower run-once --all --dry-run",
"bun scripts/cli.ts cicd branch-follower run-once --follower hwlab-jd01-v03 --confirm --wait",
"bun scripts/cli.ts cicd branch-follower cleanup-state --follower web-probe-sentinel-master --confirm",
"bun scripts/cli.ts cicd branch-follower events --follower agentrun-jd01-v02",
"bun scripts/cli.ts cicd branch-follower logs --follower web-probe-sentinel-master",
],
@@ -310,7 +319,7 @@ export async function runCicdCommand(_config: UniDeskConfig | null, args: string
const top = args[0];
if (top === undefined || isHelpToken(top)) return renderMachine("cicd", cicdHelp(), "json");
if (top !== "branch-follower") {
throw new Error("cicd usage: cicd branch-follower plan|apply|status|run-once|events|logs");
throw new Error("cicd usage: cicd branch-follower plan|apply|status|run-once|cleanup-state|events|logs");
}
const options = parseOptions(args.slice(1));
const command = commandLabel(options);
@@ -325,6 +334,8 @@ export async function runCicdCommand(_config: UniDeskConfig | null, args: string
return renderResult(command, await buildStatus(registry, options), options);
case "run-once":
return renderResult(command, await runOnce(registry, options), options);
case "cleanup-state":
return renderResult(command, cleanupState(registry, options), options);
case "events":
case "logs":
return renderResult(command, await runFollowerDrillDown(registry, options), options);
@@ -338,7 +349,7 @@ function parseOptions(args: string[]): ParsedOptions {
if (actionToken === undefined || isHelpToken(actionToken)) {
return defaultOptions("help", args.slice(actionToken === undefined ? 0 : 1));
}
if (!["plan", "apply", "status", "run-once", "events", "logs"].includes(actionToken)) {
if (!["plan", "apply", "status", "run-once", "cleanup-state", "events", "logs"].includes(actionToken)) {
throw new Error(`cicd branch-follower unknown action: ${actionToken}`);
}
const action = actionToken as BranchFollowerAction;
@@ -395,9 +406,13 @@ function parseOptions(args: string[]): ParsedOptions {
if (options.confirm && options.dryRun) throw new Error("cicd branch-follower accepts only one of --confirm or --dry-run");
if (options.action === "apply" && !options.confirm) options.dryRun = true;
if (options.action === "run-once" && !options.confirm) options.dryRun = true;
if (options.action === "cleanup-state" && !options.confirm) options.dryRun = true;
if (options.action === "run-once" && options.confirm && !options.all && options.followerId === null) {
throw new Error("run-once --confirm requires --all or --follower <id>");
}
if (options.action === "cleanup-state" && options.confirm && !options.all && options.followerId === null) {
throw new Error("cleanup-state --confirm requires --all or --follower <id>");
}
return options;
}
@@ -849,6 +864,43 @@ async function runOnce(registry: BranchFollowerRegistry, options: ParsedOptions)
};
}
/**
 * Implements `cicd branch-follower cleanup-state`: reports which selected followers
 * have an entry in the state ConfigMap and, when `options.confirm` is set, removes
 * those entries with a single ConfigMap patch.
 *
 * Without `--confirm` this is a dry run: state is read but nothing is patched and
 * `ok` mirrors the outcome of the read. With `--confirm`, `ok` is decided by the
 * patch command's exit code alone; its stdout is not parsed here.
 *
 * @param registry Branch-follower registry providing followers and controller coordinates.
 * @param options Parsed CLI options (selection flags, `confirm`, `followerId`).
 * @returns A payload for `renderResult` / `renderCleanupStateHuman`.
 */
function cleanupState(registry: BranchFollowerRegistry, options: ParsedOptions): Record<string, unknown> {
  // Disabled followers are included so that stale state of retired followers can be removed too.
  const selected = selectFollowers(registry, options, { includeDisabled: true });
  const ids = selected.map((follower) => follower.id);
  const before = kubeConfigMapFollowerState(registry, options);
  const presentKeys = ids.filter((id) => before.stateByFollower[id] !== undefined);
  const command = options.confirm ? removeFollowerStateKeys(registry, options, ids) : null;

  // Collect read errors and, in addition, a failed patch: previously a failing patch
  // produced `ok: false` together with an empty `errors` list.
  const errors: string[] = before.ok ? [] : [before.error];
  let patchFailed = false;
  if (command !== null && command.exitCode !== 0) {
    patchFailed = true;
    errors.push(`state configmap patch failed: exit code ${command.exitCode}`);
  }

  // Do not claim "deleted-if-present" when the patch command did not succeed.
  let cleanupLabel = "would-delete-if-present";
  if (options.confirm) cleanupLabel = patchFailed ? "delete-failed" : "deleted-if-present";

  return {
    ok: command === null ? before.ok : command.exitCode === 0,
    action: "cleanup-state",
    dryRun: !options.confirm,
    confirm: options.confirm,
    execution: "k8s-state-configmap-key-delete",
    registry: registrySummary(registry),
    controller: {
      namespace: registry.controller.namespace,
      route: registry.controller.kubeRoute,
      stateConfigMapName: registry.controller.stateConfigMapName,
    },
    followers: ids.map((id) => ({
      id,
      statePresent: before.stateByFollower[id] !== undefined,
      cleanup: cleanupLabel,
    })),
    stateConfigMapPresent: before.present,
    presentKeys,
    parsedDownstreamCliOutput: false,
    command: command === null ? null : commandCompact(command, options),
    errors,
    next: {
      status: "bun scripts/cli.ts cicd branch-follower status",
      runOnce: options.followerId === null
        ? "bun scripts/cli.ts cicd branch-follower run-once --all --confirm --wait"
        : `bun scripts/cli.ts cicd branch-follower run-once --follower ${options.followerId} --confirm --wait`,
    },
  };
}
async function runFollowerDrillDown(registry: BranchFollowerRegistry, options: ParsedOptions): Promise<Record<string, unknown>> {
if (options.followerId === null) {
return {
@@ -939,7 +991,10 @@ async function decideAndMaybeTrigger(
const previousObserved = stringOrNull(recordAt(previous, ["source"])?.observedSha);
const previousTarget = stringOrNull(recordAt(previous, ["target"])?.targetSha);
const superseded = previousInFlight !== null && previousObserved !== null && observedSha !== null && previousObserved !== observedSha;
const nativePipelineRunMatchesObserved = observedSha !== null && live.pipelineRun !== null && live.pipelineRun === expectedPipelineRunName(follower, observedSha);
const nativePipelineRunMatchesObserved = observedSha !== null
&& live.pipelineRunPresent === true
&& live.pipelineRun !== null
&& live.pipelineRun === expectedPipelineRunName(follower, observedSha);
let phase: BranchFollowerPhase;
let decision: string;
let triggerCommand: Record<string, unknown> | undefined;
@@ -1271,6 +1326,7 @@ async function readAdapterStatus(registry: BranchFollowerRegistry, follower: Fol
const observedSha = stringOrNull(bundle.source?.commit);
const runtimeTargetSha = runtimeTargetShaFromWorkloads(follower.nativeStatus.runtime, bundle.workloads);
const pipelineRunName = stringOrNull(asOptionalRecord(bundle.pipelineRun?.metadata)?.name) ?? expectedPipelineRunName(follower, observedSha);
const pipelineRunPresent = follower.nativeStatus.tekton === null ? null : bundle.pipelineRun !== null;
const pipelineSucceeded = pipelineRunSucceeded(bundle.pipelineRun);
const argoReady = follower.nativeStatus.argo === null ? null : argoApplicationReady(bundle.argoApplication);
const runtimeReady = follower.nativeStatus.runtime === null ? null : runtimeWorkloadsReady(follower.nativeStatus.runtime, bundle.workloads);
@@ -1300,7 +1356,8 @@ async function readAdapterStatus(registry: BranchFollowerRegistry, follower: Fol
lastTriggeredSha: null,
lastSucceededSha: aligned && observedSha !== null ? observedSha : null,
pipelineRun: pipelineRunName,
inFlightJob: pipelineSucceeded === null && pipelineRunName !== null ? pipelineRunName : null,
pipelineRunPresent,
inFlightJob: pipelineRunPresent === true && pipelineSucceeded === null && pipelineRunName !== null ? pipelineRunName : null,
aligned,
phase,
message: nativeStatusMessage(ok, phase, observedSha, targetSha, {
@@ -1748,29 +1805,19 @@ function mergeFollowerStatus(
function readK8sState(registry: BranchFollowerRegistry, options: ParsedOptions): K8sStateRead {
const errors: string[] = [];
const stateResult = kubeJson(registry, options, `kubectl -n ${shQuote(registry.controller.namespace)} get configmap ${shQuote(registry.controller.stateConfigMapName)} -o json`, 10_000);
const stateResult = kubeConfigMapFollowerState(registry, options);
const deploymentResult = kubeJson(registry, options, `kubectl -n ${shQuote(registry.controller.namespace)} get deploy ${shQuote(registry.controller.deploymentName)} -o json`, 10_000);
const leaseResult = kubeJson(registry, options, `kubectl -n ${shQuote(registry.controller.namespace)} get lease ${shQuote(registry.controller.leaseName)} -o json`, 10_000);
const podSelector = labelSelector(registry.controller.labels);
const podsResult = kubePodList(registry, options, podSelector);
if (!stateResult.ok && !isNotFoundText(stateResult.error)) errors.push(`state configmap: ${stateResult.error}`);
if (!stateResult.ok) errors.push(`state configmap: ${stateResult.error}`);
if (!deploymentResult.ok && !isNotFoundText(deploymentResult.error)) errors.push(`deployment: ${deploymentResult.error}`);
if (!leaseResult.ok && !isNotFoundText(leaseResult.error)) errors.push(`lease: ${leaseResult.error}`);
if (!podsResult.ok && !isNotFoundText(podsResult.error)) errors.push(`pods: ${podsResult.error}`);
const stateByFollower: Record<string, Record<string, unknown>> = {};
const data = asOptionalRecord(stateResult.value?.data);
if (data !== null) {
for (const [key, value] of Object.entries(data)) {
if (key.startsWith("_")) continue;
if (typeof value !== "string") continue;
const parsed = parseJsonObject(value);
if (parsed !== null) stateByFollower[key] = parsed;
}
}
return {
ok: errors.length === 0,
stateByFollower,
stateConfigMapPresent: stateResult.value !== null,
stateByFollower: stateResult.stateByFollower,
stateConfigMapPresent: stateResult.present,
deployment: deploymentResult.value,
lease: leaseResult.value,
pods: podsResult.value,
@@ -1778,6 +1825,106 @@ function readK8sState(registry: BranchFollowerRegistry, options: ParsedOptions):
};
}
/**
 * Reads the state entry of every registry follower from the state ConfigMap,
 * one key at a time, and decodes each non-empty value as a JSON object.
 *
 * As soon as a key read reports that the ConfigMap itself is absent, the result
 * is an empty, successful read with `present: false`. Failed reads and values
 * that are not JSON objects are collected as errors and do not stop the loop.
 */
function kubeConfigMapFollowerState(registry: BranchFollowerRegistry, options: ParsedOptions): K8sFollowerStateRead {
  const problems: string[] = [];
  const decoded: Record<string, Record<string, unknown>> = {};
  let configMapSeen = false;

  for (const { id } of registry.followers) {
    const read = kubeConfigMapDataValue(registry, options, id);

    // A successful read that found no ConfigMap short-circuits the whole scan.
    if (read.ok && !read.present) {
      return { ok: true, stateByFollower: {}, present: false, error: "" };
    }

    if (!read.ok) {
      problems.push(read.error);
    } else {
      configMapSeen = true;
      const raw = read.value ?? "";
      if (raw.length > 0) {
        const state = parseJsonObject(raw);
        if (state === null) {
          problems.push(`${id}: invalid state json`);
        } else {
          decoded[id] = state;
        }
      }
    }
  }

  return {
    ok: problems.length === 0,
    stateByFollower: decoded,
    present: configMapSeen,
    error: problems.join("; "),
  };
}
/**
 * Reads a single `.data` key of the controller state ConfigMap via a kubectl
 * go-template, run through `runKubeScript`.
 *
 * The shell script always exits 0 and prints one JSON object whose value and
 * error text are base64 encoded, so arbitrary content survives transport:
 * - ConfigMap read succeeded: `ok: true, present: true`, `value` is the rendered
 *   key content (empty string when the key is missing or empty).
 * - kubectl failed and its stderr contains "not found" (case-insensitive):
 *   `ok: true, present: false`, `value: null`; `error` still carries the stderr tail.
 * - Any other kubectl failure: `ok: false`, `error` is the last 800 bytes of stderr.
 * If the script itself fails or prints something that is not a JSON object, the
 * result is `ok: false` with the redacted tail of stderr (or stdout).
 *
 * @param registry Registry providing the controller namespace and state ConfigMap name.
 * @param options Parsed CLI options forwarded to `runKubeScript`.
 * @param key ConfigMap data key to read (a follower id at the visible call site).
 */
function kubeConfigMapDataValue(registry: BranchFollowerRegistry, options: ParsedOptions, key: string): { ok: boolean; present: boolean; value: string | null; error: string } {
  // `index` aborts the template with "index of untyped nil" when the ConfigMap has no
  // `.data` map at all (e.g. after every key has been removed). Guard the lookup with
  // `if .data` so that an empty ConfigMap reads as "present, key missing" instead of an error.
  const template = `{{ if .data }}{{ with index .data ${JSON.stringify(key)} }}{{ . }}{{ end }}{{ end }}`;
  const script = [
    "set -eu",
    "tmpdir=$(mktemp -d)",
    "cleanup() { rm -rf \"$tmpdir\"; }",
    "trap cleanup EXIT INT TERM",
    `NAMESPACE=${shQuote(registry.controller.namespace)}`,
    `CONFIGMAP=${shQuote(registry.controller.stateConfigMapName)}`,
    "export NAMESPACE CONFIGMAP",
    `if ! value=$(kubectl -n "$NAMESPACE" get configmap "$CONFIGMAP" -o go-template=${shQuote(template)} 2>"$tmpdir/error"); then`,
    "  error_b64=$(tail -c 800 \"$tmpdir/error\" | base64 | tr -d '\\n')",
    "  if grep -qi 'not found' \"$tmpdir/error\"; then",
    "    printf '{\"ok\":true,\"present\":false,\"valueB64\":null,\"errorB64\":\"%s\"}' \"$error_b64\"",
    "    exit 0",
    "  fi",
    "  printf '{\"ok\":false,\"present\":false,\"valueB64\":null,\"errorB64\":\"%s\"}' \"$error_b64\"",
    "  exit 0",
    "fi",
    "value_b64=$(printf '%s' \"$value\" | base64 | tr -d '\\n')",
    "printf '{\"ok\":true,\"present\":true,\"valueB64\":\"%s\",\"errorB64\":\"\"}' \"$value_b64\"",
  ].join("\n");
  const result = runKubeScript(registry, options, script, "", 10_000);
  const parsed = result.exitCode === 0 ? parseJsonObject(result.stdout) : null;
  if (parsed === null) {
    // The script did not produce its JSON envelope; report whatever output is available.
    return {
      ok: false,
      present: false,
      value: null,
      error: redactText(tailText(result.stderr || result.stdout, 800)),
    };
  }
  const errorB64 = typeof parsed.errorB64 === "string" ? parsed.errorB64 : "";
  const error = errorB64.length === 0 ? "" : Buffer.from(errorB64, "base64").toString("utf8");
  const ok = parsed.ok === true;
  const present = parsed.present === true;
  const valueB64 = typeof parsed.valueB64 === "string" ? parsed.valueB64 : null;
  return {
    ok,
    present,
    value: valueB64 === null ? null : Buffer.from(valueB64, "base64").toString("utf8"),
    error: redactText(error),
  };
}
/**
 * Removes the given follower keys from the state ConfigMap with one
 * `kubectl patch --type merge` whose patch sets every key to `null`.
 *
 * The script first checks that the ConfigMap exists; when kubectl reports
 * "not found" it prints a JSON note and exits 0 without patching. Any other
 * lookup failure is echoed to stderr and exits 1.
 *
 * @returns The raw result of `runKubeScript` for the generated script.
 */
function removeFollowerStateKeys(registry: BranchFollowerRegistry, options: ParsedOptions, ids: string[]): CommandResult {
  const nulledEntries = ids.map((id): [string, null] => [id, null]);
  const mergePatch = JSON.stringify({ data: Object.fromEntries(nulledEntries) });
  const { namespace, stateConfigMapName } = registry.controller;

  // Shell prologue: strict mode, scratch directory, and exported parameters.
  const preamble = [
    "set -eu",
    "tmpdir=$(mktemp -d)",
    "cleanup() { rm -rf \"$tmpdir\"; }",
    "trap cleanup EXIT INT TERM",
    `NAMESPACE=${shQuote(namespace)}`,
    `CONFIGMAP=${shQuote(stateConfigMapName)}`,
    `PATCH=${shQuote(mergePatch)}`,
    "export NAMESPACE CONFIGMAP PATCH",
  ];

  // A missing ConfigMap is reported as a successful no-op; other errors fail the script.
  const existenceProbe = [
    "if ! kubectl -n \"$NAMESPACE\" get configmap \"$CONFIGMAP\" >/dev/null 2>\"$tmpdir/error\"; then",
    "  if grep -qi 'not found' \"$tmpdir/error\"; then",
    "    printf '{\"ok\":true,\"present\":false,\"patched\":false,\"reason\":\"state-configmap-not-found\",\"parsedDownstreamCliOutput\":false}'",
    "    exit 0",
    "  fi",
    "  cat \"$tmpdir/error\" >&2",
    "  exit 1",
    "fi",
  ];

  const applyPatch = [
    "kubectl -n \"$NAMESPACE\" patch configmap \"$CONFIGMAP\" --type merge -p \"$PATCH\" >/dev/null",
    "printf '{\"ok\":true,\"present\":true,\"patched\":true,\"parsedDownstreamCliOutput\":false}'",
  ];

  const script = [...preamble, ...existenceProbe, ...applyPatch].join("\n");
  return runKubeScript(registry, options, script, "", 10_000);
}
function kubeJson(registry: BranchFollowerRegistry, options: ParsedOptions, command: string, timeoutMs: number): { ok: boolean; value: Record<string, unknown> | null; error: string } {
const result = runKubeScript(registry, options, `set -eu\n${command}`, "", timeoutMs);
const value = result.exitCode === 0 ? parseJsonObject(result.stdout) : null;
@@ -2440,6 +2587,7 @@ function renderHuman(command: string, payload: Record<string, unknown>, options:
if (command.endsWith(" apply")) return renderApplyHuman(payload);
if (command.endsWith(" status")) return renderStatusHuman(payload, options);
if (command.endsWith(" run-once")) return renderRunOnceHuman(payload);
if (command.endsWith(" cleanup-state")) return renderCleanupStateHuman(payload);
if (command.endsWith(" events") || command.endsWith(" logs")) return renderDrillDownHuman(payload);
return `${JSON.stringify(payload, null, 2)}\n`;
}
@@ -2568,6 +2716,34 @@ function renderRunOnceHuman(payload: Record<string, unknown>): string {
].join("\n");
}
/**
 * Renders the `cleanup-state` payload as human-readable text: a status header,
 * a controller table, a per-follower table, an optional PATCH section (only when
 * a patch command was run), and the suggested next commands.
 *
 * Sections are separated by blank lines and the output ends with a newline.
 * The optional PATCH section is spliced in conditionally rather than filtering
 * empty strings afterwards, because such a filter also removes every intentional
 * blank separator line and the trailing newline.
 *
 * @param payload Payload produced by `cleanupState`.
 */
function renderCleanupStateHuman(payload: Record<string, unknown>): string {
  const controller = asOptionalRecord(payload.controller);
  const command = asOptionalRecord(payload.command);
  const followers = arrayRecords(payload.followers);
  const next = asOptionalRecord(payload.next);
  const rows = followers.map((item) => [
    item.id,
    item.statePresent === true ? "present" : "missing",
    item.cleanup ?? "-",
  ]);
  // Present only when a patch command was executed (i.e. not in dry-run mode).
  const patchSection = command === null
    ? []
    : ["", "PATCH", `exit=${command.exitCode ?? "-"} timedOut=${command.timedOut ?? "-"}`];
  return [
    `CI/CD BRANCH-FOLLOWER CLEANUP-STATE (${payload.ok === false ? "failed" : payload.dryRun === true ? "dry-run" : "ok"})`,
    "",
    table(
      ["NAMESPACE", "ROUTE", "STATE_CM", "STATE_CM_PRESENT"],
      [[controller?.namespace ?? "-", controller?.route ?? "-", controller?.stateConfigMapName ?? "-", payload.stateConfigMapPresent === true ? "present" : "missing"]],
    ),
    "",
    table(["FOLLOWER", "STATE", "CLEANUP"], rows),
    ...patchSection,
    "",
    "NEXT",
    `status: ${next?.status ?? "-"}`,
    `run-once: ${next?.runOnce ?? "-"}`,
    "",
  ].join("\n");
}
function renderDrillDownHuman(payload: Record<string, unknown>): string {
if (payload.follower === undefined) {
const followers = arrayRecords(payload.followers);