Merge pull request #32 from pikasTech/docs/hwlab-manual-dispatch-v01

feat: 补齐 HWLAB 手动调度能力
This commit is contained in:
Lyon
2026-06-01 11:43:53 +08:00
committed by GitHub
26 changed files with 1320 additions and 64 deletions
+2
View File
@@ -27,6 +27,7 @@ AgentRun 是面向 UniDesk 与 HWLAB 的共享 Agent 执行基础设施。本仓
- P0: AgentRun 必须按纵向 MVP 开发,不能一开始并行铺开成多服务大重写。
- P0: 先证明最小 `runner + 一个 backend`;再加入 `agentrun-mgr` 的 durable facts 和手动启动 runner;最后再加入自动 scheduler。
- P0: 现有 UniDesk Code Queue 和 HWLAB Code Agent 路径默认不被 AgentRun 替换。AgentRun 先作为新的共享执行面,通过 canary 集成逐步验证。
- P0: `v0.1` 面向 HWLAB v0.2 的下一阶段目标是先通过 `agentrun-mgr` 手动调度 API 提供 canary 服务;HWLAB `hwlab-cloud-api` 负责显式创建 run/command 并启动 runner Job,自动 scheduler 不作为前置条件。
## Critical RESTful MVP Rule
@@ -54,6 +55,7 @@ AgentRun 是面向 UniDesk 与 HWLAB 的共享 Agent 执行基础设施。本仓
- `docs/reference/spec-v01-postgres.md`v0.1 Postgres durable store、schema migration 和 SecretRef 规格。
- `docs/reference/spec-v01-secret-distribution.md`v0.1 Code Agent provider credential 和运行时 Secret 分发规格。
- `docs/reference/spec-v01-runtime-assembly.md`v0.1 runner/backend 启动前的四要素 RuntimeAssembly 装配模型,覆盖 BackendImageRef、ProfileRef、SessionRef 和 Git-only ResourceBundleRef。
- `docs/reference/spec-v01-hwlab-manual-dispatch.md`:v0.1 通过手动调度 API 为 HWLAB v0.2 提供 canary Code Agent 服务的目标、缺口和增强计划。
- `docs/reference/spec-v01-validation.md`:v0.1 两层验证模型,自测试允许 mock,综合联调必须 100% 真实。
- `docs/reference/spec-v01-agentrun-mgr.md`v0.1 manager REST API、tenant boundary、runner claim 和 event/status authority。
- `docs/reference/spec-v01-agentrun-runner.md`v0.1 短生命周期 runner、claim/poll/report、日志和 failureKind。
+17 -12
View File
@@ -2,7 +2,7 @@
`v0.1` CLI 的权威规格是 [spec-v01-cli.md](spec-v01-cli.md)。本文保留为 CLI 与服务 API 的辅助参考;如果命令、测试规格或实现状态与 `spec-v01-cli.md` 冲突,以 `spec-v01-cli.md` 为准。
AgentRun CLI 与服务 API 遵循 UniDesk `cli-spec` 原则。本文在 CLI 实现前先固化期望形态,避免实现漂移成长阻塞命令或隐藏状态
AgentRun CLI 与服务 API 遵循 UniDesk `cli-spec` 原则。本文只保留辅助索引,完整命令与测试规格见 [spec-v01-cli.md](spec-v01-cli.md)
## CLI 形态
@@ -17,22 +17,27 @@ CLI 默认输出 JSON。空 stdout 是失败,不是成功。每个命令都必
长操作必须是 fire-and-forget 或短异步资源操作。CLI 调用应在 60 秒内返回。创建 run 或启动 runner 的命令返回创建出的资源和轮询命令,不等待模型 turn 完成。
## 规划命令
## 常用命令
初始命令族:
`v0.1` 常用命令族:
```bash
bun scripts/agentrun-cli.ts runs create --json-file <run.json>
bun scripts/agentrun-cli.ts runs show <runId>
bun scripts/agentrun-cli.ts runs events <runId> --after-seq <n> --limit <n>
bun scripts/agentrun-cli.ts commands create <runId> --type turn --json-file <payload.json>
bun scripts/agentrun-cli.ts commands show <commandId>
bun scripts/agentrun-cli.ts runner start --run-id <runId> --backend <backendProfile>
bun scripts/agentrun-cli.ts backends list
bun scripts/agentrun-cli.ts server start|status|stop|logs
./scripts/agentrun runs create --json-file <run.json>
./scripts/agentrun runs show <runId>
./scripts/agentrun runs events <runId> --after-seq <n> --limit <n>
./scripts/agentrun runs result <runId> [--command-id <commandId>]
./scripts/agentrun runs cancel <runId> [--reason <text>]
./scripts/agentrun commands create <runId> --type turn --json-file <payload.json>
./scripts/agentrun commands show <commandId> --run-id <runId>
./scripts/agentrun commands result <commandId> --run-id <runId>
./scripts/agentrun commands cancel <commandId> [--reason <text>]
./scripts/agentrun runner start --run-id <runId> --backend <backendProfile>
./scripts/agentrun runner job --run-id <runId> --command-id <commandId> [--idempotency-key <key>]
./scripts/agentrun backends list
./scripts/agentrun server start|status
```
具体命令名可以在实现时调整,但行为必须保持以下规则:
行为必须保持以下规则:
- `runs create` 创建 durable facts 并立即返回。
- `runner start` 启动本地进程或 Kubernetes Job,并返回 process/job identity、log path 和 poll commands。
+14
View File
@@ -5,6 +5,7 @@
## 在系统中的职责划分
- 提供 Manager 公共 API:创建和查询 run、提交 command、分页读取 events、查询 backend capability。
- 提供手动调度 API:为已创建的 run/command 显式创建 Kubernetes runner Job,并快速返回 job identity、attempt 和轮询入口。
- 提供 Runner 私有 APIrunner register、claim run、lease heartbeat、poll commands、append events、ack command、上报 status。
- 校验并持久化 `tenantId``projectId``workspaceRef``providerId``backendProfile``executionPolicy``traceSink`
- 执行最小 tenant policy boundary:只做 schema、allowlist、idempotency、secret scope 和 executionPolicy 范围检查;不内建 UniDesk/HWLAB 的业务授权。
@@ -32,11 +33,18 @@ GET /health/readiness
POST /api/v1/runs
GET /api/v1/runs/:runId
GET /api/v1/runs/:runId/events?afterSeq=0&limit=100
GET /api/v1/runs/:runId/result?commandId=<commandId>
POST /api/v1/runs/:runId/cancel
POST /api/v1/runs/:runId/commands
GET /api/v1/runs/:runId/commands/:commandId
GET /api/v1/runs/:runId/commands/:commandId/result
POST /api/v1/runs/:runId/runner-jobs
POST /api/v1/commands/:commandId/cancel
GET /api/v1/backends
```
面向 HWLAB v0.2 canary 的手动调度 API 目标见 [spec-v01-hwlab-manual-dispatch.md](spec-v01-hwlab-manual-dispatch.md)。`runner-jobs` 只显式启动当前 run/command 的 runner Job,不扫描 pending queue,不等待完整模型 turn;自动 scheduler 仍是 deferred 能力。后续 durable cancel API 必须与同一 run/command 状态机衔接,不能让 HWLAB 直接删除 Kubernetes Job 作为正式取消语义。
Runner 私有 API 的 `v0.1` 范围:
```http
@@ -103,13 +111,19 @@ POST /api/v1/commands/:commandId/ack
阅读本文和 [spec-v01-agentrun-runner.md](spec-v01-agentrun-runner.md),然后让两个真实 runner 尝试 claim 同一个 run。确认只有一个 owner 成功,另一个返回 `runner-lease-conflict` 或等价 failureKind;随后分页读取 events,确认 `seq` 单调、不重复、不丢失。
### T5 手动 runner Job 调度 API
阅读本文和 [spec-v01-hwlab-manual-dispatch.md](spec-v01-hwlab-manual-dispatch.md),然后用 RESTful API 创建 `tenantId=hwlab` 的 run、提交 command、调用 `POST /api/v1/runs/:runId/runner-jobs`。确认响应短返回 JSON,包含 `runId``commandId``attemptId``jobName`、namespace、log/pod identity 和后续 poll 入口;重复 idempotency key 不创建重复 job。
## 规格的实现情况
| 规格项 | 状态 | 说明 |
| --- | --- | --- |
| `agentrun-mgr` 服务规格 | 已定义 | 本文为 v0.1 manager 权威。 |
| Manager REST API | 已实现/已通过主闭环 | 已有 run、command、event、backends、runner register、claim、lease heartbeat、poll、ack、status、runner Job 创建和 health/readiness 的 HTTP JSON API;真实 runtime 已通过 RESTful API 主闭环。 |
| 手动 runner Job API | 已实现 | `POST /api/v1/runs/:runId/runner-jobs` 已可创建 Kubernetes runner Job,并固化 idempotency、持久 runner job record、响应 schema 和 cancel 前置检查。 |
| Tenant policy boundary | 已实现最小边界 | v0.1 已做 schema、tenant/backend allowlist、executionPolicy 和 secretScope 结构校验;业务授权仍由 UniDesk/HWLAB 自己判定。 |
| `deepseek` backendProfile allowlist | 已实现/已通过主闭环 | Manager validation、backend capability 和 matching SecretRef 校验已支持 `deepseek`;真实 runtime 已经通过 CI/CD 发布并确认 Postgres migration `002_v01_backend_profiles` 应用。 |
| Postgres durable adapter | 已实现/已通过主闭环 | live runtime 通过 `DATABASE_URL` 使用 Postgres durable storememory store 仅用于显式 self-test/dev。见 [spec-v01-postgres.md](spec-v01-postgres.md)。 |
| Observability 最小合同 | 已实现主路径 | events append-only、terminal status、failureKind、health/readiness store 状态、runner claim/lease/backend events 和 Secret/DSN redaction 已进入 manager;集中 trace 和部署级观测仍属后续工作。 |
| durable cancel API | 已实现最小闭环 | 已提供 run/command cancel APIpending command cancel 阻止新 runner Jobrunning runner 轮询 cancel 并中止 Codex stdio backend,终态使用 `cancelled`。 |
+3 -1
View File
@@ -30,7 +30,7 @@ Runner Secret 只能通过 Kubernetes Secret projection、ServiceAccount/RBAC
Kubernetes Job runner 必须把 credential source 与 runtime home 分开:Secret volume 只读挂在 `/var/run/agentrun/secrets/...``/home/agentrun``emptyDir` 提供可写空间,`CODEX_HOME` 指向当前 run/profile 的 writable runtime home`AGENTRUN_CODEX_SECRET_HOME` 指向当前 `backendProfile` 对应的只读 projection。runner/backend 在启动 provider 前只复制授权文件,不打印内容。`codex``deepseek` profile 不得共享同一个可写 runtime home,除非它们运行在不同的 per-run Kubernetes Job 且该目录由 Job 独占 emptyDir 提供。
RuntimeAssembly P0 中 `SessionRef` 可以显式为 `null`runner 不得把完整 `CODEX_HOME`、Secret projection 或节点 host path 当作 session store。`ResourceBundleRef` P0 收敛为 Git-onlyrunner 后续实现资源 materialization 时只能 checkout 到允许 workspace 前缀,并记录 commit/tree 摘要,不能把用户上传文件或 env dump 混入 Git-only bundle。
RuntimeAssembly P0 中 `SessionRef` 可以显式为 `null`runner 不得把完整 `CODEX_HOME`、Secret projection 或节点 host path 当作 session store。`ResourceBundleRef` P0 收敛为 Git-onlyrunner 已支持把 `repoUrl + full commitId` checkout 到 `AGENTRUN_WORKSPACE_ROOT` 下的隔离目录,并记录 commit/tree 摘要,不能把用户上传文件或 env dump 混入 Git-only bundle。
Kubernetes Job runner 必须设置有限保留时间。`v0.1` 默认 `ttlSecondsAfterFinished=86400`,用于保留最近完成 Job 的调试窗口,同时避免长期堆积 `Completed` runner Job 污染运行面观察。该 TTL 是 Job manifest 的运行面属性,不是 CI/CD 门禁;需要延长保留时间时必须通过受控 Job render/input 显式覆盖,并在 issue 或 PR 中说明原因。
@@ -114,5 +114,7 @@ Runner 日志必须实时 flush 到文件或 pod logCLI 启动 runner 时必
| Kubernetes Job runner | 已实现/已通过主闭环 | `runner job` 通过 manager REST 创建 Kubernetes Job,固定使用 `agentrun-v01-runner` ServiceAccount、manager URL、runId/commandId/attemptId、executionPolicy、SecretRef 文件投影、writable Codex runtime home 和有限 TTL;真实 `agentrun-v01` runner Job 已完成 Codex turn。 |
| host process runner | 已实现 | `runner start``src/runner/main.ts` 进入同一套 `runOnce`,可通过 manager register/claim/poll/report 执行自测试。 |
| claim/lease/report client | 已实现 | 已拆出 runner manager API client,覆盖 register、claim、lease heartbeat、poll command、ack、append event 和 terminal statuslive runtime 通过 manager 写入 Postgres durable store。 |
| cancel observation | 已实现最小闭环 | runner 在 backend 执行期间轮询 run/command cancel,触发 AbortController 中止 Codex stdio backend,并按 `cancelled` 上报 command/run 终态。 |
| SessionRef/ResourceBundleRef 消费 | 已实现最小闭环 | runner 会使用 run 中的 SessionRef threadId 执行 resume,并 materialize Git-only ResourceBundleRef 到隔离 workspace 后再启动 backend。 |
| runner redaction | 已实现主路径 | runner/backend event 和 Job 输出使用 redaction;复杂审计仍按 [spec-v01-validation.md](spec-v01-validation.md) 的人工验收抽查。 |
| `deepseek` profile runner selection | 已实现/已通过主闭环 | Runner Job 和 host runner 已按 run `backendProfile` 选择 matching SecretRef、projection、`CODEX_HOME` 和 backend metadata;真实 Kubernetes Job 已完成 `codex -> deepseek -> codex` 切换联调。 |
+7 -2
View File
@@ -35,10 +35,14 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交
./scripts/agentrun runs create --json-file <run.json>
./scripts/agentrun runs show <runId>
./scripts/agentrun runs events <runId> --after-seq <n> --limit <n>
./scripts/agentrun runs result <runId> [--command-id <commandId>]
./scripts/agentrun runs cancel <runId> [--reason <text>]
./scripts/agentrun commands create <runId> --type turn --json-file <payload.json>
./scripts/agentrun commands show <commandId> --run-id <runId>
./scripts/agentrun commands result <commandId> --run-id <runId>
./scripts/agentrun commands cancel <commandId> [--reason <text>]
./scripts/agentrun runner start --run-id <runId> --backend <backendProfile>
./scripts/agentrun runner job --run-id <runId> --command-id <commandId>
./scripts/agentrun runner job --run-id <runId> --command-id <commandId> [--idempotency-key <key>]
./scripts/agentrun runner job --dry-run --run-id <runId> --command-id <commandId> --image <image>
./scripts/agentrun secrets codex render --dry-run [--profile codex|deepseek] [--codex-home <dir>]
./scripts/agentrun backends list
@@ -91,6 +95,7 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交
| AgentRun CLI 规格 | 已定义 | 本文为 v0.1 CLI 权威。 |
| `scripts/agentrun-cli.ts` | 已实现 | 已提供 run/command/event/backend/server 基础命令和 JSON envelope`scripts/agentrun` 是同一入口的 Bun 定位 launcher。 |
| CLI 调 manager REST | 已实现 | CLI 通过 `ManagerClient` 调 manager REST;自测试可用内存 manager,综合联调必须指向真实 `agentrun-v01` manager。 |
| runner start/job | 已实现 | `runner start` 可执行 host process runner`runner job --dry-run` 可渲染 Kubernetes Job JSON`runner job` 正式路径通过 manager REST 创建 Kubernetes Job 并快速返回 job identity、SecretRef、retention 和轮询命令。 |
| runner start/job | 已实现 | `runner start` 可执行 host process runner`runner job --dry-run` 可渲染 Kubernetes Job JSON`runner job` 正式路径通过 manager REST 创建 Kubernetes Job,支持 `--idempotency-key` 并快速返回 job identity、SecretRef、retention 和轮询命令。 |
| result/cancel CLI | 已实现 | `runs result``commands result``runs cancel``commands cancel` 均调用 manager REST,不维护独立状态。 |
| CLI 测试规格 | 已定义/已验证主闭环 | 综合联调见 [spec-v01-validation.md](spec-v01-validation.md);每次发布仍按手动交互验收复跑。 |
| `deepseek` profile CLI | 已实现/已通过主闭环 | `secrets codex render --profile deepseek``backends list``runner start --backend``runner job` 和 JSON 错误可见性已实现;真实 CLI/RESTful 联调已通过 `codex -> deepseek -> codex` 切换主闭环。 |
@@ -0,0 +1,138 @@
# v0.1 HWLAB 手动调度接入规格
本文定义 AgentRun `v0.1` 面向 HWLAB v0.2 的下一阶段服务目标:不以前置自动 scheduler 为条件,而是通过 `agentrun-mgr` 的手动调度 API 为 HWLAB 提供 canary Code Agent 执行服务。实施跟踪见 [pikasTech/agentrun#31](https://github.com/pikasTech/agentrun/issues/31)。
## 在系统中的职责划分
- HWLAB `hwlab-cloud-api` 是业务 dispatcher:继续负责用户登录、session owner、device-pod 授权、Workbench `/v1/agent/chat` 合同、result/trace/cancel 对外接口和业务权限判断。
- AgentRun `agentrun-mgr` 是执行事实 authority:负责 run、command、event、runner job、backend profile、SecretRef、terminal status 和手动调度 API。
- `agentrun-runner` 是短生命周期执行者:从 manager claim run、poll command、调用 backend adapter、append events、ack command、上报 terminal status。
- HWLAB 不直接写 AgentRun Postgres、不读取 AgentRun Secret、不直接创建 Kubernetes Job;所有跨服务操作只走 AgentRun RESTful API。
- AgentRun 不内建 HWLAB device-pod/gateway 业务授权;涉及硬件、用户授权或 device lease 的判断仍由 HWLAB 自己完成。
## 非目标
- 本阶段不要求自动 scheduler、pending scan、capacity selection 或长驻调度器。
- 不改变 HWLAB 对外 `/v1/agent/chat``/result``/trace``/cancel` 用户合同。
- 不使用 SSE、WebSocket、long-polling 或长同步 `turn` 请求替代 durable resource 模型。
- 不把 HWLAB 的 provider Secret、device token、gateway route 或 kubeconfig 复制给业务客户端。
- 不把 mock、fixture、source-only smoke 或 dry-run 结果当作 HWLAB canary 通过证据。
## 目标调用链
```text
HWLAB Workbench
-> hwlab-cloud-api /v1/agent/chat
-> HWLAB 用户/session/device 权限判断
-> AgentRun POST /api/v1/runs
-> AgentRun POST /api/v1/runs/:runId/commands
-> AgentRun POST /api/v1/runs/:runId/runner-jobs
-> agentrun-runner claim/poll/report
-> AgentRun events/command status/terminal_status
-> hwlab-cloud-api 映射为 HWLAB result/trace
```
HWLAB 应保存 `traceId -> runId/commandId/attemptId/jobName` 映射。用户轮询 HWLAB result/trace 时,HWLAB 从 AgentRun command status 与 run events 读取进展,再转换为 HWLAB 自己的前端 schema;浏览器不直接理解 AgentRun 内部 event schema。
## 手动调度 API
`POST /api/v1/runs/:runId/runner-jobs` 是 HWLAB canary 的正式手动调度入口。它只负责为一个已存在 run 和 command 显式创建 runner Job,不负责扫描 pending queue。
请求最小字段:
| 字段 | 规则 |
| --- | --- |
| `commandId` | 必填,必须属于 `runId`。 |
| `attemptId` | 可选;未提供时由 manager 生成,返回值必须可持久查询。 |
| `idempotencyKey` | HWLAB 必须用 `traceId``messageId` 或等价稳定 key;相同 key 和相同 payload 返回既有 job/attempt。 |
| `image` / `backendImageRef` | 只能来自 manager allowlist、GitOps/catalog 或受控默认值;客户端不能传任意镜像扩大执行面。 |
| `retention` / `ttlSecondsAfterFinished` | 可选;默认遵循 runner Job TTL 规格。 |
响应必须短返回 JSON,不等待完整模型 turn,至少包含:`runId``commandId``attemptId``jobName``namespace``runnerId``logPath``podIdentity`、后续 `commands show``events` 轮询入口。重复提交若 payload 不同,必须结构化失败,不能创建第二个同名业务 attempt。
## Run / Command 映射
HWLAB canary 创建 run 时应使用以下字段口径:
| 字段 | HWLAB canary 口径 |
| --- | --- |
| `tenantId` | `hwlab`。 |
| `projectId` | `pikasTech/HWLAB`。 |
| `providerId` | `G14`,只表示目标 provider,不授予 HWLAB 业务权限。 |
| `backendProfile` | `deepseek``codex`,由 HWLAB 显式选择;缺少 matching SecretRef 必须失败,不 fallback。 |
| `workspaceRef` | 必须引用 ResourceBundleRef 中的 Git-only full commit;不得由 runner 猜 host path。 |
| `executionPolicy` | sandbox、network、timeout、secretScope 必须显式,不得由 HWLAB 扩大 AgentRun Secret 范围。 |
| `traceSink` | 可指向 HWLAB trace adapter;为 `null` 时 HWLAB 仍可通过 AgentRun events 轮询。 |
Command 第一阶段只要求 `type=turn`。用户原始 prompt、conversation metadata、profile 选择和 HWLAB trace correlation 必须作为 command payload 的非敏感字段保存;不得把 cookie、session token、provider credential、device internal token 或 Secret value 写入 payload。
## 需要补齐的能力
### P0 trace/result 元语
AgentRun 标准 events 必须稳定到足以被 HWLAB 转换:
- `backend_status`profile、backendKind、protocol、attempt、resource/session 摘要,不包含 Secret 值。
- `assistant_message`:用户可见 assistant 文本,允许分片但必须能聚合为最终 reply。
- `tool_call`:工具名、状态、bounded 参数摘要和 redacted correlation。
- `command_output`stdout/stderr/diff 的 bounded summary、原始字节数、截断标记和 artifact/log 引用。
- `error``failureKind`、message、retryable、provider/infra/backend 分类和 redacted details。
- `terminal_status``completed``failed``blocked``cancelled`,是 completed 的唯一终态来源。
面向 HWLAB 的 result envelope 至少应能回答:`status``terminalStatus``reply``failureKind``blocker``lastSeq``eventCount``artifactSummary``runId``commandId``attemptId`。partial assistant 文本、transport close、idle timeout 或 stdout 存在都不能单独升级为 `completed`
### P0 cancel
AgentRun 需要提供 durable cancel 能力,建议形态为 `POST /api/v1/runs/:runId/cancel``POST /api/v1/commands/:commandId/cancel`。cancel 必须幂等;已 terminal 的对象返回当前终态。pending command 被 cancel 后不得再创建 runner Job。running runner 必须通过 poll、lease 或 heartbeat 观察 cancel,并传播到 backend interruptbackend 不支持 interrupt 时终止受控进程组。cancel 最终必须写入 event、command state 和 run status`failureKind` 使用 `cancelled`
### P1 SessionRef 持久化
`SessionRef` 需要从 `null/deferred` 升级为可选持久会话引用,支持 HWLAB `conversationId/sessionId/threadId` 到 AgentRun session identity 的映射。session 只能保存 backend thread/session/cache,不保存 API KEY、`auth.json``config.toml` 或完整 `CODEX_HOME`。session store 必须与 Secret projection、writable runtime home、Git workspace 分离。runner 启动时,有 SessionRef 则执行 `thread/resume`,没有则执行 `thread/start`profile 隔离、TTL、GC 和跨 profile 污染防护必须可见。
### P1 ResourceBundleRef / bundle materialization
`ResourceBundleRef` 必须按 Git-only 模型落地:`repoUrl + full commitId` 是唯一内容身份。runner 只能 checkout 到允许 workspace 前缀,不能覆盖 `/app`、Secret projection、profile runtime home 或 session 目录。第一阶段支持 `subdir``sparsePaths``submodules=false``lfs=false``credentialRef` 的最小字段即可。HWLAB canary 只需要 `pikasTech/HWLAB` 固定 full commit 的普通 checkout;用户上传文件和对象存储 artifact 不进入 `v0.1`
## 分阶段增强计划
| 阶段 | 目标 | 主要交付 | 验收重点 |
| --- | --- | --- | --- |
| 1 | 手动调度 API 固化 | `runner-jobs` request/response schema、idempotency、job identity、CLI 调同一 REST API | 重复 key 不重复创建;短返回;manager 重启后可查。 |
| 2 | trace/result 元语 | 标准 event 子集、terminal result envelope、bounded output metadata | HWLAB 可由 events 稳定生成 result/tracepartial 不误报 completed。 |
| 3 | cancel 闭环 | durable cancel API、runner cancel poll、backend interrupt/process group stop | pending/running/terminal 后 cancel 均幂等且可见。 |
| 4 | ResourceBundleRef materialization | Git-only checkout、workspace 前缀、commit/tree 摘要、failureKind | 使用 full commit;不接受 branch/tag/HEAD;不覆盖 Secret/session/runtime home。 |
| 5 | SessionRef 持久化 | session record/store、thread resume、TTL/GC、profile 隔离 | 同一 conversation 连续两轮可复用;不同 profile 不污染。 |
| 6 | HWLAB v0.2 canary | HWLAB dispatcher adapter、traceId 映射、result/trace 转换 | 普通自然语言最短 turn 真实 completed 且 reply 非空;device-pod 仍由 HWLAB 授权。 |
## 测试规格
### T1 手动调度 API
阅读本文和 [spec-v01-agentrun-mgr.md](spec-v01-agentrun-mgr.md),然后在真实 `agentrun-v01` runtime 中用 RESTful API 创建 `tenantId=hwlab` 的 run、提交 `turn` command、调用 `POST /api/v1/runs/:runId/runner-jobs`。确认每个 API 返回 JSON、60 秒内返回、不等待完整 turn,并返回 job identity 与后续 poll 入口。
### T2 幂等和重复提交
阅读本文,然后用相同 `idempotencyKey` 重复调用 runner job API。相同 payload 必须返回同一 attempt/job;不同 payload 必须结构化失败,且不能创建第二个 runner Job。
### T3 trace/result 映射
阅读本文和 [spec-v01-backend-adapter.md](spec-v01-backend-adapter.md),然后执行真实 Codex stdio turn,确认 events 中存在可转换为 HWLAB result/trace 的 `backend_status`、assistant 或 error、`terminal_status`,且 bounded output metadata 足够判断截断和 artifact 引用。
### T4 cancel
阅读本文,然后分别验证 pending cancel、running cancel、重复 cancel 和 terminal 后 cancel。确认 command/run 终态、events 和 failureKind 均为 `cancelled` 或当前既有 terminal 状态,日志不泄露 Secret。
### T5 SessionRef 与 ResourceBundleRef
阅读本文和 [spec-v01-runtime-assembly.md](spec-v01-runtime-assembly.md),然后验证一次带 SessionRef 和 Git-only ResourceBundleRef 的 runner Job。确认 session 不含 credential 文件,bundle 使用 full commit checkout 到允许 workspaceevent/result 能回答 session id、repo、commit 和 checkout 摘要。
## 实现状态
| 能力 | v0.1 状态 | 说明 |
| --- | --- | --- |
| 手动 runner Job API | 已实现 | `runner job` 通过 manager REST 创建 Kubernetes Job,支持 `idempotencyKey`、持久 runner job record、job identity、attempt/runner/jobName 返回和重复 payload 冲突保护。 |
| trace/result 元语 | 已实现最小合同 | 新增 run/command result envelope,聚合 terminal status、reply、failureKind、event cursor、artifact summary、attempt、SessionRef 和 ResourceBundleRef 摘要。 |
| cancel | 已实现最小闭环 | 已提供 run/command cancel APIpending cancel 会阻止新 runner Jobrunning runner 通过轮询触发 backend abort,终态写入 event、command state 和 run status。 |
| SessionRef | 已实现最小持久化 | run 可携带 `sessionRef`manager 保存 session/threadrunner 会按 threadId resumeresult envelope 暴露脱敏 session 摘要;TTL/GC 仍按后续运维策略细化。 |
| ResourceBundleRef | 已实现 Git-only materialization | run 可携带 `repoUrl + full commitId`runner checkout 到 `AGENTRUN_WORKSPACE_ROOT` 下的隔离目录并记录 commit/tree/workspace 摘要;上传文件和对象存储仍不进入 v0.1。 |
| HWLAB v0.2 canary | 待实现 | 需要 HWLAB dispatcher adapter 调 AgentRun 手动调度 API,并转换 result/trace。 |
+5 -3
View File
@@ -4,7 +4,7 @@
## 在系统中的职责划分
- 保存 `agentrun-mgr` 的 durable factsruns、commands、events、runners、backends、leases 和 migration ledger。
- 保存 `agentrun-mgr` 的 durable factsruns、commands、events、runners、runner_jobs、sessions、backends、leases 和 migration ledger。
- 为 runner claim、command ack、event append、heartbeat 和 terminal status 提供事务边界。
-`agentrun-v01` namespace 绑定;不复用未来 `v0.2``v0.3` 或旧 dev/prod 数据库。
- 不向公网开放,不允许普通 runner 直连写入不属于自己的 run facts。
@@ -34,6 +34,8 @@ Secret 名称和 key 可以在实现时按 Kubernetes 命名限制微调,但
- `agentrun_commands`command type、idempotency key、payload hash、state 和 ack timestamps。
- `agentrun_events`:按 run 和 seq 索引的 append-only event records。
- `agentrun_runners`registered runner identity、placement、heartbeat 和 capability snapshot。
- `agentrun_runner_jobs`:手动 runner Job 的 idempotency key、payload hash、attempt/job identity 和创建响应。
- `agentrun_sessions`SessionRef 到 backend thread/cache identity 的最小映射,不保存 credential 文件或 Secret 值。
- `agentrun_backends`backend profile、capabilities、capacity 和 health。
- `agentrun_leases`run ownership、expiry 和 stale recovery marker。
@@ -74,7 +76,7 @@ Secret 名称和 key 可以在实现时按 Kubernetes 命名限制微调,但
| --- | --- | --- |
| Postgres durable store 规格 | 已定义 | 本文为 v0.1 存储权威。 |
| StatefulSet/Service/PVC | 已实现/已通过主闭环 | `agentrun-v01-postgres` StatefulSet、Service 和 PVC 已由 GitOps runtime 提供,作为 `agentrun-v01` durable store。 |
| migration ledger | 已实现/已通过主闭环 | `agentrun-mgr` 启动 Postgres adapter 时幂等创建 `agentrun_schema_migrations` 并记录 migration id/checksum;当前最新 migration 为 `002_v01_backend_profiles`,用于 upsert `codex`/`deepseek` backend capabilityreadiness 必须显示 migration ready。 |
| manager Postgres adapter | 已实现/已通过主闭环 | `agentrun-mgr` 通过 `DATABASE_URL` 启用 Postgres adapter,持久化 runs、commands、events、runners、backends 和 leases;缺少 `DATABASE_URL` 时 live runtime fail fastmemory 只允许显式 self-test/dev。 |
| migration ledger | 已实现/已通过主闭环 | `agentrun-mgr` 启动 Postgres adapter 时幂等创建 `agentrun_schema_migrations` 并记录 migration id/checksum;当前最新 migration 为 `003_v01_hwlab_manual_dispatch`,用于新增 SessionRef、ResourceBundleRef 和 runner job idempotency 持久化readiness 必须显示 migration ready。 |
| manager Postgres adapter | 已实现/已通过主闭环 | `agentrun-mgr` 通过 `DATABASE_URL` 启用 Postgres adapter,持久化 runs、commands、events、runners、runner_jobs、sessions、backends 和 leases;缺少 `DATABASE_URL` 时 live runtime fail fastmemory 只允许显式 self-test/dev。 |
| health/readiness store 状态 | 已实现 | health/readiness 返回 adapter、reachable、migrationReady、migrationId、failureKind 和 redacted Secret 状态,不输出 DSN 明文。 |
| file/sqlite durable store | 不采用 | 只可用于临时本地测试,不作为 v0.1 runtime truth。 |
+6 -3
View File
@@ -53,9 +53,11 @@ P0 最小 JSON 形态:
### SessionRef
- P0 允许 `sessionRef=null`,表示不持久化 backend session 文件。
- 面向 HWLAB 手动调度 canarySessionRef 是 P1 必补能力:需要支持 `conversationId/sessionId/threadId` 到 backend session identity 的稳定映射。
- 一旦启用 session,必须只保存 backend session/cache,不保存 API KEY、`auth.json``config.toml` 或完整 `CODEX_HOME`
- session 文件目录必须和 profile credential、Git workspace 分开。
- v0.1 先定义边界,不要求实现 PVC/session restore
- runner 启动时,有 SessionRef 则执行 `thread/resume`,没有 SessionRef 则执行 `thread/start`;profile 切换不得复用另一 profile 的 session
- v0.1 先定义边界;持久 session store、TTL、GC 和 resume 验收按 [spec-v01-hwlab-manual-dispatch.md](spec-v01-hwlab-manual-dispatch.md) 分阶段推进。
### ResourceBundleRef
@@ -64,6 +66,7 @@ P0 最小 JSON 形态:
- 可选扩展只允许 `subdir``sparsePaths``submodules=false``lfs=false``credentialRef`;默认不启用。
- `credentialRef` 只用于拉取私有 Git repo,不等同于 backend API KEY。
- 不支持上传文件、对象存储 artifact、任意 ConfigMap 文件袋或 inline env;后续需要时另写版本规格。
- 面向 HWLAB 手动调度 canaryrunner materialization 必须把 Git-only bundle checkout 到允许 workspace 前缀,并在 event/result 中记录 repo、full commit、checkout path 和 tree 摘要;不得隐式使用 manager Pod、host path 或镜像内旧代码。
## 最简装配顺序
@@ -119,5 +122,5 @@ P0 最小 JSON 形态:
| --- | --- | --- |
| `BackendImageRef` | 部分实现 | CI/CD 已使用 digest-pinned runtime image;当前 runner/backend 仍复用 agentrun 镜像。 |
| `ProfileRef` | 已实现/已通过主闭环 | `codex``deepseek` 已通过 SecretRef、writable runtime home 和真实 stdio turn 验证。 |
| `SessionRef` | 规格已定义/P0 未实现 | 当前只持久化 run facts,不持久化 backend session 文件。 |
| `ResourceBundleRef` | 规格已定义/P0 Git-only/待实现 | 已明确最简 Git-only 模型,runner materialization 后续实现。 |
| `SessionRef` | 已实现最小持久化 | manager 持久化 `sessionId/conversationId/threadId`run 创建会解析既有 sessionrunner 按 threadId resumesession 不保存 credential 文件,TTL/GC 后续细化。 |
| `ResourceBundleRef` | 已实现 Git-only materialization | `repoUrl + full commitId` 已进入 run schema 和 runner checkoutworkspace 受 `AGENTRUN_WORKSPACE_ROOT` 限制,event/result 记录 commit/tree/workspace 摘要。 |
+12 -3
View File
@@ -15,6 +15,12 @@ AgentRun 是面向 UniDesk 与 HWLAB 的共享 Code Agent 执行基础设施。`
- RuntimeAssembly 是 runner/backend 启动前的四要素装配模型,负责把 backend image、profile、session 和 Git-only resource bundle 统一成受控 Job 输入;四要素权威规格见 [spec-v01-runtime-assembly.md](spec-v01-runtime-assembly.md)。
- Scheduler 是后续自动派发能力;`v0.1` 可以保留规格和状态字段,但不把自动调度作为第一阶段验收目标。
## HWLAB 手动调度服务目标
`v0.1` 面向 HWLAB v0.2 的下一阶段目标是通过手动调度 API 提供 canary Code Agent 执行服务,而不是先实现自动 scheduler。HWLAB `hwlab-cloud-api` 作为业务 dispatcher,完成用户/session/device 权限判断后,显式调用 AgentRun REST API 创建 run、提交 command、启动 runner Job,并把 AgentRun events/terminal status 转换回 HWLAB result/trace。完整目标、非目标、缺口和增强计划见 [spec-v01-hwlab-manual-dispatch.md](spec-v01-hwlab-manual-dispatch.md)。
该目标要求优先补齐四类能力:SessionRef 持久化、Git-only ResourceBundleRef materialization、durable cancel、trace/result 元语。上述四项已进入 v0.1 最小实现;自动 scheduler、跨 backend 自动路由和独立 UI 仍然 deferred,不能成为 HWLAB canary 的前置条件。
## 语言与协议选型
AgentRun `v0.1` 的自研组件优先使用 Bun + TypeScript 实现:`agentrun-mgr``agentrun-runner`、backend adapter、Codex backend、AgentRun CLI 和后续 scheduler 都属于该边界。官方 TypeScript CLI 入口固定为 `scripts/agentrun-cli.ts`,入口只做参数解析和路由,复杂逻辑拆到 `scripts/src/``src/`G14/CI/人工非交互命令使用 `./scripts/agentrun` launcher 启动同一入口。Postgres、Kubernetes、Tekton、Argo CD、YAML manifest 和 shell 级容器启动命令属于外部运行面或部署面,不受“必须 TypeScript 实现”的约束。
@@ -77,7 +83,8 @@ Runner inbound API 只允许本地或私有诊断,不作为业务客户端入
| AgentRun CLI | CLI/Job 工具 | 保留,P0 | JSON 输出、短返回、run/command/event/runner/backend 操作入口。 | `spec-v01-cli.md` |
| Postgres durable store | 稳定外部服务 | 保留,P0 | 使用 `agentrun-v01-postgres` 保存 runs、commands、events、runners、backends、leases 和 migration ledger;不使用 file/sqlite 作为 v0.1 durable store。 | `spec-v01-postgres.md` |
| Secret distribution | 系统能力 | 保留,P0 | Provider credential 只通过 Kubernetes SecretRef、ServiceAccount/RBAC 和 runner env/file projection 分发;Codex 测试凭据使用 `~/.codex/auth.json``~/.codex/config.toml` 生成 Secret projectionsource、GitOps、logs 和 events 不保存明文。 | `spec-v01-secret-distribution.md` |
| RuntimeAssembly | 系统能力 | 保留,P0 规格 | runner/backend 启动前的四要素装配模型:`BackendImageRef``ProfileRef``SessionRef` 和 Git-only `ResourceBundleRef`P0 先定义模型,ProfileRef 已实现,SessionRef/ResourceBundleRef 按 deferred 子项推进。 | `spec-v01-runtime-assembly.md` |
| RuntimeAssembly | 系统能力 | 保留,P0 规格 | runner/backend 启动前的四要素装配模型:`BackendImageRef``ProfileRef``SessionRef` 和 Git-only `ResourceBundleRef`ProfileRefSessionRefResourceBundleRef 已具备 v0.1 最小实现。 | `spec-v01-runtime-assembly.md` |
| HWLAB 手动调度接入 | canary 集成目标 | 保留,P0 规格 | HWLAB `hwlab-cloud-api` 显式创建 run/command 并调用 runner Job APIAgentRun 提供 durable facts、events、cancel、bundle 和 session 能力。 | `spec-v01-hwlab-manual-dispatch.md` |
| Tenant policy boundary | Run schema 合同 | 保留,P0 | 作为 `Run` 的必填字段和最小校验存在,不做独立 policy enginetenant 的业务授权仍由 UniDesk/HWLAB 判定。 | 并入 `spec-v01-agentrun-mgr.md` |
| Observability | 最小事件/日志合同 | 保留,P1 子项 | 作为 manager/runner 的 event、terminal status、failureKind、logPath 和 redaction 最小合同,不拆独立观测系统。 | 并入 `spec-v01-agentrun-mgr.md``spec-v01-agentrun-runner.md` |
| `agentrun-scheduler` | 长驻调度器 | Deferred | M1-M3 稳定后再实现自动 pending scan、capacity selection 和 runner Job 创建。 | `spec-v01-scheduler.md` |
@@ -95,6 +102,7 @@ GET /api/v1/runs/:runId
GET /api/v1/runs/:runId/events?afterSeq=0&limit=100
POST /api/v1/runs/:runId/commands
GET /api/v1/runs/:runId/commands/:commandId
POST /api/v1/runs/:runId/runner-jobs
GET /api/v1/backends
```
@@ -149,8 +157,8 @@ Manager 负责校验、保存和返回这些字段;runner 只能消费已保
| M1 | 最小 runner + 一个 backend | 一个 turn 经过真实 backendassistant/output/error/terminal events 被归一化。 |
| M2 | manager + runner claim | run create/query durableclaim 拒绝双 ownerevents append-only。 |
| M3 | 手动 dispatch CLI | CLI 快速返回 runner process/job identity、log path 和轮询命令。 |
| M4 | 自动 scheduler | Deferredpending 自动派发和 stale lease recovery 进入后续实现。 |
| M5 | UniDesk/HWLAB canary | Deferred;只在核心生命周期稳定后接入窄范围 canary。 |
| M4 | HWLAB 手动调度 canary | HWLAB 通过 REST API 显式启动 runner Job,并能轮询 result/trace 到终态。 |
| M5 | 自动 scheduler | Deferredpending 自动派发、capacity selection 和 stale lease recovery 进入后续实现。 |
## 测试规格
@@ -176,6 +184,7 @@ Manager 负责校验、保存和返回这些字段;runner 只能消费已保
| Postgres durable store 规格 | 已定义 | 见 [spec-v01-postgres.md](spec-v01-postgres.md)。 |
| Secret 分发规格 | 已定义 | 见 [spec-v01-secret-distribution.md](spec-v01-secret-distribution.md)。 |
| RuntimeAssembly 规格 | 已定义 | 见 [spec-v01-runtime-assembly.md](spec-v01-runtime-assembly.md)。 |
| HWLAB 手动调度接入规格 | 已定义 | 见 [spec-v01-hwlab-manual-dispatch.md](spec-v01-hwlab-manual-dispatch.md)。 |
| 两层验证规格 | 已定义 | 见 [spec-v01-validation.md](spec-v01-validation.md)。 |
| `agentrun-mgr` 服务规格 | 已定义 | 见 [spec-v01-agentrun-mgr.md](spec-v01-agentrun-mgr.md)。 |
| `agentrun-runner` 服务规格 | 已定义 | 见 [spec-v01-agentrun-runner.md](spec-v01-agentrun-runner.md)。 |
+6 -1
View File
@@ -57,7 +57,7 @@
6. manager 可查询 command state、append-only events、terminal_status 和 redacted logPath/job identity。
7. 重启 `agentrun-mgr` 后,run、command、events 和 terminal_status 仍可从 Postgres 查询。
8. 日志、event、CLI 输出和 health 中没有 provider credential、DSN password、token 或 URL credential 明文。
9. 若变更涉及 RuntimeAssembly,必须能追溯 `BackendImageRef``ProfileRef``SessionRef``ResourceBundleRef` 的装配状态;P0 未实现的 session/resource 能力必须显式为 `null` 或 deferred,不能由 runner 隐式猜测。
9. 若变更涉及 RuntimeAssembly,必须能追溯 `BackendImageRef``ProfileRef``SessionRef``ResourceBundleRef` 的装配状态;未提供 session/resource 必须显式为 `null`,提供时必须能查到 session/thread 和 Git commit/tree/workspace 摘要,不能由 runner 隐式猜测。
### CLI 交互联调标准
@@ -162,6 +162,10 @@ T8 是涉及 backend profile 变更时的综合联调标准;不涉及 backend
阅读本文和 [spec-v01-runtime-assembly.md](spec-v01-runtime-assembly.md),然后检查一次真实 runner Job 或 dry-run manifest,确认四个问题都有明确答案:用哪一个 digest-pinned image、用哪一个 profile/SecretRef、是否使用 session、使用哪一个 Git repo/full commit。P0 未启用 session 或 Git-only resource materialization 时,必须明确为 `null` 或 deferred,不能由 runner 隐式使用 host path。任何 Secret value、用户文件正文、env dump、branch/tag/HEAD 形式的 resource commit 都不能作为通过证据。
### T10 HWLAB 手动调度 canary 验收
阅读本文和 [spec-v01-hwlab-manual-dispatch.md](spec-v01-hwlab-manual-dispatch.md),然后在真实 `agentrun-v01` runtime 中模拟 HWLAB dispatcher:创建 `tenantId=hwlab` 的 run,提交 `turn` command,调用 `POST /api/v1/runs/:runId/runner-jobs`,轮询 command、events 和 result 到 terminal。确认 API 全部短返回 JSONrunner Job identity 可见,events/result 能转换为 HWLAB result/trace,并验证 runner job idempotency、cancel、SessionRef 和 ResourceBundleRef 的真实路径;不能用 mock pass 替代真实 canary。
## 规格的实现情况
| 规格项 | 状态 | 说明 |
@@ -174,4 +178,5 @@ T8 是涉及 backend profile 变更时的综合联调标准;不涉及 backend
| 真实主闭环 | 已通过 | 当前 v0.1 已通过真实 Tekton/Argo、Postgres、SecretRef、Kubernetes runner Job、Codex stdio turn、RESTful API 和 CLI 主闭环;每次发布仍需按本文手动复验。 |
| `deepseek` profile 切换验收 | 已通过主闭环 | 自测试和 CLI smoke 已覆盖 profile registry、Secret render、fake stdio turn、无 fallback 和结构化错误;真实 `agentrun-v01` 已按 T8 完成 `codex -> deepseek -> codex` 切换综合联调。后续涉及 backend profile 的发布仍必须按 T8 复验。 |
| RuntimeAssembly 四要素验收 | 已定义 | T9 收敛为四个最简问题:image digest、profile/SecretRef、session null/deferred、Git-only repo/full commitsession/resource materialization 后续实现时必须补真实联调。 |
| HWLAB 手动调度 canary 验收 | 已定义 | T10 规定 HWLAB dispatcher 通过手动 runner Job API 使用 AgentRun 的真实联调口径;自动 scheduler 不是前置条件。 |
| mock 作为发布证据 | 不采用 | mock 只能证明自测试通过。 |
+23 -1
View File
@@ -36,6 +36,11 @@ async function dispatch(args: ParsedArgs): Promise<JsonValue> {
if (group === "runs" && command === "create") return client(args).post("/api/v1/runs", await jsonFile(args));
if (group === "runs" && command === "show" && id) return client(args).get(`/api/v1/runs/${encodeURIComponent(id)}`);
if (group === "runs" && command === "events" && id) return client(args).get(`/api/v1/runs/${encodeURIComponent(id)}/events?afterSeq=${flag(args, "after-seq", "0")}&limit=${flag(args, "limit", "100")}`);
if (group === "runs" && command === "result" && id) {
const commandId = optionalFlag(args, "command-id");
return client(args).get(`/api/v1/runs/${encodeURIComponent(id)}/result${commandId ? `?commandId=${encodeURIComponent(commandId)}` : ""}`);
}
if (group === "runs" && command === "cancel" && id) return client(args).post(`/api/v1/runs/${encodeURIComponent(id)}/cancel`, cancelBody(args));
if (group === "commands" && command === "create" && id) {
const body = await jsonFile(args);
if (!body.type) body.type = flag(args, "type", "turn");
@@ -48,6 +53,12 @@ async function dispatch(args: ParsedArgs): Promise<JsonValue> {
if (!runId) throw new AgentRunError("schema-invalid", "commands show requires --run-id", { httpStatus: 2 });
return client(args).get(`/api/v1/runs/${encodeURIComponent(runId)}/commands/${encodeURIComponent(id)}`);
}
if (group === "commands" && command === "result" && id) {
const runId = flag(args, "run-id", "");
if (!runId) throw new AgentRunError("schema-invalid", "commands result requires --run-id", { httpStatus: 2 });
return client(args).get(`/api/v1/runs/${encodeURIComponent(runId)}/commands/${encodeURIComponent(id)}/result`);
}
if (group === "commands" && command === "cancel" && id) return client(args).post(`/api/v1/commands/${encodeURIComponent(id)}/cancel`, cancelBody(args));
if (group === "runner" && command === "start") {
const runId = flag(args, "run-id", "");
if (!runId) throw new AgentRunError("schema-invalid", "runner start requires --run-id", { httpStatus: 2 });
@@ -86,11 +97,13 @@ async function renderRunnerJob(args: ParsedArgs): Promise<JsonRecord> {
const runnerId = optionalFlag(args, "runner-id");
const sourceCommit = optionalFlag(args, "source-commit");
const runnerManagerUrl = optionalFlag(args, "runner-manager-url");
const idempotencyKey = optionalFlag(args, "idempotency-key");
if (namespace) body.namespace = namespace;
if (attemptId) body.attemptId = attemptId;
if (runnerId) body.runnerId = runnerId;
if (sourceCommit) body.sourceCommit = sourceCommit;
if (runnerManagerUrl) body.managerUrl = runnerManagerUrl;
if (idempotencyKey) body.idempotencyKey = idempotencyKey;
return await client(args).post(`/api/v1/runs/${encodeURIComponent(runId)}/runner-jobs`, body) as JsonRecord;
}
if (!image) throw new AgentRunError("schema-invalid", "runner job --dry-run requires --image", { httpStatus: 2 });
@@ -188,16 +201,25 @@ function optionalFlag(args: ParsedArgs, name: string): string | null {
return typeof value === "string" && value.length > 0 ? value : null;
}
function cancelBody(args: ParsedArgs): JsonRecord {
const reason = optionalFlag(args, "reason");
return reason ? { reason } : {};
}
function help(): JsonRecord {
return {
commands: [
"runs create --json-file <run.json>",
"runs show <runId>",
"runs events <runId> --after-seq <n> --limit <n>",
"runs result <runId> [--command-id <commandId>]",
"runs cancel <runId> [--reason <text>]",
"commands create <runId> --type turn --json-file <payload.json>",
"commands show <commandId> --run-id <runId>",
"commands result <commandId> --run-id <runId>",
"commands cancel <commandId> [--reason <text>]",
"runner start --run-id <runId> [--backend codex|deepseek]",
"runner job --run-id <runId> --command-id <commandId> [--image <image>] [--runner-manager-url <url>]",
"runner job --run-id <runId> --command-id <commandId> [--image <image>] [--runner-manager-url <url>] [--idempotency-key <key>]",
"runner job --dry-run --run-id <runId> --command-id <commandId> --image <image>",
"secrets codex render --dry-run [--profile codex|deepseek] [--codex-home <dir>] [--namespace agentrun-v01] [--secret-name <name>]",
"backends list",
+5 -1
View File
@@ -6,6 +6,8 @@ export interface BackendAdapterOptions {
codexCommand?: string;
codexArgs?: string[];
codexHome?: string;
workspacePath?: string;
abortSignal?: AbortSignal;
env?: NodeJS.ProcessEnv;
}
@@ -18,16 +20,18 @@ export async function runBackendTurn(run: RunRecord, command: CommandRecord, opt
const turnOptions: CodexStdioTurnOptions = {
backendProfile: run.backendProfile,
prompt,
cwd: typeof run.workspaceRef.path === "string" ? run.workspaceRef.path : process.cwd(),
cwd: options.workspacePath ?? (typeof run.workspaceRef.path === "string" ? run.workspaceRef.path : process.cwd()),
approvalPolicy: run.executionPolicy.approval,
sandbox: run.executionPolicy.sandbox,
timeoutMs: run.executionPolicy.timeoutMs,
};
if (typeof command.payload.model === "string") turnOptions.model = command.payload.model;
if (typeof command.payload.threadId === "string") turnOptions.threadId = command.payload.threadId;
else if (typeof run.sessionRef?.threadId === "string") turnOptions.threadId = run.sessionRef.threadId;
if (options.codexCommand) turnOptions.command = options.codexCommand;
if (options.codexArgs) turnOptions.args = options.codexArgs;
if (options.env) turnOptions.env = options.env;
if (options.codexHome) turnOptions.codexHome = options.codexHome;
if (options.abortSignal) turnOptions.abortSignal = options.abortSignal;
return runCodexStdioTurn(turnOptions);
}
+16
View File
@@ -45,6 +45,7 @@ export interface CodexStdioTurnOptions {
args?: string[];
env?: NodeJS.ProcessEnv;
codexHome?: string;
abortSignal?: AbortSignal;
}
interface PendingRequest {
@@ -273,6 +274,12 @@ export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise
runtime: runtimeSummary(options, env, codexHome),
},
}];
if (options.abortSignal?.aborted) {
const cancelled = { status: "cancelled" as const, failureKind: "cancelled" as const, message: "cancel requested" };
events.push({ type: "backend_status", payload: { phase: "turn-cancelled", failureKind: "cancelled" } });
events.push({ type: "terminal_status", payload: { terminalStatus: cancelled.status, failureKind: cancelled.failureKind, message: cancelled.message } });
return { terminalStatus: cancelled.status, failureKind: cancelled.failureKind, failureMessage: cancelled.message, events: events.map((event) => ({ ...event, payload: redactJson(event.payload) })) };
}
let assistantText = "";
let threadId: string | undefined = options.threadId;
let turnId: string | undefined;
@@ -281,6 +288,14 @@ export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise
const terminalPromise = new Promise<void>((resolve) => { terminalResolve = resolve; });
let client: CodexStdioClient | null = null;
const requestTimeoutMs = Math.min(positiveTimeout(options.timeoutMs), requestTimeoutCapMs);
const abortTurn = (): void => {
if (terminal) return;
terminal = { status: "cancelled", failureKind: "cancelled", message: "cancel requested" };
events.push({ type: "backend_status", payload: { phase: "turn-cancelled", failureKind: "cancelled" } });
client?.stop();
terminalResolve();
};
options.abortSignal?.addEventListener("abort", abortTurn, { once: true });
const timeout = setTimeout(() => {
if (terminal) return;
terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn timed out after ${options.timeoutMs}ms` };
@@ -340,6 +355,7 @@ export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise
events.push({ type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase: failure.phase, details: failure.details } });
}
} finally {
options.abortSignal?.removeEventListener("abort", abortTurn);
clearTimeout(timeout);
if (client) {
client.stop();
+55
View File
@@ -31,6 +31,14 @@ export interface WorkspaceRef extends JsonRecord {
branch?: string;
}
export interface SessionRef extends JsonRecord {
sessionId: string;
conversationId?: string;
threadId?: string;
expiresAt?: string;
metadata?: JsonRecord;
}
export interface SecretRef extends JsonRecord {
namespace?: string;
name: string;
@@ -38,6 +46,17 @@ export interface SecretRef extends JsonRecord {
mountPath?: string;
}
export interface ResourceBundleRef extends JsonRecord {
kind: "git";
repoUrl: string;
commitId: string;
subdir?: string;
sparsePaths?: string[];
submodules?: false;
lfs?: false;
credentialRef?: SecretRef;
}
export interface ExecutionPolicy extends JsonRecord {
sandbox: string;
approval: string;
@@ -56,6 +75,8 @@ export interface CreateRunInput extends JsonRecord {
tenantId: string;
projectId: string;
workspaceRef: WorkspaceRef;
sessionRef?: SessionRef | null;
resourceBundleRef?: ResourceBundleRef | null;
providerId: string;
backendProfile: BackendProfile;
executionPolicy: ExecutionPolicy;
@@ -64,6 +85,8 @@ export interface CreateRunInput extends JsonRecord {
export interface RunRecord extends CreateRunInput {
id: string;
sessionRef: SessionRef | null;
resourceBundleRef: ResourceBundleRef | null;
status: RunStatus;
terminalStatus: TerminalStatus | null;
failureKind: FailureKind | null;
@@ -113,6 +136,38 @@ export interface RunnerRecord extends JsonRecord {
heartbeatAt: string;
}
export interface SessionRecord extends JsonRecord {
sessionId: string;
tenantId: string;
projectId: string;
backendProfile: BackendProfile;
conversationId: string | null;
threadId: string | null;
metadata: JsonRecord;
createdAt: string;
updatedAt: string;
expiresAt: string | null;
}
export interface RunnerJobRecord extends JsonRecord {
id: string;
runId: string;
commandId: string;
idempotencyKey: string | null;
payloadHash: string;
attemptId: string;
runnerId: string;
namespace: string;
jobName: string;
managerUrl: string;
image: string;
sourceCommit: string;
serviceAccountName: string | null;
result: JsonRecord;
createdAt: string;
updatedAt: string;
}
export interface BackendEvent {
type: EventType;
payload: JsonRecord;
+65 -1
View File
@@ -1,5 +1,5 @@
import { createHash, randomUUID } from "node:crypto";
import type { BackendProfile, CreateCommandInput, CreateRunInput, ExecutionPolicy, JsonRecord, JsonValue } from "./types.js";
import type { BackendProfile, CreateCommandInput, CreateRunInput, ExecutionPolicy, JsonRecord, JsonValue, ResourceBundleRef, SecretRef, SessionRef } from "./types.js";
import { AgentRunError } from "./errors.js";
import { backendProfileSpec, backendProfiles, isBackendProfile } from "./backend-profiles.js";
@@ -51,6 +51,8 @@ export function validateCreateRun(input: unknown): CreateRunInput {
tenantId,
projectId: requiredString(record, "projectId"),
workspaceRef: requiredRecord(record, "workspaceRef") as CreateRunInput["workspaceRef"],
sessionRef: validateSessionRef(record.sessionRef),
resourceBundleRef: validateResourceBundleRef(record.resourceBundleRef),
providerId: requiredString(record, "providerId"),
backendProfile,
executionPolicy,
@@ -58,6 +60,50 @@ export function validateCreateRun(input: unknown): CreateRunInput {
};
}
export function validateSessionRef(value: unknown): SessionRef | null {
if (value === undefined || value === null) return null;
const record = asRecord(value, "sessionRef");
const sessionId = requiredString(record, "sessionId");
const result: SessionRef = { sessionId };
const conversationId = optionalString(record.conversationId);
const threadId = optionalString(record.threadId);
const expiresAt = optionalString(record.expiresAt);
const metadata = record.metadata === undefined ? undefined : asRecord(record.metadata, "sessionRef.metadata");
if (conversationId) result.conversationId = conversationId;
if (threadId) result.threadId = threadId;
if (expiresAt) result.expiresAt = expiresAt;
if (metadata) result.metadata = metadata;
return result;
}
export function validateResourceBundleRef(value: unknown): ResourceBundleRef | null {
if (value === undefined || value === null) return null;
const record = asRecord(value, "resourceBundleRef");
const kind = requiredString(record, "kind");
if (kind !== "git") throw new AgentRunError("schema-invalid", "resourceBundleRef.kind must be git in v0.1", { httpStatus: 400 });
const repoUrl = requiredString(record, "repoUrl");
const commitId = requiredString(record, "commitId");
if (!/^[0-9a-f]{40}$/u.test(commitId)) throw new AgentRunError("schema-invalid", "resourceBundleRef.commitId must be a full 40-character git commit sha", { httpStatus: 400 });
const result: ResourceBundleRef = { kind: "git", repoUrl, commitId };
const subdir = optionalString(record.subdir);
if (subdir) {
if (subdir.startsWith("/") || subdir.includes("..")) throw new AgentRunError("schema-invalid", "resourceBundleRef.subdir must stay within the checkout", { httpStatus: 400 });
result.subdir = subdir;
}
if (record.sparsePaths !== undefined) {
if (!Array.isArray(record.sparsePaths) || !record.sparsePaths.every((item) => typeof item === "string" && item.length > 0 && !item.startsWith("/") && !item.includes(".."))) {
throw new AgentRunError("schema-invalid", "resourceBundleRef.sparsePaths must be relative path strings", { httpStatus: 400 });
}
result.sparsePaths = record.sparsePaths as string[];
}
if (record.submodules !== undefined && record.submodules !== false) throw new AgentRunError("schema-invalid", "resourceBundleRef.submodules can only be false in v0.1", { httpStatus: 400 });
if (record.lfs !== undefined && record.lfs !== false) throw new AgentRunError("schema-invalid", "resourceBundleRef.lfs can only be false in v0.1", { httpStatus: 400 });
if (record.submodules === false) result.submodules = false;
if (record.lfs === false) result.lfs = false;
if (record.credentialRef !== undefined) result.credentialRef = validateSecretRef(asRecord(record.credentialRef, "resourceBundleRef.credentialRef"));
return result;
}
export function validateExecutionPolicy(record: JsonRecord): ExecutionPolicy {
const timeout = record.timeoutMs;
if (typeof timeout !== "number" || !Number.isFinite(timeout) || timeout <= 0) throw new AgentRunError("schema-invalid", "executionPolicy.timeoutMs must be a positive number", { httpStatus: 400 });
@@ -87,6 +133,24 @@ export function validateExecutionPolicy(record: JsonRecord): ExecutionPolicy {
};
}
function validateSecretRef(record: JsonRecord): SecretRef {
const name = requiredString(record, "name");
const result: SecretRef = { name };
const namespace = optionalString(record.namespace);
const mountPath = optionalString(record.mountPath);
if (namespace) result.namespace = namespace;
if (mountPath) result.mountPath = mountPath;
if (record.keys !== undefined) {
if (!Array.isArray(record.keys) || !record.keys.every((item) => typeof item === "string" && item.length > 0)) throw new AgentRunError("schema-invalid", "secretRef.keys must be non-empty strings", { httpStatus: 400 });
result.keys = record.keys as string[];
}
return result;
}
function optionalString(value: unknown): string | undefined {
return typeof value === "string" && value.trim().length > 0 ? value.trim() : undefined;
}
function validateBackendSecretScope(backendProfile: BackendProfile, executionPolicy: ExecutionPolicy): void {
const credentials = executionPolicy.secretScope.providerCredentials ?? [];
const matching = credentials.filter((item) => item.profile === backendProfile);
+47 -2
View File
@@ -1,8 +1,10 @@
import { spawn } from "node:child_process";
import { AgentRunError } from "../common/errors.js";
import { redactJson, redactText } from "../common/redaction.js";
import { isTerminalCommandState, isTerminalRunStatus, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js";
import type { AgentRunStore } from "./store.js";
import type { JsonRecord } from "../common/types.js";
import { stableHash } from "../common/validation.js";
import { renderRunnerJobManifest } from "../runner/k8s-job.js";
export interface RunnerJobDefaults {
@@ -23,6 +25,7 @@ export interface CreateRunnerJobInput extends JsonRecord {
runnerId?: string;
sourceCommit?: string;
serviceAccountName?: string;
idempotencyKey?: string;
}
export async function createKubernetesRunnerJob(options: { store: AgentRunStore; runId: string; input: CreateRunnerJobInput; defaults: RunnerJobDefaults }): Promise<JsonRecord> {
@@ -38,6 +41,15 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
const managerUrl = optionalString(options.input.managerUrl) ?? options.defaults.managerUrl;
const sourceCommit = optionalString(options.input.sourceCommit) ?? options.defaults.sourceCommit;
const serviceAccountName = optionalString(options.input.serviceAccountName) ?? options.defaults.serviceAccountName;
const idempotencyKey = optionalString(options.input.idempotencyKey);
const normalizedPayload = { commandId, image, namespace, managerUrl, sourceCommit, serviceAccountName: serviceAccountName ?? null, attemptId: optionalString(options.input.attemptId) ?? null, runnerId: optionalString(options.input.runnerId) ?? null };
const payloadHash = stableHash(normalizedPayload);
if (idempotencyKey) {
const existing = await options.store.getRunnerJobByIdempotencyKey(run.id, idempotencyKey, payloadHash);
if (existing) return { ...existing.result, idempotentReplay: true };
}
if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${run.id} is already terminal: ${run.status}`, { httpStatus: 409 });
if (isTerminalCommandState(command.state) || command.state !== "pending") throw new AgentRunError(command.state === "cancelled" ? "cancelled" : "schema-invalid", `command ${commandId} is not pending: ${command.state}`, { httpStatus: 409 });
const renderOptions = {
run,
@@ -52,9 +64,15 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
const runnerId = optionalString(options.input.runnerId);
const render = renderRunnerJobManifest({ ...renderOptions, ...(attemptId ? { attemptId } : {}), ...(runnerId ? { runnerId } : {}) });
const created = await kubectlCreate(render.manifest, options.defaults.kubectlCommand ?? "kubectl");
return {
const response = {
action: "create-kubernetes-job",
mutation: true,
runId: run.id,
commandId,
attemptId: render.attemptId,
runnerId: render.runnerId,
namespace: render.namespace,
jobName: render.jobName,
jobIdentity: {
kind: "Job",
namespace: render.namespace,
@@ -90,7 +108,34 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
kind: objectPath(created, ["kind"]),
resourceVersion: objectPath(created, ["metadata", "resourceVersion"]),
},
};
} satisfies JsonRecord;
const saved = await options.store.saveRunnerJob({
runId: run.id,
commandId,
idempotencyKey: idempotencyKey ?? null,
payloadHash,
attemptId: render.attemptId,
runnerId: render.runnerId,
namespace: render.namespace,
jobName: render.jobName,
managerUrl,
image,
sourceCommit,
serviceAccountName: serviceAccountName ?? null,
result: response,
});
await options.store.appendEvent(run.id, "backend_status", {
phase: "runner-job-created",
commandId,
attemptId: saved.attemptId,
runnerId: saved.runnerId,
namespace: saved.namespace,
jobName: saved.jobName,
idempotencyKey: idempotencyKey ? "present" : null,
sessionRef: summarizeSessionRef(run.sessionRef ?? null),
resourceBundleRef: summarizeResourceBundleRef(run.resourceBundleRef ?? null),
});
return response;
}
async function kubectlCreate(manifest: JsonRecord, kubectlCommand: string): Promise<JsonRecord> {
+265 -20
View File
@@ -3,10 +3,10 @@ import { Pool } from "pg";
import type { PoolClient, QueryResultRow } from "pg";
import { AgentRunError } from "../common/errors.js";
import { redactJson } from "../common/redaction.js";
import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, RunEvent, RunnerRecord, RunRecord, RunStatus, TerminalStatus } from "../common/types.js";
import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js";
import { newId, nowIso, stableHash } from "../common/validation.js";
import type { AgentRunStore, StoreHealth } from "./store.js";
import { commandStateFromTerminal, statusFromTerminal } from "./store.js";
import type { AgentRunStore, SaveRunnerJobInput, StoreHealth } from "./store.js";
import { commandStateFromTerminal, isTerminalCommandState, isTerminalRunStatus, sessionRefFromRecord, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js";
import { backendCapabilitiesSqlValues } from "../common/backend-profiles.js";
interface PostgresStoreOptions {
@@ -126,6 +126,50 @@ ON CONFLICT (profile) DO UPDATE SET
updated_at = EXCLUDED.updated_at;
`;
const hwlabManualDispatchMigrationSql = `
ALTER TABLE agentrun_runs ADD COLUMN IF NOT EXISTS session_ref jsonb;
ALTER TABLE agentrun_runs ADD COLUMN IF NOT EXISTS resource_bundle_ref jsonb;
CREATE TABLE IF NOT EXISTS agentrun_sessions (
session_id text PRIMARY KEY,
tenant_id text NOT NULL,
project_id text NOT NULL,
backend_profile text NOT NULL,
conversation_id text,
thread_id text,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
created_at timestamptz NOT NULL,
updated_at timestamptz NOT NULL,
expires_at timestamptz
);
CREATE TABLE IF NOT EXISTS agentrun_runner_jobs (
id text PRIMARY KEY,
run_id text NOT NULL REFERENCES agentrun_runs(id) ON DELETE CASCADE,
command_id text NOT NULL REFERENCES agentrun_commands(id) ON DELETE CASCADE,
idempotency_key text,
payload_hash text NOT NULL,
attempt_id text NOT NULL,
runner_id text NOT NULL,
namespace text NOT NULL,
job_name text NOT NULL,
manager_url text NOT NULL,
image text NOT NULL,
source_commit text NOT NULL,
service_account_name text,
result jsonb NOT NULL,
created_at timestamptz NOT NULL,
updated_at timestamptz NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS agentrun_runner_jobs_run_idempotency_key_idx
ON agentrun_runner_jobs (run_id, idempotency_key)
WHERE idempotency_key IS NOT NULL;
CREATE INDEX IF NOT EXISTS agentrun_runner_jobs_run_command_idx ON agentrun_runner_jobs (run_id, command_id, created_at);
CREATE INDEX IF NOT EXISTS agentrun_sessions_tenant_project_idx ON agentrun_sessions (tenant_id, project_id, backend_profile, updated_at);
`;
const postgresMigrations: MigrationDefinition[] = [
{
id: "001_v01_initial_durable_store",
@@ -137,13 +181,18 @@ const postgresMigrations: MigrationDefinition[] = [
checksum: checksumSql(backendProfilesMigrationSql),
sql: backendProfilesMigrationSql,
},
{
id: "003_v01_hwlab_manual_dispatch",
checksum: checksumSql(hwlabManualDispatchMigrationSql),
sql: hwlabManualDispatchMigrationSql,
},
];
export function postgresMigrationContract(): JsonRecord {
return {
migrationIds: postgresMigrations.map((migration) => migration.id),
latestMigrationId: latestMigrationId(),
requiredTables: ["agentrun_schema_migrations", "agentrun_runs", "agentrun_commands", "agentrun_events", "agentrun_runners", "agentrun_backends", "agentrun_leases"],
requiredTables: ["agentrun_schema_migrations", "agentrun_runs", "agentrun_commands", "agentrun_events", "agentrun_runners", "agentrun_backends", "agentrun_leases", "agentrun_sessions", "agentrun_runner_jobs"],
checksums: Object.fromEntries(postgresMigrations.map((migration) => [migration.id, migration.checksum])),
};
}
@@ -201,14 +250,15 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
async createRun(input: CreateRunInput): Promise<RunRecord> {
const at = nowIso();
const run: RunRecord = { ...input, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null };
return this.withTransaction(async (client) => {
const sessionRef = await this.resolveSessionForRun(client, input, at);
const run: RunRecord = { ...input, sessionRef, resourceBundleRef: input.resourceBundleRef ?? null, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null };
await client.query(
`INSERT INTO agentrun_runs (id, tenant_id, project_id, workspace_ref, provider_id, backend_profile, execution_policy, trace_sink, status, terminal_status, failure_kind, failure_message, created_at, updated_at, claimed_by, lease_expires_at)
VALUES ($1, $2, $3, $4::jsonb, $5, $6, $7::jsonb, $8::jsonb, $9, $10, $11, $12, $13, $14, $15, $16)`,
[run.id, run.tenantId, run.projectId, JSON.stringify(run.workspaceRef), run.providerId, run.backendProfile, JSON.stringify(run.executionPolicy), JSON.stringify(run.traceSink), run.status, run.terminalStatus, run.failureKind, run.failureMessage, run.createdAt, run.updatedAt, run.claimedBy, run.leaseExpiresAt],
`INSERT INTO agentrun_runs (id, tenant_id, project_id, workspace_ref, session_ref, resource_bundle_ref, provider_id, backend_profile, execution_policy, trace_sink, status, terminal_status, failure_kind, failure_message, created_at, updated_at, claimed_by, lease_expires_at)
VALUES ($1, $2, $3, $4::jsonb, $5::jsonb, $6::jsonb, $7, $8, $9::jsonb, $10::jsonb, $11, $12, $13, $14, $15, $16, $17, $18)`,
[run.id, run.tenantId, run.projectId, JSON.stringify(run.workspaceRef), JSON.stringify(run.sessionRef), JSON.stringify(run.resourceBundleRef), run.providerId, run.backendProfile, JSON.stringify(run.executionPolicy), JSON.stringify(run.traceSink), run.status, run.terminalStatus, run.failureKind, run.failureMessage, run.createdAt, run.updatedAt, run.claimedBy, run.leaseExpiresAt],
);
await this.appendEventWithLockedRun(client, run.id, "backend_status", { phase: "run-created", backendProfile: run.backendProfile });
await this.appendEventWithLockedRun(client, run.id, "backend_status", { phase: "run-created", backendProfile: run.backendProfile, sessionRef: summarizeSessionRef(run.sessionRef ?? null), resourceBundleRef: summarizeResourceBundleRef(run.resourceBundleRef ?? null) });
return run;
});
}
@@ -285,10 +335,55 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
return runnerFromRow(result.rows[0]);
}
async listRunnerJobs(runId: string, commandId?: string): Promise<RunnerJobRecord[]> {
await this.getRun(runId);
const params: unknown[] = [runId];
let where = "run_id = $1";
if (commandId) {
params.push(commandId);
where += ` AND command_id = $${params.length}`;
}
const result = await this.pool.query(`SELECT * FROM agentrun_runner_jobs WHERE ${where} ORDER BY created_at ASC`, params);
return result.rows.map(runnerJobFromRow);
}
async getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): Promise<RunnerJobRecord | null> {
const result = await this.pool.query("SELECT * FROM agentrun_runner_jobs WHERE run_id = $1 AND idempotency_key = $2", [runId, idempotencyKey]);
const row = result.rows[0];
if (!row) return null;
const record = runnerJobFromRow(row);
if (record.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "runner job idempotency key reused with different payload", { httpStatus: 409 });
return record;
}
async saveRunnerJob(input: SaveRunnerJobInput): Promise<RunnerJobRecord> {
return this.withTransaction(async (client) => {
await this.requireRunForUpdate(client, input.runId);
if (input.idempotencyKey) {
const existing = await client.query("SELECT * FROM agentrun_runner_jobs WHERE run_id = $1 AND idempotency_key = $2 FOR UPDATE", [input.runId, input.idempotencyKey]);
if (existing.rows[0]) {
const record = runnerJobFromRow(existing.rows[0]);
if (record.payloadHash !== input.payloadHash) throw new AgentRunError("schema-invalid", "runner job idempotency key reused with different payload", { httpStatus: 409 });
return record;
}
}
const at = nowIso();
const record: RunnerJobRecord = { ...input, id: newId("rjob"), idempotencyKey: input.idempotencyKey ?? null, serviceAccountName: input.serviceAccountName ?? null, createdAt: at, updatedAt: at };
const inserted = await client.query(
`INSERT INTO agentrun_runner_jobs (id, run_id, command_id, idempotency_key, payload_hash, attempt_id, runner_id, namespace, job_name, manager_url, image, source_commit, service_account_name, result, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14::jsonb, $15, $16)
RETURNING *`,
[record.id, record.runId, record.commandId, record.idempotencyKey, record.payloadHash, record.attemptId, record.runnerId, record.namespace, record.jobName, record.managerUrl, record.image, record.sourceCommit, record.serviceAccountName, JSON.stringify(record.result), record.createdAt, record.updatedAt],
);
return runnerJobFromRow(inserted.rows[0]);
});
}
async claimRun(runId: string, runnerId: string, leaseMs: number): Promise<RunRecord> {
return this.withTransaction(async (client) => {
const run = await this.requireRunForUpdate(client, runId);
if (run.claimedBy && run.claimedBy !== runnerId && !isTerminalStatus(run.status)) throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 });
if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 });
if (run.claimedBy && run.claimedBy !== runnerId) throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 });
const leaseExpiresAt = new Date(Date.now() + leaseMs).toISOString();
const updated = await client.query(
`UPDATE agentrun_runs SET status = $2, claimed_by = $3, lease_expires_at = $4, updated_at = $5 WHERE id = $1 RETURNING *`,
@@ -308,6 +403,7 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
async heartbeat(runId: string, runnerId: string, leaseMs: number): Promise<RunRecord> {
return this.withTransaction(async (client) => {
const run = await this.requireRunForUpdate(client, runId);
if (isTerminalRunStatus(run.status)) return run;
if (run.claimedBy !== runnerId) throw new AgentRunError("runner-lease-conflict", `run ${runId} is not claimed by ${runnerId}`, { httpStatus: 409 });
const leaseExpiresAt = new Date(Date.now() + leaseMs).toISOString();
const updated = await client.query("UPDATE agentrun_runs SET lease_expires_at = $2, updated_at = $3 WHERE id = $1 RETURNING *", [runId, leaseExpiresAt, nowIso()]);
@@ -318,10 +414,15 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
}
async ackCommand(commandId: string): Promise<CommandRecord> {
const result = await this.pool.query("UPDATE agentrun_commands SET state = $2, acknowledged_at = $3, updated_at = $3 WHERE id = $1 RETURNING *", [commandId, "acknowledged", nowIso()]);
const row = result.rows[0];
if (!row) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
return commandFromRow(row);
return this.withTransaction(async (client) => {
const existing = await client.query("SELECT * FROM agentrun_commands WHERE id = $1 FOR UPDATE", [commandId]);
const row = existing.rows[0];
if (!row) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
const command = commandFromRow(row);
if (isTerminalCommandState(command.state) || command.state === "acknowledged") return command;
const result = await client.query("UPDATE agentrun_commands SET state = $2, acknowledged_at = $3, updated_at = $3 WHERE id = $1 RETURNING *", [commandId, "acknowledged", nowIso()]);
return commandFromRow(result.rows[0]);
});
}
async finishCommand(commandId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage">): Promise<CommandRecord> {
@@ -330,6 +431,7 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
const row = existing.rows[0];
if (!row) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
const command = commandFromRow(row);
if (isTerminalCommandState(command.state)) return command;
const state = commandStateFromTerminal(result.terminalStatus);
const updated = await client.query("UPDATE agentrun_commands SET state = $2, updated_at = $3 WHERE id = $1 RETURNING *", [commandId, state, nowIso()]);
await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "command-terminal", commandId, state, terminalStatus: result.terminalStatus, failureKind: result.failureKind });
@@ -344,19 +446,67 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
});
}
async finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage">): Promise<RunRecord> {
async finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): Promise<RunRecord> {
return this.withTransaction(async (client) => {
await this.requireRunForUpdate(client, runId);
const existing = await this.requireRunForUpdate(client, runId);
if (isTerminalRunStatus(existing.status)) return existing;
const status = statusFromTerminal(result.terminalStatus);
const updated = await client.query(
`UPDATE agentrun_runs SET status = $2, terminal_status = $3, failure_kind = $4, failure_message = $5, updated_at = $6 WHERE id = $1 RETURNING *`,
[runId, status, result.terminalStatus, result.failureKind, result.failureMessage, nowIso()],
);
const run = runFromRow(updated.rows[0]);
if (result.threadId && run.sessionRef?.sessionId) await this.upsertSessionThread(client, run, result.threadId, result.turnId ?? null);
await this.appendEventWithLockedRun(client, runId, "terminal_status", { terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage });
return run;
});
}
async cancelRun(runId: string, reason = "cancel requested"): Promise<RunRecord> {
return this.withTransaction(async (client) => {
const run = await this.requireRunForUpdate(client, runId);
if (isTerminalRunStatus(run.status)) return run;
const at = nowIso();
const commands = await client.query("SELECT * FROM agentrun_commands WHERE run_id = $1 AND state NOT IN ('completed', 'failed', 'cancelled') FOR UPDATE", [runId]);
for (const row of commands.rows) {
const command = commandFromRow(row);
await client.query("UPDATE agentrun_commands SET state = 'cancelled', updated_at = $2 WHERE id = $1", [command.id, at]);
await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "command-terminal", commandId: command.id, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled" });
}
await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "cancel-requested", reason });
const updated = await client.query(
`UPDATE agentrun_runs SET status = 'cancelled', terminal_status = 'cancelled', failure_kind = 'cancelled', failure_message = $2, updated_at = $3 WHERE id = $1 RETURNING *`,
[runId, reason, at],
);
await this.appendEventWithLockedRun(client, runId, "terminal_status", { terminalStatus: "cancelled", failureKind: "cancelled", message: reason });
return runFromRow(updated.rows[0]);
});
}
async cancelCommand(commandId: string, reason = "cancel requested"): Promise<CommandRecord> {
return this.withTransaction(async (client) => {
const existing = await client.query("SELECT * FROM agentrun_commands WHERE id = $1 FOR UPDATE", [commandId]);
const row = existing.rows[0];
if (!row) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
const command = commandFromRow(row);
if (isTerminalCommandState(command.state)) return command;
const updated = await client.query("UPDATE agentrun_commands SET state = 'cancelled', updated_at = $2 WHERE id = $1 RETURNING *", [commandId, nowIso()]);
await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled" });
const run = await this.requireRunForUpdate(client, command.runId);
if (!isTerminalRunStatus(run.status)) {
await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "cancel-requested", reason });
await client.query(`UPDATE agentrun_runs SET status = 'cancelled', terminal_status = 'cancelled', failure_kind = 'cancelled', failure_message = $2, updated_at = $3 WHERE id = $1`, [command.runId, reason, nowIso()]);
await this.appendEventWithLockedRun(client, command.runId, "terminal_status", { terminalStatus: "cancelled", failureKind: "cancelled", message: reason });
}
return commandFromRow(updated.rows[0]);
});
}
async getSession(sessionId: string): Promise<SessionRecord | null> {
const result = await this.pool.query("SELECT * FROM agentrun_sessions WHERE session_id = $1", [sessionId]);
return result.rows[0] ? sessionFromRow(result.rows[0]) : null;
}
async backends(): Promise<JsonRecord[]> {
const result = await this.pool.query("SELECT * FROM agentrun_backends ORDER BY profile ASC");
return result.rows.map((row) => ({ profile: stringValue(row.profile), ...jsonRecord(row.capabilities), capacity: jsonValue(row.capacity), health: jsonValue(row.health), updatedAt: nullableIso(row.updated_at) }));
@@ -385,6 +535,67 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
return runFromRow(row);
}
private async resolveSessionForRun(client: PoolClient, input: CreateRunInput, at: string): Promise<SessionRef | null> {
if (!input.sessionRef) return null;
const existing = await client.query("SELECT * FROM agentrun_sessions WHERE session_id = $1 FOR UPDATE", [input.sessionRef.sessionId]);
if (existing.rows[0]) return sessionRefFromRecord(sessionFromRow(existing.rows[0]), input.sessionRef);
const record: SessionRecord = {
sessionId: input.sessionRef.sessionId,
tenantId: input.tenantId,
projectId: input.projectId,
backendProfile: input.backendProfile,
conversationId: input.sessionRef.conversationId ?? null,
threadId: input.sessionRef.threadId ?? null,
metadata: input.sessionRef.metadata ?? {},
createdAt: at,
updatedAt: at,
expiresAt: input.sessionRef.expiresAt ?? null,
};
await client.query(
`INSERT INTO agentrun_sessions (session_id, tenant_id, project_id, backend_profile, conversation_id, thread_id, metadata, created_at, updated_at, expires_at)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, $9, $10)`,
[record.sessionId, record.tenantId, record.projectId, record.backendProfile, record.conversationId, record.threadId, JSON.stringify(record.metadata), record.createdAt, record.updatedAt, record.expiresAt],
);
return sessionRefFromRecord(record, input.sessionRef);
}
private async upsertSessionThread(client: PoolClient, run: RunRecord, threadId: string, turnId: string | null): Promise<void> {
if (!run.sessionRef?.sessionId) return;
const at = nowIso();
const existingResult = await client.query("SELECT * FROM agentrun_sessions WHERE session_id = $1 FOR UPDATE", [run.sessionRef.sessionId]);
const existing = existingResult.rows[0] ? sessionFromRow(existingResult.rows[0]) : null;
const metadata = { ...(existing?.metadata ?? {}), ...(run.sessionRef.metadata ?? {}), ...(turnId ? { lastTurnId: turnId } : {}) };
const record: SessionRecord = {
sessionId: run.sessionRef.sessionId,
tenantId: run.tenantId,
projectId: run.projectId,
backendProfile: run.backendProfile,
conversationId: run.sessionRef.conversationId ?? existing?.conversationId ?? null,
threadId,
metadata,
createdAt: existing?.createdAt ?? at,
updatedAt: at,
expiresAt: run.sessionRef.expiresAt ?? existing?.expiresAt ?? null,
};
await client.query(
`INSERT INTO agentrun_sessions (session_id, tenant_id, project_id, backend_profile, conversation_id, thread_id, metadata, created_at, updated_at, expires_at)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, $9, $10)
ON CONFLICT (session_id) DO UPDATE SET
tenant_id = EXCLUDED.tenant_id,
project_id = EXCLUDED.project_id,
backend_profile = EXCLUDED.backend_profile,
conversation_id = EXCLUDED.conversation_id,
thread_id = EXCLUDED.thread_id,
metadata = EXCLUDED.metadata,
updated_at = EXCLUDED.updated_at,
expires_at = EXCLUDED.expires_at`,
[record.sessionId, record.tenantId, record.projectId, record.backendProfile, record.conversationId, record.threadId, JSON.stringify(record.metadata), record.createdAt, record.updatedAt, record.expiresAt],
);
const nextSessionRef = sessionRefFromRecord(record, run.sessionRef);
await client.query("UPDATE agentrun_runs SET session_ref = $2::jsonb, updated_at = $3 WHERE id = $1", [run.id, JSON.stringify(nextSessionRef), at]);
await this.appendEventWithLockedRun(client, run.id, "backend_status", { phase: "session-updated", sessionRef: summarizeSessionRef(nextSessionRef), turnId });
}
private async withTransaction<T>(fn: (client: PoolClient) => Promise<T>): Promise<T> {
const client = await this.pool.connect();
try {
@@ -419,6 +630,8 @@ function runFromRow(row: QueryResultRow): RunRecord {
tenantId: stringValue(row.tenant_id),
projectId: stringValue(row.project_id),
workspaceRef: jsonRecord(row.workspace_ref) as RunRecord["workspaceRef"],
sessionRef: jsonValue(row.session_ref) as RunRecord["sessionRef"],
resourceBundleRef: jsonValue(row.resource_bundle_ref) as RunRecord["resourceBundleRef"],
providerId: stringValue(row.provider_id),
backendProfile: stringValue(row.backend_profile) as BackendProfile,
executionPolicy: jsonRecord(row.execution_policy) as RunRecord["executionPolicy"],
@@ -468,15 +681,47 @@ function runnerFromRow(row: QueryResultRow): RunnerRecord {
};
}
function sessionFromRow(row: QueryResultRow): SessionRecord {
return {
sessionId: stringValue(row.session_id),
tenantId: stringValue(row.tenant_id),
projectId: stringValue(row.project_id),
backendProfile: stringValue(row.backend_profile) as BackendProfile,
conversationId: nullableString(row.conversation_id),
threadId: nullableString(row.thread_id),
metadata: jsonRecord(row.metadata),
createdAt: iso(row.created_at),
updatedAt: iso(row.updated_at),
expiresAt: nullableIso(row.expires_at),
};
}
function runnerJobFromRow(row: QueryResultRow): RunnerJobRecord {
return {
id: stringValue(row.id),
runId: stringValue(row.run_id),
commandId: stringValue(row.command_id),
idempotencyKey: nullableString(row.idempotency_key),
payloadHash: stringValue(row.payload_hash),
attemptId: stringValue(row.attempt_id),
runnerId: stringValue(row.runner_id),
namespace: stringValue(row.namespace),
jobName: stringValue(row.job_name),
managerUrl: stringValue(row.manager_url),
image: stringValue(row.image),
sourceCommit: stringValue(row.source_commit),
serviceAccountName: nullableString(row.service_account_name),
result: jsonRecord(row.result),
createdAt: iso(row.created_at),
updatedAt: iso(row.updated_at),
};
}
function metadataForRunner(runner: RunnerRecord): JsonRecord {
const { id: _id, runId: _runId, attemptId: _attemptId, backendProfile: _backendProfile, placement: _placement, sourceCommit: _sourceCommit, registeredAt: _registeredAt, heartbeatAt: _heartbeatAt, ...metadata } = runner;
return redactJson(metadata);
}
function isTerminalStatus(status: RunRecord["status"]): boolean {
return status === "completed" || status === "failed" || status === "cancelled";
}
function stringValue(value: unknown): string {
return typeof value === "string" ? value : String(value ?? "");
}
+137
View File
@@ -0,0 +1,137 @@
import type { AgentRunStore } from "./store.js";
import type { CommandRecord, JsonRecord, JsonValue, RunEvent, RunRecord, RunnerJobRecord, TerminalStatus } from "../common/types.js";
export async function buildRunResult(store: AgentRunStore, runId: string, commandId?: string): Promise<JsonRecord> {
const run = await store.getRun(runId);
const command = await selectCommand(store, runId, commandId);
const events = await store.listEvents(runId, 0, 500);
const jobs = await store.listRunnerJobs(runId, command?.id);
const latestJob = jobs.at(-1) ?? null;
const terminal = terminalFromEvents(events) ?? run.terminalStatus;
const failureKind = run.failureKind ?? failureKindFromEvents(events);
const reply = assistantReply(events);
const blocker = terminal === "blocked" || terminal === "failed" ? { failureKind, message: run.failureMessage ?? messageFromEvents(events) } : null;
return {
runId: run.id,
commandId: command?.id ?? commandId ?? null,
attemptId: latestJob?.attemptId ?? attemptFromEvents(events),
runnerId: latestJob?.runnerId ?? null,
jobName: latestJob?.jobName ?? null,
namespace: latestJob?.namespace ?? null,
status: command?.state ?? run.status,
runStatus: run.status,
commandState: command?.state ?? null,
terminalStatus: terminal,
reply,
failureKind,
failureMessage: run.failureMessage ?? messageFromEvents(events),
blocker,
lastSeq: events.at(-1)?.seq ?? 0,
eventCount: events.length,
artifactSummary: artifactSummary(events),
sessionRef: sessionSummary(run),
resourceBundleRef: resourceBundleSummary(run, events),
runnerJobCount: jobs.length,
};
}
async function selectCommand(store: AgentRunStore, runId: string, commandId?: string): Promise<CommandRecord | null> {
if (commandId) {
const command = await store.getCommand(commandId);
return command.runId === runId ? command : null;
}
const commands = await store.listCommands(runId, 0, 100);
return commands.at(-1) ?? null;
}
function terminalFromEvents(events: RunEvent[]): TerminalStatus | null {
for (const event of [...events].reverse()) {
if (event.type !== "terminal_status") continue;
const value = event.payload.terminalStatus;
if (value === "completed" || value === "failed" || value === "blocked" || value === "cancelled") return value;
}
return null;
}
function failureKindFromEvents(events: RunEvent[]): string | null {
for (const event of [...events].reverse()) {
const value = event.payload.failureKind;
if (typeof value === "string") return value;
}
return null;
}
function messageFromEvents(events: RunEvent[]): string | null {
for (const event of [...events].reverse()) {
const value = event.payload.message;
if (typeof value === "string" && value.length > 0) return value;
}
return null;
}
function assistantReply(events: RunEvent[]): string {
return events
.filter((event) => event.type === "assistant_message")
.map((event) => textPayload(event.payload))
.filter((text) => text.length > 0)
.join("");
}
function textPayload(payload: JsonRecord): string {
for (const key of ["text", "content", "delta"]) {
const value = payload[key];
if (typeof value === "string") return value;
}
return "";
}
function artifactSummary(events: RunEvent[]): JsonRecord {
let commandOutputEvents = 0;
let diffEvents = 0;
let toolCallEvents = 0;
let outputChars = 0;
let truncatedEvents = 0;
for (const event of events) {
if (event.type === "command_output") {
commandOutputEvents += 1;
const text = textPayload(event.payload);
outputChars += text.length;
if (event.payload.truncated === true) truncatedEvents += 1;
}
if (event.type === "diff") diffEvents += 1;
if (event.type === "tool_call") toolCallEvents += 1;
}
return { commandOutputEvents, diffEvents, toolCallEvents, outputChars, truncatedEvents };
}
function attemptFromEvents(events: RunEvent[]): string | null {
for (const event of [...events].reverse()) {
const value = event.payload.attemptId;
if (typeof value === "string") return value;
}
return null;
}
function sessionSummary(run: RunRecord): JsonRecord | null {
if (!run.sessionRef) return null;
return {
sessionId: run.sessionRef.sessionId,
conversationId: run.sessionRef.conversationId ?? null,
threadId: run.sessionRef.threadId ?? null,
expiresAt: run.sessionRef.expiresAt ?? null,
metadataKeys: Object.keys(run.sessionRef.metadata ?? {}).sort(),
valuesPrinted: false,
};
}
function resourceBundleSummary(run: RunRecord, events: RunEvent[]): JsonRecord | null {
if (!run.resourceBundleRef) return null;
const materialized = events.find((event) => event.type === "backend_status" && event.payload.phase === "resource-bundle-materialized")?.payload ?? null;
return {
kind: run.resourceBundleRef.kind,
repoUrl: run.resourceBundleRef.repoUrl,
commitId: run.resourceBundleRef.commitId,
subdir: run.resourceBundleRef.subdir ?? null,
materialized: materialized as JsonValue,
};
}
+24 -1
View File
@@ -7,6 +7,7 @@ import { AgentRunError, errorToJson } from "../common/errors.js";
import { asRecord, validateCreateCommand, validateCreateRun } from "../common/validation.js";
import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, RunEvent } from "../common/types.js";
import { createKubernetesRunnerJob } from "./kubernetes-runner-job.js";
import { buildRunResult } from "./result.js";
export interface ManagerServerOptions {
store?: AgentRunStore;
@@ -75,6 +76,14 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults
const limit = integerQuery(url, "limit", 100);
return { items: await store.listEvents(eventMatch[1] ?? "", afterSeq, limit) as unknown as JsonValue };
}
const runResultMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/result$/u);
if (method === "GET" && runResultMatch) return await buildRunResult(store, runResultMatch[1] ?? "", url.searchParams.get("commandId") ?? undefined) as JsonValue;
const runCancelMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/cancel$/u);
if (method === "POST" && runCancelMatch) {
const record = body === null ? {} : asRecord(body, "cancel");
const reason = typeof record.reason === "string" && record.reason.trim().length > 0 ? record.reason.trim() : undefined;
return await store.cancelRun(runCancelMatch[1] ?? "", reason) as unknown as JsonValue;
}
const commandCreateMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands$/u);
if (method === "POST" && commandCreateMatch) return await store.createCommand(commandCreateMatch[1] ?? "", validateCreateCommand(body)) as unknown as JsonValue;
if (method === "GET" && commandCreateMatch) return { items: await store.listCommands(commandCreateMatch[1] ?? "", integerQuery(url, "afterSeq", 0), integerQuery(url, "limit", 20)) as unknown as JsonValue };
@@ -97,6 +106,8 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults
}
const commandShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)$/u);
if (method === "GET" && commandShowMatch) return await store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue;
const commandResultMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)\/result$/u);
if (method === "GET" && commandResultMatch) return await buildRunResult(store, commandResultMatch[1] ?? "", commandResultMatch[2] ?? "") as JsonValue;
if (method === "POST" && path === "/api/v1/runners/register") return await store.registerRunner(asRecord(body ?? {}, "runner")) as unknown as JsonValue;
const claimMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/claim$/u);
if (method === "POST" && claimMatch) {
@@ -122,7 +133,13 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults
if (method === "PATCH" && statusMatch) {
const record = asRecord(body, "status");
const terminalStatus = record.terminalStatus === "completed" || record.terminalStatus === "failed" || record.terminalStatus === "blocked" || record.terminalStatus === "cancelled" ? record.terminalStatus : "failed";
return await store.finishRun(statusMatch[1] ?? "", { terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null, failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null }) as unknown as JsonValue;
return await store.finishRun(statusMatch[1] ?? "", {
terminalStatus,
failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null,
failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null,
...(typeof record.threadId === "string" ? { threadId: record.threadId } : {}),
...(typeof record.turnId === "string" ? { turnId: record.turnId } : {}),
}) as unknown as JsonValue;
}
const ackMatch = path.match(/^\/api\/v1\/commands\/([^/]+)\/ack$/u);
if (method === "POST" && ackMatch) return await store.ackCommand(ackMatch[1] ?? "") as unknown as JsonValue;
@@ -132,6 +149,12 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults
const terminalStatus = record.terminalStatus === "completed" || record.terminalStatus === "failed" || record.terminalStatus === "blocked" || record.terminalStatus === "cancelled" ? record.terminalStatus : "failed";
return await store.finishCommand(commandStatusMatch[1] ?? "", { terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null, failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null }) as unknown as JsonValue;
}
const commandCancelMatch = path.match(/^\/api\/v1\/commands\/([^/]+)\/cancel$/u);
if (method === "POST" && commandCancelMatch) {
const record = body === null ? {} : asRecord(body, "cancel");
const reason = typeof record.reason === "string" && record.reason.trim().length > 0 ? record.reason.trim() : undefined;
return await store.cancelCommand(commandCancelMatch[1] ?? "", reason) as unknown as JsonValue;
}
throw new AgentRunError("schema-invalid", `unsupported route ${method} ${path}`, { httpStatus: 404 });
}
+178 -6
View File
@@ -1,4 +1,4 @@
import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateRunInput, FailureKind, JsonRecord, RunEvent, RunnerRecord, RunRecord, TerminalStatus } from "../common/types.js";
import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateRunInput, FailureKind, JsonRecord, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js";
import { AgentRunError } from "../common/errors.js";
import { newId, nowIso, stableHash } from "../common/validation.js";
import { redactJson } from "../common/redaction.js";
@@ -26,16 +26,38 @@ export interface AgentRunStore {
getCommand(commandId: string): MaybePromise<CommandRecord>;
listCommands(runId: string, afterSeq: number, limit: number): MaybePromise<CommandRecord[]>;
registerRunner(input: Partial<RunnerRecord>): MaybePromise<RunnerRecord>;
listRunnerJobs(runId: string, commandId?: string): MaybePromise<RunnerJobRecord[]>;
getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): MaybePromise<RunnerJobRecord | null>;
saveRunnerJob(input: SaveRunnerJobInput): MaybePromise<RunnerJobRecord>;
claimRun(runId: string, runnerId: string, leaseMs: number): MaybePromise<RunRecord>;
heartbeat(runId: string, runnerId: string, leaseMs: number): MaybePromise<RunRecord>;
ackCommand(commandId: string): MaybePromise<CommandRecord>;
finishCommand(commandId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage">): MaybePromise<CommandRecord>;
appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): MaybePromise<RunEvent>;
finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage">): MaybePromise<RunRecord>;
finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): MaybePromise<RunRecord>;
cancelRun(runId: string, reason?: string): MaybePromise<RunRecord>;
cancelCommand(commandId: string, reason?: string): MaybePromise<CommandRecord>;
getSession(sessionId: string): MaybePromise<SessionRecord | null>;
backends(): MaybePromise<JsonRecord[]>;
close?(): MaybePromise<void>;
}
export interface SaveRunnerJobInput {
runId: string;
commandId: string;
idempotencyKey?: string | null;
payloadHash: string;
attemptId: string;
runnerId: string;
namespace: string;
jobName: string;
managerUrl: string;
image: string;
sourceCommit: string;
serviceAccountName?: string | null;
result: JsonRecord;
}
export async function openAgentRunStoreFromEnv(env: NodeJS.ProcessEnv = process.env): Promise<AgentRunStore> {
const databaseUrl = env.DATABASE_URL?.trim();
if (databaseUrl) {
@@ -52,6 +74,8 @@ export class MemoryAgentRunStore implements AgentRunStore {
private readonly commands = new Map<string, CommandRecord>();
private readonly eventsByRun = new Map<string, RunEvent[]>();
private readonly runners = new Map<string, RunnerRecord>();
private readonly sessions = new Map<string, SessionRecord>();
private readonly runnerJobs = new Map<string, RunnerJobRecord>();
health(): StoreHealth {
return { adapter: "memory-self-test", ready: true, reachable: true, migrationReady: true, migrationId: "memory-self-test", failureKind: null, message: null, credentialValuesPrinted: false };
@@ -59,10 +83,11 @@ export class MemoryAgentRunStore implements AgentRunStore {
createRun(input: CreateRunInput): RunRecord {
const at = nowIso();
const run: RunRecord = { ...input, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null };
const sessionRef = this.resolveSessionForRun(input, at);
const run: RunRecord = { ...input, sessionRef, resourceBundleRef: input.resourceBundleRef ?? null, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null };
this.runs.set(run.id, run);
this.eventsByRun.set(run.id, []);
this.appendEvent(run.id, "backend_status", { phase: "run-created", backendProfile: run.backendProfile });
this.appendEvent(run.id, "backend_status", { phase: "run-created", backendProfile: run.backendProfile, sessionRef: summarizeSessionRef(run.sessionRef ?? null), resourceBundleRef: summarizeResourceBundleRef(run.resourceBundleRef ?? null) });
return run;
}
@@ -113,9 +138,35 @@ export class MemoryAgentRunStore implements AgentRunStore {
return runner;
}
listRunnerJobs(runId: string, commandId?: string): RunnerJobRecord[] {
this.getRun(runId);
return Array.from(this.runnerJobs.values())
.filter((job) => job.runId === runId && (!commandId || job.commandId === commandId))
.sort((a, b) => a.createdAt.localeCompare(b.createdAt));
}
getRunnerJobByIdempotencyKey(runId: string, idempotencyKey: string, payloadHash: string): RunnerJobRecord | null {
const existing = Array.from(this.runnerJobs.values()).find((job) => job.runId === runId && job.idempotencyKey === idempotencyKey);
if (!existing) return null;
if (existing.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "runner job idempotency key reused with different payload", { httpStatus: 409 });
return existing;
}
saveRunnerJob(input: SaveRunnerJobInput): RunnerJobRecord {
if (input.idempotencyKey) {
const existing = this.getRunnerJobByIdempotencyKey(input.runId, input.idempotencyKey, input.payloadHash);
if (existing) return existing;
}
const at = nowIso();
const record: RunnerJobRecord = { ...input, id: newId("rjob"), idempotencyKey: input.idempotencyKey ?? null, serviceAccountName: input.serviceAccountName ?? null, createdAt: at, updatedAt: at };
this.runnerJobs.set(record.id, record);
return record;
}
claimRun(runId: string, runnerId: string, leaseMs: number): RunRecord {
const run = this.getRun(runId);
if (run.claimedBy && run.claimedBy !== runnerId && run.status !== "completed" && run.status !== "failed" && run.status !== "cancelled") throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 });
if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 });
if (run.claimedBy && run.claimedBy !== runnerId) throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 });
const next = this.updateRun(runId, { status: "claimed", claimedBy: runnerId, leaseExpiresAt: new Date(Date.now() + leaseMs).toISOString() });
this.appendEvent(runId, "backend_status", { phase: "run-claimed", runnerId });
return next;
@@ -123,12 +174,14 @@ export class MemoryAgentRunStore implements AgentRunStore {
heartbeat(runId: string, runnerId: string, leaseMs: number): RunRecord {
const run = this.getRun(runId);
if (isTerminalRunStatus(run.status)) return run;
if (run.claimedBy !== runnerId) throw new AgentRunError("runner-lease-conflict", `run ${runId} is not claimed by ${runnerId}`, { httpStatus: 409 });
return this.updateRun(runId, { leaseExpiresAt: new Date(Date.now() + leaseMs).toISOString() });
}
ackCommand(commandId: string): CommandRecord {
const command = this.getCommand(commandId);
if (isTerminalCommandState(command.state) || command.state === "acknowledged") return command;
const next = { ...command, state: "acknowledged" as const, acknowledgedAt: nowIso(), updatedAt: nowIso() };
this.commands.set(commandId, next);
return next;
@@ -136,6 +189,7 @@ export class MemoryAgentRunStore implements AgentRunStore {
finishCommand(commandId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage">): CommandRecord {
const command = this.getCommand(commandId);
if (isTerminalCommandState(command.state)) return command;
const next = { ...command, state: commandStateFromTerminal(result.terminalStatus), updatedAt: nowIso() };
this.commands.set(commandId, next);
this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: next.state, terminalStatus: result.terminalStatus, failureKind: result.failureKind });
@@ -151,13 +205,45 @@ export class MemoryAgentRunStore implements AgentRunStore {
return event;
}
finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage">): RunRecord {
finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): RunRecord {
const existing = this.getRun(runId);
if (isTerminalRunStatus(existing.status)) return existing;
const status = statusFromTerminal(result.terminalStatus);
const next = this.updateRun(runId, { status, terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage });
if (result.threadId && next.sessionRef?.sessionId) this.upsertSessionThread(next, result.threadId, result.turnId ?? null);
this.appendEvent(runId, "terminal_status", { terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage });
return next;
}
cancelRun(runId: string, reason = "cancel requested"): RunRecord {
const run = this.getRun(runId);
if (isTerminalRunStatus(run.status)) return run;
const at = nowIso();
for (const command of Array.from(this.commands.values()).filter((item) => item.runId === runId && !isTerminalCommandState(item.state))) {
const cancelled = { ...command, state: "cancelled" as const, updatedAt: at };
this.commands.set(command.id, cancelled);
this.appendEvent(runId, "backend_status", { phase: "command-terminal", commandId: command.id, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled" });
}
this.appendEvent(runId, "backend_status", { phase: "cancel-requested", reason });
const next = this.updateRun(runId, { status: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: reason });
this.appendEvent(runId, "terminal_status", { terminalStatus: "cancelled", failureKind: "cancelled", message: reason });
return next;
}
cancelCommand(commandId: string, reason = "cancel requested"): CommandRecord {
const command = this.getCommand(commandId);
if (isTerminalCommandState(command.state)) return command;
const next = { ...command, state: "cancelled" as const, updatedAt: nowIso() };
this.commands.set(commandId, next);
this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled" });
this.cancelRun(command.runId, reason);
return next;
}
getSession(sessionId: string): SessionRecord | null {
return this.sessions.get(sessionId) ?? null;
}
backends(): JsonRecord[] {
return backendCapabilities();
}
@@ -168,6 +254,48 @@ export class MemoryAgentRunStore implements AgentRunStore {
this.runs.set(runId, next);
return next;
}
private resolveSessionForRun(input: CreateRunInput, at: string): SessionRef | null {
if (!input.sessionRef) return null;
const existing = this.sessions.get(input.sessionRef.sessionId);
if (existing) return sessionRefFromRecord(existing, input.sessionRef);
const record: SessionRecord = {
sessionId: input.sessionRef.sessionId,
tenantId: input.tenantId,
projectId: input.projectId,
backendProfile: input.backendProfile,
conversationId: input.sessionRef.conversationId ?? null,
threadId: input.sessionRef.threadId ?? null,
metadata: input.sessionRef.metadata ?? {},
createdAt: at,
updatedAt: at,
expiresAt: input.sessionRef.expiresAt ?? null,
};
this.sessions.set(record.sessionId, record);
return sessionRefFromRecord(record, input.sessionRef);
}
private upsertSessionThread(run: RunRecord, threadId: string, turnId: string | null): void {
if (!run.sessionRef?.sessionId) return;
const at = nowIso();
const existing = this.sessions.get(run.sessionRef.sessionId);
const record: SessionRecord = {
sessionId: run.sessionRef.sessionId,
tenantId: run.tenantId,
projectId: run.projectId,
backendProfile: run.backendProfile,
conversationId: run.sessionRef.conversationId ?? existing?.conversationId ?? null,
threadId,
metadata: { ...(existing?.metadata ?? {}), ...(run.sessionRef.metadata ?? {}), ...(turnId ? { lastTurnId: turnId } : {}) },
createdAt: existing?.createdAt ?? at,
updatedAt: at,
expiresAt: run.sessionRef.expiresAt ?? existing?.expiresAt ?? null,
};
this.sessions.set(record.sessionId, record);
const nextSessionRef = sessionRefFromRecord(record, run.sessionRef);
this.updateRun(run.id, { sessionRef: nextSessionRef });
this.appendEvent(run.id, "backend_status", { phase: "session-updated", sessionRef: summarizeSessionRef(nextSessionRef), turnId });
}
}
export function statusFromTerminal(terminalStatus: TerminalStatus): RunRecord["status"] {
@@ -182,3 +310,47 @@ export function commandStateFromTerminal(terminalStatus: TerminalStatus): Comman
if (terminalStatus === "cancelled") return "cancelled";
return "failed";
}
export function isTerminalRunStatus(status: RunRecord["status"]): boolean {
return status === "completed" || status === "failed" || status === "blocked" || status === "cancelled";
}
export function isTerminalCommandState(state: CommandRecord["state"]): boolean {
return state === "completed" || state === "failed" || state === "cancelled";
}
export function sessionRefFromRecord(record: SessionRecord, fallback: SessionRef): SessionRef {
return {
sessionId: record.sessionId,
...(record.conversationId ? { conversationId: record.conversationId } : fallback.conversationId ? { conversationId: fallback.conversationId } : {}),
...(record.threadId ? { threadId: record.threadId } : fallback.threadId ? { threadId: fallback.threadId } : {}),
...(record.expiresAt ? { expiresAt: record.expiresAt } : fallback.expiresAt ? { expiresAt: fallback.expiresAt } : {}),
...(Object.keys(record.metadata).length > 0 ? { metadata: record.metadata } : fallback.metadata ? { metadata: fallback.metadata } : {}),
};
}
export function summarizeSessionRef(sessionRef: SessionRef | null): JsonRecord | null {
if (!sessionRef) return null;
return {
sessionId: sessionRef.sessionId,
conversationId: sessionRef.conversationId ?? null,
threadId: sessionRef.threadId ?? null,
expiresAt: sessionRef.expiresAt ?? null,
metadataKeys: Object.keys(sessionRef.metadata ?? {}).sort(),
valuesPrinted: false,
};
}
export function summarizeResourceBundleRef(resourceBundleRef: RunRecord["resourceBundleRef"] | null | undefined): JsonRecord | null {
if (!resourceBundleRef) return null;
return {
kind: resourceBundleRef.kind,
repoUrl: resourceBundleRef.repoUrl,
commitId: resourceBundleRef.commitId,
subdir: resourceBundleRef.subdir ?? null,
sparsePathCount: resourceBundleRef.sparsePaths?.length ?? 0,
submodules: resourceBundleRef.submodules ?? false,
lfs: resourceBundleRef.lfs ?? false,
credentialRef: resourceBundleRef.credentialRef ? { name: resourceBundleRef.credentialRef.name, namespace: resourceBundleRef.credentialRef.namespace ?? null, keys: resourceBundleRef.credentialRef.keys ?? [], valuesPrinted: false } : null,
};
}
+3
View File
@@ -141,6 +141,9 @@ function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string
{ name: "AGENTRUN_RUNNER_ID", value: context.runnerId },
{ name: "AGENTRUN_BACKEND_PROFILE", value: options.run.backendProfile },
{ name: "AGENTRUN_EXECUTION_POLICY_JSON", value: JSON.stringify(options.run.executionPolicy) },
{ name: "AGENTRUN_SESSION_REF_JSON", value: JSON.stringify(options.run.sessionRef ?? null) },
{ name: "AGENTRUN_RESOURCE_BUNDLE_JSON", value: JSON.stringify(options.run.resourceBundleRef ?? null) },
{ name: "AGENTRUN_WORKSPACE_ROOT", value: "/home/agentrun/workspaces" },
{ name: "AGENTRUN_SOURCE_COMMIT", value: context.sourceCommit },
{ name: "AGENTRUN_RUNTIME_NAMESPACE", value: context.namespace },
{ name: "AGENTRUN_K8S_JOB_NAME", value: context.jobName },
+9 -1
View File
@@ -55,6 +55,14 @@ export class RunnerManagerApi {
return await this.client.patch(`/api/v1/runs/${encodeURIComponent(runId)}/lease`, { runnerId, leaseMs }) as RunRecord;
}
async getRun(runId: string): Promise<RunRecord> {
return await this.client.get(`/api/v1/runs/${encodeURIComponent(runId)}`) as RunRecord;
}
async getCommand(runId: string, commandId: string): Promise<CommandRecord> {
return await this.client.get(`/api/v1/runs/${encodeURIComponent(runId)}/commands/${encodeURIComponent(commandId)}`) as CommandRecord;
}
async pollCommands(runId: string, options: { afterSeq?: number; limit?: number; commandId?: string }): Promise<PollCommandsResult> {
const afterSeq = options.afterSeq ?? 0;
const limit = options.limit ?? 20;
@@ -72,7 +80,7 @@ export class RunnerManagerApi {
return await this.client.post(`/api/v1/runs/${encodeURIComponent(runId)}/events`, event as unknown as JsonRecord) as JsonRecord;
}
async reportStatus(runId: string, report: { terminalStatus: TerminalStatus; failureKind: FailureKind | null; failureMessage: string | null }): Promise<RunRecord> {
async reportStatus(runId: string, report: { terminalStatus: TerminalStatus; failureKind: FailureKind | null; failureMessage: string | null; threadId?: string; turnId?: string }): Promise<RunRecord> {
return await this.client.patch(`/api/v1/runs/${encodeURIComponent(runId)}/status`, report as unknown as JsonRecord) as RunRecord;
}
+81
View File
@@ -0,0 +1,81 @@
import { spawn } from "node:child_process";
import { mkdir, writeFile } from "node:fs/promises";
import path from "node:path";
import { AgentRunError } from "../common/errors.js";
import { redactText } from "../common/redaction.js";
import type { JsonRecord, ResourceBundleRef } from "../common/types.js";
import { stableHash } from "../common/validation.js";
export interface MaterializedResourceBundle {
workspacePath: string;
event: JsonRecord;
}
export async function materializeResourceBundle(resourceBundleRef: ResourceBundleRef | null | undefined, env: NodeJS.ProcessEnv = process.env): Promise<MaterializedResourceBundle | null> {
if (!resourceBundleRef) return null;
const workspaceRoot = path.resolve(env.AGENTRUN_WORKSPACE_ROOT ?? "/home/agentrun/workspaces");
const checkoutPath = path.join(workspaceRoot, stableHash({ repoUrl: resourceBundleRef.repoUrl, commitId: resourceBundleRef.commitId }).slice(0, 16));
await mkdir(checkoutPath, { recursive: true });
await git(["init"], checkoutPath);
await git(["remote", "remove", "origin"], checkoutPath, { allowFailure: true });
await git(["remote", "add", "origin", resourceBundleRef.repoUrl], checkoutPath);
if (resourceBundleRef.sparsePaths && resourceBundleRef.sparsePaths.length > 0) {
await git(["config", "core.sparseCheckout", "true"], checkoutPath);
await mkdir(path.join(checkoutPath, ".git", "info"), { recursive: true });
await writeFile(path.join(checkoutPath, ".git", "info", "sparse-checkout"), `${resourceBundleRef.sparsePaths.join("\n")}\n`, "utf8");
}
await git(["fetch", "--depth", "1", "origin", resourceBundleRef.commitId], checkoutPath);
await git(["checkout", "--detach", resourceBundleRef.commitId], checkoutPath);
const actualCommit = (await git(["rev-parse", "HEAD"], checkoutPath)).stdout.trim();
if (actualCommit !== resourceBundleRef.commitId) throw new AgentRunError("infra-failed", "resource bundle checkout did not land on requested commit", { httpStatus: 500, details: { expectedCommit: resourceBundleRef.commitId, actualCommit } });
const treeId = (await git(["rev-parse", "HEAD^{tree}"], checkoutPath)).stdout.trim();
const workspacePath = resolveWorkspacePath(checkoutPath, resourceBundleRef.subdir);
return {
workspacePath,
event: {
phase: "resource-bundle-materialized",
kind: "git",
repoUrl: resourceBundleRef.repoUrl,
commitId: resourceBundleRef.commitId,
treeId,
checkoutPath: pathSummary(checkoutPath),
workspacePath: pathSummary(workspacePath),
subdir: resourceBundleRef.subdir ?? null,
sparsePathCount: resourceBundleRef.sparsePaths?.length ?? 0,
valuesPrinted: false,
},
};
}
async function git(args: string[], cwd: string, options: { allowFailure?: boolean } = {}): Promise<{ stdout: string; stderr: string }> {
const child = spawn("git", args, { cwd, stdio: ["ignore", "pipe", "pipe"] });
let stdout = "";
let stderr = "";
child.stdout.setEncoding("utf8");
child.stderr.setEncoding("utf8");
child.stdout.on("data", (chunk) => { stdout += String(chunk); });
child.stderr.on("data", (chunk) => { stderr += String(chunk); });
const result = await new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve, reject) => {
child.on("error", reject);
child.on("close", (code, signal) => resolve({ code, signal }));
}).catch((error: unknown) => {
throw new AgentRunError("infra-failed", `failed to start git: ${error instanceof Error ? error.message : String(error)}`, { httpStatus: 503 });
});
if (result.code !== 0 && !options.allowFailure) {
throw new AgentRunError("infra-failed", `git ${args[0] ?? "command"} failed with code ${result.code}`, { httpStatus: 502, details: { stderr: redactText(stderr.slice(-4000)), stdout: redactText(stdout.slice(-1000)), signal: result.signal } });
}
return { stdout, stderr };
}
function resolveWorkspacePath(checkoutPath: string, subdir: string | undefined): string {
if (!subdir) return checkoutPath;
const resolved = path.resolve(checkoutPath, subdir);
const root = path.resolve(checkoutPath);
if (resolved !== root && !resolved.startsWith(`${root}${path.sep}`)) throw new AgentRunError("schema-invalid", "resource bundle subdir escaped checkout", { httpStatus: 400 });
return resolved;
}
function pathSummary(value: string): JsonRecord {
const parts = value.split(/[\\/]+/u).filter(Boolean);
return { absolute: path.isAbsolute(value), basename: parts.at(-1) ?? null, depth: parts.length, fingerprint: stableHash(value).slice(0, 16), valuePrinted: false };
}
+58 -5
View File
@@ -1,5 +1,6 @@
import { RunnerManagerApi, failureKindFromError, terminalStatusForFailure, errorMessage } from "./manager-api.js";
import { runBackendTurn, type BackendAdapterOptions } from "../backend/adapter.js";
import { materializeResourceBundle } from "./resource-bundle.js";
import type { BackendProfile, JsonRecord, RunRecord, RunnerRecord } from "../common/types.js";
import { AgentRunError } from "../common/errors.js";
@@ -21,6 +22,7 @@ export interface RunnerOnceOptions extends BackendAdapterOptions {
export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
const api = new RunnerManagerApi(options.managerUrl);
const targetRun = await api.client.get(`/api/v1/runs/${encodeURIComponent(options.runId)}`) as RunRecord;
if (isTerminalRun(targetRun)) return { terminalStatus: targetRun.terminalStatus, failureKind: targetRun.failureKind, run: targetRun, skipped: "run-terminal" } as JsonRecord;
if (options.backendProfile && options.backendProfile !== targetRun.backendProfile) {
throw new AgentRunError("schema-invalid", `runner backendProfile ${options.backendProfile} does not match run backendProfile ${targetRun.backendProfile}`, { httpStatus: 400 });
}
@@ -55,9 +57,60 @@ export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
return { runner, claimed, terminalStatus: "blocked", failureKind: "schema-invalid", polledCommands: commandsResponse.items.length };
}
await api.ackCommand(command.id);
const result = await runBackendTurn(claimed, command, options);
for (const event of result.events) await api.appendEvent(options.runId, event);
await api.reportCommandStatus(command.id, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage });
const finalRun = await api.reportStatus(options.runId, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage }) as RunRecord;
return { runner, commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind, run: finalRun } as JsonRecord;
const acked = await api.getCommand(options.runId, command.id);
if (acked.state === "cancelled") return await reportCancelled(api, options.runId, command.id, runner, claimed, "command cancelled before backend start");
await assertNotCancelled(api, options.runId, command.id);
const abortController = new AbortController();
const stopCancelWatch = watchCancellation(api, options.runId, command.id, abortController);
let workspacePath: string | undefined;
try {
const materialized = await materializeResourceBundle(claimed.resourceBundleRef ?? null, options.env ?? process.env);
if (materialized) {
workspacePath = materialized.workspacePath;
await api.appendEvent(options.runId, { type: "backend_status", payload: materialized.event });
}
await assertNotCancelled(api, options.runId, command.id);
const result = await runBackendTurn(claimed, command, { ...options, ...(workspacePath ? { workspacePath } : {}), abortSignal: abortController.signal });
for (const event of result.events) await api.appendEvent(options.runId, event);
await api.reportCommandStatus(command.id, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage });
const finalRun = await api.reportStatus(options.runId, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage, ...(result.threadId ? { threadId: result.threadId } : {}), ...(result.turnId ? { turnId: result.turnId } : {}) }) as RunRecord;
return { runner, commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind, run: finalRun } as JsonRecord;
} finally {
stopCancelWatch();
}
}
function isTerminalRun(run: RunRecord): boolean {
return run.status === "completed" || run.status === "failed" || run.status === "blocked" || run.status === "cancelled";
}
async function assertNotCancelled(api: RunnerManagerApi, runId: string, commandId: string): Promise<void> {
const [run, command] = await Promise.all([api.getRun(runId), api.getCommand(runId, commandId)]);
if (run.status === "cancelled" || command.state === "cancelled") throw new AgentRunError("cancelled", "run or command was cancelled", { httpStatus: 409 });
}
function watchCancellation(api: RunnerManagerApi, runId: string, commandId: string, controller: AbortController): () => void {
let stopped = false;
const check = async (): Promise<void> => {
if (stopped || controller.signal.aborted) return;
try {
const [run, command] = await Promise.all([api.getRun(runId), api.getCommand(runId, commandId)]);
if (run.status === "cancelled" || command.state === "cancelled") controller.abort();
} catch {
// Cancellation polling must not hide the backend's own terminal result.
}
};
const timer = setInterval(() => { void check(); }, 2_000);
void check();
return () => {
stopped = true;
clearInterval(timer);
};
}
async function reportCancelled(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, claimed: RunRecord, message: string): Promise<JsonRecord> {
await api.appendEvent(runId, { type: "terminal_status", payload: { terminalStatus: "cancelled", failureKind: "cancelled", message } });
await api.reportCommandStatus(commandId, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message });
const finalRun = await api.reportStatus(runId, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message });
return { runner, commandId, claimed, terminalStatus: "cancelled", failureKind: "cancelled", run: finalRun };
}
+3 -1
View File
@@ -13,11 +13,13 @@ const selfTest: SelfTestCase = async () => {
(error) => error instanceof AgentRunError && error.failureKind === "infra-failed" && error.message.includes("DATABASE_URL is required"),
);
const postgresContract = postgresMigrationContract();
assert.equal(postgresContract.latestMigrationId, "002_v01_backend_profiles");
assert.equal(postgresContract.latestMigrationId, "003_v01_hwlab_manual_dispatch");
assert.ok(Array.isArray(postgresContract.requiredTables));
assert.ok(postgresContract.requiredTables.includes("agentrun_schema_migrations"));
assert.ok(postgresContract.requiredTables.includes("agentrun_runs"));
assert.ok(postgresContract.requiredTables.includes("agentrun_events"));
assert.ok(postgresContract.requiredTables.includes("agentrun_sessions"));
assert.ok(postgresContract.requiredTables.includes("agentrun_runner_jobs"));
return { name: "redaction-postgres", tests: ["redaction", "postgres-store-contract"] };
};
@@ -0,0 +1,141 @@
import assert from "node:assert/strict";
import { execFile as execFileCallback } from "node:child_process";
import { promisify } from "node:util";
import { chmod, mkdir, readFile, writeFile } from "node:fs/promises";
import path from "node:path";
import { startManagerServer } from "../../mgr/server.js";
import { ManagerClient } from "../../mgr/client.js";
import { MemoryAgentRunStore } from "../../mgr/store.js";
import { runOnce } from "../../runner/run-once.js";
import type { JsonRecord } from "../../common/types.js";
import { assertNoSecretLeak, type SelfTestCase, type SelfTestContext } from "../harness.js";
const execFile = promisify(execFileCallback);
const selfTest: SelfTestCase = async (context) => {
const fakeKubectl = path.join(context.tmp, "fake-kubectl-hwlab.js");
const createdManifest = path.join(context.tmp, "created-hwlab-runner-job.json");
await writeFile(fakeKubectl, `#!/usr/bin/env bun
const chunks = [];
for await (const chunk of Bun.stdin.stream()) chunks.push(chunk);
const text = Buffer.concat(chunks.map((chunk) => Buffer.from(chunk))).toString("utf8");
await Bun.write(${JSON.stringify(createdManifest)}, text);
const manifest = JSON.parse(text);
console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kind, metadata: { uid: "job-uid-hwlab", resourceVersion: "1", name: manifest.metadata.name, namespace: manifest.metadata.namespace } }));
`);
await chmod(fakeKubectl, 0o755);
const store = new MemoryAgentRunStore();
const server = await startManagerServer({
port: 0,
host: "127.0.0.1",
sourceCommit: "self-test",
store,
runnerJobDefaults: {
namespace: "agentrun-v01",
managerUrl: "http://agentrun-mgr.agentrun-v01.svc.cluster.local:8080",
image: "127.0.0.1:5000/agentrun/agentrun-mgr@sha256:1111111111111111111111111111111111111111111111111111111111111111",
kubectlCommand: fakeKubectl,
},
});
try {
const client = new ManagerClient(server.baseUrl);
const bundle = await createLocalGitBundle(context);
const first = await createHwlabRun(client, context, bundle, "hwlab-session-1", "hello bundle", "hwlab-command-1");
const created = await client.post(`/api/v1/runs/${first.runId}/runner-jobs`, { commandId: first.commandId, idempotencyKey: "hwlab-trace-1" }) as JsonRecord;
const replay = await client.post(`/api/v1/runs/${first.runId}/runner-jobs`, { commandId: first.commandId, idempotencyKey: "hwlab-trace-1" }) as JsonRecord;
assert.equal(replay.idempotentReplay, true);
assert.equal(replay.jobName, created.jobName);
assert.equal(replay.attemptId, created.attemptId);
await assert.rejects(
() => client.post(`/api/v1/runs/${first.runId}/runner-jobs`, { commandId: first.commandId, idempotencyKey: "hwlab-trace-1", runnerId: "runner_changed" }),
(error) => error instanceof Error && error.message.includes("idempotency key reused"),
);
const manifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord;
assert.ok(JSON.stringify(manifest).includes("AGENTRUN_RESOURCE_BUNDLE_JSON"));
assertNoSecretLeak(created);
const pendingCancel = await createHwlabRun(client, context, bundle, "hwlab-session-cancel-pending", "cancel pending", "hwlab-command-cancel-pending");
const cancelledRun = await client.post(`/api/v1/runs/${pendingCancel.runId}/cancel`, { reason: "self-test pending cancel" }) as { status?: string; terminalStatus?: string; failureKind?: string };
assert.equal(cancelledRun.status, "cancelled");
assert.equal(cancelledRun.terminalStatus, "cancelled");
assert.equal(cancelledRun.failureKind, "cancelled");
const cancelledCommand = await client.get(`/api/v1/runs/${pendingCancel.runId}/commands/${pendingCancel.commandId}`) as { state?: string };
assert.equal(cancelledCommand.state, "cancelled");
await assert.rejects(
() => client.post(`/api/v1/runs/${pendingCancel.runId}/runner-jobs`, { commandId: pendingCancel.commandId, idempotencyKey: "hwlab-cancelled-job" }),
(error) => error instanceof Error && error.message.includes("already terminal"),
);
const sessionRun = await createHwlabRun(client, context, bundle, "hwlab-session-resume", "hello session", "hwlab-command-session");
const runResult = await runOnce({ managerUrl: server.baseUrl, runId: sessionRun.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces") } });
assert.equal(runResult.terminalStatus, "completed");
const session = await store.getSession("hwlab-session-resume");
assert.equal(session?.threadId, "thread_selftest_1");
const resultEnvelope = await client.get(`/api/v1/runs/${sessionRun.runId}/commands/${sessionRun.commandId}/result`) as JsonRecord;
assert.equal(resultEnvelope.terminalStatus, "completed");
assert.equal(resultEnvelope.reply, "fake codex stdio reply");
assert.equal(((resultEnvelope.sessionRef as JsonRecord).threadId), "thread_selftest_1");
assert.equal(((resultEnvelope.resourceBundleRef as JsonRecord).commitId), bundle.commitId);
assertNoSecretLeak(resultEnvelope);
const resumed = await createHwlabRun(client, context, bundle, "hwlab-session-resume", "hello resumed", "hwlab-command-session-resumed");
const resumedRun = await client.get(`/api/v1/runs/${resumed.runId}`) as JsonRecord;
assert.equal(((resumedRun.sessionRef as JsonRecord).threadId), "thread_selftest_1");
const runningCancel = await createHwlabRun(client, context, bundle, "hwlab-session-cancel-running", "cancel running", "hwlab-command-cancel-running", 10_000);
const running = runOnce({ managerUrl: server.baseUrl, runId: runningCancel.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "missing-terminal", AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-running-cancel") } });
await waitForCommandState(client, runningCancel.runId, runningCancel.commandId, "acknowledged");
await client.post(`/api/v1/commands/${runningCancel.commandId}/cancel`, { reason: "self-test running cancel" });
const runningResult = await running;
assert.equal(runningResult.terminalStatus, "cancelled");
return { name: "hwlab-manual-dispatch", tests: ["runner-job-idempotency", "pending-cancel", "result-envelope", "session-ref-resume", "resource-bundle-materialization", "running-cancel"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}
};
async function createLocalGitBundle(context: SelfTestContext): Promise<{ repoUrl: string; commitId: string }> {
const repo = path.join(context.tmp, "bundle-repo");
await mkdir(repo, { recursive: true });
await execFile("git", ["init"], { cwd: repo });
await writeFile(path.join(repo, "README.md"), "HWLAB bundle self-test\n", "utf8");
await execFile("git", ["add", "README.md"], { cwd: repo });
await execFile("git", ["-c", "user.email=selftest@example.invalid", "-c", "user.name=AgentRun SelfTest", "commit", "-m", "bundle selftest"], { cwd: repo });
const { stdout } = await execFile("git", ["rev-parse", "HEAD"], { cwd: repo });
return { repoUrl: repo, commitId: stdout.trim() };
}
async function createHwlabRun(client: ManagerClient, context: SelfTestContext, bundle: { repoUrl: string; commitId: string }, sessionId: string, prompt: string, idempotencyKey: string, timeoutMs = 15_000): Promise<{ runId: string; commandId: string }> {
const run = await client.post("/api/v1/runs", {
tenantId: "hwlab",
projectId: "pikasTech/HWLAB",
workspaceRef: { kind: "opaque", repo: "pikasTech/HWLAB" },
sessionRef: { sessionId, conversationId: sessionId },
resourceBundleRef: { kind: "git", repoUrl: bundle.repoUrl, commitId: bundle.commitId, submodules: false, lfs: false },
providerId: "G14",
backendProfile: "codex",
executionPolicy: {
sandbox: "workspace-write",
approval: "never",
timeoutMs,
network: "default",
secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: context.codexHome } }] },
},
traceSink: { kind: "hwlab", traceId: idempotencyKey },
}) as { id: string };
const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt, traceId: idempotencyKey }, idempotencyKey }) as { id: string };
return { runId: run.id, commandId: command.id };
}
async function waitForCommandState(client: ManagerClient, runId: string, commandId: string, state: string): Promise<void> {
const deadline = Date.now() + 5_000;
while (Date.now() < deadline) {
const command = await client.get(`/api/v1/runs/${runId}/commands/${commandId}`) as { state?: string };
if (command.state === state) return;
await new Promise((resolve) => setTimeout(resolve, 100));
}
throw new Error(`command ${commandId} did not reach ${state}`);
}
export default selfTest;