Merge pull request #205 from pikasTech/fix/1720-manager-reconciler

fix: reconcile runner job observations
This commit is contained in:
Lyon
2026-06-20 15:47:18 +08:00
committed by GitHub
7 changed files with 459 additions and 4 deletions
+16
View File
@@ -148,6 +148,7 @@ async function dispatch(args: ParsedArgs): Promise<CliResult> {
if (group === "runner" && command === "job") return renderRunnerJob(args);
if (group === "runner" && command === "jobs") return listRunnerJobs(args);
if (group === "runner" && command === "job-status") return showRunnerJobStatus(args);
if (group === "runner" && command === "reconcile") return reconcileRunnerJobs(args);
throw new AgentRunError("schema-invalid", `unsupported command: ${args.positional.join(" ")}`, { httpStatus: 2 });
}
@@ -1496,6 +1497,20 @@ async function showRunnerJobStatus(args: ParsedArgs): Promise<JsonValue> {
return client(args).get(`/api/v1/runs/${encodeURIComponent(runId)}/runner-jobs/${encodeURIComponent(runnerJobId)}`);
}
/**
 * CLI handler for `runner reconcile`.
 *
 * Builds the request body from the optional `--limit` and `--namespace` flags
 * and either prints the request that would be sent (`--dry-run`) or POSTs it to
 * the manager's reconciler endpoint.
 *
 * @param args Parsed command-line arguments.
 * @returns The dry-run description of the request, or the manager's response.
 * @throws AgentRunError (`schema-invalid`) when `--limit` is not a positive integer.
 */
async function reconcileRunnerJobs(args: ParsedArgs): Promise<JsonValue> {
  const endpoint = "/api/v1/reconciler/runner-jobs";
  const rawLimit = optionalFlag(args, "limit");
  const rawNamespace = optionalFlag(args, "namespace");
  const body: JsonRecord = {};

  if (rawLimit) {
    const limitNumber = Number(rawLimit);
    const isPositiveInteger = Number.isInteger(limitNumber) && limitNumber > 0;
    if (!isPositiveInteger) {
      throw new AgentRunError("schema-invalid", "runner reconcile --limit must be a positive integer", { httpStatus: 2 });
    }
    body.limit = limitNumber;
  }
  if (rawNamespace) {
    body.namespace = rawNamespace;
  }

  // A dry run only echoes the request; nothing is sent to the manager.
  const dryRun = args.flags.get("dry-run") === true;
  if (dryRun) {
    return { action: "runner-job-reconcile", method: "POST", path: endpoint, body, valuesPrinted: false };
  }
  return client(args).post(endpoint, body);
}
async function renderRunnerJob(args: ParsedArgs): Promise<JsonRecord> {
const runId = flag(args, "run-id", "");
const commandId = flag(args, "command-id", "");
@@ -2055,6 +2070,7 @@ function help(args: ParsedArgs, group?: string): JsonRecord {
"runner job --dry-run --run-id <runId> --command-id <commandId> --image <image>",
"runner jobs --run-id <runId> [--command-id <commandId>]",
"runner job-status [runnerJobId] --run-id <runId>",
"runner reconcile [--limit <n>] [--namespace <namespace>] [--dry-run]",
"queue submit --json-stdin|--json-file <task.json> [--idempotency-key <key>] [--dry-run]",
"queue submit --aipod <name> [--prompt-stdin|--prompt-file <file>|--prompt <text>] [--idempotency-key <key>] [--dry-run]",
"queue list [--queue <queue>] [--state <state>] [--cursor <cursor>] [--limit <limit>] [--updated-after <version>] [--full|--raw]",
+37 -2
View File
@@ -20,7 +20,7 @@ export function runDiagnosis(input: RunDiagnosisInput): JsonRecord {
const staleClaimed = input.run.status === "claimed" && booleanValue(lease.leaseExpired) === true;
const terminalCommandOpenRun = input.run.status === "claimed" && input.terminalStatus !== null;
const runnerJob = input.latestJob ? runnerJobReference(input.latestJob, input.events) : null;
const runnerLost = staleClaimed && (runnerJob === null || runnerJob.phase === "created" || runnerJob.phase === "recorded");
const runnerLost = staleClaimed && (runnerJob === null || runnerJobPhaseIndicatesLost(runnerJob.phase));
const session = sessionReference(input.run);
const providerEvidence = stringValue(input.terminalClassification?.providerEvidence) ?? "not-applicable";
const providerInterruption = stringValue(input.terminalClassification?.providerInterruption) ?? "not-established";
@@ -69,14 +69,17 @@ export function runnerJobDiagnosis(job: RunnerJobRecord, events: RunEvent[] = []
const observation = runnerJobObservation(job, events);
const phase = stringValue(observation.phase) ?? "unknown";
const notStarted = phase === "created" || phase === "recorded";
const runnerLostSuspected = notStarted || runnerJobPhaseIndicatesLost(phase);
return {
category: stringValue(observation.category) ?? (notStarted ? "runner-job-created" : phase.startsWith("terminal:") ? "runner-job-terminal" : "runner-job-observed"),
runnerLostSuspected: notStarted,
runnerLostSuspected,
phase,
evidenceLevel: stringValue(observation.evidenceLevel) ?? (notStarted ? "medium" : "high"),
lastObservedSeq: numberValue(observation.lastObservedSeq),
lastObservedAt: stringValue(observation.lastObservedAt),
lastObservedKind: stringValue(observation.lastObservedKind),
terminalReportState: stringValue(observation.terminalReportState),
runReportState: stringValue(observation.runReportState),
runId: job.runId,
commandId: job.commandId,
runnerJobId: job.id,
@@ -131,6 +134,26 @@ export function runnerJobObservation(job: RunnerJobRecord, events: RunEvent[] =
};
}
const reconcilerObservation = recordAt(job.result, "observation");
const reconcilerPhase = stringValue(reconcilerObservation?.observedRunnerPhase) ?? stringValue(reconcilerObservation?.phase);
if (reconcilerObservation && reconcilerPhase) {
return {
phase: reconcilerPhase,
category: stringValue(reconcilerObservation.category) ?? "runner-job-observed",
terminalStatus: stringValue(reconcilerObservation.terminalStatus),
failureKind: stringValue(reconcilerObservation.failureKind),
startedAt: stringValue(recordAt(reconcilerObservation, "k8s")?.startTime),
finishedAt: stringValue(recordAt(reconcilerObservation, "k8s")?.completionTime),
lastObservedSeq: null,
lastObservedAt: stringValue(reconcilerObservation.lastK8sObservedAt) ?? stringValue(reconcilerObservation.lastObservedAt),
lastObservedKind: stringValue(reconcilerObservation.lastObservedKind) ?? `manager-reconciler:${reconcilerPhase}`,
terminalReportState: stringValue(reconcilerObservation.terminalReportState),
runReportState: stringValue(reconcilerObservation.runReportState),
evidenceLevel: stringValue(reconcilerObservation.evidenceLevel) ?? "high",
valuesPrinted: false,
};
}
const created = recordAt(job.result, "kubernetes")?.created === true;
return {
phase: created ? "created" : "recorded",
@@ -212,11 +235,23 @@ function runnerJobReference(job: RunnerJobRecord, events: RunEvent[]): JsonRecor
lastObservedSeq: numberValue(observation.lastObservedSeq),
lastObservedAt: stringValue(observation.lastObservedAt),
lastObservedKind: stringValue(observation.lastObservedKind),
terminalReportState: stringValue(observation.terminalReportState),
runReportState: stringValue(observation.runReportState),
logPath: stringValue(recordAt(job.result, "runner")?.logPath),
valuesPrinted: false,
};
}
/**
 * Reports whether a runner-job phase belongs to the set of phases that the
 * diagnosis code treats as "runner lost" evidence.
 *
 * @param value Phase value as stored on an observation; non-string values never match.
 * @returns `true` for `created`, `recorded`, `k8s:failed`, `k8s:missing`,
 *          `k8s:succeeded` and `k8s:observe-failed`; `false` otherwise.
 */
function runnerJobPhaseIndicatesLost(value: JsonValue | undefined): boolean {
  const phase = stringValue(value);
  switch (phase) {
    case "created":
    case "recorded":
    case "k8s:failed":
    case "k8s:missing":
    case "k8s:succeeded":
    case "k8s:observe-failed":
      return true;
    default:
      return false;
  }
}
function sessionReference(run: RunRecord): JsonRecord {
if (!run.sessionRef) return { sessionId: null, sessionRefNull: true, sessionPath: null, valuesPrinted: false };
return { sessionId: run.sessionRef.sessionId, sessionRefNull: false, sessionPath: `/api/v1/sessions/${run.sessionRef.sessionId}`, valuesPrinted: false };
+30
View File
@@ -550,6 +550,24 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
return result.rows.map(runnerJobFromRow);
}
/**
 * Lists runner jobs that are still candidates for reconciliation.
 *
 * A job is selected when its run row or its command row cannot be joined, or
 * when the run status / command state is not one of the statuses listed in the
 * query. Results are ordered by `updated_at` then `created_at`, ascending.
 *
 * @param limit Maximum number of rows; clamped to the range 1..500.
 * @returns The matching runner jobs mapped through `runnerJobFromRow`.
 */
async listRunnerJobsForReconciliation(limit: number): Promise<RunnerJobRecord[]> {
// Keep the batch size within 1..500 regardless of what the caller asked for.
const clamped = Math.max(1, Math.min(limit, 500));
// LEFT JOINs keep runner jobs whose run or command row no longer exists.
const result = await this.pool.query(
`SELECT rj.*
FROM agentrun_runner_jobs rj
LEFT JOIN agentrun_runs r ON r.id = rj.run_id
LEFT JOIN agentrun_commands c ON c.id = rj.command_id
WHERE r.id IS NULL
OR c.id IS NULL
OR r.status NOT IN ('completed', 'failed', 'blocked', 'cancelled')
OR c.state NOT IN ('completed', 'failed', 'cancelled')
ORDER BY rj.updated_at ASC, rj.created_at ASC
LIMIT $1`,
[clamped],
);
return result.rows.map(runnerJobFromRow);
}
async getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): Promise<RunnerJobRecord | null> {
const result = await this.pool.query("SELECT * FROM agentrun_runner_jobs WHERE run_id = $1 AND idempotency_key = $2", [runId, idempotencyKey]);
const row = result.rows[0];
@@ -582,6 +600,18 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
});
}
/**
 * Shallow-merges `patch` into a runner job's stored `result` and bumps
 * `updated_at`, all inside one transaction with the row locked.
 *
 * Top-level keys in `patch` replace the existing keys of the same name.
 *
 * @param runnerJobId Identifier of the runner job to update.
 * @param patch Top-level result keys to write.
 * @returns The updated runner job record.
 * @throws AgentRunError (`schema-invalid`, HTTP 404) when the job does not exist.
 */
