Merge pull request #134 from pikasTech/fix/issue132-queue-read-refresh
修复 queue 读视图 stale running 状态
This commit is contained in:
+41
-4
@@ -6,7 +6,7 @@ import { openAgentRunStoreFromEnv } from "./store.js";
|
||||
import { AgentRunError, errorToJson } from "../common/errors.js";
|
||||
import { asRecord, validateBackendProfile, validateCreateCommand, validateCreateQueueTask, validateCreateRun, validateQueueTaskState, validateSessionListState } from "../common/validation.js";
|
||||
import { isBackendProfile } from "../common/backend-profiles.js";
|
||||
import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, RunEvent } from "../common/types.js";
|
||||
import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, QueueTaskRecord, RunEvent } from "../common/types.js";
|
||||
import { createKubernetesRunnerJob } from "./kubernetes-runner-job.js";
|
||||
import { dispatchQueueTask, refreshQueueTaskFromCore } from "./queue-dispatch.js";
|
||||
import { buildRunResult } from "./result.js";
|
||||
@@ -86,6 +86,34 @@ async function readBody(req: import("node:http").IncomingMessage): Promise<unkno
|
||||
return JSON.parse(text) as unknown;
|
||||
}
|
||||
|
||||
async function refreshQueueTaskForRead(store: AgentRunStore, taskId: string): Promise<QueueTaskRecord> {
|
||||
const task = await store.getQueueTask(taskId);
|
||||
return await refreshQueueTaskRecordForRead(store, task);
|
||||
}
|
||||
|
||||
async function refreshQueuePageForRead(store: AgentRunStore, input: ListQueueTasksInput): Promise<void> {
|
||||
const page = await store.listQueueTasks(input);
|
||||
await refreshQueueTaskRecordsForRead(store, page.items);
|
||||
}
|
||||
|
||||
async function refreshRunningQueueTasksForRead(store: AgentRunStore, queue?: string): Promise<void> {
|
||||
const page = await store.listQueueTasks({ ...(queue ? { queue } : {}), state: "running", limit: 100 });
|
||||
await refreshQueueTaskRecordsForRead(store, page.items);
|
||||
}
|
||||
|
||||
async function refreshQueueTaskRecordsForRead(store: AgentRunStore, tasks: QueueTaskRecord[]): Promise<void> {
|
||||
await Promise.all(tasks.map((task) => refreshQueueTaskRecordForRead(store, task)));
|
||||
}
|
||||
|
||||
async function refreshQueueTaskRecordForRead(store: AgentRunStore, task: QueueTaskRecord): Promise<QueueTaskRecord> {
|
||||
if (!task.latestAttempt?.runId || !task.latestAttempt.commandId) return task;
|
||||
try {
|
||||
return await refreshQueueTaskFromCore(store, task.id);
|
||||
} catch {
|
||||
return task;
|
||||
}
|
||||
}
|
||||
|
||||
async function route({ method, url, body, store, sourceCommit, runnerJobDefaults, sessionPvcDefaults, providerProfileDefaults }: { method: string; url: URL; body: unknown; store: AgentRunStore; sourceCommit: string; runnerJobDefaults?: NonNullable<ManagerServerOptions["runnerJobDefaults"]>; sessionPvcDefaults?: NonNullable<ManagerServerOptions["sessionPvcOptions"]>; providerProfileDefaults?: NonNullable<ManagerServerOptions["providerProfileOptions"]> }): Promise<JsonValue> {
|
||||
const path = url.pathname;
|
||||
if (method === "GET" && (path === "/health" || path === "/health/live" || path === "/health/readiness")) {
|
||||
@@ -261,10 +289,11 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults
|
||||
if (queue) listInput.queue = queue;
|
||||
if (state) listInput.state = validateQueueTaskState(state);
|
||||
if (cursor) listInput.cursor = cursor;
|
||||
await refreshQueuePageForRead(store, listInput);
|
||||
return await store.listQueueTasks(listInput) as unknown as JsonValue;
|
||||
}
|
||||
const queueTaskMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)$/u);
|
||||
if (method === "GET" && queueTaskMatch) return await store.getQueueTask(queueTaskMatch[1] ?? "") as unknown as JsonValue;
|
||||
if (method === "GET" && queueTaskMatch) return await refreshQueueTaskForRead(store, queueTaskMatch[1] ?? "") as unknown as JsonValue;
|
||||
const queueTaskDispatchMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/dispatch$/u);
|
||||
if (method === "POST" && queueTaskDispatchMatch) {
|
||||
const namespace = runnerJobDefaults?.namespace ?? process.env.AGENTRUN_RUNTIME_NAMESPACE ?? "agentrun-v01";
|
||||
@@ -297,8 +326,16 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults
|
||||
const readerId = typeof record.readerId === "string" && record.readerId.trim().length > 0 ? record.readerId.trim() : "cli";
|
||||
return await store.markQueueTaskRead(queueTaskReadMatch[1] ?? "", readerId) as unknown as JsonValue;
|
||||
}
|
||||
if (method === "GET" && path === "/api/v1/queue/stats") return await store.queueStats(url.searchParams.get("queue") ?? undefined) as unknown as JsonValue;
|
||||
if (method === "GET" && path === "/api/v1/queue/commander") return await store.queueCommander(url.searchParams.get("queue") ?? undefined, url.searchParams.get("readerId")) as unknown as JsonValue;
|
||||
if (method === "GET" && path === "/api/v1/queue/stats") {
|
||||
const queue = url.searchParams.get("queue") ?? undefined;
|
||||
await refreshRunningQueueTasksForRead(store, queue);
|
||||
return await store.queueStats(queue) as unknown as JsonValue;
|
||||
}
|
||||
if (method === "GET" && path === "/api/v1/queue/commander") {
|
||||
const queue = url.searchParams.get("queue") ?? undefined;
|
||||
await refreshRunningQueueTasksForRead(store, queue);
|
||||
return await store.queueCommander(queue, url.searchParams.get("readerId")) as unknown as JsonValue;
|
||||
}
|
||||
if (method === "POST" && path === "/api/v1/runs") return await store.createRun(validateCreateRun(body)) as unknown as JsonValue;
|
||||
const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u);
|
||||
if (method === "GET" && runMatch) return await store.getRun(runMatch[1] ?? "") as unknown as JsonValue;
|
||||
|
||||
@@ -107,6 +107,15 @@ process.exit(1);
|
||||
(error) => error instanceof Error && error.message.includes("not pending"),
|
||||
);
|
||||
await client.patch(`/api/v1/commands/${dispatched.command.id}/status`, { terminalStatus: "completed", failureKind: null, failureMessage: null });
|
||||
const autoShown = await client.get(`/api/v1/queue/tasks/${created.id}`) as QueueTaskRecord;
|
||||
assert.equal(autoShown.state, "completed");
|
||||
assert.equal(autoShown.latestAttempt?.state, "completed");
|
||||
assert.equal(autoShown.latestAttempt?.runId, dispatched.run.id);
|
||||
const commander = await client.get("/api/v1/queue/commander?queue=dev&readerId=queue-q2-self-test") as { items?: QueueTaskRecord[]; stats?: JsonRecord };
|
||||
const commanderTask = commander.items?.find((item) => item.id === created.id);
|
||||
assert.equal(commanderTask?.state, "completed");
|
||||
assert.equal(commanderTask?.latestAttempt?.state, "completed");
|
||||
assert.equal(((commander.stats?.byState as JsonRecord).completed), 1);
|
||||
const refreshed = await client.post(`/api/v1/queue/tasks/${created.id}/refresh`, {}) as QueueTaskRecord;
|
||||
assert.equal(refreshed.state, "completed");
|
||||
assert.equal(refreshed.latestAttempt?.state, "completed");
|
||||
@@ -202,7 +211,7 @@ process.exit(1);
|
||||
assert.ok(JSON.stringify(cancelManifest).includes(cancelDispatched.run.id));
|
||||
assertNoSecretLeak(dispatched);
|
||||
assertNoSecretLeak(cancelled);
|
||||
return { name: "queue-q2-dispatch", tests: ["queue-dispatch-run-command-runner-job", "queue-refresh-from-core-status", "queue-dispatch-no-repeat", "queue-unidesk-ssh-endpoint-auto-env", "queue-cancel-propagates-to-run-command-session"] };
|
||||
return { name: "queue-q2-dispatch", tests: ["queue-dispatch-run-command-runner-job", "queue-read-views-refresh-terminal-state", "queue-refresh-from-core-status", "queue-dispatch-no-repeat", "queue-unidesk-ssh-endpoint-auto-env", "queue-cancel-propagates-to-run-command-session"] };
|
||||
} finally {
|
||||
await new Promise<void>((resolve) => server.server.close(() => resolve()));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user