From 03cbf609f65ce6bf6c9c9ee4a4a0134751e47332 Mon Sep 17 00:00:00 2001 From: rustdesk Date: Sun, 17 May 2026 15:30:50 +0800 Subject: [PATCH] feat: race WebRTC as a direct transport enhancement --- src/client.rs | 218 +++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 209 insertions(+), 9 deletions(-) diff --git a/src/client.rs b/src/client.rs index 321a49ee6..2f9f02ad5 100644 --- a/src/client.rs +++ b/src/client.rs @@ -65,11 +65,12 @@ use hbb_common::{ self, net::UdpSocket, sync::{ - mpsc::{unbounded_channel, UnboundedReceiver}, + mpsc::{error::TryRecvError, unbounded_channel, UnboundedReceiver}, oneshot, }, time::{interval, Duration, Instant}, }, + webrtc::WebRTCStream, AddrMangle, ResultType, Stream, }; pub use helper::*; @@ -330,6 +331,19 @@ impl Client { } else { (None, None) }; + let ipv6 = if crate::get_ipv6_punch_enabled() { + crate::get_ipv6_socket().await + } else { + None + }; + let webrtc_offerer = + match WebRTCStream::new("", interface.is_force_relay(), CONNECT_TIMEOUT).await { + Ok(stream) => Some(stream), + Err(err) => { + log::warn!("webrtc offerer setup failed: {}", err); + None + } + }; let fut = Self::_start_inner( peer.to_owned(), key.to_owned(), @@ -338,6 +352,8 @@ impl Client { interface.clone(), udp.clone(), Some(stop_udp_tx), + ipv6, + webrtc_offerer, rendezvous_server.clone(), servers.clone(), contained, @@ -355,6 +371,8 @@ impl Client { interface, (None, None), None, + None, + None, rendezvous_server, servers, contained, @@ -366,6 +384,68 @@ impl Client { } } + fn spawn_webrtc_ice_bridge( + mut socket: Stream, + mut local_ice_rx: Option>, + webrtc: WebRTCStream, + peer: String, + session_key: String, + ) -> oneshot::Sender<()> { + let (stop_tx, mut stop_rx) = oneshot::channel::<()>(); + let my_id = Config::get_id(); + tokio::spawn(async move { + loop { + match stop_rx.try_recv() { + Ok(_) | Err(tokio::sync::oneshot::error::TryRecvError::Closed) => break, + Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {} + } + + if let Some(rx) = local_ice_rx.as_mut() { + loop { + match rx.try_recv() { + Ok(candidate) => { + let mut msg = RendezvousMessage::new(); + msg.set_ice_candidate(IceCandidate { + from_id: my_id.clone(), + to_id: peer.clone(), + session_key: session_key.clone(), + candidate, + ..Default::default() + }); + if let Err(err) = socket.send(&msg).await { + log::warn!("failed to send WebRTC ICE candidate: {}", err); + return; + } + } + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + local_ice_rx = None; + break; + } + } + } + } + + if let Some(msg_in) = + crate::get_next_nonkeyexchange_msg(&mut socket, Some(100)).await + { + if let Some(rendezvous_message::Union::IceCandidate(ice)) = msg_in.union { + if ice.from_id == peer + && ice.to_id == my_id + && ice.session_key == session_key + { + if let Err(err) = webrtc.add_remote_ice_candidate(&ice.candidate).await + { + log::warn!("failed to add WebRTC ICE candidate: {}", err); + } + } + } + } + } + }); + stop_tx + } + async fn _start_inner( peer: String, key: String, @@ -374,6 +454,8 @@ impl Client { interface: impl Interface, mut udp: (Option>, Option>>), stop_udp_tx: Option>, + mut ipv6: Option<(Arc, bytes::Bytes)>, + mut webrtc_offerer: Option, mut rendezvous_server: String, servers: Vec, contained: bool, @@ -446,14 +528,20 @@ impl Client { // Stop UDP NAT test task if still running stop_udp_tx.map(|tx| tx.send(())); let mut msg_out = RendezvousMessage::new(); - let mut ipv6 = if crate::get_ipv6_punch_enabled() { - if let Some((socket, addr)) = crate::get_ipv6_socket().await { - (Some(socket), Some(addr)) - } else { - (None, None) + let mut ipv6 = ipv6 + .take() + .map(|(socket, addr)| (Some(socket), Some(addr))) + .unwrap_or((None, None)); + let webrtc_sdp_offer = if let Some(webrtc) = webrtc_offerer.as_ref() { + match webrtc.get_local_endpoint().await { + Ok(endpoint) => endpoint, + Err(err) => { + log::warn!("failed to read local WebRTC offer: {}", err); + String::new() + } } } else { - (None, None) + String::new() }; let udp_nat_port = udp.1.map(|x| *x.lock().unwrap()).unwrap_or(0); let punch_type = if udp_nat_port > 0 { "UDP" } else { "TCP" }; @@ -467,8 +555,15 @@ impl Client { udp_port: udp_nat_port as _, force_relay: interface.is_force_relay(), socket_addr_v6: ipv6.1.unwrap_or_default(), + webrtc_sdp_offer: webrtc_sdp_offer.clone(), ..Default::default() }); + let webrtc_session_key = webrtc_offerer + .as_ref() + .map(|webrtc| webrtc.session_key().to_owned()) + .unwrap_or_default(); + let mut webrtc_sdp_answer = String::new(); + let mut pending_webrtc_ice = Vec::::new(); for i in 1..=3 { log::info!( "#{} {} punch attempt with {}, id: {}", @@ -510,6 +605,7 @@ impl Client { relay_server = ph.relay_server; peer_addr = AddrMangle::decode(&ph.socket_addr); feedback = ph.feedback; + webrtc_sdp_answer = ph.webrtc_sdp_answer; let s = udp.0.take(); if ph.is_udp && s.is_some() { if let Some(s) = s { @@ -549,6 +645,38 @@ impl Client { } } signed_id_pk = rr.pk().into(); + let mut webrtc_bridge_stop = None; + let mut webrtc_for_connect = None; + if !rr.webrtc_sdp_answer.is_empty() { + if let Some(webrtc) = webrtc_offerer.take() { + if let Err(err) = + webrtc.set_remote_endpoint(&rr.webrtc_sdp_answer).await + { + log::warn!("failed to set WebRTC relay answer: {}", err); + } else { + for candidate in pending_webrtc_ice.drain(..) { + if let Err(err) = + webrtc.add_remote_ice_candidate(&candidate).await + { + log::warn!( + "failed to add buffered WebRTC ICE candidate: {}", + err + ); + } + } + let session_key = webrtc.session_key().to_owned(); + let local_ice_rx = webrtc.take_local_ice_rx(); + webrtc_bridge_stop = Some(Self::spawn_webrtc_ice_bridge( + socket, + local_ice_rx, + webrtc.clone(), + peer.clone(), + session_key, + )); + webrtc_for_connect = Some(webrtc); + } + } + } let fut = Self::create_relay( &peer, rr.uuid, @@ -564,30 +692,86 @@ impl Client { } .boxed(), ); + if let Some(mut webrtc) = webrtc_for_connect { + connect_futures.push( + async move { + webrtc.wait_connected(CONNECT_TIMEOUT).await?; + Ok((Stream::WebRTC(webrtc), None, "WebRTC")) + } + .boxed(), + ); + } // Run all connection attempts concurrently, return the first successful one let (conn, kcp, typ) = match select_ok(connect_futures).await { Ok(conn) => (Ok(conn.0 .0), conn.0 .1, conn.0 .2), Err(e) => (Err(e), None, ""), }; + if let Some(stop) = webrtc_bridge_stop { + let _ = stop.send(()); + } let mut conn = conn?; feedback = rr.feedback; log::info!("{:?} used to establish {typ} connection", start.elapsed()); let pk = Self::secure_connection(&peer, signed_id_pk, &key, &mut conn).await?; return Ok(( - (conn, typ == "IPv6", pk, kcp, typ), + (conn, typ == "IPv6" || typ == "WebRTC", pk, kcp, typ), (feedback, rendezvous_server), false, )); } + Some(rendezvous_message::Union::IceCandidate(ice)) => { + if !webrtc_session_key.is_empty() + && ice.from_id == peer + && ice.to_id == Config::get_id() + && ice.session_key == webrtc_session_key + { + pending_webrtc_ice.push(ice.candidate); + } else { + log::debug!( + "dropping ICE candidate for unexpected WebRTC session from {} key {}", + ice.from_id, + ice.session_key + ); + } + } _ => { log::error!("Unexpected protobuf msg received: {:?}", msg_in); } } } } - drop(socket); + let mut webrtc_bridge_stop = None; + let mut webrtc_for_connect = None; + if !webrtc_sdp_answer.is_empty() { + if let Some(webrtc) = webrtc_offerer.take() { + if let Err(err) = webrtc.set_remote_endpoint(&webrtc_sdp_answer).await { + log::warn!("failed to set WebRTC answer: {}", err); + drop(socket); + } else { + for candidate in pending_webrtc_ice.drain(..) { + if let Err(err) = webrtc.add_remote_ice_candidate(&candidate).await { + log::warn!("failed to add buffered WebRTC ICE candidate: {}", err); + } + } + let session_key = webrtc.session_key().to_owned(); + let local_ice_rx = webrtc.take_local_ice_rx(); + webrtc_bridge_stop = Some(Self::spawn_webrtc_ice_bridge( + socket, + local_ice_rx, + webrtc.clone(), + peer.clone(), + session_key, + )); + webrtc_for_connect = Some(webrtc); + } + } else { + drop(socket); + } + } else { + drop(socket); + } if peer_addr.port() == 0 { bail!("Failed to connect via rendezvous server"); } @@ -621,6 +805,8 @@ impl Client { interface, udp.0, ipv6.0, + webrtc_for_connect, + webrtc_bridge_stop, punch_type, ) .await?, @@ -647,6 +833,8 @@ impl Client { interface: impl Interface, udp_socket_nat: Option>, udp_socket_v6: Option>, + webrtc_offerer: Option, + webrtc_bridge_stop: Option>, punch_type: &str, ) -> ResultType<( Stream, @@ -705,11 +893,23 @@ impl Client { if let Some(udp_socket_v6) = udp_socket_v6 { connect_futures.push(udp_nat_connect(udp_socket_v6, "IPv6", connect_timeout).boxed()); } + if let Some(mut webrtc) = webrtc_offerer { + connect_futures.push( + async move { + webrtc.wait_connected(connect_timeout).await?; + Ok((Stream::WebRTC(webrtc), None, "WebRTC")) + } + .boxed(), + ); + } // Run all connection attempts concurrently, return the first successful one let (mut conn, kcp, mut typ) = match select_ok(connect_futures).await { Ok(conn) => (Ok(conn.0 .0), conn.0 .1, conn.0 .2), Err(e) => (Err(e), None, ""), }; + if let Some(stop) = webrtc_bridge_stop { + let _ = stop.send(()); + } let mut direct = !conn.is_err(); if interface.is_force_relay() || conn.is_err() {