// File: pikasTech-agentrun/src/mgr/diagnosis.ts
// Snapshot: 2026-06-11 12:52:46 +08:00 (204 lines, 11 KiB, TypeScript)

import type { CommandRecord, JsonRecord, JsonValue, RunEvent, RunRecord, RunnerJobRecord } from "../common/types.js";
import { boundedTextSummary } from "../common/output.js";
/** Everything `runDiagnosis` reads in order to describe the state of one run. */
export interface RunDiagnosisInput {
/** The run being diagnosed; its id, status, claim/lease fields and sessionRef are read. */
run: RunRecord;
/** The command to report on, or null; when present its id, state and timestamps are echoed. */
command: CommandRecord | null;
/** Runner job to summarise, or null when there is none to report. */
latestJob: RunnerJobRecord | null;
/** Run events; scanned for a terminal event belonging to the runner job's command. */
events: RunEvent[];
/** Optional record from which `providerEvidence`, `providerInterruption`, `providerInterruptionKnown` and `category` are read. */
terminalClassification: JsonRecord | null;
/** Optional record from which `lease`, `timeoutBudget` and `lastSeq` are read. */
liveness: JsonRecord | null;
/** Echoed in the output; a non-null value on a run still in status "claimed" sets `terminalCommandOpenRun`. */
terminalStatus: string | null;
/** Echoed in the output and used as the reason of the "refresh-queue-or-resubmit" action. */
failureKind: string | null;
/** Echoed in the output after being passed through `boundedTextSummary` with `limitChars: 240`. */
failureMessage: string | null;
}
/**
 * Assembles the diagnosis record for a single run.
 *
 * Lease data is taken from `liveness.lease` when present and is otherwise
 * derived from the run's own claim fields. From it the function derives three
 * state flags (`staleClaimed`, `terminalCommandOpenRun`, `runnerLost`), a
 * category, an evidence level and a list of suggested recovery actions, and
 * returns them together with compact references to the run, command, runner
 * job, session and timeout budget.
 *
 * @param input - Run, command, latest runner job, events and classification data.
 * @returns The diagnosis as a JSON record; every `valuesPrinted` marker is `false`.
 */
