diff --git a/docs/reference/spec-v01-backend-adapter.md b/docs/reference/spec-v01-backend-adapter.md index 18d957f..aa88e9e 100644 --- a/docs/reference/spec-v01-backend-adapter.md +++ b/docs/reference/spec-v01-backend-adapter.md @@ -69,6 +69,8 @@ Adapter 输出给 runner 的 event 类型至少包括: 事件必须有上限和分页友好形态。大型日志、完整 stdout 或完整 trace 应进入 logPath 或后续 artifact,不得一次性塞入单个 event 造成输出爆炸。 +Codex app-server 的低价值内部 notification 必须在 AgentRun adapter 层收敛,不得要求 HWLAB Web/CLI 或其他消费侧自行过滤。以下事件默认不作为 durable trace event 持久化:`item/reasoning/textDelta`、纯 `reasoning` item 的 `item/started|item/completed`、`thread/tokenUsage/updated`、`account/rateLimits/updated`、普通 `warning` 和 `configWarning`。adapter 可以输出一条有界 `backend_status.phase=codex-app-server-notifications-suppressed` 摘要,只包含计数、method 和 item type,不包含 reasoning 文本、Secret、token 或 env value。真实 `agentMessage`、`commandExecution`、`command_output`、error、terminal 和生命周期事件必须继续保留。 + ## Failure Mapping Adapter 必须把 backend 错误映射为稳定 failureKind: diff --git a/src/backend/codex-stdio.ts b/src/backend/codex-stdio.ts index 706434c..7bdb053 100644 --- a/src/backend/codex-stdio.ts +++ b/src/backend/codex-stdio.ts @@ -62,6 +62,12 @@ interface CompletedAssistantMessage { text: string; } +interface SuppressedNotificationSummary { + total: number; + byMethod: Record; + byItemType: Record; +} + interface CodexStdioCloseInfo extends JsonRecord { code: number | null; signal: string | null; @@ -382,6 +388,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess } let assistantText = ""; const completedAssistantMessages: CompletedAssistantMessage[] = []; + const suppressedNotifications = createSuppressedNotificationSummary(); let threadId: string | undefined = options.threadId; let turnId: string | undefined; let terminal: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } | null = null; @@ -405,7 +412,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess terminalResolve(); }, positiveTimeout(options.timeoutMs)); const stopNotifications = session.addNotificationHandler((message) => { - const normalized = normalizeCodexNotification(message); + const normalized = normalizeCodexNotification(message, suppressedNotifications); if (normalized.threadId) threadId = normalized.threadId; if (normalized.turnId) turnId = normalized.turnId; emitEvents(normalized.events); @@ -480,6 +487,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server finished without terminal status" }; if (terminal.status !== "completed") emitEvents(await session.close()); if (completedAssistantMessages.length === 0) emitEvents(assistantMessageEventsForTurn(assistantText, terminal.status === "completed")); + emitEvents(suppressedNotificationEvents(suppressedNotifications)); emitEvent({ type: "terminal_status", payload: { terminalStatus: terminal.status, failureKind: terminal.failureKind, message: terminal.message } }); await liveEventWrite; return { terminalStatus: terminal.status, failureKind: terminal.failureKind, failureMessage: terminal.message, events: events.map((event) => ({ ...event, payload: redactJson(event.payload) })), ...(threadId ? { threadId } : {}), ...(turnId ? { turnId } : {}) }; @@ -541,7 +549,7 @@ function codexHomeReadiness(codexHome: string): BackendTurnResult | null { }; } -function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent[]; assistantDelta?: string; completedAssistantMessage?: CompletedAssistantMessage; threadId?: string; turnId?: string; terminal?: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } } { +function normalizeCodexNotification(message: JsonRecord, suppressed: SuppressedNotificationSummary): { events: BackendEvent[]; assistantDelta?: string; completedAssistantMessage?: CompletedAssistantMessage; threadId?: string; turnId?: string; terminal?: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } } { const method = typeof message.method === "string" ? message.method : "unknown"; const params = asRecordAt(message, "params"); if (method === "thread/started") { @@ -552,8 +560,16 @@ function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent const turnId = stringAt(asRecordAt(params, "turn"), "id"); return { events: [{ type: "backend_status", payload: { phase: method, turnId } }], ...(turnId ? { turnId } : {}) }; } + if (isSuppressedCodexStatusNotification(method)) { + recordSuppressedNotification(suppressed, method); + return { events: [] }; + } if (method === "item/agentMessage/delta") return { events: [], assistantDelta: typeof params.delta === "string" ? params.delta : "" }; if (method === "item/commandExecution/outputDelta") return { events: [{ type: "command_output", payload: commandOutputPayload("stdout", typeof params.delta === "string" ? params.delta : "") }] }; + if (method === "item/reasoning/textDelta") { + recordSuppressedNotification(suppressed, method, "reasoning"); + return { events: [] }; + } if ((method === "item/started" || method === "item/completed") && asRecordAt(params, "item").type === "agentMessage") { const item = asRecordAt(params, "item"); const itemId = stringAt(item, "id") ?? stringAt(params, "itemId"); @@ -564,7 +580,15 @@ function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent ...(completedAssistantMessage ? { completedAssistantMessage } : {}), }; } - if (method === "item/started" || method === "item/completed") return { events: [{ type: "tool_call", payload: toolCallPayload(method, asRecordAt(params, "item")) }] }; + if (method === "item/started" || method === "item/completed") { + const item = asRecordAt(params, "item"); + const itemType = typeof item.type === "string" ? item.type : "unknown"; + if (isSuppressedCodexItemType(itemType)) { + recordSuppressedNotification(suppressed, method, itemType); + return { events: [] }; + } + return { events: [{ type: "tool_call", payload: toolCallPayload(method, item) }] }; + } if (method === "error") { const error = asRecordAt(params, "error"); const messageText = typeof error.message === "string" ? error.message : "Codex app-server error"; @@ -588,6 +612,42 @@ function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent return { events: [{ type: "backend_status", payload: { phase: method } }] }; } +function createSuppressedNotificationSummary(): SuppressedNotificationSummary { + return { total: 0, byMethod: {}, byItemType: {} }; +} + +function recordSuppressedNotification(summary: SuppressedNotificationSummary, method: string, itemType?: string): void { + summary.total += 1; + summary.byMethod[method] = (summary.byMethod[method] ?? 0) + 1; + if (itemType) summary.byItemType[itemType] = (summary.byItemType[itemType] ?? 0) + 1; +} + +function suppressedNotificationEvents(summary: SuppressedNotificationSummary): BackendEvent[] { + if (summary.total === 0) return []; + return [{ + type: "backend_status", + payload: { + phase: "codex-app-server-notifications-suppressed", + total: summary.total, + byMethod: sortCountRecord(summary.byMethod), + byItemType: sortCountRecord(summary.byItemType), + valuesPrinted: false, + }, + }]; +} + +function sortCountRecord(input: Record): JsonRecord { + return Object.fromEntries(Object.entries(input).sort(([left], [right]) => left.localeCompare(right))) as JsonRecord; +} + +function isSuppressedCodexStatusNotification(method: string): boolean { + return method === "thread/tokenUsage/updated" || method === "account/rateLimits/updated" || method === "warning" || method === "configWarning"; +} + +function isSuppressedCodexItemType(itemType: string): boolean { + return itemType === "reasoning"; +} + function assistantMessageEventForCompleted(message: CompletedAssistantMessage, messageIndex: number): BackendEvent { return { type: "assistant_message", diff --git a/src/selftest/cases/30-codex-stdio.ts b/src/selftest/cases/30-codex-stdio.ts index 4965323..eef8c67 100644 --- a/src/selftest/cases/30-codex-stdio.ts +++ b/src/selftest/cases/30-codex-stdio.ts @@ -120,6 +120,25 @@ const selfTest: SelfTestCase = async (context) => { const liveResult = await livePromise; assert.equal(liveResult.terminalStatus, "completed", "slow live tool event turn should complete"); + const noisy = await createRunWithCommand(client, context, "hello noisy reasoning", "selftest-noisy-reasoning-events", 15_000); + const noisyResult = await runOnce({ managerUrl: server.baseUrl, runId: noisy.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "noisy-reasoning-events" }, oneShot: true }) as JsonRecord; + assert.equal(noisyResult.terminalStatus, "completed", "noisy reasoning turn should complete"); + const noisyEvents = await client.get(`/api/v1/runs/${noisy.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; + const noisyItems = noisyEvents.items ?? []; + assert.equal(noisyItems.some((event) => event.type === "backend_status" && eventPayload(event).phase === "item/reasoning/textDelta"), false, "reasoning textDelta must not be persisted as backend_status"); + assert.equal(noisyItems.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/tokenUsage/updated"), false, "token usage update must not be persisted as backend_status"); + assert.equal(noisyItems.some((event) => event.type === "backend_status" && eventPayload(event).phase === "account/rateLimits/updated"), false, "rate limit update must not be persisted as backend_status"); + assert.equal(noisyItems.some((event) => event.type === "backend_status" && eventPayload(event).phase === "warning"), false, "low value warnings must not be persisted as backend_status"); + assert.equal(noisyItems.some((event) => event.type === "backend_status" && eventPayload(event).phase === "configWarning"), false, "low value config warnings must not be persisted as backend_status"); + assert.equal(noisyItems.some((event) => event.type === "tool_call" && eventPayloadItem(event).type === "reasoning"), false, "reasoning items must not be persisted as tool_call"); + assert.ok(noisyItems.some((event) => event.type === "tool_call" && eventPayload(event).method === "item/started" && eventPayloadItem(event).type === "commandExecution"), "real commandExecution tool call should remain visible"); + assert.ok(noisyItems.some((event) => event.type === "assistant_message" && eventPayload(event).text === "noise filtered final"), "final assistant_message should remain visible"); + const suppression = noisyItems.find((event) => event.type === "backend_status" && eventPayload(event).phase === "codex-app-server-notifications-suppressed"); + assert.ok(suppression, "suppression summary must be emitted when noisy notifications are filtered"); + assert.equal(eventPayload(suppression ?? { payload: {} }).total, 8); + assert.equal(JSON.stringify(noisyEvents).includes("internal reasoning must not become durable trace text"), false, "suppression summary must not leak reasoning text"); + assertNoSecretLeak(noisyEvents); + await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "missing-turn-result", expectedStatus: "failed", expectedFailureKind: "backend-response-invalid" }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-401-rpc-error", expectedStatus: "failed", expectedFailureKind: "provider-auth-failed" }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-429-terminal", expectedStatus: "failed", expectedFailureKind: "provider-rate-limited" }); @@ -131,7 +150,7 @@ const selfTest: SelfTestCase = async (context) => { await runSecretFailureCase({ client, managerUrl: server.baseUrl, context }); await runSpawnFailureCase({ client, managerUrl: server.baseUrl, context }); - return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fallback", "codex-stdio-live-tool-events", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] }; + return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fallback", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] }; } finally { await new Promise((resolve) => server.server.close(() => resolve())); } @@ -168,6 +187,11 @@ function eventPayload(event: { payload: unknown }): JsonRecord { return typeof event.payload === "object" && event.payload !== null && !Array.isArray(event.payload) ? event.payload as JsonRecord : {}; } +function eventPayloadItem(event: { payload: unknown }): JsonRecord { + const item = eventPayload(event).item; + return typeof item === "object" && item !== null && !Array.isArray(item) ? item as JsonRecord : {}; +} + async function waitForEvent(client: ManagerClient, runId: string, predicate: (event: { type: string; payload: unknown }) => boolean, label: string): Promise { const deadline = Date.now() + 3_000; while (Date.now() < deadline) { diff --git a/src/selftest/fake-codex-app-server.ts b/src/selftest/fake-codex-app-server.ts index 3a994c9..e6d42f4 100644 --- a/src/selftest/fake-codex-app-server.ts +++ b/src/selftest/fake-codex-app-server.ts @@ -148,6 +148,25 @@ for await (const line of rl) { }, 50); continue; } + if (mode === "noisy-reasoning-events") { + turnCounter += 1; + const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" }; + notify("turn/started", { turn }); + notify("item/started", { item: { id: "reasoning_selftest", type: "reasoning", content: [] } }); + notify("item/reasoning/textDelta", { itemId: "reasoning_selftest", delta: "internal reasoning must not become durable trace text 1" }); + notify("item/reasoning/textDelta", { itemId: "reasoning_selftest", delta: "internal reasoning must not become durable trace text 2" }); + notify("thread/tokenUsage/updated", { usage: { inputTokens: 1, outputTokens: 2 } }); + notify("account/rateLimits/updated", { limit: "selftest" }); + notify("warning", { message: "low value warning should be summarized" }); + notify("configWarning", { message: "low value config warning should be summarized" }); + notify("item/completed", { item: { id: "reasoning_selftest", type: "reasoning", content: ["internal reasoning must not become durable trace text 3"] } }); + notify("item/started", { item: { id: "tool_after_noise", type: "commandExecution", command: "echo after-noise" } }); + notify("item/completed", { item: { id: "tool_after_noise", type: "commandExecution", command: "echo after-noise", status: "completed" } }); + notify("item/completed", { item: { id: "msg_after_noise", type: "agentMessage", text: "noise filtered final" } }); + notify("turn/completed", { turn }); + respond(message.id, { turn }); + continue; + } turnCounter += 1; const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" }; notify("turn/started", { turn });