import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process"; import { createHash } from "node:crypto"; import { accessSync, constants as fsConstants, readdirSync, readFileSync } from "node:fs"; import { chmod, copyFile, mkdir } from "node:fs/promises"; import path from "node:path"; import * as readline from "node:readline"; import type { BackendEvent, BackendProfile, BackendTurnResult, FailureKind, InitialPromptAssembly, JsonRecord, JsonValue, TerminalStatus } from "../common/types.js"; import { redactJson, redactText } from "../common/redaction.js"; import { backendProfileSpec } from "../common/backend-profiles.js"; import { boundedTextSummary, commandOutputPayload } from "../common/output.js"; const codexProtocol = "codex-app-server-jsonrpc-stdio"; const defaultCodexArgs = ["app-server", "--listen", "stdio://"]; const stderrBufferBytes = 64_000; const stderrEventChars = 4_000; const requestTimeoutCapMs = 30_000; const assistantDeltaProgressMinChars = 500; const assistantDeltaProgressLimitChars = 1_200; const childEnvSummaryKeys = [ "CODEX_HOME", "HOME", "PATH", "HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY", "NO_PROXY", "http_proxy", "https_proxy", "all_proxy", "no_proxy", "OPENAI_API_KEY", "CODEX_API_KEY", "GITHUB_TOKEN", "GH_TOKEN", "AGENTRUN_SKILLS_DIRS", "HWLAB_CODE_AGENT_SKILLS_DIRS", ]; export interface CodexStdioTurnOptions { backendProfile?: BackendProfile; prompt: string; cwd: string; model?: string; threadId?: string; approvalPolicy: string; sandbox: string; requestedSandbox?: string; sandboxOverrideSource?: string | null; timeoutMs: number; command?: string; args?: string[]; env?: NodeJS.ProcessEnv; codexHome?: string; initialPrompt?: InitialPromptAssembly; abortSignal?: AbortSignal; onEvent?: (event: BackendEvent) => void | Promise; onActiveTurn?: (control: CodexActiveTurnControl) => void | (() => void); } export interface CodexActiveTurnControl { threadId: string; turnId: string; steer(prompt: string): Promise; interrupt(): Promise; } interface PendingRequest { method: string; timer: NodeJS.Timeout; resolve: (value: unknown) => void; reject: (error: Error) => void; } interface CompletedAssistantMessage { itemId: string | null; text: string; } interface AssistantDeltaProgressItem { itemId: string | null; text: string; emittedChars: number; flushed: boolean; } interface SuppressedNotificationSummary { total: number; byMethod: Record; byItemType: Record; } type AssistantDeltaProgressState = Map; interface CodexStdioCloseInfo extends JsonRecord { code: number | null; signal: string | null; stderrTail: string; stderrBytes: number; stderrTruncated: boolean; failureKind: FailureKind | null; message: string | null; } class CodexStdioFailure extends Error { readonly failureKind: FailureKind; readonly phase: string; readonly details: JsonRecord; constructor(failureKind: FailureKind, message: string, phase: string, details: JsonRecord = {}) { super(redactText(message)); this.name = "CodexStdioFailure"; this.failureKind = failureKind; this.phase = phase; this.details = redactJson(details); } } export class CodexStdioClient { private readonly child: ChildProcessWithoutNullStreams; private readonly pending = new Map(); private stderrTailBuffer = Buffer.alloc(0); private stderrBytes = 0; private nextId = 1; private closed = false; private closeFailure: CodexStdioFailure | null = null; readonly closedPromise: Promise; private closeResolve!: (value: CodexStdioCloseInfo) => void; constructor(options: { command?: string; args?: string[]; cwd: string; env?: NodeJS.ProcessEnv; onNotification: (message: JsonRecord) => void }) { this.closedPromise = new Promise((resolve) => { this.closeResolve = resolve; }); const command = options.command ?? "codex"; const args = options.args ?? defaultCodexArgs; try { this.child = spawn(command, args, { cwd: options.cwd, env: options.env ?? process.env, detached: true, stdio: "pipe", }); } catch (error) { throw spawnFailure(command, error); } this.child.stderr.on("data", (chunk: Buffer) => this.appendStderr(chunk)); const rl = readline.createInterface({ input: this.child.stdout, crlfDelay: Infinity }); void this.readLines(rl, options.onNotification); this.child.on("close", (code, signal) => this.handleClose(code, signal)); this.child.on("error", (error) => this.handleClose(127, null, spawnFailure(command, error))); } get isClosed(): boolean { return this.closed; } request(method: string, params: JsonRecord, timeoutMs = requestTimeoutCapMs): Promise { if (this.closed) return Promise.reject(this.closeFailure ?? new CodexStdioFailure("backend-failed", "codex app-server is closed", `request:${method}`)); const id = this.nextId++; const message = { id, method, params }; const effectiveTimeoutMs = positiveTimeout(timeoutMs); return new Promise((resolve, reject) => { const timer = setTimeout(() => { this.rejectRequest(id, new CodexStdioFailure("backend-timeout", `Codex stdio request ${method} timed out after ${effectiveTimeoutMs}ms`, `request:${method}`, { method, timeoutMs: effectiveTimeoutMs })); }, effectiveTimeoutMs); this.pending.set(id, { method, timer, resolve, reject }); this.child.stdin.write(`${JSON.stringify(message)}\n`, "utf8", (error: Error | null | undefined) => { if (!error) return; this.rejectRequest(id, new CodexStdioFailure("backend-failed", `failed to write Codex stdio request ${method}: ${error.message}`, `request:${method}`, { method })); }); }); } notify(method: string, params: JsonRecord = {}): void { if (this.closed) return; this.child.stdin.write(`${JSON.stringify({ method, params })}\n`, "utf8", () => undefined); } stop(): void { if (this.closed) return; this.kill("SIGTERM"); setTimeout(() => { if (!this.closed) this.kill("SIGKILL"); }, 1500).unref?.(); } private kill(signal: NodeJS.Signals): void { const pid = this.child.pid; if (typeof pid === "number") { try { process.kill(-pid, signal); return; } catch { // Fall back to killing the direct child when process-group termination is unavailable. } } this.child.kill(signal); } private appendStderr(chunk: Buffer): void { this.stderrBytes += chunk.byteLength; const next = Buffer.concat([this.stderrTailBuffer, chunk]); this.stderrTailBuffer = next.byteLength > stderrBufferBytes ? next.subarray(next.byteLength - stderrBufferBytes) : next; } private async readLines(rl: readline.Interface, onNotification: (message: JsonRecord) => void): Promise { try { for await (const line of rl) { const trimmed = String(line).trim(); if (trimmed.length === 0) continue; let message: JsonRecord; try { message = JSON.parse(trimmed) as JsonRecord; } catch { this.handleProtocolFailure(new CodexStdioFailure("backend-json-parse-error", "codex app-server emitted invalid JSON on stdout", "stdout:parse", { linePreview: redactText(trimmed.slice(0, 800)), lineChars: trimmed.length })); break; } this.handleMessage(message, onNotification); } } catch (error) { this.handleProtocolFailure(new CodexStdioFailure("backend-protocol-error", error instanceof Error ? error.message : String(error), "stdout:read")); } } private handleMessage(message: JsonRecord, onNotification: (message: JsonRecord) => void): void { const id = typeof message.id === "number" ? message.id : null; const method = typeof message.method === "string" ? message.method : null; if (id !== null && method === null) { this.handleResponse(id, message); return; } if (id !== null && method !== null) { this.handleServerRequest(id, method); return; } if (method !== null) { onNotification(message); return; } this.handleProtocolFailure(new CodexStdioFailure("backend-response-invalid", "codex app-server message had neither JSON-RPC id nor method", "stdout:message", { message })); } private handleResponse(id: number, message: JsonRecord): void { const pending = this.pending.get(id); if (!pending) return; this.pending.delete(id); clearTimeout(pending.timer); if (message.error !== undefined) { pending.reject(failureFromRpcError(pending.method, message.error)); return; } if (!("result" in message)) { pending.reject(new CodexStdioFailure("backend-response-invalid", `codex app-server response for ${pending.method} omitted result and error`, `response:${pending.method}`, { method: pending.method })); return; } pending.resolve(message.result); } private handleServerRequest(id: number, method: string): void { if (method === "item/commandExecution/requestApproval" || method === "item/fileChange/requestApproval") { this.child.stdin.write(`${JSON.stringify({ id, result: { decision: "decline" } })}\n`); return; } this.child.stdin.write(`${JSON.stringify({ id, error: { code: -32601, message: `Unsupported client-side request: ${method}` } })}\n`); } private rejectRequest(id: number, error: CodexStdioFailure): void { const pending = this.pending.get(id); if (!pending) return; this.pending.delete(id); clearTimeout(pending.timer); pending.reject(error); } private rejectAll(error: Error): void { for (const pending of this.pending.values()) { clearTimeout(pending.timer); pending.reject(error); } this.pending.clear(); } private handleProtocolFailure(error: CodexStdioFailure): void { if (this.closed) return; this.closeFailure = error; this.rejectAll(error); this.stop(); } private handleClose(code: number | null, signal: string | null, failure: CodexStdioFailure | null = null): void { if (this.closed) return; this.closed = true; if (failure) this.closeFailure = failure; const stderr = this.stderrInfo(); const closeInfo: CodexStdioCloseInfo = { code, signal, stderrTail: stderr.stderrTail, stderrBytes: this.stderrBytes, stderrTruncated: stderr.stderrTruncated, failureKind: this.closeFailure?.failureKind ?? null, message: this.closeFailure?.message ?? null, }; this.rejectAll(this.closeFailure ?? new CodexStdioFailure("backend-failed", `codex app-server closed code=${code} signal=${signal}`, "process:close", closeInfo)); this.closeResolve(closeInfo); } private stderrInfo(): { stderrTail: string; stderrTruncated: boolean } { const buffered = this.stderrTailBuffer.toString("utf8"); const tail = buffered.slice(-8000); return { stderrTail: redactText(tail), stderrTruncated: this.stderrBytes > this.stderrTailBuffer.byteLength || buffered.length > tail.length, }; } } export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise { const session = new CodexStdioBackendSession(); const result = await session.runTurn(options); const closeEvents = await session.close(); return { ...result, events: [...result.events, ...closeEvents] }; } export class CodexStdioBackendSession { private client: CodexStdioClient | null = null; private clientKey: string | null = null; async runTurn(options: CodexStdioTurnOptions): Promise { return await runCodexStdioTurnWithSession(options, this); } async close(): Promise { const client = this.client; if (!client) return []; this.client = null; this.clientKey = null; client.stop(); const closeInfo = await client.closedPromise; return [{ type: "backend_status", payload: { phase: "codex-app-server-closed", appServerExit: closeEvent(closeInfo) } }]; } async getClient(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, emitEvent: (event: BackendEvent) => void): Promise { const key = codexClientKey(options, env); if (this.client && !this.client.isClosed && this.clientKey === key) { emitEvent({ type: "backend_status", payload: { phase: "codex-app-server:reused", ...backendMetadata(options), protocol: codexProtocol } }); return this.client; } const closeEvents = await this.close(); for (const event of closeEvents) emitEvent(event); emitEvent({ type: "backend_status", payload: { phase: "codex-app-server-starting", ...backendMetadata(options), protocol: codexProtocol, runtime: runtimeSummary(options, env, resolveCodexHome(options)), config: codexConfigSummary(resolveCodexHome(options), options.backendProfile ?? "codex"), }, }); const clientOptions: ConstructorParameters[0] = { cwd: options.cwd, env, onNotification: (message) => this.onNotification(message), }; if (options.command) clientOptions.command = options.command; if (options.args) clientOptions.args = options.args; this.client = new CodexStdioClient(clientOptions); this.clientKey = key; const requestTimeoutMs = Math.min(positiveTimeout(options.timeoutMs), requestTimeoutCapMs); const initializeResult = requireResponseRecord(await this.client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } }, requestTimeoutMs), "initialize"); validateInitializeResponse(initializeResult); this.client.notify("initialized", {}); emitEvent({ type: "backend_status", payload: { phase: "initialize:completed", ...backendMetadata(options), protocol: codexProtocol } }); return this.client; } private notificationHandlers = new Set<(message: JsonRecord) => void>(); addNotificationHandler(handler: (message: JsonRecord) => void): () => void { this.notificationHandlers.add(handler); return () => this.notificationHandlers.delete(handler); } private onNotification(message: JsonRecord): void { for (const handler of this.notificationHandlers) handler(message); } } async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, session: CodexStdioBackendSession): Promise { const codexHome = resolveCodexHome(options); const projectionFailure = await prepareProjectedCodexHome(codexHome, options.env?.AGENTRUN_CODEX_SECRET_HOME ?? process.env.AGENTRUN_CODEX_SECRET_HOME); if (projectionFailure) return projectionFailure; const secretFailure = codexHomeReadiness(codexHome, options.backendProfile ?? "codex"); if (secretFailure) return secretFailure; const env = childEnv(options, codexHome); const events: BackendEvent[] = []; let liveEventWrite = Promise.resolve(); const emitEvent = (event: BackendEvent): void => { const redactedEvent: BackendEvent = { ...event, payload: redactJson(event.payload) }; if (options.onEvent) { liveEventWrite = liveEventWrite.then(() => Promise.resolve(options.onEvent?.(redactedEvent))).catch(() => undefined); return; } events.push(redactedEvent); }; const emitEvents = (nextEvents: BackendEvent[]): void => { for (const event of nextEvents) emitEvent(event); }; if (options.abortSignal?.aborted) { const cancelled = { status: "cancelled" as const, failureKind: "cancelled" as const, message: "cancel requested" }; events.push({ type: "backend_status", payload: { phase: "turn-cancelled", failureKind: "cancelled" } }); events.push({ type: "terminal_status", payload: { terminalStatus: cancelled.status, failureKind: cancelled.failureKind, message: cancelled.message } }); return { terminalStatus: cancelled.status, failureKind: cancelled.failureKind, failureMessage: cancelled.message, events: events.map((event) => ({ ...event, payload: redactJson(event.payload) })) }; } let assistantText = ""; const assistantDeltaProgress = createAssistantDeltaProgressState(); const completedAssistantMessages: CompletedAssistantMessage[] = []; const suppressedNotifications = createSuppressedNotificationSummary(); let threadId: string | undefined = options.threadId; let turnId: string | undefined; let terminal: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } | null = null; let terminalResolve!: () => void; const terminalPromise = new Promise((resolve) => { terminalResolve = resolve; }); let client: CodexStdioClient | null = null; const requestTimeoutMs = Math.min(positiveTimeout(options.timeoutMs), requestTimeoutCapMs); let stopActiveTurn: (() => void) | undefined; let activeTurnKey: string | null = null; let interruptInFlight: Promise | null = null; let stopAfterInterrupt = false; const controlRequestTimeoutMs = Math.min(requestTimeoutMs, 5_000); const activeTurnControl = (activeThreadId: string, activeTurnId: string): CodexActiveTurnControl => ({ threadId: activeThreadId, turnId: activeTurnId, steer: async (prompt: string) => { await client!.request("turn/steer", { threadId: activeThreadId, expectedTurnId: activeTurnId, input: textInput(prompt) }, requestTimeoutMs); }, interrupt: async () => { await client!.request("turn/interrupt", { threadId: activeThreadId, turnId: activeTurnId }, controlRequestTimeoutMs); }, }); const exposeActiveTurn = (source: string): void => { if (!client || !threadId || !turnId || !options.onActiveTurn) return; const key = `${threadId}:${turnId}`; if (activeTurnKey === key) return; stopActiveTurn?.(); activeTurnKey = key; emitEvent({ type: "backend_status", payload: { phase: "active-turn-control-ready", source, threadId, turnId } }); const maybeStop = options.onActiveTurn(activeTurnControl(threadId, turnId)); stopActiveTurn = typeof maybeStop === "function" ? maybeStop : undefined; }; const requestInterrupt = (reason: string, triggerPhase: string): Promise => { const activeClient = client; const activeThreadId = threadId; const activeTurnId = turnId; emitEvent({ type: "backend_status", payload: { phase: "turn-interrupt-requested", reason, triggerPhase, threadId: activeThreadId ?? null, turnId: activeTurnId ?? null } }); if (!activeClient || !activeThreadId || !activeTurnId) { emitEvent({ type: "backend_status", payload: { phase: "turn-interrupt-unavailable", reason, triggerPhase, hasClient: Boolean(activeClient), hasThreadId: Boolean(activeThreadId), hasTurnId: Boolean(activeTurnId) } }); activeClient?.stop(); return Promise.resolve(); } return activeClient.request("turn/interrupt", { threadId: activeThreadId, turnId: activeTurnId }, controlRequestTimeoutMs) .then(() => { emitEvent({ type: "backend_status", payload: { phase: "turn/interrupt:completed", reason, triggerPhase, threadId: activeThreadId, turnId: activeTurnId } }); }) .catch((error) => { const failure = normalizeFailure(error); emitEvent({ type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase: "turn/interrupt:failed", triggerPhase, details: failure.details } }); }) .finally(() => { if (stopAfterInterrupt && !activeClient.isClosed) activeClient.stop(); }); }; const beginInterruptAndStop = (reason: string, triggerPhase: string): void => { stopAfterInterrupt = true; if (!interruptInFlight) interruptInFlight = requestInterrupt(reason, triggerPhase); else interruptInFlight = interruptInFlight.then(() => requestInterrupt(reason, triggerPhase)); }; const abortTurn = (): void => { if (terminal) return; terminal = { status: "cancelled", failureKind: "cancelled", message: "cancel requested" }; emitEvent({ type: "backend_status", payload: { phase: "turn-cancelled", failureKind: "cancelled" } }); beginInterruptAndStop("cancel requested", "abort-signal"); terminalResolve(); }; options.abortSignal?.addEventListener("abort", abortTurn, { once: true }); const turnIdleTimeoutMs = positiveTimeout(options.timeoutMs); let idleTimeout: NodeJS.Timeout | null = null; const refreshTurnActivity = (): void => { if (terminal) return; if (idleTimeout) clearTimeout(idleTimeout); idleTimeout = setTimeout(() => { if (terminal) return; terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn idle timed out after ${turnIdleTimeoutMs}ms without activity` }; emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:idle-timeout" } }); beginInterruptAndStop("idle timeout", "turn:idle-timeout"); terminalResolve(); }, turnIdleTimeoutMs); }; const stopTurnIdleTimeout = (): void => { if (!idleTimeout) return; clearTimeout(idleTimeout); idleTimeout = null; }; refreshTurnActivity(); const stopNotifications = session.addNotificationHandler((message) => { refreshTurnActivity(); const normalized = normalizeCodexNotification(message, suppressedNotifications); if (normalized.threadId) threadId = normalized.threadId; if (normalized.turnId) turnId = normalized.turnId; exposeActiveTurn(normalized.turnId ? "turn-notification" : "notification"); emitEvents(normalized.events); if (normalized.assistantDelta) { assistantText += normalized.assistantDelta.text; const progress = recordAssistantDeltaProgress(assistantDeltaProgress, normalized.assistantDelta); if (progress) emitEvent(progress); } if (normalized.completedAssistantMessage) { completedAssistantMessages.push(normalized.completedAssistantMessage); emitEvent(assistantMessageEventForCompleted(normalized.completedAssistantMessage, completedAssistantMessages.length)); } if (normalized.terminal && !terminal) { terminal = normalized.terminal; terminalResolve(); } }); try { client = await session.getClient(options, env, emitEvent); const startThread = async (phasePrefix = "thread/start"): Promise => { const response = requireResponseRecord(await client!.request("thread/start", withOptionalModel({ cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" }, options.model), requestTimeoutMs), "thread/start"); const nextThreadId = requireNestedId(response, "thread/start", "thread"); emitEvent({ type: "backend_status", payload: { phase: `${phasePrefix}:completed`, threadId: nextThreadId } }); return nextThreadId; }; const willResumeThread = Boolean(options.threadId); const sessionPvcName = env.AGENTRUN_SESSION_PVC_NAME?.trim() || null; const sessionPvcNamespace = env.AGENTRUN_SESSION_PVC_NAMESPACE?.trim() || null; const sessionPvcMountPath = env.AGENTRUN_SESSION_PVC_MOUNT_PATH?.trim() || null; const codexRolloutSubdirEnv = env.AGENTRUN_CODEX_ROLLOUT_SUBDIR?.trim() || null; if (sessionPvcName && sessionPvcNamespace && sessionPvcMountPath) { emitEvent({ type: "backend_status", payload: { phase: "codex-rollout-storage-mounted", pvcName: sessionPvcName, pvcNamespace: sessionPvcNamespace, mountPath: sessionPvcMountPath, codexRolloutSubdir: codexRolloutSubdirEnv ?? "sessions", valuesPrinted: false } }); } if (options.threadId) { try { const threadResponse = requireResponseRecord(await client.request("thread/resume", withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model), requestTimeoutMs), "thread/resume"); threadId = requireNestedId(threadResponse, "thread/resume", "thread"); emitEvent({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } }); } catch (error) { const failure = normalizeFailure(error); if (sessionPvcName && isNoRolloutFoundMessage(failure.message)) { throw new CodexStdioFailure( "session-store-evicted", `codex app-server thread/resume reported no rollout found for PVC-backed session; session storage was likely evicted`, "thread/resume", { requestedThreadId: options.threadId, pvcName: sessionPvcName, pvcNamespace: sessionPvcNamespace, pvcMountPath: sessionPvcMountPath, originalFailureKind: failure.failureKind, originalPhase: failure.phase, originalDetails: redactJson(failure.details), valuesPrinted: false }, ); } throw threadResumeFailure(options.threadId, failure); } } else { threadId = await startThread(); } const promptInjection = initialPromptInjection(options.initialPrompt, willResumeThread); emitEvent({ type: "backend_status", payload: { phase: "initial-prompt-assembly", initialPromptInjected: promptInjection.injected, reason: promptInjection.reason, initialPrompt: options.initialPrompt?.summary ?? { available: false, valuesPrinted: false }, valuesPrinted: false } }); const turnStart = await Promise.race([ client.request("turn/start", withOptionalModel({ threadId, input: textInputForUserMessage(options.prompt, promptInjection), cwd: options.cwd, approvalPolicy: options.approvalPolicy }, options.model), requestTimeoutMs).then((response) => ({ kind: "response" as const, response })), terminalPromise.then(() => ({ kind: "terminal" as const })), ]); if (turnStart.kind === "response") { const turnResponse = requireResponseRecord(turnStart.response, "turn/start"); turnId = requireNestedId(turnResponse, "turn/start", "turn"); emitEvent({ type: "backend_status", payload: { phase: "turn/start:completed", turnId } }); exposeActiveTurn("turn-start-response"); } else { emitEvent({ type: "backend_status", payload: { phase: "turn/start:interrupted-before-response", threadId: threadId ?? null, turnId: turnId ?? null } }); } if (!terminal) { const race = await Promise.race([ terminalPromise.then(() => ({ kind: "terminal" as const })), client.closedPromise.then((closeInfo) => ({ kind: "closed" as const, closeInfo })), ]); if (race.kind === "closed" && !terminal) { terminal = terminalFromClose(race.closeInfo); emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "transport:closed-before-terminal", appServerExit: closeEvent(race.closeInfo) } }); } } if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server did not emit turn/completed" }; } catch (error) { if (!terminal) { const failure = normalizeFailure(error); terminal = { status: failure.failureKind === "secret-unavailable" ? "blocked" : "failed", failureKind: failure.failureKind, message: failure.message }; emitEvent({ type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase: failure.phase, details: failure.details } }); } } finally { stopActiveTurn?.(); stopNotifications(); options.abortSignal?.removeEventListener("abort", abortTurn); stopTurnIdleTimeout(); } if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server finished without terminal status" }; const pendingInterrupt: Promise | null = interruptInFlight as Promise | null; if (pendingInterrupt) await pendingInterrupt.catch(() => undefined); if (terminal.status !== "completed") emitEvents(await session.close()); emitEvents(flushAssistantDeltaProgress(assistantDeltaProgress)); if (completedAssistantMessages.length === 0) emitEvents(assistantMessageEventsForTurn(assistantText, terminal.status === "completed")); emitEvents(suppressedNotificationEvents(suppressedNotifications)); emitEvent({ type: "terminal_status", payload: { terminalStatus: terminal.status, failureKind: terminal.failureKind, message: terminal.message } }); await liveEventWrite; return { terminalStatus: terminal.status, failureKind: terminal.failureKind, failureMessage: terminal.message, events: events.map((event) => ({ ...event, payload: redactJson(event.payload) })), ...(threadId ? { threadId } : {}), ...(turnId ? { turnId } : {}) }; } async function prepareProjectedCodexHome(codexHome: string, projectedHome: string | undefined): Promise { if (!projectedHome || projectedHome.trim().length === 0) return null; if (path.resolve(projectedHome) === path.resolve(codexHome)) return null; try { await mkdir(codexHome, { recursive: true, mode: 0o700 }); for (const fileName of projectedCodexHomeFiles(projectedHome)) { await copyFile(path.join(projectedHome, fileName), path.join(codexHome, fileName)); await chmod(path.join(codexHome, fileName), 0o600); } return null; } catch (error) { const payload = { failureKind: "secret-unavailable", projection: { source: pathSummary(projectedHome), destination: pathSummary(codexHome), valuesPrinted: false, }, message: error instanceof Error ? redactText(error.message) : "failed to prepare writable Codex home", } satisfies JsonRecord; return { terminalStatus: "blocked", failureKind: "secret-unavailable", failureMessage: "Codex Secret projection could not be copied to writable CODEX_HOME", events: [ { type: "error", payload }, { type: "terminal_status", payload: { terminalStatus: "blocked", failureKind: "secret-unavailable" } }, ], }; } } function codexHomeReadiness(codexHome: string, profile: BackendProfile): BackendTurnResult | null { const auth = fileReadable(`${codexHome}/auth.json`); const config = fileReadable(`${codexHome}/config.toml`); const modelCatalog = auth.readable && config.readable ? modelCatalogReadiness(codexHome, profile) : null; if (auth.readable && config.readable && !modelCatalog) return null; const payload = { failureKind: "secret-unavailable", projection: { codexHome: pathSummary(codexHome), authJson: auth, configToml: config, ...(modelCatalog ? { modelCatalogJson: modelCatalog } : {}), valuesPrinted: false, }, } satisfies JsonRecord; return { terminalStatus: "blocked", failureKind: "secret-unavailable", failureMessage: "Codex auth.json or config.toml projection is not readable", events: [ { type: "error", payload }, { type: "terminal_status", payload: { terminalStatus: "blocked", failureKind: "secret-unavailable" } }, ], }; } function projectedCodexHomeFiles(projectedHome: string): string[] { const required = ["auth.json", "config.toml"]; try { const optionalFiles = readdirSync(projectedHome).filter((name) => name === "model-catalog.json"); return [...required, ...optionalFiles]; } catch { return required; } } function modelCatalogReadiness(codexHome: string, profile: BackendProfile): JsonRecord | null { let configToml = ""; try { configToml = readFileSync(path.join(codexHome, "config.toml"), "utf8"); } catch { return null; } const modelCatalogPath = modelCatalogJsonPathFromConfig(configToml, codexHome); if (!modelCatalogPath) { return profile === "dsflash-go" ? { present: false, requiredByConfig: false, requiredByProfile: true, valuesPrinted: false } : null; } const readiness = fileReadable(modelCatalogPath); return readiness.readable ? null : { ...readiness, requiredByConfig: true, valuesPrinted: false }; } function modelCatalogJsonPathFromConfig(configToml: string, codexHome: string): string | null { const match = configToml.match(/^\s*model_catalog_json\s*=\s*"([^"]+)"\s*$/mu); if (!match?.[1]) return null; return path.isAbsolute(match[1]) ? match[1] : path.join(codexHome, match[1]); } function codexConfigSummary(codexHome: string, profile: BackendProfile): JsonRecord { const configPath = path.join(codexHome, "config.toml"); let configToml = ""; try { configToml = readFileSync(configPath, "utf8"); } catch { return { available: false, valuesPrinted: false }; } const model = tomlStringValue(configToml, "model"); const providerName = tomlStringValue(configToml, "model_provider"); const baseUrl = tomlStringValue(configToml, "base_url"); const modelCatalogPath = modelCatalogJsonPathFromConfig(configToml, codexHome); return { available: true, profile, model, providerName, baseUrl: baseUrl ? redactedUrlSummary(baseUrl) : null, wireApi: tomlStringValue(configToml, "wire_api"), contextWindow: tomlNumberValue(configToml, "model_context_window"), autoCompactTokenLimit: tomlNumberValue(configToml, "model_auto_compact_token_limit"), modelCatalogJson: modelCatalogPath ? { ...pathSummary(modelCatalogPath), readable: fileReadable(modelCatalogPath).readable } : null, valuesPrinted: false, }; } function tomlStringValue(configToml: string, key: string): string | null { const match = configToml.match(new RegExp(`^\\s*${escapeRegExp(key)}\\s*=\\s*"([^"]*)"\\s*$`, "mu")); return match?.[1] ?? null; } function tomlNumberValue(configToml: string, key: string): number | null { const match = configToml.match(new RegExp(`^\\s*${escapeRegExp(key)}\\s*=\\s*([0-9][0-9_]*)\\s*$`, "mu")); if (!match?.[1]) return null; const value = Number(match[1].replace(/_/gu, "")); return Number.isFinite(value) ? value : null; } function redactedUrlSummary(value: string): JsonRecord { try { const url = new URL(value); return { protocol: url.protocol.replace(/:$/u, ""), hostname: url.hostname, port: url.port || null, pathname: url.pathname, valuesPrinted: false }; } catch { return { valid: false, fingerprint: shortHash(value), valuesPrinted: false }; } } function escapeRegExp(value: string): string { return value.replace(/[.*+?^${}()|[\]\\]/gu, "\\$&"); } function normalizeCodexNotification(message: JsonRecord, suppressed: SuppressedNotificationSummary): { events: BackendEvent[]; assistantDelta?: { itemId: string | null; text: string }; completedAssistantMessage?: CompletedAssistantMessage; threadId?: string; turnId?: string; terminal?: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } } { const method = typeof message.method === "string" ? message.method : "unknown"; const params = asRecordAt(message, "params"); if (method === "thread/started") { const threadId = stringAt(asRecordAt(params, "thread"), "id"); return { events: [{ type: "backend_status", payload: { phase: method, threadId } }], ...(threadId ? { threadId } : {}) }; } if (method === "turn/started") { const turnId = stringAt(asRecordAt(params, "turn"), "id"); return { events: [{ type: "backend_status", payload: { phase: method, turnId } }], ...(turnId ? { turnId } : {}) }; } if (isSuppressedCodexStatusNotification(method)) { recordSuppressedNotification(suppressed, method); return { events: [] }; } if (method === "item/agentMessage/delta") return { events: [], assistantDelta: { itemId: stringAt(params, "itemId"), text: typeof params.delta === "string" ? params.delta : "" } }; if (method === "item/commandExecution/outputDelta") return { events: [{ type: "command_output", payload: commandOutputPayload("stdout", typeof params.delta === "string" ? params.delta : "") }] }; if (method === "item/reasoning/textDelta") { recordSuppressedNotification(suppressed, method, "reasoning"); return { events: [] }; } if ((method === "item/started" || method === "item/completed") && asRecordAt(params, "item").type === "agentMessage") { const item = asRecordAt(params, "item"); const itemId = stringAt(item, "id") ?? stringAt(params, "itemId"); const text = method === "item/completed" ? agentMessageText(item) : ""; const completedAssistantMessage = text.trim().length > 0 ? { itemId: itemId ?? null, text } : undefined; return { events: [], ...(completedAssistantMessage ? { completedAssistantMessage } : {}), }; } if (method === "item/started" || method === "item/completed") { const item = asRecordAt(params, "item"); const itemType = typeof item.type === "string" ? item.type : "unknown"; if (!isVisibleCodexToolItemType(itemType)) { recordSuppressedNotification(suppressed, method, itemType); return { events: [] }; } return { events: [{ type: "tool_call", payload: toolCallPayload(method, item) }] }; } if (method === "error") { const error = asRecordAt(params, "error"); const messageText = typeof error.message === "string" ? error.message : "Codex app-server error"; const failureKind = classifyCodexErrorRecord(error, "backend-failed"); const terminal = params.willRetry === true ? undefined : { status: "failed" as const, failureKind, message: redactText(messageText) }; return { events: [{ type: "error", payload: { failureKind, error: redactJson(error), willRetry: params.willRetry === true } }], ...(terminal ? { terminal } : {}) }; } if (method === "turn/completed") { const turn = asRecordAt(params, "turn"); if (typeof turn.status !== "string") { return { events: [{ type: "error", payload: { failureKind: "backend-response-invalid", message: "turn/completed notification omitted turn.status" } }], terminal: { status: "failed", failureKind: "backend-response-invalid", message: "turn/completed notification omitted turn.status" } }; } const status = terminalStatusFromValue(turn.status); const error = asRecordAt(turn, "error"); const messageText = typeof error.message === "string" ? redactText(error.message) : null; const failureKind = status === "completed" ? null : status === "cancelled" ? "cancelled" : classifyCodexErrorRecord(Object.keys(error).length > 0 ? error : { message: turn.status }, "backend-failed"); const events: BackendEvent[] = [{ type: "backend_status", payload: { phase: method, terminalStatus: status } }]; if (failureKind) events.push({ type: "error", payload: { failureKind, error: redactJson(error), phase: method } }); return { events, terminal: { status, failureKind, message: messageText } }; } return { events: [{ type: "backend_status", payload: { phase: method } }] }; } function createSuppressedNotificationSummary(): SuppressedNotificationSummary { return { total: 0, byMethod: {}, byItemType: {} }; } function recordSuppressedNotification(summary: SuppressedNotificationSummary, method: string, itemType?: string): void { summary.total += 1; summary.byMethod[method] = (summary.byMethod[method] ?? 0) + 1; if (itemType) summary.byItemType[itemType] = (summary.byItemType[itemType] ?? 0) + 1; } function suppressedNotificationEvents(summary: SuppressedNotificationSummary): BackendEvent[] { if (summary.total === 0) return []; return [{ type: "backend_status", payload: { phase: "codex-app-server-notifications-suppressed", total: summary.total, methods: countRecordEntries(summary.byMethod, "method"), itemTypes: countRecordEntries(summary.byItemType, "itemType"), valuesPrinted: false, }, }]; } function countRecordEntries(input: Record, keyName: "method" | "itemType"): JsonRecord[] { return Object.entries(input) .sort(([left], [right]) => left.localeCompare(right)) .map(([name, count]) => ({ [keyName]: name, count }) as JsonRecord); } function isSuppressedCodexStatusNotification(method: string): boolean { return method === "thread/tokenUsage/updated" || method === "account/rateLimits/updated" || method === "warning" || method === "configWarning"; } function isVisibleCodexToolItemType(itemType: string): boolean { return itemType === "commandExecution" || itemType === "webSearch" || itemType === "mcpToolCall" || itemType === "dynamicToolCall"; } function assistantMessageEventForCompleted(message: CompletedAssistantMessage, messageIndex: number): BackendEvent { return { type: "assistant_message", payload: { text: message.text, itemId: message.itemId, source: "completed-agent-message", messageIndex, messageCount: null, replyAuthority: false, final: false, }, }; } function assistantMessageEventsForTurn(assistantDeltaText: string, completed: boolean): BackendEvent[] { if (assistantDeltaText.trim().length === 0) return []; return [{ type: "assistant_message", payload: { text: assistantDeltaText, itemId: null, source: "agent-message-delta-fallback", messageIndex: 1, messageCount: 1, replyAuthority: completed, final: completed, }, }]; } function createAssistantDeltaProgressState(): AssistantDeltaProgressState { return new Map(); } function recordAssistantDeltaProgress(state: AssistantDeltaProgressState, delta: { itemId: string | null; text: string }): BackendEvent | null { if (!delta.text) return null; const key = delta.itemId ?? "default"; const current = state.get(key) ?? { itemId: delta.itemId, text: "", emittedChars: 0, flushed: false }; current.text += delta.text; current.flushed = false; state.set(key, current); if (current.text.length - current.emittedChars < assistantDeltaProgressMinChars) return null; current.emittedChars = current.text.length; return assistantDeltaProgressEvent(current, false); } function flushAssistantDeltaProgress(state: AssistantDeltaProgressState): BackendEvent[] { const events: BackendEvent[] = []; for (const item of state.values()) { if (item.flushed || item.text.trim().length === 0 || item.text.length === item.emittedChars) continue; item.emittedChars = item.text.length; item.flushed = true; events.push(assistantDeltaProgressEvent(item, true)); } return events; } function assistantDeltaProgressEvent(item: AssistantDeltaProgressItem, flush: boolean): BackendEvent { const summary = boundedTextSummary(item.text.trim(), { limitChars: assistantDeltaProgressLimitChars }); return { type: "assistant_message", payload: { text: summary.text, itemId: item.itemId, source: "agent-message-delta-progress", messageIndex: null, messageCount: null, replyAuthority: false, final: false, progress: true, progressFlush: flush, textBytes: summary.textBytes, textTruncated: summary.textTruncated, outputBytes: summary.outputBytes, outputTruncated: summary.outputTruncated, valuesPrinted: false, }, }; } function terminalStatusFromValue(value: unknown): TerminalStatus { if (value === "completed") return "completed"; if (value === "cancelled" || value === "canceled" || value === "interrupted") return "cancelled"; if (value === "blocked") return "blocked"; return "failed"; } function toolCallPayload(method: string, item: JsonRecord): JsonRecord { const redacted = redactJson(item); const itemId = typeof redacted.id === "string" ? redacted.id : null; const itemType = typeof redacted.type === "string" ? redacted.type : "unknown"; const toolName = toolCallName(redacted, itemType); const command = toolCallCommandSummary(redacted, itemType, toolName); const cwd = typeof redacted.cwd === "string" ? redacted.cwd : null; const status = toolCallStatus(method, redacted); const processId = typeof redacted.processId === "string" || typeof redacted.processId === "number" ? String(redacted.processId) : null; const exitCode = typeof redacted.exitCode === "number" ? redacted.exitCode : null; const durationMs = typeof redacted.durationMs === "number" ? redacted.durationMs : null; const outputSummary = toolCallOutputSummary(redacted); return { method, itemId, type: itemType, toolName, ...(command ? { command } : {}), ...(cwd ? { cwd } : {}), ...(status ? { status } : {}), ...(processId ? { processId } : {}), ...(exitCode !== null ? { exitCode } : {}), ...(durationMs !== null ? { durationMs } : {}), ...(outputSummary ? { outputSummary } : {}), valuesPrinted: false, }; } function toolCallStatus(method: string, item: JsonRecord): string | null { if (typeof item.status === "string" && item.status.trim().length > 0) return item.status; if (method === "item/started") return "started"; if (method === "item/completed") return "completed"; return null; } function toolCallName(item: JsonRecord, itemType: string): string { const direct = firstToolCallString(item, ["toolName", "name", "tool", "functionName"]); const server = firstToolCallString(item, ["serverName", "server", "mcpServer"]); if (server && direct && !direct.includes(server)) return `${server}.${direct}`; return direct ?? itemType; } function toolCallCommandSummary(item: JsonRecord, itemType: string, toolName: string): string | null { const direct = typeof item.command === "string" && item.command.trim().length > 0 ? item.command : null; if (direct) return direct; if (itemType !== "mcpToolCall" && itemType !== "dynamicToolCall") return null; const input = toolCallInputSummary(item); return input ? `${toolName} ${input}` : toolName; } function toolCallInputSummary(item: JsonRecord): string | null { for (const key of ["arguments", "args", "input", "params", "parameters"] as const) { if (!Object.prototype.hasOwnProperty.call(item, key)) continue; const value = item[key]; if (value === null || value === undefined) continue; const text = typeof value === "string" ? value : JSON.stringify(value); if (typeof text === "string" && text.trim().length > 0 && text.trim() !== "{}") return String(boundedTextSummary(text, { limitChars: 600 }).text); } return null; } function firstToolCallString(item: JsonRecord, keys: readonly string[]): string | null { for (const key of keys) { const value = item[key]; if (typeof value === "string" && value.trim().length > 0) return value; } return null; } function toolCallOutputSummary(item: JsonRecord): string | null { const direct = item.outputSummary ?? item.stdoutSummary ?? item.message; if (typeof direct === "string" && direct.trim().length > 0) return String(boundedTextSummary(direct).text); const summary = item.summary; if (typeof summary === "object" && summary !== null && !Array.isArray(summary) && typeof (summary as JsonRecord).text === "string") { const text = String((summary as JsonRecord).text); if (text.trim().length > 0) return String(boundedTextSummary(text).text); } const aggregated = item.aggregatedOutput; if (typeof aggregated === "string" && aggregated.trim().length > 0) return String(boundedTextSummary(aggregated).text); return null; } function withOptionalModel(params: JsonRecord, model: string | undefined): JsonRecord { const value = typeof model === "string" ? model.trim() : ""; if (!value) return params; return { ...params, model: value }; } function childEnv(options: CodexStdioTurnOptions, codexHome: string): NodeJS.ProcessEnv { return { ...process.env, ...options.env, CODEX_HOME: codexHome, CODEX_INTERNAL_ORIGINATOR_OVERRIDE: "agentrun", }; } function codexClientKey(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv): string { return JSON.stringify({ command: options.command ?? "codex", args: options.args ?? defaultCodexArgs, cwd: options.cwd, codexHome: env.CODEX_HOME ?? resolveCodexHome(options), backendProfile: options.backendProfile ?? "codex", model: options.model ?? null, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, }); } function resolveCodexHome(options: CodexStdioTurnOptions): string { return options.codexHome ?? options.env?.CODEX_HOME ?? `${options.env?.HOME ?? process.env.HOME ?? ""}/.codex`; } function validateInitializeResponse(value: JsonRecord): void { const serverInfo = value.serverInfo; if (serverInfo !== undefined && (typeof serverInfo !== "object" || serverInfo === null || Array.isArray(serverInfo))) { throw new CodexStdioFailure("backend-response-invalid", "initialize response serverInfo must be an object when present", "response:initialize", { response: value }); } } function requireResponseRecord(value: unknown, method: string): JsonRecord { if (typeof value === "object" && value !== null && !Array.isArray(value)) return value as JsonRecord; throw new CodexStdioFailure("backend-response-invalid", `${method} response result must be an object`, `response:${method}`); } function requireNestedId(value: JsonRecord, method: string, key: string): string { const id = stringAt(asRecordAt(value, key), "id"); if (id) return id; throw new CodexStdioFailure("backend-response-invalid", `${method} response did not include ${key}.id`, `response:${method}`, { response: value }); } function textInput(text: string): JsonValue[] { return [{ type: "text", text, text_elements: [] }]; } function initialPromptInjection(initialPrompt: InitialPromptAssembly | undefined, resume: boolean): { text: string; injected: boolean; reason: string } { if (!initialPrompt) return { text: "", injected: false, reason: "no-initial-prompt" }; if (resume) return { text: "", injected: false, reason: "thread-resume" }; return { text: [ "", initialPrompt.text, "", ].join("\n"), injected: true, reason: "thread-start", }; } function textInputForUserMessage(prompt: string, initial: ReturnType): JsonValue[] { if (!initial.injected) return textInput(prompt); return textInput([ initial.text, "", prompt, "", ].join("\n")); } function agentMessageText(item: JsonRecord): string { for (const key of ["text", "content", "message"]) { const value = item[key]; if (typeof value === "string") return value; } for (const key of ["text_elements", "content"]) { const value = item[key]; if (!Array.isArray(value)) continue; const parts = value.flatMap((entry) => { if (typeof entry === "string") return [entry]; if (typeof entry !== "object" || entry === null || Array.isArray(entry)) return []; const record = entry as JsonRecord; return typeof record.text === "string" ? [record.text] : []; }); if (parts.length > 0) return parts.join(""); } return ""; } function fileReadable(filePath: string): JsonRecord { try { accessSync(filePath, fsConstants.R_OK); return { ...pathSummary(filePath), readable: true }; } catch { return { ...pathSummary(filePath), readable: false }; } } function pathSummary(value: string): JsonRecord { const raw = String(value || ""); const parts = raw.split(/[\\/]+/u).filter(Boolean); return { present: raw.trim().length > 0, absolute: path.isAbsolute(raw), basename: parts.at(-1) ?? null, depth: parts.length, fingerprint: shortHash(raw), valuePrinted: false, }; } function runtimeSummary(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, codexHome: string): JsonRecord { return { command: options.command ?? "codex", args: options.args ?? defaultCodexArgs, cwd: pathSummary(options.cwd), workspace: pathSummary(options.cwd), codexHome: pathSummary(codexHome), env: envSummary(env), valuesPrinted: false, }; } function backendMetadata(options: CodexStdioTurnOptions): JsonRecord { const profile = options.backendProfile ?? "codex"; const spec = backendProfileSpec(profile); return { backendProfile: profile, backendKind: spec?.backendKind ?? "codex-app-server-stdio", protocol: spec?.protocol ?? codexProtocol, transport: spec?.transport ?? "stdio", sandbox: { requested: options.requestedSandbox ?? options.sandbox, effective: options.sandbox, overrideSource: options.sandboxOverrideSource ?? null, valuesPrinted: false, }, }; } function envSummary(env: NodeJS.ProcessEnv): JsonRecord { const keyState: Record = {}; for (const key of childEnvSummaryKeys) keyState[key] = { present: typeof env[key] === "string" && String(env[key]).length > 0 }; const secretLikeKeyCount = Object.keys(env).filter((key) => /auth|authorization|api[_-]?key|token|password|secret|credential/iu.test(key)).length; return { keyCount: Object.keys(env).length, trackedKeys: keyState, secretLikeKeyCount, valuesPrinted: false, }; } function closeEvent(closeInfo: CodexStdioCloseInfo): JsonRecord { return { code: closeInfo.code, signal: closeInfo.signal, failureKind: closeInfo.failureKind, message: closeInfo.message, stderrTail: closeInfo.stderrTail.slice(-stderrEventChars), stderrBytes: closeInfo.stderrBytes, stderrTruncated: closeInfo.stderrTruncated || closeInfo.stderrTail.length > stderrEventChars, }; } function terminalFromClose(closeInfo: CodexStdioCloseInfo): { status: TerminalStatus; failureKind: FailureKind; message: string } { const baseMessage = `codex app-server closed before turn/completed code=${closeInfo.code} signal=${closeInfo.signal}`; const combined = [closeInfo.message ?? "", closeInfo.stderrTail].filter(Boolean).join("\n"); const failureKind = closeInfo.failureKind ?? classifyMessageFailureKind(combined, "backend-response-invalid"); const stderrPreview = closeInfo.stderrTail.trim().length > 0 ? `; stderrTail=${closeInfo.stderrTail.slice(-1000)}` : ""; return { status: "failed", failureKind, message: redactText(`${baseMessage}${stderrPreview}`) }; } function failureFromRpcError(method: string, value: unknown): CodexStdioFailure { const error = typeof value === "object" && value !== null ? value as Record : {}; const message = typeof error.message === "string" ? error.message : JSON.stringify(redactJson(value as JsonValue)); return new CodexStdioFailure(classifyMessageFailureKind(message, "backend-protocol-error"), `codex app-server ${method} error: ${message}`, `response:${method}`, { method, error: redactJson(value as JsonValue) }); } function spawnFailure(command: string, error: unknown): CodexStdioFailure { const message = error instanceof Error ? error.message : String(error); const code = typeof error === "object" && error !== null && "code" in error ? String((error as { code?: unknown }).code ?? "") : ""; return new CodexStdioFailure("backend-spawn-failed", `failed to start Codex app-server command ${command}: ${message}`, "process:spawn", { command, code }); } function normalizeFailure(error: unknown): CodexStdioFailure { if (error instanceof CodexStdioFailure) return error; const message = error instanceof Error ? error.message : String(error); return new CodexStdioFailure(classifyMessageFailureKind(message, "backend-protocol-error"), message, "codex-stdio"); } function threadResumeFailure(threadId: string, error: CodexStdioFailure): CodexStdioFailure { return new CodexStdioFailure( "thread-resume-failed", `codex app-server thread/resume failed for existing thread: ${error.message}`, "thread/resume", { requestedThreadId: threadId, originalFailureKind: error.failureKind, originalPhase: error.phase, originalDetails: redactJson(error.details), valuesPrinted: false, }, ); } function isNoRolloutFoundMessage(message: string): boolean { return /no rollout found for thread id/i.test(message); } function classifyCodexErrorRecord(error: JsonRecord, fallback: FailureKind): FailureKind { const parts: string[] = []; if (typeof error.message === "string") parts.push(error.message); if (typeof error.additionalDetails === "string") parts.push(error.additionalDetails); const redactedJson = JSON.stringify(redactJson(error as JsonValue)); if (redactedJson && redactedJson !== "{}") parts.push(redactedJson); return classifyMessageFailureKind(parts.join("\n"), fallback); } function classifyMessageFailureKind(message: string, fallback: FailureKind): FailureKind { const text = String(message || "").toLowerCase(); if (isProviderCompactUnsupportedMessage(text)) return "provider-compact-unsupported"; if (/invalid[_ -]?prompt/u.test(text) && /invalid function arguments json string|tool_call_id/u.test(text)) return "provider-invalid-tool-call"; if (/invalid function arguments json string/u.test(text)) return "provider-invalid-tool-call"; if (/rate.?limit|too many requests|\b429\b/u.test(text)) return "provider-rate-limited"; if (/\b401\b|\b403\b|unauthori[sz]ed|forbidden|invalid api key|api key (?:is )?(?:required|missing)|authentication|auth failed|oauth|access token/u.test(text)) return "provider-auth-failed"; if (isProviderUnavailableMessage(text)) return "provider-unavailable"; if (isProviderStreamDisconnectedMessage(text)) return "provider-stream-disconnected"; if (isProviderHttpErrorMessage(text)) return "provider-http-error"; if (/timed out|timeout|idle timeout/u.test(text)) return "backend-timeout"; if (/invalid json|json parse/u.test(text)) return "backend-json-parse-error"; return fallback; } function isProviderCompactUnsupportedMessage(text: string): boolean { return /responses\/compact|\/compact\b/u.test(text) && /\b404\b|not found|unsupported|no route|not implemented/u.test(text); } function isProviderStreamDisconnectedMessage(text: string): boolean { return /responsestreamdisconnected|response stream disconnected|stream disconnected before completion|disconnected before completion/u.test(text); } function isProviderHttpErrorMessage(text: string): boolean { if (/\b(?:http(?:\s+status)?|status(?:\s+code)?|unexpected status|status|code)\s*[:=]?\s*[45]\d\d\b/u.test(text)) return true; if (/\b[45]\d\d\b/u.test(text) && /http|status|service unavailable|bad gateway|gateway timeout|internal server error|not found|provider|upstream/u.test(text)) return true; return false; } function isProviderUnavailableMessage(text: string): boolean { if (/\b(?:http(?:\s+status)?|status(?:\s+code)?|code)\s*[:=]?\s*5\d\d\b/u.test(text)) return true; if (/\b5\d\d\b/u.test(text) && /service unavailable|bad gateway|gateway timeout|internal server error|provider|upstream|response\s*stream\s*disconnected|responsestreamdisconnected/u.test(text)) return true; if (/connection refused|econnrefused|connection reset|websocket.*(?:refused|unavailable|closed)/u.test(text)) return true; if (/service unavailable|temporar(?:y|ily) unavailable|provider (?:is )?unavailable|provider availability|upstream (?:is )?unavailable/u.test(text)) return true; return false; } function positiveTimeout(value: number): number { return Number.isFinite(value) && value > 0 ? Math.max(1, Math.floor(value)) : requestTimeoutCapMs; } function asRecordAt(value: JsonRecord, key: string): JsonRecord { const next = value[key]; return typeof next === "object" && next !== null && !Array.isArray(next) ? next as JsonRecord : {}; } function stringAt(value: JsonRecord, key: string): string | null { return typeof value[key] === "string" && String(value[key]).length > 0 ? String(value[key]) : null; } function shortHash(value: string): string { return createHash("sha256").update(value).digest("hex").slice(0, 12); }