Files
pikasTech-agentrun/src/mgr/postgres-store.ts
T
2026-06-02 08:11:15 +08:00

980 lines
49 KiB
TypeScript

import { createHash } from "node:crypto";
import { Pool } from "pg";
import type { PoolClient, QueryResultRow } from "pg";
import { AgentRunError } from "../common/errors.js";
import { redactJson } from "../common/redaction.js";
import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js";
import { newId, nowIso, stableHash } from "../common/validation.js";
import type { AgentRunStore, ListQueueTasksInput, SaveRunnerJobInput, StoreHealth, UpdateQueueTaskAttemptInput } from "./store.js";
import { assertSessionBoundary, buildQueueStats, clampQueueLimit, commandStateFromTerminal, isLeaseExpired, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, queueTaskSort, sessionRefFromRecord, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js";
import { backendCapabilitiesSqlValues, mergeBackendCapability } from "../common/backend-profiles.js";
import { normalizeRunEventPayload, requireEventType } from "../common/events.js";
/** Settings required to construct the Postgres-backed store. */
interface PostgresStoreOptions {
  /** Connection string passed to the `pg` connection pool. */
  connectionString: string;
}
/** One entry of the ordered schema-migration list. */
interface MigrationDefinition {
  /** Identifier stored in agentrun_schema_migrations.id. */
  id: string;
  /** Checksum recorded next to the id and compared on later startups. */
  checksum: string;
  /** SQL text executed when the migration has not been applied yet. */
  sql: string;
}
// Migration 001: base schema. Creates the runs, commands, events, runners,
// backends and leases tables plus their indexes, and upserts the built-in
// 'codex' backend row.
// NOTE: postgresMigrations records a checksum computed from this exact string and
// migrate() throws when a stored checksum differs, so any edit to the text
// (including whitespace) is seen as a mismatch on databases that already applied it.
const initialMigrationSql = `
CREATE TABLE IF NOT EXISTS agentrun_runs (
id text PRIMARY KEY,
tenant_id text NOT NULL,
project_id text NOT NULL,
workspace_ref jsonb NOT NULL,
provider_id text NOT NULL,
backend_profile text NOT NULL,
execution_policy jsonb NOT NULL,
trace_sink jsonb,
status text NOT NULL,
terminal_status text,
failure_kind text,
failure_message text,
created_at timestamptz NOT NULL,
updated_at timestamptz NOT NULL,
claimed_by text,
lease_expires_at timestamptz
);
CREATE TABLE IF NOT EXISTS agentrun_commands (
id text PRIMARY KEY,
run_id text NOT NULL REFERENCES agentrun_runs(id) ON DELETE CASCADE,
seq integer NOT NULL,
type text NOT NULL,
payload jsonb NOT NULL,
payload_hash text NOT NULL,
idempotency_key text,
state text NOT NULL,
created_at timestamptz NOT NULL,
updated_at timestamptz NOT NULL,
acknowledged_at timestamptz,
UNIQUE (run_id, seq)
);
CREATE UNIQUE INDEX IF NOT EXISTS agentrun_commands_run_idempotency_key_idx
ON agentrun_commands (run_id, idempotency_key)
WHERE idempotency_key IS NOT NULL;
CREATE TABLE IF NOT EXISTS agentrun_events (
id text PRIMARY KEY,
run_id text NOT NULL REFERENCES agentrun_runs(id) ON DELETE CASCADE,
seq integer NOT NULL,
type text NOT NULL,
payload jsonb NOT NULL,
created_at timestamptz NOT NULL,
UNIQUE (run_id, seq)
);
CREATE TABLE IF NOT EXISTS agentrun_runners (
id text PRIMARY KEY,
run_id text,
attempt_id text,
backend_profile text,
placement text,
source_commit text,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
registered_at timestamptz NOT NULL,
heartbeat_at timestamptz NOT NULL
);
CREATE TABLE IF NOT EXISTS agentrun_backends (
profile text PRIMARY KEY,
capabilities jsonb NOT NULL,
capacity jsonb NOT NULL,
health jsonb NOT NULL,
updated_at timestamptz NOT NULL
);
CREATE TABLE IF NOT EXISTS agentrun_leases (
run_id text PRIMARY KEY REFERENCES agentrun_runs(id) ON DELETE CASCADE,
runner_id text NOT NULL,
lease_expires_at timestamptz NOT NULL,
stale_recovery_marker jsonb,
updated_at timestamptz NOT NULL
);
CREATE INDEX IF NOT EXISTS agentrun_runs_status_idx ON agentrun_runs (status, updated_at);
CREATE INDEX IF NOT EXISTS agentrun_events_run_seq_idx ON agentrun_events (run_id, seq);
CREATE INDEX IF NOT EXISTS agentrun_commands_run_seq_idx ON agentrun_commands (run_id, seq);
CREATE INDEX IF NOT EXISTS agentrun_leases_expiry_idx ON agentrun_leases (lease_expires_at);
INSERT INTO agentrun_backends (profile, capabilities, capacity, health, updated_at)
VALUES (
'codex',
'{"protocol":"codex-app-server-jsonrpc-stdio","transport":"stdio","command":"codex app-server --listen stdio://"}'::jsonb,
'{"mode":"manual-runner-v0.1"}'::jsonb,
'{"status":"registered"}'::jsonb,
now()
)
ON CONFLICT (profile) DO UPDATE SET
capabilities = EXCLUDED.capabilities,
capacity = EXCLUDED.capacity,
health = EXCLUDED.health,
updated_at = EXCLUDED.updated_at;
`;
// Migration 002: upserts the agentrun_backends rows for the "codex" and
// "deepseek" profiles. The VALUES list is produced by backendCapabilitiesSqlValues.
// NOTE(review): because the text is interpolated from that helper, a change in the
// helper's output would presumably change this migration's checksum as well — confirm
// the helper output is treated as frozen for already-released profiles.
const backendProfilesMigrationSql = `
INSERT INTO agentrun_backends (profile, capabilities, capacity, health, updated_at)
VALUES ${backendCapabilitiesSqlValues(["codex", "deepseek"])}
ON CONFLICT (profile) DO UPDATE SET
capabilities = EXCLUDED.capabilities,
capacity = EXCLUDED.capacity,
health = EXCLUDED.health,
updated_at = EXCLUDED.updated_at;
`;
// Migration 005: upserts the agentrun_backends row for the "minimax-m3" profile,
// using the VALUES list produced by backendCapabilitiesSqlValues.
const minimaxM3BackendProfileMigrationSql = `
INSERT INTO agentrun_backends (profile, capabilities, capacity, health, updated_at)
VALUES ${backendCapabilitiesSqlValues(["minimax-m3"])}
ON CONFLICT (profile) DO UPDATE SET
capabilities = EXCLUDED.capabilities,
capacity = EXCLUDED.capacity,
health = EXCLUDED.health,
updated_at = EXCLUDED.updated_at;
`;
// Migration 003: adds the session_ref and resource_bundle_ref columns to
// agentrun_runs and creates the agentrun_sessions and agentrun_runner_jobs
// tables with their indexes (including a partial unique index on
// (run_id, idempotency_key) for runner jobs).
const hwlabManualDispatchMigrationSql = `
ALTER TABLE agentrun_runs ADD COLUMN IF NOT EXISTS session_ref jsonb;
ALTER TABLE agentrun_runs ADD COLUMN IF NOT EXISTS resource_bundle_ref jsonb;
CREATE TABLE IF NOT EXISTS agentrun_sessions (
session_id text PRIMARY KEY,
tenant_id text NOT NULL,
project_id text NOT NULL,
backend_profile text NOT NULL,
conversation_id text,
thread_id text,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
created_at timestamptz NOT NULL,
updated_at timestamptz NOT NULL,
expires_at timestamptz
);
CREATE TABLE IF NOT EXISTS agentrun_runner_jobs (
id text PRIMARY KEY,
run_id text NOT NULL REFERENCES agentrun_runs(id) ON DELETE CASCADE,
command_id text NOT NULL REFERENCES agentrun_commands(id) ON DELETE CASCADE,
idempotency_key text,
payload_hash text NOT NULL,
attempt_id text NOT NULL,
runner_id text NOT NULL,
namespace text NOT NULL,
job_name text NOT NULL,
manager_url text NOT NULL,
image text NOT NULL,
source_commit text NOT NULL,
service_account_name text,
result jsonb NOT NULL,
created_at timestamptz NOT NULL,
updated_at timestamptz NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS agentrun_runner_jobs_run_idempotency_key_idx
ON agentrun_runner_jobs (run_id, idempotency_key)
WHERE idempotency_key IS NOT NULL;
CREATE INDEX IF NOT EXISTS agentrun_runner_jobs_run_command_idx ON agentrun_runner_jobs (run_id, command_id, created_at);
CREATE INDEX IF NOT EXISTS agentrun_sessions_tenant_project_idx ON agentrun_sessions (tenant_id, project_id, backend_profile, updated_at);
`;
// Migration 004: queue support. Creates agentrun_queue_tasks (with a partial
// unique index on (tenant_id, project_id, idempotency_key)), the per-reader
// agentrun_queue_read_cursors table, and the agentrun_queue_version_seq sequence.
const queueQ1MigrationSql = `
CREATE TABLE IF NOT EXISTS agentrun_queue_tasks (
id text PRIMARY KEY,
tenant_id text NOT NULL,
project_id text NOT NULL,
queue text NOT NULL,
lane text NOT NULL,
title text NOT NULL,
priority integer NOT NULL,
state text NOT NULL,
version bigint NOT NULL,
backend_profile text NOT NULL,
provider_id text,
workspace_ref jsonb,
execution_policy jsonb,
resource_bundle_ref jsonb,
payload jsonb NOT NULL,
payload_hash text NOT NULL,
references_json jsonb NOT NULL DEFAULT '[]'::jsonb,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
idempotency_key text,
latest_attempt jsonb,
session_path text,
created_at timestamptz NOT NULL,
updated_at timestamptz NOT NULL,
cancelled_at timestamptz,
cancel_reason text
);
CREATE UNIQUE INDEX IF NOT EXISTS agentrun_queue_tasks_idempotency_idx
ON agentrun_queue_tasks (tenant_id, project_id, idempotency_key)
WHERE idempotency_key IS NOT NULL;
CREATE INDEX IF NOT EXISTS agentrun_queue_tasks_list_idx ON agentrun_queue_tasks (queue, state, version, priority, created_at);
CREATE INDEX IF NOT EXISTS agentrun_queue_tasks_updated_idx ON agentrun_queue_tasks (version, updated_at);
CREATE TABLE IF NOT EXISTS agentrun_queue_read_cursors (
task_id text NOT NULL REFERENCES agentrun_queue_tasks(id) ON DELETE CASCADE,
reader_id text NOT NULL,
task_version bigint NOT NULL,
read_at timestamptz NOT NULL,
PRIMARY KEY (task_id, reader_id)
);
CREATE SEQUENCE IF NOT EXISTS agentrun_queue_version_seq;
`;
/**
 * Ordered list of schema migrations. migrate() walks the list from first to
 * last; each entry pairs a migration id with its SQL and a checksum computed
 * from that SQL by checksumSql.
 */
const postgresMigrations: MigrationDefinition[] = (
  [
    ["001_v01_initial_durable_store", initialMigrationSql],
    ["002_v01_backend_profiles", backendProfilesMigrationSql],
    ["003_v01_hwlab_manual_dispatch", hwlabManualDispatchMigrationSql],
    ["004_v01_queue_q1", queueQ1MigrationSql],
    ["005_v01_minimax_m3_backend_profile", minimaxM3BackendProfileMigrationSql],
  ] as ReadonlyArray<readonly [string, string]>
).map(([id, sql]) => ({ id, checksum: checksumSql(sql), sql }));
/**
 * Describes the schema this module expects: the known migration ids, the
 * latest one, the tables that must exist, and the checksum per migration id.
 *
 * @returns A JSON-serialisable summary of the migration contract.
 */
export function postgresMigrationContract(): JsonRecord {
  const migrationIds: string[] = [];
  const checksums: { [k: string]: string } = {};
  for (const { id, checksum } of postgresMigrations) {
    migrationIds.push(id);
    checksums[id] = checksum;
  }
  const requiredTables = [
    "agentrun_schema_migrations",
    "agentrun_runs",
    "agentrun_commands",
    "agentrun_events",
    "agentrun_runners",
    "agentrun_backends",
    "agentrun_leases",
    "agentrun_sessions",
    "agentrun_runner_jobs",
    "agentrun_queue_tasks",
    "agentrun_queue_read_cursors",
  ];
  return {
    migrationIds,
    latestMigrationId: latestMigrationId(),
    requiredTables,
    checksums,
  };
}
/**
 * Builds a Postgres-backed store and runs its schema migrations before
 * handing it to the caller.
 *
 * @param options Connection settings for the store.
 * @returns The store, after migrate() has resolved.
 */
export async function createPostgresAgentRunStore(options: PostgresStoreOptions): Promise<PostgresAgentRunStore> {
  const migratedStore = new PostgresAgentRunStore(options);
  await migratedStore.migrate();
  return migratedStore;
}
export class PostgresAgentRunStore implements AgentRunStore {
// Connection pool created in the constructor and used by the store's queries.
private readonly pool: Pool;
// Flipped to true at the end of migrate(); health() reports "not ready" until then.
private migrationReady = false;
// Latest migration id recorded by migrate(); health() returns it when the database cannot be reached.
private appliedMigrationId: string | null = null;
/**
 * Creates the connection pool. No query is issued here; call migrate()
 * before using the store.
 */
constructor(options: PostgresStoreOptions) {
  const { connectionString } = options;
  this.pool = new Pool({ connectionString, application_name: "agentrun-mgr-v01" });
}
/**
 * Applies every pending schema migration inside a single transaction and
 * then marks this store instance as migration-ready.
 *
 * A transaction-scoped advisory lock is taken first so that concurrent
 * callers (for example several manager processes starting together) run the
 * sequence one at a time. Without it two callers can both see a migration as
 * missing — `FOR UPDATE` cannot lock a row that does not exist yet — and the
 * slower one fails on the primary key of agentrun_schema_migrations.
 *
 * @throws AgentRunError "infra-failed" (HTTP 503) when the checksum stored
 *   for a migration differs from the checksum of the bundled SQL.
 */
async migrate(): Promise<void> {
  await this.withTransaction(async (client) => {
    // Arbitrary constant key reserved for schema migration; the lock is
    // released automatically when the transaction commits or rolls back.
    await client.query("SELECT pg_advisory_xact_lock(4815162342)");
    await client.query(`
CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
id text PRIMARY KEY,
checksum text NOT NULL,
applied_at timestamptz NOT NULL DEFAULT now()
)
`);
    for (const migration of postgresMigrations) {
      const existing = await client.query<{ checksum: string }>("SELECT checksum FROM agentrun_schema_migrations WHERE id = $1 FOR UPDATE", [migration.id]);
      if (existing.rowCount) {
        // Already applied: only verify that the bundled SQL has not drifted.
        if (existing.rows[0]?.checksum !== migration.checksum) {
          throw new AgentRunError("infra-failed", `schema migration checksum mismatch for ${migration.id}`, { httpStatus: 503, details: { migrationId: migration.id } });
        }
        continue;
      }
      await client.query(migration.sql);
      await client.query("INSERT INTO agentrun_schema_migrations (id, checksum) VALUES ($1, $2)", [migration.id, migration.checksum]);
    }
  });
  this.migrationReady = true;
  this.appliedMigrationId = latestMigrationId();
}
/**
 * Probes the database and reports whether the schema is at the latest
 * migration. Never throws: a failing query is reported as unreachable with
 * the error message.
 *
 * @returns Health snapshot; `ready` is true only when migrate() has run in
 *   this process and the newest recorded migration id equals latestMigrationId().
 */
async health(): Promise<StoreHealth> {
  try {
    await this.pool.query("SELECT 1");
    const newest = await this.pool.query<{ id: string }>("SELECT id FROM agentrun_schema_migrations ORDER BY applied_at DESC, id DESC LIMIT 1");
    const [newestRow] = newest.rows;
    const migrationId = newestRow?.id ?? null;
    const migrationReady = this.migrationReady && migrationId === latestMigrationId();
    if (migrationReady) {
      return { adapter: "postgres", ready: migrationReady, reachable: true, migrationReady, migrationId, failureKind: null, message: null, credentialValuesPrinted: false };
    }
    return { adapter: "postgres", ready: migrationReady, reachable: true, migrationReady, migrationId, failureKind: "infra-failed", message: "schema migration is not ready", credentialValuesPrinted: false };
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    return { adapter: "postgres", ready: false, reachable: false, migrationReady: false, migrationId: this.appliedMigrationId, failureKind: "infra-failed", message, credentialValuesPrinted: false };
  }
}
/**
 * Inserts a new run in state "pending" and records a "run-created"
 * backend_status event, all in one transaction.
 *
 * @param input Caller-supplied run attributes; the session reference is
 *   resolved through resolveSessionForRun before the insert.
 * @returns The run record as written.
 */
async createRun(input: CreateRunInput): Promise<RunRecord> {
  const at = nowIso();
  return this.withTransaction(async (client) => {
    const sessionRef = await this.resolveSessionForRun(client, input, at);
    const run: RunRecord = {
      ...input,
      sessionRef,
      resourceBundleRef: input.resourceBundleRef ?? null,
      id: newId("run"),
      status: "pending",
      terminalStatus: null,
      failureKind: null,
      failureMessage: null,
      createdAt: at,
      updatedAt: at,
      claimedBy: null,
      leaseExpiresAt: null,
    };
    const insertSql = `INSERT INTO agentrun_runs (id, tenant_id, project_id, workspace_ref, session_ref, resource_bundle_ref, provider_id, backend_profile, execution_policy, trace_sink, status, terminal_status, failure_kind, failure_message, created_at, updated_at, claimed_by, lease_expires_at)
VALUES ($1, $2, $3, $4::jsonb, $5::jsonb, $6::jsonb, $7, $8, $9::jsonb, $10::jsonb, $11, $12, $13, $14, $15, $16, $17, $18)`;
    // Values are listed in the same order as the column list above.
    const insertValues = [
      run.id,
      run.tenantId,
      run.projectId,
      JSON.stringify(run.workspaceRef),
      JSON.stringify(run.sessionRef),
      JSON.stringify(run.resourceBundleRef),
      run.providerId,
      run.backendProfile,
      JSON.stringify(run.executionPolicy),
      JSON.stringify(run.traceSink),
      run.status,
      run.terminalStatus,
      run.failureKind,
      run.failureMessage,
      run.createdAt,
      run.updatedAt,
      run.claimedBy,
      run.leaseExpiresAt,
    ];
    await client.query(insertSql, insertValues);
    await this.appendEventWithLockedRun(client, run.id, "backend_status", {
      phase: "run-created",
      backendProfile: run.backendProfile,
      sessionRef: summarizeSessionRef(run.sessionRef ?? null),
      resourceBundleRef: summarizeResourceBundleRef(run.resourceBundleRef ?? null),
    });
    return run;
  });
}
/**
 * Loads a run by id.
 *
 * @throws AgentRunError "schema-invalid" (HTTP 404) when no such run exists.
 */
async getRun(runId: string): Promise<RunRecord> {
  const { rows } = await this.pool.query("SELECT * FROM agentrun_runs WHERE id = $1", [runId]);
  const [found] = rows;
  if (found) return runFromRow(found);
  throw new AgentRunError("schema-invalid", `run ${runId} was not found`, { httpStatus: 404 });
}
/**
 * Returns the events of a run with seq greater than `afterSeq`, in ascending
 * seq order. The page size is `limit` passed through clamp(limit, 1, 500).
 *
 * @throws AgentRunError (HTTP 404) via getRun when the run does not exist.
 */
async listEvents(runId: string, afterSeq: number, limit: number): Promise<RunEvent[]> {
  // Existence check only; the returned record is not needed.
  await this.getRun(runId);
  const pageSize = clamp(limit, 1, 500);
  const page = await this.pool.query("SELECT * FROM agentrun_events WHERE run_id = $1 AND seq > $2 ORDER BY seq ASC LIMIT $3", [runId, afterSeq, pageSize]);
  return page.rows.map(eventFromRow);
}
/**
 * Appends a command to a run and records a "command-created" event.
 *
 * When `input.idempotencyKey` matches an existing command of the run, that
 * command is returned instead of inserting a new one, provided the payload
 * hash is unchanged.
 *
 * @throws AgentRunError (HTTP 409) when the run is already terminal, or when
 *   the idempotency key is reused with a different payload.
 */
async createCommand(runId: string, input: CreateCommandInput): Promise<CommandRecord> {
  const payloadHash = stableHash(input.payload);
  return this.withTransaction(async (client) => {
    const run = await this.requireRunForUpdate(client, runId);
    if (isTerminalRunStatus(run.status)) {
      const fallbackKind = run.status === "cancelled" ? "cancelled" : "schema-invalid";
      throw new AgentRunError(run.failureKind ?? fallbackKind, `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 });
    }
    if (input.idempotencyKey) {
      const prior = await client.query("SELECT * FROM agentrun_commands WHERE run_id = $1 AND idempotency_key = $2", [runId, input.idempotencyKey]);
      const [priorRow] = prior.rows;
      if (priorRow) {
        const replayed = commandFromRow(priorRow);
        if (replayed.payloadHash !== payloadHash) {
          throw new AgentRunError("schema-invalid", "idempotency key reused with different payload", { httpStatus: 409 });
        }
        return replayed;
      }
    }
    const seq = await this.nextSeq(client, "agentrun_commands", runId);
    const at = nowIso();
    const command: CommandRecord = {
      ...input,
      id: newId("cmd"),
      runId,
      seq,
      state: "pending",
      payloadHash,
      createdAt: at,
      updatedAt: at,
      acknowledgedAt: null,
    };
    const insertValues = [
      command.id,
      command.runId,
      command.seq,
      command.type,
      JSON.stringify(command.payload),
      command.payloadHash,
      command.idempotencyKey ?? null,
      command.state,
      command.createdAt,
      command.updatedAt,
      command.acknowledgedAt,
    ];
    await client.query(
      `INSERT INTO agentrun_commands (id, run_id, seq, type, payload, payload_hash, idempotency_key, state, created_at, updated_at, acknowledged_at)
VALUES ($1, $2, $3, $4, $5::jsonb, $6, $7, $8, $9, $10, $11)`,
      insertValues,
    );
    await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "command-created", commandId: command.id, commandType: command.type });
    return command;
  });
}
/**
 * Loads a command by id.
 *
 * @throws AgentRunError "schema-invalid" (HTTP 404) when no such command exists.
 */
async getCommand(commandId: string): Promise<CommandRecord> {
  const { rows } = await this.pool.query("SELECT * FROM agentrun_commands WHERE id = $1", [commandId]);
  const [found] = rows;
  if (found) return commandFromRow(found);
  throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
}
/**
 * Returns the commands of a run with seq greater than `afterSeq`, in
 * ascending seq order. The page size is `limit` passed through clamp(limit, 1, 100).
 *
 * @throws AgentRunError (HTTP 404) via getRun when the run does not exist.
 */
async listCommands(runId: string, afterSeq: number, limit: number): Promise<CommandRecord[]> {
  // Existence check only; the returned record is not needed.
  await this.getRun(runId);
  const pageSize = clamp(limit, 1, 100);
  const page = await this.pool.query("SELECT * FROM agentrun_commands WHERE run_id = $1 AND seq > $2 ORDER BY seq ASC LIMIT $3", [runId, afterSeq, pageSize]);
  return page.rows.map(commandFromRow);
}
/**
 * Inserts a runner, or refreshes an existing runner with the same id.
 *
 * Missing `id`, `registeredAt` and `heartbeatAt` are filled in here. The
 * fallbacks are applied after spreading `input` so that a key that is present
 * but `undefined` (allowed by `Partial<RunnerRecord>`) cannot overwrite them
 * and reach the NOT NULL columns as NULL.
 *
 * On an id conflict every column except `registered_at` is overwritten.
 *
 * @param input Partial runner description.
 * @returns The runner row as stored.
 */
async registerRunner(input: Partial<RunnerRecord>): Promise<RunnerRecord> {
  const at = nowIso();
  const runner: RunnerRecord = {
    ...input,
    id: input.id ?? newId("runner"),
    registeredAt: input.registeredAt ?? at,
    heartbeatAt: input.heartbeatAt ?? at,
  };
  const metadata = metadataForRunner(runner);
  const result = await this.pool.query(
    `INSERT INTO agentrun_runners (id, run_id, attempt_id, backend_profile, placement, source_commit, metadata, registered_at, heartbeat_at)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, $9)
ON CONFLICT (id) DO UPDATE SET
run_id = EXCLUDED.run_id,
attempt_id = EXCLUDED.attempt_id,
backend_profile = EXCLUDED.backend_profile,
placement = EXCLUDED.placement,
source_commit = EXCLUDED.source_commit,
metadata = EXCLUDED.metadata,
heartbeat_at = EXCLUDED.heartbeat_at
RETURNING *`,
    [runner.id, runner.runId ?? null, runner.attemptId ?? null, runner.backendProfile ?? null, runner.placement ?? null, runner.sourceCommit ?? null, JSON.stringify(metadata), runner.registeredAt, runner.heartbeatAt],
  );
  return runnerFromRow(result.rows[0]);
}
/**
 * Lists the runner jobs of a run, oldest first, optionally restricted to one
 * command.
 *
 * @throws AgentRunError (HTTP 404) via getRun when the run does not exist.
 */
async listRunnerJobs(runId: string, commandId?: string): Promise<RunnerJobRecord[]> {
  await this.getRun(runId);
  const params: unknown[] = [runId];
  const conditions = ["run_id = $1"];
  if (commandId) {
    params.push(commandId);
    conditions.push(`command_id = $${params.length}`);
  }
  const sql = `SELECT * FROM agentrun_runner_jobs WHERE ${conditions.join(" AND ")} ORDER BY created_at ASC`;
  const { rows } = await this.pool.query(sql, params);
  return rows.map(runnerJobFromRow);
}
/**
 * Looks up a runner job of a run by idempotency key.
 *
 * @returns The stored job, or null when the key has not been used for this run.
 * @throws AgentRunError "schema-invalid" (HTTP 409) when the key exists but
 *   was stored with a different payload hash.
 */
async getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): Promise<RunnerJobRecord | null> {
  const { rows } = await this.pool.query("SELECT * FROM agentrun_runner_jobs WHERE run_id = $1 AND idempotency_key = $2", [runId, idempotencyKey]);
  const [match] = rows;
  if (!match) return null;
  const job = runnerJobFromRow(match);
  if (job.payloadHash === payloadHash) return job;
  throw new AgentRunError("schema-invalid", "runner job idempotency key reused with different payload", { httpStatus: 409 });
}
/**
 * Stores a runner job for a run. When an idempotency key is supplied and a
 * job with that key already exists for the run, the existing job is returned
 * instead (after checking that the payload hash matches).
 *
 * @throws AgentRunError "schema-invalid" (HTTP 409) when the idempotency key
 *   is reused with a different payload hash.
 */
async saveRunnerJob(input: SaveRunnerJobInput): Promise<RunnerJobRecord> {
  return this.withTransaction(async (client) => {
    await this.requireRunForUpdate(client, input.runId);
    if (input.idempotencyKey) {
      const prior = await client.query("SELECT * FROM agentrun_runner_jobs WHERE run_id = $1 AND idempotency_key = $2 FOR UPDATE", [input.runId, input.idempotencyKey]);
      const [priorRow] = prior.rows;
      if (priorRow) {
        const replayed = runnerJobFromRow(priorRow);
        if (replayed.payloadHash !== input.payloadHash) {
          throw new AgentRunError("schema-invalid", "runner job idempotency key reused with different payload", { httpStatus: 409 });
        }
        return replayed;
      }
    }
    const at = nowIso();
    const job: RunnerJobRecord = {
      ...input,
      id: newId("rjob"),
      idempotencyKey: input.idempotencyKey ?? null,
      serviceAccountName: input.serviceAccountName ?? null,
      createdAt: at,
      updatedAt: at,
    };
    const insertValues = [
      job.id,
      job.runId,
      job.commandId,
      job.idempotencyKey,
      job.payloadHash,
      job.attemptId,
      job.runnerId,
      job.namespace,
      job.jobName,
      job.managerUrl,
      job.image,
      job.sourceCommit,
      job.serviceAccountName,
      JSON.stringify(job.result),
      job.createdAt,
      job.updatedAt,
    ];
    const stored = await client.query(
      `INSERT INTO agentrun_runner_jobs (id, run_id, command_id, idempotency_key, payload_hash, attempt_id, runner_id, namespace, job_name, manager_url, image, source_commit, service_account_name, result, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14::jsonb, $15, $16)
RETURNING *`,
      insertValues,
    );
    return runnerJobFromRow(stored.rows[0]);
  });
}
/**
 * Claims a run for a runner: sets the run to "claimed", stores the lease
 * expiry on the run and in agentrun_leases, and records a "run-claimed" event.
 *
 * @param leaseMs Lease duration in milliseconds, counted from now.
 * @throws AgentRunError (HTTP 409) when the run is terminal, or
 *   "runner-lease-conflict" when another runner holds an unexpired lease.
 */
async claimRun(runId: string, runnerId: string, leaseMs: number): Promise<RunRecord> {
  return this.withTransaction(async (client) => {
    const run = await this.requireRunForUpdate(client, runId);
    if (isTerminalRunStatus(run.status)) {
      const fallbackKind = run.status === "cancelled" ? "cancelled" : "schema-invalid";
      throw new AgentRunError(run.failureKind ?? fallbackKind, `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 });
    }
    const heldByOther = Boolean(run.claimedBy) && run.claimedBy !== runnerId;
    if (heldByOther && !isLeaseExpired(run.leaseExpiresAt)) {
      throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 });
    }
    const leaseExpiresAt = new Date(Date.now() + leaseMs).toISOString();
    const claimed = await client.query(
      `UPDATE agentrun_runs SET status = $2, claimed_by = $3, lease_expires_at = $4, updated_at = $5 WHERE id = $1 RETURNING *`,
      [runId, "claimed", runnerId, leaseExpiresAt, nowIso()],
    );
    // Mirror the lease in agentrun_leases; an existing row keeps its stale_recovery_marker.
    await client.query(
      `INSERT INTO agentrun_leases (run_id, runner_id, lease_expires_at, stale_recovery_marker, updated_at)
VALUES ($1, $2, $3, $4::jsonb, $5)
ON CONFLICT (run_id) DO UPDATE SET runner_id = EXCLUDED.runner_id, lease_expires_at = EXCLUDED.lease_expires_at, updated_at = EXCLUDED.updated_at`,
      [runId, runnerId, leaseExpiresAt, null, nowIso()],
    );
    await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "run-claimed", runnerId });
    return runFromRow(claimed.rows[0]);
  });
}
/**
 * Extends the lease of a claimed run and refreshes the runner's heartbeat
 * timestamp. A terminal run is returned unchanged.
 *
 * @param leaseMs New lease duration in milliseconds, counted from now.
 * @throws AgentRunError "runner-lease-conflict" (HTTP 409) when the run is
 *   not claimed by `runnerId`.
 */
