Merge pull request #16 from pikasTech/commander-v01-component-integration

Integrate v0.1 component implementations and split self-test boundaries
Lyon
2026-05-29 12:11:35 +08:00
committed by GitHub
33 changed files with 1966 additions and 235 deletions
+1
@@ -5,6 +5,7 @@ ENV NODE_ENV=production
ENV PORT=8080
COPY package.json tsconfig.json ./
RUN bun install --production
COPY scripts ./scripts
COPY src ./src
COPY deploy/deploy.json ./deploy/deploy.json
+3 -3
@@ -108,7 +108,7 @@ POST /api/v1/commands/:commandId/ack
| Spec item | Status | Notes |
| --- | --- | --- |
| `agentrun-mgr` service spec | Defined | This document is the authority for the v0.1 manager. |
| Manager REST API | Not implemented | Requires follow-up code implementation. |
| Manager REST API | Skeleton implemented | HTTP JSON skeletons exist for run, command, event, backends, runner register, claim, lease heartbeat, poll, ack, status, and health/readiness; real deployment acceptance is still pending. |
| Tenant policy boundary | Defined / not implemented | v0.1 covers only the minimal schema/allowlist/secretScope boundary. |
| Postgres durable adapter | Not implemented | See [spec-v01-postgres.md](spec-v01-postgres.md). |
| Observability minimal contract | Defined / not implemented | event, terminal status, failureKind, and redaction require code implementation. |
| Postgres durable adapter | Skeleton implemented | The live runtime uses the Postgres durable store via `DATABASE_URL`; the memory store is only for explicit self-test/dev. See [spec-v01-postgres.md](spec-v01-postgres.md). |
| Observability minimal contract | Skeleton implemented | Append-only events, terminal status, failureKind, health/readiness store status, runner claim/lease/backend events, and Secret/DSN redaction are now in the manager skeleton; centralized tracing and deployment-level observability remain future work. |
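The DSN redaction requirement in the rows above can be illustrated with a minimal sketch. It assumes the DSN is a URL-style connection string, which is the usual form of `DATABASE_URL`; `redactDsn` and both placeholder strings are hypothetical names and are not taken from the manager's own redaction module.

```typescript
// Hypothetical helper: mask the password of a URL-style Postgres DSN before it
// reaches logs, events, or health/readiness output.
function redactDsn(dsn: string): string {
  try {
    const url = new URL(dsn);
    // Replace the password only when one is present.
    if (url.password) url.password = "REDACTED";
    return url.toString();
  } catch {
    // Not parseable as a URL: emit nothing from the original string.
    return "[redacted-dsn]";
  }
}
```

Returning a fixed placeholder on parse failure is a deliberate choice in this sketch: an unparseable DSN may still contain a credential, so none of the input is echoed back.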
+3 -3
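@@ -99,7 +99,7 @@ Runner logs must be flushed in real time to a file or pod log; when the CLI starts a runner it must
| Spec item | Status | Notes |
| --- | --- | --- |
| `agentrun-runner` service spec | Defined | This document is the authority for the v0.1 runner. |
| Kubernetes Job runner | Not implemented | Requires follow-up runtime/GitOps implementation. |
| host process runner | Not implemented | Can serve as a manual MVP dispatch path, but must still go through the manager API. |
| claim/lease/report client | Not implemented | Requires follow-up code implementation. |
| Kubernetes Job runner | Partially implemented | A `runner job --dry-run` Job manifest rendering skeleton is provided, with a fixed `agentrun-v01-runner` ServiceAccount, manager URL, runId/commandId/attemptId, executionPolicy, and SecretRef file projection; no real Kubernetes create/apply is executed yet. |
| host process runner | Partially implemented | `runner start` and `src/runner/main.ts` enter the same `runOnce` path and can run self-tests through manager register/claim/poll/report. |
| claim/lease/report client | Partially implemented | A runner manager API client has been split out, covering register, claim, lease heartbeat, poll command, ack, append event, and terminal status; the durable store still awaits Postgres adapter integration. |
| runner redaction | Defined / not implemented | Must be implemented together with the backend adapter. |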
@@ -52,6 +52,9 @@ The adapter must map backend errors to a stable failureKind
| `provider-auth-failed` | The provider credential or auth file is invalid, or upstream returned 401/403. |
| `provider-rate-limited` | Upstream rate-limit or quota error. |
| `backend-protocol-error` | Backend output cannot be parsed, or protocol fields are missing. |
| `backend-json-parse-error` | Backend stdout is not a valid JSON-RPC line. |
| `backend-response-invalid` | A backend JSON-RPC response/terminal notification is missing required fields. |
| `backend-spawn-failed` | The backend app-server process cannot start. |
| `backend-failed` | The backend process exited non-zero or reported a terminal error. |
| `backend-timeout` | The executionPolicy timeout fired. |
| `cancelled` | An interrupt/cancel took effect. |
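As an illustration of the mapping in the table above, here is a minimal sketch of a classifier. The failureKind names come from the table; the `BackendFault` shape, the use of HTTP 429 for rate limiting, and the precedence order are assumptions made for this example, not the adapter's actual logic.

```typescript
type FailureKind =
  | "provider-auth-failed"
  | "provider-rate-limited"
  | "backend-json-parse-error"
  | "backend-response-invalid"
  | "backend-spawn-failed"
  | "backend-failed"
  | "backend-timeout"
  | "cancelled";

// Hypothetical description of what went wrong during a backend turn.
interface BackendFault {
  httpStatus?: number;  // upstream provider HTTP status, if any
  spawnError?: boolean; // the app-server process could not start
  timedOut?: boolean;   // the executionPolicy timeout fired
  cancelled?: boolean;  // an interrupt/cancel took effect
  stdoutLine?: string;  // raw stdout line to validate as a JSON-RPC message
}

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Assumed precedence: cancel, timeout, spawn, provider status, then protocol.
function classifyFault(fault: BackendFault): FailureKind {
  if (fault.cancelled) return "cancelled";
  if (fault.timedOut) return "backend-timeout";
  if (fault.spawnError) return "backend-spawn-failed";
  if (fault.httpStatus === 401 || fault.httpStatus === 403) return "provider-auth-failed";
  if (fault.httpStatus === 429) return "provider-rate-limited"; // assumed status code
  if (fault.stdoutLine !== undefined) {
    let parsed: unknown;
    try {
      parsed = JSON.parse(fault.stdoutLine);
    } catch {
      return "backend-json-parse-error";
    }
    // A message with neither id nor method cannot be routed.
    if (!isRecord(parsed) || !("id" in parsed || "method" in parsed)) return "backend-response-invalid";
  }
  return "backend-failed";
}
```

Keeping the classifier a pure function makes it easy to cover each table row with a single self-test case.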
+2 -1
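@@ -96,6 +96,7 @@ A Run's `executionPolicy.secretScope` should reference `agentrun-v01-provider-codex`
| --- | --- | --- |
| Codex backend spec | Defined | This document is the authority for the first real v0.1 backend. |
| Codex Secret projection | Not implemented | Requires a follow-up Kubernetes Secret and runner/backend manifest. |
| Codex adapter | Not implemented | Requires follow-up code implementation. |
| Codex adapter | Partially implemented | The current code implements a controlled `codex app-server --listen stdio://`, `initialize`/`thread/start`/`thread/resume`/`turn/start` response validation, bounded stderr diagnostics, spawn/JSON parse/response invalid/timeout failureKinds, and a fake app-server self-test. |
| Error observability and redaction | Partially implemented | child env, cwd, workspace, and Codex home are emitted only as summaries; the stderr tail is bounded and marked when truncated; events and failures all go through redaction. |
| Real provider turn | Not implemented | A release can pass only after integration testing has actually been completed. |
| hostPath `~/.codex` | Not adopted | Injection is allowed only through Kubernetes Secret projection. |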
+5 -3
@@ -30,6 +30,7 @@ bun scripts/agentrun-cli.ts runs events <runId> --after-seq <n> --limit <n>
bun scripts/agentrun-cli.ts commands create <runId> --type turn --json-file <payload.json>
bun scripts/agentrun-cli.ts commands show <commandId>
bun scripts/agentrun-cli.ts runner start --run-id <runId> --backend <backendProfile>
bun scripts/agentrun-cli.ts secrets codex render --dry-run [--codex-home <dir>]
bun scripts/agentrun-cli.ts backends list
bun scripts/agentrun-cli.ts server start|status|stop|logs
```
@@ -41,6 +42,7 @@ bun scripts/agentrun-cli.ts server start|status|stop|logs
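- Query commands return the current state, terminal_status, failureKind, event cursor, or logPath.
- `events` is paginated and bounded by default and must support `afterSeq` and `limit`.
- `server logs` returns a bounded log summary and points to the full log file or the Kubernetes pod identity.
- `secrets codex render --dry-run` returns the Codex provider Secret creation plan, input file bytes/hash, SecretRef, manifest summary, and apply command shape; it must not output Secret values or perform Kubernetes writes.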
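The bounded `afterSeq`/`limit` pagination described in the list above could be consumed roughly as follows. This is a sketch under assumptions: the 500 cap, the `RunEvent` shape, and the injected `fetchPage` function are hypothetical; only the path and query parameter names follow the CLI source in this change.

```typescript
// Hypothetical event shape: only `seq` is needed for cursoring.
interface RunEvent { seq: number; type: string }

const maxLimit = 500; // assumed upper bound, not taken from the spec

// Build the events path with a clamped limit so every page stays bounded.
function eventsPath(runId: string, afterSeq: number, limit: number): string {
  const requested = Number.isFinite(limit) ? Math.trunc(limit) : 100;
  const bounded = Math.min(Math.max(1, requested), maxLimit);
  return `/api/v1/runs/${encodeURIComponent(runId)}/events?afterSeq=${afterSeq}&limit=${bounded}`;
}

// Advance the cursor to the last seq seen; an empty page leaves it unchanged.
function nextCursor(cursor: number, page: RunEvent[]): number {
  return page.length === 0 ? cursor : page[page.length - 1]!.seq;
}

// Read pages until an empty page arrives or the page budget is exhausted.
async function readEvents(
  fetchPage: (path: string) => Promise<RunEvent[]>,
  runId: string,
  limit = 100,
  maxPages = 10,
): Promise<RunEvent[]> {
  const all: RunEvent[] = [];
  let cursor = 0;
  for (let i = 0; i < maxPages; i += 1) {
    const page = await fetchPage(eventsPath(runId, cursor, limit));
    if (page.length === 0) break;
    all.push(...page);
    cursor = nextCursor(cursor, page);
  }
  return all;
}
```

The page budget keeps a misbehaving server from turning a query command into an unbounded loop, which matches the "bounded by default" requirement.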
## Configuration and Secret boundary
@@ -72,7 +74,7 @@ bun scripts/agentrun-cli.ts server start|status|stop|logs
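| Spec item | Status | Notes |
| --- | --- | --- |
| AgentRun CLI spec | Defined | This document is the authority for the v0.1 CLI. |
| `scripts/agentrun-cli.ts` | Not implemented | Requires follow-up code implementation. |
| CLI calls manager REST | Not implemented | To be implemented alongside the manager API. |
| runner start | Not implemented | To be implemented alongside the runner Job/host process. |
| `scripts/agentrun-cli.ts` | Partially implemented | Basic run/command/event/backend/server commands and a JSON envelope are provided. |
| CLI calls manager REST | Partially implemented | The CLI calls manager REST through `ManagerClient`; the current self-test uses an in-memory manager. |
| runner start | Partially implemented | `runner start` can run the host process runner; `runner job --dry-run` can render Kubernetes Job JSON but does not yet execute create/apply. |
| CLI test spec | Defined | For integration testing, see [spec-v01-validation.md](spec-v01-validation.md). |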
+3 -2
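@@ -74,6 +74,7 @@ Secret names and keys may be adjusted at implementation time to fit Kubernetes naming limits, but
| --- | --- | --- |
| Postgres durable store spec | Defined | This document is the authority for v0.1 storage. |
| StatefulSet/Service/PVC | Not implemented | Requires initialization by a follow-up GitOps lane. |
| migration ledger | Not implemented | Requires follow-up code and schema migration. |
| manager Postgres adapter | Not implemented | Requires follow-up `agentrun-mgr` implementation. |
| migration ledger | Skeleton implemented | When `agentrun-mgr` starts the Postgres adapter, it idempotently creates `agentrun_schema_migrations` and records the migration id/checksum; live DB migration acceptance still depends on follow-up GitOps lane initialization. |
| manager Postgres adapter | Skeleton implemented | `agentrun-mgr` enables the Postgres adapter via `DATABASE_URL` and persists runs, commands, events, runners, backends, and leases; without `DATABASE_URL` the live runtime fails fast; memory is allowed only for explicit self-test/dev. |
| health/readiness store status | Skeleton implemented | health/readiness returns adapter, reachable, migrationReady, migrationId, failureKind, and redacted Secret status, and never outputs the DSN in plaintext. |
| file/sqlite durable store | Not adopted | May be used only for temporary local testing, not as the v0.1 runtime truth. |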
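The idempotent migration ledger mentioned above records a migration id and checksum. A minimal sketch of the planning step follows; `planMigrations`, the `LedgerRow` shape, and the fail-on-mismatch behavior are assumptions for illustration, and the real adapter's columns and SQL are not shown in this diff.

```typescript
import { createHash } from "node:crypto";

interface Migration { id: string; sql: string }
// Hypothetical row shape for the `agentrun_schema_migrations` ledger.
interface LedgerRow { id: string; checksum: string }

function checksumOf(sql: string): string {
  return createHash("sha256").update(sql, "utf8").digest("hex");
}

// Decide which migrations still need to run. Re-running with an up-to-date
// ledger yields an empty plan, which is what makes startup idempotent.
function planMigrations(migrations: Migration[], applied: LedgerRow[]): Migration[] {
  const ledger = new Map(applied.map((row) => [row.id, row.checksum]));
  const pending: Migration[] = [];
  for (const migration of migrations) {
    const recorded = ledger.get(migration.id);
    if (recorded === undefined) {
      pending.push(migration);
    } else if (recorded !== checksumOf(migration.sql)) {
      // Assumed policy: an edited, already-applied migration is an error.
      throw new Error(`migration ${migration.id} checksum mismatch`);
    }
  }
  return pending;
}
```

Separating the pure planning step from the database calls lets a memory-mode self-test exercise the ledger logic without a live Postgres.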
+23 -3
@@ -106,6 +106,25 @@ runner/backend Pod
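Secret creation and rotation are not generated automatically by the source branch; the source branch only declares which SecretRef is required. If External Secrets, Vault, SealedSecrets, or SOPS is adopted later, this spec must be extended or updated to define the controller, source of truth, rotation, and redaction rules.
## Codex Secret dry-run tool
`v0.1` provides a read-only CLI tool that builds a Kubernetes Secret creation plan from the operator's local `~/.codex/auth.json` and `~/.codex/config.toml`: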
```bash
bun scripts/agentrun-cli.ts secrets codex render --dry-run
```
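Optional flags:
- `--codex-home <dir>`: overrides the default `~/.codex` input directory.
- `--auth-file <path>` / `--config-file <path>`: override the respective input file paths.
- `--namespace <name>`: defaults to `agentrun-v01`.
- `--secret-name <name>`: defaults to `agentrun-v01-provider-codex`.
The output must be JSON and contain only `namespace`, `secretName`, `keys`, each input file's `bytes` and `sha256`/`contentHash`, the overall hash, redaction status, the apply command shape, and a Secret manifest summary. The output must not contain Secret values, `auth.json` plaintext, `config.toml` plaintext, a base64 `data` field, or anything from which a credential could be directly recovered. The tool supports only `--dry-run` and must not execute `kubectl apply`.
Failures must return a structured `failureKind`: a missing file, an unreadable file, or an empty credential is classified as `secret-unavailable`; invalid JSON/TOML is classified as `schema-invalid`.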
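The output rules above (hashes and sizes only, no recoverable credential) can be checked with a small sketch like the one below. `summarize` and `leaks` are hypothetical helpers, and the leak check is deliberately coarse: it looks for one known credential value and its standalone base64 form, so it can miss values embedded in a larger encoded blob.

```typescript
import { Buffer } from "node:buffer";
import { createHash } from "node:crypto";

interface FileSummary { key: string; bytes: number; sha256: string; contentHash: string }

// Summarize an input file without carrying any of its content forward.
function summarize(key: string, content: string): FileSummary {
  const sha256 = createHash("sha256").update(content, "utf8").digest("hex");
  return { key, bytes: Buffer.byteLength(content, "utf8"), sha256, contentHash: `sha256:${sha256}` };
}

// Coarse leak check: `secretValue` should be a single credential value such
// as a token, not a whole file, because JSON.stringify escapes quotes.
function leaks(output: unknown, secretValue: string): boolean {
  const text = JSON.stringify(output);
  const base64 = Buffer.from(secretValue, "utf8").toString("base64");
  return text.includes(secretValue) || text.includes(base64);
}
```

A self-test could run `leaks` over the full dry-run JSON with a known test token and fail the case if it ever returns `true`.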
## Log and event redaction
- Events, traces, logs, CLI output, health, and diagnostics must not print Secret values.
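@@ -132,7 +151,8 @@ Secret creation and rotation are not generated automatically by the source branch; the source branch only
| Spec item | Status | Notes |
| --- | --- | --- |
| Secret distribution spec | Defined | This document is the authority for v0.1 provider credential distribution. |
| Kubernetes SecretRef injection | Not implemented | Requires follow-up GitOps/runtime implementation. |
| Codex auth/config file projection | Not implemented | Requires follow-up runner/backend adapter implementation; the test sources are `~/.codex/auth.json` and `~/.codex/config.toml`. |
| redaction minimal rules | Defined | Requires follow-up code implementation and tests. |
| Kubernetes SecretRef injection | Partially implemented | Runner Job dry-run rendering already generates the Secret volume projection and `CODEX_HOME` from the run's `executionPolicy.secretScope.providerCredentials`, but nothing is applied to the cluster yet. |
| Codex Secret dry-run tool | Implemented | `bun scripts/agentrun-cli.ts secrets codex render --dry-run` outputs only the Secret creation plan, hashes, and a redacted manifest summary, and does not execute apply. |
| Codex auth/config file projection | Partially implemented | Backend readiness checks that `auth.json`/`config.toml` are readable and returns `secret-unavailable` when they are missing; the self-test simulates projection with temporary files. |
| redaction minimal rules | Partially implemented | The Secret dry-run tool, events, Job dry-run output, and self-test have been verified not to print the test token; more complex auditing is still to be added. |
| External secret manager | Not adopted | If Vault/ExternalSecrets/SOPS is needed, the spec will be updated separately. |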
+16 -3
@@ -39,6 +39,19 @@ UniDesk or HWLAB client
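The Runner inbound API allows only local or private diagnostics and is not an entry point for business clients. Business clients may only call `agentrun-mgr`.
## Parallel development boundaries
By default, `v0.1` is implemented by multiple agents working in parallel on manager, runner, backend, Secret, Postgres, CLI, and CI/CD. The architecture must actively reduce parallel conflicts rather than putting every component into a single entry file, master table, or giant test file.
Long-term rules for parallel development:
- Component implementations are layered by directory: manager code goes in `src/mgr/**`, runner in `src/runner/**`, backend in `src/backend/**`, CLI in `scripts/src/**`, and deployment in `deploy/**`. Shared cross-component types and utilities live only in `src/common/**`, and changes there must stay small and stable.
- Self-tests use a discoverable case model: `src/selftest/run.ts` only discovers and schedules, shared fixtures live in `src/selftest/harness.ts`, and component tests live in `src/selftest/cases/*.ts`. New component self-tests should preferably add a case file rather than repeatedly modify the main entry.
- Long-lived document status belongs to the corresponding spec: component implementation status is maintained first in component specs such as `spec-v01-agentrun-mgr.md`, `spec-v01-agentrun-runner.md`, and `spec-v01-backend-codex.md`; `spec-v01-services.md` keeps only the overview and cross-component boundaries so that it does not become a hotspot for parallel changes.
- Resolve parallel conflicts by improving module boundaries first: if multiple PRs repeatedly conflict in the same file or the same status table section, split out modules, cases, manifests, helpers, or spec sub-items that can be appended independently; fall back to ordinary conflict resolution only after confirming that the boundaries are already reasonable.
- Shared API contracts must stabilize before they are extended in parallel: changes to cross-component types such as `RunRecord`, `CommandRecord`, `ExecutionPolicy`, `SecretRef`, and `FailureKind` must account for manager, runner, backend, CLI, self-test, and GitOps render, and must not introduce temporary fields for a single component to bypass the contract.
- Procedural merge, conflict-resolution, and temporary verification evidence goes into the PR/issue; long-lived reference documents record only stable parallel development principles, boundaries, and decision criteria.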
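The directory layering rule above lends itself to a simple check on which components a change set touches. The sketch below is illustrative only: the prefixes come from the rules above, while the component labels and the `componentsTouched` helper are hypothetical and not part of the repository.

```typescript
// Directory prefixes come from the rules above; the labels are assumptions.
const ownership: Array<[string, string]> = [
  ["src/mgr/", "manager"],
  ["src/runner/", "runner"],
  ["src/backend/", "backend"],
  ["scripts/src/", "cli"],
  ["deploy/", "deploy"],
  ["src/common/", "common"],
];

// Map a repository-relative path to its owning component.
function componentOf(file: string): string {
  const match = ownership.find(([prefix]) => file.startsWith(prefix));
  return match ? match[1] : "other";
}

// List the distinct components a change set touches, in sorted order.
function componentsTouched(files: string[]): string[] {
  return [...new Set(files.map(componentOf))].sort();
}
```

A reviewer or CI step could use such a list to flag changes that touch `src/common/**` together with several components, since those are the changes most likely to conflict.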
## Service overview table
| Object | Type | v0.1 handling | Notes | Separate follow-up spec |
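@@ -154,7 +167,7 @@ The manager validates, stores, and returns these fields; the runner may only consume what has been
| Codex backend spec | Defined | See [spec-v01-backend-codex.md](spec-v01-backend-codex.md). |
| AgentRun CLI spec | Defined | See [spec-v01-cli.md](spec-v01-cli.md). |
| Scheduler deferred spec | Defined | See [spec-v01-scheduler.md](spec-v01-scheduler.md). |
| `agentrun-mgr` implementation | Not implemented | Requires follow-up code implementation. |
| `agentrun-runner` implementation | Not implemented | Requires follow-up code implementation. |
| First real backend | Not implemented | Codex is the default candidate. |
| `agentrun-mgr` implementation | Skeleton implemented | Skeletons exist for the REST API, Postgres durable store selection, migration ledger, runner claim/lease/report, health/readiness, and self-test memory mode; real Postgres/GitOps acceptance on G14 `agentrun-v01` is still required. |
| `agentrun-runner` implementation | Skeleton implemented | A host process runner and a Kubernetes Job dry-run rendering skeleton exist; the runner claims/polls/reports through the manager API and does not connect to Postgres directly; real Kubernetes Job integration still needs acceptance. |
| First real backend | Skeleton implemented | The Codex app-server stdio backend has protocol handling, failure classification, redaction, and a fake self-test; a real Codex provider turn still needs integration acceptance. |
| Automatic scheduler | Deferred | Not an acceptance target for the first phase of `v0.1`. |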
+2
@@ -26,6 +26,8 @@
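Self-tests should run with Bun + TypeScript. Codex-related self-tests may use a fake app-server JSON-RPC client to simulate `initialize`, `thread/start`, `thread/resume`, `turn/start`, assistant output, protocol errors, timeout, and transport close.
The in-repo self-test entry must remain extensible in parallel: `src/selftest/run.ts` only discovers and schedules `src/selftest/cases/*.ts`, while shared fixtures and assertion helpers live in `src/selftest/harness.ts`. New component self-tests should preferably be added as independent case files, so that multiple parallel tasks do not repeatedly conflict by editing the same entry file. A case that needs cross-component verification may depend on the official manager/runner/backend APIs, but should still be named and maintained as an independent case.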
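A discover-and-schedule entry of the kind described above might look like the following sketch. The `SelfTestCase` shape and both functions are assumptions for illustration; the actual export shape of a case module and the real `run.ts` logic are not shown in this diff.

```typescript
// Hypothetical shape of one discoverable self-test case.
interface SelfTestCase { name: string; run: () => void | Promise<void> }
interface Summary { passed: string[]; failed: string[] }

// Keep only case modules; sorting makes the run order deterministic.
function discoverCaseFiles(entries: string[]): string[] {
  return entries
    .filter((name) => name.endsWith(".ts") && !name.endsWith(".d.ts"))
    .sort();
}

// Run every case, recording failures instead of stopping at the first one.
async function runCases(cases: SelfTestCase[]): Promise<Summary> {
  const summary: Summary = { passed: [], failed: [] };
  for (const testCase of cases) {
    try {
      await testCase.run();
      summary.passed.push(testCase.name);
    } catch {
      summary.failed.push(testCase.name);
    }
  }
  return summary;
}
```

In a real entry file the `entries` list would presumably come from reading the `src/selftest/cases` directory and loading each file with a dynamic `import()`, so that adding a case never requires editing the entry itself.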
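Self-test output may be a Tekton TaskRun result, JUnit, tap, a JSON summary, or plain logs, but it must not be written back to the source branch or presented as a passing integration test.
## Integration testing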
+5
@@ -9,8 +9,13 @@
"test": "bun run src/selftest/run.ts",
"cli": "bun scripts/agentrun-cli.ts"
},
"dependencies": {
"pg": "^8.13.1"
},
"devDependencies": {
"@types/pg": "^8.11.10",
"@types/node": "^22.10.0",
"tsx": "^4.19.2",
"typescript": "^5.8.3"
}
}
+57 -3
@@ -1,8 +1,11 @@
import { readFile } from "node:fs/promises";
import { startManagerServer } from "../../src/mgr/server.js";
import { MemoryAgentRunStore } from "../../src/mgr/store.js";
import { ManagerClient } from "../../src/mgr/client.js";
import { runOnce } from "../../src/runner/run-once.js";
import type { JsonRecord, JsonValue } from "../../src/common/types.js";
import { renderRunnerJobDryRun } from "../../src/runner/k8s-job.js";
import { renderCodexProviderSecretPlan } from "./secret-render.js";
import type { JsonRecord, JsonValue, RunRecord } from "../../src/common/types.js";
import { AgentRunError, errorToJson } from "../../src/common/errors.js";
import type { RunnerOnceOptions } from "../../src/runner/run-once.js";
@@ -28,6 +31,7 @@ async function dispatch(args: ParsedArgs): Promise<JsonValue> {
if (group === "server" && command === "start") return startServer(args);
if (group === "server" && command === "status") return client(args).get("/health/readiness");
if (group === "backends" && command === "list") return client(args).get("/api/v1/backends");
if (group === "secrets" && command === "codex" && id === "render") return renderCodexSecret(args);
if (group === "runs" && command === "create") return client(args).post("/api/v1/runs", await jsonFile(args));
if (group === "runs" && command === "show" && id) return client(args).get(`/api/v1/runs/${encodeURIComponent(id)}`);
if (group === "runs" && command === "events" && id) return client(args).get(`/api/v1/runs/${encodeURIComponent(id)}/events?afterSeq=${flag(args, "after-seq", "0")}&limit=${flag(args, "limit", "100")}`);
@@ -58,14 +62,62 @@ async function dispatch(args: ParsedArgs): Promise<JsonValue> {
if (codexHome) options.codexHome = codexHome;
return runOnce(options) as unknown as JsonValue;
}
if (group === "runner" && command === "job") return renderRunnerJob(args);
throw new AgentRunError("schema-invalid", `unsupported command: ${args.positional.join(" ")}`, { httpStatus: 2 });
}
async function renderRunnerJob(args: ParsedArgs): Promise<JsonRecord> {
if (args.flags.get("dry-run") !== true) throw new AgentRunError("schema-invalid", "runner job only supports --dry-run in v0.1", { httpStatus: 2 });
const runId = flag(args, "run-id", "");
const commandId = flag(args, "command-id", "");
const image = flag(args, "image", "");
if (!runId) throw new AgentRunError("schema-invalid", "runner job requires --run-id", { httpStatus: 2 });
if (!commandId) throw new AgentRunError("schema-invalid", "runner job requires --command-id", { httpStatus: 2 });
if (!image) throw new AgentRunError("schema-invalid", "runner job requires --image", { httpStatus: 2 });
const run = await client(args).get(`/api/v1/runs/${encodeURIComponent(runId)}`) as RunRecord;
const options = {
run,
commandId,
image,
managerUrl: managerUrl(args),
namespace: optionalFlag(args, "namespace") ?? "agentrun-v01",
};
const attemptId = optionalFlag(args, "attempt-id");
const runnerId = optionalFlag(args, "runner-id");
const sourceCommit = optionalFlag(args, "source-commit");
return renderRunnerJobDryRun({
...options,
...(attemptId ? { attemptId } : {}),
...(runnerId ? { runnerId } : {}),
...(sourceCommit ? { sourceCommit } : {}),
});
}
async function renderCodexSecret(args: ParsedArgs): Promise<JsonRecord> {
if (args.flags.get("dry-run") !== true) {
throw new AgentRunError("schema-invalid", "secrets codex render requires --dry-run", { httpStatus: 2 });
}
const options: Parameters<typeof renderCodexProviderSecretPlan>[0] = { dryRun: true };
const codexHome = optionalFlag(args, "codex-home");
const authFile = optionalFlag(args, "auth-file");
const configFile = optionalFlag(args, "config-file");
const namespace = optionalFlag(args, "namespace");
const secretName = optionalFlag(args, "secret-name");
if (codexHome) options.codexHome = codexHome;
if (authFile) options.authFile = authFile;
if (configFile) options.configFile = configFile;
if (namespace) options.namespace = namespace;
if (secretName) options.secretName = secretName;
return renderCodexProviderSecretPlan(options);
}
async function startServer(args: ParsedArgs): Promise<JsonRecord> {
const port = Number(flag(args, "port", "8080"));
const host = flag(args, "host", "0.0.0.0");
const started = await startManagerServer({ port, host });
return { serviceId: "agentrun-mgr", baseUrl: started.baseUrl, pid: process.pid, note: "foreground process; use Kubernetes/Tekton for v0.1 runtime" };
const storeMode = optionalFlag(args, "store") ?? process.env.AGENTRUN_STORE ?? process.env.AGENTRUN_MGR_STORE;
const started = await startManagerServer({ port, host, ...(storeMode === "memory" ? { store: new MemoryAgentRunStore() } : {}) });
const database = await started.store.health();
return { serviceId: "agentrun-mgr", baseUrl: started.baseUrl, pid: process.pid, database, note: "foreground process; use Kubernetes/Tekton for v0.1 runtime" };
}
function client(args: ParsedArgs): ManagerClient {
@@ -123,6 +175,8 @@ function help(): JsonRecord {
"commands create <runId> --type turn --json-file <payload.json>",
"commands show <commandId> --run-id <runId>",
"runner start --run-id <runId>",
"runner job --dry-run --run-id <runId> --command-id <commandId> --image <image>",
"secrets codex render --dry-run [--codex-home <dir>] [--namespace agentrun-v01] [--secret-name agentrun-v01-provider-codex]",
"backends list",
"server start|status",
],
+194
@@ -0,0 +1,194 @@
import { createHash } from "node:crypto";
import { constants as fsConstants } from "node:fs";
import { access, readFile } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import type { JsonRecord } from "../../src/common/types.js";
import { AgentRunError } from "../../src/common/errors.js";
export interface CodexSecretRenderOptions {
codexHome?: string;
authFile?: string;
configFile?: string;
namespace?: string;
secretName?: string;
dryRun?: boolean;
}
interface SecretFileSummary extends JsonRecord {
key: "auth.json" | "config.toml";
source: string;
bytes: number;
sha256: string;
contentHash: string;
}
interface SecretSourceFile {
key: "auth.json" | "config.toml";
path: string;
validate: (content: string, file: string) => unknown;
}
const defaultNamespace = "agentrun-v01";
const defaultSecretName = "agentrun-v01-provider-codex";
const secretKeys = ["auth.json", "config.toml"] as const;
const credentialKeyPattern = /(?:api[_-]?key|token|password|secret|credential|authorization|auth)/iu;
export async function renderCodexProviderSecretPlan(options: CodexSecretRenderOptions = {}): Promise<JsonRecord> {
if (options.dryRun === false) {
throw new AgentRunError("schema-invalid", "Codex provider Secret rendering only supports --dry-run in v0.1", { httpStatus: 2 });
}
const namespace = nonEmpty(options.namespace, defaultNamespace);
const secretName = nonEmpty(options.secretName, defaultSecretName);
const codexHome = resolvePath(nonEmpty(options.codexHome, path.join(os.homedir(), ".codex")));
const sources: SecretSourceFile[] = [
{ key: "auth.json", path: resolvePath(options.authFile ?? path.join(codexHome, "auth.json")), validate: validateAuthJson },
{ key: "config.toml", path: resolvePath(options.configFile ?? path.join(codexHome, "config.toml")), validate: validateConfigToml },
];
const files: SecretFileSummary[] = [];
const hash = createHash("sha256");
for (const source of sources) {
const content = await readSecretInput(source);
const bytes = Buffer.byteLength(content, "utf8");
if (bytes === 0) throw new AgentRunError("secret-unavailable", `${source.key} is empty`, { httpStatus: 2, details: { key: source.key, path: source.path } });
source.validate(content, source.path);
const sha256 = sha256Hex(content);
hash.update(source.key);
hash.update("\0");
hash.update(content, "utf8");
hash.update("\0");
files.push({ key: source.key, source: source.path, bytes, sha256, contentHash: `sha256:${sha256}` });
}
const manifestSummary: JsonRecord = {
apiVersion: "v1",
kind: "Secret",
metadata: { namespace, name: secretName },
type: "Opaque",
dataKeys: [...secretKeys],
dataRedacted: true,
};
return {
mode: "dry-run",
writeAttempted: false,
namespace,
secretName,
keys: [...secretKeys],
totalBytes: files.reduce((sum, file) => sum + file.bytes, 0),
sha256: hash.digest("hex"),
files,
manifestSummary,
apply: {
attempted: false,
command: `kubectl create secret generic ${secretName} -n ${namespace} --from-file=auth.json=<redacted> --from-file=config.toml=<redacted> --dry-run=client -o yaml | kubectl apply -f -`,
note: "本命令只展示形状;v0.1 工具不会执行 kubectl apply,也不会输出 Secret data。",
},
redaction: {
secretValuesPrinted: false,
manifestDataPrinted: false,
configTomlPrinted: false,
authJsonPrinted: false,
},
};
}
async function readSecretInput(source: SecretSourceFile): Promise<string> {
try {
await access(source.path, fsConstants.R_OK);
return await readFile(source.path, "utf8");
} catch (error) {
if (isNodeError(error, "ENOENT")) {
throw new AgentRunError("secret-unavailable", `${source.key} is missing`, { httpStatus: 2, details: { key: source.key, path: source.path } });
}
if (isNodeError(error, "EACCES") || isNodeError(error, "EPERM")) {
throw new AgentRunError("secret-unavailable", `${source.key} is not readable`, { httpStatus: 2, details: { key: source.key, path: source.path } });
}
throw error;
}
}
function validateAuthJson(content: string, file: string): unknown {
let parsed: unknown;
try {
parsed = JSON.parse(content);
} catch {
throw new AgentRunError("schema-invalid", "auth.json is not valid JSON", { httpStatus: 2, details: { key: "auth.json", path: file } });
}
if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
throw new AgentRunError("schema-invalid", "auth.json must contain a JSON object", { httpStatus: 2, details: { key: "auth.json", path: file } });
}
const emptyField = findEmptyCredentialField(parsed);
if (emptyField) {
throw new AgentRunError("secret-unavailable", "auth.json contains an empty credential field", { httpStatus: 2, details: { key: "auth.json", path: file, field: emptyField } });
}
if (!hasNonEmptyCredentialField(parsed)) {
throw new AgentRunError("secret-unavailable", "auth.json does not contain any non-empty credential field", { httpStatus: 2, details: { key: "auth.json", path: file } });
}
return parsed;
}
function validateConfigToml(content: string, file: string): unknown {
const parser = (globalThis as typeof globalThis & { Bun?: { TOML?: { parse?: (value: string) => unknown } } }).Bun?.TOML?.parse;
if (typeof parser !== "function") {
throw new AgentRunError("infra-failed", "Bun TOML parser is unavailable", { httpStatus: 1, details: { key: "config.toml", path: file } });
}
let parsed: unknown;
try {
parsed = parser(content);
} catch {
throw new AgentRunError("schema-invalid", "config.toml is not valid TOML", { httpStatus: 2, details: { key: "config.toml", path: file } });
}
const emptyField = findEmptyCredentialField(parsed);
if (emptyField) {
throw new AgentRunError("secret-unavailable", "config.toml contains an empty credential field", { httpStatus: 2, details: { key: "config.toml", path: file, field: emptyField } });
}
return parsed;
}
function hasNonEmptyCredentialField(value: unknown): boolean {
if (Array.isArray(value)) return value.some((item) => hasNonEmptyCredentialField(item));
if (typeof value !== "object" || value === null) return false;
return Object.entries(value).some(([key, entry]) => {
if (credentialKeyPattern.test(key) && typeof entry === "string" && entry.trim().length > 0) return true;
return hasNonEmptyCredentialField(entry);
});
}
function findEmptyCredentialField(value: unknown, trail: string[] = []): string | null {
if (Array.isArray(value)) {
for (let index = 0; index < value.length; index += 1) {
const found = findEmptyCredentialField(value[index], [...trail, String(index)]);
if (found) return found;
}
return null;
}
if (typeof value !== "object" || value === null) return null;
for (const [key, entry] of Object.entries(value)) {
const nextTrail = [...trail, key];
if (credentialKeyPattern.test(key) && (entry === null || (typeof entry === "string" && entry.trim().length === 0))) return nextTrail.join(".");
const found = findEmptyCredentialField(entry, nextTrail);
if (found) return found;
}
return null;
}
function nonEmpty(value: string | undefined, fallback: string): string {
return typeof value === "string" && value.length > 0 ? value : fallback;
}
function resolvePath(file: string): string {
if (file === "~") return os.homedir();
if (file.startsWith("~/")) return path.join(os.homedir(), file.slice(2));
return path.resolve(file);
}
function sha256Hex(content: string): string {
return createHash("sha256").update(content, "utf8").digest("hex");
}
function isNodeError(error: unknown, code: string): boolean {
return typeof error === "object" && error !== null && "code" in error && (error as { code?: unknown }).code === code;
}
+1
@@ -21,6 +21,7 @@ export async function runBackendTurn(run: RunRecord, command: CommandRecord, opt
timeoutMs: run.executionPolicy.timeoutMs,
};
if (typeof command.payload.model === "string") turnOptions.model = command.payload.model;
if (typeof command.payload.threadId === "string") turnOptions.threadId = command.payload.threadId;
if (options.codexCommand) turnOptions.command = options.codexCommand;
if (options.codexArgs) turnOptions.args = options.codexArgs;
if (options.env) turnOptions.env = options.env;
+416 -104
@@ -1,13 +1,40 @@
import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process";
import { createHash } from "node:crypto";
import { accessSync, constants as fsConstants } from "node:fs";
import path from "node:path";
import * as readline from "node:readline";
import type { BackendEvent, BackendTurnResult, FailureKind, JsonRecord, JsonValue, TerminalStatus } from "../common/types.js";
import { redactJson, redactText } from "../common/redaction.js";
const codexProtocol = "codex-app-server-jsonrpc-stdio";
const defaultCodexArgs = ["app-server", "--listen", "stdio://"];
const stderrBufferBytes = 64_000;
const stderrEventChars = 4_000;
const requestTimeoutCapMs = 30_000;
const childEnvSummaryKeys = [
"CODEX_HOME",
"HOME",
"PATH",
"HTTP_PROXY",
"HTTPS_PROXY",
"ALL_PROXY",
"NO_PROXY",
"http_proxy",
"https_proxy",
"all_proxy",
"no_proxy",
"OPENAI_API_KEY",
"CODEX_API_KEY",
"GITHUB_TOKEN",
"GH_TOKEN",
];
export interface CodexStdioTurnOptions {
prompt: string;
cwd: string;
model?: string;
threadId?: string;
approvalPolicy: string;
sandbox: string;
timeoutMs: number;
@@ -18,48 +45,87 @@ export interface CodexStdioTurnOptions {
}
interface PendingRequest {
method: string;
timer: NodeJS.Timeout;
resolve: (value: unknown) => void;
reject: (error: Error) => void;
}
interface CodexStdioCloseInfo extends JsonRecord {
code: number | null;
signal: string | null;
stderrTail: string;
stderrBytes: number;
stderrTruncated: boolean;
failureKind: FailureKind | null;
message: string | null;
}
class CodexStdioFailure extends Error {
readonly failureKind: FailureKind;
readonly phase: string;
readonly details: JsonRecord;
constructor(failureKind: FailureKind, message: string, phase: string, details: JsonRecord = {}) {
super(redactText(message));
this.name = "CodexStdioFailure";
this.failureKind = failureKind;
this.phase = phase;
this.details = redactJson(details);
}
}
export class CodexStdioClient {
private readonly child: ChildProcessWithoutNullStreams;
private readonly pending = new Map<number, PendingRequest>();
private readonly stderrChunks: Buffer[] = [];
private stderrTailBuffer = Buffer.alloc(0);
private stderrBytes = 0;
private nextId = 1;
private closed = false;
readonly closedPromise: Promise<{ code: number | null; signal: string | null; stderrTail: string }>;
private closeResolve!: (value: { code: number | null; signal: string | null; stderrTail: string }) => void;
private closeFailure: CodexStdioFailure | null = null;
readonly closedPromise: Promise<CodexStdioCloseInfo>;
private closeResolve!: (value: CodexStdioCloseInfo) => void;
constructor(options: { command?: string; args?: string[]; cwd: string; env?: NodeJS.ProcessEnv; onNotification: (message: JsonRecord) => void }) {
this.closedPromise = new Promise((resolve) => { this.closeResolve = resolve; });
this.child = spawn(options.command ?? "codex", options.args ?? ["app-server", "--listen", "stdio://"], {
cwd: options.cwd,
env: options.env ?? process.env,
stdio: "pipe",
});
this.child.stderr.on("data", (chunk: Buffer) => {
this.stderrChunks.push(chunk);
while (Buffer.concat(this.stderrChunks).length > 64_000) this.stderrChunks.shift();
});
const command = options.command ?? "codex";
const args = options.args ?? defaultCodexArgs;
try {
this.child = spawn(command, args, {
cwd: options.cwd,
env: options.env ?? process.env,
stdio: "pipe",
});
} catch (error) {
throw spawnFailure(command, error);
}
this.child.stderr.on("data", (chunk: Buffer) => this.appendStderr(chunk));
const rl = readline.createInterface({ input: this.child.stdout, crlfDelay: Infinity });
void this.readLines(rl, options.onNotification);
this.child.on("close", (code, signal) => this.handleClose(code, signal));
this.child.on("error", (error) => this.handleClose(127, error.message));
this.child.on("error", (error) => this.handleClose(127, null, spawnFailure(command, error)));
}
request(method: string, params: JsonRecord): Promise<unknown> {
if (this.closed) return Promise.reject(new Error("codex app-server is closed"));
request(method: string, params: JsonRecord, timeoutMs = requestTimeoutCapMs): Promise<unknown> {
if (this.closed) return Promise.reject(this.closeFailure ?? new CodexStdioFailure("backend-failed", "codex app-server is closed", `request:${method}`));
const id = this.nextId++;
const message = { id, method, params };
const effectiveTimeoutMs = positiveTimeout(timeoutMs);
return new Promise((resolve, reject) => {
this.pending.set(id, { resolve, reject });
this.child.stdin.write(`${JSON.stringify(message)}\n`);
const timer = setTimeout(() => {
this.rejectRequest(id, new CodexStdioFailure("backend-timeout", `Codex stdio request ${method} timed out after ${effectiveTimeoutMs}ms`, `request:${method}`, { method, timeoutMs: effectiveTimeoutMs }));
}, effectiveTimeoutMs);
this.pending.set(id, { method, timer, resolve, reject });
this.child.stdin.write(`${JSON.stringify(message)}\n`, "utf8", (error: Error | null | undefined) => {
if (!error) return;
this.rejectRequest(id, new CodexStdioFailure("backend-failed", `failed to write Codex stdio request ${method}: ${error.message}`, `request:${method}`, { method }));
});
});
}
notify(method: string, params: JsonRecord = {}): void {
if (!this.closed) this.child.stdin.write(`${JSON.stringify({ method, params })}\n`);
if (this.closed) return;
this.child.stdin.write(`${JSON.stringify({ method, params })}\n`, "utf8", () => undefined);
}
stop(): void {
@@ -70,33 +136,65 @@ export class CodexStdioClient {
}, 1500).unref?.();
}
private appendStderr(chunk: Buffer): void {
this.stderrBytes += chunk.byteLength;
const next = Buffer.concat([this.stderrTailBuffer, chunk]);
this.stderrTailBuffer = next.byteLength > stderrBufferBytes ? next.subarray(next.byteLength - stderrBufferBytes) : next;
}
private async readLines(rl: readline.Interface, onNotification: (message: JsonRecord) => void): Promise<void> {
try {
for await (const line of rl) {
const trimmed = String(line).trim();
if (trimmed.length === 0) continue;
const message = JSON.parse(trimmed) as JsonRecord;
const id = typeof message.id === "number" ? message.id : null;
const method = typeof message.method === "string" ? message.method : null;
if (id !== null && method === null) {
const pending = this.pending.get(id);
if (!pending) continue;
this.pending.delete(id);
if (message.error !== undefined) pending.reject(new Error(JSON.stringify(redactJson(message.error))));
else pending.resolve(message.result);
continue;
let message: JsonRecord;
try {
message = JSON.parse(trimmed) as JsonRecord;
} catch {
this.handleProtocolFailure(new CodexStdioFailure("backend-json-parse-error", "codex app-server emitted invalid JSON on stdout", "stdout:parse", { linePreview: redactText(trimmed.slice(0, 800)), lineChars: trimmed.length }));
break;
}
if (id !== null && method !== null) {
this.handleServerRequest(id, method);
continue;
}
if (method !== null) onNotification(message);
this.handleMessage(message, onNotification);
}
} catch (error) {
this.rejectAll(error instanceof Error ? error : new Error(String(error)));
this.handleProtocolFailure(new CodexStdioFailure("backend-protocol-error", error instanceof Error ? error.message : String(error), "stdout:read"));
}
}
private handleMessage(message: JsonRecord, onNotification: (message: JsonRecord) => void): void {
const id = typeof message.id === "number" ? message.id : null;
const method = typeof message.method === "string" ? message.method : null;
if (id !== null && method === null) {
this.handleResponse(id, message);
return;
}
if (id !== null && method !== null) {
this.handleServerRequest(id, method);
return;
}
if (method !== null) {
onNotification(message);
return;
}
this.handleProtocolFailure(new CodexStdioFailure("backend-response-invalid", "codex app-server message had neither JSON-RPC id nor method", "stdout:message", { message }));
}
private handleResponse(id: number, message: JsonRecord): void {
const pending = this.pending.get(id);
if (!pending) return;
this.pending.delete(id);
clearTimeout(pending.timer);
if (message.error !== undefined) {
pending.reject(failureFromRpcError(pending.method, message.error));
return;
}
if (!("result" in message)) {
pending.reject(new CodexStdioFailure("backend-response-invalid", `codex app-server response for ${pending.method} omitted result and error`, `response:${pending.method}`, { method: pending.method }));
return;
}
pending.resolve(message.result);
}
private handleServerRequest(id: number, method: string): void {
if (method === "item/commandExecution/requestApproval" || method === "item/fileChange/requestApproval") {
this.child.stdin.write(`${JSON.stringify({ id, result: { decision: "decline" } })}\n`);
@@ -105,92 +203,172 @@ export class CodexStdioClient {
this.child.stdin.write(`${JSON.stringify({ id, error: { code: -32601, message: `Unsupported client-side request: ${method}` } })}\n`);
}
private rejectRequest(id: number, error: CodexStdioFailure): void {
const pending = this.pending.get(id);
if (!pending) return;
this.pending.delete(id);
clearTimeout(pending.timer);
pending.reject(error);
}
private rejectAll(error: Error): void {
for (const pending of this.pending.values()) pending.reject(error);
for (const pending of this.pending.values()) {
clearTimeout(pending.timer);
pending.reject(error);
}
this.pending.clear();
}
private handleClose(code: number | null, signal: string | null): void {
private handleProtocolFailure(error: CodexStdioFailure): void {
if (this.closed) return;
this.closeFailure = error;
this.rejectAll(error);
this.stop();
}
private handleClose(code: number | null, signal: string | null, failure: CodexStdioFailure | null = null): void {
if (this.closed) return;
this.closed = true;
const stderrTail = redactText(Buffer.concat(this.stderrChunks).toString("utf8").slice(-8000));
this.rejectAll(new Error(`codex app-server closed code=${code} signal=${signal}`));
this.closeResolve({ code, signal, stderrTail });
if (failure) this.closeFailure = failure;
const stderr = this.stderrInfo();
const closeInfo: CodexStdioCloseInfo = {
code,
signal,
stderrTail: stderr.stderrTail,
stderrBytes: this.stderrBytes,
stderrTruncated: stderr.stderrTruncated,
failureKind: this.closeFailure?.failureKind ?? null,
message: this.closeFailure?.message ?? null,
};
this.rejectAll(this.closeFailure ?? new CodexStdioFailure("backend-failed", `codex app-server closed code=${code} signal=${signal}`, "process:close", closeInfo));
this.closeResolve(closeInfo);
}
private stderrInfo(): { stderrTail: string; stderrTruncated: boolean } {
const buffered = this.stderrTailBuffer.toString("utf8");
const tail = buffered.slice(-8000);
return {
stderrTail: redactText(tail),
stderrTruncated: this.stderrBytes > this.stderrTailBuffer.byteLength || buffered.length > tail.length,
};
}
}
export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise<BackendTurnResult> {
const secretFailure = codexHomeReadiness(options.codexHome ?? options.env?.CODEX_HOME ?? `${options.env?.HOME ?? process.env.HOME ?? ""}/.codex`);
const codexHome = resolveCodexHome(options);
const secretFailure = codexHomeReadiness(codexHome);
if (secretFailure) return secretFailure;
const events: BackendEvent[] = [{ type: "backend_status", payload: { phase: "codex-app-server-starting", protocol: "codex-app-server-jsonrpc-stdio" } }];
const env = childEnv(options, codexHome);
const events: BackendEvent[] = [{
type: "backend_status",
payload: {
phase: "codex-app-server-starting",
protocol: codexProtocol,
runtime: runtimeSummary(options, env, codexHome),
},
}];
let assistantText = "";
let threadId: string | undefined;
let threadId: string | undefined = options.threadId;
let turnId: string | undefined;
let terminal: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } | null = null;
let terminalResolve!: () => void;
const terminalPromise = new Promise<void>((resolve) => { terminalResolve = resolve; });
const clientOptions: ConstructorParameters<typeof CodexStdioClient>[0] = {
cwd: options.cwd,
env: childEnv(options),
onNotification: (message) => {
const normalized = normalizeCodexNotification(message);
if (normalized.threadId) threadId = normalized.threadId;
if (normalized.turnId) turnId = normalized.turnId;
if (normalized.assistantDelta) assistantText += normalized.assistantDelta;
events.push(...normalized.events);
if (normalized.terminal) {
terminal = normalized.terminal;
terminalResolve();
}
},
};
if (options.command) clientOptions.command = options.command;
if (options.args) clientOptions.args = options.args;
const client = new CodexStdioClient(clientOptions);
let client: CodexStdioClient | null = null;
const requestTimeoutMs = Math.min(positiveTimeout(options.timeoutMs), requestTimeoutCapMs);
const timeout = setTimeout(() => {
if (!terminal) {
terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn timed out after ${options.timeoutMs}ms` };
events.push({ type: "error", payload: terminal });
client.stop();
terminalResolve();
}
}, options.timeoutMs);
if (terminal) return;
terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn timed out after ${options.timeoutMs}ms` };
events.push({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:timeout" } });
client?.stop();
terminalResolve();
}, positiveTimeout(options.timeoutMs));
try {
await client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } });
const threadResponse = asResponseRecord(await client.request("thread/start", { model: options.model ?? "default", cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" }));
threadId = stringAt(asRecordAt(threadResponse, "thread"), "id") ?? threadId;
const turnResponse = asResponseRecord(await client.request("turn/start", { threadId: threadId ?? "", input: [{ type: "text", text: options.prompt, text_elements: [] }], cwd: options.cwd, approvalPolicy: options.approvalPolicy, model: options.model ?? "default" }));
turnId = stringAt(asRecordAt(turnResponse, "turn"), "id") ?? turnId;
await Promise.race([terminalPromise, client.closedPromise]);
if (!terminal) terminal = { status: "failed", failureKind: "backend-protocol-error", message: "codex app-server closed before turn/completed" };
const clientOptions: ConstructorParameters<typeof CodexStdioClient>[0] = {
cwd: options.cwd,
env,
onNotification: (message) => {
const normalized = normalizeCodexNotification(message);
if (normalized.threadId) threadId = normalized.threadId;
if (normalized.turnId) turnId = normalized.turnId;
if (normalized.assistantDelta) assistantText += normalized.assistantDelta;
events.push(...normalized.events);
if (normalized.terminal && !terminal) {
terminal = normalized.terminal;
terminalResolve();
}
},
};
if (options.command) clientOptions.command = options.command;
if (options.args) clientOptions.args = options.args;
client = new CodexStdioClient(clientOptions);
const initializeResult = requireResponseRecord(await client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } }, requestTimeoutMs), "initialize");
validateInitializeResponse(initializeResult);
client.notify("initialized", {});
events.push({ type: "backend_status", payload: { phase: "initialize:completed", protocol: codexProtocol } });
const threadMethod = options.threadId ? "thread/resume" : "thread/start";
const threadParams: JsonRecord = options.threadId
? { threadId: options.threadId, model: options.model ?? "default", cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }
: { model: options.model ?? "default", cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" };
const threadResponse = requireResponseRecord(await client.request(threadMethod, threadParams, requestTimeoutMs), threadMethod);
threadId = requireNestedId(threadResponse, threadMethod, "thread");
events.push({ type: "backend_status", payload: { phase: `${threadMethod}:completed`, threadId } });
const turnResponse = requireResponseRecord(await client.request("turn/start", { threadId, input: textInput(options.prompt), cwd: options.cwd, approvalPolicy: options.approvalPolicy, model: options.model ?? "default" }, requestTimeoutMs), "turn/start");
turnId = requireNestedId(turnResponse, "turn/start", "turn");
events.push({ type: "backend_status", payload: { phase: "turn/start:completed", turnId } });
const race = await Promise.race([
terminalPromise.then(() => ({ kind: "terminal" as const })),
client.closedPromise.then((closeInfo) => ({ kind: "closed" as const, closeInfo })),
]);
if (race.kind === "closed" && !terminal) {
terminal = terminalFromClose(race.closeInfo);
events.push({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "transport:closed-before-terminal", appServerExit: closeEvent(race.closeInfo) } });
}
if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server did not emit turn/completed" };
} catch (error) {
terminal = { status: "failed", failureKind: "backend-protocol-error", message: error instanceof Error ? error.message : String(error) };
events.push({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message } });
if (!terminal) {
const failure = normalizeFailure(error);
terminal = { status: failure.failureKind === "secret-unavailable" ? "blocked" : "failed", failureKind: failure.failureKind, message: failure.message };
events.push({ type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase: failure.phase, details: failure.details } });
}
} finally {
clearTimeout(timeout);
client.stop();
if (client) {
client.stop();
const closeInfo = await client.closedPromise;
events.push({ type: "backend_status", payload: { phase: "codex-app-server-closed", appServerExit: closeEvent(closeInfo) } });
}
}
if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server finished without terminal status" };
if (assistantText.trim().length > 0) events.push({ type: "assistant_message", payload: { text: assistantText } });
events.push({ type: "terminal_status", payload: { terminalStatus: terminal.status, failureKind: terminal.failureKind, message: terminal.message } });
return { terminalStatus: terminal.status, failureKind: terminal.failureKind, failureMessage: terminal.message, events: events.map((event) => ({ ...event, payload: redactJson(event.payload) })), ...(threadId ? { threadId } : {}), ...(turnId ? { turnId } : {}) };
}
function codexHomeReadiness(codexHome: string): BackendTurnResult | null {
try {
accessSync(`${codexHome}/auth.json`, fsConstants.R_OK);
accessSync(`${codexHome}/config.toml`, fsConstants.R_OK);
return null;
} catch {
return {
terminalStatus: "blocked",
failureKind: "secret-unavailable",
failureMessage: "Codex auth.json or config.toml projection is not readable",
events: [
{ type: "error", payload: { failureKind: "secret-unavailable", credentialSource: { codexHome, valuesPrinted: false } } },
{ type: "terminal_status", payload: { terminalStatus: "blocked", failureKind: "secret-unavailable" } },
],
};
}
const auth = fileReadable(`${codexHome}/auth.json`);
const config = fileReadable(`${codexHome}/config.toml`);
if (auth.readable && config.readable) return null;
const payload = {
failureKind: "secret-unavailable",
projection: {
codexHome: pathSummary(codexHome),
authJson: auth,
configToml: config,
valuesPrinted: false,
},
} satisfies JsonRecord;
return {
terminalStatus: "blocked",
failureKind: "secret-unavailable",
failureMessage: "Codex auth.json or config.toml projection is not readable",
events: [
{ type: "error", payload },
{ type: "terminal_status", payload: { terminalStatus: "blocked", failureKind: "secret-unavailable" } },
],
};
}
function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent[]; assistantDelta?: string; threadId?: string; turnId?: string; terminal?: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } } {
@@ -207,33 +385,163 @@ function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent
if (method === "item/agentMessage/delta") return { events: [], assistantDelta: typeof params.delta === "string" ? params.delta : "" };
if (method === "item/commandExecution/outputDelta") return { events: [{ type: "command_output", payload: { stream: "stdout", text: typeof params.delta === "string" ? params.delta : "" } }] };
if (method === "item/started" || method === "item/completed") return { events: [{ type: "tool_call", payload: { method, item: redactJson(asRecordAt(params, "item")) } }] };
if (method === "error") return { events: [{ type: "error", payload: { failureKind: "backend-failed", error: redactJson(params.error ?? params) } }] };
if (method === "error") {
const error = asRecordAt(params, "error");
const messageText = typeof error.message === "string" ? error.message : "Codex app-server error";
const failureKind = classifyMessageFailureKind(messageText, "backend-failed");
const terminal = params.willRetry === true ? undefined : { status: "failed" as const, failureKind, message: redactText(messageText) };
return { events: [{ type: "error", payload: { failureKind, error: redactJson(error), willRetry: params.willRetry === true } }], ...(terminal ? { terminal } : {}) };
}
if (method === "turn/completed") {
const turn = asRecordAt(params, "turn");
if (typeof turn.status !== "string") {
return { events: [{ type: "error", payload: { failureKind: "backend-response-invalid", message: "turn/completed notification omitted turn.status" } }], terminal: { status: "failed", failureKind: "backend-response-invalid", message: "turn/completed notification omitted turn.status" } };
}
const status = terminalStatusFromValue(turn.status);
const error = asRecordAt(turn, "error");
const messageText = typeof error.message === "string" ? error.message : null;
return { events: [{ type: "backend_status", payload: { phase: method, terminalStatus: status } }], terminal: { status, failureKind: status === "completed" ? null : "backend-failed", message: messageText } };
const messageText = typeof error.message === "string" ? redactText(error.message) : null;
return { events: [{ type: "backend_status", payload: { phase: method, terminalStatus: status } }], terminal: { status, failureKind: status === "completed" ? null : classifyMessageFailureKind(messageText ?? turn.status, "backend-failed"), message: messageText } };
}
return { events: [{ type: "backend_status", payload: { phase: method } }] };
}
function terminalStatusFromValue(value: unknown): TerminalStatus {
if (value === "completed") return "completed";
if (value === "cancelled" || value === "canceled") return "cancelled";
if (value === "cancelled" || value === "canceled" || value === "interrupted") return "cancelled";
if (value === "blocked") return "blocked";
return "failed";
}
function childEnv(options: CodexStdioTurnOptions): NodeJS.ProcessEnv {
const env: NodeJS.ProcessEnv = { ...process.env, ...options.env };
const codexHome = options.codexHome ?? options.env?.CODEX_HOME;
if (codexHome) env.CODEX_HOME = codexHome;
return env;
function childEnv(options: CodexStdioTurnOptions, codexHome: string): NodeJS.ProcessEnv {
return {
...process.env,
...options.env,
CODEX_HOME: codexHome,
CODEX_INTERNAL_ORIGINATOR_OVERRIDE: "agentrun",
};
}
function asResponseRecord(value: unknown): JsonRecord {
return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : {};
function resolveCodexHome(options: CodexStdioTurnOptions): string {
return options.codexHome ?? options.env?.CODEX_HOME ?? `${options.env?.HOME ?? process.env.HOME ?? ""}/.codex`;
}
function validateInitializeResponse(value: JsonRecord): void {
const serverInfo = value.serverInfo;
if (serverInfo !== undefined && (typeof serverInfo !== "object" || serverInfo === null || Array.isArray(serverInfo))) {
throw new CodexStdioFailure("backend-response-invalid", "initialize response serverInfo must be an object when present", "response:initialize", { response: value });
}
}
function requireResponseRecord(value: unknown, method: string): JsonRecord {
if (typeof value === "object" && value !== null && !Array.isArray(value)) return value as JsonRecord;
throw new CodexStdioFailure("backend-response-invalid", `${method} response result must be an object`, `response:${method}`);
}
function requireNestedId(value: JsonRecord, method: string, key: string): string {
const id = stringAt(asRecordAt(value, key), "id");
if (id) return id;
throw new CodexStdioFailure("backend-response-invalid", `${method} response did not include ${key}.id`, `response:${method}`, { response: value });
}
function textInput(text: string): JsonValue[] {
return [{ type: "text", text, text_elements: [] }];
}
function fileReadable(filePath: string): JsonRecord {
try {
accessSync(filePath, fsConstants.R_OK);
return { ...pathSummary(filePath), readable: true };
} catch {
return { ...pathSummary(filePath), readable: false };
}
}
function pathSummary(value: string): JsonRecord {
const raw = String(value || "");
const parts = raw.split(/[\\/]+/u).filter(Boolean);
return {
present: raw.trim().length > 0,
absolute: path.isAbsolute(raw),
basename: parts.at(-1) ?? null,
depth: parts.length,
fingerprint: shortHash(raw),
valuePrinted: false,
};
}
function runtimeSummary(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, codexHome: string): JsonRecord {
return {
command: options.command ?? "codex",
args: options.args ?? defaultCodexArgs,
cwd: pathSummary(options.cwd),
workspace: pathSummary(options.cwd),
codexHome: pathSummary(codexHome),
env: envSummary(env),
valuesPrinted: false,
};
}
function envSummary(env: NodeJS.ProcessEnv): JsonRecord {
const keyState: Record<string, JsonValue> = {};
for (const key of childEnvSummaryKeys) keyState[key] = { present: typeof env[key] === "string" && String(env[key]).length > 0 };
const secretLikeKeyCount = Object.keys(env).filter((key) => /auth|authorization|api[_-]?key|token|password|secret|credential/iu.test(key)).length;
return {
keyCount: Object.keys(env).length,
trackedKeys: keyState,
secretLikeKeyCount,
valuesPrinted: false,
};
}
function closeEvent(closeInfo: CodexStdioCloseInfo): JsonRecord {
return {
code: closeInfo.code,
signal: closeInfo.signal,
failureKind: closeInfo.failureKind,
message: closeInfo.message,
stderrTail: closeInfo.stderrTail.slice(-stderrEventChars),
stderrBytes: closeInfo.stderrBytes,
stderrTruncated: closeInfo.stderrTruncated || closeInfo.stderrTail.length > stderrEventChars,
};
}
function terminalFromClose(closeInfo: CodexStdioCloseInfo): { status: TerminalStatus; failureKind: FailureKind; message: string } {
const baseMessage = `codex app-server closed before turn/completed code=${closeInfo.code} signal=${closeInfo.signal}`;
const combined = [closeInfo.message ?? "", closeInfo.stderrTail].filter(Boolean).join("\n");
const failureKind = closeInfo.failureKind ?? classifyMessageFailureKind(combined, "backend-response-invalid");
const stderrPreview = closeInfo.stderrTail.trim().length > 0 ? `; stderrTail=${closeInfo.stderrTail.slice(-1000)}` : "";
return { status: "failed", failureKind, message: redactText(`${baseMessage}${stderrPreview}`) };
}
function failureFromRpcError(method: string, value: unknown): CodexStdioFailure {
const error = typeof value === "object" && value !== null ? value as Record<string, unknown> : {};
const message = typeof error.message === "string" ? error.message : JSON.stringify(redactJson(value as JsonValue));
return new CodexStdioFailure(classifyMessageFailureKind(message, "backend-protocol-error"), `codex app-server ${method} error: ${message}`, `response:${method}`, { method, error: redactJson(value as JsonValue) });
}
function spawnFailure(command: string, error: unknown): CodexStdioFailure {
const message = error instanceof Error ? error.message : String(error);
const code = typeof error === "object" && error !== null && "code" in error ? String((error as { code?: unknown }).code ?? "") : "";
return new CodexStdioFailure("backend-spawn-failed", `failed to start Codex app-server command ${command}: ${message}`, "process:spawn", { command, code });
}
function normalizeFailure(error: unknown): CodexStdioFailure {
if (error instanceof CodexStdioFailure) return error;
const message = error instanceof Error ? error.message : String(error);
return new CodexStdioFailure(classifyMessageFailureKind(message, "backend-protocol-error"), message, "codex-stdio");
}
function classifyMessageFailureKind(message: string, fallback: FailureKind): FailureKind {
const text = String(message || "").toLowerCase();
if (/rate.?limit|too many requests|\b429\b/u.test(text)) return "provider-rate-limited";
if (/\b401\b|\b403\b|unauthori[sz]ed|forbidden|invalid api key|authentication|auth failed|oauth|access token/u.test(text)) return "provider-auth-failed";
if (/timed out|timeout|idle timeout/u.test(text)) return "backend-timeout";
if (/invalid json|json parse/u.test(text)) return "backend-json-parse-error";
return fallback;
}
function positiveTimeout(value: number): number {
return Number.isFinite(value) && value > 0 ? Math.max(1, Math.floor(value)) : requestTimeoutCapMs;
}
function asRecordAt(value: JsonRecord, key: string): JsonRecord {
@@ -242,5 +550,9 @@ function asRecordAt(value: JsonRecord, key: string): JsonRecord {
}
function stringAt(value: JsonRecord, key: string): string | null {
return typeof value[key] === "string" ? value[key] : null;
return typeof value[key] === "string" && String(value[key]).length > 0 ? String(value[key]) : null;
}
function shortHash(value: string): string {
return createHash("sha256").update(value).digest("hex").slice(0, 12);
}
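Review note: `classifyMessageFailureKind` in the file above maps free-form error text to a `FailureKind` by testing regular expressions in a fixed order, so the first matching category wins and the caller-supplied fallback applies only when nothing matches. The minimal sketch below copies that helper to make the ordering visible. The narrowed `FailureKind` union and the sample messages are assumptions made to keep the example self-contained; the real union in the types file has more members.

```typescript
// Narrowed copy of the FailureKind union (assumption: the real type has more members).
type FailureKind =
  | "backend-failed"
  | "backend-protocol-error"
  | "backend-json-parse-error"
  | "backend-timeout"
  | "provider-auth-failed"
  | "provider-rate-limited";

// Same checks, in the same order, as the helper shown in the diff.
function classifyMessageFailureKind(message: string, fallback: FailureKind): FailureKind {
  const text = String(message || "").toLowerCase();
  if (/rate.?limit|too many requests|\b429\b/u.test(text)) return "provider-rate-limited";
  if (/\b401\b|\b403\b|unauthori[sz]ed|forbidden|invalid api key|authentication|auth failed|oauth|access token/u.test(text)) return "provider-auth-failed";
  if (/timed out|timeout|idle timeout/u.test(text)) return "backend-timeout";
  if (/invalid json|json parse/u.test(text)) return "backend-json-parse-error";
  return fallback;
}

// Hypothetical sample messages, not taken from real Codex app-server output.
const samples: string[] = [
  "HTTP 429 Too Many Requests",        // provider-rate-limited
  "401 Unauthorized",                  // provider-auth-failed
  "request timed out after 30s",       // backend-timeout
  "rate limit hit, request timed out", // provider-rate-limited (rate-limit check runs first)
  "something unexpected",              // falls through to the fallback
];

for (const sample of samples) {
  console.log(sample, "->", classifyMessageFailureKind(sample, "backend-failed"));
}
```

Because the rate-limit and auth checks run before the timeout check, a message that mentions both a rate limit and a timeout is reported as `provider-rate-limited`. Callers choose the fallback per call site, which is why the diff passes `"backend-protocol-error"` for RPC errors and `"backend-response-invalid"` for unexpected process close.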
+2 -1
@@ -1,7 +1,8 @@
export function redactText(value: string): string {
return value
.replace(/\b(sk-[A-Za-z0-9_-]{8,}|ghp_[A-Za-z0-9_]{8,}|github_pat_[A-Za-z0-9_]+)\b/gu, "REDACTED")
.replace(/(authorization\s*[:=]\s*)(bearer\s+)?[A-Za-z0-9._~+/=-]+/giu, "$1$2REDACTED")
.replace(/((?:api[_-]?key|token|password|secret)\s*[:=]\s*)[A-Za-z0-9._~+/=-]+/giu, "$1REDACTED")
.replace(/((?:api[_-]?key|token|password|secret)\s*["']?\s*[:=]\s*["']?)[A-Za-z0-9._~+/=-]+/giu, "$1REDACTED")
.replace(/(postgres(?:ql)?:\/\/[^:\s/@]+:)[^@\s]+(@)/giu, "$1REDACTED$2")
.replace(/(https?:\/\/[^:\s/@]+:)[^@\s]+(@)/giu, "$1REDACTED$2");
}
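Review note: the hunk above widens the key/value pattern in `redactText` so that optional quotes around the key and the value are tolerated, which covers JSON-style text such as `"token": "..."`. The sketch below copies the updated function as shown in the diff and runs it on a few inputs. All sample strings are hypothetical values invented for illustration, not real credentials or logs.

```typescript
// Copy of the updated redactText from the diff, unchanged.
function redactText(value: string): string {
  return value
    .replace(/\b(sk-[A-Za-z0-9_-]{8,}|ghp_[A-Za-z0-9_]{8,}|github_pat_[A-Za-z0-9_]+)\b/gu, "REDACTED")
    .replace(/(authorization\s*[:=]\s*)(bearer\s+)?[A-Za-z0-9._~+/=-]+/giu, "$1$2REDACTED")
    .replace(/((?:api[_-]?key|token|password|secret)\s*["']?\s*[:=]\s*["']?)[A-Za-z0-9._~+/=-]+/giu, "$1REDACTED")
    .replace(/(postgres(?:ql)?:\/\/[^:\s/@]+:)[^@\s]+(@)/giu, "$1REDACTED$2")
    .replace(/(https?:\/\/[^:\s/@]+:)[^@\s]+(@)/giu, "$1REDACTED$2");
}

// JSON-style key/value with quotes: matched only by the widened pattern.
const jsonLine = redactText('{"token": "abc123"}');
console.log(jsonLine); // {"token": "REDACTED"}

// Postgres DSN: only the password segment is replaced.
const dsn = redactText("postgres://agent:hunter2@db:5432/agentrun");
console.log(dsn); // postgres://agent:REDACTED@db:5432/agentrun

// Authorization header: the scheme is kept, the credential is replaced.
const header = redactText("Authorization: Bearer abc.def");
console.log(header); // Authorization: Bearer REDACTED
```

The replacements run in sequence on the same string, so a value that matches an earlier pattern is already `REDACTED` by the time later patterns see it.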
+3
@@ -9,6 +9,9 @@ export type FailureKind =
| "runner-lease-conflict"
| "backend-failed"
| "backend-protocol-error"
| "backend-spawn-failed"
| "backend-json-parse-error"
| "backend-response-invalid"
| "backend-timeout"
| "provider-auth-failed"
| "provider-rate-limited"
+9 -2
@@ -1,6 +1,13 @@
import { startManagerServer } from "./server.js";
import { errorToJson } from "../common/errors.js";
const port = Number(process.env.PORT ?? process.env.AGENTRUN_MGR_PORT ?? "8080");
const host = process.env.HOST ?? "0.0.0.0";
const started = await startManagerServer({ port, host });
console.log(JSON.stringify({ ok: true, serviceId: "agentrun-mgr", baseUrl: started.baseUrl }));
try {
const started = await startManagerServer({ port, host });
const database = await started.store.health();
console.log(JSON.stringify({ ok: true, serviceId: "agentrun-mgr", baseUrl: started.baseUrl, database }));
} catch (error) {
console.error(JSON.stringify({ ok: false, serviceId: "agentrun-mgr", failureKind: "infra-failed", message: error instanceof Error ? error.message : String(error), error: errorToJson(error) }));
process.exit(1);
}
+476
@@ -0,0 +1,476 @@
import { createHash } from "node:crypto";
import { Pool } from "pg";
import type { PoolClient, QueryResultRow } from "pg";
import { AgentRunError } from "../common/errors.js";
import { redactJson } from "../common/redaction.js";
import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, RunEvent, RunnerRecord, RunRecord, RunStatus, TerminalStatus } from "../common/types.js";
import { newId, nowIso, stableHash } from "../common/validation.js";
import type { AgentRunStore, StoreHealth } from "./store.js";
import { statusFromTerminal } from "./store.js";
interface PostgresStoreOptions {
connectionString: string;
}
interface MigrationDefinition {
id: string;
checksum: string;
sql: string;
}
const initialMigrationSql = `
CREATE TABLE IF NOT EXISTS agentrun_runs (
id text PRIMARY KEY,
tenant_id text NOT NULL,
project_id text NOT NULL,
workspace_ref jsonb NOT NULL,
provider_id text NOT NULL,
backend_profile text NOT NULL,
execution_policy jsonb NOT NULL,
trace_sink jsonb,
status text NOT NULL,
terminal_status text,
failure_kind text,
failure_message text,
created_at timestamptz NOT NULL,
updated_at timestamptz NOT NULL,
claimed_by text,
lease_expires_at timestamptz
);
CREATE TABLE IF NOT EXISTS agentrun_commands (
id text PRIMARY KEY,
run_id text NOT NULL REFERENCES agentrun_runs(id) ON DELETE CASCADE,
seq integer NOT NULL,
type text NOT NULL,
payload jsonb NOT NULL,
payload_hash text NOT NULL,
idempotency_key text,
state text NOT NULL,
created_at timestamptz NOT NULL,
updated_at timestamptz NOT NULL,
acknowledged_at timestamptz,
UNIQUE (run_id, seq)
);
CREATE UNIQUE INDEX IF NOT EXISTS agentrun_commands_run_idempotency_key_idx
ON agentrun_commands (run_id, idempotency_key)
WHERE idempotency_key IS NOT NULL;
CREATE TABLE IF NOT EXISTS agentrun_events (
id text PRIMARY KEY,
run_id text NOT NULL REFERENCES agentrun_runs(id) ON DELETE CASCADE,
seq integer NOT NULL,
type text NOT NULL,
payload jsonb NOT NULL,
created_at timestamptz NOT NULL,
UNIQUE (run_id, seq)
);
CREATE TABLE IF NOT EXISTS agentrun_runners (
id text PRIMARY KEY,
run_id text,
attempt_id text,
backend_profile text,
placement text,
source_commit text,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
registered_at timestamptz NOT NULL,
heartbeat_at timestamptz NOT NULL
);
CREATE TABLE IF NOT EXISTS agentrun_backends (
profile text PRIMARY KEY,
capabilities jsonb NOT NULL,
capacity jsonb NOT NULL,
health jsonb NOT NULL,
updated_at timestamptz NOT NULL
);
CREATE TABLE IF NOT EXISTS agentrun_leases (
run_id text PRIMARY KEY REFERENCES agentrun_runs(id) ON DELETE CASCADE,
runner_id text NOT NULL,
lease_expires_at timestamptz NOT NULL,
stale_recovery_marker jsonb,
updated_at timestamptz NOT NULL
);
CREATE INDEX IF NOT EXISTS agentrun_runs_status_idx ON agentrun_runs (status, updated_at);
CREATE INDEX IF NOT EXISTS agentrun_events_run_seq_idx ON agentrun_events (run_id, seq);
CREATE INDEX IF NOT EXISTS agentrun_commands_run_seq_idx ON agentrun_commands (run_id, seq);
CREATE INDEX IF NOT EXISTS agentrun_leases_expiry_idx ON agentrun_leases (lease_expires_at);
INSERT INTO agentrun_backends (profile, capabilities, capacity, health, updated_at)
VALUES (
'codex',
'{"protocol":"codex-app-server-jsonrpc-stdio","transport":"stdio","command":"codex app-server --listen stdio://"}'::jsonb,
'{"mode":"manual-runner-v0.1"}'::jsonb,
'{"status":"registered"}'::jsonb,
now()
)
ON CONFLICT (profile) DO UPDATE SET
capabilities = EXCLUDED.capabilities,
capacity = EXCLUDED.capacity,
health = EXCLUDED.health,
updated_at = EXCLUDED.updated_at;
`;
const postgresMigrations: MigrationDefinition[] = [
{
id: "001_v01_initial_durable_store",
checksum: checksumSql(initialMigrationSql),
sql: initialMigrationSql,
},
];
export function postgresMigrationContract(): JsonRecord {
return {
migrationIds: postgresMigrations.map((migration) => migration.id),
latestMigrationId: latestMigrationId(),
requiredTables: ["agentrun_schema_migrations", "agentrun_runs", "agentrun_commands", "agentrun_events", "agentrun_runners", "agentrun_backends", "agentrun_leases"],
checksums: Object.fromEntries(postgresMigrations.map((migration) => [migration.id, migration.checksum])),
};
}
export async function createPostgresAgentRunStore(options: PostgresStoreOptions): Promise<PostgresAgentRunStore> {
const store = new PostgresAgentRunStore(options);
await store.migrate();
return store;
}
export class PostgresAgentRunStore implements AgentRunStore {
private readonly pool: Pool;
private migrationReady = false;
private appliedMigrationId: string | null = null;
constructor(options: PostgresStoreOptions) {
this.pool = new Pool({ connectionString: options.connectionString, application_name: "agentrun-mgr-v01" });
}
async migrate(): Promise<void> {
await this.withTransaction(async (client) => {
await client.query(`
CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
id text PRIMARY KEY,
checksum text NOT NULL,
applied_at timestamptz NOT NULL DEFAULT now()
)
`);
for (const migration of postgresMigrations) {
const existing = await client.query<{ checksum: string }>("SELECT checksum FROM agentrun_schema_migrations WHERE id = $1 FOR UPDATE", [migration.id]);
if (existing.rowCount && existing.rows[0]?.checksum !== migration.checksum) {
throw new AgentRunError("infra-failed", `schema migration checksum mismatch for ${migration.id}`, { httpStatus: 503, details: { migrationId: migration.id } });
}
if (!existing.rowCount) {
await client.query(migration.sql);
await client.query("INSERT INTO agentrun_schema_migrations (id, checksum) VALUES ($1, $2)", [migration.id, migration.checksum]);
}
}
});
this.migrationReady = true;
this.appliedMigrationId = latestMigrationId();
}
async health(): Promise<StoreHealth> {
try {
await this.pool.query("SELECT 1");
const result = await this.pool.query<{ id: string }>("SELECT id FROM agentrun_schema_migrations ORDER BY applied_at DESC, id DESC LIMIT 1");
const migrationId = result.rows[0]?.id ?? null;
const migrationReady = this.migrationReady && migrationId === latestMigrationId();
return { adapter: "postgres", ready: migrationReady, reachable: true, migrationReady, migrationId, failureKind: migrationReady ? null : "infra-failed", message: migrationReady ? null : "schema migration is not ready", credentialValuesPrinted: false };
} catch (error) {
return { adapter: "postgres", ready: false, reachable: false, migrationReady: false, migrationId: this.appliedMigrationId, failureKind: "infra-failed", message: error instanceof Error ? error.message : String(error), credentialValuesPrinted: false };
}
}
async createRun(input: CreateRunInput): Promise<RunRecord> {
const at = nowIso();
const run: RunRecord = { ...input, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null };
return this.withTransaction(async (client) => {
await client.query(
`INSERT INTO agentrun_runs (id, tenant_id, project_id, workspace_ref, provider_id, backend_profile, execution_policy, trace_sink, status, terminal_status, failure_kind, failure_message, created_at, updated_at, claimed_by, lease_expires_at)
VALUES ($1, $2, $3, $4::jsonb, $5, $6, $7::jsonb, $8::jsonb, $9, $10, $11, $12, $13, $14, $15, $16)`,
[run.id, run.tenantId, run.projectId, JSON.stringify(run.workspaceRef), run.providerId, run.backendProfile, JSON.stringify(run.executionPolicy), JSON.stringify(run.traceSink), run.status, run.terminalStatus, run.failureKind, run.failureMessage, run.createdAt, run.updatedAt, run.claimedBy, run.leaseExpiresAt],
);
await this.appendEventWithLockedRun(client, run.id, "backend_status", { phase: "run-created", backendProfile: run.backendProfile });
return run;
});
}
async getRun(runId: string): Promise<RunRecord> {
const result = await this.pool.query("SELECT * FROM agentrun_runs WHERE id = $1", [runId]);
const row = result.rows[0];
if (!row) throw new AgentRunError("schema-invalid", `run ${runId} was not found`, { httpStatus: 404 });
return runFromRow(row);
}
async listEvents(runId: string, afterSeq: number, limit: number): Promise<RunEvent[]> {
await this.getRun(runId);
const result = await this.pool.query("SELECT * FROM agentrun_events WHERE run_id = $1 AND seq > $2 ORDER BY seq ASC LIMIT $3", [runId, afterSeq, clamp(limit, 1, 500)]);
return result.rows.map(eventFromRow);
}
async createCommand(runId: string, input: CreateCommandInput): Promise<CommandRecord> {
const payloadHash = stableHash(input.payload);
return this.withTransaction(async (client) => {
await this.requireRunForUpdate(client, runId);
if (input.idempotencyKey) {
const existing = await client.query("SELECT * FROM agentrun_commands WHERE run_id = $1 AND idempotency_key = $2", [runId, input.idempotencyKey]);
if (existing.rows[0]) {
const command = commandFromRow(existing.rows[0]);
if (command.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "idempotency key reused with different payload", { httpStatus: 409 });
return command;
}
}
const seq = await this.nextSeq(client, "agentrun_commands", runId);
const at = nowIso();
const command: CommandRecord = { ...input, id: newId("cmd"), runId, seq, state: "pending", payloadHash, createdAt: at, updatedAt: at, acknowledgedAt: null };
await client.query(
`INSERT INTO agentrun_commands (id, run_id, seq, type, payload, payload_hash, idempotency_key, state, created_at, updated_at, acknowledged_at)
VALUES ($1, $2, $3, $4, $5::jsonb, $6, $7, $8, $9, $10, $11)`,
[command.id, command.runId, command.seq, command.type, JSON.stringify(command.payload), command.payloadHash, command.idempotencyKey ?? null, command.state, command.createdAt, command.updatedAt, command.acknowledgedAt],
);
await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "command-created", commandId: command.id, commandType: command.type });
return command;
});
}
async getCommand(commandId: string): Promise<CommandRecord> {
const result = await this.pool.query("SELECT * FROM agentrun_commands WHERE id = $1", [commandId]);
const row = result.rows[0];
if (!row) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
return commandFromRow(row);
}
async listCommands(runId: string, afterSeq: number, limit: number): Promise<CommandRecord[]> {
await this.getRun(runId);
const result = await this.pool.query("SELECT * FROM agentrun_commands WHERE run_id = $1 AND seq > $2 ORDER BY seq ASC LIMIT $3", [runId, afterSeq, clamp(limit, 1, 100)]);
return result.rows.map(commandFromRow);
}
async registerRunner(input: Partial<RunnerRecord>): Promise<RunnerRecord> {
const at = nowIso();
const runner: RunnerRecord = { id: input.id ?? newId("runner"), registeredAt: at, heartbeatAt: at, ...input };
const metadata = metadataForRunner(runner);
const result = await this.pool.query(
`INSERT INTO agentrun_runners (id, run_id, attempt_id, backend_profile, placement, source_commit, metadata, registered_at, heartbeat_at)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, $9)
ON CONFLICT (id) DO UPDATE SET
run_id = EXCLUDED.run_id,
attempt_id = EXCLUDED.attempt_id,
backend_profile = EXCLUDED.backend_profile,
placement = EXCLUDED.placement,
source_commit = EXCLUDED.source_commit,
metadata = EXCLUDED.metadata,
heartbeat_at = EXCLUDED.heartbeat_at
RETURNING *`,
[runner.id, runner.runId ?? null, runner.attemptId ?? null, runner.backendProfile ?? null, runner.placement ?? null, runner.sourceCommit ?? null, JSON.stringify(metadata), runner.registeredAt, runner.heartbeatAt],
);
return runnerFromRow(result.rows[0]);
}
async claimRun(runId: string, runnerId: string, leaseMs: number): Promise<RunRecord> {
return this.withTransaction(async (client) => {
const run = await this.requireRunForUpdate(client, runId);
if (run.claimedBy && run.claimedBy !== runnerId && !isTerminalStatus(run.status)) throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 });
const leaseExpiresAt = new Date(Date.now() + leaseMs).toISOString();
const updated = await client.query(
`UPDATE agentrun_runs SET status = $2, claimed_by = $3, lease_expires_at = $4, updated_at = $5 WHERE id = $1 RETURNING *`,
[runId, "claimed", runnerId, leaseExpiresAt, nowIso()],
);
await client.query(
`INSERT INTO agentrun_leases (run_id, runner_id, lease_expires_at, stale_recovery_marker, updated_at)
VALUES ($1, $2, $3, $4::jsonb, $5)
ON CONFLICT (run_id) DO UPDATE SET runner_id = EXCLUDED.runner_id, lease_expires_at = EXCLUDED.lease_expires_at, updated_at = EXCLUDED.updated_at`,
[runId, runnerId, leaseExpiresAt, null, nowIso()],
);
await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "run-claimed", runnerId });
return runFromRow(updated.rows[0]);
});
}
async heartbeat(runId: string, runnerId: string, leaseMs: number): Promise<RunRecord> {
return this.withTransaction(async (client) => {
const run = await this.requireRunForUpdate(client, runId);
if (run.claimedBy !== runnerId) throw new AgentRunError("runner-lease-conflict", `run ${runId} is not claimed by ${runnerId}`, { httpStatus: 409 });
const leaseExpiresAt = new Date(Date.now() + leaseMs).toISOString();
const updated = await client.query("UPDATE agentrun_runs SET lease_expires_at = $2, updated_at = $3 WHERE id = $1 RETURNING *", [runId, leaseExpiresAt, nowIso()]);
await client.query("UPDATE agentrun_runners SET heartbeat_at = $2 WHERE id = $1", [runnerId, nowIso()]);
await client.query("UPDATE agentrun_leases SET lease_expires_at = $2, updated_at = $3 WHERE run_id = $1", [runId, leaseExpiresAt, nowIso()]);
return runFromRow(updated.rows[0]);
});
}
async ackCommand(commandId: string): Promise<CommandRecord> {
const result = await this.pool.query("UPDATE agentrun_commands SET state = $2, acknowledged_at = $3, updated_at = $3 WHERE id = $1 RETURNING *", [commandId, "acknowledged", nowIso()]);
const row = result.rows[0];
if (!row) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
return commandFromRow(row);
}
async appendEvent(runId: string, type: EventType, payload: JsonRecord): Promise<RunEvent> {
return this.withTransaction(async (client) => {
await this.requireRunForUpdate(client, runId);
return this.appendEventWithLockedRun(client, runId, type, payload);
});
}
async finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage">): Promise<RunRecord> {
return this.withTransaction(async (client) => {
await this.requireRunForUpdate(client, runId);
const status = statusFromTerminal(result.terminalStatus);
const updated = await client.query(
`UPDATE agentrun_runs SET status = $2, terminal_status = $3, failure_kind = $4, failure_message = $5, updated_at = $6 WHERE id = $1 RETURNING *`,
[runId, status, result.terminalStatus, result.failureKind, result.failureMessage, nowIso()],
);
await this.appendEventWithLockedRun(client, runId, "terminal_status", { terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage });
return runFromRow(updated.rows[0]);
});
}
async backends(): Promise<JsonRecord[]> {
const result = await this.pool.query("SELECT * FROM agentrun_backends ORDER BY profile ASC");
return result.rows.map((row) => ({ profile: stringValue(row.profile), ...jsonRecord(row.capabilities), capacity: jsonValue(row.capacity), health: jsonValue(row.health), updatedAt: nullableIso(row.updated_at) }));
}
async close(): Promise<void> {
await this.pool.end();
}
private async appendEventWithLockedRun(client: PoolClient, runId: string, type: EventType, payload: JsonRecord): Promise<RunEvent> {
const seq = await this.nextSeq(client, "agentrun_events", runId);
const event: RunEvent = { id: newId("evt"), runId, seq, type, payload: redactJson(payload), createdAt: nowIso() };
await client.query("INSERT INTO agentrun_events (id, run_id, seq, type, payload, created_at) VALUES ($1, $2, $3, $4, $5::jsonb, $6)", [event.id, event.runId, event.seq, event.type, JSON.stringify(event.payload), event.createdAt]);
return event;
}
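// Callers must already hold the run row lock (or have inserted the run in this transaction) so that MAX(seq) + 1 cannot race within a run.
private async nextSeq(client: PoolClient, table: "agentrun_commands" | "agentrun_events", runId: string): Promise<number> {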
const result = await client.query<{ seq: number }>(`SELECT COALESCE(MAX(seq), 0) + 1 AS seq FROM ${table} WHERE run_id = $1`, [runId]);
return Number(result.rows[0]?.seq ?? 1);
}
private async requireRunForUpdate(client: PoolClient, runId: string): Promise<RunRecord> {
const result = await client.query("SELECT * FROM agentrun_runs WHERE id = $1 FOR UPDATE", [runId]);
const row = result.rows[0];
if (!row) throw new AgentRunError("schema-invalid", `run ${runId} was not found`, { httpStatus: 404 });
return runFromRow(row);
}
private async withTransaction<T>(fn: (client: PoolClient) => Promise<T>): Promise<T> {
const client = await this.pool.connect();
try {
await client.query("BEGIN");
const result = await fn(client);
await client.query("COMMIT");
return result;
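} catch (error) {
// Roll back best-effort: a failing ROLLBACK (for example on a dropped connection) must not mask the original error.
await client.query("ROLLBACK").catch(() => undefined);
throw error;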
} finally {
client.release();
}
}
}
function checksumSql(sql: string): string {
return createHash("sha256").update(sql.trim()).digest("hex");
}
function latestMigrationId(): string {
return postgresMigrations[postgresMigrations.length - 1]?.id ?? "none";
}
function clamp(value: number, min: number, max: number): number {
return Math.max(min, Math.min(value, max));
}
function runFromRow(row: QueryResultRow): RunRecord {
return {
id: stringValue(row.id),
tenantId: stringValue(row.tenant_id),
projectId: stringValue(row.project_id),
workspaceRef: jsonRecord(row.workspace_ref) as RunRecord["workspaceRef"],
providerId: stringValue(row.provider_id),
backendProfile: stringValue(row.backend_profile) as BackendProfile,
executionPolicy: jsonRecord(row.execution_policy) as RunRecord["executionPolicy"],
traceSink: jsonValue(row.trace_sink),
status: stringValue(row.status) as RunStatus,
terminalStatus: nullableString(row.terminal_status) as TerminalStatus | null,
failureKind: nullableString(row.failure_kind) as FailureKind | null,
failureMessage: nullableString(row.failure_message),
createdAt: iso(row.created_at),
updatedAt: iso(row.updated_at),
claimedBy: nullableString(row.claimed_by),
leaseExpiresAt: nullableIso(row.lease_expires_at),
};
}
function commandFromRow(row: QueryResultRow): CommandRecord {
return {
id: stringValue(row.id),
runId: stringValue(row.run_id),
seq: Number(row.seq),
type: stringValue(row.type) as CommandRecord["type"],
payload: jsonRecord(row.payload),
payloadHash: stringValue(row.payload_hash),
...(nullableString(row.idempotency_key) ? { idempotencyKey: stringValue(row.idempotency_key) } : {}),
state: stringValue(row.state) as CommandState,
createdAt: iso(row.created_at),
updatedAt: iso(row.updated_at),
acknowledgedAt: nullableIso(row.acknowledged_at),
};
}
function eventFromRow(row: QueryResultRow): RunEvent {
return { id: stringValue(row.id), runId: stringValue(row.run_id), seq: Number(row.seq), type: stringValue(row.type) as EventType, payload: jsonRecord(row.payload), createdAt: iso(row.created_at) };
}
function runnerFromRow(row: QueryResultRow): RunnerRecord {
return {
...jsonRecord(row.metadata),
id: stringValue(row.id),
...(nullableString(row.run_id) ? { runId: stringValue(row.run_id) } : {}),
...(nullableString(row.attempt_id) ? { attemptId: stringValue(row.attempt_id) } : {}),
...(nullableString(row.backend_profile) ? { backendProfile: stringValue(row.backend_profile) as BackendProfile } : {}),
...(nullableString(row.placement) ? { placement: stringValue(row.placement) } : {}),
...(nullableString(row.source_commit) ? { sourceCommit: stringValue(row.source_commit) } : {}),
registeredAt: iso(row.registered_at),
heartbeatAt: iso(row.heartbeat_at),
};
}
function metadataForRunner(runner: RunnerRecord): JsonRecord {
const { id: _id, runId: _runId, attemptId: _attemptId, backendProfile: _backendProfile, placement: _placement, sourceCommit: _sourceCommit, registeredAt: _registeredAt, heartbeatAt: _heartbeatAt, ...metadata } = runner;
return redactJson(metadata);
}
function isTerminalStatus(status: RunRecord["status"]): boolean {
return status === "completed" || status === "failed" || status === "cancelled";
}
function stringValue(value: unknown): string {
return typeof value === "string" ? value : String(value ?? "");
}
function nullableString(value: unknown): string | null {
return value === null || value === undefined ? null : stringValue(value);
}
function jsonValue(value: unknown): JsonValue {
if (value === undefined) return null;
return value as JsonValue;
}
function jsonRecord(value: unknown): JsonRecord {
return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : {};
}
function iso(value: unknown): string {
if (value instanceof Date) return value.toISOString();
if (typeof value === "string") return new Date(value).toISOString();
return new Date(String(value)).toISOString();
}
function nullableIso(value: unknown): string | null {
return value === null || value === undefined ? null : iso(value);
}
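The idempotency handling in `createCommand` above can be sketched without a database. This is a minimal in-memory illustration, not the store's implementation: `IdempotentCommandLog`, `StoredCommand`, and `hashPayload` are hypothetical names, and `hashPayload` is only a key-sorted serialization standing in for `stableHash`, whose real definition is not shown here.

```typescript
// Hypothetical in-memory stand-in for the idempotency check in createCommand.
interface StoredCommand {
  id: string;
  seq: number;
  payloadHash: string;
  idempotencyKey?: string;
}

// Assumption: a key-sorted serialization stands in for the real stableHash helper.
function hashPayload(value: unknown): string {
  if (value === undefined) return "null";
  if (Array.isArray(value)) return `[${value.map(hashPayload).join(",")}]`;
  if (typeof value === "object" && value !== null) {
    const record = value as Record<string, unknown>;
    const keys = Object.keys(record).sort();
    return `{${keys.map((key) => `${JSON.stringify(key)}:${hashPayload(record[key])}`).join(",")}}`;
  }
  return JSON.stringify(value);
}

class IdempotentCommandLog {
  private readonly commands: StoredCommand[] = [];

  create(payload: Record<string, unknown>, idempotencyKey?: string): StoredCommand {
    const payloadHash = hashPayload(payload);
    if (idempotencyKey) {
      for (const existing of this.commands) {
        if (existing.idempotencyKey !== idempotencyKey) continue;
        // Same key with a different payload is a conflict (HTTP 409 in the store above).
        if (existing.payloadHash !== payloadHash) throw new Error("idempotency key reused with different payload");
        // Same key and same payload: replay returns the original command unchanged.
        return existing;
      }
    }
    const seq = this.commands.length + 1;
    const command: StoredCommand = { id: `cmd_${seq}`, seq, payloadHash };
    if (idempotencyKey) command.idempotencyKey = idempotencyKey;
    this.commands.push(command);
    return command;
  }
}
```

A replayed request with the same key and an equivalent payload returns the first command and does not consume a new `seq`. The Postgres version relies on the `FOR UPDATE` run lock to make the lookup and insert atomic, which this single-threaded sketch does not need.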
+24 -15
@@ -2,7 +2,7 @@ import type { Server } from "node:http";
import { createServer } from "node:http";
import type { AddressInfo } from "node:net";
import type { AgentRunStore } from "./store.js";
-import { MemoryAgentRunStore } from "./store.js";
+import { openAgentRunStoreFromEnv } from "./store.js";
import { AgentRunError, errorToJson } from "../common/errors.js";
import { asRecord, validateCreateCommand, validateCreateRun } from "../common/validation.js";
import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, RunEvent } from "../common/types.js";
@@ -21,7 +21,7 @@ export interface StartedManagerServer {
}
export async function startManagerServer(options: ManagerServerOptions = {}): Promise<StartedManagerServer> {
-const store = options.store ?? new MemoryAgentRunStore();
+const store = options.store ?? await openAgentRunStoreFromEnv();
const sourceCommit = options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown";
const server = createServer(async (req, res) => {
const traceId = `trc_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`;
@@ -52,45 +52,54 @@ async function readBody(req: import("node:http").IncomingMessage): Promise<unkno
async function route({ method, url, body, store, sourceCommit }: { method: string; url: URL; body: unknown; store: AgentRunStore; sourceCommit: string }): Promise<JsonValue> {
const path = url.pathname;
if (method === "GET" && (path === "/health" || path === "/health/live" || path === "/health/readiness")) {
-return { serviceId: "agentrun-mgr", live: true, ready: true, database: { adapter: "memory-self-test", migrationReady: true }, sourceCommit, secretRefs: { valuesPrinted: false } };
+const database = await store.health();
+const ready = path === "/health/live" ? true : database.ready;
+return { serviceId: "agentrun-mgr", live: true, ready, database, sourceCommit, secretRefs: { databaseUrl: database.adapter === "postgres" ? "redacted" : "not-used", valuesPrinted: false } };
}
-if (method === "GET" && path === "/api/v1/backends") return { items: store.backends() };
-if (method === "POST" && path === "/api/v1/runs") return store.createRun(validateCreateRun(body)) as unknown as JsonValue;
+if (method === "GET" && path === "/api/v1/backends") return { items: await store.backends() as unknown as JsonValue };
+if (method === "POST" && path === "/api/v1/runs") return await store.createRun(validateCreateRun(body)) as unknown as JsonValue;
const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u);
-if (method === "GET" && runMatch) return store.getRun(runMatch[1] ?? "") as unknown as JsonValue;
+if (method === "GET" && runMatch) return await store.getRun(runMatch[1] ?? "") as unknown as JsonValue;
const eventMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u);
if (method === "GET" && eventMatch) {
const afterSeq = integerQuery(url, "afterSeq", 0);
const limit = integerQuery(url, "limit", 100);
-return { items: store.listEvents(eventMatch[1] ?? "", afterSeq, limit) as unknown as JsonValue };
+return { items: await store.listEvents(eventMatch[1] ?? "", afterSeq, limit) as unknown as JsonValue };
}
const commandCreateMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands$/u);
-if (method === "POST" && commandCreateMatch) return store.createCommand(commandCreateMatch[1] ?? "", validateCreateCommand(body)) as unknown as JsonValue;
-if (method === "GET" && commandCreateMatch) return { items: store.listCommands(commandCreateMatch[1] ?? "", integerQuery(url, "afterSeq", 0), integerQuery(url, "limit", 20)) as unknown as JsonValue };
+if (method === "POST" && commandCreateMatch) return await store.createCommand(commandCreateMatch[1] ?? "", validateCreateCommand(body)) as unknown as JsonValue;
+if (method === "GET" && commandCreateMatch) return { items: await store.listCommands(commandCreateMatch[1] ?? "", integerQuery(url, "afterSeq", 0), integerQuery(url, "limit", 20)) as unknown as JsonValue };
const commandShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)$/u);
-if (method === "GET" && commandShowMatch) return store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue;
-if (method === "POST" && path === "/api/v1/runners/register") return store.registerRunner(asRecord(body ?? {}, "runner")) as unknown as JsonValue;
+if (method === "GET" && commandShowMatch) return await store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue;
+if (method === "POST" && path === "/api/v1/runners/register") return await store.registerRunner(asRecord(body ?? {}, "runner")) as unknown as JsonValue;
const claimMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/claim$/u);
if (method === "POST" && claimMatch) {
const record = asRecord(body, "claim");
const runnerId = typeof record.runnerId === "string" ? record.runnerId : "";
if (runnerId.length === 0) throw new AgentRunError("schema-invalid", "runnerId is required", { httpStatus: 400 });
-return store.claimRun(claimMatch[1] ?? "", runnerId, numberField(record, "leaseMs", 60_000)) as unknown as JsonValue;
+return await store.claimRun(claimMatch[1] ?? "", runnerId, numberField(record, "leaseMs", 60_000)) as unknown as JsonValue;
}
+const leaseMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/lease$/u);
+if (method === "PATCH" && leaseMatch) {
+const record = asRecord(body, "lease");
+const runnerId = typeof record.runnerId === "string" ? record.runnerId : "";
+if (runnerId.length === 0) throw new AgentRunError("schema-invalid", "runnerId is required", { httpStatus: 400 });
+return await store.heartbeat(leaseMatch[1] ?? "", runnerId, numberField(record, "leaseMs", 60_000)) as unknown as JsonValue;
+}
const eventsAppendMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u);
if (method === "POST" && eventsAppendMatch) {
const record = asRecord(body, "event");
const type = typeof record.type === "string" ? record.type as RunEvent["type"] : "backend_status";
-return store.appendEvent(eventsAppendMatch[1] ?? "", type, asRecord(record.payload ?? {}, "event.payload")) as unknown as JsonValue;
+return await store.appendEvent(eventsAppendMatch[1] ?? "", type, asRecord(record.payload ?? {}, "event.payload")) as unknown as JsonValue;
}
const statusMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/status$/u);
if (method === "PATCH" && statusMatch) {
const record = asRecord(body, "status");
const terminalStatus = record.terminalStatus === "completed" || record.terminalStatus === "failed" || record.terminalStatus === "blocked" || record.terminalStatus === "cancelled" ? record.terminalStatus : "failed";
-return store.finishRun(statusMatch[1] ?? "", { terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null, failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null }) as unknown as JsonValue;
+return await store.finishRun(statusMatch[1] ?? "", { terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null, failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null }) as unknown as JsonValue;
}
const ackMatch = path.match(/^\/api\/v1\/commands\/([^/]+)\/ack$/u);
-if (method === "POST" && ackMatch) return store.ackCommand(ackMatch[1] ?? "") as unknown as JsonValue;
+if (method === "POST" && ackMatch) return await store.ackCommand(ackMatch[1] ?? "") as unknown as JsonValue;
throw new AgentRunError("schema-invalid", `unsupported route ${method} ${path}`, { httpStatus: 404 });
}
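The claim and lease routes above delegate to `claimRun` and `heartbeat`, both of which enforce a single-owner rule on `claimedBy`. The following is a minimal sketch of that rule as pure functions, assuming hypothetical `LeaseState`, `claim`, and `renew` names and an explicit `now` argument in milliseconds for testability. Like the store code shown in this commit, it does not release a claim merely because the lease timestamp has passed.

```typescript
// Hypothetical pure-function model of the claim/heartbeat ownership rule.
type RunStatusSketch = "pending" | "claimed" | "completed" | "failed" | "cancelled";

interface LeaseState {
  status: RunStatusSketch;
  claimedBy: string | null;
  leaseExpiresAt: number | null; // epoch milliseconds in this sketch
}

function isTerminal(status: RunStatusSketch): boolean {
  return status === "completed" || status === "failed" || status === "cancelled";
}

// A different runner may not claim a non-terminal run that already has an owner.
function claim(state: LeaseState, runnerId: string, leaseMs: number, now: number): LeaseState {
  if (state.claimedBy && state.claimedBy !== runnerId && !isTerminal(state.status)) {
    throw new Error("runner-lease-conflict");
  }
  return { status: "claimed", claimedBy: runnerId, leaseExpiresAt: now + leaseMs };
}

// Only the current owner may extend the lease.
function renew(state: LeaseState, runnerId: string, leaseMs: number, now: number): LeaseState {
  if (state.claimedBy !== runnerId) throw new Error("runner-lease-conflict");
  return { ...state, leaseExpiresAt: now + leaseMs };
}
```

A runner would call the equivalent of `claim` once via `POST /api/v1/runs/:runId/claim` and then the equivalent of `renew` periodically via `PATCH /api/v1/runs/:runId/lease`, using the same `runnerId` in both bodies.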
+44 -14
@@ -3,20 +3,46 @@ import { AgentRunError } from "../common/errors.js";
import { newId, nowIso, stableHash } from "../common/validation.js";
import { redactJson } from "../common/redaction.js";
+export type MaybePromise<T> = T | Promise<T>;
+export interface StoreHealth extends JsonRecord {
+adapter: "memory-self-test" | "postgres";
+ready: boolean;
+reachable: boolean;
+migrationReady: boolean;
+migrationId: string | null;
+failureKind: FailureKind | null;
+message: string | null;
+credentialValuesPrinted: false;
+}
export interface AgentRunStore {
-createRun(input: CreateRunInput): RunRecord;
-getRun(runId: string): RunRecord;
-listEvents(runId: string, afterSeq: number, limit: number): RunEvent[];
-createCommand(runId: string, input: CreateCommandInput): CommandRecord;
-getCommand(commandId: string): CommandRecord;
-listCommands(runId: string, afterSeq: number, limit: number): CommandRecord[];
-registerRunner(input: Partial<RunnerRecord>): RunnerRecord;
-claimRun(runId: string, runnerId: string, leaseMs: number): RunRecord;
-heartbeat(runId: string, runnerId: string, leaseMs: number): RunRecord;
-ackCommand(commandId: string): CommandRecord;
-appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): RunEvent;
-finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage">): RunRecord;
-backends(): JsonRecord[];
+health(): MaybePromise<StoreHealth>;
+createRun(input: CreateRunInput): MaybePromise<RunRecord>;
+getRun(runId: string): MaybePromise<RunRecord>;
+listEvents(runId: string, afterSeq: number, limit: number): MaybePromise<RunEvent[]>;
+createCommand(runId: string, input: CreateCommandInput): MaybePromise<CommandRecord>;
+getCommand(commandId: string): MaybePromise<CommandRecord>;
+listCommands(runId: string, afterSeq: number, limit: number): MaybePromise<CommandRecord[]>;
+registerRunner(input: Partial<RunnerRecord>): MaybePromise<RunnerRecord>;
+claimRun(runId: string, runnerId: string, leaseMs: number): MaybePromise<RunRecord>;
+heartbeat(runId: string, runnerId: string, leaseMs: number): MaybePromise<RunRecord>;
+ackCommand(commandId: string): MaybePromise<CommandRecord>;
+appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): MaybePromise<RunEvent>;
+finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage">): MaybePromise<RunRecord>;
+backends(): MaybePromise<JsonRecord[]>;
+close?(): MaybePromise<void>;
}
+export async function openAgentRunStoreFromEnv(env: NodeJS.ProcessEnv = process.env): Promise<AgentRunStore> {
+const databaseUrl = env.DATABASE_URL?.trim();
+if (databaseUrl) {
+const { createPostgresAgentRunStore } = await import("./postgres-store.js");
+return createPostgresAgentRunStore({ connectionString: databaseUrl });
+}
+const storeMode = env.AGENTRUN_STORE ?? env.AGENTRUN_MGR_STORE;
+if (storeMode === "memory") return new MemoryAgentRunStore();
+throw new AgentRunError("infra-failed", "DATABASE_URL is required for agentrun-mgr live runtime; set AGENTRUN_STORE=memory only for explicit self-test/dev mode", { httpStatus: 503, details: { adapter: "postgres", databaseUrl: "missing", memoryFallback: "disabled" } });
+}
export class MemoryAgentRunStore implements AgentRunStore {
@@ -25,6 +51,10 @@ export class MemoryAgentRunStore implements AgentRunStore {
private readonly eventsByRun = new Map<string, RunEvent[]>();
private readonly runners = new Map<string, RunnerRecord>();
+health(): StoreHealth {
+return { adapter: "memory-self-test", ready: true, reachable: true, migrationReady: true, migrationId: "memory-self-test", failureKind: null, message: null, credentialValuesPrinted: false };
+}
createRun(input: CreateRunInput): RunRecord {
const at = nowIso();
const run: RunRecord = { ...input, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null };
@@ -130,7 +160,7 @@ export class MemoryAgentRunStore implements AgentRunStore {
}
}
-function statusFromTerminal(terminalStatus: TerminalStatus): RunRecord["status"] {
+export function statusFromTerminal(terminalStatus: TerminalStatus): RunRecord["status"] {
if (terminalStatus === "completed") return "completed";
if (terminalStatus === "cancelled") return "cancelled";
if (terminalStatus === "blocked") return "blocked";
+191
@@ -0,0 +1,191 @@
import { stableHash } from "../common/validation.js";
import type { BackendProfile, ExecutionPolicy, JsonRecord, JsonValue, RunRecord, SecretRef } from "../common/types.js";
export interface RunnerJobRenderOptions {
run: RunRecord;
commandId: string;
managerUrl: string;
image: string;
namespace?: string;
attemptId?: string;
runnerId?: string;
sourceCommit?: string;
serviceAccountName?: string;
imagePullPolicy?: string;
backoffLimit?: number;
ttlSecondsAfterFinished?: number;
}
interface CredentialProjection {
profile: BackendProfile | string;
secretRef: SecretRef;
volumeName: string;
mountPath: string;
}
export function renderRunnerJobDryRun(options: RunnerJobRenderOptions): JsonRecord {
const render = renderRunnerJobManifest(options);
return {
dryRun: true,
mutation: false,
action: "render-kubernetes-job",
jobIdentity: {
kind: "Job",
namespace: render.namespace,
name: render.jobName,
serviceAccountName: render.serviceAccountName,
},
runner: {
runId: options.run.id,
commandId: options.commandId,
attemptId: render.attemptId,
runnerId: render.runnerId,
backendProfile: options.run.backendProfile,
managerUrl: options.managerUrl,
sourceCommit: render.sourceCommit,
},
secretRefs: render.secretRefs.map((item) => ({ profile: item.profile, name: item.secretRef.name, namespace: item.secretRef.namespace ?? render.namespace, keys: item.secretRef.keys ?? [], mountPath: item.mountPath, valuesPrinted: false })),
pollCommands: {
run: `bun scripts/agentrun-cli.ts runs show ${options.run.id} --manager-url ${options.managerUrl}`,
events: `bun scripts/agentrun-cli.ts runs events ${options.run.id} --manager-url ${options.managerUrl} --after-seq 0 --limit 100`,
},
warnings: render.warnings,
manifest: render.manifest,
};
}
export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { manifest: JsonRecord; namespace: string; jobName: string; runnerId: string; attemptId: string; sourceCommit: string; serviceAccountName: string; secretRefs: CredentialProjection[]; warnings: string[] } {
const namespace = options.namespace ?? "agentrun-v01";
const attemptId = options.attemptId ?? `attempt_${Date.now().toString(36)}`;
const runnerId = options.runnerId ?? `runner_${shortHash(`${options.run.id}:${attemptId}:${options.commandId}`)}`;
const sourceCommit = options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown";
const serviceAccountName = options.serviceAccountName ?? "agentrun-v01-runner";
const jobName = `agentrun-v01-runner-${shortDnsHash(options.run.id, attemptId)}`;
const secretRefs = credentialProjections(options.run, namespace);
const warnings: string[] = [];
if (secretRefs.length === 0) warnings.push("run executionPolicy.secretScope declares no provider SecretRef; the runner will report secret-unavailable instead of falling back to direct use of external credentials");
const env = runnerEnv(options, { namespace, jobName, runnerId, attemptId, sourceCommit, secretRefs });
const manifest: JsonRecord = {
apiVersion: "batch/v1",
kind: "Job",
metadata: {
name: jobName,
namespace,
labels: labels(options.run, jobName),
annotations: {
"agentrun.pikastech.local/run-id": options.run.id,
"agentrun.pikastech.local/command-id": options.commandId,
"agentrun.pikastech.local/dry-run-render": "true",
},
},
spec: {
backoffLimit: options.backoffLimit ?? 0,
ttlSecondsAfterFinished: options.ttlSecondsAfterFinished ?? 86_400,
template: {
metadata: {
labels: labels(options.run, jobName),
annotations: {
"agentrun.pikastech.local/run-id": options.run.id,
"agentrun.pikastech.local/command-id": options.commandId,
},
},
spec: {
serviceAccountName,
automountServiceAccountToken: false,
restartPolicy: "Never",
containers: [
{
name: "runner",
image: options.image,
imagePullPolicy: options.imagePullPolicy ?? "IfNotPresent",
command: ["bun", "src/runner/main.ts"],
env,
volumeMounts: secretRefs.map((item) => ({ name: item.volumeName, mountPath: item.mountPath, readOnly: true })),
resources: {
requests: { cpu: "250m", memory: "512Mi" },
limits: { cpu: "2", memory: "4Gi" },
},
securityContext: {
allowPrivilegeEscalation: false,
readOnlyRootFilesystem: false,
capabilities: { drop: ["ALL"] },
},
},
],
volumes: secretRefs.map(secretVolume),
},
},
},
};
return { manifest, namespace, jobName, runnerId, attemptId, sourceCommit, serviceAccountName, secretRefs, warnings };
}
function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string; jobName: string; runnerId: string; attemptId: string; sourceCommit: string; secretRefs: CredentialProjection[] }): JsonRecord[] {
const codexMount = context.secretRefs.find((item) => item.profile === "codex")?.mountPath ?? "/home/agentrun/.codex";
return [
{ name: "AGENTRUN_MGR_URL", value: options.managerUrl },
{ name: "AGENTRUN_RUN_ID", value: options.run.id },
{ name: "AGENTRUN_COMMAND_ID", value: options.commandId },
{ name: "AGENTRUN_ATTEMPT_ID", value: context.attemptId },
{ name: "AGENTRUN_RUNNER_ID", value: context.runnerId },
{ name: "AGENTRUN_BACKEND_PROFILE", value: options.run.backendProfile },
{ name: "AGENTRUN_EXECUTION_POLICY_JSON", value: JSON.stringify(options.run.executionPolicy) },
{ name: "AGENTRUN_SOURCE_COMMIT", value: context.sourceCommit },
{ name: "AGENTRUN_RUNTIME_NAMESPACE", value: context.namespace },
{ name: "AGENTRUN_K8S_JOB_NAME", value: context.jobName },
{ name: "AGENTRUN_LOG_PATH", value: "/tmp/agentrun-runner.jsonl" },
{ name: "HOME", value: "/home/agentrun" },
{ name: "CODEX_HOME", value: codexMount },
];
}
function credentialProjections(run: RunRecord, namespace: string): CredentialProjection[] {
const policy: ExecutionPolicy = run.executionPolicy;
const credentials = policy.secretScope.providerCredentials ?? [];
return credentials.map((item, index) => ({
profile: item.profile,
secretRef: item.secretRef.namespace ? item.secretRef : { ...item.secretRef, namespace },
volumeName: sanitizeVolumeName(`${String(item.profile)}-${index}`),
mountPath: normalizeMountPath(item.secretRef.mountPath),
}));
}
function secretVolume(item: CredentialProjection): JsonRecord {
const secret: JsonRecord = {
secretName: item.secretRef.name,
defaultMode: 256, // 256 decimal is octal 0400: owner read-only
};
const keys = item.secretRef.keys ?? [];
if (keys.length > 0) secret.items = keys.map((key) => ({ key, path: key, mode: 256 }));
return { name: item.volumeName, secret };
}
function normalizeMountPath(value: string | undefined): string {
if (!value || value === "~/.codex") return "/home/agentrun/.codex";
if (value.startsWith("~/")) return `/home/agentrun/${value.slice(2)}`;
return value;
}
function labels(run: RunRecord, jobName: string): JsonRecord {
return {
"app.kubernetes.io/name": "agentrun-runner",
"app.kubernetes.io/component": "runner",
"app.kubernetes.io/part-of": "agentrun",
"agentrun.pikastech.local/lane": "v0.1",
"agentrun.pikastech.local/run-hash": shortHash(run.id),
"job-name": jobName,
};
}
function shortDnsHash(...parts: string[]): string {
return shortHash(parts.join(":"));
}
function shortHash(value: JsonValue): string {
return stableHash(value).slice(0, 12);
}
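function sanitizeVolumeName(value: string): string {
// Truncate before trimming so a cut at 40 characters cannot leave a trailing "-", which a DNS-1123 label does not allow.
const sanitized = value.toLowerCase().replace(/[^a-z0-9-]+/gu, "-").slice(0, 40).replace(/^-+|-+$/gu, "");
return sanitized.length > 0 ? sanitized : "credential";
}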
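Review note: the mount-path rule and the secret file mode used by the helpers above can be checked in isolation. The sketch below copies the body of `normalizeMountPath` from this diff; the sample paths are invented for illustration and are not taken from the repository.

```typescript
// Standalone copy of the mount-path rule shown in the diff, for illustration only.
function normalizeMountPath(value: string | undefined): string {
  // Missing value or the literal "~/.codex" falls back to the runner's Codex home.
  if (!value || value === "~/.codex") return "/home/agentrun/.codex";
  // Any other "~/" prefix is rewritten against the fixed runner home directory.
  if (value.startsWith("~/")) return `/home/agentrun/${value.slice(2)}`;
  // Absolute (or otherwise unrecognized) paths pass through unchanged.
  return value;
}

// JSON manifests carry file modes as decimal integers, so octal 0400 is written as 256.
const ownerReadOnly: number = 0o400;

console.log(normalizeMountPath(undefined));
console.log(normalizeMountPath("~/.config/demo")); // hypothetical path
console.log(normalizeMountPath("/var/run/creds")); // hypothetical path
console.log(ownerReadOnly === 256);
```

Note that a relative path without the `~/` prefix is passed through as-is, so callers are expected to supply either a home-relative or an absolute mount path.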
+18 -2
@@ -1,4 +1,6 @@
import { runOnce, type RunnerOnceOptions } from "./run-once.js";
import { AgentRunError, errorToJson } from "../common/errors.js";
import { failureKindFromError } from "./manager-api.js";
const managerUrl = process.env.AGENTRUN_MGR_URL;
const runId = process.env.AGENTRUN_RUN_ID;
@@ -11,9 +13,23 @@ const options: RunnerOnceOptions = {
managerUrl,
runId,
};
if (process.env.AGENTRUN_COMMAND_ID) options.commandId = process.env.AGENTRUN_COMMAND_ID;
if (process.env.AGENTRUN_ATTEMPT_ID) options.attemptId = process.env.AGENTRUN_ATTEMPT_ID;
if (process.env.AGENTRUN_RUNNER_ID) options.runnerId = process.env.AGENTRUN_RUNNER_ID;
if (process.env.AGENTRUN_BACKEND_PROFILE === "codex") options.backendProfile = "codex";
if (process.env.AGENTRUN_K8S_JOB_NAME) options.placement = "kubernetes-job";
if (process.env.AGENTRUN_SOURCE_COMMIT) options.sourceCommit = process.env.AGENTRUN_SOURCE_COMMIT;
if (process.env.AGENTRUN_K8S_JOB_NAME) options.jobName = process.env.AGENTRUN_K8S_JOB_NAME;
if (process.env.HOSTNAME) options.podName = process.env.HOSTNAME;
if (process.env.AGENTRUN_LOG_PATH) options.logPath = process.env.AGENTRUN_LOG_PATH;
if (process.env.AGENTRUN_CODEX_COMMAND) options.codexCommand = process.env.AGENTRUN_CODEX_COMMAND;
if (process.env.AGENTRUN_CODEX_ARGS) options.codexArgs = JSON.parse(process.env.AGENTRUN_CODEX_ARGS) as string[];
if (process.env.CODEX_HOME) options.codexHome = process.env.CODEX_HOME;
const result = await runOnce(options);
console.log(JSON.stringify({ ok: true, data: result }));
try {
const result = await runOnce(options);
console.log(JSON.stringify({ ok: true, data: result }));
} catch (error) {
const failureKind = failureKindFromError(error);
console.log(JSON.stringify({ ok: false, failureKind, message: error instanceof Error ? error.message : String(error), error: errorToJson(error) }));
process.exit(error instanceof AgentRunError && error.httpStatus >= 1 && error.httpStatus <= 255 ? error.httpStatus : 1);
}
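Review note: the exit-code guard in the catch block above only forwards `httpStatus` when it fits in the 1..255 range. The sketch below isolates that rule in a hypothetical helper, `exitCodeFor`, and assumes `httpStatus` carries ordinary HTTP status codes (the definition of `AgentRunError` is not part of this diff).

```typescript
// Hypothetical helper mirroring the ternary passed to process.exit in the diff.
function exitCodeFor(httpStatus: number | undefined): number {
  return httpStatus !== undefined && httpStatus >= 1 && httpStatus <= 255 ? httpStatus : 1;
}

console.log(exitCodeFor(409)); // above 255, so the fallback applies
console.log(exitCodeFor(500)); // above 255, so the fallback applies
console.log(exitCodeFor(undefined)); // not an AgentRunError, so the fallback applies
```

Under that assumption, typical 4xx and 5xx statuses all collapse to exit code 1, so callers that need to tell failures apart would have to read `failureKind` from the JSON line printed on stdout instead of the process exit code.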
+108
@@ -0,0 +1,108 @@
import { ManagerClient } from "../mgr/client.js";
import { AgentRunError } from "../common/errors.js";
import type { BackendEvent, BackendProfile, CommandRecord, FailureKind, JsonRecord, RunRecord, RunnerRecord, TerminalStatus } from "../common/types.js";
export interface RunnerRegistrationInput {
runId: string;
attemptId: string;
backendProfile: BackendProfile;
placement: "host-process" | "kubernetes-job";
sourceCommit: string;
runnerId?: string;
jobName?: string;
podName?: string;
logPath?: string;
}
export interface PollCommandsResult {
items: CommandRecord[];
selected: CommandRecord | null;
}
export interface RunnerFailureReport {
terminalStatus: TerminalStatus;
failureKind: FailureKind;
failureMessage: string;
}
export class RunnerManagerApi {
readonly client: ManagerClient;
constructor(readonly managerUrl: string) {
this.client = new ManagerClient(managerUrl);
}
async register(input: RunnerRegistrationInput): Promise<RunnerRecord> {
const body: JsonRecord = {
runId: input.runId,
attemptId: input.attemptId,
backendProfile: input.backendProfile,
placement: input.placement,
sourceCommit: input.sourceCommit,
};
if (input.runnerId) body.id = input.runnerId;
if (input.jobName) body.jobName = input.jobName;
if (input.podName) body.podName = input.podName;
if (input.logPath) body.logPath = input.logPath;
return await this.client.post("/api/v1/runners/register", body) as RunnerRecord;
}
async claim(runId: string, runnerId: string, leaseMs: number): Promise<RunRecord> {
return await this.client.post(`/api/v1/runs/${encodeURIComponent(runId)}/claim`, { runnerId, leaseMs }) as RunRecord;
}
async heartbeat(runId: string, runnerId: string, leaseMs: number): Promise<RunRecord> {
return await this.client.patch(`/api/v1/runs/${encodeURIComponent(runId)}/lease`, { runnerId, leaseMs }) as RunRecord;
}
async pollCommands(runId: string, options: { afterSeq?: number; limit?: number; commandId?: string }): Promise<PollCommandsResult> {
const afterSeq = options.afterSeq ?? 0;
const limit = options.limit ?? 20;
const response = await this.client.get(`/api/v1/runs/${encodeURIComponent(runId)}/commands?afterSeq=${afterSeq}&limit=${limit}`) as { items?: CommandRecord[] };
const items = Array.isArray(response.items) ? response.items : [];
// Only pending turn commands are eligible; an explicit commandId narrows the match further.
const isPendingTurn = (item: CommandRecord): boolean => item.state === "pending" && item.type === "turn";
const selected = items.find((item) => isPendingTurn(item) && (!options.commandId || item.id === options.commandId)) ?? null;
return { items, selected };
}
async ackCommand(commandId: string): Promise<CommandRecord> {
return await this.client.post(`/api/v1/commands/${encodeURIComponent(commandId)}/ack`, {}) as CommandRecord;
}
async appendEvent(runId: string, event: BackendEvent): Promise<JsonRecord> {
return await this.client.post(`/api/v1/runs/${encodeURIComponent(runId)}/events`, event as unknown as JsonRecord) as JsonRecord;
}
async reportStatus(runId: string, report: { terminalStatus: TerminalStatus; failureKind: FailureKind | null; failureMessage: string | null }): Promise<RunRecord> {
return await this.client.patch(`/api/v1/runs/${encodeURIComponent(runId)}/status`, report as unknown as JsonRecord) as RunRecord;
}
async reportFailure(runId: string, report: RunnerFailureReport): Promise<{ reported: boolean; run: RunRecord | null; reportError: string | null }> {
try {
await this.appendEvent(runId, { type: "error", payload: { failureKind: report.failureKind, message: report.failureMessage, source: "agentrun-runner" } });
const run = await this.reportStatus(runId, report);
return { reported: true, run, reportError: null };
} catch (error) {
return { reported: false, run: null, reportError: errorMessage(error) };
}
}
}
export function failureKindFromError(error: unknown): FailureKind {
if (error instanceof AgentRunError) return error.failureKind;
const message = errorMessage(error).toLowerCase();
if (message.includes("auth") || message.includes("unauthorized") || message.includes("forbidden")) return "provider-auth-failed";
if (message.includes("timeout")) return "backend-timeout";
if (message.includes("lease") || message.includes("claim")) return "runner-lease-conflict";
return "infra-failed";
}
export function terminalStatusForFailure(failureKind: FailureKind): TerminalStatus {
if (failureKind === "cancelled") return "cancelled";
if (failureKind === "secret-unavailable" || failureKind === "tenant-policy-denied" || failureKind === "schema-invalid") return "blocked";
return "failed";
}
export function errorMessage(error: unknown): string {
if (error instanceof Error) return error.message;
return String(error);
}
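Review note: `failureKindFromError` falls back to substring matching when the error is not an `AgentRunError`. The sketch below reproduces only that fallback in a hypothetical function, `classifyMessage`, with a trimmed-down `FailureKind` union; the sample messages are invented to show how the check order affects the result.

```typescript
// Reduced union for illustration; the real FailureKind type has more members.
type FailureKind = "provider-auth-failed" | "backend-timeout" | "runner-lease-conflict" | "infra-failed";

// Hypothetical stand-in for the message-based branch of failureKindFromError.
function classifyMessage(message: string): FailureKind {
  const text = message.toLowerCase();
  if (text.includes("auth") || text.includes("unauthorized") || text.includes("forbidden")) return "provider-auth-failed";
  if (text.includes("timeout")) return "backend-timeout";
  if (text.includes("lease") || text.includes("claim")) return "runner-lease-conflict";
  return "infra-failed";
}

console.log(classifyMessage("401 Unauthorized"));
console.log(classifyMessage("socket timeout"));
console.log(classifyMessage("lease already held"));
console.log(classifyMessage("connection refused"));
// The "auth" check runs first and matches inside "author", so this is not a timeout.
console.log(classifyMessage("author lookup timeout"));
```

Because the checks are ordered and purely textual, an unrelated message containing "auth" is classified as an authentication failure. Throwing `AgentRunError` with an explicit `failureKind` avoids relying on this heuristic.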
+41 -19
@@ -1,35 +1,57 @@
import { ManagerClient } from "../mgr/client.js";
import { RunnerManagerApi, failureKindFromError, terminalStatusForFailure, errorMessage } from "./manager-api.js";
import { runBackendTurn, type BackendAdapterOptions } from "../backend/adapter.js";
import type { CommandRecord, JsonRecord, RunRecord, RunnerRecord } from "../common/types.js";
import type { BackendProfile, JsonRecord, RunRecord, RunnerRecord } from "../common/types.js";
export interface RunnerOnceOptions extends BackendAdapterOptions {
managerUrl: string;
runId: string;
commandId?: string;
runnerId?: string;
attemptId?: string;
leaseMs?: number;
backendProfile?: BackendProfile;
placement?: "host-process" | "kubernetes-job";
sourceCommit?: string;
jobName?: string;
podName?: string;
logPath?: string;
}
export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
const client = new ManagerClient(options.managerUrl);
const runner = await client.post("/api/v1/runners/register", {
id: options.runnerId ?? undefined,
const api = new RunnerManagerApi(options.managerUrl);
const leaseMs = options.leaseMs ?? 60_000;
const attemptId = options.attemptId ?? `attempt_${Date.now().toString(36)}`;
const runner = await api.register({
runId: options.runId,
attemptId: options.attemptId ?? `attempt_${Date.now().toString(36)}`,
backendProfile: "codex",
placement: "host-process",
sourceCommit: process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown",
} as JsonRecord) as unknown as RunnerRecord;
const claimed = await client.post(`/api/v1/runs/${options.runId}/claim`, { runnerId: runner.id, leaseMs: options.leaseMs ?? 60_000 }) as unknown as RunRecord;
const commandsResponse = await client.get(`/api/v1/runs/${options.runId}/commands?afterSeq=0&limit=20`) as { items?: CommandRecord[] };
const command = commandsResponse.items?.find((item) => item.state === "pending" && item.type === "turn");
if (!command) {
await client.patch(`/api/v1/runs/${options.runId}/status`, { terminalStatus: "blocked", failureKind: "schema-invalid", failureMessage: "no pending turn command" });
return { runner, claimed, terminalStatus: "blocked", failureKind: "schema-invalid" };
attemptId,
backendProfile: options.backendProfile ?? "codex",
placement: options.placement ?? "host-process",
sourceCommit: options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown",
...(options.runnerId ? { runnerId: options.runnerId } : {}),
...(options.jobName ? { jobName: options.jobName } : {}),
...(options.podName ? { podName: options.podName } : {}),
...(options.logPath ? { logPath: options.logPath } : {}),
}) as RunnerRecord;
let claimed: RunRecord;
try {
claimed = await api.claim(options.runId, runner.id, leaseMs);
await api.heartbeat(options.runId, runner.id, leaseMs);
} catch (error) {
const failureKind = failureKindFromError(error);
if (failureKind !== "runner-lease-conflict") {
await api.reportFailure(options.runId, { terminalStatus: terminalStatusForFailure(failureKind), failureKind, failureMessage: errorMessage(error) });
}
throw error;
}
await client.post(`/api/v1/commands/${command.id}/ack`, {});
const commandsResponse = await api.pollCommands(options.runId, { afterSeq: 0, limit: 20, ...(options.commandId ? { commandId: options.commandId } : {}) });
const command = commandsResponse.selected;
if (!command) {
await api.reportStatus(options.runId, { terminalStatus: "blocked", failureKind: "schema-invalid", failureMessage: "no pending turn command" });
return { runner, claimed, terminalStatus: "blocked", failureKind: "schema-invalid", polledCommands: commandsResponse.items.length };
}
await api.ackCommand(command.id);
const result = await runBackendTurn(claimed, command, options);
for (const event of result.events) await client.post(`/api/v1/runs/${options.runId}/events`, event as unknown as JsonRecord);
const finalRun = await client.patch(`/api/v1/runs/${options.runId}/status`, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage }) as unknown as RunRecord;
for (const event of result.events) await api.appendEvent(options.runId, event);
const finalRun = await api.reportStatus(options.runId, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage }) as RunRecord;
return { runner, commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind, run: finalRun } as JsonRecord;
}
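Review note: `runOnce` depends on `pollCommands` picking a single command to execute. The sketch below shows that selection rule on plain objects. `Command` is a reduced stand-in for `CommandRecord`, `selectCommand` is a hypothetical helper, and the `"acked"` state name is an assumption used only to represent "not pending".

```typescript
// Reduced stand-in for CommandRecord; only the fields the selection rule reads.
interface Command {
  id: string;
  state: string;
  type: string;
}

// Picks the first pending turn command, optionally pinned to a specific id.
function selectCommand(items: Command[], commandId?: string): Command | null {
  const isPendingTurn = (item: Command): boolean => item.state === "pending" && item.type === "turn";
  return items.find((item) => isPendingTurn(item) && (!commandId || item.id === commandId)) ?? null;
}

const items: Command[] = [
  { id: "cmd_1", state: "acked", type: "turn" }, // assumed non-pending state name
  { id: "cmd_2", state: "pending", type: "turn" },
  { id: "cmd_3", state: "pending", type: "turn" },
];

console.log(selectCommand(items)?.id); // first pending turn in list order
console.log(selectCommand(items, "cmd_3")?.id); // pinned to a pending command
console.log(selectCommand(items, "cmd_1")); // pinned command is not pending
```

When a pinned command is no longer pending, the selection yields `null` rather than falling back to another command, which is the case `runOnce` reports as `blocked` with `schema-invalid`.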
@@ -0,0 +1,24 @@
import assert from "node:assert/strict";
import { openAgentRunStoreFromEnv } from "../../mgr/store.js";
import { postgresMigrationContract } from "../../mgr/postgres-store.js";
import { redactText } from "../../common/redaction.js";
import { AgentRunError } from "../../common/errors.js";
import type { SelfTestCase } from "../harness.js";
const selfTest: SelfTestCase = async () => {
assert.equal(redactText("Authorization: Bearer abc123"), "Authorization: Bearer REDACTED");
assert.equal(redactText('{"token":"test-token-material"}').includes("test-token-material"), false);
await assert.rejects(
() => openAgentRunStoreFromEnv({}),
(error) => error instanceof AgentRunError && error.failureKind === "infra-failed" && error.message.includes("DATABASE_URL is required"),
);
const postgresContract = postgresMigrationContract();
assert.equal(postgresContract.latestMigrationId, "001_v01_initial_durable_store");
assert.ok(Array.isArray(postgresContract.requiredTables));
assert.ok(postgresContract.requiredTables.includes("agentrun_schema_migrations"));
assert.ok(postgresContract.requiredTables.includes("agentrun_runs"));
assert.ok(postgresContract.requiredTables.includes("agentrun_events"));
return { name: "redaction-postgres", tests: ["redaction", "postgres-store-contract"] };
};
export default selfTest;
+23
@@ -0,0 +1,23 @@
import assert from "node:assert/strict";
import { startManagerServer } from "../../mgr/server.js";
import { MemoryAgentRunStore } from "../../mgr/store.js";
import { ManagerClient } from "../../mgr/client.js";
import type { SelfTestCase } from "../harness.js";
const selfTest: SelfTestCase = async () => {
const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test", store: new MemoryAgentRunStore() });
try {
const client = new ManagerClient(server.baseUrl);
const health = await client.get("/health/readiness") as { database?: { adapter?: string; reachable?: boolean; migrationReady?: boolean; failureKind?: string | null }; secretRefs?: { valuesPrinted?: boolean } };
assert.equal(health.database?.adapter, "memory-self-test");
assert.equal(health.database?.reachable, true);
assert.equal(health.database?.migrationReady, true);
assert.equal(health.database?.failureKind, null);
assert.equal(health.secretRefs?.valuesPrinted, false);
return { name: "manager-memory", tests: ["manager-memory-lifecycle"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}
};
export default selfTest;
+32
@@ -0,0 +1,32 @@
import assert from "node:assert/strict";
import { startManagerServer } from "../../mgr/server.js";
import { MemoryAgentRunStore } from "../../mgr/store.js";
import { ManagerClient } from "../../mgr/client.js";
import { renderRunnerJobDryRun } from "../../runner/k8s-job.js";
import type { RunRecord } from "../../common/types.js";
import { assertNoSecretLeak, createRunWithCommand, type SelfTestCase } from "../harness.js";
const selfTest: SelfTestCase = async (context) => {
const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test", store: new MemoryAgentRunStore() });
try {
const client = new ManagerClient(server.baseUrl);
const item = await createRunWithCommand(client, context, "job smoke", "selftest-job-render", 15_000);
const rendered = renderRunnerJobDryRun({
run: await client.get(`/api/v1/runs/${item.runId}`) as RunRecord,
commandId: item.commandId,
managerUrl: server.baseUrl,
image: "127.0.0.1:5000/agentrun/agentrun-mgr@sha256:1111111111111111111111111111111111111111111111111111111111111111",
attemptId: "attempt_selftest",
sourceCommit: "self-test",
});
assert.equal(rendered.dryRun, true);
assert.equal(rendered.mutation, false);
assert.equal((rendered.jobIdentity as { serviceAccountName?: string }).serviceAccountName, "agentrun-v01-runner");
assertNoSecretLeak(rendered);
return { name: "runner-k8s-job", tests: ["runner-k8s-job-dry-run"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}
};
export default selfTest;
+71
@@ -0,0 +1,71 @@
import assert from "node:assert/strict";
import path from "node:path";
import os from "node:os";
import { startManagerServer } from "../../mgr/server.js";
import { MemoryAgentRunStore } from "../../mgr/store.js";
import { ManagerClient } from "../../mgr/client.js";
import { runOnce } from "../../runner/run-once.js";
import type { FailureKind, JsonRecord, TerminalStatus } from "../../common/types.js";
import { assertNoSecretLeak, createRunWithCommand, type SelfTestCase, type SelfTestContext } from "../harness.js";
const selfTest: SelfTestCase = async (context) => {
const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test", store: new MemoryAgentRunStore() });
try {
const client = new ManagerClient(server.baseUrl);
const happy = await createRunWithCommand(client, context, "hello", "selftest-turn", 15_000);
const result = await runOnce({ managerUrl: server.baseUrl, runId: happy.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome } });
assert.equal(result.terminalStatus, "completed");
assert.equal(typeof (result.runner as { id?: unknown }).id, "string");
const events = await client.get(`/api/v1/runs/${happy.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
assert.ok(events.items?.some((event) => event.type === "assistant_message"));
assert.ok(events.items?.some((event) => event.type === "backend_status" && JSON.stringify(event.payload).includes("run-claimed")));
assertNoSecretLeak(events);
const finalRun = await client.get(`/api/v1/runs/${happy.runId}`) as { terminalStatus?: string };
assert.equal(finalRun.terminalStatus, "completed");
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "missing-turn-result", expectedStatus: "failed", expectedFailureKind: "backend-response-invalid" });
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "invalid-json", expectedStatus: "failed", expectedFailureKind: "backend-json-parse-error" });
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "missing-terminal", expectedStatus: "failed", expectedFailureKind: "backend-timeout", timeoutMs: 500 });
await runSpawnFailureCase({ client, managerUrl: server.baseUrl, context });
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-missing-turn-result", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-spawn-failure"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}
};
async function runFailureCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext; mode: string; expectedStatus: TerminalStatus; expectedFailureKind: FailureKind; timeoutMs?: number }): Promise<void> {
const item = await createRunWithCommand(options.client, options.context, `failure ${options.mode}`, `selftest-${options.mode}`, options.timeoutMs ?? 3_000);
const result = await runOnce({
managerUrl: options.managerUrl,
runId: item.runId,
codexCommand: options.context.fakeCodexCommand,
codexArgs: options.context.fakeCodexArgs,
codexHome: options.context.codexHome,
env: { CODEX_HOME: options.context.codexHome, AGENTRUN_FAKE_CODEX_MODE: options.mode },
}) as JsonRecord;
assert.equal(result.terminalStatus, options.expectedStatus, options.mode);
assert.equal(result.failureKind, options.expectedFailureKind, options.mode);
const events = await options.client.get(`/api/v1/runs/${item.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
assert.ok(events.items?.some((event) => event.type === "error"), options.mode);
assertNoSecretLeak(events);
}
async function runSpawnFailureCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
const item = await createRunWithCommand(options.client, options.context, "failure spawn", "selftest-spawn-failure", 3_000);
const result = await runOnce({
managerUrl: options.managerUrl,
runId: item.runId,
codexCommand: path.join(os.tmpdir(), `agentrun-missing-codex-${process.pid}`),
codexArgs: [],
codexHome: options.context.codexHome,
env: { CODEX_HOME: options.context.codexHome },
}) as JsonRecord;
assert.equal(result.terminalStatus, "failed", "spawn failure");
assert.equal(result.failureKind, "backend-spawn-failed", "spawn failure");
const events = await options.client.get(`/api/v1/runs/${item.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
assert.ok(events.items?.some((event) => event.type === "error"), "spawn failure");
assertNoSecretLeak(events);
}
export default selfTest;
+48
@@ -0,0 +1,48 @@
import { mkdir, writeFile } from "node:fs/promises";
import path from "node:path";
import assert from "node:assert/strict";
import { renderCodexProviderSecretPlan } from "../../../scripts/src/secret-render.js";
import { AgentRunError } from "../../common/errors.js";
import type { SelfTestCase } from "../harness.js";
const selfTest: SelfTestCase = async (context) => {
const secretPlan = await renderCodexProviderSecretPlan({ codexHome: context.codexHome, dryRun: true });
assert.equal(secretPlan.namespace, "agentrun-v01");
assert.equal(secretPlan.secretName, "agentrun-v01-provider-codex");
assert.deepEqual(secretPlan.keys, ["auth.json", "config.toml"]);
assert.equal(secretPlan.writeAttempted, false);
assert.equal(secretPlan.totalBytes, Buffer.byteLength(JSON.stringify({ token: "test-token-material" }), "utf8") + Buffer.byteLength("model = \"gpt-test\"\n", "utf8"));
assert.match(String(secretPlan.sha256), /^[a-f0-9]{64}$/u);
const renderedSecretJson = JSON.stringify(secretPlan);
assert.equal(renderedSecretJson.includes("test-token-material"), false);
assert.equal(renderedSecretJson.includes("gpt-test"), false);
assert.equal(renderedSecretJson.includes("model ="), false);
await assert.rejects(
() => renderCodexProviderSecretPlan({ codexHome: path.join(context.tmp, "missing-codex-home"), dryRun: true }),
(error) => error instanceof AgentRunError && error.failureKind === "secret-unavailable",
);
const invalidCodexHome = path.join(context.tmp, "invalid-codex-home");
await mkdir(invalidCodexHome, { recursive: true });
await writeFile(path.join(invalidCodexHome, "auth.json"), "not-json");
await writeFile(path.join(invalidCodexHome, "config.toml"), "model = \"gpt-test\"\n");
await assert.rejects(
() => renderCodexProviderSecretPlan({ codexHome: invalidCodexHome, dryRun: true }),
(error) => error instanceof AgentRunError && error.failureKind === "schema-invalid",
);
await writeFile(path.join(invalidCodexHome, "auth.json"), JSON.stringify({ token: "" }));
await assert.rejects(
() => renderCodexProviderSecretPlan({ codexHome: invalidCodexHome, dryRun: true }),
(error) => error instanceof AgentRunError && error.failureKind === "secret-unavailable",
);
await writeFile(path.join(invalidCodexHome, "auth.json"), JSON.stringify({ token: "test-token-material" }));
await writeFile(path.join(invalidCodexHome, "config.toml"), "model =");
await assert.rejects(
() => renderCodexProviderSecretPlan({ codexHome: invalidCodexHome, dryRun: true }),
(error) => error instanceof AgentRunError && error.failureKind === "schema-invalid",
);
return { name: "secret-render", tests: ["codex-secret-dry-run"] };
};
export default selfTest;
+16
@@ -1,6 +1,7 @@
import * as readline from "node:readline";
const rl = readline.createInterface({ input: process.stdin, crlfDelay: Infinity });
const mode = process.env.AGENTRUN_FAKE_CODEX_MODE ?? "success";
let threadCounter = 0;
let turnCounter = 0;
@@ -9,6 +10,10 @@ for await (const line of rl) {
if (trimmed.length === 0) continue;
const message = JSON.parse(trimmed) as { id?: number; method?: string; params?: Record<string, unknown> };
if (message.method === "initialize") {
if (mode === "invalid-json") {
process.stdout.write('{"token":"test-token-material"\n');
process.exit(0);
}
respond(message.id, { serverInfo: { name: "fake-codex-app-server", version: "self-test" } });
continue;
}
@@ -26,6 +31,17 @@ for await (const line of rl) {
continue;
}
if (message.method === "turn/start") {
if (mode === "missing-turn-result") {
respond(message.id, {});
continue;
}
if (mode === "missing-terminal") {
turnCounter += 1;
const turn = { id: `turn_selftest_${turnCounter}`, status: "running" };
notify("turn/started", { turn });
respond(message.id, { turn });
continue;
}
turnCounter += 1;
const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" };
notify("turn/started", { turn });
+82
@@ -0,0 +1,82 @@
import { mkdtemp, mkdir, writeFile, rm } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import assert from "node:assert/strict";
import { ManagerClient } from "../mgr/client.js";
import type { JsonRecord } from "../common/types.js";
export interface SelfTestContext {
root: string;
tmp: string;
codexHome: string;
workspace: string;
fakeCodexPath: string;
fakeCodexCommand: string;
fakeCodexArgs: string[];
cleanup(): Promise<void>;
}
export interface SelfTestResult {
name?: string;
tests?: string[];
}
export type SelfTestCase = (context: SelfTestContext) => Promise<SelfTestResult> | SelfTestResult;
export async function createSelfTestContext(root: string): Promise<SelfTestContext> {
const tmp = await mkdtemp(path.join(os.tmpdir(), "agentrun-selftest-"));
const codexHome = path.join(tmp, "codex-home");
const workspace = path.join(tmp, "workspace");
await mkdir(codexHome, { recursive: true });
await mkdir(workspace, { recursive: true });
await writeFile(path.join(codexHome, "auth.json"), JSON.stringify({ token: "test-token-material" }));
await writeFile(path.join(codexHome, "config.toml"), "model = \"gpt-test\"\n");
await writeFile(path.join(workspace, "README.md"), "self-test workspace\n");
const fakeCodexPath = path.join(root, "src/selftest/fake-codex-app-server.ts");
return {
root,
tmp,
codexHome,
workspace,
fakeCodexPath,
fakeCodexCommand: process.env.AGENTRUN_SELFTEST_CODEX_COMMAND ?? defaultFakeCommand(),
fakeCodexArgs: process.env.AGENTRUN_SELFTEST_CODEX_ARGS ? JSON.parse(process.env.AGENTRUN_SELFTEST_CODEX_ARGS) as string[] : defaultFakeArgs(fakeCodexPath),
cleanup: () => rm(tmp, { recursive: true, force: true }),
};
}
export async function createRunWithCommand(client: ManagerClient, context: Pick<SelfTestContext, "workspace" | "codexHome">, prompt: string, idempotencyKey: string, timeoutMs: number): Promise<{ runId: string; commandId: string }> {
const run = await client.post("/api/v1/runs", {
tenantId: "unidesk",
projectId: "pikasTech/unidesk",
workspaceRef: { kind: "host-path", path: context.workspace },
providerId: "G14",
backendProfile: "codex",
executionPolicy: {
sandbox: "workspace-write",
approval: "never",
timeoutMs,
network: "default",
secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: context.codexHome } }] },
},
traceSink: null,
}) as { id: string };
const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt }, idempotencyKey }) as { id: string };
const duplicate = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt }, idempotencyKey }) as { id: string };
assert.equal(duplicate.id, command.id);
return { runId: run.id, commandId: command.id };
}
export function assertNoSecretLeak(value: unknown): void {
const text = JSON.stringify(value);
assert.equal(text.includes("test-token-material"), false);
assert.equal(text.includes("Bearer test-token"), false);
}
function defaultFakeCommand(): string {
return process.versions.bun ? process.execPath : "npx";
}
function defaultFakeArgs(fakePath: string): string[] {
return process.versions.bun ? [fakePath] : ["tsx", fakePath];
}
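Review note: `assertNoSecretLeak` works by serializing a value and searching for the fixture token strings. The sketch below expresses the same idea as a boolean check. `containsSecretMarker` is a hypothetical name, and the marker list simply repeats the two strings used by the harness.

```typescript
// Fixture strings the self-tests treat as secret material.
const SECRET_MARKERS: string[] = ["test-token-material", "Bearer test-token"];

// Hypothetical boolean variant of the harness check: true when any marker appears in the serialized value.
function containsSecretMarker(value: unknown): boolean {
  // JSON.stringify returns undefined for undefined input, so fall back to an empty string.
  const text = JSON.stringify(value) ?? "";
  return SECRET_MARKERS.some((marker) => text.includes(marker));
}

console.log(containsSecretMarker({ type: "assistant_message", payload: { text: "hello" } }));
console.log(containsSecretMarker({ type: "error", payload: { raw: '{"token":"test-token-material"' } }));
```

This only detects the known fixture strings, so it guards against regressions in the redaction path rather than proving that arbitrary secrets cannot leak.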
+20 -57
@@ -1,65 +1,28 @@
import { mkdtemp, mkdir, writeFile, rm } from "node:fs/promises";
import os from "node:os";
import { readdir } from "node:fs/promises";
import path from "node:path";
import { fileURLToPath } from "node:url";
import assert from "node:assert/strict";
import { startManagerServer } from "../mgr/server.js";
import { ManagerClient } from "../mgr/client.js";
import { runOnce } from "../runner/run-once.js";
import { redactText } from "../common/redaction.js";
import { pathToFileURL } from "node:url";
import { createSelfTestContext, type SelfTestCase, type SelfTestResult } from "./harness.js";
const root = path.resolve(path.dirname(fileURLToPath(import.meta.url)), "../..");
const tmp = await mkdtemp(path.join(os.tmpdir(), "agentrun-selftest-"));
const root = path.resolve(import.meta.dirname, "../..");
const casesDir = path.join(root, "src/selftest/cases");
const context = await createSelfTestContext(root);
try {
const codexHome = path.join(tmp, "codex-home");
const workspace = path.join(tmp, "workspace");
await mkdir(codexHome, { recursive: true });
await mkdir(workspace, { recursive: true });
await writeFile(path.join(codexHome, "auth.json"), JSON.stringify({ token: "test-token-material" }));
await writeFile(path.join(codexHome, "config.toml"), "model = \"gpt-test\"\n");
await writeFile(path.join(workspace, "README.md"), "self-test workspace\n");
const caseFiles = (await readdir(casesDir))
.filter((file) => file.endsWith(".ts") && !file.endsWith(".d.ts"))
.sort();
const results: SelfTestResult[] = [];
assert.equal(redactText("Authorization: Bearer abc123"), "Authorization: Bearer REDACTED");
const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test" });
try {
const client = new ManagerClient(server.baseUrl);
const health = await client.get("/health/readiness") as { database?: { adapter?: string } };
assert.equal(health.database?.adapter, "memory-self-test");
const run = await client.post("/api/v1/runs", {
tenantId: "unidesk",
projectId: "pikasTech/unidesk",
workspaceRef: { kind: "host-path", path: workspace },
providerId: "G14",
backendProfile: "codex",
executionPolicy: {
sandbox: "workspace-write",
approval: "never",
timeoutMs: 15_000,
network: "default",
secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: codexHome } }] },
},
traceSink: null,
}) as { id: string };
const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello" }, idempotencyKey: "selftest-turn" }) as { id: string };
const duplicate = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello" }, idempotencyKey: "selftest-turn" }) as { id: string };
assert.equal(duplicate.id, command.id);
const fakePath = path.join(root, "src/selftest/fake-codex-app-server.ts");
const fakeCommand = process.env.AGENTRUN_SELFTEST_CODEX_COMMAND ?? process.execPath;
const fakeArgs = process.env.AGENTRUN_SELFTEST_CODEX_ARGS ? JSON.parse(process.env.AGENTRUN_SELFTEST_CODEX_ARGS) as string[] : [fakePath];
const result = await runOnce({ managerUrl: server.baseUrl, runId: run.id, codexCommand: fakeCommand, codexArgs: fakeArgs, codexHome, env: { CODEX_HOME: codexHome } });
assert.equal(result.terminalStatus, "completed");
const events = await client.get(`/api/v1/runs/${run.id}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
assert.ok(events.items?.some((event) => event.type === "assistant_message"));
assert.equal(JSON.stringify(events).includes("test-token-material"), false);
assert.equal(JSON.stringify(events).includes("Bearer test-token"), false);
const finalRun = await client.get(`/api/v1/runs/${run.id}`) as { terminalStatus?: string };
assert.equal(finalRun.terminalStatus, "completed");
console.log(JSON.stringify({ ok: true, tests: ["manager-memory-lifecycle", "codex-stdio-fake-turn", "redaction"], runId: run.id }));
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
for (const file of caseFiles) {
const moduleUrl = pathToFileURL(path.join(casesDir, file)).href;
const imported = await import(moduleUrl) as { default?: SelfTestCase; selfTest?: SelfTestCase };
const selfTest = imported.default ?? imported.selfTest;
if (typeof selfTest !== "function") throw new Error(`self-test case ${file} must export default or selfTest function`);
const result = await selfTest(context);
results.push({ name: result.name ?? file.replace(/\.ts$/u, ""), tests: result.tests ?? [] });
}
console.log(JSON.stringify({ ok: true, cases: results, tests: results.flatMap((result) => result.tests) }));
} finally {
await rm(tmp, { recursive: true, force: true });
await context.cleanup();
}
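Review note: the new runner discovers cases by listing `src/selftest/cases`, keeping `.ts` files that are not declaration files, and sorting the names. The sketch below isolates that filter in a hypothetical helper, `selectCaseFiles`; the file names are invented for illustration.

```typescript
// Hypothetical helper reproducing the filter-and-sort step from the self-test entry point.
function selectCaseFiles(files: string[]): string[] {
  return files
    .filter((file) => file.endsWith(".ts") && !file.endsWith(".d.ts"))
    .sort();
}

// Invented directory listing: one declaration file and one non-TypeScript file are skipped.
const listing = ["20-runner.ts", "10-manager.ts", "types.d.ts", "README.md"];
console.log(selectCaseFiles(listing));
```

Since `sort()` without a comparator orders strings by UTF-16 code units, execution order follows file names. Numeric prefixes therefore need zero-padding if a specific order matters.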