Files
pikasTech-agentrun/src/runner/resource-bundle.ts
T
2026-06-09 01:06:19 +08:00

416 lines
20 KiB
TypeScript

import { spawn } from "node:child_process";
import { createHash } from "node:crypto";
import { chmod, cp, mkdir, readdir, readFile, rm, stat } from "node:fs/promises";
import path from "node:path";
import { AgentRunError } from "../common/errors.js";
import { redactText } from "../common/redaction.js";
import type { InitialPromptAssembly, JsonRecord, ResourceBundleRef } from "../common/types.js";
import { stableHash } from "../common/validation.js";
/** Largest UTF-8 byte size accepted for a single prompt-ref file (16 KiB). */
const maxPromptRefBytes = 16 * 1024;
/** Largest UTF-8 byte size accepted both for all prompt refs combined and for the assembled initial prompt (64 KiB). */
const maxInitialPromptBytes = 64 * 1024;
/** Maximum number of characters kept from a skill summary before "..." is appended. */
const skillSummaryChars = 600;
/** Result returned by `materializeResourceBundle` once a bundle has been laid out on disk. */
export interface MaterializedResourceBundle {
/** Directory into which the bundle entries were copied. */
workspacePath: string;
/** Directory holding the runnable tools; present only when at least one shebang tool was found. */
binPath?: string;
/** The workspace `.agents/skills` directory; present only when that directory could be listed. */
skillsDir?: string;
/** Assembled thread-start prompt; present only when at least one prompt ref or skill was materialized. */
initialPrompt?: InitialPromptAssembly;
/** Summary record of the materialization (phase "resource-bundle-materialized"). */
event: JsonRecord;
}
/** A prompt reference whose file was read successfully from the default checkout. */
interface MaterializedPromptRef {
/** Name taken from the prompt ref. */
name: string;
/** Path exactly as given in the prompt ref (relative to the checkout). */
path: string;
/** Injection point; this module only ever produces "thread-start". */
inject: "thread-start";
/** True only when the ref's `required` flag is exactly `true`. */
required: boolean;
/** File content decoded as UTF-8. */
text: string;
/** UTF-8 byte length of `text`. */
bytes: number;
/** Hex SHA-256 digest of `text`. */
sha256: string;
}
/** A skill directory under `.agents/skills` whose `SKILL.md` manifest was read successfully. */
interface MaterializedSkillRef {
/** Name of the skill directory. */
name: string;
/** Workspace-relative manifest path, `.agents/skills/<name>/SKILL.md`. */
path: string;
/** Set to the skill directory name (same value as `name`) by the discovery code in this file. */
aggregateAs: string;
/** Always `true` for skills discovered by this file. */
required: boolean;
/** Manifest path joined onto the workspace skills directory. */
registryPath: string;
/** UTF-8 byte length of the manifest text. */
manifestBytes: number;
/** Hex SHA-256 digest of the manifest text. */
manifestSha256: string;
/** Short description extracted from the manifest (may be empty). */
summary: string;
}
/** Outcome of checking out one git source into a local directory. */
interface GitCheckout {
/** Remote URL the checkout was fetched from. */
repoUrl: string;
/** Commit actually checked out, as reported by `git rev-parse HEAD`. */
commitId: string;
/** Commit id requested by the source, when one was given. */
requestedCommitId?: string;
/** Ref requested by the source, when one was given. */
requestedRef?: string;
/** Directory containing the checkout. */
checkoutPath: string;
/** Tree id of the checked-out commit, as reported by `git rev-parse HEAD^{tree}`. */
treeId: string;
}
/**
 * Identifies what to fetch from a git remote.
 * The checkout code uses `ref` when it is set, otherwise `commitId`, and rejects a source with neither.
 */
interface GitBundleSource {
/** Remote URL to fetch from. */
repoUrl: string;
/** Commit to fetch and check out when no ref is given. */
commitId?: string;
/** Ref to fetch; takes precedence over `commitId` during checkout. */
ref?: string;
}
/** Report entry for one bundle item copied from a checkout into the workspace. */
interface MaterializedGitBundle {
/** Bundle name, or `null` when the bundle has none. */
name: string | null;
/** Remote URL of the checkout the item was copied from. */
repoUrl: string;
/** Commit actually checked out for that source. */
commitId: string;
/** Commit id from the bundle, else from the enclosing bundle ref, else `null`. */
requestedCommitId: string | null;
/** Ref that was requested for the checkout, or `null`. */
requestedRef: string | null;
/** Source path inside the checkout, as given by the bundle. */
subpath: string;
/** Destination path inside the workspace, as given by the bundle. */
targetPath: string;
/** "directory" when the source is a directory, otherwise "file". */
sourceKind: "file" | "directory";
/** Size in bytes when the source is a regular file, otherwise `null`. */
sourceBytes: number | null;
}
/**
 * Materializes a git-backed resource bundle into a freshly created assembly
 * directory and reports what was produced.
 *
 * The assembly directory lives under `AGENTRUN_WORKSPACE_ROOT` (default
 * `/home/agentrun/workspaces`) and is named from a hash of the run scope and
 * the bundle reference. Anything already present at that path is removed
 * before the checkouts and the workspace are recreated.
 *
 * @param resourceBundleRef Bundle description; a falsy value yields `null`.
 * @param env Environment consulted for the workspace root, run scope, ref overrides and tool install path.
 * @returns Workspace path, optional bin/skills/prompt data and a summary event, or `null` when no bundle was given.
 */
