Files
pikasTech-unidesk/scripts/src/cicd-branch-follower.ts
T

2785 lines
134 KiB
TypeScript

// SPEC: PJ2026-01060703 CI/CD branch follower draft-2026-07-03-p0-branch-follower.
// Responsibility: YAML-first K8s branch-follower controller, status, and adapter orchestration.
import { createHash } from "node:crypto";
import { readFileSync } from "node:fs";
import { isAbsolute } from "node:path";
import { repoRoot, rootPath, type UniDeskConfig } from "./config";
import { runCommand, type CommandResult } from "./command";
import { startJob } from "./jobs";
import type { RenderedCliResult } from "./output";
import { renderMachine, renderResult } from "./cicd-render";
import { hwlabRuntimeLaneSpecForNode } from "./hwlab-node-lanes";
import { agentRunImageArtifact, renderAgentRunGitopsFiles } from "./agentrun-manifests";
import { agentRunPipelineRunName, resolveAgentRunLaneTarget } from "./agentrun-lanes";
import { yamlLaneGitMirrorJobManifest, yamlLaneGitopsPublishJobManifest, yamlLaneGitopsPublishPayloadFromProbe, yamlLanePipelineRunManifest } from "./agentrun/secrets";
import { yamlLaneK3sBuildImageJobManifest } from "./agentrun/yaml-lane";
import { nodeRuntimePipelineRunName } from "./hwlab-node/cleanup";
import { nodeRuntimeGitMirrorJobManifest } from "./hwlab-node/render";
import { nodeRuntimeGitMirrorTarget, nodeRuntimePipelineRunManifest } from "./hwlab-node/web-probe";
import { loadSentinelCicdState } from "./hwlab-node-web-sentinel-cicd";
import { sentinelPublishPipelineRunManifest } from "./hwlab-node-web-sentinel-cicd-jobs";
import { sentinelPipelineRunName } from "./hwlab-node-web-sentinel-cicd-shared";
import { transPath } from "./hwlab-node/runtime-common";
import { configRefGraph, resolveConfigRefString } from "./ops/config-refs";
import { renderControllerManifests, renderControllerReconcileJob, waitForJobShell } from "./cicd-controller-render";
import { buildDebugStep } from "./cicd-debug";
import { runNativeHwlabControlPlaneRefresh } from "./cicd-hwlab-refresh";
import { nativeCicdScriptLoadShell, readNativeObjectBundle } from "./cicd-native-bundle";
import { runNativeK8sJob, runNativeTektonPipelineRun } from "./cicd-native";
import { argoApplicationReady, nativeArgoSummary, nativeGitMirrorReady, nativeGitMirrorRequired, nativeGitMirrorSummary, nativePipelineRunSummary, nativeRuntimeSummary, pipelineRunSucceeded, runtimeTargetShaFromWorkloads, runtimeWorkloadsReady } from "./cicd-native-summary";
import { invalidRuntimeReuseConfig, missingRuntimeReuseConfig, parseRuntimeReuseConfig, RUNTIME_REUSE_CONFIG_PATH, runtimeReuseService, summarizeRuntimeReuseConfig, type RuntimeReuseConfig } from "./cicd-reuse-config";
import { prioritizedTaskRunItems } from "./cicd-taskruns";
import type { AdapterSummary, BranchFollowerAction, BranchFollowerDebugStep, BranchFollowerPhase, BranchFollowerRegistry, ControllerSpec, FollowerSpec, FollowerState, K8sFollowerStateRead, K8sStateRead, NativeCloseoutWaitResult, NativeK8sJobResult, NativeStatusSpec, NativeWorkloadSpec, OutputMode, ParsedOptions, StageTiming, TriggerResult } from "./cicd-types";
import {
arrayField,
asRecord,
booleanField,
integerField,
readYamlRecord,
recordField,
redactText,
shQuote,
stringArrayField,
stringField,
} from "./platform-infra-ops-library";
// Registry YAML used when --config is not given; readRegistry resolves relative paths through rootPath().
const DEFAULT_CONFIG_PATH = "config/cicd-branch-followers.yaml";
// Spec identifier and draft version; reported together as `${SPEC_REF} ${SPEC_VERSION}` in help and plan output.
const SPEC_REF = "PJ2026-01060703";
const SPEC_VERSION = "draft-2026-07-03-p0-branch-follower";
/**
 * Builds the static help payload for `cicd branch-follower`.
 *
 * @returns An object with the command synopsis, output hint, example invocations,
 *   the default registry path, the spec reference, and a description.
 */
export function cicdHelp(): unknown {
  // Example invocations, one per supported action (some actions appear in several variants).
  const usage = [
    "bun scripts/cli.ts cicd branch-follower plan",
    "bun scripts/cli.ts cicd branch-follower apply --confirm --wait",
    "bun scripts/cli.ts cicd branch-follower status",
    "bun scripts/cli.ts cicd branch-follower status --live",
    "bun scripts/cli.ts cicd branch-follower run-once --all --dry-run",
    "bun scripts/cli.ts cicd branch-follower run-once --follower hwlab-jd01-v03 --confirm --wait",
    "bun scripts/cli.ts cicd branch-follower debug-step --follower web-probe-sentinel-master --step controller-source",
    "bun scripts/cli.ts cicd branch-follower debug-step --follower web-probe-sentinel-master --step state-read",
    "bun scripts/cli.ts cicd branch-follower debug-step --follower web-probe-sentinel-master --step state-write --confirm",
    "bun scripts/cli.ts cicd branch-follower cleanup-state --follower web-probe-sentinel-master --confirm",
    "bun scripts/cli.ts cicd branch-follower events --follower agentrun-jd01-v02",
    "bun scripts/cli.ts cicd branch-follower logs --follower web-probe-sentinel-master",
  ];
  const help = {
    command: "cicd branch-follower plan|apply|status|run-once|debug-step|cleanup-state|events|logs",
    output: "text by default; use --json, --raw, or -o json|yaml for machine output",
    usage,
    config: DEFAULT_CONFIG_PATH,
    spec: `${SPEC_REF} ${SPEC_VERSION}`,
    description: "Deploy and inspect the YAML-first Kubernetes branch follower that follows HWLAB v0.3, AgentRun v0.2, and the selected web-probe sentinel master lane without using host worktrees as source authority.",
  };
  return help;
}
/**
 * Entry point for the `cicd` command group.
 *
 * Prints help when no sub-command or a help token is given, rejects anything other than
 * `branch-follower`, then parses the remaining arguments and dispatches on the action.
 * The registry is only read once a non-help action is known.
 *
 * @param _config Unused by this function.
 * @param args Arguments following `cicd` on the command line.
 * @returns The rendered CLI result for the selected action.
 * @throws Error when the first argument is not `branch-follower`, plus whatever the
 *   option parser, registry reader, or action handlers throw.
 */
export async function runCicdCommand(_config: UniDeskConfig | null, args: string[]): Promise<RenderedCliResult> {
  const [group, ...followerArgs] = args;
  if (group === undefined || isHelpToken(group)) {
    return renderMachine("cicd", cicdHelp(), "json");
  }
  if (group !== "branch-follower") {
    throw new Error("cicd usage: cicd branch-follower plan|apply|status|run-once|debug-step|cleanup-state|events|logs");
  }

  const options = parseOptions(followerArgs);
  const command = commandLabel(options);
  if (options.action === "help") {
    return renderMachine(command, cicdHelp(), "json");
  }

  const registry = readRegistry(options.configPath);
  switch (options.action) {
    case "plan": {
      const plan = buildPlan(registry, options);
      return renderResult(command, plan, options);
    }
    case "apply": {
      const applied = await applyController(registry, options);
      return renderResult(command, applied, options);
    }
    case "status": {
      const status = await buildStatus(registry, options);
      return renderResult(command, status, options);
    }
    case "run-once": {
      const outcome = await runOnce(registry, options);
      return renderResult(command, outcome, options);
    }
    case "debug-step": {
      // The debug module receives this file's helpers as an explicit dependency bundle.
      const helpers = { selectFollowers, readK8sState, readAdapterStatus, decideAndMaybeTrigger, writeFollowerState, runKubeScript };
      const step = await buildDebugStep(registry, options, helpers);
      return renderResult(command, step, options);
    }
    case "cleanup-state": {
      const cleanup = cleanupState(registry, options);
      return renderResult(command, cleanup, options);
    }
    case "events":
    case "logs": {
      const drillDown = await runFollowerDrillDown(registry, options);
      return renderResult(command, drillDown, options);
    }
    case "help":
      return renderMachine(command, cicdHelp(), "json");
  }
}
/**
 * Parses the arguments that follow `cicd branch-follower`.
 *
 * The first token selects the action; a missing token or a help token yields the help action.
 * Remaining tokens are flags. Value-taking flags consume the next token.
 * After parsing, apply/run-once/debug-step/cleanup-state default to dry-run unless --confirm
 * was given, and selection requirements are enforced.
 *
 * @param args Tokens after `branch-follower`.
 * @returns The populated option set.
 * @throws Error on an unknown action or flag, a missing or invalid flag value,
 *   --confirm combined with --dry-run, or a missing follower selection.
 */
function parseOptions(args: string[]): ParsedOptions {
  const actionToken = args[0];
  if (actionToken === undefined) return defaultOptions("help", args.slice(0));
  if (isHelpToken(actionToken)) return defaultOptions("help", args.slice(1));

  const knownActions = ["plan", "apply", "status", "run-once", "debug-step", "cleanup-state", "events", "logs"];
  if (!knownActions.includes(actionToken)) {
    throw new Error(`cicd branch-follower unknown action: ${actionToken}`);
  }

  const options = defaultOptions(actionToken as BranchFollowerAction, []);
  const rest = args.slice(1);
  let cursor = 0;
  // Advances past the flag and returns its validated value token.
  const takeValue = (flag: string): string => {
    cursor += 1;
    return valueOption(rest, cursor, flag);
  };

  while (cursor < rest.length) {
    const arg = rest[cursor] ?? "";
    switch (arg) {
      case "-h":
      case "--help":
      case "help":
        // Parsing continues so later unsupported flags are still reported.
        options.action = "help";
        break;
      case "--config":
        options.configPath = takeValue(arg);
        break;
      case "--follower":
      case "--target":
        options.followerId = simpleId(takeValue(arg), arg);
        break;
      case "--all":
        options.all = true;
        break;
      case "--confirm":
        options.confirm = true;
        break;
      case "--dry-run":
        options.dryRun = true;
        break;
      case "--wait":
        options.wait = true;
        break;
      case "--in-cluster":
        options.inCluster = true;
        break;
      case "--controller":
        // Inside a pod this means in-cluster execution; outside it only upgrades `status` to live.
        if (isInClusterRuntime()) {
          options.inCluster = true;
        } else if (options.action === "status") {
          options.live = true;
        }
        break;
      case "--live":
        options.live = true;
        break;
      case "--no-live":
        options.noLive = true;
        break;
      case "--full":
        options.full = true;
        break;
      case "--raw":
      case "--json":
        options.raw = true;
        options.output = "json";
        break;
      case "--record-state":
        options.recordState = true;
        break;
      case "--step":
        options.debugStep = debugStepOption(takeValue(arg));
        break;
      case "-o":
      case "--output": {
        const format = takeValue(arg);
        if (format === "json" || format === "yaml") {
          options.output = format;
          options.raw = true;
        } else if (format === "wide" || format === "text") {
          options.output = "human";
        } else {
          throw new Error(`${arg} must be json, yaml, wide, or text`);
        }
        break;
      }
      case "--limit":
        options.limit = positiveInt(takeValue(arg), arg);
        break;
      case "--tail-bytes":
      case "--tail":
        options.tailBytes = positiveInt(takeValue(arg), arg);
        break;
      case "--timeout-seconds":
        options.timeoutSeconds = positiveInt(takeValue(arg), arg);
        break;
      default:
        throw new Error(`unsupported cicd branch-follower option: ${arg}`);
    }
    cursor += 1;
  }

  if (options.confirm && options.dryRun) {
    throw new Error("cicd branch-follower accepts only one of --confirm or --dry-run");
  }

  // Mutating actions stay dry-run unless explicitly confirmed.
  const confirmGated: string[] = ["apply", "run-once", "debug-step", "cleanup-state"];
  if (!options.confirm && confirmGated.includes(options.action)) {
    options.dryRun = true;
  }

  const nothingSelected = !options.all && options.followerId === null;
  if (options.action === "run-once" && options.confirm && nothingSelected) {
    throw new Error("run-once --confirm requires --all or --follower <id>");
  }
  if (options.action === "cleanup-state" && options.confirm && nothingSelected) {
    throw new Error("cleanup-state --confirm requires --all or --follower <id>");
  }
  if (options.action === "debug-step" && options.followerId === null) {
    throw new Error("debug-step requires --follower <id>");
  }
  return options;
}
/**
 * Validates the value given to `--step`.
 *
 * @param value Raw token from the command line.
 * @returns The same token, narrowed to a known debug step.
 * @throws Error when the token is not one of the supported steps.
 */
function debugStepOption(value: string): BranchFollowerDebugStep {
  switch (value) {
    case "state-read":
    case "controller-source":
    case "status-read":
    case "decide":
    case "state-write":
      return value;
    default:
      throw new Error("--step must be state-read, controller-source, status-read, decide, or state-write");
  }
}
/**
 * Reports whether both KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT are set
 * to non-empty values in the process environment.
 */
function isInClusterRuntime(): boolean {
  const { KUBERNETES_SERVICE_HOST: host, KUBERNETES_SERVICE_PORT: port } = process.env;
  if (!host) return false;
  return Boolean(port);
}
/**
 * Produces the baseline option set for an action before any flags are applied.
 *
 * @param action Action to record on the options.
 * @param _args Accepted for call-site symmetry; not read.
 * @returns Options with every flag off, human output, and the default config path.
 */
function defaultOptions(action: BranchFollowerAction, _args: string[]): ParsedOptions {
  const defaults: ParsedOptions = {
    action,
    configPath: DEFAULT_CONFIG_PATH,
    // Follower selection.
    followerId: null,
    all: false,
    // Execution switches.
    confirm: false,
    dryRun: false,
    wait: false,
    inCluster: false,
    live: false,
    noLive: false,
    // Output shaping.
    full: false,
    raw: false,
    recordState: false,
    debugStep: null,
    output: "human",
    limit: 20,
    tailBytes: 12000,
    // null means "use the budget from the registry" (see applyController).
    timeoutSeconds: null,
  };
  return defaults;
}
/**
 * Fetches the value token for a flag.
 *
 * @param args Flag tokens being parsed.
 * @param index Position where the value is expected.
 * @param option Flag name, used in the error message.
 * @returns The token at `index`.
 * @throws Error when the token is missing, empty, or starts with `--`.
 */
function valueOption(args: string[], index: number, option: string): string {
  const candidate = args[index];
  if (typeof candidate === "string" && candidate !== "" && !candidate.startsWith("--")) {
    return candidate;
  }
  throw new Error(`${option} requires a value`);
}
/**
 * Ensures an identifier consists only of ASCII letters, digits, `.`, `_`, and `-`.
 *
 * @param value Candidate identifier.
 * @param option Name reported in the error message.
 * @returns `value` unchanged when it is valid.
 * @throws Error when `value` is empty or contains any other character.
 */
function simpleId(value: string, option: string): string {
  const looksSimple = /^[A-Za-z0-9._-]+$/u.test(value);
  if (looksSimple) return value;
  throw new Error(`${option} must be a simple id`);
}
/**
 * Parses a flag value as a positive integer.
 *
 * Only plain decimal digits are accepted. `Number()` alone would also accept
 * exponent, hexadecimal, signed, and whitespace-padded spellings ("1e3", "0x10", "+5", " 5"),
 * as well as magnitudes beyond the safe integer range that silently lose precision.
 *
 * @param value Raw token from the command line.
 * @param option Flag name, used in the error message.
 * @returns The parsed integer, guaranteed to be > 0 and a safe integer.
 * @throws Error when the token is not a positive decimal integer.
 */
function positiveInt(value: string, option: string): number {
  if (/^[0-9]+$/u.test(value)) {
    const parsed = Number(value);
    if (Number.isSafeInteger(parsed) && parsed > 0) return parsed;
  }
  throw new Error(`${option} must be a positive integer`);
}
/** Returns true when the token is one of the help spellings: `-h`, `--help`, or `help`. */
function isHelpToken(value: string): boolean {
  const helpSpellings = ["-h", "--help", "help"];
  return helpSpellings.includes(value);
}
/**
 * Loads and validates the branch-follower registry YAML.
 *
 * The file is read as raw text (kept and hashed with SHA-256) and also parsed through
 * readYamlRecord. Follower ids must be unique.
 *
 * @param configPath Absolute path, or a path resolved through rootPath().
 * @returns The parsed registry, carrying the path exactly as given by the caller.
 * @throws Error on duplicate follower ids, plus whatever file reading and field validation throw.
 */
function readRegistry(configPath: string): BranchFollowerRegistry {
  const resolvedPath = isAbsolute(configPath) ? configPath : rootPath(configPath);
  const rawText = readFileSync(resolvedPath, "utf8");
  const document = readYamlRecord<Record<string, unknown>>(resolvedPath, "CicdBranchFollowerRegistry");
  const metadataRoot = recordField(document, "metadata", configPath);
  const controller = parseController(recordField(document, "controller", configPath));
  const followers = arrayField(document, "followers", configPath).map(parseFollower);

  const seenIds = new Set<string>();
  for (const { id } of followers) {
    if (seenIds.has(id)) {
      throw new Error(`${configPath}.followers has duplicate id ${id}`);
    }
    seenIds.add(id);
  }

  const metadataLabel = `${configPath}.metadata`;
  const rawSha256 = createHash("sha256").update(rawText).digest("hex");
  return {
    path: configPath,
    rawText,
    rawSha256,
    metadata: {
      id: stringField(metadataRoot, "id", metadataLabel),
      owner: stringField(metadataRoot, "owner", metadataLabel),
      specRef: stringField(metadataRoot, "specRef", metadataLabel),
      version: stringField(metadataRoot, "version", metadataLabel),
    },
    controller,
    followers,
  };
}
/**
 * Parses the `controller` section of the registry.
 *
 * Every field is mandatory and validated by the field helpers. Fields are read in
 * declaration order, so the first invalid field is the one reported.
 *
 * @param root The raw `controller` record.
 * @returns The typed controller specification.
 * @throws Error when any source-authority escape hatch (host git, host workspace,
 *   direct GitHub in pipeline) is enabled, or when a field fails validation.
 */
function parseController(root: Record<string, unknown>): ControllerSpec {
  const sourceRoot = recordField(root, "source", "controller");
  const authorityRoot = recordField(sourceRoot, "sourceAuthority", "controller.source");
  const snapshotRoot = recordField(sourceRoot, "sourceSnapshot", "controller.source");
  const loopRoot = recordField(root, "loop", "controller");
  const budgetsRoot = recordField(root, "budgets", "controller");

  // Section-bound accessors; each keeps the section's label for error messages.
  const topText = (key: string) => stringField(root, key, "controller");
  const sourceText = (key: string) => stringField(sourceRoot, key, "controller.source");
  const authorityText = (key: string) => stringField(authorityRoot, key, "controller.source.sourceAuthority");
  const authorityFlag = (key: string) => booleanField(authorityRoot, key, "controller.source.sourceAuthority");
  const snapshotText = (key: string) => stringField(snapshotRoot, key, "controller.source.sourceSnapshot");
  const loopSeconds = (key: string) => integerField(loopRoot, key, "controller.loop");
  const budgetNumber = (key: string) => integerField(budgetsRoot, key, "controller.budgets");

  const spec: ControllerSpec = {
    namespace: topText("namespace"),
    kubeRoute: topText("kubeRoute"),
    fieldManager: topText("fieldManager"),
    serviceAccountName: topText("serviceAccountName"),
    deploymentName: topText("deploymentName"),
    configMapName: topText("configMapName"),
    stateConfigMapName: topText("stateConfigMapName"),
    leaseName: topText("leaseName"),
    image: topText("image"),
    labels: stringMap(recordField(root, "labels", "controller"), "controller.labels"),
    source: {
      repository: sourceText("repository"),
      branch: sourceText("branch"),
      gitMirrorReadUrl: sourceText("gitMirrorReadUrl"),
      gitMirrorCachePvcName: sourceText("gitMirrorCachePvcName"),
      githubSsh: parseControllerGithubSsh(recordField(sourceRoot, "githubSsh", "controller.source")),
      sourceAuthority: {
        mode: authorityText("mode"),
        resolver: authorityText("resolver"),
        allowHostGit: authorityFlag("allowHostGit"),
        allowHostWorkspace: authorityFlag("allowHostWorkspace"),
        allowGithubDirectInPipeline: authorityFlag("allowGithubDirectInPipeline"),
      },
      sourceSnapshot: {
        stageRefPrefix: snapshotText("stageRefPrefix"),
        missingObjectPolicy: snapshotText("missingObjectPolicy"),
        refreshPolicy: snapshotText("refreshPolicy"),
      },
    },
    loop: {
      intervalSeconds: loopSeconds("intervalSeconds"),
      reconcileTimeoutSeconds: loopSeconds("reconcileTimeoutSeconds"),
      leaseDurationSeconds: loopSeconds("leaseDurationSeconds"),
      terminationGracePeriodSeconds: loopSeconds("terminationGracePeriodSeconds"),
    },
    budgets: {
      applyWaitSeconds: budgetNumber("applyWaitSeconds"),
      statusSeconds: budgetNumber("statusSeconds"),
      runOnceSeconds: budgetNumber("runOnceSeconds"),
      reconcileJobTtlSeconds: budgetNumber("reconcileJobTtlSeconds"),
      reconcileJobBackoffLimit: budgetNumber("reconcileJobBackoffLimit"),
      reconcileJobDeadlineGraceSeconds: budgetNumber("reconcileJobDeadlineGraceSeconds"),
      reconcileTransportGraceSeconds: budgetNumber("reconcileTransportGraceSeconds"),
      nativeTransportGraceSeconds: budgetNumber("nativeTransportGraceSeconds"),
      nativePollIntervalSeconds: budgetNumber("nativePollIntervalSeconds"),
    },
  };

  // The registry may declare these flags, but none of them is allowed to be true.
  const { allowHostGit, allowHostWorkspace, allowGithubDirectInPipeline } = spec.source.sourceAuthority;
  if (allowHostGit || allowHostWorkspace || allowGithubDirectInPipeline) {
    throw new Error("controller.source.sourceAuthority must disable host git, host workspace, and direct GitHub pipeline fallback");
  }
  return spec;
}
/**
 * Parses `controller.source.githubSsh`: secret name, private-key secret key, and proxy host/port.
 *
 * @param root The raw `githubSsh` record.
 * @returns The typed GitHub SSH settings; all four fields are required.
 */
function parseControllerGithubSsh(root: Record<string, unknown>): ControllerSpec["source"]["githubSsh"] {
  const label = "controller.source.githubSsh";
  const secretName = stringField(root, "secretName", label);
  const privateKeySecretKey = stringField(root, "privateKeySecretKey", label);
  const proxyHost = stringField(root, "proxyHost", label);
  const proxyPort = integerField(root, "proxyPort", label);
  return { secretName, privateKeySecretKey, proxyHost, proxyPort };
}
/**
 * Parses one entry of the registry's `followers` array.
 *
 * All sections (source, target, budgets, commands, nativeStatus, closeout) are required.
 * `target.sentinel` is optional and becomes null unless it is a non-empty string.
 *
 * @param root The raw follower record.
 * @param index Position in the array, used to label validation errors as `followers[index]`.
 * @returns The typed follower specification.
 */
