import type {
  BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput,
  FailureKind, JsonRecord, ListGcExpiredSessionsInput, QueueAttemptRef, QueueCommanderSnapshot,
  QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, QueueTaskSummary,
  RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionEventPage, SessionListResult, SessionListState,
  SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus,
  UpsertSessionInput
} from "../common/types.js";
import { AgentRunError } from "../common/errors.js";
import { newId, nowIso, stableHash } from "../common/validation.js";
import { redactJson } from "../common/redaction.js";
import { backendCapabilities } from "../common/backend-profiles.js";
import { normalizeRunEventPayload, requireEventType } from "../common/events.js";

/** A value that a store method may return either directly or wrapped in a promise. */
export type MaybePromise<T> = T | Promise<T>;

/** Health report produced by `AgentRunStore.health()`. */
export interface StoreHealth extends JsonRecord {
  adapter: "memory-self-test" | "postgres";
  ready: boolean;
  reachable: boolean;
  migrationReady: boolean;
  migrationId: string | null;
  failureKind: FailureKind | null;
  message: string | null;
  credentialValuesPrinted: false;
}

/**
 * Persistence contract for runs, commands, events, runners, runner jobs, sessions and queue tasks.
 *
 * Every method may answer synchronously or asynchronously (`MaybePromise`). The result types below
 * mirror the signatures of the in-file `MemoryAgentRunStore` implementation.
 */
export interface AgentRunStore {
  health(): MaybePromise<StoreHealth>;
  createRun(input: CreateRunInput): MaybePromise<RunRecord>;
  getRun(runId: string): MaybePromise<RunRecord>;
  listEvents(runId: string, afterSeq: number, limit: number): MaybePromise<RunEvent[]>;
  createCommand(runId: string, input: CreateCommandInput): MaybePromise<CommandRecord>;
  getCommand(commandId: string): MaybePromise<CommandRecord>;
  listCommands(runId: string, afterSeq: number, limit: number): MaybePromise<CommandRecord[]>;
  registerRunner(input: Partial<RunnerRecord>): MaybePromise<RunnerRecord>;
  listRunnerJobs(runId: string, commandId?: string): MaybePromise<RunnerJobRecord[]>;
  getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): MaybePromise<RunnerJobRecord | null>;
  saveRunnerJob(input: SaveRunnerJobInput): MaybePromise<RunnerJobRecord>;
  claimRun(runId: string, runnerId: string, leaseMs: number): MaybePromise<RunRecord>;
  heartbeat(runId: string, runnerId: string, leaseMs: number): MaybePromise<RunRecord>;
  ackCommand(commandId: string): MaybePromise<CommandRecord>;
  // NOTE(review): the Pick key set is inferred from the fields the in-memory implementation reads — confirm against BackendTurnResult.
  finishCommand(commandId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): MaybePromise<CommandRecord>;
  appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): MaybePromise<RunEvent>;
  finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): MaybePromise<RunRecord>;
  cancelRun(runId: string, reason?: string): MaybePromise<RunRecord>;
  cancelCommand(commandId: string, reason?: string): MaybePromise<CommandRecord>;
  getSession(sessionId: string): MaybePromise<SessionRecord | null>;
  getSessionSummary(sessionId: string, readerId?: string | null): MaybePromise<SessionSummary>;
  listSessions(input: ListSessionsInput): MaybePromise<SessionListResult>;
  listSessionTrace(sessionId: string, input: SessionEventPageInput): MaybePromise<SessionEventPage>;
  listSessionOutput(sessionId: string, input: SessionEventPageInput): MaybePromise<SessionEventPage>;
  markSessionRead(sessionId: string, readerId: string): MaybePromise<SessionReadCursorRecord>;
  upsertSession(input: UpsertSessionInput): MaybePromise<SessionRecord>;
  refreshSessionStorage(input: SessionStoragePatch): MaybePromise<SessionRecord>;
  markSessionStorageEvicted(input: { sessionId: string; pvcName: string }): MaybePromise<SessionRecord>;
  listGcExpiredSessions(input: ListGcExpiredSessionsInput): MaybePromise<SessionRecord[]>;
  createQueueTask(input: CreateQueueTaskInput): MaybePromise<QueueTaskRecord>;
  listQueueTasks(input: ListQueueTasksInput): MaybePromise<QueueTaskListResult>;
  getQueueTask(taskId: string): MaybePromise<QueueTaskRecord>;
  updateQueueTaskAttempt(taskId: string, input: UpdateQueueTaskAttemptInput): MaybePromise<QueueTaskRecord>;
  cancelQueueTask(taskId: string, reason?: string): MaybePromise<QueueTaskRecord>;
  markQueueTaskRead(taskId: string, readerId: string): MaybePromise<QueueReadCursorRecord>;
  queueStats(queue?: string): MaybePromise<QueueStats>;
  queueCommander(queue?: string, readerId?: string | null): MaybePromise<QueueCommanderSnapshot>;
  backends(): MaybePromise<JsonRecord[]>;
  /** Optional hook for adapters that hold resources to release. */
  close?(): MaybePromise<void>;
}

/** Filters and paging options for `listQueueTasks`. */
export interface ListQueueTasksInput {
  queue?: string;
  state?: QueueTaskState;
  cursor?: string;
  limit: number;
  updatedAfter?: number;
}

/** Filters and paging options for `listSessions`. */
export interface ListSessionsInput {
  state?: SessionListState;
  backendProfile?: BackendProfile;
  readerId?: string | null;
  cursor?: string;
  limit: number;
}

/** Paging options for `listSessionTrace` / `listSessionOutput`. */
export interface SessionEventPageInput {
  runId?:
string | null;
  afterSeq: number;
  limit: number;
}

/** Fields that `updateQueueTaskAttempt` writes onto a queue task. */
export interface UpdateQueueTaskAttemptInput {
  state: QueueTaskState;
  latestAttempt: QueueAttemptRef;
  sessionPath: string | null;
}

/** Input for `saveRunnerJob`. */
export interface SaveRunnerJobInput {
  id?: string;
  runId: string;
  commandId: string;
  idempotencyKey?: string | null;
  payloadHash: string;
  attemptId: string;
  runnerId: string;
  namespace: string;
  jobName: string;
  managerUrl: string;
  image: string;
  sourceCommit: string;
  serviceAccountName?: string | null;
  result: JsonRecord;
}

/**
 * Selects the store adapter from environment variables.
 *
 * - A non-blank `DATABASE_URL` lazily imports `./postgres-store.js` and opens the Postgres adapter.
 * - Otherwise `AGENTRUN_STORE` (falling back to `AGENTRUN_MGR_STORE`) equal to `"memory"` yields an in-memory store.
 *
 * @throws AgentRunError `infra-failed` (HTTP 503) when neither option is configured.
 */