export function runDiagnosis(input: RunDiagnosisInput): JsonRecord {
  const nowMs = Date.now();
  const { run, command, latestJob, terminalClassification, liveness } = input;

  const lease = recordAt(liveness, "lease") ?? leaseFromRun(run, nowMs);
  const timeoutBudget = recordAt(liveness, "timeoutBudget");

  // Both "stuck" flags only apply while the run is still in status "claimed".
  const isClaimed = run.status === "claimed";
  const staleClaimed = isClaimed && booleanValue(lease.leaseExpired) === true;
  const terminalCommandOpenRun = isClaimed && input.terminalStatus !== null;

  // A stale claim counts as a lost runner when there is no job at all, or the
  // job never got past the "created" / "recorded" phase.
  const runnerJob = latestJob ? runnerJobReference(latestJob, input.events) : null;
  const jobNeverRan = runnerJob === null || runnerJob.phase === "created" || runnerJob.phase === "recorded";
  const runnerLost = staleClaimed && jobNeverRan;

  const session = sessionReference(run);
  const terminalCategory = stringValue(terminalClassification?.category);
  const providerEvidence = stringValue(terminalClassification?.providerEvidence) ?? "not-applicable";
  const providerInterruption = stringValue(terminalClassification?.providerInterruption) ?? "not-established";

  const category = diagnosisCategory({
    staleClaimed,
    runnerLost,
    terminalCommandOpenRun,
    providerEvidence,
    terminalCategory,
  });
  const recoveryActions = recoveryActionsForDiagnosis({
    run,
    command,
    latestJob,
    session,
    runnerLost,
    staleClaimed,
    terminalCommandOpenRun,
    failureKind: input.failureKind,
    lastSeq: numberValue(liveness?.lastSeq) ?? 0,
  });
  const confidence = evidenceLevel(category, providerEvidence, runnerLost, staleClaimed, terminalCommandOpenRun);

  let failureMessage: string | null = null;
  if (input.failureMessage) {
    failureMessage = boundedTextSummary(input.failureMessage, { limitChars: 240 }).text as string;
  }

  const runSummary: JsonRecord = {
    runId: run.id,
    status: run.status,
    claimedBy: run.claimedBy,
    leaseExpiresAt: run.leaseExpiresAt,
    leaseExpired: booleanOrNull(lease.leaseExpired),
    leaseRemainingMs: numberValue(lease.leaseRemainingMs),
    valuesPrinted: false,
  };

  let commandSummary: JsonRecord | null = null;
  if (command) {
    commandSummary = {
      commandId: command.id,
      state: command.state,
      terminalStatus: input.terminalStatus,
      acknowledgedAt: command.acknowledgedAt ?? null,
      updatedAt: command.updatedAt,
      valuesPrinted: false,
    };
  }

  // Only these scalar fields of the timeout budget are copied into the output.
  const timeoutBudgetKeys = [
    "state",
    "timeoutKind",
    "timeoutMs",
    "elapsedMs",
    "idleElapsedMs",
    "remainingMs",
    "startedAt",
    "idleStartedAt",
    "lastActivityAt",
    "lastActivitySeq",
    "commandElapsedMs",
    "runElapsedMs",
    "source",
  ];
  const timeoutSummary = timeoutBudget ? compactRecord(timeoutBudget, timeoutBudgetKeys) : null;

  return {
    category,
    staleClaimed,
    runnerLost,
    terminalCommandOpenRun,
    evidenceLevel: confidence,
    providerEvidence,
    providerInterruption,
    providerInterruptionKnown: terminalClassification?.providerInterruptionKnown === true,
    terminalCategory,
    terminalStatus: input.terminalStatus,
    failureKind: input.failureKind,
    failureMessage,
    run: runSummary,
    command: commandSummary,
    runnerJob,
    session,
    timeoutBudget: timeoutSummary,
    recoveryActions,
    valuesPrinted: false,
  };
}
/**
 * Builds a stand-alone diagnosis record for one runner job.
 *
 * The job's phase is resolved the same way `runnerJobReference` resolves it:
 * the terminal status comes from the newest terminal event for the job's
 * command, falling back to a string `terminalStatus` stored on `job.result`.
 * Without either, the phase is "created" when `job.result.kubernetes.created`
 * is `true`, and "recorded" otherwise. A terminal event that carries no usable
 * status string is reported as "terminal:unknown".
 *
 * @param job - The runner job record to describe.
 * @param events - Run events to scan for a terminal event; defaults to none.
 * @returns A JSON record with category, phase, identifiers and follow-up commands.
 */
export function runnerJobDiagnosis(job: RunnerJobRecord, events: RunEvent[] = []): JsonRecord {
  const terminalEvent = latestTerminalEvent(events, job.commandId);
  // Only a non-empty string counts as a status; stringifying arbitrary JSON
  // would yield phases such as "terminal:[object Object]" or "terminal:".
  const terminalStatus =
    stringValue(terminalEvent?.payload?.terminalStatus) ?? stringValue(latestTerminalStatusFromResult(job.result));
  const reachedTerminal = terminalEvent !== null || terminalStatus !== null;

  let phase: string;
  if (reachedTerminal) {
    phase = `terminal:${terminalStatus ?? "unknown"}`;
  } else if (recordAt(job.result, "kubernetes")?.created === true) {
    phase = "created";
  } else {
    phase = "recorded";
  }

  // The phase is always "terminal:*", "created" or "recorded", so a job is
  // either terminal or has not started; there is no third category.
  const notStarted = !reachedTerminal;
  return {
    category: notStarted ? "runner-job-not-started" : "runner-job-terminal",
    runnerLostSuspected: notStarted,
    phase,
    evidenceLevel: notStarted ? "medium" : "high",
    runId: job.runId,
    commandId: job.commandId,
    runnerJobId: job.id,
    attemptId: job.attemptId,
    runnerId: job.runnerId,
    jobName: job.jobName,
    namespace: job.namespace,
    logPath: stringValue(recordAt(job.result, "runner")?.logPath),
    nextActions: [
      { action: "inspect-run", command: `./scripts/agentrun runs show ${job.runId}`, valuesPrinted: false },
      { action: "inspect-command", command: `./scripts/agentrun commands show ${job.commandId} --run-id ${job.runId}`, valuesPrinted: false },
      { action: "poll-events", command: `./scripts/agentrun runs events ${job.runId} --after-seq 0 --limit 100 --tail-summary`, valuesPrinted: false },
    ],
    valuesPrinted: false,
  };
}
/**
 * Picks the single category label for a diagnosis.
 *
 * Run-state flags win in the order runner-lost, stale-claimed,
 * terminal-command-open-run. Otherwise the provider evidence decides, and as a
 * last resort the terminal category is used, or "unknown" when it is null or empty.
 *
 * @param input - The state flags, provider evidence and terminal category.
 * @returns The category label.
 */