async updateRunnerJobResult(runnerJobId: string, patch: JsonRecord): Promise<RunnerJobRecord> {
  return this.withTransaction(async (client) => {
    // Lock the row so the read-merge-write below cannot interleave with another writer.
    const locked = await client.query("SELECT * FROM agentrun_runner_jobs WHERE id = $1 FOR UPDATE", [runnerJobId]);
    const currentRow = locked.rows[0];
    if (!currentRow) {
      throw new AgentRunError("schema-invalid", `runner job ${runnerJobId} was not found`, { httpStatus: 404 });
    }

    const current = runnerJobFromRow(currentRow);
    const mergedResult = { ...current.result, ...patch };
    const written = await client.query(
      "UPDATE agentrun_runner_jobs SET result = $2::jsonb, updated_at = $3 WHERE id = $1 RETURNING *",
      [runnerJobId, JSON.stringify(mergedResult), nowIso()],
    );
    return runnerJobFromRow(written.rows[0]);
  });
}
async claimRun(runId: string, runnerId: string, leaseMs: number): Promise<RunRecord> {
return this.withTransaction(async (client) => {
const run = await this.requireRunForUpdate(client, runId);
+4
View File
@@ -8,6 +8,7 @@ export function runnerJobStatusSummary(job: RunnerJobRecord, events: RunEvent[]
const kubernetes = recordAt(job.result, "kubernetes");
const retention = recordAt(job.result, "retention");
const envImage = recordAt(job.result, "envImage");
const reconcilerObservation = recordAt(job.result, "observation");
const terminalStatus = observation.terminalStatus;
return {
id: job.id,
@@ -31,6 +32,9 @@ export function runnerJobStatusSummary(job: RunnerJobRecord, events: RunEvent[]
lastObservedSeq: typeof observation.lastObservedSeq === "number" ? observation.lastObservedSeq : null,
lastObservedAt: typeof observation.lastObservedAt === "string" ? observation.lastObservedAt : null,
lastObservedKind: typeof observation.lastObservedKind === "string" ? observation.lastObservedKind : null,
terminalReportState: typeof observation.terminalReportState === "string" ? observation.terminalReportState : null,
runReportState: typeof observation.runReportState === "string" ? observation.runReportState : null,
reconcilerObservation,
jobIdentity,
podIdentity: recordAt(job.result, "podIdentity"),
logPath: typeof runner.logPath === "string" ? runner.logPath : null,
+314
View File
@@ -0,0 +1,314 @@
import { spawn } from "node:child_process";
import { AgentRunError } from "../common/errors.js";
import { redactJson, redactText } from "../common/redaction.js";
import type { CommandRecord, FailureKind, JsonRecord, JsonValue, RunnerJobRecord, TerminalStatus } from "../common/types.js";
import { nowIso, stableHash } from "../common/validation.js";
import type { AgentRunStore } from "./store.js";
import { isTerminalCommandState, isTerminalRunStatus } from "./store.js";
/** Inputs for a single reconciliation pass (`reconcileRunnerJobsOnce`). */
export interface RunnerReconcilerOptions {
/** Store used to list candidate runner jobs, persist observations and close runs. */
store: AgentRunStore;
/** When set, used for the kubectl lookups instead of the namespace recorded on each job. */
namespace?: string;
/** Executable used for Kubernetes lookups; `"kubectl"` is used when omitted. */
kubectlCommand?: string;
/** Maximum number of jobs to process in the pass; defaults to 20 and is clamped by `clampLimit`. */
limit?: number;
}
/** Inputs for the periodic reconciler started by `startRunnerJobReconciler`. */
export interface RunnerReconcilerLoopOptions extends RunnerReconcilerOptions {
/** Passed as `limit` to every reconciliation pass. */
batchSize: number;
/** Delay between passes in milliseconds; the loop enforces a 1000 ms minimum. */
intervalMs: number;
/** Called with whatever a failed pass threw; the loop keeps running afterwards. */
onError?: (error: unknown) => void;
}
/**
 * Subset of a Kubernetes object as parsed from `kubectl get ... -o json`.
 * Every field is optional because the JSON is not schema-validated.
 */
interface K8sObject {
kind?: string;
metadata?: {
name?: string;
namespace?: string;
uid?: string;
// Compared as a string when picking the newest pod.
creationTimestamp?: string;
labels?: Record<string, string>;
annotations?: Record<string, string>;
ownerReferences?: Array<{ kind?: string; name?: string }>;
};
// Left untyped; read through the `stringPath` / `numberPath` helpers and `jobCondition`.
status?: JsonRecord;
}
/** Shape of a `kubectl get <resource> -o json` list response; only `items` is read. */
interface K8sList {
items?: K8sObject[];
}
/**
 * Runs one reconciliation pass over the runner jobs the store reports as
 * candidates.
 *
 * For each job, in order: observe it in Kubernetes, persist the observation
 * under the job's `result.observation`, then close the owning run if its
 * command is already terminal. Jobs are processed sequentially.
 *
 * @param input Store plus optional namespace / kubectl / limit overrides.
 * @returns A summary with per-job items and aggregate counters.
 */
export async function reconcileRunnerJobsOnce(input: RunnerReconcilerOptions): Promise<JsonRecord> {
  const limit = clampLimit(input.limit ?? 20);
  const candidates = await input.store.listRunnerJobsForReconciliation(limit);

  // Only forward overrides that were actually provided.
  const observeOptions: { namespace?: string; kubectlCommand?: string } = {};
  if (input.namespace) observeOptions.namespace = input.namespace;
  if (input.kubectlCommand) observeOptions.kubectlCommand = input.kubectlCommand;

  const items: JsonRecord[] = [];
  let observeFailedCount = 0;
  let runClosureCount = 0;

  for (const job of candidates) {
    const observation = await observeRunnerJob(job, observeOptions);
    await input.store.updateRunnerJobResult(job.id, { observation });
    const runClosure = await closeOpenRunWhenCommandTerminal(input.store, job);

    if (stringValue(observation.category) === "runner-job-observe-failed") {
      observeFailedCount += 1;
    }
    if (runClosure.closed === true) {
      runClosureCount += 1;
    }

    items.push({
      runnerJobId: job.id,
      runId: job.runId,
      commandId: job.commandId,
      namespace: stringValue(observation.namespace) ?? job.namespace,
      jobName: job.jobName,
      observedRunnerPhase: stringValue(observation.observedRunnerPhase) ?? "unknown",
      terminalReportState: stringValue(observation.terminalReportState) ?? "unknown",
      runReportState: stringValue(observation.runReportState) ?? "unknown",
      runClosure,
      valuesPrinted: false,
    });
  }

  // Every scanned job gets an observation written, so both counts equal the batch size.
  const processed = candidates.length;
  return {
    action: "runner-job-reconcile",
    reconciledAt: nowIso(),
    limit,
    scannedCount: processed,
    updatedCount: processed,
    observeFailedCount,
    runClosureCount,
    items,
    valuesPrinted: false,
  };
}
/**
 * Starts a background loop that runs `reconcileRunnerJobsOnce` immediately and
 * then on a fixed interval.
 *
 * Passes never overlap: a tick that fires while the previous pass is still
 * running is skipped. Errors from a pass are handed to `onError` and do not
 * stop the loop. The timer is unref'd so it does not keep the process alive.
 *
 * @param input Store, batch size, interval and optional overrides / error hook.
 * @returns A function that stops the loop (no further passes are started).
 */
export function startRunnerJobReconciler(input: RunnerReconcilerLoopOptions): () => void {
  const minIntervalMs = 1_000;
  const maxIntervalMs = 2_147_483_647;
  const fallbackIntervalMs = 30_000;
  // Node coerces NaN and delays above 2^31-1 to 1 ms, which would turn the
  // reconciler into a hot loop; sanitize before handing the value to setInterval.
  const requestedMs = Number.isFinite(input.intervalMs) ? input.intervalMs : fallbackIntervalMs;
  const intervalMs = Math.min(maxIntervalMs, Math.max(minIntervalMs, requestedMs));
  let stopped = false;
  let running = false;
  const tick = async (): Promise<void> => {
    if (stopped || running) return;
    running = true;
    try {
      await reconcileRunnerJobsOnce({ store: input.store, limit: input.batchSize, ...(input.namespace ? { namespace: input.namespace } : {}), ...(input.kubectlCommand ? { kubectlCommand: input.kubectlCommand } : {}) });
    } catch (error) {
      try {
        input.onError?.(error);
      } catch {
        // tick() is fire-and-forget; a throwing error hook must not surface as
        // an unhandled promise rejection.
      }
    } finally {
      running = false;
    }
  };
  const timer = setInterval(() => { void tick(); }, intervalMs);
  timer.unref?.();
  // Run the first pass right away instead of waiting a full interval.
  void tick();
  return () => {
    stopped = true;
    clearInterval(timer);
  };
}
/**
 * Looks up a runner job's Kubernetes Job and its pods (label `job-name=<jobName>`)
 * and turns the result into an observation record.
 *
 * Lookup failures do not throw: they are returned as a
 * `k8s:observe-failed` observation carrying a redacted, truncated message.
 *
 * NOTE(review): an explicit `input.namespace` takes precedence over the
 * namespace recorded on the job — confirm that is intended for jobs created in
 * a different namespace.
 *
 * @param job Runner job to observe.
 * @param input Optional namespace and kubectl executable overrides.
 * @returns The observation record to persist on the job.
 */
async function observeRunnerJob(job: RunnerJobRecord, input: { namespace?: string; kubectlCommand?: string }): Promise<JsonRecord> {
  const namespace = input.namespace ?? job.namespace;
  const kubectl = input.kubectlCommand ?? "kubectl";
  const observedAt = nowIso();

  try {
    // Start both lookups before awaiting so they run concurrently.
    const jobLookup = getK8sObject(kubectl, "job", namespace, job.jobName);
    const podLookup = getK8sList(kubectl, "pods", namespace, ["-l", `job-name=${job.jobName}`]);
    const [jobObject, podObjects] = await Promise.all([jobLookup, podLookup]);
    return observationFromObjects(job, namespace, observedAt, jobObject, podObjects);
  } catch (error) {
    const failureMessage = error instanceof Error
      ? redactText(error.message).slice(0, 300)
      : "unknown observation failure";
    return {
      source: "manager-reconciler",
      namespace,
      jobName: job.jobName,
      observedRunnerPhase: "k8s:observe-failed",
      category: "runner-job-observe-failed",
      terminalStatus: null,
      failureKind: "infra-failed",
      failureMessage,
      terminalReportState: "k8s-observation-failed",
      runReportState: "not-updated",
      lastK8sObservedAt: observedAt,
      evidenceLevel: "medium",
      valuesPrinted: false,
    };
  }
}
/**
 * Builds the observation record for a runner job from the Kubernetes objects
 * that were found for it.
 *
 * @param job Runner job being observed.
 * @param namespace Namespace the lookups were made in.
 * @param observedAt Timestamp recorded as the observation time.
 * @param jobObject The Kubernetes Job, or `null` when it was not found.
 * @param podObjects Pods selected for the job (possibly empty).
 * @returns Observation record with the derived phase, category and a k8s summary.
 */
function observationFromObjects(job: RunnerJobRecord, namespace: string, observedAt: string, jobObject: K8sObject | null, podObjects: K8sObject[]): JsonRecord {
  const phase = observedRunnerPhase(jobObject, podObjects);

  // Only a failed or missing Kubernetes job is attributed a failure kind here.
  const isInfraFailure = phase === "k8s:failed" || phase === "k8s:missing";
  const failureKind: FailureKind | null = isInfraFailure ? "infra-failed" : null;
  const evidenceLevel = phase === "k8s:unknown" ? "medium" : "high";

  return {
    source: "manager-reconciler",
    namespace,
    jobName: job.jobName,
    observedRunnerPhase: phase,
    phase,
    category: categoryForPhase(phase),
    terminalStatus: null,
    failureKind,
    terminalReportState: terminalReportStateForPhase(phase),
    runReportState: "not-updated",
    lastK8sObservedAt: observedAt,
    lastObservedAt: observedAt,
    lastObservedKind: `manager-reconciler:${phase}`,
    evidenceLevel,
    k8s: k8sSummary(jobObject, podObjects),
    valuesPrinted: false,
  };
}
/**
 * Finishes a run that is still open although the command behind the given
 * runner job has already reached a terminal state.
 *
 * Nothing is changed (and `closed: false` is returned) when the command cannot
 * be loaded, the command is still open, the run cannot be loaded, or the run
 * is already terminal. The `state` field of the result names which case applied.
 *
 * @param store Store used to load the command/run and to finish the run.
 * @param job Runner job whose run should be checked.
 * @returns A record describing whether the run was closed and why.
 */
async function closeOpenRunWhenCommandTerminal(store: AgentRunStore, job: RunnerJobRecord): Promise<JsonRecord> {
  let command: CommandRecord;
  try {
    command = await store.getCommand(job.commandId);
  } catch {
    return { closed: false, state: "command-missing", valuesPrinted: false };
  }
  if (!isTerminalCommandState(command.state)) return { closed: false, state: "command-open", commandState: command.state, valuesPrinted: false };

  // The reconciliation listing intentionally includes jobs whose run can no
  // longer be found, so a failed run lookup must not abort the whole batch.
  // It is reported per job, mirroring the command lookup above.
  const loadRun = async () => {
    try {
      return await store.getRun(job.runId);
    } catch {
      return null;
    }
  };
  const run = await loadRun();
  if (run === null) return { closed: false, state: "run-missing", commandState: command.state, valuesPrinted: false };

  if (isTerminalRunStatus(run.status)) return { closed: false, state: "run-terminal", runStatus: run.status, commandState: command.state, valuesPrinted: false };
  const terminalStatus = terminalStatusFromCommand(command);
  const failureKind: FailureKind | null = terminalStatus === "cancelled" ? "cancelled" : terminalStatus === "failed" ? "infra-failed" : null;
  const failureMessage = terminalStatus === "completed" ? null : "manager reconciler closed open run after terminal command state";
  const next = await store.finishRun(run.id, { terminalStatus, failureKind, failureMessage });
  return { closed: true, state: "closed-open-run", runStatus: next.status, terminalStatus, commandState: command.state, valuesPrinted: false };
}
/**
 * Maps a command's state onto the terminal status used to finish its run.
 *
 * @param command Command whose `state` is inspected.
 * @returns `"completed"` or `"cancelled"` for those states, `"failed"` for anything else.
 */
function terminalStatusFromCommand(command: CommandRecord): TerminalStatus {
  switch (command.state) {
    case "completed":
      return "completed";
    case "cancelled":
      return "cancelled";
    default:
      return "failed";
  }
}
/**
 * Derives a coarse `k8s:*` phase from a Kubernetes Job and its pods.
 *
 * Precedence, first match wins: nothing found → `k8s:missing`; Job condition
 * `Complete` / `Failed`; any succeeded count or pod; any failed count or pod;
 * any active count or running pod; any pending pod; otherwise `k8s:unknown`.
 *
 * NOTE(review): a failed pod outranks an active one, so a Job that is still
 * retrying would be reported as `k8s:failed` — confirm runner Jobs never retry.
 *
 * @param jobObject The Job, or `null` when it was not found.
 * @param pods Pods belonging to the Job.
 * @returns The derived phase string.
 */
function observedRunnerPhase(jobObject: K8sObject | null, pods: K8sObject[]): string {
  if (!jobObject && pods.length === 0) return "k8s:missing";

  const finalCondition = jobCondition(jobObject);
  if (finalCondition === "Complete") return "k8s:succeeded";
  if (finalCondition === "Failed") return "k8s:failed";

  const statusCount = (field: string): number => numberPath(jobObject, ["status", field]) ?? 0;
  const podPhases = new Set<string | null>(pods.map((pod) => stringPath(pod, ["status", "phase"])));

  if (statusCount("succeeded") > 0 || podPhases.has("Succeeded")) return "k8s:succeeded";
  if (statusCount("failed") > 0 || podPhases.has("Failed")) return "k8s:failed";
  if (statusCount("active") > 0 || podPhases.has("Running")) return "k8s:running";
  if (podPhases.has("Pending")) return "k8s:pending";
  return "k8s:unknown";
}
/**
 * Maps an observed `k8s:*` phase to the diagnosis category stored on the observation.
 *
 * @param phase Phase produced by `observedRunnerPhase`.
 * @returns The category string; unrecognised phases map to `runner-job-observed`.
 */
function categoryForPhase(phase: string): string {
  switch (phase) {
    case "k8s:succeeded":
      return "runner-job-k8s-succeeded";
    case "k8s:failed":
      return "runner-job-k8s-failed";
    case "k8s:missing":
      return "runner-job-k8s-missing";
    case "k8s:running":
    case "k8s:pending":
      return "runner-job-running";
    default:
      return "runner-job-observed";
  }
}
/**
 * Maps an observed `k8s:*` phase to the `terminalReportState` value stored on
 * the observation.
 *
 * @param phase Phase produced by `observedRunnerPhase`.
 * @returns `terminal-runner-report-missing` for finished Jobs,
 *          `runner-resource-missing` for missing ones, `not-terminal` otherwise.
 */
function terminalReportStateForPhase(phase: string): string {
  switch (phase) {
    case "k8s:succeeded":
    case "k8s:failed":
      return "terminal-runner-report-missing";
    case "k8s:missing":
      return "runner-resource-missing";
    default:
      return "not-terminal";
  }
}
/**
 * Condenses the Kubernetes Job and pod objects into the small, value-free
 * summary stored under the observation's `k8s` key.
 *
 * @param jobObject The Job, or `null` when it was not found.
 * @param pods Pods belonging to the Job.
 * @returns Summary of Job presence, counters, timestamps and pod phases.
 */
function k8sSummary(jobObject: K8sObject | null, pods: K8sObject[]): JsonRecord {
  const jobString = (...segments: string[]): string | null => stringPath(jobObject, segments);
  const statusCount = (field: string): number | null => numberPath(jobObject, ["status", field]);

  // Pods without a readable phase are reported as "unknown" rather than dropped.
  const podPhases: string[] = [];
  for (const pod of pods) {
    podPhases.push(stringPath(pod, ["status", "phase"]) ?? "unknown");
  }

  return {
    jobPresent: jobObject !== null,
    jobUid: jobString("metadata", "uid"),
    condition: jobCondition(jobObject),
    active: statusCount("active"),
    succeeded: statusCount("succeeded"),
    failed: statusCount("failed"),
    startTime: jobString("status", "startTime"),
    completionTime: jobString("status", "completionTime"),
    podCount: pods.length,
    podPhases,
    newestPodName: newestPodName(pods),
    valuesPrinted: false,
  };
}
/**
 * Finds the Job's finishing condition.
 *
 * Scans `status.conditions` from the last entry backwards and returns the type
 * of the first entry that is `Complete` or `Failed` with `status: "True"`.
 *
 * @param jobObject The Job, or `null`.
 * @returns `"Complete"`, `"Failed"`, or `null` when no such condition is present.
 */
function jobCondition(jobObject: K8sObject | null): string | null {
  const conditions = jobObject?.status?.conditions;
  if (!Array.isArray(conditions)) return null;

  for (let index = conditions.length - 1; index >= 0; index -= 1) {
    const entry = conditions[index];
    // Skip anything that is not a plain object.
    if (typeof entry !== "object" || entry === null || Array.isArray(entry)) continue;
    const { type, status } = entry as JsonRecord;
    if (status !== "True") continue;
    if (type === "Complete" || type === "Failed") return type;
  }
  return null;
}
/**
 * Returns the name of the pod with the greatest `metadata.creationTimestamp`
 * (compared as strings; pods without a timestamp sort as the empty string).
 *
 * @param pods Pods to inspect; the input array is not modified.
 * @returns The newest pod's name, or `null` when there are no pods or it has no name.
 */
function newestPodName(pods: K8sObject[]): string | null {
  const createdAt = (pod: K8sObject): string => stringPath(pod, ["metadata", "creationTimestamp"]) ?? "";
  const newestFirst = pods.slice().sort((left, right) => createdAt(right).localeCompare(createdAt(left)));
  const [newest] = newestFirst;
  return stringPath(newest ?? null, ["metadata", "name"]);
}
/**
 * Fetches a single Kubernetes object via `kubectl get <resource> <name> -o json`.
 *
 * @param kubectlCommand Executable to run.
 * @param resource Resource kind passed to kubectl.
 * @param namespace Namespace to query.
 * @param name Object name.
 * @returns The parsed object, or `null` when kubectl's stderr reports it as not found.
 * @throws AgentRunError (`infra-failed`, HTTP 502) for any other non-zero exit,
 *         with redacted tails of stderr/stdout in the details.
 */
async function getK8sObject(kubectlCommand: string, resource: string, namespace: string, name: string): Promise<K8sObject | null> {
  const { code, stdout, stderr } = await kubectlRun(kubectlCommand, ["get", resource, name, "-n", namespace, "-o", "json"]);

  if (code === 0) {
    return parseJsonObject(stdout, `kubectl get ${resource}`) as K8sObject;
  }
  if (isNotFound(stderr)) {
    return null;
  }

  // Only the tail of each stream is kept, and it is redacted before it leaves this function.
  const details = redactJson({ stderr: redactText(stderr.slice(-2000)), stdout: redactText(stdout.slice(-1000)), valuesPrinted: false });
  throw new AgentRunError("infra-failed", `kubectl get ${resource} failed with code ${code}`, { httpStatus: 502, details });
}
/**
 * Fetches a list of Kubernetes objects via `kubectl get <resource> ... -o json`.
 *
 * @param kubectlCommand Executable to run.
 * @param resource Resource kind passed to kubectl.
 * @param namespace Namespace to query.
 * @param extraArgs Additional kubectl arguments (for example a label selector).
 * @returns The plain-object entries of the response's `items`, or `[]` when `items` is not an array.
 * @throws AgentRunError (`infra-failed`, HTTP 502) on a non-zero exit,
 *         with redacted tails of stderr/stdout in the details.
 */
async function getK8sList(kubectlCommand: string, resource: string, namespace: string, extraArgs: string[]): Promise<K8sObject[]> {
  const { code, stdout, stderr } = await kubectlRun(kubectlCommand, ["get", resource, "-n", namespace, ...extraArgs, "-o", "json"]);

  if (code !== 0) {
    const details = redactJson({ stderr: redactText(stderr.slice(-2000)), stdout: redactText(stdout.slice(-1000)), valuesPrinted: false });
    throw new AgentRunError("infra-failed", `kubectl get ${resource} failed with code ${code}`, { httpStatus: 502, details });
  }

  const list = parseJsonObject(stdout, `kubectl get ${resource}`) as K8sList;
  if (!Array.isArray(list.items)) {
    return [];
  }
  // Drop anything in `items` that is not a plain object.
  return list.items.filter((item) => typeof item === "object" && item !== null && !Array.isArray(item));
}
/**
 * Runs kubectl with the given arguments and collects its output.
 *
 * The child is given a time budget: without one, a kubectl process that never
 * exits would keep the awaiting reconciliation pass (and with it the periodic
 * loop's overlap guard) blocked indefinitely. When the budget is exceeded Node
 * kills the child, and the result carries `code: null` plus the kill signal,
 * which callers already treat as a failed lookup.
 *
 * @param kubectlCommand Executable to run.
 * @param args Arguments passed to the executable.
 * @param timeoutMs Maximum run time in milliseconds (default 30 s); `0` disables the limit.
 * @returns Exit code, terminating signal, and the full stdout/stderr text.
 * @throws AgentRunError (`infra-failed`, HTTP 503) when the process cannot be started.
 */
async function kubectlRun(kubectlCommand: string, args: string[], timeoutMs: number = 30_000): Promise<{ code: number | null; signal: NodeJS.Signals | null; stdout: string; stderr: string }> {
  const child = spawn(kubectlCommand, args, { stdio: ["ignore", "pipe", "pipe"], timeout: timeoutMs });
  let stdout = "";
  let stderr = "";
  child.stdout.setEncoding("utf8");
  child.stderr.setEncoding("utf8");
  child.stdout.on("data", (chunk) => { stdout += String(chunk); });
  child.stderr.on("data", (chunk) => { stderr += String(chunk); });
  // `error` fires when the process could not be spawned; `close` fires once
  // the process has ended and both output streams are closed.
  const result = await new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve, reject) => {
    child.on("error", reject);
    child.on("close", (code, signal) => resolve({ code, signal }));
  }).catch((error: unknown) => {
    throw new AgentRunError("infra-failed", `failed to start kubectl: ${error instanceof Error ? error.message : String(error)}`, { httpStatus: 503 });
  });
  return { ...result, stdout, stderr };
}
/**
 * Parses `text` as JSON and requires the result to be a plain object.
 *
 * Error details carry only a hash of the input, never the input itself.
 *
 * @param text JSON text to parse.
 * @param label Prefix for error messages (identifies the producing command).
 * @returns The parsed object.
 * @throws AgentRunError (`infra-failed`, HTTP 502) when the text is not valid
 *         JSON or the parsed value is not a plain object.
 */
function parseJsonObject(text: string, label: string): JsonRecord {
  let parsed: JsonValue;
  try {
    parsed = JSON.parse(text) as JsonValue;
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    throw new AgentRunError("infra-failed", `${label} returned invalid JSON: ${reason}`, { httpStatus: 502, details: { stdoutHash: stableHash(text), valuesPrinted: false } });
  }

  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    throw new AgentRunError("infra-failed", `${label} returned non-object JSON`, { httpStatus: 502, details: { stdoutHash: stableHash(text), valuesPrinted: false } });
  }
  return parsed;
}
/**
 * Reports whether kubectl's stderr indicates that the requested object does
 * not exist.
 *
 * All whitespace is removed before matching, so both the `(NotFound)` reason
 * and the `not found` wording (even when wrapped across lines) reduce to the
 * single case-insensitive token `notfound`. The previous pattern also listed
 * `not found` and a second `notfound`, neither of which could ever add a match
 * on whitespace-stripped input.
 *
 * NOTE(review): any stderr containing "not found" matches, including messages
 * about a missing namespace — confirm that treating those as "object absent"
 * is acceptable.
 *
 * @param stderr Standard error output captured from kubectl.
 * @returns `true` when the output contains a not-found indication.
 */
function isNotFound(stderr: string): boolean {
  return /notfound/i.test(stderr.replace(/\s+/gu, ""));
}
/**
 * Normalises a requested batch size to a whole number in the range 1..500.
 *
 * `NaN` would otherwise pass straight through `Math.floor` / `Math.min` /
 * `Math.max` and reach the store as an unusable limit, so it is mapped to the
 * minimum instead.
 *
 * @param limit Requested number of jobs.
 * @returns An integer between 1 and 500 inclusive.
 */
function clampLimit(limit: number): number {
  if (Number.isNaN(limit)) return 1;
  return Math.max(1, Math.min(Math.floor(limit), 500));
}
/**
 * Narrows a JSON value to a non-empty string.
 *
 * @param value Any JSON value, or `undefined`.
 * @returns The string when `value` is a string with at least one character, otherwise `null`.
 */
function stringValue(value: JsonValue | undefined): string | null {
  if (typeof value !== "string") return null;
  return value === "" ? null : value;
}
/**
 * Reads a string at a nested key path, tolerating any shape of input.
 *
 * Each step requires the current value to be a plain (non-array, non-null)
 * object; otherwise the lookup stops and yields `null`.
 *
 * @param value Root value to walk.
 * @param path Keys to follow, outermost first.
 * @returns The string found at the end of the path (empty strings included), or `null`.
 */
function stringPath(value: unknown, path: string[]): string | null {
  let cursor: unknown = value;
  const reachable = path.every((key) => {
    if (typeof cursor !== "object" || cursor === null || Array.isArray(cursor)) return false;
    cursor = (cursor as Record<string, unknown>)[key];
    return true;
  });
  if (!reachable) return null;
  return typeof cursor === "string" ? cursor : null;
}
/**
 * Reads a finite number at a nested key path, tolerating any shape of input.
 *
 * Each step requires the current value to be a plain (non-array, non-null)
 * object; otherwise the lookup stops and yields `null`.
 *
 * @param value Root value to walk.
 * @param path Keys to follow, outermost first.
 * @returns The finite number found at the end of the path, or `null`
 *          (also for `NaN` and infinities).
 */
function numberPath(value: unknown, path: string[]): number | null {
  let cursor: unknown = value;
  const reachable = path.every((key) => {
    if (typeof cursor !== "object" || cursor === null || Array.isArray(cursor)) return false;
    cursor = (cursor as Record<string, unknown>)[key];
    return true;
  });
  if (!reachable) return null;
  return typeof cursor === "number" && Number.isFinite(cursor) ? cursor : null;
}
+36 -2
View File
@@ -12,6 +12,7 @@ import type { RunnerRetentionOptions } from "./runner-retention.js";
import { dispatchQueueTask, refreshQueueTaskFromCore } from "./queue-dispatch.js";
import { buildRunResult } from "./result.js";
import { runnerJobStatusSummary } from "./runner-job-status.js";
import { reconcileRunnerJobsOnce, startRunnerJobReconciler } from "./runner-reconciler.js";
import { createSessionPvc, deleteSessionPvc, getSessionPvcSummary, refreshSessionPvcSummary, runSessionStorageGc } from "./session-pvc.js";
import type { SessionPvcSummary } from "./session-pvc.js";
import type { SessionPvcOptions } from "./session-pvc.js";
@@ -84,6 +85,15 @@ function runnerRetentionOptionsForRequest(defaults: RunnerRetentionOptions | und
};
}
/**
 * Resolves the effective reconciler settings for this server process.
 *
 * Each setting comes from the first source that provides it: the explicit
 * reconciler options, then (for namespace and kubectl command) the runner-job
 * defaults, then the corresponding `AGENTRUN_*` environment variable, then a
 * built-in default (disabled, `agentrun-v01`, 30 000 ms, batch of 20).
 *
 * @param defaults Reconciler options passed to the server, if any.
 * @param runnerJobDefaults Runner-job defaults passed to the server, if any.
 * @returns Fully populated runtime options; `kubectlCommand` is present only when configured.
 */
function runnerReconcilerOptionsForRuntime(defaults: ManagerServerOptions["runnerReconcilerOptions"] | undefined, runnerJobDefaults: ManagerServerOptions["runnerJobDefaults"] | undefined): RunnerReconcilerRuntimeOptions {
  const env = process.env;
  const resolved: RunnerReconcilerRuntimeOptions = {
    enabled: defaults?.enabled
      ?? optionalBooleanEnv("AGENTRUN_MANAGER_RECONCILER_ENABLED", env.AGENTRUN_MANAGER_RECONCILER_ENABLED, false),
    namespace: defaults?.namespace
      ?? runnerJobDefaults?.namespace
      ?? env.AGENTRUN_RUNTIME_NAMESPACE
      ?? "agentrun-v01",
    intervalMs: defaults?.intervalMs
      ?? optionalPositiveInteger("AGENTRUN_MANAGER_RECONCILER_INTERVAL_MS", env.AGENTRUN_MANAGER_RECONCILER_INTERVAL_MS)
      ?? 30_000,
    batchSize: defaults?.batchSize
      ?? optionalPositiveInteger("AGENTRUN_MANAGER_RECONCILER_BATCH_SIZE", env.AGENTRUN_MANAGER_RECONCILER_BATCH_SIZE)
      ?? 20,
  };

  // Leave the key absent rather than set to undefined when no command is configured.
  const kubectlCommand = defaults?.kubectlCommand ?? runnerJobDefaults?.kubectlCommand;
  if (kubectlCommand) {
    resolved.kubectlCommand = kubectlCommand;
  }
  return resolved;
}
export interface ManagerServerOptions {
store?: AgentRunStore;
port?: number;
@@ -107,10 +117,19 @@ export interface ManagerServerOptions {
sessionPvcOptions?: { kubectlHandler?: import("./session-pvc.js").KubectlHandler; kubectlCommand?: string; storageClassName?: string; size?: string };
providerProfileOptions?: { namespace?: string; kubectlCommand?: string };
toolCredentialOptions?: { namespace?: string; kubectlCommand?: string };
runnerReconcilerOptions?: { enabled?: boolean; namespace?: string; kubectlCommand?: string; intervalMs?: number; batchSize?: number };
aipodSpecDir?: string;
auth?: ManagerAuthConfig;
}
/** Reconciler settings after defaults and environment variables have been resolved. */
interface RunnerReconcilerRuntimeOptions {
/** Whether the background reconciler loop is started with the server. */
enabled: boolean;
/** Namespace handed to the reconciler for its Kubernetes lookups. */
namespace: string;
/** Delay between background passes, in milliseconds. */
intervalMs: number;
/** Number of jobs per background pass; also the default `limit` for the HTTP endpoint. */
batchSize: number;
/** kubectl executable override; absent when none was configured. */
kubectlCommand?: string;
}
export interface StartedManagerServer {
server: Server;
baseUrl: string;
@@ -124,22 +143,25 @@ export async function startManagerServer(options: ManagerServerOptions = {}): Pr
const sessionPvcDefaults = options.sessionPvcOptions;
const providerProfileDefaults = options.providerProfileOptions;
const toolCredentialDefaults = options.toolCredentialOptions;
const runnerReconcilerOptions = runnerReconcilerOptionsForRuntime(options.runnerReconcilerOptions, runnerJobDefaults);
const aipodSpecDir = options.aipodSpecDir ?? process.env.AGENTRUN_AIPOD_SPEC_DIR;
const auth = options.auth ?? managerServerAuthConfigFromEnv();
const authSummary = managerAuthSummary(auth);
const stopRunnerReconciler = runnerReconcilerOptions.enabled ? startRunnerJobReconciler({ store, namespace: runnerReconcilerOptions.namespace, intervalMs: runnerReconcilerOptions.intervalMs, batchSize: runnerReconcilerOptions.batchSize, ...(runnerReconcilerOptions.kubectlCommand ? { kubectlCommand: runnerReconcilerOptions.kubectlCommand } : {}), onError: (error) => console.warn(JSON.stringify({ code: "runner-reconciler-failed", message: error instanceof Error ? error.message : String(error), valuesPrinted: false })) }) : null;
const server = createServer(async (req, res) => {
const traceId = `trc_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`;
try {
const method = req.method ?? "GET";
const url = new URL(req.url ?? "/", "http://agentrun.local");
assertManagerRequestAuthorized(req, url.pathname, auth);
const data = await route({ method, url, body: await readBody(req), store, sourceCommit, authSummary, ...(runnerJobDefaults ? { runnerJobDefaults } : {}), ...(sessionPvcDefaults ? { sessionPvcDefaults } : {}), ...(providerProfileDefaults ? { providerProfileDefaults } : {}), ...(toolCredentialDefaults ? { toolCredentialDefaults } : {}), ...(aipodSpecDir ? { aipodSpecDir } : {}) });
const data = await route({ method, url, body: await readBody(req), store, sourceCommit, authSummary, runnerReconcilerOptions, ...(runnerJobDefaults ? { runnerJobDefaults } : {}), ...(sessionPvcDefaults ? { sessionPvcDefaults } : {}), ...(providerProfileDefaults ? { providerProfileDefaults } : {}), ...(toolCredentialDefaults ? { toolCredentialDefaults } : {}), ...(aipodSpecDir ? { aipodSpecDir } : {}) });
writeJson(res, 200, { ok: true, data, traceId });
} catch (error) {
const agentError = normalizeError(error);
writeJson(res, agentError.httpStatus, { ok: false, failureKind: agentError.failureKind, message: agentError.message, traceId, error: errorToJson(error) });
}
});
server.on("close", () => { stopRunnerReconciler?.(); });
await new Promise<void>((resolve) => server.listen(options.port ?? 0, options.host ?? "127.0.0.1", resolve));
const address = server.address() as AddressInfo;
return { server, baseUrl: `http://${address.address}:${address.port}`, store };
@@ -405,7 +427,7 @@ function compactRecoveryActions(value: JsonValue | undefined): JsonValue[] {
});
}
async function route({ method, url, body, store, sourceCommit, authSummary, runnerJobDefaults, sessionPvcDefaults, providerProfileDefaults, toolCredentialDefaults, aipodSpecDir }: { method: string; url: URL; body: unknown; store: AgentRunStore; sourceCommit: string; authSummary?: JsonRecord; runnerJobDefaults?: NonNullable<ManagerServerOptions["runnerJobDefaults"]>; sessionPvcDefaults?: NonNullable<ManagerServerOptions["sessionPvcOptions"]>; providerProfileDefaults?: NonNullable<ManagerServerOptions["providerProfileOptions"]>; toolCredentialDefaults?: NonNullable<ManagerServerOptions["toolCredentialOptions"]>; aipodSpecDir?: string }): Promise<JsonValue> {
async function route({ method, url, body, store, sourceCommit, authSummary, runnerJobDefaults, sessionPvcDefaults, providerProfileDefaults, toolCredentialDefaults, runnerReconcilerOptions, aipodSpecDir }: { method: string; url: URL; body: unknown; store: AgentRunStore; sourceCommit: string; authSummary?: JsonRecord; runnerJobDefaults?: NonNullable<ManagerServerOptions["runnerJobDefaults"]>; sessionPvcDefaults?: NonNullable<ManagerServerOptions["sessionPvcOptions"]>; providerProfileDefaults?: NonNullable<ManagerServerOptions["providerProfileOptions"]>; toolCredentialDefaults?: NonNullable<ManagerServerOptions["toolCredentialOptions"]>; runnerReconcilerOptions?: RunnerReconcilerRuntimeOptions; aipodSpecDir?: string }): Promise<JsonValue> {
const path = url.pathname;
if (method === "GET" && (path === "/health" || path === "/health/live" || path === "/health/readiness")) {
const database = await store.health();
@@ -730,6 +752,12 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn
if (!job) throw new AgentRunError("schema-invalid", `runner job ${runnerJobId} was not found`, { httpStatus: 404 });
return runnerJobStatusSummary(job, await store.listEvents(runId, 0, 500)) as JsonValue;
}
if (method === "POST" && path === "/api/v1/reconciler/runner-jobs") {
const record = body === null ? {} : asRecord(body, "runnerReconciler");
const limit = Math.max(1, Math.min(numberField(record, "limit", runnerReconcilerOptions?.batchSize ?? 20), 500));
const namespace = optionalString(record.namespace) ?? runnerReconcilerOptions?.namespace ?? process.env.AGENTRUN_RUNTIME_NAMESPACE ?? "agentrun-v01";
return await reconcileRunnerJobsOnce({ store, namespace, limit, ...(runnerReconcilerOptions?.kubectlCommand ? { kubectlCommand: runnerReconcilerOptions.kubectlCommand } : {}) }) as JsonValue;
}
const commandShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)$/u);
if (method === "GET" && commandShowMatch) return await store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue;
const commandResultMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)\/result$/u);
@@ -1104,6 +1132,12 @@ function booleanEnv(key: string, value: string): boolean {
throw new AgentRunError("schema-invalid", `${key} must be true or false`, { httpStatus: 500 });
}
/**
 * Reads an optional boolean setting, typically from an environment variable.
 *
 * @param key Setting name, used in error messages.
 * @param value Raw value; `undefined`, `null` and `""` count as "not set".
 * @param fallback Value returned when the setting is not set.
 * @returns The parsed boolean (via `booleanEnv`) or `fallback`.
 * @throws AgentRunError (`schema-invalid`, HTTP 500) when a set value is not a string.
 */
