// File: pikasTech-agentrun/src/mgr/store.ts
// Snapshot: 2026-06-02 08:50:21 +08:00 (511 lines, 26 KiB, TypeScript)

import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, QueueAttemptRef, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js";
import { AgentRunError } from "../common/errors.js";
import { newId, nowIso, stableHash } from "../common/validation.js";
import { redactJson } from "../common/redaction.js";
import { backendCapabilities } from "../common/backend-profiles.js";
import { normalizeRunEventPayload, requireEventType } from "../common/events.js";
/**
 * A value that is either available immediately or delivered through a `Promise`.
 * Used as the return type of every `AgentRunStore` method.
 */
export type MaybePromise<T> = T | Promise<T>;
/** Health report returned by `AgentRunStore.health()`. */
export interface StoreHealth extends JsonRecord {
// Which store implementation produced this report.
adapter: "memory-self-test" | "postgres";
ready: boolean;
reachable: boolean;
migrationReady: boolean;
// Identifier of the applied migration, or null when none is reported.
migrationId: string | null;
// Failure classification and human-readable message; both null in the healthy report built by the memory store.
failureKind: FailureKind | null;
message: string | null;
// Typed as the literal `false`: a health report can never claim credential values were printed.
credentialValuesPrinted: false;
}
/**
 * Storage contract for runs, commands, events, runners, runner jobs, sessions and queue tasks.
 *
 * Every method returns `MaybePromise`, so an implementation may answer
 * synchronously or asynchronously; callers should `await` the result.
 */
export interface AgentRunStore {
health(): MaybePromise<StoreHealth>;
// --- Runs and their event log ---
createRun(input: CreateRunInput): MaybePromise<RunRecord>;
getRun(runId: string): MaybePromise<RunRecord>;
listEvents(runId: string, afterSeq: number, limit: number): MaybePromise<RunEvent[]>;
// --- Commands addressed to a run ---
createCommand(runId: string, input: CreateCommandInput): MaybePromise<CommandRecord>;
getCommand(commandId: string): MaybePromise<CommandRecord>;
listCommands(runId: string, afterSeq: number, limit: number): MaybePromise<CommandRecord[]>;
// --- Runners and runner jobs ---
registerRunner(input: Partial<RunnerRecord>): MaybePromise<RunnerRecord>;
listRunnerJobs(runId: string, commandId?: string): MaybePromise<RunnerJobRecord[]>;
getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): MaybePromise<RunnerJobRecord | null>;
saveRunnerJob(input: SaveRunnerJobInput): MaybePromise<RunnerJobRecord>;
// --- Run lease: claim and renew; leaseMs is a duration in milliseconds ---
claimRun(runId: string, runnerId: string, leaseMs: number): MaybePromise<RunRecord>;
heartbeat(runId: string, runnerId: string, leaseMs: number): MaybePromise<RunRecord>;
// --- Command / run lifecycle transitions ---
ackCommand(commandId: string): MaybePromise<CommandRecord>;
finishCommand(commandId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): MaybePromise<CommandRecord>;
appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): MaybePromise<RunEvent>;
finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): MaybePromise<RunRecord>;
cancelRun(runId: string, reason?: string): MaybePromise<RunRecord>;
cancelCommand(commandId: string, reason?: string): MaybePromise<CommandRecord>;
// --- Sessions: resolves to null when the session id is unknown ---
getSession(sessionId: string): MaybePromise<SessionRecord | null>;
// --- Queue tasks ---
createQueueTask(input: CreateQueueTaskInput): MaybePromise<QueueTaskRecord>;
listQueueTasks(input: ListQueueTasksInput): MaybePromise<QueueTaskListResult>;
getQueueTask(taskId: string): MaybePromise<QueueTaskRecord>;
updateQueueTaskAttempt(taskId: string, input: UpdateQueueTaskAttemptInput): MaybePromise<QueueTaskRecord>;
cancelQueueTask(taskId: string, reason?: string): MaybePromise<QueueTaskRecord>;
markQueueTaskRead(taskId: string, readerId: string): MaybePromise<QueueReadCursorRecord>;
queueStats(queue?: string): MaybePromise<QueueStats>;
queueCommander(queue?: string): MaybePromise<QueueCommanderSnapshot>;
// --- Miscellaneous ---
backends(): MaybePromise<JsonRecord[]>;
// Optional: implementations without resources to release may omit it.
close?(): MaybePromise<void>;
}
/** Filter and paging options for `AgentRunStore.listQueueTasks`. */
export interface ListQueueTasksInput {
// Restrict the listing to one queue; omitted means all queues.
queue?: string;
// Restrict the listing to one task state; omitted means all states.
state?: QueueTaskState;
// Cursor string from a previous result; the memory store parses it with `parseQueueCursor` as a task version.
cursor?: string;
// Requested page size; the memory store clamps it with `clampQueueLimit`.
limit: number;
// Lower bound (exclusive) on task version, used by the memory store only when `cursor` is absent or unparseable.
updatedAfter?: number;
}
/** Fields written onto a queue task by `AgentRunStore.updateQueueTaskAttempt`. */
export interface UpdateQueueTaskAttemptInput {
// New task state.
state: QueueTaskState;
// Reference to the attempt that becomes the task's latest attempt.
latestAttempt: QueueAttemptRef;
// Session path to store on the task, or null.
sessionPath: string | null;
}
/** Input to `AgentRunStore.saveRunnerJob`; the store adds the id and timestamps. */
export interface SaveRunnerJobInput {
runId: string;
commandId: string;
// Optional de-duplication key; the memory store returns the existing job for a repeated key within the same run.
idempotencyKey?: string | null;
// Hash of the job payload; reusing an idempotency key with a different hash is rejected by the memory store.
payloadHash: string;
attemptId: string;
runnerId: string;
namespace: string;
jobName: string;
managerUrl: string;
image: string;
sourceCommit: string;
// Stored as null when omitted (memory store).
serviceAccountName?: string | null;
result: JsonRecord;
}
/**
 * Selects and opens the store implementation from environment variables.
 *
 * - A non-blank `DATABASE_URL` selects the Postgres store (module loaded lazily).
 * - Otherwise `AGENTRUN_STORE`, falling back to `AGENTRUN_MGR_STORE`, must be
 *   exactly `memory` (surrounding whitespace ignored) to get the in-memory store.
 *
 * @param env Environment to read; defaults to `process.env`.
 * @returns The opened store.
 * @throws AgentRunError `infra-failed` (HTTP 503) when neither a database URL nor
 *   explicit memory mode is configured.
 */