export async function materializeResourceBundle(resourceBundleRef: ResourceBundleRef | null | undefined, env: NodeJS.ProcessEnv = process.env): Promise<MaterializedResourceBundle | null> {
  if (!resourceBundleRef) {
    return null;
  }

  const rootDir = path.resolve(env.AGENTRUN_WORKSPACE_ROOT ?? "/home/agentrun/workspaces");
  const runScope = env.AGENTRUN_RUN_ID ?? env.AGENTRUN_ATTEMPT_ID ?? "standalone";
  const assemblyDigest = stableHash({ runScope, resourceBundleRef }).slice(0, 16);
  const assemblyRoot = path.join(rootDir, `gitbundle-${assemblyDigest}`);
  const checkoutRoot = path.join(assemblyRoot, "checkouts");
  const workspacePath = path.join(assemblyRoot, "workspace");

  // Always start from an empty assembly directory.
  await rm(assemblyRoot, { recursive: true, force: true });
  await mkdir(checkoutRoot, { recursive: true });
  await mkdir(workspacePath, { recursive: true });

  const defaultSource = defaultGitBundleSource(resourceBundleRef, env);

  // Checkouts are memoized per source identity so each distinct source is fetched once.
  const pendingCheckouts = new Map<string, Promise<GitCheckout>>();
  const checkoutFor = (source: GitBundleSource): Promise<GitCheckout> => {
    const cacheKey = stableHash(gitSourceIdentity(source));
    const cached = pendingCheckouts.get(cacheKey);
    if (cached) {
      return cached;
    }
    const started = checkoutGitSource(checkoutRoot, source);
    pendingCheckouts.set(cacheKey, started);
    return started;
  };

  const defaultCheckout = await checkoutFor(defaultSource);
  const bundleItems = await materializeGitBundles(workspacePath, resourceBundleRef, defaultSource, defaultCheckout, checkoutFor);
  const tools = await prepareGitBundleTools(workspacePath, env);
  const skills = await discoverGitBundleSkills(workspacePath);
  const prompts = await materializePromptRefs(defaultCheckout.checkoutPath, resourceBundleRef.promptRefs ?? []);
  const initialPrompt = assembleInitialPrompt(prompts.items, skills.items);

  const event: JsonRecord = {
    phase: "resource-bundle-materialized",
    kind: "gitbundle",
    repoUrl: resourceBundleRef.repoUrl,
    commitId: defaultCheckout.commitId,
    requestedCommitId: resourceBundleRef.commitId ?? null,
    requestedRef: defaultCheckout.requestedRef ?? null,
    treeId: defaultCheckout.treeId,
    checkoutPath: pathSummary(defaultCheckout.checkoutPath),
    workspacePath: pathSummary(workspacePath),
    bundles: {
      count: bundleItems.length,
      items: bundleItems.map((item) => ({ ...item, valuesPrinted: false })),
      valuesPrinted: false,
    },
    tools: tools.event,
    skillDirs: skills.event,
    promptRefs: prompts.event,
    initialPrompt: initialPrompt?.summary ?? { available: false, bytes: 0, sha256: null, promptRefCount: prompts.items.length, skillCount: skills.items.length, valuesPrinted: false },
    valuesPrinted: false,
  };

  // Optional fields are added only when they carry a value.
  const optional: Pick<MaterializedResourceBundle, "binPath" | "skillsDir" | "initialPrompt"> = {};
  if (tools.binPath) {
    optional.binPath = tools.binPath;
  }
  if (skills.skillsDir) {
    optional.skillsDir = skills.skillsDir;
  }
  if (initialPrompt) {
    optional.initialPrompt = initialPrompt;
  }

  return { workspacePath, ...optional, event };
}
/**
 * Chooses the git source used for the bundle's default checkout.
 *
 * A ref wins over a commit id. Refs are looked up, in order, in the bundle ref
 * itself and then in `AGENTRUN_RESOURCE_BUNDLE_REF`, `AGENTRUN_WORKSPACE_REF`
 * and `AGENTRUN_WORKSPACE_BRANCH`. Without any ref the bundle's commit id is
 * used, and without that the source falls back to ref "HEAD".
 */