export async function openAgentRunStoreFromEnv(env: NodeJS.ProcessEnv = process.env): Promise<AgentRunStore> {
  const databaseUrl = env.DATABASE_URL?.trim();
  if (databaseUrl) {
    const { createPostgresAgentRunStore } = await import("./postgres-store.js");
    return createPostgresAgentRunStore({ connectionString: databaseUrl });
  }
  const storeMode = env.AGENTRUN_STORE ?? env.AGENTRUN_MGR_STORE;
  if (storeMode === "memory") return new MemoryAgentRunStore();
  throw new AgentRunError("infra-failed", "DATABASE_URL is required for agentrun-mgr live runtime; set AGENTRUN_STORE=memory only for explicit self-test/dev mode", { httpStatus: 503, details: { adapter: "postgres", databaseUrl: "missing", memoryFallback: "disabled" } });
}

/** In-process `AgentRunStore` that keeps every record in `Map`s; nothing is persisted. */
export class MemoryAgentRunStore implements AgentRunStore {
  private readonly runs = new Map<string, RunRecord>();
  private readonly commands = new Map<string, CommandRecord>();
  private readonly eventsByRun = new Map<string, RunEvent[]>();
  private readonly runners = new Map<string, RunnerRecord>();
  private readonly sessions = new Map<string, SessionRecord>();
  // Keyed by sessionReadKey(sessionId, readerId).
  private readonly sessionReadCursors = new Map<string, SessionReadCursorRecord>();
  private readonly runnerJobs = new Map<string, RunnerJobRecord>();
  private readonly queueTasks = new Map<string, QueueTaskRecord>();
  // Keyed by queueReadKey(taskId, readerId).
  private readonly queueReadCursors = new Map<string, QueueReadCursorRecord>();
  private queueVersion = 0;
  private sessionVersion = 0;

  /** Always reports a ready, reachable `memory-self-test` adapter. */
  health(): StoreHealth {
    return { adapter: "memory-self-test", ready: true, reachable: true, migrationReady: true, migrationId: "memory-self-test", failureKind: null, message: null, credentialValuesPrinted: false };
  }

  /**
   * Creates a `pending` run, resolving (or creating) the session named by `input.sessionRef`,
   * and appends a `run-created` backend_status event.
   */
  createRun(input: CreateRunInput): RunRecord {
    const at = nowIso();
    const
sessionRef = this.resolveSessionForRun(input, at);
    const run: RunRecord = { ...input, sessionRef, resourceBundleRef: input.resourceBundleRef ?? null, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null };
    this.runs.set(run.id, run);
    // The event list must exist before appendEvent runs below.
    this.eventsByRun.set(run.id, []);
    this.touchSessionForRun(run, { lastRunId: run.id, lastActivityAt: at }, { bumpVersion: false, at });
    this.appendEvent(run.id, "backend_status", { phase: "run-created", backendProfile: run.backendProfile, sessionRef: summarizeSessionRef(run.sessionRef ?? null), resourceBundleRef: summarizeResourceBundleRef(run.resourceBundleRef ?? null) });
    return run;
  }

  /**
   * Looks up a run by id.
   * @throws AgentRunError `schema-invalid` (HTTP 404) when the run does not exist.
   */
  getRun(runId: string): RunRecord {
    const run = this.runs.get(runId);
    if (!run) throw new AgentRunError("schema-invalid", `run ${runId} was not found`, { httpStatus: 404 });
    return run;
  }

  /**
   * Returns the run's events with `seq > afterSeq`, in append order.
   * The page size is `limit` clamped to the range 1..500.
   * @throws AgentRunError (HTTP 404) when the run does not exist.
   */
  listEvents(runId: string, afterSeq: number, limit: number): RunEvent[] {
    this.getRun(runId);
    return (this.eventsByRun.get(runId) ?? []).filter((event) => event.seq > afterSeq).slice(0, Math.max(1, Math.min(limit, 500)));
  }

  /**
   * Creates a `pending` command on a non-terminal run.
   *
   * When `input.idempotencyKey` matches an existing command of the same run, that command is returned
   * if the payload hash is equal; otherwise the call is rejected. `turn` and `steer` commands mark the
   * run's session as running and bump its version. A `command-created` backend_status event is appended.
   *
   * @throws AgentRunError (HTTP 409) when the run is already terminal or the idempotency key is reused
   *   with a different payload; (HTTP 404) when the run does not exist.
   */
  createCommand(runId: string, input: CreateCommandInput): CommandRecord {
    const run = this.getRun(runId);
    if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 });
    const payloadHash = stableHash(input.payload);
    if (input.idempotencyKey) {
      const existing = Array.from(this.commands.values()).find((command) => command.runId === runId && command.idempotencyKey === input.idempotencyKey);
      if (existing) {
        if (existing.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "idempotency key reused with different payload", { httpStatus: 409 });
        return existing;
      }
    }
    const at = nowIso();
    // Per-run sequence number: one more than the commands already stored for this run.
    const seq = Array.from(this.commands.values()).filter((command) => command.runId === runId).length + 1;
    const command: CommandRecord = { ...input, id: newId("cmd"), runId, seq, state: "pending", payloadHash, createdAt: at, updatedAt: at, acknowledgedAt: null };
    this.commands.set(command.id, command);
    // A turn becomes the session's active command and resets its terminal state; a steer only refreshes activity.
    if (command.type === "turn") this.touchSessionForRun(run, { executionState: "running", lastRunId: run.id, lastCommandId: command.id, activeRunId: run.id, activeCommandId: command.id, terminalStatus: null, failureKind: null, title: sessionTitleFromCommand(command), lastActivityAt: at }, { bumpVersion: true, at });
    else if (command.type === "steer") this.touchSessionForRun(run, { executionState: "running", lastRunId: run.id, lastCommandId: command.id, activeRunId: run.id, lastActivityAt: at }, { bumpVersion: true, at });
    this.appendEvent(runId, "backend_status", { phase: "command-created", commandId: command.id, commandType: command.type });
    return command;
  }

  /**
   * Looks up a command by id.
   * @throws AgentRunError `schema-invalid` (HTTP 404) when the command does not exist.
   */
  getCommand(commandId: string): CommandRecord {
    const command = this.commands.get(commandId);
    if (!command) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
    return command;
  }

  /**
   * Returns the run's commands with `seq > afterSeq`, in insertion order.
   * The page size is `limit` clamped to the range 1..100.
   * @throws AgentRunError (HTTP 404) when the run does not exist.
   */
  listCommands(runId: string, afterSeq: number, limit: number): CommandRecord[] {
    this.getRun(runId);
    return Array.from(this.commands.values()).filter((command) => command.runId === runId && command.seq > afterSeq).slice(0, Math.max(1, Math.min(limit, 100)));
  }