async heartbeat(runId: string, runnerId: string, leaseMs: number): Promise<RunRecord> {
  return this.withTransaction(async (client) => {
    const current = await this.requireRunForUpdate(client, runId);
    if (isTerminalRunStatus(current.status)) return current;
    if (current.claimedBy !== runnerId) {
      throw new AgentRunError("runner-lease-conflict", `run ${runId} is not claimed by ${runnerId}`, { httpStatus: 409 });
    }
    const leaseExpiresAt = new Date(Date.now() + leaseMs).toISOString();
    const renewed = await client.query("UPDATE agentrun_runs SET lease_expires_at = $2, updated_at = $3 WHERE id = $1 RETURNING *", [runId, leaseExpiresAt, nowIso()]);
    await client.query("UPDATE agentrun_runners SET heartbeat_at = $2 WHERE id = $1", [runnerId, nowIso()]);
    await client.query("UPDATE agentrun_leases SET lease_expires_at = $2, updated_at = $3 WHERE run_id = $1", [runId, leaseExpiresAt, nowIso()]);
    return runFromRow(renewed.rows[0]);
  });
}
/**
 * Marks a command as acknowledged. A command that is already acknowledged or
 * terminal is returned unchanged.
 *
 * @throws AgentRunError "schema-invalid" (HTTP 404) when the command does not exist.
 */
