fix: drain codex terminal after quiet notifications (#235)
This commit is contained in:
@@ -18,6 +18,7 @@ const requestTimeoutCapMs = 30_000;
|
|||||||
const assistantDeltaProgressMinChars = 500;
|
const assistantDeltaProgressMinChars = 500;
|
||||||
const assistantDeltaProgressLimitChars = 1_200;
|
const assistantDeltaProgressLimitChars = 1_200;
|
||||||
const defaultIdleWarningMs = 8_000;
|
const defaultIdleWarningMs = 8_000;
|
||||||
|
const defaultTerminalNotificationDrainMs = 2_000;
|
||||||
|
|
||||||
const childEnvSummaryKeys = [
|
const childEnvSummaryKeys = [
|
||||||
"CODEX_HOME",
|
"CODEX_HOME",
|
||||||
@@ -472,15 +473,26 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
|
|||||||
const terminalPromise = new Promise<void>((resolve) => { terminalResolve = resolve; });
|
const terminalPromise = new Promise<void>((resolve) => { terminalResolve = resolve; });
|
||||||
const terminalNotificationDrainMs = codexTerminalNotificationDrainMs(env);
|
const terminalNotificationDrainMs = codexTerminalNotificationDrainMs(env);
|
||||||
let terminalNotificationDrainTimeout: NodeJS.Timeout | null = null;
|
let terminalNotificationDrainTimeout: NodeJS.Timeout | null = null;
|
||||||
|
let terminalNotificationDrainResolved = false;
|
||||||
|
const resolveTerminalNow = (): void => {
|
||||||
|
if (terminalNotificationDrainResolved) return;
|
||||||
|
terminalNotificationDrainResolved = true;
|
||||||
|
if (terminalNotificationDrainTimeout) {
|
||||||
|
clearTimeout(terminalNotificationDrainTimeout);
|
||||||
|
terminalNotificationDrainTimeout = null;
|
||||||
|
}
|
||||||
|
terminalResolve();
|
||||||
|
};
|
||||||
const resolveTerminalAfterNotificationDrain = (): void => {
|
const resolveTerminalAfterNotificationDrain = (): void => {
|
||||||
if (terminalNotificationDrainTimeout) clearTimeout(terminalNotificationDrainTimeout);
|
if (terminalNotificationDrainTimeout) clearTimeout(terminalNotificationDrainTimeout);
|
||||||
|
if (terminalNotificationDrainResolved) return;
|
||||||
if (terminalNotificationDrainMs <= 0) {
|
if (terminalNotificationDrainMs <= 0) {
|
||||||
terminalResolve();
|
resolveTerminalNow();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
terminalNotificationDrainTimeout = setTimeout(() => {
|
terminalNotificationDrainTimeout = setTimeout(() => {
|
||||||
terminalNotificationDrainTimeout = null;
|
terminalNotificationDrainTimeout = null;
|
||||||
terminalResolve();
|
resolveTerminalNow();
|
||||||
}, terminalNotificationDrainMs);
|
}, terminalNotificationDrainMs);
|
||||||
terminalNotificationDrainTimeout.unref?.();
|
terminalNotificationDrainTimeout.unref?.();
|
||||||
};
|
};
|
||||||
@@ -543,7 +555,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
|
|||||||
terminal = { status: "cancelled", failureKind: "cancelled", message: "cancel requested" };
|
terminal = { status: "cancelled", failureKind: "cancelled", message: "cancel requested" };
|
||||||
emitEvent({ type: "backend_status", payload: { phase: "turn-cancelled", failureKind: "cancelled" } });
|
emitEvent({ type: "backend_status", payload: { phase: "turn-cancelled", failureKind: "cancelled" } });
|
||||||
beginInterruptAndStop("cancel requested", "abort-signal");
|
beginInterruptAndStop("cancel requested", "abort-signal");
|
||||||
terminalResolve();
|
resolveTerminalNow();
|
||||||
};
|
};
|
||||||
options.abortSignal?.addEventListener("abort", abortTurn, { once: true });
|
options.abortSignal?.addEventListener("abort", abortTurn, { once: true });
|
||||||
const turnHardTimeoutMs = positiveTimeout(options.timeoutMs);
|
const turnHardTimeoutMs = positiveTimeout(options.timeoutMs);
|
||||||
@@ -577,7 +589,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
|
|||||||
emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:hard-timeout", timeoutMs: turnHardTimeoutMs, elapsedMs, idleMs: Math.max(0, Date.now() - lastActivityAt), waitingFor, lastNotificationMethod, threadId: threadId ?? null, turnId: turnId ?? null, retryable: false, retryAttempt: null, retryMaxAttempts: 0, retryExhausted: true, lastToolCall } });
|
emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:hard-timeout", timeoutMs: turnHardTimeoutMs, elapsedMs, idleMs: Math.max(0, Date.now() - lastActivityAt), waitingFor, lastNotificationMethod, threadId: threadId ?? null, turnId: turnId ?? null, retryable: false, retryAttempt: null, retryMaxAttempts: 0, retryExhausted: true, lastToolCall } });
|
||||||
emitCodexOtelSpan("codex_stdio.turn_hard_timeout", options, env, attrs, { status: "error", error: terminal.message });
|
emitCodexOtelSpan("codex_stdio.turn_hard_timeout", options, env, attrs, { status: "error", error: terminal.message });
|
||||||
beginInterruptAndStop("turn hard timeout", "turn:hard-timeout");
|
beginInterruptAndStop("turn hard timeout", "turn:hard-timeout");
|
||||||
terminalResolve();
|
resolveTerminalNow();
|
||||||
};
|
};
|
||||||
const scheduleTurnHardTimeout = (): void => {
|
const scheduleTurnHardTimeout = (): void => {
|
||||||
if (hardTimeout) clearTimeout(hardTimeout);
|
if (hardTimeout) clearTimeout(hardTimeout);
|
||||||
@@ -616,7 +628,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
|
|||||||
emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:missing-terminal-after-tool-timeout", timeoutMs: missingTerminalAfterToolTimeoutMs, retryable: false, retryAttempt: null, retryMaxAttempts: 0, retryExhausted: true, lastToolCall } });
|
emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:missing-terminal-after-tool-timeout", timeoutMs: missingTerminalAfterToolTimeoutMs, retryable: false, retryAttempt: null, retryMaxAttempts: 0, retryExhausted: true, lastToolCall } });
|
||||||
emitCodexOtelSpan("codex_stdio.missing_terminal_after_tool_timeout", options, env, attrs, { status: "error", error: terminal.message });
|
emitCodexOtelSpan("codex_stdio.missing_terminal_after_tool_timeout", options, env, attrs, { status: "error", error: terminal.message });
|
||||||
beginInterruptAndStop("missing terminal after tool timeout", "turn:missing-terminal-after-tool-timeout");
|
beginInterruptAndStop("missing terminal after tool timeout", "turn:missing-terminal-after-tool-timeout");
|
||||||
terminalResolve();
|
resolveTerminalNow();
|
||||||
};
|
};
|
||||||
const scheduleMissingTerminalAfterToolTimeout = (): void => {
|
const scheduleMissingTerminalAfterToolTimeout = (): void => {
|
||||||
clearMissingTerminalAfterToolTimeout();
|
clearMissingTerminalAfterToolTimeout();
|
||||||
@@ -646,7 +658,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
|
|||||||
emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:idle-timeout" } });
|
emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:idle-timeout" } });
|
||||||
emitCodexOtelSpan("codex_stdio.idle_timeout", options, env, { waitingFor, idleMs: Math.max(0, Date.now() - lastActivityAt), lastNotificationMethod, threadId: threadId ?? null, turnId: turnId ?? null, terminalStatus: terminal.status, failureKind: terminal.failureKind }, { status: "error", error: terminal.message });
|
emitCodexOtelSpan("codex_stdio.idle_timeout", options, env, { waitingFor, idleMs: Math.max(0, Date.now() - lastActivityAt), lastNotificationMethod, threadId: threadId ?? null, turnId: turnId ?? null, terminalStatus: terminal.status, failureKind: terminal.failureKind }, { status: "error", error: terminal.message });
|
||||||
beginInterruptAndStop("idle timeout", "turn:idle-timeout");
|
beginInterruptAndStop("idle timeout", "turn:idle-timeout");
|
||||||
terminalResolve();
|
resolveTerminalNow();
|
||||||
}, turnIdleTimeoutMs);
|
}, turnIdleTimeoutMs);
|
||||||
idleTimeout.unref?.();
|
idleTimeout.unref?.();
|
||||||
};
|
};
|
||||||
@@ -691,6 +703,8 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
|
|||||||
if (normalized.terminal && !terminal) {
|
if (normalized.terminal && !terminal) {
|
||||||
terminal = normalized.terminal;
|
terminal = normalized.terminal;
|
||||||
resolveTerminalAfterNotificationDrain();
|
resolveTerminalAfterNotificationDrain();
|
||||||
|
} else if (terminal && terminalNotificationDrainTimeout !== null) {
|
||||||
|
resolveTerminalAfterNotificationDrain();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
try {
|
try {
|
||||||
@@ -1504,7 +1518,7 @@ function codexMissingTerminalAfterToolTimeoutMs(env: NodeJS.ProcessEnv, turnTime
|
|||||||
function codexTerminalNotificationDrainMs(env: NodeJS.ProcessEnv): number {
|
function codexTerminalNotificationDrainMs(env: NodeJS.ProcessEnv): number {
|
||||||
const configured = Number(env.AGENTRUN_CODEX_TERMINAL_NOTIFICATION_DRAIN_MS);
|
const configured = Number(env.AGENTRUN_CODEX_TERMINAL_NOTIFICATION_DRAIN_MS);
|
||||||
if (Number.isFinite(configured) && configured >= 0) return Math.floor(configured);
|
if (Number.isFinite(configured) && configured >= 0) return Math.floor(configured);
|
||||||
return 250;
|
return defaultTerminalNotificationDrainMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
function emitCodexNotificationOtel(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, message: JsonRecord, state: JsonRecord): void {
|
function emitCodexNotificationOtel(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, message: JsonRecord, state: JsonRecord): void {
|
||||||
|
|||||||
@@ -119,7 +119,7 @@ const selfTest: SelfTestCase = async (context) => {
|
|||||||
assert.equal(finalMessageItems.some((event) => event.type === "backend_status" && String(eventPayload(event).phase ?? "").startsWith("item/agentMessage:")), false, "agentMessage lifecycle must not be persisted as backend_status noise");
|
assert.equal(finalMessageItems.some((event) => event.type === "backend_status" && String(eventPayload(event).phase ?? "").startsWith("item/agentMessage:")), false, "agentMessage lifecycle must not be persisted as backend_status noise");
|
||||||
|
|
||||||
const terminalBeforeFinal = await createRunWithCommand(client, context, "hello terminal before final", "selftest-terminal-before-final-agent-message", 15_000);
|
const terminalBeforeFinal = await createRunWithCommand(client, context, "hello terminal before final", "selftest-terminal-before-final-agent-message", 15_000);
|
||||||
const terminalBeforeFinalResult = await runOnce({ managerUrl: server.baseUrl, runId: terminalBeforeFinal.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "multi-agent-message-terminal-before-final" }, oneShot: true });
|
const terminalBeforeFinalResult = await runOnce({ managerUrl: server.baseUrl, runId: terminalBeforeFinal.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "multi-agent-message-terminal-before-final", AGENTRUN_CODEX_TERMINAL_NOTIFICATION_DRAIN_MS: "300" }, oneShot: true });
|
||||||
assert.equal(terminalBeforeFinalResult.terminalStatus, "completed", "terminal-before-final agentMessage run should complete");
|
assert.equal(terminalBeforeFinalResult.terminalStatus, "completed", "terminal-before-final agentMessage run should complete");
|
||||||
const terminalBeforeFinalEvents = await client.get(`/api/v1/runs/${terminalBeforeFinal.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
|
const terminalBeforeFinalEvents = await client.get(`/api/v1/runs/${terminalBeforeFinal.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
|
||||||
const terminalBeforeFinalItems = terminalBeforeFinalEvents.items ?? [];
|
const terminalBeforeFinalItems = terminalBeforeFinalEvents.items ?? [];
|
||||||
|
|||||||
@@ -232,8 +232,10 @@ for await (const line of rl) {
|
|||||||
notify("turn/completed", { turn });
|
notify("turn/completed", { turn });
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
notify("item/completed", { item: { id: "msg_late_progress", type: "agentMessage", text: "Progress before delayed final." } });
|
notify("item/completed", { item: { id: "msg_late_progress", type: "agentMessage", text: "Progress before delayed final." } });
|
||||||
|
}, 200).unref?.();
|
||||||
|
setTimeout(() => {
|
||||||
notify("item/completed", { item: { id: "msg_late_final", type: "agentMessage", text: "Delayed final answer." } });
|
notify("item/completed", { item: { id: "msg_late_final", type: "agentMessage", text: "Delayed final answer." } });
|
||||||
}, 20).unref?.();
|
}, 450).unref?.();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (mode === "web-search-progress") {
|
if (mode === "web-search-progress") {
|
||||||
|
|||||||
Reference in New Issue
Block a user