/** Stores a runner record; fields supplied in `input` override the default timestamps, and the id is generated when `input.id` is missing. */
  registerRunner(input: Partial<RunnerRecord>): RunnerRecord {
    const at = nowIso();
    // `id` comes after the spread so an explicit `id: undefined` in `input` cannot replace the generated id.
    const runner: RunnerRecord = { registeredAt: at, heartbeatAt: at, ...input, id: input.id ?? newId("runner") };
    this.runners.set(runner.id, runner);
    return runner;
  }

  /**
   * Lists the runner jobs of a run (optionally only those of one command), oldest `createdAt` first.
   * @throws AgentRunError (HTTP 404) when the run does not exist.
   */
  listRunnerJobs(runId: string, commandId?: string): RunnerJobRecord[] {
    this.getRun(runId);
    return Array.from(this.runnerJobs.values())
      .filter((job) => job.runId === runId && (!commandId || job.commandId === commandId))
      .sort((a, b) => a.createdAt.localeCompare(b.createdAt));
  }

  /**
   * Finds the runner job of `runId` stored under `idempotencyKey`, or `null` when there is none.
   * @throws AgentRunError `schema-invalid` (HTTP 409) when the stored job has a different payload hash.
   */
  getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): RunnerJobRecord | null {
    const existing = Array.from(this.runnerJobs.values()).find((job) => job.runId === runId && job.idempotencyKey === idempotencyKey);
    if (!existing) return null;
    if (existing.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "runner job idempotency key reused with different payload", { httpStatus: 409 });
    return existing;
  }

  /**
   * Stores a runner job. When `input.idempotencyKey` is set and a job with that key already exists
   * for the run, the existing job is returned instead of creating a new one.
   * @throws AgentRunError (HTTP 409) when the key is reused with a different payload hash.
   */
  saveRunnerJob(input: SaveRunnerJobInput): RunnerJobRecord {
    if (input.idempotencyKey) {
      const existing = this.getRunnerJobByIdempotencyKey(input.runId, input.idempotencyKey, input.payloadHash);
      if (existing) return existing;
    }
    const at = nowIso();
    const record: RunnerJobRecord = { ...input, id: input.id ?? newId("rjob"), idempotencyKey: input.idempotencyKey ?? null, serviceAccountName: input.serviceAccountName ?? null, createdAt: at, updatedAt: at };
    this.runnerJobs.set(record.id, record);
    return record;
  }

  /**
   * Claims a run for `runnerId` with a lease of `leaseMs` milliseconds, marks the session as running
   * and appends a `run-claimed` backend_status event.
   * @throws AgentRunError (HTTP 409) when the run is terminal, or is claimed by another runner whose
   *   lease has not expired; (HTTP 404) when the run does not exist.
   */
  claimRun(runId: string, runnerId: string, leaseMs: number): RunRecord {
    const run = this.getRun(runId);
    if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 });
    if (run.claimedBy && run.claimedBy !== runnerId && !isLeaseExpired(run.leaseExpiresAt)) throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 });
    const next = this.updateRun(runId, { status: "claimed", claimedBy: runnerId, leaseExpiresAt: new Date(Date.now() + leaseMs).toISOString() });
    this.touchSessionForRun(next, { executionState: "running", activeRunId: runId, lastRunId: runId, lastActivityAt: next.updatedAt }, { bumpVersion: false, at: next.updatedAt });
    this.appendEvent(runId, "backend_status", { phase: "run-claimed", runnerId });
    return next;
  }

  /**
   * Extends the lease of a run held by `runnerId`. A terminal run is returned unchanged.
   * @throws AgentRunError `runner-lease-conflict` (HTTP 409) when the run is not claimed by `runnerId`.
   */
  heartbeat(runId: string, runnerId: string, leaseMs: number): RunRecord {
    const run = this.getRun(runId);
    if (isTerminalRunStatus(run.status)) return run;
    if (run.claimedBy !== runnerId) throw new AgentRunError("runner-lease-conflict", `run ${runId} is not claimed by ${runnerId}`, { httpStatus: 409 });
    return this.updateRun(runId, { leaseExpiresAt: new Date(Date.now() + leaseMs).toISOString() });
  }

  /** Marks a command `acknowledged`; terminal or already acknowledged commands are returned unchanged. */
  ackCommand(commandId: string): CommandRecord {
    const command = this.getCommand(commandId);
    if (isTerminalCommandState(command.state) || command.state === "acknowledged") return command;
    // One timestamp for both fields so acknowledgedAt and updatedAt can never disagree.
    const at = nowIso();
    const next = { ...command, state: "acknowledged" as const, acknowledgedAt: at, updatedAt: at };
    this.commands.set(commandId, next);
    return next;
  }

  /**
   * Moves a command to the terminal state derived from `result.terminalStatus`; an already terminal
   * command is returned unchanged.
   *
   * When `result.threadId` is present and the run has a session, the session's thread is updated.
   * A `turn` command additionally marks the session terminal and bumps its version. A
   * `command-terminal` backend_status event is appended.
   */
  finishCommand(commandId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): CommandRecord {
    const command = this.getCommand(commandId);
    if (isTerminalCommandState(command.state)) return command;
    const next = { ...command, state: commandStateFromTerminal(result.terminalStatus), updatedAt: nowIso() };
    this.commands.set(commandId, next);
    const run = this.getRun(command.runId);
    if (result.threadId && run.sessionRef?.sessionId) this.upsertSessionThread(run, result.threadId, result.turnId ?? null);
    // Re-read the run: upsertSessionThread may have replaced its sessionRef.
    if (command.type === "turn") this.touchSessionForRun(this.getRun(command.runId), { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: command.runId, lastCommandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind, lastActivityAt: next.updatedAt }, { bumpVersion: true, at: next.updatedAt });
    this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: next.state, terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage ?? null, threadId: result.threadId ?? null, turnId: result.turnId ?? null });
    return next;
  }

  /**
   * Appends an event to a run. The type is validated with `requireEventType`, the payload is
   * normalized and redacted, and `seq` is one more than the number of events already stored.
   * The session's `lastEventSeq` and activity time are updated without bumping its version.
   * @throws AgentRunError (HTTP 404) when the run does not exist.
   */
  appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): RunEvent {
    this.getRun(runId);
    const eventType = requireEventType(type);
    const eventPayload = normalizeRunEventPayload(eventType, payload);
    const events = this.eventsByRun.get(runId) ?? [];
    const event: RunEvent = { id: newId("evt"), runId, seq: events.length + 1, type: eventType, payload: redactJson(eventPayload), createdAt: nowIso() };
    events.push(event);
    this.eventsByRun.set(runId, events);
    this.touchSessionForRun(this.getRun(runId), { lastEventSeq: event.seq, lastActivityAt: event.createdAt }, { bumpVersion: false, at: event.createdAt });
    return event;
  }

  /**
   * Moves a run to the terminal status derived from `result.terminalStatus`; an already terminal run
   * is returned unchanged. Appends a `terminal_status` event and marks the session terminal.
   */
  finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): RunRecord {
    const existing = this.getRun(runId);
    if (isTerminalRunStatus(existing.status)) return existing;
    const status = statusFromTerminal(result.terminalStatus);
    const next = this.updateRun(runId, { status, terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage });
    if (result.threadId && next.sessionRef?.sessionId) this.upsertSessionThread(next, result.threadId, result.turnId ??
null);
    this.appendEvent(runId, "terminal_status", { terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage });
    this.touchSessionForRun(this.getRun(runId), { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: runId, terminalStatus: result.terminalStatus, failureKind: result.failureKind, lastActivityAt: next.updatedAt }, { bumpVersion: true, at: next.updatedAt });
    return next;
  }

  /**
   * Cancels a run; an already terminal run is returned unchanged.
   *
   * Every non-terminal command of the run is set to `cancelled` (one `command-terminal` event each),
   * then `cancel-requested` and `terminal_status` events are appended and the session is marked terminal.
   */
  cancelRun(runId: string, reason = "cancel requested"): RunRecord {
    const run = this.getRun(runId);
    if (isTerminalRunStatus(run.status)) return run;
    const at = nowIso();
    for (const command of Array.from(this.commands.values()).filter((item) => item.runId === runId && !isTerminalCommandState(item.state))) {
      const cancelled = { ...command, state: "cancelled" as const, updatedAt: at };
      this.commands.set(command.id, cancelled);
      this.appendEvent(runId, "backend_status", { phase: "command-terminal", commandId: command.id, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled" });
    }
    // Events are appended before/after updateRun in this order; appendEvent itself does not check run status.
    this.appendEvent(runId, "backend_status", { phase: "cancel-requested", reason });
    const next = this.updateRun(runId, { status: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: reason });
    this.appendEvent(runId, "terminal_status", { terminalStatus: "cancelled", failureKind: "cancelled", message: reason });
    this.touchSessionForRun(next, { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: runId, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: next.updatedAt }, { bumpVersion: true, at: next.updatedAt });
    return next;
  }

  /**
   * Cancels a single command; an already terminal command is returned unchanged.
   * Appends a `command-terminal` event, and for `turn` commands marks the session terminal.
   */
  cancelCommand(commandId: string, reason = "cancel requested"): CommandRecord {
    const command = this.getCommand(commandId);
    if (isTerminalCommandState(command.state)) return command;
    const next = { ...command, state: "cancelled" as const, updatedAt: nowIso() };
    this.commands.set(commandId, next);
    this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", message: reason });
    if (command.type === "turn") this.touchSessionForRun(this.getRun(command.runId), { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: command.runId, lastCommandId: command.id, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: next.updatedAt }, { bumpVersion: true, at: next.updatedAt });
    return next;
  }

  /** Returns the session record, or `null` when the id is unknown. */
  getSession(sessionId: string): SessionRecord | null {
    return this.sessions.get(sessionId) ?? null;
  }

  /**
   * Builds the summary of one session; the reader's read cursor is looked up only when `readerId` is given.
   * @throws AgentRunError `schema-invalid` (HTTP 404) when the session does not exist.
   */
  getSessionSummary(sessionId: string, readerId: string | null = null): SessionSummary {
    const session = this.getSession(sessionId);
    if (!session) throw new AgentRunError("schema-invalid", `session ${sessionId} was not found`, { httpStatus: 404 });
    return buildSessionSummary(session, readerId, readerId ? this.sessionReadCursors.get(sessionReadKey(sessionId, readerId)) ?? null : null);
  }

  /**
   * Lists session summaries whose `version` is greater than the cursor (0 when absent), filtered by
   * backend profile and list state (`"default"` when unset), ordered by `sessionSort` and limited by
   * `clampSessionLimit`. The returned cursor is the `version` of the last item, or `null` when empty.
   */
  listSessions(input: ListSessionsInput): SessionListResult {
    const startVersion = parseSessionCursor(input.cursor) ?? 0;
    const state = input.state ?? "default";
    // NOTE(review): items are ordered by sessionSort (not by version) before the cursor is taken from the last item — confirm this is the intended paging contract.
    const items = Array.from(this.sessions.values())
      .map((session) => buildSessionSummary(session, input.readerId ?? null, input.readerId ? this.sessionReadCursors.get(sessionReadKey(session.sessionId, input.readerId)) ?? null : null))
      .filter((session) => session.version > startVersion)
      .filter((session) => !input.backendProfile || session.backendProfile === input.backendProfile)
      .filter((session) => sessionMatchesListState(session, state))
      .sort(sessionSort)
      .slice(0, clampSessionLimit(input.limit));
    return { items, count: items.length, cursor: items.length > 0 ? String(items[items.length - 1]?.version ?? startVersion) : null, filters: sessionListFilters(input) };
  }

  /**
   * Returns a page of events for the session's run: the requested run when `input.runId` is set,
   * otherwise the session's active run or, failing that, its last run. An empty page is returned
   * when the session has no run yet.
   */
  listSessionTrace(sessionId: string, input: SessionEventPageInput): SessionEventPage {
    const runId = this.resolveSessionRunId(sessionId, input.runId ??
null);
    if (!runId) return { sessionId, runId: null, items: [], count: 0, cursor: null };
    const items = this.listEvents(runId, input.afterSeq, input.limit);
    return { sessionId, runId, items, count: items.length, cursor: items.length > 0 ? String(items[items.length - 1]?.seq ?? input.afterSeq) : null };
  }

  /**
   * Returns a page of the session run's output events (those accepted by `isSessionOutputEvent`)
   * with `seq > input.afterSeq`. The run is chosen exactly as in `listSessionTrace`.
   *
   * The output filter is applied before the page limit (clamped to 1..500, as in `listEvents`).
   * Filtering an already limited trace page could yield zero items and a `null` cursor while later
   * output events still existed, which stalled paging.
   *
   * @throws AgentRunError (HTTP 404) when the session or the requested run does not exist.
   */
  listSessionOutput(sessionId: string, input: SessionEventPageInput): SessionEventPage {
    const runId = this.resolveSessionRunId(sessionId, input.runId ?? null);
    if (!runId) return { sessionId, runId: null, items: [], count: 0, cursor: null };
    this.getRun(runId);
    const pageSize = Math.max(1, Math.min(input.limit, 500));
    const items = (this.eventsByRun.get(runId) ?? [])
      .filter((event) => event.seq > input.afterSeq)
      .filter(isSessionOutputEvent)
      .slice(0, pageSize);
    return { sessionId, runId, items, count: items.length, cursor: items.length > 0 ? String(items[items.length - 1]?.seq ?? input.afterSeq) : null };
  }

  /**
   * Records that `readerId` has read the session at its current version.
   * @throws AgentRunError `schema-invalid` (HTTP 404) when the session does not exist.
   */
  markSessionRead(sessionId: string, readerId: string): SessionReadCursorRecord {
    const session = this.getSession(sessionId);
    if (!session) throw new AgentRunError("schema-invalid", `session ${sessionId} was not found`, { httpStatus: 404 });
    const record: SessionReadCursorRecord = { sessionId, readerId, sessionVersion: session.version, readAt: nowIso() };
    this.sessionReadCursors.set(sessionReadKey(sessionId, readerId), record);
    return record;
  }

  /**
   * Creates the session, or overwrites the identity, thread, metadata, expiry and rollout-subdir
   * fields of an existing one while keeping its other fields.
   */
  upsertSession(input: UpsertSessionInput): SessionRecord {
    const at = nowIso();
    const existing = this.sessions.get(input.sessionId);
    if (existing) {
      // NOTE(review): the version here is per record (existing.version + 1, or 1 for a new session) while other
      // paths use nextSessionVersion(); listSessions filters on version > cursor — confirm the two schemes are meant to coexist.
      const next: SessionRecord = { ...existing, tenantId: input.tenantId, projectId: input.projectId, backendProfile: input.backendProfile, conversationId: input.conversationId, threadId: input.threadId, metadata: input.metadata, expiresAt: input.expiresAt, codexRolloutSubdir: input.codexRolloutSubdir, version: existing.version + 1, updatedAt: at, };
      this.sessions.set(input.sessionId, next);
      this.sessionVersion = Math.max(this.sessionVersion, next.version) + 1;
      return next;
    }
    const next: SessionRecord = { sessionId: input.sessionId, tenantId: input.tenantId, projectId: input.projectId, backendProfile: input.backendProfile, conversationId: input.conversationId, threadId: input.threadId, metadata: input.metadata, version: 1, executionState: "idle", lastRunId:
null, lastCommandId: null, activeRunId: null, activeCommandId: null, lastEventSeq: 0, terminalStatus: null, failureKind: null, title: null, summary: {}, lastActivityAt: null, createdAt: at, updatedAt: at, expiresAt: input.expiresAt, storageKind: "none", codexRolloutSubdir: input.codexRolloutSubdir, };
    this.sessions.set(input.sessionId, next);
    this.sessionVersion = Math.max(this.sessionVersion, next.version) + 1;
    return next;
  }

  /**
   * Overwrites the session's storage fields from `input`; omitted optional values are stored as `null`.
   * `codexRolloutSubdir` falls back to the session's current value and then to `"sessions"`.
   * The session version is incremented by one.
   * @throws AgentRunError `schema-invalid` (HTTP 404) when the session does not exist.
   */
  refreshSessionStorage(input: SessionStoragePatch): SessionRecord {
    const session = this.sessions.get(input.sessionId);
    if (!session) throw new AgentRunError("schema-invalid", `session ${input.sessionId} was not found`, { httpStatus: 404 });
    const at = nowIso();
    const next: SessionRecord = { ...session, storageKind: input.storageKind, storagePvcName: input.pvcName ?? null, storageNamespace: input.storageNamespace ?? null, storagePvcPhase: input.pvcPhase ?? null, storageSizeBytes: input.storageSizeBytes ?? null, storageFilesCount: input.storageFilesCount ?? null, storageSha256: input.storageSha256 ?? null, codexRolloutSubdir: input.codexRolloutSubdir ?? session.codexRolloutSubdir ?? "sessions", storageUpdatedAt: at, version: session.version + 1, updatedAt: at, };
    this.sessions.set(input.sessionId, next);
    // Keep the store-wide counter ahead of any per-session version written here.
    this.sessionVersion = Math.max(this.sessionVersion, next.version) + 1;
    return next;
  }

  /**
   * Marks the session's storage as `evicted`, recording the PVC name and eviction time.
   * The session version is incremented by one.
   * @throws AgentRunError `schema-invalid` (HTTP 404) when the session does not exist.
   */
  markSessionStorageEvicted(input: { sessionId: string; pvcName: string }): SessionRecord {
    const session = this.sessions.get(input.sessionId);
    if (!session) throw new AgentRunError("schema-invalid", `session ${input.sessionId} was not found`, { httpStatus: 404 });
    const at = nowIso();
    const next: SessionRecord = { ...session, storageKind: "evicted", storagePvcName: input.pvcName, storageEvictedAt: at, storageUpdatedAt: at, version: session.version + 1, updatedAt: at, };
    this.sessions.set(input.sessionId, next);
    this.sessionVersion = Math.max(this.sessionVersion, next.version) + 1;
    return next;
  }

  /**
   * Lists sessions that have PVC storage (`storageKind === "pvc"` with a PVC name) and a non-null
   * `expiresAt` at or before `input.now`, least recently updated first, at most `max(1, input.limit)`.
   */
  listGcExpiredSessions(input: ListGcExpiredSessionsInput): SessionRecord[] {
    // NOTE(review): compared against Date.parse(), so input.now is presumably epoch milliseconds — confirm.
    const now = input.now;
    return Array.from(this.sessions.values())
      .filter((session) => session.storageKind === "pvc" && Boolean(session.storagePvcName))
      .filter((session) => session.expiresAt !== null && Date.parse(session.expiresAt ?? "") <= now)
      .sort((a, b) => a.updatedAt.localeCompare(b.updatedAt))
      .slice(0, Math.max(1, input.limit));
  }

  /**
   * Creates a `pending` queue task. When `input.idempotencyKey` matches an existing task of the same
   * tenant and project, that task is returned if its payload hash is equal.
   * @throws AgentRunError `schema-invalid` (HTTP 409) when the key is reused with a different payload.
   */
  createQueueTask(input: CreateQueueTaskInput): QueueTaskRecord {
    const payloadHash = queueTaskPayloadHash(input);
    if (input.idempotencyKey) {
      const existing = Array.from(this.queueTasks.values()).find((task) => task.tenantId === input.tenantId && task.projectId === input.projectId && task.idempotencyKey === input.idempotencyKey);
      if (existing) {
        if (existing.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "queue task idempotency key reused with different payload", { httpStatus: 409 });
        return existing;
      }
    }
    const at = nowIso();
    const sessionId = input.sessionRef?.sessionId ?? null;
    const sessionPath = sessionId ?
`/api/v1/sessions/${encodeURIComponent(sessionId)}` : null;
    const task: QueueTaskRecord = { ...input, id: newId("qt"), state: "pending", version: this.nextQueueVersion(), payloadHash, latestAttempt: null, sessionPath, createdAt: at, updatedAt: at, cancelledAt: null, cancelReason: null };
    this.queueTasks.set(task.id, task);
    return task;
  }

  /**
   * Lists queue tasks whose `version` is greater than the cursor (falling back to `input.updatedAfter`,
   * then 0), filtered by queue and state, ordered by `queueTaskSort` and limited by `clampQueueLimit`.
   * The returned cursor is the `version` of the last item, or `null` when the page is empty.
   */
  listQueueTasks(input: ListQueueTasksInput): QueueTaskListResult {
    const startVersion = parseQueueCursor(input.cursor) ?? input.updatedAfter ?? 0;
    const items = Array.from(this.queueTasks.values())
      .filter((task) => task.version > startVersion)
      .filter((task) => !input.queue || task.queue === input.queue)
      .filter((task) => !input.state || task.state === input.state)
      .sort(queueTaskSort)
      .slice(0, clampQueueLimit(input.limit));
    return { items, count: items.length, cursor: items.length > 0 ? String(items[items.length - 1]?.version ?? startVersion) : null };
  }

  /**
   * Looks up a queue task by id.
   * @throws AgentRunError `schema-invalid` (HTTP 404) when the task does not exist.
   */
  getQueueTask(taskId: string): QueueTaskRecord {
    const task = this.queueTasks.get(taskId);
    if (!task) throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 });
    return task;
  }

  /**
   * Records a new state, latest attempt and session path on a non-terminal queue task and assigns
   * it the next queue version.
   * @throws AgentRunError (HTTP 409) when the task is already terminal.
   */
  updateQueueTaskAttempt(taskId: string, input: UpdateQueueTaskAttemptInput): QueueTaskRecord {
    const task = this.getQueueTask(taskId);
    if (isTerminalQueueTaskState(task.state)) throw new AgentRunError(task.state === "cancelled" ? "cancelled" : "schema-invalid", `queue task ${taskId} is already terminal: ${task.state}`, { httpStatus: 409 });
    const next: QueueTaskRecord = { ...task, state: input.state, latestAttempt: input.latestAttempt, sessionPath: input.sessionPath, version: this.nextQueueVersion(), updatedAt: nowIso() };
    this.queueTasks.set(taskId, next);
    return next;
  }

  /**
   * Cancels a queue task; an already terminal task is returned unchanged.
   * The latest attempt's run is cancelled when it has a `runId`; otherwise its command is cancelled
   * when it has a `commandId`.
   */
  cancelQueueTask(taskId: string, reason = "cancel requested"): QueueTaskRecord {
    const task = this.getQueueTask(taskId);
    if (isTerminalQueueTaskState(task.state)) return task;
    if (task.latestAttempt?.runId) this.cancelRun(task.latestAttempt.runId, reason);
    else if (task.latestAttempt?.commandId) this.cancelCommand(task.latestAttempt.commandId, reason);
    const at = nowIso();
    const latestAttempt = task.latestAttempt ? { ...task.latestAttempt, state: "cancelled" as const } : null;
    const next: QueueTaskRecord = { ...task, state: "cancelled", latestAttempt, version: this.nextQueueVersion(), updatedAt: at, cancelledAt: at, cancelReason: reason };
    this.queueTasks.set(taskId, next);
    return next;
  }

  /**
   * Records that `readerId` has read the task at its current version.
   * @throws AgentRunError (HTTP 404) when the task does not exist.
   */
  markQueueTaskRead(taskId: string, readerId: string): QueueReadCursorRecord {
    const task = this.getQueueTask(taskId);
    const record: QueueReadCursorRecord = { taskId, readerId, taskVersion: task.version, readAt: nowIso() };
    this.queueReadCursors.set(queueReadKey(taskId, readerId), record);
    return record;
  }

  /** Builds statistics over all tasks, or only those of `queue` when given. */
  queueStats(queue?: string): QueueStats {
    return buildQueueStats(Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue), queue ?? null);
  }

  /**
   * Builds a snapshot of a queue (or of all queues): statistics over every matching task plus at most
   * 20 task summaries accepted by `queueTaskMatchesCommander`, ordered by `queueTaskSort`.
   */
  queueCommander(queue?: string, readerId: string | null = null): QueueCommanderSnapshot {
    const tasks = Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue);
    const generatedAt = nowIso();
    const items = tasks
      .map((task) => buildQueueTaskSummary(task, readerId, readerId ? this.queueReadCursors.get(queueReadKey(task.id, readerId)) ??
null : null))
      .filter((task) => queueTaskMatchesCommander(task, readerId))
      .sort(queueTaskSort)
      .slice(0, 20);
    return { queue: queue ?? null, readerId, stats: buildQueueStats(tasks, queue ?? null, generatedAt), items, generatedAt };
  }

  /** Returns the backend capability list from `backendCapabilities()`. */
  backends(): JsonRecord[] {
    return backendCapabilities();
  }

  /**
   * Replaces the stored run with a copy that has `patch` applied and a fresh `updatedAt`.
   * @throws AgentRunError (HTTP 404) when the run does not exist.
   */
  private updateRun(runId: string, patch: Partial<RunRecord>): RunRecord {
    const run = this.getRun(runId);
    const next = { ...run, ...patch, updatedAt: nowIso() };
    this.runs.set(runId, next);
    return next;
  }

  /** Increments and returns the store-wide queue task version counter. */
  private nextQueueVersion(): number {
    this.queueVersion += 1;
    return this.queueVersion;
  }

  /**
   * Resolves the session reference for a new run.
   *
   * Returns `null` when the input has no `sessionRef`. An existing session is checked with
   * `assertSessionBoundary` and reused; otherwise a new `idle` session record is created from the
   * input and stored.
   */
  private resolveSessionForRun(input: CreateRunInput, at: string): SessionRef | null {
    if (!input.sessionRef) return null;
    const existing = this.sessions.get(input.sessionRef.sessionId);
    if (existing) {
      assertSessionBoundary(existing, input);
      return sessionRefFromRecord(existing, input.sessionRef);
    }
    const record: SessionRecord = { sessionId: input.sessionRef.sessionId, tenantId: input.tenantId, projectId: input.projectId, backendProfile: input.backendProfile, conversationId: input.sessionRef.conversationId ?? null, threadId: input.sessionRef.threadId ?? null, metadata: input.sessionRef.metadata ?? {}, version: this.nextSessionVersion(), executionState: "idle", lastRunId: null, lastCommandId: null, activeRunId: null, activeCommandId: null, lastEventSeq: 0, terminalStatus: null, failureKind: null, title: titleFromMetadata(input.sessionRef.metadata ?? {}), summary: {}, lastActivityAt: at, createdAt: at, updatedAt: at, expiresAt: input.sessionRef.expiresAt ??
null, };
    this.sessions.set(record.sessionId, record);
    return sessionRefFromRecord(record, input.sessionRef);
  }

  /**
   * Writes the backend thread id (and, when given, `lastTurnId` in the metadata) onto the run's
   * session, creating the session record if it is missing. It then refreshes the run's `sessionRef`
   * and appends a `session-updated` backend_status event. Does nothing when the run has no session.
   *
   * Fields of an existing record that are not listed below (for example the storage fields written
   * by `refreshSessionStorage`) are carried over instead of being dropped.
   */
  private upsertSessionThread(run: RunRecord, threadId: string, turnId: string | null): void {
    if (!run.sessionRef?.sessionId) return;
    const at = nowIso();
    const existing = this.sessions.get(run.sessionRef.sessionId);
    const record: SessionRecord = {
      // Start from the stored record so unlisted fields survive; every field below overrides it.
      ...existing,
      sessionId: run.sessionRef.sessionId,
      tenantId: run.tenantId,
      projectId: run.projectId,
      backendProfile: run.backendProfile,
      conversationId: run.sessionRef.conversationId ?? existing?.conversationId ?? null,
      threadId,
      metadata: { ...(existing?.metadata ?? {}), ...(run.sessionRef.metadata ?? {}), ...(turnId ? { lastTurnId: turnId } : {}) },
      version: this.nextSessionVersion(),
      executionState: existing?.executionState ?? "idle",
      lastRunId: existing?.lastRunId ?? run.id,
      lastCommandId: existing?.lastCommandId ?? null,
      activeRunId: existing?.activeRunId ?? null,
      activeCommandId: existing?.activeCommandId ?? null,
      lastEventSeq: existing?.lastEventSeq ?? 0,
      terminalStatus: existing?.terminalStatus ?? null,
      failureKind: existing?.failureKind ?? null,
      title: existing?.title ?? titleFromMetadata(run.sessionRef.metadata ?? {}),
      summary: existing?.summary ?? {},
      lastActivityAt: at,
      createdAt: existing?.createdAt ?? at,
      updatedAt: at,
      expiresAt: run.sessionRef.expiresAt ?? existing?.expiresAt ?? null,
    };
    this.sessions.set(record.sessionId, record);
    const nextSessionRef = sessionRefFromRecord(record, run.sessionRef);
    this.updateRun(run.id, { sessionRef: nextSessionRef });
    this.appendEvent(run.id, "backend_status", { phase: "session-updated", sessionRef: summarizeSessionRef(nextSessionRef), turnId });
  }

  /**
   * Applies `patch` to the session referenced by `run`, if the run has one and it is stored.
   * `options.bumpVersion` decides whether the session gets the next store-wide version;
   * `options.at` (default: now) becomes `updatedAt` and the fallback for `lastActivityAt`.
   */
  private touchSessionForRun(run: RunRecord, patch: Partial<SessionRecord>, options: { bumpVersion: boolean; at?: string }): void {
    const sessionId = run.sessionRef?.sessionId;
    if (!sessionId) return;
    const existing = this.sessions.get(sessionId);
    if (!existing) return;
    const at = options.at ??
nowIso();
    const next: SessionRecord = { ...existing, ...patch, version: options.bumpVersion ? this.nextSessionVersion() : existing.version, updatedAt: at, lastActivityAt: patch.lastActivityAt ?? at };
    this.sessions.set(sessionId, next);
  }

  /**
   * Picks the run whose events a session page should show: the explicitly requested run (which must
   * belong to the session), else the session's active run, else its last run (possibly `null`).
   * @throws AgentRunError `schema-invalid` (HTTP 404) when the session or run is unknown, or the run
   *   belongs to a different session.
   */
  private resolveSessionRunId(sessionId: string, requestedRunId: string | null): string | null {
    const session = this.getSession(sessionId);
    if (!session) {
      throw new AgentRunError("schema-invalid", `session ${sessionId} was not found`, { httpStatus: 404 });
    }
    if (!requestedRunId) {
      return session.activeRunId ?? session.lastRunId;
    }
    const owningSessionId = this.getRun(requestedRunId).sessionRef?.sessionId;
    if (owningSessionId !== sessionId) {
      throw new AgentRunError("schema-invalid", `run ${requestedRunId} does not belong to session ${sessionId}`, { httpStatus: 404 });
    }
    return requestedRunId;
  }

  /** Advances the store-wide session version counter and returns the new value. */
  private nextSessionVersion(): number {
    return ++this.sessionVersion;
  }
}

