Merge pull request #163 from pikasTech/fix/159-drilldown-visibility

fix: 暴露 stale runner 诊断
This commit is contained in:
Lyon
2026-06-11 09:22:58 +08:00
committed by GitHub
8 changed files with 279 additions and 5 deletions
+1
View File
@@ -93,6 +93,7 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交
- 创建类命令返回 `runId`、`commandId`、status 和下一步 poll command。 - 创建类命令返回 `runId`、`commandId`、status 和下一步 poll command。
- `runner start` 返回 attemptId、job/process identity、logPath 和后续 status/events 命令。 - `runner start` 返回 attemptId、job/process identity、logPath 和后续 status/events 命令。
- `runner jobs` / `runner job-status` 返回 manager 持久化的 runner Job 最小状态摘要,包括 attemptId、runnerId、namespace、jobName、phase、terminalStatus、logPath、retention 和 redacted Kubernetes identity;业务方不需要直连 Kubernetes 才能定位当前 attempt。 - `runner jobs` / `runner job-status` 返回 manager 持久化的 runner Job 最小状态摘要,包括 attemptId、runnerId、namespace、jobName、phase、terminalStatus、logPath、retention 和 redacted Kubernetes identity;业务方不需要直连 Kubernetes 才能定位当前 attempt。
- `runs show/result`、`commands show/result`、`queue show/commander`、`runner jobs/job-status` 的低噪声视图必须暴露 compact `diagnosis`,把 stale claimed、runner lost、terminal command/open run、`sessionRef=null`、provider interruption known/unknown、runner job phase 和下一步 drill-down 命令放到同一层级,避免监督者人工拼接 run、command、runner job 和 events 才能判断恢复入口。
- `aipod-specs render`、`queue submit --aipod` 必须调用同一 manager `/api/v1/aipod-specs/:name/render` 路径,把 YAML 展开为标准 Queue task;CLI 不得在本地复制一套 render 逻辑。 - `aipod-specs render`、`queue submit --aipod` 必须调用同一 manager `/api/v1/aipod-specs/:name/render` 路径,把 YAML 展开为标准 Queue task;CLI 不得在本地复制一套 render 逻辑。
- `queue submit --aipod <name>` 只接受本次任务输入(prompt、idempotencyKey、tenant/project/queue/lane/provider 覆盖等),模型、provider credential、tool credential、gitbundle 和 requiredSkills 由 [spec-v01-aipod-spec.md](spec-v01-aipod-spec.md) 定义。 - `queue submit --aipod <name>` 只接受本次任务输入(prompt、idempotencyKey、tenant/project/queue/lane/provider 覆盖等),模型、provider credential、tool credential、gitbundle 和 requiredSkills 由 [spec-v01-aipod-spec.md](spec-v01-aipod-spec.md) 定义。
- 查询类命令返回当前 state、terminal_status、failureKind、event cursor 或 logPath。 - 查询类命令返回当前 state、terminal_status、failureKind、event cursor 或 logPath。
+31
View File
@@ -183,6 +183,7 @@ function summarizeRunShowResult(run: JsonValue, result: JsonValue, runId: string
action: "runs-show-summary", action: "runs-show-summary",
run: summarizeRunRecord(jsonRecordValue(run)), run: summarizeRunRecord(jsonRecordValue(run)),
result: summarizeResultEnvelope(resultRecord), result: summarizeResultEnvelope(resultRecord),
diagnosis: summarizeDiagnosisRecord(jsonRecordValue(resultRecord?.diagnosis)),
terminalClassification: summarizeTerminalClassificationIfPresent(resultRecord), terminalClassification: summarizeTerminalClassificationIfPresent(resultRecord),
liveness: summarizeLivenessRecord(jsonRecordValue(resultRecord?.liveness)), liveness: summarizeLivenessRecord(jsonRecordValue(resultRecord?.liveness)),
finalResponse: summarizeFinalResponseRecord(jsonRecordValue(resultRecord?.finalResponse)), finalResponse: summarizeFinalResponseRecord(jsonRecordValue(resultRecord?.finalResponse)),
@@ -200,6 +201,7 @@ function summarizeCommandShowResult(command: JsonValue, result: JsonValue, runId
action: "commands-show-summary", action: "commands-show-summary",
command: summarizeCommandRecord(jsonRecordValue(command)), command: summarizeCommandRecord(jsonRecordValue(command)),
result: summarizeResultEnvelope(resultRecord), result: summarizeResultEnvelope(resultRecord),
diagnosis: summarizeDiagnosisRecord(jsonRecordValue(resultRecord?.diagnosis)),
terminalClassification: summarizeTerminalClassificationIfPresent(resultRecord), terminalClassification: summarizeTerminalClassificationIfPresent(resultRecord),
liveness: summarizeLivenessRecord(jsonRecordValue(resultRecord?.liveness)), liveness: summarizeLivenessRecord(jsonRecordValue(resultRecord?.liveness)),
finalResponse: summarizeFinalResponseRecord(jsonRecordValue(resultRecord?.finalResponse)), finalResponse: summarizeFinalResponseRecord(jsonRecordValue(resultRecord?.finalResponse)),
@@ -238,6 +240,23 @@ function summarizeLivenessRecord(record: JsonRecord | null): JsonRecord | null {
}; };
} }
/**
 * Produces the low-noise CLI view of a `diagnosis` record.
 *
 * Only an allow-listed set of keys is kept at the top level and inside each
 * of the nested `run`, `command`, `runnerJob` and `session` sections; a
 * missing section is reported as `null`. Recovery actions are passed through
 * `summarizeRecoveryActions`.
 *
 * @param record Diagnosis record taken from a result envelope, or `null`.
 * @returns The compact diagnosis, or `null` when no record was supplied.
 */
