From 2288cb1558b77afe381dd4b810fbee74de589165 Mon Sep 17 00:00:00 2001 From: Codex Date: Fri, 29 May 2026 11:41:20 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=20v0.1=20Postgres=20durable?= =?UTF-8?q?=20store=20=E9=AA=A8=E6=9E=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- deploy/container/Containerfile | 1 + docs/reference/spec-v01-agentrun-mgr.md | 6 +- docs/reference/spec-v01-postgres.md | 5 +- docs/reference/spec-v01-services.md | 2 +- package.json | 5 + scripts/src/cli.ts | 7 +- src/mgr/main.ts | 11 +- src/mgr/postgres-store.ts | 476 ++++++++++++++++++++++++ src/mgr/server.ts | 39 +- src/mgr/store.ts | 58 ++- src/selftest/run.ts | 35 +- 11 files changed, 601 insertions(+), 44 deletions(-) create mode 100644 src/mgr/postgres-store.ts diff --git a/deploy/container/Containerfile b/deploy/container/Containerfile index 3f9a612..29a66d0 100644 --- a/deploy/container/Containerfile +++ b/deploy/container/Containerfile @@ -5,6 +5,7 @@ ENV NODE_ENV=production ENV PORT=8080 COPY package.json tsconfig.json ./ +RUN bun install --production COPY scripts ./scripts COPY src ./src COPY deploy/deploy.json ./deploy/deploy.json diff --git a/docs/reference/spec-v01-agentrun-mgr.md b/docs/reference/spec-v01-agentrun-mgr.md index b99c65a..2b82220 100644 --- a/docs/reference/spec-v01-agentrun-mgr.md +++ b/docs/reference/spec-v01-agentrun-mgr.md @@ -108,7 +108,7 @@ POST /api/v1/commands/:commandId/ack | 规格项 | 状态 | 说明 | | --- | --- | --- | | `agentrun-mgr` 服务规格 | 已定义 | 本文为 v0.1 manager 权威。 | -| Manager REST API | 未实现 | 需要后续代码实现。 | +| Manager REST API | 已实现骨架 | 已有 run、command、event、runner register、claim、lease heartbeat、status、ack、backends 和 health/readiness 的 HTTP JSON 骨架;仍需后续真实部署验收。 | | Tenant policy boundary | 已定义/未实现 | v0.1 只做最小 schema/allowlist/secretScope 边界。 | -| Postgres durable adapter | 未实现 | 见 [spec-v01-postgres.md](spec-v01-postgres.md)。 | -| Observability 最小合同 | 已定义/未实现 | event、terminal status、failureKind 和 redaction 需要代码实现。 | +| Postgres durable adapter | 已实现骨架 | live runtime 通过 `DATABASE_URL` 使用 Postgres durable store;memory store 仅用于显式 self-test/dev。见 [spec-v01-postgres.md](spec-v01-postgres.md)。 | +| Observability 最小合同 | 已实现骨架 | events append-only、terminal status、failureKind、health/readiness store 状态和 Secret/DSN redaction 已进入 manager 骨架;集中 trace 和部署级观测仍属后续工作。 | diff --git a/docs/reference/spec-v01-postgres.md b/docs/reference/spec-v01-postgres.md index da0037b..9c7fdca 100644 --- a/docs/reference/spec-v01-postgres.md +++ b/docs/reference/spec-v01-postgres.md @@ -74,6 +74,7 @@ Secret 名称和 key 可以在实现时按 Kubernetes 命名限制微调,但 | --- | --- | --- | | Postgres durable store 规格 | 已定义 | 本文为 v0.1 存储权威。 | | StatefulSet/Service/PVC | 未实现 | 需要后续 GitOps lane 初始化。 | -| migration ledger | 未实现 | 需要后续代码和 schema migration。 | -| manager Postgres adapter | 未实现 | 需要后续 `agentrun-mgr` 实现。 | +| migration ledger | 已实现骨架 | `agentrun-mgr` 启动 Postgres adapter 时幂等创建 `agentrun_schema_migrations` 并记录 migration id/checksum;live DB 迁移验收仍依赖后续 GitOps lane 初始化。 | +| manager Postgres adapter | 已实现骨架 | `agentrun-mgr` 通过 `DATABASE_URL` 启用 Postgres adapter,持久化 runs、commands、events、runners、backends 和 leases;缺少 `DATABASE_URL` 时 live runtime fail fast,memory 只允许显式 self-test/dev。 | +| health/readiness store 状态 | 已实现骨架 | health/readiness 返回 adapter、reachable、migrationReady、migrationId、failureKind 和 redacted Secret 状态,不输出 DSN 明文。 | | file/sqlite durable store | 不采用 | 只可用于临时本地测试,不作为 v0.1 runtime truth。 | diff --git a/docs/reference/spec-v01-services.md b/docs/reference/spec-v01-services.md index 9afd9dd..c3438cc 100644 --- a/docs/reference/spec-v01-services.md +++ b/docs/reference/spec-v01-services.md @@ -154,7 +154,7 @@ Manager 负责校验、保存和返回这些字段;runner 只能消费已保 | Codex backend 规格 | 已定义 | 见 [spec-v01-backend-codex.md](spec-v01-backend-codex.md)。 | | AgentRun CLI 规格 | 已定义 | 见 [spec-v01-cli.md](spec-v01-cli.md)。 | | Scheduler deferred 规格 | 已定义 | 见 [spec-v01-scheduler.md](spec-v01-scheduler.md)。 | -| `agentrun-mgr` 实现 | 未实现 | 需后续代码实现。 | +| `agentrun-mgr` 实现 | 已实现骨架 | 已有 REST API、Postgres durable store 选择、migration ledger、health/readiness 和 self-test memory 模式骨架;仍需 G14 `agentrun-v01` 真实 Postgres/GitOps 验收。 | | `agentrun-runner` 实现 | 未实现 | 需后续代码实现。 | | 第一真实 backend | 未实现 | 默认候选 Codex。 | | 自动 scheduler | Deferred | 不作为 `v0.1` 第一阶段验收目标。 | diff --git a/package.json b/package.json index ea5c1f2..32cb657 100644 --- a/package.json +++ b/package.json @@ -9,8 +9,13 @@ "test": "bun run src/selftest/run.ts", "cli": "bun scripts/agentrun-cli.ts" }, + "dependencies": { + "pg": "^8.13.1" + }, "devDependencies": { + "@types/pg": "^8.11.10", "@types/node": "^22.10.0", + "tsx": "^4.19.2", "typescript": "^5.8.3" } } diff --git a/scripts/src/cli.ts b/scripts/src/cli.ts index 169838c..0db43d4 100644 --- a/scripts/src/cli.ts +++ b/scripts/src/cli.ts @@ -1,5 +1,6 @@ import { readFile } from "node:fs/promises"; import { startManagerServer } from "../../src/mgr/server.js"; +import { MemoryAgentRunStore } from "../../src/mgr/store.js"; import { ManagerClient } from "../../src/mgr/client.js"; import { runOnce } from "../../src/runner/run-once.js"; import type { JsonRecord, JsonValue } from "../../src/common/types.js"; @@ -64,8 +65,10 @@ async function dispatch(args: ParsedArgs): Promise { async function startServer(args: ParsedArgs): Promise { const port = Number(flag(args, "port", "8080")); const host = flag(args, "host", "0.0.0.0"); - const started = await startManagerServer({ port, host }); - return { serviceId: "agentrun-mgr", baseUrl: started.baseUrl, pid: process.pid, note: "foreground process; use Kubernetes/Tekton for v0.1 runtime" }; + const storeMode = optionalFlag(args, "store") ?? process.env.AGENTRUN_STORE ?? process.env.AGENTRUN_MGR_STORE; + const started = await startManagerServer({ port, host, ...(storeMode === "memory" ? { store: new MemoryAgentRunStore() } : {}) }); + const database = await started.store.health(); + return { serviceId: "agentrun-mgr", baseUrl: started.baseUrl, pid: process.pid, database, note: "foreground process; use Kubernetes/Tekton for v0.1 runtime" }; } function client(args: ParsedArgs): ManagerClient { diff --git a/src/mgr/main.ts b/src/mgr/main.ts index 876ad1c..775016f 100644 --- a/src/mgr/main.ts +++ b/src/mgr/main.ts @@ -1,6 +1,13 @@ import { startManagerServer } from "./server.js"; +import { errorToJson } from "../common/errors.js"; const port = Number(process.env.PORT ?? process.env.AGENTRUN_MGR_PORT ?? "8080"); const host = process.env.HOST ?? "0.0.0.0"; -const started = await startManagerServer({ port, host }); -console.log(JSON.stringify({ ok: true, serviceId: "agentrun-mgr", baseUrl: started.baseUrl })); +try { + const started = await startManagerServer({ port, host }); + const database = await started.store.health(); + console.log(JSON.stringify({ ok: true, serviceId: "agentrun-mgr", baseUrl: started.baseUrl, database })); +} catch (error) { + console.error(JSON.stringify({ ok: false, serviceId: "agentrun-mgr", failureKind: "infra-failed", message: error instanceof Error ? error.message : String(error), error: errorToJson(error) })); + process.exit(1); +} diff --git a/src/mgr/postgres-store.ts b/src/mgr/postgres-store.ts new file mode 100644 index 0000000..8c5d815 --- /dev/null +++ b/src/mgr/postgres-store.ts @@ -0,0 +1,476 @@ +import { createHash } from "node:crypto"; +import { Pool } from "pg"; +import type { PoolClient, QueryResultRow } from "pg"; +import { AgentRunError } from "../common/errors.js"; +import { redactJson } from "../common/redaction.js"; +import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, RunEvent, RunnerRecord, RunRecord, RunStatus, TerminalStatus } from "../common/types.js"; +import { newId, nowIso, stableHash } from "../common/validation.js"; +import type { AgentRunStore, StoreHealth } from "./store.js"; +import { statusFromTerminal } from "./store.js"; + +interface PostgresStoreOptions { + connectionString: string; +} + +interface MigrationDefinition { + id: string; + checksum: string; + sql: string; +} + +const initialMigrationSql = ` +CREATE TABLE IF NOT EXISTS agentrun_runs ( + id text PRIMARY KEY, + tenant_id text NOT NULL, + project_id text NOT NULL, + workspace_ref jsonb NOT NULL, + provider_id text NOT NULL, + backend_profile text NOT NULL, + execution_policy jsonb NOT NULL, + trace_sink jsonb, + status text NOT NULL, + terminal_status text, + failure_kind text, + failure_message text, + created_at timestamptz NOT NULL, + updated_at timestamptz NOT NULL, + claimed_by text, + lease_expires_at timestamptz +); + +CREATE TABLE IF NOT EXISTS agentrun_commands ( + id text PRIMARY KEY, + run_id text NOT NULL REFERENCES agentrun_runs(id) ON DELETE CASCADE, + seq integer NOT NULL, + type text NOT NULL, + payload jsonb NOT NULL, + payload_hash text NOT NULL, + idempotency_key text, + state text NOT NULL, + created_at timestamptz NOT NULL, + updated_at timestamptz NOT NULL, + acknowledged_at timestamptz, + UNIQUE (run_id, seq) +); + +CREATE UNIQUE INDEX IF NOT EXISTS agentrun_commands_run_idempotency_key_idx + ON agentrun_commands (run_id, idempotency_key) + WHERE idempotency_key IS NOT NULL; + +CREATE TABLE IF NOT EXISTS agentrun_events ( + id text PRIMARY KEY, + run_id text NOT NULL REFERENCES agentrun_runs(id) ON DELETE CASCADE, + seq integer NOT NULL, + type text NOT NULL, + payload jsonb NOT NULL, + created_at timestamptz NOT NULL, + UNIQUE (run_id, seq) +); + +CREATE TABLE IF NOT EXISTS agentrun_runners ( + id text PRIMARY KEY, + run_id text, + attempt_id text, + backend_profile text, + placement text, + source_commit text, + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + registered_at timestamptz NOT NULL, + heartbeat_at timestamptz NOT NULL +); + +CREATE TABLE IF NOT EXISTS agentrun_backends ( + profile text PRIMARY KEY, + capabilities jsonb NOT NULL, + capacity jsonb NOT NULL, + health jsonb NOT NULL, + updated_at timestamptz NOT NULL +); + +CREATE TABLE IF NOT EXISTS agentrun_leases ( + run_id text PRIMARY KEY REFERENCES agentrun_runs(id) ON DELETE CASCADE, + runner_id text NOT NULL, + lease_expires_at timestamptz NOT NULL, + stale_recovery_marker jsonb, + updated_at timestamptz NOT NULL +); + +CREATE INDEX IF NOT EXISTS agentrun_runs_status_idx ON agentrun_runs (status, updated_at); +CREATE INDEX IF NOT EXISTS agentrun_events_run_seq_idx ON agentrun_events (run_id, seq); +CREATE INDEX IF NOT EXISTS agentrun_commands_run_seq_idx ON agentrun_commands (run_id, seq); +CREATE INDEX IF NOT EXISTS agentrun_leases_expiry_idx ON agentrun_leases (lease_expires_at); + +INSERT INTO agentrun_backends (profile, capabilities, capacity, health, updated_at) +VALUES ( + 'codex', + '{"protocol":"codex-app-server-jsonrpc-stdio","transport":"stdio","command":"codex app-server --listen stdio://"}'::jsonb, + '{"mode":"manual-runner-v0.1"}'::jsonb, + '{"status":"registered"}'::jsonb, + now() +) +ON CONFLICT (profile) DO UPDATE SET + capabilities = EXCLUDED.capabilities, + capacity = EXCLUDED.capacity, + health = EXCLUDED.health, + updated_at = EXCLUDED.updated_at; +`; + +const postgresMigrations: MigrationDefinition[] = [ + { + id: "001_v01_initial_durable_store", + checksum: checksumSql(initialMigrationSql), + sql: initialMigrationSql, + }, +]; + +export function postgresMigrationContract(): JsonRecord { + return { + migrationIds: postgresMigrations.map((migration) => migration.id), + latestMigrationId: latestMigrationId(), + requiredTables: ["agentrun_schema_migrations", "agentrun_runs", "agentrun_commands", "agentrun_events", "agentrun_runners", "agentrun_backends", "agentrun_leases"], + checksums: Object.fromEntries(postgresMigrations.map((migration) => [migration.id, migration.checksum])), + }; +} + +export async function createPostgresAgentRunStore(options: PostgresStoreOptions): Promise { + const store = new PostgresAgentRunStore(options); + await store.migrate(); + return store; +} + +export class PostgresAgentRunStore implements AgentRunStore { + private readonly pool: Pool; + private migrationReady = false; + private appliedMigrationId: string | null = null; + + constructor(options: PostgresStoreOptions) { + this.pool = new Pool({ connectionString: options.connectionString, application_name: "agentrun-mgr-v01" }); + } + + async migrate(): Promise { + await this.withTransaction(async (client) => { + await client.query(` +CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( + id text PRIMARY KEY, + checksum text NOT NULL, + applied_at timestamptz NOT NULL DEFAULT now() +) +`); + for (const migration of postgresMigrations) { + const existing = await client.query<{ checksum: string }>("SELECT checksum FROM agentrun_schema_migrations WHERE id = $1 FOR UPDATE", [migration.id]); + if (existing.rowCount && existing.rows[0]?.checksum !== migration.checksum) { + throw new AgentRunError("infra-failed", `schema migration checksum mismatch for ${migration.id}`, { httpStatus: 503, details: { migrationId: migration.id } }); + } + if (!existing.rowCount) { + await client.query(migration.sql); + await client.query("INSERT INTO agentrun_schema_migrations (id, checksum) VALUES ($1, $2)", [migration.id, migration.checksum]); + } + } + }); + this.migrationReady = true; + this.appliedMigrationId = latestMigrationId(); + } + + async health(): Promise { + try { + await this.pool.query("SELECT 1"); + const result = await this.pool.query<{ id: string }>("SELECT id FROM agentrun_schema_migrations ORDER BY applied_at DESC, id DESC LIMIT 1"); + const migrationId = result.rows[0]?.id ?? null; + const migrationReady = this.migrationReady && migrationId === latestMigrationId(); + return { adapter: "postgres", ready: migrationReady, reachable: true, migrationReady, migrationId, failureKind: migrationReady ? null : "infra-failed", message: migrationReady ? null : "schema migration is not ready", credentialValuesPrinted: false }; + } catch (error) { + return { adapter: "postgres", ready: false, reachable: false, migrationReady: false, migrationId: this.appliedMigrationId, failureKind: "infra-failed", message: error instanceof Error ? error.message : String(error), credentialValuesPrinted: false }; + } + } + + async createRun(input: CreateRunInput): Promise { + const at = nowIso(); + const run: RunRecord = { ...input, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null }; + return this.withTransaction(async (client) => { + await client.query( + `INSERT INTO agentrun_runs (id, tenant_id, project_id, workspace_ref, provider_id, backend_profile, execution_policy, trace_sink, status, terminal_status, failure_kind, failure_message, created_at, updated_at, claimed_by, lease_expires_at) + VALUES ($1, $2, $3, $4::jsonb, $5, $6, $7::jsonb, $8::jsonb, $9, $10, $11, $12, $13, $14, $15, $16)`, + [run.id, run.tenantId, run.projectId, JSON.stringify(run.workspaceRef), run.providerId, run.backendProfile, JSON.stringify(run.executionPolicy), JSON.stringify(run.traceSink), run.status, run.terminalStatus, run.failureKind, run.failureMessage, run.createdAt, run.updatedAt, run.claimedBy, run.leaseExpiresAt], + ); + await this.appendEventWithLockedRun(client, run.id, "backend_status", { phase: "run-created", backendProfile: run.backendProfile }); + return run; + }); + } + + async getRun(runId: string): Promise { + const result = await this.pool.query("SELECT * FROM agentrun_runs WHERE id = $1", [runId]); + const row = result.rows[0]; + if (!row) throw new AgentRunError("schema-invalid", `run ${runId} was not found`, { httpStatus: 404 }); + return runFromRow(row); + } + + async listEvents(runId: string, afterSeq: number, limit: number): Promise { + await this.getRun(runId); + const result = await this.pool.query("SELECT * FROM agentrun_events WHERE run_id = $1 AND seq > $2 ORDER BY seq ASC LIMIT $3", [runId, afterSeq, clamp(limit, 1, 500)]); + return result.rows.map(eventFromRow); + } + + async createCommand(runId: string, input: CreateCommandInput): Promise { + const payloadHash = stableHash(input.payload); + return this.withTransaction(async (client) => { + await this.requireRunForUpdate(client, runId); + if (input.idempotencyKey) { + const existing = await client.query("SELECT * FROM agentrun_commands WHERE run_id = $1 AND idempotency_key = $2", [runId, input.idempotencyKey]); + if (existing.rows[0]) { + const command = commandFromRow(existing.rows[0]); + if (command.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "idempotency key reused with different payload", { httpStatus: 409 }); + return command; + } + } + const seq = await this.nextSeq(client, "agentrun_commands", runId); + const at = nowIso(); + const command: CommandRecord = { ...input, id: newId("cmd"), runId, seq, state: "pending", payloadHash, createdAt: at, updatedAt: at, acknowledgedAt: null }; + await client.query( + `INSERT INTO agentrun_commands (id, run_id, seq, type, payload, payload_hash, idempotency_key, state, created_at, updated_at, acknowledged_at) + VALUES ($1, $2, $3, $4, $5::jsonb, $6, $7, $8, $9, $10, $11)`, + [command.id, command.runId, command.seq, command.type, JSON.stringify(command.payload), command.payloadHash, command.idempotencyKey ?? null, command.state, command.createdAt, command.updatedAt, command.acknowledgedAt], + ); + await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "command-created", commandId: command.id, commandType: command.type }); + return command; + }); + } + + async getCommand(commandId: string): Promise { + const result = await this.pool.query("SELECT * FROM agentrun_commands WHERE id = $1", [commandId]); + const row = result.rows[0]; + if (!row) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 }); + return commandFromRow(row); + } + + async listCommands(runId: string, afterSeq: number, limit: number): Promise { + await this.getRun(runId); + const result = await this.pool.query("SELECT * FROM agentrun_commands WHERE run_id = $1 AND seq > $2 ORDER BY seq ASC LIMIT $3", [runId, afterSeq, clamp(limit, 1, 100)]); + return result.rows.map(commandFromRow); + } + + async registerRunner(input: Partial): Promise { + const at = nowIso(); + const runner: RunnerRecord = { id: input.id ?? newId("runner"), registeredAt: at, heartbeatAt: at, ...input }; + const metadata = metadataForRunner(runner); + const result = await this.pool.query( + `INSERT INTO agentrun_runners (id, run_id, attempt_id, backend_profile, placement, source_commit, metadata, registered_at, heartbeat_at) + VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, $9) + ON CONFLICT (id) DO UPDATE SET + run_id = EXCLUDED.run_id, + attempt_id = EXCLUDED.attempt_id, + backend_profile = EXCLUDED.backend_profile, + placement = EXCLUDED.placement, + source_commit = EXCLUDED.source_commit, + metadata = EXCLUDED.metadata, + heartbeat_at = EXCLUDED.heartbeat_at + RETURNING *`, + [runner.id, runner.runId ?? null, runner.attemptId ?? null, runner.backendProfile ?? null, runner.placement ?? null, runner.sourceCommit ?? null, JSON.stringify(metadata), runner.registeredAt, runner.heartbeatAt], + ); + return runnerFromRow(result.rows[0]); + } + + async claimRun(runId: string, runnerId: string, leaseMs: number): Promise { + return this.withTransaction(async (client) => { + const run = await this.requireRunForUpdate(client, runId); + if (run.claimedBy && run.claimedBy !== runnerId && !isTerminalStatus(run.status)) throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 }); + const leaseExpiresAt = new Date(Date.now() + leaseMs).toISOString(); + const updated = await client.query( + `UPDATE agentrun_runs SET status = $2, claimed_by = $3, lease_expires_at = $4, updated_at = $5 WHERE id = $1 RETURNING *`, + [runId, "claimed", runnerId, leaseExpiresAt, nowIso()], + ); + await client.query( + `INSERT INTO agentrun_leases (run_id, runner_id, lease_expires_at, stale_recovery_marker, updated_at) + VALUES ($1, $2, $3, $4::jsonb, $5) + ON CONFLICT (run_id) DO UPDATE SET runner_id = EXCLUDED.runner_id, lease_expires_at = EXCLUDED.lease_expires_at, updated_at = EXCLUDED.updated_at`, + [runId, runnerId, leaseExpiresAt, null, nowIso()], + ); + await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "run-claimed", runnerId }); + return runFromRow(updated.rows[0]); + }); + } + + async heartbeat(runId: string, runnerId: string, leaseMs: number): Promise { + return this.withTransaction(async (client) => { + const run = await this.requireRunForUpdate(client, runId); + if (run.claimedBy !== runnerId) throw new AgentRunError("runner-lease-conflict", `run ${runId} is not claimed by ${runnerId}`, { httpStatus: 409 }); + const leaseExpiresAt = new Date(Date.now() + leaseMs).toISOString(); + const updated = await client.query("UPDATE agentrun_runs SET lease_expires_at = $2, updated_at = $3 WHERE id = $1 RETURNING *", [runId, leaseExpiresAt, nowIso()]); + await client.query("UPDATE agentrun_runners SET heartbeat_at = $2 WHERE id = $1", [runnerId, nowIso()]); + await client.query("UPDATE agentrun_leases SET lease_expires_at = $2, updated_at = $3 WHERE run_id = $1", [runId, leaseExpiresAt, nowIso()]); + return runFromRow(updated.rows[0]); + }); + } + + async ackCommand(commandId: string): Promise { + const result = await this.pool.query("UPDATE agentrun_commands SET state = $2, acknowledged_at = $3, updated_at = $3 WHERE id = $1 RETURNING *", [commandId, "acknowledged", nowIso()]); + const row = result.rows[0]; + if (!row) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 }); + return commandFromRow(row); + } + + async appendEvent(runId: string, type: EventType, payload: JsonRecord): Promise { + return this.withTransaction(async (client) => { + await this.requireRunForUpdate(client, runId); + return this.appendEventWithLockedRun(client, runId, type, payload); + }); + } + + async finishRun(runId: string, result: Pick): Promise { + return this.withTransaction(async (client) => { + await this.requireRunForUpdate(client, runId); + const status = statusFromTerminal(result.terminalStatus); + const updated = await client.query( + `UPDATE agentrun_runs SET status = $2, terminal_status = $3, failure_kind = $4, failure_message = $5, updated_at = $6 WHERE id = $1 RETURNING *`, + [runId, status, result.terminalStatus, result.failureKind, result.failureMessage, nowIso()], + ); + await this.appendEventWithLockedRun(client, runId, "terminal_status", { terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage }); + return runFromRow(updated.rows[0]); + }); + } + + async backends(): Promise { + const result = await this.pool.query("SELECT * FROM agentrun_backends ORDER BY profile ASC"); + return result.rows.map((row) => ({ profile: stringValue(row.profile), ...jsonRecord(row.capabilities), capacity: jsonValue(row.capacity), health: jsonValue(row.health), updatedAt: nullableIso(row.updated_at) })); + } + + async close(): Promise { + await this.pool.end(); + } + + private async appendEventWithLockedRun(client: PoolClient, runId: string, type: EventType, payload: JsonRecord): Promise { + const seq = await this.nextSeq(client, "agentrun_events", runId); + const event: RunEvent = { id: newId("evt"), runId, seq, type, payload: redactJson(payload), createdAt: nowIso() }; + await client.query("INSERT INTO agentrun_events (id, run_id, seq, type, payload, created_at) VALUES ($1, $2, $3, $4, $5::jsonb, $6)", [event.id, event.runId, event.seq, event.type, JSON.stringify(event.payload), event.createdAt]); + return event; + } + + private async nextSeq(client: PoolClient, table: "agentrun_commands" | "agentrun_events", runId: string): Promise { + const result = await client.query<{ seq: number }>(`SELECT COALESCE(MAX(seq), 0) + 1 AS seq FROM ${table} WHERE run_id = $1`, [runId]); + return Number(result.rows[0]?.seq ?? 1); + } + + private async requireRunForUpdate(client: PoolClient, runId: string): Promise { + const result = await client.query("SELECT * FROM agentrun_runs WHERE id = $1 FOR UPDATE", [runId]); + const row = result.rows[0]; + if (!row) throw new AgentRunError("schema-invalid", `run ${runId} was not found`, { httpStatus: 404 }); + return runFromRow(row); + } + + private async withTransaction(fn: (client: PoolClient) => Promise): Promise { + const client = await this.pool.connect(); + try { + await client.query("BEGIN"); + const result = await fn(client); + await client.query("COMMIT"); + return result; + } catch (error) { + await client.query("ROLLBACK"); + throw error; + } finally { + client.release(); + } + } +} + +function checksumSql(sql: string): string { + return createHash("sha256").update(sql.trim()).digest("hex"); +} + +function latestMigrationId(): string { + return postgresMigrations[postgresMigrations.length - 1]?.id ?? "none"; +} + +function clamp(value: number, min: number, max: number): number { + return Math.max(min, Math.min(value, max)); +} + +function runFromRow(row: QueryResultRow): RunRecord { + return { + id: stringValue(row.id), + tenantId: stringValue(row.tenant_id), + projectId: stringValue(row.project_id), + workspaceRef: jsonRecord(row.workspace_ref) as RunRecord["workspaceRef"], + providerId: stringValue(row.provider_id), + backendProfile: stringValue(row.backend_profile) as BackendProfile, + executionPolicy: jsonRecord(row.execution_policy) as RunRecord["executionPolicy"], + traceSink: jsonValue(row.trace_sink), + status: stringValue(row.status) as RunStatus, + terminalStatus: nullableString(row.terminal_status) as TerminalStatus | null, + failureKind: nullableString(row.failure_kind) as FailureKind | null, + failureMessage: nullableString(row.failure_message), + createdAt: iso(row.created_at), + updatedAt: iso(row.updated_at), + claimedBy: nullableString(row.claimed_by), + leaseExpiresAt: nullableIso(row.lease_expires_at), + }; +} + +function commandFromRow(row: QueryResultRow): CommandRecord { + return { + id: stringValue(row.id), + runId: stringValue(row.run_id), + seq: Number(row.seq), + type: stringValue(row.type) as CommandRecord["type"], + payload: jsonRecord(row.payload), + payloadHash: stringValue(row.payload_hash), + ...(nullableString(row.idempotency_key) ? { idempotencyKey: stringValue(row.idempotency_key) } : {}), + state: stringValue(row.state) as CommandState, + createdAt: iso(row.created_at), + updatedAt: iso(row.updated_at), + acknowledgedAt: nullableIso(row.acknowledged_at), + }; +} + +function eventFromRow(row: QueryResultRow): RunEvent { + return { id: stringValue(row.id), runId: stringValue(row.run_id), seq: Number(row.seq), type: stringValue(row.type) as EventType, payload: jsonRecord(row.payload), createdAt: iso(row.created_at) }; +} + +function runnerFromRow(row: QueryResultRow): RunnerRecord { + return { + ...jsonRecord(row.metadata), + id: stringValue(row.id), + ...(nullableString(row.run_id) ? { runId: stringValue(row.run_id) } : {}), + ...(nullableString(row.attempt_id) ? { attemptId: stringValue(row.attempt_id) } : {}), + ...(nullableString(row.backend_profile) ? { backendProfile: stringValue(row.backend_profile) as BackendProfile } : {}), + ...(nullableString(row.placement) ? { placement: stringValue(row.placement) } : {}), + ...(nullableString(row.source_commit) ? { sourceCommit: stringValue(row.source_commit) } : {}), + registeredAt: iso(row.registered_at), + heartbeatAt: iso(row.heartbeat_at), + }; +} + +function metadataForRunner(runner: RunnerRecord): JsonRecord { + const { id: _id, runId: _runId, attemptId: _attemptId, backendProfile: _backendProfile, placement: _placement, sourceCommit: _sourceCommit, registeredAt: _registeredAt, heartbeatAt: _heartbeatAt, ...metadata } = runner; + return redactJson(metadata); +} + +function isTerminalStatus(status: RunRecord["status"]): boolean { + return status === "completed" || status === "failed" || status === "cancelled"; +} + +function stringValue(value: unknown): string { + return typeof value === "string" ? value : String(value ?? ""); +} + +function nullableString(value: unknown): string | null { + return value === null || value === undefined ? null : stringValue(value); +} + +function jsonValue(value: unknown): JsonValue { + if (value === undefined) return null; + return value as JsonValue; +} + +function jsonRecord(value: unknown): JsonRecord { + return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : {}; +} + +function iso(value: unknown): string { + if (value instanceof Date) return value.toISOString(); + if (typeof value === "string") return new Date(value).toISOString(); + return new Date(String(value)).toISOString(); +} + +function nullableIso(value: unknown): string | null { + return value === null || value === undefined ? null : iso(value); +} diff --git a/src/mgr/server.ts b/src/mgr/server.ts index 05845f8..944a037 100644 --- a/src/mgr/server.ts +++ b/src/mgr/server.ts @@ -2,7 +2,7 @@ import type { Server } from "node:http"; import { createServer } from "node:http"; import type { AddressInfo } from "node:net"; import type { AgentRunStore } from "./store.js"; -import { MemoryAgentRunStore } from "./store.js"; +import { openAgentRunStoreFromEnv } from "./store.js"; import { AgentRunError, errorToJson } from "../common/errors.js"; import { asRecord, validateCreateCommand, validateCreateRun } from "../common/validation.js"; import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, RunEvent } from "../common/types.js"; @@ -21,7 +21,7 @@ export interface StartedManagerServer { } export async function startManagerServer(options: ManagerServerOptions = {}): Promise { - const store = options.store ?? new MemoryAgentRunStore(); + const store = options.store ?? await openAgentRunStoreFromEnv(); const sourceCommit = options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown"; const server = createServer(async (req, res) => { const traceId = `trc_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`; @@ -52,45 +52,54 @@ async function readBody(req: import("node:http").IncomingMessage): Promise { const path = url.pathname; if (method === "GET" && (path === "/health" || path === "/health/live" || path === "/health/readiness")) { - return { serviceId: "agentrun-mgr", live: true, ready: true, database: { adapter: "memory-self-test", migrationReady: true }, sourceCommit, secretRefs: { valuesPrinted: false } }; + const database = await store.health(); + const ready = path === "/health/live" ? true : database.ready; + return { serviceId: "agentrun-mgr", live: true, ready, database, sourceCommit, secretRefs: { databaseUrl: database.adapter === "postgres" ? "redacted" : "not-used", valuesPrinted: false } }; } - if (method === "GET" && path === "/api/v1/backends") return { items: store.backends() }; - if (method === "POST" && path === "/api/v1/runs") return store.createRun(validateCreateRun(body)) as unknown as JsonValue; + if (method === "GET" && path === "/api/v1/backends") return { items: await store.backends() as unknown as JsonValue }; + if (method === "POST" && path === "/api/v1/runs") return await store.createRun(validateCreateRun(body)) as unknown as JsonValue; const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u); - if (method === "GET" && runMatch) return store.getRun(runMatch[1] ?? "") as unknown as JsonValue; + if (method === "GET" && runMatch) return await store.getRun(runMatch[1] ?? "") as unknown as JsonValue; const eventMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u); if (method === "GET" && eventMatch) { const afterSeq = integerQuery(url, "afterSeq", 0); const limit = integerQuery(url, "limit", 100); - return { items: store.listEvents(eventMatch[1] ?? "", afterSeq, limit) as unknown as JsonValue }; + return { items: await store.listEvents(eventMatch[1] ?? "", afterSeq, limit) as unknown as JsonValue }; } const commandCreateMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands$/u); - if (method === "POST" && commandCreateMatch) return store.createCommand(commandCreateMatch[1] ?? "", validateCreateCommand(body)) as unknown as JsonValue; - if (method === "GET" && commandCreateMatch) return { items: store.listCommands(commandCreateMatch[1] ?? "", integerQuery(url, "afterSeq", 0), integerQuery(url, "limit", 20)) as unknown as JsonValue }; + if (method === "POST" && commandCreateMatch) return await store.createCommand(commandCreateMatch[1] ?? "", validateCreateCommand(body)) as unknown as JsonValue; + if (method === "GET" && commandCreateMatch) return { items: await store.listCommands(commandCreateMatch[1] ?? "", integerQuery(url, "afterSeq", 0), integerQuery(url, "limit", 20)) as unknown as JsonValue }; const commandShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)$/u); - if (method === "GET" && commandShowMatch) return store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue; - if (method === "POST" && path === "/api/v1/runners/register") return store.registerRunner(asRecord(body ?? {}, "runner")) as unknown as JsonValue; + if (method === "GET" && commandShowMatch) return await store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue; + if (method === "POST" && path === "/api/v1/runners/register") return await store.registerRunner(asRecord(body ?? {}, "runner")) as unknown as JsonValue; const claimMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/claim$/u); if (method === "POST" && claimMatch) { const record = asRecord(body, "claim"); const runnerId = typeof record.runnerId === "string" ? record.runnerId : ""; if (runnerId.length === 0) throw new AgentRunError("schema-invalid", "runnerId is required", { httpStatus: 400 }); - return store.claimRun(claimMatch[1] ?? "", runnerId, numberField(record, "leaseMs", 60_000)) as unknown as JsonValue; + return await store.claimRun(claimMatch[1] ?? "", runnerId, numberField(record, "leaseMs", 60_000)) as unknown as JsonValue; + } + const leaseMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/lease$/u); + if (method === "PATCH" && leaseMatch) { + const record = asRecord(body, "lease"); + const runnerId = typeof record.runnerId === "string" ? record.runnerId : ""; + if (runnerId.length === 0) throw new AgentRunError("schema-invalid", "runnerId is required", { httpStatus: 400 }); + return await store.heartbeat(leaseMatch[1] ?? "", runnerId, numberField(record, "leaseMs", 60_000)) as unknown as JsonValue; } const eventsAppendMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u); if (method === "POST" && eventsAppendMatch) { const record = asRecord(body, "event"); const type = typeof record.type === "string" ? record.type as RunEvent["type"] : "backend_status"; - return store.appendEvent(eventsAppendMatch[1] ?? "", type, asRecord(record.payload ?? {}, "event.payload")) as unknown as JsonValue; + return await store.appendEvent(eventsAppendMatch[1] ?? "", type, asRecord(record.payload ?? {}, "event.payload")) as unknown as JsonValue; } const statusMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/status$/u); if (method === "PATCH" && statusMatch) { const record = asRecord(body, "status"); const terminalStatus = record.terminalStatus === "completed" || record.terminalStatus === "failed" || record.terminalStatus === "blocked" || record.terminalStatus === "cancelled" ? record.terminalStatus : "failed"; - return store.finishRun(statusMatch[1] ?? "", { terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null, failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null }) as unknown as JsonValue; + return await store.finishRun(statusMatch[1] ?? "", { terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null, failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null }) as unknown as JsonValue; } const ackMatch = path.match(/^\/api\/v1\/commands\/([^/]+)\/ack$/u); - if (method === "POST" && ackMatch) return store.ackCommand(ackMatch[1] ?? "") as unknown as JsonValue; + if (method === "POST" && ackMatch) return await store.ackCommand(ackMatch[1] ?? "") as unknown as JsonValue; throw new AgentRunError("schema-invalid", `unsupported route ${method} ${path}`, { httpStatus: 404 }); } diff --git a/src/mgr/store.ts b/src/mgr/store.ts index 44fa01a..ee7a36c 100644 --- a/src/mgr/store.ts +++ b/src/mgr/store.ts @@ -3,20 +3,46 @@ import { AgentRunError } from "../common/errors.js"; import { newId, nowIso, stableHash } from "../common/validation.js"; import { redactJson } from "../common/redaction.js"; +export type MaybePromise = T | Promise; + +export interface StoreHealth extends JsonRecord { + adapter: "memory-self-test" | "postgres"; + ready: boolean; + reachable: boolean; + migrationReady: boolean; + migrationId: string | null; + failureKind: FailureKind | null; + message: string | null; + credentialValuesPrinted: false; +} + export interface AgentRunStore { - createRun(input: CreateRunInput): RunRecord; - getRun(runId: string): RunRecord; - listEvents(runId: string, afterSeq: number, limit: number): RunEvent[]; - createCommand(runId: string, input: CreateCommandInput): CommandRecord; - getCommand(commandId: string): CommandRecord; - listCommands(runId: string, afterSeq: number, limit: number): CommandRecord[]; - registerRunner(input: Partial): RunnerRecord; - claimRun(runId: string, runnerId: string, leaseMs: number): RunRecord; - heartbeat(runId: string, runnerId: string, leaseMs: number): RunRecord; - ackCommand(commandId: string): CommandRecord; - appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): RunEvent; - finishRun(runId: string, result: Pick): RunRecord; - backends(): JsonRecord[]; + health(): MaybePromise; + createRun(input: CreateRunInput): MaybePromise; + getRun(runId: string): MaybePromise; + listEvents(runId: string, afterSeq: number, limit: number): MaybePromise; + createCommand(runId: string, input: CreateCommandInput): MaybePromise; + getCommand(commandId: string): MaybePromise; + listCommands(runId: string, afterSeq: number, limit: number): MaybePromise; + registerRunner(input: Partial): MaybePromise; + claimRun(runId: string, runnerId: string, leaseMs: number): MaybePromise; + heartbeat(runId: string, runnerId: string, leaseMs: number): MaybePromise; + ackCommand(commandId: string): MaybePromise; + appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): MaybePromise; + finishRun(runId: string, result: Pick): MaybePromise; + backends(): MaybePromise; + close?(): MaybePromise; +} + +export async function openAgentRunStoreFromEnv(env: NodeJS.ProcessEnv = process.env): Promise { + const databaseUrl = env.DATABASE_URL?.trim(); + if (databaseUrl) { + const { createPostgresAgentRunStore } = await import("./postgres-store.js"); + return createPostgresAgentRunStore({ connectionString: databaseUrl }); + } + const storeMode = env.AGENTRUN_STORE ?? env.AGENTRUN_MGR_STORE; + if (storeMode === "memory") return new MemoryAgentRunStore(); + throw new AgentRunError("infra-failed", "DATABASE_URL is required for agentrun-mgr live runtime; set AGENTRUN_STORE=memory only for explicit self-test/dev mode", { httpStatus: 503, details: { adapter: "postgres", databaseUrl: "missing", memoryFallback: "disabled" } }); } export class MemoryAgentRunStore implements AgentRunStore { @@ -25,6 +51,10 @@ export class MemoryAgentRunStore implements AgentRunStore { private readonly eventsByRun = new Map(); private readonly runners = new Map(); + health(): StoreHealth { + return { adapter: "memory-self-test", ready: true, reachable: true, migrationReady: true, migrationId: "memory-self-test", failureKind: null, message: null, credentialValuesPrinted: false }; + } + createRun(input: CreateRunInput): RunRecord { const at = nowIso(); const run: RunRecord = { ...input, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null }; @@ -130,7 +160,7 @@ export class MemoryAgentRunStore implements AgentRunStore { } } -function statusFromTerminal(terminalStatus: TerminalStatus): RunRecord["status"] { +export function statusFromTerminal(terminalStatus: TerminalStatus): RunRecord["status"] { if (terminalStatus === "completed") return "completed"; if (terminalStatus === "cancelled") return "cancelled"; if (terminalStatus === "blocked") return "blocked"; diff --git a/src/selftest/run.ts b/src/selftest/run.ts index 655fd90..5a89040 100644 --- a/src/selftest/run.ts +++ b/src/selftest/run.ts @@ -4,9 +4,12 @@ import path from "node:path"; import { fileURLToPath } from "node:url"; import assert from "node:assert/strict"; import { startManagerServer } from "../mgr/server.js"; +import { MemoryAgentRunStore, openAgentRunStoreFromEnv } from "../mgr/store.js"; +import { postgresMigrationContract } from "../mgr/postgres-store.js"; import { ManagerClient } from "../mgr/client.js"; import { runOnce } from "../runner/run-once.js"; import { redactText } from "../common/redaction.js"; +import { AgentRunError } from "../common/errors.js"; const root = path.resolve(path.dirname(fileURLToPath(import.meta.url)), "../.."); const tmp = await mkdtemp(path.join(os.tmpdir(), "agentrun-selftest-")); @@ -21,12 +24,26 @@ try { await writeFile(path.join(workspace, "README.md"), "self-test workspace\n"); assert.equal(redactText("Authorization: Bearer abc123"), "Authorization: Bearer REDACTED"); + await assert.rejects( + () => openAgentRunStoreFromEnv({}), + (error) => error instanceof AgentRunError && error.failureKind === "infra-failed" && error.message.includes("DATABASE_URL is required"), + ); + const postgresContract = postgresMigrationContract(); + assert.equal(postgresContract.latestMigrationId, "001_v01_initial_durable_store"); + assert.ok(Array.isArray(postgresContract.requiredTables)); + assert.ok(postgresContract.requiredTables.includes("agentrun_schema_migrations")); + assert.ok(postgresContract.requiredTables.includes("agentrun_runs")); + assert.ok(postgresContract.requiredTables.includes("agentrun_events")); - const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test" }); + const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test", store: new MemoryAgentRunStore() }); try { const client = new ManagerClient(server.baseUrl); - const health = await client.get("/health/readiness") as { database?: { adapter?: string } }; + const health = await client.get("/health/readiness") as { database?: { adapter?: string; reachable?: boolean; migrationReady?: boolean; failureKind?: string | null }; secretRefs?: { valuesPrinted?: boolean } }; assert.equal(health.database?.adapter, "memory-self-test"); + assert.equal(health.database?.reachable, true); + assert.equal(health.database?.migrationReady, true); + assert.equal(health.database?.failureKind, null); + assert.equal(health.secretRefs?.valuesPrinted, false); const run = await client.post("/api/v1/runs", { tenantId: "unidesk", projectId: "pikasTech/unidesk", @@ -46,8 +63,8 @@ try { const duplicate = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello" }, idempotencyKey: "selftest-turn" }) as { id: string }; assert.equal(duplicate.id, command.id); const fakePath = path.join(root, "src/selftest/fake-codex-app-server.ts"); - const fakeCommand = process.env.AGENTRUN_SELFTEST_CODEX_COMMAND ?? process.execPath; - const fakeArgs = process.env.AGENTRUN_SELFTEST_CODEX_ARGS ? JSON.parse(process.env.AGENTRUN_SELFTEST_CODEX_ARGS) as string[] : [fakePath]; + const fakeCommand = process.env.AGENTRUN_SELFTEST_CODEX_COMMAND ?? defaultFakeCommand(); + const fakeArgs = process.env.AGENTRUN_SELFTEST_CODEX_ARGS ? JSON.parse(process.env.AGENTRUN_SELFTEST_CODEX_ARGS) as string[] : defaultFakeArgs(fakePath); const result = await runOnce({ managerUrl: server.baseUrl, runId: run.id, codexCommand: fakeCommand, codexArgs: fakeArgs, codexHome, env: { CODEX_HOME: codexHome } }); assert.equal(result.terminalStatus, "completed"); const events = await client.get(`/api/v1/runs/${run.id}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; @@ -56,10 +73,18 @@ try { assert.equal(JSON.stringify(events).includes("Bearer test-token"), false); const finalRun = await client.get(`/api/v1/runs/${run.id}`) as { terminalStatus?: string }; assert.equal(finalRun.terminalStatus, "completed"); - console.log(JSON.stringify({ ok: true, tests: ["manager-memory-lifecycle", "codex-stdio-fake-turn", "redaction"], runId: run.id })); + console.log(JSON.stringify({ ok: true, tests: ["manager-memory-lifecycle", "codex-stdio-fake-turn", "postgres-store-contract", "redaction"], runId: run.id })); } finally { await new Promise((resolve) => server.server.close(() => resolve())); } } finally { await rm(tmp, { recursive: true, force: true }); } + +function defaultFakeCommand(): string { + return process.versions.bun ? process.execPath : "npx"; +} + +function defaultFakeArgs(fakePath: string): string[] { + return process.versions.bun ? [fakePath] : ["tsx", fakePath]; +}