cicd wait native closeout for in-flight followers

This commit is contained in:
Codex
2026-07-03 11:48:59 +00:00
parent 75f205a539
commit 8162f2f999
2 changed files with 45 additions and 5 deletions
@@ -70,6 +70,8 @@ Do not backfill, infer, or migrate old branch-follower state when historical tim
If a deterministic Kubernetes Job or PipelineRun is reused and there is no already-stored `timings.startedAt`, the reused object's current wait/check duration is only a stage observation; it must not be promoted to `timings.totalSeconds`. If a deterministic Kubernetes Job or PipelineRun is reused and there is no already-stored `timings.startedAt`, the reused object's current wait/check duration is only a stage observation; it must not be promoted to `timings.totalSeconds`.
When `run-once --confirm --wait` resumes a source change that is already `ClosingOut`, the CLI may wait for native closeout and report a `closeout` stage duration. That closeout-only wait is not the end-to-end total unless the stored state already contains a valid `timings.startedAt`.
State machine phases are `Observed`, `Noop`, `PendingTrigger`, `Triggering`, `ClosingOut`, `Succeeded`, `Failed`, `Superseded`, `Blocked`, and `Skipped`. State machine phases are `Observed`, `Noop`, `PendingTrigger`, `Triggering`, `ClosingOut`, `Succeeded`, `Failed`, `Superseded`, `Blocked`, and `Skipped`.
Status and decision inputs are Kubernetes-native: Status and decision inputs are Kubernetes-native:
+43 -5
View File
@@ -792,6 +792,22 @@ async function decideAndMaybeTrigger(
} }
if (!trigger.ok) warnings.push(trigger.message); if (!trigger.ok) warnings.push(trigger.message);
} }
if (options.confirm && options.wait && phase === "ClosingOut" && observedSha !== null && triggerCommand === undefined) {
const closeout = await waitNativeFollowerCloseout(registry, follower, observedSha, options, options.timeoutSeconds ?? follower.budgets.endToEndSeconds);
triggerCommand = closeoutOnlyCommand(follower, live.pipelineRun, observedSha, closeout);
if (closeout.completed) {
phase = "Succeeded";
decision = `closeout completed for ${shortSha(observedSha)}`;
inFlightJob = null;
targetSha = observedSha;
lastSucceededSha = observedSha;
} else {
decision = closeout.timedOut
? `closeout did not converge for ${shortSha(observedSha)} within budget`
: `closeout remains pending for ${shortSha(observedSha)}`;
warnings.push(decision);
}
}
if (options.dryRun && phase === "PendingTrigger") decision = `${decision}; dry-run did not trigger`; if (options.dryRun && phase === "PendingTrigger") decision = `${decision}; dry-run did not trigger`;
const statePipelineRun = stringOrNull(triggerCommand?.pipelineRun) ?? live.pipelineRun; const statePipelineRun = stringOrNull(triggerCommand?.pipelineRun) ?? live.pipelineRun;
@@ -828,7 +844,7 @@ async function decideAndMaybeTrigger(
decision, decision,
dryRun: options.dryRun, dryRun: options.dryRun,
updatedAt: new Date().toISOString(), updatedAt: new Date().toISOString(),
timings: buildFollowerTimings(follower, live, triggerCommand, null, phase), timings: buildFollowerTimings(follower, live, triggerCommand, asOptionalRecord(previous.timings), phase),
warnings, warnings,
next: followerNextCommands(follower), next: followerNextCommands(follower),
command: triggerCommand ?? { command: triggerCommand ?? {
@@ -1286,6 +1302,23 @@ async function waitNativeFollowerCloseout(
return await waitNativeSentinelCloseout(registry, follower, observedSha, options, timeoutSeconds); return await waitNativeSentinelCloseout(registry, follower, observedSha, options, timeoutSeconds);
} }
function closeoutOnlyCommand(follower: FollowerSpec, pipelineRun: string | null, observedSha: string, closeout: NativeCloseoutWaitResult): Record<string, unknown> {
return {
mode: "k8s-native-closeout",
adapter: follower.adapter,
pipelineRun,
sourceCommit: observedSha,
wait: true,
closeout,
finishedAt: closeout.completed || closeout.timedOut ? new Date().toISOString() : null,
elapsedMs: closeout.elapsedMs,
exitCode: closeout.completed ? 0 : 1,
timedOut: closeout.timedOut,
statusAuthority: "k8s-native",
parsedDownstreamCliOutput: false,
};
}
function nativeCloseoutSummary(live: AdapterSummary): Record<string, unknown> { function nativeCloseoutSummary(live: AdapterSummary): Record<string, unknown> {
const payload = asOptionalRecord(live.payload); const payload = asOptionalRecord(live.payload);
return { return {
@@ -2087,7 +2120,7 @@ function buildFollowerTimings(
phase?: BranchFollowerPhase, phase?: BranchFollowerPhase,
): FollowerState["timings"] { ): FollowerState["timings"] {
const nativePayload = asOptionalRecord(live.payload); const nativePayload = asOptionalRecord(live.payload);
const total = totalTimingFromCommand(triggerCommand, phase) ?? totalTimingFromStored(storedTimings, phase); const total = totalTimingFromCommand(triggerCommand, phase) ?? totalTimingFromStored(storedTimings, phase, stringOrNull(triggerCommand?.finishedAt));
const stages = dedupeTimingStages([ const stages = dedupeTimingStages([
...stageTimingsFromCommand(triggerCommand), ...stageTimingsFromCommand(triggerCommand),
...stageTimingsFromNativePayload(nativePayload), ...stageTimingsFromNativePayload(nativePayload),
@@ -2126,6 +2159,7 @@ function compactTimings(timings: FollowerState["timings"]): FollowerState["timin
function totalTimingFromCommand(command: Record<string, unknown> | undefined, phase?: BranchFollowerPhase): { seconds: number; status: string; source: string; startedAt: string | null; finishedAt: string | null } | null { function totalTimingFromCommand(command: Record<string, unknown> | undefined, phase?: BranchFollowerPhase): { seconds: number; status: string; source: string; startedAt: string | null; finishedAt: string | null } | null {
if (command === undefined) return null; if (command === undefined) return null;
if (command.mode === "k8s-native-closeout") return null;
const payload = asOptionalRecord(command.payload); const payload = asOptionalRecord(command.payload);
if (payload?.reused === true) return null; if (payload?.reused === true) return null;
const startedAt = stringOrNull(command.startedAt); const startedAt = stringOrNull(command.startedAt);
@@ -2150,18 +2184,18 @@ function totalTimingFromCommand(command: Record<string, unknown> | undefined, ph
return { seconds, status, source: stringOrNull(command.mode) ?? stringOrNull(command.status) ?? "command", startedAt, finishedAt }; return { seconds, status, source: stringOrNull(command.mode) ?? stringOrNull(command.status) ?? "command", startedAt, finishedAt };
} }
function totalTimingFromStored(storedTimings: Record<string, unknown> | null | undefined, phase?: BranchFollowerPhase): { seconds: number; status: string; source: string; startedAt: string | null; finishedAt: string | null } | null { function totalTimingFromStored(storedTimings: Record<string, unknown> | null | undefined, phase?: BranchFollowerPhase, finishOverride?: string | null): { seconds: number; status: string; source: string; startedAt: string | null; finishedAt: string | null } | null {
if (storedTimings === null || storedTimings === undefined) return null; if (storedTimings === null || storedTimings === undefined) return null;
const status = stringOrNull(storedTimings.totalStatus); const status = stringOrNull(storedTimings.totalStatus);
const source = stringOrNull(storedTimings.totalSource); const source = stringOrNull(storedTimings.totalSource);
const startedAt = stringOrNull(storedTimings.startedAt); const startedAt = stringOrNull(storedTimings.startedAt);
const finishedAt = stringOrNull(storedTimings.finishedAt); const finishedAt = stringOrNull(storedTimings.finishedAt) ?? finishOverride ?? null;
if (phase === "Noop" && finishedAt === null) return null; if (phase === "Noop" && finishedAt === null) return null;
const seconds = totalSecondsFromRange(startedAt, finishedAt) ?? numberOrNull(storedTimings.totalSeconds); const seconds = totalSecondsFromRange(startedAt, finishedAt) ?? numberOrNull(storedTimings.totalSeconds);
if (seconds === null) return null; if (seconds === null) return null;
return { return {
seconds, seconds,
status: finishedAt === null && phase !== undefined && !terminalPhase(phase) ? phase.toLowerCase() : status ?? "recorded", status: finishedAt === null && phase !== undefined && !terminalPhase(phase) ? phase.toLowerCase() : phase === undefined ? status ?? "recorded" : phase.toLowerCase(),
source: source ?? "stored-state", source: source ?? "stored-state",
startedAt, startedAt,
finishedAt, finishedAt,
@@ -2181,6 +2215,10 @@ function timestampMs(value: string | null): number | null {
return Number.isFinite(parsed) ? parsed : null; return Number.isFinite(parsed) ? parsed : null;
} }
function terminalPhase(phase: BranchFollowerPhase): boolean {
return phase === "Succeeded" || phase === "Failed" || phase === "Blocked" || phase === "Skipped" || phase === "Noop";
}
function stageTimingsFromNativePayload(payload: Record<string, unknown> | null): StageTiming[] { function stageTimingsFromNativePayload(payload: Record<string, unknown> | null): StageTiming[] {
if (payload === null) return []; if (payload === null) return [];
const stages: StageTiming[] = []; const stages: StageTiming[] = [];