function optionalBooleanEnv(key: string, value: unknown, fallback: boolean): boolean {
  const unset = value === undefined || value === null || value === "";
  if (unset) {
    return fallback;
  }
  if (typeof value === "string") {
    return booleanEnv(key, value);
  }
  throw new AgentRunError("schema-invalid", `${key} must be true or false`, { httpStatus: 500 });
}
function normalizeError(error: unknown): AgentRunError {
if (error instanceof AgentRunError) return error;
return new AgentRunError("infra-failed", error instanceof Error ? error.message : String(error), { httpStatus: 500 });
+22
View File
@@ -28,8 +28,10 @@ export interface AgentRunStore {
listCommands(runId: string, afterSeq: number, limit: number): MaybePromise<CommandRecord[]>;
registerRunner(input: Partial<RunnerRecord>): MaybePromise<RunnerRecord>;
listRunnerJobs(runId: string, commandId?: string): MaybePromise<RunnerJobRecord[]>;
listRunnerJobsForReconciliation(limit: number): MaybePromise<RunnerJobRecord[]>;
getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): MaybePromise<RunnerJobRecord | null>;
saveRunnerJob(input: SaveRunnerJobInput): MaybePromise<RunnerJobRecord>;
updateRunnerJobResult(runnerJobId: string, patch: JsonRecord): MaybePromise<RunnerJobRecord>;
claimRun(runId: string, runnerId: string, leaseMs: number): MaybePromise<RunRecord>;
heartbeat(runId: string, runnerId: string, leaseMs: number): MaybePromise<RunRecord>;
ackCommand(commandId: string): MaybePromise<CommandRecord>;
@@ -201,6 +203,18 @@ export class MemoryAgentRunStore implements AgentRunStore {
.sort((a, b) => a.createdAt.localeCompare(b.createdAt));
}
/**
 * Lists runner jobs that are still candidates for reconciliation.
 *
 * A job qualifies when its run or command is unknown to this store, or when
 * either of them has not reached a terminal status/state. Results are ordered
 * by `updatedAt`, then `createdAt`, ascending.
 *
 * @param limit Maximum number of jobs; clamped to the range 1..500.
 * @returns The oldest-updated qualifying jobs.
 */
listRunnerJobsForReconciliation(limit: number): RunnerJobRecord[] {
  const maxResults = Math.max(1, Math.min(limit, 500));

  const needsReconciliation = (job: RunnerJobRecord): boolean => {
    const run = this.runs.get(job.runId);
    if (!run || !isTerminalRunStatus(run.status)) return true;
    const command = this.commands.get(job.commandId);
    return !command || !isTerminalCommandState(command.state);
  };

  const candidates: RunnerJobRecord[] = [];
  for (const job of this.runnerJobs.values()) {
    if (needsReconciliation(job)) candidates.push(job);
  }
  candidates.sort((a, b) => a.updatedAt.localeCompare(b.updatedAt) || a.createdAt.localeCompare(b.createdAt));
  return candidates.slice(0, maxResults);
}
getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): RunnerJobRecord | null {
const existing = Array.from(this.runnerJobs.values()).find((job) => job.runId === runId && job.idempotencyKey === idempotencyKey);
if (!existing) return null;
@@ -219,6 +233,14 @@ export class MemoryAgentRunStore implements AgentRunStore {
return record;
}
/**
 * Shallow-merges `patch` into a runner job's `result` and refreshes `updatedAt`.
 *
 * The stored record is replaced with a new object; top-level keys in `patch`
 * replace existing keys of the same name.
 *
 * @param runnerJobId Identifier of the runner job to update.
 * @param patch Top-level result keys to write.
 * @returns The updated runner job record.
 * @throws AgentRunError (`schema-invalid`, HTTP 404) when the job does not exist.
 */
updateRunnerJobResult(runnerJobId: string, patch: JsonRecord): RunnerJobRecord {
  const current = this.runnerJobs.get(runnerJobId);
  if (!current) {
    throw new AgentRunError("schema-invalid", `runner job ${runnerJobId} was not found`, { httpStatus: 404 });
  }
  const mergedResult = { ...current.result, ...patch };
  const updated: RunnerJobRecord = { ...current, result: mergedResult, updatedAt: nowIso() };
  this.runnerJobs.set(runnerJobId, updated);
  return updated;
}
claimRun(runId: string, runnerId: string, leaseMs: number): RunRecord {
const run = this.getRun(runId);
if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 });