fix: retry transient backend timeouts

This commit is contained in:
AgentRun Codex
2026-06-22 07:56:40 +08:00
parent ccfbf2a866
commit a393e49ed0
5 changed files with 129 additions and 5 deletions
+18
View File
@@ -50,6 +50,9 @@ export interface RunnerJobDefaults {
lane?: string;
runnerIdleTimeoutMs?: number;
missingTerminalAfterToolTimeoutMs?: number;
backendRetryMaxAttempts?: number;
backendRetryInitialBackoffMs?: number;
backendRetryMaxBackoffMs?: number;
kubectlCommand?: string;
unideskSshEndpointEnv?: JsonRecord;
retention?: RunnerRetentionOptions;
@@ -66,6 +69,9 @@ export interface CreateRunnerJobInput extends JsonRecord {
serviceAccountName?: string;
runnerIdleTimeoutMs?: number;
missingTerminalAfterToolTimeoutMs?: number;
backendRetryMaxAttempts?: number;
backendRetryInitialBackoffMs?: number;
backendRetryMaxBackoffMs?: number;
idempotencyKey?: string;
imageRef?: JsonRecord;
transientEnv?: JsonRecord[];
@@ -98,6 +104,9 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
const runnerId = optionalString(options.input.runnerId);
const runnerIdleTimeoutMs = optionalPositiveInteger(options.input.runnerIdleTimeoutMs, "runnerIdleTimeoutMs") ?? options.defaults.runnerIdleTimeoutMs;
const missingTerminalAfterToolTimeoutMs = optionalPositiveInteger(options.input.missingTerminalAfterToolTimeoutMs, "missingTerminalAfterToolTimeoutMs") ?? options.defaults.missingTerminalAfterToolTimeoutMs;
const backendRetryMaxAttempts = optionalPositiveInteger(options.input.backendRetryMaxAttempts, "backendRetryMaxAttempts") ?? options.defaults.backendRetryMaxAttempts;
const backendRetryInitialBackoffMs = optionalPositiveInteger(options.input.backendRetryInitialBackoffMs, "backendRetryInitialBackoffMs") ?? options.defaults.backendRetryInitialBackoffMs;
const backendRetryMaxBackoffMs = optionalPositiveInteger(options.input.backendRetryMaxBackoffMs, "backendRetryMaxBackoffMs") ?? options.defaults.backendRetryMaxBackoffMs;
const transientEnvSecretName = transientEnv.length > 0 ? transientEnvSecretNameForRun(run.id, commandId, attemptId, jobNamePrefix) : null;
const renderTransientEnv = transientEnvSecretName ? transientEnvWithSecretRefs(transientEnv, transientEnvSecretName) : transientEnv;
const normalizedPayload = {
@@ -112,6 +121,9 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
runnerId: optionalString(options.input.runnerId) ?? null,
runnerIdleTimeoutMs: runnerIdleTimeoutMs ?? null,
missingTerminalAfterToolTimeoutMs: missingTerminalAfterToolTimeoutMs ?? null,
backendRetryMaxAttempts: backendRetryMaxAttempts ?? null,
backendRetryInitialBackoffMs: backendRetryInitialBackoffMs ?? null,
backendRetryMaxBackoffMs: backendRetryMaxBackoffMs ?? null,
transientEnv: transientEnv.map((item) => ({ name: item.name, valueHash: stableHash(item.value), sensitive: true })),
};
const payloadHash = stableHash(normalizedPayload);
@@ -174,6 +186,9 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
transientEnv: renderTransientEnv,
...(runnerIdleTimeoutMs !== undefined ? { runnerIdleTimeoutMs } : {}),
...(missingTerminalAfterToolTimeoutMs !== undefined ? { missingTerminalAfterToolTimeoutMs } : {}),
...(backendRetryMaxAttempts !== undefined ? { backendRetryMaxAttempts } : {}),
...(backendRetryInitialBackoffMs !== undefined ? { backendRetryInitialBackoffMs } : {}),
...(backendRetryMaxBackoffMs !== undefined ? { backendRetryMaxBackoffMs } : {}),
...(serviceAccountName ? { serviceAccountName } : {}),
...(jobNamePrefix ? { jobNamePrefix } : {}),
...(lane ? { lane } : {}),
@@ -244,6 +259,9 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
ttlSecondsAfterFinished: render.ttlSecondsAfterFinished,
ttlPolicy: render.ttlPolicy,
runnerIdleTimeoutMs: render.runnerIdleTimeoutMs,
backendRetryMaxAttempts: render.backendRetryMaxAttempts,
backendRetryInitialBackoffMs: render.backendRetryInitialBackoffMs,
backendRetryMaxBackoffMs: render.backendRetryMaxBackoffMs,
preCreateCleanup: preCreateRetention,
},
pollActions: [
+7
View File
@@ -57,6 +57,9 @@ function runnerJobDefaultsForRequest(defaults: ManagerServerOptions["runnerJobDe
lane,
...(defaults?.runnerIdleTimeoutMs !== undefined ? { runnerIdleTimeoutMs: defaults.runnerIdleTimeoutMs } : optionalPositiveIntegerRecord("runnerIdleTimeoutMs", process.env.AGENTRUN_RUNNER_IDLE_TIMEOUT_MS)),
...(defaults?.missingTerminalAfterToolTimeoutMs !== undefined ? { missingTerminalAfterToolTimeoutMs: defaults.missingTerminalAfterToolTimeoutMs } : optionalPositiveIntegerRecord("missingTerminalAfterToolTimeoutMs", process.env.AGENTRUN_RUNNER_MISSING_TERMINAL_AFTER_TOOL_TIMEOUT_MS)),
...(defaults?.backendRetryMaxAttempts !== undefined ? { backendRetryMaxAttempts: defaults.backendRetryMaxAttempts } : optionalPositiveIntegerRecord("backendRetryMaxAttempts", process.env.AGENTRUN_BACKEND_RETRY_MAX_ATTEMPTS)),
...(defaults?.backendRetryInitialBackoffMs !== undefined ? { backendRetryInitialBackoffMs: defaults.backendRetryInitialBackoffMs } : optionalPositiveIntegerRecord("backendRetryInitialBackoffMs", process.env.AGENTRUN_BACKEND_RETRY_INITIAL_BACKOFF_MS)),
...(defaults?.backendRetryMaxBackoffMs !== undefined ? { backendRetryMaxBackoffMs: defaults.backendRetryMaxBackoffMs } : optionalPositiveIntegerRecord("backendRetryMaxBackoffMs", process.env.AGENTRUN_BACKEND_RETRY_MAX_BACKOFF_MS)),
...(defaults?.kubectlCommand ? { kubectlCommand: defaults.kubectlCommand } : {}),
...(defaults?.unideskSshEndpointEnv ? { unideskSshEndpointEnv: defaults.unideskSshEndpointEnv } : {}),
...(retention ? { retention } : {}),
@@ -111,6 +114,10 @@ export interface ManagerServerOptions {
jobNamePrefix?: string;
lane?: string;
runnerIdleTimeoutMs?: number;
missingTerminalAfterToolTimeoutMs?: number;
backendRetryMaxAttempts?: number;
backendRetryInitialBackoffMs?: number;
backendRetryMaxBackoffMs?: number;
kubectlCommand?: string;
unideskSshEndpointEnv?: JsonRecord;
retention?: RunnerRetentionOptions;
+25 -4
View File
@@ -59,6 +59,9 @@ export interface RunnerJobRenderOptions {
ttlSecondsAfterFinished?: number;
runnerIdleTimeoutMs?: number;
missingTerminalAfterToolTimeoutMs?: number;
backendRetryMaxAttempts?: number;
backendRetryInitialBackoffMs?: number;
backendRetryMaxBackoffMs?: number;
transientEnv?: RunnerTransientEnv[];
sessionPvc?: RunnerSessionPvcOptions;
dryRun?: boolean;
@@ -153,7 +156,7 @@ export function renderRunnerJobDryRun(options: RunnerJobRenderOptions): JsonReco
};
}
export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { manifest: JsonRecord; namespace: string; jobName: string; runnerJobId: string; runnerId: string; attemptId: string; sourceCommit: string; serviceAccountName: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; warnings: string[]; ttlSecondsAfterFinished: number; ttlPolicy: JsonRecord; runnerIdleTimeoutMs: number; missingTerminalAfterToolTimeoutMs: number } {
export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { manifest: JsonRecord; namespace: string; jobName: string; runnerJobId: string; runnerId: string; attemptId: string; sourceCommit: string; serviceAccountName: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; warnings: string[]; ttlSecondsAfterFinished: number; ttlPolicy: JsonRecord; runnerIdleTimeoutMs: number; missingTerminalAfterToolTimeoutMs: number; backendRetryMaxAttempts: number; backendRetryInitialBackoffMs: number; backendRetryMaxBackoffMs: number } {
const namespace = options.namespace ?? "agentrun-v01";
const attemptId = options.attemptId ?? `attempt_${Date.now().toString(36)}`;
const runnerId = options.runnerId ?? `runner_${shortHash(`${options.run.id}:${attemptId}:${options.commandId}`)}`;
@@ -167,12 +170,15 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani
const ttlPolicy = terminalArtifactTtlPolicy(ttlSecondsAfterFinished);
const runnerIdleTimeoutMs = normalizeRunnerIdleTimeoutMs(options.runnerIdleTimeoutMs);
const missingTerminalAfterToolTimeoutMs = normalizeMissingTerminalAfterToolTimeoutMs(options.missingTerminalAfterToolTimeoutMs, runnerIdleTimeoutMs);
const backendRetryMaxAttempts = normalizeBackendRetryMaxAttempts(options.backendRetryMaxAttempts);
const backendRetryInitialBackoffMs = normalizeBackendRetryBackoffMs(options.backendRetryInitialBackoffMs, "backendRetryInitialBackoffMs", 1000);
const backendRetryMaxBackoffMs = normalizeBackendRetryBackoffMs(options.backendRetryMaxBackoffMs, "backendRetryMaxBackoffMs", 30000);
const jobName = `${jobNamePrefix}-${shortDnsHash(options.run.id, attemptId)}`;
const secretRefs = credentialProjections(options.run, namespace);
const toolCredentials = toolCredentialProjections(options.run, namespace);
const sessionPvc = options.sessionPvc;
if (secretRefs.length === 0) warnings.push("run executionPolicy.secretScope 未声明 provider SecretRefrunner 将按 secret-unavailable 上报,而不会降级直连外部凭据");
const env = runnerEnv(options, { namespace, jobName, runnerJobId, runnerId, attemptId, sourceCommit, secretRefs, toolCredentials, sessionPvc, runnerIdleTimeoutMs, missingTerminalAfterToolTimeoutMs });
const env = runnerEnv(options, { namespace, jobName, runnerJobId, runnerId, attemptId, sourceCommit, secretRefs, toolCredentials, sessionPvc, runnerIdleTimeoutMs, missingTerminalAfterToolTimeoutMs, backendRetryMaxAttempts, backendRetryInitialBackoffMs, backendRetryMaxBackoffMs });
const manifest: JsonRecord = {
apiVersion: "batch/v1",
kind: "Job",
@@ -239,10 +245,10 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani
},
},
};
return { manifest, namespace, jobName, runnerJobId, runnerId, attemptId, sourceCommit, serviceAccountName, secretRefs, toolCredentials, warnings, ttlSecondsAfterFinished, ttlPolicy, runnerIdleTimeoutMs, missingTerminalAfterToolTimeoutMs };
return { manifest, namespace, jobName, runnerJobId, runnerId, attemptId, sourceCommit, serviceAccountName, secretRefs, toolCredentials, warnings, ttlSecondsAfterFinished, ttlPolicy, runnerIdleTimeoutMs, missingTerminalAfterToolTimeoutMs, backendRetryMaxAttempts, backendRetryInitialBackoffMs, backendRetryMaxBackoffMs };
}
function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string; jobName: string; runnerJobId: string; runnerId: string; attemptId: string; sourceCommit: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; sessionPvc: RunnerSessionPvcOptions | undefined; runnerIdleTimeoutMs: number; missingTerminalAfterToolTimeoutMs: number }): JsonRecord[] {
function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string; jobName: string; runnerJobId: string; runnerId: string; attemptId: string; sourceCommit: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; sessionPvc: RunnerSessionPvcOptions | undefined; runnerIdleTimeoutMs: number; missingTerminalAfterToolTimeoutMs: number; backendRetryMaxAttempts: number; backendRetryInitialBackoffMs: number; backendRetryMaxBackoffMs: number }): JsonRecord[] {
const selectedSecret = context.secretRefs.find((item) => item.profile === options.run.backendProfile);
const codexHome = selectedSecret?.runtimeMountPath ?? defaultRuntimeHome(options.run.backendProfile);
const bootRepoUrl = optionalString(options.bootRepoUrl) ?? defaultBootRepoUrl;
@@ -273,6 +279,9 @@ function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string
{ name: "AGENTRUN_PROJECT_DEPENDENCY_POLICY", value: "explicit-cache-or-derived-image-only" },
{ name: "AGENTRUN_RUNNER_IDLE_TIMEOUT_MS", value: String(context.runnerIdleTimeoutMs) },
{ name: "AGENTRUN_CODEX_MISSING_TERMINAL_AFTER_TOOL_TIMEOUT_MS", value: String(context.missingTerminalAfterToolTimeoutMs) },
{ name: "AGENTRUN_BACKEND_RETRY_MAX_ATTEMPTS", value: String(context.backendRetryMaxAttempts) },
{ name: "AGENTRUN_BACKEND_RETRY_INITIAL_BACKOFF_MS", value: String(context.backendRetryInitialBackoffMs) },
{ name: "AGENTRUN_BACKEND_RETRY_MAX_BACKOFF_MS", value: String(context.backendRetryMaxBackoffMs) },
{ name: "AGENTRUN_RUNNER_POLL_INTERVAL_MS", value: "250" },
{ name: "HOME", value: "/home/agentrun" },
{ name: "CODEX_HOME", value: codexHome },
@@ -305,6 +314,18 @@ function normalizeMissingTerminalAfterToolTimeoutMs(value: number | undefined, r
return value;
}
/**
 * Validates the requested number of backend attempts for a runner job.
 *
 * @param value - Requested attempt count; `undefined` means "not configured".
 * @returns `1` when unset, otherwise the value capped at 10.
 * @throws Error when the value is not a positive integer.
 */
function normalizeBackendRetryMaxAttempts(value: number | undefined): number {
  const attemptCeiling = 10;
  if (value === undefined) return 1;
  const isPositiveInteger = Number.isInteger(value) && value > 0;
  if (!isPositiveInteger) {
    throw new Error("backendRetryMaxAttempts must be a positive integer");
  }
  // Large requests are silently capped rather than rejected.
  return value > attemptCeiling ? attemptCeiling : value;
}
/**
 * Validates a backoff duration (in milliseconds) for backend retries.
 *
 * @param value - Requested duration; `undefined` means "not configured".
 * @param field - Option name used in the error message.
 * @param fallback - Value returned when `value` is `undefined`.
 * @returns The validated duration, or `fallback` when unset.
 * @throws Error when the value is not a positive integer.
 */
function normalizeBackendRetryBackoffMs(value: number | undefined, field: string, fallback: number): number {
  if (value !== undefined) {
    if (Number.isInteger(value) && value > 0) return value;
    throw new Error(`${field} must be a positive integer`);
  }
  return fallback;
}
function normalizeTtlSecondsAfterFinished(value: number | undefined, warnings: string[]): number {
if (value === undefined) return minimumTerminalArtifactTtlSeconds;
if (!Number.isInteger(value) || value <= 0) throw new Error("ttlSecondsAfterFinished must be a positive integer");
+3
View File
@@ -37,6 +37,9 @@ if (process.env.AGENTRUN_RUNNER_IDLE_TIMEOUT_MS) options.idleTimeoutMs = Number(
if (process.env.AGENTRUN_RUNNER_POLL_INTERVAL_MS) options.pollIntervalMs = Number(process.env.AGENTRUN_RUNNER_POLL_INTERVAL_MS);
if (process.env.AGENTRUN_RUNNER_CLAIM_RETRY_TIMEOUT_MS) options.claimRetryTimeoutMs = Number(process.env.AGENTRUN_RUNNER_CLAIM_RETRY_TIMEOUT_MS);
if (process.env.AGENTRUN_RUNNER_CLAIM_RETRY_INTERVAL_MS) options.claimRetryIntervalMs = Number(process.env.AGENTRUN_RUNNER_CLAIM_RETRY_INTERVAL_MS);
if (process.env.AGENTRUN_BACKEND_RETRY_MAX_ATTEMPTS) options.backendRetryMaxAttempts = Number(process.env.AGENTRUN_BACKEND_RETRY_MAX_ATTEMPTS);
if (process.env.AGENTRUN_BACKEND_RETRY_INITIAL_BACKOFF_MS) options.backendRetryInitialBackoffMs = Number(process.env.AGENTRUN_BACKEND_RETRY_INITIAL_BACKOFF_MS);
if (process.env.AGENTRUN_BACKEND_RETRY_MAX_BACKOFF_MS) options.backendRetryMaxBackoffMs = Number(process.env.AGENTRUN_BACKEND_RETRY_MAX_BACKOFF_MS);
if (process.env.AGENTRUN_RUNNER_ONE_SHOT === "1" || process.env.AGENTRUN_RUNNER_ONE_SHOT === "true") options.oneShot = true;
try {
const result = await runOnce(options);
+76 -1
View File
@@ -26,6 +26,9 @@ export interface RunnerOnceOptions extends BackendAdapterOptions {
pollIntervalMs?: number;
claimRetryTimeoutMs?: number;
claimRetryIntervalMs?: number;
backendRetryMaxAttempts?: number;
backendRetryInitialBackoffMs?: number;
backendRetryMaxBackoffMs?: number;
oneShot?: boolean;
}
@@ -50,6 +53,12 @@ interface CommandTerminalReport {
turnId?: string;
}
/** Resolved retry settings the runner applies when a backend turn fails with a retryable failure kind. */
interface BackendRetryPolicy {
/** Total number of backend attempts allowed, counting the first one; a retry happens only while the current attempt number is below this. */
maxAttempts: number;
/** Delay in milliseconds before the first retry; it is doubled for each further retry. */
initialBackoffMs: number;
/** Upper bound in milliseconds on the delay between attempts. */
maxBackoffMs: number;
}
/** Destination for structured runner log entries. */
interface RunnerLogSink {
/** Asynchronously records `payload` under `label` (for example "command.retrying"). */
write(label: string, payload: JsonRecord): Promise<void>;
}
@@ -285,8 +294,42 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions,
};
},
};
const result = backendSession ? await backendSession.runTurn(latestRun, command, backendOptions) : await runBackendTurn(latestRun, command, backendOptions);
const retryPolicy = backendRetryPolicy(latestRun.executionPolicy, options.env ?? process.env, options);
let backendAttempt = 1;
let result = backendSession ? await backendSession.runTurn(latestRun, command, backendOptions) : await runBackendTurn(latestRun, command, backendOptions);
while (shouldRetryBackendTurn(result, retryPolicy, backendAttempt)) {
const retryDelayMs = backendRetryDelayMs(retryPolicy, backendAttempt);
const retryPayload = {
phase: "runner:backend-retry",
commandId: command.id,
attemptId,
runnerId: runner.id,
failureKind: result.failureKind,
message: result.failureMessage ?? `${result.failureKind} retryable backend failure`,
retryable: true,
willRetry: true,
retryAttempt: backendAttempt,
retryNextAttempt: backendAttempt + 1,
retryMaxAttempts: retryPolicy.maxAttempts,
retryDelayMs,
retryExhausted: false,
...(options.runnerJobId ? { runnerJobId: options.runnerJobId } : {}),
};
await appendBestEffort(api, options.runId, { type: "error", payload: retryPayload });
await appendBestEffort(api, options.runId, { type: "backend_status", payload: retryPayload });
await runnerLog.write("command.retrying", { runId: options.runId, commandId: command.id, runnerId: runner.id, runnerJobId: options.runnerJobId ?? null, ...retryPayload, valuesPrinted: false });
if (backendSession) {
const closeEvents = await backendSession.close();
for (const event of closeEvents) await appendBestEffort(api, options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id));
}
await sleep(retryDelayMs);
backendAttempt += 1;
result = backendSession ? await backendSession.runTurn(latestRun, command, backendOptions) : await runBackendTurn(latestRun, command, backendOptions);
}
for (const event of result.events) if (shouldKeepTerminalOutboxEvent(event)) terminalOutboxEvents.push(event);
if (isRetryableBackendFailure(result.failureKind) && retryPolicy.maxAttempts > 1 && backendAttempt >= retryPolicy.maxAttempts && result.terminalStatus !== "completed") {
terminalOutboxEvents.push({ type: "error", payload: { phase: "runner:backend-retry-exhausted", commandId: command.id, attemptId, runnerId: runner.id, failureKind: result.failureKind, message: result.failureMessage ?? `${result.failureKind} retry attempts exhausted`, retryable: true, willRetry: false, retryAttempt: backendAttempt, retryMaxAttempts: retryPolicy.maxAttempts, retryExhausted: true, ...(options.runnerJobId ? { runnerJobId: options.runnerJobId } : {}) } });
}
const report: CommandTerminalReport = { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage, ...(result.threadId ? { threadId: result.threadId } : {}), ...(result.turnId ? { turnId: result.turnId } : {}) };
await reportTerminalCommand(api, {
runId: options.runId,
@@ -314,6 +357,38 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions,
}
}
/**
 * Resolves the effective backend retry policy for a command.
 *
 * Each field is looked up in this order, first usable value wins:
 *   1. the run's execution policy (`backendRetry`, else `retryPolicy`),
 *   2. the `AGENTRUN_BACKEND_RETRY_*` environment variables,
 *   3. the runner options,
 *   4. built-in defaults (1 attempt, 1000 ms initial backoff, 30000 ms max backoff).
 *
 * @param policy - Execution policy of the run being processed.
 * @param env - Environment to read the `AGENTRUN_BACKEND_RETRY_*` variables from.
 * @param options - Runner options carrying optional retry overrides.
 * @returns A policy with `maxAttempts` in 1..10 and both backoff values at least 100 ms.
 */
