Merge pull request #1490 from pikasTech/fix/1476-sentinel-native-trigger

fix: trigger sentinel follower via k8s api
This commit is contained in:
Lyon
2026-07-03 15:36:07 +08:00
committed by GitHub
3 changed files with 272 additions and 8 deletions
+227 -4
View File
@@ -7,6 +7,10 @@ import { repoRoot, rootPath, type UniDeskConfig } from "./config";
import { runCommand, type CommandResult } from "./command";
import { startJob } from "./jobs";
import type { RenderedCliResult } from "./output";
import { hwlabRuntimeLaneSpecForNode } from "./hwlab-node-lanes";
import { loadSentinelCicdState } from "./hwlab-node-web-sentinel-cicd";
import { sentinelPublishPipelineRunManifest } from "./hwlab-node-web-sentinel-cicd-jobs";
import { sentinelPipelineRunName } from "./hwlab-node-web-sentinel-cicd-shared";
import { transPath } from "./hwlab-node/runtime-common";
import { configRefGraph, resolveConfigRefString } from "./ops/config-refs";
import {
@@ -227,6 +231,14 @@ interface NativeObjectBundle {
stderrTail: string;
}
interface TriggerResult {
ok: boolean;
completed: boolean;
message: string;
jobId: string | null;
command: Record<string, unknown>;
}
interface FollowerState {
id: string;
adapter: string;
@@ -931,7 +943,7 @@ async function decideAndMaybeTrigger(
decision = trigger.ok ? `trigger submitted for ${shortSha(observedSha)}` : `trigger failed for ${shortSha(observedSha)}`;
inFlightJob = trigger.jobId ?? live.inFlightJob;
lastTriggeredSha = observedSha;
if (trigger.ok && options.wait) {
if (trigger.ok && options.wait && trigger.completed) {
targetSha = observedSha;
lastSucceededSha = observedSha;
}
@@ -982,13 +994,17 @@ async function decideAndMaybeTrigger(
};
}
async function executeTrigger(follower: FollowerSpec, observedSha: string | null, options: ParsedOptions): Promise<{ ok: boolean; message: string; jobId: string | null; command: Record<string, unknown> }> {
async function executeTrigger(follower: FollowerSpec, observedSha: string | null, options: ParsedOptions): Promise<TriggerResult> {
const spec = follower.commands.trigger;
const timeoutSeconds = options.timeoutSeconds ?? spec.timeoutSeconds;
if (follower.adapter === "web-probe-sentinel-cicd" && options.controller) {
return executeNativeSentinelTrigger(follower, observedSha, options, timeoutSeconds);
}
if (!options.wait && !options.controller) {
const job = startJob(`cicd_branch_follower_${safeJobSegment(follower.id)}`, spec.argv, `Trigger ${follower.id} for observed sha ${observedSha ?? "unknown"}`);
return {
ok: true,
completed: false,
message: `started async job ${job.id}`,
jobId: job.id,
command: {
@@ -1002,12 +1018,217 @@ async function executeTrigger(follower: FollowerSpec, observedSha: string | null
const result = runCommand(spec.argv, repoRoot, { timeoutMs: timeoutSeconds * 1000 });
return {
ok: result.exitCode === 0,
completed: result.exitCode === 0 && options.wait,
message: result.exitCode === 0 ? "trigger command completed" : tailText(result.stderr || result.stdout, 500),
jobId: null,
command: commandCompact(result, options),
};
}
function executeNativeSentinelTrigger(follower: FollowerSpec, observedSha: string | null, options: ParsedOptions, timeoutSeconds: number): TriggerResult {
if (observedSha === null) {
return {
ok: false,
completed: false,
message: "native sentinel trigger requires observed source sha",
jobId: null,
command: { mode: "k8s-native-tekton", ok: false, reason: "observed-sha-missing", parsedDownstreamCliOutput: false },
};
}
if (follower.target.sentinel === null) {
return {
ok: false,
completed: false,
message: "native sentinel trigger requires target.sentinel",
jobId: null,
command: { mode: "k8s-native-tekton", ok: false, reason: "target-sentinel-missing", parsedDownstreamCliOutput: false },
};
}
const spec = hwlabRuntimeLaneSpecForNode(follower.target.lane, follower.target.node);
const stageRef = `${follower.source.snapshotPrefix.replace(/\/+$/u, "")}/${observedSha}`;
const state = loadSentinelCicdState(spec, follower.target.sentinel, timeoutSeconds, "cached", {
commit: observedSha,
stageRef,
mirrorCommit: observedSha,
sourceAuthority: "git-mirror-snapshot",
});
const pipelineRun = sentinelPipelineRunName(state, false);
const namespace = stringField(state.cicd, "builder.namespace", `${follower.id}.sentinel.cicd.builder`);
const manifest = sentinelPublishPipelineRunManifest(state, pipelineRun, true);
const result = runNativeTektonPipelineRun(namespace, pipelineRun, manifest, options.wait, timeoutSeconds);
const payload = parseJsonObject(result.stdout) ?? {};
const pipelineRunCompleted = payload.completed === true;
const failed = payload.failed === true || result.exitCode !== 0;
const stillRunning = payload.stillRunning === true || payload.timedOutWait === true;
const message = failed
? tailText(result.stderr || stringOrNull(payload.message) || result.stdout, 500)
: pipelineRunCompleted
? `native sentinel PipelineRun ${pipelineRun} succeeded; runtime closeout remains k8s-native`
: stillRunning
? `native sentinel PipelineRun ${pipelineRun} is still running; query status/events/logs for closeout`
: `native sentinel PipelineRun ${pipelineRun} submitted`;
return {
ok: !failed,
completed: false,
message,
jobId: pipelineRun,
command: {
mode: "k8s-native-tekton",
adapter: follower.adapter,
namespace,
pipelineRun,
sourceCommit: observedSha,
sourceStageRef: stageRef,
wait: options.wait,
pipelineRunCompleted,
stillRunning,
statusAuthority: "kubernetes-api-serviceaccount",
parsedDownstreamCliOutput: false,
payload,
exitCode: result.exitCode,
timedOut: result.timedOut,
stderrTail: failed ? redactText(tailText(result.stderr, Math.min(options.tailBytes, 4000))) : "",
},
};
}
function runNativeTektonPipelineRun(namespace: string, pipelineRun: string, manifest: Record<string, unknown>, wait: boolean, timeoutSeconds: number): CommandResult {
const manifestJson = JSON.stringify(manifest);
const manifestBase64 = Buffer.from(manifestJson, "utf8").toString("base64");
const nodeScript = nativeTektonPipelineRunNodeScript();
const script = [
"set -eu",
"tmpdir=$(mktemp -d)",
"cleanup() { rm -rf \"$tmpdir\"; }",
"trap cleanup EXIT INT TERM",
"cat >\"$tmpdir/manifest.b64\" <<'UNIDESK_SENTINEL_PIPELINERUN_B64'",
manifestBase64,
"UNIDESK_SENTINEL_PIPELINERUN_B64",
`NAMESPACE=${shQuote(namespace)}`,
`PIPELINERUN=${shQuote(pipelineRun)}`,
`WAIT=${wait ? "true" : "false"}`,
`TIMEOUT_SECONDS=${shQuote(String(timeoutSeconds))}`,
"export NAMESPACE PIPELINERUN WAIT TIMEOUT_SECONDS",
"cat >\"$tmpdir/submit-pipelinerun.mjs\" <<'NODE_NATIVE_TEKTON'",
nodeScript,
"NODE_NATIVE_TEKTON",
"node \"$tmpdir/submit-pipelinerun.mjs\" \"$tmpdir/manifest.b64\"",
].join("\n");
return runCommand(["sh", "-lc", script], repoRoot, { timeoutMs: Math.max(5, timeoutSeconds + 10) * 1000 });
}
function nativeTektonPipelineRunNodeScript(): string {
return [
"import { readFileSync } from 'node:fs';",
"import https from 'node:https';",
"const manifestPath = process.argv[2];",
"const namespace = process.env.NAMESPACE || '';",
"const pipelineRun = process.env.PIPELINERUN || '';",
"const shouldWait = process.env.WAIT === 'true';",
"const timeoutSeconds = Number(process.env.TIMEOUT_SECONDS || '60');",
"const host = process.env.KUBERNETES_SERVICE_HOST;",
"const port = Number(process.env.KUBERNETES_SERVICE_PORT || '443');",
"const token = readFileSync('/var/run/secrets/kubernetes.io/serviceaccount/token', 'utf8').trim();",
"const ca = readFileSync('/var/run/secrets/kubernetes.io/serviceaccount/ca.crt');",
"const manifest = JSON.parse(Buffer.from(readFileSync(manifestPath, 'utf8').replace(/\\s+/g, ''), 'base64').toString('utf8'));",
"function request(method, path, body, contentType = 'application/json') {",
" return new Promise((resolve, reject) => {",
" const headers = { authorization: `Bearer ${token}` };",
" const payload = body === undefined ? null : typeof body === 'string' ? body : JSON.stringify(body);",
" if (payload !== null) { headers['content-type'] = contentType; headers['content-length'] = Buffer.byteLength(payload); }",
" const req = https.request({ host, port, path, method, ca, headers }, (res) => {",
" let text = '';",
" res.setEncoding('utf8');",
" res.on('data', (chunk) => { text += chunk; });",
" res.on('end', () => resolve({ status: res.statusCode || 0, text }));",
" });",
" req.on('error', reject);",
" if (payload !== null) req.write(payload);",
" req.end();",
" });",
"}",
"function parseBody(result) {",
" if (!result.text) return null;",
" try { return JSON.parse(result.text); } catch { return null; }",
"}",
"async function getPipelineRun() {",
" const result = await request('GET', `/apis/tekton.dev/v1/namespaces/${encodeURIComponent(namespace)}/pipelineruns/${encodeURIComponent(pipelineRun)}`);",
" if (result.status === 404) return { found: false, object: null, status: result.status, text: result.text };",
" if (result.status < 200 || result.status >= 300) throw new Error(result.text || `kube api GET pipelinerun status ${result.status}`);",
" return { found: true, object: parseBody(result), status: result.status, text: result.text };",
"}",
"function succeededCondition(object) {",
" const conditions = Array.isArray(object?.status?.conditions) ? object.status.conditions : [];",
" return conditions.find((item) => item && item.type === 'Succeeded') || null;",
"}",
"function compact(object) {",
" const condition = succeededCondition(object);",
" return {",
" name: object?.metadata?.name || pipelineRun,",
" namespace: object?.metadata?.namespace || namespace,",
" generation: object?.metadata?.generation ?? null,",
" startTime: object?.status?.startTime || null,",
" completionTime: object?.status?.completionTime || null,",
" conditionStatus: condition?.status || null,",
" conditionReason: condition?.reason || null,",
" conditionMessage: condition?.message || null,",
" };",
"}",
"function delay(ms) { return new Promise((resolve) => setTimeout(resolve, ms)); }",
"let created = false;",
"let reused = false;",
"let latest = await getPipelineRun();",
"if (latest.found) {",
" reused = true;",
"} else {",
" const result = await request('POST', `/apis/tekton.dev/v1/namespaces/${encodeURIComponent(namespace)}/pipelineruns`, manifest);",
" if (result.status === 409) {",
" reused = true;",
" } else if (result.status >= 200 && result.status < 300) {",
" created = true;",
" } else {",
" process.stderr.write(result.text || `kube api POST pipelinerun status ${result.status}`);",
" process.exit(1);",
" }",
" latest = await getPipelineRun();",
"}",
"const deadline = Date.now() + Math.max(1, timeoutSeconds) * 1000;",
"let polls = 0;",
"while (shouldWait) {",
" const condition = succeededCondition(latest.object);",
" if (condition?.status === 'True' || condition?.status === 'False') break;",
" if (Date.now() >= deadline) break;",
" polls += 1;",
" process.stderr.write(JSON.stringify({ event: 'cicd.branch-follower.native-tekton.wait', pipelineRun, namespace, polls, conditionStatus: condition?.status || null, valuesRedacted: true }) + '\\n');",
" await delay(2000);",
" latest = await getPipelineRun();",
"}",
"const condition = succeededCondition(latest.object);",
"const completed = condition?.status === 'True';",
"const failed = condition?.status === 'False';",
"const terminal = completed || failed;",
"const output = {",
" ok: !failed,",
" submitted: true,",
" created,",
" reused,",
" wait: shouldWait,",
" polls,",
" completed,",
" failed,",
" terminal,",
" stillRunning: !terminal,",
" timedOutWait: shouldWait && !terminal,",
" pipelineRun: compact(latest.object),",
" statusAuthority: 'kubernetes-api-serviceaccount',",
" parsedDownstreamCliOutput: false,",
" valuesRedacted: true,",
"};",
"process.stdout.write(JSON.stringify(output));",
"if (failed) process.exit(1);",
].join("\n");
}
async function readAdapterStatus(registry: BranchFollowerRegistry, follower: FollowerSpec, options: ParsedOptions): Promise<AdapterSummary> {
const timeoutSeconds = options.timeoutSeconds ?? follower.budgets.statusSeconds;
const bundle = readNativeObjectBundle(registry, follower, options, timeoutSeconds);
@@ -1211,8 +1432,9 @@ function readNativeObjectBundle(registry: BranchFollowerRegistry, follower: Foll
"fi",
"case \"$source_commit\" in",
" [0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f][0-9a-f])",
" stage_ref=\"${snapshot_prefix%/}/$source_commit\"",
" printf 'UNIDESK_NATIVE_JSON\\tsource\\t'",
" printf '{\"commit\":\"%s\",\"branch\":\"%s\",\"mode\":\"k8s-git-mirror-cache\",\"repoPath\":\"%s\"}' \"$source_commit\" \"$branch\" \"$repo_path\" | base64 | tr -d '\\n'",
" printf '{\"commit\":\"%s\",\"branch\":\"%s\",\"stageRef\":\"%s\",\"sourceAuthority\":\"k8s-git-mirror-snapshot\",\"mode\":\"k8s-git-mirror-cache\",\"repoPath\":\"%s\"}' \"$source_commit\" \"$branch\" \"$stage_ref\" \"$repo_path\" | base64 | tr -d '\\n'",
" printf '\\n'",
" ;;",
" *)",
@@ -1779,7 +2001,8 @@ function renderControllerManifests(registry: BranchFollowerRegistry): Record<str
{ apiGroups: [""], resources: ["pods", "pods/log", "configmaps", "events"], verbs: ["get", "list", "watch"] },
{ apiGroups: [""], resources: ["pods/exec"], verbs: ["create"] },
{ apiGroups: ["apps"], resources: ["deployments", "statefulsets"], verbs: ["get", "list", "watch"] },
{ apiGroups: ["tekton.dev"], resources: ["pipelineruns"], verbs: ["get", "list", "watch"] },
{ apiGroups: ["tekton.dev"], resources: ["pipelineruns"], verbs: ["get", "list", "watch", "create"] },
{ apiGroups: ["tekton.dev"], resources: ["taskruns"], verbs: ["get", "list", "watch"] },
{ apiGroups: ["argoproj.io"], resources: ["applications"], verbs: ["get", "list", "watch"] },
],
},
@@ -367,7 +367,7 @@ export function runSentinelPublishJob(state: SentinelCicdState, publishGitops: b
return withSentinelRemoteJobDiagnostics(state, { ok: false, phase: "pipelinerun-timeout", resourceKind: "PipelineRun", jobName: pipelineRunName, payload: { ok: false, status: "timeout", valuesRedacted: true }, polls, elapsedMs: Date.now() - startedAt, probe: lastProbe, valuesRedacted: true }, "publish");
}
function sentinelPublishPipelineRunManifest(state: SentinelCicdState, pipelineRunName: string, publishGitops: boolean): Record<string, unknown> {
export function sentinelPublishPipelineRunManifest(state: SentinelCicdState, pipelineRunName: string, publishGitops: boolean): Record<string, unknown> {
const namespace = stringAt(state.cicd, "builder.namespace");
const buildkitImage = requireSentinelBuildkitImage(state);
const proxyEnv = sentinelImageBuildProxyEnv(state);
+44 -3
View File
@@ -136,7 +136,14 @@ export {
const SPEC_REF = "PJ2026-01060508 Web哨兵 draft-2026-07-01-p16-cicd-source-snapshot";
type SourceResolveMode = "cached" | "sync";
export type SourceResolveMode = "cached" | "sync";
export interface SentinelSourceOverride {
readonly commit: string;
readonly stageRef?: string | null;
readonly mirrorCommit?: string | null;
readonly sourceAuthority: "git-mirror-snapshot";
}
export function runWebProbeSentinelCommand(spec: HwlabRuntimeLaneSpec, options: WebProbeSentinelOptions): RenderedCliResult {
if (options.kind === "config") return withWebProbeSentinelConfigRendered(webProbeSentinelConfigPlan(spec, options.action, options.sentinelId));
@@ -534,7 +541,13 @@ function sentinelAlreadyCurrentControlResult(state: SentinelCicdState, observed:
};
}
function loadSentinelCicdState(spec: HwlabRuntimeLaneSpec, sentinelId: string | null, timeoutSeconds: number, sourceResolveMode: SourceResolveMode): SentinelCicdState {
export function loadSentinelCicdState(
spec: HwlabRuntimeLaneSpec,
sentinelId: string | null,
timeoutSeconds: number,
sourceResolveMode: SourceResolveMode,
sourceOverride: SentinelSourceOverride | null = null,
): SentinelCicdState {
const sentinel = resolveWebProbeSentinel(spec, sentinelId);
const configPlan = webProbeSentinelConfigPlan(spec, "status", sentinel.id);
const runtime = recordTarget(readWebProbeSentinelConfigRefTarget(spec, sentinel.configRefs.runtime), sentinel.configRefs.runtime);
@@ -549,7 +562,9 @@ function loadSentinelCicdState(spec: HwlabRuntimeLaneSpec, sentinelId: string |
const nodeId = stringField(controlPlaneTarget, "node");
const controlPlaneNode = recordTarget(valueAtPath(controlPlaneConfig, `nodes.${nodeId}`), `${configRefFile(controlPlaneRef)}#nodes.${nodeId}`);
validateSentinelSourceAuthority(cicd);
const sourceHead = resolveSourceHead(spec, cicd, controlPlaneTarget, controlPlaneNode, timeoutSeconds, sourceResolveMode);
const sourceHead = sourceOverride === null
? resolveSourceHead(spec, cicd, controlPlaneTarget, controlPlaneNode, timeoutSeconds, sourceResolveMode)
: sourceHeadFromOverride(cicd, sourceOverride);
const image = sentinelImagePlan(spec, cicd, sourceHead);
const manifests = renderSentinelManifests(spec, sentinel.id, runtime, cicd, scenarios, publicExposure, secrets, image);
const manifestYaml = `${manifests.map((item) => Bun.YAML.stringify(item).trim()).join("\n---\n")}\n`;
@@ -573,6 +588,32 @@ function loadSentinelCicdState(spec: HwlabRuntimeLaneSpec, sentinelId: string |
};
}
function sourceHeadFromOverride(cicd: Record<string, unknown>, override: SentinelSourceOverride): SourceHead {
if (!/^[0-9a-f]{40}$/iu.test(override.commit)) throw new Error(`sentinel source override commit must be a full sha, got ${override.commit}`);
const stageRef = override.stageRef ?? sentinelSourceSnapshotRef(cicd, override.commit);
if (!stageRef.startsWith("refs/")) throw new Error(`sentinel source override stageRef must be a git ref, got ${stageRef}`);
const mirrorCommit = override.mirrorCommit ?? override.commit;
if (!/^[0-9a-f]{40}$/iu.test(mirrorCommit)) throw new Error(`sentinel source override mirrorCommit must be a full sha, got ${mirrorCommit}`);
return {
ok: true,
repository: stringAt(cicd, "source.repository"),
branch: stringAt(cicd, "source.branch"),
commit: override.commit,
stageRef,
mirrorCommit,
sourceAuthority: override.sourceAuthority,
latestDrift: mirrorCommit !== override.commit,
result: {
exitCode: 0,
timedOut: false,
stdoutBytes: 0,
stderrBytes: 0,
stdoutPreview: "source supplied by cicd branch-follower k8s git-mirror snapshot",
stderrPreview: "",
},
};
}
function resolveSourceHead(
spec: HwlabRuntimeLaneSpec,
cicd: Record<string, unknown>,