Files
pikasTech-agentrun/src/backend/codex-stdio.ts
T

977 lines
43 KiB
TypeScript

import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process";
import { createHash } from "node:crypto";
import { accessSync, constants as fsConstants } from "node:fs";
import { chmod, copyFile, mkdir } from "node:fs/promises";
import path from "node:path";
import * as readline from "node:readline";
import type { BackendEvent, BackendProfile, BackendTurnResult, FailureKind, JsonRecord, JsonValue, TerminalStatus } from "../common/types.js";
import { redactJson, redactText } from "../common/redaction.js";
import { backendProfileSpec } from "../common/backend-profiles.js";
import { boundedTextSummary, commandOutputPayload } from "../common/output.js";
/** Protocol identifier attached to `backend_status` event payloads emitted by this backend. */
const codexProtocol = "codex-app-server-jsonrpc-stdio";
/** Arguments used to launch the app-server when the caller does not supply `args`. */
const defaultCodexArgs = ["app-server", "--listen", "stdio://"];
/** Maximum number of trailing stderr bytes retained from the child process. */
const stderrBufferBytes = 64_000;
// NOTE(review): not referenced in the visible part of this file; presumably a cap on stderr text placed in events — confirm.
const stderrEventChars = 4_000;
/** Default timeout for a single JSON-RPC request, and the upper bound applied to request timeouts derived from the turn timeout. */
const requestTimeoutCapMs = 30_000;
// NOTE(review): not referenced in the visible part of this file; presumably the environment variable names
// reported (without values) in the runtime summary — confirm against `runtimeSummary`.
const childEnvSummaryKeys = [
"CODEX_HOME",
"HOME",
"PATH",
"HTTP_PROXY",
"HTTPS_PROXY",
"ALL_PROXY",
"NO_PROXY",
"http_proxy",
"https_proxy",
"all_proxy",
"no_proxy",
"OPENAI_API_KEY",
"CODEX_API_KEY",
"GITHUB_TOKEN",
"GH_TOKEN",
];
/** Options for running a single Codex turn over the stdio app-server transport. */
export interface CodexStdioTurnOptions {
/** Backend profile; part of the client reuse key, where it defaults to "codex". */
backendProfile?: BackendProfile;
/** Prompt text sent as the text input of `turn/start`. */
prompt: string;
/** Working directory for the spawned app-server; also forwarded in thread and turn requests. */
cwd: string;
/** Optional model name; trimmed and added to request params only when non-empty. */
model?: string;
/** When set, the thread is resumed via `thread/resume` instead of being created via `thread/start`. */
threadId?: string;
/** Approval policy forwarded to the thread and turn requests. */
approvalPolicy: string;
/** Sandbox setting forwarded to `thread/start` / `thread/resume`. */
sandbox: string;
/** Overall turn timeout in milliseconds; individual requests use the smaller of this and the request timeout cap. */
timeoutMs: number;
/** Executable to spawn; defaults to "codex". */
command?: string;
/** Arguments for the executable; defaults to the module's default app-server arguments. */
args?: string[];
/** Extra environment variables layered over `process.env` for the child process. */
env?: NodeJS.ProcessEnv;
/** Explicit Codex home directory; takes precedence over `env.CODEX_HOME` and `$HOME/.codex`. */
codexHome?: string;
/** Aborting this signal cancels the turn and stops the app-server. */
abortSignal?: AbortSignal;
/** When provided, redacted events are delivered here as they occur instead of being accumulated in the returned result. */
onEvent?: (event: BackendEvent) => void | Promise<void>;
/** Called once `turn/start` has succeeded; a returned function is invoked when the turn finishes. */
onActiveTurn?: (control: CodexActiveTurnControl) => void | (() => void);
}
/** Handle given to `onActiveTurn` for interacting with a turn that is currently running. */
export interface CodexActiveTurnControl {
/** Identifier of the thread the turn belongs to. */
threadId: string;
/** Identifier of the running turn. */
turnId: string;
/** Sends a `turn/steer` request carrying the given prompt as text input for the running turn. */
steer(prompt: string): Promise<void>;
/** Sends a `turn/interrupt` request for the running turn. */
interrupt(): Promise<void>;
}
/** Bookkeeping for a JSON-RPC request that has been written but not yet answered. */
interface PendingRequest {
/** JSON-RPC method name, used in failure messages and phases. */
method: string;
/** Timeout timer; cleared when the request settles. */
timer: NodeJS.Timeout;
/** Resolves the caller's promise with the response `result`. */
resolve: (value: unknown) => void;
/** Rejects the caller's promise. */
reject: (error: Error) => void;
}
/** Assistant message extracted from an `item/completed` notification of type `agentMessage`. */
interface CompletedAssistantMessage {
/** Item identifier from the notification, or null when none was present. */
itemId: string | null;
/** Message text. */
text: string;
}
/** Counters for notifications that were received but deliberately not turned into events. */
interface SuppressedNotificationSummary {
/** Total number of suppressed notifications. */
total: number;
/** Suppressed notification count keyed by JSON-RPC method. */
byMethod: Record<string, number>;
/** Suppressed notification count keyed by item type, for notifications recorded with one. */
byItemType: Record<string, number>;
}
/** Snapshot describing how the app-server child process ended. */
interface CodexStdioCloseInfo extends JsonRecord {
/** Exit code reported with the close (127 is used when the child emitted an `error` event). */
code: number | null;
/** Signal reported with the close, if any. */
signal: string | null;
/** Redacted tail of the child's stderr output. */
stderrTail: string;
/** Total number of stderr bytes received from the child. */
stderrBytes: number;
/** True when `stderrTail` does not contain all stderr output. */
stderrTruncated: boolean;
/** Failure kind of the recorded close failure, or null when none was recorded. */
failureKind: FailureKind | null;
/** Message of the recorded close failure, or null when none was recorded. */
message: string | null;
}
/**
 * Error type used for every failure raised by the Codex stdio transport.
 *
 * The message and the details are passed through the redaction helpers before
 * being stored, so instances can be surfaced in events.
 */
class CodexStdioFailure extends Error {
  /** Classification of the failure. */
  readonly failureKind: FailureKind;
  /** Stage at which the failure happened (for example `request:<method>`). */
  readonly phase: string;
  /** Redacted structured context for the failure. */
  readonly details: JsonRecord;

  /**
   * @param failureKind Classification of the failure.
   * @param message Human-readable description; redacted before use.
   * @param phase Stage at which the failure happened.
   * @param details Optional structured context; redacted before use.
   */
  constructor(failureKind: FailureKind, message: string, phase: string, details: JsonRecord = {}) {
    super(redactText(message));
    this.name = "CodexStdioFailure";
    this.failureKind = failureKind;
    this.phase = phase;
    const safeDetails = redactJson(details);
    this.details = safeDetails;
  }
}
/**
 * JSON-RPC client for a spawned app-server child process that exchanges
 * newline-delimited JSON messages over stdin/stdout.
 *
 * - Requests are written to the child's stdin and matched to responses by numeric id.
 * - Messages with a method and no id are forwarded to `onNotification`.
 * - Server-initiated requests are answered automatically: approval requests are
 *   declined, everything else is rejected with JSON-RPC error -32601.
 * - stderr is buffered (tail only) and reported, redacted, when the process closes.
 */
export class CodexStdioClient {
  private readonly child: ChildProcessWithoutNullStreams;
  private readonly pending = new Map<number, PendingRequest>();
  private stderrTailBuffer = Buffer.alloc(0);
  private stderrBytes = 0;
  private nextId = 1;
  private closed = false;
  private closeFailure: CodexStdioFailure | null = null;
  /** Resolves (never rejects) with close details once the child has closed or failed to spawn. */
  readonly closedPromise: Promise<CodexStdioCloseInfo>;
  private closeResolve!: (value: CodexStdioCloseInfo) => void;

