fix: wait for stale runner lease before replacement claim
This commit is contained in:
@@ -34,6 +34,8 @@ if (process.env.AGENTRUN_CODEX_ARGS) options.codexArgs = JSON.parse(process.env.
|
||||
if (process.env.CODEX_HOME) options.codexHome = process.env.CODEX_HOME;
|
||||
if (process.env.AGENTRUN_RUNNER_IDLE_TIMEOUT_MS) options.idleTimeoutMs = Number(process.env.AGENTRUN_RUNNER_IDLE_TIMEOUT_MS);
|
||||
if (process.env.AGENTRUN_RUNNER_POLL_INTERVAL_MS) options.pollIntervalMs = Number(process.env.AGENTRUN_RUNNER_POLL_INTERVAL_MS);
|
||||
if (process.env.AGENTRUN_RUNNER_CLAIM_RETRY_TIMEOUT_MS) options.claimRetryTimeoutMs = Number(process.env.AGENTRUN_RUNNER_CLAIM_RETRY_TIMEOUT_MS);
|
||||
if (process.env.AGENTRUN_RUNNER_CLAIM_RETRY_INTERVAL_MS) options.claimRetryIntervalMs = Number(process.env.AGENTRUN_RUNNER_CLAIM_RETRY_INTERVAL_MS);
|
||||
if (process.env.AGENTRUN_RUNNER_ONE_SHOT === "1" || process.env.AGENTRUN_RUNNER_ONE_SHOT === "true") options.oneShot = true;
|
||||
try {
|
||||
const result = await runOnce(options);
|
||||
|
||||
+74
-1
@@ -19,6 +19,8 @@ export interface RunnerOnceOptions extends BackendAdapterOptions {
|
||||
logPath?: string;
|
||||
idleTimeoutMs?: number;
|
||||
pollIntervalMs?: number;
|
||||
claimRetryTimeoutMs?: number;
|
||||
claimRetryIntervalMs?: number;
|
||||
oneShot?: boolean;
|
||||
}
|
||||
|
||||
@@ -50,7 +52,7 @@ export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
|
||||
}) as RunnerRecord;
|
||||
let claimed: RunRecord;
|
||||
try {
|
||||
claimed = await api.claim(options.runId, runner.id, leaseMs);
|
||||
claimed = await claimRunWithLeaseRecovery(api, options, runner, attemptId, leaseMs);
|
||||
await api.heartbeat(options.runId, runner.id, leaseMs);
|
||||
} catch (error) {
|
||||
const failureKind = failureKindFromError(error);
|
||||
@@ -355,6 +357,77 @@ async function appendBestEffort(api: RunnerManagerApi, runId: string, event: Bac
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Claims the run for `runner`, waiting out a lease held by another runner.
 *
 * The claim is always attempted at least once, even when the retry budget is
 * zero or cannot be computed. When the manager rejects the claim with a
 * `runner-lease-conflict`, a single `runner-claim-waiting-for-stale-lease`
 * status event is appended and the claim is retried until it succeeds or the
 * retry budget is spent. A claim that succeeds after waiting also appends a
 * `runner-claim-recovered` status event.
 *
 * @param api - Manager API used to claim the run, read it back and append events.
 * @param options - Runner options; `runId`, `claimRetryTimeoutMs` and `claimRetryIntervalMs` are read.
 * @param runner - The registered runner that is trying to take the lease.
 * @param attemptId - Attempt identifier recorded in the status events.
 * @param leaseMs - Lease duration requested with each claim; also seeds the default retry budget.
 * @returns The run record returned by the successful claim.
 * @throws The original error for any failure that is not a lease conflict, or the
 *   most recent lease-conflict error once the retry budget is exhausted.
 */
async function claimRunWithLeaseRecovery(api: RunnerManagerApi, options: RunnerOnceOptions, runner: RunnerRecord, attemptId: string, leaseMs: number): Promise<RunRecord> {
  const startedAt = Date.now();
  const timeoutMs = normalizeClaimRetryTimeoutMs(options.claimRetryTimeoutMs, leaseMs);
  const intervalMs = normalizeClaimRetryIntervalMs(options.claimRetryIntervalMs);
  let waitingEventWritten = false;

  // The deadline is checked only after a failed attempt. Checking it up front
  // could skip the claim entirely when the budget is 0 (or NaN) and the clock
  // ticks between `startedAt` and the first check.
  for (;;) {
    try {
      const claimed = await api.claim(options.runId, runner.id, leaseMs);
      if (waitingEventWritten) {
        await appendBestEffort(api, options.runId, {
          type: "backend_status",
          payload: { phase: "runner-claim-recovered", attemptId, runnerId: runner.id, waitedMs: Date.now() - startedAt },
        });
      }
      return claimed;
    } catch (error) {
      if (failureKindFromError(error) !== "runner-lease-conflict") throw error;

      // Read the run back so the wait can be aligned with the current lease expiry.
      const run = await getRunBestEffort(api, options.runId);
      if (!waitingEventWritten) {
        waitingEventWritten = true;
        await appendBestEffort(api, options.runId, {
          type: "backend_status",
          payload: {
            phase: "runner-claim-waiting-for-stale-lease",
            attemptId,
            runnerId: runner.id,
            claimedBy: run?.claimedBy ?? null,
            leaseExpiresAt: run?.leaseExpiresAt ?? null,
            retryTimeoutMs: timeoutMs,
          },
        });
      }

      // `!(x > 0)` rather than `x <= 0` so a NaN budget gives up instead of spinning.
      const remainingMs = timeoutMs - (Date.now() - startedAt);
      if (!(remainingMs > 0)) throw error;

      // Every sleep is followed by another attempt, so time spent waiting for
      // the lease to expire is never wasted by giving up without retrying.
      await sleep(Math.min(remainingMs, claimRetryDelayMs(run, intervalMs)));
    }
  }
}
|
||||
|
||||
/**
 * Reads a run record without ever rejecting.
 *
 * @param api - Manager API whose `getRun` is called.
 * @param runId - Identifier of the run to read.
 * @returns The run record, or `null` when `getRun` throws or rejects for any reason.
 */
async function getRunBestEffort(api: RunnerManagerApi, runId: string): Promise<RunRecord | null> {
  let fetched: RunRecord | null = null;
  try {
    fetched = await api.getRun(runId);
  } catch {
    // Deliberately swallowed: callers treat a missing record as "unknown".
  }
  return fetched;
}
|
||||
|
||||
/**
 * Chooses how long to sleep before the next claim attempt.
 *
 * When the run carries a parseable `leaseExpiresAt` that is still ahead of
 * now (with 100 ms of slack added), the delay is the time until that moment,
 * capped at `intervalMs` and floored at 25 ms. Otherwise the delay is
 * `intervalMs` unchanged.
 *
 * @param run - Latest known run record, or `null` when it could not be read.
 * @param intervalMs - Regular retry interval in milliseconds.
 * @returns Delay in milliseconds before retrying the claim.
 */
function claimRetryDelayMs(run: RunRecord | null, intervalMs: number): number {
  const expiryText = run?.leaseExpiresAt;
  if (!expiryText) return intervalMs;

  const expiryEpochMs = Date.parse(expiryText);
  if (!Number.isFinite(expiryEpochMs)) return intervalMs;

  // 100 ms of slack so the retry lands just after the lease has lapsed.
  const msUntilExpiry = expiryEpochMs - Date.now() + 100;
  if (msUntilExpiry <= 0) return intervalMs;

  const capped = Math.min(intervalMs, msUntilExpiry);
  return Math.max(25, capped);
}
|
||||
|
||||
/**
 * Resolves the total time budget for claim retries.
 *
 * A finite explicit value is floored and clamped to be non-negative. Anything
 * else (undefined, NaN, infinite) falls back to the lease duration plus a
 * grace period of one third of the lease, but never less than 5 seconds.
 *
 * @param value - Caller-supplied budget in milliseconds, if any.
 * @param leaseMs - Lease duration in milliseconds used to derive the default.
 * @returns The retry budget in milliseconds.
 */
function normalizeClaimRetryTimeoutMs(value: number | undefined, leaseMs: number): number {
  if (typeof value === "number" && Number.isFinite(value)) {
    const whole = Math.floor(value);
    return whole > 0 ? whole : 0;
  }
  const graceMs = Math.max(5_000, Math.floor(leaseMs / 3));
  return leaseMs + graceMs;
}
|
||||
|
||||
/**
 * Resolves the pause between claim retries.
 *
 * A finite value is floored and clamped to the range 25 ms to 30 000 ms.
 * Anything else (undefined, NaN, infinite) yields the 5 000 ms default.
 *
 * @param value - Caller-supplied interval in milliseconds, if any.
 * @returns The retry interval in milliseconds.
 */
function normalizeClaimRetryIntervalMs(value: number | undefined): number {
  if (typeof value !== "number" || !Number.isFinite(value)) return 5_000;
  const whole = Math.floor(value);
  if (whole < 25) return 25;
  if (whole > 30_000) return 30_000;
  return whole;
}
|
||||
|
||||
/**
 * Returns a copy of `event` whose payload additionally carries the command,
 * attempt and runner identifiers. The input event is not modified; identifiers
 * already present in the payload under the same keys are overwritten.
 *
 * @param event - Backend event to annotate.
 * @param commandId - Command identifier to record in the payload.
 * @param attemptId - Attempt identifier to record in the payload.
 * @param runnerId - Runner identifier to record in the payload.
 * @returns A new event with the extended payload.
 */
function annotateCommandEvent(event: BackendEvent, commandId: string, attemptId: string, runnerId: string): BackendEvent {
  const payload = { ...event.payload, commandId, attemptId, runnerId };
  return { ...event, payload };
}
|
||||
|
||||
@@ -27,6 +27,8 @@ const selfTest: SelfTestCase = async (context) => {
|
||||
const finalCommand = await client.get(`/api/v1/runs/${happy.runId}/commands/${happy.commandId}`) as { state?: string };
|
||||
assert.equal(finalCommand.state, "completed");
|
||||
|
||||
await runLeaseConflictRecoveryCase({ client, managerUrl: server.baseUrl, context });
|
||||
|
||||
const projectedHome = path.join(context.tmp, "runtime-codex-home");
|
||||
const projected = await createRunWithCommand(client, { workspace: context.workspace, codexHome: projectedHome }, "hello projected", "selftest-projected-codex-home", 15_000);
|
||||
const projectedResult = await runOnce({ managerUrl: server.baseUrl, runId: projected.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: projectedHome, env: { CODEX_HOME: projectedHome, AGENTRUN_CODEX_SECRET_HOME: context.codexHome }, oneShot: true });
|
||||
@@ -215,12 +217,47 @@ const selfTest: SelfTestCase = async (context) => {
|
||||
await runSessionStorageSubdirCase({ client, managerUrl: server.baseUrl, context });
|
||||
await runSessionStorageNoSecretLeakCase({ client, managerUrl: server.baseUrl, context });
|
||||
|
||||
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-web-search-progress", "codex-stdio-stale-thread-resume-failed", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-invalid-tool-call", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-idle-timeout-progress-refresh", "codex-stdio-command-failure-keeps-run-open", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
|
||||
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "runner-lease-conflict-recovery", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-web-search-progress", "codex-stdio-stale-thread-resume-failed", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-invalid-tool-call", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-idle-timeout-progress-refresh", "codex-stdio-command-failure-keeps-run-open", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
|
||||
} finally {
|
||||
await new Promise<void>((resolve) => server.server.close(() => resolve()));
|
||||
}
|
||||
};
|
||||
|
||||
/**
 * Self-test case: a replacement runner waits for a stale lease and then claims the run.
 *
 * A first runner is registered and claims the run with a 300 ms lease. `runOnce`
 * is then started as a second runner with a 2 s claim retry budget. The case
 * passes when the second runner completes the pending command, ends up as the
 * run's `claimedBy`, and both the "waiting" and "recovered" status events are
 * present in the run's event log.
 *
 * @param options - Manager client, manager base URL and the shared self-test context.
 */
async function runLeaseConflictRecoveryCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
  const { client, managerUrl, context } = options;
  const seeded = await createRunWithCommand(client, context, "claim after stale lease", "selftest-runner-lease-conflict-recovery", 15_000);

  // Hold the lease with a throwaway runner so the real runner hits a conflict first.
  const staleRegistration = {
    runId: seeded.runId,
    attemptId: "attempt_stale_claim",
    backendProfile: "codex",
    placement: "kubernetes-job",
    sourceCommit: "self-test",
    id: "runner_stale_claim",
  };
  const staleRunner = await client.post("/api/v1/runners/register", staleRegistration) as JsonRecord;
  await client.post(`/api/v1/runs/${seeded.runId}/claim`, { runnerId: staleRunner.id, leaseMs: 300 });

  const result = await runOnce({
    managerUrl,
    runId: seeded.runId,
    commandId: seeded.commandId,
    runnerId: "runner_recovered_claim",
    attemptId: "attempt_recovered_claim",
    leaseMs: 1_000,
    claimRetryTimeoutMs: 2_000,
    claimRetryIntervalMs: 50,
    codexCommand: context.fakeCodexCommand,
    codexArgs: context.fakeCodexArgs,
    codexHome: context.codexHome,
    env: { CODEX_HOME: context.codexHome },
    oneShot: true,
  }) as JsonRecord;
  assert.equal(result.terminalStatus, "completed", "replacement runner should claim after stale lease expiry and finish the pending command");

  const run = await client.get(`/api/v1/runs/${seeded.runId}`) as JsonRecord;
  assert.equal(run.claimedBy, "runner_recovered_claim");

  const events = await client.get(`/api/v1/runs/${seeded.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
  // Yields undefined (not false) when `items` is missing, which also fails the assertions below.
  const sawStatusPhase = (phase: string): boolean | undefined =>
    events.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === phase);
  assert.equal(sawStatusPhase("runner-claim-waiting-for-stale-lease"), true, "lease conflict recovery should be visible while waiting");
  assert.equal(sawStatusPhase("runner-claim-recovered"), true, "lease conflict recovery should be visible after claim succeeds");

  assertNoSecretLeak({ result, run, events });
}
|
||||
|
||||
async function runSlowProgressIdleCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
|
||||
const item = await createRunWithCommand(options.client, options.context, "slow progress before terminal", "selftest-slow-progress-idle-refresh", 60);
|
||||
const result = await runOnce({
|
||||
|
||||
Reference in New Issue
Block a user