function defaultGitBundleSource(resourceBundleRef: ResourceBundleRef, env: NodeJS.ProcessEnv): GitBundleSource {
  const refCandidates: unknown[] = [
    resourceBundleRef.ref,
    env.AGENTRUN_RESOURCE_BUNDLE_REF,
    env.AGENTRUN_WORKSPACE_REF,
    env.AGENTRUN_WORKSPACE_BRANCH,
  ];
  for (const candidate of refCandidates) {
    const ref = optionalNonEmpty(candidate);
    if (ref) {
      return { repoUrl: resourceBundleRef.repoUrl, ref };
    }
  }

  const pinnedCommit = optionalNonEmpty(resourceBundleRef.commitId);
  return pinnedCommit
    ? { repoUrl: resourceBundleRef.repoUrl, commitId: pinnedCommit }
    : { repoUrl: resourceBundleRef.repoUrl, ref: "HEAD" };
}
/**
 * Resolves the git source for a single bundle entry.
 *
 * Precedence: the bundle's own ref, then its own commit id. Otherwise, when the
 * bundle targets the default repository, the `defaultSource` object itself is
 * returned (callers compare by identity to reuse the default checkout). For a
 * different repository the default ref or commit id is inherited, ending with
 * ref "HEAD".
 */
function bundleGitSource(bundle: ResourceBundleRef["bundles"][number], resourceBundleRef: ResourceBundleRef, defaultSource: GitBundleSource): GitBundleSource {
  const repoUrl = bundle.repoUrl ?? resourceBundleRef.repoUrl;

  const explicitRef = optionalNonEmpty(bundle.ref);
  if (explicitRef !== undefined) {
    return { repoUrl, ref: explicitRef };
  }

  const explicitCommit = optionalNonEmpty(bundle.commitId);
  if (explicitCommit !== undefined) {
    return { repoUrl, commitId: explicitCommit };
  }

  if (repoUrl === defaultSource.repoUrl) {
    return defaultSource;
  }

  const { ref: inheritedRef, commitId: inheritedCommit } = defaultSource;
  if (inheritedRef) {
    return { repoUrl, ref: inheritedRef };
  }
  if (inheritedCommit) {
    return { repoUrl, commitId: inheritedCommit };
  }
  return { repoUrl, ref: "HEAD" };
}
/**
 * Fetches one git source with a depth-1 fetch into its own directory under
 * `checkoutRoot` and checks it out detached.
 *
 * The directory name is derived from a hash of the source identity. When the
 * source has a ref, `FETCH_HEAD` is checked out; otherwise the commit id is
 * fetched and checked out, and the resulting HEAD must equal that commit id.
 *
 * @param checkoutRoot Directory under which the checkout directory is created.
 * @param source Repository URL plus a ref or a commit id.
 * @returns The checked-out commit, its tree id and the checkout location.
 * @throws AgentRunError "schema-invalid" when the source has neither ref nor
 *   commit id, or when a value handed to git starts with "-";
 *   "infra-failed" when HEAD does not match the requested commit id.
 */
async function checkoutGitSource(checkoutRoot: string, source: GitBundleSource): Promise<GitCheckout> {
  // These values become positional git arguments. A leading "-" would make git
  // parse them as options (argument injection), so reject them up front.
  const cliValues: Array<[string, unknown]> = [
    ["repoUrl", source.repoUrl],
    ["ref", source.ref],
    ["commitId", source.commitId],
  ];
  for (const [field, value] of cliValues) {
    if (typeof value === "string" && value.startsWith("-")) {
      throw new AgentRunError("schema-invalid", `gitbundle source ${field} must not start with "-"`, { httpStatus: 400, details: { field, valuesPrinted: false } });
    }
  }

  const checkoutPath = path.join(checkoutRoot, stableHash(gitSourceIdentity(source)).slice(0, 16));
  await mkdir(checkoutPath, { recursive: true });
  await git(["init"], checkoutPath);
  // Removing a remote that does not exist fails; that failure is expected and ignored.
  await git(["remote", "remove", "origin"], checkoutPath, { allowFailure: true });
  await git(["remote", "add", "origin", source.repoUrl], checkoutPath);
  if (source.ref) {
    await git(["fetch", "--depth", "1", "origin", source.ref], checkoutPath);
    await git(["checkout", "--detach", "FETCH_HEAD"], checkoutPath);
  } else if (source.commitId) {
    await git(["fetch", "--depth", "1", "origin", source.commitId], checkoutPath);
    await git(["checkout", "--detach", source.commitId], checkoutPath);
  } else {
    throw new AgentRunError("schema-invalid", "gitbundle source must include repo ref or commit", { httpStatus: 400 });
  }
  const actualCommit = (await git(["rev-parse", "HEAD"], checkoutPath)).stdout.trim();
  if (source.commitId && actualCommit !== source.commitId) {
    throw new AgentRunError("infra-failed", "gitbundle checkout did not land on requested commit", { httpStatus: 500, details: { expectedCommit: source.commitId, actualCommit } });
  }
  const treeId = (await git(["rev-parse", "HEAD^{tree}"], checkoutPath)).stdout.trim();
  return {
    repoUrl: source.repoUrl,
    commitId: actualCommit,
    ...(source.commitId ? { requestedCommitId: source.commitId } : {}),
    ...(source.ref ? { requestedRef: source.ref } : {}),
    checkoutPath,
    treeId,
  };
}
/**
 * Builds the record that identifies a git source for hashing and caching:
 * repository URL, commit id and ref, with absent values normalized to `null`.
 */