  /**
   * Spawns the app-server and starts reading its output.
   *
   * @param options.command Executable to run; defaults to "codex".
   * @param options.args Arguments; defaults to the module's default app-server arguments.
   * @param options.cwd Working directory of the child.
   * @param options.env Environment of the child; defaults to `process.env`.
   * @param options.onNotification Receives every notification message emitted by the child.
   * @throws The failure produced by `spawnFailure` when `spawn` throws synchronously.
   */
  constructor(options: { command?: string; args?: string[]; cwd: string; env?: NodeJS.ProcessEnv; onNotification: (message: JsonRecord) => void }) {
    this.closedPromise = new Promise((resolve) => { this.closeResolve = resolve; });
    const command = options.command ?? "codex";
    const args = options.args ?? defaultCodexArgs;
    try {
      this.child = spawn(command, args, {
        cwd: options.cwd,
        env: options.env ?? process.env,
        // Detached so the child leads its own process group and `kill` can signal the whole group.
        detached: true,
        stdio: "pipe",
      });
    } catch (error) {
      throw spawnFailure(command, error);
    }
    // A failed write (for example EPIPE after the child has exited) emits 'error' on the stdin
    // stream in addition to invoking the write callback. Without a listener that event becomes an
    // uncaught exception in the host process. Failures are already reported through the per-write
    // callbacks and the child's 'close'/'error' handlers, so the event itself needs no action.
    this.child.stdin.on("error", () => undefined);
    this.child.stderr.on("data", (chunk: Buffer) => this.appendStderr(chunk));
    const rl = readline.createInterface({ input: this.child.stdout, crlfDelay: Infinity });
    void this.readLines(rl, options.onNotification);
    this.child.on("close", (code, signal) => this.handleClose(code, signal));
    this.child.on("error", (error) => this.handleClose(127, null, spawnFailure(command, error)));
  }

  /** True once the child has closed or reported a spawn error. */
  get isClosed(): boolean {
    return this.closed;
  }

  /**
   * Sends a JSON-RPC request and resolves with the `result` of the matching response.
   *
   * @param method JSON-RPC method name.
   * @param params Request parameters.
   * @param timeoutMs Time allowed for the response; normalized by `positiveTimeout`.
   * @returns A promise that rejects with a `CodexStdioFailure` when the client is closed,
   *          the write fails, the request times out, or the response is an error or malformed.
   */
  request(method: string, params: JsonRecord, timeoutMs = requestTimeoutCapMs): Promise<unknown> {
    if (this.closed) return Promise.reject(this.closeFailure ?? new CodexStdioFailure("backend-failed", "codex app-server is closed", `request:${method}`));
    const id = this.nextId++;
    const message = { id, method, params };
    const effectiveTimeoutMs = positiveTimeout(timeoutMs);
    return new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        this.rejectRequest(id, new CodexStdioFailure("backend-timeout", `Codex stdio request ${method} timed out after ${effectiveTimeoutMs}ms`, `request:${method}`, { method, timeoutMs: effectiveTimeoutMs }));
      }, effectiveTimeoutMs);
      // Register before writing so a response can never arrive for an unknown id.
      this.pending.set(id, { method, timer, resolve, reject });
      this.child.stdin.write(`${JSON.stringify(message)}\n`, "utf8", (error: Error | null | undefined) => {
        if (!error) return;
        this.rejectRequest(id, new CodexStdioFailure("backend-failed", `failed to write Codex stdio request ${method}: ${error.message}`, `request:${method}`, { method }));
      });
    });
  }

  /** Sends a JSON-RPC notification (no id, no response expected); a no-op once closed. */
  notify(method: string, params: JsonRecord = {}): void {
    if (this.closed) return;
    this.child.stdin.write(`${JSON.stringify({ method, params })}\n`, "utf8", () => undefined);
  }

  /** Requests termination with SIGTERM and escalates to SIGKILL if the child is still open after 1.5s. */
  stop(): void {
    if (this.closed) return;
    this.kill("SIGTERM");
    setTimeout(() => {
      if (!this.closed) this.kill("SIGKILL");
    }, 1500).unref?.();
  }

  /** Signals the child's process group, falling back to the direct child when that fails. */
  private kill(signal: NodeJS.Signals): void {
    const pid = this.child.pid;
    if (typeof pid === "number") {
      try {
        // A negative pid addresses the process group led by the detached child.
        process.kill(-pid, signal);
        return;
      } catch {
        // Fall back to killing the direct child when process-group termination is unavailable.
      }
    }
    this.child.kill(signal);
  }

  /** Accumulates stderr, retaining only the last `stderrBufferBytes` bytes. */
  private appendStderr(chunk: Buffer): void {
    this.stderrBytes += chunk.byteLength;
    const next = Buffer.concat([this.stderrTailBuffer, chunk]);
    this.stderrTailBuffer = next.byteLength > stderrBufferBytes ? next.subarray(next.byteLength - stderrBufferBytes) : next;
  }

  /** Reads stdout line by line, parsing each non-empty line as JSON and dispatching it. */
  private async readLines(rl: readline.Interface, onNotification: (message: JsonRecord) => void): Promise<void> {
    try {
      for await (const line of rl) {
        const trimmed = String(line).trim();
        if (trimmed.length === 0) continue;
        let message: JsonRecord;
        try {
          message = JSON.parse(trimmed) as JsonRecord;
        } catch {
          this.handleProtocolFailure(new CodexStdioFailure("backend-json-parse-error", "codex app-server emitted invalid JSON on stdout", "stdout:parse", { linePreview: redactText(trimmed.slice(0, 800)), lineChars: trimmed.length }));
          break;
        }
        this.handleMessage(message, onNotification);
      }
    } catch (error) {
      // Covers both stream read errors and exceptions thrown while dispatching a message.
      this.handleProtocolFailure(new CodexStdioFailure("backend-protocol-error", error instanceof Error ? error.message : String(error), "stdout:read"));
    }
  }

  /** Routes a parsed message: response (id only), server request (id + method) or notification (method only). */
  private handleMessage(message: JsonRecord, onNotification: (message: JsonRecord) => void): void {
    const id = typeof message.id === "number" ? message.id : null;
    const method = typeof message.method === "string" ? message.method : null;
    if (id !== null && method === null) {
      this.handleResponse(id, message);
      return;
    }
    if (id !== null && method !== null) {
      this.handleServerRequest(id, method);
      return;
    }
    if (method !== null) {
      onNotification(message);
      return;
    }
    this.handleProtocolFailure(new CodexStdioFailure("backend-response-invalid", "codex app-server message had neither JSON-RPC id nor method", "stdout:message", { message }));
  }

  /** Settles the pending request matching `id`; responses for unknown ids are ignored. */
  private handleResponse(id: number, message: JsonRecord): void {
    const pending = this.pending.get(id);
    if (!pending) return;
    this.pending.delete(id);
    clearTimeout(pending.timer);
    if (message.error !== undefined) {
      pending.reject(failureFromRpcError(pending.method, message.error));
      return;
    }
    if (!("result" in message)) {
      pending.reject(new CodexStdioFailure("backend-response-invalid", `codex app-server response for ${pending.method} omitted result and error`, `response:${pending.method}`, { method: pending.method }));
      return;
    }
    pending.resolve(message.result);
  }

  /** Answers a server-initiated request: approvals are declined, other methods are reported as unsupported. */
  private handleServerRequest(id: number, method: string): void {
    if (method === "item/commandExecution/requestApproval" || method === "item/fileChange/requestApproval") {
      this.child.stdin.write(`${JSON.stringify({ id, result: { decision: "decline" } })}\n`);
      return;
    }
    this.child.stdin.write(`${JSON.stringify({ id, error: { code: -32601, message: `Unsupported client-side request: ${method}` } })}\n`);
  }

  /** Rejects a single pending request, if it is still pending. */
  private rejectRequest(id: number, error: CodexStdioFailure): void {
    const pending = this.pending.get(id);
    if (!pending) return;
    this.pending.delete(id);
    clearTimeout(pending.timer);
    pending.reject(error);
  }

  /** Rejects every pending request with the same error. */
  private rejectAll(error: Error): void {
    for (const pending of this.pending.values()) {
      clearTimeout(pending.timer);
      pending.reject(error);
    }
    this.pending.clear();
  }

  /** Records a protocol-level failure, fails all pending requests and stops the child. */
  private handleProtocolFailure(error: CodexStdioFailure): void {
    if (this.closed) return;
    this.closeFailure = error;
    this.rejectAll(error);
    this.stop();
  }

  /** Finalizes the client once: marks it closed, fails pending requests and resolves `closedPromise`. */
  private handleClose(code: number | null, signal: string | null, failure: CodexStdioFailure | null = null): void {
    if (this.closed) return;
    this.closed = true;
    if (failure) this.closeFailure = failure;
    const stderr = this.stderrInfo();
    const closeInfo: CodexStdioCloseInfo = {
      code,
      signal,
      stderrTail: stderr.stderrTail,
      stderrBytes: this.stderrBytes,
      stderrTruncated: stderr.stderrTruncated,
      failureKind: this.closeFailure?.failureKind ?? null,
      message: this.closeFailure?.message ?? null,
    };
    this.rejectAll(this.closeFailure ?? new CodexStdioFailure("backend-failed", `codex app-server closed code=${code} signal=${signal}`, "process:close", closeInfo));
    this.closeResolve(closeInfo);
  }

  /** Returns the redacted last 8000 characters of buffered stderr and whether any output was dropped. */
  private stderrInfo(): { stderrTail: string; stderrTruncated: boolean } {
    const buffered = this.stderrTailBuffer.toString("utf8");
    const tail = buffered.slice(-8000);
    return {
      stderrTail: redactText(tail),
      stderrTruncated: this.stderrBytes > this.stderrTailBuffer.byteLength || buffered.length > tail.length,
    };
  }
}
/**
 * Runs one turn on a throwaway session and shuts the app-server down afterwards.
 *
 * @param options Turn options.
 * @returns The turn result with the session's close events appended to its events.
 */
