Merge remote-tracking branch 'origin/postgres-v01-store' into commander-v01-integration-check-1780026895

This commit is contained in:
Codex
2026-05-29 11:54:55 +08:00
11 changed files with 601 additions and 44 deletions
+1
View File
@@ -5,6 +5,7 @@ ENV NODE_ENV=production
ENV PORT=8080
COPY package.json tsconfig.json ./
RUN bun install --production
COPY scripts ./scripts
COPY src ./src
COPY deploy/deploy.json ./deploy/deploy.json
+3 -3
View File
@@ -108,7 +108,7 @@ POST /api/v1/commands/:commandId/ack
| 规格项 | 状态 | 说明 |
| --- | --- | --- |
| `agentrun-mgr` 服务规格 | 已定义 | 本文为 v0.1 manager 权威。 |
| Manager REST API | 实现 | 需要后续代码实现。 |
| Manager REST API | 实现骨架 | 已有 run、command、event、runner register、claim、lease heartbeat、status、ack、backends 和 health/readiness 的 HTTP JSON 骨架;仍需后续真实部署验收。 |
| Tenant policy boundary | 已定义/未实现 | v0.1 只做最小 schema/allowlist/secretScope 边界。 |
| Postgres durable adapter | 实现 | 见 [spec-v01-postgres.md](spec-v01-postgres.md)。 |
| Observability 最小合同 | 已定义/未实现 | event、terminal status、failureKind 和 redaction 需要代码实现。 |
| Postgres durable adapter | 实现骨架 | live runtime 通过 `DATABASE_URL` 使用 Postgres durable store;memory store 仅用于显式 self-test/dev。见 [spec-v01-postgres.md](spec-v01-postgres.md)。 |
| Observability 最小合同 | 已实现骨架 | events append-only、terminal status、failureKind、health/readiness store 状态和 Secret/DSN redaction 已进入 manager 骨架;集中 trace 和部署级观测仍属后续工作。 |
+3 -2
View File
@@ -74,6 +74,7 @@ Secret 名称和 key 可以在实现时按 Kubernetes 命名限制微调,但
| --- | --- | --- |
| Postgres durable store 规格 | 已定义 | 本文为 v0.1 存储权威。 |
| StatefulSet/Service/PVC | 未实现 | 需要后续 GitOps lane 初始化。 |
| migration ledger | 实现 | 需要后续代码和 schema migration。 |
| manager Postgres adapter | 实现 | 需要后续 `agentrun-mgr` 实现。 |
| migration ledger | 实现骨架 | `agentrun-mgr` 启动 Postgres adapter 时幂等创建 `agentrun_schema_migrations` 并记录 migration id/checksum;live DB 迁移验收仍依赖后续 GitOps lane 初始化。 |
| manager Postgres adapter | 实现骨架 | `agentrun-mgr` 通过 `DATABASE_URL` 启用 Postgres adapter,持久化 runs、commands、events、runners、backends 和 leases;缺少 `DATABASE_URL` 时 live runtime fail fast;memory 只允许显式 self-test/dev。 |
| health/readiness store 状态 | 已实现骨架 | health/readiness 返回 adapter、reachable、migrationReady、migrationId、failureKind 和 redacted Secret 状态,不输出 DSN 明文。 |
| file/sqlite durable store | 不采用 | 只可用于临时本地测试,不作为 v0.1 runtime truth。 |
+1 -1
View File
@@ -154,7 +154,7 @@ Manager 负责校验、保存和返回这些字段;runner 只能消费已保
| Codex backend 规格 | 已定义 | 见 [spec-v01-backend-codex.md](spec-v01-backend-codex.md)。 |
| AgentRun CLI 规格 | 已定义 | 见 [spec-v01-cli.md](spec-v01-cli.md)。 |
| Scheduler deferred 规格 | 已定义 | 见 [spec-v01-scheduler.md](spec-v01-scheduler.md)。 |
| `agentrun-mgr` 实现 | 实现 | 需后续代码实现。 |
| `agentrun-mgr` 实现 | 实现骨架 | 已有 REST API、Postgres durable store 选择、migration ledger、health/readiness 和 self-test memory 模式骨架;仍需 G14 `agentrun-v01` 真实 Postgres/GitOps 验收。 |
| `agentrun-runner` 实现 | 未实现 | 需后续代码实现。 |
| 第一真实 backend | 未实现 | 默认候选 Codex。 |
| 自动 scheduler | Deferred | 不作为 `v0.1` 第一阶段验收目标。 |
+5
View File
@@ -9,8 +9,13 @@
"test": "bun run src/selftest/run.ts",
"cli": "bun scripts/agentrun-cli.ts"
},
"dependencies": {
"pg": "^8.13.1"
},
"devDependencies": {
"@types/pg": "^8.11.10",
"@types/node": "^22.10.0",
"tsx": "^4.19.2",
"typescript": "^5.8.3"
}
}
+5 -2
View File
@@ -1,5 +1,6 @@
import { readFile } from "node:fs/promises";
import { startManagerServer } from "../../src/mgr/server.js";
import { MemoryAgentRunStore } from "../../src/mgr/store.js";
import { ManagerClient } from "../../src/mgr/client.js";
import { runOnce } from "../../src/runner/run-once.js";
import type { JsonRecord, JsonValue } from "../../src/common/types.js";
@@ -64,8 +65,10 @@ async function dispatch(args: ParsedArgs): Promise<JsonValue> {
async function startServer(args: ParsedArgs): Promise<JsonRecord> {
const port = Number(flag(args, "port", "8080"));
const host = flag(args, "host", "0.0.0.0");
const started = await startManagerServer({ port, host });
return { serviceId: "agentrun-mgr", baseUrl: started.baseUrl, pid: process.pid, note: "foreground process; use Kubernetes/Tekton for v0.1 runtime" };
const storeMode = optionalFlag(args, "store") ?? process.env.AGENTRUN_STORE ?? process.env.AGENTRUN_MGR_STORE;
const started = await startManagerServer({ port, host, ...(storeMode === "memory" ? { store: new MemoryAgentRunStore() } : {}) });
const database = await started.store.health();
return { serviceId: "agentrun-mgr", baseUrl: started.baseUrl, pid: process.pid, database, note: "foreground process; use Kubernetes/Tekton for v0.1 runtime" };
}
function client(args: ParsedArgs): ManagerClient {
+9 -2
View File
@@ -1,6 +1,13 @@
import { startManagerServer } from "./server.js";
import { errorToJson } from "../common/errors.js";
const port = Number(process.env.PORT ?? process.env.AGENTRUN_MGR_PORT ?? "8080");
const host = process.env.HOST ?? "0.0.0.0";
const started = await startManagerServer({ port, host });
console.log(JSON.stringify({ ok: true, serviceId: "agentrun-mgr", baseUrl: started.baseUrl }));
try {
const started = await startManagerServer({ port, host });
const database = await started.store.health();
console.log(JSON.stringify({ ok: true, serviceId: "agentrun-mgr", baseUrl: started.baseUrl, database }));
} catch (error) {
console.error(JSON.stringify({ ok: false, serviceId: "agentrun-mgr", failureKind: "infra-failed", message: error instanceof Error ? error.message : String(error), error: errorToJson(error) }));
process.exit(1);
}
+476
View File
@@ -0,0 +1,476 @@
import { createHash } from "node:crypto";
import { Pool } from "pg";
import type { PoolClient, QueryResultRow } from "pg";
import { AgentRunError } from "../common/errors.js";
import { redactJson } from "../common/redaction.js";
import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, RunEvent, RunnerRecord, RunRecord, RunStatus, TerminalStatus } from "../common/types.js";
import { newId, nowIso, stableHash } from "../common/validation.js";
import type { AgentRunStore, StoreHealth } from "./store.js";
import { statusFromTerminal } from "./store.js";
/** Construction options for the Postgres-backed store. */
interface PostgresStoreOptions {
/** Connection string handed unchanged to the `pg` Pool. */
connectionString: string;
}
/** One schema migration as tracked in the `agentrun_schema_migrations` ledger. */
interface MigrationDefinition {
/** Ledger primary key for this migration. */
id: string;
/** Checksum recorded in the ledger and compared on later startups. */
checksum: string;
/** SQL executed once, when the ledger has no row for `id`. */
sql: string;
}
// SQL for the initial durable-store migration: creates the runs, commands,
// events, runners, backends and leases tables plus their indexes, and upserts
// the `codex` backend profile.
//
// The checksum stored in the migration ledger is computed from this exact text
// (after trimming), and a ledger row with a different checksum makes startup
// fail. Do not edit this string once it has been applied anywhere; add a new
// migration entry instead.
const initialMigrationSql = `
CREATE TABLE IF NOT EXISTS agentrun_runs (
id text PRIMARY KEY,
tenant_id text NOT NULL,
project_id text NOT NULL,
workspace_ref jsonb NOT NULL,
provider_id text NOT NULL,
backend_profile text NOT NULL,
execution_policy jsonb NOT NULL,
trace_sink jsonb,
status text NOT NULL,
terminal_status text,
failure_kind text,
failure_message text,
created_at timestamptz NOT NULL,
updated_at timestamptz NOT NULL,
claimed_by text,
lease_expires_at timestamptz
);
CREATE TABLE IF NOT EXISTS agentrun_commands (
id text PRIMARY KEY,
run_id text NOT NULL REFERENCES agentrun_runs(id) ON DELETE CASCADE,
seq integer NOT NULL,
type text NOT NULL,
payload jsonb NOT NULL,
payload_hash text NOT NULL,
idempotency_key text,
state text NOT NULL,
created_at timestamptz NOT NULL,
updated_at timestamptz NOT NULL,
acknowledged_at timestamptz,
UNIQUE (run_id, seq)
);
CREATE UNIQUE INDEX IF NOT EXISTS agentrun_commands_run_idempotency_key_idx
ON agentrun_commands (run_id, idempotency_key)
WHERE idempotency_key IS NOT NULL;
CREATE TABLE IF NOT EXISTS agentrun_events (
id text PRIMARY KEY,
run_id text NOT NULL REFERENCES agentrun_runs(id) ON DELETE CASCADE,
seq integer NOT NULL,
type text NOT NULL,
payload jsonb NOT NULL,
created_at timestamptz NOT NULL,
UNIQUE (run_id, seq)
);
CREATE TABLE IF NOT EXISTS agentrun_runners (
id text PRIMARY KEY,
run_id text,
attempt_id text,
backend_profile text,
placement text,
source_commit text,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
registered_at timestamptz NOT NULL,
heartbeat_at timestamptz NOT NULL
);
CREATE TABLE IF NOT EXISTS agentrun_backends (
profile text PRIMARY KEY,
capabilities jsonb NOT NULL,
capacity jsonb NOT NULL,
health jsonb NOT NULL,
updated_at timestamptz NOT NULL
);
CREATE TABLE IF NOT EXISTS agentrun_leases (
run_id text PRIMARY KEY REFERENCES agentrun_runs(id) ON DELETE CASCADE,
runner_id text NOT NULL,
lease_expires_at timestamptz NOT NULL,
stale_recovery_marker jsonb,
updated_at timestamptz NOT NULL
);
CREATE INDEX IF NOT EXISTS agentrun_runs_status_idx ON agentrun_runs (status, updated_at);
CREATE INDEX IF NOT EXISTS agentrun_events_run_seq_idx ON agentrun_events (run_id, seq);
CREATE INDEX IF NOT EXISTS agentrun_commands_run_seq_idx ON agentrun_commands (run_id, seq);
CREATE INDEX IF NOT EXISTS agentrun_leases_expiry_idx ON agentrun_leases (lease_expires_at);
INSERT INTO agentrun_backends (profile, capabilities, capacity, health, updated_at)
VALUES (
'codex',
'{"protocol":"codex-app-server-jsonrpc-stdio","transport":"stdio","command":"codex app-server --listen stdio://"}'::jsonb,
'{"mode":"manual-runner-v0.1"}'::jsonb,
'{"status":"registered"}'::jsonb,
now()
)
ON CONFLICT (profile) DO UPDATE SET
capabilities = EXCLUDED.capabilities,
capacity = EXCLUDED.capacity,
health = EXCLUDED.health,
updated_at = EXCLUDED.updated_at;
`;
// Ordered list of schema migrations; the last entry is treated as the latest.
const postgresMigrations: MigrationDefinition[] = [
  { id: "001_v01_initial_durable_store", sql: initialMigrationSql, checksum: checksumSql(initialMigrationSql) },
];
/**
 * Describes the schema contract this adapter expects: the known migration ids,
 * the latest one, the tables that must exist, and each migration's checksum.
 */
export function postgresMigrationContract(): JsonRecord {
  const migrationIds: string[] = [];
  const checksumPairs: [string, string][] = [];
  for (const { id, checksum } of postgresMigrations) {
    migrationIds.push(id);
    checksumPairs.push([id, checksum]);
  }
  const requiredTables = [
    "agentrun_schema_migrations",
    "agentrun_runs",
    "agentrun_commands",
    "agentrun_events",
    "agentrun_runners",
    "agentrun_backends",
    "agentrun_leases",
  ];
  return {
    migrationIds,
    latestMigrationId: latestMigrationId(),
    requiredTables,
    checksums: Object.fromEntries(checksumPairs),
  };
}
/**
 * Builds a Postgres-backed store and runs its schema migrations before
 * returning it.
 *
 * @param options Connection settings for the underlying pool.
 * @returns A store whose migrations have been applied.
 * @throws Whatever `migrate()` throws; the pool is closed first so a failed
 *   startup does not leave open connections behind.
 */
export async function createPostgresAgentRunStore(options: PostgresStoreOptions): Promise<PostgresAgentRunStore> {
  const store = new PostgresAgentRunStore(options);
  try {
    await store.migrate();
  } catch (error) {
    try {
      await store.close();
    } catch {
      // Best effort: the migration failure is the error the caller needs to see.
    }
    throw error;
  }
  return store;
}
/**
 * Durable `AgentRunStore` backed by Postgres through a `pg` connection pool.
 *
 * Mutations that allocate a per-run `seq` (commands and events) run inside a
 * transaction that first locks the run row with `SELECT ... FOR UPDATE`, so
 * sequence allocation for one run is serialised.
 */
export class PostgresAgentRunStore implements AgentRunStore {
  /** Fixed, arbitrary key of the advisory lock that serialises concurrent `migrate()` calls. */
  private static readonly migrationLockKey = 1095193166;

  private readonly pool: Pool;
  private migrationReady = false;
  private appliedMigrationId: string | null = null;

  /**
   * Creates the connection pool. No connection is opened and no schema work is
   * done here; call `migrate()` before using the store.
   */
  constructor(options: PostgresStoreOptions) {
    this.pool = new Pool({ connectionString: options.connectionString, application_name: "agentrun-mgr-v01" });
    // Without a listener, an error on an idle pooled client is emitted as an
    // unhandled 'error' event. Log it instead; `health()` reports reachability.
    this.pool.on("error", (error: Error) => {
      console.error(JSON.stringify({ ok: false, serviceId: "agentrun-mgr", failureKind: "infra-failed", message: `idle postgres client error: ${error.message}` }));
    });
  }

  /**
   * Creates the migration ledger if needed and applies every migration that
   * has no ledger row yet, all in one transaction.
   *
   * @throws AgentRunError (`infra-failed`, HTTP 503) when a ledger row exists
   *   with a checksum different from the one compiled into this build.
   */
  async migrate(): Promise<void> {
    await this.withTransaction(async (client) => {
      // `FOR UPDATE` below cannot lock a ledger row that does not exist yet, so
      // take a transaction-scoped advisory lock to serialise concurrent
      // migrators. It is released automatically at COMMIT/ROLLBACK.
      await client.query("SELECT pg_advisory_xact_lock($1::bigint)", [PostgresAgentRunStore.migrationLockKey]);
      await client.query(`
CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
id text PRIMARY KEY,
checksum text NOT NULL,
applied_at timestamptz NOT NULL DEFAULT now()
)
`);
      for (const migration of postgresMigrations) {
        const existing = await client.query<{ checksum: string }>("SELECT checksum FROM agentrun_schema_migrations WHERE id = $1 FOR UPDATE", [migration.id]);
        if (existing.rowCount && existing.rows[0]?.checksum !== migration.checksum) {
          throw new AgentRunError("infra-failed", `schema migration checksum mismatch for ${migration.id}`, { httpStatus: 503, details: { migrationId: migration.id } });
        }
        if (!existing.rowCount) {
          await client.query(migration.sql);
          await client.query("INSERT INTO agentrun_schema_migrations (id, checksum) VALUES ($1, $2)", [migration.id, migration.checksum]);
        }
      }
    });
    this.migrationReady = true;
    this.appliedMigrationId = latestMigrationId();
  }

  /**
   * Probes the database and reports adapter health.
   *
   * Never throws: any query failure is reported as `reachable: false` with the
   * error message. `ready` is true only when this instance has migrated and the
   * newest ledger row matches the latest known migration id.
   */
  async health(): Promise<StoreHealth> {
    try {
      await this.pool.query("SELECT 1");
      const result = await this.pool.query<{ id: string }>("SELECT id FROM agentrun_schema_migrations ORDER BY applied_at DESC, id DESC LIMIT 1");
      const migrationId = result.rows[0]?.id ?? null;
      const migrationReady = this.migrationReady && migrationId === latestMigrationId();
      return { adapter: "postgres", ready: migrationReady, reachable: true, migrationReady, migrationId, failureKind: migrationReady ? null : "infra-failed", message: migrationReady ? null : "schema migration is not ready", credentialValuesPrinted: false };
    } catch (error) {
      return { adapter: "postgres", ready: false, reachable: false, migrationReady: false, migrationId: this.appliedMigrationId, failureKind: "infra-failed", message: error instanceof Error ? error.message : String(error), credentialValuesPrinted: false };
    }
  }

  /** Inserts a new `pending` run and its initial `run-created` event in one transaction. */
  async createRun(input: CreateRunInput): Promise<RunRecord> {
    const at = nowIso();
    const run: RunRecord = { ...input, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null };
    return this.withTransaction(async (client) => {
      await client.query(
        `INSERT INTO agentrun_runs (id, tenant_id, project_id, workspace_ref, provider_id, backend_profile, execution_policy, trace_sink, status, terminal_status, failure_kind, failure_message, created_at, updated_at, claimed_by, lease_expires_at)
VALUES ($1, $2, $3, $4::jsonb, $5, $6, $7::jsonb, $8::jsonb, $9, $10, $11, $12, $13, $14, $15, $16)`,
        [run.id, run.tenantId, run.projectId, JSON.stringify(run.workspaceRef), run.providerId, run.backendProfile, JSON.stringify(run.executionPolicy), JSON.stringify(run.traceSink), run.status, run.terminalStatus, run.failureKind, run.failureMessage, run.createdAt, run.updatedAt, run.claimedBy, run.leaseExpiresAt],
      );
      // The run row was inserted in this transaction, so no other session can
      // be allocating event sequence numbers for it yet.
      await this.appendEventWithLockedRun(client, run.id, "backend_status", { phase: "run-created", backendProfile: run.backendProfile });
      return run;
    });
  }

  /**
   * Loads one run by id.
   *
   * @throws AgentRunError (`schema-invalid`, HTTP 404) when no such run exists.
   */
  async getRun(runId: string): Promise<RunRecord> {
    const result = await this.pool.query("SELECT * FROM agentrun_runs WHERE id = $1", [runId]);
    const row = result.rows[0];
    if (!row) throw new AgentRunError("schema-invalid", `run ${runId} was not found`, { httpStatus: 404 });
    return runFromRow(row);
  }

  /**
   * Lists events of a run with `seq` greater than `afterSeq`, in ascending
   * order. `limit` is clamped to 1..500. Throws 404 if the run does not exist.
   */
  async listEvents(runId: string, afterSeq: number, limit: number): Promise<RunEvent[]> {
    await this.getRun(runId);
    const result = await this.pool.query("SELECT * FROM agentrun_events WHERE run_id = $1 AND seq > $2 ORDER BY seq ASC LIMIT $3", [runId, afterSeq, clamp(limit, 1, 500)]);
    return result.rows.map(eventFromRow);
  }

  /**
   * Creates a command for a run and records a `command-created` event.
   *
   * When `input.idempotencyKey` matches an existing command of the same run,
   * that command is returned instead of creating a new one.
   *
   * @throws AgentRunError (HTTP 404) when the run does not exist.
   * @throws AgentRunError (HTTP 409) when the idempotency key is reused with a
   *   different payload hash.
   */
  async createCommand(runId: string, input: CreateCommandInput): Promise<CommandRecord> {
    const payloadHash = stableHash(input.payload);
    return this.withTransaction(async (client) => {
      await this.requireRunForUpdate(client, runId);
      if (input.idempotencyKey) {
        const existing = await client.query("SELECT * FROM agentrun_commands WHERE run_id = $1 AND idempotency_key = $2", [runId, input.idempotencyKey]);
        if (existing.rows[0]) {
          const command = commandFromRow(existing.rows[0]);
          if (command.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "idempotency key reused with different payload", { httpStatus: 409 });
          return command;
        }
      }
      const seq = await this.nextSeq(client, "agentrun_commands", runId);
      const at = nowIso();
      const command: CommandRecord = { ...input, id: newId("cmd"), runId, seq, state: "pending", payloadHash, createdAt: at, updatedAt: at, acknowledgedAt: null };
      await client.query(
        `INSERT INTO agentrun_commands (id, run_id, seq, type, payload, payload_hash, idempotency_key, state, created_at, updated_at, acknowledged_at)
VALUES ($1, $2, $3, $4, $5::jsonb, $6, $7, $8, $9, $10, $11)`,
        [command.id, command.runId, command.seq, command.type, JSON.stringify(command.payload), command.payloadHash, command.idempotencyKey ?? null, command.state, command.createdAt, command.updatedAt, command.acknowledgedAt],
      );
      await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "command-created", commandId: command.id, commandType: command.type });
      return command;
    });
  }

  /**
   * Loads one command by id.
   *
   * @throws AgentRunError (`schema-invalid`, HTTP 404) when no such command exists.
   */
  async getCommand(commandId: string): Promise<CommandRecord> {
    const result = await this.pool.query("SELECT * FROM agentrun_commands WHERE id = $1", [commandId]);
    const row = result.rows[0];
    if (!row) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
    return commandFromRow(row);
  }

  /**
   * Lists commands of a run with `seq` greater than `afterSeq`, in ascending
   * order. `limit` is clamped to 1..100. Throws 404 if the run does not exist.
   */
  async listCommands(runId: string, afterSeq: number, limit: number): Promise<CommandRecord[]> {
    await this.getRun(runId);
    const result = await this.pool.query("SELECT * FROM agentrun_commands WHERE run_id = $1 AND seq > $2 ORDER BY seq ASC LIMIT $3", [runId, afterSeq, clamp(limit, 1, 100)]);
    return result.rows.map(commandFromRow);
  }

  /**
   * Upserts a runner. Registering an existing id refreshes every column except
   * `registered_at`. Fields that have no dedicated column are stored, redacted,
   * in the `metadata` JSON column.
   */
  async registerRunner(input: Partial<RunnerRecord>): Promise<RunnerRecord> {
    const at = nowIso();
    // `id` is written after the spread so a null/undefined id in the input
    // cannot replace the generated one and reach the primary-key column.
    const runner: RunnerRecord = { registeredAt: at, heartbeatAt: at, ...input, id: input.id ?? newId("runner") };
    const metadata = metadataForRunner(runner);
    const result = await this.pool.query(
      `INSERT INTO agentrun_runners (id, run_id, attempt_id, backend_profile, placement, source_commit, metadata, registered_at, heartbeat_at)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, $9)
ON CONFLICT (id) DO UPDATE SET
run_id = EXCLUDED.run_id,
attempt_id = EXCLUDED.attempt_id,
backend_profile = EXCLUDED.backend_profile,
placement = EXCLUDED.placement,
source_commit = EXCLUDED.source_commit,
metadata = EXCLUDED.metadata,
heartbeat_at = EXCLUDED.heartbeat_at
RETURNING *`,
      [runner.id, runner.runId ?? null, runner.attemptId ?? null, runner.backendProfile ?? null, runner.placement ?? null, runner.sourceCommit ?? null, JSON.stringify(metadata), runner.registeredAt, runner.heartbeatAt],
    );
    return runnerFromRow(result.rows[0]);
  }

  /**
   * Claims a run for a runner: sets status `claimed`, records the lease in both
   * the run row and the leases table, and appends a `run-claimed` event.
   *
   * @throws AgentRunError (HTTP 404) when the run does not exist.
   * @throws AgentRunError (`runner-lease-conflict`, HTTP 409) when a different
   *   runner holds a non-terminal run.
   */
  async claimRun(runId: string, runnerId: string, leaseMs: number): Promise<RunRecord> {
    return this.withTransaction(async (client) => {
      const run = await this.requireRunForUpdate(client, runId);
      // NOTE(review): a run already in a terminal status passes this guard and is
      // set back to "claimed" below, and lease expiry is not consulted here.
      // Confirm both are intended.
      if (run.claimedBy && run.claimedBy !== runnerId && !isTerminalStatus(run.status)) throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 });
      const leaseExpiresAt = new Date(Date.now() + leaseMs).toISOString();
      const updated = await client.query(
        `UPDATE agentrun_runs SET status = $2, claimed_by = $3, lease_expires_at = $4, updated_at = $5 WHERE id = $1 RETURNING *`,
        [runId, "claimed", runnerId, leaseExpiresAt, nowIso()],
      );
      await client.query(
        `INSERT INTO agentrun_leases (run_id, runner_id, lease_expires_at, stale_recovery_marker, updated_at)
VALUES ($1, $2, $3, $4::jsonb, $5)
ON CONFLICT (run_id) DO UPDATE SET runner_id = EXCLUDED.runner_id, lease_expires_at = EXCLUDED.lease_expires_at, updated_at = EXCLUDED.updated_at`,
        [runId, runnerId, leaseExpiresAt, null, nowIso()],
      );
      await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "run-claimed", runnerId });
      return runFromRow(updated.rows[0]);
    });
  }

  /**
   * Extends the lease of a run held by `runnerId` and refreshes the runner's
   * heartbeat timestamp.
   *
   * @throws AgentRunError (HTTP 404) when the run does not exist.
   * @throws AgentRunError (`runner-lease-conflict`, HTTP 409) when the run is
   *   not claimed by `runnerId`.
   */
  async heartbeat(runId: string, runnerId: string, leaseMs: number): Promise<RunRecord> {
    return this.withTransaction(async (client) => {
      const run = await this.requireRunForUpdate(client, runId);
      if (run.claimedBy !== runnerId) throw new AgentRunError("runner-lease-conflict", `run ${runId} is not claimed by ${runnerId}`, { httpStatus: 409 });
      const leaseExpiresAt = new Date(Date.now() + leaseMs).toISOString();
      const updated = await client.query("UPDATE agentrun_runs SET lease_expires_at = $2, updated_at = $3 WHERE id = $1 RETURNING *", [runId, leaseExpiresAt, nowIso()]);
      await client.query("UPDATE agentrun_runners SET heartbeat_at = $2 WHERE id = $1", [runnerId, nowIso()]);
      await client.query("UPDATE agentrun_leases SET lease_expires_at = $2, updated_at = $3 WHERE run_id = $1", [runId, leaseExpiresAt, nowIso()]);
      return runFromRow(updated.rows[0]);
    });
  }

  /**
   * Marks a command as `acknowledged` and stamps the acknowledgement time.
   *
   * @throws AgentRunError (`schema-invalid`, HTTP 404) when no such command exists.
   */
  async ackCommand(commandId: string): Promise<CommandRecord> {
    const result = await this.pool.query("UPDATE agentrun_commands SET state = $2, acknowledged_at = $3, updated_at = $3 WHERE id = $1 RETURNING *", [commandId, "acknowledged", nowIso()]);
    const row = result.rows[0];
    if (!row) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
    return commandFromRow(row);
  }

  /**
   * Appends one event to a run's log; the payload is redacted before storage.
   *
   * @throws AgentRunError (HTTP 404) when the run does not exist.
   */
  async appendEvent(runId: string, type: EventType, payload: JsonRecord): Promise<RunEvent> {
    return this.withTransaction(async (client) => {
      await this.requireRunForUpdate(client, runId);
      return this.appendEventWithLockedRun(client, runId, type, payload);
    });
  }

  /**
   * Records the terminal outcome of a run and appends a `terminal_status` event.
   *
   * @throws AgentRunError (HTTP 404) when the run does not exist.
   */
  async finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage">): Promise<RunRecord> {
    return this.withTransaction(async (client) => {
      await this.requireRunForUpdate(client, runId);
      const status = statusFromTerminal(result.terminalStatus);
      const updated = await client.query(
        `UPDATE agentrun_runs SET status = $2, terminal_status = $3, failure_kind = $4, failure_message = $5, updated_at = $6 WHERE id = $1 RETURNING *`,
        [runId, status, result.terminalStatus, result.failureKind, result.failureMessage, nowIso()],
      );
      await this.appendEventWithLockedRun(client, runId, "terminal_status", { terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage });
      return runFromRow(updated.rows[0]);
    });
  }

  /** Lists registered backend profiles, flattening `capabilities` into each item. */
  async backends(): Promise<JsonRecord[]> {
    const result = await this.pool.query("SELECT * FROM agentrun_backends ORDER BY profile ASC");
    return result.rows.map((row) => ({ profile: stringValue(row.profile), ...jsonRecord(row.capabilities), capacity: jsonValue(row.capacity), health: jsonValue(row.health), updatedAt: nullableIso(row.updated_at) }));
  }

  /** Shuts down the connection pool. */
  async close(): Promise<void> {
    await this.pool.end();
  }

  /**
   * Inserts an event with the next `seq` for the run. The caller must already
   * hold the run row lock (or have created the run) in this transaction.
   */
  private async appendEventWithLockedRun(client: PoolClient, runId: string, type: EventType, payload: JsonRecord): Promise<RunEvent> {
    const seq = await this.nextSeq(client, "agentrun_events", runId);
    const event: RunEvent = { id: newId("evt"), runId, seq, type, payload: redactJson(payload), createdAt: nowIso() };
    await client.query("INSERT INTO agentrun_events (id, run_id, seq, type, payload, created_at) VALUES ($1, $2, $3, $4, $5::jsonb, $6)", [event.id, event.runId, event.seq, event.type, JSON.stringify(event.payload), event.createdAt]);
    return event;
  }

  /** Returns `MAX(seq) + 1` for the run in the given table (1 when empty). */
  private async nextSeq(client: PoolClient, table: "agentrun_commands" | "agentrun_events", runId: string): Promise<number> {
    // `table` is restricted to two literal names by its type, so interpolating
    // it into the SQL text is safe.
    const result = await client.query<{ seq: number }>(`SELECT COALESCE(MAX(seq), 0) + 1 AS seq FROM ${table} WHERE run_id = $1`, [runId]);
    return Number(result.rows[0]?.seq ?? 1);
  }

  /** Loads a run with a row lock held until the transaction ends; throws 404 if missing. */
  private async requireRunForUpdate(client: PoolClient, runId: string): Promise<RunRecord> {
    const result = await client.query("SELECT * FROM agentrun_runs WHERE id = $1 FOR UPDATE", [runId]);
    const row = result.rows[0];
    if (!row) throw new AgentRunError("schema-invalid", `run ${runId} was not found`, { httpStatus: 404 });
    return runFromRow(row);
  }

  /**
   * Runs `fn` inside BEGIN/COMMIT on a dedicated pooled client, rolling back
   * when `fn` (or COMMIT) throws. The error from `fn` is always the one
   * rethrown, even if ROLLBACK itself fails.
   */
  private async withTransaction<T>(fn: (client: PoolClient) => Promise<T>): Promise<T> {
    const client = await this.pool.connect();
    let discardClient = false;
    try {
      await client.query("BEGIN");
      const result = await fn(client);
      await client.query("COMMIT");
      return result;
    } catch (error) {
      try {
        await client.query("ROLLBACK");
      } catch {
        // The connection is in an unknown state; do not hand it back to the pool.
        discardClient = true;
      }
      throw error;
    } finally {
      if (discardClient) client.release(true);
      else client.release();
    }
  }
}
/** Hex-encoded SHA-256 of the SQL text with leading/trailing whitespace removed. */
function checksumSql(sql: string): string {
  const hasher = createHash("sha256");
  hasher.update(sql.trim());
  return hasher.digest("hex");
}
/** Id of the last entry in `postgresMigrations`, or `"none"` when the list is empty. */
function latestMigrationId(): string {
  const newest = postgresMigrations[postgresMigrations.length - 1];
  return newest?.id ?? "none";
}
/** Limits `value` to the range [`min`, `max`]; the lower bound is applied last. */
function clamp(value: number, min: number, max: number): number {
  const cappedAbove = Math.min(value, max);
  return Math.max(min, cappedAbove);
}
/** Maps an `agentrun_runs` row (snake_case columns) to a `RunRecord`. */
function runFromRow(row: QueryResultRow): RunRecord {
  const workspaceRef = jsonRecord(row.workspace_ref) as RunRecord["workspaceRef"];
  const executionPolicy = jsonRecord(row.execution_policy) as RunRecord["executionPolicy"];
  const record: RunRecord = {
    id: stringValue(row.id),
    tenantId: stringValue(row.tenant_id),
    projectId: stringValue(row.project_id),
    workspaceRef,
    providerId: stringValue(row.provider_id),
    backendProfile: stringValue(row.backend_profile) as BackendProfile,
    executionPolicy,
    traceSink: jsonValue(row.trace_sink),
    status: stringValue(row.status) as RunStatus,
    terminalStatus: nullableString(row.terminal_status) as TerminalStatus | null,
    failureKind: nullableString(row.failure_kind) as FailureKind | null,
    failureMessage: nullableString(row.failure_message),
    createdAt: iso(row.created_at),
    updatedAt: iso(row.updated_at),
    claimedBy: nullableString(row.claimed_by),
    leaseExpiresAt: nullableIso(row.lease_expires_at),
  };
  return record;
}
/**
 * Maps an `agentrun_commands` row to a `CommandRecord`. `idempotencyKey` is
 * only present when the column holds a non-empty value.
 */
