From 21ff548b86846701dadaac890f065b130b4d521c Mon Sep 17 00:00:00 2001 From: AgentRun Codex Date: Fri, 12 Jun 2026 04:26:14 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=9A=B4=E9=9C=B2=20session=20follow-up?= =?UTF-8?q?=20=E5=8F=AF=E8=A7=81=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/src/cli.ts | 89 ++++++++++++++++++--------------- src/mgr/result.ts | 55 ++++++++++++++------- src/mgr/server.ts | 121 +++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 197 insertions(+), 68 deletions(-) diff --git a/scripts/src/cli.ts b/scripts/src/cli.ts index 0ee980d..e9ec195 100644 --- a/scripts/src/cli.ts +++ b/scripts/src/cli.ts @@ -213,7 +213,7 @@ function summarizeCommandShowResult(command: JsonValue, result: JsonValue, runId function summarizeResultEnvelope(record: JsonRecord | null): JsonRecord | null { if (!record) return null; return withoutFullRecordBytes(compactRecord(record, { - keys: ["runId", "commandId", "attemptId", "runnerId", "jobName", "namespace", "status", "runStatus", "commandState", "terminalStatus", "terminalSource", "completed", "failureKind", "failureMessage", "lastSeq", "eventCount", "scopedEventCount", "scopedLastSeq", "runnerJobCount"], + keys: ["ok", "executionOk", "runId", "commandId", "attemptId", "runnerId", "jobName", "namespace", "status", "runStatus", "commandState", "terminalStatus", "terminalSource", "providerTerminalFailure", "recoverableViaSession", "completed", "finalResponseAuthority", "finalResponseFallback", "needsContinuation", "failureKind", "failureMessage", "lastSeq", "eventCount", "scopedEventCount", "scopedLastSeq", "runnerJobCount"], })); } @@ -262,17 +262,16 @@ function summarizeDiagnosisRecord(record: JsonRecord | null): JsonRecord | null function summarizeFinalResponseRecord(record: JsonRecord | null): JsonRecord | null { if (!record) return null; - return withoutFullRecordBytes(compactRecord(record, { keys: ["seq", "source", "final", "replyAuthority", "textTruncated", "outputTruncated", "text"] })); + return withoutFullRecordBytes(compactRecord(record, { keys: ["seq", "source", "final", "replyAuthority", "authority", "fallback", "needsContinuation", "textTruncated", "outputTruncated", "text"] })); } function runCommandDrillDown(runId: string, commandId: string | null, sessionId: string | null, lastSeq: number): JsonRecord { return { - run: `./scripts/agentrun runs show ${runId}`, - runFull: `./scripts/agentrun runs show ${runId} --full`, - result: `./scripts/agentrun runs result ${runId}${commandId ? ` --command-id ${commandId}` : ""}`, - events: `./scripts/agentrun runs events ${runId} --after-seq ${lastSeq} --limit 100 --tail-summary`, - ...(commandId ? { command: `./scripts/agentrun commands show ${commandId} --run-id ${runId}`, commandResult: `./scripts/agentrun commands result ${commandId} --run-id ${runId}` } : {}), - ...(sessionId ? { trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq ${lastSeq} --limit 100 --run-id ${runId}`, output: `./scripts/agentrun sessions output ${sessionId} --after-seq ${lastSeq} --limit 100 --run-id ${runId}` } : {}), + run: `describe run/${runId}`, + result: `result run/${runId}${commandId ? ` --command ${commandId}` : ""}`, + events: `events run/${runId} --after-seq ${lastSeq} --limit 100`, + ...(commandId ? { command: `describe command/${commandId} --run ${runId}`, commandResult: `result command/${commandId} --run ${runId}` } : {}), + ...(sessionId ? { logs: `logs session/${sessionId} --tail 100`, send: `send session/${sessionId} --prompt-stdin` } : {}), valuesPrinted: false, }; } @@ -405,11 +404,10 @@ export function summarizeQueueDispatchResult(result: JsonValue, taskId: string): fullResponseBytes: jsonByteLength(result), valuesPrinted: false, pollCommands: { - queue: `./scripts/agentrun queue show ${stringValue(task?.id) ?? taskId}`, - ...(runId ? { run: `./scripts/agentrun runs show ${runId}` } : {}), - ...(runId && commandId ? { command: `./scripts/agentrun commands show ${commandId} --run-id ${runId}` } : {}), - ...(runId ? { events: `./scripts/agentrun runs events ${runId} --after-seq 0 --limit 100 --tail-summary` } : {}), - ...(sessionId ? { trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq 0 --limit 100`, output: `./scripts/agentrun sessions output ${sessionId} --after-seq 0 --limit 100` } : {}), + queue: `describe task/${stringValue(task?.id) ?? taskId}`, + ...(runId ? { run: `describe run/${runId}`, result: `result run/${runId}${commandId ? ` --command ${commandId}` : ""}`, events: `events run/${runId} --after-seq 0 --limit 100` } : {}), + ...(runId && commandId ? { command: `describe command/${commandId} --run ${runId}` } : {}), + ...(sessionId ? { logs: `logs session/${sessionId} --tail 100`, send: `send session/${sessionId} --prompt-stdin` } : {}), }, expandedOutput: { fullFlag: "--full", @@ -665,11 +663,11 @@ function summarizeSessionMutationResult(action: "session-cancel" | "session-read fullResponseBytes: jsonByteLength(result), valuesPrinted: false, drillDownCommands: { - show: `./scripts/agentrun sessions show ${sessionId} --reader-id cli`, - trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq 0 --limit 100`, - output: `./scripts/agentrun sessions output ${sessionId} --after-seq 0 --limit 100`, - traceFull: `./scripts/agentrun sessions trace ${sessionId} --after-seq 0 --limit 100 --full`, - outputFull: `./scripts/agentrun sessions output ${sessionId} --after-seq 0 --limit 100 --full`, + show: `describe session/${sessionId}`, + logs: `logs session/${sessionId} --tail 100`, + send: `send session/${sessionId} --prompt-stdin`, + read: `ack session/${sessionId}`, + cancel: `cancel session/${sessionId}`, }, expandedOutput: { fullFlag: "--full", @@ -703,11 +701,13 @@ function summarizeSessionSendResult(result: JsonValue, sessionId: string, profil fullResponseBytes: jsonByteLength(result), valuesPrinted: false, drillDownCommands: { - show: `./scripts/agentrun sessions show ${sessionId} --reader-id cli`, - trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq ${afterSeq} --limit 100`, - output: `./scripts/agentrun sessions output ${sessionId} --after-seq ${afterSeq} --limit 100`, - read: `./scripts/agentrun sessions read ${sessionId} --reader-id cli`, - cancel: `./scripts/agentrun sessions cancel ${sessionId}`, + show: `describe session/${sessionId}`, + logs: `logs session/${sessionId} --tail 100`, + ...(stringValue(run?.id) ? { events: `events run/${String(run?.id)} --after-seq ${afterSeq} --limit 100` } : {}), + ...(stringValue(run?.id) && stringValue(command?.id) ? { result: `result run/${String(run?.id)} --command ${String(command?.id)}` } : {}), + send: `send session/${sessionId} --prompt-stdin`, + read: `ack session/${sessionId}`, + cancel: `cancel session/${sessionId}`, }, expandedOutput: { fullFlag: "--full", @@ -796,10 +796,10 @@ export function summarizeQueueTaskShowResult(result: JsonValue, taskId: string): fullResponseBytes: jsonByteLength(result), valuesPrinted: false, pollCommands: { - full: `./scripts/agentrun queue show ${taskId} --full`, - ...(runId ? { run: `./scripts/agentrun runs show ${runId}`, events: `./scripts/agentrun runs events ${runId} --after-seq ${afterSeq} --limit 100 --tail-summary` } : {}), - ...(runId && commandId ? { command: `./scripts/agentrun commands show ${commandId} --run-id ${runId}` } : {}), - ...(sessionId ? { trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq 0 --limit 100`, output: `./scripts/agentrun sessions output ${sessionId} --after-seq 0 --limit 100` } : {}), + full: `describe task/${taskId} --full`, + ...(runId ? { run: `describe run/${runId}`, result: `result run/${runId}${commandId ? ` --command ${commandId}` : ""}`, events: `events run/${runId} --after-seq ${afterSeq} --limit 100` } : {}), + ...(runId && commandId ? { command: `describe command/${commandId} --run ${runId}` } : {}), + ...(sessionId ? { logs: `logs session/${sessionId} --tail 100`, send: `send session/${sessionId} --prompt-stdin` } : {}), }, }; } @@ -814,6 +814,7 @@ export function summarizeQueueCommanderSnapshot(result: JsonValue, options: Queu queue: stringValue(record.queue), readerId: stringValue(record.readerId), stats: summarizeQueueStats(jsonRecordValue(record.stats)), + activeSessionCount: numberValue(record.activeSessionCount), sourceCount: items.length, displayedCount: selected.length, limit: options.limit, @@ -824,13 +825,15 @@ export function summarizeQueueCommanderSnapshot(result: JsonValue, options: Queu fullResponseBytes: jsonByteLength(result), valuesPrinted: false, drillDownCommands: { - full: "./scripts/agentrun queue commander --reader-id cli --full", - raw: "./scripts/agentrun queue commander --reader-id cli --raw", - item: "./scripts/agentrun queue show ", - run: "./scripts/agentrun runs show ", - events: "./scripts/agentrun runs events --after-seq --limit 100 --tail-summary", - command: "./scripts/agentrun commands show --run-id ", - sessionHint: "session trace/output 只在 queue show 的 per-task pollCommands 中按实际 sessionId 输出", + full: "get tasks --queue commander -o wide", + item: "describe task/", + run: "describe run/", + events: "events run/ --after-seq --limit 100", + result: "result run/ --command ", + command: "describe command/ --run ", + logs: "logs session/ --tail 100", + send: "send session/ --prompt-stdin", + sessionHint: "细节按实际 run/session 走 result/events/logs;续跑只用 send session/", }, }; } @@ -948,9 +951,11 @@ function summarizeQueueTaskWithAttempt(record: JsonRecord | null, fallbackTaskId const summary = summarizeQueueTaskRecord(record, fallbackTaskId); const latestAttempt = summarizeAttemptRecord(jsonRecordValue(record?.latestAttempt)); const supervisor = summarizeSupervisorRecord(jsonRecordValue(record?.supervisor)); + const activeSession = summarizeActiveSessionRecord(jsonRecordValue(record?.activeSession)); const sessionRef = jsonRecordValue(record?.sessionRef); if (latestAttempt) summary.latestAttempt = latestAttempt; if (supervisor) summary.supervisor = supervisor; + if (activeSession) summary.activeSession = activeSession; const sessionId = stringValue(sessionRef?.sessionId) ?? stringValue(latestAttempt?.sessionId); if (sessionId) summary.sessionId = sessionId; if (record?.readCursor !== undefined) summary.read = record.readCursor !== null; @@ -994,7 +999,7 @@ function summarizeSupervisorRecord(record: JsonRecord | null): JsonRecord | null const terminalClassification = jsonRecordValue(record.terminalClassification); const diagnosis = jsonRecordValue(record.diagnosis); return { - ...withoutFullRecordBytes(compactRecord(record, { keys: ["phase", "active", "status", "terminalStatus", "failureKind", "runId", "commandId", "lastSeq", "lastEventAt", "lastEventAgeMs", "leaseRemainingMs", "leaseExpired"] })), + ...withoutFullRecordBytes(compactRecord(record, { keys: ["source", "sessionId", "executionState", "attentionState", "active", "activeRunId", "activeCommandId", "attemptRunId", "attemptCommandId", "phase", "status", "terminalStatus", "failureKind", "runId", "commandId", "lastSeq", "lastEventAt", "lastEventAgeMs", "leaseRemainingMs", "leaseExpired"] })), diagnosis: diagnosis ? summarizeDiagnosisRecord(diagnosis) : null, terminalClassification: terminalClassification ? summarizeTerminalClassification(terminalClassification) : null, lastActivity: lastActivity ? withoutFullRecordBytes(compactRecord(lastActivity, { keys: ["sourceSeq", "eventId", "activityKind", "type", "status", "toolName", "itemId", "ageMs", "summary"] })) : null, @@ -1004,8 +1009,13 @@ function summarizeSupervisorRecord(record: JsonRecord | null): JsonRecord | null }; } +function summarizeActiveSessionRecord(record: JsonRecord | null): JsonRecord | null { + if (!record) return null; + return withoutFullRecordBytes(compactRecord(record, { keys: ["sessionId", "sessionPath", "executionState", "attentionState", "active", "activeRunId", "activeCommandId", "lastRunId", "lastCommandId", "terminalStatus", "failureKind", "lastActivityAt", "updatedAt"] })); +} + function summarizeTerminalClassification(record: JsonRecord): JsonRecord { - return withoutFullRecordBytes(compactRecord(record, { keys: ["category", "confidence", "providerEvidence", "providerInterruption", "providerInterruptionKnown", "providerInterruptionReason", "retryInterruptionObserved", "retryInterruptionSeq", "retryInterruptionKind", "hardTimeout", "idleTimeout", "timeoutKind", "timeoutState", "transportDisconnectObserved", "transportDisconnectSeq", "reason"] })); + return withoutFullRecordBytes(compactRecord(record, { keys: ["category", "confidence", "terminalStatus", "failureKind", "providerEvidence", "providerInterruption", "providerInterruptionKnown", "providerInterruptionReason", "retryInterruptionObserved", "retryInterruptionSeq", "retryInterruptionKind", "hardTimeout", "idleTimeout", "timeoutKind", "timeoutState", "transportDisconnectObserved", "transportDisconnectSeq", "reason"] })); } function summarizeRecoveryActions(value: JsonValue | undefined): JsonValue[] { @@ -1215,10 +1225,9 @@ async function sessionCreate(args: ParsedArgs, positionalSessionId: string | nul pvc: created.pvc, storage, pollCommands: { - show: `./scripts/agentrun sessions show ${sessionId} --reader-id cli`, - storage: `./scripts/agentrun sessions storage ${sessionId}`, - trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq 0 --limit 100`, - send: `./scripts/agentrun sessions send ${sessionId} --prompt "..."`, + show: `describe session/${sessionId}`, + logs: `logs session/${sessionId} --tail 100`, + send: `send session/${sessionId} --prompt-stdin`, }, }; } diff --git a/src/mgr/result.ts b/src/mgr/result.ts index b90d2ae..0fcec18 100644 --- a/src/mgr/result.ts +++ b/src/mgr/result.ts @@ -57,14 +57,20 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman const failureDetails = resultFailureDetails(scopedEvents, terminal); const reply = assistantReply(scopedEvents); const responseAuthority = finalResponseAuthority(reply); - const needsContinuation = terminal === "completed" && responseAuthority !== "authoritative"; - const completionEvidence = completionEvidenceSummary({ terminal, terminalSource, reply, responseAuthority, needsContinuation, sessionId: run.sessionRef?.sessionId ?? null }); + const providerTerminalFailure = terminal === "failed" && providerFailureCategory(failureKind) !== null; + const stableResponseAuthority = providerTerminalFailure ? "provider-failure" : responseAuthority; + const finalResponseFallback = !providerTerminalFailure && responseAuthority === "fallback"; + const needsContinuation = (terminal === "completed" && responseAuthority !== "authoritative") || (providerTerminalFailure && Boolean(run.sessionRef?.sessionId)); + const completionEvidence = completionEvidenceSummary({ terminal, terminalSource, reply, responseAuthority, needsContinuation, sessionId: run.sessionRef?.sessionId ?? null, providerTerminalFailure, failureKind }); const blocker = terminal === "blocked" || terminal === "failed" ? { failureKind, message: failureMessage, details: failureDetails } : null; const liveness = livenessSnapshot(run, command, events, scopedEvents, terminal, failureKind, failureMessage, { responseAuthority, needsContinuation }); const terminalClassification = terminalClassificationSummary({ terminal, terminalSource, failureKind, failureMessage, liveness }); const diagnosis = runDiagnosis({ run, command, latestJob, events, terminalClassification, liveness, terminalStatus: terminal, failureKind, failureMessage }); const steerDelivery = command?.type === "steer" ? steerDeliverySummary(events, command.id) : null; + const executionOk = terminal === null ? null : terminal === "completed" && !providerTerminalFailure; return { + ok: executionOk === true, + executionOk, runId: run.id, commandId: command?.id ?? commandId ?? null, attemptId: latestJob?.attemptId ?? attemptFromEvents(events), @@ -76,9 +82,11 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman commandState: command?.state ?? null, terminalStatus: terminal, terminalSource, + providerTerminalFailure, + recoverableViaSession: Boolean(run.sessionRef?.sessionId && (needsContinuation || terminal === "failed" || terminal === "blocked" || terminal === "cancelled")), completed: terminal === "completed", - finalResponseAuthority: responseAuthority, - finalResponseFallback: responseAuthority === "fallback", + finalResponseAuthority: stableResponseAuthority, + finalResponseFallback, needsContinuation, completionEvidence, reply: reply.text, @@ -88,8 +96,8 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman source: reply.source, final: reply.final, replyAuthority: reply.replyAuthority, - authority: responseAuthority, - fallback: responseAuthority === "fallback", + authority: stableResponseAuthority, + fallback: finalResponseFallback, needsContinuation, textTruncated: reply.textTruncated, outputTruncated: reply.outputTruncated, @@ -447,21 +455,22 @@ function recoveryActions(input: { run: RunRecord; command: CommandRecord | null; ]; if (sessionId) actions.push(recoveryDescriptor({ action: "poll-output", operation: "logs", resourceKind: "session", resourceName: sessionId, runId: run.id, commandId: command?.id ?? null, sessionId, afterSeq, limit: 100 })); if (active) { - if (sessionId) actions.push(recoveryDescriptor({ action: "send-session", operation: "send", resourceKind: "session", resourceName: sessionId, runId: run.id, commandId: command?.id ?? null, sessionId, inputKind: "prompt", reasonHint: "manager 会按当前 session 状态自动决定内部 steer 或新 turn" })); + if (sessionId) actions.push(recoveryDescriptor({ action: "send-session", operation: "send", resourceKind: "session", resourceName: sessionId, runId: run.id, commandId: command?.id ?? null, sessionId, inputKind: "prompt", reasonHint: "manager 会按当前 session 状态自动路由后续 prompt;用户入口保持 send session/" })); if (command) actions.push(recoveryDescriptor({ action: "cancel-command", operation: "cancel", resourceKind: "command", resourceName: command.id, runId: run.id, commandId: command.id, sessionId, reasonRequired: true, reasonHint: "operator supplied cancel reason" })); else actions.push(recoveryDescriptor({ action: "cancel-run", operation: "cancel", resourceKind: "run", resourceName: run.id, runId: run.id, sessionId, reasonRequired: true, reasonHint: "operator supplied cancel reason" })); return actions; } if (needsContinuation && sessionId) { if (command) actions.push(recoveryDescriptor({ action: "inspect-result", operation: "result", resourceKind: "command", resourceName: command.id, runId: run.id, commandId: command.id, sessionId })); - actions.push(recoveryDescriptor({ action: "continue-session", operation: "send", resourceKind: "session", resourceName: sessionId, runId: run.id, commandId: command?.id ?? null, sessionId, reason: `final-response-${finalResponseAuthority}`, inputKind: "prompt", reasonHint: "命令已 terminal completed,但没有 authoritative final response;管理者应先读 trace/output,再用同一 session 发送后续 prompt。" })); + const providerTerminalFailure = terminal === "failed" && providerFailureCategory(failureKind) !== null; + actions.push(recoveryDescriptor({ action: "continue-session", operation: "send", resourceKind: "session", resourceName: sessionId, runId: run.id, commandId: command?.id ?? null, sessionId, reason: providerTerminalFailure ? `provider-terminal-${failureKind ?? "failure"}` : `final-response-${finalResponseAuthority}`, inputKind: "prompt", reasonHint: providerTerminalFailure ? "终态 provider failure;管理者应先读 result/events/logs,确认 provider 错误与最新上下文,再用同一 session 发送后续 prompt。" : "命令已 terminal completed,但没有 authoritative final response;管理者应先读 result/events/logs,再用同一 session 发送后续 prompt。" })); return actions; } if (terminal === "failed" || terminal === "blocked" || terminal === "cancelled") { if (command) actions.push(recoveryDescriptor({ action: "inspect-result", operation: "result", resourceKind: "command", resourceName: command.id, runId: run.id, commandId: command.id, sessionId })); if (sessionId) actions.push(recoveryDescriptor({ action: "continue-session", operation: "send", resourceKind: "session", resourceName: sessionId, runId: run.id, commandId: command?.id ?? null, sessionId, inputKind: "prompt" })); - if (failureKind === "backend-timeout") actions.push(recoveryDescriptor({ action: "split-task", operation: "operator-decision", resourceKind: sessionId ? "session" : "run", resourceName: sessionId ?? run.id, runId: run.id, commandId: command?.id ?? null, sessionId, reason: "backend-timeout", reasonHint: "先由管理者读取 trace/result,总结下一步,再把后续 prompt 发到同一 session;必要时把大 patch / 长工具链拆成更短 turn。", failureMessage: failureMessage ? boundedTextSummary(failureMessage, { limitChars: 200 }).text as string : null })); - else actions.push(recoveryDescriptor({ action: "retry-or-split", operation: "operator-decision", resourceKind: sessionId ? "session" : "run", resourceName: sessionId ?? run.id, runId: run.id, commandId: command?.id ?? null, sessionId, reason: failureKind ?? "terminal", reasonHint: "先读 trace/output 的 detail id,再决定继续同 session、重跑或拆分" })); + if (failureKind === "backend-timeout") actions.push(recoveryDescriptor({ action: "split-task", operation: "operator-decision", resourceKind: sessionId ? "session" : "run", resourceName: sessionId ?? run.id, runId: run.id, commandId: command?.id ?? null, sessionId, reason: "backend-timeout", reasonHint: "先由管理者读取 result/events/logs,总结下一步,再把后续 prompt 发到同一 session;必要时把大 patch / 长工具链拆成更短步骤。", failureMessage: failureMessage ? boundedTextSummary(failureMessage, { limitChars: 200 }).text as string : null })); + else actions.push(recoveryDescriptor({ action: "retry-or-split", operation: "operator-decision", resourceKind: sessionId ? "session" : "run", resourceName: sessionId ?? run.id, runId: run.id, commandId: command?.id ?? null, sessionId, reason: failureKind ?? "terminal", reasonHint: "先读 result/events/logs,再决定继续同 session、重跑或拆分" })); } return actions; } @@ -491,25 +500,29 @@ function finalResponseAuthority(reply: AssistantReplySummary): "authoritative" | return reply.text.length > 0 ? "fallback" : "missing"; } -function completionEvidenceSummary(input: { terminal: TerminalStatus | null; terminalSource: string; reply: AssistantReplySummary; responseAuthority: string; needsContinuation: boolean; sessionId: string | null }): JsonRecord { - const recommendedAction = input.needsContinuation && input.sessionId ? recoveryDescriptor({ action: "continue-session", operation: "send", resourceKind: "session", resourceName: input.sessionId, sessionId: input.sessionId, inputKind: "prompt", reason: `final-response-${input.responseAuthority}` }) : null; +function completionEvidenceSummary(input: { terminal: TerminalStatus | null; terminalSource: string; reply: AssistantReplySummary; responseAuthority: string; needsContinuation: boolean; sessionId: string | null; providerTerminalFailure: boolean; failureKind: FailureKind | null }): JsonRecord { + const recommendedAction = input.needsContinuation && input.sessionId ? recoveryDescriptor({ action: "continue-session", operation: "send", resourceKind: "session", resourceName: input.sessionId, sessionId: input.sessionId, inputKind: "prompt", reason: input.providerTerminalFailure ? `provider-terminal-${input.failureKind ?? "failure"}` : `final-response-${input.responseAuthority}` }) : null; + const stableResponseAuthority = input.providerTerminalFailure ? "provider-failure" : input.responseAuthority; return { terminalStatus: input.terminal, terminalSource: input.terminalSource, - finalResponseAuthority: input.responseAuthority, + providerTerminalFailure: input.providerTerminalFailure, + failureKind: input.failureKind, + finalResponseAuthority: stableResponseAuthority, finalResponseSeq: input.reply.seq, finalResponseSource: input.reply.source, finalResponseFinal: input.reply.final, finalResponseReplyAuthority: input.reply.replyAuthority, - finalResponseFallback: input.responseAuthority === "fallback", + finalResponseFallback: !input.providerTerminalFailure && input.responseAuthority === "fallback", needsContinuation: input.needsContinuation, - reason: completionEvidenceReason(input.responseAuthority, input.terminal), + reason: completionEvidenceReason(input.responseAuthority, input.terminal, input.providerTerminalFailure), recommendedAction, valuesPrinted: false, }; } -function completionEvidenceReason(responseAuthority: string, terminal: TerminalStatus | null): string { +function completionEvidenceReason(responseAuthority: string, terminal: TerminalStatus | null, providerTerminalFailure: boolean): string { + if (providerTerminalFailure) return "terminal provider failure; any assistant text is non-authoritative recovery context until the operator reads result/events/logs and sends the next prompt to the same session"; if (terminal !== "completed") return "command is not completed"; if (responseAuthority === "authoritative") return "terminal completed with authoritative assistant final response"; if (responseAuthority === "fallback") return "terminal completed but only a non-authoritative assistant progress/output fallback was available"; @@ -626,11 +639,13 @@ function terminalFromCommand(command: CommandRecord): TerminalStatus | null { function resultTerminal(commandTerminal: TerminalStatus | null, terminalEventStatus: TerminalStatus | null, runTerminalStatus: TerminalStatus | null, failureKind: FailureKind | null): TerminalStatus | null { if (commandTerminal === "failed" && terminalEventStatus === "blocked" && failureKind === "required-skill-unavailable") return "blocked"; + if (providerFailureCategory(failureKind) && (commandTerminal === "completed" || terminalEventStatus === "completed" || runTerminalStatus === "completed")) return "failed"; return commandTerminal ?? terminalEventStatus ?? runTerminalStatus; } function resultTerminalSource(commandTerminal: TerminalStatus | null, terminalEventStatus: TerminalStatus | null, runTerminalStatus: TerminalStatus | null, failureKind: FailureKind | null): string { if (commandTerminal === "failed" && terminalEventStatus === "blocked" && failureKind === "required-skill-unavailable") return "terminal_status-event"; + if (providerFailureCategory(failureKind) && (commandTerminal === "completed" || terminalEventStatus === "completed" || runTerminalStatus === "completed")) return "provider-failure-evidence"; if (commandTerminal) return "command-record"; if (terminalEventStatus) return "terminal_status-event"; if (runTerminalStatus) return "run-record"; @@ -652,8 +667,12 @@ function failureKindFromEvents(events: RunEvent[], options: { includeRetry: bool } function resultFailureKind(run: RunRecord, command: CommandRecord | null, events: RunEvent[], jobs: RunnerJobRecord[], terminal: TerminalStatus | null): FailureKind | null { - if (terminal === "completed") return null; - return failureKindValue(run.failureKind) ?? failureKindFromEvents(events, { includeRetry: terminal !== null }) ?? failureKindFromRunnerJobs(jobs); + const runFailureKind = failureKindValue(run.failureKind); + if (terminal === "completed") { + const nonRetryFailureKind = runFailureKind ?? failureKindFromEvents(events, { includeRetry: false }) ?? failureKindFromRunnerJobs(jobs); + return providerFailureCategory(nonRetryFailureKind) ? nonRetryFailureKind : null; + } + return runFailureKind ?? failureKindFromEvents(events, { includeRetry: terminal !== null }) ?? failureKindFromRunnerJobs(jobs); } function failureKindFromRunnerJobs(jobs: RunnerJobRecord[]): FailureKind | null { diff --git a/src/mgr/server.ts b/src/mgr/server.ts index 2bd30fc..54c4bc8 100644 --- a/src/mgr/server.ts +++ b/src/mgr/server.ts @@ -115,9 +115,10 @@ async function readBody(req: import("node:http").IncomingMessage): Promise { +async function refreshQueueTaskForRead(store: AgentRunStore, taskId: string): Promise { const task = await store.getQueueTask(taskId); - return await refreshQueueTaskRecordForRead(store, task); + const refreshed = await refreshQueueTaskRecordForRead(store, task); + return await enrichQueueTaskForRead(store, refreshed as unknown as JsonRecord); } async function refreshQueuePageForRead(store: AgentRunStore, input: ListQueueTasksInput): Promise { @@ -148,27 +149,69 @@ async function queueCommanderForRead(store: AgentRunStore, queue: string | undef const items = Array.isArray(snapshot.items) ? snapshot.items : []; const enrichedItems = await Promise.all(items.map(async (item) => { const task = asJsonRecord(item); - if (!task) return item as JsonValue; - const supervisor = await queueTaskSupervisor(store, task); - return supervisor ? { ...task, supervisor, valuesPrinted: false } : task; + return task ? await enrichQueueTaskForRead(store, task) as JsonValue : item as JsonValue; })); - return { ...snapshot, items: enrichedItems, valuesPrinted: false } as JsonValue; + const byId = new Map(); + for (const item of enrichedItems) { + const task = asJsonRecord(item); + const id = stringJsonValue(task?.id); + if (id) byId.set(id, item); + } + if (readerId) { + const unfiltered = await store.queueCommander(queue, null) as unknown as JsonRecord; + const unfilteredItems = Array.isArray(unfiltered.items) ? unfiltered.items : []; + for (const item of unfilteredItems) { + const task = asJsonRecord(item); + const id = stringJsonValue(task?.id); + if (!task || !id || byId.has(id)) continue; + const enriched = await enrichQueueTaskForRead(store, task); + if (queueTaskHasActiveSession(enriched)) byId.set(id, enriched as unknown as JsonValue); + } + } + const visibleItems = Array.from(byId.values()); + const activeSessionCount = visibleItems.filter((item) => queueTaskHasActiveSession(asJsonRecord(item))).length; + return { ...snapshot, items: visibleItems, activeSessionCount, valuesPrinted: false } as JsonValue; } -async function queueTaskSupervisor(store: AgentRunStore, task: JsonRecord): Promise { +async function enrichQueueTaskForRead(store: AgentRunStore, task: JsonRecord): Promise { + const activeSession = await queueTaskActiveSession(store, task); + const supervisor = await queueTaskSupervisor(store, task, activeSession); + const activeBySession = activeSession?.active === true || stringJsonValue(activeSession?.activeRunId) !== null || stringJsonValue(activeSession?.activeCommandId) !== null; + return { + ...task, + ...(activeSession ? { activeSession } : {}), + ...(supervisor ? { supervisor } : {}), + ...(activeBySession ? { active: true, attentionState: "active-session" } : {}), + valuesPrinted: false, + }; +} + +async function queueTaskSupervisor(store: AgentRunStore, task: JsonRecord, activeSession: JsonRecord | null = null): Promise { const attempt = asJsonRecord(task.latestAttempt); - const runId = stringJsonValue(attempt?.runId); + const attemptRunId = stringJsonValue(attempt?.runId); + const attemptCommandId = stringJsonValue(attempt?.commandId); + const activeRunId = stringJsonValue(activeSession?.activeRunId); + const activeCommandId = stringJsonValue(activeSession?.activeCommandId); + const runId = activeRunId ?? attemptRunId; if (!runId) return null; + const commandId = activeCommandId ?? attemptCommandId ?? undefined; try { - const result = await buildRunResult(store, runId, stringJsonValue(attempt?.commandId) ?? undefined); + const result = await buildRunResult(store, runId, commandId); const liveness = asJsonRecord(result.liveness); const lastActivity = asJsonRecord(liveness?.lastActivity ?? liveness?.lastCommandActivity); const timeoutBudget = asJsonRecord(liveness?.timeoutBudget); const terminalClassification = asJsonRecord(result.terminalClassification ?? liveness?.terminalClassification); const lease = asJsonRecord(liveness?.lease); return { + source: activeRunId ? "active-session" : "latest-attempt", + attemptRunId, + attemptCommandId, + activeSession, runId: stringJsonValue(result.runId), commandId: stringJsonValue(result.commandId), + executionState: stringJsonValue(activeSession?.executionState), + activeRunId, + activeCommandId, status: stringJsonValue(result.status), terminalStatus: stringJsonValue(result.terminalStatus), failureKind: stringJsonValue(result.failureKind), @@ -192,6 +235,46 @@ async function queueTaskSupervisor(store: AgentRunStore, task: JsonRecord): Prom } } +async function queueTaskActiveSession(store: AgentRunStore, task: JsonRecord): Promise { + const sessionId = queueTaskSessionId(task); + if (!sessionId) return null; + try { + const session = await store.getSessionSummary(sessionId, null) as unknown as JsonRecord; + const activeRunId = stringJsonValue(session.activeRunId); + const activeCommandId = stringJsonValue(session.activeCommandId); + const active = session.active === true || activeRunId !== null || activeCommandId !== null || stringJsonValue(session.executionState) === "running"; + if (!active) return null; + return { + sessionId, + sessionPath: stringJsonValue(session.sessionPath) ?? `/api/v1/sessions/${sessionId}`, + executionState: stringJsonValue(session.executionState), + attentionState: stringJsonValue(session.attentionState), + active, + activeRunId, + activeCommandId, + lastRunId: stringJsonValue(session.lastRunId), + lastCommandId: stringJsonValue(session.lastCommandId), + terminalStatus: stringJsonValue(session.terminalStatus), + failureKind: stringJsonValue(session.failureKind), + lastActivityAt: stringJsonValue(session.lastActivityAt), + updatedAt: stringJsonValue(session.updatedAt), + valuesPrinted: false, + }; + } catch { + return null; + } +} + +function queueTaskSessionId(task: JsonRecord): string | null { + return stringJsonValue(asJsonRecord(task.sessionRef)?.sessionId) ?? stringJsonValue(asJsonRecord(task.latestAttempt)?.sessionId) ?? null; +} + +function queueTaskHasActiveSession(task: JsonRecord | null): boolean { + const activeSession = asJsonRecord(task?.activeSession); + const supervisor = asJsonRecord(task?.supervisor); + return activeSession?.active === true || supervisor?.source === "active-session" || supervisor?.active === true; +} + function compactActivity(activity: JsonRecord): JsonRecord { return { sourceSeq: numberJsonValue(activity.sourceSeq ?? activity.seq), @@ -337,17 +420,35 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn const commandId = summary.activeCommandId ?? summary.lastCommandId ?? undefined; try { const result = await buildRunResult(store, runId, commandId); + const liveness = asJsonRecord(result.liveness); + const lastActivity = asJsonRecord(liveness?.lastActivity ?? liveness?.lastCommandActivity); + const timeoutBudget = asJsonRecord(liveness?.timeoutBudget); + const terminalClassification = asJsonRecord(result.terminalClassification ?? liveness?.terminalClassification); return { ...summary, liveness: result.liveness ?? null, supervisor: { + sessionId: summary.sessionId, + executionState: summary.executionState, + attentionState: summary.attentionState, + active: summary.active, + activeRunId: summary.activeRunId, + activeCommandId: summary.activeCommandId, + lastRunId: summary.lastRunId, + lastCommandId: summary.lastCommandId, runId: result.runId, commandId: result.commandId, status: result.status, terminalStatus: result.terminalStatus, + failureKind: result.failureKind, + terminalClassification: terminalClassification ? compactTerminalClassification(terminalClassification) : null, + phase: stringJsonValue(liveness?.phase), + lastEventAgeMs: numberJsonValue(liveness?.lastEventAgeMs), + lastActivity: lastActivity ? compactActivity(lastActivity) : null, + timeoutBudget: timeoutBudget ? compactTimeoutBudget(timeoutBudget) : null, lastSeq: result.lastSeq, liveness: result.liveness ?? null, - recoveryActions: typeof result.liveness === "object" && result.liveness !== null && !Array.isArray(result.liveness) ? (result.liveness as JsonRecord).recoveryActions ?? [] : [], + recoveryActions: compactRecoveryActions(liveness?.recoveryActions), ...(result.steerDelivery ? { steerDelivery: result.steerDelivery } : {}), valuesPrinted: false, },