export async function openAgentRunStoreFromEnv(env: NodeJS.ProcessEnv = process.env): Promise<AgentRunStore> {
  const databaseUrl = env.DATABASE_URL?.trim();
  if (databaseUrl) {
    const { createPostgresAgentRunStore } = await import("./postgres-store.js");
    return createPostgresAgentRunStore({ connectionString: databaseUrl });
  }
  // `||` instead of `??`: an AGENTRUN_STORE that is set but empty/blank must not
  // shadow AGENTRUN_MGR_STORE. Values are trimmed the same way DATABASE_URL is.
  const storeMode = env.AGENTRUN_STORE?.trim() || env.AGENTRUN_MGR_STORE?.trim();
  if (storeMode === "memory") return new MemoryAgentRunStore();
  throw new AgentRunError("infra-failed", "DATABASE_URL is required for agentrun-mgr live runtime; set AGENTRUN_STORE=memory only for explicit self-test/dev mode", { httpStatus: 503, details: { adapter: "postgres", databaseUrl: "missing", memoryFallback: "disabled" } });
}
/**
 * In-process `AgentRunStore` that keeps every record in `Map`s.
 *
 * `health()` reports the adapter as `"memory-self-test"`. Nothing is written
 * outside the instance, so all state disappears with it. All methods are
 * synchronous.
 */
export class MemoryAgentRunStore implements AgentRunStore {
  private readonly runs = new Map<string, RunRecord>();
  private readonly commands = new Map<string, CommandRecord>();
  /** Per-run event log in append order; an event's `seq` is its 1-based position. */
  private readonly eventsByRun = new Map<string, RunEvent[]>();
  private readonly runners = new Map<string, RunnerRecord>();
  private readonly sessions = new Map<string, SessionRecord>();
  private readonly runnerJobs = new Map<string, RunnerJobRecord>();
  private readonly queueTasks = new Map<string, QueueTaskRecord>();
  /** Keyed by `queueReadKey(taskId, readerId)`. */
  private readonly queueReadCursors = new Map<string, QueueReadCursorRecord>();
  /** Counter stamped onto a queue task each time it is created, updated or cancelled. */
  private queueVersion = 0;

  /** Always reports a ready, reachable store with the fixed migration id `memory-self-test`. */
  health(): StoreHealth {
    return { adapter: "memory-self-test", ready: true, reachable: true, migrationReady: true, migrationId: "memory-self-test", failureKind: null, message: null, credentialValuesPrinted: false };
  }

  /**
   * Creates a `pending` run, resolving (or creating) its session first, and
   * records a `run-created` status event.
   *
   * @throws AgentRunError when the referenced session belongs to another tenant,
   *   project or backend profile (see `assertSessionBoundary`).
   */
  createRun(input: CreateRunInput): RunRecord {
    const at = nowIso();
    const sessionRef = this.resolveSessionForRun(input, at);
    const run: RunRecord = { ...input, sessionRef, resourceBundleRef: input.resourceBundleRef ?? null, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null };
    this.runs.set(run.id, run);
    // The event list must exist before appendEvent, which also requires the run to be stored.
    this.eventsByRun.set(run.id, []);
    this.appendEvent(run.id, "backend_status", { phase: "run-created", backendProfile: run.backendProfile, sessionRef: summarizeSessionRef(run.sessionRef ?? null), resourceBundleRef: summarizeResourceBundleRef(run.resourceBundleRef ?? null) });
    return run;
  }

