Merge pull request #67 from pikasTech/fix/v01-steer-685

feat: 支持运行中 steer command
This commit is contained in:
Lyon
2026-06-02 10:05:39 +08:00
committed by GitHub
15 changed files with 206 additions and 13 deletions
+2 -1
View File
@@ -27,7 +27,7 @@ CLI 默认输出 JSON。空 stdout 是失败,不是成功。每个命令都必
./scripts/agentrun runs events <runId> --after-seq <n> --limit <n>
./scripts/agentrun runs result <runId> [--command-id <commandId>]
./scripts/agentrun runs cancel <runId> [--reason <text>]
./scripts/agentrun commands create <runId> --type turn --json-file <payload.json>
./scripts/agentrun commands create <runId> --type turn|steer|interrupt --json-file <payload.json>
./scripts/agentrun commands show <commandId> --run-id <runId>
./scripts/agentrun commands result <commandId> --run-id <runId>
./scripts/agentrun commands cancel <commandId> [--reason <text>]
@@ -43,6 +43,7 @@ CLI 默认输出 JSON。空 stdout 是失败,不是成功。每个命令都必
行为必须保持以下规则:
- `runs create` 创建 durable facts 并立即返回。
- `commands create` 创建 durable command;`turn` 启动一轮对话,`steer` 只在同 run 有 active turn 时由 runner 转发到 Codex `turn/steer`,迟到 steer 返回结构化 blocked,不终结 run。
- `runner start` 启动本地进程或 Kubernetes Job,并返回 process/job identity、log path 和 poll commands。
- `events` 默认分页且有界。
- `server logs` 返回有界日志,并指向完整日志文件。
+1 -1
View File
@@ -90,7 +90,7 @@ Manager 只承接 HWLAB v0.2 Code Agent 的通用执行事实,不承接 HWLAB
| `executionPolicy` | 必填或由 manager 显式补齐默认值,至少包含 sandbox、approval、timeout、network 和 secretScope。 |
| `traceSink` | 字段必须存在;可以为 `null` 或显式 sink。 |
`POST /api/v1/runs/:runId/commands` 必须支持 idempotency key。相同 key 且 payload hash 相同应返回既有 command;相同 key 但 payload hash 不同必须结构化失败。
`POST /api/v1/runs/:runId/commands` 必须支持 idempotency key。相同 key 且 payload hash 相同应返回既有 command;相同 key 但 payload hash 不同必须结构化失败。`type=turn` 是普通对话 command;`type=steer` 是面向同 run active turn 的运行中引导 command,payload 必须包含非空 `prompt`、`message` 或 `text`,普通 runner poll 不得把它当作新 turn 执行;`type=interrupt` 只保留 durable command 语义,业务 cancel 仍以 run/command cancel API 为权威。
## Tenant Policy Boundary
@@ -64,6 +64,7 @@ claimed -> lease_lost
- runner 必须先 register,再 claim run;claim 失败不能继续调用 backend。
- lease heartbeat 必须可观察;过期或冲突时写入 failure event 或明确退出原因。
- command 只能从 manager poll;不得从本地文件或临时参数伪造正式 command。
- runner 的普通 poll 只选择 pending `turn`;当 backend adapter 暴露 active turn control 后,runner 才在同 run 内轮询 pending `steer` command,ack 后调用 backend 的 steer 能力并单独终结该 steer command。active turn 结束后到达的 steer 必须结构化 blocked,不得启动新 turn,也不得把 run 标为 terminal。
- backend 产生的所有可见输出必须先经过 adapter normalization 和 redaction,再 append 到 manager;backend_status 至少包含 redacted profile/backendKind/protocol 摘要。
- 单个 command terminal 上报后 runner 不应立即退出,而应继续 poll 同一 run 的 pending command,直到 idle timeout、lease 冲突或 run terminal。退出码与 runner loop 终态必须一致或在日志中可解释。
+1 -1
View File
@@ -18,7 +18,7 @@ Codex stdio backend 是 AgentRun `v0.1` 的第一真实 Code Agent backend kind
codex app-server --listen stdio://
```
Adapter 通过 stdin 写入换行分隔 JSON-RPC 请求,通过 stdout 逐行读取 JSON-RPC response 和 notification;stderr 只作为有界诊断日志。最小请求序列是 `initialize`、`thread/start`/`thread/resume`、`turn/start`;response 中必须提取 thread/turn identity;notification 和后续输出必须归一化为 `backend_status`、`assistant_message`、`tool_call`、`command_output`、`error`、`terminal_status` events。
Adapter 通过 stdin 写入换行分隔 JSON-RPC 请求,通过 stdout 逐行读取 JSON-RPC response 和 notification;stderr 只作为有界诊断日志。最小请求序列是 `initialize`、`thread/start`/`thread/resume`、`turn/start`;response 中必须提取 thread/turn identity;notification 和后续输出必须归一化为 `backend_status`、`assistant_message`、`tool_call`、`command_output`、`error`、`terminal_status` events。运行中 steer 使用同一 app-server 进程的 `turn/steer` JSON-RPC 方法,参数为 `threadId`、`expectedTurnId` 和文本 `input` 数组;取消/中断使用 `turn/interrupt`,参数为 `threadId`、`turnId`。
不得把以下路径作为 `v0.1` Codex stdio backend 的正式实现或综合联调通过证据:直接 Responses HTTP 代理、OpenAI SDK wrapper、`codex exec` 一次性命令输出、fake provider、固定文本回复、只读 shortcut 或本地 shell 模拟。裸 HTTP 或 `codex exec --json` 可以作为 provider/upstream 诊断,但最终通过必须来自 app-server stdio turn。
+1 -1
View File
@@ -39,7 +39,7 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交
./scripts/agentrun runs events <runId> --after-seq <n> --limit <n>
./scripts/agentrun runs result <runId> [--command-id <commandId>]
./scripts/agentrun runs cancel <runId> [--reason <text>]
./scripts/agentrun commands create <runId> --type turn --json-file <payload.json>
./scripts/agentrun commands create <runId> --type turn|steer|interrupt --json-file <payload.json>
./scripts/agentrun commands show <commandId> --run-id <runId>
./scripts/agentrun commands result <commandId> --run-id <runId>
./scripts/agentrun commands cancel <commandId> [--reason <text>]
@@ -87,7 +87,7 @@ HWLAB canary 创建 run 时应使用以下字段口径:
| `executionPolicy` | sandbox、network、timeout、secretScope 必须显式,不得由 HWLAB 扩大 AgentRun Secret 范围。 |
| `traceSink` | 可指向 HWLAB trace adapter;为 `null` 时 HWLAB 仍可通过 AgentRun events 轮询。 |
Command 第一阶段要求 `type=turn`;用户原始 prompt、conversation metadata、profile 选择和 HWLAB trace correlation 必须作为 command payload 的非敏感字段保存;不得把 cookie、session token、provider credential、device internal token 或 Secret value 写入 payload。
Command 第一阶段要求 `type=turn` 或 `type=steer`;`turn` 保存用户原始 prompt、conversation metadata、profile 选择和 HWLAB trace correlation;`steer` 保存运行中引导文本,并由 runner 在同 run active turn 期间转发到 backend。业务 cancel 仍走 run/command cancel API,不用 `steer` 伪装。不得把 cookie、session token、provider credential、device internal token 或 Secret value 写入 payload。
## 需要补齐的能力
+1 -1
View File
@@ -453,7 +453,7 @@ function help(): JsonRecord {
"runs events <runId> --after-seq <n> --limit <n>",
"runs result <runId> [--command-id <commandId>]",
"runs cancel <runId> [--reason <text>]",
"commands create <runId> --type turn --json-file <payload.json>",
"commands create <runId> --type turn|steer|interrupt --json-file <payload.json>",
"commands show <commandId> --run-id <runId>",
"commands result <commandId> --run-id <runId>",
"commands cancel <commandId> [--reason <text>]",
+9
View File
@@ -2,6 +2,13 @@ import type { BackendEvent, BackendTurnResult, CommandRecord, RunRecord } from "
import { CodexStdioBackendSession, runCodexStdioTurn, type CodexStdioTurnOptions } from "./codex-stdio.js";
import { backendProfileSpec } from "../common/backend-profiles.js";
/**
 * Handle onto a backend turn that is currently running. It is the argument
 * passed to the `onActiveTurn` callback of `BackendAdapterOptions`.
 */
export interface BackendActiveTurnControl {
/** Identifier of the thread the running turn belongs to. */
threadId: string;
/** Identifier of the running turn. */
turnId: string;
/** Sends steering text to the running turn; resolves once the backend call returns. */
steer(prompt: string): Promise<void>;
/** Asks the backend to interrupt the running turn. */
interrupt(): Promise<void>;
}
export interface BackendAdapterOptions {
codexCommand?: string;
codexArgs?: string[];
@@ -9,6 +16,7 @@ export interface BackendAdapterOptions {
workspacePath?: string;
abortSignal?: AbortSignal;
onEvent?: (event: BackendEvent) => void | Promise<void>;
onActiveTurn?: (control: BackendActiveTurnControl) => void | (() => void);
env?: NodeJS.ProcessEnv;
}
@@ -54,5 +62,6 @@ export function backendTurnOptions(run: RunRecord, command: CommandRecord, optio
if (options.codexHome) turnOptions.codexHome = options.codexHome;
if (options.abortSignal) turnOptions.abortSignal = options.abortSignal;
if (options.onEvent) turnOptions.onEvent = options.onEvent;
if (options.onActiveTurn) turnOptions.onActiveTurn = options.onActiveTurn;
return turnOptions;
}
+23
View File
@@ -48,6 +48,14 @@ export interface CodexStdioTurnOptions {
codexHome?: string;
abortSignal?: AbortSignal;
onEvent?: (event: BackendEvent) => void | Promise<void>;
onActiveTurn?: (control: CodexActiveTurnControl) => void | (() => void);
}
/**
 * Handle onto a Codex stdio turn that has been started. It is the argument
 * passed to the `onActiveTurn` callback of `CodexStdioTurnOptions`.
 */
export interface CodexActiveTurnControl {
/** Thread id used for the turn's JSON-RPC requests. */
threadId: string;
/** Id of the started turn; sent as `expectedTurnId` on steer and `turnId` on interrupt. */
turnId: string;
/** Issues a `turn/steer` request carrying `prompt` as text input. */
steer(prompt: string): Promise<void>;
/** Issues a `turn/interrupt` request for this thread/turn pair. */
interrupt(): Promise<void>;
}
interface PendingRequest {
@@ -396,6 +404,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
const terminalPromise = new Promise<void>((resolve) => { terminalResolve = resolve; });
let client: CodexStdioClient | null = null;
const requestTimeoutMs = Math.min(positiveTimeout(options.timeoutMs), requestTimeoutCapMs);
let stopActiveTurn: (() => void) | undefined;
const abortTurn = (): void => {
if (terminal) return;
terminal = { status: "cancelled", failureKind: "cancelled", message: "cancel requested" };
@@ -463,6 +472,19 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
const turnResponse = requireResponseRecord(await client.request("turn/start", withOptionalModel({ threadId, input: textInput(options.prompt), cwd: options.cwd, approvalPolicy: options.approvalPolicy }, options.model), requestTimeoutMs), "turn/start");
turnId = requireNestedId(turnResponse, "turn/start", "turn");
emitEvent({ type: "backend_status", payload: { phase: "turn/start:completed", turnId } });
if (threadId && turnId && options.onActiveTurn) {
const maybeStop = options.onActiveTurn({
threadId,
turnId,
steer: async (prompt: string) => {
await client!.request("turn/steer", { threadId: threadId!, expectedTurnId: turnId!, input: textInput(prompt) }, requestTimeoutMs);
},
interrupt: async () => {
await client!.request("turn/interrupt", { threadId: threadId!, turnId: turnId! }, requestTimeoutMs);
},
});
if (typeof maybeStop === "function") stopActiveTurn = maybeStop;
}
const race = await Promise.race([
terminalPromise.then(() => ({ kind: "terminal" as const })),
@@ -480,6 +502,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
emitEvent({ type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase: failure.phase, details: failure.details } });
}
} finally {
stopActiveTurn?.();
stopNotifications();
options.abortSignal?.removeEventListener("abort", abortTurn);
clearTimeout(timeout);
+3 -1
View File
@@ -113,8 +113,10 @@ export interface RunRecord extends CreateRunInput {
leaseExpiresAt: string | null;
}
/** Values a command's `type` discriminator may take. */
export type CommandType = "turn" | "steer" | "interrupt";
export interface CreateCommandInput extends JsonRecord {
type: "turn" | "interrupt";
type: CommandType;
payload: JsonRecord;
idempotencyKey?: string;
}
+10 -1
View File
@@ -215,12 +215,21 @@ function validateBackendSecretScope(backendProfile: BackendProfile, executionPol
export function validateCreateCommand(input: unknown): CreateCommandInput {
const record = asRecord(input, "command");
const type = requiredString(record, "type");
if (type !== "turn" && type !== "interrupt") throw new AgentRunError("schema-invalid", `command type ${type} is not supported`, { httpStatus: 400 });
if (type !== "turn" && type !== "steer" && type !== "interrupt") throw new AgentRunError("schema-invalid", `command type ${type} is not supported`, { httpStatus: 400 });
const payload = asRecord(record.payload ?? {}, "payload");
if (type === "steer" && !steerPrompt(payload)) throw new AgentRunError("schema-invalid", "steer command payload requires a non-empty prompt, message, or text", { httpStatus: 400 });
const idempotencyKey = typeof record.idempotencyKey === "string" && record.idempotencyKey.trim().length > 0 ? record.idempotencyKey.trim() : undefined;
return { type, payload, ...(idempotencyKey ? { idempotencyKey } : {}) };
}
/**
 * Extracts the steering text from a command payload.
 *
 * The fields `prompt`, `message` and `text` are consulted in that order; the
 * first one holding a string that is non-empty after trimming wins.
 *
 * @param payload - Command payload to inspect.
 * @returns The trimmed text, or `null` when none of the fields qualifies.
 */
function steerPrompt(payload: JsonRecord): string | null {
  const fieldsByPriority = ["prompt", "message", "text"];
  for (const field of fieldsByPriority) {
    const candidate = payload[field];
    if (typeof candidate !== "string") continue;
    const trimmed = candidate.trim();
    if (trimmed !== "") return trimmed;
  }
  return null;
}
export function validateCreateQueueTask(input: unknown): CreateQueueTaskInput {
const record = asRecord(input, "queueTask");
const tenantId = requiredString(record, "tenantId");
+10 -3
View File
@@ -64,12 +64,19 @@ export class RunnerManagerApi {
}
/**
 * Lists a run's commands and picks the one the runner should execute next.
 *
 * Only commands that are `pending` and of type `turn` are eligible. When
 * `options.commandId` is given, the eligible command must also carry that id;
 * otherwise the first eligible command in list order is chosen.
 *
 * @param runId - Run whose commands are listed.
 * @param options - Optional paging (`afterSeq`, `limit`) and an optional command id to pin.
 * @returns The listed commands and the selected command, or `null` when none is eligible.
 */
async pollCommands(runId: string, options: { afterSeq?: number; limit?: number; commandId?: string }): Promise<PollCommandsResult> {
  const { afterSeq, limit, commandId } = options;
  // Only forward paging keys the caller actually supplied.
  const listOptions: { afterSeq?: number; limit?: number } = {
    ...(afterSeq !== undefined ? { afterSeq } : {}),
    ...(limit !== undefined ? { limit } : {}),
  };
  const items = await this.listCommands(runId, listOptions);
  const selected = items.find((item) => {
    if (item.state !== "pending" || item.type !== "turn") return false;
    return commandId ? item.id === commandId : true;
  }) ?? null;
  return { items, selected };
}
async listCommands(runId: string, options: { afterSeq?: number; limit?: number } = {}): Promise<CommandRecord[]> {
const afterSeq = options.afterSeq ?? 0;
const limit = options.limit ?? 20;
const response = await this.client.get(`/api/v1/runs/${encodeURIComponent(runId)}/commands?afterSeq=${afterSeq}&limit=${limit}`) as { items?: CommandRecord[] };
const items = Array.isArray(response.items) ? response.items : [];
const selected = options.commandId ? items.find((item) => item.id === options.commandId && item.state === "pending" && item.type === "turn") ?? null : items.find((item) => item.state === "pending" && item.type === "turn") ?? null;
return { items, selected };
return Array.isArray(response.items) ? response.items : [];
}
async ackCommand(commandId: string): Promise<CommandRecord> {
+83 -1
View File
@@ -1,5 +1,5 @@
import { RunnerManagerApi, failureKindFromError, terminalStatusForFailure, errorMessage } from "./manager-api.js";
import { createBackendSession, runBackendTurn, type BackendAdapterOptions, type BackendSession } from "../backend/adapter.js";
import { createBackendSession, runBackendTurn, type BackendActiveTurnControl, type BackendAdapterOptions, type BackendSession } from "../backend/adapter.js";
import { materializeResourceBundle } from "./resource-bundle.js";
import type { BackendEvent, BackendProfile, CommandRecord, FailureKind, JsonRecord, RunRecord, RunnerRecord, TerminalStatus } from "../common/types.js";
import { AgentRunError } from "../common/errors.js";
@@ -81,6 +81,7 @@ export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
firstPoll = false;
const command = commandsResponse.selected;
if (!command) {
await failPendingSteerWithoutActiveTurn(api, options.runId, commandsResponse.items, runner.id, attemptId);
if (options.oneShot === true) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "one-shot-no-pending");
if (Date.now() - idleSince >= idleTimeoutMs) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "idle-timeout");
await sleep(pollIntervalMs);
@@ -153,6 +154,7 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions,
const abortController = new AbortController();
const stopCancelWatch = watchCancellation(api, options.runId, command.id, abortController);
const stopBackendProgress = startBackendProgress(api, options.runId, command.id, attemptId, runner.id, options.backendProfile ?? null);
let stopSteerWatch: (() => void) | undefined;
try {
const latestRun = await api.getRun(options.runId);
const backendOptions = {
@@ -162,6 +164,13 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions,
onEvent: async (event: BackendEvent) => {
await appendBestEffort(api, options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id));
},
onActiveTurn: (control: BackendActiveTurnControl) => {
stopSteerWatch = startSteerWatch(api, options.runId, command.id, attemptId, runner.id, control, options.pollIntervalMs);
return () => {
stopSteerWatch?.();
stopSteerWatch = undefined;
};
},
};
const result = backendSession ? await backendSession.runTurn(latestRun, command, backendOptions) : await runBackendTurn(latestRun, command, backendOptions);
for (const event of result.events) {
@@ -174,12 +183,85 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions,
const failure = { failureKind, terminalStatus: terminalStatusForFailure(failureKind), message: errorMessage(error) };
return await reportCommandFailure(api, options.runId, command.id, runner, attemptId, failure, "runner:execute");
} finally {
stopSteerWatch?.();
stopBackendProgress();
await appendBestEffort(api, options.runId, { type: "backend_status", payload: { phase: "backend-turn-finished", commandId: command.id, attemptId, runnerId: runner.id } });
stopCancelWatch();
}
}
/**
 * Polls the manager for pending `steer` commands of a run while a backend turn
 * is active and hands each one to `handleSteerCommand` for delivery.
 *
 * One poll starts immediately and another on every normalized poll interval;
 * a tick is skipped while the previous poll is still in flight.
 *
 * @param api - Manager API used to list commands.
 * @param runId - Run whose commands are watched.
 * @param targetCommandId - Id of the turn command being steered.
 * @param attemptId - Runner attempt id forwarded to the steer handler.
 * @param runnerId - Runner id forwarded to the steer handler.
 * @param control - Control handle of the active backend turn.
 * @param pollIntervalMs - Requested poll interval; normalized before use.
 * @returns Function that stops the watch. Steer commands not yet handed over
 *   when it is called are left pending.
 */
function startSteerWatch(api: RunnerManagerApi, runId: string, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl, pollIntervalMs: number | undefined): () => void {
  let stopped = false;
  let polling = false;
  const handled = new Set<string>();
  const intervalMs = normalizePollIntervalMs(pollIntervalMs);
  const poll = async (): Promise<void> => {
    if (stopped || polling) return;
    polling = true;
    try {
      // NOTE(review): only the first page of up to 100 commands is inspected; presumably
      // steer commands beyond that page would not be seen here — confirm against the manager API.
      const commands = await api.listCommands(runId, { afterSeq: 0, limit: 100 });
      const pendingSteer = commands.filter((item) => item.type === "steer" && item.state === "pending" && !handled.has(item.id));
      for (const steerCommand of pendingSteer) {
        // Once the watch is stopped the turn is over; leave the rest pending instead of
        // steering a finished turn.
        if (stopped) break;
        await handleSteerCommand(api, runId, steerCommand, targetCommandId, attemptId, runnerId, control);
        // Mark only after the handler returned: if it threw (for example the ack request
        // failed) the command is retried on the next poll for as long as it is still pending.
        handled.add(steerCommand.id);
      }
    } catch {
      // The active backend turn remains authoritative; missed steer commands stay pending for the next poll.
    } finally {
      polling = false;
    }
  };
  const timer = setInterval(() => { void poll(); }, intervalMs);
  void poll();
  return () => {
    stopped = true;
    clearInterval(timer);
  };
}
/**
 * Acknowledges one steer command and delivers it to the active backend turn.
 *
 * Steps, in order:
 * 1. Ack the command. If the ack reports `cancelled`, report a cancelled status and stop.
 * 2. Extract the steering text; without one, report a `blocked` / `schema-invalid` failure and stop.
 * 3. Record a `steer-command-acknowledged` event, call `control.steer`, then record
 *    `turn/steer:completed` and report the command as completed.
 * 4. If step 3 throws, report a failure derived from the error for this command.
 *
 * Failures are reported for the steer command itself through `reportNonTerminalCommandFailure`.
 *
 * @param command - The steer command to deliver.
 * @param targetCommandId - Id of the turn command being steered; recorded on events.
 * @param control - Control handle of the active backend turn.
 */
async function handleSteerCommand(api: RunnerManagerApi, runId: string, command: CommandRecord, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl): Promise<void> {
  const { state } = await api.ackCommand(command.id);
  const turnRef = { threadId: control.threadId, turnId: control.turnId };

  if (state === "cancelled") {
    await api.reportCommandStatus(command.id, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: "steer command cancelled before delivery", ...turnRef });
    return;
  }

  const prompt = steerPrompt(command.payload);
  if (!prompt) {
    await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: "blocked", failureKind: "schema-invalid", message: "steer command payload requires a non-empty prompt, message, or text" }, "runner:steer:payload", control);
    return;
  }

  // Both progress events share everything except their phase.
  const statusEvent = (phase: string) => ({
    type: "backend_status" as const,
    payload: { phase, commandId: command.id, commandType: "steer", targetCommandId, attemptId, runnerId, ...turnRef },
  });

  await appendBestEffort(api, runId, statusEvent("steer-command-acknowledged"));
  try {
    await control.steer(prompt);
    await appendBestEffort(api, runId, statusEvent("turn/steer:completed"));
    await api.reportCommandStatus(command.id, { terminalStatus: "completed", failureKind: null, failureMessage: null, ...turnRef });
  } catch (error) {
    const failureKind = failureKindFromError(error);
    const failure = { terminalStatus: terminalStatusForFailure(failureKind), failureKind, message: errorMessage(error) };
    await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, failure, "runner:steer", control);
  }
}
/**
 * Rejects every pending steer command in `commands` because no turn is active.
 *
 * Each pending steer command is acknowledged first. Unless the ack reports
 * `cancelled`, it is then reported as `blocked` with the message
 * "steer command requires an active turn".
 *
 * @param commands - Commands returned by the latest poll.
 */
async function failPendingSteerWithoutActiveTurn(api: RunnerManagerApi, runId: string, commands: CommandRecord[], runnerId: string, attemptId: string): Promise<void> {
  const strandedSteers = commands.filter((item) => item.state === "pending" && item.type === "steer");
  for (const stranded of strandedSteers) {
    const { state } = await api.ackCommand(stranded.id);
    if (state !== "cancelled") {
      await reportNonTerminalCommandFailure(api, runId, stranded.id, attemptId, runnerId, { terminalStatus: "blocked", failureKind: "schema-invalid", message: "steer command requires an active turn" }, "runner:steer:no-active-turn");
    }
  }
}
/**
 * Records a failure for a single command: appends an `error` event to the run,
 * then reports the command's status to the manager.
 *
 * @param failure - Status, failure kind and message to report for the command.
 * @param phase - Phase label stored on the error event.
 * @param control - When given, its thread and turn ids are attached to both the event and the status report.
 */
async function reportNonTerminalCommandFailure(api: RunnerManagerApi, runId: string, commandId: string, attemptId: string, runnerId: string, failure: { terminalStatus: TerminalStatus; failureKind: FailureKind; message: string }, phase: string, control?: BackendActiveTurnControl): Promise<void> {
  const { terminalStatus, failureKind, message } = failure;
  const turnRef = control ? { threadId: control.threadId, turnId: control.turnId } : {};
  await appendBestEffort(api, runId, {
    type: "error",
    payload: { failureKind, message, phase, commandId, attemptId, runnerId, ...turnRef },
  });
  await api.reportCommandStatus(commandId, { terminalStatus, failureKind, failureMessage: message, ...turnRef });
}
/**
 * Picks the steering text out of a steer command payload.
 *
 * Looks at `prompt`, then `message`, then `text`, and uses the first value
 * that is a string with non-whitespace content.
 *
 * @param payload - Payload of the steer command.
 * @returns The chosen text with surrounding whitespace removed, or `null` if no field qualifies.
 */
function steerPrompt(payload: JsonRecord): string | null {
  const usable = ["prompt", "message", "text"]
    .map((field) => payload[field])
    .find((value): value is string => typeof value === "string" && value.trim().length > 0);
  return usable === undefined ? null : usable.trim();
}
/**
 * Tells whether a run has reached an end state.
 *
 * @param run - Run record to inspect.
 * @returns `true` when the status is `completed`, `failed`, `blocked` or `cancelled`; otherwise `false`.
 */
function isTerminalRun(run: RunRecord): boolean {
  switch (run.status) {
    case "completed":
    case "failed":
    case "blocked":
    case "cancelled":
      return true;
    default:
      return false;
  }
}
+18 -1
View File
@@ -131,6 +131,23 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin
assert.equal(secondEnvelope.terminalStatus, "completed");
assert.equal(secondEnvelope.reply, "fake codex stdio reply");
const steerRun = await createHwlabRun(client, context, bundle, "hwlab-session-steer", "start a turn that waits for steer", "hwlab-command-steer-turn", 10_000);
const steerRunner = runOnce({ managerUrl: server.baseUrl, runId: steerRun.runId, commandId: steerRun.commandId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "steer-waits", AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-steer") }, oneShot: true, pollIntervalMs: 50 });
await waitForCommandState(client, steerRun.runId, steerRun.commandId, "acknowledged");
const steerCommand = await client.post(`/api/v1/runs/${steerRun.runId}/commands`, { type: "steer", payload: { prompt: "STEER_MARK_SELFTEST", traceId: "hwlab-command-steer" }, idempotencyKey: "hwlab-command-steer" }) as { id: string };
await waitForCommandState(client, steerRun.runId, steerCommand.id, "completed");
const steerRunnerResult = await steerRunner as JsonRecord;
assert.equal(steerRunnerResult.terminalStatus, "completed");
const steerTurnEnvelope = await client.get(`/api/v1/runs/${steerRun.runId}/commands/${steerRun.commandId}/result`) as JsonRecord;
assert.equal(steerTurnEnvelope.terminalStatus, "completed");
assert.match(String(steerTurnEnvelope.reply), /steered:STEER_MARK_SELFTEST/u);
const steerCommandEnvelope = await client.get(`/api/v1/runs/${steerRun.runId}/commands/${steerCommand.id}/result`) as JsonRecord;
assert.equal(steerCommandEnvelope.terminalStatus, "completed");
const steerEventsResponse = await client.get(`/api/v1/runs/${steerRun.runId}/events?afterSeq=0&limit=200`) as { items?: Array<{ type?: string; payload?: JsonRecord }> };
const steerEvents = steerEventsResponse.items ?? [];
assert.ok(steerEvents.some((event) => event.type === "backend_status" && event.payload?.phase === "steer-command-acknowledged" && event.payload?.commandId === steerCommand.id && event.payload?.targetCommandId === steerRun.commandId));
assert.ok(steerEvents.some((event) => event.type === "backend_status" && event.payload?.phase === "turn/steer:completed" && event.payload?.commandId === steerCommand.id && event.payload?.targetCommandId === steerRun.commandId));
const runningCancel = await createHwlabRun(client, context, bundle, "hwlab-session-cancel-running", "cancel running", "hwlab-command-cancel-running", 10_000);
const running = runOnce({ managerUrl: server.baseUrl, runId: runningCancel.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "missing-terminal", AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-running-cancel") }, oneShot: true });
await waitForCommandState(client, runningCancel.runId, runningCancel.commandId, "acknowledged");
@@ -138,7 +155,7 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin
const runningResult = await running;
assert.equal(runningResult.terminalStatus, "cancelled");
return { name: "hwlab-manual-dispatch", tests: ["runner-job-idempotency", "pending-cancel", "result-envelope", "session-ref-resume", "resource-bundle-materialization", "resource-bundle-tool-alias", "same-run-runner-multiturn", "running-cancel"] };
return { name: "hwlab-manual-dispatch", tests: ["runner-job-idempotency", "pending-cancel", "result-envelope", "session-ref-resume", "resource-bundle-materialization", "resource-bundle-tool-alias", "same-run-runner-multiturn", "running-steer", "running-cancel"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}
+42
View File
@@ -7,6 +7,7 @@ if (process.env.AGENTRUN_FAKE_CODEX_START_FILE) appendFileSync(process.env.AGENT
let threadCounter = 0;
let turnCounter = 0;
let observedThreadModel = false;
let activeSteerTurn: { id: string; completed: boolean; timer: NodeJS.Timeout | null } | null = null;
for await (const line of rl) {
const trimmed = String(line).trim();
@@ -148,6 +149,18 @@ for await (const line of rl) {
}, 50);
continue;
}
if (mode === "steer-waits") {
turnCounter += 1;
const turn = { id: `turn_selftest_${turnCounter}`, status: "running" };
notify("turn/started", { turn });
activeSteerTurn = {
id: turn.id,
completed: false,
timer: setTimeout(() => completeActiveSteerTurn("timeout-no-steer"), 2_000),
};
respond(message.id, { turn });
continue;
}
if (mode === "noisy-reasoning-events") {
turnCounter += 1;
const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" };
@@ -176,6 +189,18 @@ for await (const line of rl) {
respond(message.id, { turn });
continue;
}
if (message.method === "turn/steer") {
if (mode !== "steer-waits" || !activeSteerTurn) {
respond(message.id, null, { code: -32000, message: "no active fake turn for steer" });
continue;
}
const text = steerText(message.params?.input);
notify("item/agentMessage/delta", { itemId: "msg_steer", delta: `steered:${text}` });
notify("item/completed", { item: { id: "msg_steer", type: "agentMessage", text: `steered:${text}` } });
respond(message.id, { accepted: true });
setTimeout(() => completeActiveSteerTurn("steer-applied"), 20);
continue;
}
respond(message.id, null, { code: -32601, message: `unsupported fake method ${message.method ?? "unknown"}` });
}
@@ -187,3 +212,20 @@ function respond(id: number | undefined, result: unknown, error?: unknown): void
/**
 * Emits a JSON-RPC style notification on stdout as one newline-terminated JSON line.
 *
 * @param method - Notification method name.
 * @param params - Notification parameters, serialized with `JSON.stringify`.
 */
function notify(method: string, params: unknown): void {
  const frame = JSON.stringify({ method, params });
  process.stdout.write(frame + "\n");
}
/**
 * Finishes the fake turn tracked in `activeSteerTurn`, at most once.
 *
 * Does nothing when no turn is tracked or it was already completed. Otherwise
 * marks it completed, cancels its pending timer and emits a `turn/completed`
 * notification carrying the given reason.
 *
 * @param reason - Reason string included in the completed turn notification.
 */
function completeActiveSteerTurn(reason: string): void {
  const active = activeSteerTurn;
  if (active === null || active.completed) return;
  active.completed = true;
  if (active.timer !== null) clearTimeout(active.timer);
  notify("turn/completed", { turn: { id: active.id, status: "completed", reason } });
}
/**
 * Concatenates the `text` fields of a steer request's input items.
 *
 * @param input - Value received as the request's `input`; anything other than an array yields `""`.
 * @returns The `text` strings of all plain-object items joined without a separator.
 *   Items that are not plain objects, or whose `text` is not a string, are ignored.
 */
function steerText(input: unknown): string {
  if (!Array.isArray(input)) return "";
  let joined = "";
  for (const entry of input) {
    if (entry === null || typeof entry !== "object" || Array.isArray(entry)) continue;
    const { text } = entry as Record<string, unknown>;
    if (typeof text === "string") joined += text;
  }
  return joined;
}