From c8b16260b8a79e25480e5d35c4fe44956f3a0443 Mon Sep 17 00:00:00 2001 From: Lyon <88232613+pikasTech@users.noreply.github.com> Date: Thu, 11 Jun 2026 13:11:20 +0800 Subject: [PATCH] fix: provision session pvc before runner dispatch (#168) Co-authored-by: AgentRun Codex --- src/mgr/kubernetes-runner-job.ts | 33 ++++++++++++++++++++-- src/selftest/cases/20-runner-k8s-job.ts | 4 +++ src/selftest/cases/75-queue-q2-dispatch.ts | 25 ++++++++++++++++ 3 files changed, 59 insertions(+), 3 deletions(-) diff --git a/src/mgr/kubernetes-runner-job.ts b/src/mgr/kubernetes-runner-job.ts index 316e309..38e2aae 100644 --- a/src/mgr/kubernetes-runner-job.ts +++ b/src/mgr/kubernetes-runner-job.ts @@ -9,6 +9,7 @@ import { renderRunnerJobManifest } from "../runner/k8s-job.js"; import type { RunnerSessionPvcOptions, RunnerTransientEnv } from "../runner/k8s-job.js"; import { staticWorkReadyCapabilitySummary } from "../common/work-ready.js"; import { resolveRunnerEnvImage } from "../common/env-image-ref.js"; +import { ensureSessionPvc } from "./session-pvc.js"; const reusableCredentialEnvNames = new Set([ "AUTH_PASSWORD", @@ -104,14 +105,38 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore; if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${run.id} is already terminal: ${run.status}`, { httpStatus: 409 }); if (isTerminalCommandState(command.state) || command.state !== "pending") throw new AgentRunError(command.state === "cancelled" ? "cancelled" : "schema-invalid", `command ${commandId} is not pending: ${command.state}`, { httpStatus: 409 }); let sessionPvc: RunnerSessionPvcOptions | undefined; + let sessionPvcSummary: JsonRecord | null = null; if (run.sessionRef?.sessionId) { const session = await options.store.getSession(run.sessionRef.sessionId); if (session?.storageKind === "evicted") { throw new AgentRunError("session-store-evicted", `session ${session.sessionId} storage has been evicted; create a new sessionId`, { httpStatus: 409, details: { sessionId: session.sessionId, pvcName: session.storagePvcName ?? null, pvcPhase: session.storagePvcPhase ?? null, valuesPrinted: false } }); } - if (session?.storageKind === "pvc" && session.storagePvcName) { - const subdir = session.codexRolloutSubdir ?? "sessions"; - sessionPvc = { pvcName: session.storagePvcName, namespace: session.storageNamespace ?? "agentrun-v01", mountPath: `/home/agentrun/.codex-${run.backendProfile}/${subdir}`, codexRolloutSubdir: subdir }; + if (!session) throw new AgentRunError("schema-invalid", `session ${run.sessionRef.sessionId} was not found`, { httpStatus: 404 }); + const ensured = await ensureSessionPvc({ + store: options.store, + sessionId: run.sessionRef.sessionId, + namespace, + options: { ...(options.defaults.kubectlCommand ? { kubectlCommand: options.defaults.kubectlCommand } : {}), defaultCodexRolloutSubdir: session.codexRolloutSubdir ?? "sessions" }, + }); + const refreshed = await options.store.getSession(run.sessionRef.sessionId); + if (refreshed?.storageKind === "evicted") { + throw new AgentRunError("session-store-evicted", `session ${refreshed.sessionId} storage has been evicted; create a new sessionId`, { httpStatus: 409, details: { sessionId: refreshed.sessionId, pvcName: refreshed.storagePvcName ?? null, pvcPhase: refreshed.storagePvcPhase ?? null, valuesPrinted: false } }); + } + const pvcName = refreshed?.storagePvcName ?? ensured.pvcName; + if (!pvcName) throw new AgentRunError("infra-failed", `session ${run.sessionRef.sessionId} PVC was not resolved for runner job`, { httpStatus: 502 }); + const subdir = refreshed?.codexRolloutSubdir ?? ensured.codexRolloutSubdir ?? "sessions"; + sessionPvc = { pvcName, namespace: refreshed?.storageNamespace ?? ensured.namespace ?? namespace, mountPath: `/home/agentrun/.codex-${run.backendProfile}/${subdir}`, codexRolloutSubdir: subdir }; + sessionPvcSummary = { + sessionId: run.sessionRef.sessionId, + pvcName: sessionPvc.pvcName, + namespace: sessionPvc.namespace, + pvcPhase: refreshed?.storagePvcPhase ?? ensured.pvcPhase ?? null, + mountPath: sessionPvc.mountPath, + codexRolloutSubdir: sessionPvc.codexRolloutSubdir, + valuesPrinted: false, + }; + if (ensured.pvcPhase === "NotFound" || ensured.pvcPhase === "Unknown") { + throw new AgentRunError("infra-failed", `session ${run.sessionRef.sessionId} PVC is not ready for runner job`, { httpStatus: 502, details: sessionPvcSummary }); } } const renderOptions: Parameters[0] = { @@ -181,6 +206,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore; toolCredentials: summarizeToolCredentials(render.toolCredentials, render.namespace), transientEnv: summarizeTransientEnv(transientEnv), transientEnvSecret: transientEnvSecretResponse, + sessionPvc: sessionPvcSummary, workReady: staticWorkReadyCapabilitySummary(), retention: { ttlSecondsAfterFinished: render.ttlSecondsAfterFinished, @@ -227,6 +253,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore; envImage, workReady: staticWorkReadyCapabilitySummary(), toolCredentials: summarizeToolCredentials(render.toolCredentials, render.namespace), + sessionPvc: sessionPvcSummary, sessionRef: summarizeSessionRef(run.sessionRef ?? null), resourceBundleRef: summarizeResourceBundleRef(run.resourceBundleRef ?? null), }); diff --git a/src/selftest/cases/20-runner-k8s-job.ts b/src/selftest/cases/20-runner-k8s-job.ts index 258c75b..14f71e9 100644 --- a/src/selftest/cases/20-runner-k8s-job.ts +++ b/src/selftest/cases/20-runner-k8s-job.ts @@ -152,6 +152,10 @@ if (args[0] === "create") { console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kind, metadata: { uid, resourceVersion: "1", name: manifest.metadata.name, namespace: manifest.metadata.namespace } })); process.exit(0); } +if (args[0] === "get" && args[1] === "pvc") { + console.error("Error from server (NotFound): persistentvolumeclaims " + args[2] + " not found"); + process.exit(1); +} if (args[0] === "patch" && args[1] === "secret") { await Bun.write(${JSON.stringify(patchedTransientEnvSecret)}, JSON.stringify({ args }, null, 2)); console.log(JSON.stringify({ apiVersion: "v1", kind: "Secret", metadata: { uid: "secret-uid-selftest", resourceVersion: "2", name: args[2], namespace: args[4] } })); diff --git a/src/selftest/cases/75-queue-q2-dispatch.ts b/src/selftest/cases/75-queue-q2-dispatch.ts index 5143469..8f347f2 100644 --- a/src/selftest/cases/75-queue-q2-dispatch.ts +++ b/src/selftest/cases/75-queue-q2-dispatch.ts @@ -26,6 +26,10 @@ if (args[0] === "create") { console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kind, metadata: { uid, resourceVersion: "1", name: manifest.metadata.name, namespace: manifest.metadata.namespace } })); process.exit(0); } +if (args[0] === "get" && args[1] === "pvc") { + console.error("Error from server (NotFound): persistentvolumeclaims " + args[2] + " not found"); + process.exit(1); +} if (args[0] === "patch" && args[1] === "secret") { console.log(JSON.stringify({ apiVersion: "v1", kind: "Secret", metadata: { uid: "secret-uid-queue-q2", resourceVersion: "2", name: args[2], namespace: args[4] } })); process.exit(0); @@ -168,6 +172,10 @@ process.exit(1); assert.equal(refreshed.latestAttempt?.sessionPath, "/api/v1/sessions/sess_queue_q2_dispatch_selftest"); const dispatchManifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord; assert.ok(JSON.stringify(dispatchManifest).includes(dispatched.run.id)); + assert.equal(runnerEnvValue(dispatchManifest, "AGENTRUN_SESSION_PVC_NAME"), "agentrun-v01-session-sess-queue-q2-dispatch-selftest"); + assert.equal(runnerEnvValue(dispatchManifest, "AGENTRUN_SESSION_PVC_MOUNT_PATH"), "/home/agentrun/.codex-codex/sessions"); + assert.ok(runnerVolumeMount(dispatchManifest, "agentrun-sessions", "/home/agentrun/.codex-codex/sessions"), "queue dispatch must mount session PVC for same-session resume"); + assert.ok(runnerPersistentVolumeClaim(dispatchManifest, "agentrun-sessions", "agentrun-v01-session-sess-queue-q2-dispatch-selftest"), "queue dispatch PVC volume must reference per-session claim"); const unideskCreated = await client.post("/api/v1/queue/tasks", { tenantId: "unidesk", @@ -325,3 +333,20 @@ function runnerEnvValue(manifest: JsonRecord, name: string): unknown { if (item.valueFrom) return "secretRef"; return item.value; } + +function runnerVolumeMount(manifest: JsonRecord, volumeName: string, mountPath: string): boolean { + const spec = manifest.spec as JsonRecord; + const template = spec.template as JsonRecord; + const podSpec = template.spec as JsonRecord; + const containers = podSpec.containers as JsonRecord[]; + const mounts = containers[0]?.volumeMounts as JsonRecord[]; + return Array.isArray(mounts) && mounts.some((mount) => mount.name === volumeName && mount.mountPath === mountPath && mount.readOnly === false); +} + +function runnerPersistentVolumeClaim(manifest: JsonRecord, volumeName: string, claimName: string): boolean { + const spec = manifest.spec as JsonRecord; + const template = spec.template as JsonRecord; + const podSpec = template.spec as JsonRecord; + const volumes = podSpec.volumes as JsonRecord[]; + return Array.isArray(volumes) && volumes.some((volume) => volume.name === volumeName && ((volume.persistentVolumeClaim as JsonRecord | undefined)?.claimName) === claimName); +}