function backendRetryPolicy(policy: RunRecord["executionPolicy"], env: NodeJS.ProcessEnv, options: RunnerOnceOptions): BackendRetryPolicy {
  const configured = jsonRecordValue(policy.backendRetry) ?? jsonRecordValue(policy.retryPolicy);
  // Runner options are sanitized like every other source: the CLI fills them via
  // Number(process.env...), which can produce NaN or a fractional number, and NaN
  // would otherwise survive Math.max/Math.min below and poison the whole policy.
  const optionMaxAttempts = positiveInteger(options.backendRetryMaxAttempts, 1);
  const optionInitialBackoffMs = positiveInteger(options.backendRetryInitialBackoffMs, 1_000);
  const optionMaxBackoffMs = positiveInteger(options.backendRetryMaxBackoffMs, 30_000);
  const maxAttempts = positiveInteger(configured?.maxAttempts, positiveInteger(env.AGENTRUN_BACKEND_RETRY_MAX_ATTEMPTS, optionMaxAttempts));
  const initialBackoffMs = positiveInteger(configured?.initialBackoffMs, positiveInteger(env.AGENTRUN_BACKEND_RETRY_INITIAL_BACKOFF_MS, optionInitialBackoffMs));
  const maxBackoffMs = positiveInteger(configured?.maxBackoffMs, positiveInteger(env.AGENTRUN_BACKEND_RETRY_MAX_BACKOFF_MS, optionMaxBackoffMs));
  return {
    // Hard ceiling of 10 attempts regardless of where the value came from.
    maxAttempts: Math.max(1, Math.min(10, maxAttempts)),
    // Backoffs below 100 ms are raised to 100 ms.
    initialBackoffMs: Math.max(100, initialBackoffMs),
    maxBackoffMs: Math.max(100, maxBackoffMs),
  };
}
/**
 * Decides whether a finished backend turn should be attempted again.
 *
 * @param result - Terminal status and failure kind of the turn that just finished.
 * @param policy - Active retry policy.
 * @param backendAttempt - 1-based number of the attempt that just finished.
 * @returns `true` only for a non-completed turn with attempts remaining and a retryable failure kind.
 */