function parseFollower(root: Record<string, unknown>, index: number): FollowerSpec {
  const label = `followers[${index}]`;
  const sourceLabel = `${label}.source`;
  const targetLabel = `${label}.target`;
  const budgetsLabel = `${label}.budgets`;
  const commandsLabel = `${label}.commands`;

  const sourceRoot = recordField(root, "source", label);
  const targetRoot = recordField(root, "target", label);
  const budgetsRoot = recordField(root, "budgets", label);
  const commandsRoot = recordField(root, "commands", label);
  const nativeStatusRoot = recordField(root, "nativeStatus", label);
  const closeoutRoot = recordField(root, "closeout", label);
  const configRefs = stringMap(recordField(targetRoot, "configRefs", targetLabel), `${targetLabel}.configRefs`);

  // Section-bound accessors that carry the matching error label.
  const sourceText = (key: string) => stringField(sourceRoot, key, sourceLabel);
  const targetText = (key: string) => stringField(targetRoot, key, targetLabel);
  const budgetNumber = (key: string) => integerField(budgetsRoot, key, budgetsLabel);
  const command = (name: string) => parseCommand(recordField(commandsRoot, name, commandsLabel), `${commandsLabel}.${name}`);

  const sentinel = targetRoot.sentinel;
  return {
    id: simpleId(stringField(root, "id", label), `${label}.id`),
    enabled: booleanField(root, "enabled", label),
    adapter: stringField(root, "adapter", label),
    description: stringField(root, "description", label),
    source: {
      repository: sourceText("repository"),
      branch: sourceText("branch"),
      branchRef: sourceText("branchRef"),
      authorityRef: sourceText("authorityRef"),
      snapshotPrefix: sourceText("snapshotPrefix"),
      snapshotRef: sourceText("snapshotRef"),
    },
    target: {
      node: targetText("node"),
      lane: targetText("lane"),
      namespace: targetText("namespace"),
      sentinel: typeof sentinel === "string" && sentinel.length > 0 ? sentinel : null,
      configRefs,
    },
    budgets: {
      endToEndSeconds: budgetNumber("endToEndSeconds"),
      statusSeconds: budgetNumber("statusSeconds"),
      triggerSeconds: budgetNumber("triggerSeconds"),
      sourceSyncSeconds: budgetNumber("sourceSyncSeconds"),
      controlPlaneRefreshSeconds: budgetNumber("controlPlaneRefreshSeconds"),
      capabilityJobTtlSeconds: budgetNumber("capabilityJobTtlSeconds"),
      capabilityJobBackoffLimit: budgetNumber("capabilityJobBackoffLimit"),
    },
    commands: {
      plan: command("plan"),
      status: command("status"),
      trigger: command("trigger"),
      events: command("events"),
      logs: command("logs"),
    },
    nativeStatus: parseNativeStatus(nativeStatusRoot, `${label}.nativeStatus`),
    closeoutChecks: stringArrayField(closeoutRoot, "checks", `${label}.closeout`),
  };
}
/**
 * Parses a follower's `nativeStatus` section.
 *
 * `source` is required. `tekton`, `argo`, and `runtime` are optional and become null when
 * asOptionalRecord yields null for them.
 *
 * @param root The raw `nativeStatus` record.
 * @param label Path prefix for validation error messages.
 * @returns The typed native-status specification.
 */
function parseNativeStatus(root: Record<string, unknown>, label: string): NativeStatusSpec {
  const sourceRoot = recordField(root, "source", label);
  const tektonRoot = asOptionalRecord(root.tekton);
  const argoRoot = asOptionalRecord(root.argo);
  const runtimeRoot = asOptionalRecord(root.runtime);

  const sourceLabel = `${label}.source`;
  const tektonLabel = `${label}.tekton`;
  const argoLabel = `${label}.argo`;
  const runtimeLabel = `${label}.runtime`;

  return {
    source: {
      gitMirrorReadUrl: stringField(sourceRoot, "gitMirrorReadUrl", sourceLabel),
      gitMirrorNamespace: stringField(sourceRoot, "gitMirrorNamespace", sourceLabel),
      gitMirrorDeployment: stringField(sourceRoot, "gitMirrorDeployment", sourceLabel),
      repoPath: stringField(sourceRoot, "repoPath", sourceLabel),
    },
    tekton: tektonRoot !== null
      ? {
        namespace: stringField(tektonRoot, "namespace", tektonLabel),
        pipelineRunPrefix: stringField(tektonRoot, "pipelineRunPrefix", tektonLabel),
      }
      : null,
    argo: argoRoot !== null
      ? {
        namespace: stringField(argoRoot, "namespace", argoLabel),
        application: stringField(argoRoot, "application", argoLabel),
      }
      : null,
    runtime: runtimeRoot !== null
      ? {
        namespace: stringField(runtimeRoot, "namespace", runtimeLabel),
        workloads: arrayField(runtimeRoot, "workloads", runtimeLabel).map((entry, position) => parseNativeWorkload(entry, `${label}.runtime.workloads[${position}]`)),
      }
      : null,
  };
}
/**
 * Parses one runtime workload entry.
 *
 * @param value Raw entry from the `workloads` array.
 * @param label Path prefix for validation error messages.
 * @returns The workload kind, name, and the lists that say where its source commit is recorded.
 *   A missing list, or a missing `sourceCommit` section, yields empty arrays.
 * @throws Error when `kind` is neither `Deployment` nor `StatefulSet`.
 */
function parseNativeWorkload(value: unknown, label: string): NativeWorkloadSpec {
  const root = asRecord(value, label);
  const kind = stringField(root, "kind", label);
  if (kind !== "Deployment" && kind !== "StatefulSet") {
    throw new Error(`${label}.kind must be Deployment or StatefulSet`);
  }
  const sourceCommitRoot = asOptionalRecord(root.sourceCommit) ?? {};
  const sourceCommitLabel = `${label}.sourceCommit`;
  const names = (key: string) => optionalStringArrayField(sourceCommitRoot, key, sourceCommitLabel);
  return {
    kind,
    name: stringField(root, "name", label),
    sourceCommit: {
      labels: names("labels"),
      annotations: names("annotations"),
      podLabels: names("podLabels"),
      podAnnotations: names("podAnnotations"),
      env: names("env"),
    },
  };
}
/**
 * Parses a command entry consisting of an `argv` string array and a `timeoutSeconds` integer.
 *
 * @param root The raw command record.
 * @param label Path prefix for validation error messages.
 */
function parseCommand(root: Record<string, unknown>, label: string): CommandSpec {
  const argv = stringArrayField(root, "argv", label);
  const timeoutSeconds = integerField(root, "timeoutSeconds", label);
  return { argv, timeoutSeconds };
}
/**
 * Reads an optional string-array field.
 *
 * @returns An empty array when the key is absent (undefined); otherwise the result of
 *   stringArrayField for that key.
 */
function optionalStringArrayField(root: Record<string, unknown>, key: string, label: string): string[] {
  if (root[key] === undefined) {
    return [];
  }
  return stringArrayField(root, key, label);
}
/**
 * Copies a record into a string-to-string map, insisting that every value is a non-empty string.
 *
 * @param root Record to copy.
 * @param label Path prefix for validation error messages.
 * @returns A new object with the same keys and values.
 * @throws Error naming `label.key` for the first value that is not a non-empty string.
 */
function stringMap(root: Record<string, unknown>, label: string): Record<string, string> {
  const copied: Record<string, string> = {};
  Object.keys(root).forEach((key) => {
    const value = root[key];
    if (typeof value === "string" && value.length > 0) {
      copied[key] = value;
      return;
    }
    throw new Error(`${label}.${key} must be a non-empty string`);
  });
  return copied;
}
/**
 * Builds the `plan` payload: a description of the controller and the selected followers.
 *
 * Disabled followers are included. For each follower the branch reference is resolved
 * through safeResolveString, and a warning is added when the resolved value is non-null
 * and differs from the declared branch.
 *
 * @param registry Parsed registry.
 * @param options Parsed CLI options, used for follower selection.
 * @returns The plan payload.
 */
function buildPlan(registry: BranchFollowerRegistry, options: ParsedOptions): Record<string, unknown> {
  const describeFollower = (follower: FollowerSpec) => {
    const { source, target } = follower;
    const resolvedBranch = safeResolveString(source.branchRef);
    const targetRefs = Object.entries(target.configRefs).map(([id, ref]) => ({ id: `target.${id}`, ref }));
    const graph = configRefGraph([
      { id: "source.branch", ref: source.branchRef },
      { id: "source.authority", ref: source.authorityRef },
      { id: "source.snapshot", ref: source.snapshotRef },
      ...targetRefs,
    ]);
    const warnings: string[] = [];
    const branchDrifted = resolvedBranch !== null && resolvedBranch !== source.branch;
    if (branchDrifted) {
      warnings.push(`source.branch ${source.branch} differs from ${source.branchRef} value ${resolvedBranch}`);
    }
    return {
      id: follower.id,
      enabled: follower.enabled,
      adapter: follower.adapter,
      description: follower.description,
      source: {
        repository: source.repository,
        branch: source.branch,
        branchRef: source.branchRef,
        resolvedBranch,
        snapshotPrefix: source.snapshotPrefix,
      },
      target,
      budgets: follower.budgets,
      commands: redactCommands(follower),
      nativeStatus: nativeStatusPlan(follower.nativeStatus),
      closeoutChecks: follower.closeoutChecks,
      configRefGraph: graph,
      warnings,
    };
  };

  const followers = selectFollowers(registry, options, { includeDisabled: true }).map((follower) => describeFollower(follower));
  const controller = registry.controller;
  const authority = controller.source.sourceAuthority;
  return {
    ok: true,
    action: "plan",
    spec: `${SPEC_REF} ${SPEC_VERSION}`,
    registry: registrySummary(registry),
    hostWorktreeAuthority: false,
    sourceAuthority: {
      mode: authority.mode,
      resolver: authority.resolver,
      allowHostGit: authority.allowHostGit,
      allowHostWorkspace: authority.allowHostWorkspace,
      allowGithubDirectInPipeline: authority.allowGithubDirectInPipeline,
    },
    controller: {
      namespace: controller.namespace,
      kubeRoute: controller.kubeRoute,
      deploymentName: controller.deploymentName,
      stateConfigMapName: controller.stateConfigMapName,
      leaseName: controller.leaseName,
      image: controller.image,
      loop: controller.loop,
      budgets: controller.budgets,
    },
    followers,
    next: {
      apply: "bun scripts/cli.ts cicd branch-follower apply --confirm --wait",
      status: "bun scripts/cli.ts cicd branch-follower status",
      dryRun: "bun scripts/cli.ts cicd branch-follower run-once --all --dry-run",
    },
  };
}
/**
 * Renders the controller manifests and applies them with kubectl via runKubeScript.
 *
 * The manifests are serialized to multi-document YAML, base64-encoded, and embedded in a
 * shell script as a here-document. The script decodes them into a temp file and then:
 *   - in dry-run mode runs `kubectl apply --dry-run=server`;
 *   - otherwise runs a server-side apply with `--force-conflicts`;
 *   - runs `rollout status` on the controller deployment only when applying for real with --wait;
 *   - finally lists the controller objects as best-effort (errors are ignored).
 * The temp file is removed by an EXIT trap so the decoded manifests do not accumulate on
 * the machine that executes the script, whether the script succeeds or fails.
 *
 * @param registry Parsed registry supplying controller names, namespace, and budgets.
 * @param options Parsed CLI options; `timeoutSeconds` overrides the apply-wait budget.
 * @returns The apply payload; `ok` reflects the script's exit code.
 */
async function applyController(registry: BranchFollowerRegistry, options: ParsedOptions): Promise<Record<string, unknown>> {
  const manifests = renderControllerManifests(registry);
  const manifestYaml = `${manifests.map((item) => Bun.YAML.stringify(item).trim()).join("\n---\n")}\n`;
  const manifestBase64 = Buffer.from(manifestYaml, "utf8").toString("base64");
  const waitSeconds = options.timeoutSeconds ?? registry.controller.budgets.applyWaitSeconds;
  const script = [
    "set -eu",
    "tmp=$(mktemp)",
    // Clean up on every exit path; `set -e` may abort the script right after a failed apply.
    "trap 'rm -f \"$tmp\"' EXIT",
    "base64 -d >\"$tmp\" <<'UNIDESK_CICD_BRANCH_FOLLOWER_MANIFEST_B64'",
    manifestBase64,
    "UNIDESK_CICD_BRANCH_FOLLOWER_MANIFEST_B64",
    options.dryRun
      ? `kubectl apply --dry-run=server --field-manager=${shQuote(registry.controller.fieldManager)} -f "$tmp"`
      : `kubectl apply --server-side --force-conflicts --field-manager=${shQuote(registry.controller.fieldManager)} -f "$tmp"`,
    !options.dryRun && options.wait
      ? `kubectl -n ${shQuote(registry.controller.namespace)} rollout status deploy/${shQuote(registry.controller.deploymentName)} --timeout=${waitSeconds}s`
      : "true",
    // Informational listing only; missing objects must not fail the apply.
    `kubectl -n ${shQuote(registry.controller.namespace)} get deploy/${shQuote(registry.controller.deploymentName)} cm/${shQuote(registry.controller.configMapName)} cm/${shQuote(registry.controller.stateConfigMapName)} lease/${shQuote(registry.controller.leaseName)} -o wide 2>/dev/null || true`,
  ].join("\n");
  // Allow 15 extra seconds beyond the rollout wait before the transport gives up.
  const result = runKubeScript(registry, options, script, "", (waitSeconds + 15) * 1000);
  return {
    ok: result.exitCode === 0,
    action: "apply",
    dryRun: options.dryRun,
    wait: options.wait,
    registry: registrySummary(registry),
    objects: manifests.map((item) => objectRef(item)),
    manifestSha256: createHash("sha256").update(manifestYaml).digest("hex"),
    controller: {
      namespace: registry.controller.namespace,
      route: registry.controller.kubeRoute,
      deploymentName: registry.controller.deploymentName,
      stateConfigMapName: registry.controller.stateConfigMapName,
      leaseName: registry.controller.leaseName,
      hostWorktreeMounted: false,
      sourceMode: "k8s-git-mirror-to-emptyDir",
    },
    command: commandCompact(result, options),
    next: {
      status: "bun scripts/cli.ts cicd branch-follower status",
      logs: `bun scripts/cli.ts cicd branch-follower logs --follower ${registry.followers[0]?.id ?? "<id>"}`,
      dryRun: "bun scripts/cli.ts cicd branch-follower run-once --all --dry-run",
    },
  };
}
/**
 * Builds the `status` payload from stored follower state, optionally refreshed.
 *
 * Live data is wanted when --live is set, or when --no-live is not set and no follower
 * state is stored yet. Outside the cluster this runs a dry-run reconcile job that records
 * state, then re-reads the stored state. Inside the cluster each enabled follower's adapter
 * status is read directly.
 *
 * @param registry Parsed registry.
 * @param options Parsed CLI options.
 * @returns The status payload; `ok` requires a healthy state read and no follower reporting `ok: false`.
 */
async function buildStatus(registry: BranchFollowerRegistry, options: ParsedOptions): Promise<Record<string, unknown>> {
  let k8s = readK8sState(registry, options);
  const wantsLive = options.live || (!options.noLive && Object.keys(k8s.stateByFollower).length === 0);
  const refreshOutsideCluster = wantsLive && !options.inCluster;
  const refresh = refreshOutsideCluster
    ? runControllerReconcileJob(registry, options, { dryRun: true, wait: true, recordState: true })
    : null;
  if (refresh !== null) {
    // Pick up whatever the reconcile job just recorded.
    k8s = readK8sState(registry, options);
  }

  const readLiveHere = wantsLive && options.inCluster;
  const showDetails = options.followerId !== null || options.full;
  const followers = [];
  for (const follower of selectFollowers(registry, options, { includeDisabled: true })) {
    const stored = k8s.stateByFollower[follower.id] ?? {};
    const live = readLiveHere && follower.enabled
      ? await readAdapterStatus(registry, follower, options)
      : null;
    followers.push(mergeFollowerStatus(registry, follower, stored, live, readLiveHere, showDetails));
  }

  return {
    ok: k8s.ok && followers.every((item) => item.ok !== false),
    action: "status",
    live: readLiveHere,
    registry: registrySummary(registry),
    controller: controllerStatusSummary(registry, k8s),
    followers,
    refresh,
    errors: k8s.errors,
    next: {
      apply: "bun scripts/cli.ts cicd branch-follower apply --confirm --wait",
      liveStatus: "bun scripts/cli.ts cicd branch-follower status --live",
      dryRun: "bun scripts/cli.ts cicd branch-follower run-once --all --dry-run",
    },
  };
}
/**
 * Executes one reconcile pass.
 *
 * Outside the cluster the work is delegated to a controller reconcile job, and the payload
 * reports the job result together with the stored state of the enabled, selected followers.
 * Inside the cluster each enabled, selected follower is processed in order: read adapter
 * status, decide (and possibly trigger), then persist the state unless this is a dry run
 * without --record-state. A failed state write is reported as a warning.
 *
 * @param registry Parsed registry.
 * @param options Parsed CLI options.
 * @returns The run-once payload. In-cluster, `ok` is false when any follower ends in
 *   phase `Failed` or `Blocked`; outside the cluster `ok` mirrors the reconcile job.
 */
async function runOnce(registry: BranchFollowerRegistry, options: ParsedOptions): Promise<Record<string, unknown>> {
  if (!options.inCluster) {
    const job = runControllerReconcileJob(registry, options, { dryRun: options.dryRun, wait: true, recordState: true });
    const stored = readK8sState(registry, options);
    const targets = selectFollowers(registry, options, { includeDisabled: false });
    return {
      ok: job.ok,
      action: "run-once",
      dryRun: options.dryRun,
      confirm: options.confirm,
      wait: true,
      controller: false,
      execution: "k8s-native-reconcile-job",
      registry: registrySummary(registry),
      job,
      followers: targets.map((follower) => mergeFollowerStatus(registry, follower, stored.stateByFollower[follower.id] ?? {}, null, false)),
      warnings: job.ok ? [] : [`reconcile job failed: ${job.message}`],
      next: {
        status: "bun scripts/cli.ts cicd branch-follower status",
        liveStatus: "bun scripts/cli.ts cicd branch-follower status --live",
      },
    };
  }

  const targets = selectFollowers(registry, options, { includeDisabled: false });
  const previous = readK8sState(registry, options);
  const followerStates: FollowerState[] = [];
  const writeWarnings: string[] = [];
  const stateWrites: Record<string, unknown>[] = [];
  for (const follower of targets) {
    const priorState = previous.stateByFollower[follower.id] ?? {};
    const live = await readAdapterStatus(registry, follower, options);
    const state = await decideAndMaybeTrigger(registry, follower, priorState, live, options);
    // A dry run leaves stored state untouched unless --record-state asks otherwise.
    if (options.dryRun && !options.recordState) {
      followerStates.push(state);
      continue;
    }
    const write = writeFollowerState(registry, state, options);
    stateWrites.push(stateWriteSummary(follower.id, write));
    if (write.exitCode !== 0) {
      const warning = `state write failed for ${follower.id}: ${tailText(write.stderr || write.stdout, 300)}`;
      state.warnings.push(warning);
      writeWarnings.push(warning);
    }
    followerStates.push(state);
  }

  const noneFailedOrBlocked = followerStates.every((item) => item.phase !== "Failed" && item.phase !== "Blocked");
  return {
    ok: noneFailedOrBlocked,
    action: "run-once",
    dryRun: options.dryRun,
    confirm: options.confirm,
    wait: options.wait,
    controller: options.inCluster,
    registry: registrySummary(registry),
    followers: followerStates,
    stateWrites,
    warnings: writeWarnings,
    next: {
      status: "bun scripts/cli.ts cicd branch-follower status",
      liveStatus: "bun scripts/cli.ts cicd branch-follower status --live",
    },
  };
}
/**
 * Reports, and with --confirm removes, the stored state keys of the selected followers.
 *
 * The current state is read first so the payload can show which keys were present.
 * Without --confirm nothing is removed and `ok` mirrors the state read; with --confirm
 * `ok` mirrors the removal command's exit code. Removal is requested for every selected
 * id, whether or not its key was present.
 *
 * @param registry Parsed registry.
 * @param options Parsed CLI options; disabled followers are included in the selection.
 * @returns The cleanup-state payload.
 */
function cleanupState(registry: BranchFollowerRegistry, options: ParsedOptions): Record<string, unknown> {
  const ids = selectFollowers(registry, options, { includeDisabled: true }).map((follower) => follower.id);
  const before = kubeConfigMapFollowerState(registry, options);
  const hasStoredState = (id: string): boolean => before.stateByFollower[id] !== undefined;
  const presentKeys = ids.filter((id) => hasStoredState(id));
  const command = options.confirm ? removeFollowerStateKeys(registry, options, ids) : null;
  const cleanupLabel = options.confirm ? "deleted-if-present" : "would-delete-if-present";
  const runOnceHint = options.followerId === null
    ? "bun scripts/cli.ts cicd branch-follower run-once --all --confirm --wait"
    : `bun scripts/cli.ts cicd branch-follower run-once --follower ${options.followerId} --confirm --wait`;
  return {
    ok: command === null ? before.ok : command.exitCode === 0,
    action: "cleanup-state",
    dryRun: !options.confirm,
    confirm: options.confirm,
    execution: "k8s-state-configmap-key-delete",
    registry: registrySummary(registry),
    controller: {
      namespace: registry.controller.namespace,
      route: registry.controller.kubeRoute,
      stateConfigMapName: registry.controller.stateConfigMapName,
    },
    followers: ids.map((id) => ({
      id,
      statePresent: hasStoredState(id),
      cleanup: cleanupLabel,
    })),
    stateConfigMapPresent: before.present,
    presentKeys,
    parsedDownstreamCliOutput: false,
    command: command === null ? null : commandCompact(command, options),
    errors: before.ok ? [] : [before.error],
    next: {
      status: "bun scripts/cli.ts cicd branch-follower status",
      runOnce: runOnceHint,
    },
  };
}
/**
 * Produce the detailed status view for a single follower.
 *
 * - With no follower selected, returns the list of followers and their native status plans.
 * - Outside the cluster, runs the controller reconcile job (dry-run, waiting, recording state)
 *   and reports what the state ConfigMap holds for the follower.
 * - Inside the cluster, reads the adapter status directly and reports it.
 *
 * @param registry Branch-follower registry to search.
 * @param options Parsed CLI options; `followerId`, `action` and `inCluster` are read here.
 * @returns The drill-down report.
 * @throws Error when `options.followerId` does not match any follower in the registry.
 */
async function runFollowerDrillDown(registry: BranchFollowerRegistry, options: ParsedOptions): Promise<Record<string, unknown>> {
  const requestedId = options.followerId;
  if (requestedId === null) {
    const overview = registry.followers.map((item) => ({
      id: item.id,
      adapter: item.adapter,
      statusAuthority: "k8s-native",
      nativeStatus: nativeStatusPlan(item.nativeStatus),
    }));
    return {
      ok: true,
      action: options.action,
      message: "select one follower to inspect native Kubernetes status objects",
      followers: overview,
    };
  }

  const follower = registry.followers.find((item) => item.id === requestedId);
  if (follower === undefined) throw new Error(`unknown follower ${requestedId}`);

  const statusHint = `bun scripts/cli.ts cicd branch-follower status --follower ${follower.id}`;
  const dryRunHint = `bun scripts/cli.ts cicd branch-follower run-once --follower ${follower.id} --dry-run`;

  if (options.inCluster) {
    // In-cluster callers read the adapter status directly.
    const live = await readAdapterStatus(registry, follower, options);
    return {
      ok: live.ok,
      action: options.action,
      follower: follower.id,
      adapter: follower.adapter,
      statusAuthority: "k8s-native",
      parsedDownstreamCliOutput: false,
      summary: {
        phase: live.phase,
        observedSha: live.observedSha,
        targetSha: live.targetSha,
        pipelineRun: live.pipelineRun,
        aligned: live.aligned,
        message: live.message,
      },
      native: live.payload,
      next: {
        status: statusHint,
        runOnceDryRun: dryRunHint,
      },
    };
  }

  // Local CLI: refresh the stored state through the reconcile job, then read it back.
  const refresh = runControllerReconcileJob(registry, options, { dryRun: true, wait: true, recordState: true });
  const k8s = readK8sState(registry, options);
  const stored = k8s.stateByFollower[follower.id] ?? {};
  const sourceRecord = asOptionalRecord(stored.source);
  const targetRecord = asOptionalRecord(stored.target);
  const commandRecord = asOptionalRecord(stored.command);
  const native = asOptionalRecord(commandRecord?.payload);
  const hasStoredState = Object.keys(stored).length > 0;
  return {
    ok: refresh.ok && k8s.ok && hasStoredState,
    action: options.action,
    follower: follower.id,
    adapter: follower.adapter,
    statusAuthority: "k8s-native-state-configmap",
    parsedDownstreamCliOutput: false,
    summary: {
      phase: stringOrNull(stored.phase) ?? "Observed",
      observedSha: stringOrNull(sourceRecord?.observedSha),
      targetSha: stringOrNull(targetRecord?.targetSha),
      pipelineRun: stringOrNull(stored.pipelineRun) ?? stringOrNull(stored.inFlightJob),
      aligned: null,
      message: stringOrNull(stored.decision) ?? "no controller state yet",
    },
    native,
    refresh,
    errors: k8s.errors,
    next: {
      status: statusHint,
      liveStatus: `bun scripts/cli.ts cicd branch-follower status --follower ${follower.id} --live`,
      runOnceDryRun: dryRunHint,
    },
  };
}
/**
 * Decide the next phase for one follower and, when confirmed, act on that decision.
 *
 * The previously stored state (`previous`) and the live adapter summary (`live`) are combined to
 * classify the follower as Skipped, Observed/Blocked, Superseded, Noop, ClosingOut or
 * PendingTrigger. With `options.confirm` the trigger is submitted for PendingTrigger/Superseded,
 * and with `options.confirm && options.wait` an existing ClosingOut run is waited on. Afterwards the
 * automatic closeout accelerators (Argo refresh, git-mirror flush) may run and the status is re-read.
 *
 * @param registry Registry the follower belongs to; supplies controller names and budgets.
 * @param follower Follower being evaluated.
 * @param previous Last stored state for this follower (an empty object when nothing is stored).
 * @param live Adapter status read for this pass.
 * @param options Parsed options (`confirm`, `wait`, `dryRun`, `inCluster`, `timeoutSeconds`).
 * @returns The follower state record, including the decision text, warnings and command details.
 */
