Merge pull request #176 from pikasTech/fix/issue-175-session-followup-visibility

修复 session follow-up 可见性与 provider-http-error 摘要
This commit is contained in:
Lyon
2026-06-12 04:29:02 +08:00
committed by GitHub
3 changed files with 197 additions and 68 deletions
+49 -40
View File
@@ -213,7 +213,7 @@ function summarizeCommandShowResult(command: JsonValue, result: JsonValue, runId
function summarizeResultEnvelope(record: JsonRecord | null): JsonRecord | null {
if (!record) return null;
return withoutFullRecordBytes(compactRecord(record, {
keys: ["runId", "commandId", "attemptId", "runnerId", "jobName", "namespace", "status", "runStatus", "commandState", "terminalStatus", "terminalSource", "completed", "failureKind", "failureMessage", "lastSeq", "eventCount", "scopedEventCount", "scopedLastSeq", "runnerJobCount"],
keys: ["ok", "executionOk", "runId", "commandId", "attemptId", "runnerId", "jobName", "namespace", "status", "runStatus", "commandState", "terminalStatus", "terminalSource", "providerTerminalFailure", "recoverableViaSession", "completed", "finalResponseAuthority", "finalResponseFallback", "needsContinuation", "failureKind", "failureMessage", "lastSeq", "eventCount", "scopedEventCount", "scopedLastSeq", "runnerJobCount"],
}));
}
@@ -262,17 +262,16 @@ function summarizeDiagnosisRecord(record: JsonRecord | null): JsonRecord | null
function summarizeFinalResponseRecord(record: JsonRecord | null): JsonRecord | null {
if (!record) return null;
return withoutFullRecordBytes(compactRecord(record, { keys: ["seq", "source", "final", "replyAuthority", "textTruncated", "outputTruncated", "text"] }));
return withoutFullRecordBytes(compactRecord(record, { keys: ["seq", "source", "final", "replyAuthority", "authority", "fallback", "needsContinuation", "textTruncated", "outputTruncated", "text"] }));
}
function runCommandDrillDown(runId: string, commandId: string | null, sessionId: string | null, lastSeq: number): JsonRecord {
return {
run: `./scripts/agentrun runs show ${runId}`,
runFull: `./scripts/agentrun runs show ${runId} --full`,
result: `./scripts/agentrun runs result ${runId}${commandId ? ` --command-id ${commandId}` : ""}`,
events: `./scripts/agentrun runs events ${runId} --after-seq ${lastSeq} --limit 100 --tail-summary`,
...(commandId ? { command: `./scripts/agentrun commands show ${commandId} --run-id ${runId}`, commandResult: `./scripts/agentrun commands result ${commandId} --run-id ${runId}` } : {}),
...(sessionId ? { trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq ${lastSeq} --limit 100 --run-id ${runId}`, output: `./scripts/agentrun sessions output ${sessionId} --after-seq ${lastSeq} --limit 100 --run-id ${runId}` } : {}),
run: `describe run/${runId}`,
result: `result run/${runId}${commandId ? ` --command ${commandId}` : ""}`,
events: `events run/${runId} --after-seq ${lastSeq} --limit 100`,
...(commandId ? { command: `describe command/${commandId} --run ${runId}`, commandResult: `result command/${commandId} --run ${runId}` } : {}),
...(sessionId ? { logs: `logs session/${sessionId} --tail 100`, send: `send session/${sessionId} --prompt-stdin` } : {}),
valuesPrinted: false,
};
}
@@ -405,11 +404,10 @@ export function summarizeQueueDispatchResult(result: JsonValue, taskId: string):
fullResponseBytes: jsonByteLength(result),
valuesPrinted: false,
pollCommands: {
queue: `./scripts/agentrun queue show ${stringValue(task?.id) ?? taskId}`,
...(runId ? { run: `./scripts/agentrun runs show ${runId}` } : {}),
...(runId && commandId ? { command: `./scripts/agentrun commands show ${commandId} --run-id ${runId}` } : {}),
...(runId ? { events: `./scripts/agentrun runs events ${runId} --after-seq 0 --limit 100 --tail-summary` } : {}),
...(sessionId ? { trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq 0 --limit 100`, output: `./scripts/agentrun sessions output ${sessionId} --after-seq 0 --limit 100` } : {}),
queue: `describe task/${stringValue(task?.id) ?? taskId}`,
...(runId ? { run: `describe run/${runId}`, result: `result run/${runId}${commandId ? ` --command ${commandId}` : ""}`, events: `events run/${runId} --after-seq 0 --limit 100` } : {}),
...(runId && commandId ? { command: `describe command/${commandId} --run ${runId}` } : {}),
...(sessionId ? { logs: `logs session/${sessionId} --tail 100`, send: `send session/${sessionId} --prompt-stdin` } : {}),
},
expandedOutput: {
fullFlag: "--full",
@@ -665,11 +663,11 @@ function summarizeSessionMutationResult(action: "session-cancel" | "session-read
fullResponseBytes: jsonByteLength(result),
valuesPrinted: false,
drillDownCommands: {
show: `./scripts/agentrun sessions show ${sessionId} --reader-id cli`,
trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq 0 --limit 100`,
output: `./scripts/agentrun sessions output ${sessionId} --after-seq 0 --limit 100`,
traceFull: `./scripts/agentrun sessions trace ${sessionId} --after-seq 0 --limit 100 --full`,
outputFull: `./scripts/agentrun sessions output ${sessionId} --after-seq 0 --limit 100 --full`,
show: `describe session/${sessionId}`,
logs: `logs session/${sessionId} --tail 100`,
send: `send session/${sessionId} --prompt-stdin`,
read: `ack session/${sessionId}`,
cancel: `cancel session/${sessionId}`,
},
expandedOutput: {
fullFlag: "--full",
@@ -703,11 +701,13 @@ function summarizeSessionSendResult(result: JsonValue, sessionId: string, profil
fullResponseBytes: jsonByteLength(result),
valuesPrinted: false,
drillDownCommands: {
show: `./scripts/agentrun sessions show ${sessionId} --reader-id cli`,
trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq ${afterSeq} --limit 100`,
output: `./scripts/agentrun sessions output ${sessionId} --after-seq ${afterSeq} --limit 100`,
read: `./scripts/agentrun sessions read ${sessionId} --reader-id cli`,
cancel: `./scripts/agentrun sessions cancel ${sessionId}`,
show: `describe session/${sessionId}`,
logs: `logs session/${sessionId} --tail 100`,
...(stringValue(run?.id) ? { events: `events run/${String(run?.id)} --after-seq ${afterSeq} --limit 100` } : {}),
...(stringValue(run?.id) && stringValue(command?.id) ? { result: `result run/${String(run?.id)} --command ${String(command?.id)}` } : {}),
send: `send session/${sessionId} --prompt-stdin`,
read: `ack session/${sessionId}`,
cancel: `cancel session/${sessionId}`,
},
expandedOutput: {
fullFlag: "--full",
@@ -796,10 +796,10 @@ export function summarizeQueueTaskShowResult(result: JsonValue, taskId: string):
fullResponseBytes: jsonByteLength(result),
valuesPrinted: false,
pollCommands: {
full: `./scripts/agentrun queue show ${taskId} --full`,
...(runId ? { run: `./scripts/agentrun runs show ${runId}`, events: `./scripts/agentrun runs events ${runId} --after-seq ${afterSeq} --limit 100 --tail-summary` } : {}),
...(runId && commandId ? { command: `./scripts/agentrun commands show ${commandId} --run-id ${runId}` } : {}),
...(sessionId ? { trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq 0 --limit 100`, output: `./scripts/agentrun sessions output ${sessionId} --after-seq 0 --limit 100` } : {}),
full: `describe task/${taskId} --full`,
...(runId ? { run: `describe run/${runId}`, result: `result run/${runId}${commandId ? ` --command ${commandId}` : ""}`, events: `events run/${runId} --after-seq ${afterSeq} --limit 100` } : {}),
...(runId && commandId ? { command: `describe command/${commandId} --run ${runId}` } : {}),
...(sessionId ? { logs: `logs session/${sessionId} --tail 100`, send: `send session/${sessionId} --prompt-stdin` } : {}),
},
};
}
@@ -814,6 +814,7 @@ export function summarizeQueueCommanderSnapshot(result: JsonValue, options: Queu
queue: stringValue(record.queue),
readerId: stringValue(record.readerId),
stats: summarizeQueueStats(jsonRecordValue(record.stats)),
activeSessionCount: numberValue(record.activeSessionCount),
sourceCount: items.length,
displayedCount: selected.length,
limit: options.limit,
@@ -824,13 +825,15 @@ export function summarizeQueueCommanderSnapshot(result: JsonValue, options: Queu
fullResponseBytes: jsonByteLength(result),
valuesPrinted: false,
drillDownCommands: {
full: "./scripts/agentrun queue commander --reader-id cli --full",
raw: "./scripts/agentrun queue commander --reader-id cli --raw",
item: "./scripts/agentrun queue show <taskId>",
run: "./scripts/agentrun runs show <runId>",
events: "./scripts/agentrun runs events <runId> --after-seq <lastSeq> --limit 100 --tail-summary",
command: "./scripts/agentrun commands show <commandId> --run-id <runId>",
sessionHint: "session trace/output 只在 queue show <taskId> 的 per-task pollCommands 中按实际 sessionId 输出",
full: "get tasks --queue commander -o wide",
item: "describe task/<taskId>",
run: "describe run/<runId>",
events: "events run/<runId> --after-seq <lastSeq> --limit 100",
result: "result run/<runId> --command <commandId>",
command: "describe command/<commandId> --run <runId>",
logs: "logs session/<sessionId> --tail 100",
send: "send session/<sessionId> --prompt-stdin",
sessionHint: "细节按实际 run/session 走 result/events/logs;续跑只用 send session/<sessionId>",
},
};
}
@@ -948,9 +951,11 @@ function summarizeQueueTaskWithAttempt(record: JsonRecord | null, fallbackTaskId
const summary = summarizeQueueTaskRecord(record, fallbackTaskId);
const latestAttempt = summarizeAttemptRecord(jsonRecordValue(record?.latestAttempt));
const supervisor = summarizeSupervisorRecord(jsonRecordValue(record?.supervisor));
const activeSession = summarizeActiveSessionRecord(jsonRecordValue(record?.activeSession));
const sessionRef = jsonRecordValue(record?.sessionRef);
if (latestAttempt) summary.latestAttempt = latestAttempt;
if (supervisor) summary.supervisor = supervisor;
if (activeSession) summary.activeSession = activeSession;
const sessionId = stringValue(sessionRef?.sessionId) ?? stringValue(latestAttempt?.sessionId);
if (sessionId) summary.sessionId = sessionId;
if (record?.readCursor !== undefined) summary.read = record.readCursor !== null;
@@ -994,7 +999,7 @@ function summarizeSupervisorRecord(record: JsonRecord | null): JsonRecord | null
const terminalClassification = jsonRecordValue(record.terminalClassification);
const diagnosis = jsonRecordValue(record.diagnosis);
return {
...withoutFullRecordBytes(compactRecord(record, { keys: ["phase", "active", "status", "terminalStatus", "failureKind", "runId", "commandId", "lastSeq", "lastEventAt", "lastEventAgeMs", "leaseRemainingMs", "leaseExpired"] })),
...withoutFullRecordBytes(compactRecord(record, { keys: ["source", "sessionId", "executionState", "attentionState", "active", "activeRunId", "activeCommandId", "attemptRunId", "attemptCommandId", "phase", "status", "terminalStatus", "failureKind", "runId", "commandId", "lastSeq", "lastEventAt", "lastEventAgeMs", "leaseRemainingMs", "leaseExpired"] })),
diagnosis: diagnosis ? summarizeDiagnosisRecord(diagnosis) : null,
terminalClassification: terminalClassification ? summarizeTerminalClassification(terminalClassification) : null,
lastActivity: lastActivity ? withoutFullRecordBytes(compactRecord(lastActivity, { keys: ["sourceSeq", "eventId", "activityKind", "type", "status", "toolName", "itemId", "ageMs", "summary"] })) : null,
@@ -1004,8 +1009,13 @@ function summarizeSupervisorRecord(record: JsonRecord | null): JsonRecord | null
};
}
function summarizeActiveSessionRecord(record: JsonRecord | null): JsonRecord | null {
if (!record) return null;
return withoutFullRecordBytes(compactRecord(record, { keys: ["sessionId", "sessionPath", "executionState", "attentionState", "active", "activeRunId", "activeCommandId", "lastRunId", "lastCommandId", "terminalStatus", "failureKind", "lastActivityAt", "updatedAt"] }));
}
function summarizeTerminalClassification(record: JsonRecord): JsonRecord {
return withoutFullRecordBytes(compactRecord(record, { keys: ["category", "confidence", "providerEvidence", "providerInterruption", "providerInterruptionKnown", "providerInterruptionReason", "retryInterruptionObserved", "retryInterruptionSeq", "retryInterruptionKind", "hardTimeout", "idleTimeout", "timeoutKind", "timeoutState", "transportDisconnectObserved", "transportDisconnectSeq", "reason"] }));
return withoutFullRecordBytes(compactRecord(record, { keys: ["category", "confidence", "terminalStatus", "failureKind", "providerEvidence", "providerInterruption", "providerInterruptionKnown", "providerInterruptionReason", "retryInterruptionObserved", "retryInterruptionSeq", "retryInterruptionKind", "hardTimeout", "idleTimeout", "timeoutKind", "timeoutState", "transportDisconnectObserved", "transportDisconnectSeq", "reason"] }));
}
function summarizeRecoveryActions(value: JsonValue | undefined): JsonValue[] {
@@ -1215,10 +1225,9 @@ async function sessionCreate(args: ParsedArgs, positionalSessionId: string | nul
pvc: created.pvc,
storage,
pollCommands: {
show: `./scripts/agentrun sessions show ${sessionId} --reader-id cli`,
storage: `./scripts/agentrun sessions storage ${sessionId}`,
trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq 0 --limit 100`,
send: `./scripts/agentrun sessions send ${sessionId} --prompt "..."`,
show: `describe session/${sessionId}`,
logs: `logs session/${sessionId} --tail 100`,
send: `send session/${sessionId} --prompt-stdin`,
},
};
}
+37 -18
View File
@@ -57,14 +57,20 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman
const failureDetails = resultFailureDetails(scopedEvents, terminal);
const reply = assistantReply(scopedEvents);
const responseAuthority = finalResponseAuthority(reply);
const needsContinuation = terminal === "completed" && responseAuthority !== "authoritative";
const completionEvidence = completionEvidenceSummary({ terminal, terminalSource, reply, responseAuthority, needsContinuation, sessionId: run.sessionRef?.sessionId ?? null });
const providerTerminalFailure = terminal === "failed" && providerFailureCategory(failureKind) !== null;
const stableResponseAuthority = providerTerminalFailure ? "provider-failure" : responseAuthority;
const finalResponseFallback = !providerTerminalFailure && responseAuthority === "fallback";
const needsContinuation = (terminal === "completed" && responseAuthority !== "authoritative") || (providerTerminalFailure && Boolean(run.sessionRef?.sessionId));
const completionEvidence = completionEvidenceSummary({ terminal, terminalSource, reply, responseAuthority, needsContinuation, sessionId: run.sessionRef?.sessionId ?? null, providerTerminalFailure, failureKind });
const blocker = terminal === "blocked" || terminal === "failed" ? { failureKind, message: failureMessage, details: failureDetails } : null;
const liveness = livenessSnapshot(run, command, events, scopedEvents, terminal, failureKind, failureMessage, { responseAuthority, needsContinuation });
const terminalClassification = terminalClassificationSummary({ terminal, terminalSource, failureKind, failureMessage, liveness });
const diagnosis = runDiagnosis({ run, command, latestJob, events, terminalClassification, liveness, terminalStatus: terminal, failureKind, failureMessage });
const steerDelivery = command?.type === "steer" ? steerDeliverySummary(events, command.id) : null;
const executionOk = terminal === null ? null : terminal === "completed" && !providerTerminalFailure;
return {
ok: executionOk === true,
executionOk,
runId: run.id,
commandId: command?.id ?? commandId ?? null,
attemptId: latestJob?.attemptId ?? attemptFromEvents(events),
@@ -76,9 +82,11 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman
commandState: command?.state ?? null,
terminalStatus: terminal,
terminalSource,
providerTerminalFailure,
recoverableViaSession: Boolean(run.sessionRef?.sessionId && (needsContinuation || terminal === "failed" || terminal === "blocked" || terminal === "cancelled")),
completed: terminal === "completed",
finalResponseAuthority: responseAuthority,
finalResponseFallback: responseAuthority === "fallback",
finalResponseAuthority: stableResponseAuthority,
finalResponseFallback,
needsContinuation,
completionEvidence,
reply: reply.text,
@@ -88,8 +96,8 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman
source: reply.source,
final: reply.final,
replyAuthority: reply.replyAuthority,
authority: responseAuthority,
fallback: responseAuthority === "fallback",
authority: stableResponseAuthority,
fallback: finalResponseFallback,
needsContinuation,
textTruncated: reply.textTruncated,
outputTruncated: reply.outputTruncated,
@@ -447,21 +455,22 @@ function recoveryActions(input: { run: RunRecord; command: CommandRecord | null;
];
if (sessionId) actions.push(recoveryDescriptor({ action: "poll-output", operation: "logs", resourceKind: "session", resourceName: sessionId, runId: run.id, commandId: command?.id ?? null, sessionId, afterSeq, limit: 100 }));
if (active) {
if (sessionId) actions.push(recoveryDescriptor({ action: "send-session", operation: "send", resourceKind: "session", resourceName: sessionId, runId: run.id, commandId: command?.id ?? null, sessionId, inputKind: "prompt", reasonHint: "manager 会按当前 session 状态自动决定内部 steer 或新 turn" }));
if (sessionId) actions.push(recoveryDescriptor({ action: "send-session", operation: "send", resourceKind: "session", resourceName: sessionId, runId: run.id, commandId: command?.id ?? null, sessionId, inputKind: "prompt", reasonHint: "manager 会按当前 session 状态自动路由后续 prompt;用户入口保持 send session/<sessionId>" }));
if (command) actions.push(recoveryDescriptor({ action: "cancel-command", operation: "cancel", resourceKind: "command", resourceName: command.id, runId: run.id, commandId: command.id, sessionId, reasonRequired: true, reasonHint: "operator supplied cancel reason" }));
else actions.push(recoveryDescriptor({ action: "cancel-run", operation: "cancel", resourceKind: "run", resourceName: run.id, runId: run.id, sessionId, reasonRequired: true, reasonHint: "operator supplied cancel reason" }));
return actions;
}
if (needsContinuation && sessionId) {
if (command) actions.push(recoveryDescriptor({ action: "inspect-result", operation: "result", resourceKind: "command", resourceName: command.id, runId: run.id, commandId: command.id, sessionId }));
actions.push(recoveryDescriptor({ action: "continue-session", operation: "send", resourceKind: "session", resourceName: sessionId, runId: run.id, commandId: command?.id ?? null, sessionId, reason: `final-response-${finalResponseAuthority}`, inputKind: "prompt", reasonHint: "命令已 terminal completed,但没有 authoritative final response;管理者应先读 trace/output,再用同一 session 发送后续 prompt。" }));
const providerTerminalFailure = terminal === "failed" && providerFailureCategory(failureKind) !== null;
actions.push(recoveryDescriptor({ action: "continue-session", operation: "send", resourceKind: "session", resourceName: sessionId, runId: run.id, commandId: command?.id ?? null, sessionId, reason: providerTerminalFailure ? `provider-terminal-${failureKind ?? "failure"}` : `final-response-${finalResponseAuthority}`, inputKind: "prompt", reasonHint: providerTerminalFailure ? "终态 provider failure;管理者应先读 result/events/logs,确认 provider 错误与最新上下文,再用同一 session 发送后续 prompt。" : "命令已 terminal completed,但没有 authoritative final response;管理者应先读 result/events/logs,再用同一 session 发送后续 prompt。" }));
return actions;
}
if (terminal === "failed" || terminal === "blocked" || terminal === "cancelled") {
if (command) actions.push(recoveryDescriptor({ action: "inspect-result", operation: "result", resourceKind: "command", resourceName: command.id, runId: run.id, commandId: command.id, sessionId }));
if (sessionId) actions.push(recoveryDescriptor({ action: "continue-session", operation: "send", resourceKind: "session", resourceName: sessionId, runId: run.id, commandId: command?.id ?? null, sessionId, inputKind: "prompt" }));
if (failureKind === "backend-timeout") actions.push(recoveryDescriptor({ action: "split-task", operation: "operator-decision", resourceKind: sessionId ? "session" : "run", resourceName: sessionId ?? run.id, runId: run.id, commandId: command?.id ?? null, sessionId, reason: "backend-timeout", reasonHint: "先由管理者读取 trace/result,总结下一步,再把后续 prompt 发到同一 session;必要时把大 patch / 长工具链拆成更短 turn。", failureMessage: failureMessage ? boundedTextSummary(failureMessage, { limitChars: 200 }).text as string : null }));
else actions.push(recoveryDescriptor({ action: "retry-or-split", operation: "operator-decision", resourceKind: sessionId ? "session" : "run", resourceName: sessionId ?? run.id, runId: run.id, commandId: command?.id ?? null, sessionId, reason: failureKind ?? "terminal", reasonHint: "先读 trace/output 的 detail id,再决定继续同 session、重跑或拆分" }));
if (failureKind === "backend-timeout") actions.push(recoveryDescriptor({ action: "split-task", operation: "operator-decision", resourceKind: sessionId ? "session" : "run", resourceName: sessionId ?? run.id, runId: run.id, commandId: command?.id ?? null, sessionId, reason: "backend-timeout", reasonHint: "先由管理者读取 result/events/logs,总结下一步,再把后续 prompt 发到同一 session;必要时把大 patch / 长工具链拆成更短步骤。", failureMessage: failureMessage ? boundedTextSummary(failureMessage, { limitChars: 200 }).text as string : null }));
else actions.push(recoveryDescriptor({ action: "retry-or-split", operation: "operator-decision", resourceKind: sessionId ? "session" : "run", resourceName: sessionId ?? run.id, runId: run.id, commandId: command?.id ?? null, sessionId, reason: failureKind ?? "terminal", reasonHint: "先读 result/events/logs,再决定继续同 session、重跑或拆分" }));
}
return actions;
}
@@ -491,25 +500,29 @@ function finalResponseAuthority(reply: AssistantReplySummary): "authoritative" |
return reply.text.length > 0 ? "fallback" : "missing";
}
function completionEvidenceSummary(input: { terminal: TerminalStatus | null; terminalSource: string; reply: AssistantReplySummary; responseAuthority: string; needsContinuation: boolean; sessionId: string | null }): JsonRecord {
const recommendedAction = input.needsContinuation && input.sessionId ? recoveryDescriptor({ action: "continue-session", operation: "send", resourceKind: "session", resourceName: input.sessionId, sessionId: input.sessionId, inputKind: "prompt", reason: `final-response-${input.responseAuthority}` }) : null;
function completionEvidenceSummary(input: { terminal: TerminalStatus | null; terminalSource: string; reply: AssistantReplySummary; responseAuthority: string; needsContinuation: boolean; sessionId: string | null; providerTerminalFailure: boolean; failureKind: FailureKind | null }): JsonRecord {
const recommendedAction = input.needsContinuation && input.sessionId ? recoveryDescriptor({ action: "continue-session", operation: "send", resourceKind: "session", resourceName: input.sessionId, sessionId: input.sessionId, inputKind: "prompt", reason: input.providerTerminalFailure ? `provider-terminal-${input.failureKind ?? "failure"}` : `final-response-${input.responseAuthority}` }) : null;
const stableResponseAuthority = input.providerTerminalFailure ? "provider-failure" : input.responseAuthority;
return {
terminalStatus: input.terminal,
terminalSource: input.terminalSource,
finalResponseAuthority: input.responseAuthority,
providerTerminalFailure: input.providerTerminalFailure,
failureKind: input.failureKind,
finalResponseAuthority: stableResponseAuthority,
finalResponseSeq: input.reply.seq,
finalResponseSource: input.reply.source,
finalResponseFinal: input.reply.final,
finalResponseReplyAuthority: input.reply.replyAuthority,
finalResponseFallback: input.responseAuthority === "fallback",
finalResponseFallback: !input.providerTerminalFailure && input.responseAuthority === "fallback",
needsContinuation: input.needsContinuation,
reason: completionEvidenceReason(input.responseAuthority, input.terminal),
reason: completionEvidenceReason(input.responseAuthority, input.terminal, input.providerTerminalFailure),
recommendedAction,
valuesPrinted: false,
};
}
function completionEvidenceReason(responseAuthority: string, terminal: TerminalStatus | null): string {
function completionEvidenceReason(responseAuthority: string, terminal: TerminalStatus | null, providerTerminalFailure: boolean): string {
if (providerTerminalFailure) return "terminal provider failure; any assistant text is non-authoritative recovery context until the operator reads result/events/logs and sends the next prompt to the same session";
if (terminal !== "completed") return "command is not completed";
if (responseAuthority === "authoritative") return "terminal completed with authoritative assistant final response";
if (responseAuthority === "fallback") return "terminal completed but only a non-authoritative assistant progress/output fallback was available";
@@ -626,11 +639,13 @@ function terminalFromCommand(command: CommandRecord): TerminalStatus | null {
function resultTerminal(commandTerminal: TerminalStatus | null, terminalEventStatus: TerminalStatus | null, runTerminalStatus: TerminalStatus | null, failureKind: FailureKind | null): TerminalStatus | null {
if (commandTerminal === "failed" && terminalEventStatus === "blocked" && failureKind === "required-skill-unavailable") return "blocked";
if (providerFailureCategory(failureKind) && (commandTerminal === "completed" || terminalEventStatus === "completed" || runTerminalStatus === "completed")) return "failed";
return commandTerminal ?? terminalEventStatus ?? runTerminalStatus;
}
function resultTerminalSource(commandTerminal: TerminalStatus | null, terminalEventStatus: TerminalStatus | null, runTerminalStatus: TerminalStatus | null, failureKind: FailureKind | null): string {
if (commandTerminal === "failed" && terminalEventStatus === "blocked" && failureKind === "required-skill-unavailable") return "terminal_status-event";
if (providerFailureCategory(failureKind) && (commandTerminal === "completed" || terminalEventStatus === "completed" || runTerminalStatus === "completed")) return "provider-failure-evidence";
if (commandTerminal) return "command-record";
if (terminalEventStatus) return "terminal_status-event";
if (runTerminalStatus) return "run-record";
@@ -652,8 +667,12 @@ function failureKindFromEvents(events: RunEvent[], options: { includeRetry: bool
}
function resultFailureKind(run: RunRecord, command: CommandRecord | null, events: RunEvent[], jobs: RunnerJobRecord[], terminal: TerminalStatus | null): FailureKind | null {
if (terminal === "completed") return null;
return failureKindValue(run.failureKind) ?? failureKindFromEvents(events, { includeRetry: terminal !== null }) ?? failureKindFromRunnerJobs(jobs);
const runFailureKind = failureKindValue(run.failureKind);
if (terminal === "completed") {
const nonRetryFailureKind = runFailureKind ?? failureKindFromEvents(events, { includeRetry: false }) ?? failureKindFromRunnerJobs(jobs);
return providerFailureCategory(nonRetryFailureKind) ? nonRetryFailureKind : null;
}
return runFailureKind ?? failureKindFromEvents(events, { includeRetry: terminal !== null }) ?? failureKindFromRunnerJobs(jobs);
}
function failureKindFromRunnerJobs(jobs: RunnerJobRecord[]): FailureKind | null {
+111 -10
View File
@@ -115,9 +115,10 @@ async function readBody(req: import("node:http").IncomingMessage): Promise<unkno
return JSON.parse(text) as unknown;
}
async function refreshQueueTaskForRead(store: AgentRunStore, taskId: string): Promise<QueueTaskRecord> {
async function refreshQueueTaskForRead(store: AgentRunStore, taskId: string): Promise<JsonRecord> {
const task = await store.getQueueTask(taskId);
return await refreshQueueTaskRecordForRead(store, task);
const refreshed = await refreshQueueTaskRecordForRead(store, task);
return await enrichQueueTaskForRead(store, refreshed as unknown as JsonRecord);
}
async function refreshQueuePageForRead(store: AgentRunStore, input: ListQueueTasksInput): Promise<void> {
@@ -148,27 +149,69 @@ async function queueCommanderForRead(store: AgentRunStore, queue: string | undef
const items = Array.isArray(snapshot.items) ? snapshot.items : [];
const enrichedItems = await Promise.all(items.map(async (item) => {
const task = asJsonRecord(item);
if (!task) return item as JsonValue;
const supervisor = await queueTaskSupervisor(store, task);
return supervisor ? { ...task, supervisor, valuesPrinted: false } : task;
return task ? await enrichQueueTaskForRead(store, task) as JsonValue : item as JsonValue;
}));
return { ...snapshot, items: enrichedItems, valuesPrinted: false } as JsonValue;
const byId = new Map<string, JsonValue>();
for (const item of enrichedItems) {
const task = asJsonRecord(item);
const id = stringJsonValue(task?.id);
if (id) byId.set(id, item);
}
if (readerId) {
const unfiltered = await store.queueCommander(queue, null) as unknown as JsonRecord;
const unfilteredItems = Array.isArray(unfiltered.items) ? unfiltered.items : [];
for (const item of unfilteredItems) {
const task = asJsonRecord(item);
const id = stringJsonValue(task?.id);
if (!task || !id || byId.has(id)) continue;
const enriched = await enrichQueueTaskForRead(store, task);
if (queueTaskHasActiveSession(enriched)) byId.set(id, enriched as unknown as JsonValue);
}
}
const visibleItems = Array.from(byId.values());
const activeSessionCount = visibleItems.filter((item) => queueTaskHasActiveSession(asJsonRecord(item))).length;
return { ...snapshot, items: visibleItems, activeSessionCount, valuesPrinted: false } as JsonValue;
}
async function queueTaskSupervisor(store: AgentRunStore, task: JsonRecord): Promise<JsonRecord | null> {
async function enrichQueueTaskForRead(store: AgentRunStore, task: JsonRecord): Promise<JsonRecord> {
const activeSession = await queueTaskActiveSession(store, task);
const supervisor = await queueTaskSupervisor(store, task, activeSession);
const activeBySession = activeSession?.active === true || stringJsonValue(activeSession?.activeRunId) !== null || stringJsonValue(activeSession?.activeCommandId) !== null;
return {
...task,
...(activeSession ? { activeSession } : {}),
...(supervisor ? { supervisor } : {}),
...(activeBySession ? { active: true, attentionState: "active-session" } : {}),
valuesPrinted: false,
};
}
async function queueTaskSupervisor(store: AgentRunStore, task: JsonRecord, activeSession: JsonRecord | null = null): Promise<JsonRecord | null> {
const attempt = asJsonRecord(task.latestAttempt);
const runId = stringJsonValue(attempt?.runId);
const attemptRunId = stringJsonValue(attempt?.runId);
const attemptCommandId = stringJsonValue(attempt?.commandId);
const activeRunId = stringJsonValue(activeSession?.activeRunId);
const activeCommandId = stringJsonValue(activeSession?.activeCommandId);
const runId = activeRunId ?? attemptRunId;
if (!runId) return null;
const commandId = activeCommandId ?? attemptCommandId ?? undefined;
try {
const result = await buildRunResult(store, runId, stringJsonValue(attempt?.commandId) ?? undefined);
const result = await buildRunResult(store, runId, commandId);
const liveness = asJsonRecord(result.liveness);
const lastActivity = asJsonRecord(liveness?.lastActivity ?? liveness?.lastCommandActivity);
const timeoutBudget = asJsonRecord(liveness?.timeoutBudget);
const terminalClassification = asJsonRecord(result.terminalClassification ?? liveness?.terminalClassification);
const lease = asJsonRecord(liveness?.lease);
return {
source: activeRunId ? "active-session" : "latest-attempt",
attemptRunId,
attemptCommandId,
activeSession,
runId: stringJsonValue(result.runId),
commandId: stringJsonValue(result.commandId),
executionState: stringJsonValue(activeSession?.executionState),
activeRunId,
activeCommandId,
status: stringJsonValue(result.status),
terminalStatus: stringJsonValue(result.terminalStatus),
failureKind: stringJsonValue(result.failureKind),
@@ -192,6 +235,46 @@ async function queueTaskSupervisor(store: AgentRunStore, task: JsonRecord): Prom
}
}
async function queueTaskActiveSession(store: AgentRunStore, task: JsonRecord): Promise<JsonRecord | null> {
const sessionId = queueTaskSessionId(task);
if (!sessionId) return null;
try {
const session = await store.getSessionSummary(sessionId, null) as unknown as JsonRecord;
const activeRunId = stringJsonValue(session.activeRunId);
const activeCommandId = stringJsonValue(session.activeCommandId);
const active = session.active === true || activeRunId !== null || activeCommandId !== null || stringJsonValue(session.executionState) === "running";
if (!active) return null;
return {
sessionId,
sessionPath: stringJsonValue(session.sessionPath) ?? `/api/v1/sessions/${sessionId}`,
executionState: stringJsonValue(session.executionState),
attentionState: stringJsonValue(session.attentionState),
active,
activeRunId,
activeCommandId,
lastRunId: stringJsonValue(session.lastRunId),
lastCommandId: stringJsonValue(session.lastCommandId),
terminalStatus: stringJsonValue(session.terminalStatus),
failureKind: stringJsonValue(session.failureKind),
lastActivityAt: stringJsonValue(session.lastActivityAt),
updatedAt: stringJsonValue(session.updatedAt),
valuesPrinted: false,
};
} catch {
return null;
}
}
function queueTaskSessionId(task: JsonRecord): string | null {
return stringJsonValue(asJsonRecord(task.sessionRef)?.sessionId) ?? stringJsonValue(asJsonRecord(task.latestAttempt)?.sessionId) ?? null;
}
function queueTaskHasActiveSession(task: JsonRecord | null): boolean {
const activeSession = asJsonRecord(task?.activeSession);
const supervisor = asJsonRecord(task?.supervisor);
return activeSession?.active === true || supervisor?.source === "active-session" || supervisor?.active === true;
}
function compactActivity(activity: JsonRecord): JsonRecord {
return {
sourceSeq: numberJsonValue(activity.sourceSeq ?? activity.seq),
@@ -337,17 +420,35 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn
const commandId = summary.activeCommandId ?? summary.lastCommandId ?? undefined;
try {
const result = await buildRunResult(store, runId, commandId);
const liveness = asJsonRecord(result.liveness);
const lastActivity = asJsonRecord(liveness?.lastActivity ?? liveness?.lastCommandActivity);
const timeoutBudget = asJsonRecord(liveness?.timeoutBudget);
const terminalClassification = asJsonRecord(result.terminalClassification ?? liveness?.terminalClassification);
return {
...summary,
liveness: result.liveness ?? null,
supervisor: {
sessionId: summary.sessionId,
executionState: summary.executionState,
attentionState: summary.attentionState,
active: summary.active,
activeRunId: summary.activeRunId,
activeCommandId: summary.activeCommandId,
lastRunId: summary.lastRunId,
lastCommandId: summary.lastCommandId,
runId: result.runId,
commandId: result.commandId,
status: result.status,
terminalStatus: result.terminalStatus,
failureKind: result.failureKind,
terminalClassification: terminalClassification ? compactTerminalClassification(terminalClassification) : null,
phase: stringJsonValue(liveness?.phase),
lastEventAgeMs: numberJsonValue(liveness?.lastEventAgeMs),
lastActivity: lastActivity ? compactActivity(lastActivity) : null,
timeoutBudget: timeoutBudget ? compactTimeoutBudget(timeoutBudget) : null,
lastSeq: result.lastSeq,
liveness: result.liveness ?? null,
recoveryActions: typeof result.liveness === "object" && result.liveness !== null && !Array.isArray(result.liveness) ? (result.liveness as JsonRecord).recoveryActions ?? [] : [],
recoveryActions: compactRecoveryActions(liveness?.recoveryActions),
...(result.steerDelivery ? { steerDelivery: result.steerDelivery } : {}),
valuesPrinted: false,
},