fix: quiet late ssh data frames after provider sessions

This commit is contained in:
Codex
2026-06-29 16:18:41 +00:00
parent f28e851f69
commit ef68531e17
2 changed files with 68 additions and 1 deletions
+1 -1
View File
@@ -48,7 +48,7 @@ TCP pool 的长期方向是把 `trans`/`tran` 变成真正并发的短连接工
当前默认池大小是 10 条,设计上优先覆盖高频短 SSH、并发小文件和单个大文件不阻塞其他请求的场景。已验证的目标状态是:D601 这类 WSL provider 上 10 路并发 `trans ... sh -- 'sleep 2'` 不再出现 `provider ssh tcp data pool has no idle channel`、stderr 为空、每一路 stdout 都包含命令开始和结束输出,结束后 labels 回到 `ready=desired`、`claimed=0`。当前仍存在端到端固定开销,10 路并发短命令的墙钟可能明显高于远端命令自身耗时;这属于后续连接建立、broker 调度、WSL SSH spawn 或 provider 启动路径的性能优化范围,不能用队列、门禁或隐藏重试掩盖。 当前默认池大小是 10 条,设计上优先覆盖高频短 SSH、并发小文件和单个大文件不阻塞其他请求的场景。已验证的目标状态是:D601 这类 WSL provider 上 10 路并发 `trans ... sh -- 'sleep 2'` 不再出现 `provider ssh tcp data pool has no idle channel`、stderr 为空、每一路 stdout 都包含命令开始和结束输出,结束后 labels 回到 `ready=desired`、`claimed=0`。当前仍存在端到端固定开销,10 路并发短命令的墙钟可能明显高于远端命令自身耗时;这属于后续连接建立、broker 调度、WSL SSH spawn 或 provider 启动路径的性能优化范围,不能用队列、门禁或隐藏重试掩盖。
开发中最容易踩的坑是把“依赖层在线”误判成“数据面可用”。`host.ssh` 只证明 provider 能执行维护 SSH;`host.ssh.tcp-pool`、`providerGatewaySshDataPoolReady`、`providerGatewaySshDataPoolClaimed`、`providerGatewaySshDataPoolLastError` 才能证明 TCP 数据池状态。另一个坑是输出尾部丢失:backend-core broker 在收到 `ssh.data` 后必须把 stdout/stderr 写入并 flush,再处理 `ssh.exit`,否则短命令可能 rc=0 但最后一段 stdout 没到调用端。第三个坑是 session 释放:`ssh.exit`、错误和超时路径都必须释放 claimed channel,避免下一批并发请求看到假性的池耗尽。第四个坑是 core/provider 池状态漂移:如果 provider 通过控制 WebSocket 返回 `host_ssh_error` 且提示 `requested ssh tcp data channel is not ready`,说明 core 侧 claim 到的 channel 已经不被 provider 认可,backend-core 必须 drop 该 `providerId + dataChannelId`,不能把它 release 回 idle pool 后继续重复 claim。 开发中最容易踩的坑是把“依赖层在线”误判成“数据面可用”。`host.ssh` 只证明 provider 能执行维护 SSH;`host.ssh.tcp-pool`、`providerGatewaySshDataPoolReady`、`providerGatewaySshDataPoolClaimed`、`providerGatewaySshDataPoolLastError` 才能证明 TCP 数据池状态。另一个坑是输出尾部丢失:backend-core broker 在收到 `ssh.data` 后必须把 stdout/stderr 写入并 flush,再处理 `ssh.exit`,否则短命令可能 rc=0 但最后一段 stdout 没到调用端。第三个坑是 session 释放:`ssh.exit`、错误和超时路径都必须释放 claimed channel,避免下一批并发请求看到假性的池耗尽。第四个坑是 core/provider 池状态漂移:如果 provider 通过控制 WebSocket 返回 `host_ssh_error` 且提示 `requested ssh tcp data channel is not ready`,说明 core 侧 claim 到的 channel 已经不被 provider 认可,backend-core 必须 drop 该 `providerId + dataChannelId`,不能把它 release 回 idle pool 后继续重复 claim。第五个坑是短命令 session 生命周期竞态:channel 已 claim 但 provider 还没登记 session 时到达的 `input`、`eof`、`resize`、`close` 应进入有界 pending buffer;session 已正常退出、异常结束或被客户端 close 后短时间内到达的同 channel 控制帧应按 completed-session 归档并降为 debug,不应继续产生 `ssh_data_session_missing` warn;真正未知 session 或跨 channel 的数据帧仍必须保留 warn,避免掩盖真实数据丢失。
## WSL Compute Node Deployment ## WSL Compute Node Deployment
@@ -109,6 +109,15 @@ interface PendingSshDataFrame {
queuedAt: number; queuedAt: number;
} }
/**
 * Why a host SSH session was recorded as completed: it produced an exit
 * result ("exit"), it failed ("error"), or it was closed on request
 * ("client-close").
 */