function diagnosisCategory(input: { staleClaimed: boolean; runnerLost: boolean; terminalCommandOpenRun: boolean; providerEvidence: string; terminalCategory: string | null }): string {
  // Ordered by precedence: the first flag that is set determines the label.
  const stateCategories: Array<[boolean, string]> = [
    [input.runnerLost, "runner-lost"],
    [input.staleClaimed, "stale-claimed"],
    [input.terminalCommandOpenRun, "terminal-command-open-run"],
  ];
  const stateMatch = stateCategories.find(([flag]) => flag);
  if (stateMatch) return stateMatch[1];

  switch (input.providerEvidence) {
    case "failure-kind":
      return "provider-interruption-known";
    case "observed-transport-disconnect":
      return "provider-interruption-unknown";
    default:
      // `||` on purpose: an empty terminal category also maps to "unknown".
      return input.terminalCategory || "unknown";
  }
}
/**
 * Rates how strong the evidence behind a diagnosis is.
 *
 * @param category - The diagnosis category label.
 * @param providerEvidence - The provider evidence label.
 * @param runnerLost - Whether the runner is considered lost.
 * @param staleClaimed - Whether the run is claimed with an expired lease.
 * @param terminalCommandOpenRun - Whether the run is claimed while a terminal status exists.
 * @returns "high", "medium" or "low".
 */
function evidenceLevel(category: string, providerEvidence: string, runnerLost: boolean, staleClaimed: boolean, terminalCommandOpenRun: boolean): string {
  const anyStateFlag = runnerLost || staleClaimed || terminalCommandOpenRun;
  if (anyStateFlag || providerEvidence === "failure-kind") return "high";
  if (providerEvidence === "observed-transport-disconnect") return "medium";
  // Without state flags or provider evidence, only these two categories rate "high".
  return ["completed", "cancelled"].includes(category) ? "high" : "low";
}
/**
 * Lists the follow-up actions suggested for a diagnosed run, in a fixed order:
 * inspect the runner job (if any), inspect the command (if any), poll events,
 * then either resume the session or report that none is available, and finally
 * a refresh-or-resubmit hint when any of the run-state flags is set.
 *
 * @param input - Run, command, job, session reference, state flags, failure kind and last seen event sequence.
 * @returns At most six action records, each carrying `valuesPrinted: false`.
 */
function recoveryActionsForDiagnosis(input: { run: RunRecord; command: CommandRecord | null; latestJob: RunnerJobRecord | null; session: JsonRecord; runnerLost: boolean; staleClaimed: boolean; terminalCommandOpenRun: boolean; failureKind: string | null; lastSeq: number }): JsonRecord[] {
  const { run, command, latestJob, lastSeq } = input;
  const runId = run.id;

  const inspectJob: JsonRecord[] = latestJob
    ? [{ action: "inspect-runner-job", runnerJobId: latestJob.id, command: `./scripts/agentrun runner job-status ${latestJob.id} --run-id ${runId}`, valuesPrinted: false }]
    : [];

  const inspectCommand: JsonRecord[] = command
    ? [{ action: "inspect-command", commandId: command.id, command: `./scripts/agentrun commands result ${command.id} --run-id ${runId}`, valuesPrinted: false }]
    : [];

  const pollEvents: JsonRecord = {
    action: "poll-events",
    runId,
    afterSeq: lastSeq,
    command: `./scripts/agentrun runs events ${runId} --after-seq ${lastSeq} --limit 100 --tail-summary`,
    valuesPrinted: false,
  };

  // Exactly one session entry is emitted: resume when an id exists, otherwise an explanation.
  const sessionId = stringValue(input.session.sessionId);
  const sessionAction: JsonRecord = sessionId
    ? { action: "resume-session", sessionId, command: `./scripts/agentrun sessions turn ${sessionId} --prompt-stdin`, valuesPrinted: false }
    : { action: "session-unavailable", reason: "sessionRef=null", hint: "当前 run 没有 sessionRef,管理者只能从 run/events/command/runner-job 读取 trace 后重新提交;这表示该任务不可同 session 续跑。", valuesPrinted: false };

  const needsRefresh = input.runnerLost || input.staleClaimed || input.terminalCommandOpenRun;
  const refresh: JsonRecord[] = needsRefresh
    ? [{ action: "refresh-queue-or-resubmit", reason: input.failureKind ?? "stale-runner-state", hint: "先用 queue refresh/show 对齐 attempt;有 sessionId 时继续同一 session,没有 sessionId 才重新派发。", valuesPrinted: false }]
    : [];

  return [...inspectJob, ...inspectCommand, pollEvents, sessionAction, ...refresh].slice(0, 6);
}
/**
 * Summarises a runner job as a compact reference record.
 *
 * The terminal status is taken from the newest terminal event for the job's
 * command, falling back to a string `terminalStatus` stored on `job.result`.
 * With a status the phase is `terminal:<status>`; without one it is "created"
 * when `job.result.kubernetes.created` is `true`, and "recorded" otherwise.
 *
 * @param job - The runner job record to summarise.
 * @param events - Run events to scan for a terminal event of the job's command.
 * @returns The reference record; `startedAt` and `finishedAt` are always null here.
 */
