fix: clarify code queue split-brain liveness

This commit is contained in:
Codex
2026-05-20 01:41:16 +00:00
parent b01907739c
commit ada6da3da6
11 changed files with 264 additions and 10 deletions
+1 -1
View File
@@ -50,7 +50,7 @@ Use:
- `bun scripts/cli.ts codex task <taskId> --trace --limit N` or `codex output` only when the summary is insufficient.
- The liveness rules in `docs/reference/observability.md` when master control-plane state and D601 scheduler state appear split.
`split-brain` in queue diagnostics is a control-plane/execution-plane divergence signal, not automatic evidence that the work is dead. If the task heartbeats are fresh and the trace is still advancing, treat the task as live and keep supervising it rather than interrupting or replacing it.
`split-brain` in queue diagnostics is a control-plane/execution-plane divergence signal, not automatic evidence that the work is dead. If the task heartbeats are fresh and the trace is still advancing, treat the task as live and keep supervising it rather than interrupting or replacing it. The queue summary should expose this as `effectiveLiveness=live`, `splitBrainLive=true`, and `recommendedAction=continue-supervision`; expired, missing, or stale-recovery heartbeat evidence should instead surface `effectiveLiveness=at-risk`.
Long-running tasks with fresh trace or heartbeat evidence should normally be left alone. Polling every few minutes is preferred over repeated interrupt/retry cycles.
+1 -1
View File
@@ -50,6 +50,6 @@ Code Queue task 明明产出最终回复却反复 `retry_wait` 时,应优先
### Code Queue Liveness
Code Queue 的“任务是否卡死”不能由单一控制面字段判断。排障必须同时看 PostgreSQL 中的 `running`/`judging` 任务、D601 scheduler 本地 active run/active slot/active queue、scheduler-owned heartbeat、Trace/OA 持久化进度和 OA publisher pending/lastError。master `code-queue-mgr``postgres-control-plane` 视图只证明数据库行存在;当它显示 `activeRunSlotCount=0` 但 D601 heartbeat 仍新鲜时,正确结论是 control-plane/execution-plane 分裂,diagnostics 应显示 `split-brain``degraded`,不能宣称任务未执行或卡死。
Code Queue 的“任务是否卡死”不能由单一控制面字段判断。排障必须同时看 PostgreSQL 中的 `running`/`judging` 任务、D601 scheduler 本地 active run/active slot/active queue、scheduler-owned heartbeat、Trace/OA 持久化进度和 OA publisher pending/lastError。master `code-queue-mgr` 的 `postgres-control-plane` 视图只证明数据库行存在;当它显示 `activeRunSlotCount=0` 但 D601 heartbeat 仍新鲜时,正确结论是 control-plane/execution-plane 分裂,diagnostics 应显示 `split-brain`、`degraded`,不能宣称任务未执行或卡死。诊断输出中的 `effectiveLiveness=live`、`splitBrainLive=true`、`recommendedAction=continue-supervision` 表示这是 heartbeat 新鲜的观测分裂,应继续监督;`effectiveLiveness=at-risk`、`recommendedAction=investigate-heartbeat-risk` 表示存在 expired/missing/stale heartbeat 风险,需要优先人工确认。
Trace/OA 长时间没有新 seq 但 scheduler heartbeat 正常时,应归类为 trace gap 或 publisher degraded,不得自动 retry。只有 scheduler 本地没有 active run,且对应 owner heartbeat 已过期时,才允许进入 stale recovery candidate;缺失 heartbeat 只能触发 degraded 诊断和人工确认。任何恢复入口都必须由 scheduler 执行,使用条件更新和审计事件区分 user interrupt、admin stale recovery 与 service restart recovery;禁止直接修改 production PostgreSQL 任务状态来“修复” active run。
@@ -179,6 +179,7 @@ function checkStaleActiveOwnerExpired(): FixtureCheck {
const diagnostics = schedulerDiagnostics([task], []);
assertCondition(decision.allowed === true && decision.reason === "owner-heartbeat-expired", "expired owner heartbeat should be the stale recovery gate", { decision });
assertCondition(diagnostics.state === "stale-active", "diagnostics must mark stale active only after owner heartbeat expiry", diagnostics as unknown as Record<string, unknown>);
assertCondition(diagnostics.effectiveLiveness === "at-risk" && diagnostics.recommendedAction === "investigate-heartbeat-risk", "expired heartbeat diagnostics must surface at-risk liveness", diagnostics as unknown as Record<string, unknown>);
assertCondition(diagnostics.staleRecoveryCandidateTaskIds.includes(task.id), "expired owner heartbeat should create a stale candidate", diagnostics as unknown as Record<string, unknown>);
return {
name: "code-queue:stale-active-owner-expired",
@@ -186,6 +187,8 @@ function checkStaleActiveOwnerExpired(): FixtureCheck {
detail: {
decision,
state: diagnostics.state,
effectiveLiveness: diagnostics.effectiveLiveness,
recommendedAction: diagnostics.recommendedAction,
staleRecoveryCandidateTaskIds: diagnostics.staleRecoveryCandidateTaskIds,
},
};
@@ -203,6 +206,8 @@ function checkControlPlaneSplitBrainDiagnostics(): FixtureCheck {
oaPublisher: null,
});
assertCondition(diagnostics.state === "split-brain" && diagnostics.splitBrain === true, "master postgres-control-plane must report split-brain when DB active has fresh scheduler heartbeat", diagnostics as unknown as Record<string, unknown>);
assertCondition(diagnostics.splitBrainLive === true && diagnostics.effectiveLiveness === "live", "fresh split-brain diagnostics must be degraded but live", diagnostics as unknown as Record<string, unknown>);
assertCondition(diagnostics.recommendedAction === "continue-supervision", "fresh split-brain diagnostics must recommend continued supervision", diagnostics as unknown as Record<string, unknown>);
assertCondition(diagnostics.schedulerActiveRunSlotCount === 0 && diagnostics.databaseActiveTaskCount === 1, "split-brain fixture should preserve the exact control-plane divergence", diagnostics as unknown as Record<string, unknown>);
return {
name: "code-queue:control-plane-split-brain-diagnostics",
@@ -210,6 +215,9 @@ function checkControlPlaneSplitBrainDiagnostics(): FixtureCheck {
detail: {
state: diagnostics.state,
splitBrain: diagnostics.splitBrain,
splitBrainLive: diagnostics.splitBrainLive,
effectiveLiveness: diagnostics.effectiveLiveness,
recommendedAction: diagnostics.recommendedAction,
executionStateSource: diagnostics.executionStateSource,
databaseActiveTaskIds: diagnostics.databaseActiveTaskIds,
schedulerActiveRunSlotCount: diagnostics.schedulerActiveRunSlotCount,
+52 -1
View File
@@ -239,7 +239,14 @@ function upstreamError(response: unknown): string {
if (typeof bodyError === "string") {
const requestId = typeof body?.requestId === "string" ? ` requestId=${body.requestId}` : "";
const providerId = typeof body?.providerId === "string" ? ` providerId=${body.providerId}` : "";
return `${bodyError}${providerId}${requestId}`;
const observationHint = /\b(microservice|proxy|provider|tunnel|k3sctl|adapter|backend-core|offline|unavailable|timed out|timeout)\b/iu.test(bodyError)
? " The stable code-queue proxy failure is observation-path evidence only; from the supervisor or main-server CLI environment, also check `bun scripts/cli.ts codex queues`, `codex tasks`, or `codex task <taskId>` before declaring the work unevaluable or stopped."
: "";
return `${bodyError}${providerId}${requestId}${observationHint}`;
}
if (typeof record.codeQueueObservationNote === "string") {
const commands = Array.isArray(record.recommendedCommands) ? ` recommendedCommands=${JSON.stringify(record.recommendedCommands)}` : "";
return `${record.codeQueueObservationNote}${commands}`;
}
const status = typeof record.status === "number" ? `HTTP ${record.status}` : "upstream request failed";
return `${status}: ${JSON.stringify(response).slice(0, 1200)}`;
@@ -335,13 +342,56 @@ function compactSchedulerHeartbeat(value: unknown): Record<string, unknown> | nu
};
}
/**
 * Reports whether a diagnostics record describes a split-brain whose
 * heartbeat evidence is entirely fresh.
 *
 * An explicit boolean `splitBrainLive` on the record is returned unchanged.
 * Otherwise the answer is derived: the state (falling back to `health`) must
 * be "split-brain", at least one fresh-heartbeat task id must be listed, and
 * none of the risk lists (risk, expired, missing, stale-recovery candidates)
 * may contain an id.
 *
 * @param record Raw diagnostics payload.
 * @returns `true` only for a split-brain with fresh heartbeats and no risk ids.
 */
function splitBrainLiveFromDiagnostics(record: Record<string, unknown>): boolean {
  const explicit = record.splitBrainLive;
  if (typeof explicit === "boolean") return explicit;

  const normalizedState = String(record.state ?? record.health ?? "").toLowerCase();
  if (normalizedState !== "split-brain") return false;
  if (stringList(record.heartbeatFreshTaskIds).length === 0) return false;

  // Any id in any risk list disqualifies the "live" verdict.
  const riskSources = [
    record.heartbeatRiskTaskIds,
    record.heartbeatExpiredTaskIds,
    record.heartbeatMissingTaskIds,
    record.staleRecoveryCandidateTaskIds,
  ];
  return riskSources.every((source) => stringList(source).length === 0);
}
/**
 * Derives the effective liveness label for a diagnostics record.
 *
 * A non-empty string `effectiveLiveness` on the record is returned unchanged.
 * Otherwise the label is computed in priority order:
 *   1. "at-risk"  - any risk, expired, missing or stale-recovery id is listed;
 *   2. "live"     - `splitBrainLiveFromDiagnostics` reports a fresh split-brain;
 *   3. "healthy"  - the state (falling back to `health`) is "healthy";
 *   4. "degraded" - everything else, including a missing state.
 *
 * The state comparison is case-insensitive, matching how
 * `splitBrainLiveFromDiagnostics` normalizes the same field.
 *
 * @param record Raw diagnostics payload.
 * @returns The effective liveness label.
 */
function effectiveLivenessFromDiagnostics(record: Record<string, unknown>): string {
  const explicit = record.effectiveLiveness;
  if (typeof explicit === "string" && explicit.length > 0) return explicit;

  const riskSources = [
    record.heartbeatRiskTaskIds,
    record.heartbeatExpiredTaskIds,
    record.heartbeatMissingTaskIds,
    record.staleRecoveryCandidateTaskIds,
  ];
  if (riskSources.some((source) => stringList(source).length > 0)) return "at-risk";

  if (splitBrainLiveFromDiagnostics(record)) return "live";

  // Lower-case before comparing so "Healthy"/"HEALTHY" are not misreported as degraded.
  const normalizedState = String(record.state ?? record.health ?? "unknown").toLowerCase();
  return normalizedState === "healthy" ? "healthy" : "degraded";
}
/**
 * Picks the operator action to recommend for a diagnostics record.
 *
 * A non-empty string `recommendedAction` on the record is returned unchanged;
 * otherwise the action is chosen from the effective liveness label.
 *
 * @param record Raw diagnostics payload.
 * @returns The recommended action, or "none" when the liveness label maps to no action.
 */
function recommendedActionFromDiagnostics(record: Record<string, unknown>): string {
  const explicit = record.recommendedAction;
  if (typeof explicit === "string" && explicit.length > 0) return explicit;

  switch (effectiveLivenessFromDiagnostics(record)) {
    case "at-risk":
      return "investigate-heartbeat-risk";
    case "live":
      return "continue-supervision";
    case "degraded":
      return "observe-degraded";
    default:
      return "none";
  }
}
function compactExecutionDiagnostics(value: unknown): Record<string, unknown> | null {
const record = asRecord(value);
if (record === null) return null;
const heartbeatRiskTaskIds = Array.from(new Set([
...stringList(record.heartbeatRiskTaskIds),
...stringList(record.heartbeatExpiredTaskIds),
...stringList(record.heartbeatMissingTaskIds),
...stringList(record.staleRecoveryCandidateTaskIds),
])).sort();
return {
state: record.state ?? record.health ?? null,
degraded: record.degraded ?? null,
splitBrain: record.splitBrain ?? null,
splitBrainLive: splitBrainLiveFromDiagnostics(record),
effectiveLiveness: effectiveLivenessFromDiagnostics({ ...record, heartbeatRiskTaskIds }),
recommendedAction: recommendedActionFromDiagnostics({ ...record, heartbeatRiskTaskIds }),
livenessSummary: record.livenessSummary ?? null,
executionStateSource: record.executionStateSource ?? null,
controlPlane: record.controlPlane ?? null,
databaseActiveTaskCount: record.databaseActiveTaskCount ?? null,
@@ -353,6 +403,7 @@ function compactExecutionDiagnostics(value: unknown): Record<string, unknown> |
heartbeatExpiredTaskIds: record.heartbeatExpiredTaskIds ?? [],
heartbeatMissingTaskIds: record.heartbeatMissingTaskIds ?? [],
staleRecoveryCandidateTaskIds: record.staleRecoveryCandidateTaskIds ?? [],
heartbeatRiskTaskIds,
traceGapTaskIds: record.traceGapTaskIds ?? [],
traceGapNotStaleTaskIds: record.traceGapNotStaleTaskIds ?? [],
lastSchedulerHeartbeatAt: record.lastSchedulerHeartbeatAt ?? null,
+15 -1
View File
@@ -32,7 +32,21 @@ export function coreInternalFetch(path: string, init?: { method?: string; body?:
const command = dockerCoreFetchCommand(path, init);
const result = runCommand(command, repoRoot);
if (result.exitCode !== 0) {
return { ok: false, exitCode: result.exitCode, stdoutTail: result.stdout.slice(-1200), stderrTail: result.stderr.slice(-1200) };
const codeQueueStableProxy = path.startsWith("/api/microservices/code-queue/");
return {
ok: false,
exitCode: result.exitCode,
stdoutTail: result.stdout.slice(-1200),
stderrTail: result.stderr.slice(-1200),
...(codeQueueStableProxy ? {
codeQueueObservationNote: "The stable /api/microservices/code-queue path could not be reached from this CLI container. This does not by itself mean Code Queue tasks are unevaluable or stopped. From the supervisor or main-server CLI environment, use the existing codex queues/tasks/task observation commands instead of treating this container-local proxy failure as liveness evidence.",
recommendedCommands: [
"bun scripts/cli.ts codex queues",
"bun scripts/cli.ts codex tasks --limit 20",
"bun scripts/cli.ts codex task <taskId>",
],
} : {}),
};
}
try {
return JSON.parse(result.stdout.trim()) as unknown;
+33
View File
@@ -2176,6 +2176,9 @@ input:focus, select:focus, textarea:focus { border-color: var(--accent-2); }
.codex-trace-status-chip.liveness.warn {
border-color: rgba(215, 161, 58, 0.55);
color: #ffe0a2;
background:
linear-gradient(135deg, rgba(215, 161, 58, 0.13), rgba(78, 183, 168, 0.06)),
rgba(0,0,0,0.18);
}
.codex-trace-status-chip.liveness.failed {
border-color: rgba(255, 98, 98, 0.58);
@@ -2229,6 +2232,36 @@ input:focus, select:focus, textarea:focus { border-color: var(--accent-2); }
font-size: 11px;
overflow-wrap: anywhere;
}
/* Advisory strip: a wrapping flex row holding a bold label and a text span. */
.codex-liveness-advisory {
  display: flex;
  flex-wrap: wrap;
  align-items: center;
  gap: 6px;
  min-width: 0;
  padding: 7px 8px;
  color: var(--muted);
  background: rgba(78, 183, 168, 0.07);
  border: 1px solid rgba(78, 183, 168, 0.28);
}

/* Label: accent-coloured, upper-cased, slightly tracked out. */
.codex-liveness-advisory b {
  text-transform: uppercase;
  letter-spacing: 0.08em;
  color: var(--accent-2);
}

/* Text: allowed to shrink and break anywhere so long ids do not overflow. */
.codex-liveness-advisory span {
  overflow-wrap: anywhere;
  min-width: 0;
}

/* "warn" tone: amber border and tint, amber label. */
.codex-liveness-advisory.warn {
  background: rgba(215, 161, 58, 0.07);
  border-color: rgba(215, 161, 58, 0.42);
}
.codex-liveness-advisory.warn b {
  color: var(--warn);
}

/* "failed" tone: red border and tint, pale red label. */
.codex-liveness-advisory.failed {
  background: rgba(207, 106, 84, 0.09);
  border-color: rgba(255, 98, 98, 0.46);
}
.codex-liveness-advisory.failed b {
  color: #ffb2b2;
}
.codex-liveness-reasons {
display: flex;
flex-wrap: wrap;
+54 -6
View File
@@ -239,10 +239,39 @@ function executionDiagnosticsFromQueue(queue: any, health: any): AnyRecord {
|| {};
}
function diagnosticsTone(state: any): string {
const value = String(state || "unknown").toLowerCase();
/**
 * Collects every task id that carries heartbeat risk, de-duplicated and sorted.
 *
 * Ids are merged from the pre-computed risk list plus the expired, missing and
 * stale-recovery-candidate lists, so payloads lacking `heartbeatRiskTaskIds`
 * still produce the combined list.
 *
 * @param diagnostics Diagnostics payload; may be null or undefined.
 * @returns Sorted unique task ids.
 */
function diagnosticsHeartbeatRiskTaskIds(diagnostics: any): string[] {
  const uniqueIds = new Set<string>();
  const sources = [
    diagnostics?.heartbeatRiskTaskIds,
    diagnostics?.heartbeatExpiredTaskIds,
    diagnostics?.heartbeatMissingTaskIds,
    diagnostics?.staleRecoveryCandidateTaskIds,
  ];
  for (const source of sources) {
    for (const taskId of stringArray(source)) {
      uniqueIds.add(taskId);
    }
  }
  const merged = Array.from(uniqueIds);
  merged.sort();
  return merged;
}
/**
 * Tells whether the diagnostics describe a split-brain backed only by fresh heartbeats.
 *
 * A boolean `splitBrainLive` supplied by the payload wins. Without it, the
 * state (or `health`) must be "split-brain", some fresh-heartbeat task id must
 * exist, and the combined heartbeat-risk list must be empty.
 *
 * @param diagnostics Diagnostics payload; may be null or undefined.
 * @returns `true` when the split-brain should be treated as live.
 */
function splitBrainLiveDiagnostics(diagnostics: any): boolean {
  if (typeof diagnostics?.splitBrainLive === "boolean") {
    return diagnostics.splitBrainLive;
  }
  const normalizedState = String(diagnostics?.state || diagnostics?.health || "").toLowerCase();
  if (normalizedState !== "split-brain") return false;
  if (stringArray(diagnostics?.heartbeatFreshTaskIds).length === 0) return false;
  return diagnosticsHeartbeatRiskTaskIds(diagnostics).length === 0;
}
/**
 * Resolves the effective liveness label shown for a diagnostics payload.
 *
 * A non-empty `effectiveLiveness` from the payload is used, lower-cased.
 * Otherwise: any heartbeat-risk id yields "at-risk"; a live split-brain yields
 * "live"; a state (or `health`) of "healthy" or "unknown" is passed through
 * (a missing state counts as "unknown"); anything else is "degraded".
 *
 * @param diagnostics Diagnostics payload; may be null or undefined.
 * @returns The lower-case liveness label.
 */
function diagnosticsEffectiveLiveness(diagnostics: any): string {
  const reported = String(diagnostics?.effectiveLiveness || "").toLowerCase();
  if (reported !== "") return reported;

  if (diagnosticsHeartbeatRiskTaskIds(diagnostics).length !== 0) return "at-risk";
  if (splitBrainLiveDiagnostics(diagnostics)) return "live";

  const rawState = String(diagnostics?.state || diagnostics?.health || "unknown").toLowerCase();
  if (rawState === "healthy" || rawState === "unknown") return rawState;
  return "degraded";
}
/**
 * Maps a diagnostics payload to the tone name used for styling
 * ("ok", "warn", "failed" or "unknown").
 *
 * The effective liveness label is consulted first; only when it is none of
 * "live", "degraded" or "at-risk" does the raw state (or `health`) decide.
 */
function diagnosticsTone(diagnostics: any): string {
const effective = diagnosticsEffectiveLiveness(diagnostics);
// A live split-brain is a warning, not a failure: the work is still running.
if (effective === "live" || effective === "degraded") return "warn";
if (effective === "at-risk") return "failed";
// Fallback on the lower-cased raw state when the liveness label gave no verdict.
const value = String(diagnostics?.state || diagnostics?.health || "unknown").toLowerCase();
if (value === "healthy") return "ok";
if (value === "split-brain" || value === "stale-active") return "failed";
// NOTE(review): as shown here this check can never match, because the previous
// line already returns for "stale-active"; the two lines look like the old and
// new variants of one statement — confirm which should remain.
if (value === "stale-active") return "failed";
if (value === "degraded") return "warn";
return "unknown";
}
@@ -1576,12 +1605,25 @@ function CodeQueueLivenessPanel({ diagnostics, queue, onRaw }: AnyRecord) {
const state = String(diagnostics?.state || diagnostics?.health || "unknown");
const oaPublisher = objectRecord(diagnostics?.oaPublisher);
const reasons = stringArray(diagnostics?.reasons).slice(0, 3);
const tone = diagnosticsTone(state);
const heartbeatRiskTaskIds = diagnosticsHeartbeatRiskTaskIds(diagnostics);
const splitBrainLive = splitBrainLiveDiagnostics(diagnostics);
const effectiveLiveness = diagnosticsEffectiveLiveness(diagnostics);
const recommendedAction = String(diagnostics?.recommendedAction || (heartbeatRiskTaskIds.length > 0 ? "investigate-heartbeat-risk" : splitBrainLive ? "continue-supervision" : effectiveLiveness === "degraded" ? "observe-degraded" : "none"));
const livenessText = String(splitBrainLive
? "执行面 heartbeat 新鲜,任务仍应继续监督。"
: heartbeatRiskTaskIds.length > 0
? "存在 expired/missing/stale heartbeat 风险,请先确认执行面状态。"
: diagnostics?.livenessSummary || (effectiveLiveness === "degraded"
? "Diagnostics are degraded; inspect heartbeat, trace, and OA progress together."
: "Execution diagnostics are healthy."));
const tone = diagnosticsTone(diagnostics);
const stateLabel = splitBrainLive ? "split-brain live" : state;
return h(Panel, {
title: "执行活性",
eyebrow: `${String(diagnostics?.executionStateSource || queue?.executionStateSource || "unknown")} / ${String(diagnostics?.controlPlane || "code-queue")}`,
summary: h("div", { className: "codex-trace-status" },
h("span", { className: `codex-trace-status-chip liveness ${tone}` }, h("b", null, "状态"), state),
h("span", { className: `codex-trace-status-chip liveness ${tone}` }, h("b", null, "状态"), stateLabel),
h("span", { className: `codex-trace-status-chip liveness ${tone}` }, h("b", null, "liveness"), effectiveLiveness),
h("span", { className: "codex-trace-status-chip" }, h("b", null, "DB active"), String(diagnostics?.databaseActiveTaskCount ?? queue?.databaseActiveTaskCount ?? 0)),
h("span", { className: "codex-trace-status-chip" }, h("b", null, "scheduler slots"), String(diagnostics?.schedulerActiveRunSlotCount ?? queue?.activeRunSlotCount ?? 0)),
h("span", { className: "codex-trace-status-chip" }, h("b", null, "heartbeat"), `${stringArray(diagnostics?.heartbeatFreshTaskIds).length} fresh / ${stringArray(diagnostics?.heartbeatExpiredTaskIds).length} expired`),
@@ -1591,10 +1633,12 @@ function CodeQueueLivenessPanel({ diagnostics, queue, onRaw }: AnyRecord) {
className: "codex-liveness-panel",
},
h("div", { className: "codex-liveness-grid", "data-testid": "codex-liveness-diagnostics" },
h(LivenessMetric, { tone, label: "健康状态", value: state, hint: diagnostics?.splitBrain ? "split-brain" : diagnostics?.degraded ? "degraded" : "ready" }),
h(LivenessMetric, { tone, label: "健康状态", value: stateLabel, hint: recommendedAction }),
h(LivenessMetric, { tone, label: "Effective liveness", value: effectiveLiveness, hint: livenessText }),
h(LivenessMetric, { label: "PostgreSQL active", value: String(diagnostics?.databaseActiveTaskCount ?? queue?.databaseActiveTaskCount ?? 0), hint: compactIdList(diagnostics?.databaseActiveTaskIds ?? queue?.databaseActiveTaskIds) }),
h(LivenessMetric, { label: "Scheduler active", value: String(diagnostics?.schedulerActiveRunSlotCount ?? queue?.activeRunSlotCount ?? 0), hint: compactIdList(diagnostics?.schedulerActiveTaskIds ?? queue?.activeTaskIds) }),
h(LivenessMetric, { label: "Fresh heartbeat", value: String(stringArray(diagnostics?.heartbeatFreshTaskIds).length), hint: compactIdList(diagnostics?.heartbeatFreshTaskIds) }),
h(LivenessMetric, { tone: heartbeatRiskTaskIds.length > 0 ? "failed" : splitBrainLive ? "warn" : "", label: "Heartbeat risk", value: String(heartbeatRiskTaskIds.length), hint: heartbeatRiskTaskIds.length > 0 ? compactIdList(heartbeatRiskTaskIds) : splitBrainLive ? "fresh heartbeat: keep supervising" : "--" }),
h(LivenessMetric, { tone: stringArray(diagnostics?.traceGapNotStaleTaskIds).length > 0 ? "warn" : "", label: "Trace gap", value: String(stringArray(diagnostics?.traceGapTaskIds).length), hint: compactIdList(diagnostics?.traceGapNotStaleTaskIds) }),
h(LivenessMetric, { tone: stringArray(diagnostics?.staleRecoveryCandidateTaskIds).length > 0 ? "failed" : "", label: "Stale candidates", value: String(stringArray(diagnostics?.staleRecoveryCandidateTaskIds).length), hint: compactIdList(diagnostics?.staleRecoveryCandidateTaskIds) }),
h(LivenessMetric, { label: "Last scheduler heartbeat", value: fmtRelativeAge(diagnostics?.lastSchedulerHeartbeatAt), hint: String(diagnostics?.lastSchedulerHeartbeatAt || "--") }),
@@ -1602,6 +1646,10 @@ function CodeQueueLivenessPanel({ diagnostics, queue, onRaw }: AnyRecord) {
h(LivenessMetric, { label: "Last trace persist", value: fmtRelativeAge(diagnostics?.lastPersistedTraceAt), hint: String(diagnostics?.lastPersistedTraceAt || "--") }),
h(LivenessMetric, { tone: oaPublisher?.lastError ? "warn" : "", label: "OA publisher", value: `${Number(oaPublisher?.pending || 0)} pending`, hint: oaPublisher?.lastError ? shortText(oaPublisher.lastError, 90) : "ok" }),
),
h("div", { className: `codex-liveness-advisory ${tone}` },
h("b", null, splitBrainLive ? "Observing split" : heartbeatRiskTaskIds.length > 0 ? "Heartbeat risk" : "Liveness note"),
h("span", null, livenessText),
),
reasons.length > 0 ? h("div", { className: "codex-liveness-reasons" }, reasons.map((reason: string) => h("span", { key: reason }, reason))) : null,
);
}
@@ -737,6 +737,14 @@ fn execution_diagnostics_from_tasks(tasks: &[TaskMeta], now: &str) -> Value {
.filter(|task_id| heartbeat_expired_set.contains(*task_id))
.cloned()
.collect();
let mut heartbeat_risk_task_ids: Vec<String> = heartbeat_expired_task_ids
.iter()
.chain(heartbeat_missing_task_ids.iter())
.chain(stale_recovery_candidate_task_ids.iter())
.cloned()
.collect();
heartbeat_risk_task_ids.sort();
heartbeat_risk_task_ids.dedup();
let mut trace_gap_task_ids: Vec<String> = active_heartbeats
.iter()
.filter(|(task_id, heartbeat)| {
@@ -754,6 +762,7 @@ fn execution_diagnostics_from_tasks(tasks: &[TaskMeta], now: &str) -> Value {
.collect();
let split_brain = !database_active_task_ids.is_empty() && !heartbeat_fresh_task_ids.is_empty();
let split_brain_live = split_brain && heartbeat_risk_task_ids.is_empty();
let stale_active = !stale_recovery_candidate_task_ids.is_empty();
let degraded = split_brain
|| stale_active
@@ -769,6 +778,33 @@ fn execution_diagnostics_from_tasks(tasks: &[TaskMeta], now: &str) -> Value {
} else {
"healthy"
};
let effective_liveness = if !heartbeat_risk_task_ids.is_empty() {
"at-risk"
} else if split_brain_live {
"live"
} else if degraded {
"degraded"
} else {
"healthy"
};
let recommended_action = if !heartbeat_risk_task_ids.is_empty() {
"investigate-heartbeat-risk"
} else if split_brain_live {
"continue-supervision"
} else if degraded {
"observe-degraded"
} else {
"none"
};
let liveness_summary = if !heartbeat_risk_task_ids.is_empty() {
"Heartbeat is expired, missing, or stale-recovery eligible for at least one database-active task; investigate before assuming the task is still live."
} else if split_brain_live {
"PostgreSQL and the local control-plane view are split, but scheduler-owned heartbeat is fresh; treat the task as live and continue supervision."
} else if degraded {
"Execution diagnostics are degraded; inspect the listed reasons together with heartbeat and trace progress."
} else {
"Execution diagnostics are healthy."
};
let mut reasons = Vec::new();
if split_brain {
reasons.push("postgres control-plane has database-active tasks while its local active slots are empty, but scheduler heartbeat is fresh");
@@ -795,6 +831,10 @@ fn execution_diagnostics_from_tasks(tasks: &[TaskMeta], now: &str) -> Value {
"health": state,
"degraded": degraded,
"splitBrain": split_brain,
"splitBrainLive": split_brain_live,
"effectiveLiveness": effective_liveness,
"recommendedAction": recommended_action,
"livenessSummary": liveness_summary,
"executionStateSource": "postgres-control-plane",
"controlPlane": "master-code-queue-mgr",
"databaseActiveTaskIds": database_active_task_ids,
@@ -812,6 +852,7 @@ fn execution_diagnostics_from_tasks(tasks: &[TaskMeta], now: &str) -> Value {
"heartbeatExpiredTaskIds": heartbeat_expired_task_ids,
"heartbeatMissingTaskIds": heartbeat_missing_task_ids,
"staleRecoveryCandidateTaskIds": stale_recovery_candidate_task_ids,
"heartbeatRiskTaskIds": heartbeat_risk_task_ids,
"traceGapTaskIds": trace_gap_task_ids,
"traceGapNotStaleTaskIds": trace_gap_not_stale_task_ids,
"schedulerHeartbeatStaleMs": SCHEDULER_HEARTBEAT_STALE_MS,
@@ -859,7 +859,13 @@ function controlPlaneExecutionDiagnostics(tasks: QueueTask[], now = nowIso()): J
const staleRecoveryCandidateTaskIds = heartbeatExpiredTaskIds.slice();
const traceGapTaskIds = heartbeatRows.filter((row) => traceGap(row.task, row.heartbeat, nowMs)).map((row) => row.task.id).sort();
const traceGapNotStaleTaskIds = traceGapTaskIds.filter((taskId) => heartbeatFreshSet.has(taskId));
const heartbeatRiskTaskIds = Array.from(new Set([
...heartbeatExpiredTaskIds,
...heartbeatMissingTaskIds,
...staleRecoveryCandidateTaskIds,
])).sort();
const splitBrain = databaseActiveTaskIds.length > 0 && heartbeatFreshTaskIds.length > 0;
const splitBrainLive = splitBrain && heartbeatRiskTaskIds.length === 0;
const staleActive = staleRecoveryCandidateTaskIds.length > 0;
const degraded = splitBrain
|| staleActive
@@ -867,6 +873,15 @@ function controlPlaneExecutionDiagnostics(tasks: QueueTask[], now = nowIso()): J
|| heartbeatExpiredTaskIds.length > 0
|| traceGapTaskIds.length > 0;
const state = splitBrain ? "split-brain" : staleActive ? "stale-active" : degraded ? "degraded" : "healthy";
const effectiveLiveness = heartbeatRiskTaskIds.length > 0 ? "at-risk" : splitBrainLive ? "live" : degraded ? "degraded" : "healthy";
const recommendedAction = heartbeatRiskTaskIds.length > 0 ? "investigate-heartbeat-risk" : splitBrainLive ? "continue-supervision" : degraded ? "observe-degraded" : "none";
const livenessSummary = heartbeatRiskTaskIds.length > 0
? "Heartbeat is expired, missing, or stale-recovery eligible for at least one database-active task; investigate before assuming the task is still live."
: splitBrainLive
? "PostgreSQL and the local control-plane view are split, but scheduler-owned heartbeat is fresh; treat the task as live and continue supervision."
: degraded
? "Execution diagnostics are degraded; inspect the listed reasons together with heartbeat and trace progress."
: "Execution diagnostics are healthy.";
const reasons: string[] = [];
if (splitBrain) reasons.push("postgres control-plane has database-active tasks while its local active slots are empty, but scheduler heartbeat is fresh");
if (staleActive) reasons.push("owner heartbeat is expired and scheduler has no local active run for at least one database-active task");
@@ -877,6 +892,10 @@ function controlPlaneExecutionDiagnostics(tasks: QueueTask[], now = nowIso()): J
health: state,
degraded,
splitBrain,
splitBrainLive,
effectiveLiveness,
recommendedAction,
livenessSummary,
executionStateSource: "postgres-control-plane",
controlPlane: "master-code-queue-mgr",
databaseActiveTaskIds,
@@ -894,6 +913,7 @@ function controlPlaneExecutionDiagnostics(tasks: QueueTask[], now = nowIso()): J
heartbeatExpiredTaskIds,
heartbeatMissingTaskIds,
staleRecoveryCandidateTaskIds,
heartbeatRiskTaskIds,
traceGapTaskIds,
traceGapNotStaleTaskIds,
schedulerHeartbeatStaleMs,
@@ -158,10 +158,16 @@ export function buildExecutionDiagnostics(input: ExecutionDiagnosticsInput): Cod
.sort();
const traceGapNotStaleTaskIds = traceGapTaskIds.filter((taskId) => heartbeatFreshSet.has(taskId));
const schedulerLocalMissingIds = databaseActiveTaskIds.filter((taskId) => !schedulerActiveTaskSet.has(taskId));
const heartbeatRiskTaskIds = Array.from(new Set([
...heartbeatExpiredTaskIds,
...heartbeatMissingTaskIds,
...staleRecoveryCandidateTaskIds,
])).sort();
const splitBrain = input.executionStateSource === "postgres-control-plane"
&& databaseActiveTaskIds.length > 0
&& schedulerActiveTaskIds.length === 0
&& heartbeatFreshTaskIds.length > 0;
const splitBrainLive = splitBrain && heartbeatRiskTaskIds.length === 0;
const staleActive = staleRecoveryCandidateTaskIds.length > 0;
const degraded = splitBrain
|| staleActive
@@ -171,6 +177,27 @@ export function buildExecutionDiagnostics(input: ExecutionDiagnosticsInput): Cod
|| traceGapTaskIds.length > 0
|| (typeof input.oaPublisher === "object" && input.oaPublisher !== null && !Array.isArray(input.oaPublisher) && (Number((input.oaPublisher as Record<string, JsonValue>).pending ?? 0) > 0 || (input.oaPublisher as Record<string, JsonValue>).lastError !== null));
const state = splitBrain ? "split-brain" : staleActive ? "stale-active" : degraded ? "degraded" : "healthy";
const effectiveLiveness = heartbeatRiskTaskIds.length > 0
? "at-risk"
: splitBrainLive
? "live"
: degraded
? "degraded"
: "healthy";
const recommendedAction = heartbeatRiskTaskIds.length > 0
? "investigate-heartbeat-risk"
: splitBrainLive
? "continue-supervision"
: degraded
? "observe-degraded"
: "none";
const livenessSummary = heartbeatRiskTaskIds.length > 0
? "Heartbeat is expired, missing, or stale-recovery eligible for at least one database-active task; investigate before assuming the task is still live."
: splitBrainLive
? "PostgreSQL and the local control-plane view are split, but scheduler-owned heartbeat is fresh; treat the task as live and continue supervision."
: degraded
? "Execution diagnostics are degraded; inspect the listed reasons together with heartbeat and trace progress."
: "Execution diagnostics are healthy.";
const lastSchedulerHeartbeatAt = maxTimestamp(activeHeartbeats.map((heartbeat) => heartbeat.lastLocalHeartbeatAt));
const lastObservedAgentEventAt = maxTimestamp(activeHeartbeats.map((heartbeat) => heartbeat.lastObservedAgentEventAt));
const lastPersistedTraceAt = maxTimestamp(activeHeartbeats.map((heartbeat) => heartbeat.lastPersistedTraceAt));
@@ -190,6 +217,10 @@ export function buildExecutionDiagnostics(input: ExecutionDiagnosticsInput): Cod
health: state,
degraded,
splitBrain,
splitBrainLive,
effectiveLiveness,
recommendedAction,
livenessSummary,
executionStateSource: input.executionStateSource,
controlPlane: input.controlPlane,
databaseActiveTaskIds,
@@ -207,6 +238,7 @@ export function buildExecutionDiagnostics(input: ExecutionDiagnosticsInput): Cod
heartbeatExpiredTaskIds,
heartbeatMissingTaskIds,
staleRecoveryCandidateTaskIds,
heartbeatRiskTaskIds,
traceGapTaskIds,
traceGapNotStaleTaskIds,
schedulerHeartbeatStaleMs: staleMs,
@@ -33,6 +33,8 @@ export type TranscriptKind = "ran" | "explored" | "edited" | "plan" | "message"
export type CodeQueueExecutionPlane = "scheduler-execution-plane" | "postgres-control-plane";
export type CodeQueueExecutionHealth = "healthy" | "degraded" | "split-brain" | "stale-active";
/**
 * Liveness verdict attached to execution diagnostics.
 * `buildExecutionDiagnostics` picks "at-risk" when any heartbeat-risk task id
 * exists, "live" for a split-brain with no heartbeat risk, "degraded" when the
 * diagnostics are otherwise degraded, and "healthy" in every remaining case.
 */
export type CodeQueueEffectiveLiveness = "healthy" | "live" | "degraded" | "at-risk";
/**
 * Operator action paired with the liveness verdict by `buildExecutionDiagnostics`:
 * "investigate-heartbeat-risk" for at-risk, "continue-supervision" for live,
 * "observe-degraded" for degraded, and "none" for healthy.
 */
export type CodeQueueRecommendedAction = "none" | "continue-supervision" | "observe-degraded" | "investigate-heartbeat-risk";
export type CodeQueueRecoveryCandidateReason =
| "scheduler-local-active"
@@ -64,6 +66,10 @@ export interface CodeQueueExecutionDiagnostics {
health: CodeQueueExecutionHealth;
degraded: boolean;
splitBrain: boolean;
splitBrainLive: boolean;
effectiveLiveness: CodeQueueEffectiveLiveness;
recommendedAction: CodeQueueRecommendedAction;
livenessSummary: string;
executionStateSource: CodeQueueExecutionPlane;
controlPlane: string;
databaseActiveTaskIds: string[];
@@ -81,6 +87,7 @@ export interface CodeQueueExecutionDiagnostics {
heartbeatExpiredTaskIds: string[];
heartbeatMissingTaskIds: string[];
staleRecoveryCandidateTaskIds: string[];
heartbeatRiskTaskIds: string[];
traceGapTaskIds: string[];
traceGapNotStaleTaskIds: string[];
schedulerHeartbeatStaleMs: number;