function summarizeDiagnosisRecord(record: JsonRecord | null): JsonRecord | null {
  if (!record) {
    return null;
  }

  const runSection = jsonRecordValue(record.run);
  const commandSection = jsonRecordValue(record.command);
  const runnerJobSection = jsonRecordValue(record.runnerJob);
  const sessionSection = jsonRecordValue(record.session);

  // Shared shape for every nested section: keep the listed keys or report null.
  const compactSection = (section: JsonRecord | null, keys: string[]): JsonRecord | null =>
    section ? withoutFullRecordBytes(compactRecord(section, { keys })) : null;

  return {
    ...withoutFullRecordBytes(
      compactRecord(record, {
        keys: [
          "category",
          "staleClaimed",
          "runnerLost",
          "terminalCommandOpenRun",
          "evidenceLevel",
          "providerEvidence",
          "providerInterruption",
          "providerInterruptionKnown",
          "terminalCategory",
          "terminalStatus",
          "failureKind",
          "failureMessage",
        ],
      }),
    ),
    run: compactSection(runSection, ["runId", "status", "claimedBy", "leaseExpiresAt", "leaseExpired", "leaseRemainingMs"]),
    command: compactSection(commandSection, ["commandId", "state", "terminalStatus", "acknowledgedAt", "updatedAt"]),
    runnerJob: compactSection(runnerJobSection, ["runnerJobId", "attemptId", "runnerId", "namespace", "jobName", "phase", "terminalStatus", "startedAt", "finishedAt", "logPath"]),
    session: compactSection(sessionSection, ["sessionId", "sessionRefNull", "sessionPath"]),
    recoveryActions: summarizeRecoveryActions(record.recoveryActions),
    valuesPrinted: false,
  };
}
function summarizeFinalResponseRecord(record: JsonRecord | null): JsonRecord | null { function summarizeFinalResponseRecord(record: JsonRecord | null): JsonRecord | null {
if (!record) return null; if (!record) return null;
return withoutFullRecordBytes(compactRecord(record, { keys: ["seq", "source", "final", "replyAuthority", "textTruncated", "outputTruncated", "text"] })); return withoutFullRecordBytes(compactRecord(record, { keys: ["seq", "source", "final", "replyAuthority", "textTruncated", "outputTruncated", "text"] }));
@@ -900,8 +919,10 @@ function summarizeSupervisorRecord(record: JsonRecord | null): JsonRecord | null
const lastActivity = jsonRecordValue(record.lastActivity); const lastActivity = jsonRecordValue(record.lastActivity);
const timeoutBudget = jsonRecordValue(record.timeoutBudget); const timeoutBudget = jsonRecordValue(record.timeoutBudget);
const terminalClassification = jsonRecordValue(record.terminalClassification); const terminalClassification = jsonRecordValue(record.terminalClassification);
const diagnosis = jsonRecordValue(record.diagnosis);
return { return {
...withoutFullRecordBytes(compactRecord(record, { keys: ["phase", "active", "status", "terminalStatus", "failureKind", "runId", "commandId", "lastSeq"] })), ...withoutFullRecordBytes(compactRecord(record, { keys: ["phase", "active", "status", "terminalStatus", "failureKind", "runId", "commandId", "lastSeq"] })),
diagnosis: diagnosis ? summarizeDiagnosisRecord(diagnosis) : null,
terminalClassification: terminalClassification ? summarizeTerminalClassification(terminalClassification) : null, terminalClassification: terminalClassification ? summarizeTerminalClassification(terminalClassification) : null,
lastActivity: lastActivity ? withoutFullRecordBytes(compactRecord(lastActivity, { keys: ["sourceSeq", "eventId", "activityKind", "type", "status", "toolName", "itemId", "ageMs", "summary"] })) : null, lastActivity: lastActivity ? withoutFullRecordBytes(compactRecord(lastActivity, { keys: ["sourceSeq", "eventId", "activityKind", "type", "status", "toolName", "itemId", "ageMs", "summary"] })) : null,
timeoutBudget: timeoutBudget ? withoutFullRecordBytes(compactRecord(timeoutBudget, { keys: ["state", "timeoutMs", "elapsedMs", "remainingMs", "startedAt", "source"] })) : null, timeoutBudget: timeoutBudget ? withoutFullRecordBytes(compactRecord(timeoutBudget, { keys: ["state", "timeoutMs", "elapsedMs", "remainingMs", "startedAt", "source"] })) : null,
@@ -943,6 +964,7 @@ function summarizeRunnerJobRecord(record: JsonRecord | null): JsonRecord | null
...compactRecord(record, { keys: ["action", "mutation", "runId", "commandId", "attemptId", "runnerId", "namespace", "jobName"] }), ...compactRecord(record, { keys: ["action", "mutation", "runId", "commandId", "attemptId", "runnerId", "namespace", "jobName"] }),
logPath: stringValue(runner?.logPath), logPath: stringValue(runner?.logPath),
backendProfile: stringValue(runner?.backendProfile), backendProfile: stringValue(runner?.backendProfile),
diagnosis: summarizeRunnerJobDiagnosis(jsonRecordValue(record.diagnosis)),
jobUid: stringValue(jobIdentity?.uid), jobUid: stringValue(jobIdentity?.uid),
created: kubernetes?.created === true, created: kubernetes?.created === true,
warnings: Array.isArray(record.warnings) ? record.warnings.map((item) => boundedSummaryString(typeof item === "string" ? item : JSON.stringify(item), 240)).filter((item): item is string => Boolean(item)) : [], warnings: Array.isArray(record.warnings) ? record.warnings.map((item) => boundedSummaryString(typeof item === "string" ? item : JSON.stringify(item), 240)).filter((item): item is string => Boolean(item)) : [],
@@ -951,6 +973,15 @@ function summarizeRunnerJobRecord(record: JsonRecord | null): JsonRecord | null
}; };
} }
/**
 * Produces the low-noise CLI view of a runner job's `diagnosis` record.
 *
 * Keeps the allow-listed identity/phase keys, summarizes `nextActions` via
 * `summarizeRecoveryActions`, and passes the assembled object through
 * `withoutFullRecordBytes`.
 *
 * @param record Runner-job diagnosis record, or `null`.
 * @returns The compact diagnosis, or `null` when no record was supplied.
 */