  /** @throws AgentRunError `schema-invalid` (HTTP 404) when the run does not exist. */
  getRun(runId: string): RunRecord {
    const run = this.runs.get(runId);
    if (!run) throw new AgentRunError("schema-invalid", `run ${runId} was not found`, { httpStatus: 404 });
    return run;
  }

  /** Returns events with `seq > afterSeq`, at most `limit` of them (limit clamped to 1..500). */
  listEvents(runId: string, afterSeq: number, limit: number): RunEvent[] {
    this.getRun(runId);
    const pageSize = Math.max(1, Math.min(limit, 500));
    return (this.eventsByRun.get(runId) ?? []).filter((event) => event.seq > afterSeq).slice(0, pageSize);
  }

  /**
   * Adds a `pending` command to a non-terminal run.
   *
   * A repeated `idempotencyKey` within the same run returns the existing command
   * when the payload hash matches.
   *
   * @throws AgentRunError (HTTP 409) when the run is terminal or the idempotency
   *   key is reused with a different payload.
   */
  createCommand(runId: string, input: CreateCommandInput): CommandRecord {
    this.assertRunNotTerminal(this.getRun(runId));
    const payloadHash = stableHash(input.payload);
    const runCommands = Array.from(this.commands.values()).filter((command) => command.runId === runId);
    if (input.idempotencyKey) {
      const existing = runCommands.find((command) => command.idempotencyKey === input.idempotencyKey);
      if (existing) {
        if (existing.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "idempotency key reused with different payload", { httpStatus: 409 });
        return existing;
      }
    }
    const at = nowIso();
    // seq is the 1-based position among this run's commands.
    const command: CommandRecord = { ...input, id: newId("cmd"), runId, seq: runCommands.length + 1, state: "pending", payloadHash, createdAt: at, updatedAt: at, acknowledgedAt: null };
    this.commands.set(command.id, command);
    this.appendEvent(runId, "backend_status", { phase: "command-created", commandId: command.id, commandType: command.type });
    return command;
  }

  /** @throws AgentRunError `schema-invalid` (HTTP 404) when the command does not exist. */
  getCommand(commandId: string): CommandRecord {
    const command = this.commands.get(commandId);
    if (!command) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
    return command;
  }

  /** Returns the run's commands with `seq > afterSeq`, at most `limit` of them (limit clamped to 1..100). */
  listCommands(runId: string, afterSeq: number, limit: number): CommandRecord[] {
    this.getRun(runId);
    const pageSize = Math.max(1, Math.min(limit, 100));
    return Array.from(this.commands.values()).filter((command) => command.runId === runId && command.seq > afterSeq).slice(0, pageSize);
  }

  /**
   * Stores a runner record, generating an id and defaulting both timestamps to
   * now when the caller does not supply them. Re-registering an existing id
   * replaces the stored record.
   */
  registerRunner(input: Partial<RunnerRecord>): RunnerRecord {
    const at = nowIso();
    // Defaults are applied AFTER the spread: spreading last would let an explicit
    // `undefined` in `input` (legal for Partial<>) wipe out the generated id or timestamps.
    const runner: RunnerRecord = {
      ...input,
      id: input.id ?? newId("runner"),
      registeredAt: input.registeredAt === undefined ? at : input.registeredAt,
      heartbeatAt: input.heartbeatAt === undefined ? at : input.heartbeatAt,
    };
    this.runners.set(runner.id, runner);
    return runner;
  }

  /** Lists the run's runner jobs (optionally for one command), ordered by `createdAt`. */
  listRunnerJobs(runId: string, commandId?: string): RunnerJobRecord[] {
    this.getRun(runId);
    return Array.from(this.runnerJobs.values())
      .filter((job) => job.runId === runId && (!commandId || job.commandId === commandId))
      .sort((a, b) => a.createdAt.localeCompare(b.createdAt));
  }

