Merge pull request #128 from pikasTech/fix/issue123-result-envelope

修正 provider retry 与完成结果归因
This commit is contained in:
Lyon
2026-06-10 00:45:43 +08:00
committed by GitHub
4 changed files with 57 additions and 3 deletions
+2
View File
@@ -1252,6 +1252,8 @@ function isProviderHttpErrorMessage(text: string): boolean {
function isProviderUnavailableMessage(text: string): boolean { function isProviderUnavailableMessage(text: string): boolean {
if (/\b(?:http(?:\s+status)?|status(?:\s+code)?|code)\s*[:=]?\s*5\d\d\b/u.test(text)) return true; if (/\b(?:http(?:\s+status)?|status(?:\s+code)?|code)\s*[:=]?\s*5\d\d\b/u.test(text)) return true;
if (/\b5\d\d\b/u.test(text) && /service unavailable|bad gateway|gateway timeout|internal server error|provider|upstream|response\s*stream\s*disconnected|responsestreamdisconnected/u.test(text)) return true; if (/\b5\d\d\b/u.test(text) && /service unavailable|bad gateway|gateway timeout|internal server error|provider|upstream|response\s*stream\s*disconnected|responsestreamdisconnected/u.test(text)) return true;
if (/response\s*stream\s*disconnected|responsestreamdisconnected|stream disconnected before completion/u.test(text)) return true;
if (/connection refused|econnrefused|connection reset|websocket.*(?:refused|unavailable|closed)/u.test(text)) return true;
if (/service unavailable|temporar(?:y|ily) unavailable|provider (?:is )?unavailable|provider availability|upstream (?:is )?unavailable/u.test(text)) return true; if (/service unavailable|temporar(?:y|ily) unavailable|provider (?:is )?unavailable|provider availability|upstream (?:is )?unavailable/u.test(text)) return true;
return false; return false;
} }
+14 -2
View File
@@ -37,8 +37,8 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman
const terminalEventStatus = terminalFromEvents(scopedEvents); const terminalEventStatus = terminalFromEvents(scopedEvents);
const terminal = commandTerminal ?? terminalEventStatus ?? run.terminalStatus; const terminal = commandTerminal ?? terminalEventStatus ?? run.terminalStatus;
const terminalSource = commandTerminal ? "command-record" : terminalEventStatus ? "terminal_status-event" : run.terminalStatus ? "run-record" : "none"; const terminalSource = commandTerminal ? "command-record" : terminalEventStatus ? "terminal_status-event" : run.terminalStatus ? "run-record" : "none";
const failureKind = command ? failureKindFromEvents(scopedEvents) : run.failureKind ?? failureKindFromEvents(scopedEvents); const failureKind = resultFailureKind(run, command, scopedEvents, terminal);
const failureMessage = command ? messageFromEvents(scopedEvents) : run.failureMessage ?? messageFromEvents(scopedEvents); const failureMessage = resultFailureMessage(run, command, scopedEvents, terminal);
const reply = assistantReply(scopedEvents); const reply = assistantReply(scopedEvents);
const blocker = terminal === "blocked" || terminal === "failed" ? { failureKind, message: failureMessage } : null; const blocker = terminal === "blocked" || terminal === "failed" ? { failureKind, message: failureMessage } : null;
const liveness = livenessSnapshot(run, command, events, scopedEvents, terminal); const liveness = livenessSnapshot(run, command, events, scopedEvents, terminal);
@@ -305,6 +305,12 @@ function failureKindFromEvents(events: RunEvent[]): string | null {
return null; return null;
} }
function resultFailureKind(run: RunRecord, command: CommandRecord | null, events: RunEvent[], terminal: TerminalStatus | null): string | null {
if (terminal === "completed") return null;
if (command) return failureKindFromEvents(events);
return run.failureKind ?? failureKindFromEvents(events);
}
function messageFromEvents(events: RunEvent[]): string | null { function messageFromEvents(events: RunEvent[]): string | null {
for (const event of [...events].reverse()) { for (const event of [...events].reverse()) {
const value = event.payload.message; const value = event.payload.message;
@@ -313,6 +319,12 @@ function messageFromEvents(events: RunEvent[]): string | null {
return null; return null;
} }
function resultFailureMessage(run: RunRecord, command: CommandRecord | null, events: RunEvent[], terminal: TerminalStatus | null): string | null {
if (terminal === "completed") return null;
if (command) return messageFromEvents(events);
return run.failureMessage ?? messageFromEvents(events);
}
function assistantReply(events: RunEvent[]): AssistantReplySummary { function assistantReply(events: RunEvent[]): AssistantReplySummary {
const assistantEvents = events.filter((event) => event.type === "assistant_message"); const assistantEvents = events.filter((event) => event.type === "assistant_message");
const latestAuthoritative = [...assistantEvents].reverse().find((event) => (event.payload.replyAuthority === true || event.payload.final === true) && textPayload(event.payload).length > 0); const latestAuthoritative = [...assistantEvents].reverse().find((event) => (event.payload.replyAuthority === true || event.payload.final === true) && textPayload(event.payload).length > 0);
+24 -1
View File
@@ -233,6 +233,7 @@ const selfTest: SelfTestCase = async (context) => {
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-503-terminal", expectedStatus: "failed", expectedFailureKind: "provider-http-error" }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-503-terminal", expectedStatus: "failed", expectedFailureKind: "provider-http-error" });
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-unavailable-terminal", expectedStatus: "failed", expectedFailureKind: "provider-unavailable" }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-unavailable-terminal", expectedStatus: "failed", expectedFailureKind: "provider-unavailable" });
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-503-retry-event", expectedStatus: "failed", expectedFailureKind: "provider-stream-disconnected", expectRetryError: true }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "provider-503-retry-event", expectedStatus: "failed", expectedFailureKind: "provider-stream-disconnected", expectRetryError: true });
await runRetryThenCompletedCase({ client, managerUrl: server.baseUrl, context });
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "invalid-json", expectedStatus: "failed", expectedFailureKind: "backend-json-parse-error" }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "invalid-json", expectedStatus: "failed", expectedFailureKind: "backend-json-parse-error" });
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "missing-terminal", expectedStatus: "failed", expectedFailureKind: "backend-timeout", timeoutMs: 500 }); await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "missing-terminal", expectedStatus: "failed", expectedFailureKind: "backend-timeout", timeoutMs: 500 });
await runSlowProgressIdleCase({ client, managerUrl: server.baseUrl, context }); await runSlowProgressIdleCase({ client, managerUrl: server.baseUrl, context });
@@ -244,12 +245,34 @@ const selfTest: SelfTestCase = async (context) => {
await runSessionStorageSubdirCase({ client, managerUrl: server.baseUrl, context }); await runSessionStorageSubdirCase({ client, managerUrl: server.baseUrl, context });
await runSessionStorageNoSecretLeakCase({ client, managerUrl: server.baseUrl, context }); await runSessionStorageNoSecretLeakCase({ client, managerUrl: server.baseUrl, context });
return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "runner-lease-conflict-recovery", "codex-stdio-fake-turn", "codex-stdio-k8s-sandbox-override", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-dsflash-go-profile-fake-turn", "codex-stdio-dsflash-go-config-metadata", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-web-search-progress", "codex-stdio-stale-thread-resume-failed", "codex-stdio-live-tool-events", "codex-stdio-interrupt-before-turn-start-response", "codex-stdio-hard-timeout-during-tool-progress", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-invalid-tool-call", "codex-stdio-provider-compact-unsupported", "codex-stdio-provider-stream-disconnected", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-unavailable", "codex-stdio-provider-503-retry-event", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-idle-timeout-progress-refresh", "codex-stdio-command-failure-keeps-run-open", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] }; return { name: "codex-stdio", tests: ["runner-lease-heartbeat", "runner-lease-conflict-recovery", "codex-stdio-fake-turn", "codex-stdio-k8s-sandbox-override", "codex-stdio-projected-writable-home", "codex-stdio-deepseek-profile-fake-turn", "codex-stdio-dsflash-go-profile-fake-turn", "codex-stdio-dsflash-go-config-metadata", "codex-stdio-minimax-m3-profile-fake-turn", "codex-stdio-deepseek-missing-secret-no-fallback", "codex-stdio-minimax-m3-missing-secret-no-fallback", "codex-stdio-config-model-authoritative", "codex-stdio-explicit-model-forwarded", "codex-stdio-final-agent-message-only", "codex-stdio-web-search-progress", "codex-stdio-stale-thread-resume-failed", "codex-stdio-live-tool-events", "codex-stdio-interrupt-before-turn-start-response", "codex-stdio-hard-timeout-during-tool-progress", "codex-stdio-noisy-reasoning-suppression", "codex-stdio-missing-turn-result", "codex-stdio-provider-auth-failed", "codex-stdio-provider-rate-limited", "codex-stdio-provider-invalid-tool-call", "codex-stdio-provider-compact-unsupported", "codex-stdio-provider-stream-disconnected", "codex-stdio-provider-503-rpc-error", "codex-stdio-provider-503-terminal", "codex-stdio-provider-unavailable", "codex-stdio-provider-503-retry-event", "codex-stdio-provider-refused-retry-recovered", "codex-stdio-invalid-json", "codex-stdio-timeout", "codex-stdio-idle-timeout-progress-refresh", "codex-stdio-command-failure-keeps-run-open", "codex-stdio-secret-unavailable", "codex-stdio-spawn-failure"] };
} finally { } finally {
await new Promise<void>((resolve) => server.server.close(() => resolve())); await new Promise<void>((resolve) => server.server.close(() => resolve()));
} }
}; };
async function runRetryThenCompletedCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
const item = await createRunWithCommand(options.client, options.context, "retry then complete", "selftest-provider-refused-retry-recovered", 15_000);
const result = await runOnce({
managerUrl: options.managerUrl,
runId: item.runId,
codexCommand: options.context.fakeCodexCommand,
codexArgs: options.context.fakeCodexArgs,
codexHome: options.context.codexHome,
env: { CODEX_HOME: options.context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "provider-refused-retry-then-completed" },
oneShot: true,
}) as JsonRecord;
assert.equal(result.terminalStatus, "completed");
assert.equal(result.failureKind, null);
const events = await options.client.get(`/api/v1/runs/${item.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
assert.equal(events.items?.some((event) => event.type === "error" && eventPayload(event).willRetry === true && eventPayload(event).failureKind === "provider-stream-disconnected"), true);
const envelope = await options.client.get(`/api/v1/runs/${item.runId}/commands/${item.commandId}/result`) as JsonRecord;
assert.equal(envelope.terminalStatus, "completed");
assert.equal(envelope.failureKind, null);
assert.equal(envelope.failureMessage, null);
assert.equal(envelope.completed, true);
}
async function runLeaseConflictRecoveryCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> { async function runLeaseConflictRecoveryCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
const item = await createRunWithCommand(options.client, options.context, "claim after stale lease", "selftest-runner-lease-conflict-recovery", 15_000); const item = await createRunWithCommand(options.client, options.context, "claim after stale lease", "selftest-runner-lease-conflict-recovery", 15_000);
const staleRunner = await options.client.post("/api/v1/runners/register", { const staleRunner = await options.client.post("/api/v1/runners/register", {
+17
View File
@@ -194,6 +194,23 @@ for await (const line of rl) {
respond(message.id, { turn }); respond(message.id, { turn });
continue; continue;
} }
if (mode === "provider-refused-retry-then-completed") {
turnCounter += 1;
const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" };
notify("turn/started", { turn: { id: turn.id, status: "running" } });
notify("error", {
willRetry: true,
error: {
message: "Reconnecting... 5/5",
codexErrorInfo: { responseStreamDisconnected: { httpStatusCode: null } },
additionalDetails: "stream disconnected before completion: Connection refused (os error 111)",
},
});
notify("item/completed", { item: { id: "msg_refused_retry", type: "agentMessage", text: "retry recovered final" } });
notify("turn/completed", { turn });
respond(message.id, { turn });
continue;
}
if (mode === "multi-agent-message-final") { if (mode === "multi-agent-message-final") {
turnCounter += 1; turnCounter += 1;
const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" }; const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" };