Files
pikasTech-agentrun/src/mgr/postgres-store.ts
T
Codex e8cfa4c692 feat(v0.1): add mgr session PVC lifecycle for true session state persistence
PR B for #770: mgr/session-pvc.ts + server endpoints + selftest.

- 新模块 src/mgr/session-pvc.ts: createSessionPvc / getSessionPvcSummary / deleteSessionPvc / refreshSessionPvcSummary / runSessionStorageGc / startSessionStorageGcLoop
- Server 增量 5 个 endpoint:
  * POST /api/v1/sessions: 创建 session 同步创建 PVC
  * GET /api/v1/sessions/:id/storage: 查询 PVC 摘要
  * DELETE /api/v1/sessions/:id/storage: 删 PVC + storage_kind=evicted
  * POST /api/v1/sessions/:id/storage/refresh: runner 上报 PVC 摘要
  * POST /api/v1/sessions/storage/gc: 手动触发 GC
- mgr SA RBAC 已在 PR A 增加;manager server 不直连 Kubernetes API(kubectl 由 mgr 容器内执行)
- SessionRecord 增量 storageKind / storagePvcName / storageNamespace / storageSizeBytes / storageFilesCount / storageSha256 / storageUpdatedAt / storagePvcPhase / storageEvictedAt / codexRolloutSubdir
- kubernetes-runner-job 短路:run 引用 evicted session 时直接返回 session-store-evicted,不创建 runner Job
- KubectlHandler 可注入,selftest 覆盖 create / summary / refresh / eviction / gc / REST 路径
- GC loop 默认 5min(AGENTRUN_SESSION_GC_INTERVAL_MS 可调)

runner / backend / HWLAB adapter 在 PR C / PR D 落地。
2026-06-03 19:19:09 +08:00

1405 lines
74 KiB
TypeScript

import { createHash } from "node:crypto";
import { Pool } from "pg";
import type { PoolClient, QueryResultRow } from "pg";
import { AgentRunError } from "../common/errors.js";
import { redactJson } from "../common/redaction.js";
import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, ListGcExpiredSessionsInput, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionEventPage, SessionListResult, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js";
import { newId, nowIso, stableHash } from "../common/validation.js";
import type { AgentRunStore, ListQueueTasksInput, ListSessionsInput, SaveRunnerJobInput, SessionEventPageInput, StoreHealth, UpdateQueueTaskAttemptInput } from "./store.js";
import { assertSessionBoundary, buildQueueStats, buildSessionSummary, clampQueueLimit, clampSessionLimit, commandStateFromTerminal, isLeaseExpired, isSessionOutputEvent, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, parseSessionCursor, queueTaskSort, sessionListFilters, sessionMatchesListState, sessionRefFromRecord, sessionSort, sessionTitleFromCommand, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef, titleFromMetadata } from "./store.js";
import { backendCapabilitiesSqlValues, mergeBackendCapability } from "../common/backend-profiles.js";
import { normalizeRunEventPayload, requireEventType } from "../common/events.js";
/** Options accepted by the Postgres-backed store. */
interface PostgresStoreOptions {
// Connection string handed to the pg Pool constructor.
connectionString: string;
}
/** One entry of the ordered schema migration list. */
interface MigrationDefinition {
// Primary key recorded in agentrun_schema_migrations once the migration has been applied.
id: string;
// Checksum of `sql`; migrate() rejects an already-applied migration whose stored checksum differs.
checksum: string;
// SQL text executed when the migration has not been applied yet.
sql: string;
}
/**
 * Migration 001: creates the runs, commands, events, runners, backends and
 * leases tables with their indexes, and upserts the 'codex' backend row.
 *
 * This text is checksummed (see postgresMigrations) and migrate() refuses to
 * start when the checksum stored for an applied migration differs, so treat
 * the string as frozen. NOTE(review): checksumSql is defined elsewhere;
 * presumably even whitespace-only edits change the checksum - confirm.
 */
const initialMigrationSql = `
CREATE TABLE IF NOT EXISTS agentrun_runs (
id text PRIMARY KEY,
tenant_id text NOT NULL,
project_id text NOT NULL,
workspace_ref jsonb NOT NULL,
provider_id text NOT NULL,
backend_profile text NOT NULL,
execution_policy jsonb NOT NULL,
trace_sink jsonb,
status text NOT NULL,
terminal_status text,
failure_kind text,
failure_message text,
created_at timestamptz NOT NULL,
updated_at timestamptz NOT NULL,
claimed_by text,
lease_expires_at timestamptz
);
CREATE TABLE IF NOT EXISTS agentrun_commands (
id text PRIMARY KEY,
run_id text NOT NULL REFERENCES agentrun_runs(id) ON DELETE CASCADE,
seq integer NOT NULL,
type text NOT NULL,
payload jsonb NOT NULL,
payload_hash text NOT NULL,
idempotency_key text,
state text NOT NULL,
created_at timestamptz NOT NULL,
updated_at timestamptz NOT NULL,
acknowledged_at timestamptz,
UNIQUE (run_id, seq)
);
CREATE UNIQUE INDEX IF NOT EXISTS agentrun_commands_run_idempotency_key_idx
ON agentrun_commands (run_id, idempotency_key)
WHERE idempotency_key IS NOT NULL;
CREATE TABLE IF NOT EXISTS agentrun_events (
id text PRIMARY KEY,
run_id text NOT NULL REFERENCES agentrun_runs(id) ON DELETE CASCADE,
seq integer NOT NULL,
type text NOT NULL,
payload jsonb NOT NULL,
created_at timestamptz NOT NULL,
UNIQUE (run_id, seq)
);
CREATE TABLE IF NOT EXISTS agentrun_runners (
id text PRIMARY KEY,
run_id text,
attempt_id text,
backend_profile text,
placement text,
source_commit text,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
registered_at timestamptz NOT NULL,
heartbeat_at timestamptz NOT NULL
);
CREATE TABLE IF NOT EXISTS agentrun_backends (
profile text PRIMARY KEY,
capabilities jsonb NOT NULL,
capacity jsonb NOT NULL,
health jsonb NOT NULL,
updated_at timestamptz NOT NULL
);
CREATE TABLE IF NOT EXISTS agentrun_leases (
run_id text PRIMARY KEY REFERENCES agentrun_runs(id) ON DELETE CASCADE,
runner_id text NOT NULL,
lease_expires_at timestamptz NOT NULL,
stale_recovery_marker jsonb,
updated_at timestamptz NOT NULL
);
CREATE INDEX IF NOT EXISTS agentrun_runs_status_idx ON agentrun_runs (status, updated_at);
CREATE INDEX IF NOT EXISTS agentrun_events_run_seq_idx ON agentrun_events (run_id, seq);
CREATE INDEX IF NOT EXISTS agentrun_commands_run_seq_idx ON agentrun_commands (run_id, seq);
CREATE INDEX IF NOT EXISTS agentrun_leases_expiry_idx ON agentrun_leases (lease_expires_at);
INSERT INTO agentrun_backends (profile, capabilities, capacity, health, updated_at)
VALUES (
'codex',
'{"protocol":"codex-app-server-jsonrpc-stdio","transport":"stdio","command":"codex app-server --listen stdio://"}'::jsonb,
'{"mode":"manual-runner-v0.1"}'::jsonb,
'{"status":"registered"}'::jsonb,
now()
)
ON CONFLICT (profile) DO UPDATE SET
capabilities = EXCLUDED.capabilities,
capacity = EXCLUDED.capacity,
health = EXCLUDED.health,
updated_at = EXCLUDED.updated_at;
`;
/**
 * Migration 002: upserts the backend rows for the "codex" and "deepseek"
 * profiles. The VALUES list is generated at module load by
 * backendCapabilitiesSqlValues, so the migration checksum also depends on
 * that helper's output.
 */
const backendProfilesMigrationSql = `
INSERT INTO agentrun_backends (profile, capabilities, capacity, health, updated_at)
VALUES ${backendCapabilitiesSqlValues(["codex", "deepseek"])}
ON CONFLICT (profile) DO UPDATE SET
capabilities = EXCLUDED.capabilities,
capacity = EXCLUDED.capacity,
health = EXCLUDED.health,
updated_at = EXCLUDED.updated_at;
`;
/**
 * Migration 005: upserts the backend row for the "minimax-m3" profile. The
 * VALUES list is generated at module load by backendCapabilitiesSqlValues,
 * so the migration checksum also depends on that helper's output.
 */
const minimaxM3BackendProfileMigrationSql = `
INSERT INTO agentrun_backends (profile, capabilities, capacity, health, updated_at)
VALUES ${backendCapabilitiesSqlValues(["minimax-m3"])}
ON CONFLICT (profile) DO UPDATE SET
capabilities = EXCLUDED.capabilities,
capacity = EXCLUDED.capacity,
health = EXCLUDED.health,
updated_at = EXCLUDED.updated_at;
`;
/**
 * Migration 006: adds the session control columns (version, execution state,
 * last/active run and command ids, title, summary, activity timestamp) to
 * agentrun_sessions, creates the session version sequence seeded from the
 * current maximum version, and creates the per-reader session read cursor
 * table plus supporting indexes.
 *
 * Although declared before hwlabManualDispatchMigrationSql in this file, it
 * is applied after it (006 vs 003 in postgresMigrations); the ALTER
 * statements rely on agentrun_sessions and agentrun_runs.session_ref, which
 * migration 003 creates.
 */
const sessionControlMigrationSql = `
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS version bigint NOT NULL DEFAULT 1;
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS execution_state text NOT NULL DEFAULT 'idle';
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS last_run_id text;
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS last_command_id text;
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS active_run_id text;
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS active_command_id text;
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS last_event_seq integer NOT NULL DEFAULT 0;
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS terminal_status text;
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS failure_kind text;
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS title text;
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS summary jsonb NOT NULL DEFAULT '{}'::jsonb;
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS last_activity_at timestamptz;
CREATE SEQUENCE IF NOT EXISTS agentrun_session_version_seq;
SELECT setval(
'agentrun_session_version_seq',
GREATEST(
COALESCE((SELECT MAX(version) FROM agentrun_sessions), 0),
1
),
true
);
CREATE TABLE IF NOT EXISTS agentrun_session_read_cursors (
session_id text NOT NULL REFERENCES agentrun_sessions(session_id) ON DELETE CASCADE,
reader_id text NOT NULL,
session_version bigint NOT NULL,
read_at timestamptz NOT NULL,
PRIMARY KEY (session_id, reader_id)
);
CREATE INDEX IF NOT EXISTS agentrun_sessions_control_idx ON agentrun_sessions (execution_state, backend_profile, updated_at DESC);
CREATE INDEX IF NOT EXISTS agentrun_sessions_version_idx ON agentrun_sessions (version, updated_at DESC);
CREATE INDEX IF NOT EXISTS agentrun_runs_session_idx ON agentrun_runs ((session_ref->>'sessionId'), updated_at DESC);
`;
/**
 * Migration 007: adds the session storage columns to agentrun_sessions
 * (storage kind defaulting to 'none', PVC name/namespace/phase, size, file
 * count, sha256, update and eviction timestamps, and the codex rollout
 * subdirectory defaulting to 'sessions'), plus an index on the PVC name and
 * one on (storage_kind, expires_at).
 */
const sessionStateStorageMigrationSql = `
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS storage_kind text NOT NULL DEFAULT 'none';
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS storage_pvc_name text;
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS storage_namespace text;
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS storage_size_bytes bigint;
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS storage_files_count integer;
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS storage_sha256 text;
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS storage_updated_at timestamptz;
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS codex_rollout_subdir text NOT NULL DEFAULT 'sessions';
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS storage_pvc_phase text;
ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS storage_evicted_at timestamptz;
CREATE INDEX IF NOT EXISTS agentrun_sessions_storage_pvc_idx
ON agentrun_sessions (storage_pvc_name)
WHERE storage_pvc_name IS NOT NULL;
CREATE INDEX IF NOT EXISTS agentrun_sessions_storage_kind_idx
ON agentrun_sessions (storage_kind, expires_at);
`;
/**
 * Migration 003: adds session_ref and resource_bundle_ref to agentrun_runs,
 * and creates the agentrun_sessions and agentrun_runner_jobs tables with
 * their indexes (including a partial unique index on runner-job
 * (run_id, idempotency_key)).
 */