async function decideAndMaybeTrigger(
  registry: BranchFollowerRegistry,
  follower: FollowerSpec,
  previous: Record<string, unknown>,
  live: AdapterSummary,
  options: ParsedOptions,
): Promise<FollowerState> {
  const warnings: string[] = [];
  if (!live.ok) warnings.push(`status command failed: exitCode=${live.exitCode}${live.timedOut ? " timedOut=true" : ""}`);
  const observedSha = live.observedSha;
  let targetSha = live.targetSha;
  const previousPhase = stringOrNull(previous.phase);
  // A stored trigger record from a Failed pass is ignored so the same sha can be triggered again.
  const previousLastTriggered = previousPhase === "Failed" ? null : stringOrNull(previous.lastTriggeredSha);
  const previousInFlight = stringOrNull(previous.inFlightJob);
  const previousLastSucceeded = stringOrNull(previous.lastSucceededSha);
  const previousObserved = stringOrNull(recordAt(previous, ["source"])?.observedSha);
  const previousTarget = stringOrNull(recordAt(previous, ["target"])?.targetSha);
  // Superseded: a job was in flight for an older observed sha and the source has since moved on.
  const superseded = previousInFlight !== null && previousObserved !== null && observedSha !== null && previousObserved !== observedSha;
  const nativePipelineRunMatchesObserved = observedSha !== null
    && live.pipelineRunPresent === true
    && live.pipelineRun !== null
    && live.pipelineRun === expectedPipelineRunName(follower, observedSha);
  let phase: BranchFollowerPhase;
  let decision: string;
  let triggerCommand: Record<string, unknown> | undefined;
  let inFlightJob: string | null = live.inFlightJob;
  // Precedence: live value, then stored value, then a PipelineRun whose name matches the observed sha.
  let lastTriggeredSha = live.lastTriggeredSha ?? previousLastTriggered ?? (nativePipelineRunMatchesObserved ? observedSha : null);
  let lastSucceededSha = live.lastSucceededSha ?? previousLastSucceeded;
  // When the live status exposes no target sha, fall back to the stored one only if the stored
  // state already recorded this exact sha as both target and last success.
  if (targetSha === null && observedSha !== null && previousLastSucceeded === observedSha && previousTarget === observedSha) targetSha = observedSha;

  if (!follower.enabled) {
    phase = "Skipped";
    decision = "follower disabled";
  } else if (observedSha === null) {
    phase = live.ok ? "Observed" : "Blocked";
    decision = "status did not expose an observed source sha; adapter trigger-current remains the dedupe authority";
  } else if (superseded) {
    phase = "Superseded";
    decision = `previous in-flight sha ${shortSha(previousObserved)} was superseded by ${shortSha(observedSha)}`;
  } else if (targetSha !== null && targetSha === observedSha && live.aligned === true) {
    phase = "Noop";
    decision = "target already matches observed source sha";
    inFlightJob = null;
    lastSucceededSha = observedSha;
  } else if (lastTriggeredSha !== null && lastTriggeredSha === observedSha && live.pipelineRunPresent === true) {
    phase = "ClosingOut";
    decision = targetSha === observedSha
      ? "target sha matches observed source sha but native closeout gates are not complete"
      : "same sha was already triggered; use status/events/logs for closeout";
  } else if (lastTriggeredSha !== null && lastTriggeredSha === observedSha) {
    phase = "PendingTrigger";
    decision = "same sha has a trigger record but no PipelineRun is present; retrying trigger";
  } else if (targetSha !== null && targetSha === observedSha) {
    phase = "ClosingOut";
    decision = "target sha matches observed source sha but native closeout gates are not complete";
  } else {
    phase = "PendingTrigger";
    decision = targetSha === null
      ? "target sha is unknown; trigger-current adapter will dedupe by source snapshot"
      : `observed ${shortSha(observedSha)} differs from target ${shortSha(targetSha)}`;
  }

  // NOTE: "Observed" is only assigned above when observedSha is null, so the third alternative
  // cannot match with the current classification; it is kept unchanged.
  if (options.confirm && (phase === "PendingTrigger" || phase === "Superseded" || (phase === "Observed" && observedSha !== null))) {
    const trigger = await executeTrigger(registry, follower, observedSha, options);
    triggerCommand = trigger.command;
    phase = trigger.ok ? (options.wait ? "ClosingOut" : "Triggering") : "Failed";
    decision = trigger.ok ? `trigger submitted for ${shortSha(observedSha)}` : `trigger failed for ${shortSha(observedSha)}: ${redactText(trigger.message).slice(0, 220)}`;
    inFlightJob = trigger.jobId ?? live.inFlightJob;
    lastTriggeredSha = observedSha;
    if (trigger.ok && options.wait && trigger.completed) {
      phase = "Succeeded";
      decision = `trigger completed for ${shortSha(observedSha)}`;
      inFlightJob = null;
      targetSha = observedSha;
      lastSucceededSha = observedSha;
    }
    // Redact before recording: warnings are returned in the state record, and the failure
    // message can carry command output (stderr / job log tails).
    if (!trigger.ok) warnings.push(redactText(trigger.message));
  }

  // A run that was already triggered earlier: with --confirm --wait, wait for its closeout here.
  if (options.confirm && options.wait && phase === "ClosingOut" && observedSha !== null && triggerCommand === undefined) {
    const closeout = await waitNativeFollowerCloseout(registry, follower, observedSha, options, options.timeoutSeconds ?? follower.budgets.endToEndSeconds);
    triggerCommand = closeoutOnlyCommand(follower, live.pipelineRun, observedSha, closeout);
    if (closeout.completed) {
      phase = "Succeeded";
      decision = `closeout completed for ${shortSha(observedSha)}`;
      inFlightJob = null;
      targetSha = observedSha;
      lastSucceededSha = observedSha;
    } else {
      decision = closeout.timedOut
        ? `closeout did not converge for ${shortSha(observedSha)} within budget`
        : `closeout remains pending for ${shortSha(observedSha)}`;
      warnings.push(decision);
    }
  }
  if (options.dryRun && phase === "PendingTrigger") decision = `${decision}; dry-run did not trigger`;

  let stateLive: AdapterSummary = live;
  let automaticCloseoutAccelerated = false;
  if (shouldRefreshAutomaticCloseout(follower, observedSha, live, phase, options)) {
    // The predicate returns false when nativeStatus.argo is null, hence the non-null cast.
    const refresh = runNativeArgoRefresh(follower.nativeStatus.argo as NonNullable<NativeStatusSpec["argo"]>, follower.budgets.controlPlaneRefreshSeconds);
    if (refresh.exitCode !== 0) warnings.push(`argo refresh failed: ${redactText(tailText(refresh.stderr || refresh.stdout, 300))}`);
    else automaticCloseoutAccelerated = true;
  }
  if (shouldFlushAutomaticCloseout(follower, observedSha, live, phase, options)) {
    const gitMirror = asOptionalRecord(asOptionalRecord(live.payload)?.gitMirror);
    // The flush job is keyed by the reported localGitops value when present, else by the observed sha.
    const flushKey = stringOrNull(gitMirror?.localGitops) ?? observedSha;
    const flush = runNativeGitMirrorStage(registry, follower, observedSha, "flush", follower.budgets.sourceSyncSeconds, flushKey);
    if (flush !== null && !flush.result.ok) warnings.push(`git-mirror flush failed: ${redactText(tailText(flush.result.conditionMessage ?? flush.result.logsTail ?? "unknown", 300))}`);
    else if (flush !== null) automaticCloseoutAccelerated = true;
  }
  if (automaticCloseoutAccelerated && observedSha !== null) {
    // After an accelerator ran, re-read status so the recorded state reflects the result.
    const reread = await readAdapterStatusAfterCloseoutAcceleration(registry, follower, observedSha, options);
    if (!reread.ok) {
      warnings.push(`post-closeout status re-read failed: ${redactText(tailText(reread.message, 300))}`);
    } else if (reread.observedSha === observedSha) {
      stateLive = reread;
      targetSha = reread.targetSha ?? targetSha;
      inFlightJob = reread.inFlightJob;
      if (reread.aligned === true) {
        phase = "Noop";
        decision = "target already matches observed source sha";
        inFlightJob = null;
        lastSucceededSha = observedSha;
      }
    }
  }

  const statePipelineRun = stringOrNull(triggerCommand?.pipelineRun) ?? stateLive.pipelineRun;
  return {
    id: follower.id,
    adapter: follower.adapter,
    enabled: follower.enabled,
    phase,
    source: {
      repository: follower.source.repository,
      branch: follower.source.branch,
      branchRef: follower.source.branchRef,
      snapshotPrefix: follower.source.snapshotPrefix,
      observedSha: stateLive.observedSha ?? observedSha,
    },
    target: {
      node: follower.target.node,
      lane: follower.target.lane,
      namespace: follower.target.namespace,
      sentinel: follower.target.sentinel,
      targetSha,
    },
    lastTriggeredSha,
    lastSucceededSha,
    pipelineRun: statePipelineRun,
    inFlightJob,
    budgetSource: follower.budgets,
    controller: {
      mode: options.inCluster ? "k8s-controller" : "local-cli",
      stateConfigMap: registry.controller.stateConfigMapName,
      leaseName: registry.controller.leaseName,
    },
    decision,
    dryRun: options.dryRun,
    updatedAt: new Date().toISOString(),
    timings: buildFollowerTimings(follower, stateLive, triggerCommand, asOptionalRecord(previous.timings), phase),
    warnings,
    next: followerNextCommands(follower),
    // Without a trigger/closeout command, record the status read that produced this state.
    command: triggerCommand ?? {
      status: stateLive.command,
      exitCode: stateLive.exitCode,
      timedOut: stateLive.timedOut,
      payload: stateLive.payload,
    },
  };
}
/**
 * Tell whether an automatic controller pass should run the Argo refresh for this follower.
 *
 * True only for an in-cluster, confirmed, non-waiting, non-dry-run pass whose follower is
 * ClosingOut with a known observed sha and an Argo native status spec, whose Tekton payload
 * reports success, and where Argo is not ready, the runtime is not aligned, or the live target
 * sha differs from the observed sha.
 */
function shouldRefreshAutomaticCloseout(
  follower: FollowerSpec,
  observedSha: string | null,
  live: AdapterSummary,
  phase: BranchFollowerPhase,
  options: ParsedOptions,
): boolean {
  const automaticPass = options.inCluster && options.confirm && !options.wait && !options.dryRun;
  if (!automaticPass) return false;
  const eligible = phase === "ClosingOut" && observedSha !== null && follower.nativeStatus.argo !== null;
  if (!eligible) return false;

  const nativePayload = asOptionalRecord(live.payload);
  // The refresh only helps once the Tekton run itself has succeeded.
  if (asOptionalRecord(nativePayload?.tekton)?.succeeded !== true) return false;

  const argoReady = asOptionalRecord(nativePayload?.argo)?.ready === true;
  const runtimeAligned = asOptionalRecord(nativePayload?.runtime)?.aligned === true;
  if (argoReady && runtimeAligned) return live.targetSha !== observedSha;
  return true;
}
/**
 * Re-read the adapter status after a closeout accelerator ran, polling until it converges.
 *
 * Polling continues while the read succeeds, still reports `observedSha`, is not yet aligned and
 * the follower's `statusSeconds` budget has not elapsed. Between reads the function sleeps by
 * running a `sleep` command for at most the controller's native poll interval.
 *
 * @returns The most recent status read (which may be a failed or non-aligned one).
 */
async function readAdapterStatusAfterCloseoutAcceleration(
  registry: BranchFollowerRegistry,
  follower: FollowerSpec,
  observedSha: string,
  options: ParsedOptions,
): Promise<AdapterSummary> {
  const budgetSeconds = follower.budgets.statusSeconds;
  const startedAt = Date.now();
  const deadline = startedAt + Math.max(1, budgetSeconds) * 1000;
  let latest = await readAdapterStatus(registry, follower, {
    ...options,
    timeoutSeconds: Math.min(budgetSeconds, remainingSeconds(startedAt, budgetSeconds)),
  });
  for (;;) {
    // Stop as soon as the read failed, the source moved on, or the status is aligned.
    if (!latest.ok) break;
    if (latest.observedSha !== observedSha) break;
    if (latest.aligned === true) break;
    if (Date.now() >= deadline) break;

    const millisLeft = Math.max(0, deadline - Date.now());
    const pollSeconds = Math.min(registry.controller.budgets.nativePollIntervalSeconds, Math.ceil(millisLeft / 1000));
    if (pollSeconds <= 0) break;
    runCommand(["sleep", String(pollSeconds)], repoRoot, { timeoutMs: (pollSeconds + 1) * 1000 });
    if (Date.now() >= deadline) break;
    latest = await readAdapterStatus(registry, follower, {
      ...options,
      timeoutSeconds: remainingSeconds(startedAt, budgetSeconds),
    });
  }
  return latest;
}
/**
 * Tell whether an automatic controller pass should run the git-mirror flush for this follower.
 *
 * True only for an in-cluster, confirmed, non-waiting, non-dry-run pass whose follower is
 * ClosingOut with a known observed sha, where `shouldFlushNativeGitMirrorDuringCloseout` accepts
 * the git-mirror payload, Argo is absent from the payload or ready, and the runtime is aligned
 * (or, when the payload has no runtime record, the live target sha equals the observed sha).
 */
function shouldFlushAutomaticCloseout(
  follower: FollowerSpec,
  observedSha: string | null,
  live: AdapterSummary,
  phase: BranchFollowerPhase,
  options: ParsedOptions,
): boolean {
  const automaticPass = options.inCluster && options.confirm && !options.wait && !options.dryRun;
  if (!automaticPass) return false;
  if (phase !== "ClosingOut") return false;
  if (observedSha === null) return false;

  const nativePayload = asOptionalRecord(live.payload);
  const mirrorRecord = asOptionalRecord(nativePayload?.gitMirror);
  if (!shouldFlushNativeGitMirrorDuringCloseout(follower, mirrorRecord)) return false;

  const argoRecord = asOptionalRecord(nativePayload?.argo);
  const runtimeRecord = asOptionalRecord(nativePayload?.runtime);
  // A missing Argo record counts as ready.
  if (argoRecord !== null && argoRecord.ready !== true) return false;
  // A missing runtime record falls back to comparing the live target sha.
  if (runtimeRecord === null) return live.targetSha === observedSha;
  return runtimeRecord.aligned === true;
}
/**
 * Submit the trigger for one follower using the mechanism that fits the run mode.
 *
 * In-cluster runs dispatch to the native trigger of the follower's adapter. A local run without
 * `--wait` starts the configured trigger argv as an async job. Every remaining case runs the
 * trigger argv synchronously with the effective timeout.
 *
 * @param observedSha Observed source sha, or null when the status did not expose one.
 * @returns The trigger outcome; `completed` is only true for a successful synchronous run with `--wait`
 *          (native triggers define their own completion).
 */
async function executeTrigger(registry: BranchFollowerRegistry, follower: FollowerSpec, observedSha: string | null, options: ParsedOptions): Promise<TriggerResult> {
  const spec = follower.commands.trigger;
  const timeoutSeconds = options.timeoutSeconds ?? spec.timeoutSeconds;

  if (options.inCluster) {
    switch (follower.adapter) {
      case "hwlab-node-runtime":
        return await executeNativeHwlabNodeTrigger(registry, follower, observedSha, options, timeoutSeconds);
      case "agentrun-yaml-lane":
        return await executeNativeAgentRunTrigger(registry, follower, observedSha, options, timeoutSeconds);
      case "web-probe-sentinel-cicd":
        return await executeNativeSentinelTrigger(registry, follower, observedSha, options, timeoutSeconds);
      default:
        // Other adapters fall through to the synchronous command below.
        break;
    }
  } else if (!options.wait) {
    const job = startJob(`cicd_branch_follower_${safeJobSegment(follower.id)}`, spec.argv, `Trigger ${follower.id} for observed sha ${observedSha ?? "unknown"}`);
    return {
      ok: true,
      completed: false,
      message: `started async job ${job.id}`,
      jobId: job.id,
      command: {
        mode: "async-job",
        argv: spec.argv,
        jobId: job.id,
        status: `bun scripts/cli.ts job status ${job.id}`,
      },
    };
  }

  const result = runCommand(spec.argv, repoRoot, { timeoutMs: timeoutSeconds * 1000 });
  const succeeded = result.exitCode === 0;
  return {
    ok: succeeded,
    completed: succeeded && options.wait,
    message: succeeded ? "trigger command completed" : tailText(result.stderr || result.stdout, 500),
    jobId: null,
    command: commandCompact(result, options),
  };
}
/**
 * Run the in-cluster trigger sequence for a `hwlab-node-runtime` follower.
 *
 * Stages, in order: git-mirror sync, runtime reuse-config validation, PipelineRun manifest
 * rendering, control-plane refresh, Tekton PipelineRun submission and, only when waiting and the
 * PipelineRun completed without failure, git-mirror flush followed by the native closeout wait.
 * Every stage is given whatever remains of `timeoutSeconds`, capped by its own budget where the
 * follower defines one. The first failing stage short-circuits with a failure result.
 *
 * @param observedSha Source sha to deploy; null yields an `observed-sha-missing` error result.
 * @param timeoutSeconds Overall budget for the whole sequence.
 */
async function executeNativeHwlabNodeTrigger(registry: BranchFollowerRegistry, follower: FollowerSpec, observedSha: string | null, options: ParsedOptions, timeoutSeconds: number): Promise<TriggerResult> {
  if (observedSha === null) {
    return nativeTriggerError(follower, "native HWLAB trigger requires observed source sha", "observed-sha-missing");
  }
  const tekton = follower.nativeStatus.tekton;
  if (tekton === null) {
    return nativeTriggerError(follower, "native HWLAB trigger requires Tekton nativeStatus", "tekton-native-status-missing");
  }
  const laneSpec = hwlabRuntimeLaneSpecForNode(follower.target.lane, follower.target.node);
  const pipelineRun = nodeRuntimePipelineRunName(laneSpec, observedSha);
  const namespace = tekton.namespace;
  const startedAt = Date.now();

  const sync = runNativeGitMirrorStage(registry, follower, observedSha, "sync", Math.min(remainingSeconds(startedAt, timeoutSeconds), follower.budgets.sourceSyncSeconds));
  if (sync !== null && !sync.result.ok) {
    return nativeK8sStageFailure(follower, observedSha, "git-mirror-sync", sync.jobName, sync.result, { action: "sync" }, "native git-mirror sync failed", startedAt);
  }

  const reuseConfig = requireFollowerRuntimeReuseConfig(follower, observedSha, Math.min(remainingSeconds(startedAt, timeoutSeconds), follower.budgets.statusSeconds));
  if (!reuseConfig.ok) return nativeReuseConfigFailure(follower, observedSha, reuseConfig, startedAt);
  const reuseServiceError = requiredReuseServiceError(reuseConfig, ["hwlab-cloud-api", "hwlab-runtime"], "runtimeReuse");
  if (reuseServiceError !== null) {
    return nativeReuseConfigFailure(follower, observedSha, invalidRuntimeReuseConfig(reuseConfig, reuseServiceError), startedAt);
  }

  // The manifest is rendered before the refresh job runs, as in the established stage order.
  const manifest = annotatePipelineRunReuseConfig(nodeRuntimePipelineRunManifest(laneSpec, observedSha, pipelineRun), reuseConfig);
  const refresh = runNativeHwlabControlPlaneRefresh(
    registry,
    follower,
    laneSpec,
    observedSha,
    Math.min(remainingSeconds(startedAt, timeoutSeconds), follower.budgets.controlPlaneRefreshSeconds),
    nativeCapabilityJobName(follower.id, "control-plane-refresh", observedSha),
  );
  if (!refresh.result.ok) {
    return nativeK8sStageFailure(follower, observedSha, "control-plane-refresh", refresh.jobName, refresh.result, { action: "control-plane-refresh" }, "native HWLAB control-plane refresh failed", startedAt);
  }

  const result = runNativeTektonPipelineRun(namespace, pipelineRun, manifest, options.wait, remainingSeconds(startedAt, timeoutSeconds), registry.controller.budgets);
  const payload = parseJsonObject(result.stdout) ?? {};
  const pipelineRunCompleted = payload.completed === true;
  const failed = payload.failed === true || result.exitCode !== 0;

  // Flush and closeout only apply to a waited run whose PipelineRun finished cleanly.
  let flush: ReturnType<typeof runNativeGitMirrorStage> = null;
  let closeout: NativeCloseoutWaitResult | null = null;
  if (!failed && options.wait && pipelineRunCompleted) {
    flush = runNativeGitMirrorStage(registry, follower, observedSha, "flush", remainingSeconds(startedAt, timeoutSeconds));
    if (flush !== null && !flush.result.ok) {
      return nativeK8sStageFailure(follower, observedSha, "git-mirror-flush", flush.jobName, flush.result, { action: "flush" }, "native git-mirror flush failed", startedAt);
    }
    closeout = await waitNativeFollowerCloseout(registry, follower, observedSha, options, remainingSeconds(startedAt, timeoutSeconds));
  }

  const nativeCapabilities = {
    reuseConfig: summarizeRuntimeReuseConfig(reuseConfig),
    gitMirrorSync: sync === null ? null : sync.result,
    controlPlaneRefresh: refresh.result,
    gitMirrorFlush: flush === null ? null : flush.result,
  };
  return nativeTektonTriggerResult({
    follower,
    observedSha,
    namespace,
    pipelineRun,
    stageRef: `${follower.source.snapshotPrefix.replace(/\/+$/u, "")}/${observedSha}`,
    wait: options.wait,
    result,
    startedAt,
    payload: { ...payload, nativeCapabilities },
    closeout,
    successMessage: `native HWLAB PipelineRun ${pipelineRun} succeeded and runtime reached ${shortSha(observedSha)}`,
  });
}
/**
 * Run the in-cluster trigger sequence for an `agentrun-yaml-lane` follower.
 *
 * Stages, in order: git-mirror sync, runtime reuse-config validation, image build job, GitOps
 * publish job, git-mirror flush, Tekton PipelineRun submission and, only when waiting and the
 * PipelineRun completed without failure, the native closeout wait. Each stage receives what is
 * left of `timeoutSeconds` (capped by a stage budget where one is applied). The first failing
 * stage short-circuits with a failure result.
 *
 * @param observedSha Source sha to build and deploy; null yields an `observed-sha-missing` error result.
 * @param timeoutSeconds Overall budget for the whole sequence.
 */
