From 0d040a33c27f8690551f98dba7611e55fc5129cc Mon Sep 17 00:00:00 2001 From: Codex Date: Wed, 3 Jun 2026 23:49:40 +0800 Subject: [PATCH] fix: wait for stale runner lease before replacement claim --- src/runner/main.ts | 2 + src/runner/run-once.ts | 75 +++++++++++++++++++++++++++- src/selftest/cases/30-codex-stdio.ts | 39 ++++++++++++++- 3 files changed, 114 insertions(+), 2 deletions(-) diff --git a/src/runner/main.ts b/src/runner/main.ts index 220ede8..0a8a6f7 100644 --- a/src/runner/main.ts +++ b/src/runner/main.ts @@ -34,6 +34,8 @@ if (process.env.AGENTRUN_CODEX_ARGS) options.codexArgs = JSON.parse(process.env. if (process.env.CODEX_HOME) options.codexHome = process.env.CODEX_HOME; if (process.env.AGENTRUN_RUNNER_IDLE_TIMEOUT_MS) options.idleTimeoutMs = Number(process.env.AGENTRUN_RUNNER_IDLE_TIMEOUT_MS); if (process.env.AGENTRUN_RUNNER_POLL_INTERVAL_MS) options.pollIntervalMs = Number(process.env.AGENTRUN_RUNNER_POLL_INTERVAL_MS); +if (process.env.AGENTRUN_RUNNER_CLAIM_RETRY_TIMEOUT_MS) options.claimRetryTimeoutMs = Number(process.env.AGENTRUN_RUNNER_CLAIM_RETRY_TIMEOUT_MS); +if (process.env.AGENTRUN_RUNNER_CLAIM_RETRY_INTERVAL_MS) options.claimRetryIntervalMs = Number(process.env.AGENTRUN_RUNNER_CLAIM_RETRY_INTERVAL_MS); if (process.env.AGENTRUN_RUNNER_ONE_SHOT === "1" || process.env.AGENTRUN_RUNNER_ONE_SHOT === "true") options.oneShot = true; try { const result = await runOnce(options); diff --git a/src/runner/run-once.ts b/src/runner/run-once.ts index 887628b..0833d09 100644 --- a/src/runner/run-once.ts +++ b/src/runner/run-once.ts @@ -19,6 +19,8 @@ export interface RunnerOnceOptions extends BackendAdapterOptions { logPath?: string; idleTimeoutMs?: number; pollIntervalMs?: number; + claimRetryTimeoutMs?: number; + claimRetryIntervalMs?: number; oneShot?: boolean; } @@ -50,7 +52,7 @@ export async function runOnce(options: RunnerOnceOptions): Promise { }) as RunnerRecord; let claimed: RunRecord; try { - claimed = await api.claim(options.runId, runner.id, leaseMs); + claimed = await claimRunWithLeaseRecovery(api, options, runner, attemptId, leaseMs); await api.heartbeat(options.runId, runner.id, leaseMs); } catch (error) { const failureKind = failureKindFromError(error); @@ -355,6 +357,77 @@ async function appendBestEffort(api: RunnerManagerApi, runId: string, event: Bac } } +async function claimRunWithLeaseRecovery(api: RunnerManagerApi, options: RunnerOnceOptions, runner: RunnerRecord, attemptId: string, leaseMs: number): Promise { + const startedAt = Date.now(); + const timeoutMs = normalizeClaimRetryTimeoutMs(options.claimRetryTimeoutMs, leaseMs); + const intervalMs = normalizeClaimRetryIntervalMs(options.claimRetryIntervalMs); + let waitingEventWritten = false; + let lastError: unknown = null; + + while (Date.now() - startedAt <= timeoutMs) { + try { + const claimed = await api.claim(options.runId, runner.id, leaseMs); + if (waitingEventWritten) { + await appendBestEffort(api, options.runId, { + type: "backend_status", + payload: { phase: "runner-claim-recovered", attemptId, runnerId: runner.id, waitedMs: Date.now() - startedAt }, + }); + } + return claimed; + } catch (error) { + if (failureKindFromError(error) !== "runner-lease-conflict") throw error; + lastError = error; + const run = await getRunBestEffort(api, options.runId); + if (!waitingEventWritten) { + waitingEventWritten = true; + await appendBestEffort(api, options.runId, { + type: "backend_status", + payload: { + phase: "runner-claim-waiting-for-stale-lease", + attemptId, + runnerId: runner.id, + claimedBy: run?.claimedBy ?? null, + leaseExpiresAt: run?.leaseExpiresAt ?? null, + retryTimeoutMs: timeoutMs, + }, + }); + } + const remainingMs = timeoutMs - (Date.now() - startedAt); + if (remainingMs <= 0) break; + await sleep(Math.min(remainingMs, claimRetryDelayMs(run, intervalMs))); + } + } + + throw lastError ?? new AgentRunError("runner-lease-conflict", `run ${options.runId} could not be claimed before retry timeout`, { httpStatus: 409 }); +} + +async function getRunBestEffort(api: RunnerManagerApi, runId: string): Promise { + try { + return await api.getRun(runId); + } catch { + return null; + } +} + +function claimRetryDelayMs(run: RunRecord | null, intervalMs: number): number { + const leaseExpiresAt = run?.leaseExpiresAt ? Date.parse(run.leaseExpiresAt) : NaN; + if (Number.isFinite(leaseExpiresAt)) { + const untilExpiryMs = leaseExpiresAt - Date.now() + 100; + if (untilExpiryMs > 0) return Math.max(25, Math.min(intervalMs, untilExpiryMs)); + } + return intervalMs; +} + +function normalizeClaimRetryTimeoutMs(value: number | undefined, leaseMs: number): number { + if (Number.isFinite(value ?? NaN)) return Math.max(0, Math.floor(value!)); + return Math.max(leaseMs + Math.max(5_000, Math.floor(leaseMs / 3)), leaseMs); +} + +function normalizeClaimRetryIntervalMs(value: number | undefined): number { + if (!Number.isFinite(value ?? NaN)) return 5_000; + return Math.max(25, Math.min(30_000, Math.floor(value!))); +} + function annotateCommandEvent(event: BackendEvent, commandId: string, attemptId: string, runnerId: string): BackendEvent { return { ...event, payload: { ...event.payload, commandId, attemptId, runnerId } }; } diff --git a/src/selftest/cases/30-codex-stdio.ts b/src/selftest/cases/30-codex-stdio.ts index 08b9966..86df720 100644 --- a/src/selftest/cases/30-codex-stdio.ts +++ b/src/selftest/cases/30-codex-stdio.ts @@ -27,6 +27,8 @@ const selfTest: SelfTestCase = async (context) => { const finalCommand = await client.get(`/api/v1/runs/${happy.runId}/commands/${happy.commandId}`) as { state?: string }; assert.equal(finalCommand.state, "completed"); + await runLeaseConflictRecoveryCase({ client, managerUrl: server.baseUrl, context }); + const projectedHome = path.join(context.tmp, "runtime-codex-home"); const projected = await createRunWithCommand(client, { workspace: context.workspace, codexHome: projectedHome }, "hello projected", "selftest-projected-codex-home", 15_000); const projectedResult = await runOnce({ managerUrl: server.baseUrl, runId: projected.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: projectedHome, env: { CODEX_HOME: projectedHome, AGENTRUN_CODEX_SECRET_HOME: context.codexHome }, oneShot: true }); @@ -215,12 +217,47 @@ const selfTest: SelfTestCase = async (context) => { await runSessionStorageSubdirCase({ client, managerUrl: server.baseUrl, context }); await runSessionStorageNoSecretLeakCase({ client, managerUrl: server.baseUrl, context }); - return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-web-search-progress", "codex-stdio-stale-thread-resume-failed", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-invalid-tool-call", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-idle-timeout-progress-refresh", "codex-stdio-command-failure-keeps-run-open", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] }; + return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "runner-lease-conflict-recovery", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-web-search-progress", "codex-stdio-stale-thread-resume-failed", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-invalid-tool-call", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-idle-timeout-progress-refresh", "codex-stdio-command-failure-keeps-run-open", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] }; } finally { await new Promise((resolve) => server.server.close(() => resolve())); } }; +async function runLeaseConflictRecoveryCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise { + const item = await createRunWithCommand(options.client, options.context, "claim after stale lease", "selftest-runner-lease-conflict-recovery", 15_000); + const staleRunner = await options.client.post("/api/v1/runners/register", { + runId: item.runId, + attemptId: "attempt_stale_claim", + backendProfile: "codex", + placement: "kubernetes-job", + sourceCommit: "self-test", + id: "runner_stale_claim", + }) as JsonRecord; + await options.client.post(`/api/v1/runs/${item.runId}/claim`, { runnerId: staleRunner.id, leaseMs: 300 }); + const result = await runOnce({ + managerUrl: options.managerUrl, + runId: item.runId, + commandId: item.commandId, + runnerId: "runner_recovered_claim", + attemptId: "attempt_recovered_claim", + leaseMs: 1_000, + claimRetryTimeoutMs: 2_000, + claimRetryIntervalMs: 50, + codexCommand: options.context.fakeCodexCommand, + codexArgs: options.context.fakeCodexArgs, + codexHome: options.context.codexHome, + env: { CODEX_HOME: options.context.codexHome }, + oneShot: true, + }) as JsonRecord; + assert.equal(result.terminalStatus, "completed", "replacement runner should claim after stale lease expiry and finish the pending command"); + const run = await options.client.get(`/api/v1/runs/${item.runId}`) as JsonRecord; + assert.equal(run.claimedBy, "runner_recovered_claim"); + const events = await options.client.get(`/api/v1/runs/${item.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; + assert.equal(events.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "runner-claim-waiting-for-stale-lease"), true, "lease conflict recovery should be visible while waiting"); + assert.equal(events.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "runner-claim-recovered"), true, "lease conflict recovery should be visible after claim succeeds"); + assertNoSecretLeak({ result, run, events }); +} + async function runSlowProgressIdleCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise { const item = await createRunWithCommand(options.client, options.context, "slow progress before terminal", "selftest-slow-progress-idle-refresh", 60); const result = await runOnce({