function commandFromRow(row: QueryResultRow): CommandRecord {
  const idempotencyKey = nullableString(row.idempotency_key);
  const keyPart = idempotencyKey ? { idempotencyKey } : {};
  return {
    id: stringValue(row.id),
    runId: stringValue(row.run_id),
    seq: Number(row.seq),
    type: stringValue(row.type) as CommandRecord["type"],
    payload: jsonRecord(row.payload),
    payloadHash: stringValue(row.payload_hash),
    ...keyPart,
    state: stringValue(row.state) as CommandState,
    createdAt: iso(row.created_at),
    updatedAt: iso(row.updated_at),
    acknowledgedAt: nullableIso(row.acknowledged_at),
  };
}
/** Maps an `agentrun_events` row to a `RunEvent`. */
function eventFromRow(row: QueryResultRow): RunEvent {
  const event: RunEvent = {
    id: stringValue(row.id),
    runId: stringValue(row.run_id),
    seq: Number(row.seq),
    type: stringValue(row.type) as EventType,
    payload: jsonRecord(row.payload),
    createdAt: iso(row.created_at),
  };
  return event;
}
/**
 * Maps an `agentrun_runners` row to a `RunnerRecord`. The `metadata` JSON is
 * spread first, so dedicated columns win over same-named metadata keys.
 * Optional columns are only included when they hold a non-empty value.
 */
