fix: 补齐 cancel lifecycle 运行时治理 (#245)
This commit is contained in:
@@ -34,6 +34,8 @@ export type FailureKind =
|
||||
/** Status values a run can hold; the last four coincide with {@link TerminalStatus}. */
export type RunStatus = "pending" | "claimed" | "running" | "completed" | "failed" | "blocked" | "cancelled";

/** State values a single command can hold. */
export type CommandState = "pending" | "acknowledged" | "completed" | "failed" | "cancelled";

/** Final outcomes reported for a run or command. */
export type TerminalStatus = "completed" | "failed" | "blocked" | "cancelled";

/** Kind of entity a cancel request is addressed to. */
export type CancelTargetKind = "task" | "session" | "run" | "command";

/** Stage markers recorded while a cancel request moves through its lifecycle. */
export type CancelStage =
  | "accepted"
  | "persisted"
  | "delivered"
  | "aborting"
  | "terminalized"
  | "fenced"
  | "late-write-rejected";

/** Backend profile identifier (free-form string). */
export type BackendProfile = string;

/** State values a queue task can hold. */
export type QueueTaskState = "pending" | "running" | "completed" | "failed" | "blocked" | "cancelled";

/** Attention marker values for a queue task. */
export type QueueTaskAttentionState = "active" | "unread" | "read";
|
||||
@@ -213,6 +215,10 @@ export interface RunRecord extends CreateRunInput {
|
||||
terminalStatus: TerminalStatus | null;
|
||||
failureKind: FailureKind | null;
|
||||
failureMessage: string | null;
|
||||
cancelEpoch: number;
|
||||
cancelRequestId: string | null;
|
||||
cancelRequestedAt: string | null;
|
||||
cancelReason: string | null;
|
||||
createdAt: string;
|
||||
updatedAt: string;
|
||||
claimedBy: string | null;
|
||||
@@ -233,11 +239,32 @@ export interface CommandRecord extends CreateCommandInput {
|
||||
seq: number;
|
||||
state: CommandState;
|
||||
payloadHash: string;
|
||||
cancelEpoch: number;
|
||||
cancelRequestId: string | null;
|
||||
cancelRequestedAt: string | null;
|
||||
cancelReason: string | null;
|
||||
createdAt: string;
|
||||
updatedAt: string;
|
||||
acknowledgedAt: string | null;
|
||||
}
|
||||
|
||||
/**
 * Durable record of one cancel request and the lifecycle stage it has reached.
 */
export interface CancelRequestRecord extends JsonRecord {
  /** Identifier of the cancel request itself. */
  id: string;
  /** Kind of entity the request targets. */
  targetKind: CancelTargetKind;
  /** Identifier of the targeted entity. */
  targetId: string;
  /** Related run id, or null when none is recorded. */
  runId: string | null;
  /** Related command id, or null when none is recorded. */
  commandId: string | null;
  /** Related session id, or null when none is recorded. */
  sessionId: string | null;
  /** Related task id, or null when none is recorded. */
  taskId: string | null;
  /** Reason text supplied with the request. */
  reason: string;
  /** Requesting identity, or null when not recorded. */
  requestedBy: string | null;
  /** Cancel epoch number associated with this request. */
  epoch: number;
  /** Most recently recorded lifecycle stage. */
  stage: CancelStage;
  /** Additional structured data attached to the request. */
  metadata: JsonRecord;
  /** Creation timestamp string. */
  createdAt: string;
  /** Last-update timestamp string. */
  updatedAt: string;
}
|
||||
|
||||
export type EventType = "backend_status" | "assistant_message" | "tool_call" | "command_output" | "diff" | "error" | "terminal_status";
|
||||
|
||||
export interface RunEvent extends JsonRecord {
|
||||
|
||||
+133
-17
@@ -3,10 +3,10 @@ import { Pool } from "pg";
|
||||
import type { PoolClient, QueryResultRow } from "pg";
|
||||
import { AgentRunError } from "../common/errors.js";
|
||||
import { redactJson } from "../common/redaction.js";
|
||||
import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, ListGcExpiredSessionsInput, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionEventPage, SessionListResult, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js";
|
||||
import type { BackendProfile, BackendTurnResult, CancelRequestRecord, CancelStage, CancelTargetKind, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, ListGcExpiredSessionsInput, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionEventPage, SessionListResult, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js";
|
||||
import { newId, nowIso, stableHash } from "../common/validation.js";
|
||||
import type { AgentRunStore, ListQueueTasksInput, ListSessionsInput, SaveRunnerJobInput, SessionEventPageInput, StoreHealth, UpdateQueueTaskAttemptInput } from "./store.js";
|
||||
import { assertSessionBoundary, buildQueueStats, buildQueueTaskSummary, buildSessionSummary, clampQueueLimit, clampSessionLimit, commandStateFromTerminal, isLeaseExpired, isSessionOutputEvent, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, parseSessionCursor, queueTaskMatchesCommander, queueTaskPayloadHash, queueTaskSort, sessionListFilters, sessionMatchesListState, sessionRefFromRecord, sessionSort, sessionTitleFromCommand, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef, titleFromMetadata } from "./store.js";
|
||||
import { assertSessionBoundary, buildQueueStats, buildQueueTaskSummary, buildSessionSummary, cancelStagePayload, clampQueueLimit, clampSessionLimit, commandStateFromTerminal, fenceLateEventForCancelledRun, isLeaseExpired, isSessionOutputEvent, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, lateWriteRejectedPayload, parseQueueCursor, parseSessionCursor, queueTaskMatchesCommander, queueTaskPayloadHash, queueTaskSort, sessionListFilters, sessionMatchesListState, sessionRefFromRecord, sessionSort, sessionTitleFromCommand, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef, titleFromMetadata } from "./store.js";
|
||||
import { backendCapabilitiesSqlValues, mergeBackendCapability } from "../common/backend-profiles.js";
|
||||
import { normalizeRunEventPayload, requireEventType } from "../common/events.js";
|
||||
|
||||
@@ -335,6 +335,39 @@ ALTER TABLE agentrun_queue_tasks
|
||||
ADD COLUMN IF NOT EXISTS session_ref jsonb;
|
||||
`;
|
||||
|
||||
// Cancel-lifecycle schema migration: adds cancel bookkeeping columns to runs and
// commands and creates the agentrun_cancel_requests table with its lookup indexes.
// Every statement uses IF NOT EXISTS, so re-running the script is a no-op.
// NOTE: the migration checksum is computed from this exact text (checksumSql), so
// any whitespace change here changes the registered checksum.
const cancelLifecycleMigrationSql = `
ALTER TABLE agentrun_runs ADD COLUMN IF NOT EXISTS cancel_epoch integer NOT NULL DEFAULT 0;
ALTER TABLE agentrun_runs ADD COLUMN IF NOT EXISTS cancel_request_id text;
ALTER TABLE agentrun_runs ADD COLUMN IF NOT EXISTS cancel_requested_at timestamptz;
ALTER TABLE agentrun_runs ADD COLUMN IF NOT EXISTS cancel_reason text;

ALTER TABLE agentrun_commands ADD COLUMN IF NOT EXISTS cancel_epoch integer NOT NULL DEFAULT 0;
ALTER TABLE agentrun_commands ADD COLUMN IF NOT EXISTS cancel_request_id text;
ALTER TABLE agentrun_commands ADD COLUMN IF NOT EXISTS cancel_requested_at timestamptz;
ALTER TABLE agentrun_commands ADD COLUMN IF NOT EXISTS cancel_reason text;

CREATE TABLE IF NOT EXISTS agentrun_cancel_requests (
id text PRIMARY KEY,
target_kind text NOT NULL,
target_id text NOT NULL,
run_id text REFERENCES agentrun_runs(id) ON DELETE CASCADE,
command_id text REFERENCES agentrun_commands(id) ON DELETE SET NULL,
session_id text,
task_id text,
reason text NOT NULL,
requested_by text,
epoch integer NOT NULL,
stage text NOT NULL,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
created_at timestamptz NOT NULL,
updated_at timestamptz NOT NULL
);

CREATE INDEX IF NOT EXISTS agentrun_cancel_requests_target_idx ON agentrun_cancel_requests (target_kind, target_id, created_at DESC);
CREATE INDEX IF NOT EXISTS agentrun_cancel_requests_run_idx ON agentrun_cancel_requests (run_id, created_at DESC);
CREATE INDEX IF NOT EXISTS agentrun_cancel_requests_command_idx ON agentrun_cancel_requests (command_id, created_at DESC);
`;
|
||||
|
||||
const postgresMigrations: MigrationDefinition[] = [
|
||||
{
|
||||
id: "001_v01_initial_durable_store",
|
||||
@@ -386,13 +419,18 @@ const postgresMigrations: MigrationDefinition[] = [
|
||||
checksum: checksumSql(queueSessionRefMigrationSql),
|
||||
sql: queueSessionRefMigrationSql,
|
||||
},
|
||||
{
|
||||
id: "011_v01_cancel_lifecycle",
|
||||
checksum: checksumSql(cancelLifecycleMigrationSql),
|
||||
sql: cancelLifecycleMigrationSql,
|
||||
},
|
||||
];
|
||||
|
||||
/**
 * Summarizes the Postgres migration set as a JSON record.
 *
 * @returns The ordered migration ids, the latest migration id, the list of
 *   tables the schema is expected to contain, and a map of migration id to
 *   checksum.
 */
export function postgresMigrationContract(): JsonRecord {
  return {
    migrationIds: postgresMigrations.map((migration) => migration.id),
    latestMigrationId: latestMigrationId(),
    // Single authoritative table list; it includes agentrun_cancel_requests.
    requiredTables: [
      "agentrun_schema_migrations",
      "agentrun_runs",
      "agentrun_commands",
      "agentrun_events",
      "agentrun_runners",
      "agentrun_backends",
      "agentrun_leases",
      "agentrun_sessions",
      "agentrun_session_read_cursors",
      "agentrun_runner_jobs",
      "agentrun_queue_tasks",
      "agentrun_queue_read_cursors",
      "agentrun_cancel_requests",
    ],
    checksums: Object.fromEntries(postgresMigrations.map((migration) => [migration.id, migration.checksum])),
  };
}
|
||||
@@ -452,7 +490,7 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
||||
const at = nowIso();
|
||||
return this.withTransaction(async (client) => {
|
||||
const sessionRef = await this.resolveSessionForRun(client, input, at);
|
||||
const run: RunRecord = { ...input, sessionRef, resourceBundleRef: input.resourceBundleRef ?? null, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null };
|
||||
const run: RunRecord = { ...input, sessionRef, resourceBundleRef: input.resourceBundleRef ?? null, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, cancelEpoch: 0, cancelRequestId: null, cancelRequestedAt: null, cancelReason: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null };
|
||||
await client.query(
|
||||
`INSERT INTO agentrun_runs (id, tenant_id, project_id, workspace_ref, session_ref, resource_bundle_ref, provider_id, backend_profile, execution_policy, trace_sink, status, terminal_status, failure_kind, failure_message, created_at, updated_at, claimed_by, lease_expires_at)
|
||||
VALUES ($1, $2, $3, $4::jsonb, $5::jsonb, $6::jsonb, $7, $8, $9::jsonb, $10::jsonb, $11, $12, $13, $14, $15, $16, $17, $18)`,
|
||||
@@ -509,7 +547,7 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
||||
}
|
||||
const seq = await this.nextSeq(client, "agentrun_commands", runId);
|
||||
const at = nowIso();
|
||||
const command: CommandRecord = { ...input, id: newId("cmd"), runId, seq, state: "pending", payloadHash, createdAt: at, updatedAt: at, acknowledgedAt: null };
|
||||
const command: CommandRecord = { ...input, id: newId("cmd"), runId, seq, state: "pending", payloadHash, cancelEpoch: 0, cancelRequestId: null, cancelRequestedAt: null, cancelReason: null, createdAt: at, updatedAt: at, acknowledgedAt: null };
|
||||
await client.query(
|
||||
`INSERT INTO agentrun_commands (id, run_id, seq, type, payload, payload_hash, idempotency_key, state, created_at, updated_at, acknowledged_at)
|
||||
VALUES ($1, $2, $3, $4, $5::jsonb, $6, $7, $8, $9, $10, $11)`,
|
||||
@@ -684,7 +722,13 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
||||
const row = existing.rows[0];
|
||||
if (!row) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
|
||||
const command = commandFromRow(row);
|
||||
if (isTerminalCommandState(command.state)) return command;
|
||||
if (isTerminalCommandState(command.state)) {
|
||||
if (command.state === "cancelled" && result.terminalStatus !== "cancelled") {
|
||||
const run = await this.requireRunForUpdate(client, command.runId);
|
||||
await this.appendEventWithLockedRun(client, command.runId, "backend_status", lateWriteRejectedPayload(run, command, { source: "command-status", terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage ?? null, threadId: result.threadId ?? null, turnId: result.turnId ?? null }));
|
||||
}
|
||||
return command;
|
||||
}
|
||||
const state = commandStateFromTerminal(result.terminalStatus);
|
||||
const updated = await client.query("UPDATE agentrun_commands SET state = $2, updated_at = $3 WHERE id = $1 RETURNING *", [commandId, state, nowIso()]);
|
||||
const run = await this.requireRunForUpdate(client, command.runId);
|
||||
@@ -705,7 +749,10 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
||||
async finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): Promise<RunRecord> {
|
||||
return this.withTransaction(async (client) => {
|
||||
const existing = await this.requireRunForUpdate(client, runId);
|
||||
if (isTerminalRunStatus(existing.status)) return existing;
|
||||
if (isTerminalRunStatus(existing.status)) {
|
||||
if (existing.status === "cancelled" && result.terminalStatus !== "cancelled") await this.appendEventWithLockedRun(client, runId, "backend_status", lateWriteRejectedPayload(existing, null, { source: "run-status", terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage ?? null, threadId: result.threadId ?? null, turnId: result.turnId ?? null }));
|
||||
return existing;
|
||||
}
|
||||
const status = statusFromTerminal(result.terminalStatus);
|
||||
const updated = await client.query(
|
||||
`UPDATE agentrun_runs SET status = $2, terminal_status = $3, failure_kind = $4, failure_message = $5, updated_at = $6 WHERE id = $1 RETURNING *`,
|
||||
@@ -724,18 +771,32 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
||||
const run = await this.requireRunForUpdate(client, runId);
|
||||
if (isTerminalRunStatus(run.status)) return run;
|
||||
const at = nowIso();
|
||||
const cancel = await this.createCancelRequest(client, { targetKind: "run", targetId: runId, run, command: null, reason, at, stage: "accepted" });
|
||||
await this.appendCancelStage(client, runId, "accepted", cancel);
|
||||
const persistedRunResult = await client.query(
|
||||
`UPDATE agentrun_runs SET cancel_epoch = $2, cancel_request_id = $3, cancel_requested_at = $4, cancel_reason = $5, updated_at = $4 WHERE id = $1 RETURNING *`,
|
||||
[runId, cancel.epoch, cancel.id, at, reason],
|
||||
);
|
||||
const persistedRun = runFromRow(persistedRunResult.rows[0]);
|
||||
await this.appendCancelStage(client, runId, "persisted", cancel);
|
||||
const leaseExpired = Boolean(persistedRun.claimedBy && isLeaseExpired(persistedRun.leaseExpiresAt));
|
||||
if (leaseExpired) await this.appendCancelStage(client, runId, "fenced", cancel, { claimedBy: persistedRun.claimedBy, leaseExpiresAt: persistedRun.leaseExpiresAt });
|
||||
else if (persistedRun.claimedBy) {
|
||||
await this.appendCancelStage(client, runId, "delivered", cancel, { claimedBy: persistedRun.claimedBy });
|
||||
await this.appendCancelStage(client, runId, "aborting", cancel, { claimedBy: persistedRun.claimedBy });
|
||||
}
|
||||
const commands = await client.query("SELECT * FROM agentrun_commands WHERE run_id = $1 AND state NOT IN ('completed', 'failed', 'cancelled') FOR UPDATE", [runId]);
|
||||
for (const row of commands.rows) {
|
||||
const command = commandFromRow(row);
|
||||
await client.query("UPDATE agentrun_commands SET state = 'cancelled', updated_at = $2 WHERE id = $1", [command.id, at]);
|
||||
await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "command-terminal", commandId: command.id, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled" });
|
||||
await client.query("UPDATE agentrun_commands SET state = 'cancelled', cancel_epoch = $2, cancel_request_id = $3, cancel_requested_at = $4, cancel_reason = $5, updated_at = $4 WHERE id = $1", [command.id, cancel.epoch, cancel.id, at, reason]);
|
||||
await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "command-terminal", commandId: command.id, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", cancelRequestId: cancel.id, cancelEpoch: cancel.epoch });
|
||||
}
|
||||
await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "cancel-requested", reason });
|
||||
const updated = await client.query(
|
||||
`UPDATE agentrun_runs SET status = 'cancelled', terminal_status = 'cancelled', failure_kind = 'cancelled', failure_message = $2, updated_at = $3 WHERE id = $1 RETURNING *`,
|
||||
[runId, reason, at],
|
||||
);
|
||||
await this.appendEventWithLockedRun(client, runId, "terminal_status", { terminalStatus: "cancelled", failureKind: "cancelled", message: reason });
|
||||
await this.appendEventWithLockedRun(client, runId, "terminal_status", { terminalStatus: "cancelled", failureKind: "cancelled", message: reason, cancelRequestId: cancel.id, cancelEpoch: cancel.epoch });
|
||||
await this.appendCancelStage(client, runId, "terminalized", cancel);
|
||||
const next = runFromRow(updated.rows[0]);
|
||||
await this.touchSessionForRun(client, next, { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: runId, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: at }, { bumpVersion: true, at });
|
||||
return next;
|
||||
@@ -749,11 +810,22 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
||||
if (!row) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
|
||||
const command = commandFromRow(row);
|
||||
if (isTerminalCommandState(command.state)) return command;
|
||||
const updated = await client.query("UPDATE agentrun_commands SET state = 'cancelled', updated_at = $2 WHERE id = $1 RETURNING *", [commandId, nowIso()]);
|
||||
await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", message: reason });
|
||||
const run = await this.requireRunForUpdate(client, command.runId);
|
||||
const at = nowIso();
|
||||
const cancel = await this.createCancelRequest(client, { targetKind: "command", targetId: commandId, run, command, reason, at, stage: "accepted" });
|
||||
await this.appendCancelStage(client, command.runId, "accepted", cancel);
|
||||
const updated = await client.query("UPDATE agentrun_commands SET state = 'cancelled', cancel_epoch = $2, cancel_request_id = $3, cancel_requested_at = $4, cancel_reason = $5, updated_at = $4 WHERE id = $1 RETURNING *", [commandId, cancel.epoch, cancel.id, at, reason]);
|
||||
await this.appendCancelStage(client, command.runId, "persisted", cancel);
|
||||
const leaseExpired = Boolean(run.claimedBy && isLeaseExpired(run.leaseExpiresAt));
|
||||
if (leaseExpired) await this.appendCancelStage(client, command.runId, "fenced", cancel, { claimedBy: run.claimedBy, leaseExpiresAt: run.leaseExpiresAt });
|
||||
else if (run.claimedBy || command.state === "acknowledged") {
|
||||
await this.appendCancelStage(client, command.runId, "delivered", cancel, { claimedBy: run.claimedBy, commandState: command.state });
|
||||
await this.appendCancelStage(client, command.runId, "aborting", cancel, { claimedBy: run.claimedBy, commandState: command.state });
|
||||
}
|
||||
await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", message: reason, cancelRequestId: cancel.id, cancelEpoch: cancel.epoch });
|
||||
await this.appendCancelStage(client, command.runId, "terminalized", cancel);
|
||||
if (command.type === "turn") {
|
||||
const run = await this.requireRunForUpdate(client, command.runId);
|
||||
await this.touchSessionForRun(client, run, { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: command.runId, lastCommandId: command.id, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: nowIso() }, { bumpVersion: true });
|
||||
await this.touchSessionForRun(client, run, { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: command.runId, lastCommandId: command.id, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: at }, { bumpVersion: true, at });
|
||||
}
|
||||
return commandFromRow(updated.rows[0]);
|
||||
});
|
||||
@@ -1094,11 +1166,47 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
||||
await this.pool.end();
|
||||
}
|
||||
|
||||
/**
 * Builds a cancel request record and inserts it into agentrun_cancel_requests
 * using the supplied client.
 *
 * The new epoch is one more than the command's current cancelEpoch when a
 * command is given, otherwise one more than the run's. taskId and requestedBy
 * are stored as null and metadata as an empty object.
 *
 * @param client Database client the insert is issued on.
 * @param input Target, owning run, optional command, reason, timestamp and initial stage.
 * @returns The record that was inserted.
 */
private async createCancelRequest(client: PoolClient, input: { targetKind: CancelTargetKind; targetId: string; run: RunRecord; command: CommandRecord | null; reason: string; at: string; stage: CancelStage }): Promise<CancelRequestRecord> {
  const epoch = input.command ? input.command.cancelEpoch + 1 : input.run.cancelEpoch + 1;
  const record: CancelRequestRecord = {
    id: newId("cancel"),
    targetKind: input.targetKind,
    targetId: input.targetId,
    runId: input.run.id,
    commandId: input.command?.id ?? null,
    sessionId: input.run.sessionRef?.sessionId ?? null,
    taskId: null,
    reason: input.reason,
    requestedBy: null,
    epoch,
    stage: input.stage,
    metadata: {},
    createdAt: input.at,
    updatedAt: input.at,
  };
  await client.query(
    `INSERT INTO agentrun_cancel_requests (id, target_kind, target_id, run_id, command_id, session_id, task_id, reason, requested_by, epoch, stage, metadata, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12::jsonb, $13, $14)`,
    [record.id, record.targetKind, record.targetId, record.runId, record.commandId, record.sessionId, record.taskId, record.reason, record.requestedBy, record.epoch, record.stage, JSON.stringify(record.metadata), record.createdAt, record.updatedAt],
  );
  return record;
}
|
||||
|
||||
/**
 * Moves a cancel request to `stage` and records the transition on the run's
 * event stream.
 *
 * Updates stage and updated_at on the agentrun_cancel_requests row, then
 * appends a "backend_status" event whose payload comes from cancelStagePayload.
 *
 * @param client Database client both writes are issued on.
 * @param runId Run whose event stream receives the stage event.
 * @param stage Stage to record.
 * @param cancel Cancel request being advanced; it is not mutated.
 * @param extra Additional fields passed through to cancelStagePayload.
 * @returns A copy of `cancel` carrying the new stage and update timestamp.
 */
private async appendCancelStage(client: PoolClient, runId: string, stage: CancelStage, cancel: CancelRequestRecord, extra: JsonRecord = {}): Promise<CancelRequestRecord> {
  const at = nowIso();
  const next: CancelRequestRecord = { ...cancel, stage, updatedAt: at };
  await client.query("UPDATE agentrun_cancel_requests SET stage = $2, updated_at = $3 WHERE id = $1", [cancel.id, stage, at]);
  await this.appendEventWithLockedRun(client, runId, "backend_status", cancelStagePayload(next, stage, extra));
  return next;
}
|
||||
|
||||
private async appendEventWithLockedRun(client: PoolClient, runId: string, type: EventType, payload: JsonRecord): Promise<RunEvent> {
|
||||
const run = await this.requireRunForUpdate(client, runId);
|
||||
const eventType = requireEventType(type);
|
||||
const eventPayload = normalizeRunEventPayload(eventType, payload);
|
||||
const fenced = fenceLateEventForCancelledRun(run, eventType, payload);
|
||||
const eventPayload = normalizeRunEventPayload(fenced.type, fenced.payload);
|
||||
const seq = await this.nextSeq(client, "agentrun_events", runId);
|
||||
const event: RunEvent = { id: newId("evt"), runId, seq, type: eventType, payload: redactJson(eventPayload), createdAt: nowIso() };
|
||||
const event: RunEvent = { id: newId("evt"), runId, seq, type: fenced.type, payload: redactJson(eventPayload), createdAt: nowIso() };
|
||||
await client.query("INSERT INTO agentrun_events (id, run_id, seq, type, payload, created_at) VALUES ($1, $2, $3, $4, $5::jsonb, $6)", [event.id, event.runId, event.seq, event.type, JSON.stringify(event.payload), event.createdAt]);
|
||||
await client.query(
|
||||
`UPDATE agentrun_sessions
|
||||
@@ -1362,6 +1470,10 @@ function runFromRow(row: QueryResultRow): RunRecord {
|
||||
terminalStatus: nullableString(row.terminal_status) as TerminalStatus | null,
|
||||
failureKind: nullableString(row.failure_kind) as FailureKind | null,
|
||||
failureMessage: nullableString(row.failure_message),
|
||||
cancelEpoch: Number(row.cancel_epoch ?? 0),
|
||||
cancelRequestId: nullableString(row.cancel_request_id),
|
||||
cancelRequestedAt: nullableIso(row.cancel_requested_at),
|
||||
cancelReason: nullableString(row.cancel_reason),
|
||||
createdAt: iso(row.created_at),
|
||||
updatedAt: iso(row.updated_at),
|
||||
claimedBy: nullableString(row.claimed_by),
|
||||
@@ -1379,6 +1491,10 @@ function commandFromRow(row: QueryResultRow): CommandRecord {
|
||||
payloadHash: stringValue(row.payload_hash),
|
||||
...(nullableString(row.idempotency_key) ? { idempotencyKey: stringValue(row.idempotency_key) } : {}),
|
||||
state: stringValue(row.state) as CommandState,
|
||||
cancelEpoch: Number(row.cancel_epoch ?? 0),
|
||||
cancelRequestId: nullableString(row.cancel_request_id),
|
||||
cancelRequestedAt: nullableIso(row.cancel_requested_at),
|
||||
cancelReason: nullableString(row.cancel_reason),
|
||||
createdAt: iso(row.created_at),
|
||||
updatedAt: iso(row.updated_at),
|
||||
acknowledgedAt: nullableIso(row.acknowledged_at),
|
||||
|
||||
@@ -67,6 +67,7 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman
|
||||
const terminalClassification = terminalClassificationSummary({ terminal, terminalSource, failureKind, failureMessage, liveness });
|
||||
const diagnosis = runDiagnosis({ run, command, latestJob, events, terminalClassification, liveness, terminalStatus: terminal, failureKind, failureMessage });
|
||||
const steerDelivery = command?.type === "steer" ? steerDeliverySummary(events, command.id) : null;
|
||||
const cancelLifecycle = cancelLifecycleSummary(run, command, scopedEvents);
|
||||
const executionOk = terminal === null ? null : terminal === "completed" && !providerTerminalFailure;
|
||||
return {
|
||||
ok: executionOk === true,
|
||||
@@ -113,6 +114,7 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman
|
||||
diagnosis,
|
||||
blocker,
|
||||
liveness,
|
||||
cancelLifecycle,
|
||||
...(steerDelivery ? { steerDelivery } : {}),
|
||||
lastSeq: events.at(-1)?.seq ?? 0,
|
||||
eventCount: events.length,
|
||||
@@ -128,6 +130,38 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman
|
||||
};
|
||||
}
|
||||
|
||||
/**
 * Summarizes the cancel lifecycle visible for a run (and optional command).
 *
 * An event counts as cancel-related when its payload has a `cancelStage`, or
 * when its `phase` starts with "cancel-" or equals "late-write-rejected".
 *
 * @param run Run whose cancel fields are used as a fallback.
 * @param command Command whose cancel fields take precedence when present.
 * @param events Events to scan for cancel stages.
 * @returns null when there is neither a cancel request id nor any
 *   cancel-related event; otherwise the request id, epoch, reason, request
 *   time, per-event stage entries, distinct stage names and counters.
 */
function cancelLifecycleSummary(run: RunRecord, command: CommandRecord | null, events: RunEvent[]): JsonRecord | null {
  const cancelEvents = events.filter((event) => {
    const stage = stringJsonValue(event.payload.cancelStage);
    const phase = stringJsonValue(event.payload.phase);
    return Boolean(stage || phase?.startsWith("cancel-") || phase === "late-write-rejected");
  });
  // The most recent cancel event is the last-resort source for id, epoch and reason.
  const lastCancelPayload = cancelEvents.at(-1)?.payload;
  const requestId = command?.cancelRequestId ?? run.cancelRequestId ?? stringJsonValue(lastCancelPayload?.cancelRequestId);
  if (!requestId && cancelEvents.length === 0) return null;

  const stages = cancelEvents.map((event) => ({
    seq: event.seq,
    at: event.createdAt,
    phase: stringJsonValue(event.payload.phase),
    stage: stringJsonValue(event.payload.cancelStage),
    commandId: stringJsonValue(event.payload.commandId),
    cancelRequestId: stringJsonValue(event.payload.cancelRequestId),
  }));
  // Distinct names in first-seen order; the explicit stage wins over the phase.
  const stageNames = Array.from(new Set(stages.map((stage) => stage.stage ?? stage.phase).filter((stage): stage is string => Boolean(stage))));
  const lateWriteRejected = stages.filter((stage) => stage.stage === "late-write-rejected" || stage.phase === "late-write-rejected").length;

  return {
    requestId,
    // `||` (not `??`) is deliberate: an epoch of 0 falls through to the next source.
    cancelEpoch: command?.cancelEpoch || run.cancelEpoch || numberJsonValue(lastCancelPayload?.cancelEpoch),
    reason: command?.cancelReason ?? run.cancelReason ?? stringJsonValue(lastCancelPayload?.reason),
    requestedAt: command?.cancelRequestedAt ?? run.cancelRequestedAt,
    stages,
    stageNames,
    stageCount: stages.length,
    terminalized: stageNames.includes("terminalized") || stageNames.includes("cancel-terminalized"),
    lateWriteRejected,
    valuesPrinted: false,
  };
}
|
||||
|
||||
function livenessSnapshot(run: RunRecord, command: CommandRecord | null, events: RunEvent[], scopedEvents: RunEvent[], terminal: TerminalStatus | null, failureKind: FailureKind | null, failureMessage: string | null, completion: { responseAuthority: string; needsContinuation: boolean }): JsonRecord {
|
||||
const nowMs = Date.now();
|
||||
const active = terminal === null && !runIsTerminal(run) && !commandIsTerminal(command);
|
||||
|
||||
+1
-1
@@ -590,7 +590,7 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn
|
||||
const reason = typeof record.reason === "string" && record.reason.trim().length > 0 ? record.reason.trim() : undefined;
|
||||
if (session.activeCommandId) return await store.cancelCommand(session.activeCommandId, reason) as unknown as JsonValue;
|
||||
if (session.activeRunId) return await store.cancelRun(session.activeRunId, reason) as unknown as JsonValue;
|
||||
throw new AgentRunError("schema-invalid", `session ${session.sessionId} has no active run or command`, { httpStatus: 409 });
|
||||
return { action: "session-cancel", sessionId: session.sessionId, cancelled: false, reason: reason ?? null, session, valuesPrinted: false } as unknown as JsonValue;
|
||||
}
|
||||
throw new AgentRunError("schema-invalid", `session control action ${action} is not supported`, { httpStatus: 400 });
|
||||
}
|
||||
|
||||
+129
-17
@@ -1,4 +1,4 @@
|
||||
import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, ListGcExpiredSessionsInput, QueueAttemptRef, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, QueueTaskSummary, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionEventPage, SessionListResult, SessionListState, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js";
|
||||
import type { BackendProfile, BackendTurnResult, CancelRequestRecord, CancelStage, CancelTargetKind, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, ListGcExpiredSessionsInput, QueueAttemptRef, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, QueueTaskSummary, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionEventPage, SessionListResult, SessionListState, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js";
|
||||
import { AgentRunError } from "../common/errors.js";
|
||||
import { newId, nowIso, stableHash } from "../common/validation.js";
|
||||
import { redactJson } from "../common/redaction.js";
|
||||
@@ -136,6 +136,7 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
||||
private readonly eventsByRun = new Map<string, RunEvent[]>();
|
||||
private readonly runners = new Map<string, RunnerRecord>();
|
||||
private readonly sessions = new Map<string, SessionRecord>();
|
||||
private readonly cancelRequests = new Map<string, CancelRequestRecord>();
|
||||
private readonly sessionReadCursors = new Map<string, SessionReadCursorRecord>();
|
||||
private readonly runnerJobs = new Map<string, RunnerJobRecord>();
|
||||
private readonly queueTasks = new Map<string, QueueTaskRecord>();
|
||||
@@ -150,7 +151,7 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
||||
createRun(input: CreateRunInput): RunRecord {
|
||||
const at = nowIso();
|
||||
const sessionRef = this.resolveSessionForRun(input, at);
|
||||
const run: RunRecord = { ...input, sessionRef, resourceBundleRef: input.resourceBundleRef ?? null, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null };
|
||||
const run: RunRecord = { ...input, sessionRef, resourceBundleRef: input.resourceBundleRef ?? null, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, cancelEpoch: 0, cancelRequestId: null, cancelRequestedAt: null, cancelReason: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null };
|
||||
this.runs.set(run.id, run);
|
||||
this.eventsByRun.set(run.id, []);
|
||||
this.touchSessionForRun(run, { lastRunId: run.id, lastActivityAt: at }, { bumpVersion: false, at });
|
||||
@@ -190,7 +191,7 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
||||
}
|
||||
const at = nowIso();
|
||||
const seq = Array.from(this.commands.values()).filter((command) => command.runId === runId).length + 1;
|
||||
const command: CommandRecord = { ...input, id: newId("cmd"), runId, seq, state: "pending", payloadHash, createdAt: at, updatedAt: at, acknowledgedAt: null };
|
||||
const command: CommandRecord = { ...input, id: newId("cmd"), runId, seq, state: "pending", payloadHash, cancelEpoch: 0, cancelRequestId: null, cancelRequestedAt: null, cancelReason: null, createdAt: at, updatedAt: at, acknowledgedAt: null };
|
||||
this.commands.set(command.id, command);
|
||||
if (command.type === "turn") this.touchSessionForRun(run, { executionState: "running", lastRunId: run.id, lastCommandId: command.id, activeRunId: run.id, activeCommandId: command.id, terminalStatus: null, failureKind: null, title: sessionTitleFromCommand(command), lastActivityAt: at }, { bumpVersion: true, at });
|
||||
else if (command.type === "steer") this.touchSessionForRun(run, { executionState: "running", lastRunId: run.id, lastCommandId: command.id, activeRunId: run.id, lastActivityAt: at }, { bumpVersion: true, at });
|
||||
@@ -288,7 +289,10 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
||||
|
||||
finishCommand(commandId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): CommandRecord {
|
||||
const command = this.getCommand(commandId);
|
||||
if (isTerminalCommandState(command.state)) return command;
|
||||
if (isTerminalCommandState(command.state)) {
|
||||
if (command.state === "cancelled" && result.terminalStatus !== "cancelled") this.appendLateWriteRejected(command.runId, command, result, "command-status");
|
||||
return command;
|
||||
}
|
||||
const next = { ...command, state: commandStateFromTerminal(result.terminalStatus), updatedAt: nowIso() };
|
||||
this.commands.set(commandId, next);
|
||||
const run = this.getRun(command.runId);
|
||||
@@ -299,11 +303,12 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
||||
}
|
||||
|
||||
appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): RunEvent {
|
||||
this.getRun(runId);
|
||||
const run = this.getRun(runId);
|
||||
const eventType = requireEventType(type);
|
||||
const eventPayload = normalizeRunEventPayload(eventType, payload);
|
||||
const fenced = fenceLateEventForCancelledRun(run, eventType, payload);
|
||||
const eventPayload = normalizeRunEventPayload(fenced.type, fenced.payload);
|
||||
const events = this.eventsByRun.get(runId) ?? [];
|
||||
const event: RunEvent = { id: newId("evt"), runId, seq: events.length + 1, type: eventType, payload: redactJson(eventPayload), createdAt: nowIso() };
|
||||
const event: RunEvent = { id: newId("evt"), runId, seq: events.length + 1, type: fenced.type, payload: redactJson(eventPayload), createdAt: nowIso() };
|
||||
events.push(event);
|
||||
this.eventsByRun.set(runId, events);
|
||||
this.touchSessionForRun(this.getRun(runId), { lastEventSeq: event.seq, lastActivityAt: event.createdAt }, { bumpVersion: false, at: event.createdAt });
|
||||
@@ -312,7 +317,10 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
||||
|
||||
finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): RunRecord {
|
||||
const existing = this.getRun(runId);
|
||||
if (isTerminalRunStatus(existing.status)) return existing;
|
||||
if (isTerminalRunStatus(existing.status)) {
|
||||
if (existing.status === "cancelled" && result.terminalStatus !== "cancelled") this.appendLateWriteRejected(runId, null, result, "run-status");
|
||||
return existing;
|
||||
}
|
||||
const status = statusFromTerminal(result.terminalStatus);
|
||||
const next = this.updateRun(runId, { status, terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage });
|
||||
if (result.threadId && next.sessionRef?.sessionId) this.upsertSessionThread(next, result.threadId, result.turnId ?? null);
|
||||
@@ -325,14 +333,24 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
||||
const run = this.getRun(runId);
|
||||
if (isTerminalRunStatus(run.status)) return run;
|
||||
const at = nowIso();
|
||||
for (const command of Array.from(this.commands.values()).filter((item) => item.runId === runId && !isTerminalCommandState(item.state))) {
|
||||
const cancelled = { ...command, state: "cancelled" as const, updatedAt: at };
|
||||
this.commands.set(command.id, cancelled);
|
||||
this.appendEvent(runId, "backend_status", { phase: "command-terminal", commandId: command.id, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled" });
|
||||
const cancel = this.createCancelRequest({ targetKind: "run", targetId: runId, run, command: null, reason, at, stage: "accepted" });
|
||||
this.appendCancelStage(runId, "accepted", cancel);
|
||||
const persistedRun = this.updateRun(runId, { cancelEpoch: cancel.epoch, cancelRequestId: cancel.id, cancelRequestedAt: at, cancelReason: reason });
|
||||
this.appendCancelStage(runId, "persisted", cancel);
|
||||
const leaseExpired = Boolean(persistedRun.claimedBy && isLeaseExpired(persistedRun.leaseExpiresAt));
|
||||
if (leaseExpired) this.appendCancelStage(runId, "fenced", cancel, { claimedBy: persistedRun.claimedBy, leaseExpiresAt: persistedRun.leaseExpiresAt });
|
||||
else if (persistedRun.claimedBy) {
|
||||
this.appendCancelStage(runId, "delivered", cancel, { claimedBy: persistedRun.claimedBy });
|
||||
this.appendCancelStage(runId, "aborting", cancel, { claimedBy: persistedRun.claimedBy });
|
||||
}
|
||||
for (const command of Array.from(this.commands.values()).filter((item) => item.runId === runId && !isTerminalCommandState(item.state))) {
|
||||
const cancelled = { ...command, state: "cancelled" as const, cancelEpoch: cancel.epoch, cancelRequestId: cancel.id, cancelRequestedAt: at, cancelReason: reason, updatedAt: at };
|
||||
this.commands.set(command.id, cancelled);
|
||||
this.appendEvent(runId, "backend_status", { phase: "command-terminal", commandId: command.id, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", cancelRequestId: cancel.id, cancelEpoch: cancel.epoch });
|
||||
}
|
||||
this.appendEvent(runId, "backend_status", { phase: "cancel-requested", reason });
|
||||
const next = this.updateRun(runId, { status: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: reason });
|
||||
this.appendEvent(runId, "terminal_status", { terminalStatus: "cancelled", failureKind: "cancelled", message: reason });
|
||||
this.appendEvent(runId, "terminal_status", { terminalStatus: "cancelled", failureKind: "cancelled", message: reason, cancelRequestId: cancel.id, cancelEpoch: cancel.epoch });
|
||||
this.appendCancelStage(runId, "terminalized", cancel);
|
||||
this.touchSessionForRun(next, { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: runId, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: next.updatedAt }, { bumpVersion: true, at: next.updatedAt });
|
||||
return next;
|
||||
}
|
||||
@@ -340,10 +358,22 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
||||
/**
 * Cancels a single command and records the cancel lifecycle on its run's event stream.
 *
 * A command that is already terminal is returned unchanged and nothing is recorded.
 * Otherwise the stages are emitted in this order: `accepted`, `persisted` (after the
 * cancelled command record is stored), then either `fenced` (the run is claimed but
 * its lease has expired) or `delivered` + `aborting` (the run is claimed, or the
 * command had been acknowledged), then the `command-terminal` status event, and
 * finally `terminalized`.
 *
 * @param commandId - Id of the command to cancel.
 * @param reason - Reason stored on the command and on the cancel request.
 * @returns The cancelled command record, or the existing record if it was already terminal.
 */
cancelCommand(commandId: string, reason = "cancel requested"): CommandRecord {
  const command = this.getCommand(commandId);
  if (isTerminalCommandState(command.state)) return command;

  const run = this.getRun(command.runId);
  const at = nowIso();
  const cancel = this.createCancelRequest({ targetKind: "command", targetId: commandId, run, command, reason, at, stage: "accepted" });
  this.appendCancelStage(command.runId, "accepted", cancel);

  const next = { ...command, state: "cancelled" as const, cancelEpoch: cancel.epoch, cancelRequestId: cancel.id, cancelRequestedAt: at, cancelReason: reason, updatedAt: at };
  this.commands.set(commandId, next);
  this.appendCancelStage(command.runId, "persisted", cancel);

  // `command` is the record read before the update, so `command.state` below is the pre-cancel state.
  const leaseExpired = Boolean(run.claimedBy && isLeaseExpired(run.leaseExpiresAt));
  if (leaseExpired) {
    this.appendCancelStage(command.runId, "fenced", cancel, { claimedBy: run.claimedBy, leaseExpiresAt: run.leaseExpiresAt });
  } else if (run.claimedBy || command.state === "acknowledged") {
    this.appendCancelStage(command.runId, "delivered", cancel, { claimedBy: run.claimedBy, commandState: command.state });
    this.appendCancelStage(command.runId, "aborting", cancel, { claimedBy: run.claimedBy, commandState: command.state });
  }

  this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", message: reason, cancelRequestId: cancel.id, cancelEpoch: cancel.epoch });
  this.appendCancelStage(command.runId, "terminalized", cancel);

  // Only a cancelled "turn" command moves the session to its terminal state.
  if (command.type === "turn") this.touchSessionForRun(run, { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: command.runId, lastCommandId: command.id, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: next.updatedAt }, { bumpVersion: true, at: next.updatedAt });
  return next;
}
|
||||
|
||||
@@ -580,6 +610,39 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
||||
return next;
|
||||
}
|
||||
|
||||
/**
 * Builds a cancel request record for a run or command and stores it in `cancelRequests`.
 *
 * The new epoch is one more than the command's current `cancelEpoch` when a command
 * is supplied, otherwise one more than the run's. `taskId` and `requestedBy` are
 * always written as `null` here and `metadata` as an empty object.
 *
 * @param input - Target descriptor, owning run, optional command, reason, timestamp and initial stage.
 * @returns The stored cancel request record.
 */
private createCancelRequest(input: { targetKind: CancelTargetKind; targetId: string; run: RunRecord; command: CommandRecord | null; reason: string; at: string; stage: CancelStage }): CancelRequestRecord {
  const { targetKind, targetId, run, command, reason, at, stage } = input;
  const previousEpoch = command ? command.cancelEpoch : run.cancelEpoch;
  const request: CancelRequestRecord = {
    id: newId("cancel"),
    targetKind,
    targetId,
    runId: run.id,
    commandId: command?.id ?? null,
    sessionId: run.sessionRef?.sessionId ?? null,
    taskId: null,
    reason,
    requestedBy: null,
    epoch: previousEpoch + 1,
    stage,
    metadata: {},
    createdAt: at,
    updatedAt: at,
  };
  this.cancelRequests.set(request.id, request);
  return request;
}
|
||||
|
||||
/**
 * Moves a cancel request to `stage` and emits the matching `backend_status` event.
 *
 * The stored request is replaced with a copy carrying the new stage and a fresh
 * `updatedAt`; the event payload is built from that copy by `cancelStagePayload`.
 *
 * @param runId - Run whose event stream receives the stage event.
 * @param stage - Stage being recorded.
 * @param cancel - Cancel request the stage belongs to.
 * @param extra - Additional payload fields merged into the event payload.
 */
private appendCancelStage(runId: string, stage: CancelStage, cancel: CancelRequestRecord, extra: JsonRecord = {}): void {
  const staged: CancelRequestRecord = { ...cancel, stage, updatedAt: nowIso() };
  this.cancelRequests.set(cancel.id, staged);
  const payload = cancelStagePayload(staged, stage, extra);
  this.appendEvent(runId, "backend_status", payload);
}
|
||||
|
||||
/**
 * Emits a `late-write-rejected` `backend_status` event describing a terminal result
 * that was not applied.
 *
 * @param runId - Run whose event stream receives the event.
 * @param command - Command the rejected write targeted, or `null` for a run-level write.
 * @param result - The terminal result that was rejected.
 * @param source - Label identifying which write path produced the rejected result.
 */
private appendLateWriteRejected(runId: string, command: CommandRecord | null, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">, source: string): void {
  const run = this.getRun(runId);
  const payload = lateWriteRejectedPayload(run, command, {
    source,
    terminalStatus: result.terminalStatus,
    failureKind: result.failureKind,
    message: result.failureMessage ?? null,
    threadId: result.threadId ?? null,
    turnId: result.turnId ?? null,
  });
  this.appendEvent(runId, "backend_status", payload);
}
|
||||
|
||||
private nextQueueVersion(): number {
|
||||
this.queueVersion += 1;
|
||||
return this.queueVersion;
|
||||
@@ -710,6 +773,55 @@ export function isTerminalCommandState(state: CommandRecord["state"]): boolean {
|
||||
return state === "completed" || state === "failed" || state === "cancelled";
|
||||
}
|
||||
|
||||
/**
 * Builds the event payload describing one stage of a cancel request.
 *
 * @param cancel - Cancel request the stage belongs to.
 * @param stage - Stage being reported; also determines `phase` via `cancelPhase`.
 * @param extra - Additional fields. They are spread last, so a key that collides
 *   with a standard field replaces that field's value.
 * @returns The payload object.
 */
export function cancelStagePayload(cancel: CancelRequestRecord, stage: CancelStage, extra: JsonRecord = {}): JsonRecord {
  const standardFields: JsonRecord = {
    phase: cancelPhase(stage),
    cancelStage: stage,
    cancelRequestId: cancel.id,
    targetKind: cancel.targetKind,
    targetId: cancel.targetId,
    runId: cancel.runId,
    commandId: cancel.commandId,
    sessionId: cancel.sessionId,
    taskId: cancel.taskId,
    reason: cancel.reason,
    requestedBy: cancel.requestedBy,
    cancelEpoch: cancel.epoch,
  };
  return { ...standardFields, ...extra };
}
|
||||
|
||||
/**
 * Decides what is actually recorded when an event arrives for a run.
 *
 * Events pass through untouched when the run is not cancelled or when
 * `isAllowedAfterCancel` accepts them. Any other event for a cancelled run is
 * replaced by a `backend_status` event whose payload is a `late-write-rejected`
 * record wrapping the original event type and payload.
 *
 * @param run - Run the event is being appended to.
 * @param type - Incoming event type.
 * @param payload - Incoming event payload.
 * @returns The event type and payload to record.
 */
export function fenceLateEventForCancelledRun(run: RunRecord, type: RunEvent["type"], payload: JsonRecord): { type: RunEvent["type"]; payload: JsonRecord } {
  const passesThrough = run.status !== "cancelled" || isAllowedAfterCancel(type, payload);
  if (passesThrough) return { type, payload };
  const rejection = lateWriteRejectedPayload(run, null, { eventType: type, payload });
  return { type: "backend_status", payload: rejection };
}
|
||||
|
||||
/**
 * Builds the payload of a `late-write-rejected` event.
 *
 * Cancel id, epoch and reason are read from the command when one is supplied and
 * the field is not null/undefined; otherwise they come from the run. The rejected
 * details are passed through `redactJson` before being embedded.
 *
 * @param run - Run the rejected write targeted.
 * @param command - Command the rejected write targeted, or `null`.
 * @param details - Description of the rejected write.
 * @returns The payload object.
 */
export function lateWriteRejectedPayload(run: RunRecord, command: CommandRecord | null, details: JsonRecord): JsonRecord {
  const cancelRequestId = command?.cancelRequestId ?? run.cancelRequestId;
  const cancelEpoch = command?.cancelEpoch ?? run.cancelEpoch;
  const reason = command?.cancelReason ?? run.cancelReason;
  return {
    phase: "late-write-rejected",
    cancelStage: "late-write-rejected",
    cancelRequestId,
    cancelEpoch,
    runId: run.id,
    commandId: command?.id ?? null,
    reason,
    rejected: redactJson(details),
    valuesPrinted: false,
  };
}
|
||||
|
||||
/**
 * Maps a cancel lifecycle stage to the `phase` string carried in event payloads.
 *
 * @param stage - Cancel lifecycle stage.
 * @returns `"late-write-rejected"` for that stage; every other stage prefixed with `cancel-`.
 */
function cancelPhase(stage: CancelStage): string {
  if (stage === "late-write-rejected") return "late-write-rejected";
  return `cancel-${stage}`;
}
|
||||
|
||||
/**
 * Tells whether an event may still be recorded as-is on a cancelled run.
 *
 * Allowed events are:
 * - a `terminal_status` event whose `terminalStatus` is `"cancelled"`;
 * - any event whose string `phase` starts with `cancel-`, or equals
 *   `late-write-rejected` or `turn-cancelled`;
 * - a `command-terminal` phase whose `failureKind` is `"cancelled"`.
 *
 * A `phase` that is not a string is treated as empty.
 *
 * @param type - Event type.
 * @param payload - Event payload.
 * @returns `true` when the event is allowed, `false` otherwise.
 */
function isAllowedAfterCancel(type: RunEvent["type"], payload: JsonRecord): boolean {
  const isCancelledTerminal = type === "terminal_status" && payload.terminalStatus === "cancelled";
  if (isCancelledTerminal) return true;

  const phase = typeof payload.phase === "string" ? payload.phase : "";
  switch (phase) {
    case "late-write-rejected":
    case "turn-cancelled":
      return true;
    case "command-terminal":
      return payload.failureKind === "cancelled";
    default:
      return phase.startsWith("cancel-");
  }
}
|
||||
|
||||
/**
 * Tells whether a queue task state is terminal.
 *
 * @param state - Queue task state to classify.
 * @returns `true` for `completed`, `failed`, `blocked` and `cancelled`; `false` otherwise.
 */
export function isTerminalQueueTaskState(state: QueueTaskState): boolean {
  switch (state) {
    case "completed":
    case "failed":
    case "blocked":
    case "cancelled":
      return true;
    default:
      return false;
  }
}
|
||||
|
||||
+16
-3
@@ -260,7 +260,7 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions,
|
||||
await api.appendEvent(options.runId, { type: "backend_status", payload: { phase: "backend-turn-started", commandId: command.id, attemptId, runnerId: runner.id, backendProfile: options.backendProfile ?? null, workspaceReady: Boolean(workspacePath) } });
|
||||
await runnerLog.write("command.started", { runId: options.runId, commandId: command.id, runnerId: runner.id, runnerJobId: options.runnerJobId ?? null, attemptId, backendProfile: options.backendProfile ?? null, valuesPrinted: false });
|
||||
const abortController = new AbortController();
|
||||
const stopCancelWatch = watchCancellation(api, options.runId, command.id, abortController);
|
||||
const stopCancelWatch = watchCancellation(api, options.runId, command.id, abortController, { attemptId, runnerId: runner.id, ...(options.runnerJobId ? { runnerJobId: options.runnerJobId } : {}), runnerLog });
|
||||
const backendProgress = startBackendProgress();
|
||||
let stopSteerWatch: (() => void) | undefined;
|
||||
const terminalOutboxEvents: BackendEvent[] = [];
|
||||
@@ -501,13 +501,26 @@ async function assertNotCancelled(api: RunnerManagerApi, runId: string, commandI
|
||||
if (run.status === "cancelled" || command.state === "cancelled") throw new AgentRunError("cancelled", "run or command was cancelled", { httpStatus: 409 });
|
||||
}
|
||||
|
||||
function watchCancellation(api: RunnerManagerApi, runId: string, commandId: string, controller: AbortController): () => void {
|
||||
function watchCancellation(api: RunnerManagerApi, runId: string, commandId: string, controller: AbortController, context: { attemptId: string; runnerId: string; runnerJobId?: string; runnerLog: RunnerLogSink }): () => void {
|
||||
let stopped = false;
|
||||
let reported = false;
|
||||
const check = async (): Promise<void> => {
|
||||
if (stopped || controller.signal.aborted) return;
|
||||
try {
|
||||
const [run, command] = await Promise.all([api.getRun(runId), api.getCommand(runId, commandId)]);
|
||||
if (run.status === "cancelled" || command.state === "cancelled") controller.abort();
|
||||
if (run.status === "cancelled" || command.state === "cancelled") {
|
||||
if (!reported) {
|
||||
reported = true;
|
||||
const cancelRequestId = command.cancelRequestId ?? run.cancelRequestId;
|
||||
const cancelEpoch = command.cancelEpoch || run.cancelEpoch;
|
||||
const reason = command.cancelReason ?? run.cancelReason ?? "cancel requested";
|
||||
const base = { commandId, attemptId: context.attemptId, runnerId: context.runnerId, runnerJobId: context.runnerJobId ?? null, cancelRequestId, cancelEpoch, reason, source: "runner-watch" };
|
||||
await appendBestEffort(api, runId, { type: "backend_status", payload: { ...base, phase: "cancel-delivered", cancelStage: "delivered", deliveryState: "observed-by-runner", runStatus: run.status, commandState: command.state } });
|
||||
await appendBestEffort(api, runId, { type: "backend_status", payload: { ...base, phase: "cancel-aborting", cancelStage: "aborting", abortSignal: "triggered" } });
|
||||
await context.runnerLog.write("cancel.observed", { runId, ...base, valuesPrinted: false });
|
||||
}
|
||||
controller.abort();
|
||||
}
|
||||
} catch {
|
||||
// Cancellation polling must not hide the backend's own terminal result.
|
||||
}
|
||||
|
||||
@@ -13,13 +13,15 @@ const selfTest: SelfTestCase = async () => {
|
||||
(error) => error instanceof AgentRunError && error.failureKind === "infra-failed" && error.message.includes("DATABASE_URL is required"),
|
||||
);
|
||||
const postgresContract = postgresMigrationContract();
|
||||
assert.equal(postgresContract.latestMigrationId, "010_v01_queue_session_ref");
|
||||
assert.equal(postgresContract.latestMigrationId, "011_v01_cancel_lifecycle");
|
||||
assert.equal((postgresContract.migrationIds as string[]).includes("008_v01_dsflash_go_backend_profile"), true);
|
||||
assert.equal((postgresContract.migrationIds as string[]).includes("009_v01_dsflash_go_model_catalog"), true);
|
||||
assert.equal((postgresContract.migrationIds as string[]).includes("010_v01_queue_session_ref"), true);
|
||||
assert.equal((postgresContract.migrationIds as string[]).includes("011_v01_cancel_lifecycle"), true);
|
||||
assert.ok(typeof (postgresContract.checksums as Record<string, string>)["008_v01_dsflash_go_backend_profile"] === "string" && (postgresContract.checksums as Record<string, string>)["008_v01_dsflash_go_backend_profile"].length > 0);
|
||||
assert.ok(typeof (postgresContract.checksums as Record<string, string>)["009_v01_dsflash_go_model_catalog"] === "string" && (postgresContract.checksums as Record<string, string>)["009_v01_dsflash_go_model_catalog"].length > 0);
|
||||
assert.ok(typeof (postgresContract.checksums as Record<string, string>)["010_v01_queue_session_ref"] === "string" && (postgresContract.checksums as Record<string, string>)["010_v01_queue_session_ref"].length > 0);
|
||||
assert.ok(typeof (postgresContract.checksums as Record<string, string>)["011_v01_cancel_lifecycle"] === "string" && (postgresContract.checksums as Record<string, string>)["011_v01_cancel_lifecycle"].length > 0);
|
||||
assert.equal((postgresContract.checksums as Record<string, string>)["002_v01_backend_profiles"], "928b5c490cc4539cb64ecef34784557601b2724fa2870570f16a53576804e49c");
|
||||
assert.ok(Array.isArray(postgresContract.requiredTables));
|
||||
assert.ok(postgresContract.requiredTables.includes("agentrun_schema_migrations"));
|
||||
@@ -29,6 +31,7 @@ const selfTest: SelfTestCase = async () => {
|
||||
assert.ok(postgresContract.requiredTables.includes("agentrun_runner_jobs"));
|
||||
assert.ok(postgresContract.requiredTables.includes("agentrun_queue_tasks"));
|
||||
assert.ok(postgresContract.requiredTables.includes("agentrun_queue_read_cursors"));
|
||||
assert.ok(postgresContract.requiredTables.includes("agentrun_cancel_requests"));
|
||||
return { name: "redaction-postgres", tests: ["redaction", "postgres-store-contract"] };
|
||||
};
|
||||
|
||||
|
||||
@@ -450,6 +450,10 @@ process.exit(1);
|
||||
terminalStatus: null,
|
||||
failureKind: null,
|
||||
failureMessage: null,
|
||||
cancelEpoch: 0,
|
||||
cancelRequestId: null,
|
||||
cancelRequestedAt: null,
|
||||
cancelReason: null,
|
||||
createdAt: new Date().toISOString(),
|
||||
updatedAt: new Date().toISOString(),
|
||||
claimedBy: null,
|
||||
|
||||
@@ -90,11 +90,37 @@ process.exit(1);
|
||||
assert.equal(cancelledRun.failureKind, "cancelled");
|
||||
const cancelledCommand = await client.get(`/api/v1/runs/${pendingCancel.runId}/commands/${pendingCancel.commandId}`) as { state?: string };
|
||||
assert.equal(cancelledCommand.state, "cancelled");
|
||||
const pendingCancelEvents = await client.get(`/api/v1/runs/${pendingCancel.runId}/events?limit=100`) as { items?: JsonRecord[] };
|
||||
const pendingCancelPhases = (pendingCancelEvents.items ?? []).map((event) => String(((event.payload as JsonRecord | undefined)?.phase) ?? ""));
|
||||
assert.ok(pendingCancelPhases.includes("cancel-accepted"));
|
||||
assert.ok(pendingCancelPhases.includes("cancel-persisted"));
|
||||
assert.ok(pendingCancelPhases.includes("cancel-terminalized"));
|
||||
const pendingCancelResult = await client.get(`/api/v1/runs/${pendingCancel.runId}/commands/${pendingCancel.commandId}/result`) as JsonRecord;
|
||||
assert.equal(pendingCancelResult.terminalStatus, "cancelled");
|
||||
assert.equal(pendingCancelResult.completed, false);
|
||||
assert.equal(((pendingCancelResult.cancelLifecycle as JsonRecord).terminalized), true);
|
||||
assert.equal(((pendingCancelResult.cancelLifecycle as JsonRecord).lateWriteRejected), 0);
|
||||
await client.patch(`/api/v1/commands/${pendingCancel.commandId}/status`, { terminalStatus: "completed", failureKind: null, failureMessage: null });
|
||||
const lateCancelEvents = await client.get(`/api/v1/runs/${pendingCancel.runId}/events?limit=100`) as { items?: JsonRecord[] };
|
||||
const lateCancelPhases = (lateCancelEvents.items ?? []).map((event) => String(((event.payload as JsonRecord | undefined)?.phase) ?? ""));
|
||||
assert.ok(lateCancelPhases.includes("late-write-rejected"));
|
||||
const lateCancelResult = await client.get(`/api/v1/runs/${pendingCancel.runId}/commands/${pendingCancel.commandId}/result`) as JsonRecord;
|
||||
assert.equal(lateCancelResult.terminalStatus, "cancelled");
|
||||
assert.equal(lateCancelResult.completed, false);
|
||||
assert.equal(((lateCancelResult.cancelLifecycle as JsonRecord).lateWriteRejected), 1);
|
||||
await assert.rejects(
|
||||
() => client.post(`/api/v1/runs/${pendingCancel.runId}/runner-jobs`, { commandId: pendingCancel.commandId, idempotencyKey: "hwlab-cancelled-job" }),
|
||||
(error) => error instanceof Error && error.message.includes("already terminal"),
|
||||
);
|
||||
|
||||
const fencedCancel = await createHwlabRun(client, context, bundle, "hwlab-session-cancel-fenced", "cancel fenced", "hwlab-command-cancel-fenced");
|
||||
await client.post(`/api/v1/runs/${fencedCancel.runId}/claim`, { runnerId: "stale-runner", leaseMs: 1 });
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
await client.post(`/api/v1/runs/${fencedCancel.runId}/cancel`, { reason: "self-test fenced cancel" });
|
||||
const fencedCancelEvents = await client.get(`/api/v1/runs/${fencedCancel.runId}/events?limit=100`) as { items?: JsonRecord[] };
|
||||
const fencedCancelPhases = (fencedCancelEvents.items ?? []).map((event) => String(((event.payload as JsonRecord | undefined)?.phase) ?? ""));
|
||||
assert.ok(fencedCancelPhases.includes("cancel-fenced"));
|
||||
|
||||
const sessionRun = await createHwlabRun(client, context, bundle, "hwlab-session-resume", "hello session", "hwlab-command-session");
|
||||
const resourceBinPath = path.join(context.tmp, "resource-bin");
|
||||
const runResult = await runOnce({ managerUrl: server.baseUrl, runId: sessionRun.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces"), AGENTRUN_RESOURCE_BIN_PATH: resourceBinPath }, oneShot: true });
|
||||
@@ -213,7 +239,7 @@ process.exit(1);
|
||||
|
||||
const multiTurn = await createHwlabRun(client, context, bundle, "hwlab-session-multiturn", "hello first turn", "hwlab-command-multiturn-1");
|
||||
const fakeCodexStartFile = path.join(context.tmp, "fake-codex-starts-multiturn.txt");
|
||||
const multiturnRunner = runOnce({ managerUrl: server.baseUrl, runId: multiTurn.runId, commandId: multiTurn.commandId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-multiturn"), AGENTRUN_FAKE_CODEX_START_FILE: fakeCodexStartFile }, idleTimeoutMs: 500, pollIntervalMs: 50 });
|
||||
const multiturnRunner = runOnce({ managerUrl: server.baseUrl, runId: multiTurn.runId, commandId: multiTurn.commandId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-multiturn"), AGENTRUN_FAKE_CODEX_START_FILE: fakeCodexStartFile }, idleTimeoutMs: 2_000, pollIntervalMs: 50 });
|
||||
await waitForCommandState(client, multiTurn.runId, multiTurn.commandId, "completed");
|
||||
const secondCommand = await client.post(`/api/v1/runs/${multiTurn.runId}/commands`, { type: "turn", payload: { prompt: "hello second turn", traceId: "hwlab-command-multiturn-2" }, idempotencyKey: "hwlab-command-multiturn-2" }) as { id: string };
|
||||
await waitForCommandState(client, multiTurn.runId, secondCommand.id, "completed");
|
||||
@@ -267,14 +293,14 @@ process.exit(1);
|
||||
const noEventWatchdogRunner = runOnce({ managerUrl: server.baseUrl, runId: noEventWatchdog.runId, commandId: noEventWatchdog.commandId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "tool-completes-without-terminal", AGENTRUN_RUNNER_IDLE_TIMEOUT_MS: "300", AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-no-event-watchdog") }, oneShot: true, pollIntervalMs: 50 });
|
||||
await waitForCommandState(client, noEventWatchdog.runId, noEventWatchdog.commandId, "acknowledged");
|
||||
await waitForEvent(client, noEventWatchdog.runId, (event) => event.type === "tool_call" && (event.payload as JsonRecord).status === "completed", "tool_call completed before no-event watchdog");
|
||||
await waitForCommandState(client, noEventWatchdog.runId, noEventWatchdog.commandId, "failed");
|
||||
await waitForCommandState(client, noEventWatchdog.runId, noEventWatchdog.commandId, "failed", 15_000);
|
||||
const noEventWatchdogResult = await noEventWatchdogRunner as JsonRecord;
|
||||
assert.equal(noEventWatchdogResult.terminalStatus, "failed");
|
||||
assert.equal(noEventWatchdogResult.failureKind, "backend-timeout");
|
||||
const idleEnvelope = await client.get(`/api/v1/runs/${noEventWatchdog.runId}/commands/${noEventWatchdog.commandId}/result`) as JsonRecord;
|
||||
assert.equal(idleEnvelope.terminalStatus, "failed");
|
||||
assert.equal(idleEnvelope.failureKind, "backend-timeout");
|
||||
assert.match(String(idleEnvelope.failureMessage ?? idleEnvelope.message ?? ""), /idle timed out/u);
|
||||
assert.match(String(idleEnvelope.failureMessage ?? idleEnvelope.message ?? ""), /timed out/u);
|
||||
|
||||
const outputWithoutTerminal = await createHwlabRun(client, context, bundle, "hwlab-session-output-without-terminal", "start a tool that keeps printing without terminal", "hwlab-command-output-without-terminal", 500);
|
||||
const outputWithoutTerminalRunner = runOnce({ managerUrl: server.baseUrl, runId: outputWithoutTerminal.runId, commandId: outputWithoutTerminal.commandId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "tool-output-without-terminal", AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-output-without-terminal") }, oneShot: true, pollIntervalMs: 50 });
|
||||
@@ -404,14 +430,24 @@ function defaultGitBundles(): ResourceBundleRef["bundles"] {
|
||||
];
|
||||
}
|
||||
|
||||
/**
 * Polls a command until it reaches `state`, throwing a diagnostic error on timeout.
 *
 * The command is fetched every 100 ms until `timeoutMs` has elapsed. On timeout the
 * error message includes the last observed state, the terminal status and failure
 * kind from the command result endpoint, and the last 12 event phases (or event
 * types when a payload has no `phase`) from the first 200 events of the run.
 *
 * Both diagnostic lookups are best-effort: if either request fails, its part of the
 * message falls back to `unknown` / empty so the timeout error itself is never
 * replaced by a diagnostics failure.
 *
 * @param client - Manager API client used for the lookups.
 * @param runId - Run that owns the command.
 * @param commandId - Command to poll.
 * @param state - Command state to wait for.
 * @param timeoutMs - Maximum time to wait, in milliseconds.
 * @throws Error when the command does not reach `state` before the deadline.
 */
async function waitForCommandState(client: ManagerClient, runId: string, commandId: string, state: string, timeoutMs = 5_000): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  let lastState: string | undefined;
  while (Date.now() < deadline) {
    const command = await client.get(`/api/v1/runs/${runId}/commands/${commandId}`) as { state?: string };
    lastState = command.state;
    if (command.state === state) return;
    await new Promise((resolve) => setTimeout(resolve, 100));
  }

  let phases: string[] = [];
  try {
    const events = await client.get(`/api/v1/runs/${runId}/events?afterSeq=0&limit=200`) as { items?: Array<{ type?: string; payload?: JsonRecord }> };
    phases = (events.items ?? []).map((event) => String(event.payload?.phase ?? event.type ?? "")).slice(-12);
  } catch {
    // Keep the timeout error below as the reported failure.
    phases = [];
  }

  let result: JsonRecord | null = null;
  try {
    result = await client.get(`/api/v1/runs/${runId}/commands/${commandId}/result`) as JsonRecord;
  } catch {
    result = null;
  }

  throw new Error(`command ${commandId} did not reach ${state}; lastState=${lastState ?? "unknown"}; terminal=${String(result?.terminalStatus ?? "unknown")}; failureKind=${String(result?.failureKind ?? "unknown")}; phases=${phases.join(",")}`);
}
|
||||
|
||||
async function waitForEvent(client: ManagerClient, runId: string, predicate: (event: { type?: string; payload?: JsonRecord }) => boolean, label: string): Promise<void> {
|
||||
|
||||
Reference in New Issue
Block a user