Merge pull request #221 from pikasTech/fix/missing-terminal-after-tool-220

修复 codex-stdio 缺 terminal 后无限 running
This commit is contained in:
Lyon
2026-06-22 01:57:38 +08:00
committed by GitHub
5 changed files with 85 additions and 26 deletions
+56 -9
View File
@@ -532,8 +532,49 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
options.abortSignal?.addEventListener("abort", abortTurn, { once: true });
const turnIdleTimeoutMs = positiveTimeout(options.timeoutMs);
const idleWarningMs = codexIdleWarningMs(env, turnIdleTimeoutMs);
const missingTerminalAfterToolTimeoutMs = codexMissingTerminalAfterToolTimeoutMs(env, turnIdleTimeoutMs);
let idleTimeout: NodeJS.Timeout | null = null;
let idleWarningTimeout: NodeJS.Timeout | null = null;
let missingTerminalAfterToolTimeout: NodeJS.Timeout | null = null;
// Builds a fresh snapshot of the attributes attached to the
// missing-terminal-after-tool telemetry. Evaluated lazily so that idleMs and
// terminalStatus reflect the state at the moment of the call.
const missingTerminalAfterToolAttrs = (): JsonRecord => {
  // Clamp at zero so a clock adjustment can never yield a negative idle duration.
  const idleMs = Math.max(0, Date.now() - lastActivityAt);
  return {
    waitingFor,
    idleMs,
    timeoutMs: missingTerminalAfterToolTimeoutMs,
    lastNotificationMethod,
    threadId: threadId ?? null,
    turnId: turnId ?? null,
    terminalStatus: terminal?.status ?? null,
    // The retry fields are constants: this condition is always reported as non-retryable.
    retryable: false,
    retryAttempt: null,
    retryMaxAttempts: 0,
    retryExhausted: true,
    lastToolCall,
  };
};
// Emits the "missing terminal after tool" span at most once per reported
// state: only when a tool call has been recorded and the span has not already
// been emitted since the flag was last reset.
const reportMissingTerminalAfterTool = (): void => {
  if (lastToolCall && !missingTerminalAfterToolReported) {
    // Flip the flag before emitting so the report stays one-shot.
    missingTerminalAfterToolReported = true;
    emitCodexOtelSpan("codex_stdio.missing_terminal_after_tool", options, env, missingTerminalAfterToolAttrs());
  }
};
// Cancels the pending missing-terminal-after-tool timer, if any, and forgets
// its handle. Safe to call when no timer is armed.
const clearMissingTerminalAfterToolTimeout = (): void => {
  if (missingTerminalAfterToolTimeout) {
    clearTimeout(missingTerminalAfterToolTimeout);
    missingTerminalAfterToolTimeout = null;
  }
};
// Timer callback: marks the turn as failed with a backend-timeout when no
// terminal was recorded within the configured window after tool activity,
// then emits the error event and telemetry, starts the interrupt/stop
// sequence and resolves the terminal wait.
const failMissingTerminalAfterTool = (): void => {
  // Nothing to do once the turn already has a terminal, or if no tool call was ever recorded.
  if (terminal || !lastToolCall) return;
  const timeoutPhase = "turn:missing-terminal-after-tool-timeout";
  // Make sure the one-shot diagnostic span is emitted before the failure is recorded.
  reportMissingTerminalAfterTool();
  terminal = {
    status: "failed",
    failureKind: "backend-timeout",
    message: `codex app-server did not emit turn/completed within ${missingTerminalAfterToolTimeoutMs}ms after tool activity`,
  };
  const attrs = {
    ...missingTerminalAfterToolAttrs(),
    terminalStatus: terminal.status,
    failureKind: terminal.failureKind,
  };
  emitEvent({
    type: "error",
    payload: {
      failureKind: terminal.failureKind,
      message: terminal.message,
      phase: timeoutPhase,
      timeoutMs: missingTerminalAfterToolTimeoutMs,
      retryable: false,
      retryAttempt: null,
      retryMaxAttempts: 0,
      retryExhausted: true,
      lastToolCall,
    },
  });
  emitCodexOtelSpan("codex_stdio.missing_terminal_after_tool_timeout", options, env, attrs, {
    status: "error",
    error: terminal.message,
  });
  beginInterruptAndStop("missing terminal after tool timeout", timeoutPhase);
  terminalResolve();
};
// (Re)arms the missing-terminal-after-tool timer: any previously pending
// timer is cancelled first, so only one is ever outstanding.
const scheduleMissingTerminalAfterToolTimeout = (): void => {
  clearMissingTerminalAfterToolTimeout();
  const timer = setTimeout(failMissingTerminalAfterTool, missingTerminalAfterToolTimeoutMs);
  missingTerminalAfterToolTimeout = timer;
  // unref (when available) keeps this timer from holding the process open on its own.
  timer.unref?.();
};
const scheduleIdleWarning = (): void => {
if (idleWarningTimeout) clearTimeout(idleWarningTimeout);
idleWarningTimeout = setTimeout(() => {
@@ -541,10 +582,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
const idleMs = Math.max(0, Date.now() - lastActivityAt);
const attrs = { waitingFor, idleMs, lastNotificationMethod, threadId: threadId ?? null, turnId: turnId ?? null, terminalStatus: null };
emitCodexOtelSpan("codex_stdio.idle_warning", options, env, attrs);
if (lastToolCall && !missingTerminalAfterToolReported) {
missingTerminalAfterToolReported = true;
emitCodexOtelSpan("codex_stdio.missing_terminal_after_tool", options, env, { ...attrs, lastToolCall });
}
reportMissingTerminalAfterTool();
}, idleWarningMs);
idleWarningTimeout.unref?.();
};
@@ -552,6 +590,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
if (terminal) return;
lastActivityAt = Date.now();
scheduleIdleWarning();
if (lastToolCall) scheduleMissingTerminalAfterToolTimeout();
if (idleTimeout) clearTimeout(idleTimeout);
idleTimeout = setTimeout(() => {
if (terminal) return;
@@ -569,6 +608,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
idleTimeout = null;
if (idleWarningTimeout) clearTimeout(idleWarningTimeout);
idleWarningTimeout = null;
clearMissingTerminalAfterToolTimeout();
};
refreshTurnActivity();
const stopNotifications = session.addNotificationHandler((message) => {
@@ -580,7 +620,11 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
if (normalized.turnId) turnId = normalized.turnId;
waitingFor = waitingForAfterNotification(message, normalized.terminal !== undefined);
const toolSummary = toolCallSummaryFromNotification(message);
if (toolSummary?.status === "completed" || toolSummary?.status === "failed") lastToolCall = toolSummary;
if (toolSummary?.status === "completed" || toolSummary?.status === "failed") {
lastToolCall = toolSummary;
missingTerminalAfterToolReported = false;
scheduleMissingTerminalAfterToolTimeout();
}
exposeActiveTurn(normalized.turnId ? "turn-notification" : "notification");
emitEvents(normalized.events);
if (normalized.assistantDelta) {
@@ -694,10 +738,7 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
}
}
if (!terminal) {
if (lastToolCall && !missingTerminalAfterToolReported) {
missingTerminalAfterToolReported = true;
emitCodexOtelSpan("codex_stdio.missing_terminal_after_tool", options, env, { waitingFor, idleMs: Math.max(0, Date.now() - lastActivityAt), lastNotificationMethod, threadId: threadId ?? null, turnId: turnId ?? null, lastToolCall });
}
reportMissingTerminalAfterTool();
terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server did not emit turn/completed" };
}
} catch (error) {
@@ -1401,6 +1442,12 @@ function codexIdleWarningMs(env: NodeJS.ProcessEnv, turnTimeoutMs: number): numb
return Math.max(250, Math.floor(turnTimeoutMs / 2));
}
/**
 * Resolves how long to wait for a terminal after tool activity.
 *
 * Reads `AGENTRUN_CODEX_MISSING_TERMINAL_AFTER_TOOL_TIMEOUT_MS` from `env`.
 * A finite, positive value is floored to an integer and raised to at least
 * 250; anything else (unset, non-numeric, zero, negative) falls back to
 * `positiveTimeout(turnTimeoutMs)`.
 *
 * @param env - Environment to read the override from.
 * @param turnTimeoutMs - Turn timeout used when no valid override is present.
 * @returns The timeout in milliseconds.
 */
function codexMissingTerminalAfterToolTimeoutMs(env: NodeJS.ProcessEnv, turnTimeoutMs: number): number {
  const override = Number(env.AGENTRUN_CODEX_MISSING_TERMINAL_AFTER_TOOL_TIMEOUT_MS);
  const hasUsableOverride = Number.isFinite(override) && override > 0;
  return hasUsableOverride ? Math.max(250, Math.floor(override)) : positiveTimeout(turnTimeoutMs);
}
function emitCodexNotificationOtel(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, message: JsonRecord, state: JsonRecord): void {
const attributes = { ...state, ...notificationOtelAttributes(message) };
emitCodexOtelSpan("codex_stdio.notification", options, env, attributes);
+5
View File
@@ -49,6 +49,7 @@ export interface RunnerJobDefaults {
jobNamePrefix?: string;
lane?: string;
runnerIdleTimeoutMs?: number;
missingTerminalAfterToolTimeoutMs?: number;
kubectlCommand?: string;
unideskSshEndpointEnv?: JsonRecord;
retention?: RunnerRetentionOptions;
@@ -64,6 +65,7 @@ export interface CreateRunnerJobInput extends JsonRecord {
sourceCommit?: string;
serviceAccountName?: string;
runnerIdleTimeoutMs?: number;
missingTerminalAfterToolTimeoutMs?: number;
idempotencyKey?: string;
imageRef?: JsonRecord;
transientEnv?: JsonRecord[];
@@ -95,6 +97,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
const attemptId = optionalString(options.input.attemptId) ?? `attempt_${Date.now().toString(36)}`;
const runnerId = optionalString(options.input.runnerId);
const runnerIdleTimeoutMs = optionalPositiveInteger(options.input.runnerIdleTimeoutMs, "runnerIdleTimeoutMs") ?? options.defaults.runnerIdleTimeoutMs;
const missingTerminalAfterToolTimeoutMs = optionalPositiveInteger(options.input.missingTerminalAfterToolTimeoutMs, "missingTerminalAfterToolTimeoutMs") ?? options.defaults.missingTerminalAfterToolTimeoutMs;
const transientEnvSecretName = transientEnv.length > 0 ? transientEnvSecretNameForRun(run.id, commandId, attemptId, jobNamePrefix) : null;
const renderTransientEnv = transientEnvSecretName ? transientEnvWithSecretRefs(transientEnv, transientEnvSecretName) : transientEnv;
const normalizedPayload = {
@@ -108,6 +111,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
attemptId: optionalString(options.input.attemptId) ?? null,
runnerId: optionalString(options.input.runnerId) ?? null,
runnerIdleTimeoutMs: runnerIdleTimeoutMs ?? null,
missingTerminalAfterToolTimeoutMs: missingTerminalAfterToolTimeoutMs ?? null,
transientEnv: transientEnv.map((item) => ({ name: item.name, valueHash: stableHash(item.value), sensitive: true })),
};
const payloadHash = stableHash(normalizedPayload);
@@ -169,6 +173,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
sourceCommit,
transientEnv: renderTransientEnv,
...(runnerIdleTimeoutMs !== undefined ? { runnerIdleTimeoutMs } : {}),
...(missingTerminalAfterToolTimeoutMs !== undefined ? { missingTerminalAfterToolTimeoutMs } : {}),
...(serviceAccountName ? { serviceAccountName } : {}),
...(jobNamePrefix ? { jobNamePrefix } : {}),
...(lane ? { lane } : {}),
+1
View File
@@ -56,6 +56,7 @@ function runnerJobDefaultsForRequest(defaults: ManagerServerOptions["runnerJobDe
jobNamePrefix,
lane,
...(defaults?.runnerIdleTimeoutMs !== undefined ? { runnerIdleTimeoutMs: defaults.runnerIdleTimeoutMs } : optionalPositiveIntegerRecord("runnerIdleTimeoutMs", process.env.AGENTRUN_RUNNER_IDLE_TIMEOUT_MS)),
...(defaults?.missingTerminalAfterToolTimeoutMs !== undefined ? { missingTerminalAfterToolTimeoutMs: defaults.missingTerminalAfterToolTimeoutMs } : optionalPositiveIntegerRecord("missingTerminalAfterToolTimeoutMs", process.env.AGENTRUN_RUNNER_MISSING_TERMINAL_AFTER_TOOL_TIMEOUT_MS)),
...(defaults?.kubectlCommand ? { kubectlCommand: defaults.kubectlCommand } : {}),
...(defaults?.unideskSshEndpointEnv ? { unideskSshEndpointEnv: defaults.unideskSshEndpointEnv } : {}),
...(retention ? { retention } : {}),
+13 -4
View File
@@ -58,6 +58,7 @@ export interface RunnerJobRenderOptions {
backoffLimit?: number;
ttlSecondsAfterFinished?: number;
runnerIdleTimeoutMs?: number;
missingTerminalAfterToolTimeoutMs?: number;
transientEnv?: RunnerTransientEnv[];
sessionPvc?: RunnerSessionPvcOptions;
dryRun?: boolean;
@@ -152,7 +153,7 @@ export function renderRunnerJobDryRun(options: RunnerJobRenderOptions): JsonReco
};
}
export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { manifest: JsonRecord; namespace: string; jobName: string; runnerJobId: string; runnerId: string; attemptId: string; sourceCommit: string; serviceAccountName: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; warnings: string[]; ttlSecondsAfterFinished: number; ttlPolicy: JsonRecord; runnerIdleTimeoutMs: number } {
export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { manifest: JsonRecord; namespace: string; jobName: string; runnerJobId: string; runnerId: string; attemptId: string; sourceCommit: string; serviceAccountName: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; warnings: string[]; ttlSecondsAfterFinished: number; ttlPolicy: JsonRecord; runnerIdleTimeoutMs: number; missingTerminalAfterToolTimeoutMs: number } {
const namespace = options.namespace ?? "agentrun-v01";
const attemptId = options.attemptId ?? `attempt_${Date.now().toString(36)}`;
const runnerId = options.runnerId ?? `runner_${shortHash(`${options.run.id}:${attemptId}:${options.commandId}`)}`;
@@ -165,12 +166,13 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani
const ttlSecondsAfterFinished = normalizeTtlSecondsAfterFinished(options.ttlSecondsAfterFinished, warnings);
const ttlPolicy = terminalArtifactTtlPolicy(ttlSecondsAfterFinished);
const runnerIdleTimeoutMs = normalizeRunnerIdleTimeoutMs(options.runnerIdleTimeoutMs);
const missingTerminalAfterToolTimeoutMs = normalizeMissingTerminalAfterToolTimeoutMs(options.missingTerminalAfterToolTimeoutMs, runnerIdleTimeoutMs);
const jobName = `${jobNamePrefix}-${shortDnsHash(options.run.id, attemptId)}`;
const secretRefs = credentialProjections(options.run, namespace);
const toolCredentials = toolCredentialProjections(options.run, namespace);
const sessionPvc = options.sessionPvc;
if (secretRefs.length === 0) warnings.push("run executionPolicy.secretScope 未声明 provider SecretRefrunner 将按 secret-unavailable 上报,而不会降级直连外部凭据");
const env = runnerEnv(options, { namespace, jobName, runnerJobId, runnerId, attemptId, sourceCommit, secretRefs, toolCredentials, sessionPvc, runnerIdleTimeoutMs });
const env = runnerEnv(options, { namespace, jobName, runnerJobId, runnerId, attemptId, sourceCommit, secretRefs, toolCredentials, sessionPvc, runnerIdleTimeoutMs, missingTerminalAfterToolTimeoutMs });
const manifest: JsonRecord = {
apiVersion: "batch/v1",
kind: "Job",
@@ -237,10 +239,10 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani
},
},
};
return { manifest, namespace, jobName, runnerJobId, runnerId, attemptId, sourceCommit, serviceAccountName, secretRefs, toolCredentials, warnings, ttlSecondsAfterFinished, ttlPolicy, runnerIdleTimeoutMs };
return { manifest, namespace, jobName, runnerJobId, runnerId, attemptId, sourceCommit, serviceAccountName, secretRefs, toolCredentials, warnings, ttlSecondsAfterFinished, ttlPolicy, runnerIdleTimeoutMs, missingTerminalAfterToolTimeoutMs };
}
function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string; jobName: string; runnerJobId: string; runnerId: string; attemptId: string; sourceCommit: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; sessionPvc: RunnerSessionPvcOptions | undefined; runnerIdleTimeoutMs: number }): JsonRecord[] {
function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string; jobName: string; runnerJobId: string; runnerId: string; attemptId: string; sourceCommit: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; sessionPvc: RunnerSessionPvcOptions | undefined; runnerIdleTimeoutMs: number; missingTerminalAfterToolTimeoutMs: number }): JsonRecord[] {
const selectedSecret = context.secretRefs.find((item) => item.profile === options.run.backendProfile);
const codexHome = selectedSecret?.runtimeMountPath ?? defaultRuntimeHome(options.run.backendProfile);
const bootRepoUrl = optionalString(options.bootRepoUrl) ?? defaultBootRepoUrl;
@@ -270,6 +272,7 @@ function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string
{ name: "AGENTRUN_WORK_READY_VERSION", value: String(staticWorkReadyCapabilitySummary().version) },
{ name: "AGENTRUN_PROJECT_DEPENDENCY_POLICY", value: "explicit-cache-or-derived-image-only" },
{ name: "AGENTRUN_RUNNER_IDLE_TIMEOUT_MS", value: String(context.runnerIdleTimeoutMs) },
{ name: "AGENTRUN_CODEX_MISSING_TERMINAL_AFTER_TOOL_TIMEOUT_MS", value: String(context.missingTerminalAfterToolTimeoutMs) },
{ name: "AGENTRUN_RUNNER_POLL_INTERVAL_MS", value: "250" },
{ name: "HOME", value: "/home/agentrun" },
{ name: "CODEX_HOME", value: codexHome },
@@ -296,6 +299,12 @@ function normalizeRunnerIdleTimeoutMs(value: number | undefined): number {
return value;
}
/**
 * Validates the optional missing-terminal-after-tool timeout.
 *
 * @param value - Requested timeout in milliseconds, or `undefined` when not set.
 * @param runnerIdleTimeoutMs - Value returned when `value` is `undefined`.
 * @returns `value` when it is a positive integer, otherwise `runnerIdleTimeoutMs`
 *   when `value` is `undefined`.
 * @throws Error when `value` is defined but is not a positive integer.
 */
function normalizeMissingTerminalAfterToolTimeoutMs(value: number | undefined, runnerIdleTimeoutMs: number): number {
  if (value === undefined) return runnerIdleTimeoutMs;
  const isPositiveInteger = Number.isInteger(value) && value > 0;
  if (isPositiveInteger) return value;
  throw new Error("missingTerminalAfterToolTimeoutMs must be a positive integer");
}
function normalizeTtlSecondsAfterFinished(value: number | undefined, warnings: string[]): number {
if (value === undefined) return minimumTerminalArtifactTtlSeconds;
if (!Number.isInteger(value) || value <= 0) throw new Error("ttlSecondsAfterFinished must be a positive integer");
+10 -13
View File
@@ -260,21 +260,18 @@ process.exit(1);
assert.ok(steerEvents.some((event) => event.type === "backend_status" && event.payload?.phase === "steer-command-acknowledged" && event.payload?.commandId === steerCommand.id && event.payload?.targetCommandId === steerRun.commandId));
assert.ok(steerEvents.some((event) => event.type === "backend_status" && event.payload?.phase === "turn/steer:completed" && event.payload?.commandId === steerCommand.id && event.payload?.targetCommandId === steerRun.commandId && event.payload.deliveryState === "forwarded-to-backend" && event.payload.targetEffect === "not-guaranteed"));
const idleAfterTool = await createHwlabRun(client, context, bundle, "hwlab-session-idle-after-tool", "complete a tool and then stay active", "hwlab-command-idle-after-tool", 10_000);
const idleAfterToolRunner = runOnce({ managerUrl: server.baseUrl, runId: idleAfterTool.runId, commandId: idleAfterTool.commandId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "tool-completes-without-terminal", AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-idle-after-tool") }, oneShot: true, pollIntervalMs: 50 });
const idleAfterTool = await createHwlabRun(client, context, bundle, "hwlab-session-idle-after-tool", "complete a tool and then fail without terminal", "hwlab-command-idle-after-tool", 10_000);
const idleAfterToolRunner = runOnce({ managerUrl: server.baseUrl, runId: idleAfterTool.runId, commandId: idleAfterTool.commandId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "tool-completes-without-terminal", AGENTRUN_CODEX_MISSING_TERMINAL_AFTER_TOOL_TIMEOUT_MS: "300", AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-idle-after-tool") }, oneShot: true, pollIntervalMs: 50 });
await waitForCommandState(client, idleAfterTool.runId, idleAfterTool.commandId, "acknowledged");
await waitForEvent(client, idleAfterTool.runId, (event) => event.type === "tool_call" && (event.payload as JsonRecord).status === "completed", "tool_call completed without terminal");
const idleEnvelope = await client.get(`/api/v1/runs/${idleAfterTool.runId}/commands/${idleAfterTool.commandId}/result`) as JsonRecord;
const idleLiveness = idleEnvelope.liveness as JsonRecord;
assert.equal(idleLiveness.phase, "idle-after-tool");
assert.equal(idleLiveness.active, true);
assert.equal(((idleLiveness.lastCommandActivity as JsonRecord).type), "tool_call");
const idleSession = await client.get("/api/v1/sessions/hwlab-session-idle-after-tool?readerId=cli") as JsonRecord;
assert.equal(((idleSession.liveness as JsonRecord).phase), "idle-after-tool");
assert.ok(Array.isArray(((idleSession.supervisor as JsonRecord).recoveryActions)), "session show must expose supervisor recovery actions");
await client.post(`/api/v1/commands/${idleAfterTool.commandId}/cancel`, { reason: "self-test idle-after-tool cleanup" });
await waitForCommandState(client, idleAfterTool.runId, idleAfterTool.commandId, "failed");
const idleAfterToolResult = await idleAfterToolRunner as JsonRecord;
assert.equal(idleAfterToolResult.terminalStatus, "cancelled");
assert.equal(idleAfterToolResult.terminalStatus, "failed");
assert.equal(idleAfterToolResult.failureKind, "backend-timeout");
const idleEnvelope = await client.get(`/api/v1/runs/${idleAfterTool.runId}/commands/${idleAfterTool.commandId}/result`) as JsonRecord;
assert.equal(idleEnvelope.terminalStatus, "failed");
assert.equal(idleEnvelope.failureKind, "backend-timeout");
assert.match(String(idleEnvelope.failureMessage ?? idleEnvelope.message ?? ""), /did not emit turn\/completed/u);
const runningCancel = await createHwlabRun(client, context, bundle, "hwlab-session-cancel-running", "cancel running", "hwlab-command-cancel-running", 10_000);
const running = runOnce({ managerUrl: server.baseUrl, runId: runningCancel.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "missing-terminal", AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-running-cancel") }, oneShot: true });
@@ -283,7 +280,7 @@ process.exit(1);
const runningResult = await running;
assert.equal(runningResult.terminalStatus, "cancelled");
return { name: "hwlab-manual-dispatch", tests: ["runner-job-idempotency", "pending-cancel", "result-envelope", "session-ref-resume", "resource-gitbundle-materialization", "gitbundle-ref-resolution", "gitbundle-tools-path", "gitbundle-skill-dir-assembly", "resource-prompt-required-blocker", "resource-required-skill-blocker", "same-run-runner-multiturn", "running-steer", "idle-after-tool-liveness", "running-cancel"] };
return { name: "hwlab-manual-dispatch", tests: ["runner-job-idempotency", "pending-cancel", "result-envelope", "session-ref-resume", "resource-gitbundle-materialization", "gitbundle-ref-resolution", "gitbundle-tools-path", "gitbundle-skill-dir-assembly", "resource-prompt-required-blocker", "resource-required-skill-blocker", "same-run-runner-multiturn", "running-steer", "missing-terminal-after-tool-auto-stop", "running-cancel"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}