feat: race WebRTC as a direct transport enhancement

This commit is contained in:
rustdesk
2026-05-17 15:30:50 +08:00
parent 0e0ec5a551
commit 03cbf609f6

View File

@@ -65,11 +65,12 @@ use hbb_common::{
self,
net::UdpSocket,
sync::{
mpsc::{unbounded_channel, UnboundedReceiver},
mpsc::{error::TryRecvError, unbounded_channel, UnboundedReceiver},
oneshot,
},
time::{interval, Duration, Instant},
},
webrtc::WebRTCStream,
AddrMangle, ResultType, Stream,
};
pub use helper::*;
@@ -330,6 +331,19 @@ impl Client {
} else {
(None, None)
};
let ipv6 = if crate::get_ipv6_punch_enabled() {
crate::get_ipv6_socket().await
} else {
None
};
let webrtc_offerer =
match WebRTCStream::new("", interface.is_force_relay(), CONNECT_TIMEOUT).await {
Ok(stream) => Some(stream),
Err(err) => {
log::warn!("webrtc offerer setup failed: {}", err);
None
}
};
let fut = Self::_start_inner(
peer.to_owned(),
key.to_owned(),
@@ -338,6 +352,8 @@ impl Client {
interface.clone(),
udp.clone(),
Some(stop_udp_tx),
ipv6,
webrtc_offerer,
rendezvous_server.clone(),
servers.clone(),
contained,
@@ -355,6 +371,8 @@ impl Client {
interface,
(None, None),
None,
None,
None,
rendezvous_server,
servers,
contained,
@@ -366,6 +384,68 @@ impl Client {
}
}
fn spawn_webrtc_ice_bridge(
mut socket: Stream,
mut local_ice_rx: Option<UnboundedReceiver<String>>,
webrtc: WebRTCStream,
peer: String,
session_key: String,
) -> oneshot::Sender<()> {
let (stop_tx, mut stop_rx) = oneshot::channel::<()>();
let my_id = Config::get_id();
tokio::spawn(async move {
loop {
match stop_rx.try_recv() {
Ok(_) | Err(tokio::sync::oneshot::error::TryRecvError::Closed) => break,
Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {}
}
if let Some(rx) = local_ice_rx.as_mut() {
loop {
match rx.try_recv() {
Ok(candidate) => {
let mut msg = RendezvousMessage::new();
msg.set_ice_candidate(IceCandidate {
from_id: my_id.clone(),
to_id: peer.clone(),
session_key: session_key.clone(),
candidate,
..Default::default()
});
if let Err(err) = socket.send(&msg).await {
log::warn!("failed to send WebRTC ICE candidate: {}", err);
return;
}
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
local_ice_rx = None;
break;
}
}
}
}
if let Some(msg_in) =
crate::get_next_nonkeyexchange_msg(&mut socket, Some(100)).await
{
if let Some(rendezvous_message::Union::IceCandidate(ice)) = msg_in.union {
if ice.from_id == peer
&& ice.to_id == my_id
&& ice.session_key == session_key
{
if let Err(err) = webrtc.add_remote_ice_candidate(&ice.candidate).await
{
log::warn!("failed to add WebRTC ICE candidate: {}", err);
}
}
}
}
}
});
stop_tx
}
async fn _start_inner(
peer: String,
key: String,
@@ -374,6 +454,8 @@ impl Client {
interface: impl Interface,
mut udp: (Option<Arc<UdpSocket>>, Option<Arc<Mutex<u16>>>),
stop_udp_tx: Option<oneshot::Sender<()>>,
mut ipv6: Option<(Arc<UdpSocket>, bytes::Bytes)>,
mut webrtc_offerer: Option<WebRTCStream>,
mut rendezvous_server: String,
servers: Vec<String>,
contained: bool,
@@ -446,14 +528,20 @@ impl Client {
// Stop UDP NAT test task if still running
stop_udp_tx.map(|tx| tx.send(()));
let mut msg_out = RendezvousMessage::new();
let mut ipv6 = if crate::get_ipv6_punch_enabled() {
if let Some((socket, addr)) = crate::get_ipv6_socket().await {
(Some(socket), Some(addr))
} else {
(None, None)
let mut ipv6 = ipv6
.take()
.map(|(socket, addr)| (Some(socket), Some(addr)))
.unwrap_or((None, None));
let webrtc_sdp_offer = if let Some(webrtc) = webrtc_offerer.as_ref() {
match webrtc.get_local_endpoint().await {
Ok(endpoint) => endpoint,
Err(err) => {
log::warn!("failed to read local WebRTC offer: {}", err);
String::new()
}
}
} else {
(None, None)
String::new()
};
let udp_nat_port = udp.1.map(|x| *x.lock().unwrap()).unwrap_or(0);
let punch_type = if udp_nat_port > 0 { "UDP" } else { "TCP" };
@@ -467,8 +555,15 @@ impl Client {
udp_port: udp_nat_port as _,
force_relay: interface.is_force_relay(),
socket_addr_v6: ipv6.1.unwrap_or_default(),
webrtc_sdp_offer: webrtc_sdp_offer.clone(),
..Default::default()
});
let webrtc_session_key = webrtc_offerer
.as_ref()
.map(|webrtc| webrtc.session_key().to_owned())
.unwrap_or_default();
let mut webrtc_sdp_answer = String::new();
let mut pending_webrtc_ice = Vec::<String>::new();
for i in 1..=3 {
log::info!(
"#{} {} punch attempt with {}, id: {}",
@@ -510,6 +605,7 @@ impl Client {
relay_server = ph.relay_server;
peer_addr = AddrMangle::decode(&ph.socket_addr);
feedback = ph.feedback;
webrtc_sdp_answer = ph.webrtc_sdp_answer;
let s = udp.0.take();
if ph.is_udp && s.is_some() {
if let Some(s) = s {
@@ -549,6 +645,38 @@ impl Client {
}
}
signed_id_pk = rr.pk().into();
let mut webrtc_bridge_stop = None;
let mut webrtc_for_connect = None;
if !rr.webrtc_sdp_answer.is_empty() {
if let Some(webrtc) = webrtc_offerer.take() {
if let Err(err) =
webrtc.set_remote_endpoint(&rr.webrtc_sdp_answer).await
{
log::warn!("failed to set WebRTC relay answer: {}", err);
} else {
for candidate in pending_webrtc_ice.drain(..) {
if let Err(err) =
webrtc.add_remote_ice_candidate(&candidate).await
{
log::warn!(
"failed to add buffered WebRTC ICE candidate: {}",
err
);
}
}
let session_key = webrtc.session_key().to_owned();
let local_ice_rx = webrtc.take_local_ice_rx();
webrtc_bridge_stop = Some(Self::spawn_webrtc_ice_bridge(
socket,
local_ice_rx,
webrtc.clone(),
peer.clone(),
session_key,
));
webrtc_for_connect = Some(webrtc);
}
}
}
let fut = Self::create_relay(
&peer,
rr.uuid,
@@ -564,30 +692,86 @@ impl Client {
}
.boxed(),
);
if let Some(mut webrtc) = webrtc_for_connect {
connect_futures.push(
async move {
webrtc.wait_connected(CONNECT_TIMEOUT).await?;
Ok((Stream::WebRTC(webrtc), None, "WebRTC"))
}
.boxed(),
);
}
// Run all connection attempts concurrently, return the first successful one
let (conn, kcp, typ) = match select_ok(connect_futures).await {
Ok(conn) => (Ok(conn.0 .0), conn.0 .1, conn.0 .2),
Err(e) => (Err(e), None, ""),
};
if let Some(stop) = webrtc_bridge_stop {
let _ = stop.send(());
}
let mut conn = conn?;
feedback = rr.feedback;
log::info!("{:?} used to establish {typ} connection", start.elapsed());
let pk =
Self::secure_connection(&peer, signed_id_pk, &key, &mut conn).await?;
return Ok((
(conn, typ == "IPv6", pk, kcp, typ),
(conn, typ == "IPv6" || typ == "WebRTC", pk, kcp, typ),
(feedback, rendezvous_server),
false,
));
}
Some(rendezvous_message::Union::IceCandidate(ice)) => {
if !webrtc_session_key.is_empty()
&& ice.from_id == peer
&& ice.to_id == Config::get_id()
&& ice.session_key == webrtc_session_key
{
pending_webrtc_ice.push(ice.candidate);
} else {
log::debug!(
"dropping ICE candidate for unexpected WebRTC session from {} key {}",
ice.from_id,
ice.session_key
);
}
}
_ => {
log::error!("Unexpected protobuf msg received: {:?}", msg_in);
}
}
}
}
drop(socket);
let mut webrtc_bridge_stop = None;
let mut webrtc_for_connect = None;
if !webrtc_sdp_answer.is_empty() {
if let Some(webrtc) = webrtc_offerer.take() {
if let Err(err) = webrtc.set_remote_endpoint(&webrtc_sdp_answer).await {
log::warn!("failed to set WebRTC answer: {}", err);
drop(socket);
} else {
for candidate in pending_webrtc_ice.drain(..) {
if let Err(err) = webrtc.add_remote_ice_candidate(&candidate).await {
log::warn!("failed to add buffered WebRTC ICE candidate: {}", err);
}
}
let session_key = webrtc.session_key().to_owned();
let local_ice_rx = webrtc.take_local_ice_rx();
webrtc_bridge_stop = Some(Self::spawn_webrtc_ice_bridge(
socket,
local_ice_rx,
webrtc.clone(),
peer.clone(),
session_key,
));
webrtc_for_connect = Some(webrtc);
}
} else {
drop(socket);
}
} else {
drop(socket);
}
if peer_addr.port() == 0 {
bail!("Failed to connect via rendezvous server");
}
@@ -621,6 +805,8 @@ impl Client {
interface,
udp.0,
ipv6.0,
webrtc_for_connect,
webrtc_bridge_stop,
punch_type,
)
.await?,
@@ -647,6 +833,8 @@ impl Client {
interface: impl Interface,
udp_socket_nat: Option<Arc<UdpSocket>>,
udp_socket_v6: Option<Arc<UdpSocket>>,
webrtc_offerer: Option<WebRTCStream>,
webrtc_bridge_stop: Option<oneshot::Sender<()>>,
punch_type: &str,
) -> ResultType<(
Stream,
@@ -705,11 +893,23 @@ impl Client {
if let Some(udp_socket_v6) = udp_socket_v6 {
connect_futures.push(udp_nat_connect(udp_socket_v6, "IPv6", connect_timeout).boxed());
}
if let Some(mut webrtc) = webrtc_offerer {
connect_futures.push(
async move {
webrtc.wait_connected(connect_timeout).await?;
Ok((Stream::WebRTC(webrtc), None, "WebRTC"))
}
.boxed(),
);
}
// Run all connection attempts concurrently, return the first successful one
let (mut conn, kcp, mut typ) = match select_ok(connect_futures).await {
Ok(conn) => (Ok(conn.0 .0), conn.0 .1, conn.0 .2),
Err(e) => (Err(e), None, ""),
};
if let Some(stop) = webrtc_bridge_stop {
let _ = stop.send(());
}
let mut direct = !conn.is_err();
if interface.is_force_relay() || conn.is_err() {