  /**
   * Finds the run's job stored under `idempotencyKey`, or null.
   *
   * @throws AgentRunError `schema-invalid` (HTTP 409) when the key exists with a different payload hash.
   */
  getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): RunnerJobRecord | null {
    const existing = Array.from(this.runnerJobs.values()).find((job) => job.runId === runId && job.idempotencyKey === idempotencyKey);
    if (!existing) return null;
    if (existing.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "runner job idempotency key reused with different payload", { httpStatus: 409 });
    return existing;
  }

  /** Stores a new runner job, or returns the existing one for a repeated idempotency key. */
  saveRunnerJob(input: SaveRunnerJobInput): RunnerJobRecord {
    if (input.idempotencyKey) {
      const existing = this.getRunnerJobByIdempotencyKey(input.runId, input.idempotencyKey, input.payloadHash);
      if (existing) return existing;
    }
    const at = nowIso();
    const record: RunnerJobRecord = { ...input, id: newId("rjob"), idempotencyKey: input.idempotencyKey ?? null, serviceAccountName: input.serviceAccountName ?? null, createdAt: at, updatedAt: at };
    this.runnerJobs.set(record.id, record);
    return record;
  }

  /**
   * Claims a run for `runnerId` for `leaseMs` milliseconds.
   *
   * @throws AgentRunError (HTTP 409) when the run is terminal, or when another
   *   runner holds an unexpired lease (`runner-lease-conflict`).
   */
  claimRun(runId: string, runnerId: string, leaseMs: number): RunRecord {
    const run = this.getRun(runId);
    this.assertRunNotTerminal(run);
    const heldByOther = Boolean(run.claimedBy) && run.claimedBy !== runnerId;
    if (heldByOther && !isLeaseExpired(run.leaseExpiresAt)) throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 });
    const next = this.updateRun(runId, { status: "claimed", claimedBy: runnerId, leaseExpiresAt: new Date(Date.now() + leaseMs).toISOString() });
    this.appendEvent(runId, "backend_status", { phase: "run-claimed", runnerId });
    return next;
  }

  /**
   * Extends the lease of a run claimed by `runnerId`. A terminal run is returned unchanged.
   *
   * @throws AgentRunError `runner-lease-conflict` (HTTP 409) when the run is not claimed by `runnerId`.
   */
  heartbeat(runId: string, runnerId: string, leaseMs: number): RunRecord {
    const run = this.getRun(runId);
    if (isTerminalRunStatus(run.status)) return run;
    if (run.claimedBy !== runnerId) throw new AgentRunError("runner-lease-conflict", `run ${runId} is not claimed by ${runnerId}`, { httpStatus: 409 });
    return this.updateRun(runId, { leaseExpiresAt: new Date(Date.now() + leaseMs).toISOString() });
  }

  /** Marks a command acknowledged; already acknowledged or terminal commands are returned unchanged. */
  ackCommand(commandId: string): CommandRecord {
    const command = this.getCommand(commandId);
    if (isTerminalCommandState(command.state) || command.state === "acknowledged") return command;
    // One clock read so acknowledgedAt and updatedAt cannot disagree.
    const at = nowIso();
    const next = { ...command, state: "acknowledged" as const, acknowledgedAt: at, updatedAt: at };
    this.commands.set(commandId, next);
    return next;
  }

  /**
   * Moves a command to the terminal state derived from `result.terminalStatus`,
   * records the thread on the run's session when both are present, and appends
   * a `command-terminal` event. Already terminal commands are returned unchanged.
   */
  finishCommand(commandId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): CommandRecord {
    const command = this.getCommand(commandId);
    if (isTerminalCommandState(command.state)) return command;
    const next = { ...command, state: commandStateFromTerminal(result.terminalStatus), updatedAt: nowIso() };
    this.commands.set(commandId, next);
    const run = this.getRun(command.runId);
    if (result.threadId && run.sessionRef?.sessionId) this.upsertSessionThread(run, result.threadId, result.turnId ?? null);
    this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: next.state, terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage ?? null, threadId: result.threadId ?? null, turnId: result.turnId ?? null });
    return next;
  }

  /**
   * Appends an event to the run's log with the next sequence number.
   * The type is validated and the payload normalized and redacted before storage.
   */
  appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): RunEvent {
    this.getRun(runId);
    const eventType = requireEventType(type);
    const eventPayload = normalizeRunEventPayload(eventType, payload);
    const events = this.eventsByRun.get(runId) ?? [];
    const event: RunEvent = { id: newId("evt"), runId, seq: events.length + 1, type: eventType, payload: redactJson(eventPayload), createdAt: nowIso() };
    events.push(event);
    this.eventsByRun.set(runId, events);
    return event;
  }

  /**
   * Moves a run to its terminal status, records the thread on the run's session
   * when both are present, and appends a `terminal_status` event.
   * Already terminal runs are returned unchanged.
   *
   * @returns The run as stored after all updates, including any refreshed `sessionRef`.
   */
  finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): RunRecord {
    const existing = this.getRun(runId);
    if (isTerminalRunStatus(existing.status)) return existing;
    const status = statusFromTerminal(result.terminalStatus);
    // `?? null` keeps the stored field null rather than undefined when no message was supplied.
    const next = this.updateRun(runId, { status, terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage ?? null });
    if (result.threadId && next.sessionRef?.sessionId) this.upsertSessionThread(next, result.threadId, result.turnId ?? null);
    this.appendEvent(runId, "terminal_status", { terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage });
    // upsertSessionThread may have replaced the stored run (new sessionRef), so
    // re-read instead of returning the now-stale `next`.
    return this.getRun(runId);
  }

  /**
   * Cancels a run and every non-terminal command that belongs to it, emitting a
   * `command-terminal` event per command, then `cancel-requested`, then
   * `terminal_status`. Already terminal runs are returned unchanged.
   */
  cancelRun(runId: string, reason = "cancel requested"): RunRecord {
    const run = this.getRun(runId);
    if (isTerminalRunStatus(run.status)) return run;
    const at = nowIso();
    const openCommands = Array.from(this.commands.values()).filter((item) => item.runId === runId && !isTerminalCommandState(item.state));
    for (const command of openCommands) {
      const cancelled = { ...command, state: "cancelled" as const, updatedAt: at };
      this.commands.set(command.id, cancelled);
      this.appendEvent(runId, "backend_status", { phase: "command-terminal", commandId: command.id, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled" });
    }
    this.appendEvent(runId, "backend_status", { phase: "cancel-requested", reason });
    const next = this.updateRun(runId, { status: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: reason });
    this.appendEvent(runId, "terminal_status", { terminalStatus: "cancelled", failureKind: "cancelled", message: reason });
    return next;
  }

  /** Cancels a single command; already terminal commands are returned unchanged. */
  cancelCommand(commandId: string, reason = "cancel requested"): CommandRecord {
    const command = this.getCommand(commandId);
    if (isTerminalCommandState(command.state)) return command;
    const next = { ...command, state: "cancelled" as const, updatedAt: nowIso() };
    this.commands.set(commandId, next);
    this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", message: reason });
    return next;
  }

  /** Returns the stored session, or null when the id is unknown. */
  getSession(sessionId: string): SessionRecord | null {
    return this.sessions.get(sessionId) ?? null;
  }

  /**
   * Creates a `pending` queue task. A repeated `idempotencyKey` within the same
   * tenant and project returns the existing task when the payload hash matches.
   *
   * @throws AgentRunError `schema-invalid` (HTTP 409) when the key is reused with a different payload.
   */
  createQueueTask(input: CreateQueueTaskInput): QueueTaskRecord {
    const payloadHash = stableHash(input.payload);
    if (input.idempotencyKey) {
      const existing = Array.from(this.queueTasks.values()).find((task) => task.tenantId === input.tenantId && task.projectId === input.projectId && task.idempotencyKey === input.idempotencyKey);
      if (existing) {
        if (existing.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "queue task idempotency key reused with different payload", { httpStatus: 409 });
        return existing;
      }
    }
    const at = nowIso();
    const task: QueueTaskRecord = { ...input, id: newId("qt"), state: "pending", version: this.nextQueueVersion(), payloadHash, latestAttempt: null, sessionPath: null, createdAt: at, updatedAt: at, cancelledAt: null, cancelReason: null };
    this.queueTasks.set(task.id, task);
    return task;
  }

  /**
   * Lists tasks whose version is greater than the cursor (or `updatedAfter`, or 0),
   * filtered by queue and state, ordered by `queueTaskSort`, limited by `clampQueueLimit`.
   * The returned cursor is the version of the last item, or null for an empty page.
   *
   * NOTE(review): the page is ordered by priority/creation time while the cursor
   * is a version watermark taken from the LAST item. When a page is truncated by
   * `limit`, a follow-up call with that cursor can skip tasks with a lower version
   * that did not fit, and can repeat returned tasks with a higher version.
   * Confirm whether callers page through results before relying on the cursor.
   */
  listQueueTasks(input: ListQueueTasksInput): QueueTaskListResult {
    const startVersion = parseQueueCursor(input.cursor) ?? input.updatedAfter ?? 0;
    const items = Array.from(this.queueTasks.values())
      .filter((task) => task.version > startVersion)
      .filter((task) => !input.queue || task.queue === input.queue)
      .filter((task) => !input.state || task.state === input.state)
      .sort(queueTaskSort)
      .slice(0, clampQueueLimit(input.limit));
    return { items, count: items.length, cursor: items.length > 0 ? String(items[items.length - 1]?.version ?? startVersion) : null };
  }

  /** @throws AgentRunError `schema-invalid` (HTTP 404) when the task does not exist. */
  getQueueTask(taskId: string): QueueTaskRecord {
    const task = this.queueTasks.get(taskId);
    if (!task) throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 });
    return task;
  }

  /**
   * Records a new state, latest attempt and session path on a non-terminal task
   * and bumps its version.
   *
   * @throws AgentRunError (HTTP 409) when the task is already terminal.
   */
  updateQueueTaskAttempt(taskId: string, input: UpdateQueueTaskAttemptInput): QueueTaskRecord {
    const task = this.getQueueTask(taskId);
    if (isTerminalQueueTaskState(task.state)) throw new AgentRunError(task.state === "cancelled" ? "cancelled" : "schema-invalid", `queue task ${taskId} is already terminal: ${task.state}`, { httpStatus: 409 });
    const next: QueueTaskRecord = { ...task, state: input.state, latestAttempt: input.latestAttempt, sessionPath: input.sessionPath, version: this.nextQueueVersion(), updatedAt: nowIso() };
    this.queueTasks.set(taskId, next);
    return next;
  }

  /** Cancels a task and bumps its version; already terminal tasks are returned unchanged. */
  cancelQueueTask(taskId: string, reason = "cancel requested"): QueueTaskRecord {
    const task = this.getQueueTask(taskId);
    if (isTerminalQueueTaskState(task.state)) return task;
    const at = nowIso();
    const next: QueueTaskRecord = { ...task, state: "cancelled", version: this.nextQueueVersion(), updatedAt: at, cancelledAt: at, cancelReason: reason };
    this.queueTasks.set(taskId, next);
    return next;
  }

  /** Records that `readerId` has seen the task at its current version, replacing any earlier record. */
  markQueueTaskRead(taskId: string, readerId: string): QueueReadCursorRecord {
    const task = this.getQueueTask(taskId);
    const record: QueueReadCursorRecord = { taskId, readerId, taskVersion: task.version, readAt: nowIso() };
    this.queueReadCursors.set(queueReadKey(taskId, readerId), record);
    return record;
  }

  /** Aggregated counts for one queue, or for all queues when `queue` is omitted. */
  queueStats(queue?: string): QueueStats {
    return buildQueueStats(this.tasksInQueue(queue), queue ?? null);
  }

  /** Snapshot of queue stats plus the first 20 tasks in `queueTaskSort` order, sharing one `generatedAt`. */
  queueCommander(queue?: string): QueueCommanderSnapshot {
    const matching = this.tasksInQueue(queue);
    // Sort a copy so the stats input keeps its original order.
    const items = [...matching].sort(queueTaskSort).slice(0, 20);
    const generatedAt = nowIso();
    return { queue: queue ?? null, stats: buildQueueStats(matching, queue ?? null, generatedAt), items, generatedAt };
  }

  /** Delegates to `backendCapabilities()`. */
  backends(): JsonRecord[] {
    return backendCapabilities();
  }

  /** Throws the "already terminal" conflict (HTTP 409) shared by `createCommand` and `claimRun`. */
  private assertRunNotTerminal(run: RunRecord): void {
    if (!isTerminalRunStatus(run.status)) return;
    throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${run.id} is already terminal: ${run.status}`, { httpStatus: 409 });
  }

  /** All tasks of `queue`, or every task when `queue` is empty or omitted. */
  private tasksInQueue(queue?: string): QueueTaskRecord[] {
    return Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue);
  }

  /** Replaces the stored run with a patched copy and refreshes `updatedAt`. */
  private updateRun(runId: string, patch: Partial<RunRecord>): RunRecord {
    const run = this.getRun(runId);
    const next = { ...run, ...patch, updatedAt: nowIso() };
    this.runs.set(runId, next);
    return next;
  }

  /** Increments and returns the queue version counter. */
  private nextQueueVersion(): number {
    this.queueVersion += 1;
    return this.queueVersion;
  }

  /**
   * Resolves the session reference for a new run: reuses an existing session
   * after checking its tenant/project/backend boundary, or creates a session
   * record from the input. Returns null when the run has no `sessionRef`.
   */
  private resolveSessionForRun(input: CreateRunInput, at: string): SessionRef | null {
    if (!input.sessionRef) return null;
    const existing = this.sessions.get(input.sessionRef.sessionId);
    if (existing) {
      assertSessionBoundary(existing, input);
      return sessionRefFromRecord(existing, input.sessionRef);
    }
    const record: SessionRecord = {
      sessionId: input.sessionRef.sessionId,
      tenantId: input.tenantId,
      projectId: input.projectId,
      backendProfile: input.backendProfile,
      conversationId: input.sessionRef.conversationId ?? null,
      threadId: input.sessionRef.threadId ?? null,
      metadata: input.sessionRef.metadata ?? {},
      createdAt: at,
      updatedAt: at,
      expiresAt: input.sessionRef.expiresAt ?? null,
    };
    this.sessions.set(record.sessionId, record);
    return sessionRefFromRecord(record, input.sessionRef);
  }

  /**
   * Writes `threadId` (and `lastTurnId` metadata when `turnId` is given) onto the
   * run's session, stores the refreshed `sessionRef` on the run, and appends a
   * `session-updated` event. No-op when the run has no session.
   */
  private upsertSessionThread(run: RunRecord, threadId: string, turnId: string | null): void {
    if (!run.sessionRef?.sessionId) return;
    const at = nowIso();
    const existing = this.sessions.get(run.sessionRef.sessionId);
    const record: SessionRecord = {
      sessionId: run.sessionRef.sessionId,
      tenantId: run.tenantId,
      projectId: run.projectId,
      backendProfile: run.backendProfile,
      conversationId: run.sessionRef.conversationId ?? existing?.conversationId ?? null,
      threadId,
      // Later spreads win: stored metadata, then the run's metadata, then lastTurnId.
      metadata: { ...(existing?.metadata ?? {}), ...(run.sessionRef.metadata ?? {}), ...(turnId ? { lastTurnId: turnId } : {}) },
      createdAt: existing?.createdAt ?? at,
      updatedAt: at,
      expiresAt: run.sessionRef.expiresAt ?? existing?.expiresAt ?? null,
    };
    this.sessions.set(record.sessionId, record);
    const nextSessionRef = sessionRefFromRecord(record, run.sessionRef);
    this.updateRun(run.id, { sessionRef: nextSessionRef });
    this.appendEvent(run.id, "backend_status", { phase: "session-updated", sessionRef: summarizeSessionRef(nextSessionRef), turnId });
  }
}
/**
 * Guards reuse of an existing session by a new run.
 *
 * @throws AgentRunError `tenant-policy-denied` (HTTP 403) when tenant or project differ;
 *   `schema-invalid` (HTTP 400) when only the backend profile differs.
 */
export function assertSessionBoundary(existing: SessionRecord, input: CreateRunInput): void {
  const { sessionId } = existing;
  const sameOwner = existing.tenantId === input.tenantId && existing.projectId === input.projectId;
  // Ownership is checked first, so a cross-tenant request never learns the backend profile.
  if (!sameOwner) {
    throw new AgentRunError("tenant-policy-denied", "sessionRef cannot be reused across tenant or project boundary", { httpStatus: 403, details: { sessionId, valuesPrinted: false } });
  }
  if (existing.backendProfile === input.backendProfile) return;
  throw new AgentRunError("schema-invalid", "sessionRef cannot be reused across backendProfile boundary", { httpStatus: 400, details: { sessionId, existingBackendProfile: existing.backendProfile, requestedBackendProfile: input.backendProfile, valuesPrinted: false } });
}
/** Maps a terminal status to the run status; anything other than completed/cancelled/blocked becomes `failed`. */
export function statusFromTerminal(terminalStatus: TerminalStatus): RunRecord["status"] {
  switch (terminalStatus) {
    case "completed":
      return "completed";
    case "cancelled":
      return "cancelled";
    case "blocked":
      return "blocked";
    default:
      return "failed";
  }
}
/** Maps a terminal status to the command state; only completed and cancelled pass through, the rest become `failed`. */
export function commandStateFromTerminal(terminalStatus: TerminalStatus): CommandRecord["state"] {
  switch (terminalStatus) {
    case "completed":
      return "completed";
    case "cancelled":
      return "cancelled";
    default:
      // Includes "blocked": commands have no blocked state here.
      return "failed";
  }
}
/** True when the run status is one of completed, failed, blocked or cancelled. */
export function isTerminalRunStatus(status: RunRecord["status"]): boolean {
  switch (status) {
    case "completed":
    case "failed":
    case "blocked":
    case "cancelled":
      return true;
    default:
      return false;
  }
}
/** True when the command state is one of completed, failed or cancelled. */
export function isTerminalCommandState(state: CommandRecord["state"]): boolean {
  switch (state) {
    case "completed":
    case "failed":
    case "cancelled":
      return true;
    default:
      return false;
  }
}
/** True when the queue task state is one of completed, failed, blocked or cancelled. */
export function isTerminalQueueTaskState(state: QueueTaskState): boolean {
  switch (state) {
    case "completed":
    case "failed":
    case "blocked":
    case "cancelled":
      return true;
    default:
      return false;
  }
}
/**
 * Reports whether a lease has run out.
 *
 * @param leaseExpiresAt Lease expiry as a date string, or null when no lease is held.
 * @returns `true` when there is no lease, the expiry cannot be parsed, or the
 *   expiry is not in the future; `false` otherwise.
 */
export function isLeaseExpired(leaseExpiresAt: string | null): boolean {
  if (!leaseExpiresAt) return true;
  const expiresAtMs = Date.parse(leaseExpiresAt);
  // An unparseable value yields NaN, and `NaN <= now` is false. Without this
  // check a corrupt lease would count as never expiring and block every other runner.
  if (Number.isNaN(expiresAtMs)) return true;
  return expiresAtMs <= Date.now();
}
/**
 * Builds a `SessionRef` from a stored session, preferring the record's values
 * and falling back to `fallback` field by field. Fields that are empty on both
 * sides are left out; the session id always comes from the record.
 */
export function sessionRefFromRecord(record: SessionRecord, fallback: SessionRef): SessionRef {
  const conversationId = record.conversationId || fallback.conversationId;
  const threadId = record.threadId || fallback.threadId;
  const expiresAt = record.expiresAt || fallback.expiresAt;
  // Record metadata only wins when it has at least one key.
  const metadata = Object.keys(record.metadata).length > 0 ? record.metadata : fallback.metadata;
  return {
    sessionId: record.sessionId,
    ...(conversationId ? { conversationId } : {}),
    ...(threadId ? { threadId } : {}),
    ...(expiresAt ? { expiresAt } : {}),
    ...(metadata ? { metadata } : {}),
  };
}
/**
 * Produces a loggable summary of a session reference: ids and expiry are
 * included, metadata is reduced to its sorted key names. Returns null for a
 * missing reference.
 */
export function summarizeSessionRef(sessionRef: SessionRef | null): JsonRecord | null {
  if (!sessionRef) return null;
  const { sessionId, conversationId, threadId, expiresAt, metadata } = sessionRef;
  const metadataKeys = Object.keys(metadata ?? {});
  metadataKeys.sort();
  return {
    sessionId,
    conversationId: conversationId ?? null,
    threadId: threadId ?? null,
    expiresAt: expiresAt ?? null,
    metadataKeys,
    valuesPrinted: false,
  };
}
/**
 * Produces a loggable summary of a resource bundle reference. Sparse paths are
 * reduced to a count, tool aliases to their names, and the credential reference
 * to its name, namespace and key names. Returns null for a missing reference.
 */
export function summarizeResourceBundleRef(resourceBundleRef: RunRecord["resourceBundleRef"] | null | undefined): JsonRecord | null {
  if (!resourceBundleRef) return null;
  const aliasNames = (resourceBundleRef.toolAliases ?? []).map((alias) => alias.name);
  const credential = resourceBundleRef.credentialRef;
  const credentialSummary = credential
    ? { name: credential.name, namespace: credential.namespace ?? null, keys: credential.keys ?? [], valuesPrinted: false }
    : null;
  return {
    kind: resourceBundleRef.kind,
    repoUrl: resourceBundleRef.repoUrl,
    commitId: resourceBundleRef.commitId,
    subdir: resourceBundleRef.subdir ?? null,
    sparsePathCount: resourceBundleRef.sparsePaths?.length ?? 0,
    toolAliases: { count: aliasNames.length, names: aliasNames, valuesPrinted: false },
    submodules: resourceBundleRef.submodules ?? false,
    lfs: resourceBundleRef.lfs ?? false,
    credentialRef: credentialSummary,
  };
}
/** Comparator for queue tasks: higher priority first, then older `createdAt`, then `id` as the final tie-break. */
export function queueTaskSort(a: QueueTaskRecord, b: QueueTaskRecord): number {
  if (a.priority !== b.priority) {
    return b.priority - a.priority;
  }
  const byCreatedAt = a.createdAt.localeCompare(b.createdAt);
  if (byCreatedAt) return byCreatedAt;
  return a.id.localeCompare(b.id);
}
/**
 * Aggregates queue tasks into counts per state and per lane, the total, and the
 * highest task version seen (0 for an empty list).
 *
 * @param tasks Tasks to aggregate; the caller is responsible for any queue filtering.
 * @param queue Queue name echoed into the result, or null.
 * @param generatedAt Timestamp echoed into the result; defaults to the current time.
 */
export function buildQueueStats(tasks: QueueTaskRecord[], queue: string | null, generatedAt = nowIso()): QueueStats {
  const byState: Record<string, number> = {};
  const byLane: Record<string, number> = {};
  const bump = (counter: Record<string, number>, key: string): void => {
    counter[key] = (counter[key] ?? 0) + 1;
  };
  let maxVersion = 0;
  tasks.forEach(({ state, lane, version }) => {
    bump(byState, state);
    bump(byLane, lane);
    maxVersion = Math.max(maxVersion, version);
  });
  return { queue, total: tasks.length, byState, byLane, maxVersion, generatedAt };
}
/** Normalizes a requested page size to an integer in 1..100; non-finite input falls back to 50. */
export function clampQueueLimit(limit: number): number {
  const requested = Number.isFinite(limit) ? Math.trunc(limit) : 50;
  if (requested < 1) return 1;
  if (requested > 100) return 100;
  return requested;
}
/**
 * Parses a queue cursor into a non-negative integer.
 * Returns null when the cursor is missing, empty, not an integer, or negative.
 */
export function parseQueueCursor(cursor: string | undefined): number | null {
  if (!cursor) return null;
  const parsed = Number(cursor);
  if (!Number.isInteger(parsed)) return null;
  return parsed >= 0 ? parsed : null;
}
/** Composite map key for a reader's read cursor on a task: `<taskId>:<readerId>`. */
function queueReadKey(taskId: string, readerId: string): string {
  return taskId + ":" + readerId;
}