type CompletedHostSshSessionReason = "exit" | "error" | "client-close";
/**
 * Record kept for a host SSH session after it has been removed from the live
 * session map, so that frames arriving late for it can be recognised.
 */
interface CompletedHostSshSession {
// Id of the data channel the session was using when it completed.
dataChannelId: string;
// Completion time, as a Date.now() millisecond timestamp.
closedAt: number;
// How the session ended.
reason: CompletedHostSshSessionReason;
// Numeric exit code when one was reported; null otherwise.
exitCode: number | null;
}
interface SshDataChannel { interface SshDataChannel {
id: string; id: string;
socket: Socket; socket: Socket;
@@ -154,6 +163,7 @@ interface DockerContainerInspectDetails {
} }
const hostSshSessions = new Map<string, HostSshSession>(); const hostSshSessions = new Map<string, HostSshSession>();
// Recently completed host SSH sessions, keyed by session id. Entries are
// dropped once they exceed the TTL or the map exceeds its maximum size.
const completedHostSshSessions = new Map<string, CompletedHostSshSession>();
const microserviceHttpCache = new Map<string, MicroserviceHttpCacheEntry>(); const microserviceHttpCache = new Map<string, MicroserviceHttpCacheEntry>();
const microserviceHttpInFlight = new Map<string, Promise<JsonValue>>(); const microserviceHttpInFlight = new Map<string, Promise<JsonValue>>();
let providerEgressProxy: ProviderEgressProxyHandle | null = null; let providerEgressProxy: ProviderEgressProxyHandle | null = null;
@@ -163,6 +173,8 @@ let sshDataChannelSeq = 0;
let sshDataLastError: string | null = null; let sshDataLastError: string | null = null;
const sshDataPendingSessionFrameMaxCount = 64; const sshDataPendingSessionFrameMaxCount = 64;
const sshDataPendingSessionFrameMaxBytes = sshDataMaxPayloadBytes; const sshDataPendingSessionFrameMaxBytes = sshDataMaxPayloadBytes;
// How long (in milliseconds) a completed-session record stays valid.
const completedHostSshSessionTtlMs = 60_000;
// Upper bound on retained completed-session records; the oldest inserted
// records are evicted first when the bound is exceeded.
const completedHostSshSessionMaxCount = 512;
const microserviceForwardRequestHeaders = [ const microserviceForwardRequestHeaders = [
"accept", "accept",
"content-type", "content-type",
@@ -696,6 +708,45 @@ function sendSshDataSessionFrame(sessionId: string, header: Record<string, unkno
return writeSshDataFrame(channel, { ...header, sessionId }, payload); return writeSshDataFrame(channel, { ...header, sessionId }, payload);
} }
/**
 * Evicts stale completed-session records.
 *
 * First removes every record older than the TTL, then, if the map is still
 * above its maximum size, removes records in insertion order until it fits.
 *
 * @param now Reference time in milliseconds; defaults to the current time.
 */
function pruneCompletedHostSshSessions(now = Date.now()): void {
  // Deleting entries while a Map is being traversed is well-defined.
  completedHostSshSessions.forEach((entry, id) => {
    const expired = now - entry.closedAt > completedHostSshSessionTtlMs;
    if (expired) completedHostSshSessions.delete(id);
  });

  let overflow = completedHostSshSessions.size - completedHostSshSessionMaxCount;
  for (const id of completedHostSshSessions.keys()) {
    if (overflow <= 0) break;
    if (typeof id !== "string") break;
    completedHostSshSessions.delete(id);
    overflow -= 1;
  }
}
/**
 * Records that a host SSH session has finished so that frames arriving late
 * for it on the same data channel can be recognised.
 *
 * Nothing is recorded when the session had no data channel.
 *
 * @param sessionId Id of the session that finished.
 * @param dataChannelId Data channel the session used, if any.
 * @param reason How the session ended.
 * @param exitCode Exit code when known; defaults to null.
 */
function rememberCompletedHostSshSession(
  sessionId: string,
  dataChannelId: string | undefined,
  reason: CompletedHostSshSessionReason,
  exitCode: number | null = null,
): void {
  if (dataChannelId === undefined) return;
  // One timestamp for both the record and the prune keeps them consistent.
  const closedAt = Date.now();
  // Delete before set so a re-recorded session moves to the end of the map's
  // insertion order and is treated as the newest entry by size-based eviction.
  completedHostSshSessions.delete(sessionId);
  completedHostSshSessions.set(sessionId, { dataChannelId, closedAt, reason, exitCode });
  // Prune after inserting: pruning first would let the map sit one entry above
  // the maximum until the next call.
  pruneCompletedHostSshSessions(closedAt);
}
/**
 * Looks up the completed-session record for a session, if it is still fresh
 * and belongs to the given data channel.
 *
 * An expired record is removed from the map as a side effect.
 *
 * @param sessionId Session id to look up.
 * @param dataChannelId Data channel the frame arrived on.
 * @returns The record, or null when there is none, it has expired, or it was
 *          recorded for a different data channel.
 */
function getRecentCompletedHostSshSession(sessionId: string, dataChannelId: string): CompletedHostSshSession | null {
  const record = completedHostSshSessions.get(sessionId);
  if (record === undefined) return null;

  const expired = Date.now() - record.closedAt > completedHostSshSessionTtlMs;
  if (expired) {
    completedHostSshSessions.delete(sessionId);
    return null;
  }

  return record.dataChannelId === dataChannelId ? record : null;
}
/**
 * Tells whether a data frame of the given type may be dropped quietly when it
 * arrives for a session that has already completed.
 *
 * @param type Frame type from the frame header.
 * @returns true for "input", "eof", "resize" and "close"; false otherwise.
 */
function isSshDataFrameIgnorableAfterSessionClosed(type: string): boolean {
  switch (type) {
    case "input":
    case "eof":
    case "resize":
    case "close":
      return true;
    default:
      return false;
  }
}
function pendingSshDataFrameBytes(channel: SshDataChannel): number { function pendingSshDataFrameBytes(channel: SshDataChannel): number {
return channel.pendingFrames.reduce((total, frame) => total + frame.payload.length, 0); return channel.pendingFrames.reduce((total, frame) => total + frame.payload.length, 0);
} }
@@ -748,6 +799,19 @@ function handleSshDataFrame(channel: SshDataChannel, header: Record<string, unkn
bufferSshDataFrameBeforeSessionReady(channel, sessionId, type, header, payload); bufferSshDataFrameBeforeSessionReady(channel, sessionId, type, header, payload);
return; return;
} }
const completed = getRecentCompletedHostSshSession(sessionId, channel.id);
if (completed !== null && isSshDataFrameIgnorableAfterSessionClosed(type)) {
logger("debug", "ssh_data_frame_ignored_after_session_closed", {
channelId: channel.id,
sessionId,
type,
reason: completed.reason,
exitCode: completed.exitCode,
ageMs: Date.now() - completed.closedAt,
payloadBytes: payload.length,
});
return;
}
logger("warn", "ssh_data_session_missing", { channelId: channel.id, sessionId, type }); logger("warn", "ssh_data_session_missing", { channelId: channel.id, sessionId, type });
return; return;
} }
@@ -1935,11 +1999,13 @@ function startHostSshSession(message: CoreHostSshOpenMessage): void {
signal: null, signal: null,
at: new Date().toISOString(), at: new Date().toISOString(),
}); });
rememberCompletedHostSshSession(message.sessionId, message.dataChannelId, "exit", typeof exitCode === "number" ? exitCode : null);
hostSshSessions.delete(message.sessionId); hostSshSessions.delete(message.sessionId);
releaseSshDataChannel(message.dataChannelId, message.sessionId); releaseSshDataChannel(message.dataChannelId, message.sessionId);
}) })
.catch((error) => { .catch((error) => {
sendHostSshError(message.sessionId, error); sendHostSshError(message.sessionId, error);
rememberCompletedHostSshSession(message.sessionId, message.dataChannelId, "error");
hostSshSessions.delete(message.sessionId); hostSshSessions.delete(message.sessionId);
releaseSshDataChannel(message.dataChannelId, message.sessionId); releaseSshDataChannel(message.dataChannelId, message.sessionId);
}); });
@@ -1953,6 +2019,7 @@ function startHostSshSession(message: CoreHostSshOpenMessage): void {
function closeHostSshSession(sessionId: string): void { function closeHostSshSession(sessionId: string): void {
const session = hostSshSessions.get(sessionId); const session = hostSshSessions.get(sessionId);
if (session === undefined) return; if (session === undefined) return;
rememberCompletedHostSshSession(sessionId, session.dataChannelId, "client-close");
hostSshSessions.delete(sessionId); hostSshSessions.delete(sessionId);
if (session.dataChannelId !== undefined) releaseSshDataChannel(session.dataChannelId, sessionId); if (session.dataChannelId !== undefined) releaseSshDataChannel(session.dataChannelId, sessionId);
session.proc.kill("SIGTERM"); session.proc.kill("SIGTERM");