function shouldRetryBackendTurn(result: { terminalStatus: TerminalStatus; failureKind: FailureKind | null }, policy: BackendRetryPolicy, backendAttempt: number): boolean {
  if (result.terminalStatus === "completed") return false;
  // Written as a negated "<" so a non-numeric limit still means "do not retry".
  const attemptsRemain = backendAttempt < policy.maxAttempts;
  if (!attemptsRemain) return false;
  return isRetryableBackendFailure(result.failureKind);
}
/**
 * Tells whether a failure kind belongs to the set of backend failures that may be retried.
 *
 * @param failureKind - Failure kind reported by the backend turn, or `null` when none.
 * @returns `true` for timeout, stream-disconnect, unavailable, rate-limited and HTTP-error kinds.
 */
function isRetryableBackendFailure(failureKind: FailureKind | null): boolean {
  switch (failureKind) {
    case "backend-timeout":
    case "provider-stream-disconnected":
    case "provider-unavailable":
    case "provider-rate-limited":
    case "provider-http-error":
      return true;
    default:
      return false;
  }
}
/**
 * Computes the wait before the next backend attempt using capped exponential backoff.
 *
 * @param policy - Active retry policy.
 * @param backendAttempt - 1-based number of the attempt that just failed.
 * @returns `initialBackoffMs` doubled once per attempt after the first, capped at `maxBackoffMs`.
 */
