feat: add AgentRun cancel lifecycle policy (#859)

Co-authored-by: Codex <codex@noreply.local>
This commit is contained in:
Lyon
2026-06-25 08:44:30 +08:00
committed by GitHub
parent 94d985d628
commit 790e5df281
10 changed files with 278 additions and 13 deletions
+45
View File
@@ -1,3 +1,5 @@
// SPEC: PJ2026-01060305 AgentRun execution policy + PJ2026-01020108 cancel lifecycle draft-2026-06-25-p0.
// Parses AgentRun YAML lane policy, including cancel lifecycle values owned by config/agentrun.yaml.
import { rootPath } from "./config";
import {
asRecord,
@@ -113,6 +115,7 @@ export interface AgentRunLaneSpec {
readonly egressProxyUrl: string | null;
readonly noProxyExtra: readonly string[];
readonly retention: AgentRunRunnerRetentionSpec;
readonly cancelLifecycle: AgentRunCancelLifecycleSpec;
};
readonly localPostgres: {
readonly enabled: boolean;
@@ -195,6 +198,19 @@ export interface AgentRunRunnerRetentionSpec {
};
}
/** Stage names that a lane's cancel lifecycle policy may list in `eventStages`. */
export type AgentRunCancelLifecycleStage = "accepted" | "persisted" | "delivered" | "aborting" | "terminalized" | "fenced" | "late-write-rejected";
/**
 * Cancel lifecycle policy attached to a lane's runner deployment
 * (`deployment.runner.cancelLifecycle`).
 */
export interface AgentRunCancelLifecycleSpec {
/** Cancel delivery mode; "manager-epoch" is the only value the type admits. */
readonly deliveryMode: "manager-epoch";
/** Parsed as a positive integer; presumably a graceful-abort window in milliseconds — TODO confirm against the manager. */
readonly gracefulAbortMs: number;
/** Parsed as a positive integer; presumably the delay in milliseconds before a kill escalation — TODO confirm. */
readonly killEscalationMs: number;
/** Parsed as a positive integer; presumably the heartbeat staleness threshold in milliseconds used for fencing — TODO confirm. */
readonly staleHeartbeatFencingMs: number;
/** Toggle for late-write fencing. */
readonly lateWriteFencing: {
readonly enabled: boolean;
};
/** Declared lifecycle stages; the parser requires a non-empty list without duplicates. */
readonly eventStages: readonly AgentRunCancelLifecycleStage[];
}
export interface AgentRunLaneTarget {
readonly configPath: string;
readonly spec: AgentRunLaneSpec;
@@ -312,6 +328,7 @@ export function agentRunLaneSummary(spec: AgentRunLaneSpec): Record<string, unkn
egressProxyUrl: spec.deployment.runner.egressProxyUrl,
noProxyExtra: spec.deployment.runner.noProxyExtra,
retention: spec.deployment.runner.retention,
cancelLifecycle: spec.deployment.runner.cancelLifecycle,
},
localPostgres: spec.deployment.localPostgres,
},
@@ -551,11 +568,39 @@ function parseDeployment(input: Record<string, unknown>, path: string): AgentRun
egressProxyUrl: optionalStringField(runner, "egressProxyUrl", `${path}.runner`) ?? null,
noProxyExtra: optionalStringArrayField(runner, "noProxyExtra", `${path}.runner`),
retention: parseRunnerRetention(recordField(runner, "retention", `${path}.runner`), `${path}.runner.retention`),
cancelLifecycle: parseCancelLifecycle(recordField(runner, "cancelLifecycle", `${path}.runner`), `${path}.runner.cancelLifecycle`),
},
localPostgres: parseLocalPostgres(localPostgres, `${path}.localPostgres`),
};
}
/**
 * Parses the `cancelLifecycle` record of a runner deployment into a typed spec.
 *
 * Fields are read in a fixed order (nested `lateWriteFencing` record first, then the
 * scalar fields, then `eventStages`), so the first invalid field in that order is the
 * one reported.
 *
 * @param input Raw `cancelLifecycle` record.
 * @param path Config path used as the prefix of validation error messages.
 * @returns The validated cancel lifecycle spec.
 * @throws Error when `eventStages` is invalid; the field helpers presumably throw for
 *   missing or mistyped fields as well — confirm against their definitions.
 */
