From b556bb3647339bc99123da634b786e1043e58515 Mon Sep 17 00:00:00 2001 From: Codex Date: Fri, 3 Jul 2026 07:34:58 +0000 Subject: [PATCH] fix: trigger sentinel follower via k8s api --- scripts/src/cicd.ts | 231 +++++++++++++++++- .../src/hwlab-node-web-sentinel-cicd-jobs.ts | 2 +- scripts/src/hwlab-node-web-sentinel-cicd.ts | 47 +++- 3 files changed, 272 insertions(+), 8 deletions(-) diff --git a/scripts/src/cicd.ts b/scripts/src/cicd.ts index 7f62da88..2d425db8 100644 --- a/scripts/src/cicd.ts +++ b/scripts/src/cicd.ts @@ -7,6 +7,10 @@ import { repoRoot, rootPath, type UniDeskConfig } from "./config"; import { runCommand, type CommandResult } from "./command"; import { startJob } from "./jobs"; import type { RenderedCliResult } from "./output"; +import { hwlabRuntimeLaneSpecForNode } from "./hwlab-node-lanes"; +import { loadSentinelCicdState } from "./hwlab-node-web-sentinel-cicd"; +import { sentinelPublishPipelineRunManifest } from "./hwlab-node-web-sentinel-cicd-jobs"; +import { sentinelPipelineRunName } from "./hwlab-node-web-sentinel-cicd-shared"; import { transPath } from "./hwlab-node/runtime-common"; import { configRefGraph, resolveConfigRefString } from "./ops/config-refs"; import { @@ -227,6 +231,14 @@ interface NativeObjectBundle { stderrTail: string; } +interface TriggerResult { + ok: boolean; + completed: boolean; + message: string; + jobId: string | null; + command: Record; +} + interface FollowerState { id: string; adapter: string; @@ -931,7 +943,7 @@ async function decideAndMaybeTrigger( decision = trigger.ok ? `trigger submitted for ${shortSha(observedSha)}` : `trigger failed for ${shortSha(observedSha)}`; inFlightJob = trigger.jobId ?? live.inFlightJob; lastTriggeredSha = observedSha; - if (trigger.ok && options.wait) { + if (trigger.ok && options.wait && trigger.completed) { targetSha = observedSha; lastSucceededSha = observedSha; } @@ -982,13 +994,17 @@ async function decideAndMaybeTrigger( }; } -async function executeTrigger(follower: FollowerSpec, observedSha: string | null, options: ParsedOptions): Promise<{ ok: boolean; message: string; jobId: string | null; command: Record }> { +async function executeTrigger(follower: FollowerSpec, observedSha: string | null, options: ParsedOptions): Promise { const spec = follower.commands.trigger; const timeoutSeconds = options.timeoutSeconds ?? spec.timeoutSeconds; + if (follower.adapter === "web-probe-sentinel-cicd" && options.controller) { + return executeNativeSentinelTrigger(follower, observedSha, options, timeoutSeconds); + } if (!options.wait && !options.controller) { const job = startJob(`cicd_branch_follower_${safeJobSegment(follower.id)}`, spec.argv, `Trigger ${follower.id} for observed sha ${observedSha ?? "unknown"}`); return { ok: true, + completed: false, message: `started async job ${job.id}`, jobId: job.id, command: { @@ -1002,12 +1018,217 @@ async function executeTrigger(follower: FollowerSpec, observedSha: string | null const result = runCommand(spec.argv, repoRoot, { timeoutMs: timeoutSeconds * 1000 }); return { ok: result.exitCode === 0, + completed: result.exitCode === 0 && options.wait, message: result.exitCode === 0 ? "trigger command completed" : tailText(result.stderr || result.stdout, 500), jobId: null, command: commandCompact(result, options), }; } +function executeNativeSentinelTrigger(follower: FollowerSpec, observedSha: string | null, options: ParsedOptions, timeoutSeconds: number): TriggerResult { + if (observedSha === null) { + return { + ok: false, + completed: false, + message: "native sentinel trigger requires observed source sha", + jobId: null, + command: { mode: "k8s-native-tekton", ok: false, reason: "observed-sha-missing", parsedDownstreamCliOutput: false }, + }; + } + if (follower.target.sentinel === null) { + return { + ok: false, + completed: false, + message: "native sentinel trigger requires target.sentinel", + jobId: null, + command: { mode: "k8s-native-tekton", ok: false, reason: "target-sentinel-missing", parsedDownstreamCliOutput: false }, + }; + } + const spec = hwlabRuntimeLaneSpecForNode(follower.target.lane, follower.target.node); + const stageRef = `${follower.source.snapshotPrefix.replace(/\/+$/u, "")}/${observedSha}`; + const state = loadSentinelCicdState(spec, follower.target.sentinel, timeoutSeconds, "cached", { + commit: observedSha, + stageRef, + mirrorCommit: observedSha, + sourceAuthority: "git-mirror-snapshot", + }); + const pipelineRun = sentinelPipelineRunName(state, false); + const namespace = stringField(state.cicd, "builder.namespace", `${follower.id}.sentinel.cicd.builder`); + const manifest = sentinelPublishPipelineRunManifest(state, pipelineRun, true); + const result = runNativeTektonPipelineRun(namespace, pipelineRun, manifest, options.wait, timeoutSeconds); + const payload = parseJsonObject(result.stdout) ?? {}; + const pipelineRunCompleted = payload.completed === true; + const failed = payload.failed === true || result.exitCode !== 0; + const stillRunning = payload.stillRunning === true || payload.timedOutWait === true; + const message = failed + ? tailText(result.stderr || stringOrNull(payload.message) || result.stdout, 500) + : pipelineRunCompleted + ? `native sentinel PipelineRun ${pipelineRun} succeeded; runtime closeout remains k8s-native` + : stillRunning + ? `native sentinel PipelineRun ${pipelineRun} is still running; query status/events/logs for closeout` + : `native sentinel PipelineRun ${pipelineRun} submitted`; + return { + ok: !failed, + completed: false, + message, + jobId: pipelineRun, + command: { + mode: "k8s-native-tekton", + adapter: follower.adapter, + namespace, + pipelineRun, + sourceCommit: observedSha, + sourceStageRef: stageRef, + wait: options.wait, + pipelineRunCompleted, + stillRunning, + statusAuthority: "kubernetes-api-serviceaccount", + parsedDownstreamCliOutput: false, + payload, + exitCode: result.exitCode, + timedOut: result.timedOut, + stderrTail: failed ? redactText(tailText(result.stderr, Math.min(options.tailBytes, 4000))) : "", + }, + }; +} + +function runNativeTektonPipelineRun(namespace: string, pipelineRun: string, manifest: Record, wait: boolean, timeoutSeconds: number): CommandResult { + const manifestJson = JSON.stringify(manifest); + const manifestBase64 = Buffer.from(manifestJson, "utf8").toString("base64"); + const nodeScript = nativeTektonPipelineRunNodeScript(); + const script = [ + "set -eu", + "tmpdir=$(mktemp -d)", + "cleanup() { rm -rf \"$tmpdir\"; }", + "trap cleanup EXIT INT TERM", + "cat >\"$tmpdir/manifest.b64\" <<'UNIDESK_SENTINEL_PIPELINERUN_B64'", + manifestBase64, + "UNIDESK_SENTINEL_PIPELINERUN_B64", + `NAMESPACE=${shQuote(namespace)}`, + `PIPELINERUN=${shQuote(pipelineRun)}`, + `WAIT=${wait ? "true" : "false"}`, + `TIMEOUT_SECONDS=${shQuote(String(timeoutSeconds))}`, + "export NAMESPACE PIPELINERUN WAIT TIMEOUT_SECONDS", + "cat >\"$tmpdir/submit-pipelinerun.mjs\" <<'NODE_NATIVE_TEKTON'", + nodeScript, + "NODE_NATIVE_TEKTON", + "node \"$tmpdir/submit-pipelinerun.mjs\" \"$tmpdir/manifest.b64\"", + ].join("\n"); + return runCommand(["sh", "-lc", script], repoRoot, { timeoutMs: Math.max(5, timeoutSeconds + 10) * 1000 }); +} + +function nativeTektonPipelineRunNodeScript(): string { + return [ + "import { readFileSync } from 'node:fs';", + "import https from 'node:https';", + "const manifestPath = process.argv[2];", + "const namespace = process.env.NAMESPACE || '';", + "const pipelineRun = process.env.PIPELINERUN || '';", + "const shouldWait = process.env.WAIT === 'true';", + "const timeoutSeconds = Number(process.env.TIMEOUT_SECONDS || '60');", + "const host = process.env.KUBERNETES_SERVICE_HOST;", + "const port = Number(process.env.KUBERNETES_SERVICE_PORT || '443');", + "const token = readFileSync('/var/run/secrets/kubernetes.io/serviceaccount/token', 'utf8').trim();", + "const ca = readFileSync('/var/run/secrets/kubernetes.io/serviceaccount/ca.crt');", + "const manifest = JSON.parse(Buffer.from(readFileSync(manifestPath, 'utf8').replace(/\\s+/g, ''), 'base64').toString('utf8'));", + "function request(method, path, body, contentType = 'application/json') {", + " return new Promise((resolve, reject) => {", + " const headers = { authorization: `Bearer ${token}` };", + " const payload = body === undefined ? null : typeof body === 'string' ? body : JSON.stringify(body);", + " if (payload !== null) { headers['content-type'] = contentType; headers['content-length'] = Buffer.byteLength(payload); }", + " const req = https.request({ host, port, path, method, ca, headers }, (res) => {", + " let text = '';", + " res.setEncoding('utf8');", + " res.on('data', (chunk) => { text += chunk; });", + " res.on('end', () => resolve({ status: res.statusCode || 0, text }));", + " });", + " req.on('error', reject);", + " if (payload !== null) req.write(payload);", + " req.end();", + " });", + "}", + "function parseBody(result) {", + " if (!result.text) return null;", + " try { return JSON.parse(result.text); } catch { return null; }", + "}", + "async function getPipelineRun() {", + " const result = await request('GET', `/apis/tekton.dev/v1/namespaces/${encodeURIComponent(namespace)}/pipelineruns/${encodeURIComponent(pipelineRun)}`);", + " if (result.status === 404) return { found: false, object: null, status: result.status, text: result.text };", + " if (result.status < 200 || result.status >= 300) throw new Error(result.text || `kube api GET pipelinerun status ${result.status}`);", + " return { found: true, object: parseBody(result), status: result.status, text: result.text };", + "}", + "function succeededCondition(object) {", + " const conditions = Array.isArray(object?.status?.conditions) ? object.status.conditions : [];", + " return conditions.find((item) => item && item.type === 'Succeeded') || null;", + "}", + "function compact(object) {", + " const condition = succeededCondition(object);", + " return {", + " name: object?.metadata?.name || pipelineRun,", + " namespace: object?.metadata?.namespace || namespace,", + " generation: object?.metadata?.generation ?? null,", + " startTime: object?.status?.startTime || null,", + " completionTime: object?.status?.completionTime || null,", + " conditionStatus: condition?.status || null,", + " conditionReason: condition?.reason || null,", + " conditionMessage: condition?.message || null,", + " };", + "}", + "function delay(ms) { return new Promise((resolve) => setTimeout(resolve, ms)); }", + "let created = false;", + "let reused = false;", + "let latest = await getPipelineRun();", + "if (latest.found) {", + " reused = true;", + "} else {", + " const result = await request('POST', `/apis/tekton.dev/v1/namespaces/${encodeURIComponent(namespace)}/pipelineruns`, manifest);", + " if (result.status === 409) {", + " reused = true;", + " } else if (result.status >= 200 && result.status < 300) {", + " created = true;", + " } else {", + " process.stderr.write(result.text || `kube api POST pipelinerun status ${result.status}`);", + " process.exit(1);", + " }", + " latest = await getPipelineRun();", + "}", + "const deadline = Date.now() + Math.max(1, timeoutSeconds) * 1000;", + "let polls = 0;", + "while (shouldWait) {", + " const condition = succeededCondition(latest.object);", + " if (condition?.status === 'True' || condition?.status === 'False') break;", + " if (Date.now() >= deadline) break;", + " polls += 1;", + " process.stderr.write(JSON.stringify({ event: 'cicd.branch-follower.native-tekton.wait', pipelineRun, namespace, polls, conditionStatus: condition?.status || null, valuesRedacted: true }) + '\\n');", + " await delay(2000);", + " latest = await getPipelineRun();", + "}", + "const condition = succeededCondition(latest.object);", + "const completed = condition?.status === 'True';", + "const failed = condition?.status === 'False';", + "const terminal = completed || failed;", + "const output = {", + " ok: !failed,", + " submitted: true,", + " created,", + " reused,", + " wait: shouldWait,", + " polls,", + " completed,", + " failed,", + " terminal,", + " stillRunning: !terminal,", + " timedOutWait: shouldWait && !terminal,", + " pipelineRun: compact(latest.object),", + " statusAuthority: 'kubernetes-api-serviceaccount',", + " parsedDownstreamCliOutput: false,", + " valuesRedacted: true,", + "};", + "process.stdout.write(JSON.stringify(output));", + "if (failed) process.exit(1);", + ].join("\n"); +} + async function readAdapterStatus(registry: BranchFollowerRegistry, follower: FollowerSpec, options: ParsedOptions): Promise { const timeoutSeconds = options.timeoutSeconds ?? follower.budgets.statusSeconds; const bundle = readNativeObjectBundle(registry, follower, options, timeoutSeconds); @@ -1211,8 +1432,9 @@ function readNativeObjectBundle(registry: BranchFollowerRegistry, follower: Foll "fi", "case \"$source_commit\" in", " [0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f])", + " stage_ref=\"${snapshot_prefix%/}/$source_commit\"", " printf 'UNIDESK_NATIVE_JSON\\tsource\\t'", - " printf '{\"commit\":\"%s\",\"branch\":\"%s\",\"mode\":\"k8s-git-mirror-cache\",\"repoPath\":\"%s\"}' \"$source_commit\" \"$branch\" \"$repo_path\" | base64 | tr -d '\\n'", + " printf '{\"commit\":\"%s\",\"branch\":\"%s\",\"stageRef\":\"%s\",\"sourceAuthority\":\"k8s-git-mirror-snapshot\",\"mode\":\"k8s-git-mirror-cache\",\"repoPath\":\"%s\"}' \"$source_commit\" \"$branch\" \"$stage_ref\" \"$repo_path\" | base64 | tr -d '\\n'", " printf '\\n'", " ;;", " *)", @@ -1779,7 +2001,8 @@ function renderControllerManifests(registry: BranchFollowerRegistry): Record { +export function sentinelPublishPipelineRunManifest(state: SentinelCicdState, pipelineRunName: string, publishGitops: boolean): Record { const namespace = stringAt(state.cicd, "builder.namespace"); const buildkitImage = requireSentinelBuildkitImage(state); const proxyEnv = sentinelImageBuildProxyEnv(state); diff --git a/scripts/src/hwlab-node-web-sentinel-cicd.ts b/scripts/src/hwlab-node-web-sentinel-cicd.ts index 71027ee0..53759170 100644 --- a/scripts/src/hwlab-node-web-sentinel-cicd.ts +++ b/scripts/src/hwlab-node-web-sentinel-cicd.ts @@ -136,7 +136,14 @@ export { const SPEC_REF = "PJ2026-01060508 Web哨兵 draft-2026-07-01-p16-cicd-source-snapshot"; -type SourceResolveMode = "cached" | "sync"; +export type SourceResolveMode = "cached" | "sync"; + +export interface SentinelSourceOverride { + readonly commit: string; + readonly stageRef?: string | null; + readonly mirrorCommit?: string | null; + readonly sourceAuthority: "git-mirror-snapshot"; +} export function runWebProbeSentinelCommand(spec: HwlabRuntimeLaneSpec, options: WebProbeSentinelOptions): RenderedCliResult { if (options.kind === "config") return withWebProbeSentinelConfigRendered(webProbeSentinelConfigPlan(spec, options.action, options.sentinelId)); @@ -534,7 +541,13 @@ function sentinelAlreadyCurrentControlResult(state: SentinelCicdState, observed: }; } -function loadSentinelCicdState(spec: HwlabRuntimeLaneSpec, sentinelId: string | null, timeoutSeconds: number, sourceResolveMode: SourceResolveMode): SentinelCicdState { +export function loadSentinelCicdState( + spec: HwlabRuntimeLaneSpec, + sentinelId: string | null, + timeoutSeconds: number, + sourceResolveMode: SourceResolveMode, + sourceOverride: SentinelSourceOverride | null = null, +): SentinelCicdState { const sentinel = resolveWebProbeSentinel(spec, sentinelId); const configPlan = webProbeSentinelConfigPlan(spec, "status", sentinel.id); const runtime = recordTarget(readWebProbeSentinelConfigRefTarget(spec, sentinel.configRefs.runtime), sentinel.configRefs.runtime); @@ -549,7 +562,9 @@ function loadSentinelCicdState(spec: HwlabRuntimeLaneSpec, sentinelId: string | const nodeId = stringField(controlPlaneTarget, "node"); const controlPlaneNode = recordTarget(valueAtPath(controlPlaneConfig, `nodes.${nodeId}`), `${configRefFile(controlPlaneRef)}#nodes.${nodeId}`); validateSentinelSourceAuthority(cicd); - const sourceHead = resolveSourceHead(spec, cicd, controlPlaneTarget, controlPlaneNode, timeoutSeconds, sourceResolveMode); + const sourceHead = sourceOverride === null + ? resolveSourceHead(spec, cicd, controlPlaneTarget, controlPlaneNode, timeoutSeconds, sourceResolveMode) + : sourceHeadFromOverride(cicd, sourceOverride); const image = sentinelImagePlan(spec, cicd, sourceHead); const manifests = renderSentinelManifests(spec, sentinel.id, runtime, cicd, scenarios, publicExposure, secrets, image); const manifestYaml = `${manifests.map((item) => Bun.YAML.stringify(item).trim()).join("\n---\n")}\n`; @@ -573,6 +588,32 @@ function loadSentinelCicdState(spec: HwlabRuntimeLaneSpec, sentinelId: string | }; } +function sourceHeadFromOverride(cicd: Record, override: SentinelSourceOverride): SourceHead { + if (!/^[0-9a-f]{40}$/iu.test(override.commit)) throw new Error(`sentinel source override commit must be a full sha, got ${override.commit}`); + const stageRef = override.stageRef ?? sentinelSourceSnapshotRef(cicd, override.commit); + if (!stageRef.startsWith("refs/")) throw new Error(`sentinel source override stageRef must be a git ref, got ${stageRef}`); + const mirrorCommit = override.mirrorCommit ?? override.commit; + if (!/^[0-9a-f]{40}$/iu.test(mirrorCommit)) throw new Error(`sentinel source override mirrorCommit must be a full sha, got ${mirrorCommit}`); + return { + ok: true, + repository: stringAt(cicd, "source.repository"), + branch: stringAt(cicd, "source.branch"), + commit: override.commit, + stageRef, + mirrorCommit, + sourceAuthority: override.sourceAuthority, + latestDrift: mirrorCommit !== override.commit, + result: { + exitCode: 0, + timedOut: false, + stdoutBytes: 0, + stderrBytes: 0, + stdoutPreview: "source supplied by cicd branch-follower k8s git-mirror snapshot", + stderrPreview: "", + }, + }; +} + function resolveSourceHead( spec: HwlabRuntimeLaneSpec, cicd: Record,