async function executeNativeAgentRunTrigger(registry: BranchFollowerRegistry, follower: FollowerSpec, observedSha: string | null, options: ParsedOptions, timeoutSeconds: number): Promise<TriggerResult> {
  if (observedSha === null) {
    return nativeTriggerError(follower, "native AgentRun trigger requires observed source sha", "observed-sha-missing");
  }
  const tekton = follower.nativeStatus.tekton;
  if (tekton === null) {
    return nativeTriggerError(follower, "native AgentRun trigger requires Tekton nativeStatus", "tekton-native-status-missing");
  }
  const { configPath, spec } = resolveAgentRunLaneTarget({ node: follower.target.node, lane: follower.target.lane });
  const startedAt = Date.now();
  const stageRef = `${follower.source.snapshotPrefix.replace(/\/+$/u, "")}/${observedSha}`;
  const jobPrefix = `agentrun-bf-${spec.nodeId.toLowerCase()}-${spec.lane}`;
  const shaTag = observedSha.slice(0, 12);

  const sync = runNativeGitMirrorStage(registry, follower, observedSha, "sync", Math.min(remainingSeconds(startedAt, timeoutSeconds), follower.budgets.sourceSyncSeconds));
  if (sync !== null && !sync.result.ok) {
    return nativeK8sStageFailure(follower, observedSha, "git-mirror-sync", sync.jobName, sync.result, { action: "sync" }, "native AgentRun git-mirror sync failed", startedAt);
  }

  const reuseConfig = requireFollowerRuntimeReuseConfig(follower, observedSha, Math.min(remainingSeconds(startedAt, timeoutSeconds), follower.budgets.statusSeconds));
  if (!reuseConfig.ok) return nativeReuseConfigFailure(follower, observedSha, reuseConfig, startedAt);
  const reuseServiceError = requiredReuseServiceError(reuseConfig, ["agentrun-mgr", "manager"], "envReuse");
  if (reuseServiceError !== null) {
    return nativeReuseConfigFailure(follower, observedSha, invalidRuntimeReuseConfig(reuseConfig, reuseServiceError), startedAt);
  }
  const effectiveSpec = applyAgentRunReuseConfig(spec, reuseConfig);

  // Image build: the digest and env identity are parsed out of the job's log tail.
  const buildJob = `${jobPrefix}-build-${shaTag}`.slice(0, 63);
  const build = runNativeK8sJob(
    effectiveSpec.ci.namespace,
    buildJob,
    yamlLaneK3sBuildImageJobManifest(effectiveSpec, observedSha, buildJob),
    Math.min(remainingSeconds(startedAt, timeoutSeconds), effectiveSpec.deployment.manager.imageBuild.timeoutSeconds),
    "buildkit",
    registry.controller.budgets,
  );
  const buildPayload = yamlLaneGitopsPublishPayloadFromProbe({ logsTail: stringOrNull(build.logsTail) ?? "" });
  const digest = stringOrNull(buildPayload.digest);
  const envIdentity = stringOrNull(buildPayload.envIdentity);
  if (!build.ok || digest === null || envIdentity === null) {
    return nativeK8sStageFailure(follower, observedSha, "image-build", buildJob, build, buildPayload, "native AgentRun image build failed", startedAt);
  }

  // GitOps publish: render the files for the built image and publish them through a job.
  const image = agentRunImageArtifact(effectiveSpec, { sourceCommit: observedSha, envIdentity, digest, status: stringOrNull(buildPayload.status) ?? "built" });
  const renderedFiles = renderAgentRunGitopsFiles(effectiveSpec, { sourceCommit: observedSha, image });
  const publishJob = `${jobPrefix}-gitops-${shaTag}`.slice(0, 63);
  const publish = runNativeK8sJob(
    effectiveSpec.gitMirror.namespace,
    publishJob,
    yamlLaneGitopsPublishJobManifest(effectiveSpec, renderedFiles, publishJob),
    remainingSeconds(startedAt, timeoutSeconds),
    "publish",
    registry.controller.budgets,
  );
  const publishPayload = yamlLaneGitopsPublishPayloadFromProbe({ logsTail: stringOrNull(publish.logsTail) ?? "" });
  if (!publish.ok || publishPayload.ok === false || stringOrNull(publishPayload.gitopsCommit) === null) {
    return nativeK8sStageFailure(follower, observedSha, "gitops-publish", publishJob, publish, publishPayload, "native AgentRun GitOps publish failed", startedAt);
  }

  const flush = runNativeGitMirrorStage(registry, follower, observedSha, "flush", remainingSeconds(startedAt, timeoutSeconds));
  if (flush !== null && !flush.result.ok) {
    return nativeK8sStageFailure(follower, observedSha, "git-mirror-flush", flush.jobName, flush.result, { action: "flush" }, "native AgentRun git-mirror flush failed", startedAt);
  }

  const pipelineRun = agentRunPipelineRunName(effectiveSpec, observedSha);
  const tektonResult = runNativeTektonPipelineRun(
    tekton.namespace,
    pipelineRun,
    yamlLanePipelineRunManifest(effectiveSpec, observedSha, pipelineRun),
    options.wait,
    remainingSeconds(startedAt, timeoutSeconds),
    registry.controller.budgets,
  );
  const tektonPayload = parseJsonObject(tektonResult.stdout) ?? {};
  const pipelineRunCompleted = tektonPayload.completed === true;
  const failed = tektonPayload.failed === true || tektonResult.exitCode !== 0;

  // The closeout wait only applies to a waited run whose PipelineRun finished cleanly.
  let closeout: NativeCloseoutWaitResult | null = null;
  if (!failed && options.wait && pipelineRunCompleted) {
    closeout = await waitNativeFollowerCloseout(registry, follower, observedSha, options, remainingSeconds(startedAt, timeoutSeconds));
  }

  const agentrun = {
    configPath,
    reuseConfig: summarizeRuntimeReuseConfig(reuseConfig),
    gitMirrorSync: sync === null ? null : { jobName: sync.jobName, payload: sync.result },
    imageBuild: { jobName: buildJob, result: build, payload: buildPayload },
    gitopsPublish: { jobName: publishJob, result: publish, payload: publishPayload },
    gitMirrorFlush: flush === null ? null : { jobName: flush.jobName, payload: flush.result },
  };
  return nativeTektonTriggerResult({
    follower,
    observedSha,
    namespace: tekton.namespace,
    pipelineRun,
    stageRef,
    wait: options.wait,
    result: tektonResult,
    startedAt,
    payload: { ...tektonPayload, agentrun },
    closeout,
    successMessage: `native AgentRun PipelineRun ${pipelineRun} succeeded and runtime reached ${shortSha(observedSha)}`,
  });
}
/**
 * Build the failed trigger result used when a native trigger cannot start at all.
 *
 * @param follower Follower whose adapter name is recorded in the command details.
 * @param message Human-readable failure message.
 * @param reason Machine-readable reason code stored in the command details.
 * @returns A result with `ok` and `completed` false and no job id.
 */
function nativeTriggerError(follower: FollowerSpec, message: string, reason: string): TriggerResult {
  const command = {
    mode: "k8s-native-tekton",
    adapter: follower.adapter,
    ok: false,
    reason,
    parsedDownstreamCliOutput: false,
  };
  return { ok: false, completed: false, message, jobId: null, command };
}
/**
 * Build the failed trigger result for a native Kubernetes job stage.
 *
 * The message joins the given text with whichever job details are available: a timeout marker,
 * the condition reason, the condition message and a 500-character tail of the job logs.
 *
 * @param phase Stage name recorded in the command details (e.g. "git-mirror-sync").
 * @param jobName Job name, also used as the result's `jobId`.
 * @param job Result of the job that failed.
 * @param payload Stage-specific payload stored alongside the job result.
 * @param message Leading text of the failure message.
 * @param startedAt Epoch milliseconds when the overall trigger started; when omitted, `startedAt`
 *        is reported as null and `elapsedMs` falls back to the job's own elapsed time.
 */
function nativeK8sStageFailure(
  follower: FollowerSpec,
  observedSha: string,
  phase: string,
  jobName: string,
  job: NativeK8sJobResult,
  payload: Record<string, unknown>,
  message: string,
  startedAt?: number,
): TriggerResult {
  const parts: string[] = [message];
  if (job.timedOut) parts.push("timedOut=true");
  if (job.conditionReason !== null) parts.push(`reason=${job.conditionReason}`);
  if (job.conditionMessage !== null) parts.push(`condition=${job.conditionMessage}`);
  if (job.logsTail !== null) parts.push(`logs=${tailText(job.logsTail, 500)}`);

  const hasStart = startedAt !== undefined;
  return {
    ok: false,
    completed: false,
    message: parts.join("; "),
    jobId: jobName,
    command: {
      mode: "k8s-native-job",
      adapter: follower.adapter,
      phase,
      jobName,
      sourceCommit: observedSha,
      ok: false,
      startedAt: hasStart ? new Date(startedAt).toISOString() : null,
      finishedAt: new Date().toISOString(),
      elapsedMs: hasStart ? Date.now() - startedAt : job.elapsedMs,
      payload,
      job,
      statusAuthority: "kubernetes-api-serviceaccount",
      parsedDownstreamCliOutput: false,
    },
  };
}
/**
 * Run the git-mirror `sync` or `flush` job for a follower, if its adapter has one.
 *
 * @param jobKey Key passed on for job naming; defaults to the observed sha.
 * @returns The job name, namespace and job result, or null when no git-mirror job applies to the follower.
 */
function runNativeGitMirrorStage(registry: BranchFollowerRegistry, follower: FollowerSpec, observedSha: string, action: "sync" | "flush", timeoutSeconds: number, jobKey = observedSha): { jobName: string; namespace: string; result: NativeK8sJobResult } | null {
  const plan = nativeGitMirrorJobForFollower(follower, jobKey, action);
  if (plan === null) return null;
  const { namespace, jobName, manifest } = plan;
  return {
    jobName,
    namespace,
    result: runNativeK8sJob(namespace, jobName, manifest, timeoutSeconds, action, registry.controller.budgets),
  };
}
/**
 * Resolve the namespace, job name and manifest of the git-mirror job for a follower.
 *
 * Only the `hwlab-node-runtime` and `agentrun-yaml-lane` adapters have a git-mirror job.
 *
 * @param observedSha Key used when deriving the job name.
 * @param action Which git-mirror operation the job performs.
 * @returns The job description, or null for any other adapter.
 */
function nativeGitMirrorJobForFollower(
  follower: FollowerSpec,
  observedSha: string,
  action: "sync" | "flush",
): { namespace: string; jobName: string; manifest: Record<string, unknown> } | null {
  const jobName = nativeCapabilityJobName(follower.id, action, observedSha);
  switch (follower.adapter) {
    case "hwlab-node-runtime": {
      const laneSpec = hwlabRuntimeLaneSpecForNode(follower.target.lane, follower.target.node);
      const mirror = nodeRuntimeGitMirrorTarget(laneSpec);
      const manifest = nodeRuntimeGitMirrorJobManifest(mirror, action, jobName);
      return { namespace: mirror.namespace, jobName, manifest };
    }
    case "agentrun-yaml-lane": {
      const { spec } = resolveAgentRunLaneTarget({ node: follower.target.node, lane: follower.target.lane });
      const manifest = yamlLaneGitMirrorJobManifest(spec, action, jobName);
      return { namespace: spec.gitMirror.namespace, jobName, manifest };
    }
    default:
      return null;
  }
}
/**
 * Turn the outcome of a native Tekton PipelineRun (plus optional closeout wait) into a trigger result.
 *
 * The run counts as failed when the payload says so or the command exited non-zero. The result is
 * `ok` when it did not fail and the closeout was either not attempted or completed; it is
 * `completed` only when the closeout completed. `finishedAt` is set only for terminal outcomes
 * (failure, command timeout, closeout completed or closeout timed out).
 */
function nativeTektonTriggerResult(input: {
  follower: FollowerSpec;
  observedSha: string;
  namespace: string;
  pipelineRun: string;
  stageRef: string;
  wait: boolean;
  result: CommandResult;
  startedAt: number;
  payload: Record<string, unknown>;
  closeout: NativeCloseoutWaitResult | null;
  successMessage: string;
}): TriggerResult {
  const { payload, result, closeout, pipelineRun } = input;
  const pipelineRunCompleted = payload.completed === true;
  const failed = payload.failed === true || result.exitCode !== 0;
  const stillRunning = payload.stillRunning === true || payload.timedOutWait === true;
  const closeoutCompleted = closeout?.completed === true;
  const closeoutTimedOut = closeout?.timedOut === true;

  // Pick the message for the most specific state that applies, in priority order.
  let message: string;
  if (failed) {
    message = nativeTektonFailureText(payload, result);
  } else if (closeoutCompleted) {
    message = input.successMessage;
  } else if (closeoutTimedOut) {
    message = `native PipelineRun ${pipelineRun} succeeded but runtime closeout did not converge within budget`;
  } else if (pipelineRunCompleted) {
    message = `native PipelineRun ${pipelineRun} succeeded; runtime closeout remains k8s-native`;
  } else if (stillRunning) {
    message = `native PipelineRun ${pipelineRun} is still running; query status/events/logs for closeout`;
  } else {
    message = `native PipelineRun ${pipelineRun} submitted`;
  }

  const ok = !failed && (closeout === null || closeoutCompleted);
  const terminal = failed || result.timedOut || closeoutCompleted || closeoutTimedOut;
  const finishedAt = terminal ? new Date().toISOString() : null;

  return {
    ok,
    completed: closeoutCompleted,
    message,
    jobId: pipelineRun,
    command: {
      mode: "k8s-native-tekton",
      adapter: input.follower.adapter,
      namespace: input.namespace,
      pipelineRun,
      sourceCommit: input.observedSha,
      sourceStageRef: input.stageRef,
      wait: input.wait,
      pipelineRunCompleted,
      stillRunning,
      startedAt: new Date(input.startedAt).toISOString(),
      finishedAt,
      elapsedMs: Date.now() - input.startedAt,
      closeout,
      statusAuthority: "kubernetes-api-serviceaccount",
      parsedDownstreamCliOutput: false,
      payload,
      exitCode: result.exitCode,
      timedOut: result.timedOut,
      // The stderr tail is only recorded for failures, and always redacted.
      stderrTail: failed ? redactText(tailText(result.stderr, 1000)) : "",
    },
  };
}
/**
 * Compose the failure message for a native PipelineRun from the payload and command result.
 *
 * Non-empty pieces (payload message, condition reason, condition message, a failed marker and a
 * timeout marker) are joined with "; ". A generic fallback is returned when nothing is available.
 */
function nativeTektonFailureText(payload: Record<string, unknown>, result: CommandResult): string {
  const candidates: Array<string | null> = [
    stringOrNull(payload.message),
    stringOrNull(payload.conditionReason),
    stringOrNull(payload.conditionMessage),
    payload.failed === true ? "PipelineRun failed" : null,
    result.timedOut ? "timedOut=true" : null,
  ];
  const parts: string[] = [];
  for (const candidate of candidates) {
    if (candidate !== null && candidate.length > 0) parts.push(candidate);
  }
  const joined = parts.join("; ");
  if (joined.length > 0) return joined;
  return "native PipelineRun failed; query events/logs for structured details";
}
/**
 * Seconds left of a budget that started at `startedAt`.
 *
 * Elapsed time is rounded up to whole seconds, and the result never drops below 1.
 *
 * @param startedAt Epoch milliseconds when the budget started.
 * @param timeoutSeconds Total budget in seconds.
 */
function remainingSeconds(startedAt: number, timeoutSeconds: number): number {
  const elapsedSeconds = Math.ceil((Date.now() - startedAt) / 1000);
  const left = timeoutSeconds - elapsedSeconds;
  return left < 1 ? 1 : left;
}
/**
 * Submits the sentinel publish PipelineRun for `observedSha` and, when
 * `options.wait` is set and the PipelineRun completed without failure, waits
 * for the runtime closeout within the remaining budget.
 *
 * Fails early (without submitting anything) when the observed sha or
 * `target.sentinel` is missing, or when the reuse config read from the source
 * snapshot is invalid or lacks an `envReuse` declaration for the sentinel
 * service.
 *
 * @param registry Registry providing controller budgets.
 * @param follower Follower whose sentinel target is triggered.
 * @param observedSha Source commit to publish; `null` yields a failure result.
 * @param options Parsed CLI options (`wait`, `tailBytes` are read here).
 * @param timeoutSeconds Overall budget for the PipelineRun plus closeout.
 * @returns Trigger outcome; `completed` is true only when closeout converged.
 */
async function executeNativeSentinelTrigger(registry: BranchFollowerRegistry, follower: FollowerSpec, observedSha: string | null, options: ParsedOptions, timeoutSeconds: number): Promise<TriggerResult> {
  if (observedSha === null) {
    return {
      ok: false,
      completed: false,
      message: "native sentinel trigger requires observed source sha",
      jobId: null,
      command: { mode: "k8s-native-tekton", ok: false, reason: "observed-sha-missing", parsedDownstreamCliOutput: false },
    };
  }
  if (follower.target.sentinel === null) {
    return {
      ok: false,
      completed: false,
      message: "native sentinel trigger requires target.sentinel",
      jobId: null,
      command: { mode: "k8s-native-tekton", ok: false, reason: "target-sentinel-missing", parsedDownstreamCliOutput: false },
    };
  }
  const spec = hwlabRuntimeLaneSpecForNode(follower.target.lane, follower.target.node);
  const stageRef = `${follower.source.snapshotPrefix.replace(/\/+$/u, "")}/${observedSha}`;
  const reuseConfig = requireFollowerRuntimeReuseConfig(follower, observedSha, Math.min(timeoutSeconds, follower.budgets.statusSeconds));
  if (!reuseConfig.ok) return nativeReuseConfigFailure(follower, observedSha, reuseConfig);
  const sentinelReuseError = requiredReuseServiceError(reuseConfig, ["web-probe-sentinel", "monitor-web"], "envReuse");
  if (sentinelReuseError !== null) return nativeReuseConfigFailure(follower, observedSha, invalidRuntimeReuseConfig(reuseConfig, sentinelReuseError));
  const state = loadSentinelCicdState(spec, follower.target.sentinel, timeoutSeconds, "cached", {
    commit: observedSha,
    stageRef,
    mirrorCommit: observedSha,
    sourceAuthority: "git-mirror-snapshot",
  }, reuseConfig);
  const pipelineRun = sentinelPipelineRunName(state, false);
  const namespace = stringField(recordField(state.cicd, "builder", `${follower.id}.sentinel.cicd`), "namespace", `${follower.id}.sentinel.cicd.builder`);
  const manifest = sentinelPublishPipelineRunManifest(state, pipelineRun, true);
  const startedAt = Date.now();
  const result = runNativeTektonPipelineRun(namespace, pipelineRun, manifest, options.wait, timeoutSeconds, registry.controller.budgets);
  const payload = parseJsonObject(result.stdout) ?? {};
  const pipelineRunCompleted = payload.completed === true;
  const failed = payload.failed === true || result.exitCode !== 0;
  const stillRunning = payload.stillRunning === true || payload.timedOutWait === true;
  // Budget left for closeout, computed with the shared helper instead of a
  // local that shadows it.
  const closeoutBudgetSeconds = remainingSeconds(startedAt, timeoutSeconds);
  const closeout = !failed && options.wait && pipelineRunCompleted
    ? await waitNativeSentinelCloseout(registry, follower, observedSha, options, closeoutBudgetSeconds)
    : null;
  // The failure message is derived from raw command output, so it is redacted
  // like the other stderr-derived strings in this file (see stderrTail below).
  const message = failed
    ? redactText(tailText(result.stderr || stringOrNull(payload.message) || result.stdout, 500))
    : closeout?.completed === true
      ? `native sentinel PipelineRun ${pipelineRun} succeeded and runtime reached ${shortSha(observedSha)}`
      : closeout?.timedOut === true
        ? `native sentinel PipelineRun ${pipelineRun} succeeded but runtime closeout did not converge within budget`
        : pipelineRunCompleted
          ? `native sentinel PipelineRun ${pipelineRun} succeeded; runtime closeout remains k8s-native`
          : stillRunning
            ? `native sentinel PipelineRun ${pipelineRun} is still running; query status/events/logs for closeout`
            : `native sentinel PipelineRun ${pipelineRun} submitted`;
  // A skipped closeout (null) does not count against `ok`; an attempted one must complete.
  const ok = !failed && (closeout === null || closeout.completed === true);
  // finishedAt is only stamped once the attempt reached a terminal outcome.
  const finishedAt = failed || result.timedOut || closeout?.completed === true || closeout?.timedOut === true
    ? new Date().toISOString()
    : null;
  return {
    ok,
    completed: closeout?.completed === true,
    message,
    jobId: pipelineRun,
    command: {
      mode: "k8s-native-tekton",
      adapter: follower.adapter,
      namespace,
      pipelineRun,
      sourceCommit: observedSha,
      sourceStageRef: stageRef,
      wait: options.wait,
      pipelineRunCompleted,
      stillRunning,
      startedAt: new Date(startedAt).toISOString(),
      finishedAt,
      elapsedMs: Date.now() - startedAt,
      closeout,
      reuseConfig: summarizeRuntimeReuseConfig(reuseConfig),
      statusAuthority: "kubernetes-api-serviceaccount",
      parsedDownstreamCliOutput: false,
      payload,
      exitCode: result.exitCode,
      timedOut: result.timedOut,
      stderrTail: failed ? redactText(tailText(result.stderr, Math.min(options.tailBytes, 4000))) : "",
    },
  };
}
/**
 * Polls adapter status until the follower is aligned on `observedSha`, a
 * git-mirror flush fails, or the budget runs out.
 *
 * An Argo refresh is issued once up front when the follower declares an Argo
 * application. During polling, at most one git-mirror flush is attempted once
 * the Tekton summary reports success and the mirror still needs flushing; the
 * loop re-polls immediately after a successful flush.
 *
 * @param timeoutSeconds Budget for the poll loop (the deadline starts after the refresh call).
 * @returns Closeout outcome including poll count, elapsed time and the latest status summary.
 */
async function waitNativeSentinelCloseout(
  registry: BranchFollowerRegistry,
  follower: FollowerSpec,
  observedSha: string,
  options: ParsedOptions,
  timeoutSeconds: number,
): Promise<NativeCloseoutWaitResult> {
  const argo = follower.nativeStatus.argo;
  const refreshResult = argo === null
    ? null
    : runNativeArgoRefresh(argo, Math.min(timeoutSeconds, follower.budgets.controlPlaneRefreshSeconds));
  const startedAt = Date.now();
  const deadline = startedAt + Math.max(1, timeoutSeconds) * 1000;
  let polls = 0;
  let latest: AdapterSummary | null = null;
  let gitMirrorFlush: Record<string, unknown> | null = null;

  // Single place that snapshots the loop state into a result object.
  const snapshot = (ok: boolean, completed: boolean, timedOut: boolean): NativeCloseoutWaitResult => ({
    ok,
    completed,
    timedOut,
    polls,
    elapsedMs: Date.now() - startedAt,
    refresh: refreshResult === null ? null : commandCompact(refreshResult, options),
    gitMirrorFlush,
    summary: latest === null ? null : nativeCloseoutSummary(latest),
    statusAuthority: "k8s-native",
    parsedDownstreamCliOutput: false,
  });

  while (Date.now() <= deadline) {
    polls += 1;
    const secondsLeft = Math.max(1, Math.ceil((deadline - Date.now()) / 1000));
    const status = await readAdapterStatus(registry, follower, { ...options, timeoutSeconds: Math.min(10, secondsLeft) });
    latest = status;
    const statusPayload = asOptionalRecord(status.payload);
    const mirror = asOptionalRecord(statusPayload?.gitMirror);
    const onObservedSha = status.observedSha === observedSha;
    const flushDue = onObservedSha
      && asOptionalRecord(statusPayload?.tekton)?.succeeded === true
      && gitMirrorFlush === null
      && shouldFlushNativeGitMirrorDuringCloseout(follower, mirror);
    if (flushDue) {
      const flush = runNativeGitMirrorStage(
        registry,
        follower,
        observedSha,
        "flush",
        Math.min(secondsLeft, follower.budgets.sourceSyncSeconds),
        stringOrNull(mirror?.localGitops) ?? observedSha,
      );
      gitMirrorFlush = flush === null ? null : {
        jobName: flush.jobName,
        namespace: flush.namespace,
        result: compactNativeK8sJobResult(flush.result),
      };
      if (flush !== null && !flush.result.ok) return snapshot(false, false, flush.result.timedOut);
      // Re-read status right away after the flush attempt.
      continue;
    }
    if (onObservedSha && status.aligned === true) return snapshot(true, true, false);
    // Stop when another 2s pause would overshoot the deadline.
    if (Date.now() + 2_000 > deadline) break;
    runCommand(["sleep", "2"], repoRoot, { timeoutMs: 3_000 });
  }
  return snapshot(false, false, true);
}
/**
 * Reads the runtime reuse config for `observedSha` out of the follower's git
 * repository via `git show <stageRef>:<RUNTIME_REUSE_CONFIG_PATH>`.
 *
 * @returns The parsed config on success, or a "missing" config carrying a
 *          redacted tail of the git output when the command exits non-zero.
 */