function runnerFromRow(row: QueryResultRow): RunnerRecord {
  const runId = nullableString(row.run_id);
  const attemptId = nullableString(row.attempt_id);
  const backendProfile = nullableString(row.backend_profile);
  const placement = nullableString(row.placement);
  const sourceCommit = nullableString(row.source_commit);
  return {
    ...jsonRecord(row.metadata),
    id: stringValue(row.id),
    ...(runId ? { runId } : {}),
    ...(attemptId ? { attemptId } : {}),
    ...(backendProfile ? { backendProfile: backendProfile as BackendProfile } : {}),
    ...(placement ? { placement } : {}),
    ...(sourceCommit ? { sourceCommit } : {}),
    registeredAt: iso(row.registered_at),
    heartbeatAt: iso(row.heartbeat_at),
  };
}
/**
 * Returns the runner fields that have no dedicated column, redacted, for
 * storage in the `metadata` JSON column.
 */
function metadataForRunner(runner: RunnerRecord): JsonRecord {
  // Peel off every field that is persisted in its own column; what remains is metadata.
  const {
    id: _dropId,
    runId: _dropRunId,
    attemptId: _dropAttemptId,
    backendProfile: _dropBackendProfile,
    placement: _dropPlacement,
    sourceCommit: _dropSourceCommit,
    registeredAt: _dropRegisteredAt,
    heartbeatAt: _dropHeartbeatAt,
    ...extras
  } = runner;
  return redactJson(extras);
}
/** True when the status is `completed`, `failed` or `cancelled`. */
function isTerminalStatus(status: RunRecord["status"]): boolean {
  switch (status) {
    case "completed":
    case "failed":
    case "cancelled":
      return true;
    default:
      return false;
  }
}
/** Returns strings unchanged; null/undefined become `""`; anything else goes through `String()`. */
function stringValue(value: unknown): string {
  if (typeof value === "string") {
    return value;
  }
  return String(value ?? "");
}
/** Like `stringValue`, but null/undefined map to `null` instead of `""`. */
function nullableString(value: unknown): string | null {
  if (value == null) {
    return null;
  }
  return stringValue(value);
}
/** Passes a column value through as JSON, turning `undefined` into `null`. No validation is performed. */
function jsonValue(value: unknown): JsonValue {
  return (value ?? null) as JsonValue;
}
/** Returns the value when it is a non-null, non-array object; otherwise a fresh empty object. */
function jsonRecord(value: unknown): JsonRecord {
  if (value === null || typeof value !== "object" || Array.isArray(value)) {
    return {};
  }
  return value as JsonRecord;
}
/**
 * Normalises a timestamp to an ISO-8601 UTC string. `Date` values are used
 * directly; everything else is stringified and parsed with `new Date`.
 * Throws `RangeError` when the result is not a valid date.
 */
