diff --git a/scripts/src/artifact-registry.ts b/scripts/src/artifact-registry.ts index 683ccbfe..1e9cd2c0 100644 --- a/scripts/src/artifact-registry.ts +++ b/scripts/src/artifact-registry.ts @@ -1685,8 +1685,6 @@ function downloadRemoteFile(options: ArtifactRegistryOptions, remotePath: string "ssh", options.providerId, "download", - "--chunk-bytes", - "1048576", remotePath, localPath, ]; diff --git a/scripts/src/help.ts b/scripts/src/help.ts index 5bc507b6..1cbbc0e8 100644 --- a/scripts/src/help.ts +++ b/scripts/src/help.ts @@ -22,7 +22,7 @@ export function rootHelp(): unknown { { command: "trans [operation args...] (alias of ssh ...)", description: "Open a Host SSH / WSL SSH maintenance session; provider WebSocket carries control and host.ssh.tcp-pool carries stdin/stdout/stderr data." }, { command: "trans gh:/owner/repo[/pr|/issue][/number[/1]] ls|cat|rg|patch-apply", description: "Treat GitHub PRs/issues as virtual text directories; `ls --full` shows state/floors/body length, and `patch-apply` updates first-floor `body.md` through UniDesk gh plus apply-patch v2." }, { command: "trans apply-patch < patch.diff", description: "Default remote text patch entry: apply a standard patch with the local TypeScript v2 engine while the remote route only reads and writes files." }, - { command: "trans upload | trans download ", description: "Transfer whole files through SSH passthrough with remote temp files, automatic endpoint byte/SHA-256 verification, and client-side chunk fallback." }, + { command: "trans upload | trans download ", description: "Transfer whole files through SSH passthrough with automatic endpoint byte/SHA-256 verification; downloads stream stdout over host.ssh.tcp-pool." }, { command: "trans apply-patch-v1 [tool args...] < patch.diff", description: "Fallback to the injected legacy remote apply_patch helper directly over SSH passthrough and stream the patch from local stdin." }, { command: "trans py [script-args...] < script.py", description: "Run remote Python from local stdin through SSH passthrough without nested shell quoting; extra args become script argv." }, { command: "trans script [--shell sh|bash] [script-args...] <<'SCRIPT' ...", description: "Run a remote shell script from local stdin using shell -s; default sh inherits provider proxy env and gets the portable printf helper used by shell/script." }, diff --git a/scripts/src/jobs.ts b/scripts/src/jobs.ts index 2d1101f4..d916cf93 100644 --- a/scripts/src/jobs.ts +++ b/scripts/src/jobs.ts @@ -496,7 +496,14 @@ function genericJobProgress(job: JobRecord, stderrTailOverride?: string): JobPro }); const downloadSummary = lastDownload === null ? null - : ` ssh-download ${Number(lastDownload.actualBytes ?? 0)}/${Number(lastDownload.expectedBytes ?? 0)} bytes chunks=${Number(lastDownload.chunks ?? 0)}${downloadRetryEvents.length === 0 ? "" : ` retries=${downloadRetryEvents.length} lastRetry=${String(lastDownload.reason ?? "unknown")}`}`; + : [ + ` ssh-download ${Number(lastDownload.actualBytes ?? lastDownload.bytes ?? 0)}/${Number(lastDownload.expectedBytes ?? lastDownload.totalBytes ?? 0)} bytes`, + `chunks=${Number(lastDownload.chunks ?? 0)}`, + `remaining=${Number(lastDownload.remainingBytes ?? 0)}`, + `throughput=${Number(lastDownload.throughputBytesPerSecond ?? 0)}Bps`, + `elapsedMs=${Number(lastDownload.elapsedMs ?? 0)}`, + downloadRetryEvents.length === 0 ? null : `retries=${downloadRetryEvents.length} lastRetry=${String(lastDownload.reason ?? "unknown")}`, + ].filter(Boolean).join(" "); return { kind: "generic", stage: lastDownload === null ? null : "ssh-download", diff --git a/scripts/src/remote.ts b/scripts/src/remote.ts index 99845869..455a8392 100644 --- a/scripts/src/remote.ts +++ b/scripts/src/remote.ts @@ -24,7 +24,12 @@ import { wrapSshRemoteCommand, type SshCaptureResult, } from "./ssh"; -import { isSshFileTransferOperation, runSshFileTransferOperation, type SshRemoteCommandExecutor } from "./ssh-file-transfer"; +import { + isSshFileTransferOperation, + runSshFileTransferOperation, + type SshRemoteCommandExecutor, + type SshRemoteCommandStreamHandlers, +} from "./ssh-file-transfer"; import { runApplyPatchV2, type ApplyPatchV2Executor } from "./apply-patch-v2"; import { codexJudgeQueryAsync, codexOutputQueryAsync, codexPrPreflightQueryAsync, codexQueuesQueryAsync, codexTaskQueryAsync, codexTasksQueryAsync, codexUnreadTriageAsync } from "./code-queue"; import { runDecisionCenterCommandAsync } from "./decision-center"; @@ -1194,6 +1199,12 @@ async function runRemoteSshWebSocketCaptureRemoteCommand( const sendInput = (value: Buffer | string): void => { sendWhenSessionReady({ type: "ssh.input", data: Buffer.from(value).toString("base64"), encoding: "base64" }); }; + const sendInputChunked = (value: string): void => { + const buffer = Buffer.from(value, "utf8"); + for (let offset = 0; offset < buffer.length; offset += remoteSshInputChunkBytes) { + sendInput(buffer.subarray(offset, Math.min(buffer.length, offset + remoteSshInputChunkBytes))); + } + }; const flush = (): void => { while (pending.length > 0 && ws.readyState === WebSocket.OPEN) ws.send(pending.shift()!); }; @@ -1294,6 +1305,193 @@ async function runRemoteSshWebSocketCaptureRemoteCommand( }); } +async function runRemoteSshWebSocketStreamRemoteCommand( + session: FrontendSession, + invocation: ReturnType, + remoteCommand: string, + handlers: SshRemoteCommandStreamHandlers, + input?: string, +): Promise { + const streamInvocation = { + ...invocation, + parsed: { ...invocation.parsed, remoteCommand, requiresStdin: input !== undefined, invocationKind: "helper" as const }, + }; + const startedAtMs = Date.now(); + const size = { + cols: Number(process.stdout.columns) > 0 ? Number(process.stdout.columns) : 100, + rows: Number(process.stdout.rows) > 0 ? Number(process.stdout.rows) : 30, + }; + const runtimeTimeoutMs = sshRuntimeTimeoutMs(); + const payload = { + providerId: invocation.providerId, + command: wrapSshRemoteCommand(remoteCommand), + cwd: sshRoutePayloadCwd(invocation.route), + tty: false, + stdinEotOnEnd: false, + openTimeoutMs: Math.max(15000, Number(process.env.UNIDESK_SSH_OPEN_TIMEOUT_MS || 60000)), + runtimeTimeoutMs, + cols: size.cols, + rows: size.rows, + }; + const ws = openFrontendSshWebSocket(session); + let exitCode = 255; + let settled = false; + let canSend = false; + let sessionReady = false; + let stdout = ""; + let stderr = ""; + let streamError: unknown = null; + let streamWrites = Promise.resolve(); + const pending: string[] = []; + const pendingSessionMessages: string[] = []; + + const send = (value: unknown): void => { + const text = JSON.stringify(value); + if (!canSend || ws.readyState !== WebSocket.OPEN) { + pending.push(text); + return; + } + ws.send(text); + }; + const sendWhenSessionReady = (value: unknown): void => { + const text = JSON.stringify(value); + if (!sessionReady || ws.readyState !== WebSocket.OPEN) { + pendingSessionMessages.push(text); + return; + } + ws.send(text); + }; + const sendInput = (value: Buffer | string): void => { + sendWhenSessionReady({ type: "ssh.input", data: Buffer.from(value).toString("base64"), encoding: "base64" }); + }; + const sendInputChunked = (value: string): void => { + const buffer = Buffer.from(value, "utf8"); + for (let offset = 0; offset < buffer.length; offset += remoteSshInputChunkBytes) { + sendInput(buffer.subarray(offset, Math.min(buffer.length, offset + remoteSshInputChunkBytes))); + } + }; + const flush = (): void => { + while (pending.length > 0 && ws.readyState === WebSocket.OPEN) ws.send(pending.shift()!); + }; + const flushSessionMessages = (): void => { + if (!sessionReady || ws.readyState !== WebSocket.OPEN) return; + while (pendingSessionMessages.length > 0) ws.send(pendingSessionMessages.shift()!); + }; + const queueStreamWrite = (chunk: Buffer, stream: "stdout" | "stderr"): void => { + streamWrites = streamWrites.then(async () => { + if (stream === "stdout") await handlers.onStdout(chunk); + else if (handlers.onStderr !== undefined) await handlers.onStderr(chunk); + }).catch((error) => { + streamError = error; + stderr += `unidesk remote frontend ssh stream sink failed: ${error instanceof Error ? error.message : String(error)}\n`; + exitCode = 255; + try { + ws.close(); + } catch { + // Ignore close failures after the local stream sink has failed. + } + }); + }; + + return await new Promise((resolve) => { + let killTimer: ReturnType | null = null; + const finish = (code: number): void => { + if (settled) return; + settled = true; + clearTimeout(openTimer); + clearTimeout(runtimeTimer); + if (killTimer !== null) clearTimeout(killTimer); + void streamWrites.then(() => { + const finalCode = streamError === null ? code : 255; + const timingHint = formatSshRuntimeTimingHint(sshRuntimeTimingHint({ + invocation: streamInvocation, + transport: "frontend-websocket", + exitCode: finalCode, + startedAtMs, + })); + if (timingHint) stderr += timingHint; + resolve({ exitCode: finalCode, stdout, stderr }); + }); + }; + const openTimer = setTimeout(() => { + if (sessionReady || settled) return; + stderr += "unidesk remote frontend ssh bridge timed out waiting for provider session\n"; + exitCode = 255; + try { + ws.close(); + } catch { + // Ignore. + } + }, payload.openTimeoutMs); + const runtimeTimer = setTimeout(() => { + if (settled) return; + exitCode = 124; + stderr += formatSshRuntimeTimeoutHint(sshRuntimeTimeoutHint({ + invocation: streamInvocation, + transport: "frontend-websocket", + timeoutMs: runtimeTimeoutMs, + })); + try { + ws.close(); + } catch { + // Ignore. + } + killTimer = setTimeout(() => finish(124), 2000); + finish(124); + }, runtimeTimeoutMs); + + ws.addEventListener("open", () => { + canSend = true; + send({ type: "ssh.open", ...payload }); + flush(); + }); + ws.addEventListener("message", (event) => { + const text = webSocketDataText(event.data); + let message: Record; + try { + message = JSON.parse(text) as Record; + } catch { + stderr += `${text}\n`; + return; + } + if (message.type === "ssh.dispatched") return; + if (message.type === "ssh.opened") { + sessionReady = true; + clearTimeout(openTimer); + if (input !== undefined) sendInputChunked(input); + sendWhenSessionReady({ type: "ssh.eof" }); + flushSessionMessages(); + return; + } + if (message.type === "ssh.data") { + const chunk = Buffer.from(String(message.data ?? ""), message.encoding === "base64" ? "base64" : "utf8"); + if (message.stream === "stderr") { + stderr += chunk.toString("utf8"); + queueStreamWrite(chunk, "stderr"); + } else { + queueStreamWrite(chunk, "stdout"); + } + return; + } + if (message.type === "ssh.error") { + stderr += `${String(message.message || "ssh bridge error")}\n`; + exitCode = 255; + ws.close(); + return; + } + if (message.type === "ssh.exit") { + exitCode = Number.isInteger(message.exitCode) ? Number(message.exitCode) : 255; + ws.close(); + } + }); + ws.addEventListener("close", () => finish(exitCode)); + ws.addEventListener("error", () => { + stderr += "unidesk remote frontend ssh bridge websocket error\n"; + finish(255); + }); + }); +} + export function remoteSshFrontendPlanForTest(target: string, args: string[]): Record { const invocation = parseSshInvocation(target, args); return { @@ -1316,6 +1514,7 @@ async function runRemoteSshOverFrontend(session: FrontendSession, target: string if (isSshFileTransferOperation(normalizedArgs)) { const executor: SshRemoteCommandExecutor = { runRemoteCommand: (remoteCommand, input) => runRemoteSshWebSocketCaptureRemoteCommand(session, invocation, remoteCommand, input), + streamRemoteCommand: (remoteCommand, handlers, input) => runRemoteSshWebSocketStreamRemoteCommand(session, invocation, remoteCommand, handlers, input), }; return await runSshFileTransferOperation(invocation, normalizedArgs, executor, { buildRouteCommand: remoteCommandForRoute, diff --git a/scripts/src/ssh-file-transfer.ts b/scripts/src/ssh-file-transfer.ts index c46c4e15..06363471 100644 --- a/scripts/src/ssh-file-transfer.ts +++ b/scripts/src/ssh-file-transfer.ts @@ -1,10 +1,18 @@ import { createHash, randomBytes } from "node:crypto"; -import { mkdir, readFile, writeFile } from "node:fs/promises"; +import { once } from "node:events"; +import { createWriteStream } from "node:fs"; +import { mkdir, readFile, rm } from "node:fs/promises"; import path from "node:path"; import type { ParsedSshInvocation, ParsedSshRoute, SshCaptureResult } from "./ssh"; +export interface SshRemoteCommandStreamHandlers { + onStdout(chunk: Buffer): Promise | void; + onStderr?(chunk: Buffer): Promise | void; +} + export interface SshRemoteCommandExecutor { runRemoteCommand(remoteCommand: string, input?: string): Promise; + streamRemoteCommand?(remoteCommand: string, handlers: SshRemoteCommandStreamHandlers, input?: string): Promise; } export interface SshFileTransferCommandBuilders { @@ -14,7 +22,6 @@ export interface SshFileTransferCommandBuilders { type SshFileTransferOperation = | "stat" - | "read-b64-block" | "write-b64-argv" | "write-b64-stdin" | "write-b64-begin" @@ -25,7 +32,6 @@ interface SshFileTransferCliOptions { action: "upload" | "download"; localPath: string; remotePath: string; - readBlockBytes: number; } interface SshFileTransferStat { @@ -43,6 +49,17 @@ interface SshFileTransferWriteResult { chunks: number; } +interface SshFileTransferDownloadResult { + remote: SshFileTransferStat; + local: SshFileTransferStat; + strategy: "tcp-pool-stdout-stream"; + transport: "host.ssh.tcp-pool"; + chunks: number; + elapsedMs: number; + throughputBytesPerSecond: number; + remainingBytes: number; +} + class SshFileTransferError extends Error { constructor(message: string, public readonly details: Record = {}) { super(message); @@ -50,12 +67,9 @@ class SshFileTransferError extends Error { } } -const fileTransferReadBlockBytes = 1_048_576; const fileTransferWriteB64ArgvLimit = 48_000; const fileTransferWriteRawChunkBytes = 1_048_576; const fileTransferWriteB64ChunkChars = 1_398_104; -const fileTransferReadBlockMaxAttempts = 12; -const fileTransferReadEmptyRetryDelayMs = 250; const fileTransferProgressEveryChunks = 16; export function isSshFileTransferOperation(args: string[]): boolean { @@ -97,15 +111,10 @@ export async function runSshFileTransferOperation( return 0; } - const read = await readRemoteFileVerified(invocation, executor, builders, options.remotePath, options.readBlockBytes); - await mkdir(path.dirname(localPath), { recursive: true }); - await writeFile(localPath, read.content); - const local = await readFile(localPath); - const localStat = { bytes: local.length, sha256: sha256HexBuffer(local) }; - assertTransferStat("download final local verification", localPath, read.remote, localStat); + const read = await downloadRemoteFileVerified(invocation, executor, builders, options.remotePath, localPath); const verification = buildTransferVerification( { side: "remote", path: options.remotePath, ...read.remote }, - { side: "local", path: localPath, ...localStat }, + { side: "local", path: localPath, ...read.local }, ); process.stdout.write(`${JSON.stringify({ ok: true, @@ -119,9 +128,12 @@ export async function runSshFileTransferOperation( verified: true, verification, transfer: { - strategy: "chunked-read", + strategy: read.strategy, + transport: read.transport, chunks: read.chunks, - chunkBytes: options.readBlockBytes, + elapsedMs: read.elapsedMs, + throughputBytesPerSecond: read.throughputBytesPerSecond, + remainingBytes: read.remainingBytes, }, }, null, 2)}\n`); return 0; @@ -131,7 +143,6 @@ function parseSshFileTransferCliOptions(args: string[]): SshFileTransferCliOptio const action = args[0]; if (action !== "upload" && action !== "download") throw new Error("ssh file transfer requires upload or download"); const positionals: string[] = []; - let readBlockBytes = fileTransferReadBlockBytes; for (let index = 1; index < args.length; index += 1) { const arg = args[index] ?? ""; if (arg === "--") { @@ -139,14 +150,10 @@ function parseSshFileTransferCliOptions(args: string[]): SshFileTransferCliOptio break; } if (arg === "--chunk-bytes" || arg === "--block-bytes") { - const value = args[index + 1]; - if (value === undefined) throw new Error(`ssh ${action} ${arg} requires a value`); - readBlockBytes = boundedTransferChunkBytes(value, `ssh ${action} ${arg}`); - index += 1; - continue; + throw new Error(`unsupported ssh ${action} option: ${arg}; downloads use host.ssh.tcp-pool stdout streaming and no longer support the legacy base64 block reader`); } if (arg === "--help" || arg === "-h" || arg === "help") { - throw new Error(`ssh ${action} usage: trans ${action} ${action === "upload" ? " " : " "} [--chunk-bytes N]`); + throw new Error(`ssh ${action} usage: trans ${action} ${action === "upload" ? " " : " "}`); } if (arg.startsWith("-")) throw new Error(`unsupported ssh ${action} option: ${arg}`); positionals.push(arg); @@ -157,14 +164,8 @@ function parseSshFileTransferCliOptions(args: string[]): SshFileTransferCliOptio const [first, second] = positionals; if (!first || !second) throw new Error(`ssh ${action} paths must be non-empty`); return action === "upload" - ? { action, localPath: first, remotePath: second, readBlockBytes } - : { action, remotePath: first, localPath: second, readBlockBytes }; -} - -function boundedTransferChunkBytes(raw: string, option: string): number { - const value = Number(raw); - if (!Number.isInteger(value) || value <= 0) throw new Error(`${option} must be a positive integer`); - return Math.min(4 * 1024 * 1024, Math.max(1024, value)); + ? { action, localPath: first, remotePath: second } + : { action, remotePath: first, localPath: second }; } async function writeRemoteFileVerified( @@ -203,150 +204,117 @@ async function writeRemoteFileVerified( return { strategy: "chunked-stdin", chunks }; } -async function readRemoteFileVerified( +async function downloadRemoteFileVerified( invocation: ParsedSshInvocation, executor: SshRemoteCommandExecutor, builders: SshFileTransferCommandBuilders, remotePath: string, - readBlockBytes: number, -): Promise<{ remote: SshFileTransferStat; content: Buffer; chunks: number }> { + localPath: string, +): Promise { + if (executor.streamRemoteCommand === undefined) { + throw new SshFileTransferError("ssh download requires a tcp-pool stdout stream sink", { + route: invocation.route.raw, + providerId: invocation.providerId, + remotePath, + localPath, + requiredTransport: "host.ssh.tcp-pool", + blocker: "streamRemoteCommand unavailable", + }); + } + if (invocation.route.plane === "win") { + throw new SshFileTransferError("ssh download requires tcp-pool stdout streaming; win routes are not supported by the safe binary stream path", { + route: invocation.route.raw, + providerId: invocation.providerId, + remotePath, + localPath, + requiredTransport: "host.ssh.tcp-pool", + blocker: "win route has no verified binary stdout stream path", + }); + } const remote = await statRemoteFile(invocation, executor, builders, remotePath); - const chunks: Buffer[] = []; + await mkdir(path.dirname(localPath), { recursive: true }); + const remoteCommand = buildStreamDownloadRemoteCommand(invocation.route, builders, remotePath); + const startedAtMs = Date.now(); + const hash = createHash("sha256"); + const output = createWriteStream(localPath, { flags: "w", mode: 0o600 }); let actualBytes = 0; let chunkCount = 0; - for (let blockIndex = 0; blockIndex * readBlockBytes < remote.bytes; blockIndex += 1) { - const offsetBytes = blockIndex * readBlockBytes; - const expectedChunkBytes = Math.min(readBlockBytes, remote.bytes - offsetBytes); - const chunk = await readRemoteBase64BlockWithRetry(invocation, executor, builders, remotePath, readBlockBytes, blockIndex, expectedChunkBytes, remote.bytes, actualBytes); - chunks.push(chunk); - actualBytes += chunk.length; - chunkCount += 1; - emitDownloadProgress(invocation, remotePath, blockIndex, chunkCount, actualBytes, remote.bytes, chunk.length); - } - const content = Buffer.concat(chunks); - const actual = { bytes: content.length, sha256: sha256HexBuffer(content) }; - assertTransferStat("download remote read verification", remotePath, remote, actual); - return { remote, content, chunks: chunkCount }; -} - -async function readRemoteBase64BlockWithRetry( - invocation: ParsedSshInvocation, - executor: SshRemoteCommandExecutor, - builders: SshFileTransferCommandBuilders, - remotePath: string, - readBlockBytes: number, - blockIndex: number, - expectedChunkBytes: number, - expectedBytes: number, - actualBytes: number, -): Promise { - const attemptErrors: Array> = []; - for (let attempt = 1; attempt <= fileTransferReadBlockMaxAttempts; attempt += 1) { - try { - const read = await checkedFileTransfer(invocation, executor, builders, "read-b64-block", [remotePath, String(blockIndex), String(readBlockBytes)]); - const encoded = read.stdout.replace(/\s+/gu, ""); - const chunk = decodeRemoteBase64Block(encoded); - if (chunk !== null && chunk.length === expectedChunkBytes) return chunk; - const reason = chunk === null ? "invalid-base64" : chunk.length === 0 ? "empty-read" : "short-read"; - attemptErrors.push({ - attempt, - reason, - exitCode: read.exitCode, - stdoutBytes: read.stdout.length, - encodedBytes: encoded.length, - decodedBytes: chunk?.length ?? null, - expectedChunkBytes, - stderrTail: read.stderr.slice(-500), + try { + const result = await executor.streamRemoteCommand(remoteCommand, { + onStdout: async (chunk) => { + await writeStreamChunk(output, chunk); + actualBytes += chunk.length; + chunkCount += 1; + hash.update(chunk); + emitDownloadProgress(invocation, remotePath, chunkCount, actualBytes, remote.bytes, chunk.length, startedAtMs); + }, + }); + await closeWriteStream(output); + if (result.exitCode !== 0) { + throw new SshFileTransferError("remote ssh download stream failed", { + route: invocation.route.raw, + providerId: invocation.providerId, + remotePath, + localPath, + requiredTransport: "host.ssh.tcp-pool", + exitCode: result.exitCode, + stdout: transferTextSnapshot(result.stdout, { headChars: 120, tailChars: 500 }), + stderr: transferTextSnapshot(result.stderr, { headChars: 120, tailChars: 1000 }), }); - if (attempt < fileTransferReadBlockMaxAttempts) { - emitReadRetryProgress(invocation, remotePath, blockIndex, attempt, expectedBytes, actualBytes, expectedChunkBytes, chunk?.length ?? null, reason); - await delayMs(fileTransferReadEmptyRetryDelayMs); - } - } catch (error) { - attemptErrors.push({ - attempt, - error: error instanceof Error ? error.message : String(error), - expectedChunkBytes, - }); - if (attempt < fileTransferReadBlockMaxAttempts) { - emitReadRetryProgress(invocation, remotePath, blockIndex, attempt, expectedBytes, actualBytes, expectedChunkBytes, null, "remote-error"); - await delayMs(fileTransferReadEmptyRetryDelayMs); - } } + const elapsedMs = Math.max(1, Date.now() - startedAtMs); + const local = { bytes: actualBytes, sha256: hash.digest("hex") }; + assertTransferStat("download tcp-pool stdout stream verification", localPath, remote, local); + emitDownloadProgress(invocation, remotePath, Math.max(1, chunkCount), actualBytes, remote.bytes, 0, startedAtMs, true); + return { + remote, + local, + strategy: "tcp-pool-stdout-stream", + transport: "host.ssh.tcp-pool", + chunks: chunkCount, + elapsedMs, + throughputBytesPerSecond: Math.round((actualBytes * 1000) / elapsedMs), + remainingBytes: Math.max(0, remote.bytes - actualBytes), + }; + } catch (error) { + output.destroy(); + await rm(localPath, { force: true }).catch(() => undefined); + throw error; } - throw new SshFileTransferError("remote download returned an invalid block before EOF after retries", { - route: invocation.route.raw, - remotePath, - blockIndex, - attempts: fileTransferReadBlockMaxAttempts, - expectedBytes, - actualBytes, - expectedChunkBytes, - attemptErrors, - }); -} - -function decodeRemoteBase64Block(encoded: string): Buffer | null { - if (encoded.length === 0) return Buffer.alloc(0); - if (!/^[A-Za-z0-9+/]*={0,2}$/u.test(encoded) || encoded.length % 4 !== 0) return null; - return Buffer.from(encoded, "base64"); } function emitDownloadProgress( invocation: ParsedSshInvocation, remotePath: string, - blockIndex: number, chunkCount: number, actualBytes: number, expectedBytes: number, lastChunkBytes: number, + startedAtMs: number, + force = false, ): void { - if (chunkCount !== 1 && actualBytes < expectedBytes && chunkCount % fileTransferProgressEveryChunks !== 0) return; + if (!force && chunkCount !== 1 && actualBytes < expectedBytes && chunkCount % fileTransferProgressEveryChunks !== 0) return; + const elapsedMs = Math.max(1, Date.now() - startedAtMs); process.stderr.write(`${JSON.stringify({ event: "unidesk.ssh.download.progress", + at: new Date().toISOString(), route: invocation.route.raw, providerId: invocation.providerId, remotePath, - blockIndex, + strategy: "tcp-pool-stdout-stream", + transport: "host.ssh.tcp-pool", chunks: chunkCount, + bytes: actualBytes, + totalBytes: expectedBytes, actualBytes, expectedBytes, lastChunkBytes, + remainingBytes: Math.max(0, expectedBytes - actualBytes), + elapsedMs, + throughputBytesPerSecond: Math.round((actualBytes * 1000) / elapsedMs), })}\n`); } -function emitReadRetryProgress( - invocation: ParsedSshInvocation, - remotePath: string, - blockIndex: number, - attempt: number, - expectedBytes: number, - actualBytes: number, - expectedChunkBytes: number, - actualChunkBytes: number | null, - reason: string, -): void { - const event = reason === "empty-read" ? "unidesk.ssh.download.empty-read-retry" : "unidesk.ssh.download.short-read-retry"; - process.stderr.write(`${JSON.stringify({ - event, - route: invocation.route.raw, - providerId: invocation.providerId, - remotePath, - blockIndex, - attempt, - maxAttempts: fileTransferReadBlockMaxAttempts, - reason, - actualBytes, - expectedBytes, - actualChunkBytes, - expectedChunkBytes, - })}\n`); -} - -function delayMs(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)); -} - async function statRemoteFile( invocation: ParsedSshInvocation, executor: SshRemoteCommandExecutor, @@ -357,6 +325,31 @@ async function statRemoteFile( return parseFileTransferStat(stat.stdout, stat.stderr, remotePath); } +function buildStreamDownloadRemoteCommand( + route: ParsedSshRoute, + builders: SshFileTransferCommandBuilders, + remotePath: string, +): string { + return builders.buildRouteCommand(route, ["sh", "-c", "cat -- \"$1\"", "unidesk-download", remotePath]); +} + +async function writeStreamChunk(stream: ReturnType, chunk: Buffer): Promise { + if (chunk.length === 0) return; + await new Promise((resolve, reject) => { + stream.write(chunk, (error) => { + if (error) reject(error); + else resolve(); + }); + }); +} + +async function closeWriteStream(stream: ReturnType): Promise { + if (stream.closed) return; + const closed = once(stream, "close"); + stream.end(); + await closed; +} + async function checkedFileTransfer( invocation: ParsedSshInvocation, executor: SshRemoteCommandExecutor, @@ -486,11 +479,6 @@ function posixFileTransferScript(): string { " stat)", " target=$1; bytes=$(wc -c < \"$target\" | tr -d '[:space:]'); digest=$(sha256_file \"$target\"); printf '%s %s\\n' \"$bytes\" \"$digest\"", " ;;", - " read-b64-block)", - " target=$1; block_index=$2; block_size=$3", - " case \"$block_index:$block_size\" in *[!0-9:]*|:*) printf 'invalid read block args\\n' >&2; exit 2;; esac", - " dd if=\"$target\" bs=\"$block_size\" skip=\"$block_index\" count=1 2>/dev/null | base64 | tr -d '\\n'", - " ;;", " write-b64-argv)", " target=$1; expected_bytes=$2; expected_sha256=$3; shift 3", " ensure_parent \"$target\"; base=${target##*/}; dir=.; case \"$target\" in */*) dir=${target%/*};; esac", @@ -563,7 +551,6 @@ function windowsFileTransferScript(basePath: string | null, operation: SshFileTr "$target = Resolve-UnideskPath $targetArg;", "switch ($operation) {", " 'stat' { if (-not (Test-Path -LiteralPath $target -PathType Leaf)) { Fail ('file not found: ' + $target) 1 }; $bytes = ([System.IO.FileInfo]$target).Length; $digest = Get-Sha256 $target; [Console]::Out.WriteLine(([string]$bytes) + ' ' + $digest); break }", - " 'read-b64-block' { $blockIndex = [Int64]$arg1; $blockSize = [Int32]$arg2; if ($blockIndex -lt 0 -or $blockSize -le 0) { Fail 'invalid read block args' 2 }; $fs = [System.IO.File]::Open($target, [System.IO.FileMode]::Open, [System.IO.FileAccess]::Read, [System.IO.FileShare]::ReadWrite); try { [void]$fs.Seek($blockIndex * $blockSize, [System.IO.SeekOrigin]::Begin); $buffer = New-Object byte[] $blockSize; $read = $fs.Read($buffer, 0, $blockSize); if ($read -gt 0) { [Console]::Out.Write([Convert]::ToBase64String($buffer, 0, $read)) } } finally { $fs.Dispose() }; break }", " 'write-b64-argv' { Decode-ToTarget $target ([string]::Concat($argvChunks)) ([Int64]$arg1) $arg2; break }", " 'write-b64-stdin' { Decode-ToTarget $target ([Console]::In.ReadToEnd()) ([Int64]$arg1) $arg2; break }", " 'write-b64-begin' { Ensure-Parent $target; Set-TmpPaths $target $arg1; [System.IO.File]::WriteAllText($script:tmpB64, '', [System.Text.Encoding]::ASCII); break }", diff --git a/scripts/src/ssh.ts b/scripts/src/ssh.ts index ca06368c..8a0d3682 100644 --- a/scripts/src/ssh.ts +++ b/scripts/src/ssh.ts @@ -13,7 +13,12 @@ import { type ApplyPatchV2Executor, type ApplyPatchV2FileSystem, } from "./apply-patch-v2"; -import { isSshFileTransferOperation, runSshFileTransferOperation, type SshRemoteCommandExecutor } from "./ssh-file-transfer"; +import { + isSshFileTransferOperation, + runSshFileTransferOperation, + type SshRemoteCommandExecutor, + type SshRemoteCommandStreamHandlers, +} from "./ssh-file-transfer"; export interface ParsedSshArgs { remoteCommand: string | null; @@ -2843,6 +2848,128 @@ async function runSshCaptureRemoteCommand(config: UniDeskConfig, invocation: Par }); } +async function runSshStreamRemoteCommand( + config: UniDeskConfig, + invocation: ParsedSshInvocation, + remoteCommand: string, + handlers: SshRemoteCommandStreamHandlers, + input?: string, +): Promise { + const streamInvocation = { + ...invocation, + parsed: { ...invocation.parsed, remoteCommand, requiresStdin: input !== undefined, invocationKind: "helper" as const }, + }; + const startedAtMs = Date.now(); + const size = terminalSize(); + const runtimeTimeoutMs = sshRuntimeTimeoutMs(); + const payload = { + providerId: invocation.providerId, + command: wrapSshRemoteCommand(remoteCommand), + cwd: sshRoutePayloadCwd(invocation.route), + tty: false, + stdinEotOnEnd: false, + openTimeoutMs: Math.max(15000, Number(process.env.UNIDESK_SSH_OPEN_TIMEOUT_MS || 60000)), + runtimeTimeoutMs, + cols: size.cols, + rows: size.rows, + }; + const payloadJson = JSON.stringify(payload); + const encodedBrokerSource = Buffer.from(brokerSource(), "utf8").toString("base64"); + const script = [ + "set -eu", + `payload=${shellQuote(payloadJson)}`, + "if command -v backend-core >/dev/null 2>&1; then", + ' exec backend-core --ssh-broker "$payload"', + "fi", + `export UNIDESK_SSH_BROKER_URL=${shellQuote("ws://127.0.0.1:8080/ws/ssh")}`, + 'broker_js="$(mktemp "${TMPDIR:-/tmp}/unidesk-ssh-broker.XXXXXX")"', + 'trap \'rm -f "$broker_js"\' EXIT', + `printf %s ${shellQuote(encodedBrokerSource)} | base64 -d >"$broker_js"`, + 'bun "$broker_js" "$payload"', + ].join("\n"); + const child = spawn("docker", ["exec", "-i", "unidesk-backend-core", "sh", "-c", script], { + cwd: repoRoot, + stdio: ["pipe", "pipe", "pipe"], + }); + if (input !== undefined) { + writeChunkedStdin(child.stdin, input); + } else { + child.stdin.end(); + } + let stdout = ""; + let stderr = ""; + let streamError: unknown = null; + let streamWrites = Promise.resolve(); + const queueStreamWrite = (chunk: Buffer, stream: "stdout" | "stderr"): void => { + streamWrites = streamWrites.then(async () => { + if (stream === "stdout") await handlers.onStdout(chunk); + else if (handlers.onStderr !== undefined) await handlers.onStderr(chunk); + }).catch((error) => { + streamError = error; + stderr += `unidesk ssh stream sink failed: ${error instanceof Error ? error.message : String(error)}\n`; + try { + child.kill("SIGTERM"); + } catch { + // Ignore kill failures after a local stream sink error. + } + }); + }; + child.stdout.on("data", (chunk: Buffer) => { + queueStreamWrite(chunk, "stdout"); + }); + child.stderr.on("data", (chunk: Buffer) => { + stderr += chunk.toString("utf8"); + queueStreamWrite(chunk, "stderr"); + }); + return await new Promise((resolve) => { + let settled = false; + let killTimer: NodeJS.Timeout | null = null; + const finish = (exitCode: number): void => { + if (settled) return; + settled = true; + clearTimeout(runtimeTimer); + if (killTimer !== null) clearTimeout(killTimer); + void streamWrites.then(() => { + const finalCode = streamError === null ? exitCode : 255; + const timingHint = formatSshRuntimeTimingHint(sshRuntimeTimingHint({ + invocation: streamInvocation, + transport: "backend-core-broker", + exitCode: finalCode, + startedAtMs, + })); + if (timingHint) stderr += timingHint; + resolve({ exitCode: finalCode, stdout, stderr }); + }); + }; + const runtimeTimer = setTimeout(() => { + const hint = formatSshRuntimeTimeoutHint(sshRuntimeTimeoutHint({ + invocation: streamInvocation, + transport: "backend-core-broker", + timeoutMs: runtimeTimeoutMs, + })); + stderr += hint; + try { + child.kill("SIGTERM"); + } catch { + // Ignore. + } + killTimer = setTimeout(() => { + try { + child.kill("SIGKILL"); + } catch { + // Ignore. + } + }, 2000); + finish(124); + }, runtimeTimeoutMs); + child.on("error", (error) => { + stderr += `unidesk ssh failed to start broker: ${error.message}\n`; + finish(255); + }); + child.on("close", (code) => finish(code ?? 255)); + }); +} + async function runRemoteSsh(config: UniDeskConfig, host: string, providerId: string, args: string[]): Promise { const { runRemoteCli } = await import("./remote"); return await runRemoteCli({ @@ -2950,6 +3077,7 @@ export async function runSsh(config: UniDeskConfig, providerId: string, args: st if (isSshFileTransferOperation(normalizedArgs)) { const executor: SshRemoteCommandExecutor = { runRemoteCommand: (remoteCommand, input) => runSshCaptureRemoteCommand(config, invocation, remoteCommand, input), + streamRemoteCommand: (remoteCommand, handlers, input) => runSshStreamRemoteCommand(config, invocation, remoteCommand, handlers, input), }; return await runSshFileTransferOperation(invocation, normalizedArgs, executor, { buildRouteCommand: remoteCommandForRoute,