function summarizeRunnerJobDiagnosis(record: JsonRecord | null): JsonRecord | null {
  if (!record) {
    return null;
  }

  const keptKeys = [
    "category",
    "runnerLostSuspected",
    "phase",
    "evidenceLevel",
    "runId",
    "commandId",
    "runnerJobId",
    "attemptId",
    "runnerId",
    "jobName",
    "namespace",
    "logPath",
  ];
  const summary: JsonRecord = {
    ...compactRecord(record, { keys: keptKeys }),
    nextActions: summarizeRecoveryActions(record.nextActions),
    valuesPrinted: false,
  };
  return withoutFullRecordBytes(summary);
}
function summarizeGenericRecord(record: JsonRecord | null): JsonRecord | null { function summarizeGenericRecord(record: JsonRecord | null): JsonRecord | null {
if (!record) return null; if (!record) return null;
return compactRecord(record, { return compactRecord(record, {
+203
View File
@@ -0,0 +1,203 @@
import type { CommandRecord, JsonRecord, JsonValue, RunEvent, RunRecord, RunnerJobRecord } from "../common/types.js";
import { boundedTextSummary } from "../common/output.js";
/**
 * Everything `runDiagnosis` needs to classify a run and suggest next steps.
 */
export interface RunDiagnosisInput {
/** Run being diagnosed; its status, claim/lease fields, id and sessionRef are read. */
run: RunRecord;
/** Command scoped to this diagnosis, or null; when present its id, state and timestamps are echoed. */
command: CommandRecord | null;
/** Most recent runner job for the run, or null; used to build the `runnerJob` reference. */
latestJob: RunnerJobRecord | null;
/** Run events; searched for a terminal event belonging to the job's command. */
events: RunEvent[];
/** Terminal classification summary, or null; provider evidence/interruption and category are read from it. */
terminalClassification: JsonRecord | null;
/** Liveness snapshot, or null; `lease`, `timeoutBudget` and `lastSeq` are read from it. */
liveness: JsonRecord | null;
/** Terminal status, or null; a non-null value while the run is still "claimed" is reported as terminal-command/open-run. */
terminalStatus: string | null;
/** Failure kind, or null; echoed in the diagnosis and used as the reason on the refresh/resubmit action. */
failureKind: string | null;
/** Failure message, or null; summarized with a 240-character limit before being echoed. */
failureMessage: string | null;
}
/**
 * Builds the single-level diagnosis record for a run.
 *
 * The record combines stale-claim detection, runner-lost detection, the
 * "terminal command but run still claimed" condition, provider evidence taken
 * from the terminal classification, compact references to the run, command,
 * runner job and session, and a list of recovery actions.
 *
 * @param input Run, command, latest runner job, events and the already
 *   computed terminal/liveness summaries.
 * @returns A JSON record; every nested section carries `valuesPrinted: false`.
 */
export function runDiagnosis(input: RunDiagnosisInput): JsonRecord {
  const { run, command, latestJob, events, terminalClassification, liveness, terminalStatus, failureKind, failureMessage } = input;

  // Prefer the lease from the liveness snapshot; otherwise derive it from the run.
  const lease = recordAt(liveness, "lease") ?? leaseFromRun(run, Date.now());
  const timeoutBudget = recordAt(liveness, "timeoutBudget");

  const stillClaimed = run.status === "claimed";
  const staleClaimed = stillClaimed && booleanValue(lease.leaseExpired) === true;
  const terminalCommandOpenRun = stillClaimed && terminalStatus !== null;

  const runnerJob = latestJob ? runnerJobReference(latestJob, events) : null;
  // A stale claim with no job, or a job that never left "created"/"recorded", is a lost runner.
  const jobNeverStarted = runnerJob === null || runnerJob.phase === "created" || runnerJob.phase === "recorded";
  const runnerLost = staleClaimed && jobNeverStarted;

  const session = sessionReference(run);
  const providerEvidence = stringValue(terminalClassification?.providerEvidence) ?? "not-applicable";
  const providerInterruption = stringValue(terminalClassification?.providerInterruption) ?? "not-established";
  const terminalCategory = stringValue(terminalClassification?.category);

  const category = diagnosisCategory({ staleClaimed, runnerLost, terminalCommandOpenRun, providerEvidence, terminalCategory });
  const recoveryActions = recoveryActionsForDiagnosis({
    run,
    command,
    latestJob,
    session,
    runnerLost,
    staleClaimed,
    terminalCommandOpenRun,
    failureKind,
    lastSeq: numberValue(liveness?.lastSeq) ?? 0,
  });
  const evidence = evidenceLevel(category, providerEvidence, runnerLost, staleClaimed, terminalCommandOpenRun);
  const boundedFailureMessage = failureMessage ? boundedTextSummary(failureMessage, { limitChars: 240 }).text as string : null;

  const runSection: JsonRecord = {
    runId: run.id,
    status: run.status,
    claimedBy: run.claimedBy,
    leaseExpiresAt: run.leaseExpiresAt,
    leaseExpired: booleanOrNull(lease.leaseExpired),
    leaseRemainingMs: numberValue(lease.leaseRemainingMs),
    valuesPrinted: false,
  };
  const commandSection: JsonRecord | null = command
    ? {
        commandId: command.id,
        state: command.state,
        terminalStatus,
        acknowledgedAt: command.acknowledgedAt ?? null,
        updatedAt: command.updatedAt,
        valuesPrinted: false,
      }
    : null;

  return {
    category,
    staleClaimed,
    runnerLost,
    terminalCommandOpenRun,
    evidenceLevel: evidence,
    providerEvidence,
    providerInterruption,
    providerInterruptionKnown: terminalClassification?.providerInterruptionKnown === true,
    terminalCategory,
    terminalStatus,
    failureKind,
    failureMessage: boundedFailureMessage,
    run: runSection,
    command: commandSection,
    runnerJob,
    session,
    timeoutBudget: timeoutBudget ? compactRecord(timeoutBudget, ["state", "timeoutMs", "elapsedMs", "remainingMs", "startedAt", "source"]) : null,
    recoveryActions,
    valuesPrinted: false,
  };
}
/**
 * Builds the diagnosis record attached to a runner job status summary.
 *
 * The phase is `terminal:<status>` when a terminal event for the job's command
 * exists (`terminal:unknown` if that event has no status), `created` when the
 * job result says the Kubernetes job was created, and `recorded` otherwise.
 * A job that is only created/recorded is flagged as a suspected lost runner.
 *
 * @param job Persisted runner job record.
 * @param events Run events to search for a terminal event; defaults to none.
 * @returns A JSON record with the phase, identity fields and drill-down commands.
 */
export function runnerJobDiagnosis(job: RunnerJobRecord, events: RunEvent[] = []): JsonRecord {
  const terminalEvent = latestTerminalEvent(events, job.commandId);

  let phase: string;
  if (terminalEvent) {
    phase = `terminal:${String(terminalEvent.payload?.terminalStatus ?? "unknown")}`;
  } else if (recordAt(job.result, "kubernetes")?.created === true) {
    phase = "created";
  } else {
    phase = "recorded";
  }

  const notStarted = phase === "created" || phase === "recorded";
  let category: string;
  if (notStarted) {
    category = "runner-job-not-started";
  } else if (phase.startsWith("terminal:")) {
    category = "runner-job-terminal";
  } else {
    category = "runner-job-observed";
  }

  const { runId, commandId } = job;
  const nextActions: JsonRecord[] = [
    { action: "inspect-run", command: `./scripts/agentrun runs show ${runId}`, valuesPrinted: false },
    { action: "inspect-command", command: `./scripts/agentrun commands show ${commandId} --run-id ${runId}`, valuesPrinted: false },
    { action: "poll-events", command: `./scripts/agentrun runs events ${runId} --after-seq 0 --limit 100 --tail-summary`, valuesPrinted: false },
  ];

  return {
    category,
    runnerLostSuspected: notStarted,
    phase,
    evidenceLevel: notStarted ? "medium" : "high",
    runId,
    commandId,
    runnerJobId: job.id,
    attemptId: job.attemptId,
    runnerId: job.runnerId,
    jobName: job.jobName,
    namespace: job.namespace,
    logPath: stringValue(recordAt(job.result, "runner")?.logPath),
    nextActions,
    valuesPrinted: false,
  };
}
/**
 * Picks the diagnosis category by priority.
 *
 * Order: runner lost, stale claim, terminal command with open run, known
 * provider interruption (`failure-kind` evidence), unknown provider
 * interruption (`observed-transport-disconnect` evidence), then the terminal
 * classification category, and finally `"unknown"`.
 */
function diagnosisCategory(input: { staleClaimed: boolean; runnerLost: boolean; terminalCommandOpenRun: boolean; providerEvidence: string; terminalCategory: string | null }): string {
  const { staleClaimed, runnerLost, terminalCommandOpenRun, providerEvidence, terminalCategory } = input;

  // First matching rule wins; the array order is the priority order.
  const rules: Array<[boolean, string]> = [
    [runnerLost, "runner-lost"],
    [staleClaimed, "stale-claimed"],
    [terminalCommandOpenRun, "terminal-command-open-run"],
    [providerEvidence === "failure-kind", "provider-interruption-known"],
    [providerEvidence === "observed-transport-disconnect", "provider-interruption-unknown"],
  ];
  const matched = rules.find(([applies]) => applies);
  if (matched) {
    return matched[1];
  }
  return terminalCategory || "unknown";
}
/**
 * Rates how strong the evidence behind a diagnosis is.
 *
 * Any stale-runner condition is "high". Otherwise `failure-kind` provider
 * evidence is "high", an observed transport disconnect is "medium", a
 * `completed` or `cancelled` category is "high", and everything else is "low".
 */
function evidenceLevel(category: string, providerEvidence: string, runnerLost: boolean, staleClaimed: boolean, terminalCommandOpenRun: boolean): string {
  const staleRunnerCondition = runnerLost || staleClaimed || terminalCommandOpenRun;
  if (staleRunnerCondition) {
    return "high";
  }
  switch (providerEvidence) {
    case "failure-kind":
      return "high";
    case "observed-transport-disconnect":
      return "medium";
    default:
      return ["completed", "cancelled"].includes(category) ? "high" : "low";
  }
}
/**
 * Lists the drill-down and recovery commands that accompany a diagnosis.
 *
 * Order of the returned actions: inspect the runner job (if any), inspect the
 * command (if any), poll events after `lastSeq`, then either resume the
 * session or a `session-unavailable` note, and finally a refresh/resubmit
 * hint when a stale-runner condition holds. At most six actions are returned.
 */
function recoveryActionsForDiagnosis(input: { run: RunRecord; command: CommandRecord | null; latestJob: RunnerJobRecord | null; session: JsonRecord; runnerLost: boolean; staleClaimed: boolean; terminalCommandOpenRun: boolean; failureKind: string | null; lastSeq: number }): JsonRecord[] {
  const { run, command, latestJob, session, lastSeq } = input;
  const actions: JsonRecord[] = [];

  if (latestJob) {
    const inspectJob: JsonRecord = { action: "inspect-runner-job", runnerJobId: latestJob.id, command: `./scripts/agentrun runner job-status ${latestJob.id} --run-id ${run.id}`, valuesPrinted: false };
    actions.push(inspectJob);
  }

  if (command) {
    const inspectCommand: JsonRecord = { action: "inspect-command", commandId: command.id, command: `./scripts/agentrun commands result ${command.id} --run-id ${run.id}`, valuesPrinted: false };
    actions.push(inspectCommand);
  }

  const pollEvents: JsonRecord = { action: "poll-events", runId: run.id, afterSeq: lastSeq, command: `./scripts/agentrun runs events ${run.id} --after-seq ${lastSeq} --limit 100 --tail-summary`, valuesPrinted: false };
  actions.push(pollEvents);

  // Session resume is only offered when the run actually carries a session id.
  const sessionId = stringValue(session.sessionId);
  const sessionAction: JsonRecord = sessionId
    ? { action: "resume-session", sessionId, command: `./scripts/agentrun sessions turn ${sessionId} --prompt-stdin`, valuesPrinted: false }
    : { action: "session-unavailable", reason: "sessionRef=null", hint: "当前 run 没有 sessionRef,只能从 run/events/command/runner-job 继续 drill-down 或重新提交任务。", valuesPrinted: false };
  actions.push(sessionAction);

  const staleRunnerCondition = input.runnerLost || input.staleClaimed || input.terminalCommandOpenRun;
  if (staleRunnerCondition) {
    const refresh: JsonRecord = { action: "refresh-queue-or-resubmit", reason: input.failureKind ?? "stale-runner-state", hint: "先用 queue refresh/show 对齐 attempt,再按任务边界决定重新派发或拆分续跑。", valuesPrinted: false };
    actions.push(refresh);
  }

  return actions.slice(0, 6);
}
/**
 * Builds the compact runner-job reference embedded in a run diagnosis.
 *
 * The terminal status comes from the latest terminal event for the job's
 * command, falling back to `terminalStatus` on the job result. The phase is
 * `terminal:<status>` when a status is known, `terminal:unknown` when a
 * terminal event exists but carries no status (the same value
 * `runnerJobDiagnosis` reports for that job), `created` when the job result
 * says the Kubernetes job was created, and `recorded` otherwise.
 *
 * @param job Persisted runner job record.
 * @param events Run events to search for a terminal event.
 * @returns A JSON record; `startedAt` and `finishedAt` are always `null`.
 */
function runnerJobReference(job: RunnerJobRecord, events: RunEvent[]): JsonRecord {
  const kubernetes = recordAt(job.result, "kubernetes");
  const terminalEvent = latestTerminalEvent(events, job.commandId);
  // `payload` is guarded like everywhere else in this file: a terminal event
  // without a payload must not throw while building a diagnosis.
  const terminalStatus = stringValue(terminalEvent?.payload?.terminalStatus) ?? stringValue(latestTerminalStatusFromResult(job.result));

  let phase: string;
  if (terminalStatus) {
    phase = `terminal:${terminalStatus}`;
  } else if (terminalEvent) {
    // A terminal event was observed, so the job did run; reporting
    // "created"/"recorded" here would make a stale run look runner-lost.
    phase = "terminal:unknown";
  } else {
    phase = kubernetes?.created === true ? "created" : "recorded";
  }

  return {
    runnerJobId: job.id,
    attemptId: job.attemptId,
    runnerId: job.runnerId,
    namespace: job.namespace,
    jobName: job.jobName,
    phase,
    terminalStatus,
    startedAt: null,
    finishedAt: null,
    logPath: stringValue(recordAt(job.result, "runner")?.logPath),
    valuesPrinted: false,
  };
}
/**
 * Describes the session attached to a run.
 *
 * Without a `sessionRef` the id and path are `null` and `sessionRefNull` is
 * `true`; otherwise the session id and its `/api/v1/sessions/<id>` path are
 * returned.
 */
function sessionReference(run: RunRecord): JsonRecord {
  const ref = run.sessionRef;
  return {
    sessionId: ref ? ref.sessionId : null,
    sessionRefNull: !ref,
    sessionPath: ref ? `/api/v1/sessions/${ref.sessionId}` : null,
    valuesPrinted: false,
  };
}
/**
 * Derives lease state from the run record itself.
 *
 * An unclaimed run reports `leaseExpired: null`. A claimed run with a missing
 * or unparseable `leaseExpiresAt` is treated as expired. `leaseRemainingMs`
 * is never negative and is `null` when there is no usable lease.
 *
 * @param run Run whose `claimedBy` and `leaseExpiresAt` are inspected.
 * @param nowMs Reference time in epoch milliseconds.
 */
function leaseFromRun(run: RunRecord, nowMs: number): JsonRecord {
  const expiresAtMs = run.leaseExpiresAt ? Date.parse(run.leaseExpiresAt) : Number.NaN;
  const claimed = Boolean(run.claimedBy);
  const leaseUsable = claimed && Boolean(run.leaseExpiresAt) && Number.isFinite(expiresAtMs);

  let leaseExpired: boolean | null = null;
  if (claimed) {
    leaseExpired = !leaseUsable || expiresAtMs <= nowMs;
  }

  return {
    leaseExpired,
    leaseRemainingMs: leaseUsable ? Math.max(0, expiresAtMs - nowMs) : null,
    valuesPrinted: false,
  };
}
/** Returns `result.terminalStatus` when it is a string, otherwise `null`. */
function latestTerminalStatusFromResult(result: JsonRecord): JsonValue | null {
  const { terminalStatus } = result;
  if (typeof terminalStatus !== "string") {
    return null;
  }
  return terminalStatus;
}
/**
 * Finds the most recent terminal event for a command.
 *
 * Events are scanned newest-first on a copy of the array. Events whose payload
 * names a different command are skipped. A `terminal_status` event matches
 * even without a `commandId`; a `backend_status` event matches only when its
 * phase is `command-terminal` and its `commandId` equals the one requested.
 *
 * @returns The matching event, or `null` when none qualifies.
 */
function latestTerminalEvent(events: RunEvent[], commandId: string): RunEvent | null {
  const newestFirst = [...events].reverse();
  const match = newestFirst.find((candidate) => {
    const scopedCommandId = candidate.payload?.commandId;
    if (scopedCommandId && scopedCommandId !== commandId) {
      return false;
    }
    if (candidate.type === "terminal_status") {
      return true;
    }
    return candidate.type === "backend_status" && candidate.payload?.phase === "command-terminal" && scopedCommandId === commandId;
  });
  return match ?? null;
}
/**
 * Copies the listed keys from `record`, keeping only scalar values.
 *
 * Strings, numbers, booleans and `null` are kept; missing keys, objects and
 * arrays are dropped. The result always ends up with `valuesPrinted: false`,
 * overriding any copied value.
 */
function compactRecord(record: JsonRecord, keys: string[]): JsonRecord {
  const compact: JsonRecord = {};
  for (const name of keys) {
    const entry = record[name];
    switch (typeof entry) {
      case "string":
      case "number":
      case "boolean":
        compact[name] = entry;
        break;
      case "object":
        // typeof null is "object"; null is the only object-typed value kept.
        if (entry === null) compact[name] = null;
        break;
      default:
        break;
    }
  }
  compact.valuesPrinted = false;
  return compact;
}
/**
 * Reads `record[key]` and returns it only when it is a plain (non-array,
 * non-null) object; a missing record or any other value yields `null`.
 */
function recordAt(record: JsonRecord | null | undefined, key: string): JsonRecord | null {
  const candidate = record?.[key];
  if (candidate === null || typeof candidate !== "object" || Array.isArray(candidate)) {
    return null;
  }
  return candidate as JsonRecord;
}
/** Returns `value` when it is a non-empty string, otherwise `null`. */
function stringValue(value: JsonValue | undefined): string | null {
  if (typeof value !== "string" || value.length === 0) {
    return null;
  }
  return value;
}
/** Returns `value` when it is a finite number, otherwise `null`. */
function numberValue(value: JsonValue | undefined): number | null {
  if (typeof value !== "number") {
    return null;
  }
  return Number.isFinite(value) ? value : null;
}
/** Returns `value` when it is a boolean, otherwise `null`. */
function booleanValue(value: JsonValue | undefined): boolean | null {
  if (typeof value === "boolean") {
    return value;
  }
  return null;
}
/** Passes a boolean through unchanged and maps every other value to `null`. */
function booleanOrNull(value: JsonValue | undefined): boolean | null {
  switch (typeof value) {
    case "boolean":
      return value;
    default:
      return null;
  }
}
+3
View File
@@ -1,6 +1,7 @@
import type { AgentRunStore } from "./store.js"; import type { AgentRunStore } from "./store.js";
import type { CommandRecord, FailureKind, JsonRecord, JsonValue, RunEvent, RunRecord, RunnerJobRecord, TerminalStatus } from "../common/types.js"; import type { CommandRecord, FailureKind, JsonRecord, JsonValue, RunEvent, RunRecord, RunnerJobRecord, TerminalStatus } from "../common/types.js";
import { boundedTextSummary, outputBytesFromPayload, outputTruncatedFromPayload } from "../common/output.js"; import { boundedTextSummary, outputBytesFromPayload, outputTruncatedFromPayload } from "../common/output.js";
import { runDiagnosis } from "./diagnosis.js";
const maxToolCallSummaryItems = 40; const maxToolCallSummaryItems = 40;
const toolCallCommandLimitChars = 600; const toolCallCommandLimitChars = 600;
@@ -56,6 +57,7 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman
const blocker = terminal === "blocked" || terminal === "failed" ? { failureKind, message: failureMessage, details: failureDetails } : null; const blocker = terminal === "blocked" || terminal === "failed" ? { failureKind, message: failureMessage, details: failureDetails } : null;
const liveness = livenessSnapshot(run, command, events, scopedEvents, terminal, failureKind, failureMessage); const liveness = livenessSnapshot(run, command, events, scopedEvents, terminal, failureKind, failureMessage);
const terminalClassification = terminalClassificationSummary({ terminal, terminalSource, failureKind, failureMessage, liveness }); const terminalClassification = terminalClassificationSummary({ terminal, terminalSource, failureKind, failureMessage, liveness });
const diagnosis = runDiagnosis({ run, command, latestJob, events, terminalClassification, liveness, terminalStatus: terminal, failureKind, failureMessage });
const steerDelivery = command?.type === "steer" ? steerDeliverySummary(events, command.id) : null; const steerDelivery = command?.type === "steer" ? steerDeliverySummary(events, command.id) : null;
return { return {
runId: run.id, runId: run.id,
@@ -88,6 +90,7 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman
failureMessage, failureMessage,
failureDetails, failureDetails,
terminalClassification, terminalClassification,
diagnosis,
blocker, blocker,
liveness, liveness,
...(steerDelivery ? { steerDelivery } : {}), ...(steerDelivery ? { steerDelivery } : {}),
+2
View File
@@ -1,4 +1,5 @@
import type { JsonRecord, RunEvent, RunnerJobRecord, TerminalStatus } from "../common/types.js"; import type { JsonRecord, RunEvent, RunnerJobRecord, TerminalStatus } from "../common/types.js";
import { runnerJobDiagnosis } from "./diagnosis.js";
export function runnerJobStatusSummary(job: RunnerJobRecord, events: RunEvent[] = []): JsonRecord { export function runnerJobStatusSummary(job: RunnerJobRecord, events: RunEvent[] = []): JsonRecord {
const terminalEvent = latestTerminalEvent(events, job.commandId); const terminalEvent = latestTerminalEvent(events, job.commandId);
@@ -32,6 +33,7 @@ export function runnerJobStatusSummary(job: RunnerJobRecord, events: RunEvent[]
logPath: typeof runner.logPath === "string" ? runner.logPath : null, logPath: typeof runner.logPath === "string" ? runner.logPath : null,
retention, retention,
kubernetes, kubernetes,
diagnosis: runnerJobDiagnosis(job, events),
createdAt: job.createdAt, createdAt: job.createdAt,
updatedAt: job.updatedAt, updatedAt: job.updatedAt,
valuesPrinted: false, valuesPrinted: false,
+1
View File
@@ -151,6 +151,7 @@ async function queueTaskSupervisor(store: AgentRunStore, task: JsonRecord): Prom
status: stringJsonValue(result.status), status: stringJsonValue(result.status),
terminalStatus: stringJsonValue(result.terminalStatus), terminalStatus: stringJsonValue(result.terminalStatus),
failureKind: stringJsonValue(result.failureKind), failureKind: stringJsonValue(result.failureKind),
diagnosis: asJsonRecord(result.diagnosis),
terminalClassification: terminalClassification ? compactTerminalClassification(terminalClassification) : null, terminalClassification: terminalClassification ? compactTerminalClassification(terminalClassification) : null,
phase: stringJsonValue(liveness?.phase), phase: stringJsonValue(liveness?.phase),
active: liveness?.active === true, active: liveness?.active === true,
+35 -5
View File
@@ -54,19 +54,48 @@ const selfTest: SelfTestCase = async (context: SelfTestContext) => {
await client.post(`/api/v1/runs/${noSession.runId}/events`, { type: "backend_status", payload: { commandId: noSession.commandId, phase: "codex-app-server-closed", message: "stdio closed before terminal result" } }); await client.post(`/api/v1/runs/${noSession.runId}/events`, { type: "backend_status", payload: { commandId: noSession.commandId, phase: "codex-app-server-closed", message: "stdio closed before terminal result" } });
await client.post(`/api/v1/runs/${noSession.runId}/events`, { type: "terminal_status", payload: { commandId: noSession.commandId, terminalStatus: "failed", failureKind: "backend-timeout", message: "codex stdio turn hard timed out after 50ms" } }); await client.post(`/api/v1/runs/${noSession.runId}/events`, { type: "terminal_status", payload: { commandId: noSession.commandId, terminalStatus: "failed", failureKind: "backend-timeout", message: "codex stdio turn hard timed out after 50ms" } });
await client.patch(`/api/v1/commands/${noSession.commandId}/status`, { terminalStatus: "failed", failureKind: "backend-timeout", failureMessage: "codex stdio turn hard timed out after 50ms" }); await client.patch(`/api/v1/commands/${noSession.commandId}/status`, { terminalStatus: "failed", failureKind: "backend-timeout", failureMessage: "codex stdio turn hard timed out after 50ms" });
await client.patch(`/api/v1/runs/${noSession.runId}/status`, { terminalStatus: "failed", failureKind: "backend-timeout", failureMessage: "codex stdio turn hard timed out after 50ms" });
const noSessionResult = await commandResult(client, noSession); const noSessionResult = await commandResult(client, noSession);
const noSessionLive = noSessionResult.liveness as JsonRecord; const noSessionLive = noSessionResult.liveness as JsonRecord;
const noSessionClassification = noSessionResult.terminalClassification as JsonRecord; const noSessionClassification = noSessionResult.terminalClassification as JsonRecord;
const noSessionDiagnosis = noSessionResult.diagnosis as JsonRecord;
assert.equal(noSessionClassification.category, "execution-hard-timeout"); assert.equal(noSessionClassification.category, "execution-hard-timeout");
assert.equal(noSessionClassification.providerEvidence, "observed-transport-disconnect"); assert.equal(noSessionClassification.providerEvidence, "observed-transport-disconnect");
assert.equal(noSessionClassification.providerInterruptionKnown, false); assert.equal(noSessionClassification.providerInterruptionKnown, false);
assert.equal(noSessionDiagnosis.category, "terminal-command-open-run");
assert.equal(noSessionDiagnosis.providerEvidence, "observed-transport-disconnect");
assert.equal(((noSessionDiagnosis.session as JsonRecord).sessionRefNull), true);
assert.ok((noSessionDiagnosis.recoveryActions as JsonRecord[]).some((action) => action.action === "session-unavailable"));
assert.match(String(noSessionClassification.providerInterruptionReason), /cannot distinguish provider outage/u); assert.match(String(noSessionClassification.providerInterruptionReason), /cannot distinguish provider outage/u);
assert.equal((noSessionLive.transportDisconnect as JsonRecord).sourceSeq, 4); assert.equal((noSessionLive.transportDisconnect as JsonRecord).sourceSeq, 4);
assert.equal((noSessionLive.recoveryActions as JsonRecord[]).some((action) => action.action === "resume-session"), false, "sessionId=null must not suggest session-only resume"); assert.equal((noSessionLive.recoveryActions as JsonRecord[]).some((action) => action.action === "resume-session"), false, "sessionId=null must not suggest session-only resume");
assert.equal((noSessionLive.recoveryActions as JsonRecord[]).some((action) => action.action === "poll-output"), false, "sessionId=null must not suggest session output path"); assert.equal((noSessionLive.recoveryActions as JsonRecord[]).some((action) => action.action === "poll-output"), false, "sessionId=null must not suggest session output path");
assert.ok((noSessionLive.recoveryActions as JsonRecord[]).some((action) => action.action === "poll-trace" && String(action.command).includes("runs events"))); assert.ok((noSessionLive.recoveryActions as JsonRecord[]).some((action) => action.action === "poll-trace" && String(action.command).includes("runs events")));
const stale = await createActiveRun(client, context, "timeout-liveness-stale-claimed", 120_000, { session: false, leaseMs: 1 });
await store.saveRunnerJob({
runId: stale.runId,
commandId: stale.commandId,
idempotencyKey: "stale-runner-lost",
payloadHash: "hash-stale-runner-lost",
attemptId: "attempt_stale_runner_lost",
runnerId: "runner_stale_claimed",
namespace: "agentrun-v01",
jobName: "agentrun-v01-runner-stale",
managerUrl: server.baseUrl,
image: "127.0.0.1:5000/agentrun/selftest:stale",
sourceCommit: "self-test",
serviceAccountName: "agentrun-v01-runner",
result: { runner: { logPath: "kubectl -n agentrun-v01 logs job/agentrun-v01-runner-stale" }, kubernetes: { created: true, valuesPrinted: false }, valuesPrinted: false },
});
await sleep(5);
const staleResult = await commandResult(client, stale);
const staleDiagnosis = staleResult.diagnosis as JsonRecord;
assert.equal(staleDiagnosis.category, "runner-lost");
assert.equal(staleDiagnosis.staleClaimed, true);
assert.equal(staleDiagnosis.runnerLost, true);
assert.equal(((staleDiagnosis.runnerJob as JsonRecord).phase), "created");
assert.equal(((staleDiagnosis.session as JsonRecord).sessionRefNull), true);
assert.ok(terminal.sessionId, "terminal fixture must have a session id"); assert.ok(terminal.sessionId, "terminal fixture must have a session id");
const terminalSessionId = terminal.sessionId; const terminalSessionId = terminal.sessionId;
const session = await client.get(`/api/v1/sessions/${terminalSessionId}?readerId=timeout-liveness`) as JsonRecord; const session = await client.get(`/api/v1/sessions/${terminalSessionId}?readerId=timeout-liveness`) as JsonRecord;
@@ -82,6 +111,7 @@ const selfTest: SelfTestCase = async (context: SelfTestContext) => {
const commander = await client.get("/api/v1/queue/commander?queue=timeout-liveness&readerId=timeout-liveness") as JsonRecord; const commander = await client.get("/api/v1/queue/commander?queue=timeout-liveness&readerId=timeout-liveness") as JsonRecord;
const commanderItem = ((commander.items as JsonRecord[]) ?? []).find((item) => item.id === task.id) as JsonRecord; const commanderItem = ((commander.items as JsonRecord[]) ?? []).find((item) => item.id === task.id) as JsonRecord;
assert.equal(((commanderItem.supervisor as JsonRecord).phase), "terminal"); assert.equal(((commanderItem.supervisor as JsonRecord).phase), "terminal");
assert.equal((((commanderItem.supervisor as JsonRecord).diagnosis as JsonRecord).category), "execution-hard-timeout");
assert.equal((((commanderItem.supervisor as JsonRecord).timeoutBudget as JsonRecord).state), "timed-out"); assert.equal((((commanderItem.supervisor as JsonRecord).timeoutBudget as JsonRecord).state), "timed-out");
const commanderSummary = summarizeQueueCommanderSnapshot(commander, { limit: 5 }); const commanderSummary = summarizeQueueCommanderSnapshot(commander, { limit: 5 });
const summaryItem = ((commanderSummary.items as JsonRecord[]) ?? []).find((item) => item.id === task.id) as JsonRecord; const summaryItem = ((commanderSummary.items as JsonRecord[]) ?? []).find((item) => item.id === task.id) as JsonRecord;
@@ -90,19 +120,19 @@ const selfTest: SelfTestCase = async (context: SelfTestContext) => {
assert.equal((((summaryItem.supervisor as JsonRecord).terminalClassification as JsonRecord).providerEvidence), "insufficient"); assert.equal((((summaryItem.supervisor as JsonRecord).terminalClassification as JsonRecord).providerEvidence), "insufficient");
assert.equal(JSON.stringify(commanderSummary).includes("hwpod workspace apply-patch"), false, "commander summary must stay compact and avoid dumping command bodies"); assert.equal(JSON.stringify(commanderSummary).includes("hwpod workspace apply-patch"), false, "commander summary must stay compact and avoid dumping command bodies");
assert.equal(JSON.stringify(summaryItem).includes("fullRecordBytes"), false, "commander item must not add bookkeeping noise"); assert.equal(JSON.stringify(summaryItem).includes("fullRecordBytes"), false, "commander item must not add bookkeeping noise");
assertNoSecretLeak({ toolResult, assistantLive, inactiveLive, terminalResult, noSessionResult, session, commanderSummary }); assertNoSecretLeak({ toolResult, assistantLive, inactiveLive, terminalResult, noSessionResult, staleResult, session, commanderSummary });
return { name: "timeout-liveness", tests: ["tool-in-flight-liveness", "assistant-progress-liveness", "stdio-inactive-timeout-budget", "terminal-timeout-recovery", "no-session-drilldown", "terminal-classification", "queue-commander-supervisor"] }; return { name: "timeout-liveness", tests: ["tool-in-flight-liveness", "assistant-progress-liveness", "stdio-inactive-timeout-budget", "terminal-timeout-recovery", "no-session-drilldown", "terminal-classification", "queue-commander-supervisor", "diagnosis-visibility", "stale-claimed-runner-lost"] };
} finally { } finally {
await new Promise<void>((resolve) => server.server.close(() => resolve())); await new Promise<void>((resolve) => server.server.close(() => resolve()));
} }
}; };
/**
 * Drives a fresh run into a claimed state with one acknowledged command.
 *
 * Steps, in order: create the run, enqueue a single `turn` command whose
 * prompt and idempotency key are both `sessionSuffix`, claim the run as
 * `runner_<sessionSuffix>`, then ack the command.
 *
 * @param client - Manager API client used for every request.
 * @param context - Self-test context forwarded to `runBody`.
 * @param sessionSuffix - Suffix used to derive the session id, runner id,
 *   prompt and idempotency key, so each fixture stays distinct.
 * @param timeoutMs - Timeout forwarded to `runBody`.
 * @param options - `session: false` creates the run without a session id;
 *   `leaseMs` overrides the claim lease (defaults to 60_000).
 *   NOTE(review): `leaseMs` presumably exists so a test can let the claim
 *   lease expire quickly — confirm against the callers.
 * @returns The run id and command id as strings, plus the session id that was
 *   sent (`null` when `options.session === false`).
 */
async function createActiveRun(client: ManagerClient, context: SelfTestContext, sessionSuffix: string, timeoutMs: number, options: { session?: boolean; leaseMs?: number } = {}): Promise<{ runId: string; commandId: string; sessionId: string | null }> {
  // Only an explicit `session: false` suppresses the session id; omitting the option keeps it.
  const sessionId = options.session === false ? null : `selftest-${sessionSuffix}`;
  const run = await client.post("/api/v1/runs", runBody(context, sessionId, timeoutMs)) as JsonRecord;
  const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: sessionSuffix }, idempotencyKey: sessionSuffix }) as JsonRecord;
  await client.post(`/api/v1/runs/${run.id}/claim`, { runnerId: `runner_${sessionSuffix}`, leaseMs: options.leaseMs ?? 60_000 });
  await client.post(`/api/v1/commands/${command.id}/ack`, {});
  return { runId: String(run.id), commandId: String(command.id), sessionId };
}
@@ -122,10 +122,13 @@ async function assertRunnerJobStatus(client: ManagerClient, context: SelfTestCon
assert.equal(status.runId, item.runId); assert.equal(status.runId, item.runId);
assert.equal(status.commandId, item.commandId); assert.equal(status.commandId, item.commandId);
assert.equal(status.phase, "created"); assert.equal(status.phase, "created");
assert.equal(((status.diagnosis as JsonRecord).category), "runner-job-not-started");
assert.equal(((status.diagnosis as JsonRecord).runnerLostSuspected), true);
assert.equal(status.valuesPrinted, false); assert.equal(status.valuesPrinted, false);
assert.equal(typeof status.logPath, "string"); assert.equal(typeof status.logPath, "string");
const single = await client.get(`/api/v1/runs/${item.runId}/runner-jobs/${String(status.id)}`) as JsonRecord; const single = await client.get(`/api/v1/runs/${item.runId}/runner-jobs/${String(status.id)}`) as JsonRecord;
assert.equal(single.jobName, status.jobName); assert.equal(single.jobName, status.jobName);
assert.equal(((single.diagnosis as JsonRecord).phase), "created");
assertNoSecretLeak({ list, single }); assertNoSecretLeak({ list, single });
} }