async ackCommand(commandId: string): Promise<CommandRecord> {
  return this.withTransaction(async (client) => {
    const locked = await client.query("SELECT * FROM agentrun_commands WHERE id = $1 FOR UPDATE", [commandId]);
    const [lockedRow] = locked.rows;
    if (!lockedRow) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
    const command = commandFromRow(lockedRow);
    const needsAck = !isTerminalCommandState(command.state) && command.state !== "acknowledged";
    if (!needsAck) return command;
    const acknowledged = await client.query("UPDATE agentrun_commands SET state = $2, acknowledged_at = $3, updated_at = $3 WHERE id = $1 RETURNING *", [commandId, "acknowledged", nowIso()]);
    return commandFromRow(acknowledged.rows[0]);
  });
}
/**
 * Moves a command to its terminal state, updates the session thread when the
 * result carries a thread id, and records a "command-terminal" event. A
 * command that is already terminal is returned unchanged.
 *
 * Lock order: the owning run is locked before the command row. cancelRun
 * locks the run first and then the run's commands, so taking the command
 * lock first here could deadlock against a concurrent cancelRun.
 * (requireRunForUpdate is assumed to take the run row lock, as its use ahead
 * of appendEventWithLockedRun in appendEvent suggests.)
 *
 * @throws AgentRunError "schema-invalid" (HTTP 404) when the command does not exist.
 */