function gitSourceIdentity(source: GitBundleSource): JsonRecord {
  const { repoUrl, commitId = null, ref = null } = source;
  return { repoUrl, commitId, ref };
}
/**
 * Copies every bundle entry from its checkout into the workspace, in order.
 *
 * For each entry the git source is resolved, the source path is confined to the
 * checkout and the target path to the workspace, any existing target is
 * removed, and the source is copied recursively without dereferencing symlinks.
 *
 * @returns One report item per bundle entry, in the same order.
 * @throws AgentRunError "schema-invalid" when a source path cannot be stat'ed
 *   (path escapes are rejected by the path resolvers).
 */
async function materializeGitBundles(workspacePath: string, resourceBundleRef: ResourceBundleRef, defaultSource: GitBundleSource, defaultCheckout: GitCheckout, checkoutFor: (source: GitBundleSource) => Promise<GitCheckout>): Promise<MaterializedGitBundle[]> {
  const materialized: MaterializedGitBundle[] = [];

  for (const [position, bundle] of resourceBundleRef.bundles.entries()) {
    const bundleSource = bundleGitSource(bundle, resourceBundleRef, defaultSource);
    // Identity comparison: the default source object means "reuse the default checkout".
    const checkout = bundleSource === defaultSource ? defaultCheckout : await checkoutFor(bundleSource);

    const fromPath = resolveBundlePath(checkout.checkoutPath, bundle.subpath, `bundles[${position}].subpath`);
    const toPath = resolveWorkspaceTargetPath(workspacePath, bundle.targetPath, `bundles[${position}].target_path`);

    const sourceInfo = await stat(fromPath).catch((error: unknown) => {
      throw new AgentRunError("schema-invalid", `gitbundle subpath ${bundle.subpath} is not readable`, { httpStatus: 400, details: { index: position, subpath: bundle.subpath, error: fileErrorSummary(error), valuesPrinted: false } });
    });

    await mkdir(path.dirname(toPath), { recursive: true });
    await rm(toPath, { recursive: true, force: true });
    await cp(fromPath, toPath, { recursive: true, force: true, dereference: false });

    const report: MaterializedGitBundle = {
      name: bundle.name ?? null,
      repoUrl: checkout.repoUrl,
      commitId: checkout.commitId,
      requestedCommitId: bundle.commitId ?? resourceBundleRef.commitId ?? null,
      requestedRef: checkout.requestedRef ?? null,
      subpath: bundle.subpath,
      targetPath: bundle.targetPath,
      sourceKind: sourceInfo.isDirectory() ? "directory" : "file",
      sourceBytes: sourceInfo.isFile() ? sourceInfo.size : null,
    };
    materialized.push(report);
  }

  return materialized;
}
/**
 * Returns the trimmed string when `value` is a string with non-whitespace
 * content, otherwise `undefined`.
 */
function optionalNonEmpty(value: unknown): string | undefined {
  if (typeof value !== "string") {
    return undefined;
  }
  const trimmed = value.trim();
  return trimmed.length === 0 ? undefined : trimmed;
}
/**
 * Makes the shebang scripts in the workspace `tools` directory executable and,
 * when `AGENTRUN_RESOURCE_BIN_PATH` is set, copies them into that directory.
 *
 * Only regular files whose first line starts with "#!" count as tools. The
 * reported `sha256` and `bytes` are computed from the file's raw bytes, so
 * they match the file on disk even when it contains non-UTF-8 data.
 *
 * @param workspacePath Workspace whose `tools` subdirectory is scanned.
 * @param env Environment providing the optional install directory.
 * @returns `binPath` (the install directory if configured, else the source
 *   directory) when at least one tool was found, plus a summary event. A
 *   missing `tools` directory yields an empty summary; other listing errors
 *   are rethrown.
 */
