Merge pull request #37 from pikasTech/feat-hwlab-baseline-core-v01-20260601
feat: 补齐 HWLAB 基线 AgentRun 执行元语
This commit is contained in:
@@ -39,6 +39,8 @@ POST /api/v1/runs/:runId/commands
|
||||
GET /api/v1/runs/:runId/commands/:commandId
|
||||
GET /api/v1/runs/:runId/commands/:commandId/result
|
||||
POST /api/v1/runs/:runId/runner-jobs
|
||||
GET /api/v1/runs/:runId/runner-jobs?commandId=<commandId>
|
||||
GET /api/v1/runs/:runId/runner-jobs/:runnerJobId
|
||||
POST /api/v1/commands/:commandId/cancel
|
||||
GET /api/v1/backends
|
||||
```
|
||||
@@ -101,7 +103,7 @@ Manager 只承接 HWLAB v0.2 Code Agent 的通用执行事实,不承接 HWLAB
|
||||
## 最小 Observability 合同
|
||||
|
||||
- events append-only,单 run 内 `seq` 单调递增。
|
||||
- 每个 run 必须最终出现 `terminal_status`,或保持明确 non-terminal status 并可查询 lease/heartbeat。
|
||||
- 每个 run 必须最终出现唯一 authoritative `terminal_status`,或保持明确 non-terminal status 并可查询 lease/heartbeat;assistant partial、stdout、transport close 或 idle timeout 不能替代 terminal completed。
|
||||
- failureKind 至少能区分 `schema-invalid`、`tenant-policy-denied`、`secret-unavailable`、`runner-lease-conflict`、`backend-failed`、`provider-auth-failed`、`provider-unavailable`、`infra-failed`、`cancelled`。
|
||||
- health/readiness 必须返回 Postgres reachable、schema migration ready、SecretRef redacted 状态和 build/source metadata。
|
||||
- 日志、event、trace、health 和 diagnostics 不得输出 provider credential、Codex auth/config 内容、DSN password、token 或 URL credential。
|
||||
@@ -114,6 +116,7 @@ Manager 只承接 HWLAB v0.2 Code Agent 的通用执行事实,不承接 HWLAB
|
||||
| --- | --- |
|
||||
| `status` | run/command 当前聚合状态,只能由 command state 和 terminal_status 推导。 |
|
||||
| `terminalStatus` | `completed`、`failed`、`blocked` 或 `cancelled`;没有 terminal event 时为 `null` 或 equivalent running 状态。 |
|
||||
| `completed` / `terminalSource` | `completed=true` 只能来自 terminal completed;`terminalSource` 标明来自 `terminal_status` event、run record 或暂无 terminal。 |
|
||||
| `reply` | 从 `assistant_message` 聚合的最终用户可见文本;没有 terminal completed 时不得伪造 completed reply。 |
|
||||
| `failureKind` / `blocker` | 结构化失败分类和摘要;必须 redacted。 |
|
||||
| `lastSeq` / `eventCount` | 支持调用方增量轮询和 result/trace reconciliation。 |
|
||||
@@ -151,6 +154,7 @@ Manager 只承接 HWLAB v0.2 Code Agent 的通用执行事实,不承接 HWLAB
|
||||
| `agentrun-mgr` 服务规格 | 已定义 | 本文为 v0.1 manager 权威。 |
|
||||
| Manager REST API | 已实现/已通过主闭环 | 已有 run、command、event、backends、runner register、claim、lease heartbeat、poll、ack、status、runner Job 创建和 health/readiness 的 HTTP JSON API;真实 runtime 已通过 RESTful API 主闭环。 |
|
||||
| 手动 runner Job API | 已实现 | `POST /api/v1/runs/:runId/runner-jobs` 已可创建 Kubernetes runner Job,并固化 idempotency、持久 runner job record、响应 schema 和 cancel 前置检查。 |
|
||||
| runner Job 状态查询 | 已实现 | `GET /api/v1/runs/:runId/runner-jobs` 和 `GET /api/v1/runs/:runId/runner-jobs/:runnerJobId` 返回 attempt/job/log/phase/terminal 摘要,业务客户端无需直连 Kubernetes 做最小定位。 |
|
||||
| Tenant policy boundary | 已实现最小边界 | v0.1 已做 schema、tenant/backend allowlist、executionPolicy 和 secretScope 结构校验;业务授权仍由 UniDesk/HWLAB 自己判定。 |
|
||||
| `deepseek` backendProfile allowlist | 已实现/已通过主闭环 | Manager validation、backend capability 和 matching SecretRef 校验已支持 `deepseek`;真实 runtime 已经通过 CI/CD 发布并确认 Postgres migration `002_v01_backend_profiles` 应用。 |
|
||||
| Postgres durable adapter | 已实现/已通过主闭环 | live runtime 通过 `DATABASE_URL` 使用 Postgres durable store;memory store 仅用于显式 self-test/dev。见 [spec-v01-postgres.md](spec-v01-postgres.md)。 |
|
||||
|
||||
@@ -44,6 +44,8 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交
|
||||
./scripts/agentrun runner start --run-id <runId> --backend <backendProfile>
|
||||
./scripts/agentrun runner job --run-id <runId> --command-id <commandId> [--idempotency-key <key>]
|
||||
./scripts/agentrun runner job --dry-run --run-id <runId> --command-id <commandId> --image <image>
|
||||
./scripts/agentrun runner jobs --run-id <runId> [--command-id <commandId>]
|
||||
./scripts/agentrun runner job-status [runnerJobId] --run-id <runId>
|
||||
./scripts/agentrun secrets codex render --dry-run [--profile codex|deepseek] [--codex-home <dir>]
|
||||
./scripts/agentrun backends list
|
||||
./scripts/agentrun server start|status
|
||||
@@ -53,6 +55,7 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交
|
||||
|
||||
- 创建类命令返回 `runId`、`commandId`、status 和下一步 poll command。
|
||||
- `runner start` 返回 attemptId、job/process identity、logPath 和后续 status/events 命令。
|
||||
- `runner jobs` / `runner job-status` 返回 manager 持久化的 runner Job 最小状态摘要,包括 attemptId、runnerId、namespace、jobName、phase、terminalStatus、logPath、retention 和 redacted Kubernetes identity;业务方不需要直连 Kubernetes 才能定位当前 attempt。
|
||||
- 查询类命令返回当前 state、terminal_status、failureKind、event cursor 或 logPath。
|
||||
- `events` 默认分页且有界,必须支持 `afterSeq` 和 `limit`。
|
||||
- `server logs` 返回有界日志摘要,并指向完整日志文件或 Kubernetes pod identity。
|
||||
@@ -96,6 +99,7 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交
|
||||
| `scripts/agentrun-cli.ts` | 已实现 | 已提供 run/command/event/backend/server 基础命令和 JSON envelope;`scripts/agentrun` 是同一入口的 Bun 定位 launcher。 |
|
||||
| CLI 调 manager REST | 已实现 | CLI 通过 `ManagerClient` 调 manager REST;自测试可用内存 manager,综合联调必须指向真实 `agentrun-v01` manager。 |
|
||||
| runner start/job | 已实现 | `runner start` 可执行 host process runner;`runner job --dry-run` 可渲染 Kubernetes Job JSON;`runner job` 正式路径通过 manager REST 创建 Kubernetes Job,支持 `--idempotency-key` 并快速返回 job identity、SecretRef、retention 和轮询命令。 |
|
||||
| runner jobs/job-status | 已实现 | CLI 通过 manager REST 查询 runner Job 持久记录和最小状态摘要,不直连 Kubernetes、不读取 Secret 值。 |
|
||||
| result/cancel CLI | 已实现 | `runs result`、`commands result`、`runs cancel` 和 `commands cancel` 均调用 manager REST,不维护独立状态。 |
|
||||
| CLI 测试规格 | 已定义/已验证主闭环 | 综合联调见 [spec-v01-validation.md](spec-v01-validation.md);每次发布仍按手动交互验收复跑。 |
|
||||
| `deepseek` profile CLI | 已实现/已通过主闭环 | `secrets codex render --profile deepseek`、`backends list`、`runner start --backend`、`runner job` 和 JSON 错误可见性已实现;真实 CLI/RESTful 联调已通过 `codex -> deepseek -> codex` 切换主闭环。 |
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
- CLI:默认 JSON、空 stdout 失败、长操作短返回、错误结构化。
|
||||
- Postgres adapter:migration、事务、run/command/event round-trip、重启后可查询。
|
||||
- Secret 分发:SecretRef schema、missing secret failure、redaction。
|
||||
- HWLAB v0.2 基线承接:可以用 fake backend/临时 manager 做组件自测试,覆盖 event contract、result completed 防误判、bounded output、runner job status、SessionRef profile 隔离、ResourceBundleRef 失败分类和 backend preflight redaction;这些自测试不能替代真实 `agentrun-v01` CLI 交互验收。
|
||||
|
||||
自测试应使用 Bun + TypeScript 运行,Codex 相关自测试可以使用 fake app-server JSON-RPC client 模拟 `initialize`、`thread/start`、`thread/resume`、`turn/start`、assistant 输出、协议错误、timeout 和 transport close。
|
||||
|
||||
@@ -189,6 +190,7 @@ T8 是涉及 backend profile 变更时的综合联调标准;不涉及 backend
|
||||
| --- | --- | --- |
|
||||
| 两层验证模型 | 已定义 | 本文为 v0.1 验证权威。 |
|
||||
| 自测试 task | 已实现 | `src/selftest/run.ts` 自动发现 `src/selftest/cases/*.ts`;覆盖 redaction/Postgres contract、manager memory、runner Job render/create、Codex fake app-server stdio 和 Secret render。 |
|
||||
| HWLAB 基线承接自测试 | 已实现 | `src/selftest/cases/60-hwlab-baseline-contract.ts` 固化 event/result/failureKind/bounded output/runner job status/session/profile/bundle/preflight 的组件合同;综合联调仍必须走正式 AgentRun CLI 且不能使用 mock。 |
|
||||
| 综合联调验收规格 | 已增强 | 本文保留人工交互验收模型;T4-T8 定义 CLI、RESTful、一致性、负向场景和 backend profile 切换的手动验收标准,不新增自动脚本或门禁。 |
|
||||
| CLI 交互联调标准 | 已定义 | 必须只使用正式 CLI,验证真实 run 生命周期和可观测输出。 |
|
||||
| RESTful API 交互联调标准 | 已定义 | 必须直连真实 manager HTTP JSON API,验证服务合同和 durable facts。 |
|
||||
|
||||
@@ -80,9 +80,26 @@ async function dispatch(args: ParsedArgs): Promise<JsonValue> {
|
||||
return runOnce(options) as unknown as JsonValue;
|
||||
}
|
||||
if (group === "runner" && command === "job") return renderRunnerJob(args);
|
||||
if (group === "runner" && command === "jobs") return listRunnerJobs(args);
|
||||
if (group === "runner" && command === "job-status") return showRunnerJobStatus(args);
|
||||
throw new AgentRunError("schema-invalid", `unsupported command: ${args.positional.join(" ")}`, { httpStatus: 2 });
|
||||
}
|
||||
|
||||
async function listRunnerJobs(args: ParsedArgs): Promise<JsonValue> {
|
||||
const runId = flag(args, "run-id", "");
|
||||
if (!runId) throw new AgentRunError("schema-invalid", "runner jobs requires --run-id", { httpStatus: 2 });
|
||||
const commandId = optionalFlag(args, "command-id");
|
||||
return client(args).get(`/api/v1/runs/${encodeURIComponent(runId)}/runner-jobs${commandId ? `?commandId=${encodeURIComponent(commandId)}` : ""}`);
|
||||
}
|
||||
|
||||
async function showRunnerJobStatus(args: ParsedArgs): Promise<JsonValue> {
|
||||
const runId = flag(args, "run-id", "");
|
||||
if (!runId) throw new AgentRunError("schema-invalid", "runner job-status requires --run-id", { httpStatus: 2 });
|
||||
const runnerJobId = args.positional[2] ?? optionalFlag(args, "runner-job-id");
|
||||
if (!runnerJobId) return listRunnerJobs(args);
|
||||
return client(args).get(`/api/v1/runs/${encodeURIComponent(runId)}/runner-jobs/${encodeURIComponent(runnerJobId)}`);
|
||||
}
|
||||
|
||||
async function renderRunnerJob(args: ParsedArgs): Promise<JsonRecord> {
|
||||
const runId = flag(args, "run-id", "");
|
||||
const commandId = flag(args, "command-id", "");
|
||||
@@ -221,6 +238,8 @@ function help(): JsonRecord {
|
||||
"runner start --run-id <runId> [--backend codex|deepseek]",
|
||||
"runner job --run-id <runId> --command-id <commandId> [--image <image>] [--runner-manager-url <url>] [--idempotency-key <key>]",
|
||||
"runner job --dry-run --run-id <runId> --command-id <commandId> --image <image>",
|
||||
"runner jobs --run-id <runId> [--command-id <commandId>]",
|
||||
"runner job-status [runnerJobId] --run-id <runId>",
|
||||
"secrets codex render --dry-run [--profile codex|deepseek] [--codex-home <dir>] [--namespace agentrun-v01] [--secret-name <name>]",
|
||||
"backends list",
|
||||
"server start|status",
|
||||
|
||||
@@ -7,6 +7,7 @@ import * as readline from "node:readline";
|
||||
import type { BackendEvent, BackendProfile, BackendTurnResult, FailureKind, JsonRecord, JsonValue, TerminalStatus } from "../common/types.js";
|
||||
import { redactJson, redactText } from "../common/redaction.js";
|
||||
import { backendProfileSpec } from "../common/backend-profiles.js";
|
||||
import { boundedTextSummary, commandOutputPayload } from "../common/output.js";
|
||||
|
||||
const codexProtocol = "codex-app-server-jsonrpc-stdio";
|
||||
const defaultCodexArgs = ["app-server", "--listen", "stdio://"];
|
||||
@@ -437,8 +438,8 @@ function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent
|
||||
return { events: [{ type: "backend_status", payload: { phase: method, turnId } }], ...(turnId ? { turnId } : {}) };
|
||||
}
|
||||
if (method === "item/agentMessage/delta") return { events: [], assistantDelta: typeof params.delta === "string" ? params.delta : "" };
|
||||
if (method === "item/commandExecution/outputDelta") return { events: [{ type: "command_output", payload: { stream: "stdout", text: typeof params.delta === "string" ? params.delta : "" } }] };
|
||||
if (method === "item/started" || method === "item/completed") return { events: [{ type: "tool_call", payload: { method, item: redactJson(asRecordAt(params, "item")) } }] };
|
||||
if (method === "item/commandExecution/outputDelta") return { events: [{ type: "command_output", payload: commandOutputPayload("stdout", typeof params.delta === "string" ? params.delta : "") }] };
|
||||
if (method === "item/started" || method === "item/completed") return { events: [{ type: "tool_call", payload: toolCallPayload(method, asRecordAt(params, "item")) }] };
|
||||
if (method === "error") {
|
||||
const error = asRecordAt(params, "error");
|
||||
const messageText = typeof error.message === "string" ? error.message : "Codex app-server error";
|
||||
@@ -469,6 +470,13 @@ function terminalStatusFromValue(value: unknown): TerminalStatus {
|
||||
return "failed";
|
||||
}
|
||||
|
||||
function toolCallPayload(method: string, item: JsonRecord): JsonRecord {
|
||||
const redacted = redactJson(item);
|
||||
const summary = boundedTextSummary(JSON.stringify(redacted));
|
||||
if (summary.outputTruncated !== true) return { method, item: redacted, summary, outputBytes: summary.outputBytes, outputTruncated: false };
|
||||
return { method, itemPreview: summary.text, summary, outputBytes: summary.outputBytes, outputTruncated: true };
|
||||
}
|
||||
|
||||
function withOptionalModel(params: JsonRecord, model: string | undefined): JsonRecord {
|
||||
const value = typeof model === "string" ? model.trim() : "";
|
||||
if (!value) return params;
|
||||
|
||||
@@ -51,6 +51,7 @@ export function isBackendProfile(value: string): value is BackendProfile {
|
||||
}
|
||||
|
||||
export function backendCapability(spec: BackendProfileSpec): JsonRecord {
|
||||
const defaultSecretRef = { name: spec.defaultSecretName, keys: [...spec.requiredSecretKeys], valuesPrinted: false };
|
||||
return {
|
||||
profile: spec.profile,
|
||||
backendKind: spec.backendKind,
|
||||
@@ -59,12 +60,32 @@ export function backendCapability(spec: BackendProfileSpec): JsonRecord {
|
||||
command: spec.command,
|
||||
status: spec.status,
|
||||
requiredSecretKeys: [...spec.requiredSecretKeys],
|
||||
defaultSecretRef: { name: spec.defaultSecretName, keys: [...spec.requiredSecretKeys] },
|
||||
defaultSecretRef,
|
||||
preflight: {
|
||||
readiness: "requires-profile-secret-ref",
|
||||
defaultSecretRef,
|
||||
requiredSecretKeys: [...spec.requiredSecretKeys],
|
||||
profileIsolation: spec.profileIsolation,
|
||||
credentialValuesPrinted: false,
|
||||
valuesPrinted: false,
|
||||
},
|
||||
profileIsolation: spec.profileIsolation,
|
||||
description: spec.description,
|
||||
};
|
||||
}
|
||||
|
||||
export function mergeBackendCapability(profile: string, storedCapabilities: JsonRecord): JsonRecord {
|
||||
const spec = backendProfileSpec(profile);
|
||||
if (!spec) return { profile, ...storedCapabilities, valuesPrinted: false };
|
||||
const base = backendCapability(spec);
|
||||
return {
|
||||
...base,
|
||||
...storedCapabilities,
|
||||
defaultSecretRef: base.defaultSecretRef,
|
||||
preflight: base.preflight,
|
||||
};
|
||||
}
|
||||
|
||||
export function backendCapabilities(): JsonRecord[] {
|
||||
return backendProfileSpecs.map(backendCapability);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,88 @@
|
||||
import { AgentRunError } from "./errors.js";
|
||||
import type { EventType, JsonRecord, RunEvent, TerminalStatus } from "./types.js";
|
||||
import { boundedTextSummary, commandOutputPayload } from "./output.js";
|
||||
import { redactJson } from "./redaction.js";
|
||||
|
||||
export const eventTypes = ["backend_status", "assistant_message", "tool_call", "command_output", "diff", "error", "terminal_status"] as const satisfies readonly EventType[];
|
||||
export const terminalStatuses = ["completed", "failed", "blocked", "cancelled"] as const satisfies readonly TerminalStatus[];
|
||||
|
||||
const eventTypeSet = new Set<string>(eventTypes);
|
||||
const terminalStatusSet = new Set<string>(terminalStatuses);
|
||||
|
||||
export function isEventType(value: unknown): value is EventType {
|
||||
return typeof value === "string" && eventTypeSet.has(value);
|
||||
}
|
||||
|
||||
export function isTerminalStatus(value: unknown): value is TerminalStatus {
|
||||
return typeof value === "string" && terminalStatusSet.has(value);
|
||||
}
|
||||
|
||||
export function requireEventType(value: unknown): EventType {
|
||||
if (isEventType(value)) return value;
|
||||
throw new AgentRunError("schema-invalid", `event.type ${String(value)} is not supported`, { httpStatus: 400, details: { allowedEventTypes: [...eventTypes] } });
|
||||
}
|
||||
|
||||
export function normalizeRunEventPayload(type: EventType, payload: JsonRecord): JsonRecord {
|
||||
if (type === "terminal_status") return normalizeTerminalStatusPayload(payload);
|
||||
if (type === "command_output") return normalizeCommandOutputPayload(payload);
|
||||
if (type === "assistant_message") return normalizeTextPayload(payload);
|
||||
if (type === "tool_call") return normalizeToolCallPayload(payload);
|
||||
return payload;
|
||||
}
|
||||
|
||||
export function eventContractSummary(events: RunEvent[]): JsonRecord {
|
||||
const issues: JsonRecord[] = [];
|
||||
let terminalStatusCount = 0;
|
||||
for (let index = 0; index < events.length; index += 1) {
|
||||
const event = events[index];
|
||||
if (!eventTypeSet.has(event.type)) issues.push({ code: "event-type-invalid", seq: event.seq, type: event.type });
|
||||
if (event.seq !== index + 1) issues.push({ code: "seq-not-contiguous", expectedSeq: index + 1, actualSeq: event.seq });
|
||||
if (event.type === "terminal_status") {
|
||||
terminalStatusCount += 1;
|
||||
if (!isTerminalStatus(event.payload.terminalStatus)) issues.push({ code: "terminal-status-invalid", seq: event.seq, terminalStatus: String(event.payload.terminalStatus ?? "") });
|
||||
}
|
||||
}
|
||||
if (terminalStatusCount > 1) issues.push({ code: "terminal-status-duplicated", terminalStatusCount });
|
||||
return {
|
||||
ok: issues.length === 0,
|
||||
eventCount: events.length,
|
||||
lastSeq: events.at(-1)?.seq ?? 0,
|
||||
terminalStatusCount,
|
||||
issues,
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeTerminalStatusPayload(payload: JsonRecord): JsonRecord {
|
||||
if (!isTerminalStatus(payload.terminalStatus)) {
|
||||
throw new AgentRunError("schema-invalid", "terminal_status event requires terminalStatus completed|failed|blocked|cancelled", { httpStatus: 400, details: { allowedTerminalStatuses: [...terminalStatuses] } });
|
||||
}
|
||||
return payload;
|
||||
}
|
||||
|
||||
function normalizeCommandOutputPayload(payload: JsonRecord): JsonRecord {
|
||||
const { text: _text, delta: _delta, content: _content, summary: _summary, ...rest } = payload;
|
||||
const value = typeof payload.text === "string" ? payload.text : typeof payload.delta === "string" ? payload.delta : typeof payload.content === "string" ? payload.content : "";
|
||||
const stream = typeof payload.stream === "string" ? payload.stream : "stdout";
|
||||
return { ...rest, ...commandOutputPayload(stream, value) };
|
||||
}
|
||||
|
||||
function normalizeTextPayload(payload: JsonRecord): JsonRecord {
|
||||
const { text: _text, delta: _delta, content: _content, summary: _summary, ...rest } = payload;
|
||||
const value = typeof payload.text === "string" ? payload.text : typeof payload.delta === "string" ? payload.delta : typeof payload.content === "string" ? payload.content : "";
|
||||
const summary = boundedTextSummary(value);
|
||||
return { ...rest, text: summary.text, summary, textBytes: summary.textBytes, textTruncated: summary.textTruncated };
|
||||
}
|
||||
|
||||
function normalizeToolCallPayload(payload: JsonRecord): JsonRecord {
|
||||
const redacted = redactJson(payload);
|
||||
const json = JSON.stringify(redacted);
|
||||
const summary = boundedTextSummary(json);
|
||||
if (summary.outputTruncated !== true) return { ...redacted, summary, outputBytes: summary.outputBytes, outputTruncated: false };
|
||||
return {
|
||||
method: typeof payload.method === "string" ? payload.method : null,
|
||||
itemPreview: summary.text,
|
||||
summary,
|
||||
outputBytes: summary.outputBytes,
|
||||
outputTruncated: true,
|
||||
};
|
||||
}
|
||||
@@ -0,0 +1,62 @@
|
||||
import type { JsonRecord } from "./types.js";
|
||||
import { redactText } from "./redaction.js";
|
||||
|
||||
const defaultOutputLimitChars = 4_000;
|
||||
|
||||
export interface BoundedTextOptions {
|
||||
limitChars?: number;
|
||||
}
|
||||
|
||||
export function boundedTextSummary(value: unknown, options: BoundedTextOptions = {}): JsonRecord {
|
||||
const limitChars = positiveLimit(options.limitChars ?? defaultOutputLimitChars);
|
||||
const redacted = redactText(typeof value === "string" ? value : value === null || value === undefined ? "" : String(value));
|
||||
const outputBytes = Buffer.byteLength(redacted, "utf8");
|
||||
const outputTruncated = redacted.length > limitChars;
|
||||
const text = outputTruncated ? redacted.slice(0, limitChars) : redacted;
|
||||
return {
|
||||
text,
|
||||
textChars: redacted.length,
|
||||
textBytes: outputBytes,
|
||||
outputBytes,
|
||||
limitChars,
|
||||
textTruncated: outputTruncated,
|
||||
outputTruncated,
|
||||
};
|
||||
}
|
||||
|
||||
export function commandOutputPayload(stream: string, value: unknown, options: BoundedTextOptions = {}): JsonRecord {
|
||||
const summary = boundedTextSummary(value, options);
|
||||
return {
|
||||
stream: stream === "stderr" ? "stderr" : "stdout",
|
||||
text: summary.text,
|
||||
summary,
|
||||
outputBytes: summary.outputBytes,
|
||||
outputTruncated: summary.outputTruncated,
|
||||
textBytes: summary.textBytes,
|
||||
textTruncated: summary.textTruncated,
|
||||
};
|
||||
}
|
||||
|
||||
export function outputBytesFromPayload(payload: JsonRecord): number {
|
||||
for (const key of ["outputBytes", "textBytes"] as const) {
|
||||
const value = payload[key];
|
||||
if (typeof value === "number" && Number.isFinite(value) && value >= 0) return value;
|
||||
}
|
||||
const summary = payload.summary;
|
||||
if (typeof summary === "object" && summary !== null && !Array.isArray(summary)) {
|
||||
const outputBytes = (summary as JsonRecord).outputBytes;
|
||||
if (typeof outputBytes === "number" && Number.isFinite(outputBytes) && outputBytes >= 0) return outputBytes;
|
||||
}
|
||||
const text = typeof payload.text === "string" ? payload.text : "";
|
||||
return Buffer.byteLength(text, "utf8");
|
||||
}
|
||||
|
||||
export function outputTruncatedFromPayload(payload: JsonRecord): boolean {
|
||||
if (payload.outputTruncated === true || payload.textTruncated === true || payload.truncated === true) return true;
|
||||
const summary = payload.summary;
|
||||
return typeof summary === "object" && summary !== null && !Array.isArray(summary) && (summary as JsonRecord).outputTruncated === true;
|
||||
}
|
||||
|
||||
function positiveLimit(value: number): number {
|
||||
return Number.isFinite(value) && value > 0 ? Math.floor(value) : defaultOutputLimitChars;
|
||||
}
|
||||
@@ -6,8 +6,9 @@ import { redactJson } from "../common/redaction.js";
|
||||
import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js";
|
||||
import { newId, nowIso, stableHash } from "../common/validation.js";
|
||||
import type { AgentRunStore, SaveRunnerJobInput, StoreHealth } from "./store.js";
|
||||
import { commandStateFromTerminal, isTerminalCommandState, isTerminalRunStatus, sessionRefFromRecord, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js";
|
||||
import { backendCapabilitiesSqlValues } from "../common/backend-profiles.js";
|
||||
import { assertSessionBoundary, commandStateFromTerminal, isTerminalCommandState, isTerminalRunStatus, sessionRefFromRecord, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js";
|
||||
import { backendCapabilitiesSqlValues, mergeBackendCapability } from "../common/backend-profiles.js";
|
||||
import { normalizeRunEventPayload, requireEventType } from "../common/events.js";
|
||||
|
||||
interface PostgresStoreOptions {
|
||||
connectionString: string;
|
||||
@@ -509,7 +510,10 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
||||
|
||||
async backends(): Promise<JsonRecord[]> {
|
||||
const result = await this.pool.query("SELECT * FROM agentrun_backends ORDER BY profile ASC");
|
||||
return result.rows.map((row) => ({ profile: stringValue(row.profile), ...jsonRecord(row.capabilities), capacity: jsonValue(row.capacity), health: jsonValue(row.health), updatedAt: nullableIso(row.updated_at) }));
|
||||
return result.rows.map((row) => {
|
||||
const profile = stringValue(row.profile);
|
||||
return { ...mergeBackendCapability(profile, jsonRecord(row.capabilities)), capacity: jsonValue(row.capacity), health: jsonValue(row.health), updatedAt: nullableIso(row.updated_at) };
|
||||
});
|
||||
}
|
||||
|
||||
async close(): Promise<void> {
|
||||
@@ -517,8 +521,10 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
||||
}
|
||||
|
||||
private async appendEventWithLockedRun(client: PoolClient, runId: string, type: EventType, payload: JsonRecord): Promise<RunEvent> {
|
||||
const eventType = requireEventType(type);
|
||||
const eventPayload = normalizeRunEventPayload(eventType, payload);
|
||||
const seq = await this.nextSeq(client, "agentrun_events", runId);
|
||||
const event: RunEvent = { id: newId("evt"), runId, seq, type, payload: redactJson(payload), createdAt: nowIso() };
|
||||
const event: RunEvent = { id: newId("evt"), runId, seq, type: eventType, payload: redactJson(eventPayload), createdAt: nowIso() };
|
||||
await client.query("INSERT INTO agentrun_events (id, run_id, seq, type, payload, created_at) VALUES ($1, $2, $3, $4, $5::jsonb, $6)", [event.id, event.runId, event.seq, event.type, JSON.stringify(event.payload), event.createdAt]);
|
||||
return event;
|
||||
}
|
||||
@@ -538,7 +544,11 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
||||
private async resolveSessionForRun(client: PoolClient, input: CreateRunInput, at: string): Promise<SessionRef | null> {
|
||||
if (!input.sessionRef) return null;
|
||||
const existing = await client.query("SELECT * FROM agentrun_sessions WHERE session_id = $1 FOR UPDATE", [input.sessionRef.sessionId]);
|
||||
if (existing.rows[0]) return sessionRefFromRecord(sessionFromRow(existing.rows[0]), input.sessionRef);
|
||||
if (existing.rows[0]) {
|
||||
const session = sessionFromRow(existing.rows[0]);
|
||||
assertSessionBoundary(session, input);
|
||||
return sessionRefFromRecord(session, input.sessionRef);
|
||||
}
|
||||
const record: SessionRecord = {
|
||||
sessionId: input.sessionRef.sessionId,
|
||||
tenantId: input.tenantId,
|
||||
|
||||
+21
-4
@@ -1,5 +1,6 @@
|
||||
import type { AgentRunStore } from "./store.js";
|
||||
import type { CommandRecord, JsonRecord, JsonValue, RunEvent, RunRecord, RunnerJobRecord, TerminalStatus } from "../common/types.js";
|
||||
import { outputBytesFromPayload, outputTruncatedFromPayload } from "../common/output.js";
|
||||
|
||||
export async function buildRunResult(store: AgentRunStore, runId: string, commandId?: string): Promise<JsonRecord> {
|
||||
const run = await store.getRun(runId);
|
||||
@@ -7,7 +8,9 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman
|
||||
const events = await store.listEvents(runId, 0, 500);
|
||||
const jobs = await store.listRunnerJobs(runId, command?.id);
|
||||
const latestJob = jobs.at(-1) ?? null;
|
||||
const terminal = terminalFromEvents(events) ?? run.terminalStatus;
|
||||
const terminalEventStatus = terminalFromEvents(events);
|
||||
const terminal = terminalEventStatus ?? run.terminalStatus;
|
||||
const terminalSource = terminalEventStatus ? "terminal_status-event" : run.terminalStatus ? "run-record" : "none";
|
||||
const failureKind = run.failureKind ?? failureKindFromEvents(events);
|
||||
const reply = assistantReply(events);
|
||||
const blocker = terminal === "blocked" || terminal === "failed" ? { failureKind, message: run.failureMessage ?? messageFromEvents(events) } : null;
|
||||
@@ -22,6 +25,8 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman
|
||||
runStatus: run.status,
|
||||
commandState: command?.state ?? null,
|
||||
terminalStatus: terminal,
|
||||
terminalSource,
|
||||
completed: terminal === "completed",
|
||||
reply,
|
||||
failureKind,
|
||||
failureMessage: run.failureMessage ?? messageFromEvents(events),
|
||||
@@ -90,18 +95,30 @@ function artifactSummary(events: RunEvent[]): JsonRecord {
|
||||
let diffEvents = 0;
|
||||
let toolCallEvents = 0;
|
||||
let outputChars = 0;
|
||||
let truncatedEvents = 0;
|
||||
let outputBytes = 0;
|
||||
let outputTruncatedEvents = 0;
|
||||
const streamSummary: Record<string, { events: number; outputBytes: number; outputTruncated: boolean }> = {
|
||||
stdout: { events: 0, outputBytes: 0, outputTruncated: false },
|
||||
stderr: { events: 0, outputBytes: 0, outputTruncated: false },
|
||||
};
|
||||
for (const event of events) {
|
||||
if (event.type === "command_output") {
|
||||
commandOutputEvents += 1;
|
||||
const text = textPayload(event.payload);
|
||||
outputChars += text.length;
|
||||
if (event.payload.truncated === true) truncatedEvents += 1;
|
||||
const bytes = outputBytesFromPayload(event.payload);
|
||||
outputBytes += bytes;
|
||||
const truncated = outputTruncatedFromPayload(event.payload);
|
||||
if (truncated) outputTruncatedEvents += 1;
|
||||
const stream = event.payload.stream === "stderr" ? "stderr" : "stdout";
|
||||
streamSummary[stream].events += 1;
|
||||
streamSummary[stream].outputBytes += bytes;
|
||||
streamSummary[stream].outputTruncated ||= truncated;
|
||||
}
|
||||
if (event.type === "diff") diffEvents += 1;
|
||||
if (event.type === "tool_call") toolCallEvents += 1;
|
||||
}
|
||||
return { commandOutputEvents, diffEvents, toolCallEvents, outputChars, truncatedEvents };
|
||||
return { commandOutputEvents, diffEvents, toolCallEvents, outputChars, outputBytes, truncatedEvents: outputTruncatedEvents, outputTruncatedEvents, stdoutSummary: streamSummary.stdout, stderrSummary: streamSummary.stderr };
|
||||
}
|
||||
|
||||
function attemptFromEvents(events: RunEvent[]): string | null {
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
import type { JsonRecord, RunEvent, RunnerJobRecord, TerminalStatus } from "../common/types.js";
|
||||
|
||||
export function runnerJobStatusSummary(job: RunnerJobRecord, events: RunEvent[] = []): JsonRecord {
|
||||
const terminalEvent = latestTerminalEvent(events);
|
||||
const runner = recordAt(job.result, "runner");
|
||||
const jobIdentity = recordAt(job.result, "jobIdentity");
|
||||
const kubernetes = recordAt(job.result, "kubernetes");
|
||||
const retention = recordAt(job.result, "retention");
|
||||
const terminalStatus = terminalEvent?.payload.terminalStatus;
|
||||
return {
|
||||
id: job.id,
|
||||
runId: job.runId,
|
||||
commandId: job.commandId,
|
||||
attemptId: job.attemptId,
|
||||
runnerId: job.runnerId,
|
||||
namespace: job.namespace,
|
||||
jobName: job.jobName,
|
||||
managerUrl: job.managerUrl,
|
||||
image: job.image,
|
||||
sourceCommit: job.sourceCommit,
|
||||
serviceAccountName: job.serviceAccountName,
|
||||
phase: terminalStatus ? `terminal:${terminalStatus}` : kubernetes.created === true ? "created" : "recorded",
|
||||
terminalStatus: isTerminalStatus(terminalStatus) ? terminalStatus : null,
|
||||
failureKind: typeof terminalEvent?.payload.failureKind === "string" ? terminalEvent.payload.failureKind : null,
|
||||
exitCode: null,
|
||||
startedAt: null,
|
||||
finishedAt: terminalEvent?.createdAt ?? null,
|
||||
jobIdentity,
|
||||
podIdentity: recordAt(job.result, "podIdentity"),
|
||||
logPath: typeof runner.logPath === "string" ? runner.logPath : null,
|
||||
retention,
|
||||
kubernetes,
|
||||
createdAt: job.createdAt,
|
||||
updatedAt: job.updatedAt,
|
||||
valuesPrinted: false,
|
||||
};
|
||||
}
|
||||
|
||||
function latestTerminalEvent(events: RunEvent[]): RunEvent | null {
|
||||
for (const event of [...events].reverse()) {
|
||||
if (event.type === "terminal_status") return event;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function recordAt(record: JsonRecord, key: string): JsonRecord {
|
||||
const value = record[key];
|
||||
return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : {};
|
||||
}
|
||||
|
||||
function isTerminalStatus(value: unknown): value is TerminalStatus {
|
||||
return value === "completed" || value === "failed" || value === "blocked" || value === "cancelled";
|
||||
}
|
||||
@@ -8,6 +8,7 @@ import { asRecord, validateCreateCommand, validateCreateRun } from "../common/va
|
||||
import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, RunEvent } from "../common/types.js";
|
||||
import { createKubernetesRunnerJob } from "./kubernetes-runner-job.js";
|
||||
import { buildRunResult } from "./result.js";
|
||||
import { runnerJobStatusSummary } from "./runner-job-status.js";
|
||||
|
||||
export interface ManagerServerOptions {
|
||||
store?: AgentRunStore;
|
||||
@@ -104,6 +105,22 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults
|
||||
},
|
||||
}) as unknown as JsonValue;
|
||||
}
|
||||
if (method === "GET" && runnerJobMatch) {
|
||||
const runId = runnerJobMatch[1] ?? "";
|
||||
const commandId = url.searchParams.get("commandId") ?? undefined;
|
||||
const jobs = await store.listRunnerJobs(runId, commandId);
|
||||
const events = await store.listEvents(runId, 0, 500);
|
||||
return { items: jobs.map((job) => runnerJobStatusSummary(job, events)), count: jobs.length, lastSeq: events.at(-1)?.seq ?? 0 };
|
||||
}
|
||||
const runnerJobShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/runner-jobs\/([^/]+)$/u);
|
||||
if (method === "GET" && runnerJobShowMatch) {
|
||||
const runId = runnerJobShowMatch[1] ?? "";
|
||||
const runnerJobId = runnerJobShowMatch[2] ?? "";
|
||||
const jobs = await store.listRunnerJobs(runId);
|
||||
const job = jobs.find((item) => item.id === runnerJobId);
|
||||
if (!job) throw new AgentRunError("schema-invalid", `runner job ${runnerJobId} was not found`, { httpStatus: 404 });
|
||||
return runnerJobStatusSummary(job, await store.listEvents(runId, 0, 500)) as JsonValue;
|
||||
}
|
||||
const commandShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)$/u);
|
||||
if (method === "GET" && commandShowMatch) return await store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue;
|
||||
const commandResultMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)\/result$/u);
|
||||
|
||||
+17
-2
@@ -3,6 +3,7 @@ import { AgentRunError } from "../common/errors.js";
|
||||
import { newId, nowIso, stableHash } from "../common/validation.js";
|
||||
import { redactJson } from "../common/redaction.js";
|
||||
import { backendCapabilities } from "../common/backend-profiles.js";
|
||||
import { normalizeRunEventPayload, requireEventType } from "../common/events.js";
|
||||
|
||||
export type MaybePromise<T> = T | Promise<T>;
|
||||
|
||||
@@ -198,8 +199,10 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
||||
|
||||
appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): RunEvent {
|
||||
this.getRun(runId);
|
||||
const eventType = requireEventType(type);
|
||||
const eventPayload = normalizeRunEventPayload(eventType, payload);
|
||||
const events = this.eventsByRun.get(runId) ?? [];
|
||||
const event: RunEvent = { id: newId("evt"), runId, seq: events.length + 1, type, payload: redactJson(payload), createdAt: nowIso() };
|
||||
const event: RunEvent = { id: newId("evt"), runId, seq: events.length + 1, type: eventType, payload: redactJson(eventPayload), createdAt: nowIso() };
|
||||
events.push(event);
|
||||
this.eventsByRun.set(runId, events);
|
||||
return event;
|
||||
@@ -258,7 +261,10 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
||||
private resolveSessionForRun(input: CreateRunInput, at: string): SessionRef | null {
|
||||
if (!input.sessionRef) return null;
|
||||
const existing = this.sessions.get(input.sessionRef.sessionId);
|
||||
if (existing) return sessionRefFromRecord(existing, input.sessionRef);
|
||||
if (existing) {
|
||||
assertSessionBoundary(existing, input);
|
||||
return sessionRefFromRecord(existing, input.sessionRef);
|
||||
}
|
||||
const record: SessionRecord = {
|
||||
sessionId: input.sessionRef.sessionId,
|
||||
tenantId: input.tenantId,
|
||||
@@ -298,6 +304,15 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
||||
}
|
||||
}
|
||||
|
||||
export function assertSessionBoundary(existing: SessionRecord, input: CreateRunInput): void {
|
||||
if (existing.tenantId !== input.tenantId || existing.projectId !== input.projectId) {
|
||||
throw new AgentRunError("tenant-policy-denied", "sessionRef cannot be reused across tenant or project boundary", { httpStatus: 403, details: { sessionId: existing.sessionId, valuesPrinted: false } });
|
||||
}
|
||||
if (existing.backendProfile !== input.backendProfile) {
|
||||
throw new AgentRunError("schema-invalid", "sessionRef cannot be reused across backendProfile boundary", { httpStatus: 400, details: { sessionId: existing.sessionId, existingBackendProfile: existing.backendProfile, requestedBackendProfile: input.backendProfile, valuesPrinted: false } });
|
||||
}
|
||||
}
|
||||
|
||||
export function statusFromTerminal(terminalStatus: TerminalStatus): RunRecord["status"] {
|
||||
if (terminalStatus === "completed") return "completed";
|
||||
if (terminalStatus === "cancelled") return "cancelled";
|
||||
|
||||
+11
-2
@@ -71,10 +71,20 @@ export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
|
||||
}
|
||||
await assertNotCancelled(api, options.runId, command.id);
|
||||
const result = await runBackendTurn(claimed, command, { ...options, ...(workspacePath ? { workspacePath } : {}), abortSignal: abortController.signal });
|
||||
for (const event of result.events) await api.appendEvent(options.runId, event);
|
||||
for (const event of result.events) {
|
||||
if (event.type !== "terminal_status") await api.appendEvent(options.runId, event);
|
||||
}
|
||||
await api.reportCommandStatus(command.id, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage });
|
||||
const finalRun = await api.reportStatus(options.runId, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage, ...(result.threadId ? { threadId: result.threadId } : {}), ...(result.turnId ? { turnId: result.turnId } : {}) }) as RunRecord;
|
||||
return { runner, commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind, run: finalRun } as JsonRecord;
|
||||
} catch (error) {
|
||||
const failureKind = failureKindFromError(error);
|
||||
const terminalStatus = terminalStatusForFailure(failureKind);
|
||||
const message = errorMessage(error);
|
||||
await api.appendEvent(options.runId, { type: "error", payload: { failureKind, message, phase: "runner:execute", attemptId, runnerId: runner.id } });
|
||||
await api.reportCommandStatus(command.id, { terminalStatus, failureKind, failureMessage: message });
|
||||
const finalRun = await api.reportStatus(options.runId, { terminalStatus, failureKind, failureMessage: message }) as RunRecord;
|
||||
return { runner, commandId: command.id, terminalStatus, failureKind, run: finalRun } as JsonRecord;
|
||||
} finally {
|
||||
stopCancelWatch();
|
||||
}
|
||||
@@ -109,7 +119,6 @@ function watchCancellation(api: RunnerManagerApi, runId: string, commandId: stri
|
||||
}
|
||||
|
||||
async function reportCancelled(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, claimed: RunRecord, message: string): Promise<JsonRecord> {
|
||||
await api.appendEvent(runId, { type: "terminal_status", payload: { terminalStatus: "cancelled", failureKind: "cancelled", message } });
|
||||
await api.reportCommandStatus(commandId, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message });
|
||||
const finalRun = await api.reportStatus(runId, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message });
|
||||
return { runner, commandId, claimed, terminalStatus: "cancelled", failureKind: "cancelled", run: finalRun };
|
||||
|
||||
@@ -58,14 +58,17 @@ const selfTest: SelfTestCase = async (context) => {
|
||||
assert.equal(explicitModelResult.terminalStatus, "completed", "explicit command payload model should still be forwarded");
|
||||
|
||||
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "missing-turn-result", expectedStatus: "failed", expectedFailureKind: "backend-response-invalid" });
|
||||
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-401-rpc-error", expectedStatus: "failed", expectedFailureKind: "provider-auth-failed" });
|
||||
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-429-terminal", expectedStatus: "failed", expectedFailureKind: "provider-rate-limited" });
|
||||
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-503-rpc-error", expectedStatus: "failed", expectedFailureKind: "provider-unavailable" });
|
||||
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-503-terminal", expectedStatus: "failed", expectedFailureKind: "provider-unavailable" });
|
||||
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-503-retry-event", expectedStatus: "failed", expectedFailureKind: "provider-unavailable", expectRetryError: true });
|
||||
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "invalid-json", expectedStatus: "failed", expectedFailureKind: "backend-json-parse-error" });
|
||||
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "missing-terminal", expectedStatus: "failed", expectedFailureKind: "backend-timeout", timeoutMs: 500 });
|
||||
await runSecretFailureCase({ client, managerUrl: server.baseUrl, context });
|
||||
await runSpawnFailureCase({ client, managerUrl: server.baseUrl, context });
|
||||
|
||||
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-missing-turn-result", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-spawn-failure"] };
|
||||
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
|
||||
} finally {
|
||||
await new Promise<void>((resolve) => server.server.close(() => resolve()));
|
||||
}
|
||||
@@ -101,6 +104,23 @@ function eventPayload(event: { payload: unknown }): JsonRecord {
|
||||
return typeof event.payload === "object" && event.payload !== null && !Array.isArray(event.payload) ? event.payload as JsonRecord : {};
|
||||
}
|
||||
|
||||
async function runSecretFailureCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
|
||||
const item = await createRunWithCommand(options.client, options.context, "failure missing secret files", "selftest-secret-unavailable", 3_000);
|
||||
const result = await runOnce({
|
||||
managerUrl: options.managerUrl,
|
||||
runId: item.runId,
|
||||
codexCommand: options.context.fakeCodexCommand,
|
||||
codexArgs: options.context.fakeCodexArgs,
|
||||
codexHome: path.join(options.context.tmp, "missing-codex-home"),
|
||||
env: { CODEX_HOME: path.join(options.context.tmp, "missing-codex-home") },
|
||||
}) as JsonRecord;
|
||||
assert.equal(result.terminalStatus, "blocked", "secret unavailable");
|
||||
assert.equal(result.failureKind, "secret-unavailable", "secret unavailable");
|
||||
const run = await options.client.get(`/api/v1/runs/${item.runId}`) as { status?: string; failureKind?: string };
|
||||
assert.equal(run.status, "blocked", "secret unavailable");
|
||||
assert.equal(run.failureKind, "secret-unavailable", "secret unavailable");
|
||||
}
|
||||
|
||||
async function runSpawnFailureCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
|
||||
const item = await createRunWithCommand(options.client, options.context, "failure spawn", "selftest-spawn-failure", 3_000);
|
||||
const result = await runOnce({
|
||||
|
||||
@@ -0,0 +1,176 @@
|
||||
import assert from "node:assert/strict";
|
||||
import { execFile as execFileCallback } from "node:child_process";
|
||||
import { promisify } from "node:util";
|
||||
import { chmod, mkdir, readFile, writeFile } from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { startManagerServer } from "../../mgr/server.js";
|
||||
import { MemoryAgentRunStore } from "../../mgr/store.js";
|
||||
import { ManagerClient } from "../../mgr/client.js";
|
||||
import { runOnce } from "../../runner/run-once.js";
|
||||
import { eventContractSummary } from "../../common/events.js";
|
||||
import type { BackendProfile, JsonRecord, RunEvent } from "../../common/types.js";
|
||||
import { assertNoSecretLeak, createRunWithCommand, type SelfTestCase, type SelfTestContext } from "../harness.js";
|
||||
|
||||
const execFile = promisify(execFileCallback);
|
||||
|
||||
const selfTest: SelfTestCase = async (context) => {
|
||||
const fakeKubectl = path.join(context.tmp, "fake-kubectl-contract.js");
|
||||
const createdManifest = path.join(context.tmp, "created-contract-runner-job.json");
|
||||
await writeFile(fakeKubectl, `#!/usr/bin/env bun
|
||||
const chunks = [];
|
||||
for await (const chunk of Bun.stdin.stream()) chunks.push(chunk);
|
||||
const text = Buffer.concat(chunks.map((chunk) => Buffer.from(chunk))).toString("utf8");
|
||||
await Bun.write(${JSON.stringify(createdManifest)}, text);
|
||||
const manifest = JSON.parse(text);
|
||||
console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kind, metadata: { uid: "job-uid-contract", resourceVersion: "1", name: manifest.metadata.name, namespace: manifest.metadata.namespace } }));
|
||||
`);
|
||||
await chmod(fakeKubectl, 0o755);
|
||||
const store = new MemoryAgentRunStore();
|
||||
const server = await startManagerServer({
|
||||
port: 0,
|
||||
host: "127.0.0.1",
|
||||
sourceCommit: "self-test",
|
||||
store,
|
||||
runnerJobDefaults: {
|
||||
namespace: "agentrun-v01",
|
||||
managerUrl: "http://agentrun-mgr.agentrun-v01.svc.cluster.local:8080",
|
||||
image: "127.0.0.1:5000/agentrun/agentrun-mgr@sha256:1111111111111111111111111111111111111111111111111111111111111111",
|
||||
kubectlCommand: fakeKubectl,
|
||||
},
|
||||
});
|
||||
try {
|
||||
const client = new ManagerClient(server.baseUrl);
|
||||
await assertBackendPreflight(client);
|
||||
await assertEventContractAndCompletedSemantics(client, context, server.baseUrl);
|
||||
await assertRunnerJobStatus(client, context);
|
||||
await assertSessionProfileIsolation(client, context);
|
||||
await assertResourceBundleFailure(client, context, server.baseUrl);
|
||||
return { name: "hwlab-baseline-contract", tests: ["event-contract", "result-completed-terminal-only", "bounded-output-summary", "runner-job-status", "backend-preflight-redacted", "session-profile-isolation", "resource-bundle-failure-kind"] };
|
||||
} finally {
|
||||
await new Promise<void>((resolve) => server.server.close(() => resolve()));
|
||||
}
|
||||
};
|
||||
|
||||
async function assertBackendPreflight(client: ManagerClient): Promise<void> {
|
||||
const response = await client.get("/api/v1/backends") as { items?: JsonRecord[] };
|
||||
const items = response.items ?? [];
|
||||
assert.ok(items.length >= 2, "codex/deepseek backend capabilities should be visible");
|
||||
for (const item of items) {
|
||||
const preflight = item.preflight as JsonRecord;
|
||||
const defaultSecretRef = item.defaultSecretRef as JsonRecord;
|
||||
assert.equal(preflight.valuesPrinted, false);
|
||||
assert.equal(preflight.credentialValuesPrinted, false);
|
||||
assert.equal(defaultSecretRef.valuesPrinted, false);
|
||||
assert.ok(Array.isArray(preflight.requiredSecretKeys));
|
||||
}
|
||||
assertNoSecretLeak(response);
|
||||
}
|
||||
|
||||
async function assertEventContractAndCompletedSemantics(client: ManagerClient, context: SelfTestContext, managerUrl: string): Promise<void> {
|
||||
const happy = await createRunWithCommand(client, context, "hello event contract", "selftest-event-contract", 15_000);
|
||||
await client.post(`/api/v1/runs/${happy.runId}/events`, { type: "tool_call", payload: { method: "selftest/tool", item: { command: "echo ok" } } });
|
||||
await client.post(`/api/v1/runs/${happy.runId}/events`, { type: "diff", payload: { filesChanged: 1, summary: "selftest diff" } });
|
||||
const result = await runOnce({ managerUrl, runId: happy.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome } });
|
||||
assert.equal(result.terminalStatus, "completed");
|
||||
const eventsResponse = await client.get(`/api/v1/runs/${happy.runId}/events?afterSeq=0&limit=100`) as { items?: RunEvent[] };
|
||||
const events = eventsResponse.items ?? [];
|
||||
const summary = eventContractSummary(events);
|
||||
assert.equal(summary.ok, true, JSON.stringify(summary.issues));
|
||||
const types = new Set(events.map((event) => event.type));
|
||||
for (const type of ["backend_status", "assistant_message", "tool_call", "command_output", "diff", "terminal_status"]) assert.ok(types.has(type as never), `missing event type ${type}`);
|
||||
const envelope = await client.get(`/api/v1/runs/${happy.runId}/commands/${happy.commandId}/result`) as JsonRecord;
|
||||
assert.equal(envelope.completed, true);
|
||||
assert.equal(envelope.terminalStatus, "completed");
|
||||
assert.equal(envelope.terminalSource, "terminal_status-event");
|
||||
assertNoSecretLeak({ eventsResponse, envelope });
|
||||
|
||||
const partial = await createRunWithCommand(client, context, "partial should not complete", "selftest-partial-not-completed", 15_000);
|
||||
const largeOutput = `${"x".repeat(6_000)}\nAuthorization: Bearer test-token\n`;
|
||||
await client.post(`/api/v1/runs/${partial.runId}/events`, { type: "assistant_message", payload: { text: "partial assistant only" } });
|
||||
await client.post(`/api/v1/runs/${partial.runId}/events`, { type: "command_output", payload: { stream: "stdout", text: largeOutput } });
|
||||
const partialEnvelope = await client.get(`/api/v1/runs/${partial.runId}/commands/${partial.commandId}/result`) as JsonRecord;
|
||||
const artifactSummary = partialEnvelope.artifactSummary as JsonRecord;
|
||||
const stdoutSummary = artifactSummary.stdoutSummary as JsonRecord;
|
||||
assert.equal(partialEnvelope.completed, false);
|
||||
assert.equal(partialEnvelope.terminalStatus, null);
|
||||
assert.equal(stdoutSummary.outputTruncated, true);
|
||||
assert.equal(typeof stdoutSummary.outputBytes, "number");
|
||||
assertNoSecretLeak(partialEnvelope);
|
||||
}
|
||||
|
||||
async function assertRunnerJobStatus(client: ManagerClient, context: SelfTestContext): Promise<void> {
|
||||
const item = await createRunWithCommand(client, context, "runner job status", "selftest-runner-job-status", 15_000);
|
||||
await client.post(`/api/v1/runs/${item.runId}/runner-jobs`, { commandId: item.commandId, idempotencyKey: "selftest-runner-job-status" }) as JsonRecord;
|
||||
const manifest = JSON.parse(await readFile(path.join(context.tmp, "created-contract-runner-job.json"), "utf8")) as JsonRecord;
|
||||
assert.equal(manifest.kind, "Job");
|
||||
const list = await client.get(`/api/v1/runs/${item.runId}/runner-jobs`) as { items?: JsonRecord[]; count?: number };
|
||||
assert.equal(list.count, 1);
|
||||
const status = list.items?.[0] ?? {};
|
||||
assert.equal(status.runId, item.runId);
|
||||
assert.equal(status.commandId, item.commandId);
|
||||
assert.equal(status.phase, "created");
|
||||
assert.equal(status.valuesPrinted, false);
|
||||
assert.equal(typeof status.logPath, "string");
|
||||
const single = await client.get(`/api/v1/runs/${item.runId}/runner-jobs/${String(status.id)}`) as JsonRecord;
|
||||
assert.equal(single.jobName, status.jobName);
|
||||
assertNoSecretLeak({ list, single });
|
||||
}
|
||||
|
||||
async function assertSessionProfileIsolation(client: ManagerClient, context: SelfTestContext): Promise<void> {
|
||||
const first = await client.post("/api/v1/runs", runPayload(context, "codex", "selftest-profile-boundary-session")) as { id: string };
|
||||
await client.patch(`/api/v1/runs/${first.id}/status`, { terminalStatus: "completed", failureKind: null, failureMessage: null, threadId: "thread_codex_profile_boundary", turnId: "turn_profile_boundary" });
|
||||
await assert.rejects(
|
||||
() => client.post("/api/v1/runs", runPayload(context, "deepseek", "selftest-profile-boundary-session")),
|
||||
(error) => error instanceof Error && error.message.includes("backendProfile boundary"),
|
||||
);
|
||||
}
|
||||
|
||||
async function assertResourceBundleFailure(client: ManagerClient, context: SelfTestContext, managerUrl: string): Promise<void> {
|
||||
const repo = await createLocalGitRepo(context);
|
||||
const run = await client.post("/api/v1/runs", {
|
||||
...runPayload(context, "codex", "selftest-bad-bundle-session"),
|
||||
resourceBundleRef: { kind: "git", repoUrl: repo.repoUrl, commitId: "0000000000000000000000000000000000000000", submodules: false, lfs: false },
|
||||
}) as { id: string };
|
||||
const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "bad bundle" }, idempotencyKey: "selftest-bad-bundle" }) as { id: string };
|
||||
const result = await runOnce({ managerUrl, runId: run.id, commandId: command.id, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "bad-bundle-workspaces") } }) as JsonRecord;
|
||||
assert.equal(result.terminalStatus, "failed");
|
||||
assert.equal(result.failureKind, "infra-failed");
|
||||
const envelope = await client.get(`/api/v1/runs/${run.id}/commands/${command.id}/result`) as JsonRecord;
|
||||
assert.equal(envelope.completed, false);
|
||||
assert.equal(envelope.failureKind, "infra-failed");
|
||||
const commandRecord = await client.get(`/api/v1/runs/${run.id}/commands/${command.id}`) as { state?: string };
|
||||
assert.equal(commandRecord.state, "failed");
|
||||
}
|
||||
|
||||
function runPayload(context: SelfTestContext, backendProfile: BackendProfile, sessionId: string): JsonRecord {
|
||||
const secretHome = backendProfile === "deepseek" ? context.deepseekHome : context.codexHome;
|
||||
return {
|
||||
tenantId: "hwlab",
|
||||
projectId: "pikasTech/HWLAB",
|
||||
workspaceRef: { kind: "opaque", repo: "pikasTech/HWLAB" },
|
||||
sessionRef: { sessionId, conversationId: sessionId },
|
||||
providerId: "G14",
|
||||
backendProfile,
|
||||
executionPolicy: {
|
||||
sandbox: "workspace-write",
|
||||
approval: "never",
|
||||
timeoutMs: 15_000,
|
||||
network: "default",
|
||||
secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: backendProfile, secretRef: { name: `agentrun-v01-provider-${backendProfile}`, keys: ["auth.json", "config.toml"], mountPath: secretHome } }] },
|
||||
},
|
||||
traceSink: { kind: "hwlab", traceId: sessionId },
|
||||
};
|
||||
}
|
||||
|
||||
async function createLocalGitRepo(context: SelfTestContext): Promise<{ repoUrl: string; commitId: string }> {
|
||||
const repo = path.join(context.tmp, "contract-bundle-repo");
|
||||
await mkdir(repo, { recursive: true });
|
||||
await execFile("git", ["init"], { cwd: repo });
|
||||
await writeFile(path.join(repo, "README.md"), "AgentRun contract bundle self-test\n", "utf8");
|
||||
await execFile("git", ["add", "README.md"], { cwd: repo });
|
||||
await execFile("git", ["-c", "user.email=selftest@example.invalid", "-c", "user.name=AgentRun SelfTest", "commit", "-m", "contract bundle selftest"], { cwd: repo });
|
||||
const { stdout } = await execFile("git", ["rev-parse", "HEAD"], { cwd: repo });
|
||||
return { repoUrl: repo, commitId: stdout.trim() };
|
||||
}
|
||||
|
||||
export default selfTest;
|
||||
@@ -66,6 +66,10 @@ for await (const line of rl) {
|
||||
respond(message.id, null, { code: -32000, message: "responseStreamDisconnected: HTTP 503 Service Unavailable from provider" });
|
||||
continue;
|
||||
}
|
||||
if (mode === "provider-401-rpc-error") {
|
||||
respond(message.id, null, { code: -32000, message: "HTTP 401 Unauthorized: invalid api key" });
|
||||
continue;
|
||||
}
|
||||
if (mode === "missing-terminal") {
|
||||
turnCounter += 1;
|
||||
const turn = { id: `turn_selftest_${turnCounter}`, status: "running" };
|
||||
@@ -81,6 +85,14 @@ for await (const line of rl) {
|
||||
respond(message.id, { turn });
|
||||
continue;
|
||||
}
|
||||
if (mode === "provider-429-terminal") {
|
||||
turnCounter += 1;
|
||||
const turn = { id: `turn_selftest_${turnCounter}`, status: "failed", error: { message: "HTTP 429 Too Many Requests: rate limit exceeded" } };
|
||||
notify("turn/started", { turn: { id: turn.id, status: "running" } });
|
||||
notify("turn/completed", { turn });
|
||||
respond(message.id, { turn });
|
||||
continue;
|
||||
}
|
||||
if (mode === "provider-503-retry-event") {
|
||||
turnCounter += 1;
|
||||
const turn = {
|
||||
|
||||
Reference in New Issue
Block a user