fix: route WebRTC ICE through rendezvous paths

This commit is contained in:
rustdesk
2026-05-18 18:47:13 +08:00
parent 1040df0399
commit c19698fa52
3 changed files with 50 additions and 49 deletions

View File

@@ -384,6 +384,10 @@ impl Client {
} }
} }
fn is_expected_webrtc_ice_candidate(ice: &IceCandidate, session_key: &str) -> bool {
!session_key.is_empty() && ice.session_key == session_key && !ice.candidate.is_empty()
}
fn spawn_webrtc_ice_bridge( fn spawn_webrtc_ice_bridge(
mut socket: Stream, mut socket: Stream,
mut local_ice_rx: Option<UnboundedReceiver<String>>, mut local_ice_rx: Option<UnboundedReceiver<String>>,
@@ -392,7 +396,6 @@ impl Client {
session_key: String, session_key: String,
) -> oneshot::Sender<()> { ) -> oneshot::Sender<()> {
let (stop_tx, mut stop_rx) = oneshot::channel::<()>(); let (stop_tx, mut stop_rx) = oneshot::channel::<()>();
let my_id = Config::get_id();
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
match stop_rx.try_recv() { match stop_rx.try_recv() {
@@ -406,8 +409,7 @@ impl Client {
Ok(candidate) => { Ok(candidate) => {
let mut msg = RendezvousMessage::new(); let mut msg = RendezvousMessage::new();
msg.set_ice_candidate(IceCandidate { msg.set_ice_candidate(IceCandidate {
from_id: my_id.clone(), id: peer.clone(),
to_id: peer.clone(),
session_key: session_key.clone(), session_key: session_key.clone(),
candidate, candidate,
..Default::default() ..Default::default()
@@ -430,10 +432,7 @@ impl Client {
crate::get_next_nonkeyexchange_msg(&mut socket, Some(100)).await crate::get_next_nonkeyexchange_msg(&mut socket, Some(100)).await
{ {
if let Some(rendezvous_message::Union::IceCandidate(ice)) = msg_in.union { if let Some(rendezvous_message::Union::IceCandidate(ice)) = msg_in.union {
if ice.from_id == peer if Self::is_expected_webrtc_ice_candidate(&ice, &session_key) {
&& ice.to_id == my_id
&& ice.session_key == session_key
{
if let Err(err) = webrtc.add_remote_ice_candidate(&ice.candidate).await if let Err(err) = webrtc.add_remote_ice_candidate(&ice.candidate).await
{ {
log::warn!("failed to add WebRTC ICE candidate: {}", err); log::warn!("failed to add WebRTC ICE candidate: {}", err);
@@ -564,7 +563,7 @@ impl Client {
.unwrap_or_default(); .unwrap_or_default();
let mut webrtc_sdp_answer = String::new(); let mut webrtc_sdp_answer = String::new();
let mut pending_webrtc_ice = Vec::<String>::new(); let mut pending_webrtc_ice = Vec::<String>::new();
for i in 1..=3 { 'punch_attempts: for i in 1..=3 {
log::info!( log::info!(
"#{} {} punch attempt with {}, id: {}", "#{} {} punch attempt with {}, id: {}",
i, i,
@@ -574,9 +573,20 @@ impl Client {
); );
socket.send(&msg_out).await?; socket.send(&msg_out).await?;
// below timeout should not bigger than hbbs's connection timeout. // below timeout should not bigger than hbbs's connection timeout.
if let Some(msg_in) = let attempt_deadline = Instant::now() + Duration::from_millis((i * 3000) as u64);
crate::get_next_nonkeyexchange_msg(&mut socket, Some(i * 3000)).await loop {
{ let remaining = attempt_deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
break;
}
let timeout_ms = remaining
.as_millis()
.clamp(1, u64::MAX as u128) as u64;
let Some(msg_in) =
crate::get_next_nonkeyexchange_msg(&mut socket, Some(timeout_ms)).await
else {
break;
};
match msg_in.union { match msg_in.union {
Some(rendezvous_message::Union::PunchHoleResponse(ph)) => { Some(rendezvous_message::Union::PunchHoleResponse(ph)) => {
if ph.socket_addr.is_empty() { if ph.socket_addr.is_empty() {
@@ -624,7 +634,7 @@ impl Client {
} }
} }
log::info!("{} Hole Punched {} = {}", punch_type, peer, peer_addr); log::info!("{} Hole Punched {} = {}", punch_type, peer, peer_addr);
break; break 'punch_attempts;
} }
} }
Some(rendezvous_message::Union::RelayResponse(rr)) => { Some(rendezvous_message::Union::RelayResponse(rr)) => {
@@ -722,17 +732,12 @@ impl Client {
)); ));
} }
Some(rendezvous_message::Union::IceCandidate(ice)) => { Some(rendezvous_message::Union::IceCandidate(ice)) => {
if !webrtc_session_key.is_empty() if Self::is_expected_webrtc_ice_candidate(&ice, &webrtc_session_key) {
&& ice.from_id == peer
&& ice.to_id == Config::get_id()
&& ice.session_key == webrtc_session_key
{
pending_webrtc_ice.push(ice.candidate); pending_webrtc_ice.push(ice.candidate);
} else { } else {
log::debug!( log::debug!(
"dropping ICE candidate for unexpected WebRTC session from {} key {}", "dropping ICE candidate for unexpected WebRTC session key {}",
ice.from_id, ice.session_key,
ice.session_key
); );
} }
} }
@@ -4320,8 +4325,22 @@ pub mod peer_online {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::client::Client;
use hbb_common::rendezvous_proto::IceCandidate;
use hbb_common::tokio; use hbb_common::tokio;
#[test]
fn accepts_webrtc_ice_by_session_key_only() {
let ice = IceCandidate {
session_key: "session-a".to_owned(),
candidate: "candidate-json".to_owned(),
..Default::default()
};
assert!(Client::is_expected_webrtc_ice_candidate(&ice, "session-a"));
assert!(!Client::is_expected_webrtc_ice_candidate(&ice, "session-b"));
}
#[tokio::test] #[tokio::test]
async fn test_query_onlines() { async fn test_query_onlines() {
super::query_online_states( super::query_online_states(

View File

@@ -40,10 +40,6 @@ use crate::{
type Message = RendezvousMessage; type Message = RendezvousMessage;
type RendezvousSender = mpsc::UnboundedSender<Message>; type RendezvousSender = mpsc::UnboundedSender<Message>;
fn webrtc_ice_key(peer_id: &str, session_key: &str) -> String {
format!("{}\n{}", peer_id, session_key)
}
lazy_static::lazy_static! { lazy_static::lazy_static! {
static ref SOLVING_PK_MISMATCH: Mutex<String> = Default::default(); static ref SOLVING_PK_MISMATCH: Mutex<String> = Default::default();
static ref LAST_MSG: Mutex<(SocketAddr, Instant)> = Mutex::new((SocketAddr::new([0; 4].into(), 0), Instant::now())); static ref LAST_MSG: Mutex<(SocketAddr, Instant)> = Mutex::new((SocketAddr::new([0; 4].into(), 0), Instant::now()));
@@ -386,17 +382,12 @@ impl RendezvousMediator {
}); });
} }
Some(rendezvous_message::Union::IceCandidate(ice)) => { Some(rendezvous_message::Union::IceCandidate(ice)) => {
if ice.to_id != Config::get_id() { let tx = WEBRTC_ICE_TXS.lock().await.get(&ice.session_key).cloned();
return Ok(());
}
let key = webrtc_ice_key(&ice.from_id, &ice.session_key);
let tx = WEBRTC_ICE_TXS.lock().await.get(&key).cloned();
if let Some(tx) = tx { if let Some(tx) = tx {
let _ = tx.send(ice.candidate); let _ = tx.send(ice.candidate);
} else { } else {
log::debug!( log::debug!(
"dropping ICE candidate for unknown WebRTC session from {} key {}", "dropping ICE candidate for unknown WebRTC session key {}",
ice.from_id,
ice.session_key ice.session_key
); );
} }
@@ -659,26 +650,22 @@ impl RendezvousMediator {
async fn spawn_webrtc_answerer( async fn spawn_webrtc_answerer(
&self, &self,
ph: &PunchHole, ph: &PunchHole,
force_relay: bool,
server: ServerPtr, server: ServerPtr,
peer_addr: SocketAddr, peer_addr: SocketAddr,
control_permissions: Option<ControlPermissions>, control_permissions: Option<ControlPermissions>,
) -> ResultType<String> { ) -> ResultType<String> {
if ph.requester_id.is_empty() {
log::warn!("WebRTC offer missing requester_id; falling back to existing transports");
return Ok(String::new());
}
let mut stream = let mut stream =
WebRTCStream::new(&ph.webrtc_sdp_offer, ph.force_relay, CONNECT_TIMEOUT).await?; WebRTCStream::new(&ph.webrtc_sdp_offer, force_relay, CONNECT_TIMEOUT).await?;
let answer = stream.get_local_endpoint().await?; let answer = stream.get_local_endpoint().await?;
let session_key = stream.session_key().to_owned(); let session_key = stream.session_key().to_owned();
let peer_id = ph.requester_id.clone(); let return_route = ph.socket_addr.clone();
let (remote_ice_tx, mut remote_ice_rx) = mpsc::unbounded_channel::<String>(); let (remote_ice_tx, mut remote_ice_rx) = mpsc::unbounded_channel::<String>();
WEBRTC_ICE_TXS WEBRTC_ICE_TXS
.lock() .lock()
.await .await
.insert(webrtc_ice_key(&peer_id, &session_key), remote_ice_tx); .insert(session_key.clone(), remote_ice_tx);
let stream_for_remote_ice = stream.clone(); let stream_for_remote_ice = stream.clone();
tokio::spawn(async move { tokio::spawn(async move {
@@ -692,15 +679,13 @@ impl RendezvousMediator {
if let Some(mut local_ice_rx) = stream.take_local_ice_rx() { if let Some(mut local_ice_rx) = stream.take_local_ice_rx() {
let sender = self.rz_sender.clone(); let sender = self.rz_sender.clone();
let my_id = Config::get_id(); let socket_addr = return_route.clone();
let target_id = peer_id.clone();
let session_key_for_ice = session_key.clone(); let session_key_for_ice = session_key.clone();
tokio::spawn(async move { tokio::spawn(async move {
while let Some(candidate) = local_ice_rx.recv().await { while let Some(candidate) = local_ice_rx.recv().await {
let mut msg = Message::new(); let mut msg = Message::new();
msg.set_ice_candidate(IceCandidate { msg.set_ice_candidate(IceCandidate {
from_id: my_id.clone(), socket_addr: socket_addr.clone(),
to_id: target_id.clone(),
session_key: session_key_for_ice.clone(), session_key: session_key_for_ice.clone(),
candidate, candidate,
..Default::default() ..Default::default()
@@ -710,17 +695,13 @@ impl RendezvousMediator {
}); });
} }
let peer_id_for_cleanup = peer_id.clone();
let session_key_for_cleanup = session_key.clone(); let session_key_for_cleanup = session_key.clone();
tokio::spawn(async move { tokio::spawn(async move {
let result = stream.wait_connected(CONNECT_TIMEOUT).await; let result = stream.wait_connected(CONNECT_TIMEOUT).await;
WEBRTC_ICE_TXS WEBRTC_ICE_TXS
.lock() .lock()
.await .await
.remove(&webrtc_ice_key( .remove(&session_key_for_cleanup);
&peer_id_for_cleanup,
&session_key_for_cleanup,
));
if let Err(err) = result { if let Err(err) = result {
log::warn!("webrtc wait_connected failed: {}", err); log::warn!("webrtc wait_connected failed: {}", err);
return; return;
@@ -756,6 +737,7 @@ impl RendezvousMediator {
let webrtc_sdp_answer = if !ph.webrtc_sdp_offer.is_empty() { let webrtc_sdp_answer = if !ph.webrtc_sdp_offer.is_empty() {
self.spawn_webrtc_answerer( self.spawn_webrtc_answerer(
&ph, &ph,
relay,
server.clone(), server.clone(),
peer_addr, peer_addr,
control_permissions.clone(), control_permissions.clone(),