Files
pikasTech-unidesk/scripts/src/remote.ts
T

591 lines
24 KiB
TypeScript

import { spawn } from "node:child_process";
import { type UniDeskConfig } from "./config";
import { type DebugDispatchCommand, isDebugDispatchCommand } from "./debug";
import { summarizeMicroserviceProxyResponse } from "./microservices";
import { isSshSkillDiscoveryArgs, parseSshArgs } from "./ssh";
import { codexJudgeQueryAsync, codexOutputQueryAsync, codexTaskQueryAsync } from "./code-queue";
/**
 * Connection settings plus the leftover CLI arguments for running a command
 * against a remote main server. Built by extractRemoteCliOptions and consumed
 * by runRemoteCli.
 */
export interface RemoteCliOptions {
/** Main-server host; null when no host option was given. frontendBaseUrl also accepts a full http(s) URL or host:port here. */
host: string | null;
/** SSH login user; extractRemoteCliOptions defaults it to "root". */
user: string;
/** SSH TCP port; extractRemoteCliOptions defaults it to 22. */
port: number;
/** Directory the SSH transport changes into before running the CLI; defaults to "/root/unidesk". */
projectRoot: string;
/** Path handed to `ssh -i`; null when no key option was given. */
identityFile: string | null;
/** Transport selection; runRemoteCli resolves "auto" to ssh when identityFile is set and to frontend otherwise. */
transport: "auto" | "frontend" | "ssh";
/** Arguments that were not recognised as remote-connection options. */
args: string[];
}
/** Authenticated frontend session returned by loginFrontend. */
interface FrontendSession {
/** Base URL computed by frontendBaseUrl; request paths are appended to it directly. */
baseUrl: string;
/** First `name=value` segment of the login response's set-cookie header, sent back as the cookie header. */
cookie: string;
}
/** Outcome of an HTTP request made through readJson; readJson reports failures here instead of throwing. */
interface FetchJsonResult {
/** The response's `ok` flag, or false when the request itself failed. */
ok: boolean;
/** HTTP status code; absent when no response was received. */
status?: number;
/** Parsed JSON body, null for an empty body, or `{ text }` when the body was not valid JSON. */
body?: unknown;
/** Message of the error thrown by fetch (including timeout aborts); absent on a received response. */
error?: string;
}
// Accepted spellings of each remote-connection flag. extractRemoteCliOptions
// consumes any of these (together with the value that follows) from argv.
const hostOptions = new Set<string>([
  "--main-server-ip",
  "--main-server",
  "--server",
]);
const userOptions = new Set<string>([
  "--main-server-user",
  "--server-user",
]);
const portOptions = new Set<string>([
  "--main-server-port",
  "--server-port",
]);
const rootOptions = new Set<string>([
  "--main-server-root",
  "--server-root",
]);
const keyOptions = new Set<string>([
  "--main-server-key",
  "--server-key",
]);
const transportOptions = new Set<string>([
  "--main-server-transport",
  "--server-transport",
]);
/**
 * Converts a command-line value into a TCP port number.
 *
 * @param raw Text supplied after the option.
 * @param option Option name, used only in the error message.
 * @returns The port as an integer between 1 and 65535.
 * @throws Error when the value is not an integer in that range.
 */
function positivePort(raw: string, option: string): number {
  const port = Number(raw);
  const inRange = Number.isInteger(port) && port > 0 && port <= 65535;
  if (inRange) return port;
  throw new Error(`${option} must be a TCP port from 1 to 65535`);
}
/**
 * Returns the argument that follows position `index` in `argv`.
 *
 * @param argv Full argument vector.
 * @param index Position of the option whose value is wanted.
 * @param option Option name, used only in the error message.
 * @throws Error when there is no following argument or it is the empty string.
 */
function requiredValue(argv: string[], index: number, option: string): string {
  const following = argv[index + 1];
  if (following !== undefined && following.length !== 0) return following;
  throw new Error(`${option} requires a non-empty value`);
}
/**
 * Validates a transport name given on the command line.
 *
 * @param raw Text supplied after the option.
 * @param option Option name, used only in the error message.
 * @returns `raw`, narrowed to the supported transport names.
 * @throws Error when `raw` is not "auto", "frontend" or "ssh".
 */
function transportValue(raw: string, option: string): RemoteCliOptions["transport"] {
  switch (raw) {
    case "auto":
    case "frontend":
    case "ssh":
      return raw;
    default:
      throw new Error(`${option} must be one of: auto, frontend, ssh`);
  }
}
/**
 * Separates remote-connection options from the rest of the command line.
 *
 * Recognised options (any spelling in the option sets above) are removed
 * together with their value; every other argument is collected, in order, into
 * `args`. A literal `--` stops option processing: it is dropped and everything
 * after it is passed through untouched.
 *
 * @param argv Raw command-line arguments.
 * @returns Parsed options, with defaults for anything not supplied.
 * @throws Error when an option lacks a value, a port is out of range, or the
 *   transport name is unknown.
 */
