fix: reconcile runner job observations
This commit is contained in:
@@ -148,6 +148,7 @@ async function dispatch(args: ParsedArgs): Promise<CliResult> {
|
|||||||
if (group === "runner" && command === "job") return renderRunnerJob(args);
|
if (group === "runner" && command === "job") return renderRunnerJob(args);
|
||||||
if (group === "runner" && command === "jobs") return listRunnerJobs(args);
|
if (group === "runner" && command === "jobs") return listRunnerJobs(args);
|
||||||
if (group === "runner" && command === "job-status") return showRunnerJobStatus(args);
|
if (group === "runner" && command === "job-status") return showRunnerJobStatus(args);
|
||||||
|
if (group === "runner" && command === "reconcile") return reconcileRunnerJobs(args);
|
||||||
throw new AgentRunError("schema-invalid", `unsupported command: ${args.positional.join(" ")}`, { httpStatus: 2 });
|
throw new AgentRunError("schema-invalid", `unsupported command: ${args.positional.join(" ")}`, { httpStatus: 2 });
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1496,6 +1497,20 @@ async function showRunnerJobStatus(args: ParsedArgs): Promise<JsonValue> {
|
|||||||
return client(args).get(`/api/v1/runs/${encodeURIComponent(runId)}/runner-jobs/${encodeURIComponent(runnerJobId)}`);
|
return client(args).get(`/api/v1/runs/${encodeURIComponent(runId)}/runner-jobs/${encodeURIComponent(runnerJobId)}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function reconcileRunnerJobs(args: ParsedArgs): Promise<JsonValue> {
|
||||||
|
const body: JsonRecord = {};
|
||||||
|
const limit = optionalFlag(args, "limit");
|
||||||
|
const namespace = optionalFlag(args, "namespace");
|
||||||
|
if (limit) {
|
||||||
|
const parsed = Number(limit);
|
||||||
|
if (!Number.isInteger(parsed) || parsed <= 0) throw new AgentRunError("schema-invalid", "runner reconcile --limit must be a positive integer", { httpStatus: 2 });
|
||||||
|
body.limit = parsed;
|
||||||
|
}
|
||||||
|
if (namespace) body.namespace = namespace;
|
||||||
|
if (args.flags.get("dry-run") === true) return { action: "runner-job-reconcile", method: "POST", path: "/api/v1/reconciler/runner-jobs", body, valuesPrinted: false };
|
||||||
|
return client(args).post("/api/v1/reconciler/runner-jobs", body);
|
||||||
|
}
|
||||||
|
|
||||||
async function renderRunnerJob(args: ParsedArgs): Promise<JsonRecord> {
|
async function renderRunnerJob(args: ParsedArgs): Promise<JsonRecord> {
|
||||||
const runId = flag(args, "run-id", "");
|
const runId = flag(args, "run-id", "");
|
||||||
const commandId = flag(args, "command-id", "");
|
const commandId = flag(args, "command-id", "");
|
||||||
@@ -2055,6 +2070,7 @@ function help(args: ParsedArgs, group?: string): JsonRecord {
|
|||||||
"runner job --dry-run --run-id <runId> --command-id <commandId> --image <image>",
|
"runner job --dry-run --run-id <runId> --command-id <commandId> --image <image>",
|
||||||
"runner jobs --run-id <runId> [--command-id <commandId>]",
|
"runner jobs --run-id <runId> [--command-id <commandId>]",
|
||||||
"runner job-status [runnerJobId] --run-id <runId>",
|
"runner job-status [runnerJobId] --run-id <runId>",
|
||||||
|
"runner reconcile [--limit <n>] [--namespace <namespace>] [--dry-run]",
|
||||||
"queue submit --json-stdin|--json-file <task.json> [--idempotency-key <key>] [--dry-run]",
|
"queue submit --json-stdin|--json-file <task.json> [--idempotency-key <key>] [--dry-run]",
|
||||||
"queue submit --aipod <name> [--prompt-stdin|--prompt-file <file>|--prompt <text>] [--idempotency-key <key>] [--dry-run]",
|
"queue submit --aipod <name> [--prompt-stdin|--prompt-file <file>|--prompt <text>] [--idempotency-key <key>] [--dry-run]",
|
||||||
"queue list [--queue <queue>] [--state <state>] [--cursor <cursor>] [--limit <limit>] [--updated-after <version>] [--full|--raw]",
|
"queue list [--queue <queue>] [--state <state>] [--cursor <cursor>] [--limit <limit>] [--updated-after <version>] [--full|--raw]",
|
||||||
|
|||||||
+37
-2
@@ -20,7 +20,7 @@ export function runDiagnosis(input: RunDiagnosisInput): JsonRecord {
|
|||||||
const staleClaimed = input.run.status === "claimed" && booleanValue(lease.leaseExpired) === true;
|
const staleClaimed = input.run.status === "claimed" && booleanValue(lease.leaseExpired) === true;
|
||||||
const terminalCommandOpenRun = input.run.status === "claimed" && input.terminalStatus !== null;
|
const terminalCommandOpenRun = input.run.status === "claimed" && input.terminalStatus !== null;
|
||||||
const runnerJob = input.latestJob ? runnerJobReference(input.latestJob, input.events) : null;
|
const runnerJob = input.latestJob ? runnerJobReference(input.latestJob, input.events) : null;
|
||||||
const runnerLost = staleClaimed && (runnerJob === null || runnerJob.phase === "created" || runnerJob.phase === "recorded");
|
const runnerLost = staleClaimed && (runnerJob === null || runnerJobPhaseIndicatesLost(runnerJob.phase));
|
||||||
const session = sessionReference(input.run);
|
const session = sessionReference(input.run);
|
||||||
const providerEvidence = stringValue(input.terminalClassification?.providerEvidence) ?? "not-applicable";
|
const providerEvidence = stringValue(input.terminalClassification?.providerEvidence) ?? "not-applicable";
|
||||||
const providerInterruption = stringValue(input.terminalClassification?.providerInterruption) ?? "not-established";
|
const providerInterruption = stringValue(input.terminalClassification?.providerInterruption) ?? "not-established";
|
||||||
@@ -69,14 +69,17 @@ export function runnerJobDiagnosis(job: RunnerJobRecord, events: RunEvent[] = []
|
|||||||
const observation = runnerJobObservation(job, events);
|
const observation = runnerJobObservation(job, events);
|
||||||
const phase = stringValue(observation.phase) ?? "unknown";
|
const phase = stringValue(observation.phase) ?? "unknown";
|
||||||
const notStarted = phase === "created" || phase === "recorded";
|
const notStarted = phase === "created" || phase === "recorded";
|
||||||
|
const runnerLostSuspected = notStarted || runnerJobPhaseIndicatesLost(phase);
|
||||||
return {
|
return {
|
||||||
category: stringValue(observation.category) ?? (notStarted ? "runner-job-created" : phase.startsWith("terminal:") ? "runner-job-terminal" : "runner-job-observed"),
|
category: stringValue(observation.category) ?? (notStarted ? "runner-job-created" : phase.startsWith("terminal:") ? "runner-job-terminal" : "runner-job-observed"),
|
||||||
runnerLostSuspected: notStarted,
|
runnerLostSuspected,
|
||||||
phase,
|
phase,
|
||||||
evidenceLevel: stringValue(observation.evidenceLevel) ?? (notStarted ? "medium" : "high"),
|
evidenceLevel: stringValue(observation.evidenceLevel) ?? (notStarted ? "medium" : "high"),
|
||||||
lastObservedSeq: numberValue(observation.lastObservedSeq),
|
lastObservedSeq: numberValue(observation.lastObservedSeq),
|
||||||
lastObservedAt: stringValue(observation.lastObservedAt),
|
lastObservedAt: stringValue(observation.lastObservedAt),
|
||||||
lastObservedKind: stringValue(observation.lastObservedKind),
|
lastObservedKind: stringValue(observation.lastObservedKind),
|
||||||
|
terminalReportState: stringValue(observation.terminalReportState),
|
||||||
|
runReportState: stringValue(observation.runReportState),
|
||||||
runId: job.runId,
|
runId: job.runId,
|
||||||
commandId: job.commandId,
|
commandId: job.commandId,
|
||||||
runnerJobId: job.id,
|
runnerJobId: job.id,
|
||||||
@@ -131,6 +134,26 @@ export function runnerJobObservation(job: RunnerJobRecord, events: RunEvent[] =
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const reconcilerObservation = recordAt(job.result, "observation");
|
||||||
|
const reconcilerPhase = stringValue(reconcilerObservation?.observedRunnerPhase) ?? stringValue(reconcilerObservation?.phase);
|
||||||
|
if (reconcilerObservation && reconcilerPhase) {
|
||||||
|
return {
|
||||||
|
phase: reconcilerPhase,
|
||||||
|
category: stringValue(reconcilerObservation.category) ?? "runner-job-observed",
|
||||||
|
terminalStatus: stringValue(reconcilerObservation.terminalStatus),
|
||||||
|
failureKind: stringValue(reconcilerObservation.failureKind),
|
||||||
|
startedAt: stringValue(recordAt(reconcilerObservation, "k8s")?.startTime),
|
||||||
|
finishedAt: stringValue(recordAt(reconcilerObservation, "k8s")?.completionTime),
|
||||||
|
lastObservedSeq: null,
|
||||||
|
lastObservedAt: stringValue(reconcilerObservation.lastK8sObservedAt) ?? stringValue(reconcilerObservation.lastObservedAt),
|
||||||
|
lastObservedKind: stringValue(reconcilerObservation.lastObservedKind) ?? `manager-reconciler:${reconcilerPhase}`,
|
||||||
|
terminalReportState: stringValue(reconcilerObservation.terminalReportState),
|
||||||
|
runReportState: stringValue(reconcilerObservation.runReportState),
|
||||||
|
evidenceLevel: stringValue(reconcilerObservation.evidenceLevel) ?? "high",
|
||||||
|
valuesPrinted: false,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
const created = recordAt(job.result, "kubernetes")?.created === true;
|
const created = recordAt(job.result, "kubernetes")?.created === true;
|
||||||
return {
|
return {
|
||||||
phase: created ? "created" : "recorded",
|
phase: created ? "created" : "recorded",
|
||||||
@@ -212,11 +235,23 @@ function runnerJobReference(job: RunnerJobRecord, events: RunEvent[]): JsonRecor
|
|||||||
lastObservedSeq: numberValue(observation.lastObservedSeq),
|
lastObservedSeq: numberValue(observation.lastObservedSeq),
|
||||||
lastObservedAt: stringValue(observation.lastObservedAt),
|
lastObservedAt: stringValue(observation.lastObservedAt),
|
||||||
lastObservedKind: stringValue(observation.lastObservedKind),
|
lastObservedKind: stringValue(observation.lastObservedKind),
|
||||||
|
terminalReportState: stringValue(observation.terminalReportState),
|
||||||
|
runReportState: stringValue(observation.runReportState),
|
||||||
logPath: stringValue(recordAt(job.result, "runner")?.logPath),
|
logPath: stringValue(recordAt(job.result, "runner")?.logPath),
|
||||||
valuesPrinted: false,
|
valuesPrinted: false,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function runnerJobPhaseIndicatesLost(value: JsonValue | undefined): boolean {
|
||||||
|
const phase = stringValue(value);
|
||||||
|
return phase === "created"
|
||||||
|
|| phase === "recorded"
|
||||||
|
|| phase === "k8s:failed"
|
||||||
|
|| phase === "k8s:missing"
|
||||||
|
|| phase === "k8s:succeeded"
|
||||||
|
|| phase === "k8s:observe-failed";
|
||||||
|
}
|
||||||
|
|
||||||
function sessionReference(run: RunRecord): JsonRecord {
|
function sessionReference(run: RunRecord): JsonRecord {
|
||||||
if (!run.sessionRef) return { sessionId: null, sessionRefNull: true, sessionPath: null, valuesPrinted: false };
|
if (!run.sessionRef) return { sessionId: null, sessionRefNull: true, sessionPath: null, valuesPrinted: false };
|
||||||
return { sessionId: run.sessionRef.sessionId, sessionRefNull: false, sessionPath: `/api/v1/sessions/${run.sessionRef.sessionId}`, valuesPrinted: false };
|
return { sessionId: run.sessionRef.sessionId, sessionRefNull: false, sessionPath: `/api/v1/sessions/${run.sessionRef.sessionId}`, valuesPrinted: false };
|
||||||
|
|||||||
@@ -550,6 +550,24 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
|||||||
return result.rows.map(runnerJobFromRow);
|
return result.rows.map(runnerJobFromRow);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async listRunnerJobsForReconciliation(limit: number): Promise<RunnerJobRecord[]> {
|
||||||
|
const clamped = Math.max(1, Math.min(limit, 500));
|
||||||
|
const result = await this.pool.query(
|
||||||
|
`SELECT rj.*
|
||||||
|
FROM agentrun_runner_jobs rj
|
||||||
|
LEFT JOIN agentrun_runs r ON r.id = rj.run_id
|
||||||
|
LEFT JOIN agentrun_commands c ON c.id = rj.command_id
|
||||||
|
WHERE r.id IS NULL
|
||||||
|
OR c.id IS NULL
|
||||||
|
OR r.status NOT IN ('completed', 'failed', 'blocked', 'cancelled')
|
||||||
|
OR c.state NOT IN ('completed', 'failed', 'cancelled')
|
||||||
|
ORDER BY rj.updated_at ASC, rj.created_at ASC
|
||||||
|
LIMIT $1`,
|
||||||
|
[clamped],
|
||||||
|
);
|
||||||
|
return result.rows.map(runnerJobFromRow);
|
||||||
|
}
|
||||||
|
|
||||||
async getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): Promise<RunnerJobRecord | null> {
|
async getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): Promise<RunnerJobRecord | null> {
|
||||||
const result = await this.pool.query("SELECT * FROM agentrun_runner_jobs WHERE run_id = $1 AND idempotency_key = $2", [runId, idempotencyKey]);
|
const result = await this.pool.query("SELECT * FROM agentrun_runner_jobs WHERE run_id = $1 AND idempotency_key = $2", [runId, idempotencyKey]);
|
||||||
const row = result.rows[0];
|
const row = result.rows[0];
|
||||||
@@ -582,6 +600,18 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async updateRunnerJobResult(runnerJobId: string, patch: JsonRecord): Promise<RunnerJobRecord> {
|
||||||
|
return this.withTransaction(async (client) => {
|
||||||
|
const existing = await client.query("SELECT * FROM agentrun_runner_jobs WHERE id = $1 FOR UPDATE", [runnerJobId]);
|
||||||
|
const row = existing.rows[0];
|
||||||
|
if (!row) throw new AgentRunError("schema-invalid", `runner job ${runnerJobId} was not found`, { httpStatus: 404 });
|
||||||
|
const record = runnerJobFromRow(row);
|
||||||
|
const nextResult = { ...record.result, ...patch };
|
||||||
|
const updated = await client.query("UPDATE agentrun_runner_jobs SET result = $2::jsonb, updated_at = $3 WHERE id = $1 RETURNING *", [runnerJobId, JSON.stringify(nextResult), nowIso()]);
|
||||||
|
return runnerJobFromRow(updated.rows[0]);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
async claimRun(runId: string, runnerId: string, leaseMs: number): Promise<RunRecord> {
|
async claimRun(runId: string, runnerId: string, leaseMs: number): Promise<RunRecord> {
|
||||||
return this.withTransaction(async (client) => {
|
return this.withTransaction(async (client) => {
|
||||||
const run = await this.requireRunForUpdate(client, runId);
|
const run = await this.requireRunForUpdate(client, runId);
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ export function runnerJobStatusSummary(job: RunnerJobRecord, events: RunEvent[]
|
|||||||
const kubernetes = recordAt(job.result, "kubernetes");
|
const kubernetes = recordAt(job.result, "kubernetes");
|
||||||
const retention = recordAt(job.result, "retention");
|
const retention = recordAt(job.result, "retention");
|
||||||
const envImage = recordAt(job.result, "envImage");
|
const envImage = recordAt(job.result, "envImage");
|
||||||
|
const reconcilerObservation = recordAt(job.result, "observation");
|
||||||
const terminalStatus = observation.terminalStatus;
|
const terminalStatus = observation.terminalStatus;
|
||||||
return {
|
return {
|
||||||
id: job.id,
|
id: job.id,
|
||||||
@@ -31,6 +32,9 @@ export function runnerJobStatusSummary(job: RunnerJobRecord, events: RunEvent[]
|
|||||||
lastObservedSeq: typeof observation.lastObservedSeq === "number" ? observation.lastObservedSeq : null,
|
lastObservedSeq: typeof observation.lastObservedSeq === "number" ? observation.lastObservedSeq : null,
|
||||||
lastObservedAt: typeof observation.lastObservedAt === "string" ? observation.lastObservedAt : null,
|
lastObservedAt: typeof observation.lastObservedAt === "string" ? observation.lastObservedAt : null,
|
||||||
lastObservedKind: typeof observation.lastObservedKind === "string" ? observation.lastObservedKind : null,
|
lastObservedKind: typeof observation.lastObservedKind === "string" ? observation.lastObservedKind : null,
|
||||||
|
terminalReportState: typeof observation.terminalReportState === "string" ? observation.terminalReportState : null,
|
||||||
|
runReportState: typeof observation.runReportState === "string" ? observation.runReportState : null,
|
||||||
|
reconcilerObservation,
|
||||||
jobIdentity,
|
jobIdentity,
|
||||||
podIdentity: recordAt(job.result, "podIdentity"),
|
podIdentity: recordAt(job.result, "podIdentity"),
|
||||||
logPath: typeof runner.logPath === "string" ? runner.logPath : null,
|
logPath: typeof runner.logPath === "string" ? runner.logPath : null,
|
||||||
|
|||||||
@@ -0,0 +1,314 @@
|
|||||||
|
import { spawn } from "node:child_process";
|
||||||
|
import { AgentRunError } from "../common/errors.js";
|
||||||
|
import { redactJson, redactText } from "../common/redaction.js";
|
||||||
|
import type { CommandRecord, FailureKind, JsonRecord, JsonValue, RunnerJobRecord, TerminalStatus } from "../common/types.js";
|
||||||
|
import { nowIso, stableHash } from "../common/validation.js";
|
||||||
|
import type { AgentRunStore } from "./store.js";
|
||||||
|
import { isTerminalCommandState, isTerminalRunStatus } from "./store.js";
|
||||||
|
|
||||||
|
export interface RunnerReconcilerOptions {
|
||||||
|
store: AgentRunStore;
|
||||||
|
namespace?: string;
|
||||||
|
kubectlCommand?: string;
|
||||||
|
limit?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface RunnerReconcilerLoopOptions extends RunnerReconcilerOptions {
|
||||||
|
batchSize: number;
|
||||||
|
intervalMs: number;
|
||||||
|
onError?: (error: unknown) => void;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface K8sObject {
|
||||||
|
kind?: string;
|
||||||
|
metadata?: {
|
||||||
|
name?: string;
|
||||||
|
namespace?: string;
|
||||||
|
uid?: string;
|
||||||
|
creationTimestamp?: string;
|
||||||
|
labels?: Record<string, string>;
|
||||||
|
annotations?: Record<string, string>;
|
||||||
|
ownerReferences?: Array<{ kind?: string; name?: string }>;
|
||||||
|
};
|
||||||
|
status?: JsonRecord;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface K8sList {
|
||||||
|
items?: K8sObject[];
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function reconcileRunnerJobsOnce(input: RunnerReconcilerOptions): Promise<JsonRecord> {
|
||||||
|
const limit = clampLimit(input.limit ?? 20);
|
||||||
|
const jobs = await input.store.listRunnerJobsForReconciliation(limit);
|
||||||
|
const items: JsonRecord[] = [];
|
||||||
|
let observeFailedCount = 0;
|
||||||
|
let runClosureCount = 0;
|
||||||
|
|
||||||
|
for (const job of jobs) {
|
||||||
|
const observation = await observeRunnerJob(job, { ...(input.namespace ? { namespace: input.namespace } : {}), ...(input.kubectlCommand ? { kubectlCommand: input.kubectlCommand } : {}) });
|
||||||
|
await input.store.updateRunnerJobResult(job.id, { observation });
|
||||||
|
if (stringValue(observation.category) === "runner-job-observe-failed") observeFailedCount++;
|
||||||
|
const runClosure = await closeOpenRunWhenCommandTerminal(input.store, job);
|
||||||
|
if (runClosure.closed === true) runClosureCount++;
|
||||||
|
items.push({
|
||||||
|
runnerJobId: job.id,
|
||||||
|
runId: job.runId,
|
||||||
|
commandId: job.commandId,
|
||||||
|
namespace: stringValue(observation.namespace) ?? job.namespace,
|
||||||
|
jobName: job.jobName,
|
||||||
|
observedRunnerPhase: stringValue(observation.observedRunnerPhase) ?? "unknown",
|
||||||
|
terminalReportState: stringValue(observation.terminalReportState) ?? "unknown",
|
||||||
|
runReportState: stringValue(observation.runReportState) ?? "unknown",
|
||||||
|
runClosure,
|
||||||
|
valuesPrinted: false,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
action: "runner-job-reconcile",
|
||||||
|
reconciledAt: nowIso(),
|
||||||
|
limit,
|
||||||
|
scannedCount: jobs.length,
|
||||||
|
updatedCount: jobs.length,
|
||||||
|
observeFailedCount,
|
||||||
|
runClosureCount,
|
||||||
|
items,
|
||||||
|
valuesPrinted: false,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export function startRunnerJobReconciler(input: RunnerReconcilerLoopOptions): () => void {
|
||||||
|
const intervalMs = Math.max(1_000, input.intervalMs);
|
||||||
|
let stopped = false;
|
||||||
|
let running = false;
|
||||||
|
const tick = async (): Promise<void> => {
|
||||||
|
if (stopped || running) return;
|
||||||
|
running = true;
|
||||||
|
try {
|
||||||
|
await reconcileRunnerJobsOnce({ store: input.store, limit: input.batchSize, ...(input.namespace ? { namespace: input.namespace } : {}), ...(input.kubectlCommand ? { kubectlCommand: input.kubectlCommand } : {}) });
|
||||||
|
} catch (error) {
|
||||||
|
input.onError?.(error);
|
||||||
|
} finally {
|
||||||
|
running = false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
const timer = setInterval(() => { void tick(); }, intervalMs);
|
||||||
|
timer.unref?.();
|
||||||
|
void tick();
|
||||||
|
return () => {
|
||||||
|
stopped = true;
|
||||||
|
clearInterval(timer);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
async function observeRunnerJob(job: RunnerJobRecord, input: { namespace?: string; kubectlCommand?: string }): Promise<JsonRecord> {
|
||||||
|
const namespace = input.namespace ?? job.namespace;
|
||||||
|
const kubectlCommand = input.kubectlCommand ?? "kubectl";
|
||||||
|
const observedAt = nowIso();
|
||||||
|
try {
|
||||||
|
const [jobObject, podObjects] = await Promise.all([
|
||||||
|
getK8sObject(kubectlCommand, "job", namespace, job.jobName),
|
||||||
|
getK8sList(kubectlCommand, "pods", namespace, ["-l", `job-name=${job.jobName}`]),
|
||||||
|
]);
|
||||||
|
return observationFromObjects(job, namespace, observedAt, jobObject, podObjects);
|
||||||
|
} catch (error) {
|
||||||
|
return {
|
||||||
|
source: "manager-reconciler",
|
||||||
|
namespace,
|
||||||
|
jobName: job.jobName,
|
||||||
|
observedRunnerPhase: "k8s:observe-failed",
|
||||||
|
category: "runner-job-observe-failed",
|
||||||
|
terminalStatus: null,
|
||||||
|
failureKind: "infra-failed",
|
||||||
|
failureMessage: error instanceof Error ? redactText(error.message).slice(0, 300) : "unknown observation failure",
|
||||||
|
terminalReportState: "k8s-observation-failed",
|
||||||
|
runReportState: "not-updated",
|
||||||
|
lastK8sObservedAt: observedAt,
|
||||||
|
evidenceLevel: "medium",
|
||||||
|
valuesPrinted: false,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function observationFromObjects(job: RunnerJobRecord, namespace: string, observedAt: string, jobObject: K8sObject | null, podObjects: K8sObject[]): JsonRecord {
|
||||||
|
const phase = observedRunnerPhase(jobObject, podObjects);
|
||||||
|
const terminalReportState = terminalReportStateForPhase(phase);
|
||||||
|
const failureKind: FailureKind | null = phase === "k8s:failed" || phase === "k8s:missing" ? "infra-failed" : null;
|
||||||
|
const k8s = k8sSummary(jobObject, podObjects);
|
||||||
|
return {
|
||||||
|
source: "manager-reconciler",
|
||||||
|
namespace,
|
||||||
|
jobName: job.jobName,
|
||||||
|
observedRunnerPhase: phase,
|
||||||
|
phase,
|
||||||
|
category: categoryForPhase(phase),
|
||||||
|
terminalStatus: null,
|
||||||
|
failureKind,
|
||||||
|
terminalReportState,
|
||||||
|
runReportState: "not-updated",
|
||||||
|
lastK8sObservedAt: observedAt,
|
||||||
|
lastObservedAt: observedAt,
|
||||||
|
lastObservedKind: `manager-reconciler:${phase}`,
|
||||||
|
evidenceLevel: phase === "k8s:unknown" ? "medium" : "high",
|
||||||
|
k8s,
|
||||||
|
valuesPrinted: false,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
async function closeOpenRunWhenCommandTerminal(store: AgentRunStore, job: RunnerJobRecord): Promise<JsonRecord> {
|
||||||
|
let command: CommandRecord;
|
||||||
|
try {
|
||||||
|
command = await store.getCommand(job.commandId);
|
||||||
|
} catch {
|
||||||
|
return { closed: false, state: "command-missing", valuesPrinted: false };
|
||||||
|
}
|
||||||
|
if (!isTerminalCommandState(command.state)) return { closed: false, state: "command-open", commandState: command.state, valuesPrinted: false };
|
||||||
|
const run = await store.getRun(job.runId);
|
||||||
|
if (isTerminalRunStatus(run.status)) return { closed: false, state: "run-terminal", runStatus: run.status, commandState: command.state, valuesPrinted: false };
|
||||||
|
const terminalStatus = terminalStatusFromCommand(command);
|
||||||
|
const failureKind: FailureKind | null = terminalStatus === "cancelled" ? "cancelled" : terminalStatus === "failed" ? "infra-failed" : null;
|
||||||
|
const failureMessage = terminalStatus === "completed" ? null : "manager reconciler closed open run after terminal command state";
|
||||||
|
const next = await store.finishRun(run.id, { terminalStatus, failureKind, failureMessage });
|
||||||
|
return { closed: true, state: "closed-open-run", runStatus: next.status, terminalStatus, commandState: command.state, valuesPrinted: false };
|
||||||
|
}
|
||||||
|
|
||||||
|
function terminalStatusFromCommand(command: CommandRecord): TerminalStatus {
|
||||||
|
if (command.state === "completed") return "completed";
|
||||||
|
if (command.state === "cancelled") return "cancelled";
|
||||||
|
return "failed";
|
||||||
|
}
|
||||||
|
|
||||||
|
function observedRunnerPhase(jobObject: K8sObject | null, pods: K8sObject[]): string {
|
||||||
|
if (!jobObject && pods.length === 0) return "k8s:missing";
|
||||||
|
const condition = jobCondition(jobObject);
|
||||||
|
if (condition === "Complete") return "k8s:succeeded";
|
||||||
|
if (condition === "Failed") return "k8s:failed";
|
||||||
|
const active = numberPath(jobObject, ["status", "active"]);
|
||||||
|
const succeeded = numberPath(jobObject, ["status", "succeeded"]);
|
||||||
|
const failed = numberPath(jobObject, ["status", "failed"]);
|
||||||
|
const podPhases = pods.map((pod) => stringPath(pod, ["status", "phase"])).filter((phase): phase is string => Boolean(phase));
|
||||||
|
if ((succeeded ?? 0) > 0 || podPhases.some((phase) => phase === "Succeeded")) return "k8s:succeeded";
|
||||||
|
if ((failed ?? 0) > 0 || podPhases.some((phase) => phase === "Failed")) return "k8s:failed";
|
||||||
|
if ((active ?? 0) > 0 || podPhases.some((phase) => phase === "Running")) return "k8s:running";
|
||||||
|
if (podPhases.some((phase) => phase === "Pending")) return "k8s:pending";
|
||||||
|
return "k8s:unknown";
|
||||||
|
}
|
||||||
|
|
||||||
|
function categoryForPhase(phase: string): string {
|
||||||
|
if (phase === "k8s:succeeded") return "runner-job-k8s-succeeded";
|
||||||
|
if (phase === "k8s:failed") return "runner-job-k8s-failed";
|
||||||
|
if (phase === "k8s:missing") return "runner-job-k8s-missing";
|
||||||
|
if (phase === "k8s:running" || phase === "k8s:pending") return "runner-job-running";
|
||||||
|
return "runner-job-observed";
|
||||||
|
}
|
||||||
|
|
||||||
|
function terminalReportStateForPhase(phase: string): string {
|
||||||
|
if (phase === "k8s:succeeded" || phase === "k8s:failed") return "terminal-runner-report-missing";
|
||||||
|
if (phase === "k8s:missing") return "runner-resource-missing";
|
||||||
|
return "not-terminal";
|
||||||
|
}
|
||||||
|
|
||||||
|
function k8sSummary(jobObject: K8sObject | null, pods: K8sObject[]): JsonRecord {
|
||||||
|
const podPhases = pods.map((pod) => stringPath(pod, ["status", "phase"]) ?? "unknown");
|
||||||
|
return {
|
||||||
|
jobPresent: jobObject !== null,
|
||||||
|
jobUid: stringPath(jobObject, ["metadata", "uid"]),
|
||||||
|
condition: jobCondition(jobObject),
|
||||||
|
active: numberPath(jobObject, ["status", "active"]),
|
||||||
|
succeeded: numberPath(jobObject, ["status", "succeeded"]),
|
||||||
|
failed: numberPath(jobObject, ["status", "failed"]),
|
||||||
|
startTime: stringPath(jobObject, ["status", "startTime"]),
|
||||||
|
completionTime: stringPath(jobObject, ["status", "completionTime"]),
|
||||||
|
podCount: pods.length,
|
||||||
|
podPhases,
|
||||||
|
newestPodName: newestPodName(pods),
|
||||||
|
valuesPrinted: false,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function jobCondition(jobObject: K8sObject | null): string | null {
|
||||||
|
const conditions = jobObject?.status?.conditions;
|
||||||
|
if (!Array.isArray(conditions)) return null;
|
||||||
|
for (const condition of [...conditions].reverse()) {
|
||||||
|
if (typeof condition !== "object" || condition === null || Array.isArray(condition)) continue;
|
||||||
|
const record = condition as JsonRecord;
|
||||||
|
if ((record.type === "Complete" || record.type === "Failed") && record.status === "True") return record.type;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
function newestPodName(pods: K8sObject[]): string | null {
|
||||||
|
const sorted = [...pods].sort((left, right) => (stringPath(right, ["metadata", "creationTimestamp"]) ?? "").localeCompare(stringPath(left, ["metadata", "creationTimestamp"]) ?? ""));
|
||||||
|
return stringPath(sorted[0] ?? null, ["metadata", "name"]);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function getK8sObject(kubectlCommand: string, resource: string, namespace: string, name: string): Promise<K8sObject | null> {
|
||||||
|
const result = await kubectlRun(kubectlCommand, ["get", resource, name, "-n", namespace, "-o", "json"]);
|
||||||
|
if (result.code === 0) return parseJsonObject(result.stdout, `kubectl get ${resource}`) as K8sObject;
|
||||||
|
if (isNotFound(result.stderr)) return null;
|
||||||
|
throw new AgentRunError("infra-failed", `kubectl get ${resource} failed with code ${result.code}`, { httpStatus: 502, details: redactJson({ stderr: redactText(result.stderr.slice(-2000)), stdout: redactText(result.stdout.slice(-1000)), valuesPrinted: false }) });
|
||||||
|
}
|
||||||
|
|
||||||
|
async function getK8sList(kubectlCommand: string, resource: string, namespace: string, extraArgs: string[]): Promise<K8sObject[]> {
|
||||||
|
const result = await kubectlRun(kubectlCommand, ["get", resource, "-n", namespace, ...extraArgs, "-o", "json"]);
|
||||||
|
if (result.code !== 0) throw new AgentRunError("infra-failed", `kubectl get ${resource} failed with code ${result.code}`, { httpStatus: 502, details: redactJson({ stderr: redactText(result.stderr.slice(-2000)), stdout: redactText(result.stdout.slice(-1000)), valuesPrinted: false }) });
|
||||||
|
const parsed = parseJsonObject(result.stdout, `kubectl get ${resource}`) as K8sList;
|
||||||
|
return Array.isArray(parsed.items) ? parsed.items.filter((item) => typeof item === "object" && item !== null && !Array.isArray(item)) : [];
|
||||||
|
}
|
||||||
|
|
||||||
|
async function kubectlRun(kubectlCommand: string, args: string[]): Promise<{ code: number | null; signal: NodeJS.Signals | null; stdout: string; stderr: string }> {
|
||||||
|
const child = spawn(kubectlCommand, args, { stdio: ["ignore", "pipe", "pipe"] });
|
||||||
|
let stdout = "";
|
||||||
|
let stderr = "";
|
||||||
|
child.stdout.setEncoding("utf8");
|
||||||
|
child.stderr.setEncoding("utf8");
|
||||||
|
child.stdout.on("data", (chunk) => { stdout += String(chunk); });
|
||||||
|
child.stderr.on("data", (chunk) => { stderr += String(chunk); });
|
||||||
|
const result = await new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve, reject) => {
|
||||||
|
child.on("error", reject);
|
||||||
|
child.on("close", (code, signal) => resolve({ code, signal }));
|
||||||
|
}).catch((error: unknown) => {
|
||||||
|
throw new AgentRunError("infra-failed", `failed to start kubectl: ${error instanceof Error ? error.message : String(error)}`, { httpStatus: 503 });
|
||||||
|
});
|
||||||
|
return { ...result, stdout, stderr };
|
||||||
|
}
|
||||||
|
|
||||||
|
function parseJsonObject(text: string, label: string): JsonRecord {
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(text) as JsonValue;
|
||||||
|
if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) return parsed;
|
||||||
|
} catch (error) {
|
||||||
|
throw new AgentRunError("infra-failed", `${label} returned invalid JSON: ${error instanceof Error ? error.message : String(error)}`, { httpStatus: 502, details: { stdoutHash: stableHash(text), valuesPrinted: false } });
|
||||||
|
}
|
||||||
|
throw new AgentRunError("infra-failed", `${label} returned non-object JSON`, { httpStatus: 502, details: { stdoutHash: stableHash(text), valuesPrinted: false } });
|
||||||
|
}
|
||||||
|
|
||||||
|
function isNotFound(stderr: string): boolean {
|
||||||
|
return /notfound|not found|notfound/i.test(stderr.replace(/\s+/gu, ""));
|
||||||
|
}
|
||||||
|
|
||||||
|
function clampLimit(limit: number): number {
|
||||||
|
return Math.max(1, Math.min(Math.floor(limit), 500));
|
||||||
|
}
|
||||||
|
|
||||||
|
function stringValue(value: JsonValue | undefined): string | null {
|
||||||
|
return typeof value === "string" && value.length > 0 ? value : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
function stringPath(value: unknown, path: string[]): string | null {
|
||||||
|
let current: unknown = value;
|
||||||
|
for (const key of path) {
|
||||||
|
if (typeof current !== "object" || current === null || Array.isArray(current)) return null;
|
||||||
|
current = (current as Record<string, unknown>)[key];
|
||||||
|
}
|
||||||
|
return typeof current === "string" ? current : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
function numberPath(value: unknown, path: string[]): number | null {
|
||||||
|
let current: unknown = value;
|
||||||
|
for (const key of path) {
|
||||||
|
if (typeof current !== "object" || current === null || Array.isArray(current)) return null;
|
||||||
|
current = (current as Record<string, unknown>)[key];
|
||||||
|
}
|
||||||
|
return typeof current === "number" && Number.isFinite(current) ? current : null;
|
||||||
|
}
|
||||||
+36
-2
@@ -12,6 +12,7 @@ import type { RunnerRetentionOptions } from "./runner-retention.js";
|
|||||||
import { dispatchQueueTask, refreshQueueTaskFromCore } from "./queue-dispatch.js";
|
import { dispatchQueueTask, refreshQueueTaskFromCore } from "./queue-dispatch.js";
|
||||||
import { buildRunResult } from "./result.js";
|
import { buildRunResult } from "./result.js";
|
||||||
import { runnerJobStatusSummary } from "./runner-job-status.js";
|
import { runnerJobStatusSummary } from "./runner-job-status.js";
|
||||||
|
import { reconcileRunnerJobsOnce, startRunnerJobReconciler } from "./runner-reconciler.js";
|
||||||
import { createSessionPvc, deleteSessionPvc, getSessionPvcSummary, refreshSessionPvcSummary, runSessionStorageGc } from "./session-pvc.js";
|
import { createSessionPvc, deleteSessionPvc, getSessionPvcSummary, refreshSessionPvcSummary, runSessionStorageGc } from "./session-pvc.js";
|
||||||
import type { SessionPvcSummary } from "./session-pvc.js";
|
import type { SessionPvcSummary } from "./session-pvc.js";
|
||||||
import type { SessionPvcOptions } from "./session-pvc.js";
|
import type { SessionPvcOptions } from "./session-pvc.js";
|
||||||
@@ -84,6 +85,15 @@ function runnerRetentionOptionsForRequest(defaults: RunnerRetentionOptions | und
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function runnerReconcilerOptionsForRuntime(defaults: ManagerServerOptions["runnerReconcilerOptions"] | undefined, runnerJobDefaults: ManagerServerOptions["runnerJobDefaults"] | undefined): RunnerReconcilerRuntimeOptions {
|
||||||
|
const enabled = defaults?.enabled ?? optionalBooleanEnv("AGENTRUN_MANAGER_RECONCILER_ENABLED", process.env.AGENTRUN_MANAGER_RECONCILER_ENABLED, false);
|
||||||
|
const namespace = defaults?.namespace ?? runnerJobDefaults?.namespace ?? process.env.AGENTRUN_RUNTIME_NAMESPACE ?? "agentrun-v01";
|
||||||
|
const intervalMs = defaults?.intervalMs ?? optionalPositiveInteger("AGENTRUN_MANAGER_RECONCILER_INTERVAL_MS", process.env.AGENTRUN_MANAGER_RECONCILER_INTERVAL_MS) ?? 30_000;
|
||||||
|
const batchSize = defaults?.batchSize ?? optionalPositiveInteger("AGENTRUN_MANAGER_RECONCILER_BATCH_SIZE", process.env.AGENTRUN_MANAGER_RECONCILER_BATCH_SIZE) ?? 20;
|
||||||
|
const kubectlCommand = defaults?.kubectlCommand ?? runnerJobDefaults?.kubectlCommand;
|
||||||
|
return { enabled, namespace, intervalMs, batchSize, ...(kubectlCommand ? { kubectlCommand } : {}) };
|
||||||
|
}
|
||||||
|
|
||||||
export interface ManagerServerOptions {
|
export interface ManagerServerOptions {
|
||||||
store?: AgentRunStore;
|
store?: AgentRunStore;
|
||||||
port?: number;
|
port?: number;
|
||||||
@@ -107,10 +117,19 @@ export interface ManagerServerOptions {
|
|||||||
sessionPvcOptions?: { kubectlHandler?: import("./session-pvc.js").KubectlHandler; kubectlCommand?: string; storageClassName?: string; size?: string };
|
sessionPvcOptions?: { kubectlHandler?: import("./session-pvc.js").KubectlHandler; kubectlCommand?: string; storageClassName?: string; size?: string };
|
||||||
providerProfileOptions?: { namespace?: string; kubectlCommand?: string };
|
providerProfileOptions?: { namespace?: string; kubectlCommand?: string };
|
||||||
toolCredentialOptions?: { namespace?: string; kubectlCommand?: string };
|
toolCredentialOptions?: { namespace?: string; kubectlCommand?: string };
|
||||||
|
runnerReconcilerOptions?: { enabled?: boolean; namespace?: string; kubectlCommand?: string; intervalMs?: number; batchSize?: number };
|
||||||
aipodSpecDir?: string;
|
aipodSpecDir?: string;
|
||||||
auth?: ManagerAuthConfig;
|
auth?: ManagerAuthConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
interface RunnerReconcilerRuntimeOptions {
|
||||||
|
enabled: boolean;
|
||||||
|
namespace: string;
|
||||||
|
intervalMs: number;
|
||||||
|
batchSize: number;
|
||||||
|
kubectlCommand?: string;
|
||||||
|
}
|
||||||
|
|
||||||
export interface StartedManagerServer {
|
export interface StartedManagerServer {
|
||||||
server: Server;
|
server: Server;
|
||||||
baseUrl: string;
|
baseUrl: string;
|
||||||
@@ -124,22 +143,25 @@ export async function startManagerServer(options: ManagerServerOptions = {}): Pr
|
|||||||
const sessionPvcDefaults = options.sessionPvcOptions;
|
const sessionPvcDefaults = options.sessionPvcOptions;
|
||||||
const providerProfileDefaults = options.providerProfileOptions;
|
const providerProfileDefaults = options.providerProfileOptions;
|
||||||
const toolCredentialDefaults = options.toolCredentialOptions;
|
const toolCredentialDefaults = options.toolCredentialOptions;
|
||||||
|
const runnerReconcilerOptions = runnerReconcilerOptionsForRuntime(options.runnerReconcilerOptions, runnerJobDefaults);
|
||||||
const aipodSpecDir = options.aipodSpecDir ?? process.env.AGENTRUN_AIPOD_SPEC_DIR;
|
const aipodSpecDir = options.aipodSpecDir ?? process.env.AGENTRUN_AIPOD_SPEC_DIR;
|
||||||
const auth = options.auth ?? managerServerAuthConfigFromEnv();
|
const auth = options.auth ?? managerServerAuthConfigFromEnv();
|
||||||
const authSummary = managerAuthSummary(auth);
|
const authSummary = managerAuthSummary(auth);
|
||||||
|
const stopRunnerReconciler = runnerReconcilerOptions.enabled ? startRunnerJobReconciler({ store, namespace: runnerReconcilerOptions.namespace, intervalMs: runnerReconcilerOptions.intervalMs, batchSize: runnerReconcilerOptions.batchSize, ...(runnerReconcilerOptions.kubectlCommand ? { kubectlCommand: runnerReconcilerOptions.kubectlCommand } : {}), onError: (error) => console.warn(JSON.stringify({ code: "runner-reconciler-failed", message: error instanceof Error ? error.message : String(error), valuesPrinted: false })) }) : null;
|
||||||
const server = createServer(async (req, res) => {
|
const server = createServer(async (req, res) => {
|
||||||
const traceId = `trc_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`;
|
const traceId = `trc_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`;
|
||||||
try {
|
try {
|
||||||
const method = req.method ?? "GET";
|
const method = req.method ?? "GET";
|
||||||
const url = new URL(req.url ?? "/", "http://agentrun.local");
|
const url = new URL(req.url ?? "/", "http://agentrun.local");
|
||||||
assertManagerRequestAuthorized(req, url.pathname, auth);
|
assertManagerRequestAuthorized(req, url.pathname, auth);
|
||||||
const data = await route({ method, url, body: await readBody(req), store, sourceCommit, authSummary, ...(runnerJobDefaults ? { runnerJobDefaults } : {}), ...(sessionPvcDefaults ? { sessionPvcDefaults } : {}), ...(providerProfileDefaults ? { providerProfileDefaults } : {}), ...(toolCredentialDefaults ? { toolCredentialDefaults } : {}), ...(aipodSpecDir ? { aipodSpecDir } : {}) });
|
const data = await route({ method, url, body: await readBody(req), store, sourceCommit, authSummary, runnerReconcilerOptions, ...(runnerJobDefaults ? { runnerJobDefaults } : {}), ...(sessionPvcDefaults ? { sessionPvcDefaults } : {}), ...(providerProfileDefaults ? { providerProfileDefaults } : {}), ...(toolCredentialDefaults ? { toolCredentialDefaults } : {}), ...(aipodSpecDir ? { aipodSpecDir } : {}) });
|
||||||
writeJson(res, 200, { ok: true, data, traceId });
|
writeJson(res, 200, { ok: true, data, traceId });
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
const agentError = normalizeError(error);
|
const agentError = normalizeError(error);
|
||||||
writeJson(res, agentError.httpStatus, { ok: false, failureKind: agentError.failureKind, message: agentError.message, traceId, error: errorToJson(error) });
|
writeJson(res, agentError.httpStatus, { ok: false, failureKind: agentError.failureKind, message: agentError.message, traceId, error: errorToJson(error) });
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
server.on("close", () => { stopRunnerReconciler?.(); });
|
||||||
await new Promise<void>((resolve) => server.listen(options.port ?? 0, options.host ?? "127.0.0.1", resolve));
|
await new Promise<void>((resolve) => server.listen(options.port ?? 0, options.host ?? "127.0.0.1", resolve));
|
||||||
const address = server.address() as AddressInfo;
|
const address = server.address() as AddressInfo;
|
||||||
return { server, baseUrl: `http://${address.address}:${address.port}`, store };
|
return { server, baseUrl: `http://${address.address}:${address.port}`, store };
|
||||||
@@ -405,7 +427,7 @@ function compactRecoveryActions(value: JsonValue | undefined): JsonValue[] {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async function route({ method, url, body, store, sourceCommit, authSummary, runnerJobDefaults, sessionPvcDefaults, providerProfileDefaults, toolCredentialDefaults, aipodSpecDir }: { method: string; url: URL; body: unknown; store: AgentRunStore; sourceCommit: string; authSummary?: JsonRecord; runnerJobDefaults?: NonNullable<ManagerServerOptions["runnerJobDefaults"]>; sessionPvcDefaults?: NonNullable<ManagerServerOptions["sessionPvcOptions"]>; providerProfileDefaults?: NonNullable<ManagerServerOptions["providerProfileOptions"]>; toolCredentialDefaults?: NonNullable<ManagerServerOptions["toolCredentialOptions"]>; aipodSpecDir?: string }): Promise<JsonValue> {
|
async function route({ method, url, body, store, sourceCommit, authSummary, runnerJobDefaults, sessionPvcDefaults, providerProfileDefaults, toolCredentialDefaults, runnerReconcilerOptions, aipodSpecDir }: { method: string; url: URL; body: unknown; store: AgentRunStore; sourceCommit: string; authSummary?: JsonRecord; runnerJobDefaults?: NonNullable<ManagerServerOptions["runnerJobDefaults"]>; sessionPvcDefaults?: NonNullable<ManagerServerOptions["sessionPvcOptions"]>; providerProfileDefaults?: NonNullable<ManagerServerOptions["providerProfileOptions"]>; toolCredentialDefaults?: NonNullable<ManagerServerOptions["toolCredentialOptions"]>; runnerReconcilerOptions?: RunnerReconcilerRuntimeOptions; aipodSpecDir?: string }): Promise<JsonValue> {
|
||||||
const path = url.pathname;
|
const path = url.pathname;
|
||||||
if (method === "GET" && (path === "/health" || path === "/health/live" || path === "/health/readiness")) {
|
if (method === "GET" && (path === "/health" || path === "/health/live" || path === "/health/readiness")) {
|
||||||
const database = await store.health();
|
const database = await store.health();
|
||||||
@@ -730,6 +752,12 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn
|
|||||||
if (!job) throw new AgentRunError("schema-invalid", `runner job ${runnerJobId} was not found`, { httpStatus: 404 });
|
if (!job) throw new AgentRunError("schema-invalid", `runner job ${runnerJobId} was not found`, { httpStatus: 404 });
|
||||||
return runnerJobStatusSummary(job, await store.listEvents(runId, 0, 500)) as JsonValue;
|
return runnerJobStatusSummary(job, await store.listEvents(runId, 0, 500)) as JsonValue;
|
||||||
}
|
}
|
||||||
|
if (method === "POST" && path === "/api/v1/reconciler/runner-jobs") {
|
||||||
|
const record = body === null ? {} : asRecord(body, "runnerReconciler");
|
||||||
|
const limit = Math.max(1, Math.min(numberField(record, "limit", runnerReconcilerOptions?.batchSize ?? 20), 500));
|
||||||
|
const namespace = optionalString(record.namespace) ?? runnerReconcilerOptions?.namespace ?? process.env.AGENTRUN_RUNTIME_NAMESPACE ?? "agentrun-v01";
|
||||||
|
return await reconcileRunnerJobsOnce({ store, namespace, limit, ...(runnerReconcilerOptions?.kubectlCommand ? { kubectlCommand: runnerReconcilerOptions.kubectlCommand } : {}) }) as JsonValue;
|
||||||
|
}
|
||||||
const commandShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)$/u);
|
const commandShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)$/u);
|
||||||
if (method === "GET" && commandShowMatch) return await store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue;
|
if (method === "GET" && commandShowMatch) return await store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue;
|
||||||
const commandResultMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)\/result$/u);
|
const commandResultMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)\/result$/u);
|
||||||
@@ -1104,6 +1132,12 @@ function booleanEnv(key: string, value: string): boolean {
|
|||||||
throw new AgentRunError("schema-invalid", `${key} must be true or false`, { httpStatus: 500 });
|
throw new AgentRunError("schema-invalid", `${key} must be true or false`, { httpStatus: 500 });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function optionalBooleanEnv(key: string, value: unknown, fallback: boolean): boolean {
|
||||||
|
if (value === undefined || value === null || value === "") return fallback;
|
||||||
|
if (typeof value !== "string") throw new AgentRunError("schema-invalid", `${key} must be true or false`, { httpStatus: 500 });
|
||||||
|
return booleanEnv(key, value);
|
||||||
|
}
|
||||||
|
|
||||||
function normalizeError(error: unknown): AgentRunError {
|
function normalizeError(error: unknown): AgentRunError {
|
||||||
if (error instanceof AgentRunError) return error;
|
if (error instanceof AgentRunError) return error;
|
||||||
return new AgentRunError("infra-failed", error instanceof Error ? error.message : String(error), { httpStatus: 500 });
|
return new AgentRunError("infra-failed", error instanceof Error ? error.message : String(error), { httpStatus: 500 });
|
||||||
|
|||||||
@@ -28,8 +28,10 @@ export interface AgentRunStore {
|
|||||||
listCommands(runId: string, afterSeq: number, limit: number): MaybePromise<CommandRecord[]>;
|
listCommands(runId: string, afterSeq: number, limit: number): MaybePromise<CommandRecord[]>;
|
||||||
registerRunner(input: Partial<RunnerRecord>): MaybePromise<RunnerRecord>;
|
registerRunner(input: Partial<RunnerRecord>): MaybePromise<RunnerRecord>;
|
||||||
listRunnerJobs(runId: string, commandId?: string): MaybePromise<RunnerJobRecord[]>;
|
listRunnerJobs(runId: string, commandId?: string): MaybePromise<RunnerJobRecord[]>;
|
||||||
|
listRunnerJobsForReconciliation(limit: number): MaybePromise<RunnerJobRecord[]>;
|
||||||
getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): MaybePromise<RunnerJobRecord | null>;
|
getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): MaybePromise<RunnerJobRecord | null>;
|
||||||
saveRunnerJob(input: SaveRunnerJobInput): MaybePromise<RunnerJobRecord>;
|
saveRunnerJob(input: SaveRunnerJobInput): MaybePromise<RunnerJobRecord>;
|
||||||
|
updateRunnerJobResult(runnerJobId: string, patch: JsonRecord): MaybePromise<RunnerJobRecord>;
|
||||||
claimRun(runId: string, runnerId: string, leaseMs: number): MaybePromise<RunRecord>;
|
claimRun(runId: string, runnerId: string, leaseMs: number): MaybePromise<RunRecord>;
|
||||||
heartbeat(runId: string, runnerId: string, leaseMs: number): MaybePromise<RunRecord>;
|
heartbeat(runId: string, runnerId: string, leaseMs: number): MaybePromise<RunRecord>;
|
||||||
ackCommand(commandId: string): MaybePromise<CommandRecord>;
|
ackCommand(commandId: string): MaybePromise<CommandRecord>;
|
||||||
@@ -201,6 +203,18 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
|||||||
.sort((a, b) => a.createdAt.localeCompare(b.createdAt));
|
.sort((a, b) => a.createdAt.localeCompare(b.createdAt));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
listRunnerJobsForReconciliation(limit: number): RunnerJobRecord[] {
|
||||||
|
const clamped = Math.max(1, Math.min(limit, 500));
|
||||||
|
return Array.from(this.runnerJobs.values())
|
||||||
|
.filter((job) => {
|
||||||
|
const run = this.runs.get(job.runId);
|
||||||
|
const command = this.commands.get(job.commandId);
|
||||||
|
return !run || !command || !isTerminalRunStatus(run.status) || !isTerminalCommandState(command.state);
|
||||||
|
})
|
||||||
|
.sort((a, b) => a.updatedAt.localeCompare(b.updatedAt) || a.createdAt.localeCompare(b.createdAt))
|
||||||
|
.slice(0, clamped);
|
||||||
|
}
|
||||||
|
|
||||||
getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): RunnerJobRecord | null {
|
getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): RunnerJobRecord | null {
|
||||||
const existing = Array.from(this.runnerJobs.values()).find((job) => job.runId === runId && job.idempotencyKey === idempotencyKey);
|
const existing = Array.from(this.runnerJobs.values()).find((job) => job.runId === runId && job.idempotencyKey === idempotencyKey);
|
||||||
if (!existing) return null;
|
if (!existing) return null;
|
||||||
@@ -219,6 +233,14 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
|||||||
return record;
|
return record;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
updateRunnerJobResult(runnerJobId: string, patch: JsonRecord): RunnerJobRecord {
|
||||||
|
const existing = this.runnerJobs.get(runnerJobId);
|
||||||
|
if (!existing) throw new AgentRunError("schema-invalid", `runner job ${runnerJobId} was not found`, { httpStatus: 404 });
|
||||||
|
const next: RunnerJobRecord = { ...existing, result: { ...existing.result, ...patch }, updatedAt: nowIso() };
|
||||||
|
this.runnerJobs.set(runnerJobId, next);
|
||||||
|
return next;
|
||||||
|
}
|
||||||
|
|
||||||
claimRun(runId: string, runnerId: string, leaseMs: number): RunRecord {
|
claimRun(runId: string, runnerId: string, leaseMs: number): RunRecord {
|
||||||
const run = this.getRun(runId);
|
const run = this.getRun(runId);
|
||||||
if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 });
|
if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 });
|
||||||
|
|||||||
Reference in New Issue
Block a user