fix(code-queue): forward-port stale-active recovery

Supersedes stale PR #79 with a current-master forward-port of safe stale-active recovery diagnostics and explicit recovery controls.
Lyon
2026-05-23 16:22:41 +08:00
committed by GitHub
parent d88eb460a5
commit 4ce5d2fd97
6 changed files with 379 additions and 42 deletions
+4
@@ -275,6 +275,8 @@ bun scripts/cli.ts codex pr-preflight --remote --issue <issue-number>
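In queue diagnostics, `split-brain` means that control-plane and execution-plane observations have diverged; it does not by itself prove that the task is dead. As long as the task heartbeat is still refreshing and the trace is still advancing, do not classify it as a service outage or demand an immediate stop. Treat it as a live task with `splitBrainLive=true`, keep supervising it, and keep advancing the tasks already scheduled in #20, rather than interrupting it, replacing it, or treating the backend as already down. The queue summary should show `effectiveLiveness=live`, `splitBrainLive=true`, and `recommendedAction=continue-supervision`; compact output should also repeat these low-noise fields in `executionDiagnostics.liveness` and highlight `activeHeartbeatCount`, the bounded `heartbeatFreshTaskIds`, `databaseActiveTaskCount`, and `schedulerActiveRunSlotCount`. When the master/control-plane reports `schedulerActiveRunSlotCount=0` while `heartbeatFreshTaskIds` is non-empty, the active count should first be interpreted as live based on the scheduler heartbeat summary, not as an execution stall based on the master's local slot count of 0. Only when a heartbeat is expired/missing or the stale-recovery conditions are met should `effectiveLiveness=at-risk` be shown and recovery evaluation begin.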
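The heartbeat stale threshold for stale-active recovery and for the `/api/scheduler/reconcile?staleMs=...` diagnostic endpoint must be normalized against safe bounds: a missing value or any value below the 5-minute default is treated as 5 minutes, an overly large value is capped at the 24-hour maximum, and the structured response returns `requestedStaleMs*`, `staleMsAdjusted`, `staleMsAdjustmentReason`, `minStaleMs`, and `maxStaleMs`. No `staleMs=0` or too-low threshold may cause a task that still has a fresh scheduler heartbeat to be classified as stale/recoverable.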
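The normalization rule above can be sketched as follows. This is a minimal illustration, not the service's actual code: the function name `describeStaleThreshold` and the constant names are hypothetical, and only the bounds (5 minutes, 24 hours) and the response field names come from the text.

```typescript
// Bounds taken from the rule above; constant names are illustrative.
const MIN_STALE_MS = 5 * 60 * 1000;
const MAX_STALE_MS = 24 * 60 * 60 * 1000;

type AdjustmentReason = "default" | "accepted" | "invalid" | "below-safe-minimum" | "above-maximum";

interface StaleThreshold {
  staleMs: number;
  requestedStaleMs: number | null;
  staleMsAdjusted: boolean;
  staleMsAdjustmentReason: AdjustmentReason;
  minStaleMs: number;
  maxStaleMs: number;
}

// Hypothetical helper: turns the raw `staleMs` query value into a safe threshold
// and records why (and whether) the requested value was changed.
function describeStaleThreshold(raw: string | null): StaleThreshold {
  const bounds = { minStaleMs: MIN_STALE_MS, maxStaleMs: MAX_STALE_MS };
  if (raw === null || raw.trim().length === 0) {
    return { staleMs: MIN_STALE_MS, requestedStaleMs: null, staleMsAdjusted: false, staleMsAdjustmentReason: "default", ...bounds };
  }
  const requested = Number(raw);
  if (!Number.isFinite(requested)) {
    return { staleMs: MIN_STALE_MS, requestedStaleMs: null, staleMsAdjusted: true, staleMsAdjustmentReason: "invalid", ...bounds };
  }
  const floored = Math.floor(requested);
  if (floored < MIN_STALE_MS) {
    return { staleMs: MIN_STALE_MS, requestedStaleMs: requested, staleMsAdjusted: true, staleMsAdjustmentReason: "below-safe-minimum", ...bounds };
  }
  if (floored > MAX_STALE_MS) {
    return { staleMs: MAX_STALE_MS, requestedStaleMs: requested, staleMsAdjusted: true, staleMsAdjustmentReason: "above-maximum", ...bounds };
  }
  return { staleMs: floored, requestedStaleMs: requested, staleMsAdjusted: false, staleMsAdjustmentReason: "accepted", ...bounds };
}
```

With this shape, a caller that sends `staleMs=0` still gets a 5-minute window back, together with an explicit reason it can log or display.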
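The `activity` / `commanderConcurrency` fields of `codex queues` and the default supervisor view are the primary readings for commander concurrency governance. Concurrency decisions always use `commanderConcurrency.activeRunnerCount`, which equals `activity.effectiveActiveTaskCount`; under the 15-concurrency policy the refill window is computed as `15 - activeRunnerCount` and must not be replaced by `activeQueueIds.length` or the scheduler-local slot count. `effectiveActiveTaskCount` is the effective number of active tasks used for scheduling decisions; `databaseRunningTaskCount` comes from the count of `running` rows in PostgreSQL; `databaseActiveTaskCount` covers database-active tasks such as running/judging; `heartbeatFreshActiveTaskCount` is the number of effective runners with a fresh heartbeat; `schedulerLocalActiveQueueCount` and `schedulerLocalActiveRunSlotCount` only describe the active run slots visible locally to the current control plane. `activeQueueIds` and `activeQueueCount` are scheduler-local fields and may be 0 while `counts.running>0` and heartbeats are fresh; in that combination, decide based on `activity.effectiveActiveTaskCount`, `activity.heartbeatFreshActiveTaskCount`, and `splitBrainLive`, and never treat an empty `activeQueueIds` as evidence of zero concurrency or a stall. `commanderConcurrency.splitBrainDisposition=live-count-as-active` means the split-brain task is still live and should be counted as an active runner; only `interventionRequired=true`, heartbeat risk, stale recovery candidates, or a recommended action other than `continue-supervision` lead to manual intervention or recovery evaluation.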
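As a small illustration of the refill-window rule, the sketch below computes the window from `commanderConcurrency.activeRunnerCount` only. The `QueueSummary` shape and the `refillWindow` helper are hypothetical, and clamping the result at zero is an assumption not stated in the text.

```typescript
// Hypothetical, trimmed-down view of the queue summary fields named above.
interface QueueSummary {
  commanderConcurrency: { activeRunnerCount: number };
  activeQueueIds: string[]; // scheduler-local; deliberately ignored for concurrency decisions
}

// Refill window under the 15-concurrency policy: limit minus active runners.
// Clamping to zero is an assumption for the case where the count exceeds the limit.
function refillWindow(summary: QueueSummary, concurrencyLimit = 15): number {
  const active = Math.max(0, Math.floor(summary.commanderConcurrency.activeRunnerCount));
  return Math.max(0, concurrencyLimit - active);
}

// Split-brain example: the local view is empty, yet 12 runners are effectively active.
const splitBrainSummary: QueueSummary = {
  commanderConcurrency: { activeRunnerCount: 12 },
  activeQueueIds: [],
};
const availableSlots = refillWindow(splitBrainSummary);
```

Here `availableSlots` is 3, whereas reading `activeQueueIds.length` would have wrongly suggested that all 15 slots are free.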
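A single `provider is not online`, SSH timeout, proxy timeout, or registry request failure only proves that the current observation path failed; on its own it must not be escalated to D601 being globally offline, CI/CD being globally blocked, or business tasks being unable to proceed. The commander and runners must judge runtime state from multiple signals and distinguish at least the following observation planes: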
@@ -301,6 +303,8 @@ An inactive systemd unit for the D601 artifact registry does not mean D601 is globally offline.
For long-running tasks whose trace or heartbeat is fresh, the default is to keep them running. Polling every few minutes is better than repeated interrupt/retry.
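When `databaseActiveTaskIds` is non-empty, the scheduler's local active run/slot set is empty, and the owner scheduler heartbeat has expired, the runtime should list the task as a stale-active recovery candidate and restore it to `retry_wait` through the scheduler reconcile path instead of relying on a rollout restart to trigger startup recovery. The recovery entry point must preserve fresh-heartbeat protection: a split-brain live task with a fresh heartbeat may only continue under supervision, and a trace gap with a fresh heartbeat must not trigger a stale retry either. For read-only diagnosis, prefer the `executionDiagnostics` and `reconcile` fields in queue/health output; when manual confirmation is needed, call a bounded dry-run reconcile first, and perform recovery only by explicitly using `POST /api/scheduler/reconcile?recover=1` after passing the high-risk recovery boundary. OA Event Flow publisher backlog or overflow may only reduce trace/stats observability; it must be exposed explicitly in the `oaPublisher` status and isolated as observability degradation, and must not block PostgreSQL task state, heartbeats, or stale recovery.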
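Rate limiting and brief unavailability of external token providers, model APIs, or upstream services are expected and should not be automatically escalated to Code Queue infrastructure defects. Typical symptoms include `429 Too Many Requests`, provider transient errors, upstream timeouts, or short-lived model service failures. Runners must classify such OpenAI/model-provider 429s as `scope=external-provider`, `failureKind=external-provider-rate-limit`, and `externalProvider429=true`, and expose evidence of exponential backoff and jitter in the attempt's `runnerErrorClassification.backoffHint`, the `queue/backoff` line of the task output, and the `task_retry_backoff` log entry. The backoff policy is conservative exponential backoff plus stable positive jitter: a 429 waits at least 30 seconds and no more than 10 minutes per wait, and jitter is computed deterministically from the task id and completed attempt count so that multiple runners do not resume at once and congest the provider again. As long as the Code Queue state machine is still backing off automatically, the task heartbeat or scheduler heartbeat is fresh, and the task can still return from `retry_wait` to `running`, the commander should wait for the external provider to recover on its own and should not create extra fix issues, re-dispatch duplicate tasks, or record the situation as a blocker. Intervene as a Code Queue infrastructure problem only when the backoff mechanism fails, the task is lost, the heartbeat expires, the state machine is stuck, or retries are exhausted and the task enters an unrecoverable terminal state.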
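One way to picture "exponential backoff plus stable positive jitter" is the sketch below. It is not the runner's actual formula: the FNV-1a hash, the 20% jitter ceiling, the 1-second base, and the names `fnv1a` and `rateLimitBackoffMs` are all assumptions chosen for illustration. Only the 30-second floor, the 10-minute cap, and the inputs (task id, completed attempt count) come from the text.

```typescript
const BASE_MS = 1000;           // assumed exponential base delay
const MIN_429_MS = 30_000;      // a 429 waits at least 30 seconds
const MAX_MS = 10 * 60 * 1000;  // a single wait never exceeds 10 minutes

// FNV-1a 32-bit hash; stands in for whatever stable hash the runner really uses.
function fnv1a(text: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash >>> 0;
}

// Deterministic backoff: the same task id and attempt count always yield the same delay,
// while different tasks spread out instead of resuming at the same instant.
function rateLimitBackoffMs(taskId: string, completedAttempts: number): number {
  const exponent = Math.min(Math.max(0, Math.floor(completedAttempts)), 20);
  const exponential = BASE_MS * 2 ** exponent;
  const floorMs = Math.max(MIN_429_MS, exponential);
  // Positive jitter in [0, 0.2) of the delay (the 20% ceiling is an assumption).
  const jitterFraction = ((fnv1a(`${taskId}:${completedAttempts}`) % 1000) / 1000) * 0.2;
  return Math.min(MAX_MS, Math.floor(floorMs * (1 + jitterFraction)));
}
```

Because the jitter is only ever added, the 30-second floor holds for every task, and the final `Math.min` keeps the 10-minute cap intact after jitter is applied.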
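For large-scale CI/CD migration waves, use a stable but adaptive supervision cadence unless an incident occurs. The commander may choose the sleep duration based on task activity, the backlog of completed-but-unread tasks, heartbeat risk, and the nature of external waits, but a single sleep must be no shorter than 5 minutes and no longer than 30 minutes. Use short polling close to 5 minutes during active troubleshooting, right after dispatching new tasks, or when there are completed-unread tasks or heartbeat risk; when waiting a long time on external CI, model-provider backoff, or image builds while heartbeats are fresh, the interval can stretch to 10 to 30 minutes. After each wake-up, read `codex queues`, read the summaries of terminal or suspicious tasks, and then decide whether to accept, retry, split out a blocker, or let healthy tasks keep running. During the loop the commander may do useful non-overlapping work, such as documentation or issue triage, but this auxiliary work must not take over implementation already assigned to workers.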
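The cadence rule can be expressed as a small decision function. This is only a sketch: the `SupervisionSignals` shape, the `supervisionSleepMs` helper, and the 20-minute default for long waits are hypothetical; the 5-minute floor, the 30-minute ceiling, and the 10-to-30-minute range for external waits come from the text.

```typescript
const MIN_SLEEP_MS = 5 * 60 * 1000;
const MAX_SLEEP_MS = 30 * 60 * 1000;
const LONG_WAIT_MIN_MS = 10 * 60 * 1000;

// Hypothetical summary of what the commander observed in the last round.
interface SupervisionSignals {
  activeTroubleshooting: boolean;
  justDispatched: boolean;
  completedUnreadCount: number;
  heartbeatRiskCount: number;
  waitingOnExternal: boolean; // external CI, provider backoff, or image build, with fresh heartbeats
}

// Picks the next sleep: short polling when anything needs attention,
// a longer but bounded sleep when only waiting on external systems.
function supervisionSleepMs(signals: SupervisionSignals, preferredLongWaitMs = 20 * 60 * 1000): number {
  const urgent =
    signals.activeTroubleshooting ||
    signals.justDispatched ||
    signals.completedUnreadCount > 0 ||
    signals.heartbeatRiskCount > 0;
  if (urgent || !signals.waitingOnExternal) return MIN_SLEEP_MS;
  const preferred = Number.isFinite(preferredLongWaitMs) ? Math.floor(preferredLongWaitMs) : LONG_WAIT_MIN_MS;
  return Math.min(MAX_SLEEP_MS, Math.max(LONG_WAIT_MIN_MS, preferred));
}
```

Whatever the caller prefers, the result stays between 5 and 30 minutes, so an overly eager or overly relaxed setting cannot break the cadence bounds.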
+82 -2
@@ -1,11 +1,13 @@
import { buildExecutionDiagnostics, buildSchedulerHeartbeat, staleRecoveryCandidate, taskHasTraceGapButFreshHeartbeat } from "../../src/components/microservices/code-queue/src/execution-diagnostics";
import { buildExecutionDiagnostics, buildSchedulerHeartbeat, schedulerHeartbeatStaleMs, staleRecoveryCandidate, taskHasTraceGapButFreshHeartbeat } from "../../src/components/microservices/code-queue/src/execution-diagnostics";
import type { ActiveRun } from "../../src/components/microservices/code-queue/src/code-agent/common";
import type { CodeQueueExecutionDiagnostics, QueueTask, SchedulerActiveRunHeartbeat, TaskStatus } from "../../src/components/microservices/code-queue/src/types";
export const CODE_QUEUE_LIVENESS_CHECK_NAMES = [
"code-queue:active-run-heartbeat-visible",
"code-queue:trace-gap-not-stale",
"code-queue:heartbeat-threshold-normalized",
"code-queue:stale-active-owner-expired",
"code-queue:stale-active-recovery-transition",
"code-queue:control-plane-split-brain-diagnostics",
"code-queue:oa-publisher-degraded-visible",
] as const;
@@ -104,9 +106,10 @@ function activeRun(taskId: string, queueId = "default"): ActiveRun {
};
}
function schedulerDiagnostics(tasks: QueueTask[], activeRuns: ActiveRun[] = [], oaPublisher: unknown = null): CodeQueueExecutionDiagnostics {
function schedulerDiagnostics(tasks: QueueTask[], activeRuns: ActiveRun[] = [], oaPublisher: unknown = null, heartbeatStaleMs?: number): CodeQueueExecutionDiagnostics {
return buildExecutionDiagnostics({
now,
heartbeatStaleMs,
controlPlane: "D601-code-queue-scheduler",
executionStateSource: "scheduler-execution-plane",
tasks,
@@ -119,6 +122,37 @@ function schedulerDiagnostics(tasks: QueueTask[], activeRuns: ActiveRun[] = [],
});
}
function checkHeartbeatThresholdNormalized(): FixtureCheck {
const task = fixtureTask("codex_fixture_threshold_fresh_1", "running", heartbeat("codex_fixture_threshold_fresh_1", freshAt));
const run = activeRun(task.id);
const zeroDecision = staleRecoveryCandidate({ task, localActive: false, now, heartbeatStaleMs: 0 });
const tooLowDecision = staleRecoveryCandidate({ task, localActive: false, now, heartbeatStaleMs: 1 });
const zeroDiagnostics = schedulerDiagnostics([task], [run], null, 0);
const tooLowDiagnostics = schedulerDiagnostics([task], [run], null, 1);
assertCondition(zeroDecision.allowed === false && zeroDecision.reason === "owner-heartbeat-fresh", "zero heartbeat threshold must normalize before stale recovery", { zeroDecision });
assertCondition(tooLowDecision.allowed === false && tooLowDecision.reason === "owner-heartbeat-fresh", "too-low heartbeat threshold must normalize before stale recovery", { tooLowDecision });
assertCondition(zeroDiagnostics.schedulerHeartbeatStaleMs === schedulerHeartbeatStaleMs, "zero diagnostics threshold should normalize to default stale window", zeroDiagnostics as unknown as Record<string, unknown>);
assertCondition(tooLowDiagnostics.schedulerHeartbeatStaleMs === schedulerHeartbeatStaleMs, "too-low diagnostics threshold should normalize to default stale window", tooLowDiagnostics as unknown as Record<string, unknown>);
assertCondition(zeroDiagnostics.state === "healthy" && zeroDiagnostics.effectiveLiveness === "healthy", "fresh heartbeat with zero requested threshold must stay healthy", zeroDiagnostics as unknown as Record<string, unknown>);
assertCondition(tooLowDiagnostics.state === "healthy" && tooLowDiagnostics.effectiveLiveness === "healthy", "fresh heartbeat with too-low requested threshold must stay healthy", tooLowDiagnostics as unknown as Record<string, unknown>);
assertCondition(zeroDiagnostics.heartbeatFreshTaskIds.includes(task.id), "fresh heartbeat must remain in fresh task ids", zeroDiagnostics as unknown as Record<string, unknown>);
assertCondition(zeroDiagnostics.heartbeatExpiredTaskIds.length === 0 && zeroDiagnostics.staleRecoveryCandidateTaskIds.length === 0 && zeroDiagnostics.heartbeatRiskTaskIds.length === 0, "fresh heartbeat must not be reported stale or risky", zeroDiagnostics as unknown as Record<string, unknown>);
return {
name: "code-queue:heartbeat-threshold-normalized",
ok: true,
detail: {
zeroDecision,
tooLowDecision,
normalizedStaleMs: zeroDiagnostics.schedulerHeartbeatStaleMs,
state: zeroDiagnostics.state,
effectiveLiveness: zeroDiagnostics.effectiveLiveness,
heartbeatFreshTaskIds: zeroDiagnostics.heartbeatFreshTaskIds,
heartbeatExpiredTaskIds: zeroDiagnostics.heartbeatExpiredTaskIds,
staleRecoveryCandidateTaskIds: zeroDiagnostics.staleRecoveryCandidateTaskIds,
},
};
}
function checkActiveRunHeartbeatVisible(): FixtureCheck {
const task = fixtureTask("codex_fixture_active_1", "running");
const run = activeRun(task.id);
@@ -194,6 +228,50 @@ function checkStaleActiveOwnerExpired(): FixtureCheck {
};
}
function recoverStaleActiveTaskForFixture(task: QueueTask): number {
const decision = staleRecoveryCandidate({ task, localActive: false, now });
if (!decision.allowed) return 0;
task.status = "retry_wait";
task.finishedAt = null;
task.readAt = null;
task.activeTurnId = null;
task.lastError = "fixture stale-active recovery";
task.nextMode = "retry";
task.nextPrompt = "fixture recovery prompt";
task.updatedAt = now;
return 1;
}
function checkStaleActiveRecoveryTransition(): FixtureCheck {
const task = fixtureTask("codex_fixture_stale_recovery_1", "running", heartbeat("codex_fixture_stale_recovery_1", expiredAt, {
lastObservedAgentEventAt: expiredAt,
lastPersistedTraceAt: expiredAt,
}));
const freshTask = fixtureTask("codex_fixture_fresh_no_recovery_1", "running", heartbeat("codex_fixture_fresh_no_recovery_1", freshAt));
const diagnosticsBefore = schedulerDiagnostics([task], []);
const recovered = recoverStaleActiveTaskForFixture(task);
const freshRecovered = recoverStaleActiveTaskForFixture(freshTask);
const diagnosticsAfter = schedulerDiagnostics([task], []);
assertCondition(recovered === 1, "expired active task should be recovered exactly once", { recovered, task });
assertCondition(freshRecovered === 0 && freshTask.status === "running", "fresh owner heartbeat must block recovery", { freshRecovered, freshStatus: freshTask.status });
assertCondition(task.status === "retry_wait" && task.activeTurnId === null && task.nextMode === "retry", "recovered task should re-enter retry_wait with active turn cleared", { task });
assertCondition(diagnosticsBefore.staleRecoveryCandidateTaskIds.includes("codex_fixture_stale_recovery_1"), "before recovery should show stale candidate", diagnosticsBefore as unknown as Record<string, unknown>);
assertCondition(!diagnosticsAfter.staleRecoveryCandidateTaskIds.includes("codex_fixture_stale_recovery_1"), "after recovery should remove stale candidate", diagnosticsAfter as unknown as Record<string, unknown>);
return {
name: "code-queue:stale-active-recovery-transition",
ok: true,
detail: {
recovered,
freshRecovered,
status: task.status,
activeTurnId: task.activeTurnId,
nextMode: task.nextMode,
candidatesBefore: diagnosticsBefore.staleRecoveryCandidateTaskIds,
candidatesAfter: diagnosticsAfter.staleRecoveryCandidateTaskIds,
},
};
}
function checkControlPlaneSplitBrainDiagnostics(): FixtureCheck {
const task = fixtureTask("codex_fixture_split_1", "running", heartbeat("codex_fixture_split_1", freshAt));
const diagnostics = buildExecutionDiagnostics({
@@ -247,7 +325,9 @@ export function runCodeQueueLivenessFixtureChecks(only: string[] = []): { ok: bo
const runners: Array<[CodeQueueLivenessCheckName, () => FixtureCheck]> = [
["code-queue:active-run-heartbeat-visible", checkActiveRunHeartbeatVisible],
["code-queue:trace-gap-not-stale", checkTraceGapNotStale],
["code-queue:heartbeat-threshold-normalized", checkHeartbeatThresholdNormalized],
["code-queue:stale-active-owner-expired", checkStaleActiveOwnerExpired],
["code-queue:stale-active-recovery-transition", checkStaleActiveRecoveryTransition],
["code-queue:control-plane-split-brain-diagnostics", checkControlPlaneSplitBrainDiagnostics],
["code-queue:oa-publisher-degraded-visible", checkOaPublisherDegradedVisible],
];
@@ -4,6 +4,7 @@ import type { ActiveRun } from "./code-agent/common";
import type { CodeQueueExecutionDiagnostics, JsonValue, QueueTask, SchedulerActiveRunHeartbeat } from "./types";
export const schedulerHeartbeatStaleMs = 5 * 60 * 1000;
export const schedulerHeartbeatStaleMsMax = 24 * 60 * 60_000;
interface ExecutionDiagnosticsInput {
now?: string;
@@ -59,6 +60,19 @@ function heartbeatFresh(heartbeat: SchedulerActiveRunHeartbeat | null, nowMs: nu
return atMs !== null && nowMs - atMs < staleMs;
}
export function normalizeSchedulerHeartbeatStaleMs(
value: number | null | undefined,
options: { min?: number; max?: number } = {},
): number {
const minimum = Math.max(1, Math.floor(options.min ?? schedulerHeartbeatStaleMs));
const maximum = Math.max(minimum, Math.floor(options.max ?? Number.MAX_SAFE_INTEGER));
const parsed = value ?? schedulerHeartbeatStaleMs;
if (!Number.isFinite(parsed)) return minimum;
const floored = Math.floor(parsed);
if (floored < minimum) return minimum;
return Math.min(maximum, floored);
}
function outputMaxSeq(task: QueueTask): number {
const values = [
Number(task.outputMaxSeq ?? 0),
@@ -77,7 +91,8 @@ export function staleRecoveryCandidate(input: StaleRecoveryInput): { allowed: bo
const heartbeatMs = timestampMs(heartbeat.lastLocalHeartbeatAt);
if (heartbeatMs === null) return { allowed: false, reason: "owner-heartbeat-missing", heartbeatFresh: false, heartbeatAgeMs: null };
const heartbeatAgeMs = Math.max(0, nowMs - heartbeatMs);
const fresh = heartbeatAgeMs < (input.heartbeatStaleMs ?? schedulerHeartbeatStaleMs);
const staleMs = normalizeSchedulerHeartbeatStaleMs(input.heartbeatStaleMs);
const fresh = heartbeatAgeMs < staleMs;
if (fresh) return { allowed: false, reason: "owner-heartbeat-fresh", heartbeatFresh: true, heartbeatAgeMs };
return { allowed: true, reason: "owner-heartbeat-expired", heartbeatFresh: false, heartbeatAgeMs };
}
@@ -85,10 +100,11 @@ export function staleRecoveryCandidate(input: StaleRecoveryInput): { allowed: bo
export function taskHasTraceGapButFreshHeartbeat(task: QueueTask, now: string, heartbeatStaleMs = schedulerHeartbeatStaleMs): boolean {
if (!activeTask(task)) return false;
const heartbeat = taskHeartbeat(task);
if (!heartbeatFresh(heartbeat, timestampMs(now) ?? Date.now(), heartbeatStaleMs)) return false;
const staleMs = normalizeSchedulerHeartbeatStaleMs(heartbeatStaleMs);
if (!heartbeatFresh(heartbeat, timestampMs(now) ?? Date.now(), staleMs)) return false;
const traceAt = timestampMs(heartbeat?.lastPersistedTraceAt);
if (traceAt === null) return false;
return (timestampMs(now) ?? Date.now()) - traceAt >= heartbeatStaleMs;
return (timestampMs(now) ?? Date.now()) - traceAt >= staleMs;
}
export function buildSchedulerHeartbeat(task: QueueTask, run: ActiveRun | null, options: {
@@ -122,7 +138,7 @@ export function buildSchedulerHeartbeat(task: QueueTask, run: ActiveRun | null,
export function buildExecutionDiagnostics(input: ExecutionDiagnosticsInput): CodeQueueExecutionDiagnostics {
const now = input.now ?? new Date().toISOString();
const nowMs = timestampMs(now) ?? Date.now();
const staleMs = input.heartbeatStaleMs ?? schedulerHeartbeatStaleMs;
const staleMs = normalizeSchedulerHeartbeatStaleMs(input.heartbeatStaleMs);
const databaseActiveTaskIds = input.tasks.filter(activeTask).map((task) => task.id).sort();
const activeRunRows = Array.from(input.activeRuns ?? []);
const schedulerActiveTaskIds = Array.from(new Set(activeRunRows.map((run) => run.taskId).filter(Boolean))).sort();
@@ -116,7 +116,7 @@ import {
taskForListResponse,
transcriptChunkResponse,
} from "./queue-api";
import { buildExecutionDiagnostics, buildSchedulerHeartbeat, schedulerHeartbeatStaleMs, staleRecoveryCandidate } from "./execution-diagnostics";
import { buildExecutionDiagnostics, buildSchedulerHeartbeat, normalizeSchedulerHeartbeatStaleMs, schedulerHeartbeatStaleMs, schedulerHeartbeatStaleMsMax, staleRecoveryCandidate } from "./execution-diagnostics";
import { ReferenceTaskLookupError, configureReferences, injectReferencedTaskContext, taskReferenceIds } from "./references";
import {
applyOaTraceStatsToTaskJson,
@@ -135,7 +135,7 @@ import {
} from "./oa-events";
import { collectRuntimePreflight, runtimePreflightJson } from "./runtime-preflight";
import { collectSkillAvailability, collectSkillSyncPreflight, skillAvailabilityJson, skillSyncPreflightJson } from "./skill-availability";
import { configureSelfTests, runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest, runTraceSummaryContractSelfTest } from "./self-tests";
import { configureSelfTests, runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runStaleActiveRecoverySelfTest, runTracePortSelfTest, runTraceSummaryContractSelfTest } from "./self-tests";
import {
codexToolLifecycleStartedBeforeIn,
configureTaskView,
@@ -177,7 +177,7 @@ const maxTaskAttempts = 99;
const referenceInjectionMaxRounds: number | null = null;
const retryBackoffBaseMs = 1000;
const retryBackoffMaxMs = 10 * 60 * 1000;
const orphanedActiveTaskRecoveryStaleMs = 5 * 60 * 1000;
const orphanedActiveTaskRecoveryStaleMs = normalizeSchedulerHeartbeatStaleMs(5 * 60 * 1000);
const queueIdPattern = /^[A-Za-z0-9][A-Za-z0-9_.-]{0,63}$/u;
const queueNameMaxLength = 80;
const workdirMaxLength = 512;
@@ -2722,6 +2722,12 @@ function truthyParam(url: URL, name: string): boolean {
return value === "1" || value === "true" || value === "yes";
}
function boundedStringSetParam(url: URL, name: string, maxItems = 50): Set<string> | undefined {
const rawValues = url.searchParams.getAll(name).flatMap((value) => value.split(","));
const values = Array.from(new Set(rawValues.map((value) => value.trim()).filter(Boolean))).slice(0, maxItems);
return values.length === 0 ? undefined : new Set(values);
}
function parseSeqParam(url: URL, name: string, defaultValue: number | null): number | null {
const raw = url.searchParams.get(name);
if (raw === null) return defaultValue;
@@ -2965,6 +2971,7 @@ configureSelfTests({
removeActiveRunSlotWaiter,
resolveReasoningEffort,
runDatabaseClaimMoveSelfTest,
staleActiveRecoverySelfTest: runSchedulerStaleActiveRecoverySelfTest,
taskIsRecoverableOrphanedActive,
tasks: () => state.tasks,
updateProcessingFlag,
@@ -3608,10 +3615,39 @@ function orphanedActiveTaskAgeMs(task: QueueTask): number {
function taskIsRecoverableOrphanedActive(task: QueueTask, staleMs = orphanedActiveTaskRecoveryStaleMs): boolean {
if (task.status !== "running" && task.status !== "judging") return false;
if (activeTaskHasLocalRunSlotOrWaiter(task)) return false;
const decision = staleRecoveryCandidate({ task, localActive: false, heartbeatStaleMs: staleMs, now: nowIso() });
const effectiveStaleMs = normalizeSchedulerHeartbeatStaleMs(staleMs, {
min: orphanedActiveTaskRecoveryStaleMs,
max: schedulerHeartbeatStaleMsMax,
});
const decision = staleRecoveryCandidate({ task, localActive: false, heartbeatStaleMs: effectiveStaleMs, now: nowIso() });
return decision.allowed;
}
function recoverableOrphanedActiveTaskIds(tasks: QueueTask[] = state.tasks, staleMs = orphanedActiveTaskRecoveryStaleMs, filterIds?: Set<string>): string[] {
return tasks
.filter((task) => filterIds === undefined || filterIds.has(task.id))
.filter((task) => taskIsRecoverableOrphanedActive(task, staleMs))
.map((task) => task.id)
.sort();
}
function schedulerReconcileCandidateTasks(databaseTasks: QueueTask[]): QueueTask[] {
const mergedById = new Map<string, QueueTask>();
for (const task of state.tasks) mergedById.set(task.id, task);
for (const task of databaseTasks) {
if (task.status !== "queued" && task.status !== "retry_wait" && task.status !== "running" && task.status !== "judging") continue;
const existing = mergedById.get(task.id);
if (existing === undefined) {
mergedById.set(task.id, task);
continue;
}
const existingUpdatedAt = timestampMs(existing.updatedAt) ?? 0;
const taskUpdatedAt = timestampMs(task.updatedAt) ?? 0;
if (taskUpdatedAt > existingUpdatedAt && !activeRunForTask(existing)) mergedById.set(task.id, task);
}
return Array.from(mergedById.values());
}
function schedulerReconcileStatus(tasks: QueueTask[] = state.tasks): JsonValue {
const orphanedActiveTasks = tasks
.filter((task) => (task.status === "running" || task.status === "judging") && !activeTaskHasLocalRunSlotOrWaiter(task))
@@ -3694,13 +3730,11 @@ function queueActiveTasksForRestartRetry(reason: string, method: string, options
async function recoverOrphanedActiveTasks(reason: string, method: string, options: { taskIds?: Set<string>; staleMs?: number } = {}): Promise<number> {
schedulerReconcileAudit.lastRunAt = nowIso();
schedulerReconcileAudit.lastReason = reason;
const staleMs = options.staleMs ?? orphanedActiveTaskRecoveryStaleMs;
const staleOrphanTaskIds = new Set(
state.tasks
.filter((task) => options.taskIds === undefined || options.taskIds.has(task.id))
.filter((task) => taskIsRecoverableOrphanedActive(task, staleMs))
.map((task) => task.id),
);
const staleMs = normalizeSchedulerHeartbeatStaleMs(options.staleMs, {
min: orphanedActiveTaskRecoveryStaleMs,
max: schedulerHeartbeatStaleMsMax,
});
const staleOrphanTaskIds = new Set(recoverableOrphanedActiveTaskIds(state.tasks, staleMs, options.taskIds));
if (staleOrphanTaskIds.size === 0) {
schedulerReconcileAudit.lastRecovered = 0;
schedulerReconcileAudit.lastError = null;
@@ -3722,6 +3756,178 @@ async function recoverOrphanedActiveTasks(reason: string, method: string, option
return recovered;
}
function schedulerRecoveryStaleMsFromUrl(url: URL): {
staleMs: number;
requestedStaleMs: number | null;
requestedStaleMsRaw: string | null;
staleMsAdjusted: boolean;
staleMsAdjustmentReason: "default" | "accepted" | "invalid" | "below-safe-minimum" | "above-maximum";
minStaleMs: number;
maxStaleMs: number;
} {
const requestedStaleMsRaw = url.searchParams.get("staleMs");
const minStaleMs = orphanedActiveTaskRecoveryStaleMs;
const maxStaleMs = schedulerHeartbeatStaleMsMax;
if (requestedStaleMsRaw === null || requestedStaleMsRaw.trim().length === 0) {
return {
staleMs: minStaleMs,
requestedStaleMs: null,
requestedStaleMsRaw,
staleMsAdjusted: false,
staleMsAdjustmentReason: "default",
minStaleMs,
maxStaleMs,
};
}
const requestedStaleMs = Number(requestedStaleMsRaw);
const staleMs = normalizeSchedulerHeartbeatStaleMs(requestedStaleMs, { min: minStaleMs, max: maxStaleMs });
const requestedFloor = Math.floor(requestedStaleMs);
const staleMsAdjustmentReason = !Number.isFinite(requestedStaleMs)
? "invalid"
: requestedFloor < minStaleMs
? "below-safe-minimum"
: requestedFloor > maxStaleMs
? "above-maximum"
: "accepted";
return {
staleMs,
requestedStaleMs: Number.isFinite(requestedStaleMs) ? requestedStaleMs : null,
requestedStaleMsRaw,
staleMsAdjusted: staleMsAdjustmentReason !== "accepted",
staleMsAdjustmentReason,
minStaleMs,
maxStaleMs,
};
}
async function reconcileStaleActiveTasksFromApi(url: URL, req: Request): Promise<Response> {
if (!serviceRoleAllowsScheduler(config.serviceRole)) return schedulerOnlyRejectResponse(req.method, "/api/scheduler/reconcile");
const notReady = requireDatabaseReadyForWrite(req.method, "/api/scheduler/reconcile");
if (notReady !== null) return notReady;
const recoverRequested = req.method === "POST" && truthyParam(url, "recover");
const dryRun = !recoverRequested || truthyParam(url, "dryRun");
const taskIds = boundedStringSetParam(url, "taskId");
const staleThreshold = schedulerRecoveryStaleMsFromUrl(url);
const staleMs = staleThreshold.staleMs;
let loaded = 0;
let candidateTasks = state.tasks;
if (dryRun) {
const databaseTasks = await loadTasksFromDatabase("hot");
loaded = databaseTasks.length;
candidateTasks = schedulerReconcileCandidateTasks(databaseTasks);
} else {
await refreshSchedulerTasksFromDatabase("api-reconcile", { recover: false });
loaded = schedulerReconcileAudit.lastLoaded;
}
const candidateTaskIds = recoverableOrphanedActiveTaskIds(candidateTasks, staleMs, taskIds);
let recovered = 0;
if (!dryRun && candidateTaskIds.length > 0) {
recovered = await recoverOrphanedActiveTasks("Scheduler API recovered expired database active task without local run", "scheduler/api-reconcile-recovery", { taskIds: new Set(candidateTaskIds), staleMs });
}
return jsonResponse({
ok: true,
dryRun,
recoverRequested,
loaded,
staleMs,
requestedStaleMs: staleThreshold.requestedStaleMs,
requestedStaleMsRaw: staleThreshold.requestedStaleMsRaw,
staleMsAdjusted: staleThreshold.staleMsAdjusted,
staleMsAdjustmentReason: staleThreshold.staleMsAdjustmentReason,
minStaleMs: staleThreshold.minStaleMs,
maxStaleMs: staleThreshold.maxStaleMs,
candidateTaskIds,
candidateCount: candidateTaskIds.length,
recovered,
reconcile: schedulerReconcileStatus(dryRun ? candidateTasks : state.tasks),
executionDiagnostics: executionDiagnosticsForTasks(dryRun ? candidateTasks : state.tasks),
oaEventPublisher: oaEventPublisherStatus(),
});
}
async function runSchedulerStaleActiveRecoverySelfTest(): Promise<JsonValue> {
const staleAt = new Date(Date.now() - orphanedActiveTaskRecoveryStaleMs - 60_000).toISOString();
const createdAt = new Date(Date.now() - orphanedActiveTaskRecoveryStaleMs - 120_000).toISOString();
const task = normalizeTask({
id: "codex_stale_active_recovery_self_test",
queueId: "stale_active_recovery_self_test",
queueEnteredAt: createdAt,
prompt: "stale active recovery self-test",
basePrompt: "stale active recovery self-test",
referenceTaskIds: [],
referenceInjection: null,
providerId: config.mainProviderId,
cwd: config.defaultWorkdir,
model: config.defaultModel,
reasoningEffort: resolveReasoningEffort(config.defaultModel, config.defaultReasoningEffort),
executionMode: "default",
maxAttempts: 1,
status: "running",
createdAt,
updatedAt: staleAt,
startedAt: createdAt,
finishedAt: null,
readAt: null,
currentAttempt: 1,
currentMode: "initial",
codexThreadId: "thread_stale_active_recovery_self_test",
activeTurnId: "turn_stale_active_recovery_self_test",
finalResponse: "",
lastError: null,
lastJudge: null,
judgeFailCount: 0,
promptHistory: [],
output: [],
events: [],
attempts: [],
cancelRequested: false,
nextPrompt: null,
nextMode: null,
});
task.schedulerHeartbeat = {
taskId: task.id,
queueId: task.queueId,
attempt: 1,
activeTurnId: task.activeTurnId,
codexThreadId: task.codexThreadId,
owner: "self-test",
schedulerInstance: "self-test",
executionPlane: "scheduler-execution-plane",
agentPort: "codex",
status: task.status,
lastLocalHeartbeatAt: staleAt,
lastObservedAgentEventAt: staleAt,
lastPersistedTraceAt: staleAt,
outputMaxSeq: 0,
source: "scheduler",
};
const before = state.tasks.slice();
try {
state.tasks.push(task);
const candidatesBefore = recoverableOrphanedActiveTaskIds(state.tasks, 0, new Set([task.id]));
const recovered = queueActiveTasksForRestartRetry("stale active recovery self-test", "self-test/stale-active-recovery", { taskIds: new Set(candidatesBefore) });
const candidatesAfter = recoverableOrphanedActiveTaskIds(state.tasks, 0, new Set([task.id]));
if (!candidatesBefore.includes(task.id)) throw new Error("expired active self-test task was not recovery-eligible");
if (recovered !== 1) throw new Error(`expected one recovered stale active task, got ${recovered}`);
if (task.status !== "retry_wait") throw new Error(`expected retry_wait after recovery, got ${task.status}`);
if (task.activeTurnId !== null) throw new Error("recovery should clear activeTurnId");
if (candidatesAfter.length !== 0) throw new Error("recovered self-test task should no longer be stale-active eligible");
return {
ok: true,
taskId: task.id,
recovered,
status: task.status,
candidatesBefore,
candidatesAfter,
lastError: task.lastError,
nextMode: task.nextMode,
} as unknown as JsonValue;
} finally {
state.tasks.splice(0, state.tasks.length, ...before);
dirtyDatabaseTaskIds.delete(task.id);
}
}
function failTaskForFallbackRetryLimit(task: QueueTask, judge: JudgeResult | null): void {
const count = fallbackJudgeRetryCount(task);
const reason = `Fallback/non-LLM judge retry limit reached (${count}/${fallbackJudgeRetryLimit}). ${judge?.reason ?? task.lastJudge?.reason ?? "MiniMax judge was unavailable."}`;
@@ -4245,7 +4451,7 @@ function hasRunnableTask(): boolean {
}
function shouldPollSchedulerDatabase(): boolean {
return config.schedulerEnabled && config.serviceRole === "scheduler";
return config.schedulerEnabled && serviceRoleAllowsScheduler(config.serviceRole);
}
function mergeSchedulerDatabaseTasks(tasks: QueueTask[]): number {
@@ -4272,14 +4478,14 @@ function mergeSchedulerDatabaseTasks(tasks: QueueTask[]): number {
return changed;
}
async function refreshSchedulerTasksFromDatabase(reason: string): Promise<number> {
async function refreshSchedulerTasksFromDatabase(reason: string, options: { recover?: boolean } = {}): Promise<number> {
if (!databaseReady || !config.schedulerEnabled) return 0;
schedulerReconcileAudit.lastRunAt = nowIso();
schedulerReconcileAudit.lastReason = reason;
try {
const tasks = await loadTasksFromDatabase("hot");
const changed = mergeSchedulerDatabaseTasks(tasks);
const recovered = await recoverOrphanedActiveTasks("Scheduler recovered database active task without local run", `scheduler/${reason}-recovery`);
const recovered = options.recover === false ? 0 : await recoverOrphanedActiveTasks("Scheduler recovered database active task without local run", `scheduler/${reason}-recovery`);
schedulerReconcileAudit.lastLoaded = tasks.length;
schedulerReconcileAudit.lastChanged = changed;
schedulerReconcileAudit.lastRecovered = recovered;
@@ -5372,6 +5578,8 @@ async function route(req: Request): Promise<Response> {
if (url.pathname === "/api/reference-injection/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(await runReferenceInjectionSelfTest());
if (url.pathname === "/api/trace-port/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(runTracePortSelfTest());
if (url.pathname === "/api/trace-summary-contract/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(runTraceSummaryContractSelfTest());
if (url.pathname === "/api/stale-active-recovery/self-test" && (req.method === "GET" || req.method === "POST")) return jsonResponse(await runStaleActiveRecoverySelfTest());
if (url.pathname === "/api/scheduler/reconcile" && (req.method === "GET" || req.method === "POST")) return await reconcileStaleActiveTasksFromApi(url, req);
if (url.pathname === "/api/oa/backfill" && (req.method === "GET" || req.method === "POST")) return jsonResponse(await backfillOaTraceStats(url));
if (url.pathname === "/api/notifications/claudeqq" && req.method === "GET") {
await loadClaudeQqNotificationOutboxFromDatabase();
@@ -72,9 +72,13 @@ let totalFailed = 0;
let totalRetried = 0;
let totalAbandoned = 0;
let totalDropped = 0;
let totalOverflowDropped = 0;
let lastEnqueuedAt: string | null = null;
let lastSucceededAt: string | null = null;
let lastDroppedAt: string | null = null;
let lastOverflowAt: string | null = null;
let lastError: JsonRecord | null = null;
let nextOverflowLogAtMs = 0;
export function configureOaEvents(runtimeContext: OaEventContext): void {
const baseUrl = runtimeContext.baseUrl.replace(/\/+$/u, "");
@@ -122,8 +126,11 @@ export function oaEventPublisherStatus(): JsonRecord {
totalRetried,
totalAbandoned,
totalDropped,
totalOverflowDropped,
lastEnqueuedAt,
lastSucceededAt,
lastDroppedAt,
lastOverflowAt,
lastError,
};
}
@@ -187,10 +194,23 @@ function postOaEvent(event: OaEventEnvelope): void {
totalEnqueued += 1;
lastEnqueuedAt = runtime.nowIso();
if (queuedEvents.length >= maxQueuedEvents) {
-const dropped = queuedEvents.shift();
totalDropped += 1;
-lastError = { eventId: dropped?.event.eventId ?? "", type: dropped?.event.type ?? "", error: "oa event publish queue overflow" };
-runtime.logger("error", "oa_event_publish_queue_overflow", { ...lastError, pending: queuedEvents.length, maxQueuedEvents });
+totalOverflowDropped += 1;
+lastDroppedAt = lastEnqueuedAt;
+lastOverflowAt = lastEnqueuedAt;
+lastError = { eventId: event.eventId, type: event.type, error: "oa event publish queue overflow; incoming observability event dropped" };
+const nowMs = Date.now();
+if (nowMs >= nextOverflowLogAtMs) {
+nextOverflowLogAtMs = nowMs + 30_000;
+runtime.logger("error", "oa_event_publish_queue_overflow", {
+...lastError,
+pending: queuedEvents.length,
+maxQueuedEvents,
+totalOverflowDropped,
+isolated: true,
+});
+}
+return;
}
queuedEvents.push({ event, attempts: 1 });
pumpOaEventQueue();
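Review note: this hunk changes the overflow policy from evicting the oldest queued event to dropping the incoming one, and throttles the overflow log to at most one line per 30 seconds. The sketch below isolates that behavior so it can be reasoned about without the publisher state. `createBoundedQueue`, its injected `log` callback, and the explicit `nowMs` argument are hypothetical names introduced for illustration; the real code uses module-level variables and `Date.now()`.

```typescript
// Hypothetical stand-alone model of the drop-incoming overflow policy with a
// rate-limited log. The clock is passed in explicitly so the behavior is testable.
interface BoundedQueue<T> {
  offer(item: T, nowMs: number): boolean;
  pending(): number;
  totalOverflowDropped(): number;
}

function createBoundedQueue<T>(
  maxItems: number,
  logIntervalMs: number,
  log: (message: string) => void,
): BoundedQueue<T> {
  const items: T[] = [];
  let overflowDropped = 0;
  let nextOverflowLogAtMs = 0;
  return {
    offer(item: T, nowMs: number): boolean {
      if (items.length >= maxItems) {
        // Queue is full: drop the incoming item and leave queued items untouched.
        overflowDropped += 1;
        // Emit at most one overflow log line per logIntervalMs.
        if (nowMs >= nextOverflowLogAtMs) {
          nextOverflowLogAtMs = nowMs + logIntervalMs;
          log(`queue overflow: pending=${items.length} totalOverflowDropped=${overflowDropped}`);
        }
        return false;
      }
      items.push(item);
      return true;
    },
    pending: () => items.length,
    totalOverflowDropped: () => overflowDropped,
  };
}
```

Dropping the incoming event keeps older events in publish order and avoids mutating the queue on the hot path, at the cost of losing the most recent observability data while the queue stays full; the counters still record every drop even when the log line is suppressed.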
@@ -7,7 +7,7 @@ import { codeQueueEnvironmentHintTitle, injectCodeQueueEnvironmentHint, promptWi
import { buildTaskTranscript, safePreview, taskTraceSummaryFixtureResponse, transcriptLineSummaryLines } from "./task-view";
import type { ActiveRunSlotWaiter } from "./code-agent/common";
import type { OaTraceStepSummary } from "./oa-events";
-import type { JsonValue, LiveOutput, QueueTask, QueuedStatusReason, QueueTaskRequest, RuntimeConfig, TaskStatus } from "./types";
+import type { JsonValue, LiveOutput, QueueTask, QueuedStatusReason, QueueTaskRequest, RuntimeConfig, SchedulerActiveRunHeartbeat, TaskStatus } from "./types";
export interface SelfTestsContext {
config: Pick<RuntimeConfig, "defaultModel" | "defaultReasoningEffort" | "defaultWorkdir" | "mainProviderId" | "maxActiveQueues">;
@@ -30,6 +30,7 @@ export interface SelfTestsContext {
resolveReasoningEffort: (model: string, explicit?: string | null) => string | null;
runDatabaseClaimMoveSelfTest?: () => Promise<JsonValue | null>;
runTraceSummaryContractSelfTest?: () => JsonValue;
+staleActiveRecoverySelfTest: () => Promise<JsonValue>;
taskIsRecoverableOrphanedActive: (task: QueueTask, staleMs?: number) => boolean;
tasks: () => QueueTask[];
updateProcessingFlag: () => void;
@@ -311,6 +312,30 @@ function queueOrderTestTask(id: string, status: TaskStatus, createdAt: string, q
return ctx().normalizeTask(task);
}
+function selfTestSchedulerHeartbeat(task: QueueTask, lastLocalHeartbeatAt: string): SchedulerActiveRunHeartbeat {
+return {
+taskId: task.id,
+queueId: task.queueId,
+attempt: 1,
+activeTurnId: task.activeTurnId,
+codexThreadId: task.codexThreadId,
+owner: "self-test",
+schedulerInstance: "self-test",
+executionPlane: "scheduler-execution-plane",
+agentPort: "codex",
+status: task.status,
+lastLocalHeartbeatAt,
+lastObservedAgentEventAt: lastLocalHeartbeatAt,
+lastPersistedTraceAt: lastLocalHeartbeatAt,
+outputMaxSeq: 0,
+source: "scheduler",
+};
+}
+async function runStaleActiveRecoverySelfTest(): Promise<JsonValue> {
+return await ctx().staleActiveRecoverySelfTest();
+}
async function runQueueClaimMoveSelfTest(): Promise<JsonValue> {
const at = "2026-05-17T06:09:46.702Z";
const task = queueOrderTestTask("codex_claim_move_self_test", "running", at, at);
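Review note: the `selfTestSchedulerHeartbeat` helper added above builds heartbeats with a chosen `lastLocalHeartbeatAt`, which is what a stale-active check would compare against the stale threshold. The actual `taskIsRecoverableOrphanedActive` logic is not part of this diff, so the following is only a rough sketch of a freshness test under the rule stated in the README (a task with a fresh scheduler heartbeat must not be judged stale). `heartbeatIsFresh` and `HeartbeatLike` are hypothetical names.

```typescript
// Hypothetical freshness check; only the lastLocalHeartbeatAt field name is
// taken from the diff. A missing or unparseable timestamp is treated as not fresh.
interface HeartbeatLike {
  lastLocalHeartbeatAt: string | null;
}

function heartbeatIsFresh(heartbeat: HeartbeatLike | null, nowMs: number, staleMs: number): boolean {
  if (heartbeat === null || heartbeat.lastLocalHeartbeatAt === null) return false;
  const beatAtMs = Date.parse(heartbeat.lastLocalHeartbeatAt);
  if (!Number.isFinite(beatAtMs)) return false;
  // Fresh while the heartbeat age is strictly below the stale threshold.
  return nowMs - beatAtMs < staleMs;
}
```

Under this sketch a task would only become a recovery candidate once `heartbeatIsFresh` returns `false` for a threshold that has already been clamped to the safe minimum.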
@@ -370,23 +395,7 @@ function runQueueOrderingSelfTest(): JsonValue {
const orphanRunning = queueOrderTestTask("codex_4400_orphan_running", "running", "2026-05-11T13:00:00.000Z", "2026-05-11T13:00:00.000Z");
orphanRunning.queueId = "queue_orphan_recovery";
orphanRunning.activeTurnId = null;
-orphanRunning.schedulerHeartbeat = {
-taskId: orphanRunning.id,
-queueId: orphanRunning.queueId,
-attempt: 1,
-activeTurnId: null,
-codexThreadId: null,
-owner: "self-test",
-schedulerInstance: "self-test",
-executionPlane: "scheduler-execution-plane",
-agentPort: "codex",
-status: "running",
-lastLocalHeartbeatAt: "2026-05-11T13:00:00.000Z",
-lastObservedAgentEventAt: null,
-lastPersistedTraceAt: null,
-outputMaxSeq: 0,
-source: "scheduler",
-};
+orphanRunning.schedulerHeartbeat = selfTestSchedulerHeartbeat(orphanRunning, "2026-05-11T13:00:00.000Z");
const queuedBehindOrphan = queueOrderTestTask("codex_4401_queued", "queued", "2026-05-11T13:01:00.000Z", "2026-05-11T13:01:00.000Z");
queuedBehindOrphan.queueId = "queue_orphan_recovery";
const originalMaxActiveQueues = ctx().config.maxActiveQueues;
@@ -676,4 +685,4 @@ function runJudgeInfraSelfTest(): JsonValue {
};
}
-export { runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runTracePortSelfTest, runTraceSummaryContractSelfTest };
+export { runJudgeInfraSelfTest, runQueueClaimMoveSelfTest, runQueueOrderingSelfTest, runReferenceInjectionSelfTest, runStaleActiveRecoverySelfTest, runTracePortSelfTest, runTraceSummaryContractSelfTest };