diff --git a/src/backend/adapter.ts b/src/backend/adapter.ts index fe8c6e5..3adf883 100644 --- a/src/backend/adapter.ts +++ b/src/backend/adapter.ts @@ -1,5 +1,5 @@ import type { BackendTurnResult, CommandRecord, RunRecord } from "../common/types.js"; -import { runCodexStdioTurn, type CodexStdioTurnOptions } from "./codex-stdio.js"; +import { CodexStdioBackendSession, runCodexStdioTurn, type CodexStdioTurnOptions } from "./codex-stdio.js"; import { backendProfileSpec } from "../common/backend-profiles.js"; export interface BackendAdapterOptions { @@ -11,11 +11,30 @@ export interface BackendAdapterOptions { env?: NodeJS.ProcessEnv; } +export interface BackendSession { + runTurn(run: RunRecord, command: CommandRecord, options?: BackendAdapterOptions): Promise; + close(): Promise; +} + export async function runBackendTurn(run: RunRecord, command: CommandRecord, options: BackendAdapterOptions = {}): Promise { const spec = backendProfileSpec(run.backendProfile); if (!spec || spec.backendKind !== "codex-app-server-stdio") { return { terminalStatus: "failed", failureKind: "backend-failed", failureMessage: `unsupported backendProfile ${run.backendProfile}`, events: [{ type: "error", payload: { failureKind: "backend-failed", backendProfile: run.backendProfile } }] }; } + return runCodexStdioTurn(backendTurnOptions(run, command, options)); +} + +export function createBackendSession(run: RunRecord, options: BackendAdapterOptions = {}): BackendSession | null { + const spec = backendProfileSpec(run.backendProfile); + if (!spec || spec.backendKind !== "codex-app-server-stdio") return null; + const session = new CodexStdioBackendSession(); + return { + runTurn: async (nextRun, command, turnOptions = {}) => session.runTurn(backendTurnOptions(nextRun, command, { ...options, ...turnOptions })), + close: async () => session.close(), + }; +} + +export function backendTurnOptions(run: RunRecord, command: CommandRecord, options: BackendAdapterOptions = {}): CodexStdioTurnOptions { const prompt = typeof command.payload.prompt === "string" ? command.payload.prompt : JSON.stringify(command.payload); const turnOptions: CodexStdioTurnOptions = { backendProfile: run.backendProfile, @@ -33,5 +52,5 @@ export async function runBackendTurn(run: RunRecord, command: CommandRecord, opt if (options.env) turnOptions.env = options.env; if (options.codexHome) turnOptions.codexHome = options.codexHome; if (options.abortSignal) turnOptions.abortSignal = options.abortSignal; - return runCodexStdioTurn(turnOptions); + return turnOptions; } diff --git a/src/backend/codex-stdio.ts b/src/backend/codex-stdio.ts index 871575e..d91cd7d 100644 --- a/src/backend/codex-stdio.ts +++ b/src/backend/codex-stdio.ts @@ -99,6 +99,7 @@ export class CodexStdioClient { this.child = spawn(command, args, { cwd: options.cwd, env: options.env ?? process.env, + detached: true, stdio: "pipe", }); } catch (error) { @@ -111,6 +112,10 @@ export class CodexStdioClient { this.child.on("error", (error) => this.handleClose(127, null, spawnFailure(command, error))); } + get isClosed(): boolean { + return this.closed; + } + request(method: string, params: JsonRecord, timeoutMs = requestTimeoutCapMs): Promise { if (this.closed) return Promise.reject(this.closeFailure ?? new CodexStdioFailure("backend-failed", "codex app-server is closed", `request:${method}`)); const id = this.nextId++; @@ -135,12 +140,25 @@ export class CodexStdioClient { stop(): void { if (this.closed) return; - this.child.kill("SIGTERM"); + this.kill("SIGTERM"); setTimeout(() => { - if (!this.closed) this.child.kill("SIGKILL"); + if (!this.closed) this.kill("SIGKILL"); }, 1500).unref?.(); } + private kill(signal: NodeJS.Signals): void { + const pid = this.child.pid; + if (typeof pid === "number") { + try { + process.kill(-pid, signal); + return; + } catch { + // Fall back to killing the direct child when process-group termination is unavailable. + } + } + this.child.kill(signal); + } + private appendStderr(chunk: Buffer): void { this.stderrBytes += chunk.byteLength; const next = Buffer.concat([this.stderrTailBuffer, chunk]); @@ -260,21 +278,84 @@ export class CodexStdioClient { } export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise { + const session = new CodexStdioBackendSession(); + const result = await session.runTurn(options); + const closeEvents = await session.close(); + return { ...result, events: [...result.events, ...closeEvents] }; +} + +export class CodexStdioBackendSession { + private client: CodexStdioClient | null = null; + private clientKey: string | null = null; + + async runTurn(options: CodexStdioTurnOptions): Promise { + return await runCodexStdioTurnWithSession(options, this); + } + + async close(): Promise { + const client = this.client; + if (!client) return []; + this.client = null; + this.clientKey = null; + client.stop(); + const closeInfo = await client.closedPromise; + return [{ type: "backend_status", payload: { phase: "codex-app-server-closed", appServerExit: closeEvent(closeInfo) } }]; + } + + async getClient(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, events: BackendEvent[]): Promise { + const key = codexClientKey(options, env); + if (this.client && !this.client.isClosed && this.clientKey === key) { + events.push({ type: "backend_status", payload: { phase: "codex-app-server:reused", ...backendMetadata(options), protocol: codexProtocol } }); + return this.client; + } + const closeEvents = await this.close(); + events.push(...closeEvents); + events.push({ + type: "backend_status", + payload: { + phase: "codex-app-server-starting", + ...backendMetadata(options), + protocol: codexProtocol, + runtime: runtimeSummary(options, env, resolveCodexHome(options)), + }, + }); + const clientOptions: ConstructorParameters[0] = { + cwd: options.cwd, + env, + onNotification: (message) => this.onNotification(message), + }; + if (options.command) clientOptions.command = options.command; + if (options.args) clientOptions.args = options.args; + this.client = new CodexStdioClient(clientOptions); + this.clientKey = key; + const requestTimeoutMs = Math.min(positiveTimeout(options.timeoutMs), requestTimeoutCapMs); + const initializeResult = requireResponseRecord(await this.client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } }, requestTimeoutMs), "initialize"); + validateInitializeResponse(initializeResult); + this.client.notify("initialized", {}); + events.push({ type: "backend_status", payload: { phase: "initialize:completed", ...backendMetadata(options), protocol: codexProtocol } }); + return this.client; + } + + private notificationHandlers = new Set<(message: JsonRecord) => void>(); + + addNotificationHandler(handler: (message: JsonRecord) => void): () => void { + this.notificationHandlers.add(handler); + return () => this.notificationHandlers.delete(handler); + } + + private onNotification(message: JsonRecord): void { + for (const handler of this.notificationHandlers) handler(message); + } +} + +async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, session: CodexStdioBackendSession): Promise { const codexHome = resolveCodexHome(options); const projectionFailure = await prepareProjectedCodexHome(codexHome, options.env?.AGENTRUN_CODEX_SECRET_HOME ?? process.env.AGENTRUN_CODEX_SECRET_HOME); if (projectionFailure) return projectionFailure; const secretFailure = codexHomeReadiness(codexHome); if (secretFailure) return secretFailure; const env = childEnv(options, codexHome); - const events: BackendEvent[] = [{ - type: "backend_status", - payload: { - phase: "codex-app-server-starting", - ...backendMetadata(options), - protocol: codexProtocol, - runtime: runtimeSummary(options, env, codexHome), - }, - }]; + const events: BackendEvent[] = []; if (options.abortSignal?.aborted) { const cancelled = { status: "cancelled" as const, failureKind: "cancelled" as const, message: "cancel requested" }; events.push({ type: "backend_status", payload: { phase: "turn-cancelled", failureKind: "cancelled" } }); @@ -305,30 +386,20 @@ export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise client?.stop(); terminalResolve(); }, positiveTimeout(options.timeoutMs)); + const stopNotifications = session.addNotificationHandler((message) => { + const normalized = normalizeCodexNotification(message); + if (normalized.threadId) threadId = normalized.threadId; + if (normalized.turnId) turnId = normalized.turnId; + if (normalized.assistantDelta) assistantText += normalized.assistantDelta; + if (typeof normalized.assistantFinal === "string" && normalized.assistantFinal.trim().length > 0) finalAssistantText = normalized.assistantFinal; + events.push(...normalized.events); + if (normalized.terminal && !terminal) { + terminal = normalized.terminal; + terminalResolve(); + } + }); try { - const clientOptions: ConstructorParameters[0] = { - cwd: options.cwd, - env, - onNotification: (message) => { - const normalized = normalizeCodexNotification(message); - if (normalized.threadId) threadId = normalized.threadId; - if (normalized.turnId) turnId = normalized.turnId; - if (normalized.assistantDelta) assistantText += normalized.assistantDelta; - if (typeof normalized.assistantFinal === "string" && normalized.assistantFinal.trim().length > 0) finalAssistantText = normalized.assistantFinal; - events.push(...normalized.events); - if (normalized.terminal && !terminal) { - terminal = normalized.terminal; - terminalResolve(); - } - }, - }; - if (options.command) clientOptions.command = options.command; - if (options.args) clientOptions.args = options.args; - client = new CodexStdioClient(clientOptions); - const initializeResult = requireResponseRecord(await client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } }, requestTimeoutMs), "initialize"); - validateInitializeResponse(initializeResult); - client.notify("initialized", {}); - events.push({ type: "backend_status", payload: { phase: "initialize:completed", ...backendMetadata(options), protocol: codexProtocol } }); + client = await session.getClient(options, env, events); const startThread = async (phasePrefix = "thread/start"): Promise => { const response = requireResponseRecord(await client!.request("thread/start", withOptionalModel({ cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" }, options.model), requestTimeoutMs), "thread/start"); @@ -381,15 +452,12 @@ export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise events.push({ type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase: failure.phase, details: failure.details } }); } } finally { + stopNotifications(); options.abortSignal?.removeEventListener("abort", abortTurn); clearTimeout(timeout); - if (client) { - client.stop(); - const closeInfo = await client.closedPromise; - events.push({ type: "backend_status", payload: { phase: "codex-app-server-closed", appServerExit: closeEvent(closeInfo) } }); - } } if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server finished without terminal status" }; + if (terminal.status !== "completed") events.push(...await session.close()); const reply = finalAssistantText.trim().length > 0 ? finalAssistantText : assistantText; if (reply.trim().length > 0) events.push({ type: "assistant_message", payload: { text: reply } }); events.push({ type: "terminal_status", payload: { terminalStatus: terminal.status, failureKind: terminal.failureKind, message: terminal.message } }); @@ -527,6 +595,19 @@ function childEnv(options: CodexStdioTurnOptions, codexHome: string): NodeJS.Pro }; } +function codexClientKey(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv): string { + return JSON.stringify({ + command: options.command ?? "codex", + args: options.args ?? defaultCodexArgs, + cwd: options.cwd, + codexHome: env.CODEX_HOME ?? resolveCodexHome(options), + backendProfile: options.backendProfile ?? "codex", + model: options.model ?? null, + approvalPolicy: options.approvalPolicy, + sandbox: options.sandbox, + }); +} + function resolveCodexHome(options: CodexStdioTurnOptions): string { return options.codexHome ?? options.env?.CODEX_HOME ?? `${options.env?.HOME ?? process.env.HOME ?? ""}/.codex`; } diff --git a/src/runner/run-once.ts b/src/runner/run-once.ts index d278c02..bf0f9a9 100644 --- a/src/runner/run-once.ts +++ b/src/runner/run-once.ts @@ -1,5 +1,5 @@ import { RunnerManagerApi, failureKindFromError, terminalStatusForFailure, errorMessage } from "./manager-api.js"; -import { runBackendTurn, type BackendAdapterOptions } from "../backend/adapter.js"; +import { createBackendSession, runBackendTurn, type BackendAdapterOptions, type BackendSession } from "../backend/adapter.js"; import { materializeResourceBundle } from "./resource-bundle.js"; import type { BackendEvent, BackendProfile, CommandRecord, FailureKind, JsonRecord, RunRecord, RunnerRecord, TerminalStatus } from "../common/types.js"; import { AgentRunError } from "../common/errors.js"; @@ -67,6 +67,7 @@ export async function runOnce(options: RunnerOnceOptions): Promise { let workspacePath: string | undefined; let materializationAttempted = false; let materializationFailure: { failureKind: FailureKind; terminalStatus: TerminalStatus; message: string } | null = null; + let backendSession: BackendSession | null = null; try { let idleSince = Date.now(); @@ -102,7 +103,7 @@ export async function runOnce(options: RunnerOnceOptions): Promise { const result = materializationFailure ? await reportCommandFailure(api, options.runId, command.id, runner, attemptId, materializationFailure, "runner:resource-bundle") - : await executeCommand(api, options, command, runner, attemptId, workspacePath); + : await executeCommand(api, options, command, runner, attemptId, workspacePath, backendSession ?? (backendSession = createBackendSession(currentRun, options))); commandResults.push(result); if (options.oneShot === true) { const run = await api.reportStatus(options.runId, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: null }); @@ -117,11 +118,19 @@ export async function runOnce(options: RunnerOnceOptions): Promise { const finalRun = await api.reportStatus(options.runId, { terminalStatus, failureKind, failureMessage: message }) as RunRecord; return { runner, terminalStatus, failureKind, run: finalRun, commandsProcessed: commandResults.length, commandResults } as JsonRecord; } finally { + if (backendSession) { + try { + const closeEvents = await backendSession.close(); + for (const event of closeEvents) await api.appendEvent(options.runId, annotateCommandEvent(event, "runner-shutdown", attemptId, runner.id)); + } catch { + // Runner shutdown must not hide the terminal result already reported for the command. + } + } stopHeartbeat(); } } -async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, command: CommandRecord, runner: RunnerRecord, attemptId: string, workspacePath: string | undefined): Promise { +async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, command: CommandRecord, runner: RunnerRecord, attemptId: string, workspacePath: string | undefined, backendSession: BackendSession | null): Promise { await api.ackCommand(command.id); const acked = await api.getCommand(options.runId, command.id); if (acked.state === "cancelled") return await reportCancelled(api, options.runId, command.id, runner, attemptId, "command cancelled before backend start"); @@ -130,7 +139,8 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, const stopCancelWatch = watchCancellation(api, options.runId, command.id, abortController); try { const latestRun = await api.getRun(options.runId); - const result = await runBackendTurn(latestRun, command, { ...options, ...(workspacePath ? { workspacePath } : {}), abortSignal: abortController.signal }); + const backendOptions = { ...options, ...(workspacePath ? { workspacePath } : {}), abortSignal: abortController.signal }; + const result = backendSession ? await backendSession.runTurn(latestRun, command, backendOptions) : await runBackendTurn(latestRun, command, backendOptions); for (const event of result.events) { await api.appendEvent(options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id)); } diff --git a/src/selftest/cases/50-hwlab-manual-dispatch.ts b/src/selftest/cases/50-hwlab-manual-dispatch.ts index bccd53f..087214a 100644 --- a/src/selftest/cases/50-hwlab-manual-dispatch.ts +++ b/src/selftest/cases/50-hwlab-manual-dispatch.ts @@ -89,14 +89,19 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin assert.equal(((resumedRun.sessionRef as JsonRecord).threadId), "thread_selftest_1"); const multiTurn = await createHwlabRun(client, context, bundle, "hwlab-session-multiturn", "hello first turn", "hwlab-command-multiturn-1"); - const multiturnRunner = runOnce({ managerUrl: server.baseUrl, runId: multiTurn.runId, commandId: multiTurn.commandId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-multiturn") }, idleTimeoutMs: 500, pollIntervalMs: 50 }); + const fakeCodexStartFile = path.join(context.tmp, "fake-codex-starts-multiturn.txt"); + const multiturnRunner = runOnce({ managerUrl: server.baseUrl, runId: multiTurn.runId, commandId: multiTurn.commandId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-multiturn"), AGENTRUN_FAKE_CODEX_START_FILE: fakeCodexStartFile }, idleTimeoutMs: 500, pollIntervalMs: 50 }); await waitForCommandState(client, multiTurn.runId, multiTurn.commandId, "completed"); const secondCommand = await client.post(`/api/v1/runs/${multiTurn.runId}/commands`, { type: "turn", payload: { prompt: "hello second turn", traceId: "hwlab-command-multiturn-2" }, idempotencyKey: "hwlab-command-multiturn-2" }) as { id: string }; await waitForCommandState(client, multiTurn.runId, secondCommand.id, "completed"); const multiturnResult = await multiturnRunner as JsonRecord; assert.equal(multiturnResult.commandsProcessed, 2); + const starts = (await readTextIfExists(fakeCodexStartFile)).trim().split("\n").filter(Boolean); + assert.equal(starts.length, 1, "same-run multiturn runner must keep one codex app-server process alive instead of restarting per command"); const multiEventsResponse = await client.get(`/api/v1/runs/${multiTurn.runId}/events?afterSeq=0&limit=200`) as { items?: Array<{ type?: string; payload?: JsonRecord }> }; const multiEvents = multiEventsResponse.items ?? []; + assert.equal(multiEvents.filter((event) => event.type === "backend_status" && event.payload?.phase === "codex-app-server-starting").length, 1); + assert.equal(multiEvents.filter((event) => event.type === "backend_status" && event.payload?.phase === "codex-app-server:reused").length, 1); assert.equal(multiEvents.filter((event) => event.type === "backend_status" && event.payload?.phase === "resource-bundle-materialized").length, 1); assert.equal(multiEvents.filter((event) => event.type === "backend_status" && event.payload?.phase === "command-terminal").length, 2); const secondEnvelope = await client.get(`/api/v1/runs/${multiTurn.runId}/commands/${secondCommand.id}/result`) as JsonRecord; @@ -159,4 +164,12 @@ async function waitForCommandState(client: ManagerClient, runId: string, command throw new Error(`command ${commandId} did not reach ${state}`); } +async function readTextIfExists(filePath: string): Promise { + try { + return await readFile(filePath, "utf8"); + } catch { + return ""; + } +} + export default selfTest; diff --git a/src/selftest/fake-codex-app-server.ts b/src/selftest/fake-codex-app-server.ts index 4c2584f..245259d 100644 --- a/src/selftest/fake-codex-app-server.ts +++ b/src/selftest/fake-codex-app-server.ts @@ -1,7 +1,9 @@ import * as readline from "node:readline"; +import { appendFileSync } from "node:fs"; const rl = readline.createInterface({ input: process.stdin, crlfDelay: Infinity }); const mode = process.env.AGENTRUN_FAKE_CODEX_MODE ?? "success"; +if (process.env.AGENTRUN_FAKE_CODEX_START_FILE) appendFileSync(process.env.AGENTRUN_FAKE_CODEX_START_FILE, `${process.pid}\n`); let threadCounter = 0; let turnCounter = 0; let observedThreadModel = false;