diff --git a/src/rendezvous_mediator.rs b/src/rendezvous_mediator.rs index 89d7fa01e..1c15aecb5 100644 --- a/src/rendezvous_mediator.rs +++ b/src/rendezvous_mediator.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, net::SocketAddr, sync::{ atomic::{AtomicBool, Ordering}, @@ -21,8 +22,13 @@ use hbb_common::{ rendezvous_proto::*, sleep, socket_client::{self, connect_tcp, is_ipv4, new_direct_udp_for, new_udp_for}, - tokio::{self, select, sync::Mutex, time::interval}, + tokio::{ + self, select, + sync::{mpsc, Mutex}, + time::interval, + }, udp::FramedSocket, + webrtc::WebRTCStream, AddrMangle, IntoTargetAddr, ResultType, Stream, TargetAddr, }; @@ -32,11 +38,17 @@ use crate::{ }; type Message = RendezvousMessage; +type RendezvousSender = mpsc::UnboundedSender; + +fn webrtc_ice_key(peer_id: &str, session_key: &str) -> String { + format!("{}\n{}", peer_id, session_key) +} lazy_static::lazy_static! { static ref SOLVING_PK_MISMATCH: Mutex = Default::default(); static ref LAST_MSG: Mutex<(SocketAddr, Instant)> = Mutex::new((SocketAddr::new([0; 4].into(), 0), Instant::now())); static ref LAST_RELAY_MSG: Mutex<(SocketAddr, Instant)> = Mutex::new((SocketAddr::new([0; 4].into(), 0), Instant::now())); + static ref WEBRTC_ICE_TXS: Mutex>> = Default::default(); } static SHOULD_EXIT: AtomicBool = AtomicBool::new(false); static MANUAL_RESTARTED: AtomicBool = AtomicBool::new(false); @@ -72,6 +84,7 @@ pub struct RendezvousMediator { host: String, host_prefix: String, keep_alive: i32, + rz_sender: RendezvousSender, } impl RendezvousMediator { @@ -182,11 +195,13 @@ impl RendezvousMediator { let host = check_port(&host, RENDEZVOUS_PORT); log::info!("start udp: {host}"); let (mut socket, mut addr) = new_udp_for(&host, CONNECT_TIMEOUT).await?; + let (rz_sender, mut rz_out_rx) = mpsc::unbounded_channel::(); let mut rz = Self { addr: addr.clone(), host: host.clone(), host_prefix: Self::get_host_prefix(&host), keep_alive: crate::DEFAULT_KEEP_ALIVE, + rz_sender, }; let mut timer = crate::rustdesk_interval(interval(crate::TIMER_OUT)); @@ -246,6 +261,9 @@ impl RendezvousMediator { }, } }, + Some(msg_out) = rz_out_rx.recv() => { + Sink::Framed(&mut socket, &addr).send(&msg_out).await?; + }, _ = timer.tick() => { if SHOULD_EXIT.load(Ordering::SeqCst) { break; @@ -367,6 +385,22 @@ impl RendezvousMediator { allow_err!(rz.handle_intranet(fla, server).await); }); } + Some(rendezvous_message::Union::IceCandidate(ice)) => { + if ice.to_id != Config::get_id() { + return Ok(()); + } + let key = webrtc_ice_key(&ice.from_id, &ice.session_key); + let tx = WEBRTC_ICE_TXS.lock().await.get(&key).cloned(); + if let Some(tx) = tx { + let _ = tx.send(ice.candidate); + } else { + log::debug!( + "dropping ICE candidate for unknown WebRTC session from {} key {}", + ice.from_id, + ice.session_key + ); + } + } Some(rendezvous_message::Union::ConfigureUpdate(cu)) => { let v0 = Config::get_rendezvous_servers(); Config::set_option( @@ -389,11 +423,13 @@ impl RendezvousMediator { let mut conn = connect_tcp(host.clone(), CONNECT_TIMEOUT).await?; let key = crate::get_key(true).await; crate::secure_tcp(&mut conn, &key).await?; + let (rz_sender, mut rz_out_rx) = mpsc::unbounded_channel::(); let mut rz = Self { addr: conn.local_addr().into_target_addr()?, host: host.clone(), host_prefix: Self::get_host_prefix(&host), keep_alive: crate::DEFAULT_KEEP_ALIVE, + rz_sender, }; let mut timer = crate::rustdesk_interval(interval(crate::TIMER_OUT)); let mut last_register_sent: Option = None; @@ -421,6 +457,9 @@ impl RendezvousMediator { let msg = Message::parse_from_bytes(&bytes)?; rz.handle_resp(msg.union, Sink::Stream(&mut conn), &server, &mut update_latency).await? } + Some(msg_out) = rz_out_rx.recv() => { + Sink::Stream(&mut conn).send(&msg_out).await?; + } _ = timer.tick() => { if SHOULD_EXIT.load(Ordering::SeqCst) { break; @@ -472,6 +511,7 @@ impl RendezvousMediator { rr.secure, false, Default::default(), + String::new(), rr.control_permissions.clone().into_option(), ) .await @@ -486,6 +526,7 @@ impl RendezvousMediator { secure: bool, initiate: bool, socket_addr_v6: bytes::Bytes, + webrtc_sdp_answer: String, control_permissions: Option, ) -> ResultType<()> { let peer_addr = AddrMangle::decode(&socket_addr); @@ -504,6 +545,7 @@ impl RendezvousMediator { socket_addr: socket_addr.into(), version: crate::VERSION.to_owned(), socket_addr_v6, + webrtc_sdp_answer, ..Default::default() }; if initiate { @@ -571,6 +613,7 @@ impl RendezvousMediator { true, true, socket_addr_v6, + String::new(), fla.control_permissions.into_option(), ) .await @@ -613,6 +656,91 @@ impl RendezvousMediator { Ok(()) } + async fn spawn_webrtc_answerer( + &self, + ph: &PunchHole, + server: ServerPtr, + peer_addr: SocketAddr, + control_permissions: Option, + ) -> ResultType { + if ph.requester_id.is_empty() { + log::warn!("WebRTC offer missing requester_id; falling back to existing transports"); + return Ok(String::new()); + } + + let mut stream = + WebRTCStream::new(&ph.webrtc_sdp_offer, ph.force_relay, CONNECT_TIMEOUT).await?; + let answer = stream.get_local_endpoint().await?; + let session_key = stream.session_key().to_owned(); + let peer_id = ph.requester_id.clone(); + + let (remote_ice_tx, mut remote_ice_rx) = mpsc::unbounded_channel::(); + WEBRTC_ICE_TXS + .lock() + .await + .insert(webrtc_ice_key(&peer_id, &session_key), remote_ice_tx); + + let stream_for_remote_ice = stream.clone(); + tokio::spawn(async move { + while let Some(candidate) = remote_ice_rx.recv().await { + if let Err(err) = stream_for_remote_ice.add_remote_ice_candidate(&candidate).await + { + log::warn!("failed to add remote WebRTC ICE candidate: {}", err); + } + } + }); + + if let Some(mut local_ice_rx) = stream.take_local_ice_rx() { + let sender = self.rz_sender.clone(); + let my_id = Config::get_id(); + let target_id = peer_id.clone(); + let session_key_for_ice = session_key.clone(); + tokio::spawn(async move { + while let Some(candidate) = local_ice_rx.recv().await { + let mut msg = Message::new(); + msg.set_ice_candidate(IceCandidate { + from_id: my_id.clone(), + to_id: target_id.clone(), + session_key: session_key_for_ice.clone(), + candidate, + ..Default::default() + }); + let _ = sender.send(msg); + } + }); + } + + let peer_id_for_cleanup = peer_id.clone(); + let session_key_for_cleanup = session_key.clone(); + tokio::spawn(async move { + let result = stream.wait_connected(CONNECT_TIMEOUT).await; + WEBRTC_ICE_TXS + .lock() + .await + .remove(&webrtc_ice_key( + &peer_id_for_cleanup, + &session_key_for_cleanup, + )); + if let Err(err) = result { + log::warn!("webrtc wait_connected failed: {}", err); + return; + } + if let Err(err) = crate::server::create_tcp_connection( + server, + Stream::WebRTC(stream), + peer_addr, + true, + control_permissions, + ) + .await + { + log::warn!("failed to create WebRTC server connection: {}", err); + } + }); + + Ok(answer) + } + async fn handle_punch_hole(&self, ph: PunchHole, server: ServerPtr) -> ResultType<()> { let mut peer_addr = AddrMangle::decode(&ph.socket_addr); let last = *LAST_MSG.lock().await; @@ -624,7 +752,22 @@ impl RendezvousMediator { let peer_addr_v6 = hbb_common::AddrMangle::decode(&ph.socket_addr_v6); let relay = use_ws() || Config::is_proxy() || ph.force_relay; let mut socket_addr_v6 = Default::default(); - let control_permissions = ph.control_permissions.into_option(); + let control_permissions = ph.control_permissions.clone().into_option(); + let webrtc_sdp_answer = if !ph.webrtc_sdp_offer.is_empty() { + self.spawn_webrtc_answerer( + &ph, + server.clone(), + peer_addr, + control_permissions.clone(), + ) + .await + .unwrap_or_else(|err| { + log::warn!("failed to create WebRTC answer: {}", err); + String::new() + }) + } else { + String::new() + }; if peer_addr_v6.port() > 0 && !relay { socket_addr_v6 = start_ipv6( peer_addr_v6, @@ -651,6 +794,7 @@ impl RendezvousMediator { true, true, socket_addr_v6.clone(), + webrtc_sdp_answer.clone(), control_permissions, ) .await; @@ -664,6 +808,7 @@ impl RendezvousMediator { nat_type: nat_type.into(), version: crate::VERSION.to_owned(), socket_addr_v6, + webrtc_sdp_answer, ..Default::default() }; if ph.udp_port > 0 {