From 68ef21421ac8538ea6fd33787dfb711fac9f7431 Mon Sep 17 00:00:00 2001 From: AgentRun Codex Date: Thu, 11 Jun 2026 20:26:17 +0800 Subject: [PATCH] fix: derive runner job phase from observed events --- src/mgr/diagnosis.ts | 106 ++++++++++++++++-- src/mgr/runner-job-status.ts | 26 ++--- .../cases/60-hwlab-baseline-contract.ts | 2 +- 3 files changed, 107 insertions(+), 27 deletions(-) diff --git a/src/mgr/diagnosis.ts b/src/mgr/diagnosis.ts index 5cf08fb..7d89a8c 100644 --- a/src/mgr/diagnosis.ts +++ b/src/mgr/diagnosis.ts @@ -66,14 +66,17 @@ export function runDiagnosis(input: RunDiagnosisInput): JsonRecord { } export function runnerJobDiagnosis(job: RunnerJobRecord, events: RunEvent[] = []): JsonRecord { - const terminalEvent = latestTerminalEvent(events, job.commandId); - const phase = terminalEvent ? `terminal:${String(terminalEvent.payload?.terminalStatus ?? "unknown")}` : recordAt(job.result, "kubernetes")?.created === true ? "created" : "recorded"; + const observation = runnerJobObservation(job, events); + const phase = stringValue(observation.phase) ?? "unknown"; const notStarted = phase === "created" || phase === "recorded"; return { - category: notStarted ? "runner-job-not-started" : phase.startsWith("terminal:") ? "runner-job-terminal" : "runner-job-observed", + category: stringValue(observation.category) ?? (notStarted ? "runner-job-created" : phase.startsWith("terminal:") ? "runner-job-terminal" : "runner-job-observed"), runnerLostSuspected: notStarted, phase, - evidenceLevel: notStarted ? "medium" : "high", + evidenceLevel: stringValue(observation.evidenceLevel) ?? (notStarted ? "medium" : "high"), + lastObservedSeq: numberValue(observation.lastObservedSeq), + lastObservedAt: stringValue(observation.lastObservedAt), + lastObservedKind: stringValue(observation.lastObservedKind), runId: job.runId, commandId: job.commandId, runnerJobId: job.id, @@ -91,6 +94,59 @@ export function runnerJobDiagnosis(job: RunnerJobRecord, events: RunEvent[] = [] }; } +export function runnerJobObservation(job: RunnerJobRecord, events: RunEvent[] = []): JsonRecord { + const terminalEvent = latestTerminalEvent(events, job.commandId); + const terminalStatus = stringValue(terminalEvent?.payload.terminalStatus) ?? stringValue(latestTerminalStatusFromResult(job.result)); + if (terminalStatus) { + return { + phase: `terminal:${terminalStatus}`, + category: "runner-job-terminal", + terminalStatus, + failureKind: stringValue(terminalEvent?.payload.failureKind), + startedAt: stringValue(firstObservedRunnerEvent(job, events)?.createdAt), + finishedAt: stringValue(terminalEvent?.createdAt), + lastObservedSeq: numberValue(terminalEvent?.seq), + lastObservedAt: stringValue(terminalEvent?.createdAt), + lastObservedKind: eventKind(terminalEvent), + evidenceLevel: "high", + valuesPrinted: false, + }; + } + + const observed = relevantRunnerJobEvents(job, events); + const lastObserved = observed.at(-1) ?? null; + if (lastObserved) { + return { + phase: "running", + category: "runner-job-running", + terminalStatus: null, + failureKind: null, + startedAt: stringValue(observed[0]?.createdAt), + finishedAt: null, + lastObservedSeq: numberValue(lastObserved.seq), + lastObservedAt: stringValue(lastObserved.createdAt), + lastObservedKind: eventKind(lastObserved), + evidenceLevel: "high", + valuesPrinted: false, + }; + } + + const created = recordAt(job.result, "kubernetes")?.created === true; + return { + phase: created ? "created" : "recorded", + category: created ? "runner-job-created" : "runner-job-recorded", + terminalStatus: null, + failureKind: null, + startedAt: null, + finishedAt: null, + lastObservedSeq: null, + lastObservedAt: null, + lastObservedKind: null, + evidenceLevel: created ? "medium" : "low", + valuesPrinted: false, + }; +} + function diagnosisCategory(input: { staleClaimed: boolean; runnerLost: boolean; terminalCommandOpenRun: boolean; providerEvidence: string; terminalCategory: string | null }): string { if (input.runnerLost) return "runner-lost"; if (input.staleClaimed) return "stale-claimed"; @@ -122,19 +178,21 @@ function recoveryActionsForDiagnosis(input: { run: RunRecord; command: CommandRe } function runnerJobReference(job: RunnerJobRecord, events: RunEvent[]): JsonRecord { - const kubernetes = recordAt(job.result, "kubernetes"); - const terminalStatus = stringValue(latestTerminalEvent(events, job.commandId)?.payload.terminalStatus) ?? stringValue(latestTerminalStatusFromResult(job.result)); - const phase = terminalStatus ? `terminal:${terminalStatus}` : kubernetes?.created === true ? "created" : "recorded"; + const observation = runnerJobObservation(job, events); + const terminalStatus = stringValue(observation.terminalStatus); return { runnerJobId: job.id, attemptId: job.attemptId, runnerId: job.runnerId, namespace: job.namespace, jobName: job.jobName, - phase, + phase: stringValue(observation.phase) ?? "unknown", terminalStatus, - startedAt: null, - finishedAt: null, + startedAt: stringValue(observation.startedAt), + finishedAt: stringValue(observation.finishedAt), + lastObservedSeq: numberValue(observation.lastObservedSeq), + lastObservedAt: stringValue(observation.lastObservedAt), + lastObservedKind: stringValue(observation.lastObservedKind), logPath: stringValue(recordAt(job.result, "runner")?.logPath), valuesPrinted: false, }; @@ -170,6 +228,34 @@ function latestTerminalEvent(events: RunEvent[], commandId: string): RunEvent | return null; } +function relevantRunnerJobEvents(job: RunnerJobRecord, events: RunEvent[]): RunEvent[] { + return events.filter((event) => isRunnerJobActivity(job, event)); +} + +function firstObservedRunnerEvent(job: RunnerJobRecord, events: RunEvent[]): RunEvent | null { + return relevantRunnerJobEvents(job, events)[0] ?? null; +} + +function isRunnerJobActivity(job: RunnerJobRecord, event: RunEvent): boolean { + const payload = event.payload; + if (payload?.phase === "runner-job-created") return false; + if (payload?.runnerId === job.runnerId || payload?.attemptId === job.attemptId) return true; + if (payload?.commandId === job.commandId) { + if (event.type === "tool_call" || event.type === "assistant_message" || event.type === "command_output" || event.type === "error") return true; + if (event.type === "backend_status" && typeof payload.phase === "string" && payload.phase !== "command-created") return true; + } + return false; +} + +function eventKind(event: RunEvent | null): string | null { + if (!event) return null; + const phase = stringValue(event.payload?.phase); + if (phase) return `${event.type}:${phase}`; + const toolName = stringValue(event.payload?.toolName); + if (toolName) return `${event.type}:${toolName}`; + return event.type; +} + function compactRecord(record: JsonRecord, keys: string[]): JsonRecord { const result: JsonRecord = {}; for (const key of keys) { diff --git a/src/mgr/runner-job-status.ts b/src/mgr/runner-job-status.ts index 1755c62..dfb840d 100644 --- a/src/mgr/runner-job-status.ts +++ b/src/mgr/runner-job-status.ts @@ -1,14 +1,14 @@ import type { JsonRecord, RunEvent, RunnerJobRecord, TerminalStatus } from "../common/types.js"; -import { runnerJobDiagnosis } from "./diagnosis.js"; +import { runnerJobDiagnosis, runnerJobObservation } from "./diagnosis.js"; export function runnerJobStatusSummary(job: RunnerJobRecord, events: RunEvent[] = []): JsonRecord { - const terminalEvent = latestTerminalEvent(events, job.commandId); + const observation = runnerJobObservation(job, events); const runner = recordAt(job.result, "runner"); const jobIdentity = recordAt(job.result, "jobIdentity"); const kubernetes = recordAt(job.result, "kubernetes"); const retention = recordAt(job.result, "retention"); const envImage = recordAt(job.result, "envImage"); - const terminalStatus = terminalEvent?.payload.terminalStatus; + const terminalStatus = observation.terminalStatus; return { id: job.id, runId: job.runId, @@ -22,12 +22,15 @@ export function runnerJobStatusSummary(job: RunnerJobRecord, events: RunEvent[] envImage, sourceCommit: job.sourceCommit, serviceAccountName: job.serviceAccountName, - phase: terminalStatus ? `terminal:${terminalStatus}` : kubernetes.created === true ? "created" : "recorded", + phase: typeof observation.phase === "string" ? observation.phase : "unknown", terminalStatus: isTerminalStatus(terminalStatus) ? terminalStatus : null, - failureKind: typeof terminalEvent?.payload.failureKind === "string" ? terminalEvent.payload.failureKind : null, + failureKind: typeof observation.failureKind === "string" ? observation.failureKind : null, exitCode: null, - startedAt: null, - finishedAt: terminalEvent?.createdAt ?? null, + startedAt: typeof observation.startedAt === "string" ? observation.startedAt : null, + finishedAt: typeof observation.finishedAt === "string" ? observation.finishedAt : null, + lastObservedSeq: typeof observation.lastObservedSeq === "number" ? observation.lastObservedSeq : null, + lastObservedAt: typeof observation.lastObservedAt === "string" ? observation.lastObservedAt : null, + lastObservedKind: typeof observation.lastObservedKind === "string" ? observation.lastObservedKind : null, jobIdentity, podIdentity: recordAt(job.result, "podIdentity"), logPath: typeof runner.logPath === "string" ? runner.logPath : null, @@ -40,15 +43,6 @@ export function runnerJobStatusSummary(job: RunnerJobRecord, events: RunEvent[] }; } -function latestTerminalEvent(events: RunEvent[], commandId: string): RunEvent | null { - for (const event of [...events].reverse()) { - if (event.payload.commandId && event.payload.commandId !== commandId) continue; - if (event.type === "terminal_status") return event; - if (event.type === "backend_status" && event.payload.phase === "command-terminal" && event.payload.commandId === commandId) return event; - } - return null; -} - function recordAt(record: JsonRecord, key: string): JsonRecord { const value = record[key]; return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : {}; diff --git a/src/selftest/cases/60-hwlab-baseline-contract.ts b/src/selftest/cases/60-hwlab-baseline-contract.ts index 67b94bb..4f5c553 100644 --- a/src/selftest/cases/60-hwlab-baseline-contract.ts +++ b/src/selftest/cases/60-hwlab-baseline-contract.ts @@ -122,7 +122,7 @@ async function assertRunnerJobStatus(client: ManagerClient, context: SelfTestCon assert.equal(status.runId, item.runId); assert.equal(status.commandId, item.commandId); assert.equal(status.phase, "created"); - assert.equal(((status.diagnosis as JsonRecord).category), "runner-job-not-started"); + assert.equal(((status.diagnosis as JsonRecord).category), "runner-job-created"); assert.equal(((status.diagnosis as JsonRecord).runnerLostSuspected), true); assert.equal(status.valuesPrinted, false); assert.equal(typeof status.logPath, "string");