diff --git a/src/mgr/server.ts b/src/mgr/server.ts index bd633db..cc89727 100644 --- a/src/mgr/server.ts +++ b/src/mgr/server.ts @@ -6,7 +6,7 @@ import { openAgentRunStoreFromEnv } from "./store.js"; import { AgentRunError, errorToJson } from "../common/errors.js"; import { asRecord, validateBackendProfile, validateCreateCommand, validateCreateQueueTask, validateCreateRun, validateQueueTaskState, validateSessionListState } from "../common/validation.js"; import { isBackendProfile } from "../common/backend-profiles.js"; -import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, RunEvent } from "../common/types.js"; +import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, QueueTaskRecord, RunEvent } from "../common/types.js"; import { createKubernetesRunnerJob } from "./kubernetes-runner-job.js"; import { dispatchQueueTask, refreshQueueTaskFromCore } from "./queue-dispatch.js"; import { buildRunResult } from "./result.js"; @@ -86,6 +86,34 @@ async function readBody(req: import("node:http").IncomingMessage): Promise { + const task = await store.getQueueTask(taskId); + return await refreshQueueTaskRecordForRead(store, task); +} + +async function refreshQueuePageForRead(store: AgentRunStore, input: ListQueueTasksInput): Promise { + const page = await store.listQueueTasks(input); + await refreshQueueTaskRecordsForRead(store, page.items); +} + +async function refreshRunningQueueTasksForRead(store: AgentRunStore, queue?: string): Promise { + const page = await store.listQueueTasks({ ...(queue ? { queue } : {}), state: "running", limit: 100 }); + await refreshQueueTaskRecordsForRead(store, page.items); +} + +async function refreshQueueTaskRecordsForRead(store: AgentRunStore, tasks: QueueTaskRecord[]): Promise { + await Promise.all(tasks.map((task) => refreshQueueTaskRecordForRead(store, task))); +} + +async function refreshQueueTaskRecordForRead(store: AgentRunStore, task: QueueTaskRecord): Promise { + if (!task.latestAttempt?.runId || !task.latestAttempt.commandId) return task; + try { + return await refreshQueueTaskFromCore(store, task.id); + } catch { + return task; + } +} + async function route({ method, url, body, store, sourceCommit, runnerJobDefaults, sessionPvcDefaults, providerProfileDefaults }: { method: string; url: URL; body: unknown; store: AgentRunStore; sourceCommit: string; runnerJobDefaults?: NonNullable; sessionPvcDefaults?: NonNullable; providerProfileDefaults?: NonNullable }): Promise { const path = url.pathname; if (method === "GET" && (path === "/health" || path === "/health/live" || path === "/health/readiness")) { @@ -261,10 +289,11 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults if (queue) listInput.queue = queue; if (state) listInput.state = validateQueueTaskState(state); if (cursor) listInput.cursor = cursor; + await refreshQueuePageForRead(store, listInput); return await store.listQueueTasks(listInput) as unknown as JsonValue; } const queueTaskMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)$/u); - if (method === "GET" && queueTaskMatch) return await store.getQueueTask(queueTaskMatch[1] ?? "") as unknown as JsonValue; + if (method === "GET" && queueTaskMatch) return await refreshQueueTaskForRead(store, queueTaskMatch[1] ?? "") as unknown as JsonValue; const queueTaskDispatchMatch = path.match(/^\/api\/v1\/queue\/tasks\/([^/]+)\/dispatch$/u); if (method === "POST" && queueTaskDispatchMatch) { const namespace = runnerJobDefaults?.namespace ?? process.env.AGENTRUN_RUNTIME_NAMESPACE ?? "agentrun-v01"; @@ -297,8 +326,16 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults const readerId = typeof record.readerId === "string" && record.readerId.trim().length > 0 ? record.readerId.trim() : "cli"; return await store.markQueueTaskRead(queueTaskReadMatch[1] ?? "", readerId) as unknown as JsonValue; } - if (method === "GET" && path === "/api/v1/queue/stats") return await store.queueStats(url.searchParams.get("queue") ?? undefined) as unknown as JsonValue; - if (method === "GET" && path === "/api/v1/queue/commander") return await store.queueCommander(url.searchParams.get("queue") ?? undefined, url.searchParams.get("readerId")) as unknown as JsonValue; + if (method === "GET" && path === "/api/v1/queue/stats") { + const queue = url.searchParams.get("queue") ?? undefined; + await refreshRunningQueueTasksForRead(store, queue); + return await store.queueStats(queue) as unknown as JsonValue; + } + if (method === "GET" && path === "/api/v1/queue/commander") { + const queue = url.searchParams.get("queue") ?? undefined; + await refreshRunningQueueTasksForRead(store, queue); + return await store.queueCommander(queue, url.searchParams.get("readerId")) as unknown as JsonValue; + } if (method === "POST" && path === "/api/v1/runs") return await store.createRun(validateCreateRun(body)) as unknown as JsonValue; const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u); if (method === "GET" && runMatch) return await store.getRun(runMatch[1] ?? "") as unknown as JsonValue; diff --git a/src/selftest/cases/75-queue-q2-dispatch.ts b/src/selftest/cases/75-queue-q2-dispatch.ts index 2912348..6639fd1 100644 --- a/src/selftest/cases/75-queue-q2-dispatch.ts +++ b/src/selftest/cases/75-queue-q2-dispatch.ts @@ -107,6 +107,15 @@ process.exit(1); (error) => error instanceof Error && error.message.includes("not pending"), ); await client.patch(`/api/v1/commands/${dispatched.command.id}/status`, { terminalStatus: "completed", failureKind: null, failureMessage: null }); + const autoShown = await client.get(`/api/v1/queue/tasks/${created.id}`) as QueueTaskRecord; + assert.equal(autoShown.state, "completed"); + assert.equal(autoShown.latestAttempt?.state, "completed"); + assert.equal(autoShown.latestAttempt?.runId, dispatched.run.id); + const commander = await client.get("/api/v1/queue/commander?queue=dev&readerId=queue-q2-self-test") as { items?: QueueTaskRecord[]; stats?: JsonRecord }; + const commanderTask = commander.items?.find((item) => item.id === created.id); + assert.equal(commanderTask?.state, "completed"); + assert.equal(commanderTask?.latestAttempt?.state, "completed"); + assert.equal(((commander.stats?.byState as JsonRecord).completed), 1); const refreshed = await client.post(`/api/v1/queue/tasks/${created.id}/refresh`, {}) as QueueTaskRecord; assert.equal(refreshed.state, "completed"); assert.equal(refreshed.latestAttempt?.state, "completed"); @@ -202,7 +211,7 @@ process.exit(1); assert.ok(JSON.stringify(cancelManifest).includes(cancelDispatched.run.id)); assertNoSecretLeak(dispatched); assertNoSecretLeak(cancelled); - return { name: "queue-q2-dispatch", tests: ["queue-dispatch-run-command-runner-job", "queue-refresh-from-core-status", "queue-dispatch-no-repeat", "queue-unidesk-ssh-endpoint-auto-env", "queue-cancel-propagates-to-run-command-session"] }; + return { name: "queue-q2-dispatch", tests: ["queue-dispatch-run-command-runner-job", "queue-read-views-refresh-terminal-state", "queue-refresh-from-core-status", "queue-dispatch-no-repeat", "queue-unidesk-ssh-endpoint-auto-env", "queue-cancel-propagates-to-run-command-session"] }; } finally { await new Promise((resolve) => server.server.close(() => resolve())); }