fix: clarify code queue liveness snapshots

This commit is contained in:
Codex
2026-05-23 12:36:20 +00:00
parent 15d074424c
commit 758377c551
4 changed files with 338 additions and 23 deletions
+5 -1
View File
@@ -289,9 +289,13 @@ commander 视图的任务分类必须是确定性字段,至少区分 `business
队列诊断中的 `split-brain` 表示控制面/执行面观测分裂,不自动证明任务已经死亡。只要任务 heartbeat 还在刷新、trace 仍在推进,就不能把它判成服务中断或要求立刻 stop;应把它视为 `splitBrainLive=true` 的 live 任务,继续监督并推进 #20 里的已排任务,而不是 interrupt、替换或把 backend 当成已经挂掉。队列摘要应显示 `effectiveLiveness=live``splitBrainLive=true``recommendedAction=continue-supervision`compact 输出还应在 `executionDiagnostics.liveness` 中重复这些低噪声字段,并突出 `activeHeartbeatCount`、有界 `heartbeatFreshTaskIds``databaseActiveTaskCount``schedulerActiveRunSlotCount`。当 master/control-plane 的 `schedulerActiveRunSlotCount=0``heartbeatFreshTaskIds` 非空时,active 数应优先按 scheduler heartbeat 摘要解释为 live,而不是按 master 本地 slot 0 解释为执行停摆。只有 heartbeat expired/missing 或满足 stale-recovery 条件时,才应显示 `effectiveLiveness=at-risk` 并进入恢复判断。
`codex submit` 成功后的 `queue` 是 submit-confirmation 即时 bounded snapshot,不是恢复判据。它必须同时区分 `submitted.taskStates[]` 中本次提交任务的 queued/running 状态、`queue.countContext.databaseActive` / `activity.databaseActiveTaskCount` 中 PostgreSQL active running、`activity.schedulerHeartbeatFreshness` 中 scheduler heartbeat freshness,以及 `activity.recovery` 中的 transient risk。若同一个提交回显里出现 `counts.running>0``queued>0``heartbeatRiskTaskCount>0``staleRecoveryCandidateTaskCount>0`,但 `lastObservedAgentEventAt` 明显早于 submit time 或还没有下一次 supervisor poll 确认,输出应保留候选可见性并给出 `re-poll supervisor before recovery`;这类单次 bounded snapshot 只能设置 `attentionRequired=true`,不得把 `commanderConcurrency.interventionRequired` 直接升级为高风险恢复。默认 drill-down 是重新运行 `codex tasks --view supervisor --limit 20` 或 raw overview,而不是 restart、cancel、interrupt 或 DB write。
默认 supervisor poll 也遵循同一低噪声语义:heartbeat expired/missing、`heartbeatRiskTaskIds``staleRecoveryCandidateTaskIds` 必须可见,但第一次 poll 只表示 `transient-needs-repoll``activity.recovery.hint` 应为 `re-poll supervisor before recovery`。只有 repeated poll 仍确认 owner heartbeat expired、scheduler local no active run、database-active task 仍存在,并且输出显式带 `repeatedPollConfirmed=true` 或 confirmed stale candidate,才允许进入 bounded dry-run reconcile;真实恢复仍受高风险边界约束。
stale-active 恢复和 `/api/scheduler/reconcile?staleMs=...` 诊断入口的 heartbeat stale 阈值必须按安全下限归一化:缺省和低于默认 5 分钟的值都按 5 分钟处理,过大值按 24 小时上限截断,并在结构化响应中返回 `requestedStaleMs*``staleMsAdjusted``staleMsAdjustmentReason``minStaleMs``maxStaleMs`。任何 `staleMs=0` 或过低阈值都不能把仍有 fresh scheduler heartbeat 的任务判成 stale/recoverable。
`codex queues``codex tasks --view commander` 和默认 supervisor 视图的 `activity` / `commanderConcurrency` 是指挥官并发治理的主读数。并发决策固定使用 `commanderConcurrency.activeRunnerCount` 或 commander `activeRunners.count`,它等于 `activity.effectiveActiveTaskCount`15 并发策略的可补窗口按 `15 - activeRunnerCount` 计算,不能用 `activeQueueIds.length` 或 scheduler-local slot 数替代。`effectiveActiveTaskCount` 表示用于调度判断的有效活跃任务数;`databaseRunningTaskCount` 来自 PostgreSQL 中 `running` 状态计数;`databaseActiveTaskCount` 覆盖 running/judging 等数据库活跃任务;`heartbeatFreshActiveTaskCount` 表示 heartbeat-fresh 的有效 runner 数;`schedulerLocalActiveQueueCount``schedulerLocalActiveRunSlotCount` 只表示当前控制面本地可见 active run slots。`activeQueueIds``activeQueueCount` 是 scheduler-local 字段,可能在 `counts.running>0` 且 heartbeat 新鲜时为 0;看到这种组合时应按 `activity.effectiveActiveTaskCount``activity.heartbeatFreshActiveTaskCount``splitBrainLive` 决策,不得把空 `activeQueueIds` 当作零并发或停摆证据。`commanderConcurrency.splitBrainDisposition=live-count-as-active` 表示 split-brain 仍是 live 且应计入 active runner`interventionRequired=true`heartbeat risk、stale recovery candidates,或非 `continue-supervision` 的 recommended action 才进入人工介入/恢复判断
`codex queues``codex tasks --view commander` 和默认 supervisor 视图的 `activity` / `commanderConcurrency` 是指挥官并发治理的主读数。并发决策固定使用 `commanderConcurrency.activeRunnerCount` 或 commander `activeRunners.count`,它等于 `activity.effectiveActiveTaskCount`15 并发策略的可补窗口按 `15 - activeRunnerCount` 计算,不能用 `activeQueueIds.length` 或 scheduler-local slot 数替代。`effectiveActiveTaskCount` 表示用于调度判断的有效活跃任务数;`databaseRunningTaskCount` 来自 PostgreSQL 中 `running` 状态计数;`databaseActiveTaskCount` 覆盖 running/judging 等数据库活跃任务;`heartbeatFreshActiveTaskCount` 表示 heartbeat-fresh 的有效 runner 数;`schedulerLocalActiveQueueCount``schedulerLocalActiveRunSlotCount` 只表示当前控制面本地可见 active run slots。`activeQueueIds``activeQueueCount` 是 scheduler-local 字段,可能在 `counts.running>0` 且 heartbeat 新鲜时为 0;看到这种组合时应按 `activity.effectiveActiveTaskCount``activity.heartbeatFreshActiveTaskCount``splitBrainLive` 决策,不得把空 `activeQueueIds` 当作零并发或停摆证据。`commanderConcurrency.splitBrainDisposition=live-count-as-active` 表示 split-brain 仍是 live 且应计入 active runner`attentionRequired=true` 表示需要人工看一眼或重新 poll`interventionRequired=true` 才表示当前输出已经足以进入高风险介入路径。单次 heartbeat risk、stale recovery candidates`recommendedAction=investigate-heartbeat-risk` 应先落到 `attentionRequired=true``re-poll supervisor before recovery`,不得直接等价为恢复授权
单次 `provider is not online`、SSH 超时、proxy 超时或 registry 请求失败只能证明“当前观察路径失败”,不能单独升级为 D601 全局离线、CI/CD 全局阻塞或业务任务不可推进。指挥官和 runner 必须用多信号裁决运行面状态,至少区分以下观察面:
@@ -147,6 +147,46 @@ export function runCodeQueueSubmitSummaryContract(): JsonRecord {
assertCondition(String(liveStateDisclosure.splitBrainDisposition || "").includes("continue supervision"), "stateDisclosure should explain split-brain-live disposition", liveStateDisclosure);
assertCondition(String(liveStateDisclosure.idsUnavailableMeaning || "").includes("not that there are no tasks"), "stateDisclosure should prevent empty-list misread", liveStateDisclosure);
const transientRiskSubmit = compactSubmitSuccessResponseForTest({
tasks: [task("codex_submitted_queued_during_stale_snapshot", "queued", "high-concurrency")],
queue: {
counts: { running: 7, queued: 1 },
activeTaskIds: { items: [], count: 7, returned: 0, truncated: true },
queuedTaskIds: { items: [], count: 1, returned: 0, truncated: true },
databaseActiveTaskCount: 7,
executionDiagnostics: {
state: "stale-active",
effectiveLiveness: "at-risk",
recommendedAction: "investigate-heartbeat-risk",
databaseActiveTaskCount: 7,
databaseActiveTaskIds: manyIds("stale-db-active", 7),
schedulerActiveRunSlotCount: 0,
activeHeartbeatCount: 7,
lastSchedulerHeartbeatAt: "2026-05-22T23:50:00.000Z",
lastObservedAgentEventAt: "2026-05-22T23:49:30.000Z",
heartbeatExpiredTaskIds: manyIds("stale-db-active", 7),
staleRecoveryCandidateTaskIds: manyIds("stale-db-active", 7),
heartbeatRiskTaskIds: manyIds("stale-db-active", 7),
},
},
}, { ok: true, status: 200 }, { mode: "local-atomic-directory-submit-serialization", acquiredAfterMs: 1, heldMs: 2, throttleMs: 2000 });
const transientQueue = asRecord(asRecord(transientRiskSubmit).queue);
const transientActivity = asRecord(transientQueue.activity);
const transientConcurrency = asRecord(transientQueue.commanderConcurrency);
const transientRecovery = asRecord(transientActivity.recovery);
const transientStateDisclosure = asRecord(transientQueue.stateDisclosure);
const transientDiagnostics = asRecord(transientQueue.executionDiagnostics);
const transientDiagnosticsRecovery = asRecord(transientDiagnostics.recovery);
assertCondition(transientActivity.heartbeatRiskTaskCount === 7 && transientActivity.staleRecoveryCandidateTaskCount === 7, "submit activity should keep stale-active candidates visible", transientActivity);
assertCondition(transientRecovery.disposition === "transient-needs-repoll", "submit stale snapshot should be classified as transient until re-polled", transientRecovery);
assertCondition(transientRecovery.hint === "re-poll supervisor before recovery", "submit stale snapshot should emit the re-poll hint", transientRecovery);
assertCondition(transientRecovery.boundedSnapshot === true && transientRecovery.snapshotRole === "submit-confirmation", "submit recovery context should identify bounded confirmation snapshot", transientRecovery);
assertCondition(transientRecovery.lastObservedAgentEventBeforeSubmit === true, "submit recovery context should compare agent event time with submit time", transientRecovery);
assertCondition(transientRecovery.recoveryMutationAllowedByThisSnapshot === false, "single submit snapshot must not allow recovery mutation", transientRecovery);
assertCondition(transientConcurrency.attentionRequired === true && transientConcurrency.interventionRequired === false, "submit heartbeat risk should require attention but not direct high-risk intervention", transientConcurrency);
assertCondition(transientStateDisclosure.transientRiskHint === "re-poll supervisor before recovery", "stateDisclosure should repeat low-noise re-poll hint", transientStateDisclosure);
assertCondition(transientDiagnosticsRecovery.snapshotRole === "submit-confirmation" && transientDiagnosticsRecovery.hint === "re-poll supervisor before recovery", "submit execution diagnostics should carry submit snapshot recovery semantics", transientDiagnosticsRecovery);
const queuedIdsOmitted = compactSubmitSuccessResponseForTest({
tasks: [task("codex_submitted_already_running", "running", "live-fast-lane")],
queue: {
@@ -177,6 +217,7 @@ export function runCodeQueueSubmitSummaryContract(): JsonRecord {
"running count context falls back to database active ids",
"nonzero count with omitted id lists uses idsUnavailable instead of items=[]",
"split-brain-live submit summary says continue supervision and count as active",
"stale submit snapshots preserve candidates but require re-poll before recovery",
"bounded id previews disclose omitted counts",
"submit confirmation omits prompt text and remains low-noise",
],
@@ -241,6 +241,64 @@ function splitBrainLiveSupervisorFixtureResponse(path: string): JsonRecord {
};
}
function staleSnapshotSupervisorFixtureResponse(path: string): JsonRecord {
if (path.includes("/summary")) return fixtureResponse(path);
assertCondition(path.startsWith("/api/microservices/code-queue/proxy/api/tasks/overview"), "unexpected path", { path });
const staleTaskIds = manyIds("stale-active", 7);
const tasks = [
...staleTaskIds.map((taskId, index) => task(
taskId,
"running",
`2026-05-22T02:${String(50 - index).padStart(2, "0")}:00.000Z`,
)),
task("stale-snapshot-new-queued", "queued", "2026-05-22T02:55:00.000Z"),
];
return {
ok: true,
status: 200,
body: {
ok: true,
queue: {
counts: { running: 7, queued: 1 },
activeQueueIds: [],
activeTaskIds: { items: [], count: 7, returned: 0, truncated: true },
activeRunSlotCount: 0,
databaseActiveTaskCount: 7,
executionDiagnostics: {
state: "stale-active",
splitBrain: false,
effectiveLiveness: "at-risk",
recommendedAction: "investigate-heartbeat-risk",
now: "2026-05-22T03:00:00.000Z",
databaseActiveTaskCount: 7,
databaseActiveTaskIds: staleTaskIds,
schedulerActiveRunSlotCount: 0,
schedulerActiveTaskIds: [],
activeHeartbeatCount: 7,
activeHeartbeatTaskIds: staleTaskIds,
heartbeatFreshTaskIds: [],
heartbeatExpiredTaskIds: staleTaskIds,
heartbeatMissingTaskIds: [],
staleRecoveryCandidateTaskIds: staleTaskIds,
heartbeatRiskTaskIds: staleTaskIds,
lastSchedulerHeartbeatAt: "2026-05-22T02:45:00.000Z",
lastObservedAgentEventAt: "2026-05-22T02:44:30.000Z",
reasons: ["owner heartbeat is expired and scheduler has no local active run for at least one database-active task"],
},
},
pagination: {
limit: 200,
returned: 8,
total: 8,
hasMore: false,
nextBeforeId: null,
includeActive: true,
},
tasks,
},
};
}
export function runCodeQueueSupervisorDisclosureContract(): JsonRecord {
const supervisor = codexTasksQueryForTest(["--view", "supervisor", "--limit", "20"], fixtureResponse);
const cappedLimit = codexTasksQueryForTest(["--view", "supervisor", "--limit", "260"], fixtureResponse);
@@ -249,6 +307,7 @@ export function runCodeQueueSupervisorDisclosureContract(): JsonRecord {
const runningFiltered = codexTasksQueryForTest(["--status", "running", "--limit", "40"], manyRunningFixtureResponse);
const unreadFiltered = codexTasksQueryForTest(["--unread", "--limit", "20"], fixtureResponse);
const splitBrainLive = codexTasksQueryForTest(["--view", "supervisor", "--limit", "20"], splitBrainLiveSupervisorFixtureResponse);
const staleSnapshot = codexTasksQueryForTest(["--view", "supervisor", "--limit", "20"], staleSnapshotSupervisorFixtureResponse);
const supervisorBody = JSON.stringify(supervisor);
const fullBody = JSON.stringify(full);
@@ -283,6 +342,12 @@ export function runCodeQueueSupervisorDisclosureContract(): JsonRecord {
const splitBrainLiveActivity = asRecord(splitBrainLiveView.activity);
const splitBrainLiveConcurrency = asRecord(splitBrainLiveView.commanderConcurrency);
const splitBrainLiveCounts = asRecord(splitBrainLiveView.counts);
const staleSnapshotView = asRecord(asRecord(staleSnapshot).supervisor);
const staleSnapshotActivity = asRecord(staleSnapshotView.activity);
const staleSnapshotConcurrency = asRecord(staleSnapshotView.commanderConcurrency);
const staleSnapshotDiagnostics = asRecord(staleSnapshotView.executionDiagnostics);
const staleSnapshotRecovery = asRecord(staleSnapshotActivity.recovery);
const staleSnapshotDiagnosticsRecovery = asRecord(staleSnapshotDiagnostics.recovery);
const cappedFilters = asRecord(cappedSupervisorView.filters);
const cappedSource = asRecord(cappedSupervisorView.source);
const cappedLimitPolicy = asRecord(asRecord(cappedSupervisorView.disclosure).limitPolicy);
@@ -359,6 +424,14 @@ export function runCodeQueueSupervisorDisclosureContract(): JsonRecord {
assertCondition(String(splitBrainLiveConcurrency.decisionRule ?? "").includes("15 - activeRunnerCount"), "split-brain supervisor should give 15-concurrency arithmetic", splitBrainLiveConcurrency);
assertCondition(String(splitBrainLiveActivity.activeQueueIdsNote ?? "").includes("zero local queue ids does not mean zero active runners"), "split-brain supervisor activity should explain activeQueueIds are local-only", splitBrainLiveActivity);
assertCondition(String(splitBrainLiveActivity.interpretation ?? "").includes("continue supervision"), "split-brain supervisor activity should not imply scheduler stoppage", splitBrainLiveActivity);
assertCondition(staleSnapshotActivity.heartbeatRiskTaskCount === 7 && staleSnapshotActivity.staleRecoveryCandidateTaskCount === 7, "stale supervisor snapshot should keep heartbeat-risk candidates visible", staleSnapshotActivity);
assertCondition(staleSnapshotActivity.effectiveActiveTaskCount === 7 && staleSnapshotActivity.databaseRunningTaskCount === 7, "stale supervisor snapshot should preserve database-active running count", staleSnapshotActivity);
assertCondition(staleSnapshotRecovery.disposition === "transient-needs-repoll" && staleSnapshotRecovery.hint === "re-poll supervisor before recovery", "stale supervisor snapshot should ask for re-poll before recovery", staleSnapshotRecovery);
assertCondition(staleSnapshotRecovery.snapshotRole === "supervisor-poll" && staleSnapshotRecovery.boundedSnapshot === false, "supervisor recovery context should distinguish poll from submit confirmation", staleSnapshotRecovery);
assertCondition(staleSnapshotRecovery.recoveryMutationAllowedByThisSnapshot === false, "single supervisor poll must not allow recovery mutation", staleSnapshotRecovery);
assertCondition(staleSnapshotConcurrency.attentionRequired === true && staleSnapshotConcurrency.interventionRequired === false, "stale supervisor snapshot should require attention but not immediate high-risk intervention", staleSnapshotConcurrency);
assertCondition(staleSnapshotDiagnostics.effectiveLiveness === "at-risk" && staleSnapshotDiagnostics.recommendedAction === "investigate-heartbeat-risk", "stale supervisor diagnostics should keep at-risk visibility", staleSnapshotDiagnostics);
assertCondition(staleSnapshotDiagnosticsRecovery.hint === "re-poll supervisor before recovery", "stale supervisor diagnostics should carry the re-poll hint", staleSnapshotDiagnosticsRecovery);
return {
ok: true,
@@ -376,6 +449,7 @@ export function runCodeQueueSupervisorDisclosureContract(): JsonRecord {
"full view remains detailed",
"split-brain live supervisor activity distinguishes scheduler-local, database, and heartbeat counts",
"commander concurrency block names the active runner count and 15-concurrency rule",
"stale-active supervisor snapshot remains visible but requires re-poll before recovery",
],
supervisorChars: supervisorBody.length,
fullChars: fullBody.length,
+218 -22
View File
@@ -300,6 +300,17 @@ interface CompactTaskMutationResponseOptions {
interface CompactSubmitQueueConfirmationOptions {
submittedTasks?: Record<string, unknown>[];
idPreviewLimit?: number;
submittedAt?: string | null;
}
type CodeQueueLivenessSnapshotRole = "submit-confirmation" | "supervisor-poll" | "queue-summary";
interface CompactCodeQueueActivityOptions {
schedulerLocalActiveQueueIds?: string[];
runnableQueueCount?: number | null;
snapshotRole?: CodeQueueLivenessSnapshotRole;
snapshotAt?: string | null;
submittedAt?: string | null;
}
interface CodexTasksOptions {
@@ -649,6 +660,26 @@ function asNumber(value: unknown, fallback = 0): number {
return typeof value === "number" && Number.isFinite(value) ? value : fallback;
}
function timestampMs(value: unknown): number | null {
const text = asString(value);
if (text.length === 0) return null;
const parsed = Date.parse(text);
return Number.isFinite(parsed) ? parsed : null;
}
function maxIsoTimestamp(values: unknown[]): string | null {
let best: string | null = null;
let bestMs = -Infinity;
for (const value of values) {
const text = asString(value);
const parsed = timestampMs(text);
if (parsed === null || parsed < bestMs) continue;
best = text;
bestMs = parsed;
}
return best;
}
function finiteNumber(value: unknown): number | null {
return typeof value === "number" && Number.isFinite(value) ? value : null;
}
@@ -1696,16 +1727,101 @@ function activeHeartbeatCountFromDiagnostics(record: Record<string, unknown>, ac
return Number.isFinite(explicit) ? explicit : Math.max(activeHeartbeatTaskIds.count, heartbeatFreshTaskIds.count);
}
function compactRecoveryDecision(record: Record<string, unknown>, counts: {
heartbeatRiskTaskCount: number;
staleRecoveryCandidateTaskCount: number;
}, options: {
snapshotRole?: CodeQueueLivenessSnapshotRole;
snapshotAt?: string | null;
submittedAt?: string | null;
} = {}): Record<string, unknown> {
const confirmedIds = boundedUniqueStringList(record.staleRecoveryConfirmedTaskIds ?? record.confirmedStaleRecoveryCandidateTaskIds);
const repeatedPollConfirmed = asBoolean(record.repeatedPollConfirmed)
|| asBoolean(record.recoveryRepeatedPollConfirmed)
|| confirmedIds.count > 0;
const riskVisible = counts.heartbeatRiskTaskCount > 0 || counts.staleRecoveryCandidateTaskCount > 0;
const recoveryMutationAllowedByThisSnapshot = riskVisible && repeatedPollConfirmed;
const rePollBeforeRecovery = riskVisible && !repeatedPollConfirmed;
const snapshotRole = options.snapshotRole ?? "supervisor-poll";
const snapshotAt = options.snapshotAt ?? (asString(record.now) || null);
const submittedAt = options.submittedAt ?? null;
const lastObservedAgentEventAt = asString(record.lastObservedAgentEventAt) || null;
const lastSchedulerHeartbeatAt = asString(record.lastSchedulerHeartbeatAt) || null;
const lastObservedMs = timestampMs(lastObservedAgentEventAt);
const submittedMs = timestampMs(submittedAt);
const snapshotMs = timestampMs(snapshotAt);
const lastObservedAgentEventBeforeSubmit = lastObservedMs !== null && submittedMs !== null && lastObservedMs < submittedMs;
const lastObservedAgentEventBeforeSnapshot = lastObservedMs !== null && snapshotMs !== null && lastObservedMs < snapshotMs;
const hint = riskVisible
? recoveryMutationAllowedByThisSnapshot
? "run dry-run reconcile before recovery"
: "re-poll supervisor before recovery"
: null;
if (!riskVisible) {
return {
riskVisible: false,
disposition: "none",
hint: null,
rePollBeforeRecovery: false,
pollConfirmationRequired: false,
repeatedPollConfirmed,
recoveryMutationAllowedByThisSnapshot: false,
heartbeatRiskTaskCount: 0,
staleRecoveryCandidateTaskCount: 0,
snapshotRole,
boundedSnapshot: snapshotRole === "submit-confirmation",
};
}
return {
riskVisible,
disposition: !riskVisible
? "none"
: recoveryMutationAllowedByThisSnapshot
? "confirmed-stale-active-candidate"
: "transient-needs-repoll",
hint,
rePollBeforeRecovery,
pollConfirmationRequired: rePollBeforeRecovery,
repeatedPollConfirmed,
recoveryMutationAllowedByThisSnapshot,
staleActiveCandidateVisibility: counts.staleRecoveryCandidateTaskCount > 0 ? "visible-candidate-not-mutation-proof" : "none",
heartbeatRiskTaskCount: counts.heartbeatRiskTaskCount,
staleRecoveryCandidateTaskCount: counts.staleRecoveryCandidateTaskCount,
confirmedStaleRecoveryCandidateTaskIds: confirmedIds.items,
confirmedStaleRecoveryCandidateTaskCount: confirmedIds.count,
snapshotRole,
boundedSnapshot: snapshotRole === "submit-confirmation",
snapshotAt,
submittedAt,
lastObservedAgentEventAt,
lastSchedulerHeartbeatAt,
lastObservedAgentEventBeforeSubmit,
lastObservedAgentEventBeforeSnapshot,
nextPollCommand: rePollBeforeRecovery ? "bun scripts/cli.ts codex tasks --view supervisor --limit 20" : null,
dryRunReconcileCommand: recoveryMutationAllowedByThisSnapshot ? "bun scripts/cli.ts microservice proxy code-queue '/api/scheduler/reconcile?dryRun=1' --raw" : null,
mutationBoundary: "Never recover, restart, cancel, or interrupt from a single bounded liveness snapshot.",
};
}
function compactLivenessDecision(record: Record<string, unknown>, lists: {
activeHeartbeatTaskIds: { items: string[]; count: number; truncated: boolean; omitted: number };
heartbeatFreshTaskIds: { items: string[]; count: number; truncated: boolean; omitted: number };
heartbeatRiskTaskIds: { items: string[]; count: number };
databaseActiveTaskIds: { count: number };
}): Record<string, unknown> {
staleRecoveryCandidateTaskIds?: { count: number };
}, recoveryOptions: {
snapshotRole?: CodeQueueLivenessSnapshotRole;
snapshotAt?: string | null;
submittedAt?: string | null;
} = {}): Record<string, unknown> {
const splitBrainLive = splitBrainLiveFromDiagnostics(record);
const effectiveLiveness = effectiveLivenessFromDiagnostics(record);
const recommendedAction = recommendedActionFromDiagnostics(record);
const activeHeartbeatCount = activeHeartbeatCountFromDiagnostics(record, lists.activeHeartbeatTaskIds, lists.heartbeatFreshTaskIds);
const recovery = compactRecoveryDecision(record, {
heartbeatRiskTaskCount: lists.heartbeatRiskTaskIds.count,
staleRecoveryCandidateTaskCount: lists.staleRecoveryCandidateTaskIds?.count ?? stringList(record.staleRecoveryCandidateTaskIds).length,
}, recoveryOptions);
return {
effectiveLiveness,
recommendedAction,
@@ -1717,8 +1833,12 @@ function compactLivenessDecision(record: Record<string, unknown>, lists: {
databaseActiveTaskCount: asNumber(record.databaseActiveTaskCount, lists.databaseActiveTaskIds.count),
schedulerActiveRunSlotCount: record.schedulerActiveRunSlotCount ?? null,
heartbeatRiskTaskCount: lists.heartbeatRiskTaskIds.count,
staleRecoveryCandidateTaskCount: lists.staleRecoveryCandidateTaskIds?.count ?? stringList(record.staleRecoveryCandidateTaskIds).length,
recovery,
interpretation: splitBrainLive
? "scheduler heartbeat is fresh; treat active task count from heartbeat as live and continue supervision"
: recovery.rePollBeforeRecovery === true
? "heartbeat risk is visible in this bounded snapshot; re-poll supervisor before recovery"
: effectiveLiveness === "at-risk"
? "heartbeat risk is present; investigate heartbeat freshness before recovery"
: effectiveLiveness === "degraded"
@@ -1749,7 +1869,11 @@ function boundedInlineString(value: unknown, maxChars: number): { text: string |
};
}
function compactExecutionDiagnostics(value: unknown): Record<string, unknown> | null {
function compactExecutionDiagnostics(value: unknown, recoveryOptions: {
snapshotRole?: CodeQueueLivenessSnapshotRole;
snapshotAt?: string | null;
submittedAt?: string | null;
} = {}): Record<string, unknown> | null {
const record = asRecord(value);
if (record === null) return null;
const fullHeartbeatRiskTaskIds = Array.from(new Set([
@@ -1776,7 +1900,12 @@ function compactExecutionDiagnostics(value: unknown): Record<string, unknown> |
heartbeatFreshTaskIds,
heartbeatRiskTaskIds,
databaseActiveTaskIds,
});
staleRecoveryCandidateTaskIds,
}, recoveryOptions);
const recovery = asRecord(liveness.recovery) ?? compactRecoveryDecision(record, {
heartbeatRiskTaskCount: heartbeatRiskTaskIds.count,
staleRecoveryCandidateTaskCount: staleRecoveryCandidateTaskIds.count,
}, recoveryOptions);
const omittedCounts = {
databaseActiveTaskIds: databaseActiveTaskIds.omitted,
schedulerActiveTaskIds: schedulerActiveTaskIds.omitted,
@@ -1798,6 +1927,7 @@ function compactExecutionDiagnostics(value: unknown): Record<string, unknown> |
splitBrainLive: splitBrainLiveFromDiagnostics(record),
effectiveLiveness: liveness.effectiveLiveness,
recommendedAction: liveness.recommendedAction,
recovery,
liveness,
livenessSummary: livenessSummary.text,
livenessSummaryChars: livenessSummary.chars,
@@ -1832,8 +1962,12 @@ function compactExecutionDiagnostics(value: unknown): Record<string, unknown> |
};
}
function compactQueueExecutionDiagnostics(value: unknown): Record<string, unknown> | null {
const diagnostics = compactExecutionDiagnostics(value);
function compactQueueExecutionDiagnostics(value: unknown, recoveryOptions: {
snapshotRole?: CodeQueueLivenessSnapshotRole;
snapshotAt?: string | null;
submittedAt?: string | null;
} = {}): Record<string, unknown> | null {
const diagnostics = compactExecutionDiagnostics(value, recoveryOptions);
if (diagnostics === null) return null;
const listBudget = asRecord(diagnostics.listBudget) ?? {};
const omittedCounts = asRecord(listBudget.omittedCounts) ?? {};
@@ -1844,6 +1978,7 @@ function compactQueueExecutionDiagnostics(value: unknown): Record<string, unknow
splitBrainLive: diagnostics.splitBrainLive ?? null,
effectiveLiveness: diagnostics.effectiveLiveness ?? null,
recommendedAction: diagnostics.recommendedAction ?? null,
recovery: diagnostics.recovery ?? null,
liveness: diagnostics.liveness ?? null,
executionStateSource: diagnostics.executionStateSource ?? null,
controlPlane: diagnostics.controlPlane ?? null,
@@ -1883,7 +2018,7 @@ function stringListCount(value: unknown): number | null {
function compactCodeQueueActivity(
queue: Record<string, unknown>,
diagnostics: Record<string, unknown> | null,
options: { schedulerLocalActiveQueueIds?: string[]; runnableQueueCount?: number | null } = {},
options: CompactCodeQueueActivityOptions = {},
): Record<string, unknown> {
const rawDiagnostics = asRecord(queue.executionDiagnostics) ?? {};
const compactDiagnostics = diagnostics ?? {};
@@ -1938,6 +2073,17 @@ function compactCodeQueueActivity(
const recommendedAction = asString(rawDiagnostics.recommendedAction ?? compactDiagnostics.recommendedAction ?? rawLiveness.recommendedAction ?? compactLiveness.recommendedAction);
const splitBrain = asBoolean(rawDiagnostics.splitBrain) || asBoolean(compactDiagnostics.splitBrain) || executionState === "split-brain";
const splitBrainLive = splitBrainLiveFromDiagnostics(rawDiagnostics) || splitBrainLiveFromDiagnostics(compactDiagnostics);
const snapshotAt = options.snapshotAt ?? (asString(rawDiagnostics.now ?? compactDiagnostics.now) || null);
const recovery = compactRecoveryDecision({ ...compactDiagnostics, ...rawDiagnostics }, {
heartbeatRiskTaskCount,
staleRecoveryCandidateTaskCount,
}, {
snapshotRole: options.snapshotRole,
snapshotAt,
submittedAt: options.submittedAt ?? null,
});
const recoveryMutationAllowed = recovery.recoveryMutationAllowedByThisSnapshot === true;
const rePollBeforeRecovery = recovery.rePollBeforeRecovery === true;
const effectiveActiveSource = heartbeatFreshActiveTaskCount > 0 && heartbeatFreshActiveTaskCount >= databaseActiveTaskCount
? "heartbeat-fresh"
: databaseActiveTaskCount > 0
@@ -1948,8 +2094,13 @@ function compactCodeQueueActivity(
const activeQueueIdsNote = schedulerLocalActiveQueueIds.length === 0 && effectiveActiveTaskCount > 0
? "activeQueueIds are scheduler-local only; zero local queue ids does not mean zero active runners when database or heartbeat counts are nonzero."
: "activeQueueIds are scheduler-local active-run slots; use effectiveActiveTaskCount for commander concurrency decisions.";
const recommendedActionIntervenes = recommendedAction.length > 0 && recommendedAction !== "continue-supervision";
const interventionRequired = heartbeatRiskTaskCount > 0 || staleRecoveryCandidateTaskCount > 0 || recommendedActionIntervenes || (splitBrain && !splitBrainLive);
const recommendedActionIntervenes = recommendedAction.length > 0
&& !["none", "continue-supervision", "observe-degraded", "investigate-heartbeat-risk"].includes(recommendedAction);
const attentionRequired = heartbeatRiskTaskCount > 0
|| staleRecoveryCandidateTaskCount > 0
|| recommendedActionIntervenes
|| (splitBrain && !splitBrainLive);
const interventionRequired = recoveryMutationAllowed || recommendedActionIntervenes || ((splitBrain && !splitBrainLive) && !rePollBeforeRecovery);
const splitBrainDisposition = splitBrainLive
? "live-count-as-active"
: splitBrain
@@ -1962,15 +2113,35 @@ function compactCodeQueueActivity(
: "control-plane and execution-plane activity signals are not split-brain.";
const interventionReason = splitBrainLive
? "fresh heartbeat makes split-brain live; count these runners as active and continue supervision."
: heartbeatRiskTaskCount > 0
? "heartbeat risk is present; inspect before adding replacement work or recovering tasks."
: staleRecoveryCandidateTaskCount > 0
? "stale recovery candidates are present; follow the recovery runbook before changing concurrency."
: rePollBeforeRecovery
? "heartbeat risk is visible, but this snapshot is not repeated-poll confirmation; re-poll supervisor before recovery."
: recoveryMutationAllowed
? "repeated-poll confirmation is present; run dry-run reconcile before any recovery mutation."
: recommendedActionIntervenes
? `execution diagnostics recommend ${recommendedAction}; intervene before adding work.`
: splitBrain
? "split-brain is not proven live; inspect raw diagnostics before treating capacity as available."
: "no intervention signal in compact activity summary.";
const schedulerHeartbeatFreshness = {
source: "executionDiagnostics.schedulerHeartbeat",
heartbeatFreshActiveTaskCount,
activeHeartbeatTaskCount,
heartbeatRiskTaskCount,
staleRecoveryCandidateTaskCount,
lastSchedulerHeartbeatAt: recovery.lastSchedulerHeartbeatAt,
lastObservedAgentEventAt: recovery.lastObservedAgentEventAt,
};
const snapshotSemantics = {
role: recovery.snapshotRole,
boundedSnapshot: recovery.boundedSnapshot,
snapshotAt: recovery.snapshotAt,
submittedAt: recovery.submittedAt,
submitConfirmationIsImmediate: recovery.snapshotRole === "submit-confirmation",
databaseActiveField: "activity.databaseActiveTaskCount",
schedulerHeartbeatFreshnessField: "activity.schedulerHeartbeatFreshness",
queuedSubmittedStateField: "submitted.taskStates[]",
recoveryHint: recovery.hint,
};
const commanderConcurrency = {
activeRunnerCount: effectiveActiveTaskCount,
activeRunnerCountField: "activity.effectiveActiveTaskCount",
@@ -1978,8 +2149,11 @@ function compactCodeQueueActivity(
decisionRule: "subtract activeRunnerCount from the commander concurrency target; for a 15-runner policy, remaining slots = 15 - activeRunnerCount.",
splitBrainDisposition,
splitBrainReason,
attentionRequired,
interventionRequired,
interventionReason,
recoveryHint: recovery.hint,
recoveryDisposition: recovery.disposition,
};
return {
effectiveActiveTaskCount,
@@ -1990,6 +2164,9 @@ function compactCodeQueueActivity(
activeHeartbeatTaskCount,
heartbeatRiskTaskCount,
staleRecoveryCandidateTaskCount,
schedulerHeartbeatFreshness,
recovery,
snapshotSemantics,
schedulerLocalActiveQueueCount: schedulerLocalActiveQueueIds.length,
schedulerLocalActiveRunSlotCount,
runnableQueueCount,
@@ -2003,8 +2180,10 @@ function compactCodeQueueActivity(
activeQueueIdsNote,
interpretation: splitBrainLive
? "split-brain live: database-active tasks have fresh scheduler heartbeat; continue supervision."
: heartbeatRiskTaskCount > 0
? "heartbeat risk is present; investigate before retry or recovery."
: rePollBeforeRecovery
? "heartbeat risk is visible; re-poll supervisor before recovery."
: recoveryMutationAllowed
? "repeated poll confirms stale-active candidate; dry-run reconcile is the next recovery check."
: effectiveActiveTaskCount > 0
? "active work is present; compare database, heartbeat, and scheduler-local counts before changing concurrency."
: "no active work observed by database, heartbeat, or scheduler-local signals.",
@@ -2012,7 +2191,7 @@ function compactCodeQueueActivity(
}
function supervisorExecutionDiagnostics(value: unknown): Record<string, unknown> | null {
const diagnostics = compactExecutionDiagnostics(value);
const diagnostics = compactExecutionDiagnostics(value, { snapshotRole: "supervisor-poll" });
if (diagnostics === null) return null;
const liveness = asRecord(diagnostics.liveness) ?? {};
const listBudget = asRecord(diagnostics.listBudget) ?? {};
@@ -2020,6 +2199,7 @@ function supervisorExecutionDiagnostics(value: unknown): Record<string, unknown>
state: diagnostics.state ?? null,
effectiveLiveness: diagnostics.effectiveLiveness ?? null,
recommendedAction: diagnostics.recommendedAction ?? null,
recovery: diagnostics.recovery ?? null,
splitBrainLive: diagnostics.splitBrainLive ?? null,
activeHeartbeatCount: diagnostics.activeHeartbeatCount ?? null,
databaseActiveTaskCount: diagnostics.databaseActiveTaskCount ?? null,
@@ -3943,7 +4123,7 @@ function codexTasksCommanderResult(
const rawQueue = asRecord(taskPage.queue) ?? {};
const rawDiagnostics = asRecord(rawQueue.executionDiagnostics) ?? {};
const diagnostics = supervisorExecutionDiagnostics(rawDiagnostics);
const activity = compactCodeQueueActivity(rawQueue, diagnostics);
const activity = compactCodeQueueActivity(rawQueue, diagnostics, { snapshotRole: "supervisor-poll" });
const commanderConcurrency = asRecord(activity.commanderConcurrency) ?? {};
const runningTasks = sortRunningWatchTasks(allTasks);
const unreadCompletedTasks = sortCompletedWatchTasks(allTasks).filter((task) => taskUnreadTerminal(task));
@@ -4104,7 +4284,7 @@ function codexTasksOverviewResult(
const queuedSection = buildSupervisorTaskSection(queuedTasks, summaries, sectionLimit, sectionNextCommand(queuedTasks, sectionLimit, options, nextCommand), fullCommand);
const pagination = taskPage.pagination;
const diagnostics = supervisorExecutionDiagnostics(asRecord(taskPage.queue)?.executionDiagnostics);
const activity = compactCodeQueueActivity(asRecord(taskPage.queue) ?? {}, diagnostics);
const activity = compactCodeQueueActivity(asRecord(taskPage.queue) ?? {}, diagnostics, { snapshotRole: "supervisor-poll" });
const commanderConcurrency = asRecord(activity.commanderConcurrency) ?? {};
const activeRunning = supervisorActiveRunningSummary(taskPage, options, runningSection, diagnostics);
const visibleSupervisorItems = [...runningSection.items, ...unreadSection.items, ...recentSection.items, ...queuedSection.items];
@@ -4509,8 +4689,8 @@ function compactQueuesResponse(body: Record<string, unknown>, options: CodexQueu
const activeQueues = queues.filter((row) => typeof row.id === "string" && activeIds.includes(row.id));
const selected = options.full ? queues : Array.from(new Map([...activeQueues, ...unreadQueues, ...runnableQueues, ...nonemptyQueues].map((row) => [String(row.id), row])).values());
const visible = selected.slice(options.offset, options.offset + options.limit);
const diagnostics = compactQueueExecutionDiagnostics(queue.executionDiagnostics);
const activity = compactCodeQueueActivity(queue, diagnostics, { schedulerLocalActiveQueueIds: activeIds, runnableQueueCount: runnableQueues.length });
const diagnostics = compactQueueExecutionDiagnostics(queue.executionDiagnostics, { snapshotRole: "queue-summary" });
const activity = compactCodeQueueActivity(queue, diagnostics, { schedulerLocalActiveQueueIds: activeIds, runnableQueueCount: runnableQueues.length, snapshotRole: "queue-summary" });
const commanderConcurrency = asRecord(activity.commanderConcurrency) ?? {};
const activeTaskIds = boundedUniqueStringList(queue.activeTaskIds, Math.min(options.limit, maxTasksLimit));
const queuedTaskIds = boundedUniqueStringList(queue.queuedTaskIds, Math.min(options.limit, maxTasksLimit));
@@ -5195,8 +5375,16 @@ function compactSubmitQueueConfirmation(value: unknown, options: CompactSubmitQu
|| databaseActivePreview.truncated === true
|| queuedPreview.truncated === true
|| submittedPreview.truncated === true;
const diagnostics = compactQueueExecutionDiagnostics(record.executionDiagnostics);
const activity = compactCodeQueueActivity(record, diagnostics);
const diagnostics = compactQueueExecutionDiagnostics(record.executionDiagnostics, {
snapshotRole: "submit-confirmation",
snapshotAt: options.submittedAt ?? null,
submittedAt: options.submittedAt ?? null,
});
const activity = compactCodeQueueActivity(record, diagnostics, {
snapshotRole: "submit-confirmation",
snapshotAt: options.submittedAt ?? null,
submittedAt: options.submittedAt ?? null,
});
const commanderConcurrency = asRecord(activity.commanderConcurrency) ?? {};
const unavailableIdLists = {
activeTaskIds: activePreview.idsUnavailable === true,
@@ -5245,6 +5433,13 @@ function compactSubmitQueueConfirmation(value: unknown, options: CompactSubmitQu
queueCountsSource: "response.queue.counts",
activeCountField: "queue.countContext.active",
commanderActiveRunnerCountField: "queue.activity.effectiveActiveTaskCount",
snapshotRole: "submit-confirmation",
boundedSnapshot: true,
submittedAt: options.submittedAt ?? null,
schedulerHeartbeatFreshnessField: "queue.activity.schedulerHeartbeatFreshness",
databaseActiveField: "queue.countContext.databaseActive",
queuedSubmittedTaskStateField: "submitted.taskStates[]",
transientRiskHint: asRecord(activity.recovery)?.hint ?? null,
splitBrainLive: diagnostics?.splitBrainLive ?? false,
splitBrainDisposition: diagnostics?.splitBrainLive === true ? "live-counts-remain-active; continue supervision unless commanderConcurrency.interventionRequired=true" : "not-split-brain-live",
idsUnavailableMeaning: "A nonzero count with idsUnavailable=true means ids were omitted or unavailable in the bounded submit summary, not that there are no tasks.",
@@ -5277,6 +5472,7 @@ function compactSubmitSuccessResponse(body: Record<string, unknown>, upstream: R
const firstTaskId = taskIds[0] ?? null;
const firstQueueId = queueIds[0] ?? null;
const firstSubmittedTask = submittedTasks[0] ?? {};
const submittedAt = maxIsoTimestamp(submittedTasks.flatMap((task) => [task.createdAt, task.updatedAt]));
const queueRecord = asRecord(body.queue);
const runnerPermissions = compactRunnerPermissions(queueRecord?.runnerPermissions);
return {
@@ -5305,7 +5501,7 @@ function compactSubmitSuccessResponse(body: Record<string, unknown>, upstream: R
reason: "codex submit is a write operation; default output confirms persistence and provides drill-down commands without echoing prompt text.",
},
},
queue: compactSubmitQueueConfirmation(body.queue, { submittedTasks }),
queue: compactSubmitQueueConfirmation(body.queue, { submittedTasks, submittedAt }),
submitConcurrencyGuard: compactSubmitConcurrencyGuard(lock),
commands: {
firstTask: firstTaskId === null ? null : `bun scripts/cli.ts codex task ${firstTaskId}`,