diff --git a/docs/reference/code-queue-supervision.md b/docs/reference/code-queue-supervision.md index def62628..ed1d9f7c 100644 --- a/docs/reference/code-queue-supervision.md +++ b/docs/reference/code-queue-supervision.md @@ -289,9 +289,13 @@ commander 视图的任务分类必须是确定性字段,至少区分 `business 队列诊断中的 `split-brain` 表示控制面/执行面观测分裂,不自动证明任务已经死亡。只要任务 heartbeat 还在刷新、trace 仍在推进,就不能把它判成服务中断或要求立刻 stop;应把它视为 `splitBrainLive=true` 的 live 任务,继续监督并推进 #20 里的已排任务,而不是 interrupt、替换或把 backend 当成已经挂掉。队列摘要应显示 `effectiveLiveness=live`、`splitBrainLive=true` 和 `recommendedAction=continue-supervision`;compact 输出还应在 `executionDiagnostics.liveness` 中重复这些低噪声字段,并突出 `activeHeartbeatCount`、有界 `heartbeatFreshTaskIds`、`databaseActiveTaskCount` 和 `schedulerActiveRunSlotCount`。当 master/control-plane 的 `schedulerActiveRunSlotCount=0` 但 `heartbeatFreshTaskIds` 非空时,active 数应优先按 scheduler heartbeat 摘要解释为 live,而不是按 master 本地 slot 0 解释为执行停摆。只有 heartbeat expired/missing 或满足 stale-recovery 条件时,才应显示 `effectiveLiveness=at-risk` 并进入恢复判断。 +`codex submit` 成功后的 `queue` 是 submit-confirmation 即时 bounded snapshot,不是恢复判据。它必须同时区分 `submitted.taskStates[]` 中本次提交任务的 queued/running 状态、`queue.countContext.databaseActive` / `activity.databaseActiveTaskCount` 中 PostgreSQL active running、`activity.schedulerHeartbeatFreshness` 中 scheduler heartbeat freshness,以及 `activity.recovery` 中的 transient risk。若同一个提交回显里出现 `counts.running>0`、`queued>0`、`heartbeatRiskTaskCount>0` 或 `staleRecoveryCandidateTaskCount>0`,但 `lastObservedAgentEventAt` 明显早于 submit time 或还没有下一次 supervisor poll 确认,输出应保留候选可见性并给出 `re-poll supervisor before recovery`;这类单次 bounded snapshot 只能设置 `attentionRequired=true`,不得把 `commanderConcurrency.interventionRequired` 直接升级为高风险恢复。默认 drill-down 是重新运行 `codex tasks --view supervisor --limit 20` 或 raw overview,而不是 restart、cancel、interrupt 或 DB write。 + +默认 supervisor poll 也遵循同一低噪声语义:heartbeat expired/missing、`heartbeatRiskTaskIds` 和 `staleRecoveryCandidateTaskIds` 必须可见,但第一次 poll 只表示 `transient-needs-repoll`,`activity.recovery.hint` 应为 `re-poll supervisor before recovery`。只有 repeated poll 仍确认 owner heartbeat expired、scheduler local no active run、database-active task 仍存在,并且输出显式带 `repeatedPollConfirmed=true` 或 confirmed stale candidate,才允许进入 bounded dry-run reconcile;真实恢复仍受高风险边界约束。 + stale-active 恢复和 `/api/scheduler/reconcile?staleMs=...` 诊断入口的 heartbeat stale 阈值必须按安全下限归一化:缺省和低于默认 5 分钟的值都按 5 分钟处理,过大值按 24 小时上限截断,并在结构化响应中返回 `requestedStaleMs*`、`staleMsAdjusted`、`staleMsAdjustmentReason`、`minStaleMs` 和 `maxStaleMs`。任何 `staleMs=0` 或过低阈值都不能把仍有 fresh scheduler heartbeat 的任务判成 stale/recoverable。 -`codex queues`、`codex tasks --view commander` 和默认 supervisor 视图的 `activity` / `commanderConcurrency` 是指挥官并发治理的主读数。并发决策固定使用 `commanderConcurrency.activeRunnerCount` 或 commander `activeRunners.count`,它等于 `activity.effectiveActiveTaskCount`;15 并发策略的可补窗口按 `15 - activeRunnerCount` 计算,不能用 `activeQueueIds.length` 或 scheduler-local slot 数替代。`effectiveActiveTaskCount` 表示用于调度判断的有效活跃任务数;`databaseRunningTaskCount` 来自 PostgreSQL 中 `running` 状态计数;`databaseActiveTaskCount` 覆盖 running/judging 等数据库活跃任务;`heartbeatFreshActiveTaskCount` 表示 heartbeat-fresh 的有效 runner 数;`schedulerLocalActiveQueueCount` 和 `schedulerLocalActiveRunSlotCount` 只表示当前控制面本地可见 active run slots。`activeQueueIds` 与 `activeQueueCount` 是 scheduler-local 字段,可能在 `counts.running>0` 且 heartbeat 新鲜时为 0;看到这种组合时应按 `activity.effectiveActiveTaskCount`、`activity.heartbeatFreshActiveTaskCount` 和 `splitBrainLive` 决策,不得把空 `activeQueueIds` 当作零并发或停摆证据。`commanderConcurrency.splitBrainDisposition=live-count-as-active` 表示 split-brain 仍是 live 且应计入 active runner;`interventionRequired=true`、heartbeat risk、stale recovery candidates,或非 `continue-supervision` 的 recommended action 才进入人工介入/恢复判断。 +`codex queues`、`codex tasks --view commander` 和默认 supervisor 视图的 `activity` / `commanderConcurrency` 是指挥官并发治理的主读数。并发决策固定使用 `commanderConcurrency.activeRunnerCount` 或 commander `activeRunners.count`,它等于 `activity.effectiveActiveTaskCount`;15 并发策略的可补窗口按 `15 - activeRunnerCount` 计算,不能用 `activeQueueIds.length` 或 scheduler-local slot 数替代。`effectiveActiveTaskCount` 表示用于调度判断的有效活跃任务数;`databaseRunningTaskCount` 来自 PostgreSQL 中 `running` 状态计数;`databaseActiveTaskCount` 覆盖 running/judging 等数据库活跃任务;`heartbeatFreshActiveTaskCount` 表示 heartbeat-fresh 的有效 runner 数;`schedulerLocalActiveQueueCount` 和 `schedulerLocalActiveRunSlotCount` 只表示当前控制面本地可见 active run slots。`activeQueueIds` 与 `activeQueueCount` 是 scheduler-local 字段,可能在 `counts.running>0` 且 heartbeat 新鲜时为 0;看到这种组合时应按 `activity.effectiveActiveTaskCount`、`activity.heartbeatFreshActiveTaskCount` 和 `splitBrainLive` 决策,不得把空 `activeQueueIds` 当作零并发或停摆证据。`commanderConcurrency.splitBrainDisposition=live-count-as-active` 表示 split-brain 仍是 live 且应计入 active runner;`attentionRequired=true` 表示需要人工看一眼或重新 poll,`interventionRequired=true` 才表示当前输出已经足以进入高风险介入路径。单次 heartbeat risk、stale recovery candidates 或 `recommendedAction=investigate-heartbeat-risk` 应先落到 `attentionRequired=true` 加 `re-poll supervisor before recovery`,不得直接等价为恢复授权。 单次 `provider is not online`、SSH 超时、proxy 超时或 registry 请求失败只能证明“当前观察路径失败”,不能单独升级为 D601 全局离线、CI/CD 全局阻塞或业务任务不可推进。指挥官和 runner 必须用多信号裁决运行面状态,至少区分以下观察面: diff --git a/scripts/code-queue-submit-summary-contract-test.ts b/scripts/code-queue-submit-summary-contract-test.ts index 6385109a..23583a1d 100644 --- a/scripts/code-queue-submit-summary-contract-test.ts +++ b/scripts/code-queue-submit-summary-contract-test.ts @@ -147,6 +147,46 @@ export function runCodeQueueSubmitSummaryContract(): JsonRecord { assertCondition(String(liveStateDisclosure.splitBrainDisposition || "").includes("continue supervision"), "stateDisclosure should explain split-brain-live disposition", liveStateDisclosure); assertCondition(String(liveStateDisclosure.idsUnavailableMeaning || "").includes("not that there are no tasks"), "stateDisclosure should prevent empty-list misread", liveStateDisclosure); + const transientRiskSubmit = compactSubmitSuccessResponseForTest({ + tasks: [task("codex_submitted_queued_during_stale_snapshot", "queued", "high-concurrency")], + queue: { + counts: { running: 7, queued: 1 }, + activeTaskIds: { items: [], count: 7, returned: 0, truncated: true }, + queuedTaskIds: { items: [], count: 1, returned: 0, truncated: true }, + databaseActiveTaskCount: 7, + executionDiagnostics: { + state: "stale-active", + effectiveLiveness: "at-risk", + recommendedAction: "investigate-heartbeat-risk", + databaseActiveTaskCount: 7, + databaseActiveTaskIds: manyIds("stale-db-active", 7), + schedulerActiveRunSlotCount: 0, + activeHeartbeatCount: 7, + lastSchedulerHeartbeatAt: "2026-05-22T23:50:00.000Z", + lastObservedAgentEventAt: "2026-05-22T23:49:30.000Z", + heartbeatExpiredTaskIds: manyIds("stale-db-active", 7), + staleRecoveryCandidateTaskIds: manyIds("stale-db-active", 7), + heartbeatRiskTaskIds: manyIds("stale-db-active", 7), + }, + }, + }, { ok: true, status: 200 }, { mode: "local-atomic-directory-submit-serialization", acquiredAfterMs: 1, heldMs: 2, throttleMs: 2000 }); + const transientQueue = asRecord(asRecord(transientRiskSubmit).queue); + const transientActivity = asRecord(transientQueue.activity); + const transientConcurrency = asRecord(transientQueue.commanderConcurrency); + const transientRecovery = asRecord(transientActivity.recovery); + const transientStateDisclosure = asRecord(transientQueue.stateDisclosure); + const transientDiagnostics = asRecord(transientQueue.executionDiagnostics); + const transientDiagnosticsRecovery = asRecord(transientDiagnostics.recovery); + assertCondition(transientActivity.heartbeatRiskTaskCount === 7 && transientActivity.staleRecoveryCandidateTaskCount === 7, "submit activity should keep stale-active candidates visible", transientActivity); + assertCondition(transientRecovery.disposition === "transient-needs-repoll", "submit stale snapshot should be classified as transient until re-polled", transientRecovery); + assertCondition(transientRecovery.hint === "re-poll supervisor before recovery", "submit stale snapshot should emit the re-poll hint", transientRecovery); + assertCondition(transientRecovery.boundedSnapshot === true && transientRecovery.snapshotRole === "submit-confirmation", "submit recovery context should identify bounded confirmation snapshot", transientRecovery); + assertCondition(transientRecovery.lastObservedAgentEventBeforeSubmit === true, "submit recovery context should compare agent event time with submit time", transientRecovery); + assertCondition(transientRecovery.recoveryMutationAllowedByThisSnapshot === false, "single submit snapshot must not allow recovery mutation", transientRecovery); + assertCondition(transientConcurrency.attentionRequired === true && transientConcurrency.interventionRequired === false, "submit heartbeat risk should require attention but not direct high-risk intervention", transientConcurrency); + assertCondition(transientStateDisclosure.transientRiskHint === "re-poll supervisor before recovery", "stateDisclosure should repeat low-noise re-poll hint", transientStateDisclosure); + assertCondition(transientDiagnosticsRecovery.snapshotRole === "submit-confirmation" && transientDiagnosticsRecovery.hint === "re-poll supervisor before recovery", "submit execution diagnostics should carry submit snapshot recovery semantics", transientDiagnosticsRecovery); + const queuedIdsOmitted = compactSubmitSuccessResponseForTest({ tasks: [task("codex_submitted_already_running", "running", "live-fast-lane")], queue: { @@ -177,6 +217,7 @@ export function runCodeQueueSubmitSummaryContract(): JsonRecord { "running count context falls back to database active ids", "nonzero count with omitted id lists uses idsUnavailable instead of items=[]", "split-brain-live submit summary says continue supervision and count as active", + "stale submit snapshots preserve candidates but require re-poll before recovery", "bounded id previews disclose omitted counts", "submit confirmation omits prompt text and remains low-noise", ], diff --git a/scripts/code-queue-supervisor-disclosure-contract-test.ts b/scripts/code-queue-supervisor-disclosure-contract-test.ts index 164b58fa..6fb212d4 100644 --- a/scripts/code-queue-supervisor-disclosure-contract-test.ts +++ b/scripts/code-queue-supervisor-disclosure-contract-test.ts @@ -241,6 +241,64 @@ function splitBrainLiveSupervisorFixtureResponse(path: string): JsonRecord { }; } +function staleSnapshotSupervisorFixtureResponse(path: string): JsonRecord { + if (path.includes("/summary")) return fixtureResponse(path); + assertCondition(path.startsWith("/api/microservices/code-queue/proxy/api/tasks/overview"), "unexpected path", { path }); + const staleTaskIds = manyIds("stale-active", 7); + const tasks = [ + ...staleTaskIds.map((taskId, index) => task( + taskId, + "running", + `2026-05-22T02:${String(50 - index).padStart(2, "0")}:00.000Z`, + )), + task("stale-snapshot-new-queued", "queued", "2026-05-22T02:55:00.000Z"), + ]; + return { + ok: true, + status: 200, + body: { + ok: true, + queue: { + counts: { running: 7, queued: 1 }, + activeQueueIds: [], + activeTaskIds: { items: [], count: 7, returned: 0, truncated: true }, + activeRunSlotCount: 0, + databaseActiveTaskCount: 7, + executionDiagnostics: { + state: "stale-active", + splitBrain: false, + effectiveLiveness: "at-risk", + recommendedAction: "investigate-heartbeat-risk", + now: "2026-05-22T03:00:00.000Z", + databaseActiveTaskCount: 7, + databaseActiveTaskIds: staleTaskIds, + schedulerActiveRunSlotCount: 0, + schedulerActiveTaskIds: [], + activeHeartbeatCount: 7, + activeHeartbeatTaskIds: staleTaskIds, + heartbeatFreshTaskIds: [], + heartbeatExpiredTaskIds: staleTaskIds, + heartbeatMissingTaskIds: [], + staleRecoveryCandidateTaskIds: staleTaskIds, + heartbeatRiskTaskIds: staleTaskIds, + lastSchedulerHeartbeatAt: "2026-05-22T02:45:00.000Z", + lastObservedAgentEventAt: "2026-05-22T02:44:30.000Z", + reasons: ["owner heartbeat is expired and scheduler has no local active run for at least one database-active task"], + }, + }, + pagination: { + limit: 200, + returned: 8, + total: 8, + hasMore: false, + nextBeforeId: null, + includeActive: true, + }, + tasks, + }, + }; +} + export function runCodeQueueSupervisorDisclosureContract(): JsonRecord { const supervisor = codexTasksQueryForTest(["--view", "supervisor", "--limit", "20"], fixtureResponse); const cappedLimit = codexTasksQueryForTest(["--view", "supervisor", "--limit", "260"], fixtureResponse); @@ -249,6 +307,7 @@ export function runCodeQueueSupervisorDisclosureContract(): JsonRecord { const runningFiltered = codexTasksQueryForTest(["--status", "running", "--limit", "40"], manyRunningFixtureResponse); const unreadFiltered = codexTasksQueryForTest(["--unread", "--limit", "20"], fixtureResponse); const splitBrainLive = codexTasksQueryForTest(["--view", "supervisor", "--limit", "20"], splitBrainLiveSupervisorFixtureResponse); + const staleSnapshot = codexTasksQueryForTest(["--view", "supervisor", "--limit", "20"], staleSnapshotSupervisorFixtureResponse); const supervisorBody = JSON.stringify(supervisor); const fullBody = JSON.stringify(full); @@ -283,6 +342,12 @@ export function runCodeQueueSupervisorDisclosureContract(): JsonRecord { const splitBrainLiveActivity = asRecord(splitBrainLiveView.activity); const splitBrainLiveConcurrency = asRecord(splitBrainLiveView.commanderConcurrency); const splitBrainLiveCounts = asRecord(splitBrainLiveView.counts); + const staleSnapshotView = asRecord(asRecord(staleSnapshot).supervisor); + const staleSnapshotActivity = asRecord(staleSnapshotView.activity); + const staleSnapshotConcurrency = asRecord(staleSnapshotView.commanderConcurrency); + const staleSnapshotDiagnostics = asRecord(staleSnapshotView.executionDiagnostics); + const staleSnapshotRecovery = asRecord(staleSnapshotActivity.recovery); + const staleSnapshotDiagnosticsRecovery = asRecord(staleSnapshotDiagnostics.recovery); const cappedFilters = asRecord(cappedSupervisorView.filters); const cappedSource = asRecord(cappedSupervisorView.source); const cappedLimitPolicy = asRecord(asRecord(cappedSupervisorView.disclosure).limitPolicy); @@ -359,6 +424,14 @@ export function runCodeQueueSupervisorDisclosureContract(): JsonRecord { assertCondition(String(splitBrainLiveConcurrency.decisionRule ?? "").includes("15 - activeRunnerCount"), "split-brain supervisor should give 15-concurrency arithmetic", splitBrainLiveConcurrency); assertCondition(String(splitBrainLiveActivity.activeQueueIdsNote ?? "").includes("zero local queue ids does not mean zero active runners"), "split-brain supervisor activity should explain activeQueueIds are local-only", splitBrainLiveActivity); assertCondition(String(splitBrainLiveActivity.interpretation ?? "").includes("continue supervision"), "split-brain supervisor activity should not imply scheduler stoppage", splitBrainLiveActivity); + assertCondition(staleSnapshotActivity.heartbeatRiskTaskCount === 7 && staleSnapshotActivity.staleRecoveryCandidateTaskCount === 7, "stale supervisor snapshot should keep heartbeat-risk candidates visible", staleSnapshotActivity); + assertCondition(staleSnapshotActivity.effectiveActiveTaskCount === 7 && staleSnapshotActivity.databaseRunningTaskCount === 7, "stale supervisor snapshot should preserve database-active running count", staleSnapshotActivity); + assertCondition(staleSnapshotRecovery.disposition === "transient-needs-repoll" && staleSnapshotRecovery.hint === "re-poll supervisor before recovery", "stale supervisor snapshot should ask for re-poll before recovery", staleSnapshotRecovery); + assertCondition(staleSnapshotRecovery.snapshotRole === "supervisor-poll" && staleSnapshotRecovery.boundedSnapshot === false, "supervisor recovery context should distinguish poll from submit confirmation", staleSnapshotRecovery); + assertCondition(staleSnapshotRecovery.recoveryMutationAllowedByThisSnapshot === false, "single supervisor poll must not allow recovery mutation", staleSnapshotRecovery); + assertCondition(staleSnapshotConcurrency.attentionRequired === true && staleSnapshotConcurrency.interventionRequired === false, "stale supervisor snapshot should require attention but not immediate high-risk intervention", staleSnapshotConcurrency); + assertCondition(staleSnapshotDiagnostics.effectiveLiveness === "at-risk" && staleSnapshotDiagnostics.recommendedAction === "investigate-heartbeat-risk", "stale supervisor diagnostics should keep at-risk visibility", staleSnapshotDiagnostics); + assertCondition(staleSnapshotDiagnosticsRecovery.hint === "re-poll supervisor before recovery", "stale supervisor diagnostics should carry the re-poll hint", staleSnapshotDiagnosticsRecovery); return { ok: true, @@ -376,6 +449,7 @@ export function runCodeQueueSupervisorDisclosureContract(): JsonRecord { "full view remains detailed", "split-brain live supervisor activity distinguishes scheduler-local, database, and heartbeat counts", "commander concurrency block names the active runner count and 15-concurrency rule", + "stale-active supervisor snapshot remains visible but requires re-poll before recovery", ], supervisorChars: supervisorBody.length, fullChars: fullBody.length, diff --git a/scripts/src/code-queue.ts b/scripts/src/code-queue.ts index ed8d2b81..0dc84256 100644 --- a/scripts/src/code-queue.ts +++ b/scripts/src/code-queue.ts @@ -300,6 +300,17 @@ interface CompactTaskMutationResponseOptions { interface CompactSubmitQueueConfirmationOptions { submittedTasks?: Record[]; idPreviewLimit?: number; + submittedAt?: string | null; +} + +type CodeQueueLivenessSnapshotRole = "submit-confirmation" | "supervisor-poll" | "queue-summary"; + +interface CompactCodeQueueActivityOptions { + schedulerLocalActiveQueueIds?: string[]; + runnableQueueCount?: number | null; + snapshotRole?: CodeQueueLivenessSnapshotRole; + snapshotAt?: string | null; + submittedAt?: string | null; } interface CodexTasksOptions { @@ -649,6 +660,26 @@ function asNumber(value: unknown, fallback = 0): number { return typeof value === "number" && Number.isFinite(value) ? value : fallback; } +function timestampMs(value: unknown): number | null { + const text = asString(value); + if (text.length === 0) return null; + const parsed = Date.parse(text); + return Number.isFinite(parsed) ? parsed : null; +} + +function maxIsoTimestamp(values: unknown[]): string | null { + let best: string | null = null; + let bestMs = -Infinity; + for (const value of values) { + const text = asString(value); + const parsed = timestampMs(text); + if (parsed === null || parsed < bestMs) continue; + best = text; + bestMs = parsed; + } + return best; +} + function finiteNumber(value: unknown): number | null { return typeof value === "number" && Number.isFinite(value) ? value : null; } @@ -1696,16 +1727,101 @@ function activeHeartbeatCountFromDiagnostics(record: Record, ac return Number.isFinite(explicit) ? explicit : Math.max(activeHeartbeatTaskIds.count, heartbeatFreshTaskIds.count); } +function compactRecoveryDecision(record: Record, counts: { + heartbeatRiskTaskCount: number; + staleRecoveryCandidateTaskCount: number; +}, options: { + snapshotRole?: CodeQueueLivenessSnapshotRole; + snapshotAt?: string | null; + submittedAt?: string | null; +} = {}): Record { + const confirmedIds = boundedUniqueStringList(record.staleRecoveryConfirmedTaskIds ?? record.confirmedStaleRecoveryCandidateTaskIds); + const repeatedPollConfirmed = asBoolean(record.repeatedPollConfirmed) + || asBoolean(record.recoveryRepeatedPollConfirmed) + || confirmedIds.count > 0; + const riskVisible = counts.heartbeatRiskTaskCount > 0 || counts.staleRecoveryCandidateTaskCount > 0; + const recoveryMutationAllowedByThisSnapshot = riskVisible && repeatedPollConfirmed; + const rePollBeforeRecovery = riskVisible && !repeatedPollConfirmed; + const snapshotRole = options.snapshotRole ?? "supervisor-poll"; + const snapshotAt = options.snapshotAt ?? (asString(record.now) || null); + const submittedAt = options.submittedAt ?? null; + const lastObservedAgentEventAt = asString(record.lastObservedAgentEventAt) || null; + const lastSchedulerHeartbeatAt = asString(record.lastSchedulerHeartbeatAt) || null; + const lastObservedMs = timestampMs(lastObservedAgentEventAt); + const submittedMs = timestampMs(submittedAt); + const snapshotMs = timestampMs(snapshotAt); + const lastObservedAgentEventBeforeSubmit = lastObservedMs !== null && submittedMs !== null && lastObservedMs < submittedMs; + const lastObservedAgentEventBeforeSnapshot = lastObservedMs !== null && snapshotMs !== null && lastObservedMs < snapshotMs; + const hint = riskVisible + ? recoveryMutationAllowedByThisSnapshot + ? "run dry-run reconcile before recovery" + : "re-poll supervisor before recovery" + : null; + if (!riskVisible) { + return { + riskVisible: false, + disposition: "none", + hint: null, + rePollBeforeRecovery: false, + pollConfirmationRequired: false, + repeatedPollConfirmed, + recoveryMutationAllowedByThisSnapshot: false, + heartbeatRiskTaskCount: 0, + staleRecoveryCandidateTaskCount: 0, + snapshotRole, + boundedSnapshot: snapshotRole === "submit-confirmation", + }; + } + return { + riskVisible, + disposition: !riskVisible + ? "none" + : recoveryMutationAllowedByThisSnapshot + ? "confirmed-stale-active-candidate" + : "transient-needs-repoll", + hint, + rePollBeforeRecovery, + pollConfirmationRequired: rePollBeforeRecovery, + repeatedPollConfirmed, + recoveryMutationAllowedByThisSnapshot, + staleActiveCandidateVisibility: counts.staleRecoveryCandidateTaskCount > 0 ? "visible-candidate-not-mutation-proof" : "none", + heartbeatRiskTaskCount: counts.heartbeatRiskTaskCount, + staleRecoveryCandidateTaskCount: counts.staleRecoveryCandidateTaskCount, + confirmedStaleRecoveryCandidateTaskIds: confirmedIds.items, + confirmedStaleRecoveryCandidateTaskCount: confirmedIds.count, + snapshotRole, + boundedSnapshot: snapshotRole === "submit-confirmation", + snapshotAt, + submittedAt, + lastObservedAgentEventAt, + lastSchedulerHeartbeatAt, + lastObservedAgentEventBeforeSubmit, + lastObservedAgentEventBeforeSnapshot, + nextPollCommand: rePollBeforeRecovery ? "bun scripts/cli.ts codex tasks --view supervisor --limit 20" : null, + dryRunReconcileCommand: recoveryMutationAllowedByThisSnapshot ? "bun scripts/cli.ts microservice proxy code-queue '/api/scheduler/reconcile?dryRun=1' --raw" : null, + mutationBoundary: "Never recover, restart, cancel, or interrupt from a single bounded liveness snapshot.", + }; +} + function compactLivenessDecision(record: Record, lists: { activeHeartbeatTaskIds: { items: string[]; count: number; truncated: boolean; omitted: number }; heartbeatFreshTaskIds: { items: string[]; count: number; truncated: boolean; omitted: number }; heartbeatRiskTaskIds: { items: string[]; count: number }; databaseActiveTaskIds: { count: number }; -}): Record { + staleRecoveryCandidateTaskIds?: { count: number }; +}, recoveryOptions: { + snapshotRole?: CodeQueueLivenessSnapshotRole; + snapshotAt?: string | null; + submittedAt?: string | null; +} = {}): Record { const splitBrainLive = splitBrainLiveFromDiagnostics(record); const effectiveLiveness = effectiveLivenessFromDiagnostics(record); const recommendedAction = recommendedActionFromDiagnostics(record); const activeHeartbeatCount = activeHeartbeatCountFromDiagnostics(record, lists.activeHeartbeatTaskIds, lists.heartbeatFreshTaskIds); + const recovery = compactRecoveryDecision(record, { + heartbeatRiskTaskCount: lists.heartbeatRiskTaskIds.count, + staleRecoveryCandidateTaskCount: lists.staleRecoveryCandidateTaskIds?.count ?? stringList(record.staleRecoveryCandidateTaskIds).length, + }, recoveryOptions); return { effectiveLiveness, recommendedAction, @@ -1717,8 +1833,12 @@ function compactLivenessDecision(record: Record, lists: { databaseActiveTaskCount: asNumber(record.databaseActiveTaskCount, lists.databaseActiveTaskIds.count), schedulerActiveRunSlotCount: record.schedulerActiveRunSlotCount ?? null, heartbeatRiskTaskCount: lists.heartbeatRiskTaskIds.count, + staleRecoveryCandidateTaskCount: lists.staleRecoveryCandidateTaskIds?.count ?? stringList(record.staleRecoveryCandidateTaskIds).length, + recovery, interpretation: splitBrainLive ? "scheduler heartbeat is fresh; treat active task count from heartbeat as live and continue supervision" + : recovery.rePollBeforeRecovery === true + ? "heartbeat risk is visible in this bounded snapshot; re-poll supervisor before recovery" : effectiveLiveness === "at-risk" ? "heartbeat risk is present; investigate heartbeat freshness before recovery" : effectiveLiveness === "degraded" @@ -1749,7 +1869,11 @@ function boundedInlineString(value: unknown, maxChars: number): { text: string | }; } -function compactExecutionDiagnostics(value: unknown): Record | null { +function compactExecutionDiagnostics(value: unknown, recoveryOptions: { + snapshotRole?: CodeQueueLivenessSnapshotRole; + snapshotAt?: string | null; + submittedAt?: string | null; +} = {}): Record | null { const record = asRecord(value); if (record === null) return null; const fullHeartbeatRiskTaskIds = Array.from(new Set([ @@ -1776,7 +1900,12 @@ function compactExecutionDiagnostics(value: unknown): Record | heartbeatFreshTaskIds, heartbeatRiskTaskIds, databaseActiveTaskIds, - }); + staleRecoveryCandidateTaskIds, + }, recoveryOptions); + const recovery = asRecord(liveness.recovery) ?? compactRecoveryDecision(record, { + heartbeatRiskTaskCount: heartbeatRiskTaskIds.count, + staleRecoveryCandidateTaskCount: staleRecoveryCandidateTaskIds.count, + }, recoveryOptions); const omittedCounts = { databaseActiveTaskIds: databaseActiveTaskIds.omitted, schedulerActiveTaskIds: schedulerActiveTaskIds.omitted, @@ -1798,6 +1927,7 @@ function compactExecutionDiagnostics(value: unknown): Record | splitBrainLive: splitBrainLiveFromDiagnostics(record), effectiveLiveness: liveness.effectiveLiveness, recommendedAction: liveness.recommendedAction, + recovery, liveness, livenessSummary: livenessSummary.text, livenessSummaryChars: livenessSummary.chars, @@ -1832,8 +1962,12 @@ function compactExecutionDiagnostics(value: unknown): Record | }; } -function compactQueueExecutionDiagnostics(value: unknown): Record | null { - const diagnostics = compactExecutionDiagnostics(value); +function compactQueueExecutionDiagnostics(value: unknown, recoveryOptions: { + snapshotRole?: CodeQueueLivenessSnapshotRole; + snapshotAt?: string | null; + submittedAt?: string | null; +} = {}): Record | null { + const diagnostics = compactExecutionDiagnostics(value, recoveryOptions); if (diagnostics === null) return null; const listBudget = asRecord(diagnostics.listBudget) ?? {}; const omittedCounts = asRecord(listBudget.omittedCounts) ?? {}; @@ -1844,6 +1978,7 @@ function compactQueueExecutionDiagnostics(value: unknown): Record, diagnostics: Record | null, - options: { schedulerLocalActiveQueueIds?: string[]; runnableQueueCount?: number | null } = {}, + options: CompactCodeQueueActivityOptions = {}, ): Record { const rawDiagnostics = asRecord(queue.executionDiagnostics) ?? {}; const compactDiagnostics = diagnostics ?? {}; @@ -1938,6 +2073,17 @@ function compactCodeQueueActivity( const recommendedAction = asString(rawDiagnostics.recommendedAction ?? compactDiagnostics.recommendedAction ?? rawLiveness.recommendedAction ?? compactLiveness.recommendedAction); const splitBrain = asBoolean(rawDiagnostics.splitBrain) || asBoolean(compactDiagnostics.splitBrain) || executionState === "split-brain"; const splitBrainLive = splitBrainLiveFromDiagnostics(rawDiagnostics) || splitBrainLiveFromDiagnostics(compactDiagnostics); + const snapshotAt = options.snapshotAt ?? (asString(rawDiagnostics.now ?? compactDiagnostics.now) || null); + const recovery = compactRecoveryDecision({ ...compactDiagnostics, ...rawDiagnostics }, { + heartbeatRiskTaskCount, + staleRecoveryCandidateTaskCount, + }, { + snapshotRole: options.snapshotRole, + snapshotAt, + submittedAt: options.submittedAt ?? null, + }); + const recoveryMutationAllowed = recovery.recoveryMutationAllowedByThisSnapshot === true; + const rePollBeforeRecovery = recovery.rePollBeforeRecovery === true; const effectiveActiveSource = heartbeatFreshActiveTaskCount > 0 && heartbeatFreshActiveTaskCount >= databaseActiveTaskCount ? "heartbeat-fresh" : databaseActiveTaskCount > 0 @@ -1948,8 +2094,13 @@ function compactCodeQueueActivity( const activeQueueIdsNote = schedulerLocalActiveQueueIds.length === 0 && effectiveActiveTaskCount > 0 ? "activeQueueIds are scheduler-local only; zero local queue ids does not mean zero active runners when database or heartbeat counts are nonzero." : "activeQueueIds are scheduler-local active-run slots; use effectiveActiveTaskCount for commander concurrency decisions."; - const recommendedActionIntervenes = recommendedAction.length > 0 && recommendedAction !== "continue-supervision"; - const interventionRequired = heartbeatRiskTaskCount > 0 || staleRecoveryCandidateTaskCount > 0 || recommendedActionIntervenes || (splitBrain && !splitBrainLive); + const recommendedActionIntervenes = recommendedAction.length > 0 + && !["none", "continue-supervision", "observe-degraded", "investigate-heartbeat-risk"].includes(recommendedAction); + const attentionRequired = heartbeatRiskTaskCount > 0 + || staleRecoveryCandidateTaskCount > 0 + || recommendedActionIntervenes + || (splitBrain && !splitBrainLive); + const interventionRequired = recoveryMutationAllowed || recommendedActionIntervenes || ((splitBrain && !splitBrainLive) && !rePollBeforeRecovery); const splitBrainDisposition = splitBrainLive ? "live-count-as-active" : splitBrain @@ -1962,15 +2113,35 @@ function compactCodeQueueActivity( : "control-plane and execution-plane activity signals are not split-brain."; const interventionReason = splitBrainLive ? "fresh heartbeat makes split-brain live; count these runners as active and continue supervision." - : heartbeatRiskTaskCount > 0 - ? "heartbeat risk is present; inspect before adding replacement work or recovering tasks." - : staleRecoveryCandidateTaskCount > 0 - ? "stale recovery candidates are present; follow the recovery runbook before changing concurrency." + : rePollBeforeRecovery + ? "heartbeat risk is visible, but this snapshot is not repeated-poll confirmation; re-poll supervisor before recovery." + : recoveryMutationAllowed + ? "repeated-poll confirmation is present; run dry-run reconcile before any recovery mutation." : recommendedActionIntervenes ? `execution diagnostics recommend ${recommendedAction}; intervene before adding work.` : splitBrain ? "split-brain is not proven live; inspect raw diagnostics before treating capacity as available." : "no intervention signal in compact activity summary."; + const schedulerHeartbeatFreshness = { + source: "executionDiagnostics.schedulerHeartbeat", + heartbeatFreshActiveTaskCount, + activeHeartbeatTaskCount, + heartbeatRiskTaskCount, + staleRecoveryCandidateTaskCount, + lastSchedulerHeartbeatAt: recovery.lastSchedulerHeartbeatAt, + lastObservedAgentEventAt: recovery.lastObservedAgentEventAt, + }; + const snapshotSemantics = { + role: recovery.snapshotRole, + boundedSnapshot: recovery.boundedSnapshot, + snapshotAt: recovery.snapshotAt, + submittedAt: recovery.submittedAt, + submitConfirmationIsImmediate: recovery.snapshotRole === "submit-confirmation", + databaseActiveField: "activity.databaseActiveTaskCount", + schedulerHeartbeatFreshnessField: "activity.schedulerHeartbeatFreshness", + queuedSubmittedStateField: "submitted.taskStates[]", + recoveryHint: recovery.hint, + }; const commanderConcurrency = { activeRunnerCount: effectiveActiveTaskCount, activeRunnerCountField: "activity.effectiveActiveTaskCount", @@ -1978,8 +2149,11 @@ function compactCodeQueueActivity( decisionRule: "subtract activeRunnerCount from the commander concurrency target; for a 15-runner policy, remaining slots = 15 - activeRunnerCount.", splitBrainDisposition, splitBrainReason, + attentionRequired, interventionRequired, interventionReason, + recoveryHint: recovery.hint, + recoveryDisposition: recovery.disposition, }; return { effectiveActiveTaskCount, @@ -1990,6 +2164,9 @@ function compactCodeQueueActivity( activeHeartbeatTaskCount, heartbeatRiskTaskCount, staleRecoveryCandidateTaskCount, + schedulerHeartbeatFreshness, + recovery, + snapshotSemantics, schedulerLocalActiveQueueCount: schedulerLocalActiveQueueIds.length, schedulerLocalActiveRunSlotCount, runnableQueueCount, @@ -2003,8 +2180,10 @@ function compactCodeQueueActivity( activeQueueIdsNote, interpretation: splitBrainLive ? "split-brain live: database-active tasks have fresh scheduler heartbeat; continue supervision." - : heartbeatRiskTaskCount > 0 - ? "heartbeat risk is present; investigate before retry or recovery." + : rePollBeforeRecovery + ? "heartbeat risk is visible; re-poll supervisor before recovery." + : recoveryMutationAllowed + ? "repeated poll confirms stale-active candidate; dry-run reconcile is the next recovery check." : effectiveActiveTaskCount > 0 ? "active work is present; compare database, heartbeat, and scheduler-local counts before changing concurrency." : "no active work observed by database, heartbeat, or scheduler-local signals.", @@ -2012,7 +2191,7 @@ function compactCodeQueueActivity( } function supervisorExecutionDiagnostics(value: unknown): Record | null { - const diagnostics = compactExecutionDiagnostics(value); + const diagnostics = compactExecutionDiagnostics(value, { snapshotRole: "supervisor-poll" }); if (diagnostics === null) return null; const liveness = asRecord(diagnostics.liveness) ?? {}; const listBudget = asRecord(diagnostics.listBudget) ?? {}; @@ -2020,6 +2199,7 @@ function supervisorExecutionDiagnostics(value: unknown): Record state: diagnostics.state ?? null, effectiveLiveness: diagnostics.effectiveLiveness ?? null, recommendedAction: diagnostics.recommendedAction ?? null, + recovery: diagnostics.recovery ?? null, splitBrainLive: diagnostics.splitBrainLive ?? null, activeHeartbeatCount: diagnostics.activeHeartbeatCount ?? null, databaseActiveTaskCount: diagnostics.databaseActiveTaskCount ?? null, @@ -3943,7 +4123,7 @@ function codexTasksCommanderResult( const rawQueue = asRecord(taskPage.queue) ?? {}; const rawDiagnostics = asRecord(rawQueue.executionDiagnostics) ?? {}; const diagnostics = supervisorExecutionDiagnostics(rawDiagnostics); - const activity = compactCodeQueueActivity(rawQueue, diagnostics); + const activity = compactCodeQueueActivity(rawQueue, diagnostics, { snapshotRole: "supervisor-poll" }); const commanderConcurrency = asRecord(activity.commanderConcurrency) ?? {}; const runningTasks = sortRunningWatchTasks(allTasks); const unreadCompletedTasks = sortCompletedWatchTasks(allTasks).filter((task) => taskUnreadTerminal(task)); @@ -4104,7 +4284,7 @@ function codexTasksOverviewResult( const queuedSection = buildSupervisorTaskSection(queuedTasks, summaries, sectionLimit, sectionNextCommand(queuedTasks, sectionLimit, options, nextCommand), fullCommand); const pagination = taskPage.pagination; const diagnostics = supervisorExecutionDiagnostics(asRecord(taskPage.queue)?.executionDiagnostics); - const activity = compactCodeQueueActivity(asRecord(taskPage.queue) ?? {}, diagnostics); + const activity = compactCodeQueueActivity(asRecord(taskPage.queue) ?? {}, diagnostics, { snapshotRole: "supervisor-poll" }); const commanderConcurrency = asRecord(activity.commanderConcurrency) ?? {}; const activeRunning = supervisorActiveRunningSummary(taskPage, options, runningSection, diagnostics); const visibleSupervisorItems = [...runningSection.items, ...unreadSection.items, ...recentSection.items, ...queuedSection.items]; @@ -4509,8 +4689,8 @@ function compactQueuesResponse(body: Record, options: CodexQueu const activeQueues = queues.filter((row) => typeof row.id === "string" && activeIds.includes(row.id)); const selected = options.full ? queues : Array.from(new Map([...activeQueues, ...unreadQueues, ...runnableQueues, ...nonemptyQueues].map((row) => [String(row.id), row])).values()); const visible = selected.slice(options.offset, options.offset + options.limit); - const diagnostics = compactQueueExecutionDiagnostics(queue.executionDiagnostics); - const activity = compactCodeQueueActivity(queue, diagnostics, { schedulerLocalActiveQueueIds: activeIds, runnableQueueCount: runnableQueues.length }); + const diagnostics = compactQueueExecutionDiagnostics(queue.executionDiagnostics, { snapshotRole: "queue-summary" }); + const activity = compactCodeQueueActivity(queue, diagnostics, { schedulerLocalActiveQueueIds: activeIds, runnableQueueCount: runnableQueues.length, snapshotRole: "queue-summary" }); const commanderConcurrency = asRecord(activity.commanderConcurrency) ?? {}; const activeTaskIds = boundedUniqueStringList(queue.activeTaskIds, Math.min(options.limit, maxTasksLimit)); const queuedTaskIds = boundedUniqueStringList(queue.queuedTaskIds, Math.min(options.limit, maxTasksLimit)); @@ -5195,8 +5375,16 @@ function compactSubmitQueueConfirmation(value: unknown, options: CompactSubmitQu || databaseActivePreview.truncated === true || queuedPreview.truncated === true || submittedPreview.truncated === true; - const diagnostics = compactQueueExecutionDiagnostics(record.executionDiagnostics); - const activity = compactCodeQueueActivity(record, diagnostics); + const diagnostics = compactQueueExecutionDiagnostics(record.executionDiagnostics, { + snapshotRole: "submit-confirmation", + snapshotAt: options.submittedAt ?? null, + submittedAt: options.submittedAt ?? null, + }); + const activity = compactCodeQueueActivity(record, diagnostics, { + snapshotRole: "submit-confirmation", + snapshotAt: options.submittedAt ?? null, + submittedAt: options.submittedAt ?? null, + }); const commanderConcurrency = asRecord(activity.commanderConcurrency) ?? {}; const unavailableIdLists = { activeTaskIds: activePreview.idsUnavailable === true, @@ -5245,6 +5433,13 @@ function compactSubmitQueueConfirmation(value: unknown, options: CompactSubmitQu queueCountsSource: "response.queue.counts", activeCountField: "queue.countContext.active", commanderActiveRunnerCountField: "queue.activity.effectiveActiveTaskCount", + snapshotRole: "submit-confirmation", + boundedSnapshot: true, + submittedAt: options.submittedAt ?? null, + schedulerHeartbeatFreshnessField: "queue.activity.schedulerHeartbeatFreshness", + databaseActiveField: "queue.countContext.databaseActive", + queuedSubmittedTaskStateField: "submitted.taskStates[]", + transientRiskHint: asRecord(activity.recovery)?.hint ?? null, splitBrainLive: diagnostics?.splitBrainLive ?? false, splitBrainDisposition: diagnostics?.splitBrainLive === true ? "live-counts-remain-active; continue supervision unless commanderConcurrency.interventionRequired=true" : "not-split-brain-live", idsUnavailableMeaning: "A nonzero count with idsUnavailable=true means ids were omitted or unavailable in the bounded submit summary, not that there are no tasks.", @@ -5277,6 +5472,7 @@ function compactSubmitSuccessResponse(body: Record, upstream: R const firstTaskId = taskIds[0] ?? null; const firstQueueId = queueIds[0] ?? null; const firstSubmittedTask = submittedTasks[0] ?? {}; + const submittedAt = maxIsoTimestamp(submittedTasks.flatMap((task) => [task.createdAt, task.updatedAt])); const queueRecord = asRecord(body.queue); const runnerPermissions = compactRunnerPermissions(queueRecord?.runnerPermissions); return { @@ -5305,7 +5501,7 @@ function compactSubmitSuccessResponse(body: Record, upstream: R reason: "codex submit is a write operation; default output confirms persistence and provides drill-down commands without echoing prompt text.", }, }, - queue: compactSubmitQueueConfirmation(body.queue, { submittedTasks }), + queue: compactSubmitQueueConfirmation(body.queue, { submittedTasks, submittedAt }), submitConcurrencyGuard: compactSubmitConcurrencyGuard(lock), commands: { firstTask: firstTaskId === null ? null : `bun scripts/cli.ts codex task ${firstTaskId}`,