689 lines
38 KiB
TypeScript
689 lines
38 KiB
TypeScript
import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, QueueAttemptRef, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionEventPage, SessionListResult, SessionListState, SessionReadCursorRecord, SessionRecord, SessionRef, SessionSummary, TerminalStatus } from "../common/types.js";
|
|
import { AgentRunError } from "../common/errors.js";
|
|
import { newId, nowIso, stableHash } from "../common/validation.js";
|
|
import { redactJson } from "../common/redaction.js";
|
|
import { backendCapabilities } from "../common/backend-profiles.js";
|
|
import { normalizeRunEventPayload, requireEventType } from "../common/events.js";
|
|
|
|
/** A value that is either available immediately or delivered through a promise, so store methods may be implemented synchronously or asynchronously. */
export type MaybePromise<T> = T | Promise<T>;
|
|
|
|
/**
 * Health report produced by a store adapter's `health()` method.
 */
export interface StoreHealth extends JsonRecord {
  /** Which adapter produced this report. */
  adapter: "memory-self-test" | "postgres";
  /** Overall readiness flag. */
  ready: boolean;
  /** Whether the backing storage could be reached. */
  reachable: boolean;
  /** Whether the expected schema migration is in place. */
  migrationReady: boolean;
  /** Identifier of the applied migration, or `null` when none is reported. */
  migrationId: string | null;
  /** Failure classification, or `null` when there is nothing to report. */
  failureKind: FailureKind | null;
  /** Human-readable detail, or `null`. */
  message: string | null;
  /** Always the literal `false`; the type forbids any other value. */
  credentialValuesPrinted: false;
}
|
|
|
|
/**
 * Persistence contract for runs, commands, events, runners, sessions and queue tasks.
 *
 * Every method returns {@link MaybePromise}, so an adapter may answer
 * synchronously or asynchronously; callers should `await` the result either way.
 */
export interface AgentRunStore {
  // --- Adapter health ---
  health(): MaybePromise<StoreHealth>;

  // --- Runs, events and commands ---
  createRun(input: CreateRunInput): MaybePromise<RunRecord>;
  getRun(runId: string): MaybePromise<RunRecord>;
  listEvents(runId: string, afterSeq: number, limit: number): MaybePromise<RunEvent[]>;
  createCommand(runId: string, input: CreateCommandInput): MaybePromise<CommandRecord>;
  getCommand(commandId: string): MaybePromise<CommandRecord>;
  listCommands(runId: string, afterSeq: number, limit: number): MaybePromise<CommandRecord[]>;

  // --- Runners and runner jobs ---
  registerRunner(input: Partial<RunnerRecord>): MaybePromise<RunnerRecord>;
  listRunnerJobs(runId: string, commandId?: string): MaybePromise<RunnerJobRecord[]>;
  /** Resolves to `null` when no job was recorded under the key. */
  getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): MaybePromise<RunnerJobRecord | null>;
  saveRunnerJob(input: SaveRunnerJobInput): MaybePromise<RunnerJobRecord>;

  // --- Run leasing and lifecycle ---
  claimRun(runId: string, runnerId: string, leaseMs: number): MaybePromise<RunRecord>;
  heartbeat(runId: string, runnerId: string, leaseMs: number): MaybePromise<RunRecord>;
  ackCommand(commandId: string): MaybePromise<CommandRecord>;
  finishCommand(commandId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): MaybePromise<CommandRecord>;
  appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): MaybePromise<RunEvent>;
  finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): MaybePromise<RunRecord>;
  cancelRun(runId: string, reason?: string): MaybePromise<RunRecord>;
  cancelCommand(commandId: string, reason?: string): MaybePromise<CommandRecord>;

  // --- Sessions ---
  /** Resolves to `null` when the session is unknown. */
  getSession(sessionId: string): MaybePromise<SessionRecord | null>;
  getSessionSummary(sessionId: string, readerId?: string | null): MaybePromise<SessionSummary>;
  listSessions(input: ListSessionsInput): MaybePromise<SessionListResult>;
  listSessionTrace(sessionId: string, input: SessionEventPageInput): MaybePromise<SessionEventPage>;
  listSessionOutput(sessionId: string, input: SessionEventPageInput): MaybePromise<SessionEventPage>;
  markSessionRead(sessionId: string, readerId: string): MaybePromise<SessionReadCursorRecord>;

  // --- Queue tasks ---
  createQueueTask(input: CreateQueueTaskInput): MaybePromise<QueueTaskRecord>;
  listQueueTasks(input: ListQueueTasksInput): MaybePromise<QueueTaskListResult>;
  getQueueTask(taskId: string): MaybePromise<QueueTaskRecord>;
  updateQueueTaskAttempt(taskId: string, input: UpdateQueueTaskAttemptInput): MaybePromise<QueueTaskRecord>;
  cancelQueueTask(taskId: string, reason?: string): MaybePromise<QueueTaskRecord>;
  markQueueTaskRead(taskId: string, readerId: string): MaybePromise<QueueReadCursorRecord>;
  queueStats(queue?: string): MaybePromise<QueueStats>;
  queueCommander(queue?: string): MaybePromise<QueueCommanderSnapshot>;

  // --- Misc ---
  backends(): MaybePromise<JsonRecord[]>;
  /** Optional teardown hook; adapters without resources to release may omit it. */
  close?(): MaybePromise<void>;
}
|
|
|
|
/** Filters and paging options accepted by `AgentRunStore.listQueueTasks`. */
export interface ListQueueTasksInput {
  /** Restrict results to one queue; omit to include every queue. */
  queue?: string;
  /** Restrict results to tasks in this state. */
  state?: QueueTaskState;
  /** Opaque paging cursor taken from a previous result. */
  cursor?: string;
  /** Requested page size. */
  limit: number;
  /** Version floor; the in-memory adapter uses it only when no `cursor` is supplied. */
  updatedAfter?: number;
}
|
|
|
|
/** Filters and paging options accepted by `AgentRunStore.listSessions`. */
export interface ListSessionsInput {
  /** Attention filter; the in-memory adapter treats a missing value as `"default"`. */
  state?: SessionListState;
  /** Restrict results to sessions created for this backend profile. */
  backendProfile?: BackendProfile;
  /** Reader whose read cursor is used when building each summary. */
  readerId?: string | null;
  /** Opaque paging cursor taken from a previous result. */
  cursor?: string;
  /** Requested page size. */
  limit: number;
}
|
|
|
|
/** Paging options for `listSessionTrace` / `listSessionOutput`. */
export interface SessionEventPageInput {
  /** Explicit run to read; when absent the in-memory adapter falls back to the session's active or last run. */
  runId?: string | null;
  /** Only events with a sequence number greater than this are returned. */
  afterSeq: number;
  /** Requested page size. */
  limit: number;
}
|
|
|
|
/** Patch applied by `AgentRunStore.updateQueueTaskAttempt`. */
export interface UpdateQueueTaskAttemptInput {
  /** New state for the task. */
  state: QueueTaskState;
  /** Reference to the attempt that is now the task's latest. */
  latestAttempt: QueueAttemptRef;
  /** Session path to store on the task, or `null`. */
  sessionPath: string | null;
}
|
|
|
|
/** Fields recorded by `AgentRunStore.saveRunnerJob`. */
export interface SaveRunnerJobInput {
  runId: string;
  commandId: string;
  /** When set, a repeated save with the same key returns the existing record. */
  idempotencyKey?: string | null;
  /** Hash compared against the stored record when an idempotency key is reused. */
  payloadHash: string;
  attemptId: string;
  runnerId: string;
  namespace: string;
  jobName: string;
  managerUrl: string;
  image: string;
  sourceCommit: string;
  serviceAccountName?: string | null;
  result: JsonRecord;
}
|
|
|
|
/**
 * Selects and opens the store adapter described by environment variables.
 *
 * - A non-blank `DATABASE_URL` selects the Postgres adapter (loaded lazily).
 * - Otherwise `AGENTRUN_STORE` (or, when that is unset/blank, `AGENTRUN_MGR_STORE`)
 *   equal to `"memory"` selects the in-memory adapter.
 * - Anything else is rejected; there is no silent in-memory fallback.
 *
 * @param env Environment to read; defaults to `process.env`.
 * @returns The opened store.
 * @throws AgentRunError `infra-failed` (HTTP 503) when no adapter is configured.
 */
