From 43c47d3fa9fc613add161f75210bf755c8ff9835 Mon Sep 17 00:00:00 2001 From: Codex Date: Wed, 10 Jun 2026 12:05:21 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=A2=9E=E5=BC=BA=E9=95=BF=20turn=20liv?= =?UTF-8?q?eness=20=E5=8F=AF=E8=A7=81=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/reference/spec-v01-agentrun-mgr.md | 2 +- docs/reference/spec-v01-cli.md | 2 +- docs/reference/spec-v01-queue.md | 2 +- scripts/src/cli.ts | 20 ++++ src/mgr/result.ts | 113 ++++++++++++++++-- src/mgr/server.ts | 106 ++++++++++++++++- src/selftest/cases/55-timeout-liveness.ts | 132 ++++++++++++++++++++++ 7 files changed, 362 insertions(+), 15 deletions(-) create mode 100644 src/selftest/cases/55-timeout-liveness.ts diff --git a/docs/reference/spec-v01-agentrun-mgr.md b/docs/reference/spec-v01-agentrun-mgr.md index 76b1c63..731660f 100644 --- a/docs/reference/spec-v01-agentrun-mgr.md +++ b/docs/reference/spec-v01-agentrun-mgr.md @@ -167,7 +167,7 @@ Manager 只承接 HWLAB v0.2 Code Agent 的通用执行事实,不承接 HWLAB | `runId` / `commandId` / `attemptId` | 支持调用方持久关联和问题定位。 | | `artifactSummary` | 第一阶段只放有界摘要、字节数、截断标记和必要引用;不内嵌大 stdout/stderr。 | | `toolCallSummary` | 输出有界、脱敏的 tool call 状态摘要,至少包含 `count`、`statusCounts`、`exitCodeCounts` 和最近若干条 `items` 的 `method/toolName/type/status/exitCode/command`。消费侧必须用它区分 AgentRun command terminal、agent 内部工具执行和后置诊断,不得用单一 `hwpodExitCode` 覆盖 AgentRun 成功终态。 | -| `liveness` | 查询时派生的 supervisor 活性快照,不写入 durable event。必须暴露 `phase`、`active`、`lastSeq`、`lastEventAgeMs`、`lastCommandActivity`、lease/heartbeat 摘要和可执行恢复动作。`phase` 至少区分 `waiting-runner`、`waiting-model`、`waiting-tool`、`idle-after-tool`、`transport-disconnected`、`runner-heartbeat-stale` 和 `terminal`,避免调用方只能用外层超时猜测 backend 状态。 | +| `liveness` | 查询时派生的 supervisor 活性快照,不写入 durable event。必须暴露 `phase`、`active`、`lastSeq`、`lastEventAgeMs`、`lastActivity`/`lastCommandActivity`、`timeoutBudget`、lease/heartbeat 摘要和可执行恢复动作。`lastActivity` 必须包含 `sourceSeq`、`eventId`、`activityKind` 和 `ageMs`,用于按 id/seq drill-down;默认只给有界摘要,不展开 stdout、runnerTrace、完整 tool command 或 raw event。`timeoutBudget` 必须基于 `executionPolicy.timeoutMs` 暴露 `elapsedMs`、`remainingMs` 和 `state`(如 `within-budget`、`approaching-hard-timeout`、`timed-out`)。`phase` 至少区分 `waiting-runner`、`waiting-model`、`waiting-model-output`、`waiting-tool`、`waiting-tool-output`、`idle-after-tool`、`runner-stdio-inactive`、`transport-disconnected`、`runner-heartbeat-stale` 和 `terminal`,避免调用方只能用外层超时猜测 backend 状态。终态失败/阻塞时仍必须保留恢复动作,例如 inspect result、resume session、split task,而不是返回空数组。 | | `steerDelivery` | 仅在查询 `type=steer` command result 时出现。必须说明 steer 是否已被 runner ack、是否已转发并被 backend `turn/steer` RPC 接受、目标 `targetCommandId`、是否观察到 target command 后续事件,以及“steer command completed 不等于 target turn 已产生后续 assistant/tool 输出”的语义。 | `assistant_message` partial、`command_output` 存在、stdout 非空、backend transport close 或 idle timeout 都不能单独让 result 进入 `completed`。 diff --git a/docs/reference/spec-v01-cli.md b/docs/reference/spec-v01-cli.md index efbbf9e..7d41616 100644 --- a/docs/reference/spec-v01-cli.md +++ b/docs/reference/spec-v01-cli.md @@ -98,7 +98,7 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交 - `queue submit/read/cancel/dispatch/refresh --dry-run` 必须只返回 non-mutating plan,固定 `dryRun=true`、`mutation=false`,不得创建 task、mark read、cancel、dispatch、refresh 或启动 runner job。 - `queue dispatch` 是 Q2 的受控手动调度入口,只对单个 task 显式创建 attempt 和 Core run/command/runner job;不得伪装成自动 scheduler;带 `--dry-run` 时只读取 task 并展示将要 POST 的路径和有界 request 摘要。 - `queue refresh` 只根据 Queue task 中保存的 Core run/command 引用回写 Queue attempt 状态,不读取 Core trace 反推 commander 或统计;带 `--dry-run` 时不得写回状态。 -- `queue list/show/commander` 默认返回低噪声 summary,只显示 task/attempt/session ids、state、read cursor、stats 相关字段和 drill-down 命令;需要完整 task payload、resource bundle 或 metadata 时显式使用 `--full|--raw`。 +- `queue list/show/commander` 默认返回低噪声 summary,只显示 task/attempt/session ids、state、read cursor、stats 相关字段、compact supervisor 和 drill-down 命令;commander 的 supervisor 只能放 `phase`、last activity source seq/id、timeout budget 和恢复动作摘要,不得展开完整 payload、trace、tool command、stdout/stderr 或 runnerTrace。需要完整 task payload、resource bundle 或 metadata 时显式使用 `--full|--raw`;需要 trace/output 细节时继续按返回的 `sessionId`/`sourceSeq` 走 `sessions trace|output --seq/--event-id/--item-id --full`。 - `queue show` 不得返回或代理完整 output/trace;输出和 trace 只能通过返回的 `sessionPath` 对应 `sessions ...` 命令查询。 - 需要提交较长 Queue task、dispatch body、run base 或 command payload 时,CLI 必须支持 `--json-stdin`,避免为了 heredoc/stdin 内容先写临时 dump 文件再传 `--json-file`;`sessions turn` 的 runner job override 也必须支持 `--runner-json-stdin`。所有 stdin JSON 仍必须解析为 object,并在 dry-run 中只展示有界 body 摘要、bytes 和 keys。 - `sessions ps` 默认只显示 running 和 unread session;`--state all` 才显示历史 read session,避免旧 session 噪声淹没当前进度。 diff --git a/docs/reference/spec-v01-queue.md b/docs/reference/spec-v01-queue.md index 9ea70f9..73a6f29 100644 --- a/docs/reference/spec-v01-queue.md +++ b/docs/reference/spec-v01-queue.md @@ -101,7 +101,7 @@ Session 命令负责输出、trace 和会话控制: ./scripts/agentrun sessions read [--reader-id ] ``` -不得新增 `queue output`、`queue trace` 或 `queue session/*` 这类子路径代理。`queue list/show/commander` 默认输出低噪声 summary,最多打印 task/attempt/session ids、状态、统计、`sessionPath` 和下一步 `sessions ...` 命令;完整 payload/resource bundle/metadata 只能通过显式 `--full|--raw` 展开。Queue mutation 命令带 `--dry-run` 时必须只返回 `mutation=false` 的计划,不得写 Queue、Core run/command 或 runner job。 +不得新增 `queue output`、`queue trace` 或 `queue session/*` 这类子路径代理。`queue list/show/commander` 默认输出低噪声 summary,最多打印 task/attempt/session ids、状态、统计、`sessionPath`、compact supervisor 和下一步 `sessions ...` 命令;supervisor 只允许披露 phase、last activity source seq/id、timeout budget 和恢复动作摘要,不得展开完整 payload、trace、tool command、stdout/stderr 或 runnerTrace。完整 payload/resource bundle/metadata 只能通过显式 `--full|--raw` 展开;trace/output 细节继续按 `sessionId` + `sourceSeq/eventId/itemId` 走 Session CLI 渐进披露。Queue mutation 命令带 `--dry-run` 时必须只返回 `mutation=false` 的计划,不得写 Queue、Core run/command 或 runner job。 Queue task 的 `resourceBundleRef` 在 dispatch 时原样进入 Core run。若其中声明 `requiredSkills`,Queue 只展示声明和终态摘要,不能自行判定可用;runner 必须在 gitbundle materialization 后、backend 启动前校验 `.agents/skills//SKILL.md`,缺失时以 `required-skill-unavailable` 写入 command/run result 和 events。 diff --git a/scripts/src/cli.ts b/scripts/src/cli.ts index 772775f..2124188 100644 --- a/scripts/src/cli.ts +++ b/scripts/src/cli.ts @@ -699,8 +699,10 @@ function summarizeQueueStats(record: JsonRecord | null): JsonRecord | null { function summarizeQueueTaskWithAttempt(record: JsonRecord | null, fallbackTaskId: string): JsonRecord { const summary = summarizeQueueTaskRecord(record, fallbackTaskId); const latestAttempt = summarizeAttemptRecord(jsonRecordValue(record?.latestAttempt)); + const supervisor = summarizeSupervisorRecord(jsonRecordValue(record?.supervisor)); const sessionRef = jsonRecordValue(record?.sessionRef); if (latestAttempt) summary.latestAttempt = latestAttempt; + if (supervisor) summary.supervisor = supervisor; const sessionId = stringValue(sessionRef?.sessionId) ?? stringValue(latestAttempt?.sessionId); if (sessionId) summary.sessionId = sessionId; if (record?.readCursor !== undefined) summary.read = record.readCursor !== null; @@ -737,6 +739,24 @@ function summarizeAttemptRecord(record: JsonRecord | null): JsonRecord | null { return compactRecord(record, { keys: ["attemptId", "state", "runId", "commandId", "runnerJobId", "sessionId", "sessionPath"] }); } +function summarizeSupervisorRecord(record: JsonRecord | null): JsonRecord | null { + if (!record) return null; + const lastActivity = jsonRecordValue(record.lastActivity); + const timeoutBudget = jsonRecordValue(record.timeoutBudget); + return { + ...compactRecord(record, { keys: ["phase", "active", "status", "terminalStatus", "failureKind", "runId", "commandId", "lastSeq"] }), + lastActivity: lastActivity ? compactRecord(lastActivity, { keys: ["sourceSeq", "eventId", "activityKind", "type", "status", "toolName", "itemId", "ageMs", "summary"] }) : null, + timeoutBudget: timeoutBudget ? compactRecord(timeoutBudget, { keys: ["state", "timeoutMs", "elapsedMs", "remainingMs", "startedAt", "source"] }) : null, + recoveryActions: summarizeRecoveryActions(record.recoveryActions), + valuesPrinted: false, + }; +} + +function summarizeRecoveryActions(value: JsonValue | undefined): JsonValue[] { + if (!Array.isArray(value)) return []; + return value.slice(0, 5).map((item) => compactRecord(jsonRecordValue(item), { keys: ["action", "reason", "runId", "commandId", "sessionId", "afterSeq", "command", "hint"] })); +} + function summarizeRunRecord(record: JsonRecord | null): JsonRecord | null { if (!record) return null; return compactRecord(record, { keys: ["id", "status", "terminalStatus", "failureKind", "failureMessage", "backendProfile", "providerId", "claimedBy", "leaseExpiresAt", "createdAt", "updatedAt"] }); diff --git a/src/mgr/result.ts b/src/mgr/result.ts index 8ff0346..8153d8c 100644 --- a/src/mgr/result.ts +++ b/src/mgr/result.ts @@ -43,7 +43,7 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman const failureDetails = resultFailureDetails(scopedEvents, terminal); const reply = assistantReply(scopedEvents); const blocker = terminal === "blocked" || terminal === "failed" ? { failureKind, message: failureMessage, details: failureDetails } : null; - const liveness = livenessSnapshot(run, command, events, scopedEvents, terminal); + const liveness = livenessSnapshot(run, command, events, scopedEvents, terminal, failureKind, failureMessage); const steerDelivery = command?.type === "steer" ? steerDeliverySummary(events, command.id) : null; return { runId: run.id, @@ -92,7 +92,7 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman }; } -function livenessSnapshot(run: RunRecord, command: CommandRecord | null, events: RunEvent[], scopedEvents: RunEvent[], terminal: TerminalStatus | null): JsonRecord { +function livenessSnapshot(run: RunRecord, command: CommandRecord | null, events: RunEvent[], scopedEvents: RunEvent[], terminal: TerminalStatus | null, failureKind: FailureKind | null, failureMessage: string | null): JsonRecord { const nowMs = Date.now(); const active = terminal === null && !runIsTerminal(run) && !commandIsTerminal(command); const lastEvent = events.at(-1) ?? null; @@ -100,7 +100,10 @@ function livenessSnapshot(run: RunRecord, command: CommandRecord | null, events: const lastCommandActivity = lastVisibleActivity ?? latestLivenessActivity(scopedEvents); const lease = leaseSummary(run, nowMs); const transportDisconnect = latestTransportDisconnect(scopedEvents); - const phase = livenessPhase({ active, command, lastVisibleActivity, leaseExpired: lease.leaseExpired, transportDisconnect }); + const lastActivity = livenessActivitySummary(lastCommandActivity, nowMs); + const timeoutBudget = timeoutBudgetSummary(run, command, terminal, failureKind, nowMs); + const phase = livenessPhase({ active, command, lastVisibleActivity, leaseExpired: lease.leaseExpired, transportDisconnect, timeoutBudget, lastActivity }); + const afterSeq = lastEvent?.seq ?? 0; return { phase, active, @@ -112,15 +115,17 @@ function livenessSnapshot(run: RunRecord, command: CommandRecord | null, events: lastSeq: lastEvent?.seq ?? 0, lastEventAt: lastEvent?.createdAt ?? null, lastEventAgeMs: lastEvent ? ageMs(lastEvent.createdAt, nowMs) : null, - lastCommandActivity: livenessActivitySummary(lastCommandActivity, nowMs), + lastActivity, + lastCommandActivity: lastActivity, + timeoutBudget, lease, transportDisconnect: transportDisconnect ? livenessActivitySummary(transportDisconnect, nowMs) : null, - recoveryActions: active ? recoveryActions(run, command, lastEvent?.seq ?? 0) : [], + recoveryActions: recoveryActions({ run, command, afterSeq, active, terminal, failureKind, failureMessage }), valuesPrinted: false, }; } -function livenessPhase(input: { active: boolean; command: CommandRecord | null; lastVisibleActivity: RunEvent | null; leaseExpired: boolean | null; transportDisconnect: RunEvent | null }): string { +function livenessPhase(input: { active: boolean; command: CommandRecord | null; lastVisibleActivity: RunEvent | null; leaseExpired: boolean | null; transportDisconnect: RunEvent | null; timeoutBudget: JsonRecord; lastActivity: JsonRecord | null }): string { if (!input.active) return "terminal"; if (input.command?.state === "pending") return "waiting-runner"; if (input.leaseExpired === true) return "runner-heartbeat-stale"; @@ -130,9 +135,43 @@ function livenessPhase(input: { active: boolean; command: CommandRecord | null; if (status === "inProgress" || status === "running") return "waiting-tool"; if (status === "completed") return "idle-after-tool"; } + if (input.lastVisibleActivity?.type === "command_output") return "waiting-tool-output"; + if (input.lastVisibleActivity?.type === "assistant_message") return "waiting-model-output"; + const remainingMs = numberJsonValue(input.timeoutBudget.remainingMs); + const timeoutMs = numberJsonValue(input.timeoutBudget.timeoutMs); + const activityAgeMs = numberJsonValue(input.lastActivity?.ageMs); + const inactiveThresholdMs = timeoutMs === null ? 300_000 : Math.min(300_000, Math.max(60_000, Math.floor(timeoutMs / 4))); + if (remainingMs !== null && remainingMs <= Math.min(120_000, Math.max(10_000, Math.floor((timeoutMs ?? 120_000) / 10))) && (activityAgeMs === null || activityAgeMs >= inactiveThresholdMs)) return "runner-stdio-inactive"; return "waiting-model"; } +function timeoutBudgetSummary(run: RunRecord, command: CommandRecord | null, terminal: TerminalStatus | null, failureKind: FailureKind | null, nowMs: number): JsonRecord { + const timeoutMs = typeof run.executionPolicy.timeoutMs === "number" && Number.isFinite(run.executionPolicy.timeoutMs) && run.executionPolicy.timeoutMs > 0 ? Math.trunc(run.executionPolicy.timeoutMs) : null; + const startedAt = command?.acknowledgedAt ?? command?.createdAt ?? run.updatedAt ?? run.createdAt; + const startedMs = Date.parse(startedAt); + const elapsedMs = timeoutMs !== null && Number.isFinite(startedMs) ? Math.max(0, nowMs - startedMs) : null; + const remainingMs = timeoutMs !== null && elapsedMs !== null ? Math.max(0, timeoutMs - elapsedMs) : null; + const approachingThresholdMs = timeoutMs === null ? null : Math.min(120_000, Math.max(10_000, Math.floor(timeoutMs / 10))); + let state = "unknown"; + if (timeoutMs !== null && elapsedMs !== null) { + if (terminal !== null) state = failureKind === "backend-timeout" ? "timed-out" : "terminal"; + else if (remainingMs === 0) state = "overdue"; + else if (approachingThresholdMs !== null && remainingMs !== null && remainingMs <= approachingThresholdMs) state = "approaching-hard-timeout"; + else state = "within-budget"; + } + return { + timeoutMs, + source: "executionPolicy.timeoutMs", + startedAt, + elapsedMs, + remainingMs, + approachingThresholdMs, + state, + hardTimeout: true, + valuesPrinted: false, + }; +} + function leaseSummary(run: RunRecord, nowMs: number): JsonRecord & { leaseExpired: boolean | null } { const leaseExpiresMs = run.leaseExpiresAt ? Date.parse(run.leaseExpiresAt) : NaN; const hasLease = Boolean(run.claimedBy && run.leaseExpiresAt && Number.isFinite(leaseExpiresMs)); @@ -172,33 +211,85 @@ function livenessActivitySummary(event: RunEvent | null, nowMs: number): JsonRec if (!event) return null; return { seq: event.seq, + sourceSeq: event.seq, + eventId: event.id, type: event.type, + activityKind: activityKind(event), phase: typeof event.payload.phase === "string" ? event.payload.phase : null, status: typeof event.payload.status === "string" ? event.payload.status : null, toolName: typeof event.payload.toolName === "string" ? event.payload.toolName : null, + itemId: typeof event.payload.itemId === "string" ? event.payload.itemId : null, exitCode: normalizedExitCode(event.payload.exitCode), commandId: typeof event.payload.commandId === "string" ? event.payload.commandId : null, createdAt: event.createdAt, ageMs: ageMs(event.createdAt, nowMs), + summary: activityTextSummary(event), valuesPrinted: false, }; } +function activityKind(event: RunEvent): string { + if (event.type === "tool_call") { + const status = typeof event.payload.status === "string" ? event.payload.status : ""; + if (status === "running" || status === "inProgress") return "tool-in-flight"; + if (status === "completed") return "tool-completed"; + if (status === "cancelled") return "tool-cancelled"; + if (status === "failed") return "tool-failed"; + return "tool"; + } + if (event.type === "command_output") return "tool-output"; + if (event.type === "assistant_message") return event.payload.progress === true ? "assistant-progress" : "assistant-output"; + if (event.type === "diff") return "diff"; + if (event.type === "error") return "error"; + if (event.type === "terminal_status") return "terminal"; + return "backend-activity"; +} + +function activityTextSummary(event: RunEvent): string | null { + if (event.type === "tool_call") { + const name = typeof event.payload.toolName === "string" ? event.payload.toolName : typeof event.payload.type === "string" ? event.payload.type : "tool"; + const status = typeof event.payload.status === "string" ? event.payload.status : "observed"; + return `${name} ${status}`; + } + const fromSummary = typeof event.payload.outputSummary === "string" ? event.payload.outputSummary : typeof event.payload.summary === "string" ? event.payload.summary : null; + const text = fromSummary ?? textPayload(event.payload) ?? (typeof event.payload.phase === "string" ? event.payload.phase : null); + if (!text) return null; + return boundedTextSummary(text.replace(/\s+/gu, " ").trim(), { limitChars: 240 }).text as string; +} + function ageMs(value: string, nowMs: number): number | null { const parsed = Date.parse(value); return Number.isFinite(parsed) ? Math.max(0, nowMs - parsed) : null; } -function recoveryActions(run: RunRecord, command: CommandRecord | null, afterSeq: number): JsonRecord[] { +function recoveryActions(input: { run: RunRecord; command: CommandRecord | null; afterSeq: number; active: boolean; terminal: TerminalStatus | null; failureKind: FailureKind | null; failureMessage: string | null }): JsonRecord[] { + const { run, command, afterSeq, active, terminal, failureKind, failureMessage } = input; + const sessionId = run.sessionRef?.sessionId ?? null; + const traceCommand = sessionId ? `./scripts/agentrun sessions trace ${sessionId} --after-seq ${afterSeq} --limit 100 --run-id ${run.id}` : `./scripts/agentrun runs events ${run.id} --after-seq ${afterSeq} --limit 100 --summary`; + const outputCommand = sessionId ? `./scripts/agentrun sessions output ${sessionId} --after-seq ${afterSeq} --limit 100 --run-id ${run.id}` : null; const actions: JsonRecord[] = [ - { action: "poll-trace", runId: run.id, afterSeq }, - { action: "poll-output", runId: run.id, afterSeq }, + { action: "poll-trace", runId: run.id, commandId: command?.id ?? null, afterSeq, command: traceCommand, valuesPrinted: false }, ]; - if (command) actions.push({ action: "cancel-command", runId: run.id, commandId: command.id }); - else actions.push({ action: "cancel-run", runId: run.id }); + if (outputCommand) actions.push({ action: "poll-output", runId: run.id, commandId: command?.id ?? null, afterSeq, command: outputCommand, valuesPrinted: false }); + if (active) { + if (sessionId) actions.push({ action: "steer-session", sessionId, runId: run.id, commandId: command?.id ?? null, command: `./scripts/agentrun sessions steer ${sessionId} --prompt-stdin`, valuesPrinted: false }); + if (command) actions.push({ action: "cancel-command", runId: run.id, commandId: command.id, command: `./scripts/agentrun commands cancel ${command.id} --reason `, valuesPrinted: false }); + else actions.push({ action: "cancel-run", runId: run.id, command: `./scripts/agentrun runs cancel ${run.id} --reason `, valuesPrinted: false }); + return actions; + } + if (terminal === "failed" || terminal === "blocked" || terminal === "cancelled") { + if (command) actions.push({ action: "inspect-result", runId: run.id, commandId: command.id, command: `./scripts/agentrun commands result ${command.id} --run-id ${run.id}`, valuesPrinted: false }); + if (sessionId) actions.push({ action: "resume-session", sessionId, command: `./scripts/agentrun sessions turn ${sessionId} --prompt-stdin`, valuesPrinted: false }); + if (failureKind === "backend-timeout") actions.push({ action: "split-task", reason: "backend-timeout", hint: "把大 patch / 长工具链拆成更短 turn 后用同一 session 续跑", failureMessage: failureMessage ? boundedTextSummary(failureMessage, { limitChars: 200 }).text as string : null, valuesPrinted: false }); + else actions.push({ action: "retry-or-split", reason: failureKind ?? "terminal", hint: "先读 trace/output 的 detail id,再决定 steer、重跑或拆分", valuesPrinted: false }); + } return actions; } +function numberJsonValue(value: JsonValue | undefined): number | null { + return typeof value === "number" && Number.isFinite(value) ? value : null; +} + function steerDeliverySummary(events: RunEvent[], commandId: string): JsonRecord { const related = events.filter((event) => event.payload.commandId === commandId); const completed = latestPhaseEvent(related, "turn/steer:completed"); diff --git a/src/mgr/server.ts b/src/mgr/server.ts index cc89727..8b52e0c 100644 --- a/src/mgr/server.ts +++ b/src/mgr/server.ts @@ -114,6 +114,92 @@ async function refreshQueueTaskRecordForRead(store: AgentRunStore, task: QueueTa } } +async function queueCommanderForRead(store: AgentRunStore, queue: string | undefined, readerId: string | null): Promise { + const snapshot = await store.queueCommander(queue, readerId) as unknown as JsonRecord; + const items = Array.isArray(snapshot.items) ? snapshot.items : []; + const enrichedItems = await Promise.all(items.map(async (item) => { + const task = asJsonRecord(item); + if (!task) return item as JsonValue; + const supervisor = await queueTaskSupervisor(store, task); + return supervisor ? { ...task, supervisor, valuesPrinted: false } : task; + })); + return { ...snapshot, items: enrichedItems, valuesPrinted: false } as JsonValue; +} + +async function queueTaskSupervisor(store: AgentRunStore, task: JsonRecord): Promise { + const attempt = asJsonRecord(task.latestAttempt); + const runId = stringJsonValue(attempt?.runId); + if (!runId) return null; + try { + const result = await buildRunResult(store, runId, stringJsonValue(attempt?.commandId) ?? undefined); + const liveness = asJsonRecord(result.liveness); + const lastActivity = asJsonRecord(liveness?.lastActivity ?? liveness?.lastCommandActivity); + const timeoutBudget = asJsonRecord(liveness?.timeoutBudget); + return { + runId: stringJsonValue(result.runId), + commandId: stringJsonValue(result.commandId), + status: stringJsonValue(result.status), + terminalStatus: stringJsonValue(result.terminalStatus), + failureKind: stringJsonValue(result.failureKind), + phase: stringJsonValue(liveness?.phase), + active: liveness?.active === true, + lastSeq: numberJsonValue(liveness?.lastSeq ?? result.lastSeq), + lastActivity: lastActivity ? compactActivity(lastActivity) : null, + timeoutBudget: timeoutBudget ? compactTimeoutBudget(timeoutBudget) : null, + recoveryActions: compactRecoveryActions(liveness?.recoveryActions), + valuesPrinted: false, + }; + } catch (error) { + return { phase: "unavailable", failureKind: "infra-failed", message: error instanceof Error ? error.message : String(error), valuesPrinted: false }; + } +} + +function compactActivity(activity: JsonRecord): JsonRecord { + return { + sourceSeq: numberJsonValue(activity.sourceSeq ?? activity.seq), + eventId: stringJsonValue(activity.eventId), + activityKind: stringJsonValue(activity.activityKind), + type: stringJsonValue(activity.type), + status: stringJsonValue(activity.status), + toolName: stringJsonValue(activity.toolName), + itemId: stringJsonValue(activity.itemId), + ageMs: numberJsonValue(activity.ageMs), + summary: boundedJsonString(activity.summary, 180), + valuesPrinted: false, + }; +} + +function compactTimeoutBudget(budget: JsonRecord): JsonRecord { + return { + state: stringJsonValue(budget.state), + timeoutMs: numberJsonValue(budget.timeoutMs), + elapsedMs: numberJsonValue(budget.elapsedMs), + remainingMs: numberJsonValue(budget.remainingMs), + startedAt: stringJsonValue(budget.startedAt), + source: stringJsonValue(budget.source), + valuesPrinted: false, + }; +} + +function compactRecoveryActions(value: JsonValue | undefined): JsonValue[] { + if (!Array.isArray(value)) return []; + return value.slice(0, 5).map((item) => { + const action = asJsonRecord(item); + if (!action) return { action: "unknown", valuesPrinted: false }; + return { + action: stringJsonValue(action.action), + reason: stringJsonValue(action.reason), + runId: stringJsonValue(action.runId), + commandId: stringJsonValue(action.commandId), + sessionId: stringJsonValue(action.sessionId), + afterSeq: numberJsonValue(action.afterSeq), + command: boundedJsonString(action.command, 220), + hint: boundedJsonString(action.hint, 220), + valuesPrinted: false, + }; + }); +} + async function route({ method, url, body, store, sourceCommit, runnerJobDefaults, sessionPvcDefaults, providerProfileDefaults }: { method: string; url: URL; body: unknown; store: AgentRunStore; sourceCommit: string; runnerJobDefaults?: NonNullable; sessionPvcDefaults?: NonNullable; providerProfileDefaults?: NonNullable }): Promise { const path = url.pathname; if (method === "GET" && (path === "/health" || path === "/health/live" || path === "/health/readiness")) { @@ -334,7 +420,7 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults if (method === "GET" && path === "/api/v1/queue/commander") { const queue = url.searchParams.get("queue") ?? undefined; await refreshRunningQueueTasksForRead(store, queue); - return await store.queueCommander(queue, url.searchParams.get("readerId")) as unknown as JsonValue; + return await queueCommanderForRead(store, queue, url.searchParams.get("readerId")); } if (method === "POST" && path === "/api/v1/runs") return await store.createRun(validateCreateRun(body)) as unknown as JsonValue; const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u); @@ -460,6 +546,24 @@ function numberField(record: JsonRecord, key: string, fallback: number): number return typeof value === "number" && Number.isFinite(value) ? value : fallback; } +function asJsonRecord(value: unknown): JsonRecord | null { + return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : null; +} + +function stringJsonValue(value: JsonValue | undefined): string | null { + return typeof value === "string" && value.length > 0 ? value : null; +} + +function numberJsonValue(value: JsonValue | undefined): number | null { + return typeof value === "number" && Number.isFinite(value) ? value : null; +} + +function boundedJsonString(value: JsonValue | undefined, limit: number): string | null { + if (typeof value !== "string" || value.length === 0) return null; + const normalized = value.replace(/\s+/gu, " ").trim(); + return normalized.length > limit ? `${normalized.slice(0, Math.max(0, limit - 3))}...` : normalized; +} + function stringField(record: JsonRecord, key: string): string { const value = record[key]; if (typeof value !== "string" || value.trim().length === 0) throw new AgentRunError("schema-invalid", `${key} is required`, { httpStatus: 400 }); diff --git a/src/selftest/cases/55-timeout-liveness.ts b/src/selftest/cases/55-timeout-liveness.ts new file mode 100644 index 0000000..2c472ff --- /dev/null +++ b/src/selftest/cases/55-timeout-liveness.ts @@ -0,0 +1,132 @@ +import assert from "node:assert/strict"; +import { setTimeout as sleep } from "node:timers/promises"; +import { ManagerClient } from "../../mgr/client.js"; +import { startManagerServer } from "../../mgr/server.js"; +import { MemoryAgentRunStore } from "../../mgr/store.js"; +import type { JsonRecord } from "../../common/types.js"; +import { assertNoSecretLeak, type SelfTestCase, type SelfTestContext } from "../harness.js"; +import { summarizeQueueCommanderSnapshot } from "../../../scripts/src/cli.js"; + +const selfTest: SelfTestCase = async (context: SelfTestContext) => { + const store = new MemoryAgentRunStore(); + const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test", store }); + try { + const client = new ManagerClient(server.baseUrl); + + const tool = await createActiveRun(client, context, "timeout-liveness-tool", 120_000); + await client.post(`/api/v1/runs/${tool.runId}/events`, { type: "tool_call", payload: { commandId: tool.commandId, itemId: "tool_live", toolName: "commandExecution", status: "running", command: "hwpod workspace apply-patch" } }); + const toolResult = await commandResult(client, tool); + const toolLive = toolResult.liveness as JsonRecord; + assert.equal(toolLive.phase, "waiting-tool"); + assert.equal(((toolLive.lastActivity as JsonRecord).activityKind), "tool-in-flight"); + assert.equal(((toolLive.lastActivity as JsonRecord).sourceSeq), 4); + assert.equal(((toolLive.timeoutBudget as JsonRecord).state), "within-budget"); + assert.ok(Array.isArray(toolLive.recoveryActions)); + + const assistant = await createActiveRun(client, context, "timeout-liveness-assistant", 120_000); + await client.post(`/api/v1/runs/${assistant.runId}/events`, { type: "assistant_message", payload: { commandId: assistant.commandId, itemId: "msg_progress", progress: true, text: "正在生成 apply-patch,先汇总待改文件。" } }); + const assistantLive = (await commandResult(client, assistant)).liveness as JsonRecord; + assert.equal(assistantLive.phase, "waiting-model-output"); + assert.equal(((assistantLive.lastActivity as JsonRecord).activityKind), "assistant-progress"); + + const inactive = await createActiveRun(client, context, "timeout-liveness-inactive", 40); + await sleep(36); + const inactiveLive = (await commandResult(client, inactive)).liveness as JsonRecord; + assert.equal(inactiveLive.phase, "runner-stdio-inactive"); + assert.ok(["approaching-hard-timeout", "overdue"].includes(String((inactiveLive.timeoutBudget as JsonRecord).state))); + + const terminal = await createActiveRun(client, context, "timeout-liveness-terminal", 50); + await client.post(`/api/v1/runs/${terminal.runId}/events`, { type: "error", payload: { commandId: terminal.commandId, failureKind: "backend-timeout", phase: "turn:hard-timeout", message: "codex stdio turn hard timed out after 50ms" } }); + await client.patch(`/api/v1/commands/${terminal.commandId}/status`, { terminalStatus: "failed", failureKind: "backend-timeout", failureMessage: "codex stdio turn hard timed out after 50ms" }); + await client.patch(`/api/v1/runs/${terminal.runId}/status`, { terminalStatus: "failed", failureKind: "backend-timeout", failureMessage: "codex stdio turn hard timed out after 50ms" }); + const terminalResult = await commandResult(client, terminal); + const terminalLive = terminalResult.liveness as JsonRecord; + assert.equal(terminalResult.terminalStatus, "failed"); + assert.equal(terminalLive.phase, "terminal"); + assert.equal(((terminalLive.timeoutBudget as JsonRecord).state), "timed-out"); + assert.ok((terminalLive.recoveryActions as JsonRecord[]).some((action) => action.action === "resume-session")); + assert.ok((terminalLive.recoveryActions as JsonRecord[]).some((action) => action.action === "split-task")); + + const session = await client.get(`/api/v1/sessions/${terminal.sessionId}?readerId=timeout-liveness`) as JsonRecord; + assert.equal(((session.liveness as JsonRecord).phase), "terminal"); + assert.ok(Array.isArray(((session.supervisor as JsonRecord).recoveryActions)), "session show must keep terminal recovery actions"); + + const task = await client.post("/api/v1/queue/tasks", queueTask(context, terminal.sessionId, 50)) as JsonRecord; + store.updateQueueTaskAttempt(String(task.id), { + state: "running", + latestAttempt: { attemptId: "attempt_timeout_liveness", state: "running", runId: terminal.runId, commandId: terminal.commandId, runnerJobId: null, sessionId: terminal.sessionId, sessionPath: `/api/v1/sessions/${terminal.sessionId}` }, + sessionPath: `/api/v1/sessions/${terminal.sessionId}`, + }); + const commander = await client.get("/api/v1/queue/commander?queue=timeout-liveness&readerId=timeout-liveness") as JsonRecord; + const commanderItem = ((commander.items as JsonRecord[]) ?? []).find((item) => item.id === task.id) as JsonRecord; + assert.equal(((commanderItem.supervisor as JsonRecord).phase), "terminal"); + assert.equal((((commanderItem.supervisor as JsonRecord).timeoutBudget as JsonRecord).state), "timed-out"); + const commanderSummary = summarizeQueueCommanderSnapshot(commander, { limit: 5 }); + const summaryItem = ((commanderSummary.items as JsonRecord[]) ?? []).find((item) => item.id === task.id) as JsonRecord; + assert.equal(((summaryItem.supervisor as JsonRecord).phase), "terminal"); + assert.equal(JSON.stringify(commanderSummary).includes("hwpod workspace apply-patch"), false, "commander summary must stay compact and avoid dumping command bodies"); + assertNoSecretLeak({ toolResult, assistantLive, inactiveLive, terminalResult, session, commanderSummary }); + + return { name: "timeout-liveness", tests: ["tool-in-flight-liveness", "assistant-progress-liveness", "stdio-inactive-timeout-budget", "terminal-timeout-recovery", "queue-commander-supervisor"] }; + } finally { + await new Promise((resolve) => server.server.close(() => resolve())); + } +}; + +async function createActiveRun(client: ManagerClient, context: SelfTestContext, sessionSuffix: string, timeoutMs: number): Promise<{ runId: string; commandId: string; sessionId: string }> { + const sessionId = `selftest-${sessionSuffix}`; + const run = await client.post("/api/v1/runs", runBody(context, sessionId, timeoutMs)) as JsonRecord; + const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: sessionSuffix }, idempotencyKey: sessionSuffix }) as JsonRecord; + await client.post(`/api/v1/runs/${run.id}/claim`, { runnerId: `runner_${sessionSuffix}`, leaseMs: 60_000 }); + await client.post(`/api/v1/commands/${command.id}/ack`, {}); + return { runId: String(run.id), commandId: String(command.id), sessionId }; +} + +async function commandResult(client: ManagerClient, item: { runId: string; commandId: string }): Promise { + return await client.get(`/api/v1/runs/${item.runId}/commands/${item.commandId}/result`) as JsonRecord; +} + +function runBody(context: SelfTestContext, sessionId: string, timeoutMs: number): JsonRecord { + return { + tenantId: "unidesk", + projectId: "pikasTech/agentrun", + workspaceRef: { kind: "host-path", path: context.workspace }, + sessionRef: { sessionId, conversationId: sessionId }, + providerId: "G14", + backendProfile: "codex", + executionPolicy: executionPolicy(timeoutMs, context.codexHome), + traceSink: null, + }; +} + +function queueTask(context: SelfTestContext, sessionId: string, timeoutMs: number): JsonRecord { + return { + tenantId: "unidesk", + projectId: "pikasTech/agentrun", + queue: "timeout-liveness", + lane: "selftest", + title: "timeout liveness commander", + priority: 1, + backendProfile: "codex", + providerId: "G14", + workspaceRef: { kind: "host-path", path: context.workspace }, + sessionRef: { sessionId, conversationId: sessionId }, + executionPolicy: executionPolicy(timeoutMs, context.codexHome), + resourceBundleRef: null, + payload: { prompt: "timeout liveness commander" }, + references: [], + metadata: {}, + }; +} + +function executionPolicy(timeoutMs: number, codexHome: string): JsonRecord { + return { + sandbox: "workspace-write", + approval: "never", + timeoutMs, + network: "default", + secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: codexHome } }] }, + }; +} + +export default selfTest;