export function extractRemoteCliOptions(argv: string[]): RemoteCliOptions {
  const passthrough: string[] = [];
  const parsed: RemoteCliOptions = {
    host: null,
    user: "root",
    port: 22,
    projectRoot: "/root/unidesk",
    identityFile: null,
    transport: "auto",
    args: passthrough,
  };
  let cursor = 0;
  while (cursor < argv.length) {
    const token = argv[cursor] ?? "";
    if (token === "--") {
      passthrough.push(...argv.slice(cursor + 1));
      break;
    }
    // Options take one value, so they advance the cursor by two; plain arguments by one.
    let step = 2;
    if (hostOptions.has(token)) {
      parsed.host = requiredValue(argv, cursor, token);
    } else if (userOptions.has(token)) {
      parsed.user = requiredValue(argv, cursor, token);
    } else if (portOptions.has(token)) {
      parsed.port = positivePort(requiredValue(argv, cursor, token), token);
    } else if (rootOptions.has(token)) {
      parsed.projectRoot = requiredValue(argv, cursor, token);
    } else if (keyOptions.has(token)) {
      parsed.identityFile = requiredValue(argv, cursor, token);
    } else if (transportOptions.has(token)) {
      parsed.transport = transportValue(requiredValue(argv, cursor, token), token);
    } else {
      passthrough.push(token);
      step = 1;
    }
    cursor += step;
  }
  return parsed;
}
/**
 * Wraps a string in single quotes for a POSIX shell, rewriting each embedded
 * single quote as `'\''` (close quote, escaped quote, reopen quote).
 */
function shellQuote(value: string): string {
  const escaped = value.split("'").join("'\\''");
  return "'" + escaped + "'";
}
/**
 * Runs the CLI on the remote host by spawning the local `ssh` binary with
 * inherited stdio.
 *
 * The remote command changes into `projectRoot` and execs
 * `bun scripts/cli.ts` with the forwarded arguments (`help` when none were
 * given); every dynamic part is shell-quoted.
 *
 * @returns The ssh exit code, 255 when the process closed without one, or 127
 *   when ssh could not be started.
 * @throws Error when `options.host` is null.
 */
async function runRemoteCliOverSsh(options: RemoteCliOptions): Promise<number> {
  const { host, user, port, projectRoot, identityFile, args } = options;
  if (host === null) throw new Error("runRemoteCli requires --main-server-ip or --server");

  const forwarded = args.length === 0 ? ["help"] : args;
  const quotedArgs = forwarded.map(shellQuote).join(" ");
  const remoteCommand = `cd ${shellQuote(projectRoot)} && exec bun scripts/cli.ts ${quotedArgs}`;

  const sshSettings = [
    "BatchMode=yes",
    "StrictHostKeyChecking=accept-new",
    "ServerAliveInterval=20",
    "ServerAliveCountMax=3",
  ];
  const sshArgs: string[] = ["-p", String(port)];
  for (const setting of sshSettings) sshArgs.push("-o", setting);
  if (identityFile !== null) sshArgs.push("-i", identityFile);
  sshArgs.push(`${user}@${host}`, remoteCommand);

  const child = spawn("ssh", sshArgs, { stdio: ["inherit", "inherit", "inherit"] });
  return await new Promise<number>((resolve) => {
    child.on("error", (failure) => {
      process.stderr.write(`unidesk remote cli failed to start ssh: ${failure.message}\n`);
      resolve(127);
    });
    child.on("close", (exitCode) => {
      resolve(exitCode ?? 255);
    });
  });
}
/**
 * Prints a result envelope `{ ok, command, data }` to stdout as 2-space
 * indented JSON followed by a newline.
 */
function emitRemoteJson(command: string, data: unknown, ok = true): void {
  const envelope = { ok, command, data };
  const rendered = JSON.stringify(envelope, null, 2);
  process.stdout.write(rendered + "\n");
}
/**
 * Prints a failure envelope `{ ok: false, command, error }` to stdout as
 * 2-space indented JSON followed by a newline. Error instances are reported
 * with name, message and stack (null when absent); anything else is
 * stringified into `message`.
 */
function emitRemoteError(command: string, error: unknown): void {
  let details: Record<string, unknown>;
  if (error instanceof Error) {
    details = { name: error.name, message: error.message, stack: error.stack ?? null };
  } else {
    details = { message: String(error) };
  }
  const rendered = JSON.stringify({ ok: false, command, error: details }, null, 2);
  process.stdout.write(`${rendered}\n`);
}
/** Joins the arguments with spaces to label output; yields "help" when the result is empty. */
function commandName(args: string[]): string {
  const joined = args.join(" ");
  return joined.length > 0 ? joined : "help";
}
/**
 * Derives the frontend base URL from the user-supplied host.
 *
 * - A value starting with `http://` or `https://` is used as given, minus
 *   trailing slashes.
 * - A value ending in `:<digits>` is treated as host:port and prefixed with
 *   `http://`.
 * - Anything else gets `http://` plus the configured frontend port.
 */
function frontendBaseUrl(host: string, config: UniDeskConfig): string {
  const hasScheme = /^https?:\/\//.test(host);
  if (hasScheme) return host.replace(/\/+$/, "");
  const carriesPort = /:\d+$/.test(host);
  const authority = carriesPort ? host : `${host}:${config.network.frontend.port}`;
  return `http://${authority}`;
}
/**
 * Performs an HTTP request and reports the outcome without throwing.
 *
 * The request is aborted after `timeoutMs`. The body is read as text and
 * parsed as JSON; an empty body becomes null and unparseable text is returned
 * as `{ text }`. Any error thrown by fetch, including the abort, is returned
 * as `{ ok: false, error }`.
 *
 * @param url Absolute URL to request.
 * @param init Optional fetch options; its `signal` is replaced by the timeout signal.
 * @param timeoutMs Milliseconds before the request is aborted.
 */
