From 0136b5da7dd7f2f0c8ee42b0914ff2b6789b962b Mon Sep 17 00:00:00 2001 From: Codex Date: Fri, 3 Jul 2026 08:43:10 +0000 Subject: [PATCH] fix: wait for native sentinel closeout --- scripts/src/cicd.ts | 152 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 142 insertions(+), 10 deletions(-) diff --git a/scripts/src/cicd.ts b/scripts/src/cicd.ts index 4455adae..035b77d6 100644 --- a/scripts/src/cicd.ts +++ b/scripts/src/cicd.ts @@ -240,6 +240,18 @@ interface TriggerResult { command: Record; } +interface NativeCloseoutWaitResult { + ok: boolean; + completed: boolean; + timedOut: boolean; + polls: number; + elapsedMs: number; + refresh: Record | null; + summary: Record | null; + statusAuthority: "k8s-native"; + parsedDownstreamCliOutput: false; +} + interface FollowerState { id: string; adapter: string; @@ -1027,13 +1039,16 @@ async function decideAndMaybeTrigger( } if (options.confirm && (phase === "PendingTrigger" || phase === "Superseded" || (phase === "Observed" && observedSha !== null))) { - const trigger = await executeTrigger(follower, observedSha, options); + const trigger = await executeTrigger(registry, follower, observedSha, options); triggerCommand = trigger.command; phase = trigger.ok ? (options.wait || options.controller ? "ClosingOut" : "Triggering") : "Failed"; decision = trigger.ok ? `trigger submitted for ${shortSha(observedSha)}` : `trigger failed for ${shortSha(observedSha)}`; inFlightJob = trigger.jobId ?? live.inFlightJob; lastTriggeredSha = observedSha; if (trigger.ok && options.wait && trigger.completed) { + phase = "Succeeded"; + decision = `trigger completed for ${shortSha(observedSha)}`; + inFlightJob = null; targetSha = observedSha; lastSucceededSha = observedSha; } @@ -1041,6 +1056,7 @@ async function decideAndMaybeTrigger( } if (options.dryRun && phase === "PendingTrigger") decision = `${decision}; dry-run did not trigger`; + const statePipelineRun = stringOrNull(triggerCommand?.pipelineRun) ?? live.pipelineRun; return { id: follower.id, @@ -1063,7 +1079,7 @@ async function decideAndMaybeTrigger( }, lastTriggeredSha, lastSucceededSha, - pipelineRun: live.pipelineRun, + pipelineRun: statePipelineRun, inFlightJob, budgetSource: follower.budgets, controller: { @@ -1085,11 +1101,11 @@ async function decideAndMaybeTrigger( }; } -async function executeTrigger(follower: FollowerSpec, observedSha: string | null, options: ParsedOptions): Promise { +async function executeTrigger(registry: BranchFollowerRegistry, follower: FollowerSpec, observedSha: string | null, options: ParsedOptions): Promise { const spec = follower.commands.trigger; const timeoutSeconds = options.timeoutSeconds ?? spec.timeoutSeconds; if (follower.adapter === "web-probe-sentinel-cicd" && options.controller) { - return executeNativeSentinelTrigger(follower, observedSha, options, timeoutSeconds); + return await executeNativeSentinelTrigger(registry, follower, observedSha, options, timeoutSeconds); } if (!options.wait && !options.controller) { const job = startJob(`cicd_branch_follower_${safeJobSegment(follower.id)}`, spec.argv, `Trigger ${follower.id} for observed sha ${observedSha ?? "unknown"}`); @@ -1116,7 +1132,7 @@ async function executeTrigger(follower: FollowerSpec, observedSha: string | null }; } -function executeNativeSentinelTrigger(follower: FollowerSpec, observedSha: string | null, options: ParsedOptions, timeoutSeconds: number): TriggerResult { +async function executeNativeSentinelTrigger(registry: BranchFollowerRegistry, follower: FollowerSpec, observedSha: string | null, options: ParsedOptions, timeoutSeconds: number): Promise { if (observedSha === null) { return { ok: false, @@ -1146,21 +1162,31 @@ function executeNativeSentinelTrigger(follower: FollowerSpec, observedSha: strin const pipelineRun = sentinelPipelineRunName(state, false); const namespace = stringField(recordField(state.cicd, "builder", `${follower.id}.sentinel.cicd`), "namespace", `${follower.id}.sentinel.cicd.builder`); const manifest = sentinelPublishPipelineRunManifest(state, pipelineRun, true); + const startedAt = Date.now(); const result = runNativeTektonPipelineRun(namespace, pipelineRun, manifest, options.wait, timeoutSeconds); const payload = parseJsonObject(result.stdout) ?? {}; const pipelineRunCompleted = payload.completed === true; const failed = payload.failed === true || result.exitCode !== 0; const stillRunning = payload.stillRunning === true || payload.timedOutWait === true; + const remainingSeconds = Math.max(1, timeoutSeconds - Math.ceil((Date.now() - startedAt) / 1000)); + const closeout = !failed && options.wait && pipelineRunCompleted + ? await waitNativeSentinelCloseout(registry, follower, observedSha, options, remainingSeconds) + : null; const message = failed ? tailText(result.stderr || stringOrNull(payload.message) || result.stdout, 500) - : pipelineRunCompleted - ? `native sentinel PipelineRun ${pipelineRun} succeeded; runtime closeout remains k8s-native` + : closeout?.completed === true + ? `native sentinel PipelineRun ${pipelineRun} succeeded and runtime reached ${shortSha(observedSha)}` + : closeout?.timedOut === true + ? `native sentinel PipelineRun ${pipelineRun} succeeded but runtime closeout did not converge within budget` + : pipelineRunCompleted + ? `native sentinel PipelineRun ${pipelineRun} succeeded; runtime closeout remains k8s-native` : stillRunning ? `native sentinel PipelineRun ${pipelineRun} is still running; query status/events/logs for closeout` : `native sentinel PipelineRun ${pipelineRun} submitted`; + const ok = !failed && (closeout === null || closeout.completed === true); return { - ok: !failed, - completed: false, + ok, + completed: closeout?.completed === true, message, jobId: pipelineRun, command: { @@ -1173,6 +1199,7 @@ function executeNativeSentinelTrigger(follower: FollowerSpec, observedSha: strin wait: options.wait, pipelineRunCompleted, stillRunning, + closeout, statusAuthority: "kubernetes-api-serviceaccount", parsedDownstreamCliOutput: false, payload, @@ -1183,6 +1210,111 @@ function executeNativeSentinelTrigger(follower: FollowerSpec, observedSha: strin }; } +async function waitNativeSentinelCloseout( + registry: BranchFollowerRegistry, + follower: FollowerSpec, + observedSha: string, + options: ParsedOptions, + timeoutSeconds: number, +): Promise { + const refreshResult = follower.nativeStatus.argo === null ? null : runNativeArgoRefresh(follower.nativeStatus.argo); + const startedAt = Date.now(); + const deadline = startedAt + Math.max(1, timeoutSeconds) * 1000; + let polls = 0; + let latest: AdapterSummary | null = null; + while (Date.now() <= deadline) { + polls += 1; + const remainingSeconds = Math.max(1, Math.ceil((deadline - Date.now()) / 1000)); + latest = await readAdapterStatus(registry, follower, { ...options, timeoutSeconds: Math.min(10, remainingSeconds) }); + if (latest.observedSha === observedSha && latest.aligned === true) { + return { + ok: true, + completed: true, + timedOut: false, + polls, + elapsedMs: Date.now() - startedAt, + refresh: refreshResult === null ? null : commandCompact(refreshResult, options), + summary: nativeCloseoutSummary(latest), + statusAuthority: "k8s-native", + parsedDownstreamCliOutput: false, + }; + } + if (Date.now() + 2_000 > deadline) break; + runCommand(["sleep", "2"], repoRoot, { timeoutMs: 3_000 }); + } + return { + ok: false, + completed: false, + timedOut: true, + polls, + elapsedMs: Date.now() - startedAt, + refresh: refreshResult === null ? null : commandCompact(refreshResult, options), + summary: latest === null ? null : nativeCloseoutSummary(latest), + statusAuthority: "k8s-native", + parsedDownstreamCliOutput: false, + }; +} + +function nativeCloseoutSummary(live: AdapterSummary): Record { + const payload = asOptionalRecord(live.payload); + return { + ok: live.ok, + phase: live.phase, + observedSha: live.observedSha, + targetSha: live.targetSha, + aligned: live.aligned, + pipelineRun: live.pipelineRun, + pipelineRunPresent: live.pipelineRunPresent, + message: live.message, + tekton: asOptionalRecord(payload?.tekton), + argo: asOptionalRecord(payload?.argo), + runtime: asOptionalRecord(payload?.runtime), + errors: Array.isArray(payload?.errors) ? payload.errors.slice(0, 5) : [], + }; +} + +function runNativeArgoRefresh(argo: NonNullable): CommandResult { + const patchBase64 = Buffer.from(JSON.stringify({ + metadata: { + annotations: { + "argocd.argoproj.io/refresh": "hard", + }, + }, + }), "utf8").toString("base64"); + const script = [ + "set -eu", + `ARGO_NAMESPACE=${shQuote(argo.namespace)}`, + `ARGO_APPLICATION=${shQuote(argo.application)}`, + `PATCH_B64=${shQuote(patchBase64)}`, + "export ARGO_NAMESPACE ARGO_APPLICATION PATCH_B64", + "node <<'NODE_ARGO_REFRESH'", + "import { readFileSync } from 'node:fs';", + "import https from 'node:https';", + "const namespace = process.env.ARGO_NAMESPACE || '';", + "const application = process.env.ARGO_APPLICATION || '';", + "const patch = Buffer.from(process.env.PATCH_B64 || '', 'base64').toString('utf8');", + "const host = process.env.KUBERNETES_SERVICE_HOST;", + "const port = Number(process.env.KUBERNETES_SERVICE_PORT || '443');", + "const token = readFileSync('/var/run/secrets/kubernetes.io/serviceaccount/token', 'utf8').trim();", + "const ca = readFileSync('/var/run/secrets/kubernetes.io/serviceaccount/ca.crt');", + "const path = `/apis/argoproj.io/v1alpha1/namespaces/${encodeURIComponent(namespace)}/applications/${encodeURIComponent(application)}`;", + "const req = https.request({ host, port, path, method: 'PATCH', ca, headers: { authorization: `Bearer ${token}`, 'content-type': 'application/merge-patch+json', 'content-length': Buffer.byteLength(patch) } }, (res) => {", + " let body = '';", + " res.setEncoding('utf8');", + " res.on('data', (chunk) => { body += chunk; });", + " res.on('end', () => {", + " if ((res.statusCode || 0) < 200 || (res.statusCode || 0) >= 300) { process.stderr.write(body || `kube api PATCH argo application status ${res.statusCode}`); process.exit(1); }", + " process.stdout.write(JSON.stringify({ ok: true, namespace, application, refresh: 'hard', statusAuthority: 'kubernetes-api-serviceaccount', parsedDownstreamCliOutput: false, valuesRedacted: true }));", + " });", + "});", + "req.on('error', (error) => { process.stderr.write(error?.message || String(error)); process.exit(1); });", + "req.write(patch);", + "req.end();", + "NODE_ARGO_REFRESH", + ].join("\n"); + return runCommand(["sh", "-lc", script], repoRoot, { timeoutMs: 10_000 }); +} + function runNativeTektonPipelineRun(namespace: string, pipelineRun: string, manifest: Record, wait: boolean, timeoutSeconds: number): CommandResult { const manifestJson = JSON.stringify(manifest); const manifestBase64 = Buffer.from(manifestJson, "utf8").toString("base64"); @@ -2187,7 +2319,7 @@ function renderControllerManifests(registry: BranchFollowerRegistry): Record