fix: refresh queue read views from terminal commands

This commit is contained in:
Codex
2026-06-10 08:02:01 +08:00
parent 65b670f4f0
commit 461c775f05
2 changed files with 51 additions and 5 deletions
+41 -4
View File
@@ -6,7 +6,7 @@ import { openAgentRunStoreFromEnv } from "./store.js";
import { AgentRunError, errorToJson } from "../common/errors.js";
import { asRecord, validateBackendProfile, validateCreateCommand, validateCreateQueueTask, validateCreateRun, validateQueueTaskState, validateSessionListState } from "../common/validation.js";
import { isBackendProfile } from "../common/backend-profiles.js";
import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, RunEvent } from "../common/types.js";
import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, QueueTaskRecord, RunEvent } from "../common/types.js";
import { createKubernetesRunnerJob } from "./kubernetes-runner-job.js";
import { dispatchQueueTask, refreshQueueTaskFromCore } from "./queue-dispatch.js";
import { buildRunResult } from "./result.js";
@@ -86,6 +86,34 @@ async function readBody(req: import("node:http").IncomingMessage): Promise<unkno
return JSON.parse(text) as unknown;
}
async function refreshQueueTaskForRead(store: AgentRunStore, taskId: string): Promise<QueueTaskRecord> {
const task = await store.getQueueTask(taskId);
return await refreshQueueTaskRecordForRead(store, task);
}
async function refreshQueuePageForRead(store: AgentRunStore, input: ListQueueTasksInput): Promise<void> {
const page = await store.listQueueTasks(input);
await refreshQueueTaskRecordsForRead(store, page.items);
}
async function refreshRunningQueueTasksForRead(store: AgentRunStore, queue?: string): Promise<void> {
const page = await store.listQueueTasks({ ...(queue ? { queue } : {}), state: "running", limit: 100 });
await refreshQueueTaskRecordsForRead(store, page.items);
}
async function refreshQueueTaskRecordsForRead(store: AgentRunStore, tasks: QueueTaskRecord[]): Promise<void> {
await Promise.all(tasks.map((task) => refreshQueueTaskRecordForRead(store, task)));
}
async function refreshQueueTaskRecordForRead(store: AgentRunStore, task: QueueTaskRecord): Promise<QueueTaskRecord> {
if (!task.latestAttempt?.runId || !task.latestAttempt.commandId) return task;
try {
return await refreshQueueTaskFromCore(store, task.id);
} catch {
return task;
}
}
async function route({ method, url, body, store, sourceCommit, runnerJobDefaults, sessionPvcDefaults, providerProfileDefaults }: { method: string; url: URL; body: unknown; store: AgentRunStore; sourceCommit: string; runnerJobDefaults?: NonNullable<ManagerServerOptions["runnerJobDefaults"]>; sessionPvcDefaults?: NonNullable<ManagerServerOptions["sessionPvcOptions"]>; providerProfileDefaults?: NonNullable<ManagerServerOptions["providerProfileOptions"]> }): Promise<JsonValue> {
const path = url.pathname;
if (method === "GET" && (path === "/health" || path === "/health/live" || path === "/health/readiness")) {
@@ -261,10 +289,11 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults
if (queue) listInput.queue = queue;
if (state) listInput.state = validateQueueTaskState(state);
if (cursor) listInput.cursor = cursor;
await refreshQueuePageForRead(store, listInput);
return await store.listQueueTasks(listInput) as unknown as JsonValue;
}
const queueTaskMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)$/u);
if (method === "GET" && queueTaskMatch) return await store.getQueueTask(queueTaskMatch[1] ?? "") as unknown as JsonValue;
if (method === "GET" && queueTaskMatch) return await refreshQueueTaskForRead(store, queueTaskMatch[1] ?? "") as unknown as JsonValue;
const queueTaskDispatchMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/dispatch$/u);
if (method === "POST" && queueTaskDispatchMatch) {
const namespace = runnerJobDefaults?.namespace ?? process.env.AGENTRUN_RUNTIME_NAMESPACE ?? "agentrun-v01";
@@ -297,8 +326,16 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults
const readerId = typeof record.readerId === "string" && record.readerId.trim().length > 0 ? record.readerId.trim() : "cli";
return await store.markQueueTaskRead(queueTaskReadMatch[1] ?? "", readerId) as unknown as JsonValue;
}
if (method === "GET" && path === "/api/v1/queue/stats") return await store.queueStats(url.searchParams.get("queue") ?? undefined) as unknown as JsonValue;
if (method === "GET" && path === "/api/v1/queue/commander") return await store.queueCommander(url.searchParams.get("queue") ?? undefined, url.searchParams.get("readerId")) as unknown as JsonValue;
if (method === "GET" && path === "/api/v1/queue/stats") {
const queue = url.searchParams.get("queue") ?? undefined;
await refreshRunningQueueTasksForRead(store, queue);
return await store.queueStats(queue) as unknown as JsonValue;
}
if (method === "GET" && path === "/api/v1/queue/commander") {
const queue = url.searchParams.get("queue") ?? undefined;
await refreshRunningQueueTasksForRead(store, queue);
return await store.queueCommander(queue, url.searchParams.get("readerId")) as unknown as JsonValue;
}
if (method === "POST" && path === "/api/v1/runs") return await store.createRun(validateCreateRun(body)) as unknown as JsonValue;
const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u);
if (method === "GET" && runMatch) return await store.getRun(runMatch[1] ?? "") as unknown as JsonValue;
+10 -1
View File
@@ -107,6 +107,15 @@ process.exit(1);
(error) => error instanceof Error && error.message.includes("not pending"),
);
await client.patch(`/api/v1/commands/${dispatched.command.id}/status`, { terminalStatus: "completed", failureKind: null, failureMessage: null });
const autoShown = await client.get(`/api/v1/queue/tasks/${created.id}`) as QueueTaskRecord;
assert.equal(autoShown.state, "completed");
assert.equal(autoShown.latestAttempt?.state, "completed");
assert.equal(autoShown.latestAttempt?.runId, dispatched.run.id);
const commander = await client.get("/api/v1/queue/commander?queue=dev&readerId=queue-q2-self-test") as { items?: QueueTaskRecord[]; stats?: JsonRecord };
const commanderTask = commander.items?.find((item) => item.id === created.id);
assert.equal(commanderTask?.state, "completed");
assert.equal(commanderTask?.latestAttempt?.state, "completed");
assert.equal(((commander.stats?.byState as JsonRecord).completed), 1);
const refreshed = await client.post(`/api/v1/queue/tasks/${created.id}/refresh`, {}) as QueueTaskRecord;
assert.equal(refreshed.state, "completed");
assert.equal(refreshed.latestAttempt?.state, "completed");
@@ -202,7 +211,7 @@ process.exit(1);
assert.ok(JSON.stringify(cancelManifest).includes(cancelDispatched.run.id));
assertNoSecretLeak(dispatched);
assertNoSecretLeak(cancelled);
return { name: "queue-q2-dispatch", tests: ["queue-dispatch-run-command-runner-job", "queue-refresh-from-core-status", "queue-dispatch-no-repeat", "queue-unidesk-ssh-endpoint-auto-env", "queue-cancel-propagates-to-run-command-session"] };
return { name: "queue-q2-dispatch", tests: ["queue-dispatch-run-command-runner-job", "queue-read-views-refresh-terminal-state", "queue-refresh-from-core-status", "queue-dispatch-no-repeat", "queue-unidesk-ssh-endpoint-auto-env", "queue-cancel-propagates-to-run-command-session"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}