diff --git a/libs/hbb_common b/libs/hbb_common index c7f556786..5a78ec423 160000 --- a/libs/hbb_common +++ b/libs/hbb_common @@ -1 +1 @@ -Subproject commit c7f55678653cae6abb3d28e80f91844f727c4be2 +Subproject commit 5a78ec42303ca046d808742e911a156636a2432b diff --git a/src/client.rs b/src/client.rs index 2f9f02ad5..fd4a5e881 100644 --- a/src/client.rs +++ b/src/client.rs @@ -384,6 +384,10 @@ impl Client { } } + fn is_expected_webrtc_ice_candidate(ice: &IceCandidate, session_key: &str) -> bool { + !session_key.is_empty() && ice.session_key == session_key && !ice.candidate.is_empty() + } + fn spawn_webrtc_ice_bridge( mut socket: Stream, mut local_ice_rx: Option>, @@ -392,7 +396,6 @@ impl Client { session_key: String, ) -> oneshot::Sender<()> { let (stop_tx, mut stop_rx) = oneshot::channel::<()>(); - let my_id = Config::get_id(); tokio::spawn(async move { loop { match stop_rx.try_recv() { @@ -406,8 +409,7 @@ impl Client { Ok(candidate) => { let mut msg = RendezvousMessage::new(); msg.set_ice_candidate(IceCandidate { - from_id: my_id.clone(), - to_id: peer.clone(), + id: peer.clone(), session_key: session_key.clone(), candidate, ..Default::default() @@ -430,10 +432,7 @@ impl Client { crate::get_next_nonkeyexchange_msg(&mut socket, Some(100)).await { if let Some(rendezvous_message::Union::IceCandidate(ice)) = msg_in.union { - if ice.from_id == peer - && ice.to_id == my_id - && ice.session_key == session_key - { + if Self::is_expected_webrtc_ice_candidate(&ice, &session_key) { if let Err(err) = webrtc.add_remote_ice_candidate(&ice.candidate).await { log::warn!("failed to add WebRTC ICE candidate: {}", err); @@ -564,7 +563,7 @@ impl Client { .unwrap_or_default(); let mut webrtc_sdp_answer = String::new(); let mut pending_webrtc_ice = Vec::::new(); - for i in 1..=3 { + 'punch_attempts: for i in 1..=3 { log::info!( "#{} {} punch attempt with {}, id: {}", i, @@ -574,9 +573,20 @@ impl Client { ); socket.send(&msg_out).await?; // below timeout should not bigger than hbbs's connection timeout. - if let Some(msg_in) = - crate::get_next_nonkeyexchange_msg(&mut socket, Some(i * 3000)).await - { + let attempt_deadline = Instant::now() + Duration::from_millis((i * 3000) as u64); + loop { + let remaining = attempt_deadline.saturating_duration_since(Instant::now()); + if remaining.is_zero() { + break; + } + let timeout_ms = remaining + .as_millis() + .clamp(1, u64::MAX as u128) as u64; + let Some(msg_in) = + crate::get_next_nonkeyexchange_msg(&mut socket, Some(timeout_ms)).await + else { + break; + }; match msg_in.union { Some(rendezvous_message::Union::PunchHoleResponse(ph)) => { if ph.socket_addr.is_empty() { @@ -624,7 +634,7 @@ impl Client { } } log::info!("{} Hole Punched {} = {}", punch_type, peer, peer_addr); - break; + break 'punch_attempts; } } Some(rendezvous_message::Union::RelayResponse(rr)) => { @@ -722,17 +732,12 @@ impl Client { )); } Some(rendezvous_message::Union::IceCandidate(ice)) => { - if !webrtc_session_key.is_empty() - && ice.from_id == peer - && ice.to_id == Config::get_id() - && ice.session_key == webrtc_session_key - { + if Self::is_expected_webrtc_ice_candidate(&ice, &webrtc_session_key) { pending_webrtc_ice.push(ice.candidate); } else { log::debug!( - "dropping ICE candidate for unexpected WebRTC session from {} key {}", - ice.from_id, - ice.session_key + "dropping ICE candidate for unexpected WebRTC session key {}", + ice.session_key, ); } } @@ -4320,8 +4325,22 @@ pub mod peer_online { #[cfg(test)] mod tests { + use crate::client::Client; + use hbb_common::rendezvous_proto::IceCandidate; use hbb_common::tokio; + #[test] + fn accepts_webrtc_ice_by_session_key_only() { + let ice = IceCandidate { + session_key: "session-a".to_owned(), + candidate: "candidate-json".to_owned(), + ..Default::default() + }; + + assert!(Client::is_expected_webrtc_ice_candidate(&ice, "session-a")); + assert!(!Client::is_expected_webrtc_ice_candidate(&ice, "session-b")); + } + #[tokio::test] async fn test_query_onlines() { super::query_online_states( diff --git a/src/rendezvous_mediator.rs b/src/rendezvous_mediator.rs index 1c15aecb5..41fc75b18 100644 --- a/src/rendezvous_mediator.rs +++ b/src/rendezvous_mediator.rs @@ -40,10 +40,6 @@ use crate::{ type Message = RendezvousMessage; type RendezvousSender = mpsc::UnboundedSender; -fn webrtc_ice_key(peer_id: &str, session_key: &str) -> String { - format!("{}\n{}", peer_id, session_key) -} - lazy_static::lazy_static! { static ref SOLVING_PK_MISMATCH: Mutex = Default::default(); static ref LAST_MSG: Mutex<(SocketAddr, Instant)> = Mutex::new((SocketAddr::new([0; 4].into(), 0), Instant::now())); @@ -386,17 +382,12 @@ impl RendezvousMediator { }); } Some(rendezvous_message::Union::IceCandidate(ice)) => { - if ice.to_id != Config::get_id() { - return Ok(()); - } - let key = webrtc_ice_key(&ice.from_id, &ice.session_key); - let tx = WEBRTC_ICE_TXS.lock().await.get(&key).cloned(); + let tx = WEBRTC_ICE_TXS.lock().await.get(&ice.session_key).cloned(); if let Some(tx) = tx { let _ = tx.send(ice.candidate); } else { log::debug!( - "dropping ICE candidate for unknown WebRTC session from {} key {}", - ice.from_id, + "dropping ICE candidate for unknown WebRTC session key {}", ice.session_key ); } @@ -659,26 +650,22 @@ impl RendezvousMediator { async fn spawn_webrtc_answerer( &self, ph: &PunchHole, + force_relay: bool, server: ServerPtr, peer_addr: SocketAddr, control_permissions: Option, ) -> ResultType { - if ph.requester_id.is_empty() { - log::warn!("WebRTC offer missing requester_id; falling back to existing transports"); - return Ok(String::new()); - } - let mut stream = - WebRTCStream::new(&ph.webrtc_sdp_offer, ph.force_relay, CONNECT_TIMEOUT).await?; + WebRTCStream::new(&ph.webrtc_sdp_offer, force_relay, CONNECT_TIMEOUT).await?; let answer = stream.get_local_endpoint().await?; let session_key = stream.session_key().to_owned(); - let peer_id = ph.requester_id.clone(); + let return_route = ph.socket_addr.clone(); let (remote_ice_tx, mut remote_ice_rx) = mpsc::unbounded_channel::(); WEBRTC_ICE_TXS .lock() .await - .insert(webrtc_ice_key(&peer_id, &session_key), remote_ice_tx); + .insert(session_key.clone(), remote_ice_tx); let stream_for_remote_ice = stream.clone(); tokio::spawn(async move { @@ -692,15 +679,13 @@ impl RendezvousMediator { if let Some(mut local_ice_rx) = stream.take_local_ice_rx() { let sender = self.rz_sender.clone(); - let my_id = Config::get_id(); - let target_id = peer_id.clone(); + let socket_addr = return_route.clone(); let session_key_for_ice = session_key.clone(); tokio::spawn(async move { while let Some(candidate) = local_ice_rx.recv().await { let mut msg = Message::new(); msg.set_ice_candidate(IceCandidate { - from_id: my_id.clone(), - to_id: target_id.clone(), + socket_addr: socket_addr.clone(), session_key: session_key_for_ice.clone(), candidate, ..Default::default() @@ -710,17 +695,13 @@ impl RendezvousMediator { }); } - let peer_id_for_cleanup = peer_id.clone(); let session_key_for_cleanup = session_key.clone(); tokio::spawn(async move { let result = stream.wait_connected(CONNECT_TIMEOUT).await; WEBRTC_ICE_TXS .lock() .await - .remove(&webrtc_ice_key( - &peer_id_for_cleanup, - &session_key_for_cleanup, - )); + .remove(&session_key_for_cleanup); if let Err(err) = result { log::warn!("webrtc wait_connected failed: {}", err); return; @@ -756,6 +737,7 @@ impl RendezvousMediator { let webrtc_sdp_answer = if !ph.webrtc_sdp_offer.is_empty() { self.spawn_webrtc_answerer( &ph, + relay, server.clone(), peer_addr, control_permissions.clone(),