fix: retry transient ssh download empty blocks
This commit is contained in:
@@ -53,6 +53,7 @@ class SshFileTransferError extends Error {
|
||||
const fileTransferReadBlockBytes = 45_000;
|
||||
const fileTransferWriteB64ArgvLimit = 48_000;
|
||||
const fileTransferWriteB64ChunkChars = 12_000;
|
||||
const fileTransferReadBlockMaxAttempts = 3;
|
||||
|
||||
export function isSshFileTransferOperation(args: string[]): boolean {
|
||||
const subcommand = args[0] ?? "";
|
||||
@@ -204,18 +205,8 @@ async function readRemoteFileVerified(
|
||||
let actualBytes = 0;
|
||||
let chunkCount = 0;
|
||||
for (let blockIndex = 0; actualBytes < remote.bytes; blockIndex += 1) {
|
||||
const read = await checkedFileTransfer(invocation, executor, builders, "read-b64-block", [remotePath, String(blockIndex), String(readBlockBytes)]);
|
||||
const encoded = read.stdout.replace(/\s+/gu, "");
|
||||
const encoded = await readRemoteBase64BlockWithRetry(invocation, executor, builders, remotePath, readBlockBytes, blockIndex, remote.bytes, actualBytes);
|
||||
const chunk = encoded.length === 0 ? Buffer.alloc(0) : Buffer.from(encoded, "base64");
|
||||
if (chunk.length === 0) {
|
||||
throw new SshFileTransferError("remote download returned an empty block before EOF", {
|
||||
route: invocation.route.raw,
|
||||
remotePath,
|
||||
blockIndex,
|
||||
expectedBytes: remote.bytes,
|
||||
actualBytes,
|
||||
});
|
||||
}
|
||||
chunks.push(chunk);
|
||||
actualBytes += chunk.length;
|
||||
chunkCount += 1;
|
||||
@@ -226,6 +217,41 @@ async function readRemoteFileVerified(
|
||||
return { remote, content, chunks: chunkCount };
|
||||
}
|
||||
|
||||
async function readRemoteBase64BlockWithRetry(
|
||||
invocation: ParsedSshInvocation,
|
||||
executor: SshRemoteCommandExecutor,
|
||||
builders: SshFileTransferCommandBuilders,
|
||||
remotePath: string,
|
||||
readBlockBytes: number,
|
||||
blockIndex: number,
|
||||
expectedBytes: number,
|
||||
actualBytes: number,
|
||||
): Promise<string> {
|
||||
const attemptErrors: Array<Record<string, unknown>> = [];
|
||||
for (let attempt = 1; attempt <= fileTransferReadBlockMaxAttempts; attempt += 1) {
|
||||
try {
|
||||
const read = await checkedFileTransfer(invocation, executor, builders, "read-b64-block", [remotePath, String(blockIndex), String(readBlockBytes)]);
|
||||
const encoded = read.stdout.replace(/\s+/gu, "");
|
||||
if (encoded.length > 0) return encoded;
|
||||
attemptErrors.push({ attempt, exitCode: read.exitCode, stdoutBytes: read.stdout.length, stderrTail: read.stderr.slice(-500) });
|
||||
} catch (error) {
|
||||
attemptErrors.push({
|
||||
attempt,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
}
|
||||
}
|
||||
throw new SshFileTransferError("remote download returned an empty block before EOF after retries", {
|
||||
route: invocation.route.raw,
|
||||
remotePath,
|
||||
blockIndex,
|
||||
attempts: fileTransferReadBlockMaxAttempts,
|
||||
expectedBytes,
|
||||
actualBytes,
|
||||
attemptErrors,
|
||||
});
|
||||
}
|
||||
|
||||
async function statRemoteFile(
|
||||
invocation: ParsedSshInvocation,
|
||||
executor: SshRemoteCommandExecutor,
|
||||
|
||||
@@ -282,7 +282,7 @@ async function applyPatchV2Fixture(patch: string, files: Record<string, string>)
|
||||
return { stdout: result.stdout, files: result.files, commands: result.commands };
|
||||
}
|
||||
|
||||
function fileTransferFixture(initial: Record<string, Buffer> = {}): {
|
||||
function fileTransferFixture(initial: Record<string, Buffer> = {}, options: { emptyReadOnce?: Record<string, number[]> } = {}): {
|
||||
state: Map<string, Buffer>;
|
||||
commands: Array<{ operation: string; stdin: boolean }>;
|
||||
executor: SshRemoteCommandExecutor;
|
||||
@@ -290,6 +290,7 @@ function fileTransferFixture(initial: Record<string, Buffer> = {}): {
|
||||
} {
|
||||
const state = new Map(Object.entries(initial));
|
||||
const pending = new Map<string, string>();
|
||||
const emptyReadOnce = new Map(Object.entries(options.emptyReadOnce ?? {}).map(([target, blocks]) => [target, new Set(blocks)]));
|
||||
const commands: Array<{ operation: string; stdin: boolean }> = [];
|
||||
const builders: SshFileTransferCommandBuilders = {
|
||||
buildRouteCommand(route, command, options) {
|
||||
@@ -317,6 +318,11 @@ function fileTransferFixture(initial: Record<string, Buffer> = {}): {
|
||||
const blockIndex = Number(command[6] ?? "-1");
|
||||
const blockSize = Number(command[7] ?? "-1");
|
||||
const start = blockIndex * blockSize;
|
||||
const emptyBlocks = emptyReadOnce.get(target);
|
||||
if (emptyBlocks?.has(blockIndex)) {
|
||||
emptyBlocks.delete(blockIndex);
|
||||
return { exitCode: 0, stdout: "", stderr: "" };
|
||||
}
|
||||
return { exitCode: 0, stdout: content.subarray(start, start + blockSize).toString("base64"), stderr: "" };
|
||||
}
|
||||
if (operation === "write-b64-argv" || operation === "write-b64-stdin") {
|
||||
@@ -447,6 +453,16 @@ export async function runSshArgvGuidanceContract(): Promise<JsonRecord> {
|
||||
);
|
||||
assertCondition(readFileSync(localDownload).equals(payload), "download must preserve binary and UTF-8 bytes locally", { commands: transfer.commands });
|
||||
assertCondition(transfer.commands.some((item) => item.operation === "stat") && transfer.commands.some((item) => item.operation === "read-b64-block"), "file transfer should use stat plus chunked verified reads", transfer.commands);
|
||||
|
||||
const retryDownload = path.join(transferRoot, "downloaded", "retry-copy.bin");
|
||||
const retryPayload = Buffer.from("0123456789abcdef".repeat(4096), "utf8");
|
||||
const retryTransfer = fileTransferFixture({ "/tmp/retry-remote.bin": retryPayload }, { emptyReadOnce: { "/tmp/retry-remote.bin": [1] } });
|
||||
const retryResult = await captureStdout(() => runSshFileTransferOperation(parseSshInvocation("D601", ["download", "--chunk-bytes", "1024", "/tmp/retry-remote.bin", retryDownload]), ["download", "--chunk-bytes", "1024", "/tmp/retry-remote.bin", retryDownload], retryTransfer.executor, retryTransfer.builders));
|
||||
const retryJson = JSON.parse(retryResult.stdout) as JsonRecord;
|
||||
const retryReadBlocks = retryTransfer.commands.filter((item) => item.operation === "read-b64-block");
|
||||
assertCondition(retryResult.exitCode === 0 && retryJson.sha256 === sha256BufferHex(retryPayload), "download should retry a transient empty block and keep sha256 verification", retryResult);
|
||||
assertCondition(retryReadBlocks.length === Number(retryJson.transfer && typeof retryJson.transfer === "object" ? (retryJson.transfer as JsonRecord).chunks : 0) + 1, "transient empty block should add exactly one repeated read without counting as a chunk", retryTransfer.commands);
|
||||
assertCondition(readFileSync(retryDownload).equals(retryPayload), "retry download must preserve complete content after transient empty block", { commands: retryTransfer.commands });
|
||||
} finally {
|
||||
rmSync(transferRoot, { recursive: true, force: true });
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user