async function readJson(url: string, init?: RequestInit, timeoutMs = 8000): Promise<FetchJsonResult> {
  const abortSource = new AbortController();
  const deadline = setTimeout(() => {
    abortSource.abort();
  }, timeoutMs);
  try {
    const response = await fetch(url, { ...init, signal: abortSource.signal });
    const raw = await response.text();
    let body: unknown;
    if (raw.length === 0) {
      body = null;
    } else {
      try {
        body = JSON.parse(raw);
      } catch {
        // Non-JSON responses are still worth showing to the user.
        body = { text: raw };
      }
    }
    return { ok: response.ok, status: response.status, body };
  } catch (failure) {
    const message = failure instanceof Error ? failure.message : String(failure);
    return { ok: false, error: message };
  } finally {
    clearTimeout(deadline);
  }
}
/**
 * Logs in to the frontend with the configured credentials and returns the
 * session needed for authenticated requests.
 *
 * The request is aborted after `timeoutMs`, so an unresponsive server fails
 * the command instead of hanging it indefinitely.
 *
 * @param host Host, host:port or http(s) URL of the main server.
 * @param config Supplies the login credentials and the default frontend port.
 * @param timeoutMs Milliseconds to wait for the login response, body included.
 * @returns Base URL plus the first `name=value` segment of the set-cookie header.
 * @throws Error when the login times out, the response status is not ok, or
 *   no session cookie is returned; other fetch errors are rethrown unchanged.
 */
async function loginFrontend(host: string, config: UniDeskConfig, timeoutMs = 15_000): Promise<FrontendSession> {
  const baseUrl = frontendBaseUrl(host, config);
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  let res: Response;
  let body: string;
  try {
    res = await fetch(`${baseUrl}/login`, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify({ username: config.auth.username, password: config.auth.password }),
      signal: controller.signal,
    });
    // Reading the body stays inside the timeout window: a stalled body would hang just like a stalled connect.
    body = await res.text();
  } catch (error) {
    if (controller.signal.aborted) throw new Error(`frontend login via ${baseUrl} timed out after ${timeoutMs}ms`);
    throw error;
  } finally {
    clearTimeout(timer);
  }
  if (!res.ok) {
    throw new Error(`frontend login failed via ${baseUrl}: status=${res.status} body=${body.slice(0, 300)}`);
  }
  const cookie = res.headers.get("set-cookie")?.split(";")[0] ?? "";
  if (cookie.length === 0) throw new Error(`frontend login via ${baseUrl} did not return a session cookie`);
  return { baseUrl, cookie };
}
/**
 * Sends an authenticated request to the frontend and returns the readJson
 * result.
 *
 * The session cookie always overrides any caller-supplied cookie header. When
 * a body is present and no content-type was given, `application/json` is used.
 *
 * @param session Session obtained from loginFrontend.
 * @param path Path (and query) appended directly to the session base URL.
 * @param init Optional fetch options.
 * @param timeoutMs Milliseconds before the request is aborted.
 */
async function frontendJson(session: FrontendSession, path: string, init?: RequestInit, timeoutMs = 8000): Promise<FetchJsonResult> {
  const requestHeaders = new Headers(init?.headers);
  requestHeaders.set("cookie", session.cookie);
  const sendsBody = init?.body !== undefined;
  if (sendsBody && !requestHeaders.has("content-type")) {
    requestHeaders.set("content-type", "application/json");
  }
  const target = `${session.baseUrl}${path}`;
  return readJson(target, { ...init, headers: requestHeaders }, timeoutMs);
}
/**
 * Looks up the value following the first occurrence of `name` in `args`.
 *
 * @returns The value, or undefined when `name` does not appear.
 * @throws Error when `name` appears but is last or is followed by an empty string.
 */
function stringOption(args: string[], name: string): string | undefined {
  const position = args.indexOf(name);
  if (position < 0) return undefined;
  const candidate = args[position + 1];
  if (candidate !== undefined && candidate.length !== 0) return candidate;
  throw new Error(`${name} requires a non-empty value`);
}
/**
 * Reads a positive-integer option.
 *
 * @returns `defaultValue` when the option is absent (the default itself is not
 *   validated), otherwise the parsed value.
 * @throws Error when the option has no value or the value is not an integer greater than zero.
 */
function numberOption(args: string[], name: string, defaultValue: number): number {
  const text = stringOption(args, name);
  if (text === undefined) return defaultValue;
  const parsed = Number(text);
  if (Number.isInteger(parsed) && parsed > 0) return parsed;
  throw new Error(`${name} must be a positive integer`);
}
/**
 * Reads an option whose value must be a JSON object.
 *
 * @returns The parsed object, or undefined when the option is absent.
 * @throws Error when the option has no value, the value is not valid JSON
 *   (the message names the option and includes the parser's detail), or the
 *   JSON is not a plain object (arrays and null are rejected).
 */