export async function openAgentRunStoreFromEnv(env: NodeJS.ProcessEnv = process.env): Promise<AgentRunStore> {
  const databaseUrl = env.DATABASE_URL?.trim();
  if (databaseUrl) {
    const { createPostgresAgentRunStore } = await import("./postgres-store.js");
    return createPostgresAgentRunStore({ connectionString: databaseUrl });
  }

  // Trim like DATABASE_URL above, and use `||` so a blank AGENTRUN_STORE does not
  // mask AGENTRUN_MGR_STORE (`??` only falls through on null/undefined).
  const storeMode = env.AGENTRUN_STORE?.trim() || env.AGENTRUN_MGR_STORE?.trim();
  if (storeMode === "memory") return new MemoryAgentRunStore();

  throw new AgentRunError("infra-failed", "DATABASE_URL is required for agentrun-mgr live runtime; set AGENTRUN_STORE=memory only for explicit self-test/dev mode", { httpStatus: 503, details: { adapter: "postgres", databaseUrl: "missing", memoryFallback: "disabled" } });
}
|
|
|
|
/**
 * In-process {@link AgentRunStore} backed by plain `Map`s.
 *
 * All methods are synchronous. State lives only in this instance; the adapter
 * reports itself as `"memory-self-test"` in {@link MemoryAgentRunStore.health}.
 */
export class MemoryAgentRunStore implements AgentRunStore {
  private readonly runs = new Map<string, RunRecord>();
  private readonly commands = new Map<string, CommandRecord>();
  private readonly eventsByRun = new Map<string, RunEvent[]>();
  private readonly runners = new Map<string, RunnerRecord>();
  private readonly sessions = new Map<string, SessionRecord>();
  private readonly sessionReadCursors = new Map<string, SessionReadCursorRecord>();
  private readonly runnerJobs = new Map<string, RunnerJobRecord>();
  private readonly queueTasks = new Map<string, QueueTaskRecord>();
  private readonly queueReadCursors = new Map<string, QueueReadCursorRecord>();
  /** Monotonic counter stamped onto queue tasks on every mutation. */
  private queueVersion = 0;
  /** Monotonic counter stamped onto sessions when their version is bumped. */
  private sessionVersion = 0;

  /** Always reports a ready, reachable adapter. */
  health(): StoreHealth {
    return { adapter: "memory-self-test", ready: true, reachable: true, migrationReady: true, migrationId: "memory-self-test", failureKind: null, message: null, credentialValuesPrinted: false };
  }

  /**
   * Creates a pending run, creating or re-using the referenced session, and
   * records a `run-created` status event.
   *
   * @throws AgentRunError when the session exists but belongs to a different
   *   tenant, project or backend profile (see `assertSessionBoundary`).
   */
  createRun(input: CreateRunInput): RunRecord {
    const at = nowIso();
    const sessionRef = this.resolveSessionForRun(input, at);
    const run: RunRecord = { ...input, sessionRef, resourceBundleRef: input.resourceBundleRef ?? null, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null };
    this.runs.set(run.id, run);
    this.eventsByRun.set(run.id, []);
    this.touchSessionForRun(run, { lastRunId: run.id, lastActivityAt: at }, { bumpVersion: false, at });
    this.appendEvent(run.id, "backend_status", { phase: "run-created", backendProfile: run.backendProfile, sessionRef: summarizeSessionRef(run.sessionRef ?? null), resourceBundleRef: summarizeResourceBundleRef(run.resourceBundleRef ?? null) });
    return run;
  }

  /** @throws AgentRunError `schema-invalid` (HTTP 404) when the run is unknown. */
  getRun(runId: string): RunRecord {
    const run = this.runs.get(runId);
    if (!run) throw new AgentRunError("schema-invalid", `run ${runId} was not found`, { httpStatus: 404 });
    return run;
  }

  /** Returns events with `seq > afterSeq`; the page size is clamped to 1..500. */
  listEvents(runId: string, afterSeq: number, limit: number): RunEvent[] {
    this.getRun(runId);
    const pageSize = Math.max(1, Math.min(limit, 500));
    return (this.eventsByRun.get(runId) ?? []).filter((event) => event.seq > afterSeq).slice(0, pageSize);
  }

  /**
   * Adds a command to a non-terminal run.
   *
   * A repeated `idempotencyKey` returns the existing command when the payload
   * hash matches and is rejected (HTTP 409) when it does not. `turn` and
   * `steer` commands mark the run's session as running.
   */
  createCommand(runId: string, input: CreateCommandInput): CommandRecord {
    const run = this.getRun(runId);
    this.assertRunOpen(run);
    const payloadHash = stableHash(input.payload);
    const runCommands = this.commandsForRun(runId);
    if (input.idempotencyKey) {
      const existing = runCommands.find((command) => command.idempotencyKey === input.idempotencyKey);
      if (existing) {
        if (existing.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "idempotency key reused with different payload", { httpStatus: 409 });
        return existing;
      }
    }

    const at = nowIso();
    // Commands are never removed, so the per-run count yields the next sequence number.
    const seq = runCommands.length + 1;
    const command: CommandRecord = { ...input, id: newId("cmd"), runId, seq, state: "pending", payloadHash, createdAt: at, updatedAt: at, acknowledgedAt: null };
    this.commands.set(command.id, command);

    if (command.type === "turn") {
      this.touchSessionForRun(run, { executionState: "running", lastRunId: run.id, lastCommandId: command.id, activeRunId: run.id, activeCommandId: command.id, terminalStatus: null, failureKind: null, title: sessionTitleFromCommand(command), lastActivityAt: at }, { bumpVersion: true, at });
    } else if (command.type === "steer") {
      this.touchSessionForRun(run, { executionState: "running", lastRunId: run.id, lastCommandId: command.id, activeRunId: run.id, lastActivityAt: at }, { bumpVersion: true, at });
    }
    this.appendEvent(runId, "backend_status", { phase: "command-created", commandId: command.id, commandType: command.type });
    return command;
  }

  /** @throws AgentRunError `schema-invalid` (HTTP 404) when the command is unknown. */
  getCommand(commandId: string): CommandRecord {
    const command = this.commands.get(commandId);
    if (!command) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
    return command;
  }

