fix: keep codex stdio server alive across turns

This commit is contained in:
Codex
2026-06-02 01:35:31 +08:00
parent 4ddea73629
commit 1ff2f114b3
5 changed files with 171 additions and 46 deletions
+21 -2
View File
@@ -1,5 +1,5 @@
import type { BackendTurnResult, CommandRecord, RunRecord } from "../common/types.js";
import { runCodexStdioTurn, type CodexStdioTurnOptions } from "./codex-stdio.js";
import { CodexStdioBackendSession, runCodexStdioTurn, type CodexStdioTurnOptions } from "./codex-stdio.js";
import { backendProfileSpec } from "../common/backend-profiles.js";
export interface BackendAdapterOptions {
@@ -11,11 +11,30 @@ export interface BackendAdapterOptions {
env?: NodeJS.ProcessEnv;
}
export interface BackendSession {
runTurn(run: RunRecord, command: CommandRecord, options?: BackendAdapterOptions): Promise<BackendTurnResult>;
close(): Promise<BackendTurnResult["events"]>;
}
export async function runBackendTurn(run: RunRecord, command: CommandRecord, options: BackendAdapterOptions = {}): Promise<BackendTurnResult> {
const spec = backendProfileSpec(run.backendProfile);
if (!spec || spec.backendKind !== "codex-app-server-stdio") {
return { terminalStatus: "failed", failureKind: "backend-failed", failureMessage: `unsupported backendProfile ${run.backendProfile}`, events: [{ type: "error", payload: { failureKind: "backend-failed", backendProfile: run.backendProfile } }] };
}
return runCodexStdioTurn(backendTurnOptions(run, command, options));
}
export function createBackendSession(run: RunRecord, options: BackendAdapterOptions = {}): BackendSession | null {
const spec = backendProfileSpec(run.backendProfile);
if (!spec || spec.backendKind !== "codex-app-server-stdio") return null;
const session = new CodexStdioBackendSession();
return {
runTurn: async (nextRun, command, turnOptions = {}) => session.runTurn(backendTurnOptions(nextRun, command, { ...options, ...turnOptions })),
close: async () => session.close(),
};
}
export function backendTurnOptions(run: RunRecord, command: CommandRecord, options: BackendAdapterOptions = {}): CodexStdioTurnOptions {
const prompt = typeof command.payload.prompt === "string" ? command.payload.prompt : JSON.stringify(command.payload);
const turnOptions: CodexStdioTurnOptions = {
backendProfile: run.backendProfile,
@@ -33,5 +52,5 @@ export async function runBackendTurn(run: RunRecord, command: CommandRecord, opt
if (options.env) turnOptions.env = options.env;
if (options.codexHome) turnOptions.codexHome = options.codexHome;
if (options.abortSignal) turnOptions.abortSignal = options.abortSignal;
return runCodexStdioTurn(turnOptions);
return turnOptions;
}
+120 -39
View File
@@ -99,6 +99,7 @@ export class CodexStdioClient {
this.child = spawn(command, args, {
cwd: options.cwd,
env: options.env ?? process.env,
detached: true,
stdio: "pipe",
});
} catch (error) {
@@ -111,6 +112,10 @@ export class CodexStdioClient {
this.child.on("error", (error) => this.handleClose(127, null, spawnFailure(command, error)));
}
get isClosed(): boolean {
return this.closed;
}
request(method: string, params: JsonRecord, timeoutMs = requestTimeoutCapMs): Promise<unknown> {
if (this.closed) return Promise.reject(this.closeFailure ?? new CodexStdioFailure("backend-failed", "codex app-server is closed", `request:${method}`));
const id = this.nextId++;
@@ -135,12 +140,25 @@ export class CodexStdioClient {
stop(): void {
if (this.closed) return;
this.child.kill("SIGTERM");
this.kill("SIGTERM");
setTimeout(() => {
if (!this.closed) this.child.kill("SIGKILL");
if (!this.closed) this.kill("SIGKILL");
}, 1500).unref?.();
}
private kill(signal: NodeJS.Signals): void {
const pid = this.child.pid;
if (typeof pid === "number") {
try {
process.kill(-pid, signal);
return;
} catch {
// Fall back to killing the direct child when process-group termination is unavailable.
}
}
this.child.kill(signal);
}
private appendStderr(chunk: Buffer): void {
this.stderrBytes += chunk.byteLength;
const next = Buffer.concat([this.stderrTailBuffer, chunk]);
@@ -260,21 +278,84 @@ export class CodexStdioClient {
}
export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise<BackendTurnResult> {
const session = new CodexStdioBackendSession();
const result = await session.runTurn(options);
const closeEvents = await session.close();
return { ...result, events: [...result.events, ...closeEvents] };
}
export class CodexStdioBackendSession {
private client: CodexStdioClient | null = null;
private clientKey: string | null = null;
async runTurn(options: CodexStdioTurnOptions): Promise<BackendTurnResult> {
return await runCodexStdioTurnWithSession(options, this);
}
async close(): Promise<BackendEvent[]> {
const client = this.client;
if (!client) return [];
this.client = null;
this.clientKey = null;
client.stop();
const closeInfo = await client.closedPromise;
return [{ type: "backend_status", payload: { phase: "codex-app-server-closed", appServerExit: closeEvent(closeInfo) } }];
}
async getClient(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, events: BackendEvent[]): Promise<CodexStdioClient> {
const key = codexClientKey(options, env);
if (this.client && !this.client.isClosed && this.clientKey === key) {
events.push({ type: "backend_status", payload: { phase: "codex-app-server:reused", ...backendMetadata(options), protocol: codexProtocol } });
return this.client;
}
const closeEvents = await this.close();
events.push(...closeEvents);
events.push({
type: "backend_status",
payload: {
phase: "codex-app-server-starting",
...backendMetadata(options),
protocol: codexProtocol,
runtime: runtimeSummary(options, env, resolveCodexHome(options)),
},
});
const clientOptions: ConstructorParameters<typeof CodexStdioClient>[0] = {
cwd: options.cwd,
env,
onNotification: (message) => this.onNotification(message),
};
if (options.command) clientOptions.command = options.command;
if (options.args) clientOptions.args = options.args;
this.client = new CodexStdioClient(clientOptions);
this.clientKey = key;
const requestTimeoutMs = Math.min(positiveTimeout(options.timeoutMs), requestTimeoutCapMs);
const initializeResult = requireResponseRecord(await this.client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } }, requestTimeoutMs), "initialize");
validateInitializeResponse(initializeResult);
this.client.notify("initialized", {});
events.push({ type: "backend_status", payload: { phase: "initialize:completed", ...backendMetadata(options), protocol: codexProtocol } });
return this.client;
}
private notificationHandlers = new Set<(message: JsonRecord) => void>();
addNotificationHandler(handler: (message: JsonRecord) => void): () => void {
this.notificationHandlers.add(handler);
return () => this.notificationHandlers.delete(handler);
}
private onNotification(message: JsonRecord): void {
for (const handler of this.notificationHandlers) handler(message);
}
}
async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, session: CodexStdioBackendSession): Promise<BackendTurnResult> {
const codexHome = resolveCodexHome(options);
const projectionFailure = await prepareProjectedCodexHome(codexHome, options.env?.AGENTRUN_CODEX_SECRET_HOME ?? process.env.AGENTRUN_CODEX_SECRET_HOME);
if (projectionFailure) return projectionFailure;
const secretFailure = codexHomeReadiness(codexHome);
if (secretFailure) return secretFailure;
const env = childEnv(options, codexHome);
const events: BackendEvent[] = [{
type: "backend_status",
payload: {
phase: "codex-app-server-starting",
...backendMetadata(options),
protocol: codexProtocol,
runtime: runtimeSummary(options, env, codexHome),
},
}];
const events: BackendEvent[] = [];
if (options.abortSignal?.aborted) {
const cancelled = { status: "cancelled" as const, failureKind: "cancelled" as const, message: "cancel requested" };
events.push({ type: "backend_status", payload: { phase: "turn-cancelled", failureKind: "cancelled" } });
@@ -305,30 +386,20 @@ export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise
client?.stop();
terminalResolve();
}, positiveTimeout(options.timeoutMs));
const stopNotifications = session.addNotificationHandler((message) => {
const normalized = normalizeCodexNotification(message);
if (normalized.threadId) threadId = normalized.threadId;
if (normalized.turnId) turnId = normalized.turnId;
if (normalized.assistantDelta) assistantText += normalized.assistantDelta;
if (typeof normalized.assistantFinal === "string" && normalized.assistantFinal.trim().length > 0) finalAssistantText = normalized.assistantFinal;
events.push(...normalized.events);
if (normalized.terminal && !terminal) {
terminal = normalized.terminal;
terminalResolve();
}
});
try {
const clientOptions: ConstructorParameters<typeof CodexStdioClient>[0] = {
cwd: options.cwd,
env,
onNotification: (message) => {
const normalized = normalizeCodexNotification(message);
if (normalized.threadId) threadId = normalized.threadId;
if (normalized.turnId) turnId = normalized.turnId;
if (normalized.assistantDelta) assistantText += normalized.assistantDelta;
if (typeof normalized.assistantFinal === "string" && normalized.assistantFinal.trim().length > 0) finalAssistantText = normalized.assistantFinal;
events.push(...normalized.events);
if (normalized.terminal && !terminal) {
terminal = normalized.terminal;
terminalResolve();
}
},
};
if (options.command) clientOptions.command = options.command;
if (options.args) clientOptions.args = options.args;
client = new CodexStdioClient(clientOptions);
const initializeResult = requireResponseRecord(await client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } }, requestTimeoutMs), "initialize");
validateInitializeResponse(initializeResult);
client.notify("initialized", {});
events.push({ type: "backend_status", payload: { phase: "initialize:completed", ...backendMetadata(options), protocol: codexProtocol } });
client = await session.getClient(options, env, events);
const startThread = async (phasePrefix = "thread/start"): Promise<string> => {
const response = requireResponseRecord(await client!.request("thread/start", withOptionalModel({ cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" }, options.model), requestTimeoutMs), "thread/start");
@@ -381,15 +452,12 @@ export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise
events.push({ type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase: failure.phase, details: failure.details } });
}
} finally {
stopNotifications();
options.abortSignal?.removeEventListener("abort", abortTurn);
clearTimeout(timeout);
if (client) {
client.stop();
const closeInfo = await client.closedPromise;
events.push({ type: "backend_status", payload: { phase: "codex-app-server-closed", appServerExit: closeEvent(closeInfo) } });
}
}
if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server finished without terminal status" };
if (terminal.status !== "completed") events.push(...await session.close());
const reply = finalAssistantText.trim().length > 0 ? finalAssistantText : assistantText;
if (reply.trim().length > 0) events.push({ type: "assistant_message", payload: { text: reply } });
events.push({ type: "terminal_status", payload: { terminalStatus: terminal.status, failureKind: terminal.failureKind, message: terminal.message } });
@@ -527,6 +595,19 @@ function childEnv(options: CodexStdioTurnOptions, codexHome: string): NodeJS.Pro
};
}
function codexClientKey(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv): string {
return JSON.stringify({
command: options.command ?? "codex",
args: options.args ?? defaultCodexArgs,
cwd: options.cwd,
codexHome: env.CODEX_HOME ?? resolveCodexHome(options),
backendProfile: options.backendProfile ?? "codex",
model: options.model ?? null,
approvalPolicy: options.approvalPolicy,
sandbox: options.sandbox,
});
}
function resolveCodexHome(options: CodexStdioTurnOptions): string {
return options.codexHome ?? options.env?.CODEX_HOME ?? `${options.env?.HOME ?? process.env.HOME ?? ""}/.codex`;
}
+14 -4
View File
@@ -1,5 +1,5 @@
import { RunnerManagerApi, failureKindFromError, terminalStatusForFailure, errorMessage } from "./manager-api.js";
import { runBackendTurn, type BackendAdapterOptions } from "../backend/adapter.js";
import { createBackendSession, runBackendTurn, type BackendAdapterOptions, type BackendSession } from "../backend/adapter.js";
import { materializeResourceBundle } from "./resource-bundle.js";
import type { BackendEvent, BackendProfile, CommandRecord, FailureKind, JsonRecord, RunRecord, RunnerRecord, TerminalStatus } from "../common/types.js";
import { AgentRunError } from "../common/errors.js";
@@ -67,6 +67,7 @@ export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
let workspacePath: string | undefined;
let materializationAttempted = false;
let materializationFailure: { failureKind: FailureKind; terminalStatus: TerminalStatus; message: string } | null = null;
let backendSession: BackendSession | null = null;
try {
let idleSince = Date.now();
@@ -102,7 +103,7 @@ export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
const result = materializationFailure
? await reportCommandFailure(api, options.runId, command.id, runner, attemptId, materializationFailure, "runner:resource-bundle")
: await executeCommand(api, options, command, runner, attemptId, workspacePath);
: await executeCommand(api, options, command, runner, attemptId, workspacePath, backendSession ?? (backendSession = createBackendSession(currentRun, options)));
commandResults.push(result);
if (options.oneShot === true) {
const run = await api.reportStatus(options.runId, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: null });
@@ -117,11 +118,19 @@ export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
const finalRun = await api.reportStatus(options.runId, { terminalStatus, failureKind, failureMessage: message }) as RunRecord;
return { runner, terminalStatus, failureKind, run: finalRun, commandsProcessed: commandResults.length, commandResults } as JsonRecord;
} finally {
if (backendSession) {
try {
const closeEvents = await backendSession.close();
for (const event of closeEvents) await api.appendEvent(options.runId, annotateCommandEvent(event, "runner-shutdown", attemptId, runner.id));
} catch {
// Runner shutdown must not hide the terminal result already reported for the command.
}
}
stopHeartbeat();
}
}
async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, command: CommandRecord, runner: RunnerRecord, attemptId: string, workspacePath: string | undefined): Promise<CommandExecutionResult> {
async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, command: CommandRecord, runner: RunnerRecord, attemptId: string, workspacePath: string | undefined, backendSession: BackendSession | null): Promise<CommandExecutionResult> {
await api.ackCommand(command.id);
const acked = await api.getCommand(options.runId, command.id);
if (acked.state === "cancelled") return await reportCancelled(api, options.runId, command.id, runner, attemptId, "command cancelled before backend start");
@@ -130,7 +139,8 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions,
const stopCancelWatch = watchCancellation(api, options.runId, command.id, abortController);
try {
const latestRun = await api.getRun(options.runId);
const result = await runBackendTurn(latestRun, command, { ...options, ...(workspacePath ? { workspacePath } : {}), abortSignal: abortController.signal });
const backendOptions = { ...options, ...(workspacePath ? { workspacePath } : {}), abortSignal: abortController.signal };
const result = backendSession ? await backendSession.runTurn(latestRun, command, backendOptions) : await runBackendTurn(latestRun, command, backendOptions);
for (const event of result.events) {
await api.appendEvent(options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id));
}
+14 -1
View File
@@ -89,14 +89,19 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin
assert.equal(((resumedRun.sessionRef as JsonRecord).threadId), "thread_selftest_1");
const multiTurn = await createHwlabRun(client, context, bundle, "hwlab-session-multiturn", "hello first turn", "hwlab-command-multiturn-1");
const multiturnRunner = runOnce({ managerUrl: server.baseUrl, runId: multiTurn.runId, commandId: multiTurn.commandId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-multiturn") }, idleTimeoutMs: 500, pollIntervalMs: 50 });
const fakeCodexStartFile = path.join(context.tmp, "fake-codex-starts-multiturn.txt");
const multiturnRunner = runOnce({ managerUrl: server.baseUrl, runId: multiTurn.runId, commandId: multiTurn.commandId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-multiturn"), AGENTRUN_FAKE_CODEX_START_FILE: fakeCodexStartFile }, idleTimeoutMs: 500, pollIntervalMs: 50 });
await waitForCommandState(client, multiTurn.runId, multiTurn.commandId, "completed");
const secondCommand = await client.post(`/api/v1/runs/${multiTurn.runId}/commands`, { type: "turn", payload: { prompt: "hello second turn", traceId: "hwlab-command-multiturn-2" }, idempotencyKey: "hwlab-command-multiturn-2" }) as { id: string };
await waitForCommandState(client, multiTurn.runId, secondCommand.id, "completed");
const multiturnResult = await multiturnRunner as JsonRecord;
assert.equal(multiturnResult.commandsProcessed, 2);
const starts = (await readTextIfExists(fakeCodexStartFile)).trim().split("\n").filter(Boolean);
assert.equal(starts.length, 1, "same-run multiturn runner must keep one codex app-server process alive instead of restarting per command");
const multiEventsResponse = await client.get(`/api/v1/runs/${multiTurn.runId}/events?afterSeq=0&limit=200`) as { items?: Array<{ type?: string; payload?: JsonRecord }> };
const multiEvents = multiEventsResponse.items ?? [];
assert.equal(multiEvents.filter((event) => event.type === "backend_status" && event.payload?.phase === "codex-app-server-starting").length, 1);
assert.equal(multiEvents.filter((event) => event.type === "backend_status" && event.payload?.phase === "codex-app-server:reused").length, 1);
assert.equal(multiEvents.filter((event) => event.type === "backend_status" && event.payload?.phase === "resource-bundle-materialized").length, 1);
assert.equal(multiEvents.filter((event) => event.type === "backend_status" && event.payload?.phase === "command-terminal").length, 2);
const secondEnvelope = await client.get(`/api/v1/runs/${multiTurn.runId}/commands/${secondCommand.id}/result`) as JsonRecord;
@@ -159,4 +164,12 @@ async function waitForCommandState(client: ManagerClient, runId: string, command
throw new Error(`command ${commandId} did not reach ${state}`);
}
async function readTextIfExists(filePath: string): Promise<string> {
try {
return await readFile(filePath, "utf8");
} catch {
return "";
}
}
export default selfTest;
+2
View File
@@ -1,7 +1,9 @@
import * as readline from "node:readline";
import { appendFileSync } from "node:fs";
const rl = readline.createInterface({ input: process.stdin, crlfDelay: Infinity });
const mode = process.env.AGENTRUN_FAKE_CODEX_MODE ?? "success";
if (process.env.AGENTRUN_FAKE_CODEX_START_FILE) appendFileSync(process.env.AGENTRUN_FAKE_CODEX_START_FILE, `${process.pid}\n`);
let threadCounter = 0;
let turnCounter = 0;
let observedThreadModel = false;