function iso(value: unknown): string {
  const parsed = value instanceof Date ? value : new Date(String(value));
  return parsed.toISOString();
}
/** Like `iso`, but null/undefined map to `null` instead of throwing. */
function nullableIso(value: unknown): string | null {
  if (value == null) {
    return null;
  }
  return iso(value);
}
+24 -15
View File
@@ -2,7 +2,7 @@ import type { Server } from "node:http";
import { createServer } from "node:http";
import type { AddressInfo } from "node:net";
import type { AgentRunStore } from "./store.js";
import { MemoryAgentRunStore } from "./store.js";
import { openAgentRunStoreFromEnv } from "./store.js";
import { AgentRunError, errorToJson } from "../common/errors.js";
import { asRecord, validateCreateCommand, validateCreateRun } from "../common/validation.js";
import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, RunEvent } from "../common/types.js";
@@ -21,7 +21,7 @@ export interface StartedManagerServer {
}
export async function startManagerServer(options: ManagerServerOptions = {}): Promise<StartedManagerServer> {
const store = options.store ?? new MemoryAgentRunStore();
const store = options.store ?? await openAgentRunStoreFromEnv();
const sourceCommit = options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown";
const server = createServer(async (req, res) => {
const traceId = `trc_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`;
@@ -52,45 +52,54 @@ async function readBody(req: import("node:http").IncomingMessage): Promise<unkno
async function route({ method, url, body, store, sourceCommit }: { method: string; url: URL; body: unknown; store: AgentRunStore; sourceCommit: string }): Promise<JsonValue> {
const path = url.pathname;
if (method === "GET" && (path === "/health" || path === "/health/live" || path === "/health/readiness")) {
return { serviceId: "agentrun-mgr", live: true, ready: true, database: { adapter: "memory-self-test", migrationReady: true }, sourceCommit, secretRefs: { valuesPrinted: false } };
const database = await store.health();
const ready = path === "/health/live" ? true : database.ready;
return { serviceId: "agentrun-mgr", live: true, ready, database, sourceCommit, secretRefs: { databaseUrl: database.adapter === "postgres" ? "redacted" : "not-used", valuesPrinted: false } };
}
if (method === "GET" && path === "/api/v1/backends") return { items: store.backends() };
if (method === "POST" && path === "/api/v1/runs") return store.createRun(validateCreateRun(body)) as unknown as JsonValue;
if (method === "GET" && path === "/api/v1/backends") return { items: await store.backends() as unknown as JsonValue };
if (method === "POST" && path === "/api/v1/runs") return await store.createRun(validateCreateRun(body)) as unknown as JsonValue;
const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u);
if (method === "GET" && runMatch) return store.getRun(runMatch[1] ?? "") as unknown as JsonValue;
if (method === "GET" && runMatch) return await store.getRun(runMatch[1] ?? "") as unknown as JsonValue;
const eventMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u);
if (method === "GET" && eventMatch) {
const afterSeq = integerQuery(url, "afterSeq", 0);
const limit = integerQuery(url, "limit", 100);
return { items: store.listEvents(eventMatch[1] ?? "", afterSeq, limit) as unknown as JsonValue };
return { items: await store.listEvents(eventMatch[1] ?? "", afterSeq, limit) as unknown as JsonValue };
}
const commandCreateMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands$/u);
if (method === "POST" && commandCreateMatch) return store.createCommand(commandCreateMatch[1] ?? "", validateCreateCommand(body)) as unknown as JsonValue;
if (method === "GET" && commandCreateMatch) return { items: store.listCommands(commandCreateMatch[1] ?? "", integerQuery(url, "afterSeq", 0), integerQuery(url, "limit", 20)) as unknown as JsonValue };
if (method === "POST" && commandCreateMatch) return await store.createCommand(commandCreateMatch[1] ?? "", validateCreateCommand(body)) as unknown as JsonValue;
if (method === "GET" && commandCreateMatch) return { items: await store.listCommands(commandCreateMatch[1] ?? "", integerQuery(url, "afterSeq", 0), integerQuery(url, "limit", 20)) as unknown as JsonValue };
const commandShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)$/u);
if (method === "GET" && commandShowMatch) return store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue;
if (method === "POST" && path === "/api/v1/runners/register") return store.registerRunner(asRecord(body ?? {}, "runner")) as unknown as JsonValue;
if (method === "GET" && commandShowMatch) return await store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue;
if (method === "POST" && path === "/api/v1/runners/register") return await store.registerRunner(asRecord(body ?? {}, "runner")) as unknown as JsonValue;
const claimMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/claim$/u);
if (method === "POST" && claimMatch) {
const record = asRecord(body, "claim");
const runnerId = typeof record.runnerId === "string" ? record.runnerId : "";
if (runnerId.length === 0) throw new AgentRunError("schema-invalid", "runnerId is required", { httpStatus: 400 });
return store.claimRun(claimMatch[1] ?? "", runnerId, numberField(record, "leaseMs", 60_000)) as unknown as JsonValue;
return await store.claimRun(claimMatch[1] ?? "", runnerId, numberField(record, "leaseMs", 60_000)) as unknown as JsonValue;
}
const leaseMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/lease$/u);
if (method === "PATCH" && leaseMatch) {
const record = asRecord(body, "lease");
const runnerId = typeof record.runnerId === "string" ? record.runnerId : "";
if (runnerId.length === 0) throw new AgentRunError("schema-invalid", "runnerId is required", { httpStatus: 400 });
return await store.heartbeat(leaseMatch[1] ?? "", runnerId, numberField(record, "leaseMs", 60_000)) as unknown as JsonValue;
}
const eventsAppendMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u);
if (method === "POST" && eventsAppendMatch) {
const record = asRecord(body, "event");
const type = typeof record.type === "string" ? record.type as RunEvent["type"] : "backend_status";
return store.appendEvent(eventsAppendMatch[1] ?? "", type, asRecord(record.payload ?? {}, "event.payload")) as unknown as JsonValue;
return await store.appendEvent(eventsAppendMatch[1] ?? "", type, asRecord(record.payload ?? {}, "event.payload")) as unknown as JsonValue;
}
const statusMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/status$/u);
if (method === "PATCH" && statusMatch) {
const record = asRecord(body, "status");
const terminalStatus = record.terminalStatus === "completed" || record.terminalStatus === "failed" || record.terminalStatus === "blocked" || record.terminalStatus === "cancelled" ? record.terminalStatus : "failed";
return store.finishRun(statusMatch[1] ?? "", { terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null, failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null }) as unknown as JsonValue;
return await store.finishRun(statusMatch[1] ?? "", { terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null, failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null }) as unknown as JsonValue;
}
const ackMatch = path.match(/^\/api\/v1\/commands\/([^/]+)\/ack$/u);
if (method === "POST" && ackMatch) return store.ackCommand(ackMatch[1] ?? "") as unknown as JsonValue;
if (method === "POST" && ackMatch) return await store.ackCommand(ackMatch[1] ?? "") as unknown as JsonValue;
throw new AgentRunError("schema-invalid", `unsupported route ${method} ${path}`, { httpStatus: 404 });
}
+44 -14
View File
@@ -3,20 +3,46 @@ import { AgentRunError } from "../common/errors.js";
import { newId, nowIso, stableHash } from "../common/validation.js";
import { redactJson } from "../common/redaction.js";
/** A result that may be handed back either directly or wrapped in a Promise. */
export type MaybePromise<T> = T | Promise<T>;
/**
 * Health snapshot returned by a store's `health()` method.
 *
 * The in-memory store in this file reports a fixed, always-healthy snapshot.
 */
