fix: wait for sentinel observer startup
This commit is contained in:
@@ -1726,6 +1726,22 @@ function runSentinelQuickVerify(state: SentinelCicdState, reason: string, timeou
|
||||
valuesRedacted: true,
|
||||
});
|
||||
}
|
||||
const startupReady = waitForQuickVerifyObserverStartup(state, observerId, deadline, sampleIntervalMs, budgetSeconds);
|
||||
steps.push({ phase: "observe-wait-startup-ready", ok: startupReady.ok, result: startupReady });
|
||||
if (startupReady.ok !== true) {
|
||||
return recordQuickVerify(state, finalizeQuickVerifyFailure(state, {
|
||||
runId,
|
||||
scenarioId,
|
||||
reason,
|
||||
observerId,
|
||||
promptIndex: 0,
|
||||
steps,
|
||||
failure: text(startupReady.failure ?? "observe-startup-ready-wait-failed"),
|
||||
elapsedMs: elapsedMs(),
|
||||
warnings: mergeWarnings(Array.isArray(startupReady.warnings) ? startupReady.warnings : [], elapsedWarnings()),
|
||||
promptSource: prompts.summary,
|
||||
}));
|
||||
}
|
||||
let promptIndex = 0;
|
||||
const sessionInvarianceChecks = sessionInvarianceChecksByRound(scenario);
|
||||
for (const item of commandSequence) {
|
||||
@@ -2353,6 +2369,114 @@ function readAnalysisSummaryFromWorkspace(state: SentinelCicdState, stateDir: st
|
||||
return { ok: result.exitCode === 0 && parsed?.ok === true, ...record(parsed), result: compactCommand(result), valuesRedacted: true };
|
||||
}
|
||||
|
||||
/**
 * Waits for the observe runner's startup commands to finish before the quick
 * verify flow sends its first command.
 *
 * The wait is performed remotely in chunks: each chunk pipes the script built
 * by `quickVerifyObserverStartupWaitScript` into `trans <nodeId>:<workspace> sh`
 * and parses the JSON object the script prints on stdout. A chunk that reports
 * `quick-verify-startup-wait-chunk-timeout` is retried until `deadline`.
 *
 * @param state - CI/CD state; `state.spec.nodeId` and `state.spec.workspace` select the remote target.
 * @param observerId - Observer whose local index entry supplies the state directory.
 * @param deadline - Absolute epoch-millisecond cut-off for starting another chunk.
 * @param pollIntervalMs - Sampling interval; half of it, clamped to 250–500 ms, is the remote poll sleep.
 * @param budgetSeconds - Only used in the over-budget warning text.
 * @returns A record with `ok: true` once startup is ready, otherwise `ok: false`
 *   plus a `failure` code; both carry the last (up to six) observations.
 */
function waitForQuickVerifyObserverStartup(state: SentinelCicdState, observerId: string, deadline: number, pollIntervalMs: number, budgetSeconds: number): Record<string, unknown> {
  const indexEntry = readLocalObserveIndex(observerId);
  if (indexEntry === null) {
    // Without an index entry there is no state directory to poll.
    return { ok: false, failure: "observe-index-entry-missing", observerId, valuesRedacted: true };
  }

  const collected: Record<string, unknown>[] = [];
  // A zero/NaN half-interval falls back to 250 ms before clamping to [250, 500].
  const halfInterval = Math.trunc(pollIntervalMs / 2) || 250;
  const pollSleepMs = Math.max(250, Math.min(500, halfInterval));

  while (Date.now() < deadline) {
    // Each remote chunk lasts between 1 s and 55 s, bounded by the remaining budget.
    const remainingMs = deadline - Date.now();
    const waitMs = Math.max(1000, Math.min(55_000, remainingMs));
    const script = quickVerifyObserverStartupWaitScript(indexEntry.stateDir, waitMs, pollSleepMs);
    const target = `${state.spec.nodeId}:${state.spec.workspace}`;
    // The local command timeout leaves 5 s of slack on top of the remote wait.
    const result = runCommand(["trans", target, "sh"], repoRoot, { input: script, timeoutMs: waitMs + 5000 });
    const payload = parseJsonObject(result.stdout);

    if (Array.isArray(payload?.observations)) {
      collected.push(...payload.observations.map(record));
    }

    const status = typeof payload?.status === "string" ? payload.status : null;
    const heartbeatStatus = typeof payload?.heartbeatStatus === "string" ? payload.heartbeatStatus : null;
    const snapshot = {
      observerId,
      stateDir: indexEntry.stateDir,
      status,
      heartbeatStatus,
      startup: record(payload?.startup),
      observations: collected.slice(-6),
      waitResult: compactCommand(result),
      valuesRedacted: true,
    };

    // A chunk timeout is the only reported failure that is retried.
    const isChunkTimeout = payload?.failure === "quick-verify-startup-wait-chunk-timeout";
    if (result.exitCode !== 0 || payload === null || (payload.ok === false && !isChunkTimeout)) {
      const failure = text(payload?.failure ?? "quick-verify-startup-artifact-wait-failed");
      return { ok: false, failure, ...snapshot };
    }

    if (payload.ok === true) {
      return { ok: true, ...snapshot };
    }
  }

  // The deadline passed without a ready or terminal answer from the remote script.
  const overBudgetWarning = `quick verify exceeded the configured ${budgetSeconds}s targetValidation budget while waiting for the observe runner startup to finish before sending the first command.`;
  return {
    ok: false,
    failure: "quick-verify-timeout-over-budget",
    observerId,
    stateDir: indexEntry.stateDir,
    observations: collected.slice(-6),
    warnings: [overBudgetWarning],
    valuesRedacted: true,
  };
}
|
||||
|
||||
/**
 * Builds the `sh` script that polls an observer state directory until the
 * startup commands have finished.
 *
 * The script prints one JSON object on stdout:
 * - `state-dir-missing` (and exits 0) when the directory does not exist;
 * - `ok: true` once the last recorded phase of `startup-login`, `startup-goto`
 *   and `startup-observer-goto` in `control.jsonl` is `completed` and the
 *   normalised heartbeat (or manifest) status is `running`;
 * - `observer-startup-command-failed` when a startup command has a `failed` phase entry;
 * - `observer-startup-terminal` when the status is terminal before readiness;
 * - `quick-verify-startup-wait-chunk-timeout` when `timeoutMs` elapses first;
 * - `quick-verify-startup-wait-script-error` when the polling loop throws.
 *
 * @param stateDir - Observer state directory on the remote side; shell-quoted before embedding.
 * @param timeoutMs - Chunk budget in milliseconds; truncated and floored at 1.
 * @param pollSleepMs - Delay between polls in milliseconds; truncated and floored at 250.
 * @returns The full script text, lines joined with `\n`.
 */
function quickVerifyObserverStartupWaitScript(stateDir: string, timeoutMs: number, pollSleepMs: number): string {
  const quotedStateDir = shellQuote(stateDir);
  const quotedTimeoutMs = shellQuote(String(Math.max(1, Math.trunc(timeoutMs))));
  const quotedPollMs = shellQuote(String(Math.max(250, Math.trunc(pollSleepMs))));

  // Shell part: bind the arguments, bail out early when the directory is
  // missing, then hand over to an inline Node program via a quoted here-document.
  const shellPrelude = [
    "set -eu",
    `state_dir=${quotedStateDir}`,
    `timeout_ms=${quotedTimeoutMs}`,
    `poll_ms=${quotedPollMs}`,
    "test -d \"$state_dir\" || { printf '{\"ok\":false,\"failure\":\"state-dir-missing\",\"stateDir\":\"%s\",\"valuesRedacted\":true}\\n' \"$state_dir\"; exit 0; }",
    "node - \"$state_dir\" \"$timeout_ms\" \"$poll_ms\" <<'NODE'",
  ];

  // Node part: reads heartbeat.json, manifest.json and control.jsonl on every
  // poll and reports at most the last six observation rows.
  const nodeProgram = [
    "const fs = require('node:fs');",
    "const path = require('node:path');",
    "const dir = process.argv[2];",
    "const timeoutMs = Number(process.argv[3]);",
    "const pollMs = Number(process.argv[4]);",
    "const startedAt = Date.now();",
    "const startupIds = ['startup-login', 'startup-goto', 'startup-observer-goto'];",
    "const readJson = (rel) => { try { return JSON.parse(fs.readFileSync(path.join(dir, rel), 'utf8')); } catch { return null; } };",
    "const readJsonl = (rel) => { try { return fs.readFileSync(path.join(dir, rel), 'utf8').split(/\\r?\\n/u).filter(Boolean).map((line) => { try { return JSON.parse(line); } catch { return null; } }).filter(Boolean); } catch { return []; } };",
    "const clip = (value, limit = 160) => value == null ? null : String(value).replace(/\\s+/gu, ' ').trim().slice(0, limit);",
    "const norm = (value) => String(value || '').trim().toLowerCase().replace(/_/gu, '-');",
    "const terminal = new Set(['failed', 'force-stopped', 'stopped', 'abandoned', 'completed']);",
    "function commandEvents(control, id) { return control.filter((item) => item && item.commandId === id); }",
    "function lastPhase(control, id) { return commandEvents(control, id).filter((item) => typeof item.phase === 'string').slice(-1)[0]?.phase || null; }",
    "function firstFailedStartup(control) { return control.filter((item) => item && startupIds.includes(item.commandId) && item.phase === 'failed').slice(-1)[0] || null; }",
    "function rowFor() {",
    "  const heartbeat = readJson('heartbeat.json') || {};",
    "  const manifest = readJson('manifest.json') || {};",
    "  const control = readJsonl('control.jsonl');",
    "  const phases = Object.fromEntries(startupIds.map((id) => [id, lastPhase(control, id)]));",
    "  const failed = firstFailedStartup(control);",
    "  const heartbeatStatus = norm(heartbeat.status || manifest.status);",
    "  const ready = startupIds.every((id) => phases[id] === 'completed') && heartbeatStatus === 'running';",
    "  const terminalBeforeReady = !ready && terminal.has(heartbeatStatus);",
    "  const degraded = control.filter((item) => item && item.type === 'observer-startup-degraded').slice(-1)[0] || null;",
    "  return {",
    "    ok: ready,",
    "    status: ready ? 'startup-ready' : terminalBeforeReady ? 'startup-terminal' : 'startup-waiting',",
    "    heartbeatStatus,",
    "    startup: { phases, failedCommandId: failed?.commandId || null, failedType: failed?.type || null, failedMessage: clip(failed?.detail?.error?.message || failed?.detail?.error || failed?.error?.message), observerStartupDegraded: !!degraded, degradedReason: clip(degraded?.reason || degraded?.result?.failureKind || degraded?.result?.reason), sampleSeq: heartbeat.sampleSeq ?? null, commandSeq: heartbeat.commandSeq ?? null, currentUrl: clip(heartbeat.currentUrl, 180), observerUrl: clip(heartbeat.observerUrl, 180), valuesRedacted: true },",
    "    valuesRedacted: true",
    "  };",
    "}",
    "const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));",
    "(async () => {",
    "  const observations = [];",
    "  while (Date.now() - startedAt <= timeoutMs) {",
    "    const row = rowFor();",
    "    observations.push(row);",
    "    if (row.ok === true) { console.log(JSON.stringify({ ok: true, ...row, observations: observations.slice(-6), elapsedMs: Date.now() - startedAt, valuesRedacted: true })); return; }",
    "    if (row.startup.failedCommandId) { console.log(JSON.stringify({ ok: false, failure: 'observer-startup-command-failed', ...row, observations: observations.slice(-6), elapsedMs: Date.now() - startedAt, valuesRedacted: true })); return; }",
    "    if (row.status === 'startup-terminal') { console.log(JSON.stringify({ ok: false, failure: 'observer-startup-terminal', ...row, observations: observations.slice(-6), elapsedMs: Date.now() - startedAt, valuesRedacted: true })); return; }",
    "    await sleep(Math.min(pollMs, Math.max(0, timeoutMs - (Date.now() - startedAt))));",
    "  }",
    "  const row = rowFor();",
    "  observations.push(row);",
    "  console.log(JSON.stringify({ ok: false, failure: 'quick-verify-startup-wait-chunk-timeout', ...row, observations: observations.slice(-6), elapsedMs: Date.now() - startedAt, valuesRedacted: true }));",
    "})().catch((error) => { console.log(JSON.stringify({ ok: false, failure: 'quick-verify-startup-wait-script-error', error: error instanceof Error ? error.message : String(error), valuesRedacted: true })); });",
  ];

  // "NODE" closes the here-document opened on the last prelude line.
  return [...shellPrelude, ...nodeProgram, "NODE"].join("\n");
}
|
||||
|
||||
function collectObserveView(state: SentinelCicdState, observerId: string, view: "turn-summary" | "trace-frame", turn: number | null, timeoutSeconds: number): Record<string, unknown> {
|
||||
const args = ["web-probe", "observe", "collect", observerId, "--node", state.spec.nodeId, "--lane", state.spec.lane, "--view", view, "--command-timeout-seconds", String(Math.max(5, Math.min(timeoutSeconds, 55))), "--raw", "--compact-raw"];
|
||||
if (turn !== null) args.push("--turn", String(turn));
|
||||
|
||||
Reference in New Issue
Block a user