  /** Returns the run's commands with `seq > afterSeq`; the page size is clamped to 1..100. */
  listCommands(runId: string, afterSeq: number, limit: number): CommandRecord[] {
    this.getRun(runId);
    const pageSize = Math.max(1, Math.min(limit, 100));
    return this.commandsForRun(runId).filter((command) => command.seq > afterSeq).slice(0, pageSize);
  }

  /**
   * Stores a runner record, generating an id and timestamps for any that the
   * caller did not supply.
   */
  registerRunner(input: Partial<RunnerRecord>): RunnerRecord {
    const at = nowIso();
    // Defaults are applied after the spread so that a key present with an
    // `undefined` value cannot overwrite the generated id or timestamps.
    const runner: RunnerRecord = { ...input, id: input.id ?? newId("runner"), registeredAt: input.registeredAt ?? at, heartbeatAt: input.heartbeatAt ?? at };
    this.runners.set(runner.id, runner);
    return runner;
  }

  /** Lists the run's runner jobs (optionally for one command) ordered by `createdAt`. */
  listRunnerJobs(runId: string, commandId?: string): RunnerJobRecord[] {
    this.getRun(runId);
    return Array.from(this.runnerJobs.values())
      .filter((job) => job.runId === runId && (!commandId || job.commandId === commandId))
      .sort((a, b) => a.createdAt.localeCompare(b.createdAt));
  }

