From f2f04ac2cf94ca85c5bf94988c9683509dbc192a Mon Sep 17 00:00:00 2001 From: lyon Date: Sat, 20 Jun 2026 15:45:43 +0800 Subject: [PATCH] fix: reconcile runner job observations --- scripts/src/cli.ts | 16 ++ src/mgr/diagnosis.ts | 39 ++++- src/mgr/postgres-store.ts | 30 ++++ src/mgr/runner-job-status.ts | 4 + src/mgr/runner-reconciler.ts | 314 +++++++++++++++++++++++++++++++++++ src/mgr/server.ts | 38 ++++- src/mgr/store.ts | 22 +++ 7 files changed, 459 insertions(+), 4 deletions(-) create mode 100644 src/mgr/runner-reconciler.ts diff --git a/scripts/src/cli.ts b/scripts/src/cli.ts index e9ec195..34fc46f 100644 --- a/scripts/src/cli.ts +++ b/scripts/src/cli.ts @@ -148,6 +148,7 @@ async function dispatch(args: ParsedArgs): Promise { if (group === "runner" && command === "job") return renderRunnerJob(args); if (group === "runner" && command === "jobs") return listRunnerJobs(args); if (group === "runner" && command === "job-status") return showRunnerJobStatus(args); + if (group === "runner" && command === "reconcile") return reconcileRunnerJobs(args); throw new AgentRunError("schema-invalid", `unsupported command: ${args.positional.join(" ")}`, { httpStatus: 2 }); } @@ -1496,6 +1497,20 @@ async function showRunnerJobStatus(args: ParsedArgs): Promise { return client(args).get(`/api/v1/runs/${encodeURIComponent(runId)}/runner-jobs/${encodeURIComponent(runnerJobId)}`); } +async function reconcileRunnerJobs(args: ParsedArgs): Promise { + const body: JsonRecord = {}; + const limit = optionalFlag(args, "limit"); + const namespace = optionalFlag(args, "namespace"); + if (limit) { + const parsed = Number(limit); + if (!Number.isInteger(parsed) || parsed <= 0) throw new AgentRunError("schema-invalid", "runner reconcile --limit must be a positive integer", { httpStatus: 2 }); + body.limit = parsed; + } + if (namespace) body.namespace = namespace; + if (args.flags.get("dry-run") === true) return { action: "runner-job-reconcile", method: "POST", path: "/api/v1/reconciler/runner-jobs", body, valuesPrinted: false }; + return client(args).post("/api/v1/reconciler/runner-jobs", body); +} + async function renderRunnerJob(args: ParsedArgs): Promise { const runId = flag(args, "run-id", ""); const commandId = flag(args, "command-id", ""); @@ -2055,6 +2070,7 @@ function help(args: ParsedArgs, group?: string): JsonRecord { "runner job --dry-run --run-id --command-id --image ", "runner jobs --run-id [--command-id ]", "runner job-status [runnerJobId] --run-id ", + "runner reconcile [--limit ] [--namespace ] [--dry-run]", "queue submit --json-stdin|--json-file [--idempotency-key ] [--dry-run]", "queue submit --aipod [--prompt-stdin|--prompt-file |--prompt ] [--idempotency-key ] [--dry-run]", "queue list [--queue ] [--state ] [--cursor ] [--limit ] [--updated-after ] [--full|--raw]", diff --git a/src/mgr/diagnosis.ts b/src/mgr/diagnosis.ts index 660ff28..acabea4 100644 --- a/src/mgr/diagnosis.ts +++ b/src/mgr/diagnosis.ts @@ -20,7 +20,7 @@ export function runDiagnosis(input: RunDiagnosisInput): JsonRecord { const staleClaimed = input.run.status === "claimed" && booleanValue(lease.leaseExpired) === true; const terminalCommandOpenRun = input.run.status === "claimed" && input.terminalStatus !== null; const runnerJob = input.latestJob ? runnerJobReference(input.latestJob, input.events) : null; - const runnerLost = staleClaimed && (runnerJob === null || runnerJob.phase === "created" || runnerJob.phase === "recorded"); + const runnerLost = staleClaimed && (runnerJob === null || runnerJobPhaseIndicatesLost(runnerJob.phase)); const session = sessionReference(input.run); const providerEvidence = stringValue(input.terminalClassification?.providerEvidence) ?? "not-applicable"; const providerInterruption = stringValue(input.terminalClassification?.providerInterruption) ?? "not-established"; @@ -69,14 +69,17 @@ export function runnerJobDiagnosis(job: RunnerJobRecord, events: RunEvent[] = [] const observation = runnerJobObservation(job, events); const phase = stringValue(observation.phase) ?? "unknown"; const notStarted = phase === "created" || phase === "recorded"; + const runnerLostSuspected = notStarted || runnerJobPhaseIndicatesLost(phase); return { category: stringValue(observation.category) ?? (notStarted ? "runner-job-created" : phase.startsWith("terminal:") ? "runner-job-terminal" : "runner-job-observed"), - runnerLostSuspected: notStarted, + runnerLostSuspected, phase, evidenceLevel: stringValue(observation.evidenceLevel) ?? (notStarted ? "medium" : "high"), lastObservedSeq: numberValue(observation.lastObservedSeq), lastObservedAt: stringValue(observation.lastObservedAt), lastObservedKind: stringValue(observation.lastObservedKind), + terminalReportState: stringValue(observation.terminalReportState), + runReportState: stringValue(observation.runReportState), runId: job.runId, commandId: job.commandId, runnerJobId: job.id, @@ -131,6 +134,26 @@ export function runnerJobObservation(job: RunnerJobRecord, events: RunEvent[] = }; } + const reconcilerObservation = recordAt(job.result, "observation"); + const reconcilerPhase = stringValue(reconcilerObservation?.observedRunnerPhase) ?? stringValue(reconcilerObservation?.phase); + if (reconcilerObservation && reconcilerPhase) { + return { + phase: reconcilerPhase, + category: stringValue(reconcilerObservation.category) ?? "runner-job-observed", + terminalStatus: stringValue(reconcilerObservation.terminalStatus), + failureKind: stringValue(reconcilerObservation.failureKind), + startedAt: stringValue(recordAt(reconcilerObservation, "k8s")?.startTime), + finishedAt: stringValue(recordAt(reconcilerObservation, "k8s")?.completionTime), + lastObservedSeq: null, + lastObservedAt: stringValue(reconcilerObservation.lastK8sObservedAt) ?? stringValue(reconcilerObservation.lastObservedAt), + lastObservedKind: stringValue(reconcilerObservation.lastObservedKind) ?? `manager-reconciler:${reconcilerPhase}`, + terminalReportState: stringValue(reconcilerObservation.terminalReportState), + runReportState: stringValue(reconcilerObservation.runReportState), + evidenceLevel: stringValue(reconcilerObservation.evidenceLevel) ?? "high", + valuesPrinted: false, + }; + } + const created = recordAt(job.result, "kubernetes")?.created === true; return { phase: created ? "created" : "recorded", @@ -212,11 +235,23 @@ function runnerJobReference(job: RunnerJobRecord, events: RunEvent[]): JsonRecor lastObservedSeq: numberValue(observation.lastObservedSeq), lastObservedAt: stringValue(observation.lastObservedAt), lastObservedKind: stringValue(observation.lastObservedKind), + terminalReportState: stringValue(observation.terminalReportState), + runReportState: stringValue(observation.runReportState), logPath: stringValue(recordAt(job.result, "runner")?.logPath), valuesPrinted: false, }; } +function runnerJobPhaseIndicatesLost(value: JsonValue | undefined): boolean { + const phase = stringValue(value); + return phase === "created" + || phase === "recorded" + || phase === "k8s:failed" + || phase === "k8s:missing" + || phase === "k8s:succeeded" + || phase === "k8s:observe-failed"; +} + function sessionReference(run: RunRecord): JsonRecord { if (!run.sessionRef) return { sessionId: null, sessionRefNull: true, sessionPath: null, valuesPrinted: false }; return { sessionId: run.sessionRef.sessionId, sessionRefNull: false, sessionPath: `/api/v1/sessions/${run.sessionRef.sessionId}`, valuesPrinted: false }; diff --git a/src/mgr/postgres-store.ts b/src/mgr/postgres-store.ts index 4a50d76..2483192 100644 --- a/src/mgr/postgres-store.ts +++ b/src/mgr/postgres-store.ts @@ -550,6 +550,24 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( return result.rows.map(runnerJobFromRow); } + async listRunnerJobsForReconciliation(limit: number): Promise { + const clamped = Math.max(1, Math.min(limit, 500)); + const result = await this.pool.query( + `SELECT rj.* + FROM agentrun_runner_jobs rj + LEFT JOIN agentrun_runs r ON r.id = rj.run_id + LEFT JOIN agentrun_commands c ON c.id = rj.command_id + WHERE r.id IS NULL + OR c.id IS NULL + OR r.status NOT IN ('completed', 'failed', 'blocked', 'cancelled') + OR c.state NOT IN ('completed', 'failed', 'cancelled') + ORDER BY rj.updated_at ASC, rj.created_at ASC + LIMIT $1`, + [clamped], + ); + return result.rows.map(runnerJobFromRow); + } + async getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): Promise { const result = await this.pool.query("SELECT * FROM agentrun_runner_jobs WHERE run_id = $1 AND idempotency_key = $2", [runId, idempotencyKey]); const row = result.rows[0]; @@ -582,6 +600,18 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( }); } + async updateRunnerJobResult(runnerJobId: string, patch: JsonRecord): Promise { + return this.withTransaction(async (client) => { + const existing = await client.query("SELECT * FROM agentrun_runner_jobs WHERE id = $1 FOR UPDATE", [runnerJobId]); + const row = existing.rows[0]; + if (!row) throw new AgentRunError("schema-invalid", `runner job ${runnerJobId} was not found`, { httpStatus: 404 }); + const record = runnerJobFromRow(row); + const nextResult = { ...record.result, ...patch }; + const updated = await client.query("UPDATE agentrun_runner_jobs SET result = $2::jsonb, updated_at = $3 WHERE id = $1 RETURNING *", [runnerJobId, JSON.stringify(nextResult), nowIso()]); + return runnerJobFromRow(updated.rows[0]); + }); + } + async claimRun(runId: string, runnerId: string, leaseMs: number): Promise { return this.withTransaction(async (client) => { const run = await this.requireRunForUpdate(client, runId); diff --git a/src/mgr/runner-job-status.ts b/src/mgr/runner-job-status.ts index dfb840d..a518fee 100644 --- a/src/mgr/runner-job-status.ts +++ b/src/mgr/runner-job-status.ts @@ -8,6 +8,7 @@ export function runnerJobStatusSummary(job: RunnerJobRecord, events: RunEvent[] const kubernetes = recordAt(job.result, "kubernetes"); const retention = recordAt(job.result, "retention"); const envImage = recordAt(job.result, "envImage"); + const reconcilerObservation = recordAt(job.result, "observation"); const terminalStatus = observation.terminalStatus; return { id: job.id, @@ -31,6 +32,9 @@ export function runnerJobStatusSummary(job: RunnerJobRecord, events: RunEvent[] lastObservedSeq: typeof observation.lastObservedSeq === "number" ? observation.lastObservedSeq : null, lastObservedAt: typeof observation.lastObservedAt === "string" ? observation.lastObservedAt : null, lastObservedKind: typeof observation.lastObservedKind === "string" ? observation.lastObservedKind : null, + terminalReportState: typeof observation.terminalReportState === "string" ? observation.terminalReportState : null, + runReportState: typeof observation.runReportState === "string" ? observation.runReportState : null, + reconcilerObservation, jobIdentity, podIdentity: recordAt(job.result, "podIdentity"), logPath: typeof runner.logPath === "string" ? runner.logPath : null, diff --git a/src/mgr/runner-reconciler.ts b/src/mgr/runner-reconciler.ts new file mode 100644 index 0000000..fa33a70 --- /dev/null +++ b/src/mgr/runner-reconciler.ts @@ -0,0 +1,314 @@ +import { spawn } from "node:child_process"; +import { AgentRunError } from "../common/errors.js"; +import { redactJson, redactText } from "../common/redaction.js"; +import type { CommandRecord, FailureKind, JsonRecord, JsonValue, RunnerJobRecord, TerminalStatus } from "../common/types.js"; +import { nowIso, stableHash } from "../common/validation.js"; +import type { AgentRunStore } from "./store.js"; +import { isTerminalCommandState, isTerminalRunStatus } from "./store.js"; + +export interface RunnerReconcilerOptions { + store: AgentRunStore; + namespace?: string; + kubectlCommand?: string; + limit?: number; +} + +export interface RunnerReconcilerLoopOptions extends RunnerReconcilerOptions { + batchSize: number; + intervalMs: number; + onError?: (error: unknown) => void; +} + +interface K8sObject { + kind?: string; + metadata?: { + name?: string; + namespace?: string; + uid?: string; + creationTimestamp?: string; + labels?: Record; + annotations?: Record; + ownerReferences?: Array<{ kind?: string; name?: string }>; + }; + status?: JsonRecord; +} + +interface K8sList { + items?: K8sObject[]; +} + +export async function reconcileRunnerJobsOnce(input: RunnerReconcilerOptions): Promise { + const limit = clampLimit(input.limit ?? 20); + const jobs = await input.store.listRunnerJobsForReconciliation(limit); + const items: JsonRecord[] = []; + let observeFailedCount = 0; + let runClosureCount = 0; + + for (const job of jobs) { + const observation = await observeRunnerJob(job, { ...(input.namespace ? { namespace: input.namespace } : {}), ...(input.kubectlCommand ? { kubectlCommand: input.kubectlCommand } : {}) }); + await input.store.updateRunnerJobResult(job.id, { observation }); + if (stringValue(observation.category) === "runner-job-observe-failed") observeFailedCount++; + const runClosure = await closeOpenRunWhenCommandTerminal(input.store, job); + if (runClosure.closed === true) runClosureCount++; + items.push({ + runnerJobId: job.id, + runId: job.runId, + commandId: job.commandId, + namespace: stringValue(observation.namespace) ?? job.namespace, + jobName: job.jobName, + observedRunnerPhase: stringValue(observation.observedRunnerPhase) ?? "unknown", + terminalReportState: stringValue(observation.terminalReportState) ?? "unknown", + runReportState: stringValue(observation.runReportState) ?? "unknown", + runClosure, + valuesPrinted: false, + }); + } + + return { + action: "runner-job-reconcile", + reconciledAt: nowIso(), + limit, + scannedCount: jobs.length, + updatedCount: jobs.length, + observeFailedCount, + runClosureCount, + items, + valuesPrinted: false, + }; +} + +export function startRunnerJobReconciler(input: RunnerReconcilerLoopOptions): () => void { + const intervalMs = Math.max(1_000, input.intervalMs); + let stopped = false; + let running = false; + const tick = async (): Promise => { + if (stopped || running) return; + running = true; + try { + await reconcileRunnerJobsOnce({ store: input.store, limit: input.batchSize, ...(input.namespace ? { namespace: input.namespace } : {}), ...(input.kubectlCommand ? { kubectlCommand: input.kubectlCommand } : {}) }); + } catch (error) { + input.onError?.(error); + } finally { + running = false; + } + }; + const timer = setInterval(() => { void tick(); }, intervalMs); + timer.unref?.(); + void tick(); + return () => { + stopped = true; + clearInterval(timer); + }; +} + +async function observeRunnerJob(job: RunnerJobRecord, input: { namespace?: string; kubectlCommand?: string }): Promise { + const namespace = input.namespace ?? job.namespace; + const kubectlCommand = input.kubectlCommand ?? "kubectl"; + const observedAt = nowIso(); + try { + const [jobObject, podObjects] = await Promise.all([ + getK8sObject(kubectlCommand, "job", namespace, job.jobName), + getK8sList(kubectlCommand, "pods", namespace, ["-l", `job-name=${job.jobName}`]), + ]); + return observationFromObjects(job, namespace, observedAt, jobObject, podObjects); + } catch (error) { + return { + source: "manager-reconciler", + namespace, + jobName: job.jobName, + observedRunnerPhase: "k8s:observe-failed", + category: "runner-job-observe-failed", + terminalStatus: null, + failureKind: "infra-failed", + failureMessage: error instanceof Error ? redactText(error.message).slice(0, 300) : "unknown observation failure", + terminalReportState: "k8s-observation-failed", + runReportState: "not-updated", + lastK8sObservedAt: observedAt, + evidenceLevel: "medium", + valuesPrinted: false, + }; + } +} + +function observationFromObjects(job: RunnerJobRecord, namespace: string, observedAt: string, jobObject: K8sObject | null, podObjects: K8sObject[]): JsonRecord { + const phase = observedRunnerPhase(jobObject, podObjects); + const terminalReportState = terminalReportStateForPhase(phase); + const failureKind: FailureKind | null = phase === "k8s:failed" || phase === "k8s:missing" ? "infra-failed" : null; + const k8s = k8sSummary(jobObject, podObjects); + return { + source: "manager-reconciler", + namespace, + jobName: job.jobName, + observedRunnerPhase: phase, + phase, + category: categoryForPhase(phase), + terminalStatus: null, + failureKind, + terminalReportState, + runReportState: "not-updated", + lastK8sObservedAt: observedAt, + lastObservedAt: observedAt, + lastObservedKind: `manager-reconciler:${phase}`, + evidenceLevel: phase === "k8s:unknown" ? "medium" : "high", + k8s, + valuesPrinted: false, + }; +} + +async function closeOpenRunWhenCommandTerminal(store: AgentRunStore, job: RunnerJobRecord): Promise { + let command: CommandRecord; + try { + command = await store.getCommand(job.commandId); + } catch { + return { closed: false, state: "command-missing", valuesPrinted: false }; + } + if (!isTerminalCommandState(command.state)) return { closed: false, state: "command-open", commandState: command.state, valuesPrinted: false }; + const run = await store.getRun(job.runId); + if (isTerminalRunStatus(run.status)) return { closed: false, state: "run-terminal", runStatus: run.status, commandState: command.state, valuesPrinted: false }; + const terminalStatus = terminalStatusFromCommand(command); + const failureKind: FailureKind | null = terminalStatus === "cancelled" ? "cancelled" : terminalStatus === "failed" ? "infra-failed" : null; + const failureMessage = terminalStatus === "completed" ? null : "manager reconciler closed open run after terminal command state"; + const next = await store.finishRun(run.id, { terminalStatus, failureKind, failureMessage }); + return { closed: true, state: "closed-open-run", runStatus: next.status, terminalStatus, commandState: command.state, valuesPrinted: false }; +} + +function terminalStatusFromCommand(command: CommandRecord): TerminalStatus { + if (command.state === "completed") return "completed"; + if (command.state === "cancelled") return "cancelled"; + return "failed"; +} + +function observedRunnerPhase(jobObject: K8sObject | null, pods: K8sObject[]): string { + if (!jobObject && pods.length === 0) return "k8s:missing"; + const condition = jobCondition(jobObject); + if (condition === "Complete") return "k8s:succeeded"; + if (condition === "Failed") return "k8s:failed"; + const active = numberPath(jobObject, ["status", "active"]); + const succeeded = numberPath(jobObject, ["status", "succeeded"]); + const failed = numberPath(jobObject, ["status", "failed"]); + const podPhases = pods.map((pod) => stringPath(pod, ["status", "phase"])).filter((phase): phase is string => Boolean(phase)); + if ((succeeded ?? 0) > 0 || podPhases.some((phase) => phase === "Succeeded")) return "k8s:succeeded"; + if ((failed ?? 0) > 0 || podPhases.some((phase) => phase === "Failed")) return "k8s:failed"; + if ((active ?? 0) > 0 || podPhases.some((phase) => phase === "Running")) return "k8s:running"; + if (podPhases.some((phase) => phase === "Pending")) return "k8s:pending"; + return "k8s:unknown"; +} + +function categoryForPhase(phase: string): string { + if (phase === "k8s:succeeded") return "runner-job-k8s-succeeded"; + if (phase === "k8s:failed") return "runner-job-k8s-failed"; + if (phase === "k8s:missing") return "runner-job-k8s-missing"; + if (phase === "k8s:running" || phase === "k8s:pending") return "runner-job-running"; + return "runner-job-observed"; +} + +function terminalReportStateForPhase(phase: string): string { + if (phase === "k8s:succeeded" || phase === "k8s:failed") return "terminal-runner-report-missing"; + if (phase === "k8s:missing") return "runner-resource-missing"; + return "not-terminal"; +} + +function k8sSummary(jobObject: K8sObject | null, pods: K8sObject[]): JsonRecord { + const podPhases = pods.map((pod) => stringPath(pod, ["status", "phase"]) ?? "unknown"); + return { + jobPresent: jobObject !== null, + jobUid: stringPath(jobObject, ["metadata", "uid"]), + condition: jobCondition(jobObject), + active: numberPath(jobObject, ["status", "active"]), + succeeded: numberPath(jobObject, ["status", "succeeded"]), + failed: numberPath(jobObject, ["status", "failed"]), + startTime: stringPath(jobObject, ["status", "startTime"]), + completionTime: stringPath(jobObject, ["status", "completionTime"]), + podCount: pods.length, + podPhases, + newestPodName: newestPodName(pods), + valuesPrinted: false, + }; +} + +function jobCondition(jobObject: K8sObject | null): string | null { + const conditions = jobObject?.status?.conditions; + if (!Array.isArray(conditions)) return null; + for (const condition of [...conditions].reverse()) { + if (typeof condition !== "object" || condition === null || Array.isArray(condition)) continue; + const record = condition as JsonRecord; + if ((record.type === "Complete" || record.type === "Failed") && record.status === "True") return record.type; + } + return null; +} + +function newestPodName(pods: K8sObject[]): string | null { + const sorted = [...pods].sort((left, right) => (stringPath(right, ["metadata", "creationTimestamp"]) ?? "").localeCompare(stringPath(left, ["metadata", "creationTimestamp"]) ?? "")); + return stringPath(sorted[0] ?? null, ["metadata", "name"]); +} + +async function getK8sObject(kubectlCommand: string, resource: string, namespace: string, name: string): Promise { + const result = await kubectlRun(kubectlCommand, ["get", resource, name, "-n", namespace, "-o", "json"]); + if (result.code === 0) return parseJsonObject(result.stdout, `kubectl get ${resource}`) as K8sObject; + if (isNotFound(result.stderr)) return null; + throw new AgentRunError("infra-failed", `kubectl get ${resource} failed with code ${result.code}`, { httpStatus: 502, details: redactJson({ stderr: redactText(result.stderr.slice(-2000)), stdout: redactText(result.stdout.slice(-1000)), valuesPrinted: false }) }); +} + +async function getK8sList(kubectlCommand: string, resource: string, namespace: string, extraArgs: string[]): Promise { + const result = await kubectlRun(kubectlCommand, ["get", resource, "-n", namespace, ...extraArgs, "-o", "json"]); + if (result.code !== 0) throw new AgentRunError("infra-failed", `kubectl get ${resource} failed with code ${result.code}`, { httpStatus: 502, details: redactJson({ stderr: redactText(result.stderr.slice(-2000)), stdout: redactText(result.stdout.slice(-1000)), valuesPrinted: false }) }); + const parsed = parseJsonObject(result.stdout, `kubectl get ${resource}`) as K8sList; + return Array.isArray(parsed.items) ? parsed.items.filter((item) => typeof item === "object" && item !== null && !Array.isArray(item)) : []; +} + +async function kubectlRun(kubectlCommand: string, args: string[]): Promise<{ code: number | null; signal: NodeJS.Signals | null; stdout: string; stderr: string }> { + const child = spawn(kubectlCommand, args, { stdio: ["ignore", "pipe", "pipe"] }); + let stdout = ""; + let stderr = ""; + child.stdout.setEncoding("utf8"); + child.stderr.setEncoding("utf8"); + child.stdout.on("data", (chunk) => { stdout += String(chunk); }); + child.stderr.on("data", (chunk) => { stderr += String(chunk); }); + const result = await new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve, reject) => { + child.on("error", reject); + child.on("close", (code, signal) => resolve({ code, signal })); + }).catch((error: unknown) => { + throw new AgentRunError("infra-failed", `failed to start kubectl: ${error instanceof Error ? error.message : String(error)}`, { httpStatus: 503 }); + }); + return { ...result, stdout, stderr }; +} + +function parseJsonObject(text: string, label: string): JsonRecord { + try { + const parsed = JSON.parse(text) as JsonValue; + if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) return parsed; + } catch (error) { + throw new AgentRunError("infra-failed", `${label} returned invalid JSON: ${error instanceof Error ? error.message : String(error)}`, { httpStatus: 502, details: { stdoutHash: stableHash(text), valuesPrinted: false } }); + } + throw new AgentRunError("infra-failed", `${label} returned non-object JSON`, { httpStatus: 502, details: { stdoutHash: stableHash(text), valuesPrinted: false } }); +} + +function isNotFound(stderr: string): boolean { + return /notfound|not found|notfound/i.test(stderr.replace(/\s+/gu, "")); +} + +function clampLimit(limit: number): number { + return Math.max(1, Math.min(Math.floor(limit), 500)); +} + +function stringValue(value: JsonValue | undefined): string | null { + return typeof value === "string" && value.length > 0 ? value : null; +} + +function stringPath(value: unknown, path: string[]): string | null { + let current: unknown = value; + for (const key of path) { + if (typeof current !== "object" || current === null || Array.isArray(current)) return null; + current = (current as Record)[key]; + } + return typeof current === "string" ? current : null; +} + +function numberPath(value: unknown, path: string[]): number | null { + let current: unknown = value; + for (const key of path) { + if (typeof current !== "object" || current === null || Array.isArray(current)) return null; + current = (current as Record)[key]; + } + return typeof current === "number" && Number.isFinite(current) ? current : null; +} diff --git a/src/mgr/server.ts b/src/mgr/server.ts index 5e902f5..a881853 100644 --- a/src/mgr/server.ts +++ b/src/mgr/server.ts @@ -12,6 +12,7 @@ import type { RunnerRetentionOptions } from "./runner-retention.js"; import { dispatchQueueTask, refreshQueueTaskFromCore } from "./queue-dispatch.js"; import { buildRunResult } from "./result.js"; import { runnerJobStatusSummary } from "./runner-job-status.js"; +import { reconcileRunnerJobsOnce, startRunnerJobReconciler } from "./runner-reconciler.js"; import { createSessionPvc, deleteSessionPvc, getSessionPvcSummary, refreshSessionPvcSummary, runSessionStorageGc } from "./session-pvc.js"; import type { SessionPvcSummary } from "./session-pvc.js"; import type { SessionPvcOptions } from "./session-pvc.js"; @@ -84,6 +85,15 @@ function runnerRetentionOptionsForRequest(defaults: RunnerRetentionOptions | und }; } +function runnerReconcilerOptionsForRuntime(defaults: ManagerServerOptions["runnerReconcilerOptions"] | undefined, runnerJobDefaults: ManagerServerOptions["runnerJobDefaults"] | undefined): RunnerReconcilerRuntimeOptions { + const enabled = defaults?.enabled ?? optionalBooleanEnv("AGENTRUN_MANAGER_RECONCILER_ENABLED", process.env.AGENTRUN_MANAGER_RECONCILER_ENABLED, false); + const namespace = defaults?.namespace ?? runnerJobDefaults?.namespace ?? process.env.AGENTRUN_RUNTIME_NAMESPACE ?? "agentrun-v01"; + const intervalMs = defaults?.intervalMs ?? optionalPositiveInteger("AGENTRUN_MANAGER_RECONCILER_INTERVAL_MS", process.env.AGENTRUN_MANAGER_RECONCILER_INTERVAL_MS) ?? 30_000; + const batchSize = defaults?.batchSize ?? optionalPositiveInteger("AGENTRUN_MANAGER_RECONCILER_BATCH_SIZE", process.env.AGENTRUN_MANAGER_RECONCILER_BATCH_SIZE) ?? 20; + const kubectlCommand = defaults?.kubectlCommand ?? runnerJobDefaults?.kubectlCommand; + return { enabled, namespace, intervalMs, batchSize, ...(kubectlCommand ? { kubectlCommand } : {}) }; +} + export interface ManagerServerOptions { store?: AgentRunStore; port?: number; @@ -107,10 +117,19 @@ export interface ManagerServerOptions { sessionPvcOptions?: { kubectlHandler?: import("./session-pvc.js").KubectlHandler; kubectlCommand?: string; storageClassName?: string; size?: string }; providerProfileOptions?: { namespace?: string; kubectlCommand?: string }; toolCredentialOptions?: { namespace?: string; kubectlCommand?: string }; + runnerReconcilerOptions?: { enabled?: boolean; namespace?: string; kubectlCommand?: string; intervalMs?: number; batchSize?: number }; aipodSpecDir?: string; auth?: ManagerAuthConfig; } +interface RunnerReconcilerRuntimeOptions { + enabled: boolean; + namespace: string; + intervalMs: number; + batchSize: number; + kubectlCommand?: string; +} + export interface StartedManagerServer { server: Server; baseUrl: string; @@ -124,22 +143,25 @@ export async function startManagerServer(options: ManagerServerOptions = {}): Pr const sessionPvcDefaults = options.sessionPvcOptions; const providerProfileDefaults = options.providerProfileOptions; const toolCredentialDefaults = options.toolCredentialOptions; + const runnerReconcilerOptions = runnerReconcilerOptionsForRuntime(options.runnerReconcilerOptions, runnerJobDefaults); const aipodSpecDir = options.aipodSpecDir ?? process.env.AGENTRUN_AIPOD_SPEC_DIR; const auth = options.auth ?? managerServerAuthConfigFromEnv(); const authSummary = managerAuthSummary(auth); + const stopRunnerReconciler = runnerReconcilerOptions.enabled ? startRunnerJobReconciler({ store, namespace: runnerReconcilerOptions.namespace, intervalMs: runnerReconcilerOptions.intervalMs, batchSize: runnerReconcilerOptions.batchSize, ...(runnerReconcilerOptions.kubectlCommand ? { kubectlCommand: runnerReconcilerOptions.kubectlCommand } : {}), onError: (error) => console.warn(JSON.stringify({ code: "runner-reconciler-failed", message: error instanceof Error ? error.message : String(error), valuesPrinted: false })) }) : null; const server = createServer(async (req, res) => { const traceId = `trc_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`; try { const method = req.method ?? "GET"; const url = new URL(req.url ?? "/", "http://agentrun.local"); assertManagerRequestAuthorized(req, url.pathname, auth); - const data = await route({ method, url, body: await readBody(req), store, sourceCommit, authSummary, ...(runnerJobDefaults ? { runnerJobDefaults } : {}), ...(sessionPvcDefaults ? { sessionPvcDefaults } : {}), ...(providerProfileDefaults ? { providerProfileDefaults } : {}), ...(toolCredentialDefaults ? { toolCredentialDefaults } : {}), ...(aipodSpecDir ? { aipodSpecDir } : {}) }); + const data = await route({ method, url, body: await readBody(req), store, sourceCommit, authSummary, runnerReconcilerOptions, ...(runnerJobDefaults ? { runnerJobDefaults } : {}), ...(sessionPvcDefaults ? { sessionPvcDefaults } : {}), ...(providerProfileDefaults ? { providerProfileDefaults } : {}), ...(toolCredentialDefaults ? { toolCredentialDefaults } : {}), ...(aipodSpecDir ? { aipodSpecDir } : {}) }); writeJson(res, 200, { ok: true, data, traceId }); } catch (error) { const agentError = normalizeError(error); writeJson(res, agentError.httpStatus, { ok: false, failureKind: agentError.failureKind, message: agentError.message, traceId, error: errorToJson(error) }); } }); + server.on("close", () => { stopRunnerReconciler?.(); }); await new Promise((resolve) => server.listen(options.port ?? 0, options.host ?? "127.0.0.1", resolve)); const address = server.address() as AddressInfo; return { server, baseUrl: `http://${address.address}:${address.port}`, store }; @@ -405,7 +427,7 @@ function compactRecoveryActions(value: JsonValue | undefined): JsonValue[] { }); } -async function route({ method, url, body, store, sourceCommit, authSummary, runnerJobDefaults, sessionPvcDefaults, providerProfileDefaults, toolCredentialDefaults, aipodSpecDir }: { method: string; url: URL; body: unknown; store: AgentRunStore; sourceCommit: string; authSummary?: JsonRecord; runnerJobDefaults?: NonNullable; sessionPvcDefaults?: NonNullable; providerProfileDefaults?: NonNullable; toolCredentialDefaults?: NonNullable; aipodSpecDir?: string }): Promise { +async function route({ method, url, body, store, sourceCommit, authSummary, runnerJobDefaults, sessionPvcDefaults, providerProfileDefaults, toolCredentialDefaults, runnerReconcilerOptions, aipodSpecDir }: { method: string; url: URL; body: unknown; store: AgentRunStore; sourceCommit: string; authSummary?: JsonRecord; runnerJobDefaults?: NonNullable; sessionPvcDefaults?: NonNullable; providerProfileDefaults?: NonNullable; toolCredentialDefaults?: NonNullable; runnerReconcilerOptions?: RunnerReconcilerRuntimeOptions; aipodSpecDir?: string }): Promise { const path = url.pathname; if (method === "GET" && (path === "/health" || path === "/health/live" || path === "/health/readiness")) { const database = await store.health(); @@ -730,6 +752,12 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn if (!job) throw new AgentRunError("schema-invalid", `runner job ${runnerJobId} was not found`, { httpStatus: 404 }); return runnerJobStatusSummary(job, await store.listEvents(runId, 0, 500)) as JsonValue; } + if (method === "POST" && path === "/api/v1/reconciler/runner-jobs") { + const record = body === null ? {} : asRecord(body, "runnerReconciler"); + const limit = Math.max(1, Math.min(numberField(record, "limit", runnerReconcilerOptions?.batchSize ?? 20), 500)); + const namespace = optionalString(record.namespace) ?? runnerReconcilerOptions?.namespace ?? process.env.AGENTRUN_RUNTIME_NAMESPACE ?? "agentrun-v01"; + return await reconcileRunnerJobsOnce({ store, namespace, limit, ...(runnerReconcilerOptions?.kubectlCommand ? { kubectlCommand: runnerReconcilerOptions.kubectlCommand } : {}) }) as JsonValue; + } const commandShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)$/u); if (method === "GET" && commandShowMatch) return await store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue; const commandResultMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)\/result$/u); @@ -1104,6 +1132,12 @@ function booleanEnv(key: string, value: string): boolean { throw new AgentRunError("schema-invalid", `${key} must be true or false`, { httpStatus: 500 }); } +function optionalBooleanEnv(key: string, value: unknown, fallback: boolean): boolean { + if (value === undefined || value === null || value === "") return fallback; + if (typeof value !== "string") throw new AgentRunError("schema-invalid", `${key} must be true or false`, { httpStatus: 500 }); + return booleanEnv(key, value); +} + function normalizeError(error: unknown): AgentRunError { if (error instanceof AgentRunError) return error; return new AgentRunError("infra-failed", error instanceof Error ? error.message : String(error), { httpStatus: 500 }); diff --git a/src/mgr/store.ts b/src/mgr/store.ts index f0c3feb..6d91f0b 100644 --- a/src/mgr/store.ts +++ b/src/mgr/store.ts @@ -28,8 +28,10 @@ export interface AgentRunStore { listCommands(runId: string, afterSeq: number, limit: number): MaybePromise; registerRunner(input: Partial): MaybePromise; listRunnerJobs(runId: string, commandId?: string): MaybePromise; + listRunnerJobsForReconciliation(limit: number): MaybePromise; getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): MaybePromise; saveRunnerJob(input: SaveRunnerJobInput): MaybePromise; + updateRunnerJobResult(runnerJobId: string, patch: JsonRecord): MaybePromise; claimRun(runId: string, runnerId: string, leaseMs: number): MaybePromise; heartbeat(runId: string, runnerId: string, leaseMs: number): MaybePromise; ackCommand(commandId: string): MaybePromise; @@ -201,6 +203,18 @@ export class MemoryAgentRunStore implements AgentRunStore { .sort((a, b) => a.createdAt.localeCompare(b.createdAt)); } + listRunnerJobsForReconciliation(limit: number): RunnerJobRecord[] { + const clamped = Math.max(1, Math.min(limit, 500)); + return Array.from(this.runnerJobs.values()) + .filter((job) => { + const run = this.runs.get(job.runId); + const command = this.commands.get(job.commandId); + return !run || !command || !isTerminalRunStatus(run.status) || !isTerminalCommandState(command.state); + }) + .sort((a, b) => a.updatedAt.localeCompare(b.updatedAt) || a.createdAt.localeCompare(b.createdAt)) + .slice(0, clamped); + } + getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): RunnerJobRecord | null { const existing = Array.from(this.runnerJobs.values()).find((job) => job.runId === runId && job.idempotencyKey === idempotencyKey); if (!existing) return null; @@ -219,6 +233,14 @@ export class MemoryAgentRunStore implements AgentRunStore { return record; } + updateRunnerJobResult(runnerJobId: string, patch: JsonRecord): RunnerJobRecord { + const existing = this.runnerJobs.get(runnerJobId); + if (!existing) throw new AgentRunError("schema-invalid", `runner job ${runnerJobId} was not found`, { httpStatus: 404 }); + const next: RunnerJobRecord = { ...existing, result: { ...existing.result, ...patch }, updatedAt: nowIso() }; + this.runnerJobs.set(runnerJobId, next); + return next; + } + claimRun(runId: string, runnerId: string, leaseMs: number): RunRecord { const run = this.getRun(runId); if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 });