feat: 支持 runner transient env
This commit is contained in:
@@ -6,6 +6,7 @@ import type { AgentRunStore } from "./store.js";
|
||||
import type { JsonRecord } from "../common/types.js";
|
||||
import { stableHash } from "../common/validation.js";
|
||||
import { renderRunnerJobManifest } from "../runner/k8s-job.js";
|
||||
import type { RunnerTransientEnv } from "../runner/k8s-job.js";
|
||||
|
||||
export interface RunnerJobDefaults {
|
||||
namespace: string;
|
||||
@@ -26,6 +27,7 @@ export interface CreateRunnerJobInput extends JsonRecord {
|
||||
sourceCommit?: string;
|
||||
serviceAccountName?: string;
|
||||
idempotencyKey?: string;
|
||||
transientEnv?: JsonRecord[];
|
||||
}
|
||||
|
||||
export async function createKubernetesRunnerJob(options: { store: AgentRunStore; runId: string; input: CreateRunnerJobInput; defaults: RunnerJobDefaults }): Promise<JsonRecord> {
|
||||
@@ -42,7 +44,18 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
|
||||
const sourceCommit = optionalString(options.input.sourceCommit) ?? options.defaults.sourceCommit;
|
||||
const serviceAccountName = optionalString(options.input.serviceAccountName) ?? options.defaults.serviceAccountName;
|
||||
const idempotencyKey = optionalString(options.input.idempotencyKey);
|
||||
const normalizedPayload = { commandId, image, namespace, managerUrl, sourceCommit, serviceAccountName: serviceAccountName ?? null, attemptId: optionalString(options.input.attemptId) ?? null, runnerId: optionalString(options.input.runnerId) ?? null };
|
||||
const transientEnv = transientEnvField(options.input.transientEnv);
|
||||
const normalizedPayload = {
|
||||
commandId,
|
||||
image,
|
||||
namespace,
|
||||
managerUrl,
|
||||
sourceCommit,
|
||||
serviceAccountName: serviceAccountName ?? null,
|
||||
attemptId: optionalString(options.input.attemptId) ?? null,
|
||||
runnerId: optionalString(options.input.runnerId) ?? null,
|
||||
transientEnv: transientEnv.map((item) => ({ name: item.name, valueHash: stableHash(item.value), sensitive: true })),
|
||||
};
|
||||
const payloadHash = stableHash(normalizedPayload);
|
||||
if (idempotencyKey) {
|
||||
const existing = await options.store.getRunnerJobByIdempotencyKey(run.id, idempotencyKey, payloadHash);
|
||||
@@ -58,6 +71,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
|
||||
image,
|
||||
namespace,
|
||||
sourceCommit,
|
||||
transientEnv,
|
||||
...(serviceAccountName ? { serviceAccountName } : {}),
|
||||
};
|
||||
const attemptId = optionalString(options.input.attemptId);
|
||||
@@ -92,6 +106,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
|
||||
logPath: `kubectl -n ${render.namespace} logs job/${render.jobName}`,
|
||||
},
|
||||
secretRefs: render.secretRefs.map((item) => ({ profile: item.profile, name: item.secretRef.name, namespace: item.secretRef.namespace ?? render.namespace, keys: item.secretRef.keys ?? [], mountPath: item.runtimeMountPath, projectionPath: item.projectionMountPath, writableCopy: true, valuesPrinted: false })),
|
||||
transientEnv: summarizeTransientEnv(transientEnv),
|
||||
retention: {
|
||||
ttlSecondsAfterFinished: render.ttlSecondsAfterFinished,
|
||||
},
|
||||
@@ -132,12 +147,40 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
|
||||
namespace: saved.namespace,
|
||||
jobName: saved.jobName,
|
||||
idempotencyKey: idempotencyKey ? "present" : null,
|
||||
transientEnv: summarizeTransientEnv(transientEnv),
|
||||
sessionRef: summarizeSessionRef(run.sessionRef ?? null),
|
||||
resourceBundleRef: summarizeResourceBundleRef(run.resourceBundleRef ?? null),
|
||||
});
|
||||
return response;
|
||||
}
|
||||
|
||||
function transientEnvField(value: unknown): RunnerTransientEnv[] {
|
||||
if (value === undefined) return [];
|
||||
if (!Array.isArray(value)) throw new AgentRunError("schema-invalid", "transientEnv must be an array", { httpStatus: 400 });
|
||||
if (value.length > 8) throw new AgentRunError("schema-invalid", "transientEnv must contain at most 8 entries", { httpStatus: 400 });
|
||||
const seen = new Set<string>();
|
||||
return value.map((entry, index) => {
|
||||
if (!entry || typeof entry !== "object" || Array.isArray(entry)) throw new AgentRunError("schema-invalid", `transientEnv[${index}] must be an object`, { httpStatus: 400 });
|
||||
const record = entry as JsonRecord;
|
||||
const name = stringField(record, "name");
|
||||
if (!/^[A-Z_][A-Z0-9_]{0,63}$/u.test(name)) throw new AgentRunError("schema-invalid", `transientEnv[${index}].name must be an uppercase env name`, { httpStatus: 400 });
|
||||
if (seen.has(name)) throw new AgentRunError("schema-invalid", `transientEnv name ${name} is duplicated`, { httpStatus: 400 });
|
||||
seen.add(name);
|
||||
const rawValue = record.value;
|
||||
if (typeof rawValue !== "string" || rawValue.length === 0) throw new AgentRunError("schema-invalid", `transientEnv[${index}].value must be a non-empty string`, { httpStatus: 400 });
|
||||
if (Buffer.byteLength(rawValue, "utf8") > 8192) throw new AgentRunError("schema-invalid", `transientEnv[${index}].value is too large`, { httpStatus: 400 });
|
||||
return { name, value: rawValue, sensitive: true };
|
||||
});
|
||||
}
|
||||
|
||||
function summarizeTransientEnv(items: RunnerTransientEnv[]): JsonRecord {
|
||||
return {
|
||||
count: items.length,
|
||||
names: items.map((item) => item.name),
|
||||
valuesPrinted: false,
|
||||
};
|
||||
}
|
||||
|
||||
async function kubectlCreate(manifest: JsonRecord, kubectlCommand: string): Promise<JsonRecord> {
|
||||
const child = spawn(kubectlCommand, ["create", "-f", "-", "-o", "json"], { stdio: ["pipe", "pipe", "pipe"] });
|
||||
let stdout = "";
|
||||
|
||||
Reference in New Issue
Block a user