export interface StoreHealth extends JsonRecord {
/** Which store implementation produced this snapshot. */
adapter: "memory-self-test" | "postgres";
/** Overall readiness flag. NOTE(review): how it relates to `reachable` and `migrationReady` is not visible here — confirm against the Postgres adapter. */
ready: boolean;
/** Presumably whether the backing store could be contacted — confirm against the Postgres adapter. */
reachable: boolean;
/** Presumably whether the expected schema migration has been applied — confirm against the Postgres adapter. */
migrationReady: boolean;
/** Migration identifier, or null; the in-memory store reports the fixed string "memory-self-test". */
migrationId: string | null;
/** Failure classification, or null; the in-memory store always reports null. */
failureKind: FailureKind | null;
/** Accompanying message, or null; the in-memory store always reports null. */
message: string | null;
/** Typed as the literal `false`, so a snapshot cannot be constructed with `true`. */
credentialValuesPrinted: false;
}
/**
 * Storage contract shared by the manager's store implementations: runs, commands,
 * events, runners, leases, backends and a health probe.
 *
 * Results typed as MaybePromise may arrive directly or as a Promise, so callers
 * should `await` them.
 *
 * NOTE(review): most operations appear twice below, once with a bare return type and
 * once wrapped in MaybePromise. The bare forms look like pre-change lines carried over
 * from a diff rendering — confirm which set is current before relying on this listing.
 */
