fix: 增强长 turn liveness 可见性

This commit is contained in:
Codex
2026-06-10 12:05:21 +08:00
parent 43d42fe087
commit 43c47d3fa9
7 changed files with 362 additions and 15 deletions
+102 -11
View File
@@ -43,7 +43,7 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman
const failureDetails = resultFailureDetails(scopedEvents, terminal);
const reply = assistantReply(scopedEvents);
const blocker = terminal === "blocked" || terminal === "failed" ? { failureKind, message: failureMessage, details: failureDetails } : null;
const liveness = livenessSnapshot(run, command, events, scopedEvents, terminal);
const liveness = livenessSnapshot(run, command, events, scopedEvents, terminal, failureKind, failureMessage);
const steerDelivery = command?.type === "steer" ? steerDeliverySummary(events, command.id) : null;
return {
runId: run.id,
@@ -92,7 +92,7 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman
};
}
function livenessSnapshot(run: RunRecord, command: CommandRecord | null, events: RunEvent[], scopedEvents: RunEvent[], terminal: TerminalStatus | null): JsonRecord {
function livenessSnapshot(run: RunRecord, command: CommandRecord | null, events: RunEvent[], scopedEvents: RunEvent[], terminal: TerminalStatus | null, failureKind: FailureKind | null, failureMessage: string | null): JsonRecord {
const nowMs = Date.now();
const active = terminal === null && !runIsTerminal(run) && !commandIsTerminal(command);
const lastEvent = events.at(-1) ?? null;
@@ -100,7 +100,10 @@ function livenessSnapshot(run: RunRecord, command: CommandRecord | null, events:
const lastCommandActivity = lastVisibleActivity ?? latestLivenessActivity(scopedEvents);
const lease = leaseSummary(run, nowMs);
const transportDisconnect = latestTransportDisconnect(scopedEvents);
const phase = livenessPhase({ active, command, lastVisibleActivity, leaseExpired: lease.leaseExpired, transportDisconnect });
const lastActivity = livenessActivitySummary(lastCommandActivity, nowMs);
const timeoutBudget = timeoutBudgetSummary(run, command, terminal, failureKind, nowMs);
const phase = livenessPhase({ active, command, lastVisibleActivity, leaseExpired: lease.leaseExpired, transportDisconnect, timeoutBudget, lastActivity });
const afterSeq = lastEvent?.seq ?? 0;
return {
phase,
active,
@@ -112,15 +115,17 @@ function livenessSnapshot(run: RunRecord, command: CommandRecord | null, events:
lastSeq: lastEvent?.seq ?? 0,
lastEventAt: lastEvent?.createdAt ?? null,
lastEventAgeMs: lastEvent ? ageMs(lastEvent.createdAt, nowMs) : null,
lastCommandActivity: livenessActivitySummary(lastCommandActivity, nowMs),
lastActivity,
lastCommandActivity: lastActivity,
timeoutBudget,
lease,
transportDisconnect: transportDisconnect ? livenessActivitySummary(transportDisconnect, nowMs) : null,
recoveryActions: active ? recoveryActions(run, command, lastEvent?.seq ?? 0) : [],
recoveryActions: recoveryActions({ run, command, afterSeq, active, terminal, failureKind, failureMessage }),
valuesPrinted: false,
};
}
function livenessPhase(input: { active: boolean; command: CommandRecord | null; lastVisibleActivity: RunEvent | null; leaseExpired: boolean | null; transportDisconnect: RunEvent | null }): string {
function livenessPhase(input: { active: boolean; command: CommandRecord | null; lastVisibleActivity: RunEvent | null; leaseExpired: boolean | null; transportDisconnect: RunEvent | null; timeoutBudget: JsonRecord; lastActivity: JsonRecord | null }): string {
if (!input.active) return "terminal";
if (input.command?.state === "pending") return "waiting-runner";
if (input.leaseExpired === true) return "runner-heartbeat-stale";
@@ -130,9 +135,43 @@ function livenessPhase(input: { active: boolean; command: CommandRecord | null;
if (status === "inProgress" || status === "running") return "waiting-tool";
if (status === "completed") return "idle-after-tool";
}
if (input.lastVisibleActivity?.type === "command_output") return "waiting-tool-output";
if (input.lastVisibleActivity?.type === "assistant_message") return "waiting-model-output";
const remainingMs = numberJsonValue(input.timeoutBudget.remainingMs);
const timeoutMs = numberJsonValue(input.timeoutBudget.timeoutMs);
const activityAgeMs = numberJsonValue(input.lastActivity?.ageMs);
const inactiveThresholdMs = timeoutMs === null ? 300_000 : Math.min(300_000, Math.max(60_000, Math.floor(timeoutMs / 4)));
if (remainingMs !== null && remainingMs <= Math.min(120_000, Math.max(10_000, Math.floor((timeoutMs ?? 120_000) / 10))) && (activityAgeMs === null || activityAgeMs >= inactiveThresholdMs)) return "runner-stdio-inactive";
return "waiting-model";
}
/**
 * Summarises how much of the run's timeout has been used as of `nowMs`.
 *
 * The timeout comes from `run.executionPolicy.timeoutMs`; only a finite,
 * positive number is accepted and it is truncated to an integer. The clock
 * starts at the first available of: command acknowledgement, command creation,
 * run update, run creation.
 *
 * `state` is:
 * - `"unknown"` when there is no usable timeout or the start time cannot be parsed;
 * - `"timed-out"` / `"terminal"` once `terminal` is set (the former only for a
 *   `"backend-timeout"` failure kind);
 * - `"overdue"` when nothing remains;
 * - `"approaching-hard-timeout"` when the remainder is within the threshold
 *   (a tenth of the timeout, clamped to 10s..120s);
 * - `"within-budget"` otherwise.
 *
 * @param run Run whose execution policy and timestamps are read.
 * @param command Command whose timestamps take precedence over the run's, if any.
 * @param terminal Terminal status of the result, or `null` while still running.
 * @param failureKind Failure kind of the result, used only to label timeouts.
 * @param nowMs Reference time in epoch milliseconds.
 * @returns The budget record, including the raw inputs used to derive `state`.
 */
function timeoutBudgetSummary(run: RunRecord, command: CommandRecord | null, terminal: TerminalStatus | null, failureKind: FailureKind | null, nowMs: number): JsonRecord {
  const configured = run.executionPolicy.timeoutMs;
  let timeoutMs: number | null = null;
  if (typeof configured === "number" && Number.isFinite(configured) && configured > 0) {
    timeoutMs = Math.trunc(configured);
  }

  const startedAt = command?.acknowledgedAt ?? command?.createdAt ?? run.updatedAt ?? run.createdAt;
  const startedMs = Date.parse(startedAt);

  let elapsedMs: number | null = null;
  let remainingMs: number | null = null;
  let approachingThresholdMs: number | null = null;
  if (timeoutMs !== null) {
    approachingThresholdMs = Math.min(120_000, Math.max(10_000, Math.floor(timeoutMs / 10)));
    // Elapsed/remaining are only meaningful when the start timestamp parses.
    if (Number.isFinite(startedMs)) {
      elapsedMs = Math.max(0, nowMs - startedMs);
      remainingMs = Math.max(0, timeoutMs - elapsedMs);
    }
  }

  let state: string;
  if (timeoutMs === null || elapsedMs === null) {
    state = "unknown";
  } else if (terminal !== null) {
    state = failureKind === "backend-timeout" ? "timed-out" : "terminal";
  } else if (remainingMs === 0) {
    state = "overdue";
  } else if (approachingThresholdMs !== null && remainingMs !== null && remainingMs <= approachingThresholdMs) {
    state = "approaching-hard-timeout";
  } else {
    state = "within-budget";
  }

  return {
    timeoutMs,
    source: "executionPolicy.timeoutMs",
    startedAt,
    elapsedMs,
    remainingMs,
    approachingThresholdMs,
    state,
    hardTimeout: true,
    valuesPrinted: false,
  };
}
function leaseSummary(run: RunRecord, nowMs: number): JsonRecord & { leaseExpired: boolean | null } {
const leaseExpiresMs = run.leaseExpiresAt ? Date.parse(run.leaseExpiresAt) : NaN;
const hasLease = Boolean(run.claimedBy && run.leaseExpiresAt && Number.isFinite(leaseExpiresMs));
@@ -172,33 +211,85 @@ function livenessActivitySummary(event: RunEvent | null, nowMs: number): JsonRec
if (!event) return null;
return {
seq: event.seq,
sourceSeq: event.seq,
eventId: event.id,
type: event.type,
activityKind: activityKind(event),
phase: typeof event.payload.phase === "string" ? event.payload.phase : null,
status: typeof event.payload.status === "string" ? event.payload.status : null,
toolName: typeof event.payload.toolName === "string" ? event.payload.toolName : null,
itemId: typeof event.payload.itemId === "string" ? event.payload.itemId : null,
exitCode: normalizedExitCode(event.payload.exitCode),
commandId: typeof event.payload.commandId === "string" ? event.payload.commandId : null,
createdAt: event.createdAt,
ageMs: ageMs(event.createdAt, nowMs),
summary: activityTextSummary(event),
valuesPrinted: false,
};
}
/**
 * Maps a run event to a coarse activity label.
 *
 * Tool calls are refined by `payload.status`; assistant messages are split on
 * `payload.progress === true`. Any event type not listed falls back to
 * `"backend-activity"`.
 */
function activityKind(event: RunEvent): string {
  switch (event.type) {
    case "tool_call":
      // A missing or non-string status never matches a case and lands on "tool".
      switch (event.payload.status) {
        case "running":
        case "inProgress":
          return "tool-in-flight";
        case "completed":
          return "tool-completed";
        case "cancelled":
          return "tool-cancelled";
        case "failed":
          return "tool-failed";
        default:
          return "tool";
      }
    case "command_output":
      return "tool-output";
    case "assistant_message":
      if (event.payload.progress === true) return "assistant-progress";
      return "assistant-output";
    case "diff":
      return "diff";
    case "error":
      return "error";
    case "terminal_status":
      return "terminal";
    default:
      return "backend-activity";
  }
}
/**
 * Produces a short one-line description of an event, or `null` when the event
 * carries no usable text.
 *
 * Tool calls are rendered as `"<name> <status>"`. Other events use the first
 * available of `payload.outputSummary`, `payload.summary`, the result of
 * `textPayload(payload)`, and `payload.phase`; whitespace runs are collapsed
 * and the text is passed through `boundedTextSummary` with a 240-character limit.
 */
function activityTextSummary(event: RunEvent): string | null {
  const payload = event.payload;

  if (event.type === "tool_call") {
    let name = "tool";
    if (typeof payload.toolName === "string") name = payload.toolName;
    else if (typeof payload.type === "string") name = payload.type;
    const status = typeof payload.status === "string" ? payload.status : "observed";
    return `${name} ${status}`;
  }

  let fromSummary = null;
  if (typeof payload.outputSummary === "string") fromSummary = payload.outputSummary;
  else if (typeof payload.summary === "string") fromSummary = payload.summary;

  const phaseText = typeof payload.phase === "string" ? payload.phase : null;
  const text = fromSummary ?? textPayload(payload) ?? phaseText;
  // An empty string counts as "no text", same as null/undefined.
  if (!text) return null;

  const singleLine = text.replace(/\s+/gu, " ").trim();
  return boundedTextSummary(singleLine, { limitChars: 240 }).text as string;
}
/**
 * Milliseconds between the timestamp string `value` and `nowMs`.
 *
 * @returns `null` when `value` cannot be parsed by `Date.parse`; otherwise the
 * difference, floored at 0 so timestamps ahead of `nowMs` report no age.
 */
function ageMs(value: string, nowMs: number): number | null {
  const timestampMs = Date.parse(value);
  if (!Number.isFinite(timestampMs)) {
    return null;
  }
  const elapsed = nowMs - timestampMs;
  return Math.max(0, elapsed);
}
// Builds the list of suggested follow-up actions for a run/command; each new-style entry carries a
// ready-to-run `./scripts/agentrun ...` command string.
// NOTE(review): this span is diff output with no +/- markers. Two signature lines for `recoveryActions`
// appear back to back, and the bare `poll-trace` / `poll-output` / `cancel-*` entries sit next to richer
// ones; presumably the positional signature and the bare entries are the removed side — confirm against
// the applied file.
function recoveryActions(run: RunRecord, command: CommandRecord | null, afterSeq: number): JsonRecord[] {
function recoveryActions(input: { run: RunRecord; command: CommandRecord | null; afterSeq: number; active: boolean; terminal: TerminalStatus | null; failureKind: FailureKind | null; failureMessage: string | null }): JsonRecord[] {
const { run, command, afterSeq, active, terminal, failureKind, failureMessage } = input;
// Session-scoped commands are used when the run has a session; without one, tracing falls back to
// the run's event listing and no output command is offered.
const sessionId = run.sessionRef?.sessionId ?? null;
const traceCommand = sessionId ? `./scripts/agentrun sessions trace ${sessionId} --after-seq ${afterSeq} --limit 100 --run-id ${run.id}` : `./scripts/agentrun runs events ${run.id} --after-seq ${afterSeq} --limit 100 --summary`;
const outputCommand = sessionId ? `./scripts/agentrun sessions output ${sessionId} --after-seq ${afterSeq} --limit 100 --run-id ${run.id}` : null;
const actions: JsonRecord[] = [
{ action: "poll-trace", runId: run.id, afterSeq },
{ action: "poll-output", runId: run.id, afterSeq },
{ action: "poll-trace", runId: run.id, commandId: command?.id ?? null, afterSeq, command: traceCommand, valuesPrinted: false },
];
if (command) actions.push({ action: "cancel-command", runId: run.id, commandId: command.id });
else actions.push({ action: "cancel-run", runId: run.id });
if (outputCommand) actions.push({ action: "poll-output", runId: run.id, commandId: command?.id ?? null, afterSeq, command: outputCommand, valuesPrinted: false });
// Still active: offer steering (only with a session) and cancellation, then stop — the terminal-only
// suggestions below do not apply.
if (active) {
if (sessionId) actions.push({ action: "steer-session", sessionId, runId: run.id, commandId: command?.id ?? null, command: `./scripts/agentrun sessions steer ${sessionId} --prompt-stdin`, valuesPrinted: false });
if (command) actions.push({ action: "cancel-command", runId: run.id, commandId: command.id, command: `./scripts/agentrun commands cancel ${command.id} --reason <reason>`, valuesPrinted: false });
else actions.push({ action: "cancel-run", runId: run.id, command: `./scripts/agentrun runs cancel ${run.id} --reason <reason>`, valuesPrinted: false });
return actions;
}
// Ended as failed/blocked/cancelled: point at the result, offer resuming the session, and add one
// closing hint — "split-task" for a backend timeout (with the failure message capped at 200 chars),
// "retry-or-split" for everything else. Any other terminal status gets only the polling actions.
if (terminal === "failed" || terminal === "blocked" || terminal === "cancelled") {
if (command) actions.push({ action: "inspect-result", runId: run.id, commandId: command.id, command: `./scripts/agentrun commands result ${command.id} --run-id ${run.id}`, valuesPrinted: false });
if (sessionId) actions.push({ action: "resume-session", sessionId, command: `./scripts/agentrun sessions turn ${sessionId} --prompt-stdin`, valuesPrinted: false });
if (failureKind === "backend-timeout") actions.push({ action: "split-task", reason: "backend-timeout", hint: "把大 patch / 长工具链拆成更短 turn 后用同一 session 续跑", failureMessage: failureMessage ? boundedTextSummary(failureMessage, { limitChars: 200 }).text as string : null, valuesPrinted: false });
else actions.push({ action: "retry-or-split", reason: failureKind ?? "terminal", hint: "先读 trace/output 的 detail id,再决定 steer、重跑或拆分", valuesPrinted: false });
}
return actions;
}
/**
 * Narrows a JSON value to a finite number.
 *
 * @returns `value` when it is a number other than `NaN`/`±Infinity`, else `null`.
 */
function numberJsonValue(value: JsonValue | undefined): number | null {
  if (typeof value !== "number") {
    return null;
  }
  return Number.isFinite(value) ? value : null;
}
function steerDeliverySummary(events: RunEvent[], commandId: string): JsonRecord {
const related = events.filter((event) => event.payload.commandId === commandId);
const completed = latestPhaseEvent(related, "turn/steer:completed");
+105 -1
View File
@@ -114,6 +114,92 @@ async function refreshQueueTaskRecordForRead(store: AgentRunStore, task: QueueTa
}
}
/**
 * Reads the queue commander snapshot and attaches a `supervisor` summary to
 * each item for which `queueTaskSupervisor` returns one.
 *
 * Items that are not plain records are passed through unchanged, as are record
 * items without a supervisor summary. A snapshot whose `items` is not an array
 * is returned with an empty `items` list.
 */
async function queueCommanderForRead(store: AgentRunStore, queue: string | undefined, readerId: string | null): Promise<JsonValue> {
  const snapshot = await store.queueCommander(queue, readerId) as unknown as JsonRecord;
  const items = Array.isArray(snapshot.items) ? snapshot.items : [];

  // Supervisor lookups for all items are started together and awaited as a group.
  const enrichedItems = await Promise.all(items.map(async (entry) => {
    const task = asJsonRecord(entry);
    if (!task) {
      return entry as JsonValue;
    }
    const supervisor = await queueTaskSupervisor(store, task);
    if (!supervisor) {
      return task;
    }
    return { ...task, supervisor, valuesPrinted: false };
  }));

  return { ...snapshot, items: enrichedItems, valuesPrinted: false } as JsonValue;
}
/**
 * Builds a compact liveness summary for a queue task from its latest attempt.
 *
 * @returns `null` when the task has no `latestAttempt.runId`; otherwise a
 * record derived from `buildRunResult`. If building that result throws, a
 * placeholder record with `phase: "unavailable"` and the error message is
 * returned instead of propagating the error.
 */
async function queueTaskSupervisor(store: AgentRunStore, task: JsonRecord): Promise<JsonRecord | null> {
  const latestAttempt = asJsonRecord(task.latestAttempt);
  const runId = stringJsonValue(latestAttempt?.runId);
  if (!runId) return null;

  try {
    const commandId = stringJsonValue(latestAttempt?.commandId) ?? undefined;
    const result = await buildRunResult(store, runId, commandId);
    const liveness = asJsonRecord(result.liveness);
    // `lastActivity` is preferred; `lastCommandActivity` is accepted as a fallback key.
    const activity = asJsonRecord(liveness?.lastActivity ?? liveness?.lastCommandActivity);
    const budget = asJsonRecord(liveness?.timeoutBudget);

    const supervisor: JsonRecord = {
      runId: stringJsonValue(result.runId),
      commandId: stringJsonValue(result.commandId),
      status: stringJsonValue(result.status),
      terminalStatus: stringJsonValue(result.terminalStatus),
      failureKind: stringJsonValue(result.failureKind),
      phase: stringJsonValue(liveness?.phase),
      active: liveness?.active === true,
      lastSeq: numberJsonValue(liveness?.lastSeq ?? result.lastSeq),
      lastActivity: activity ? compactActivity(activity) : null,
      timeoutBudget: budget ? compactTimeoutBudget(budget) : null,
      recoveryActions: compactRecoveryActions(liveness?.recoveryActions),
      valuesPrinted: false,
    };
    return supervisor;
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    return { phase: "unavailable", failureKind: "infra-failed", message, valuesPrinted: false };
  }
}
/**
 * Reduces an activity record to a fixed set of fields.
 *
 * `sourceSeq` falls back to `seq` when absent; string fields go through
 * `stringJsonValue`, numeric fields through `numberJsonValue`, and `summary`
 * is capped at 180 characters via `boundedJsonString`.
 */
function compactActivity(activity: JsonRecord): JsonRecord {
  const compact: JsonRecord = {
    sourceSeq: numberJsonValue(activity.sourceSeq ?? activity.seq),
  };
  // Keys are inserted in the same order the output record lists them.
  for (const key of ["eventId", "activityKind", "type", "status", "toolName", "itemId"]) {
    compact[key] = stringJsonValue(activity[key]);
  }
  compact.ageMs = numberJsonValue(activity.ageMs);
  compact.summary = boundedJsonString(activity.summary, 180);
  compact.valuesPrinted = false;
  return compact;
}
/**
 * Reduces a timeout-budget record to its state, the three millisecond figures,
 * the start timestamp and the source label. Values of the wrong type become
 * `null` (via `stringJsonValue` / `numberJsonValue`).
 */
function compactTimeoutBudget(budget: JsonRecord): JsonRecord {
  const compact: JsonRecord = { state: stringJsonValue(budget.state) };
  for (const key of ["timeoutMs", "elapsedMs", "remainingMs"]) {
    compact[key] = numberJsonValue(budget[key]);
  }
  for (const key of ["startedAt", "source"]) {
    compact[key] = stringJsonValue(budget[key]);
  }
  compact.valuesPrinted = false;
  return compact;
}
/**
 * Reduces a list of recovery actions to at most the first five entries, each
 * trimmed to a fixed set of fields.
 *
 * A non-array input yields `[]`. An entry that is not a plain record is
 * replaced by `{ action: "unknown" }`. `command` and `hint` are capped at 220
 * characters via `boundedJsonString`.
 */
function compactRecoveryActions(value: JsonValue | undefined): JsonValue[] {
  if (!Array.isArray(value)) return [];

  const compactOne = (item: JsonValue): JsonValue => {
    const action = asJsonRecord(item);
    if (!action) {
      return { action: "unknown", valuesPrinted: false };
    }
    const { reason, runId, commandId, sessionId, afterSeq, command, hint } = action;
    return {
      action: stringJsonValue(action.action),
      reason: stringJsonValue(reason),
      runId: stringJsonValue(runId),
      commandId: stringJsonValue(commandId),
      sessionId: stringJsonValue(sessionId),
      afterSeq: numberJsonValue(afterSeq),
      command: boundedJsonString(command, 220),
      hint: boundedJsonString(hint, 220),
      valuesPrinted: false,
    };
  };

  const firstFive = value.slice(0, 5);
  return firstFive.map((item) => compactOne(item));
}
async function route({ method, url, body, store, sourceCommit, runnerJobDefaults, sessionPvcDefaults, providerProfileDefaults }: { method: string; url: URL; body: unknown; store: AgentRunStore; sourceCommit: string; runnerJobDefaults?: NonNullable<ManagerServerOptions["runnerJobDefaults"]>; sessionPvcDefaults?: NonNullable<ManagerServerOptions["sessionPvcOptions"]>; providerProfileDefaults?: NonNullable<ManagerServerOptions["providerProfileOptions"]> }): Promise<JsonValue> {
const path = url.pathname;
if (method === "GET" && (path === "/health" || path === "/health/live" || path === "/health/readiness")) {
@@ -334,7 +420,7 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults
if (method === "GET" && path === "/api/v1/queue/commander") {
const queue = url.searchParams.get("queue") ?? undefined;
await refreshRunningQueueTasksForRead(store, queue);
return await store.queueCommander(queue, url.searchParams.get("readerId")) as unknown as JsonValue;
return await queueCommanderForRead(store, queue, url.searchParams.get("readerId"));
}
if (method === "POST" && path === "/api/v1/runs") return await store.createRun(validateCreateRun(body)) as unknown as JsonValue;
const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u);
@@ -460,6 +546,24 @@ function numberField(record: JsonRecord, key: string, fallback: number): number
return typeof value === "number" && Number.isFinite(value) ? value : fallback;
}
/**
 * Returns `value` typed as a record when it is a non-null, non-array object;
 * otherwise `null`. The object itself is returned, not a copy.
 */
function asJsonRecord(value: unknown): JsonRecord | null {
  if (value === null || typeof value !== "object") {
    return null;
  }
  if (Array.isArray(value)) {
    return null;
  }
  return value as JsonRecord;
}
/**
 * Narrows a JSON value to a non-empty string.
 *
 * @returns `value` when it is a string with at least one character, else `null`.
 */
function stringJsonValue(value: JsonValue | undefined): string | null {
  if (typeof value !== "string") {
    return null;
  }
  return value === "" ? null : value;
}
/**
 * Returns `value` if it is a finite number; any other input (including `NaN`,
 * `±Infinity`, strings and `undefined`) yields `null`.
 */
function numberJsonValue(value: JsonValue | undefined): number | null {
  if (typeof value === "number" && Number.isFinite(value)) {
    return value;
  }
  return null;
}
/**
 * Normalises a string JSON value to a single line and caps its length.
 *
 * Whitespace runs are collapsed to one space and the ends are trimmed. If the
 * result is longer than `limit`, it is cut and suffixed with `"..."` so that
 * the returned string is never longer than `limit` UTF-16 code units.
 *
 * Fixes over the previous version:
 * - whitespace-only input now yields `null`, the same as an empty string,
 *   instead of `""`;
 * - a `limit` below 3 no longer produces `"..."` (which exceeded the limit);
 *   the ellipsis itself is shortened to fit;
 * - the cut never lands between the two halves of a surrogate pair, so the
 *   output stays valid when serialised.
 *
 * @param value Candidate value; anything that is not a string yields `null`.
 * @param limit Maximum length of the returned string, in UTF-16 code units.
 * @returns The bounded single-line string, or `null` when there is no text.
 */
function boundedJsonString(value: JsonValue | undefined, limit: number): string | null {
  if (typeof value !== "string") return null;
  const normalized = value.replace(/\s+/gu, " ").trim();
  // Check emptiness after normalisation so "   " is treated like "".
  if (normalized.length === 0) return null;
  if (!(normalized.length > limit)) return normalized;

  const budget = Math.max(0, Math.trunc(limit));
  const ellipsis = "...".slice(0, budget);
  let head = normalized.slice(0, budget - ellipsis.length);
  // Drop a trailing high surrogate whose low half was cut off.
  const lastUnit = head.charCodeAt(head.length - 1);
  if (lastUnit >= 0xd800 && lastUnit <= 0xdbff) head = head.slice(0, -1);
  return `${head}${ellipsis}`;
}
function stringField(record: JsonRecord, key: string): string {
const value = record[key];
if (typeof value !== "string" || value.trim().length === 0) throw new AgentRunError("schema-invalid", `${key} is required`, { httpStatus: 400 });