function backendRetryDelayMs(policy: BackendRetryPolicy, backendAttempt: number): number {
  const { initialBackoffMs, maxBackoffMs } = policy;
  // Attempt 1 (and anything below it) waits exactly the initial backoff.
  const doublings = Math.max(0, backendAttempt - 1);
  const uncappedDelay = initialBackoffMs * Math.pow(2, doublings);
  return Math.min(uncappedDelay, maxBackoffMs);
}
/**
 * Narrows an arbitrary value to a plain JSON-style record.
 *
 * @param value - Anything, typically a field read from loosely typed configuration.
 * @returns The same reference when `value` is a non-null, non-array object; otherwise `null`.
 */
function jsonRecordValue(value: unknown): JsonRecord | null {
  if (!value) return null;
  if (typeof value !== "object") return null;
  if (Array.isArray(value)) return null;
  return value as JsonRecord;
}
/**
 * Coerces a loosely typed setting into an integer of at least 1.
 *
 * Numbers and non-blank numeric strings are accepted. Anything else, or any
 * non-finite result, yields `fallback` unchanged. Finite values are floored
 * and then raised to 1 if they fall below it.
 *
 * @param value - Raw setting (number, string, or anything else).
 * @param fallback - Returned as-is when `value` cannot be read as a finite number.
 * @returns The coerced integer, or `fallback`.
 */
function positiveInteger(value: unknown, fallback: number): number {
  let candidate = Number.NaN;
  if (typeof value === "number") {
    candidate = value;
  } else if (typeof value === "string" && value.trim() !== "") {
    candidate = Number(value);
  }
  if (!Number.isFinite(candidate)) return fallback;
  const whole = Math.floor(candidate);
  return whole < 1 ? 1 : whole;
}
function startActiveTurnCommandWatch(api: RunnerManagerApi, runId: string, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl, pollIntervalMs: number | undefined, runnerLog: RunnerLogSink, runnerJobId: string | undefined): () => void {
let stopped = false;
let polling = false;