From 6fb8f7483a60d09f134fcd8938a7a45766594120 Mon Sep 17 00:00:00 2001 From: Codex Date: Mon, 1 Jun 2026 22:33:26 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E5=90=8C=20run=20run?= =?UTF-8?q?ner=20=E5=A4=9A=E8=BD=AE=20command?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/src/cli.ts | 5 + src/common/events.ts | 5 +- src/mgr/postgres-store.ts | 21 +-- src/mgr/result.ts | 31 +++- src/mgr/runner-job-status.ts | 6 +- src/mgr/server.ts | 8 +- src/mgr/store.ts | 21 ++- src/runner/k8s-job.ts | 1 + src/runner/main.ts | 3 + src/runner/manager-api.ts | 2 +- src/runner/run-once.ts | 162 ++++++++++++++---- src/selftest/cases/30-codex-stdio.ts | 26 +-- .../cases/50-hwlab-manual-dispatch.ts | 21 ++- .../cases/60-hwlab-baseline-contract.ts | 6 +- 14 files changed, 237 insertions(+), 81 deletions(-) diff --git a/scripts/src/cli.ts b/scripts/src/cli.ts index 7734f0a..c89d672 100644 --- a/scripts/src/cli.ts +++ b/scripts/src/cli.ts @@ -84,6 +84,11 @@ async function dispatch(args: ParsedArgs): Promise { } if (codexCommand) options.codexCommand = codexCommand; if (codexHome) options.codexHome = codexHome; + const idleTimeoutMs = optionalFlag(args, "idle-timeout-ms"); + const pollIntervalMs = optionalFlag(args, "poll-interval-ms"); + if (idleTimeoutMs) options.idleTimeoutMs = Number(idleTimeoutMs); + if (pollIntervalMs) options.pollIntervalMs = Number(pollIntervalMs); + if (args.flags.get("one-shot") === true) options.oneShot = true; return runOnce(options) as unknown as JsonValue; } if (group === "runner" && command === "job") return renderRunnerJob(args); diff --git a/src/common/events.ts b/src/common/events.ts index 75a6841..8aa1ddb 100644 --- a/src/common/events.ts +++ b/src/common/events.ts @@ -33,21 +33,24 @@ export function normalizeRunEventPayload(type: EventType, payload: JsonRecord): export function eventContractSummary(events: RunEvent[]): JsonRecord { const issues: JsonRecord[] = []; let terminalStatusCount = 0; + let runTerminalStatusCount = 0; for (let index = 0; index < events.length; index += 1) { const event = events[index]; if (!eventTypeSet.has(event.type)) issues.push({ code: "event-type-invalid", seq: event.seq, type: event.type }); if (event.seq !== index + 1) issues.push({ code: "seq-not-contiguous", expectedSeq: index + 1, actualSeq: event.seq }); if (event.type === "terminal_status") { terminalStatusCount += 1; + if (typeof event.payload.commandId !== "string") runTerminalStatusCount += 1; if (!isTerminalStatus(event.payload.terminalStatus)) issues.push({ code: "terminal-status-invalid", seq: event.seq, terminalStatus: String(event.payload.terminalStatus ?? "") }); } } - if (terminalStatusCount > 1) issues.push({ code: "terminal-status-duplicated", terminalStatusCount }); + if (runTerminalStatusCount > 1) issues.push({ code: "run-terminal-status-duplicated", runTerminalStatusCount }); return { ok: issues.length === 0, eventCount: events.length, lastSeq: events.at(-1)?.seq ?? 0, terminalStatusCount, + runTerminalStatusCount, issues, }; } diff --git a/src/mgr/postgres-store.ts b/src/mgr/postgres-store.ts index b224904..03e6dc9 100644 --- a/src/mgr/postgres-store.ts +++ b/src/mgr/postgres-store.ts @@ -6,7 +6,7 @@ import { redactJson } from "../common/redaction.js"; import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js"; import { newId, nowIso, stableHash } from "../common/validation.js"; import type { AgentRunStore, ListQueueTasksInput, SaveRunnerJobInput, StoreHealth } from "./store.js"; -import { assertSessionBoundary, buildQueueStats, clampQueueLimit, commandStateFromTerminal, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, queueTaskSort, sessionRefFromRecord, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js"; +import { assertSessionBoundary, buildQueueStats, clampQueueLimit, commandStateFromTerminal, isLeaseExpired, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, queueTaskSort, sessionRefFromRecord, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js"; import { backendCapabilitiesSqlValues, mergeBackendCapability } from "../common/backend-profiles.js"; import { normalizeRunEventPayload, requireEventType } from "../common/events.js"; @@ -332,7 +332,8 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( async createCommand(runId: string, input: CreateCommandInput): Promise { const payloadHash = stableHash(input.payload); return this.withTransaction(async (client) => { - await this.requireRunForUpdate(client, runId); + const run = await this.requireRunForUpdate(client, runId); + if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 }); if (input.idempotencyKey) { const existing = await client.query("SELECT * FROM agentrun_commands WHERE run_id = $1 AND idempotency_key = $2", [runId, input.idempotencyKey]); if (existing.rows[0]) { @@ -436,7 +437,7 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( return this.withTransaction(async (client) => { const run = await this.requireRunForUpdate(client, runId); if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 }); - if (run.claimedBy && run.claimedBy !== runnerId) throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 }); + if (run.claimedBy && run.claimedBy !== runnerId && !isLeaseExpired(run.leaseExpiresAt)) throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 }); const leaseExpiresAt = new Date(Date.now() + leaseMs).toISOString(); const updated = await client.query( `UPDATE agentrun_runs SET status = $2, claimed_by = $3, lease_expires_at = $4, updated_at = $5 WHERE id = $1 RETURNING *`, @@ -478,7 +479,7 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( }); } - async finishCommand(commandId: string, result: Pick): Promise { + async finishCommand(commandId: string, result: Pick): Promise { return this.withTransaction(async (client) => { const existing = await client.query("SELECT * FROM agentrun_commands WHERE id = $1 FOR UPDATE", [commandId]); const row = existing.rows[0]; @@ -487,7 +488,9 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( if (isTerminalCommandState(command.state)) return command; const state = commandStateFromTerminal(result.terminalStatus); const updated = await client.query("UPDATE agentrun_commands SET state = $2, updated_at = $3 WHERE id = $1 RETURNING *", [commandId, state, nowIso()]); - await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "command-terminal", commandId, state, terminalStatus: result.terminalStatus, failureKind: result.failureKind }); + const run = await this.requireRunForUpdate(client, command.runId); + if (result.threadId && run.sessionRef?.sessionId) await this.upsertSessionThread(client, run, result.threadId, result.turnId ?? null); + await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "command-terminal", commandId, state, terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage ?? null, threadId: result.threadId ?? null, turnId: result.turnId ?? null }); return commandFromRow(updated.rows[0]); }); } @@ -544,13 +547,7 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( const command = commandFromRow(row); if (isTerminalCommandState(command.state)) return command; const updated = await client.query("UPDATE agentrun_commands SET state = 'cancelled', updated_at = $2 WHERE id = $1 RETURNING *", [commandId, nowIso()]); - await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled" }); - const run = await this.requireRunForUpdate(client, command.runId); - if (!isTerminalRunStatus(run.status)) { - await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "cancel-requested", reason }); - await client.query(`UPDATE agentrun_runs SET status = 'cancelled', terminal_status = 'cancelled', failure_kind = 'cancelled', failure_message = $2, updated_at = $3 WHERE id = $1`, [command.runId, reason, nowIso()]); - await this.appendEventWithLockedRun(client, command.runId, "terminal_status", { terminalStatus: "cancelled", failureKind: "cancelled", message: reason }); - } + await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", message: reason }); return commandFromRow(updated.rows[0]); }); } diff --git a/src/mgr/result.ts b/src/mgr/result.ts index 02441ee..adeb79f 100644 --- a/src/mgr/result.ts +++ b/src/mgr/result.ts @@ -6,14 +6,17 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman const run = await store.getRun(runId); const command = await selectCommand(store, runId, commandId); const events = await store.listEvents(runId, 0, 500); + const scopedEvents = command ? eventsForCommand(events, command.id) : events; const jobs = await store.listRunnerJobs(runId, command?.id); const latestJob = jobs.at(-1) ?? null; - const terminalEventStatus = terminalFromEvents(events); - const terminal = terminalEventStatus ?? run.terminalStatus; - const terminalSource = terminalEventStatus ? "terminal_status-event" : run.terminalStatus ? "run-record" : "none"; - const failureKind = run.failureKind ?? failureKindFromEvents(events); - const reply = assistantReply(events); - const blocker = terminal === "blocked" || terminal === "failed" ? { failureKind, message: run.failureMessage ?? messageFromEvents(events) } : null; + const commandTerminal = command ? terminalFromCommand(command) : null; + const terminalEventStatus = terminalFromEvents(scopedEvents); + const terminal = commandTerminal ?? terminalEventStatus ?? run.terminalStatus; + const terminalSource = commandTerminal ? "command-record" : terminalEventStatus ? "terminal_status-event" : run.terminalStatus ? "run-record" : "none"; + const failureKind = command ? failureKindFromEvents(scopedEvents) : run.failureKind ?? failureKindFromEvents(scopedEvents); + const failureMessage = command ? messageFromEvents(scopedEvents) : run.failureMessage ?? messageFromEvents(scopedEvents); + const reply = assistantReply(scopedEvents); + const blocker = terminal === "blocked" || terminal === "failed" ? { failureKind, message: failureMessage } : null; return { runId: run.id, commandId: command?.id ?? commandId ?? null, @@ -29,11 +32,11 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman completed: terminal === "completed", reply, failureKind, - failureMessage: run.failureMessage ?? messageFromEvents(events), + failureMessage, blocker, lastSeq: events.at(-1)?.seq ?? 0, eventCount: events.length, - artifactSummary: artifactSummary(events), + artifactSummary: artifactSummary(scopedEvents), sessionRef: sessionSummary(run), resourceBundleRef: resourceBundleSummary(run, events), runnerJobCount: jobs.length, @@ -58,6 +61,18 @@ function terminalFromEvents(events: RunEvent[]): TerminalStatus | null { return null; } +function terminalFromCommand(command: CommandRecord): TerminalStatus | null { + if (command.state === "completed") return "completed"; + if (command.state === "failed") return "failed"; + if (command.state === "cancelled") return "cancelled"; + return null; +} + +function eventsForCommand(events: RunEvent[], commandId: string): RunEvent[] { + const scoped = events.filter((event) => event.payload.commandId === commandId || typeof event.payload.commandId !== "string"); + return scoped.length > 0 ? scoped : events; +} + function failureKindFromEvents(events: RunEvent[]): string | null { for (const event of [...events].reverse()) { const value = event.payload.failureKind; diff --git a/src/mgr/runner-job-status.ts b/src/mgr/runner-job-status.ts index 4ac68d2..c3e168d 100644 --- a/src/mgr/runner-job-status.ts +++ b/src/mgr/runner-job-status.ts @@ -1,7 +1,7 @@ import type { JsonRecord, RunEvent, RunnerJobRecord, TerminalStatus } from "../common/types.js"; export function runnerJobStatusSummary(job: RunnerJobRecord, events: RunEvent[] = []): JsonRecord { - const terminalEvent = latestTerminalEvent(events); + const terminalEvent = latestTerminalEvent(events, job.commandId); const runner = recordAt(job.result, "runner"); const jobIdentity = recordAt(job.result, "jobIdentity"); const kubernetes = recordAt(job.result, "kubernetes"); @@ -36,9 +36,11 @@ export function runnerJobStatusSummary(job: RunnerJobRecord, events: RunEvent[] }; } -function latestTerminalEvent(events: RunEvent[]): RunEvent | null { +function latestTerminalEvent(events: RunEvent[], commandId: string): RunEvent | null { for (const event of [...events].reverse()) { + if (event.payload.commandId && event.payload.commandId !== commandId) continue; if (event.type === "terminal_status") return event; + if (event.type === "backend_status" && event.payload.phase === "command-terminal" && event.payload.commandId === commandId) return event; } return null; } diff --git a/src/mgr/server.ts b/src/mgr/server.ts index ad7ff03..80316c0 100644 --- a/src/mgr/server.ts +++ b/src/mgr/server.ts @@ -191,7 +191,13 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults if (method === "PATCH" && commandStatusMatch) { const record = asRecord(body, "commandStatus"); const terminalStatus = record.terminalStatus === "completed" || record.terminalStatus === "failed" || record.terminalStatus === "blocked" || record.terminalStatus === "cancelled" ? record.terminalStatus : "failed"; - return await store.finishCommand(commandStatusMatch[1] ?? "", { terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null, failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null }) as unknown as JsonValue; + return await store.finishCommand(commandStatusMatch[1] ?? "", { + terminalStatus, + failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null, + failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null, + ...(typeof record.threadId === "string" ? { threadId: record.threadId } : {}), + ...(typeof record.turnId === "string" ? { turnId: record.turnId } : {}), + }) as unknown as JsonValue; } const commandCancelMatch = path.match(/^\/api\/v1\/commands\/([^/]+)\/cancel$/u); if (method === "POST" && commandCancelMatch) { diff --git a/src/mgr/store.ts b/src/mgr/store.ts index 11b1eb2..d7349d8 100644 --- a/src/mgr/store.ts +++ b/src/mgr/store.ts @@ -33,7 +33,7 @@ export interface AgentRunStore { claimRun(runId: string, runnerId: string, leaseMs: number): MaybePromise; heartbeat(runId: string, runnerId: string, leaseMs: number): MaybePromise; ackCommand(commandId: string): MaybePromise; - finishCommand(commandId: string, result: Pick): MaybePromise; + finishCommand(commandId: string, result: Pick): MaybePromise; appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): MaybePromise; finishRun(runId: string, result: Pick): MaybePromise; cancelRun(runId: string, reason?: string): MaybePromise; @@ -122,7 +122,8 @@ export class MemoryAgentRunStore implements AgentRunStore { } createCommand(runId: string, input: CreateCommandInput): CommandRecord { - this.getRun(runId); + const run = this.getRun(runId); + if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 }); const payloadHash = stableHash(input.payload); if (input.idempotencyKey) { const existing = Array.from(this.commands.values()).find((command) => command.runId === runId && command.idempotencyKey === input.idempotencyKey); @@ -185,7 +186,7 @@ export class MemoryAgentRunStore implements AgentRunStore { claimRun(runId: string, runnerId: string, leaseMs: number): RunRecord { const run = this.getRun(runId); if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 }); - if (run.claimedBy && run.claimedBy !== runnerId) throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 }); + if (run.claimedBy && run.claimedBy !== runnerId && !isLeaseExpired(run.leaseExpiresAt)) throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 }); const next = this.updateRun(runId, { status: "claimed", claimedBy: runnerId, leaseExpiresAt: new Date(Date.now() + leaseMs).toISOString() }); this.appendEvent(runId, "backend_status", { phase: "run-claimed", runnerId }); return next; @@ -206,12 +207,14 @@ export class MemoryAgentRunStore implements AgentRunStore { return next; } - finishCommand(commandId: string, result: Pick): CommandRecord { + finishCommand(commandId: string, result: Pick): CommandRecord { const command = this.getCommand(commandId); if (isTerminalCommandState(command.state)) return command; const next = { ...command, state: commandStateFromTerminal(result.terminalStatus), updatedAt: nowIso() }; this.commands.set(commandId, next); - this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: next.state, terminalStatus: result.terminalStatus, failureKind: result.failureKind }); + const run = this.getRun(command.runId); + if (result.threadId && run.sessionRef?.sessionId) this.upsertSessionThread(run, result.threadId, result.turnId ?? null); + this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: next.state, terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage ?? null, threadId: result.threadId ?? null, turnId: result.turnId ?? null }); return next; } @@ -256,8 +259,7 @@ export class MemoryAgentRunStore implements AgentRunStore { if (isTerminalCommandState(command.state)) return command; const next = { ...command, state: "cancelled" as const, updatedAt: nowIso() }; this.commands.set(commandId, next); - this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled" }); - this.cancelRun(command.runId, reason); + this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", message: reason }); return next; } @@ -419,6 +421,11 @@ export function isTerminalQueueTaskState(state: QueueTaskState): boolean { return state === "completed" || state === "failed" || state === "blocked" || state === "cancelled"; } +export function isLeaseExpired(leaseExpiresAt: string | null): boolean { + if (!leaseExpiresAt) return true; + return new Date(leaseExpiresAt).getTime() <= Date.now(); +} + export function sessionRefFromRecord(record: SessionRecord, fallback: SessionRef): SessionRef { return { sessionId: record.sessionId, diff --git a/src/runner/k8s-job.ts b/src/runner/k8s-job.ts index 525be0e..f3cad5e 100644 --- a/src/runner/k8s-job.ts +++ b/src/runner/k8s-job.ts @@ -157,6 +157,7 @@ function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string { name: "AGENTRUN_RUNTIME_NAMESPACE", value: context.namespace }, { name: "AGENTRUN_K8S_JOB_NAME", value: context.jobName }, { name: "AGENTRUN_LOG_PATH", value: "/tmp/agentrun-runner.jsonl" }, + { name: "AGENTRUN_RUNNER_IDLE_TIMEOUT_MS", value: "600000" }, { name: "HOME", value: "/home/agentrun" }, { name: "CODEX_HOME", value: codexHome }, ...(selectedSecret ? [{ name: "AGENTRUN_CODEX_SECRET_HOME", value: selectedSecret.projectionMountPath }] : []), diff --git a/src/runner/main.ts b/src/runner/main.ts index e5350ef..220ede8 100644 --- a/src/runner/main.ts +++ b/src/runner/main.ts @@ -32,6 +32,9 @@ if (process.env.AGENTRUN_LOG_PATH) options.logPath = process.env.AGENTRUN_LOG_PA if (process.env.AGENTRUN_CODEX_COMMAND) options.codexCommand = process.env.AGENTRUN_CODEX_COMMAND; if (process.env.AGENTRUN_CODEX_ARGS) options.codexArgs = JSON.parse(process.env.AGENTRUN_CODEX_ARGS) as string[]; if (process.env.CODEX_HOME) options.codexHome = process.env.CODEX_HOME; +if (process.env.AGENTRUN_RUNNER_IDLE_TIMEOUT_MS) options.idleTimeoutMs = Number(process.env.AGENTRUN_RUNNER_IDLE_TIMEOUT_MS); +if (process.env.AGENTRUN_RUNNER_POLL_INTERVAL_MS) options.pollIntervalMs = Number(process.env.AGENTRUN_RUNNER_POLL_INTERVAL_MS); +if (process.env.AGENTRUN_RUNNER_ONE_SHOT === "1" || process.env.AGENTRUN_RUNNER_ONE_SHOT === "true") options.oneShot = true; try { const result = await runOnce(options); console.log(JSON.stringify({ ok: true, data: result })); diff --git a/src/runner/manager-api.ts b/src/runner/manager-api.ts index ce6bee2..6ee9a19 100644 --- a/src/runner/manager-api.ts +++ b/src/runner/manager-api.ts @@ -84,7 +84,7 @@ export class RunnerManagerApi { return await this.client.patch(`/api/v1/runs/${encodeURIComponent(runId)}/status`, report as unknown as JsonRecord) as RunRecord; } - async reportCommandStatus(commandId: string, report: { terminalStatus: TerminalStatus; failureKind: FailureKind | null; failureMessage: string | null }): Promise { + async reportCommandStatus(commandId: string, report: { terminalStatus: TerminalStatus; failureKind: FailureKind | null; failureMessage: string | null; threadId?: string; turnId?: string }): Promise { return await this.client.patch(`/api/v1/commands/${encodeURIComponent(commandId)}/status`, report as unknown as JsonRecord) as CommandRecord; } diff --git a/src/runner/run-once.ts b/src/runner/run-once.ts index 1950fe8..028e284 100644 --- a/src/runner/run-once.ts +++ b/src/runner/run-once.ts @@ -1,7 +1,7 @@ import { RunnerManagerApi, failureKindFromError, terminalStatusForFailure, errorMessage } from "./manager-api.js"; import { runBackendTurn, type BackendAdapterOptions } from "../backend/adapter.js"; import { materializeResourceBundle } from "./resource-bundle.js"; -import type { BackendProfile, JsonRecord, RunRecord, RunnerRecord } from "../common/types.js"; +import type { BackendEvent, BackendProfile, CommandRecord, FailureKind, JsonRecord, RunRecord, RunnerRecord, TerminalStatus } from "../common/types.js"; import { AgentRunError } from "../common/errors.js"; export interface RunnerOnceOptions extends BackendAdapterOptions { @@ -17,11 +17,20 @@ export interface RunnerOnceOptions extends BackendAdapterOptions { jobName?: string; podName?: string; logPath?: string; + idleTimeoutMs?: number; + pollIntervalMs?: number; + oneShot?: boolean; +} + +interface CommandExecutionResult extends JsonRecord { + commandId: string; + terminalStatus: TerminalStatus; + failureKind: FailureKind | null; } export async function runOnce(options: RunnerOnceOptions): Promise { const api = new RunnerManagerApi(options.managerUrl); - const targetRun = await api.client.get(`/api/v1/runs/${encodeURIComponent(options.runId)}`) as RunRecord; + const targetRun = await api.getRun(options.runId); if (isTerminalRun(targetRun)) return { terminalStatus: targetRun.terminalStatus, failureKind: targetRun.failureKind, run: targetRun, skipped: "run-terminal" } as JsonRecord; if (options.backendProfile && options.backendProfile !== targetRun.backendProfile) { throw new AgentRunError("schema-invalid", `runner backendProfile ${options.backendProfile} does not match run backendProfile ${targetRun.backendProfile}`, { httpStatus: 400 }); @@ -50,41 +59,87 @@ export async function runOnce(options: RunnerOnceOptions): Promise { } throw error; } - const commandsResponse = await api.pollCommands(options.runId, { afterSeq: 0, limit: 20, ...(options.commandId ? { commandId: options.commandId } : {}) }); - const command = commandsResponse.selected; - if (!command) { - await api.reportStatus(options.runId, { terminalStatus: "blocked", failureKind: "schema-invalid", failureMessage: "no pending turn command" }); - return { runner, claimed, terminalStatus: "blocked", failureKind: "schema-invalid", polledCommands: commandsResponse.items.length }; - } - await api.ackCommand(command.id); - const acked = await api.getCommand(options.runId, command.id); - if (acked.state === "cancelled") return await reportCancelled(api, options.runId, command.id, runner, claimed, "command cancelled before backend start"); - await assertNotCancelled(api, options.runId, command.id); - const abortController = new AbortController(); - const stopCancelWatch = watchCancellation(api, options.runId, command.id, abortController); + + const stopHeartbeat = startHeartbeat(api, options.runId, runner.id, leaseMs); + const idleTimeoutMs = options.idleTimeoutMs ?? 120_000; + const pollIntervalMs = options.pollIntervalMs ?? 2_000; + const commandResults: CommandExecutionResult[] = []; let workspacePath: string | undefined; + let materializationAttempted = false; + let materializationFailure: { failureKind: FailureKind; terminalStatus: TerminalStatus; message: string } | null = null; + try { - const materialized = await materializeResourceBundle(claimed.resourceBundleRef ?? null, options.env ?? process.env); - if (materialized) { - workspacePath = materialized.workspacePath; - await api.appendEvent(options.runId, { type: "backend_status", payload: materialized.event }); + let idleSince = Date.now(); + let firstPoll = true; + while (true) { + const currentRun = await api.getRun(options.runId); + if (isTerminalRun(currentRun)) return { runner, claimed, terminalStatus: currentRun.terminalStatus, failureKind: currentRun.failureKind, run: currentRun, commandsProcessed: commandResults.length, commandResults, stopped: "run-terminal" } as JsonRecord; + + const commandsResponse = await api.pollCommands(options.runId, { afterSeq: 0, limit: 20, ...(firstPoll && options.commandId ? { commandId: options.commandId } : {}) }); + firstPoll = false; + const command = commandsResponse.selected; + if (!command) { + if (options.oneShot === true) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "one-shot-no-pending"); + if (Date.now() - idleSince >= idleTimeoutMs) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "idle-timeout"); + await sleep(pollIntervalMs); + continue; + } + + idleSince = Date.now(); + if (!materializationAttempted) { + materializationAttempted = true; + try { + const materialized = await materializeResourceBundle(claimed.resourceBundleRef ?? null, options.env ?? process.env); + if (materialized) { + workspacePath = materialized.workspacePath; + await api.appendEvent(options.runId, { type: "backend_status", payload: { ...materialized.event, commandId: command.id, attemptId, runnerId: runner.id } }); + } + } catch (error) { + const failureKind = failureKindFromError(error); + materializationFailure = { failureKind, terminalStatus: terminalStatusForFailure(failureKind), message: errorMessage(error) }; + } + } + + const result = materializationFailure + ? await reportCommandFailure(api, options.runId, command.id, runner, attemptId, materializationFailure, "runner:resource-bundle") + : await executeCommand(api, options, command, runner, attemptId, workspacePath); + commandResults.push(result); + if (options.oneShot === true) { + const run = await api.getRun(options.runId); + return { runner, commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind, run, commandsProcessed: commandResults.length, commandResults, stopped: "one-shot" } as JsonRecord; + } } - await assertNotCancelled(api, options.runId, command.id); - const result = await runBackendTurn(claimed, command, { ...options, ...(workspacePath ? { workspacePath } : {}), abortSignal: abortController.signal }); - for (const event of result.events) { - if (event.type !== "terminal_status") await api.appendEvent(options.runId, event); - } - await api.reportCommandStatus(command.id, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage }); - const finalRun = await api.reportStatus(options.runId, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage, ...(result.threadId ? { threadId: result.threadId } : {}), ...(result.turnId ? { turnId: result.turnId } : {}) }) as RunRecord; - return { runner, commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind, run: finalRun } as JsonRecord; } catch (error) { const failureKind = failureKindFromError(error); const terminalStatus = terminalStatusForFailure(failureKind); const message = errorMessage(error); - await api.appendEvent(options.runId, { type: "error", payload: { failureKind, message, phase: "runner:execute", attemptId, runnerId: runner.id } }); - await api.reportCommandStatus(command.id, { terminalStatus, failureKind, failureMessage: message }); + await api.appendEvent(options.runId, { type: "error", payload: { failureKind, message, phase: "runner:loop", attemptId, runnerId: runner.id } }); const finalRun = await api.reportStatus(options.runId, { terminalStatus, failureKind, failureMessage: message }) as RunRecord; - return { runner, commandId: command.id, terminalStatus, failureKind, run: finalRun } as JsonRecord; + return { runner, terminalStatus, failureKind, run: finalRun, commandsProcessed: commandResults.length, commandResults } as JsonRecord; + } finally { + stopHeartbeat(); + } +} + +async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, command: CommandRecord, runner: RunnerRecord, attemptId: string, workspacePath: string | undefined): Promise { + await api.ackCommand(command.id); + const acked = await api.getCommand(options.runId, command.id); + if (acked.state === "cancelled") return await reportCancelled(api, options.runId, command.id, runner, attemptId, "command cancelled before backend start"); + await assertNotCancelled(api, options.runId, command.id); + const abortController = new AbortController(); + const stopCancelWatch = watchCancellation(api, options.runId, command.id, abortController); + try { + const latestRun = await api.getRun(options.runId); + const result = await runBackendTurn(latestRun, command, { ...options, ...(workspacePath ? { workspacePath } : {}), abortSignal: abortController.signal }); + for (const event of result.events) { + await api.appendEvent(options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id)); + } + await api.reportCommandStatus(command.id, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage, ...(result.threadId ? { threadId: result.threadId } : {}), ...(result.turnId ? { turnId: result.turnId } : {}) }); + return { commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind } as CommandExecutionResult; + } catch (error) { + const failureKind = failureKindFromError(error); + const failure = { failureKind, terminalStatus: terminalStatusForFailure(failureKind), message: errorMessage(error) }; + return await reportCommandFailure(api, options.runId, command.id, runner, attemptId, failure, "runner:execute"); } finally { stopCancelWatch(); } @@ -118,8 +173,49 @@ function watchCancellation(api: RunnerManagerApi, runId: string, commandId: stri }; } -async function reportCancelled(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, claimed: RunRecord, message: string): Promise { - await api.reportCommandStatus(commandId, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message }); - const finalRun = await api.reportStatus(runId, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message }); - return { runner, commandId, claimed, terminalStatus: "cancelled", failureKind: "cancelled", run: finalRun }; +function startHeartbeat(api: RunnerManagerApi, runId: string, runnerId: string, leaseMs: number): () => void { + let stopped = false; + const beat = async (): Promise => { + if (stopped) return; + try { + await api.heartbeat(runId, runnerId, leaseMs); + } catch { + // The next manager call will surface lease or run-terminal details. + } + }; + const timer = setInterval(() => { void beat(); }, Math.max(1_000, Math.floor(leaseMs / 3))); + return () => { + stopped = true; + clearInterval(timer); + }; +} + +function annotateCommandEvent(event: BackendEvent, commandId: string, attemptId: string, runnerId: string): BackendEvent { + return { ...event, payload: { ...event.payload, commandId, attemptId, runnerId } }; +} + +async function reportCommandFailure(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, attemptId: string, failure: { terminalStatus: TerminalStatus; failureKind: FailureKind; message: string }, phase: string): Promise { + await api.appendEvent(runId, { type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase, commandId, attemptId, runnerId: runner.id } }); + await api.appendEvent(runId, { type: "terminal_status", payload: { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, message: failure.message, commandId, attemptId, runnerId: runner.id } }); + await api.reportCommandStatus(commandId, { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, failureMessage: failure.message }); + return { commandId, terminalStatus: failure.terminalStatus, failureKind: failure.failureKind } as CommandExecutionResult; +} + +async function reportCancelled(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, attemptId: string, message: string): Promise { + await api.reportCommandStatus(commandId, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message }); + await api.appendEvent(runId, { type: "backend_status", payload: { phase: "turn-cancelled", commandId, attemptId, runnerId: runner.id, failureKind: "cancelled", message } }); + await api.appendEvent(runId, { type: "terminal_status", payload: { terminalStatus: "cancelled", failureKind: "cancelled", message, commandId, attemptId, runnerId: runner.id } }); + return { commandId, terminalStatus: "cancelled", failureKind: "cancelled" }; +} + +function noPendingResult(runner: RunnerRecord, claimed: RunRecord, commandResults: CommandExecutionResult[], polledCommands: number, stopped: string): JsonRecord { + if (commandResults.length > 0) { + const last = commandResults.at(-1)!; + return { runner, claimed, terminalStatus: last.terminalStatus, failureKind: last.failureKind, commandsProcessed: commandResults.length, commandResults, polledCommands, stopped }; + } + return { runner, claimed, terminalStatus: "blocked", failureKind: "schema-invalid", commandsProcessed: 0, commandResults, polledCommands, stopped }; +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); } diff --git a/src/selftest/cases/30-codex-stdio.ts b/src/selftest/cases/30-codex-stdio.ts index 280b0a1..6a6c629 100644 --- a/src/selftest/cases/30-codex-stdio.ts +++ b/src/selftest/cases/30-codex-stdio.ts @@ -14,28 +14,29 @@ const selfTest: SelfTestCase = async (context) => { try { const client = new ManagerClient(server.baseUrl); const happy = await createRunWithCommand(client, context, "hello", "selftest-turn", 15_000); - const result = await runOnce({ managerUrl: server.baseUrl, runId: happy.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome } }); + const result = await runOnce({ managerUrl: server.baseUrl, runId: happy.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome }, oneShot: true }); assert.equal(result.terminalStatus, "completed"); assert.equal(typeof (result.runner as { id?: unknown }).id, "string"); const events = await client.get(`/api/v1/runs/${happy.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; assert.ok(events.items?.some((event) => event.type === "assistant_message")); assert.ok(events.items?.some((event) => event.type === "backend_status" && JSON.stringify(event.payload).includes("run-claimed"))); assertNoSecretLeak(events); - const finalRun = await client.get(`/api/v1/runs/${happy.runId}`) as { terminalStatus?: string }; - assert.equal(finalRun.terminalStatus, "completed"); + const finalRun = await client.get(`/api/v1/runs/${happy.runId}`) as { terminalStatus?: string | null; status?: string }; + assert.equal(finalRun.terminalStatus, null); + assert.equal(finalRun.status, "claimed"); const finalCommand = await client.get(`/api/v1/runs/${happy.runId}/commands/${happy.commandId}`) as { state?: string }; assert.equal(finalCommand.state, "completed"); const projectedHome = path.join(context.tmp, "runtime-codex-home"); const projected = await createRunWithCommand(client, { workspace: context.workspace, codexHome: projectedHome }, "hello projected", "selftest-projected-codex-home", 15_000); - const projectedResult = await runOnce({ managerUrl: server.baseUrl, runId: projected.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: projectedHome, env: { CODEX_HOME: projectedHome, AGENTRUN_CODEX_SECRET_HOME: context.codexHome } }); + const projectedResult = await runOnce({ managerUrl: server.baseUrl, runId: projected.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: projectedHome, env: { CODEX_HOME: projectedHome, AGENTRUN_CODEX_SECRET_HOME: context.codexHome }, oneShot: true }); assert.equal(projectedResult.terminalStatus, "completed"); await access(path.join(projectedHome, "auth.json")); await access(path.join(projectedHome, "config.toml")); const deepseekHome = path.join(context.tmp, "runtime-deepseek-home"); const deepseek = await createRunWithCommand(client, { ...context, backendProfile: "deepseek" }, "hello deepseek", "selftest-deepseek-turn", 15_000); - const deepseekResult = await runOnce({ managerUrl: server.baseUrl, runId: deepseek.runId, backendProfile: "deepseek", codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: deepseekHome, env: { CODEX_HOME: deepseekHome, AGENTRUN_CODEX_SECRET_HOME: context.deepseekHome } }); + const deepseekResult = await runOnce({ managerUrl: server.baseUrl, runId: deepseek.runId, backendProfile: "deepseek", codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: deepseekHome, env: { CODEX_HOME: deepseekHome, AGENTRUN_CODEX_SECRET_HOME: context.deepseekHome }, oneShot: true }); assert.equal(deepseekResult.terminalStatus, "completed"); await access(path.join(deepseekHome, "auth.json")); await access(path.join(deepseekHome, "config.toml")); @@ -49,12 +50,12 @@ const selfTest: SelfTestCase = async (context) => { ); const configModel = await createRunWithCommand(client, context, "hello config model", "selftest-config-model", 15_000); - const configModelResult = await runOnce({ managerUrl: server.baseUrl, runId: configModel.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "reject-unexpected-model" } }); + const configModelResult = await runOnce({ managerUrl: server.baseUrl, runId: configModel.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "reject-unexpected-model" }, oneShot: true }); assert.equal(configModelResult.terminalStatus, "completed", "unspecified model should be omitted so Codex config.toml remains authoritative"); const explicitModel = await createRunWithCommand(client, context, "hello explicit model placeholder", "selftest-explicit-model-placeholder", 15_000); const explicitCommand = await client.post(`/api/v1/runs/${explicitModel.runId}/commands`, { type: "turn", payload: { prompt: "hello explicit model", model: "gpt-5.5" }, idempotencyKey: "selftest-explicit-model-command" }) as { id: string }; - const explicitModelResult = await runOnce({ managerUrl: server.baseUrl, runId: explicitModel.runId, commandId: explicitCommand.id, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "require-explicit-model" } }); + const explicitModelResult = await runOnce({ managerUrl: server.baseUrl, runId: explicitModel.runId, commandId: explicitCommand.id, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "require-explicit-model" }, oneShot: true }); assert.equal(explicitModelResult.terminalStatus, "completed", "explicit command payload model should still be forwarded"); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "missing-turn-result", expectedStatus: "failed", expectedFailureKind: "backend-response-invalid" }); @@ -83,6 +84,7 @@ async function runFailureCase(options: { client: ManagerClient; managerUrl: stri codexArgs: options.context.fakeCodexArgs, codexHome: options.context.codexHome, env: { CODEX_HOME: options.context.codexHome, AGENTRUN_FAKE_CODEX_MODE: options.mode }, + oneShot: true, }) as JsonRecord; assert.equal(result.terminalStatus, options.expectedStatus, options.mode); assert.equal(result.failureKind, options.expectedFailureKind, options.mode); @@ -113,12 +115,15 @@ async function runSecretFailureCase(options: { client: ManagerClient; managerUrl codexArgs: options.context.fakeCodexArgs, codexHome: path.join(options.context.tmp, "missing-codex-home"), env: { CODEX_HOME: path.join(options.context.tmp, "missing-codex-home") }, + oneShot: true, }) as JsonRecord; assert.equal(result.terminalStatus, "blocked", "secret unavailable"); assert.equal(result.failureKind, "secret-unavailable", "secret unavailable"); - const run = await options.client.get(`/api/v1/runs/${item.runId}`) as { status?: string; failureKind?: string }; - assert.equal(run.status, "blocked", "secret unavailable"); - assert.equal(run.failureKind, "secret-unavailable", "secret unavailable"); + const command = await options.client.get(`/api/v1/runs/${item.runId}/commands/${item.commandId}`) as { state?: string }; + assert.equal(command.state, "failed", "secret unavailable command state"); + const envelope = await options.client.get(`/api/v1/runs/${item.runId}/commands/${item.commandId}/result`) as JsonRecord; + assert.equal(envelope.terminalStatus, "failed", "secret unavailable result terminal"); + assert.equal(envelope.failureKind, "secret-unavailable", "secret unavailable result kind"); } async function runSpawnFailureCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise { @@ -130,6 +135,7 @@ async function runSpawnFailureCase(options: { client: ManagerClient; managerUrl: codexArgs: [], codexHome: options.context.codexHome, env: { CODEX_HOME: options.context.codexHome }, + oneShot: true, }) as JsonRecord; assert.equal(result.terminalStatus, "failed", "spawn failure"); assert.equal(result.failureKind, "backend-spawn-failed", "spawn failure"); diff --git a/src/selftest/cases/50-hwlab-manual-dispatch.ts b/src/selftest/cases/50-hwlab-manual-dispatch.ts index d93cb37..5139ed3 100644 --- a/src/selftest/cases/50-hwlab-manual-dispatch.ts +++ b/src/selftest/cases/50-hwlab-manual-dispatch.ts @@ -69,7 +69,7 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin ); const sessionRun = await createHwlabRun(client, context, bundle, "hwlab-session-resume", "hello session", "hwlab-command-session"); - const runResult = await runOnce({ managerUrl: server.baseUrl, runId: sessionRun.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces") } }); + const runResult = await runOnce({ managerUrl: server.baseUrl, runId: sessionRun.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces") }, oneShot: true }); assert.equal(runResult.terminalStatus, "completed"); const session = await store.getSession("hwlab-session-resume"); assert.equal(session?.threadId, "thread_selftest_1"); @@ -84,14 +84,29 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin const resumedRun = await client.get(`/api/v1/runs/${resumed.runId}`) as JsonRecord; assert.equal(((resumedRun.sessionRef as JsonRecord).threadId), "thread_selftest_1"); + const multiTurn = await createHwlabRun(client, context, bundle, "hwlab-session-multiturn", "hello first turn", "hwlab-command-multiturn-1"); + const multiturnRunner = runOnce({ managerUrl: server.baseUrl, runId: multiTurn.runId, commandId: multiTurn.commandId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-multiturn") }, idleTimeoutMs: 500, pollIntervalMs: 50 }); + await waitForCommandState(client, multiTurn.runId, multiTurn.commandId, "completed"); + const secondCommand = await client.post(`/api/v1/runs/${multiTurn.runId}/commands`, { type: "turn", payload: { prompt: "hello second turn", traceId: "hwlab-command-multiturn-2" }, idempotencyKey: "hwlab-command-multiturn-2" }) as { id: string }; + await waitForCommandState(client, multiTurn.runId, secondCommand.id, "completed"); + const multiturnResult = await multiturnRunner as JsonRecord; + assert.equal(multiturnResult.commandsProcessed, 2); + const multiEventsResponse = await client.get(`/api/v1/runs/${multiTurn.runId}/events?afterSeq=0&limit=200`) as { items?: Array<{ type?: string; payload?: JsonRecord }> }; + const multiEvents = multiEventsResponse.items ?? []; + assert.equal(multiEvents.filter((event) => event.type === "backend_status" && event.payload?.phase === "resource-bundle-materialized").length, 1); + assert.equal(multiEvents.filter((event) => event.type === "backend_status" && event.payload?.phase === "command-terminal").length, 2); + const secondEnvelope = await client.get(`/api/v1/runs/${multiTurn.runId}/commands/${secondCommand.id}/result`) as JsonRecord; + assert.equal(secondEnvelope.terminalStatus, "completed"); + assert.equal(secondEnvelope.reply, "fake codex stdio reply"); + const runningCancel = await createHwlabRun(client, context, bundle, "hwlab-session-cancel-running", "cancel running", "hwlab-command-cancel-running", 10_000); - const running = runOnce({ managerUrl: server.baseUrl, runId: runningCancel.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "missing-terminal", AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-running-cancel") } }); + const running = runOnce({ managerUrl: server.baseUrl, runId: runningCancel.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "missing-terminal", AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-running-cancel") }, oneShot: true }); await waitForCommandState(client, runningCancel.runId, runningCancel.commandId, "acknowledged"); await client.post(`/api/v1/commands/${runningCancel.commandId}/cancel`, { reason: "self-test running cancel" }); const runningResult = await running; assert.equal(runningResult.terminalStatus, "cancelled"); - return { name: "hwlab-manual-dispatch", tests: ["runner-job-idempotency", "pending-cancel", "result-envelope", "session-ref-resume", "resource-bundle-materialization", "running-cancel"] }; + return { name: "hwlab-manual-dispatch", tests: ["runner-job-idempotency", "pending-cancel", "result-envelope", "session-ref-resume", "resource-bundle-materialization", "same-run-runner-multiturn", "running-cancel"] }; } finally { await new Promise((resolve) => server.server.close(() => resolve())); } diff --git a/src/selftest/cases/60-hwlab-baseline-contract.ts b/src/selftest/cases/60-hwlab-baseline-contract.ts index c813f2a..cf09ae8 100644 --- a/src/selftest/cases/60-hwlab-baseline-contract.ts +++ b/src/selftest/cases/60-hwlab-baseline-contract.ts @@ -70,7 +70,7 @@ async function assertEventContractAndCompletedSemantics(client: ManagerClient, c const happy = await createRunWithCommand(client, context, "hello event contract", "selftest-event-contract", 15_000); await client.post(`/api/v1/runs/${happy.runId}/events`, { type: "tool_call", payload: { method: "selftest/tool", item: { command: "echo ok" } } }); await client.post(`/api/v1/runs/${happy.runId}/events`, { type: "diff", payload: { filesChanged: 1, summary: "selftest diff" } }); - const result = await runOnce({ managerUrl, runId: happy.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome } }); + const result = await runOnce({ managerUrl, runId: happy.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome }, oneShot: true }); assert.equal(result.terminalStatus, "completed"); const eventsResponse = await client.get(`/api/v1/runs/${happy.runId}/events?afterSeq=0&limit=100`) as { items?: RunEvent[] }; const events = eventsResponse.items ?? []; @@ -81,7 +81,7 @@ async function assertEventContractAndCompletedSemantics(client: ManagerClient, c const envelope = await client.get(`/api/v1/runs/${happy.runId}/commands/${happy.commandId}/result`) as JsonRecord; assert.equal(envelope.completed, true); assert.equal(envelope.terminalStatus, "completed"); - assert.equal(envelope.terminalSource, "terminal_status-event"); + assert.equal(envelope.terminalSource, "command-record"); assertNoSecretLeak({ eventsResponse, envelope }); const partial = await createRunWithCommand(client, context, "partial should not complete", "selftest-partial-not-completed", 15_000); @@ -132,7 +132,7 @@ async function assertResourceBundleFailure(client: ManagerClient, context: SelfT resourceBundleRef: { kind: "git", repoUrl: repo.repoUrl, commitId: "0000000000000000000000000000000000000000", submodules: false, lfs: false }, }) as { id: string }; const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "bad bundle" }, idempotencyKey: "selftest-bad-bundle" }) as { id: string }; - const result = await runOnce({ managerUrl, runId: run.id, commandId: command.id, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "bad-bundle-workspaces") } }) as JsonRecord; + const result = await runOnce({ managerUrl, runId: run.id, commandId: command.id, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "bad-bundle-workspaces") }, oneShot: true }) as JsonRecord; assert.equal(result.terminalStatus, "failed"); assert.equal(result.failureKind, "infra-failed"); const envelope = await client.get(`/api/v1/runs/${run.id}/commands/${command.id}/result`) as JsonRecord;