fix: stream ssh downloads over tcp-pool

This commit is contained in:
Codex
2026-06-11 16:26:35 +00:00
parent 3d0faf557e
commit 6bce211b11
6 changed files with 473 additions and 154 deletions
-2
View File
@@ -1685,8 +1685,6 @@ function downloadRemoteFile(options: ArtifactRegistryOptions, remotePath: string
"ssh",
options.providerId,
"download",
"--chunk-bytes",
"1048576",
remotePath,
localPath,
];
+1 -1
View File
@@ -22,7 +22,7 @@ export function rootHelp(): unknown {
{ command: "trans <route> [operation args...] (alias of ssh <route> ...)", description: "Open a Host SSH / WSL SSH maintenance session; provider WebSocket carries control and host.ssh.tcp-pool carries stdin/stdout/stderr data." },
{ command: "trans gh:/owner/repo[/pr|/issue][/number[/1]] ls|cat|rg|patch-apply", description: "Treat GitHub PRs/issues as virtual text directories; `ls --full` shows state/floors/body length, and `patch-apply` updates first-floor `body.md` through UniDesk gh plus apply-patch v2." },
{ command: "trans <route> apply-patch < patch.diff", description: "Default remote text patch entry: apply a standard patch with the local TypeScript v2 engine while the remote route only reads and writes files." },
{ command: "trans <route> upload <local-file> <remote-file> | trans <route> download <remote-file> <local-file>", description: "Transfer whole files through SSH passthrough with remote temp files, automatic endpoint byte/SHA-256 verification, and client-side chunk fallback." },
{ command: "trans <route> upload <local-file> <remote-file> | trans <route> download <remote-file> <local-file>", description: "Transfer whole files through SSH passthrough with automatic endpoint byte/SHA-256 verification; downloads stream stdout over host.ssh.tcp-pool." },
{ command: "trans <providerId> apply-patch-v1 [tool args...] < patch.diff", description: "Fallback to the injected legacy remote apply_patch helper directly over SSH passthrough and stream the patch from local stdin." },
{ command: "trans <providerId> py [script-args...] < script.py", description: "Run remote Python from local stdin through SSH passthrough without nested shell quoting; extra args become script argv." },
{ command: "trans <providerId> script [--shell sh|bash] [script-args...] <<'SCRIPT' ...", description: "Run a remote shell script from local stdin using shell -s; default sh inherits provider proxy env and gets the portable printf helper used by shell/script." },
+8 -1
View File
@@ -496,7 +496,14 @@ function genericJobProgress(job: JobRecord, stderrTailOverride?: string): JobPro
});
const downloadSummary = lastDownload === null
? null
: ` ssh-download ${Number(lastDownload.actualBytes ?? 0)}/${Number(lastDownload.expectedBytes ?? 0)} bytes chunks=${Number(lastDownload.chunks ?? 0)}${downloadRetryEvents.length === 0 ? "" : ` retries=${downloadRetryEvents.length} lastRetry=${String(lastDownload.reason ?? "unknown")}`}`;
: [
` ssh-download ${Number(lastDownload.actualBytes ?? lastDownload.bytes ?? 0)}/${Number(lastDownload.expectedBytes ?? lastDownload.totalBytes ?? 0)} bytes`,
`chunks=${Number(lastDownload.chunks ?? 0)}`,
`remaining=${Number(lastDownload.remainingBytes ?? 0)}`,
`throughput=${Number(lastDownload.throughputBytesPerSecond ?? 0)}Bps`,
`elapsedMs=${Number(lastDownload.elapsedMs ?? 0)}`,
downloadRetryEvents.length === 0 ? null : `retries=${downloadRetryEvents.length} lastRetry=${String(lastDownload.reason ?? "unknown")}`,
].filter(Boolean).join(" ");
return {
kind: "generic",
stage: lastDownload === null ? null : "ssh-download",
+200 -1
View File
@@ -24,7 +24,12 @@ import {
wrapSshRemoteCommand,
type SshCaptureResult,
} from "./ssh";
import { isSshFileTransferOperation, runSshFileTransferOperation, type SshRemoteCommandExecutor } from "./ssh-file-transfer";
import {
isSshFileTransferOperation,
runSshFileTransferOperation,
type SshRemoteCommandExecutor,
type SshRemoteCommandStreamHandlers,
} from "./ssh-file-transfer";
import { runApplyPatchV2, type ApplyPatchV2Executor } from "./apply-patch-v2";
import { codexJudgeQueryAsync, codexOutputQueryAsync, codexPrPreflightQueryAsync, codexQueuesQueryAsync, codexTaskQueryAsync, codexTasksQueryAsync, codexUnreadTriageAsync } from "./code-queue";
import { runDecisionCenterCommandAsync } from "./decision-center";
@@ -1194,6 +1199,12 @@ async function runRemoteSshWebSocketCaptureRemoteCommand(
const sendInput = (value: Buffer | string): void => {
sendWhenSessionReady({ type: "ssh.input", data: Buffer.from(value).toString("base64"), encoding: "base64" });
};
const sendInputChunked = (value: string): void => {
const buffer = Buffer.from(value, "utf8");
for (let offset = 0; offset < buffer.length; offset += remoteSshInputChunkBytes) {
sendInput(buffer.subarray(offset, Math.min(buffer.length, offset + remoteSshInputChunkBytes)));
}
};
const flush = (): void => {
while (pending.length > 0 && ws.readyState === WebSocket.OPEN) ws.send(pending.shift()!);
};
@@ -1294,6 +1305,193 @@ async function runRemoteSshWebSocketCaptureRemoteCommand(
});
}
async function runRemoteSshWebSocketStreamRemoteCommand(
session: FrontendSession,
invocation: ReturnType<typeof parseSshInvocation>,
remoteCommand: string,
handlers: SshRemoteCommandStreamHandlers,
input?: string,
): Promise<SshCaptureResult> {
const streamInvocation = {
...invocation,
parsed: { ...invocation.parsed, remoteCommand, requiresStdin: input !== undefined, invocationKind: "helper" as const },
};
const startedAtMs = Date.now();
const size = {
cols: Number(process.stdout.columns) > 0 ? Number(process.stdout.columns) : 100,
rows: Number(process.stdout.rows) > 0 ? Number(process.stdout.rows) : 30,
};
const runtimeTimeoutMs = sshRuntimeTimeoutMs();
const payload = {
providerId: invocation.providerId,
command: wrapSshRemoteCommand(remoteCommand),
cwd: sshRoutePayloadCwd(invocation.route),
tty: false,
stdinEotOnEnd: false,
openTimeoutMs: Math.max(15000, Number(process.env.UNIDESK_SSH_OPEN_TIMEOUT_MS || 60000)),
runtimeTimeoutMs,
cols: size.cols,
rows: size.rows,
};
const ws = openFrontendSshWebSocket(session);
let exitCode = 255;
let settled = false;
let canSend = false;
let sessionReady = false;
let stdout = "";
let stderr = "";
let streamError: unknown = null;
let streamWrites = Promise.resolve();
const pending: string[] = [];
const pendingSessionMessages: string[] = [];
const send = (value: unknown): void => {
const text = JSON.stringify(value);
if (!canSend || ws.readyState !== WebSocket.OPEN) {
pending.push(text);
return;
}
ws.send(text);
};
const sendWhenSessionReady = (value: unknown): void => {
const text = JSON.stringify(value);
if (!sessionReady || ws.readyState !== WebSocket.OPEN) {
pendingSessionMessages.push(text);
return;
}
ws.send(text);
};
const sendInput = (value: Buffer | string): void => {
sendWhenSessionReady({ type: "ssh.input", data: Buffer.from(value).toString("base64"), encoding: "base64" });
};
const sendInputChunked = (value: string): void => {
const buffer = Buffer.from(value, "utf8");
for (let offset = 0; offset < buffer.length; offset += remoteSshInputChunkBytes) {
sendInput(buffer.subarray(offset, Math.min(buffer.length, offset + remoteSshInputChunkBytes)));
}
};
const flush = (): void => {
while (pending.length > 0 && ws.readyState === WebSocket.OPEN) ws.send(pending.shift()!);
};
const flushSessionMessages = (): void => {
if (!sessionReady || ws.readyState !== WebSocket.OPEN) return;
while (pendingSessionMessages.length > 0) ws.send(pendingSessionMessages.shift()!);
};
const queueStreamWrite = (chunk: Buffer, stream: "stdout" | "stderr"): void => {
streamWrites = streamWrites.then(async () => {
if (stream === "stdout") await handlers.onStdout(chunk);
else if (handlers.onStderr !== undefined) await handlers.onStderr(chunk);
}).catch((error) => {
streamError = error;
stderr += `unidesk remote frontend ssh stream sink failed: ${error instanceof Error ? error.message : String(error)}\n`;
exitCode = 255;
try {
ws.close();
} catch {
// Ignore close failures after the local stream sink has failed.
}
});
};
return await new Promise<SshCaptureResult>((resolve) => {
let killTimer: ReturnType<typeof setTimeout> | null = null;
const finish = (code: number): void => {
if (settled) return;
settled = true;
clearTimeout(openTimer);
clearTimeout(runtimeTimer);
if (killTimer !== null) clearTimeout(killTimer);
void streamWrites.then(() => {
const finalCode = streamError === null ? code : 255;
const timingHint = formatSshRuntimeTimingHint(sshRuntimeTimingHint({
invocation: streamInvocation,
transport: "frontend-websocket",
exitCode: finalCode,
startedAtMs,
}));
if (timingHint) stderr += timingHint;
resolve({ exitCode: finalCode, stdout, stderr });
});
};
const openTimer = setTimeout(() => {
if (sessionReady || settled) return;
stderr += "unidesk remote frontend ssh bridge timed out waiting for provider session\n";
exitCode = 255;
try {
ws.close();
} catch {
// Ignore.
}
}, payload.openTimeoutMs);
const runtimeTimer = setTimeout(() => {
if (settled) return;
exitCode = 124;
stderr += formatSshRuntimeTimeoutHint(sshRuntimeTimeoutHint({
invocation: streamInvocation,
transport: "frontend-websocket",
timeoutMs: runtimeTimeoutMs,
}));
try {
ws.close();
} catch {
// Ignore.
}
killTimer = setTimeout(() => finish(124), 2000);
finish(124);
}, runtimeTimeoutMs);
ws.addEventListener("open", () => {
canSend = true;
send({ type: "ssh.open", ...payload });
flush();
});
ws.addEventListener("message", (event) => {
const text = webSocketDataText(event.data);
let message: Record<string, unknown>;
try {
message = JSON.parse(text) as Record<string, unknown>;
} catch {
stderr += `${text}\n`;
return;
}
if (message.type === "ssh.dispatched") return;
if (message.type === "ssh.opened") {
sessionReady = true;
clearTimeout(openTimer);
if (input !== undefined) sendInputChunked(input);
sendWhenSessionReady({ type: "ssh.eof" });
flushSessionMessages();
return;
}
if (message.type === "ssh.data") {
const chunk = Buffer.from(String(message.data ?? ""), message.encoding === "base64" ? "base64" : "utf8");
if (message.stream === "stderr") {
stderr += chunk.toString("utf8");
queueStreamWrite(chunk, "stderr");
} else {
queueStreamWrite(chunk, "stdout");
}
return;
}
if (message.type === "ssh.error") {
stderr += `${String(message.message || "ssh bridge error")}\n`;
exitCode = 255;
ws.close();
return;
}
if (message.type === "ssh.exit") {
exitCode = Number.isInteger(message.exitCode) ? Number(message.exitCode) : 255;
ws.close();
}
});
ws.addEventListener("close", () => finish(exitCode));
ws.addEventListener("error", () => {
stderr += "unidesk remote frontend ssh bridge websocket error\n";
finish(255);
});
});
}
export function remoteSshFrontendPlanForTest(target: string, args: string[]): Record<string, unknown> {
const invocation = parseSshInvocation(target, args);
return {
@@ -1316,6 +1514,7 @@ async function runRemoteSshOverFrontend(session: FrontendSession, target: string
if (isSshFileTransferOperation(normalizedArgs)) {
const executor: SshRemoteCommandExecutor = {
runRemoteCommand: (remoteCommand, input) => runRemoteSshWebSocketCaptureRemoteCommand(session, invocation, remoteCommand, input),
streamRemoteCommand: (remoteCommand, handlers, input) => runRemoteSshWebSocketStreamRemoteCommand(session, invocation, remoteCommand, handlers, input),
};
return await runSshFileTransferOperation(invocation, normalizedArgs, executor, {
buildRouteCommand: remoteCommandForRoute,
+135 -148
View File
@@ -1,10 +1,18 @@
import { createHash, randomBytes } from "node:crypto";
import { mkdir, readFile, writeFile } from "node:fs/promises";
import { once } from "node:events";
import { createWriteStream } from "node:fs";
import { mkdir, readFile, rm } from "node:fs/promises";
import path from "node:path";
import type { ParsedSshInvocation, ParsedSshRoute, SshCaptureResult } from "./ssh";
export interface SshRemoteCommandStreamHandlers {
onStdout(chunk: Buffer): Promise<void> | void;
onStderr?(chunk: Buffer): Promise<void> | void;
}
export interface SshRemoteCommandExecutor {
runRemoteCommand(remoteCommand: string, input?: string): Promise<SshCaptureResult>;
streamRemoteCommand?(remoteCommand: string, handlers: SshRemoteCommandStreamHandlers, input?: string): Promise<SshCaptureResult>;
}
export interface SshFileTransferCommandBuilders {
@@ -14,7 +22,6 @@ export interface SshFileTransferCommandBuilders {
type SshFileTransferOperation =
| "stat"
| "read-b64-block"
| "write-b64-argv"
| "write-b64-stdin"
| "write-b64-begin"
@@ -25,7 +32,6 @@ interface SshFileTransferCliOptions {
action: "upload" | "download";
localPath: string;
remotePath: string;
readBlockBytes: number;
}
interface SshFileTransferStat {
@@ -43,6 +49,17 @@ interface SshFileTransferWriteResult {
chunks: number;
}
interface SshFileTransferDownloadResult {
remote: SshFileTransferStat;
local: SshFileTransferStat;
strategy: "tcp-pool-stdout-stream";
transport: "host.ssh.tcp-pool";
chunks: number;
elapsedMs: number;
throughputBytesPerSecond: number;
remainingBytes: number;
}
class SshFileTransferError extends Error {
constructor(message: string, public readonly details: Record<string, unknown> = {}) {
super(message);
@@ -50,12 +67,9 @@ class SshFileTransferError extends Error {
}
}
const fileTransferReadBlockBytes = 1_048_576;
const fileTransferWriteB64ArgvLimit = 48_000;
const fileTransferWriteRawChunkBytes = 1_048_576;
const fileTransferWriteB64ChunkChars = 1_398_104;
const fileTransferReadBlockMaxAttempts = 12;
const fileTransferReadEmptyRetryDelayMs = 250;
const fileTransferProgressEveryChunks = 16;
export function isSshFileTransferOperation(args: string[]): boolean {
@@ -97,15 +111,10 @@ export async function runSshFileTransferOperation(
return 0;
}
const read = await readRemoteFileVerified(invocation, executor, builders, options.remotePath, options.readBlockBytes);
await mkdir(path.dirname(localPath), { recursive: true });
await writeFile(localPath, read.content);
const local = await readFile(localPath);
const localStat = { bytes: local.length, sha256: sha256HexBuffer(local) };
assertTransferStat("download final local verification", localPath, read.remote, localStat);
const read = await downloadRemoteFileVerified(invocation, executor, builders, options.remotePath, localPath);
const verification = buildTransferVerification(
{ side: "remote", path: options.remotePath, ...read.remote },
{ side: "local", path: localPath, ...localStat },
{ side: "local", path: localPath, ...read.local },
);
process.stdout.write(`${JSON.stringify({
ok: true,
@@ -119,9 +128,12 @@ export async function runSshFileTransferOperation(
verified: true,
verification,
transfer: {
strategy: "chunked-read",
strategy: read.strategy,
transport: read.transport,
chunks: read.chunks,
chunkBytes: options.readBlockBytes,
elapsedMs: read.elapsedMs,
throughputBytesPerSecond: read.throughputBytesPerSecond,
remainingBytes: read.remainingBytes,
},
}, null, 2)}\n`);
return 0;
@@ -131,7 +143,6 @@ function parseSshFileTransferCliOptions(args: string[]): SshFileTransferCliOptio
const action = args[0];
if (action !== "upload" && action !== "download") throw new Error("ssh file transfer requires upload or download");
const positionals: string[] = [];
let readBlockBytes = fileTransferReadBlockBytes;
for (let index = 1; index < args.length; index += 1) {
const arg = args[index] ?? "";
if (arg === "--") {
@@ -139,14 +150,10 @@ function parseSshFileTransferCliOptions(args: string[]): SshFileTransferCliOptio
break;
}
if (arg === "--chunk-bytes" || arg === "--block-bytes") {
const value = args[index + 1];
if (value === undefined) throw new Error(`ssh ${action} ${arg} requires a value`);
readBlockBytes = boundedTransferChunkBytes(value, `ssh ${action} ${arg}`);
index += 1;
continue;
throw new Error(`unsupported ssh ${action} option: ${arg}; downloads use host.ssh.tcp-pool stdout streaming and no longer support the legacy base64 block reader`);
}
if (arg === "--help" || arg === "-h" || arg === "help") {
throw new Error(`ssh ${action} usage: trans <route> ${action} ${action === "upload" ? "<local-file> <remote-file>" : "<remote-file> <local-file>"} [--chunk-bytes N]`);
throw new Error(`ssh ${action} usage: trans <route> ${action} ${action === "upload" ? "<local-file> <remote-file>" : "<remote-file> <local-file>"}`);
}
if (arg.startsWith("-")) throw new Error(`unsupported ssh ${action} option: ${arg}`);
positionals.push(arg);
@@ -157,14 +164,8 @@ function parseSshFileTransferCliOptions(args: string[]): SshFileTransferCliOptio
const [first, second] = positionals;
if (!first || !second) throw new Error(`ssh ${action} paths must be non-empty`);
return action === "upload"
? { action, localPath: first, remotePath: second, readBlockBytes }
: { action, remotePath: first, localPath: second, readBlockBytes };
}
function boundedTransferChunkBytes(raw: string, option: string): number {
const value = Number(raw);
if (!Number.isInteger(value) || value <= 0) throw new Error(`${option} must be a positive integer`);
return Math.min(4 * 1024 * 1024, Math.max(1024, value));
? { action, localPath: first, remotePath: second }
: { action, remotePath: first, localPath: second };
}
async function writeRemoteFileVerified(
@@ -203,150 +204,117 @@ async function writeRemoteFileVerified(
return { strategy: "chunked-stdin", chunks };
}
async function readRemoteFileVerified(
async function downloadRemoteFileVerified(
invocation: ParsedSshInvocation,
executor: SshRemoteCommandExecutor,
builders: SshFileTransferCommandBuilders,
remotePath: string,
readBlockBytes: number,
): Promise<{ remote: SshFileTransferStat; content: Buffer; chunks: number }> {
localPath: string,
): Promise<SshFileTransferDownloadResult> {
if (executor.streamRemoteCommand === undefined) {
throw new SshFileTransferError("ssh download requires a tcp-pool stdout stream sink", {
route: invocation.route.raw,
providerId: invocation.providerId,
remotePath,
localPath,
requiredTransport: "host.ssh.tcp-pool",
blocker: "streamRemoteCommand unavailable",
});
}
if (invocation.route.plane === "win") {
throw new SshFileTransferError("ssh download requires tcp-pool stdout streaming; win routes are not supported by the safe binary stream path", {
route: invocation.route.raw,
providerId: invocation.providerId,
remotePath,
localPath,
requiredTransport: "host.ssh.tcp-pool",
blocker: "win route has no verified binary stdout stream path",
});
}
const remote = await statRemoteFile(invocation, executor, builders, remotePath);
const chunks: Buffer[] = [];
await mkdir(path.dirname(localPath), { recursive: true });
const remoteCommand = buildStreamDownloadRemoteCommand(invocation.route, builders, remotePath);
const startedAtMs = Date.now();
const hash = createHash("sha256");
const output = createWriteStream(localPath, { flags: "w", mode: 0o600 });
let actualBytes = 0;
let chunkCount = 0;
for (let blockIndex = 0; blockIndex * readBlockBytes < remote.bytes; blockIndex += 1) {
const offsetBytes = blockIndex * readBlockBytes;
const expectedChunkBytes = Math.min(readBlockBytes, remote.bytes - offsetBytes);
const chunk = await readRemoteBase64BlockWithRetry(invocation, executor, builders, remotePath, readBlockBytes, blockIndex, expectedChunkBytes, remote.bytes, actualBytes);
chunks.push(chunk);
actualBytes += chunk.length;
chunkCount += 1;
emitDownloadProgress(invocation, remotePath, blockIndex, chunkCount, actualBytes, remote.bytes, chunk.length);
}
const content = Buffer.concat(chunks);
const actual = { bytes: content.length, sha256: sha256HexBuffer(content) };
assertTransferStat("download remote read verification", remotePath, remote, actual);
return { remote, content, chunks: chunkCount };
}
async function readRemoteBase64BlockWithRetry(
invocation: ParsedSshInvocation,
executor: SshRemoteCommandExecutor,
builders: SshFileTransferCommandBuilders,
remotePath: string,
readBlockBytes: number,
blockIndex: number,
expectedChunkBytes: number,
expectedBytes: number,
actualBytes: number,
): Promise<Buffer> {
const attemptErrors: Array<Record<string, unknown>> = [];
for (let attempt = 1; attempt <= fileTransferReadBlockMaxAttempts; attempt += 1) {
try {
const read = await checkedFileTransfer(invocation, executor, builders, "read-b64-block", [remotePath, String(blockIndex), String(readBlockBytes)]);
const encoded = read.stdout.replace(/\s+/gu, "");
const chunk = decodeRemoteBase64Block(encoded);
if (chunk !== null && chunk.length === expectedChunkBytes) return chunk;
const reason = chunk === null ? "invalid-base64" : chunk.length === 0 ? "empty-read" : "short-read";
attemptErrors.push({
attempt,
reason,
exitCode: read.exitCode,
stdoutBytes: read.stdout.length,
encodedBytes: encoded.length,
decodedBytes: chunk?.length ?? null,
expectedChunkBytes,
stderrTail: read.stderr.slice(-500),
try {
const result = await executor.streamRemoteCommand(remoteCommand, {
onStdout: async (chunk) => {
await writeStreamChunk(output, chunk);
actualBytes += chunk.length;
chunkCount += 1;
hash.update(chunk);
emitDownloadProgress(invocation, remotePath, chunkCount, actualBytes, remote.bytes, chunk.length, startedAtMs);
},
});
await closeWriteStream(output);
if (result.exitCode !== 0) {
throw new SshFileTransferError("remote ssh download stream failed", {
route: invocation.route.raw,
providerId: invocation.providerId,
remotePath,
localPath,
requiredTransport: "host.ssh.tcp-pool",
exitCode: result.exitCode,
stdout: transferTextSnapshot(result.stdout, { headChars: 120, tailChars: 500 }),
stderr: transferTextSnapshot(result.stderr, { headChars: 120, tailChars: 1000 }),
});
if (attempt < fileTransferReadBlockMaxAttempts) {
emitReadRetryProgress(invocation, remotePath, blockIndex, attempt, expectedBytes, actualBytes, expectedChunkBytes, chunk?.length ?? null, reason);
await delayMs(fileTransferReadEmptyRetryDelayMs);
}
} catch (error) {
attemptErrors.push({
attempt,
error: error instanceof Error ? error.message : String(error),
expectedChunkBytes,
});
if (attempt < fileTransferReadBlockMaxAttempts) {
emitReadRetryProgress(invocation, remotePath, blockIndex, attempt, expectedBytes, actualBytes, expectedChunkBytes, null, "remote-error");
await delayMs(fileTransferReadEmptyRetryDelayMs);
}
}
const elapsedMs = Math.max(1, Date.now() - startedAtMs);
const local = { bytes: actualBytes, sha256: hash.digest("hex") };
assertTransferStat("download tcp-pool stdout stream verification", localPath, remote, local);
emitDownloadProgress(invocation, remotePath, Math.max(1, chunkCount), actualBytes, remote.bytes, 0, startedAtMs, true);
return {
remote,
local,
strategy: "tcp-pool-stdout-stream",
transport: "host.ssh.tcp-pool",
chunks: chunkCount,
elapsedMs,
throughputBytesPerSecond: Math.round((actualBytes * 1000) / elapsedMs),
remainingBytes: Math.max(0, remote.bytes - actualBytes),
};
} catch (error) {
output.destroy();
await rm(localPath, { force: true }).catch(() => undefined);
throw error;
}
throw new SshFileTransferError("remote download returned an invalid block before EOF after retries", {
route: invocation.route.raw,
remotePath,
blockIndex,
attempts: fileTransferReadBlockMaxAttempts,
expectedBytes,
actualBytes,
expectedChunkBytes,
attemptErrors,
});
}
function decodeRemoteBase64Block(encoded: string): Buffer | null {
if (encoded.length === 0) return Buffer.alloc(0);
if (!/^[A-Za-z0-9+/]*={0,2}$/u.test(encoded) || encoded.length % 4 !== 0) return null;
return Buffer.from(encoded, "base64");
}
function emitDownloadProgress(
invocation: ParsedSshInvocation,
remotePath: string,
blockIndex: number,
chunkCount: number,
actualBytes: number,
expectedBytes: number,
lastChunkBytes: number,
startedAtMs: number,
force = false,
): void {
if (chunkCount !== 1 && actualBytes < expectedBytes && chunkCount % fileTransferProgressEveryChunks !== 0) return;
if (!force && chunkCount !== 1 && actualBytes < expectedBytes && chunkCount % fileTransferProgressEveryChunks !== 0) return;
const elapsedMs = Math.max(1, Date.now() - startedAtMs);
process.stderr.write(`${JSON.stringify({
event: "unidesk.ssh.download.progress",
at: new Date().toISOString(),
route: invocation.route.raw,
providerId: invocation.providerId,
remotePath,
blockIndex,
strategy: "tcp-pool-stdout-stream",
transport: "host.ssh.tcp-pool",
chunks: chunkCount,
bytes: actualBytes,
totalBytes: expectedBytes,
actualBytes,
expectedBytes,
lastChunkBytes,
remainingBytes: Math.max(0, expectedBytes - actualBytes),
elapsedMs,
throughputBytesPerSecond: Math.round((actualBytes * 1000) / elapsedMs),
})}\n`);
}
function emitReadRetryProgress(
invocation: ParsedSshInvocation,
remotePath: string,
blockIndex: number,
attempt: number,
expectedBytes: number,
actualBytes: number,
expectedChunkBytes: number,
actualChunkBytes: number | null,
reason: string,
): void {
const event = reason === "empty-read" ? "unidesk.ssh.download.empty-read-retry" : "unidesk.ssh.download.short-read-retry";
process.stderr.write(`${JSON.stringify({
event,
route: invocation.route.raw,
providerId: invocation.providerId,
remotePath,
blockIndex,
attempt,
maxAttempts: fileTransferReadBlockMaxAttempts,
reason,
actualBytes,
expectedBytes,
actualChunkBytes,
expectedChunkBytes,
})}\n`);
}
function delayMs(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
async function statRemoteFile(
invocation: ParsedSshInvocation,
executor: SshRemoteCommandExecutor,
@@ -357,6 +325,31 @@ async function statRemoteFile(
return parseFileTransferStat(stat.stdout, stat.stderr, remotePath);
}
function buildStreamDownloadRemoteCommand(
route: ParsedSshRoute,
builders: SshFileTransferCommandBuilders,
remotePath: string,
): string {
return builders.buildRouteCommand(route, ["sh", "-c", "cat -- \"$1\"", "unidesk-download", remotePath]);
}
async function writeStreamChunk(stream: ReturnType<typeof createWriteStream>, chunk: Buffer): Promise<void> {
if (chunk.length === 0) return;
await new Promise<void>((resolve, reject) => {
stream.write(chunk, (error) => {
if (error) reject(error);
else resolve();
});
});
}
async function closeWriteStream(stream: ReturnType<typeof createWriteStream>): Promise<void> {
if (stream.closed) return;
const closed = once(stream, "close");
stream.end();
await closed;
}
async function checkedFileTransfer(
invocation: ParsedSshInvocation,
executor: SshRemoteCommandExecutor,
@@ -486,11 +479,6 @@ function posixFileTransferScript(): string {
" stat)",
" target=$1; bytes=$(wc -c < \"$target\" | tr -d '[:space:]'); digest=$(sha256_file \"$target\"); printf '%s %s\\n' \"$bytes\" \"$digest\"",
" ;;",
" read-b64-block)",
" target=$1; block_index=$2; block_size=$3",
" case \"$block_index:$block_size\" in *[!0-9:]*|:*) printf 'invalid read block args\\n' >&2; exit 2;; esac",
" dd if=\"$target\" bs=\"$block_size\" skip=\"$block_index\" count=1 2>/dev/null | base64 | tr -d '\\n'",
" ;;",
" write-b64-argv)",
" target=$1; expected_bytes=$2; expected_sha256=$3; shift 3",
" ensure_parent \"$target\"; base=${target##*/}; dir=.; case \"$target\" in */*) dir=${target%/*};; esac",
@@ -563,7 +551,6 @@ function windowsFileTransferScript(basePath: string | null, operation: SshFileTr
"$target = Resolve-UnideskPath $targetArg;",
"switch ($operation) {",
" 'stat' { if (-not (Test-Path -LiteralPath $target -PathType Leaf)) { Fail ('file not found: ' + $target) 1 }; $bytes = ([System.IO.FileInfo]$target).Length; $digest = Get-Sha256 $target; [Console]::Out.WriteLine(([string]$bytes) + ' ' + $digest); break }",
" 'read-b64-block' { $blockIndex = [Int64]$arg1; $blockSize = [Int32]$arg2; if ($blockIndex -lt 0 -or $blockSize -le 0) { Fail 'invalid read block args' 2 }; $fs = [System.IO.File]::Open($target, [System.IO.FileMode]::Open, [System.IO.FileAccess]::Read, [System.IO.FileShare]::ReadWrite); try { [void]$fs.Seek($blockIndex * $blockSize, [System.IO.SeekOrigin]::Begin); $buffer = New-Object byte[] $blockSize; $read = $fs.Read($buffer, 0, $blockSize); if ($read -gt 0) { [Console]::Out.Write([Convert]::ToBase64String($buffer, 0, $read)) } } finally { $fs.Dispose() }; break }",
" 'write-b64-argv' { Decode-ToTarget $target ([string]::Concat($argvChunks)) ([Int64]$arg1) $arg2; break }",
" 'write-b64-stdin' { Decode-ToTarget $target ([Console]::In.ReadToEnd()) ([Int64]$arg1) $arg2; break }",
" 'write-b64-begin' { Ensure-Parent $target; Set-TmpPaths $target $arg1; [System.IO.File]::WriteAllText($script:tmpB64, '', [System.Text.Encoding]::ASCII); break }",
+129 -1
View File
@@ -13,7 +13,12 @@ import {
type ApplyPatchV2Executor,
type ApplyPatchV2FileSystem,
} from "./apply-patch-v2";
import { isSshFileTransferOperation, runSshFileTransferOperation, type SshRemoteCommandExecutor } from "./ssh-file-transfer";
import {
isSshFileTransferOperation,
runSshFileTransferOperation,
type SshRemoteCommandExecutor,
type SshRemoteCommandStreamHandlers,
} from "./ssh-file-transfer";
export interface ParsedSshArgs {
remoteCommand: string | null;
@@ -2843,6 +2848,128 @@ async function runSshCaptureRemoteCommand(config: UniDeskConfig, invocation: Par
});
}
async function runSshStreamRemoteCommand(
config: UniDeskConfig,
invocation: ParsedSshInvocation,
remoteCommand: string,
handlers: SshRemoteCommandStreamHandlers,
input?: string,
): Promise<SshCaptureResult> {
const streamInvocation = {
...invocation,
parsed: { ...invocation.parsed, remoteCommand, requiresStdin: input !== undefined, invocationKind: "helper" as const },
};
const startedAtMs = Date.now();
const size = terminalSize();
const runtimeTimeoutMs = sshRuntimeTimeoutMs();
const payload = {
providerId: invocation.providerId,
command: wrapSshRemoteCommand(remoteCommand),
cwd: sshRoutePayloadCwd(invocation.route),
tty: false,
stdinEotOnEnd: false,
openTimeoutMs: Math.max(15000, Number(process.env.UNIDESK_SSH_OPEN_TIMEOUT_MS || 60000)),
runtimeTimeoutMs,
cols: size.cols,
rows: size.rows,
};
const payloadJson = JSON.stringify(payload);
const encodedBrokerSource = Buffer.from(brokerSource(), "utf8").toString("base64");
const script = [
"set -eu",
`payload=${shellQuote(payloadJson)}`,
"if command -v backend-core >/dev/null 2>&1; then",
' exec backend-core --ssh-broker "$payload"',
"fi",
`export UNIDESK_SSH_BROKER_URL=${shellQuote("ws://127.0.0.1:8080/ws/ssh")}`,
'broker_js="$(mktemp "${TMPDIR:-/tmp}/unidesk-ssh-broker.XXXXXX")"',
'trap \'rm -f "$broker_js"\' EXIT',
`printf %s ${shellQuote(encodedBrokerSource)} | base64 -d >"$broker_js"`,
'bun "$broker_js" "$payload"',
].join("\n");
const child = spawn("docker", ["exec", "-i", "unidesk-backend-core", "sh", "-c", script], {
cwd: repoRoot,
stdio: ["pipe", "pipe", "pipe"],
});
if (input !== undefined) {
writeChunkedStdin(child.stdin, input);
} else {
child.stdin.end();
}
let stdout = "";
let stderr = "";
let streamError: unknown = null;
let streamWrites = Promise.resolve();
const queueStreamWrite = (chunk: Buffer, stream: "stdout" | "stderr"): void => {
streamWrites = streamWrites.then(async () => {
if (stream === "stdout") await handlers.onStdout(chunk);
else if (handlers.onStderr !== undefined) await handlers.onStderr(chunk);
}).catch((error) => {
streamError = error;
stderr += `unidesk ssh stream sink failed: ${error instanceof Error ? error.message : String(error)}\n`;
try {
child.kill("SIGTERM");
} catch {
// Ignore kill failures after a local stream sink error.
}
});
};
child.stdout.on("data", (chunk: Buffer) => {
queueStreamWrite(chunk, "stdout");
});
child.stderr.on("data", (chunk: Buffer) => {
stderr += chunk.toString("utf8");
queueStreamWrite(chunk, "stderr");
});
return await new Promise<SshCaptureResult>((resolve) => {
let settled = false;
let killTimer: NodeJS.Timeout | null = null;
const finish = (exitCode: number): void => {
if (settled) return;
settled = true;
clearTimeout(runtimeTimer);
if (killTimer !== null) clearTimeout(killTimer);
void streamWrites.then(() => {
const finalCode = streamError === null ? exitCode : 255;
const timingHint = formatSshRuntimeTimingHint(sshRuntimeTimingHint({
invocation: streamInvocation,
transport: "backend-core-broker",
exitCode: finalCode,
startedAtMs,
}));
if (timingHint) stderr += timingHint;
resolve({ exitCode: finalCode, stdout, stderr });
});
};
const runtimeTimer = setTimeout(() => {
const hint = formatSshRuntimeTimeoutHint(sshRuntimeTimeoutHint({
invocation: streamInvocation,
transport: "backend-core-broker",
timeoutMs: runtimeTimeoutMs,
}));
stderr += hint;
try {
child.kill("SIGTERM");
} catch {
// Ignore.
}
killTimer = setTimeout(() => {
try {
child.kill("SIGKILL");
} catch {
// Ignore.
}
}, 2000);
finish(124);
}, runtimeTimeoutMs);
child.on("error", (error) => {
stderr += `unidesk ssh failed to start broker: ${error.message}\n`;
finish(255);
});
child.on("close", (code) => finish(code ?? 255));
});
}
async function runRemoteSsh(config: UniDeskConfig, host: string, providerId: string, args: string[]): Promise<number> {
const { runRemoteCli } = await import("./remote");
return await runRemoteCli({
@@ -2950,6 +3077,7 @@ export async function runSsh(config: UniDeskConfig, providerId: string, args: st
if (isSshFileTransferOperation(normalizedArgs)) {
const executor: SshRemoteCommandExecutor = {
runRemoteCommand: (remoteCommand, input) => runSshCaptureRemoteCommand(config, invocation, remoteCommand, input),
streamRemoteCommand: (remoteCommand, handlers, input) => runSshStreamRemoteCommand(config, invocation, remoteCommand, handlers, input),
};
return await runSshFileTransferOperation(invocation, normalizedArgs, executor, {
buildRouteCommand: remoteCommandForRoute,