function requireFollowerRuntimeReuseConfig(follower: FollowerSpec, observedSha: string, timeoutSeconds: number): RuntimeReuseConfig {
  const snapshotPrefix = follower.source.snapshotPrefix.replace(/\/+$/u, "");
  const stageRef = `${snapshotPrefix}/${observedSha}`;
  const provenance = { sourceCommit: observedSha, stageRef };
  const argv = ["git", "--git-dir", follower.nativeStatus.source.repoPath, "show", `${stageRef}:${RUNTIME_REUSE_CONFIG_PATH}`];
  const shown = runCommand(argv, repoRoot, { timeoutMs: Math.max(1, timeoutSeconds) * 1000 });
  if (shown.exitCode === 0) return parseRuntimeReuseConfig(shown.stdout, provenance);
  const detail = shown.stderr || shown.stdout || `${RUNTIME_REUSE_CONFIG_PATH} missing`;
  return missingRuntimeReuseConfig(provenance, redactText(tailText(detail, 500)));
}
/**
 * Builds the failed TriggerResult returned when the reuse config is missing
 * or invalid.
 *
 * @param startedAt Optional epoch-ms start; when omitted, `startedAt` and
 *                  `elapsedMs` in the command record are `null`.
 */
function nativeReuseConfigFailure(follower: FollowerSpec, observedSha: string, reuseConfig: RuntimeReuseConfig, startedAt?: number): TriggerResult {
  const reason = reuseConfig.errors[0] ?? "invalid reuse config";
  const message = `${RUNTIME_REUSE_CONFIG_PATH} is required for ${follower.id}: ${reason}`;
  const startedIso = startedAt === undefined ? null : new Date(startedAt).toISOString();
  const finishedIso = new Date().toISOString();
  const elapsedMs = startedAt === undefined ? null : Date.now() - startedAt;
  return {
    ok: false,
    completed: false,
    message,
    jobId: null,
    command: {
      mode: "k8s-native-reuse-config",
      adapter: follower.adapter,
      sourceCommit: observedSha,
      ok: false,
      startedAt: startedIso,
      finishedAt: finishedIso,
      elapsedMs,
      reuseConfig: summarizeRuntimeReuseConfig(reuseConfig),
      statusAuthority: "git-mirror-snapshot",
      parsedDownstreamCliOutput: false,
    },
  };
}
/**
 * Validates that the reuse config declares a usable section for one of the
 * given service ids.
 *
 * @param ids Candidate service ids passed to the service lookup.
 * @param mode Which section must be present, enabled and non-empty.
 * @returns `null` when the requirement is met, otherwise an error sentence.
 */
function requiredReuseServiceError(reuseConfig: RuntimeReuseConfig, ids: readonly string[], mode: "runtimeReuse" | "envReuse"): string | null {
  const service = runtimeReuseService(reuseConfig, ids);
  if (service === null) return `${RUNTIME_REUSE_CONFIG_PATH} must declare service ${ids.join("|")}`;
  const subject = `${RUNTIME_REUSE_CONFIG_PATH} service ${service.id}`;

  if (mode !== "runtimeReuse") {
    const envReuse = service.envReuse;
    if (envReuse === null) return `${subject} must declare envReuse`;
    if (envReuse.enabled === false) return `${subject} envReuse is disabled`;
    const declaresEnvIdentity = envReuse.envIdentityFiles.length !== 0 || envReuse.nodeDepsPath !== null;
    return declaresEnvIdentity ? null : `${subject} envReuse must declare envIdentityFiles or nodeDepsPath`;
  }

  const runtimeReuse = service.runtimeReuse;
  if (runtimeReuse === null) return `${subject} must declare runtimeReuse`;
  if (runtimeReuse.enabled === false) return `${subject} runtimeReuse is disabled`;
  const declaresIdentityPaths = runtimeReuse.codeIdentityPaths.length !== 0 || runtimeReuse.envIdentityPaths.length !== 0;
  return declaresIdentityPaths ? null : `${subject} runtimeReuse must declare codeIdentity or envIdentity paths`;
}
/**
 * Stamps the reuse-config path and sha256 onto the manifest's
 * `metadata.annotations`, keeping any annotations already present.
 *
 * Mutates and returns the given `manifest`.
 */
function annotatePipelineRunReuseConfig(manifest: Record<string, unknown>, reuseConfig: RuntimeReuseConfig): Record<string, unknown> {
  const metadata = asOptionalRecord(manifest.metadata) ?? {};
  const existing = asOptionalRecord(metadata.annotations) ?? {};
  const merged = {
    ...existing,
    "unidesk.ai/reuse-config-path": reuseConfig.path,
    "unidesk.ai/reuse-config-sha256": reuseConfig.sha256 ?? "",
  };
  metadata.annotations = merged;
  manifest.metadata = metadata;
  return manifest;
}
/**
 * Returns a copy of `spec` whose manager image build takes `buildArgs` and
 * `envIdentityFiles` from the reuse config's `envReuse` section.
 *
 * The input spec is returned untouched when no matching service or `envReuse`
 * section exists, or when `envReuse` is disabled. Empty values in `envReuse`
 * leave the spec's own values in place.
 */
function applyAgentRunReuseConfig(spec: ReturnType<typeof resolveAgentRunLaneTarget>["spec"], reuseConfig: RuntimeReuseConfig): ReturnType<typeof resolveAgentRunLaneTarget>["spec"] {
  const envReuse = runtimeReuseService(reuseConfig, ["agentrun-mgr", "manager"])?.envReuse;
  if (envReuse === undefined || envReuse === null || envReuse.enabled === false) return spec;

  const manager = spec.deployment.manager;
  const currentBuild = manager.imageBuild;
  const buildArgs = Object.keys(envReuse.buildArgs).length !== 0 ? envReuse.buildArgs : currentBuild.buildArgs;
  const envIdentityFiles = envReuse.envIdentityFiles.length !== 0 ? envReuse.envIdentityFiles : currentBuild.envIdentityFiles;
  const imageBuild = { ...currentBuild, buildArgs, envIdentityFiles };

  return {
    ...spec,
    deployment: {
      ...spec.deployment,
      manager: { ...manager, imageBuild },
    },
  };
}
/**
 * Decides whether closeout should run a git-mirror flush.
 *
 * True only when the follower requires a git mirror, a mirror summary exists,
 * its source snapshot is not explicitly unready, and it either reports a
 * pending flush or reports GitHub as out of sync.
 */
function shouldFlushNativeGitMirrorDuringCloseout(follower: FollowerSpec, gitMirror: Record<string, unknown> | null): boolean {
  const mirrorRequired = nativeGitMirrorRequired(follower);
  if (gitMirror === null || !mirrorRequired) return false;
  const snapshotUnready = gitMirror.sourceSnapshotReady === false;
  const flushOutstanding = gitMirror.pendingFlush === true || gitMirror.githubInSync === false;
  return !snapshotUnready && flushOutstanding;
}
/**
 * Projects a native K8s job result onto a fixed set of summary fields and
 * pins `parsedDownstreamCliOutput` to false. Any other fields on `result`
 * are left out.
 */
function compactNativeK8sJobResult(result: NativeK8sJobResult): Record<string, unknown> {
  const {
    ok,
    completed,
    failed,
    timedOut,
    created,
    reused,
    jobName,
    namespace,
    polls,
    elapsedMs,
    conditionReason,
    conditionMessage,
    statusAuthority,
  } = result;
  return {
    ok,
    completed,
    failed,
    timedOut,
    created,
    reused,
    jobName,
    namespace,
    polls,
    elapsedMs,
    conditionReason,
    conditionMessage,
    statusAuthority,
    parsedDownstreamCliOutput: false,
  };
}
/**
 * Generic-follower entry point for closeout waiting; forwards all arguments
 * to `waitNativeSentinelCloseout` unchanged.
 */
async function waitNativeFollowerCloseout(
  registry: BranchFollowerRegistry,
  follower: FollowerSpec,
  observedSha: string,
  options: ParsedOptions,
  timeoutSeconds: number,
): Promise<NativeCloseoutWaitResult> {
  const closeout = await waitNativeSentinelCloseout(registry, follower, observedSha, options, timeoutSeconds);
  return closeout;
}
/**
 * Builds the command record for a run that only performed closeout waiting.
 *
 * `finishedAt` is stamped when the closeout completed or timed out;
 * `exitCode` is 0 only for a completed closeout.
 */
function closeoutOnlyCommand(follower: FollowerSpec, pipelineRun: string | null, observedSha: string, closeout: NativeCloseoutWaitResult): Record<string, unknown> {
  const settled = closeout.completed || closeout.timedOut;
  const finishedAt = settled ? new Date().toISOString() : null;
  const exitCode = closeout.completed ? 0 : 1;
  return {
    mode: "k8s-native-closeout",
    adapter: follower.adapter,
    pipelineRun,
    sourceCommit: observedSha,
    wait: true,
    closeout,
    finishedAt,
    elapsedMs: closeout.elapsedMs,
    exitCode,
    timedOut: closeout.timedOut,
    statusAuthority: "k8s-native",
    parsedDownstreamCliOutput: false,
  };
}
/**
 * Condenses a live adapter summary into the record stored with a closeout
 * result: top-level status fields plus selected, compacted payload sections
 * and at most the first five payload errors.
 */
function nativeCloseoutSummary(live: AdapterSummary): Record<string, unknown> {
  const payload = asOptionalRecord(live.payload);
  const rawErrors = payload?.errors;
  const errors = Array.isArray(rawErrors) ? rawErrors.slice(0, 5) : [];
  const { ok, phase, observedSha, targetSha, aligned, pipelineRun, pipelineRunPresent, message } = live;
  return {
    ok,
    phase,
    observedSha,
    targetSha,
    aligned,
    pipelineRun,
    pipelineRunPresent,
    message,
    gitMirror: asOptionalRecord(payload?.gitMirror),
    reuseConfig: summarizeRuntimeReuseConfigFromRecord(asOptionalRecord(payload?.reuseConfig)),
    tekton: asOptionalRecord(payload?.tekton),
    taskRuns: compactTaskRunsPayload(asOptionalRecord(payload?.taskRuns)),
    planArtifacts: compactPlanArtifactsPayload(asOptionalRecord(payload?.planArtifacts)),
    argo: asOptionalRecord(payload?.argo),
    runtime: asOptionalRecord(payload?.runtime),
    errors,
  };
}
/**
 * Requests a hard refresh of an Argo CD Application by merge-patching the
 * `argocd.argoproj.io/refresh` annotation through the Kubernetes API.
 *
 * Runs a shell script that feeds an inline Node program to `node`; that
 * program reads the service-account token and CA from the standard in-pod
 * paths and sends the PATCH over HTTPS.
 *
 * @param argo Namespace and application name of the Argo Application.
 * @param timeoutSeconds Command timeout (at least one second is used).
 */
function runNativeArgoRefresh(argo: NonNullable<NativeStatusSpec["argo"]>, timeoutSeconds: number): CommandResult {
  const refreshPatch = {
    metadata: {
      annotations: {
        "argocd.argoproj.io/refresh": "hard",
      },
    },
  };
  const patchBase64 = Buffer.from(JSON.stringify(refreshPatch), "utf8").toString("base64");

  // Shell preamble: pass inputs to the Node program through the environment.
  const shellPreamble = [
    "set -eu",
    `ARGO_NAMESPACE=${shQuote(argo.namespace)}`,
    `ARGO_APPLICATION=${shQuote(argo.application)}`,
    `PATCH_B64=${shQuote(patchBase64)}`,
    "export ARGO_NAMESPACE ARGO_APPLICATION PATCH_B64",
  ];
  // Node program delivered through a quoted heredoc, so the shell expands nothing in it.
  const nodeProgram = [
    "import { readFileSync } from 'node:fs';",
    "import https from 'node:https';",
    "const namespace = process.env.ARGO_NAMESPACE || '';",
    "const application = process.env.ARGO_APPLICATION || '';",
    "const patch = Buffer.from(process.env.PATCH_B64 || '', 'base64').toString('utf8');",
    "const host = process.env.KUBERNETES_SERVICE_HOST;",
    "const port = Number(process.env.KUBERNETES_SERVICE_PORT || '443');",
    "const token = readFileSync('/var/run/secrets/kubernetes.io/serviceaccount/token', 'utf8').trim();",
    "const ca = readFileSync('/var/run/secrets/kubernetes.io/serviceaccount/ca.crt');",
    "const path = `/apis/argoproj.io/v1alpha1/namespaces/${encodeURIComponent(namespace)}/applications/${encodeURIComponent(application)}`;",
    "const req = https.request({ host, port, path, method: 'PATCH', ca, headers: { authorization: `Bearer ${token}`, 'content-type': 'application/merge-patch+json', 'content-length': Buffer.byteLength(patch) } }, (res) => {",
    "  let body = '';",
    "  res.setEncoding('utf8');",
    "  res.on('data', (chunk) => { body += chunk; });",
    "  res.on('end', () => {",
    "    if ((res.statusCode || 0) < 200 || (res.statusCode || 0) >= 300) { process.stderr.write(body || `kube api PATCH argo application status ${res.statusCode}`); process.exit(1); }",
    "    process.stdout.write(JSON.stringify({ ok: true, namespace, application, refresh: 'hard', statusAuthority: 'kubernetes-api-serviceaccount', parsedDownstreamCliOutput: false, valuesRedacted: true }));",
    "  });",
    "});",
    "req.on('error', (error) => { process.stderr.write(error?.message || String(error)); process.exit(1); });",
    "req.write(patch);",
    "req.end();",
  ];
  const script = [
    ...shellPreamble,
    "node <<'NODE_ARGO_REFRESH'",
    ...nodeProgram,
    "NODE_ARGO_REFRESH",
  ].join("\n");
  const timeoutMs = Math.max(1, timeoutSeconds) * 1000;
  return runCommand(["sh", "-lc", script], repoRoot, { timeoutMs });
}
/**
 * Reads the follower's live status from native sources and derives the
 * alignment verdict.
 *
 * Steps: optionally run a source observation sync, read the native object
 * bundle with whatever budget is left, then evaluate the Tekton, git-mirror,
 * Argo and runtime gates. A failed source sync suppresses `observedSha` and
 * marks the result as not ok.
 *
 * @param options `timeoutSeconds` overrides the follower's status budget when set.
 * @returns Summary with observed/target sha, alignment, phase, message and a
 *          payload holding the per-source details.
 */
async function readAdapterStatus(registry: BranchFollowerRegistry, follower: FollowerSpec, options: ParsedOptions): Promise<AdapterSummary> {
  const timeoutSeconds = options.timeoutSeconds ?? follower.budgets.statusSeconds;
  const startedAt = Date.now();
  const sourceSync = runNativeSourceObservationSync(registry, follower, options, Math.min(timeoutSeconds, follower.budgets.sourceSyncSeconds));
  const sourceSyncDetail = sourceSync === null || sourceSync.result.ok ? null : redactText(tailText(sourceSync.result.conditionMessage ?? sourceSync.result.logsTail ?? "unknown", 800));
  const sourceSyncError = sourceSyncDetail === null ? null : `native source sync failed: ${sourceSyncDetail}`;
  const bundle = readNativeObjectBundle(registry, follower, options, remainingSeconds(startedAt, timeoutSeconds), runKubeScript);
  // The bundle's source commit is not trusted when the sync that should have refreshed it failed.
  const observedSha = sourceSyncError === null ? stringOrNull(bundle.source?.commit) : null;
  const runtimeTargetSha = runtimeTargetShaFromWorkloads(follower.nativeStatus.runtime, bundle.workloads);
  const pipelineRunName = stringOrNull(asOptionalRecord(bundle.pipelineRun?.metadata)?.name) ?? expectedPipelineRunName(follower, observedSha);
  const pipelineRunPresent = follower.nativeStatus.tekton === null ? null : bundle.pipelineRun !== null;
  const pipelineSucceeded = pipelineRunSucceeded(bundle.pipelineRun);
  // Gates are null when the follower does not declare that source at all.
  const argoReady = follower.nativeStatus.argo === null ? null : argoApplicationReady(bundle.argoApplication);
  const runtimeReady = follower.nativeStatus.runtime === null ? null : runtimeWorkloadsReady(follower.nativeStatus.runtime, bundle.workloads);
  const gitMirrorRequired = nativeGitMirrorRequired(follower);
  const gitMirrorReady = gitMirrorRequired ? nativeGitMirrorReady(bundle.gitMirror) : null;
  const hasTektonGate = follower.nativeStatus.tekton !== null;
  const hasRuntimeTarget = runtimeTargetSha !== null;
  const requiresRuntimeTarget = follower.nativeStatus.runtime !== null;
  const hasTargetEvidence = hasRuntimeTarget || (!requiresRuntimeTarget && hasTektonGate);
  // The Tekton gate also passes without a successful PipelineRun when the
  // runtime already runs the observed sha and neither Argo nor the runtime
  // reports a problem.
  const pipelineGateOk = !hasTektonGate
    || pipelineSucceeded === true
    || (hasRuntimeTarget && runtimeTargetSha === observedSha && argoReady !== false && runtimeReady !== false);
  const aligned = observedSha !== null
    && hasTargetEvidence
    && (requiresRuntimeTarget ? hasRuntimeTarget && runtimeTargetSha === observedSha : true)
    && pipelineGateOk
    && gitMirrorReady !== false
    && argoReady !== false
    && runtimeReady !== false;
  // Both branches of the former conditional yielded runtimeTargetSha, so the
  // target is simply whatever the runtime workloads report.
  const targetSha = runtimeTargetSha;
  const ok = bundle.ok && sourceSyncError === null;
  const phase = inferPhase(ok, aligned, observedSha, targetSha, bundle.timedOut);
  const errors = sourceSyncError === null ? bundle.errors : [sourceSyncError, ...bundle.errors];
  return {
    ok,
    command: "native:k8s-git-mirror+tekton+argocd+runtime",
    exitCode: bundle.exitCode,
    timedOut: bundle.timedOut,
    observedSha,
    targetSha,
    lastTriggeredSha: null,
    lastSucceededSha: aligned && observedSha !== null ? observedSha : null,
    pipelineRun: pipelineRunName,
    pipelineRunPresent,
    // A present PipelineRun without a verdict yet counts as in flight.
    inFlightJob: pipelineRunPresent === true && pipelineSucceeded === null && pipelineRunName !== null ? pipelineRunName : null,
    aligned,
    phase,
    message: nativeStatusMessage(ok, phase, observedSha, targetSha, {
      pipelineSucceeded,
      gitMirrorReady,
      argoReady,
      runtimeReady,
      errors,
    }),
    payload: {
      source: bundle.source,
      sourceSync: sourceSync === null ? null : sourceSync.result,
      reuseConfig: observedSha === null ? null : summarizeRuntimeReuseConfig(requireFollowerRuntimeReuseConfig(follower, observedSha, Math.min(timeoutSeconds, 5))),
      gitMirror: nativeGitMirrorSummary(bundle.gitMirror),
      tekton: nativePipelineRunSummary(bundle.pipelineRun),
      taskRuns: bundle.taskRuns,
      planArtifacts: bundle.planArtifacts,
      argo: nativeArgoSummary(bundle.argoApplication),
      runtime: nativeRuntimeSummary(follower.nativeStatus.runtime, bundle.workloads, observedSha),
      timings: { statusRead: { elapsedMs: bundle.elapsedMs, budgetSeconds: timeoutSeconds } },
      errors,
      statusAuthority: "k8s-native",
      parsedDownstreamCliOutput: false,
    },
    stderrTail: bundle.stderrTail,
    stdoutTail: bundle.stdoutTail,
  };
}
/**
 * Runs a git-mirror "sync" stage before status is read, for in-cluster runs
 * of the `hwlab-node-runtime` and `agentrun-yaml-lane` adapters only.
 *
 * The stage suffix is derived from the current time divided by the controller
 * loop interval, so calls within one interval share the same suffix.
 *
 * @returns The stage outcome, or `null` when no sync applies.
 */
function runNativeSourceObservationSync(registry: BranchFollowerRegistry, follower: FollowerSpec, options: ParsedOptions, timeoutSeconds: number): { jobName: string; namespace: string; result: NativeK8sJobResult } | null {
  const syncsSource = follower.adapter === "hwlab-node-runtime" || follower.adapter === "agentrun-yaml-lane";
  if (!options.inCluster || !syncsSource) return null;
  const intervalMs = registry.controller.loop.intervalSeconds * 1000;
  const bucket = Math.floor(Date.now() / intervalMs).toString(36);
  return runNativeGitMirrorStage(registry, follower, "source", "sync", timeoutSeconds, `source-${bucket}`);
}
/**
 * Maps status flags to a follower phase.
 *
 * Precedence: a failed or timed-out read is "Blocked"; alignment is
 * "Succeeded"; without an observed sha the phase is "Observed"; an observed
 * sha equal to the target is "ClosingOut"; anything else is "PendingTrigger".
 */
function inferPhase(ok: boolean, aligned: boolean | null, observedSha: string | null, targetSha: string | null, timedOut: boolean): BranchFollowerPhase {
  const readable = ok && !timedOut;
  if (!readable) return "Blocked";
  if (aligned === true) return "Succeeded";
  if (observedSha === null) return "Observed";
  const targetMatches = targetSha !== null && targetSha === observedSha;
  return targetMatches ? "ClosingOut" : "PendingTrigger";
}
/**
 * Produces the human-readable status line for a native status read.
 *
 * A failed read reports the first error; "Noop"/"Succeeded" report a match;
 * otherwise the observed (and, if known, target) sha is listed together with
 * every gate that is explicitly `false`.
 */
function nativeStatusMessage(ok: boolean, phase: BranchFollowerPhase, observedSha: string | null, targetSha: string | null, gates: { pipelineSucceeded: boolean | null; gitMirrorReady: boolean | null; argoReady: boolean | null; runtimeReady: boolean | null; errors: string[] }): string {
  if (!ok) return gates.errors[0] ?? "native status read failed";
  const settled = phase === "Noop" || phase === "Succeeded";
  if (settled) return `target matches ${shortSha(observedSha)}`;
  if (observedSha === null) return "k8s-native status did not expose observed source sha";

  const blockedGates: string[] = [];
  if (gates.pipelineSucceeded === false) blockedGates.push("pipelineRun not successful");
  if (gates.gitMirrorReady === false) blockedGates.push("git-mirror not flushed");
  if (gates.argoReady === false) blockedGates.push("argo not healthy/synced");
  if (gates.runtimeReady === false) blockedGates.push("runtime not ready");
  const gatesText = blockedGates.join("; ");

  const head = targetSha === null
    ? `observed ${shortSha(observedSha)}`
    : `observed ${shortSha(observedSha)} target ${shortSha(targetSha)}`;
  if (gatesText.length > 0) return `${head}; ${gatesText}`;
  return targetSha === null ? `${head} target unknown` : head;
}
/**
 * Derives the PipelineRun name expected for `observedSha` from the follower's
 * Tekton prefix; `null` when either the sha or the Tekton spec is missing.
 */
function expectedPipelineRunName(follower: FollowerSpec, observedSha: string | null): string | null {
  const tekton = follower.nativeStatus.tekton;
  if (tekton === null || observedSha === null) return null;
  const prefix = tekton.pipelineRunPrefix;
  const suffix = shortSha(observedSha);
  return `${prefix}-${suffix}`;
}
/**
 * Merges stored controller state with an optional live adapter summary into
 * the status record shown for one follower.
 *
 * Live values win over stored ones field by field. The compact form trims
 * timings; the detailed form adds the snapshot prefix, budgets, state
 * ConfigMap name, up to six stored warnings and follow-up commands.
 *
 * @param live Live summary, or `null` when only stored state is available.
 * @param liveRequested Echoed as the `live` flag of the result.
 * @param detailed Selects the detailed form.
 */
