fix: 修复 Queue 已读投影和取消传播

This commit is contained in:
Codex
2026-06-09 01:55:42 +08:00
parent 424058e40b
commit 88ed2c1791
7 changed files with 132 additions and 21 deletions
+11 -4
View File
@@ -67,7 +67,7 @@ async function dispatch(args: ParsedArgs): Promise<JsonValue> {
if (group === "queue" && command === "list") return listQueueTasks(args);
if (group === "queue" && command === "show" && id) return client(args).get(`/api/v1/queue/tasks/${encodeURIComponent(id)}`);
if (group === "queue" && command === "stats") return client(args).get(`/api/v1/queue/stats${queueQuery(args)}`);
if (group === "queue" && command === "commander") return client(args).get(`/api/v1/queue/commander${queueQuery(args)}`);
if (group === "queue" && command === "commander") return client(args).get(`/api/v1/queue/commander${queueQuery(args, { readerId: true })}`);
if (group === "queue" && command === "read" && id) return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(id)}/read`, { readerId: optionalFlag(args, "reader-id") ?? "cli" });
if (group === "queue" && command === "cancel" && id) return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(id)}/cancel`, cancelBody(args));
if (group === "queue" && command === "dispatch" && id) return dispatchQueueTask(args, id);
@@ -297,9 +297,16 @@ async function listQueueTasks(args: ParsedArgs): Promise<JsonValue> {
return client(args).get(`/api/v1/queue/tasks${query ? `?${query}` : ""}`);
}
function queueQuery(args: ParsedArgs): string {
function queueQuery(args: ParsedArgs, options: { readerId?: boolean } = {}): string {
const params = new URLSearchParams();
const queue = optionalFlag(args, "queue");
return queue ? `?queue=${encodeURIComponent(queue)}` : "";
if (queue) params.set("queue", queue);
if (options.readerId) {
const readerId = optionalFlag(args, "reader-id");
if (readerId) params.set("readerId", readerId);
}
const query = params.toString();
return query ? `?${query}` : "";
}
async function dispatchQueueTask(args: ParsedArgs, taskId: string): Promise<JsonValue> {
@@ -834,7 +841,7 @@ function help(args: ParsedArgs, group?: string): JsonRecord {
"queue list [--queue <queue>] [--state <state>] [--cursor <cursor>] [--limit <limit>] [--updated-after <version>]",
"queue show <taskId>",
"queue stats [--queue <queue>]",
"queue commander [--queue <queue>]",
"queue commander [--queue <queue>] [--reader-id <reader>]",
"queue read <taskId> [--reader-id <reader>]",
"queue cancel <taskId> [--reason <text>]",
"queue dispatch <taskId> [--json-file <dispatch.json>] [--idempotency-key <key>] [--image <image>] [--namespace <namespace>]",
+11 -1
View File
@@ -31,6 +31,7 @@ export type CommandState = "pending" | "acknowledged" | "completed" | "failed" |
export type TerminalStatus = "completed" | "failed" | "blocked" | "cancelled";
export type BackendProfile = string;
export type QueueTaskState = "pending" | "running" | "completed" | "failed" | "blocked" | "cancelled";
export type QueueTaskAttentionState = "active" | "unread" | "read";
export type SessionExecutionState = "idle" | "running" | "terminal";
export type SessionAttentionState = "active" | "unread" | "read";
export type SessionListState = "default" | "running" | "unread" | "terminal" | "idle" | "all";
@@ -338,6 +339,14 @@ export interface QueueReadCursorRecord extends JsonRecord {
readAt: string;
}
export interface QueueTaskSummary extends QueueTaskRecord {
readerId: string | null;
readCursor: QueueReadCursorRecord | null;
attentionState: QueueTaskAttentionState;
unread: boolean;
active: boolean;
}
export interface QueueTaskListResult extends JsonRecord {
items: QueueTaskRecord[];
count: number;
@@ -355,8 +364,9 @@ export interface QueueStats extends JsonRecord {
export interface QueueCommanderSnapshot extends JsonRecord {
queue: string | null;
readerId: string | null;
stats: QueueStats;
items: QueueTaskRecord[];
items: QueueTaskSummary[];
generatedAt: string;
}
+30 -6
View File
@@ -6,7 +6,7 @@ import { redactJson } from "../common/redaction.js";
import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, ListGcExpiredSessionsInput, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionEventPage, SessionListResult, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js";
import { newId, nowIso, stableHash } from "../common/validation.js";
import type { AgentRunStore, ListQueueTasksInput, ListSessionsInput, SaveRunnerJobInput, SessionEventPageInput, StoreHealth, UpdateQueueTaskAttemptInput } from "./store.js";
import { assertSessionBoundary, buildQueueStats, buildSessionSummary, clampQueueLimit, clampSessionLimit, commandStateFromTerminal, isLeaseExpired, isSessionOutputEvent, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, parseSessionCursor, queueTaskPayloadHash, queueTaskSort, sessionListFilters, sessionMatchesListState, sessionRefFromRecord, sessionSort, sessionTitleFromCommand, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef, titleFromMetadata } from "./store.js";
import { assertSessionBoundary, buildQueueStats, buildQueueTaskSummary, buildSessionSummary, clampQueueLimit, clampSessionLimit, commandStateFromTerminal, isLeaseExpired, isSessionOutputEvent, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, parseSessionCursor, queueTaskMatchesCommander, queueTaskPayloadHash, queueTaskSort, sessionListFilters, sessionMatchesListState, sessionRefFromRecord, sessionSort, sessionTitleFromCommand, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef, titleFromMetadata } from "./store.js";
import { backendCapabilitiesSqlValues, mergeBackendCapability } from "../common/backend-profiles.js";
import { normalizeRunEventPayload, requireEventType } from "../common/events.js";
@@ -983,15 +983,20 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
}
async cancelQueueTask(taskId: string, reason = "cancel requested"): Promise<QueueTaskRecord> {
const task = await this.getQueueTask(taskId);
if (isTerminalQueueTaskState(task.state)) return task;
if (task.latestAttempt?.commandId) await this.cancelCommand(task.latestAttempt.commandId, reason);
else if (task.latestAttempt?.runId) await this.cancelRun(task.latestAttempt.runId, reason);
return this.withTransaction(async (client) => {
const existing = await client.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1 FOR UPDATE", [taskId]);
const row = existing.rows[0];
if (!row) throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 });
const task = queueTaskFromRow(row);
if (isTerminalQueueTaskState(task.state)) return task;
const lockedTask = queueTaskFromRow(row);
if (isTerminalQueueTaskState(lockedTask.state)) return lockedTask;
const at = nowIso();
const version = await this.nextQueueVersion(client);
const updated = await client.query("UPDATE agentrun_queue_tasks SET state = 'cancelled', version = $2, updated_at = $3, cancelled_at = $3, cancel_reason = $4 WHERE id = $1 RETURNING *", [taskId, version, at, reason]);
const latestAttempt = lockedTask.latestAttempt ? { ...lockedTask.latestAttempt, state: "cancelled" as const } : null;
const updated = await client.query("UPDATE agentrun_queue_tasks SET state = 'cancelled', latest_attempt = $2::jsonb, version = $3, updated_at = $4, cancelled_at = $4, cancel_reason = $5 WHERE id = $1 RETURNING *", [taskId, JSON.stringify(latestAttempt), version, at, reason]);
return queueTaskFromRow(updated.rows[0]);
});
}
@@ -1017,10 +1022,16 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
return buildQueueStats(await this.loadQueueTasksForProjection(queue), queue ?? null);
}
async queueCommander(queue?: string): Promise<QueueCommanderSnapshot> {
async queueCommander(queue?: string, readerId: string | null = null): Promise<QueueCommanderSnapshot> {
const tasks = await this.loadQueueTasksForProjection(queue);
const generatedAt = nowIso();
return { queue: queue ?? null, stats: buildQueueStats(tasks, queue ?? null, generatedAt), items: tasks.sort(queueTaskSort).slice(0, 20), generatedAt };
const cursors = readerId ? await this.loadQueueReadCursors(readerId, tasks.map((task) => task.id)) : new Map<string, QueueReadCursorRecord>();
const items = tasks
.map((task) => buildQueueTaskSummary(task, readerId, readerId ? cursors.get(task.id) ?? null : null))
.filter((task) => queueTaskMatchesCommander(task, readerId))
.sort(queueTaskSort)
.slice(0, 20);
return { queue: queue ?? null, readerId, stats: buildQueueStats(tasks, queue ?? null, generatedAt), items, generatedAt };
}
async backends(): Promise<JsonRecord[]> {
@@ -1079,6 +1090,15 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
}));
}
private async loadQueueReadCursors(readerId: string, taskIds: string[]): Promise<Map<string, QueueReadCursorRecord>> {
if (taskIds.length === 0) return new Map();
const result = await this.pool.query("SELECT * FROM agentrun_queue_read_cursors WHERE reader_id = $1 AND task_id = ANY($2::text[])", [readerId, taskIds]);
return new Map(result.rows.map((row) => {
const cursor = queueReadCursorFromRow(row);
return [cursor.taskId, cursor];
}));
}
private async loadQueueTasksForProjection(queue?: string): Promise<QueueTaskRecord[]> {
if (queue) {
const result = await this.pool.query("SELECT * FROM agentrun_queue_tasks WHERE queue = $1", [queue]);
@@ -1376,6 +1396,10 @@ function sessionReadCursorFromRow(row: QueryResultRow): SessionReadCursorRecord
return { sessionId: stringValue(row.session_id), readerId: stringValue(row.reader_id), sessionVersion: Number(row.session_version), readAt: iso(row.read_at) };
}
function queueReadCursorFromRow(row: QueryResultRow): QueueReadCursorRecord {
return { taskId: stringValue(row.task_id), readerId: stringValue(row.reader_id), taskVersion: Number(row.task_version), readAt: iso(row.read_at) };
}
function sessionExecutionState(value: unknown): SessionRecord["executionState"] {
if (value === "running" || value === "terminal") return value;
return "idle";
+1 -1
View File
@@ -262,7 +262,7 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults
return await store.markQueueTaskRead(queueTaskReadMatch[1] ?? "", readerId) as unknown as JsonValue;
}
if (method === "GET" && path === "/api/v1/queue/stats") return await store.queueStats(url.searchParams.get("queue") ?? undefined) as unknown as JsonValue;
if (method === "GET" && path === "/api/v1/queue/commander") return await store.queueCommander(url.searchParams.get("queue") ?? undefined) as unknown as JsonValue;
if (method === "GET" && path === "/api/v1/queue/commander") return await store.queueCommander(url.searchParams.get("queue") ?? undefined, url.searchParams.get("readerId")) as unknown as JsonValue;
if (method === "POST" && path === "/api/v1/runs") return await store.createRun(validateCreateRun(body)) as unknown as JsonValue;
const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u);
if (method === "GET" && runMatch) return await store.getRun(runMatch[1] ?? "") as unknown as JsonValue;
+26 -6
View File
@@ -1,4 +1,4 @@
import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, ListGcExpiredSessionsInput, QueueAttemptRef, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionEventPage, SessionListResult, SessionListState, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js";
import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, ListGcExpiredSessionsInput, QueueAttemptRef, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, QueueTaskSummary, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionEventPage, SessionListResult, SessionListState, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js";
import { AgentRunError } from "../common/errors.js";
import { newId, nowIso, stableHash } from "../common/validation.js";
import { redactJson } from "../common/redaction.js";
@@ -55,7 +55,7 @@ export interface AgentRunStore {
cancelQueueTask(taskId: string, reason?: string): MaybePromise<QueueTaskRecord>;
markQueueTaskRead(taskId: string, readerId: string): MaybePromise<QueueReadCursorRecord>;
queueStats(queue?: string): MaybePromise<QueueStats>;
queueCommander(queue?: string): MaybePromise<QueueCommanderSnapshot>;
queueCommander(queue?: string, readerId?: string | null): MaybePromise<QueueCommanderSnapshot>;
backends(): MaybePromise<JsonRecord[]>;
close?(): MaybePromise<void>;
}
@@ -495,8 +495,11 @@ export class MemoryAgentRunStore implements AgentRunStore {
cancelQueueTask(taskId: string, reason = "cancel requested"): QueueTaskRecord {
const task = this.getQueueTask(taskId);
if (isTerminalQueueTaskState(task.state)) return task;
if (task.latestAttempt?.commandId) this.cancelCommand(task.latestAttempt.commandId, reason);
else if (task.latestAttempt?.runId) this.cancelRun(task.latestAttempt.runId, reason);
const at = nowIso();
const next: QueueTaskRecord = { ...task, state: "cancelled", version: this.nextQueueVersion(), updatedAt: at, cancelledAt: at, cancelReason: reason };
const latestAttempt = task.latestAttempt ? { ...task.latestAttempt, state: "cancelled" as const } : null;
const next: QueueTaskRecord = { ...task, state: "cancelled", latestAttempt, version: this.nextQueueVersion(), updatedAt: at, cancelledAt: at, cancelReason: reason };
this.queueTasks.set(taskId, next);
return next;
}
@@ -512,10 +515,15 @@ export class MemoryAgentRunStore implements AgentRunStore {
return buildQueueStats(Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue), queue ?? null);
}
queueCommander(queue?: string): QueueCommanderSnapshot {
const items = Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue).sort(queueTaskSort).slice(0, 20);
queueCommander(queue?: string, readerId: string | null = null): QueueCommanderSnapshot {
const tasks = Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue);
const generatedAt = nowIso();
return { queue: queue ?? null, stats: buildQueueStats(Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue), queue ?? null, generatedAt), items, generatedAt };
const items = tasks
.map((task) => buildQueueTaskSummary(task, readerId, readerId ? this.queueReadCursors.get(queueReadKey(task.id, readerId)) ?? null : null))
.filter((task) => queueTaskMatchesCommander(task, readerId))
.sort(queueTaskSort)
.slice(0, 20);
return { queue: queue ?? null, readerId, stats: buildQueueStats(tasks, queue ?? null, generatedAt), items, generatedAt };
}
backends(): JsonRecord[] {
@@ -686,6 +694,18 @@ export function buildSessionSummary(record: SessionRecord, readerId: string | nu
return { ...record, sessionPath: `${record.tenantId}/${record.projectId}/${record.sessionId}`, readerId, readCursor, attentionState, unread, active };
}
export function buildQueueTaskSummary(record: QueueTaskRecord, readerId: string | null, readCursor: QueueReadCursorRecord | null): QueueTaskSummary {
const active = !isTerminalQueueTaskState(record.state);
const unread = !active && (!readCursor || readCursor.taskVersion < record.version);
const attentionState = active ? "active" : unread ? "unread" : "read";
return { ...record, readerId, readCursor, attentionState, unread, active };
}
export function queueTaskMatchesCommander(task: QueueTaskSummary, readerId: string | null): boolean {
if (!readerId) return true;
return task.active || task.unread;
}
export function sessionMatchesListState(session: SessionSummary, state: SessionListState): boolean {
if (state === "all") return true;
if (state === "default") return session.active || session.unread;
+8
View File
@@ -62,6 +62,14 @@ const selfTest: SelfTestCase = async (context) => {
const commander = await client.get("/api/v1/queue/commander?queue=dev") as QueueCommanderSnapshot;
assert.equal(commander.stats.byState.cancelled, 1);
assert.equal(commander.items[0]?.id, created.id);
assert.equal(commander.items[0]?.attentionState, "unread");
const terminalRead = await client.post(`/api/v1/queue/tasks/${created.id}/read`, { readerId: "self-test" }) as QueueReadCursorRecord;
assert.equal(terminalRead.taskVersion, cancelled.version);
const readCommander = await client.get("/api/v1/queue/commander?queue=dev&readerId=self-test") as QueueCommanderSnapshot;
assert.equal(readCommander.readerId, "self-test");
assert.equal(readCommander.stats.byState.cancelled, 1);
assert.equal(readCommander.items.some((item) => item.id === created.id), false);
return { name: "queue-q1", tests: ["queue-q1-rest-memory"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
+45 -3
View File
@@ -93,10 +93,52 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin
assert.equal(refreshed.latestAttempt?.state, "completed");
assert.equal(refreshed.latestAttempt?.runId, dispatched.run.id);
assert.equal(refreshed.latestAttempt?.sessionPath, "/api/v1/sessions/sess_queue_q2_dispatch_selftest");
const manifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord;
assert.ok(JSON.stringify(manifest).includes(dispatched.run.id));
const dispatchManifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord;
assert.ok(JSON.stringify(dispatchManifest).includes(dispatched.run.id));
const cancelCreated = await client.post("/api/v1/queue/tasks", {
tenantId: "unidesk",
projectId: "pikasTech/unidesk",
queue: "dev",
lane: "q2",
title: "Q2 queue cancel task",
priority: 21,
backendProfile: "codex",
providerId: "G14",
workspaceRef: { kind: "host-path", path: context.workspace },
sessionRef: { sessionId: "sess_queue_q2_cancel_selftest", metadata: { source: "queue-q2-cancel-self-test" } },
executionPolicy: {
sandbox: "workspace-write",
approval: "never",
timeoutMs: 15_000,
network: "default",
secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: context.codexHome } }] },
},
resourceBundleRef: null,
payload: { prompt: "queue cancel hello" },
references: [{ kind: "issue", url: "https://github.com/pikasTech/agentrun/issues/105" }],
metadata: { source: "queue-q2-cancel-self-test" },
idempotencyKey: "queue-q2-cancel-self-test",
}) as QueueTaskRecord;
const cancelDispatched = await client.post(`/api/v1/queue/tasks/${cancelCreated.id}/dispatch`, { attemptId: "attempt_queue_q2_cancel_selftest" }) as QueueDispatchResult;
const cancelled = await client.post(`/api/v1/queue/tasks/${cancelCreated.id}/cancel`, { reason: "self-test queue cancel propagation" }) as QueueTaskRecord;
assert.equal(cancelled.state, "cancelled");
assert.equal(cancelled.latestAttempt?.state, "cancelled");
assert.equal(cancelled.latestAttempt?.runId, cancelDispatched.run.id);
assert.equal(cancelled.cancelReason, "self-test queue cancel propagation");
const cancelledCommand = await client.get(`/api/v1/runs/${cancelDispatched.run.id}/commands/${cancelDispatched.command.id}`) as { state?: string };
assert.equal(cancelledCommand.state, "cancelled");
const cancelledSession = await client.get("/api/v1/sessions/sess_queue_q2_cancel_selftest") as { executionState?: string; terminalStatus?: string; failureKind?: string; activeRunId?: string | null; activeCommandId?: string | null };
assert.equal(cancelledSession.executionState, "terminal");
assert.equal(cancelledSession.terminalStatus, "cancelled");
assert.equal(cancelledSession.failureKind, "cancelled");
assert.equal(cancelledSession.activeRunId, null);
assert.equal(cancelledSession.activeCommandId, null);
const cancelManifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord;
assert.ok(JSON.stringify(cancelManifest).includes(cancelDispatched.run.id));
assertNoSecretLeak(dispatched);
return { name: "queue-q2-dispatch", tests: ["queue-dispatch-run-command-runner-job", "queue-refresh-from-core-status", "queue-dispatch-no-repeat"] };
assertNoSecretLeak(cancelled);
return { name: "queue-q2-dispatch", tests: ["queue-dispatch-run-command-runner-job", "queue-refresh-from-core-status", "queue-dispatch-no-repeat", "queue-cancel-propagates-to-command-session"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}