  /**
   * Looks up a runner job by idempotency key.
   *
   * @returns The stored job, or `null` when the key is unused for this run.
   * @throws AgentRunError `schema-invalid` (HTTP 409) when the key exists with a different payload hash.
   */
  getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): RunnerJobRecord | null {
    const existing = Array.from(this.runnerJobs.values()).find((job) => job.runId === runId && job.idempotencyKey === idempotencyKey);
    if (!existing) return null;
    if (existing.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "runner job idempotency key reused with different payload", { httpStatus: 409 });
    return existing;
  }

  /** Records a runner job, returning the existing record when the idempotency key was already used. */
  saveRunnerJob(input: SaveRunnerJobInput): RunnerJobRecord {
    if (input.idempotencyKey) {
      const existing = this.getRunnerJobByIdempotencyKey(input.runId, input.idempotencyKey, input.payloadHash);
      if (existing) return existing;
    }
    const at = nowIso();
    const record: RunnerJobRecord = { ...input, id: newId("rjob"), idempotencyKey: input.idempotencyKey ?? null, serviceAccountName: input.serviceAccountName ?? null, createdAt: at, updatedAt: at };
    this.runnerJobs.set(record.id, record);
    return record;
  }

  /**
   * Leases a run to a runner for `leaseMs` milliseconds.
   *
   * @throws AgentRunError (HTTP 409) when the run is terminal, or
   *   `runner-lease-conflict` when another runner holds an unexpired lease.
   */
  claimRun(runId: string, runnerId: string, leaseMs: number): RunRecord {
    const run = this.getRun(runId);
    this.assertRunOpen(run);
    const heldByOther = Boolean(run.claimedBy) && run.claimedBy !== runnerId && !isLeaseExpired(run.leaseExpiresAt);
    if (heldByOther) throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 });

    const next = this.updateRun(runId, { status: "claimed", claimedBy: runnerId, leaseExpiresAt: new Date(Date.now() + leaseMs).toISOString() });
    this.touchSessionForRun(next, { executionState: "running", activeRunId: runId, lastRunId: runId, lastActivityAt: next.updatedAt }, { bumpVersion: false, at: next.updatedAt });
    this.appendEvent(runId, "backend_status", { phase: "run-claimed", runnerId });
    return next;
  }

  /**
   * Extends the lease held by `runnerId`. Terminal runs are returned unchanged.
   *
   * @throws AgentRunError `runner-lease-conflict` (HTTP 409) when the run is not claimed by `runnerId`.
   */
  heartbeat(runId: string, runnerId: string, leaseMs: number): RunRecord {
    const run = this.getRun(runId);
    if (isTerminalRunStatus(run.status)) return run;
    if (run.claimedBy !== runnerId) throw new AgentRunError("runner-lease-conflict", `run ${runId} is not claimed by ${runnerId}`, { httpStatus: 409 });
    return this.updateRun(runId, { leaseExpiresAt: new Date(Date.now() + leaseMs).toISOString() });
  }

  /** Marks a command acknowledged; already acknowledged or terminal commands are returned unchanged. */
  ackCommand(commandId: string): CommandRecord {
    const command = this.getCommand(commandId);
    if (isTerminalCommandState(command.state) || command.state === "acknowledged") return command;
    // One timestamp for both fields so they cannot drift apart.
    const at = nowIso();
    const next = { ...command, state: "acknowledged" as const, acknowledgedAt: at, updatedAt: at };
    this.commands.set(commandId, next);
    return next;
  }

  /**
   * Moves a command to its terminal state and emits a `command-terminal` event.
   * Already terminal commands are returned unchanged. A reported `threadId` is
   * persisted on the run's session; `turn` commands also mark the session terminal.
   */
  finishCommand(commandId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): CommandRecord {
    const command = this.getCommand(commandId);
    if (isTerminalCommandState(command.state)) return command;

    const next = { ...command, state: commandStateFromTerminal(result.terminalStatus), updatedAt: nowIso() };
    this.commands.set(commandId, next);

    const run = this.getRun(command.runId);
    if (result.threadId && run.sessionRef?.sessionId) this.upsertSessionThread(run, result.threadId, result.turnId ?? null);
    if (command.type === "turn") {
      // Re-read the run: upsertSessionThread may have replaced its sessionRef.
      this.touchSessionForRun(this.getRun(command.runId), { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: command.runId, lastCommandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind, lastActivityAt: next.updatedAt }, { bumpVersion: true, at: next.updatedAt });
    }
    this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: next.state, terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage ?? null, threadId: result.threadId ?? null, turnId: result.turnId ?? null });
    return next;
  }

  /**
   * Appends an event to a run with the next sequence number. The payload is
   * normalised and redacted before it is stored, and the run's session (if any)
   * has its `lastEventSeq` updated.
   */
  appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): RunEvent {
    this.getRun(runId);
    const eventType = requireEventType(type);
    const eventPayload = normalizeRunEventPayload(eventType, payload);
    const events = this.eventsByRun.get(runId) ?? [];
    const event: RunEvent = { id: newId("evt"), runId, seq: events.length + 1, type: eventType, payload: redactJson(eventPayload), createdAt: nowIso() };
    events.push(event);
    this.eventsByRun.set(runId, events);
    this.touchSessionForRun(this.getRun(runId), { lastEventSeq: event.seq, lastActivityAt: event.createdAt }, { bumpVersion: false, at: event.createdAt });
    return event;
  }

  /**
   * Moves a run to its terminal status and emits a `terminal_status` event.
   * Already terminal runs are returned unchanged.
   *
   * @returns The run as stored after all updates, including any session
   *   reference refreshed from a reported `threadId`.
   */
  finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): RunRecord {
    const existing = this.getRun(runId);
    if (isTerminalRunStatus(existing.status)) return existing;

    const status = statusFromTerminal(result.terminalStatus);
    // Normalise a missing message to null, matching finishCommand.
    const failureMessage = result.failureMessage ?? null;
    const next = this.updateRun(runId, { status, terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage });
    if (result.threadId && next.sessionRef?.sessionId) this.upsertSessionThread(next, result.threadId, result.turnId ?? null);
    this.appendEvent(runId, "terminal_status", { terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: failureMessage });
    this.touchSessionForRun(this.getRun(runId), { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: runId, terminalStatus: result.terminalStatus, failureKind: result.failureKind, lastActivityAt: next.updatedAt }, { bumpVersion: true, at: next.updatedAt });
    // `next` predates upsertSessionThread, which may have rewritten the run's
    // sessionRef; return the stored record so callers see the final state.
    return this.getRun(runId);
  }

  /**
   * Cancels a run and every non-terminal command on it, emitting one
   * `command-terminal` event per command followed by `cancel-requested` and
   * `terminal_status`. Already terminal runs are returned unchanged.
   */
  cancelRun(runId: string, reason = "cancel requested"): RunRecord {
    const run = this.getRun(runId);
    if (isTerminalRunStatus(run.status)) return run;

    const at = nowIso();
    const openCommands = this.commandsForRun(runId).filter((item) => !isTerminalCommandState(item.state));
    for (const command of openCommands) {
      this.commands.set(command.id, { ...command, state: "cancelled" as const, updatedAt: at });
      this.appendEvent(runId, "backend_status", { phase: "command-terminal", commandId: command.id, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled" });
    }
    this.appendEvent(runId, "backend_status", { phase: "cancel-requested", reason });

    const next = this.updateRun(runId, { status: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: reason });
    this.appendEvent(runId, "terminal_status", { terminalStatus: "cancelled", failureKind: "cancelled", message: reason });
    this.touchSessionForRun(next, { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: runId, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: next.updatedAt }, { bumpVersion: true, at: next.updatedAt });
    return next;
  }

  /** Cancels a single command; already terminal commands are returned unchanged. */
  cancelCommand(commandId: string, reason = "cancel requested"): CommandRecord {
    const command = this.getCommand(commandId);
    if (isTerminalCommandState(command.state)) return command;

    const next = { ...command, state: "cancelled" as const, updatedAt: nowIso() };
    this.commands.set(commandId, next);
    this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", message: reason });
    if (command.type === "turn") {
      this.touchSessionForRun(this.getRun(command.runId), { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: command.runId, lastCommandId: command.id, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: next.updatedAt }, { bumpVersion: true, at: next.updatedAt });
    }
    return next;
  }

  /** Returns the session, or `null` when it is unknown. */
  getSession(sessionId: string): SessionRecord | null {
    return this.sessions.get(sessionId) ?? null;
  }

  /** @throws AgentRunError `schema-invalid` (HTTP 404) when the session is unknown. */
  getSessionSummary(sessionId: string, readerId: string | null = null): SessionSummary {
    const session = this.requireSession(sessionId);
    return buildSessionSummary(session, readerId, this.sessionReadCursorFor(sessionId, readerId));
  }

  /**
   * Lists session summaries newer than the cursor version, filtered by backend
   * profile and list state (default `"default"`). The returned cursor is the
   * version of the last item, or `null` for an empty page.
   */
  listSessions(input: ListSessionsInput): SessionListResult {
    const startVersion = parseSessionCursor(input.cursor) ?? 0;
    const state = input.state ?? "default";
    const readerId = input.readerId ?? null;
    const items = Array.from(this.sessions.values())
      .map((session) => buildSessionSummary(session, readerId, this.sessionReadCursorFor(session.sessionId, readerId)))
      .filter((session) => session.version > startVersion)
      .filter((session) => !input.backendProfile || session.backendProfile === input.backendProfile)
      .filter((session) => sessionMatchesListState(session, state))
      .sort(sessionSort)
      .slice(0, clampSessionLimit(input.limit));
    const cursor = items.length > 0 ? String(items[items.length - 1]?.version ?? startVersion) : null;
    return { items, count: items.length, cursor, filters: sessionListFilters(input) };
  }

  /**
   * Pages through the events of the requested run, or of the session's active
   * or last run when none is requested. Returns an empty page when the session
   * has no run yet.
   */
  listSessionTrace(sessionId: string, input: SessionEventPageInput): SessionEventPage {
    const runId = this.resolveSessionRunId(sessionId, input.runId ?? null);
    if (!runId) return { sessionId, runId: null, items: [], count: 0, cursor: null };
    const items = this.listEvents(runId, input.afterSeq, input.limit);
    return { sessionId, runId, items, count: items.length, cursor: items.length > 0 ? String(items[items.length - 1]?.seq ?? input.afterSeq) : null };
  }

  /**
   * Like {@link MemoryAgentRunStore.listSessionTrace} but restricted to output events.
   *
   * The output filter is applied before the page limit. Limiting first would
   * let a run of non-output events produce an empty page with a `null` cursor
   * even though output events exist later in the stream.
   */
  listSessionOutput(sessionId: string, input: SessionEventPageInput): SessionEventPage {
    const runId = this.resolveSessionRunId(sessionId, input.runId ?? null);
    if (!runId) return { sessionId, runId: null, items: [], count: 0, cursor: null };
    this.getRun(runId);
    // Same 1..500 clamp as listEvents.
    const pageSize = Math.max(1, Math.min(input.limit, 500));
    const items = (this.eventsByRun.get(runId) ?? [])
      .filter((event) => event.seq > input.afterSeq)
      .filter(isSessionOutputEvent)
      .slice(0, pageSize);
    return { sessionId, runId, items, count: items.length, cursor: items.length > 0 ? String(items[items.length - 1]?.seq ?? input.afterSeq) : null };
  }

  /** Records that `readerId` has seen the session at its current version. */
  markSessionRead(sessionId: string, readerId: string): SessionReadCursorRecord {
    const session = this.requireSession(sessionId);
    const record: SessionReadCursorRecord = { sessionId, readerId, sessionVersion: session.version, readAt: nowIso() };
    this.sessionReadCursors.set(sessionReadKey(sessionId, readerId), record);
    return record;
  }

  /**
   * Creates a pending queue task. A repeated `idempotencyKey` within the same
   * tenant and project returns the existing task when the payload hash matches
   * and is rejected (HTTP 409) when it does not.
   */
  createQueueTask(input: CreateQueueTaskInput): QueueTaskRecord {
    const payloadHash = stableHash(input.payload);
    if (input.idempotencyKey) {
      const existing = Array.from(this.queueTasks.values()).find((task) => task.tenantId === input.tenantId && task.projectId === input.projectId && task.idempotencyKey === input.idempotencyKey);
      if (existing) {
        if (existing.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "queue task idempotency key reused with different payload", { httpStatus: 409 });
        return existing;
      }
    }
    const at = nowIso();
    const task: QueueTaskRecord = { ...input, id: newId("qt"), state: "pending", version: this.nextQueueVersion(), payloadHash, latestAttempt: null, sessionPath: null, createdAt: at, updatedAt: at, cancelledAt: null, cancelReason: null };
    this.queueTasks.set(task.id, task);
    return task;
  }

  /**
   * Lists tasks whose version exceeds the cursor (or `updatedAfter`, or 0),
   * optionally filtered by queue and state. The returned cursor is the version
   * of the last item, or `null` for an empty page.
   */
  listQueueTasks(input: ListQueueTasksInput): QueueTaskListResult {
    const startVersion = parseQueueCursor(input.cursor) ?? input.updatedAfter ?? 0;
    const items = Array.from(this.queueTasks.values())
      .filter((task) => task.version > startVersion)
      .filter((task) => !input.queue || task.queue === input.queue)
      .filter((task) => !input.state || task.state === input.state)
      .sort(queueTaskSort)
      .slice(0, clampQueueLimit(input.limit));
    const cursor = items.length > 0 ? String(items[items.length - 1]?.version ?? startVersion) : null;
    return { items, count: items.length, cursor };
  }

  /** @throws AgentRunError `schema-invalid` (HTTP 404) when the task is unknown. */
  getQueueTask(taskId: string): QueueTaskRecord {
    const task = this.queueTasks.get(taskId);
    if (!task) throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 });
    return task;
  }

  /**
   * Records a new attempt on a non-terminal task and bumps its version.
   *
   * @throws AgentRunError (HTTP 409) when the task is already terminal.
   */
  updateQueueTaskAttempt(taskId: string, input: UpdateQueueTaskAttemptInput): QueueTaskRecord {
    const task = this.getQueueTask(taskId);
    if (isTerminalQueueTaskState(task.state)) throw new AgentRunError(task.state === "cancelled" ? "cancelled" : "schema-invalid", `queue task ${taskId} is already terminal: ${task.state}`, { httpStatus: 409 });
    const next: QueueTaskRecord = { ...task, state: input.state, latestAttempt: input.latestAttempt, sessionPath: input.sessionPath, version: this.nextQueueVersion(), updatedAt: nowIso() };
    this.queueTasks.set(taskId, next);
    return next;
  }

  /** Cancels a task; already terminal tasks are returned unchanged. */
  cancelQueueTask(taskId: string, reason = "cancel requested"): QueueTaskRecord {
    const task = this.getQueueTask(taskId);
    if (isTerminalQueueTaskState(task.state)) return task;
    const at = nowIso();
    const next: QueueTaskRecord = { ...task, state: "cancelled", version: this.nextQueueVersion(), updatedAt: at, cancelledAt: at, cancelReason: reason };
    this.queueTasks.set(taskId, next);
    return next;
  }

  /** Records that `readerId` has seen the task at its current version. */
  markQueueTaskRead(taskId: string, readerId: string): QueueReadCursorRecord {
    const task = this.getQueueTask(taskId);
    const record: QueueReadCursorRecord = { taskId, readerId, taskVersion: task.version, readAt: nowIso() };
    this.queueReadCursors.set(queueReadKey(taskId, readerId), record);
    return record;
  }

  /** Aggregates statistics over one queue, or over every queue when `queue` is omitted. */
  queueStats(queue?: string): QueueStats {
    return buildQueueStats(this.tasksInQueue(queue), queue ?? null);
  }

  /** Returns statistics plus the first 20 tasks (in `queueTaskSort` order) for a queue. */
  queueCommander(queue?: string): QueueCommanderSnapshot {
    const scoped = this.tasksInQueue(queue);
    // Sort a copy so the statistics below still receive insertion order.
    const items = [...scoped].sort(queueTaskSort).slice(0, 20);
    const generatedAt = nowIso();
    return { queue: queue ?? null, stats: buildQueueStats(scoped, queue ?? null, generatedAt), items, generatedAt };
  }

  /** Returns the backend capability descriptors. */
  backends(): JsonRecord[] {
    return backendCapabilities();
  }

  /** Rejects (HTTP 409) any operation that needs a run which has not yet reached a terminal status. */
  private assertRunOpen(run: RunRecord): void {
    if (!isTerminalRunStatus(run.status)) return;
    throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${run.id} is already terminal: ${run.status}`, { httpStatus: 409 });
  }

  /** Returns the session or throws `schema-invalid` (HTTP 404). */
  private requireSession(sessionId: string): SessionRecord {
    const session = this.getSession(sessionId);
    if (!session) throw new AgentRunError("schema-invalid", `session ${sessionId} was not found`, { httpStatus: 404 });
    return session;
  }

  /** Looks up the reader's cursor for a session; `null` without a reader or cursor. */
  private sessionReadCursorFor(sessionId: string, readerId: string | null): SessionReadCursorRecord | null {
    if (!readerId) return null;
    return this.sessionReadCursors.get(sessionReadKey(sessionId, readerId)) ?? null;
  }

  /** Commands belonging to a run, in insertion (creation) order. */
  private commandsForRun(runId: string): CommandRecord[] {
    return Array.from(this.commands.values()).filter((command) => command.runId === runId);
  }

  /** Tasks in one queue, or every task when `queue` is omitted/empty, in insertion order. */
  private tasksInQueue(queue?: string): QueueTaskRecord[] {
    return Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue);
  }

  /** Applies a patch to a stored run and refreshes `updatedAt`. */
  private updateRun(runId: string, patch: Partial<RunRecord>): RunRecord {
    const run = this.getRun(runId);
    const next = { ...run, ...patch, updatedAt: nowIso() };
    this.runs.set(runId, next);
    return next;
  }

  private nextQueueVersion(): number {
    this.queueVersion += 1;
    return this.queueVersion;
  }

  /**
   * Resolves the session reference for a new run: re-uses an existing session
   * after a boundary check, or creates an idle session record from the input.
   * Returns `null` when the input carries no session reference.
   */
  private resolveSessionForRun(input: CreateRunInput, at: string): SessionRef | null {
    if (!input.sessionRef) return null;
    const existing = this.sessions.get(input.sessionRef.sessionId);
    if (existing) {
      assertSessionBoundary(existing, input);
      return sessionRefFromRecord(existing, input.sessionRef);
    }
    const record: SessionRecord = {
      sessionId: input.sessionRef.sessionId,
      tenantId: input.tenantId,
      projectId: input.projectId,
      backendProfile: input.backendProfile,
      conversationId: input.sessionRef.conversationId ?? null,
      threadId: input.sessionRef.threadId ?? null,
      metadata: input.sessionRef.metadata ?? {},
      version: this.nextSessionVersion(),
      executionState: "idle",
      lastRunId: null,
      lastCommandId: null,
      activeRunId: null,
      activeCommandId: null,
      lastEventSeq: 0,
      terminalStatus: null,
      failureKind: null,
      title: titleFromMetadata(input.sessionRef.metadata ?? {}),
      summary: {},
      lastActivityAt: at,
      createdAt: at,
      updatedAt: at,
      expiresAt: input.sessionRef.expiresAt ?? null,
    };
    this.sessions.set(record.sessionId, record);
    return sessionRefFromRecord(record, input.sessionRef);
  }

  /**
   * Stores the backend thread id on the run's session (creating the session
   * record if it is missing), bumps the session version, refreshes the run's
   * `sessionRef` and emits a `session-updated` event.
   */
  private upsertSessionThread(run: RunRecord, threadId: string, turnId: string | null): void {
    if (!run.sessionRef?.sessionId) return;
    const at = nowIso();
    const existing = this.sessions.get(run.sessionRef.sessionId);
    const record: SessionRecord = {
      sessionId: run.sessionRef.sessionId,
      tenantId: run.tenantId,
      projectId: run.projectId,
      backendProfile: run.backendProfile,
      conversationId: run.sessionRef.conversationId ?? existing?.conversationId ?? null,
      threadId,
      metadata: { ...(existing?.metadata ?? {}), ...(run.sessionRef.metadata ?? {}), ...(turnId ? { lastTurnId: turnId } : {}) },
      version: this.nextSessionVersion(),
      executionState: existing?.executionState ?? "idle",
      lastRunId: existing?.lastRunId ?? run.id,
      lastCommandId: existing?.lastCommandId ?? null,
      activeRunId: existing?.activeRunId ?? null,
      activeCommandId: existing?.activeCommandId ?? null,
      lastEventSeq: existing?.lastEventSeq ?? 0,
      terminalStatus: existing?.terminalStatus ?? null,
      failureKind: existing?.failureKind ?? null,
      title: existing?.title ?? titleFromMetadata(run.sessionRef.metadata ?? {}),
      summary: existing?.summary ?? {},
      lastActivityAt: at,
      createdAt: existing?.createdAt ?? at,
      updatedAt: at,
      expiresAt: run.sessionRef.expiresAt ?? existing?.expiresAt ?? null,
    };
    this.sessions.set(record.sessionId, record);
    const nextSessionRef = sessionRefFromRecord(record, run.sessionRef);
    this.updateRun(run.id, { sessionRef: nextSessionRef });
    this.appendEvent(run.id, "backend_status", { phase: "session-updated", sessionRef: summarizeSessionRef(nextSessionRef), turnId });
  }

  /**
   * Patches the session attached to a run. Does nothing when the run has no
   * session or the session record is missing. The version only changes when
   * `options.bumpVersion` is set.
   */
  private touchSessionForRun(run: RunRecord, patch: Partial<SessionRecord>, options: { bumpVersion: boolean; at?: string }): void {
    const sessionId = run.sessionRef?.sessionId;
    if (!sessionId) return;
    const existing = this.sessions.get(sessionId);
    if (!existing) return;
    const at = options.at ?? nowIso();
    const version = options.bumpVersion ? this.nextSessionVersion() : existing.version;
    const next: SessionRecord = { ...existing, ...patch, version, updatedAt: at, lastActivityAt: patch.lastActivityAt ?? at };
    this.sessions.set(sessionId, next);
  }

  /**
   * Picks the run whose events a session page should read: the requested run
   * (which must belong to the session), otherwise the session's active run,
   * otherwise its last run (possibly `null`).
   */
  private resolveSessionRunId(sessionId: string, requestedRunId: string | null): string | null {
    const session = this.requireSession(sessionId);
    if (requestedRunId) {
      const run = this.getRun(requestedRunId);
      if (run.sessionRef?.sessionId !== sessionId) throw new AgentRunError("schema-invalid", `run ${requestedRunId} does not belong to session ${sessionId}`, { httpStatus: 404 });
      return requestedRunId;
    }
    return session.activeRunId ?? session.lastRunId;
  }

  private nextSessionVersion(): number {
    this.sessionVersion += 1;
    return this.sessionVersion;
  }
}
|
|
|
|
/**
 * Verifies that an existing session may be re-used by a new run.
 *
 * @param existing The stored session.
 * @param input The run being created.
 * @throws AgentRunError `tenant-policy-denied` (HTTP 403) when tenant or project differ.
 * @throws AgentRunError `schema-invalid` (HTTP 400) when the backend profile differs.
 */
export function assertSessionBoundary(existing: SessionRecord, input: CreateRunInput): void {
  const sameTenant = existing.tenantId === input.tenantId;
  const sameProject = existing.projectId === input.projectId;
  if (!sameTenant || !sameProject) {
    throw new AgentRunError("tenant-policy-denied", "sessionRef cannot be reused across tenant or project boundary", { httpStatus: 403, details: { sessionId: existing.sessionId, valuesPrinted: false } });
  }
  if (existing.backendProfile !== input.backendProfile) {
    throw new AgentRunError("schema-invalid", "sessionRef cannot be reused across backendProfile boundary", { httpStatus: 400, details: { sessionId: existing.sessionId, existingBackendProfile: existing.backendProfile, requestedBackendProfile: input.backendProfile, valuesPrinted: false } });
  }
}
|
|
|
|
/**
 * Maps a backend terminal status onto a run status.
 *
 * `completed`, `cancelled` and `blocked` map to themselves; every other value
 * becomes `failed`.
 */
export function statusFromTerminal(terminalStatus: TerminalStatus): RunRecord["status"] {
  switch (terminalStatus) {
    case "completed":
      return "completed";
    case "cancelled":
      return "cancelled";
    case "blocked":
      return "blocked";
    default:
      return "failed";
  }
}
|
|
|
|
/**
 * Maps a backend terminal status onto a command state.
 *
 * Only `completed` and `cancelled` are preserved; every other value
 * (including `blocked`) becomes `failed`.
 */
export function commandStateFromTerminal(terminalStatus: TerminalStatus): CommandRecord["state"] {
  switch (terminalStatus) {
    case "completed":
      return "completed";
    case "cancelled":
      return "cancelled";
    default:
      return "failed";
  }
}
|
|
|
|
/** Returns `true` when a run status is one of `completed`, `failed`, `blocked` or `cancelled`. */
export function isTerminalRunStatus(status: RunRecord["status"]): boolean {
  switch (status) {
    case "completed":
    case "failed":
    case "blocked":
    case "cancelled":
      return true;
    default:
      return false;
  }
}
|
|
|
|
/** Returns `true` when the command state is one of `completed`, `failed` or `cancelled`. */
export function isTerminalCommandState(state: CommandRecord["state"]): boolean {
  switch (state) {
    case "completed":
    case "failed":
    case "cancelled":
      return true;
    default:
      return false;
  }
}
|
|
|
|
/** Returns `true` when the queue task state is one of `completed`, `failed`, `blocked` or `cancelled`. */
export function isTerminalQueueTaskState(state: QueueTaskState): boolean {
  switch (state) {
    case "completed":
    case "failed":
    case "blocked":
    case "cancelled":
      return true;
    default:
      return false;
  }
}
|
|
|
|
/**
 * Tells whether a lease has run out.
 *
 * @param leaseExpiresAt - Lease expiry timestamp, or `null` when no lease is held.
 * @returns `true` when the timestamp is missing, cannot be parsed, or is not
 *   later than the current time; `false` only for a parseable future timestamp.
 */
export function isLeaseExpired(leaseExpiresAt: string | null): boolean {
  if (!leaseExpiresAt) return true;
  const expiresAtMs = Date.parse(leaseExpiresAt);
  // NaN compares false against everything, so an unparseable timestamp would
  // otherwise look like a lease that never expires. Treat it like a missing one.
  if (Number.isNaN(expiresAtMs)) return true;
  return expiresAtMs <= Date.now();
}
|
|
|
|
/**
 * Builds a `SessionRef` from a stored session record, filling gaps from `fallback`.
 *
 * The session id always comes from the record. For `conversationId`,
 * `threadId` and `expiresAt` the record value wins when truthy, otherwise the
 * fallback value is used when truthy, otherwise the key is omitted. Metadata
 * comes from the record when it has at least one key, else from the fallback.
 */
export function sessionRefFromRecord(record: SessionRecord, fallback: SessionRef): SessionRef {
  // `||` (not `??`) on purpose: empty strings on the record fall through to the fallback.
  const conversationId = record.conversationId || fallback.conversationId;
  const threadId = record.threadId || fallback.threadId;
  const expiresAt = record.expiresAt || fallback.expiresAt;
  const recordHasMetadata = Object.keys(record.metadata).length > 0;
  const metadata = recordHasMetadata ? record.metadata : fallback.metadata;

  return {
    sessionId: record.sessionId,
    ...(conversationId ? { conversationId } : {}),
    ...(threadId ? { threadId } : {}),
    ...(expiresAt ? { expiresAt } : {}),
    ...(metadata ? { metadata } : {}),
  };
}
|
|
|
|
/**
 * Decorates a session record with reader-specific attention information.
 *
 * A session is `active` when its execution state is `running` or it still has
 * an active run or command. An inactive session is `unread` when the reader
 * has no cursor or the cursor's session version is behind the record version.
 *
 * @param record - The stored session.
 * @param readerId - Reader the summary is built for, or `null`.
 * @param readCursor - That reader's cursor for the session, or `null`.
 * @returns The record's fields plus `sessionPath`, `readerId`, `readCursor`,
 *   `attentionState`, `unread` and `active`.
 */
export function buildSessionSummary(record: SessionRecord, readerId: string | null, readCursor: SessionReadCursorRecord | null): SessionSummary {
  const quiescent = record.executionState !== "running" && record.activeRunId === null && record.activeCommandId === null;
  const active = !quiescent;
  const unread = quiescent && (!readCursor || readCursor.sessionVersion < record.version);

  let attentionState: SessionSummary["attentionState"] = "read";
  if (active) {
    attentionState = "active";
  } else if (unread) {
    attentionState = "unread";
  }

  const sessionPath = [record.tenantId, record.projectId, record.sessionId].map(String).join("/");
  return { ...record, sessionPath, readerId, readCursor, attentionState, unread, active };
}
|
|
|
|
/**
 * Decides whether a session summary passes a list-state filter.
 *
 * - `all`: everything
 * - `default`: active or unread sessions
 * - `running`: active sessions
 * - `unread`: unread sessions
 * - `terminal` / `idle`: sessions whose `executionState` equals the filter
 *
 * Any other filter value matches nothing.
 */
export function sessionMatchesListState(session: SessionSummary, state: SessionListState): boolean {
  switch (state) {
    case "all":
      return true;
    case "default":
      return session.active || session.unread;
    case "running":
      return session.active;
    case "unread":
      return session.unread;
    case "terminal":
      return session.executionState === "terminal";
    case "idle":
      return session.executionState === "idle";
    default:
      return false;
  }
}
|
|
|
|
/**
 * Comparator for session listings.
 *
 * Ordering: active sessions first, then unread ones, then most recent
 * activity first (`lastActivityAt`, falling back to `updatedAt`), then most
 * recent `updatedAt` first, and finally ascending `sessionId` as a tie-break.
 */
export function sessionSort(a: SessionSummary, b: SessionSummary): number {
  if (a.active !== b.active) return a.active ? -1 : 1;
  if (a.unread !== b.unread) return a.unread ? -1 : 1;

  const activityOfA = a.lastActivityAt ?? a.updatedAt;
  const activityOfB = b.lastActivityAt ?? b.updatedAt;
  const byActivity = activityOfB.localeCompare(activityOfA);
  if (byActivity !== 0) return byActivity;

  const byUpdate = b.updatedAt.localeCompare(a.updatedAt);
  if (byUpdate !== 0) return byUpdate;

  return a.sessionId.localeCompare(b.sessionId);
}
|
|
|
|
/**
 * Normalises a requested session page size.
 *
 * Non-finite input falls back to 50; finite input is truncated toward zero.
 * The result is then confined to the range 1..100.
 */
export function clampSessionLimit(limit: number): number {
  const requested = Number.isFinite(limit) ? Math.trunc(limit) : 50;
  if (requested < 1) return 1;
  if (requested > 100) return 100;
  return requested;
}
|
|
|
|
/**
 * Parses a session list cursor.
 *
 * @param cursor - Cursor string, if one was supplied.
 * @returns The cursor converted with `Number()` when that yields a
 *   non-negative integer; `null` when the cursor is absent, empty, or anything else.
 */
export function parseSessionCursor(cursor: string | undefined): number | null {
  if (!cursor) return null;
  const position = Number(cursor);
  if (!Number.isInteger(position)) return null;
  return position >= 0 ? position : null;
}
|
|
|
|
/**
 * Produces the normalised filter description for a session list request.
 *
 * Missing `state` becomes `"default"`, other missing filters become `null`,
 * and the limit is passed through `clampSessionLimit`.
 */
export function sessionListFilters(input: ListSessionsInput): JsonRecord {
  const state = input.state ?? "default";
  const backendProfile = input.backendProfile ?? null;
  const readerId = input.readerId ?? null;
  const cursor = input.cursor ?? null;
  const limit = clampSessionLimit(input.limit);
  return { state, backendProfile, readerId, cursor, limit };
}
|
|
|
|
/**
 * Produces a log-safe description of a session reference.
 *
 * Identifier fields are copied (absent ones become `null`); metadata is
 * reduced to its sorted key names so that no metadata value is included.
 *
 * @returns The summary, or `null` when no session reference was given.
 */
export function summarizeSessionRef(sessionRef: SessionRef | null): JsonRecord | null {
  if (!sessionRef) return null;

  const { sessionId, conversationId, threadId, expiresAt, metadata } = sessionRef;
  const metadataKeys = Object.keys(metadata ?? {});
  metadataKeys.sort();

  return {
    sessionId,
    conversationId: conversationId ?? null,
    threadId: threadId ?? null,
    expiresAt: expiresAt ?? null,
    metadataKeys,
    valuesPrinted: false,
  };
}
|
|
|
|
/**
 * Produces a log-safe description of a run's resource bundle reference.
 *
 * Scalar fields are copied (absent optional ones become `null`, `false` or
 * `0`). Tool aliases, prompt refs and skill refs are reduced to a count and
 * their names, with prompt/skill refs also listing the names whose
 * `required` flag is exactly `true`. The credential reference is reduced to
 * its name, namespace and key names.
 *
 * @returns The summary, or `null` when no resource bundle reference was given.
 */
export function summarizeResourceBundleRef(resourceBundleRef: RunRecord["resourceBundleRef"] | null | undefined): JsonRecord | null {
  if (!resourceBundleRef) return null;

  const { credentialRef } = resourceBundleRef;
  // A missing list summarises exactly like an empty one.
  const aliases = resourceBundleRef.toolAliases || [];
  const prompts = resourceBundleRef.promptRefs || [];
  const skills = resourceBundleRef.skillRefs || [];

  const toolAliases = {
    count: aliases.length,
    names: aliases.map((alias) => alias.name),
    valuesPrinted: false,
  };
  const promptRefs = {
    count: prompts.length,
    names: prompts.map((prompt) => prompt.name),
    required: prompts.filter((prompt) => prompt.required === true).map((prompt) => prompt.name),
    valuesPrinted: false,
  };
  const skillRefs = {
    count: skills.length,
    names: skills.map((skill) => skill.name),
    required: skills.filter((skill) => skill.required === true).map((skill) => skill.name),
    valuesPrinted: false,
  };
  const credentialSummary = credentialRef
    ? { name: credentialRef.name, namespace: credentialRef.namespace ?? null, keys: credentialRef.keys ?? [], valuesPrinted: false }
    : null;

  return {
    kind: resourceBundleRef.kind,
    repoUrl: resourceBundleRef.repoUrl,
    commitId: resourceBundleRef.commitId,
    subdir: resourceBundleRef.subdir ?? null,
    sparsePathCount: resourceBundleRef.sparsePaths?.length ?? 0,
    toolAliases,
    promptRefs,
    skillRefs,
    submodules: resourceBundleRef.submodules ?? false,
    lfs: resourceBundleRef.lfs ?? false,
    credentialRef: credentialSummary,
  };
}
|
|
|
|
/**
 * Comparator for queue tasks: higher `priority` first, then earlier
 * `createdAt`, then ascending `id` as the final tie-break.
 */
export function queueTaskSort(a: QueueTaskRecord, b: QueueTaskRecord): number {
  if (a.priority !== b.priority) {
    return b.priority - a.priority;
  }
  const byCreation = a.createdAt.localeCompare(b.createdAt);
  return byCreation !== 0 ? byCreation : a.id.localeCompare(b.id);
}
|
|
|
|
/**
 * Aggregates per-state and per-lane task counts for a set of queue tasks.
 *
 * Counting is done in `Map`s rather than plain object dictionaries: state
 * and lane names are dynamic strings, and with a plain `{}` a name such as
 * `constructor`, `toString` or `__proto__` would read the inherited
 * `Object.prototype` member instead of `undefined`, yielding a garbage
 * count (or silently dropping the entry).
 *
 * @param tasks - Tasks to summarise.
 * @param queue - Queue name the stats are for, or `null`.
 * @param generatedAt - Timestamp to stamp on the result; defaults to `nowIso()`.
 * @returns Totals, counts keyed by state and by lane (keys in first-seen
 *   order), and the highest task `version` seen (0 for an empty list).
 */
export function buildQueueStats(tasks: QueueTaskRecord[], queue: string | null, generatedAt = nowIso()): QueueStats {
  const stateCounts = new Map<string, number>();
  const laneCounts = new Map<string, number>();
  let maxVersion = 0;

  for (const task of tasks) {
    stateCounts.set(task.state, (stateCounts.get(task.state) ?? 0) + 1);
    laneCounts.set(task.lane, (laneCounts.get(task.lane) ?? 0) + 1);
    maxVersion = Math.max(maxVersion, task.version);
  }

  // Object.fromEntries defines own data properties, so even "__proto__" ends
  // up as a regular key of an ordinary object.
  const byState: Record<string, number> = Object.fromEntries(stateCounts);
  const byLane: Record<string, number> = Object.fromEntries(laneCounts);
  return { queue, total: tasks.length, byState, byLane, maxVersion, generatedAt };
}
|
|
|
|
/**
 * Normalises a requested queue page size.
 *
 * Non-finite input yields 50; finite input is truncated toward zero and then
 * confined to the range 1..100.
 */
export function clampQueueLimit(limit: number): number {
  if (!Number.isFinite(limit)) return 50;
  const whole = Math.trunc(limit);
  if (whole > 100) return 100;
  return whole < 1 ? 1 : whole;
}
|
|
|
|
/**
 * Parses a queue list cursor.
 *
 * @param cursor - Cursor string, if one was supplied.
 * @returns The cursor converted with `Number()` when that yields a
 *   non-negative integer; otherwise `null` (also for an absent or empty cursor).
 */
export function parseQueueCursor(cursor: string | undefined): number | null {
  if (!cursor) return null;
  const parsed = Number(cursor);
  const usable = Number.isInteger(parsed) && parsed >= 0;
  if (usable) return parsed;
  return null;
}
|
|
|
|
/** Builds the composite `taskId:readerId` key identifying one reader's cursor for one queue task. */
function queueReadKey(taskId: string, readerId: string): string {
  const separator = ":";
  return taskId + separator + readerId;
}
|
|
|
|
/** Builds the composite `sessionId:readerId` key identifying one reader's cursor for one session. */
export function sessionReadKey(sessionId: string, readerId: string): string {
  const separator = ":";
  return sessionId + separator + readerId;
}
|
|
|
|
/**
 * Extracts a display title from a metadata record.
 *
 * @param metadata - Record that may carry a `title` entry.
 * @returns The trimmed title, cut to at most 200 UTF-16 code units, or `null`
 *   when `title` is not a string or is blank after trimming.
 */
export function titleFromMetadata(metadata: JsonRecord): string | null {
  const raw = metadata.title;
  if (typeof raw !== "string") return null;

  const trimmed = raw.trim();
  if (trimmed.length === 0) return null;

  const maxLength = 200;
  if (trimmed.length <= maxLength) return trimmed;

  // Never cut between the two halves of a surrogate pair: a dangling high
  // surrogate makes the title ill-formed UTF-16. Drop the whole character instead.
  const lastKept = trimmed.charCodeAt(maxLength - 1);
  const firstDropped = trimmed.charCodeAt(maxLength);
  const splitsPair = lastKept >= 0xd800 && lastKept <= 0xdbff && firstDropped >= 0xdc00 && firstDropped <= 0xdfff;
  return trimmed.slice(0, splitsPair ? maxLength - 1 : maxLength);
}
|
|
|
|
/**
 * Derives a session title from a command's prompt.
 *
 * The prompt is trimmed, runs of whitespace are collapsed to single spaces,
 * and the result is cut to at most 120 UTF-16 code units.
 *
 * @param command - Command whose `payload.prompt` is used.
 * @returns The derived title, or `null` when the prompt is not a string or
 *   is empty after normalisation.
 */
export function sessionTitleFromCommand(command: CommandRecord): string | null {
  const prompt = command.payload.prompt;
  if (typeof prompt !== "string") return null;

  const normalized = prompt.trim().replace(/\s+/gu, " ");
  const maxLength = 120;
  if (normalized.length <= maxLength) return normalized || null;

  // Never cut between the two halves of a surrogate pair: a dangling high
  // surrogate makes the title ill-formed UTF-16. Drop the whole character instead.
  const lastKept = normalized.charCodeAt(maxLength - 1);
  const firstDropped = normalized.charCodeAt(maxLength);
  const splitsPair = lastKept >= 0xd800 && lastKept <= 0xdbff && firstDropped >= 0xdc00 && firstDropped <= 0xdfff;
  return normalized.slice(0, splitsPair ? maxLength - 1 : maxLength) || null;
}
|
|
|
|
/**
 * Returns `true` for the event types `assistant_message`, `command_output`,
 * `diff`, `error` and `terminal_status`, and `false` for every other type.
 */
export function isSessionOutputEvent(event: RunEvent): boolean {
  switch (event.type) {
    case "assistant_message":
    case "command_output":
    case "diff":
    case "error":
    case "terminal_status":
      return true;
    default:
      return false;
  }
}
|