Merge pull request #109 from pikasTech/fix/issue105-queue-read-cancel
fix: 修复 Queue 已读投影和取消传播
This commit is contained in:
+11
-4
@@ -67,7 +67,7 @@ async function dispatch(args: ParsedArgs): Promise<JsonValue> {
|
||||
if (group === "queue" && command === "list") return listQueueTasks(args);
|
||||
if (group === "queue" && command === "show" && id) return client(args).get(`/api/v1/queue/tasks/${encodeURIComponent(id)}`);
|
||||
if (group === "queue" && command === "stats") return client(args).get(`/api/v1/queue/stats${queueQuery(args)}`);
|
||||
if (group === "queue" && command === "commander") return client(args).get(`/api/v1/queue/commander${queueQuery(args)}`);
|
||||
if (group === "queue" && command === "commander") return client(args).get(`/api/v1/queue/commander${queueQuery(args, { readerId: true })}`);
|
||||
if (group === "queue" && command === "read" && id) return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(id)}/read`, { readerId: optionalFlag(args, "reader-id") ?? "cli" });
|
||||
if (group === "queue" && command === "cancel" && id) return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(id)}/cancel`, cancelBody(args));
|
||||
if (group === "queue" && command === "dispatch" && id) return dispatchQueueTask(args, id);
|
||||
@@ -297,9 +297,16 @@ async function listQueueTasks(args: ParsedArgs): Promise<JsonValue> {
|
||||
return client(args).get(`/api/v1/queue/tasks${query ? `?${query}` : ""}`);
|
||||
}
|
||||
|
||||
function queueQuery(args: ParsedArgs): string {
|
||||
function queueQuery(args: ParsedArgs, options: { readerId?: boolean } = {}): string {
|
||||
const params = new URLSearchParams();
|
||||
const queue = optionalFlag(args, "queue");
|
||||
return queue ? `?queue=${encodeURIComponent(queue)}` : "";
|
||||
if (queue) params.set("queue", queue);
|
||||
if (options.readerId) {
|
||||
const readerId = optionalFlag(args, "reader-id");
|
||||
if (readerId) params.set("readerId", readerId);
|
||||
}
|
||||
const query = params.toString();
|
||||
return query ? `?${query}` : "";
|
||||
}
|
||||
|
||||
async function dispatchQueueTask(args: ParsedArgs, taskId: string): Promise<JsonValue> {
|
||||
@@ -834,7 +841,7 @@ function help(args: ParsedArgs, group?: string): JsonRecord {
|
||||
"queue list [--queue <queue>] [--state <state>] [--cursor <cursor>] [--limit <limit>] [--updated-after <version>]",
|
||||
"queue show <taskId>",
|
||||
"queue stats [--queue <queue>]",
|
||||
"queue commander [--queue <queue>]",
|
||||
"queue commander [--queue <queue>] [--reader-id <reader>]",
|
||||
"queue read <taskId> [--reader-id <reader>]",
|
||||
"queue cancel <taskId> [--reason <text>]",
|
||||
"queue dispatch <taskId> [--json-file <dispatch.json>] [--idempotency-key <key>] [--image <image>] [--namespace <namespace>]",
|
||||
|
||||
+11
-1
@@ -31,6 +31,7 @@ export type CommandState = "pending" | "acknowledged" | "completed" | "failed" |
|
||||
export type TerminalStatus = "completed" | "failed" | "blocked" | "cancelled";
|
||||
export type BackendProfile = string;
|
||||
export type QueueTaskState = "pending" | "running" | "completed" | "failed" | "blocked" | "cancelled";
|
||||
export type QueueTaskAttentionState = "active" | "unread" | "read";
|
||||
export type SessionExecutionState = "idle" | "running" | "terminal";
|
||||
export type SessionAttentionState = "active" | "unread" | "read";
|
||||
export type SessionListState = "default" | "running" | "unread" | "terminal" | "idle" | "all";
|
||||
@@ -338,6 +339,14 @@ export interface QueueReadCursorRecord extends JsonRecord {
|
||||
readAt: string;
|
||||
}
|
||||
|
||||
export interface QueueTaskSummary extends QueueTaskRecord {
|
||||
readerId: string | null;
|
||||
readCursor: QueueReadCursorRecord | null;
|
||||
attentionState: QueueTaskAttentionState;
|
||||
unread: boolean;
|
||||
active: boolean;
|
||||
}
|
||||
|
||||
export interface QueueTaskListResult extends JsonRecord {
|
||||
items: QueueTaskRecord[];
|
||||
count: number;
|
||||
@@ -355,8 +364,9 @@ export interface QueueStats extends JsonRecord {
|
||||
|
||||
export interface QueueCommanderSnapshot extends JsonRecord {
|
||||
queue: string | null;
|
||||
readerId: string | null;
|
||||
stats: QueueStats;
|
||||
items: QueueTaskRecord[];
|
||||
items: QueueTaskSummary[];
|
||||
generatedAt: string;
|
||||
}
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ import { redactJson } from "../common/redaction.js";
|
||||
import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, ListGcExpiredSessionsInput, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionEventPage, SessionListResult, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js";
|
||||
import { newId, nowIso, stableHash } from "../common/validation.js";
|
||||
import type { AgentRunStore, ListQueueTasksInput, ListSessionsInput, SaveRunnerJobInput, SessionEventPageInput, StoreHealth, UpdateQueueTaskAttemptInput } from "./store.js";
|
||||
import { assertSessionBoundary, buildQueueStats, buildSessionSummary, clampQueueLimit, clampSessionLimit, commandStateFromTerminal, isLeaseExpired, isSessionOutputEvent, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, parseSessionCursor, queueTaskPayloadHash, queueTaskSort, sessionListFilters, sessionMatchesListState, sessionRefFromRecord, sessionSort, sessionTitleFromCommand, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef, titleFromMetadata } from "./store.js";
|
||||
import { assertSessionBoundary, buildQueueStats, buildQueueTaskSummary, buildSessionSummary, clampQueueLimit, clampSessionLimit, commandStateFromTerminal, isLeaseExpired, isSessionOutputEvent, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, parseSessionCursor, queueTaskMatchesCommander, queueTaskPayloadHash, queueTaskSort, sessionListFilters, sessionMatchesListState, sessionRefFromRecord, sessionSort, sessionTitleFromCommand, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef, titleFromMetadata } from "./store.js";
|
||||
import { backendCapabilitiesSqlValues, mergeBackendCapability } from "../common/backend-profiles.js";
|
||||
import { normalizeRunEventPayload, requireEventType } from "../common/events.js";
|
||||
|
||||
@@ -983,15 +983,20 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
||||
}
|
||||
|
||||
async cancelQueueTask(taskId: string, reason = "cancel requested"): Promise<QueueTaskRecord> {
|
||||
const task = await this.getQueueTask(taskId);
|
||||
if (isTerminalQueueTaskState(task.state)) return task;
|
||||
if (task.latestAttempt?.commandId) await this.cancelCommand(task.latestAttempt.commandId, reason);
|
||||
else if (task.latestAttempt?.runId) await this.cancelRun(task.latestAttempt.runId, reason);
|
||||
return this.withTransaction(async (client) => {
|
||||
const existing = await client.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1 FOR UPDATE", [taskId]);
|
||||
const row = existing.rows[0];
|
||||
if (!row) throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 });
|
||||
const task = queueTaskFromRow(row);
|
||||
if (isTerminalQueueTaskState(task.state)) return task;
|
||||
const lockedTask = queueTaskFromRow(row);
|
||||
if (isTerminalQueueTaskState(lockedTask.state)) return lockedTask;
|
||||
const at = nowIso();
|
||||
const version = await this.nextQueueVersion(client);
|
||||
const updated = await client.query("UPDATE agentrun_queue_tasks SET state = 'cancelled', version = $2, updated_at = $3, cancelled_at = $3, cancel_reason = $4 WHERE id = $1 RETURNING *", [taskId, version, at, reason]);
|
||||
const latestAttempt = lockedTask.latestAttempt ? { ...lockedTask.latestAttempt, state: "cancelled" as const } : null;
|
||||
const updated = await client.query("UPDATE agentrun_queue_tasks SET state = 'cancelled', latest_attempt = $2::jsonb, version = $3, updated_at = $4, cancelled_at = $4, cancel_reason = $5 WHERE id = $1 RETURNING *", [taskId, JSON.stringify(latestAttempt), version, at, reason]);
|
||||
return queueTaskFromRow(updated.rows[0]);
|
||||
});
|
||||
}
|
||||
@@ -1017,10 +1022,16 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
||||
return buildQueueStats(await this.loadQueueTasksForProjection(queue), queue ?? null);
|
||||
}
|
||||
|
||||
async queueCommander(queue?: string): Promise<QueueCommanderSnapshot> {
|
||||
async queueCommander(queue?: string, readerId: string | null = null): Promise<QueueCommanderSnapshot> {
|
||||
const tasks = await this.loadQueueTasksForProjection(queue);
|
||||
const generatedAt = nowIso();
|
||||
return { queue: queue ?? null, stats: buildQueueStats(tasks, queue ?? null, generatedAt), items: tasks.sort(queueTaskSort).slice(0, 20), generatedAt };
|
||||
const cursors = readerId ? await this.loadQueueReadCursors(readerId, tasks.map((task) => task.id)) : new Map<string, QueueReadCursorRecord>();
|
||||
const items = tasks
|
||||
.map((task) => buildQueueTaskSummary(task, readerId, readerId ? cursors.get(task.id) ?? null : null))
|
||||
.filter((task) => queueTaskMatchesCommander(task, readerId))
|
||||
.sort(queueTaskSort)
|
||||
.slice(0, 20);
|
||||
return { queue: queue ?? null, readerId, stats: buildQueueStats(tasks, queue ?? null, generatedAt), items, generatedAt };
|
||||
}
|
||||
|
||||
async backends(): Promise<JsonRecord[]> {
|
||||
@@ -1079,6 +1090,15 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
||||
}));
|
||||
}
|
||||
|
||||
private async loadQueueReadCursors(readerId: string, taskIds: string[]): Promise<Map<string, QueueReadCursorRecord>> {
|
||||
if (taskIds.length === 0) return new Map();
|
||||
const result = await this.pool.query("SELECT * FROM agentrun_queue_read_cursors WHERE reader_id = $1 AND task_id = ANY($2::text[])", [readerId, taskIds]);
|
||||
return new Map(result.rows.map((row) => {
|
||||
const cursor = queueReadCursorFromRow(row);
|
||||
return [cursor.taskId, cursor];
|
||||
}));
|
||||
}
|
||||
|
||||
private async loadQueueTasksForProjection(queue?: string): Promise<QueueTaskRecord[]> {
|
||||
if (queue) {
|
||||
const result = await this.pool.query("SELECT * FROM agentrun_queue_tasks WHERE queue = $1", [queue]);
|
||||
@@ -1376,6 +1396,10 @@ function sessionReadCursorFromRow(row: QueryResultRow): SessionReadCursorRecord
|
||||
return { sessionId: stringValue(row.session_id), readerId: stringValue(row.reader_id), sessionVersion: Number(row.session_version), readAt: iso(row.read_at) };
|
||||
}
|
||||
|
||||
function queueReadCursorFromRow(row: QueryResultRow): QueueReadCursorRecord {
|
||||
return { taskId: stringValue(row.task_id), readerId: stringValue(row.reader_id), taskVersion: Number(row.task_version), readAt: iso(row.read_at) };
|
||||
}
|
||||
|
||||
function sessionExecutionState(value: unknown): SessionRecord["executionState"] {
|
||||
if (value === "running" || value === "terminal") return value;
|
||||
return "idle";
|
||||
|
||||
+1
-1
@@ -262,7 +262,7 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults
|
||||
return await store.markQueueTaskRead(queueTaskReadMatch[1] ?? "", readerId) as unknown as JsonValue;
|
||||
}
|
||||
if (method === "GET" && path === "/api/v1/queue/stats") return await store.queueStats(url.searchParams.get("queue") ?? undefined) as unknown as JsonValue;
|
||||
if (method === "GET" && path === "/api/v1/queue/commander") return await store.queueCommander(url.searchParams.get("queue") ?? undefined) as unknown as JsonValue;
|
||||
if (method === "GET" && path === "/api/v1/queue/commander") return await store.queueCommander(url.searchParams.get("queue") ?? undefined, url.searchParams.get("readerId")) as unknown as JsonValue;
|
||||
if (method === "POST" && path === "/api/v1/runs") return await store.createRun(validateCreateRun(body)) as unknown as JsonValue;
|
||||
const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u);
|
||||
if (method === "GET" && runMatch) return await store.getRun(runMatch[1] ?? "") as unknown as JsonValue;
|
||||
|
||||
+26
-6
@@ -1,4 +1,4 @@
|
||||
import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, ListGcExpiredSessionsInput, QueueAttemptRef, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionEventPage, SessionListResult, SessionListState, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js";
|
||||
import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, ListGcExpiredSessionsInput, QueueAttemptRef, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, QueueTaskSummary, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionEventPage, SessionListResult, SessionListState, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js";
|
||||
import { AgentRunError } from "../common/errors.js";
|
||||
import { newId, nowIso, stableHash } from "../common/validation.js";
|
||||
import { redactJson } from "../common/redaction.js";
|
||||
@@ -55,7 +55,7 @@ export interface AgentRunStore {
|
||||
cancelQueueTask(taskId: string, reason?: string): MaybePromise<QueueTaskRecord>;
|
||||
markQueueTaskRead(taskId: string, readerId: string): MaybePromise<QueueReadCursorRecord>;
|
||||
queueStats(queue?: string): MaybePromise<QueueStats>;
|
||||
queueCommander(queue?: string): MaybePromise<QueueCommanderSnapshot>;
|
||||
queueCommander(queue?: string, readerId?: string | null): MaybePromise<QueueCommanderSnapshot>;
|
||||
backends(): MaybePromise<JsonRecord[]>;
|
||||
close?(): MaybePromise<void>;
|
||||
}
|
||||
@@ -495,8 +495,11 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
||||
cancelQueueTask(taskId: string, reason = "cancel requested"): QueueTaskRecord {
|
||||
const task = this.getQueueTask(taskId);
|
||||
if (isTerminalQueueTaskState(task.state)) return task;
|
||||
if (task.latestAttempt?.commandId) this.cancelCommand(task.latestAttempt.commandId, reason);
|
||||
else if (task.latestAttempt?.runId) this.cancelRun(task.latestAttempt.runId, reason);
|
||||
const at = nowIso();
|
||||
const next: QueueTaskRecord = { ...task, state: "cancelled", version: this.nextQueueVersion(), updatedAt: at, cancelledAt: at, cancelReason: reason };
|
||||
const latestAttempt = task.latestAttempt ? { ...task.latestAttempt, state: "cancelled" as const } : null;
|
||||
const next: QueueTaskRecord = { ...task, state: "cancelled", latestAttempt, version: this.nextQueueVersion(), updatedAt: at, cancelledAt: at, cancelReason: reason };
|
||||
this.queueTasks.set(taskId, next);
|
||||
return next;
|
||||
}
|
||||
@@ -512,10 +515,15 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
||||
return buildQueueStats(Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue), queue ?? null);
|
||||
}
|
||||
|
||||
queueCommander(queue?: string): QueueCommanderSnapshot {
|
||||
const items = Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue).sort(queueTaskSort).slice(0, 20);
|
||||
queueCommander(queue?: string, readerId: string | null = null): QueueCommanderSnapshot {
|
||||
const tasks = Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue);
|
||||
const generatedAt = nowIso();
|
||||
return { queue: queue ?? null, stats: buildQueueStats(Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue), queue ?? null, generatedAt), items, generatedAt };
|
||||
const items = tasks
|
||||
.map((task) => buildQueueTaskSummary(task, readerId, readerId ? this.queueReadCursors.get(queueReadKey(task.id, readerId)) ?? null : null))
|
||||
.filter((task) => queueTaskMatchesCommander(task, readerId))
|
||||
.sort(queueTaskSort)
|
||||
.slice(0, 20);
|
||||
return { queue: queue ?? null, readerId, stats: buildQueueStats(tasks, queue ?? null, generatedAt), items, generatedAt };
|
||||
}
|
||||
|
||||
backends(): JsonRecord[] {
|
||||
@@ -686,6 +694,18 @@ export function buildSessionSummary(record: SessionRecord, readerId: string | nu
|
||||
return { ...record, sessionPath: `${record.tenantId}/${record.projectId}/${record.sessionId}`, readerId, readCursor, attentionState, unread, active };
|
||||
}
|
||||
|
||||
export function buildQueueTaskSummary(record: QueueTaskRecord, readerId: string | null, readCursor: QueueReadCursorRecord | null): QueueTaskSummary {
|
||||
const active = !isTerminalQueueTaskState(record.state);
|
||||
const unread = !active && (!readCursor || readCursor.taskVersion < record.version);
|
||||
const attentionState = active ? "active" : unread ? "unread" : "read";
|
||||
return { ...record, readerId, readCursor, attentionState, unread, active };
|
||||
}
|
||||
|
||||
export function queueTaskMatchesCommander(task: QueueTaskSummary, readerId: string | null): boolean {
|
||||
if (!readerId) return true;
|
||||
return task.active || task.unread;
|
||||
}
|
||||
|
||||
export function sessionMatchesListState(session: SessionSummary, state: SessionListState): boolean {
|
||||
if (state === "all") return true;
|
||||
if (state === "default") return session.active || session.unread;
|
||||
|
||||
@@ -62,6 +62,14 @@ const selfTest: SelfTestCase = async (context) => {
|
||||
const commander = await client.get("/api/v1/queue/commander?queue=dev") as QueueCommanderSnapshot;
|
||||
assert.equal(commander.stats.byState.cancelled, 1);
|
||||
assert.equal(commander.items[0]?.id, created.id);
|
||||
assert.equal(commander.items[0]?.attentionState, "unread");
|
||||
|
||||
const terminalRead = await client.post(`/api/v1/queue/tasks/${created.id}/read`, { readerId: "self-test" }) as QueueReadCursorRecord;
|
||||
assert.equal(terminalRead.taskVersion, cancelled.version);
|
||||
const readCommander = await client.get("/api/v1/queue/commander?queue=dev&readerId=self-test") as QueueCommanderSnapshot;
|
||||
assert.equal(readCommander.readerId, "self-test");
|
||||
assert.equal(readCommander.stats.byState.cancelled, 1);
|
||||
assert.equal(readCommander.items.some((item) => item.id === created.id), false);
|
||||
return { name: "queue-q1", tests: ["queue-q1-rest-memory"] };
|
||||
} finally {
|
||||
await new Promise<void>((resolve) => server.server.close(() => resolve()));
|
||||
|
||||
@@ -93,10 +93,52 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin
|
||||
assert.equal(refreshed.latestAttempt?.state, "completed");
|
||||
assert.equal(refreshed.latestAttempt?.runId, dispatched.run.id);
|
||||
assert.equal(refreshed.latestAttempt?.sessionPath, "/api/v1/sessions/sess_queue_q2_dispatch_selftest");
|
||||
const manifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord;
|
||||
assert.ok(JSON.stringify(manifest).includes(dispatched.run.id));
|
||||
const dispatchManifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord;
|
||||
assert.ok(JSON.stringify(dispatchManifest).includes(dispatched.run.id));
|
||||
|
||||
const cancelCreated = await client.post("/api/v1/queue/tasks", {
|
||||
tenantId: "unidesk",
|
||||
projectId: "pikasTech/unidesk",
|
||||
queue: "dev",
|
||||
lane: "q2",
|
||||
title: "Q2 queue cancel task",
|
||||
priority: 21,
|
||||
backendProfile: "codex",
|
||||
providerId: "G14",
|
||||
workspaceRef: { kind: "host-path", path: context.workspace },
|
||||
sessionRef: { sessionId: "sess_queue_q2_cancel_selftest", metadata: { source: "queue-q2-cancel-self-test" } },
|
||||
executionPolicy: {
|
||||
sandbox: "workspace-write",
|
||||
approval: "never",
|
||||
timeoutMs: 15_000,
|
||||
network: "default",
|
||||
secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: context.codexHome } }] },
|
||||
},
|
||||
resourceBundleRef: null,
|
||||
payload: { prompt: "queue cancel hello" },
|
||||
references: [{ kind: "issue", url: "https://github.com/pikasTech/agentrun/issues/105" }],
|
||||
metadata: { source: "queue-q2-cancel-self-test" },
|
||||
idempotencyKey: "queue-q2-cancel-self-test",
|
||||
}) as QueueTaskRecord;
|
||||
const cancelDispatched = await client.post(`/api/v1/queue/tasks/${cancelCreated.id}/dispatch`, { attemptId: "attempt_queue_q2_cancel_selftest" }) as QueueDispatchResult;
|
||||
const cancelled = await client.post(`/api/v1/queue/tasks/${cancelCreated.id}/cancel`, { reason: "self-test queue cancel propagation" }) as QueueTaskRecord;
|
||||
assert.equal(cancelled.state, "cancelled");
|
||||
assert.equal(cancelled.latestAttempt?.state, "cancelled");
|
||||
assert.equal(cancelled.latestAttempt?.runId, cancelDispatched.run.id);
|
||||
assert.equal(cancelled.cancelReason, "self-test queue cancel propagation");
|
||||
const cancelledCommand = await client.get(`/api/v1/runs/${cancelDispatched.run.id}/commands/${cancelDispatched.command.id}`) as { state?: string };
|
||||
assert.equal(cancelledCommand.state, "cancelled");
|
||||
const cancelledSession = await client.get("/api/v1/sessions/sess_queue_q2_cancel_selftest") as { executionState?: string; terminalStatus?: string; failureKind?: string; activeRunId?: string | null; activeCommandId?: string | null };
|
||||
assert.equal(cancelledSession.executionState, "terminal");
|
||||
assert.equal(cancelledSession.terminalStatus, "cancelled");
|
||||
assert.equal(cancelledSession.failureKind, "cancelled");
|
||||
assert.equal(cancelledSession.activeRunId, null);
|
||||
assert.equal(cancelledSession.activeCommandId, null);
|
||||
const cancelManifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord;
|
||||
assert.ok(JSON.stringify(cancelManifest).includes(cancelDispatched.run.id));
|
||||
assertNoSecretLeak(dispatched);
|
||||
return { name: "queue-q2-dispatch", tests: ["queue-dispatch-run-command-runner-job", "queue-refresh-from-core-status", "queue-dispatch-no-repeat"] };
|
||||
assertNoSecretLeak(cancelled);
|
||||
return { name: "queue-q2-dispatch", tests: ["queue-dispatch-run-command-runner-job", "queue-refresh-from-core-status", "queue-dispatch-no-repeat", "queue-cancel-propagates-to-command-session"] };
|
||||
} finally {
|
||||
await new Promise<void>((resolve) => server.server.close(() => resolve()));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user