export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise<BackendTurnResult> {
  const oneShotSession = new CodexStdioBackendSession();
  const turnResult = await oneShotSession.runTurn(options);
  const shutdownEvents = await oneShotSession.close();
  const combinedEvents = turnResult.events.concat(shutdownEvents);
  return { ...turnResult, events: combinedEvents };
}
/**
 * Holds at most one app-server client and reuses it across turns while the
 * launch configuration (as captured by `codexClientKey`) stays the same.
 */
export class CodexStdioBackendSession {
  private client: CodexStdioClient | null = null;
  private clientKey: string | null = null;
  private notificationHandlers = new Set<(message: JsonRecord) => void>();

  /** Runs a single turn using this session's client. */
  async runTurn(options: CodexStdioTurnOptions): Promise<BackendTurnResult> {
    const outcome = await runCodexStdioTurnWithSession(options, this);
    return outcome;
  }

  /**
   * Stops the current client, if any, and waits for it to close.
   *
   * @returns A single `codex-app-server-closed` status event, or an empty list when no client was held.
   */
  async close(): Promise<BackendEvent[]> {
    const active = this.client;
    if (!active) return [];
    this.clientKey = null;
    this.client = null;
    active.stop();
    const exitDetails = await active.closedPromise;
    const closedEvent: BackendEvent = { type: "backend_status", payload: { phase: "codex-app-server-closed", appServerExit: closeEvent(exitDetails) } };
    return [closedEvent];
  }

  /**
   * Returns an initialized client for the given options, reusing the current one when it is
   * still open and was started with the same key; otherwise the old client is closed and a
   * new one is spawned and taken through the `initialize` handshake.
   */
  async getClient(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, emitEvent: (event: BackendEvent) => void): Promise<CodexStdioClient> {
    const key = codexClientKey(options, env);
    const existing = this.client;
    if (existing && !existing.isClosed && this.clientKey === key) {
      emitEvent({ type: "backend_status", payload: { phase: "codex-app-server:reused", ...backendMetadata(options), protocol: codexProtocol } });
      return existing;
    }

    // Either nothing is running or the configuration changed: retire the old client first.
    for (const event of await this.close()) emitEvent(event);
    emitEvent({
      type: "backend_status",
      payload: {
        phase: "codex-app-server-starting",
        ...backendMetadata(options),
        protocol: codexProtocol,
        runtime: runtimeSummary(options, env, resolveCodexHome(options)),
      },
    });

    const clientOptions: ConstructorParameters<typeof CodexStdioClient>[0] = {
      cwd: options.cwd,
      env,
      onNotification: (message) => this.onNotification(message),
      ...(options.command ? { command: options.command } : {}),
      ...(options.args ? { args: options.args } : {}),
    };
    this.client = new CodexStdioClient(clientOptions);
    this.clientKey = key;

    const requestTimeoutMs = Math.min(positiveTimeout(options.timeoutMs), requestTimeoutCapMs);
    const handshakeParams = { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } };
    const handshakeReply = await this.client.request("initialize", handshakeParams, requestTimeoutMs);
    const initializeResult = requireResponseRecord(handshakeReply, "initialize");
    validateInitializeResponse(initializeResult);
    this.client.notify("initialized", {});
    emitEvent({ type: "backend_status", payload: { phase: "initialize:completed", ...backendMetadata(options), protocol: codexProtocol } });
    return this.client;
  }

  /**
   * Registers a listener for notifications from the app-server.
   *
   * @returns A function that removes the listener again.
   */
  addNotificationHandler(handler: (message: JsonRecord) => void): () => void {
    this.notificationHandlers.add(handler);
    return () => this.notificationHandlers.delete(handler);
  }

  /** Fans a notification out to every registered listener. */
  private onNotification(message: JsonRecord): void {
    this.notificationHandlers.forEach((listener) => listener(message));
  }
}
/**
 * Runs one turn against the session's app-server and reports how it ended.
 *
 * Steps, in order:
 * 1. Resolve the Codex home, copy projected secret files into it when
 *    `AGENTRUN_CODEX_SECRET_HOME` is set, and verify the files are readable; either
 *    check may return a "blocked" result immediately.
 * 2. Return a "cancelled" result immediately when the abort signal is already aborted.
 * 3. Obtain a client from the session, resume or start a thread, then start the turn.
 * 4. Wait until a terminal state is recorded (terminal notification, abort or timeout)
 *    or the app-server process closes.
 * 5. Emit trailing events (assistant fallback message, suppressed-notification summary,
 *    terminal status) and return the result.
 *
 * This function does not throw for backend failures; they are converted into a failed
 * or blocked terminal status.
 *
 * @param options Turn options.
 * @param session Session that owns the app-server client and notification fan-out.
 * @returns The terminal status plus the collected events (empty when `options.onEvent`
 *          received them live) and the thread/turn ids when known.
 */
