fix: keep active ssh downloads alive

This commit is contained in:
Codex
2026-06-11 18:08:56 +00:00
parent 0a9d07c760
commit a8d49845ca
5 changed files with 132 additions and 23 deletions
+5 -2
View File
@@ -1679,25 +1679,28 @@ function combineCommandResults(command: string[], parts: CommandResult[]): Comma
}
function downloadRemoteFile(options: ArtifactRegistryOptions, remotePath: string, localPath: string, timeoutMs = options.timeoutMs): CommandResult {
const transferRuntimeTimeoutMs = Math.max(timeoutMs * 4, 60 * 60_000);
const command = [
process.execPath,
"scripts/cli.ts",
"ssh",
options.providerId,
"download",
"--inactivity-timeout-ms",
String(timeoutMs),
remotePath,
localPath,
];
const stdoutFile = process.env.UNIDESK_JOB_STDOUT_FILE;
const stderrFile = process.env.UNIDESK_JOB_STDERR_FILE;
if (!stdoutFile || !stderrFile) return runCommand(command, repoRoot, { timeoutMs });
if (!stdoutFile || !stderrFile) return runCommand(command, repoRoot, { timeoutMs: transferRuntimeTimeoutMs });
return runCommand([
"bash",
"-lc",
`set -o pipefail; "$@" > >(tee -a ${shellQuote(stdoutFile)}) 2> >(tee -a ${shellQuote(stderrFile)} >&2)`,
"unidesk-artifact-download",
...command,
], repoRoot, { timeoutMs });
], repoRoot, { timeoutMs: transferRuntimeTimeoutMs });
}
async function runRemoteScriptBackground(
+2 -2
View File
@@ -200,7 +200,7 @@ export function sshHelp(): unknown {
"script and shell helper modes inject a tiny POSIX-compatible printf wrapper before user shell text, so portable printf headings such as `printf \"--- section ---\\n\"` work consistently under dash/sh and bash. Direct argv commands are unchanged.",
"For arbitrary stdin streams into a workload command, use a workload route plus `exec --stdin -- <command> ...`; this keeps the route as location-only and avoids heredoc/base64/tar shell wrapping.",
"`apply-patch` is the default remote text patch entry and uses the v2 local line-based patch engine with remote read/write operations, including Windows routes such as `D601:win/c/test`, so long Unicode/Chinese lines and pure insertion hunks avoid the legacy remote shell hunk parser. Plain multi-file Update File patches on POSIX host/k3s and Windows workspace routes use bulk read/write operations to avoid per-file SSH round trips. Its stdout follows Codex apply_patch text output rather than UniDesk JSON output; stderr keeps Codex-style failure text and appends one `UNIDESK_APPLY_PATCH_TIMING` JSON summary with durationMs, patchBytes, fileCount, hunkCount, changedCount, remoteOperationCount, remoteOperationCounts and remoteElapsedMs so slow patch runs can be attributed without changing success stdout.",
"`upload` and `download` are the default whole-file transfer entries for non-text and generated files. They write through remote temp files, verify byte count and SHA-256 on both sides, and return `verification.automatic=true`, `verification.verified=true`, and `verification.match.{bytes,sha256}=true`; this JSON is the transfer integrity proof, so callers do not need a separate manual `sha256sum` check. The client falls back from a single stdin payload to bounded chunks before treating provider-gateway limits as a server-side problem.",
"`upload` and `download` are the default whole-file transfer entries for non-text and generated files. They write through remote temp files, verify byte count and SHA-256 on both sides, and return `verification.automatic=true`, `verification.verified=true`, and `verification.match.{bytes,sha256}=true`; this JSON is the transfer integrity proof, so callers do not need a separate manual `sha256sum` check. Downloads stream over `host.ssh.tcp-pool`, emit progress JSON, and may receive a caller-supplied `--inactivity-timeout-ms` from async artifact/deploy jobs so active large transfers are not killed by the generic short-command budget.",
"`apply-patch-v1` is the only legacy fallback entry: it rejects low-context update hunks by default, reports the matched file:line for each hunk on stderr, and only accepts --allow-loose when the caller has manually reviewed an intentionally ambiguous insertion.",
"script defaults to target /bin/sh and inherits provider proxy variables such as HTTP_PROXY/HTTPS_PROXY/ALL_PROXY/NO_PROXY; it is for host/k3s POSIX shell only. Use --shell bash only for bash syntax such as pipefail, arrays, or [[ ... ]], not as a proxy workaround.",
"Route syntax is `{provider}:{plane}[:{scope...}] {operation} [operation-args...]`: the first argv token locates a distributed target only, and every following token belongs to the operation parser. Host workspace routes use `<provider>:/absolute/workspace`; WSL providers can use `<provider>:win ps` for Windows PowerShell and `<provider>:win cmd` for Windows cmd.exe, with `<provider>:win/c/test ...` mapping the Windows cwd to `C:\\test`; native k3s providers such as D601 and G14 use <provider>:k3s for the control plane, <provider>:k3s:<namespace>:<workload> for a workload, and <provider>:k3s:<namespace>:<workload>/<pod-workspace> for a pod workspace.",
@@ -209,7 +209,7 @@ export function sshHelp(): unknown {
"Do not put operation names in any colon route segment, including nested k3s namespace/workload/container segments.",
"Do not use post-provider shorthand such as `trans G14 k3s ...`; write `trans G14:k3s ...` so location and operation stay separated.",
"If an ssh-like remote command fails with timeout/kex/exit-255 friction, stderr includes one low-noise UNIDESK_SSH_HINT JSON line with the argv retry command.",
"Non-interactive ssh/trans/tran operations have a hard top-level runtime timeout capped at 60s. Timeout writes UNIDESK_SSH_RUNTIME_TIMEOUT or UNIDESK_TRAN_TIMEOUT_HINT and disconnects the broker; long CI/CD, trace, logs, build, or hardware work must use submit-and-poll / short query loops instead of keeping trans open.",
"Ordinary non-interactive ssh/trans/tran remote commands have a hard top-level runtime timeout capped at 60s. Timeout writes UNIDESK_SSH_RUNTIME_TIMEOUT or UNIDESK_TRAN_TIMEOUT_HINT and disconnects the broker; long CI/CD, trace, logs, build, or hardware work must use submit-and-poll / short query loops instead of keeping trans open. Whole-file `download` is the narrow exception: controlled async callers can pass `--inactivity-timeout-ms` for a verified progress-emitting tcp-pool transfer.",
"Only slow ssh/trans/tran runtime writes UNIDESK_SSH_TIMING JSON to stderr; operations over 10s are marked level=warning even when they succeed, because slow successful calls are a distributed performance monitoring signal. Check provider latency, remote command cost, helper bootstrap, or remote patch optimization before repeating high-frequency work. Routine short calls do not emit timing noise.",
"The local trans/tran wrapper must not add provider/plane directory locks; rely on k8s/Tekton/Argo/Lease or server-side TTL queues for coordination.",
"Use -- before a remote command that intentionally starts with a dash.",
+33 -3
View File
@@ -1311,6 +1311,7 @@ async function runRemoteSshWebSocketStreamRemoteCommand(
remoteCommand: string,
handlers: SshRemoteCommandStreamHandlers,
input?: string,
options: { inactivityTimeoutMs?: number } = {},
): Promise<SshCaptureResult> {
const streamInvocation = {
...invocation,
@@ -1321,7 +1322,8 @@ async function runRemoteSshWebSocketStreamRemoteCommand(
cols: Number(process.stdout.columns) > 0 ? Number(process.stdout.columns) : 100,
rows: Number(process.stdout.rows) > 0 ? Number(process.stdout.rows) : 30,
};
const runtimeTimeoutMs = sshRuntimeTimeoutMs();
const inactivityTimeoutMs = options.inactivityTimeoutMs ?? sshRuntimeTimeoutMs();
const runtimeTimeoutMs = options.inactivityTimeoutMs === undefined ? inactivityTimeoutMs : Math.max(inactivityTimeoutMs * 4, 60 * 60_000);
const payload = {
providerId: invocation.providerId,
command: wrapSshRemoteCommand(remoteCommand),
@@ -1330,6 +1332,7 @@ async function runRemoteSshWebSocketStreamRemoteCommand(
stdinEotOnEnd: false,
openTimeoutMs: Math.max(15000, Number(process.env.UNIDESK_SSH_OPEN_TIMEOUT_MS || 60000)),
runtimeTimeoutMs,
runtimeTimeoutMode: options.inactivityTimeoutMs === undefined ? "wall-clock" : "inactivity",
cols: size.cols,
rows: size.rows,
};
@@ -1395,11 +1398,32 @@ async function runRemoteSshWebSocketStreamRemoteCommand(
return await new Promise<SshCaptureResult>((resolve) => {
let killTimer: ReturnType<typeof setTimeout> | null = null;
let inactivityTimer: ReturnType<typeof setTimeout> | null = null;
const refreshActivityTimer = (): void => {
if (settled || options.inactivityTimeoutMs === undefined) return;
if (inactivityTimer !== null) clearTimeout(inactivityTimer);
inactivityTimer = setTimeout(() => {
exitCode = 124;
stderr += formatSshRuntimeTimeoutHint(sshRuntimeTimeoutHint({
invocation: streamInvocation,
transport: "frontend-websocket",
timeoutMs: inactivityTimeoutMs,
}));
try {
ws.close();
} catch {
// Ignore.
}
killTimer = setTimeout(() => finish(124), 2000);
finish(124);
}, inactivityTimeoutMs);
};
const finish = (code: number): void => {
if (settled) return;
settled = true;
clearTimeout(openTimer);
clearTimeout(runtimeTimer);
if (inactivityTimer !== null) clearTimeout(inactivityTimer);
if (killTimer !== null) clearTimeout(killTimer);
void streamWrites.then(() => {
const finalCode = streamError === null ? code : 255;
@@ -1439,6 +1463,7 @@ async function runRemoteSshWebSocketStreamRemoteCommand(
killTimer = setTimeout(() => finish(124), 2000);
finish(124);
}, runtimeTimeoutMs);
refreshActivityTimer();
ws.addEventListener("open", () => {
canSend = true;
@@ -1454,16 +1479,21 @@ async function runRemoteSshWebSocketStreamRemoteCommand(
stderr += `${text}\n`;
return;
}
if (message.type === "ssh.dispatched") return;
if (message.type === "ssh.dispatched") {
refreshActivityTimer();
return;
}
if (message.type === "ssh.opened") {
sessionReady = true;
clearTimeout(openTimer);
refreshActivityTimer();
if (input !== undefined) sendInputChunked(input);
sendWhenSessionReady({ type: "ssh.eof" });
flushSessionMessages();
return;
}
if (message.type === "ssh.data") {
refreshActivityTimer();
const chunk = Buffer.from(String(message.data ?? ""), message.encoding === "base64" ? "base64" : "utf8");
if (message.stream === "stderr") {
stderr += chunk.toString("utf8");
@@ -1514,7 +1544,7 @@ async function runRemoteSshOverFrontend(session: FrontendSession, target: string
if (isSshFileTransferOperation(normalizedArgs)) {
const executor: SshRemoteCommandExecutor = {
runRemoteCommand: (remoteCommand, input) => runRemoteSshWebSocketCaptureRemoteCommand(session, invocation, remoteCommand, input),
streamRemoteCommand: (remoteCommand, handlers, input) => runRemoteSshWebSocketStreamRemoteCommand(session, invocation, remoteCommand, handlers, input),
streamRemoteCommand: (remoteCommand, handlers, input, options) => runRemoteSshWebSocketStreamRemoteCommand(session, invocation, remoteCommand, handlers, input, options),
};
return await runSshFileTransferOperation(invocation, normalizedArgs, executor, {
buildRouteCommand: remoteCommandForRoute,
+36 -5
View File
@@ -12,7 +12,12 @@ export interface SshRemoteCommandStreamHandlers {
export interface SshRemoteCommandExecutor {
runRemoteCommand(remoteCommand: string, input?: string): Promise<SshCaptureResult>;
streamRemoteCommand?(remoteCommand: string, handlers: SshRemoteCommandStreamHandlers, input?: string): Promise<SshCaptureResult>;
streamRemoteCommand?(
remoteCommand: string,
handlers: SshRemoteCommandStreamHandlers,
input?: string,
options?: { inactivityTimeoutMs?: number },
): Promise<SshCaptureResult>;
}
export interface SshFileTransferCommandBuilders {
@@ -32,6 +37,7 @@ interface SshFileTransferCliOptions {
action: "upload" | "download";
localPath: string;
remotePath: string;
inactivityTimeoutMs?: number;
}
interface SshFileTransferStat {
@@ -111,7 +117,7 @@ export async function runSshFileTransferOperation(
return 0;
}
const read = await downloadRemoteFileVerified(invocation, executor, builders, options.remotePath, localPath);
const read = await downloadRemoteFileVerified(invocation, executor, builders, options.remotePath, localPath, options.inactivityTimeoutMs);
const verification = buildTransferVerification(
{ side: "remote", path: options.remotePath, ...read.remote },
{ side: "local", path: localPath, ...read.local },
@@ -143,6 +149,7 @@ function parseSshFileTransferCliOptions(args: string[]): SshFileTransferCliOptio
const action = args[0];
if (action !== "upload" && action !== "download") throw new Error("ssh file transfer requires upload or download");
const positionals: string[] = [];
let inactivityTimeoutMs: number | undefined;
for (let index = 1; index < args.length; index += 1) {
const arg = args[index] ?? "";
if (arg === "--") {
@@ -152,6 +159,21 @@ function parseSshFileTransferCliOptions(args: string[]): SshFileTransferCliOptio
if (arg === "--chunk-bytes" || arg === "--block-bytes") {
throw new Error(`unsupported ssh ${action} option: ${arg}; downloads use host.ssh.tcp-pool stdout streaming and no longer support the legacy base64 block reader`);
}
if (arg === "--inactivity-timeout-ms" || arg === "--runtime-timeout-ms") {
const value = args[index + 1];
if (value === undefined) throw new Error(`ssh ${action} ${arg} requires a positive integer value`);
index += 1;
inactivityTimeoutMs = parsePositiveRuntimeTimeoutMs(value, `ssh ${action} ${arg}`);
continue;
}
if (arg.startsWith("--inactivity-timeout-ms=")) {
inactivityTimeoutMs = parsePositiveRuntimeTimeoutMs(arg.slice("--inactivity-timeout-ms=".length), `ssh ${action} --inactivity-timeout-ms`);
continue;
}
if (arg.startsWith("--runtime-timeout-ms=")) {
inactivityTimeoutMs = parsePositiveRuntimeTimeoutMs(arg.slice("--runtime-timeout-ms=".length), `ssh ${action} --runtime-timeout-ms`);
continue;
}
if (arg === "--help" || arg === "-h" || arg === "help") {
throw new Error(`ssh ${action} usage: trans <route> ${action} ${action === "upload" ? "<local-file> <remote-file>" : "<remote-file> <local-file>"}`);
}
@@ -164,8 +186,16 @@ function parseSshFileTransferCliOptions(args: string[]): SshFileTransferCliOptio
const [first, second] = positionals;
if (!first || !second) throw new Error(`ssh ${action} paths must be non-empty`);
return action === "upload"
? { action, localPath: first, remotePath: second }
: { action, remotePath: first, localPath: second };
? { action, localPath: first, remotePath: second, inactivityTimeoutMs }
: { action, remotePath: first, localPath: second, inactivityTimeoutMs };
}
function parsePositiveRuntimeTimeoutMs(value: string, label: string): number {
const parsed = Number(value);
if (!Number.isFinite(parsed) || parsed <= 0 || !Number.isInteger(parsed)) {
throw new Error(`${label} must be a positive integer number of milliseconds`);
}
return parsed;
}
async function writeRemoteFileVerified(
@@ -210,6 +240,7 @@ async function downloadRemoteFileVerified(
builders: SshFileTransferCommandBuilders,
remotePath: string,
localPath: string,
inactivityTimeoutMs?: number,
): Promise<SshFileTransferDownloadResult> {
if (executor.streamRemoteCommand === undefined) {
throw new SshFileTransferError("ssh download requires a tcp-pool stdout stream sink", {
@@ -248,7 +279,7 @@ async function downloadRemoteFileVerified(
hash.update(chunk);
emitDownloadProgress(invocation, remotePath, chunkCount, actualBytes, remote.bytes, chunk.length, startedAtMs);
},
});
}, undefined, { inactivityTimeoutMs });
await closeWriteStream(output);
if (result.exitCode !== 0) {
throw new SshFileTransferError("remote ssh download stream failed", {
+56 -11
View File
@@ -2499,12 +2499,23 @@ const openTimer = setTimeout(() => {
process.exit(255);
}, Number(open.openTimeoutMs || 15000));
const runtimeTimeoutMs = Number(open.runtimeTimeoutMs || 60000);
const runtimeTimer = setTimeout(() => {
process.stderr.write("unidesk ssh bridge runtime timeout; use short query plus poll semantics instead of keeping tran open\n");
exitCode = 124;
try { ws.close(); } catch {}
setTimeout(() => process.exit(124), 250).unref?.();
}, runtimeTimeoutMs);
const runtimeTimeoutMode = open.runtimeTimeoutMode === "inactivity" ? "inactivity" : "wall-clock";
let runtimeTimer = null;
function armRuntimeTimer() {
if (runtimeTimer !== null) clearTimeout(runtimeTimer);
runtimeTimer = setTimeout(() => {
const noun = runtimeTimeoutMode === "inactivity" ? "inactivity timeout" : "runtime timeout";
process.stderr.write("unidesk ssh bridge " + noun + "; use short query plus poll semantics for long non-streaming work\n");
exitCode = 124;
try { ws.close(); } catch {}
setTimeout(() => process.exit(124), 250).unref?.();
}, runtimeTimeoutMs);
}
function clearRuntimeTimer() {
if (runtimeTimer !== null) clearTimeout(runtimeTimer);
runtimeTimer = null;
}
armRuntimeTimer();
function send(value) {
const text = JSON.stringify(value);
@@ -2557,6 +2568,7 @@ ws.addEventListener("open", () => {
ws.addEventListener("message", (event) => {
const message = JSON.parse(decodeData(event.data));
if (runtimeTimeoutMode === "inactivity") armRuntimeTimer();
if (message.type === "ssh.data") {
opened = true;
const chunk = Buffer.from(message.data || "", "base64");
@@ -2577,7 +2589,7 @@ ws.addEventListener("message", (event) => {
}
if (message.type === "ssh.error") {
clearTimeout(openTimer);
clearTimeout(runtimeTimer);
clearRuntimeTimer();
process.stderr.write(String(message.message || "ssh bridge error") + "\n");
exitCode = 255;
ws.close();
@@ -2585,7 +2597,7 @@ ws.addEventListener("message", (event) => {
}
if (message.type === "ssh.exit") {
clearTimeout(openTimer);
clearTimeout(runtimeTimer);
clearRuntimeTimer();
exitCode = Number.isInteger(message.exitCode) ? message.exitCode : 255;
ws.close();
}
@@ -2593,7 +2605,7 @@ ws.addEventListener("message", (event) => {
ws.addEventListener("close", () => {
clearTimeout(openTimer);
clearTimeout(runtimeTimer);
clearRuntimeTimer();
process.exit(exitCode);
});
@@ -2854,6 +2866,7 @@ async function runSshStreamRemoteCommand(
remoteCommand: string,
handlers: SshRemoteCommandStreamHandlers,
input?: string,
options: { inactivityTimeoutMs?: number } = {},
): Promise<SshCaptureResult> {
const streamInvocation = {
...invocation,
@@ -2861,7 +2874,8 @@ async function runSshStreamRemoteCommand(
};
const startedAtMs = Date.now();
const size = terminalSize();
const runtimeTimeoutMs = sshRuntimeTimeoutMs();
const inactivityTimeoutMs = options.inactivityTimeoutMs ?? sshRuntimeTimeoutMs();
const runtimeTimeoutMs = options.inactivityTimeoutMs === undefined ? inactivityTimeoutMs : Math.max(inactivityTimeoutMs * 4, 60 * 60_000);
const payload = {
providerId: invocation.providerId,
command: wrapSshRemoteCommand(remoteCommand),
@@ -2870,6 +2884,7 @@ async function runSshStreamRemoteCommand(
stdinEotOnEnd: false,
openTimeoutMs: Math.max(15000, Number(process.env.UNIDESK_SSH_OPEN_TIMEOUT_MS || 60000)),
runtimeTimeoutMs,
runtimeTimeoutMode: options.inactivityTimeoutMs === undefined ? "wall-clock" : "inactivity",
cols: size.cols,
rows: size.rows,
};
@@ -2924,10 +2939,37 @@ async function runSshStreamRemoteCommand(
return await new Promise<SshCaptureResult>((resolve) => {
let settled = false;
let killTimer: NodeJS.Timeout | null = null;
let inactivityTimer: NodeJS.Timeout | null = null;
const refreshActivityTimer = (): void => {
if (settled || options.inactivityTimeoutMs === undefined) return;
if (inactivityTimer !== null) clearTimeout(inactivityTimer);
inactivityTimer = setTimeout(() => {
const hint = formatSshRuntimeTimeoutHint(sshRuntimeTimeoutHint({
invocation: streamInvocation,
transport: "backend-core-broker",
timeoutMs: inactivityTimeoutMs,
}));
stderr += hint;
try {
child.kill("SIGTERM");
} catch {
// Ignore.
}
killTimer = setTimeout(() => {
try {
child.kill("SIGKILL");
} catch {
// Ignore.
}
}, 2000);
finish(124);
}, inactivityTimeoutMs);
};
const finish = (exitCode: number): void => {
if (settled) return;
settled = true;
clearTimeout(runtimeTimer);
if (inactivityTimer !== null) clearTimeout(inactivityTimer);
if (killTimer !== null) clearTimeout(killTimer);
void streamWrites.then(() => {
const finalCode = streamError === null ? exitCode : 255;
@@ -2962,11 +3004,14 @@ async function runSshStreamRemoteCommand(
}, 2000);
finish(124);
}, runtimeTimeoutMs);
refreshActivityTimer();
child.on("error", (error) => {
stderr += `unidesk ssh failed to start broker: ${error.message}\n`;
finish(255);
});
child.on("close", (code) => finish(code ?? 255));
child.stdout.on("data", refreshActivityTimer);
child.stderr.on("data", refreshActivityTimer);
});
}
@@ -3077,7 +3122,7 @@ export async function runSsh(config: UniDeskConfig, providerId: string, args: st
if (isSshFileTransferOperation(normalizedArgs)) {
const executor: SshRemoteCommandExecutor = {
runRemoteCommand: (remoteCommand, input) => runSshCaptureRemoteCommand(config, invocation, remoteCommand, input),
streamRemoteCommand: (remoteCommand, handlers, input) => runSshStreamRemoteCommand(config, invocation, remoteCommand, handlers, input),
streamRemoteCommand: (remoteCommand, handlers, input, options) => runSshStreamRemoteCommand(config, invocation, remoteCommand, handlers, input, options),
};
return await runSshFileTransferOperation(invocation, normalizedArgs, executor, {
buildRouteCommand: remoteCommandForRoute,