function jsonOption(args: string[], name: string): Record<string, unknown> | undefined {
  const raw = stringOption(args, name);
  if (raw === undefined) return undefined;
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch (error) {
    // A bare SyntaxError would not say which option was malformed.
    const detail = error instanceof Error ? error.message : String(error);
    throw new Error(`${name} must be valid JSON: ${detail}`);
  }
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) throw new Error(`${name} must be a JSON object`);
  return parsed as Record<string, unknown>;
}
// Source of a Python 3 script that remoteFrontendSkillDiscoverCommand passes to
// `python3 -c`. It is configured through environment variables: S (scope,
// default 'all'), L (result limit, default 0; only applied when greater than 0)
// and D (maximum directory depth, default 4). The script looks for SKILL.md
// files under `.agents/skills` and `.codex/skills` roots and prints one JSON
// document with `ok`, `command`, `node`, `counts`, `roots` and `skills`.
// String.raw keeps the backslashes in the script exactly as written.
// NOTE(review): as it appears here the embedded Python has no block
// indentation, which Python requires — confirm the literal on disk still
// carries it before relying on this command.
const compactSkillDiscoverPython = String.raw`import os,json,socket,platform,getpass
from pathlib import Path as P
S=os.getenv('S','all');L=int(os.getenv('L','0'));D=int(os.getenv('D','4'));skip={'node_modules','.git','.state','logs','references','__pycache__'}
def isw():
try:r=P('/proc/sys/kernel/osrelease').read_text(errors='ignore').lower()
except Exception:r=''
return 'microsoft' in r or 'wsl' in r or 'WSL_INTEROP' in os.environ
def wp(p):
s=str(p)
return s[5].upper()+':\\'+s[7:].replace('/','\\') if s.startswith('/mnt/') and len(s)>6 and s[5].isalpha() and s[6]=='/' else None
def md(f):
n=f.parent.name;d=''
try:ls=f.read_text(errors='replace')[:8192].splitlines()
except Exception:ls=[]
if ls and ls[0].strip()=='---':
for l in ls[1:]:
if l.strip()=='---':break
if ':' in l:
k,v=l.split(':',1);k=k.strip().lower();v=v.strip().strip('"\' ')
if k=='name' and v:n=v
if k=='description' and v:d=v
if not d:
for l in ls:
x=l.strip()
if x and not x.startswith('---') and not x.startswith('#'):
d=x;break
return n,d
def scan(sc,root):
rec={'scope':sc,'path':str(root),'windowsPath':wp(root),'exists':False,'skillCount':0,'error':None};out=[]
try:rec['exists']=root.exists()
except Exception as e:rec['error']=str(e)
if not rec['exists']:return rec,out
try:
for f in root.rglob('SKILL.md'):
rel=f.relative_to(root).parts[:-1]
if not rel or len(rel)>D or any(x in skip for x in rel):continue
n,d=md(f);out.append({'scope':sc,'name':n,'description':d,'path':str(f.parent),'skillMd':str(f),'windowsPath':wp(f.parent),'root':str(root)})
except Exception as e:rec['error']=str(e)
rec['skillCount']=len(out);return rec,out
roots=[];h=P.home()
if S!='windows':roots += [('wsl',h/'.agents/skills'),('wsl',h/'.codex/skills'),('wsl',P('/root/.agents/skills')),('wsl',P('/root/.codex/skills'))]
if S!='wsl' and isw():
try:users=list(P('/mnt/c/Users').iterdir())
except Exception:users=[]
for u in users:
if u.is_dir() and u.name.lower() not in {'all users','default','default user','public'}:roots += [('windows',u/'.agents/skills'),('windows',u/'.codex/skills')]
seen=set();rr=[];ss=[]
for sc,r in roots:
k=(sc,str(r))
if k in seen:continue
seen.add(k);a,b=scan(sc,r);rr.append(a);ss+=b
ss.sort(key=lambda x:(0 if x['scope']=='wsl' else 1,x['name'].lower(),x['path']));tb=len(ss)
if L>0:ss=ss[:L]
c={'total':len(ss),'totalBeforeLimit':tb,'wsl':sum(1 for x in ss if x['scope']=='wsl'),'windows':sum(1 for x in ss if x['scope']=='windows')}
print(json.dumps({'ok':True,'command':'unidesk ssh skills','node':{'hostname':socket.gethostname(),'user':getpass.getuser(),'home':str(P.home()),'platform':platform.platform(),'isWsl':isw()},'counts':c,'roots':rr,'skills':ss},ensure_ascii=False,indent=2))`;
/**
 * Builds the shell command that runs the compact skill-discovery script on the
 * remote node.
 *
 * Parsing starts at index 2 when the first argument is `skill`, otherwise at
 * index 1. Only `--scope`, `--limit` and `--max-depth` are accepted, each
 * followed by a value.
 *
 * @returns `S=… L=… D=… python3 -c '<script>'` with every value shell-quoted.
 * @throws Error for an invalid option value or any unsupported argument.
 */
