// Files: pikasTech-agentrun/src/mgr/kubernetes-runner-job.ts
// (569 lines, 30 KiB, TypeScript)

import { spawn } from "node:child_process";
import { AgentRunError } from "../common/errors.js";
import { redactJson, redactText } from "../common/redaction.js";
import { isLeaseExpired, isTerminalCommandState, isTerminalRunStatus, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js";
import type { AgentRunStore } from "./store.js";
import type { ExecutionPolicy, JsonRecord } from "../common/types.js";
import { stableHash, validateEnvName } from "../common/validation.js";
import { renderRunnerJobManifest } from "../runner/k8s-job.js";
import type { RunnerSessionPvcOptions, RunnerTransientEnv } from "../runner/k8s-job.js";
import { staticWorkReadyCapabilitySummary } from "../common/work-ready.js";
import { resolveRunnerEnvImage } from "../common/env-image-ref.js";
import { ensureSessionPvc } from "./session-pvc.js";
import { gitTransportSummary } from "../common/git-transport.js";
import { enforceRunnerRetentionBeforeCreate } from "./runner-retention.js";
import type { RunnerRetentionOptions, RunnerRetentionSummary } from "./runner-retention.js";
// Env names that may NOT be supplied through a runner job's transientEnv.
// transientEnvField rejects any of these with "tenant-policy-denied" and tells
// the caller to use tool/provider credential assembly instead.
const reusableCredentialEnvNames = new Set([
"AUTH_PASSWORD",
"CODEX_API_KEY",
"GH_TOKEN",
"GITHUB_TOKEN",
"OPENAI_API_KEY",
"PROVIDER_TOKEN",
"UNIDESK_AUTH_PASSWORD",
"UNIDESK_PROVIDER_TOKEN",
"UNIDESK_SSH_CLIENT_TOKEN",
]);
// Endpoint env names for the "unidesk-ssh" tool credential. When a run's policy
// uses that tool, at least one of these must be present in transientEnv (either
// supplied by the caller or filled in from defaults / process.env).
const unideskSshEndpointEnvNames = ["UNIDESK_MAIN_SERVER_IP", "UNIDESK_MAIN_SERVER_HOST", "UNIDESK_FRONTEND_URL"] as const;
// Set form of the list above, used for membership checks.
const unideskSshEndpointEnvNameSet = new Set<string>(unideskSshEndpointEnvNames);
// Manager process environment variables consulted (in array order, first
// non-blank value wins) when no endpoint is configured in RunnerJobDefaults.
// The AGENTRUN_-prefixed names are listed first and therefore take precedence
// over the bare names.
const unideskSshEndpointConfigEnvNames: Array<{ configName: string; targetName: (typeof unideskSshEndpointEnvNames)[number] }> = [
{ configName: "AGENTRUN_UNIDESK_MAIN_SERVER_IP", targetName: "UNIDESK_MAIN_SERVER_IP" },
{ configName: "AGENTRUN_UNIDESK_MAIN_SERVER_HOST", targetName: "UNIDESK_MAIN_SERVER_HOST" },
{ configName: "AGENTRUN_UNIDESK_FRONTEND_URL", targetName: "UNIDESK_FRONTEND_URL" },
{ configName: "UNIDESK_MAIN_SERVER_IP", targetName: "UNIDESK_MAIN_SERVER_IP" },
{ configName: "UNIDESK_MAIN_SERVER_HOST", targetName: "UNIDESK_MAIN_SERVER_HOST" },
{ configName: "UNIDESK_FRONTEND_URL", targetName: "UNIDESK_FRONTEND_URL" },
];
/**
 * Manager-level defaults applied by createKubernetesRunnerJob whenever the
 * per-request input does not override the corresponding setting.
 */
export interface RunnerJobDefaults {
/** Namespace used when the input carries no non-blank `namespace`. */
namespace: string;
/** Manager URL used when the input carries no non-blank `managerUrl`. */
managerUrl: string;
/** Passed to resolveRunnerEnvImage as `defaultImage`. */
image: string;
/** Forwarded to the job renderer only when set (truthy). */
bootRepoUrl?: string;
/** Source commit used when the input carries no non-blank `sourceCommit`. */
sourceCommit: string;
/** Forwarded to resolveRunnerEnvImage only when set (truthy). */
envIdentity?: string;
/** Forwarded to resolveRunnerEnvImage only when set (truthy). */
artifactCatalogFile?: string;
/** Service account used when the input carries no non-blank `serviceAccountName`. */
serviceAccountName?: string;
/** Forwarded to the job renderer and used as the prefix of the per-job transient env Secret name. */
jobNamePrefix?: string;
/** Forwarded to the job renderer; also used as the Secret's lane label ("v0.1" when unset). */
lane?: string;
/** Fallback when the input omits `runnerIdleTimeoutMs`. */
runnerIdleTimeoutMs?: number;
/** Fallback when the input omits `missingTerminalAfterToolTimeoutMs`. */
missingTerminalAfterToolTimeoutMs?: number;
/** Fallback when the input omits `backendRetryMaxAttempts`. */
backendRetryMaxAttempts?: number;
/** Fallback when the input omits `backendRetryInitialBackoffMs`. */
backendRetryInitialBackoffMs?: number;
/** Fallback when the input omits `backendRetryMaxBackoffMs`. */
backendRetryMaxBackoffMs?: number;
/** kubectl executable to spawn; "kubectl" is used when unset. */
kubectlCommand?: string;
/**
 * `{ name, value }` record providing the unidesk-ssh endpoint env. When defined it is
 * validated and used instead of the process.env fallbacks.
 */
unideskSshEndpointEnv?: JsonRecord;
/** When set, enforceRunnerRetentionBeforeCreate runs (with the resolved namespace) before the job is created. */
retention?: RunnerRetentionOptions;
}
/**
 * Per-request input of createKubernetesRunnerJob. Apart from `commandId`, every
 * field is optional and overrides the matching RunnerJobDefaults value when present.
 */
export interface CreateRunnerJobInput extends JsonRecord {
/** Required, non-blank id of the command to run; it must belong to the run and be of type "turn". */
commandId: string;
/** Overrides `defaults.managerUrl` when non-blank. */
managerUrl?: string;
/** Passed to resolveRunnerEnvImage as `explicitImage` when non-blank. */
image?: string;
/** Overrides `defaults.namespace` when non-blank. */
namespace?: string;
/** Attempt id; when absent one is generated as `attempt_<base36 timestamp>`. */
attemptId?: string;
/** Runner id forwarded to the job renderer when non-blank. */
runnerId?: string;
/** Overrides `defaults.sourceCommit` when non-blank. */
sourceCommit?: string;
/** Overrides `defaults.serviceAccountName` when non-blank. */
serviceAccountName?: string;
/** Must be a positive integer when provided. */
runnerIdleTimeoutMs?: number;
/** Must be a positive integer when provided. */
missingTerminalAfterToolTimeoutMs?: number;
/** Must be a positive integer when provided. */
backendRetryMaxAttempts?: number;
/** Must be a positive integer when provided. */
backendRetryInitialBackoffMs?: number;
/** Must be a positive integer when provided. */
backendRetryMaxBackoffMs?: number;
/** When non-blank, a stored runner job with the same key and payload hash is replayed instead of creating a new job. */
idempotencyKey?: string;
/** Passed to resolveRunnerEnvImage as `imageRef` when defined. */
imageRef?: JsonRecord;
/** Array of `{ name, value }` records; validated by transientEnvField and delivered through a per-job Secret. */
transientEnv?: JsonRecord[];
}
/**
 * Creates a Kubernetes Job that runs a runner for one pending "turn" command of a run.
 *
 * Steps, in order:
 *  1. Load the run and command; verify the command belongs to the run and is a turn command.
 *  2. Resolve the image and all settings (input overrides defaults); validate transientEnv.
 *  3. If an idempotencyKey is given and a stored job matches, return its result with `idempotentReplay: true`.
 *  4. Reject terminal runs and non-pending commands (HTTP 409).
 *  5. If the run is claimed by a runner whose lease has not expired, record an event and
 *     return a "reuse-active-runner" response without creating anything.
 *  6. Optionally enforce runner retention, then ensure the session PVC when the run has a session.
 *  7. Render the manifest, create the transient env Secret (if any) and the Job via kubectl,
 *     then try to attach the Job as owner of the Secret.
 *  8. Persist the runner job record, append a "runner-job-created" event, and return the response.
 *
 * @param options.store    Store used to read runs/commands/sessions and to persist the job and events.
 * @param options.runId    Id of the run the job is created for.
 * @param options.input    Per-request overrides; `commandId` is required.
 * @param options.defaults Manager-level defaults.
 * @returns The creation summary, a replayed stored result, or a "reuse-active-runner" response.
 *          Transient env values are never included; only names and hashes are reported.
 * @throws AgentRunError for validation failures (400/403), missing session (404), terminal
 *         run / non-pending command / evicted session storage (409), and kubectl or PVC
 *         failures (502/503).
 */
export async function createKubernetesRunnerJob(options: { store: AgentRunStore; runId: string; input: CreateRunnerJobInput; defaults: RunnerJobDefaults }): Promise<JsonRecord> {
const commandId = stringField(options.input, "commandId");
const run = await options.store.getRun(options.runId);
const command = await options.store.getCommand(commandId);
if (command.runId !== run.id) throw new AgentRunError("schema-invalid", `command ${commandId} does not belong to run ${run.id}`, { httpStatus: 400 });
if (command.type !== "turn") throw new AgentRunError("schema-invalid", `command ${commandId} is not a turn command`, { httpStatus: 400 });
// Resolve the runner image from the optional imageRef / explicit image and the configured default.
const envImage = await resolveRunnerEnvImage({
...(options.input.imageRef !== undefined ? { imageRef: options.input.imageRef } : {}),
...(optionalString(options.input.image) ? { explicitImage: optionalString(options.input.image) as string } : {}),
defaultImage: options.defaults.image,
...(options.defaults.envIdentity ? { envIdentity: options.defaults.envIdentity } : {}),
...(options.defaults.artifactCatalogFile ? { artifactCatalogFile: options.defaults.artifactCatalogFile } : {}),
});
const image = envImage.image;
// Per-request values win over defaults; blank strings count as absent.
const namespace = optionalString(options.input.namespace) ?? options.defaults.namespace;
const managerUrl = optionalString(options.input.managerUrl) ?? options.defaults.managerUrl;
const sourceCommit = optionalString(options.input.sourceCommit) ?? options.defaults.sourceCommit;
const serviceAccountName = optionalString(options.input.serviceAccountName) ?? options.defaults.serviceAccountName;
const jobNamePrefix = options.defaults.jobNamePrefix;
const lane = options.defaults.lane;
const idempotencyKey = optionalString(options.input.idempotencyKey);
// Validates caller-supplied transientEnv and, for unidesk-ssh policies, adds a default endpoint env if none was given.
const transientEnv = assembleToolContextTransientEnv(run.executionPolicy, transientEnvField(options.input.transientEnv), options.defaults);
const attemptId = optionalString(options.input.attemptId) ?? `attempt_${Date.now().toString(36)}`;
const runnerId = optionalString(options.input.runnerId);
const runnerIdleTimeoutMs = optionalPositiveInteger(options.input.runnerIdleTimeoutMs, "runnerIdleTimeoutMs") ?? options.defaults.runnerIdleTimeoutMs;
const missingTerminalAfterToolTimeoutMs = optionalPositiveInteger(options.input.missingTerminalAfterToolTimeoutMs, "missingTerminalAfterToolTimeoutMs") ?? options.defaults.missingTerminalAfterToolTimeoutMs;
const backendRetryMaxAttempts = optionalPositiveInteger(options.input.backendRetryMaxAttempts, "backendRetryMaxAttempts") ?? options.defaults.backendRetryMaxAttempts;
const backendRetryInitialBackoffMs = optionalPositiveInteger(options.input.backendRetryInitialBackoffMs, "backendRetryInitialBackoffMs") ?? options.defaults.backendRetryInitialBackoffMs;
const backendRetryMaxBackoffMs = optionalPositiveInteger(options.input.backendRetryMaxBackoffMs, "backendRetryMaxBackoffMs") ?? options.defaults.backendRetryMaxBackoffMs;
// A per-job Secret is only needed when there is at least one transient env entry; the
// rendered job then references the Secret keys rather than the literal values.
const transientEnvSecretName = transientEnv.length > 0 ? transientEnvSecretNameForRun(run.id, commandId, attemptId, jobNamePrefix) : null;
const renderTransientEnv = transientEnvSecretName ? transientEnvWithSecretRefs(transientEnv, transientEnvSecretName) : transientEnv;
// Payload fingerprint for idempotency. It uses the caller-supplied attemptId/runnerId (null when
// absent) rather than the generated attemptId, so the timestamp-based default does not change the
// hash between retries. Transient env values enter only as hashes.
const normalizedPayload = {
commandId,
image,
namespace,
managerUrl,
sourceCommit,
envImage,
serviceAccountName: serviceAccountName ?? null,
attemptId: optionalString(options.input.attemptId) ?? null,
runnerId: optionalString(options.input.runnerId) ?? null,
runnerIdleTimeoutMs: runnerIdleTimeoutMs ?? null,
missingTerminalAfterToolTimeoutMs: missingTerminalAfterToolTimeoutMs ?? null,
backendRetryMaxAttempts: backendRetryMaxAttempts ?? null,
backendRetryInitialBackoffMs: backendRetryInitialBackoffMs ?? null,
backendRetryMaxBackoffMs: backendRetryMaxBackoffMs ?? null,
transientEnv: transientEnv.map((item) => ({ name: item.name, valueHash: stableHash(item.value), sensitive: true })),
};
const payloadHash = stableHash(normalizedPayload);
// Idempotent replay is checked BEFORE the terminal-state checks, so a replay still returns the
// stored result even after the run or command has moved on.
if (idempotencyKey) {
const existing = await options.store.getRunnerJobByIdempotencyKey(run.id, idempotencyKey, payloadHash);
if (existing) return { ...existing.result, idempotentReplay: true };
}
if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${run.id} is already terminal: ${run.status}`, { httpStatus: 409 });
if (isTerminalCommandState(command.state) || command.state !== "pending") throw new AgentRunError(command.state === "cancelled" ? "cancelled" : "schema-invalid", `command ${commandId} is not pending: ${command.state}`, { httpStatus: 409 });
// A runner already holds an unexpired lease on this run: record that and skip job creation.
if (run.claimedBy && !isLeaseExpired(run.leaseExpiresAt)) {
const response = {
action: "reuse-active-runner",
mutation: false,
runId: run.id,
commandId,
runnerId: run.claimedBy,
leaseExpiresAt: run.leaseExpiresAt,
reason: "fresh-active-runner-lease",
valuesPrinted: false,
} satisfies JsonRecord;
await options.store.appendEvent(run.id, "backend_status", {
phase: "runner-job-skipped-active-runner",
commandId,
runnerId: run.claimedBy,
leaseExpiresAt: run.leaseExpiresAt,
reason: "fresh-active-runner-lease",
});
return response;
}
// Optional retention pass in the target namespace, counting the job about to be created.
let preCreateRetention: RunnerRetentionSummary | null = null;
if (options.defaults.retention) {
preCreateRetention = await enforceRunnerRetentionBeforeCreate({ store: options.store, options: { ...options.defaults.retention, namespace }, incomingRunnerCount: 1 });
}
// Session-backed runs get the session PVC ensured and mounted into the job.
let sessionPvc: RunnerSessionPvcOptions | undefined;
let sessionPvcSummary: JsonRecord | null = null;
if (run.sessionRef?.sessionId) {
const session = await options.store.getSession(run.sessionRef.sessionId);
if (session?.storageKind === "evicted") {
throw new AgentRunError("session-store-evicted", `session ${session.sessionId} storage has been evicted; create a new sessionId`, { httpStatus: 409, details: { sessionId: session.sessionId, pvcName: session.storagePvcName ?? null, pvcPhase: session.storagePvcPhase ?? null, valuesPrinted: false } });
}
if (!session) throw new AgentRunError("schema-invalid", `session ${run.sessionRef.sessionId} was not found`, { httpStatus: 404 });
const ensured = await ensureSessionPvc({
store: options.store,
sessionId: run.sessionRef.sessionId,
namespace,
options: { ...(options.defaults.kubectlCommand ? { kubectlCommand: options.defaults.kubectlCommand } : {}), defaultCodexRolloutSubdir: session.codexRolloutSubdir ?? "sessions" },
});
// Re-read the session after ensureSessionPvc and re-check for eviction; the refreshed record's
// storage fields are preferred over the values returned by ensureSessionPvc.
const refreshed = await options.store.getSession(run.sessionRef.sessionId);
if (refreshed?.storageKind === "evicted") {
throw new AgentRunError("session-store-evicted", `session ${refreshed.sessionId} storage has been evicted; create a new sessionId`, { httpStatus: 409, details: { sessionId: refreshed.sessionId, pvcName: refreshed.storagePvcName ?? null, pvcPhase: refreshed.storagePvcPhase ?? null, valuesPrinted: false } });
}
const pvcName = refreshed?.storagePvcName ?? ensured.pvcName;
if (!pvcName) throw new AgentRunError("infra-failed", `session ${run.sessionRef.sessionId} PVC was not resolved for runner job`, { httpStatus: 502 });
const subdir = refreshed?.codexRolloutSubdir ?? ensured.codexRolloutSubdir ?? "sessions";
const mountPath = `/home/agentrun/.agentrun-sessions/${subdir}`;
const workspacePath = `${mountPath}/agentrun-workspace`;
sessionPvc = { pvcName, namespace: refreshed?.storageNamespace ?? ensured.namespace ?? namespace, mountPath, codexRolloutSubdir: subdir, workspacePath };
sessionPvcSummary = {
sessionId: run.sessionRef.sessionId,
pvcName: sessionPvc.pvcName,
namespace: sessionPvc.namespace,
pvcPhase: refreshed?.storagePvcPhase ?? ensured.pvcPhase ?? null,
mountPath: sessionPvc.mountPath,
codexRolloutSubdir: sessionPvc.codexRolloutSubdir,
workspacePath: sessionPvc.workspacePath ?? null,
valuesPrinted: false,
};
// The summary is built first so it can be attached as error details when the PVC is unusable.
if (ensured.pvcPhase === "NotFound" || ensured.pvcPhase === "Unknown") {
throw new AgentRunError("infra-failed", `session ${run.sessionRef.sessionId} PVC is not ready for runner job`, { httpStatus: 502, details: sessionPvcSummary });
}
}
// Optional settings are only passed to the renderer when they are actually set.
const renderOptions: Parameters<typeof renderRunnerJobManifest>[0] = {
run,
commandId,
managerUrl,
image,
...(options.defaults.bootRepoUrl ? { bootRepoUrl: options.defaults.bootRepoUrl } : {}),
namespace,
sourceCommit,
transientEnv: renderTransientEnv,
...(runnerIdleTimeoutMs !== undefined ? { runnerIdleTimeoutMs } : {}),
...(missingTerminalAfterToolTimeoutMs !== undefined ? { missingTerminalAfterToolTimeoutMs } : {}),
...(backendRetryMaxAttempts !== undefined ? { backendRetryMaxAttempts } : {}),
...(backendRetryInitialBackoffMs !== undefined ? { backendRetryInitialBackoffMs } : {}),
...(backendRetryMaxBackoffMs !== undefined ? { backendRetryMaxBackoffMs } : {}),
...(serviceAccountName ? { serviceAccountName } : {}),
...(jobNamePrefix ? { jobNamePrefix } : {}),
...(lane ? { lane } : {}),
...(sessionPvc ? { sessionPvc } : {}),
};
const render = renderRunnerJobManifest({ ...renderOptions, attemptId, ...(runnerId ? { runnerId } : {}) });
const kubectlCommand = options.defaults.kubectlCommand ?? "kubectl";
let transientEnvSecretCreated = false;
let transientEnvSecretOwnerAttached = false;
let created: JsonRecord | null = null;
// The Secret must exist before the Job that references it. If either create fails after the
// Secret was created, the Secret is deleted again before rethrowing.
try {
if (transientEnvSecretName) {
await kubectlCreate(transientEnvSecretManifest({ namespace: render.namespace, name: transientEnvSecretName, runId: run.id, commandId, attemptId: render.attemptId, runnerId: render.runnerId, jobName: render.jobName, lane: lane ?? "v0.1", items: transientEnv }), kubectlCommand, "runner transient env secret");
transientEnvSecretCreated = true;
}
created = await kubectlCreate(render.manifest, kubectlCommand, "runner job");
} catch (error) {
// NOTE(review): if kubectlDeleteSecret itself throws (kubectl fails to start), that error
// replaces the original create failure — confirm this is acceptable.
if (transientEnvSecretName && transientEnvSecretCreated) await kubectlDeleteSecret(transientEnvSecretName, render.namespace, kubectlCommand);
throw error;
}
if (!created) throw new AgentRunError("infra-failed", "kubectl did not return created runner job metadata", { httpStatus: 502 });
// Owner reference is patched after the Job exists because it needs the Job's uid. A failed
// patch (non-zero exit or missing uid) only produces a warning.
// NOTE(review): a kubectl spawn failure here throws after the Job was created but before it
// is saved to the store — confirm whether that case needs handling.
if (transientEnvSecretName) {
const owner = await kubectlPatchSecretOwnerReference(transientEnvSecretName, render.namespace, { name: render.jobName, uid: objectPath(created, ["metadata", "uid"]) }, kubectlCommand);
transientEnvSecretOwnerAttached = owner.ok;
if (!owner.ok) render.warnings.push("transientEnv Secret ownerReference patch failed; Kubernetes TTL may not garbage-collect this per-job Secret automatically");
}
const transientEnvSecretResponse = transientEnvSecretName ? summarizeTransientEnvSecret(transientEnvSecretName, render.namespace, transientEnv, render.jobName, transientEnvSecretOwnerAttached) : null;
// Response returned to the caller and stored as the runner job's result (used for replays).
const response = {
action: "create-kubernetes-job",
mutation: true,
runId: run.id,
commandId,
runnerJobId: render.runnerJobId,
attemptId: render.attemptId,
runnerId: render.runnerId,
namespace: render.namespace,
jobName: render.jobName,
image,
jobIdentity: {
kind: "Job",
namespace: render.namespace,
name: render.jobName,
serviceAccountName: render.serviceAccountName,
uid: objectPath(created, ["metadata", "uid"]),
},
runner: {
runId: run.id,
commandId,
runnerJobId: render.runnerJobId,
attemptId: render.attemptId,
runnerId: render.runnerId,
backendProfile: run.backendProfile,
managerUrl,
image,
sourceCommit,
placement: "kubernetes-job",
logPath: `kubectl -n ${render.namespace} logs job/${render.jobName}`,
},
envImage,
secretRefs: render.secretRefs.map((item) => ({ profile: item.profile, name: item.secretRef.name, namespace: item.secretRef.namespace ?? render.namespace, keys: item.secretRef.keys ?? [], mountPath: item.runtimeMountPath, projectionPath: item.projectionMountPath, writableCopy: true, valuesPrinted: false })),
toolCredentials: summarizeToolCredentials(render.toolCredentials, render.namespace),
gitTransport: gitTransportSummary(),
transientEnv: summarizeTransientEnv(transientEnv),
transientEnvSecret: transientEnvSecretResponse,
sessionPvc: sessionPvcSummary,
workReady: staticWorkReadyCapabilitySummary(),
retention: {
ttlSecondsAfterFinished: render.ttlSecondsAfterFinished,
ttlPolicy: render.ttlPolicy,
runnerIdleTimeoutMs: render.runnerIdleTimeoutMs,
backendRetryMaxAttempts: render.backendRetryMaxAttempts,
backendRetryInitialBackoffMs: render.backendRetryInitialBackoffMs,
backendRetryMaxBackoffMs: render.backendRetryMaxBackoffMs,
preCreateCleanup: preCreateRetention,
},
pollActions: [
runnerJobActionDescriptor({ action: "inspect-run", operation: "describe", resourceKind: "run", resourceName: run.id, runId: run.id }),
runnerJobActionDescriptor({ action: "inspect-command", operation: "describe", resourceKind: "command", resourceName: commandId, runId: run.id, commandId }),
runnerJobActionDescriptor({ action: "poll-events", operation: "events", resourceKind: "run", resourceName: run.id, runId: run.id, commandId, afterSeq: 0, limit: 100 }),
],
warnings: render.warnings,
kubernetes: {
created: true,
valuesPrinted: false,
apiVersion: objectPath(created, ["apiVersion"]),
kind: objectPath(created, ["kind"]),
resourceVersion: objectPath(created, ["metadata", "resourceVersion"]),
},
} satisfies JsonRecord;
// Persist the job together with its idempotency key and payload hash so later replays can find it.
const saved = await options.store.saveRunnerJob({
id: render.runnerJobId,
runId: run.id,
commandId,
idempotencyKey: idempotencyKey ?? null,
payloadHash,
attemptId: render.attemptId,
runnerId: render.runnerId,
namespace: render.namespace,
jobName: render.jobName,
managerUrl,
image,
sourceCommit,
serviceAccountName: serviceAccountName ?? null,
result: response,
});
// The event records only whether an idempotency key was present, not the key itself.
await options.store.appendEvent(run.id, "backend_status", {
phase: "runner-job-created",
commandId,
runnerJobId: saved.id,
attemptId: saved.attemptId,
runnerId: saved.runnerId,
namespace: saved.namespace,
jobName: saved.jobName,
idempotencyKey: idempotencyKey ? "present" : null,
transientEnv: summarizeTransientEnv(transientEnv),
transientEnvSecret: transientEnvSecretResponse,
envImage,
gitTransport: gitTransportSummary(),
workReady: staticWorkReadyCapabilitySummary(),
retention: preCreateRetention,
toolCredentials: summarizeToolCredentials(render.toolCredentials, render.namespace),
sessionPvc: sessionPvcSummary,
sessionRef: summarizeSessionRef(run.sessionRef ?? null),
resourceBundleRef: summarizeResourceBundleRef(run.resourceBundleRef ?? null),
});
return response;
}
/**
 * Validates the raw `transientEnv` request field and converts it to runner env entries.
 *
 * Each entry must be an object with a valid, unique, non-reserved env `name` and a
 * non-empty string `value` of at most 8192 UTF-8 bytes. Every accepted entry is
 * marked `sensitive: true`.
 *
 * @param value Raw field value; `undefined` yields an empty list.
 * @returns The validated entries in input order.
 * @throws AgentRunError "schema-invalid" (400) for malformed input, or
 *         "tenant-policy-denied" (403) for a reserved credential name.
 */
function transientEnvField(value: unknown): RunnerTransientEnv[] {
  if (value === undefined) return [];
  if (!Array.isArray(value)) {
    throw new AgentRunError("schema-invalid", "transientEnv must be an array", { httpStatus: 400 });
  }
  const usedNames = new Set<string>();
  const accepted: RunnerTransientEnv[] = [];
  value.forEach((candidate: unknown, position: number) => {
    const isPlainRecord = Boolean(candidate) && typeof candidate === "object" && !Array.isArray(candidate);
    if (!isPlainRecord) {
      throw new AgentRunError("schema-invalid", `transientEnv[${position}] must be an object`, { httpStatus: 400 });
    }
    const fields = candidate as JsonRecord;

    // Name checks: syntax first, then policy, then uniqueness.
    const envName = stringField(fields, "name");
    validateEnvName(envName, `transientEnv[${position}].name`);
    if (reusableCredentialEnvNames.has(envName)) {
      throw new AgentRunError("tenant-policy-denied", `transientEnv ${envName} must use tool/provider credential assembly instead`, { httpStatus: 403 });
    }
    if (usedNames.has(envName)) {
      throw new AgentRunError("schema-invalid", `transientEnv name ${envName} is duplicated`, { httpStatus: 400 });
    }
    usedNames.add(envName);

    // Value checks: non-empty string within the byte limit.
    const envValue = fields.value;
    if (typeof envValue !== "string" || envValue.length === 0) {
      throw new AgentRunError("schema-invalid", `transientEnv[${position}].value must be a non-empty string`, { httpStatus: 400 });
    }
    if (Buffer.byteLength(envValue, "utf8") > 8192) {
      throw new AgentRunError("schema-invalid", `transientEnv[${position}].value is too large`, { httpStatus: 400 });
    }
    accepted.push({ name: envName, value: envValue, sensitive: true });
  });
  return accepted;
}
/**
 * Ensures a unidesk-ssh endpoint env is present when the policy needs one.
 *
 * The provided list is returned unchanged when the policy does not use the
 * "unidesk-ssh" tool credential or when the caller already supplied one of the
 * endpoint env names. Otherwise the configured default endpoint is appended.
 *
 * @throws AgentRunError "schema-invalid" (400) when an endpoint is required but
 *         neither provided nor available from defaults.
 */
function assembleToolContextTransientEnv(policy: ExecutionPolicy, provided: RunnerTransientEnv[], defaults: RunnerJobDefaults): RunnerTransientEnv[] {
  const needsEndpoint = usesUnideskSsh(policy) && !provided.some(({ name }) => unideskSshEndpointEnvNameSet.has(name));
  if (!needsEndpoint) return provided;

  const fallback = defaultUnideskSshEndpointEnv(defaults);
  if (fallback === null) {
    throw new AgentRunError("schema-invalid", "unidesk-ssh tool credential requires runner-job transientEnv UNIDESK_MAIN_SERVER_IP, UNIDESK_MAIN_SERVER_HOST, or UNIDESK_FRONTEND_URL", {
      httpStatus: 400,
      details: { tool: "unidesk-ssh", requiredEnv: [...unideskSshEndpointEnvNames], valuesPrinted: false },
    });
  }
  return provided.concat([fallback]);
}
/** Reports whether the policy's secret scope lists a tool credential for "unidesk-ssh". */
function usesUnideskSsh(policy: ExecutionPolicy): boolean {
  const credentials = policy.secretScope.toolCredentials;
  if (credentials === undefined || credentials === null) return false;
  return credentials.some(({ tool }) => tool === "unidesk-ssh");
}
/**
 * Resolves the default unidesk-ssh endpoint env.
 *
 * A record configured in `defaults.unideskSshEndpointEnv` is validated and used when
 * defined. Otherwise the manager's process environment is scanned in the order of
 * `unideskSshEndpointConfigEnvNames` and the first non-blank value is used.
 *
 * @returns The endpoint env entry, or `null` when nothing is configured.
 */
function defaultUnideskSshEndpointEnv(defaults: RunnerJobDefaults): RunnerTransientEnv | null {
  const { unideskSshEndpointEnv: configured } = defaults;
  if (configured !== undefined) {
    return unideskSshEndpointEnvFromRecord(configured, "runnerJobDefaults.unideskSshEndpointEnv");
  }
  for (const { configName, targetName } of unideskSshEndpointConfigEnvNames) {
    const configuredValue = optionalString(process.env[configName]);
    if (configuredValue !== undefined) {
      return { name: targetName, value: configuredValue, sensitive: true };
    }
  }
  return null;
}
/**
 * Validates a `{ name, value }` record as a unidesk-ssh endpoint env entry.
 *
 * @param record    Record to validate.
 * @param fieldName Path of the record, used as a prefix in error messages.
 * @returns The entry marked `sensitive: true`.
 * @throws AgentRunError "schema-invalid" (400) when the name is not one of the endpoint
 *         env names or the value is not a non-empty string of at most 8192 UTF-8 bytes.
 */
function unideskSshEndpointEnvFromRecord(record: JsonRecord, fieldName: string): RunnerTransientEnv {
  const envName = stringField(record, "name");
  validateEnvName(envName, `${fieldName}.name`);
  const isEndpointName = unideskSshEndpointEnvNameSet.has(envName);
  if (!isEndpointName) {
    throw new AgentRunError("schema-invalid", `${fieldName}.name must be one of ${unideskSshEndpointEnvNames.join(", ")}`, {
      httpStatus: 400,
      details: { requiredEnv: [...unideskSshEndpointEnvNames], valuesPrinted: false },
    });
  }

  const envValue = record.value;
  if (typeof envValue !== "string" || envValue.length === 0) {
    throw new AgentRunError("schema-invalid", `${fieldName}.value must be a non-empty string`, { httpStatus: 400 });
  }
  if (Buffer.byteLength(envValue, "utf8") > 8192) {
    throw new AgentRunError("schema-invalid", `${fieldName}.value is too large`, { httpStatus: 400 });
  }
  return { name: envName, value: envValue, sensitive: true };
}
/**
 * Builds a value-free summary of the rendered tool credentials.
 *
 * @param items     Rendered tool credentials.
 * @param namespace Namespace reported for credentials whose secretRef has none.
 * @returns `{ count, items, valuesPrinted: false }`; each item describes the Secret
 *          reference and how it is projected (volume mount or env variable).
 */
function summarizeToolCredentials(items: Array<{ tool: string; purpose: string | null; secretRef: { namespace?: string; name: string; keys?: string[] }; kind?: string; envName?: string; secretKey?: string; mountPath?: string }>, namespace: string): JsonRecord {
  const summaries = items.map((credential) => {
    const { secretRef } = credential;
    // Anything that is not explicitly a volume is reported as an env projection.
    const projection = credential.kind === "volume"
      ? { kind: "volume", mountPath: credential.mountPath ?? null }
      : { kind: "env", envName: credential.envName ?? null, secretKey: credential.secretKey ?? null };
    return {
      tool: credential.tool,
      purpose: credential.purpose,
      name: secretRef.name,
      namespace: secretRef.namespace ?? namespace,
      keys: secretRef.keys ?? [],
      projection,
      valuesPrinted: false,
    };
  });
  return { count: items.length, items: summaries, valuesPrinted: false };
}
/** Summarizes transient env entries by count and name only; values are never included. */
function summarizeTransientEnv(items: RunnerTransientEnv[]): JsonRecord {
  const names = items.map(({ name }) => name);
  return { count: items.length, names, valuesPrinted: false };
}
/**
 * Returns copies of the entries, each carrying a `secretRef` that points at the
 * given Secret with the entry's own name as the key. The inputs are not mutated.
 */
function transientEnvWithSecretRefs(items: RunnerTransientEnv[], secretName: string): RunnerTransientEnv[] {
  const attachSecretRef = (entry: RunnerTransientEnv): RunnerTransientEnv => {
    const secretRef = { name: secretName, key: entry.name };
    return { ...entry, secretRef };
  };
  return items.map((entry) => attachSecretRef(entry));
}
/**
 * Derives the name of the per-job transient env Secret.
 *
 * The name is `<prefix>-env-<hash>`: the prefix is the job name prefix
 * (default "agentrun-v01-runner") lower-cased, with runs of characters outside
 * `[a-z0-9-]` replaced by "-", leading/trailing dashes removed, and cut to 36
 * characters; the hash is the first 20 characters of the stable hash of the
 * run, command and attempt ids.
 */
function transientEnvSecretNameForRun(runId: string, commandId: string, attemptId: string, jobNamePrefix: string | undefined): string {
  const requestedPrefix = jobNamePrefix ?? "agentrun-v01-runner";
  const sanitized = requestedPrefix
    .toLowerCase()
    .replace(/[^a-z0-9-]+/gu, "-")
    .replace(/^-+|-+$/gu, "");
  // A prefix that sanitizes to nothing falls back to a fixed name.
  const prefix = sanitized || "agentrun-runner";
  // Truncation can leave a trailing dash; strip it so the name has no "--env-".
  const head = prefix.slice(0, 36).replace(/-+$/u, "");
  const digest = stableHash({ runId, commandId, attemptId }).slice(0, 20);
  return `${head}-env-${digest}`;
}
/**
 * Builds the manifest of the per-job Opaque Secret that carries transient env values.
 *
 * Each item becomes one `stringData` key (the env name) holding its value. Labels and
 * annotations identify the run, command, attempt, runner and job the Secret belongs to.
 *
 * @param options Target namespace/name, identifying ids, lane label value, and the env items.
 * @returns A `v1` Secret manifest suitable for `kubectl create -f -`.
 */
function transientEnvSecretManifest(options: { namespace: string; name: string; runId: string; commandId: string; attemptId: string; runnerId: string; jobName: string; lane: string; items: RunnerTransientEnv[] }): JsonRecord {
  // Object.fromEntries defines every key as an own property. Plain assignment onto `{}`
  // would route a key named "__proto__" through the prototype setter and silently drop
  // that entry from the Secret. As with assignment, a repeated name keeps the last value.
  const stringData: JsonRecord = Object.fromEntries(options.items.map((item) => [item.name, item.value]));
  return {
    apiVersion: "v1",
    kind: "Secret",
    metadata: {
      name: options.name,
      namespace: options.namespace,
      labels: {
        "app.kubernetes.io/name": "agentrun-runner",
        "app.kubernetes.io/component": "runner-transient-env",
        "app.kubernetes.io/part-of": "agentrun",
        "agentrun.pikastech.local/lane": options.lane,
        // The run id goes into the label only as a short hash; the full id is in the annotations.
        "agentrun.pikastech.local/run-hash": stableHash(options.runId).slice(0, 12),
        "agentrun.pikastech.local/runner-job": options.jobName,
      },
      annotations: {
        "agentrun.pikastech.local/run-id": options.runId,
        "agentrun.pikastech.local/command-id": options.commandId,
        "agentrun.pikastech.local/attempt-id": options.attemptId,
        "agentrun.pikastech.local/runner-id": options.runnerId,
        "agentrun.pikastech.local/runner-job": options.jobName,
        "agentrun.pikastech.local/values-printed": "false",
      },
    },
    type: "Opaque",
    stringData,
  };
}
/**
 * Describes the per-job transient env Secret without exposing any value: its name,
 * namespace, key names, and whether the owning Job's ownerReference was attached.
 */
function summarizeTransientEnvSecret(name: string, namespace: string, items: RunnerTransientEnv[], jobName: string, ownerReferenceAttached: boolean): JsonRecord {
  const keys = items.map(({ name: envName }) => envName);
  const ownerReference = { kind: "Job", name: jobName, attached: ownerReferenceAttached };
  return { name, namespace, keys, valuesPrinted: false, ownerReference };
}
/**
 * Creates a Kubernetes object by piping its manifest to `kubectl create -f - -o json`.
 *
 * @param manifest       Manifest to create; serialized as JSON onto kubectl's stdin.
 * @param kubectlCommand kubectl executable to spawn.
 * @param label          Human-readable object label used in the failure message.
 * @returns The created object as reported by kubectl, passed through redactJson.
 * @throws AgentRunError "infra-failed": 503 when kubectl cannot be started, 502 when it
 *         exits non-zero or prints something that is not a JSON object.
 */
async function kubectlCreate(manifest: JsonRecord, kubectlCommand: string, label = "runner job"): Promise<JsonRecord> {
  const child = spawn(kubectlCommand, ["create", "-f", "-", "-o", "json"], { stdio: ["pipe", "pipe", "pipe"] });
  let stdout = "";
  let stderr = "";
  child.stdout.setEncoding("utf8");
  child.stderr.setEncoding("utf8");
  child.stdout.on("data", (chunk) => { stdout += String(chunk); });
  child.stderr.on("data", (chunk) => { stderr += String(chunk); });
  // If kubectl exits (or never starts) before it has read the manifest, the write fails
  // with EPIPE on the stdin stream. Without a listener that becomes an unhandled 'error'
  // event and can take down the whole process. The failure itself is still reported
  // through the child's 'error' / 'close' events below, so it is safe to ignore here.
  child.stdin.on("error", () => {});
  child.stdin.end(`${JSON.stringify(manifest)}\n`);
  const result = await new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve, reject) => {
    child.on("error", reject);
    child.on("close", (code, signal) => resolve({ code, signal }));
  }).catch((error: unknown) => {
    throw new AgentRunError("infra-failed", `failed to start kubectl: ${error instanceof Error ? error.message : String(error)}`, { httpStatus: 503 });
  });
  if (result.code !== 0) {
    // Only the tail of each stream is attached, and both are redacted before being reported.
    throw new AgentRunError("infra-failed", `kubectl create ${label} failed with code ${result.code}`, { httpStatus: 502, details: redactJson({ stderr: redactText(stderr.slice(-4000)), stdout: redactText(stdout.slice(-2000)), signal: result.signal }) });
  }
  try {
    const parsed = JSON.parse(stdout) as unknown;
    if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) return redactJson(parsed as JsonRecord);
  } catch (error) {
    throw new AgentRunError("infra-failed", `kubectl returned invalid JSON: ${error instanceof Error ? error.message : String(error)}`, { httpStatus: 502, details: { stdoutPreview: redactText(stdout.slice(0, 1000)) } });
  }
  // Valid JSON, but not an object (e.g. an array or a scalar).
  throw new AgentRunError("infra-failed", "kubectl returned non-object JSON", { httpStatus: 502 });
}
/**
 * Runs `kubectl delete secret <name> -n <namespace> --ignore-not-found=true`.
 * The command's exit code is not inspected; only a failure to start kubectl throws.
 */
async function kubectlDeleteSecret(name: string, namespace: string, kubectlCommand: string): Promise<void> {
  const deleteArgs = ["delete", "secret", name, "-n", namespace, "--ignore-not-found=true"];
  await kubectlRun(kubectlCommand, deleteArgs);
}
/**
 * Merge-patches a Secret so that the given Job becomes its (non-controller) owner.
 *
 * @param name           Secret name.
 * @param namespace      Secret namespace.
 * @param owner          Owning Job's name and uid; nothing is patched when the uid is missing.
 * @param kubectlCommand kubectl executable to spawn.
 * @returns `{ ok: true }` only when kubectl exits with code 0.
 */
async function kubectlPatchSecretOwnerReference(name: string, namespace: string, owner: { name: string; uid: string | null }, kubectlCommand: string): Promise<{ ok: boolean }> {
  const { name: ownerName, uid: ownerUid } = owner;
  if (!ownerUid) return { ok: false };

  const ownerReference = { apiVersion: "batch/v1", kind: "Job", name: ownerName, uid: ownerUid, controller: false, blockOwnerDeletion: false };
  const patchJson = JSON.stringify({ metadata: { ownerReferences: [ownerReference] } });
  const { code } = await kubectlRun(kubectlCommand, ["patch", "secret", name, "-n", namespace, "--type", "merge", "-p", patchJson, "-o", "json"]);
  return { ok: code === 0 };
}
/**
 * Runs kubectl with the given arguments (no stdin) and collects its output.
 *
 * A non-zero exit code is NOT an error here; callers inspect `code` themselves.
 *
 * @returns Exit code, terminating signal, and stdout/stderr passed through redactText.
 * @throws AgentRunError "infra-failed" (503) when the process cannot be started.
 */
async function kubectlRun(kubectlCommand: string, args: string[]): Promise<{ code: number | null; signal: NodeJS.Signals | null; stdout: string; stderr: string }> {
  const proc = spawn(kubectlCommand, args, { stdio: ["ignore", "pipe", "pipe"] });
  const captured = { stdout: "", stderr: "" };
  proc.stdout.setEncoding("utf8");
  proc.stderr.setEncoding("utf8");
  proc.stdout.on("data", (piece) => {
    captured.stdout += String(piece);
  });
  proc.stderr.on("data", (piece) => {
    captured.stderr += String(piece);
  });

  let exit: { code: number | null; signal: NodeJS.Signals | null };
  try {
    exit = await new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve, reject) => {
      proc.on("error", reject);
      proc.on("close", (code, signal) => resolve({ code, signal }));
    });
  } catch (error: unknown) {
    throw new AgentRunError("infra-failed", `failed to start kubectl: ${error instanceof Error ? error.message : String(error)}`, { httpStatus: 503 });
  }
  return { ...exit, stdout: redactText(captured.stdout), stderr: redactText(captured.stderr) };
}
/**
 * Reads a required string field and returns it trimmed.
 *
 * @throws AgentRunError "schema-invalid" (400) when the field is missing, not a
 *         string, or blank after trimming.
 */
function stringField(record: JsonRecord, key: string): string {
  const raw = record[key];
  const trimmed = typeof raw === "string" ? raw.trim() : "";
  if (trimmed.length === 0) {
    throw new AgentRunError("schema-invalid", `${key} is required`, { httpStatus: 400 });
  }
  return trimmed;
}
/**
 * Builds a follow-up action descriptor for the runner job response.
 *
 * `runId` and `commandId` default to `null`. `afterSeq` and `limit` are included
 * only when they are not `undefined` (an explicit `null` is kept).
 */
function runnerJobActionDescriptor(input: { action: string; operation: string; resourceKind: string; resourceName: string; runId?: string | null; commandId?: string | null; afterSeq?: number | null; limit?: number | null }): JsonRecord {
  const { action, operation, resourceKind, resourceName, afterSeq, limit } = input;
  const descriptor: JsonRecord = {
    action,
    operation,
    resourceKind,
    resourceName,
    runId: input.runId ?? null,
    commandId: input.commandId ?? null,
  };
  if (afterSeq !== undefined) descriptor.afterSeq = afterSeq;
  if (limit !== undefined) descriptor.limit = limit;
  // Added last so the serialized key order stays the same.
  descriptor.valuesPrinted = false;
  return descriptor;
}
/** Returns the trimmed string when `value` is a non-blank string, otherwise `undefined`. */
function optionalString(value: unknown): string | undefined {
  if (typeof value !== "string") return undefined;
  const trimmed = value.trim();
  return trimmed.length > 0 ? trimmed : undefined;
}
/**
 * Validates an optional positive integer setting.
 *
 * @returns `undefined` for `undefined`/`null`, otherwise the integer itself.
 * @throws AgentRunError "schema-invalid" (400) for anything that is not an integer
 *         number greater than zero (numeric strings are rejected too).
 */
function optionalPositiveInteger(value: unknown, key: string): number | undefined {
  if (value == null) return undefined;
  if (typeof value === "number" && Number.isInteger(value) && value > 0) return value;
  throw new AgentRunError("schema-invalid", `${key} must be a positive integer`, { httpStatus: 400 });
}
/**
 * Follows `path` through nested plain objects and returns the string found at the end.
 *
 * @returns The string value, or `null` when any step is not a plain object (arrays
 *          and `null` included) or the final value is not a string.
 */
function objectPath(value: unknown, path: string[]): string | null {
  const isPlainObject = (node: unknown): node is Record<string, unknown> =>
    typeof node === "object" && node !== null && !Array.isArray(node);

  let cursor: unknown = value;
  for (const segment of path) {
    if (!isPlainObject(cursor)) return null;
    cursor = cursor[segment];
  }
  if (typeof cursor === "string") return cursor;
  return null;
}