fix(runner): protect active session runners
This commit is contained in:
@@ -1,7 +1,7 @@
|
|||||||
import { spawn } from "node:child_process";
|
import { spawn } from "node:child_process";
|
||||||
import { AgentRunError } from "../common/errors.js";
|
import { AgentRunError } from "../common/errors.js";
|
||||||
import { redactJson, redactText } from "../common/redaction.js";
|
import { redactJson, redactText } from "../common/redaction.js";
|
||||||
import { isTerminalCommandState, isTerminalRunStatus, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js";
|
import { isLeaseExpired, isTerminalCommandState, isTerminalRunStatus, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js";
|
||||||
import type { AgentRunStore } from "./store.js";
|
import type { AgentRunStore } from "./store.js";
|
||||||
import type { ExecutionPolicy, JsonRecord } from "../common/types.js";
|
import type { ExecutionPolicy, JsonRecord } from "../common/types.js";
|
||||||
import { stableHash, validateEnvName } from "../common/validation.js";
|
import { stableHash, validateEnvName } from "../common/validation.js";
|
||||||
@@ -133,6 +133,26 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
|
|||||||
}
|
}
|
||||||
if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${run.id} is already terminal: ${run.status}`, { httpStatus: 409 });
|
if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${run.id} is already terminal: ${run.status}`, { httpStatus: 409 });
|
||||||
if (isTerminalCommandState(command.state) || command.state !== "pending") throw new AgentRunError(command.state === "cancelled" ? "cancelled" : "schema-invalid", `command ${commandId} is not pending: ${command.state}`, { httpStatus: 409 });
|
if (isTerminalCommandState(command.state) || command.state !== "pending") throw new AgentRunError(command.state === "cancelled" ? "cancelled" : "schema-invalid", `command ${commandId} is not pending: ${command.state}`, { httpStatus: 409 });
|
||||||
|
if (run.claimedBy && !isLeaseExpired(run.leaseExpiresAt)) {
|
||||||
|
const response = {
|
||||||
|
action: "reuse-active-runner",
|
||||||
|
mutation: false,
|
||||||
|
runId: run.id,
|
||||||
|
commandId,
|
||||||
|
runnerId: run.claimedBy,
|
||||||
|
leaseExpiresAt: run.leaseExpiresAt,
|
||||||
|
reason: "fresh-active-runner-lease",
|
||||||
|
valuesPrinted: false,
|
||||||
|
} satisfies JsonRecord;
|
||||||
|
await options.store.appendEvent(run.id, "backend_status", {
|
||||||
|
phase: "runner-job-skipped-active-runner",
|
||||||
|
commandId,
|
||||||
|
runnerId: run.claimedBy,
|
||||||
|
leaseExpiresAt: run.leaseExpiresAt,
|
||||||
|
reason: "fresh-active-runner-lease",
|
||||||
|
});
|
||||||
|
return response;
|
||||||
|
}
|
||||||
let preCreateRetention: RunnerRetentionSummary | null = null;
|
let preCreateRetention: RunnerRetentionSummary | null = null;
|
||||||
if (options.defaults.retention) {
|
if (options.defaults.retention) {
|
||||||
preCreateRetention = await enforceRunnerRetentionBeforeCreate({ store: options.store, options: { ...options.defaults.retention, namespace }, incomingRunnerCount: 1 });
|
preCreateRetention = await enforceRunnerRetentionBeforeCreate({ store: options.store, options: { ...options.defaults.retention, namespace }, incomingRunnerCount: 1 });
|
||||||
|
|||||||
@@ -235,16 +235,24 @@ async function podEntry(store: AgentRunStore, item: K8sObject, options: RunnerRe
|
|||||||
async function activityFor(store: AgentRunStore, input: { runId: string | null; commandId: string | null; runnerId: string | null; terminal: boolean; createdAtMs: number; activeHeartbeatMaxAgeMs: number }): Promise<Pick<RunnerResourceEntry, "runId" | "commandId" | "runnerId" | "protectedActive" | "protectedReason" | "candidateKind" | "lastActiveAtMs">> {
|
async function activityFor(store: AgentRunStore, input: { runId: string | null; commandId: string | null; runnerId: string | null; terminal: boolean; createdAtMs: number; activeHeartbeatMaxAgeMs: number }): Promise<Pick<RunnerResourceEntry, "runId" | "commandId" | "runnerId" | "protectedActive" | "protectedReason" | "candidateKind" | "lastActiveAtMs">> {
|
||||||
let run: RunRecord | null = null;
|
let run: RunRecord | null = null;
|
||||||
let command: CommandRecord | null = null;
|
let command: CommandRecord | null = null;
|
||||||
|
let openCommand: CommandRecord | null = null;
|
||||||
if (input.runId) {
|
if (input.runId) {
|
||||||
try { run = await store.getRun(input.runId); } catch { run = null; }
|
try { run = await store.getRun(input.runId); } catch { run = null; }
|
||||||
}
|
}
|
||||||
if (input.commandId) {
|
if (input.commandId) {
|
||||||
try { command = await store.getCommand(input.commandId); } catch { command = null; }
|
try { command = await store.getCommand(input.commandId); } catch { command = null; }
|
||||||
}
|
}
|
||||||
const lastActiveAtMs = Math.max(input.createdAtMs, isoMs(run?.updatedAt), isoMs(command?.updatedAt), isoMs(run?.leaseExpiresAt));
|
if (run && !isTerminalRunStatus(run.status) && !input.terminal) {
|
||||||
if (run && command && !isTerminalRunStatus(run.status) && !isTerminalCommandState(command.state) && !input.terminal) {
|
try {
|
||||||
|
openCommand = (await store.listCommands(run.id, 0, 100)).find((item) => !isTerminalCommandState(item.state)) ?? null;
|
||||||
|
} catch {
|
||||||
|
openCommand = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const lastActiveAtMs = Math.max(input.createdAtMs, isoMs(run?.updatedAt), isoMs(command?.updatedAt), isoMs(openCommand?.updatedAt), isoMs(run?.leaseExpiresAt));
|
||||||
|
if (run && !isTerminalRunStatus(run.status) && !input.terminal && (openCommand || (command && !isTerminalCommandState(command.state)))) {
|
||||||
const freshClaim = input.runnerId && run.claimedBy === input.runnerId && heartbeatFresh(run.leaseExpiresAt, input.activeHeartbeatMaxAgeMs);
|
const freshClaim = input.runnerId && run.claimedBy === input.runnerId && heartbeatFresh(run.leaseExpiresAt, input.activeHeartbeatMaxAgeMs);
|
||||||
return { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, protectedActive: true, protectedReason: freshClaim ? "fresh-active-run" : "nonterminal-runner-resource", candidateKind: "inactive", lastActiveAtMs };
|
return { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, protectedActive: true, protectedReason: freshClaim ? "fresh-active-run" : openCommand ? "nonterminal-run-open-command" : "nonterminal-runner-resource", candidateKind: "inactive", lastActiveAtMs };
|
||||||
}
|
}
|
||||||
const candidateKind: CandidateKind = command && isTerminalCommandState(command.state) && !input.terminal ? "idle" : input.terminal || (run !== null && isTerminalRunStatus(run.status)) ? "terminal" : "inactive";
|
const candidateKind: CandidateKind = command && isTerminalCommandState(command.state) && !input.terminal ? "idle" : input.terminal || (run !== null && isTerminalRunStatus(run.status)) ? "terminal" : "inactive";
|
||||||
return { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, protectedActive: false, protectedReason: null, candidateKind, lastActiveAtMs };
|
return { runId: input.runId, commandId: input.commandId, runnerId: input.runnerId, protectedActive: false, protectedReason: null, candidateKind, lastActiveAtMs };
|
||||||
|
|||||||
Reference in New Issue
Block a user