function remoteFrontendSkillDiscoverCommand(args: string[]): string {
  const settings = { scope: "all", limit: 0, maxDepth: 4 };
  let cursor = args[0] === "skill" ? 2 : 1;
  while (cursor < args.length) {
    const flag = args[cursor] ?? "";
    const operand = args[cursor + 1];
    switch (flag) {
      case "--scope": {
        if (operand !== "all" && operand !== "wsl" && operand !== "windows") throw new Error("ssh skills --scope must be one of: all, wsl, windows");
        settings.scope = operand;
        break;
      }
      case "--limit": {
        const parsedLimit = Number(operand);
        if (!Number.isInteger(parsedLimit) || parsedLimit < 0) throw new Error("ssh skills --limit must be >= 0");
        settings.limit = parsedLimit;
        break;
      }
      case "--max-depth": {
        const parsedDepth = Number(operand);
        if (!Number.isInteger(parsedDepth) || parsedDepth <= 0) throw new Error("ssh skills --max-depth must be positive");
        settings.maxDepth = parsedDepth;
        break;
      }
      default:
        throw new Error(`remote frontend ssh skills does not support option: ${flag}`);
    }
    // Every accepted flag consumes the flag itself plus its value.
    cursor += 2;
  }
  const environment = [
    `S=${shellQuote(settings.scope)}`,
    `L=${shellQuote(String(settings.limit))}`,
    `D=${shellQuote(String(settings.maxDepth))}`,
  ].join(" ");
  return `${environment} python3 -c ${shellQuote(compactSkillDiscoverPython)}`;
}
/**
 * Builds the payload for a debug dispatch from CLI arguments.
 *
 * Keys from `--payload-json` are spread last, so they override the derived
 * ones. `provider.upgrade` adds `mode` from `--mode`, then `--upgrade-mode`,
 * then "plan". `host.ssh` uses mode "exec" with `command` when `--ssh-command`
 * is given and "probe" otherwise, plus `cwd` and `timeoutMs` when supplied.
 *
 * @throws Error when an option is present without a valid value.
 */
function dispatchPayload(args: string[], command: DebugDispatchCommand): Record<string, unknown> {
  const overrides = jsonOption(args, "--payload-json") ?? {};
  switch (command) {
    case "provider.upgrade": {
      const mode = stringOption(args, "--mode") ?? stringOption(args, "--upgrade-mode") ?? "plan";
      return { source: "cli-remote", mode, ...overrides };
    }
    case "host.ssh": {
      const sshCommand = stringOption(args, "--ssh-command");
      const derived: Record<string, unknown> = {
        source: "cli-remote",
        mode: sshCommand === undefined ? "probe" : "exec",
      };
      if (sshCommand !== undefined) derived.command = sshCommand;
      const cwd = stringOption(args, "--cwd");
      if (cwd !== undefined) derived.cwd = cwd;
      if (args.includes("--timeout-ms")) derived.timeoutMs = numberOption(args, "--timeout-ms", 8000);
      return { ...derived, ...overrides };
    }
    default:
      return { source: "cli-remote", ...overrides };
  }
}
/**
 * Replaces the body of a system-status response with a compact summary.
 *
 * Each entry keeps its identity fields, a trimmed `current` sample (null when
 * the entry has none), the last eight history items and the history length.
 * The body comes from the network, so its shape is not trusted: a
 * `systemStatuses` value that is not an array is treated as empty, and entries
 * that are not plain objects are skipped rather than throwing.
 *
 * @param response Result of the system-status request.
 * @returns A copy of `response` whose body is `{ ok, systemStatuses }`; `ok`
 *   is true only when the original body had `ok === true`.
 */
function summarizeSystemStatus(response: FetchJsonResult): FetchJsonResult {
  const isRecord = (value: unknown): value is Record<string, unknown> =>
    typeof value === "object" && value !== null && !Array.isArray(value);
  const body = isRecord(response.body) ? response.body : null;
  const listed = body?.systemStatuses;
  const entries: unknown[] = Array.isArray(listed) ? listed : [];
  const systemStatuses = entries.filter(isRecord).map((item) => {
    const current = isRecord(item.current) ? item.current : {};
    const history = Array.isArray(item.history) ? item.history : [];
    return {
      providerId: item.providerId,
      name: item.name,
      nodeStatus: item.nodeStatus,
      updatedAt: item.updatedAt,
      current: item.current === null || item.current === undefined ? null : {
        ok: current.ok,
        collectedAt: current.collectedAt,
        cpu: current.cpu,
        memory: current.memory,
        disk: current.disk,
      },
      historyPreview: history.slice(-8),
      historyCount: history.length,
    };
  });
  return { ...response, body: { ok: body?.ok === true, systemStatuses } };
}
/**
 * Replaces the body of a docker-status response with a compact summary.
 *
 * Each entry keeps its identity fields and a trimmed `dockerStatus` containing
 * at most the first eight containers. The body comes from the network, so its
 * shape is not trusted: a `dockerStatuses` value that is not an array is
 * treated as empty, and entries that are not plain objects are skipped rather
 * than throwing.
 *
 * @param response Result of the docker-status request.
 * @returns A copy of `response` whose body is `{ ok, dockerStatuses }`; `ok`
 *   is true only when the original body had `ok === true`.
 */
function summarizeDockerStatus(response: FetchJsonResult): FetchJsonResult {
  const isRecord = (value: unknown): value is Record<string, unknown> =>
    typeof value === "object" && value !== null && !Array.isArray(value);
  const body = isRecord(response.body) ? response.body : null;
  const listed = body?.dockerStatuses;
  const entries: unknown[] = Array.isArray(listed) ? listed : [];
  const dockerStatuses = entries.filter(isRecord).map((item) => {
    const status = isRecord(item.dockerStatus) ? item.dockerStatus : {};
    const containers = Array.isArray(status.containers) ? status.containers : [];
    return {
      providerId: item.providerId,
      name: item.name,
      nodeStatus: item.nodeStatus,
      updatedAt: item.updatedAt,
      dockerStatus: {
        ok: status.ok,
        socketPresent: status.socketPresent,
        collectedAt: status.collectedAt,
        counts: status.counts,
        daemon: status.daemon,
        containersPreview: containers.slice(0, 8),
      },
    };
  });
  return { ...response, body: { ok: body?.ok === true, dockerStatuses } };
}
/**
 * Polls the frontend task list until the given task reaches "succeeded" or
 * "failed", or until `timeoutMs` has elapsed.
 *
 * Each poll fetches `/api/tasks?limit=100` and is followed by a 500 ms pause.
 *
 * @returns `{ ok: true, task }` once the task is finished, otherwise
 *   `{ ok: false, timeoutMs, latest }` where `latest` is the last poll
 *   response (null when no poll was made).
 */
