diff --git a/scripts/src/cli.ts b/scripts/src/cli.ts index e56f1d0..c2cdeef 100644 --- a/scripts/src/cli.ts +++ b/scripts/src/cli.ts @@ -67,7 +67,7 @@ async function dispatch(args: ParsedArgs): Promise { if (group === "queue" && command === "list") return listQueueTasks(args); if (group === "queue" && command === "show" && id) return client(args).get(`/api/v1/queue/tasks/${encodeURIComponent(id)}`); if (group === "queue" && command === "stats") return client(args).get(`/api/v1/queue/stats${queueQuery(args)}`); - if (group === "queue" && command === "commander") return client(args).get(`/api/v1/queue/commander${queueQuery(args)}`); + if (group === "queue" && command === "commander") return client(args).get(`/api/v1/queue/commander${queueQuery(args, { readerId: true })}`); if (group === "queue" && command === "read" && id) return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(id)}/read`, { readerId: optionalFlag(args, "reader-id") ?? "cli" }); if (group === "queue" && command === "cancel" && id) return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(id)}/cancel`, cancelBody(args)); if (group === "queue" && command === "dispatch" && id) return dispatchQueueTask(args, id); @@ -297,9 +297,16 @@ async function listQueueTasks(args: ParsedArgs): Promise { return client(args).get(`/api/v1/queue/tasks${query ? `?${query}` : ""}`); } -function queueQuery(args: ParsedArgs): string { +function queueQuery(args: ParsedArgs, options: { readerId?: boolean } = {}): string { + const params = new URLSearchParams(); const queue = optionalFlag(args, "queue"); - return queue ? `?queue=${encodeURIComponent(queue)}` : ""; + if (queue) params.set("queue", queue); + if (options.readerId) { + const readerId = optionalFlag(args, "reader-id"); + if (readerId) params.set("readerId", readerId); + } + const query = params.toString(); + return query ? `?${query}` : ""; } async function dispatchQueueTask(args: ParsedArgs, taskId: string): Promise { @@ -834,7 +841,7 @@ function help(args: ParsedArgs, group?: string): JsonRecord { "queue list [--queue ] [--state ] [--cursor ] [--limit ] [--updated-after ]", "queue show ", "queue stats [--queue ]", - "queue commander [--queue ]", + "queue commander [--queue ] [--reader-id ]", "queue read [--reader-id ]", "queue cancel [--reason ]", "queue dispatch [--json-file ] [--idempotency-key ] [--image ] [--namespace ]", diff --git a/src/common/types.ts b/src/common/types.ts index d4959d3..60a15f6 100644 --- a/src/common/types.ts +++ b/src/common/types.ts @@ -31,6 +31,7 @@ export type CommandState = "pending" | "acknowledged" | "completed" | "failed" | export type TerminalStatus = "completed" | "failed" | "blocked" | "cancelled"; export type BackendProfile = string; export type QueueTaskState = "pending" | "running" | "completed" | "failed" | "blocked" | "cancelled"; +export type QueueTaskAttentionState = "active" | "unread" | "read"; export type SessionExecutionState = "idle" | "running" | "terminal"; export type SessionAttentionState = "active" | "unread" | "read"; export type SessionListState = "default" | "running" | "unread" | "terminal" | "idle" | "all"; @@ -338,6 +339,14 @@ export interface QueueReadCursorRecord extends JsonRecord { readAt: string; } +export interface QueueTaskSummary extends QueueTaskRecord { + readerId: string | null; + readCursor: QueueReadCursorRecord | null; + attentionState: QueueTaskAttentionState; + unread: boolean; + active: boolean; +} + export interface QueueTaskListResult extends JsonRecord { items: QueueTaskRecord[]; count: number; @@ -355,8 +364,9 @@ export interface QueueStats extends JsonRecord { export interface QueueCommanderSnapshot extends JsonRecord { queue: string | null; + readerId: string | null; stats: QueueStats; - items: QueueTaskRecord[]; + items: QueueTaskSummary[]; generatedAt: string; } diff --git a/src/mgr/postgres-store.ts b/src/mgr/postgres-store.ts index 489ab08..ca19f09 100644 --- a/src/mgr/postgres-store.ts +++ b/src/mgr/postgres-store.ts @@ -6,7 +6,7 @@ import { redactJson } from "../common/redaction.js"; import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, ListGcExpiredSessionsInput, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionEventPage, SessionListResult, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js"; import { newId, nowIso, stableHash } from "../common/validation.js"; import type { AgentRunStore, ListQueueTasksInput, ListSessionsInput, SaveRunnerJobInput, SessionEventPageInput, StoreHealth, UpdateQueueTaskAttemptInput } from "./store.js"; -import { assertSessionBoundary, buildQueueStats, buildSessionSummary, clampQueueLimit, clampSessionLimit, commandStateFromTerminal, isLeaseExpired, isSessionOutputEvent, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, parseSessionCursor, queueTaskPayloadHash, queueTaskSort, sessionListFilters, sessionMatchesListState, sessionRefFromRecord, sessionSort, sessionTitleFromCommand, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef, titleFromMetadata } from "./store.js"; +import { assertSessionBoundary, buildQueueStats, buildQueueTaskSummary, buildSessionSummary, clampQueueLimit, clampSessionLimit, commandStateFromTerminal, isLeaseExpired, isSessionOutputEvent, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, parseSessionCursor, queueTaskMatchesCommander, queueTaskPayloadHash, queueTaskSort, sessionListFilters, sessionMatchesListState, sessionRefFromRecord, sessionSort, sessionTitleFromCommand, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef, titleFromMetadata } from "./store.js"; import { backendCapabilitiesSqlValues, mergeBackendCapability } from "../common/backend-profiles.js"; import { normalizeRunEventPayload, requireEventType } from "../common/events.js"; @@ -983,15 +983,20 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( } async cancelQueueTask(taskId: string, reason = "cancel requested"): Promise { + const task = await this.getQueueTask(taskId); + if (isTerminalQueueTaskState(task.state)) return task; + if (task.latestAttempt?.commandId) await this.cancelCommand(task.latestAttempt.commandId, reason); + else if (task.latestAttempt?.runId) await this.cancelRun(task.latestAttempt.runId, reason); return this.withTransaction(async (client) => { const existing = await client.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1 FOR UPDATE", [taskId]); const row = existing.rows[0]; if (!row) throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 }); - const task = queueTaskFromRow(row); - if (isTerminalQueueTaskState(task.state)) return task; + const lockedTask = queueTaskFromRow(row); + if (isTerminalQueueTaskState(lockedTask.state)) return lockedTask; const at = nowIso(); const version = await this.nextQueueVersion(client); - const updated = await client.query("UPDATE agentrun_queue_tasks SET state = 'cancelled', version = $2, updated_at = $3, cancelled_at = $3, cancel_reason = $4 WHERE id = $1 RETURNING *", [taskId, version, at, reason]); + const latestAttempt = lockedTask.latestAttempt ? { ...lockedTask.latestAttempt, state: "cancelled" as const } : null; + const updated = await client.query("UPDATE agentrun_queue_tasks SET state = 'cancelled', latest_attempt = $2::jsonb, version = $3, updated_at = $4, cancelled_at = $4, cancel_reason = $5 WHERE id = $1 RETURNING *", [taskId, JSON.stringify(latestAttempt), version, at, reason]); return queueTaskFromRow(updated.rows[0]); }); } @@ -1017,10 +1022,16 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( return buildQueueStats(await this.loadQueueTasksForProjection(queue), queue ?? null); } - async queueCommander(queue?: string): Promise { + async queueCommander(queue?: string, readerId: string | null = null): Promise { const tasks = await this.loadQueueTasksForProjection(queue); const generatedAt = nowIso(); - return { queue: queue ?? null, stats: buildQueueStats(tasks, queue ?? null, generatedAt), items: tasks.sort(queueTaskSort).slice(0, 20), generatedAt }; + const cursors = readerId ? await this.loadQueueReadCursors(readerId, tasks.map((task) => task.id)) : new Map(); + const items = tasks + .map((task) => buildQueueTaskSummary(task, readerId, readerId ? cursors.get(task.id) ?? null : null)) + .filter((task) => queueTaskMatchesCommander(task, readerId)) + .sort(queueTaskSort) + .slice(0, 20); + return { queue: queue ?? null, readerId, stats: buildQueueStats(tasks, queue ?? null, generatedAt), items, generatedAt }; } async backends(): Promise { @@ -1079,6 +1090,15 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( })); } + private async loadQueueReadCursors(readerId: string, taskIds: string[]): Promise> { + if (taskIds.length === 0) return new Map(); + const result = await this.pool.query("SELECT * FROM agentrun_queue_read_cursors WHERE reader_id = $1 AND task_id = ANY($2::text[])", [readerId, taskIds]); + return new Map(result.rows.map((row) => { + const cursor = queueReadCursorFromRow(row); + return [cursor.taskId, cursor]; + })); + } + private async loadQueueTasksForProjection(queue?: string): Promise { if (queue) { const result = await this.pool.query("SELECT * FROM agentrun_queue_tasks WHERE queue = $1", [queue]); @@ -1376,6 +1396,10 @@ function sessionReadCursorFromRow(row: QueryResultRow): SessionReadCursorRecord return { sessionId: stringValue(row.session_id), readerId: stringValue(row.reader_id), sessionVersion: Number(row.session_version), readAt: iso(row.read_at) }; } +function queueReadCursorFromRow(row: QueryResultRow): QueueReadCursorRecord { + return { taskId: stringValue(row.task_id), readerId: stringValue(row.reader_id), taskVersion: Number(row.task_version), readAt: iso(row.read_at) }; +} + function sessionExecutionState(value: unknown): SessionRecord["executionState"] { if (value === "running" || value === "terminal") return value; return "idle"; diff --git a/src/mgr/server.ts b/src/mgr/server.ts index 3e9ea27..081a9ce 100644 --- a/src/mgr/server.ts +++ b/src/mgr/server.ts @@ -262,7 +262,7 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults return await store.markQueueTaskRead(queueTaskReadMatch[1] ?? "", readerId) as unknown as JsonValue; } if (method === "GET" && path === "/api/v1/queue/stats") return await store.queueStats(url.searchParams.get("queue") ?? undefined) as unknown as JsonValue; - if (method === "GET" && path === "/api/v1/queue/commander") return await store.queueCommander(url.searchParams.get("queue") ?? undefined) as unknown as JsonValue; + if (method === "GET" && path === "/api/v1/queue/commander") return await store.queueCommander(url.searchParams.get("queue") ?? undefined, url.searchParams.get("readerId")) as unknown as JsonValue; if (method === "POST" && path === "/api/v1/runs") return await store.createRun(validateCreateRun(body)) as unknown as JsonValue; const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u); if (method === "GET" && runMatch) return await store.getRun(runMatch[1] ?? "") as unknown as JsonValue; diff --git a/src/mgr/store.ts b/src/mgr/store.ts index 634c6fb..3c04e67 100644 --- a/src/mgr/store.ts +++ b/src/mgr/store.ts @@ -1,4 +1,4 @@ -import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, ListGcExpiredSessionsInput, QueueAttemptRef, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionEventPage, SessionListResult, SessionListState, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js"; +import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, ListGcExpiredSessionsInput, QueueAttemptRef, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, QueueTaskSummary, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionEventPage, SessionListResult, SessionListState, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js"; import { AgentRunError } from "../common/errors.js"; import { newId, nowIso, stableHash } from "../common/validation.js"; import { redactJson } from "../common/redaction.js"; @@ -55,7 +55,7 @@ export interface AgentRunStore { cancelQueueTask(taskId: string, reason?: string): MaybePromise; markQueueTaskRead(taskId: string, readerId: string): MaybePromise; queueStats(queue?: string): MaybePromise; - queueCommander(queue?: string): MaybePromise; + queueCommander(queue?: string, readerId?: string | null): MaybePromise; backends(): MaybePromise; close?(): MaybePromise; } @@ -495,8 +495,11 @@ export class MemoryAgentRunStore implements AgentRunStore { cancelQueueTask(taskId: string, reason = "cancel requested"): QueueTaskRecord { const task = this.getQueueTask(taskId); if (isTerminalQueueTaskState(task.state)) return task; + if (task.latestAttempt?.commandId) this.cancelCommand(task.latestAttempt.commandId, reason); + else if (task.latestAttempt?.runId) this.cancelRun(task.latestAttempt.runId, reason); const at = nowIso(); - const next: QueueTaskRecord = { ...task, state: "cancelled", version: this.nextQueueVersion(), updatedAt: at, cancelledAt: at, cancelReason: reason }; + const latestAttempt = task.latestAttempt ? { ...task.latestAttempt, state: "cancelled" as const } : null; + const next: QueueTaskRecord = { ...task, state: "cancelled", latestAttempt, version: this.nextQueueVersion(), updatedAt: at, cancelledAt: at, cancelReason: reason }; this.queueTasks.set(taskId, next); return next; } @@ -512,10 +515,15 @@ export class MemoryAgentRunStore implements AgentRunStore { return buildQueueStats(Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue), queue ?? null); } - queueCommander(queue?: string): QueueCommanderSnapshot { - const items = Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue).sort(queueTaskSort).slice(0, 20); + queueCommander(queue?: string, readerId: string | null = null): QueueCommanderSnapshot { + const tasks = Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue); const generatedAt = nowIso(); - return { queue: queue ?? null, stats: buildQueueStats(Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue), queue ?? null, generatedAt), items, generatedAt }; + const items = tasks + .map((task) => buildQueueTaskSummary(task, readerId, readerId ? this.queueReadCursors.get(queueReadKey(task.id, readerId)) ?? null : null)) + .filter((task) => queueTaskMatchesCommander(task, readerId)) + .sort(queueTaskSort) + .slice(0, 20); + return { queue: queue ?? null, readerId, stats: buildQueueStats(tasks, queue ?? null, generatedAt), items, generatedAt }; } backends(): JsonRecord[] { @@ -686,6 +694,18 @@ export function buildSessionSummary(record: SessionRecord, readerId: string | nu return { ...record, sessionPath: `${record.tenantId}/${record.projectId}/${record.sessionId}`, readerId, readCursor, attentionState, unread, active }; } +export function buildQueueTaskSummary(record: QueueTaskRecord, readerId: string | null, readCursor: QueueReadCursorRecord | null): QueueTaskSummary { + const active = !isTerminalQueueTaskState(record.state); + const unread = !active && (!readCursor || readCursor.taskVersion < record.version); + const attentionState = active ? "active" : unread ? "unread" : "read"; + return { ...record, readerId, readCursor, attentionState, unread, active }; +} + +export function queueTaskMatchesCommander(task: QueueTaskSummary, readerId: string | null): boolean { + if (!readerId) return true; + return task.active || task.unread; +} + export function sessionMatchesListState(session: SessionSummary, state: SessionListState): boolean { if (state === "all") return true; if (state === "default") return session.active || session.unread; diff --git a/src/selftest/cases/70-queue-q1.ts b/src/selftest/cases/70-queue-q1.ts index b56d1c6..0fffe6d 100644 --- a/src/selftest/cases/70-queue-q1.ts +++ b/src/selftest/cases/70-queue-q1.ts @@ -62,6 +62,14 @@ const selfTest: SelfTestCase = async (context) => { const commander = await client.get("/api/v1/queue/commander?queue=dev") as QueueCommanderSnapshot; assert.equal(commander.stats.byState.cancelled, 1); assert.equal(commander.items[0]?.id, created.id); + assert.equal(commander.items[0]?.attentionState, "unread"); + + const terminalRead = await client.post(`/api/v1/queue/tasks/${created.id}/read`, { readerId: "self-test" }) as QueueReadCursorRecord; + assert.equal(terminalRead.taskVersion, cancelled.version); + const readCommander = await client.get("/api/v1/queue/commander?queue=dev&readerId=self-test") as QueueCommanderSnapshot; + assert.equal(readCommander.readerId, "self-test"); + assert.equal(readCommander.stats.byState.cancelled, 1); + assert.equal(readCommander.items.some((item) => item.id === created.id), false); return { name: "queue-q1", tests: ["queue-q1-rest-memory"] }; } finally { await new Promise((resolve) => server.server.close(() => resolve())); diff --git a/src/selftest/cases/75-queue-q2-dispatch.ts b/src/selftest/cases/75-queue-q2-dispatch.ts index afb7b28..180621c 100644 --- a/src/selftest/cases/75-queue-q2-dispatch.ts +++ b/src/selftest/cases/75-queue-q2-dispatch.ts @@ -93,10 +93,52 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin assert.equal(refreshed.latestAttempt?.state, "completed"); assert.equal(refreshed.latestAttempt?.runId, dispatched.run.id); assert.equal(refreshed.latestAttempt?.sessionPath, "/api/v1/sessions/sess_queue_q2_dispatch_selftest"); - const manifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord; - assert.ok(JSON.stringify(manifest).includes(dispatched.run.id)); + const dispatchManifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord; + assert.ok(JSON.stringify(dispatchManifest).includes(dispatched.run.id)); + + const cancelCreated = await client.post("/api/v1/queue/tasks", { + tenantId: "unidesk", + projectId: "pikasTech/unidesk", + queue: "dev", + lane: "q2", + title: "Q2 queue cancel task", + priority: 21, + backendProfile: "codex", + providerId: "G14", + workspaceRef: { kind: "host-path", path: context.workspace }, + sessionRef: { sessionId: "sess_queue_q2_cancel_selftest", metadata: { source: "queue-q2-cancel-self-test" } }, + executionPolicy: { + sandbox: "workspace-write", + approval: "never", + timeoutMs: 15_000, + network: "default", + secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: context.codexHome } }] }, + }, + resourceBundleRef: null, + payload: { prompt: "queue cancel hello" }, + references: [{ kind: "issue", url: "https://github.com/pikasTech/agentrun/issues/105" }], + metadata: { source: "queue-q2-cancel-self-test" }, + idempotencyKey: "queue-q2-cancel-self-test", + }) as QueueTaskRecord; + const cancelDispatched = await client.post(`/api/v1/queue/tasks/${cancelCreated.id}/dispatch`, { attemptId: "attempt_queue_q2_cancel_selftest" }) as QueueDispatchResult; + const cancelled = await client.post(`/api/v1/queue/tasks/${cancelCreated.id}/cancel`, { reason: "self-test queue cancel propagation" }) as QueueTaskRecord; + assert.equal(cancelled.state, "cancelled"); + assert.equal(cancelled.latestAttempt?.state, "cancelled"); + assert.equal(cancelled.latestAttempt?.runId, cancelDispatched.run.id); + assert.equal(cancelled.cancelReason, "self-test queue cancel propagation"); + const cancelledCommand = await client.get(`/api/v1/runs/${cancelDispatched.run.id}/commands/${cancelDispatched.command.id}`) as { state?: string }; + assert.equal(cancelledCommand.state, "cancelled"); + const cancelledSession = await client.get("/api/v1/sessions/sess_queue_q2_cancel_selftest") as { executionState?: string; terminalStatus?: string; failureKind?: string; activeRunId?: string | null; activeCommandId?: string | null }; + assert.equal(cancelledSession.executionState, "terminal"); + assert.equal(cancelledSession.terminalStatus, "cancelled"); + assert.equal(cancelledSession.failureKind, "cancelled"); + assert.equal(cancelledSession.activeRunId, null); + assert.equal(cancelledSession.activeCommandId, null); + const cancelManifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord; + assert.ok(JSON.stringify(cancelManifest).includes(cancelDispatched.run.id)); assertNoSecretLeak(dispatched); - return { name: "queue-q2-dispatch", tests: ["queue-dispatch-run-command-runner-job", "queue-refresh-from-core-status", "queue-dispatch-no-repeat"] }; + assertNoSecretLeak(cancelled); + return { name: "queue-q2-dispatch", tests: ["queue-dispatch-run-command-runner-job", "queue-refresh-from-core-status", "queue-dispatch-no-repeat", "queue-cancel-propagates-to-command-session"] }; } finally { await new Promise((resolve) => server.server.close(() => resolve())); }