async function prepareGitBundleTools(workspacePath: string, env: NodeJS.ProcessEnv): Promise<{ binPath?: string; event: JsonRecord }> {
  const sourceBinPath = path.join(workspacePath, "tools");
  const installedBinPath = optionalNonEmpty(env.AGENTRUN_RESOURCE_BIN_PATH);
  const runtimeBinPath = installedBinPath ?? sourceBinPath;
  let entries;
  try {
    entries = await readdir(sourceBinPath, { withFileTypes: true });
  } catch (error) {
    // No tools directory at all simply means the bundle ships no tools.
    if (error && typeof error === "object" && "code" in error && (error as { code?: unknown }).code === "ENOENT") {
      return { event: { count: 0, names: [], binPath: null, sourceBinPath: null, installedBinPath: null, installed: false, valuesPrinted: false } };
    }
    throw error;
  }
  const names: string[] = [];
  const items: JsonRecord[] = [];
  if (installedBinPath) await mkdir(installedBinPath, { recursive: true });
  for (const entry of entries) {
    if (!entry.isFile()) continue;
    const filePath = path.join(sourceBinPath, entry.name);
    // Read raw bytes: decoding to text first would replace invalid UTF-8
    // sequences and make the digest and size differ from the file on disk.
    const contents = await readFile(filePath);
    const firstLine = contents.toString("utf8").split(/\r?\n/u, 1)[0] ?? "";
    if (!firstLine.startsWith("#!")) continue;
    await chmod(filePath, 0o755);
    if (installedBinPath) {
      const targetPath = path.join(installedBinPath, entry.name);
      // Skip the copy when the install directory is the source directory itself.
      if (targetPath !== filePath) {
        await cp(filePath, targetPath, { force: true, dereference: false });
        await chmod(targetPath, 0o755);
      }
    }
    names.push(entry.name);
    items.push({
      name: entry.name,
      sha256: createHash("sha256").update(contents).digest("hex"),
      bytes: contents.byteLength,
      shebang: firstLine.slice(0, 80),
      valuesPrinted: false,
    });
  }
  return {
    ...(names.length > 0 ? { binPath: runtimeBinPath } : {}),
    event: {
      count: names.length,
      names,
      items,
      binPath: names.length > 0 ? pathSummary(runtimeBinPath) : null,
      sourceBinPath: pathSummary(sourceBinPath),
      installedBinPath: installedBinPath ? pathSummary(installedBinPath) : null,
      installed: Boolean(installedBinPath && names.length > 0),
      valuesPrinted: false,
    },
  };
}
/**
 * Reads every prompt ref from the checkout and enforces the size limits.
 *
 * An unreadable file is an error only for refs whose `required` flag is
 * exactly `true`; otherwise it is recorded as "missing" and skipped. Each file
 * is limited to `maxPromptRefBytes` and the running total to
 * `maxInitialPromptBytes` (UTF-8 bytes).
 *
 * @returns The materialized prompts plus a summary event covering all refs.
 * @throws AgentRunError "prompt-unavailable" for an unreadable required ref,
 *   "prompt-too-large" when a limit is exceeded.
 */
