From 174307dc0cc8089692db0d275cd1331043dabac4 Mon Sep 17 00:00:00 2001 From: lyon Date: Mon, 22 Jun 2026 12:42:09 +0800 Subject: [PATCH] fix(runner): protect active session runners --- src/mgr/kubernetes-runner-job.ts | 22 +++++++++++++++++++++- src/mgr/runner-retention.ts | 14 +++++++++++--- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/src/mgr/kubernetes-runner-job.ts b/src/mgr/kubernetes-runner-job.ts index d70797c..b2d0260 100644 --- a/src/mgr/kubernetes-runner-job.ts +++ b/src/mgr/kubernetes-runner-job.ts @@ -1,7 +1,7 @@ import { spawn } from "node:child_process"; import { AgentRunError } from "../common/errors.js"; import { redactJson, redactText } from "../common/redaction.js"; -import { isTerminalCommandState, isTerminalRunStatus, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js"; +import { isLeaseExpired, isTerminalCommandState, isTerminalRunStatus, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js"; import type { AgentRunStore } from "./store.js"; import type { ExecutionPolicy, JsonRecord } from "../common/types.js"; import { stableHash, validateEnvName } from "../common/validation.js"; @@ -133,6 +133,26 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore; } if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${run.id} is already terminal: ${run.status}`, { httpStatus: 409 }); if (isTerminalCommandState(command.state) || command.state !== "pending") throw new AgentRunError(command.state === "cancelled" ? "cancelled" : "schema-invalid", `command ${commandId} is not pending: ${command.state}`, { httpStatus: 409 }); + if (run.claimedBy && !isLeaseExpired(run.leaseExpiresAt)) { + const response = { + action: "reuse-active-runner", + mutation: false, + runId: run.id, + commandId, + runnerId: run.claimedBy, + leaseExpiresAt: run.leaseExpiresAt, + reason: "fresh-active-runner-lease", + valuesPrinted: false, + } satisfies JsonRecord; + await options.store.appendEvent(run.id, "backend_status", { + phase: "runner-job-skipped-active-runner", + commandId, + runnerId: run.claimedBy, + leaseExpiresAt: run.leaseExpiresAt, + reason: "fresh-active-runner-lease", + }); + return response; + } let preCreateRetention: RunnerRetentionSummary | null = null; if (options.defaults.retention) { preCreateRetention = await enforceRunnerRetentionBeforeCreate({ store: options.store, options: { ...options.defaults.retention, namespace }, incomingRunnerCount: 1 }); diff --git a/src/mgr/runner-retention.ts b/src/mgr/runner-retention.ts index 0e3527c..e983918 100644 --- a/src/mgr/runner-retention.ts +++ b/src/mgr/runner-retention.ts @@ -235,16 +235,24 @@ async function podEntry(store: AgentRunStore, item: K8sObject, options: RunnerRe async function activityFor(store: AgentRunStore, input: { runId: string | null; commandId: string | null; runnerId: string | null; terminal: boolean; createdAtMs: number; activeHeartbeatMaxAgeMs: number }): Promise> { let run: RunRecord | null = null; let command: CommandRecord | null = null; + let openCommand: CommandRecord | null = null; if (input.runId) { try { run = await store.getRun(input.runId); } catch { run = null; } } if (input.commandId) { try { command = await store.getCommand(input.commandId); } catch { command = null; } } - const lastActiveAtMs = Math.max(input.createdAtMs, isoMs(run?.updatedAt), isoMs(command?.updatedAt), isoMs(run?.leaseExpiresAt)); - if (run && command && !isTerminalRunStatus(run.status) && !isTerminalCommandState(command.state) && !input.terminal) { + if (run && !isTerminalRunStatus(run.status) && !input.terminal) { + try { + openCommand = (await store.listCommands(run.id, 0, 100)).find((item) => !isTerminalCommandState(item.state)) ?? null; + } catch { + openCommand = null; + } + } + const lastActiveAtMs = Math.max(input.createdAtMs, isoMs(run?.updatedAt), isoMs(command?.updatedAt), isoMs(openCommand?.updatedAt), isoMs(run?.leaseExpiresAt)); + if (run && !isTerminalRunStatus(run.status) && !input.terminal && (openCommand || (command && !isTerminalCommandState(command.state)))) { const freshClaim = input.runnerId && run.claimedBy === input.runnerId && heartbeatFresh(run.leaseExpiresAt, input.activeHeartbeatMaxAgeMs); - return { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, protectedActive: true, protectedReason: freshClaim ? "fresh-active-run" : "nonterminal-runner-resource", candidateKind: "inactive", lastActiveAtMs }; + return { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, protectedActive: true, protectedReason: freshClaim ? "fresh-active-run" : openCommand ? "nonterminal-run-open-command" : "nonterminal-runner-resource", candidateKind: "inactive", lastActiveAtMs }; } const candidateKind: CandidateKind = command && isTerminalCommandState(command.state) && !input.terminal ? "idle" : input.terminal || (run !== null && isTerminalRunStatus(run.status)) ? "terminal" : "inactive"; return { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, protectedActive: false, protectedReason: null, candidateKind, lastActiveAtMs };