fix: 延后 codex turn completion 事件 (#233)
This commit is contained in:
@@ -457,6 +457,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
|
|||||||
let assistantText = "";
|
let assistantText = "";
|
||||||
const assistantDeltaProgress = createAssistantDeltaProgressState();
|
const assistantDeltaProgress = createAssistantDeltaProgressState();
|
||||||
const completedAssistantMessages: CompletedAssistantMessage[] = [];
|
const completedAssistantMessages: CompletedAssistantMessage[] = [];
|
||||||
|
const deferredTerminalEvents: BackendEvent[] = [];
|
||||||
const suppressedNotifications = createSuppressedNotificationSummary();
|
const suppressedNotifications = createSuppressedNotificationSummary();
|
||||||
let waitingFor = "codex-app-server";
|
let waitingFor = "codex-app-server";
|
||||||
let lastNotificationMethod: string | null = null;
|
let lastNotificationMethod: string | null = null;
|
||||||
@@ -672,6 +673,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
|
|||||||
completedAssistantMessages.push(normalized.completedAssistantMessage);
|
completedAssistantMessages.push(normalized.completedAssistantMessage);
|
||||||
emitEvent(assistantMessageEventForCompleted(normalized.completedAssistantMessage, completedAssistantMessages.length));
|
emitEvent(assistantMessageEventForCompleted(normalized.completedAssistantMessage, completedAssistantMessages.length));
|
||||||
}
|
}
|
||||||
|
if (normalized.terminalEvents) deferredTerminalEvents.push(...normalized.terminalEvents);
|
||||||
if (normalized.terminal && !terminal) {
|
if (normalized.terminal && !terminal) {
|
||||||
terminal = normalized.terminal;
|
terminal = normalized.terminal;
|
||||||
terminalResolve();
|
terminalResolve();
|
||||||
@@ -798,6 +800,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
|
|||||||
if (finalAssistant) emitEvent(assistantFinalResponseEvent(finalAssistant));
|
if (finalAssistant) emitEvent(assistantFinalResponseEvent(finalAssistant));
|
||||||
else if (completedAssistantMessages.length === 0) emitEvents(assistantMessageEventsForTurn(assistantText, false));
|
else if (completedAssistantMessages.length === 0) emitEvents(assistantMessageEventsForTurn(assistantText, false));
|
||||||
emitEvents(suppressedNotificationEvents(suppressedNotifications));
|
emitEvents(suppressedNotificationEvents(suppressedNotifications));
|
||||||
|
emitEvents(deferredTerminalEvents);
|
||||||
emitEvent({ type: "terminal_status", payload: { terminalStatus: terminal.status, failureKind: terminal.failureKind, message: terminal.message } });
|
emitEvent({ type: "terminal_status", payload: { terminalStatus: terminal.status, failureKind: terminal.failureKind, message: terminal.message } });
|
||||||
await liveEventWrite;
|
await liveEventWrite;
|
||||||
return { terminalStatus: terminal.status, failureKind: terminal.failureKind, failureMessage: terminal.message, events: events.map((event) => ({ ...event, payload: redactJson(event.payload) })), ...(threadId ? { threadId } : {}), ...(turnId ? { turnId } : {}) };
|
return { terminalStatus: terminal.status, failureKind: terminal.failureKind, failureMessage: terminal.message, events: events.map((event) => ({ ...event, payload: redactJson(event.payload) })), ...(threadId ? { threadId } : {}), ...(turnId ? { turnId } : {}) };
|
||||||
@@ -943,7 +946,7 @@ function escapeRegExp(value: string): string {
|
|||||||
return value.replace(/[.*+?^${}()|[\]\\]/gu, "\\$&");
|
return value.replace(/[.*+?^${}()|[\]\\]/gu, "\\$&");
|
||||||
}
|
}
|
||||||
|
|
||||||
function normalizeCodexNotification(message: JsonRecord, suppressed: SuppressedNotificationSummary): { events: BackendEvent[]; assistantDelta?: { itemId: string | null; text: string }; completedAssistantMessage?: CompletedAssistantMessage; threadId?: string; turnId?: string; terminal?: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } } {
|
function normalizeCodexNotification(message: JsonRecord, suppressed: SuppressedNotificationSummary): { events: BackendEvent[]; terminalEvents?: BackendEvent[]; assistantDelta?: { itemId: string | null; text: string }; completedAssistantMessage?: CompletedAssistantMessage; threadId?: string; turnId?: string; terminal?: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } } {
|
||||||
const method = typeof message.method === "string" ? message.method : "unknown";
|
const method = typeof message.method === "string" ? message.method : "unknown";
|
||||||
const params = asRecordAt(message, "params");
|
const params = asRecordAt(message, "params");
|
||||||
if (method === "thread/started") {
|
if (method === "thread/started") {
|
||||||
@@ -999,9 +1002,9 @@ function normalizeCodexNotification(message: JsonRecord, suppressed: SuppressedN
|
|||||||
const error = asRecordAt(turn, "error");
|
const error = asRecordAt(turn, "error");
|
||||||
const messageText = typeof error.message === "string" ? redactText(error.message) : null;
|
const messageText = typeof error.message === "string" ? redactText(error.message) : null;
|
||||||
const failureKind = status === "completed" ? null : status === "cancelled" ? "cancelled" : classifyCodexErrorRecord(Object.keys(error).length > 0 ? error : { message: turn.status }, "backend-failed");
|
const failureKind = status === "completed" ? null : status === "cancelled" ? "cancelled" : classifyCodexErrorRecord(Object.keys(error).length > 0 ? error : { message: turn.status }, "backend-failed");
|
||||||
const events: BackendEvent[] = [{ type: "backend_status", payload: { phase: method, terminalStatus: status } }];
|
const terminalEvents: BackendEvent[] = [{ type: "backend_status", payload: { phase: method, terminalStatus: status } }];
|
||||||
if (failureKind) events.push({ type: "error", payload: { failureKind, error: redactJson(error), phase: method } });
|
if (failureKind) terminalEvents.push({ type: "error", payload: { failureKind, error: redactJson(error), phase: method } });
|
||||||
return { events, terminal: { status, failureKind, message: messageText } };
|
return { events: [], terminalEvents, terminal: { status, failureKind, message: messageText } };
|
||||||
}
|
}
|
||||||
return { events: [{ type: "backend_status", payload: { phase: method } }] };
|
return { events: [{ type: "backend_status", payload: { phase: method } }] };
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -118,6 +118,15 @@ const selfTest: SelfTestCase = async (context) => {
|
|||||||
assert.ok(finalMessageIndex >= 0 && finalMessageIndex < turnCompletedIndex, "final agentMessage should be emitted before turn/completed instead of being delayed to final response");
|
assert.ok(finalMessageIndex >= 0 && finalMessageIndex < turnCompletedIndex, "final agentMessage should be emitted before turn/completed instead of being delayed to final response");
|
||||||
assert.equal(finalMessageItems.some((event) => event.type === "backend_status" && String(eventPayload(event).phase ?? "").startsWith("item/agentMessage:")), false, "agentMessage lifecycle must not be persisted as backend_status noise");
|
assert.equal(finalMessageItems.some((event) => event.type === "backend_status" && String(eventPayload(event).phase ?? "").startsWith("item/agentMessage:")), false, "agentMessage lifecycle must not be persisted as backend_status noise");
|
||||||
|
|
||||||
|
const terminalBeforeFinal = await createRunWithCommand(client, context, "hello terminal before final", "selftest-terminal-before-final-agent-message", 15_000);
|
||||||
|
const terminalBeforeFinalResult = await runOnce({ managerUrl: server.baseUrl, runId: terminalBeforeFinal.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "multi-agent-message-terminal-before-final" }, oneShot: true });
|
||||||
|
assert.equal(terminalBeforeFinalResult.terminalStatus, "completed", "terminal-before-final agentMessage run should complete");
|
||||||
|
const terminalBeforeFinalEvents = await client.get(`/api/v1/runs/${terminalBeforeFinal.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
|
||||||
|
const terminalBeforeFinalItems = terminalBeforeFinalEvents.items ?? [];
|
||||||
|
const delayedFinalIndex = terminalBeforeFinalItems.findIndex((event) => event.type === "assistant_message" && eventPayload(event).itemId === "msg_late_final");
|
||||||
|
const delayedTurnCompletedIndex = terminalBeforeFinalItems.findIndex((event) => event.type === "backend_status" && eventPayload(event).phase === "turn/completed");
|
||||||
|
assert.ok(delayedFinalIndex >= 0 && delayedTurnCompletedIndex > delayedFinalIndex, "turn/completed must be emitted after final assistant even when Codex sends terminal notification first");
|
||||||
|
|
||||||
const webSearch = await createRunWithCommand(client, context, "hello web search progress", "selftest-web-search-progress", 15_000);
|
const webSearch = await createRunWithCommand(client, context, "hello web search progress", "selftest-web-search-progress", 15_000);
|
||||||
const webSearchPromise = runOnce({ managerUrl: server.baseUrl, runId: webSearch.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "web-search-progress" }, oneShot: true }) as Promise<JsonRecord>;
|
const webSearchPromise = runOnce({ managerUrl: server.baseUrl, runId: webSearch.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "web-search-progress" }, oneShot: true }) as Promise<JsonRecord>;
|
||||||
await waitForEvent(client, webSearch.runId, (event) => event.type === "tool_call" && eventPayload(event).type === "webSearch" && eventPayload(event).method === "item/started", "webSearch tool_call start event");
|
await waitForEvent(client, webSearch.runId, (event) => event.type === "tool_call" && eventPayload(event).type === "webSearch" && eventPayload(event).method === "item/started", "webSearch tool_call start event");
|
||||||
|
|||||||
@@ -223,6 +223,17 @@ for await (const line of rl) {
|
|||||||
respond(message.id, { turn });
|
respond(message.id, { turn });
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if (mode === "multi-agent-message-terminal-before-final") {
|
||||||
|
turnCounter += 1;
|
||||||
|
const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" };
|
||||||
|
notify("turn/started", { turn });
|
||||||
|
notify("item/agentMessage/delta", { itemId: "msg_late_progress", delta: "Progress before delayed final. " });
|
||||||
|
notify("turn/completed", { turn });
|
||||||
|
notify("item/completed", { item: { id: "msg_late_progress", type: "agentMessage", text: "Progress before delayed final." } });
|
||||||
|
notify("item/completed", { item: { id: "msg_late_final", type: "agentMessage", text: "Delayed final answer." } });
|
||||||
|
respond(message.id, { turn });
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (mode === "web-search-progress") {
|
if (mode === "web-search-progress") {
|
||||||
turnCounter += 1;
|
turnCounter += 1;
|
||||||
const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" };
|
const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" };
|
||||||
|
|||||||
Reference in New Issue
Block a user