export interface AgentRunStore {
createRun(input: CreateRunInput): RunRecord;
getRun(runId: string): RunRecord;
listEvents(runId: string, afterSeq: number, limit: number): RunEvent[];
createCommand(runId: string, input: CreateCommandInput): CommandRecord;
getCommand(commandId: string): CommandRecord;
listCommands(runId: string, afterSeq: number, limit: number): CommandRecord[];
registerRunner(input: Partial<RunnerRecord>): RunnerRecord;
claimRun(runId: string, runnerId: string, leaseMs: number): RunRecord;
heartbeat(runId: string, runnerId: string, leaseMs: number): RunRecord;
ackCommand(commandId: string): CommandRecord;
appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): RunEvent;
finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage">): RunRecord;
backends(): JsonRecord[];
health(): MaybePromise<StoreHealth>;
createRun(input: CreateRunInput): MaybePromise<RunRecord>;
getRun(runId: string): MaybePromise<RunRecord>;
listEvents(runId: string, afterSeq: number, limit: number): MaybePromise<RunEvent[]>;
createCommand(runId: string, input: CreateCommandInput): MaybePromise<CommandRecord>;
getCommand(commandId: string): MaybePromise<CommandRecord>;
listCommands(runId: string, afterSeq: number, limit: number): MaybePromise<CommandRecord[]>;
registerRunner(input: Partial<RunnerRecord>): MaybePromise<RunnerRecord>;
claimRun(runId: string, runnerId: string, leaseMs: number): MaybePromise<RunRecord>;
heartbeat(runId: string, runnerId: string, leaseMs: number): MaybePromise<RunRecord>;
ackCommand(commandId: string): MaybePromise<CommandRecord>;
appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): MaybePromise<RunEvent>;
finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage">): MaybePromise<RunRecord>;
backends(): MaybePromise<JsonRecord[]>;
/** Optional: not every store provides it. Presumably releases underlying resources — confirm against the implementations. */
close?(): MaybePromise<void>;
}
/**
 * Opens the store selected by the environment.
 *
 * Selection order:
 * 1. A non-blank `DATABASE_URL` selects the Postgres store; its module is only
 *    loaded (via dynamic import) on this path.
 * 2. Otherwise `AGENTRUN_STORE`, or failing that `AGENTRUN_MGR_STORE`, equal to
 *    `memory` selects a fresh in-memory store.
 * 3. Otherwise the call fails fast; there is no implicit memory fallback.
 *
 * Blank (empty or whitespace-only) values are treated as unset for all three
 * variables.
 *
 * @param env Environment to read; defaults to `process.env`.
 * @returns The opened store.
 * @throws AgentRunError `infra-failed` (httpStatus 503) when no `DATABASE_URL`
 *   is set and memory mode was not explicitly requested.
 */