function runnerJobReference(job: RunnerJobRecord, events: RunEvent[]): JsonRecord {
  const kubernetes = recordAt(job.result, "kubernetes");
  const terminalEvent = latestTerminalEvent(events, job.commandId);
  // `latestTerminalEvent` can return a "terminal_status" event that has no
  // payload, so the payload is dereferenced with `?.` like elsewhere in this file.
  const terminalStatus =
    stringValue(terminalEvent?.payload?.terminalStatus) ?? stringValue(latestTerminalStatusFromResult(job.result));

  let phase: string;
  if (terminalStatus) {
    phase = `terminal:${terminalStatus}`;
  } else if (kubernetes?.created === true) {
    phase = "created";
  } else {
    phase = "recorded";
  }

  return {
    runnerJobId: job.id,
    attemptId: job.attemptId,
    runnerId: job.runnerId,
    namespace: job.namespace,
    jobName: job.jobName,
    phase,
    terminalStatus,
    startedAt: null,
    finishedAt: null,
    logPath: stringValue(recordAt(job.result, "runner")?.logPath),
    valuesPrinted: false,
  };
}
/**
 * Describes the session attached to a run.
 *
 * @param run - The run whose `sessionRef` is inspected.
 * @returns A record with the session id and its API path, or nulls plus
 *   `sessionRefNull: true` when the run has no session reference.
 */
function sessionReference(run: RunRecord): JsonRecord {
  const ref = run.sessionRef;
  const hasRef = Boolean(ref);
  return {
    sessionId: ref ? ref.sessionId : null,
    sessionRefNull: !hasRef,
    sessionPath: ref ? `/api/v1/sessions/${ref.sessionId}` : null,
    valuesPrinted: false,
  };
}
/**
 * Derives lease information from the run's own claim fields.
 *
 * @param run - The run whose `claimedBy` and `leaseExpiresAt` are read.
 * @param nowMs - The current time in epoch milliseconds.
 * @returns `leaseExpired` is null for an unclaimed run, and true for a claimed
 *   run whose expiry is missing, unparseable or not in the future.
 *   `leaseRemainingMs` is the non-negative time left, or null when the run is
 *   unclaimed or has no parseable expiry.
 */
function leaseFromRun(run: RunRecord, nowMs: number): JsonRecord {
  const isClaimed = Boolean(run.claimedBy);
  // A missing expiry becomes NaN, exactly like an unparseable one.
  const expiresAtMs = run.leaseExpiresAt ? Date.parse(run.leaseExpiresAt) : Number.NaN;
  const hasUsableLease = isClaimed && Number.isFinite(expiresAtMs);

  let leaseExpired: boolean | null = null;
  if (isClaimed) {
    // A claim without a usable expiry is treated as already expired.
    leaseExpired = !hasUsableLease || expiresAtMs <= nowMs;
  }

  let leaseRemainingMs: number | null = null;
  if (hasUsableLease) {
    leaseRemainingMs = Math.max(0, expiresAtMs - nowMs);
  }

  return { leaseExpired, leaseRemainingMs, valuesPrinted: false };
}
/**
 * Reads `terminalStatus` from a result record.
 *
 * @param result - The record to read from.
 * @returns The value when it is a string (including the empty string), otherwise null.
 */