async finishCommand(commandId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): Promise<CommandRecord> {
  return this.withTransaction(async (client) => {
    // Unlocked read, used only to discover which run owns the command.
    const lookup = await client.query("SELECT * FROM agentrun_commands WHERE id = $1", [commandId]);
    const found = lookup.rows[0];
    if (!found) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
    const run = await this.requireRunForUpdate(client, commandFromRow(found).runId);
    // Re-read under lock so the state check below sees the latest committed state.
    const existing = await client.query("SELECT * FROM agentrun_commands WHERE id = $1 FOR UPDATE", [commandId]);
    const row = existing.rows[0];
    if (!row) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
    const command = commandFromRow(row);
    if (isTerminalCommandState(command.state)) return command;
    const state = commandStateFromTerminal(result.terminalStatus);
    const updated = await client.query("UPDATE agentrun_commands SET state = $2, updated_at = $3 WHERE id = $1 RETURNING *", [commandId, state, nowIso()]);
    if (result.threadId && run.sessionRef?.sessionId) await this.upsertSessionThread(client, run, result.threadId, result.turnId ?? null);
    await this.appendEventWithLockedRun(client, command.runId, "backend_status", {
      phase: "command-terminal",
      commandId,
      state,
      terminalStatus: result.terminalStatus,
      failureKind: result.failureKind,
      message: result.failureMessage ?? null,
      threadId: result.threadId ?? null,
      turnId: result.turnId ?? null,
    });
    return commandFromRow(updated.rows[0]);
  });
}
/**
 * Appends an event to a run. The run is fetched through requireRunForUpdate
 * first, then the event is written in the same transaction.
 *
 * @returns The stored event.
 */
async appendEvent(runId: string, type: EventType, payload: JsonRecord): Promise<RunEvent> {
  return this.withTransaction(async (client) => {
    await this.requireRunForUpdate(client, runId);
    const event = await this.appendEventWithLockedRun(client, runId, type, payload);
    return event;
  });
}
/**
 * Moves a run to its terminal status, updates the session thread when the
 * result carries a thread id, and records a terminal_status event. A run
 * that is already terminal is returned unchanged.
 *
 * @returns The run as stored after the update.
 */
async finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): Promise<RunRecord> {
  return this.withTransaction(async (client) => {
    const current = await this.requireRunForUpdate(client, runId);
    if (isTerminalRunStatus(current.status)) return current;
    const { terminalStatus, failureKind, failureMessage, threadId } = result;
    const status = statusFromTerminal(terminalStatus);
    const finished = await client.query(
      `UPDATE agentrun_runs SET status = $2, terminal_status = $3, failure_kind = $4, failure_message = $5, updated_at = $6 WHERE id = $1 RETURNING *`,
      [runId, status, terminalStatus, failureKind, failureMessage, nowIso()],
    );
    const run = runFromRow(finished.rows[0]);
    if (threadId && run.sessionRef?.sessionId) {
      await this.upsertSessionThread(client, run, threadId, result.turnId ?? null);
    }
    await this.appendEventWithLockedRun(client, runId, "terminal_status", { terminalStatus, failureKind, message: failureMessage });
    return run;
  });
}
/**
 * Cancels a run: every command of the run that is not yet completed, failed
 * or cancelled is set to "cancelled" (with one "command-terminal" event each),
 * then the run itself is marked cancelled with `reason` as failure message.
 * A run that is already terminal is returned unchanged.
 *
 * Events are written in this order: one per cancelled command,
 * "cancel-requested", and finally the terminal_status event.
 */
async cancelRun(runId: string, reason = "cancel requested"): Promise<RunRecord> {
  return this.withTransaction(async (client) => {
    const current = await this.requireRunForUpdate(client, runId);
    if (isTerminalRunStatus(current.status)) return current;
    const at = nowIso();
    const open = await client.query("SELECT * FROM agentrun_commands WHERE run_id = $1 AND state NOT IN ('completed', 'failed', 'cancelled') FOR UPDATE", [runId]);
    for (const openRow of open.rows) {
      const { id: openCommandId } = commandFromRow(openRow);
      await client.query("UPDATE agentrun_commands SET state = 'cancelled', updated_at = $2 WHERE id = $1", [openCommandId, at]);
      await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "command-terminal", commandId: openCommandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled" });
    }
    await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "cancel-requested", reason });
    const cancelled = await client.query(
      `UPDATE agentrun_runs SET status = 'cancelled', terminal_status = 'cancelled', failure_kind = 'cancelled', failure_message = $2, updated_at = $3 WHERE id = $1 RETURNING *`,
      [runId, reason, at],
    );
    await this.appendEventWithLockedRun(client, runId, "terminal_status", { terminalStatus: "cancelled", failureKind: "cancelled", message: reason });
    return runFromRow(cancelled.rows[0]);
  });
}
/**
 * Cancels a single command and records a "command-terminal" event on its
 * run. A command that is already terminal is returned unchanged.
 *
 * The owning run is fetched through requireRunForUpdate before the event is
 * appended, matching appendEvent and finishCommand: appendEventWithLockedRun
 * is only called elsewhere after that step (or right after inserting the run),
 * and this method previously skipped it. The run is taken before the command
 * row so the lock order matches cancelRun (run first, then commands).
 *
 * @param reason Text stored as the event's `message`.
 * @throws AgentRunError "schema-invalid" (HTTP 404) when the command does not exist.
 */
async cancelCommand(commandId: string, reason = "cancel requested"): Promise<CommandRecord> {
  return this.withTransaction(async (client) => {
    // Unlocked read, used only to discover which run owns the command.
    const lookup = await client.query("SELECT * FROM agentrun_commands WHERE id = $1", [commandId]);
    const found = lookup.rows[0];
    if (!found) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
    await this.requireRunForUpdate(client, commandFromRow(found).runId);
    // Re-read under lock so the state check below sees the latest committed state.
    const existing = await client.query("SELECT * FROM agentrun_commands WHERE id = $1 FOR UPDATE", [commandId]);
    const row = existing.rows[0];
    if (!row) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
    const command = commandFromRow(row);
    if (isTerminalCommandState(command.state)) return command;
    const updated = await client.query("UPDATE agentrun_commands SET state = 'cancelled', updated_at = $2 WHERE id = $1 RETURNING *", [commandId, nowIso()]);
    await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", message: reason });
    return commandFromRow(updated.rows[0]);
  });
}
/**
 * Loads a session by id.
 *
 * @returns The session, or null when no row has that session_id.
 */
async getSession(sessionId: string): Promise<SessionRecord | null> {
  const { rows } = await this.pool.query("SELECT * FROM agentrun_sessions WHERE session_id = $1", [sessionId]);
  const [found] = rows;
  if (!found) return null;
  return sessionFromRow(found);
}
/**
 * Inserts a queue task in state "pending" with a fresh queue version.
 *
 * Idempotency: when `input.idempotencyKey` is set and a task with the same
 * (tenant, project, key) exists, that task is returned, provided its payload
 * hash matches. The pre-check alone cannot cover two concurrent creations
 * with the same key (`FOR UPDATE` cannot lock a row that does not exist yet),
 * so the insert uses `ON CONFLICT ... DO NOTHING` against the partial unique
 * index and, when it is skipped, the task written by the other transaction is
 * returned instead of surfacing a raw unique-violation error.
 *
 * @throws AgentRunError "schema-invalid" (HTTP 409) when the idempotency key
 *   is reused with a different payload.
 * @throws AgentRunError "infra-failed" (HTTP 503) when the insert was skipped
 *   but no conflicting task can be read back.
 */
async createQueueTask(input: CreateQueueTaskInput): Promise<QueueTaskRecord> {
  const payloadHash = stableHash(input.payload);
  return this.withTransaction(async (client) => {
    // Returns the task already stored under the idempotency key, after checking the payload hash.
    const findExisting = async (): Promise<QueueTaskRecord | null> => {
      const existing = await client.query("SELECT * FROM agentrun_queue_tasks WHERE tenant_id = $1 AND project_id = $2 AND idempotency_key = $3 FOR UPDATE", [input.tenantId, input.projectId, input.idempotencyKey]);
      if (!existing.rows[0]) return null;
      const record = queueTaskFromRow(existing.rows[0]);
      if (record.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "queue task idempotency key reused with different payload", { httpStatus: 409 });
      return record;
    };
    if (input.idempotencyKey) {
      const replayed = await findExisting();
      if (replayed) return replayed;
    }
    const at = nowIso();
    const version = await this.nextQueueVersion(client);
    const task: QueueTaskRecord = { ...input, id: newId("qt"), state: "pending", version, payloadHash, latestAttempt: null, sessionPath: null, createdAt: at, updatedAt: at, cancelledAt: null, cancelReason: null };
    const inserted = await client.query(
      `INSERT INTO agentrun_queue_tasks (id, tenant_id, project_id, queue, lane, title, priority, state, version, backend_profile, provider_id, workspace_ref, execution_policy, resource_bundle_ref, payload, payload_hash, references_json, metadata, idempotency_key, latest_attempt, session_path, created_at, updated_at, cancelled_at, cancel_reason)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12::jsonb, $13::jsonb, $14::jsonb, $15::jsonb, $16, $17::jsonb, $18::jsonb, $19, $20::jsonb, $21, $22, $23, $24, $25)
ON CONFLICT (tenant_id, project_id, idempotency_key) WHERE idempotency_key IS NOT NULL DO NOTHING
RETURNING *`,
      [task.id, task.tenantId, task.projectId, task.queue, task.lane, task.title, task.priority, task.state, task.version, task.backendProfile, task.providerId, JSON.stringify(task.workspaceRef), JSON.stringify(task.executionPolicy), JSON.stringify(task.resourceBundleRef), JSON.stringify(task.payload), task.payloadHash, JSON.stringify(task.references), JSON.stringify(task.metadata), task.idempotencyKey ?? null, JSON.stringify(task.latestAttempt), task.sessionPath, task.createdAt, task.updatedAt, task.cancelledAt, task.cancelReason],
    );
    if (inserted.rows[0]) return queueTaskFromRow(inserted.rows[0]);
    // The insert was skipped: a concurrent transaction stored the same idempotency key first.
    const winner = await findExisting();
    if (winner) return winner;
    throw new AgentRunError("infra-failed", "queue task insert was skipped but no task with the same idempotency key was found", { httpStatus: 503 });
  });
}
/**
 * Lists queue tasks whose version is greater than the starting version,
 * optionally filtered by queue and state, ordered by priority (descending),
 * then created_at and id (ascending).
 *
 * The starting version is the parsed `cursor`, else `updatedAfter`, else 0.
 * The returned cursor is the version of the last item on the page, or null
 * for an empty page.
 *
 * NOTE(review): rows are filtered by version but ordered by priority, so the
 * last item's version is not necessarily the highest version on the page and
 * rows cut off by LIMIT may have a lower version than the returned cursor.
 * Confirm that callers do not rely on the cursor to page through every task.
 */
async listQueueTasks(input: ListQueueTasksInput): Promise<QueueTaskListResult> {
  const startVersion = parseQueueCursor(input.cursor) ?? input.updatedAfter ?? 0;
  const params: unknown[] = [startVersion];
  const conditions: string[] = ["version > $1"];
  // Binds a value and adds the matching `column = $n` condition.
  const addFilter = (column: string, value: unknown): void => {
    params.push(value);
    conditions.push(`${column} = $${params.length}`);
  };
  if (input.queue) addFilter("queue", input.queue);
  if (input.state) addFilter("state", input.state);
  params.push(clampQueueLimit(input.limit));
  const sql = `SELECT * FROM agentrun_queue_tasks WHERE ${conditions.join(" AND ")} ORDER BY priority DESC, created_at ASC, id ASC LIMIT $${params.length}`;
  const { rows } = await this.pool.query(sql, params);
  const items = rows.map(queueTaskFromRow);
  const lastItem = items[items.length - 1];
  const cursor = items.length > 0 ? String(lastItem?.version ?? startVersion) : null;
  return { items, count: items.length, cursor };
}
/**
 * Loads a queue task by id.
 *
 * @throws AgentRunError "schema-invalid" (HTTP 404) when no such task exists.
 */