async function waitForFrontendTask(session: FrontendSession, taskId: string, timeoutMs: number): Promise<unknown> {
  type PolledTask = { id?: string; status?: string; result?: unknown };
  const finishedStates = ["succeeded", "failed"];
  const begunAt = Date.now();
  let lastResponse: unknown = null;
  while (Date.now() - begunAt < timeoutMs) {
    lastResponse = await frontendJson(session, "/api/tasks?limit=100");
    const listed = (lastResponse as { body?: { tasks?: PolledTask[] } }).body?.tasks ?? [];
    const match = listed.find((candidate) => candidate.id === taskId);
    const finished = match?.status !== undefined && finishedStates.includes(match.status);
    if (finished) return { ok: true, task: match };
    await Bun.sleep(500);
  }
  return { ok: false, timeoutMs, latest: lastResponse };
}
/**
 * Collects a health snapshot of the remote deployment through the frontend.
 *
 * The six probes are independent, so they are issued concurrently. readJson
 * and frontendJson report failures in their result instead of rejecting, so
 * one failing probe does not discard the others.
 *
 * @param session Authenticated frontend session.
 * @param config Supplies the provider-ingress port and the core/database
 *   ports listed under `publicExposureBoundary`.
 * @returns Public health results, authenticated API results (system and
 *   docker status summarized) and the ports expected to stay unexposed.
 */
async function remoteHealth(session: FrontendSession, config: UniDeskConfig): Promise<unknown> {
  const ingressHost = new URL(session.baseUrl).hostname;
  const [
    frontendPublic,
    providerIngressPublic,
    overviewInternal,
    nodesInternal,
    systemStatusResponse,
    dockerStatusResponse,
  ] = await Promise.all([
    readJson(`${session.baseUrl}/health`),
    readJson(`http://${ingressHost}:${config.network.providerIngress.port}/health`),
    frontendJson(session, "/api/overview"),
    frontendJson(session, "/api/nodes"),
    frontendJson(session, "/api/nodes/system-status?limit=24"),
    frontendJson(session, "/api/nodes/docker-status"),
  ]);
  return {
    transport: "frontend",
    frontendPublic,
    providerIngressPublic,
    overviewInternal,
    nodesInternal,
    systemStatusInternal: summarizeSystemStatus(systemStatusResponse),
    dockerStatusInternal: summarizeDockerStatus(dockerStatusResponse),
    publicExposureBoundary: {
      coreHostPort: { port: config.network.core.port, expected: "not-exposed" },
      databaseHostPort: { port: config.network.database.port, expected: "not-exposed" },
    },
  };
}
/**
 * Submits a debug dispatch through the frontend and optionally waits for the
 * resulting task.
 *
 * Positional arguments after `debug dispatch` are `[providerId] [command]`:
 * when the first one is itself a dispatch command, the configured gateway
 * provider is used. An unrecognised or missing command falls back to
 * `docker.ps`.
 *
 * All option parsing, `--wait-ms` included, happens before the request is
 * sent, so an invalid option can no longer fail the command after the task
 * has already been submitted.
 *
 * @returns The dispatch response and, when `--wait-ms` is given and a task id
 *   came back, the result of waiting for that task (otherwise null).
 * @throws Error when an option value is invalid.
 */
async function remoteDebugDispatch(session: FrontendSession, config: UniDeskConfig, args: string[]): Promise<unknown> {
  const third = args[2];
  const fourth = args[3];
  const providerId = isDebugDispatchCommand(third) ? config.providerGateway.id : third ?? config.providerGateway.id;
  const commandArg = isDebugDispatchCommand(third) ? third : fourth;
  const dispatchCommand = isDebugDispatchCommand(commandArg) ? commandArg : "docker.ps";
  // Validate everything up front; the POST below has side effects on the server.
  const payload = dispatchPayload(args, dispatchCommand);
  const waitMs = numberOption(args, "--wait-ms", 0);
  const dispatch = await frontendJson(session, "/api/dispatch", {
    method: "POST",
    body: JSON.stringify({ providerId, command: dispatchCommand, payload }),
  });
  const taskId = (dispatch as { body?: { taskId?: string } }).body?.taskId ?? "";
  const wait = waitMs > 0 && taskId.length > 0 ? await waitForFrontendTask(session, taskId, waitMs) : null;
  return { transport: "frontend", dispatch, wait };
}
/**
 * Fetches the task list (limit 100) and picks one task out of it.
 *
 * The third argument is the task id; when omitted or "latest", the first task
 * in the returned list is used.
 *
 * @returns The raw list response, the requested id and the matching task
 *   (null when none matched).
 */