const hwlabManualDispatchMigrationSql = `
ALTER TABLE agentrun_runs ADD COLUMN IF NOT EXISTS session_ref jsonb;
ALTER TABLE agentrun_runs ADD COLUMN IF NOT EXISTS resource_bundle_ref jsonb;
CREATE TABLE IF NOT EXISTS agentrun_sessions (
session_id text PRIMARY KEY,
tenant_id text NOT NULL,
project_id text NOT NULL,
backend_profile text NOT NULL,
conversation_id text,
thread_id text,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
created_at timestamptz NOT NULL,
updated_at timestamptz NOT NULL,
expires_at timestamptz
);
CREATE TABLE IF NOT EXISTS agentrun_runner_jobs (
id text PRIMARY KEY,
run_id text NOT NULL REFERENCES agentrun_runs(id) ON DELETE CASCADE,
command_id text NOT NULL REFERENCES agentrun_commands(id) ON DELETE CASCADE,
idempotency_key text,
payload_hash text NOT NULL,
attempt_id text NOT NULL,
runner_id text NOT NULL,
namespace text NOT NULL,
job_name text NOT NULL,
manager_url text NOT NULL,
image text NOT NULL,
source_commit text NOT NULL,
service_account_name text,
result jsonb NOT NULL,
created_at timestamptz NOT NULL,
updated_at timestamptz NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS agentrun_runner_jobs_run_idempotency_key_idx
ON agentrun_runner_jobs (run_id, idempotency_key)
WHERE idempotency_key IS NOT NULL;
CREATE INDEX IF NOT EXISTS agentrun_runner_jobs_run_command_idx ON agentrun_runner_jobs (run_id, command_id, created_at);
CREATE INDEX IF NOT EXISTS agentrun_sessions_tenant_project_idx ON agentrun_sessions (tenant_id, project_id, backend_profile, updated_at);
`;
/**
 * Migration 004: creates the queue task table (with a partial unique index
 * on (tenant_id, project_id, idempotency_key)), the per-reader queue read
 * cursor table, list/updated indexes, and the queue version sequence.
 */
const queueQ1MigrationSql = `
CREATE TABLE IF NOT EXISTS agentrun_queue_tasks (
id text PRIMARY KEY,
tenant_id text NOT NULL,
project_id text NOT NULL,
queue text NOT NULL,
lane text NOT NULL,
title text NOT NULL,
priority integer NOT NULL,
state text NOT NULL,
version bigint NOT NULL,
backend_profile text NOT NULL,
provider_id text,
workspace_ref jsonb,
execution_policy jsonb,
resource_bundle_ref jsonb,
payload jsonb NOT NULL,
payload_hash text NOT NULL,
references_json jsonb NOT NULL DEFAULT '[]'::jsonb,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
idempotency_key text,
latest_attempt jsonb,
session_path text,
created_at timestamptz NOT NULL,
updated_at timestamptz NOT NULL,
cancelled_at timestamptz,
cancel_reason text
);
CREATE UNIQUE INDEX IF NOT EXISTS agentrun_queue_tasks_idempotency_idx
ON agentrun_queue_tasks (tenant_id, project_id, idempotency_key)
WHERE idempotency_key IS NOT NULL;
CREATE INDEX IF NOT EXISTS agentrun_queue_tasks_list_idx ON agentrun_queue_tasks (queue, state, version, priority, created_at);
CREATE INDEX IF NOT EXISTS agentrun_queue_tasks_updated_idx ON agentrun_queue_tasks (version, updated_at);
CREATE TABLE IF NOT EXISTS agentrun_queue_read_cursors (
task_id text NOT NULL REFERENCES agentrun_queue_tasks(id) ON DELETE CASCADE,
reader_id text NOT NULL,
task_version bigint NOT NULL,
read_at timestamptz NOT NULL,
PRIMARY KEY (task_id, reader_id)
);
CREATE SEQUENCE IF NOT EXISTS agentrun_queue_version_seq;
`;
/**
 * Ordered (id, sql) pairs for every schema migration. Array order is the
 * order in which migrate() applies them.
 */
const migrationSources: ReadonlyArray<readonly [string, string]> = [
  ["001_v01_initial_durable_store", initialMigrationSql],
  ["002_v01_backend_profiles", backendProfilesMigrationSql],
  ["003_v01_hwlab_manual_dispatch", hwlabManualDispatchMigrationSql],
  ["004_v01_queue_q1", queueQ1MigrationSql],
  ["005_v01_minimax_m3_backend_profile", minimaxM3BackendProfileMigrationSql],
  ["006_v01_session_control", sessionControlMigrationSql],
  ["007_v01_session_state_storage", sessionStateStorageMigrationSql],
];

/** Migration definitions with the checksum of each SQL text computed at module load. */
const postgresMigrations: MigrationDefinition[] = migrationSources.map(([id, sql]) => ({
  id,
  checksum: checksumSql(sql),
  sql,
}));
/**
 * Describes the schema this store expects: the ordered migration ids, the
 * latest migration id, the tables that must exist, and the checksum recorded
 * for each migration id.
 *
 * @returns A JSON-serializable summary of the migration contract.
 */
export function postgresMigrationContract(): JsonRecord {
  const migrationIds: string[] = [];
  const checksums: Record<string, string> = {};
  for (const { id, checksum } of postgresMigrations) {
    migrationIds.push(id);
    checksums[id] = checksum;
  }
  const requiredTables = [
    "agentrun_schema_migrations",
    "agentrun_runs",
    "agentrun_commands",
    "agentrun_events",
    "agentrun_runners",
    "agentrun_backends",
    "agentrun_leases",
    "agentrun_sessions",
    "agentrun_session_read_cursors",
    "agentrun_runner_jobs",
    "agentrun_queue_tasks",
    "agentrun_queue_read_cursors",
  ];
  return {
    migrationIds,
    latestMigrationId: latestMigrationId(),
    requiredTables,
    checksums,
  };
}
/**
 * Constructs a Postgres-backed store and runs its schema migrations before
 * handing it to the caller.
 *
 * @param options - Connection settings for the store.
 * @returns The store, after migrate() has resolved.
 */
export async function createPostgresAgentRunStore(options: PostgresStoreOptions): Promise<PostgresAgentRunStore> {
  const migratedStore = new PostgresAgentRunStore(options);
  await migratedStore.migrate();
  return migratedStore;
}
export class PostgresAgentRunStore implements AgentRunStore {
/** Connection pool used by the store's queries. */
private readonly pool: Pool;
/** Set to true once migrate() has completed on this instance. */
private migrationReady = false;
/** Latest migration id recorded by migrate(); null until migrate() has run. */
private appliedMigrationId: string | null = null;

/**
 * Builds the store around a new pg Pool. The constructor itself issues no
 * query; schema setup happens in migrate().
 *
 * @param options - Supplies the connection string for the pool.
 */
constructor(options: PostgresStoreOptions) {
  const { connectionString } = options;
  this.pool = new Pool({ connectionString, application_name: "agentrun-mgr-v01" });
}
/**
 * Applies every not-yet-applied entry of `postgresMigrations`, in array
 * order, within a single withTransaction call, then marks this instance as
 * migration-ready.
 *
 * A transaction-scoped advisory lock is taken first so that concurrent
 * callers (for example several manager replicas starting together) run the
 * migration section one at a time. `SELECT ... FOR UPDATE` alone locks
 * nothing while the migration row does not exist yet, so without the
 * advisory lock two callers could both execute the same migration SQL and
 * one of them would then fail on the primary-key insert.
 *
 * @throws AgentRunError ("infra-failed", HTTP 503) when a migration that is
 *   already recorded has a stored checksum different from the one in code.
 */
async migrate(): Promise<void> {
  // Application-chosen advisory lock key. It must stay stable across releases
  // and must not collide with other advisory-lock users of the same database.
  const migrationAdvisoryLockKey = 770001;
  await this.withTransaction(async (client) => {
    // Held until the surrounding transaction commits or rolls back.
    await client.query("SELECT pg_advisory_xact_lock($1::bigint)", [migrationAdvisoryLockKey]);
    await client.query(`
CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
id text PRIMARY KEY,
checksum text NOT NULL,
applied_at timestamptz NOT NULL DEFAULT now()
)
`);
    for (const migration of postgresMigrations) {
      const existing = await client.query<{ checksum: string }>("SELECT checksum FROM agentrun_schema_migrations WHERE id = $1 FOR UPDATE", [migration.id]);
      // An applied migration whose SQL changed afterwards is treated as an infrastructure failure.
      if (existing.rowCount && existing.rows[0]?.checksum !== migration.checksum) {
        throw new AgentRunError("infra-failed", `schema migration checksum mismatch for ${migration.id}`, { httpStatus: 503, details: { migrationId: migration.id } });
      }
      if (!existing.rowCount) {
        await client.query(migration.sql);
        await client.query("INSERT INTO agentrun_schema_migrations (id, checksum) VALUES ($1, $2)", [migration.id, migration.checksum]);
      }
    }
  });
  this.migrationReady = true;
  this.appliedMigrationId = latestMigrationId();
}
/**
 * Probes the database and reports whether the schema is at the latest
 * migration. Never throws: any error from the probe queries is reported as
 * an unreachable store carrying the error message.
 *
 * @returns `ready`/`migrationReady` are true only when migrate() has
 *   completed on this instance and the newest row in
 *   agentrun_schema_migrations equals latestMigrationId().
 */
async health(): Promise<StoreHealth> {
  try {
    await this.pool.query("SELECT 1");
    const newest = await this.pool.query<{ id: string }>("SELECT id FROM agentrun_schema_migrations ORDER BY applied_at DESC, id DESC LIMIT 1");
    const migrationId = newest.rows[0]?.id ?? null;
    const migrationReady = this.migrationReady && migrationId === latestMigrationId();
    if (migrationReady) {
      return { adapter: "postgres", ready: migrationReady, reachable: true, migrationReady, migrationId, failureKind: null, message: null, credentialValuesPrinted: false };
    }
    return { adapter: "postgres", ready: migrationReady, reachable: true, migrationReady, migrationId, failureKind: "infra-failed", message: "schema migration is not ready", credentialValuesPrinted: false };
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    return { adapter: "postgres", ready: false, reachable: false, migrationReady: false, migrationId: this.appliedMigrationId, failureKind: "infra-failed", message, credentialValuesPrinted: false };
  }
}
/**
 * Creates a run in status "pending" inside one transaction: resolves the
 * session for the run, inserts the run row, touches the session with the new
 * run id (without bumping its version), and appends a "backend_status" event
 * with phase "run-created".
 *
 * @param input - Caller-supplied run fields.
 * @returns The newly created run record.
 */
async createRun(input: CreateRunInput): Promise<RunRecord> {
  const createdAt = nowIso();
  return this.withTransaction(async (client) => {
    const sessionRef = await this.resolveSessionForRun(client, input, createdAt);
    const run: RunRecord = {
      ...input,
      sessionRef,
      resourceBundleRef: input.resourceBundleRef ?? null,
      id: newId("run"),
      status: "pending",
      terminalStatus: null,
      failureKind: null,
      failureMessage: null,
      createdAt: createdAt,
      updatedAt: createdAt,
      claimedBy: null,
      leaseExpiresAt: null,
    };
    const insertParams = [
      run.id,
      run.tenantId,
      run.projectId,
      JSON.stringify(run.workspaceRef),
      JSON.stringify(run.sessionRef),
      JSON.stringify(run.resourceBundleRef),
      run.providerId,
      run.backendProfile,
      JSON.stringify(run.executionPolicy),
      JSON.stringify(run.traceSink),
      run.status,
      run.terminalStatus,
      run.failureKind,
      run.failureMessage,
      run.createdAt,
      run.updatedAt,
      run.claimedBy,
      run.leaseExpiresAt,
    ];
    await client.query(
      `INSERT INTO agentrun_runs (id, tenant_id, project_id, workspace_ref, session_ref, resource_bundle_ref, provider_id, backend_profile, execution_policy, trace_sink, status, terminal_status, failure_kind, failure_message, created_at, updated_at, claimed_by, lease_expires_at)
VALUES ($1, $2, $3, $4::jsonb, $5::jsonb, $6::jsonb, $7, $8, $9::jsonb, $10::jsonb, $11, $12, $13, $14, $15, $16, $17, $18)`,
      insertParams,
    );
    await this.touchSessionForRun(client, run, { lastRunId: run.id, lastActivityAt: createdAt }, { bumpVersion: false, at: createdAt });
    const createdPayload = {
      phase: "run-created",
      backendProfile: run.backendProfile,
      sessionRef: summarizeSessionRef(run.sessionRef ?? null),
      resourceBundleRef: summarizeResourceBundleRef(run.resourceBundleRef ?? null),
    };
    await this.appendEventWithLockedRun(client, run.id, "backend_status", createdPayload);
    return run;
  });
}
/**
 * Loads a single run by id.
 *
 * @param runId - Id of the run to fetch.
 * @returns The run record built from the matching row.
 * @throws AgentRunError ("schema-invalid", HTTP 404) when no row matches.
 */
async getRun(runId: string): Promise<RunRecord> {
  const { rows } = await this.pool.query("SELECT * FROM agentrun_runs WHERE id = $1", [runId]);
  const found = rows[0];
  if (found) return runFromRow(found);
  throw new AgentRunError("schema-invalid", `run ${runId} was not found`, { httpStatus: 404 });
}
/**
 * Lists a run's events with seq greater than `afterSeq`, in ascending seq
 * order. The page size is `limit` passed through clamp(limit, 1, 500).
 *
 * @throws AgentRunError (HTTP 404, via getRun) when the run does not exist.
 */
async listEvents(runId: string, afterSeq: number, limit: number): Promise<RunEvent[]> {
  // Existence check only; the returned record is not needed.
  await this.getRun(runId);
  const pageSize = clamp(limit, 1, 500);
  const { rows } = await this.pool.query("SELECT * FROM agentrun_events WHERE run_id = $1 AND seq > $2 ORDER BY seq ASC LIMIT $3", [runId, afterSeq, pageSize]);
  return rows.map(eventFromRow);
}
/**
 * Creates a command on a run inside one transaction, holding the run row
 * lock taken by requireRunForUpdate.
 *
 * When `input.idempotencyKey` matches an existing command of the run, that
 * command is returned instead of inserting a new one, provided the payload
 * hash matches. "turn" and "steer" commands also move the session to
 * "running" and bump its version. A "backend_status" event with phase
 * "command-created" is appended for every newly inserted command.
 *
 * @throws AgentRunError (HTTP 409) when the run is already terminal, or when
 *   the idempotency key was previously used with a different payload.
 */
async createCommand(runId: string, input: CreateCommandInput): Promise<CommandRecord> {
  const payloadHash = stableHash(input.payload);
  return this.withTransaction(async (client) => {
    const run = await this.requireRunForUpdate(client, runId);
    if (isTerminalRunStatus(run.status)) {
      throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 });
    }

    if (input.idempotencyKey) {
      const prior = await client.query("SELECT * FROM agentrun_commands WHERE run_id = $1 AND idempotency_key = $2", [runId, input.idempotencyKey]);
      const priorRow = prior.rows[0];
      if (priorRow) {
        const priorCommand = commandFromRow(priorRow);
        if (priorCommand.payloadHash === payloadHash) return priorCommand;
        throw new AgentRunError("schema-invalid", "idempotency key reused with different payload", { httpStatus: 409 });
      }
    }

    const seq = await this.nextSeq(client, "agentrun_commands", runId);
    const at = nowIso();
    const command: CommandRecord = {
      ...input,
      id: newId("cmd"),
      runId,
      seq,
      state: "pending",
      payloadHash,
      createdAt: at,
      updatedAt: at,
      acknowledgedAt: null,
    };
    const insertParams = [
      command.id,
      command.runId,
      command.seq,
      command.type,
      JSON.stringify(command.payload),
      command.payloadHash,
      command.idempotencyKey ?? null,
      command.state,
      command.createdAt,
      command.updatedAt,
      command.acknowledgedAt,
    ];
    await client.query(
      `INSERT INTO agentrun_commands (id, run_id, seq, type, payload, payload_hash, idempotency_key, state, created_at, updated_at, acknowledged_at)
VALUES ($1, $2, $3, $4, $5::jsonb, $6, $7, $8, $9, $10, $11)`,
      insertParams,
    );

    if (command.type === "turn") {
      // A new turn becomes the active command and clears the previous terminal outcome.
      await this.touchSessionForRun(client, run, { executionState: "running", lastRunId: run.id, lastCommandId: command.id, activeRunId: run.id, activeCommandId: command.id, terminalStatus: null, failureKind: null, title: sessionTitleFromCommand(command), lastActivityAt: at }, { bumpVersion: true, at });
    } else if (command.type === "steer") {
      // A steer does not set activeCommandId, terminal fields or title.
      await this.touchSessionForRun(client, run, { executionState: "running", lastRunId: run.id, lastCommandId: command.id, activeRunId: run.id, lastActivityAt: at }, { bumpVersion: true, at });
    }

    await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "command-created", commandId: command.id, commandType: command.type });
    return command;
  });
}
/**
 * Loads a single command by id.
 *
 * @param commandId - Id of the command to fetch.
 * @returns The command record built from the matching row.
 * @throws AgentRunError ("schema-invalid", HTTP 404) when no row matches.
 */
