Merge pull request #140 from pikasTech/fix/issue131-timeout-liveness

fix: 增强长 turn liveness 可见性
This commit is contained in:
Lyon
2026-06-10 12:07:56 +08:00
committed by GitHub
7 changed files with 362 additions and 15 deletions
+1 -1
View File
@@ -167,7 +167,7 @@ Manager 只承接 HWLAB v0.2 Code Agent 的通用执行事实,不承接 HWLAB
| `runId` / `commandId` / `attemptId` | 支持调用方持久关联和问题定位。 |
| `artifactSummary` | 第一阶段只放有界摘要、字节数、截断标记和必要引用;不内嵌大 stdout/stderr。 |
| `toolCallSummary` | 输出有界、脱敏的 tool call 状态摘要,至少包含 `count``statusCounts``exitCodeCounts` 和最近若干条 `items``method/toolName/type/status/exitCode/command`。消费侧必须用它区分 AgentRun command terminal、agent 内部工具执行和后置诊断,不得用单一 `hwpodExitCode` 覆盖 AgentRun 成功终态。 |
| `liveness` | 查询时派生的 supervisor 活性快照,不写入 durable event。必须暴露 `phase``active``lastSeq``lastEventAgeMs``lastCommandActivity`、lease/heartbeat 摘要和可执行恢复动作。`phase` 至少区分 `waiting-runner``waiting-model``waiting-tool``idle-after-tool``transport-disconnected``runner-heartbeat-stale``terminal`,避免调用方只能用外层超时猜测 backend 状态。 |
| `liveness` | 查询时派生的 supervisor 活性快照,不写入 durable event。必须暴露 `phase``active``lastSeq``lastEventAgeMs``lastActivity`/`lastCommandActivity``timeoutBudget`lease/heartbeat 摘要和可执行恢复动作。`lastActivity` 必须包含 `sourceSeq``eventId``activityKind``ageMs`,用于按 id/seq drill-down;默认只给有界摘要,不展开 stdout、runnerTrace、完整 tool command 或 raw event。`timeoutBudget` 必须基于 `executionPolicy.timeoutMs` 暴露 `elapsedMs``remainingMs``state`(如 `within-budget``approaching-hard-timeout``timed-out`)。`phase` 至少区分 `waiting-runner``waiting-model``waiting-model-output``waiting-tool``waiting-tool-output``idle-after-tool``runner-stdio-inactive``transport-disconnected``runner-heartbeat-stale``terminal`,避免调用方只能用外层超时猜测 backend 状态。终态失败/阻塞时仍必须保留恢复动作,例如 inspect result、resume session、split task,而不是返回空数组。 |
| `steerDelivery` | 仅在查询 `type=steer` command result 时出现。必须说明 steer 是否已被 runner ack、是否已转发并被 backend `turn/steer` RPC 接受、目标 `targetCommandId`、是否观察到 target command 后续事件,以及“steer command completed 不等于 target turn 已产生后续 assistant/tool 输出”的语义。 |
`assistant_message` partial、`command_output` 存在、stdout 非空、backend transport close 或 idle timeout 都不能单独让 result 进入 `completed`
+1 -1
View File
@@ -98,7 +98,7 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交
- `queue submit/read/cancel/dispatch/refresh --dry-run` 必须只返回 non-mutating plan,固定 `dryRun=true``mutation=false`,不得创建 task、mark read、cancel、dispatch、refresh 或启动 runner job。
- `queue dispatch` 是 Q2 的受控手动调度入口,只对单个 task 显式创建 attempt 和 Core run/command/runner job;不得伪装成自动 scheduler;带 `--dry-run` 时只读取 task 并展示将要 POST 的路径和有界 request 摘要。
- `queue refresh` 只根据 Queue task 中保存的 Core run/command 引用回写 Queue attempt 状态,不读取 Core trace 反推 commander 或统计;带 `--dry-run` 时不得写回状态。
- `queue list/show/commander` 默认返回低噪声 summary,只显示 task/attempt/session ids、state、read cursor、stats 相关字段和 drill-down 命令;需要完整 task payload、resource bundle 或 metadata 时显式使用 `--full|--raw`
- `queue list/show/commander` 默认返回低噪声 summary,只显示 task/attempt/session ids、state、read cursor、stats 相关字段、compact supervisor 和 drill-down 命令;commander 的 supervisor 只能放 `phase`、last activity source seq/id、timeout budget 和恢复动作摘要,不得展开完整 payload、trace、tool command、stdout/stderr 或 runnerTrace。需要完整 task payload、resource bundle 或 metadata 时显式使用 `--full|--raw`;需要 trace/output 细节时继续按返回的 `sessionId`/`sourceSeq``sessions trace|output --seq/--event-id/--item-id --full`
- `queue show` 不得返回或代理完整 output/trace;输出和 trace 只能通过返回的 `sessionPath` 对应 `sessions ...` 命令查询。
- 需要提交较长 Queue task、dispatch body、run base 或 command payload 时,CLI 必须支持 `--json-stdin`,避免为了 heredoc/stdin 内容先写临时 dump 文件再传 `--json-file``sessions turn` 的 runner job override 也必须支持 `--runner-json-stdin`。所有 stdin JSON 仍必须解析为 object,并在 dry-run 中只展示有界 body 摘要、bytes 和 keys。
- `sessions ps` 默认只显示 running 和 unread session`--state all` 才显示历史 read session,避免旧 session 噪声淹没当前进度。
+1 -1
View File
@@ -101,7 +101,7 @@ Session 命令负责输出、trace 和会话控制:
./scripts/agentrun sessions read <sessionId> [--reader-id <reader>]
```
不得新增 `queue output``queue trace``queue session/*` 这类子路径代理。`queue list/show/commander` 默认输出低噪声 summary,最多打印 task/attempt/session ids、状态、统计、`sessionPath` 和下一步 `sessions ...` 命令;完整 payload/resource bundle/metadata 只能通过显式 `--full|--raw` 展开。Queue mutation 命令带 `--dry-run` 时必须只返回 `mutation=false` 的计划,不得写 Queue、Core run/command 或 runner job。
不得新增 `queue output``queue trace``queue session/*` 这类子路径代理。`queue list/show/commander` 默认输出低噪声 summary,最多打印 task/attempt/session ids、状态、统计、`sessionPath`、compact supervisor 和下一步 `sessions ...` 命令;supervisor 只允许披露 phase、last activity source seq/id、timeout budget 和恢复动作摘要,不得展开完整 payload、trace、tool command、stdout/stderr 或 runnerTrace。完整 payload/resource bundle/metadata 只能通过显式 `--full|--raw` 展开;trace/output 细节继续按 `sessionId` + `sourceSeq/eventId/itemId` 走 Session CLI 渐进披露。Queue mutation 命令带 `--dry-run` 时必须只返回 `mutation=false` 的计划,不得写 Queue、Core run/command 或 runner job。
Queue task 的 `resourceBundleRef` 在 dispatch 时原样进入 Core run。若其中声明 `requiredSkills`,Queue 只展示声明和终态摘要,不能自行判定可用;runner 必须在 gitbundle materialization 后、backend 启动前校验 `.agents/skills/<name>/SKILL.md`,缺失时以 `required-skill-unavailable` 写入 command/run result 和 events。
+20
View File
@@ -699,8 +699,10 @@ function summarizeQueueStats(record: JsonRecord | null): JsonRecord | null {
function summarizeQueueTaskWithAttempt(record: JsonRecord | null, fallbackTaskId: string): JsonRecord {
const summary = summarizeQueueTaskRecord(record, fallbackTaskId);
const latestAttempt = summarizeAttemptRecord(jsonRecordValue(record?.latestAttempt));
const supervisor = summarizeSupervisorRecord(jsonRecordValue(record?.supervisor));
const sessionRef = jsonRecordValue(record?.sessionRef);
if (latestAttempt) summary.latestAttempt = latestAttempt;
if (supervisor) summary.supervisor = supervisor;
const sessionId = stringValue(sessionRef?.sessionId) ?? stringValue(latestAttempt?.sessionId);
if (sessionId) summary.sessionId = sessionId;
if (record?.readCursor !== undefined) summary.read = record.readCursor !== null;
@@ -737,6 +739,24 @@ function summarizeAttemptRecord(record: JsonRecord | null): JsonRecord | null {
return compactRecord(record, { keys: ["attemptId", "state", "runId", "commandId", "runnerJobId", "sessionId", "sessionPath"] });
}
function summarizeSupervisorRecord(record: JsonRecord | null): JsonRecord | null {
if (!record) return null;
const lastActivity = jsonRecordValue(record.lastActivity);
const timeoutBudget = jsonRecordValue(record.timeoutBudget);
return {
...compactRecord(record, { keys: ["phase", "active", "status", "terminalStatus", "failureKind", "runId", "commandId", "lastSeq"] }),
lastActivity: lastActivity ? compactRecord(lastActivity, { keys: ["sourceSeq", "eventId", "activityKind", "type", "status", "toolName", "itemId", "ageMs", "summary"] }) : null,
timeoutBudget: timeoutBudget ? compactRecord(timeoutBudget, { keys: ["state", "timeoutMs", "elapsedMs", "remainingMs", "startedAt", "source"] }) : null,
recoveryActions: summarizeRecoveryActions(record.recoveryActions),
valuesPrinted: false,
};
}
function summarizeRecoveryActions(value: JsonValue | undefined): JsonValue[] {
if (!Array.isArray(value)) return [];
return value.slice(0, 5).map((item) => compactRecord(jsonRecordValue(item), { keys: ["action", "reason", "runId", "commandId", "sessionId", "afterSeq", "command", "hint"] }));
}
function summarizeRunRecord(record: JsonRecord | null): JsonRecord | null {
if (!record) return null;
return compactRecord(record, { keys: ["id", "status", "terminalStatus", "failureKind", "failureMessage", "backendProfile", "providerId", "claimedBy", "leaseExpiresAt", "createdAt", "updatedAt"] });
+102 -11
View File
@@ -43,7 +43,7 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman
const failureDetails = resultFailureDetails(scopedEvents, terminal);
const reply = assistantReply(scopedEvents);
const blocker = terminal === "blocked" || terminal === "failed" ? { failureKind, message: failureMessage, details: failureDetails } : null;
const liveness = livenessSnapshot(run, command, events, scopedEvents, terminal);
const liveness = livenessSnapshot(run, command, events, scopedEvents, terminal, failureKind, failureMessage);
const steerDelivery = command?.type === "steer" ? steerDeliverySummary(events, command.id) : null;
return {
runId: run.id,
@@ -92,7 +92,7 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman
};
}
function livenessSnapshot(run: RunRecord, command: CommandRecord | null, events: RunEvent[], scopedEvents: RunEvent[], terminal: TerminalStatus | null): JsonRecord {
function livenessSnapshot(run: RunRecord, command: CommandRecord | null, events: RunEvent[], scopedEvents: RunEvent[], terminal: TerminalStatus | null, failureKind: FailureKind | null, failureMessage: string | null): JsonRecord {
const nowMs = Date.now();
const active = terminal === null && !runIsTerminal(run) && !commandIsTerminal(command);
const lastEvent = events.at(-1) ?? null;
@@ -100,7 +100,10 @@ function livenessSnapshot(run: RunRecord, command: CommandRecord | null, events:
const lastCommandActivity = lastVisibleActivity ?? latestLivenessActivity(scopedEvents);
const lease = leaseSummary(run, nowMs);
const transportDisconnect = latestTransportDisconnect(scopedEvents);
const phase = livenessPhase({ active, command, lastVisibleActivity, leaseExpired: lease.leaseExpired, transportDisconnect });
const lastActivity = livenessActivitySummary(lastCommandActivity, nowMs);
const timeoutBudget = timeoutBudgetSummary(run, command, terminal, failureKind, nowMs);
const phase = livenessPhase({ active, command, lastVisibleActivity, leaseExpired: lease.leaseExpired, transportDisconnect, timeoutBudget, lastActivity });
const afterSeq = lastEvent?.seq ?? 0;
return {
phase,
active,
@@ -112,15 +115,17 @@ function livenessSnapshot(run: RunRecord, command: CommandRecord | null, events:
lastSeq: lastEvent?.seq ?? 0,
lastEventAt: lastEvent?.createdAt ?? null,
lastEventAgeMs: lastEvent ? ageMs(lastEvent.createdAt, nowMs) : null,
lastCommandActivity: livenessActivitySummary(lastCommandActivity, nowMs),
lastActivity,
lastCommandActivity: lastActivity,
timeoutBudget,
lease,
transportDisconnect: transportDisconnect ? livenessActivitySummary(transportDisconnect, nowMs) : null,
recoveryActions: active ? recoveryActions(run, command, lastEvent?.seq ?? 0) : [],
recoveryActions: recoveryActions({ run, command, afterSeq, active, terminal, failureKind, failureMessage }),
valuesPrinted: false,
};
}
function livenessPhase(input: { active: boolean; command: CommandRecord | null; lastVisibleActivity: RunEvent | null; leaseExpired: boolean | null; transportDisconnect: RunEvent | null }): string {
function livenessPhase(input: { active: boolean; command: CommandRecord | null; lastVisibleActivity: RunEvent | null; leaseExpired: boolean | null; transportDisconnect: RunEvent | null; timeoutBudget: JsonRecord; lastActivity: JsonRecord | null }): string {
if (!input.active) return "terminal";
if (input.command?.state === "pending") return "waiting-runner";
if (input.leaseExpired === true) return "runner-heartbeat-stale";
@@ -130,9 +135,43 @@ function livenessPhase(input: { active: boolean; command: CommandRecord | null;
if (status === "inProgress" || status === "running") return "waiting-tool";
if (status === "completed") return "idle-after-tool";
}
if (input.lastVisibleActivity?.type === "command_output") return "waiting-tool-output";
if (input.lastVisibleActivity?.type === "assistant_message") return "waiting-model-output";
const remainingMs = numberJsonValue(input.timeoutBudget.remainingMs);
const timeoutMs = numberJsonValue(input.timeoutBudget.timeoutMs);
const activityAgeMs = numberJsonValue(input.lastActivity?.ageMs);
const inactiveThresholdMs = timeoutMs === null ? 300_000 : Math.min(300_000, Math.max(60_000, Math.floor(timeoutMs / 4)));
if (remainingMs !== null && remainingMs <= Math.min(120_000, Math.max(10_000, Math.floor((timeoutMs ?? 120_000) / 10))) && (activityAgeMs === null || activityAgeMs >= inactiveThresholdMs)) return "runner-stdio-inactive";
return "waiting-model";
}
function timeoutBudgetSummary(run: RunRecord, command: CommandRecord | null, terminal: TerminalStatus | null, failureKind: FailureKind | null, nowMs: number): JsonRecord {
const timeoutMs = typeof run.executionPolicy.timeoutMs === "number" && Number.isFinite(run.executionPolicy.timeoutMs) && run.executionPolicy.timeoutMs > 0 ? Math.trunc(run.executionPolicy.timeoutMs) : null;
const startedAt = command?.acknowledgedAt ?? command?.createdAt ?? run.updatedAt ?? run.createdAt;
const startedMs = Date.parse(startedAt);
const elapsedMs = timeoutMs !== null && Number.isFinite(startedMs) ? Math.max(0, nowMs - startedMs) : null;
const remainingMs = timeoutMs !== null && elapsedMs !== null ? Math.max(0, timeoutMs - elapsedMs) : null;
const approachingThresholdMs = timeoutMs === null ? null : Math.min(120_000, Math.max(10_000, Math.floor(timeoutMs / 10)));
let state = "unknown";
if (timeoutMs !== null && elapsedMs !== null) {
if (terminal !== null) state = failureKind === "backend-timeout" ? "timed-out" : "terminal";
else if (remainingMs === 0) state = "overdue";
else if (approachingThresholdMs !== null && remainingMs !== null && remainingMs <= approachingThresholdMs) state = "approaching-hard-timeout";
else state = "within-budget";
}
return {
timeoutMs,
source: "executionPolicy.timeoutMs",
startedAt,
elapsedMs,
remainingMs,
approachingThresholdMs,
state,
hardTimeout: true,
valuesPrinted: false,
};
}
function leaseSummary(run: RunRecord, nowMs: number): JsonRecord & { leaseExpired: boolean | null } {
const leaseExpiresMs = run.leaseExpiresAt ? Date.parse(run.leaseExpiresAt) : NaN;
const hasLease = Boolean(run.claimedBy && run.leaseExpiresAt && Number.isFinite(leaseExpiresMs));
@@ -172,33 +211,85 @@ function livenessActivitySummary(event: RunEvent | null, nowMs: number): JsonRec
if (!event) return null;
return {
seq: event.seq,
sourceSeq: event.seq,
eventId: event.id,
type: event.type,
activityKind: activityKind(event),
phase: typeof event.payload.phase === "string" ? event.payload.phase : null,
status: typeof event.payload.status === "string" ? event.payload.status : null,
toolName: typeof event.payload.toolName === "string" ? event.payload.toolName : null,
itemId: typeof event.payload.itemId === "string" ? event.payload.itemId : null,
exitCode: normalizedExitCode(event.payload.exitCode),
commandId: typeof event.payload.commandId === "string" ? event.payload.commandId : null,
createdAt: event.createdAt,
ageMs: ageMs(event.createdAt, nowMs),
summary: activityTextSummary(event),
valuesPrinted: false,
};
}
function activityKind(event: RunEvent): string {
if (event.type === "tool_call") {
const status = typeof event.payload.status === "string" ? event.payload.status : "";
if (status === "running" || status === "inProgress") return "tool-in-flight";
if (status === "completed") return "tool-completed";
if (status === "cancelled") return "tool-cancelled";
if (status === "failed") return "tool-failed";
return "tool";
}
if (event.type === "command_output") return "tool-output";
if (event.type === "assistant_message") return event.payload.progress === true ? "assistant-progress" : "assistant-output";
if (event.type === "diff") return "diff";
if (event.type === "error") return "error";
if (event.type === "terminal_status") return "terminal";
return "backend-activity";
}
function activityTextSummary(event: RunEvent): string | null {
if (event.type === "tool_call") {
const name = typeof event.payload.toolName === "string" ? event.payload.toolName : typeof event.payload.type === "string" ? event.payload.type : "tool";
const status = typeof event.payload.status === "string" ? event.payload.status : "observed";
return `${name} ${status}`;
}
const fromSummary = typeof event.payload.outputSummary === "string" ? event.payload.outputSummary : typeof event.payload.summary === "string" ? event.payload.summary : null;
const text = fromSummary ?? textPayload(event.payload) ?? (typeof event.payload.phase === "string" ? event.payload.phase : null);
if (!text) return null;
return boundedTextSummary(text.replace(/\s+/gu, " ").trim(), { limitChars: 240 }).text as string;
}
function ageMs(value: string, nowMs: number): number | null {
const parsed = Date.parse(value);
return Number.isFinite(parsed) ? Math.max(0, nowMs - parsed) : null;
}
function recoveryActions(run: RunRecord, command: CommandRecord | null, afterSeq: number): JsonRecord[] {
function recoveryActions(input: { run: RunRecord; command: CommandRecord | null; afterSeq: number; active: boolean; terminal: TerminalStatus | null; failureKind: FailureKind | null; failureMessage: string | null }): JsonRecord[] {
const { run, command, afterSeq, active, terminal, failureKind, failureMessage } = input;
const sessionId = run.sessionRef?.sessionId ?? null;
const traceCommand = sessionId ? `./scripts/agentrun sessions trace ${sessionId} --after-seq ${afterSeq} --limit 100 --run-id ${run.id}` : `./scripts/agentrun runs events ${run.id} --after-seq ${afterSeq} --limit 100 --summary`;
const outputCommand = sessionId ? `./scripts/agentrun sessions output ${sessionId} --after-seq ${afterSeq} --limit 100 --run-id ${run.id}` : null;
const actions: JsonRecord[] = [
{ action: "poll-trace", runId: run.id, afterSeq },
{ action: "poll-output", runId: run.id, afterSeq },
{ action: "poll-trace", runId: run.id, commandId: command?.id ?? null, afterSeq, command: traceCommand, valuesPrinted: false },
];
if (command) actions.push({ action: "cancel-command", runId: run.id, commandId: command.id });
else actions.push({ action: "cancel-run", runId: run.id });
if (outputCommand) actions.push({ action: "poll-output", runId: run.id, commandId: command?.id ?? null, afterSeq, command: outputCommand, valuesPrinted: false });
if (active) {
if (sessionId) actions.push({ action: "steer-session", sessionId, runId: run.id, commandId: command?.id ?? null, command: `./scripts/agentrun sessions steer ${sessionId} --prompt-stdin`, valuesPrinted: false });
if (command) actions.push({ action: "cancel-command", runId: run.id, commandId: command.id, command: `./scripts/agentrun commands cancel ${command.id} --reason <reason>`, valuesPrinted: false });
else actions.push({ action: "cancel-run", runId: run.id, command: `./scripts/agentrun runs cancel ${run.id} --reason <reason>`, valuesPrinted: false });
return actions;
}
if (terminal === "failed" || terminal === "blocked" || terminal === "cancelled") {
if (command) actions.push({ action: "inspect-result", runId: run.id, commandId: command.id, command: `./scripts/agentrun commands result ${command.id} --run-id ${run.id}`, valuesPrinted: false });
if (sessionId) actions.push({ action: "resume-session", sessionId, command: `./scripts/agentrun sessions turn ${sessionId} --prompt-stdin`, valuesPrinted: false });
if (failureKind === "backend-timeout") actions.push({ action: "split-task", reason: "backend-timeout", hint: "把大 patch / 长工具链拆成更短 turn 后用同一 session 续跑", failureMessage: failureMessage ? boundedTextSummary(failureMessage, { limitChars: 200 }).text as string : null, valuesPrinted: false });
else actions.push({ action: "retry-or-split", reason: failureKind ?? "terminal", hint: "先读 trace/output 的 detail id,再决定 steer、重跑或拆分", valuesPrinted: false });
}
return actions;
}
function numberJsonValue(value: JsonValue | undefined): number | null {
return typeof value === "number" && Number.isFinite(value) ? value : null;
}
function steerDeliverySummary(events: RunEvent[], commandId: string): JsonRecord {
const related = events.filter((event) => event.payload.commandId === commandId);
const completed = latestPhaseEvent(related, "turn/steer:completed");
+105 -1
View File
@@ -114,6 +114,92 @@ async function refreshQueueTaskRecordForRead(store: AgentRunStore, task: QueueTa
}
}
async function queueCommanderForRead(store: AgentRunStore, queue: string | undefined, readerId: string | null): Promise<JsonValue> {
const snapshot = await store.queueCommander(queue, readerId) as unknown as JsonRecord;
const items = Array.isArray(snapshot.items) ? snapshot.items : [];
const enrichedItems = await Promise.all(items.map(async (item) => {
const task = asJsonRecord(item);
if (!task) return item as JsonValue;
const supervisor = await queueTaskSupervisor(store, task);
return supervisor ? { ...task, supervisor, valuesPrinted: false } : task;
}));
return { ...snapshot, items: enrichedItems, valuesPrinted: false } as JsonValue;
}
async function queueTaskSupervisor(store: AgentRunStore, task: JsonRecord): Promise<JsonRecord | null> {
const attempt = asJsonRecord(task.latestAttempt);
const runId = stringJsonValue(attempt?.runId);
if (!runId) return null;
try {
const result = await buildRunResult(store, runId, stringJsonValue(attempt?.commandId) ?? undefined);
const liveness = asJsonRecord(result.liveness);
const lastActivity = asJsonRecord(liveness?.lastActivity ?? liveness?.lastCommandActivity);
const timeoutBudget = asJsonRecord(liveness?.timeoutBudget);
return {
runId: stringJsonValue(result.runId),
commandId: stringJsonValue(result.commandId),
status: stringJsonValue(result.status),
terminalStatus: stringJsonValue(result.terminalStatus),
failureKind: stringJsonValue(result.failureKind),
phase: stringJsonValue(liveness?.phase),
active: liveness?.active === true,
lastSeq: numberJsonValue(liveness?.lastSeq ?? result.lastSeq),
lastActivity: lastActivity ? compactActivity(lastActivity) : null,
timeoutBudget: timeoutBudget ? compactTimeoutBudget(timeoutBudget) : null,
recoveryActions: compactRecoveryActions(liveness?.recoveryActions),
valuesPrinted: false,
};
} catch (error) {
return { phase: "unavailable", failureKind: "infra-failed", message: error instanceof Error ? error.message : String(error), valuesPrinted: false };
}
}
function compactActivity(activity: JsonRecord): JsonRecord {
return {
sourceSeq: numberJsonValue(activity.sourceSeq ?? activity.seq),
eventId: stringJsonValue(activity.eventId),
activityKind: stringJsonValue(activity.activityKind),
type: stringJsonValue(activity.type),
status: stringJsonValue(activity.status),
toolName: stringJsonValue(activity.toolName),
itemId: stringJsonValue(activity.itemId),
ageMs: numberJsonValue(activity.ageMs),
summary: boundedJsonString(activity.summary, 180),
valuesPrinted: false,
};
}
function compactTimeoutBudget(budget: JsonRecord): JsonRecord {
return {
state: stringJsonValue(budget.state),
timeoutMs: numberJsonValue(budget.timeoutMs),
elapsedMs: numberJsonValue(budget.elapsedMs),
remainingMs: numberJsonValue(budget.remainingMs),
startedAt: stringJsonValue(budget.startedAt),
source: stringJsonValue(budget.source),
valuesPrinted: false,
};
}
function compactRecoveryActions(value: JsonValue | undefined): JsonValue[] {
if (!Array.isArray(value)) return [];
return value.slice(0, 5).map((item) => {
const action = asJsonRecord(item);
if (!action) return { action: "unknown", valuesPrinted: false };
return {
action: stringJsonValue(action.action),
reason: stringJsonValue(action.reason),
runId: stringJsonValue(action.runId),
commandId: stringJsonValue(action.commandId),
sessionId: stringJsonValue(action.sessionId),
afterSeq: numberJsonValue(action.afterSeq),
command: boundedJsonString(action.command, 220),
hint: boundedJsonString(action.hint, 220),
valuesPrinted: false,
};
});
}
async function route({ method, url, body, store, sourceCommit, runnerJobDefaults, sessionPvcDefaults, providerProfileDefaults }: { method: string; url: URL; body: unknown; store: AgentRunStore; sourceCommit: string; runnerJobDefaults?: NonNullable<ManagerServerOptions["runnerJobDefaults"]>; sessionPvcDefaults?: NonNullable<ManagerServerOptions["sessionPvcOptions"]>; providerProfileDefaults?: NonNullable<ManagerServerOptions["providerProfileOptions"]> }): Promise<JsonValue> {
const path = url.pathname;
if (method === "GET" && (path === "/health" || path === "/health/live" || path === "/health/readiness")) {
@@ -334,7 +420,7 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults
if (method === "GET" && path === "/api/v1/queue/commander") {
const queue = url.searchParams.get("queue") ?? undefined;
await refreshRunningQueueTasksForRead(store, queue);
return await store.queueCommander(queue, url.searchParams.get("readerId")) as unknown as JsonValue;
return await queueCommanderForRead(store, queue, url.searchParams.get("readerId"));
}
if (method === "POST" && path === "/api/v1/runs") return await store.createRun(validateCreateRun(body)) as unknown as JsonValue;
const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u);
@@ -460,6 +546,24 @@ function numberField(record: JsonRecord, key: string, fallback: number): number
return typeof value === "number" && Number.isFinite(value) ? value : fallback;
}
function asJsonRecord(value: unknown): JsonRecord | null {
return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : null;
}
function stringJsonValue(value: JsonValue | undefined): string | null {
return typeof value === "string" && value.length > 0 ? value : null;
}
function numberJsonValue(value: JsonValue | undefined): number | null {
return typeof value === "number" && Number.isFinite(value) ? value : null;
}
function boundedJsonString(value: JsonValue | undefined, limit: number): string | null {
if (typeof value !== "string" || value.length === 0) return null;
const normalized = value.replace(/\s+/gu, " ").trim();
return normalized.length > limit ? `${normalized.slice(0, Math.max(0, limit - 3))}...` : normalized;
}
function stringField(record: JsonRecord, key: string): string {
const value = record[key];
if (typeof value !== "string" || value.trim().length === 0) throw new AgentRunError("schema-invalid", `${key} is required`, { httpStatus: 400 });
+132
View File
@@ -0,0 +1,132 @@
import assert from "node:assert/strict";
import { setTimeout as sleep } from "node:timers/promises";
import { ManagerClient } from "../../mgr/client.js";
import { startManagerServer } from "../../mgr/server.js";
import { MemoryAgentRunStore } from "../../mgr/store.js";
import type { JsonRecord } from "../../common/types.js";
import { assertNoSecretLeak, type SelfTestCase, type SelfTestContext } from "../harness.js";
import { summarizeQueueCommanderSnapshot } from "../../../scripts/src/cli.js";
const selfTest: SelfTestCase = async (context: SelfTestContext) => {
const store = new MemoryAgentRunStore();
const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test", store });
try {
const client = new ManagerClient(server.baseUrl);
const tool = await createActiveRun(client, context, "timeout-liveness-tool", 120_000);
await client.post(`/api/v1/runs/${tool.runId}/events`, { type: "tool_call", payload: { commandId: tool.commandId, itemId: "tool_live", toolName: "commandExecution", status: "running", command: "hwpod workspace apply-patch" } });
const toolResult = await commandResult(client, tool);
const toolLive = toolResult.liveness as JsonRecord;
assert.equal(toolLive.phase, "waiting-tool");
assert.equal(((toolLive.lastActivity as JsonRecord).activityKind), "tool-in-flight");
assert.equal(((toolLive.lastActivity as JsonRecord).sourceSeq), 4);
assert.equal(((toolLive.timeoutBudget as JsonRecord).state), "within-budget");
assert.ok(Array.isArray(toolLive.recoveryActions));
const assistant = await createActiveRun(client, context, "timeout-liveness-assistant", 120_000);
await client.post(`/api/v1/runs/${assistant.runId}/events`, { type: "assistant_message", payload: { commandId: assistant.commandId, itemId: "msg_progress", progress: true, text: "正在生成 apply-patch,先汇总待改文件。" } });
const assistantLive = (await commandResult(client, assistant)).liveness as JsonRecord;
assert.equal(assistantLive.phase, "waiting-model-output");
assert.equal(((assistantLive.lastActivity as JsonRecord).activityKind), "assistant-progress");
const inactive = await createActiveRun(client, context, "timeout-liveness-inactive", 40);
await sleep(36);
const inactiveLive = (await commandResult(client, inactive)).liveness as JsonRecord;
assert.equal(inactiveLive.phase, "runner-stdio-inactive");
assert.ok(["approaching-hard-timeout", "overdue"].includes(String((inactiveLive.timeoutBudget as JsonRecord).state)));
const terminal = await createActiveRun(client, context, "timeout-liveness-terminal", 50);
await client.post(`/api/v1/runs/${terminal.runId}/events`, { type: "error", payload: { commandId: terminal.commandId, failureKind: "backend-timeout", phase: "turn:hard-timeout", message: "codex stdio turn hard timed out after 50ms" } });
await client.patch(`/api/v1/commands/${terminal.commandId}/status`, { terminalStatus: "failed", failureKind: "backend-timeout", failureMessage: "codex stdio turn hard timed out after 50ms" });
await client.patch(`/api/v1/runs/${terminal.runId}/status`, { terminalStatus: "failed", failureKind: "backend-timeout", failureMessage: "codex stdio turn hard timed out after 50ms" });
const terminalResult = await commandResult(client, terminal);
const terminalLive = terminalResult.liveness as JsonRecord;
assert.equal(terminalResult.terminalStatus, "failed");
assert.equal(terminalLive.phase, "terminal");
assert.equal(((terminalLive.timeoutBudget as JsonRecord).state), "timed-out");
assert.ok((terminalLive.recoveryActions as JsonRecord[]).some((action) => action.action === "resume-session"));
assert.ok((terminalLive.recoveryActions as JsonRecord[]).some((action) => action.action === "split-task"));
const session = await client.get(`/api/v1/sessions/${terminal.sessionId}?readerId=timeout-liveness`) as JsonRecord;
assert.equal(((session.liveness as JsonRecord).phase), "terminal");
assert.ok(Array.isArray(((session.supervisor as JsonRecord).recoveryActions)), "session show must keep terminal recovery actions");
const task = await client.post("/api/v1/queue/tasks", queueTask(context, terminal.sessionId, 50)) as JsonRecord;
store.updateQueueTaskAttempt(String(task.id), {
state: "running",
latestAttempt: { attemptId: "attempt_timeout_liveness", state: "running", runId: terminal.runId, commandId: terminal.commandId, runnerJobId: null, sessionId: terminal.sessionId, sessionPath: `/api/v1/sessions/${terminal.sessionId}` },
sessionPath: `/api/v1/sessions/${terminal.sessionId}`,
});
const commander = await client.get("/api/v1/queue/commander?queue=timeout-liveness&readerId=timeout-liveness") as JsonRecord;
const commanderItem = ((commander.items as JsonRecord[]) ?? []).find((item) => item.id === task.id) as JsonRecord;
assert.equal(((commanderItem.supervisor as JsonRecord).phase), "terminal");
assert.equal((((commanderItem.supervisor as JsonRecord).timeoutBudget as JsonRecord).state), "timed-out");
const commanderSummary = summarizeQueueCommanderSnapshot(commander, { limit: 5 });
const summaryItem = ((commanderSummary.items as JsonRecord[]) ?? []).find((item) => item.id === task.id) as JsonRecord;
assert.equal(((summaryItem.supervisor as JsonRecord).phase), "terminal");
assert.equal(JSON.stringify(commanderSummary).includes("hwpod workspace apply-patch"), false, "commander summary must stay compact and avoid dumping command bodies");
assertNoSecretLeak({ toolResult, assistantLive, inactiveLive, terminalResult, session, commanderSummary });
return { name: "timeout-liveness", tests: ["tool-in-flight-liveness", "assistant-progress-liveness", "stdio-inactive-timeout-budget", "terminal-timeout-recovery", "queue-commander-supervisor"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}
};
async function createActiveRun(client: ManagerClient, context: SelfTestContext, sessionSuffix: string, timeoutMs: number): Promise<{ runId: string; commandId: string; sessionId: string }> {
const sessionId = `selftest-${sessionSuffix}`;
const run = await client.post("/api/v1/runs", runBody(context, sessionId, timeoutMs)) as JsonRecord;
const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: sessionSuffix }, idempotencyKey: sessionSuffix }) as JsonRecord;
await client.post(`/api/v1/runs/${run.id}/claim`, { runnerId: `runner_${sessionSuffix}`, leaseMs: 60_000 });
await client.post(`/api/v1/commands/${command.id}/ack`, {});
return { runId: String(run.id), commandId: String(command.id), sessionId };
}
async function commandResult(client: ManagerClient, item: { runId: string; commandId: string }): Promise<JsonRecord> {
return await client.get(`/api/v1/runs/${item.runId}/commands/${item.commandId}/result`) as JsonRecord;
}
function runBody(context: SelfTestContext, sessionId: string, timeoutMs: number): JsonRecord {
return {
tenantId: "unidesk",
projectId: "pikasTech/agentrun",
workspaceRef: { kind: "host-path", path: context.workspace },
sessionRef: { sessionId, conversationId: sessionId },
providerId: "G14",
backendProfile: "codex",
executionPolicy: executionPolicy(timeoutMs, context.codexHome),
traceSink: null,
};
}
function queueTask(context: SelfTestContext, sessionId: string, timeoutMs: number): JsonRecord {
return {
tenantId: "unidesk",
projectId: "pikasTech/agentrun",
queue: "timeout-liveness",
lane: "selftest",
title: "timeout liveness commander",
priority: 1,
backendProfile: "codex",
providerId: "G14",
workspaceRef: { kind: "host-path", path: context.workspace },
sessionRef: { sessionId, conversationId: sessionId },
executionPolicy: executionPolicy(timeoutMs, context.codexHome),
resourceBundleRef: null,
payload: { prompt: "timeout liveness commander" },
references: [],
metadata: {},
};
}
function executionPolicy(timeoutMs: number, codexHome: string): JsonRecord {
return {
sandbox: "workspace-write",
approval: "never",
timeoutMs,
network: "default",
secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: codexHome } }] },
};
}
export default selfTest;