From d90e01a91cea8f297105595e9be016f84f6898df Mon Sep 17 00:00:00 2001 From: Codex Date: Tue, 2 Jun 2026 10:04:36 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E8=BF=90=E8=A1=8C?= =?UTF-8?q?=E4=B8=AD=20steer=20command?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/reference/cli.md | 3 +- docs/reference/spec-v01-agentrun-mgr.md | 2 +- docs/reference/spec-v01-agentrun-runner.md | 1 + docs/reference/spec-v01-backend-codex.md | 2 +- docs/reference/spec-v01-cli.md | 2 +- .../spec-v01-hwlab-manual-dispatch.md | 2 +- scripts/src/cli.ts | 2 +- src/backend/adapter.ts | 9 ++ src/backend/codex-stdio.ts | 23 +++++ src/common/types.ts | 4 +- src/common/validation.ts | 11 ++- src/runner/manager-api.ts | 13 ++- src/runner/run-once.ts | 84 ++++++++++++++++++- .../cases/50-hwlab-manual-dispatch.ts | 19 ++++- src/selftest/fake-codex-app-server.ts | 42 ++++++++++ 15 files changed, 206 insertions(+), 13 deletions(-) diff --git a/docs/reference/cli.md b/docs/reference/cli.md index b36034a..7c5c3f2 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -27,7 +27,7 @@ CLI 默认输出 JSON。空 stdout 是失败,不是成功。每个命令都必 ./scripts/agentrun runs events --after-seq --limit ./scripts/agentrun runs result [--command-id ] ./scripts/agentrun runs cancel [--reason ] -./scripts/agentrun commands create --type turn --json-file +./scripts/agentrun commands create --type turn|steer|interrupt --json-file ./scripts/agentrun commands show --run-id ./scripts/agentrun commands result --run-id ./scripts/agentrun commands cancel [--reason ] @@ -43,6 +43,7 @@ CLI 默认输出 JSON。空 stdout 是失败,不是成功。每个命令都必 行为必须保持以下规则: - `runs create` 创建 durable facts 并立即返回。 +- `commands create` 创建 durable command;`turn` 启动一轮对话,`steer` 只在同 run 有 active turn 时由 runner 转发到 Codex `turn/steer`,迟到 steer 返回结构化 blocked,不终结 run。 - `runner start` 启动本地进程或 Kubernetes Job,并返回 process/job identity、log path 和 poll commands。 - `events` 默认分页且有界。 - `server logs` 返回有界日志,并指向完整日志文件。 diff 
--git a/docs/reference/spec-v01-agentrun-mgr.md b/docs/reference/spec-v01-agentrun-mgr.md index 41958f5..4257cb3 100644 --- a/docs/reference/spec-v01-agentrun-mgr.md +++ b/docs/reference/spec-v01-agentrun-mgr.md @@ -90,7 +90,7 @@ Manager 只承接 HWLAB v0.2 Code Agent 的通用执行事实,不承接 HWLAB | `executionPolicy` | 必填或由 manager 显式补齐默认值,至少包含 sandbox、approval、timeout、network 和 secretScope。 | | `traceSink` | 字段必须存在;可以为 `null` 或显式 sink。 | -`POST /api/v1/runs/:runId/commands` 必须支持 idempotency key。相同 key 且 payload hash 相同应返回既有 command;相同 key 但 payload hash 不同必须结构化失败。 +`POST /api/v1/runs/:runId/commands` 必须支持 idempotency key。相同 key 且 payload hash 相同应返回既有 command;相同 key 但 payload hash 不同必须结构化失败。`type=turn` 是普通对话 command;`type=steer` 是面向同 run active turn 的运行中引导 command,payload 必须包含非空 `prompt`、`message` 或 `text`,普通 runner poll 不得把它当作新 turn 执行;`type=interrupt` 只保留 durable command 语义,业务 cancel 仍以 run/command cancel API 为权威。 ## Tenant Policy Boundary diff --git a/docs/reference/spec-v01-agentrun-runner.md b/docs/reference/spec-v01-agentrun-runner.md index f0ac99e..368f281 100644 --- a/docs/reference/spec-v01-agentrun-runner.md +++ b/docs/reference/spec-v01-agentrun-runner.md @@ -64,6 +64,7 @@ claimed -> lease_lost - runner 必须先 register,再 claim run;claim 失败不能继续调用 backend。 - lease heartbeat 必须可观察;过期或冲突时写入 failure event 或明确退出原因。 - command 只能从 manager poll;不得从本地文件或临时参数伪造正式 command。 +- runner 的普通 poll 只选择 pending `turn`;当 backend adapter 暴露 active turn control 后,runner 才在同 run 内轮询 pending `steer` command,ack 后调用 backend 的 steer 能力并单独终结该 steer command。active turn 结束后到达的 steer 必须结构化 blocked,不得启动新 turn,也不得把 run 标为 terminal。 - backend 产生的所有可见输出必须先经过 adapter normalization 和 redaction,再 append 到 manager;backend_status 至少包含 redacted profile/backendKind/protocol 摘要。 - 单个 command terminal 上报后 runner 不应立即退出,而应继续 poll 同一 run 的 pending command,直到 idle timeout、lease 冲突或 run terminal。退出码与 runner loop 终态必须一致或在日志中可解释。 diff --git a/docs/reference/spec-v01-backend-codex.md 
b/docs/reference/spec-v01-backend-codex.md index fed765a..4647e5e 100644 --- a/docs/reference/spec-v01-backend-codex.md +++ b/docs/reference/spec-v01-backend-codex.md @@ -18,7 +18,7 @@ Codex stdio backend 是 AgentRun `v0.1` 的第一真实 Code Agent backend kind codex app-server --listen stdio:// ``` -Adapter 通过 stdin 写入换行分隔 JSON-RPC 请求,通过 stdout 逐行读取 JSON-RPC response 和 notification,stderr 只作为有界诊断日志。最小请求序列是 `initialize`、`thread/start` 或 `thread/resume`、`turn/start`;response 中必须提取 thread/turn identity,notification 和后续输出必须归一化为 `backend_status`、`assistant_message`、`tool_call`、`command_output`、`error` 和 `terminal_status` events。 +Adapter 通过 stdin 写入换行分隔 JSON-RPC 请求,通过 stdout 逐行读取 JSON-RPC response 和 notification,stderr 只作为有界诊断日志。最小请求序列是 `initialize`、`thread/start` 或 `thread/resume`、`turn/start`;response 中必须提取 thread/turn identity,notification 和后续输出必须归一化为 `backend_status`、`assistant_message`、`tool_call`、`command_output`、`error` 和 `terminal_status` events。运行中 steer 使用同一 app-server 进程的 `turn/steer` JSON-RPC 方法,参数为 `threadId`、`expectedTurnId` 和文本 `input` 数组;取消/中断使用 `turn/interrupt`,参数为 `threadId` 和 `turnId`。 不得把以下路径作为 `v0.1` Codex stdio backend 的正式实现或综合联调通过证据:直接 Responses HTTP 代理、OpenAI SDK wrapper、`codex exec` 一次性命令输出、fake provider、固定文本回复、只读 shortcut 或本地 shell 模拟。裸 HTTP 或 `codex exec --json` 可以作为 provider/upstream 诊断,但最终通过必须来自 app-server stdio turn。 diff --git a/docs/reference/spec-v01-cli.md b/docs/reference/spec-v01-cli.md index 7ee2bb0..b168e53 100644 --- a/docs/reference/spec-v01-cli.md +++ b/docs/reference/spec-v01-cli.md @@ -39,7 +39,7 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交 ./scripts/agentrun runs events --after-seq --limit ./scripts/agentrun runs result [--command-id ] ./scripts/agentrun runs cancel [--reason ] -./scripts/agentrun commands create --type turn --json-file +./scripts/agentrun commands create --type turn|steer|interrupt --json-file ./scripts/agentrun commands show --run-id ./scripts/agentrun commands result --run-id ./scripts/agentrun 
commands cancel [--reason ] diff --git a/docs/reference/spec-v01-hwlab-manual-dispatch.md b/docs/reference/spec-v01-hwlab-manual-dispatch.md index ad9defa..aae649f 100644 --- a/docs/reference/spec-v01-hwlab-manual-dispatch.md +++ b/docs/reference/spec-v01-hwlab-manual-dispatch.md @@ -87,7 +87,7 @@ HWLAB canary 创建 run 时应使用以下字段口径: | `executionPolicy` | sandbox、network、timeout、secretScope 必须显式,不得由 HWLAB 扩大 AgentRun Secret 范围。 | | `traceSink` | 可指向 HWLAB trace adapter;为 `null` 时 HWLAB 仍可通过 AgentRun events 轮询。 | -Command 第一阶段只要求 `type=turn`。用户原始 prompt、conversation metadata、profile 选择和 HWLAB trace correlation 必须作为 command payload 的非敏感字段保存;不得把 cookie、session token、provider credential、device internal token 或 Secret value 写入 payload。 +Command 第一阶段要求 `type=turn` 和 `type=steer`。`turn` 保存用户原始 prompt、conversation metadata、profile 选择和 HWLAB trace correlation;`steer` 保存运行中引导文本,并由 runner 在同 run active turn 期间转发到 backend。业务 cancel 仍走 run/command cancel API,不用 `steer` 伪装。不得把 cookie、session token、provider credential、device internal token 或 Secret value 写入 payload。 ## 需要补齐的能力 diff --git a/scripts/src/cli.ts b/scripts/src/cli.ts index d5ec7bd..e78f678 100644 --- a/scripts/src/cli.ts +++ b/scripts/src/cli.ts @@ -453,7 +453,7 @@ function help(): JsonRecord { "runs events --after-seq --limit ", "runs result [--command-id ]", "runs cancel [--reason ]", - "commands create --type turn --json-file ", + "commands create --type turn|steer|interrupt --json-file ", "commands show --run-id ", "commands result --run-id ", "commands cancel [--reason ]", diff --git a/src/backend/adapter.ts b/src/backend/adapter.ts index 107bc36..943d063 100644 --- a/src/backend/adapter.ts +++ b/src/backend/adapter.ts @@ -2,6 +2,13 @@ import type { BackendEvent, BackendTurnResult, CommandRecord, RunRecord } from " import { CodexStdioBackendSession, runCodexStdioTurn, type CodexStdioTurnOptions } from "./codex-stdio.js"; import { backendProfileSpec } from "../common/backend-profiles.js"; +export interface 
BackendActiveTurnControl { + threadId: string; + turnId: string; + steer(prompt: string): Promise; + interrupt(): Promise; +} + export interface BackendAdapterOptions { codexCommand?: string; codexArgs?: string[]; @@ -9,6 +16,7 @@ export interface BackendAdapterOptions { workspacePath?: string; abortSignal?: AbortSignal; onEvent?: (event: BackendEvent) => void | Promise; + onActiveTurn?: (control: BackendActiveTurnControl) => void | (() => void); env?: NodeJS.ProcessEnv; } @@ -54,5 +62,6 @@ export function backendTurnOptions(run: RunRecord, command: CommandRecord, optio if (options.codexHome) turnOptions.codexHome = options.codexHome; if (options.abortSignal) turnOptions.abortSignal = options.abortSignal; if (options.onEvent) turnOptions.onEvent = options.onEvent; + if (options.onActiveTurn) turnOptions.onActiveTurn = options.onActiveTurn; return turnOptions; } diff --git a/src/backend/codex-stdio.ts b/src/backend/codex-stdio.ts index 7bdb053..80227a7 100644 --- a/src/backend/codex-stdio.ts +++ b/src/backend/codex-stdio.ts @@ -48,6 +48,14 @@ export interface CodexStdioTurnOptions { codexHome?: string; abortSignal?: AbortSignal; onEvent?: (event: BackendEvent) => void | Promise; + onActiveTurn?: (control: CodexActiveTurnControl) => void | (() => void); +} + +export interface CodexActiveTurnControl { + threadId: string; + turnId: string; + steer(prompt: string): Promise; + interrupt(): Promise; } interface PendingRequest { @@ -396,6 +404,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess const terminalPromise = new Promise((resolve) => { terminalResolve = resolve; }); let client: CodexStdioClient | null = null; const requestTimeoutMs = Math.min(positiveTimeout(options.timeoutMs), requestTimeoutCapMs); + let stopActiveTurn: (() => void) | undefined; const abortTurn = (): void => { if (terminal) return; terminal = { status: "cancelled", failureKind: "cancelled", message: "cancel requested" }; @@ -463,6 +472,19 @@ async function 
runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess const turnResponse = requireResponseRecord(await client.request("turn/start", withOptionalModel({ threadId, input: textInput(options.prompt), cwd: options.cwd, approvalPolicy: options.approvalPolicy }, options.model), requestTimeoutMs), "turn/start"); turnId = requireNestedId(turnResponse, "turn/start", "turn"); emitEvent({ type: "backend_status", payload: { phase: "turn/start:completed", turnId } }); + if (threadId && turnId && options.onActiveTurn) { + const maybeStop = options.onActiveTurn({ + threadId, + turnId, + steer: async (prompt: string) => { + await client!.request("turn/steer", { threadId: threadId!, expectedTurnId: turnId!, input: textInput(prompt) }, requestTimeoutMs); + }, + interrupt: async () => { + await client!.request("turn/interrupt", { threadId: threadId!, turnId: turnId! }, requestTimeoutMs); + }, + }); + if (typeof maybeStop === "function") stopActiveTurn = maybeStop; + } const race = await Promise.race([ terminalPromise.then(() => ({ kind: "terminal" as const })), @@ -480,6 +502,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess emitEvent({ type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase: failure.phase, details: failure.details } }); } } finally { + stopActiveTurn?.(); stopNotifications(); options.abortSignal?.removeEventListener("abort", abortTurn); clearTimeout(timeout); diff --git a/src/common/types.ts b/src/common/types.ts index 92fd6f6..4ea38a1 100644 --- a/src/common/types.ts +++ b/src/common/types.ts @@ -113,8 +113,10 @@ export interface RunRecord extends CreateRunInput { leaseExpiresAt: string | null; } +export type CommandType = "turn" | "steer" | "interrupt"; + export interface CreateCommandInput extends JsonRecord { - type: "turn" | "interrupt"; + type: CommandType; payload: JsonRecord; idempotencyKey?: string; } diff --git a/src/common/validation.ts b/src/common/validation.ts index 
995fbb7..fd915b4 100644 --- a/src/common/validation.ts +++ b/src/common/validation.ts @@ -215,12 +215,21 @@ function validateBackendSecretScope(backendProfile: BackendProfile, executionPol export function validateCreateCommand(input: unknown): CreateCommandInput { const record = asRecord(input, "command"); const type = requiredString(record, "type"); - if (type !== "turn" && type !== "interrupt") throw new AgentRunError("schema-invalid", `command type ${type} is not supported`, { httpStatus: 400 }); + if (type !== "turn" && type !== "steer" && type !== "interrupt") throw new AgentRunError("schema-invalid", `command type ${type} is not supported`, { httpStatus: 400 }); const payload = asRecord(record.payload ?? {}, "payload"); + if (type === "steer" && !steerPrompt(payload)) throw new AgentRunError("schema-invalid", "steer command payload requires a non-empty prompt, message, or text", { httpStatus: 400 }); const idempotencyKey = typeof record.idempotencyKey === "string" && record.idempotencyKey.trim().length > 0 ? record.idempotencyKey.trim() : undefined; return { type, payload, ...(idempotencyKey ? 
{ idempotencyKey } : {}) }; } +function steerPrompt(payload: JsonRecord): string | null { + for (const key of ["prompt", "message", "text"]) { + const value = payload[key]; + if (typeof value === "string" && value.trim().length > 0) return value.trim(); + } + return null; +} + export function validateCreateQueueTask(input: unknown): CreateQueueTaskInput { const record = asRecord(input, "queueTask"); const tenantId = requiredString(record, "tenantId"); diff --git a/src/runner/manager-api.ts b/src/runner/manager-api.ts index 6ee9a19..4149fa5 100644 --- a/src/runner/manager-api.ts +++ b/src/runner/manager-api.ts @@ -64,12 +64,19 @@ export class RunnerManagerApi { } async pollCommands(runId: string, options: { afterSeq?: number; limit?: number; commandId?: string }): Promise { + const listOptions: { afterSeq?: number; limit?: number } = {}; + if (options.afterSeq !== undefined) listOptions.afterSeq = options.afterSeq; + if (options.limit !== undefined) listOptions.limit = options.limit; + const items = await this.listCommands(runId, listOptions); + const selected = options.commandId ? items.find((item) => item.id === options.commandId && item.state === "pending" && item.type === "turn") ?? null : items.find((item) => item.state === "pending" && item.type === "turn") ?? null; + return { items, selected }; + } + + async listCommands(runId: string, options: { afterSeq?: number; limit?: number } = {}): Promise { const afterSeq = options.afterSeq ?? 0; const limit = options.limit ?? 20; const response = await this.client.get(`/api/v1/runs/${encodeURIComponent(runId)}/commands?afterSeq=${afterSeq}&limit=${limit}`) as { items?: CommandRecord[] }; - const items = Array.isArray(response.items) ? response.items : []; - const selected = options.commandId ? items.find((item) => item.id === options.commandId && item.state === "pending" && item.type === "turn") ?? null : items.find((item) => item.state === "pending" && item.type === "turn") ?? 
null; - return { items, selected }; + return Array.isArray(response.items) ? response.items : []; } async ackCommand(commandId: string): Promise { diff --git a/src/runner/run-once.ts b/src/runner/run-once.ts index 63d0ae9..eabf4e1 100644 --- a/src/runner/run-once.ts +++ b/src/runner/run-once.ts @@ -1,5 +1,5 @@ import { RunnerManagerApi, failureKindFromError, terminalStatusForFailure, errorMessage } from "./manager-api.js"; -import { createBackendSession, runBackendTurn, type BackendAdapterOptions, type BackendSession } from "../backend/adapter.js"; +import { createBackendSession, runBackendTurn, type BackendActiveTurnControl, type BackendAdapterOptions, type BackendSession } from "../backend/adapter.js"; import { materializeResourceBundle } from "./resource-bundle.js"; import type { BackendEvent, BackendProfile, CommandRecord, FailureKind, JsonRecord, RunRecord, RunnerRecord, TerminalStatus } from "../common/types.js"; import { AgentRunError } from "../common/errors.js"; @@ -81,6 +81,7 @@ export async function runOnce(options: RunnerOnceOptions): Promise { firstPoll = false; const command = commandsResponse.selected; if (!command) { + await failPendingSteerWithoutActiveTurn(api, options.runId, commandsResponse.items, runner.id, attemptId); if (options.oneShot === true) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "one-shot-no-pending"); if (Date.now() - idleSince >= idleTimeoutMs) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "idle-timeout"); await sleep(pollIntervalMs); @@ -153,6 +154,7 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, const abortController = new AbortController(); const stopCancelWatch = watchCancellation(api, options.runId, command.id, abortController); const stopBackendProgress = startBackendProgress(api, options.runId, command.id, attemptId, runner.id, options.backendProfile ?? 
null); + let stopSteerWatch: (() => void) | undefined; try { const latestRun = await api.getRun(options.runId); const backendOptions = { @@ -162,6 +164,13 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, onEvent: async (event: BackendEvent) => { await appendBestEffort(api, options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id)); }, + onActiveTurn: (control: BackendActiveTurnControl) => { + stopSteerWatch = startSteerWatch(api, options.runId, command.id, attemptId, runner.id, control, options.pollIntervalMs); + return () => { + stopSteerWatch?.(); + stopSteerWatch = undefined; + }; + }, }; const result = backendSession ? await backendSession.runTurn(latestRun, command, backendOptions) : await runBackendTurn(latestRun, command, backendOptions); for (const event of result.events) { @@ -174,12 +183,85 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, const failure = { failureKind, terminalStatus: terminalStatusForFailure(failureKind), message: errorMessage(error) }; return await reportCommandFailure(api, options.runId, command.id, runner, attemptId, failure, "runner:execute"); } finally { + stopSteerWatch?.(); stopBackendProgress(); await appendBestEffort(api, options.runId, { type: "backend_status", payload: { phase: "backend-turn-finished", commandId: command.id, attemptId, runnerId: runner.id } }); stopCancelWatch(); } } +function startSteerWatch(api: RunnerManagerApi, runId: string, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl, pollIntervalMs: number | undefined): () => void { + let stopped = false; + let polling = false; + const seen = new Set(); + const intervalMs = normalizePollIntervalMs(pollIntervalMs); + const poll = async (): Promise => { + if (stopped || polling) return; + polling = true; + try { + const commands = await api.listCommands(runId, { afterSeq: 0, limit: 100 }); + const pendingSteer = 
commands.filter((item) => item.type === "steer" && item.state === "pending" && !seen.has(item.id)); + for (const steerCommand of pendingSteer) { + await handleSteerCommand(api, runId, steerCommand, targetCommandId, attemptId, runnerId, control); + seen.add(steerCommand.id); + } + } catch { + // The active backend turn remains authoritative; a steer command is marked seen only after it was handled, so one whose handling threw is retried on the next poll while the manager still lists it as pending. + } finally { + polling = false; + } + }; + const timer = setInterval(() => { void poll(); }, intervalMs); + void poll(); + return () => { + stopped = true; + clearInterval(timer); + }; +} + +async function handleSteerCommand(api: RunnerManagerApi, runId: string, command: CommandRecord, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl): Promise { + const acked = await api.ackCommand(command.id); + if (acked.state === "cancelled") { + await api.reportCommandStatus(command.id, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: "steer command cancelled before delivery", threadId: control.threadId, turnId: control.turnId }); + return; + } + const prompt = steerPrompt(command.payload); + if (!prompt) { + await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: "blocked", failureKind: "schema-invalid", message: "steer command payload requires a non-empty prompt, message, or text" }, "runner:steer:payload", control); + return; + } + await appendBestEffort(api, runId, { type: "backend_status", payload: { phase: "steer-command-acknowledged", commandId: command.id, commandType: "steer", targetCommandId, attemptId, runnerId, threadId: control.threadId, turnId: control.turnId } }); + try { + await control.steer(prompt); + await appendBestEffort(api, runId, { type: "backend_status", payload: { phase: "turn/steer:completed", commandId: command.id, commandType: "steer", targetCommandId, attemptId, runnerId, threadId: control.threadId, turnId: control.turnId } }); + await 
api.reportCommandStatus(command.id, { terminalStatus: "completed", failureKind: null, failureMessage: null, threadId: control.threadId, turnId: control.turnId }); + } catch (error) { + const failureKind = failureKindFromError(error); + await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: terminalStatusForFailure(failureKind), failureKind, message: errorMessage(error) }, "runner:steer", control); + } +} + +async function failPendingSteerWithoutActiveTurn(api: RunnerManagerApi, runId: string, commands: CommandRecord[], runnerId: string, attemptId: string): Promise { + for (const command of commands.filter((item) => item.type === "steer" && item.state === "pending")) { + const acked = await api.ackCommand(command.id); + if (acked.state === "cancelled") continue; + await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: "blocked", failureKind: "schema-invalid", message: "steer command requires an active turn" }, "runner:steer:no-active-turn"); + } +} + +async function reportNonTerminalCommandFailure(api: RunnerManagerApi, runId: string, commandId: string, attemptId: string, runnerId: string, failure: { terminalStatus: TerminalStatus; failureKind: FailureKind; message: string }, phase: string, control?: BackendActiveTurnControl): Promise { + await appendBestEffort(api, runId, { type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase, commandId, attemptId, runnerId, ...(control ? { threadId: control.threadId, turnId: control.turnId } : {}) } }); + await api.reportCommandStatus(commandId, { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, failureMessage: failure.message, ...(control ? 
{ threadId: control.threadId, turnId: control.turnId } : {}) }); +} + +function steerPrompt(payload: JsonRecord): string | null { + for (const key of ["prompt", "message", "text"]) { + const value = payload[key]; + if (typeof value === "string" && value.trim().length > 0) return value.trim(); + } + return null; +} + function isTerminalRun(run: RunRecord): boolean { return run.status === "completed" || run.status === "failed" || run.status === "blocked" || run.status === "cancelled"; } diff --git a/src/selftest/cases/50-hwlab-manual-dispatch.ts b/src/selftest/cases/50-hwlab-manual-dispatch.ts index 2465674..07a0dbd 100644 --- a/src/selftest/cases/50-hwlab-manual-dispatch.ts +++ b/src/selftest/cases/50-hwlab-manual-dispatch.ts @@ -131,6 +131,23 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin assert.equal(secondEnvelope.terminalStatus, "completed"); assert.equal(secondEnvelope.reply, "fake codex stdio reply"); + const steerRun = await createHwlabRun(client, context, bundle, "hwlab-session-steer", "start a turn that waits for steer", "hwlab-command-steer-turn", 10_000); + const steerRunner = runOnce({ managerUrl: server.baseUrl, runId: steerRun.runId, commandId: steerRun.commandId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "steer-waits", AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-steer") }, oneShot: true, pollIntervalMs: 50 }); + await waitForCommandState(client, steerRun.runId, steerRun.commandId, "acknowledged"); + const steerCommand = await client.post(`/api/v1/runs/${steerRun.runId}/commands`, { type: "steer", payload: { prompt: "STEER_MARK_SELFTEST", traceId: "hwlab-command-steer" }, idempotencyKey: "hwlab-command-steer" }) as { id: string }; + await waitForCommandState(client, steerRun.runId, steerCommand.id, "completed"); + const steerRunnerResult = await steerRunner as JsonRecord; + 
assert.equal(steerRunnerResult.terminalStatus, "completed"); + const steerTurnEnvelope = await client.get(`/api/v1/runs/${steerRun.runId}/commands/${steerRun.commandId}/result`) as JsonRecord; + assert.equal(steerTurnEnvelope.terminalStatus, "completed"); + assert.match(String(steerTurnEnvelope.reply), /steered:STEER_MARK_SELFTEST/u); + const steerCommandEnvelope = await client.get(`/api/v1/runs/${steerRun.runId}/commands/${steerCommand.id}/result`) as JsonRecord; + assert.equal(steerCommandEnvelope.terminalStatus, "completed"); + const steerEventsResponse = await client.get(`/api/v1/runs/${steerRun.runId}/events?afterSeq=0&limit=200`) as { items?: Array<{ type?: string; payload?: JsonRecord }> }; + const steerEvents = steerEventsResponse.items ?? []; + assert.ok(steerEvents.some((event) => event.type === "backend_status" && event.payload?.phase === "steer-command-acknowledged" && event.payload?.commandId === steerCommand.id && event.payload?.targetCommandId === steerRun.commandId)); + assert.ok(steerEvents.some((event) => event.type === "backend_status" && event.payload?.phase === "turn/steer:completed" && event.payload?.commandId === steerCommand.id && event.payload?.targetCommandId === steerRun.commandId)); + const runningCancel = await createHwlabRun(client, context, bundle, "hwlab-session-cancel-running", "cancel running", "hwlab-command-cancel-running", 10_000); const running = runOnce({ managerUrl: server.baseUrl, runId: runningCancel.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "missing-terminal", AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-running-cancel") }, oneShot: true }); await waitForCommandState(client, runningCancel.runId, runningCancel.commandId, "acknowledged"); @@ -138,7 +155,7 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin const runningResult = await running; 
assert.equal(runningResult.terminalStatus, "cancelled"); - return { name: "hwlab-manual-dispatch", tests: ["runner-job-idempotency", "pending-cancel", "result-envelope", "session-ref-resume", "resource-bundle-materialization", "resource-bundle-tool-alias", "same-run-runner-multiturn", "running-cancel"] }; + return { name: "hwlab-manual-dispatch", tests: ["runner-job-idempotency", "pending-cancel", "result-envelope", "session-ref-resume", "resource-bundle-materialization", "resource-bundle-tool-alias", "same-run-runner-multiturn", "running-steer", "running-cancel"] }; } finally { await new Promise((resolve) => server.server.close(() => resolve())); } diff --git a/src/selftest/fake-codex-app-server.ts b/src/selftest/fake-codex-app-server.ts index e6d42f4..10bc5ad 100644 --- a/src/selftest/fake-codex-app-server.ts +++ b/src/selftest/fake-codex-app-server.ts @@ -7,6 +7,7 @@ if (process.env.AGENTRUN_FAKE_CODEX_START_FILE) appendFileSync(process.env.AGENT let threadCounter = 0; let turnCounter = 0; let observedThreadModel = false; +let activeSteerTurn: { id: string; completed: boolean; timer: NodeJS.Timeout | null } | null = null; for await (const line of rl) { const trimmed = String(line).trim(); @@ -148,6 +149,18 @@ for await (const line of rl) { }, 50); continue; } + if (mode === "steer-waits") { + turnCounter += 1; + const turn = { id: `turn_selftest_${turnCounter}`, status: "running" }; + notify("turn/started", { turn }); + activeSteerTurn = { + id: turn.id, + completed: false, + timer: setTimeout(() => completeActiveSteerTurn("timeout-no-steer"), 2_000), + }; + respond(message.id, { turn }); + continue; + } if (mode === "noisy-reasoning-events") { turnCounter += 1; const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" }; @@ -176,6 +189,18 @@ for await (const line of rl) { respond(message.id, { turn }); continue; } + if (message.method === "turn/steer") { + if (mode !== "steer-waits" || !activeSteerTurn || activeSteerTurn.completed) { // a fake turn that already emitted turn/completed no longer accepts steer + respond(message.id, null, { code: -32000, 
message: "no active fake turn for steer" }); + continue; + } + const text = steerText(message.params?.input); + notify("item/agentMessage/delta", { itemId: "msg_steer", delta: `steered:${text}` }); + notify("item/completed", { item: { id: "msg_steer", type: "agentMessage", text: `steered:${text}` } }); + respond(message.id, { accepted: true }); + setTimeout(() => completeActiveSteerTurn("steer-applied"), 20); + continue; + } respond(message.id, null, { code: -32601, message: `unsupported fake method ${message.method ?? "unknown"}` }); } @@ -187,3 +212,20 @@ function respond(id: number | undefined, result: unknown, error?: unknown): void function notify(method: string, params: unknown): void { process.stdout.write(`${JSON.stringify({ method, params })}\n`); } + +function completeActiveSteerTurn(reason: string): void { + if (!activeSteerTurn || activeSteerTurn.completed) return; + activeSteerTurn.completed = true; + if (activeSteerTurn.timer) clearTimeout(activeSteerTurn.timer); + const turn = { id: activeSteerTurn.id, status: "completed", reason }; + notify("turn/completed", { turn }); +} + +function steerText(input: unknown): string { + if (!Array.isArray(input)) return ""; + return input.flatMap((item) => { + if (typeof item !== "object" || item === null || Array.isArray(item)) return []; + const text = (item as Record).text; + return typeof text === "string" ? [text] : []; + }).join(""); +}