Merge pull request #1257 from pikasTech/fix/jd01-ssh-data-session-buffer
fix: buffer early ssh data frames in provider gateway
This commit is contained in:
@@ -103,11 +103,18 @@ interface HostSshSession {
|
|||||||
dataChannelId?: string;
|
dataChannelId?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
interface PendingSshDataFrame {
|
||||||
|
header: Record<string, unknown>;
|
||||||
|
payload: Buffer;
|
||||||
|
queuedAt: number;
|
||||||
|
}
|
||||||
|
|
||||||
interface SshDataChannel {
|
interface SshDataChannel {
|
||||||
id: string;
|
id: string;
|
||||||
socket: Socket;
|
socket: Socket;
|
||||||
status: "connecting" | "ready" | "claimed" | "closed";
|
status: "connecting" | "ready" | "claimed" | "closed";
|
||||||
buffer: Buffer;
|
buffer: Buffer;
|
||||||
|
pendingFrames: PendingSshDataFrame[];
|
||||||
openedAt: number;
|
openedAt: number;
|
||||||
lastReadyAt: number | null;
|
lastReadyAt: number | null;
|
||||||
activeSessionId: string | null;
|
activeSessionId: string | null;
|
||||||
@@ -154,6 +161,8 @@ const sshDataChannels = new Map<string, SshDataChannel>();
|
|||||||
let sshDataPoolDesired = false;
|
let sshDataPoolDesired = false;
|
||||||
let sshDataChannelSeq = 0;
|
let sshDataChannelSeq = 0;
|
||||||
let sshDataLastError: string | null = null;
|
let sshDataLastError: string | null = null;
|
||||||
|
const sshDataPendingSessionFrameMaxCount = 64;
|
||||||
|
const sshDataPendingSessionFrameMaxBytes = sshDataMaxPayloadBytes;
|
||||||
const microserviceForwardRequestHeaders = [
|
const microserviceForwardRequestHeaders = [
|
||||||
"accept",
|
"accept",
|
||||||
"content-type",
|
"content-type",
|
||||||
@@ -687,6 +696,45 @@ function sendSshDataSessionFrame(sessionId: string, header: Record<string, unkno
|
|||||||
return writeSshDataFrame(channel, { ...header, sessionId }, payload);
|
return writeSshDataFrame(channel, { ...header, sessionId }, payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function pendingSshDataFrameBytes(channel: SshDataChannel): number {
|
||||||
|
return channel.pendingFrames.reduce((total, frame) => total + frame.payload.length, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
function shouldBufferSshDataFrameBeforeSessionReady(channel: SshDataChannel, sessionId: string, type: string): boolean {
|
||||||
|
return (
|
||||||
|
channel.activeSessionId === sessionId
|
||||||
|
&& (type === "input" || type === "eof" || type === "resize" || type === "close")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
function bufferSshDataFrameBeforeSessionReady(channel: SshDataChannel, sessionId: string, type: string, header: Record<string, unknown>, payload: Buffer): void {
|
||||||
|
const pendingBytes = pendingSshDataFrameBytes(channel);
|
||||||
|
if (
|
||||||
|
channel.pendingFrames.length >= sshDataPendingSessionFrameMaxCount
|
||||||
|
|| pendingBytes + payload.length > sshDataPendingSessionFrameMaxBytes
|
||||||
|
) {
|
||||||
|
const message = `ssh data frames arrived before session was ready and exceeded pending buffer limits: frames=${channel.pendingFrames.length} bytes=${pendingBytes}`;
|
||||||
|
channel.lastError = message;
|
||||||
|
sshDataLastError = message;
|
||||||
|
logger("warn", "ssh_data_pending_session_buffer_overflow", { channelId: channel.id, sessionId, type, pendingFrames: channel.pendingFrames.length, pendingBytes, payloadBytes: payload.length });
|
||||||
|
writeSshDataFrame(channel, { type: "error", sessionId, message, at: new Date().toISOString() });
|
||||||
|
closeSshDataChannel(channel, "pending session frame buffer overflow");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
channel.pendingFrames.push({ header, payload, queuedAt: Date.now() });
|
||||||
|
logger("debug", "ssh_data_frame_buffered_before_session_ready", { channelId: channel.id, sessionId, type, pendingFrames: channel.pendingFrames.length, payloadBytes: payload.length });
|
||||||
|
}
|
||||||
|
|
||||||
|
function replayBufferedSshDataFrames(sessionId: string, channel: SshDataChannel): void {
|
||||||
|
if (channel.pendingFrames.length === 0) return;
|
||||||
|
const frames = channel.pendingFrames.splice(0);
|
||||||
|
logger("info", "ssh_data_pending_session_frames_replayed", { channelId: channel.id, sessionId, frames: frames.length, oldestAgeMs: Date.now() - frames[0]!.queuedAt });
|
||||||
|
for (const frame of frames) {
|
||||||
|
handleSshDataFrame(channel, frame.header, frame.payload);
|
||||||
|
if (!hostSshSessions.has(sessionId)) break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
function handleSshDataFrame(channel: SshDataChannel, header: Record<string, unknown>, payload: Buffer): void {
|
function handleSshDataFrame(channel: SshDataChannel, header: Record<string, unknown>, payload: Buffer): void {
|
||||||
const type = typeof header.type === "string" ? header.type : "";
|
const type = typeof header.type === "string" ? header.type : "";
|
||||||
const sessionId = typeof header.sessionId === "string" ? header.sessionId : "";
|
const sessionId = typeof header.sessionId === "string" ? header.sessionId : "";
|
||||||
@@ -696,6 +744,10 @@ function handleSshDataFrame(channel: SshDataChannel, header: Record<string, unkn
|
|||||||
}
|
}
|
||||||
const session = hostSshSessions.get(sessionId);
|
const session = hostSshSessions.get(sessionId);
|
||||||
if (session === undefined) {
|
if (session === undefined) {
|
||||||
|
if (shouldBufferSshDataFrameBeforeSessionReady(channel, sessionId, type)) {
|
||||||
|
bufferSshDataFrameBeforeSessionReady(channel, sessionId, type, header, payload);
|
||||||
|
return;
|
||||||
|
}
|
||||||
logger("warn", "ssh_data_session_missing", { channelId: channel.id, sessionId, type });
|
logger("warn", "ssh_data_session_missing", { channelId: channel.id, sessionId, type });
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -797,6 +849,7 @@ function openSshDataChannel(): void {
|
|||||||
socket,
|
socket,
|
||||||
status: "connecting",
|
status: "connecting",
|
||||||
buffer: Buffer.alloc(0),
|
buffer: Buffer.alloc(0),
|
||||||
|
pendingFrames: [],
|
||||||
openedAt: Date.now(),
|
openedAt: Date.now(),
|
||||||
lastReadyAt: null,
|
lastReadyAt: null,
|
||||||
activeSessionId: null,
|
activeSessionId: null,
|
||||||
@@ -1871,6 +1924,7 @@ function startHostSshSession(message: CoreHostSshOpenMessage): void {
|
|||||||
providerId: config.providerId,
|
providerId: config.providerId,
|
||||||
at: new Date().toISOString(),
|
at: new Date().toISOString(),
|
||||||
});
|
});
|
||||||
|
replayBufferedSshDataFrames(message.sessionId, dataChannel);
|
||||||
const stdoutDone = pumpHostSshOutput(message.sessionId, "stdout", proc.stdout);
|
const stdoutDone = pumpHostSshOutput(message.sessionId, "stdout", proc.stdout);
|
||||||
const stderrDone = pumpHostSshOutput(message.sessionId, "stderr", proc.stderr);
|
const stderrDone = pumpHostSshOutput(message.sessionId, "stderr", proc.stderr);
|
||||||
Promise.all([stdoutDone, stderrDone, proc.exited])
|
Promise.all([stdoutDone, stderrDone, proc.exited])
|
||||||
|
|||||||
Reference in New Issue
Block a user