Merge pull request #1037 from pikasTech/fix/1020-sentinel-startup-ready

fix: wait for sentinel observer startup
This commit is contained in:
Lyon
2026-06-26 21:17:20 +08:00
committed by GitHub
+124
View File
@@ -1726,6 +1726,22 @@ function runSentinelQuickVerify(state: SentinelCicdState, reason: string, timeou
valuesRedacted: true,
});
}
const startupReady = waitForQuickVerifyObserverStartup(state, observerId, deadline, sampleIntervalMs, budgetSeconds);
steps.push({ phase: "observe-wait-startup-ready", ok: startupReady.ok, result: startupReady });
if (startupReady.ok !== true) {
return recordQuickVerify(state, finalizeQuickVerifyFailure(state, {
runId,
scenarioId,
reason,
observerId,
promptIndex: 0,
steps,
failure: text(startupReady.failure ?? "observe-startup-ready-wait-failed"),
elapsedMs: elapsedMs(),
warnings: mergeWarnings(Array.isArray(startupReady.warnings) ? startupReady.warnings : [], elapsedWarnings()),
promptSource: prompts.summary,
}));
}
let promptIndex = 0;
const sessionInvarianceChecks = sessionInvarianceChecksByRound(scenario);
for (const item of commandSequence) {
@@ -2353,6 +2369,114 @@ function readAnalysisSummaryFromWorkspace(state: SentinelCicdState, stateDir: st
return { ok: result.exitCode === 0 && parsed?.ok === true, ...record(parsed), result: compactCommand(result), valuesRedacted: true };
}
/**
 * Waits for the observe runner behind `observerId` to report that its startup
 * commands have finished, before quick verify sends its first command.
 *
 * The wait is carried out in chunks: each loop iteration builds a wait script
 * with `quickVerifyObserverStartupWaitScript`, pipes it to `sh` through the
 * `trans` command for `state.spec.nodeId:state.spec.workspace`, and parses the
 * JSON object printed on stdout. A chunk that ends with the
 * `quick-verify-startup-wait-chunk-timeout` failure is not fatal; the loop
 * simply starts another chunk while `Date.now()` is still before `deadline`.
 *
 * @param state - Sentinel state; only `state.spec.nodeId` and `state.spec.workspace` are read here.
 * @param observerId - Observer whose local observe-index entry supplies the state directory.
 * @param deadline - Epoch milliseconds after which no further chunk is started.
 * @param pollIntervalMs - Caller's sampling interval; half of it, clamped to 250–500 ms, becomes the script's poll sleep.
 * @param budgetSeconds - Only interpolated into the over-budget warning text.
 * @returns A record with `ok: true` once startup is ready, otherwise `ok: false`
 *   plus a `failure` code. Both carry the last six observations gathered.
 */
function waitForQuickVerifyObserverStartup(state: SentinelCicdState, observerId: string, deadline: number, pollIntervalMs: number, budgetSeconds: number): Record<string, unknown> {
  const entry = readLocalObserveIndex(observerId);
  if (entry === null) {
    return { ok: false, failure: "observe-index-entry-missing", observerId, valuesRedacted: true };
  }

  // Half the caller's interval, falling back to 250 when that is 0/NaN, then clamped to [250, 500].
  const halfInterval = Math.trunc(pollIntervalMs / 2) || 250;
  const remotePollMs = Math.max(250, Math.min(500, halfInterval));
  const collected: Record<string, unknown>[] = [];

  while (Date.now() < deadline) {
    // Each chunk lasts at least 1 s and at most 55 s, bounded by the time left.
    const remainingMs = deadline - Date.now();
    const chunkMs = Math.max(1000, Math.min(55_000, remainingMs));
    const waitScript = quickVerifyObserverStartupWaitScript(entry.stateDir, chunkMs, remotePollMs);
    const remoteTarget = `${state.spec.nodeId}:${state.spec.workspace}`;
    // The command gets 5 s on top of the script's own timeout.
    const outcome = runCommand(["trans", remoteTarget, "sh"], repoRoot, { input: waitScript, timeoutMs: chunkMs + 5000 });
    const parsed = parseJsonObject(outcome.stdout);

    if (Array.isArray(parsed?.observations)) {
      collected.push(...parsed.observations.map(record));
    }

    // Snapshot shared by the success and failure results of this chunk.
    const snapshot = {
      observerId,
      stateDir: entry.stateDir,
      status: typeof parsed?.status === "string" ? parsed.status : null,
      heartbeatStatus: typeof parsed?.heartbeatStatus === "string" ? parsed.heartbeatStatus : null,
      startup: record(parsed?.startup),
      observations: collected.slice(-6),
      waitResult: compactCommand(outcome),
      valuesRedacted: true,
    };
    const failed = (): Record<string, unknown> => ({
      ok: false,
      failure: text(parsed?.failure ?? "quick-verify-startup-artifact-wait-failed"),
      ...snapshot,
    });

    // A non-zero exit or unparseable stdout is always fatal.
    if (outcome.exitCode !== 0 || parsed === null) {
      return failed();
    }
    if (parsed.ok === true) {
      return { ok: true, ...snapshot };
    }
    // Any explicit failure other than a chunk timeout is fatal; a chunk timeout loops again.
    if (parsed.ok === false && parsed.failure !== "quick-verify-startup-wait-chunk-timeout") {
      return failed();
    }
  }

  return {
    ok: false,
    failure: "quick-verify-timeout-over-budget",
    observerId,
    stateDir: entry.stateDir,
    observations: collected.slice(-6),
    warnings: [`quick verify exceeded the configured ${budgetSeconds}s targetValidation budget while waiting for the observe runner startup to finish before sending the first command.`],
    valuesRedacted: true,
  };
}
/**
 * Builds the text of a shell script that waits for an observe runner's startup
 * to finish and prints a single JSON object describing the outcome.
 *
 * The script first checks that the state directory exists, then runs an inline
 * Node program (fed through a here-document) that repeatedly reads
 * `heartbeat.json`, `manifest.json` and `control.jsonl` from that directory.
 *
 * Outcomes printed by the script (all with exit status 0 on the paths shown here):
 * - `ok: true`, `status: 'startup-ready'` — the last recorded phase of every
 *   startup command is `completed` and the normalized status is `running`.
 * - `failure: 'state-dir-missing'` — the directory test failed.
 * - `failure: 'observer-startup-command-failed'` — a startup command has a `failed` phase event.
 * - `failure: 'observer-startup-terminal'` — the status is one of the terminal values before readiness.
 * - `failure: 'quick-verify-startup-wait-chunk-timeout'` — `timeoutMs` elapsed without any of the above.
 * - `failure: 'quick-verify-startup-wait-script-error'` — the async body rejected.
 *
 * @param stateDir - Directory holding the observer's state files; passed through `shellQuote`.
 * @param timeoutMs - Wait budget for this script run; truncated and floored at 1.
 * @param pollSleepMs - Sleep between polls; truncated and floored at 250.
 * @returns The script lines joined with `\n`.
 */
function quickVerifyObserverStartupWaitScript(stateDir: string, timeoutMs: number, pollSleepMs: number): string {
return [
// Shell preamble: abort on errors/unset variables and bind the quoted inputs.
"set -eu",
`state_dir=${shellQuote(stateDir)}`,
`timeout_ms=${shellQuote(String(Math.max(1, Math.trunc(timeoutMs))))}`,
`poll_ms=${shellQuote(String(Math.max(250, Math.trunc(pollSleepMs))))}`,
// Missing directory is reported as JSON with exit 0 so the caller reads the failure code from stdout.
// NOTE(review): $state_dir is substituted into the JSON text by printf without JSON escaping;
// a path containing `"` or `\` would presumably yield unparseable output — confirm whether that can occur.
"test -d \"$state_dir\" || { printf '{\"ok\":false,\"failure\":\"state-dir-missing\",\"stateDir\":\"%s\",\"valuesRedacted\":true}\\n' \"$state_dir\"; exit 0; }",
// Inline Node program; the quoted 'NODE' delimiter keeps the shell from expanding anything inside it.
"node - \"$state_dir\" \"$timeout_ms\" \"$poll_ms\" <<'NODE'",
"const fs = require('node:fs');",
"const path = require('node:path');",
"const dir = process.argv[2];",
"const timeoutMs = Number(process.argv[3]);",
"const pollMs = Number(process.argv[4]);",
"const startedAt = Date.now();",
// The three startup commands that must all reach 'completed'.
"const startupIds = ['startup-login', 'startup-goto', 'startup-observer-goto'];",
// File readers are best-effort: unreadable or malformed content yields null / [] / skipped lines.
"const readJson = (rel) => { try { return JSON.parse(fs.readFileSync(path.join(dir, rel), 'utf8')); } catch { return null; } };",
"const readJsonl = (rel) => { try { return fs.readFileSync(path.join(dir, rel), 'utf8').split(/\\r?\\n/u).filter(Boolean).map((line) => { try { return JSON.parse(line); } catch { return null; } }).filter(Boolean); } catch { return []; } };",
// clip: collapse whitespace and truncate; norm: trim, lowercase and turn '_' into '-'.
"const clip = (value, limit = 160) => value == null ? null : String(value).replace(/\\s+/gu, ' ').trim().slice(0, limit);",
"const norm = (value) => String(value || '').trim().toLowerCase().replace(/_/gu, '-');",
"const terminal = new Set(['failed', 'force-stopped', 'stopped', 'abandoned', 'completed']);",
"function commandEvents(control, id) { return control.filter((item) => item && item.commandId === id); }",
"function lastPhase(control, id) { return commandEvents(control, id).filter((item) => typeof item.phase === 'string').slice(-1)[0]?.phase || null; }",
// Despite its name this returns the most recent (slice(-1)) failed startup event, not the earliest.
"function firstFailedStartup(control) { return control.filter((item) => item && startupIds.includes(item.commandId) && item.phase === 'failed').slice(-1)[0] || null; }",
// rowFor: take one sample of the state files and classify it as ready / terminal / waiting.
"function rowFor() {",
"  const heartbeat = readJson('heartbeat.json') || {};",
"  const manifest = readJson('manifest.json') || {};",
"  const control = readJsonl('control.jsonl');",
"  const phases = Object.fromEntries(startupIds.map((id) => [id, lastPhase(control, id)]));",
"  const failed = firstFailedStartup(control);",
// Heartbeat status wins; the manifest status is only used when the heartbeat has none.
"  const heartbeatStatus = norm(heartbeat.status || manifest.status);",
"  const ready = startupIds.every((id) => phases[id] === 'completed') && heartbeatStatus === 'running';",
"  const terminalBeforeReady = !ready && terminal.has(heartbeatStatus);",
"  const degraded = control.filter((item) => item && item.type === 'observer-startup-degraded').slice(-1)[0] || null;",
"  return {",
"    ok: ready,",
"    status: ready ? 'startup-ready' : terminalBeforeReady ? 'startup-terminal' : 'startup-waiting',",
"    heartbeatStatus,",
"    startup: { phases, failedCommandId: failed?.commandId || null, failedType: failed?.type || null, failedMessage: clip(failed?.detail?.error?.message || failed?.detail?.error || failed?.error?.message), observerStartupDegraded: !!degraded, degradedReason: clip(degraded?.reason || degraded?.result?.failureKind || degraded?.result?.reason), sampleSeq: heartbeat.sampleSeq ?? null, commandSeq: heartbeat.commandSeq ?? null, currentUrl: clip(heartbeat.currentUrl, 180), observerUrl: clip(heartbeat.observerUrl, 180), valuesRedacted: true },",
"    valuesRedacted: true",
"  };",
"}",
"const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));",
// Poll loop: readiness is checked first, then a failed startup command, then a terminal status.
"(async () => {",
"  const observations = [];",
"  while (Date.now() - startedAt <= timeoutMs) {",
"    const row = rowFor();",
"    observations.push(row);",
"    if (row.ok === true) { console.log(JSON.stringify({ ok: true, ...row, observations: observations.slice(-6), elapsedMs: Date.now() - startedAt, valuesRedacted: true })); return; }",
"    if (row.startup.failedCommandId) { console.log(JSON.stringify({ ok: false, failure: 'observer-startup-command-failed', ...row, observations: observations.slice(-6), elapsedMs: Date.now() - startedAt, valuesRedacted: true })); return; }",
"    if (row.status === 'startup-terminal') { console.log(JSON.stringify({ ok: false, failure: 'observer-startup-terminal', ...row, observations: observations.slice(-6), elapsedMs: Date.now() - startedAt, valuesRedacted: true })); return; }",
// Sleep no longer than the time remaining in this chunk.
"    await sleep(Math.min(pollMs, Math.max(0, timeoutMs - (Date.now() - startedAt))));",
"  }",
// Budget for this chunk is spent: take a final sample and report a chunk timeout.
"  const row = rowFor();",
"  observations.push(row);",
"  console.log(JSON.stringify({ ok: false, failure: 'quick-verify-startup-wait-chunk-timeout', ...row, observations: observations.slice(-6), elapsedMs: Date.now() - startedAt, valuesRedacted: true }));",
// A rejection of the async body is reported as JSON rather than left as an unhandled rejection.
"})().catch((error) => { console.log(JSON.stringify({ ok: false, failure: 'quick-verify-startup-wait-script-error', error: error instanceof Error ? error.message : String(error), valuesRedacted: true })); });",
"NODE",
].join("\n");
}
function collectObserveView(state: SentinelCicdState, observerId: string, view: "turn-summary" | "trace-frame", turn: number | null, timeoutSeconds: number): Record<string, unknown> {
const args = ["web-probe", "observe", "collect", observerId, "--node", state.spec.nodeId, "--lane", state.spec.lane, "--view", view, "--command-timeout-seconds", String(Math.max(5, Math.min(timeoutSeconds, 55))), "--raw", "--compact-raw"];
if (turn !== null) args.push("--turn", String(turn));