async getQueueTask(taskId: string): Promise<QueueTaskRecord> {
  const { rows } = await this.pool.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1", [taskId]);
  const [found] = rows;
  if (found) return queueTaskFromRow(found);
  throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 });
}
/**
 * Writes a new state, latest attempt and session path to a queue task and
 * assigns it a fresh queue version.
 *
 * @throws AgentRunError (HTTP 404) when the task does not exist, or
 *   (HTTP 409) when the task is already in a terminal state.
 */
async updateQueueTaskAttempt(taskId: string, input: UpdateQueueTaskAttemptInput): Promise<QueueTaskRecord> {
  return this.withTransaction(async (client) => {
    const locked = await client.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1 FOR UPDATE", [taskId]);
    const [lockedRow] = locked.rows;
    if (!lockedRow) throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 });
    const task = queueTaskFromRow(lockedRow);
    if (isTerminalQueueTaskState(task.state)) {
      const kind = task.state === "cancelled" ? "cancelled" : "schema-invalid";
      throw new AgentRunError(kind, `queue task ${taskId} is already terminal: ${task.state}`, { httpStatus: 409 });
    }
    const version = await this.nextQueueVersion(client);
    const at = nowIso();
    const updateValues = [taskId, input.state, JSON.stringify(input.latestAttempt), input.sessionPath, version, at];
    const saved = await client.query(
      "UPDATE agentrun_queue_tasks SET state = $2, latest_attempt = $3::jsonb, session_path = $4, version = $5, updated_at = $6 WHERE id = $1 RETURNING *",
      updateValues,
    );
    return queueTaskFromRow(saved.rows[0]);
  });
}
/**
 * Cancels a queue task inside a transaction.
 *
 * A task that is already in a terminal state is returned unchanged. Otherwise
 * the row is marked `cancelled`, given a fresh queue version, and stamped with
 * the cancellation time and reason.
 *
 * @param reason Stored in `cancel_reason`; defaults to `"cancel requested"`.
 * @throws AgentRunError (`schema-invalid`, HTTP 404) when the task does not exist.
 */
async cancelQueueTask(taskId: string, reason = "cancel requested"): Promise<QueueTaskRecord> {
  return this.withTransaction(async (client) => {
    const locked = await client.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1 FOR UPDATE", [taskId]);
    const lockedRow = locked.rows[0];
    if (!lockedRow) {
      throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 });
    }
    const current = queueTaskFromRow(lockedRow);
    if (isTerminalQueueTaskState(current.state)) {
      return current;
    }
    const at = nowIso();
    const version = await this.nextQueueVersion(client);
    // $3 feeds both updated_at and cancelled_at so the two timestamps match.
    const cancelSql = "UPDATE agentrun_queue_tasks SET state = 'cancelled', version = $2, updated_at = $3, cancelled_at = $3, cancel_reason = $4 WHERE id = $1 RETURNING *";
    const { rows } = await client.query(cancelSql, [taskId, version, at, reason]);
    return queueTaskFromRow(rows[0]);
  });
}
/**
 * Records that `readerId` has read a task at the task's current version.
 *
 * Runs in a transaction: the task row is locked, then a read-cursor row keyed
 * by (task id, reader id) is inserted, or updated in place if it already exists.
 *
 * @returns The read-cursor record that was written.
 * @throws AgentRunError (`schema-invalid`, HTTP 404) when the task does not exist.
 */
async markQueueTaskRead(taskId: string, readerId: string): Promise<QueueReadCursorRecord> {
  return this.withTransaction(async (client) => {
    const locked = await client.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1 FOR UPDATE", [taskId]);
    const lockedRow = locked.rows[0];
    if (!lockedRow) {
      throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 });
    }
    const { version: taskVersion } = queueTaskFromRow(lockedRow);
    const record: QueueReadCursorRecord = { taskId, readerId, taskVersion, readAt: nowIso() };
    const upsertSql = [
      "INSERT INTO agentrun_queue_read_cursors (task_id, reader_id, task_version, read_at)",
      "VALUES ($1, $2, $3, $4)",
      "ON CONFLICT (task_id, reader_id) DO UPDATE SET task_version = EXCLUDED.task_version, read_at = EXCLUDED.read_at",
    ].join("\n");
    await client.query(upsertSql, [record.taskId, record.readerId, record.taskVersion, record.readAt]);
    return record;
  });
}
/**
 * Builds queue statistics from the stored tasks.
 *
 * @param queue When given, only tasks of that queue are loaded; otherwise all tasks are.
 */
async queueStats(queue?: string): Promise<QueueStats> {
  const tasks = await this.loadQueueTasksForProjection(queue);
  return buildQueueStats(tasks, queue ?? null);
}
/**
 * Produces a commander snapshot: queue statistics plus the first 20 tasks in
 * `queueTaskSort` order, all stamped with one generation time.
 *
 * @param queue When given, only tasks of that queue are considered.
 */
async queueCommander(queue?: string): Promise<QueueCommanderSnapshot> {
  const tasks = await this.loadQueueTasksForProjection(queue);
  const generatedAt = nowIso();
  const queueName = queue ?? null;
  // Stats are computed before the task list is sorted in place.
  const stats = buildQueueStats(tasks, queueName, generatedAt);
  const items = tasks.sort(queueTaskSort).slice(0, 20);
  return { queue: queueName, stats, items, generatedAt };
}
/**
 * Lists all rows of `agentrun_backends`, ordered by profile.
 *
 * Each entry is the result of `mergeBackendCapability` for the row's profile
 * and stored capabilities, extended with the row's capacity, health and
 * update time.
 */
async backends(): Promise<JsonRecord[]> {
  const { rows } = await this.pool.query("SELECT * FROM agentrun_backends ORDER BY profile ASC");
  const entries: JsonRecord[] = [];
  for (const row of rows) {
    const capability = mergeBackendCapability(stringValue(row.profile), jsonRecord(row.capabilities));
    entries.push({
      ...capability,
      capacity: jsonValue(row.capacity),
      health: jsonValue(row.health),
      updatedAt: nullableIso(row.updated_at),
    });
  }
  return entries;
}
/** Ends the underlying connection pool. */
async close(): Promise<void> {
  const { pool } = this;
  await pool.end();
}
/**
 * Appends an event to a run using the given transaction client.
 *
 * The event type is validated, the payload normalized and then redacted, and
 * the event is stored with the next sequence number for the run.
 *
 * NOTE(review): the name suggests the caller already holds the run's row lock,
 * which would keep the MAX(seq)+1 allocation in `nextSeq` race-free — confirm.
 *
 * @returns The event that was inserted.
 */
private async appendEventWithLockedRun(client: PoolClient, runId: string, type: EventType, payload: JsonRecord): Promise<RunEvent> {
  const eventType = requireEventType(type);
  const normalizedPayload = normalizeRunEventPayload(eventType, payload);
  const seq = await this.nextSeq(client, "agentrun_events", runId);
  const event: RunEvent = {
    id: newId("evt"),
    runId,
    seq,
    type: eventType,
    payload: redactJson(normalizedPayload),
    createdAt: nowIso(),
  };
  const insertSql = "INSERT INTO agentrun_events (id, run_id, seq, type, payload, created_at) VALUES ($1, $2, $3, $4, $5::jsonb, $6)";
  const insertParams = [event.id, event.runId, event.seq, event.type, JSON.stringify(event.payload), event.createdAt];
  await client.query(insertSql, insertParams);
  return event;
}
/**
 * Computes the next per-run sequence number for a command or event table as
 * the current maximum `seq` for the run plus one (1 when the run has no rows).
 *
 * `table` is restricted to two literal names, so interpolating it into the
 * SQL text does not take arbitrary input.
 */
private async nextSeq(client: PoolClient, table: "agentrun_commands" | "agentrun_events", runId: string): Promise<number> {
  const sql = `SELECT COALESCE(MAX(seq), 0) + 1 AS seq FROM ${table} WHERE run_id = $1`;
  const { rows } = await client.query<{ seq: number }>(sql, [runId]);
  return Number(rows[0]?.seq ?? 1);
}
/**
 * Draws the next value from the `agentrun_queue_version_seq` sequence.
 *
 * The driver may hand the value back as a string or a number, so it is
 * coerced with `Number`; 1 is used if no row comes back.
 */
private async nextQueueVersion(client: PoolClient): Promise<number> {
  const { rows } = await client.query<{ version: string | number }>("SELECT nextval('agentrun_queue_version_seq') AS version");
  const raw = rows[0]?.version ?? 1;
  return Number(raw);
}
/**
 * Loads the queue tasks used to build stats and commander projections.
 *
 * @param queue When truthy, only tasks of that queue are selected; otherwise
 *   every task is.
 */
