From b19143ad854387ffc7c39c760cd1220e15b42525 Mon Sep 17 00:00:00 2001 From: Codex Date: Mon, 1 Jun 2026 22:44:27 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=20Queue=20Q2=20?= =?UTF-8?q?=E5=8F=97=E6=8E=A7=20dispatch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/reference/spec-v01-cli.md | 4 + docs/reference/spec-v01-queue.md | 6 +- scripts/src/cli.ts | 27 +++++ src/common/types.ts | 11 ++ src/mgr/postgres-store.ts | 19 ++- src/mgr/queue-dispatch.ts | 135 +++++++++++++++++++++ src/mgr/server.ts | 20 +++ src/mgr/store.ts | 17 ++- src/selftest/cases/75-queue-q2-dispatch.ts | 101 +++++++++++++++ 9 files changed, 337 insertions(+), 3 deletions(-) create mode 100644 src/mgr/queue-dispatch.ts create mode 100644 src/selftest/cases/75-queue-q2-dispatch.ts diff --git a/docs/reference/spec-v01-cli.md b/docs/reference/spec-v01-cli.md index 6df74fd..800f908 100644 --- a/docs/reference/spec-v01-cli.md +++ b/docs/reference/spec-v01-cli.md @@ -58,6 +58,8 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交 ./scripts/agentrun queue commander [--queue ] ./scripts/agentrun queue read ./scripts/agentrun queue cancel [--reason ] +./scripts/agentrun queue dispatch [--json-file ] +./scripts/agentrun queue refresh ./scripts/agentrun sessions show ./scripts/agentrun sessions output [--cursor ] [--limit ] ./scripts/agentrun sessions trace [--cursor ] [--limit ] @@ -74,6 +76,8 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交 - `server logs` 返回有界日志摘要,并指向完整日志文件或 Kubernetes pod identity。 - `secrets codex render --dry-run` 返回 Codex stdio profile Secret 创建计划、输入文件 bytes/hash、SecretRef、manifest 摘要和 apply 命令形状;`--profile codex` 默认 Secret name 为 `agentrun-v01-provider-codex`,`--profile deepseek` 默认 Secret name 为 `agentrun-v01-provider-deepseek`;它不得输出 Secret value 或执行 Kubernetes 写操作。 - `backends list` 必须显示 `codex` 与 `deepseek` profile 的 backendKind、protocol、transport、command、requiredSecretKeys 和状态;不得因为 `deepseek` 尚未配置 Secret 就隐藏 capability。 +- `queue dispatch` 是 Q2 的受控手动调度入口,只对单个 task 显式创建 attempt 和 Core run/command/runner job;不得伪装成自动 scheduler。 +- `queue refresh` 只根据 Queue task 中保存的 Core run/command 引用回写 Queue attempt 状态,不读取 Core trace 反推 commander 或统计。 - `queue show` 必须返回 task/attempt summary、state、read cursor、stats 相关字段和 `sessionPath`;不得返回或代理完整 output/trace。 - `sessions output` 与 `sessions trace` 是输出和 trace 的唯一 CLI 查询入口;不得新增 `queue output` 或 `queue trace` 兼容命令。 diff --git a/docs/reference/spec-v01-queue.md b/docs/reference/spec-v01-queue.md index 6c32b2d..dbad9b5 100644 --- a/docs/reference/spec-v01-queue.md +++ b/docs/reference/spec-v01-queue.md @@ -34,6 +34,8 @@ GET /api/v1/queue/tasks?queue=&state=&cursor=&limit= GET /api/v1/queue/commander?queue= ``` @@ -80,6 +82,8 @@ AgentRun CLI 必须提供 Queue 和 Session 两组命令。Queue 命令只操作 ./scripts/agentrun queue commander [--queue ] ./scripts/agentrun queue read ./scripts/agentrun queue cancel [--reason ] +./scripts/agentrun queue dispatch [--json-file ] +./scripts/agentrun queue refresh ``` Session 命令负责输出、trace 和会话控制: @@ -135,7 +139,7 @@ Queue 首版新增或扩展的稳定表方向: | --- | --- | --- | | Q0 | 固化 Queue SPEC 和 schema migration 计划 | 文档与旧 Code Queue 兼容口径不冲突。 | | Q1 | Queue RESTful API 和 CLI 骨架 | submit/list/show/stats/read/cancel 全部短返回 JSON,不触发真实执行也能保存 task。 | -| Q2 | attempt 到 Core run/command/runner job 的真实闭环 | Queue submit 产生 attempt,Scheduler 或受控 dispatch 创建真实 runner job。 | +| Q2 | attempt 到 Core run/command/runner job 的真实闭环 | Queue submit 后通过受控 `queue dispatch` 产生 attempt,并创建真实 Core run/command/runner job;`queue refresh` 从 Core run/command 终态回写 Queue attempt;自动 Scheduler 仍 deferred。 | | Q3 | Session 引用边界 | `queue show` 返回 `sessionPath`,输出和 trace 只能通过 `sessions ...` 查询。 | | Q4 | summary/stats/read/commander | Queue overview、stats、read cursor、commander 全部直接查 Queue 模型。 | | Q5 | retry/judge/cancel | retry/backoff、judge、cancel 与 attempt/session/core 状态一致。 | diff --git a/scripts/src/cli.ts b/scripts/src/cli.ts index c89d672..debe7e8 100644 --- a/scripts/src/cli.ts +++ b/scripts/src/cli.ts @@ -40,6 +40,8 @@ async function dispatch(args: ParsedArgs): Promise { if (group === "queue" && command === "commander") return client(args).get(`/api/v1/queue/commander${queueQuery(args)}`); if (group === "queue" && command === "read" && id) return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(id)}/read`, { readerId: optionalFlag(args, "reader-id") ?? "cli" }); if (group === "queue" && command === "cancel" && id) return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(id)}/cancel`, cancelBody(args)); + if (group === "queue" && command === "dispatch" && id) return dispatchQueueTask(args, id); + if (group === "queue" && command === "refresh" && id) return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(id)}/refresh`, {}); if (group === "runs" && command === "create") return client(args).post("/api/v1/runs", await jsonFile(args)); if (group === "runs" && command === "show" && id) return client(args).get(`/api/v1/runs/${encodeURIComponent(id)}`); if (group === "runs" && command === "events" && id) return client(args).get(`/api/v1/runs/${encodeURIComponent(id)}/events?afterSeq=${flag(args, "after-seq", "0")}&limit=${flag(args, "limit", "100")}`); @@ -132,6 +134,23 @@ function queueQuery(args: ParsedArgs): string { return queue ? `?queue=${encodeURIComponent(queue)}` : ""; } +async function dispatchQueueTask(args: ParsedArgs, taskId: string): Promise { + const body = await optionalJsonFile(args); + const copy = (flagName: string, key = flagName.replace(/-([a-z])/gu, (_, letter: string) => letter.toUpperCase())): void => { + const value = optionalFlag(args, flagName); + if (value) body[key] = value; + }; + copy("idempotency-key", "idempotencyKey"); + copy("image"); + copy("namespace"); + copy("attempt-id", "attemptId"); + copy("runner-id", "runnerId"); + copy("source-commit", "sourceCommit"); + copy("runner-manager-url", "managerUrl"); + copy("service-account-name", "serviceAccountName"); + return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(taskId)}/dispatch`, body); +} + async function showRunnerJobStatus(args: ParsedArgs): Promise { const runId = flag(args, "run-id", ""); if (!runId) throw new AgentRunError("schema-invalid", "runner job-status requires --run-id", { httpStatus: 2 }); @@ -228,6 +247,12 @@ async function jsonFile(args: ParsedArgs): Promise { throw new AgentRunError("schema-invalid", "json file must contain an object", { httpStatus: 2 }); } +async function optionalJsonFile(args: ParsedArgs): Promise { + const file = optionalFlag(args, "json-file"); + if (!file) return {}; + return jsonFile(args); +} + function parseArgs(argv: string[]): ParsedArgs { const positional: string[] = []; const flags = new Map(); @@ -287,6 +312,8 @@ function help(): JsonRecord { "queue commander [--queue ]", "queue read [--reader-id ]", "queue cancel [--reason ]", + "queue dispatch [--json-file ] [--idempotency-key ] [--image ] [--namespace ]", + "queue refresh ", "secrets codex render --dry-run [--profile codex|deepseek] [--codex-home ] [--namespace agentrun-v01] [--secret-name ]", "backends list", "server start|status", diff --git a/src/common/types.ts b/src/common/types.ts index c25f2ab..a747e70 100644 --- a/src/common/types.ts +++ b/src/common/types.ts @@ -239,6 +239,17 @@ export interface QueueCommanderSnapshot extends JsonRecord { generatedAt: string; } +export interface QueueDispatchResult extends JsonRecord { + action: "queue-dispatch"; + mutation: true; + task: QueueTaskRecord; + run: RunRecord; + command: CommandRecord; + runnerJob: JsonRecord; + latestAttempt: QueueAttemptRef; + pollCommands: JsonRecord; +} + export interface BackendEvent { type: EventType; payload: JsonRecord; diff --git a/src/mgr/postgres-store.ts b/src/mgr/postgres-store.ts index 03e6dc9..057d877 100644 --- a/src/mgr/postgres-store.ts +++ b/src/mgr/postgres-store.ts @@ -5,7 +5,7 @@ import { AgentRunError } from "../common/errors.js"; import { redactJson } from "../common/redaction.js"; import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js"; import { newId, nowIso, stableHash } from "../common/validation.js"; -import type { AgentRunStore, ListQueueTasksInput, SaveRunnerJobInput, StoreHealth } from "./store.js"; +import type { AgentRunStore, ListQueueTasksInput, SaveRunnerJobInput, StoreHealth, UpdateQueueTaskAttemptInput } from "./store.js"; import { assertSessionBoundary, buildQueueStats, clampQueueLimit, commandStateFromTerminal, isLeaseExpired, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, queueTaskSort, sessionRefFromRecord, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js"; import { backendCapabilitiesSqlValues, mergeBackendCapability } from "../common/backend-profiles.js"; import { normalizeRunEventPayload, requireEventType } from "../common/events.js"; @@ -606,6 +606,23 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( return queueTaskFromRow(row); } + async updateQueueTaskAttempt(taskId: string, input: UpdateQueueTaskAttemptInput): Promise { + return this.withTransaction(async (client) => { + const existing = await client.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1 FOR UPDATE", [taskId]); + const row = existing.rows[0]; + if (!row) throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 }); + const task = queueTaskFromRow(row); + if (isTerminalQueueTaskState(task.state)) throw new AgentRunError(task.state === "cancelled" ? "cancelled" : "schema-invalid", `queue task ${taskId} is already terminal: ${task.state}`, { httpStatus: 409 }); + const version = await this.nextQueueVersion(client); + const at = nowIso(); + const updated = await client.query( + "UPDATE agentrun_queue_tasks SET state = $2, latest_attempt = $3::jsonb, session_path = $4, version = $5, updated_at = $6 WHERE id = $1 RETURNING *", + [taskId, input.state, JSON.stringify(input.latestAttempt), input.sessionPath, version, at], + ); + return queueTaskFromRow(updated.rows[0]); + }); + } + async cancelQueueTask(taskId: string, reason = "cancel requested"): Promise { return this.withTransaction(async (client) => { const existing = await client.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1 FOR UPDATE", [taskId]); diff --git a/src/mgr/queue-dispatch.ts b/src/mgr/queue-dispatch.ts new file mode 100644 index 0000000..1c19b96 --- /dev/null +++ b/src/mgr/queue-dispatch.ts @@ -0,0 +1,135 @@ +import { AgentRunError } from "../common/errors.js"; +import type { CreateRunInput, JsonRecord, JsonValue, QueueAttemptRef, QueueDispatchResult, QueueTaskRecord, QueueTaskState, RunRecord } from "../common/types.js"; +import { asRecord } from "../common/validation.js"; +import type { AgentRunStore } from "./store.js"; +import { isTerminalQueueTaskState } from "./store.js"; +import { createKubernetesRunnerJob, type CreateRunnerJobInput, type RunnerJobDefaults } from "./kubernetes-runner-job.js"; + +export interface DispatchQueueTaskOptions { + store: AgentRunStore; + taskId: string; + input: JsonRecord; + defaults: RunnerJobDefaults; +} + +export async function dispatchQueueTask(options: DispatchQueueTaskOptions): Promise { + const task = await options.store.getQueueTask(options.taskId); + assertDispatchable(task); + + const run = await options.store.createRun(buildRunInput(task, options.input)); + const command = await options.store.createCommand(run.id, { type: "turn", payload: task.payload, idempotencyKey: `queue:${task.id}:command` }); + const runnerJob = await createKubernetesRunnerJob({ + store: options.store, + runId: run.id, + input: buildRunnerJobInput(task, command.id, options.input), + defaults: options.defaults, + }); + + const attemptId = stringFrom(runnerJob, "attemptId"); + const runnerJobs = await options.store.listRunnerJobs(run.id, command.id); + const runnerJobRecord = runnerJobs.find((item) => item.attemptId === attemptId) ?? runnerJobs.at(-1) ?? null; + const sessionId = run.sessionRef?.sessionId ?? null; + const sessionPath = sessionId ? `/api/v1/sessions/${encodeURIComponent(sessionId)}` : null; + const latestAttempt: QueueAttemptRef = { + attemptId, + state: "running", + runId: run.id, + commandId: command.id, + runnerJobId: runnerJobRecord?.id ?? null, + sessionId, + sessionPath, + }; + const updatedTask = await options.store.updateQueueTaskAttempt(task.id, { state: "running", latestAttempt, sessionPath }); + await options.store.appendEvent(run.id, "backend_status", { phase: "queue-dispatched", taskId: task.id, queue: task.queue, lane: task.lane, attemptId, runnerJobId: runnerJobRecord?.id ?? null, sessionPath }); + + return { + action: "queue-dispatch", + mutation: true, + task: updatedTask, + run, + command, + runnerJob, + latestAttempt, + pollCommands: { + queue: `./scripts/agentrun queue show ${task.id}`, + run: `./scripts/agentrun runs show ${run.id}`, + command: `./scripts/agentrun commands show ${command.id} --run-id ${run.id}`, + events: `./scripts/agentrun runs events ${run.id} --after-seq 0 --limit 100`, + }, + }; +} + +export async function refreshQueueTaskFromCore(store: AgentRunStore, taskId: string): Promise { + const task = await store.getQueueTask(taskId); + if (isTerminalQueueTaskState(task.state)) return task; + const latestAttempt = task.latestAttempt; + if (!latestAttempt?.runId || !latestAttempt.commandId) throw new AgentRunError("schema-invalid", `queue task ${taskId} has no Core attempt to refresh`, { httpStatus: 409 }); + const [run, command] = await Promise.all([store.getRun(latestAttempt.runId), store.getCommand(latestAttempt.commandId)]); + if (command.runId !== run.id) throw new AgentRunError("schema-invalid", `queue task ${taskId} attempt references mismatched run and command`, { httpStatus: 409 }); + const state = queueStateFromRun(run); + const sessionId = run.sessionRef?.sessionId ?? latestAttempt.sessionId ?? null; + const sessionPath = sessionId ? `/api/v1/sessions/${encodeURIComponent(sessionId)}` : latestAttempt.sessionPath ?? null; + const nextAttempt: QueueAttemptRef = { ...latestAttempt, state, sessionId, sessionPath }; + return store.updateQueueTaskAttempt(task.id, { state, latestAttempt: nextAttempt, sessionPath }); +} + +function assertDispatchable(task: QueueTaskRecord): void { + if (isTerminalQueueTaskState(task.state)) throw new AgentRunError(task.state === "cancelled" ? "cancelled" : "schema-invalid", `queue task ${task.id} is already terminal: ${task.state}`, { httpStatus: 409 }); + if (task.state !== "pending") throw new AgentRunError("schema-invalid", `queue task ${task.id} is not pending: ${task.state}`, { httpStatus: 409 }); + if (!task.workspaceRef) throw new AgentRunError("schema-invalid", "queue dispatch requires workspaceRef", { httpStatus: 400 }); + if (!task.providerId) throw new AgentRunError("schema-invalid", "queue dispatch requires providerId", { httpStatus: 400 }); + if (!task.executionPolicy) throw new AgentRunError("schema-invalid", "queue dispatch requires executionPolicy", { httpStatus: 400 }); +} + +function queueStateFromRun(run: RunRecord): QueueTaskState { + if (run.status === "completed") return "completed"; + if (run.status === "failed") return "failed"; + if (run.status === "blocked") return "blocked"; + if (run.status === "cancelled") return "cancelled"; + return "running"; +} + +function buildRunInput(task: QueueTaskRecord, input: JsonRecord): CreateRunInput { + if (!task.workspaceRef || !task.providerId || !task.executionPolicy) throw new AgentRunError("schema-invalid", "queue dispatch task is missing runtime fields", { httpStatus: 400 }); + const traceSink = input.traceSink === undefined ? { kind: "queue", taskId: task.id, queue: task.queue, lane: task.lane } : input.traceSink; + return { + tenantId: task.tenantId, + projectId: task.projectId, + workspaceRef: task.workspaceRef, + resourceBundleRef: task.resourceBundleRef, + providerId: task.providerId, + backendProfile: task.backendProfile, + executionPolicy: task.executionPolicy, + traceSink: traceSink as JsonValue, + }; +} + +function buildRunnerJobInput(task: QueueTaskRecord, commandId: string, input: JsonRecord): CreateRunnerJobInput { + const jobInput: CreateRunnerJobInput = { commandId, idempotencyKey: optionalString(input.idempotencyKey) ?? `queue:${task.id}:runner-job` }; + const copyString = (inputKey: string, outputKey: keyof CreateRunnerJobInput): void => { + const value = optionalString(input[inputKey]); + if (value) jobInput[outputKey] = value as never; + }; + copyString("managerUrl", "managerUrl"); + copyString("image", "image"); + copyString("namespace", "namespace"); + copyString("attemptId", "attemptId"); + copyString("runnerId", "runnerId"); + copyString("sourceCommit", "sourceCommit"); + copyString("serviceAccountName", "serviceAccountName"); + if (input.transientEnv !== undefined) { + if (!Array.isArray(input.transientEnv)) throw new AgentRunError("schema-invalid", "transientEnv must be an array", { httpStatus: 400 }); + jobInput.transientEnv = input.transientEnv.map((item, index) => asRecord(item, `transientEnv[${index}]`)); + } + return jobInput; +} + +function stringFrom(record: JsonRecord, key: string): string { + const value = record[key]; + if (typeof value !== "string" || value.length === 0) throw new AgentRunError("infra-failed", `runner job response missing ${key}`, { httpStatus: 502 }); + return value; +} + +function optionalString(value: unknown): string | undefined { + return typeof value === "string" && value.trim().length > 0 ? value.trim() : undefined; +} diff --git a/src/mgr/server.ts b/src/mgr/server.ts index 80316c0..dcdb4c8 100644 --- a/src/mgr/server.ts +++ b/src/mgr/server.ts @@ -7,6 +7,7 @@ import { AgentRunError, errorToJson } from "../common/errors.js"; import { asRecord, validateCreateCommand, validateCreateQueueTask, validateCreateRun, validateQueueTaskState } from "../common/validation.js"; import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, RunEvent } from "../common/types.js"; import { createKubernetesRunnerJob } from "./kubernetes-runner-job.js"; +import { dispatchQueueTask, refreshQueueTaskFromCore } from "./queue-dispatch.js"; import { buildRunResult } from "./result.js"; import { runnerJobStatusSummary } from "./runner-job-status.js"; @@ -81,6 +82,25 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults } const queueTaskMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)$/u); if (method === "GET" && queueTaskMatch) return await store.getQueueTask(queueTaskMatch[1] ?? "") as unknown as JsonValue; + const queueTaskDispatchMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/dispatch$/u); + if (method === "POST" && queueTaskDispatchMatch) { + const namespace = runnerJobDefaults?.namespace ?? process.env.AGENTRUN_RUNTIME_NAMESPACE ?? "agentrun-v01"; + return await dispatchQueueTask({ + store, + taskId: queueTaskDispatchMatch[1] ?? "", + input: asRecord(body ?? {}, "queueDispatch"), + defaults: { + namespace, + managerUrl: runnerJobDefaults?.managerUrl ?? process.env.AGENTRUN_INTERNAL_MGR_URL ?? `http://agentrun-mgr.${namespace}.svc.cluster.local:8080`, + image: runnerJobDefaults?.image ?? process.env.AGENTRUN_RUNNER_IMAGE ?? "", + sourceCommit, + serviceAccountName: runnerJobDefaults?.serviceAccountName ?? process.env.AGENTRUN_RUNNER_SERVICE_ACCOUNT ?? "agentrun-v01-runner", + ...(runnerJobDefaults?.kubectlCommand ? { kubectlCommand: runnerJobDefaults.kubectlCommand } : {}), + }, + }) as unknown as JsonValue; + } + const queueTaskRefreshMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/refresh$/u); + if (method === "POST" && queueTaskRefreshMatch) return await refreshQueueTaskFromCore(store, queueTaskRefreshMatch[1] ?? "") as unknown as JsonValue; const queueTaskCancelMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/cancel$/u); if (method === "POST" && queueTaskCancelMatch) { const record = body === null ? {} : asRecord(body, "cancel"); diff --git a/src/mgr/store.ts b/src/mgr/store.ts index d7349d8..f9f8425 100644 --- a/src/mgr/store.ts +++ b/src/mgr/store.ts @@ -1,4 +1,4 @@ -import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js"; +import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, QueueAttemptRef, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js"; import { AgentRunError } from "../common/errors.js"; import { newId, nowIso, stableHash } from "../common/validation.js"; import { redactJson } from "../common/redaction.js"; @@ -42,6 +42,7 @@ export interface AgentRunStore { createQueueTask(input: CreateQueueTaskInput): MaybePromise; listQueueTasks(input: ListQueueTasksInput): MaybePromise; getQueueTask(taskId: string): MaybePromise; + updateQueueTaskAttempt(taskId: string, input: UpdateQueueTaskAttemptInput): MaybePromise; cancelQueueTask(taskId: string, reason?: string): MaybePromise; markQueueTaskRead(taskId: string, readerId: string): MaybePromise; queueStats(queue?: string): MaybePromise; @@ -58,6 +59,12 @@ export interface ListQueueTasksInput { updatedAfter?: number; } +export interface UpdateQueueTaskAttemptInput { + state: QueueTaskState; + latestAttempt: QueueAttemptRef; + sessionPath: string | null; +} + export interface SaveRunnerJobInput { runId: string; commandId: string; @@ -299,6 +306,14 @@ export class MemoryAgentRunStore implements AgentRunStore { return task; } + updateQueueTaskAttempt(taskId: string, input: UpdateQueueTaskAttemptInput): QueueTaskRecord { + const task = this.getQueueTask(taskId); + if (isTerminalQueueTaskState(task.state)) throw new AgentRunError(task.state === "cancelled" ? "cancelled" : "schema-invalid", `queue task ${taskId} is already terminal: ${task.state}`, { httpStatus: 409 }); + const next: QueueTaskRecord = { ...task, state: input.state, latestAttempt: input.latestAttempt, sessionPath: input.sessionPath, version: this.nextQueueVersion(), updatedAt: nowIso() }; + this.queueTasks.set(taskId, next); + return next; + } + cancelQueueTask(taskId: string, reason = "cancel requested"): QueueTaskRecord { const task = this.getQueueTask(taskId); if (isTerminalQueueTaskState(task.state)) return task; diff --git a/src/selftest/cases/75-queue-q2-dispatch.ts b/src/selftest/cases/75-queue-q2-dispatch.ts new file mode 100644 index 0000000..00d3962 --- /dev/null +++ b/src/selftest/cases/75-queue-q2-dispatch.ts @@ -0,0 +1,101 @@ +import assert from "node:assert/strict"; +import { chmod, readFile, writeFile } from "node:fs/promises"; +import path from "node:path"; +import { startManagerServer } from "../../mgr/server.js"; +import { MemoryAgentRunStore } from "../../mgr/store.js"; +import { ManagerClient } from "../../mgr/client.js"; +import type { JsonRecord, QueueDispatchResult, QueueTaskRecord } from "../../common/types.js"; +import { assertNoSecretLeak, type SelfTestCase } from "../harness.js"; + +const selfTest: SelfTestCase = async (context) => { + const fakeKubectl = path.join(context.tmp, "fake-kubectl-queue-q2.js"); + const createdManifest = path.join(context.tmp, "created-queue-q2-runner-job.json"); + await writeFile(fakeKubectl, `#!/usr/bin/env bun +const chunks = []; +for await (const chunk of Bun.stdin.stream()) chunks.push(chunk); +const text = Buffer.concat(chunks.map((chunk) => Buffer.from(chunk))).toString("utf8"); +await Bun.write(${JSON.stringify(createdManifest)}, text); +const manifest = JSON.parse(text); +console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kind, metadata: { uid: "job-uid-queue-q2", resourceVersion: "1", name: manifest.metadata.name, namespace: manifest.metadata.namespace } })); +`); + await chmod(fakeKubectl, 0o755); + const store = new MemoryAgentRunStore(); + const server = await startManagerServer({ + port: 0, + host: "127.0.0.1", + sourceCommit: "self-test", + store, + runnerJobDefaults: { + namespace: "agentrun-v01", + managerUrl: "http://agentrun-mgr.agentrun-v01.svc.cluster.local:8080", + image: "127.0.0.1:5000/agentrun/agentrun-mgr@sha256:1111111111111111111111111111111111111111111111111111111111111111", + kubectlCommand: fakeKubectl, + }, + }); + try { + const client = new ManagerClient(server.baseUrl); + const created = await client.post("/api/v1/queue/tasks", { + tenantId: "unidesk", + projectId: "pikasTech/unidesk", + queue: "dev", + lane: "q2", + title: "Q2 queue dispatch task", + priority: 20, + backendProfile: "codex", + providerId: "G14", + workspaceRef: { kind: "host-path", path: context.workspace }, + executionPolicy: { + sandbox: "workspace-write", + approval: "never", + timeoutMs: 15_000, + network: "default", + secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: context.codexHome } }] }, + }, + resourceBundleRef: null, + payload: { prompt: "queue dispatch hello" }, + references: [{ kind: "issue", url: "https://github.com/pikasTech/agentrun/issues/39" }], + metadata: { source: "queue-q2-self-test" }, + idempotencyKey: "queue-q2-dispatch-self-test", + }) as QueueTaskRecord; + const dispatched = await client.post(`/api/v1/queue/tasks/${created.id}/dispatch`, { attemptId: "attempt_queue_q2_selftest" }) as QueueDispatchResult; + assert.equal(dispatched.action, "queue-dispatch"); + assert.equal(dispatched.mutation, true); + assert.equal(dispatched.latestAttempt.attemptId, "attempt_queue_q2_selftest"); + assert.equal(dispatched.latestAttempt.runId, dispatched.run.id); + assert.equal(dispatched.latestAttempt.commandId, dispatched.command.id); + assert.ok(dispatched.latestAttempt.runnerJobId); + assert.equal(dispatched.task.state, "running"); + assert.equal(dispatched.task.latestAttempt?.attemptId, "attempt_queue_q2_selftest"); + assert.equal(dispatched.task.sessionPath, null); + + const shown = await client.get(`/api/v1/queue/tasks/${created.id}`) as QueueTaskRecord; + assert.equal(shown.state, "running"); + assert.equal(shown.latestAttempt?.runId, dispatched.run.id); + assert.equal(shown.latestAttempt?.commandId, dispatched.command.id); + assert.equal(shown.latestAttempt?.runnerJobId, dispatched.latestAttempt.runnerJobId); + + const jobs = await client.get(`/api/v1/runs/${dispatched.run.id}/runner-jobs?commandId=${dispatched.command.id}`) as { items?: JsonRecord[]; count?: number }; + assert.equal(jobs.count, 1); + assert.equal(jobs.items?.[0]?.attemptId, "attempt_queue_q2_selftest"); + const events = await client.get(`/api/v1/runs/${dispatched.run.id}/events?afterSeq=0&limit=100`) as { items?: JsonRecord[] }; + assert.ok(events.items?.some((item) => ((item.payload as JsonRecord).phase) === "queue-dispatched")); + await assert.rejects( + () => client.post(`/api/v1/queue/tasks/${created.id}/dispatch`, { attemptId: "attempt_queue_q2_duplicate" }), + (error) => error instanceof Error && error.message.includes("not pending"), + ); + await client.patch(`/api/v1/commands/${dispatched.command.id}/status`, { terminalStatus: "completed", failureKind: null, failureMessage: null }); + await client.patch(`/api/v1/runs/${dispatched.run.id}/status`, { terminalStatus: "completed", failureKind: null, failureMessage: null }); + const refreshed = await client.post(`/api/v1/queue/tasks/${created.id}/refresh`, {}) as QueueTaskRecord; + assert.equal(refreshed.state, "completed"); + assert.equal(refreshed.latestAttempt?.state, "completed"); + assert.equal(refreshed.latestAttempt?.runId, dispatched.run.id); + const manifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord; + assert.ok(JSON.stringify(manifest).includes(dispatched.run.id)); + assertNoSecretLeak(dispatched); + return { name: "queue-q2-dispatch", tests: ["queue-dispatch-run-command-runner-job", "queue-refresh-from-core-status", "queue-dispatch-no-repeat"] }; + } finally { + await new Promise((resolve) => server.server.close(() => resolve())); + } +}; + +export default selfTest;