function mergeFollowerStatus(
  registry: BranchFollowerRegistry,
  follower: FollowerSpec,
  stored: Record<string, unknown>,
  live: AdapterSummary | null,
  liveRequested: boolean,
  detailed: boolean,
): Record<string, unknown> {
  const storedSource = asOptionalRecord(stored.source);
  const storedTarget = asOptionalRecord(stored.target);
  const storedTimings = asOptionalRecord(stored.timings);

  const phase = live?.phase ?? stringOrNull(stored.phase) ?? "Observed";
  const observedSha = live?.observedSha ?? stringOrNull(storedSource?.observedSha);
  const targetSha = live?.targetSha ?? stringOrNull(storedTarget?.targetSha);
  const lastTriggeredSha = live?.lastTriggeredSha ?? stringOrNull(stored.lastTriggeredSha);
  const lastSucceededSha = live?.lastSucceededSha ?? stringOrNull(stored.lastSucceededSha);
  const timings = live === null
    ? storedFollowerTimingsForStatus(follower, storedTimings, phase, observedSha)
    : buildFollowerTimings(follower, live, undefined, storedTimings, phase);

  const source = {
    repository: follower.source.repository,
    branch: follower.source.branch,
    observedSha,
  };
  const target = {
    node: follower.target.node,
    lane: follower.target.lane,
    namespace: follower.target.namespace,
    sentinel: follower.target.sentinel,
    targetSha,
  };
  const summary: Record<string, unknown> = {
    // Without a live read there is nothing that could have failed.
    ok: live === null ? true : live.ok,
    id: follower.id,
    enabled: follower.enabled,
    adapter: follower.adapter,
    phase,
    source,
    target,
    lastTriggeredSha,
    lastSucceededSha,
    pipelineRun: live?.pipelineRun ?? stringOrNull(stored.pipelineRun),
    inFlightJob: live?.inFlightJob ?? stringOrNull(stored.inFlightJob),
    updatedAt: stringOrNull(stored.updatedAt),
    live: liveRequested,
    message: live?.message ?? stringOrNull(stored.decision) ?? "no controller state yet",
    timings: detailed ? timings : compactListTimings(timings),
    drilldown: `bun scripts/cli.ts cicd branch-follower status --follower ${follower.id} --live`,
  };
  if (detailed) {
    const warnings = Array.isArray(stored.warnings) ? stored.warnings.slice(0, 6) : [];
    return {
      ...summary,
      source: {
        ...asOptionalRecord(summary.source),
        snapshotPrefix: follower.source.snapshotPrefix,
      },
      budgetSource: follower.budgets,
      stateConfigMap: registry.controller.stateConfigMapName,
      warnings,
      next: followerNextCommands(follower),
    };
  }
  return summary;
}
/**
 * Shrinks follower timings for list views: keeps the totals and only the
 * first four stages, each reduced to `stage`, `status` and `seconds`.
 */
function compactListTimings(timings: FollowerState["timings"]): Record<string, unknown> {
  const { budgetSeconds, totalSeconds, totalStatus, sourceCommit, overBudget } = timings;
  const leadingStages = timings.stages
    .slice(0, 4)
    .map(({ stage, status, seconds }) => ({ stage, status, seconds }));
  return {
    budgetSeconds,
    totalSeconds,
    totalStatus,
    sourceCommit,
    overBudget,
    stages: leadingStages,
  };
}
/**
 * Reads the controller's Kubernetes-side state: the follower state ConfigMap,
 * the controller Deployment, its Lease and its Pods.
 *
 * A failed ConfigMap read always counts as an error; the other three reads
 * are tolerated when their error text is classified as "not found".
 */
function readK8sState(registry: BranchFollowerRegistry, options: ParsedOptions): K8sStateRead {
  const stateResult = kubeConfigMapFollowerState(registry, options);
  const controller = registry.controller;
  const namespace = controller.namespace;
  const deploymentResult = kubeJson(
    registry,
    options,
    `kubectl -n ${shQuote(namespace)} get deploy ${shQuote(controller.deploymentName)} -o json`,
    10_000,
    `/apis/apps/v1/namespaces/${encodeURIComponent(namespace)}/deployments/${encodeURIComponent(controller.deploymentName)}`,
  );
  const leaseResult = kubeJson(
    registry,
    options,
    `kubectl -n ${shQuote(namespace)} get lease ${shQuote(controller.leaseName)} -o json`,
    10_000,
    `/apis/coordination.k8s.io/v1/namespaces/${encodeURIComponent(namespace)}/leases/${encodeURIComponent(controller.leaseName)}`,
  );
  const podsResult = kubePodList(registry, options, labelSelector(controller.labels));

  const errors: string[] = [];
  if (!stateResult.ok) errors.push(`state configmap: ${stateResult.error}`);
  const tolerantReads = [
    ["deployment", deploymentResult],
    ["lease", leaseResult],
    ["pods", podsResult],
  ] as const;
  for (const [label, read] of tolerantReads) {
    if (!read.ok && !isNotFoundText(read.error)) errors.push(`${label}: ${read.error}`);
  }

  return {
    ok: errors.length === 0,
    stateByFollower: stateResult.stateByFollower,
    stateMetadata: stateResult.metadata,
    stateValueBytes: stateResult.valueBytes,
    stateConfigMapPresent: stateResult.present,
    deployment: deploymentResult.value,
    lease: leaseResult.value,
    pods: podsResult.value,
    errors,
  };
}
/**
 * Loads per-follower state from the controller's state ConfigMap by running
 * the `read-state-summary.mjs` helper script.
 *
 * When `options.followerId` is set only that follower is requested and more
 * timing stages are asked for (16 instead of 8). Entries in the response for
 * followers that were not requested are ignored.
 *
 * @returns Parsed state, or a failed read carrying a redacted output tail
 *          when the script fails or prints no JSON object.
 */
function kubeConfigMapFollowerState(registry: BranchFollowerRegistry, options: ParsedOptions): K8sFollowerStateRead {
  const scopedToOne = options.followerId !== null;
  const followers = scopedToOne
    ? registry.followers.filter((follower) => follower.id === options.followerId)
    : registry.followers;
  const maxTimingStages = scopedToOne ? 16 : 8;
  const followerIds = followers.map((follower) => follower.id);

  const scriptLines = [
    "set -eu",
    "tmpdir=$(mktemp -d)",
    "cleanup() { rm -rf \"$tmpdir\"; }",
    "trap cleanup EXIT INT TERM",
    nativeCicdScriptLoadShell(["read-state-summary.mjs"]),
    `NAMESPACE=${shQuote(registry.controller.namespace)}`,
    `CONFIGMAP=${shQuote(registry.controller.stateConfigMapName)}`,
    `FOLLOWERS_JSON=${shQuote(JSON.stringify(followerIds))}`,
    `MAX_TIMING_STAGES=${maxTimingStages}`,
    "export NAMESPACE CONFIGMAP FOLLOWERS_JSON MAX_TIMING_STAGES",
    "node \"$tmpdir/read-state-summary.mjs\"",
  ];
  const result = runKubeScript(registry, options, scriptLines.join("\n"), "", 10_000);
  const parsed = result.exitCode === 0 ? parseJsonObject(result.stdout) : null;
  if (parsed === null) {
    return {
      ok: false,
      stateByFollower: {},
      present: false,
      error: redactText(tailText(result.stderr || result.stdout, 800)),
    };
  }

  const rawStates = asOptionalRecord(parsed.stateByFollower) ?? {};
  const metadata = asOptionalRecord(parsed.metadata);
  const rawValueBytes = asOptionalRecord(parsed.valueBytes) ?? {};
  const stateByFollower: Record<string, Record<string, unknown>> = {};
  const valueBytes: Record<string, number> = {};
  for (const followerId of followerIds) {
    const followerState = asOptionalRecord(rawStates[followerId]);
    if (followerState !== null) stateByFollower[followerId] = followerState;
    const byteCount = numberOrNull(rawValueBytes[followerId]);
    if (byteCount !== null) valueBytes[followerId] = byteCount;
  }

  const reportedErrors = Array.isArray(parsed.errors)
    ? parsed.errors.map(String).filter((item) => item.length > 0)
    : [];
  return {
    ok: parsed.ok === true && reportedErrors.length === 0,
    stateByFollower,
    metadata,
    valueBytes,
    present: parsed.present === true,
    error: reportedErrors.join("; "),
  };
}
/**
 * Summarizes the outcome of a state patch command for one follower.
 *
 * The write counts as ok when the command exited 0 and its JSON output, if
 * any, does not say `ok: false`. On failure the message is a redacted tail of
 * the command output.
 */
function stateWriteSummary(followerId: string, result: CommandResult): Record<string, unknown> {
  const exitedCleanly = result.exitCode === 0;
  const parsed = exitedCleanly ? parseJsonObject(result.stdout) : null;
  const message = exitedCleanly
    ? "state patch command completed"
    : redactText(tailText(result.stderr || result.stdout, 500));
  return {
    follower: followerId,
    ok: exitedCleanly && parsed?.ok !== false,
    exitCode: result.exitCode,
    timedOut: result.timedOut,
    beforeResourceVersion: stringOrNull(parsed?.beforeResourceVersion),
    afterResourceVersion: stringOrNull(parsed?.afterResourceVersion),
    preservedTiming: parsed?.preservedTiming === true,
    message,
    parsedDownstreamCliOutput: false,
  };
}
/**
 * Removes the given follower keys from the state ConfigMap with a merge patch
 * that sets each key to `null`.
 *
 * In-cluster runs use the `patch-configmap-data.mjs` helper; otherwise the
 * script uses `kubectl` and reports success without patching when the
 * ConfigMap does not exist.
 */
function removeFollowerStateKeys(registry: BranchFollowerRegistry, options: ParsedOptions, ids: string[]): CommandResult {
  const patch = JSON.stringify({ data: Object.fromEntries(ids.map((id) => [id, null])) });
  // Lines shared by both script variants.
  const tmpdirPreamble = [
    "set -eu",
    "tmpdir=$(mktemp -d)",
    "cleanup() { rm -rf \"$tmpdir\"; }",
    "trap cleanup EXIT INT TERM",
  ];
  const targetVars = [
    `NAMESPACE=${shQuote(registry.controller.namespace)}`,
    `CONFIGMAP=${shQuote(registry.controller.stateConfigMapName)}`,
  ];

  let scriptLines: string[];
  if (options.inCluster) {
    scriptLines = [
      ...tmpdirPreamble,
      nativeCicdScriptLoadShell(["patch-configmap-data.mjs"]),
      ...targetVars,
      `PATCH_B64=${shQuote(Buffer.from(patch, "utf8").toString("base64"))}`,
      "export NAMESPACE CONFIGMAP PATCH_B64",
      "node \"$tmpdir/patch-configmap-data.mjs\"",
    ];
  } else {
    scriptLines = [
      ...tmpdirPreamble,
      ...targetVars,
      `PATCH=${shQuote(patch)}`,
      "export NAMESPACE CONFIGMAP PATCH",
      "if ! kubectl -n \"$NAMESPACE\" get configmap \"$CONFIGMAP\" >/dev/null 2>\"$tmpdir/error\"; then",
      "  if grep -qi 'not found' \"$tmpdir/error\"; then",
      "    printf '{\"ok\":true,\"present\":false,\"patched\":false,\"reason\":\"state-configmap-not-found\",\"parsedDownstreamCliOutput\":false}'",
      "    exit 0",
      "  fi",
      "  cat \"$tmpdir/error\" >&2",
      "  exit 1",
      "fi",
      "kubectl -n \"$NAMESPACE\" patch configmap \"$CONFIGMAP\" --type merge -p \"$PATCH\" >/dev/null",
      "printf '{\"ok\":true,\"present\":true,\"patched\":true,\"parsedDownstreamCliOutput\":false}'",
    ];
  }
  return runKubeScript(registry, options, scriptLines.join("\n"), "", 10_000);
}
/**
 * Fetches one Kubernetes object as JSON.
 *
 * In-cluster runs with an `inClusterPath` use the `kube-get.mjs` helper on
 * that API path; every other case runs the supplied shell `command`.
 *
 * @returns `ok` only when the command exited 0 and printed a JSON object;
 *          `error` always holds a redacted tail of the command output.
 */
function kubeJson(registry: BranchFollowerRegistry, options: ParsedOptions, command: string, timeoutMs: number, inClusterPath?: string): { ok: boolean; value: Record<string, unknown> | null; error: string } {
  let script: string;
  if (options.inCluster && inClusterPath !== undefined) {
    script = [
      "set -eu",
      "tmpdir=$(mktemp -d)",
      "cleanup() { rm -rf \"$tmpdir\"; }",
      "trap cleanup EXIT INT TERM",
      nativeCicdScriptLoadShell(["kube-get.mjs"]),
      `node "$tmpdir/kube-get.mjs" ${shQuote(inClusterPath)}`,
    ].join("\n");
  } else {
    script = `set -eu\n${command}`;
  }
  const result = runKubeScript(registry, options, script, "", timeoutMs);
  const exitedCleanly = result.exitCode === 0;
  const value = exitedCleanly ? parseJsonObject(result.stdout) : null;
  const error = redactText(tailText(result.stderr || result.stdout, 800));
  return { ok: exitedCleanly && value !== null, value, error };
}
/**
 * Lists the names of pods in the controller namespace matching `selector`.
 *
 * In-cluster runs query the pods API through `kube-get.mjs` and read
 * `items[].metadata.name`; otherwise `kubectl get pods -o name` output is
 * split into lines and the `pod/` prefix is stripped.
 *
 * @returns On exit code 0, `value` is `{ items: [{ metadata: { name } }] }`;
 *          otherwise `value` is `null`. `error` is a redacted output tail.
 */