async function materializePromptRefs(checkoutPath: string, refs: NonNullable<ResourceBundleRef["promptRefs"]>): Promise<{ items: MaterializedPromptRef[]; event: JsonRecord }> {
  const loaded: MaterializedPromptRef[] = [];
  const reportItems: JsonRecord[] = [];
  let totalBytes = 0;

  for (const ref of refs) {
    const absolutePath = resolveBundlePath(checkoutPath, ref.path, `promptRefs.${ref.name}.path`);
    const required = ref.required === true;

    let text: string | undefined;
    let readFailure: unknown;
    try {
      text = await readFile(absolutePath, "utf8");
    } catch (error) {
      readFailure = error;
    }

    if (text === undefined) {
      if (required) {
        throw new AgentRunError("prompt-unavailable", `required resource prompt ${ref.name} is not readable`, { httpStatus: 400, details: { name: ref.name, path: ref.path, error: fileErrorSummary(readFailure), valuesPrinted: false } });
      }
      reportItems.push({ name: ref.name, path: ref.path, inject: "thread-start", required, status: "missing", valuesPrinted: false });
      continue;
    }

    const bytes = Buffer.byteLength(text, "utf8");
    if (bytes > maxPromptRefBytes) {
      throw new AgentRunError("prompt-too-large", `resource prompt ${ref.name} exceeds the per-file size limit`, { httpStatus: 400, details: { name: ref.name, path: ref.path, bytes, maxPromptRefBytes, valuesPrinted: false } });
    }

    totalBytes += bytes;
    if (totalBytes > maxInitialPromptBytes) {
      throw new AgentRunError("prompt-too-large", "assembled resource prompt exceeds the total size limit", { httpStatus: 400, details: { totalBytes, maxInitialPromptBytes, valuesPrinted: false } });
    }

    const digest = sha256Text(text);
    loaded.push({ name: ref.name, path: ref.path, inject: "thread-start", required, text, bytes, sha256: digest });
    reportItems.push({ name: ref.name, path: ref.path, inject: "thread-start", required, status: "materialized", sha256: digest, bytes, valuesPrinted: false });
  }

  const event: JsonRecord = {
    count: refs.length,
    materializedCount: loaded.length,
    names: loaded.map((item) => item.name),
    items: reportItems,
    totalBytes,
    valuesPrinted: false,
  };
  return { items: loaded, event };
}
/**
 * Scans `<workspace>/.agents/skills` for skill directories and reads each
 * one's `SKILL.md` manifest.
 *
 * A missing skills directory yields an empty result without `skillsDir`; other
 * listing errors are rethrown. A directory whose manifest cannot be read is
 * reported as "missing" in the event but does not abort discovery.
 *
 * @returns Materialized skills, the skills directory and a summary event.
 */
async function discoverGitBundleSkills(workspacePath: string): Promise<{ items: MaterializedSkillRef[]; skillsDir?: string; event: JsonRecord }> {
  const skillsDir = path.join(workspacePath, ".agents", "skills");

  let entries;
  try {
    entries = await readdir(skillsDir, { withFileTypes: true });
  } catch (error) {
    const missingDir = Boolean(error) && typeof error === "object" && error !== null && "code" in error && (error as { code?: unknown }).code === "ENOENT";
    if (!missingDir) {
      throw error;
    }
    return { items: [], event: { count: 0, materializedCount: 0, names: [], skillsDir: null, items: [], valuesPrinted: false } };
  }

  const found: MaterializedSkillRef[] = [];
  const reportItems: JsonRecord[] = [];
  let directoryCount = 0;

  for (const entry of entries) {
    if (!entry.isDirectory()) {
      continue;
    }
    directoryCount += 1;

    const skillName = entry.name;
    const relativeManifest = `.agents/skills/${skillName}/SKILL.md`;
    const manifestPath = path.join(skillsDir, skillName, "SKILL.md");

    let manifestText: string;
    try {
      manifestText = await readFile(manifestPath, "utf8");
    } catch (error) {
      reportItems.push({ name: skillName, path: relativeManifest, required: true, aggregateAs: skillName, status: "missing", error: fileErrorSummary(error), valuesPrinted: false });
      continue;
    }

    const manifestBytes = Buffer.byteLength(manifestText, "utf8");
    const manifestSha256 = sha256Text(manifestText);
    const summary = skillSummary(manifestText);

    found.push({ name: skillName, path: relativeManifest, aggregateAs: skillName, required: true, registryPath: manifestPath, manifestBytes, manifestSha256, summary });
    reportItems.push({ name: skillName, path: relativeManifest, aggregateAs: skillName, required: true, status: "materialized", manifestSha256, manifestBytes, registryPath: pathSummary(manifestPath), summary, valuesPrinted: false });
  }

  const event: JsonRecord = {
    count: directoryCount,
    materializedCount: found.length,
    names: found.map((item) => item.name),
    skillsDir: pathSummary(skillsDir),
    items: reportItems,
    valuesPrinted: false,
  };
  return { items: found, skillsDir, event };
}
/**
 * Builds the initial prompt text from the materialized prompt refs and skills.
 *
 * The text is a fixed preamble, one section per prompt ref and, when skills
 * exist, one "Resource Skills" section listing them; sections are separated by
 * blank lines.
 *
 * @returns The text and its summary, or `undefined` when there is nothing to assemble.
 * @throws AgentRunError "prompt-too-large" when the text exceeds `maxInitialPromptBytes` UTF-8 bytes.
 */
