fix: restart stale codex threads
This commit is contained in:
@@ -330,13 +330,36 @@ export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise
|
||||
client.notify("initialized", {});
|
||||
events.push({ type: "backend_status", payload: { phase: "initialize:completed", ...backendMetadata(options), protocol: codexProtocol } });
|
||||
|
||||
const threadMethod = options.threadId ? "thread/resume" : "thread/start";
|
||||
const threadParams: JsonRecord = options.threadId
|
||||
? withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model)
|
||||
: withOptionalModel({ cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" }, options.model);
|
||||
const threadResponse = requireResponseRecord(await client.request(threadMethod, threadParams, requestTimeoutMs), threadMethod);
|
||||
threadId = requireNestedId(threadResponse, threadMethod, "thread");
|
||||
events.push({ type: "backend_status", payload: { phase: `${threadMethod}:completed`, threadId } });
|
||||
// Opens a brand-new Codex thread via "thread/start" and returns its id.
// `phasePrefix` only affects the label of the emitted backend_status event
// (`<phasePrefix>:completed`), so callers can distinguish a regular start
// from a start performed as a fallback.
const startThread = async (phasePrefix = "thread/start"): Promise<string> => {
  const method = "thread/start";
  const rawResponse = await client!.request(
    method,
    withOptionalModel(
      { cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" },
      options.model,
    ),
    requestTimeoutMs,
  );
  // Validate the response shape first, then pull the nested thread id out of it.
  const freshThreadId = requireNestedId(requireResponseRecord(rawResponse, method), method, "thread");
  events.push({
    type: "backend_status",
    payload: { phase: `${phasePrefix}:completed`, threadId: freshThreadId },
  });
  return freshThreadId;
};
|
||||
|
||||
if (options.threadId) {
|
||||
try {
|
||||
const threadResponse = requireResponseRecord(await client.request("thread/resume", withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model), requestTimeoutMs), "thread/resume");
|
||||
threadId = requireNestedId(threadResponse, "thread/resume", "thread");
|
||||
events.push({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } });
|
||||
} catch (error) {
|
||||
const failure = normalizeFailure(error);
|
||||
if (!isStaleThreadResumeFailure(failure)) throw error;
|
||||
events.push({
|
||||
type: "backend_status",
|
||||
payload: {
|
||||
phase: "thread/resume:stale-thread-fallback",
|
||||
requestedThreadId: options.threadId,
|
||||
failureKind: failure.failureKind,
|
||||
message: failure.message,
|
||||
fallback: "thread/start",
|
||||
},
|
||||
});
|
||||
threadId = await startThread("thread/start:after-stale-resume");
|
||||
}
|
||||
} else {
|
||||
threadId = await startThread();
|
||||
}
|
||||
|
||||
const turnResponse = requireResponseRecord(await client.request("turn/start", withOptionalModel({ threadId, input: textInput(options.prompt), cwd: options.cwd, approvalPolicy: options.approvalPolicy }, options.model), requestTimeoutMs), "turn/start");
|
||||
turnId = requireNestedId(turnResponse, "turn/start", "turn");
|
||||
@@ -644,6 +667,12 @@ function normalizeFailure(error: unknown): CodexStdioFailure {
|
||||
return new CodexStdioFailure(classifyMessageFailureKind(message, "backend-protocol-error"), message, "codex-stdio");
|
||||
}
|
||||
|
||||
/**
 * Reports whether a failure looks like a `thread/resume` request for a thread
 * the backend no longer has a rollout for.
 *
 * The check is deliberately narrow: the failure must carry the phase
 * `response:thread/resume`, and its message or its JSON-serialized details
 * must contain the text "no rollout found for thread id" (case-insensitive).
 *
 * @param error - The normalized failure to inspect.
 * @returns `true` only when both conditions above hold.
 */
function isStaleThreadResumeFailure(error: CodexStdioFailure): boolean {
  if (error.phase !== "response:thread/resume") return false;

  // JSON.stringify throws on circular structures and BigInt values. This
  // helper is used while handling another error, so a serialization problem
  // must not replace the original failure; fall back to the message alone.
  let serializedDetails = "";
  try {
    serializedDetails = JSON.stringify(error.details) ?? "";
  } catch {
    serializedDetails = "";
  }

  const haystack = `${error.message}\n${serializedDetails}`.toLowerCase();
  return haystack.includes("no rollout found for thread id");
}
|
||||
|
||||
function classifyCodexErrorRecord(error: JsonRecord, fallback: FailureKind): FailureKind {
|
||||
const parts: string[] = [];
|
||||
if (typeof error.message === "string") parts.push(error.message);
|
||||
|
||||
@@ -68,6 +68,26 @@ const selfTest: SelfTestCase = async (context) => {
|
||||
assert.equal(assistantEvents.length, 1, "backend should emit one final assistant_message event");
|
||||
assert.equal(eventPayload(assistantEvents[0] ?? { payload: {} }).text, "Final answer only.");
|
||||
|
||||
const staleResume = await createStaleThreadRun(client, context);
|
||||
const staleResumeResult = await runOnce({
|
||||
managerUrl: server.baseUrl,
|
||||
runId: staleResume.runId,
|
||||
commandId: staleResume.commandId,
|
||||
codexCommand: context.fakeCodexCommand,
|
||||
codexArgs: context.fakeCodexArgs,
|
||||
codexHome: context.codexHome,
|
||||
env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "resume-no-rollout" },
|
||||
oneShot: true,
|
||||
}) as JsonRecord;
|
||||
assert.equal(staleResumeResult.terminalStatus, "completed", "stale thread resume should fall back to thread/start and complete the turn");
|
||||
const staleEnvelope = await client.get(`/api/v1/runs/${staleResume.runId}/commands/${staleResume.commandId}/result`) as JsonRecord;
|
||||
assert.equal(staleEnvelope.terminalStatus, "completed");
|
||||
assert.equal((staleEnvelope.sessionRef as JsonRecord).threadId, "thread_selftest_1");
|
||||
const staleEvents = await client.get(`/api/v1/runs/${staleResume.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
|
||||
assert.ok(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/resume:stale-thread-fallback"), "stale resume fallback event should be visible");
|
||||
assert.ok(staleEvents.items?.some((event) => event.type === "backend_status" && eventPayload(event).phase === "thread/start:after-stale-resume:completed"), "fallback should start a fresh thread");
|
||||
assertNoSecretLeak(staleEvents);
|
||||
|
||||
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "missing-turn-result", expectedStatus: "failed", expectedFailureKind: "backend-response-invalid" });
|
||||
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-401-rpc-error", expectedStatus: "failed", expectedFailureKind: "provider-auth-failed" });
|
||||
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-429-terminal", expectedStatus: "failed", expectedFailureKind: "provider-rate-limited" });
|
||||
@@ -79,7 +99,7 @@ const selfTest: SelfTestCase = async (context) => {
|
||||
await runSecretFailureCase({ client, managerUrl: server.baseUrl, context });
|
||||
await runSpawnFailureCase({ client, managerUrl: server.baseUrl, context });
|
||||
|
||||
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
|
||||
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "codex-stdio-fake-turn", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-stale-thread-fallback", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
|
||||
} finally {
|
||||
await new Promise<void>((resolve) => server.server.close(() => resolve()));
|
||||
}
|
||||
@@ -116,6 +136,30 @@ function eventPayload(event: { payload: unknown }): JsonRecord {
|
||||
return typeof event.payload === "object" && event.payload !== null && !Array.isArray(event.payload) ? event.payload as JsonRecord : {};
|
||||
}
|
||||
|
||||
/**
 * Creates a self-test run whose session reference already names a thread
 * ("thread_missing_rollout") and queues one `turn` command on it.
 *
 * NOTE(review): presumably the pre-set threadId makes the runner attempt
 * `thread/resume` for this run — confirm against the runner.
 *
 * @param client - Manager API client used to POST the run and the command.
 * @param context - Self-test context; `workspace` and `codexHome` are read.
 * @returns The ids of the created run and command.
 * @throws Error when a manager response does not carry a non-empty string `id`.
 */
async function createStaleThreadRun(client: ManagerClient, context: SelfTestContext): Promise<{ runId: string; commandId: string }> {
  // Validate ids instead of trusting a type assertion: a response without an
  // id would otherwise lead to a POST against `/api/v1/runs/undefined/commands`.
  const requireId = (response: unknown, what: string): string => {
    const id = typeof response === "object" && response !== null ? (response as { id?: unknown }).id : undefined;
    if (typeof id !== "string" || id.length === 0) {
      throw new Error(`createStaleThreadRun: manager returned no id for ${what}`);
    }
    return id;
  };

  const run = await client.post("/api/v1/runs", {
    tenantId: "unidesk",
    projectId: "pikasTech/unidesk",
    workspaceRef: { kind: "host-path", path: context.workspace },
    sessionRef: { sessionId: "selftest-stale-thread-session", conversationId: "selftest-stale-thread-session", threadId: "thread_missing_rollout" },
    providerId: "G14",
    backendProfile: "codex",
    executionPolicy: {
      sandbox: "workspace-write",
      approval: "never",
      timeoutMs: 15_000,
      network: "default",
      secretScope: {
        allowCredentialEcho: false,
        providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: context.codexHome } }],
      },
    },
    traceSink: null,
  });
  const runId = requireId(run, "run");

  const command = await client.post(`/api/v1/runs/${runId}/commands`, {
    type: "turn",
    payload: { prompt: "hello stale thread" },
    idempotencyKey: "selftest-stale-thread-fallback",
  });
  const commandId = requireId(command, "command");

  return { runId, commandId };
}
|
||||
|
||||
async function runSecretFailureCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
|
||||
const item = await createRunWithCommand(options.client, options.context, "failure missing secret files", "selftest-secret-unavailable", 3_000);
|
||||
const result = await runOnce({
|
||||
|
||||
@@ -36,6 +36,10 @@ for await (const line of rl) {
|
||||
}
|
||||
if (message.method === "thread/resume") {
|
||||
observedThreadModel = Object.hasOwn(message.params ?? {}, "model");
|
||||
if (mode === "resume-no-rollout") {
|
||||
respond(message.id, null, { code: -32000, message: `no rollout found for thread id ${String(message.params?.threadId ?? "unknown")}` });
|
||||
continue;
|
||||
}
|
||||
if (mode === "reject-unexpected-model" && observedThreadModel) {
|
||||
respond(message.id, null, { code: -32000, message: "thread/resume unexpectedly included model" });
|
||||
continue;
|
||||
|
||||
Reference in New Issue
Block a user