diff --git a/docs/reference/spec-v01-cli.md b/docs/reference/spec-v01-cli.md index 8b4b34a..6df74fd 100644 --- a/docs/reference/spec-v01-cli.md +++ b/docs/reference/spec-v01-cli.md @@ -120,6 +120,7 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交 | runner start/job | 已实现 | `runner start` 可执行 host process runner;`runner job --dry-run` 可渲染 Kubernetes Job JSON;`runner job` 正式路径通过 manager REST 创建 Kubernetes Job,支持 `--idempotency-key` 并快速返回 job identity、SecretRef、retention 和轮询命令。 | | runner jobs/job-status | 已实现 | CLI 通过 manager REST 查询 runner Job 持久记录和最小状态摘要,不直连 Kubernetes、不读取 Secret 值。 | | result/cancel CLI | 已实现 | `runs result`、`commands result`、`runs cancel` 和 `commands cancel` 均调用 manager REST,不维护独立状态。 | -| Queue/Session CLI | 待实现 | 规格见 [spec-v01-queue.md](spec-v01-queue.md);Queue 命令只返回 sessionPath,输出和 trace 进入 Session 命令。 | +| Queue CLI | 已实现/Q1 | 已提供 `queue submit/list/show/stats/commander/read/cancel`,通过 manager REST 访问 Queue task 和 stats,不直连 Postgres。 | +| Session CLI | 待实现 | 规格见 [spec-v01-queue.md](spec-v01-queue.md);输出和 trace 进入 Session 命令,Queue 命令不得代理 output/trace。 | | CLI 测试规格 | 已定义/已验证主闭环 | 综合联调见 [spec-v01-validation.md](spec-v01-validation.md);每次发布仍按手动交互验收复跑。 | | `deepseek` profile CLI | 已实现/已通过主闭环 | `secrets codex render --profile deepseek`、`backends list`、`runner start --backend`、`runner job` 和 JSON 错误可见性已实现;真实 CLI/RESTful 联调已通过 `codex -> deepseek -> codex` 切换主闭环。 | diff --git a/docs/reference/spec-v01-postgres.md b/docs/reference/spec-v01-postgres.md index 954fdf7..e789aae 100644 --- a/docs/reference/spec-v01-postgres.md +++ b/docs/reference/spec-v01-postgres.md @@ -83,7 +83,7 @@ Secret 名称和 key 可以在实现时按 Kubernetes 命名限制微调,但 | --- | --- | --- | | Postgres durable store 规格 | 已定义 | 本文为 v0.1 存储权威。 | | StatefulSet/Service/PVC | 已实现/已通过主闭环 | `agentrun-v01-postgres` StatefulSet、Service 和 PVC 已由 GitOps runtime 提供,作为 `agentrun-v01` durable store。 | -| migration ledger | 已实现/已通过主闭环 | `agentrun-mgr` 启动 Postgres adapter 时幂等创建 
`agentrun_schema_migrations` 并记录 migration id/checksum;当前最新 migration 为 `003_v01_hwlab_manual_dispatch`,用于新增 SessionRef、ResourceBundleRef 和 runner job idempotency 持久化;readiness 必须显示 migration ready。 | -| manager Postgres adapter | 已实现/已通过主闭环 | `agentrun-mgr` 通过 `DATABASE_URL` 启用 Postgres adapter,持久化 runs、commands、events、runners、runner_jobs、sessions、backends 和 leases;缺少 `DATABASE_URL` 时 live runtime fail fast,memory 只允许显式 self-test/dev。 | +| migration ledger | 已实现/已通过主闭环 | `agentrun-mgr` 启动 Postgres adapter 时幂等创建 `agentrun_schema_migrations` 并记录 migration id/checksum;当前最新 migration 为 `004_v01_queue_q1`,用于新增 Q1 Queue task 与 read cursor 持久化;readiness 必须显示 migration ready。 | +| manager Postgres adapter | 已实现/已通过主闭环 | `agentrun-mgr` 通过 `DATABASE_URL` 启用 Postgres adapter,持久化 runs、commands、events、runners、runner_jobs、sessions、backends、leases、Queue task 和 read cursor;缺少 `DATABASE_URL` 时 live runtime fail fast,memory 只允许显式 self-test/dev。 | | health/readiness store 状态 | 已实现 | health/readiness 返回 adapter、reachable、migrationReady、migrationId、failureKind 和 redacted Secret 状态,不输出 DSN 明文。 | | file/sqlite durable store | 不采用 | 只可用于临时本地测试,不作为 v0.1 runtime truth。 | diff --git a/docs/reference/spec-v01-queue.md b/docs/reference/spec-v01-queue.md index 8b813b6..6c32b2d 100644 --- a/docs/reference/spec-v01-queue.md +++ b/docs/reference/spec-v01-queue.md @@ -161,8 +161,8 @@ Queue 首版新增或扩展的稳定表方向: | 规格项 | 状态 | 说明 | | --- | --- | --- | | AgentRun Queue 直接吸收规格 | 已定义 | 本文为 Queue 吸收 Code Queue 的首版权威规格。 | -| Queue RESTful API | 待实现 | 计划通过 `agentrun-mgr` 暴露,仍使用短请求和轻量轮询。 | -| Queue CLI | 待实现 | 计划加入 `queue ...` 和 `sessions ...` 命令族。 | +| Queue RESTful API | 已实现/Q1 | 已通过 `agentrun-mgr` 暴露 `submit/list/show/stats/read/cancel/commander`,使用短请求和 Queue version/cursor 轻量轮询;Q2 再接入 attempt 与真实执行。 | +| Queue CLI | 已实现/Q1 | 已加入 `queue submit/list/show/stats/commander/read/cancel`;Queue 命令只返回 task summary、stats、read cursor 和 `sessionPath`。 | | Session API/CLI | 待实现 | Queue 只返回 `sessionPath`;Session 层承接输出、trace 
和控制。 | | Scheduler 接入 | 待实现 | 旧 Code Queue scheduler 不保留;AgentRun Scheduler 是唯一调度方向。 | | OA/Event/integrations | 不采用 | 首版不做,后续如需外部 connector/sink 必须单独立规格。 | diff --git a/scripts/src/cli.ts b/scripts/src/cli.ts index 66fae35..7734f0a 100644 --- a/scripts/src/cli.ts +++ b/scripts/src/cli.ts @@ -33,6 +33,13 @@ async function dispatch(args: ParsedArgs): Promise { if (group === "server" && command === "status") return client(args).get("/health/readiness"); if (group === "backends" && command === "list") return client(args).get("/api/v1/backends"); if (group === "secrets" && command === "codex" && id === "render") return renderCodexSecret(args); + /* Queue commands. show/read/cancel only match when a task id was given; without one they fall through to the checks that follow. */ if (group === "queue" && command === "submit") return submitQueueTask(args); + if (group === "queue" && command === "list") return listQueueTasks(args); + if (group === "queue" && command === "show" && id) return client(args).get(`/api/v1/queue/tasks/${encodeURIComponent(id)}`); + if (group === "queue" && command === "stats") return client(args).get(`/api/v1/queue/stats${queueQuery(args)}`); + if (group === "queue" && command === "commander") return client(args).get(`/api/v1/queue/commander${queueQuery(args)}`); + if (group === "queue" && command === "read" && id) return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(id)}/read`, { readerId: optionalFlag(args, "reader-id") ?? 
"cli" }); + if (group === "queue" && command === "cancel" && id) return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(id)}/cancel`, cancelBody(args)); if (group === "runs" && command === "create") return client(args).post("/api/v1/runs", await jsonFile(args)); if (group === "runs" && command === "show" && id) return client(args).get(`/api/v1/runs/${encodeURIComponent(id)}`); if (group === "runs" && command === "events" && id) return client(args).get(`/api/v1/runs/${encodeURIComponent(id)}/events?afterSeq=${flag(args, "after-seq", "0")}&limit=${flag(args, "limit", "100")}`); @@ -92,6 +99,34 @@ async function listRunnerJobs(args: ParsedArgs): Promise { return client(args).get(`/api/v1/runs/${encodeURIComponent(runId)}/runner-jobs${commandId ? `?commandId=${encodeURIComponent(commandId)}` : ""}`); } +/** Submits a queue task: loads the request body with jsonFile(args), overlays idempotencyKey when a non-empty --idempotency-key flag is present, and POSTs the body to /api/v1/queue/tasks. */ async function submitQueueTask(args: ParsedArgs): Promise { + const body = await jsonFile(args); + const idempotencyKey = optionalFlag(args, "idempotency-key"); + if (idempotencyKey) body.idempotencyKey = idempotencyKey; + return client(args).post("/api/v1/queue/tasks", body); +} + +/** Lists queue tasks with GET /api/v1/queue/tasks. Only the flags that were supplied (--queue, --state, --cursor, --limit, --updated-after) are forwarded; URLSearchParams handles the encoding and the "?" is omitted when no flag is set. */ async function listQueueTasks(args: ParsedArgs): Promise { + const params = new URLSearchParams(); + const queue = optionalFlag(args, "queue"); + const state = optionalFlag(args, "state"); + const cursor = optionalFlag(args, "cursor"); + const limit = optionalFlag(args, "limit"); + const updatedAfter = optionalFlag(args, "updated-after"); + if (queue) params.set("queue", queue); + if (state) params.set("state", state); + if (cursor) params.set("cursor", cursor); + if (limit) params.set("limit", limit); + /* the CLI flag is kebab-case, the REST query parameter is camelCase */ if (updatedAfter) params.set("updatedAfter", updatedAfter); + const query = params.toString(); + return client(args).get(`/api/v1/queue/tasks${query ? `?${query}` : ""}`); +} + +/** Builds the optional "?queue=..." suffix used by the queue stats and queue commander requests; yields an empty string when --queue is not set. */ function queueQuery(args: ParsedArgs): string { + const queue = optionalFlag(args, "queue"); + return queue ? 
`?queue=${encodeURIComponent(queue)}` : ""; +} + async function showRunnerJobStatus(args: ParsedArgs): Promise { const runId = flag(args, "run-id", ""); if (!runId) throw new AgentRunError("schema-invalid", "runner job-status requires --run-id", { httpStatus: 2 }); @@ -240,6 +275,13 @@ function help(): JsonRecord { "runner job --dry-run --run-id --command-id --image ", "runner jobs --run-id [--command-id ]", "runner job-status [runnerJobId] --run-id ", + "queue submit --json-file [--idempotency-key ]", + "queue list [--queue ] [--state ] [--cursor ] [--limit ] [--updated-after ]", + "queue show ", + "queue stats [--queue ]", + "queue commander [--queue ]", + "queue read [--reader-id ]", + "queue cancel [--reason ]", "secrets codex render --dry-run [--profile codex|deepseek] [--codex-home ] [--namespace agentrun-v01] [--secret-name ]", "backends list", "server start|status", diff --git a/src/common/types.ts b/src/common/types.ts index 9b76199..c25f2ab 100644 --- a/src/common/types.ts +++ b/src/common/types.ts @@ -23,6 +23,7 @@ export type RunStatus = "pending" | "claimed" | "running" | "completed" | "faile export type CommandState = "pending" | "acknowledged" | "completed" | "failed" | "cancelled"; export type TerminalStatus = "completed" | "failed" | "blocked" | "cancelled"; export type BackendProfile = "codex" | "deepseek"; +export type QueueTaskState = "pending" | "running" | "completed" | "failed" | "blocked" | "cancelled"; export interface WorkspaceRef extends JsonRecord { kind: "git-worktree" | "host-path" | "kubernetes-pvc" | "opaque"; @@ -168,6 +169,76 @@ export interface RunnerJobRecord extends JsonRecord { updatedAt: string; } +export interface QueueAttemptRef extends JsonRecord { + attemptId: string; + state: QueueTaskState; + runId: string | null; + commandId: string | null; + runnerJobId: string | null; + sessionId: string | null; + sessionPath: string | null; +} + +export interface CreateQueueTaskInput extends JsonRecord { + tenantId: string; + 
projectId: string; + queue: string; + lane: string; + title: string; + priority: number; /* integer within the signed 32-bit range; persisted in an integer column */ + backendProfile: BackendProfile; + providerId: string | null; + workspaceRef: WorkspaceRef | null; + executionPolicy: ExecutionPolicy | null; + resourceBundleRef: ResourceBundleRef | null; + payload: JsonRecord; + references: JsonRecord[]; + metadata: JsonRecord; + idempotencyKey?: string; +} + +/** A stored queue task: the validated submission plus server-assigned identity, state, version and cancellation bookkeeping. */ export interface QueueTaskRecord extends CreateQueueTaskInput { + id: string; + state: QueueTaskState; + version: number; + payloadHash: string; + latestAttempt: QueueAttemptRef | null; + sessionPath: string | null; + createdAt: string; + updatedAt: string; + cancelledAt: string | null; + cancelReason: string | null; +} + +/** The task version a given reader last acknowledged, and when. */ export interface QueueReadCursorRecord extends JsonRecord { + taskId: string; + readerId: string; + taskVersion: number; + readAt: string; +} + +/** One page of tasks; cursor is null when the page is empty. */ export interface QueueTaskListResult extends JsonRecord { + items: QueueTaskRecord[]; + count: number; + cursor: string | null; +} + +export interface QueueStats extends JsonRecord { + queue: string | null; + total: number; + byState: JsonRecord; + byLane: JsonRecord; + maxVersion: number; + generatedAt: string; +} + +export interface QueueCommanderSnapshot extends JsonRecord { + queue: string | null; + stats: QueueStats; + items: QueueTaskRecord[]; + generatedAt: string; +} + export interface BackendEvent { type: EventType; payload: JsonRecord; diff --git a/src/common/validation.ts b/src/common/validation.ts index 4eca761..67d7f04 100644 --- a/src/common/validation.ts +++ b/src/common/validation.ts @@ -1,5 +1,5 @@ import { createHash, randomUUID } from "node:crypto"; -import type { BackendProfile, CreateCommandInput, CreateRunInput, ExecutionPolicy, JsonRecord, JsonValue, ResourceBundleRef, SecretRef, SessionRef } from "./types.js"; +import type { BackendProfile, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, ExecutionPolicy, JsonRecord, JsonValue, QueueTaskState, ResourceBundleRef, SecretRef, SessionRef } from 
"./types.js"; import { AgentRunError } from "./errors.js"; import { backendProfileSpec, backendProfiles, isBackendProfile } from "./backend-profiles.js"; @@ -168,3 +168,41 @@ export function validateCreateCommand(input: unknown): CreateCommandInput { const idempotencyKey = typeof record.idempotencyKey === "string" && record.idempotencyKey.trim().length > 0 ? record.idempotencyKey.trim() : undefined; return { type, payload, ...(idempotencyKey ? { idempotencyKey } : {}) }; } + +/** Validates a raw queue-task submission and returns a normalized CreateQueueTaskInput. Defaults: backendProfile "codex", queue and lane "default", priority 0, references [], payload and metadata {}. Throws AgentRunError: tenant-policy-denied (403) for a tenant outside allowedTenants, schema-invalid (400) for an unsupported backendProfile, a non-integer or out-of-range priority, or a non-array references value. */ export function validateCreateQueueTask(input: unknown): CreateQueueTaskInput { + const record = asRecord(input, "queueTask"); + const tenantId = requiredString(record, "tenantId"); + if (!allowedTenants.has(tenantId)) throw new AgentRunError("tenant-policy-denied", `tenantId ${tenantId} is not allowed`, { httpStatus: 403 }); + const backendProfileValue = optionalString(record.backendProfile) ?? "codex"; + if (!isBackendProfile(backendProfileValue)) throw new AgentRunError("schema-invalid", `backendProfile ${backendProfileValue} is not supported in v0.1`, { httpStatus: 400, details: { allowedBackends: [...backendProfiles] } }); + const queue = optionalString(record.queue) ?? "default"; + const lane = optionalString(record.lane) ?? "default"; + const priorityValue = record.priority ?? 0; + /* priority is written to the integer column agentrun_queue_tasks.priority, so a fractional or out-of-int32 value must be rejected here with a 400 instead of failing later as a database error on insert */ if (typeof priorityValue !== "number" || !Number.isInteger(priorityValue) || Math.abs(priorityValue) > 2147483647) throw new AgentRunError("schema-invalid", "priority must be a 32-bit integer", { httpStatus: 400 }); + const referencesValue = record.references ?? []; + if (!Array.isArray(referencesValue)) throw new AgentRunError("schema-invalid", "references must be an array", { httpStatus: 400 }); + const result: CreateQueueTaskInput = { + tenantId, + projectId: requiredString(record, "projectId"), + queue, + lane, + title: requiredString(record, "title"), + priority: priorityValue, + backendProfile: backendProfileValue, + providerId: optionalString(record.providerId) ?? 
null, + workspaceRef: record.workspaceRef === undefined || record.workspaceRef === null ? null : requiredRecord(record, "workspaceRef") as CreateQueueTaskInput["workspaceRef"], + executionPolicy: record.executionPolicy === undefined || record.executionPolicy === null ? null : validateExecutionPolicy(requiredRecord(record, "executionPolicy")), + resourceBundleRef: validateResourceBundleRef(record.resourceBundleRef), + payload: record.payload === undefined ? {} : asRecord(record.payload, "payload"), + references: referencesValue.map((item, index) => asRecord(item, `references[${index}]`)), + metadata: record.metadata === undefined ? {} : asRecord(record.metadata, "metadata"), + }; + /* idempotencyKey is only attached when present, so the key is absent (not undefined) on the result otherwise */ const idempotencyKey = optionalString(record.idempotencyKey); + if (idempotencyKey) result.idempotencyKey = idempotencyKey; + return result; +} + +/** Narrows a raw string to QueueTaskState. Returns the value unchanged when it is one of the six known states; otherwise throws AgentRunError (schema-invalid, httpStatus 400). */ export function validateQueueTaskState(value: string): QueueTaskState { + if (value === "pending" || value === "running" || value === "completed" || value === "failed" || value === "blocked" || value === "cancelled") return value; + throw new AgentRunError("schema-invalid", `queue task state ${value} is not supported`, { httpStatus: 400 }); +} diff --git a/src/mgr/postgres-store.ts b/src/mgr/postgres-store.ts index d2dc43d..b224904 100644 --- a/src/mgr/postgres-store.ts +++ b/src/mgr/postgres-store.ts @@ -3,10 +3,10 @@ import { Pool } from "pg"; import type { PoolClient, QueryResultRow } from "pg"; import { AgentRunError } from "../common/errors.js"; import { redactJson } from "../common/redaction.js"; -import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js"; +import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, 
FailureKind, JsonRecord, JsonValue, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js"; import { newId, nowIso, stableHash } from "../common/validation.js"; -import type { AgentRunStore, SaveRunnerJobInput, StoreHealth } from "./store.js"; -import { assertSessionBoundary, commandStateFromTerminal, isTerminalCommandState, isTerminalRunStatus, sessionRefFromRecord, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js"; +import type { AgentRunStore, ListQueueTasksInput, SaveRunnerJobInput, StoreHealth } from "./store.js"; +import { assertSessionBoundary, buildQueueStats, clampQueueLimit, commandStateFromTerminal, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, queueTaskSort, sessionRefFromRecord, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js"; import { backendCapabilitiesSqlValues, mergeBackendCapability } from "../common/backend-profiles.js"; import { normalizeRunEventPayload, requireEventType } from "../common/events.js"; @@ -171,6 +171,53 @@ CREATE INDEX IF NOT EXISTS agentrun_runner_jobs_run_command_idx ON agentrun_runn CREATE INDEX IF NOT EXISTS agentrun_sessions_tenant_project_idx ON agentrun_sessions (tenant_id, project_id, backend_profile, updated_at); `; +const queueQ1MigrationSql = ` +CREATE TABLE IF NOT EXISTS agentrun_queue_tasks ( + id text PRIMARY KEY, + tenant_id text NOT NULL, + project_id text NOT NULL, + queue text NOT NULL, + lane text NOT NULL, + title text NOT NULL, + priority integer NOT NULL, + state text NOT NULL, + version bigint NOT NULL, + backend_profile text NOT NULL, + provider_id text, + workspace_ref jsonb, + execution_policy jsonb, + resource_bundle_ref jsonb, + payload jsonb NOT NULL, + payload_hash text NOT NULL, + 
references_json jsonb NOT NULL DEFAULT '[]'::jsonb, + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + idempotency_key text, + latest_attempt jsonb, + session_path text, + created_at timestamptz NOT NULL, + updated_at timestamptz NOT NULL, + cancelled_at timestamptz, + cancel_reason text +); + +CREATE UNIQUE INDEX IF NOT EXISTS agentrun_queue_tasks_idempotency_idx + ON agentrun_queue_tasks (tenant_id, project_id, idempotency_key) + WHERE idempotency_key IS NOT NULL; + +CREATE INDEX IF NOT EXISTS agentrun_queue_tasks_list_idx ON agentrun_queue_tasks (queue, state, version, priority, created_at); +CREATE INDEX IF NOT EXISTS agentrun_queue_tasks_updated_idx ON agentrun_queue_tasks (version, updated_at); + +CREATE TABLE IF NOT EXISTS agentrun_queue_read_cursors ( + task_id text NOT NULL REFERENCES agentrun_queue_tasks(id) ON DELETE CASCADE, + reader_id text NOT NULL, + task_version bigint NOT NULL, + read_at timestamptz NOT NULL, + PRIMARY KEY (task_id, reader_id) +); + +CREATE SEQUENCE IF NOT EXISTS agentrun_queue_version_seq; +`; + const postgresMigrations: MigrationDefinition[] = [ { id: "001_v01_initial_durable_store", @@ -187,13 +234,18 @@ const postgresMigrations: MigrationDefinition[] = [ checksum: checksumSql(hwlabManualDispatchMigrationSql), sql: hwlabManualDispatchMigrationSql, }, + { + id: "004_v01_queue_q1", + checksum: checksumSql(queueQ1MigrationSql), + sql: queueQ1MigrationSql, + }, ]; export function postgresMigrationContract(): JsonRecord { return { migrationIds: postgresMigrations.map((migration) => migration.id), latestMigrationId: latestMigrationId(), - requiredTables: ["agentrun_schema_migrations", "agentrun_runs", "agentrun_commands", "agentrun_events", "agentrun_runners", "agentrun_backends", "agentrun_leases", "agentrun_sessions", "agentrun_runner_jobs"], + requiredTables: ["agentrun_schema_migrations", "agentrun_runs", "agentrun_commands", "agentrun_events", "agentrun_runners", "agentrun_backends", "agentrun_leases", "agentrun_sessions", 
"agentrun_runner_jobs", "agentrun_queue_tasks", "agentrun_queue_read_cursors"], checksums: Object.fromEntries(postgresMigrations.map((migration) => [migration.id, migration.checksum])), }; } @@ -508,6 +560,96 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( return result.rows[0] ? sessionFromRow(result.rows[0]) : null; } + async createQueueTask(input: CreateQueueTaskInput): Promise { + const payloadHash = stableHash(input.payload); + return this.withTransaction(async (client) => { + if (input.idempotencyKey) { + const existing = await client.query("SELECT * FROM agentrun_queue_tasks WHERE tenant_id = $1 AND project_id = $2 AND idempotency_key = $3 FOR UPDATE", [input.tenantId, input.projectId, input.idempotencyKey]); + if (existing.rows[0]) { + const record = queueTaskFromRow(existing.rows[0]); + if (record.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "queue task idempotency key reused with different payload", { httpStatus: 409 }); + return record; + } + } + const at = nowIso(); + const version = await this.nextQueueVersion(client); + const task: QueueTaskRecord = { ...input, id: newId("qt"), state: "pending", version, payloadHash, latestAttempt: null, sessionPath: null, createdAt: at, updatedAt: at, cancelledAt: null, cancelReason: null }; + const inserted = await client.query( + `INSERT INTO agentrun_queue_tasks (id, tenant_id, project_id, queue, lane, title, priority, state, version, backend_profile, provider_id, workspace_ref, execution_policy, resource_bundle_ref, payload, payload_hash, references_json, metadata, idempotency_key, latest_attempt, session_path, created_at, updated_at, cancelled_at, cancel_reason) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12::jsonb, $13::jsonb, $14::jsonb, $15::jsonb, $16, $17::jsonb, $18::jsonb, $19, $20::jsonb, $21, $22, $23, $24, $25) + RETURNING *`, + [task.id, task.tenantId, task.projectId, task.queue, task.lane, task.title, task.priority, task.state, task.version, 
task.backendProfile, task.providerId, JSON.stringify(task.workspaceRef), JSON.stringify(task.executionPolicy), JSON.stringify(task.resourceBundleRef), JSON.stringify(task.payload), task.payloadHash, JSON.stringify(task.references), JSON.stringify(task.metadata), task.idempotencyKey ?? null, JSON.stringify(task.latestAttempt), task.sessionPath, task.createdAt, task.updatedAt, task.cancelledAt, task.cancelReason], + ); + return queueTaskFromRow(inserted.rows[0]); + }); + } + + /** Returns one page of queue tasks whose version is greater than the start version (input.cursor when it parses, else input.updatedAfter, else 0), optionally filtered by queue and state. Rows come back in ascending version order and the returned cursor is the version of the last row, so passing it back yields the next page; cursor is null when the page is empty. The page size is clampQueueLimit(input.limit). */ async listQueueTasks(input: ListQueueTasksInput): Promise { + const startVersion = parseQueueCursor(input.cursor) ?? input.updatedAfter ?? 0; + const params: unknown[] = [startVersion]; + const where = ["version > $1"]; + if (input.queue) { + params.push(input.queue); + where.push(`queue = $${params.length}`); + } + if (input.state) { + params.push(input.state); + where.push(`state = $${params.length}`); + } + params.push(clampQueueLimit(input.limit)); + /* Page in ascending version order: the cursor handed back is the last row's version and the next request filters on version > cursor, so any other ordering would let a LIMIT-truncated page permanently skip rows with a lower version. */ const result = await this.pool.query(`SELECT * FROM agentrun_queue_tasks WHERE ${where.join(" AND ")} ORDER BY version ASC, id ASC LIMIT $${params.length}`, params); + const items = result.rows.map(queueTaskFromRow); + return { items, count: items.length, cursor: items.length > 0 ? String(items[items.length - 1]?.version ?? 
startVersion) : null }; + } + + /** Loads one queue task by id; throws AgentRunError (schema-invalid, httpStatus 404) when no row matches. */ async getQueueTask(taskId: string): Promise { + const result = await this.pool.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1", [taskId]); + const row = result.rows[0]; + if (!row) throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 }); + return queueTaskFromRow(row); + } + + /** Cancels a queue task through this.withTransaction, selecting the row FOR UPDATE first. A task whose state is already terminal (per isTerminalQueueTaskState) is returned unchanged. Otherwise the state becomes 'cancelled', a new queue version is assigned, and updated_at and cancelled_at receive the same timestamp. Throws a 404 AgentRunError when the task does not exist. */ async cancelQueueTask(taskId: string, reason = "cancel requested"): Promise { + return this.withTransaction(async (client) => { + const existing = await client.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1 FOR UPDATE", [taskId]); + const row = existing.rows[0]; + if (!row) throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 }); + const task = queueTaskFromRow(row); + /* cancelling twice, or cancelling a finished task, is a no-op that does not bump the version */ if (isTerminalQueueTaskState(task.state)) return task; + const at = nowIso(); + const version = await this.nextQueueVersion(client); + const updated = await client.query("UPDATE agentrun_queue_tasks SET state = 'cancelled', version = $2, updated_at = $3, cancelled_at = $3, cancel_reason = $4 WHERE id = $1 RETURNING *", [taskId, version, at, reason]); + return queueTaskFromRow(updated.rows[0]); + }); + } + + /** Records that readerId has read the task at its current version. One cursor row is kept per (task_id, reader_id); a repeat read overwrites task_version and read_at. The task row is selected FOR UPDATE before the cursor is written, and a 404 AgentRunError is thrown when the task does not exist. Returns the cursor record built in memory, not the row read back from the database. */ async markQueueTaskRead(taskId: string, readerId: string): Promise { + return this.withTransaction(async (client) => { + const existing = await client.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1 FOR UPDATE", [taskId]); + const row = existing.rows[0]; + if (!row) throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 }); + const task = queueTaskFromRow(row); + const record: QueueReadCursorRecord = { taskId, readerId, taskVersion: task.version, readAt: nowIso() }; + await client.query( + `INSERT INTO agentrun_queue_read_cursors (task_id, reader_id, task_version, read_at) + VALUES ($1, $2, $3, $4) + ON CONFLICT (task_id, reader_id) DO UPDATE SET task_version = EXCLUDED.task_version, read_at = EXCLUDED.read_at`, + [record.taskId, 
record.readerId, record.taskVersion, record.readAt], + ); + return record; + }); + } + + /** Computes queue statistics with buildQueueStats over every stored task, or only the tasks of the given queue. All matching rows are loaded into memory first. */ async queueStats(queue?: string): Promise { + return buildQueueStats(await this.loadQueueTasksForProjection(queue), queue ?? null); + } + + /** Builds the commander snapshot: statistics over all matching tasks plus the first 20 tasks in queueTaskSort order. The stats and the snapshot are stamped with the same generatedAt value. */ async queueCommander(queue?: string): Promise { + const tasks = await this.loadQueueTasksForProjection(queue); + const generatedAt = nowIso(); + /* stats is evaluated before items, so the in-place sort of tasks happens after the statistics were computed */ return { queue: queue ?? null, stats: buildQueueStats(tasks, queue ?? null, generatedAt), items: tasks.sort(queueTaskSort).slice(0, 20), generatedAt }; + } + async backends(): Promise { const result = await this.pool.query("SELECT * FROM agentrun_backends ORDER BY profile ASC"); return result.rows.map((row) => { @@ -534,6 +676,20 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( return Number(result.rows[0]?.seq ?? 1); } + /** Draws the next value of agentrun_queue_version_seq on the supplied client and converts it to a number; falls back to 1 when the query yields no row. */ private async nextQueueVersion(client: PoolClient): Promise { + const result = await client.query<{ version: string | number }>("SELECT nextval('agentrun_queue_version_seq') AS version"); + return Number(result.rows[0]?.version ?? 
1); + } + + /** Loads every queue task, or only those of the given queue, with no LIMIT and no ORDER BY; the callers aggregate and sort the result in memory. */ private async loadQueueTasksForProjection(queue?: string): Promise { + if (queue) { + const result = await this.pool.query("SELECT * FROM agentrun_queue_tasks WHERE queue = $1", [queue]); + return result.rows.map(queueTaskFromRow); + } + const result = await this.pool.query("SELECT * FROM agentrun_queue_tasks"); + return result.rows.map(queueTaskFromRow); + } + private async requireRunForUpdate(client: PoolClient, runId: string): Promise { const result = await client.query("SELECT * FROM agentrun_runs WHERE id = $1 FOR UPDATE", [runId]); const row = result.rows[0]; @@ -727,6 +883,36 @@ function runnerJobFromRow(row: QueryResultRow): RunnerJobRecord { }; } +/** Maps one agentrun_queue_tasks row (snake_case columns) to a QueueTaskRecord. priority and version are coerced with Number(); state and backendProfile are cast without re-validation; idempotencyKey is only included when nullableString(row.idempotency_key) is truthy. */ function queueTaskFromRow(row: QueryResultRow): QueueTaskRecord { + return { + id: stringValue(row.id), + tenantId: stringValue(row.tenant_id), + projectId: stringValue(row.project_id), + queue: stringValue(row.queue), + lane: stringValue(row.lane), + title: stringValue(row.title), + priority: Number(row.priority), + state: stringValue(row.state) as QueueTaskState, + version: Number(row.version), + backendProfile: stringValue(row.backend_profile) as BackendProfile, + providerId: nullableString(row.provider_id), + workspaceRef: jsonValue(row.workspace_ref) as QueueTaskRecord["workspaceRef"], + executionPolicy: jsonValue(row.execution_policy) as QueueTaskRecord["executionPolicy"], + resourceBundleRef: jsonValue(row.resource_bundle_ref) as QueueTaskRecord["resourceBundleRef"], + payload: jsonRecord(row.payload), + payloadHash: stringValue(row.payload_hash), + references: jsonArray(row.references_json) as JsonRecord[], + metadata: jsonRecord(row.metadata), + ...(nullableString(row.idempotency_key) ? 
{ idempotencyKey: stringValue(row.idempotency_key) } : {}), + latestAttempt: jsonValue(row.latest_attempt) as QueueTaskRecord["latestAttempt"], + sessionPath: nullableString(row.session_path), + createdAt: iso(row.created_at), + updatedAt: iso(row.updated_at), + cancelledAt: nullableIso(row.cancelled_at), + cancelReason: nullableString(row.cancel_reason), + }; +} + function metadataForRunner(runner: RunnerRecord): JsonRecord { const { id: _id, runId: _runId, attemptId: _attemptId, backendProfile: _backendProfile, placement: _placement, sourceCommit: _sourceCommit, registeredAt: _registeredAt, heartbeatAt: _heartbeatAt, ...metadata } = runner; return redactJson(metadata); @@ -749,6 +935,10 @@ function jsonRecord(value: unknown): JsonRecord { return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : {}; } +function jsonArray(value: unknown): JsonValue[] { + return Array.isArray(value) ? value as JsonValue[] : []; +} + function iso(value: unknown): string { if (value instanceof Date) return value.toISOString(); if (typeof value === "string") return new Date(value).toISOString(); diff --git a/src/mgr/server.ts b/src/mgr/server.ts index 4afe13a..ad7ff03 100644 --- a/src/mgr/server.ts +++ b/src/mgr/server.ts @@ -1,10 +1,10 @@ import type { Server } from "node:http"; import { createServer } from "node:http"; import type { AddressInfo } from "node:net"; -import type { AgentRunStore } from "./store.js"; +import type { AgentRunStore, ListQueueTasksInput } from "./store.js"; import { openAgentRunStoreFromEnv } from "./store.js"; import { AgentRunError, errorToJson } from "../common/errors.js"; -import { asRecord, validateCreateCommand, validateCreateRun } from "../common/validation.js"; +import { asRecord, validateCreateCommand, validateCreateQueueTask, validateCreateRun, validateQueueTaskState } from "../common/validation.js"; import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, RunEvent } from "../common/types.js"; import { 
createKubernetesRunnerJob } from "./kubernetes-runner-job.js"; import { buildRunResult } from "./result.js"; @@ -68,6 +68,33 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults return { serviceId: "agentrun-mgr", live: true, ready, database, sourceCommit, secretRefs: { databaseUrl: database.adapter === "postgres" ? "redacted" : "not-used", valuesPrinted: false } }; } if (method === "GET" && path === "/api/v1/backends") return { items: await store.backends() as unknown as JsonValue }; + if (method === "POST" && path === "/api/v1/queue/tasks") return await store.createQueueTask(validateCreateQueueTask(body)) as unknown as JsonValue; + if (method === "GET" && path === "/api/v1/queue/tasks") { + const state = url.searchParams.get("state"); + const listInput: ListQueueTasksInput = { updatedAfter: integerQuery(url, "updatedAfter", 0), limit: integerQuery(url, "limit", 50) }; + const queue = url.searchParams.get("queue"); + const cursor = url.searchParams.get("cursor"); + if (queue) listInput.queue = queue; + if (state) listInput.state = validateQueueTaskState(state); + if (cursor) listInput.cursor = cursor; + return await store.listQueueTasks(listInput) as unknown as JsonValue; + } + const queueTaskMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)$/u); + if (method === "GET" && queueTaskMatch) return await store.getQueueTask(queueTaskMatch[1] ?? "") as unknown as JsonValue; + const queueTaskCancelMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/cancel$/u); + if (method === "POST" && queueTaskCancelMatch) { + const record = body === null ? {} : asRecord(body, "cancel"); + const reason = typeof record.reason === "string" && record.reason.trim().length > 0 ? record.reason.trim() : undefined; + return await store.cancelQueueTask(queueTaskCancelMatch[1] ?? 
"", reason) as unknown as JsonValue; + } + const queueTaskReadMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/read$/u); + if (method === "POST" && queueTaskReadMatch) { + const record = body === null ? {} : asRecord(body, "read"); + const readerId = typeof record.readerId === "string" && record.readerId.trim().length > 0 ? record.readerId.trim() : "cli"; + return await store.markQueueTaskRead(queueTaskReadMatch[1] ?? "", readerId) as unknown as JsonValue; + } + if (method === "GET" && path === "/api/v1/queue/stats") return await store.queueStats(url.searchParams.get("queue") ?? undefined) as unknown as JsonValue; + if (method === "GET" && path === "/api/v1/queue/commander") return await store.queueCommander(url.searchParams.get("queue") ?? undefined) as unknown as JsonValue; if (method === "POST" && path === "/api/v1/runs") return await store.createRun(validateCreateRun(body)) as unknown as JsonValue; const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u); if (method === "GET" && runMatch) return await store.getRun(runMatch[1] ?? 
"") as unknown as JsonValue;
diff --git a/src/mgr/store.ts b/src/mgr/store.ts
index cab4991..11b1eb2 100644
--- a/src/mgr/store.ts
+++ b/src/mgr/store.ts
@@ -1,4 +1,4 @@
-import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateRunInput, FailureKind, JsonRecord, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js";
+import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js";
 import { AgentRunError } from "../common/errors.js";
 import { newId, nowIso, stableHash } from "../common/validation.js";
 import { redactJson } from "../common/redaction.js";
@@ -39,10 +39,25 @@ export interface AgentRunStore {
   cancelRun(runId: string, reason?: string): MaybePromise;
   cancelCommand(commandId: string, reason?: string): MaybePromise;
   getSession(sessionId: string): MaybePromise;
+  createQueueTask(input: CreateQueueTaskInput): MaybePromise<QueueTaskRecord>; // Create a queue task.
+  listQueueTasks(input: ListQueueTasksInput): MaybePromise<QueueTaskListResult>; // One page of tasks plus the cursor for the next poll.
+  getQueueTask(taskId: string): MaybePromise<QueueTaskRecord>; // Look up a single task by id.
+  cancelQueueTask(taskId: string, reason?: string): MaybePromise<QueueTaskRecord>; // Request cancellation of a task.
+  markQueueTaskRead(taskId: string, readerId: string): MaybePromise<QueueReadCursorRecord>; // Record that readerId has seen the task.
+  queueStats(queue?: string): MaybePromise<QueueStats>; // Aggregate counts, optionally scoped to one queue.
+  queueCommander(queue?: string): MaybePromise<QueueCommanderSnapshot>; // Stats plus a short task listing in one response.
   backends(): MaybePromise;
   close?(): MaybePromise;
 }
 
+export interface ListQueueTasksInput { // Filter and paging options accepted by listQueueTasks.
+  queue?: string; // Only tasks in this queue when set.
+  state?: QueueTaskState; // Only tasks in this state when set.
+  cursor?: string; // Version cursor returned by a previous list call.
+  limit: number; // Requested page size; the memory store clamps it with clampQueueLimit.
+  updatedAfter?: number; // Version lower bound used when no valid cursor is supplied.
+}
+
 export interface SaveRunnerJobInput {
   runId: string;
   commandId: string;
@@ -77,6 +92,9 @@ export class MemoryAgentRunStore implements AgentRunStore {
   private readonly runners = new Map();
   private readonly sessions = new Map();
   private readonly runnerJobs = new Map();
+  private readonly queueTasks = new Map<string, QueueTaskRecord>(); // Keyed by task id.
+  private readonly queueReadCursors = new Map<string, QueueReadCursorRecord>(); // Keyed by queueReadKey(taskId, readerId).
+  private queueVersion = 0; // Last version handed out by nextQueueVersion().
 
   health(): StoreHealth {
     return { adapter: "memory-self-test", ready: true, reachable: true, migrationReady: true, migrationId: "memory-self-test", failureKind: null, message: null, credentialValuesPrinted: false };
@@ -247,6 +265,64 @@ export class MemoryAgentRunStore implements AgentRunStore {
     return this.sessions.get(sessionId) ?? null;
   }
 
+  createQueueTask(input: CreateQueueTaskInput): QueueTaskRecord { // Creates a pending task; a repeated idempotencyKey in the same tenant/project returns the existing task.
+    const payloadHash = stableHash(input.payload);
+    if (input.idempotencyKey) {
+      const existing = Array.from(this.queueTasks.values()).find((task) => task.tenantId === input.tenantId && task.projectId === input.projectId && task.idempotencyKey === input.idempotencyKey);
+      if (existing) {
+        if (existing.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "queue task idempotency key reused with different payload", { httpStatus: 409 }); // Same key, different payload: reject instead of silently returning the old task.
+        return existing;
+      }
+    }
+    const at = nowIso();
+    const task: QueueTaskRecord = { ...input, id: newId("qt"), state: "pending", version: this.nextQueueVersion(), payloadHash, latestAttempt: null, sessionPath: null, createdAt: at, updatedAt: at, cancelledAt: null, cancelReason: null };
+    this.queueTasks.set(task.id, task);
+    return task;
+  }
+
+  listQueueTasks(input: ListQueueTasksInput): QueueTaskListResult { // Returns one page of matching tasks newer than the cursor, plus the cursor for the next page.
+    const startVersion = parseQueueCursor(input.cursor) ?? input.updatedAfter ?? 0; // A missing or unparseable cursor falls back to updatedAfter, then 0.
+    const page = Array.from(this.queueTasks.values())
+      .filter((task) => task.version > startVersion)
+      .filter((task) => !input.queue || task.queue === input.queue)
+      .filter((task) => !input.state || task.state === input.state)
+      .sort((a, b) => a.version - b.version) // Select the page by ascending version so the cursor below covers exactly the tasks returned.
+      .slice(0, clampQueueLimit(input.limit));
+    return { items: [...page].sort(queueTaskSort), count: page.length, cursor: page.length > 0 ? String(page[page.length - 1]?.version ?? startVersion) : null }; // Items are presented in queueTaskSort order; cursor is the highest version in the page, null for an empty page.
+  }
+
+  getQueueTask(taskId: string): QueueTaskRecord { // Throws AgentRunError with httpStatus 404 when the id is unknown.
+    const task = this.queueTasks.get(taskId);
+    if (!task) throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 });
+    return task;
+  }
+
+  cancelQueueTask(taskId: string, reason = "cancel requested"): QueueTaskRecord { // Marks the task cancelled and bumps its version; tasks already in a terminal state are returned unchanged.
+    const task = this.getQueueTask(taskId);
+    if (isTerminalQueueTaskState(task.state)) return task;
+    const at = nowIso();
+    const next: QueueTaskRecord = { ...task, state: "cancelled", version: this.nextQueueVersion(), updatedAt: at, cancelledAt: at, cancelReason: reason };
+    this.queueTasks.set(taskId, next);
+    return next;
+  }
+
+  markQueueTaskRead(taskId: string, readerId: string): QueueReadCursorRecord { // Stores the task version seen by readerId, replacing any earlier record for the same task/reader pair.
+    const task = this.getQueueTask(taskId);
+    const record: QueueReadCursorRecord = { taskId, readerId, taskVersion: task.version, readAt: nowIso() };
+    this.queueReadCursors.set(queueReadKey(taskId, readerId), record);
+    return record;
+  }
+
+  queueStats(queue?: string): QueueStats { // Counts for one queue, or for every task when queue is omitted.
+    return buildQueueStats(Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue), queue ?? null);
+  }
+
+  queueCommander(queue?: string): QueueCommanderSnapshot { // Stats plus the first 20 tasks in queueTaskSort order, sharing one generatedAt timestamp.
+    const scoped = Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue); // Filter once; reused for both stats and items.
+    const generatedAt = nowIso();
+    return { queue: queue ?? null, stats: buildQueueStats(scoped, queue ?? null, generatedAt), items: [...scoped].sort(queueTaskSort).slice(0, 20), generatedAt };
+  }
+
   backends(): JsonRecord[] {
     return backendCapabilities();
   }
@@ -258,6 +334,11 @@ export class MemoryAgentRunStore implements AgentRunStore {
     return next;
   }
 
+  private nextQueueVersion(): number { // Increments and returns the store-wide queue version counter.
+    this.queueVersion += 1;
+    return this.queueVersion;
+  }
+
   private resolveSessionForRun(input: CreateRunInput, at: string): SessionRef | null {
     if (!input.sessionRef) return null;
     const existing = this.sessions.get(input.sessionRef.sessionId);
@@ -334,6 +415,10 @@ export function isTerminalCommandState(state: CommandRecord["state"]): boolean {
   return state === "completed" || state === "failed" || state === "cancelled";
 }
 
+export function isTerminalQueueTaskState(state: QueueTaskState): boolean { // True for completed, failed, blocked and cancelled.
+  return state === "completed" || state === "failed" || state === "blocked" || state === "cancelled";
+}
+
 export function sessionRefFromRecord(record: SessionRecord, fallback: SessionRef): SessionRef {
   return {
     sessionId: record.sessionId,
@@ -369,3 +454,34 @@ export function summarizeResourceBundleRef(resourceBundleRef: RunRecord["resourc
     credentialRef: resourceBundleRef.credentialRef ? { name: resourceBundleRef.credentialRef.name, namespace: resourceBundleRef.credentialRef.namespace ?? null, keys: resourceBundleRef.credentialRef.keys ?? [], valuesPrinted: false } : null,
   };
 }
+
+export function queueTaskSort(a: QueueTaskRecord, b: QueueTaskRecord): number { // Comparator: higher priority first, then createdAt ascending, then id as the final tie-breaker.
+  if (a.priority !== b.priority) return b.priority - a.priority;
+  return a.createdAt.localeCompare(b.createdAt) || a.id.localeCompare(b.id);
+}
+
+export function buildQueueStats(tasks: QueueTaskRecord[], queue: string | null, generatedAt = nowIso()): QueueStats { // Counts tasks per state and per lane and tracks the highest version seen.
+  const byState: Record<string, number> = {};
+  const byLane: Record<string, number> = {};
+  let maxVersion = 0; // Stays 0 when tasks is empty.
+  for (const task of tasks) {
+    byState[task.state] = (byState[task.state] ?? 0) + 1;
+    byLane[task.lane] = (byLane[task.lane] ?? 0) + 1;
+    maxVersion = Math.max(maxVersion, task.version);
+  }
+  return { queue, total: tasks.length, byState, byLane, maxVersion, generatedAt };
+}
+
+export function clampQueueLimit(limit: number): number { // Truncates to an integer within [1, 100]; non-finite input defaults to 50.
+  return Math.max(1, Math.min(Number.isFinite(limit) ? Math.trunc(limit) : 50, 100));
+}
+
+export function parseQueueCursor(cursor: string | undefined): number | null { // Returns the cursor as a non-negative integer, or null when absent or not parseable as one.
+  if (!cursor) return null;
+  const value = Number(cursor);
+  return Number.isInteger(value) && value >= 0 ? value : null;
+}
+
+function queueReadKey(taskId: string, readerId: string): string { // Map key for a (task, reader) pair.
+  return `${taskId}:${readerId}`;
+}
diff --git a/src/selftest/cases/00-redaction-postgres.ts b/src/selftest/cases/00-redaction-postgres.ts
index 9412a1a..6402fd0 100644
--- a/src/selftest/cases/00-redaction-postgres.ts
+++ b/src/selftest/cases/00-redaction-postgres.ts
@@ -13,13 +13,15 @@ const selfTest: SelfTestCase = async () => {
     (error) => error instanceof AgentRunError && error.failureKind === "infra-failed" && error.message.includes("DATABASE_URL is required"),
   );
   const postgresContract = postgresMigrationContract();
-  assert.equal(postgresContract.latestMigrationId, "003_v01_hwlab_manual_dispatch");
+  assert.equal(postgresContract.latestMigrationId, "004_v01_queue_q1");
   assert.ok(Array.isArray(postgresContract.requiredTables));
   assert.ok(postgresContract.requiredTables.includes("agentrun_schema_migrations"));
   assert.ok(postgresContract.requiredTables.includes("agentrun_runs"));
   assert.ok(postgresContract.requiredTables.includes("agentrun_events"));
   assert.ok(postgresContract.requiredTables.includes("agentrun_sessions"));
   assert.ok(postgresContract.requiredTables.includes("agentrun_runner_jobs"));
+  assert.ok(postgresContract.requiredTables.includes("agentrun_queue_tasks"));
+  assert.ok(postgresContract.requiredTables.includes("agentrun_queue_read_cursors"));
   return { name: "redaction-postgres", tests: ["redaction", "postgres-store-contract"] };
 };
 
diff --git a/src/selftest/cases/70-queue-q1.ts
b/src/selftest/cases/70-queue-q1.ts
new file mode 100644
index 0000000..ce21671
--- /dev/null
+++ b/src/selftest/cases/70-queue-q1.ts
@@ -0,0 +1,65 @@
+import assert from "node:assert/strict";
+import { startManagerServer } from "../../mgr/server.js";
+import { MemoryAgentRunStore } from "../../mgr/store.js";
+import { ManagerClient } from "../../mgr/client.js";
+import type { QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord } from "../../common/types.js";
+import type { SelfTestCase } from "../harness.js";
+
+const selfTest: SelfTestCase = async (context) => { // Exercises the Queue REST endpoints end to end against a manager backed by MemoryAgentRunStore.
+  const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test", store: new MemoryAgentRunStore() });
+  try {
+    const client = new ManagerClient(server.baseUrl);
+    const input = {
+      tenantId: "unidesk",
+      projectId: "pikasTech/unidesk",
+      queue: "dev",
+      lane: "main",
+      title: "Q1 queue task",
+      priority: 10,
+      backendProfile: "codex",
+      providerId: "G14",
+      workspaceRef: { kind: "host-path", path: context.workspace },
+      executionPolicy: null,
+      resourceBundleRef: null,
+      payload: { prompt: "hello" },
+      references: [{ kind: "issue", url: "https://github.com/pikasTech/agentrun/issues/39" }],
+      metadata: { source: "self-test" },
+      idempotencyKey: "queue-q1-self-test",
+    };
+    const created = await client.post("/api/v1/queue/tasks", input) as QueueTaskRecord; // submit
+    assert.equal(created.state, "pending");
+    assert.equal(created.sessionPath, null);
+    assert.equal(created.latestAttempt, null);
+    const duplicate = await client.post("/api/v1/queue/tasks", input) as QueueTaskRecord; // Same idempotencyKey and payload must return the same task.
+    assert.equal(duplicate.id, created.id);
+
+    const listed = await client.get("/api/v1/queue/tasks?queue=dev&limit=10") as QueueTaskListResult; // list
+    assert.equal(listed.count, 1);
+    assert.equal(listed.items[0]?.id, created.id);
+
+    const shown = await client.get(`/api/v1/queue/tasks/${created.id}`) as QueueTaskRecord; // show
+    assert.equal(shown.title, "Q1 queue task");
+    assert.equal(shown.sessionPath, null);
+
+    const stats = await client.get("/api/v1/queue/stats?queue=dev") as QueueStats; // stats
+    assert.equal(stats.total, 1);
+    assert.equal(stats.byState.pending, 1);
+
+    const read = await client.post(`/api/v1/queue/tasks/${created.id}/read`, { readerId: "self-test" }) as QueueReadCursorRecord; // read
+    assert.equal(read.taskId, created.id);
+    assert.equal(read.readerId, "self-test");
+
+    const cancelled = await client.post(`/api/v1/queue/tasks/${created.id}/cancel`, { reason: "self-test complete" }) as QueueTaskRecord; // cancel
+    assert.equal(cancelled.state, "cancelled");
+    assert.equal(cancelled.cancelReason, "self-test complete");
+
+    const commander = await client.get("/api/v1/queue/commander?queue=dev") as QueueCommanderSnapshot; // commander; must reflect the cancellation above.
+    assert.equal(commander.stats.byState.cancelled, 1);
+    assert.equal(commander.items[0]?.id, created.id);
+    return { name: "queue-q1", tests: ["queue-q1-rest-memory"] };
+  } finally {
+    await new Promise<void>((resolve) => server.server.close(() => resolve())); // Always stop the HTTP server, even when an assertion fails; <void> lets resolve() be called without an argument.
+  }
+};
+
+export default selfTest;