async function remoteDebugTask(session: FrontendSession, args: string[]): Promise<unknown> {
  const requestedId = args[2] ?? "latest";
  const tasksResponse = await frontendJson(session, "/api/tasks?limit=100");
  const listed = (tasksResponse as { body?: { tasks?: Array<{ id?: string }> } }).body?.tasks ?? [];
  let selected: { id?: string } | undefined;
  if (requestedId === "latest") {
    selected = listed[0];
  } else {
    selected = listed.find((candidate) => candidate.id === requestedId);
  }
  return { transport: "frontend", tasksResponse, taskId: requestedId, task: selected ?? null };
}
/**
 * Handles `microservice` subcommands over the frontend transport.
 *
 * Supported forms: `list`, `status <id>`, `health <id>` and
 * `proxy <id> <path>` where the path starts with `/`. The action defaults to
 * `list`. Proxy responses are passed through
 * summarizeMicroserviceProxyResponse.
 *
 * @throws Error when the arguments match none of the supported forms.
 */
async function remoteMicroservice(session: FrontendSession, args: string[]): Promise<unknown> {
  const [, action = "list", id, path] = args;
  switch (action) {
    case "list":
      return { transport: "frontend", response: await frontendJson(session, "/api/microservices", undefined, 12_000) };
    case "status":
    case "health": {
      if (id === undefined) break;
      const endpoint = `/api/microservices/${encodeURIComponent(id)}/${action}`;
      return { transport: "frontend", response: await frontendJson(session, endpoint, undefined, 18_000) };
    }
    case "proxy": {
      if (id === undefined || path === undefined || !path.startsWith("/")) break;
      const endpoint = `/api/microservices/${encodeURIComponent(id)}/proxy${path}`;
      const proxied = await frontendJson(session, endpoint, undefined, 24_000);
      return { transport: "frontend", response: summarizeMicroserviceProxyResponse(proxied, args) };
    }
    default:
      break;
  }
  throw new Error("remote microservice command must be: microservice list | status <id> | health <id> | proxy <id> <path>");
}
/**
 * Handles `codex` subcommands over the frontend transport.
 *
 * Accepted actions are task, summary, show, output and judge (default task).
 * `output` and `judge` go to their dedicated query helpers; the rest use the
 * task query. Requests issued by the helpers go through the frontend session
 * with a 130 s timeout for `judge` and 24 s otherwise.
 *
 * @throws Error when the action is unknown or the task id is missing.
 */
async function remoteCodeQueue(session: FrontendSession, args: string[]): Promise<unknown> {
  const action = args[1] ?? "task";
  const knownActions = ["task", "summary", "show", "output", "judge"];
  if (!knownActions.includes(action)) {
    throw new Error("remote codex command must be: codex task <taskId>, codex output <taskId>, or codex judge <taskId> --attempt N");
  }
  const taskId = args[2];
  if (taskId === undefined || taskId.length === 0) throw new Error(`codex ${action} requires task id`);

  const requestTimeoutMs = action === "judge" ? 130_000 : 24_000;
  const fetcher = (path: string, init?: { method?: string; body?: unknown }): Promise<FetchJsonResult> => {
    if (init === undefined) return frontendJson(session, path, undefined, requestTimeoutMs);
    // The helpers hand over structured bodies; the frontend expects them serialized.
    const serializedBody = init.body === undefined ? undefined : JSON.stringify(init.body);
    return frontendJson(session, path, { method: init.method, body: serializedBody }, requestTimeoutMs);
  };

  const extraArgs = args.slice(3);
  let result: unknown;
  if (action === "output") {
    result = await codexOutputQueryAsync(taskId, extraArgs, fetcher);
  } else if (action === "judge") {
    result = await codexJudgeQueryAsync(taskId, extraArgs, fetcher);
  } else {
    result = await codexTaskQueryAsync(taskId, extraArgs, fetcher);
  }
  return { transport: "frontend", result };
}
/**
 * Runs an `ssh <providerId> …` command by dispatching `host.ssh` in exec mode
 * through the frontend, then relays the task's stdout and stderr.
 *
 * Helper subcommands that need stdin, invocations without a remote command,
 * and the `glob` helper are rejected with an explanation on stderr. Skill
 * discovery arguments are replaced by the compact discovery command and get a
 * 30 s exec timeout; everything else gets 15 s.
 *
 * The poll window is the exec timeout plus five seconds, so a command that
 * uses its whole exec budget is still observed finishing. For ordinary
 * commands that is 20 s.
 *
 * @param providerId Target provider; required.
 * @param args Arguments following the provider id.
 * @returns The remote exit code when the task reports one; otherwise 0 on
 *   success and 255 on rejection, dispatch failure, task failure or timeout.
 * @throws Error when `providerId` is missing.
 */