export async function openAgentRunStoreFromEnv(env: NodeJS.ProcessEnv = process.env): Promise<AgentRunStore> {
  const databaseUrl = env.DATABASE_URL?.trim();
  if (databaseUrl) {
    const { createPostgresAgentRunStore } = await import("./postgres-store.js");
    return createPostgresAgentRunStore({ connectionString: databaseUrl });
  }
  // `||` rather than `??` on purpose: an AGENTRUN_STORE that is set but blank must
  // not hide AGENTRUN_MGR_STORE, matching how a blank DATABASE_URL is handled above.
  const storeMode = env.AGENTRUN_STORE?.trim() || env.AGENTRUN_MGR_STORE?.trim();
  if (storeMode === "memory") return new MemoryAgentRunStore();
  throw new AgentRunError("infra-failed", "DATABASE_URL is required for agentrun-mgr live runtime; set AGENTRUN_STORE=memory only for explicit self-test/dev mode", { httpStatus: 503, details: { adapter: "postgres", databaseUrl: "missing", memoryFallback: "disabled" } });
}
export class MemoryAgentRunStore implements AgentRunStore {
@@ -25,6 +51,10 @@ export class MemoryAgentRunStore implements AgentRunStore {
private readonly eventsByRun = new Map<string, RunEvent[]>();
private readonly runners = new Map<string, RunnerRecord>();
/**
 * Reports the in-memory store's health: a fixed snapshot that is always ready,
 * reachable and migration-ready, with no failure recorded.
 */
health(): StoreHealth {
  const snapshot: StoreHealth = {
    adapter: "memory-self-test",
    ready: true,
    reachable: true,
    migrationReady: true,
    migrationId: "memory-self-test",
    failureKind: null,
    message: null,
    credentialValuesPrinted: false,
  };
  return snapshot;
}
createRun(input: CreateRunInput): RunRecord {
const at = nowIso();
const run: RunRecord = { ...input, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null };
@@ -130,7 +160,7 @@ export class MemoryAgentRunStore implements AgentRunStore {
}
}
function statusFromTerminal(terminalStatus: TerminalStatus): RunRecord["status"] {
export function statusFromTerminal(terminalStatus: TerminalStatus): RunRecord["status"] {
if (terminalStatus === "completed") return "completed";
if (terminalStatus === "cancelled") return "cancelled";
if (terminalStatus === "blocked") return "blocked";
+30 -5
View File
@@ -4,9 +4,12 @@ import path from "node:path";
import { fileURLToPath } from "node:url";
import assert from "node:assert/strict";
import { startManagerServer } from "../mgr/server.js";
import { MemoryAgentRunStore, openAgentRunStoreFromEnv } from "../mgr/store.js";
import { postgresMigrationContract } from "../mgr/postgres-store.js";
import { ManagerClient } from "../mgr/client.js";
import { runOnce } from "../runner/run-once.js";
import { redactText } from "../common/redaction.js";
import { AgentRunError } from "../common/errors.js";
const root = path.resolve(path.dirname(fileURLToPath(import.meta.url)), "../..");
const tmp = await mkdtemp(path.join(os.tmpdir(), "agentrun-selftest-"));
@@ -21,12 +24,26 @@ try {
await writeFile(path.join(workspace, "README.md"), "self-test workspace\n");
assert.equal(redactText("Authorization: Bearer abc123"), "Authorization: Bearer REDACTED");
await assert.rejects(
() => openAgentRunStoreFromEnv({}),
(error) => error instanceof AgentRunError && error.failureKind === "infra-failed" && error.message.includes("DATABASE_URL is required"),
);
const postgresContract = postgresMigrationContract();
assert.equal(postgresContract.latestMigrationId, "001_v01_initial_durable_store");
assert.ok(Array.isArray(postgresContract.requiredTables));
assert.ok(postgresContract.requiredTables.includes("agentrun_schema_migrations"));
assert.ok(postgresContract.requiredTables.includes("agentrun_runs"));
assert.ok(postgresContract.requiredTables.includes("agentrun_events"));
const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test" });
const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test", store: new MemoryAgentRunStore() });
try {
const client = new ManagerClient(server.baseUrl);
const health = await client.get("/health/readiness") as { database?: { adapter?: string } };
const health = await client.get("/health/readiness") as { database?: { adapter?: string; reachable?: boolean; migrationReady?: boolean; failureKind?: string | null }; secretRefs?: { valuesPrinted?: boolean } };
assert.equal(health.database?.adapter, "memory-self-test");
assert.equal(health.database?.reachable, true);
assert.equal(health.database?.migrationReady, true);
assert.equal(health.database?.failureKind, null);
assert.equal(health.secretRefs?.valuesPrinted, false);
const run = await client.post("/api/v1/runs", {
tenantId: "unidesk",
projectId: "pikasTech/unidesk",
@@ -46,8 +63,8 @@ try {
const duplicate = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello" }, idempotencyKey: "selftest-turn" }) as { id: string };
assert.equal(duplicate.id, command.id);
const fakePath = path.join(root, "src/selftest/fake-codex-app-server.ts");
const fakeCommand = process.env.AGENTRUN_SELFTEST_CODEX_COMMAND ?? process.execPath;
const fakeArgs = process.env.AGENTRUN_SELFTEST_CODEX_ARGS ? JSON.parse(process.env.AGENTRUN_SELFTEST_CODEX_ARGS) as string[] : [fakePath];
const fakeCommand = process.env.AGENTRUN_SELFTEST_CODEX_COMMAND ?? defaultFakeCommand();
const fakeArgs = process.env.AGENTRUN_SELFTEST_CODEX_ARGS ? JSON.parse(process.env.AGENTRUN_SELFTEST_CODEX_ARGS) as string[] : defaultFakeArgs(fakePath);
const result = await runOnce({ managerUrl: server.baseUrl, runId: run.id, codexCommand: fakeCommand, codexArgs: fakeArgs, codexHome, env: { CODEX_HOME: codexHome } });
assert.equal(result.terminalStatus, "completed");
const events = await client.get(`/api/v1/runs/${run.id}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
@@ -56,10 +73,18 @@ try {
assert.equal(JSON.stringify(events).includes("Bearer test-token"), false);
const finalRun = await client.get(`/api/v1/runs/${run.id}`) as { terminalStatus?: string };
assert.equal(finalRun.terminalStatus, "completed");
console.log(JSON.stringify({ ok: true, tests: ["manager-memory-lifecycle", "codex-stdio-fake-turn", "redaction"], runId: run.id }));
console.log(JSON.stringify({ ok: true, tests: ["manager-memory-lifecycle", "codex-stdio-fake-turn", "postgres-store-contract", "redaction"], runId: run.id }));
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}
} finally {
await rm(tmp, { recursive: true, force: true });
}
/**
 * Chooses the executable that launches the fake Codex app server when no
 * override command is supplied.
 *
 * @returns The current runtime's executable path when running under Bun,
 *   otherwise `"npx"`.
 */
function defaultFakeCommand(): string {
  if (process.versions.bun) {
    return process.execPath;
  }
  return "npx";
}
/**
 * Builds the argument list that pairs with `defaultFakeCommand()`.
 *
 * @param fakePath Path to the fake Codex app server script.
 * @returns `[fakePath]` when running under Bun, otherwise `["tsx", fakePath]`.
 */
function defaultFakeArgs(fakePath: string): string[] {
  if (process.versions.bun) {
    return [fakePath];
  }
  return ["tsx", fakePath];
}