feat: route WebRTC ICE on controlled side

This commit is contained in:
rustdesk
2026-05-17 15:24:27 +08:00
parent c50e7d078d
commit 0e0ec5a551

View File

@@ -1,4 +1,5 @@
use std::{
collections::HashMap,
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
@@ -21,8 +22,13 @@ use hbb_common::{
rendezvous_proto::*,
sleep,
socket_client::{self, connect_tcp, is_ipv4, new_direct_udp_for, new_udp_for},
tokio::{self, select, sync::Mutex, time::interval},
tokio::{
self, select,
sync::{mpsc, Mutex},
time::interval,
},
udp::FramedSocket,
webrtc::WebRTCStream,
AddrMangle, IntoTargetAddr, ResultType, Stream, TargetAddr,
};
@@ -32,11 +38,17 @@ use crate::{
};
type Message = RendezvousMessage;
type RendezvousSender = mpsc::UnboundedSender<Message>;
fn webrtc_ice_key(peer_id: &str, session_key: &str) -> String {
format!("{}\n{}", peer_id, session_key)
}
lazy_static::lazy_static! {
static ref SOLVING_PK_MISMATCH: Mutex<String> = Default::default();
static ref LAST_MSG: Mutex<(SocketAddr, Instant)> = Mutex::new((SocketAddr::new([0; 4].into(), 0), Instant::now()));
static ref LAST_RELAY_MSG: Mutex<(SocketAddr, Instant)> = Mutex::new((SocketAddr::new([0; 4].into(), 0), Instant::now()));
static ref WEBRTC_ICE_TXS: Mutex<HashMap<String, mpsc::UnboundedSender<String>>> = Default::default();
}
static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);
static MANUAL_RESTARTED: AtomicBool = AtomicBool::new(false);
@@ -72,6 +84,7 @@ pub struct RendezvousMediator {
host: String,
host_prefix: String,
keep_alive: i32,
rz_sender: RendezvousSender,
}
impl RendezvousMediator {
@@ -182,11 +195,13 @@ impl RendezvousMediator {
let host = check_port(&host, RENDEZVOUS_PORT);
log::info!("start udp: {host}");
let (mut socket, mut addr) = new_udp_for(&host, CONNECT_TIMEOUT).await?;
let (rz_sender, mut rz_out_rx) = mpsc::unbounded_channel::<Message>();
let mut rz = Self {
addr: addr.clone(),
host: host.clone(),
host_prefix: Self::get_host_prefix(&host),
keep_alive: crate::DEFAULT_KEEP_ALIVE,
rz_sender,
};
let mut timer = crate::rustdesk_interval(interval(crate::TIMER_OUT));
@@ -246,6 +261,9 @@ impl RendezvousMediator {
},
}
},
Some(msg_out) = rz_out_rx.recv() => {
Sink::Framed(&mut socket, &addr).send(&msg_out).await?;
},
_ = timer.tick() => {
if SHOULD_EXIT.load(Ordering::SeqCst) {
break;
@@ -367,6 +385,22 @@ impl RendezvousMediator {
allow_err!(rz.handle_intranet(fla, server).await);
});
}
Some(rendezvous_message::Union::IceCandidate(ice)) => {
if ice.to_id != Config::get_id() {
return Ok(());
}
let key = webrtc_ice_key(&ice.from_id, &ice.session_key);
let tx = WEBRTC_ICE_TXS.lock().await.get(&key).cloned();
if let Some(tx) = tx {
let _ = tx.send(ice.candidate);
} else {
log::debug!(
"dropping ICE candidate for unknown WebRTC session from {} key {}",
ice.from_id,
ice.session_key
);
}
}
Some(rendezvous_message::Union::ConfigureUpdate(cu)) => {
let v0 = Config::get_rendezvous_servers();
Config::set_option(
@@ -389,11 +423,13 @@ impl RendezvousMediator {
let mut conn = connect_tcp(host.clone(), CONNECT_TIMEOUT).await?;
let key = crate::get_key(true).await;
crate::secure_tcp(&mut conn, &key).await?;
let (rz_sender, mut rz_out_rx) = mpsc::unbounded_channel::<Message>();
let mut rz = Self {
addr: conn.local_addr().into_target_addr()?,
host: host.clone(),
host_prefix: Self::get_host_prefix(&host),
keep_alive: crate::DEFAULT_KEEP_ALIVE,
rz_sender,
};
let mut timer = crate::rustdesk_interval(interval(crate::TIMER_OUT));
let mut last_register_sent: Option<Instant> = None;
@@ -421,6 +457,9 @@ impl RendezvousMediator {
let msg = Message::parse_from_bytes(&bytes)?;
rz.handle_resp(msg.union, Sink::Stream(&mut conn), &server, &mut update_latency).await?
}
Some(msg_out) = rz_out_rx.recv() => {
Sink::Stream(&mut conn).send(&msg_out).await?;
}
_ = timer.tick() => {
if SHOULD_EXIT.load(Ordering::SeqCst) {
break;
@@ -472,6 +511,7 @@ impl RendezvousMediator {
rr.secure,
false,
Default::default(),
String::new(),
rr.control_permissions.clone().into_option(),
)
.await
@@ -486,6 +526,7 @@ impl RendezvousMediator {
secure: bool,
initiate: bool,
socket_addr_v6: bytes::Bytes,
webrtc_sdp_answer: String,
control_permissions: Option<ControlPermissions>,
) -> ResultType<()> {
let peer_addr = AddrMangle::decode(&socket_addr);
@@ -504,6 +545,7 @@ impl RendezvousMediator {
socket_addr: socket_addr.into(),
version: crate::VERSION.to_owned(),
socket_addr_v6,
webrtc_sdp_answer,
..Default::default()
};
if initiate {
@@ -571,6 +613,7 @@ impl RendezvousMediator {
true,
true,
socket_addr_v6,
String::new(),
fla.control_permissions.into_option(),
)
.await
@@ -613,6 +656,91 @@ impl RendezvousMediator {
Ok(())
}
async fn spawn_webrtc_answerer(
&self,
ph: &PunchHole,
server: ServerPtr,
peer_addr: SocketAddr,
control_permissions: Option<ControlPermissions>,
) -> ResultType<String> {
if ph.requester_id.is_empty() {
log::warn!("WebRTC offer missing requester_id; falling back to existing transports");
return Ok(String::new());
}
let mut stream =
WebRTCStream::new(&ph.webrtc_sdp_offer, ph.force_relay, CONNECT_TIMEOUT).await?;
let answer = stream.get_local_endpoint().await?;
let session_key = stream.session_key().to_owned();
let peer_id = ph.requester_id.clone();
let (remote_ice_tx, mut remote_ice_rx) = mpsc::unbounded_channel::<String>();
WEBRTC_ICE_TXS
.lock()
.await
.insert(webrtc_ice_key(&peer_id, &session_key), remote_ice_tx);
let stream_for_remote_ice = stream.clone();
tokio::spawn(async move {
while let Some(candidate) = remote_ice_rx.recv().await {
if let Err(err) = stream_for_remote_ice.add_remote_ice_candidate(&candidate).await
{
log::warn!("failed to add remote WebRTC ICE candidate: {}", err);
}
}
});
if let Some(mut local_ice_rx) = stream.take_local_ice_rx() {
let sender = self.rz_sender.clone();
let my_id = Config::get_id();
let target_id = peer_id.clone();
let session_key_for_ice = session_key.clone();
tokio::spawn(async move {
while let Some(candidate) = local_ice_rx.recv().await {
let mut msg = Message::new();
msg.set_ice_candidate(IceCandidate {
from_id: my_id.clone(),
to_id: target_id.clone(),
session_key: session_key_for_ice.clone(),
candidate,
..Default::default()
});
let _ = sender.send(msg);
}
});
}
let peer_id_for_cleanup = peer_id.clone();
let session_key_for_cleanup = session_key.clone();
tokio::spawn(async move {
let result = stream.wait_connected(CONNECT_TIMEOUT).await;
WEBRTC_ICE_TXS
.lock()
.await
.remove(&webrtc_ice_key(
&peer_id_for_cleanup,
&session_key_for_cleanup,
));
if let Err(err) = result {
log::warn!("webrtc wait_connected failed: {}", err);
return;
}
if let Err(err) = crate::server::create_tcp_connection(
server,
Stream::WebRTC(stream),
peer_addr,
true,
control_permissions,
)
.await
{
log::warn!("failed to create WebRTC server connection: {}", err);
}
});
Ok(answer)
}
async fn handle_punch_hole(&self, ph: PunchHole, server: ServerPtr) -> ResultType<()> {
let mut peer_addr = AddrMangle::decode(&ph.socket_addr);
let last = *LAST_MSG.lock().await;
@@ -624,7 +752,22 @@ impl RendezvousMediator {
let peer_addr_v6 = hbb_common::AddrMangle::decode(&ph.socket_addr_v6);
let relay = use_ws() || Config::is_proxy() || ph.force_relay;
let mut socket_addr_v6 = Default::default();
let control_permissions = ph.control_permissions.into_option();
let control_permissions = ph.control_permissions.clone().into_option();
let webrtc_sdp_answer = if !ph.webrtc_sdp_offer.is_empty() {
self.spawn_webrtc_answerer(
&ph,
server.clone(),
peer_addr,
control_permissions.clone(),
)
.await
.unwrap_or_else(|err| {
log::warn!("failed to create WebRTC answer: {}", err);
String::new()
})
} else {
String::new()
};
if peer_addr_v6.port() > 0 && !relay {
socket_addr_v6 = start_ipv6(
peer_addr_v6,
@@ -651,6 +794,7 @@ impl RendezvousMediator {
true,
true,
socket_addr_v6.clone(),
webrtc_sdp_answer.clone(),
control_permissions,
)
.await;
@@ -664,6 +808,7 @@ impl RendezvousMediator {
nat_type: nat_type.into(),
version: crate::VERSION.to_owned(),
socket_addr_v6,
webrtc_sdp_answer,
..Default::default()
};
if ph.udp_port > 0 {