feat: 实现 Queue Q2 受控 dispatch
This commit is contained in:
@@ -58,6 +58,8 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交
|
||||
./scripts/agentrun queue commander [--queue <queue>]
|
||||
./scripts/agentrun queue read <taskId>
|
||||
./scripts/agentrun queue cancel <taskId> [--reason <text>]
|
||||
./scripts/agentrun queue dispatch <taskId> [--json-file <dispatch.json>]
|
||||
./scripts/agentrun queue refresh <taskId>
|
||||
./scripts/agentrun sessions show <sessionId|sessionPath>
|
||||
./scripts/agentrun sessions output <sessionId|sessionPath> [--cursor <cursor>] [--limit <limit>]
|
||||
./scripts/agentrun sessions trace <sessionId|sessionPath> [--cursor <cursor>] [--limit <limit>]
|
||||
@@ -74,6 +76,8 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交
|
||||
- `server logs` 返回有界日志摘要,并指向完整日志文件或 Kubernetes pod identity。
|
||||
- `secrets codex render --dry-run` 返回 Codex stdio profile Secret 创建计划、输入文件 bytes/hash、SecretRef、manifest 摘要和 apply 命令形状;`--profile codex` 默认 Secret name 为 `agentrun-v01-provider-codex`,`--profile deepseek` 默认 Secret name 为 `agentrun-v01-provider-deepseek`;它不得输出 Secret value 或执行 Kubernetes 写操作。
|
||||
- `backends list` 必须显示 `codex` 与 `deepseek` profile 的 backendKind、protocol、transport、command、requiredSecretKeys 和状态;不得因为 `deepseek` 尚未配置 Secret 就隐藏 capability。
|
||||
- `queue dispatch` 是 Q2 的受控手动调度入口,只对单个 task 显式创建 attempt 和 Core run/command/runner job;不得伪装成自动 scheduler。
|
||||
- `queue refresh` 只根据 Queue task 中保存的 Core run/command 引用回写 Queue attempt 状态,不读取 Core trace 反推 commander 或统计。
|
||||
- `queue show` 必须返回 task/attempt summary、state、read cursor、stats 相关字段和 `sessionPath`;不得返回或代理完整 output/trace。
|
||||
- `sessions output` 与 `sessions trace` 是输出和 trace 的唯一 CLI 查询入口;不得新增 `queue output` 或 `queue trace` 兼容命令。
|
||||
|
||||
|
||||
@@ -34,6 +34,8 @@ GET /api/v1/queue/tasks?queue=<queue>&state=<state>&cursor=<cursor>&limit=<limi
|
||||
GET /api/v1/queue/tasks/:taskId
|
||||
POST /api/v1/queue/tasks/:taskId/cancel
|
||||
POST /api/v1/queue/tasks/:taskId/read
|
||||
POST /api/v1/queue/tasks/:taskId/dispatch
|
||||
POST /api/v1/queue/tasks/:taskId/refresh
|
||||
GET /api/v1/queue/stats?queue=<queue>
|
||||
GET /api/v1/queue/commander?queue=<queue>
|
||||
```
|
||||
@@ -80,6 +82,8 @@ AgentRun CLI 必须提供 Queue 和 Session 两组命令。Queue 命令只操作
|
||||
./scripts/agentrun queue commander [--queue <queue>]
|
||||
./scripts/agentrun queue read <taskId>
|
||||
./scripts/agentrun queue cancel <taskId> [--reason <text>]
|
||||
./scripts/agentrun queue dispatch <taskId> [--json-file <dispatch.json>]
|
||||
./scripts/agentrun queue refresh <taskId>
|
||||
```
|
||||
|
||||
Session 命令负责输出、trace 和会话控制:
|
||||
@@ -135,7 +139,7 @@ Queue 首版新增或扩展的稳定表方向:
|
||||
| --- | --- | --- |
|
||||
| Q0 | 固化 Queue SPEC 和 schema migration 计划 | 文档与旧 Code Queue 兼容口径不冲突。 |
|
||||
| Q1 | Queue RESTful API 和 CLI 骨架 | submit/list/show/stats/read/cancel 全部短返回 JSON,不触发真实执行也能保存 task。 |
|
||||
| Q2 | attempt 到 Core run/command/runner job 的真实闭环 | Queue submit 产生 attempt,Scheduler 或受控 dispatch 创建真实 runner job。 |
|
||||
| Q2 | attempt 到 Core run/command/runner job 的真实闭环 | Queue submit 后通过受控 `queue dispatch` 产生 attempt,并创建真实 Core run/command/runner job;`queue refresh` 从 Core run/command 终态回写 Queue attempt;自动 Scheduler 仍 deferred。 |
|
||||
| Q3 | Session 引用边界 | `queue show` 返回 `sessionPath`,输出和 trace 只能通过 `sessions ...` 查询。 |
|
||||
| Q4 | summary/stats/read/commander | Queue overview、stats、read cursor、commander 全部直接查 Queue 模型。 |
|
||||
| Q5 | retry/judge/cancel | retry/backoff、judge、cancel 与 attempt/session/core 状态一致。 |
|
||||
|
||||
@@ -40,6 +40,8 @@ async function dispatch(args: ParsedArgs): Promise<JsonValue> {
|
||||
if (group === "queue" && command === "commander") return client(args).get(`/api/v1/queue/commander${queueQuery(args)}`);
|
||||
if (group === "queue" && command === "read" && id) return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(id)}/read`, { readerId: optionalFlag(args, "reader-id") ?? "cli" });
|
||||
if (group === "queue" && command === "cancel" && id) return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(id)}/cancel`, cancelBody(args));
|
||||
if (group === "queue" && command === "dispatch" && id) return dispatchQueueTask(args, id);
|
||||
if (group === "queue" && command === "refresh" && id) return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(id)}/refresh`, {});
|
||||
if (group === "runs" && command === "create") return client(args).post("/api/v1/runs", await jsonFile(args));
|
||||
if (group === "runs" && command === "show" && id) return client(args).get(`/api/v1/runs/${encodeURIComponent(id)}`);
|
||||
if (group === "runs" && command === "events" && id) return client(args).get(`/api/v1/runs/${encodeURIComponent(id)}/events?afterSeq=${flag(args, "after-seq", "0")}&limit=${flag(args, "limit", "100")}`);
|
||||
@@ -132,6 +134,23 @@ function queueQuery(args: ParsedArgs): string {
|
||||
return queue ? `?queue=${encodeURIComponent(queue)}` : "";
|
||||
}
|
||||
|
||||
async function dispatchQueueTask(args: ParsedArgs, taskId: string): Promise<JsonValue> {
|
||||
const body = await optionalJsonFile(args);
|
||||
const copy = (flagName: string, key = flagName.replace(/-([a-z])/gu, (_, letter: string) => letter.toUpperCase())): void => {
|
||||
const value = optionalFlag(args, flagName);
|
||||
if (value) body[key] = value;
|
||||
};
|
||||
copy("idempotency-key", "idempotencyKey");
|
||||
copy("image");
|
||||
copy("namespace");
|
||||
copy("attempt-id", "attemptId");
|
||||
copy("runner-id", "runnerId");
|
||||
copy("source-commit", "sourceCommit");
|
||||
copy("runner-manager-url", "managerUrl");
|
||||
copy("service-account-name", "serviceAccountName");
|
||||
return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(taskId)}/dispatch`, body);
|
||||
}
|
||||
|
||||
async function showRunnerJobStatus(args: ParsedArgs): Promise<JsonValue> {
|
||||
const runId = flag(args, "run-id", "");
|
||||
if (!runId) throw new AgentRunError("schema-invalid", "runner job-status requires --run-id", { httpStatus: 2 });
|
||||
@@ -228,6 +247,12 @@ async function jsonFile(args: ParsedArgs): Promise<JsonRecord> {
|
||||
throw new AgentRunError("schema-invalid", "json file must contain an object", { httpStatus: 2 });
|
||||
}
|
||||
|
||||
async function optionalJsonFile(args: ParsedArgs): Promise<JsonRecord> {
|
||||
const file = optionalFlag(args, "json-file");
|
||||
if (!file) return {};
|
||||
return jsonFile(args);
|
||||
}
|
||||
|
||||
function parseArgs(argv: string[]): ParsedArgs {
|
||||
const positional: string[] = [];
|
||||
const flags = new Map<string, string | boolean>();
|
||||
@@ -287,6 +312,8 @@ function help(): JsonRecord {
|
||||
"queue commander [--queue <queue>]",
|
||||
"queue read <taskId> [--reader-id <reader>]",
|
||||
"queue cancel <taskId> [--reason <text>]",
|
||||
"queue dispatch <taskId> [--json-file <dispatch.json>] [--idempotency-key <key>] [--image <image>] [--namespace <namespace>]",
|
||||
"queue refresh <taskId>",
|
||||
"secrets codex render --dry-run [--profile codex|deepseek] [--codex-home <dir>] [--namespace agentrun-v01] [--secret-name <name>]",
|
||||
"backends list",
|
||||
"server start|status",
|
||||
|
||||
@@ -239,6 +239,17 @@ export interface QueueCommanderSnapshot extends JsonRecord {
|
||||
generatedAt: string;
|
||||
}
|
||||
|
||||
export interface QueueDispatchResult extends JsonRecord {
|
||||
action: "queue-dispatch";
|
||||
mutation: true;
|
||||
task: QueueTaskRecord;
|
||||
run: RunRecord;
|
||||
command: CommandRecord;
|
||||
runnerJob: JsonRecord;
|
||||
latestAttempt: QueueAttemptRef;
|
||||
pollCommands: JsonRecord;
|
||||
}
|
||||
|
||||
export interface BackendEvent {
|
||||
type: EventType;
|
||||
payload: JsonRecord;
|
||||
|
||||
@@ -5,7 +5,7 @@ import { AgentRunError } from "../common/errors.js";
|
||||
import { redactJson } from "../common/redaction.js";
|
||||
import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js";
|
||||
import { newId, nowIso, stableHash } from "../common/validation.js";
|
||||
import type { AgentRunStore, ListQueueTasksInput, SaveRunnerJobInput, StoreHealth } from "./store.js";
|
||||
import type { AgentRunStore, ListQueueTasksInput, SaveRunnerJobInput, StoreHealth, UpdateQueueTaskAttemptInput } from "./store.js";
|
||||
import { assertSessionBoundary, buildQueueStats, clampQueueLimit, commandStateFromTerminal, isLeaseExpired, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, queueTaskSort, sessionRefFromRecord, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js";
|
||||
import { backendCapabilitiesSqlValues, mergeBackendCapability } from "../common/backend-profiles.js";
|
||||
import { normalizeRunEventPayload, requireEventType } from "../common/events.js";
|
||||
@@ -606,6 +606,23 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
||||
return queueTaskFromRow(row);
|
||||
}
|
||||
|
||||
async updateQueueTaskAttempt(taskId: string, input: UpdateQueueTaskAttemptInput): Promise<QueueTaskRecord> {
|
||||
return this.withTransaction(async (client) => {
|
||||
const existing = await client.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1 FOR UPDATE", [taskId]);
|
||||
const row = existing.rows[0];
|
||||
if (!row) throw new AgentRunError("schema-invalid", `queue task ${taskId} was not found`, { httpStatus: 404 });
|
||||
const task = queueTaskFromRow(row);
|
||||
if (isTerminalQueueTaskState(task.state)) throw new AgentRunError(task.state === "cancelled" ? "cancelled" : "schema-invalid", `queue task ${taskId} is already terminal: ${task.state}`, { httpStatus: 409 });
|
||||
const version = await this.nextQueueVersion(client);
|
||||
const at = nowIso();
|
||||
const updated = await client.query(
|
||||
"UPDATE agentrun_queue_tasks SET state = $2, latest_attempt = $3::jsonb, session_path = $4, version = $5, updated_at = $6 WHERE id = $1 RETURNING *",
|
||||
[taskId, input.state, JSON.stringify(input.latestAttempt), input.sessionPath, version, at],
|
||||
);
|
||||
return queueTaskFromRow(updated.rows[0]);
|
||||
});
|
||||
}
|
||||
|
||||
async cancelQueueTask(taskId: string, reason = "cancel requested"): Promise<QueueTaskRecord> {
|
||||
return this.withTransaction(async (client) => {
|
||||
const existing = await client.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1 FOR UPDATE", [taskId]);
|
||||
|
||||
@@ -0,0 +1,135 @@
|
||||
import { AgentRunError } from "../common/errors.js";
|
||||
import type { CreateRunInput, JsonRecord, JsonValue, QueueAttemptRef, QueueDispatchResult, QueueTaskRecord, QueueTaskState, RunRecord } from "../common/types.js";
|
||||
import { asRecord } from "../common/validation.js";
|
||||
import type { AgentRunStore } from "./store.js";
|
||||
import { isTerminalQueueTaskState } from "./store.js";
|
||||
import { createKubernetesRunnerJob, type CreateRunnerJobInput, type RunnerJobDefaults } from "./kubernetes-runner-job.js";
|
||||
|
||||
export interface DispatchQueueTaskOptions {
|
||||
store: AgentRunStore;
|
||||
taskId: string;
|
||||
input: JsonRecord;
|
||||
defaults: RunnerJobDefaults;
|
||||
}
|
||||
|
||||
export async function dispatchQueueTask(options: DispatchQueueTaskOptions): Promise<QueueDispatchResult> {
|
||||
const task = await options.store.getQueueTask(options.taskId);
|
||||
assertDispatchable(task);
|
||||
|
||||
const run = await options.store.createRun(buildRunInput(task, options.input));
|
||||
const command = await options.store.createCommand(run.id, { type: "turn", payload: task.payload, idempotencyKey: `queue:${task.id}:command` });
|
||||
const runnerJob = await createKubernetesRunnerJob({
|
||||
store: options.store,
|
||||
runId: run.id,
|
||||
input: buildRunnerJobInput(task, command.id, options.input),
|
||||
defaults: options.defaults,
|
||||
});
|
||||
|
||||
const attemptId = stringFrom(runnerJob, "attemptId");
|
||||
const runnerJobs = await options.store.listRunnerJobs(run.id, command.id);
|
||||
const runnerJobRecord = runnerJobs.find((item) => item.attemptId === attemptId) ?? runnerJobs.at(-1) ?? null;
|
||||
const sessionId = run.sessionRef?.sessionId ?? null;
|
||||
const sessionPath = sessionId ? `/api/v1/sessions/${encodeURIComponent(sessionId)}` : null;
|
||||
const latestAttempt: QueueAttemptRef = {
|
||||
attemptId,
|
||||
state: "running",
|
||||
runId: run.id,
|
||||
commandId: command.id,
|
||||
runnerJobId: runnerJobRecord?.id ?? null,
|
||||
sessionId,
|
||||
sessionPath,
|
||||
};
|
||||
const updatedTask = await options.store.updateQueueTaskAttempt(task.id, { state: "running", latestAttempt, sessionPath });
|
||||
await options.store.appendEvent(run.id, "backend_status", { phase: "queue-dispatched", taskId: task.id, queue: task.queue, lane: task.lane, attemptId, runnerJobId: runnerJobRecord?.id ?? null, sessionPath });
|
||||
|
||||
return {
|
||||
action: "queue-dispatch",
|
||||
mutation: true,
|
||||
task: updatedTask,
|
||||
run,
|
||||
command,
|
||||
runnerJob,
|
||||
latestAttempt,
|
||||
pollCommands: {
|
||||
queue: `./scripts/agentrun queue show ${task.id}`,
|
||||
run: `./scripts/agentrun runs show ${run.id}`,
|
||||
command: `./scripts/agentrun commands show ${command.id} --run-id ${run.id}`,
|
||||
events: `./scripts/agentrun runs events ${run.id} --after-seq 0 --limit 100`,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export async function refreshQueueTaskFromCore(store: AgentRunStore, taskId: string): Promise<QueueTaskRecord> {
|
||||
const task = await store.getQueueTask(taskId);
|
||||
if (isTerminalQueueTaskState(task.state)) return task;
|
||||
const latestAttempt = task.latestAttempt;
|
||||
if (!latestAttempt?.runId || !latestAttempt.commandId) throw new AgentRunError("schema-invalid", `queue task ${taskId} has no Core attempt to refresh`, { httpStatus: 409 });
|
||||
const [run, command] = await Promise.all([store.getRun(latestAttempt.runId), store.getCommand(latestAttempt.commandId)]);
|
||||
if (command.runId !== run.id) throw new AgentRunError("schema-invalid", `queue task ${taskId} attempt references mismatched run and command`, { httpStatus: 409 });
|
||||
const state = queueStateFromRun(run);
|
||||
const sessionId = run.sessionRef?.sessionId ?? latestAttempt.sessionId ?? null;
|
||||
const sessionPath = sessionId ? `/api/v1/sessions/${encodeURIComponent(sessionId)}` : latestAttempt.sessionPath ?? null;
|
||||
const nextAttempt: QueueAttemptRef = { ...latestAttempt, state, sessionId, sessionPath };
|
||||
return store.updateQueueTaskAttempt(task.id, { state, latestAttempt: nextAttempt, sessionPath });
|
||||
}
|
||||
|
||||
function assertDispatchable(task: QueueTaskRecord): void {
|
||||
if (isTerminalQueueTaskState(task.state)) throw new AgentRunError(task.state === "cancelled" ? "cancelled" : "schema-invalid", `queue task ${task.id} is already terminal: ${task.state}`, { httpStatus: 409 });
|
||||
if (task.state !== "pending") throw new AgentRunError("schema-invalid", `queue task ${task.id} is not pending: ${task.state}`, { httpStatus: 409 });
|
||||
if (!task.workspaceRef) throw new AgentRunError("schema-invalid", "queue dispatch requires workspaceRef", { httpStatus: 400 });
|
||||
if (!task.providerId) throw new AgentRunError("schema-invalid", "queue dispatch requires providerId", { httpStatus: 400 });
|
||||
if (!task.executionPolicy) throw new AgentRunError("schema-invalid", "queue dispatch requires executionPolicy", { httpStatus: 400 });
|
||||
}
|
||||
|
||||
function queueStateFromRun(run: RunRecord): QueueTaskState {
|
||||
if (run.status === "completed") return "completed";
|
||||
if (run.status === "failed") return "failed";
|
||||
if (run.status === "blocked") return "blocked";
|
||||
if (run.status === "cancelled") return "cancelled";
|
||||
return "running";
|
||||
}
|
||||
|
||||
function buildRunInput(task: QueueTaskRecord, input: JsonRecord): CreateRunInput {
|
||||
if (!task.workspaceRef || !task.providerId || !task.executionPolicy) throw new AgentRunError("schema-invalid", "queue dispatch task is missing runtime fields", { httpStatus: 400 });
|
||||
const traceSink = input.traceSink === undefined ? { kind: "queue", taskId: task.id, queue: task.queue, lane: task.lane } : input.traceSink;
|
||||
return {
|
||||
tenantId: task.tenantId,
|
||||
projectId: task.projectId,
|
||||
workspaceRef: task.workspaceRef,
|
||||
resourceBundleRef: task.resourceBundleRef,
|
||||
providerId: task.providerId,
|
||||
backendProfile: task.backendProfile,
|
||||
executionPolicy: task.executionPolicy,
|
||||
traceSink: traceSink as JsonValue,
|
||||
};
|
||||
}
|
||||
|
||||
function buildRunnerJobInput(task: QueueTaskRecord, commandId: string, input: JsonRecord): CreateRunnerJobInput {
|
||||
const jobInput: CreateRunnerJobInput = { commandId, idempotencyKey: optionalString(input.idempotencyKey) ?? `queue:${task.id}:runner-job` };
|
||||
const copyString = (inputKey: string, outputKey: keyof CreateRunnerJobInput): void => {
|
||||
const value = optionalString(input[inputKey]);
|
||||
if (value) jobInput[outputKey] = value as never;
|
||||
};
|
||||
copyString("managerUrl", "managerUrl");
|
||||
copyString("image", "image");
|
||||
copyString("namespace", "namespace");
|
||||
copyString("attemptId", "attemptId");
|
||||
copyString("runnerId", "runnerId");
|
||||
copyString("sourceCommit", "sourceCommit");
|
||||
copyString("serviceAccountName", "serviceAccountName");
|
||||
if (input.transientEnv !== undefined) {
|
||||
if (!Array.isArray(input.transientEnv)) throw new AgentRunError("schema-invalid", "transientEnv must be an array", { httpStatus: 400 });
|
||||
jobInput.transientEnv = input.transientEnv.map((item, index) => asRecord(item, `transientEnv[${index}]`));
|
||||
}
|
||||
return jobInput;
|
||||
}
|
||||
|
||||
function stringFrom(record: JsonRecord, key: string): string {
|
||||
const value = record[key];
|
||||
if (typeof value !== "string" || value.length === 0) throw new AgentRunError("infra-failed", `runner job response missing ${key}`, { httpStatus: 502 });
|
||||
return value;
|
||||
}
|
||||
|
||||
function optionalString(value: unknown): string | undefined {
|
||||
return typeof value === "string" && value.trim().length > 0 ? value.trim() : undefined;
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import { AgentRunError, errorToJson } from "../common/errors.js";
|
||||
import { asRecord, validateCreateCommand, validateCreateQueueTask, validateCreateRun, validateQueueTaskState } from "../common/validation.js";
|
||||
import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, RunEvent } from "../common/types.js";
|
||||
import { createKubernetesRunnerJob } from "./kubernetes-runner-job.js";
|
||||
import { dispatchQueueTask, refreshQueueTaskFromCore } from "./queue-dispatch.js";
|
||||
import { buildRunResult } from "./result.js";
|
||||
import { runnerJobStatusSummary } from "./runner-job-status.js";
|
||||
|
||||
@@ -81,6 +82,25 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults
|
||||
}
|
||||
const queueTaskMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)$/u);
|
||||
if (method === "GET" && queueTaskMatch) return await store.getQueueTask(queueTaskMatch[1] ?? "") as unknown as JsonValue;
|
||||
const queueTaskDispatchMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/dispatch$/u);
|
||||
if (method === "POST" && queueTaskDispatchMatch) {
|
||||
const namespace = runnerJobDefaults?.namespace ?? process.env.AGENTRUN_RUNTIME_NAMESPACE ?? "agentrun-v01";
|
||||
return await dispatchQueueTask({
|
||||
store,
|
||||
taskId: queueTaskDispatchMatch[1] ?? "",
|
||||
input: asRecord(body ?? {}, "queueDispatch"),
|
||||
defaults: {
|
||||
namespace,
|
||||
managerUrl: runnerJobDefaults?.managerUrl ?? process.env.AGENTRUN_INTERNAL_MGR_URL ?? `http://agentrun-mgr.${namespace}.svc.cluster.local:8080`,
|
||||
image: runnerJobDefaults?.image ?? process.env.AGENTRUN_RUNNER_IMAGE ?? "",
|
||||
sourceCommit,
|
||||
serviceAccountName: runnerJobDefaults?.serviceAccountName ?? process.env.AGENTRUN_RUNNER_SERVICE_ACCOUNT ?? "agentrun-v01-runner",
|
||||
...(runnerJobDefaults?.kubectlCommand ? { kubectlCommand: runnerJobDefaults.kubectlCommand } : {}),
|
||||
},
|
||||
}) as unknown as JsonValue;
|
||||
}
|
||||
const queueTaskRefreshMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/refresh$/u);
|
||||
if (method === "POST" && queueTaskRefreshMatch) return await refreshQueueTaskFromCore(store, queueTaskRefreshMatch[1] ?? "") as unknown as JsonValue;
|
||||
const queueTaskCancelMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/cancel$/u);
|
||||
if (method === "POST" && queueTaskCancelMatch) {
|
||||
const record = body === null ? {} : asRecord(body, "cancel");
|
||||
|
||||
+16
-1
@@ -1,4 +1,4 @@
|
||||
import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js";
|
||||
import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, QueueAttemptRef, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js";
|
||||
import { AgentRunError } from "../common/errors.js";
|
||||
import { newId, nowIso, stableHash } from "../common/validation.js";
|
||||
import { redactJson } from "../common/redaction.js";
|
||||
@@ -42,6 +42,7 @@ export interface AgentRunStore {
|
||||
createQueueTask(input: CreateQueueTaskInput): MaybePromise<QueueTaskRecord>;
|
||||
listQueueTasks(input: ListQueueTasksInput): MaybePromise<QueueTaskListResult>;
|
||||
getQueueTask(taskId: string): MaybePromise<QueueTaskRecord>;
|
||||
updateQueueTaskAttempt(taskId: string, input: UpdateQueueTaskAttemptInput): MaybePromise<QueueTaskRecord>;
|
||||
cancelQueueTask(taskId: string, reason?: string): MaybePromise<QueueTaskRecord>;
|
||||
markQueueTaskRead(taskId: string, readerId: string): MaybePromise<QueueReadCursorRecord>;
|
||||
queueStats(queue?: string): MaybePromise<QueueStats>;
|
||||
@@ -58,6 +59,12 @@ export interface ListQueueTasksInput {
|
||||
updatedAfter?: number;
|
||||
}
|
||||
|
||||
export interface UpdateQueueTaskAttemptInput {
|
||||
state: QueueTaskState;
|
||||
latestAttempt: QueueAttemptRef;
|
||||
sessionPath: string | null;
|
||||
}
|
||||
|
||||
export interface SaveRunnerJobInput {
|
||||
runId: string;
|
||||
commandId: string;
|
||||
@@ -299,6 +306,14 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
||||
return task;
|
||||
}
|
||||
|
||||
updateQueueTaskAttempt(taskId: string, input: UpdateQueueTaskAttemptInput): QueueTaskRecord {
|
||||
const task = this.getQueueTask(taskId);
|
||||
if (isTerminalQueueTaskState(task.state)) throw new AgentRunError(task.state === "cancelled" ? "cancelled" : "schema-invalid", `queue task ${taskId} is already terminal: ${task.state}`, { httpStatus: 409 });
|
||||
const next: QueueTaskRecord = { ...task, state: input.state, latestAttempt: input.latestAttempt, sessionPath: input.sessionPath, version: this.nextQueueVersion(), updatedAt: nowIso() };
|
||||
this.queueTasks.set(taskId, next);
|
||||
return next;
|
||||
}
|
||||
|
||||
cancelQueueTask(taskId: string, reason = "cancel requested"): QueueTaskRecord {
|
||||
const task = this.getQueueTask(taskId);
|
||||
if (isTerminalQueueTaskState(task.state)) return task;
|
||||
|
||||
@@ -0,0 +1,101 @@
|
||||
import assert from "node:assert/strict";
|
||||
import { chmod, readFile, writeFile } from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { startManagerServer } from "../../mgr/server.js";
|
||||
import { MemoryAgentRunStore } from "../../mgr/store.js";
|
||||
import { ManagerClient } from "../../mgr/client.js";
|
||||
import type { JsonRecord, QueueDispatchResult, QueueTaskRecord } from "../../common/types.js";
|
||||
import { assertNoSecretLeak, type SelfTestCase } from "../harness.js";
|
||||
|
||||
const selfTest: SelfTestCase = async (context) => {
|
||||
const fakeKubectl = path.join(context.tmp, "fake-kubectl-queue-q2.js");
|
||||
const createdManifest = path.join(context.tmp, "created-queue-q2-runner-job.json");
|
||||
await writeFile(fakeKubectl, `#!/usr/bin/env bun
|
||||
const chunks = [];
|
||||
for await (const chunk of Bun.stdin.stream()) chunks.push(chunk);
|
||||
const text = Buffer.concat(chunks.map((chunk) => Buffer.from(chunk))).toString("utf8");
|
||||
await Bun.write(${JSON.stringify(createdManifest)}, text);
|
||||
const manifest = JSON.parse(text);
|
||||
console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kind, metadata: { uid: "job-uid-queue-q2", resourceVersion: "1", name: manifest.metadata.name, namespace: manifest.metadata.namespace } }));
|
||||
`);
|
||||
await chmod(fakeKubectl, 0o755);
|
||||
const store = new MemoryAgentRunStore();
|
||||
const server = await startManagerServer({
|
||||
port: 0,
|
||||
host: "127.0.0.1",
|
||||
sourceCommit: "self-test",
|
||||
store,
|
||||
runnerJobDefaults: {
|
||||
namespace: "agentrun-v01",
|
||||
managerUrl: "http://agentrun-mgr.agentrun-v01.svc.cluster.local:8080",
|
||||
image: "127.0.0.1:5000/agentrun/agentrun-mgr@sha256:1111111111111111111111111111111111111111111111111111111111111111",
|
||||
kubectlCommand: fakeKubectl,
|
||||
},
|
||||
});
|
||||
try {
|
||||
const client = new ManagerClient(server.baseUrl);
|
||||
const created = await client.post("/api/v1/queue/tasks", {
|
||||
tenantId: "unidesk",
|
||||
projectId: "pikasTech/unidesk",
|
||||
queue: "dev",
|
||||
lane: "q2",
|
||||
title: "Q2 queue dispatch task",
|
||||
priority: 20,
|
||||
backendProfile: "codex",
|
||||
providerId: "G14",
|
||||
workspaceRef: { kind: "host-path", path: context.workspace },
|
||||
executionPolicy: {
|
||||
sandbox: "workspace-write",
|
||||
approval: "never",
|
||||
timeoutMs: 15_000,
|
||||
network: "default",
|
||||
secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: context.codexHome } }] },
|
||||
},
|
||||
resourceBundleRef: null,
|
||||
payload: { prompt: "queue dispatch hello" },
|
||||
references: [{ kind: "issue", url: "https://github.com/pikasTech/agentrun/issues/39" }],
|
||||
metadata: { source: "queue-q2-self-test" },
|
||||
idempotencyKey: "queue-q2-dispatch-self-test",
|
||||
}) as QueueTaskRecord;
|
||||
const dispatched = await client.post(`/api/v1/queue/tasks/${created.id}/dispatch`, { attemptId: "attempt_queue_q2_selftest" }) as QueueDispatchResult;
|
||||
assert.equal(dispatched.action, "queue-dispatch");
|
||||
assert.equal(dispatched.mutation, true);
|
||||
assert.equal(dispatched.latestAttempt.attemptId, "attempt_queue_q2_selftest");
|
||||
assert.equal(dispatched.latestAttempt.runId, dispatched.run.id);
|
||||
assert.equal(dispatched.latestAttempt.commandId, dispatched.command.id);
|
||||
assert.ok(dispatched.latestAttempt.runnerJobId);
|
||||
assert.equal(dispatched.task.state, "running");
|
||||
assert.equal(dispatched.task.latestAttempt?.attemptId, "attempt_queue_q2_selftest");
|
||||
assert.equal(dispatched.task.sessionPath, null);
|
||||
|
||||
const shown = await client.get(`/api/v1/queue/tasks/${created.id}`) as QueueTaskRecord;
|
||||
assert.equal(shown.state, "running");
|
||||
assert.equal(shown.latestAttempt?.runId, dispatched.run.id);
|
||||
assert.equal(shown.latestAttempt?.commandId, dispatched.command.id);
|
||||
assert.equal(shown.latestAttempt?.runnerJobId, dispatched.latestAttempt.runnerJobId);
|
||||
|
||||
const jobs = await client.get(`/api/v1/runs/${dispatched.run.id}/runner-jobs?commandId=${dispatched.command.id}`) as { items?: JsonRecord[]; count?: number };
|
||||
assert.equal(jobs.count, 1);
|
||||
assert.equal(jobs.items?.[0]?.attemptId, "attempt_queue_q2_selftest");
|
||||
const events = await client.get(`/api/v1/runs/${dispatched.run.id}/events?afterSeq=0&limit=100`) as { items?: JsonRecord[] };
|
||||
assert.ok(events.items?.some((item) => ((item.payload as JsonRecord).phase) === "queue-dispatched"));
|
||||
await assert.rejects(
|
||||
() => client.post(`/api/v1/queue/tasks/${created.id}/dispatch`, { attemptId: "attempt_queue_q2_duplicate" }),
|
||||
(error) => error instanceof Error && error.message.includes("not pending"),
|
||||
);
|
||||
await client.patch(`/api/v1/commands/${dispatched.command.id}/status`, { terminalStatus: "completed", failureKind: null, failureMessage: null });
|
||||
await client.patch(`/api/v1/runs/${dispatched.run.id}/status`, { terminalStatus: "completed", failureKind: null, failureMessage: null });
|
||||
const refreshed = await client.post(`/api/v1/queue/tasks/${created.id}/refresh`, {}) as QueueTaskRecord;
|
||||
assert.equal(refreshed.state, "completed");
|
||||
assert.equal(refreshed.latestAttempt?.state, "completed");
|
||||
assert.equal(refreshed.latestAttempt?.runId, dispatched.run.id);
|
||||
const manifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord;
|
||||
assert.ok(JSON.stringify(manifest).includes(dispatched.run.id));
|
||||
assertNoSecretLeak(dispatched);
|
||||
return { name: "queue-q2-dispatch", tests: ["queue-dispatch-run-command-runner-job", "queue-refresh-from-core-status", "queue-dispatch-no-repeat"] };
|
||||
} finally {
|
||||
await new Promise<void>((resolve) => server.server.close(() => resolve()));
|
||||
}
|
||||
};
|
||||
|
||||
export default selfTest;
|
||||
Reference in New Issue
Block a user