From f4ee644233681915c78b56ac0c7e5bd2b257ed06 Mon Sep 17 00:00:00 2001 From: Codex Date: Mon, 1 Jun 2026 13:43:27 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E8=A1=A5=E9=BD=90=20HWLAB=20=E5=9F=BA?= =?UTF-8?q?=E7=BA=BF=20AgentRun=20=E6=89=A7=E8=A1=8C=E5=85=83=E8=AF=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/reference/spec-v01-agentrun-mgr.md | 6 +- docs/reference/spec-v01-cli.md | 4 + docs/reference/spec-v01-validation.md | 2 + scripts/src/cli.ts | 19 ++ src/backend/codex-stdio.ts | 12 +- src/common/backend-profiles.ts | 23 ++- src/common/events.ts | 88 +++++++++ src/common/output.ts | 62 ++++++ src/mgr/postgres-store.ts | 20 +- src/mgr/result.ts | 25 ++- src/mgr/runner-job-status.ts | 53 ++++++ src/mgr/server.ts | 17 ++ src/mgr/store.ts | 19 +- src/runner/run-once.ts | 13 +- src/selftest/cases/30-codex-stdio.ts | 22 ++- .../cases/60-hwlab-baseline-contract.ts | 176 ++++++++++++++++++ src/selftest/fake-codex-app-server.ts | 12 ++ 17 files changed, 555 insertions(+), 18 deletions(-) create mode 100644 src/common/events.ts create mode 100644 src/common/output.ts create mode 100644 src/mgr/runner-job-status.ts create mode 100644 src/selftest/cases/60-hwlab-baseline-contract.ts diff --git a/docs/reference/spec-v01-agentrun-mgr.md b/docs/reference/spec-v01-agentrun-mgr.md index 15e0c4e..540b421 100644 --- a/docs/reference/spec-v01-agentrun-mgr.md +++ b/docs/reference/spec-v01-agentrun-mgr.md @@ -39,6 +39,8 @@ POST /api/v1/runs/:runId/commands GET /api/v1/runs/:runId/commands/:commandId GET /api/v1/runs/:runId/commands/:commandId/result POST /api/v1/runs/:runId/runner-jobs +GET /api/v1/runs/:runId/runner-jobs?commandId= +GET /api/v1/runs/:runId/runner-jobs/:runnerJobId POST /api/v1/commands/:commandId/cancel GET /api/v1/backends ``` @@ -101,7 +103,7 @@ Manager 只承接 HWLAB v0.2 Code Agent 的通用执行事实,不承接 HWLAB ## 最小 Observability 合同 - events append-only,单 run 内 `seq` 单调递增。 -- 每个 run 必须最终出现 `terminal_status`,或保持明确 non-terminal status 并可查询 lease/heartbeat。 +- 每个 run 必须最终出现唯一 authoritative `terminal_status`,或保持明确 non-terminal status 并可查询 lease/heartbeat;assistant partial、stdout、transport close 或 idle timeout 不能替代 terminal completed。 - failureKind 至少能区分 `schema-invalid`、`tenant-policy-denied`、`secret-unavailable`、`runner-lease-conflict`、`backend-failed`、`provider-auth-failed`、`provider-unavailable`、`infra-failed`、`cancelled`。 - health/readiness 必须返回 Postgres reachable、schema migration ready、SecretRef redacted 状态和 build/source metadata。 - 日志、event、trace、health 和 diagnostics 不得输出 provider credential、Codex auth/config 内容、DSN password、token 或 URL credential。 @@ -114,6 +116,7 @@ Manager 只承接 HWLAB v0.2 Code Agent 的通用执行事实,不承接 HWLAB | --- | --- | | `status` | run/command 当前聚合状态,只能由 command state 和 terminal_status 推导。 | | `terminalStatus` | `completed`、`failed`、`blocked` 或 `cancelled`;没有 terminal event 时为 `null` 或 equivalent running 状态。 | +| `completed` / `terminalSource` | `completed=true` 只能来自 terminal completed;`terminalSource` 标明来自 `terminal_status` event、run record 或暂无 terminal。 | | `reply` | 从 `assistant_message` 聚合的最终用户可见文本;没有 terminal completed 时不得伪造 completed reply。 | | `failureKind` / `blocker` | 结构化失败分类和摘要;必须 redacted。 | | `lastSeq` / `eventCount` | 支持调用方增量轮询和 result/trace reconciliation。 | @@ -151,6 +154,7 @@ Manager 只承接 HWLAB v0.2 Code Agent 的通用执行事实,不承接 HWLAB | `agentrun-mgr` 服务规格 | 已定义 | 本文为 v0.1 manager 权威。 | | Manager REST API | 已实现/已通过主闭环 | 已有 run、command、event、backends、runner register、claim、lease heartbeat、poll、ack、status、runner Job 创建和 health/readiness 的 HTTP JSON API;真实 runtime 已通过 RESTful API 主闭环。 | | 手动 runner Job API | 已实现 | `POST /api/v1/runs/:runId/runner-jobs` 已可创建 Kubernetes runner Job,并固化 idempotency、持久 runner job record、响应 schema 和 cancel 前置检查。 | +| runner Job 状态查询 | 已实现 | `GET /api/v1/runs/:runId/runner-jobs` 和 `GET /api/v1/runs/:runId/runner-jobs/:runnerJobId` 返回 attempt/job/log/phase/terminal 摘要,业务客户端无需直连 Kubernetes 做最小定位。 | | Tenant policy boundary | 已实现最小边界 | v0.1 已做 schema、tenant/backend allowlist、executionPolicy 和 secretScope 结构校验;业务授权仍由 UniDesk/HWLAB 自己判定。 | | `deepseek` backendProfile allowlist | 已实现/已通过主闭环 | Manager validation、backend capability 和 matching SecretRef 校验已支持 `deepseek`;真实 runtime 已经通过 CI/CD 发布并确认 Postgres migration `002_v01_backend_profiles` 应用。 | | Postgres durable adapter | 已实现/已通过主闭环 | live runtime 通过 `DATABASE_URL` 使用 Postgres durable store;memory store 仅用于显式 self-test/dev。见 [spec-v01-postgres.md](spec-v01-postgres.md)。 | diff --git a/docs/reference/spec-v01-cli.md b/docs/reference/spec-v01-cli.md index a9c0476..e42b2ae 100644 --- a/docs/reference/spec-v01-cli.md +++ b/docs/reference/spec-v01-cli.md @@ -44,6 +44,8 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交 ./scripts/agentrun runner start --run-id --backend ./scripts/agentrun runner job --run-id --command-id [--idempotency-key ] ./scripts/agentrun runner job --dry-run --run-id --command-id --image +./scripts/agentrun runner jobs --run-id [--command-id ] +./scripts/agentrun runner job-status [runnerJobId] --run-id ./scripts/agentrun secrets codex render --dry-run [--profile codex|deepseek] [--codex-home ] ./scripts/agentrun backends list ./scripts/agentrun server start|status @@ -53,6 +55,7 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交 - 创建类命令返回 `runId`、`commandId`、status 和下一步 poll command。 - `runner start` 返回 attemptId、job/process identity、logPath 和后续 status/events 命令。 +- `runner jobs` / `runner job-status` 返回 manager 持久化的 runner Job 最小状态摘要,包括 attemptId、runnerId、namespace、jobName、phase、terminalStatus、logPath、retention 和 redacted Kubernetes identity;业务方不需要直连 Kubernetes 才能定位当前 attempt。 - 查询类命令返回当前 state、terminal_status、failureKind、event cursor 或 logPath。 - `events` 默认分页且有界,必须支持 `afterSeq` 和 `limit`。 - `server logs` 返回有界日志摘要,并指向完整日志文件或 Kubernetes pod identity。 @@ -96,6 +99,7 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交 | `scripts/agentrun-cli.ts` | 已实现 | 已提供 run/command/event/backend/server 基础命令和 JSON envelope;`scripts/agentrun` 是同一入口的 Bun 定位 launcher。 | | CLI 调 manager REST | 已实现 | CLI 通过 `ManagerClient` 调 manager REST;自测试可用内存 manager,综合联调必须指向真实 `agentrun-v01` manager。 | | runner start/job | 已实现 | `runner start` 可执行 host process runner;`runner job --dry-run` 可渲染 Kubernetes Job JSON;`runner job` 正式路径通过 manager REST 创建 Kubernetes Job,支持 `--idempotency-key` 并快速返回 job identity、SecretRef、retention 和轮询命令。 | +| runner jobs/job-status | 已实现 | CLI 通过 manager REST 查询 runner Job 持久记录和最小状态摘要,不直连 Kubernetes、不读取 Secret 值。 | | result/cancel CLI | 已实现 | `runs result`、`commands result`、`runs cancel` 和 `commands cancel` 均调用 manager REST,不维护独立状态。 | | CLI 测试规格 | 已定义/已验证主闭环 | 综合联调见 [spec-v01-validation.md](spec-v01-validation.md);每次发布仍按手动交互验收复跑。 | | `deepseek` profile CLI | 已实现/已通过主闭环 | `secrets codex render --profile deepseek`、`backends list`、`runner start --backend`、`runner job` 和 JSON 错误可见性已实现;真实 CLI/RESTful 联调已通过 `codex -> deepseek -> codex` 切换主闭环。 | diff --git a/docs/reference/spec-v01-validation.md b/docs/reference/spec-v01-validation.md index 05c2982..12e2e22 100644 --- a/docs/reference/spec-v01-validation.md +++ b/docs/reference/spec-v01-validation.md @@ -23,6 +23,7 @@ - CLI:默认 JSON、空 stdout 失败、长操作短返回、错误结构化。 - Postgres adapter:migration、事务、run/command/event round-trip、重启后可查询。 - Secret 分发:SecretRef schema、missing secret failure、redaction。 +- HWLAB v0.2 基线承接:可以用 fake backend/临时 manager 做组件自测试,覆盖 event contract、result completed 防误判、bounded output、runner job status、SessionRef profile 隔离、ResourceBundleRef 失败分类和 backend preflight redaction;这些自测试不能替代真实 `agentrun-v01` CLI 交互验收。 自测试应使用 Bun + TypeScript 运行,Codex 相关自测试可以使用 fake app-server JSON-RPC client 模拟 `initialize`、`thread/start`、`thread/resume`、`turn/start`、assistant 输出、协议错误、timeout 和 transport close。 @@ -189,6 +190,7 @@ T8 是涉及 backend profile 变更时的综合联调标准;不涉及 backend | --- | --- | --- | | 两层验证模型 | 已定义 | 本文为 v0.1 验证权威。 | | 自测试 task | 已实现 | `src/selftest/run.ts` 自动发现 `src/selftest/cases/*.ts`;覆盖 redaction/Postgres contract、manager memory、runner Job render/create、Codex fake app-server stdio 和 Secret render。 | +| HWLAB 基线承接自测试 | 已实现 | `src/selftest/cases/60-hwlab-baseline-contract.ts` 固化 event/result/failureKind/bounded output/runner job status/session/profile/bundle/preflight 的组件合同;综合联调仍必须走正式 AgentRun CLI 且不能使用 mock。 | | 综合联调验收规格 | 已增强 | 本文保留人工交互验收模型;T4-T8 定义 CLI、RESTful、一致性、负向场景和 backend profile 切换的手动验收标准,不新增自动脚本或门禁。 | | CLI 交互联调标准 | 已定义 | 必须只使用正式 CLI,验证真实 run 生命周期和可观测输出。 | | RESTful API 交互联调标准 | 已定义 | 必须直连真实 manager HTTP JSON API,验证服务合同和 durable facts。 | diff --git a/scripts/src/cli.ts b/scripts/src/cli.ts index b8ee12b..66fae35 100644 --- a/scripts/src/cli.ts +++ b/scripts/src/cli.ts @@ -80,9 +80,26 @@ async function dispatch(args: ParsedArgs): Promise { return runOnce(options) as unknown as JsonValue; } if (group === "runner" && command === "job") return renderRunnerJob(args); + if (group === "runner" && command === "jobs") return listRunnerJobs(args); + if (group === "runner" && command === "job-status") return showRunnerJobStatus(args); throw new AgentRunError("schema-invalid", `unsupported command: ${args.positional.join(" ")}`, { httpStatus: 2 }); } +async function listRunnerJobs(args: ParsedArgs): Promise { + const runId = flag(args, "run-id", ""); + if (!runId) throw new AgentRunError("schema-invalid", "runner jobs requires --run-id", { httpStatus: 2 }); + const commandId = optionalFlag(args, "command-id"); + return client(args).get(`/api/v1/runs/${encodeURIComponent(runId)}/runner-jobs${commandId ? `?commandId=${encodeURIComponent(commandId)}` : ""}`); +} + +async function showRunnerJobStatus(args: ParsedArgs): Promise { + const runId = flag(args, "run-id", ""); + if (!runId) throw new AgentRunError("schema-invalid", "runner job-status requires --run-id", { httpStatus: 2 }); + const runnerJobId = args.positional[2] ?? optionalFlag(args, "runner-job-id"); + if (!runnerJobId) return listRunnerJobs(args); + return client(args).get(`/api/v1/runs/${encodeURIComponent(runId)}/runner-jobs/${encodeURIComponent(runnerJobId)}`); +} + async function renderRunnerJob(args: ParsedArgs): Promise { const runId = flag(args, "run-id", ""); const commandId = flag(args, "command-id", ""); @@ -221,6 +238,8 @@ function help(): JsonRecord { "runner start --run-id [--backend codex|deepseek]", "runner job --run-id --command-id [--image ] [--runner-manager-url ] [--idempotency-key ]", "runner job --dry-run --run-id --command-id --image ", + "runner jobs --run-id [--command-id ]", + "runner job-status [runnerJobId] --run-id ", "secrets codex render --dry-run [--profile codex|deepseek] [--codex-home ] [--namespace agentrun-v01] [--secret-name ]", "backends list", "server start|status", diff --git a/src/backend/codex-stdio.ts b/src/backend/codex-stdio.ts index 6600d2f..a2ec0d3 100644 --- a/src/backend/codex-stdio.ts +++ b/src/backend/codex-stdio.ts @@ -7,6 +7,7 @@ import * as readline from "node:readline"; import type { BackendEvent, BackendProfile, BackendTurnResult, FailureKind, JsonRecord, JsonValue, TerminalStatus } from "../common/types.js"; import { redactJson, redactText } from "../common/redaction.js"; import { backendProfileSpec } from "../common/backend-profiles.js"; +import { boundedTextSummary, commandOutputPayload } from "../common/output.js"; const codexProtocol = "codex-app-server-jsonrpc-stdio"; const defaultCodexArgs = ["app-server", "--listen", "stdio://"]; @@ -437,8 +438,8 @@ function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent return { events: [{ type: "backend_status", payload: { phase: method, turnId } }], ...(turnId ? { turnId } : {}) }; } if (method === "item/agentMessage/delta") return { events: [], assistantDelta: typeof params.delta === "string" ? params.delta : "" }; - if (method === "item/commandExecution/outputDelta") return { events: [{ type: "command_output", payload: { stream: "stdout", text: typeof params.delta === "string" ? params.delta : "" } }] }; - if (method === "item/started" || method === "item/completed") return { events: [{ type: "tool_call", payload: { method, item: redactJson(asRecordAt(params, "item")) } }] }; + if (method === "item/commandExecution/outputDelta") return { events: [{ type: "command_output", payload: commandOutputPayload("stdout", typeof params.delta === "string" ? params.delta : "") }] }; + if (method === "item/started" || method === "item/completed") return { events: [{ type: "tool_call", payload: toolCallPayload(method, asRecordAt(params, "item")) }] }; if (method === "error") { const error = asRecordAt(params, "error"); const messageText = typeof error.message === "string" ? error.message : "Codex app-server error"; @@ -469,6 +470,13 @@ function terminalStatusFromValue(value: unknown): TerminalStatus { return "failed"; } +function toolCallPayload(method: string, item: JsonRecord): JsonRecord { + const redacted = redactJson(item); + const summary = boundedTextSummary(JSON.stringify(redacted)); + if (summary.outputTruncated !== true) return { method, item: redacted, summary, outputBytes: summary.outputBytes, outputTruncated: false }; + return { method, itemPreview: summary.text, summary, outputBytes: summary.outputBytes, outputTruncated: true }; +} + function withOptionalModel(params: JsonRecord, model: string | undefined): JsonRecord { const value = typeof model === "string" ? model.trim() : ""; if (!value) return params; diff --git a/src/common/backend-profiles.ts b/src/common/backend-profiles.ts index de33d9d..1c23ee1 100644 --- a/src/common/backend-profiles.ts +++ b/src/common/backend-profiles.ts @@ -51,6 +51,7 @@ export function isBackendProfile(value: string): value is BackendProfile { } export function backendCapability(spec: BackendProfileSpec): JsonRecord { + const defaultSecretRef = { name: spec.defaultSecretName, keys: [...spec.requiredSecretKeys], valuesPrinted: false }; return { profile: spec.profile, backendKind: spec.backendKind, @@ -59,12 +60,32 @@ export function backendCapability(spec: BackendProfileSpec): JsonRecord { command: spec.command, status: spec.status, requiredSecretKeys: [...spec.requiredSecretKeys], - defaultSecretRef: { name: spec.defaultSecretName, keys: [...spec.requiredSecretKeys] }, + defaultSecretRef, + preflight: { + readiness: "requires-profile-secret-ref", + defaultSecretRef, + requiredSecretKeys: [...spec.requiredSecretKeys], + profileIsolation: spec.profileIsolation, + credentialValuesPrinted: false, + valuesPrinted: false, + }, profileIsolation: spec.profileIsolation, description: spec.description, }; } +export function mergeBackendCapability(profile: string, storedCapabilities: JsonRecord): JsonRecord { + const spec = backendProfileSpec(profile); + if (!spec) return { profile, ...storedCapabilities, valuesPrinted: false }; + const base = backendCapability(spec); + return { + ...base, + ...storedCapabilities, + defaultSecretRef: base.defaultSecretRef, + preflight: base.preflight, + }; +} + export function backendCapabilities(): JsonRecord[] { return backendProfileSpecs.map(backendCapability); } diff --git a/src/common/events.ts b/src/common/events.ts new file mode 100644 index 0000000..75a6841 --- /dev/null +++ b/src/common/events.ts @@ -0,0 +1,88 @@ +import { AgentRunError } from "./errors.js"; +import type { EventType, JsonRecord, RunEvent, TerminalStatus } from "./types.js"; +import { boundedTextSummary, commandOutputPayload } from "./output.js"; +import { redactJson } from "./redaction.js"; + +export const eventTypes = ["backend_status", "assistant_message", "tool_call", "command_output", "diff", "error", "terminal_status"] as const satisfies readonly EventType[]; +export const terminalStatuses = ["completed", "failed", "blocked", "cancelled"] as const satisfies readonly TerminalStatus[]; + +const eventTypeSet = new Set(eventTypes); +const terminalStatusSet = new Set(terminalStatuses); + +export function isEventType(value: unknown): value is EventType { + return typeof value === "string" && eventTypeSet.has(value); +} + +export function isTerminalStatus(value: unknown): value is TerminalStatus { + return typeof value === "string" && terminalStatusSet.has(value); +} + +export function requireEventType(value: unknown): EventType { + if (isEventType(value)) return value; + throw new AgentRunError("schema-invalid", `event.type ${String(value)} is not supported`, { httpStatus: 400, details: { allowedEventTypes: [...eventTypes] } }); +} + +export function normalizeRunEventPayload(type: EventType, payload: JsonRecord): JsonRecord { + if (type === "terminal_status") return normalizeTerminalStatusPayload(payload); + if (type === "command_output") return normalizeCommandOutputPayload(payload); + if (type === "assistant_message") return normalizeTextPayload(payload); + if (type === "tool_call") return normalizeToolCallPayload(payload); + return payload; +} + +export function eventContractSummary(events: RunEvent[]): JsonRecord { + const issues: JsonRecord[] = []; + let terminalStatusCount = 0; + for (let index = 0; index < events.length; index += 1) { + const event = events[index]; + if (!eventTypeSet.has(event.type)) issues.push({ code: "event-type-invalid", seq: event.seq, type: event.type }); + if (event.seq !== index + 1) issues.push({ code: "seq-not-contiguous", expectedSeq: index + 1, actualSeq: event.seq }); + if (event.type === "terminal_status") { + terminalStatusCount += 1; + if (!isTerminalStatus(event.payload.terminalStatus)) issues.push({ code: "terminal-status-invalid", seq: event.seq, terminalStatus: String(event.payload.terminalStatus ?? "") }); + } + } + if (terminalStatusCount > 1) issues.push({ code: "terminal-status-duplicated", terminalStatusCount }); + return { + ok: issues.length === 0, + eventCount: events.length, + lastSeq: events.at(-1)?.seq ?? 0, + terminalStatusCount, + issues, + }; +} + +function normalizeTerminalStatusPayload(payload: JsonRecord): JsonRecord { + if (!isTerminalStatus(payload.terminalStatus)) { + throw new AgentRunError("schema-invalid", "terminal_status event requires terminalStatus completed|failed|blocked|cancelled", { httpStatus: 400, details: { allowedTerminalStatuses: [...terminalStatuses] } }); + } + return payload; +} + +function normalizeCommandOutputPayload(payload: JsonRecord): JsonRecord { + const { text: _text, delta: _delta, content: _content, summary: _summary, ...rest } = payload; + const value = typeof payload.text === "string" ? payload.text : typeof payload.delta === "string" ? payload.delta : typeof payload.content === "string" ? payload.content : ""; + const stream = typeof payload.stream === "string" ? payload.stream : "stdout"; + return { ...rest, ...commandOutputPayload(stream, value) }; +} + +function normalizeTextPayload(payload: JsonRecord): JsonRecord { + const { text: _text, delta: _delta, content: _content, summary: _summary, ...rest } = payload; + const value = typeof payload.text === "string" ? payload.text : typeof payload.delta === "string" ? payload.delta : typeof payload.content === "string" ? payload.content : ""; + const summary = boundedTextSummary(value); + return { ...rest, text: summary.text, summary, textBytes: summary.textBytes, textTruncated: summary.textTruncated }; +} + +function normalizeToolCallPayload(payload: JsonRecord): JsonRecord { + const redacted = redactJson(payload); + const json = JSON.stringify(redacted); + const summary = boundedTextSummary(json); + if (summary.outputTruncated !== true) return { ...redacted, summary, outputBytes: summary.outputBytes, outputTruncated: false }; + return { + method: typeof payload.method === "string" ? payload.method : null, + itemPreview: summary.text, + summary, + outputBytes: summary.outputBytes, + outputTruncated: true, + }; +} diff --git a/src/common/output.ts b/src/common/output.ts new file mode 100644 index 0000000..2256afe --- /dev/null +++ b/src/common/output.ts @@ -0,0 +1,62 @@ +import type { JsonRecord } from "./types.js"; +import { redactText } from "./redaction.js"; + +const defaultOutputLimitChars = 4_000; + +export interface BoundedTextOptions { + limitChars?: number; +} + +export function boundedTextSummary(value: unknown, options: BoundedTextOptions = {}): JsonRecord { + const limitChars = positiveLimit(options.limitChars ?? defaultOutputLimitChars); + const redacted = redactText(typeof value === "string" ? value : value === null || value === undefined ? "" : String(value)); + const outputBytes = Buffer.byteLength(redacted, "utf8"); + const outputTruncated = redacted.length > limitChars; + const text = outputTruncated ? redacted.slice(0, limitChars) : redacted; + return { + text, + textChars: redacted.length, + textBytes: outputBytes, + outputBytes, + limitChars, + textTruncated: outputTruncated, + outputTruncated, + }; +} + +export function commandOutputPayload(stream: string, value: unknown, options: BoundedTextOptions = {}): JsonRecord { + const summary = boundedTextSummary(value, options); + return { + stream: stream === "stderr" ? "stderr" : "stdout", + text: summary.text, + summary, + outputBytes: summary.outputBytes, + outputTruncated: summary.outputTruncated, + textBytes: summary.textBytes, + textTruncated: summary.textTruncated, + }; +} + +export function outputBytesFromPayload(payload: JsonRecord): number { + for (const key of ["outputBytes", "textBytes"] as const) { + const value = payload[key]; + if (typeof value === "number" && Number.isFinite(value) && value >= 0) return value; + } + const summary = payload.summary; + if (typeof summary === "object" && summary !== null && !Array.isArray(summary)) { + const outputBytes = (summary as JsonRecord).outputBytes; + if (typeof outputBytes === "number" && Number.isFinite(outputBytes) && outputBytes >= 0) return outputBytes; + } + const text = typeof payload.text === "string" ? payload.text : ""; + return Buffer.byteLength(text, "utf8"); +} + +export function outputTruncatedFromPayload(payload: JsonRecord): boolean { + if (payload.outputTruncated === true || payload.textTruncated === true || payload.truncated === true) return true; + const summary = payload.summary; + return typeof summary === "object" && summary !== null && !Array.isArray(summary) && (summary as JsonRecord).outputTruncated === true; +} + +function positiveLimit(value: number): number { + return Number.isFinite(value) && value > 0 ? Math.floor(value) : defaultOutputLimitChars; +} diff --git a/src/mgr/postgres-store.ts b/src/mgr/postgres-store.ts index bbd1d40..d2dc43d 100644 --- a/src/mgr/postgres-store.ts +++ b/src/mgr/postgres-store.ts @@ -6,8 +6,9 @@ import { redactJson } from "../common/redaction.js"; import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js"; import { newId, nowIso, stableHash } from "../common/validation.js"; import type { AgentRunStore, SaveRunnerJobInput, StoreHealth } from "./store.js"; -import { commandStateFromTerminal, isTerminalCommandState, isTerminalRunStatus, sessionRefFromRecord, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js"; -import { backendCapabilitiesSqlValues } from "../common/backend-profiles.js"; +import { assertSessionBoundary, commandStateFromTerminal, isTerminalCommandState, isTerminalRunStatus, sessionRefFromRecord, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js"; +import { backendCapabilitiesSqlValues, mergeBackendCapability } from "../common/backend-profiles.js"; +import { normalizeRunEventPayload, requireEventType } from "../common/events.js"; interface PostgresStoreOptions { connectionString: string; @@ -509,7 +510,10 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( async backends(): Promise { const result = await this.pool.query("SELECT * FROM agentrun_backends ORDER BY profile ASC"); - return result.rows.map((row) => ({ profile: stringValue(row.profile), ...jsonRecord(row.capabilities), capacity: jsonValue(row.capacity), health: jsonValue(row.health), updatedAt: nullableIso(row.updated_at) })); + return result.rows.map((row) => { + const profile = stringValue(row.profile); + return { ...mergeBackendCapability(profile, jsonRecord(row.capabilities)), capacity: jsonValue(row.capacity), health: jsonValue(row.health), updatedAt: nullableIso(row.updated_at) }; + }); } async close(): Promise { @@ -517,8 +521,10 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( } private async appendEventWithLockedRun(client: PoolClient, runId: string, type: EventType, payload: JsonRecord): Promise { + const eventType = requireEventType(type); + const eventPayload = normalizeRunEventPayload(eventType, payload); const seq = await this.nextSeq(client, "agentrun_events", runId); - const event: RunEvent = { id: newId("evt"), runId, seq, type, payload: redactJson(payload), createdAt: nowIso() }; + const event: RunEvent = { id: newId("evt"), runId, seq, type: eventType, payload: redactJson(eventPayload), createdAt: nowIso() }; await client.query("INSERT INTO agentrun_events (id, run_id, seq, type, payload, created_at) VALUES ($1, $2, $3, $4, $5::jsonb, $6)", [event.id, event.runId, event.seq, event.type, JSON.stringify(event.payload), event.createdAt]); return event; } @@ -538,7 +544,11 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( private async resolveSessionForRun(client: PoolClient, input: CreateRunInput, at: string): Promise { if (!input.sessionRef) return null; const existing = await client.query("SELECT * FROM agentrun_sessions WHERE session_id = $1 FOR UPDATE", [input.sessionRef.sessionId]); - if (existing.rows[0]) return sessionRefFromRecord(sessionFromRow(existing.rows[0]), input.sessionRef); + if (existing.rows[0]) { + const session = sessionFromRow(existing.rows[0]); + assertSessionBoundary(session, input); + return sessionRefFromRecord(session, input.sessionRef); + } const record: SessionRecord = { sessionId: input.sessionRef.sessionId, tenantId: input.tenantId, diff --git a/src/mgr/result.ts b/src/mgr/result.ts index 8b8bf91..02441ee 100644 --- a/src/mgr/result.ts +++ b/src/mgr/result.ts @@ -1,5 +1,6 @@ import type { AgentRunStore } from "./store.js"; import type { CommandRecord, JsonRecord, JsonValue, RunEvent, RunRecord, RunnerJobRecord, TerminalStatus } from "../common/types.js"; +import { outputBytesFromPayload, outputTruncatedFromPayload } from "../common/output.js"; export async function buildRunResult(store: AgentRunStore, runId: string, commandId?: string): Promise { const run = await store.getRun(runId); @@ -7,7 +8,9 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman const events = await store.listEvents(runId, 0, 500); const jobs = await store.listRunnerJobs(runId, command?.id); const latestJob = jobs.at(-1) ?? null; - const terminal = terminalFromEvents(events) ?? run.terminalStatus; + const terminalEventStatus = terminalFromEvents(events); + const terminal = terminalEventStatus ?? run.terminalStatus; + const terminalSource = terminalEventStatus ? "terminal_status-event" : run.terminalStatus ? "run-record" : "none"; const failureKind = run.failureKind ?? failureKindFromEvents(events); const reply = assistantReply(events); const blocker = terminal === "blocked" || terminal === "failed" ? { failureKind, message: run.failureMessage ?? messageFromEvents(events) } : null; @@ -22,6 +25,8 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman runStatus: run.status, commandState: command?.state ?? null, terminalStatus: terminal, + terminalSource, + completed: terminal === "completed", reply, failureKind, failureMessage: run.failureMessage ?? messageFromEvents(events), @@ -90,18 +95,30 @@ function artifactSummary(events: RunEvent[]): JsonRecord { let diffEvents = 0; let toolCallEvents = 0; let outputChars = 0; - let truncatedEvents = 0; + let outputBytes = 0; + let outputTruncatedEvents = 0; + const streamSummary: Record = { + stdout: { events: 0, outputBytes: 0, outputTruncated: false }, + stderr: { events: 0, outputBytes: 0, outputTruncated: false }, + }; for (const event of events) { if (event.type === "command_output") { commandOutputEvents += 1; const text = textPayload(event.payload); outputChars += text.length; - if (event.payload.truncated === true) truncatedEvents += 1; + const bytes = outputBytesFromPayload(event.payload); + outputBytes += bytes; + const truncated = outputTruncatedFromPayload(event.payload); + if (truncated) outputTruncatedEvents += 1; + const stream = event.payload.stream === "stderr" ? "stderr" : "stdout"; + streamSummary[stream].events += 1; + streamSummary[stream].outputBytes += bytes; + streamSummary[stream].outputTruncated ||= truncated; } if (event.type === "diff") diffEvents += 1; if (event.type === "tool_call") toolCallEvents += 1; } - return { commandOutputEvents, diffEvents, toolCallEvents, outputChars, truncatedEvents }; + return { commandOutputEvents, diffEvents, toolCallEvents, outputChars, outputBytes, truncatedEvents: outputTruncatedEvents, outputTruncatedEvents, stdoutSummary: streamSummary.stdout, stderrSummary: streamSummary.stderr }; } function attemptFromEvents(events: RunEvent[]): string | null { diff --git a/src/mgr/runner-job-status.ts b/src/mgr/runner-job-status.ts new file mode 100644 index 0000000..4ac68d2 --- /dev/null +++ b/src/mgr/runner-job-status.ts @@ -0,0 +1,53 @@ +import type { JsonRecord, RunEvent, RunnerJobRecord, TerminalStatus } from "../common/types.js"; + +export function runnerJobStatusSummary(job: RunnerJobRecord, events: RunEvent[] = []): JsonRecord { + const terminalEvent = latestTerminalEvent(events); + const runner = recordAt(job.result, "runner"); + const jobIdentity = recordAt(job.result, "jobIdentity"); + const kubernetes = recordAt(job.result, "kubernetes"); + const retention = recordAt(job.result, "retention"); + const terminalStatus = terminalEvent?.payload.terminalStatus; + return { + id: job.id, + runId: job.runId, + commandId: job.commandId, + attemptId: job.attemptId, + runnerId: job.runnerId, + namespace: job.namespace, + jobName: job.jobName, + managerUrl: job.managerUrl, + image: job.image, + sourceCommit: job.sourceCommit, + serviceAccountName: job.serviceAccountName, + phase: terminalStatus ? `terminal:${terminalStatus}` : kubernetes.created === true ? "created" : "recorded", + terminalStatus: isTerminalStatus(terminalStatus) ? terminalStatus : null, + failureKind: typeof terminalEvent?.payload.failureKind === "string" ? terminalEvent.payload.failureKind : null, + exitCode: null, + startedAt: null, + finishedAt: terminalEvent?.createdAt ?? null, + jobIdentity, + podIdentity: recordAt(job.result, "podIdentity"), + logPath: typeof runner.logPath === "string" ? runner.logPath : null, + retention, + kubernetes, + createdAt: job.createdAt, + updatedAt: job.updatedAt, + valuesPrinted: false, + }; +} + +function latestTerminalEvent(events: RunEvent[]): RunEvent | null { + for (const event of [...events].reverse()) { + if (event.type === "terminal_status") return event; + } + return null; +} + +function recordAt(record: JsonRecord, key: string): JsonRecord { + const value = record[key]; + return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : {}; +} + +function isTerminalStatus(value: unknown): value is TerminalStatus { + return value === "completed" || value === "failed" || value === "blocked" || value === "cancelled"; +} diff --git a/src/mgr/server.ts b/src/mgr/server.ts index 3ff9eb8..4afe13a 100644 --- a/src/mgr/server.ts +++ b/src/mgr/server.ts @@ -8,6 +8,7 @@ import { asRecord, validateCreateCommand, validateCreateRun } from "../common/va import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, RunEvent } from "../common/types.js"; import { createKubernetesRunnerJob } from "./kubernetes-runner-job.js"; import { buildRunResult } from "./result.js"; +import { runnerJobStatusSummary } from "./runner-job-status.js"; export interface ManagerServerOptions { store?: AgentRunStore; @@ -104,6 +105,22 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults }, }) as unknown as JsonValue; } + if (method === "GET" && runnerJobMatch) { + const runId = runnerJobMatch[1] ?? ""; + const commandId = url.searchParams.get("commandId") ?? undefined; + const jobs = await store.listRunnerJobs(runId, commandId); + const events = await store.listEvents(runId, 0, 500); + return { items: jobs.map((job) => runnerJobStatusSummary(job, events)), count: jobs.length, lastSeq: events.at(-1)?.seq ?? 0 }; + } + const runnerJobShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/runner-jobs\/([^/]+)$/u); + if (method === "GET" && runnerJobShowMatch) { + const runId = runnerJobShowMatch[1] ?? ""; + const runnerJobId = runnerJobShowMatch[2] ?? ""; + const jobs = await store.listRunnerJobs(runId); + const job = jobs.find((item) => item.id === runnerJobId); + if (!job) throw new AgentRunError("schema-invalid", `runner job ${runnerJobId} was not found`, { httpStatus: 404 }); + return runnerJobStatusSummary(job, await store.listEvents(runId, 0, 500)) as JsonValue; + } const commandShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)$/u); if (method === "GET" && commandShowMatch) return await store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue; const commandResultMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)\/result$/u); diff --git a/src/mgr/store.ts b/src/mgr/store.ts index 61f2e60..cab4991 100644 --- a/src/mgr/store.ts +++ b/src/mgr/store.ts @@ -3,6 +3,7 @@ import { AgentRunError } from "../common/errors.js"; import { newId, nowIso, stableHash } from "../common/validation.js"; import { redactJson } from "../common/redaction.js"; import { backendCapabilities } from "../common/backend-profiles.js"; +import { normalizeRunEventPayload, requireEventType } from "../common/events.js"; export type MaybePromise = T | Promise; @@ -198,8 +199,10 @@ export class MemoryAgentRunStore implements AgentRunStore { appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): RunEvent { this.getRun(runId); + const eventType = requireEventType(type); + const eventPayload = normalizeRunEventPayload(eventType, payload); const events = this.eventsByRun.get(runId) ?? []; - const event: RunEvent = { id: newId("evt"), runId, seq: events.length + 1, type, payload: redactJson(payload), createdAt: nowIso() }; + const event: RunEvent = { id: newId("evt"), runId, seq: events.length + 1, type: eventType, payload: redactJson(eventPayload), createdAt: nowIso() }; events.push(event); this.eventsByRun.set(runId, events); return event; @@ -258,7 +261,10 @@ export class MemoryAgentRunStore implements AgentRunStore { private resolveSessionForRun(input: CreateRunInput, at: string): SessionRef | null { if (!input.sessionRef) return null; const existing = this.sessions.get(input.sessionRef.sessionId); - if (existing) return sessionRefFromRecord(existing, input.sessionRef); + if (existing) { + assertSessionBoundary(existing, input); + return sessionRefFromRecord(existing, input.sessionRef); + } const record: SessionRecord = { sessionId: input.sessionRef.sessionId, tenantId: input.tenantId, @@ -298,6 +304,15 @@ export class MemoryAgentRunStore implements AgentRunStore { } } +export function assertSessionBoundary(existing: SessionRecord, input: CreateRunInput): void { + if (existing.tenantId !== input.tenantId || existing.projectId !== input.projectId) { + throw new AgentRunError("tenant-policy-denied", "sessionRef cannot be reused across tenant or project boundary", { httpStatus: 403, details: { sessionId: existing.sessionId, valuesPrinted: false } }); + } + if (existing.backendProfile !== input.backendProfile) { + throw new AgentRunError("schema-invalid", "sessionRef cannot be reused across backendProfile boundary", { httpStatus: 400, details: { sessionId: existing.sessionId, existingBackendProfile: existing.backendProfile, requestedBackendProfile: input.backendProfile, valuesPrinted: false } }); + } +} + export function statusFromTerminal(terminalStatus: TerminalStatus): RunRecord["status"] { if (terminalStatus === "completed") return "completed"; if (terminalStatus === "cancelled") return "cancelled"; diff --git a/src/runner/run-once.ts b/src/runner/run-once.ts index 78c2552..1950fe8 100644 --- a/src/runner/run-once.ts +++ b/src/runner/run-once.ts @@ -71,10 +71,20 @@ export async function runOnce(options: RunnerOnceOptions): Promise { } await assertNotCancelled(api, options.runId, command.id); const result = await runBackendTurn(claimed, command, { ...options, ...(workspacePath ? { workspacePath } : {}), abortSignal: abortController.signal }); - for (const event of result.events) await api.appendEvent(options.runId, event); + for (const event of result.events) { + if (event.type !== "terminal_status") await api.appendEvent(options.runId, event); + } await api.reportCommandStatus(command.id, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage }); const finalRun = await api.reportStatus(options.runId, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage, ...(result.threadId ? { threadId: result.threadId } : {}), ...(result.turnId ? { turnId: result.turnId } : {}) }) as RunRecord; return { runner, commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind, run: finalRun } as JsonRecord; + } catch (error) { + const failureKind = failureKindFromError(error); + const terminalStatus = terminalStatusForFailure(failureKind); + const message = errorMessage(error); + await api.appendEvent(options.runId, { type: "error", payload: { failureKind, message, phase: "runner:execute", attemptId, runnerId: runner.id } }); + await api.reportCommandStatus(command.id, { terminalStatus, failureKind, failureMessage: message }); + const finalRun = await api.reportStatus(options.runId, { terminalStatus, failureKind, failureMessage: message }) as RunRecord; + return { runner, commandId: command.id, terminalStatus, failureKind, run: finalRun } as JsonRecord; } finally { stopCancelWatch(); } @@ -109,7 +119,6 @@ function watchCancellation(api: RunnerManagerApi, runId: string, commandId: stri } async function reportCancelled(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, claimed: RunRecord, message: string): Promise { - await api.appendEvent(runId, { type: "terminal_status", payload: { terminalStatus: "cancelled", failureKind: "cancelled", message } }); await api.reportCommandStatus(commandId, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message }); const finalRun = await api.reportStatus(runId, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message }); return { runner, commandId, claimed, terminalStatus: "cancelled", failureKind: "cancelled", run: finalRun }; diff --git a/src/selftest/cases/30-codex-stdio.ts b/src/selftest/cases/30-codex-stdio.ts index 0dbbf2d..280b0a1 100644 --- a/src/selftest/cases/30-codex-stdio.ts +++ b/src/selftest/cases/30-codex-stdio.ts @@ -58,14 +58,17 @@ const selfTest: SelfTestCase = async (context) => { assert.equal(explicitModelResult.terminalStatus, "completed", "explicit command payload model should still be forwarded"); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "missing-turn-result", expectedStatus: "failed", expectedFailureKind: "backend-response-invalid" }); + await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-401-rpc-error", expectedStatus: "failed", expectedFailureKind: "provider-auth-failed" }); + await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-429-terminal", expectedStatus: "failed", expectedFailureKind: "provider-rate-limited" }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-503-rpc-error", expectedStatus: "failed", expectedFailureKind: "provider-unavailable" }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-503-terminal", expectedStatus: "failed", expectedFailureKind: "provider-unavailable" }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-503-retry-event", expectedStatus: "failed", expectedFailureKind: "provider-unavailable", expectRetryError: true }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "invalid-json", expectedStatus: "failed", expectedFailureKind: "backend-json-parse-error" }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "missing-terminal", expectedStatus: "failed", expectedFailureKind: "backend-timeout", timeoutMs: 500 }); + await runSecretFailureCase({ client, managerUrl: server.baseUrl, context }); await runSpawnFailureCase({ client, managerUrl: server.baseUrl, context }); - return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-missing-turn-result", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-spawn-failure"] }; + return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] }; } finally { await new Promise((resolve) => server.server.close(() => resolve())); } @@ -101,6 +104,23 @@ function eventPayload(event: { payload: unknown }): JsonRecord { return typeof event.payload === "object" && event.payload !== null && !Array.isArray(event.payload) ? event.payload as JsonRecord : {}; } +async function runSecretFailureCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise { + const item = await createRunWithCommand(options.client, options.context, "failure missing secret files", "selftest-secret-unavailable", 3_000); + const result = await runOnce({ + managerUrl: options.managerUrl, + runId: item.runId, + codexCommand: options.context.fakeCodexCommand, + codexArgs: options.context.fakeCodexArgs, + codexHome: path.join(options.context.tmp, "missing-codex-home"), + env: { CODEX_HOME: path.join(options.context.tmp, "missing-codex-home") }, + }) as JsonRecord; + assert.equal(result.terminalStatus, "blocked", "secret unavailable"); + assert.equal(result.failureKind, "secret-unavailable", "secret unavailable"); + const run = await options.client.get(`/api/v1/runs/${item.runId}`) as { status?: string; failureKind?: string }; + assert.equal(run.status, "blocked", "secret unavailable"); + assert.equal(run.failureKind, "secret-unavailable", "secret unavailable"); +} + async function runSpawnFailureCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise { const item = await createRunWithCommand(options.client, options.context, "failure spawn", "selftest-spawn-failure", 3_000); const result = await runOnce({ diff --git a/src/selftest/cases/60-hwlab-baseline-contract.ts b/src/selftest/cases/60-hwlab-baseline-contract.ts new file mode 100644 index 0000000..c813f2a --- /dev/null +++ b/src/selftest/cases/60-hwlab-baseline-contract.ts @@ -0,0 +1,176 @@ +import assert from "node:assert/strict"; +import { execFile as execFileCallback } from "node:child_process"; +import { promisify } from "node:util"; +import { chmod, mkdir, readFile, writeFile } from "node:fs/promises"; +import path from "node:path"; +import { startManagerServer } from "../../mgr/server.js"; +import { MemoryAgentRunStore } from "../../mgr/store.js"; +import { ManagerClient } from "../../mgr/client.js"; +import { runOnce } from "../../runner/run-once.js"; +import { eventContractSummary } from "../../common/events.js"; +import type { BackendProfile, JsonRecord, RunEvent } from "../../common/types.js"; +import { assertNoSecretLeak, createRunWithCommand, type SelfTestCase, type SelfTestContext } from "../harness.js"; + +const execFile = promisify(execFileCallback); + +const selfTest: SelfTestCase = async (context) => { + const fakeKubectl = path.join(context.tmp, "fake-kubectl-contract.js"); + const createdManifest = path.join(context.tmp, "created-contract-runner-job.json"); + await writeFile(fakeKubectl, `#!/usr/bin/env bun +const chunks = []; +for await (const chunk of Bun.stdin.stream()) chunks.push(chunk); +const text = Buffer.concat(chunks.map((chunk) => Buffer.from(chunk))).toString("utf8"); +await Bun.write(${JSON.stringify(createdManifest)}, text); +const manifest = JSON.parse(text); +console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kind, metadata: { uid: "job-uid-contract", resourceVersion: "1", name: manifest.metadata.name, namespace: manifest.metadata.namespace } })); +`); + await chmod(fakeKubectl, 0o755); + const store = new MemoryAgentRunStore(); + const server = await startManagerServer({ + port: 0, + host: "127.0.0.1", + sourceCommit: "self-test", + store, + runnerJobDefaults: { + namespace: "agentrun-v01", + managerUrl: "http://agentrun-mgr.agentrun-v01.svc.cluster.local:8080", + image: "127.0.0.1:5000/agentrun/agentrun-mgr@sha256:1111111111111111111111111111111111111111111111111111111111111111", + kubectlCommand: fakeKubectl, + }, + }); + try { + const client = new ManagerClient(server.baseUrl); + await assertBackendPreflight(client); + await assertEventContractAndCompletedSemantics(client, context, server.baseUrl); + await assertRunnerJobStatus(client, context); + await assertSessionProfileIsolation(client, context); + await assertResourceBundleFailure(client, context, server.baseUrl); + return { name: "hwlab-baseline-contract", tests: ["event-contract", "result-completed-terminal-only", "bounded-output-summary", "runner-job-status", "backend-preflight-redacted", "session-profile-isolation", "resource-bundle-failure-kind"] }; + } finally { + await new Promise((resolve) => server.server.close(() => resolve())); + } +}; + +async function assertBackendPreflight(client: ManagerClient): Promise { + const response = await client.get("/api/v1/backends") as { items?: JsonRecord[] }; + const items = response.items ?? []; + assert.ok(items.length >= 2, "codex/deepseek backend capabilities should be visible"); + for (const item of items) { + const preflight = item.preflight as JsonRecord; + const defaultSecretRef = item.defaultSecretRef as JsonRecord; + assert.equal(preflight.valuesPrinted, false); + assert.equal(preflight.credentialValuesPrinted, false); + assert.equal(defaultSecretRef.valuesPrinted, false); + assert.ok(Array.isArray(preflight.requiredSecretKeys)); + } + assertNoSecretLeak(response); +} + +async function assertEventContractAndCompletedSemantics(client: ManagerClient, context: SelfTestContext, managerUrl: string): Promise { + const happy = await createRunWithCommand(client, context, "hello event contract", "selftest-event-contract", 15_000); + await client.post(`/api/v1/runs/${happy.runId}/events`, { type: "tool_call", payload: { method: "selftest/tool", item: { command: "echo ok" } } }); + await client.post(`/api/v1/runs/${happy.runId}/events`, { type: "diff", payload: { filesChanged: 1, summary: "selftest diff" } }); + const result = await runOnce({ managerUrl, runId: happy.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome } }); + assert.equal(result.terminalStatus, "completed"); + const eventsResponse = await client.get(`/api/v1/runs/${happy.runId}/events?afterSeq=0&limit=100`) as { items?: RunEvent[] }; + const events = eventsResponse.items ?? []; + const summary = eventContractSummary(events); + assert.equal(summary.ok, true, JSON.stringify(summary.issues)); + const types = new Set(events.map((event) => event.type)); + for (const type of ["backend_status", "assistant_message", "tool_call", "command_output", "diff", "terminal_status"]) assert.ok(types.has(type as never), `missing event type ${type}`); + const envelope = await client.get(`/api/v1/runs/${happy.runId}/commands/${happy.commandId}/result`) as JsonRecord; + assert.equal(envelope.completed, true); + assert.equal(envelope.terminalStatus, "completed"); + assert.equal(envelope.terminalSource, "terminal_status-event"); + assertNoSecretLeak({ eventsResponse, envelope }); + + const partial = await createRunWithCommand(client, context, "partial should not complete", "selftest-partial-not-completed", 15_000); + const largeOutput = `${"x".repeat(6_000)}\nAuthorization: Bearer test-token\n`; + await client.post(`/api/v1/runs/${partial.runId}/events`, { type: "assistant_message", payload: { text: "partial assistant only" } }); + await client.post(`/api/v1/runs/${partial.runId}/events`, { type: "command_output", payload: { stream: "stdout", text: largeOutput } }); + const partialEnvelope = await client.get(`/api/v1/runs/${partial.runId}/commands/${partial.commandId}/result`) as JsonRecord; + const artifactSummary = partialEnvelope.artifactSummary as JsonRecord; + const stdoutSummary = artifactSummary.stdoutSummary as JsonRecord; + assert.equal(partialEnvelope.completed, false); + assert.equal(partialEnvelope.terminalStatus, null); + assert.equal(stdoutSummary.outputTruncated, true); + assert.equal(typeof stdoutSummary.outputBytes, "number"); + assertNoSecretLeak(partialEnvelope); +} + +async function assertRunnerJobStatus(client: ManagerClient, context: SelfTestContext): Promise { + const item = await createRunWithCommand(client, context, "runner job status", "selftest-runner-job-status", 15_000); + await client.post(`/api/v1/runs/${item.runId}/runner-jobs`, { commandId: item.commandId, idempotencyKey: "selftest-runner-job-status" }) as JsonRecord; + const manifest = JSON.parse(await readFile(path.join(context.tmp, "created-contract-runner-job.json"), "utf8")) as JsonRecord; + assert.equal(manifest.kind, "Job"); + const list = await client.get(`/api/v1/runs/${item.runId}/runner-jobs`) as { items?: JsonRecord[]; count?: number }; + assert.equal(list.count, 1); + const status = list.items?.[0] ?? {}; + assert.equal(status.runId, item.runId); + assert.equal(status.commandId, item.commandId); + assert.equal(status.phase, "created"); + assert.equal(status.valuesPrinted, false); + assert.equal(typeof status.logPath, "string"); + const single = await client.get(`/api/v1/runs/${item.runId}/runner-jobs/${String(status.id)}`) as JsonRecord; + assert.equal(single.jobName, status.jobName); + assertNoSecretLeak({ list, single }); +} + +async function assertSessionProfileIsolation(client: ManagerClient, context: SelfTestContext): Promise { + const first = await client.post("/api/v1/runs", runPayload(context, "codex", "selftest-profile-boundary-session")) as { id: string }; + await client.patch(`/api/v1/runs/${first.id}/status`, { terminalStatus: "completed", failureKind: null, failureMessage: null, threadId: "thread_codex_profile_boundary", turnId: "turn_profile_boundary" }); + await assert.rejects( + () => client.post("/api/v1/runs", runPayload(context, "deepseek", "selftest-profile-boundary-session")), + (error) => error instanceof Error && error.message.includes("backendProfile boundary"), + ); +} + +async function assertResourceBundleFailure(client: ManagerClient, context: SelfTestContext, managerUrl: string): Promise { + const repo = await createLocalGitRepo(context); + const run = await client.post("/api/v1/runs", { + ...runPayload(context, "codex", "selftest-bad-bundle-session"), + resourceBundleRef: { kind: "git", repoUrl: repo.repoUrl, commitId: "0000000000000000000000000000000000000000", submodules: false, lfs: false }, + }) as { id: string }; + const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "bad bundle" }, idempotencyKey: "selftest-bad-bundle" }) as { id: string }; + const result = await runOnce({ managerUrl, runId: run.id, commandId: command.id, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "bad-bundle-workspaces") } }) as JsonRecord; + assert.equal(result.terminalStatus, "failed"); + assert.equal(result.failureKind, "infra-failed"); + const envelope = await client.get(`/api/v1/runs/${run.id}/commands/${command.id}/result`) as JsonRecord; + assert.equal(envelope.completed, false); + assert.equal(envelope.failureKind, "infra-failed"); + const commandRecord = await client.get(`/api/v1/runs/${run.id}/commands/${command.id}`) as { state?: string }; + assert.equal(commandRecord.state, "failed"); +} + +function runPayload(context: SelfTestContext, backendProfile: BackendProfile, sessionId: string): JsonRecord { + const secretHome = backendProfile === "deepseek" ? context.deepseekHome : context.codexHome; + return { + tenantId: "hwlab", + projectId: "pikasTech/HWLAB", + workspaceRef: { kind: "opaque", repo: "pikasTech/HWLAB" }, + sessionRef: { sessionId, conversationId: sessionId }, + providerId: "G14", + backendProfile, + executionPolicy: { + sandbox: "workspace-write", + approval: "never", + timeoutMs: 15_000, + network: "default", + secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: backendProfile, secretRef: { name: `agentrun-v01-provider-${backendProfile}`, keys: ["auth.json", "config.toml"], mountPath: secretHome } }] }, + }, + traceSink: { kind: "hwlab", traceId: sessionId }, + }; +} + +async function createLocalGitRepo(context: SelfTestContext): Promise<{ repoUrl: string; commitId: string }> { + const repo = path.join(context.tmp, "contract-bundle-repo"); + await mkdir(repo, { recursive: true }); + await execFile("git", ["init"], { cwd: repo }); + await writeFile(path.join(repo, "README.md"), "AgentRun contract bundle self-test\n", "utf8"); + await execFile("git", ["add", "README.md"], { cwd: repo }); + await execFile("git", ["-c", "user.email=selftest@example.invalid", "-c", "user.name=AgentRun SelfTest", "commit", "-m", "contract bundle selftest"], { cwd: repo }); + const { stdout } = await execFile("git", ["rev-parse", "HEAD"], { cwd: repo }); + return { repoUrl: repo, commitId: stdout.trim() }; +} + +export default selfTest; diff --git a/src/selftest/fake-codex-app-server.ts b/src/selftest/fake-codex-app-server.ts index e5ef69d..985079e 100644 --- a/src/selftest/fake-codex-app-server.ts +++ b/src/selftest/fake-codex-app-server.ts @@ -66,6 +66,10 @@ for await (const line of rl) { respond(message.id, null, { code: -32000, message: "responseStreamDisconnected: HTTP 503 Service Unavailable from provider" }); continue; } + if (mode === "provider-401-rpc-error") { + respond(message.id, null, { code: -32000, message: "HTTP 401 Unauthorized: invalid api key" }); + continue; + } if (mode === "missing-terminal") { turnCounter += 1; const turn = { id: `turn_selftest_${turnCounter}`, status: "running" }; @@ -81,6 +85,14 @@ for await (const line of rl) { respond(message.id, { turn }); continue; } + if (mode === "provider-429-terminal") { + turnCounter += 1; + const turn = { id: `turn_selftest_${turnCounter}`, status: "failed", error: { message: "HTTP 429 Too Many Requests: rate limit exceeded" } }; + notify("turn/started", { turn: { id: turn.id, status: "running" } }); + notify("turn/completed", { turn }); + respond(message.id, { turn }); + continue; + } if (mode === "provider-503-retry-event") { turnCounter += 1; const turn = {