private async loadQueueTasksForProjection(queue?: string): Promise<QueueTaskRecord[]> {
  const result = queue
    ? await this.pool.query("SELECT * FROM agentrun_queue_tasks WHERE queue = $1", [queue])
    : await this.pool.query("SELECT * FROM agentrun_queue_tasks");
  return result.rows.map(queueTaskFromRow);
}
/**
 * Loads a run and locks its row with `FOR UPDATE` on the given client.
 *
 * @throws AgentRunError (`schema-invalid`, HTTP 404) when the run does not exist.
 */
private async requireRunForUpdate(client: PoolClient, runId: string): Promise<RunRecord> {
  const { rows } = await client.query("SELECT * FROM agentrun_runs WHERE id = $1 FOR UPDATE", [runId]);
  const lockedRow = rows[0];
  if (lockedRow) return runFromRow(lockedRow);
  throw new AgentRunError("schema-invalid", `run ${runId} was not found`, { httpStatus: 404 });
}
/**
 * Resolves the session reference for a run that is being created.
 *
 * - Without `input.sessionRef` the result is `null`.
 * - If a session row with that id exists it is locked, checked with
 *   `assertSessionBoundary` against the input, and returned as a reference.
 * - Otherwise a new session row is inserted from the input, using `at` for
 *   both creation and update timestamps.
 *
 * NOTE(review): `FOR UPDATE` locks nothing when the row is missing, so two
 * concurrent calls for the same new session id could both reach the INSERT —
 * confirm whether callers serialize this or rely on the key constraint.
 */
private async resolveSessionForRun(client: PoolClient, input: CreateRunInput, at: string): Promise<SessionRef | null> {
  const requested = input.sessionRef;
  if (!requested) return null;

  const lookup = await client.query("SELECT * FROM agentrun_sessions WHERE session_id = $1 FOR UPDATE", [requested.sessionId]);
  const storedRow = lookup.rows[0];
  if (storedRow) {
    const stored = sessionFromRow(storedRow);
    assertSessionBoundary(stored, input);
    return sessionRefFromRecord(stored, requested);
  }

  const created: SessionRecord = {
    sessionId: requested.sessionId,
    tenantId: input.tenantId,
    projectId: input.projectId,
    backendProfile: input.backendProfile,
    conversationId: requested.conversationId ?? null,
    threadId: requested.threadId ?? null,
    metadata: requested.metadata ?? {},
    createdAt: at,
    updatedAt: at,
    expiresAt: requested.expiresAt ?? null,
  };
  const insertSql = [
    "INSERT INTO agentrun_sessions (session_id, tenant_id, project_id, backend_profile, conversation_id, thread_id, metadata, created_at, updated_at, expires_at)",
    "VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, $9, $10)",
  ].join("\n");
  const insertParams = [
    created.sessionId,
    created.tenantId,
    created.projectId,
    created.backendProfile,
    created.conversationId,
    created.threadId,
    JSON.stringify(created.metadata),
    created.createdAt,
    created.updatedAt,
    created.expiresAt,
  ];
  await client.query(insertSql, insertParams);
  return sessionRefFromRecord(created, requested);
}
/**
 * Stores the thread id (and optionally the last turn id) on the run's session.
 *
 * Does nothing when the run has no session id. Otherwise it:
 * 1. locks the existing session row, if any;
 * 2. merges metadata (stored values, then the run's session metadata, then
 *    `lastTurnId` when `turnId` is set);
 * 3. upserts the session row, keeping the original `created_at` on conflict;
 * 4. writes the refreshed session reference back to the run;
 * 5. appends a `backend_status` event with phase `session-updated`.
 */
private async upsertSessionThread(client: PoolClient, run: RunRecord, threadId: string, turnId: string | null): Promise<void> {
  const ref = run.sessionRef;
  if (!ref?.sessionId) return;

  const at = nowIso();
  const lookup = await client.query("SELECT * FROM agentrun_sessions WHERE session_id = $1 FOR UPDATE", [ref.sessionId]);
  const prior = lookup.rows[0] ? sessionFromRow(lookup.rows[0]) : null;

  // Later spreads win: run-supplied metadata overrides stored metadata.
  const metadata = { ...(prior?.metadata ?? {}), ...(ref.metadata ?? {}), ...(turnId ? { lastTurnId: turnId } : {}) };
  const record: SessionRecord = {
    sessionId: ref.sessionId,
    tenantId: run.tenantId,
    projectId: run.projectId,
    backendProfile: run.backendProfile,
    conversationId: ref.conversationId ?? prior?.conversationId ?? null,
    threadId,
    metadata,
    createdAt: prior?.createdAt ?? at,
    updatedAt: at,
    expiresAt: ref.expiresAt ?? prior?.expiresAt ?? null,
  };

  const upsertSql = [
    "INSERT INTO agentrun_sessions (session_id, tenant_id, project_id, backend_profile, conversation_id, thread_id, metadata, created_at, updated_at, expires_at)",
    "VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, $9, $10)",
    "ON CONFLICT (session_id) DO UPDATE SET",
    "tenant_id = EXCLUDED.tenant_id,",
    "project_id = EXCLUDED.project_id,",
    "backend_profile = EXCLUDED.backend_profile,",
    "conversation_id = EXCLUDED.conversation_id,",
    "thread_id = EXCLUDED.thread_id,",
    "metadata = EXCLUDED.metadata,",
    "updated_at = EXCLUDED.updated_at,",
    "expires_at = EXCLUDED.expires_at",
  ].join("\n");
  const upsertParams = [
    record.sessionId,
    record.tenantId,
    record.projectId,
    record.backendProfile,
    record.conversationId,
    record.threadId,
    JSON.stringify(record.metadata),
    record.createdAt,
    record.updatedAt,
    record.expiresAt,
  ];
  await client.query(upsertSql, upsertParams);

  const nextSessionRef = sessionRefFromRecord(record, ref);
  await client.query("UPDATE agentrun_runs SET session_ref = $2::jsonb, updated_at = $3 WHERE id = $1", [run.id, JSON.stringify(nextSessionRef), at]);
  await this.appendEventWithLockedRun(client, run.id, "backend_status", {
    phase: "session-updated",
    sessionRef: summarizeSessionRef(nextSessionRef),
    turnId,
  });
}
/**
 * Runs `fn` inside a database transaction on a dedicated pooled client.
 *
 * The transaction is committed when `fn` resolves and rolled back when `fn`
 * (or BEGIN/COMMIT) throws; the original error is always the one rethrown.
 *
 * If the ROLLBACK itself fails (for example because the connection dropped),
 * that secondary failure is swallowed so it cannot mask the real error, and
 * the client is destroyed on release instead of being returned to the pool in
 * an unknown transaction state.
 *
 * @param fn Work to perform with the transaction's client.
 * @returns Whatever `fn` resolves to.
 */
private async withTransaction<T>(fn: (client: PoolClient) => Promise<T>): Promise<T> {
  const client = await this.pool.connect();
  let destroyClient = false;
  try {
    await client.query("BEGIN");
    const result = await fn(client);
    await client.query("COMMIT");
    return result;
  } catch (error) {
    try {
      await client.query("ROLLBACK");
    } catch {
      // The connection can no longer be trusted; do not recycle it.
      destroyClient = true;
    }
    throw error;
  } finally {
    // A truthy argument tells the pool to discard the client rather than reuse it.
    client.release(destroyClient);
  }
}
}
/**
 * Returns the hex-encoded SHA-256 digest of a SQL text, ignoring leading and
 * trailing whitespace.
 */
function checksumSql(sql: string): string {
  const hasher = createHash("sha256");
  hasher.update(sql.trim());
  return hasher.digest("hex");
}
/** Returns the id of the last entry in `postgresMigrations`, or `"none"` when the list is empty. */
function latestMigrationId(): string {
  const newest = postgresMigrations[postgresMigrations.length - 1];
  return newest?.id ?? "none";
}
/**
 * Restricts `value` to the range [`min`, `max`].
 *
 * The upper bound is applied first and the lower bound last, so `min` wins
 * if the bounds are inverted.
 */
function clamp(value: number, min: number, max: number): number {
  const capped = Math.min(value, max);
  return Math.max(min, capped);
}
/**
 * Converts a row of `agentrun_runs` into a `RunRecord`, translating
 * snake_case columns to camelCase fields and normalizing timestamps to ISO
 * strings.
 */
function runFromRow(row: QueryResultRow): RunRecord {
  const record: RunRecord = {
    // Identity and scope.
    id: stringValue(row.id),
    tenantId: stringValue(row.tenant_id),
    projectId: stringValue(row.project_id),
    // JSON references.
    workspaceRef: jsonRecord(row.workspace_ref) as RunRecord["workspaceRef"],
    sessionRef: jsonValue(row.session_ref) as RunRecord["sessionRef"],
    resourceBundleRef: jsonValue(row.resource_bundle_ref) as RunRecord["resourceBundleRef"],
    // Execution configuration.
    providerId: stringValue(row.provider_id),
    backendProfile: stringValue(row.backend_profile) as BackendProfile,
    executionPolicy: jsonRecord(row.execution_policy) as RunRecord["executionPolicy"],
    traceSink: jsonValue(row.trace_sink),
    // Lifecycle and outcome.
    status: stringValue(row.status) as RunStatus,
    terminalStatus: nullableString(row.terminal_status) as TerminalStatus | null,
    failureKind: nullableString(row.failure_kind) as FailureKind | null,
    failureMessage: nullableString(row.failure_message),
    createdAt: iso(row.created_at),
    updatedAt: iso(row.updated_at),
    // Claim / lease bookkeeping.
    claimedBy: nullableString(row.claimed_by),
    leaseExpiresAt: nullableIso(row.lease_expires_at),
  };
  return record;
}
/**
 * Converts a command row into a `CommandRecord`.
 *
 * `idempotencyKey` is only present on the result when the column holds a
 * non-empty value.
 */