function assembleInitialPrompt(promptRefs: MaterializedPromptRef[], skills: MaterializedSkillRef[]): InitialPromptAssembly | undefined {
  if (promptRefs.length + skills.length === 0) {
    return undefined;
  }

  const preamble = "AgentRun initial runtime instructions. These instructions are assembled from ResourceBundleRef promptRefs and gitbundle skill directories for the first thread-start turn only.";
  const promptSections = promptRefs.map((prompt) => [`## Resource Prompt: ${prompt.name}`, `path: ${prompt.path}`, prompt.text].join("\n"));

  const skillSections: string[] = [];
  if (skills.length > 0) {
    const skillLines = skills.map((skill) => `- ${skill.name}: ${skill.summary || "No summary provided."} manifest=.agents/skills/${skill.aggregateAs}/SKILL.md source=${skill.path} required=${skill.required}`);
    skillSections.push([
      "## Resource Skills",
      "The following required runtime skills are mounted in the current workspace. Use these bundle skills instead of default model skill guesses.",
      ...skillLines,
    ].join("\n"));
  }

  const text = [preamble, ...promptSections, ...skillSections].join("\n\n");
  const bytes = Buffer.byteLength(text, "utf8");
  if (bytes > maxInitialPromptBytes) {
    throw new AgentRunError("prompt-too-large", "assembled initial prompt exceeds the total size limit", { httpStatus: 400, details: { bytes, maxInitialPromptBytes, promptRefCount: promptRefs.length, skillCount: skills.length, valuesPrinted: false } });
  }

  const summary = {
    available: true,
    bytes,
    sha256: sha256Text(text),
    promptRefCount: promptRefs.length,
    promptRefNames: promptRefs.map((item) => item.name),
    skillCount: skills.length,
    skillNames: skills.map((item) => item.name),
    valuesPrinted: false,
  };
  return { text, summary };
}
/**
 * Extracts a short summary from a skill manifest (`SKILL.md`).
 *
 * Preference order:
 * 1. The `description:` entry of a leading `---` front-matter block. A single
 *    pair of surrounding quotes is stripped. When the value is empty or a YAML
 *    block-scalar indicator (`>`, `|`, `>-`, ...), the indented lines that
 *    follow are joined and used instead.
 * 2. The first non-empty line of the document body that is not a heading and
 *    not a `---` delimiter. The body is the text after the front matter, so
 *    front-matter keys such as `name: ...` are never mistaken for the summary.
 *
 * @param text Manifest content.
 * @returns The whitespace-normalized, length-limited summary; empty when nothing suitable exists.
 */