function parseCancelLifecycle(input: Record<string, unknown>, path: string): AgentRunCancelLifecycleSpec {
  const fencingPath = `${path}.lateWriteFencing`;
  const fencingRecord = recordField(input, "lateWriteFencing", path);

  const deliveryMode: AgentRunCancelLifecycleSpec["deliveryMode"] = enumField(input, "deliveryMode", path, ["manager-epoch"]);
  const gracefulAbortMs = positiveIntegerField(input, "gracefulAbortMs", path);
  const killEscalationMs = positiveIntegerField(input, "killEscalationMs", path);
  const staleHeartbeatFencingMs = positiveIntegerField(input, "staleHeartbeatFencingMs", path);
  const lateWriteFencing = { enabled: booleanField(fencingRecord, "enabled", fencingPath) };
  const eventStages = parseCancelLifecycleStages(input.eventStages, `${path}.eventStages`);

  return { deliveryMode, gracefulAbortMs, killEscalationMs, staleHeartbeatFencingMs, lateWriteFencing, eventStages };
}
/**
 * Validates the `eventStages` list of a cancel lifecycle policy.
 *
 * @param input Raw value found under `eventStages`.
 * @param path Config path used as the prefix of validation error messages.
 * @returns The stages in the order they were declared.
 * @throws Error when the value is not an array, is empty, contains an entry that is
 *   not a known stage name, or lists the same stage more than once.
 */