function commandFromRow(row: QueryResultRow): CommandRecord {
  const idempotencyKey = nullableString(row.idempotency_key);
  const record: CommandRecord = {
    id: stringValue(row.id),
    runId: stringValue(row.run_id),
    seq: Number(row.seq),
    type: stringValue(row.type) as CommandRecord["type"],
    payload: jsonRecord(row.payload),
    payloadHash: stringValue(row.payload_hash),
    // Spread keeps the key in its original position and omits it entirely when absent.
    ...(idempotencyKey ? { idempotencyKey } : {}),
    state: stringValue(row.state) as CommandState,
    createdAt: iso(row.created_at),
    updatedAt: iso(row.updated_at),
    acknowledgedAt: nullableIso(row.acknowledged_at),
  };
  return record;
}
/** Converts an event row into a `RunEvent`. */
function eventFromRow(row: QueryResultRow): RunEvent {
  const event: RunEvent = {
    id: stringValue(row.id),
    runId: stringValue(row.run_id),
    seq: Number(row.seq),
    type: stringValue(row.type) as EventType,
    payload: jsonRecord(row.payload),
    createdAt: iso(row.created_at),
  };
  return event;
}
/**
 * Converts a runner row into a `RunnerRecord`.
 *
 * The row's `metadata` object is spread first, so the dedicated columns that
 * follow override any same-named metadata keys. Optional columns are only
 * included when they hold a non-empty value.
 */
function runnerFromRow(row: QueryResultRow): RunnerRecord {
  const runId = nullableString(row.run_id);
  const attemptId = nullableString(row.attempt_id);
  const backendProfile = nullableString(row.backend_profile);
  const placement = nullableString(row.placement);
  const sourceCommit = nullableString(row.source_commit);
  return {
    ...jsonRecord(row.metadata),
    id: stringValue(row.id),
    ...(runId ? { runId } : {}),
    ...(attemptId ? { attemptId } : {}),
    ...(backendProfile ? { backendProfile: backendProfile as BackendProfile } : {}),
    ...(placement ? { placement } : {}),
    ...(sourceCommit ? { sourceCommit } : {}),
    registeredAt: iso(row.registered_at),
    heartbeatAt: iso(row.heartbeat_at),
  };
}
/** Converts a row of `agentrun_sessions` into a `SessionRecord`. */
function sessionFromRow(row: QueryResultRow): SessionRecord {
  const session: SessionRecord = {
    // Identity and ownership.
    sessionId: stringValue(row.session_id),
    tenantId: stringValue(row.tenant_id),
    projectId: stringValue(row.project_id),
    backendProfile: stringValue(row.backend_profile) as BackendProfile,
    // Conversation state.
    conversationId: nullableString(row.conversation_id),
    threadId: nullableString(row.thread_id),
    metadata: jsonRecord(row.metadata),
    // Timestamps.
    createdAt: iso(row.created_at),
    updatedAt: iso(row.updated_at),
    expiresAt: nullableIso(row.expires_at),
  };
  return session;
}
/** Converts a runner-job row into a `RunnerJobRecord`. */
function runnerJobFromRow(row: QueryResultRow): RunnerJobRecord {
  const job: RunnerJobRecord = {
    // What the job was created for.
    id: stringValue(row.id),
    runId: stringValue(row.run_id),
    commandId: stringValue(row.command_id),
    idempotencyKey: nullableString(row.idempotency_key),
    payloadHash: stringValue(row.payload_hash),
    attemptId: stringValue(row.attempt_id),
    runnerId: stringValue(row.runner_id),
    // Where and how it runs.
    namespace: stringValue(row.namespace),
    jobName: stringValue(row.job_name),
    managerUrl: stringValue(row.manager_url),
    image: stringValue(row.image),
    sourceCommit: stringValue(row.source_commit),
    serviceAccountName: nullableString(row.service_account_name),
    // Outcome and timestamps.
    result: jsonRecord(row.result),
    createdAt: iso(row.created_at),
    updatedAt: iso(row.updated_at),
  };
  return job;
}
/**
 * Converts a row of `agentrun_queue_tasks` into a `QueueTaskRecord`.
 *
 * Numeric columns (`priority`, `version`) are coerced with `Number`, the
 * `references` field is read from the `references_json` column, and
 * `idempotencyKey` is only present when the column holds a non-empty value.
 */
function queueTaskFromRow(row: QueryResultRow): QueueTaskRecord {
  const idempotencyKey = nullableString(row.idempotency_key);
  const task: QueueTaskRecord = {
    // Identity and routing.
    id: stringValue(row.id),
    tenantId: stringValue(row.tenant_id),
    projectId: stringValue(row.project_id),
    queue: stringValue(row.queue),
    lane: stringValue(row.lane),
    title: stringValue(row.title),
    priority: Number(row.priority),
    state: stringValue(row.state) as QueueTaskState,
    version: Number(row.version),
    // Execution configuration.
    backendProfile: stringValue(row.backend_profile) as BackendProfile,
    providerId: nullableString(row.provider_id),
    workspaceRef: jsonValue(row.workspace_ref) as QueueTaskRecord["workspaceRef"],
    executionPolicy: jsonValue(row.execution_policy) as QueueTaskRecord["executionPolicy"],
    resourceBundleRef: jsonValue(row.resource_bundle_ref) as QueueTaskRecord["resourceBundleRef"],
    // Payload and descriptive data.
    payload: jsonRecord(row.payload),
    payloadHash: stringValue(row.payload_hash),
    references: jsonArray(row.references_json) as JsonRecord[],
    metadata: jsonRecord(row.metadata),
    ...(idempotencyKey ? { idempotencyKey } : {}),
    // Attempt, session and lifecycle.
    latestAttempt: jsonValue(row.latest_attempt) as QueueTaskRecord["latestAttempt"],
    sessionPath: nullableString(row.session_path),
    createdAt: iso(row.created_at),
    updatedAt: iso(row.updated_at),
    cancelledAt: nullableIso(row.cancelled_at),
    cancelReason: nullableString(row.cancel_reason),
  };
  return task;
}
/**
 * Extracts the free-form part of a runner record: every property except the
 * ones that `runnerFromRow` reads from dedicated columns, passed through
 * `redactJson`.
 */
function metadataForRunner(runner: RunnerRecord): JsonRecord {
  // Pull the dedicated-column fields out so that only the remainder is kept.
  const {
    id: _omitId,
    runId: _omitRunId,
    attemptId: _omitAttemptId,
    backendProfile: _omitBackendProfile,
    placement: _omitPlacement,
    sourceCommit: _omitSourceCommit,
    registeredAt: _omitRegisteredAt,
    heartbeatAt: _omitHeartbeatAt,
    ...remainder
  } = runner;
  return redactJson(remainder);
}
/**
 * Coerces any value to a string. Strings pass through untouched, `null` and
 * `undefined` become the empty string, everything else goes through `String`.
 */
function stringValue(value: unknown): string {
  if (typeof value === "string") {
    return value;
  }
  return String(value ?? "");
}
/** Like `stringValue`, but maps `null` and `undefined` to `null` instead of the empty string. */
function nullableString(value: unknown): string | null {
  if (value == null) {
    return null;
  }
  return stringValue(value);
}
/**
 * Treats a column value as JSON, mapping `undefined` to `null`.
 * Any other value is returned as-is without validation.
 */
function jsonValue(value: unknown): JsonValue {
  return value === undefined ? null : (value as JsonValue);
}
/**
 * Returns `value` when it is a non-null, non-array object; otherwise returns
 * a new empty object.
 */
function jsonRecord(value: unknown): JsonRecord {
  const isObjectRecord = value !== null && typeof value === "object" && !Array.isArray(value);
  return isObjectRecord ? (value as JsonRecord) : {};
}
/** Returns `value` when it is an array; otherwise returns a new empty array. */
function jsonArray(value: unknown): JsonValue[] {
  if (!Array.isArray(value)) {
    return [];
  }
  return value as JsonValue[];
}
/**
 * Normalizes a timestamp to an ISO-8601 string.
 *
 * `Date` instances are serialized directly; any other value is converted to a
 * string and parsed with `new Date(...)`.
 *
 * @throws RangeError (from `toISOString`) when the value does not parse to a valid date.
 */
function iso(value: unknown): string {
  const date = value instanceof Date ? value : new Date(String(value));
  return date.toISOString();
}
/** Like `iso`, but maps `null` and `undefined` to `null` instead of attempting to parse them. */
function nullableIso(value: unknown): string | null {
  if (value == null) {
    return null;
  }
  return iso(value);
}