function skillSummary(text: string): string {
  const frontmatter = /^---\s*\n([\s\S]*?)\n---\s*/u.exec(text);
  let body = text;
  if (frontmatter) {
    // The regex is anchored at the start, so the match length is the body offset.
    body = text.slice(frontmatter[0].length);
    const metaLines = frontmatter[1]?.split(/\r?\n/u) ?? [];
    const descriptionIndex = metaLines.findIndex((line) => /^description\s*:/iu.test(line));
    if (descriptionIndex >= 0) {
      let description = (metaLines[descriptionIndex] ?? "").replace(/^description\s*:\s*/iu, "").trim();
      if (description === "" || /^[>|][-+\d]*$/u.test(description)) {
        // Block scalar or empty value: the text lives on the indented lines below.
        const continuation: string[] = [];
        for (const line of metaLines.slice(descriptionIndex + 1)) {
          if (line.trim().length > 0 && !/^\s/u.test(line)) break; // next top-level key
          continuation.push(line.trim());
        }
        description = continuation.join(" ");
      } else {
        description = description.replace(/^['"]|['"]$/gu, "");
      }
      const summary = trimSummary(description);
      if (summary.length > 0) return summary;
    }
  }
  const line = body.split(/\r?\n/u).map((entry) => entry.trim()).find((entry) => entry.length > 0 && !entry.startsWith("#") && entry !== "---");
  return trimSummary(line ?? "");
}
/**
 * Collapses all whitespace runs to single spaces, trims the ends and limits
 * the result to `skillSummaryChars` UTF-16 code units, appending "..." when
 * text was cut off.
 *
 * If the cut would fall between the two halves of a surrogate pair, the
 * dangling high surrogate is dropped so the result never ends in half a
 * character.
 */
function trimSummary(value: string): string {
  const normalized = value.replace(/\s+/gu, " ").trim();
  if (normalized.length <= skillSummaryChars) return normalized;
  let head = normalized.slice(0, skillSummaryChars);
  const lastUnit = head.charCodeAt(head.length - 1);
  // 0xD800-0xDBFF is the high-surrogate range; one at the end has lost its partner.
  if (lastUnit >= 0xd800 && lastUnit <= 0xdbff) head = head.slice(0, -1);
  return `${head}...`;
}
/** Returns the lowercase hex SHA-256 digest of `text` encoded as UTF-8. */
function sha256Text(text: string): string {
  const hasher = createHash("sha256");
  hasher.update(text, "utf8");
  return hasher.digest("hex");
}
/**
 * Reduces an arbitrary thrown value to a small record: its string `code` (else
 * `null`) and its string `message`, redacted and cut to 300 characters (else
 * `null`). Non-object values produce `null` for both fields.
 */
function fileErrorSummary(error: unknown): JsonRecord {
  let code: string | null = null;
  let message: string | null = null;
  if (typeof error === "object" && error !== null) {
    const candidate = error as { code?: unknown; message?: unknown };
    if (typeof candidate.code === "string") {
      code = candidate.code;
    }
    if (typeof candidate.message === "string") {
      message = redactText(candidate.message).slice(0, 300);
    }
  }
  return { code, message };
}
/**
 * Runs `git` with the given arguments in `cwd` and collects its output.
 *
 * @param args Arguments passed to the git executable.
 * @param cwd Working directory for the process.
 * @param options `allowFailure` suppresses the error for a non-zero exit.
 * @returns Captured stdout and stderr, decoded as UTF-8.
 * @throws AgentRunError "infra-failed" (503) when the process emits an error
 *   event, or (502) when it exits non-zero and failure is not allowed; the
 *   502 details carry redacted tails of stderr and stdout.
 */
async function git(args: string[], cwd: string, options: { allowFailure?: boolean } = {}): Promise<{ stdout: string; stderr: string }> {
  const proc = spawn("git", args, { cwd, stdio: ["ignore", "pipe", "pipe"] });

  const outChunks: string[] = [];
  const errChunks: string[] = [];
  proc.stdout.setEncoding("utf8");
  proc.stderr.setEncoding("utf8");
  proc.stdout.on("data", (chunk) => {
    outChunks.push(String(chunk));
  });
  proc.stderr.on("data", (chunk) => {
    errChunks.push(String(chunk));
  });

  let exit: { code: number | null; signal: NodeJS.Signals | null };
  try {
    exit = await new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve, reject) => {
      proc.on("error", reject);
      proc.on("close", (code, signal) => resolve({ code, signal }));
    });
  } catch (error: unknown) {
    const reason = error instanceof Error ? error.message : String(error);
    throw new AgentRunError("infra-failed", `failed to start git: ${reason}`, { httpStatus: 503 });
  }

  const stdout = outChunks.join("");
  const stderr = errChunks.join("");
  const failed = exit.code !== 0;
  if (failed && !options.allowFailure) {
    throw new AgentRunError("infra-failed", `git ${args[0] ?? "command"} failed with code ${exit.code}`, { httpStatus: 502, details: { stderr: redactText(stderr.slice(-4000)), stdout: redactText(stdout.slice(-1000)), signal: exit.signal } });
  }
  return { stdout, stderr };
}
/**
 * Resolves `relativePath` against the checkout directory and ensures the
 * result is the checkout root itself or lies beneath it.
 *
 * @param fieldName Field label used in the error message.
 * @throws AgentRunError "schema-invalid" when the resolved path leaves the checkout.
 */
function resolveBundlePath(checkoutPath: string, relativePath: string, fieldName: string): string {
  const root = path.resolve(checkoutPath);
  const candidate = path.resolve(checkoutPath, relativePath);
  const insideRoot = candidate === root || candidate.startsWith(`${root}${path.sep}`);
  if (!insideRoot) {
    throw new AgentRunError("schema-invalid", `${fieldName} escaped checkout`, { httpStatus: 400 });
  }
  return candidate;
}
/**
 * Resolves `relativePath` against the workspace directory and ensures the
 * result lies strictly beneath it; the workspace root itself is rejected.
 *
 * @param fieldName Field label used in the error message.
 * @throws AgentRunError "schema-invalid" when the resolved path is the workspace root or outside it.
 */
function resolveWorkspaceTargetPath(workspacePath: string, relativePath: string, fieldName: string): string {
  const root = path.resolve(workspacePath);
  const candidate = path.resolve(workspacePath, relativePath);
  const strictlyInside = candidate !== root && candidate.startsWith(`${root}${path.sep}`);
  if (!strictlyInside) {
    throw new AgentRunError("schema-invalid", `${fieldName} escaped workspace`, { httpStatus: 400 });
  }
  return candidate;
}
/**
 * Describes a path without printing it: whether it is absolute, its last
 * segment, its number of segments and a 16-character hash fingerprint of the
 * full value. Segments are split on runs of "/" or "\".
 */
function pathSummary(value: string): JsonRecord {
  const segments = value.split(/[\\/]+/u).filter((segment) => segment.length > 0);
  const basename = segments[segments.length - 1] ?? null;
  const fingerprint = stableHash(value).slice(0, 16);
  return {
    absolute: path.isAbsolute(value),
    basename,
    depth: segments.length,
    fingerprint,
    valuePrinted: false,
  };
}