diff --git a/docs/reference/provider-gateway.md b/docs/reference/provider-gateway.md index 4cb4ad30..31f74fd2 100644 --- a/docs/reference/provider-gateway.md +++ b/docs/reference/provider-gateway.md @@ -48,7 +48,7 @@ TCP pool 的长期方向是把 `trans`/`tran` 变成真正并发的短连接工 当前默认池大小是 10 条,设计上优先覆盖高频短 SSH、并发小文件和单个大文件不阻塞其他请求的场景。已验证的目标状态是:D601 这类 WSL provider 上 10 路并发 `trans ... argv bash -lc 'sleep 2'` 不再出现 `provider ssh tcp data pool has no idle channel`、stderr 为空、每一路 stdout 都包含命令开始和结束输出,结束后 labels 回到 `ready=desired`、`claimed=0`。当前仍存在端到端固定开销,10 路并发短命令的墙钟可能明显高于远端命令自身耗时;这属于后续连接建立、broker 调度、WSL SSH spawn 或 provider 启动路径的性能优化范围,不能用队列、门禁或隐藏重试掩盖。 -开发中最容易踩的坑是把“依赖层在线”误判成“数据面可用”。`host.ssh` 只证明 provider 能执行维护 SSH;`host.ssh.tcp-pool`、`providerGatewaySshDataPoolReady`、`providerGatewaySshDataPoolClaimed` 和 `providerGatewaySshDataPoolLastError` 才能证明 TCP 数据池状态。另一个坑是输出尾部丢失:backend-core broker 在收到 `ssh.data` 后必须把 stdout/stderr 写入并 flush,再处理 `ssh.exit`,否则短命令可能 rc=0 但最后一段 stdout 没到调用端。第三个坑是 session 释放:`ssh.exit`、错误和超时路径都必须释放 claimed channel,避免下一批并发请求看到假性的池耗尽。 +开发中最容易踩的坑是把“依赖层在线”误判成“数据面可用”。`host.ssh` 只证明 provider 能执行维护 SSH;`host.ssh.tcp-pool`、`providerGatewaySshDataPoolReady`、`providerGatewaySshDataPoolClaimed` 和 `providerGatewaySshDataPoolLastError` 才能证明 TCP 数据池状态。另一个坑是输出尾部丢失:backend-core broker 在收到 `ssh.data` 后必须把 stdout/stderr 写入并 flush,再处理 `ssh.exit`,否则短命令可能 rc=0 但最后一段 stdout 没到调用端。第三个坑是 session 释放:`ssh.exit`、错误和超时路径都必须释放 claimed channel,避免下一批并发请求看到假性的池耗尽。第四个坑是 core/provider 池状态漂移:如果 provider 通过控制 WebSocket 返回 `host_ssh_error` 且提示 `requested ssh tcp data channel is not ready`,说明 core 侧 claim 到的 channel 已经不被 provider 认可,backend-core 必须 drop 该 `providerId + dataChannelId`,不能把它 release 回 idle pool 后继续重复 claim。 ## WSL Compute Node Deployment diff --git a/scripts/ssh-data-tcp-pool-contract-test.ts b/scripts/ssh-data-tcp-pool-contract-test.ts index b9ec7174..20e3f742 100644 --- a/scripts/ssh-data-tcp-pool-contract-test.ts +++ b/scripts/ssh-data-tcp-pool-contract-test.ts @@ -4,6 +4,17 @@ function 
assertCondition(condition: unknown, message: string, detail: unknown = if (!condition) throw new Error(`${message}: ${JSON.stringify(detail)}`); } +function compareSemver(left: unknown, right: string): number { + if (typeof left !== "string") return -1; + const a = left.split(".").map((part) => Number.parseInt(part, 10)); + const b = right.split(".").map((part) => Number.parseInt(part, 10)); + for (let i = 0; i < Math.max(a.length, b.length); i += 1) { + const delta = (Number.isFinite(a[i]) ? a[i] : 0) - (Number.isFinite(b[i]) ? b[i] : 0); + if (delta !== 0) return delta; + } + return 0; +} + const rustBridge = readFileSync("src/components/backend-core/src/ssh_bridge.rs", "utf8"); const rustData = readFileSync("src/components/backend-core/src/ssh_data_channel.rs", "utf8"); const rustMain = readFileSync("src/components/backend-core/src/main.rs", "utf8"); @@ -20,6 +31,8 @@ const backendCoreDockerfile = readFileSync("src/components/backend-core/Dockerfi assertCondition(rustBridge.includes('"host.ssh.tcp-pool"'), "backend-core ssh bridge must require host.ssh.tcp-pool"); assertCondition(rustBridge.includes("provider-gateway-upgrade-required"), "old provider must fail with upgrade-required classification"); assertCondition(rustBridge.includes("provider-data-pool-exhausted"), "tcp pool exhaustion must be visible"); +assertCondition(rustBridge.includes("remove_ssh_data_channel") && rustBridge.includes("ssh_data_channel_removed_after_control_error"), "control-fallback host_ssh_error must remove the claimed data channel"); +assertCondition(rustBridge.includes("provider-data-channel-missing"), "missing data channel must be classified for clients"); assertCondition(!rustBridge.includes('"host_ssh_input"'), "ssh input must not fall back to provider control websocket"); assertCondition(!rustBridge.includes('"host_ssh_eof"'), "ssh eof must not fall back to provider control websocket"); assertCondition(!rustBridge.includes('"host_ssh_close"'), "ssh close must not fall back to 
provider control websocket"); @@ -27,6 +40,7 @@ assertCondition(rustBridge.includes('json!({ "type": "input"') && rustBridge.inc assertCondition(!rustProviderRegistry.includes('"host_ssh_opened"') && !rustProviderRegistry.includes('"host_ssh_data"') && !rustProviderRegistry.includes('"host_ssh_exit"'), "rust provider registry must not accept old websocket ssh data messages"); assertCondition(rustData.includes("TcpListener") && rustData.includes("read_data_frame") && rustData.includes("write_data_frame"), "backend-core must expose raw TCP data frame listener"); +assertCondition(rustData.includes("pub async fn remove_ssh_data_channel") && rustData.includes("channels.remove(&key)") && rustData.includes(".max_by_key(|channel| channel.connected_at_millis)"), "backend-core must purge stale/dead data channels and prefer the freshest idle channel"); assertCondition(rustData.includes("SSH_DATA_PROTOCOL") && rustData.includes("unidesk-host-ssh-tcp-pool-v1"), "tcp data protocol must be explicit"); assertCondition(rustData.includes("base64::engine::general_purpose::STANDARD.encode(payload)"), "only client-facing websocket payload should remain base64 encoded"); assertCondition(rustMain.includes("providerDataTcpUrl") && rustMain.includes("spawn_ssh_data_listener"), "backend-core startup must expose provider data listener visibility"); @@ -38,14 +52,19 @@ assertCondition(provider.includes('capabilities.push("host.ssh", "host.ssh.tcp-p assertCondition(provider.includes("acquireSshDataChannel") && provider.includes("releaseSshDataChannel"), "provider-gateway must claim/release one data channel per ssh session"); assertCondition(provider.includes("writeSshDataFrame(dataChannel") && provider.includes("sendSshDataSessionFrame"), "provider-gateway ssh output must use tcp data frames"); assertCondition(!provider.includes('parsed.type === "host_ssh_input"') && !provider.includes('parsed.type === "host_ssh_close"'), "provider-gateway must not retain old websocket ssh input handlers"); 
-assertCondition(providerPackage.version === "0.2.28", "provider-gateway behavior change must bump package version", providerPackage); +assertCondition(compareSemver(providerPackage.version, "0.2.29") >= 0, "provider-gateway tcp-pool contract must not regress below the deployed tcp-pool version", providerPackage); assertCondition(shared.includes('transport: "tcp-pool"') && shared.includes("dataChannelId: string"), "shared host_ssh_open contract must require tcp-pool fields"); assertCondition(!shared.includes("CoreHostSshInputMessage") && !shared.includes("ProviderHostSshDataMessage"), "shared protocol must remove old websocket ssh data contracts"); assertCondition(!providerRegistryTs.includes("host_ssh_data") && !providerRegistryTs.includes("host_ssh_exit"), "typescript backend registry must not forward old websocket ssh data messages"); assertCondition(compose.includes("UNIDESK_PROVIDER_DATA_PORT") && compose.includes("PROVIDER_DATA_POOL_SIZE"), "compose must wire provider data port and pool size"); assertCondition(config.includes('"providerData"') && config.includes('"port": 18084') && config.includes('"containerPort": 8082'), "config.json must declare providerData port pair"); -assertCondition(backendCoreDockerfile.includes("ARG CARGO_BUILD_JOBS=1") && backendCoreDockerfile.includes('cargo build --release --jobs "${CARGO_BUILD_JOBS}"'), "backend-core main-server online build must keep cargo concurrency constrained"); +assertCondition( + backendCoreDockerfile.includes("ARG CARGO_BUILD_JOBS=1") + && backendCoreDockerfile.includes('ENV CARGO_BUILD_JOBS=${CARGO_BUILD_JOBS}') + && backendCoreDockerfile.includes('cargo build --release --locked --jobs "${CARGO_BUILD_JOBS}"'), + "backend-core main-server online build must keep cargo concurrency constrained", +); assertCondition(compose.includes("UNIDESK_BACKEND_CORE_CARGO_BUILD_JOBS") && compose.includes("CARGO_BUILD_JOBS"), "compose must pass backend-core cargo build concurrency explicitly"); 
console.log(JSON.stringify({ ok: true, test: "ssh-data-tcp-pool-contract" })); diff --git a/src/components/backend-core/src/ssh_bridge.rs b/src/components/backend-core/src/ssh_bridge.rs index 9199e87e..bcea5b3b 100644 --- a/src/components/backend-core/src/ssh_bridge.rs +++ b/src/components/backend-core/src/ssh_bridge.rs @@ -13,7 +13,7 @@ use crate::db::provider_supports; use crate::http::json_response; use crate::ssh_data_channel::{ active_ssh_data_channel_count, claim_ssh_data_channel, close_ssh_data_session, - idle_ssh_data_channel_count, send_ssh_data_frame, + idle_ssh_data_channel_count, remove_ssh_data_channel, send_ssh_data_frame, }; use crate::state::{AppState, SshClientConnection, SshDataFrame}; @@ -285,20 +285,53 @@ pub async fn forward_ssh_provider_message(state: &Arc<AppState>, message: &Value .get("sessionId") .and_then(Value::as_str) .unwrap_or(""); - let clients = state.active_ssh_clients.read().await; - let Some(client) = clients.get(session_id) else { + let message_type = message.get("type").and_then(Value::as_str).unwrap_or(""); + let message_text = message + .get("message") + .and_then(Value::as_str) + .unwrap_or("ssh session error"); + let client = { + let clients = state.active_ssh_clients.read().await; + clients.get(session_id).map(|client| { + ( + client.provider_id.clone(), + client.data_channel_id.clone(), + client.sender.clone(), + ) + }) + }; + let Some((provider_id, data_channel_id, client_sender)) = client else { state.log("warn", "ssh_client_missing", Some(json!({ "providerId": message.get("providerId").cloned().unwrap_or(Value::Null), "sessionId": session_id, "type": message.get("type").cloned().unwrap_or(Value::Null) }))); return; }; - let message_type = message.get("type").and_then(Value::as_str).unwrap_or(""); - let outbound = json!({ + let stale_channel_not_ready = message_type == "host_ssh_error" + && message_text.contains("requested ssh tcp data channel is not ready"); + if message_type == "host_ssh_error" { + let removed = 
remove_ssh_data_channel(state, &provider_id, &data_channel_id).await; + state.log( + if removed { "warn" } else { "info" }, + "ssh_data_channel_removed_after_control_error", + Some(json!({ + "providerId": provider_id.clone(), + "sessionId": session_id, + "dataChannelId": data_channel_id.clone(), + "removed": removed, + "message": message_text, + })), + ); + } + let mut outbound = json!({ "type": "ssh.error", - "message": message.get("message").and_then(Value::as_str).unwrap_or("ssh session error"), + "message": message_text, "transport": "tcp-pool", "controlFallback": true, }); - let _ = client.sender.send(Message::Text(outbound.to_string())); - drop(clients); + if stale_channel_not_ready { + outbound["failureKind"] = Value::String("provider-data-channel-missing".to_string()); + outbound["providerId"] = Value::String(provider_id); + outbound["dataChannelId"] = Value::String(data_channel_id); + } + let _ = client_sender.send(Message::Text(outbound.to_string())); if message_type == "host_ssh_error" { let state = state.clone(); let session_id = session_id.to_string(); diff --git a/src/components/backend-core/src/ssh_data_channel.rs b/src/components/backend-core/src/ssh_data_channel.rs index 61d9bc49..aa413ec6 100644 --- a/src/components/backend-core/src/ssh_data_channel.rs +++ b/src/components/backend-core/src/ssh_data_channel.rs @@ -101,7 +101,9 @@ pub async fn claim_ssh_data_channel( let channel = channels .values_mut() .filter(|channel| channel.provider_id == provider_id && channel.active_session_id.is_none()) - .min_by(|left, right| left.channel_id.cmp(&right.channel_id))?; + // Prefer the newest channel so a freshly reconnected pool is not + // starved behind older stale entries that have not failed out yet. 
+ .max_by_key(|channel| channel.connected_at_millis)?; channel.active_session_id = Some(session_id.to_string()); Some((channel.channel_id.clone(), channel.writer.clone())) } @@ -112,10 +114,16 @@ pub async fn send_ssh_data_frame( channel_id: &str, frame: SshDataFrame, ) -> bool { - let channels = state.active_ssh_data_channels.lock().await; - channels - .get(&channel_key(provider_id, channel_id)) - .is_some_and(|channel| channel.writer.send(frame).is_ok()) + let key = channel_key(provider_id, channel_id); + let mut channels = state.active_ssh_data_channels.lock().await; + let Some(channel) = channels.get(&key) else { + return false; + }; + if channel.writer.send(frame).is_ok() { + return true; + } + channels.remove(&key); + false } pub async fn release_ssh_data_channel( @@ -129,6 +137,19 @@ pub async fn release_ssh_data_channel( } } +pub async fn remove_ssh_data_channel( + state: &Arc<AppState>, + provider_id: &str, + channel_id: &str, +) -> bool { + state + .active_ssh_data_channels + .lock() + .await + .remove(&channel_key(provider_id, channel_id)) + .is_some() +} + pub async fn close_ssh_data_session(state: &Arc<AppState>, session_id: &str) { let client = { let clients = state.active_ssh_clients.read().await; @@ -294,6 +315,7 @@ async fn handle_ssh_data_stream(state: Arc<AppState>, stream: TcpStream) { channel_id: channel_id.clone(), writer: tx, active_session_id: None, + connected_at_millis: chrono::Utc::now().timestamp_millis(), }, ); state.log( diff --git a/src/components/backend-core/src/state.rs b/src/components/backend-core/src/state.rs index bdadb0d5..db417d3f 100644 --- a/src/components/backend-core/src/state.rs +++ b/src/components/backend-core/src/state.rs @@ -36,6 +36,7 @@ pub struct SshDataChannel { pub channel_id: String, pub writer: mpsc::UnboundedSender<SshDataFrame>, pub active_session_id: Option<String>, + pub connected_at_millis: i64, } pub struct HttpTunnelResponse {