fix: wait for native sentinel closeout

This commit is contained in:
Codex
2026-07-03 08:43:10 +00:00
parent a571c9e247
commit 0136b5da7d
+142 -10
View File
@@ -240,6 +240,18 @@ interface TriggerResult {
command: Record<string, unknown>;
}
interface NativeCloseoutWaitResult {
ok: boolean;
completed: boolean;
timedOut: boolean;
polls: number;
elapsedMs: number;
refresh: Record<string, unknown> | null;
summary: Record<string, unknown> | null;
statusAuthority: "k8s-native";
parsedDownstreamCliOutput: false;
}
interface FollowerState {
id: string;
adapter: string;
@@ -1027,13 +1039,16 @@ async function decideAndMaybeTrigger(
}
if (options.confirm && (phase === "PendingTrigger" || phase === "Superseded" || (phase === "Observed" && observedSha !== null))) {
const trigger = await executeTrigger(follower, observedSha, options);
const trigger = await executeTrigger(registry, follower, observedSha, options);
triggerCommand = trigger.command;
phase = trigger.ok ? (options.wait || options.controller ? "ClosingOut" : "Triggering") : "Failed";
decision = trigger.ok ? `trigger submitted for ${shortSha(observedSha)}` : `trigger failed for ${shortSha(observedSha)}`;
inFlightJob = trigger.jobId ?? live.inFlightJob;
lastTriggeredSha = observedSha;
if (trigger.ok && options.wait && trigger.completed) {
phase = "Succeeded";
decision = `trigger completed for ${shortSha(observedSha)}`;
inFlightJob = null;
targetSha = observedSha;
lastSucceededSha = observedSha;
}
@@ -1041,6 +1056,7 @@ async function decideAndMaybeTrigger(
}
if (options.dryRun && phase === "PendingTrigger") decision = `${decision}; dry-run did not trigger`;
const statePipelineRun = stringOrNull(triggerCommand?.pipelineRun) ?? live.pipelineRun;
return {
id: follower.id,
@@ -1063,7 +1079,7 @@ async function decideAndMaybeTrigger(
},
lastTriggeredSha,
lastSucceededSha,
pipelineRun: live.pipelineRun,
pipelineRun: statePipelineRun,
inFlightJob,
budgetSource: follower.budgets,
controller: {
@@ -1085,11 +1101,11 @@ async function decideAndMaybeTrigger(
};
}
async function executeTrigger(follower: FollowerSpec, observedSha: string | null, options: ParsedOptions): Promise<TriggerResult> {
async function executeTrigger(registry: BranchFollowerRegistry, follower: FollowerSpec, observedSha: string | null, options: ParsedOptions): Promise<TriggerResult> {
const spec = follower.commands.trigger;
const timeoutSeconds = options.timeoutSeconds ?? spec.timeoutSeconds;
if (follower.adapter === "web-probe-sentinel-cicd" && options.controller) {
return executeNativeSentinelTrigger(follower, observedSha, options, timeoutSeconds);
return await executeNativeSentinelTrigger(registry, follower, observedSha, options, timeoutSeconds);
}
if (!options.wait && !options.controller) {
const job = startJob(`cicd_branch_follower_${safeJobSegment(follower.id)}`, spec.argv, `Trigger ${follower.id} for observed sha ${observedSha ?? "unknown"}`);
@@ -1116,7 +1132,7 @@ async function executeTrigger(follower: FollowerSpec, observedSha: string | null
};
}
function executeNativeSentinelTrigger(follower: FollowerSpec, observedSha: string | null, options: ParsedOptions, timeoutSeconds: number): TriggerResult {
async function executeNativeSentinelTrigger(registry: BranchFollowerRegistry, follower: FollowerSpec, observedSha: string | null, options: ParsedOptions, timeoutSeconds: number): Promise<TriggerResult> {
if (observedSha === null) {
return {
ok: false,
@@ -1146,21 +1162,31 @@ function executeNativeSentinelTrigger(follower: FollowerSpec, observedSha: strin
const pipelineRun = sentinelPipelineRunName(state, false);
const namespace = stringField(recordField(state.cicd, "builder", `${follower.id}.sentinel.cicd`), "namespace", `${follower.id}.sentinel.cicd.builder`);
const manifest = sentinelPublishPipelineRunManifest(state, pipelineRun, true);
const startedAt = Date.now();
const result = runNativeTektonPipelineRun(namespace, pipelineRun, manifest, options.wait, timeoutSeconds);
const payload = parseJsonObject(result.stdout) ?? {};
const pipelineRunCompleted = payload.completed === true;
const failed = payload.failed === true || result.exitCode !== 0;
const stillRunning = payload.stillRunning === true || payload.timedOutWait === true;
const remainingSeconds = Math.max(1, timeoutSeconds - Math.ceil((Date.now() - startedAt) / 1000));
const closeout = !failed && options.wait && pipelineRunCompleted
? await waitNativeSentinelCloseout(registry, follower, observedSha, options, remainingSeconds)
: null;
const message = failed
? tailText(result.stderr || stringOrNull(payload.message) || result.stdout, 500)
: pipelineRunCompleted
? `native sentinel PipelineRun ${pipelineRun} succeeded; runtime closeout remains k8s-native`
: closeout?.completed === true
? `native sentinel PipelineRun ${pipelineRun} succeeded and runtime reached ${shortSha(observedSha)}`
: closeout?.timedOut === true
? `native sentinel PipelineRun ${pipelineRun} succeeded but runtime closeout did not converge within budget`
: pipelineRunCompleted
? `native sentinel PipelineRun ${pipelineRun} succeeded; runtime closeout remains k8s-native`
: stillRunning
? `native sentinel PipelineRun ${pipelineRun} is still running; query status/events/logs for closeout`
: `native sentinel PipelineRun ${pipelineRun} submitted`;
const ok = !failed && (closeout === null || closeout.completed === true);
return {
ok: !failed,
completed: false,
ok,
completed: closeout?.completed === true,
message,
jobId: pipelineRun,
command: {
@@ -1173,6 +1199,7 @@ function executeNativeSentinelTrigger(follower: FollowerSpec, observedSha: strin
wait: options.wait,
pipelineRunCompleted,
stillRunning,
closeout,
statusAuthority: "kubernetes-api-serviceaccount",
parsedDownstreamCliOutput: false,
payload,
@@ -1183,6 +1210,111 @@ function executeNativeSentinelTrigger(follower: FollowerSpec, observedSha: strin
};
}
async function waitNativeSentinelCloseout(
registry: BranchFollowerRegistry,
follower: FollowerSpec,
observedSha: string,
options: ParsedOptions,
timeoutSeconds: number,
): Promise<NativeCloseoutWaitResult> {
const refreshResult = follower.nativeStatus.argo === null ? null : runNativeArgoRefresh(follower.nativeStatus.argo);
const startedAt = Date.now();
const deadline = startedAt + Math.max(1, timeoutSeconds) * 1000;
let polls = 0;
let latest: AdapterSummary | null = null;
while (Date.now() <= deadline) {
polls += 1;
const remainingSeconds = Math.max(1, Math.ceil((deadline - Date.now()) / 1000));
latest = await readAdapterStatus(registry, follower, { ...options, timeoutSeconds: Math.min(10, remainingSeconds) });
if (latest.observedSha === observedSha && latest.aligned === true) {
return {
ok: true,
completed: true,
timedOut: false,
polls,
elapsedMs: Date.now() - startedAt,
refresh: refreshResult === null ? null : commandCompact(refreshResult, options),
summary: nativeCloseoutSummary(latest),
statusAuthority: "k8s-native",
parsedDownstreamCliOutput: false,
};
}
if (Date.now() + 2_000 > deadline) break;
runCommand(["sleep", "2"], repoRoot, { timeoutMs: 3_000 });
}
return {
ok: false,
completed: false,
timedOut: true,
polls,
elapsedMs: Date.now() - startedAt,
refresh: refreshResult === null ? null : commandCompact(refreshResult, options),
summary: latest === null ? null : nativeCloseoutSummary(latest),
statusAuthority: "k8s-native",
parsedDownstreamCliOutput: false,
};
}
function nativeCloseoutSummary(live: AdapterSummary): Record<string, unknown> {
const payload = asOptionalRecord(live.payload);
return {
ok: live.ok,
phase: live.phase,
observedSha: live.observedSha,
targetSha: live.targetSha,
aligned: live.aligned,
pipelineRun: live.pipelineRun,
pipelineRunPresent: live.pipelineRunPresent,
message: live.message,
tekton: asOptionalRecord(payload?.tekton),
argo: asOptionalRecord(payload?.argo),
runtime: asOptionalRecord(payload?.runtime),
errors: Array.isArray(payload?.errors) ? payload.errors.slice(0, 5) : [],
};
}
function runNativeArgoRefresh(argo: NonNullable<NativeStatusSpec["argo"]>): CommandResult {
const patchBase64 = Buffer.from(JSON.stringify({
metadata: {
annotations: {
"argocd.argoproj.io/refresh": "hard",
},
},
}), "utf8").toString("base64");
const script = [
"set -eu",
`ARGO_NAMESPACE=${shQuote(argo.namespace)}`,
`ARGO_APPLICATION=${shQuote(argo.application)}`,
`PATCH_B64=${shQuote(patchBase64)}`,
"export ARGO_NAMESPACE ARGO_APPLICATION PATCH_B64",
"node <<'NODE_ARGO_REFRESH'",
"import { readFileSync } from 'node:fs';",
"import https from 'node:https';",
"const namespace = process.env.ARGO_NAMESPACE || '';",
"const application = process.env.ARGO_APPLICATION || '';",
"const patch = Buffer.from(process.env.PATCH_B64 || '', 'base64').toString('utf8');",
"const host = process.env.KUBERNETES_SERVICE_HOST;",
"const port = Number(process.env.KUBERNETES_SERVICE_PORT || '443');",
"const token = readFileSync('/var/run/secrets/kubernetes.io/serviceaccount/token', 'utf8').trim();",
"const ca = readFileSync('/var/run/secrets/kubernetes.io/serviceaccount/ca.crt');",
"const path = `/apis/argoproj.io/v1alpha1/namespaces/${encodeURIComponent(namespace)}/applications/${encodeURIComponent(application)}`;",
"const req = https.request({ host, port, path, method: 'PATCH', ca, headers: { authorization: `Bearer ${token}`, 'content-type': 'application/merge-patch+json', 'content-length': Buffer.byteLength(patch) } }, (res) => {",
" let body = '';",
" res.setEncoding('utf8');",
" res.on('data', (chunk) => { body += chunk; });",
" res.on('end', () => {",
" if ((res.statusCode || 0) < 200 || (res.statusCode || 0) >= 300) { process.stderr.write(body || `kube api PATCH argo application status ${res.statusCode}`); process.exit(1); }",
" process.stdout.write(JSON.stringify({ ok: true, namespace, application, refresh: 'hard', statusAuthority: 'kubernetes-api-serviceaccount', parsedDownstreamCliOutput: false, valuesRedacted: true }));",
" });",
"});",
"req.on('error', (error) => { process.stderr.write(error?.message || String(error)); process.exit(1); });",
"req.write(patch);",
"req.end();",
"NODE_ARGO_REFRESH",
].join("\n");
return runCommand(["sh", "-lc", script], repoRoot, { timeoutMs: 10_000 });
}
function runNativeTektonPipelineRun(namespace: string, pipelineRun: string, manifest: Record<string, unknown>, wait: boolean, timeoutSeconds: number): CommandResult {
const manifestJson = JSON.stringify(manifest);
const manifestBase64 = Buffer.from(manifestJson, "utf8").toString("base64");
@@ -2187,7 +2319,7 @@ function renderControllerManifests(registry: BranchFollowerRegistry): Record<str
{ apiGroups: ["apps"], resources: ["deployments", "statefulsets"], verbs: ["get", "list", "watch"] },
{ apiGroups: ["tekton.dev"], resources: ["pipelineruns"], verbs: ["get", "list", "watch", "create"] },
{ apiGroups: ["tekton.dev"], resources: ["taskruns"], verbs: ["get", "list", "watch"] },
{ apiGroups: ["argoproj.io"], resources: ["applications"], verbs: ["get", "list", "watch"] },
{ apiGroups: ["argoproj.io"], resources: ["applications"], verbs: ["get", "list", "watch", "patch"] },
],
},
{