Files
pikasTech-agentrun/tools/tran
T
2026-06-10 17:46:45 +08:00

327 lines
14 KiB
TypeScript
Executable File

#!/usr/bin/env bun
const defaultFrontendPort = 18081;
const sshInputChunkBytes = 32 * 1024;
function jsonHelp() {
return {
ok: true,
tool: "tran",
purpose: "AgentRun runner UniDesk SSH passthrough over frontend /ws/ssh",
requiredEnv: ["UNIDESK_SSH_CLIENT_TOKEN", "UNIDESK_MAIN_SERVER_IP or UNIDESK_FRONTEND_URL"],
supported: [
"tran <provider> <command...>",
"tran <provider> argv <command...>",
"tran <provider> script -- '<shell script>'",
"tran <provider>:/absolute/workspace script -- '<shell script>'",
"tran <provider>:/absolute/workspace apply-patch < patch.diff",
"tran <provider>:k3s kubectl <args...>",
"tran <provider>:k3s script -- '<shell script>'",
"tran <provider>:k3s:<namespace>:<workload>[:container] argv <command...>",
"tran <provider>:k3s:<namespace>:<workload>[:container] script -- '<shell script>'",
"tran <provider>:k3s:<namespace>:<workload>[:container] apply-patch < patch.diff",
],
unsupported: ["upload", "download", "Windows win/ps/cmd routes"],
valuesPrinted: false,
};
}
function writeJson(value, stream = process.stdout) {
stream.write(`${JSON.stringify(value)}\n`);
}
function fail(failureKind, message, details = {}, exitCode = 2) {
writeJson({ ok: false, failureKind, message, ...details, valuesPrinted: false }, process.stderr);
process.exit(exitCode);
}
function shellQuote(value) {
return `'${String(value).replace(/'/g, `'"'"'`)}'`;
}
function shellArgv(args) {
if (args.length === 0) return "";
return args.map(shellQuote).join(" ");
}
function baseUrlFromEnv(env) {
const explicit = (env.UNIDESK_FRONTEND_URL || env.UNIDESK_MAIN_SERVER_URL || "").trim();
if (explicit) return explicit.replace(/\/+$/g, "");
const host = (env.UNIDESK_MAIN_SERVER_IP || env.UNIDESK_MAIN_SERVER_HOST || env.CODE_QUEUE_DEV_CONTAINER_MASTER_HOST || "").trim();
if (!host) return null;
if (host.startsWith("http://") || host.startsWith("https://")) return host.replace(/\/+$/g, "");
if (/:[0-9]+$/u.test(host)) return `http://${host}`;
const port = Number(env.UNIDESK_FRONTEND_PORT || env.UNIDESK_MAIN_SERVER_PORT || defaultFrontendPort);
return `http://${host}:${Number.isInteger(port) && port > 0 ? port : defaultFrontendPort}`;
}
function websocketUrl(baseUrl) {
const url = new URL("/ws/ssh", baseUrl);
url.protocol = url.protocol === "https:" ? "wss:" : "ws:";
return url.toString();
}
function parseRoute(raw) {
const parts = String(raw).split(":");
const providerId = parts.shift() || "";
if (!providerId) fail("schema-invalid", "tran route requires a provider id", { route: raw });
if (parts.length === 0) return { providerId, plane: "host", workspace: null, namespace: null, resource: null, container: null, raw };
if (parts[0] === "win") fail("unsupported-operation", "AgentRun tran does not support Windows routes yet", { route: raw });
if (parts[0] === "k3s") {
return {
providerId,
plane: "k3s",
workspace: null,
namespace: parts[1] || null,
resource: parts[2] || null,
container: parts[3] || null,
raw,
};
}
const workspace = parts.join(":");
if (!workspace.startsWith("/")) fail("schema-invalid", "host workspace routes must be absolute paths", { route: raw });
return { providerId, plane: "host", workspace, namespace: null, resource: null, container: null, raw };
}
async function readStdinText() {
const chunks = [];
for await (const chunk of Bun.stdin.stream()) chunks.push(Buffer.from(chunk));
return Buffer.concat(chunks).toString("utf8");
}
async function readApplyPatchHelperSource() {
const helperPath = new URL("./apply_patch", import.meta.url);
try {
const text = await Bun.file(helperPath).text();
if (!text.startsWith("#!")) fail("infra-failed", "adjacent apply_patch helper is not executable script shaped", { helper: "tools/apply_patch" });
return text;
} catch (error) {
fail("infra-failed", "adjacent apply_patch helper is required for runner-side tran apply-patch", { helper: "tools/apply_patch", error: error instanceof Error ? error.message : String(error) });
}
}
function applyPatchToolArgs(args) {
const supported = new Set(["--allow-loose", "--help", "-h"]);
for (const arg of args) {
if (!supported.has(arg)) fail("schema-invalid", `unsupported apply-patch option: ${arg}`, { supported: [...supported].sort() });
}
return args;
}
function base64Text(value) {
return Buffer.from(value, "utf8").toString("base64").replace(/.{1,76}/g, "$&\n").trimEnd();
}
async function applyPatchRemoteScript(args) {
const toolArgs = applyPatchToolArgs(args);
const [helperSource, patchText] = await Promise.all([readApplyPatchHelperSource(), readStdinText()]);
if (patchText.trim().length === 0) fail("schema-invalid", "apply-patch requires patch text on stdin");
const helperMarker = "__AGENTRUN_TRAN_APPLY_PATCH_HELPER__";
const patchMarker = "__AGENTRUN_TRAN_APPLY_PATCH_BODY__";
const toolArgText = toolArgs.length > 0 ? ` ${shellArgv(toolArgs)}` : "";
return [
"set -eu",
"helper=$(mktemp \"${TMPDIR:-/tmp}/agentrun-apply-patch.XXXXXX\") || exit 1",
"patch_file=$(mktemp \"${TMPDIR:-/tmp}/agentrun-apply-patch-body.XXXXXX\") || exit 1",
"cleanup() { rm -f \"$helper\" \"$patch_file\"; }",
"trap cleanup EXIT HUP INT TERM",
"decode_b64() { base64 -d; }",
`decode_b64 > \"$helper\" <<'${helperMarker}'`,
base64Text(helperSource),
helperMarker,
`decode_b64 > \"$patch_file\" <<'${patchMarker}'`,
base64Text(patchText),
patchMarker,
"chmod 700 \"$helper\"",
`\"$helper\"${toolArgText} < \"$patch_file\"`,
].join("\n");
}
async function scriptCommand(args) {
if (args[0] === "--") {
const rest = args.slice(1);
if (rest.length === 0) return await readStdinText();
if (rest.length === 1) return rest[0];
return shellArgv(rest);
}
if (args.length === 0) return await readStdinText();
return shellArgv(args);
}
async function hostCommand(route, args) {
if (args.length === 0) return { command: null, cwd: route.workspace, tty: true };
const op = args[0];
if (op === "apply-patch") return { command: await applyPatchRemoteScript(args.slice(1)), cwd: route.workspace, tty: false };
if (op === "upload" || op === "download") {
fail("unsupported-operation", `AgentRun tran does not support ${op}; use host/source controlled tools outside the runner for that operation`, { operation: op });
}
if (op === "script" || op === "shell") return { command: await scriptCommand(args.slice(1)), cwd: route.workspace, tty: false };
if (op === "argv") return { command: shellArgv(args.slice(1)), cwd: route.workspace, tty: false };
return { command: shellArgv(args), cwd: route.workspace, tty: false };
}
function k3sExecPrefix(route) {
const base = ["env", "KUBECONFIG=/etc/rancher/k3s/k3s.yaml", "kubectl", "exec"];
if (route.namespace) base.push("-n", route.namespace);
if (!route.resource) fail("schema-invalid", "k3s workload routes require namespace and resource", { route: route.raw });
base.push(route.resource);
if (route.container) base.push("-c", route.container);
base.push("--");
return base;
}
async function k3sCommand(route, args) {
const op = args[0] || "kubectl";
if (op === "upload" || op === "download") {
fail("unsupported-operation", `AgentRun tran does not support ${op}; use host/source controlled tools outside the runner for that operation`, { operation: op });
}
if (!route.resource) {
if (op === "apply-patch") fail("schema-invalid", "k3s apply-patch requires namespace and workload route", { route: route.raw });
if (op === "kubectl") return { command: shellArgv(["env", "KUBECONFIG=/etc/rancher/k3s/k3s.yaml", "kubectl", ...args.slice(1)]), cwd: null, tty: false };
if (op === "script" || op === "shell") {
const script = await scriptCommand(args.slice(1));
return { command: `export KUBECONFIG=/etc/rancher/k3s/k3s.yaml; ${script}`, cwd: null, tty: false };
}
if (op === "argv") return { command: shellArgv(["env", "KUBECONFIG=/etc/rancher/k3s/k3s.yaml", ...args.slice(1)]), cwd: null, tty: false };
return { command: shellArgv(["env", "KUBECONFIG=/etc/rancher/k3s/k3s.yaml", ...args]), cwd: null, tty: false };
}
if (op === "apply-patch") {
const script = await applyPatchRemoteScript(args.slice(1));
return { command: shellArgv([...k3sExecPrefix(route), "sh", "-lc", script]), cwd: null, tty: false };
}
if (op === "script" || op === "shell") {
const script = await scriptCommand(args.slice(1));
return { command: shellArgv([...k3sExecPrefix(route), "sh", "-lc", script]), cwd: null, tty: false };
}
if (op === "argv") return { command: shellArgv([...k3sExecPrefix(route), ...args.slice(1)]), cwd: null, tty: false };
return { command: shellArgv([...k3sExecPrefix(route), ...args]), cwd: null, tty: false };
}
async function buildOpenPayload(argv) {
if (argv.length === 0) fail("schema-invalid", "tran requires a route", { help: jsonHelp() });
const route = parseRoute(argv[0]);
const parsed = route.plane === "k3s" ? await k3sCommand(route, argv.slice(1)) : await hostCommand(route, argv.slice(1));
return {
providerId: route.providerId,
command: parsed.command || undefined,
cwd: parsed.cwd || undefined,
tty: parsed.tty === true,
cols: Number(process.stdout.columns) > 0 ? Number(process.stdout.columns) : 100,
rows: Number(process.stdout.rows) > 0 ? Number(process.stdout.rows) : 30,
openTimeoutMs: Math.max(15000, Math.min(Number(process.env.UNIDESK_SSH_OPEN_TIMEOUT_MS || 60000), 60000)),
runtimeTimeoutMs: Math.max(1000, Math.min(Number(process.env.UNIDESK_SSH_RUNTIME_TIMEOUT_MS || process.env.UNIDESK_TRAN_RUNTIME_TIMEOUT_MS || 60000), 60000)),
stdinEotOnEnd: true,
route: route.raw,
};
}
async function main() {
const argv = process.argv.slice(2);
if (argv[0] === "--help" || argv[0] === "help" || argv[0] === "-h") {
writeJson(jsonHelp());
return;
}
const token = (process.env.UNIDESK_SSH_CLIENT_TOKEN || "").trim();
if (!token) fail("secret-unavailable", "UNIDESK_SSH_CLIENT_TOKEN is required for runner-side tran");
const baseUrl = baseUrlFromEnv(process.env);
if (!baseUrl) fail("schema-invalid", "UNIDESK_MAIN_SERVER_IP, UNIDESK_MAIN_SERVER_HOST, or UNIDESK_FRONTEND_URL is required for runner-side tran");
const open = await buildOpenPayload(argv);
await runWebSocket(open, websocketUrl(baseUrl), token);
}
async function runWebSocket(open, url, token) {
const ws = new WebSocket(url, { headers: { authorization: `Bearer ${token}` } });
let exitCode = 255;
let canSend = false;
let sessionReady = false;
let settled = false;
const pending = [];
const pendingInput = [];
const send = (value) => {
const text = JSON.stringify(value);
if (!canSend || ws.readyState !== WebSocket.OPEN) pending.push(text);
else ws.send(text);
};
const sendInput = (value) => {
const text = JSON.stringify(value);
if (!sessionReady || ws.readyState !== WebSocket.OPEN) pendingInput.push(text);
else ws.send(text);
};
const flush = () => {
while (pending.length > 0 && ws.readyState === WebSocket.OPEN) ws.send(pending.shift());
};
const flushInput = () => {
if (!sessionReady || ws.readyState !== WebSocket.OPEN) return;
while (pendingInput.length > 0) ws.send(pendingInput.shift());
};
const finish = (code) => {
if (settled) return;
settled = true;
clearTimeout(openTimer);
clearTimeout(runtimeTimer);
process.exit(code);
};
const openTimer = setTimeout(() => {
if (sessionReady || settled) return;
process.stderr.write("unidesk runner tran timed out waiting for provider session\n");
exitCode = 255;
try { ws.close(); } catch {}
}, open.openTimeoutMs);
const runtimeTimer = setTimeout(() => {
if (settled) return;
process.stderr.write(`UNIDESK_TRAN_TIMEOUT_HINT ${JSON.stringify({ code: "tran-runtime-timeout", level: "warning", route: open.route, timeoutMs: open.runtimeTimeoutMs, message: "tran exceeded the runtime limit; use short query plus poll semantics" })}\n`);
exitCode = 124;
try { ws.close(); } catch {}
finish(124);
}, open.runtimeTimeoutMs);
ws.addEventListener("open", () => {
canSend = true;
send({ type: "ssh.open", providerId: open.providerId, command: open.command, cwd: open.cwd, tty: open.tty, cols: open.cols, rows: open.rows });
flush();
});
ws.addEventListener("message", (event) => {
let message;
const text = typeof event.data === "string" ? event.data : Buffer.from(event.data).toString("utf8");
try {
message = JSON.parse(text);
} catch {
process.stderr.write(`${text}\n`);
return;
}
if (message.type === "ssh.dispatched") return;
if (message.type === "ssh.opened") {
sessionReady = true;
clearTimeout(openTimer);
sendInput({ type: "ssh.input", data: Buffer.from([4]).toString("base64"), encoding: "base64" });
sendInput({ type: "ssh.eof" });
flushInput();
return;
}
if (message.type === "ssh.data") {
const chunk = Buffer.from(String(message.data || ""), message.encoding === "base64" ? "base64" : "utf8");
if (message.stream === "stderr") process.stderr.write(chunk);
else process.stdout.write(chunk);
return;
}
if (message.type === "ssh.error") {
process.stderr.write(`${String(message.message || "ssh bridge error")}\n`);
exitCode = 255;
try { ws.close(); } catch {}
return;
}
if (message.type === "ssh.exit") {
exitCode = Number.isInteger(message.exitCode) ? Number(message.exitCode) : 255;
try { ws.close(); } catch {}
}
});
ws.addEventListener("close", () => finish(exitCode));
ws.addEventListener("error", () => {
process.stderr.write("unidesk runner tran websocket error\n");
finish(255);
});
}
await main();