diff --git a/deploy/container/Containerfile b/deploy/container/Containerfile index 3f9a612..29a66d0 100644 --- a/deploy/container/Containerfile +++ b/deploy/container/Containerfile @@ -5,6 +5,7 @@ ENV NODE_ENV=production ENV PORT=8080 COPY package.json tsconfig.json ./ +RUN bun install --production COPY scripts ./scripts COPY src ./src COPY deploy/deploy.json ./deploy/deploy.json diff --git a/docs/reference/spec-v01-agentrun-mgr.md b/docs/reference/spec-v01-agentrun-mgr.md index b99c65a..58fc148 100644 --- a/docs/reference/spec-v01-agentrun-mgr.md +++ b/docs/reference/spec-v01-agentrun-mgr.md @@ -108,7 +108,7 @@ POST /api/v1/commands/:commandId/ack | 规格项 | 状态 | 说明 | | --- | --- | --- | | `agentrun-mgr` 服务规格 | 已定义 | 本文为 v0.1 manager 权威。 | -| Manager REST API | 未实现 | 需要后续代码实现。 | +| Manager REST API | 已实现骨架 | 已有 run、command、event、backends、runner register、claim、lease heartbeat、poll、ack、status 和 health/readiness 的 HTTP JSON 骨架;仍需后续真实部署验收。 | | Tenant policy boundary | 已定义/未实现 | v0.1 只做最小 schema/allowlist/secretScope 边界。 | -| Postgres durable adapter | 未实现 | 见 [spec-v01-postgres.md](spec-v01-postgres.md)。 | -| Observability 最小合同 | 已定义/未实现 | event、terminal status、failureKind 和 redaction 需要代码实现。 | +| Postgres durable adapter | 已实现骨架 | live runtime 通过 `DATABASE_URL` 使用 Postgres durable store;memory store 仅用于显式 self-test/dev。见 [spec-v01-postgres.md](spec-v01-postgres.md)。 | +| Observability 最小合同 | 已实现骨架 | events append-only、terminal status、failureKind、health/readiness store 状态、runner claim/lease/backend events 和 Secret/DSN redaction 已进入 manager 骨架;集中 trace 和部署级观测仍属后续工作。 | diff --git a/docs/reference/spec-v01-agentrun-runner.md b/docs/reference/spec-v01-agentrun-runner.md index 7db51bc..023f85c 100644 --- a/docs/reference/spec-v01-agentrun-runner.md +++ b/docs/reference/spec-v01-agentrun-runner.md @@ -99,7 +99,7 @@ Runner 日志必须实时 flush 到文件或 pod log,CLI 启动 runner 时必 | 规格项 | 状态 | 说明 | | --- | --- | --- | | `agentrun-runner` 服务规格 | 已定义 | 本文为 v0.1 runner 权威。 | -| Kubernetes Job runner | 未实现 | 需要后续 runtime/GitOps 实现。 | -| host process runner | 未实现 | 可作为 MVP 手动 dispatch,但仍必须走 manager API。 | -| claim/lease/report client | 未实现 | 需要后续代码实现。 | +| Kubernetes Job runner | 部分实现 | 已提供 `runner job --dry-run` Job manifest 渲染骨架,固定使用 `agentrun-v01-runner` ServiceAccount、manager URL、runId/commandId/attemptId、executionPolicy 和 SecretRef 文件投影;尚未执行真实 Kubernetes create/apply。 | +| host process runner | 部分实现 | `runner start` 和 `src/runner/main.ts` 进入同一套 `runOnce`,可通过 manager register/claim/poll/report 执行自测试。 | +| claim/lease/report client | 部分实现 | 已拆出 runner manager API client,覆盖 register、claim、lease heartbeat、poll command、ack、append event 和 terminal status;durable store 仍待 Postgres adapter 接入。 | | runner redaction | 已定义/未实现 | 需与 backend adapter 共同实现。 | diff --git a/docs/reference/spec-v01-backend-adapter.md b/docs/reference/spec-v01-backend-adapter.md index b43ae62..564bfdb 100644 --- a/docs/reference/spec-v01-backend-adapter.md +++ b/docs/reference/spec-v01-backend-adapter.md @@ -52,6 +52,9 @@ Adapter 必须把 backend 错误映射为稳定 failureKind: | `provider-auth-failed` | provider credential 或 auth file 无效、上游返回 401/403。 | | `provider-rate-limited` | 上游限流或 quota 错误。 | | `backend-protocol-error` | backend 输出无法解析、协议字段缺失。 | +| `backend-json-parse-error` | backend stdout 不是合法 JSON-RPC 行。 | +| `backend-response-invalid` | backend JSON-RPC response/terminal notification 缺少必需字段。 | +| `backend-spawn-failed` | backend app-server 进程无法启动。 | | `backend-failed` | backend 进程非零退出或 terminal error。 | | `backend-timeout` | executionPolicy timeout 触发。 | | `cancelled` | interrupt/cancel 生效。 | diff --git a/docs/reference/spec-v01-backend-codex.md b/docs/reference/spec-v01-backend-codex.md index a883064..ee34ba1 100644 --- a/docs/reference/spec-v01-backend-codex.md +++ b/docs/reference/spec-v01-backend-codex.md @@ -96,6 +96,7 @@ Run 的 `executionPolicy.secretScope` 应引用 `agentrun-v01-provider-codex` | --- | --- | --- | | Codex backend 规格 | 已定义 | 本文为 v0.1 第一真实 backend 权威。 | | Codex Secret projection | 未实现 | 需要后续 Kubernetes Secret 和 runner/backend manifest。 | -| Codex adapter | 未实现 | 需要后续代码实现。 | +| Codex adapter | 已部分实现 | 当前代码已实现受控 `codex app-server --listen stdio://`、`initialize`/`thread/start`/`thread/resume`/`turn/start` response 校验、stderr 有界诊断、spawn/JSON parse/response invalid/timeout failureKind 和 fake app-server 自测试。 | +| 错误可观测与脱敏 | 已部分实现 | child env、cwd、workspace 和 Codex home 只输出摘要;stderr tail 有界且标记截断;事件和 failure 统一走 redaction。 | | 真实 provider turn | 未实现 | 综合联调必须真实完成后才能发布通过。 | | hostPath `~/.codex` | 不采用 | 只能通过 Kubernetes Secret projection 注入。 | diff --git a/docs/reference/spec-v01-cli.md b/docs/reference/spec-v01-cli.md index dd7b468..764bee0 100644 --- a/docs/reference/spec-v01-cli.md +++ b/docs/reference/spec-v01-cli.md @@ -30,6 +30,7 @@ bun scripts/agentrun-cli.ts runs events --after-seq --limit bun scripts/agentrun-cli.ts commands create --type turn --json-file bun scripts/agentrun-cli.ts commands show bun scripts/agentrun-cli.ts runner start --run-id --backend +bun scripts/agentrun-cli.ts secrets codex render --dry-run [--codex-home ] bun scripts/agentrun-cli.ts backends list bun scripts/agentrun-cli.ts server start|status|stop|logs ``` @@ -41,6 +42,7 @@ bun scripts/agentrun-cli.ts server start|status|stop|logs - 查询类命令返回当前 state、terminal_status、failureKind、event cursor 或 logPath。 - `events` 默认分页且有界,必须支持 `afterSeq` 和 `limit`。 - `server logs` 返回有界日志摘要,并指向完整日志文件或 Kubernetes pod identity。 +- `secrets codex render --dry-run` 返回 Codex provider Secret 创建计划、输入文件 bytes/hash、SecretRef、manifest 摘要和 apply 命令形状;它不得输出 Secret value 或执行 Kubernetes 写操作。 ## 配置与 Secret 边界 @@ -72,7 +74,7 @@ bun scripts/agentrun-cli.ts server start|status|stop|logs | 规格项 | 状态 | 说明 | | --- | --- | --- | | AgentRun CLI 规格 | 已定义 | 本文为 v0.1 CLI 权威。 | -| `scripts/agentrun-cli.ts` | 未实现 | 需要后续代码实现。 | -| CLI 调 manager REST | 未实现 | 需随 manager API 实现。 | -| runner start | 未实现 | 需随 runner Job/host process 实现。 | +| `scripts/agentrun-cli.ts` | 部分实现 | 已提供 run/command/event/backend/server 基础命令和 JSON envelope。 | +| CLI 调 manager REST | 部分实现 | CLI 通过 `ManagerClient` 调 manager REST;当前自测试使用内存 manager。 | +| runner start | 部分实现 | `runner start` 可执行 host process runner;`runner job --dry-run` 可渲染 Kubernetes Job JSON,尚不执行 create/apply。 | | CLI 测试规格 | 已定义 | 综合联调见 [spec-v01-validation.md](spec-v01-validation.md)。 | diff --git a/docs/reference/spec-v01-postgres.md b/docs/reference/spec-v01-postgres.md index da0037b..9c7fdca 100644 --- a/docs/reference/spec-v01-postgres.md +++ b/docs/reference/spec-v01-postgres.md @@ -74,6 +74,7 @@ Secret 名称和 key 可以在实现时按 Kubernetes 命名限制微调,但 | --- | --- | --- | | Postgres durable store 规格 | 已定义 | 本文为 v0.1 存储权威。 | | StatefulSet/Service/PVC | 未实现 | 需要后续 GitOps lane 初始化。 | -| migration ledger | 未实现 | 需要后续代码和 schema migration。 | -| manager Postgres adapter | 未实现 | 需要后续 `agentrun-mgr` 实现。 | +| migration ledger | 已实现骨架 | `agentrun-mgr` 启动 Postgres adapter 时幂等创建 `agentrun_schema_migrations` 并记录 migration id/checksum;live DB 迁移验收仍依赖后续 GitOps lane 初始化。 | +| manager Postgres adapter | 已实现骨架 | `agentrun-mgr` 通过 `DATABASE_URL` 启用 Postgres adapter,持久化 runs、commands、events、runners、backends 和 leases;缺少 `DATABASE_URL` 时 live runtime fail fast,memory 只允许显式 self-test/dev。 | +| health/readiness store 状态 | 已实现骨架 | health/readiness 返回 adapter、reachable、migrationReady、migrationId、failureKind 和 redacted Secret 状态,不输出 DSN 明文。 | | file/sqlite durable store | 不采用 | 只可用于临时本地测试,不作为 v0.1 runtime truth。 | diff --git a/docs/reference/spec-v01-secret-distribution.md b/docs/reference/spec-v01-secret-distribution.md index 58d68ce..8487f7b 100644 --- a/docs/reference/spec-v01-secret-distribution.md +++ b/docs/reference/spec-v01-secret-distribution.md @@ -106,6 +106,25 @@ runner/backend Pod Secret 创建和轮换不由 source branch 自动生成;source branch 只声明需要哪个 SecretRef。后续如果接入 External Secrets、Vault、SealedSecrets 或 SOPS,必须新增或更新本 spec,明确 controller、source of truth、rotation 和 redaction 规则。 +## Codex Secret dry-run 工具 + +`v0.1` 提供只读 CLI 工具,用 operator 本地 `~/.codex/auth.json` 与 `~/.codex/config.toml` 构造 Kubernetes Secret 创建计划: + +```bash +bun scripts/agentrun-cli.ts secrets codex render --dry-run +``` + +可选参数: + +- `--codex-home `:覆盖默认 `~/.codex` 输入目录。 +- `--auth-file ` / `--config-file `:分别覆盖输入文件路径。 +- `--namespace `:默认 `agentrun-v01`。 +- `--secret-name `:默认 `agentrun-v01-provider-codex`。 + +输出必须是 JSON,并且只包含 `namespace`、`secretName`、`keys`、每个输入文件的 `bytes`、`sha256`/`contentHash`、整体 hash、redaction 状态、apply 命令形状和 Secret manifest 摘要。输出不得包含 Secret value、`auth.json` 明文、`config.toml` 明文、base64 `data` 字段或可直接恢复 credential 的内容。工具只支持 `--dry-run`;不得执行 `kubectl apply`。 + +失败必须结构化返回 `failureKind`:缺文件、不可读文件或空 credential 归类为 `secret-unavailable`;非法 JSON/TOML 归类为 `schema-invalid`。 + ## 日志与事件 Redaction - event、trace、日志、CLI 输出、health 和 diagnostics 不得打印 Secret 值。 @@ -132,7 +151,8 @@ Secret 创建和轮换不由 source branch 自动生成;source branch 只声 | 规格项 | 状态 | 说明 | | --- | --- | --- | | Secret 分发规格 | 已定义 | 本文为 v0.1 provider credential 分发权威。 | -| Kubernetes SecretRef 注入 | 未实现 | 需要后续 GitOps/runtime 实现。 | -| Codex auth/config file projection | 未实现 | 需要后续 runner/backend adapter 实现,测试来源为 `~/.codex/auth.json` 和 `~/.codex/config.toml`。 | -| redaction 最小规则 | 已定义 | 需要后续代码实现和测试。 | +| Kubernetes SecretRef 注入 | 部分实现 | runner Job dry-run 渲染已按 run `executionPolicy.secretScope.providerCredentials` 生成 Secret volume projection 和 `CODEX_HOME`,但尚未 apply 到集群。 | +| Codex Secret dry-run 工具 | 已实现 | `bun scripts/agentrun-cli.ts secrets codex render --dry-run` 只输出 Secret 创建计划、hash 和 redacted manifest 摘要,不执行 apply。 | +| Codex auth/config file projection | 部分实现 | backend readiness 检查 `auth.json`/`config.toml` 可读性,缺失时返回 `secret-unavailable`;self-test 使用临时文件模拟投影。 | +| redaction 最小规则 | 部分实现 | Secret dry-run 工具、event、Job dry-run 输出和 self-test 已验证不打印测试 token;复杂审计仍待后续补齐。 | | 外部 secret manager | 未采用 | 如需 Vault/ExternalSecrets/SOPS,后续单独更新规格。 | diff --git a/docs/reference/spec-v01-services.md b/docs/reference/spec-v01-services.md index 9afd9dd..e66aea3 100644 --- a/docs/reference/spec-v01-services.md +++ b/docs/reference/spec-v01-services.md @@ -39,6 +39,19 @@ UniDesk or HWLAB client Runner inbound API 只允许本地或私有诊断,不作为业务客户端入口。业务客户端只能调用 `agentrun-mgr`。 +## 并行开发边界 + +`v0.1` 默认会由多个 agent 并行实现 manager、runner、backend、Secret、Postgres、CLI 和 CI/CD。架构必须主动降低并行冲突,而不是把所有组件都写进同一个入口文件、总表或巨型测试文件。 + +并行开发的长期规则: + +- 组件实现按目录分层:manager 写入 `src/mgr/**`,runner 写入 `src/runner/**`,backend 写入 `src/backend/**`,CLI 写入 `scripts/src/**`,部署写入 `deploy/**`。跨组件共享类型和工具只放 `src/common/**`,且变更必须保持小而稳定。 +- 自测试采用可发现 case 模型:`src/selftest/run.ts` 只负责发现和调度,公共 fixture 放 `src/selftest/harness.ts`,组件测试放 `src/selftest/cases/*.ts`。新增组件自测试应优先新增 case 文件,不应反复修改总入口。 +- 长期文档状态归属到对应 spec:组件实现状态优先维护在 `spec-v01-agentrun-mgr.md`、`spec-v01-agentrun-runner.md`、`spec-v01-backend-codex.md` 等组件 spec 中;`spec-v01-services.md` 只保留总览和跨组件边界,避免成为并行改动热点。 +- 对并行冲突的处理优先优化模块边界:如果多个 PR 反复冲突在同一文件或同一段状态表,应拆出可独立追加的模块、case、manifest、helper 或 spec 子项;只有确认边界已经合理后,才做普通冲突解析。 +- 共享 API 合同要先稳定再并行扩展:`RunRecord`、`CommandRecord`、`ExecutionPolicy`、`SecretRef`、`FailureKind` 等跨组件类型改动必须兼顾 manager、runner、backend、CLI、self-test 和 GitOps render,不得为单个组件引入临时字段绕开合同。 +- 过程性合并、冲突解析和临时验证证据进入 PR/issue;长期参考文档只记录稳定的并行开发原则、边界和判断标准。 + ## 服务总表 | 对象 | 类型 | v0.1 处理 | 说明 | 后续单独 spec | @@ -154,7 +167,7 @@ Manager 负责校验、保存和返回这些字段;runner 只能消费已保 | Codex backend 规格 | 已定义 | 见 [spec-v01-backend-codex.md](spec-v01-backend-codex.md)。 | | AgentRun CLI 规格 | 已定义 | 见 [spec-v01-cli.md](spec-v01-cli.md)。 | | Scheduler deferred 规格 | 已定义 | 见 [spec-v01-scheduler.md](spec-v01-scheduler.md)。 | -| `agentrun-mgr` 实现 | 未实现 | 需后续代码实现。 | -| `agentrun-runner` 实现 | 未实现 | 需后续代码实现。 | -| 第一真实 backend | 未实现 | 默认候选 Codex。 | +| `agentrun-mgr` 实现 | 已实现骨架 | 已有 REST API、Postgres durable store 选择、migration ledger、runner claim/lease/report、health/readiness 和 self-test memory 模式骨架;仍需 G14 `agentrun-v01` 真实 Postgres/GitOps 验收。 | +| `agentrun-runner` 实现 | 已实现骨架 | 已有 host process runner 与 Kubernetes Job dry-run 渲染骨架,runner 通过 manager API claim/poll/report,不直连 Postgres;真实 Kubernetes Job 联调仍需验收。 | +| 第一真实 backend | 已实现骨架 | Codex app-server stdio backend 已有协议、失败分类、脱敏和 fake self-test;真实 Codex provider turn 仍需综合联调验收。 | | 自动 scheduler | Deferred | 不作为 `v0.1` 第一阶段验收目标。 | diff --git a/docs/reference/spec-v01-validation.md b/docs/reference/spec-v01-validation.md index 7624b09..c032c07 100644 --- a/docs/reference/spec-v01-validation.md +++ b/docs/reference/spec-v01-validation.md @@ -26,6 +26,8 @@ 自测试应使用 Bun + TypeScript 运行,Codex 相关自测试可以使用 fake app-server JSON-RPC client 模拟 `initialize`、`thread/start`、`thread/resume`、`turn/start`、assistant 输出、协议错误、timeout 和 transport close。 +仓库内自测试入口必须保持可并行扩展:`src/selftest/run.ts` 只负责发现和调度 `src/selftest/cases/*.ts`,公共 fixture 和断言辅助放 `src/selftest/harness.ts`。新增组件自测试优先新增独立 case 文件,避免多个并行任务修改同一个总入口导致反复冲突。若某个 case 需要跨组件验证,可以依赖正式 manager/runner/backend API,但仍应作为独立 case 命名和维护。 + 自测试输出可以是 Tekton TaskRun result、JUnit、tap、JSON summary 或普通日志,但不得回写 source branch,也不得伪装成综合联调通过。 ## 综合联调 diff --git a/package.json b/package.json index ea5c1f2..32cb657 100644 --- a/package.json +++ b/package.json @@ -9,8 +9,13 @@ "test": "bun run src/selftest/run.ts", "cli": "bun scripts/agentrun-cli.ts" }, + "dependencies": { + "pg": "^8.13.1" + }, "devDependencies": { + "@types/pg": "^8.11.10", "@types/node": "^22.10.0", + "tsx": "^4.19.2", "typescript": "^5.8.3" } } diff --git a/scripts/src/cli.ts b/scripts/src/cli.ts index 169838c..2136102 100644 --- a/scripts/src/cli.ts +++ b/scripts/src/cli.ts @@ -1,8 +1,11 @@ import { readFile } from "node:fs/promises"; import { startManagerServer } from "../../src/mgr/server.js"; +import { MemoryAgentRunStore } from "../../src/mgr/store.js"; import { ManagerClient } from "../../src/mgr/client.js"; import { runOnce } from "../../src/runner/run-once.js"; -import type { JsonRecord, JsonValue } from "../../src/common/types.js"; +import { renderRunnerJobDryRun } from "../../src/runner/k8s-job.js"; +import { renderCodexProviderSecretPlan } from "./secret-render.js"; +import type { JsonRecord, JsonValue, RunRecord } from "../../src/common/types.js"; import { AgentRunError, errorToJson } from "../../src/common/errors.js"; import type { RunnerOnceOptions } from "../../src/runner/run-once.js"; @@ -28,6 +31,7 @@ async function dispatch(args: ParsedArgs): Promise { if (group === "server" && command === "start") return startServer(args); if (group === "server" && command === "status") return client(args).get("/health/readiness"); if (group === "backends" && command === "list") return client(args).get("/api/v1/backends"); + if (group === "secrets" && command === "codex" && id === "render") return renderCodexSecret(args); if (group === "runs" && command === "create") return client(args).post("/api/v1/runs", await jsonFile(args)); if (group === "runs" && command === "show" && id) return client(args).get(`/api/v1/runs/${encodeURIComponent(id)}`); if (group === "runs" && command === "events" && id) return client(args).get(`/api/v1/runs/${encodeURIComponent(id)}/events?afterSeq=${flag(args, "after-seq", "0")}&limit=${flag(args, "limit", "100")}`); @@ -58,14 +62,62 @@ async function dispatch(args: ParsedArgs): Promise { if (codexHome) options.codexHome = codexHome; return runOnce(options) as unknown as JsonValue; } + if (group === "runner" && command === "job") return renderRunnerJob(args); throw new AgentRunError("schema-invalid", `unsupported command: ${args.positional.join(" ")}`, { httpStatus: 2 }); } +async function renderRunnerJob(args: ParsedArgs): Promise { + if (args.flags.get("dry-run") !== true) throw new AgentRunError("schema-invalid", "runner job only supports --dry-run in v0.1", { httpStatus: 2 }); + const runId = flag(args, "run-id", ""); + const commandId = flag(args, "command-id", ""); + const image = flag(args, "image", ""); + if (!runId) throw new AgentRunError("schema-invalid", "runner job requires --run-id", { httpStatus: 2 }); + if (!commandId) throw new AgentRunError("schema-invalid", "runner job requires --command-id", { httpStatus: 2 }); + if (!image) throw new AgentRunError("schema-invalid", "runner job requires --image", { httpStatus: 2 }); + const run = await client(args).get(`/api/v1/runs/${encodeURIComponent(runId)}`) as RunRecord; + const options = { + run, + commandId, + image, + managerUrl: managerUrl(args), + namespace: optionalFlag(args, "namespace") ?? "agentrun-v01", + }; + const attemptId = optionalFlag(args, "attempt-id"); + const runnerId = optionalFlag(args, "runner-id"); + const sourceCommit = optionalFlag(args, "source-commit"); + return renderRunnerJobDryRun({ + ...options, + ...(attemptId ? { attemptId } : {}), + ...(runnerId ? { runnerId } : {}), + ...(sourceCommit ? { sourceCommit } : {}), + }); +} + +async function renderCodexSecret(args: ParsedArgs): Promise { + if (args.flags.get("dry-run") !== true) { + throw new AgentRunError("schema-invalid", "secrets codex render requires --dry-run", { httpStatus: 2 }); + } + const options: Parameters[0] = { dryRun: true }; + const codexHome = optionalFlag(args, "codex-home"); + const authFile = optionalFlag(args, "auth-file"); + const configFile = optionalFlag(args, "config-file"); + const namespace = optionalFlag(args, "namespace"); + const secretName = optionalFlag(args, "secret-name"); + if (codexHome) options.codexHome = codexHome; + if (authFile) options.authFile = authFile; + if (configFile) options.configFile = configFile; + if (namespace) options.namespace = namespace; + if (secretName) options.secretName = secretName; + return renderCodexProviderSecretPlan(options); +} + async function startServer(args: ParsedArgs): Promise { const port = Number(flag(args, "port", "8080")); const host = flag(args, "host", "0.0.0.0"); - const started = await startManagerServer({ port, host }); - return { serviceId: "agentrun-mgr", baseUrl: started.baseUrl, pid: process.pid, note: "foreground process; use Kubernetes/Tekton for v0.1 runtime" }; + const storeMode = optionalFlag(args, "store") ?? process.env.AGENTRUN_STORE ?? process.env.AGENTRUN_MGR_STORE; + const started = await startManagerServer({ port, host, ...(storeMode === "memory" ? { store: new MemoryAgentRunStore() } : {}) }); + const database = await started.store.health(); + return { serviceId: "agentrun-mgr", baseUrl: started.baseUrl, pid: process.pid, database, note: "foreground process; use Kubernetes/Tekton for v0.1 runtime" }; } function client(args: ParsedArgs): ManagerClient { @@ -123,6 +175,8 @@ function help(): JsonRecord { "commands create --type turn --json-file ", "commands show --run-id ", "runner start --run-id ", + "runner job --dry-run --run-id --command-id --image ", + "secrets codex render --dry-run [--codex-home ] [--namespace agentrun-v01] [--secret-name agentrun-v01-provider-codex]", "backends list", "server start|status", ], diff --git a/scripts/src/secret-render.ts b/scripts/src/secret-render.ts new file mode 100644 index 0000000..e889b38 --- /dev/null +++ b/scripts/src/secret-render.ts @@ -0,0 +1,194 @@ +import { createHash } from "node:crypto"; +import { constants as fsConstants } from "node:fs"; +import { access, readFile } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import type { JsonRecord } from "../../src/common/types.js"; +import { AgentRunError } from "../../src/common/errors.js"; + +export interface CodexSecretRenderOptions { + codexHome?: string; + authFile?: string; + configFile?: string; + namespace?: string; + secretName?: string; + dryRun?: boolean; +} + +interface SecretFileSummary extends JsonRecord { + key: "auth.json" | "config.toml"; + source: string; + bytes: number; + sha256: string; + contentHash: string; +} + +interface SecretSourceFile { + key: "auth.json" | "config.toml"; + path: string; + validate: (content: string, file: string) => unknown; +} + +const defaultNamespace = "agentrun-v01"; +const defaultSecretName = "agentrun-v01-provider-codex"; +const secretKeys = ["auth.json", "config.toml"] as const; +const credentialKeyPattern = /(?:api[_-]?key|token|password|secret|credential|authorization|auth)/iu; + +export async function renderCodexProviderSecretPlan(options: CodexSecretRenderOptions = {}): Promise { + if (options.dryRun === false) { + throw new AgentRunError("schema-invalid", "Codex provider Secret rendering only supports --dry-run in v0.1", { httpStatus: 2 }); + } + + const namespace = nonEmpty(options.namespace, defaultNamespace); + const secretName = nonEmpty(options.secretName, defaultSecretName); + const codexHome = resolvePath(nonEmpty(options.codexHome, path.join(os.homedir(), ".codex"))); + const sources: SecretSourceFile[] = [ + { key: "auth.json", path: resolvePath(options.authFile ?? path.join(codexHome, "auth.json")), validate: validateAuthJson }, + { key: "config.toml", path: resolvePath(options.configFile ?? path.join(codexHome, "config.toml")), validate: validateConfigToml }, + ]; + + const files: SecretFileSummary[] = []; + const hash = createHash("sha256"); + for (const source of sources) { + const content = await readSecretInput(source); + const bytes = Buffer.byteLength(content, "utf8"); + if (bytes === 0) throw new AgentRunError("secret-unavailable", `${source.key} is empty`, { httpStatus: 2, details: { key: source.key, path: source.path } }); + source.validate(content, source.path); + const sha256 = sha256Hex(content); + hash.update(source.key); + hash.update("\0"); + hash.update(content, "utf8"); + hash.update("\0"); + files.push({ key: source.key, source: source.path, bytes, sha256, contentHash: `sha256:${sha256}` }); + } + + const manifestSummary: JsonRecord = { + apiVersion: "v1", + kind: "Secret", + metadata: { namespace, name: secretName }, + type: "Opaque", + dataKeys: [...secretKeys], + dataRedacted: true, + }; + + return { + mode: "dry-run", + writeAttempted: false, + namespace, + secretName, + keys: [...secretKeys], + totalBytes: files.reduce((sum, file) => sum + file.bytes, 0), + sha256: hash.digest("hex"), + files, + manifestSummary, + apply: { + attempted: false, + command: `kubectl create secret generic ${secretName} -n ${namespace} --from-file=auth.json= --from-file=config.toml= --dry-run=client -o yaml | kubectl apply -f -`, + note: "本命令只展示形状;v0.1 工具不会执行 kubectl apply,也不会输出 Secret data。", + }, + redaction: { + secretValuesPrinted: false, + manifestDataPrinted: false, + configTomlPrinted: false, + authJsonPrinted: false, + }, + }; +} + +async function readSecretInput(source: SecretSourceFile): Promise { + try { + await access(source.path, fsConstants.R_OK); + return await readFile(source.path, "utf8"); + } catch (error) { + if (isNodeError(error, "ENOENT")) { + throw new AgentRunError("secret-unavailable", `${source.key} is missing`, { httpStatus: 2, details: { key: source.key, path: source.path } }); + } + if (isNodeError(error, "EACCES") || isNodeError(error, "EPERM")) { + throw new AgentRunError("secret-unavailable", `${source.key} is not readable`, { httpStatus: 2, details: { key: source.key, path: source.path } }); + } + throw error; + } +} + +function validateAuthJson(content: string, file: string): unknown { + let parsed: unknown; + try { + parsed = JSON.parse(content); + } catch { + throw new AgentRunError("schema-invalid", "auth.json is not valid JSON", { httpStatus: 2, details: { key: "auth.json", path: file } }); + } + if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) { + throw new AgentRunError("schema-invalid", "auth.json must contain a JSON object", { httpStatus: 2, details: { key: "auth.json", path: file } }); + } + const emptyField = findEmptyCredentialField(parsed); + if (emptyField) { + throw new AgentRunError("secret-unavailable", "auth.json contains an empty credential field", { httpStatus: 2, details: { key: "auth.json", path: file, field: emptyField } }); + } + if (!hasNonEmptyCredentialField(parsed)) { + throw new AgentRunError("secret-unavailable", "auth.json does not contain any non-empty credential field", { httpStatus: 2, details: { key: "auth.json", path: file } }); + } + return parsed; +} + +function validateConfigToml(content: string, file: string): unknown { + const parser = (globalThis as typeof globalThis & { Bun?: { TOML?: { parse?: (value: string) => unknown } } }).Bun?.TOML?.parse; + if (typeof parser !== "function") { + throw new AgentRunError("infra-failed", "Bun TOML parser is unavailable", { httpStatus: 1, details: { key: "config.toml", path: file } }); + } + let parsed: unknown; + try { + parsed = parser(content); + } catch { + throw new AgentRunError("schema-invalid", "config.toml is not valid TOML", { httpStatus: 2, details: { key: "config.toml", path: file } }); + } + const emptyField = findEmptyCredentialField(parsed); + if (emptyField) { + throw new AgentRunError("secret-unavailable", "config.toml contains an empty credential field", { httpStatus: 2, details: { key: "config.toml", path: file, field: emptyField } }); + } + return parsed; +} + +function hasNonEmptyCredentialField(value: unknown): boolean { + if (Array.isArray(value)) return value.some((item) => hasNonEmptyCredentialField(item)); + if (typeof value !== "object" || value === null) return false; + return Object.entries(value).some(([key, entry]) => { + if (credentialKeyPattern.test(key) && typeof entry === "string" && entry.trim().length > 0) return true; + return hasNonEmptyCredentialField(entry); + }); +} + +function findEmptyCredentialField(value: unknown, trail: string[] = []): string | null { + if (Array.isArray(value)) { + for (let index = 0; index < value.length; index += 1) { + const found = findEmptyCredentialField(value[index], [...trail, String(index)]); + if (found) return found; + } + return null; + } + if (typeof value !== "object" || value === null) return null; + for (const [key, entry] of Object.entries(value)) { + const nextTrail = [...trail, key]; + if (credentialKeyPattern.test(key) && (entry === null || (typeof entry === "string" && entry.trim().length === 0))) return nextTrail.join("."); + const found = findEmptyCredentialField(entry, nextTrail); + if (found) return found; + } + return null; +} + +function nonEmpty(value: string | undefined, fallback: string): string { + return typeof value === "string" && value.length > 0 ? value : fallback; +} + +function resolvePath(file: string): string { + if (file === "~") return os.homedir(); + if (file.startsWith("~/")) return path.join(os.homedir(), file.slice(2)); + return path.resolve(file); +} + +function sha256Hex(content: string): string { + return createHash("sha256").update(content, "utf8").digest("hex"); +} + +function isNodeError(error: unknown, code: string): boolean { + return typeof error === "object" && error !== null && "code" in error && (error as { code?: unknown }).code === code; +} diff --git a/src/backend/adapter.ts b/src/backend/adapter.ts index d12ff54..cb8115c 100644 --- a/src/backend/adapter.ts +++ b/src/backend/adapter.ts @@ -21,6 +21,7 @@ export async function runBackendTurn(run: RunRecord, command: CommandRecord, opt timeoutMs: run.executionPolicy.timeoutMs, }; if (typeof command.payload.model === "string") turnOptions.model = command.payload.model; + if (typeof command.payload.threadId === "string") turnOptions.threadId = command.payload.threadId; if (options.codexCommand) turnOptions.command = options.codexCommand; if (options.codexArgs) turnOptions.args = options.codexArgs; if (options.env) turnOptions.env = options.env; diff --git a/src/backend/codex-stdio.ts b/src/backend/codex-stdio.ts index c82365c..83ecbd4 100644 --- a/src/backend/codex-stdio.ts +++ b/src/backend/codex-stdio.ts @@ -1,13 +1,40 @@ import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process"; +import { createHash } from "node:crypto"; import { accessSync, constants as fsConstants } from "node:fs"; +import path from "node:path"; import * as readline from "node:readline"; import type { BackendEvent, BackendTurnResult, FailureKind, JsonRecord, JsonValue, TerminalStatus } from "../common/types.js"; import { redactJson, redactText } from "../common/redaction.js"; +const codexProtocol = "codex-app-server-jsonrpc-stdio"; +const defaultCodexArgs = ["app-server", "--listen", "stdio://"]; +const stderrBufferBytes = 64_000; +const stderrEventChars = 4_000; +const requestTimeoutCapMs = 30_000; + +const childEnvSummaryKeys = [ + "CODEX_HOME", + "HOME", + "PATH", + "HTTP_PROXY", + "HTTPS_PROXY", + "ALL_PROXY", + "NO_PROXY", + "http_proxy", + "https_proxy", + "all_proxy", + "no_proxy", + "OPENAI_API_KEY", + "CODEX_API_KEY", + "GITHUB_TOKEN", + "GH_TOKEN", +]; + export interface CodexStdioTurnOptions { prompt: string; cwd: string; model?: string; + threadId?: string; approvalPolicy: string; sandbox: string; timeoutMs: number; @@ -18,48 +45,87 @@ export interface CodexStdioTurnOptions { } interface PendingRequest { + method: string; + timer: NodeJS.Timeout; resolve: (value: unknown) => void; reject: (error: Error) => void; } +interface CodexStdioCloseInfo extends JsonRecord { + code: number | null; + signal: string | null; + stderrTail: string; + stderrBytes: number; + stderrTruncated: boolean; + failureKind: FailureKind | null; + message: string | null; +} + +class CodexStdioFailure extends Error { + readonly failureKind: FailureKind; + readonly phase: string; + readonly details: JsonRecord; + + constructor(failureKind: FailureKind, message: string, phase: string, details: JsonRecord = {}) { + super(redactText(message)); + this.name = "CodexStdioFailure"; + this.failureKind = failureKind; + this.phase = phase; + this.details = redactJson(details); + } +} + export class CodexStdioClient { private readonly child: ChildProcessWithoutNullStreams; private readonly pending = new Map(); - private readonly stderrChunks: Buffer[] = []; + private stderrTailBuffer = Buffer.alloc(0); + private stderrBytes = 0; private nextId = 1; private closed = false; - readonly closedPromise: Promise<{ code: number | null; signal: string | null; stderrTail: string }>; - private closeResolve!: (value: { code: number | null; signal: string | null; stderrTail: string }) => void; + private closeFailure: CodexStdioFailure | null = null; + readonly closedPromise: Promise; + private closeResolve!: (value: CodexStdioCloseInfo) => void; constructor(options: { command?: string; args?: string[]; cwd: string; env?: NodeJS.ProcessEnv; onNotification: (message: JsonRecord) => void }) { this.closedPromise = new Promise((resolve) => { this.closeResolve = resolve; }); - this.child = spawn(options.command ?? "codex", options.args ?? ["app-server", "--listen", "stdio://"], { - cwd: options.cwd, - env: options.env ?? process.env, - stdio: "pipe", - }); - this.child.stderr.on("data", (chunk: Buffer) => { - this.stderrChunks.push(chunk); - while (Buffer.concat(this.stderrChunks).length > 64_000) this.stderrChunks.shift(); - }); + const command = options.command ?? "codex"; + const args = options.args ?? defaultCodexArgs; + try { + this.child = spawn(command, args, { + cwd: options.cwd, + env: options.env ?? process.env, + stdio: "pipe", + }); + } catch (error) { + throw spawnFailure(command, error); + } + this.child.stderr.on("data", (chunk: Buffer) => this.appendStderr(chunk)); const rl = readline.createInterface({ input: this.child.stdout, crlfDelay: Infinity }); void this.readLines(rl, options.onNotification); this.child.on("close", (code, signal) => this.handleClose(code, signal)); - this.child.on("error", (error) => this.handleClose(127, error.message)); + this.child.on("error", (error) => this.handleClose(127, null, spawnFailure(command, error))); } - request(method: string, params: JsonRecord): Promise { - if (this.closed) return Promise.reject(new Error("codex app-server is closed")); + request(method: string, params: JsonRecord, timeoutMs = requestTimeoutCapMs): Promise { + if (this.closed) return Promise.reject(this.closeFailure ?? new CodexStdioFailure("backend-failed", "codex app-server is closed", `request:${method}`)); const id = this.nextId++; const message = { id, method, params }; + const effectiveTimeoutMs = positiveTimeout(timeoutMs); return new Promise((resolve, reject) => { - this.pending.set(id, { resolve, reject }); - this.child.stdin.write(`${JSON.stringify(message)}\n`); + const timer = setTimeout(() => { + this.rejectRequest(id, new CodexStdioFailure("backend-timeout", `Codex stdio request ${method} timed out after ${effectiveTimeoutMs}ms`, `request:${method}`, { method, timeoutMs: effectiveTimeoutMs })); + }, effectiveTimeoutMs); + this.pending.set(id, { method, timer, resolve, reject }); + this.child.stdin.write(`${JSON.stringify(message)}\n`, "utf8", (error: Error | null | undefined) => { + if (!error) return; + this.rejectRequest(id, new CodexStdioFailure("backend-failed", `failed to write Codex stdio request ${method}: ${error.message}`, `request:${method}`, { method })); + }); }); } notify(method: string, params: JsonRecord = {}): void { - if (!this.closed) this.child.stdin.write(`${JSON.stringify({ method, params })}\n`); + if (this.closed) return; + this.child.stdin.write(`${JSON.stringify({ method, params })}\n`, "utf8", () => undefined); } stop(): void { @@ -70,33 +136,65 @@ export class CodexStdioClient { }, 1500).unref?.(); } + private appendStderr(chunk: Buffer): void { + this.stderrBytes += chunk.byteLength; + const next = Buffer.concat([this.stderrTailBuffer, chunk]); + this.stderrTailBuffer = next.byteLength > stderrBufferBytes ? next.subarray(next.byteLength - stderrBufferBytes) : next; + } + private async readLines(rl: readline.Interface, onNotification: (message: JsonRecord) => void): Promise { try { for await (const line of rl) { const trimmed = String(line).trim(); if (trimmed.length === 0) continue; - const message = JSON.parse(trimmed) as JsonRecord; - const id = typeof message.id === "number" ? message.id : null; - const method = typeof message.method === "string" ? message.method : null; - if (id !== null && method === null) { - const pending = this.pending.get(id); - if (!pending) continue; - this.pending.delete(id); - if (message.error !== undefined) pending.reject(new Error(JSON.stringify(redactJson(message.error)))); - else pending.resolve(message.result); - continue; + let message: JsonRecord; + try { + message = JSON.parse(trimmed) as JsonRecord; + } catch { + this.handleProtocolFailure(new CodexStdioFailure("backend-json-parse-error", "codex app-server emitted invalid JSON on stdout", "stdout:parse", { linePreview: redactText(trimmed.slice(0, 800)), lineChars: trimmed.length })); + break; } - if (id !== null && method !== null) { - this.handleServerRequest(id, method); - continue; - } - if (method !== null) onNotification(message); + this.handleMessage(message, onNotification); } } catch (error) { - this.rejectAll(error instanceof Error ? error : new Error(String(error))); + this.handleProtocolFailure(new CodexStdioFailure("backend-protocol-error", error instanceof Error ? error.message : String(error), "stdout:read")); } } + private handleMessage(message: JsonRecord, onNotification: (message: JsonRecord) => void): void { + const id = typeof message.id === "number" ? message.id : null; + const method = typeof message.method === "string" ? message.method : null; + if (id !== null && method === null) { + this.handleResponse(id, message); + return; + } + if (id !== null && method !== null) { + this.handleServerRequest(id, method); + return; + } + if (method !== null) { + onNotification(message); + return; + } + this.handleProtocolFailure(new CodexStdioFailure("backend-response-invalid", "codex app-server message had neither JSON-RPC id nor method", "stdout:message", { message })); + } + + private handleResponse(id: number, message: JsonRecord): void { + const pending = this.pending.get(id); + if (!pending) return; + this.pending.delete(id); + clearTimeout(pending.timer); + if (message.error !== undefined) { + pending.reject(failureFromRpcError(pending.method, message.error)); + return; + } + if (!("result" in message)) { + pending.reject(new CodexStdioFailure("backend-response-invalid", `codex app-server response for ${pending.method} omitted result and error`, `response:${pending.method}`, { method: pending.method })); + return; + } + pending.resolve(message.result); + } + private handleServerRequest(id: number, method: string): void { if (method === "item/commandExecution/requestApproval" || method === "item/fileChange/requestApproval") { this.child.stdin.write(`${JSON.stringify({ id, result: { decision: "decline" } })}\n`); @@ -105,92 +203,172 @@ export class CodexStdioClient { this.child.stdin.write(`${JSON.stringify({ id, error: { code: -32601, message: `Unsupported client-side request: ${method}` } })}\n`); } + private rejectRequest(id: number, error: CodexStdioFailure): void { + const pending = this.pending.get(id); + if (!pending) return; + this.pending.delete(id); + clearTimeout(pending.timer); + pending.reject(error); + } + private rejectAll(error: Error): void { - for (const pending of this.pending.values()) pending.reject(error); + for (const pending of this.pending.values()) { + clearTimeout(pending.timer); + pending.reject(error); + } this.pending.clear(); } - private handleClose(code: number | null, signal: string | null): void { + private handleProtocolFailure(error: CodexStdioFailure): void { + if (this.closed) return; + this.closeFailure = error; + this.rejectAll(error); + this.stop(); + } + + private handleClose(code: number | null, signal: string | null, failure: CodexStdioFailure | null = null): void { if (this.closed) return; this.closed = true; - const stderrTail = redactText(Buffer.concat(this.stderrChunks).toString("utf8").slice(-8000)); - this.rejectAll(new Error(`codex app-server closed code=${code} signal=${signal}`)); - this.closeResolve({ code, signal, stderrTail }); + if (failure) this.closeFailure = failure; + const stderr = this.stderrInfo(); + const closeInfo: CodexStdioCloseInfo = { + code, + signal, + stderrTail: stderr.stderrTail, + stderrBytes: this.stderrBytes, + stderrTruncated: stderr.stderrTruncated, + failureKind: this.closeFailure?.failureKind ?? null, + message: this.closeFailure?.message ?? null, + }; + this.rejectAll(this.closeFailure ?? new CodexStdioFailure("backend-failed", `codex app-server closed code=${code} signal=${signal}`, "process:close", closeInfo)); + this.closeResolve(closeInfo); + } + + private stderrInfo(): { stderrTail: string; stderrTruncated: boolean } { + const buffered = this.stderrTailBuffer.toString("utf8"); + const tail = buffered.slice(-8000); + return { + stderrTail: redactText(tail), + stderrTruncated: this.stderrBytes > this.stderrTailBuffer.byteLength || buffered.length > tail.length, + }; } } export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise { - const secretFailure = codexHomeReadiness(options.codexHome ?? options.env?.CODEX_HOME ?? `${options.env?.HOME ?? process.env.HOME ?? ""}/.codex`); + const codexHome = resolveCodexHome(options); + const secretFailure = codexHomeReadiness(codexHome); if (secretFailure) return secretFailure; - const events: BackendEvent[] = [{ type: "backend_status", payload: { phase: "codex-app-server-starting", protocol: "codex-app-server-jsonrpc-stdio" } }]; + const env = childEnv(options, codexHome); + const events: BackendEvent[] = [{ + type: "backend_status", + payload: { + phase: "codex-app-server-starting", + protocol: codexProtocol, + runtime: runtimeSummary(options, env, codexHome), + }, + }]; let assistantText = ""; - let threadId: string | undefined; + let threadId: string | undefined = options.threadId; let turnId: string | undefined; let terminal: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } | null = null; let terminalResolve!: () => void; const terminalPromise = new Promise((resolve) => { terminalResolve = resolve; }); - const clientOptions: ConstructorParameters[0] = { - cwd: options.cwd, - env: childEnv(options), - onNotification: (message) => { - const normalized = normalizeCodexNotification(message); - if (normalized.threadId) threadId = normalized.threadId; - if (normalized.turnId) turnId = normalized.turnId; - if (normalized.assistantDelta) assistantText += normalized.assistantDelta; - events.push(...normalized.events); - if (normalized.terminal) { - terminal = normalized.terminal; - terminalResolve(); - } - }, - }; - if (options.command) clientOptions.command = options.command; - if (options.args) clientOptions.args = options.args; - const client = new CodexStdioClient(clientOptions); + let client: CodexStdioClient | null = null; + const requestTimeoutMs = Math.min(positiveTimeout(options.timeoutMs), requestTimeoutCapMs); const timeout = setTimeout(() => { - if (!terminal) { - terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn timed out after ${options.timeoutMs}ms` }; - events.push({ type: "error", payload: terminal }); - client.stop(); - terminalResolve(); - } - }, options.timeoutMs); + if (terminal) return; + terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn timed out after ${options.timeoutMs}ms` }; + events.push({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:timeout" } }); + client?.stop(); + terminalResolve(); + }, positiveTimeout(options.timeoutMs)); try { - await client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } }); - const threadResponse = asResponseRecord(await client.request("thread/start", { model: options.model ?? "default", cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" })); - threadId = stringAt(asRecordAt(threadResponse, "thread"), "id") ?? threadId; - const turnResponse = asResponseRecord(await client.request("turn/start", { threadId: threadId ?? "", input: [{ type: "text", text: options.prompt, text_elements: [] }], cwd: options.cwd, approvalPolicy: options.approvalPolicy, model: options.model ?? "default" })); - turnId = stringAt(asRecordAt(turnResponse, "turn"), "id") ?? turnId; - await Promise.race([terminalPromise, client.closedPromise]); - if (!terminal) terminal = { status: "failed", failureKind: "backend-protocol-error", message: "codex app-server closed before turn/completed" }; + const clientOptions: ConstructorParameters[0] = { + cwd: options.cwd, + env, + onNotification: (message) => { + const normalized = normalizeCodexNotification(message); + if (normalized.threadId) threadId = normalized.threadId; + if (normalized.turnId) turnId = normalized.turnId; + if (normalized.assistantDelta) assistantText += normalized.assistantDelta; + events.push(...normalized.events); + if (normalized.terminal && !terminal) { + terminal = normalized.terminal; + terminalResolve(); + } + }, + }; + if (options.command) clientOptions.command = options.command; + if (options.args) clientOptions.args = options.args; + client = new CodexStdioClient(clientOptions); + const initializeResult = requireResponseRecord(await client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } }, requestTimeoutMs), "initialize"); + validateInitializeResponse(initializeResult); + client.notify("initialized", {}); + events.push({ type: "backend_status", payload: { phase: "initialize:completed", protocol: codexProtocol } }); + + const threadMethod = options.threadId ? "thread/resume" : "thread/start"; + const threadParams: JsonRecord = options.threadId + ? { threadId: options.threadId, model: options.model ?? "default", cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox } + : { model: options.model ?? "default", cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" }; + const threadResponse = requireResponseRecord(await client.request(threadMethod, threadParams, requestTimeoutMs), threadMethod); + threadId = requireNestedId(threadResponse, threadMethod, "thread"); + events.push({ type: "backend_status", payload: { phase: `${threadMethod}:completed`, threadId } }); + + const turnResponse = requireResponseRecord(await client.request("turn/start", { threadId, input: textInput(options.prompt), cwd: options.cwd, approvalPolicy: options.approvalPolicy, model: options.model ?? "default" }, requestTimeoutMs), "turn/start"); + turnId = requireNestedId(turnResponse, "turn/start", "turn"); + events.push({ type: "backend_status", payload: { phase: "turn/start:completed", turnId } }); + + const race = await Promise.race([ + terminalPromise.then(() => ({ kind: "terminal" as const })), + client.closedPromise.then((closeInfo) => ({ kind: "closed" as const, closeInfo })), + ]); + if (race.kind === "closed" && !terminal) { + terminal = terminalFromClose(race.closeInfo); + events.push({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "transport:closed-before-terminal", appServerExit: closeEvent(race.closeInfo) } }); + } + if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server did not emit turn/completed" }; } catch (error) { - terminal = { status: "failed", failureKind: "backend-protocol-error", message: error instanceof Error ? error.message : String(error) }; - events.push({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message } }); + if (!terminal) { + const failure = normalizeFailure(error); + terminal = { status: failure.failureKind === "secret-unavailable" ? "blocked" : "failed", failureKind: failure.failureKind, message: failure.message }; + events.push({ type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase: failure.phase, details: failure.details } }); + } } finally { clearTimeout(timeout); - client.stop(); + if (client) { + client.stop(); + const closeInfo = await client.closedPromise; + events.push({ type: "backend_status", payload: { phase: "codex-app-server-closed", appServerExit: closeEvent(closeInfo) } }); + } } + if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server finished without terminal status" }; if (assistantText.trim().length > 0) events.push({ type: "assistant_message", payload: { text: assistantText } }); events.push({ type: "terminal_status", payload: { terminalStatus: terminal.status, failureKind: terminal.failureKind, message: terminal.message } }); return { terminalStatus: terminal.status, failureKind: terminal.failureKind, failureMessage: terminal.message, events: events.map((event) => ({ ...event, payload: redactJson(event.payload) })), ...(threadId ? { threadId } : {}), ...(turnId ? { turnId } : {}) }; } function codexHomeReadiness(codexHome: string): BackendTurnResult | null { - try { - accessSync(`${codexHome}/auth.json`, fsConstants.R_OK); - accessSync(`${codexHome}/config.toml`, fsConstants.R_OK); - return null; - } catch { - return { - terminalStatus: "blocked", - failureKind: "secret-unavailable", - failureMessage: "Codex auth.json or config.toml projection is not readable", - events: [ - { type: "error", payload: { failureKind: "secret-unavailable", credentialSource: { codexHome, valuesPrinted: false } } }, - { type: "terminal_status", payload: { terminalStatus: "blocked", failureKind: "secret-unavailable" } }, - ], - }; - } + const auth = fileReadable(`${codexHome}/auth.json`); + const config = fileReadable(`${codexHome}/config.toml`); + if (auth.readable && config.readable) return null; + const payload = { + failureKind: "secret-unavailable", + projection: { + codexHome: pathSummary(codexHome), + authJson: auth, + configToml: config, + valuesPrinted: false, + }, + } satisfies JsonRecord; + return { + terminalStatus: "blocked", + failureKind: "secret-unavailable", + failureMessage: "Codex auth.json or config.toml projection is not readable", + events: [ + { type: "error", payload }, + { type: "terminal_status", payload: { terminalStatus: "blocked", failureKind: "secret-unavailable" } }, + ], + }; } function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent[]; assistantDelta?: string; threadId?: string; turnId?: string; terminal?: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } } { @@ -207,33 +385,163 @@ function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent if (method === "item/agentMessage/delta") return { events: [], assistantDelta: typeof params.delta === "string" ? params.delta : "" }; if (method === "item/commandExecution/outputDelta") return { events: [{ type: "command_output", payload: { stream: "stdout", text: typeof params.delta === "string" ? params.delta : "" } }] }; if (method === "item/started" || method === "item/completed") return { events: [{ type: "tool_call", payload: { method, item: redactJson(asRecordAt(params, "item")) } }] }; - if (method === "error") return { events: [{ type: "error", payload: { failureKind: "backend-failed", error: redactJson(params.error ?? params) } }] }; + if (method === "error") { + const error = asRecordAt(params, "error"); + const messageText = typeof error.message === "string" ? error.message : "Codex app-server error"; + const failureKind = classifyMessageFailureKind(messageText, "backend-failed"); + const terminal = params.willRetry === true ? undefined : { status: "failed" as const, failureKind, message: redactText(messageText) }; + return { events: [{ type: "error", payload: { failureKind, error: redactJson(error), willRetry: params.willRetry === true } }], ...(terminal ? { terminal } : {}) }; + } if (method === "turn/completed") { const turn = asRecordAt(params, "turn"); + if (typeof turn.status !== "string") { + return { events: [{ type: "error", payload: { failureKind: "backend-response-invalid", message: "turn/completed notification omitted turn.status" } }], terminal: { status: "failed", failureKind: "backend-response-invalid", message: "turn/completed notification omitted turn.status" } }; + } const status = terminalStatusFromValue(turn.status); const error = asRecordAt(turn, "error"); - const messageText = typeof error.message === "string" ? error.message : null; - return { events: [{ type: "backend_status", payload: { phase: method, terminalStatus: status } }], terminal: { status, failureKind: status === "completed" ? null : "backend-failed", message: messageText } }; + const messageText = typeof error.message === "string" ? redactText(error.message) : null; + return { events: [{ type: "backend_status", payload: { phase: method, terminalStatus: status } }], terminal: { status, failureKind: status === "completed" ? null : classifyMessageFailureKind(messageText ?? turn.status, "backend-failed"), message: messageText } }; } return { events: [{ type: "backend_status", payload: { phase: method } }] }; } function terminalStatusFromValue(value: unknown): TerminalStatus { if (value === "completed") return "completed"; - if (value === "cancelled" || value === "canceled") return "cancelled"; + if (value === "cancelled" || value === "canceled" || value === "interrupted") return "cancelled"; if (value === "blocked") return "blocked"; return "failed"; } -function childEnv(options: CodexStdioTurnOptions): NodeJS.ProcessEnv { - const env: NodeJS.ProcessEnv = { ...process.env, ...options.env }; - const codexHome = options.codexHome ?? options.env?.CODEX_HOME; - if (codexHome) env.CODEX_HOME = codexHome; - return env; +function childEnv(options: CodexStdioTurnOptions, codexHome: string): NodeJS.ProcessEnv { + return { + ...process.env, + ...options.env, + CODEX_HOME: codexHome, + CODEX_INTERNAL_ORIGINATOR_OVERRIDE: "agentrun", + }; } -function asResponseRecord(value: unknown): JsonRecord { - return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : {}; +function resolveCodexHome(options: CodexStdioTurnOptions): string { + return options.codexHome ?? options.env?.CODEX_HOME ?? `${options.env?.HOME ?? process.env.HOME ?? ""}/.codex`; +} + +function validateInitializeResponse(value: JsonRecord): void { + const serverInfo = value.serverInfo; + if (serverInfo !== undefined && (typeof serverInfo !== "object" || serverInfo === null || Array.isArray(serverInfo))) { + throw new CodexStdioFailure("backend-response-invalid", "initialize response serverInfo must be an object when present", "response:initialize", { response: value }); + } +} + +function requireResponseRecord(value: unknown, method: string): JsonRecord { + if (typeof value === "object" && value !== null && !Array.isArray(value)) return value as JsonRecord; + throw new CodexStdioFailure("backend-response-invalid", `${method} response result must be an object`, `response:${method}`); +} + +function requireNestedId(value: JsonRecord, method: string, key: string): string { + const id = stringAt(asRecordAt(value, key), "id"); + if (id) return id; + throw new CodexStdioFailure("backend-response-invalid", `${method} response did not include ${key}.id`, `response:${method}`, { response: value }); +} + +function textInput(text: string): JsonValue[] { + return [{ type: "text", text, text_elements: [] }]; +} + +function fileReadable(filePath: string): JsonRecord { + try { + accessSync(filePath, fsConstants.R_OK); + return { ...pathSummary(filePath), readable: true }; + } catch { + return { ...pathSummary(filePath), readable: false }; + } +} + +function pathSummary(value: string): JsonRecord { + const raw = String(value || ""); + const parts = raw.split(/[\\/]+/u).filter(Boolean); + return { + present: raw.trim().length > 0, + absolute: path.isAbsolute(raw), + basename: parts.at(-1) ?? null, + depth: parts.length, + fingerprint: shortHash(raw), + valuePrinted: false, + }; +} + +function runtimeSummary(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, codexHome: string): JsonRecord { + return { + command: options.command ?? "codex", + args: options.args ?? defaultCodexArgs, + cwd: pathSummary(options.cwd), + workspace: pathSummary(options.cwd), + codexHome: pathSummary(codexHome), + env: envSummary(env), + valuesPrinted: false, + }; +} + +function envSummary(env: NodeJS.ProcessEnv): JsonRecord { + const keyState: Record = {}; + for (const key of childEnvSummaryKeys) keyState[key] = { present: typeof env[key] === "string" && String(env[key]).length > 0 }; + const secretLikeKeyCount = Object.keys(env).filter((key) => /auth|authorization|api[_-]?key|token|password|secret|credential/iu.test(key)).length; + return { + keyCount: Object.keys(env).length, + trackedKeys: keyState, + secretLikeKeyCount, + valuesPrinted: false, + }; +} + +function closeEvent(closeInfo: CodexStdioCloseInfo): JsonRecord { + return { + code: closeInfo.code, + signal: closeInfo.signal, + failureKind: closeInfo.failureKind, + message: closeInfo.message, + stderrTail: closeInfo.stderrTail.slice(-stderrEventChars), + stderrBytes: closeInfo.stderrBytes, + stderrTruncated: closeInfo.stderrTruncated || closeInfo.stderrTail.length > stderrEventChars, + }; +} + +function terminalFromClose(closeInfo: CodexStdioCloseInfo): { status: TerminalStatus; failureKind: FailureKind; message: string } { + const baseMessage = `codex app-server closed before turn/completed code=${closeInfo.code} signal=${closeInfo.signal}`; + const combined = [closeInfo.message ?? "", closeInfo.stderrTail].filter(Boolean).join("\n"); + const failureKind = closeInfo.failureKind ?? classifyMessageFailureKind(combined, "backend-response-invalid"); + const stderrPreview = closeInfo.stderrTail.trim().length > 0 ? `; stderrTail=${closeInfo.stderrTail.slice(-1000)}` : ""; + return { status: "failed", failureKind, message: redactText(`${baseMessage}${stderrPreview}`) }; +} + +function failureFromRpcError(method: string, value: unknown): CodexStdioFailure { + const error = typeof value === "object" && value !== null ? value as Record : {}; + const message = typeof error.message === "string" ? error.message : JSON.stringify(redactJson(value as JsonValue)); + return new CodexStdioFailure(classifyMessageFailureKind(message, "backend-protocol-error"), `codex app-server ${method} error: ${message}`, `response:${method}`, { method, error: redactJson(value as JsonValue) }); +} + +function spawnFailure(command: string, error: unknown): CodexStdioFailure { + const message = error instanceof Error ? error.message : String(error); + const code = typeof error === "object" && error !== null && "code" in error ? String((error as { code?: unknown }).code ?? "") : ""; + return new CodexStdioFailure("backend-spawn-failed", `failed to start Codex app-server command ${command}: ${message}`, "process:spawn", { command, code }); +} + +function normalizeFailure(error: unknown): CodexStdioFailure { + if (error instanceof CodexStdioFailure) return error; + const message = error instanceof Error ? error.message : String(error); + return new CodexStdioFailure(classifyMessageFailureKind(message, "backend-protocol-error"), message, "codex-stdio"); +} + +function classifyMessageFailureKind(message: string, fallback: FailureKind): FailureKind { + const text = String(message || "").toLowerCase(); + if (/rate.?limit|too many requests|\b429\b/u.test(text)) return "provider-rate-limited"; + if (/\b401\b|\b403\b|unauthori[sz]ed|forbidden|invalid api key|authentication|auth failed|oauth|access token/u.test(text)) return "provider-auth-failed"; + if (/timed out|timeout|idle timeout/u.test(text)) return "backend-timeout"; + if (/invalid json|json parse/u.test(text)) return "backend-json-parse-error"; + return fallback; +} + +function positiveTimeout(value: number): number { + return Number.isFinite(value) && value > 0 ? Math.max(1, Math.floor(value)) : requestTimeoutCapMs; } function asRecordAt(value: JsonRecord, key: string): JsonRecord { @@ -242,5 +550,9 @@ function asRecordAt(value: JsonRecord, key: string): JsonRecord { } function stringAt(value: JsonRecord, key: string): string | null { - return typeof value[key] === "string" ? value[key] : null; + return typeof value[key] === "string" && String(value[key]).length > 0 ? String(value[key]) : null; +} + +function shortHash(value: string): string { + return createHash("sha256").update(value).digest("hex").slice(0, 12); } diff --git a/src/common/redaction.ts b/src/common/redaction.ts index 43f06d1..6ba45b1 100644 --- a/src/common/redaction.ts +++ b/src/common/redaction.ts @@ -1,7 +1,8 @@ export function redactText(value: string): string { return value + .replace(/\b(sk-[A-Za-z0-9_-]{8,}|ghp_[A-Za-z0-9_]{8,}|github_pat_[A-Za-z0-9_]+)\b/gu, "REDACTED") .replace(/(authorization\s*[:=]\s*)(bearer\s+)?[A-Za-z0-9._~+/=-]+/giu, "$1$2REDACTED") - .replace(/((?:api[_-]?key|token|password|secret)\s*[:=]\s*)[A-Za-z0-9._~+/=-]+/giu, "$1REDACTED") + .replace(/((?:api[_-]?key|token|password|secret)\s*["']?\s*[:=]\s*["']?)[A-Za-z0-9._~+/=-]+/giu, "$1REDACTED") .replace(/(postgres(?:ql)?:\/\/[^:\s/@]+:)[^@\s]+(@)/giu, "$1REDACTED$2") .replace(/(https?:\/\/[^:\s/@]+:)[^@\s]+(@)/giu, "$1REDACTED$2"); } diff --git a/src/common/types.ts b/src/common/types.ts index 1ef153e..195f4c6 100644 --- a/src/common/types.ts +++ b/src/common/types.ts @@ -9,6 +9,9 @@ export type FailureKind = | "runner-lease-conflict" | "backend-failed" | "backend-protocol-error" + | "backend-spawn-failed" + | "backend-json-parse-error" + | "backend-response-invalid" | "backend-timeout" | "provider-auth-failed" | "provider-rate-limited" diff --git a/src/mgr/main.ts b/src/mgr/main.ts index 876ad1c..775016f 100644 --- a/src/mgr/main.ts +++ b/src/mgr/main.ts @@ -1,6 +1,13 @@ import { startManagerServer } from "./server.js"; +import { errorToJson } from "../common/errors.js"; const port = Number(process.env.PORT ?? process.env.AGENTRUN_MGR_PORT ?? "8080"); const host = process.env.HOST ?? "0.0.0.0"; -const started = await startManagerServer({ port, host }); -console.log(JSON.stringify({ ok: true, serviceId: "agentrun-mgr", baseUrl: started.baseUrl })); +try { + const started = await startManagerServer({ port, host }); + const database = await started.store.health(); + console.log(JSON.stringify({ ok: true, serviceId: "agentrun-mgr", baseUrl: started.baseUrl, database })); +} catch (error) { + console.error(JSON.stringify({ ok: false, serviceId: "agentrun-mgr", failureKind: "infra-failed", message: error instanceof Error ? error.message : String(error), error: errorToJson(error) })); + process.exit(1); +} diff --git a/src/mgr/postgres-store.ts b/src/mgr/postgres-store.ts new file mode 100644 index 0000000..8c5d815 --- /dev/null +++ b/src/mgr/postgres-store.ts @@ -0,0 +1,476 @@ +import { createHash } from "node:crypto"; +import { Pool } from "pg"; +import type { PoolClient, QueryResultRow } from "pg"; +import { AgentRunError } from "../common/errors.js"; +import { redactJson } from "../common/redaction.js"; +import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, RunEvent, RunnerRecord, RunRecord, RunStatus, TerminalStatus } from "../common/types.js"; +import { newId, nowIso, stableHash } from "../common/validation.js"; +import type { AgentRunStore, StoreHealth } from "./store.js"; +import { statusFromTerminal } from "./store.js"; + +interface PostgresStoreOptions { + connectionString: string; +} + +interface MigrationDefinition { + id: string; + checksum: string; + sql: string; +} + +const initialMigrationSql = ` +CREATE TABLE IF NOT EXISTS agentrun_runs ( + id text PRIMARY KEY, + tenant_id text NOT NULL, + project_id text NOT NULL, + workspace_ref jsonb NOT NULL, + provider_id text NOT NULL, + backend_profile text NOT NULL, + execution_policy jsonb NOT NULL, + trace_sink jsonb, + status text NOT NULL, + terminal_status text, + failure_kind text, + failure_message text, + created_at timestamptz NOT NULL, + updated_at timestamptz NOT NULL, + claimed_by text, + lease_expires_at timestamptz +); + +CREATE TABLE IF NOT EXISTS agentrun_commands ( + id text PRIMARY KEY, + run_id text NOT NULL REFERENCES agentrun_runs(id) ON DELETE CASCADE, + seq integer NOT NULL, + type text NOT NULL, + payload jsonb NOT NULL, + payload_hash text NOT NULL, + idempotency_key text, + state text NOT NULL, + created_at timestamptz NOT NULL, + updated_at timestamptz NOT NULL, + acknowledged_at timestamptz, + UNIQUE (run_id, seq) +); + +CREATE UNIQUE INDEX IF NOT EXISTS agentrun_commands_run_idempotency_key_idx + ON agentrun_commands (run_id, idempotency_key) + WHERE idempotency_key IS NOT NULL; + +CREATE TABLE IF NOT EXISTS agentrun_events ( + id text PRIMARY KEY, + run_id text NOT NULL REFERENCES agentrun_runs(id) ON DELETE CASCADE, + seq integer NOT NULL, + type text NOT NULL, + payload jsonb NOT NULL, + created_at timestamptz NOT NULL, + UNIQUE (run_id, seq) +); + +CREATE TABLE IF NOT EXISTS agentrun_runners ( + id text PRIMARY KEY, + run_id text, + attempt_id text, + backend_profile text, + placement text, + source_commit text, + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + registered_at timestamptz NOT NULL, + heartbeat_at timestamptz NOT NULL +); + +CREATE TABLE IF NOT EXISTS agentrun_backends ( + profile text PRIMARY KEY, + capabilities jsonb NOT NULL, + capacity jsonb NOT NULL, + health jsonb NOT NULL, + updated_at timestamptz NOT NULL +); + +CREATE TABLE IF NOT EXISTS agentrun_leases ( + run_id text PRIMARY KEY REFERENCES agentrun_runs(id) ON DELETE CASCADE, + runner_id text NOT NULL, + lease_expires_at timestamptz NOT NULL, + stale_recovery_marker jsonb, + updated_at timestamptz NOT NULL +); + +CREATE INDEX IF NOT EXISTS agentrun_runs_status_idx ON agentrun_runs (status, updated_at); +CREATE INDEX IF NOT EXISTS agentrun_events_run_seq_idx ON agentrun_events (run_id, seq); +CREATE INDEX IF NOT EXISTS agentrun_commands_run_seq_idx ON agentrun_commands (run_id, seq); +CREATE INDEX IF NOT EXISTS agentrun_leases_expiry_idx ON agentrun_leases (lease_expires_at); + +INSERT INTO agentrun_backends (profile, capabilities, capacity, health, updated_at) +VALUES ( + 'codex', + '{"protocol":"codex-app-server-jsonrpc-stdio","transport":"stdio","command":"codex app-server --listen stdio://"}'::jsonb, + '{"mode":"manual-runner-v0.1"}'::jsonb, + '{"status":"registered"}'::jsonb, + now() +) +ON CONFLICT (profile) DO UPDATE SET + capabilities = EXCLUDED.capabilities, + capacity = EXCLUDED.capacity, + health = EXCLUDED.health, + updated_at = EXCLUDED.updated_at; +`; + +const postgresMigrations: MigrationDefinition[] = [ + { + id: "001_v01_initial_durable_store", + checksum: checksumSql(initialMigrationSql), + sql: initialMigrationSql, + }, +]; + +export function postgresMigrationContract(): JsonRecord { + return { + migrationIds: postgresMigrations.map((migration) => migration.id), + latestMigrationId: latestMigrationId(), + requiredTables: ["agentrun_schema_migrations", "agentrun_runs", "agentrun_commands", "agentrun_events", "agentrun_runners", "agentrun_backends", "agentrun_leases"], + checksums: Object.fromEntries(postgresMigrations.map((migration) => [migration.id, migration.checksum])), + }; +} + +export async function createPostgresAgentRunStore(options: PostgresStoreOptions): Promise { + const store = new PostgresAgentRunStore(options); + await store.migrate(); + return store; +} + +export class PostgresAgentRunStore implements AgentRunStore { + private readonly pool: Pool; + private migrationReady = false; + private appliedMigrationId: string | null = null; + + constructor(options: PostgresStoreOptions) { + this.pool = new Pool({ connectionString: options.connectionString, application_name: "agentrun-mgr-v01" }); + } + + async migrate(): Promise { + await this.withTransaction(async (client) => { + await client.query(` +CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( + id text PRIMARY KEY, + checksum text NOT NULL, + applied_at timestamptz NOT NULL DEFAULT now() +) +`); + for (const migration of postgresMigrations) { + const existing = await client.query<{ checksum: string }>("SELECT checksum FROM agentrun_schema_migrations WHERE id = $1 FOR UPDATE", [migration.id]); + if (existing.rowCount && existing.rows[0]?.checksum !== migration.checksum) { + throw new AgentRunError("infra-failed", `schema migration checksum mismatch for ${migration.id}`, { httpStatus: 503, details: { migrationId: migration.id } }); + } + if (!existing.rowCount) { + await client.query(migration.sql); + await client.query("INSERT INTO agentrun_schema_migrations (id, checksum) VALUES ($1, $2)", [migration.id, migration.checksum]); + } + } + }); + this.migrationReady = true; + this.appliedMigrationId = latestMigrationId(); + } + + async health(): Promise { + try { + await this.pool.query("SELECT 1"); + const result = await this.pool.query<{ id: string }>("SELECT id FROM agentrun_schema_migrations ORDER BY applied_at DESC, id DESC LIMIT 1"); + const migrationId = result.rows[0]?.id ?? null; + const migrationReady = this.migrationReady && migrationId === latestMigrationId(); + return { adapter: "postgres", ready: migrationReady, reachable: true, migrationReady, migrationId, failureKind: migrationReady ? null : "infra-failed", message: migrationReady ? null : "schema migration is not ready", credentialValuesPrinted: false }; + } catch (error) { + return { adapter: "postgres", ready: false, reachable: false, migrationReady: false, migrationId: this.appliedMigrationId, failureKind: "infra-failed", message: error instanceof Error ? error.message : String(error), credentialValuesPrinted: false }; + } + } + + async createRun(input: CreateRunInput): Promise { + const at = nowIso(); + const run: RunRecord = { ...input, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null }; + return this.withTransaction(async (client) => { + await client.query( + `INSERT INTO agentrun_runs (id, tenant_id, project_id, workspace_ref, provider_id, backend_profile, execution_policy, trace_sink, status, terminal_status, failure_kind, failure_message, created_at, updated_at, claimed_by, lease_expires_at) + VALUES ($1, $2, $3, $4::jsonb, $5, $6, $7::jsonb, $8::jsonb, $9, $10, $11, $12, $13, $14, $15, $16)`, + [run.id, run.tenantId, run.projectId, JSON.stringify(run.workspaceRef), run.providerId, run.backendProfile, JSON.stringify(run.executionPolicy), JSON.stringify(run.traceSink), run.status, run.terminalStatus, run.failureKind, run.failureMessage, run.createdAt, run.updatedAt, run.claimedBy, run.leaseExpiresAt], + ); + await this.appendEventWithLockedRun(client, run.id, "backend_status", { phase: "run-created", backendProfile: run.backendProfile }); + return run; + }); + } + + async getRun(runId: string): Promise { + const result = await this.pool.query("SELECT * FROM agentrun_runs WHERE id = $1", [runId]); + const row = result.rows[0]; + if (!row) throw new AgentRunError("schema-invalid", `run ${runId} was not found`, { httpStatus: 404 }); + return runFromRow(row); + } + + async listEvents(runId: string, afterSeq: number, limit: number): Promise { + await this.getRun(runId); + const result = await this.pool.query("SELECT * FROM agentrun_events WHERE run_id = $1 AND seq > $2 ORDER BY seq ASC LIMIT $3", [runId, afterSeq, clamp(limit, 1, 500)]); + return result.rows.map(eventFromRow); + } + + async createCommand(runId: string, input: CreateCommandInput): Promise { + const payloadHash = stableHash(input.payload); + return this.withTransaction(async (client) => { + await this.requireRunForUpdate(client, runId); + if (input.idempotencyKey) { + const existing = await client.query("SELECT * FROM agentrun_commands WHERE run_id = $1 AND idempotency_key = $2", [runId, input.idempotencyKey]); + if (existing.rows[0]) { + const command = commandFromRow(existing.rows[0]); + if (command.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "idempotency key reused with different payload", { httpStatus: 409 }); + return command; + } + } + const seq = await this.nextSeq(client, "agentrun_commands", runId); + const at = nowIso(); + const command: CommandRecord = { ...input, id: newId("cmd"), runId, seq, state: "pending", payloadHash, createdAt: at, updatedAt: at, acknowledgedAt: null }; + await client.query( + `INSERT INTO agentrun_commands (id, run_id, seq, type, payload, payload_hash, idempotency_key, state, created_at, updated_at, acknowledged_at) + VALUES ($1, $2, $3, $4, $5::jsonb, $6, $7, $8, $9, $10, $11)`, + [command.id, command.runId, command.seq, command.type, JSON.stringify(command.payload), command.payloadHash, command.idempotencyKey ?? null, command.state, command.createdAt, command.updatedAt, command.acknowledgedAt], + ); + await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "command-created", commandId: command.id, commandType: command.type }); + return command; + }); + } + + async getCommand(commandId: string): Promise { + const result = await this.pool.query("SELECT * FROM agentrun_commands WHERE id = $1", [commandId]); + const row = result.rows[0]; + if (!row) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 }); + return commandFromRow(row); + } + + async listCommands(runId: string, afterSeq: number, limit: number): Promise { + await this.getRun(runId); + const result = await this.pool.query("SELECT * FROM agentrun_commands WHERE run_id = $1 AND seq > $2 ORDER BY seq ASC LIMIT $3", [runId, afterSeq, clamp(limit, 1, 100)]); + return result.rows.map(commandFromRow); + } + + async registerRunner(input: Partial): Promise { + const at = nowIso(); + const runner: RunnerRecord = { id: input.id ?? newId("runner"), registeredAt: at, heartbeatAt: at, ...input }; + const metadata = metadataForRunner(runner); + const result = await this.pool.query( + `INSERT INTO agentrun_runners (id, run_id, attempt_id, backend_profile, placement, source_commit, metadata, registered_at, heartbeat_at) + VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, $9) + ON CONFLICT (id) DO UPDATE SET + run_id = EXCLUDED.run_id, + attempt_id = EXCLUDED.attempt_id, + backend_profile = EXCLUDED.backend_profile, + placement = EXCLUDED.placement, + source_commit = EXCLUDED.source_commit, + metadata = EXCLUDED.metadata, + heartbeat_at = EXCLUDED.heartbeat_at + RETURNING *`, + [runner.id, runner.runId ?? null, runner.attemptId ?? null, runner.backendProfile ?? null, runner.placement ?? null, runner.sourceCommit ?? null, JSON.stringify(metadata), runner.registeredAt, runner.heartbeatAt], + ); + return runnerFromRow(result.rows[0]); + } + + async claimRun(runId: string, runnerId: string, leaseMs: number): Promise { + return this.withTransaction(async (client) => { + const run = await this.requireRunForUpdate(client, runId); + if (run.claimedBy && run.claimedBy !== runnerId && !isTerminalStatus(run.status)) throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 }); + const leaseExpiresAt = new Date(Date.now() + leaseMs).toISOString(); + const updated = await client.query( + `UPDATE agentrun_runs SET status = $2, claimed_by = $3, lease_expires_at = $4, updated_at = $5 WHERE id = $1 RETURNING *`, + [runId, "claimed", runnerId, leaseExpiresAt, nowIso()], + ); + await client.query( + `INSERT INTO agentrun_leases (run_id, runner_id, lease_expires_at, stale_recovery_marker, updated_at) + VALUES ($1, $2, $3, $4::jsonb, $5) + ON CONFLICT (run_id) DO UPDATE SET runner_id = EXCLUDED.runner_id, lease_expires_at = EXCLUDED.lease_expires_at, updated_at = EXCLUDED.updated_at`, + [runId, runnerId, leaseExpiresAt, null, nowIso()], + ); + await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "run-claimed", runnerId }); + return runFromRow(updated.rows[0]); + }); + } + + async heartbeat(runId: string, runnerId: string, leaseMs: number): Promise { + return this.withTransaction(async (client) => { + const run = await this.requireRunForUpdate(client, runId); + if (run.claimedBy !== runnerId) throw new AgentRunError("runner-lease-conflict", `run ${runId} is not claimed by ${runnerId}`, { httpStatus: 409 }); + const leaseExpiresAt = new Date(Date.now() + leaseMs).toISOString(); + const updated = await client.query("UPDATE agentrun_runs SET lease_expires_at = $2, updated_at = $3 WHERE id = $1 RETURNING *", [runId, leaseExpiresAt, nowIso()]); + await client.query("UPDATE agentrun_runners SET heartbeat_at = $2 WHERE id = $1", [runnerId, nowIso()]); + await client.query("UPDATE agentrun_leases SET lease_expires_at = $2, updated_at = $3 WHERE run_id = $1", [runId, leaseExpiresAt, nowIso()]); + return runFromRow(updated.rows[0]); + }); + } + + async ackCommand(commandId: string): Promise { + const result = await this.pool.query("UPDATE agentrun_commands SET state = $2, acknowledged_at = $3, updated_at = $3 WHERE id = $1 RETURNING *", [commandId, "acknowledged", nowIso()]); + const row = result.rows[0]; + if (!row) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 }); + return commandFromRow(row); + } + + async appendEvent(runId: string, type: EventType, payload: JsonRecord): Promise { + return this.withTransaction(async (client) => { + await this.requireRunForUpdate(client, runId); + return this.appendEventWithLockedRun(client, runId, type, payload); + }); + } + + async finishRun(runId: string, result: Pick): Promise { + return this.withTransaction(async (client) => { + await this.requireRunForUpdate(client, runId); + const status = statusFromTerminal(result.terminalStatus); + const updated = await client.query( + `UPDATE agentrun_runs SET status = $2, terminal_status = $3, failure_kind = $4, failure_message = $5, updated_at = $6 WHERE id = $1 RETURNING *`, + [runId, status, result.terminalStatus, result.failureKind, result.failureMessage, nowIso()], + ); + await this.appendEventWithLockedRun(client, runId, "terminal_status", { terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage }); + return runFromRow(updated.rows[0]); + }); + } + + async backends(): Promise { + const result = await this.pool.query("SELECT * FROM agentrun_backends ORDER BY profile ASC"); + return result.rows.map((row) => ({ profile: stringValue(row.profile), ...jsonRecord(row.capabilities), capacity: jsonValue(row.capacity), health: jsonValue(row.health), updatedAt: nullableIso(row.updated_at) })); + } + + async close(): Promise { + await this.pool.end(); + } + + private async appendEventWithLockedRun(client: PoolClient, runId: string, type: EventType, payload: JsonRecord): Promise { + const seq = await this.nextSeq(client, "agentrun_events", runId); + const event: RunEvent = { id: newId("evt"), runId, seq, type, payload: redactJson(payload), createdAt: nowIso() }; + await client.query("INSERT INTO agentrun_events (id, run_id, seq, type, payload, created_at) VALUES ($1, $2, $3, $4, $5::jsonb, $6)", [event.id, event.runId, event.seq, event.type, JSON.stringify(event.payload), event.createdAt]); + return event; + } + + private async nextSeq(client: PoolClient, table: "agentrun_commands" | "agentrun_events", runId: string): Promise { + const result = await client.query<{ seq: number }>(`SELECT COALESCE(MAX(seq), 0) + 1 AS seq FROM ${table} WHERE run_id = $1`, [runId]); + return Number(result.rows[0]?.seq ?? 1); + } + + private async requireRunForUpdate(client: PoolClient, runId: string): Promise { + const result = await client.query("SELECT * FROM agentrun_runs WHERE id = $1 FOR UPDATE", [runId]); + const row = result.rows[0]; + if (!row) throw new AgentRunError("schema-invalid", `run ${runId} was not found`, { httpStatus: 404 }); + return runFromRow(row); + } + + private async withTransaction(fn: (client: PoolClient) => Promise): Promise { + const client = await this.pool.connect(); + try { + await client.query("BEGIN"); + const result = await fn(client); + await client.query("COMMIT"); + return result; + } catch (error) { + await client.query("ROLLBACK"); + throw error; + } finally { + client.release(); + } + } +} + +function checksumSql(sql: string): string { + return createHash("sha256").update(sql.trim()).digest("hex"); +} + +function latestMigrationId(): string { + return postgresMigrations[postgresMigrations.length - 1]?.id ?? "none"; +} + +function clamp(value: number, min: number, max: number): number { + return Math.max(min, Math.min(value, max)); +} + +function runFromRow(row: QueryResultRow): RunRecord { + return { + id: stringValue(row.id), + tenantId: stringValue(row.tenant_id), + projectId: stringValue(row.project_id), + workspaceRef: jsonRecord(row.workspace_ref) as RunRecord["workspaceRef"], + providerId: stringValue(row.provider_id), + backendProfile: stringValue(row.backend_profile) as BackendProfile, + executionPolicy: jsonRecord(row.execution_policy) as RunRecord["executionPolicy"], + traceSink: jsonValue(row.trace_sink), + status: stringValue(row.status) as RunStatus, + terminalStatus: nullableString(row.terminal_status) as TerminalStatus | null, + failureKind: nullableString(row.failure_kind) as FailureKind | null, + failureMessage: nullableString(row.failure_message), + createdAt: iso(row.created_at), + updatedAt: iso(row.updated_at), + claimedBy: nullableString(row.claimed_by), + leaseExpiresAt: nullableIso(row.lease_expires_at), + }; +} + +function commandFromRow(row: QueryResultRow): CommandRecord { + return { + id: stringValue(row.id), + runId: stringValue(row.run_id), + seq: Number(row.seq), + type: stringValue(row.type) as CommandRecord["type"], + payload: jsonRecord(row.payload), + payloadHash: stringValue(row.payload_hash), + ...(nullableString(row.idempotency_key) ? { idempotencyKey: stringValue(row.idempotency_key) } : {}), + state: stringValue(row.state) as CommandState, + createdAt: iso(row.created_at), + updatedAt: iso(row.updated_at), + acknowledgedAt: nullableIso(row.acknowledged_at), + }; +} + +function eventFromRow(row: QueryResultRow): RunEvent { + return { id: stringValue(row.id), runId: stringValue(row.run_id), seq: Number(row.seq), type: stringValue(row.type) as EventType, payload: jsonRecord(row.payload), createdAt: iso(row.created_at) }; +} + +function runnerFromRow(row: QueryResultRow): RunnerRecord { + return { + ...jsonRecord(row.metadata), + id: stringValue(row.id), + ...(nullableString(row.run_id) ? { runId: stringValue(row.run_id) } : {}), + ...(nullableString(row.attempt_id) ? { attemptId: stringValue(row.attempt_id) } : {}), + ...(nullableString(row.backend_profile) ? { backendProfile: stringValue(row.backend_profile) as BackendProfile } : {}), + ...(nullableString(row.placement) ? { placement: stringValue(row.placement) } : {}), + ...(nullableString(row.source_commit) ? { sourceCommit: stringValue(row.source_commit) } : {}), + registeredAt: iso(row.registered_at), + heartbeatAt: iso(row.heartbeat_at), + }; +} + +function metadataForRunner(runner: RunnerRecord): JsonRecord { + const { id: _id, runId: _runId, attemptId: _attemptId, backendProfile: _backendProfile, placement: _placement, sourceCommit: _sourceCommit, registeredAt: _registeredAt, heartbeatAt: _heartbeatAt, ...metadata } = runner; + return redactJson(metadata); +} + +function isTerminalStatus(status: RunRecord["status"]): boolean { + return status === "completed" || status === "failed" || status === "cancelled"; +} + +function stringValue(value: unknown): string { + return typeof value === "string" ? value : String(value ?? ""); +} + +function nullableString(value: unknown): string | null { + return value === null || value === undefined ? null : stringValue(value); +} + +function jsonValue(value: unknown): JsonValue { + if (value === undefined) return null; + return value as JsonValue; +} + +function jsonRecord(value: unknown): JsonRecord { + return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : {}; +} + +function iso(value: unknown): string { + if (value instanceof Date) return value.toISOString(); + if (typeof value === "string") return new Date(value).toISOString(); + return new Date(String(value)).toISOString(); +} + +function nullableIso(value: unknown): string | null { + return value === null || value === undefined ? null : iso(value); +} diff --git a/src/mgr/server.ts b/src/mgr/server.ts index 05845f8..944a037 100644 --- a/src/mgr/server.ts +++ b/src/mgr/server.ts @@ -2,7 +2,7 @@ import type { Server } from "node:http"; import { createServer } from "node:http"; import type { AddressInfo } from "node:net"; import type { AgentRunStore } from "./store.js"; -import { MemoryAgentRunStore } from "./store.js"; +import { openAgentRunStoreFromEnv } from "./store.js"; import { AgentRunError, errorToJson } from "../common/errors.js"; import { asRecord, validateCreateCommand, validateCreateRun } from "../common/validation.js"; import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, RunEvent } from "../common/types.js"; @@ -21,7 +21,7 @@ export interface StartedManagerServer { } export async function startManagerServer(options: ManagerServerOptions = {}): Promise { - const store = options.store ?? new MemoryAgentRunStore(); + const store = options.store ?? await openAgentRunStoreFromEnv(); const sourceCommit = options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown"; const server = createServer(async (req, res) => { const traceId = `trc_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`; @@ -52,45 +52,54 @@ async function readBody(req: import("node:http").IncomingMessage): Promise { const path = url.pathname; if (method === "GET" && (path === "/health" || path === "/health/live" || path === "/health/readiness")) { - return { serviceId: "agentrun-mgr", live: true, ready: true, database: { adapter: "memory-self-test", migrationReady: true }, sourceCommit, secretRefs: { valuesPrinted: false } }; + const database = await store.health(); + const ready = path === "/health/live" ? true : database.ready; + return { serviceId: "agentrun-mgr", live: true, ready, database, sourceCommit, secretRefs: { databaseUrl: database.adapter === "postgres" ? "redacted" : "not-used", valuesPrinted: false } }; } - if (method === "GET" && path === "/api/v1/backends") return { items: store.backends() }; - if (method === "POST" && path === "/api/v1/runs") return store.createRun(validateCreateRun(body)) as unknown as JsonValue; + if (method === "GET" && path === "/api/v1/backends") return { items: await store.backends() as unknown as JsonValue }; + if (method === "POST" && path === "/api/v1/runs") return await store.createRun(validateCreateRun(body)) as unknown as JsonValue; const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u); - if (method === "GET" && runMatch) return store.getRun(runMatch[1] ?? "") as unknown as JsonValue; + if (method === "GET" && runMatch) return await store.getRun(runMatch[1] ?? "") as unknown as JsonValue; const eventMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u); if (method === "GET" && eventMatch) { const afterSeq = integerQuery(url, "afterSeq", 0); const limit = integerQuery(url, "limit", 100); - return { items: store.listEvents(eventMatch[1] ?? "", afterSeq, limit) as unknown as JsonValue }; + return { items: await store.listEvents(eventMatch[1] ?? "", afterSeq, limit) as unknown as JsonValue }; } const commandCreateMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands$/u); - if (method === "POST" && commandCreateMatch) return store.createCommand(commandCreateMatch[1] ?? "", validateCreateCommand(body)) as unknown as JsonValue; - if (method === "GET" && commandCreateMatch) return { items: store.listCommands(commandCreateMatch[1] ?? "", integerQuery(url, "afterSeq", 0), integerQuery(url, "limit", 20)) as unknown as JsonValue }; + if (method === "POST" && commandCreateMatch) return await store.createCommand(commandCreateMatch[1] ?? "", validateCreateCommand(body)) as unknown as JsonValue; + if (method === "GET" && commandCreateMatch) return { items: await store.listCommands(commandCreateMatch[1] ?? "", integerQuery(url, "afterSeq", 0), integerQuery(url, "limit", 20)) as unknown as JsonValue }; const commandShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)$/u); - if (method === "GET" && commandShowMatch) return store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue; - if (method === "POST" && path === "/api/v1/runners/register") return store.registerRunner(asRecord(body ?? {}, "runner")) as unknown as JsonValue; + if (method === "GET" && commandShowMatch) return await store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue; + if (method === "POST" && path === "/api/v1/runners/register") return await store.registerRunner(asRecord(body ?? {}, "runner")) as unknown as JsonValue; const claimMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/claim$/u); if (method === "POST" && claimMatch) { const record = asRecord(body, "claim"); const runnerId = typeof record.runnerId === "string" ? record.runnerId : ""; if (runnerId.length === 0) throw new AgentRunError("schema-invalid", "runnerId is required", { httpStatus: 400 }); - return store.claimRun(claimMatch[1] ?? "", runnerId, numberField(record, "leaseMs", 60_000)) as unknown as JsonValue; + return await store.claimRun(claimMatch[1] ?? "", runnerId, numberField(record, "leaseMs", 60_000)) as unknown as JsonValue; + } + const leaseMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/lease$/u); + if (method === "PATCH" && leaseMatch) { + const record = asRecord(body, "lease"); + const runnerId = typeof record.runnerId === "string" ? record.runnerId : ""; + if (runnerId.length === 0) throw new AgentRunError("schema-invalid", "runnerId is required", { httpStatus: 400 }); + return await store.heartbeat(leaseMatch[1] ?? "", runnerId, numberField(record, "leaseMs", 60_000)) as unknown as JsonValue; } const eventsAppendMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u); if (method === "POST" && eventsAppendMatch) { const record = asRecord(body, "event"); const type = typeof record.type === "string" ? record.type as RunEvent["type"] : "backend_status"; - return store.appendEvent(eventsAppendMatch[1] ?? "", type, asRecord(record.payload ?? {}, "event.payload")) as unknown as JsonValue; + return await store.appendEvent(eventsAppendMatch[1] ?? "", type, asRecord(record.payload ?? {}, "event.payload")) as unknown as JsonValue; } const statusMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/status$/u); if (method === "PATCH" && statusMatch) { const record = asRecord(body, "status"); const terminalStatus = record.terminalStatus === "completed" || record.terminalStatus === "failed" || record.terminalStatus === "blocked" || record.terminalStatus === "cancelled" ? record.terminalStatus : "failed"; - return store.finishRun(statusMatch[1] ?? "", { terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null, failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null }) as unknown as JsonValue; + return await store.finishRun(statusMatch[1] ?? "", { terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null, failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null }) as unknown as JsonValue; } const ackMatch = path.match(/^\/api\/v1\/commands\/([^/]+)\/ack$/u); - if (method === "POST" && ackMatch) return store.ackCommand(ackMatch[1] ?? "") as unknown as JsonValue; + if (method === "POST" && ackMatch) return await store.ackCommand(ackMatch[1] ?? "") as unknown as JsonValue; throw new AgentRunError("schema-invalid", `unsupported route ${method} ${path}`, { httpStatus: 404 }); } diff --git a/src/mgr/store.ts b/src/mgr/store.ts index 44fa01a..ee7a36c 100644 --- a/src/mgr/store.ts +++ b/src/mgr/store.ts @@ -3,20 +3,46 @@ import { AgentRunError } from "../common/errors.js"; import { newId, nowIso, stableHash } from "../common/validation.js"; import { redactJson } from "../common/redaction.js"; +export type MaybePromise = T | Promise; + +export interface StoreHealth extends JsonRecord { + adapter: "memory-self-test" | "postgres"; + ready: boolean; + reachable: boolean; + migrationReady: boolean; + migrationId: string | null; + failureKind: FailureKind | null; + message: string | null; + credentialValuesPrinted: false; +} + export interface AgentRunStore { - createRun(input: CreateRunInput): RunRecord; - getRun(runId: string): RunRecord; - listEvents(runId: string, afterSeq: number, limit: number): RunEvent[]; - createCommand(runId: string, input: CreateCommandInput): CommandRecord; - getCommand(commandId: string): CommandRecord; - listCommands(runId: string, afterSeq: number, limit: number): CommandRecord[]; - registerRunner(input: Partial): RunnerRecord; - claimRun(runId: string, runnerId: string, leaseMs: number): RunRecord; - heartbeat(runId: string, runnerId: string, leaseMs: number): RunRecord; - ackCommand(commandId: string): CommandRecord; - appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): RunEvent; - finishRun(runId: string, result: Pick): RunRecord; - backends(): JsonRecord[]; + health(): MaybePromise; + createRun(input: CreateRunInput): MaybePromise; + getRun(runId: string): MaybePromise; + listEvents(runId: string, afterSeq: number, limit: number): MaybePromise; + createCommand(runId: string, input: CreateCommandInput): MaybePromise; + getCommand(commandId: string): MaybePromise; + listCommands(runId: string, afterSeq: number, limit: number): MaybePromise; + registerRunner(input: Partial): MaybePromise; + claimRun(runId: string, runnerId: string, leaseMs: number): MaybePromise; + heartbeat(runId: string, runnerId: string, leaseMs: number): MaybePromise; + ackCommand(commandId: string): MaybePromise; + appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): MaybePromise; + finishRun(runId: string, result: Pick): MaybePromise; + backends(): MaybePromise; + close?(): MaybePromise; +} + +export async function openAgentRunStoreFromEnv(env: NodeJS.ProcessEnv = process.env): Promise { + const databaseUrl = env.DATABASE_URL?.trim(); + if (databaseUrl) { + const { createPostgresAgentRunStore } = await import("./postgres-store.js"); + return createPostgresAgentRunStore({ connectionString: databaseUrl }); + } + const storeMode = env.AGENTRUN_STORE ?? env.AGENTRUN_MGR_STORE; + if (storeMode === "memory") return new MemoryAgentRunStore(); + throw new AgentRunError("infra-failed", "DATABASE_URL is required for agentrun-mgr live runtime; set AGENTRUN_STORE=memory only for explicit self-test/dev mode", { httpStatus: 503, details: { adapter: "postgres", databaseUrl: "missing", memoryFallback: "disabled" } }); } export class MemoryAgentRunStore implements AgentRunStore { @@ -25,6 +51,10 @@ export class MemoryAgentRunStore implements AgentRunStore { private readonly eventsByRun = new Map(); private readonly runners = new Map(); + health(): StoreHealth { + return { adapter: "memory-self-test", ready: true, reachable: true, migrationReady: true, migrationId: "memory-self-test", failureKind: null, message: null, credentialValuesPrinted: false }; + } + createRun(input: CreateRunInput): RunRecord { const at = nowIso(); const run: RunRecord = { ...input, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null }; @@ -130,7 +160,7 @@ export class MemoryAgentRunStore implements AgentRunStore { } } -function statusFromTerminal(terminalStatus: TerminalStatus): RunRecord["status"] { +export function statusFromTerminal(terminalStatus: TerminalStatus): RunRecord["status"] { if (terminalStatus === "completed") return "completed"; if (terminalStatus === "cancelled") return "cancelled"; if (terminalStatus === "blocked") return "blocked"; diff --git a/src/runner/k8s-job.ts b/src/runner/k8s-job.ts new file mode 100644 index 0000000..6ce5225 --- /dev/null +++ b/src/runner/k8s-job.ts @@ -0,0 +1,191 @@ +import { stableHash } from "../common/validation.js"; +import type { BackendProfile, ExecutionPolicy, JsonRecord, JsonValue, RunRecord, SecretRef } from "../common/types.js"; + +export interface RunnerJobRenderOptions { + run: RunRecord; + commandId: string; + managerUrl: string; + image: string; + namespace?: string; + attemptId?: string; + runnerId?: string; + sourceCommit?: string; + serviceAccountName?: string; + imagePullPolicy?: string; + backoffLimit?: number; + ttlSecondsAfterFinished?: number; +} + +interface CredentialProjection { + profile: BackendProfile | string; + secretRef: SecretRef; + volumeName: string; + mountPath: string; +} + +export function renderRunnerJobDryRun(options: RunnerJobRenderOptions): JsonRecord { + const render = renderRunnerJobManifest(options); + return { + dryRun: true, + mutation: false, + action: "render-kubernetes-job", + jobIdentity: { + kind: "Job", + namespace: render.namespace, + name: render.jobName, + serviceAccountName: render.serviceAccountName, + }, + runner: { + runId: options.run.id, + commandId: options.commandId, + attemptId: render.attemptId, + runnerId: render.runnerId, + backendProfile: options.run.backendProfile, + managerUrl: options.managerUrl, + sourceCommit: render.sourceCommit, + }, + secretRefs: render.secretRefs.map((item) => ({ profile: item.profile, name: item.secretRef.name, namespace: item.secretRef.namespace ?? render.namespace, keys: item.secretRef.keys ?? [], mountPath: item.mountPath, valuesPrinted: false })), + pollCommands: { + run: `bun scripts/agentrun-cli.ts runs show ${options.run.id} --manager-url ${options.managerUrl}`, + events: `bun scripts/agentrun-cli.ts runs events ${options.run.id} --manager-url ${options.managerUrl} --after-seq 0 --limit 100`, + }, + warnings: render.warnings, + manifest: render.manifest, + }; +} + +export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { manifest: JsonRecord; namespace: string; jobName: string; runnerId: string; attemptId: string; sourceCommit: string; serviceAccountName: string; secretRefs: CredentialProjection[]; warnings: string[] } { + const namespace = options.namespace ?? "agentrun-v01"; + const attemptId = options.attemptId ?? `attempt_${Date.now().toString(36)}`; + const runnerId = options.runnerId ?? `runner_${shortHash(`${options.run.id}:${attemptId}:${options.commandId}`)}`; + const sourceCommit = options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown"; + const serviceAccountName = options.serviceAccountName ?? "agentrun-v01-runner"; + const jobName = `agentrun-v01-runner-${shortDnsHash(options.run.id, attemptId)}`; + const secretRefs = credentialProjections(options.run, namespace); + const warnings: string[] = []; + if (secretRefs.length === 0) warnings.push("run executionPolicy.secretScope 未声明 provider SecretRef;runner 将按 secret-unavailable 上报,而不会降级直连外部凭据"); + const env = runnerEnv(options, { namespace, jobName, runnerId, attemptId, sourceCommit, secretRefs }); + const manifest: JsonRecord = { + apiVersion: "batch/v1", + kind: "Job", + metadata: { + name: jobName, + namespace, + labels: labels(options.run, jobName), + annotations: { + "agentrun.pikastech.local/run-id": options.run.id, + "agentrun.pikastech.local/command-id": options.commandId, + "agentrun.pikastech.local/dry-run-render": "true", + }, + }, + spec: { + backoffLimit: options.backoffLimit ?? 0, + ttlSecondsAfterFinished: options.ttlSecondsAfterFinished ?? 86_400, + template: { + metadata: { + labels: labels(options.run, jobName), + annotations: { + "agentrun.pikastech.local/run-id": options.run.id, + "agentrun.pikastech.local/command-id": options.commandId, + }, + }, + spec: { + serviceAccountName, + automountServiceAccountToken: false, + restartPolicy: "Never", + containers: [ + { + name: "runner", + image: options.image, + imagePullPolicy: options.imagePullPolicy ?? "IfNotPresent", + command: ["bun", "src/runner/main.ts"], + env, + volumeMounts: secretRefs.map((item) => ({ name: item.volumeName, mountPath: item.mountPath, readOnly: true })), + resources: { + requests: { cpu: "250m", memory: "512Mi" }, + limits: { cpu: "2", memory: "4Gi" }, + }, + securityContext: { + allowPrivilegeEscalation: false, + readOnlyRootFilesystem: false, + capabilities: { drop: ["ALL"] }, + }, + }, + ], + volumes: secretRefs.map(secretVolume), + }, + }, + }, + }; + return { manifest, namespace, jobName, runnerId, attemptId, sourceCommit, serviceAccountName, secretRefs, warnings }; +} + +function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string; jobName: string; runnerId: string; attemptId: string; sourceCommit: string; secretRefs: CredentialProjection[] }): JsonRecord[] { + const codexMount = context.secretRefs.find((item) => item.profile === "codex")?.mountPath ?? "/home/agentrun/.codex"; + return [ + { name: "AGENTRUN_MGR_URL", value: options.managerUrl }, + { name: "AGENTRUN_RUN_ID", value: options.run.id }, + { name: "AGENTRUN_COMMAND_ID", value: options.commandId }, + { name: "AGENTRUN_ATTEMPT_ID", value: context.attemptId }, + { name: "AGENTRUN_RUNNER_ID", value: context.runnerId }, + { name: "AGENTRUN_BACKEND_PROFILE", value: options.run.backendProfile }, + { name: "AGENTRUN_EXECUTION_POLICY_JSON", value: JSON.stringify(options.run.executionPolicy) }, + { name: "AGENTRUN_SOURCE_COMMIT", value: context.sourceCommit }, + { name: "AGENTRUN_RUNTIME_NAMESPACE", value: context.namespace }, + { name: "AGENTRUN_K8S_JOB_NAME", value: context.jobName }, + { name: "AGENTRUN_LOG_PATH", value: "/tmp/agentrun-runner.jsonl" }, + { name: "HOME", value: "/home/agentrun" }, + { name: "CODEX_HOME", value: codexMount }, + ]; +} + +function credentialProjections(run: RunRecord, namespace: string): CredentialProjection[] { + const policy: ExecutionPolicy = run.executionPolicy; + const credentials = policy.secretScope.providerCredentials ?? []; + return credentials.map((item, index) => ({ + profile: item.profile, + secretRef: item.secretRef.namespace ? item.secretRef : { ...item.secretRef, namespace }, + volumeName: sanitizeVolumeName(`${String(item.profile)}-${index}`), + mountPath: normalizeMountPath(item.secretRef.mountPath), + })); +} + +function secretVolume(item: CredentialProjection): JsonRecord { + const secret: JsonRecord = { + secretName: item.secretRef.name, + defaultMode: 256, + }; + const keys = item.secretRef.keys ?? []; + if (keys.length > 0) secret.items = keys.map((key) => ({ key, path: key, mode: 256 })); + return { name: item.volumeName, secret }; +} + +function normalizeMountPath(value: string | undefined): string { + if (!value || value === "~/.codex") return "/home/agentrun/.codex"; + if (value.startsWith("~/")) return `/home/agentrun/${value.slice(2)}`; + return value; +} + +function labels(run: RunRecord, jobName: string): JsonRecord { + return { + "app.kubernetes.io/name": "agentrun-runner", + "app.kubernetes.io/component": "runner", + "app.kubernetes.io/part-of": "agentrun", + "agentrun.pikastech.local/lane": "v0.1", + "agentrun.pikastech.local/run-hash": shortHash(run.id), + "job-name": jobName, + }; +} + +function shortDnsHash(...parts: string[]): string { + return shortHash(parts.join(":")); +} + +function shortHash(value: JsonValue): string { + return stableHash(value).slice(0, 12); +} + +function sanitizeVolumeName(value: string): string { + const sanitized = value.toLowerCase().replace(/[^a-z0-9-]+/gu, "-").replace(/^-+|-+$/gu, ""); + return sanitized.length > 0 ? sanitized.slice(0, 40) : "credential"; +} diff --git a/src/runner/main.ts b/src/runner/main.ts index e2eecda..722e2d4 100644 --- a/src/runner/main.ts +++ b/src/runner/main.ts @@ -1,4 +1,6 @@ import { runOnce, type RunnerOnceOptions } from "./run-once.js"; +import { AgentRunError, errorToJson } from "../common/errors.js"; +import { failureKindFromError } from "./manager-api.js"; const managerUrl = process.env.AGENTRUN_MGR_URL; const runId = process.env.AGENTRUN_RUN_ID; @@ -11,9 +13,23 @@ const options: RunnerOnceOptions = { managerUrl, runId, }; +if (process.env.AGENTRUN_COMMAND_ID) options.commandId = process.env.AGENTRUN_COMMAND_ID; +if (process.env.AGENTRUN_ATTEMPT_ID) options.attemptId = process.env.AGENTRUN_ATTEMPT_ID; if (process.env.AGENTRUN_RUNNER_ID) options.runnerId = process.env.AGENTRUN_RUNNER_ID; +if (process.env.AGENTRUN_BACKEND_PROFILE === "codex") options.backendProfile = "codex"; +if (process.env.AGENTRUN_K8S_JOB_NAME) options.placement = "kubernetes-job"; +if (process.env.AGENTRUN_SOURCE_COMMIT) options.sourceCommit = process.env.AGENTRUN_SOURCE_COMMIT; +if (process.env.AGENTRUN_K8S_JOB_NAME) options.jobName = process.env.AGENTRUN_K8S_JOB_NAME; +if (process.env.HOSTNAME) options.podName = process.env.HOSTNAME; +if (process.env.AGENTRUN_LOG_PATH) options.logPath = process.env.AGENTRUN_LOG_PATH; if (process.env.AGENTRUN_CODEX_COMMAND) options.codexCommand = process.env.AGENTRUN_CODEX_COMMAND; if (process.env.AGENTRUN_CODEX_ARGS) options.codexArgs = JSON.parse(process.env.AGENTRUN_CODEX_ARGS) as string[]; if (process.env.CODEX_HOME) options.codexHome = process.env.CODEX_HOME; -const result = await runOnce(options); -console.log(JSON.stringify({ ok: true, data: result })); +try { + const result = await runOnce(options); + console.log(JSON.stringify({ ok: true, data: result })); +} catch (error) { + const failureKind = failureKindFromError(error); + console.log(JSON.stringify({ ok: false, failureKind, message: error instanceof Error ? error.message : String(error), error: errorToJson(error) })); + process.exit(error instanceof AgentRunError && error.httpStatus >= 1 && error.httpStatus <= 255 ? error.httpStatus : 1); +} diff --git a/src/runner/manager-api.ts b/src/runner/manager-api.ts new file mode 100644 index 0000000..c707b55 --- /dev/null +++ b/src/runner/manager-api.ts @@ -0,0 +1,108 @@ +import { ManagerClient } from "../mgr/client.js"; +import { AgentRunError } from "../common/errors.js"; +import type { BackendEvent, BackendProfile, CommandRecord, FailureKind, JsonRecord, RunRecord, RunnerRecord, TerminalStatus } from "../common/types.js"; + +export interface RunnerRegistrationInput { + runId: string; + attemptId: string; + backendProfile: BackendProfile; + placement: "host-process" | "kubernetes-job"; + sourceCommit: string; + runnerId?: string; + jobName?: string; + podName?: string; + logPath?: string; +} + +export interface PollCommandsResult { + items: CommandRecord[]; + selected: CommandRecord | null; +} + +export interface RunnerFailureReport { + terminalStatus: TerminalStatus; + failureKind: FailureKind; + failureMessage: string; +} + +export class RunnerManagerApi { + readonly client: ManagerClient; + + constructor(readonly managerUrl: string) { + this.client = new ManagerClient(managerUrl); + } + + async register(input: RunnerRegistrationInput): Promise { + const body: JsonRecord = { + runId: input.runId, + attemptId: input.attemptId, + backendProfile: input.backendProfile, + placement: input.placement, + sourceCommit: input.sourceCommit, + }; + if (input.runnerId) body.id = input.runnerId; + if (input.jobName) body.jobName = input.jobName; + if (input.podName) body.podName = input.podName; + if (input.logPath) body.logPath = input.logPath; + return await this.client.post("/api/v1/runners/register", body) as RunnerRecord; + } + + async claim(runId: string, runnerId: string, leaseMs: number): Promise { + return await this.client.post(`/api/v1/runs/${encodeURIComponent(runId)}/claim`, { runnerId, leaseMs }) as RunRecord; + } + + async heartbeat(runId: string, runnerId: string, leaseMs: number): Promise { + return await this.client.patch(`/api/v1/runs/${encodeURIComponent(runId)}/lease`, { runnerId, leaseMs }) as RunRecord; + } + + async pollCommands(runId: string, options: { afterSeq?: number; limit?: number; commandId?: string }): Promise { + const afterSeq = options.afterSeq ?? 0; + const limit = options.limit ?? 20; + const response = await this.client.get(`/api/v1/runs/${encodeURIComponent(runId)}/commands?afterSeq=${afterSeq}&limit=${limit}`) as { items?: CommandRecord[] }; + const items = Array.isArray(response.items) ? response.items : []; + const selected = options.commandId ? items.find((item) => item.id === options.commandId && item.state === "pending" && item.type === "turn") ?? null : items.find((item) => item.state === "pending" && item.type === "turn") ?? null; + return { items, selected }; + } + + async ackCommand(commandId: string): Promise { + return await this.client.post(`/api/v1/commands/${encodeURIComponent(commandId)}/ack`, {}) as CommandRecord; + } + + async appendEvent(runId: string, event: BackendEvent): Promise { + return await this.client.post(`/api/v1/runs/${encodeURIComponent(runId)}/events`, event as unknown as JsonRecord) as JsonRecord; + } + + async reportStatus(runId: string, report: { terminalStatus: TerminalStatus; failureKind: FailureKind | null; failureMessage: string | null }): Promise { + return await this.client.patch(`/api/v1/runs/${encodeURIComponent(runId)}/status`, report as unknown as JsonRecord) as RunRecord; + } + + async reportFailure(runId: string, report: RunnerFailureReport): Promise<{ reported: boolean; run: RunRecord | null; reportError: string | null }> { + try { + await this.appendEvent(runId, { type: "error", payload: { failureKind: report.failureKind, message: report.failureMessage, source: "agentrun-runner" } }); + const run = await this.reportStatus(runId, report); + return { reported: true, run, reportError: null }; + } catch (error) { + return { reported: false, run: null, reportError: errorMessage(error) }; + } + } +} + +export function failureKindFromError(error: unknown): FailureKind { + if (error instanceof AgentRunError) return error.failureKind; + const message = errorMessage(error).toLowerCase(); + if (message.includes("auth") || message.includes("unauthorized") || message.includes("forbidden")) return "provider-auth-failed"; + if (message.includes("timeout")) return "backend-timeout"; + if (message.includes("lease") || message.includes("claim")) return "runner-lease-conflict"; + return "infra-failed"; +} + +export function terminalStatusForFailure(failureKind: FailureKind): TerminalStatus { + if (failureKind === "cancelled") return "cancelled"; + if (failureKind === "secret-unavailable" || failureKind === "tenant-policy-denied" || failureKind === "schema-invalid") return "blocked"; + return "failed"; +} + +export function errorMessage(error: unknown): string { + if (error instanceof Error) return error.message; + return String(error); +} diff --git a/src/runner/run-once.ts b/src/runner/run-once.ts index c383808..c1cdf72 100644 --- a/src/runner/run-once.ts +++ b/src/runner/run-once.ts @@ -1,35 +1,57 @@ -import { ManagerClient } from "../mgr/client.js"; +import { RunnerManagerApi, failureKindFromError, terminalStatusForFailure, errorMessage } from "./manager-api.js"; import { runBackendTurn, type BackendAdapterOptions } from "../backend/adapter.js"; -import type { CommandRecord, JsonRecord, RunRecord, RunnerRecord } from "../common/types.js"; +import type { BackendProfile, JsonRecord, RunRecord, RunnerRecord } from "../common/types.js"; export interface RunnerOnceOptions extends BackendAdapterOptions { managerUrl: string; runId: string; + commandId?: string; runnerId?: string; attemptId?: string; leaseMs?: number; + backendProfile?: BackendProfile; + placement?: "host-process" | "kubernetes-job"; + sourceCommit?: string; + jobName?: string; + podName?: string; + logPath?: string; } export async function runOnce(options: RunnerOnceOptions): Promise { - const client = new ManagerClient(options.managerUrl); - const runner = await client.post("/api/v1/runners/register", { - id: options.runnerId ?? undefined, + const api = new RunnerManagerApi(options.managerUrl); + const leaseMs = options.leaseMs ?? 60_000; + const attemptId = options.attemptId ?? `attempt_${Date.now().toString(36)}`; + const runner = await api.register({ runId: options.runId, - attemptId: options.attemptId ?? `attempt_${Date.now().toString(36)}`, - backendProfile: "codex", - placement: "host-process", - sourceCommit: process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown", - } as JsonRecord) as unknown as RunnerRecord; - const claimed = await client.post(`/api/v1/runs/${options.runId}/claim`, { runnerId: runner.id, leaseMs: options.leaseMs ?? 60_000 }) as unknown as RunRecord; - const commandsResponse = await client.get(`/api/v1/runs/${options.runId}/commands?afterSeq=0&limit=20`) as { items?: CommandRecord[] }; - const command = commandsResponse.items?.find((item) => item.state === "pending" && item.type === "turn"); - if (!command) { - await client.patch(`/api/v1/runs/${options.runId}/status`, { terminalStatus: "blocked", failureKind: "schema-invalid", failureMessage: "no pending turn command" }); - return { runner, claimed, terminalStatus: "blocked", failureKind: "schema-invalid" }; + attemptId, + backendProfile: options.backendProfile ?? "codex", + placement: options.placement ?? "host-process", + sourceCommit: options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown", + ...(options.runnerId ? { runnerId: options.runnerId } : {}), + ...(options.jobName ? { jobName: options.jobName } : {}), + ...(options.podName ? { podName: options.podName } : {}), + ...(options.logPath ? { logPath: options.logPath } : {}), + }) as RunnerRecord; + let claimed: RunRecord; + try { + claimed = await api.claim(options.runId, runner.id, leaseMs); + await api.heartbeat(options.runId, runner.id, leaseMs); + } catch (error) { + const failureKind = failureKindFromError(error); + if (failureKind !== "runner-lease-conflict") { + await api.reportFailure(options.runId, { terminalStatus: terminalStatusForFailure(failureKind), failureKind, failureMessage: errorMessage(error) }); + } + throw error; } - await client.post(`/api/v1/commands/${command.id}/ack`, {}); + const commandsResponse = await api.pollCommands(options.runId, { afterSeq: 0, limit: 20, ...(options.commandId ? { commandId: options.commandId } : {}) }); + const command = commandsResponse.selected; + if (!command) { + await api.reportStatus(options.runId, { terminalStatus: "blocked", failureKind: "schema-invalid", failureMessage: "no pending turn command" }); + return { runner, claimed, terminalStatus: "blocked", failureKind: "schema-invalid", polledCommands: commandsResponse.items.length }; + } + await api.ackCommand(command.id); const result = await runBackendTurn(claimed, command, options); - for (const event of result.events) await client.post(`/api/v1/runs/${options.runId}/events`, event as unknown as JsonRecord); - const finalRun = await client.patch(`/api/v1/runs/${options.runId}/status`, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage }) as unknown as RunRecord; + for (const event of result.events) await api.appendEvent(options.runId, event); + const finalRun = await api.reportStatus(options.runId, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage }) as RunRecord; return { runner, commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind, run: finalRun } as JsonRecord; } diff --git a/src/selftest/cases/00-redaction-postgres.ts b/src/selftest/cases/00-redaction-postgres.ts new file mode 100644 index 0000000..c87bc62 --- /dev/null +++ b/src/selftest/cases/00-redaction-postgres.ts @@ -0,0 +1,24 @@ +import assert from "node:assert/strict"; +import { openAgentRunStoreFromEnv } from "../../mgr/store.js"; +import { postgresMigrationContract } from "../../mgr/postgres-store.js"; +import { redactText } from "../../common/redaction.js"; +import { AgentRunError } from "../../common/errors.js"; +import type { SelfTestCase } from "../harness.js"; + +const selfTest: SelfTestCase = async () => { + assert.equal(redactText("Authorization: Bearer abc123"), "Authorization: Bearer REDACTED"); + assert.equal(redactText('{"token":"test-token-material"}').includes("test-token-material"), false); + await assert.rejects( + () => openAgentRunStoreFromEnv({}), + (error) => error instanceof AgentRunError && error.failureKind === "infra-failed" && error.message.includes("DATABASE_URL is required"), + ); + const postgresContract = postgresMigrationContract(); + assert.equal(postgresContract.latestMigrationId, "001_v01_initial_durable_store"); + assert.ok(Array.isArray(postgresContract.requiredTables)); + assert.ok(postgresContract.requiredTables.includes("agentrun_schema_migrations")); + assert.ok(postgresContract.requiredTables.includes("agentrun_runs")); + assert.ok(postgresContract.requiredTables.includes("agentrun_events")); + return { name: "redaction-postgres", tests: ["redaction", "postgres-store-contract"] }; +}; + +export default selfTest; diff --git a/src/selftest/cases/10-manager-memory.ts b/src/selftest/cases/10-manager-memory.ts new file mode 100644 index 0000000..935640b --- /dev/null +++ b/src/selftest/cases/10-manager-memory.ts @@ -0,0 +1,23 @@ +import assert from "node:assert/strict"; +import { startManagerServer } from "../../mgr/server.js"; +import { MemoryAgentRunStore } from "../../mgr/store.js"; +import { ManagerClient } from "../../mgr/client.js"; +import type { SelfTestCase } from "../harness.js"; + +const selfTest: SelfTestCase = async () => { + const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test", store: new MemoryAgentRunStore() }); + try { + const client = new ManagerClient(server.baseUrl); + const health = await client.get("/health/readiness") as { database?: { adapter?: string; reachable?: boolean; migrationReady?: boolean; failureKind?: string | null }; secretRefs?: { valuesPrinted?: boolean } }; + assert.equal(health.database?.adapter, "memory-self-test"); + assert.equal(health.database?.reachable, true); + assert.equal(health.database?.migrationReady, true); + assert.equal(health.database?.failureKind, null); + assert.equal(health.secretRefs?.valuesPrinted, false); + return { name: "manager-memory", tests: ["manager-memory-lifecycle"] }; + } finally { + await new Promise((resolve) => server.server.close(() => resolve())); + } +}; + +export default selfTest; diff --git a/src/selftest/cases/20-runner-k8s-job.ts b/src/selftest/cases/20-runner-k8s-job.ts new file mode 100644 index 0000000..7c067f7 --- /dev/null +++ b/src/selftest/cases/20-runner-k8s-job.ts @@ -0,0 +1,32 @@ +import assert from "node:assert/strict"; +import { startManagerServer } from "../../mgr/server.js"; +import { MemoryAgentRunStore } from "../../mgr/store.js"; +import { ManagerClient } from "../../mgr/client.js"; +import { renderRunnerJobDryRun } from "../../runner/k8s-job.js"; +import type { RunRecord } from "../../common/types.js"; +import { assertNoSecretLeak, createRunWithCommand, type SelfTestCase } from "../harness.js"; + +const selfTest: SelfTestCase = async (context) => { + const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test", store: new MemoryAgentRunStore() }); + try { + const client = new ManagerClient(server.baseUrl); + const item = await createRunWithCommand(client, context, "job smoke", "selftest-job-render", 15_000); + const rendered = renderRunnerJobDryRun({ + run: await client.get(`/api/v1/runs/${item.runId}`) as RunRecord, + commandId: item.commandId, + managerUrl: server.baseUrl, + image: "127.0.0.1:5000/agentrun/agentrun-mgr@sha256:1111111111111111111111111111111111111111111111111111111111111111", + attemptId: "attempt_selftest", + sourceCommit: "self-test", + }); + assert.equal(rendered.dryRun, true); + assert.equal(rendered.mutation, false); + assert.equal((rendered.jobIdentity as { serviceAccountName?: string }).serviceAccountName, "agentrun-v01-runner"); + assertNoSecretLeak(rendered); + return { name: "runner-k8s-job", tests: ["runner-k8s-job-dry-run"] }; + } finally { + await new Promise((resolve) => server.server.close(() => resolve())); + } +}; + +export default selfTest; diff --git a/src/selftest/cases/30-codex-stdio.ts b/src/selftest/cases/30-codex-stdio.ts new file mode 100644 index 0000000..299937c --- /dev/null +++ b/src/selftest/cases/30-codex-stdio.ts @@ -0,0 +1,71 @@ +import assert from "node:assert/strict"; +import path from "node:path"; +import os from "node:os"; +import { startManagerServer } from "../../mgr/server.js"; +import { MemoryAgentRunStore } from "../../mgr/store.js"; +import { ManagerClient } from "../../mgr/client.js"; +import { runOnce } from "../../runner/run-once.js"; +import type { FailureKind, JsonRecord, TerminalStatus } from "../../common/types.js"; +import { assertNoSecretLeak, createRunWithCommand, type SelfTestCase, type SelfTestContext } from "../harness.js"; + +const selfTest: SelfTestCase = async (context) => { + const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test", store: new MemoryAgentRunStore() }); + try { + const client = new ManagerClient(server.baseUrl); + const happy = await createRunWithCommand(client, context, "hello", "selftest-turn", 15_000); + const result = await runOnce({ managerUrl: server.baseUrl, runId: happy.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome } }); + assert.equal(result.terminalStatus, "completed"); + assert.equal(typeof (result.runner as { id?: unknown }).id, "string"); + const events = await client.get(`/api/v1/runs/${happy.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; + assert.ok(events.items?.some((event) => event.type === "assistant_message")); + assert.ok(events.items?.some((event) => event.type === "backend_status" && JSON.stringify(event.payload).includes("run-claimed"))); + assertNoSecretLeak(events); + const finalRun = await client.get(`/api/v1/runs/${happy.runId}`) as { terminalStatus?: string }; + assert.equal(finalRun.terminalStatus, "completed"); + + await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "missing-turn-result", expectedStatus: "failed", expectedFailureKind: "backend-response-invalid" }); + await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "invalid-json", expectedStatus: "failed", expectedFailureKind: "backend-json-parse-error" }); + await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "missing-terminal", expectedStatus: "failed", expectedFailureKind: "backend-timeout", timeoutMs: 500 }); + await runSpawnFailureCase({ client, managerUrl: server.baseUrl, context }); + + return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-missing-turn-result", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-spawn-failure"] }; + } finally { + await new Promise((resolve) => server.server.close(() => resolve())); + } +}; + +async function runFailureCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext; mode: string; expectedStatus: TerminalStatus; expectedFailureKind: FailureKind; timeoutMs?: number }): Promise { + const item = await createRunWithCommand(options.client, options.context, `failure ${options.mode}`, `selftest-${options.mode}`, options.timeoutMs ?? 3_000); + const result = await runOnce({ + managerUrl: options.managerUrl, + runId: item.runId, + codexCommand: options.context.fakeCodexCommand, + codexArgs: options.context.fakeCodexArgs, + codexHome: options.context.codexHome, + env: { CODEX_HOME: options.context.codexHome, AGENTRUN_FAKE_CODEX_MODE: options.mode }, + }) as JsonRecord; + assert.equal(result.terminalStatus, options.expectedStatus, options.mode); + assert.equal(result.failureKind, options.expectedFailureKind, options.mode); + const events = await options.client.get(`/api/v1/runs/${item.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; + assert.ok(events.items?.some((event) => event.type === "error"), options.mode); + assertNoSecretLeak(events); +} + +async function runSpawnFailureCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise { + const item = await createRunWithCommand(options.client, options.context, "failure spawn", "selftest-spawn-failure", 3_000); + const result = await runOnce({ + managerUrl: options.managerUrl, + runId: item.runId, + codexCommand: path.join(os.tmpdir(), `agentrun-missing-codex-${process.pid}`), + codexArgs: [], + codexHome: options.context.codexHome, + env: { CODEX_HOME: options.context.codexHome }, + }) as JsonRecord; + assert.equal(result.terminalStatus, "failed", "spawn failure"); + assert.equal(result.failureKind, "backend-spawn-failed", "spawn failure"); + const events = await options.client.get(`/api/v1/runs/${item.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; + assert.ok(events.items?.some((event) => event.type === "error"), "spawn failure"); + assertNoSecretLeak(events); +} + +export default selfTest; diff --git a/src/selftest/cases/40-secret-render.ts b/src/selftest/cases/40-secret-render.ts new file mode 100644 index 0000000..dee1f75 --- /dev/null +++ b/src/selftest/cases/40-secret-render.ts @@ -0,0 +1,48 @@ +import { mkdir, writeFile } from "node:fs/promises"; +import path from "node:path"; +import assert from "node:assert/strict"; +import { renderCodexProviderSecretPlan } from "../../../scripts/src/secret-render.js"; +import { AgentRunError } from "../../common/errors.js"; +import type { SelfTestCase } from "../harness.js"; + +const selfTest: SelfTestCase = async (context) => { + const secretPlan = await renderCodexProviderSecretPlan({ codexHome: context.codexHome, dryRun: true }); + assert.equal(secretPlan.namespace, "agentrun-v01"); + assert.equal(secretPlan.secretName, "agentrun-v01-provider-codex"); + assert.deepEqual(secretPlan.keys, ["auth.json", "config.toml"]); + assert.equal(secretPlan.writeAttempted, false); + assert.equal(secretPlan.totalBytes, Buffer.byteLength(JSON.stringify({ token: "test-token-material" }), "utf8") + Buffer.byteLength("model = \"gpt-test\"\n", "utf8")); + assert.match(String(secretPlan.sha256), /^[a-f0-9]{64}$/u); + const renderedSecretJson = JSON.stringify(secretPlan); + assert.equal(renderedSecretJson.includes("test-token-material"), false); + assert.equal(renderedSecretJson.includes("gpt-test"), false); + assert.equal(renderedSecretJson.includes("model ="), false); + + await assert.rejects( + () => renderCodexProviderSecretPlan({ codexHome: path.join(context.tmp, "missing-codex-home"), dryRun: true }), + (error) => error instanceof AgentRunError && error.failureKind === "secret-unavailable", + ); + const invalidCodexHome = path.join(context.tmp, "invalid-codex-home"); + await mkdir(invalidCodexHome, { recursive: true }); + await writeFile(path.join(invalidCodexHome, "auth.json"), "not-json"); + await writeFile(path.join(invalidCodexHome, "config.toml"), "model = \"gpt-test\"\n"); + await assert.rejects( + () => renderCodexProviderSecretPlan({ codexHome: invalidCodexHome, dryRun: true }), + (error) => error instanceof AgentRunError && error.failureKind === "schema-invalid", + ); + await writeFile(path.join(invalidCodexHome, "auth.json"), JSON.stringify({ token: "" })); + await assert.rejects( + () => renderCodexProviderSecretPlan({ codexHome: invalidCodexHome, dryRun: true }), + (error) => error instanceof AgentRunError && error.failureKind === "secret-unavailable", + ); + await writeFile(path.join(invalidCodexHome, "auth.json"), JSON.stringify({ token: "test-token-material" })); + await writeFile(path.join(invalidCodexHome, "config.toml"), "model ="); + await assert.rejects( + () => renderCodexProviderSecretPlan({ codexHome: invalidCodexHome, dryRun: true }), + (error) => error instanceof AgentRunError && error.failureKind === "schema-invalid", + ); + + return { name: "secret-render", tests: ["codex-secret-dry-run"] }; +}; + +export default selfTest; diff --git a/src/selftest/fake-codex-app-server.ts b/src/selftest/fake-codex-app-server.ts index 316a9b7..2f14559 100644 --- a/src/selftest/fake-codex-app-server.ts +++ b/src/selftest/fake-codex-app-server.ts @@ -1,6 +1,7 @@ import * as readline from "node:readline"; const rl = readline.createInterface({ input: process.stdin, crlfDelay: Infinity }); +const mode = process.env.AGENTRUN_FAKE_CODEX_MODE ?? "success"; let threadCounter = 0; let turnCounter = 0; @@ -9,6 +10,10 @@ for await (const line of rl) { if (trimmed.length === 0) continue; const message = JSON.parse(trimmed) as { id?: number; method?: string; params?: Record }; if (message.method === "initialize") { + if (mode === "invalid-json") { + process.stdout.write('{"token":"test-token-material"\n'); + process.exit(0); + } respond(message.id, { serverInfo: { name: "fake-codex-app-server", version: "self-test" } }); continue; } @@ -26,6 +31,17 @@ for await (const line of rl) { continue; } if (message.method === "turn/start") { + if (mode === "missing-turn-result") { + respond(message.id, {}); + continue; + } + if (mode === "missing-terminal") { + turnCounter += 1; + const turn = { id: `turn_selftest_${turnCounter}`, status: "running" }; + notify("turn/started", { turn }); + respond(message.id, { turn }); + continue; + } turnCounter += 1; const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" }; notify("turn/started", { turn }); diff --git a/src/selftest/harness.ts b/src/selftest/harness.ts new file mode 100644 index 0000000..40ed9a0 --- /dev/null +++ b/src/selftest/harness.ts @@ -0,0 +1,82 @@ +import { mkdtemp, mkdir, writeFile, rm } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import assert from "node:assert/strict"; +import { ManagerClient } from "../mgr/client.js"; +import type { JsonRecord } from "../common/types.js"; + +export interface SelfTestContext { + root: string; + tmp: string; + codexHome: string; + workspace: string; + fakeCodexPath: string; + fakeCodexCommand: string; + fakeCodexArgs: string[]; + cleanup(): Promise; +} + +export interface SelfTestResult { + name?: string; + tests?: string[]; +} + +export type SelfTestCase = (context: SelfTestContext) => Promise | SelfTestResult; + +export async function createSelfTestContext(root: string): Promise { + const tmp = await mkdtemp(path.join(os.tmpdir(), "agentrun-selftest-")); + const codexHome = path.join(tmp, "codex-home"); + const workspace = path.join(tmp, "workspace"); + await mkdir(codexHome, { recursive: true }); + await mkdir(workspace, { recursive: true }); + await writeFile(path.join(codexHome, "auth.json"), JSON.stringify({ token: "test-token-material" })); + await writeFile(path.join(codexHome, "config.toml"), "model = \"gpt-test\"\n"); + await writeFile(path.join(workspace, "README.md"), "self-test workspace\n"); + const fakeCodexPath = path.join(root, "src/selftest/fake-codex-app-server.ts"); + return { + root, + tmp, + codexHome, + workspace, + fakeCodexPath, + fakeCodexCommand: process.env.AGENTRUN_SELFTEST_CODEX_COMMAND ?? defaultFakeCommand(), + fakeCodexArgs: process.env.AGENTRUN_SELFTEST_CODEX_ARGS ? JSON.parse(process.env.AGENTRUN_SELFTEST_CODEX_ARGS) as string[] : defaultFakeArgs(fakeCodexPath), + cleanup: () => rm(tmp, { recursive: true, force: true }), + }; +} + +export async function createRunWithCommand(client: ManagerClient, context: Pick, prompt: string, idempotencyKey: string, timeoutMs: number): Promise<{ runId: string; commandId: string }> { + const run = await client.post("/api/v1/runs", { + tenantId: "unidesk", + projectId: "pikasTech/unidesk", + workspaceRef: { kind: "host-path", path: context.workspace }, + providerId: "G14", + backendProfile: "codex", + executionPolicy: { + sandbox: "workspace-write", + approval: "never", + timeoutMs, + network: "default", + secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: context.codexHome } }] }, + }, + traceSink: null, + }) as { id: string }; + const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt }, idempotencyKey }) as { id: string }; + const duplicate = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt }, idempotencyKey }) as { id: string }; + assert.equal(duplicate.id, command.id); + return { runId: run.id, commandId: command.id }; +} + +export function assertNoSecretLeak(value: unknown): void { + const text = JSON.stringify(value); + assert.equal(text.includes("test-token-material"), false); + assert.equal(text.includes("Bearer test-token"), false); +} + +function defaultFakeCommand(): string { + return process.versions.bun ? process.execPath : "npx"; +} + +function defaultFakeArgs(fakePath: string): string[] { + return process.versions.bun ? [fakePath] : ["tsx", fakePath]; +} diff --git a/src/selftest/run.ts b/src/selftest/run.ts index 655fd90..75ee331 100644 --- a/src/selftest/run.ts +++ b/src/selftest/run.ts @@ -1,65 +1,28 @@ -import { mkdtemp, mkdir, writeFile, rm } from "node:fs/promises"; -import os from "node:os"; +import { readdir } from "node:fs/promises"; import path from "node:path"; -import { fileURLToPath } from "node:url"; -import assert from "node:assert/strict"; -import { startManagerServer } from "../mgr/server.js"; -import { ManagerClient } from "../mgr/client.js"; -import { runOnce } from "../runner/run-once.js"; -import { redactText } from "../common/redaction.js"; +import { pathToFileURL } from "node:url"; +import { createSelfTestContext, type SelfTestCase, type SelfTestResult } from "./harness.js"; -const root = path.resolve(path.dirname(fileURLToPath(import.meta.url)), "../.."); -const tmp = await mkdtemp(path.join(os.tmpdir(), "agentrun-selftest-")); +const root = path.resolve(import.meta.dirname, "../.."); +const casesDir = path.join(root, "src/selftest/cases"); +const context = await createSelfTestContext(root); try { - const codexHome = path.join(tmp, "codex-home"); - const workspace = path.join(tmp, "workspace"); - await mkdir(codexHome, { recursive: true }); - await mkdir(workspace, { recursive: true }); - await writeFile(path.join(codexHome, "auth.json"), JSON.stringify({ token: "test-token-material" })); - await writeFile(path.join(codexHome, "config.toml"), "model = \"gpt-test\"\n"); - await writeFile(path.join(workspace, "README.md"), "self-test workspace\n"); + const caseFiles = (await readdir(casesDir)) + .filter((file) => file.endsWith(".ts") && !file.endsWith(".d.ts")) + .sort(); + const results: SelfTestResult[] = []; - assert.equal(redactText("Authorization: Bearer abc123"), "Authorization: Bearer REDACTED"); - - const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test" }); - try { - const client = new ManagerClient(server.baseUrl); - const health = await client.get("/health/readiness") as { database?: { adapter?: string } }; - assert.equal(health.database?.adapter, "memory-self-test"); - const run = await client.post("/api/v1/runs", { - tenantId: "unidesk", - projectId: "pikasTech/unidesk", - workspaceRef: { kind: "host-path", path: workspace }, - providerId: "G14", - backendProfile: "codex", - executionPolicy: { - sandbox: "workspace-write", - approval: "never", - timeoutMs: 15_000, - network: "default", - secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: codexHome } }] }, - }, - traceSink: null, - }) as { id: string }; - const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello" }, idempotencyKey: "selftest-turn" }) as { id: string }; - const duplicate = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello" }, idempotencyKey: "selftest-turn" }) as { id: string }; - assert.equal(duplicate.id, command.id); - const fakePath = path.join(root, "src/selftest/fake-codex-app-server.ts"); - const fakeCommand = process.env.AGENTRUN_SELFTEST_CODEX_COMMAND ?? process.execPath; - const fakeArgs = process.env.AGENTRUN_SELFTEST_CODEX_ARGS ? JSON.parse(process.env.AGENTRUN_SELFTEST_CODEX_ARGS) as string[] : [fakePath]; - const result = await runOnce({ managerUrl: server.baseUrl, runId: run.id, codexCommand: fakeCommand, codexArgs: fakeArgs, codexHome, env: { CODEX_HOME: codexHome } }); - assert.equal(result.terminalStatus, "completed"); - const events = await client.get(`/api/v1/runs/${run.id}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; - assert.ok(events.items?.some((event) => event.type === "assistant_message")); - assert.equal(JSON.stringify(events).includes("test-token-material"), false); - assert.equal(JSON.stringify(events).includes("Bearer test-token"), false); - const finalRun = await client.get(`/api/v1/runs/${run.id}`) as { terminalStatus?: string }; - assert.equal(finalRun.terminalStatus, "completed"); - console.log(JSON.stringify({ ok: true, tests: ["manager-memory-lifecycle", "codex-stdio-fake-turn", "redaction"], runId: run.id })); - } finally { - await new Promise((resolve) => server.server.close(() => resolve())); + for (const file of caseFiles) { + const moduleUrl = pathToFileURL(path.join(casesDir, file)).href; + const imported = await import(moduleUrl) as { default?: SelfTestCase; selfTest?: SelfTestCase }; + const selfTest = imported.default ?? imported.selfTest; + if (typeof selfTest !== "function") throw new Error(`self-test case ${file} must export default or selfTest function`); + const result = await selfTest(context); + results.push({ name: result.name ?? file.replace(/\.ts$/u, ""), tests: result.tests ?? [] }); } + + console.log(JSON.stringify({ ok: true, cases: results, tests: results.flatMap((result) => result.tests) })); } finally { - await rm(tmp, { recursive: true, force: true }); + await context.cleanup(); }