/**
 * Rejects reuse of a session by a run from another tenant/project or with another backend profile.
 * @throws AgentRunError `tenant-policy-denied` (HTTP 403) on a tenant or project mismatch;
 *   `schema-invalid` (HTTP 400) on a backend profile mismatch.
 */
export function assertSessionBoundary(existing: SessionRecord, input: CreateRunInput): void {
  const sameTenantAndProject = existing.tenantId === input.tenantId && existing.projectId === input.projectId;
  if (!sameTenantAndProject) {
    throw new AgentRunError("tenant-policy-denied", "sessionRef cannot be reused across tenant or project boundary", { httpStatus: 403, details: { sessionId: existing.sessionId, valuesPrinted: false } });
  }
  if (existing.backendProfile === input.backendProfile) return;
  throw new AgentRunError("schema-invalid", "sessionRef cannot be reused across backendProfile boundary", { httpStatus: 400, details: { sessionId: existing.sessionId, existingBackendProfile: existing.backendProfile, requestedBackendProfile: input.backendProfile, valuesPrinted: false } });
}

/** Maps a terminal status to the run status: `completed`, `cancelled` and `blocked` map to themselves, anything else to `failed`. */
export function statusFromTerminal(terminalStatus: TerminalStatus): RunRecord["status"] {
  switch (terminalStatus) {
    case "completed":
      return "completed";
    case "cancelled":
      return "cancelled";
    case "blocked":
      return "blocked";
    default:
      return "failed";
  }
}

