fix: drop stale ssh data channels
This commit is contained in:
@@ -48,7 +48,7 @@ TCP pool 的长期方向是把 `trans`/`tran` 变成真正并发的短连接工
|
||||
|
||||
当前默认池大小是 10 条,设计上优先覆盖高频短 SSH、并发小文件和单个大文件不阻塞其他请求的场景。已验证的目标状态是:D601 这类 WSL provider 上 10 路并发 `trans ... argv bash -lc 'sleep 2'` 不再出现 `provider ssh tcp data pool has no idle channel`、stderr 为空、每一路 stdout 都包含命令开始和结束输出,结束后 labels 回到 `ready=desired`、`claimed=0`。当前仍存在端到端固定开销,10 路并发短命令的墙钟可能明显高于远端命令自身耗时;这属于后续连接建立、broker 调度、WSL SSH spawn 或 provider 启动路径的性能优化范围,不能用队列、门禁或隐藏重试掩盖。
|
||||
|
||||
开发中最容易踩的坑是把“依赖层在线”误判成“数据面可用”。`host.ssh` 只证明 provider 能执行维护 SSH;`host.ssh.tcp-pool`、`providerGatewaySshDataPoolReady`、`providerGatewaySshDataPoolClaimed` 和 `providerGatewaySshDataPoolLastError` 才能证明 TCP 数据池状态。另一个坑是输出尾部丢失:backend-core broker 在收到 `ssh.data` 后必须把 stdout/stderr 写入并 flush,再处理 `ssh.exit`,否则短命令可能 rc=0 但最后一段 stdout 没到调用端。第三个坑是 session 释放:`ssh.exit`、错误和超时路径都必须释放 claimed channel,避免下一批并发请求看到假性的池耗尽。
|
||||
开发中最容易踩的坑是把“依赖层在线”误判成“数据面可用”。`host.ssh` 只证明 provider 能执行维护 SSH;`host.ssh.tcp-pool`、`providerGatewaySshDataPoolReady`、`providerGatewaySshDataPoolClaimed` 和 `providerGatewaySshDataPoolLastError` 才能证明 TCP 数据池状态。另一个坑是输出尾部丢失:backend-core broker 在收到 `ssh.data` 后必须把 stdout/stderr 写入并 flush,再处理 `ssh.exit`,否则短命令可能 rc=0 但最后一段 stdout 没到调用端。第三个坑是 session 释放:`ssh.exit`、错误和超时路径都必须释放 claimed channel,避免下一批并发请求看到假性的池耗尽。第四个坑是 core/provider 池状态漂移:如果 provider 通过控制 WebSocket 返回 `host_ssh_error` 且提示 `requested ssh tcp data channel is not ready`,说明 core 侧 claim 到的 channel 已经不被 provider 认可,backend-core 必须 drop 该 `providerId + dataChannelId`,不能把它 release 回 idle pool 后继续重复 claim。
|
||||
|
||||
## WSL Compute Node Deployment
|
||||
|
||||
|
||||
@@ -4,6 +4,17 @@ function assertCondition(condition: unknown, message: string, detail: unknown =
|
||||
if (!condition) throw new Error(`${message}: ${JSON.stringify(detail)}`);
|
||||
}
|
||||
|
||||
function compareSemver(left: unknown, right: string): number {
|
||||
if (typeof left !== "string") return -1;
|
||||
const a = left.split(".").map((part) => Number.parseInt(part, 10));
|
||||
const b = right.split(".").map((part) => Number.parseInt(part, 10));
|
||||
for (let i = 0; i < Math.max(a.length, b.length); i += 1) {
|
||||
const delta = (Number.isFinite(a[i]) ? a[i] : 0) - (Number.isFinite(b[i]) ? b[i] : 0);
|
||||
if (delta !== 0) return delta;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
const rustBridge = readFileSync("src/components/backend-core/src/ssh_bridge.rs", "utf8");
|
||||
const rustData = readFileSync("src/components/backend-core/src/ssh_data_channel.rs", "utf8");
|
||||
const rustMain = readFileSync("src/components/backend-core/src/main.rs", "utf8");
|
||||
@@ -20,6 +31,8 @@ const backendCoreDockerfile = readFileSync("src/components/backend-core/Dockerfi
|
||||
assertCondition(rustBridge.includes('"host.ssh.tcp-pool"'), "backend-core ssh bridge must require host.ssh.tcp-pool");
|
||||
assertCondition(rustBridge.includes("provider-gateway-upgrade-required"), "old provider must fail with upgrade-required classification");
|
||||
assertCondition(rustBridge.includes("provider-data-pool-exhausted"), "tcp pool exhaustion must be visible");
|
||||
assertCondition(rustBridge.includes("remove_ssh_data_channel") && rustBridge.includes("ssh_data_channel_removed_after_control_error"), "control-fallback host_ssh_error must remove the claimed data channel");
|
||||
assertCondition(rustBridge.includes("provider-data-channel-missing"), "missing data channel must be classified for clients");
|
||||
assertCondition(!rustBridge.includes('"host_ssh_input"'), "ssh input must not fall back to provider control websocket");
|
||||
assertCondition(!rustBridge.includes('"host_ssh_eof"'), "ssh eof must not fall back to provider control websocket");
|
||||
assertCondition(!rustBridge.includes('"host_ssh_close"'), "ssh close must not fall back to provider control websocket");
|
||||
@@ -27,6 +40,7 @@ assertCondition(rustBridge.includes('json!({ "type": "input"') && rustBridge.inc
|
||||
assertCondition(!rustProviderRegistry.includes('"host_ssh_opened"') && !rustProviderRegistry.includes('"host_ssh_data"') && !rustProviderRegistry.includes('"host_ssh_exit"'), "rust provider registry must not accept old websocket ssh data messages");
|
||||
|
||||
assertCondition(rustData.includes("TcpListener") && rustData.includes("read_data_frame") && rustData.includes("write_data_frame"), "backend-core must expose raw TCP data frame listener");
|
||||
assertCondition(rustData.includes("pub async fn remove_ssh_data_channel") && rustData.includes("channels.remove(&key)") && rustData.includes(".max_by_key(|channel| channel.connected_at_millis)"), "backend-core must purge stale/dead data channels and prefer the freshest idle channel");
|
||||
assertCondition(rustData.includes("SSH_DATA_PROTOCOL") && rustData.includes("unidesk-host-ssh-tcp-pool-v1"), "tcp data protocol must be explicit");
|
||||
assertCondition(rustData.includes("base64::engine::general_purpose::STANDARD.encode(payload)"), "only client-facing websocket payload should remain base64 encoded");
|
||||
assertCondition(rustMain.includes("providerDataTcpUrl") && rustMain.includes("spawn_ssh_data_listener"), "backend-core startup must expose provider data listener visibility");
|
||||
@@ -38,14 +52,19 @@ assertCondition(provider.includes('capabilities.push("host.ssh", "host.ssh.tcp-p
|
||||
assertCondition(provider.includes("acquireSshDataChannel") && provider.includes("releaseSshDataChannel"), "provider-gateway must claim/release one data channel per ssh session");
|
||||
assertCondition(provider.includes("writeSshDataFrame(dataChannel") && provider.includes("sendSshDataSessionFrame"), "provider-gateway ssh output must use tcp data frames");
|
||||
assertCondition(!provider.includes('parsed.type === "host_ssh_input"') && !provider.includes('parsed.type === "host_ssh_close"'), "provider-gateway must not retain old websocket ssh input handlers");
|
||||
assertCondition(providerPackage.version === "0.2.28", "provider-gateway behavior change must bump package version", providerPackage);
|
||||
assertCondition(compareSemver(providerPackage.version, "0.2.29") >= 0, "provider-gateway tcp-pool contract must not regress below the deployed tcp-pool version", providerPackage);
|
||||
|
||||
assertCondition(shared.includes('transport: "tcp-pool"') && shared.includes("dataChannelId: string"), "shared host_ssh_open contract must require tcp-pool fields");
|
||||
assertCondition(!shared.includes("CoreHostSshInputMessage") && !shared.includes("ProviderHostSshDataMessage"), "shared protocol must remove old websocket ssh data contracts");
|
||||
assertCondition(!providerRegistryTs.includes("host_ssh_data") && !providerRegistryTs.includes("host_ssh_exit"), "typescript backend registry must not forward old websocket ssh data messages");
|
||||
assertCondition(compose.includes("UNIDESK_PROVIDER_DATA_PORT") && compose.includes("PROVIDER_DATA_POOL_SIZE"), "compose must wire provider data port and pool size");
|
||||
assertCondition(config.includes('"providerData"') && config.includes('"port": 18084') && config.includes('"containerPort": 8082'), "config.json must declare providerData port pair");
|
||||
assertCondition(backendCoreDockerfile.includes("ARG CARGO_BUILD_JOBS=1") && backendCoreDockerfile.includes('cargo build --release --jobs "${CARGO_BUILD_JOBS}"'), "backend-core main-server online build must keep cargo concurrency constrained");
|
||||
assertCondition(
|
||||
backendCoreDockerfile.includes("ARG CARGO_BUILD_JOBS=1")
|
||||
&& backendCoreDockerfile.includes('ENV CARGO_BUILD_JOBS=${CARGO_BUILD_JOBS}')
|
||||
&& backendCoreDockerfile.includes('cargo build --release --locked --jobs "${CARGO_BUILD_JOBS}"'),
|
||||
"backend-core main-server online build must keep cargo concurrency constrained",
|
||||
);
|
||||
assertCondition(compose.includes("UNIDESK_BACKEND_CORE_CARGO_BUILD_JOBS") && compose.includes("CARGO_BUILD_JOBS"), "compose must pass backend-core cargo build concurrency explicitly");
|
||||
|
||||
console.log(JSON.stringify({ ok: true, test: "ssh-data-tcp-pool-contract" }));
|
||||
|
||||
@@ -13,7 +13,7 @@ use crate::db::provider_supports;
|
||||
use crate::http::json_response;
|
||||
use crate::ssh_data_channel::{
|
||||
active_ssh_data_channel_count, claim_ssh_data_channel, close_ssh_data_session,
|
||||
idle_ssh_data_channel_count, send_ssh_data_frame,
|
||||
idle_ssh_data_channel_count, remove_ssh_data_channel, send_ssh_data_frame,
|
||||
};
|
||||
use crate::state::{AppState, SshClientConnection, SshDataFrame};
|
||||
|
||||
@@ -285,20 +285,53 @@ pub async fn forward_ssh_provider_message(state: &Arc<AppState>, message: &Value
|
||||
.get("sessionId")
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or("");
|
||||
let clients = state.active_ssh_clients.read().await;
|
||||
let Some(client) = clients.get(session_id) else {
|
||||
let message_type = message.get("type").and_then(Value::as_str).unwrap_or("");
|
||||
let message_text = message
|
||||
.get("message")
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or("ssh session error");
|
||||
let client = {
|
||||
let clients = state.active_ssh_clients.read().await;
|
||||
clients.get(session_id).map(|client| {
|
||||
(
|
||||
client.provider_id.clone(),
|
||||
client.data_channel_id.clone(),
|
||||
client.sender.clone(),
|
||||
)
|
||||
})
|
||||
};
|
||||
let Some((provider_id, data_channel_id, client_sender)) = client else {
|
||||
state.log("warn", "ssh_client_missing", Some(json!({ "providerId": message.get("providerId").cloned().unwrap_or(Value::Null), "sessionId": session_id, "type": message.get("type").cloned().unwrap_or(Value::Null) })));
|
||||
return;
|
||||
};
|
||||
let message_type = message.get("type").and_then(Value::as_str).unwrap_or("");
|
||||
let outbound = json!({
|
||||
let stale_channel_not_ready = message_type == "host_ssh_error"
|
||||
&& message_text.contains("requested ssh tcp data channel is not ready");
|
||||
if message_type == "host_ssh_error" {
|
||||
let removed = remove_ssh_data_channel(state, &provider_id, &data_channel_id).await;
|
||||
state.log(
|
||||
if removed { "warn" } else { "info" },
|
||||
"ssh_data_channel_removed_after_control_error",
|
||||
Some(json!({
|
||||
"providerId": provider_id.clone(),
|
||||
"sessionId": session_id,
|
||||
"dataChannelId": data_channel_id.clone(),
|
||||
"removed": removed,
|
||||
"message": message_text,
|
||||
})),
|
||||
);
|
||||
}
|
||||
let mut outbound = json!({
|
||||
"type": "ssh.error",
|
||||
"message": message.get("message").and_then(Value::as_str).unwrap_or("ssh session error"),
|
||||
"message": message_text,
|
||||
"transport": "tcp-pool",
|
||||
"controlFallback": true,
|
||||
});
|
||||
let _ = client.sender.send(Message::Text(outbound.to_string()));
|
||||
drop(clients);
|
||||
if stale_channel_not_ready {
|
||||
outbound["failureKind"] = Value::String("provider-data-channel-missing".to_string());
|
||||
outbound["providerId"] = Value::String(provider_id);
|
||||
outbound["dataChannelId"] = Value::String(data_channel_id);
|
||||
}
|
||||
let _ = client_sender.send(Message::Text(outbound.to_string()));
|
||||
if message_type == "host_ssh_error" {
|
||||
let state = state.clone();
|
||||
let session_id = session_id.to_string();
|
||||
|
||||
@@ -101,7 +101,9 @@ pub async fn claim_ssh_data_channel(
|
||||
let channel = channels
|
||||
.values_mut()
|
||||
.filter(|channel| channel.provider_id == provider_id && channel.active_session_id.is_none())
|
||||
.min_by(|left, right| left.channel_id.cmp(&right.channel_id))?;
|
||||
// Prefer the newest channel so a freshly reconnected pool is not
|
||||
// starved behind older stale entries that have not failed out yet.
|
||||
.max_by_key(|channel| channel.connected_at_millis)?;
|
||||
channel.active_session_id = Some(session_id.to_string());
|
||||
Some((channel.channel_id.clone(), channel.writer.clone()))
|
||||
}
|
||||
@@ -112,10 +114,16 @@ pub async fn send_ssh_data_frame(
|
||||
channel_id: &str,
|
||||
frame: SshDataFrame,
|
||||
) -> bool {
|
||||
let channels = state.active_ssh_data_channels.lock().await;
|
||||
channels
|
||||
.get(&channel_key(provider_id, channel_id))
|
||||
.is_some_and(|channel| channel.writer.send(frame).is_ok())
|
||||
let key = channel_key(provider_id, channel_id);
|
||||
let mut channels = state.active_ssh_data_channels.lock().await;
|
||||
let Some(channel) = channels.get(&key) else {
|
||||
return false;
|
||||
};
|
||||
if channel.writer.send(frame).is_ok() {
|
||||
return true;
|
||||
}
|
||||
channels.remove(&key);
|
||||
false
|
||||
}
|
||||
|
||||
pub async fn release_ssh_data_channel(
|
||||
@@ -129,6 +137,19 @@ pub async fn release_ssh_data_channel(
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn remove_ssh_data_channel(
|
||||
state: &Arc<AppState>,
|
||||
provider_id: &str,
|
||||
channel_id: &str,
|
||||
) -> bool {
|
||||
state
|
||||
.active_ssh_data_channels
|
||||
.lock()
|
||||
.await
|
||||
.remove(&channel_key(provider_id, channel_id))
|
||||
.is_some()
|
||||
}
|
||||
|
||||
pub async fn close_ssh_data_session(state: &Arc<AppState>, session_id: &str) {
|
||||
let client = {
|
||||
let clients = state.active_ssh_clients.read().await;
|
||||
@@ -294,6 +315,7 @@ async fn handle_ssh_data_stream(state: Arc<AppState>, stream: TcpStream) {
|
||||
channel_id: channel_id.clone(),
|
||||
writer: tx,
|
||||
active_session_id: None,
|
||||
connected_at_millis: chrono::Utc::now().timestamp_millis(),
|
||||
},
|
||||
);
|
||||
state.log(
|
||||
|
||||
@@ -36,6 +36,7 @@ pub struct SshDataChannel {
|
||||
pub channel_id: String,
|
||||
pub writer: mpsc::UnboundedSender<SshDataFrame>,
|
||||
pub active_session_id: Option<String>,
|
||||
pub connected_at_millis: i64,
|
||||
}
|
||||
|
||||
pub struct HttpTunnelResponse {
|
||||
|
||||
Reference in New Issue
Block a user