async getCommand(commandId: string): Promise<CommandRecord> {
  const { rows } = await this.pool.query("SELECT * FROM agentrun_commands WHERE id = $1", [commandId]);
  const found = rows[0];
  if (found) return commandFromRow(found);
  throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
}
/**
 * Lists a run's commands with seq greater than `afterSeq`, in ascending seq
 * order. The page size is `limit` passed through clamp(limit, 1, 100).
 *
 * @throws AgentRunError (HTTP 404, via getRun) when the run does not exist.
 */
async listCommands(runId: string, afterSeq: number, limit: number): Promise<CommandRecord[]> {
  // Existence check only; the returned record is not needed.
  await this.getRun(runId);
  const pageSize = clamp(limit, 1, 100);
  const { rows } = await this.pool.query("SELECT * FROM agentrun_commands WHERE run_id = $1 AND seq > $2 ORDER BY seq ASC LIMIT $3", [runId, afterSeq, pageSize]);
  return rows.map(commandFromRow);
}
/**
 * Registers a runner, or refreshes an existing one with the same id.
 *
 * Missing `id`, `registeredAt` and `heartbeatAt` are filled in with a new id
 * and the current time. The defaults are applied AFTER spreading `input`:
 * object spread copies keys whose value is `undefined`, so spreading last
 * (as before) let a caller-supplied `{ id: undefined }` erase the generated
 * id and send NULL for the primary key.
 *
 * On an id conflict every column except `registered_at` is overwritten, so
 * the original registration time is kept.
 *
 * @param input - Partial runner fields supplied by the caller.
 * @returns The runner as stored (from the RETURNING row).
 */
async registerRunner(input: Partial<RunnerRecord>): Promise<RunnerRecord> {
  const at = nowIso();
  const runner: RunnerRecord = {
    ...input,
    id: input.id ?? newId("runner"),
    registeredAt: input.registeredAt ?? at,
    heartbeatAt: input.heartbeatAt ?? at,
  };
  const metadata = metadataForRunner(runner);
  const result = await this.pool.query(
    `INSERT INTO agentrun_runners (id, run_id, attempt_id, backend_profile, placement, source_commit, metadata, registered_at, heartbeat_at)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, $9)
ON CONFLICT (id) DO UPDATE SET
run_id = EXCLUDED.run_id,
attempt_id = EXCLUDED.attempt_id,
backend_profile = EXCLUDED.backend_profile,
placement = EXCLUDED.placement,
source_commit = EXCLUDED.source_commit,
metadata = EXCLUDED.metadata,
heartbeat_at = EXCLUDED.heartbeat_at
RETURNING *`,
    [runner.id, runner.runId ?? null, runner.attemptId ?? null, runner.backendProfile ?? null, runner.placement ?? null, runner.sourceCommit ?? null, JSON.stringify(metadata), runner.registeredAt, runner.heartbeatAt],
  );
  return runnerFromRow(result.rows[0]);
}
/**
 * Lists the runner jobs recorded for a run, oldest first, optionally
 * restricted to one command.
 *
 * @param runId - Run whose jobs are listed.
 * @param commandId - When truthy, only jobs of this command are returned.
 * @throws AgentRunError (HTTP 404, via getRun) when the run does not exist.
 */
async listRunnerJobs(runId: string, commandId?: string): Promise<RunnerJobRecord[]> {
  await this.getRun(runId);
  const params: unknown[] = [runId];
  const conditions = ["run_id = $1"];
  if (commandId) {
    params.push(commandId);
    // The placeholder index follows the parameter's position in `params`.
    conditions.push(`command_id = $${params.length}`);
  }
  const { rows } = await this.pool.query(`SELECT * FROM agentrun_runner_jobs WHERE ${conditions.join(" AND ")} ORDER BY created_at ASC`, params);
  return rows.map(runnerJobFromRow);
}
/**
 * Looks up the runner job stored for a run under an idempotency key.
 *
 * @returns The stored job, or null when the key has not been used on this run.
 * @throws AgentRunError ("schema-invalid", HTTP 409) when the key exists but
 *   was stored with a different payload hash.
 */
async getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): Promise<RunnerJobRecord | null> {
  const { rows } = await this.pool.query("SELECT * FROM agentrun_runner_jobs WHERE run_id = $1 AND idempotency_key = $2", [runId, idempotencyKey]);
  const stored = rows[0];
  if (!stored) return null;
  const job = runnerJobFromRow(stored);
  if (job.payloadHash === payloadHash) return job;
  throw new AgentRunError("schema-invalid", "runner job idempotency key reused with different payload", { httpStatus: 409 });
}
/**
 * Persists a runner job for a run inside one transaction, holding the run
 * row lock taken by requireRunForUpdate.
 *
 * When `input.idempotencyKey` is already stored for the run, the existing
 * job is returned instead of inserting a second row.
 *
 * @throws AgentRunError ("schema-invalid", HTTP 409) when the idempotency key
 *   exists with a different payload hash.
 */
async saveRunnerJob(input: SaveRunnerJobInput): Promise<RunnerJobRecord> {
  return this.withTransaction(async (client) => {
    await this.requireRunForUpdate(client, input.runId);

    if (input.idempotencyKey) {
      const prior = await client.query("SELECT * FROM agentrun_runner_jobs WHERE run_id = $1 AND idempotency_key = $2 FOR UPDATE", [input.runId, input.idempotencyKey]);
      const priorRow = prior.rows[0];
      if (priorRow) {
        const priorJob = runnerJobFromRow(priorRow);
        if (priorJob.payloadHash === input.payloadHash) return priorJob;
        throw new AgentRunError("schema-invalid", "runner job idempotency key reused with different payload", { httpStatus: 409 });
      }
    }

    const at = nowIso();
    const record: RunnerJobRecord = {
      ...input,
      id: newId("rjob"),
      idempotencyKey: input.idempotencyKey ?? null,
      serviceAccountName: input.serviceAccountName ?? null,
      createdAt: at,
      updatedAt: at,
    };
    const insertParams = [
      record.id,
      record.runId,
      record.commandId,
      record.idempotencyKey,
      record.payloadHash,
      record.attemptId,
      record.runnerId,
      record.namespace,
      record.jobName,
      record.managerUrl,
      record.image,
      record.sourceCommit,
      record.serviceAccountName,
      JSON.stringify(record.result),
      record.createdAt,
      record.updatedAt,
    ];
    const { rows } = await client.query(
      `INSERT INTO agentrun_runner_jobs (id, run_id, command_id, idempotency_key, payload_hash, attempt_id, runner_id, namespace, job_name, manager_url, image, source_commit, service_account_name, result, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14::jsonb, $15, $16)
RETURNING *`,
      insertParams,
    );
    return runnerJobFromRow(rows[0]);
  });
}
/**
 * Claims a run for a runner inside one transaction: sets the run to status
 * "claimed" with a lease expiring `leaseMs` from now, upserts the lease row,
 * marks the session as running (without bumping its version), and appends a
 * "backend_status" event with phase "run-claimed".
 *
 * @throws AgentRunError (HTTP 409) when the run is already terminal.
 * @throws AgentRunError ("runner-lease-conflict", HTTP 409) when another
 *   runner holds the run and its lease has not expired.
 */
