diff --git a/src/components/provider-gateway/src/index.ts b/src/components/provider-gateway/src/index.ts index 014dcdfe..f627d855 100644 --- a/src/components/provider-gateway/src/index.ts +++ b/src/components/provider-gateway/src/index.ts @@ -103,11 +103,18 @@ interface HostSshSession { dataChannelId?: string; } +interface PendingSshDataFrame { + header: Record; + payload: Buffer; + queuedAt: number; +} + interface SshDataChannel { id: string; socket: Socket; status: "connecting" | "ready" | "claimed" | "closed"; buffer: Buffer; + pendingFrames: PendingSshDataFrame[]; openedAt: number; lastReadyAt: number | null; activeSessionId: string | null; @@ -154,6 +161,8 @@ const sshDataChannels = new Map(); let sshDataPoolDesired = false; let sshDataChannelSeq = 0; let sshDataLastError: string | null = null; +const sshDataPendingSessionFrameMaxCount = 64; +const sshDataPendingSessionFrameMaxBytes = sshDataMaxPayloadBytes; const microserviceForwardRequestHeaders = [ "accept", "content-type", @@ -687,6 +696,45 @@ function sendSshDataSessionFrame(sessionId: string, header: Record total + frame.payload.length, 0); +} + +function shouldBufferSshDataFrameBeforeSessionReady(channel: SshDataChannel, sessionId: string, type: string): boolean { + return ( + channel.activeSessionId === sessionId + && (type === "input" || type === "eof" || type === "resize" || type === "close") + ); +} + +function bufferSshDataFrameBeforeSessionReady(channel: SshDataChannel, sessionId: string, type: string, header: Record, payload: Buffer): void { + const pendingBytes = pendingSshDataFrameBytes(channel); + if ( + channel.pendingFrames.length >= sshDataPendingSessionFrameMaxCount + || pendingBytes + payload.length > sshDataPendingSessionFrameMaxBytes + ) { + const message = `ssh data frames arrived before session was ready and exceeded pending buffer limits: frames=${channel.pendingFrames.length} bytes=${pendingBytes}`; + channel.lastError = message; + sshDataLastError = message; + logger("warn", "ssh_data_pending_session_buffer_overflow", { channelId: channel.id, sessionId, type, pendingFrames: channel.pendingFrames.length, pendingBytes, payloadBytes: payload.length }); + writeSshDataFrame(channel, { type: "error", sessionId, message, at: new Date().toISOString() }); + closeSshDataChannel(channel, "pending session frame buffer overflow"); + return; + } + channel.pendingFrames.push({ header, payload, queuedAt: Date.now() }); + logger("debug", "ssh_data_frame_buffered_before_session_ready", { channelId: channel.id, sessionId, type, pendingFrames: channel.pendingFrames.length, payloadBytes: payload.length }); +} + +function replayBufferedSshDataFrames(sessionId: string, channel: SshDataChannel): void { + if (channel.pendingFrames.length === 0) return; + const frames = channel.pendingFrames.splice(0); + logger("info", "ssh_data_pending_session_frames_replayed", { channelId: channel.id, sessionId, frames: frames.length, oldestAgeMs: Date.now() - frames[0]!.queuedAt }); + for (const frame of frames) { + handleSshDataFrame(channel, frame.header, frame.payload); + if (!hostSshSessions.has(sessionId)) break; + } +} + function handleSshDataFrame(channel: SshDataChannel, header: Record, payload: Buffer): void { const type = typeof header.type === "string" ? header.type : ""; const sessionId = typeof header.sessionId === "string" ? header.sessionId : ""; @@ -696,6 +744,10 @@ function handleSshDataFrame(channel: SshDataChannel, header: Record