fix: 收敛 codex stdio trace 噪声 (#66)

Co-authored-by: Codex <codex@pikas.tech>
This commit is contained in:
Lyon
2026-06-02 09:49:55 +08:00
committed by GitHub
parent 5db48b299e
commit 1d45d272f1
4 changed files with 109 additions and 4 deletions
@@ -69,6 +69,8 @@ Adapter 输出给 runner 的 event 类型至少包括:
事件必须有上限和分页友好形态。大型日志、完整 stdout 或完整 trace 应进入 logPath 或后续 artifact,不得一次性塞入单个 event 造成输出爆炸。
Codex app-server 的低价值内部 notification 必须在 AgentRun adapter 层收敛,不得要求 HWLAB Web/CLI 或其他消费侧自行过滤。以下事件默认不作为 durable trace event 持久化:`item/reasoning/textDelta`、纯 `reasoning` item 的 `item/started|item/completed``thread/tokenUsage/updated``account/rateLimits/updated`、普通 `warning``configWarning`。adapter 可以输出一条有界 `backend_status.phase=codex-app-server-notifications-suppressed` 摘要,只包含计数、method 和 item type,不包含 reasoning 文本、Secret、token 或 env value。真实 `agentMessage``commandExecution``command_output`、error、terminal 和生命周期事件必须继续保留。
## Failure Mapping
Adapter 必须把 backend 错误映射为稳定 failureKind
+63 -3
View File
@@ -62,6 +62,12 @@ interface CompletedAssistantMessage {
text: string;
}
interface SuppressedNotificationSummary {
total: number;
byMethod: Record<string, number>;
byItemType: Record<string, number>;
}
interface CodexStdioCloseInfo extends JsonRecord {
code: number | null;
signal: string | null;
@@ -382,6 +388,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
}
let assistantText = "";
const completedAssistantMessages: CompletedAssistantMessage[] = [];
const suppressedNotifications = createSuppressedNotificationSummary();
let threadId: string | undefined = options.threadId;
let turnId: string | undefined;
let terminal: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } | null = null;
@@ -405,7 +412,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
terminalResolve();
}, positiveTimeout(options.timeoutMs));
const stopNotifications = session.addNotificationHandler((message) => {
const normalized = normalizeCodexNotification(message);
const normalized = normalizeCodexNotification(message, suppressedNotifications);
if (normalized.threadId) threadId = normalized.threadId;
if (normalized.turnId) turnId = normalized.turnId;
emitEvents(normalized.events);
@@ -480,6 +487,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server finished without terminal status" };
if (terminal.status !== "completed") emitEvents(await session.close());
if (completedAssistantMessages.length === 0) emitEvents(assistantMessageEventsForTurn(assistantText, terminal.status === "completed"));
emitEvents(suppressedNotificationEvents(suppressedNotifications));
emitEvent({ type: "terminal_status", payload: { terminalStatus: terminal.status, failureKind: terminal.failureKind, message: terminal.message } });
await liveEventWrite;
return { terminalStatus: terminal.status, failureKind: terminal.failureKind, failureMessage: terminal.message, events: events.map((event) => ({ ...event, payload: redactJson(event.payload) })), ...(threadId ? { threadId } : {}), ...(turnId ? { turnId } : {}) };
@@ -541,7 +549,7 @@ function codexHomeReadiness(codexHome: string): BackendTurnResult | null {
};
}
function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent[]; assistantDelta?: string; completedAssistantMessage?: CompletedAssistantMessage; threadId?: string; turnId?: string; terminal?: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } } {
function normalizeCodexNotification(message: JsonRecord, suppressed: SuppressedNotificationSummary): { events: BackendEvent[]; assistantDelta?: string; completedAssistantMessage?: CompletedAssistantMessage; threadId?: string; turnId?: string; terminal?: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } } {
const method = typeof message.method === "string" ? message.method : "unknown";
const params = asRecordAt(message, "params");
if (method === "thread/started") {
@@ -552,8 +560,16 @@ function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent
const turnId = stringAt(asRecordAt(params, "turn"), "id");
return { events: [{ type: "backend_status", payload: { phase: method, turnId } }], ...(turnId ? { turnId } : {}) };
}
if (isSuppressedCodexStatusNotification(method)) {
recordSuppressedNotification(suppressed, method);
return { events: [] };
}
if (method === "item/agentMessage/delta") return { events: [], assistantDelta: typeof params.delta === "string" ? params.delta : "" };
if (method === "item/commandExecution/outputDelta") return { events: [{ type: "command_output", payload: commandOutputPayload("stdout", typeof params.delta === "string" ? params.delta : "") }] };
if (method === "item/reasoning/textDelta") {
recordSuppressedNotification(suppressed, method, "reasoning");
return { events: [] };
}
if ((method === "item/started" || method === "item/completed") && asRecordAt(params, "item").type === "agentMessage") {
const item = asRecordAt(params, "item");
const itemId = stringAt(item, "id") ?? stringAt(params, "itemId");
@@ -564,7 +580,15 @@ function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent
...(completedAssistantMessage ? { completedAssistantMessage } : {}),
};
}
if (method === "item/started" || method === "item/completed") return { events: [{ type: "tool_call", payload: toolCallPayload(method, asRecordAt(params, "item")) }] };
if (method === "item/started" || method === "item/completed") {
const item = asRecordAt(params, "item");
const itemType = typeof item.type === "string" ? item.type : "unknown";
if (isSuppressedCodexItemType(itemType)) {
recordSuppressedNotification(suppressed, method, itemType);
return { events: [] };
}
return { events: [{ type: "tool_call", payload: toolCallPayload(method, item) }] };
}
if (method === "error") {
const error = asRecordAt(params, "error");
const messageText = typeof error.message === "string" ? error.message : "Codex app-server error";
@@ -588,6 +612,42 @@ function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent
return { events: [{ type: "backend_status", payload: { phase: method } }] };
}
function createSuppressedNotificationSummary(): SuppressedNotificationSummary {
return { total: 0, byMethod: {}, byItemType: {} };
}
function recordSuppressedNotification(summary: SuppressedNotificationSummary, method: string, itemType?: string): void {
summary.total += 1;
summary.byMethod[method] = (summary.byMethod[method] ?? 0) + 1;
if (itemType) summary.byItemType[itemType] = (summary.byItemType[itemType] ?? 0) + 1;
}
function suppressedNotificationEvents(summary: SuppressedNotificationSummary): BackendEvent[] {
if (summary.total === 0) return [];
return [{
type: "backend_status",
payload: {
phase: "codex-app-server-notifications-suppressed",
total: summary.total,
byMethod: sortCountRecord(summary.byMethod),
byItemType: sortCountRecord(summary.byItemType),
valuesPrinted: false,
},
}];
}
function sortCountRecord(input: Record<string, number>): JsonRecord {
return Object.fromEntries(Object.entries(input).sort(([left], [right]) => left.localeCompare(right))) as JsonRecord;
}
function isSuppressedCodexStatusNotification(method: string): boolean {
return method === "thread/tokenUsage/updated" || method === "account/rateLimits/updated" || method === "warning" || method === "configWarning";
}
function isSuppressedCodexItemType(itemType: string): boolean {
return itemType === "reasoning";
}
function assistantMessageEventForCompleted(message: CompletedAssistantMessage, messageIndex: number): BackendEvent {
return {
type: "assistant_message",
+25 -1
View File
@@ -120,6 +120,25 @@ const selfTest: SelfTestCase = async (context) => {
const liveResult = await livePromise;
assert.equal(liveResult.terminalStatus, "completed", "slow live tool event turn should complete");
const noisy = await createRunWithCommand(client, context, "hello noisy reasoning", "selftest-noisy-reasoning-events", 15_000);
const noisyResult = await runOnce({ managerUrl: server.baseUrl, runId: noisy.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "noisy-reasoning-events" }, oneShot: true }) as JsonRecord;
assert.equal(noisyResult.terminalStatus, "completed", "noisy reasoning turn should complete");
const noisyEvents = await client.get(`/api/v1/runs/${noisy.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
const noisyItems = noisyEvents.items ?? [];
assert.equal(noisyItems.some((event) => event.type === "backend_status" && eventPayload(event).phase === "item/reasoning/textDelta"), false, "reasoning textDelta must not be persisted as backend_status");
assert.equal(noisyItems.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/tokenUsage/updated"), false, "token usage update must not be persisted as backend_status");
assert.equal(noisyItems.some((event) => event.type === "backend_status" && eventPayload(event).phase === "account/rateLimits/updated"), false, "rate limit update must not be persisted as backend_status");
assert.equal(noisyItems.some((event) => event.type === "backend_status" && eventPayload(event).phase === "warning"), false, "low value warnings must not be persisted as backend_status");
assert.equal(noisyItems.some((event) => event.type === "backend_status" && eventPayload(event).phase === "configWarning"), false, "low value config warnings must not be persisted as backend_status");
assert.equal(noisyItems.some((event) => event.type === "tool_call" && eventPayloadItem(event).type === "reasoning"), false, "reasoning items must not be persisted as tool_call");
assert.ok(noisyItems.some((event) => event.type === "tool_call" && eventPayload(event).method === "item/started" && eventPayloadItem(event).type === "commandExecution"), "real commandExecution tool call should remain visible");
assert.ok(noisyItems.some((event) => event.type === "assistant_message" && eventPayload(event).text === "noise filtered final"), "final assistant_message should remain visible");
const suppression = noisyItems.find((event) => event.type === "backend_status" && eventPayload(event).phase === "codex-app-server-notifications-suppressed");
assert.ok(suppression, "suppression summary must be emitted when noisy notifications are filtered");
assert.equal(eventPayload(suppression ?? { payload: {} }).total, 8);
assert.equal(JSON.stringify(noisyEvents).includes("internal reasoning must not become durable trace text"), false, "suppression summary must not leak reasoning text");
assertNoSecretLeak(noisyEvents);
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "missing-turn-result", expectedStatus: "failed", expectedFailureKind: "backend-response-invalid" });
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-401-rpc-error", expectedStatus: "failed", expectedFailureKind: "provider-auth-failed" });
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-429-terminal", expectedStatus: "failed", expectedFailureKind: "provider-rate-limited" });
@@ -131,7 +150,7 @@ const selfTest: SelfTestCase = async (context) => {
await runSecretFailureCase({ client, managerUrl: server.baseUrl, context });
await runSpawnFailureCase({ client, managerUrl: server.baseUrl, context });
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fallback", "codex-stdio-live-tool-events", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fallback", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}
@@ -168,6 +187,11 @@ function eventPayload(event: { payload: unknown }): JsonRecord {
return typeof event.payload === "object" && event.payload !== null && !Array.isArray(event.payload) ? event.payload as JsonRecord : {};
}
function eventPayloadItem(event: { payload: unknown }): JsonRecord {
const item = eventPayload(event).item;
return typeof item === "object" && item !== null && !Array.isArray(item) ? item as JsonRecord : {};
}
async function waitForEvent(client: ManagerClient, runId: string, predicate: (event: { type: string; payload: unknown }) => boolean, label: string): Promise<void> {
const deadline = Date.now() + 3_000;
while (Date.now() < deadline) {
+19
View File
@@ -148,6 +148,25 @@ for await (const line of rl) {
}, 50);
continue;
}
if (mode === "noisy-reasoning-events") {
turnCounter += 1;
const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" };
notify("turn/started", { turn });
notify("item/started", { item: { id: "reasoning_selftest", type: "reasoning", content: [] } });
notify("item/reasoning/textDelta", { itemId: "reasoning_selftest", delta: "internal reasoning must not become durable trace text 1" });
notify("item/reasoning/textDelta", { itemId: "reasoning_selftest", delta: "internal reasoning must not become durable trace text 2" });
notify("thread/tokenUsage/updated", { usage: { inputTokens: 1, outputTokens: 2 } });
notify("account/rateLimits/updated", { limit: "selftest" });
notify("warning", { message: "low value warning should be summarized" });
notify("configWarning", { message: "low value config warning should be summarized" });
notify("item/completed", { item: { id: "reasoning_selftest", type: "reasoning", content: ["internal reasoning must not become durable trace text 3"] } });
notify("item/started", { item: { id: "tool_after_noise", type: "commandExecution", command: "echo after-noise" } });
notify("item/completed", { item: { id: "tool_after_noise", type: "commandExecution", command: "echo after-noise", status: "completed" } });
notify("item/completed", { item: { id: "msg_after_noise", type: "agentMessage", text: "noise filtered final" } });
notify("turn/completed", { turn });
respond(message.id, { turn });
continue;
}
turnCounter += 1;
const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" };
notify("turn/started", { turn });