diff --git a/docs/reference/spec-v01-backend-adapter.md b/docs/reference/spec-v01-backend-adapter.md index b43ae62..564bfdb 100644 --- a/docs/reference/spec-v01-backend-adapter.md +++ b/docs/reference/spec-v01-backend-adapter.md @@ -52,6 +52,9 @@ Adapter 必须把 backend 错误映射为稳定 failureKind: | `provider-auth-failed` | provider credential 或 auth file 无效、上游返回 401/403。 | | `provider-rate-limited` | 上游限流或 quota 错误。 | | `backend-protocol-error` | backend 输出无法解析、协议字段缺失。 | +| `backend-json-parse-error` | backend stdout 不是合法 JSON-RPC 行。 | +| `backend-response-invalid` | backend JSON-RPC response/terminal notification 缺少必需字段。 | +| `backend-spawn-failed` | backend app-server 进程无法启动。 | | `backend-failed` | backend 进程非零退出或 terminal error。 | | `backend-timeout` | executionPolicy timeout 触发。 | | `cancelled` | interrupt/cancel 生效。 | diff --git a/docs/reference/spec-v01-backend-codex.md b/docs/reference/spec-v01-backend-codex.md index a883064..ee34ba1 100644 --- a/docs/reference/spec-v01-backend-codex.md +++ b/docs/reference/spec-v01-backend-codex.md @@ -96,6 +96,7 @@ Run 的 `executionPolicy.secretScope` 应引用 `agentrun-v01-provider-codex` | --- | --- | --- | | Codex backend 规格 | 已定义 | 本文为 v0.1 第一真实 backend 权威。 | | Codex Secret projection | 未实现 | 需要后续 Kubernetes Secret 和 runner/backend manifest。 | -| Codex adapter | 未实现 | 需要后续代码实现。 | +| Codex adapter | 已部分实现 | 当前代码已实现受控 `codex app-server --listen stdio://`、`initialize`/`thread/start`/`thread/resume`/`turn/start` response 校验、stderr 有界诊断、spawn/JSON parse/response invalid/timeout failureKind 和 fake app-server 自测试。 | +| 错误可观测与脱敏 | 已部分实现 | child env、cwd、workspace 和 Codex home 只输出摘要;stderr tail 有界且标记截断;事件和 failure 统一走 redaction。 | | 真实 provider turn | 未实现 | 综合联调必须真实完成后才能发布通过。 | | hostPath `~/.codex` | 不采用 | 只能通过 Kubernetes Secret projection 注入。 | diff --git a/src/backend/adapter.ts b/src/backend/adapter.ts index d12ff54..cb8115c 100644 --- a/src/backend/adapter.ts +++ b/src/backend/adapter.ts @@ -21,6 +21,7 @@ export async function runBackendTurn(run: RunRecord, command: CommandRecord, opt timeoutMs: run.executionPolicy.timeoutMs, }; if (typeof command.payload.model === "string") turnOptions.model = command.payload.model; + if (typeof command.payload.threadId === "string") turnOptions.threadId = command.payload.threadId; if (options.codexCommand) turnOptions.command = options.codexCommand; if (options.codexArgs) turnOptions.args = options.codexArgs; if (options.env) turnOptions.env = options.env; diff --git a/src/backend/codex-stdio.ts b/src/backend/codex-stdio.ts index c82365c..83ecbd4 100644 --- a/src/backend/codex-stdio.ts +++ b/src/backend/codex-stdio.ts @@ -1,13 +1,40 @@ import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process"; +import { createHash } from "node:crypto"; import { accessSync, constants as fsConstants } from "node:fs"; +import path from "node:path"; import * as readline from "node:readline"; import type { BackendEvent, BackendTurnResult, FailureKind, JsonRecord, JsonValue, TerminalStatus } from "../common/types.js"; import { redactJson, redactText } from "../common/redaction.js"; +const codexProtocol = "codex-app-server-jsonrpc-stdio"; +const defaultCodexArgs = ["app-server", "--listen", "stdio://"]; +const stderrBufferBytes = 64_000; +const stderrEventChars = 4_000; +const requestTimeoutCapMs = 30_000; + +const childEnvSummaryKeys = [ + "CODEX_HOME", + "HOME", + "PATH", + "HTTP_PROXY", + "HTTPS_PROXY", + "ALL_PROXY", + "NO_PROXY", + "http_proxy", + "https_proxy", + "all_proxy", + "no_proxy", + "OPENAI_API_KEY", + "CODEX_API_KEY", + "GITHUB_TOKEN", + "GH_TOKEN", +]; + export interface CodexStdioTurnOptions { prompt: string; cwd: string; model?: string; + threadId?: string; approvalPolicy: string; sandbox: string; timeoutMs: number; @@ -18,48 +45,87 @@ export interface CodexStdioTurnOptions { } interface PendingRequest { + method: string; + timer: NodeJS.Timeout; resolve: (value: unknown) => void; reject: (error: Error) => void; } +interface CodexStdioCloseInfo extends JsonRecord { + code: number | null; + signal: string | null; + stderrTail: string; + stderrBytes: number; + stderrTruncated: boolean; + failureKind: FailureKind | null; + message: string | null; +} + +class CodexStdioFailure extends Error { + readonly failureKind: FailureKind; + readonly phase: string; + readonly details: JsonRecord; + + constructor(failureKind: FailureKind, message: string, phase: string, details: JsonRecord = {}) { + super(redactText(message)); + this.name = "CodexStdioFailure"; + this.failureKind = failureKind; + this.phase = phase; + this.details = redactJson(details); + } +} + export class CodexStdioClient { private readonly child: ChildProcessWithoutNullStreams; private readonly pending = new Map(); - private readonly stderrChunks: Buffer[] = []; + private stderrTailBuffer = Buffer.alloc(0); + private stderrBytes = 0; private nextId = 1; private closed = false; - readonly closedPromise: Promise<{ code: number | null; signal: string | null; stderrTail: string }>; - private closeResolve!: (value: { code: number | null; signal: string | null; stderrTail: string }) => void; + private closeFailure: CodexStdioFailure | null = null; + readonly closedPromise: Promise; + private closeResolve!: (value: CodexStdioCloseInfo) => void; constructor(options: { command?: string; args?: string[]; cwd: string; env?: NodeJS.ProcessEnv; onNotification: (message: JsonRecord) => void }) { this.closedPromise = new Promise((resolve) => { this.closeResolve = resolve; }); - this.child = spawn(options.command ?? "codex", options.args ?? ["app-server", "--listen", "stdio://"], { - cwd: options.cwd, - env: options.env ?? process.env, - stdio: "pipe", - }); - this.child.stderr.on("data", (chunk: Buffer) => { - this.stderrChunks.push(chunk); - while (Buffer.concat(this.stderrChunks).length > 64_000) this.stderrChunks.shift(); - }); + const command = options.command ?? "codex"; + const args = options.args ?? defaultCodexArgs; + try { + this.child = spawn(command, args, { + cwd: options.cwd, + env: options.env ?? process.env, + stdio: "pipe", + }); + } catch (error) { + throw spawnFailure(command, error); + } + this.child.stderr.on("data", (chunk: Buffer) => this.appendStderr(chunk)); const rl = readline.createInterface({ input: this.child.stdout, crlfDelay: Infinity }); void this.readLines(rl, options.onNotification); this.child.on("close", (code, signal) => this.handleClose(code, signal)); - this.child.on("error", (error) => this.handleClose(127, error.message)); + this.child.on("error", (error) => this.handleClose(127, null, spawnFailure(command, error))); } - request(method: string, params: JsonRecord): Promise { - if (this.closed) return Promise.reject(new Error("codex app-server is closed")); + request(method: string, params: JsonRecord, timeoutMs = requestTimeoutCapMs): Promise { + if (this.closed) return Promise.reject(this.closeFailure ?? new CodexStdioFailure("backend-failed", "codex app-server is closed", `request:${method}`)); const id = this.nextId++; const message = { id, method, params }; + const effectiveTimeoutMs = positiveTimeout(timeoutMs); return new Promise((resolve, reject) => { - this.pending.set(id, { resolve, reject }); - this.child.stdin.write(`${JSON.stringify(message)}\n`); + const timer = setTimeout(() => { + this.rejectRequest(id, new CodexStdioFailure("backend-timeout", `Codex stdio request ${method} timed out after ${effectiveTimeoutMs}ms`, `request:${method}`, { method, timeoutMs: effectiveTimeoutMs })); + }, effectiveTimeoutMs); + this.pending.set(id, { method, timer, resolve, reject }); + this.child.stdin.write(`${JSON.stringify(message)}\n`, "utf8", (error: Error | null | undefined) => { + if (!error) return; + this.rejectRequest(id, new CodexStdioFailure("backend-failed", `failed to write Codex stdio request ${method}: ${error.message}`, `request:${method}`, { method })); + }); }); } notify(method: string, params: JsonRecord = {}): void { - if (!this.closed) this.child.stdin.write(`${JSON.stringify({ method, params })}\n`); + if (this.closed) return; + this.child.stdin.write(`${JSON.stringify({ method, params })}\n`, "utf8", () => undefined); } stop(): void { @@ -70,33 +136,65 @@ export class CodexStdioClient { }, 1500).unref?.(); } + private appendStderr(chunk: Buffer): void { + this.stderrBytes += chunk.byteLength; + const next = Buffer.concat([this.stderrTailBuffer, chunk]); + this.stderrTailBuffer = next.byteLength > stderrBufferBytes ? next.subarray(next.byteLength - stderrBufferBytes) : next; + } + private async readLines(rl: readline.Interface, onNotification: (message: JsonRecord) => void): Promise { try { for await (const line of rl) { const trimmed = String(line).trim(); if (trimmed.length === 0) continue; - const message = JSON.parse(trimmed) as JsonRecord; - const id = typeof message.id === "number" ? message.id : null; - const method = typeof message.method === "string" ? message.method : null; - if (id !== null && method === null) { - const pending = this.pending.get(id); - if (!pending) continue; - this.pending.delete(id); - if (message.error !== undefined) pending.reject(new Error(JSON.stringify(redactJson(message.error)))); - else pending.resolve(message.result); - continue; + let message: JsonRecord; + try { + message = JSON.parse(trimmed) as JsonRecord; + } catch { + this.handleProtocolFailure(new CodexStdioFailure("backend-json-parse-error", "codex app-server emitted invalid JSON on stdout", "stdout:parse", { linePreview: redactText(trimmed.slice(0, 800)), lineChars: trimmed.length })); + break; } - if (id !== null && method !== null) { - this.handleServerRequest(id, method); - continue; - } - if (method !== null) onNotification(message); + this.handleMessage(message, onNotification); } } catch (error) { - this.rejectAll(error instanceof Error ? error : new Error(String(error))); + this.handleProtocolFailure(new CodexStdioFailure("backend-protocol-error", error instanceof Error ? error.message : String(error), "stdout:read")); } } + private handleMessage(message: JsonRecord, onNotification: (message: JsonRecord) => void): void { + const id = typeof message.id === "number" ? message.id : null; + const method = typeof message.method === "string" ? message.method : null; + if (id !== null && method === null) { + this.handleResponse(id, message); + return; + } + if (id !== null && method !== null) { + this.handleServerRequest(id, method); + return; + } + if (method !== null) { + onNotification(message); + return; + } + this.handleProtocolFailure(new CodexStdioFailure("backend-response-invalid", "codex app-server message had neither JSON-RPC id nor method", "stdout:message", { message })); + } + + private handleResponse(id: number, message: JsonRecord): void { + const pending = this.pending.get(id); + if (!pending) return; + this.pending.delete(id); + clearTimeout(pending.timer); + if (message.error !== undefined) { + pending.reject(failureFromRpcError(pending.method, message.error)); + return; + } + if (!("result" in message)) { + pending.reject(new CodexStdioFailure("backend-response-invalid", `codex app-server response for ${pending.method} omitted result and error`, `response:${pending.method}`, { method: pending.method })); + return; + } + pending.resolve(message.result); + } + private handleServerRequest(id: number, method: string): void { if (method === "item/commandExecution/requestApproval" || method === "item/fileChange/requestApproval") { this.child.stdin.write(`${JSON.stringify({ id, result: { decision: "decline" } })}\n`); @@ -105,92 +203,172 @@ export class CodexStdioClient { this.child.stdin.write(`${JSON.stringify({ id, error: { code: -32601, message: `Unsupported client-side request: ${method}` } })}\n`); } + private rejectRequest(id: number, error: CodexStdioFailure): void { + const pending = this.pending.get(id); + if (!pending) return; + this.pending.delete(id); + clearTimeout(pending.timer); + pending.reject(error); + } + private rejectAll(error: Error): void { - for (const pending of this.pending.values()) pending.reject(error); + for (const pending of this.pending.values()) { + clearTimeout(pending.timer); + pending.reject(error); + } this.pending.clear(); } - private handleClose(code: number | null, signal: string | null): void { + private handleProtocolFailure(error: CodexStdioFailure): void { + if (this.closed) return; + this.closeFailure = error; + this.rejectAll(error); + this.stop(); + } + + private handleClose(code: number | null, signal: string | null, failure: CodexStdioFailure | null = null): void { if (this.closed) return; this.closed = true; - const stderrTail = redactText(Buffer.concat(this.stderrChunks).toString("utf8").slice(-8000)); - this.rejectAll(new Error(`codex app-server closed code=${code} signal=${signal}`)); - this.closeResolve({ code, signal, stderrTail }); + if (failure) this.closeFailure = failure; + const stderr = this.stderrInfo(); + const closeInfo: CodexStdioCloseInfo = { + code, + signal, + stderrTail: stderr.stderrTail, + stderrBytes: this.stderrBytes, + stderrTruncated: stderr.stderrTruncated, + failureKind: this.closeFailure?.failureKind ?? null, + message: this.closeFailure?.message ?? null, + }; + this.rejectAll(this.closeFailure ?? new CodexStdioFailure("backend-failed", `codex app-server closed code=${code} signal=${signal}`, "process:close", closeInfo)); + this.closeResolve(closeInfo); + } + + private stderrInfo(): { stderrTail: string; stderrTruncated: boolean } { + const buffered = this.stderrTailBuffer.toString("utf8"); + const tail = buffered.slice(-8000); + return { + stderrTail: redactText(tail), + stderrTruncated: this.stderrBytes > this.stderrTailBuffer.byteLength || buffered.length > tail.length, + }; } } export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise { - const secretFailure = codexHomeReadiness(options.codexHome ?? options.env?.CODEX_HOME ?? `${options.env?.HOME ?? process.env.HOME ?? ""}/.codex`); + const codexHome = resolveCodexHome(options); + const secretFailure = codexHomeReadiness(codexHome); if (secretFailure) return secretFailure; - const events: BackendEvent[] = [{ type: "backend_status", payload: { phase: "codex-app-server-starting", protocol: "codex-app-server-jsonrpc-stdio" } }]; + const env = childEnv(options, codexHome); + const events: BackendEvent[] = [{ + type: "backend_status", + payload: { + phase: "codex-app-server-starting", + protocol: codexProtocol, + runtime: runtimeSummary(options, env, codexHome), + }, + }]; let assistantText = ""; - let threadId: string | undefined; + let threadId: string | undefined = options.threadId; let turnId: string | undefined; let terminal: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } | null = null; let terminalResolve!: () => void; const terminalPromise = new Promise((resolve) => { terminalResolve = resolve; }); - const clientOptions: ConstructorParameters[0] = { - cwd: options.cwd, - env: childEnv(options), - onNotification: (message) => { - const normalized = normalizeCodexNotification(message); - if (normalized.threadId) threadId = normalized.threadId; - if (normalized.turnId) turnId = normalized.turnId; - if (normalized.assistantDelta) assistantText += normalized.assistantDelta; - events.push(...normalized.events); - if (normalized.terminal) { - terminal = normalized.terminal; - terminalResolve(); - } - }, - }; - if (options.command) clientOptions.command = options.command; - if (options.args) clientOptions.args = options.args; - const client = new CodexStdioClient(clientOptions); + let client: CodexStdioClient | null = null; + const requestTimeoutMs = Math.min(positiveTimeout(options.timeoutMs), requestTimeoutCapMs); const timeout = setTimeout(() => { - if (!terminal) { - terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn timed out after ${options.timeoutMs}ms` }; - events.push({ type: "error", payload: terminal }); - client.stop(); - terminalResolve(); - } - }, options.timeoutMs); + if (terminal) return; + terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn timed out after ${options.timeoutMs}ms` }; + events.push({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:timeout" } }); + client?.stop(); + terminalResolve(); + }, positiveTimeout(options.timeoutMs)); try { - await client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } }); - const threadResponse = asResponseRecord(await client.request("thread/start", { model: options.model ?? "default", cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" })); - threadId = stringAt(asRecordAt(threadResponse, "thread"), "id") ?? threadId; - const turnResponse = asResponseRecord(await client.request("turn/start", { threadId: threadId ?? "", input: [{ type: "text", text: options.prompt, text_elements: [] }], cwd: options.cwd, approvalPolicy: options.approvalPolicy, model: options.model ?? "default" })); - turnId = stringAt(asRecordAt(turnResponse, "turn"), "id") ?? turnId; - await Promise.race([terminalPromise, client.closedPromise]); - if (!terminal) terminal = { status: "failed", failureKind: "backend-protocol-error", message: "codex app-server closed before turn/completed" }; + const clientOptions: ConstructorParameters[0] = { + cwd: options.cwd, + env, + onNotification: (message) => { + const normalized = normalizeCodexNotification(message); + if (normalized.threadId) threadId = normalized.threadId; + if (normalized.turnId) turnId = normalized.turnId; + if (normalized.assistantDelta) assistantText += normalized.assistantDelta; + events.push(...normalized.events); + if (normalized.terminal && !terminal) { + terminal = normalized.terminal; + terminalResolve(); + } + }, + }; + if (options.command) clientOptions.command = options.command; + if (options.args) clientOptions.args = options.args; + client = new CodexStdioClient(clientOptions); + const initializeResult = requireResponseRecord(await client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } }, requestTimeoutMs), "initialize"); + validateInitializeResponse(initializeResult); + client.notify("initialized", {}); + events.push({ type: "backend_status", payload: { phase: "initialize:completed", protocol: codexProtocol } }); + + const threadMethod = options.threadId ? "thread/resume" : "thread/start"; + const threadParams: JsonRecord = options.threadId + ? { threadId: options.threadId, model: options.model ?? "default", cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox } + : { model: options.model ?? "default", cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" }; + const threadResponse = requireResponseRecord(await client.request(threadMethod, threadParams, requestTimeoutMs), threadMethod); + threadId = requireNestedId(threadResponse, threadMethod, "thread"); + events.push({ type: "backend_status", payload: { phase: `${threadMethod}:completed`, threadId } }); + + const turnResponse = requireResponseRecord(await client.request("turn/start", { threadId, input: textInput(options.prompt), cwd: options.cwd, approvalPolicy: options.approvalPolicy, model: options.model ?? "default" }, requestTimeoutMs), "turn/start"); + turnId = requireNestedId(turnResponse, "turn/start", "turn"); + events.push({ type: "backend_status", payload: { phase: "turn/start:completed", turnId } }); + + const race = await Promise.race([ + terminalPromise.then(() => ({ kind: "terminal" as const })), + client.closedPromise.then((closeInfo) => ({ kind: "closed" as const, closeInfo })), + ]); + if (race.kind === "closed" && !terminal) { + terminal = terminalFromClose(race.closeInfo); + events.push({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "transport:closed-before-terminal", appServerExit: closeEvent(race.closeInfo) } }); + } + if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server did not emit turn/completed" }; } catch (error) { - terminal = { status: "failed", failureKind: "backend-protocol-error", message: error instanceof Error ? error.message : String(error) }; - events.push({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message } }); + if (!terminal) { + const failure = normalizeFailure(error); + terminal = { status: failure.failureKind === "secret-unavailable" ? "blocked" : "failed", failureKind: failure.failureKind, message: failure.message }; + events.push({ type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase: failure.phase, details: failure.details } }); + } } finally { clearTimeout(timeout); - client.stop(); + if (client) { + client.stop(); + const closeInfo = await client.closedPromise; + events.push({ type: "backend_status", payload: { phase: "codex-app-server-closed", appServerExit: closeEvent(closeInfo) } }); + } } + if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server finished without terminal status" }; if (assistantText.trim().length > 0) events.push({ type: "assistant_message", payload: { text: assistantText } }); events.push({ type: "terminal_status", payload: { terminalStatus: terminal.status, failureKind: terminal.failureKind, message: terminal.message } }); return { terminalStatus: terminal.status, failureKind: terminal.failureKind, failureMessage: terminal.message, events: events.map((event) => ({ ...event, payload: redactJson(event.payload) })), ...(threadId ? { threadId } : {}), ...(turnId ? { turnId } : {}) }; } function codexHomeReadiness(codexHome: string): BackendTurnResult | null { - try { - accessSync(`${codexHome}/auth.json`, fsConstants.R_OK); - accessSync(`${codexHome}/config.toml`, fsConstants.R_OK); - return null; - } catch { - return { - terminalStatus: "blocked", - failureKind: "secret-unavailable", - failureMessage: "Codex auth.json or config.toml projection is not readable", - events: [ - { type: "error", payload: { failureKind: "secret-unavailable", credentialSource: { codexHome, valuesPrinted: false } } }, - { type: "terminal_status", payload: { terminalStatus: "blocked", failureKind: "secret-unavailable" } }, - ], - }; - } + const auth = fileReadable(`${codexHome}/auth.json`); + const config = fileReadable(`${codexHome}/config.toml`); + if (auth.readable && config.readable) return null; + const payload = { + failureKind: "secret-unavailable", + projection: { + codexHome: pathSummary(codexHome), + authJson: auth, + configToml: config, + valuesPrinted: false, + }, + } satisfies JsonRecord; + return { + terminalStatus: "blocked", + failureKind: "secret-unavailable", + failureMessage: "Codex auth.json or config.toml projection is not readable", + events: [ + { type: "error", payload }, + { type: "terminal_status", payload: { terminalStatus: "blocked", failureKind: "secret-unavailable" } }, + ], + }; } function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent[]; assistantDelta?: string; threadId?: string; turnId?: string; terminal?: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } } { @@ -207,33 +385,163 @@ function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent if (method === "item/agentMessage/delta") return { events: [], assistantDelta: typeof params.delta === "string" ? params.delta : "" }; if (method === "item/commandExecution/outputDelta") return { events: [{ type: "command_output", payload: { stream: "stdout", text: typeof params.delta === "string" ? params.delta : "" } }] }; if (method === "item/started" || method === "item/completed") return { events: [{ type: "tool_call", payload: { method, item: redactJson(asRecordAt(params, "item")) } }] }; - if (method === "error") return { events: [{ type: "error", payload: { failureKind: "backend-failed", error: redactJson(params.error ?? params) } }] }; + if (method === "error") { + const error = asRecordAt(params, "error"); + const messageText = typeof error.message === "string" ? error.message : "Codex app-server error"; + const failureKind = classifyMessageFailureKind(messageText, "backend-failed"); + const terminal = params.willRetry === true ? undefined : { status: "failed" as const, failureKind, message: redactText(messageText) }; + return { events: [{ type: "error", payload: { failureKind, error: redactJson(error), willRetry: params.willRetry === true } }], ...(terminal ? { terminal } : {}) }; + } if (method === "turn/completed") { const turn = asRecordAt(params, "turn"); + if (typeof turn.status !== "string") { + return { events: [{ type: "error", payload: { failureKind: "backend-response-invalid", message: "turn/completed notification omitted turn.status" } }], terminal: { status: "failed", failureKind: "backend-response-invalid", message: "turn/completed notification omitted turn.status" } }; + } const status = terminalStatusFromValue(turn.status); const error = asRecordAt(turn, "error"); - const messageText = typeof error.message === "string" ? error.message : null; - return { events: [{ type: "backend_status", payload: { phase: method, terminalStatus: status } }], terminal: { status, failureKind: status === "completed" ? null : "backend-failed", message: messageText } }; + const messageText = typeof error.message === "string" ? redactText(error.message) : null; + return { events: [{ type: "backend_status", payload: { phase: method, terminalStatus: status } }], terminal: { status, failureKind: status === "completed" ? null : classifyMessageFailureKind(messageText ?? turn.status, "backend-failed"), message: messageText } }; } return { events: [{ type: "backend_status", payload: { phase: method } }] }; } function terminalStatusFromValue(value: unknown): TerminalStatus { if (value === "completed") return "completed"; - if (value === "cancelled" || value === "canceled") return "cancelled"; + if (value === "cancelled" || value === "canceled" || value === "interrupted") return "cancelled"; if (value === "blocked") return "blocked"; return "failed"; } -function childEnv(options: CodexStdioTurnOptions): NodeJS.ProcessEnv { - const env: NodeJS.ProcessEnv = { ...process.env, ...options.env }; - const codexHome = options.codexHome ?? options.env?.CODEX_HOME; - if (codexHome) env.CODEX_HOME = codexHome; - return env; +function childEnv(options: CodexStdioTurnOptions, codexHome: string): NodeJS.ProcessEnv { + return { + ...process.env, + ...options.env, + CODEX_HOME: codexHome, + CODEX_INTERNAL_ORIGINATOR_OVERRIDE: "agentrun", + }; } -function asResponseRecord(value: unknown): JsonRecord { - return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : {}; +function resolveCodexHome(options: CodexStdioTurnOptions): string { + return options.codexHome ?? options.env?.CODEX_HOME ?? `${options.env?.HOME ?? process.env.HOME ?? ""}/.codex`; +} + +function validateInitializeResponse(value: JsonRecord): void { + const serverInfo = value.serverInfo; + if (serverInfo !== undefined && (typeof serverInfo !== "object" || serverInfo === null || Array.isArray(serverInfo))) { + throw new CodexStdioFailure("backend-response-invalid", "initialize response serverInfo must be an object when present", "response:initialize", { response: value }); + } +} + +function requireResponseRecord(value: unknown, method: string): JsonRecord { + if (typeof value === "object" && value !== null && !Array.isArray(value)) return value as JsonRecord; + throw new CodexStdioFailure("backend-response-invalid", `${method} response result must be an object`, `response:${method}`); +} + +function requireNestedId(value: JsonRecord, method: string, key: string): string { + const id = stringAt(asRecordAt(value, key), "id"); + if (id) return id; + throw new CodexStdioFailure("backend-response-invalid", `${method} response did not include ${key}.id`, `response:${method}`, { response: value }); +} + +function textInput(text: string): JsonValue[] { + return [{ type: "text", text, text_elements: [] }]; +} + +function fileReadable(filePath: string): JsonRecord { + try { + accessSync(filePath, fsConstants.R_OK); + return { ...pathSummary(filePath), readable: true }; + } catch { + return { ...pathSummary(filePath), readable: false }; + } +} + +function pathSummary(value: string): JsonRecord { + const raw = String(value || ""); + const parts = raw.split(/[\\/]+/u).filter(Boolean); + return { + present: raw.trim().length > 0, + absolute: path.isAbsolute(raw), + basename: parts.at(-1) ?? null, + depth: parts.length, + fingerprint: shortHash(raw), + valuePrinted: false, + }; +} + +function runtimeSummary(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, codexHome: string): JsonRecord { + return { + command: options.command ?? "codex", + args: options.args ?? defaultCodexArgs, + cwd: pathSummary(options.cwd), + workspace: pathSummary(options.cwd), + codexHome: pathSummary(codexHome), + env: envSummary(env), + valuesPrinted: false, + }; +} + +function envSummary(env: NodeJS.ProcessEnv): JsonRecord { + const keyState: Record = {}; + for (const key of childEnvSummaryKeys) keyState[key] = { present: typeof env[key] === "string" && String(env[key]).length > 0 }; + const secretLikeKeyCount = Object.keys(env).filter((key) => /auth|authorization|api[_-]?key|token|password|secret|credential/iu.test(key)).length; + return { + keyCount: Object.keys(env).length, + trackedKeys: keyState, + secretLikeKeyCount, + valuesPrinted: false, + }; +} + +function closeEvent(closeInfo: CodexStdioCloseInfo): JsonRecord { + return { + code: closeInfo.code, + signal: closeInfo.signal, + failureKind: closeInfo.failureKind, + message: closeInfo.message, + stderrTail: closeInfo.stderrTail.slice(-stderrEventChars), + stderrBytes: closeInfo.stderrBytes, + stderrTruncated: closeInfo.stderrTruncated || closeInfo.stderrTail.length > stderrEventChars, + }; +} + +function terminalFromClose(closeInfo: CodexStdioCloseInfo): { status: TerminalStatus; failureKind: FailureKind; message: string } { + const baseMessage = `codex app-server closed before turn/completed code=${closeInfo.code} signal=${closeInfo.signal}`; + const combined = [closeInfo.message ?? "", closeInfo.stderrTail].filter(Boolean).join("\n"); + const failureKind = closeInfo.failureKind ?? classifyMessageFailureKind(combined, "backend-response-invalid"); + const stderrPreview = closeInfo.stderrTail.trim().length > 0 ? `; stderrTail=${closeInfo.stderrTail.slice(-1000)}` : ""; + return { status: "failed", failureKind, message: redactText(`${baseMessage}${stderrPreview}`) }; +} + +function failureFromRpcError(method: string, value: unknown): CodexStdioFailure { + const error = typeof value === "object" && value !== null ? value as Record : {}; + const message = typeof error.message === "string" ? error.message : JSON.stringify(redactJson(value as JsonValue)); + return new CodexStdioFailure(classifyMessageFailureKind(message, "backend-protocol-error"), `codex app-server ${method} error: ${message}`, `response:${method}`, { method, error: redactJson(value as JsonValue) }); +} + +function spawnFailure(command: string, error: unknown): CodexStdioFailure { + const message = error instanceof Error ? error.message : String(error); + const code = typeof error === "object" && error !== null && "code" in error ? String((error as { code?: unknown }).code ?? "") : ""; + return new CodexStdioFailure("backend-spawn-failed", `failed to start Codex app-server command ${command}: ${message}`, "process:spawn", { command, code }); +} + +function normalizeFailure(error: unknown): CodexStdioFailure { + if (error instanceof CodexStdioFailure) return error; + const message = error instanceof Error ? error.message : String(error); + return new CodexStdioFailure(classifyMessageFailureKind(message, "backend-protocol-error"), message, "codex-stdio"); +} + +function classifyMessageFailureKind(message: string, fallback: FailureKind): FailureKind { + const text = String(message || "").toLowerCase(); + if (/rate.?limit|too many requests|\b429\b/u.test(text)) return "provider-rate-limited"; + if (/\b401\b|\b403\b|unauthori[sz]ed|forbidden|invalid api key|authentication|auth failed|oauth|access token/u.test(text)) return "provider-auth-failed"; + if (/timed out|timeout|idle timeout/u.test(text)) return "backend-timeout"; + if (/invalid json|json parse/u.test(text)) return "backend-json-parse-error"; + return fallback; +} + +function positiveTimeout(value: number): number { + return Number.isFinite(value) && value > 0 ? Math.max(1, Math.floor(value)) : requestTimeoutCapMs; } function asRecordAt(value: JsonRecord, key: string): JsonRecord { @@ -242,5 +550,9 @@ function asRecordAt(value: JsonRecord, key: string): JsonRecord { } function stringAt(value: JsonRecord, key: string): string | null { - return typeof value[key] === "string" ? value[key] : null; + return typeof value[key] === "string" && String(value[key]).length > 0 ? String(value[key]) : null; +} + +function shortHash(value: string): string { + return createHash("sha256").update(value).digest("hex").slice(0, 12); } diff --git a/src/common/redaction.ts b/src/common/redaction.ts index 43f06d1..6ba45b1 100644 --- a/src/common/redaction.ts +++ b/src/common/redaction.ts @@ -1,7 +1,8 @@ export function redactText(value: string): string { return value + .replace(/\b(sk-[A-Za-z0-9_-]{8,}|ghp_[A-Za-z0-9_]{8,}|github_pat_[A-Za-z0-9_]+)\b/gu, "REDACTED") .replace(/(authorization\s*[:=]\s*)(bearer\s+)?[A-Za-z0-9._~+/=-]+/giu, "$1$2REDACTED") - .replace(/((?:api[_-]?key|token|password|secret)\s*[:=]\s*)[A-Za-z0-9._~+/=-]+/giu, "$1REDACTED") + .replace(/((?:api[_-]?key|token|password|secret)\s*["']?\s*[:=]\s*["']?)[A-Za-z0-9._~+/=-]+/giu, "$1REDACTED") .replace(/(postgres(?:ql)?:\/\/[^:\s/@]+:)[^@\s]+(@)/giu, "$1REDACTED$2") .replace(/(https?:\/\/[^:\s/@]+:)[^@\s]+(@)/giu, "$1REDACTED$2"); } diff --git a/src/common/types.ts b/src/common/types.ts index 1ef153e..195f4c6 100644 --- a/src/common/types.ts +++ b/src/common/types.ts @@ -9,6 +9,9 @@ export type FailureKind = | "runner-lease-conflict" | "backend-failed" | "backend-protocol-error" + | "backend-spawn-failed" + | "backend-json-parse-error" + | "backend-response-invalid" | "backend-timeout" | "provider-auth-failed" | "provider-rate-limited" diff --git a/src/selftest/fake-codex-app-server.ts b/src/selftest/fake-codex-app-server.ts index 316a9b7..2f14559 100644 --- a/src/selftest/fake-codex-app-server.ts +++ b/src/selftest/fake-codex-app-server.ts @@ -1,6 +1,7 @@ import * as readline from "node:readline"; const rl = readline.createInterface({ input: process.stdin, crlfDelay: Infinity }); +const mode = process.env.AGENTRUN_FAKE_CODEX_MODE ?? "success"; let threadCounter = 0; let turnCounter = 0; @@ -9,6 +10,10 @@ for await (const line of rl) { if (trimmed.length === 0) continue; const message = JSON.parse(trimmed) as { id?: number; method?: string; params?: Record }; if (message.method === "initialize") { + if (mode === "invalid-json") { + process.stdout.write('{"token":"test-token-material"\n'); + process.exit(0); + } respond(message.id, { serverInfo: { name: "fake-codex-app-server", version: "self-test" } }); continue; } @@ -26,6 +31,17 @@ for await (const line of rl) { continue; } if (message.method === "turn/start") { + if (mode === "missing-turn-result") { + respond(message.id, {}); + continue; + } + if (mode === "missing-terminal") { + turnCounter += 1; + const turn = { id: `turn_selftest_${turnCounter}`, status: "running" }; + notify("turn/started", { turn }); + respond(message.id, { turn }); + continue; + } turnCounter += 1; const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" }; notify("turn/started", { turn }); diff --git a/src/selftest/run.ts b/src/selftest/run.ts index 655fd90..af3fbd2 100644 --- a/src/selftest/run.ts +++ b/src/selftest/run.ts @@ -7,6 +7,7 @@ import { startManagerServer } from "../mgr/server.js"; import { ManagerClient } from "../mgr/client.js"; import { runOnce } from "../runner/run-once.js"; import { redactText } from "../common/redaction.js"; +import type { FailureKind, JsonRecord, TerminalStatus } from "../common/types.js"; const root = path.resolve(path.dirname(fileURLToPath(import.meta.url)), "../.."); const tmp = await mkdtemp(path.join(os.tmpdir(), "agentrun-selftest-")); @@ -21,45 +22,133 @@ try { await writeFile(path.join(workspace, "README.md"), "self-test workspace\n"); assert.equal(redactText("Authorization: Bearer abc123"), "Authorization: Bearer REDACTED"); + assert.equal(redactText('{"token":"test-token-material"}').includes("test-token-material"), false); const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test" }); try { const client = new ManagerClient(server.baseUrl); const health = await client.get("/health/readiness") as { database?: { adapter?: string } }; assert.equal(health.database?.adapter, "memory-self-test"); - const run = await client.post("/api/v1/runs", { - tenantId: "unidesk", - projectId: "pikasTech/unidesk", - workspaceRef: { kind: "host-path", path: workspace }, - providerId: "G14", - backendProfile: "codex", - executionPolicy: { - sandbox: "workspace-write", - approval: "never", - timeoutMs: 15_000, - network: "default", - secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: codexHome } }] }, - }, - traceSink: null, - }) as { id: string }; - const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello" }, idempotencyKey: "selftest-turn" }) as { id: string }; - const duplicate = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello" }, idempotencyKey: "selftest-turn" }) as { id: string }; - assert.equal(duplicate.id, command.id); const fakePath = path.join(root, "src/selftest/fake-codex-app-server.ts"); const fakeCommand = process.env.AGENTRUN_SELFTEST_CODEX_COMMAND ?? process.execPath; const fakeArgs = process.env.AGENTRUN_SELFTEST_CODEX_ARGS ? JSON.parse(process.env.AGENTRUN_SELFTEST_CODEX_ARGS) as string[] : [fakePath]; - const result = await runOnce({ managerUrl: server.baseUrl, runId: run.id, codexCommand: fakeCommand, codexArgs: fakeArgs, codexHome, env: { CODEX_HOME: codexHome } }); + + const happy = await createRunWithCommand(client, workspace, codexHome, "hello", "selftest-turn", 15_000); + const result = await runOnce({ managerUrl: server.baseUrl, runId: happy.runId, codexCommand: fakeCommand, codexArgs: fakeArgs, codexHome, env: { CODEX_HOME: codexHome } }); assert.equal(result.terminalStatus, "completed"); - const events = await client.get(`/api/v1/runs/${run.id}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; + const events = await client.get(`/api/v1/runs/${happy.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; assert.ok(events.items?.some((event) => event.type === "assistant_message")); - assert.equal(JSON.stringify(events).includes("test-token-material"), false); - assert.equal(JSON.stringify(events).includes("Bearer test-token"), false); - const finalRun = await client.get(`/api/v1/runs/${run.id}`) as { terminalStatus?: string }; + assertNoSecretLeak(events); + const finalRun = await client.get(`/api/v1/runs/${happy.runId}`) as { terminalStatus?: string }; assert.equal(finalRun.terminalStatus, "completed"); - console.log(JSON.stringify({ ok: true, tests: ["manager-memory-lifecycle", "codex-stdio-fake-turn", "redaction"], runId: run.id })); + + await runFailureCase({ + client, + managerUrl: server.baseUrl, + workspace, + codexHome, + fakeCommand, + fakeArgs, + mode: "missing-turn-result", + expectedStatus: "failed", + expectedFailureKind: "backend-response-invalid", + }); + await runFailureCase({ + client, + managerUrl: server.baseUrl, + workspace, + codexHome, + fakeCommand, + fakeArgs, + mode: "invalid-json", + expectedStatus: "failed", + expectedFailureKind: "backend-json-parse-error", + }); + await runFailureCase({ + client, + managerUrl: server.baseUrl, + workspace, + codexHome, + fakeCommand, + fakeArgs, + mode: "missing-terminal", + expectedStatus: "failed", + expectedFailureKind: "backend-timeout", + timeoutMs: 500, + }); + await runSpawnFailureCase({ + client, + managerUrl: server.baseUrl, + workspace, + codexHome, + }); + + console.log(JSON.stringify({ ok: true, tests: ["manager-memory-lifecycle", "codex-stdio-fake-turn", "codex-stdio-missing-turn-result", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-spawn-failure", "redaction"], runId: happy.runId })); } finally { await new Promise((resolve) => server.server.close(() => resolve())); } } finally { await rm(tmp, { recursive: true, force: true }); } + +async function createRunWithCommand(client: ManagerClient, workspace: string, codexHome: string, prompt: string, idempotencyKey: string, timeoutMs: number): Promise<{ runId: string; commandId: string }> { + const run = await client.post("/api/v1/runs", { + tenantId: "unidesk", + projectId: "pikasTech/unidesk", + workspaceRef: { kind: "host-path", path: workspace }, + providerId: "G14", + backendProfile: "codex", + executionPolicy: { + sandbox: "workspace-write", + approval: "never", + timeoutMs, + network: "default", + secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: codexHome } }] }, + }, + traceSink: null, + }) as { id: string }; + const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt }, idempotencyKey }) as { id: string }; + const duplicate = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt }, idempotencyKey }) as { id: string }; + assert.equal(duplicate.id, command.id); + return { runId: run.id, commandId: command.id }; +} + +async function runFailureCase(options: { client: ManagerClient; managerUrl: string; workspace: string; codexHome: string; fakeCommand: string; fakeArgs: string[]; mode: string; expectedStatus: TerminalStatus; expectedFailureKind: FailureKind; timeoutMs?: number }): Promise { + const item = await createRunWithCommand(options.client, options.workspace, options.codexHome, `failure ${options.mode}`, `selftest-${options.mode}`, options.timeoutMs ?? 3_000); + const result = await runOnce({ + managerUrl: options.managerUrl, + runId: item.runId, + codexCommand: options.fakeCommand, + codexArgs: options.fakeArgs, + codexHome: options.codexHome, + env: { CODEX_HOME: options.codexHome, AGENTRUN_FAKE_CODEX_MODE: options.mode }, + }) as JsonRecord; + assert.equal(result.terminalStatus, options.expectedStatus, options.mode); + assert.equal(result.failureKind, options.expectedFailureKind, options.mode); + const events = await options.client.get(`/api/v1/runs/${item.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; + assert.ok(events.items?.some((event) => event.type === "error"), options.mode); + assertNoSecretLeak(events); +} + +async function runSpawnFailureCase(options: { client: ManagerClient; managerUrl: string; workspace: string; codexHome: string }): Promise { + const item = await createRunWithCommand(options.client, options.workspace, options.codexHome, "failure spawn", "selftest-spawn-failure", 3_000); + const result = await runOnce({ + managerUrl: options.managerUrl, + runId: item.runId, + codexCommand: path.join(os.tmpdir(), `agentrun-missing-codex-${process.pid}`), + codexArgs: [], + codexHome: options.codexHome, + env: { CODEX_HOME: options.codexHome }, + }) as JsonRecord; + assert.equal(result.terminalStatus, "failed", "spawn failure"); + assert.equal(result.failureKind, "backend-spawn-failed", "spawn failure"); + const events = await options.client.get(`/api/v1/runs/${item.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; + assert.ok(events.items?.some((event) => event.type === "error"), "spawn failure"); + assertNoSecretLeak(events); +} + +function assertNoSecretLeak(value: unknown): void { + const text = JSON.stringify(value); + assert.equal(text.includes("test-token-material"), false); + assert.equal(text.includes("Bearer test-token"), false); +}