function kubePodList(registry: BranchFollowerRegistry, options: ParsedOptions, selector: string): { ok: boolean; value: Record<string, unknown> | null; error: string } {
  const namespace = registry.controller.namespace;
  const command = `kubectl -n ${shQuote(namespace)} get pods -l ${shQuote(selector)} -o name`;
  const inCluster = options.inCluster;

  let script: string;
  if (inCluster) {
    const podsPath = `/api/v1/namespaces/${encodeURIComponent(namespace)}/pods?labelSelector=${encodeURIComponent(selector)}`;
    script = [
      "set -eu",
      "tmpdir=$(mktemp -d)",
      "cleanup() { rm -rf \"$tmpdir\"; }",
      "trap cleanup EXIT INT TERM",
      nativeCicdScriptLoadShell(["kube-get.mjs"]),
      `node "$tmpdir/kube-get.mjs" ${shQuote(podsPath)}`,
    ].join("\n");
  } else {
    script = `set -eu\n${command}`;
  }

  const result = runKubeScript(registry, options, script, "", 10_000);
  const exitedCleanly = result.exitCode === 0;

  let names: string[];
  if (inCluster) {
    const parsed = exitedCleanly ? parseJsonObject(result.stdout) : null;
    names = arrayRecords(parsed?.items)
      .map((item) => stringOrNull(asOptionalRecord(item.metadata)?.name))
      .filter((name): name is string => name !== null);
  } else {
    names = result.stdout
      .split(/\r?\n/u)
      .map((line) => line.trim())
      .filter((line) => line.length > 0)
      .map((line) => line.replace(/^pod\//u, ""));
  }

  return {
    ok: exitedCleanly,
    value: exitedCleanly ? { items: names.map((name) => ({ metadata: { name } })) } : null,
    error: redactText(tailText(result.stderr || result.stdout, 800)),
  };
}
/**
 * Executes a shell script against the cluster.
 *
 * In-cluster runs execute it locally with `sh -lc` and pass `input` on stdin.
 * Otherwise the script itself is piped (with a trailing newline) into `sh`
 * via the controller's kube route, and `input` is not used.
 */
function runKubeScript(registry: BranchFollowerRegistry, options: ParsedOptions, script: string, input: string, timeoutMs: number): CommandResult {
  if (!options.inCluster) {
    const routedArgv = [transPath(), registry.controller.kubeRoute, "sh"];
    return runCommand(routedArgv, repoRoot, { input: `${script}\n`, timeoutMs });
  }
  return runCommand(["sh", "-lc", script], repoRoot, { input, timeoutMs });
}
/**
 * Builds the reduced follower-state record that is serialised into the state ConfigMap.
 *
 * Scalar and reference fields are copied as-is; `timings`, `warnings` and `command` go through
 * their dedicated compaction helpers, and the record is tagged with `stateFormat: "compact-v1"`.
 *
 * @param state - Full follower state.
 * @returns A plain record with the compacted state.
 */
function compactFollowerStateForConfigMap(state: FollowerState): Record<string, unknown> {
  const {
    id,
    adapter,
    enabled,
    phase,
    source,
    target,
    lastTriggeredSha,
    lastSucceededSha,
    pipelineRun,
    inFlightJob,
    budgetSource,
    controller,
    decision,
    dryRun,
    updatedAt,
  } = state;
  return {
    id,
    adapter,
    enabled,
    phase,
    source,
    target,
    lastTriggeredSha,
    lastSucceededSha,
    pipelineRun,
    inFlightJob,
    budgetSource,
    controller,
    decision,
    dryRun,
    updatedAt,
    timings: compactTimings(state.timings),
    warnings: compactStateWarnings(state.warnings),
    stateFormat: "compact-v1",
    command: compactStateCommand(state.command),
  };
}
/**
 * Reduces a recorded trigger/closeout command to the fields kept in stored state.
 *
 * The payload is the compacted native payload when one is present, otherwise the closeout
 * summary. Boolean progress flags (`wait`, `pipelineRunCompleted`, `stillRunning`) are emitted
 * only when they are exactly `true`.
 *
 * @param command - Raw command record, or `undefined` when none was recorded.
 * @returns The compact command record, or `undefined` when `command` is `undefined`.
 */
function compactStateCommand(command: Record<string, unknown> | undefined): Record<string, unknown> | undefined {
  if (command === undefined) return undefined;

  const closeout = asOptionalRecord(command.closeout);
  const closeoutSummary = asOptionalRecord(closeout?.summary);
  const payload = compactNativePayload(asOptionalRecord(command.payload)) ?? closeoutSummary;
  const onlyWhenTrue = (flag: unknown): true | undefined => (flag === true ? true : undefined);

  let compactCloseout: Record<string, unknown> | null = null;
  if (closeout !== null) {
    compactCloseout = {
      ok: closeout.ok === true,
      completed: closeout.completed === true,
      timedOut: closeout.timedOut === true,
      polls: numberOrNull(closeout.polls),
      elapsedMs: numberOrNull(closeout.elapsedMs),
      gitMirrorFlush: compactCloseoutGitMirrorFlush(asOptionalRecord(closeout.gitMirrorFlush)),
      summary: closeoutSummary,
      statusAuthority: stringOrNull(closeout.statusAuthority),
      parsedDownstreamCliOutput: false,
    };
  }

  return {
    mode: stringOrNull(command.mode) ?? stringOrNull(command.status),
    namespace: stringOrNull(command.namespace),
    pipelineRun: stringOrNull(command.pipelineRun),
    sourceCommit: stringOrNull(command.sourceCommit),
    sourceStageRef: stringOrNull(command.sourceStageRef),
    wait: onlyWhenTrue(command.wait),
    pipelineRunCompleted: onlyWhenTrue(command.pipelineRunCompleted),
    stillRunning: onlyWhenTrue(command.stillRunning),
    closeout: compactCloseout,
    payload,
    exitCode: numberOrNull(command.exitCode),
    timedOut: command.timedOut === true,
    statusAuthority: stringOrNull(command.statusAuthority),
    parsedDownstreamCliOutput: false,
  };
}
/**
 * Compacts the git-mirror flush section of a closeout record.
 *
 * @param value - Raw `gitMirrorFlush` record, or `null`.
 * @returns Job name, namespace and a normalised `result` (or `null` when absent); `null` when
 *   `value` is `null`.
 */
function compactCloseoutGitMirrorFlush(value: Record<string, unknown> | null): Record<string, unknown> | null {
  if (value === null) return null;

  const flushResult = asOptionalRecord(value.result);
  let compactResult: Record<string, unknown> | null = null;
  if (flushResult !== null) {
    compactResult = {
      ok: flushResult.ok === true,
      completed: flushResult.completed === true,
      failed: flushResult.failed === true,
      timedOut: flushResult.timedOut === true,
      created: flushResult.created === true,
      reused: flushResult.reused === true,
      jobName: stringOrNull(flushResult.jobName),
      namespace: stringOrNull(flushResult.namespace),
      elapsedMs: numberOrNull(flushResult.elapsedMs),
      conditionReason: stringOrNull(flushResult.conditionReason),
      conditionMessage: stringOrNull(flushResult.conditionMessage),
      statusAuthority: stringOrNull(flushResult.statusAuthority),
      parsedDownstreamCliOutput: false,
    };
  }

  return {
    jobName: stringOrNull(value.jobName),
    namespace: stringOrNull(value.namespace),
    result: compactResult,
  };
}
/**
 * Keeps at most the first four warnings for stored state.
 *
 * A warning longer than 800 characters is shortened to its first 400 and last 400 characters
 * joined by " ... "; every kept warning is then passed through `redactText`.
 *
 * @param warnings - Warning messages in their original order.
 * @returns The shortened, redacted warnings.
 */
function compactStateWarnings(warnings: string[]): string[] {
  const compacted: string[] = [];
  for (const warning of warnings.slice(0, 4)) {
    const clipped = warning.length > 800
      ? `${warning.slice(0, 400)} ... ${warning.slice(-400)}`
      : warning;
    compacted.push(redactText(clipped));
  }
  return compacted;
}
/**
 * Compacts a native status payload for storage.
 *
 * `source`, `sourceSync`, `reuseConfig`, `taskRuns` and `planArtifacts` are reduced by their
 * dedicated helpers; `gitMirror`, `tekton`, `argo` and `runtime` are kept as records (or `null`
 * when not a plain object); at most five `errors` entries are retained.
 *
 * @param payload - Raw payload record, or `null`.
 * @returns The compact payload, or `null` when `payload` is `null`.
 */
function compactNativePayload(payload: Record<string, unknown> | null): Record<string, unknown> | null {
  if (payload === null) return null;

  const record = payload;
  const section = (key: string): Record<string, unknown> | null => asOptionalRecord(record[key]);
  const errors = record.errors;

  return {
    source: compactSourcePayload(section("source")),
    sourceSync: compactSourceSyncPayload(section("sourceSync")),
    reuseConfig: summarizeRuntimeReuseConfigFromRecord(section("reuseConfig")),
    gitMirror: section("gitMirror"),
    tekton: section("tekton"),
    taskRuns: compactTaskRunsPayload(section("taskRuns")),
    planArtifacts: compactPlanArtifactsPayload(section("planArtifacts")),
    argo: section("argo"),
    runtime: section("runtime"),
    errors: Array.isArray(errors) ? errors.slice(0, 5) : [],
    statusAuthority: stringOrNull(record.statusAuthority),
    parsedDownstreamCliOutput: false,
  };
}
/**
 * Summarises a reuse-config record: flags, identifying fields, up to 16 service ids and up to
 * five stringified errors. The summary is always marked `valuesRedacted: true`.
 *
 * @param value - Raw reuse-config record, or `null`.
 * @returns The summary, or `null` when `value` is `null`.
 */
function summarizeRuntimeReuseConfigFromRecord(value: Record<string, unknown> | null): Record<string, unknown> | null {
  if (value === null) return null;

  const record = value;
  const text = (key: string): string | null => stringOrNull(record[key]);
  const rawErrors = record.errors;
  const errors = Array.isArray(rawErrors) ? rawErrors.map(String).slice(0, 5) : [];

  return {
    ok: record.ok === true,
    present: record.present === true,
    path: text("path"),
    sourceCommit: text("sourceCommit"),
    stageRef: text("stageRef"),
    sha256: text("sha256"),
    serviceCount: numberOrNull(record.serviceCount),
    serviceIds: compactReuseServiceIds(record),
    errors,
    valuesRedacted: true,
  };
}
/**
 * Returns up to 16 service ids for a reuse-config record.
 *
 * An array-valued `serviceIds` wins and is stringified; otherwise the non-empty string `id` of
 * each record in `services` is collected.
 *
 * @param value - Reuse-config record.
 * @returns At most 16 service ids.
 */
function compactReuseServiceIds(value: Record<string, unknown>): string[] {
  const declared = value.serviceIds;
  if (Array.isArray(declared)) {
    return declared.map((entry) => String(entry)).slice(0, 16);
  }
  const ids: string[] = [];
  for (const service of arrayRecords(value.services)) {
    const id = stringOrNull(service.id);
    if (id !== null) ids.push(id);
  }
  return ids.slice(0, 16);
}
/**
 * Normalises a source-sync job record to strict booleans, nullable strings/numbers and a fixed
 * `parsedDownstreamCliOutput: false` marker.
 *
 * @param value - Raw source-sync record, or `null`.
 * @returns The normalised record, or `null` when `value` is `null`.
 */
function compactSourceSyncPayload(value: Record<string, unknown> | null): Record<string, unknown> | null {
  if (value === null) return null;

  const record = value;
  const isTrue = (key: string): boolean => record[key] === true;
  const text = (key: string): string | null => stringOrNull(record[key]);

  return {
    ok: isTrue("ok"),
    completed: isTrue("completed"),
    failed: isTrue("failed"),
    timedOut: isTrue("timedOut"),
    created: isTrue("created"),
    reused: isTrue("reused"),
    jobName: text("jobName"),
    namespace: text("namespace"),
    elapsedMs: numberOrNull(record.elapsedMs),
    conditionReason: text("conditionReason"),
    conditionMessage: text("conditionMessage"),
    statusAuthority: text("statusAuthority"),
    parsedDownstreamCliOutput: false,
  };
}
/**
 * Compacts a task-run summary: counters, the `performance` record and at most 16 `items`.
 *
 * @param taskRuns - Raw task-run summary, or `null`.
 * @returns The compact summary, or `null` when `taskRuns` is `null`.
 */
function compactTaskRunsPayload(taskRuns: Record<string, unknown> | null): Record<string, unknown> | null {
  if (taskRuns === null) return null;

  const record = taskRuns;
  const counter = (key: string): number | null => numberOrNull(record[key]);
  const rawItems = record.items;

  return {
    ok: record.ok === true,
    count: counter("count"),
    succeededCount: counter("succeededCount"),
    failedCount: counter("failedCount"),
    activeCount: counter("activeCount"),
    performance: asOptionalRecord(record.performance),
    items: Array.isArray(rawItems) ? rawItems.slice(0, 16) : [],
  };
}
/**
 * Compacts a plan-artifacts record; each service list is capped at 40 entries and non-array
 * lists become empty arrays.
 *
 * @param planArtifacts - Raw plan-artifacts record, or `null`.
 * @returns The compact record, or `null` when `planArtifacts` is `null`.
 */
function compactPlanArtifactsPayload(planArtifacts: Record<string, unknown> | null): Record<string, unknown> | null {
  if (planArtifacts === null) return null;

  const record = planArtifacts;
  const text = (key: string): string | null => stringOrNull(record[key]);
  const cappedList = (key: string): unknown[] => {
    const list = record[key];
    return Array.isArray(list) ? list.slice(0, 40) : [];
  };

  return {
    ok: record.ok === true,
    pipelineRun: text("pipelineRun"),
    eventFound: record.eventFound === true,
    degradedReason: text("degradedReason"),
    sourceCommitId: text("sourceCommitId"),
    affectedServices: cappedList("affectedServices"),
    rolloutServices: cappedList("rolloutServices"),
    buildServices: cappedList("buildServices"),
    reusedServices: cappedList("reusedServices"),
    buildSkippedCount: numberOrNull(record.buildSkippedCount),
    summary: text("summary"),
    disclosure: text("disclosure"),
  };
}
/**
 * Keeps only the identifying string fields of a source record; each field is `null` unless it
 * is a non-empty string.
 *
 * @param source - Raw source record, or `null`.
 * @returns The reduced record, or `null` when `source` is `null`.
 */
function compactSourcePayload(source: Record<string, unknown> | null): Record<string, unknown> | null {
  if (source === null) return null;

  const record = source;
  const text = (key: string): string | null => stringOrNull(record[key]);

  return {
    commit: text("commit"),
    branch: text("branch"),
    repository: text("repository"),
    stageRef: text("stageRef"),
    sourceAuthority: text("sourceAuthority"),
    repoPath: text("repoPath"),
  };
}
/**
 * Assembles the timing block of a follower state from the trigger command, the live adapter
 * summary and previously stored timings.
 *
 * The total comes from the trigger command when it yields one, otherwise from stored timings.
 * Stage timings from the command and from the live native payload are merged, de-duplicated and
 * capped at 24 entries.
 *
 * @param follower - Follower spec; supplies the end-to-end budget.
 * @param live - Live adapter summary (`payload`, `observedSha`, `aligned`).
 * @param triggerCommand - Recorded trigger command, if any.
 * @param storedTimings - Timings read back from stored state, if any.
 * @param phase - Current follower phase, if known.
 * @returns The timing block for the follower state.
 */
function buildFollowerTimings(
  follower: FollowerSpec,
  live: AdapterSummary,
  triggerCommand: Record<string, unknown> | undefined,
  storedTimings?: Record<string, unknown> | null,
  phase?: BranchFollowerPhase,
): FollowerState["timings"] {
  const budgetSeconds = follower.budgets.endToEndSeconds;
  const nativePayload = asOptionalRecord(live.payload);

  const finishOverride = stringOrNull(triggerCommand?.finishedAt)
    ?? noopStoredTotalFinishOverride(storedTimings, phase, live);
  const total = totalTimingFromCommand(triggerCommand, phase)
    ?? totalTimingFromStored(storedTimings, phase, finishOverride, live.observedSha);

  const commandStages = stageTimingsFromCommand(triggerCommand);
  const nativeStages = stageTimingsFromNativePayload(nativePayload);
  const stages = dedupeTimingStages([...commandStages, ...nativeStages]).slice(0, 24);

  // Stage data is only attributed to the observed commit when at least one stage exists.
  const stageSourceCommit = stages.length > 0 ? live.observedSha : null;
  const sourceCommit = total?.sourceCommit ?? stringOrNull(triggerCommand?.sourceCommit) ?? stageSourceCommit;
  const overBudget = total === null ? null : total.seconds > budgetSeconds;

  return {
    budgetSeconds,
    totalSeconds: total?.seconds ?? null,
    totalStatus: total?.status ?? "unknown",
    totalSource: total?.source ?? "-",
    sourceCommit,
    startedAt: total?.startedAt ?? null,
    finishedAt: total?.finishedAt ?? null,
    overBudget,
    stages,
  };
}
/**
 * Supplies a finish timestamp ("now") for stored timings that were started but never closed.
 *
 * A timestamp is returned only when the phase is `Noop`, the live summary is aligned with a
 * known observed SHA, and the stored timings refer to that SHA with a `startedAt` but no
 * `finishedAt`.
 *
 * @param storedTimings - Timings read back from stored state.
 * @param phase - Current follower phase.
 * @param live - Live adapter summary.
 * @returns The current time as an ISO string, or `null` when the conditions are not met.
 */
function noopStoredTotalFinishOverride(
  storedTimings: Record<string, unknown> | null | undefined,
  phase: BranchFollowerPhase | undefined,
  live: AdapterSummary,
): string | null {
  const alignedNoop = phase === "Noop" && live.aligned === true && live.observedSha !== null;
  if (!alignedNoop) return null;
  if (storedTimings === null || storedTimings === undefined) return null;

  const sameCommit = stringOrNull(storedTimings.sourceCommit) === live.observedSha;
  const started = stringOrNull(storedTimings.startedAt) !== null;
  const unfinished = stringOrNull(storedTimings.finishedAt) === null;
  return sameCommit && started && unfinished ? new Date().toISOString() : null;
}
/**
 * Rebuilds a timing block purely from stored timings, for status output.
 *
 * Stored stages are included only when a source commit can be determined from the stored total
 * or from the stored `sourceCommit` field.
 *
 * @param follower - Follower spec; supplies the end-to-end budget.
 * @param storedTimings - Timings read back from stored state, or `null`.
 * @param phase - Current follower phase.
 * @param observedSha - SHA the stored timings must match, or `null` to skip that check.
 * @returns The timing block for status display.
 */
function storedFollowerTimingsForStatus(
  follower: FollowerSpec,
  storedTimings: Record<string, unknown> | null,
  phase: BranchFollowerPhase,
  observedSha: string | null,
): FollowerState["timings"] {
  const budgetSeconds = follower.budgets.endToEndSeconds;
  const total = totalTimingFromStored(storedTimings, phase, null, observedSha);
  const sourceCommit = total?.sourceCommit ?? stringOrNull(storedTimings?.sourceCommit) ?? null;
  const stages = sourceCommit === null ? [] : storedStageTimings(storedTimings);
  const overBudget = total === null ? null : total.seconds > budgetSeconds;

  return {
    budgetSeconds,
    totalSeconds: total?.seconds ?? null,
    totalStatus: total?.status ?? "unknown",
    totalSource: total?.source ?? "-",
    sourceCommit,
    startedAt: total?.startedAt ?? null,
    finishedAt: total?.finishedAt ?? null,
    overBudget,
    stages,
  };
}
/**
 * Reads stage timings back from stored state.
 *
 * Entries without a non-empty string `stage` are dropped; a missing status becomes "unknown"
 * and a missing source becomes "stored-state". At most 24 stages are returned.
 *
 * @param storedTimings - Timings read back from stored state, or `null`.
 * @returns The recovered stage timings.
 */
function storedStageTimings(storedTimings: Record<string, unknown> | null): StageTiming[] {
  if (storedTimings === null) return [];

  const recovered: StageTiming[] = [];
  for (const record of arrayRecords(storedTimings.stages)) {
    const name = stringOrNull(record.stage);
    if (name === null) continue;
    recovered.push(stageTiming(
      name,
      stringOrNull(record.status) ?? "unknown",
      numberOrNull(record.seconds),
      numberOrNull(record.budgetSeconds),
      stringOrNull(record.source) ?? "stored-state",
      stringOrNull(record.object),
    ));
  }
  return recovered.slice(0, 24);
}
/**
 * Copies a timing block field by field, keeping at most 24 stages and only the known stage
 * properties.
 *
 * @param timings - Timing block of a follower state.
 * @returns A fresh timing block suitable for storage.
 */
function compactTimings(timings: FollowerState["timings"]): FollowerState["timings"] {
  const { budgetSeconds, totalSeconds, totalStatus, totalSource, sourceCommit, startedAt, finishedAt, overBudget } = timings;
  const stages = timings.stages.slice(0, 24).map((entry) => {
    const { stage, status, seconds, source, object } = entry;
    return { stage, status, seconds, budgetSeconds: entry.budgetSeconds, source, object };
  });
  return {
    budgetSeconds,
    totalSeconds,
    totalStatus,
    totalSource,
    sourceCommit,
    startedAt,
    finishedAt,
    overBudget,
    stages,
  };
}
/**
 * Derives the end-to-end total from a recorded command.
 *
 * No total is produced for a missing command, for `k8s-native-closeout` commands, for payloads
 * marked `reused`, or when neither a start/finish range nor `elapsedMs` gives a duration.
 * Status precedence: failed, timed-out, completed, running, ci-completed, then the lowercased
 * phase (or "submitted" when no phase is given).
 *
 * @param command - Recorded command, if any.
 * @param phase - Current follower phase, if known.
 * @returns The total timing, or `null` when none can be derived.
 */
function totalTimingFromCommand(command: Record<string, unknown> | undefined, phase?: BranchFollowerPhase): { seconds: number; status: string; source: string; sourceCommit: string | null; startedAt: string | null; finishedAt: string | null } | null {
  if (command === undefined || command.mode === "k8s-native-closeout") return null;
  if (asOptionalRecord(command.payload)?.reused === true) return null;

  const startedAt = stringOrNull(command.startedAt);
  const finishedAt = stringOrNull(command.finishedAt);
  const seconds = totalSecondsFromRange(startedAt, finishedAt) ?? secondsFromMs(numberOrNull(command.elapsedMs));
  if (seconds === null) return null;

  const closeout = asOptionalRecord(command.closeout);
  const exitCode = numberOrNull(command.exitCode);

  let status: string;
  if (command.ok === false || (exitCode !== null && exitCode !== 0)) {
    status = "failed";
  } else if (command.timedOut === true || closeout?.timedOut === true) {
    status = "timed-out";
  } else if (closeout?.completed === true || command.completed === true) {
    status = "completed";
  } else if (command.stillRunning === true) {
    status = "running";
  } else if (command.pipelineRunCompleted === true) {
    status = "ci-completed";
  } else if (phase === undefined) {
    status = "submitted";
  } else {
    status = phase.toLowerCase();
  }

  return {
    seconds,
    status,
    source: stringOrNull(command.mode) ?? stringOrNull(command.status) ?? "command",
    sourceCommit: stringOrNull(command.sourceCommit),
    startedAt,
    finishedAt,
  };
}
/**
 * Derives the end-to-end total from stored timings.
 *
 * No total is produced when stored timings are absent, carry no source commit, refer to a
 * commit other than `observedSha` (when given), are unfinished in phase `Noop`, or give no
 * duration. The stored `finishedAt` takes precedence over `finishOverride`.
 *
 * @param storedTimings - Timings read back from stored state.
 * @param phase - Current follower phase, if known.
 * @param finishOverride - Finish timestamp to use when none is stored.
 * @param observedSha - SHA the stored timings must match, if given.
 * @returns The total timing, or `null` when none can be derived.
 */
function totalTimingFromStored(storedTimings: Record<string, unknown> | null | undefined, phase?: BranchFollowerPhase, finishOverride?: string | null, observedSha?: string | null): { seconds: number; status: string; source: string; sourceCommit: string | null; startedAt: string | null; finishedAt: string | null } | null {
  if (storedTimings === null || storedTimings === undefined) return null;

  const sourceCommit = stringOrNull(storedTimings.sourceCommit);
  if (sourceCommit === null) return null;
  const expectsOtherCommit = observedSha !== null && observedSha !== undefined && sourceCommit !== observedSha;
  if (expectsOtherCommit) return null;

  const startedAt = stringOrNull(storedTimings.startedAt);
  const finishedAt = stringOrNull(storedTimings.finishedAt) ?? finishOverride ?? null;
  if (phase === "Noop" && finishedAt === null) return null;

  const seconds = totalSecondsFromRange(startedAt, finishedAt) ?? numberOrNull(storedTimings.totalSeconds);
  if (seconds === null) return null;

  // NOTE(review): with a known phase the status is the lowercased phase whether or not the run
  // has finished; the stored status is only used when no phase is supplied — confirm intended.
  let status: string;
  if (phase === undefined) {
    status = stringOrNull(storedTimings.totalStatus) ?? "recorded";
  } else {
    status = phase.toLowerCase();
  }

  return {
    seconds,
    status,
    source: stringOrNull(storedTimings.totalSource) ?? "stored-state",
    sourceCommit,
    startedAt,
    finishedAt,
  };
}
/**
 * Computes the rounded duration in seconds between two timestamps.
 *
 * When `finishedAt` is missing or unparseable the current time is used as the end.
 *
 * @param startedAt - Start timestamp string, or `null`.
 * @param finishedAt - Finish timestamp string, or `null`.
 * @returns The duration, or `null` when the start is missing/unparseable or lies after the end.
 */
function totalSecondsFromRange(startedAt: string | null, finishedAt: string | null): number | null {
  const startedMs = timestampMs(startedAt);
  if (startedMs === null) return null;
  const finishedMs = timestampMs(finishedAt) ?? Date.now();
  if (finishedMs < startedMs) return null;
  return roundSeconds((finishedMs - startedMs) / 1000);
}
/**
 * Parses a timestamp string with `Date.parse`.
 *
 * @param value - Timestamp string, or `null`.
 * @returns Milliseconds since the epoch, or `null` for `null`/unparseable input.
 */
function timestampMs(value: string | null): number | null {
  if (value === null) return null;
  const epochMs = Date.parse(value);
  if (!Number.isFinite(epochMs)) return null;
  return epochMs;
}
/**
 * Tells whether a phase is one of `Succeeded`, `Failed`, `Blocked`, `Skipped` or `Noop`.
 *
 * @param phase - Follower phase to classify.
 * @returns `true` for the listed phases, `false` otherwise.
 */
function terminalPhase(phase: BranchFollowerPhase): boolean {
  switch (phase) {
    case "Succeeded":
    case "Failed":
    case "Blocked":
    case "Skipped":
    case "Noop":
      return true;
    default:
      return false;
  }
}
/**
 * Derives stage timings from a live native status payload.
 *
 * Stages are emitted in a fixed order: `status-read` (always), `git-mirror-sync`,
 * `reuse-config`, `git-mirror`, `pipelinerun`, one `task:<name>` per prioritised task run,
 * `argo`, and `runtime`. Every stage other than `status-read` is emitted only when its section
 * is present in the payload.
 *
 * @param payload - Native status payload, or `null`.
 * @returns The stage timings; empty when `payload` is `null`.
 */
function stageTimingsFromNativePayload(payload: Record<string, unknown> | null): StageTiming[] {
  if (payload === null) return [];
  const collected: StageTiming[] = [];

  // The status-read stage is reported as "ok" even when no timing data is available for it.
  const statusRead = asOptionalRecord(asOptionalRecord(payload.timings)?.statusRead);
  const statusReadSeconds = secondsFromMs(numberOrNull(statusRead?.elapsedMs));
  collected.push(stageTiming("status-read", "ok", statusReadSeconds, numberOrNull(statusRead?.budgetSeconds), "native-status", null));

  const sourceSync = k8sJobTiming("git-mirror-sync", asOptionalRecord(payload.sourceSync));
  if (sourceSync !== null) collected.push(sourceSync);

  const reuseConfig = asOptionalRecord(payload.reuseConfig);
  if (reuseConfig !== null) {
    const reuseStatus = reuseConfig.ok === true ? "ready" : "missing-or-invalid";
    collected.push(stageTiming("reuse-config", reuseStatus, null, null, "source-gitops", stringOrNull(reuseConfig.path)));
  }

  const gitMirror = asOptionalRecord(payload.gitMirror);
  if (gitMirror !== null) {
    const gitopsBranch = stringOrNull(gitMirror.gitopsBranch);
    const sourceReady = gitMirror.sourceSnapshotReady === true;
    let mirrorStatus: string;
    if (gitMirror.pendingFlush === true) {
      mirrorStatus = "pending-flush";
    } else if (gitopsBranch !== null) {
      mirrorStatus = gitMirror.githubInSync === true && sourceReady ? "ready" : "not-ready";
    } else {
      mirrorStatus = sourceReady ? "source-ready" : "source-not-ready";
    }
    collected.push(stageTiming("git-mirror", mirrorStatus, null, null, "git-mirror-cache", gitopsBranch ?? stringOrNull(gitMirror.sourceBranch)));
  }

  const tekton = asOptionalRecord(payload.tekton);
  if (tekton !== null) {
    let pipelineStatus = "running";
    if (tekton.succeeded === true) pipelineStatus = "succeeded";
    else if (tekton.succeeded === false) pipelineStatus = `failed:${stringOrNull(tekton.reason) ?? "unknown"}`;
    collected.push(stageTiming("pipelinerun", pipelineStatus, numberOrNull(tekton.durationSeconds), null, "tekton", stringOrNull(tekton.name)));
  }

  const taskRuns = asOptionalRecord(payload.taskRuns);
  const taskRunItems = taskRuns === null ? [] : prioritizedTaskRunItems(taskRuns);
  for (const taskRun of taskRunItems) {
    const taskName = stringOrNull(taskRun.pipelineTask) ?? stringOrNull(taskRun.name) ?? "unknown";
    let taskStatus = "running";
    if (taskRun.status === "True") taskStatus = "succeeded";
    else if (taskRun.status === "False") taskStatus = `failed:${stringOrNull(taskRun.reason) ?? "unknown"}`;
    collected.push(stageTiming(`task:${taskName}`, taskStatus, numberOrNull(taskRun.durationSeconds), null, "tekton-taskrun", stringOrNull(taskRun.name)));
  }

  const argo = asOptionalRecord(payload.argo);
  if (argo !== null) {
    const syncStatus = stringOrNull(argo.syncStatus) ?? "unknown";
    const healthStatus = stringOrNull(argo.healthStatus) ?? "unknown";
    collected.push(stageTiming("argo", `${syncStatus}/${healthStatus}`, numberOrNull(argo.operationDurationSeconds), null, "argocd", stringOrNull(argo.name)));
  }

  const runtime = asOptionalRecord(payload.runtime);
  if (runtime !== null) {
    const readiness = runtime.ready === true ? "ready" : "not-ready";
    let alignment = "unknown-target";
    if (runtime.aligned === true) alignment = "aligned";
    else if (runtime.aligned === false) alignment = "stale";
    collected.push(stageTiming("runtime", `${readiness}/${alignment}`, null, null, "kubernetes-workload", stringOrNull(runtime.namespace)));
  }

  return collected;
}
/**
 * Derives stage timings from a recorded trigger/closeout command.
 *
 * Emission order: the command's own job (named after `command.phase`), native-capability jobs,
 * agentrun jobs, the `pipelinerun-wait` stage (when the payload carries `elapsedMs`), then the
 * closeout's git-mirror flush and the `closeout` stage itself.
 *
 * @param command - Recorded command, if any.
 * @returns The stage timings; empty when `command` is `undefined`.
 */
function stageTimingsFromCommand(command: Record<string, unknown> | undefined): StageTiming[] {
  if (command === undefined) return [];

  const collected: StageTiming[] = [];
  const addPresent = (candidates: Array<StageTiming | null>): void => {
    for (const candidate of candidates) {
      if (candidate !== null) collected.push(candidate);
    }
  };

  const phase = stringOrNull(command.phase);
  if (phase !== null) {
    addPresent([k8sJobTiming(phase, asOptionalRecord(command.job), stringOrNull(command.jobName))]);
  }

  const payload = asOptionalRecord(command.payload);
  if (payload !== null) {
    const capabilities = asOptionalRecord(payload.nativeCapabilities);
    addPresent([
      k8sJobTiming("git-mirror-sync", asOptionalRecord(capabilities?.gitMirrorSync)),
      k8sJobTiming("control-plane-refresh", asOptionalRecord(capabilities?.controlPlaneRefresh)),
      k8sJobTiming("git-mirror-flush", asOptionalRecord(capabilities?.gitMirrorFlush)),
    ]);

    const agentrun = asOptionalRecord(payload.agentrun);
    const agentrunSync = asOptionalRecord(agentrun?.gitMirrorSync);
    const agentrunFlush = asOptionalRecord(agentrun?.gitMirrorFlush);
    const imageBuild = asOptionalRecord(agentrun?.imageBuild);
    const gitopsPublish = asOptionalRecord(agentrun?.gitopsPublish);
    addPresent([
      k8sJobTiming("git-mirror-sync", asOptionalRecord(agentrunSync?.payload), stringOrNull(agentrunSync?.jobName)),
      k8sJobTiming("image-build", asOptionalRecord(imageBuild?.result), stringOrNull(imageBuild?.jobName)),
      k8sJobTiming("gitops-publish", asOptionalRecord(gitopsPublish?.result), stringOrNull(gitopsPublish?.jobName)),
      k8sJobTiming("git-mirror-flush", asOptionalRecord(agentrunFlush?.payload), stringOrNull(agentrunFlush?.jobName)),
    ]);

    const waitSeconds = secondsFromMs(numberOrNull(payload.elapsedMs));
    if (waitSeconds !== null) {
      let waitStatus = "submitted";
      if (payload.completed === true) waitStatus = "completed";
      else if (payload.failed === true) waitStatus = "failed";
      else if (payload.stillRunning === true) waitStatus = "running";
      collected.push(stageTiming("pipelinerun-wait", waitStatus, waitSeconds, null, "tekton-submit", stringOrNull(command.pipelineRun)));
    }
  }

  const closeout = asOptionalRecord(command.closeout);
  if (closeout !== null) {
    const flush = asOptionalRecord(closeout.gitMirrorFlush);
    addPresent([k8sJobTiming("git-mirror-flush", asOptionalRecord(flush?.result), stringOrNull(flush?.jobName))]);

    let closeoutStatus = "pending";
    if (closeout.completed === true) closeoutStatus = "completed";
    else if (closeout.timedOut === true) closeoutStatus = "timed-out";
    collected.push(stageTiming("closeout", closeoutStatus, secondsFromMs(numberOrNull(closeout.elapsedMs)), null, "k8s-native-closeout", stringOrNull(command.pipelineRun)));
  }

  return collected;
}
/**
 * Converts a Kubernetes job result record into a stage timing.
 *
 * Status precedence: `completed` (reported as "reused" when `reused` is also true), `failed`,
 * `timedOut`, otherwise "running".
 *
 * @param stage - Stage name to report.
 * @param job - Job result record, or `null`.
 * @param objectOverride - Object name to report instead of the record's `jobName`.
 * @returns The stage timing, or `null` when `job` is `null`.
 */
function k8sJobTiming(stage: string, job: Record<string, unknown> | null, objectOverride?: string | null): StageTiming | null {
  if (job === null) return null;

  let status: string;
  if (job.completed === true) {
    status = job.reused === true ? "reused" : "completed";
  } else if (job.failed === true) {
    status = "failed";
  } else if (job.timedOut === true) {
    status = "timed-out";
  } else {
    status = "running";
  }

  const seconds = secondsFromMs(numberOrNull(job.elapsedMs));
  const objectName = objectOverride ?? stringOrNull(job.jobName);
  return stageTiming(stage, status, seconds, null, "kubernetes-job", objectName);
}
/**
 * Bundles the given values into a `StageTiming` record.
 *
 * @param stage - Stage name.
 * @param status - Stage status text.
 * @param seconds - Duration in seconds, or `null` when unknown.
 * @param budgetSeconds - Budget in seconds, or `null` when none applies.
 * @param source - Origin of the timing data.
 * @param object - Name of the related object, or `null`.
 * @returns The assembled stage timing.
 */
function stageTiming(stage: string, status: string, seconds: number | null, budgetSeconds: number | null, source: string, object: string | null): StageTiming {
  const timing: StageTiming = {
    stage,
    status,
    seconds,
    budgetSeconds,
    source,
    object,
  };
  return timing;
}
/**
 * De-duplicates stage timings by stage name plus object name.
 *
 * The first occurrence of a key wins, except that a later entry with a duration replaces an
 * earlier one without. Stages with an empty name are dropped. Output follows the order in which
 * keys were first seen.
 *
 * @param stages - Stage timings, possibly containing duplicates.
 * @returns The de-duplicated stage timings.
 */
function dedupeTimingStages(stages: StageTiming[]): StageTiming[] {
  const firstSeen = new Map<string, StageTiming>();
  stages
    .filter((candidate) => candidate.stage.length !== 0)
    .forEach((candidate) => {
      const key = `${candidate.stage}\t${candidate.object ?? ""}`;
      const existing = firstSeen.get(key);
      const addsDuration = existing !== undefined && existing.seconds === null && candidate.seconds !== null;
      if (existing === undefined || addsDuration) firstSeen.set(key, candidate);
    });
  return Array.from(firstSeen.values());
}
/**
 * Converts milliseconds to rounded seconds.
 *
 * @param value - Duration in milliseconds, or `null`.
 * @returns The duration in seconds via `roundSeconds`, or `null` when `value` is `null`.
 */
function secondsFromMs(value: number | null): number | null {
  if (value === null) return null;
  return roundSeconds(value / 1000);
}
/**
 * Rounds a number of seconds to one decimal place.
 *
 * @param value - Seconds.
 * @returns `value` rounded to the nearest tenth.
 */
function roundSeconds(value: number): number {
  const tenths = Math.round(value * 10);
  return tenths / 10;
}
/**
 * Persists a follower's compacted state by running the `patch-follower-state.mjs` helper.
 *
 * The compact state is JSON-encoded, base64-wrapped and handed to the helper through exported
 * environment variables (`NAMESPACE`, `CONFIGMAP`, `FOLLOWER_ID`, `SPEC_REF`, `STATE_B64`).
 * The script runs via `runKubeScript` with a 10 second timeout.
 *
 * @param registry - Supplies the namespace and state ConfigMap name.
 * @param state - Follower state to persist.
 * @param options - Parsed CLI options forwarded to `runKubeScript`.
 * @returns The command result of the patch script.
 */
function writeFollowerState(registry: BranchFollowerRegistry, state: FollowerState, options: ParsedOptions): CommandResult {
  const encodedState = Buffer.from(JSON.stringify(compactFollowerStateForConfigMap(state)), "utf8").toString("base64");
  const environment: Array<[string, string]> = [
    ["NAMESPACE", registry.controller.namespace],
    ["CONFIGMAP", registry.controller.stateConfigMapName],
    ["FOLLOWER_ID", state.id],
    ["SPEC_REF", SPEC_REF],
    ["STATE_B64", encodedState],
  ];
  const lines = [
    "set -eu",
    ...environment.map(([name, value]) => `${name}=${shQuote(value)}`),
    `export ${environment.map(([name]) => name).join(" ")}`,
    "tmpdir=$(mktemp -d)",
    "cleanup() { rm -rf \"$tmpdir\"; }",
    "trap cleanup EXIT INT TERM",
    nativeCicdScriptLoadShell(["patch-follower-state.mjs"]),
    "node \"$tmpdir/patch-follower-state.mjs\"",
  ];
  return runKubeScript(registry, options, lines.join("\n"), "", 10_000);
}
/**
 * Renders a one-off controller reconcile Job, applies it server-side and optionally waits for it.
 *
 * The manifest is shipped to the shell as base64 inside a heredoc and decoded into a temporary
 * file. That file is removed by a shell trap on exit so repeated runs do not accumulate
 * manifests in the temp directory. Any existing Job of the same name is deleted before the
 * apply.
 *
 * @param registry - Supplies controller namespace, deployment name, field manager and budgets.
 * @param options - Parsed CLI options; `timeoutSeconds` overrides the run-once budget and `full`
 *   widens the captured output tails.
 * @param mode - `dryRun` selects the reported mode label, `wait` appends the wait snippet, and
 *   the whole object is forwarded to the manifest renderer.
 * @returns A summary record with the Job name, exit status and redacted output tails.
 */
function runControllerReconcileJob(registry: BranchFollowerRegistry, options: ParsedOptions, mode: { dryRun: boolean; wait: boolean; recordState: boolean }): Record<string, unknown> {
  const timeoutSeconds = options.timeoutSeconds ?? registry.controller.budgets.runOnceSeconds;
  // Job names are capped at 63 characters; the base-36 timestamp keeps successive runs distinct.
  const jobName = `${registry.controller.deploymentName}-once-${Date.now().toString(36)}`.slice(0, 63);
  const manifest = renderControllerReconcileJob(registry, options, jobName, mode, timeoutSeconds);
  const manifestYaml = `${Bun.YAML.stringify(manifest).trim()}\n`;
  const manifestBase64 = Buffer.from(manifestYaml, "utf8").toString("base64");
  const script = [
    "set -eu",
    "tmp=$(mktemp)",
    // Remove the decoded manifest on every exit path instead of leaving it behind.
    "trap 'rm -f \"$tmp\"' EXIT INT TERM",
    "base64 -d >\"$tmp\" <<'UNIDESK_CICD_RECONCILE_JOB_B64'",
    manifestBase64,
    "UNIDESK_CICD_RECONCILE_JOB_B64",
    `kubectl -n ${shQuote(registry.controller.namespace)} delete job ${shQuote(jobName)} --ignore-not-found=true >/dev/null 2>&1 || true`,
    `kubectl apply --server-side --force-conflicts --field-manager=${shQuote(registry.controller.fieldManager)} -f "$tmp" >/dev/null`,
    mode.wait ? waitForJobShell(registry.controller.namespace, jobName, timeoutSeconds) : "true",
  ].join("\n");
  // The transport gets extra grace on top of the Job timeout so the wait can finish first.
  const result = runKubeScript(registry, options, script, "", (timeoutSeconds + registry.controller.budgets.reconcileTransportGraceSeconds) * 1000);
  const succeeded = result.exitCode === 0;
  const includeTails = options.full || !succeeded;
  return {
    ok: succeeded,
    name: jobName,
    namespace: registry.controller.namespace,
    mode: mode.dryRun ? "status-refresh" : "confirm-reconcile",
    execution: "k8s-native-job",
    exitCode: result.exitCode,
    timedOut: result.timedOut,
    message: succeeded ? "reconcile job completed" : redactText(tailText(result.stderr || result.stdout, 800)),
    stdoutBytes: Buffer.byteLength(result.stdout, "utf8"),
    stderrBytes: Buffer.byteLength(result.stderr, "utf8"),
    stdoutTail: includeTails ? redactText(tailText(result.stdout, options.full ? 4000 : 400)) : "",
    stderrTail: includeTails ? redactText(tailText(result.stderr, options.full ? 2000 : 400)) : "",
  };
}
/**
 * Picks the followers an action applies to.
 *
 * An explicit `followerId` selects by id. Without one, `run-once` (unless `all` is set) keeps
 * only enabled followers. Disabled followers are additionally removed unless
 * `opts.includeDisabled` is set.
 *
 * @param registry - Registry holding all follower specs.
 * @param options - Parsed CLI options (`followerId`, `all`, `action`).
 * @param opts - `includeDisabled` controls whether disabled followers may be returned.
 * @returns The selected followers (never empty).
 * @throws Error when the selection is empty.
 */
function selectFollowers(registry: BranchFollowerRegistry, options: ParsedOptions, opts: { includeDisabled: boolean }): FollowerSpec[] {
  const { followerId } = options;
  let candidates = registry.followers;
  if (followerId !== null) {
    candidates = candidates.filter((follower) => follower.id === followerId);
  } else if (options.action === "run-once" && !options.all) {
    candidates = candidates.filter((follower) => follower.enabled);
  }
  if (!opts.includeDisabled) {
    candidates = candidates.filter((follower) => follower.enabled);
  }
  if (candidates.length > 0) return candidates;
  throw new Error(followerId === null ? "no followers selected" : `unknown or disabled follower ${followerId}`);
}
/**
 * Summarises the registry for output: file path, content hash, metadata, the controller's
 * identifying names, and the list of follower ids.
 *
 * @param registry - Loaded branch-follower registry.
 * @returns The summary record.
 */
function registrySummary(registry: BranchFollowerRegistry): Record<string, unknown> {
  const { namespace, kubeRoute, deploymentName, stateConfigMapName, leaseName } = registry.controller;
  const followerIds = registry.followers.map((follower) => follower.id);
  return {
    path: registry.path,
    sha256: registry.rawSha256,
    metadata: registry.metadata,
    controller: { namespace, kubeRoute, deploymentName, stateConfigMapName, leaseName },
    followers: followerIds,
  };
}
/**
 * Renders a follower's configured commands as single display strings.
 *
 * Each argv is joined with spaces; `status` is replaced by a fixed native-status label.
 * NOTE(review): despite the name, the argv strings are not passed through a redaction helper
 * here — confirm the configured argv never carries secrets.
 *
 * @param follower - Follower spec holding the command definitions.
 * @returns Display strings keyed by command name.
 */
function redactCommands(follower: FollowerSpec): Record<string, string> {
  const render = (spec: { argv: readonly string[] }): string => spec.argv.join(" ");
  const { plan, trigger, events, logs } = follower.commands;
  return {
    plan: render(plan),
    status: "native:k8s-git-mirror+tekton+argocd+runtime",
    trigger: render(trigger),
    events: render(events),
    logs: render(logs),
  };
}
/**
 * Describes which native sources a status read will consult.
 *
 * @param native - Native status spec of a follower.
 * @returns A plan record with the git-mirror source, the Tekton and Argo specs, and the runtime
 *   namespace with workload kind/name pairs (or `null` when no runtime is configured).
 */
function nativeStatusPlan(native: NativeStatusSpec): Record<string, unknown> {
  const { gitMirrorNamespace, gitMirrorDeployment, repoPath } = native.source;

  let runtime: Record<string, unknown> | null = null;
  if (native.runtime !== null) {
    runtime = {
      namespace: native.runtime.namespace,
      workloads: native.runtime.workloads.map((workload) => ({ kind: workload.kind, name: workload.name })),
    };
  }

  return {
    source: {
      mode: "k8s-git-mirror-cache",
      gitMirrorNamespace,
      gitMirrorDeployment,
      repoPath,
    },
    tekton: native.tekton,
    argo: native.argo,
    runtime,
    parsedDownstreamCliOutput: false,
  };
}
/**
 * Summarises the controller's Kubernetes footprint from a state read.
 *
 * Replica counts default to 0 when the deployment status lacks them; `pods` is the length of
 * the pod list, or `null` when no list is available.
 *
 * @param registry - Supplies the configured controller names.
 * @param k8s - Result of reading the controller's Kubernetes objects.
 * @returns The controller status summary.
 */
function controllerStatusSummary(registry: BranchFollowerRegistry, k8s: K8sStateRead): Record<string, unknown> {
  const { namespace, kubeRoute, deploymentName, stateConfigMapName, leaseName } = registry.controller;
  const deploymentStatus = asOptionalRecord(k8s.deployment?.status);
  const leaseSpec = asOptionalRecord(k8s.lease?.spec);
  const podList = k8s.pods?.items;

  return {
    namespace,
    route: kubeRoute,
    deploymentName,
    deploymentPresent: k8s.deployment !== null,
    availableReplicas: numberOrNull(deploymentStatus?.availableReplicas) ?? 0,
    replicas: numberOrNull(deploymentStatus?.replicas) ?? 0,
    pods: Array.isArray(podList) ? podList.length : null,
    stateConfigMapName,
    stateConfigMapPresent: k8s.stateConfigMapPresent,
    leaseName,
    leaseHolder: stringOrNull(leaseSpec?.holderIdentity),
    noHostWorktreeAuthority: true,
  };
}
/**
 * Builds the suggested follow-up CLI commands for a follower.
 *
 * `pipelineRuns` and `argoApplication` both point at the `events` command and are added only
 * when the follower has a Tekton or Argo native-status section respectively.
 *
 * @param follower - Follower spec.
 * @returns Command strings keyed by purpose.
 */
function followerNextCommands(follower: FollowerSpec): Record<string, string> {
  const cli = "bun scripts/cli.ts cicd branch-follower";
  const followerFlag = `--follower ${follower.id}`;
  const eventsCommand = `${cli} events ${followerFlag}`;

  const next: Record<string, string> = {
    status: `${cli} status ${followerFlag}`,
    liveStatus: `${cli} status ${followerFlag} --live`,
    dryRun: `${cli} run-once ${followerFlag} --dry-run`,
    trigger: `${cli} run-once ${followerFlag} --confirm --wait`,
    events: eventsCommand,
    logs: `${cli} logs ${followerFlag}`,
  };
  if (follower.nativeStatus.tekton !== null) {
    next.pipelineRuns = eventsCommand;
  }
  if (follower.nativeStatus.argo !== null) {
    next.argoApplication = eventsCommand;
  }
  return next;
}
/**
 * Resolves a config reference, turning any thrown error into `null`.
 *
 * @param ref - Reference string; passed as both arguments of `resolveConfigRefString`.
 * @returns The resolved string, or `null` when resolution throws.
 */
function safeResolveString(ref: string): string | null {
  let resolved: string | null;
  try {
    resolved = resolveConfigRefString(ref, ref);
  } catch {
    // Best effort: an unresolvable reference is reported as absent.
    resolved = null;
  }
  return resolved;
}
/**
 * Parses text as a JSON object, tolerating surrounding noise.
 *
 * The trimmed text is parsed as a whole first. Only when that throws is the span from the first
 * `{` to the last `}` tried. Valid JSON that is not a plain object (array, number, ...) yields
 * `null` without trying the fallback.
 *
 * @param text - Raw text, e.g. command stdout.
 * @returns The parsed object, or `null` when no JSON object can be obtained.
 */
function parseJsonObject(text: string): Record<string, unknown> | null {
  const body = text.trim();
  if (body.length === 0) return null;

  // `undefined` signals a syntax error; `null` signals valid JSON that is not a plain object.
  const attempt = (candidate: string): Record<string, unknown> | null | undefined => {
    try {
      return asOptionalRecord(JSON.parse(candidate) as unknown);
    } catch {
      return undefined;
    }
  };

  const whole = attempt(body);
  if (whole !== undefined) return whole;

  const open = body.indexOf("{");
  const close = body.lastIndexOf("}");
  if (open < 0 || close <= open) return null;
  return attempt(body.slice(open, close + 1)) ?? null;
}
/**
 * Walks a key path through nested plain objects.
 *
 * @param root - Object to start from.
 * @param path - Keys to follow, outermost first.
 * @returns The plain object found at the end of the path, or `null` when any step (or the final
 *   value) is not a plain object.
 */
function recordAt(root: Record<string, unknown>, path: string[]): Record<string, unknown> | null {
  let cursor: unknown = root;
  for (const key of path) {
    const container = asOptionalRecord(cursor);
    if (container === null) return null;
    cursor = container[key];
  }
  return asOptionalRecord(cursor);
}
/**
 * Narrows an unknown value to a plain object record.
 *
 * @param value - Any value.
 * @returns `value` itself when it is a non-null, non-array object; otherwise `null`.
 */
function asOptionalRecord(value: unknown): Record<string, unknown> | null {
  const isPlainObject = value !== null && typeof value === "object" && !Array.isArray(value);
  return isPlainObject ? (value as Record<string, unknown>) : null;
}
/**
 * Returns the value when it is a non-empty string.
 *
 * @param value - Any value.
 * @returns The string, or `null` for non-strings and the empty string.
 */
function stringOrNull(value: unknown): string | null {
  if (typeof value !== "string") return null;
  return value.length === 0 ? null : value;
}
/**
 * Returns the value when it is a finite number.
 *
 * @param value - Any value.
 * @returns The number, or `null` for non-numbers, `NaN` and infinities.
 */
function numberOrNull(value: unknown): number | null {
  if (typeof value !== "number") return null;
  return Number.isFinite(value) ? value : null;
}
/**
 * Summarises a command result for output.
 *
 * Output tails are included only when `options.full` is set or the command failed; they are
 * limited by `options.tailBytes` (stderr additionally capped at 4000) and passed through
 * `redactText`.
 *
 * @param result - Finished command result.
 * @param options - Parsed CLI options (`full`, `tailBytes`).
 * @returns The compact command record.
 */
function commandCompact(result: CommandResult, options: ParsedOptions): Record<string, unknown> {
  const includeTails = options.full || result.exitCode !== 0;
  const stderrLimit = Math.min(options.tailBytes, 4000);
  return {
    argv: result.command,
    exitCode: result.exitCode,
    timedOut: result.timedOut,
    stdoutBytes: Buffer.byteLength(result.stdout),
    stderrBytes: Buffer.byteLength(result.stderr),
    stdoutTail: includeTails ? redactText(tailText(result.stdout, options.tailBytes)) : "",
    stderrTail: includeTails ? redactText(tailText(result.stderr, stderrLimit)) : "",
  };
}
/**
 * Extracts kind, namespace and name from a manifest object.
 *
 * `metadata`, `kind` and `metadata.name` are read through `asRecord` / `stringField`; a
 * non-string namespace is reported as "-".
 *
 * @param item - Manifest object.
 * @returns The object reference.
 */
function objectRef(item: Record<string, unknown>): Record<string, string> {
  const metadata = asRecord(item.metadata, "metadata");
  const kind = stringField(item, "kind", "manifest");
  const namespace = typeof metadata.namespace === "string" ? metadata.namespace : "-";
  const name = stringField(metadata, "name", "manifest.metadata");
  return { kind, namespace, name };
}
/**
 * Renders a label map as a comma-separated `key=value` selector.
 *
 * @param labels - Label names mapped to their values.
 * @returns The selector string; empty for an empty map.
 */
function labelSelector(labels: Record<string, string>): string {
  const pairs: string[] = [];
  for (const [key, value] of Object.entries(labels)) {
    pairs.push(`${key}=${value}`);
  }
  return pairs.join(",");
}
/**
 * Detects "not found" wording in command or API output.
 *
 * Matching is case-insensitive and accepts the word pair with or without a single space, so
 * `NotFound`, `not found`, `notfound` and `Not Found` are all recognised.
 *
 * @param value - Text to inspect, e.g. stderr of a kubectl call.
 * @returns `true` when the text contains a "not found" marker.
 */
function isNotFoundText(value: string): boolean {
  return /not ?found/iu.test(value);
}
/**
 * Shortens a SHA for display.
 *
 * @param value - Full SHA, or `null`.
 * @returns "-" for `null`, otherwise at most the first 12 characters.
 */
function shortSha(value: string | null): string {
  return value === null ? "-" : value.slice(0, 12);
}
/**
 * Makes a value safe for use as a job segment.
 *
 * Every character outside `A-Z a-z 0-9 _ . -` becomes `_`, and the result is cut to 60
 * characters.
 *
 * @param value - Raw segment text.
 * @returns The sanitised segment.
 */
function safeJobSegment(value: string): string {
  const sanitized = value.replace(/[^A-Za-z0-9_.-]/gu, "_");
  return sanitized.slice(0, 60);
}
/**
 * Builds the Job name for a native capability run: `<follower>-<action>-<sha12>`.
 *
 * Follower id and action are normalised with `safeK8sNameSegment`, repeated hyphens are
 * collapsed, and the name is capped at 63 characters. Because the cap can cut the name right
 * after a hyphen, trailing hyphens are stripped again after truncation so the result never ends
 * in `-`.
 *
 * @param followerId - Follower identifier.
 * @param action - Capability action name.
 * @param sha - Source commit SHA; only its first 12 characters are used.
 * @returns The Job name, at most 63 characters long.
 */
function nativeCapabilityJobName(followerId: string, action: string, sha: string): string {
  const prefix = `${safeK8sNameSegment(followerId)}-${safeK8sNameSegment(action)}`;
  return `${prefix}-${sha.slice(0, 12)}`
    .replace(/-+/gu, "-")
    .replace(/^-|-$/gu, "")
    .slice(0, 63)
    // Truncation may land directly after a hyphen; a name must end alphanumerically.
    .replace(/-+$/u, "");
}
/**
 * Normalises a value into a lowercase, hyphen-separated name segment.
 *
 * Runs of characters outside `a-z 0-9 -` become a single hyphen, repeated hyphens collapse, and
 * one leading/trailing hyphen is trimmed. An empty result becomes "x". The segment is cut to 40
 * characters and a hyphen left at the cut is removed.
 *
 * @param value - Raw segment text.
 * @returns The normalised segment (never empty).
 */
function safeK8sNameSegment(value: string): string {
  const lowered = value.toLowerCase();
  const hyphenated = lowered.replace(/[^a-z0-9-]+/gu, "-");
  const collapsed = hyphenated.replace(/-+/gu, "-");
  const trimmed = collapsed.replace(/^-|-$/gu, "");
  const segment = trimmed.length === 0 ? "x" : trimmed;
  return segment.slice(0, 40).replace(/-$/u, "");
}
/**
 * Returns the last `maxChars` characters of a text.
 *
 * @param text - Source text.
 * @param maxChars - Maximum number of characters (UTF-16 code units) to keep.
 * @returns `text` unchanged when it already fits, otherwise its tail.
 */
function tailText(text: string, maxChars: number): string {
  const overflow = text.length - maxChars;
  return overflow > 0 ? text.slice(overflow) : text;
}
/**
 * Builds the display label of the current CLI command.
 *
 * @param options - Parsed CLI options; `action` names the sub-command.
 * @returns `cicd branch-follower <action>`.
 */
function commandLabel(options: ParsedOptions): string {
  const { action } = options;
  return `cicd branch-follower ${action}`;
}
/**
 * Extracts the plain-object elements of an array.
 *
 * @param value - Any value.
 * @returns The non-null, non-array object elements in order; empty when `value` is not an array.
 */
function arrayRecords(value: unknown): Record<string, unknown>[] {
  if (!Array.isArray(value)) return [];
  const records: Record<string, unknown>[] = [];
  for (const item of value) {
    if (item !== null && typeof item === "object" && !Array.isArray(item)) {
      records.push(item as Record<string, unknown>);
    }
  }
  return records;
}