async claimRun(runId: string, runnerId: string, leaseMs: number): Promise<RunRecord> {
  return this.withTransaction(async (client) => {
    const current = await this.requireRunForUpdate(client, runId);
    if (isTerminalRunStatus(current.status)) {
      throw new AgentRunError(current.failureKind ?? (current.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${current.status}`, { httpStatus: 409 });
    }
    // The same runner may re-claim; a different runner may take over only an expired lease.
    if (current.claimedBy && current.claimedBy !== runnerId && !isLeaseExpired(current.leaseExpiresAt)) {
      throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 });
    }

    const leaseExpiresAt = new Date(Date.now() + leaseMs).toISOString();
    const runUpdate = await client.query(
      `UPDATE agentrun_runs SET status = $2, claimed_by = $3, lease_expires_at = $4, updated_at = $5 WHERE id = $1 RETURNING *`,
      [runId, "claimed", runnerId, leaseExpiresAt, nowIso()],
    );
    await client.query(
      `INSERT INTO agentrun_leases (run_id, runner_id, lease_expires_at, stale_recovery_marker, updated_at)
VALUES ($1, $2, $3, $4::jsonb, $5)
ON CONFLICT (run_id) DO UPDATE SET runner_id = EXCLUDED.runner_id, lease_expires_at = EXCLUDED.lease_expires_at, updated_at = EXCLUDED.updated_at`,
      [runId, runnerId, leaseExpiresAt, null, nowIso()],
    );

    const claimed = runFromRow(runUpdate.rows[0]);
    await this.touchSessionForRun(client, claimed, { executionState: "running", activeRunId: runId, lastRunId: runId, lastActivityAt: claimed.updatedAt }, { bumpVersion: false, at: claimed.updatedAt });
    await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "run-claimed", runnerId });
    return claimed;
  });
}
/**
 * Extends the lease of a claimed run by `leaseMs` from now and refreshes the
 * runner's heartbeat timestamp, all inside one transaction.
 *
 * A run that is already terminal is returned unchanged.
 *
 * @throws AgentRunError ("runner-lease-conflict", HTTP 409) when the run is
 *   not claimed by `runnerId`.
 */
async heartbeat(runId: string, runnerId: string, leaseMs: number): Promise<RunRecord> {
  return this.withTransaction(async (client) => {
    const current = await this.requireRunForUpdate(client, runId);
    if (isTerminalRunStatus(current.status)) return current;
    if (current.claimedBy !== runnerId) {
      throw new AgentRunError("runner-lease-conflict", `run ${runId} is not claimed by ${runnerId}`, { httpStatus: 409 });
    }

    const leaseExpiresAt = new Date(Date.now() + leaseMs).toISOString();
    const runUpdate = await client.query("UPDATE agentrun_runs SET lease_expires_at = $2, updated_at = $3 WHERE id = $1 RETURNING *", [runId, leaseExpiresAt, nowIso()]);
    await client.query("UPDATE agentrun_runners SET heartbeat_at = $2 WHERE id = $1", [runnerId, nowIso()]);
    await client.query("UPDATE agentrun_leases SET lease_expires_at = $2, updated_at = $3 WHERE run_id = $1", [runId, leaseExpiresAt, nowIso()]);
    return runFromRow(runUpdate.rows[0]);
  });
}
/**
 * Marks a command as "acknowledged" under a row lock, stamping
 * acknowledged_at and updated_at with the same time.
 *
 * A command that is already acknowledged or in a terminal state is returned
 * unchanged.
 *
 * @throws AgentRunError ("schema-invalid", HTTP 404) when the command does
 *   not exist.
 */
async ackCommand(commandId: string): Promise<CommandRecord> {
  return this.withTransaction(async (client) => {
    const locked = await client.query("SELECT * FROM agentrun_commands WHERE id = $1 FOR UPDATE", [commandId]);
    const lockedRow = locked.rows[0];
    if (!lockedRow) {
      throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
    }
    const current = commandFromRow(lockedRow);
    const nothingToDo = isTerminalCommandState(current.state) || current.state === "acknowledged";
    if (nothingToDo) return current;
    const { rows } = await client.query("UPDATE agentrun_commands SET state = $2, acknowledged_at = $3, updated_at = $3 WHERE id = $1 RETURNING *", [commandId, "acknowledged", nowIso()]);
    return commandFromRow(rows[0]);
  });
}
/**
 * Moves a command to the state derived from `result.terminalStatus`, inside
 * one transaction. Also records the backend thread on the session when a
 * thread id is reported, puts the session into "terminal" for "turn"
 * commands (bumping its version), and appends a "backend_status" event with
 * phase "command-terminal".
 *
 * A command that is already terminal is returned unchanged.
 *
 * NOTE(review): this method locks the command row first and the run row
 * second, whereas cancelRun locks the run first and then the run's open
 * commands. Concurrent calls on the same run could therefore deadlock -
 * confirm and align the lock order across all command/run mutators.
 *
 * @throws AgentRunError ("schema-invalid", HTTP 404) when the command does
 *   not exist.
 */
async finishCommand(commandId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): Promise<CommandRecord> {
  return this.withTransaction(async (client) => {
    const locked = await client.query("SELECT * FROM agentrun_commands WHERE id = $1 FOR UPDATE", [commandId]);
    const lockedRow = locked.rows[0];
    if (!lockedRow) {
      throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
    }
    const command = commandFromRow(lockedRow);
    if (isTerminalCommandState(command.state)) return command;

    const state = commandStateFromTerminal(result.terminalStatus);
    const commandUpdate = await client.query("UPDATE agentrun_commands SET state = $2, updated_at = $3 WHERE id = $1 RETURNING *", [commandId, state, nowIso()]);
    const run = await this.requireRunForUpdate(client, command.runId);

    const hasThreadForSession = result.threadId && run.sessionRef?.sessionId;
    if (hasThreadForSession) {
      await this.upsertSessionThread(client, run, result.threadId, result.turnId ?? null);
    }
    if (command.type === "turn") {
      await this.touchSessionForRun(client, run, { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: command.runId, lastCommandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind, lastActivityAt: nowIso() }, { bumpVersion: true });
    }

    const terminalPayload = {
      phase: "command-terminal",
      commandId,
      state,
      terminalStatus: result.terminalStatus,
      failureKind: result.failureKind,
      message: result.failureMessage ?? null,
      threadId: result.threadId ?? null,
      turnId: result.turnId ?? null,
    };
    await this.appendEventWithLockedRun(client, command.runId, "backend_status", terminalPayload);
    return commandFromRow(commandUpdate.rows[0]);
  });
}
/**
 * Appends an event to a run. The run row is locked via requireRunForUpdate
 * first, then the event is written with appendEventWithLockedRun in the same
 * transaction.
 *
 * @returns The stored event.
 */
async appendEvent(runId: string, type: EventType, payload: JsonRecord): Promise<RunEvent> {
  return this.withTransaction(async (client) => {
    await this.requireRunForUpdate(client, runId);
    const stored = await this.appendEventWithLockedRun(client, runId, type, payload);
    return stored;
  });
}
/**
 * Moves a run to the status derived from `result.terminalStatus`, inside one
 * transaction. Also records the backend thread on the session when a thread
 * id is reported, appends a "terminal_status" event, and puts the session
 * into "terminal" (bumping its version).
 *
 * A run that is already terminal is returned unchanged.
 */
async finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): Promise<RunRecord> {
  return this.withTransaction(async (client) => {
    const current = await this.requireRunForUpdate(client, runId);
    if (isTerminalRunStatus(current.status)) return current;

    const { terminalStatus, failureKind, failureMessage } = result;
    const status = statusFromTerminal(terminalStatus);
    const runUpdate = await client.query(
      `UPDATE agentrun_runs SET status = $2, terminal_status = $3, failure_kind = $4, failure_message = $5, updated_at = $6 WHERE id = $1 RETURNING *`,
      [runId, status, terminalStatus, failureKind, failureMessage, nowIso()],
    );
    const finished = runFromRow(runUpdate.rows[0]);

    if (result.threadId && finished.sessionRef?.sessionId) {
      await this.upsertSessionThread(client, finished, result.threadId, result.turnId ?? null);
    }
    await this.appendEventWithLockedRun(client, runId, "terminal_status", { terminalStatus, failureKind, message: failureMessage });
    // The session's timestamps reuse the run's updated_at.
    await this.touchSessionForRun(client, finished, { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: runId, terminalStatus, failureKind, lastActivityAt: finished.updatedAt }, { bumpVersion: true, at: finished.updatedAt });
    return finished;
  });
}
/**
 * Cancels a run inside one transaction. In order:
 *  1. every command of the run not yet completed/failed/cancelled is set to
 *     "cancelled", each followed by a "command-terminal" event;
 *  2. a "cancel-requested" event carrying `reason` is appended;
 *  3. the run row is set to cancelled with `reason` as its failure message;
 *  4. a "terminal_status" event is appended;
 *  5. the session is put into "terminal" and its version bumped.
 *
 * A run that is already terminal is returned unchanged.
 *
 * @param reason - Stored as the run's failure message; defaults to
 *   "cancel requested".
 */
async cancelRun(runId: string, reason = "cancel requested"): Promise<RunRecord> {
  return this.withTransaction(async (client) => {
    const current = await this.requireRunForUpdate(client, runId);
    if (isTerminalRunStatus(current.status)) return current;

    // One timestamp for the commands, the run and the session.
    const cancelledAt = nowIso();
    const openCommands = await client.query("SELECT * FROM agentrun_commands WHERE run_id = $1 AND state NOT IN ('completed', 'failed', 'cancelled') FOR UPDATE", [runId]);
    for (const openRow of openCommands.rows) {
      const { id: openCommandId } = commandFromRow(openRow);
      await client.query("UPDATE agentrun_commands SET state = 'cancelled', updated_at = $2 WHERE id = $1", [openCommandId, cancelledAt]);
      await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "command-terminal", commandId: openCommandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled" });
    }

    await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "cancel-requested", reason });
    const runUpdate = await client.query(
      `UPDATE agentrun_runs SET status = 'cancelled', terminal_status = 'cancelled', failure_kind = 'cancelled', failure_message = $2, updated_at = $3 WHERE id = $1 RETURNING *`,
      [runId, reason, cancelledAt],
    );
    await this.appendEventWithLockedRun(client, runId, "terminal_status", { terminalStatus: "cancelled", failureKind: "cancelled", message: reason });

    const cancelled = runFromRow(runUpdate.rows[0]);
    await this.touchSessionForRun(client, cancelled, { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: runId, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: cancelledAt }, { bumpVersion: true, at: cancelledAt });
    return cancelled;
  });
}
/**
 * Cancels one command.
 *
 * Throws a 404 `AgentRunError` when the command does not exist. A command
 * that is already terminal is returned unchanged. For commands of type
 * `"turn"` the session of the owning run is also marked terminal/cancelled.
 */
async cancelCommand(commandId: string, reason = "cancel requested"): Promise<CommandRecord> {
  return this.withTransaction(async (client) => {
    const locked = await client.query("SELECT * FROM agentrun_commands WHERE id = $1 FOR UPDATE", [commandId]);
    const lockedRow = locked.rows[0];
    if (!lockedRow) {
      throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
    }
    const command = commandFromRow(lockedRow);
    if (isTerminalCommandState(command.state)) {
      return command;
    }
    const cancelled = await client.query("UPDATE agentrun_commands SET state = 'cancelled', updated_at = $2 WHERE id = $1 RETURNING *", [commandId, nowIso()]);
    await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", message: reason });
    const isTurn = command.type === "turn";
    if (isTurn) {
      const owningRun = await this.requireRunForUpdate(client, command.runId);
      await this.touchSessionForRun(
        client,
        owningRun,
        { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: command.runId, lastCommandId: command.id, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: nowIso() },
        { bumpVersion: true },
      );
    }
    return commandFromRow(cancelled.rows[0]);
  });
}
/** Loads one session by id; resolves to `null` when no row matches. */
async getSession(sessionId: string): Promise<SessionRecord | null> {
  const { rows } = await this.pool.query("SELECT * FROM agentrun_sessions WHERE session_id = $1", [sessionId]);
  const row = rows[0];
  if (!row) {
    return null;
  }
  return sessionFromRow(row);
}
/**
 * Builds the summary of one session, optionally from a reader's viewpoint.
 *
 * Throws a 404 `AgentRunError` when the session is missing. The reader's
 * read cursor is only looked up when `readerId` is truthy.
 */
async getSessionSummary(sessionId: string, readerId: string | null = null): Promise<SessionSummary> {
  const session = await this.getSession(sessionId);
  if (!session) {
    throw new AgentRunError("schema-invalid", `session ${sessionId} was not found`, { httpStatus: 404 });
  }
  let readCursor: SessionReadCursorRecord | null = null;
  if (readerId) {
    readCursor = await this.getSessionReadCursor(sessionId, readerId);
  }
  return buildSessionSummary(session, readerId, readCursor);
}
/**
 * Lists session summaries newer than the cursor version.
 *
 * At most 500 rows are fetched (optionally restricted to one backend
 * profile); summaries are then filtered by list state, ordered with
 * `sessionSort` and cut to the clamped limit in memory. The returned cursor is
 * the version of the last returned item, or `null` when nothing is returned.
 */
async listSessions(input: ListSessionsInput): Promise<SessionListResult> {
  const startVersion = parseSessionCursor(input.cursor) ?? 0;
  const state = input.state ?? "default";
  const values: unknown[] = [startVersion];
  const conditions = ["version > $1"];
  if (input.backendProfile) {
    values.push(input.backendProfile);
    conditions.push(`backend_profile = $${values.length}`);
  }
  const sql = `SELECT * FROM agentrun_sessions WHERE ${conditions.join(" AND ")} ORDER BY updated_at DESC, session_id ASC LIMIT 500`;
  const { rows } = await this.pool.query(sql, values);

  // Read cursors are only relevant (and only loaded) when a reader is given.
  let cursors = new Map<string, SessionReadCursorRecord>();
  if (input.readerId) {
    cursors = await this.loadSessionReadCursors(input.readerId, rows.map((row) => stringValue(row.session_id)));
  }

  const summaries = rows.map((row) => {
    const session = sessionFromRow(row);
    const readCursor = input.readerId ? cursors.get(session.sessionId) ?? null : null;
    return buildSessionSummary(session, input.readerId ?? null, readCursor);
  });
  const matching = summaries.filter((summary) => sessionMatchesListState(summary, state));
  const items = matching.sort(sessionSort).slice(0, clampSessionLimit(input.limit));

  const lastItem = items[items.length - 1];
  const cursor = items.length > 0 ? String(lastItem?.version ?? startVersion) : null;
  return { items, count: items.length, cursor, filters: sessionListFilters(input) };
}
/**
 * Returns one page of events for the run backing a session.
 *
 * The run is the requested one (validated by `resolveSessionRunId`) or the
 * session's active/last run. When the session has no run an empty page with
 * `runId: null` is returned. The cursor is the `seq` of the last event.
 */
async listSessionTrace(sessionId: string, input: SessionEventPageInput): Promise<SessionEventPage> {
  const runId = await this.resolveSessionRunId(sessionId, input.runId ?? null);
  if (!runId) {
    return { sessionId, runId: null, items: [], count: 0, cursor: null };
  }
  const items = await this.listEvents(runId, input.afterSeq, input.limit);
  const lastEvent = items[items.length - 1];
  const cursor = items.length > 0 ? String(lastEvent?.seq ?? input.afterSeq) : null;
  return { sessionId, runId, items, count: items.length, cursor };
}
/**
 * Returns the output events contained in one page of a session's trace.
 *
 * The page is fetched through `listSessionTrace` and then narrowed with
 * `isSessionOutputEvent`; `count` reflects the narrowed list.
 *
 * The cursor is deliberately the cursor of the *scanned* trace page rather
 * than the seq of the last output event. Deriving it from the filtered items
 * yields `null` whenever a page holds only non-output events, which tells the
 * caller there is nothing more to read even though later pages may contain
 * output. Keeping the trace cursor lets callers always advance past events
 * that were scanned and filtered out.
 *
 * @param sessionId Session whose output is requested.
 * @param input Paging options forwarded unchanged to `listSessionTrace`.
 * @returns The trace page with `items`/`count` restricted to output events.
 * @throws AgentRunError (404) via `listSessionTrace` when the session or the
 *   requested run cannot be resolved.
 */
async listSessionOutput(sessionId: string, input: SessionEventPageInput): Promise<SessionEventPage> {
  const page = await this.listSessionTrace(sessionId, input);
  const items = page.items.filter(isSessionOutputEvent);
  // `page.cursor` (last scanned seq, or null for an empty scan) is kept as-is.
  return { ...page, items, count: items.length };
}
/**
 * Records that `readerId` has read a session up to its current version.
 *
 * The session row is locked, its version captured, and the read cursor is
 * inserted or overwritten. Throws a 404 `AgentRunError` for unknown sessions.
 */
async markSessionRead(sessionId: string, readerId: string): Promise<SessionReadCursorRecord> {
  return this.withTransaction(async (client) => {
    const locked = await client.query("SELECT * FROM agentrun_sessions WHERE session_id = $1 FOR UPDATE", [sessionId]);
    const sessionRow = locked.rows[0];
    if (!sessionRow) {
      throw new AgentRunError("schema-invalid", `session ${sessionId} was not found`, { httpStatus: 404 });
    }
    const { version: sessionVersion } = sessionFromRow(sessionRow);
    const cursor: SessionReadCursorRecord = { sessionId, readerId, sessionVersion, readAt: nowIso() };
    const upsertSql = `INSERT INTO agentrun_session_read_cursors (session_id, reader_id, session_version, read_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (session_id, reader_id) DO UPDATE SET session_version = EXCLUDED.session_version, read_at = EXCLUDED.read_at`;
    await client.query(upsertSql, [cursor.sessionId, cursor.readerId, cursor.sessionVersion, cursor.readAt]);
    return cursor;
  });
}
/**
 * Creates a session or overwrites the caller-controlled fields of an
 * existing one, inside a single transaction with the row locked.
 *
 * For an existing session only tenant/project/backend/conversation/thread,
 * metadata, expiry and `codexRolloutSubdir` are replaced; execution and
 * storage fields are preserved. A new session starts `idle` with
 * `storageKind: "none"`.
 *
 * The version is taken from the shared session version sequence
 * (`nextSessionVersion`), matching `resolveSessionForRun`,
 * `upsertSessionThread` and `touchSessionForRun`. `listSessions` pages with
 * `version > cursor`, so a per-row `version + 1` (or a hard-coded `1` for new
 * rows) can fall behind a reader's cursor and hide the change from
 * incremental listings.
 *
 * @param input Identity and caller-controlled attributes of the session.
 * @returns The session record as written.
 */
async upsertSession(input: UpsertSessionInput): Promise<SessionRecord> {
  return this.withTransaction(async (client) => {
    const existing = await client.query<QueryResultRow>("SELECT * FROM agentrun_sessions WHERE session_id = $1 FOR UPDATE", [input.sessionId]);
    const at = nowIso();
    // Globally monotonic version shared by every session write path.
    const version = await this.nextSessionVersion(client);
    if (existing.rows[0]) {
      const session = sessionFromRow(existing.rows[0]);
      const next: SessionRecord = {
        ...session,
        tenantId: input.tenantId,
        projectId: input.projectId,
        backendProfile: input.backendProfile,
        conversationId: input.conversationId,
        threadId: input.threadId,
        metadata: input.metadata,
        expiresAt: input.expiresAt,
        codexRolloutSubdir: input.codexRolloutSubdir,
        version,
        updatedAt: at,
      };
      await client.query(
        `UPDATE agentrun_sessions
SET tenant_id = $2, project_id = $3, backend_profile = $4, conversation_id = $5, thread_id = $6,
metadata = $7, expires_at = $8, codex_rollout_subdir = $9, version = $10, updated_at = $11
WHERE session_id = $1`,
        [next.sessionId, next.tenantId, next.projectId, next.backendProfile, next.conversationId, next.threadId,
          JSON.stringify(next.metadata), next.expiresAt, next.codexRolloutSubdir, next.version, next.updatedAt],
      );
      return next;
    }
    const next: SessionRecord = {
      sessionId: input.sessionId,
      tenantId: input.tenantId,
      projectId: input.projectId,
      backendProfile: input.backendProfile,
      conversationId: input.conversationId,
      threadId: input.threadId,
      metadata: input.metadata,
      version,
      executionState: "idle",
      lastRunId: null,
      lastCommandId: null,
      activeRunId: null,
      activeCommandId: null,
      lastEventSeq: 0,
      terminalStatus: null,
      failureKind: null,
      title: null,
      summary: {},
      lastActivityAt: null,
      createdAt: at,
      updatedAt: at,
      expiresAt: input.expiresAt,
      storageKind: "none",
      codexRolloutSubdir: input.codexRolloutSubdir,
    };
    await client.query(
      `INSERT INTO agentrun_sessions (session_id, tenant_id, project_id, backend_profile, conversation_id, thread_id,
metadata, version, execution_state, last_run_id, last_command_id, active_run_id,
active_command_id, last_event_seq, terminal_status, failure_kind, title, summary,
last_activity_at, created_at, updated_at, expires_at, storage_kind, codex_rollout_subdir)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'idle', null, null, null, null, 0, null, null, null, '{}'::jsonb, null, $9, $10, $11, 'none', $12)`,
      [next.sessionId, next.tenantId, next.projectId, next.backendProfile, next.conversationId, next.threadId,
        JSON.stringify(next.metadata), next.version, next.createdAt, next.updatedAt, next.expiresAt, next.codexRolloutSubdir],
    );
    return next;
  });
}
/**
 * Overwrites the storage summary of a session (kind, PVC name/namespace/phase,
 * size, file count, sha256, rollout subdir) and stamps `storageUpdatedAt`.
 *
 * Storage fields missing from the patch are stored as `null`;
 * `codexRolloutSubdir` falls back to the stored value and then to
 * `"sessions"`.
 *
 * The version comes from the shared session version sequence
 * (`nextSessionVersion`) rather than `session.version + 1`, so the change is
 * ordered consistently with the other session write paths and stays visible
 * to `listSessions`, which pages on `version > cursor`.
 *
 * @param input Storage patch identifying the session by `sessionId`.
 * @returns The session record after the patch.
 * @throws AgentRunError (404) when the session does not exist.
 */
async refreshSessionStorage(input: SessionStoragePatch): Promise<SessionRecord> {
  return this.withTransaction(async (client) => {
    const existing = await client.query("SELECT * FROM agentrun_sessions WHERE session_id = $1 FOR UPDATE", [input.sessionId]);
    if (!existing.rows[0]) throw new AgentRunError("schema-invalid", `session ${input.sessionId} was not found`, { httpStatus: 404 });
    const session = sessionFromRow(existing.rows[0]);
    const at = nowIso();
    // Globally monotonic version shared by every session write path.
    const version = await this.nextSessionVersion(client);
    const next: SessionRecord = {
      ...session,
      storageKind: input.storageKind,
      storagePvcName: input.pvcName ?? null,
      storageNamespace: input.storageNamespace ?? null,
      storagePvcPhase: input.pvcPhase ?? null,
      storageSizeBytes: input.storageSizeBytes ?? null,
      storageFilesCount: input.storageFilesCount ?? null,
      storageSha256: input.storageSha256 ?? null,
      codexRolloutSubdir: input.codexRolloutSubdir ?? session.codexRolloutSubdir ?? "sessions",
      storageUpdatedAt: at,
      version,
      updatedAt: at,
    };
    await client.query(
      `UPDATE agentrun_sessions
SET storage_kind = $2, storage_pvc_name = $3, storage_namespace = $4, storage_pvc_phase = $5,
storage_size_bytes = $6, storage_files_count = $7, storage_sha256 = $8,
storage_updated_at = $9, codex_rollout_subdir = $10, version = $11, updated_at = $12
WHERE session_id = $1`,
      [next.sessionId, next.storageKind, next.storagePvcName, next.storageNamespace, next.storagePvcPhase,
        next.storageSizeBytes, next.storageFilesCount, next.storageSha256,
        next.storageUpdatedAt, next.codexRolloutSubdir, next.version, next.updatedAt],
    );
    return next;
  });
}
/**
 * Flags a session's storage as evicted: sets `storageKind` to `"evicted"`,
 * records the PVC name and stamps both `storageEvictedAt` and
 * `storageUpdatedAt` with the same timestamp.
 *
 * The version comes from the shared session version sequence
 * (`nextSessionVersion`) rather than `session.version + 1`, keeping it
 * consistent with the other session write paths and with the
 * `version > cursor` paging used by `listSessions`.
 *
 * @param input Session id and the name of the PVC that was removed.
 * @returns The session record after the update.
 * @throws AgentRunError (404) when the session does not exist.
 */
async markSessionStorageEvicted(input: { sessionId: string; pvcName: string }): Promise<SessionRecord> {
  return this.withTransaction(async (client) => {
    const existing = await client.query("SELECT * FROM agentrun_sessions WHERE session_id = $1 FOR UPDATE", [input.sessionId]);
    if (!existing.rows[0]) throw new AgentRunError("schema-invalid", `session ${input.sessionId} was not found`, { httpStatus: 404 });
    const session = sessionFromRow(existing.rows[0]);
    const at = nowIso();
    // Globally monotonic version shared by every session write path.
    const version = await this.nextSessionVersion(client);
    const next: SessionRecord = {
      ...session,
      storageKind: "evicted",
      storagePvcName: input.pvcName,
      storageEvictedAt: at,
      storageUpdatedAt: at,
      version,
      updatedAt: at,
    };
    await client.query(
      `UPDATE agentrun_sessions
SET storage_kind = $2, storage_pvc_name = $3, storage_evicted_at = $4, storage_updated_at = $5,
version = $6, updated_at = $7
WHERE session_id = $1`,
      [next.sessionId, next.storageKind, next.storagePvcName, next.storageEvictedAt, next.storageUpdatedAt, next.version, next.updatedAt],
    );
    return next;
  });
}
/**
 * Lists PVC-backed sessions whose `expires_at` is at or before `input.now`,
 * least recently updated first.
 *
 * `input.now` is divided by 1000 before being handed to `to_timestamp`, and
 * the row limit is raised to at least 1.
 */
async listGcExpiredSessions(input: ListGcExpiredSessionsInput): Promise<SessionRecord[]> {
  const rowLimit = Math.max(1, input.limit);
  const cutoffSeconds = input.now / 1000;
  const sql = `SELECT * FROM agentrun_sessions
WHERE storage_kind = 'pvc' AND storage_pvc_name IS NOT NULL
AND expires_at IS NOT NULL AND expires_at <= to_timestamp($1)
ORDER BY updated_at ASC
LIMIT $2`;
  const { rows } = await this.pool.query<QueryResultRow>(sql, [cutoffSeconds, rowLimit]);
  return rows.map(sessionFromRow);
}
/**
 * Inserts a new queue task in state "pending", or returns the existing task
 * when the same idempotency key was already used with the same payload.
 *
 * Throws a 409 `AgentRunError` when the idempotency key is reused with a
 * payload whose hash differs from the stored one.
 */
async createQueueTask(input: CreateQueueTaskInput): Promise<QueueTaskRecord> {
// Hash computed once, outside the transaction; used for the idempotency check and stored with the task.
const payloadHash = stableHash(input.payload);
return this.withTransaction(async (client) => {
// The idempotency lookup is scoped to tenant + project + key and only runs when a key is supplied.
if (input.idempotencyKey) {
const existing = await client.query("SELECT * FROM agentrun_queue_tasks WHERE tenant_id = $1 AND project_id = $2 AND idempotency_key = $3 FOR UPDATE", [input.tenantId, input.projectId, input.idempotencyKey]);
if (existing.rows[0]) {
const record = queueTaskFromRow(existing.rows[0]);
if (record.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "queue task idempotency key reused with different payload", { httpStatus: 409 });
return record;
}
}
const at = nowIso();
// Version is drawn from the queue version sequence (see nextQueueVersion).
const version = await this.nextQueueVersion(client);
// The input fields are spread first; the generated and initial-state fields follow.
const task: QueueTaskRecord = { ...input, id: newId("qt"), state: "pending", version, payloadHash, latestAttempt: null, sessionPath: null, createdAt: at, updatedAt: at, cancelledAt: null, cancelReason: null };
const inserted = await client.query(
`INSERT INTO agentrun_queue_tasks (id, tenant_id, project_id, queue, lane, title, priority, state, version, backend_profile, provider_id, workspace_ref, execution_policy, resource_bundle_ref, payload, payload_hash, references_json, metadata, idempotency_key, latest_attempt, session_path, created_at, updated_at, cancelled_at, cancel_reason)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12::jsonb, $13::jsonb, $14::jsonb, $15::jsonb, $16, $17::jsonb, $18::jsonb, $19, $20::jsonb, $21, $22, $23, $24, $25)
RETURNING *`,
// Parameter order must match the column list above; JSON columns are passed as serialized strings.
[task.id, task.tenantId, task.projectId, task.queue, task.lane, task.title, task.priority, task.state, task.version, task.backendProfile, task.providerId, JSON.stringify(task.workspaceRef), JSON.stringify(task.executionPolicy), JSON.stringify(task.resourceBundleRef), JSON.stringify(task.payload), task.payloadHash, JSON.stringify(task.references), JSON.stringify(task.metadata), task.idempotencyKey ?? null, JSON.stringify(task.latestAttempt), task.sessionPath, task.createdAt, task.updatedAt, task.cancelledAt, task.cancelReason],
);
// The returned record is rebuilt from the row the database reports back.
return queueTaskFromRow(inserted.rows[0]);
});
}
/**
 * Lists queue tasks whose version is above the starting version, optionally
 * filtered by queue and state, ordered by priority (high first), then
 * creation time, then id.
 *
 * The starting version is the parsed cursor, else `input.updatedAfter`, else
 * 0. The returned cursor is the version of the last listed task.
 */
async listQueueTasks(input: ListQueueTasksInput): Promise<QueueTaskListResult> {
  const startVersion = parseQueueCursor(input.cursor) ?? input.updatedAfter ?? 0;
  const values: unknown[] = [startVersion];
  const conditions = ["version > $1"];
  // Each optional filter appends its value and references it by position.
  if (input.queue) {
    values.push(input.queue);
    conditions.push(`queue = $${values.length}`);
  }
  if (input.state) {
    values.push(input.state);
    conditions.push(`state = $${values.length}`);
  }
  values.push(clampQueueLimit(input.limit));
  const sql = `SELECT * FROM agentrun_queue_tasks WHERE ${conditions.join(" AND ")} ORDER BY priority DESC, created_at ASC, id ASC LIMIT $${values.length}`;
  const { rows } = await this.pool.query(sql, values);
  const items = rows.map(queueTaskFromRow);
  const lastTask = items[items.length - 1];
  const cursor = items.length > 0 ? String(lastTask?.version ?? startVersion) : null;
  return { items, count: items.length, cursor };
}
/** Loads one queue task by id; throws a 404 `AgentRunError` when it is missing. */
async getQueueTask(taskId: string): Promise<QueueTaskRecord> {
  const { rows } = await this.pool.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1", [taskId]);
  const taskRow = rows[0];
  if (taskRow) {
    return queueTaskFromRow(taskRow);
  }
  throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 });
}
/**
 * Stores the latest attempt, state and session path of a queue task and
 * assigns it a fresh queue version.
 *
 * Throws a 404 `AgentRunError` for an unknown task and a 409 when the task is
 * already terminal (error code `cancelled` for cancelled tasks,
 * `schema-invalid` otherwise).
 */
async updateQueueTaskAttempt(taskId: string, input: UpdateQueueTaskAttemptInput): Promise<QueueTaskRecord> {
  return this.withTransaction(async (client) => {
    const locked = await client.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1 FOR UPDATE", [taskId]);
    const taskRow = locked.rows[0];
    if (!taskRow) {
      throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 });
    }
    const { state: currentState } = queueTaskFromRow(taskRow);
    if (isTerminalQueueTaskState(currentState)) {
      const code = currentState === "cancelled" ? "cancelled" : "schema-invalid";
      throw new AgentRunError(code, `queue task ${taskId} is already terminal: ${currentState}`, { httpStatus: 409 });
    }
    const version = await this.nextQueueVersion(client);
    const updatedAt = nowIso();
    const { rows } = await client.query(
      "UPDATE agentrun_queue_tasks SET state = $2, latest_attempt = $3::jsonb, session_path = $4, version = $5, updated_at = $6 WHERE id = $1 RETURNING *",
      [taskId, input.state, JSON.stringify(input.latestAttempt), input.sessionPath, version, updatedAt],
    );
    return queueTaskFromRow(rows[0]);
  });
}
/**
 * Cancels a queue task, recording the reason and cancellation time and
 * assigning a fresh queue version.
 *
 * A task that is already terminal is returned unchanged; an unknown task
 * raises a 404 `AgentRunError`.
 */
async cancelQueueTask(taskId: string, reason = "cancel requested"): Promise<QueueTaskRecord> {
  return this.withTransaction(async (client) => {
    const locked = await client.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1 FOR UPDATE", [taskId]);
    const taskRow = locked.rows[0];
    if (!taskRow) {
      throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 });
    }
    const current = queueTaskFromRow(taskRow);
    if (isTerminalQueueTaskState(current.state)) {
      return current;
    }
    const cancelledAt = nowIso();
    const version = await this.nextQueueVersion(client);
    // $3 feeds both updated_at and cancelled_at.
    const { rows } = await client.query(
      "UPDATE agentrun_queue_tasks SET state = 'cancelled', version = $2, updated_at = $3, cancelled_at = $3, cancel_reason = $4 WHERE id = $1 RETURNING *",
      [taskId, version, cancelledAt, reason],
    );
    return queueTaskFromRow(rows[0]);
  });
}
/**
 * Records that `readerId` has read a queue task at its current version.
 *
 * The task row is locked, its version captured, and the read cursor is
 * inserted or overwritten. Throws a 404 `AgentRunError` for unknown tasks.
 */
async markQueueTaskRead(taskId: string, readerId: string): Promise<QueueReadCursorRecord> {
  return this.withTransaction(async (client) => {
    const locked = await client.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1 FOR UPDATE", [taskId]);
    const taskRow = locked.rows[0];
    if (!taskRow) {
      throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 });
    }
    const { version: taskVersion } = queueTaskFromRow(taskRow);
    const cursor: QueueReadCursorRecord = { taskId, readerId, taskVersion, readAt: nowIso() };
    const upsertSql = `INSERT INTO agentrun_queue_read_cursors (task_id, reader_id, task_version, read_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (task_id, reader_id) DO UPDATE SET task_version = EXCLUDED.task_version, read_at = EXCLUDED.read_at`;
    await client.query(upsertSql, [cursor.taskId, cursor.readerId, cursor.taskVersion, cursor.readAt]);
    return cursor;
  });
}
/** Computes queue statistics over all tasks, or over one queue when given. */
async queueStats(queue?: string): Promise<QueueStats> {
  const tasks = await this.loadQueueTasksForProjection(queue);
  return buildQueueStats(tasks, queue ?? null);
}
/**
 * Builds the commander snapshot: statistics plus the first 20 tasks in
 * `queueTaskSort` order, all stamped with one generation time.
 */
async queueCommander(queue?: string): Promise<QueueCommanderSnapshot> {
  const tasks = await this.loadQueueTasksForProjection(queue);
  const generatedAt = nowIso();
  const scope = queue ?? null;
  // Stats are computed before the task list is sorted in place.
  const stats = buildQueueStats(tasks, scope, generatedAt);
  const items = tasks.sort(queueTaskSort).slice(0, 20);
  return { queue: scope, stats, items, generatedAt };
}
/**
 * Lists the registered backends ordered by profile, each as the merged
 * capability record extended with `capacity`, `health` and `updatedAt`.
 */
async backends(): Promise<JsonRecord[]> {
  const { rows } = await this.pool.query("SELECT * FROM agentrun_backends ORDER BY profile ASC");
  return rows.map((row) => {
    const capability = mergeBackendCapability(stringValue(row.profile), jsonRecord(row.capabilities));
    const capacity = jsonValue(row.capacity);
    const health = jsonValue(row.health);
    const updatedAt = nullableIso(row.updated_at);
    return { ...capability, capacity, health, updatedAt };
  });
}
/** Shuts the store down by ending the underlying connection pool. */
async close(): Promise<void> {
  const { pool } = this;
  await pool.end();
}
/**
 * Writes one event for a run and mirrors its sequence number and timestamp
 * onto the session referenced by the run (if any).
 *
 * The event type is validated, the payload normalized and redacted, and the
 * next per-run sequence number is taken from `nextSeq`. The name reflects
 * that callers in this class lock the run row before calling.
 */
private async appendEventWithLockedRun(client: PoolClient, runId: string, type: EventType, payload: JsonRecord): Promise<RunEvent> {
  const eventType = requireEventType(type);
  const normalized = normalizeRunEventPayload(eventType, payload);
  const seq = await this.nextSeq(client, "agentrun_events", runId);
  const event: RunEvent = {
    id: newId("evt"),
    runId,
    seq,
    type: eventType,
    payload: redactJson(normalized),
    createdAt: nowIso(),
  };
  const insertParams = [event.id, event.runId, event.seq, event.type, JSON.stringify(event.payload), event.createdAt];
  await client.query("INSERT INTO agentrun_events (id, run_id, seq, type, payload, created_at) VALUES ($1, $2, $3, $4, $5::jsonb, $6)", insertParams);
  // The session is found through the run's session_ref; $3 feeds both timestamps.
  const touchSessionSql = `UPDATE agentrun_sessions
SET last_event_seq = $2, last_activity_at = $3, updated_at = $3
WHERE session_id = (SELECT session_ref->>'sessionId' FROM agentrun_runs WHERE id = $1)`;
  await client.query(touchSessionSql, [runId, event.seq, event.createdAt]);
  return event;
}
/**
 * Computes the next per-run sequence number for the given table as
 * `MAX(seq) + 1`, starting at 1 for a run without rows.
 */
private async nextSeq(client: PoolClient, table: "agentrun_commands" | "agentrun_events", runId: string): Promise<number> {
  const sql = `SELECT COALESCE(MAX(seq), 0) + 1 AS seq FROM ${table} WHERE run_id = $1`;
  const { rows } = await client.query<{ seq: number }>(sql, [runId]);
  const next = rows[0]?.seq ?? 1;
  return Number(next);
}
/** Draws the next value of `agentrun_queue_version_seq` as a number. */
private async nextQueueVersion(client: PoolClient): Promise<number> {
  const { rows } = await client.query<{ version: string | number }>("SELECT nextval('agentrun_queue_version_seq') AS version");
  const drawn = rows[0]?.version ?? 1;
  return Number(drawn);
}
/** Draws the next value of `agentrun_session_version_seq` as a number. */
private async nextSessionVersion(client: PoolClient): Promise<number> {
  const { rows } = await client.query<{ version: string | number }>("SELECT nextval('agentrun_session_version_seq') AS version");
  const drawn = rows[0]?.version ?? 1;
  return Number(drawn);
}
/** Loads the read cursor of one reader for one session, or `null` if none exists. */
private async getSessionReadCursor(sessionId: string, readerId: string): Promise<SessionReadCursorRecord | null> {
  const { rows } = await this.pool.query("SELECT * FROM agentrun_session_read_cursors WHERE session_id = $1 AND reader_id = $2", [sessionId, readerId]);
  const cursorRow = rows[0];
  if (!cursorRow) {
    return null;
  }
  return sessionReadCursorFromRow(cursorRow);
}
/**
 * Loads a reader's cursors for a set of sessions in one query, keyed by
 * session id. An empty id list short-circuits to an empty map without
 * querying.
 */
private async loadSessionReadCursors(readerId: string, sessionIds: string[]): Promise<Map<string, SessionReadCursorRecord>> {
  const bySession = new Map<string, SessionReadCursorRecord>();
  if (sessionIds.length === 0) {
    return bySession;
  }
  const { rows } = await this.pool.query("SELECT * FROM agentrun_session_read_cursors WHERE reader_id = $1 AND session_id = ANY($2::text[])", [readerId, sessionIds]);
  for (const row of rows) {
    const cursor = sessionReadCursorFromRow(row);
    bySession.set(cursor.sessionId, cursor);
  }
  return bySession;
}
/** Loads every queue task, or only those of `queue` when it is truthy. */
private async loadQueueTasksForProjection(queue?: string): Promise<QueueTaskRecord[]> {
  const result = queue
    ? await this.pool.query("SELECT * FROM agentrun_queue_tasks WHERE queue = $1", [queue])
    : await this.pool.query("SELECT * FROM agentrun_queue_tasks");
  return result.rows.map(queueTaskFromRow);
}
/**
 * Selects a run `FOR UPDATE` on the given client and returns it; throws a
 * 404 `AgentRunError` when the run does not exist.
 */
private async requireRunForUpdate(client: PoolClient, runId: string): Promise<RunRecord> {
  const { rows } = await client.query("SELECT * FROM agentrun_runs WHERE id = $1 FOR UPDATE", [runId]);
  const runRow = rows[0];
  if (runRow) {
    return runFromRow(runRow);
  }
  throw new AgentRunError("schema-invalid", `run ${runId} was not found`, { httpStatus: 404 });
}
/**
 * Resolves the session reference a new run should carry.
 *
 * Returns `null` when the run input names no session. An existing session is
 * locked, checked with `assertSessionBoundary` and turned into a session
 * ref. Otherwise a new idle session row is inserted from the input (title
 * derived from the ref's metadata, version from the session sequence) and
 * its ref is returned.
 */
private async resolveSessionForRun(client: PoolClient, input: CreateRunInput, at: string): Promise<SessionRef | null> {
  const requested = input.sessionRef;
  if (!requested) {
    return null;
  }
  const locked = await client.query("SELECT * FROM agentrun_sessions WHERE session_id = $1 FOR UPDATE", [requested.sessionId]);
  const lockedRow = locked.rows[0];
  if (lockedRow) {
    const known = sessionFromRow(lockedRow);
    assertSessionBoundary(known, input);
    return sessionRefFromRecord(known, requested);
  }

  // No row yet: create the session on behalf of this run.
  const version = await this.nextSessionVersion(client);
  const created: SessionRecord = {
    sessionId: requested.sessionId,
    tenantId: input.tenantId,
    projectId: input.projectId,
    backendProfile: input.backendProfile,
    conversationId: requested.conversationId ?? null,
    threadId: requested.threadId ?? null,
    metadata: requested.metadata ?? {},
    version,
    executionState: "idle",
    lastRunId: null,
    lastCommandId: null,
    activeRunId: null,
    activeCommandId: null,
    lastEventSeq: 0,
    terminalStatus: null,
    failureKind: null,
    title: titleFromMetadata(requested.metadata ?? {}),
    summary: {},
    lastActivityAt: at,
    createdAt: at,
    updatedAt: at,
    expiresAt: requested.expiresAt ?? null,
  };
  const insertSql = `INSERT INTO agentrun_sessions (session_id, tenant_id, project_id, backend_profile, conversation_id, thread_id, metadata, version, execution_state, last_run_id, last_command_id, active_run_id, active_command_id, last_event_seq, terminal_status, failure_kind, title, summary, last_activity_at, created_at, updated_at, expires_at)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18::jsonb, $19, $20, $21, $22)`;
  // Positional parameters follow the column list of the INSERT.
  const insertParams = [
    created.sessionId, created.tenantId, created.projectId, created.backendProfile, created.conversationId, created.threadId,
    JSON.stringify(created.metadata), created.version, created.executionState, created.lastRunId, created.lastCommandId,
    created.activeRunId, created.activeCommandId, created.lastEventSeq, created.terminalStatus, created.failureKind,
    created.title, JSON.stringify(created.summary), created.lastActivityAt, created.createdAt, created.updatedAt, created.expiresAt,
  ];
  await client.query(insertSql, insertParams);
  return sessionRefFromRecord(created, requested);
}
/**
 * Stores the backend thread id on the session of a run, creating the session
 * row when it does not exist yet, then rewrites the run's `session_ref` and
 * appends a "session-updated" backend_status event.
 *
 * Does nothing when the run carries no session id.
 */
private async upsertSessionThread(client: PoolClient, run: RunRecord, threadId: string, turnId: string | null): Promise<void> {
if (!run.sessionRef?.sessionId) return;
const at = nowIso();
const existingResult = await client.query("SELECT * FROM agentrun_sessions WHERE session_id = $1 FOR UPDATE", [run.sessionRef.sessionId]);
const existing = existingResult.rows[0] ? sessionFromRow(existingResult.rows[0]) : null;
// Metadata precedence: stored session metadata, then the run's session ref metadata, then lastTurnId (only when a turn id is given).
const metadata = { ...(existing?.metadata ?? {}), ...(run.sessionRef.metadata ?? {}), ...(turnId ? { lastTurnId: turnId } : {}) };
// Identity fields come from the run; execution fields are carried over from the existing row when there is one.
const record: SessionRecord = {
sessionId: run.sessionRef.sessionId,
tenantId: run.tenantId,
projectId: run.projectId,
backendProfile: run.backendProfile,
conversationId: run.sessionRef.conversationId ?? existing?.conversationId ?? null,
threadId,
metadata,
version: await this.nextSessionVersion(client),
executionState: existing?.executionState ?? "idle",
lastRunId: existing?.lastRunId ?? run.id,
lastCommandId: existing?.lastCommandId ?? null,
activeRunId: existing?.activeRunId ?? null,
activeCommandId: existing?.activeCommandId ?? null,
lastEventSeq: existing?.lastEventSeq ?? 0,
terminalStatus: existing?.terminalStatus ?? null,
failureKind: existing?.failureKind ?? null,
title: existing?.title ?? titleFromMetadata(run.sessionRef.metadata ?? {}),
summary: existing?.summary ?? {},
lastActivityAt: at,
createdAt: existing?.createdAt ?? at,
updatedAt: at,
expiresAt: run.sessionRef.expiresAt ?? existing?.expiresAt ?? null,
};
// On conflict every listed column except created_at is overwritten; storage columns are not part of this statement.
await client.query(
`INSERT INTO agentrun_sessions (session_id, tenant_id, project_id, backend_profile, conversation_id, thread_id, metadata, version, execution_state, last_run_id, last_command_id, active_run_id, active_command_id, last_event_seq, terminal_status, failure_kind, title, summary, last_activity_at, created_at, updated_at, expires_at)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18::jsonb, $19, $20, $21, $22)
ON CONFLICT (session_id) DO UPDATE SET
tenant_id = EXCLUDED.tenant_id,
project_id = EXCLUDED.project_id,
backend_profile = EXCLUDED.backend_profile,
conversation_id = EXCLUDED.conversation_id,
thread_id = EXCLUDED.thread_id,
metadata = EXCLUDED.metadata,
version = EXCLUDED.version,
execution_state = EXCLUDED.execution_state,
last_run_id = EXCLUDED.last_run_id,
last_command_id = EXCLUDED.last_command_id,
active_run_id = EXCLUDED.active_run_id,
active_command_id = EXCLUDED.active_command_id,
last_event_seq = EXCLUDED.last_event_seq,
terminal_status = EXCLUDED.terminal_status,
failure_kind = EXCLUDED.failure_kind,
title = EXCLUDED.title,
summary = EXCLUDED.summary,
last_activity_at = EXCLUDED.last_activity_at,
updated_at = EXCLUDED.updated_at,
expires_at = EXCLUDED.expires_at`,
[record.sessionId, record.tenantId, record.projectId, record.backendProfile, record.conversationId, record.threadId, JSON.stringify(record.metadata), record.version, record.executionState, record.lastRunId, record.lastCommandId, record.activeRunId, record.activeCommandId, record.lastEventSeq, record.terminalStatus, record.failureKind, record.title, JSON.stringify(record.summary), record.lastActivityAt, record.createdAt, record.updatedAt, record.expiresAt],
);
// Keep the run's stored session_ref in step with the session record just written.
const nextSessionRef = sessionRefFromRecord(record, run.sessionRef);
await client.query("UPDATE agentrun_runs SET session_ref = $2::jsonb, updated_at = $3 WHERE id = $1", [run.id, JSON.stringify(nextSessionRef), at]);
await this.appendEventWithLockedRun(client, run.id, "backend_status", { phase: "session-updated", sessionRef: summarizeSessionRef(nextSessionRef), turnId });
}
/**
 * Applies a partial update to the session of a run.
 *
 * Nothing happens when the run has no session id or the session row is
 * missing. Fields left `undefined` in the patch keep their stored value;
 * `lastActivityAt` falls back to the touch timestamp. When
 * `options.bumpVersion` is set a new version is drawn from the session
 * sequence, otherwise the stored version is written back.
 */
private async touchSessionForRun(client: PoolClient, run: RunRecord, patch: Partial<SessionRecord>, options: { bumpVersion: boolean; at?: string }): Promise<void> {
  const sessionId = run.sessionRef?.sessionId;
  if (!sessionId) {
    return;
  }
  const locked = await client.query("SELECT * FROM agentrun_sessions WHERE session_id = $1 FOR UPDATE", [sessionId]);
  const lockedRow = locked.rows[0];
  if (!lockedRow) {
    return;
  }
  const stored = sessionFromRow(lockedRow);
  const touchedAt = options.at ?? nowIso();
  const version = options.bumpVersion ? await this.nextSessionVersion(client) : stored.version;

  // An explicit null in the patch clears a field; only `undefined` means "keep".
  const keepUnlessSet = <V>(patched: V | undefined, current: V): V => (patched === undefined ? current : patched);

  const updateSql = `UPDATE agentrun_sessions SET
version = $2,
execution_state = $3,
last_run_id = $4,
last_command_id = $5,
active_run_id = $6,
active_command_id = $7,
last_event_seq = $8,
terminal_status = $9,
failure_kind = $10,
title = $11,
summary = $12::jsonb,
last_activity_at = $13,
updated_at = $14
WHERE session_id = $1`;
  const updateParams = [
    sessionId,
    version,
    patch.executionState ?? stored.executionState,
    keepUnlessSet(patch.lastRunId, stored.lastRunId),
    keepUnlessSet(patch.lastCommandId, stored.lastCommandId),
    keepUnlessSet(patch.activeRunId, stored.activeRunId),
    keepUnlessSet(patch.activeCommandId, stored.activeCommandId),
    patch.lastEventSeq ?? stored.lastEventSeq,
    keepUnlessSet(patch.terminalStatus, stored.terminalStatus),
    keepUnlessSet(patch.failureKind, stored.failureKind),
    keepUnlessSet(patch.title, stored.title),
    JSON.stringify(patch.summary ?? stored.summary),
    patch.lastActivityAt ?? touchedAt,
    touchedAt,
  ];
  await client.query(updateSql, updateParams);
}
/**
 * Determines which run's events to read for a session.
 *
 * An explicitly requested run must belong to the session; otherwise the
 * session's active run is used, falling back to its last run (possibly
 * `null`). Throws a 404 `AgentRunError` for an unknown session or a run of
 * another session.
 */
private async resolveSessionRunId(sessionId: string, requestedRunId: string | null): Promise<string | null> {
  const session = await this.getSession(sessionId);
  if (!session) {
    throw new AgentRunError("schema-invalid", `session ${sessionId} was not found`, { httpStatus: 404 });
  }
  if (!requestedRunId) {
    return session.activeRunId ?? session.lastRunId;
  }
  const requestedRun = await this.getRun(requestedRunId);
  const ownerSessionId = requestedRun.sessionRef?.sessionId;
  if (ownerSessionId !== sessionId) {
    throw new AgentRunError("schema-invalid", `run ${requestedRunId} does not belong to session ${sessionId}`, { httpStatus: 404 });
  }
  return requestedRunId;
}
/**
 * Runs `fn` inside a database transaction on a dedicated pooled client.
 *
 * The transaction is committed when `fn` resolves and rolled back when `fn`
 * (or BEGIN/COMMIT) throws; the original error is always the one rethrown.
 *
 * A failing ROLLBACK used to replace the original error and hand the
 * possibly broken connection back to the pool. The rollback failure is now
 * contained, and in that case the client is released with `release(true)` so
 * the pool discards the connection instead of reusing it.
 *
 * @param fn Work to perform with the transactional client.
 * @returns Whatever `fn` resolves to.
 * @throws The error raised by `fn`, BEGIN or COMMIT.
 */
private async withTransaction<T>(fn: (client: PoolClient) => Promise<T>): Promise<T> {
  const client = await this.pool.connect();
  let connectionBroken = false;
  try {
    await client.query("BEGIN");
    const result = await fn(client);
    await client.query("COMMIT");
    return result;
  } catch (error) {
    try {
      await client.query("ROLLBACK");
    } catch {
      // The connection state is unknown; never return it to the pool.
      connectionBroken = true;
    }
    throw error;
  } finally {
    if (connectionBroken) {
      client.release(true);
    } else {
      client.release();
    }
  }
}
}
/** Returns the hex SHA-256 digest of the SQL text with surrounding whitespace trimmed. */
function checksumSql(sql: string): string {
  const hasher = createHash("sha256");
  hasher.update(sql.trim());
  return hasher.digest("hex");
}
/** Returns the id of the last entry in `postgresMigrations`, or `"none"` when there is none. */
function latestMigrationId(): string {
  const lastIndex = postgresMigrations.length - 1;
  const newest = postgresMigrations[lastIndex];
  return newest?.id ?? "none";
}
/**
 * Restricts `value` to the range `[min, max]`.
 * The upper bound is applied first, so `min` wins when `min > max`.
 */
function clamp(value: number, min: number, max: number): number {
  const cappedAbove = Math.min(value, max);
  const cappedBelow = Math.max(min, cappedAbove);
  return cappedBelow;
}
/**
 * Maps an `agentrun_runs` row (snake_case columns) onto a `RunRecord`.
 * JSON columns are read through `jsonRecord`/`jsonValue`, timestamps through
 * `iso`/`nullableIso`.
 */
function runFromRow(row: QueryResultRow): RunRecord {
  const run: RunRecord = {
    // identity
    id: stringValue(row.id),
    tenantId: stringValue(row.tenant_id),
    projectId: stringValue(row.project_id),
    // references and configuration
    workspaceRef: jsonRecord(row.workspace_ref) as RunRecord["workspaceRef"],
    sessionRef: jsonValue(row.session_ref) as RunRecord["sessionRef"],
    resourceBundleRef: jsonValue(row.resource_bundle_ref) as RunRecord["resourceBundleRef"],
    providerId: stringValue(row.provider_id),
    backendProfile: stringValue(row.backend_profile) as BackendProfile,
    executionPolicy: jsonRecord(row.execution_policy) as RunRecord["executionPolicy"],
    traceSink: jsonValue(row.trace_sink),
    // outcome
    status: stringValue(row.status) as RunStatus,
    terminalStatus: nullableString(row.terminal_status) as TerminalStatus | null,
    failureKind: nullableString(row.failure_kind) as FailureKind | null,
    failureMessage: nullableString(row.failure_message),
    // timestamps and claim
    createdAt: iso(row.created_at),
    updatedAt: iso(row.updated_at),
    claimedBy: nullableString(row.claimed_by),
    leaseExpiresAt: nullableIso(row.lease_expires_at),
  };
  return run;
}
/**
 * Converts a query result row into a `CommandRecord`.
 *
 * `idempotencyKey` is only present on the result when the column holds a
 * non-empty value; `acknowledgedAt` is `null` when the column is null.
 *
 * @param row Row holding the command columns.
 * @returns The mapped command record.
 */
function commandFromRow(row: QueryResultRow): CommandRecord {
  const idempotencyKey = nullableString(row.idempotency_key);
  // Omit the key entirely (rather than setting it to null) when it is absent or empty.
  const optionalKey = idempotencyKey ? { idempotencyKey } : {};
  return {
    id: stringValue(row.id),
    runId: stringValue(row.run_id),
    seq: Number(row.seq),
    type: stringValue(row.type) as CommandRecord["type"],
    payload: jsonRecord(row.payload),
    payloadHash: stringValue(row.payload_hash),
    ...optionalKey,
    state: stringValue(row.state) as CommandState,
    createdAt: iso(row.created_at),
    updatedAt: iso(row.updated_at),
    acknowledgedAt: nullableIso(row.acknowledged_at),
  };
}
/**
 * Converts a query result row into a `RunEvent`.
 *
 * @param row Row holding the event columns (`id`, `run_id`, `seq`, `type`, `payload`, `created_at`).
 * @returns The mapped event.
 */
function eventFromRow(row: QueryResultRow): RunEvent {
  const id = stringValue(row.id);
  const runId = stringValue(row.run_id);
  const seq = Number(row.seq);
  const type = stringValue(row.type) as EventType;
  const payload = jsonRecord(row.payload);
  const createdAt = iso(row.created_at);
  return { id, runId, seq, type, payload, createdAt };
}
/**
 * Converts a query result row into a `RunnerRecord`.
 *
 * The `metadata` JSON object is spread first, so the dedicated columns listed
 * after it override any same-named metadata keys. Optional columns
 * (`run_id`, `attempt_id`, `backend_profile`, `placement`, `source_commit`)
 * only appear on the result when they hold a non-empty value.
 *
 * @param row Row holding the runner columns.
 * @returns The mapped runner record.
 */
function runnerFromRow(row: QueryResultRow): RunnerRecord {
  const runId = nullableString(row.run_id);
  const attemptId = nullableString(row.attempt_id);
  const backendProfile = nullableString(row.backend_profile);
  const placement = nullableString(row.placement);
  const sourceCommit = nullableString(row.source_commit);
  return {
    ...jsonRecord(row.metadata),
    id: stringValue(row.id),
    ...(runId ? { runId } : {}),
    ...(attemptId ? { attemptId } : {}),
    ...(backendProfile ? { backendProfile: backendProfile as BackendProfile } : {}),
    ...(placement ? { placement } : {}),
    ...(sourceCommit ? { sourceCommit } : {}),
    registeredAt: iso(row.registered_at),
    heartbeatAt: iso(row.heartbeat_at),
  };
}
/**
 * Converts a query result row into a `SessionRecord`.
 *
 * Defaults applied when a column is null or undefined:
 * `version` → 1, `lastEventSeq` → 0, `storageKind` → "none",
 * `codexRolloutSubdir` → "sessions". `executionState` is normalized by
 * `sessionExecutionState`. The storage counters (`storageSizeBytes`,
 * `storageFilesCount`) stay `null` when the column is absent and are
 * otherwise converted with `Number`.
 *
 * @param row Row holding the session columns.
 * @returns The mapped session record.
 */
function sessionFromRow(row: QueryResultRow): SessionRecord {
  // Null-preserving numeric conversion shared by the storage counters.
  const numberOrNull = (value: unknown): number | null => (value == null ? null : Number(value));
  return {
    sessionId: stringValue(row.session_id),
    tenantId: stringValue(row.tenant_id),
    projectId: stringValue(row.project_id),
    backendProfile: stringValue(row.backend_profile) as BackendProfile,
    conversationId: nullableString(row.conversation_id),
    threadId: nullableString(row.thread_id),
    metadata: jsonRecord(row.metadata),
    version: Number(row.version ?? 1),
    executionState: sessionExecutionState(row.execution_state),
    lastRunId: nullableString(row.last_run_id),
    lastCommandId: nullableString(row.last_command_id),
    activeRunId: nullableString(row.active_run_id),
    activeCommandId: nullableString(row.active_command_id),
    lastEventSeq: Number(row.last_event_seq ?? 0),
    terminalStatus: nullableString(row.terminal_status) as TerminalStatus | null,
    failureKind: nullableString(row.failure_kind) as FailureKind | null,
    title: nullableString(row.title),
    summary: jsonRecord(row.summary),
    lastActivityAt: nullableIso(row.last_activity_at),
    createdAt: iso(row.created_at),
    updatedAt: iso(row.updated_at),
    expiresAt: nullableIso(row.expires_at),
    // stringValue always yields a string, so the column default is the only fallback needed.
    storageKind: stringValue(row.storage_kind ?? "none") as SessionRecord["storageKind"],
    storagePvcName: nullableString(row.storage_pvc_name),
    storageNamespace: nullableString(row.storage_namespace),
    storageSizeBytes: numberOrNull(row.storage_size_bytes),
    storageFilesCount: numberOrNull(row.storage_files_count),
    storageSha256: nullableString(row.storage_sha256),
    storagePvcPhase: nullableString(row.storage_pvc_phase),
    storageUpdatedAt: nullableIso(row.storage_updated_at),
    storageEvictedAt: nullableIso(row.storage_evicted_at),
    codexRolloutSubdir: stringValue(row.codex_rollout_subdir ?? "sessions"),
  };
}
/**
 * Converts a query result row into a `SessionReadCursorRecord`.
 *
 * @param row Row holding `session_id`, `reader_id`, `session_version` and `read_at`.
 * @returns The mapped read cursor.
 */
function sessionReadCursorFromRow(row: QueryResultRow): SessionReadCursorRecord {
  const sessionId = stringValue(row.session_id);
  const readerId = stringValue(row.reader_id);
  const sessionVersion = Number(row.session_version);
  const readAt = iso(row.read_at);
  return { sessionId, readerId, sessionVersion, readAt };
}
/**
 * Normalizes a stored execution-state value.
 *
 * @param value Raw value, typically read from a row.
 * @returns `"running"` or `"terminal"` when `value` is exactly that string; `"idle"` for anything else.
 */
function sessionExecutionState(value: unknown): SessionRecord["executionState"] {
  if (value === "running") return "running";
  if (value === "terminal") return "terminal";
  return "idle";
}
/**
 * Converts a query result row into a `RunnerJobRecord`.
 *
 * `idempotencyKey` and `serviceAccountName` are `null` when their columns are
 * null; every other text column is coerced with `stringValue`.
 *
 * @param row Row holding the runner-job columns.
 * @returns The mapped runner job record.
 */
function runnerJobFromRow(row: QueryResultRow): RunnerJobRecord {
  const identity = {
    id: stringValue(row.id),
    runId: stringValue(row.run_id),
    commandId: stringValue(row.command_id),
    idempotencyKey: nullableString(row.idempotency_key),
    payloadHash: stringValue(row.payload_hash),
    attemptId: stringValue(row.attempt_id),
    runnerId: stringValue(row.runner_id),
  };
  const placement = {
    namespace: stringValue(row.namespace),
    jobName: stringValue(row.job_name),
    managerUrl: stringValue(row.manager_url),
    image: stringValue(row.image),
    sourceCommit: stringValue(row.source_commit),
    serviceAccountName: nullableString(row.service_account_name),
  };
  const bookkeeping = {
    result: jsonRecord(row.result),
    createdAt: iso(row.created_at),
    updatedAt: iso(row.updated_at),
  };
  return { ...identity, ...placement, ...bookkeeping };
}
/**
 * Converts a query result row into a `QueueTaskRecord`.
 *
 * `references` is read from the `references_json` column and falls back to an
 * empty array when it is not an array. `idempotencyKey` is only present on the
 * result when the column holds a non-empty value.
 *
 * @param row Row holding the queue-task columns.
 * @returns The mapped queue task record.
 */
function queueTaskFromRow(row: QueryResultRow): QueueTaskRecord {
  const idempotencyKey = nullableString(row.idempotency_key);
  const routing = {
    id: stringValue(row.id),
    tenantId: stringValue(row.tenant_id),
    projectId: stringValue(row.project_id),
    queue: stringValue(row.queue),
    lane: stringValue(row.lane),
    title: stringValue(row.title),
    priority: Number(row.priority),
    state: stringValue(row.state) as QueueTaskState,
    version: Number(row.version),
  };
  const execution = {
    backendProfile: stringValue(row.backend_profile) as BackendProfile,
    providerId: nullableString(row.provider_id),
    workspaceRef: jsonValue(row.workspace_ref) as QueueTaskRecord["workspaceRef"],
    executionPolicy: jsonValue(row.execution_policy) as QueueTaskRecord["executionPolicy"],
    resourceBundleRef: jsonValue(row.resource_bundle_ref) as QueueTaskRecord["resourceBundleRef"],
  };
  const content = {
    payload: jsonRecord(row.payload),
    payloadHash: stringValue(row.payload_hash),
    references: jsonArray(row.references_json) as JsonRecord[],
    metadata: jsonRecord(row.metadata),
  };
  const lifecycle = {
    latestAttempt: jsonValue(row.latest_attempt) as QueueTaskRecord["latestAttempt"],
    sessionPath: nullableString(row.session_path),
    createdAt: iso(row.created_at),
    updatedAt: iso(row.updated_at),
    cancelledAt: nullableIso(row.cancelled_at),
    cancelReason: nullableString(row.cancel_reason),
  };
  return {
    ...routing,
    ...execution,
    ...content,
    // Omit the key entirely when it is absent or empty.
    ...(idempotencyKey ? { idempotencyKey } : {}),
    ...lifecycle,
  };
}
/**
 * Extracts the free-form part of a runner record for storage as metadata.
 *
 * Every property except `id`, `runId`, `attemptId`, `backendProfile`,
 * `placement`, `sourceCommit`, `registeredAt` and `heartbeatAt` is kept, and
 * the remainder is passed through `redactJson` before being returned.
 *
 * @param runner Runner record to split.
 * @returns The redacted remainder of the record.
 */
function metadataForRunner(runner: RunnerRecord): JsonRecord {
  // The named fields are destructured only so the rest pattern leaves them out.
  const {
    id: _omitId,
    runId: _omitRunId,
    attemptId: _omitAttemptId,
    backendProfile: _omitBackendProfile,
    placement: _omitPlacement,
    sourceCommit: _omitSourceCommit,
    registeredAt: _omitRegisteredAt,
    heartbeatAt: _omitHeartbeatAt,
    ...extras
  } = runner;
  return redactJson(extras);
}
/**
 * Coerces any value to a string.
 *
 * @param value Value to coerce.
 * @returns `value` itself when it is already a string, `""` for `null`/`undefined`,
 *   otherwise `String(value)`.
 */
function stringValue(value: unknown): string {
  if (typeof value === "string") {
    return value;
  }
  return String(value ?? "");
}
/**
 * Coerces a value to a string while preserving absence.
 *
 * @param value Value to coerce.
 * @returns `null` when `value` is `null` or `undefined`, otherwise `stringValue(value)`.
 */
function nullableString(value: unknown): string | null {
  if (value == null) {
    return null;
  }
  return stringValue(value);
}
/**
 * Treats a raw value as a `JsonValue`.
 * No validation is performed; the value is only cast.
 *
 * @param value Raw value.
 * @returns `null` when `value` is `undefined`, otherwise `value` unchanged.
 */
function jsonValue(value: unknown): JsonValue {
  return value === undefined ? null : (value as JsonValue);
}
/**
 * Treats a raw value as a JSON object.
 *
 * @param value Raw value.
 * @returns `value` itself when it is a non-null, non-array object; otherwise a new empty object.
 */
function jsonRecord(value: unknown): JsonRecord {
  if (value === null || typeof value !== "object" || Array.isArray(value)) {
    return {};
  }
  return value as JsonRecord;
}
/**
 * Treats a raw value as a JSON array.
 *
 * @param value Raw value.
 * @returns `value` itself when it is an array; otherwise a new empty array.
 */
function jsonArray(value: unknown): JsonValue[] {
  if (!Array.isArray(value)) {
    return [];
  }
  return value as JsonValue[];
}
/**
 * Normalizes a timestamp-like value to an ISO-8601 string.
 *
 * @param value A `Date`, or any value whose string form `Date` can parse.
 * @returns The result of `toISOString()` on the resulting date.
 * @throws RangeError (from `toISOString`) when the value does not yield a valid date.
 */
function iso(value: unknown): string {
  // Strings are their own string form, so one String() conversion covers
  // both string and non-string inputs.
  const parsed = value instanceof Date ? value : new Date(String(value));
  return parsed.toISOString();
}
/**
 * Normalizes an optional timestamp-like value to an ISO-8601 string.
 *
 * @param value Timestamp-like value, or `null`/`undefined`.
 * @returns `null` when `value` is `null` or `undefined`, otherwise `iso(value)`.
 */
function nullableIso(value: unknown): string | null {
  if (value == null) {
    return null;
  }
  return iso(value);
}