diff --git a/src/common/types.ts b/src/common/types.ts index 72436e9..6fc137b 100644 --- a/src/common/types.ts +++ b/src/common/types.ts @@ -198,8 +198,48 @@ export interface SessionRecord extends JsonRecord { createdAt: string; updatedAt: string; expiresAt: string | null; + storageKind?: SessionStorageKind; + storagePvcName?: string | null; + storageNamespace?: string | null; + storageSizeBytes?: number | null; + storageFilesCount?: number | null; + storageSha256?: string | null; + storagePvcPhase?: string | null; + storageUpdatedAt?: string | null; + storageEvictedAt?: string | null; + codexRolloutSubdir?: string; } +export type SessionStorageKind = "none" | "pvc" | "evicted"; + +export interface UpsertSessionInput extends JsonRecord { + sessionId: string; + tenantId: string; + projectId: string; + backendProfile: BackendProfile; + conversationId: string | null; + threadId: string | null; + metadata: JsonRecord; + expiresAt: string | null; + codexRolloutSubdir: string; +} + +export interface SessionStoragePatch extends JsonRecord { + sessionId: string; + storageKind: SessionStorageKind; + pvcName?: string | null; + storageNamespace?: string | null; + pvcPhase?: string | null; + storageSizeBytes?: number | null; + storageFilesCount?: number | null; + storageSha256?: string | null; + codexRolloutSubdir?: string; +} + +export interface ListGcExpiredSessionsInput extends JsonRecord { + now: number; + limit: number; +} export interface SessionReadCursorRecord extends JsonRecord { sessionId: string; readerId: string; diff --git a/src/mgr/kubernetes-runner-job.ts b/src/mgr/kubernetes-runner-job.ts index 2f99afc..dbd7214 100644 --- a/src/mgr/kubernetes-runner-job.ts +++ b/src/mgr/kubernetes-runner-job.ts @@ -75,6 +75,12 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore; } if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${run.id} is already terminal: ${run.status}`, { httpStatus: 409 }); if (isTerminalCommandState(command.state) || command.state !== "pending") throw new AgentRunError(command.state === "cancelled" ? "cancelled" : "schema-invalid", `command ${commandId} is not pending: ${command.state}`, { httpStatus: 409 }); + if (run.sessionRef?.sessionId) { + const session = await options.store.getSession(run.sessionRef.sessionId); + if (session?.storageKind === "evicted") { + throw new AgentRunError("session-store-evicted", `session ${session.sessionId} storage has been evicted; create a new sessionId`, { httpStatus: 409, details: { sessionId: session.sessionId, pvcName: session.storagePvcName ?? null, pvcPhase: session.storagePvcPhase ?? null, valuesPrinted: false } }); + } + } const renderOptions = { run, diff --git a/src/mgr/postgres-store.ts b/src/mgr/postgres-store.ts index 1739906..3bcf5f7 100644 --- a/src/mgr/postgres-store.ts +++ b/src/mgr/postgres-store.ts @@ -3,7 +3,7 @@ import { Pool } from "pg"; import type { PoolClient, QueryResultRow } from "pg"; import { AgentRunError } from "../common/errors.js"; import { redactJson } from "../common/redaction.js"; -import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionEventPage, SessionListResult, SessionReadCursorRecord, SessionRecord, SessionRef, SessionSummary, TerminalStatus } from "../common/types.js"; +import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, ListGcExpiredSessionsInput, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionEventPage, SessionListResult, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js"; import { newId, nowIso, stableHash } from "../common/validation.js"; import type { AgentRunStore, ListQueueTasksInput, ListSessionsInput, SaveRunnerJobInput, SessionEventPageInput, StoreHealth, UpdateQueueTaskAttemptInput } from "./store.js"; import { assertSessionBoundary, buildQueueStats, buildSessionSummary, clampQueueLimit, clampSessionLimit, commandStateFromTerminal, isLeaseExpired, isSessionOutputEvent, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, parseSessionCursor, queueTaskSort, sessionListFilters, sessionMatchesListState, sessionRefFromRecord, sessionSort, sessionTitleFromCommand, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef, titleFromMetadata } from "./store.js"; @@ -710,6 +710,147 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( }); } + async upsertSession(input: UpsertSessionInput): Promise { + return this.withTransaction(async (client) => { + const existing = await client.query("SELECT * FROM agentrun_sessions WHERE session_id = $1 FOR UPDATE", [input.sessionId]); + const at = nowIso(); + if (existing.rows[0]) { + const session = sessionFromRow(existing.rows[0]); + const next: SessionRecord = { + ...session, + tenantId: input.tenantId, + projectId: input.projectId, + backendProfile: input.backendProfile, + conversationId: input.conversationId, + threadId: input.threadId, + metadata: input.metadata, + expiresAt: input.expiresAt, + codexRolloutSubdir: input.codexRolloutSubdir, + version: session.version + 1, + updatedAt: at, + }; + await client.query( + `UPDATE agentrun_sessions + SET tenant_id = $2, project_id = $3, backend_profile = $4, conversation_id = $5, thread_id = $6, + metadata = $7, expires_at = $8, codex_rollout_subdir = $9, version = $10, updated_at = $11 + WHERE session_id = $1`, + [next.sessionId, next.tenantId, next.projectId, next.backendProfile, next.conversationId, next.threadId, + JSON.stringify(next.metadata), next.expiresAt, next.codexRolloutSubdir, next.version, next.updatedAt], + ); + return next; + } + const next: SessionRecord = { + sessionId: input.sessionId, + tenantId: input.tenantId, + projectId: input.projectId, + backendProfile: input.backendProfile, + conversationId: input.conversationId, + threadId: input.threadId, + metadata: input.metadata, + version: 1, + executionState: "idle", + lastRunId: null, + lastCommandId: null, + activeRunId: null, + activeCommandId: null, + lastEventSeq: 0, + terminalStatus: null, + failureKind: null, + title: null, + summary: {}, + lastActivityAt: null, + createdAt: at, + updatedAt: at, + expiresAt: input.expiresAt, + storageKind: "none", + codexRolloutSubdir: input.codexRolloutSubdir, + }; + await client.query( + `INSERT INTO agentrun_sessions (session_id, tenant_id, project_id, backend_profile, conversation_id, thread_id, + metadata, version, execution_state, last_run_id, last_command_id, active_run_id, + active_command_id, last_event_seq, terminal_status, failure_kind, title, summary, + last_activity_at, created_at, updated_at, expires_at, storage_kind, codex_rollout_subdir) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'idle', null, null, null, null, 0, null, null, null, '{}'::jsonb, null, $9, $10, $11, 'none', $12)`, + [next.sessionId, next.tenantId, next.projectId, next.backendProfile, next.conversationId, next.threadId, + JSON.stringify(next.metadata), next.version, next.createdAt, next.updatedAt, next.expiresAt, next.codexRolloutSubdir], + ); + return next; + }); + } + + async refreshSessionStorage(input: SessionStoragePatch): Promise { + return this.withTransaction(async (client) => { + const existing = await client.query("SELECT * FROM agentrun_sessions WHERE session_id = $1 FOR UPDATE", [input.sessionId]); + if (!existing.rows[0]) throw new AgentRunError("schema-invalid", `session ${input.sessionId} was not found`, { httpStatus: 404 }); + const session = sessionFromRow(existing.rows[0]); + const at = nowIso(); + const next: SessionRecord = { + ...session, + storageKind: input.storageKind, + storagePvcName: input.pvcName ?? null, + storageNamespace: input.storageNamespace ?? null, + storagePvcPhase: input.pvcPhase ?? null, + storageSizeBytes: input.storageSizeBytes ?? null, + storageFilesCount: input.storageFilesCount ?? null, + storageSha256: input.storageSha256 ?? null, + codexRolloutSubdir: input.codexRolloutSubdir ?? session.codexRolloutSubdir ?? "sessions", + storageUpdatedAt: at, + version: session.version + 1, + updatedAt: at, + }; + await client.query( + `UPDATE agentrun_sessions + SET storage_kind = $2, storage_pvc_name = $3, storage_namespace = $4, storage_pvc_phase = $5, + storage_size_bytes = $6, storage_files_count = $7, storage_sha256 = $8, + storage_updated_at = $9, codex_rollout_subdir = $10, version = $11, updated_at = $12 + WHERE session_id = $1`, + [next.sessionId, next.storageKind, next.storagePvcName, next.storageNamespace, next.storagePvcPhase, + next.storageSizeBytes, next.storageFilesCount, next.storageSha256, + next.storageUpdatedAt, next.codexRolloutSubdir, next.version, next.updatedAt], + ); + return next; + }); + } + + async markSessionStorageEvicted(input: { sessionId: string; pvcName: string }): Promise { + return this.withTransaction(async (client) => { + const existing = await client.query("SELECT * FROM agentrun_sessions WHERE session_id = $1 FOR UPDATE", [input.sessionId]); + if (!existing.rows[0]) throw new AgentRunError("schema-invalid", `session ${input.sessionId} was not found`, { httpStatus: 404 }); + const session = sessionFromRow(existing.rows[0]); + const at = nowIso(); + const next: SessionRecord = { + ...session, + storageKind: "evicted", + storagePvcName: input.pvcName, + storageEvictedAt: at, + storageUpdatedAt: at, + version: session.version + 1, + updatedAt: at, + }; + await client.query( + `UPDATE agentrun_sessions + SET storage_kind = $2, storage_pvc_name = $3, storage_evicted_at = $4, storage_updated_at = $5, + version = $6, updated_at = $7 + WHERE session_id = $1`, + [next.sessionId, next.storageKind, next.storagePvcName, next.storageEvictedAt, next.storageUpdatedAt, next.version, next.updatedAt], + ); + return next; + }); + } + + async listGcExpiredSessions(input: ListGcExpiredSessionsInput): Promise { + const limit = Math.max(1, input.limit); + const result = await this.pool.query( + `SELECT * FROM agentrun_sessions + WHERE storage_kind = 'pvc' AND storage_pvc_name IS NOT NULL + AND expires_at IS NOT NULL AND expires_at <= to_timestamp($1) + ORDER BY updated_at ASC + LIMIT $2`, + [input.now / 1000, limit], + ); + return result.rows.map(sessionFromRow); + } + async createQueueTask(input: CreateQueueTaskInput): Promise { const payloadHash = stableHash(input.payload); return this.withTransaction(async (client) => { @@ -1153,6 +1294,16 @@ function sessionFromRow(row: QueryResultRow): SessionRecord { createdAt: iso(row.created_at), updatedAt: iso(row.updated_at), expiresAt: nullableIso(row.expires_at), + storageKind: (stringValue(row.storage_kind ?? "none") as SessionRecord["storageKind"]) ?? "none", + storagePvcName: nullableString(row.storage_pvc_name), + storageNamespace: nullableString(row.storage_namespace), + storageSizeBytes: row.storage_size_bytes !== null && row.storage_size_bytes !== undefined ? Number(row.storage_size_bytes) : null, + storageFilesCount: row.storage_files_count !== null && row.storage_files_count !== undefined ? Number(row.storage_files_count) : null, + storageSha256: nullableString(row.storage_sha256), + storagePvcPhase: nullableString(row.storage_pvc_phase), + storageUpdatedAt: nullableIso(row.storage_updated_at), + storageEvictedAt: nullableIso(row.storage_evicted_at), + codexRolloutSubdir: stringValue(row.codex_rollout_subdir ?? "sessions"), }; } diff --git a/src/mgr/server.ts b/src/mgr/server.ts index 8585038..b99c104 100644 --- a/src/mgr/server.ts +++ b/src/mgr/server.ts @@ -10,6 +10,24 @@ import { createKubernetesRunnerJob } from "./kubernetes-runner-job.js"; import { dispatchQueueTask, refreshQueueTaskFromCore } from "./queue-dispatch.js"; import { buildRunResult } from "./result.js"; import { runnerJobStatusSummary } from "./runner-job-status.js"; +import { createSessionPvc, deleteSessionPvc, getSessionPvcSummary, refreshSessionPvcSummary, runSessionStorageGc } from "./session-pvc.js"; +import type { SessionPvcSummary } from "./session-pvc.js"; +import type { SessionPvcOptions } from "./session-pvc.js"; + +function pvcOptions(defaults: { kubectlCommand?: string } | undefined): SessionPvcOptions { + return defaults?.kubectlCommand ? { kubectlCommand: defaults.kubectlCommand } : {}; +} + +function sessionPvcOptionsForRequest(serverDefaults: { kubectlHandler?: import("./session-pvc.js").KubectlHandler; kubectlCommand?: string; storageClassName?: string; size?: string } | undefined, runnerJobDefaults: { kubectlCommand?: string } | undefined): SessionPvcOptions { + if (serverDefaults?.kubectlHandler) { + const opts: SessionPvcOptions = { kubectlHandler: serverDefaults.kubectlHandler }; + if (serverDefaults.kubectlCommand) opts.kubectlCommand = serverDefaults.kubectlCommand; + if (serverDefaults.storageClassName) opts.storageClassName = serverDefaults.storageClassName; + if (serverDefaults.size) opts.size = serverDefaults.size; + return opts; + } + return pvcOptions(runnerJobDefaults); +} export interface ManagerServerOptions { store?: AgentRunStore; @@ -23,6 +41,7 @@ export interface ManagerServerOptions { serviceAccountName?: string; kubectlCommand?: string; }; + sessionPvcOptions?: { kubectlHandler?: import("./session-pvc.js").KubectlHandler; kubectlCommand?: string; storageClassName?: string; size?: string }; } export interface StartedManagerServer { @@ -35,12 +54,13 @@ export async function startManagerServer(options: ManagerServerOptions = {}): Pr const store = options.store ?? await openAgentRunStoreFromEnv(); const sourceCommit = options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown"; const runnerJobDefaults = options.runnerJobDefaults; + const sessionPvcDefaults = options.sessionPvcOptions; const server = createServer(async (req, res) => { const traceId = `trc_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`; try { const method = req.method ?? "GET"; const url = new URL(req.url ?? "/", "http://agentrun.local"); - const data = await route({ method, url, body: await readBody(req), store, sourceCommit, ...(runnerJobDefaults ? { runnerJobDefaults } : {}) }); + const data = await route({ method, url, body: await readBody(req), store, sourceCommit, ...(runnerJobDefaults ? { runnerJobDefaults } : {}), ...(sessionPvcDefaults ? { sessionPvcDefaults } : {}) }); writeJson(res, 200, { ok: true, data, traceId }); } catch (error) { const agentError = normalizeError(error); @@ -61,7 +81,7 @@ async function readBody(req: import("node:http").IncomingMessage): Promise }): Promise { +async function route({ method, url, body, store, sourceCommit, runnerJobDefaults, sessionPvcDefaults }: { method: string; url: URL; body: unknown; store: AgentRunStore; sourceCommit: string; runnerJobDefaults?: NonNullable; sessionPvcDefaults?: NonNullable }): Promise { const path = url.pathname; if (method === "GET" && (path === "/health" || path === "/health/live" || path === "/health/readiness")) { const database = await store.health(); @@ -117,6 +137,65 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults } throw new AgentRunError("schema-invalid", `session control action ${action} is not supported`, { httpStatus: 400 }); } + const sessionStorageMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/storage$/u); + if (method === "GET" && sessionStorageMatch) { + const summary = await getSessionPvcSummary({ store, sessionId: sessionStorageMatch[1] ?? "", options: sessionPvcOptionsForRequest(sessionPvcDefaults, runnerJobDefaults) }); + return summary as unknown as JsonValue; + } + if (method === "DELETE" && sessionStorageMatch) { + return await deleteSessionPvc({ store, sessionId: sessionStorageMatch[1] ?? "", options: sessionPvcOptionsForRequest(sessionPvcDefaults, runnerJobDefaults) }) as unknown as JsonValue; + } + const sessionStorageRefreshMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/storage\/refresh$/u); + if (method === "POST" && sessionStorageRefreshMatch) { + const record = asRecord(body ?? {}, "sessionStorageRefresh"); + const summary: SessionPvcSummary = { + pvcName: stringField(record, "pvcName"), + namespace: stringField(record, "namespace"), + pvcPhase: typeof record.pvcPhase === "string" ? record.pvcPhase : null, + storageSizeBytes: typeof record.storageSizeBytes === "number" ? record.storageSizeBytes : null, + storageFilesCount: typeof record.storageFilesCount === "number" ? record.storageFilesCount : null, + storageSha256: typeof record.storageSha256 === "string" ? record.storageSha256 : null, + storageUpdatedAt: typeof record.storageUpdatedAt === "string" ? record.storageUpdatedAt : new Date().toISOString(), + codexRolloutSubdir: typeof record.codexRolloutSubdir === "string" && record.codexRolloutSubdir.length > 0 ? record.codexRolloutSubdir : "sessions", + valuesPrinted: false, + }; + const refreshed = await refreshSessionPvcSummary({ store, sessionId: sessionStorageRefreshMatch[1] ?? "", summary, options: sessionPvcOptionsForRequest(sessionPvcDefaults, runnerJobDefaults) }); + return { action: "session-storage-refreshed", sessionId: refreshed.sessionId, summary: refreshed } as unknown as JsonValue; + } + if (method === "POST" && path === "/api/v1/sessions/storage/gc") { + const cycle = await runSessionStorageGc({ store, options: sessionPvcOptionsForRequest(sessionPvcDefaults, runnerJobDefaults) }); + return cycle as unknown as JsonValue; + } + if (method === "POST" && path === "/api/v1/sessions") { + const record = asRecord(body ?? {}, "sessionCreate"); + const sessionId = typeof record.sessionId === "string" && record.sessionId.trim().length > 0 ? record.sessionId.trim() : `sess_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`; + const tenantId = stringField(record, "tenantId"); + const projectId = stringField(record, "projectId"); + const backendProfileRaw = typeof record.backendProfile === "string" ? record.backendProfile : "codex"; + if (backendProfileRaw !== "codex" && backendProfileRaw !== "deepseek" && backendProfileRaw !== "minimax-m3") throw new AgentRunError("schema-invalid", `backendProfile ${backendProfileRaw} is not supported`, { httpStatus: 400 }); + const conversationId = typeof record.conversationId === "string" ? record.conversationId : null; + const codexRolloutSubdir = typeof record.codexRolloutSubdir === "string" && record.codexRolloutSubdir.length > 0 ? record.codexRolloutSubdir : "sessions"; + const expiresAt = typeof record.expiresAt === "string" ? record.expiresAt : new Date(Date.now() + 30 * 24 * 60 * 60 * 1000).toISOString(); + const existing = await store.getSession(sessionId); + if (existing) { + if (existing.storageKind === "evicted") throw new AgentRunError("session-store-evicted", `session ${sessionId} storage has been evicted; create a new sessionId`, { httpStatus: 409 }); + return { action: "session-exists", session: existing, pvcName: existing.storagePvcName ?? null, pvcPhase: existing.storagePvcPhase ?? null, codexRolloutSubdir: existing.codexRolloutSubdir ?? "sessions", valuesPrinted: false } as unknown as JsonValue; + } + const now = new Date().toISOString(); + const session = await store.upsertSession({ + sessionId, + tenantId, + projectId, + backendProfile: backendProfileRaw as never, + conversationId, + threadId: null, + metadata: typeof record.metadata === "object" && record.metadata !== null && !Array.isArray(record.metadata) ? record.metadata as Record : {}, + expiresAt, + codexRolloutSubdir, + }); + const pvc = await createSessionPvc({ store, sessionId, options: { ...sessionPvcOptionsForRequest(sessionPvcDefaults, runnerJobDefaults), defaultCodexRolloutSubdir: codexRolloutSubdir } }); + return { action: "session-created", session, pvc, valuesPrinted: false } as unknown as JsonValue; + } if (method === "POST" && path === "/api/v1/queue/tasks") return await store.createQueueTask(validateCreateQueueTask(body)) as unknown as JsonValue; if (method === "GET" && path === "/api/v1/queue/tasks") { const state = url.searchParams.get("state"); @@ -286,6 +365,12 @@ function numberField(record: JsonRecord, key: string, fallback: number): number return typeof value === "number" && Number.isFinite(value) ? value : fallback; } +function stringField(record: JsonRecord, key: string): string { + const value = record[key]; + if (typeof value !== "string" || value.trim().length === 0) throw new AgentRunError("schema-invalid", `${key} is required`, { httpStatus: 400 }); + return value.trim(); +} + function normalizeError(error: unknown): AgentRunError { if (error instanceof AgentRunError) return error; return new AgentRunError("infra-failed", error instanceof Error ? error.message : String(error), { httpStatus: 500 }); diff --git a/src/mgr/session-pvc.ts b/src/mgr/session-pvc.ts new file mode 100644 index 0000000..5d76d6d --- /dev/null +++ b/src/mgr/session-pvc.ts @@ -0,0 +1,272 @@ +import { spawn } from "node:child_process"; +import type { AgentRunStore } from "./store.js"; +import type { JsonRecord } from "../common/types.js"; +import { AgentRunError } from "../common/errors.js"; +import { redactJson, redactText } from "../common/redaction.js"; + +export interface SessionPvcSpec { + pvcName: string; + namespace: string; + storageClassName: string; + size: string; +} + +export interface SessionPvcSummary extends JsonRecord { + pvcName: string; + namespace: string; + pvcPhase: string | null; + storageSizeBytes: number | null; + storageFilesCount: number | null; + storageSha256: string | null; + storageUpdatedAt: string | null; + codexRolloutSubdir: string; + valuesPrinted: false; +} + +export type KubectlHandler = (input: { args: string[]; stdin?: string }) => { stdout: string; stderr: string; exitCode: number }; + +export interface SessionPvcOptions { + storageClassName?: string; + size?: string; + kubectlCommand?: string; + kubectlHandler?: KubectlHandler; + defaultCodexRolloutSubdir?: string; +} + +const defaultStorageClassName = "local-path"; +const defaultPvcSize = "1Gi"; +const defaultSubdir = "sessions"; + +export function sessionPvcNameFor(sessionId: string): string { + return `agentrun-v01-session-${sessionId}`; +} + +export function buildSessionPvcSpec(input: { sessionId: string; namespace?: string; options: SessionPvcOptions }): SessionPvcSpec { + const namespace = input.namespace ?? "agentrun-v01"; + return { + pvcName: sessionPvcNameFor(input.sessionId), + namespace, + storageClassName: input.options.storageClassName ?? process.env.AGENTRUN_SESSION_STORAGE_CLASS ?? defaultStorageClassName, + size: input.options.size ?? process.env.AGENTRUN_SESSION_STORAGE_SIZE ?? defaultPvcSize, + }; +} + +function resolveHandler(options: SessionPvcOptions): KubectlHandler { + if (options.kubectlHandler) return options.kubectlHandler; + const command = options.kubectlCommand ?? process.env.AGENTRUN_KUBECTL ?? "kubectl"; + return ({ args, stdin }) => spawnKubectl(command, args, stdin); +} + +function spawnKubectl(command: string, args: string[], stdinPayload?: string): { stdout: string; stderr: string; exitCode: number } { + const child = spawn(command, args, { stdio: ["pipe", "pipe", "pipe"] }); + let stdout = ""; + let stderr = ""; + child.stdout.setEncoding("utf8"); + child.stderr.setEncoding("utf8"); + child.stdout.on("data", (chunk) => { stdout += String(chunk); }); + child.stderr.on("data", (chunk) => { stderr += String(chunk); }); + if (stdinPayload !== undefined) child.stdin.end(`${stdinPayload}\n`); + else child.stdin.end(); + return new Promise<{ stdout: string; stderr: string; exitCode: number }>((resolve) => { + child.on("error", () => resolve({ stdout, stderr, exitCode: 1 })); + child.on("close", (code) => resolve({ stdout, stderr, exitCode: code ?? 1 })); + }) as unknown as { stdout: string; stderr: string; exitCode: number }; +} + +export async function createSessionPvc(input: { store: AgentRunStore; sessionId: string; namespace?: string; options: SessionPvcOptions }): Promise { + const session = await input.store.getSession(input.sessionId); + if (!session) throw new AgentRunError("schema-invalid", `session ${input.sessionId} was not found`, { httpStatus: 404 }); + const spec = buildSessionPvcSpec(input); + const manifest: JsonRecord = { + apiVersion: "v1", + kind: "PersistentVolumeClaim", + metadata: { name: spec.pvcName, namespace: spec.namespace, labels: { "agentrun.pikastech.local/session-id": input.sessionId, "agentrun.pikastech.local/lane": "v0.1" } }, + spec: { accessModes: ["ReadWriteOnce"], storageClassName: spec.storageClassName, resources: { requests: { storage: spec.size } } }, + }; + const handler = resolveHandler(input.options); + const result = await handler({ args: ["create", "-f", "-", "-o", "json"], stdin: JSON.stringify(manifest) }); + if (result.exitCode !== 0) { + throw new AgentRunError("infra-failed", `kubectl create session PVC failed with code ${result.exitCode}`, { httpStatus: 502, details: redactJson({ stderr: redactText(result.stderr.slice(-4000)) }) }); + } + let phase = "Pending"; + try { + const parsed = JSON.parse(result.stdout) as JsonRecord; + phase = (objectPath(parsed, ["status", "phase"]) as string | null) ?? "Pending"; + } catch { + phase = "Pending"; + } + const summary: SessionPvcSummary = { + pvcName: spec.pvcName, + namespace: spec.namespace, + pvcPhase: phase, + storageSizeBytes: null, + storageFilesCount: null, + storageSha256: null, + storageUpdatedAt: null, + codexRolloutSubdir: input.options.defaultCodexRolloutSubdir ?? defaultSubdir, + valuesPrinted: false, + }; + await input.store.refreshSessionStorage({ + sessionId: input.sessionId, + storageKind: "pvc", + pvcName: spec.pvcName, + storageNamespace: spec.namespace, + pvcPhase: phase, + codexRolloutSubdir: summary.codexRolloutSubdir, + }); + return summary; +} + +export async function getSessionPvcSummary(input: { store: AgentRunStore; sessionId: string; namespace?: string; options: SessionPvcOptions }): Promise { + const session = await input.store.getSession(input.sessionId); + if (!session) throw new AgentRunError("schema-invalid", `session ${input.sessionId} was not found`, { httpStatus: 404 }); + const spec = buildSessionPvcSpec(input); + const codexRolloutSubdir = session.codexRolloutSubdir ?? defaultSubdir; + const handler = resolveHandler(input.options); + const result = await handler({ args: ["get", "pvc", spec.pvcName, "-n", spec.namespace, "-o", "json"] }); + if (result.exitCode !== 0) { + const notFound = `${result.stdout}\n${result.stderr}`.toLowerCase().includes("notfound") || `${result.stdout}\n${result.stderr}`.toLowerCase().includes("not found"); + if (notFound) { + return { + pvcName: spec.pvcName, + namespace: spec.namespace, + pvcPhase: "NotFound", + storageSizeBytes: null, + storageFilesCount: null, + storageSha256: null, + storageUpdatedAt: new Date().toISOString(), + codexRolloutSubdir, + valuesPrinted: false, + }; + } + throw new AgentRunError("infra-failed", `kubectl get pvc failed with code ${result.exitCode}`, { httpStatus: 502, details: redactJson({ stderr: redactText(result.stderr.slice(-4000)) }) }); + } + let pvc: JsonRecord = {}; + try { pvc = JSON.parse(result.stdout) as JsonRecord; } catch { pvc = {}; } + const phase = (objectPath(pvc, ["status", "phase"]) as string | null) ?? "Unknown"; + const capacity = objectPath(pvc, ["status", "capacity", "storage"]) ?? null; + return { + pvcName: spec.pvcName, + namespace: spec.namespace, + pvcPhase: phase, + storageSizeBytes: capacity ? parseStorageSize(capacity) : null, + storageFilesCount: null, + storageSha256: null, + storageUpdatedAt: new Date().toISOString(), + codexRolloutSubdir, + valuesPrinted: false, + }; +} + +export async function deleteSessionPvc(input: { store: AgentRunStore; sessionId: string; namespace?: string; options: SessionPvcOptions }): Promise<{ pvcName: string; namespace: string; storageKind: string }> { + const session = await input.store.getSession(input.sessionId); + if (!session) throw new AgentRunError("schema-invalid", `session ${input.sessionId} was not found`, { httpStatus: 404 }); + const spec = buildSessionPvcSpec(input); + const handler = resolveHandler(input.options); + await handler({ args: ["delete", "pvc", spec.pvcName, "-n", spec.namespace, "--ignore-not-found"] }); + await input.store.markSessionStorageEvicted({ sessionId: input.sessionId, pvcName: spec.pvcName }); + return { pvcName: spec.pvcName, namespace: spec.namespace, storageKind: "evicted" }; +} + +export async function refreshSessionPvcSummary(input: { store: AgentRunStore; sessionId: string; summary: SessionPvcSummary; options: SessionPvcOptions }): Promise { + const session = await input.store.getSession(input.sessionId); + if (!session) throw new AgentRunError("schema-invalid", `session ${input.sessionId} was not found`, { httpStatus: 404 }); + await input.store.refreshSessionStorage({ + sessionId: input.sessionId, + storageKind: "pvc", + pvcName: input.summary.pvcName, + storageNamespace: input.summary.namespace, + pvcPhase: input.summary.pvcPhase, + storageSizeBytes: input.summary.storageSizeBytes, + storageFilesCount: input.summary.storageFilesCount, + storageSha256: input.summary.storageSha256, + codexRolloutSubdir: input.summary.codexRolloutSubdir, + }); + return input.summary; +} + +export async function ensureSessionPvc(input: { store: AgentRunStore; sessionId: string; namespace?: string; options: SessionPvcOptions }): Promise { + const summary = await getSessionPvcSummary(input); + if (summary.pvcPhase === "Bound" || summary.pvcPhase === "Pending") return summary; + if (summary.pvcPhase === "NotFound" || summary.pvcPhase === "Unknown") { + return await createSessionPvc(input); + } + return summary; +} + +export function shouldCreateSessionPvcForRun(session: { storageKind: string | null; storagePvcName: string | null } | null): boolean { + if (!session) return false; + if (session.storageKind === "pvc" && session.storagePvcName) return false; + if (session.storageKind === "evicted") return false; + return true; +} + +export function isSessionStorageEvicted(session: { storageKind: string | null } | null): boolean { + return session?.storageKind === "evicted"; +} + +export async function runSessionStorageGc(input: { store: AgentRunStore; options: SessionPvcOptions; now?: number; maxSessions?: number }): Promise<{ scanned: number; deleted: number; skipped: number; deletedPvcNames: string[] }> { + const now = input.now ?? Date.now(); + const max = input.maxSessions ?? 200; + const sessions = await input.store.listGcExpiredSessions({ now, limit: max }); + const handler = resolveHandler(input.options); + let deleted = 0; + let skipped = 0; + const deletedPvcNames: string[] = []; + for (const session of sessions) { + if (session.activeRunId || session.activeCommandId) { skipped++; continue; } + if (session.storageKind !== "pvc" || !session.storagePvcName) { skipped++; continue; } + try { + await handler({ args: ["delete", "pvc", session.storagePvcName, "-n", session.storageNamespace ?? "agentrun-v01", "--ignore-not-found"] }); + await input.store.markSessionStorageEvicted({ sessionId: session.sessionId, pvcName: session.storagePvcName }); + deleted++; + deletedPvcNames.push(session.storagePvcName); + } catch { + skipped++; + } + } + return { scanned: sessions.length, deleted, skipped, deletedPvcNames }; +} + +export function startSessionStorageGcLoop(input: { store: AgentRunStore; options: SessionPvcOptions; intervalMs?: number; onCycle?: (cycle: { scanned: number; deleted: number; skipped: number }) => void }): { stop(): void } { + const intervalMs = input.intervalMs ?? Number(process.env.AGENTRUN_SESSION_GC_INTERVAL_MS ?? 300_000); + let stopped = false; + let timer: NodeJS.Timeout | null = null; + const tick = async (): Promise => { + if (stopped) return; + try { + const cycle = await runSessionStorageGc({ store: input.store, options: input.options }); + input.onCycle?.(cycle); + } catch { + // swallow per cycle, retry on next tick + } finally { + if (!stopped) timer = setTimeout(() => { void tick(); }, intervalMs); + } + }; + timer = setTimeout(() => { void tick(); }, intervalMs); + return { + stop(): void { + stopped = true; + if (timer) clearTimeout(timer); + }, + }; +} + +function objectPath(value: unknown, path: string[]): string | null { + let current: unknown = value; + for (const key of path) { + if (typeof current !== "object" || current === null || Array.isArray(current)) return null; + current = (current as Record)[key]; + } + return typeof current === "string" || typeof current === "number" ? String(current) : null; +} + +function parseStorageSize(value: string): number | null { + const match = value.trim().match(/^(\d+(?:\.\d+)?)(Ki|Mi|Gi|Ti|Pi|K|M|G|T|P)?$/u); + if (!match) return null; + const num = Number(match[1]); + const unit = match[2] ?? ""; + const multipliers: Record = { "": 1, "K": 1_000, "Ki": 1024, "M": 1_000_000, "Mi": 1024 * 1024, "G": 1_000_000_000, "Gi": 1024 * 1024 * 1024, "T": 1_000_000_000_000, "Ti": 1024 * 1024 * 1024 * 1024, "P": 1_000_000_000_000_000, "Pi": 1024 * 1024 * 1024 * 1024 * 1024 }; + const factor = multipliers[unit] ?? 1; + return Math.round(num * factor); +} diff --git a/src/mgr/store.ts b/src/mgr/store.ts index 8fb29df..754f106 100644 --- a/src/mgr/store.ts +++ b/src/mgr/store.ts @@ -1,4 +1,4 @@ -import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, QueueAttemptRef, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionEventPage, SessionListResult, SessionListState, SessionReadCursorRecord, SessionRecord, SessionRef, SessionSummary, TerminalStatus } from "../common/types.js"; +import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, ListGcExpiredSessionsInput, QueueAttemptRef, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionEventPage, SessionListResult, SessionListState, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js"; import { AgentRunError } from "../common/errors.js"; import { newId, nowIso, stableHash } from "../common/validation.js"; import { redactJson } from "../common/redaction.js"; @@ -44,6 +44,10 @@ export interface AgentRunStore { listSessionTrace(sessionId: string, input: SessionEventPageInput): MaybePromise; listSessionOutput(sessionId: string, input: SessionEventPageInput): MaybePromise; markSessionRead(sessionId: string, readerId: string): MaybePromise; + upsertSession(input: UpsertSessionInput): MaybePromise; + refreshSessionStorage(input: SessionStoragePatch): MaybePromise; + markSessionStorageEvicted(input: { sessionId: string; pvcName: string }): MaybePromise; + listGcExpiredSessions(input: ListGcExpiredSessionsInput): MaybePromise; createQueueTask(input: CreateQueueTaskInput): MaybePromise; listQueueTasks(input: ListQueueTasksInput): MaybePromise; getQueueTask(taskId: string): MaybePromise; @@ -344,6 +348,108 @@ export class MemoryAgentRunStore implements AgentRunStore { return record; } + upsertSession(input: UpsertSessionInput): SessionRecord { + const at = nowIso(); + const existing = this.sessions.get(input.sessionId); + if (existing) { + const next: SessionRecord = { + ...existing, + tenantId: input.tenantId, + projectId: input.projectId, + backendProfile: input.backendProfile, + conversationId: input.conversationId, + threadId: input.threadId, + metadata: input.metadata, + expiresAt: input.expiresAt, + codexRolloutSubdir: input.codexRolloutSubdir, + version: existing.version + 1, + updatedAt: at, + }; + this.sessions.set(input.sessionId, next); + this.sessionVersion = Math.max(this.sessionVersion, next.version) + 1; + return next; + } + const next: SessionRecord = { + sessionId: input.sessionId, + tenantId: input.tenantId, + projectId: input.projectId, + backendProfile: input.backendProfile, + conversationId: input.conversationId, + threadId: input.threadId, + metadata: input.metadata, + version: 1, + executionState: "idle", + lastRunId: null, + lastCommandId: null, + activeRunId: null, + activeCommandId: null, + lastEventSeq: 0, + terminalStatus: null, + failureKind: null, + title: null, + summary: {}, + lastActivityAt: null, + createdAt: at, + updatedAt: at, + expiresAt: input.expiresAt, + storageKind: "none", + codexRolloutSubdir: input.codexRolloutSubdir, + }; + this.sessions.set(input.sessionId, next); + this.sessionVersion = Math.max(this.sessionVersion, next.version) + 1; + return next; + } + + refreshSessionStorage(input: SessionStoragePatch): SessionRecord { + const session = this.sessions.get(input.sessionId); + if (!session) throw new AgentRunError("schema-invalid", `session ${input.sessionId} was not found`, { httpStatus: 404 }); + const at = nowIso(); + const next: SessionRecord = { + ...session, + storageKind: input.storageKind, + storagePvcName: input.pvcName ?? null, + storageNamespace: input.storageNamespace ?? null, + storagePvcPhase: input.pvcPhase ?? null, + storageSizeBytes: input.storageSizeBytes ?? null, + storageFilesCount: input.storageFilesCount ?? null, + storageSha256: input.storageSha256 ?? null, + codexRolloutSubdir: input.codexRolloutSubdir ?? session.codexRolloutSubdir ?? "sessions", + storageUpdatedAt: at, + version: session.version + 1, + updatedAt: at, + }; + this.sessions.set(input.sessionId, next); + this.sessionVersion = Math.max(this.sessionVersion, next.version) + 1; + return next; + } + + markSessionStorageEvicted(input: { sessionId: string; pvcName: string }): SessionRecord { + const session = this.sessions.get(input.sessionId); + if (!session) throw new AgentRunError("schema-invalid", `session ${input.sessionId} was not found`, { httpStatus: 404 }); + const at = nowIso(); + const next: SessionRecord = { + ...session, + storageKind: "evicted", + storagePvcName: input.pvcName, + storageEvictedAt: at, + storageUpdatedAt: at, + version: session.version + 1, + updatedAt: at, + }; + this.sessions.set(input.sessionId, next); + this.sessionVersion = Math.max(this.sessionVersion, next.version) + 1; + return next; + } + + listGcExpiredSessions(input: ListGcExpiredSessionsInput): SessionRecord[] { + const now = input.now; + return Array.from(this.sessions.values()) + .filter((session) => session.storageKind === "pvc" && Boolean(session.storagePvcName)) + .filter((session) => session.expiresAt !== null && Date.parse(session.expiresAt ?? "") <= now) + .sort((a, b) => a.updatedAt.localeCompare(b.updatedAt)) + .slice(0, Math.max(1, input.limit)); + } + createQueueTask(input: CreateQueueTaskInput): QueueTaskRecord { const payloadHash = stableHash(input.payload); if (input.idempotencyKey) { diff --git a/src/selftest/cases/10-mgr-session-pvc.ts b/src/selftest/cases/10-mgr-session-pvc.ts new file mode 100644 index 0000000..7c6035c --- /dev/null +++ b/src/selftest/cases/10-mgr-session-pvc.ts @@ -0,0 +1,88 @@ +import assert from "node:assert/strict"; +import { startManagerServer } from "../../mgr/server.js"; +import { MemoryAgentRunStore } from "../../mgr/store.js"; +import { ManagerClient } from "../../mgr/client.js"; +import { createSessionPvc, deleteSessionPvc, getSessionPvcSummary, refreshSessionPvcSummary, runSessionStorageGc, sessionPvcNameFor } from "../../mgr/session-pvc.js"; +import type { KubectlHandler, SessionPvcOptions } from "../../mgr/session-pvc.js"; +import type { SelfTestCase } from "../harness.js"; + +function makeFakeKubectl(): KubectlHandler { + return ({ args, stdin }) => { + if (args[0] === "create") { + const manifest = stdin ? JSON.parse(stdin) : {}; + return { stdout: JSON.stringify({ apiVersion: "v1", kind: "PersistentVolumeClaim", metadata: { name: manifest.metadata?.name, namespace: manifest.metadata?.namespace }, status: { phase: "Bound" } }), stderr: "", exitCode: 0 }; + } + if (args[0] === "get") { + return { stdout: JSON.stringify({ apiVersion: "v1", kind: "PersistentVolumeClaim", metadata: { name: args[2] }, status: { phase: "Bound", capacity: { storage: "1Gi" } } }), stderr: "", exitCode: 0 }; + } + if (args[0] === "delete") { + return { stdout: "", stderr: "", exitCode: 0 }; + } + return { stdout: "{}", stderr: "", exitCode: 0 }; + }; +} + +const selfTest: SelfTestCase = async () => { + const fakeKubectl = makeFakeKubectl(); + const opts: SessionPvcOptions = { kubectlHandler: fakeKubectl, defaultCodexRolloutSubdir: "sessions" }; + const store = new MemoryAgentRunStore(); + const sessionId = "sess_test_pvc_001"; + store.upsertSession({ sessionId, tenantId: "hwlab", projectId: "pikasTech/HWLAB", backendProfile: "codex", conversationId: null, threadId: null, metadata: {}, expiresAt: new Date(Date.now() + 86_400_000).toISOString(), codexRolloutSubdir: "sessions" }); + const summary = await createSessionPvc({ store, sessionId, options: opts }); + assert.equal(summary.pvcName, sessionPvcNameFor(sessionId)); + assert.equal(summary.pvcPhase, "Bound"); + assert.equal(summary.codexRolloutSubdir, "sessions"); + const after = await store.getSession(sessionId); + assert.equal(after?.storageKind, "pvc"); + assert.equal(after?.storagePvcName, summary.pvcName); + + const readSummary = await getSessionPvcSummary({ store, sessionId, options: opts }); + assert.equal(readSummary.pvcName, summary.pvcName); + assert.equal(readSummary.pvcPhase, "Bound"); + assert.equal(readSummary.storageSizeBytes, 1024 * 1024 * 1024); + + await refreshSessionPvcSummary({ store, sessionId, summary: { ...summary, storageFilesCount: 7, storageSha256: "abc123" }, options: opts }); + const refreshed = await store.getSession(sessionId); + assert.equal(refreshed?.storageFilesCount, 7); + assert.equal(refreshed?.storageSha256, "abc123"); + + const evicted = await deleteSessionPvc({ store, sessionId, options: opts }); + assert.equal(evicted.storageKind, "evicted"); + const evictedSession = await store.getSession(sessionId); + assert.equal(evictedSession?.storageKind, "evicted"); + assert.ok(evictedSession?.storageEvictedAt); + + const now = Date.now(); + const expiredId = "sess_expired_001"; + const activeId = "sess_active_001"; + store.upsertSession({ sessionId: expiredId, tenantId: "hwlab", projectId: "pikasTech/HWLAB", backendProfile: "codex", conversationId: null, threadId: null, metadata: {}, expiresAt: new Date(now - 60_000).toISOString(), codexRolloutSubdir: "sessions" }); + await store.refreshSessionStorage({ sessionId: expiredId, storageKind: "pvc", pvcName: sessionPvcNameFor(expiredId), codexRolloutSubdir: "sessions" }); + store.upsertSession({ sessionId: activeId, tenantId: "hwlab", projectId: "pikasTech/HWLAB", backendProfile: "codex", conversationId: null, threadId: null, metadata: {}, expiresAt: new Date(now - 60_000).toISOString(), codexRolloutSubdir: "sessions" }); + await store.refreshSessionStorage({ sessionId: activeId, storageKind: "pvc", pvcName: sessionPvcNameFor(activeId), codexRolloutSubdir: "sessions" }); + const activeRun = store.createRun({ tenantId: "hwlab", projectId: "pikasTech/HWLAB", workspaceRef: { kind: "opaque", path: "." }, providerId: "G14", backendProfile: "codex", executionPolicy: { sandbox: "worktree", approval: "never", timeoutMs: 600_000, network: "restricted", secretScope: {} }, traceSink: null, sessionRef: { sessionId: activeId }, resourceBundleRef: null }); + store.createCommand(activeRun.id, { type: "turn", payload: { prompt: "test" } }); + const cycle = await runSessionStorageGc({ store, options: opts, now, maxSessions: 10 }); + assert.ok(cycle.scanned >= 1); + assert.ok(cycle.deleted >= 1); + const expiredAfter = await store.getSession(expiredId); + assert.equal(expiredAfter?.storageKind, "evicted"); + const activeAfter = await store.getSession(activeId); + assert.equal(activeAfter?.storageKind, "pvc"); + + const restStore = new MemoryAgentRunStore(); + const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test", store: restStore, sessionPvcOptions: { kubectlHandler: fakeKubectl } }); + try { + const client = new ManagerClient(server.baseUrl); + const create = await client.post("/api/v1/sessions", { sessionId: "sess_rest_create_001", tenantId: "hwlab", projectId: "pikasTech/HWLAB", backendProfile: "codex" }) as { action: string; pvc: { pvcName: string; pvcPhase: string } }; + assert.equal(create.action, "session-created"); + assert.ok(create.pvc.pvcName.startsWith("agentrun-v01-session-")); + const existing = await client.post("/api/v1/sessions", { sessionId: "sess_rest_create_001", tenantId: "hwlab", projectId: "pikasTech/HWLAB", backendProfile: "codex" }) as { action: string }; + assert.equal(existing.action, "session-exists"); + } finally { + if (server.server.listening) await new Promise((resolve) => server.server.close(() => resolve())); + } + + return { name: "mgr-session-pvc", tests: ["session-pvc-create-summary-eviction", "session-pvc-gc", "session-pvc-rest-create"] }; +}; + +export default selfTest;