async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, session: CodexStdioBackendSession): Promise<BackendTurnResult> {
const codexHome = resolveCodexHome(options);
const projectionFailure = await prepareProjectedCodexHome(codexHome, options.env?.AGENTRUN_CODEX_SECRET_HOME ?? process.env.AGENTRUN_CODEX_SECRET_HOME);
if (projectionFailure) return projectionFailure;
const secretFailure = codexHomeReadiness(codexHome);
if (secretFailure) return secretFailure;
const env = childEnv(options, codexHome);
const events: BackendEvent[] = [];
// Live event deliveries are chained on this promise so they reach `onEvent` one at a time, in order.
let liveEventWrite = Promise.resolve();
const emitEvent = (event: BackendEvent): void => {
const redactedEvent: BackendEvent = { ...event, payload: redactJson(event.payload) };
if (options.onEvent) {
// Errors thrown by the consumer are swallowed so they cannot break the turn.
liveEventWrite = liveEventWrite.then(() => Promise.resolve(options.onEvent?.(redactedEvent))).catch(() => undefined);
return;
}
events.push(redactedEvent);
};
const emitEvents = (nextEvents: BackendEvent[]): void => {
for (const event of nextEvents) emitEvent(event);
};
// Already cancelled before any work started: report it without contacting the app-server.
// These events are pushed directly into the result and are not passed to `onEvent`.
if (options.abortSignal?.aborted) {
const cancelled = { status: "cancelled" as const, failureKind: "cancelled" as const, message: "cancel requested" };
events.push({ type: "backend_status", payload: { phase: "turn-cancelled", failureKind: "cancelled" } });
events.push({ type: "terminal_status", payload: { terminalStatus: cancelled.status, failureKind: cancelled.failureKind, message: cancelled.message } });
return { terminalStatus: cancelled.status, failureKind: cancelled.failureKind, failureMessage: cancelled.message, events: events.map((event) => ({ ...event, payload: redactJson(event.payload) })) };
}
// Text accumulated from streaming deltas; used only when no completed assistant message arrives.
let assistantText = "";
const completedAssistantMessages: CompletedAssistantMessage[] = [];
const suppressedNotifications = createSuppressedNotificationSummary();
let threadId: string | undefined = options.threadId;
let turnId: string | undefined;
// The first writer wins: abort, timeout, a terminal notification or process close.
let terminal: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } | null = null;
let terminalResolve!: () => void;
const terminalPromise = new Promise<void>((resolve) => { terminalResolve = resolve; });
let client: CodexStdioClient | null = null;
const requestTimeoutMs = Math.min(positiveTimeout(options.timeoutMs), requestTimeoutCapMs);
let stopActiveTurn: (() => void) | undefined;
const abortTurn = (): void => {
if (terminal) return;
terminal = { status: "cancelled", failureKind: "cancelled", message: "cancel requested" };
emitEvent({ type: "backend_status", payload: { phase: "turn-cancelled", failureKind: "cancelled" } });
client?.stop();
terminalResolve();
};
options.abortSignal?.addEventListener("abort", abortTurn, { once: true });
// Whole-turn deadline; stopping the client also fails any request that is still in flight.
const timeout = setTimeout(() => {
if (terminal) return;
terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn timed out after ${options.timeoutMs}ms` };
emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:timeout" } });
client?.stop();
terminalResolve();
}, positiveTimeout(options.timeoutMs));
// Translate app-server notifications into events and track ids, assistant text and terminal state.
const stopNotifications = session.addNotificationHandler((message) => {
const normalized = normalizeCodexNotification(message, suppressedNotifications);
if (normalized.threadId) threadId = normalized.threadId;
if (normalized.turnId) turnId = normalized.turnId;
emitEvents(normalized.events);
if (normalized.assistantDelta) assistantText += normalized.assistantDelta;
if (normalized.completedAssistantMessage) {
completedAssistantMessages.push(normalized.completedAssistantMessage);
emitEvent(assistantMessageEventForCompleted(normalized.completedAssistantMessage, completedAssistantMessages.length));
}
if (normalized.terminal && !terminal) {
terminal = normalized.terminal;
terminalResolve();
}
});
try {
client = await session.getClient(options, env, emitEvent);
const startThread = async (phasePrefix = "thread/start"): Promise<string> => {
const response = requireResponseRecord(await client!.request("thread/start", withOptionalModel({ cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" }, options.model), requestTimeoutMs), "thread/start");
const nextThreadId = requireNestedId(response, "thread/start", "thread");
emitEvent({ type: "backend_status", payload: { phase: `${phasePrefix}:completed`, threadId: nextThreadId } });
return nextThreadId;
};
if (options.threadId) {
try {
const threadResponse = requireResponseRecord(await client.request("thread/resume", withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model), requestTimeoutMs), "thread/resume");
threadId = requireNestedId(threadResponse, "thread/resume", "thread");
emitEvent({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } });
} catch (error) {
// A resume failure recognised by `isMissingRolloutThreadResumeFailure` is rethrown as a
// dedicated thread-resume failure; every other error propagates unchanged.
const failure = normalizeFailure(error);
if (!isMissingRolloutThreadResumeFailure(failure)) throw error;
throw threadResumeFailure(options.threadId, failure);
}
} else {
threadId = await startThread();
}
const turnResponse = requireResponseRecord(await client.request("turn/start", withOptionalModel({ threadId, input: textInput(options.prompt), cwd: options.cwd, approvalPolicy: options.approvalPolicy }, options.model), requestTimeoutMs), "turn/start");
turnId = requireNestedId(turnResponse, "turn/start", "turn");
emitEvent({ type: "backend_status", payload: { phase: "turn/start:completed", turnId } });
// Hand the caller a control object so it can steer or interrupt the running turn.
if (threadId && turnId && options.onActiveTurn) {
const maybeStop = options.onActiveTurn({
threadId,
turnId,
steer: async (prompt: string) => {
await client!.request("turn/steer", { threadId: threadId!, expectedTurnId: turnId!, input: textInput(prompt) }, requestTimeoutMs);
},
interrupt: async () => {
await client!.request("turn/interrupt", { threadId: threadId!, turnId: turnId! }, requestTimeoutMs);
},
});
if (typeof maybeStop === "function") stopActiveTurn = maybeStop;
}
// Wait for whichever happens first: a terminal state is recorded or the process closes.
const race = await Promise.race([
terminalPromise.then(() => ({ kind: "terminal" as const })),
client.closedPromise.then((closeInfo) => ({ kind: "closed" as const, closeInfo })),
]);
if (race.kind === "closed" && !terminal) {
terminal = terminalFromClose(race.closeInfo);
emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "transport:closed-before-terminal", appServerExit: closeEvent(race.closeInfo) } });
}
if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server did not emit turn/completed" };
} catch (error) {
// Keep an already-recorded terminal state (e.g. abort or timeout) rather than the follow-on error.
if (!terminal) {
const failure = normalizeFailure(error);
terminal = { status: failure.failureKind === "secret-unavailable" ? "blocked" : "failed", failureKind: failure.failureKind, message: failure.message };
emitEvent({ type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase: failure.phase, details: failure.details } });
}
} finally {
stopActiveTurn?.();
stopNotifications();
options.abortSignal?.removeEventListener("abort", abortTurn);
clearTimeout(timeout);
}
if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server finished without terminal status" };
// Any outcome other than "completed" closes the session's app-server so it is not reused.
if (terminal.status !== "completed") emitEvents(await session.close());
// Fall back to the concatenated deltas only when no completed assistant message was seen.
if (completedAssistantMessages.length === 0) emitEvents(assistantMessageEventsForTurn(assistantText, terminal.status === "completed"));
emitEvents(suppressedNotificationEvents(suppressedNotifications));
emitEvent({ type: "terminal_status", payload: { terminalStatus: terminal.status, failureKind: terminal.failureKind, message: terminal.message } });
// Make sure every live event has been handed to `onEvent` before returning.
await liveEventWrite;
return { terminalStatus: terminal.status, failureKind: terminal.failureKind, failureMessage: terminal.message, events: events.map((event) => ({ ...event, payload: redactJson(event.payload) })), ...(threadId ? { threadId } : {}), ...(turnId ? { turnId } : {}) };
}
/**
 * Copies `auth.json` and `config.toml` from a projected secret directory into the
 * Codex home so the app-server works on its own copy.
 *
 * Nothing is done when no projected directory is given or when it resolves to the
 * same path as the Codex home.
 *
 * @param codexHome Destination directory; created with mode 0o700 when missing.
 * @param projectedHome Source directory holding the projected files, if any.
 * @returns `null` on success or when there is nothing to do; otherwise a "blocked"
 *          result describing the failed copy (paths summarized, message redacted).
 */
async function prepareProjectedCodexHome(codexHome: string, projectedHome: string | undefined): Promise<BackendTurnResult | null> {
  if (!projectedHome) return null;
  if (projectedHome.trim().length === 0) return null;

  const sameDirectory = path.resolve(projectedHome) === path.resolve(codexHome);
  if (sameDirectory) return null;

  const projectedFiles = ["auth.json", "config.toml"];
  try {
    await mkdir(codexHome, { recursive: true, mode: 0o700 });
    for (const fileName of projectedFiles) {
      const from = path.join(projectedHome, fileName);
      const to = path.join(codexHome, fileName);
      await copyFile(from, to);
      // Restrict the copied credentials to the owner.
      await chmod(to, 0o600);
    }
    return null;
  } catch (error) {
    const projection = {
      source: pathSummary(projectedHome),
      destination: pathSummary(codexHome),
      valuesPrinted: false,
    };
    const message = error instanceof Error ? redactText(error.message) : "failed to prepare writable Codex home";
    const payload = { failureKind: "secret-unavailable", projection, message } satisfies JsonRecord;
    const blocked: BackendTurnResult = {
      terminalStatus: "blocked",
      failureKind: "secret-unavailable",
      failureMessage: "Codex Secret projection could not be copied to writable CODEX_HOME",
      events: [
        { type: "error", payload },
        { type: "terminal_status", payload: { terminalStatus: "blocked", failureKind: "secret-unavailable" } },
      ],
    };
    return blocked;
  }
}
/**
 * Checks that both `auth.json` and `config.toml` in the Codex home are readable.
 *
 * @param codexHome Directory expected to contain the two files.
 * @returns `null` when both files are readable; otherwise a "blocked" result whose
 *          error event reports the readability of each file.
 */
function codexHomeReadiness(codexHome: string): BackendTurnResult | null {
  const authJson = fileReadable(`${codexHome}/auth.json`);
  const configToml = fileReadable(`${codexHome}/config.toml`);
  const ready = authJson.readable && configToml.readable;
  if (ready) return null;

  const projection = {
    codexHome: pathSummary(codexHome),
    authJson,
    configToml,
    valuesPrinted: false,
  };
  const payload = { failureKind: "secret-unavailable", projection } satisfies JsonRecord;
  const blocked: BackendTurnResult = {
    terminalStatus: "blocked",
    failureKind: "secret-unavailable",
    failureMessage: "Codex auth.json or config.toml projection is not readable",
    events: [
      { type: "error", payload },
      { type: "terminal_status", payload: { terminalStatus: "blocked", failureKind: "secret-unavailable" } },
    ],
  };
  return blocked;
}
/**
 * Converts one app-server notification into backend events plus state updates for the turn.
 *
 * @param message Raw notification; `method` selects the branch and `params` carries the data.
 * @param suppressed Counters updated (mutated) for notifications that produce no event.
 * @returns `events` to emit, and optionally: `assistantDelta` (streamed text chunk),
 *          `completedAssistantMessage` (finished agent message with non-blank text),
 *          `threadId` / `turnId` (ids announced by the server) and `terminal`
 *          (final outcome of the turn).
 */
function normalizeCodexNotification(message: JsonRecord, suppressed: SuppressedNotificationSummary): { events: BackendEvent[]; assistantDelta?: string; completedAssistantMessage?: CompletedAssistantMessage; threadId?: string; turnId?: string; terminal?: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } } {
const method = typeof message.method === "string" ? message.method : "unknown";
const params = asRecordAt(message, "params");
if (method === "thread/started") {
const threadId = stringAt(asRecordAt(params, "thread"), "id");
return { events: [{ type: "backend_status", payload: { phase: method, threadId } }], ...(threadId ? { threadId } : {}) };
}
if (method === "turn/started") {
const turnId = stringAt(asRecordAt(params, "turn"), "id");
return { events: [{ type: "backend_status", payload: { phase: method, turnId } }], ...(turnId ? { turnId } : {}) };
}
// Status notifications on the suppression list are only counted.
if (isSuppressedCodexStatusNotification(method)) {
recordSuppressedNotification(suppressed, method);
return { events: [] };
}
// Streaming assistant text produces no event here; the caller accumulates the delta.
if (method === "item/agentMessage/delta") return { events: [], assistantDelta: typeof params.delta === "string" ? params.delta : "" };
if (method === "item/commandExecution/outputDelta") return { events: [{ type: "command_output", payload: commandOutputPayload("stdout", typeof params.delta === "string" ? params.delta : "") }] };
if (method === "item/reasoning/textDelta") {
recordSuppressedNotification(suppressed, method, "reasoning");
return { events: [] };
}
// Agent messages: only a completed item with non-blank text yields a message; started items yield nothing.
if ((method === "item/started" || method === "item/completed") && asRecordAt(params, "item").type === "agentMessage") {
const item = asRecordAt(params, "item");
const itemId = stringAt(item, "id") ?? stringAt(params, "itemId");
const text = method === "item/completed" ? agentMessageText(item) : "";
const completedAssistantMessage = text.trim().length > 0 ? { itemId: itemId ?? null, text } : undefined;
return {
events: [],
...(completedAssistantMessage ? { completedAssistantMessage } : {}),
};
}
// Other items: only `commandExecution` becomes a tool_call event; every other type is counted as suppressed.
if (method === "item/started" || method === "item/completed") {
const item = asRecordAt(params, "item");
const itemType = typeof item.type === "string" ? item.type : "unknown";
if (itemType !== "commandExecution" || isSuppressedCodexItemType(itemType)) {
recordSuppressedNotification(suppressed, method, itemType);
return { events: [] };
}
return { events: [{ type: "tool_call", payload: toolCallPayload(method, item) }] };
}
// An error ends the turn unless the server states that it will retry.
if (method === "error") {
const error = asRecordAt(params, "error");
const messageText = typeof error.message === "string" ? error.message : "Codex app-server error";
const failureKind = classifyCodexErrorRecord(error, "backend-failed");
const terminal = params.willRetry === true ? undefined : { status: "failed" as const, failureKind, message: redactText(messageText) };
return { events: [{ type: "error", payload: { failureKind, error: redactJson(error), willRetry: params.willRetry === true } }], ...(terminal ? { terminal } : {}) };
}
if (method === "turn/completed") {
const turn = asRecordAt(params, "turn");
// A completion without a string status is treated as an invalid response and fails the turn.
if (typeof turn.status !== "string") {
return { events: [{ type: "error", payload: { failureKind: "backend-response-invalid", message: "turn/completed notification omitted turn.status" } }], terminal: { status: "failed", failureKind: "backend-response-invalid", message: "turn/completed notification omitted turn.status" } };
}
const status = terminalStatusFromValue(turn.status);
const error = asRecordAt(turn, "error");
const messageText = typeof error.message === "string" ? redactText(error.message) : null;
// When the turn did not complete and carries no error record, classify from the status string instead.
const failureKind = status === "completed" ? null : classifyCodexErrorRecord(Object.keys(error).length > 0 ? error : { message: turn.status }, "backend-failed");
const events: BackendEvent[] = [{ type: "backend_status", payload: { phase: method, terminalStatus: status } }];
if (failureKind) events.push({ type: "error", payload: { failureKind, error: redactJson(error), phase: method } });
return { events, terminal: { status, failureKind, message: messageText } };
}
// Any other method is surfaced as a plain status event named after the method.
return { events: [{ type: "backend_status", payload: { phase: method } }] };
}
/** Creates a fresh summary with every counter at zero. */
function createSuppressedNotificationSummary(): SuppressedNotificationSummary {
  const emptySummary: SuppressedNotificationSummary = {
    total: 0,
    byMethod: {},
    byItemType: {},
  };
  return emptySummary;
}
/**
 * Counts one suppressed notification.
 *
 * @param summary Summary to update in place.
 * @param method JSON-RPC method of the notification.
 * @param itemType Item type, counted only when it is a non-empty string.
 */
function recordSuppressedNotification(summary: SuppressedNotificationSummary, method: string, itemType?: string): void {
  const bump = (counts: Record<string, number>, key: string): void => {
    counts[key] = (counts[key] ?? 0) + 1;
  };
  summary.total += 1;
  bump(summary.byMethod, method);
  if (itemType) bump(summary.byItemType, itemType);
}
/**
 * Builds the status event that reports how many notifications were suppressed.
 *
 * @param summary Counters collected during the turn.
 * @returns An empty list when nothing was suppressed; otherwise a single `backend_status` event.
 */
function suppressedNotificationEvents(summary: SuppressedNotificationSummary): BackendEvent[] {
  const { total, byMethod, byItemType } = summary;
  if (total === 0) return [];
  const report: BackendEvent = {
    type: "backend_status",
    payload: {
      phase: "codex-app-server-notifications-suppressed",
      total,
      methods: countRecordEntries(byMethod, "method"),
      itemTypes: countRecordEntries(byItemType, "itemType"),
      valuesPrinted: false,
    },
  };
  return [report];
}
/**
 * Turns a name-to-count map into a list of `{ <keyName>: name, count }` records
 * ordered by name using `localeCompare`.
 */
function countRecordEntries(input: Record<string, number>, keyName: "method" | "itemType"): JsonRecord[] {
  const names = Object.keys(input);
  names.sort((left, right) => left.localeCompare(right));
  const rows: JsonRecord[] = [];
  for (const name of names) {
    rows.push({ [keyName]: name, count: input[name] } as JsonRecord);
  }
  return rows;
}
/** Reports whether a notification method is one of the status updates that are counted but not emitted. */
function isSuppressedCodexStatusNotification(method: string): boolean {
  switch (method) {
    case "thread/tokenUsage/updated":
    case "account/rateLimits/updated":
    case "warning":
    case "configWarning":
      return true;
    default:
      return false;
  }
}
/** Reports whether items of the given type are suppressed; only "reasoning" is. */
function isSuppressedCodexItemType(itemType: string): boolean {
  const suppressedItemType = "reasoning";
  return suppressedItemType === itemType;
}
/**
 * Builds the `assistant_message` event for a completed agent message.
 *
 * The event is marked as neither final nor reply-authoritative, and the total
 * message count is reported as null.
 *
 * @param message Completed message.
 * @param messageIndex Index reported in the event payload.
 */
function assistantMessageEventForCompleted(message: CompletedAssistantMessage, messageIndex: number): BackendEvent {
  const { text, itemId } = message;
  const payload = {
    text,
    itemId,
    source: "completed-agent-message",
    messageIndex,
    messageCount: null,
    replyAuthority: false,
    final: false,
  };
  return { type: "assistant_message", payload };
}
/**
 * Builds the fallback `assistant_message` event from text accumulated out of streaming deltas.
 *
 * @param assistantDeltaText Concatenated delta text.
 * @param completed Whether the turn completed; copied into `replyAuthority` and `final`.
 * @returns An empty list when the text is blank; otherwise one event carrying the text unmodified.
 */
function assistantMessageEventsForTurn(assistantDeltaText: string, completed: boolean): BackendEvent[] {
  const hasVisibleText = assistantDeltaText.trim().length > 0;
  if (!hasVisibleText) return [];
  const fallback: BackendEvent = {
    type: "assistant_message",
    payload: {
      text: assistantDeltaText,
      itemId: null,
      source: "agent-message-delta-fallback",
      messageIndex: 1,
      messageCount: 1,
      replyAuthority: completed,
      final: completed,
    },
  };
  return [fallback];
}
/**
 * Maps a turn status value reported by the app-server to a terminal status.
 *
 * "cancelled", "canceled" and "interrupted" all map to "cancelled"; any value that is
 * not recognised maps to "failed".
 */
function terminalStatusFromValue(value: unknown): TerminalStatus {
  switch (value) {
    case "completed":
      return "completed";
    case "cancelled":
    case "canceled":
    case "interrupted":
      return "cancelled";
    case "blocked":
      return "blocked";
    default:
      return "failed";
  }
}
/**
 * Builds the payload of a `tool_call` event from an item notification.
 *
 * The item is redacted first; optional fields are included only when present with the
 * expected type (and, for strings, non-empty).
 *
 * @param method Notification method that carried the item.
 * @param item Item record from the notification.
 */
function toolCallPayload(method: string, item: JsonRecord): JsonRecord {
  const safeItem = redactJson(item);
  const stringField = (key: string): string | null => {
    const value = safeItem[key];
    return typeof value === "string" ? value : null;
  };
  const numberField = (key: string): number | null => {
    const value = safeItem[key];
    return typeof value === "number" ? value : null;
  };

  const itemId = stringField("id");
  const itemType = stringField("type") ?? "unknown";
  const command = stringField("command");
  const cwd = stringField("cwd");
  const status = stringField("status");
  const rawProcessId = safeItem.processId;
  const processId = typeof rawProcessId === "string" || typeof rawProcessId === "number" ? String(rawProcessId) : null;
  const exitCode = numberField("exitCode");
  const durationMs = numberField("durationMs");
  const outputSummary = toolCallOutputSummary(safeItem);

  // Keys are added in a fixed order so the serialized payload keeps a stable layout.
  const payload: JsonRecord = { method, itemId, type: itemType, toolName: itemType };
  if (command) payload.command = command;
  if (cwd) payload.cwd = cwd;
  if (status) payload.status = status;
  if (processId) payload.processId = processId;
  if (exitCode !== null) payload.exitCode = exitCode;
  if (durationMs !== null) payload.durationMs = durationMs;
  if (outputSummary) payload.outputSummary = outputSummary;
  payload.valuesPrinted = false;
  return payload;
}
/**
 * Picks a short output summary for a tool-call item.
 *
 * Candidates are tried in order: the first non-nullish of `outputSummary`,
 * `stdoutSummary` and `message`; then `summary.text`; then `aggregatedOutput`.
 * The first candidate that is a non-blank string is passed through `boundedTextSummary`.
 *
 * @returns The summary text, or null when no candidate qualifies.
 */
function toolCallOutputSummary(item: JsonRecord): string | null {
  const direct = item.outputSummary ?? item.stdoutSummary ?? item.message;

  const summary = item.summary;
  const summaryIsRecord = typeof summary === "object" && summary !== null && !Array.isArray(summary);
  const nestedText = summaryIsRecord && typeof (summary as JsonRecord).text === "string" ? String((summary as JsonRecord).text) : undefined;

  const candidates: unknown[] = [direct, nestedText, item.aggregatedOutput];
  for (const candidate of candidates) {
    if (typeof candidate !== "string") continue;
    if (candidate.trim().length === 0) continue;
    return String(boundedTextSummary(candidate).text);
  }
  return null;
}
/**
 * Adds a `model` entry to request params when a non-blank model name is given.
 *
 * @param params Base params; returned as-is (same object) when no model applies.
 * @param model Optional model name; surrounding whitespace is trimmed.
 * @returns `params` itself, or a shallow copy with `model` set to the trimmed name.
 */
function withOptionalModel(params: JsonRecord, model: string | undefined): JsonRecord {
  if (typeof model !== "string") return params;
  const trimmedModel = model.trim();
  if (trimmedModel.length === 0) return params;
  return { ...params, model: trimmedModel };
}
/**
 * Builds the environment for the app-server process.
 *
 * Later layers win: `process.env`, then `options.env`, then the fixed `CODEX_HOME` and
 * `CODEX_INTERNAL_ORIGINATOR_OVERRIDE` entries.
 */
function childEnv(options: CodexStdioTurnOptions, codexHome: string): NodeJS.ProcessEnv {
  const fixedEntries: NodeJS.ProcessEnv = {
    CODEX_HOME: codexHome,
    CODEX_INTERNAL_ORIGINATOR_OVERRIDE: "agentrun",
  };
  return Object.assign({}, process.env, options.env, fixedEntries);
}
/**
 * Computes the key that decides whether an existing app-server client can be reused.
 *
 * The key is the JSON serialization of the launch command, arguments, working directory,
 * Codex home, backend profile, model, approval policy and sandbox.
 */
function codexClientKey(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv): string {
  const command = options.command ?? "codex";
  const args = options.args ?? defaultCodexArgs;
  const codexHome = env.CODEX_HOME ?? resolveCodexHome(options);
  const backendProfile = options.backendProfile ?? "codex";
  const model = options.model ?? null;
  const { cwd, approvalPolicy, sandbox } = options;
  // Property order matters because the serialized string is compared verbatim.
  return JSON.stringify({ command, args, cwd, codexHome, backendProfile, model, approvalPolicy, sandbox });
}
/**
 * Determines the Codex home directory.
 *
 * Precedence: `options.codexHome`, then `options.env.CODEX_HOME`, then `<HOME>/.codex`
 * where HOME is taken from `options.env`, then `process.env`, then the empty string.
 */
function resolveCodexHome(options: CodexStdioTurnOptions): string {
  const explicitHome = options.codexHome;
  if (explicitHome !== undefined && explicitHome !== null) return explicitHome;

  const envHome = options.env?.CODEX_HOME;
  if (envHome !== undefined && envHome !== null) return envHome;

  const userHome = options.env?.HOME ?? process.env.HOME ?? "";
  return `${userHome}/.codex`;
}
/**
 * Checks the shape of an `initialize` response.
 *
 * `serverInfo` may be absent; when present it must be a non-null, non-array object.
 *
 * @param value - The response record to validate.
 * @throws CodexStdioFailure with kind `backend-response-invalid` when
 * `serverInfo` is present but not an object.
 */
function validateInitializeResponse(value: JsonRecord): void {
  const info = value.serverInfo;
  if (info === undefined) return;
  const isPlainObject = typeof info === "object" && info !== null && !Array.isArray(info);
  if (isPlainObject) return;
  throw new CodexStdioFailure(
    "backend-response-invalid",
    "initialize response serverInfo must be an object when present",
    "response:initialize",
    { response: value },
  );
}
/**
 * Ensures a response result is a plain record.
 *
 * @param value - The raw response result.
 * @param method - Method name, used in the error message and phase.
 * @returns `value` itself, typed as a record.
 * @throws CodexStdioFailure with kind `backend-response-invalid` when `value`
 * is not a non-null, non-array object.
 */
function requireResponseRecord(value: unknown, method: string): JsonRecord {
  const isRecord = typeof value === "object" && value !== null && !Array.isArray(value);
  if (!isRecord) {
    throw new CodexStdioFailure("backend-response-invalid", `${method} response result must be an object`, `response:${method}`);
  }
  return value as JsonRecord;
}
/**
 * Reads `value[key].id` and requires it to be present.
 *
 * @param value - The response record.
 * @param method - Method name, used in the error message and phase.
 * @param key - Name of the nested record expected to carry an `id`.
 * @returns The id found via `stringAt` on the nested record.
 * @throws CodexStdioFailure with kind `backend-response-invalid` when no id is found.
 */
function requireNestedId(value: JsonRecord, method: string, key: string): string {
  const nested = asRecordAt(value, key);
  const id = stringAt(nested, "id");
  if (!id) {
    throw new CodexStdioFailure(
      "backend-response-invalid",
      `${method} response did not include ${key}.id`,
      `response:${method}`,
      { response: value },
    );
  }
  return id;
}
/**
 * Wraps a prompt string as a single-element input list.
 *
 * @param text - The text to send.
 * @returns An array holding one `{ type: "text", text, text_elements: [] }` entry.
 */
function textInput(text: string): JsonValue[] {
  const entry: JsonValue = { type: "text", text, text_elements: [] };
  return [entry];
}
/**
 * Extracts the text of an agent message item.
 *
 * Lookup order:
 * 1. The first non-empty string among `text`, `content`, `message`.
 * 2. The first of `text_elements`, `content` that is an array yielding at
 *    least one part; parts are string entries or the string `text` field of
 *    object entries, concatenated without a separator.
 *
 * @param item - The message item record.
 * @returns The extracted text, or `""` when nothing usable is found.
 */
function agentMessageText(item: JsonRecord): string {
  for (const key of ["text", "content", "message"]) {
    const value = item[key];
    // Skip empty strings so a blank field cannot hide text stored under a
    // later key or in one of the array forms below.
    if (typeof value === "string" && value.length > 0) return value;
  }
  for (const key of ["text_elements", "content"]) {
    const value = item[key];
    if (!Array.isArray(value)) continue;
    const parts = value.flatMap((entry) => {
      if (typeof entry === "string") return [entry];
      if (typeof entry !== "object" || entry === null || Array.isArray(entry)) return [];
      const record = entry as JsonRecord;
      return typeof record.text === "string" ? [record.text] : [];
    });
    if (parts.length > 0) return parts.join("");
  }
  return "";
}
/**
 * Summarizes a path and reports whether it can currently be read.
 *
 * @param filePath - Path to probe with `accessSync(..., R_OK)`.
 * @returns The `pathSummary` fields plus `readable`, which is `false` when
 * the access check throws.
 */
function fileReadable(filePath: string): JsonRecord {
  let readable = true;
  try {
    accessSync(filePath, fsConstants.R_OK);
  } catch {
    // Any access error is reported as "not readable".
    readable = false;
  }
  return { ...pathSummary(filePath), readable };
}
/**
 * Describes a path without including the path itself.
 *
 * @param value - The path; falsy input is treated as the empty string.
 * @returns A record with `present` (non-blank), `absolute`, `basename` (last
 * non-empty segment after splitting on `/` or `\`, else `null`), `depth`
 * (segment count), `fingerprint` (`shortHash` of the raw value) and
 * `valuePrinted: false`.
 */
function pathSummary(value: string): JsonRecord {
  const raw = String(value || "");
  const segments = raw.split(/[\\/]+/u).filter((segment) => segment.length > 0);
  const present = raw.trim().length > 0;
  const absolute = path.isAbsolute(raw);
  const basename = segments[segments.length - 1] ?? null;
  const depth = segments.length;
  const fingerprint = shortHash(raw);
  return { present, absolute, basename, depth, fingerprint, valuePrinted: false };
}
/**
 * Builds a summary of how the child process is launched.
 *
 * Paths go through `pathSummary` and the environment through `envSummary`;
 * `command` and `args` are included as given, with defaults applied.
 *
 * @param options - Turn options supplying command, args and cwd.
 * @param env - Environment to summarize.
 * @param codexHome - Codex home path to summarize.
 * @returns The summary record; `cwd` and `workspace` both describe `options.cwd`.
 */
function runtimeSummary(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, codexHome: string): JsonRecord {
  const command = options.command ?? "codex";
  const args = options.args ?? defaultCodexArgs;
  // Summarized twice on purpose so the two fields do not share one object.
  const cwd = pathSummary(options.cwd);
  const workspace = pathSummary(options.cwd);
  const home = pathSummary(codexHome);
  const environment = envSummary(env);
  return { command, args, cwd, workspace, codexHome: home, env: environment, valuesPrinted: false };
}
/**
 * Describes the backend used for a turn.
 *
 * The profile defaults to `"codex"`. Kind, protocol and transport come from
 * `backendProfileSpec(profile)` when available, otherwise from the fallbacks
 * `"codex-app-server-stdio"`, `codexProtocol` and `"stdio"`.
 *
 * @param options - Turn options; only `backendProfile` is read.
 * @returns A record with `backendProfile`, `backendKind`, `protocol`, `transport`.
 */
function backendMetadata(options: CodexStdioTurnOptions): JsonRecord {
  const backendProfile = options.backendProfile ?? "codex";
  const spec = backendProfileSpec(backendProfile);
  const backendKind = spec?.backendKind ?? "codex-app-server-stdio";
  const protocol = spec?.protocol ?? codexProtocol;
  const transport = spec?.transport ?? "stdio";
  return { backendProfile, backendKind, protocol, transport };
}
/**
 * Summarizes an environment without including any values.
 *
 * @param env - Environment to inspect.
 * @returns A record with `keyCount`, `trackedKeys` (for each entry of
 * `childEnvSummaryKeys`, whether it is set to a non-empty string),
 * `secretLikeKeyCount` (variable names matching a secret-like pattern) and
 * `valuesPrinted: false`.
 */
function envSummary(env: NodeJS.ProcessEnv): JsonRecord {
  const trackedKeys: Record<string, JsonValue> = {};
  childEnvSummaryKeys.forEach((key) => {
    const value = env[key];
    trackedKeys[key] = { present: typeof value === "string" && value.length > 0 };
  });

  const names = Object.keys(env);
  const secretLike = /auth|authorization|api[_-]?key|token|password|secret|credential/iu;
  let secretLikeKeyCount = 0;
  for (const name of names) {
    // Only the variable name is examined, never its value.
    if (secretLike.test(name)) secretLikeKeyCount += 1;
  }

  return { keyCount: names.length, trackedKeys, secretLikeKeyCount, valuesPrinted: false };
}
/**
 * Converts process close information into an event payload.
 *
 * The stderr tail is cut to its last `stderrEventChars` characters, and
 * `stderrTruncated` is also set when that cut dropped anything.
 *
 * @param closeInfo - Close details collected for the child process.
 * @returns The event payload record.
 */
function closeEvent(closeInfo: CodexStdioCloseInfo): JsonRecord {
  const { code, signal, failureKind, message, stderrTail, stderrBytes } = closeInfo;
  const tailExceedsLimit = stderrTail.length > stderrEventChars;
  return {
    code,
    signal,
    failureKind,
    message,
    stderrTail: stderrTail.slice(-stderrEventChars),
    stderrBytes,
    stderrTruncated: closeInfo.stderrTruncated || tailExceedsLimit,
  };
}
/**
 * Derives the failed terminal result for a process that closed before `turn/completed`.
 *
 * The failure kind is `closeInfo.failureKind` when set; otherwise it is
 * classified from the close message plus stderr tail, falling back to
 * `backend-response-invalid`. The message includes the exit code, the signal
 * and up to the last 1000 characters of stderr, and goes through `redactText`.
 *
 * @param closeInfo - Close details collected for the child process.
 * @returns Status `"failed"` with the failure kind and redacted message.
 */
function terminalFromClose(closeInfo: CodexStdioCloseInfo): { status: TerminalStatus; failureKind: FailureKind; message: string } {
  const { stderrTail } = closeInfo;
  const classificationText = [closeInfo.message ?? "", stderrTail].filter(Boolean).join("\n");
  const failureKind = closeInfo.failureKind ?? classifyMessageFailureKind(classificationText, "backend-response-invalid");

  let message = `codex app-server closed before turn/completed code=${closeInfo.code} signal=${closeInfo.signal}`;
  if (stderrTail.trim().length > 0) {
    message += `; stderrTail=${stderrTail.slice(-1000)}`;
  }
  return { status: "failed", failureKind, message: redactText(message) };
}
/**
 * Wraps a JSON-RPC error payload in a `CodexStdioFailure`.
 *
 * The message is the payload's string `message` when present, otherwise the
 * JSON of the redacted payload. The failure kind is classified from that
 * message, with `backend-protocol-error` as fallback.
 *
 * @param method - RPC method that produced the error.
 * @param value - Raw error payload from the response.
 * @returns The failure, with phase `response:<method>` and redacted details.
 */
function failureFromRpcError(method: string, value: unknown): CodexStdioFailure {
  const record = typeof value === "object" && value !== null ? value as Record<string, unknown> : {};
  let message: string;
  if (typeof record.message === "string") {
    message = record.message;
  } else {
    message = JSON.stringify(redactJson(value as JsonValue));
  }
  const kind = classifyMessageFailureKind(message, "backend-protocol-error");
  const details = { method, error: redactJson(value as JsonValue) };
  return new CodexStdioFailure(kind, `codex app-server ${method} error: ${message}`, `response:${method}`, details);
}
/**
 * Builds the failure reported when the app-server command cannot be started.
 *
 * @param command - The command that was attempted.
 * @param error - The thrown value; its `message` and `code` are used when available.
 * @returns A `backend-spawn-failed` failure with phase `process:spawn`; `code`
 * in the details is `""` when the error carries none.
 */
function spawnFailure(command: string, error: unknown): CodexStdioFailure {
  const message = error instanceof Error ? error.message : String(error);
  let code = "";
  if (typeof error === "object" && error !== null && "code" in error) {
    code = String((error as { code?: unknown }).code ?? "");
  }
  return new CodexStdioFailure(
    "backend-spawn-failed",
    `failed to start Codex app-server command ${command}: ${message}`,
    "process:spawn",
    { command, code },
  );
}
/**
 * Converts any thrown value into a `CodexStdioFailure`.
 *
 * @param error - The caught value.
 * @returns `error` itself when it already is a `CodexStdioFailure`; otherwise
 * a new failure with phase `codex-stdio`, whose kind is classified from the
 * message with `backend-protocol-error` as fallback.
 */
function normalizeFailure(error: unknown): CodexStdioFailure {
  if (!(error instanceof CodexStdioFailure)) {
    const message = error instanceof Error ? error.message : String(error);
    const kind = classifyMessageFailureKind(message, "backend-protocol-error");
    return new CodexStdioFailure(kind, message, "codex-stdio");
  }
  return error;
}
/**
 * Tells whether a failure is a `thread/resume` response reporting a missing rollout.
 *
 * @param error - The failure to inspect.
 * @returns `true` only when the phase is `response:thread/resume` and the
 * message or the JSON of the details contains, case-insensitively,
 * "no rollout found for thread id".
 */
function isMissingRolloutThreadResumeFailure(error: CodexStdioFailure): boolean {
  if (error.phase === "response:thread/resume") {
    const haystack = `${error.message}\n${JSON.stringify(error.details)}`.toLowerCase();
    return haystack.includes("no rollout found for thread id");
  }
  return false;
}
/**
 * Wraps a failure from resuming an existing thread as a `thread-resume-failed` failure.
 *
 * @param threadId - The thread that was asked to resume.
 * @param error - The underlying failure.
 * @returns A failure with phase `thread/resume`, whose details carry the
 * requested thread id plus the original kind, phase and redacted details.
 */
function threadResumeFailure(threadId: string, error: CodexStdioFailure): CodexStdioFailure {
  const message = `codex app-server thread/resume failed for existing thread: ${error.message}`;
  const details = {
    requestedThreadId: threadId,
    originalFailureKind: error.failureKind,
    originalPhase: error.phase,
    originalDetails: redactJson(error.details),
    valuesPrinted: false,
  };
  return new CodexStdioFailure("thread-resume-failed", message, "thread/resume", details);
}
/**
 * Classifies a structured error record into a failure kind.
 *
 * The classified text is the record's string `message`, its string
 * `additionalDetails`, and the JSON of the redacted record (unless that is
 * empty or `{}`), joined by newlines.
 *
 * @param error - The error record.
 * @param fallback - Kind to return when no pattern matches.
 * @returns The result of `classifyMessageFailureKind` on the combined text.
 */
function classifyCodexErrorRecord(error: JsonRecord, fallback: FailureKind): FailureKind {
  const fragments = [error.message, error.additionalDetails].filter(
    (candidate): candidate is string => typeof candidate === "string",
  );
  const serialized = JSON.stringify(redactJson(error as JsonValue));
  if (serialized && serialized !== "{}") fragments.push(serialized);
  return classifyMessageFailureKind(fragments.join("\n"), fallback);
}
/**
 * Maps free-form error text to a failure kind by pattern matching.
 *
 * The text is lower-cased and the rules are tried in order; the first match
 * wins: invalid tool call, rate limit, auth failure, provider unavailable,
 * timeout, JSON parse error.
 *
 * @param message - Error text; falsy input is treated as empty.
 * @param fallback - Kind to return when no rule matches.
 * @returns The matched failure kind, or `fallback`.
 */
function classifyMessageFailureKind(message: string, fallback: FailureKind): FailureKind {
  const text = String(message || "").toLowerCase();
  // Ordered rules: earlier entries take precedence over later ones.
  const rules: ReadonlyArray<readonly [(candidate: string) => boolean, FailureKind]> = [
    [(candidate) => /invalid[_ -]?prompt/u.test(candidate) && /invalid function arguments json string|tool_call_id/u.test(candidate), "provider-invalid-tool-call"],
    [(candidate) => /invalid function arguments json string/u.test(candidate), "provider-invalid-tool-call"],
    [(candidate) => /rate.?limit|too many requests|\b429\b/u.test(candidate), "provider-rate-limited"],
    [(candidate) => /\b401\b|\b403\b|unauthori[sz]ed|forbidden|invalid api key|authentication|auth failed|oauth|access token/u.test(candidate), "provider-auth-failed"],
    [(candidate) => isProviderUnavailableMessage(candidate), "provider-unavailable"],
    [(candidate) => /timed out|timeout|idle timeout/u.test(candidate), "backend-timeout"],
    [(candidate) => /invalid json|json parse/u.test(candidate), "backend-json-parse-error"],
  ];
  for (const [matches, kind] of rules) {
    if (matches(text)) return kind;
  }
  return fallback;
}
/**
 * Tells whether error text looks like an upstream/provider outage.
 *
 * Matches when any of these holds:
 * - a 5xx number directly follows an `http`/`status`/`code` label;
 * - a standalone 5xx number appears together with an outage-related phrase;
 * - an "unavailable"-style phrase appears on its own.
 *
 * @param text - Error text; the patterns are lower-case and case-sensitive.
 * @returns `true` when the text matches one of the outage patterns.
 */
function isProviderUnavailableMessage(text: string): boolean {
  const labelledServerError = /\b(?:http(?:\s+status)?|status(?:\s+code)?|code)\s*[:=]?\s*5\d\d\b/u.test(text);
  const bareServerError = /\b5\d\d\b/u.test(text);
  const outageContext = /service unavailable|bad gateway|gateway timeout|internal server error|provider|upstream|response\s*stream\s*disconnected|responsestreamdisconnected/u.test(text);
  const unavailablePhrase = /service unavailable|temporar(?:y|ily) unavailable|provider (?:is )?unavailable|provider availability|upstream (?:is )?unavailable/u.test(text);
  return labelledServerError || (bareServerError && outageContext) || unavailablePhrase;
}
/**
 * Normalizes a timeout to a whole number of at least 1.
 *
 * @param value - Requested timeout.
 * @returns The floored value (minimum 1) when `value` is finite and positive;
 * otherwise `requestTimeoutCapMs`.
 */
function positiveTimeout(value: number): number {
  if (!Number.isFinite(value) || value <= 0) return requestTimeoutCapMs;
  // Fractions below 1 floor to 0, so clamp back up to 1.
  return Math.max(1, Math.floor(value));
}
/**
 * Reads a nested record from a record.
 *
 * @param value - The parent record.
 * @param key - Property to read.
 * @returns The property when it is a non-null, non-array object; otherwise a
 * fresh empty record.
 */
function asRecordAt(value: JsonRecord, key: string): JsonRecord {
  const candidate = value[key];
  if (candidate === null || typeof candidate !== "object" || Array.isArray(candidate)) return {};
  return candidate as JsonRecord;
}
/**
 * Reads a non-empty string property from a record.
 *
 * @param value - The record to read from.
 * @param key - Property to read.
 * @returns The property when it is a string of length > 0; otherwise `null`.
 */
function stringAt(value: JsonRecord, key: string): string | null {
  const candidate = value[key];
  if (typeof candidate !== "string" || candidate.length === 0) return null;
  return candidate;
}
/**
 * Computes a short fingerprint of a string.
 *
 * @param value - Text to hash.
 * @returns The first 12 hex characters of the SHA-256 digest of `value`.
 */
function shortHash(value: string): string {
  const hasher = createHash("sha256");
  hasher.update(value);
  const hex = hasher.digest("hex");
  return hex.slice(0, 12);
}