fix: 保留长任务过程 trace 事件

This commit is contained in:
Codex
2026-06-02 21:17:56 +08:00
parent 3018b8a937
commit ce031238f1
4 changed files with 125 additions and 24 deletions
+73 -6
View File
@@ -14,6 +14,8 @@ const defaultCodexArgs = ["app-server", "--listen", "stdio://"];
const stderrBufferBytes = 64_000;
const stderrEventChars = 4_000;
const requestTimeoutCapMs = 30_000;
const assistantDeltaProgressMinChars = 500;
const assistantDeltaProgressLimitChars = 1_200;
const childEnvSummaryKeys = [
"CODEX_HOME",
@@ -73,12 +75,21 @@ interface CompletedAssistantMessage {
text: string;
}
interface AssistantDeltaProgressItem {
itemId: string | null;
text: string;
emittedChars: number;
flushed: boolean;
}
interface SuppressedNotificationSummary {
total: number;
byMethod: Record<string, number>;
byItemType: Record<string, number>;
}
type AssistantDeltaProgressState = Map<string, AssistantDeltaProgressItem>;
interface CodexStdioCloseInfo extends JsonRecord {
code: number | null;
signal: string | null;
@@ -398,6 +409,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
return { terminalStatus: cancelled.status, failureKind: cancelled.failureKind, failureMessage: cancelled.message, events: events.map((event) => ({ ...event, payload: redactJson(event.payload) })) };
}
let assistantText = "";
const assistantDeltaProgress = createAssistantDeltaProgressState();
const completedAssistantMessages: CompletedAssistantMessage[] = [];
const suppressedNotifications = createSuppressedNotificationSummary();
let threadId: string | undefined = options.threadId;
@@ -428,7 +440,11 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
if (normalized.threadId) threadId = normalized.threadId;
if (normalized.turnId) turnId = normalized.turnId;
emitEvents(normalized.events);
if (normalized.assistantDelta) assistantText += normalized.assistantDelta;
if (normalized.assistantDelta) {
assistantText += normalized.assistantDelta.text;
const progress = recordAssistantDeltaProgress(assistantDeltaProgress, normalized.assistantDelta);
if (progress) emitEvent(progress);
}
if (normalized.completedAssistantMessage) {
completedAssistantMessages.push(normalized.completedAssistantMessage);
emitEvent(assistantMessageEventForCompleted(normalized.completedAssistantMessage, completedAssistantMessages.length));
@@ -504,6 +520,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
}
if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server finished without terminal status" };
if (terminal.status !== "completed") emitEvents(await session.close());
emitEvents(flushAssistantDeltaProgress(assistantDeltaProgress));
if (completedAssistantMessages.length === 0) emitEvents(assistantMessageEventsForTurn(assistantText, terminal.status === "completed"));
emitEvents(suppressedNotificationEvents(suppressedNotifications));
emitEvent({ type: "terminal_status", payload: { terminalStatus: terminal.status, failureKind: terminal.failureKind, message: terminal.message } });
@@ -567,7 +584,7 @@ function codexHomeReadiness(codexHome: string): BackendTurnResult | null {
};
}
function normalizeCodexNotification(message: JsonRecord, suppressed: SuppressedNotificationSummary): { events: BackendEvent[]; assistantDelta?: string; completedAssistantMessage?: CompletedAssistantMessage; threadId?: string; turnId?: string; terminal?: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } } {
function normalizeCodexNotification(message: JsonRecord, suppressed: SuppressedNotificationSummary): { events: BackendEvent[]; assistantDelta?: { itemId: string | null; text: string }; completedAssistantMessage?: CompletedAssistantMessage; threadId?: string; turnId?: string; terminal?: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } } {
const method = typeof message.method === "string" ? message.method : "unknown";
const params = asRecordAt(message, "params");
if (method === "thread/started") {
@@ -582,7 +599,7 @@ function normalizeCodexNotification(message: JsonRecord, suppressed: SuppressedN
recordSuppressedNotification(suppressed, method);
return { events: [] };
}
if (method === "item/agentMessage/delta") return { events: [], assistantDelta: typeof params.delta === "string" ? params.delta : "" };
if (method === "item/agentMessage/delta") return { events: [], assistantDelta: { itemId: stringAt(params, "itemId"), text: typeof params.delta === "string" ? params.delta : "" } };
if (method === "item/commandExecution/outputDelta") return { events: [{ type: "command_output", payload: commandOutputPayload("stdout", typeof params.delta === "string" ? params.delta : "") }] };
if (method === "item/reasoning/textDelta") {
recordSuppressedNotification(suppressed, method, "reasoning");
@@ -601,7 +618,7 @@ function normalizeCodexNotification(message: JsonRecord, suppressed: SuppressedN
if (method === "item/started" || method === "item/completed") {
const item = asRecordAt(params, "item");
const itemType = typeof item.type === "string" ? item.type : "unknown";
if (itemType !== "commandExecution" || isSuppressedCodexItemType(itemType)) {
if (!isVisibleCodexToolItemType(itemType)) {
recordSuppressedNotification(suppressed, method, itemType);
return { events: [] };
}
@@ -664,8 +681,8 @@ function isSuppressedCodexStatusNotification(method: string): boolean {
return method === "thread/tokenUsage/updated" || method === "account/rateLimits/updated" || method === "warning" || method === "configWarning";
}
function isSuppressedCodexItemType(itemType: string): boolean {
return itemType === "reasoning";
function isVisibleCodexToolItemType(itemType: string): boolean {
return itemType === "commandExecution" || itemType === "webSearch";
}
function assistantMessageEventForCompleted(message: CompletedAssistantMessage, messageIndex: number): BackendEvent {
@@ -699,6 +716,56 @@ function assistantMessageEventsForTurn(assistantDeltaText: string, completed: bo
}];
}
function createAssistantDeltaProgressState(): AssistantDeltaProgressState {
return new Map();
}
function recordAssistantDeltaProgress(state: AssistantDeltaProgressState, delta: { itemId: string | null; text: string }): BackendEvent | null {
if (!delta.text) return null;
const key = delta.itemId ?? "default";
const current = state.get(key) ?? { itemId: delta.itemId, text: "", emittedChars: 0, flushed: false };
current.text += delta.text;
current.flushed = false;
state.set(key, current);
if (current.text.length - current.emittedChars < assistantDeltaProgressMinChars) return null;
current.emittedChars = current.text.length;
return assistantDeltaProgressEvent(current, false);
}
function flushAssistantDeltaProgress(state: AssistantDeltaProgressState): BackendEvent[] {
const events: BackendEvent[] = [];
for (const item of state.values()) {
if (item.flushed || item.text.trim().length === 0 || item.text.length === item.emittedChars) continue;
item.emittedChars = item.text.length;
item.flushed = true;
events.push(assistantDeltaProgressEvent(item, true));
}
return events;
}
function assistantDeltaProgressEvent(item: AssistantDeltaProgressItem, flush: boolean): BackendEvent {
const summary = boundedTextSummary(item.text.trim(), { limitChars: assistantDeltaProgressLimitChars });
return {
type: "assistant_message",
payload: {
text: summary.text,
itemId: item.itemId,
source: "agent-message-delta-progress",
messageIndex: null,
messageCount: null,
replyAuthority: false,
final: false,
progress: true,
progressFlush: flush,
textBytes: summary.textBytes,
textTruncated: summary.textTruncated,
outputBytes: summary.outputBytes,
outputTruncated: summary.outputTruncated,
valuesPrinted: false,
},
};
}
function terminalStatusFromValue(value: unknown): TerminalStatus {
if (value === "completed") return "completed";
if (value === "cancelled" || value === "canceled" || value === "interrupted") return "cancelled";