function latestTerminalStatusFromResult(result: JsonRecord): JsonValue | null {
  const { terminalStatus } = result;
  if (typeof terminalStatus !== "string") return null;
  return terminalStatus;
}
/**
 * Finds the last terminal event in `events` that may belong to `commandId`.
 *
 * Events whose payload names a different command are ignored. Of the rest, a
 * "terminal_status" event matches even without a command id, while a
 * "backend_status" event matches only with phase "command-terminal" and a
 * payload command id equal to `commandId`.
 *
 * @param events - Events to search; the array itself is not modified.
 * @param commandId - The command the terminal event must belong to.
 * @returns The matching event closest to the end of the array, or null.
 */
function latestTerminalEvent(events: RunEvent[], commandId: string): RunEvent | null {
  const isTerminalForCommand = (event: RunEvent): boolean => {
    const payload = event.payload;
    const eventCommandId = payload?.commandId;
    if (eventCommandId && eventCommandId !== commandId) return false;
    if (event.type === "terminal_status") return true;
    return event.type === "backend_status" && payload?.phase === "command-terminal" && eventCommandId === commandId;
  };
  // Search a reversed copy so the caller's array keeps its order.
  const lastFirst = events.slice().reverse();
  return lastFirst.find(isTerminalForCommand) ?? null;
}
/**
 * Copies the listed keys from `record` into a new record, keeping only values
 * that are strings, numbers, booleans or null. Missing keys, arrays and nested
 * objects are dropped.
 *
 * @param record - The source record.
 * @param keys - The keys to copy, in output order.
 * @returns The new record; `valuesPrinted` is always set to `false`, overriding any copied value.
 */
function compactRecord(record: JsonRecord, keys: string[]): JsonRecord {
  const compact: JsonRecord = {};
  for (const key of keys) {
    const candidate = record[key];
    switch (typeof candidate) {
      case "string":
      case "number":
      case "boolean":
        compact[key] = candidate;
        break;
      default:
        // `typeof null` is "object", so null is let through here explicitly.
        if (candidate === null) compact[key] = null;
    }
  }
  compact.valuesPrinted = false;
  return compact;
}
/**
 * Reads `record[key]` and returns it only when it is a plain nested record.
 *
 * @param record - The record to read from; may be null or undefined.
 * @param key - The property name.
 * @returns The nested value when it is a non-null, non-array object, otherwise null.
 */
function recordAt(record: JsonRecord | null | undefined, key: string): JsonRecord | null {
  if (record == null) return null;
  const candidate = record[key];
  if (candidate === null || typeof candidate !== "object" || Array.isArray(candidate)) return null;
  return candidate as JsonRecord;
}
/**
 * Narrows a JSON value to a non-empty string.
 *
 * @param value - The value to inspect.
 * @returns The string when it has at least one character, otherwise null.
 */
function stringValue(value: JsonValue | undefined): string | null {
  if (typeof value !== "string") return null;
  return value === "" ? null : value;
}
/**
 * Narrows a JSON value to a finite number.
 *
 * @param value - The value to inspect.
 * @returns The number when it is finite; null for NaN, infinities and non-numbers.
 */
function numberValue(value: JsonValue | undefined): number | null {
  if (typeof value !== "number") return null;
  if (!Number.isFinite(value)) return null;
  return value;
}
/**
 * Narrows a JSON value to a boolean.
 *
 * @param value - The value to inspect.
 * @returns `true` or `false` when the value is exactly that boolean, otherwise null.
 */
function booleanValue(value: JsonValue | undefined): boolean | null {
  if (value === true || value === false) return value;
  return null;
}
/**
 * Passes a boolean through unchanged and maps every other value to null.
 * Same contract as `booleanValue` in this file.
 *
 * @param value - The value to inspect.
 * @returns The boolean itself, or null when the value is not a boolean.
 */
function booleanOrNull(value: JsonValue | undefined): boolean | null {
  switch (typeof value) {
    case "boolean":
      return value;
    default:
      return null;
  }
}