fix: 支持长工具调用 interrupt 与硬超时回收
This commit is contained in:
+91
-25
@@ -423,16 +423,75 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
|
|||||||
let client: CodexStdioClient | null = null;
|
let client: CodexStdioClient | null = null;
|
||||||
const requestTimeoutMs = Math.min(positiveTimeout(options.timeoutMs), requestTimeoutCapMs);
|
const requestTimeoutMs = Math.min(positiveTimeout(options.timeoutMs), requestTimeoutCapMs);
|
||||||
let stopActiveTurn: (() => void) | undefined;
|
let stopActiveTurn: (() => void) | undefined;
|
||||||
|
let activeTurnKey: string | null = null;
|
||||||
|
let interruptInFlight: Promise<void> | null = null;
|
||||||
|
let stopAfterInterrupt = false;
|
||||||
|
const controlRequestTimeoutMs = Math.min(requestTimeoutMs, 5_000);
|
||||||
|
const activeTurnControl = (activeThreadId: string, activeTurnId: string): CodexActiveTurnControl => ({
|
||||||
|
threadId: activeThreadId,
|
||||||
|
turnId: activeTurnId,
|
||||||
|
steer: async (prompt: string) => {
|
||||||
|
await client!.request("turn/steer", { threadId: activeThreadId, expectedTurnId: activeTurnId, input: textInput(prompt) }, requestTimeoutMs);
|
||||||
|
},
|
||||||
|
interrupt: async () => {
|
||||||
|
await client!.request("turn/interrupt", { threadId: activeThreadId, turnId: activeTurnId }, controlRequestTimeoutMs);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const exposeActiveTurn = (source: string): void => {
|
||||||
|
if (!client || !threadId || !turnId || !options.onActiveTurn) return;
|
||||||
|
const key = `${threadId}:${turnId}`;
|
||||||
|
if (activeTurnKey === key) return;
|
||||||
|
stopActiveTurn?.();
|
||||||
|
activeTurnKey = key;
|
||||||
|
emitEvent({ type: "backend_status", payload: { phase: "active-turn-control-ready", source, threadId, turnId } });
|
||||||
|
const maybeStop = options.onActiveTurn(activeTurnControl(threadId, turnId));
|
||||||
|
stopActiveTurn = typeof maybeStop === "function" ? maybeStop : undefined;
|
||||||
|
};
|
||||||
|
const requestInterrupt = (reason: string, triggerPhase: string): Promise<void> => {
|
||||||
|
const activeClient = client;
|
||||||
|
const activeThreadId = threadId;
|
||||||
|
const activeTurnId = turnId;
|
||||||
|
emitEvent({ type: "backend_status", payload: { phase: "turn-interrupt-requested", reason, triggerPhase, threadId: activeThreadId ?? null, turnId: activeTurnId ?? null } });
|
||||||
|
if (!activeClient || !activeThreadId || !activeTurnId) {
|
||||||
|
emitEvent({ type: "backend_status", payload: { phase: "turn-interrupt-unavailable", reason, triggerPhase, hasClient: Boolean(activeClient), hasThreadId: Boolean(activeThreadId), hasTurnId: Boolean(activeTurnId) } });
|
||||||
|
activeClient?.stop();
|
||||||
|
return Promise.resolve();
|
||||||
|
}
|
||||||
|
return activeClient.request("turn/interrupt", { threadId: activeThreadId, turnId: activeTurnId }, controlRequestTimeoutMs)
|
||||||
|
.then(() => {
|
||||||
|
emitEvent({ type: "backend_status", payload: { phase: "turn/interrupt:completed", reason, triggerPhase, threadId: activeThreadId, turnId: activeTurnId } });
|
||||||
|
})
|
||||||
|
.catch((error) => {
|
||||||
|
const failure = normalizeFailure(error);
|
||||||
|
emitEvent({ type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase: "turn/interrupt:failed", triggerPhase, details: failure.details } });
|
||||||
|
})
|
||||||
|
.finally(() => {
|
||||||
|
if (stopAfterInterrupt && !activeClient.isClosed) activeClient.stop();
|
||||||
|
});
|
||||||
|
};
|
||||||
|
const beginInterruptAndStop = (reason: string, triggerPhase: string): void => {
|
||||||
|
stopAfterInterrupt = true;
|
||||||
|
if (!interruptInFlight) interruptInFlight = requestInterrupt(reason, triggerPhase);
|
||||||
|
else interruptInFlight = interruptInFlight.then(() => requestInterrupt(reason, triggerPhase));
|
||||||
|
};
|
||||||
const abortTurn = (): void => {
|
const abortTurn = (): void => {
|
||||||
if (terminal) return;
|
if (terminal) return;
|
||||||
terminal = { status: "cancelled", failureKind: "cancelled", message: "cancel requested" };
|
terminal = { status: "cancelled", failureKind: "cancelled", message: "cancel requested" };
|
||||||
emitEvent({ type: "backend_status", payload: { phase: "turn-cancelled", failureKind: "cancelled" } });
|
emitEvent({ type: "backend_status", payload: { phase: "turn-cancelled", failureKind: "cancelled" } });
|
||||||
client?.stop();
|
beginInterruptAndStop("cancel requested", "abort-signal");
|
||||||
terminalResolve();
|
terminalResolve();
|
||||||
};
|
};
|
||||||
options.abortSignal?.addEventListener("abort", abortTurn, { once: true });
|
options.abortSignal?.addEventListener("abort", abortTurn, { once: true });
|
||||||
const turnIdleTimeoutMs = positiveTimeout(options.timeoutMs);
|
const turnIdleTimeoutMs = positiveTimeout(options.timeoutMs);
|
||||||
|
let hardTimeout: NodeJS.Timeout | null = null;
|
||||||
let idleTimeout: NodeJS.Timeout | null = null;
|
let idleTimeout: NodeJS.Timeout | null = null;
|
||||||
|
hardTimeout = setTimeout(() => {
|
||||||
|
if (terminal) return;
|
||||||
|
terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn hard timed out after ${turnIdleTimeoutMs}ms` };
|
||||||
|
emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:hard-timeout" } });
|
||||||
|
beginInterruptAndStop("hard timeout", "turn:hard-timeout");
|
||||||
|
terminalResolve();
|
||||||
|
}, turnIdleTimeoutMs);
|
||||||
const refreshTurnActivity = (): void => {
|
const refreshTurnActivity = (): void => {
|
||||||
if (terminal) return;
|
if (terminal) return;
|
||||||
if (idleTimeout) clearTimeout(idleTimeout);
|
if (idleTimeout) clearTimeout(idleTimeout);
|
||||||
@@ -440,7 +499,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
|
|||||||
if (terminal) return;
|
if (terminal) return;
|
||||||
terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn idle timed out after ${turnIdleTimeoutMs}ms without activity` };
|
terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn idle timed out after ${turnIdleTimeoutMs}ms without activity` };
|
||||||
emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:idle-timeout" } });
|
emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:idle-timeout" } });
|
||||||
client?.stop();
|
beginInterruptAndStop("idle timeout", "turn:idle-timeout");
|
||||||
terminalResolve();
|
terminalResolve();
|
||||||
}, turnIdleTimeoutMs);
|
}, turnIdleTimeoutMs);
|
||||||
};
|
};
|
||||||
@@ -449,12 +508,18 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
|
|||||||
clearTimeout(idleTimeout);
|
clearTimeout(idleTimeout);
|
||||||
idleTimeout = null;
|
idleTimeout = null;
|
||||||
};
|
};
|
||||||
|
const stopTurnHardTimeout = (): void => {
|
||||||
|
if (!hardTimeout) return;
|
||||||
|
clearTimeout(hardTimeout);
|
||||||
|
hardTimeout = null;
|
||||||
|
};
|
||||||
refreshTurnActivity();
|
refreshTurnActivity();
|
||||||
const stopNotifications = session.addNotificationHandler((message) => {
|
const stopNotifications = session.addNotificationHandler((message) => {
|
||||||
refreshTurnActivity();
|
refreshTurnActivity();
|
||||||
const normalized = normalizeCodexNotification(message, suppressedNotifications);
|
const normalized = normalizeCodexNotification(message, suppressedNotifications);
|
||||||
if (normalized.threadId) threadId = normalized.threadId;
|
if (normalized.threadId) threadId = normalized.threadId;
|
||||||
if (normalized.turnId) turnId = normalized.turnId;
|
if (normalized.turnId) turnId = normalized.turnId;
|
||||||
|
exposeActiveTurn(normalized.turnId ? "turn-notification" : "notification");
|
||||||
emitEvents(normalized.events);
|
emitEvents(normalized.events);
|
||||||
if (normalized.assistantDelta) {
|
if (normalized.assistantDelta) {
|
||||||
assistantText += normalized.assistantDelta.text;
|
assistantText += normalized.assistantDelta.text;
|
||||||
@@ -511,30 +576,28 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
|
|||||||
|
|
||||||
const promptInjection = initialPromptInjection(options.initialPrompt, willResumeThread);
|
const promptInjection = initialPromptInjection(options.initialPrompt, willResumeThread);
|
||||||
emitEvent({ type: "backend_status", payload: { phase: "initial-prompt-assembly", initialPromptInjected: promptInjection.injected, reason: promptInjection.reason, initialPrompt: options.initialPrompt?.summary ?? { available: false, valuesPrinted: false }, valuesPrinted: false } });
|
emitEvent({ type: "backend_status", payload: { phase: "initial-prompt-assembly", initialPromptInjected: promptInjection.injected, reason: promptInjection.reason, initialPrompt: options.initialPrompt?.summary ?? { available: false, valuesPrinted: false }, valuesPrinted: false } });
|
||||||
const turnResponse = requireResponseRecord(await client.request("turn/start", withOptionalModel({ threadId, input: textInputForUserMessage(options.prompt, promptInjection), cwd: options.cwd, approvalPolicy: options.approvalPolicy }, options.model), requestTimeoutMs), "turn/start");
|
const turnStart = await Promise.race([
|
||||||
turnId = requireNestedId(turnResponse, "turn/start", "turn");
|
client.request("turn/start", withOptionalModel({ threadId, input: textInputForUserMessage(options.prompt, promptInjection), cwd: options.cwd, approvalPolicy: options.approvalPolicy }, options.model), requestTimeoutMs).then((response) => ({ kind: "response" as const, response })),
|
||||||
emitEvent({ type: "backend_status", payload: { phase: "turn/start:completed", turnId } });
|
terminalPromise.then(() => ({ kind: "terminal" as const })),
|
||||||
if (threadId && turnId && options.onActiveTurn) {
|
]);
|
||||||
const maybeStop = options.onActiveTurn({
|
if (turnStart.kind === "response") {
|
||||||
threadId,
|
const turnResponse = requireResponseRecord(turnStart.response, "turn/start");
|
||||||
turnId,
|
turnId = requireNestedId(turnResponse, "turn/start", "turn");
|
||||||
steer: async (prompt: string) => {
|
emitEvent({ type: "backend_status", payload: { phase: "turn/start:completed", turnId } });
|
||||||
await client!.request("turn/steer", { threadId: threadId!, expectedTurnId: turnId!, input: textInput(prompt) }, requestTimeoutMs);
|
exposeActiveTurn("turn-start-response");
|
||||||
},
|
} else {
|
||||||
interrupt: async () => {
|
emitEvent({ type: "backend_status", payload: { phase: "turn/start:interrupted-before-response", threadId: threadId ?? null, turnId: turnId ?? null } });
|
||||||
await client!.request("turn/interrupt", { threadId: threadId!, turnId: turnId! }, requestTimeoutMs);
|
|
||||||
},
|
|
||||||
});
|
|
||||||
if (typeof maybeStop === "function") stopActiveTurn = maybeStop;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const race = await Promise.race([
|
if (!terminal) {
|
||||||
terminalPromise.then(() => ({ kind: "terminal" as const })),
|
const race = await Promise.race([
|
||||||
client.closedPromise.then((closeInfo) => ({ kind: "closed" as const, closeInfo })),
|
terminalPromise.then(() => ({ kind: "terminal" as const })),
|
||||||
]);
|
client.closedPromise.then((closeInfo) => ({ kind: "closed" as const, closeInfo })),
|
||||||
if (race.kind === "closed" && !terminal) {
|
]);
|
||||||
terminal = terminalFromClose(race.closeInfo);
|
if (race.kind === "closed" && !terminal) {
|
||||||
emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "transport:closed-before-terminal", appServerExit: closeEvent(race.closeInfo) } });
|
terminal = terminalFromClose(race.closeInfo);
|
||||||
|
emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "transport:closed-before-terminal", appServerExit: closeEvent(race.closeInfo) } });
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server did not emit turn/completed" };
|
if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server did not emit turn/completed" };
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
@@ -548,8 +611,11 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
|
|||||||
stopNotifications();
|
stopNotifications();
|
||||||
options.abortSignal?.removeEventListener("abort", abortTurn);
|
options.abortSignal?.removeEventListener("abort", abortTurn);
|
||||||
stopTurnIdleTimeout();
|
stopTurnIdleTimeout();
|
||||||
|
stopTurnHardTimeout();
|
||||||
}
|
}
|
||||||
if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server finished without terminal status" };
|
if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server finished without terminal status" };
|
||||||
|
const pendingInterrupt: Promise<void> | null = interruptInFlight as Promise<void> | null;
|
||||||
|
if (pendingInterrupt) await pendingInterrupt.catch(() => undefined);
|
||||||
if (terminal.status !== "completed") emitEvents(await session.close());
|
if (terminal.status !== "completed") emitEvents(await session.close());
|
||||||
emitEvents(flushAssistantDeltaProgress(assistantDeltaProgress));
|
emitEvents(flushAssistantDeltaProgress(assistantDeltaProgress));
|
||||||
if (completedAssistantMessages.length === 0) emitEvents(assistantMessageEventsForTurn(assistantText, terminal.status === "completed"));
|
if (completedAssistantMessages.length === 0) emitEvents(assistantMessageEventsForTurn(assistantText, terminal.status === "completed"));
|
||||||
@@ -754,7 +820,7 @@ function normalizeCodexNotification(message: JsonRecord, suppressed: SuppressedN
|
|||||||
const status = terminalStatusFromValue(turn.status);
|
const status = terminalStatusFromValue(turn.status);
|
||||||
const error = asRecordAt(turn, "error");
|
const error = asRecordAt(turn, "error");
|
||||||
const messageText = typeof error.message === "string" ? redactText(error.message) : null;
|
const messageText = typeof error.message === "string" ? redactText(error.message) : null;
|
||||||
const failureKind = status === "completed" ? null : classifyCodexErrorRecord(Object.keys(error).length > 0 ? error : { message: turn.status }, "backend-failed");
|
const failureKind = status === "completed" ? null : status === "cancelled" ? "cancelled" : classifyCodexErrorRecord(Object.keys(error).length > 0 ? error : { message: turn.status }, "backend-failed");
|
||||||
const events: BackendEvent[] = [{ type: "backend_status", payload: { phase: method, terminalStatus: status } }];
|
const events: BackendEvent[] = [{ type: "backend_status", payload: { phase: method, terminalStatus: status } }];
|
||||||
if (failureKind) events.push({ type: "error", payload: { failureKind, error: redactJson(error), phase: method } });
|
if (failureKind) events.push({ type: "error", payload: { failureKind, error: redactJson(error), phase: method } });
|
||||||
return { events, terminal: { status, failureKind, message: messageText } };
|
return { events, terminal: { status, failureKind, message: messageText } };
|
||||||
|
|||||||
+33
-9
@@ -199,7 +199,7 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions,
|
|||||||
await appendBestEffort(api, options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id));
|
await appendBestEffort(api, options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id));
|
||||||
},
|
},
|
||||||
onActiveTurn: (control: BackendActiveTurnControl) => {
|
onActiveTurn: (control: BackendActiveTurnControl) => {
|
||||||
stopSteerWatch = startSteerWatch(api, options.runId, command.id, attemptId, runner.id, control, options.pollIntervalMs);
|
stopSteerWatch = startActiveTurnCommandWatch(api, options.runId, command.id, attemptId, runner.id, control, options.pollIntervalMs);
|
||||||
return () => {
|
return () => {
|
||||||
stopSteerWatch?.();
|
stopSteerWatch?.();
|
||||||
stopSteerWatch = undefined;
|
stopSteerWatch = undefined;
|
||||||
@@ -224,7 +224,7 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function startSteerWatch(api: RunnerManagerApi, runId: string, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl, pollIntervalMs: number | undefined): () => void {
|
function startActiveTurnCommandWatch(api: RunnerManagerApi, runId: string, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl, pollIntervalMs: number | undefined): () => void {
|
||||||
let stopped = false;
|
let stopped = false;
|
||||||
let polling = false;
|
let polling = false;
|
||||||
const seen = new Set<string>();
|
const seen = new Set<string>();
|
||||||
@@ -234,13 +234,14 @@ function startSteerWatch(api: RunnerManagerApi, runId: string, targetCommandId:
|
|||||||
polling = true;
|
polling = true;
|
||||||
try {
|
try {
|
||||||
const commands = await api.listCommands(runId, { afterSeq: 0, limit: 100 });
|
const commands = await api.listCommands(runId, { afterSeq: 0, limit: 100 });
|
||||||
const pendingSteer = commands.filter((item) => item.type === "steer" && item.state === "pending" && !seen.has(item.id));
|
const pendingControlCommands = commands.filter((item) => (item.type === "steer" || item.type === "interrupt") && item.state === "pending" && !seen.has(item.id));
|
||||||
for (const steerCommand of pendingSteer) {
|
for (const controlCommand of pendingControlCommands) {
|
||||||
seen.add(steerCommand.id);
|
seen.add(controlCommand.id);
|
||||||
await handleSteerCommand(api, runId, steerCommand, targetCommandId, attemptId, runnerId, control);
|
if (controlCommand.type === "interrupt") await handleInterruptCommand(api, runId, controlCommand, targetCommandId, attemptId, runnerId, control);
|
||||||
|
else await handleSteerCommand(api, runId, controlCommand, targetCommandId, attemptId, runnerId, control);
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
// The active backend turn remains authoritative; missed steer commands stay pending for the next poll.
|
// The active backend turn remains authoritative; missed control commands stay pending for the next poll.
|
||||||
} finally {
|
} finally {
|
||||||
polling = false;
|
polling = false;
|
||||||
}
|
}
|
||||||
@@ -275,11 +276,29 @@ async function handleSteerCommand(api: RunnerManagerApi, runId: string, command:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function handleInterruptCommand(api: RunnerManagerApi, runId: string, command: CommandRecord, targetCommandId: string, attemptId: string, runnerId: string, control: BackendActiveTurnControl): Promise<void> {
|
||||||
|
const acked = await api.ackCommand(command.id);
|
||||||
|
if (acked.state === "cancelled") {
|
||||||
|
await api.reportCommandStatus(command.id, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: "interrupt command cancelled before delivery", threadId: control.threadId, turnId: control.turnId });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const reason = interruptReason(command.payload);
|
||||||
|
await appendBestEffort(api, runId, { type: "backend_status", payload: { phase: "interrupt-command-acknowledged", commandId: command.id, commandType: "interrupt", targetCommandId, attemptId, runnerId, threadId: control.threadId, turnId: control.turnId, reason } });
|
||||||
|
try {
|
||||||
|
await control.interrupt();
|
||||||
|
await appendBestEffort(api, runId, { type: "backend_status", payload: { phase: "turn/interrupt:completed", commandId: command.id, commandType: "interrupt", targetCommandId, attemptId, runnerId, threadId: control.threadId, turnId: control.turnId, reason } });
|
||||||
|
await api.reportCommandStatus(command.id, { terminalStatus: "completed", failureKind: null, failureMessage: null, threadId: control.threadId, turnId: control.turnId });
|
||||||
|
} catch (error) {
|
||||||
|
const failureKind = failureKindFromError(error);
|
||||||
|
await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: terminalStatusForFailure(failureKind), failureKind, message: errorMessage(error) }, "runner:interrupt", control);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async function failPendingSteerWithoutActiveTurn(api: RunnerManagerApi, runId: string, commands: CommandRecord[], runnerId: string, attemptId: string): Promise<void> {
|
async function failPendingSteerWithoutActiveTurn(api: RunnerManagerApi, runId: string, commands: CommandRecord[], runnerId: string, attemptId: string): Promise<void> {
|
||||||
for (const command of commands.filter((item) => item.type === "steer" && item.state === "pending")) {
|
for (const command of commands.filter((item) => (item.type === "steer" || item.type === "interrupt") && item.state === "pending")) {
|
||||||
const acked = await api.ackCommand(command.id);
|
const acked = await api.ackCommand(command.id);
|
||||||
if (acked.state === "cancelled") continue;
|
if (acked.state === "cancelled") continue;
|
||||||
await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: "blocked", failureKind: "schema-invalid", message: "steer command requires an active turn" }, "runner:steer:no-active-turn");
|
await reportNonTerminalCommandFailure(api, runId, command.id, attemptId, runnerId, { terminalStatus: "blocked", failureKind: "schema-invalid", message: `${command.type} command requires an active turn` }, `runner:${command.type}:no-active-turn`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -296,6 +315,11 @@ function steerPrompt(payload: JsonRecord): string | null {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function interruptReason(payload: JsonRecord): string | null {
|
||||||
|
const value = payload.reason ?? payload.message ?? payload.prompt ?? payload.text;
|
||||||
|
return typeof value === "string" && value.trim().length > 0 ? value.trim() : null;
|
||||||
|
}
|
||||||
|
|
||||||
function isTerminalRun(run: RunRecord): boolean {
|
function isTerminalRun(run: RunRecord): boolean {
|
||||||
return run.status === "completed" || run.status === "failed" || run.status === "blocked" || run.status === "cancelled";
|
return run.status === "completed" || run.status === "failed" || run.status === "blocked" || run.status === "cancelled";
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -181,6 +181,9 @@ const selfTest: SelfTestCase = async (context) => {
|
|||||||
const liveResult = await livePromise;
|
const liveResult = await livePromise;
|
||||||
assert.equal(liveResult.terminalStatus, "completed", "slow live tool event turn should complete");
|
assert.equal(liveResult.terminalStatus, "completed", "slow live tool event turn should complete");
|
||||||
|
|
||||||
|
await runInterruptBeforeTurnStartResponseCase({ client, managerUrl: server.baseUrl, context });
|
||||||
|
await runHardTimeoutDuringToolProgressCase({ client, managerUrl: server.baseUrl, context });
|
||||||
|
|
||||||
const noisy = await createRunWithCommand(client, context, "hello noisy reasoning", "selftest-noisy-reasoning-events", 15_000);
|
const noisy = await createRunWithCommand(client, context, "hello noisy reasoning", "selftest-noisy-reasoning-events", 15_000);
|
||||||
const noisyResult = await runOnce({ managerUrl: server.baseUrl, runId: noisy.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "noisy-reasoning-events" }, oneShot: true }) as JsonRecord;
|
const noisyResult = await runOnce({ managerUrl: server.baseUrl, runId: noisy.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "noisy-reasoning-events" }, oneShot: true }) as JsonRecord;
|
||||||
assert.equal(noisyResult.terminalStatus, "completed", "noisy reasoning turn should complete");
|
assert.equal(noisyResult.terminalStatus, "completed", "noisy reasoning turn should complete");
|
||||||
@@ -239,7 +242,7 @@ const selfTest: SelfTestCase = async (context) => {
|
|||||||
await runSessionStorageSubdirCase({ client, managerUrl: server.baseUrl, context });
|
await runSessionStorageSubdirCase({ client, managerUrl: server.baseUrl, context });
|
||||||
await runSessionStorageNoSecretLeakCase({ client, managerUrl: server.baseUrl, context });
|
await runSessionStorageNoSecretLeakCase({ client, managerUrl: server.baseUrl, context });
|
||||||
|
|
||||||
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "runner-lease-conflict-recovery", "codex-stdio-fake-turn", "codex-stdio-k8s-sandbox-override", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-dsflash-go-profile-fake-turn", "codex-stdio-dsflash-go-config-metadata", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-web-search-progress", "codex-stdio-stale-thread-resume-failed", "codex-stdio-live-tool-events", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-invalid-tool-call", "codex-stdio-provider-compact-unsupported", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-idle-timeout-progress-refresh", "codex-stdio-command-failure-keeps-run-open", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
|
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "runner-lease-conflict-recovery", "codex-stdio-fake-turn", "codex-stdio-k8s-sandbox-override", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-dsflash-go-profile-fake-turn", "codex-stdio-dsflash-go-config-metadata", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-web-search-progress", "codex-stdio-stale-thread-resume-failed", "codex-stdio-live-tool-events", "codex-stdio-interrupt-before-turn-start-response", "codex-stdio-hard-timeout-during-tool-progress", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-invalid-tool-call", "codex-stdio-provider-compact-unsupported", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-idle-timeout-progress-refresh", "codex-stdio-command-failure-keeps-run-open", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
|
||||||
} finally {
|
} finally {
|
||||||
await new Promise<void>((resolve) => server.server.close(() => resolve()));
|
await new Promise<void>((resolve) => server.server.close(() => resolve()));
|
||||||
}
|
}
|
||||||
@@ -281,7 +284,7 @@ async function runLeaseConflictRecoveryCase(options: { client: ManagerClient; ma
|
|||||||
}
|
}
|
||||||
|
|
||||||
async function runSlowProgressIdleCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
|
async function runSlowProgressIdleCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
|
||||||
const item = await createRunWithCommand(options.client, options.context, "slow progress before terminal", "selftest-slow-progress-idle-refresh", 60);
|
const item = await createRunWithCommand(options.client, options.context, "slow progress before terminal", "selftest-slow-progress-idle-refresh", 250);
|
||||||
const result = await runOnce({
|
const result = await runOnce({
|
||||||
managerUrl: options.managerUrl,
|
managerUrl: options.managerUrl,
|
||||||
runId: item.runId,
|
runId: item.runId,
|
||||||
@@ -296,6 +299,55 @@ async function runSlowProgressIdleCase(options: { client: ManagerClient; manager
|
|||||||
assert.equal(events.items?.some((event) => event.type === "error" && eventPayload(event).failureKind === "backend-timeout"), false, "progressing turns must not fail on total elapsed time");
|
assert.equal(events.items?.some((event) => event.type === "error" && eventPayload(event).failureKind === "backend-timeout"), false, "progressing turns must not fail on total elapsed time");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function runInterruptBeforeTurnStartResponseCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
|
||||||
|
const item = await createRunWithCommand(options.client, options.context, "interrupt hanging tool before turn/start response", "selftest-interrupt-before-turn-start-response", 1_000);
|
||||||
|
const runPromise = runOnce({
|
||||||
|
managerUrl: options.managerUrl,
|
||||||
|
runId: item.runId,
|
||||||
|
codexCommand: options.context.fakeCodexCommand,
|
||||||
|
codexArgs: options.context.fakeCodexArgs,
|
||||||
|
codexHome: options.context.codexHome,
|
||||||
|
env: { CODEX_HOME: options.context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "tool-hangs-before-turn-start-response" },
|
||||||
|
oneShot: true,
|
||||||
|
}) as Promise<JsonRecord>;
|
||||||
|
await waitForEvent(options.client, item.runId, (event) => event.type === "tool_call" && eventPayload(event).itemId === "tool_hang_before_response", "tool_call before turn/start response");
|
||||||
|
const interrupt = await options.client.post(`/api/v1/runs/${item.runId}/commands`, { type: "interrupt", payload: { reason: "self-test interrupt hanging tool" }, idempotencyKey: "selftest-interrupt-before-response-command" }) as { id: string };
|
||||||
|
const result = await runPromise;
|
||||||
|
assert.equal(result.terminalStatus, "cancelled", "interrupt should cancel the active turn even if turn/start response has not returned");
|
||||||
|
assert.equal(result.failureKind, "cancelled");
|
||||||
|
const interruptCommand = await options.client.get(`/api/v1/runs/${item.runId}/commands/${interrupt.id}`) as { state?: string };
|
||||||
|
assert.equal(interruptCommand.state, "completed", "interrupt control command should be terminal completed after delivery");
|
||||||
|
const events = await options.client.get(`/api/v1/runs/${item.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
|
||||||
|
assert.ok(events.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "active-turn-control-ready"), "turn/started notification should expose active turn control before turn/start response");
|
||||||
|
assert.ok(events.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "interrupt-command-acknowledged"), "interrupt command acknowledgement should be visible");
|
||||||
|
assert.ok(events.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "turn/interrupt:completed"), "interrupt delivery result should be visible");
|
||||||
|
assert.ok(events.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "turn/start:interrupted-before-response"), "turn/start pending request should be bypassed after interrupt terminal");
|
||||||
|
assertNoSecretLeak({ result, events });
|
||||||
|
}
|
||||||
|
|
||||||
|
async function runHardTimeoutDuringToolProgressCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
|
||||||
|
const item = await createRunWithCommand(options.client, options.context, "hard timeout during tool progress", "selftest-hard-timeout-tool-progress", 120);
|
||||||
|
const result = await runOnce({
|
||||||
|
managerUrl: options.managerUrl,
|
||||||
|
runId: item.runId,
|
||||||
|
codexCommand: options.context.fakeCodexCommand,
|
||||||
|
codexArgs: options.context.fakeCodexArgs,
|
||||||
|
codexHome: options.context.codexHome,
|
||||||
|
env: { CODEX_HOME: options.context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "hard-timeout-tool-progress" },
|
||||||
|
oneShot: true,
|
||||||
|
}) as JsonRecord;
|
||||||
|
assert.equal(result.terminalStatus, "failed", "hard timeout should fail even while the tool keeps producing progress");
|
||||||
|
assert.equal(result.failureKind, "backend-timeout");
|
||||||
|
const events = await options.client.get(`/api/v1/runs/${item.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
|
||||||
|
assert.ok(events.items?.some((event) => event.type === "command_output" && String(eventPayload(event).text ?? "").includes("progress")), "progress output should be visible before hard timeout");
|
||||||
|
assert.ok(events.items?.some((event) => event.type === "error" && eventPayload(event).phase === "turn:hard-timeout"), "hard timeout should be recorded as an error event");
|
||||||
|
assert.ok(events.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "turn-interrupt-requested"), "timeout should request backend interrupt before process teardown");
|
||||||
|
assert.ok(events.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "turn/interrupt:completed"), "timeout interrupt result should be visible");
|
||||||
|
const command = await options.client.get(`/api/v1/runs/${item.runId}/commands/${item.commandId}`) as { state?: string };
|
||||||
|
assert.equal(command.state, "failed", "hard timed out command should be failed");
|
||||||
|
assertNoSecretLeak({ result, events });
|
||||||
|
}
|
||||||
|
|
||||||
async function runFailureDoesNotTerminalRunCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
|
async function runFailureDoesNotTerminalRunCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
|
||||||
const item = await createRunWithCommand(options.client, options.context, "first command fails", "selftest-command-failure-keeps-run-open", 3_000);
|
const item = await createRunWithCommand(options.client, options.context, "first command fails", "selftest-command-failure-keeps-run-open", 3_000);
|
||||||
const result = await runOnce({
|
const result = await runOnce({
|
||||||
|
|||||||
@@ -224,6 +224,29 @@ for await (const line of rl) {
|
|||||||
}, 50);
|
}, 50);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if (mode === "tool-hangs-before-turn-start-response") {
|
||||||
|
turnCounter += 1;
|
||||||
|
const turn = { id: `turn_selftest_${turnCounter}`, status: "running" };
|
||||||
|
notify("turn/started", { turn });
|
||||||
|
notify("item/started", { item: { id: "tool_hang_before_response", type: "commandExecution", command: "hwpod cmd git clone", status: "running", processId: process.pid } });
|
||||||
|
notify("item/commandExecution/outputDelta", { itemId: "tool_hang_before_response", delta: "clone started\n" });
|
||||||
|
activeSteerTurn = { id: turn.id, completed: false, timer: setTimeout(() => undefined, 60_000) };
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (mode === "hard-timeout-tool-progress") {
|
||||||
|
turnCounter += 1;
|
||||||
|
const turn = { id: `turn_selftest_${turnCounter}`, status: "running" };
|
||||||
|
notify("turn/started", { turn });
|
||||||
|
notify("item/started", { item: { id: "tool_hard_timeout", type: "commandExecution", command: "hwpod cmd long-running", status: "running", processId: process.pid } });
|
||||||
|
respond(message.id, { turn });
|
||||||
|
activeSteerTurn = { id: turn.id, completed: false, timer: null };
|
||||||
|
let ticks = 0;
|
||||||
|
activeSteerTurn.timer = setInterval(() => {
|
||||||
|
ticks += 1;
|
||||||
|
notify("item/commandExecution/outputDelta", { itemId: "tool_hard_timeout", delta: `progress ${ticks}\n` });
|
||||||
|
}, 25);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (mode === "steer-waits") {
|
if (mode === "steer-waits") {
|
||||||
turnCounter += 1;
|
turnCounter += 1;
|
||||||
const turn = { id: `turn_selftest_${turnCounter}`, status: "running" };
|
const turn = { id: `turn_selftest_${turnCounter}`, status: "running" };
|
||||||
@@ -276,6 +299,16 @@ for await (const line of rl) {
|
|||||||
setTimeout(() => completeActiveSteerTurn("steer-applied"), 20);
|
setTimeout(() => completeActiveSteerTurn("steer-applied"), 20);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if (message.method === "turn/interrupt") {
|
||||||
|
if ((mode !== "tool-hangs-before-turn-start-response" && mode !== "hard-timeout-tool-progress" && mode !== "steer-waits") || !activeSteerTurn) {
|
||||||
|
respond(message.id, null, { code: -32000, message: "no active fake turn for interrupt" });
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
notify("item/completed", { item: { id: "tool_interrupted", type: "commandExecution", command: "hwpod cmd interrupted", status: "cancelled" } });
|
||||||
|
respond(message.id, { interrupted: true });
|
||||||
|
setTimeout(() => completeActiveSteerTurn("interrupt-applied", "cancelled"), 20);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
respond(message.id, null, { code: -32601, message: `unsupported fake method ${message.method ?? "unknown"}` });
|
respond(message.id, null, { code: -32601, message: `unsupported fake method ${message.method ?? "unknown"}` });
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -288,11 +321,11 @@ function notify(method: string, params: unknown): void {
|
|||||||
process.stdout.write(`${JSON.stringify({ method, params })}\n`);
|
process.stdout.write(`${JSON.stringify({ method, params })}\n`);
|
||||||
}
|
}
|
||||||
|
|
||||||
function completeActiveSteerTurn(reason: string): void {
|
function completeActiveSteerTurn(reason: string, status = "completed"): void {
|
||||||
if (!activeSteerTurn || activeSteerTurn.completed) return;
|
if (!activeSteerTurn || activeSteerTurn.completed) return;
|
||||||
activeSteerTurn.completed = true;
|
activeSteerTurn.completed = true;
|
||||||
if (activeSteerTurn.timer) clearTimeout(activeSteerTurn.timer);
|
if (activeSteerTurn.timer) clearTimeout(activeSteerTurn.timer);
|
||||||
const turn = { id: activeSteerTurn.id, status: "completed", reason };
|
const turn = { id: activeSteerTurn.id, status, reason };
|
||||||
notify("turn/completed", { turn });
|
notify("turn/completed", { turn });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user