硬化 Codex stdio backend 错误观测

This commit is contained in:
Codex
2026-05-29 11:47:03 +08:00
parent a240122039
commit 843112ebff
8 changed files with 556 additions and 130 deletions
@@ -52,6 +52,9 @@ Adapter 必须把 backend 错误映射为稳定 failureKind
| `provider-auth-failed` | provider credential 或 auth file 无效、上游返回 401/403。 |
| `provider-rate-limited` | 上游限流或 quota 错误。 |
| `backend-protocol-error` | backend 输出无法解析、协议字段缺失。 |
| `backend-json-parse-error` | backend stdout 不是合法 JSON-RPC 行。 |
| `backend-response-invalid` | backend JSON-RPC response/terminal notification 缺少必需字段。 |
| `backend-spawn-failed` | backend app-server 进程无法启动。 |
| `backend-failed` | backend 进程非零退出或 terminal error。 |
| `backend-timeout` | executionPolicy timeout 触发。 |
| `cancelled` | interrupt/cancel 生效。 |
+2 -1
View File
@@ -96,6 +96,7 @@ Run 的 `executionPolicy.secretScope` 应引用 `agentrun-v01-provider-codex`
| --- | --- | --- |
| Codex backend 规格 | 已定义 | 本文为 v0.1 第一真实 backend 权威。 |
| Codex Secret projection | 未实现 | 需要后续 Kubernetes Secret 和 runner/backend manifest。 |
| Codex adapter | 未实现 | 需要后续代码实现。 |
| Codex adapter | 已部分实现 | 当前代码实现受控 `codex app-server --listen stdio://`、`initialize`/`thread/start`/`thread/resume`/`turn/start` response 校验、stderr 有界诊断、spawn/JSON parse/response invalid/timeout failureKind 和 fake app-server 自测试。 |
| 错误可观测与脱敏 | 已部分实现 | child env、cwd、workspace 和 Codex home 只输出摘要;stderr tail 有界且标记截断;事件和 failure 统一走 redaction。 |
| 真实 provider turn | 未实现 | 综合联调必须真实完成后才能发布通过。 |
| hostPath `~/.codex` | 不采用 | 只能通过 Kubernetes Secret projection 注入。 |
+1
View File
@@ -21,6 +21,7 @@ export async function runBackendTurn(run: RunRecord, command: CommandRecord, opt
timeoutMs: run.executionPolicy.timeoutMs,
};
if (typeof command.payload.model === "string") turnOptions.model = command.payload.model;
if (typeof command.payload.threadId === "string") turnOptions.threadId = command.payload.threadId;
if (options.codexCommand) turnOptions.command = options.codexCommand;
if (options.codexArgs) turnOptions.args = options.codexArgs;
if (options.env) turnOptions.env = options.env;
+416 -104
View File
@@ -1,13 +1,40 @@
import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process";
import { createHash } from "node:crypto";
import { accessSync, constants as fsConstants } from "node:fs";
import path from "node:path";
import * as readline from "node:readline";
import type { BackendEvent, BackendTurnResult, FailureKind, JsonRecord, JsonValue, TerminalStatus } from "../common/types.js";
import { redactJson, redactText } from "../common/redaction.js";
// Protocol identifier reported in backend_status event payloads.
const codexProtocol = "codex-app-server-jsonrpc-stdio";
// Arguments used to launch the app-server when the caller supplies none.
const defaultCodexArgs = ["app-server", "--listen", "stdio://"];
// Upper bound, in bytes, on the stderr tail the stdio client keeps in memory.
const stderrBufferBytes = 64_000;
// Maximum number of stderr characters copied into a close-event payload.
const stderrEventChars = 4_000;
// Default per-request timeout, upper bound for handshake requests, and the
// fallback used when a caller passes a non-finite or non-positive timeout.
const requestTimeoutCapMs = 30_000;
// Environment variable names whose presence (never their value) is reported
// in the child environment summary.
const childEnvSummaryKeys = [
"CODEX_HOME",
"HOME",
"PATH",
"HTTP_PROXY",
"HTTPS_PROXY",
"ALL_PROXY",
"NO_PROXY",
"http_proxy",
"https_proxy",
"all_proxy",
"no_proxy",
"OPENAI_API_KEY",
"CODEX_API_KEY",
"GITHUB_TOKEN",
"GH_TOKEN",
];
export interface CodexStdioTurnOptions {
prompt: string;
cwd: string;
model?: string;
threadId?: string;
approvalPolicy: string;
sandbox: string;
timeoutMs: number;
@@ -18,48 +45,87 @@ export interface CodexStdioTurnOptions {
}
/** Bookkeeping for one in-flight JSON-RPC request that is awaiting its response. */
interface PendingRequest {
// JSON-RPC method name; used when building failure messages for this request.
method: string;
// Per-request timeout timer; cleared as soon as the request is settled.
timer: NodeJS.Timeout;
// Fulfils the request promise with the response `result`.
resolve: (value: unknown) => void;
// Rejects the request promise (RPC error, timeout, write failure or process close).
reject: (error: Error) => void;
}
/** Snapshot of how the app-server child process ended, resolved through `closedPromise`. */
interface CodexStdioCloseInfo extends JsonRecord {
// Exit code from the child's close event; 127 is used when the child emitted an error event.
code: number | null;
// Terminating signal from the close event, or null.
signal: string | null;
// Redacted tail of the buffered stderr output (at most the last 8000 characters).
stderrTail: string;
// Total number of stderr bytes received from the child.
stderrBytes: number;
// True when some stderr output is not included in `stderrTail`.
stderrTruncated: boolean;
// Failure kind of the recorded close failure, or null when none was recorded.
failureKind: FailureKind | null;
// Message of the recorded close failure, or null when none was recorded.
message: string | null;
}
/**
 * Error type for every failure raised by the Codex stdio backend.
 *
 * The message and the structured details are passed through the shared
 * redaction helpers at construction time, so an instance can be logged or
 * turned into an event payload without further scrubbing.
 */
class CodexStdioFailure extends Error {
  /** Stable failure classification reported to callers. */
  readonly failureKind: FailureKind;
  /** Where the failure happened, e.g. `request:<method>` or `process:spawn`. */
  readonly phase: string;
  /** Redacted structured context for diagnostics. */
  readonly details: JsonRecord;