async function runRemoteSshOverFrontend(session: FrontendSession, providerId: string | undefined, args: string[]): Promise<number> {
  if (!providerId) throw new Error("remote ssh requires provider id, for example: bun scripts/cli.ts --main-server-ip 74.48.78.17 ssh D601 hostname");
  const parsed = parseSshArgs(args);
  if (parsed.requiresStdin) {
    process.stderr.write("remote frontend transport does not stream stdin for ssh helper subcommands such as apply-patch or py; run the command on the main server or use --main-server-transport ssh\n");
    return 255;
  }
  if (parsed.remoteCommand === null) {
    process.stderr.write("remote frontend transport supports ssh remote commands only; pass a command such as: ssh D601 hostname\n");
    return 255;
  }
  if (args[0] === "glob") {
    process.stderr.write("remote frontend transport does not support the ssh glob helper because host.ssh exec has a short command-length limit; run it on the main server CLI instead\n");
    return 255;
  }
  const skillDiscovery = isSshSkillDiscoveryArgs(args);
  const remoteCommand = skillDiscovery ? remoteFrontendSkillDiscoverCommand(args) : parsed.remoteCommand;
  const execTimeoutMs = skillDiscovery ? 30000 : 15000;
  const dispatch = await frontendJson(session, "/api/dispatch", {
    method: "POST",
    body: JSON.stringify({
      providerId,
      command: "host.ssh",
      payload: { source: "cli-remote-ssh", mode: "exec", command: remoteCommand, timeoutMs: execTimeoutMs },
    }),
  });
  const taskId = (dispatch as { body?: { taskId?: string } }).body?.taskId ?? "";
  if (!dispatch.ok || taskId.length === 0) {
    process.stderr.write(`${JSON.stringify(dispatch, null, 2)}\n`);
    return 255;
  }
  // Poll longer than the remote exec may run; a fixed shorter window reported slow-but-successful runs as failures.
  const waitTimeoutMs = execTimeoutMs + 5000;
  const wait = await waitForFrontendTask(session, taskId, waitTimeoutMs);
  const task = (wait as { task?: { status?: string; result?: Record<string, unknown>; message?: string } }).task;
  if (task === undefined) {
    process.stderr.write(`remote ssh task ${taskId} did not finish within ${waitTimeoutMs}ms\n`);
    return 255;
  }
  const result = task.result ?? {};
  const stdout = typeof result.stdout === "string" ? result.stdout : "";
  const stderr = typeof result.stderr === "string" ? result.stderr : "";
  if (stdout.length > 0) process.stdout.write(stdout);
  if (stderr.length > 0) process.stderr.write(stderr);
  if (task.status !== "succeeded") {
    // With no captured output, dump the task so the failure is not silent.
    if (stdout.length === 0 && stderr.length === 0) process.stderr.write(`${JSON.stringify({ taskId, task }, null, 2)}\n`);
    return typeof result.exitCode === "number" ? result.exitCode : 255;
  }
  return typeof result.exitCode === "number" ? result.exitCode : 0;
}
/**
 * Executes a CLI command against the remote server through the frontend HTTP
 * API.
 *
 * Logs in first (this applies to `help` as well), then routes on the first
 * one or two arguments. JSON-producing commands print a result envelope and
 * return 0; `ssh` returns the exit code from runRemoteSshOverFrontend. Any
 * error raised after the host check, including login failure and unsupported
 * commands, is printed as an error envelope and mapped to exit code 1.
 *
 * @throws Error when `options.host` is null.
 */
async function runRemoteCliOverFrontend(options: RemoteCliOptions, config: UniDeskConfig): Promise<number> {
  if (options.host === null) throw new Error("runRemoteCli requires --main-server-ip or --server");
  const args = options.args.length === 0 ? ["help"] : options.args;
  const name = commandName(args);
  const unsupported = (): Error => new Error(`remote frontend transport does not support command: ${name}`);
  try {
    const session = await loginFrontend(options.host, config);
    const [top, sub] = args;
    let data: unknown;
    switch (top) {
      case "help":
      case "--help":
      case "-h":
        data = {
          transport: "frontend",
          baseUrl: session.baseUrl,
          commands: ["debug health", "debug dispatch", "debug task", "ssh <providerId> <command>", "ssh <providerId> skills", "microservice list", "microservice status <id>", "microservice health <id>", "microservice proxy <id> <path>", "codex task <taskId>", "codex judge <taskId> --attempt N"],
        };
        break;
      case "debug":
        if (sub === "health") data = await remoteHealth(session, config);
        else if (sub === "dispatch") data = await remoteDebugDispatch(session, config, args);
        else if (sub === "task") data = await remoteDebugTask(session, args);
        else throw unsupported();
        break;
      case "microservice":
        data = await remoteMicroservice(session, args);
        break;
      case "codex":
        data = await remoteCodeQueue(session, args);
        break;
      case "ssh":
        // ssh relays raw stdout/stderr itself, so no JSON envelope is printed.
        return await runRemoteSshOverFrontend(session, sub, args.slice(2));
      default:
        throw unsupported();
    }
    emitRemoteJson(name, data);
    return 0;
  } catch (failure) {
    emitRemoteError(name, failure);
    return 1;
  }
}
/**
 * Entry point for running a CLI command against a remote main server.
 *
 * SSH is used when the transport is "ssh", or when it is "auto" and an
 * identity file was supplied; otherwise the frontend HTTP transport is used.
 *
 * @returns The exit code produced by the chosen transport.
 * @throws Error when `options.host` is null.
 */
export async function runRemoteCli(options: RemoteCliOptions, config: UniDeskConfig): Promise<number> {
  if (options.host === null) throw new Error("runRemoteCli requires --main-server-ip or --server");
  const sshRequested = options.transport === "ssh";
  const sshImplied = options.transport === "auto" && options.identityFile !== null;
  if (sshRequested || sshImplied) {
    return runRemoteCliOverSsh(options);
  }
  return runRemoteCliOverFrontend(options, config);
}