fix: 实时上报 agentMessage

This commit is contained in:
Codex
2026-06-02 08:03:48 +08:00
parent 425619d5ad
commit 3a8a3b404d
3 changed files with 48 additions and 25 deletions
+35 -24
View File
@@ -408,9 +408,12 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
const normalized = normalizeCodexNotification(message);
if (normalized.threadId) threadId = normalized.threadId;
if (normalized.turnId) turnId = normalized.turnId;
if (normalized.assistantDelta) assistantText += normalized.assistantDelta;
if (normalized.completedAssistantMessage) completedAssistantMessages.push(normalized.completedAssistantMessage);
emitEvents(normalized.events);
if (normalized.assistantDelta) assistantText += normalized.assistantDelta;
if (normalized.completedAssistantMessage) {
completedAssistantMessages.push(normalized.completedAssistantMessage);
emitEvent(assistantMessageEventForCompleted(normalized.completedAssistantMessage, completedAssistantMessages.length));
}
if (normalized.terminal && !terminal) {
terminal = normalized.terminal;
terminalResolve();
@@ -476,7 +479,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
}
if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server finished without terminal status" };
if (terminal.status !== "completed") emitEvents(await session.close());
emitEvents(assistantMessageEventsForTurn(completedAssistantMessages, assistantText, terminal.status === "completed"));
if (completedAssistantMessages.length === 0) emitEvents(assistantMessageEventsForTurn(assistantText, terminal.status === "completed"));
emitEvent({ type: "terminal_status", payload: { terminalStatus: terminal.status, failureKind: terminal.failureKind, message: terminal.message } });
await liveEventWrite;
return { terminalStatus: terminal.status, failureKind: terminal.failureKind, failureMessage: terminal.message, events: events.map((event) => ({ ...event, payload: redactJson(event.payload) })), ...(threadId ? { threadId } : {}), ...(turnId ? { turnId } : {}) };
@@ -585,27 +588,35 @@ function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent
return { events: [{ type: "backend_status", payload: { phase: method } }] };
}
function assistantMessageEventsForTurn(completedMessages: CompletedAssistantMessage[], assistantDeltaText: string, completed: boolean): BackendEvent[] {
const messages = completedMessages.length > 0
? completedMessages.map((message) => ({ ...message, source: "completed-agent-message" }))
: assistantDeltaText.trim().length > 0
? [{ itemId: null, text: assistantDeltaText, source: "agent-message-delta-fallback" }]
: [];
return messages.map((message, index) => {
const replyAuthority = completed && index === messages.length - 1;
return {
type: "assistant_message",
payload: {
text: message.text,
itemId: message.itemId,
source: message.source,
messageIndex: index + 1,
messageCount: messages.length,
replyAuthority,
final: replyAuthority,
},
};
});
function assistantMessageEventForCompleted(message: CompletedAssistantMessage, messageIndex: number): BackendEvent {
return {
type: "assistant_message",
payload: {
text: message.text,
itemId: message.itemId,
source: "completed-agent-message",
messageIndex,
messageCount: null,
replyAuthority: false,
final: false,
},
};
}
function assistantMessageEventsForTurn(assistantDeltaText: string, completed: boolean): BackendEvent[] {
if (assistantDeltaText.trim().length === 0) return [];
return [{
type: "assistant_message",
payload: {
text: assistantDeltaText,
itemId: null,
source: "agent-message-delta-fallback",
messageIndex: 1,
messageCount: 1,
replyAuthority: completed,
final: completed,
},
}];
}
function terminalStatusFromValue(value: unknown): TerminalStatus {
+6
View File
@@ -97,6 +97,12 @@ function assistantReply(events: RunEvent[]): string {
.filter((text) => text.length > 0);
const latestAuthoritative = authoritative.at(-1);
if (latestAuthoritative) return latestAuthoritative;
const completedAgentMessages = assistantEvents
.filter((event) => event.payload.source === "completed-agent-message")
.map((event) => textPayload(event.payload))
.filter((text) => text.length > 0);
const latestCompletedAgentMessage = completedAgentMessages.at(-1);
if (latestCompletedAgentMessage) return latestCompletedAgentMessage;
return assistantEvents.map((event) => textPayload(event.payload)).filter((text) => text.length > 0).join("");
}
+7 -1
View File
@@ -85,7 +85,13 @@ const selfTest: SelfTestCase = async (context) => {
assert.equal(eventPayload(assistantEvents[0] ?? { payload: {} }).replyAuthority, false);
assert.equal(eventPayload(assistantEvents[1] ?? { payload: {} }).text, "Final answer only.");
assert.equal(eventPayload(assistantEvents[1] ?? { payload: {} }).itemId, "msg_final");
assert.equal(eventPayload(assistantEvents[1] ?? { payload: {} }).replyAuthority, true);
assert.equal(eventPayload(assistantEvents[1] ?? { payload: {} }).replyAuthority, false);
const finalMessageItems = finalMessageEvents.items ?? [];
const progressMessageIndex = finalMessageItems.findIndex((event) => event.type === "assistant_message" && eventPayload(event).itemId === "msg_progress");
const finalMessageIndex = finalMessageItems.findIndex((event) => event.type === "assistant_message" && eventPayload(event).itemId === "msg_final");
const turnCompletedIndex = finalMessageItems.findIndex((event) => event.type === "backend_status" && eventPayload(event).phase === "turn/completed");
assert.ok(progressMessageIndex >= 0 && progressMessageIndex < turnCompletedIndex, "progress agentMessage should be emitted before turn/completed instead of being delayed to final response");
assert.ok(finalMessageIndex >= 0 && finalMessageIndex < turnCompletedIndex, "final agentMessage should be emitted before turn/completed instead of being delayed to final response");
const staleResume = await createStaleThreadRun(client, context);
const staleResumeResult = await runOnce({