  /**
   * @param failureKind Stable failure classification.
   * @param message Human-readable description; redacted before it is stored.
   * @param phase Label of the step that failed.
   * @param details Optional structured context; redacted before it is stored.
   */
  constructor(failureKind: FailureKind, message: string, phase: string, details: JsonRecord = {}) {
    super(redactText(message));
    this.details = redactJson(details);
    this.phase = phase;
    this.failureKind = failureKind;
    this.name = "CodexStdioFailure";
  }
}
export class CodexStdioClient {
private readonly child: ChildProcessWithoutNullStreams;
private readonly pending = new Map<number, PendingRequest>();
private readonly stderrChunks: Buffer[] = [];
private stderrTailBuffer = Buffer.alloc(0);
private stderrBytes = 0;
private nextId = 1;
private closed = false;
readonly closedPromise: Promise<{ code: number | null; signal: string | null; stderrTail: string }>;
private closeResolve!: (value: { code: number | null; signal: string | null; stderrTail: string }) => void;
private closeFailure: CodexStdioFailure | null = null;
readonly closedPromise: Promise<CodexStdioCloseInfo>;
private closeResolve!: (value: CodexStdioCloseInfo) => void;
constructor(options: { command?: string; args?: string[]; cwd: string; env?: NodeJS.ProcessEnv; onNotification: (message: JsonRecord) => void }) {
this.closedPromise = new Promise((resolve) => { this.closeResolve = resolve; });
this.child = spawn(options.command ?? "codex", options.args ?? ["app-server", "--listen", "stdio://"], {
cwd: options.cwd,
env: options.env ?? process.env,
stdio: "pipe",
});
this.child.stderr.on("data", (chunk: Buffer) => {
this.stderrChunks.push(chunk);
while (Buffer.concat(this.stderrChunks).length > 64_000) this.stderrChunks.shift();
});
const command = options.command ?? "codex";
const args = options.args ?? defaultCodexArgs;
try {
this.child = spawn(command, args, {
cwd: options.cwd,
env: options.env ?? process.env,
stdio: "pipe",
});
} catch (error) {
throw spawnFailure(command, error);
}
this.child.stderr.on("data", (chunk: Buffer) => this.appendStderr(chunk));
const rl = readline.createInterface({ input: this.child.stdout, crlfDelay: Infinity });
void this.readLines(rl, options.onNotification);
this.child.on("close", (code, signal) => this.handleClose(code, signal));
this.child.on("error", (error) => this.handleClose(127, error.message));
this.child.on("error", (error) => this.handleClose(127, null, spawnFailure(command, error)));
}
request(method: string, params: JsonRecord): Promise<unknown> {
if (this.closed) return Promise.reject(new Error("codex app-server is closed"));
request(method: string, params: JsonRecord, timeoutMs = requestTimeoutCapMs): Promise<unknown> {
if (this.closed) return Promise.reject(this.closeFailure ?? new CodexStdioFailure("backend-failed", "codex app-server is closed", `request:${method}`));
const id = this.nextId++;
const message = { id, method, params };
const effectiveTimeoutMs = positiveTimeout(timeoutMs);
return new Promise((resolve, reject) => {
this.pending.set(id, { resolve, reject });
this.child.stdin.write(`${JSON.stringify(message)}\n`);
const timer = setTimeout(() => {
this.rejectRequest(id, new CodexStdioFailure("backend-timeout", `Codex stdio request ${method} timed out after ${effectiveTimeoutMs}ms`, `request:${method}`, { method, timeoutMs: effectiveTimeoutMs }));
}, effectiveTimeoutMs);
this.pending.set(id, { method, timer, resolve, reject });
this.child.stdin.write(`${JSON.stringify(message)}\n`, "utf8", (error: Error | null | undefined) => {
if (!error) return;
this.rejectRequest(id, new CodexStdioFailure("backend-failed", `failed to write Codex stdio request ${method}: ${error.message}`, `request:${method}`, { method }));
});
});
}
notify(method: string, params: JsonRecord = {}): void {
if (!this.closed) this.child.stdin.write(`${JSON.stringify({ method, params })}\n`);
if (this.closed) return;
this.child.stdin.write(`${JSON.stringify({ method, params })}\n`, "utf8", () => undefined);
}
stop(): void {
@@ -70,33 +136,65 @@ export class CodexStdioClient {
}, 1500).unref?.();
}
private appendStderr(chunk: Buffer): void {
this.stderrBytes += chunk.byteLength;
const next = Buffer.concat([this.stderrTailBuffer, chunk]);
this.stderrTailBuffer = next.byteLength > stderrBufferBytes ? next.subarray(next.byteLength - stderrBufferBytes) : next;
}
private async readLines(rl: readline.Interface, onNotification: (message: JsonRecord) => void): Promise<void> {
try {
for await (const line of rl) {
const trimmed = String(line).trim();
if (trimmed.length === 0) continue;
const message = JSON.parse(trimmed) as JsonRecord;
const id = typeof message.id === "number" ? message.id : null;
const method = typeof message.method === "string" ? message.method : null;
if (id !== null && method === null) {
const pending = this.pending.get(id);
if (!pending) continue;
this.pending.delete(id);
if (message.error !== undefined) pending.reject(new Error(JSON.stringify(redactJson(message.error))));
else pending.resolve(message.result);
continue;
let message: JsonRecord;
try {
message = JSON.parse(trimmed) as JsonRecord;
} catch {
this.handleProtocolFailure(new CodexStdioFailure("backend-json-parse-error", "codex app-server emitted invalid JSON on stdout", "stdout:parse", { linePreview: redactText(trimmed.slice(0, 800)), lineChars: trimmed.length }));
break;
}
if (id !== null && method !== null) {
this.handleServerRequest(id, method);
continue;
}
if (method !== null) onNotification(message);
this.handleMessage(message, onNotification);
}
} catch (error) {
this.rejectAll(error instanceof Error ? error : new Error(String(error)));
this.handleProtocolFailure(new CodexStdioFailure("backend-protocol-error", error instanceof Error ? error.message : String(error), "stdout:read"));
}
}
private handleMessage(message: JsonRecord, onNotification: (message: JsonRecord) => void): void {
const id = typeof message.id === "number" ? message.id : null;
const method = typeof message.method === "string" ? message.method : null;
if (id !== null && method === null) {
this.handleResponse(id, message);
return;
}
if (id !== null && method !== null) {
this.handleServerRequest(id, method);
return;
}
if (method !== null) {
onNotification(message);
return;
}
this.handleProtocolFailure(new CodexStdioFailure("backend-response-invalid", "codex app-server message had neither JSON-RPC id nor method", "stdout:message", { message }));
}
private handleResponse(id: number, message: JsonRecord): void {
const pending = this.pending.get(id);
if (!pending) return;
this.pending.delete(id);
clearTimeout(pending.timer);
if (message.error !== undefined) {
pending.reject(failureFromRpcError(pending.method, message.error));
return;
}
if (!("result" in message)) {
pending.reject(new CodexStdioFailure("backend-response-invalid", `codex app-server response for ${pending.method} omitted result and error`, `response:${pending.method}`, { method: pending.method }));
return;
}
pending.resolve(message.result);
}
private handleServerRequest(id: number, method: string): void {
if (method === "item/commandExecution/requestApproval" || method === "item/fileChange/requestApproval") {
this.child.stdin.write(`${JSON.stringify({ id, result: { decision: "decline" } })}\n`);
@@ -105,92 +203,172 @@ export class CodexStdioClient {
this.child.stdin.write(`${JSON.stringify({ id, error: { code: -32601, message: `Unsupported client-side request: ${method}` } })}\n`);
}
private rejectRequest(id: number, error: CodexStdioFailure): void {
const pending = this.pending.get(id);
if (!pending) return;
this.pending.delete(id);
clearTimeout(pending.timer);
pending.reject(error);
}
private rejectAll(error: Error): void {
for (const pending of this.pending.values()) pending.reject(error);
for (const pending of this.pending.values()) {
clearTimeout(pending.timer);
pending.reject(error);
}
this.pending.clear();
}
private handleClose(code: number | null, signal: string | null): void {
private handleProtocolFailure(error: CodexStdioFailure): void {
if (this.closed) return;
this.closeFailure = error;
this.rejectAll(error);
this.stop();
}
private handleClose(code: number | null, signal: string | null, failure: CodexStdioFailure | null = null): void {
if (this.closed) return;
this.closed = true;
const stderrTail = redactText(Buffer.concat(this.stderrChunks).toString("utf8").slice(-8000));
this.rejectAll(new Error(`codex app-server closed code=${code} signal=${signal}`));
this.closeResolve({ code, signal, stderrTail });
if (failure) this.closeFailure = failure;
const stderr = this.stderrInfo();
const closeInfo: CodexStdioCloseInfo = {
code,
signal,
stderrTail: stderr.stderrTail,
stderrBytes: this.stderrBytes,
stderrTruncated: stderr.stderrTruncated,
failureKind: this.closeFailure?.failureKind ?? null,
message: this.closeFailure?.message ?? null,
};
this.rejectAll(this.closeFailure ?? new CodexStdioFailure("backend-failed", `codex app-server closed code=${code} signal=${signal}`, "process:close", closeInfo));
this.closeResolve(closeInfo);
}
private stderrInfo(): { stderrTail: string; stderrTruncated: boolean } {
const buffered = this.stderrTailBuffer.toString("utf8");
const tail = buffered.slice(-8000);
return {
stderrTail: redactText(tail),
stderrTruncated: this.stderrBytes > this.stderrTailBuffer.byteLength || buffered.length > tail.length,
};
}
}
export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise<BackendTurnResult> {
const secretFailure = codexHomeReadiness(options.codexHome ?? options.env?.CODEX_HOME ?? `${options.env?.HOME ?? process.env.HOME ?? ""}/.codex`);
const codexHome = resolveCodexHome(options);
const secretFailure = codexHomeReadiness(codexHome);
if (secretFailure) return secretFailure;
const events: BackendEvent[] = [{ type: "backend_status", payload: { phase: "codex-app-server-starting", protocol: "codex-app-server-jsonrpc-stdio" } }];
const env = childEnv(options, codexHome);
const events: BackendEvent[] = [{
type: "backend_status",
payload: {
phase: "codex-app-server-starting",
protocol: codexProtocol,
runtime: runtimeSummary(options, env, codexHome),
},
}];
let assistantText = "";
let threadId: string | undefined;
let threadId: string | undefined = options.threadId;
let turnId: string | undefined;
let terminal: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } | null = null;
let terminalResolve!: () => void;
const terminalPromise = new Promise<void>((resolve) => { terminalResolve = resolve; });
const clientOptions: ConstructorParameters<typeof CodexStdioClient>[0] = {
cwd: options.cwd,
env: childEnv(options),
onNotification: (message) => {
const normalized = normalizeCodexNotification(message);
if (normalized.threadId) threadId = normalized.threadId;
if (normalized.turnId) turnId = normalized.turnId;
if (normalized.assistantDelta) assistantText += normalized.assistantDelta;
events.push(...normalized.events);
if (normalized.terminal) {
terminal = normalized.terminal;
terminalResolve();
}
},
};
if (options.command) clientOptions.command = options.command;
if (options.args) clientOptions.args = options.args;
const client = new CodexStdioClient(clientOptions);
let client: CodexStdioClient | null = null;
const requestTimeoutMs = Math.min(positiveTimeout(options.timeoutMs), requestTimeoutCapMs);
const timeout = setTimeout(() => {
if (!terminal) {
terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn timed out after ${options.timeoutMs}ms` };
events.push({ type: "error", payload: terminal });
client.stop();
terminalResolve();
}
}, options.timeoutMs);
if (terminal) return;
terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn timed out after ${options.timeoutMs}ms` };
events.push({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:timeout" } });
client?.stop();
terminalResolve();
}, positiveTimeout(options.timeoutMs));
try {
await client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } });
const threadResponse = asResponseRecord(await client.request("thread/start", { model: options.model ?? "default", cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" }));
threadId = stringAt(asRecordAt(threadResponse, "thread"), "id") ?? threadId;
const turnResponse = asResponseRecord(await client.request("turn/start", { threadId: threadId ?? "", input: [{ type: "text", text: options.prompt, text_elements: [] }], cwd: options.cwd, approvalPolicy: options.approvalPolicy, model: options.model ?? "default" }));
turnId = stringAt(asRecordAt(turnResponse, "turn"), "id") ?? turnId;
await Promise.race([terminalPromise, client.closedPromise]);
if (!terminal) terminal = { status: "failed", failureKind: "backend-protocol-error", message: "codex app-server closed before turn/completed" };
const clientOptions: ConstructorParameters<typeof CodexStdioClient>[0] = {
cwd: options.cwd,
env,
onNotification: (message) => {
const normalized = normalizeCodexNotification(message);
if (normalized.threadId) threadId = normalized.threadId;
if (normalized.turnId) turnId = normalized.turnId;
if (normalized.assistantDelta) assistantText += normalized.assistantDelta;
events.push(...normalized.events);
if (normalized.terminal && !terminal) {
terminal = normalized.terminal;
terminalResolve();
}
},
};
if (options.command) clientOptions.command = options.command;
if (options.args) clientOptions.args = options.args;
client = new CodexStdioClient(clientOptions);
const initializeResult = requireResponseRecord(await client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } }, requestTimeoutMs), "initialize");
validateInitializeResponse(initializeResult);
client.notify("initialized", {});
events.push({ type: "backend_status", payload: { phase: "initialize:completed", protocol: codexProtocol } });
const threadMethod = options.threadId ? "thread/resume" : "thread/start";
const threadParams: JsonRecord = options.threadId
? { threadId: options.threadId, model: options.model ?? "default", cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }
: { model: options.model ?? "default", cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" };
const threadResponse = requireResponseRecord(await client.request(threadMethod, threadParams, requestTimeoutMs), threadMethod);
threadId = requireNestedId(threadResponse, threadMethod, "thread");
events.push({ type: "backend_status", payload: { phase: `${threadMethod}:completed`, threadId } });
const turnResponse = requireResponseRecord(await client.request("turn/start", { threadId, input: textInput(options.prompt), cwd: options.cwd, approvalPolicy: options.approvalPolicy, model: options.model ?? "default" }, requestTimeoutMs), "turn/start");
turnId = requireNestedId(turnResponse, "turn/start", "turn");
events.push({ type: "backend_status", payload: { phase: "turn/start:completed", turnId } });
const race = await Promise.race([
terminalPromise.then(() => ({ kind: "terminal" as const })),
client.closedPromise.then((closeInfo) => ({ kind: "closed" as const, closeInfo })),
]);
if (race.kind === "closed" && !terminal) {
terminal = terminalFromClose(race.closeInfo);
events.push({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "transport:closed-before-terminal", appServerExit: closeEvent(race.closeInfo) } });
}
if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server did not emit turn/completed" };
} catch (error) {
terminal = { status: "failed", failureKind: "backend-protocol-error", message: error instanceof Error ? error.message : String(error) };
events.push({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message } });
if (!terminal) {
const failure = normalizeFailure(error);
terminal = { status: failure.failureKind === "secret-unavailable" ? "blocked" : "failed", failureKind: failure.failureKind, message: failure.message };
events.push({ type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase: failure.phase, details: failure.details } });
}
} finally {
clearTimeout(timeout);
client.stop();
if (client) {
client.stop();
const closeInfo = await client.closedPromise;
events.push({ type: "backend_status", payload: { phase: "codex-app-server-closed", appServerExit: closeEvent(closeInfo) } });
}
}
if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server finished without terminal status" };
if (assistantText.trim().length > 0) events.push({ type: "assistant_message", payload: { text: assistantText } });
events.push({ type: "terminal_status", payload: { terminalStatus: terminal.status, failureKind: terminal.failureKind, message: terminal.message } });
return { terminalStatus: terminal.status, failureKind: terminal.failureKind, failureMessage: terminal.message, events: events.map((event) => ({ ...event, payload: redactJson(event.payload) })), ...(threadId ? { threadId } : {}), ...(turnId ? { turnId } : {}) };
}
function codexHomeReadiness(codexHome: string): BackendTurnResult | null {
try {
accessSync(`${codexHome}/auth.json`, fsConstants.R_OK);
accessSync(`${codexHome}/config.toml`, fsConstants.R_OK);
return null;
} catch {
return {
terminalStatus: "blocked",
failureKind: "secret-unavailable",
failureMessage: "Codex auth.json or config.toml projection is not readable",
events: [
{ type: "error", payload: { failureKind: "secret-unavailable", credentialSource: { codexHome, valuesPrinted: false } } },
{ type: "terminal_status", payload: { terminalStatus: "blocked", failureKind: "secret-unavailable" } },
],
};
}
const auth = fileReadable(`${codexHome}/auth.json`);
const config = fileReadable(`${codexHome}/config.toml`);
if (auth.readable && config.readable) return null;
const payload = {
failureKind: "secret-unavailable",
projection: {
codexHome: pathSummary(codexHome),
authJson: auth,
configToml: config,
valuesPrinted: false,
},
} satisfies JsonRecord;
return {
terminalStatus: "blocked",
failureKind: "secret-unavailable",
failureMessage: "Codex auth.json or config.toml projection is not readable",
events: [
{ type: "error", payload },
{ type: "terminal_status", payload: { terminalStatus: "blocked", failureKind: "secret-unavailable" } },
],
};
}
function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent[]; assistantDelta?: string; threadId?: string; turnId?: string; terminal?: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } } {
@@ -207,33 +385,163 @@ function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent
if (method === "item/agentMessage/delta") return { events: [], assistantDelta: typeof params.delta === "string" ? params.delta : "" };
if (method === "item/commandExecution/outputDelta") return { events: [{ type: "command_output", payload: { stream: "stdout", text: typeof params.delta === "string" ? params.delta : "" } }] };
if (method === "item/started" || method === "item/completed") return { events: [{ type: "tool_call", payload: { method, item: redactJson(asRecordAt(params, "item")) } }] };
if (method === "error") return { events: [{ type: "error", payload: { failureKind: "backend-failed", error: redactJson(params.error ?? params) } }] };
if (method === "error") {
const error = asRecordAt(params, "error");
const messageText = typeof error.message === "string" ? error.message : "Codex app-server error";
const failureKind = classifyMessageFailureKind(messageText, "backend-failed");
const terminal = params.willRetry === true ? undefined : { status: "failed" as const, failureKind, message: redactText(messageText) };
return { events: [{ type: "error", payload: { failureKind, error: redactJson(error), willRetry: params.willRetry === true } }], ...(terminal ? { terminal } : {}) };
}
if (method === "turn/completed") {
const turn = asRecordAt(params, "turn");
if (typeof turn.status !== "string") {
return { events: [{ type: "error", payload: { failureKind: "backend-response-invalid", message: "turn/completed notification omitted turn.status" } }], terminal: { status: "failed", failureKind: "backend-response-invalid", message: "turn/completed notification omitted turn.status" } };
}
const status = terminalStatusFromValue(turn.status);
const error = asRecordAt(turn, "error");
const messageText = typeof error.message === "string" ? error.message : null;
return { events: [{ type: "backend_status", payload: { phase: method, terminalStatus: status } }], terminal: { status, failureKind: status === "completed" ? null : "backend-failed", message: messageText } };
const messageText = typeof error.message === "string" ? redactText(error.message) : null;
return { events: [{ type: "backend_status", payload: { phase: method, terminalStatus: status } }], terminal: { status, failureKind: status === "completed" ? null : classifyMessageFailureKind(messageText ?? turn.status, "backend-failed"), message: messageText } };
}
return { events: [{ type: "backend_status", payload: { phase: method } }] };
}
/**
 * Map a backend-reported turn status onto the run's terminal status.
 *
 * `cancelled`, `canceled` and `interrupted` all count as a cancellation;
 * any value that is not recognised is treated as a failure.
 *
 * @param value Raw status value taken from a backend message.
 * @returns The corresponding terminal status.
 */