/** Maps a terminal status to the command state: `completed` and `cancelled` map to themselves, anything else to `failed`. */
export function commandStateFromTerminal(terminalStatus: TerminalStatus):
CommandRecord["state"] { if (terminalStatus === "completed") return "completed"; if (terminalStatus === "cancelled") return "cancelled"; return "failed"; } export function isTerminalRunStatus(status: RunRecord["status"]): boolean { return status === "completed" || status === "failed" || status === "blocked" || status === "cancelled"; } export function isTerminalCommandState(state: CommandRecord["state"]): boolean { return state === "completed" || state === "failed" || state === "cancelled"; } export function isTerminalQueueTaskState(state: QueueTaskState): boolean { return state === "completed" || state === "failed" || state === "blocked" || state === "cancelled"; } export function isLeaseExpired(leaseExpiresAt: string | null): boolean { if (!leaseExpiresAt) return true; return new Date(leaseExpiresAt).getTime() <= Date.now(); } export function sessionRefFromRecord(record: SessionRecord, fallback: SessionRef): SessionRef { return { sessionId: record.sessionId, ...(record.conversationId ? { conversationId: record.conversationId } : fallback.conversationId ? { conversationId: fallback.conversationId } : {}), ...(record.threadId ? { threadId: record.threadId } : fallback.threadId ? { threadId: fallback.threadId } : {}), ...(record.expiresAt ? { expiresAt: record.expiresAt } : fallback.expiresAt ? { expiresAt: fallback.expiresAt } : {}), ...(Object.keys(record.metadata).length > 0 ? { metadata: record.metadata } : fallback.metadata ? { metadata: fallback.metadata } : {}), }; } export function buildSessionSummary(record: SessionRecord, readerId: string | null, readCursor: SessionReadCursorRecord | null): SessionSummary { const active = record.executionState === "running" || record.activeRunId !== null || record.activeCommandId !== null; const unread = !active && (!readCursor || readCursor.sessionVersion < record.version); const attentionState = active ? "active" : unread ? 
"unread" : "read"; return { ...record, sessionPath: `${record.tenantId}/${record.projectId}/${record.sessionId}`, readerId, readCursor, attentionState, unread, active }; } export function buildQueueTaskSummary(record: QueueTaskRecord, readerId: string | null, readCursor: QueueReadCursorRecord | null): QueueTaskSummary { const active = !isTerminalQueueTaskState(record.state); const unread = !active && (!readCursor || readCursor.taskVersion < record.version); const attentionState = active ? "active" : unread ? "unread" : "read"; return { ...record, readerId, readCursor, attentionState, unread, active }; } export function queueTaskMatchesCommander(task: QueueTaskSummary, readerId: string | null): boolean { if (!readerId) return true; return task.active || task.unread; } export function sessionMatchesListState(session: SessionSummary, state: SessionListState): boolean { if (state === "all") return true; if (state === "default") return session.active || session.unread; if (state === "running") return session.active; if (state === "unread") return session.unread; if (state === "terminal") return session.executionState === "terminal"; if (state === "idle") return session.executionState === "idle"; return false; } export function sessionSort(a: SessionSummary, b: SessionSummary): number { if (a.active !== b.active) return a.active ? -1 : 1; if (a.unread !== b.unread) return a.unread ? -1 : 1; return (b.lastActivityAt ?? b.updatedAt).localeCompare(a.lastActivityAt ?? a.updatedAt) || b.updatedAt.localeCompare(a.updatedAt) || a.sessionId.localeCompare(b.sessionId); } export function clampSessionLimit(limit: number): number { return Math.max(1, Math.min(Number.isFinite(limit) ? Math.trunc(limit) : 50, 100)); } export function parseSessionCursor(cursor: string | undefined): number | null { if (!cursor) return null; const value = Number(cursor); return Number.isInteger(value) && value >= 0 ? 
value : null; } export function sessionListFilters(input: ListSessionsInput): JsonRecord { return { state: input.state ?? "default", backendProfile: input.backendProfile ?? null, readerId: input.readerId ?? null, cursor: input.cursor ?? null, limit: clampSessionLimit(input.limit) }; } export function summarizeSessionRef(sessionRef: SessionRef | null): JsonRecord | null { if (!sessionRef) return null; return { sessionId: sessionRef.sessionId, conversationId: sessionRef.conversationId ?? null, threadId: sessionRef.threadId ?? null, expiresAt: sessionRef.expiresAt ?? null, metadataKeys: Object.keys(sessionRef.metadata ?? {}).sort(), valuesPrinted: false, }; } export function summarizeResourceBundleRef(resourceBundleRef: RunRecord["resourceBundleRef"] | null | undefined): JsonRecord | null { if (!resourceBundleRef) return null; return { kind: resourceBundleRef.kind, repoUrl: resourceBundleRef.repoUrl, commitId: resourceBundleRef.commitId ?? null, ref: resourceBundleRef.ref ?? null, bundles: { count: resourceBundleRef.bundles.length, items: resourceBundleRef.bundles.map((item) => ({ name: item.name ?? null, repoUrl: item.repoUrl ?? resourceBundleRef.repoUrl, commitId: item.commitId ?? resourceBundleRef.commitId ?? null, ref: item.ref ?? resourceBundleRef.ref ?? null, subpath: item.subpath, targetPath: item.targetPath, valuesPrinted: false })), valuesPrinted: false, }, promptRefs: resourceBundleRef.promptRefs ? { count: resourceBundleRef.promptRefs.length, names: resourceBundleRef.promptRefs.map((item) => item.name), required: resourceBundleRef.promptRefs.filter((item) => item.required === true).map((item) => item.name), valuesPrinted: false } : { count: 0, names: [], required: [], valuesPrinted: false }, requiredSkills: resourceBundleRef.requiredSkills ? 
{ count: resourceBundleRef.requiredSkills.length, names: resourceBundleRef.requiredSkills.map((item) => item.name), valuesPrinted: false } : { count: 0, names: [], valuesPrinted: false }, submodules: resourceBundleRef.submodules ?? false, lfs: resourceBundleRef.lfs ?? false, credentialRef: resourceBundleRef.credentialRef ? { name: resourceBundleRef.credentialRef.name, namespace: resourceBundleRef.credentialRef.namespace ?? null, keys: resourceBundleRef.credentialRef.keys ?? [], valuesPrinted: false } : null, }; } export function queueTaskSort(a: QueueTaskRecord, b: QueueTaskRecord): number { if (a.priority !== b.priority) return b.priority - a.priority; return a.createdAt.localeCompare(b.createdAt) || a.id.localeCompare(b.id); } export function queueTaskPayloadHash(input: CreateQueueTaskInput): string { return stableHash({ tenantId: input.tenantId, projectId: input.projectId, queue: input.queue, lane: input.lane, title: input.title, priority: input.priority, backendProfile: input.backendProfile, providerId: input.providerId, workspaceRef: input.workspaceRef, sessionRef: input.sessionRef, executionPolicy: input.executionPolicy, resourceBundleRef: input.resourceBundleRef, payload: input.payload, references: input.references, metadata: input.metadata, }); } export function buildQueueStats(tasks: QueueTaskRecord[], queue: string | null, generatedAt = nowIso()): QueueStats { const byState: Record = {}; const byLane: Record = {}; let maxVersion = 0; for (const task of tasks) { byState[task.state] = (byState[task.state] ?? 0) + 1; byLane[task.lane] = (byLane[task.lane] ?? 0) + 1; maxVersion = Math.max(maxVersion, task.version); } return { queue, total: tasks.length, byState, byLane, maxVersion, generatedAt }; } export function clampQueueLimit(limit: number): number { return Math.max(1, Math.min(Number.isFinite(limit) ? 
Math.trunc(limit) : 50, 100)); } export function parseQueueCursor(cursor: string | undefined): number | null { if (!cursor) return null; const value = Number(cursor); return Number.isInteger(value) && value >= 0 ? value : null; } function queueReadKey(taskId: string, readerId: string): string { return `${taskId}:${readerId}`; } export function sessionReadKey(sessionId: string, readerId: string): string { return `${sessionId}:${readerId}`; } export function titleFromMetadata(metadata: JsonRecord): string | null { const title = metadata.title; return typeof title === "string" && title.trim().length > 0 ? title.trim().slice(0, 200) : null; } export function sessionTitleFromCommand(command: CommandRecord): string | null { const value = command.payload.prompt; if (typeof value !== "string") return null; return value.trim().replace(/\s+/gu, " ").slice(0, 120) || null; } export function isSessionOutputEvent(event: RunEvent): boolean { return event.type === "assistant_message" || event.type === "command_output" || event.type === "diff" || event.type === "error" || event.type === "terminal_status"; }