Merge pull request #1258 from pikasTech/fix/336-provider-late-ssh-data-frames
fix: quiet late provider SSH data frames
This commit is contained in:
@@ -48,7 +48,7 @@ TCP pool 的长期方向是把 `trans`/`tran` 变成真正并发的短连接工
|
||||
|
||||
当前默认池大小是 10 条,设计上优先覆盖高频短 SSH、并发小文件和单个大文件不阻塞其他请求的场景。已验证的目标状态是:D601 这类 WSL provider 上 10 路并发 `trans ... sh -- 'sleep 2'` 不再出现 `provider ssh tcp data pool has no idle channel`、stderr 为空、每一路 stdout 都包含命令开始和结束输出,结束后 labels 回到 `ready=desired`、`claimed=0`。当前仍存在端到端固定开销,10 路并发短命令的墙钟可能明显高于远端命令自身耗时;这属于后续连接建立、broker 调度、WSL SSH spawn 或 provider 启动路径的性能优化范围,不能用队列、门禁或隐藏重试掩盖。
|
||||
|
||||
开发中最容易踩的坑是把“依赖层在线”误判成“数据面可用”。`host.ssh` 只证明 provider 能执行维护 SSH;`host.ssh.tcp-pool`、`providerGatewaySshDataPoolReady`、`providerGatewaySshDataPoolClaimed` 和 `providerGatewaySshDataPoolLastError` 才能证明 TCP 数据池状态。另一个坑是输出尾部丢失:backend-core broker 在收到 `ssh.data` 后必须把 stdout/stderr 写入并 flush,再处理 `ssh.exit`,否则短命令可能 rc=0 但最后一段 stdout 没到调用端。第三个坑是 session 释放:`ssh.exit`、错误和超时路径都必须释放 claimed channel,避免下一批并发请求看到假性的池耗尽。第四个坑是 core/provider 池状态漂移:如果 provider 通过控制 WebSocket 返回 `host_ssh_error` 且提示 `requested ssh tcp data channel is not ready`,说明 core 侧 claim 到的 channel 已经不被 provider 认可,backend-core 必须 drop 该 `providerId + dataChannelId`,不能把它 release 回 idle pool 后继续重复 claim。
|
||||
开发中最容易踩的坑是把“依赖层在线”误判成“数据面可用”。`host.ssh` 只证明 provider 能执行维护 SSH;`host.ssh.tcp-pool`、`providerGatewaySshDataPoolReady`、`providerGatewaySshDataPoolClaimed` 和 `providerGatewaySshDataPoolLastError` 才能证明 TCP 数据池状态。另一个坑是输出尾部丢失:backend-core broker 在收到 `ssh.data` 后必须把 stdout/stderr 写入并 flush,再处理 `ssh.exit`,否则短命令可能 rc=0 但最后一段 stdout 没到调用端。第三个坑是 session 释放:`ssh.exit`、错误和超时路径都必须释放 claimed channel,避免下一批并发请求看到假性的池耗尽。第四个坑是 core/provider 池状态漂移:如果 provider 通过控制 WebSocket 返回 `host_ssh_error` 且提示 `requested ssh tcp data channel is not ready`,说明 core 侧 claim 到的 channel 已经不被 provider 认可,backend-core 必须 drop 该 `providerId + dataChannelId`,不能把它 release 回 idle pool 后继续重复 claim。第五个坑是短命令 session 生命周期竞态:channel 已 claim 但 provider 还没登记 session 时到达的 `input`、`eof`、`resize`、`close` 应进入有界 pending buffer;session 已正常退出、异常结束或被客户端 close 后短时间内到达的同 channel 控制帧应按 completed-session 归档并降为 debug,不应继续产生 `ssh_data_session_missing` warn;真正未知 session 或跨 channel 的数据帧仍必须保留 warn,避免掩盖真实数据丢失。
|
||||
|
||||
## WSL Compute Node Deployment
|
||||
|
||||
|
||||
@@ -109,6 +109,15 @@ interface PendingSshDataFrame {
|
||||
queuedAt: number;
|
||||
}
|
||||
|
||||
type CompletedHostSshSessionReason = "exit" | "error" | "client-close";
|
||||
|
||||
interface CompletedHostSshSession {
|
||||
dataChannelId: string;
|
||||
closedAt: number;
|
||||
reason: CompletedHostSshSessionReason;
|
||||
exitCode: number | null;
|
||||
}
|
||||
|
||||
interface SshDataChannel {
|
||||
id: string;
|
||||
socket: Socket;
|
||||
@@ -154,6 +163,7 @@ interface DockerContainerInspectDetails {
|
||||
}
|
||||
|
||||
const hostSshSessions = new Map<string, HostSshSession>();
|
||||
const completedHostSshSessions = new Map<string, CompletedHostSshSession>();
|
||||
const microserviceHttpCache = new Map<string, MicroserviceHttpCacheEntry>();
|
||||
const microserviceHttpInFlight = new Map<string, Promise<JsonValue>>();
|
||||
let providerEgressProxy: ProviderEgressProxyHandle | null = null;
|
||||
@@ -163,6 +173,8 @@ let sshDataChannelSeq = 0;
|
||||
let sshDataLastError: string | null = null;
|
||||
const sshDataPendingSessionFrameMaxCount = 64;
|
||||
const sshDataPendingSessionFrameMaxBytes = sshDataMaxPayloadBytes;
|
||||
const completedHostSshSessionTtlMs = 60_000;
|
||||
const completedHostSshSessionMaxCount = 512;
|
||||
const microserviceForwardRequestHeaders = [
|
||||
"accept",
|
||||
"content-type",
|
||||
@@ -696,6 +708,45 @@ function sendSshDataSessionFrame(sessionId: string, header: Record<string, unkno
|
||||
return writeSshDataFrame(channel, { ...header, sessionId }, payload);
|
||||
}
|
||||
|
||||
function pruneCompletedHostSshSessions(now = Date.now()): void {
|
||||
for (const [sessionId, completed] of completedHostSshSessions) {
|
||||
if (now - completed.closedAt > completedHostSshSessionTtlMs) completedHostSshSessions.delete(sessionId);
|
||||
}
|
||||
while (completedHostSshSessions.size > completedHostSshSessionMaxCount) {
|
||||
const oldestSessionId = completedHostSshSessions.keys().next().value;
|
||||
if (typeof oldestSessionId !== "string") break;
|
||||
completedHostSshSessions.delete(oldestSessionId);
|
||||
}
|
||||
}
|
||||
|
||||
function rememberCompletedHostSshSession(
|
||||
sessionId: string,
|
||||
dataChannelId: string | undefined,
|
||||
reason: CompletedHostSshSessionReason,
|
||||
exitCode: number | null = null,
|
||||
): void {
|
||||
if (dataChannelId === undefined) return;
|
||||
pruneCompletedHostSshSessions();
|
||||
completedHostSshSessions.delete(sessionId);
|
||||
completedHostSshSessions.set(sessionId, { dataChannelId, closedAt: Date.now(), reason, exitCode });
|
||||
}
|
||||
|
||||
function getRecentCompletedHostSshSession(sessionId: string, dataChannelId: string): CompletedHostSshSession | null {
|
||||
const completed = completedHostSshSessions.get(sessionId);
|
||||
if (completed === undefined) return null;
|
||||
const ageMs = Date.now() - completed.closedAt;
|
||||
if (ageMs > completedHostSshSessionTtlMs) {
|
||||
completedHostSshSessions.delete(sessionId);
|
||||
return null;
|
||||
}
|
||||
if (completed.dataChannelId !== dataChannelId) return null;
|
||||
return completed;
|
||||
}
|
||||
|
||||
function isSshDataFrameIgnorableAfterSessionClosed(type: string): boolean {
|
||||
return type === "input" || type === "eof" || type === "resize" || type === "close";
|
||||
}
|
||||
|
||||
function pendingSshDataFrameBytes(channel: SshDataChannel): number {
|
||||
return channel.pendingFrames.reduce((total, frame) => total + frame.payload.length, 0);
|
||||
}
|
||||
@@ -748,6 +799,19 @@ function handleSshDataFrame(channel: SshDataChannel, header: Record<string, unkn
|
||||
bufferSshDataFrameBeforeSessionReady(channel, sessionId, type, header, payload);
|
||||
return;
|
||||
}
|
||||
const completed = getRecentCompletedHostSshSession(sessionId, channel.id);
|
||||
if (completed !== null && isSshDataFrameIgnorableAfterSessionClosed(type)) {
|
||||
logger("debug", "ssh_data_frame_ignored_after_session_closed", {
|
||||
channelId: channel.id,
|
||||
sessionId,
|
||||
type,
|
||||
reason: completed.reason,
|
||||
exitCode: completed.exitCode,
|
||||
ageMs: Date.now() - completed.closedAt,
|
||||
payloadBytes: payload.length,
|
||||
});
|
||||
return;
|
||||
}
|
||||
logger("warn", "ssh_data_session_missing", { channelId: channel.id, sessionId, type });
|
||||
return;
|
||||
}
|
||||
@@ -1935,11 +1999,13 @@ function startHostSshSession(message: CoreHostSshOpenMessage): void {
|
||||
signal: null,
|
||||
at: new Date().toISOString(),
|
||||
});
|
||||
rememberCompletedHostSshSession(message.sessionId, message.dataChannelId, "exit", typeof exitCode === "number" ? exitCode : null);
|
||||
hostSshSessions.delete(message.sessionId);
|
||||
releaseSshDataChannel(message.dataChannelId, message.sessionId);
|
||||
})
|
||||
.catch((error) => {
|
||||
sendHostSshError(message.sessionId, error);
|
||||
rememberCompletedHostSshSession(message.sessionId, message.dataChannelId, "error");
|
||||
hostSshSessions.delete(message.sessionId);
|
||||
releaseSshDataChannel(message.dataChannelId, message.sessionId);
|
||||
});
|
||||
@@ -1953,6 +2019,7 @@ function startHostSshSession(message: CoreHostSshOpenMessage): void {
|
||||
function closeHostSshSession(sessionId: string): void {
|
||||
const session = hostSshSessions.get(sessionId);
|
||||
if (session === undefined) return;
|
||||
rememberCompletedHostSshSession(sessionId, session.dataChannelId, "client-close");
|
||||
hostSshSessions.delete(sessionId);
|
||||
if (session.dataChannelId !== undefined) releaseSshDataChannel(session.dataChannelId, sessionId);
|
||||
session.proc.kill("SIGTERM");
|
||||
|
||||
Reference in New Issue
Block a user