function terminalStatusFromValue(value: unknown): TerminalStatus {
  switch (value) {
    case "completed":
      return "completed";
    case "cancelled":
    case "canceled":
    case "interrupted":
      return "cancelled";
    case "blocked":
      return "blocked";
    default:
      return "failed";
  }
}
function childEnv(options: CodexStdioTurnOptions): NodeJS.ProcessEnv {
const env: NodeJS.ProcessEnv = { ...process.env, ...options.env };
const codexHome = options.codexHome ?? options.env?.CODEX_HOME;
if (codexHome) env.CODEX_HOME = codexHome;
return env;
function childEnv(options: CodexStdioTurnOptions, codexHome: string): NodeJS.ProcessEnv {
return {
...process.env,
...options.env,
CODEX_HOME: codexHome,
CODEX_INTERNAL_ORIGINATOR_OVERRIDE: "agentrun",
};
}
function asResponseRecord(value: unknown): JsonRecord {
return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : {};
/**
 * Resolve the Codex home directory used for the readiness check and for the
 * child's `CODEX_HOME`.
 *
 * Precedence, highest first:
 *  1. `options.codexHome`
 *  2. `CODEX_HOME` from `options.env`
 *  3. `CODEX_HOME` from the current process environment
 *  4. `<HOME>/.codex`, with `HOME` taken from `options.env`, then the process
 *
 * Step 3 matters because the child environment is built from the process
 * environment overlaid with `options.env`, and `CODEX_HOME` is then forced to
 * this result. Without the process-level fallback, a `CODEX_HOME` configured
 * on the runner process would be silently replaced by `<HOME>/.codex`.
 *
 * @param options Turn options carrying optional `codexHome` and `env` overrides.
 * @returns The resolved Codex home path.
 */
function resolveCodexHome(options: CodexStdioTurnOptions): string {
  const configured = options.codexHome ?? options.env?.CODEX_HOME ?? process.env.CODEX_HOME;
  if (configured !== undefined) return configured;
  const home = options.env?.HOME ?? process.env.HOME ?? "";
  return `${home}/.codex`;
}
/**
 * Check the shape of an `initialize` response.
 *
 * `serverInfo` is optional, but when it is present it has to be a plain
 * object (not null, not an array, not a primitive).
 *
 * @param value The `initialize` result record.
 * @throws CodexStdioFailure with kind `backend-response-invalid` when `serverInfo` has the wrong shape.
 */
function validateInitializeResponse(value: JsonRecord): void {
  const info = value.serverInfo;
  if (info === undefined) return;
  const isPlainObject = typeof info === "object" && info !== null && !Array.isArray(info);
  if (isPlainObject) return;
  throw new CodexStdioFailure(
    "backend-response-invalid",
    "initialize response serverInfo must be an object when present",
    "response:initialize",
    { response: value },
  );
}
/**
 * Ensure a JSON-RPC `result` is a plain object and return it as a record.
 *
 * @param value The raw `result` value of a response.
 * @param method Name of the request the response belongs to, used in the failure text.
 * @returns The same value, typed as a record.
 * @throws CodexStdioFailure with kind `backend-response-invalid` for null, arrays and primitives.
 */
function requireResponseRecord(value: unknown, method: string): JsonRecord {
  const isRecord = typeof value === "object" && value !== null && !Array.isArray(value);
  if (!isRecord) {
    throw new CodexStdioFailure("backend-response-invalid", `${method} response result must be an object`, `response:${method}`);
  }
  return value as JsonRecord;
}
/**
 * Read the string `id` of the object stored under `key` in a response.
 *
 * @param value The response record.
 * @param method Name of the request the response belongs to, used in the failure text.
 * @param key Property of `value` expected to hold an object with an `id`.
 * @returns The id string.
 * @throws CodexStdioFailure with kind `backend-response-invalid` when no id is found.
 */
function requireNestedId(value: JsonRecord, method: string, key: string): string {
  const nested = asRecordAt(value, key);
  const id = stringAt(nested, "id");
  if (!id) {
    throw new CodexStdioFailure(
      "backend-response-invalid",
      `${method} response did not include ${key}.id`,
      `response:${method}`,
      { response: value },
    );
  }
  return id;
}
/**
 * Wrap a prompt string as the single-element `input` array sent with `turn/start`.
 *
 * @param text Prompt text.
 * @returns An array holding one text input item with an empty `text_elements` list.
 */
function textInput(text: string): JsonValue[] {
  const item = { type: "text", text, text_elements: [] };
  return [item];
}
/**
 * Probe whether a file can be read, without exposing its path.
 *
 * @param filePath Path to check for read access.
 * @returns The path summary for `filePath` plus a `readable` flag.
 */
function fileReadable(filePath: string): JsonRecord {
  let readable = true;
  try {
    accessSync(filePath, fsConstants.R_OK);
  } catch {
    // Any access error (missing file, permission denied, ...) means "not readable".
    readable = false;
  }
  return { ...pathSummary(filePath), readable };
}
/**
 * Describe a filesystem path without printing it in full.
 *
 * The summary exposes only whether the value is non-blank, whether it is
 * absolute, its last segment, its number of segments and a short hash of the
 * raw string.
 *
 * @param value Path to summarise; falsy values are treated as the empty string.
 * @returns A record with `present`, `absolute`, `basename`, `depth`, `fingerprint` and `valuePrinted: false`.
 */
function pathSummary(value: string): JsonRecord {
  const raw = String(value || "");
  // Split on runs of either separator so POSIX and Windows style paths both work.
  const segments = raw.split(/[\\/]+/u).filter(Boolean);
  const lastSegment = segments[segments.length - 1] ?? null;
  const isPresent = raw.trim().length > 0;
  return {
    present: isPresent,
    absolute: path.isAbsolute(raw),
    basename: lastSegment,
    depth: segments.length,
    fingerprint: shortHash(raw),
    valuePrinted: false,
  };
}
/**
 * Build the redaction-friendly runtime description attached to the
 * `codex-app-server-starting` status event.
 *
 * The command and arguments are reported as given (or their defaults), while
 * paths and the environment are reported as summaries only.
 *
 * @param options Turn options (command, args, cwd).
 * @param env Environment that will be handed to the child process.
 * @param codexHome Resolved Codex home directory.
 * @returns A record describing how the child will be launched.
 */
function runtimeSummary(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, codexHome: string): JsonRecord {
  const command = options.command ?? "codex";
  const args = options.args ?? defaultCodexArgs;
  // The working directory doubles as the workspace, so both entries summarise the same path.
  const cwd = pathSummary(options.cwd);
  const workspace = pathSummary(options.cwd);
  return {
    command,
    args,
    cwd,
    workspace,
    codexHome: pathSummary(codexHome),
    env: envSummary(env),
    valuesPrinted: false,
  };
}
/**
 * Summarise an environment without revealing any value.
 *
 * @param env Environment to describe.
 * @returns The total key count, a presence flag for each tracked key (a key is
 *   present when it holds a non-empty string), the number of keys whose name
 *   looks secret-related, and `valuesPrinted: false`.
 */
function envSummary(env: NodeJS.ProcessEnv): JsonRecord {
  const names = Object.keys(env);
  const trackedKeys: Record<string, JsonValue> = Object.fromEntries(
    childEnvSummaryKeys.map((key) => {
      const current = env[key];
      return [key, { present: typeof current === "string" && current.length > 0 }];
    }),
  );
  const secretLikeName = /auth|authorization|api[_-]?key|token|password|secret|credential/iu;
  let secretLikeKeyCount = 0;
  for (const name of names) {
    if (secretLikeName.test(name)) secretLikeKeyCount += 1;
  }
  return {
    keyCount: names.length,
    trackedKeys,
    secretLikeKeyCount,
    valuesPrinted: false,
  };
}
/**
 * Convert close information into an event payload with a bounded stderr tail.
 *
 * The tail is clipped to the last `stderrEventChars` characters, and the
 * truncation flag is raised when either the close info was already truncated
 * or this clipping dropped characters.
 *
 * @param closeInfo Close information produced by the stdio client.
 * @returns The payload to embed as `appServerExit` in events.
 */
function closeEvent(closeInfo: CodexStdioCloseInfo): JsonRecord {
  const { code, signal, failureKind, message, stderrTail, stderrBytes, stderrTruncated } = closeInfo;
  const clippedHere = stderrTail.length > stderrEventChars;
  return {
    code,
    signal,
    failureKind,
    message,
    stderrTail: stderrTail.slice(-stderrEventChars),
    stderrBytes,
    stderrTruncated: stderrTruncated || clippedHere,
  };
}
/**
 * Derive a failed terminal state from a process close that happened before
 * the turn reported completion.
 *
 * The failure kind recorded on the close info wins; otherwise it is
 * classified from the close message and stderr tail, defaulting to
 * `backend-response-invalid`. The message carries the exit code and signal
 * plus up to the last 1000 characters of stderr, and is redacted.
 *
 * @param closeInfo Close information produced by the stdio client.
 * @returns A terminal descriptor with status `failed`.
 */
function terminalFromClose(closeInfo: CodexStdioCloseInfo): { status: TerminalStatus; failureKind: FailureKind; message: string } {
  const { code, signal, stderrTail } = closeInfo;
  let failureKind = closeInfo.failureKind;
  if (failureKind === null || failureKind === undefined) {
    const evidence: string[] = [];
    if (closeInfo.message) evidence.push(closeInfo.message);
    if (stderrTail) evidence.push(stderrTail);
    failureKind = classifyMessageFailureKind(evidence.join("\n"), "backend-response-invalid");
  }
  let text = `codex app-server closed before turn/completed code=${code} signal=${signal}`;
  if (stderrTail.trim().length > 0) text += `; stderrTail=${stderrTail.slice(-1000)}`;
  return { status: "failed", failureKind, message: redactText(text) };
}
/**
 * Turn the `error` member of a JSON-RPC response into a failure.
 *
 * The error's string `message` is used when available; otherwise the redacted
 * error value is serialised. The failure kind is classified from that text
 * and defaults to `backend-protocol-error`.
 *
 * @param method Name of the request that received the error.
 * @param value Raw `error` value from the response.
 * @returns A failure carrying the method and the redacted error as details.
 */
function failureFromRpcError(method: string, value: unknown): CodexStdioFailure {
  const rpcError: Record<string, unknown> = typeof value === "object" && value !== null ? (value as Record<string, unknown>) : {};
  let detail: string;
  if (typeof rpcError.message === "string") {
    detail = rpcError.message;
  } else {
    detail = JSON.stringify(redactJson(value as JsonValue));
  }
  const failureKind = classifyMessageFailureKind(detail, "backend-protocol-error");
  const text = `codex app-server ${method} error: ${detail}`;
  return new CodexStdioFailure(failureKind, text, `response:${method}`, { method, error: redactJson(value as JsonValue) });
}
/**
 * Build the failure reported when the app-server process cannot be started.
 *
 * @param command Executable that was being launched.
 * @param error Value thrown by, or emitted from, the spawn attempt.
 * @returns A `backend-spawn-failed` failure whose details carry the command
 *   and the error's `code` as a string (empty when there is none).
 */
function spawnFailure(command: string, error: unknown): CodexStdioFailure {
  const reason = error instanceof Error ? error.message : String(error);
  let code = "";
  if (typeof error === "object" && error !== null && "code" in error) {
    code = String((error as { code?: unknown }).code ?? "");
  }
  return new CodexStdioFailure(
    "backend-spawn-failed",
    `failed to start Codex app-server command ${command}: ${reason}`,
    "process:spawn",
    { command, code },
  );
}
/**
 * Coerce any thrown value into a `CodexStdioFailure`.
 *
 * Existing failures are returned untouched. Anything else is wrapped with a
 * kind classified from its message (default `backend-protocol-error`) and the
 * generic phase `codex-stdio`.
 *
 * @param error The caught value.
 * @returns A failure suitable for event and terminal reporting.
 */
function normalizeFailure(error: unknown): CodexStdioFailure {
  if (!(error instanceof CodexStdioFailure)) {
    const text = error instanceof Error ? error.message : String(error);
    const failureKind = classifyMessageFailureKind(text, "backend-protocol-error");
    return new CodexStdioFailure(failureKind, text, "codex-stdio");
  }
  return error;
}
/**
 * Classify free-form error text into a stable failure kind.
 *
 * The text is lower-cased and checked against the rules in order: rate
 * limiting, authentication, timeout, JSON parsing. The first matching rule
 * wins; when none matches, `fallback` is returned.
 *
 * @param message Error text to inspect; falsy values are treated as empty.
 * @param fallback Kind to return when no rule matches.
 * @returns The classified failure kind.
 */
function classifyMessageFailureKind(message: string, fallback: FailureKind): FailureKind {
  const text = String(message || "").toLowerCase();
  const rules: ReadonlyArray<readonly [RegExp, FailureKind]> = [
    [/rate.?limit|too many requests|\b429\b/u, "provider-rate-limited"],
    [/\b401\b|\b403\b|unauthori[sz]ed|forbidden|invalid api key|authentication|auth failed|oauth|access token/u, "provider-auth-failed"],
    [/timed out|timeout|idle timeout/u, "backend-timeout"],
    [/invalid json|json parse/u, "backend-json-parse-error"],
  ];
  for (const [pattern, failureKind] of rules) {
    if (pattern.test(text)) return failureKind;
  }
  return fallback;
}
/**
 * Normalise a caller-supplied timeout into a delay that is safe to hand to `setTimeout`.
 *
 * - Non-finite or non-positive values fall back to `requestTimeoutCapMs`.
 * - Fractional values are floored, but never below 1 ms.
 * - Values above the largest delay Node timers support (2^31 - 1 ms) are
 *   clamped to that maximum. Without the clamp Node would coerce the delay to
 *   1 ms and the timeout would fire almost immediately.
 *
 * @param value Requested timeout in milliseconds.
 * @returns A whole number of milliseconds in the range [1, 2147483647].
 */
function positiveTimeout(value: number): number {
  const maxTimerDelayMs = 2_147_483_647;
  if (!Number.isFinite(value) || value <= 0) return requestTimeoutCapMs;
  return Math.min(maxTimerDelayMs, Math.max(1, Math.floor(value)));
}
function asRecordAt(value: JsonRecord, key: string): JsonRecord {
@@ -242,5 +550,9 @@ function asRecordAt(value: JsonRecord, key: string): JsonRecord {
}
/**
 * Read a non-empty string property from a record.
 *
 * Empty strings count as missing, so callers that require an identifier
 * (thread id, turn id) do not accept `""` as a valid value.
 *
 * @param value Record to read from.
 * @param key Property name.
 * @returns The string when it is present and non-empty, otherwise null.
 */
function stringAt(value: JsonRecord, key: string): string | null {
  const candidate = value[key];
  return typeof candidate === "string" && candidate.length > 0 ? candidate : null;
}
/**
 * Produces a short fingerprint of a string.
 *
 * @param value - Text to hash.
 * @returns The first 12 hexadecimal characters of the SHA-256 digest of `value`.
 */
function shortHash(value: string): string {
  const hasher = createHash("sha256");
  hasher.update(value);
  const hex = hasher.digest("hex");
  return hex.slice(0, 12);
}
+2 -1
View File
@@ -1,7 +1,8 @@
/**
 * Masks credential-looking substrings in free-form text.
 *
 * The passes run in the listed order, each over the output of the previous one:
 * 1. bare `sk-…`, `ghp_…` and `github_pat_…` tokens;
 * 2. `authorization: [bearer ]<value>` pairs (the `bearer ` prefix is kept);
 * 3. unquoted `api_key|token|password|secret` key/value pairs;
 * 4. the same keys with optional quotes around the key and/or value (JSON-style);
 * 5. the password part of `postgres://user:password@` URLs;
 * 6. the password part of `http(s)://user:password@` URLs.
 *
 * @param value - Text that may contain secrets.
 * @returns The text with every matched secret replaced by `REDACTED`.
 */
export function redactText(value: string): string {
  const passes: ReadonlyArray<readonly [RegExp, string]> = [
    [/\b(sk-[A-Za-z0-9_-]{8,}|ghp_[A-Za-z0-9_]{8,}|github_pat_[A-Za-z0-9_]+)\b/gu, "REDACTED"],
    [/(authorization\s*[:=]\s*)(bearer\s+)?[A-Za-z0-9._~+/=-]+/giu, "$1$2REDACTED"],
    [/((?:api[_-]?key|token|password|secret)\s*[:=]\s*)[A-Za-z0-9._~+/=-]+/giu, "$1REDACTED"],
    [/((?:api[_-]?key|token|password|secret)\s*["']?\s*[:=]\s*["']?)[A-Za-z0-9._~+/=-]+/giu, "$1REDACTED"],
    [/(postgres(?:ql)?:\/\/[^:\s/@]+:)[^@\s]+(@)/giu, "$1REDACTED$2"],
    [/(https?:\/\/[^:\s/@]+:)[^@\s]+(@)/giu, "$1REDACTED$2"],
  ];
  let redacted = value;
  for (const [pattern, replacement] of passes) {
    redacted = redacted.replace(pattern, replacement);
  }
  return redacted;
}
+3
View File
@@ -9,6 +9,9 @@ export type FailureKind =
| "runner-lease-conflict"
| "backend-failed"
| "backend-protocol-error"
| "backend-spawn-failed"
| "backend-json-parse-error"
| "backend-response-invalid"
| "backend-timeout"
| "provider-auth-failed"
| "provider-rate-limited"
+16
View File
@@ -1,6 +1,7 @@
import * as readline from "node:readline";
// Line reader over stdin; each non-empty line is parsed as one JSON message by the loop below.
const rl = readline.createInterface({ input: process.stdin, crlfDelay: Infinity });
// Behaviour selector for the fake server, taken from AGENTRUN_FAKE_CODEX_MODE and
// defaulting to "success". The handlers in this file also check for "invalid-json",
// "missing-turn-result" and "missing-terminal".
const mode = process.env.AGENTRUN_FAKE_CODEX_MODE ?? "success";
// NOTE(review): presumably numbers the fake thread ids; its use is outside this excerpt — confirm.
let threadCounter = 0;
// Incremented for each handled turn/start and embedded in the generated `turn_selftest_<n>` id.
let turnCounter = 0;
@@ -9,6 +10,10 @@ for await (const line of rl) {
if (trimmed.length === 0) continue;
const message = JSON.parse(trimmed) as { id?: number; method?: string; params?: Record<string, unknown> };
if (message.method === "initialize") {
if (mode === "invalid-json") {
process.stdout.write('{"token":"test-token-material"\n');
process.exit(0);
}
respond(message.id, { serverInfo: { name: "fake-codex-app-server", version: "self-test" } });
continue;
}
@@ -26,6 +31,17 @@ for await (const line of rl) {
continue;
}
if (message.method === "turn/start") {
if (mode === "missing-turn-result") {
respond(message.id, {});
continue;
}
if (mode === "missing-terminal") {
turnCounter += 1;
const turn = { id: `turn_selftest_${turnCounter}`, status: "running" };
notify("turn/started", { turn });
respond(message.id, { turn });
continue;
}
turnCounter += 1;
const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" };
notify("turn/started", { turn });
+113 -24
View File
@@ -7,6 +7,7 @@ import { startManagerServer } from "../mgr/server.js";
import { ManagerClient } from "../mgr/client.js";
import { runOnce } from "../runner/run-once.js";
import { redactText } from "../common/redaction.js";
import type { FailureKind, JsonRecord, TerminalStatus } from "../common/types.js";
// Directory two levels above the directory containing this module; used below to
// locate src/selftest/fake-codex-app-server.ts.
const root = path.resolve(path.dirname(fileURLToPath(import.meta.url)), "../..");
// Fresh scratch directory under the OS temp dir; removed in the outer `finally`.
const tmp = await mkdtemp(path.join(os.tmpdir(), "agentrun-selftest-"));
@@ -21,45 +22,133 @@ try {
await writeFile(path.join(workspace, "README.md"), "self-test workspace\n");
assert.equal(redactText("Authorization: Bearer abc123"), "Authorization: Bearer REDACTED");
assert.equal(redactText('{"token":"test-token-material"}').includes("test-token-material"), false);
const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test" });
try {
const client = new ManagerClient(server.baseUrl);
const health = await client.get("/health/readiness") as { database?: { adapter?: string } };
assert.equal(health.database?.adapter, "memory-self-test");
const run = await client.post("/api/v1/runs", {
tenantId: "unidesk",
projectId: "pikasTech/unidesk",
workspaceRef: { kind: "host-path", path: workspace },
providerId: "G14",
backendProfile: "codex",
executionPolicy: {
sandbox: "workspace-write",
approval: "never",
timeoutMs: 15_000,
network: "default",
secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: codexHome } }] },
},
traceSink: null,
}) as { id: string };
const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello" }, idempotencyKey: "selftest-turn" }) as { id: string };
const duplicate = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello" }, idempotencyKey: "selftest-turn" }) as { id: string };
assert.equal(duplicate.id, command.id);
const fakePath = path.join(root, "src/selftest/fake-codex-app-server.ts");
const fakeCommand = process.env.AGENTRUN_SELFTEST_CODEX_COMMAND ?? process.execPath;
const fakeArgs = process.env.AGENTRUN_SELFTEST_CODEX_ARGS ? JSON.parse(process.env.AGENTRUN_SELFTEST_CODEX_ARGS) as string[] : [fakePath];
const result = await runOnce({ managerUrl: server.baseUrl, runId: run.id, codexCommand: fakeCommand, codexArgs: fakeArgs, codexHome, env: { CODEX_HOME: codexHome } });
const happy = await createRunWithCommand(client, workspace, codexHome, "hello", "selftest-turn", 15_000);
const result = await runOnce({ managerUrl: server.baseUrl, runId: happy.runId, codexCommand: fakeCommand, codexArgs: fakeArgs, codexHome, env: { CODEX_HOME: codexHome } });
assert.equal(result.terminalStatus, "completed");
const events = await client.get(`/api/v1/runs/${run.id}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
const events = await client.get(`/api/v1/runs/${happy.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
assert.ok(events.items?.some((event) => event.type === "assistant_message"));
assert.equal(JSON.stringify(events).includes("test-token-material"), false);
assert.equal(JSON.stringify(events).includes("Bearer test-token"), false);
const finalRun = await client.get(`/api/v1/runs/${run.id}`) as { terminalStatus?: string };
assertNoSecretLeak(events);
const finalRun = await client.get(`/api/v1/runs/${happy.runId}`) as { terminalStatus?: string };
assert.equal(finalRun.terminalStatus, "completed");
console.log(JSON.stringify({ ok: true, tests: ["manager-memory-lifecycle", "codex-stdio-fake-turn", "redaction"], runId: run.id }));
await runFailureCase({
client,
managerUrl: server.baseUrl,
workspace,
codexHome,
fakeCommand,
fakeArgs,
mode: "missing-turn-result",
expectedStatus: "failed",
expectedFailureKind: "backend-response-invalid",
});
await runFailureCase({
client,
managerUrl: server.baseUrl,
workspace,
codexHome,
fakeCommand,
fakeArgs,
mode: "invalid-json",
expectedStatus: "failed",
expectedFailureKind: "backend-json-parse-error",
});
await runFailureCase({
client,
managerUrl: server.baseUrl,
workspace,
codexHome,
fakeCommand,
fakeArgs,
mode: "missing-terminal",
expectedStatus: "failed",
expectedFailureKind: "backend-timeout",
timeoutMs: 500,
});
await runSpawnFailureCase({
client,
managerUrl: server.baseUrl,
workspace,
codexHome,
});
console.log(JSON.stringify({ ok: true, tests: ["manager-memory-lifecycle", "codex-stdio-fake-turn", "codex-stdio-missing-turn-result", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-spawn-failure", "redaction"], runId: happy.runId }));
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}
} finally {
await rm(tmp, { recursive: true, force: true });
}
/**
 * Creates a run on the manager and enqueues a single `turn` command for it.
 *
 * The command is posted twice with the same idempotency key, and the two
 * responses are asserted to carry the same id, so every caller also exercises
 * command de-duplication.
 *
 * @param client - Manager API client used for the POST requests.
 * @param workspace - Host path sent as the run's workspace.
 * @param codexHome - Mount path advertised for the Codex provider credential.
 * @param prompt - Prompt text placed in the turn command payload.
 * @param idempotencyKey - Idempotency key sent with both command submissions.
 * @param timeoutMs - Value for the run's `executionPolicy.timeoutMs`.
 * @returns The ids of the created run and of its command.
 */
async function createRunWithCommand(client: ManagerClient, workspace: string, codexHome: string, prompt: string, idempotencyKey: string, timeoutMs: number): Promise<{ runId: string; commandId: string }> {
  const codexCredential = {
    profile: "codex",
    secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: codexHome },
  };
  const runRequest = {
    tenantId: "unidesk",
    projectId: "pikasTech/unidesk",
    workspaceRef: { kind: "host-path", path: workspace },
    providerId: "G14",
    backendProfile: "codex",
    executionPolicy: {
      sandbox: "workspace-write",
      approval: "never",
      timeoutMs,
      network: "default",
      secretScope: { allowCredentialEcho: false, providerCredentials: [codexCredential] },
    },
    traceSink: null,
  };
  const created = await client.post("/api/v1/runs", runRequest) as { id: string };

  // Each submission builds its own body object, exactly as two independent requests would.
  const submitTurn = async (): Promise<{ id: string }> =>
    await client.post(`/api/v1/runs/${created.id}/commands`, { type: "turn", payload: { prompt }, idempotencyKey }) as { id: string };

  const first = await submitTurn();
  const repeat = await submitTurn();
  assert.equal(repeat.id, first.id);

  return { runId: created.id, commandId: first.id };
}
/**
 * Runs one failure scenario against the fake Codex app-server and checks how it is reported.
 *
 * Creates a run plus turn command, executes it through `runOnce` with
 * `AGENTRUN_FAKE_CODEX_MODE` set to `options.mode`, then asserts the terminal
 * status and failure kind, that at least one `error` event was recorded, and
 * that the recorded events contain none of the known secret markers.
 *
 * @param options - Scenario wiring: manager client and URL, workspace and Codex
 *   home paths, the fake server command and arguments, the fake mode, the expected
 *   terminal status and failure kind, and an optional run timeout (3000 ms when omitted).
 */
async function runFailureCase(options: { client: ManagerClient; managerUrl: string; workspace: string; codexHome: string; fakeCommand: string; fakeArgs: string[]; mode: string; expectedStatus: TerminalStatus; expectedFailureKind: FailureKind; timeoutMs?: number }): Promise<void> {
  const { client, managerUrl, workspace, codexHome, fakeCommand, fakeArgs, mode, expectedStatus, expectedFailureKind } = options;
  const timeoutMs = options.timeoutMs ?? 3_000;

  // The mode doubles as prompt suffix and idempotency key so each scenario gets its own run.
  const item = await createRunWithCommand(client, workspace, codexHome, `failure ${mode}`, `selftest-${mode}`, timeoutMs);

  const outcome = await runOnce({
    managerUrl,
    runId: item.runId,
    codexCommand: fakeCommand,
    codexArgs: fakeArgs,
    codexHome,
    env: { CODEX_HOME: codexHome, AGENTRUN_FAKE_CODEX_MODE: mode },
  }) as JsonRecord;
  assert.equal(outcome.terminalStatus, expectedStatus, mode);
  assert.equal(outcome.failureKind, expectedFailureKind, mode);

  const recorded = await client.get(`/api/v1/runs/${item.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
  const sawError = recorded.items?.some((event) => event.type === "error");
  assert.ok(sawError, mode);
  assertNoSecretLeak(recorded);
}
/**
 * Checks that an unlaunchable Codex command is reported as `backend-spawn-failed`.
 *
 * Creates a run plus turn command, then executes it through `runOnce` with a
 * command path in the OS temp directory named after the current pid (expected
 * not to exist). Asserts terminal status `failed`, failure kind
 * `backend-spawn-failed`, at least one `error` event, and no secret markers in
 * the recorded events.
 *
 * @param options - Manager client and URL plus the workspace and Codex home paths.
 */
async function runSpawnFailureCase(options: { client: ManagerClient; managerUrl: string; workspace: string; codexHome: string }): Promise<void> {
  const { client, managerUrl, workspace, codexHome } = options;
  const label = "spawn failure";

  const item = await createRunWithCommand(client, workspace, codexHome, "failure spawn", "selftest-spawn-failure", 3_000);

  const missingCommand = path.join(os.tmpdir(), `agentrun-missing-codex-${process.pid}`);
  const outcome = await runOnce({
    managerUrl,
    runId: item.runId,
    codexCommand: missingCommand,
    codexArgs: [],
    codexHome,
    env: { CODEX_HOME: codexHome },
  }) as JsonRecord;
  assert.equal(outcome.terminalStatus, "failed", label);
  assert.equal(outcome.failureKind, "backend-spawn-failed", label);

  const recorded = await client.get(`/api/v1/runs/${item.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
  const sawError = recorded.items?.some((event) => event.type === "error");
  assert.ok(sawError, label);
  assertNoSecretLeak(recorded);
}
/**
 * Asserts that the JSON serialization of `value` contains none of the secret
 * marker strings used by the self-test (`test-token-material`, `Bearer test-token`).
 *
 * @param value - Any value; typically the events payload fetched from the manager.
 * @throws AssertionError when either marker appears in the serialized value.
 */
function assertNoSecretLeak(value: unknown): void {
  // JSON.stringify yields undefined for undefined, functions and symbols; treat
  // that as empty text so the check reports "no leak" instead of a TypeError.
  const text = (JSON.stringify(value) as string | undefined) ?? "";
  assert.equal(text.includes("test-token-material"), false, "serialized value contains the raw test token");
  assert.equal(text.includes("Bearer test-token"), false, "serialized value contains a raw Bearer credential");
}