feat: 实现 Queue Q1 API 和 CLI 骨架

This commit is contained in:
Codex
2026-06-01 22:20:09 +08:00
parent f4a26e5961
commit 237b10c4da
11 changed files with 566 additions and 14 deletions
+2 -1
View File
@@ -120,6 +120,7 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交
| runner start/job | 已实现 | `runner start` 可执行 host process runner`runner job --dry-run` 可渲染 Kubernetes Job JSON`runner job` 正式路径通过 manager REST 创建 Kubernetes Job,支持 `--idempotency-key` 并快速返回 job identity、SecretRef、retention 和轮询命令。 |
| runner jobs/job-status | 已实现 | CLI 通过 manager REST 查询 runner Job 持久记录和最小状态摘要,不直连 Kubernetes、不读取 Secret 值。 |
| result/cancel CLI | 已实现 | `runs result``commands result``runs cancel``commands cancel` 均调用 manager REST,不维护独立状态。 |
| Queue/Session CLI | 实现 | 规格见 [spec-v01-queue.md](spec-v01-queue.md)Queue 命令只返回 sessionPath,输出和 trace 进入 Session 命令。 |
| Queue CLI | 实现/Q1 | 已提供 `queue submit/list/show/stats/commander/read/cancel`,通过 manager REST 访问 Queue task 和 stats,不直连 Postgres。 |
| Session CLI | 待实现 | 规格见 [spec-v01-queue.md](spec-v01-queue.md);输出和 trace 进入 Session 命令,Queue 命令不得代理 output/trace。 |
| CLI 测试规格 | 已定义/已验证主闭环 | 综合联调见 [spec-v01-validation.md](spec-v01-validation.md);每次发布仍按手动交互验收复跑。 |
| `deepseek` profile CLI | 已实现/已通过主闭环 | `secrets codex render --profile deepseek``backends list``runner start --backend``runner job` 和 JSON 错误可见性已实现;真实 CLI/RESTful 联调已通过 `codex -> deepseek -> codex` 切换主闭环。 |
+2 -2
View File
@@ -83,7 +83,7 @@ Secret 名称和 key 可以在实现时按 Kubernetes 命名限制微调,但
| --- | --- | --- |
| Postgres durable store 规格 | 已定义 | 本文为 v0.1 存储权威。 |
| StatefulSet/Service/PVC | 已实现/已通过主闭环 | `agentrun-v01-postgres` StatefulSet、Service 和 PVC 已由 GitOps runtime 提供,作为 `agentrun-v01` durable store。 |
| migration ledger | 已实现/已通过主闭环 | `agentrun-mgr` 启动 Postgres adapter 时幂等创建 `agentrun_schema_migrations` 并记录 migration id/checksum;当前最新 migration 为 `003_v01_hwlab_manual_dispatch`,用于新增 SessionRef、ResourceBundleRef 和 runner job idempotency 持久化;readiness 必须显示 migration ready。 |
| manager Postgres adapter | 已实现/已通过主闭环 | `agentrun-mgr` 通过 `DATABASE_URL` 启用 Postgres adapter,持久化 runs、commands、events、runners、runner_jobs、sessions、backends 和 leases;缺少 `DATABASE_URL` 时 live runtime fail fastmemory 只允许显式 self-test/dev。 |
| migration ledger | 已实现/已通过主闭环 | `agentrun-mgr` 启动 Postgres adapter 时幂等创建 `agentrun_schema_migrations` 并记录 migration id/checksum;当前最新 migration 为 `004_v01_queue_q1`,用于新增 Q1 Queue task 与 read cursor 持久化;readiness 必须显示 migration ready。 |
| manager Postgres adapter | 已实现/已通过主闭环 | `agentrun-mgr` 通过 `DATABASE_URL` 启用 Postgres adapter,持久化 runs、commands、events、runners、runner_jobs、sessions、backends、leases、Queue taskread cursor;缺少 `DATABASE_URL` 时 live runtime fail fastmemory 只允许显式 self-test/dev。 |
| health/readiness store 状态 | 已实现 | health/readiness 返回 adapter、reachable、migrationReady、migrationId、failureKind 和 redacted Secret 状态,不输出 DSN 明文。 |
| file/sqlite durable store | 不采用 | 只可用于临时本地测试,不作为 v0.1 runtime truth。 |
+2 -2
View File
@@ -161,8 +161,8 @@ Queue 首版新增或扩展的稳定表方向:
| 规格项 | 状态 | 说明 |
| --- | --- | --- |
| AgentRun Queue 直接吸收规格 | 已定义 | 本文为 Queue 吸收 Code Queue 的首版权威规格。 |
| Queue RESTful API | 实现 | 计划通过 `agentrun-mgr` 暴露,仍使用短请求和轻量轮询。 |
| Queue CLI | 实现 | 计划加入 `queue ...``sessions ...` 命令族。 |
| Queue RESTful API | 实现/Q1 | 通过 `agentrun-mgr` 暴露 `submit/list/show/stats/read/cancel/commander`,使用短请求和 Queue version/cursor 轻量轮询;Q2 再接入 attempt 与真实执行。 |
| Queue CLI | 实现/Q1 | 加入 `queue submit/list/show/stats/commander/read/cancel`Queue 命令只返回 task summary、stats、read cursor`sessionPath`。 |
| Session API/CLI | 待实现 | Queue 只返回 `sessionPath`Session 层承接输出、trace 和控制。 |
| Scheduler 接入 | 待实现 | 旧 Code Queue scheduler 不保留;AgentRun Scheduler 是唯一调度方向。 |
| OA/Event/integrations | 不采用 | 首版不做,后续如需外部 connector/sink 必须单独立规格。 |
+42
View File
@@ -33,6 +33,13 @@ async function dispatch(args: ParsedArgs): Promise<JsonValue> {
if (group === "server" && command === "status") return client(args).get("/health/readiness");
if (group === "backends" && command === "list") return client(args).get("/api/v1/backends");
if (group === "secrets" && command === "codex" && id === "render") return renderCodexSecret(args);
if (group === "queue" && command === "submit") return submitQueueTask(args);
if (group === "queue" && command === "list") return listQueueTasks(args);
if (group === "queue" && command === "show" && id) return client(args).get(`/api/v1/queue/tasks/${encodeURIComponent(id)}`);
if (group === "queue" && command === "stats") return client(args).get(`/api/v1/queue/stats${queueQuery(args)}`);
if (group === "queue" && command === "commander") return client(args).get(`/api/v1/queue/commander${queueQuery(args)}`);
if (group === "queue" && command === "read" && id) return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(id)}/read`, { readerId: optionalFlag(args, "reader-id") ?? "cli" });
if (group === "queue" && command === "cancel" && id) return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(id)}/cancel`, cancelBody(args));
if (group === "runs" && command === "create") return client(args).post("/api/v1/runs", await jsonFile(args));
if (group === "runs" && command === "show" && id) return client(args).get(`/api/v1/runs/${encodeURIComponent(id)}`);
if (group === "runs" && command === "events" && id) return client(args).get(`/api/v1/runs/${encodeURIComponent(id)}/events?afterSeq=${flag(args, "after-seq", "0")}&limit=${flag(args, "limit", "100")}`);
@@ -92,6 +99,34 @@ async function listRunnerJobs(args: ParsedArgs): Promise<JsonValue> {
return client(args).get(`/api/v1/runs/${encodeURIComponent(runId)}/runner-jobs${commandId ? `?commandId=${encodeURIComponent(commandId)}` : ""}`);
}
async function submitQueueTask(args: ParsedArgs): Promise<JsonValue> {
const body = await jsonFile(args);
const idempotencyKey = optionalFlag(args, "idempotency-key");
if (idempotencyKey) body.idempotencyKey = idempotencyKey;
return client(args).post("/api/v1/queue/tasks", body);
}
async function listQueueTasks(args: ParsedArgs): Promise<JsonValue> {
const params = new URLSearchParams();
const queue = optionalFlag(args, "queue");
const state = optionalFlag(args, "state");
const cursor = optionalFlag(args, "cursor");
const limit = optionalFlag(args, "limit");
const updatedAfter = optionalFlag(args, "updated-after");
if (queue) params.set("queue", queue);
if (state) params.set("state", state);
if (cursor) params.set("cursor", cursor);
if (limit) params.set("limit", limit);
if (updatedAfter) params.set("updatedAfter", updatedAfter);
const query = params.toString();
return client(args).get(`/api/v1/queue/tasks${query ? `?${query}` : ""}`);
}
function queueQuery(args: ParsedArgs): string {
const queue = optionalFlag(args, "queue");
return queue ? `?queue=${encodeURIComponent(queue)}` : "";
}
async function showRunnerJobStatus(args: ParsedArgs): Promise<JsonValue> {
const runId = flag(args, "run-id", "");
if (!runId) throw new AgentRunError("schema-invalid", "runner job-status requires --run-id", { httpStatus: 2 });
@@ -240,6 +275,13 @@ function help(): JsonRecord {
"runner job --dry-run --run-id <runId> --command-id <commandId> --image <image>",
"runner jobs --run-id <runId> [--command-id <commandId>]",
"runner job-status [runnerJobId] --run-id <runId>",
"queue submit --json-file <task.json> [--idempotency-key <key>]",
"queue list [--queue <queue>] [--state <state>] [--cursor <cursor>] [--limit <limit>] [--updated-after <version>]",
"queue show <taskId>",
"queue stats [--queue <queue>]",
"queue commander [--queue <queue>]",
"queue read <taskId> [--reader-id <reader>]",
"queue cancel <taskId> [--reason <text>]",
"secrets codex render --dry-run [--profile codex|deepseek] [--codex-home <dir>] [--namespace agentrun-v01] [--secret-name <name>]",
"backends list",
"server start|status",
+71
View File
@@ -23,6 +23,7 @@ export type RunStatus = "pending" | "claimed" | "running" | "completed" | "faile
export type CommandState = "pending" | "acknowledged" | "completed" | "failed" | "cancelled";
export type TerminalStatus = "completed" | "failed" | "blocked" | "cancelled";
export type BackendProfile = "codex" | "deepseek";
export type QueueTaskState = "pending" | "running" | "completed" | "failed" | "blocked" | "cancelled";
export interface WorkspaceRef extends JsonRecord {
kind: "git-worktree" | "host-path" | "kubernetes-pvc" | "opaque";
@@ -168,6 +169,76 @@ export interface RunnerJobRecord extends JsonRecord {
updatedAt: string;
}
export interface QueueAttemptRef extends JsonRecord {
attemptId: string;
state: QueueTaskState;
runId: string | null;
commandId: string | null;
runnerJobId: string | null;
sessionId: string | null;
sessionPath: string | null;
}
export interface CreateQueueTaskInput extends JsonRecord {
tenantId: string;
projectId: string;
queue: string;
lane: string;
title: string;
priority: number;
backendProfile: BackendProfile;
providerId: string | null;
workspaceRef: WorkspaceRef | null;
executionPolicy: ExecutionPolicy | null;
resourceBundleRef: ResourceBundleRef | null;
payload: JsonRecord;
references: JsonRecord[];
metadata: JsonRecord;
idempotencyKey?: string;
}
export interface QueueTaskRecord extends CreateQueueTaskInput {
id: string;
state: QueueTaskState;
version: number;
payloadHash: string;
latestAttempt: QueueAttemptRef | null;
sessionPath: string | null;
createdAt: string;
updatedAt: string;
cancelledAt: string | null;
cancelReason: string | null;
}
export interface QueueReadCursorRecord extends JsonRecord {
taskId: string;
readerId: string;
taskVersion: number;
readAt: string;
}
export interface QueueTaskListResult extends JsonRecord {
items: QueueTaskRecord[];
count: number;
cursor: string | null;
}
export interface QueueStats extends JsonRecord {
queue: string | null;
total: number;
byState: JsonRecord;
byLane: JsonRecord;
maxVersion: number;
generatedAt: string;
}
export interface QueueCommanderSnapshot extends JsonRecord {
queue: string | null;
stats: QueueStats;
items: QueueTaskRecord[];
generatedAt: string;
}
export interface BackendEvent {
type: EventType;
payload: JsonRecord;
+39 -1
View File
@@ -1,5 +1,5 @@
import { createHash, randomUUID } from "node:crypto";
import type { BackendProfile, CreateCommandInput, CreateRunInput, ExecutionPolicy, JsonRecord, JsonValue, ResourceBundleRef, SecretRef, SessionRef } from "./types.js";
import type { BackendProfile, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, ExecutionPolicy, JsonRecord, JsonValue, QueueTaskState, ResourceBundleRef, SecretRef, SessionRef } from "./types.js";
import { AgentRunError } from "./errors.js";
import { backendProfileSpec, backendProfiles, isBackendProfile } from "./backend-profiles.js";
@@ -168,3 +168,41 @@ export function validateCreateCommand(input: unknown): CreateCommandInput {
const idempotencyKey = typeof record.idempotencyKey === "string" && record.idempotencyKey.trim().length > 0 ? record.idempotencyKey.trim() : undefined;
return { type, payload, ...(idempotencyKey ? { idempotencyKey } : {}) };
}
export function validateCreateQueueTask(input: unknown): CreateQueueTaskInput {
const record = asRecord(input, "queueTask");
const tenantId = requiredString(record, "tenantId");
if (!allowedTenants.has(tenantId)) throw new AgentRunError("tenant-policy-denied", `tenantId ${tenantId} is not allowed`, { httpStatus: 403 });
const backendProfileValue = optionalString(record.backendProfile) ?? "codex";
if (!isBackendProfile(backendProfileValue)) throw new AgentRunError("schema-invalid", `backendProfile ${backendProfileValue} is not supported in v0.1`, { httpStatus: 400, details: { allowedBackends: [...backendProfiles] } });
const queue = optionalString(record.queue) ?? "default";
const lane = optionalString(record.lane) ?? "default";
const priorityValue = record.priority ?? 0;
if (typeof priorityValue !== "number" || !Number.isFinite(priorityValue)) throw new AgentRunError("schema-invalid", "priority must be a finite number", { httpStatus: 400 });
const referencesValue = record.references ?? [];
if (!Array.isArray(referencesValue)) throw new AgentRunError("schema-invalid", "references must be an array", { httpStatus: 400 });
const result: CreateQueueTaskInput = {
tenantId,
projectId: requiredString(record, "projectId"),
queue,
lane,
title: requiredString(record, "title"),
priority: priorityValue,
backendProfile: backendProfileValue,
providerId: optionalString(record.providerId) ?? null,
workspaceRef: record.workspaceRef === undefined || record.workspaceRef === null ? null : requiredRecord(record, "workspaceRef") as CreateQueueTaskInput["workspaceRef"],
executionPolicy: record.executionPolicy === undefined || record.executionPolicy === null ? null : validateExecutionPolicy(requiredRecord(record, "executionPolicy")),
resourceBundleRef: validateResourceBundleRef(record.resourceBundleRef),
payload: record.payload === undefined ? {} : asRecord(record.payload, "payload"),
references: referencesValue.map((item, index) => asRecord(item, `references[${index}]`)),
metadata: record.metadata === undefined ? {} : asRecord(record.metadata, "metadata"),
};
const idempotencyKey = optionalString(record.idempotencyKey);
if (idempotencyKey) result.idempotencyKey = idempotencyKey;
return result;
}
export function validateQueueTaskState(value: string): QueueTaskState {
if (value === "pending" || value === "running" || value === "completed" || value === "failed" || value === "blocked" || value === "cancelled") return value;
throw new AgentRunError("schema-invalid", `queue task state ${value} is not supported`, { httpStatus: 400 });
}
+194 -4
View File
@@ -3,10 +3,10 @@ import { Pool } from "pg";
import type { PoolClient, QueryResultRow } from "pg";
import { AgentRunError } from "../common/errors.js";
import { redactJson } from "../common/redaction.js";
import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js";
import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js";
import { newId, nowIso, stableHash } from "../common/validation.js";
import type { AgentRunStore, SaveRunnerJobInput, StoreHealth } from "./store.js";
import { assertSessionBoundary, commandStateFromTerminal, isTerminalCommandState, isTerminalRunStatus, sessionRefFromRecord, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js";
import type { AgentRunStore, ListQueueTasksInput, SaveRunnerJobInput, StoreHealth } from "./store.js";
import { assertSessionBoundary, buildQueueStats, clampQueueLimit, commandStateFromTerminal, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, queueTaskSort, sessionRefFromRecord, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js";
import { backendCapabilitiesSqlValues, mergeBackendCapability } from "../common/backend-profiles.js";
import { normalizeRunEventPayload, requireEventType } from "../common/events.js";
@@ -171,6 +171,53 @@ CREATE INDEX IF NOT EXISTS agentrun_runner_jobs_run_command_idx ON agentrun_runn
CREATE INDEX IF NOT EXISTS agentrun_sessions_tenant_project_idx ON agentrun_sessions (tenant_id, project_id, backend_profile, updated_at);
`;
const queueQ1MigrationSql = `
CREATE TABLE IF NOT EXISTS agentrun_queue_tasks (
id text PRIMARY KEY,
tenant_id text NOT NULL,
project_id text NOT NULL,
queue text NOT NULL,
lane text NOT NULL,
title text NOT NULL,
priority integer NOT NULL,
state text NOT NULL,
version bigint NOT NULL,
backend_profile text NOT NULL,
provider_id text,
workspace_ref jsonb,
execution_policy jsonb,
resource_bundle_ref jsonb,
payload jsonb NOT NULL,
payload_hash text NOT NULL,
references_json jsonb NOT NULL DEFAULT '[]'::jsonb,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
idempotency_key text,
latest_attempt jsonb,
session_path text,
created_at timestamptz NOT NULL,
updated_at timestamptz NOT NULL,
cancelled_at timestamptz,
cancel_reason text
);
CREATE UNIQUE INDEX IF NOT EXISTS agentrun_queue_tasks_idempotency_idx
ON agentrun_queue_tasks (tenant_id, project_id, idempotency_key)
WHERE idempotency_key IS NOT NULL;
CREATE INDEX IF NOT EXISTS agentrun_queue_tasks_list_idx ON agentrun_queue_tasks (queue, state, version, priority, created_at);
CREATE INDEX IF NOT EXISTS agentrun_queue_tasks_updated_idx ON agentrun_queue_tasks (version, updated_at);
CREATE TABLE IF NOT EXISTS agentrun_queue_read_cursors (
task_id text NOT NULL REFERENCES agentrun_queue_tasks(id) ON DELETE CASCADE,
reader_id text NOT NULL,
task_version bigint NOT NULL,
read_at timestamptz NOT NULL,
PRIMARY KEY (task_id, reader_id)
);
CREATE SEQUENCE IF NOT EXISTS agentrun_queue_version_seq;
`;
const postgresMigrations: MigrationDefinition[] = [
{
id: "001_v01_initial_durable_store",
@@ -187,13 +234,18 @@ const postgresMigrations: MigrationDefinition[] = [
checksum: checksumSql(hwlabManualDispatchMigrationSql),
sql: hwlabManualDispatchMigrationSql,
},
{
id: "004_v01_queue_q1",
checksum: checksumSql(queueQ1MigrationSql),
sql: queueQ1MigrationSql,
},
];
export function postgresMigrationContract(): JsonRecord {
return {
migrationIds: postgresMigrations.map((migration) => migration.id),
latestMigrationId: latestMigrationId(),
requiredTables: ["agentrun_schema_migrations", "agentrun_runs", "agentrun_commands", "agentrun_events", "agentrun_runners", "agentrun_backends", "agentrun_leases", "agentrun_sessions", "agentrun_runner_jobs"],
requiredTables: ["agentrun_schema_migrations", "agentrun_runs", "agentrun_commands", "agentrun_events", "agentrun_runners", "agentrun_backends", "agentrun_leases", "agentrun_sessions", "agentrun_runner_jobs", "agentrun_queue_tasks", "agentrun_queue_read_cursors"],
checksums: Object.fromEntries(postgresMigrations.map((migration) => [migration.id, migration.checksum])),
};
}
@@ -508,6 +560,96 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
return result.rows[0] ? sessionFromRow(result.rows[0]) : null;
}
async createQueueTask(input: CreateQueueTaskInput): Promise<QueueTaskRecord> {
const payloadHash = stableHash(input.payload);
return this.withTransaction(async (client) => {
if (input.idempotencyKey) {
const existing = await client.query("SELECT * FROM agentrun_queue_tasks WHERE tenant_id = $1 AND project_id = $2 AND idempotency_key = $3 FOR UPDATE", [input.tenantId, input.projectId, input.idempotencyKey]);
if (existing.rows[0]) {
const record = queueTaskFromRow(existing.rows[0]);
if (record.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "queue task idempotency key reused with different payload", { httpStatus: 409 });
return record;
}
}
const at = nowIso();
const version = await this.nextQueueVersion(client);
const task: QueueTaskRecord = { ...input, id: newId("qt"), state: "pending", version, payloadHash, latestAttempt: null, sessionPath: null, createdAt: at, updatedAt: at, cancelledAt: null, cancelReason: null };
const inserted = await client.query(
`INSERT INTO agentrun_queue_tasks (id, tenant_id, project_id, queue, lane, title, priority, state, version, backend_profile, provider_id, workspace_ref, execution_policy, resource_bundle_ref, payload, payload_hash, references_json, metadata, idempotency_key, latest_attempt, session_path, created_at, updated_at, cancelled_at, cancel_reason)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12::jsonb, $13::jsonb, $14::jsonb, $15::jsonb, $16, $17::jsonb, $18::jsonb, $19, $20::jsonb, $21, $22, $23, $24, $25)
RETURNING *`,
[task.id, task.tenantId, task.projectId, task.queue, task.lane, task.title, task.priority, task.state, task.version, task.backendProfile, task.providerId, JSON.stringify(task.workspaceRef), JSON.stringify(task.executionPolicy), JSON.stringify(task.resourceBundleRef), JSON.stringify(task.payload), task.payloadHash, JSON.stringify(task.references), JSON.stringify(task.metadata), task.idempotencyKey ?? null, JSON.stringify(task.latestAttempt), task.sessionPath, task.createdAt, task.updatedAt, task.cancelledAt, task.cancelReason],
);
return queueTaskFromRow(inserted.rows[0]);
});
}
async listQueueTasks(input: ListQueueTasksInput): Promise<QueueTaskListResult> {
const startVersion = parseQueueCursor(input.cursor) ?? input.updatedAfter ?? 0;
const params: unknown[] = [startVersion];
const where = ["version > $1"];
if (input.queue) {
params.push(input.queue);
where.push(`queue = $${params.length}`);
}
if (input.state) {
params.push(input.state);
where.push(`state = $${params.length}`);
}
params.push(clampQueueLimit(input.limit));
const result = await this.pool.query(`SELECT * FROM agentrun_queue_tasks WHERE ${where.join(" AND ")} ORDER BY priority DESC, created_at ASC, id ASC LIMIT $${params.length}`, params);
const items = result.rows.map(queueTaskFromRow);
return { items, count: items.length, cursor: items.length > 0 ? String(items[items.length - 1]?.version ?? startVersion) : null };
}
async getQueueTask(taskId: string): Promise<QueueTaskRecord> {
const result = await this.pool.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1", [taskId]);
const row = result.rows[0];
if (!row) throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 });
return queueTaskFromRow(row);
}
async cancelQueueTask(taskId: string, reason = "cancel requested"): Promise<QueueTaskRecord> {
return this.withTransaction(async (client) => {
const existing = await client.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1 FOR UPDATE", [taskId]);
const row = existing.rows[0];
if (!row) throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 });
const task = queueTaskFromRow(row);
if (isTerminalQueueTaskState(task.state)) return task;
const at = nowIso();
const version = await this.nextQueueVersion(client);
const updated = await client.query("UPDATE agentrun_queue_tasks SET state = 'cancelled', version = $2, updated_at = $3, cancelled_at = $3, cancel_reason = $4 WHERE id = $1 RETURNING *", [taskId, version, at, reason]);
return queueTaskFromRow(updated.rows[0]);
});
}
async markQueueTaskRead(taskId: string, readerId: string): Promise<QueueReadCursorRecord> {
return this.withTransaction(async (client) => {
const existing = await client.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1 FOR UPDATE", [taskId]);
const row = existing.rows[0];
if (!row) throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 });
const task = queueTaskFromRow(row);
const record: QueueReadCursorRecord = { taskId, readerId, taskVersion: task.version, readAt: nowIso() };
await client.query(
`INSERT INTO agentrun_queue_read_cursors (task_id, reader_id, task_version, read_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (task_id, reader_id) DO UPDATE SET task_version = EXCLUDED.task_version, read_at = EXCLUDED.read_at`,
[record.taskId, record.readerId, record.taskVersion, record.readAt],
);
return record;
});
}
async queueStats(queue?: string): Promise<QueueStats> {
return buildQueueStats(await this.loadQueueTasksForProjection(queue), queue ?? null);
}
async queueCommander(queue?: string): Promise<QueueCommanderSnapshot> {
const tasks = await this.loadQueueTasksForProjection(queue);
const generatedAt = nowIso();
return { queue: queue ?? null, stats: buildQueueStats(tasks, queue ?? null, generatedAt), items: tasks.sort(queueTaskSort).slice(0, 20), generatedAt };
}
async backends(): Promise<JsonRecord[]> {
const result = await this.pool.query("SELECT * FROM agentrun_backends ORDER BY profile ASC");
return result.rows.map((row) => {
@@ -534,6 +676,20 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
return Number(result.rows[0]?.seq ?? 1);
}
private async nextQueueVersion(client: PoolClient): Promise<number> {
const result = await client.query<{ version: string | number }>("SELECT nextval('agentrun_queue_version_seq') AS version");
return Number(result.rows[0]?.version ?? 1);
}
private async loadQueueTasksForProjection(queue?: string): Promise<QueueTaskRecord[]> {
if (queue) {
const result = await this.pool.query("SELECT * FROM agentrun_queue_tasks WHERE queue = $1", [queue]);
return result.rows.map(queueTaskFromRow);
}
const result = await this.pool.query("SELECT * FROM agentrun_queue_tasks");
return result.rows.map(queueTaskFromRow);
}
private async requireRunForUpdate(client: PoolClient, runId: string): Promise<RunRecord> {
const result = await client.query("SELECT * FROM agentrun_runs WHERE id = $1 FOR UPDATE", [runId]);
const row = result.rows[0];
@@ -727,6 +883,36 @@ function runnerJobFromRow(row: QueryResultRow): RunnerJobRecord {
};
}
function queueTaskFromRow(row: QueryResultRow): QueueTaskRecord {
return {
id: stringValue(row.id),
tenantId: stringValue(row.tenant_id),
projectId: stringValue(row.project_id),
queue: stringValue(row.queue),
lane: stringValue(row.lane),
title: stringValue(row.title),
priority: Number(row.priority),
state: stringValue(row.state) as QueueTaskState,
version: Number(row.version),
backendProfile: stringValue(row.backend_profile) as BackendProfile,
providerId: nullableString(row.provider_id),
workspaceRef: jsonValue(row.workspace_ref) as QueueTaskRecord["workspaceRef"],
executionPolicy: jsonValue(row.execution_policy) as QueueTaskRecord["executionPolicy"],
resourceBundleRef: jsonValue(row.resource_bundle_ref) as QueueTaskRecord["resourceBundleRef"],
payload: jsonRecord(row.payload),
payloadHash: stringValue(row.payload_hash),
references: jsonArray(row.references_json) as JsonRecord[],
metadata: jsonRecord(row.metadata),
...(nullableString(row.idempotency_key) ? { idempotencyKey: stringValue(row.idempotency_key) } : {}),
latestAttempt: jsonValue(row.latest_attempt) as QueueTaskRecord["latestAttempt"],
sessionPath: nullableString(row.session_path),
createdAt: iso(row.created_at),
updatedAt: iso(row.updated_at),
cancelledAt: nullableIso(row.cancelled_at),
cancelReason: nullableString(row.cancel_reason),
};
}
function metadataForRunner(runner: RunnerRecord): JsonRecord {
const { id: _id, runId: _runId, attemptId: _attemptId, backendProfile: _backendProfile, placement: _placement, sourceCommit: _sourceCommit, registeredAt: _registeredAt, heartbeatAt: _heartbeatAt, ...metadata } = runner;
return redactJson(metadata);
@@ -749,6 +935,10 @@ function jsonRecord(value: unknown): JsonRecord {
return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : {};
}
function jsonArray(value: unknown): JsonValue[] {
return Array.isArray(value) ? value as JsonValue[] : [];
}
function iso(value: unknown): string {
if (value instanceof Date) return value.toISOString();
if (typeof value === "string") return new Date(value).toISOString();
+29 -2
View File
@@ -1,10 +1,10 @@
import type { Server } from "node:http";
import { createServer } from "node:http";
import type { AddressInfo } from "node:net";
import type { AgentRunStore } from "./store.js";
import type { AgentRunStore, ListQueueTasksInput } from "./store.js";
import { openAgentRunStoreFromEnv } from "./store.js";
import { AgentRunError, errorToJson } from "../common/errors.js";
import { asRecord, validateCreateCommand, validateCreateRun } from "../common/validation.js";
import { asRecord, validateCreateCommand, validateCreateQueueTask, validateCreateRun, validateQueueTaskState } from "../common/validation.js";
import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, RunEvent } from "../common/types.js";
import { createKubernetesRunnerJob } from "./kubernetes-runner-job.js";
import { buildRunResult } from "./result.js";
@@ -68,6 +68,33 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults
return { serviceId: "agentrun-mgr", live: true, ready, database, sourceCommit, secretRefs: { databaseUrl: database.adapter === "postgres" ? "redacted" : "not-used", valuesPrinted: false } };
}
if (method === "GET" && path === "/api/v1/backends") return { items: await store.backends() as unknown as JsonValue };
if (method === "POST" && path === "/api/v1/queue/tasks") return await store.createQueueTask(validateCreateQueueTask(body)) as unknown as JsonValue;
if (method === "GET" && path === "/api/v1/queue/tasks") {
const state = url.searchParams.get("state");
const listInput: ListQueueTasksInput = { updatedAfter: integerQuery(url, "updatedAfter", 0), limit: integerQuery(url, "limit", 50) };
const queue = url.searchParams.get("queue");
const cursor = url.searchParams.get("cursor");
if (queue) listInput.queue = queue;
if (state) listInput.state = validateQueueTaskState(state);
if (cursor) listInput.cursor = cursor;
return await store.listQueueTasks(listInput) as unknown as JsonValue;
}
const queueTaskMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)$/u);
if (method === "GET" && queueTaskMatch) return await store.getQueueTask(queueTaskMatch[1] ?? "") as unknown as JsonValue;
const queueTaskCancelMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/cancel$/u);
if (method === "POST" && queueTaskCancelMatch) {
const record = body === null ? {} : asRecord(body, "cancel");
const reason = typeof record.reason === "string" && record.reason.trim().length > 0 ? record.reason.trim() : undefined;
return await store.cancelQueueTask(queueTaskCancelMatch[1] ?? "", reason) as unknown as JsonValue;
}
const queueTaskReadMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/read$/u);
if (method === "POST" && queueTaskReadMatch) {
const record = body === null ? {} : asRecord(body, "read");
const readerId = typeof record.readerId === "string" && record.readerId.trim().length > 0 ? record.readerId.trim() : "cli";
return await store.markQueueTaskRead(queueTaskReadMatch[1] ?? "", readerId) as unknown as JsonValue;
}
if (method === "GET" && path === "/api/v1/queue/stats") return await store.queueStats(url.searchParams.get("queue") ?? undefined) as unknown as JsonValue;
if (method === "GET" && path === "/api/v1/queue/commander") return await store.queueCommander(url.searchParams.get("queue") ?? undefined) as unknown as JsonValue;
if (method === "POST" && path === "/api/v1/runs") return await store.createRun(validateCreateRun(body)) as unknown as JsonValue;
const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u);
if (method === "GET" && runMatch) return await store.getRun(runMatch[1] ?? "") as unknown as JsonValue;
+117 -1
View File
@@ -1,4 +1,4 @@
import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateRunInput, FailureKind, JsonRecord, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js";
import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js";
import { AgentRunError } from "../common/errors.js";
import { newId, nowIso, stableHash } from "../common/validation.js";
import { redactJson } from "../common/redaction.js";
@@ -39,10 +39,25 @@ export interface AgentRunStore {
cancelRun(runId: string, reason?: string): MaybePromise<RunRecord>;
cancelCommand(commandId: string, reason?: string): MaybePromise<CommandRecord>;
getSession(sessionId: string): MaybePromise<SessionRecord | null>;
createQueueTask(input: CreateQueueTaskInput): MaybePromise<QueueTaskRecord>;
listQueueTasks(input: ListQueueTasksInput): MaybePromise<QueueTaskListResult>;
getQueueTask(taskId: string): MaybePromise<QueueTaskRecord>;
cancelQueueTask(taskId: string, reason?: string): MaybePromise<QueueTaskRecord>;
markQueueTaskRead(taskId: string, readerId: string): MaybePromise<QueueReadCursorRecord>;
queueStats(queue?: string): MaybePromise<QueueStats>;
queueCommander(queue?: string): MaybePromise<QueueCommanderSnapshot>;
backends(): MaybePromise<JsonRecord[]>;
close?(): MaybePromise<void>;
}
export interface ListQueueTasksInput {
queue?: string;
state?: QueueTaskState;
cursor?: string;
limit: number;
updatedAfter?: number;
}
export interface SaveRunnerJobInput {
runId: string;
commandId: string;
@@ -77,6 +92,9 @@ export class MemoryAgentRunStore implements AgentRunStore {
private readonly runners = new Map<string, RunnerRecord>();
private readonly sessions = new Map<string, SessionRecord>();
private readonly runnerJobs = new Map<string, RunnerJobRecord>();
private readonly queueTasks = new Map<string, QueueTaskRecord>();
private readonly queueReadCursors = new Map<string, QueueReadCursorRecord>();
private queueVersion = 0;
health(): StoreHealth {
return { adapter: "memory-self-test", ready: true, reachable: true, migrationReady: true, migrationId: "memory-self-test", failureKind: null, message: null, credentialValuesPrinted: false };
@@ -247,6 +265,64 @@ export class MemoryAgentRunStore implements AgentRunStore {
return this.sessions.get(sessionId) ?? null;
}
createQueueTask(input: CreateQueueTaskInput): QueueTaskRecord {
const payloadHash = stableHash(input.payload);
if (input.idempotencyKey) {
const existing = Array.from(this.queueTasks.values()).find((task) => task.tenantId === input.tenantId && task.projectId === input.projectId && task.idempotencyKey === input.idempotencyKey);
if (existing) {
if (existing.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "queue task idempotency key reused with different payload", { httpStatus: 409 });
return existing;
}
}
const at = nowIso();
const task: QueueTaskRecord = { ...input, id: newId("qt"), state: "pending", version: this.nextQueueVersion(), payloadHash, latestAttempt: null, sessionPath: null, createdAt: at, updatedAt: at, cancelledAt: null, cancelReason: null };
this.queueTasks.set(task.id, task);
return task;
}
listQueueTasks(input: ListQueueTasksInput): QueueTaskListResult {
const startVersion = parseQueueCursor(input.cursor) ?? input.updatedAfter ?? 0;
const items = Array.from(this.queueTasks.values())
.filter((task) => task.version > startVersion)
.filter((task) => !input.queue || task.queue === input.queue)
.filter((task) => !input.state || task.state === input.state)
.sort(queueTaskSort)
.slice(0, clampQueueLimit(input.limit));
return { items, count: items.length, cursor: items.length > 0 ? String(items[items.length - 1]?.version ?? startVersion) : null };
}
getQueueTask(taskId: string): QueueTaskRecord {
const task = this.queueTasks.get(taskId);
if (!task) throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 });
return task;
}
cancelQueueTask(taskId: string, reason = "cancel requested"): QueueTaskRecord {
const task = this.getQueueTask(taskId);
if (isTerminalQueueTaskState(task.state)) return task;
const at = nowIso();
const next: QueueTaskRecord = { ...task, state: "cancelled", version: this.nextQueueVersion(), updatedAt: at, cancelledAt: at, cancelReason: reason };
this.queueTasks.set(taskId, next);
return next;
}
markQueueTaskRead(taskId: string, readerId: string): QueueReadCursorRecord {
const task = this.getQueueTask(taskId);
const record: QueueReadCursorRecord = { taskId, readerId, taskVersion: task.version, readAt: nowIso() };
this.queueReadCursors.set(queueReadKey(taskId, readerId), record);
return record;
}
queueStats(queue?: string): QueueStats {
return buildQueueStats(Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue), queue ?? null);
}
queueCommander(queue?: string): QueueCommanderSnapshot {
const items = Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue).sort(queueTaskSort).slice(0, 20);
const generatedAt = nowIso();
return { queue: queue ?? null, stats: buildQueueStats(Array.from(this.queueTasks.values()).filter((task) => !queue || task.queue === queue), queue ?? null, generatedAt), items, generatedAt };
}
backends(): JsonRecord[] {
return backendCapabilities();
}
@@ -258,6 +334,11 @@ export class MemoryAgentRunStore implements AgentRunStore {
return next;
}
private nextQueueVersion(): number {
this.queueVersion += 1;
return this.queueVersion;
}
private resolveSessionForRun(input: CreateRunInput, at: string): SessionRef | null {
if (!input.sessionRef) return null;
const existing = this.sessions.get(input.sessionRef.sessionId);
@@ -334,6 +415,10 @@ export function isTerminalCommandState(state: CommandRecord["state"]): boolean {
return state === "completed" || state === "failed" || state === "cancelled";
}
export function isTerminalQueueTaskState(state: QueueTaskState): boolean {
return state === "completed" || state === "failed" || state === "blocked" || state === "cancelled";
}
export function sessionRefFromRecord(record: SessionRecord, fallback: SessionRef): SessionRef {
return {
sessionId: record.sessionId,
@@ -369,3 +454,34 @@ export function summarizeResourceBundleRef(resourceBundleRef: RunRecord["resourc
credentialRef: resourceBundleRef.credentialRef ? { name: resourceBundleRef.credentialRef.name, namespace: resourceBundleRef.credentialRef.namespace ?? null, keys: resourceBundleRef.credentialRef.keys ?? [], valuesPrinted: false } : null,
};
}
export function queueTaskSort(a: QueueTaskRecord, b: QueueTaskRecord): number {
if (a.priority !== b.priority) return b.priority - a.priority;
return a.createdAt.localeCompare(b.createdAt) || a.id.localeCompare(b.id);
}
export function buildQueueStats(tasks: QueueTaskRecord[], queue: string | null, generatedAt = nowIso()): QueueStats {
const byState: Record<string, number> = {};
const byLane: Record<string, number> = {};
let maxVersion = 0;
for (const task of tasks) {
byState[task.state] = (byState[task.state] ?? 0) + 1;
byLane[task.lane] = (byLane[task.lane] ?? 0) + 1;
maxVersion = Math.max(maxVersion, task.version);
}
return { queue, total: tasks.length, byState, byLane, maxVersion, generatedAt };
}
export function clampQueueLimit(limit: number): number {
return Math.max(1, Math.min(Number.isFinite(limit) ? Math.trunc(limit) : 50, 100));
}
export function parseQueueCursor(cursor: string | undefined): number | null {
if (!cursor) return null;
const value = Number(cursor);
return Number.isInteger(value) && value >= 0 ? value : null;
}
function queueReadKey(taskId: string, readerId: string): string {
return `${taskId}:${readerId}`;
}
+3 -1
View File
@@ -13,13 +13,15 @@ const selfTest: SelfTestCase = async () => {
(error) => error instanceof AgentRunError && error.failureKind === "infra-failed" && error.message.includes("DATABASE_URL is required"),
);
const postgresContract = postgresMigrationContract();
assert.equal(postgresContract.latestMigrationId, "003_v01_hwlab_manual_dispatch");
assert.equal(postgresContract.latestMigrationId, "004_v01_queue_q1");
assert.ok(Array.isArray(postgresContract.requiredTables));
assert.ok(postgresContract.requiredTables.includes("agentrun_schema_migrations"));
assert.ok(postgresContract.requiredTables.includes("agentrun_runs"));
assert.ok(postgresContract.requiredTables.includes("agentrun_events"));
assert.ok(postgresContract.requiredTables.includes("agentrun_sessions"));
assert.ok(postgresContract.requiredTables.includes("agentrun_runner_jobs"));
assert.ok(postgresContract.requiredTables.includes("agentrun_queue_tasks"));
assert.ok(postgresContract.requiredTables.includes("agentrun_queue_read_cursors"));
return { name: "redaction-postgres", tests: ["redaction", "postgres-store-contract"] };
};
+65
View File
@@ -0,0 +1,65 @@
import assert from "node:assert/strict";
import { startManagerServer } from "../../mgr/server.js";
import { MemoryAgentRunStore } from "../../mgr/store.js";
import { ManagerClient } from "../../mgr/client.js";
import type { QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord } from "../../common/types.js";
import type { SelfTestCase } from "../harness.js";
const selfTest: SelfTestCase = async (context) => {
const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test", store: new MemoryAgentRunStore() });
try {
const client = new ManagerClient(server.baseUrl);
const input = {
tenantId: "unidesk",
projectId: "pikasTech/unidesk",
queue: "dev",
lane: "main",
title: "Q1 queue task",
priority: 10,
backendProfile: "codex",
providerId: "G14",
workspaceRef: { kind: "host-path", path: context.workspace },
executionPolicy: null,
resourceBundleRef: null,
payload: { prompt: "hello" },
references: [{ kind: "issue", url: "https://github.com/pikasTech/agentrun/issues/39" }],
metadata: { source: "self-test" },
idempotencyKey: "queue-q1-self-test",
};
const created = await client.post("/api/v1/queue/tasks", input) as QueueTaskRecord;
assert.equal(created.state, "pending");
assert.equal(created.sessionPath, null);
assert.equal(created.latestAttempt, null);
const duplicate = await client.post("/api/v1/queue/tasks", input) as QueueTaskRecord;
assert.equal(duplicate.id, created.id);
const listed = await client.get("/api/v1/queue/tasks?queue=dev&limit=10") as QueueTaskListResult;
assert.equal(listed.count, 1);
assert.equal(listed.items[0]?.id, created.id);
const shown = await client.get(`/api/v1/queue/tasks/${created.id}`) as QueueTaskRecord;
assert.equal(shown.title, "Q1 queue task");
assert.equal(shown.sessionPath, null);
const stats = await client.get("/api/v1/queue/stats?queue=dev") as QueueStats;
assert.equal(stats.total, 1);
assert.equal(stats.byState.pending, 1);
const read = await client.post(`/api/v1/queue/tasks/${created.id}/read`, { readerId: "self-test" }) as QueueReadCursorRecord;
assert.equal(read.taskId, created.id);
assert.equal(read.readerId, "self-test");
const cancelled = await client.post(`/api/v1/queue/tasks/${created.id}/cancel`, { reason: "self-test complete" }) as QueueTaskRecord;
assert.equal(cancelled.state, "cancelled");
assert.equal(cancelled.cancelReason, "self-test complete");
const commander = await client.get("/api/v1/queue/commander?queue=dev") as QueueCommanderSnapshot;
assert.equal(commander.stats.byState.cancelled, 1);
assert.equal(commander.items[0]?.id, created.id);
return { name: "queue-q1", tests: ["queue-q1-rest-memory"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}
};
export default selfTest;