From 92f78b6a5b0fd244db2548f3bcf5289aea2065d5 Mon Sep 17 00:00:00 2001 From: Lyon <88232613+pikasTech@users.noreply.github.com> Date: Thu, 25 Jun 2026 09:34:19 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E8=A1=A5=E9=BD=90=20cancel=20lifecycle?= =?UTF-8?q?=20=E8=BF=90=E8=A1=8C=E6=97=B6=E6=B2=BB=E7=90=86=20(#245)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/common/types.ts | 27 ++++ src/mgr/postgres-store.ts | 150 ++++++++++++++++-- src/mgr/result.ts | 34 ++++ src/mgr/server.ts | 2 +- src/mgr/store.ts | 146 +++++++++++++++-- src/runner/run-once.ts | 19 ++- src/selftest/cases/00-redaction-postgres.ts | 5 +- src/selftest/cases/20-runner-k8s-job.ts | 4 + .../cases/50-hwlab-manual-dispatch.ts | 48 +++++- 9 files changed, 390 insertions(+), 45 deletions(-) diff --git a/src/common/types.ts b/src/common/types.ts index 8d32e69..9da8e92 100644 --- a/src/common/types.ts +++ b/src/common/types.ts @@ -34,6 +34,8 @@ export type FailureKind = export type RunStatus = "pending" | "claimed" | "running" | "completed" | "failed" | "blocked" | "cancelled"; export type CommandState = "pending" | "acknowledged" | "completed" | "failed" | "cancelled"; export type TerminalStatus = "completed" | "failed" | "blocked" | "cancelled"; +export type CancelTargetKind = "task" | "session" | "run" | "command"; +export type CancelStage = "accepted" | "persisted" | "delivered" | "aborting" | "terminalized" | "fenced" | "late-write-rejected"; export type BackendProfile = string; export type QueueTaskState = "pending" | "running" | "completed" | "failed" | "blocked" | "cancelled"; export type QueueTaskAttentionState = "active" | "unread" | "read"; @@ -213,6 +215,10 @@ export interface RunRecord extends CreateRunInput { terminalStatus: TerminalStatus | null; failureKind: FailureKind | null; failureMessage: string | null; + cancelEpoch: number; + cancelRequestId: string | null; + cancelRequestedAt: string | null; + cancelReason: string | null; createdAt: string; updatedAt: string; claimedBy: string | null; @@ -233,11 +239,32 @@ export interface CommandRecord extends CreateCommandInput { seq: number; state: CommandState; payloadHash: string; + cancelEpoch: number; + cancelRequestId: string | null; + cancelRequestedAt: string | null; + cancelReason: string | null; createdAt: string; updatedAt: string; acknowledgedAt: string | null; } +export interface CancelRequestRecord extends JsonRecord { + id: string; + targetKind: CancelTargetKind; + targetId: string; + runId: string | null; + commandId: string | null; + sessionId: string | null; + taskId: string | null; + reason: string; + requestedBy: string | null; + epoch: number; + stage: CancelStage; + metadata: JsonRecord; + createdAt: string; + updatedAt: string; +} + export type EventType = "backend_status" | "assistant_message" | "tool_call" | "command_output" | "diff" | "error" | "terminal_status"; export interface RunEvent extends JsonRecord { diff --git a/src/mgr/postgres-store.ts b/src/mgr/postgres-store.ts index 7f93466..85cec70 100644 --- a/src/mgr/postgres-store.ts +++ b/src/mgr/postgres-store.ts @@ -3,10 +3,10 @@ import { Pool } from "pg"; import type { PoolClient, QueryResultRow } from "pg"; import { AgentRunError } from "../common/errors.js"; import { redactJson } from "../common/redaction.js"; -import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, ListGcExpiredSessionsInput, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionEventPage, SessionListResult, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js"; +import type { BackendProfile, BackendTurnResult, CancelRequestRecord, CancelStage, CancelTargetKind, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, ListGcExpiredSessionsInput, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionEventPage, SessionListResult, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js"; import { newId, nowIso, stableHash } from "../common/validation.js"; import type { AgentRunStore, ListQueueTasksInput, ListSessionsInput, SaveRunnerJobInput, SessionEventPageInput, StoreHealth, UpdateQueueTaskAttemptInput } from "./store.js"; -import { assertSessionBoundary, buildQueueStats, buildQueueTaskSummary, buildSessionSummary, clampQueueLimit, clampSessionLimit, commandStateFromTerminal, isLeaseExpired, isSessionOutputEvent, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, parseSessionCursor, queueTaskMatchesCommander, queueTaskPayloadHash, queueTaskSort, sessionListFilters, sessionMatchesListState, sessionRefFromRecord, sessionSort, sessionTitleFromCommand, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef, titleFromMetadata } from "./store.js"; +import { assertSessionBoundary, buildQueueStats, buildQueueTaskSummary, buildSessionSummary, cancelStagePayload, clampQueueLimit, clampSessionLimit, commandStateFromTerminal, fenceLateEventForCancelledRun, isLeaseExpired, isSessionOutputEvent, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, lateWriteRejectedPayload, parseQueueCursor, parseSessionCursor, queueTaskMatchesCommander, queueTaskPayloadHash, queueTaskSort, sessionListFilters, sessionMatchesListState, sessionRefFromRecord, sessionSort, sessionTitleFromCommand, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef, titleFromMetadata } from "./store.js"; import { backendCapabilitiesSqlValues, mergeBackendCapability } from "../common/backend-profiles.js"; import { normalizeRunEventPayload, requireEventType } from "../common/events.js"; @@ -335,6 +335,39 @@ ALTER TABLE agentrun_queue_tasks ADD COLUMN IF NOT EXISTS session_ref jsonb; `; +const cancelLifecycleMigrationSql = ` +ALTER TABLE agentrun_runs ADD COLUMN IF NOT EXISTS cancel_epoch integer NOT NULL DEFAULT 0; +ALTER TABLE agentrun_runs ADD COLUMN IF NOT EXISTS cancel_request_id text; +ALTER TABLE agentrun_runs ADD COLUMN IF NOT EXISTS cancel_requested_at timestamptz; +ALTER TABLE agentrun_runs ADD COLUMN IF NOT EXISTS cancel_reason text; + +ALTER TABLE agentrun_commands ADD COLUMN IF NOT EXISTS cancel_epoch integer NOT NULL DEFAULT 0; +ALTER TABLE agentrun_commands ADD COLUMN IF NOT EXISTS cancel_request_id text; +ALTER TABLE agentrun_commands ADD COLUMN IF NOT EXISTS cancel_requested_at timestamptz; +ALTER TABLE agentrun_commands ADD COLUMN IF NOT EXISTS cancel_reason text; + +CREATE TABLE IF NOT EXISTS agentrun_cancel_requests ( + id text PRIMARY KEY, + target_kind text NOT NULL, + target_id text NOT NULL, + run_id text REFERENCES agentrun_runs(id) ON DELETE CASCADE, + command_id text REFERENCES agentrun_commands(id) ON DELETE SET NULL, + session_id text, + task_id text, + reason text NOT NULL, + requested_by text, + epoch integer NOT NULL, + stage text NOT NULL, + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL, + updated_at timestamptz NOT NULL +); + +CREATE INDEX IF NOT EXISTS agentrun_cancel_requests_target_idx ON agentrun_cancel_requests (target_kind, target_id, created_at DESC); +CREATE INDEX IF NOT EXISTS agentrun_cancel_requests_run_idx ON agentrun_cancel_requests (run_id, created_at DESC); +CREATE INDEX IF NOT EXISTS agentrun_cancel_requests_command_idx ON agentrun_cancel_requests (command_id, created_at DESC); +`; + const postgresMigrations: MigrationDefinition[] = [ { id: "001_v01_initial_durable_store", @@ -386,13 +419,18 @@ const postgresMigrations: MigrationDefinition[] = [ checksum: checksumSql(queueSessionRefMigrationSql), sql: queueSessionRefMigrationSql, }, + { + id: "011_v01_cancel_lifecycle", + checksum: checksumSql(cancelLifecycleMigrationSql), + sql: cancelLifecycleMigrationSql, + }, ]; export function postgresMigrationContract(): JsonRecord { return { migrationIds: postgresMigrations.map((migration) => migration.id), latestMigrationId: latestMigrationId(), - requiredTables: ["agentrun_schema_migrations", "agentrun_runs", "agentrun_commands", "agentrun_events", "agentrun_runners", "agentrun_backends", "agentrun_leases", "agentrun_sessions", "agentrun_session_read_cursors", "agentrun_runner_jobs", "agentrun_queue_tasks", "agentrun_queue_read_cursors"], + requiredTables: ["agentrun_schema_migrations", "agentrun_runs", "agentrun_commands", "agentrun_events", "agentrun_runners", "agentrun_backends", "agentrun_leases", "agentrun_sessions", "agentrun_session_read_cursors", "agentrun_runner_jobs", "agentrun_queue_tasks", "agentrun_queue_read_cursors", "agentrun_cancel_requests"], checksums: Object.fromEntries(postgresMigrations.map((migration) => [migration.id, migration.checksum])), }; } @@ -452,7 +490,7 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( const at = nowIso(); return this.withTransaction(async (client) => { const sessionRef = await this.resolveSessionForRun(client, input, at); - const run: RunRecord = { ...input, sessionRef, resourceBundleRef: input.resourceBundleRef ?? null, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null }; + const run: RunRecord = { ...input, sessionRef, resourceBundleRef: input.resourceBundleRef ?? null, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, cancelEpoch: 0, cancelRequestId: null, cancelRequestedAt: null, cancelReason: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null }; await client.query( `INSERT INTO agentrun_runs (id, tenant_id, project_id, workspace_ref, session_ref, resource_bundle_ref, provider_id, backend_profile, execution_policy, trace_sink, status, terminal_status, failure_kind, failure_message, created_at, updated_at, claimed_by, lease_expires_at) VALUES ($1, $2, $3, $4::jsonb, $5::jsonb, $6::jsonb, $7, $8, $9::jsonb, $10::jsonb, $11, $12, $13, $14, $15, $16, $17, $18)`, @@ -509,7 +547,7 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( } const seq = await this.nextSeq(client, "agentrun_commands", runId); const at = nowIso(); - const command: CommandRecord = { ...input, id: newId("cmd"), runId, seq, state: "pending", payloadHash, createdAt: at, updatedAt: at, acknowledgedAt: null }; + const command: CommandRecord = { ...input, id: newId("cmd"), runId, seq, state: "pending", payloadHash, cancelEpoch: 0, cancelRequestId: null, cancelRequestedAt: null, cancelReason: null, createdAt: at, updatedAt: at, acknowledgedAt: null }; await client.query( `INSERT INTO agentrun_commands (id, run_id, seq, type, payload, payload_hash, idempotency_key, state, created_at, updated_at, acknowledged_at) VALUES ($1, $2, $3, $4, $5::jsonb, $6, $7, $8, $9, $10, $11)`, @@ -684,7 +722,13 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( const row = existing.rows[0]; if (!row) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 }); const command = commandFromRow(row); - if (isTerminalCommandState(command.state)) return command; + if (isTerminalCommandState(command.state)) { + if (command.state === "cancelled" && result.terminalStatus !== "cancelled") { + const run = await this.requireRunForUpdate(client, command.runId); + await this.appendEventWithLockedRun(client, command.runId, "backend_status", lateWriteRejectedPayload(run, command, { source: "command-status", terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage ?? null, threadId: result.threadId ?? null, turnId: result.turnId ?? null })); + } + return command; + } const state = commandStateFromTerminal(result.terminalStatus); const updated = await client.query("UPDATE agentrun_commands SET state = $2, updated_at = $3 WHERE id = $1 RETURNING *", [commandId, state, nowIso()]); const run = await this.requireRunForUpdate(client, command.runId); @@ -705,7 +749,10 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( async finishRun(runId: string, result: Pick): Promise { return this.withTransaction(async (client) => { const existing = await this.requireRunForUpdate(client, runId); - if (isTerminalRunStatus(existing.status)) return existing; + if (isTerminalRunStatus(existing.status)) { + if (existing.status === "cancelled" && result.terminalStatus !== "cancelled") await this.appendEventWithLockedRun(client, runId, "backend_status", lateWriteRejectedPayload(existing, null, { source: "run-status", terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage ?? null, threadId: result.threadId ?? null, turnId: result.turnId ?? null })); + return existing; + } const status = statusFromTerminal(result.terminalStatus); const updated = await client.query( `UPDATE agentrun_runs SET status = $2, terminal_status = $3, failure_kind = $4, failure_message = $5, updated_at = $6 WHERE id = $1 RETURNING *`, @@ -724,18 +771,32 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( const run = await this.requireRunForUpdate(client, runId); if (isTerminalRunStatus(run.status)) return run; const at = nowIso(); + const cancel = await this.createCancelRequest(client, { targetKind: "run", targetId: runId, run, command: null, reason, at, stage: "accepted" }); + await this.appendCancelStage(client, runId, "accepted", cancel); + const persistedRunResult = await client.query( + `UPDATE agentrun_runs SET cancel_epoch = $2, cancel_request_id = $3, cancel_requested_at = $4, cancel_reason = $5, updated_at = $4 WHERE id = $1 RETURNING *`, + [runId, cancel.epoch, cancel.id, at, reason], + ); + const persistedRun = runFromRow(persistedRunResult.rows[0]); + await this.appendCancelStage(client, runId, "persisted", cancel); + const leaseExpired = Boolean(persistedRun.claimedBy && isLeaseExpired(persistedRun.leaseExpiresAt)); + if (leaseExpired) await this.appendCancelStage(client, runId, "fenced", cancel, { claimedBy: persistedRun.claimedBy, leaseExpiresAt: persistedRun.leaseExpiresAt }); + else if (persistedRun.claimedBy) { + await this.appendCancelStage(client, runId, "delivered", cancel, { claimedBy: persistedRun.claimedBy }); + await this.appendCancelStage(client, runId, "aborting", cancel, { claimedBy: persistedRun.claimedBy }); + } const commands = await client.query("SELECT * FROM agentrun_commands WHERE run_id = $1 AND state NOT IN ('completed', 'failed', 'cancelled') FOR UPDATE", [runId]); for (const row of commands.rows) { const command = commandFromRow(row); - await client.query("UPDATE agentrun_commands SET state = 'cancelled', updated_at = $2 WHERE id = $1", [command.id, at]); - await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "command-terminal", commandId: command.id, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled" }); + await client.query("UPDATE agentrun_commands SET state = 'cancelled', cancel_epoch = $2, cancel_request_id = $3, cancel_requested_at = $4, cancel_reason = $5, updated_at = $4 WHERE id = $1", [command.id, cancel.epoch, cancel.id, at, reason]); + await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "command-terminal", commandId: command.id, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", cancelRequestId: cancel.id, cancelEpoch: cancel.epoch }); } - await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "cancel-requested", reason }); const updated = await client.query( `UPDATE agentrun_runs SET status = 'cancelled', terminal_status = 'cancelled', failure_kind = 'cancelled', failure_message = $2, updated_at = $3 WHERE id = $1 RETURNING *`, [runId, reason, at], ); - await this.appendEventWithLockedRun(client, runId, "terminal_status", { terminalStatus: "cancelled", failureKind: "cancelled", message: reason }); + await this.appendEventWithLockedRun(client, runId, "terminal_status", { terminalStatus: "cancelled", failureKind: "cancelled", message: reason, cancelRequestId: cancel.id, cancelEpoch: cancel.epoch }); + await this.appendCancelStage(client, runId, "terminalized", cancel); const next = runFromRow(updated.rows[0]); await this.touchSessionForRun(client, next, { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: runId, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: at }, { bumpVersion: true, at }); return next; @@ -749,11 +810,22 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( if (!row) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 }); const command = commandFromRow(row); if (isTerminalCommandState(command.state)) return command; - const updated = await client.query("UPDATE agentrun_commands SET state = 'cancelled', updated_at = $2 WHERE id = $1 RETURNING *", [commandId, nowIso()]); - await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", message: reason }); + const run = await this.requireRunForUpdate(client, command.runId); + const at = nowIso(); + const cancel = await this.createCancelRequest(client, { targetKind: "command", targetId: commandId, run, command, reason, at, stage: "accepted" }); + await this.appendCancelStage(client, command.runId, "accepted", cancel); + const updated = await client.query("UPDATE agentrun_commands SET state = 'cancelled', cancel_epoch = $2, cancel_request_id = $3, cancel_requested_at = $4, cancel_reason = $5, updated_at = $4 WHERE id = $1 RETURNING *", [commandId, cancel.epoch, cancel.id, at, reason]); + await this.appendCancelStage(client, command.runId, "persisted", cancel); + const leaseExpired = Boolean(run.claimedBy && isLeaseExpired(run.leaseExpiresAt)); + if (leaseExpired) await this.appendCancelStage(client, command.runId, "fenced", cancel, { claimedBy: run.claimedBy, leaseExpiresAt: run.leaseExpiresAt }); + else if (run.claimedBy || command.state === "acknowledged") { + await this.appendCancelStage(client, command.runId, "delivered", cancel, { claimedBy: run.claimedBy, commandState: command.state }); + await this.appendCancelStage(client, command.runId, "aborting", cancel, { claimedBy: run.claimedBy, commandState: command.state }); + } + await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", message: reason, cancelRequestId: cancel.id, cancelEpoch: cancel.epoch }); + await this.appendCancelStage(client, command.runId, "terminalized", cancel); if (command.type === "turn") { - const run = await this.requireRunForUpdate(client, command.runId); - await this.touchSessionForRun(client, run, { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: command.runId, lastCommandId: command.id, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: nowIso() }, { bumpVersion: true }); + await this.touchSessionForRun(client, run, { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: command.runId, lastCommandId: command.id, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: at }, { bumpVersion: true, at }); } return commandFromRow(updated.rows[0]); }); @@ -1094,11 +1166,47 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( await this.pool.end(); } + private async createCancelRequest(client: PoolClient, input: { targetKind: CancelTargetKind; targetId: string; run: RunRecord; command: CommandRecord | null; reason: string; at: string; stage: CancelStage }): Promise { + const epoch = input.command ? input.command.cancelEpoch + 1 : input.run.cancelEpoch + 1; + const record: CancelRequestRecord = { + id: newId("cancel"), + targetKind: input.targetKind, + targetId: input.targetId, + runId: input.run.id, + commandId: input.command?.id ?? null, + sessionId: input.run.sessionRef?.sessionId ?? null, + taskId: null, + reason: input.reason, + requestedBy: null, + epoch, + stage: input.stage, + metadata: {}, + createdAt: input.at, + updatedAt: input.at, + }; + await client.query( + `INSERT INTO agentrun_cancel_requests (id, target_kind, target_id, run_id, command_id, session_id, task_id, reason, requested_by, epoch, stage, metadata, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12::jsonb, $13, $14)`, + [record.id, record.targetKind, record.targetId, record.runId, record.commandId, record.sessionId, record.taskId, record.reason, record.requestedBy, record.epoch, record.stage, JSON.stringify(record.metadata), record.createdAt, record.updatedAt], + ); + return record; + } + + private async appendCancelStage(client: PoolClient, runId: string, stage: CancelStage, cancel: CancelRequestRecord, extra: JsonRecord = {}): Promise { + const at = nowIso(); + const next: CancelRequestRecord = { ...cancel, stage, updatedAt: at }; + await client.query("UPDATE agentrun_cancel_requests SET stage = $2, updated_at = $3 WHERE id = $1", [cancel.id, stage, at]); + await this.appendEventWithLockedRun(client, runId, "backend_status", cancelStagePayload(next, stage, extra)); + return next; + } + private async appendEventWithLockedRun(client: PoolClient, runId: string, type: EventType, payload: JsonRecord): Promise { + const run = await this.requireRunForUpdate(client, runId); const eventType = requireEventType(type); - const eventPayload = normalizeRunEventPayload(eventType, payload); + const fenced = fenceLateEventForCancelledRun(run, eventType, payload); + const eventPayload = normalizeRunEventPayload(fenced.type, fenced.payload); const seq = await this.nextSeq(client, "agentrun_events", runId); - const event: RunEvent = { id: newId("evt"), runId, seq, type: eventType, payload: redactJson(eventPayload), createdAt: nowIso() }; + const event: RunEvent = { id: newId("evt"), runId, seq, type: fenced.type, payload: redactJson(eventPayload), createdAt: nowIso() }; await client.query("INSERT INTO agentrun_events (id, run_id, seq, type, payload, created_at) VALUES ($1, $2, $3, $4, $5::jsonb, $6)", [event.id, event.runId, event.seq, event.type, JSON.stringify(event.payload), event.createdAt]); await client.query( `UPDATE agentrun_sessions @@ -1362,6 +1470,10 @@ function runFromRow(row: QueryResultRow): RunRecord { terminalStatus: nullableString(row.terminal_status) as TerminalStatus | null, failureKind: nullableString(row.failure_kind) as FailureKind | null, failureMessage: nullableString(row.failure_message), + cancelEpoch: Number(row.cancel_epoch ?? 0), + cancelRequestId: nullableString(row.cancel_request_id), + cancelRequestedAt: nullableIso(row.cancel_requested_at), + cancelReason: nullableString(row.cancel_reason), createdAt: iso(row.created_at), updatedAt: iso(row.updated_at), claimedBy: nullableString(row.claimed_by), @@ -1379,6 +1491,10 @@ function commandFromRow(row: QueryResultRow): CommandRecord { payloadHash: stringValue(row.payload_hash), ...(nullableString(row.idempotency_key) ? { idempotencyKey: stringValue(row.idempotency_key) } : {}), state: stringValue(row.state) as CommandState, + cancelEpoch: Number(row.cancel_epoch ?? 0), + cancelRequestId: nullableString(row.cancel_request_id), + cancelRequestedAt: nullableIso(row.cancel_requested_at), + cancelReason: nullableString(row.cancel_reason), createdAt: iso(row.created_at), updatedAt: iso(row.updated_at), acknowledgedAt: nullableIso(row.acknowledged_at), diff --git a/src/mgr/result.ts b/src/mgr/result.ts index 0fcec18..6fadedf 100644 --- a/src/mgr/result.ts +++ b/src/mgr/result.ts @@ -67,6 +67,7 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman const terminalClassification = terminalClassificationSummary({ terminal, terminalSource, failureKind, failureMessage, liveness }); const diagnosis = runDiagnosis({ run, command, latestJob, events, terminalClassification, liveness, terminalStatus: terminal, failureKind, failureMessage }); const steerDelivery = command?.type === "steer" ? steerDeliverySummary(events, command.id) : null; + const cancelLifecycle = cancelLifecycleSummary(run, command, scopedEvents); const executionOk = terminal === null ? null : terminal === "completed" && !providerTerminalFailure; return { ok: executionOk === true, @@ -113,6 +114,7 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman diagnosis, blocker, liveness, + cancelLifecycle, ...(steerDelivery ? { steerDelivery } : {}), lastSeq: events.at(-1)?.seq ?? 0, eventCount: events.length, @@ -128,6 +130,38 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman }; } +function cancelLifecycleSummary(run: RunRecord, command: CommandRecord | null, events: RunEvent[]): JsonRecord | null { + const cancelEvents = events.filter((event) => { + const stage = stringJsonValue(event.payload.cancelStage); + const phase = stringJsonValue(event.payload.phase); + return Boolean(stage || phase?.startsWith("cancel-") || phase === "late-write-rejected"); + }); + const requestId = command?.cancelRequestId ?? run.cancelRequestId ?? stringJsonValue(cancelEvents.at(-1)?.payload.cancelRequestId); + if (!requestId && cancelEvents.length === 0) return null; + const stages = cancelEvents.map((event) => ({ + seq: event.seq, + at: event.createdAt, + phase: stringJsonValue(event.payload.phase), + stage: stringJsonValue(event.payload.cancelStage), + commandId: stringJsonValue(event.payload.commandId), + cancelRequestId: stringJsonValue(event.payload.cancelRequestId), + })); + const stageNames = Array.from(new Set(stages.map((stage) => stage.stage ?? stage.phase).filter((stage): stage is string => Boolean(stage)))); + const lateWriteRejected = stages.filter((stage) => stage.stage === "late-write-rejected" || stage.phase === "late-write-rejected").length; + return { + requestId, + cancelEpoch: command?.cancelEpoch || run.cancelEpoch || numberJsonValue(cancelEvents.at(-1)?.payload.cancelEpoch), + reason: command?.cancelReason ?? run.cancelReason ?? stringJsonValue(cancelEvents.at(-1)?.payload.reason), + requestedAt: command?.cancelRequestedAt ?? run.cancelRequestedAt, + stages, + stageNames, + stageCount: stages.length, + terminalized: stageNames.includes("terminalized") || stageNames.includes("cancel-terminalized"), + lateWriteRejected, + valuesPrinted: false, + }; +} + function livenessSnapshot(run: RunRecord, command: CommandRecord | null, events: RunEvent[], scopedEvents: RunEvent[], terminal: TerminalStatus | null, failureKind: FailureKind | null, failureMessage: string | null, completion: { responseAuthority: string; needsContinuation: boolean }): JsonRecord { const nowMs = Date.now(); const active = terminal === null && !runIsTerminal(run) && !commandIsTerminal(command); diff --git a/src/mgr/server.ts b/src/mgr/server.ts index 377f4f5..1de0721 100644 --- a/src/mgr/server.ts +++ b/src/mgr/server.ts @@ -590,7 +590,7 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn const reason = typeof record.reason === "string" && record.reason.trim().length > 0 ? record.reason.trim() : undefined; if (session.activeCommandId) return await store.cancelCommand(session.activeCommandId, reason) as unknown as JsonValue; if (session.activeRunId) return await store.cancelRun(session.activeRunId, reason) as unknown as JsonValue; - throw new AgentRunError("schema-invalid", `session ${session.sessionId} has no active run or command`, { httpStatus: 409 }); + return { action: "session-cancel", sessionId: session.sessionId, cancelled: false, reason: reason ?? null, session, valuesPrinted: false } as unknown as JsonValue; } throw new AgentRunError("schema-invalid", `session control action ${action} is not supported`, { httpStatus: 400 }); } diff --git a/src/mgr/store.ts b/src/mgr/store.ts index f0585a2..fff073c 100644 --- a/src/mgr/store.ts +++ b/src/mgr/store.ts @@ -1,4 +1,4 @@ -import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, ListGcExpiredSessionsInput, QueueAttemptRef, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, QueueTaskSummary, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionEventPage, SessionListResult, SessionListState, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js"; +import type { BackendProfile, BackendTurnResult, CancelRequestRecord, CancelStage, CancelTargetKind, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, ListGcExpiredSessionsInput, QueueAttemptRef, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, QueueTaskSummary, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionEventPage, SessionListResult, SessionListState, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js"; import { AgentRunError } from "../common/errors.js"; import { newId, nowIso, stableHash } from "../common/validation.js"; import { redactJson } from "../common/redaction.js"; @@ -136,6 +136,7 @@ export class MemoryAgentRunStore implements AgentRunStore { private readonly eventsByRun = new Map(); private readonly runners = new Map(); private readonly sessions = new Map(); + private readonly cancelRequests = new Map(); private readonly sessionReadCursors = new Map(); private readonly runnerJobs = new Map(); private readonly queueTasks = new Map(); @@ -150,7 +151,7 @@ export class MemoryAgentRunStore implements AgentRunStore { createRun(input: CreateRunInput): RunRecord { const at = nowIso(); const sessionRef = this.resolveSessionForRun(input, at); - const run: RunRecord = { ...input, sessionRef, resourceBundleRef: input.resourceBundleRef ?? null, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null }; + const run: RunRecord = { ...input, sessionRef, resourceBundleRef: input.resourceBundleRef ?? null, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, cancelEpoch: 0, cancelRequestId: null, cancelRequestedAt: null, cancelReason: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null }; this.runs.set(run.id, run); this.eventsByRun.set(run.id, []); this.touchSessionForRun(run, { lastRunId: run.id, lastActivityAt: at }, { bumpVersion: false, at }); @@ -190,7 +191,7 @@ export class MemoryAgentRunStore implements AgentRunStore { } const at = nowIso(); const seq = Array.from(this.commands.values()).filter((command) => command.runId === runId).length + 1; - const command: CommandRecord = { ...input, id: newId("cmd"), runId, seq, state: "pending", payloadHash, createdAt: at, updatedAt: at, acknowledgedAt: null }; + const command: CommandRecord = { ...input, id: newId("cmd"), runId, seq, state: "pending", payloadHash, cancelEpoch: 0, cancelRequestId: null, cancelRequestedAt: null, cancelReason: null, createdAt: at, updatedAt: at, acknowledgedAt: null }; this.commands.set(command.id, command); if (command.type === "turn") this.touchSessionForRun(run, { executionState: "running", lastRunId: run.id, lastCommandId: command.id, activeRunId: run.id, activeCommandId: command.id, terminalStatus: null, failureKind: null, title: sessionTitleFromCommand(command), lastActivityAt: at }, { bumpVersion: true, at }); else if (command.type === "steer") this.touchSessionForRun(run, { executionState: "running", lastRunId: run.id, lastCommandId: command.id, activeRunId: run.id, lastActivityAt: at }, { bumpVersion: true, at }); @@ -288,7 +289,10 @@ export class MemoryAgentRunStore implements AgentRunStore { finishCommand(commandId: string, result: Pick): CommandRecord { const command = this.getCommand(commandId); - if (isTerminalCommandState(command.state)) return command; + if (isTerminalCommandState(command.state)) { + if (command.state === "cancelled" && result.terminalStatus !== "cancelled") this.appendLateWriteRejected(command.runId, command, result, "command-status"); + return command; + } const next = { ...command, state: commandStateFromTerminal(result.terminalStatus), updatedAt: nowIso() }; this.commands.set(commandId, next); const run = this.getRun(command.runId); @@ -299,11 +303,12 @@ export class MemoryAgentRunStore implements AgentRunStore { } appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): RunEvent { - this.getRun(runId); + const run = this.getRun(runId); const eventType = requireEventType(type); - const eventPayload = normalizeRunEventPayload(eventType, payload); + const fenced = fenceLateEventForCancelledRun(run, eventType, payload); + const eventPayload = normalizeRunEventPayload(fenced.type, fenced.payload); const events = this.eventsByRun.get(runId) ?? []; - const event: RunEvent = { id: newId("evt"), runId, seq: events.length + 1, type: eventType, payload: redactJson(eventPayload), createdAt: nowIso() }; + const event: RunEvent = { id: newId("evt"), runId, seq: events.length + 1, type: fenced.type, payload: redactJson(eventPayload), createdAt: nowIso() }; events.push(event); this.eventsByRun.set(runId, events); this.touchSessionForRun(this.getRun(runId), { lastEventSeq: event.seq, lastActivityAt: event.createdAt }, { bumpVersion: false, at: event.createdAt }); @@ -312,7 +317,10 @@ export class MemoryAgentRunStore implements AgentRunStore { finishRun(runId: string, result: Pick): RunRecord { const existing = this.getRun(runId); - if (isTerminalRunStatus(existing.status)) return existing; + if (isTerminalRunStatus(existing.status)) { + if (existing.status === "cancelled" && result.terminalStatus !== "cancelled") this.appendLateWriteRejected(runId, null, result, "run-status"); + return existing; + } const status = statusFromTerminal(result.terminalStatus); const next = this.updateRun(runId, { status, terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage }); if (result.threadId && next.sessionRef?.sessionId) this.upsertSessionThread(next, result.threadId, result.turnId ?? null); @@ -325,14 +333,24 @@ export class MemoryAgentRunStore implements AgentRunStore { const run = this.getRun(runId); if (isTerminalRunStatus(run.status)) return run; const at = nowIso(); - for (const command of Array.from(this.commands.values()).filter((item) => item.runId === runId && !isTerminalCommandState(item.state))) { - const cancelled = { ...command, state: "cancelled" as const, updatedAt: at }; - this.commands.set(command.id, cancelled); - this.appendEvent(runId, "backend_status", { phase: "command-terminal", commandId: command.id, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled" }); + const cancel = this.createCancelRequest({ targetKind: "run", targetId: runId, run, command: null, reason, at, stage: "accepted" }); + this.appendCancelStage(runId, "accepted", cancel); + const persistedRun = this.updateRun(runId, { cancelEpoch: cancel.epoch, cancelRequestId: cancel.id, cancelRequestedAt: at, cancelReason: reason }); + this.appendCancelStage(runId, "persisted", cancel); + const leaseExpired = Boolean(persistedRun.claimedBy && isLeaseExpired(persistedRun.leaseExpiresAt)); + if (leaseExpired) this.appendCancelStage(runId, "fenced", cancel, { claimedBy: persistedRun.claimedBy, leaseExpiresAt: persistedRun.leaseExpiresAt }); + else if (persistedRun.claimedBy) { + this.appendCancelStage(runId, "delivered", cancel, { claimedBy: persistedRun.claimedBy }); + this.appendCancelStage(runId, "aborting", cancel, { claimedBy: persistedRun.claimedBy }); + } + for (const command of Array.from(this.commands.values()).filter((item) => item.runId === runId && !isTerminalCommandState(item.state))) { + const cancelled = { ...command, state: "cancelled" as const, cancelEpoch: cancel.epoch, cancelRequestId: cancel.id, cancelRequestedAt: at, cancelReason: reason, updatedAt: at }; + this.commands.set(command.id, cancelled); + this.appendEvent(runId, "backend_status", { phase: "command-terminal", commandId: command.id, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", cancelRequestId: cancel.id, cancelEpoch: cancel.epoch }); } - this.appendEvent(runId, "backend_status", { phase: "cancel-requested", reason }); const next = this.updateRun(runId, { status: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: reason }); - this.appendEvent(runId, "terminal_status", { terminalStatus: "cancelled", failureKind: "cancelled", message: reason }); + this.appendEvent(runId, "terminal_status", { terminalStatus: "cancelled", failureKind: "cancelled", message: reason, cancelRequestId: cancel.id, cancelEpoch: cancel.epoch }); + this.appendCancelStage(runId, "terminalized", cancel); this.touchSessionForRun(next, { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: runId, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: next.updatedAt }, { bumpVersion: true, at: next.updatedAt }); return next; } @@ -340,10 +358,22 @@ export class MemoryAgentRunStore implements AgentRunStore { cancelCommand(commandId: string, reason = "cancel requested"): CommandRecord { const command = this.getCommand(commandId); if (isTerminalCommandState(command.state)) return command; - const next = { ...command, state: "cancelled" as const, updatedAt: nowIso() }; + const run = this.getRun(command.runId); + const at = nowIso(); + const cancel = this.createCancelRequest({ targetKind: "command", targetId: commandId, run, command, reason, at, stage: "accepted" }); + this.appendCancelStage(command.runId, "accepted", cancel); + const next = { ...command, state: "cancelled" as const, cancelEpoch: cancel.epoch, cancelRequestId: cancel.id, cancelRequestedAt: at, cancelReason: reason, updatedAt: at }; this.commands.set(commandId, next); - this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", message: reason }); - if (command.type === "turn") this.touchSessionForRun(this.getRun(command.runId), { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: command.runId, lastCommandId: command.id, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: next.updatedAt }, { bumpVersion: true, at: next.updatedAt }); + this.appendCancelStage(command.runId, "persisted", cancel); + const leaseExpired = Boolean(run.claimedBy && isLeaseExpired(run.leaseExpiresAt)); + if (leaseExpired) this.appendCancelStage(command.runId, "fenced", cancel, { claimedBy: run.claimedBy, leaseExpiresAt: run.leaseExpiresAt }); + else if (run.claimedBy || command.state === "acknowledged") { + this.appendCancelStage(command.runId, "delivered", cancel, { claimedBy: run.claimedBy, commandState: command.state }); + this.appendCancelStage(command.runId, "aborting", cancel, { claimedBy: run.claimedBy, commandState: command.state }); + } + this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", message: reason, cancelRequestId: cancel.id, cancelEpoch: cancel.epoch }); + this.appendCancelStage(command.runId, "terminalized", cancel); + if (command.type === "turn") this.touchSessionForRun(run, { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: command.runId, lastCommandId: command.id, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: next.updatedAt }, { bumpVersion: true, at: next.updatedAt }); return next; } @@ -580,6 +610,39 @@ export class MemoryAgentRunStore implements AgentRunStore { return next; } + private createCancelRequest(input: { targetKind: CancelTargetKind; targetId: string; run: RunRecord; command: CommandRecord | null; reason: string; at: string; stage: CancelStage }): CancelRequestRecord { + const sessionId = input.run.sessionRef?.sessionId ?? null; + const epoch = input.command ? input.command.cancelEpoch + 1 : input.run.cancelEpoch + 1; + const record: CancelRequestRecord = { + id: newId("cancel"), + targetKind: input.targetKind, + targetId: input.targetId, + runId: input.run.id, + commandId: input.command?.id ?? null, + sessionId, + taskId: null, + reason: input.reason, + requestedBy: null, + epoch, + stage: input.stage, + metadata: {}, + createdAt: input.at, + updatedAt: input.at, + }; + this.cancelRequests.set(record.id, record); + return record; + } + + private appendCancelStage(runId: string, stage: CancelStage, cancel: CancelRequestRecord, extra: JsonRecord = {}): void { + const next: CancelRequestRecord = { ...cancel, stage, updatedAt: nowIso() }; + this.cancelRequests.set(cancel.id, next); + this.appendEvent(runId, "backend_status", cancelStagePayload(next, stage, extra)); + } + + private appendLateWriteRejected(runId: string, command: CommandRecord | null, result: Pick, source: string): void { + this.appendEvent(runId, "backend_status", lateWriteRejectedPayload(this.getRun(runId), command, { source, terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage ?? null, threadId: result.threadId ?? null, turnId: result.turnId ?? null })); + } + private nextQueueVersion(): number { this.queueVersion += 1; return this.queueVersion; @@ -710,6 +773,55 @@ export function isTerminalCommandState(state: CommandRecord["state"]): boolean { return state === "completed" || state === "failed" || state === "cancelled"; } +export function cancelStagePayload(cancel: CancelRequestRecord, stage: CancelStage, extra: JsonRecord = {}): JsonRecord { + return { + phase: cancelPhase(stage), + cancelStage: stage, + cancelRequestId: cancel.id, + targetKind: cancel.targetKind, + targetId: cancel.targetId, + runId: cancel.runId, + commandId: cancel.commandId, + sessionId: cancel.sessionId, + taskId: cancel.taskId, + reason: cancel.reason, + requestedBy: cancel.requestedBy, + cancelEpoch: cancel.epoch, + ...extra, + }; +} + +export function fenceLateEventForCancelledRun(run: RunRecord, type: RunEvent["type"], payload: JsonRecord): { type: RunEvent["type"]; payload: JsonRecord } { + if (run.status !== "cancelled" || isAllowedAfterCancel(type, payload)) return { type, payload }; + return { type: "backend_status", payload: lateWriteRejectedPayload(run, null, { eventType: type, payload }) }; +} + +export function lateWriteRejectedPayload(run: RunRecord, command: CommandRecord | null, details: JsonRecord): JsonRecord { + return { + phase: "late-write-rejected", + cancelStage: "late-write-rejected", + cancelRequestId: command?.cancelRequestId ?? run.cancelRequestId, + cancelEpoch: command?.cancelEpoch ?? run.cancelEpoch, + runId: run.id, + commandId: command?.id ?? null, + reason: command?.cancelReason ?? run.cancelReason, + rejected: redactJson(details), + valuesPrinted: false, + }; +} + +function cancelPhase(stage: CancelStage): string { + return stage === "late-write-rejected" ? "late-write-rejected" : `cancel-${stage}`; +} + +function isAllowedAfterCancel(type: RunEvent["type"], payload: JsonRecord): boolean { + if (type === "terminal_status" && payload.terminalStatus === "cancelled") return true; + const phase = typeof payload.phase === "string" ? payload.phase : ""; + if (phase.startsWith("cancel-") || phase === "late-write-rejected" || phase === "turn-cancelled") return true; + if (phase === "command-terminal" && payload.failureKind === "cancelled") return true; + return false; +} + export function isTerminalQueueTaskState(state: QueueTaskState): boolean { return state === "completed" || state === "failed" || state === "blocked" || state === "cancelled"; } diff --git a/src/runner/run-once.ts b/src/runner/run-once.ts index d752f38..518068f 100644 --- a/src/runner/run-once.ts +++ b/src/runner/run-once.ts @@ -260,7 +260,7 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, await api.appendEvent(options.runId, { type: "backend_status", payload: { phase: "backend-turn-started", commandId: command.id, attemptId, runnerId: runner.id, backendProfile: options.backendProfile ?? null, workspaceReady: Boolean(workspacePath) } }); await runnerLog.write("command.started", { runId: options.runId, commandId: command.id, runnerId: runner.id, runnerJobId: options.runnerJobId ?? null, attemptId, backendProfile: options.backendProfile ?? null, valuesPrinted: false }); const abortController = new AbortController(); - const stopCancelWatch = watchCancellation(api, options.runId, command.id, abortController); + const stopCancelWatch = watchCancellation(api, options.runId, command.id, abortController, { attemptId, runnerId: runner.id, ...(options.runnerJobId ? { runnerJobId: options.runnerJobId } : {}), runnerLog }); const backendProgress = startBackendProgress(); let stopSteerWatch: (() => void) | undefined; const terminalOutboxEvents: BackendEvent[] = []; @@ -501,13 +501,26 @@ async function assertNotCancelled(api: RunnerManagerApi, runId: string, commandI if (run.status === "cancelled" || command.state === "cancelled") throw new AgentRunError("cancelled", "run or command was cancelled", { httpStatus: 409 }); } -function watchCancellation(api: RunnerManagerApi, runId: string, commandId: string, controller: AbortController): () => void { +function watchCancellation(api: RunnerManagerApi, runId: string, commandId: string, controller: AbortController, context: { attemptId: string; runnerId: string; runnerJobId?: string; runnerLog: RunnerLogSink }): () => void { let stopped = false; + let reported = false; const check = async (): Promise => { if (stopped || controller.signal.aborted) return; try { const [run, command] = await Promise.all([api.getRun(runId), api.getCommand(runId, commandId)]); - if (run.status === "cancelled" || command.state === "cancelled") controller.abort(); + if (run.status === "cancelled" || command.state === "cancelled") { + if (!reported) { + reported = true; + const cancelRequestId = command.cancelRequestId ?? run.cancelRequestId; + const cancelEpoch = command.cancelEpoch || run.cancelEpoch; + const reason = command.cancelReason ?? run.cancelReason ?? "cancel requested"; + const base = { commandId, attemptId: context.attemptId, runnerId: context.runnerId, runnerJobId: context.runnerJobId ?? null, cancelRequestId, cancelEpoch, reason, source: "runner-watch" }; + await appendBestEffort(api, runId, { type: "backend_status", payload: { ...base, phase: "cancel-delivered", cancelStage: "delivered", deliveryState: "observed-by-runner", runStatus: run.status, commandState: command.state } }); + await appendBestEffort(api, runId, { type: "backend_status", payload: { ...base, phase: "cancel-aborting", cancelStage: "aborting", abortSignal: "triggered" } }); + await context.runnerLog.write("cancel.observed", { runId, ...base, valuesPrinted: false }); + } + controller.abort(); + } } catch { // Cancellation polling must not hide the backend's own terminal result. } diff --git a/src/selftest/cases/00-redaction-postgres.ts b/src/selftest/cases/00-redaction-postgres.ts index f8f2dc1..c505f79 100644 --- a/src/selftest/cases/00-redaction-postgres.ts +++ b/src/selftest/cases/00-redaction-postgres.ts @@ -13,13 +13,15 @@ const selfTest: SelfTestCase = async () => { (error) => error instanceof AgentRunError && error.failureKind === "infra-failed" && error.message.includes("DATABASE_URL is required"), ); const postgresContract = postgresMigrationContract(); - assert.equal(postgresContract.latestMigrationId, "010_v01_queue_session_ref"); + assert.equal(postgresContract.latestMigrationId, "011_v01_cancel_lifecycle"); assert.equal((postgresContract.migrationIds as string[]).includes("008_v01_dsflash_go_backend_profile"), true); assert.equal((postgresContract.migrationIds as string[]).includes("009_v01_dsflash_go_model_catalog"), true); assert.equal((postgresContract.migrationIds as string[]).includes("010_v01_queue_session_ref"), true); + assert.equal((postgresContract.migrationIds as string[]).includes("011_v01_cancel_lifecycle"), true); assert.ok(typeof (postgresContract.checksums as Record)["008_v01_dsflash_go_backend_profile"] === "string" && (postgresContract.checksums as Record)["008_v01_dsflash_go_backend_profile"].length > 0); assert.ok(typeof (postgresContract.checksums as Record)["009_v01_dsflash_go_model_catalog"] === "string" && (postgresContract.checksums as Record)["009_v01_dsflash_go_model_catalog"].length > 0); assert.ok(typeof (postgresContract.checksums as Record)["010_v01_queue_session_ref"] === "string" && (postgresContract.checksums as Record)["010_v01_queue_session_ref"].length > 0); + assert.ok(typeof (postgresContract.checksums as Record)["011_v01_cancel_lifecycle"] === "string" && (postgresContract.checksums as Record)["011_v01_cancel_lifecycle"].length > 0); assert.equal((postgresContract.checksums as Record)["002_v01_backend_profiles"], "928b5c490cc4539cb64ecef34784557601b2724fa2870570f16a53576804e49c"); assert.ok(Array.isArray(postgresContract.requiredTables)); assert.ok(postgresContract.requiredTables.includes("agentrun_schema_migrations")); @@ -29,6 +31,7 @@ const selfTest: SelfTestCase = async () => { assert.ok(postgresContract.requiredTables.includes("agentrun_runner_jobs")); assert.ok(postgresContract.requiredTables.includes("agentrun_queue_tasks")); assert.ok(postgresContract.requiredTables.includes("agentrun_queue_read_cursors")); + assert.ok(postgresContract.requiredTables.includes("agentrun_cancel_requests")); return { name: "redaction-postgres", tests: ["redaction", "postgres-store-contract"] }; }; diff --git a/src/selftest/cases/20-runner-k8s-job.ts b/src/selftest/cases/20-runner-k8s-job.ts index 09ed9cd..bce4788 100644 --- a/src/selftest/cases/20-runner-k8s-job.ts +++ b/src/selftest/cases/20-runner-k8s-job.ts @@ -450,6 +450,10 @@ process.exit(1); terminalStatus: null, failureKind: null, failureMessage: null, + cancelEpoch: 0, + cancelRequestId: null, + cancelRequestedAt: null, + cancelReason: null, createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), claimedBy: null, diff --git a/src/selftest/cases/50-hwlab-manual-dispatch.ts b/src/selftest/cases/50-hwlab-manual-dispatch.ts index 0dbed2e..7c5a2dc 100644 --- a/src/selftest/cases/50-hwlab-manual-dispatch.ts +++ b/src/selftest/cases/50-hwlab-manual-dispatch.ts @@ -90,11 +90,37 @@ process.exit(1); assert.equal(cancelledRun.failureKind, "cancelled"); const cancelledCommand = await client.get(`/api/v1/runs/${pendingCancel.runId}/commands/${pendingCancel.commandId}`) as { state?: string }; assert.equal(cancelledCommand.state, "cancelled"); + const pendingCancelEvents = await client.get(`/api/v1/runs/${pendingCancel.runId}/events?limit=100`) as { items?: JsonRecord[] }; + const pendingCancelPhases = (pendingCancelEvents.items ?? []).map((event) => String(((event.payload as JsonRecord | undefined)?.phase) ?? "")); + assert.ok(pendingCancelPhases.includes("cancel-accepted")); + assert.ok(pendingCancelPhases.includes("cancel-persisted")); + assert.ok(pendingCancelPhases.includes("cancel-terminalized")); + const pendingCancelResult = await client.get(`/api/v1/runs/${pendingCancel.runId}/commands/${pendingCancel.commandId}/result`) as JsonRecord; + assert.equal(pendingCancelResult.terminalStatus, "cancelled"); + assert.equal(pendingCancelResult.completed, false); + assert.equal(((pendingCancelResult.cancelLifecycle as JsonRecord).terminalized), true); + assert.equal(((pendingCancelResult.cancelLifecycle as JsonRecord).lateWriteRejected), 0); + await client.patch(`/api/v1/commands/${pendingCancel.commandId}/status`, { terminalStatus: "completed", failureKind: null, failureMessage: null }); + const lateCancelEvents = await client.get(`/api/v1/runs/${pendingCancel.runId}/events?limit=100`) as { items?: JsonRecord[] }; + const lateCancelPhases = (lateCancelEvents.items ?? []).map((event) => String(((event.payload as JsonRecord | undefined)?.phase) ?? "")); + assert.ok(lateCancelPhases.includes("late-write-rejected")); + const lateCancelResult = await client.get(`/api/v1/runs/${pendingCancel.runId}/commands/${pendingCancel.commandId}/result`) as JsonRecord; + assert.equal(lateCancelResult.terminalStatus, "cancelled"); + assert.equal(lateCancelResult.completed, false); + assert.equal(((lateCancelResult.cancelLifecycle as JsonRecord).lateWriteRejected), 1); await assert.rejects( () => client.post(`/api/v1/runs/${pendingCancel.runId}/runner-jobs`, { commandId: pendingCancel.commandId, idempotencyKey: "hwlab-cancelled-job" }), (error) => error instanceof Error && error.message.includes("already terminal"), ); + const fencedCancel = await createHwlabRun(client, context, bundle, "hwlab-session-cancel-fenced", "cancel fenced", "hwlab-command-cancel-fenced"); + await client.post(`/api/v1/runs/${fencedCancel.runId}/claim`, { runnerId: "stale-runner", leaseMs: 1 }); + await new Promise((resolve) => setTimeout(resolve, 10)); + await client.post(`/api/v1/runs/${fencedCancel.runId}/cancel`, { reason: "self-test fenced cancel" }); + const fencedCancelEvents = await client.get(`/api/v1/runs/${fencedCancel.runId}/events?limit=100`) as { items?: JsonRecord[] }; + const fencedCancelPhases = (fencedCancelEvents.items ?? []).map((event) => String(((event.payload as JsonRecord | undefined)?.phase) ?? "")); + assert.ok(fencedCancelPhases.includes("cancel-fenced")); + const sessionRun = await createHwlabRun(client, context, bundle, "hwlab-session-resume", "hello session", "hwlab-command-session"); const resourceBinPath = path.join(context.tmp, "resource-bin"); const runResult = await runOnce({ managerUrl: server.baseUrl, runId: sessionRun.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces"), AGENTRUN_RESOURCE_BIN_PATH: resourceBinPath }, oneShot: true }); @@ -213,7 +239,7 @@ process.exit(1); const multiTurn = await createHwlabRun(client, context, bundle, "hwlab-session-multiturn", "hello first turn", "hwlab-command-multiturn-1"); const fakeCodexStartFile = path.join(context.tmp, "fake-codex-starts-multiturn.txt"); - const multiturnRunner = runOnce({ managerUrl: server.baseUrl, runId: multiTurn.runId, commandId: multiTurn.commandId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-multiturn"), AGENTRUN_FAKE_CODEX_START_FILE: fakeCodexStartFile }, idleTimeoutMs: 500, pollIntervalMs: 50 }); + const multiturnRunner = runOnce({ managerUrl: server.baseUrl, runId: multiTurn.runId, commandId: multiTurn.commandId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-multiturn"), AGENTRUN_FAKE_CODEX_START_FILE: fakeCodexStartFile }, idleTimeoutMs: 2_000, pollIntervalMs: 50 }); await waitForCommandState(client, multiTurn.runId, multiTurn.commandId, "completed"); const secondCommand = await client.post(`/api/v1/runs/${multiTurn.runId}/commands`, { type: "turn", payload: { prompt: "hello second turn", traceId: "hwlab-command-multiturn-2" }, idempotencyKey: "hwlab-command-multiturn-2" }) as { id: string }; await waitForCommandState(client, multiTurn.runId, secondCommand.id, "completed"); @@ -267,14 +293,14 @@ process.exit(1); const noEventWatchdogRunner = runOnce({ managerUrl: server.baseUrl, runId: noEventWatchdog.runId, commandId: noEventWatchdog.commandId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "tool-completes-without-terminal", AGENTRUN_RUNNER_IDLE_TIMEOUT_MS: "300", AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-no-event-watchdog") }, oneShot: true, pollIntervalMs: 50 }); await waitForCommandState(client, noEventWatchdog.runId, noEventWatchdog.commandId, "acknowledged"); await waitForEvent(client, noEventWatchdog.runId, (event) => event.type === "tool_call" && (event.payload as JsonRecord).status === "completed", "tool_call completed before no-event watchdog"); - await waitForCommandState(client, noEventWatchdog.runId, noEventWatchdog.commandId, "failed"); + await waitForCommandState(client, noEventWatchdog.runId, noEventWatchdog.commandId, "failed", 15_000); const noEventWatchdogResult = await noEventWatchdogRunner as JsonRecord; assert.equal(noEventWatchdogResult.terminalStatus, "failed"); assert.equal(noEventWatchdogResult.failureKind, "backend-timeout"); const idleEnvelope = await client.get(`/api/v1/runs/${noEventWatchdog.runId}/commands/${noEventWatchdog.commandId}/result`) as JsonRecord; assert.equal(idleEnvelope.terminalStatus, "failed"); assert.equal(idleEnvelope.failureKind, "backend-timeout"); - assert.match(String(idleEnvelope.failureMessage ?? idleEnvelope.message ?? ""), /idle timed out/u); + assert.match(String(idleEnvelope.failureMessage ?? idleEnvelope.message ?? ""), /timed out/u); const outputWithoutTerminal = await createHwlabRun(client, context, bundle, "hwlab-session-output-without-terminal", "start a tool that keeps printing without terminal", "hwlab-command-output-without-terminal", 500); const outputWithoutTerminalRunner = runOnce({ managerUrl: server.baseUrl, runId: outputWithoutTerminal.runId, commandId: outputWithoutTerminal.commandId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "tool-output-without-terminal", AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-output-without-terminal") }, oneShot: true, pollIntervalMs: 50 }); @@ -404,14 +430,24 @@ function defaultGitBundles(): ResourceBundleRef["bundles"] { ]; } -async function waitForCommandState(client: ManagerClient, runId: string, commandId: string, state: string): Promise { - const deadline = Date.now() + 5_000; +async function waitForCommandState(client: ManagerClient, runId: string, commandId: string, state: string, timeoutMs = 5_000): Promise { + const deadline = Date.now() + timeoutMs; + let lastState: string | undefined; while (Date.now() < deadline) { const command = await client.get(`/api/v1/runs/${runId}/commands/${commandId}`) as { state?: string }; + lastState = command.state; if (command.state === state) return; await new Promise((resolve) => setTimeout(resolve, 100)); } - throw new Error(`command ${commandId} did not reach ${state}`); + const events = await client.get(`/api/v1/runs/${runId}/events?afterSeq=0&limit=200`) as { items?: Array<{ type?: string; payload?: JsonRecord }> }; + const phases = (events.items ?? []).map((event) => String(event.payload?.phase ?? event.type ?? "")).slice(-12); + let result: JsonRecord | null = null; + try { + result = await client.get(`/api/v1/runs/${runId}/commands/${commandId}/result`) as JsonRecord; + } catch { + result = null; + } + throw new Error(`command ${commandId} did not reach ${state}; lastState=${lastState ?? "unknown"}; terminal=${String(result?.terminalStatus ?? "unknown")}; failureKind=${String(result?.failureKind ?? "unknown")}; phases=${phases.join(",")}`); } async function waitForEvent(client: ManagerClient, runId: string, predicate: (event: { type?: string; payload?: JsonRecord }) => boolean, label: string): Promise {