function parseCancelLifecycleStages(input: unknown, path: string): readonly AgentRunCancelLifecycleStage[] {
  const allowedStages: readonly AgentRunCancelLifecycleStage[] = ["accepted", "persisted", "delivered", "aborting", "terminalized", "fenced", "late-write-rejected"];
  const isStage = (candidate: unknown): candidate is AgentRunCancelLifecycleStage =>
    typeof candidate === "string" && allowedStages.some((stage) => stage === candidate);

  if (!Array.isArray(input)) {
    throw new Error(`${path} must be an array`);
  }
  if (input.length === 0) {
    throw new Error(`${path} must declare at least one stage`);
  }

  // Every entry is validated before duplicates are considered, so an invalid entry
  // is always reported in preference to a duplicate.
  const stages = input.map((entry: unknown, position: number) => {
    if (isStage(entry)) return entry;
    throw new Error(`${path}[${position}] must be one of ${allowedStages.join(", ")}`);
  });

  // Repeated stages are reported once each, ordered by where the first repeat occurs.
  const seen = new Set<AgentRunCancelLifecycleStage>();
  const repeated = new Set<AgentRunCancelLifecycleStage>();
  stages.forEach((stage) => {
    if (seen.has(stage)) {
      repeated.add(stage);
    } else {
      seen.add(stage);
    }
  });
  if (repeated.size > 0) {
    throw new Error(`${path} must not contain duplicate stages: ${[...repeated].join(", ")}`);
  }
  return stages;
}
function parseRunnerRetention(input: Record<string, unknown>, path: string): AgentRunRunnerRetentionSpec {
const selectors = recordField(input, "selectors", path);
const ageBasedCleanup = recordField(input, "ageBasedCleanup", path);
+8
View File
@@ -1,3 +1,5 @@
// SPEC: PJ2026-01060305 AgentRun execution policy + PJ2026-01020108 cancel lifecycle draft-2026-06-25-p0.
// Renders AgentRun YAML lane policy into runtime manager environment.
import { createHash } from "node:crypto";
import type { AgentRunLaneSpec } from "./agentrun-lanes";
@@ -449,6 +451,12 @@ function managerEnv(spec: AgentRunLaneSpec, sourceCommit: string, imageRef: stri
{ name: "AGENTRUN_RUNNER_RETENTION_JOB_NAME_PREFIXES", value: spec.deployment.runner.retention.selectors.jobNamePrefixes.join(",") },
{ name: "AGENTRUN_RUNNER_RETENTION_AGE_BASED_CLEANUP_ENABLED", value: String(spec.deployment.runner.retention.ageBasedCleanup.enabled) },
...(spec.deployment.runner.retention.ageBasedCleanup.maxAgeHours === null ? [] : [{ name: "AGENTRUN_RUNNER_RETENTION_AGE_BASED_MAX_AGE_HOURS", value: String(spec.deployment.runner.retention.ageBasedCleanup.maxAgeHours) }]),
{ name: "AGENTRUN_CANCEL_DELIVERY_MODE", value: spec.deployment.runner.cancelLifecycle.deliveryMode },
{ name: "AGENTRUN_CANCEL_GRACEFUL_ABORT_MS", value: String(spec.deployment.runner.cancelLifecycle.gracefulAbortMs) },
{ name: "AGENTRUN_CANCEL_KILL_ESCALATION_MS", value: String(spec.deployment.runner.cancelLifecycle.killEscalationMs) },
{ name: "AGENTRUN_CANCEL_STALE_HEARTBEAT_FENCING_MS", value: String(spec.deployment.runner.cancelLifecycle.staleHeartbeatFencingMs) },
{ name: "AGENTRUN_CANCEL_LATE_WRITE_FENCING_ENABLED", value: String(spec.deployment.runner.cancelLifecycle.lateWriteFencing.enabled) },
{ name: "AGENTRUN_CANCEL_EVENT_STAGES", value: spec.deployment.runner.cancelLifecycle.eventStages.join(",") },
...(spec.deployment.runner.egressProxyUrl === null ? [] : [{ name: "AGENTRUN_RUNNER_EGRESS_PROXY_URL", value: spec.deployment.runner.egressProxyUrl }]),
...(spec.deployment.runner.noProxyExtra.length === 0 ? [] : [{ name: "AGENTRUN_RUNNER_NO_PROXY_EXTRA", value: spec.deployment.runner.noProxyExtra.join(",") }]),
{ name: "AGENTRUN_API_KEY", valueFrom: { secretKeyRef: spec.deployment.manager.apiKeySecretRef } },
+128 -7
View File
@@ -1,3 +1,5 @@
// SPEC: PJ2026-01020108 cancel lifecycle + PJ2026-01020305 cancel control + PJ2026-01060305 AgentRun execution policy draft-2026-06-25-p0.
// Exposes AgentRun cancel lifecycle policy and dry-run visibility in the UniDesk CLI.
import { chmodSync, copyFileSync, existsSync, readFileSync, statSync, writeFileSync } from "node:fs";
import { join } from "node:path";
import { spawnSync } from "node:child_process";
@@ -13,6 +15,7 @@ import {
agentRunPipelineRunName,
agentRunProviderCredentialRefs,
resolveAgentRunLaneTarget,
type AgentRunCancelLifecycleSpec,
type AgentRunLaneSpec,
} from "./agentrun-lanes";
import {
@@ -620,7 +623,7 @@ async function resourceCancel(config: UniDeskConfig | null, command: string, act
if (options.reason !== null) cancelArgs.push("--reason", options.reason);
if (ref.kind === "command") cancelArgs.push("--run-id", options.runId ?? requiredContext("command cancel", "--run <runId>"));
if (options.dryRun) {
const result = agentRunResourceCancelDryRunPlan(ref, options, rerunWithoutDryRun(command));
const result = agentRunResourceCancelDryRunPlan(config, ref, options, rerunWithoutDryRun(command));
return renderMutationSummary(command, result, options, `Planned cancel ${ref.kind}/${shortId(ref.name)}`, [rerunWithoutDryRun(command)]);
}
const result = ref.kind === "task"
@@ -636,21 +639,108 @@ async function resourceCancel(config: UniDeskConfig | null, command: string, act
return renderMutationSummary(command, result, options, `${options.dryRun ? "Planned cancel" : "Cancel requested"} ${ref.kind}/${shortId(ref.name)}`, options.dryRun ? [rerunWithoutDryRun(command)] : undefined);
}
/**
 * Builds the dry-run plan for cancelling an AgentRun resource.
 *
 * The plan name and endpoint path are chosen from `ref.kind` (task, session, run, or
 * command), `options.reason` is copied into the request body when it is not null, and
 * the cancel lifecycle disclosure is handed to `agentRunDryRunPlan` in its final argument.
 *
 * NOTE(review): this span comes from a diff view, so the previous signature and the
 * previous task/session/run returns (without the "POST" and `cancelLifecycle` arguments)
 * appear next to their replacements — confirm which lines are live before editing.
 *
 * @param config UniDesk config forwarded to the lifecycle disclosure; may be null.
 * @param ref Resource reference whose kind and name select the cancel endpoint.
 * @param options Resource options; `reason` and `runId` are read here.
 * @param confirmCommand Command string forwarded to `agentRunDryRunPlan`.
 * @returns The record produced by `agentRunDryRunPlan`.
 * @throws Error when `ref.kind` is not one of the four supported kinds.
 */
function agentRunResourceCancelDryRunPlan(ref: AgentRunResourceRef, options: AgentRunResourceOptions, confirmCommand: string): Record<string, unknown> {
function agentRunResourceCancelDryRunPlan(config: UniDeskConfig | null, ref: AgentRunResourceRef, options: AgentRunResourceOptions, confirmCommand: string): Record<string, unknown> {
const body: Record<string, unknown> = {};
if (options.reason !== null) body.reason = options.reason;
if (ref.kind === "task") return agentRunDryRunPlan("task-cancel", `/api/v1/queue/tasks/${encodeURIComponent(ref.name)}/cancel`, body, confirmCommand);
if (ref.kind === "session") return agentRunDryRunPlan("session-cancel", `/api/v1/sessions/${encodeURIComponent(ref.name)}/control`, { action: "cancel", ...body }, confirmCommand);
if (ref.kind === "run") return agentRunDryRunPlan("run-cancel", `/api/v1/runs/${encodeURIComponent(ref.name)}/cancel`, body, confirmCommand);
// The same disclosure object is attached to the plan for every resource kind.
const cancelLifecycle = agentRunCancelLifecycleDryRunDisclosure(config, ref, options);
if (ref.kind === "task") return agentRunDryRunPlan("task-cancel", `/api/v1/queue/tasks/${encodeURIComponent(ref.name)}/cancel`, body, confirmCommand, "POST", { cancelLifecycle });
if (ref.kind === "session") return agentRunDryRunPlan("session-cancel", `/api/v1/sessions/${encodeURIComponent(ref.name)}/control`, { action: "cancel", ...body }, confirmCommand, "POST", { cancelLifecycle });
if (ref.kind === "run") return agentRunDryRunPlan("run-cancel", `/api/v1/runs/${encodeURIComponent(ref.name)}/cancel`, body, confirmCommand, "POST", { cancelLifecycle });
if (ref.kind === "command") {
// A command cancel also needs the owning run id; requiredContext presumably throws when it is missing — confirm.
const runId = options.runId ?? requiredContext("command cancel", "--run <runId>");
return agentRunDryRunPlan("command-cancel", `/api/v1/commands/${encodeURIComponent(ref.name)}/cancel`, body, confirmCommand, "POST", {
commandRef: { runId, commandId: ref.name, valuesPrinted: false },
cancelLifecycle,
});
}
throw new Error("cancel supports task/<taskId>, session/<sessionId>, run/<runId>, or command/<commandId>");
}
/**
 * Assembles the cancel lifecycle disclosure shown in a cancel dry-run.
 *
 * The disclosure combines the resolved lane policy (when one is available), the
 * authority details for the target, the cascade scope for the resource kind, and
 * follow-up CLI commands for verifying the cancel.
 *
 * @param config UniDesk config used to resolve the policy target; may be null.
 * @param ref Resource being cancelled.
 * @param options Resource options; `runId` is read here and the whole object is
 *   forwarded to policy resolution.
 * @returns A plain record; `runnerAbort` is null and `expectedStages` is empty when no
 *   policy could be resolved.
 */
function agentRunCancelLifecycleDryRunDisclosure(config: UniDeskConfig | null, ref: AgentRunResourceRef, options: AgentRunResourceOptions): Record<string, unknown> {
  const policyTarget = resolveAgentRunCancelPolicyTarget(config, options);
  const lifecyclePolicy = policyTarget?.spec.deployment.runner.cancelLifecycle ?? null;

  const targetRunId = ref.kind === "command" ? options.runId : (options.runId ?? null);

  // The events command needs a run id: the resource itself for run/…, otherwise --run.
  const eventsRunId = ref.kind === "run" ? ref.name : options.runId;
  const hasEventsRun = ref.kind === "run" || options.runId !== null;
  const eventsCommand = hasEventsRun ? `bun scripts/cli.ts agentrun events run/${eventsRunId} --after-seq 0` : null;

  let logsCommand: string | null = null;
  if (ref.kind === "session") {
    logsCommand = `bun scripts/cli.ts agentrun logs session/${ref.name} --tail 100`;
  }

  let resultCommand: string | null = null;
  if (ref.kind === "command") {
    resultCommand = `bun scripts/cli.ts agentrun result command/${ref.name} --run ${options.runId ?? "<runId>"}`;
  }

  return {
    specRefs: ["PJ2026-01020108", "PJ2026-01020305", "PJ2026-01060305"],
    authority: agentRunCancelAuthorityDisclosure(policyTarget),
    targetRef: {
      kind: ref.kind,
      name: ref.name,
      runId: targetRunId,
      valuesPrinted: false,
    },
    cascadeScope: agentRunCancelCascadeScope(ref.kind),
    terminalAuthority: "AgentRun Core canceled terminal/result event",
    expectedStages: lifecyclePolicy?.eventStages ?? [],
    runnerAbort: lifecyclePolicy === null ? null : agentRunCancelRunnerAbortDisclosure(lifecyclePolicy),
    fencing: agentRunCancelFencingDisclosure(lifecyclePolicy),
    verification: {
      describe: `bun scripts/cli.ts agentrun describe ${ref.kind}/${ref.name}`,
      events: eventsCommand,
      logs: logsCommand,
      result: resultCommand,
      valuesPrinted: false,
    },
    valuesPrinted: false,
  };
}
/**
 * Picks the lane whose cancel policy should be disclosed.
 *
 * An active REST target takes precedence and is reported as "selected-lane". Without
 * one, no target is resolved when `config` is null; otherwise the lane is resolved from
 * `options.node` / `options.lane` and is reported as "default-lane" only when both are null.
 *
 * @param config UniDesk config; only checked for null here.
 * @param options Resource options supplying the node and lane selectors.
 * @returns The config path, lane spec and source label, or null when unavailable.
 */
function resolveAgentRunCancelPolicyTarget(config: UniDeskConfig | null, options: AgentRunResourceOptions): { configPath: string; spec: AgentRunLaneSpec; source: "selected-lane" | "default-lane" } | null {
  const restTarget = activeAgentRunRestTarget;
  if (restTarget !== null) {
    return { configPath: restTarget.configPath, spec: restTarget.spec, source: "selected-lane" };
  }
  if (config === null) {
    return null;
  }
  const resolved = resolveAgentRunLaneTarget({ node: options.node, lane: options.lane });
  const explicitlySelected = options.node !== null || options.lane !== null;
  return {
    configPath: resolved.configPath,
    spec: resolved.spec,
    source: explicitlySelected ? "selected-lane" : "default-lane",
  };
}
/**
 * Describes which manager a cancel would be sent to and where its policy came from.
 *
 * The transport is "lane-k8s-service-proxy" when an active REST target exists and
 * "direct-http" otherwise. With a lane transport the base URL is the lane's internal
 * base URL; with direct HTTP it is read from the AgentRun client config.
 *
 * @param target Resolved policy target, or null when none is available.
 * @returns A plain record; lane-derived fields are null when `target` is null.
 */
function agentRunCancelAuthorityDisclosure(target: { configPath: string; spec: AgentRunLaneSpec; source: "selected-lane" | "default-lane" } | null): Record<string, unknown> {
  const usesLaneProxy = activeAgentRunRestTarget !== null;
  const transport = usesLaneProxy ? "lane-k8s-service-proxy" : "direct-http";

  if (target == null) {
    return {
      transport,
      policySource: "unavailable",
      node: null,
      lane: null,
      namespace: null,
      managerDeployment: null,
      baseUrl: usesLaneProxy ? null : agentRunDirectManagerBaseUrl(),
      laneConfigPath: null,
      valuesPrinted: false,
    };
  }

  const { configPath, spec, source } = target;
  return {
    transport,
    policySource: source ?? "unavailable",
    node: spec.nodeId ?? null,
    lane: spec.lane ?? null,
    namespace: spec.runtime.namespace ?? null,
    managerDeployment: spec.runtime.managerDeployment ?? null,
    baseUrl: usesLaneProxy ? (spec.runtime.internalBaseUrl ?? null) : agentRunDirectManagerBaseUrl(),
    laneConfigPath: configPath ?? null,
    valuesPrinted: false,
  };
}
/**
 * Reads the manager base URL from the AgentRun client config on a best-effort basis.
 *
 * @returns The configured base URL, or null when reading the client config throws.
 */
function agentRunDirectManagerBaseUrl(): string | null {
  let baseUrl: string | null = null;
  try {
    baseUrl = readAgentRunClientConfig().manager.baseUrl;
  } catch {
    // Deliberately best-effort: any failure to read the config is reported as "no URL".
    baseUrl = null;
  }
  return baseUrl;
}
/**
 * Projects the runner-abort part of a cancel lifecycle policy for display.
 *
 * @param policy Cancel lifecycle policy of the resolved lane.
 * @returns The delivery mode and the two abort timings, plus `valuesPrinted: false`.
 */
function agentRunCancelRunnerAbortDisclosure(policy: AgentRunCancelLifecycleSpec): Record<string, unknown> {
  const { deliveryMode, gracefulAbortMs, killEscalationMs } = policy;
  return { deliveryMode, gracefulAbortMs, killEscalationMs, valuesPrinted: false };
}
/**
 * Projects the fencing part of a cancel lifecycle policy for display.
 *
 * @param policy Cancel lifecycle policy, or null when no lane policy was resolved.
 * @returns A record that always carries `cancelEpoch: true`; it adds the fencing
 *   settings when a policy is present and `policySource: "unavailable"` otherwise.
 */
function agentRunCancelFencingDisclosure(policy: AgentRunCancelLifecycleSpec | null): Record<string, unknown> {
  const disclosure: Record<string, unknown> = { cancelEpoch: true };
  if (policy === null) {
    disclosure.policySource = "unavailable";
  } else {
    disclosure.staleHeartbeatFencingMs = policy.staleHeartbeatFencingMs;
    disclosure.lateWriteFencing = policy.lateWriteFencing.enabled;
  }
  // Added last so the key order matches the rendered output in both cases.
  disclosure.valuesPrinted = false;
  return disclosure;
}
/**
 * Returns the human-readable cascade description for a cancel of the given resource kind.
 *
 * @param kind Kind of the resource being cancelled.
 * @returns The cascade text for task, session, run or command; a fixed
 *   "unsupported cancel target" label for any other kind.
 */
function agentRunCancelCascadeScope(kind: AgentRunResourceKind): string {
  switch (kind) {
    case "task":
      return "current task attempt -> run -> active command -> runner job";
    case "session":
      return "session active work -> active run/command -> session-scoped background work";
    case "run":
      return "run active commands -> runner jobs -> run terminal";
    case "command":
      return "single command -> current runner job; session remains reusable";
    default:
      return "unsupported cancel target";
  }
}
async function resourceDispatch(config: UniDeskConfig | null, command: string, action: string | undefined, args: string[], options: AgentRunResourceOptions): Promise<RenderedCliResult> {
const ref = parseResourceRef(action, args, "task");
if (ref.kind !== "task") throw new Error("dispatch supports task/<taskId>");
@@ -775,16 +865,47 @@ function renderMutationSummary(command: string, raw: Record<string, unknown>, op
if (id !== null) lines.push(`Name: ${id}`);
const decision = stringOrNull(data.decision);
const internalCommandType = stringOrNull(data.internalCommandType);
if (data.dryRun !== undefined) lines.push(`DryRun: ${String(data.dryRun)}`);
if (data.mutation !== undefined) lines.push(`Mutation: ${String(data.mutation)}`);
const dryRun = data.dryRun !== undefined ? data.dryRun : raw.dryRun;
const mutation = data.mutation !== undefined ? data.mutation : raw.mutation;
if (dryRun !== undefined) lines.push(`DryRun: ${String(dryRun)}`);
if (mutation !== undefined) lines.push(`Mutation: ${String(mutation)}`);
if (decision !== null) lines.push(`Decision: ${decision}`);
if (internalCommandType !== null) lines.push(`InternalCommandType: ${internalCommandType}`);
lines.push(...renderCancelLifecycleMutationLines(record(data.cancelLifecycle ?? raw.cancelLifecycle)));
const next = record(raw.next ?? data.next);
const nextLines = (overrideNextLines ?? Object.values(next).map(String)).filter((line) => line.length > 0).slice(0, 5);
if (nextLines.length > 0) lines.push("", "Next:", ...nextLines.map((line) => ` ${line}`));
return renderedCliResult(raw.ok !== false, command, lines.join("\n"));
}
/**
 * Renders a cancel lifecycle disclosure as text lines for the mutation summary.
 *
 * @param lifecycle Disclosure record; an empty record produces no output.
 * @returns An empty array for an empty record; otherwise a blank line, the
 *   "CancelLifecycle:" heading and an Authority line, followed by the Runtime,
 *   Cascade, RunnerAbort, Fencing, Stages and Terminal lines whose data is present.
 */
function renderCancelLifecycleMutationLines(lifecycle: Record<string, unknown>): string[] {
  if (Object.keys(lifecycle).length === 0) {
    return [];
  }

  const authority = record(lifecycle.authority);
  const abortPolicy = record(lifecycle.runnerAbort);
  const fencePolicy = record(lifecycle.fencing);
  const rawStages = lifecycle.expectedStages;
  const stageNames = Array.isArray(rawStages) ? rawStages.map(String).filter((name) => name.length > 0) : [];

  // The lane label needs both halves; a partial node/lane pair is shown as "-".
  const nodeId = stringOrNull(authority.node);
  const laneName = stringOrNull(authority.lane);
  const laneLabel = nodeId === null || laneName === null ? "-" : `${nodeId}/${laneName}`;

  const output: string[] = [
    "",
    "CancelLifecycle:",
    ` Authority: ${displayValue(authority.transport)} policy=${displayValue(authority.policySource)} lane=${laneLabel}`,
  ];

  const runtimeNamespace = stringOrNull(authority.namespace);
  const managerName = stringOrNull(authority.managerDeployment);
  if (!(runtimeNamespace === null && managerName === null)) {
    output.push(` Runtime: ns=${displayValue(runtimeNamespace)} manager=${displayValue(managerName)}`);
  }

  const cascade = stringOrNull(lifecycle.cascadeScope);
  if (cascade !== null) {
    output.push(` Cascade: ${cascade}`);
  }

  if (Object.keys(abortPolicy).length > 0) {
    output.push(` RunnerAbort: mode=${displayValue(abortPolicy.deliveryMode)} gracefulMs=${displayValue(abortPolicy.gracefulAbortMs)} killMs=${displayValue(abortPolicy.killEscalationMs)}`);
  }

  if (Object.keys(fencePolicy).length > 0) {
    output.push(` Fencing: cancelEpoch=${displayValue(fencePolicy.cancelEpoch)} staleHeartbeatMs=${displayValue(fencePolicy.staleHeartbeatFencingMs)} lateWrite=${displayValue(fencePolicy.lateWriteFencing)}`);
  }

  if (stageNames.length > 0) {
    output.push(` Stages: ${stageNames.join(", ")}`);
  }

  const terminal = stringOrNull(lifecycle.terminalAuthority);
  if (terminal !== null) {
    output.push(` Terminal: ${terminal}`);
  }

  return output;
}
/**
 * Builds the command line a user can run to repeat `command` without the dry-run flag.
 *
 * Only a whole `--dry-run` token (optionally written as `--dry-run=<value>`) is removed.
 * The flag must be followed by whitespace or the end of the string, so a longer flag
 * that merely starts with `--dry-run` (for example `--dry-run-extra`) is left intact
 * instead of being cut in half.
 *
 * @param command CLI arguments as typed after `bun scripts/cli.ts`.
 * @returns The full `bun scripts/cli.ts …` invocation with the dry-run flag stripped.
 */
function rerunWithoutDryRun(command: string): string {
  // A lookahead is used instead of \b: \b also matches before "-" and "=", which
  // would strip the prefix of a longer flag or leave "=value" glued to the previous argument.
  const withoutDryRun = command.replace(/\s+--dry-run(?:=\S*)?(?=\s|$)/gu, "");
  return `bun scripts/cli.ts ${withoutDryRun.trim()}`;
}