Compare commits

...

5 Commits

Author SHA1 Message Date
rustdesk
1040df0399 fix: update CLI for current client APIs 2026-05-17 15:43:48 +08:00
rustdesk
03cbf609f6 feat: race WebRTC as a direct transport enhancement 2026-05-17 15:30:50 +08:00
rustdesk
0e0ec5a551 feat: route WebRTC ICE on controlled side 2026-05-17 15:24:27 +08:00
rustdesk
c50e7d078d feat: support trickle ICE in WebRTCStream 2026-05-17 15:18:53 +08:00
rustdesk
3d6b06e854 feat: add rendezvous WebRTC signaling fields 2026-05-17 15:15:12 +08:00
6 changed files with 425 additions and 48 deletions

View File

@@ -47,7 +47,7 @@ screencapturekit = ["cpal/screencapturekit"]
[dependencies]
async-trait = "0.1"
scrap = { path = "libs/scrap", features = ["wayland"] }
hbb_common = { path = "libs/hbb_common" }
hbb_common = { path = "libs/hbb_common", features = ["webrtc"] }
serde_derive = "1.0"
serde = "1.0"
serde_json = "1.0"

View File

@@ -3,7 +3,7 @@ use async_trait::async_trait;
use hbb_common::{
config::PeerConfig,
config::READ_TIMEOUT,
futures::{SinkExt, StreamExt},
futures::StreamExt,
log,
message_proto::*,
protobuf::Message as _,
@@ -46,6 +46,7 @@ impl Session {
false,
None,
None,
None,
);
session
}
@@ -53,7 +54,7 @@ impl Session {
#[async_trait]
impl Interface for Session {
fn get_login_config_handler(&self) -> Arc<RwLock<LoginConfigHandler>> {
fn get_lch(&self) -> Arc<RwLock<LoginConfigHandler>> {
return self.lc.clone();
}
@@ -61,14 +62,20 @@ impl Interface for Session {
match msgtype {
"input-password" => {
self.sender
.send(Data::Login((self.password.clone(), true)))
.send(Data::Login((
String::new(),
String::new(),
self.password.clone(),
true,
)))
.ok();
}
"re-input-password" => {
log::error!("{}: {}", title, text);
match rpassword::prompt_password("Enter password: ") {
Ok(password) => {
let login_data = Data::Login((password, true));
let login_data =
Data::Login((String::new(), String::new(), password, true));
self.sender.send(login_data).ok();
}
Err(e) => {
@@ -93,6 +100,8 @@ impl Interface for Session {
self.lc.write().unwrap().handle_peer_info(&pi);
}
fn set_multiple_windows_session(&self, _sessions: Vec<WindowsSession>) {}
async fn handle_hash(&self, pass: &str, hash: Hash, peer: &mut Stream) {
log::info!(
"password={}",
@@ -137,8 +146,8 @@ pub async fn connect_test(id: &str, key: String, token: String) {
Err(err) => {
log::error!("Failed to connect {}: {}", &id, err);
}
Ok((mut stream, direct)) => {
log::info!("direct: {}", direct);
Ok(((mut stream, _direct, _secure, _kcp, _typ), direct)) => {
log::info!("direct: {:?}", direct);
// rpassword::prompt_password("Input anything to exit").ok();
loop {
tokio::select! {

View File

@@ -65,11 +65,12 @@ use hbb_common::{
self,
net::UdpSocket,
sync::{
mpsc::{unbounded_channel, UnboundedReceiver},
mpsc::{error::TryRecvError, unbounded_channel, UnboundedReceiver},
oneshot,
},
time::{interval, Duration, Instant},
},
webrtc::WebRTCStream,
AddrMangle, ResultType, Stream,
};
pub use helper::*;
@@ -330,6 +331,19 @@ impl Client {
} else {
(None, None)
};
let ipv6 = if crate::get_ipv6_punch_enabled() {
crate::get_ipv6_socket().await
} else {
None
};
let webrtc_offerer =
match WebRTCStream::new("", interface.is_force_relay(), CONNECT_TIMEOUT).await {
Ok(stream) => Some(stream),
Err(err) => {
log::warn!("webrtc offerer setup failed: {}", err);
None
}
};
let fut = Self::_start_inner(
peer.to_owned(),
key.to_owned(),
@@ -338,6 +352,8 @@ impl Client {
interface.clone(),
udp.clone(),
Some(stop_udp_tx),
ipv6,
webrtc_offerer,
rendezvous_server.clone(),
servers.clone(),
contained,
@@ -355,6 +371,8 @@ impl Client {
interface,
(None, None),
None,
None,
None,
rendezvous_server,
servers,
contained,
@@ -366,6 +384,68 @@ impl Client {
}
}
fn spawn_webrtc_ice_bridge(
mut socket: Stream,
mut local_ice_rx: Option<UnboundedReceiver<String>>,
webrtc: WebRTCStream,
peer: String,
session_key: String,
) -> oneshot::Sender<()> {
let (stop_tx, mut stop_rx) = oneshot::channel::<()>();
let my_id = Config::get_id();
tokio::spawn(async move {
loop {
match stop_rx.try_recv() {
Ok(_) | Err(tokio::sync::oneshot::error::TryRecvError::Closed) => break,
Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {}
}
if let Some(rx) = local_ice_rx.as_mut() {
loop {
match rx.try_recv() {
Ok(candidate) => {
let mut msg = RendezvousMessage::new();
msg.set_ice_candidate(IceCandidate {
from_id: my_id.clone(),
to_id: peer.clone(),
session_key: session_key.clone(),
candidate,
..Default::default()
});
if let Err(err) = socket.send(&msg).await {
log::warn!("failed to send WebRTC ICE candidate: {}", err);
return;
}
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
local_ice_rx = None;
break;
}
}
}
}
if let Some(msg_in) =
crate::get_next_nonkeyexchange_msg(&mut socket, Some(100)).await
{
if let Some(rendezvous_message::Union::IceCandidate(ice)) = msg_in.union {
if ice.from_id == peer
&& ice.to_id == my_id
&& ice.session_key == session_key
{
if let Err(err) = webrtc.add_remote_ice_candidate(&ice.candidate).await
{
log::warn!("failed to add WebRTC ICE candidate: {}", err);
}
}
}
}
}
});
stop_tx
}
async fn _start_inner(
peer: String,
key: String,
@@ -374,6 +454,8 @@ impl Client {
interface: impl Interface,
mut udp: (Option<Arc<UdpSocket>>, Option<Arc<Mutex<u16>>>),
stop_udp_tx: Option<oneshot::Sender<()>>,
mut ipv6: Option<(Arc<UdpSocket>, bytes::Bytes)>,
mut webrtc_offerer: Option<WebRTCStream>,
mut rendezvous_server: String,
servers: Vec<String>,
contained: bool,
@@ -446,14 +528,20 @@ impl Client {
// Stop UDP NAT test task if still running
stop_udp_tx.map(|tx| tx.send(()));
let mut msg_out = RendezvousMessage::new();
let mut ipv6 = if crate::get_ipv6_punch_enabled() {
if let Some((socket, addr)) = crate::get_ipv6_socket().await {
(Some(socket), Some(addr))
} else {
(None, None)
let mut ipv6 = ipv6
.take()
.map(|(socket, addr)| (Some(socket), Some(addr)))
.unwrap_or((None, None));
let webrtc_sdp_offer = if let Some(webrtc) = webrtc_offerer.as_ref() {
match webrtc.get_local_endpoint().await {
Ok(endpoint) => endpoint,
Err(err) => {
log::warn!("failed to read local WebRTC offer: {}", err);
String::new()
}
}
} else {
(None, None)
String::new()
};
let udp_nat_port = udp.1.map(|x| *x.lock().unwrap()).unwrap_or(0);
let punch_type = if udp_nat_port > 0 { "UDP" } else { "TCP" };
@@ -467,8 +555,15 @@ impl Client {
udp_port: udp_nat_port as _,
force_relay: interface.is_force_relay(),
socket_addr_v6: ipv6.1.unwrap_or_default(),
webrtc_sdp_offer: webrtc_sdp_offer.clone(),
..Default::default()
});
let webrtc_session_key = webrtc_offerer
.as_ref()
.map(|webrtc| webrtc.session_key().to_owned())
.unwrap_or_default();
let mut webrtc_sdp_answer = String::new();
let mut pending_webrtc_ice = Vec::<String>::new();
for i in 1..=3 {
log::info!(
"#{} {} punch attempt with {}, id: {}",
@@ -510,6 +605,7 @@ impl Client {
relay_server = ph.relay_server;
peer_addr = AddrMangle::decode(&ph.socket_addr);
feedback = ph.feedback;
webrtc_sdp_answer = ph.webrtc_sdp_answer;
let s = udp.0.take();
if ph.is_udp && s.is_some() {
if let Some(s) = s {
@@ -549,6 +645,38 @@ impl Client {
}
}
signed_id_pk = rr.pk().into();
let mut webrtc_bridge_stop = None;
let mut webrtc_for_connect = None;
if !rr.webrtc_sdp_answer.is_empty() {
if let Some(webrtc) = webrtc_offerer.take() {
if let Err(err) =
webrtc.set_remote_endpoint(&rr.webrtc_sdp_answer).await
{
log::warn!("failed to set WebRTC relay answer: {}", err);
} else {
for candidate in pending_webrtc_ice.drain(..) {
if let Err(err) =
webrtc.add_remote_ice_candidate(&candidate).await
{
log::warn!(
"failed to add buffered WebRTC ICE candidate: {}",
err
);
}
}
let session_key = webrtc.session_key().to_owned();
let local_ice_rx = webrtc.take_local_ice_rx();
webrtc_bridge_stop = Some(Self::spawn_webrtc_ice_bridge(
socket,
local_ice_rx,
webrtc.clone(),
peer.clone(),
session_key,
));
webrtc_for_connect = Some(webrtc);
}
}
}
let fut = Self::create_relay(
&peer,
rr.uuid,
@@ -564,30 +692,86 @@ impl Client {
}
.boxed(),
);
if let Some(mut webrtc) = webrtc_for_connect {
connect_futures.push(
async move {
webrtc.wait_connected(CONNECT_TIMEOUT).await?;
Ok((Stream::WebRTC(webrtc), None, "WebRTC"))
}
.boxed(),
);
}
// Run all connection attempts concurrently, return the first successful one
let (conn, kcp, typ) = match select_ok(connect_futures).await {
Ok(conn) => (Ok(conn.0 .0), conn.0 .1, conn.0 .2),
Err(e) => (Err(e), None, ""),
};
if let Some(stop) = webrtc_bridge_stop {
let _ = stop.send(());
}
let mut conn = conn?;
feedback = rr.feedback;
log::info!("{:?} used to establish {typ} connection", start.elapsed());
let pk =
Self::secure_connection(&peer, signed_id_pk, &key, &mut conn).await?;
return Ok((
(conn, typ == "IPv6", pk, kcp, typ),
(conn, typ == "IPv6" || typ == "WebRTC", pk, kcp, typ),
(feedback, rendezvous_server),
false,
));
}
Some(rendezvous_message::Union::IceCandidate(ice)) => {
if !webrtc_session_key.is_empty()
&& ice.from_id == peer
&& ice.to_id == Config::get_id()
&& ice.session_key == webrtc_session_key
{
pending_webrtc_ice.push(ice.candidate);
} else {
log::debug!(
"dropping ICE candidate for unexpected WebRTC session from {} key {}",
ice.from_id,
ice.session_key
);
}
}
_ => {
log::error!("Unexpected protobuf msg received: {:?}", msg_in);
}
}
}
}
drop(socket);
let mut webrtc_bridge_stop = None;
let mut webrtc_for_connect = None;
if !webrtc_sdp_answer.is_empty() {
if let Some(webrtc) = webrtc_offerer.take() {
if let Err(err) = webrtc.set_remote_endpoint(&webrtc_sdp_answer).await {
log::warn!("failed to set WebRTC answer: {}", err);
drop(socket);
} else {
for candidate in pending_webrtc_ice.drain(..) {
if let Err(err) = webrtc.add_remote_ice_candidate(&candidate).await {
log::warn!("failed to add buffered WebRTC ICE candidate: {}", err);
}
}
let session_key = webrtc.session_key().to_owned();
let local_ice_rx = webrtc.take_local_ice_rx();
webrtc_bridge_stop = Some(Self::spawn_webrtc_ice_bridge(
socket,
local_ice_rx,
webrtc.clone(),
peer.clone(),
session_key,
));
webrtc_for_connect = Some(webrtc);
}
} else {
drop(socket);
}
} else {
drop(socket);
}
if peer_addr.port() == 0 {
bail!("Failed to connect via rendezvous server");
}
@@ -621,6 +805,8 @@ impl Client {
interface,
udp.0,
ipv6.0,
webrtc_for_connect,
webrtc_bridge_stop,
punch_type,
)
.await?,
@@ -647,6 +833,8 @@ impl Client {
interface: impl Interface,
udp_socket_nat: Option<Arc<UdpSocket>>,
udp_socket_v6: Option<Arc<UdpSocket>>,
webrtc_offerer: Option<WebRTCStream>,
webrtc_bridge_stop: Option<oneshot::Sender<()>>,
punch_type: &str,
) -> ResultType<(
Stream,
@@ -705,11 +893,23 @@ impl Client {
if let Some(udp_socket_v6) = udp_socket_v6 {
connect_futures.push(udp_nat_connect(udp_socket_v6, "IPv6", connect_timeout).boxed());
}
if let Some(mut webrtc) = webrtc_offerer {
connect_futures.push(
async move {
webrtc.wait_connected(connect_timeout).await?;
Ok((Stream::WebRTC(webrtc), None, "WebRTC"))
}
.boxed(),
);
}
// Run all connection attempts concurrently, return the first successful one
let (mut conn, kcp, mut typ) = match select_ok(connect_futures).await {
Ok(conn) => (Ok(conn.0 .0), conn.0 .1, conn.0 .2),
Err(e) => (Err(e), None, ""),
};
if let Some(stop) = webrtc_bridge_stop {
let _ = stop.send(());
}
let mut direct = !conn.is_err();
if interface.is_force_relay() || conn.is_err() {

View File

@@ -38,49 +38,68 @@ fn main() {
if !common::global_init() {
return;
}
use clap::App;
use clap::{Arg, ArgAction, Command};
use hbb_common::log;
let args = format!(
"-p, --port-forward=[PORT-FORWARD-OPTIONS] 'Format: remote-id:local-port:remote-port[:remote-host]'
-c, --connect=[REMOTE_ID] 'test only'
-k, --key=[KEY] ''
-s, --server=[] 'Start server'",
);
let matches = App::new("rustdesk")
let matches = Command::new("rustdesk")
.version(crate::VERSION)
.author("Purslane Ltd<info@rustdesk.com>")
.about("RustDesk command line tool")
.args_from_usage(&args)
.arg(
Arg::new("port-forward")
.short('p')
.long("port-forward")
.value_name("PORT-FORWARD-OPTIONS")
.help("Format: remote-id:local-port:remote-port[:remote-host]"),
)
.arg(
Arg::new("connect")
.short('c')
.long("connect")
.value_name("REMOTE_ID")
.help("test only"),
)
.arg(Arg::new("key").short('k').long("key").value_name("KEY"))
.arg(
Arg::new("server")
.short('s')
.long("server")
.action(ArgAction::SetTrue)
.help("Start server"),
)
.get_matches();
use hbb_common::{config::LocalConfig, env_logger::*};
init_from_env(Env::default().filter_or(DEFAULT_FILTER_ENV, "info"));
if let Some(p) = matches.value_of("port-forward") {
let options: Vec<String> = p.split(":").map(|x| x.to_owned()).collect();
if let Some(p) = matches.get_one::<String>("port-forward") {
let options: Vec<String> = p.split(':').map(|x| x.to_owned()).collect();
if options.len() < 3 {
log::error!("Wrong port-forward options");
return;
}
let mut port = 0;
if let Ok(v) = options[1].parse::<i32>() {
port = v;
} else {
log::error!("Wrong local-port");
return;
}
let mut remote_port = 0;
if let Ok(v) = options[2].parse::<i32>() {
remote_port = v;
} else {
log::error!("Wrong remote-port");
return;
}
let port = match options[1].parse::<i32>() {
Ok(v) => v,
Err(_) => {
log::error!("Wrong local-port");
return;
}
};
let remote_port = match options[2].parse::<i32>() {
Ok(v) => v,
Err(_) => {
log::error!("Wrong remote-port");
return;
}
};
let mut remote_host = "localhost".to_owned();
if options.len() > 3 {
remote_host = options[3].clone();
}
common::test_rendezvous_server();
common::test_nat_type();
let key = matches.value_of("key").unwrap_or("").to_owned();
let key = matches
.get_one::<String>("key")
.map(String::as_str)
.unwrap_or("")
.to_owned();
let token = LocalConfig::get_option("access_token");
cli::start_one_port_forward(
options[0].clone(),
@@ -90,13 +109,17 @@ fn main() {
key,
token,
);
} else if let Some(p) = matches.value_of("connect") {
} else if let Some(p) = matches.get_one::<String>("connect") {
common::test_rendezvous_server();
common::test_nat_type();
let key = matches.value_of("key").unwrap_or("").to_owned();
let key = matches
.get_one::<String>("key")
.map(String::as_str)
.unwrap_or("")
.to_owned();
let token = LocalConfig::get_option("access_token");
cli::connect_test(p, key, token);
} else if let Some(p) = matches.value_of("server") {
} else if matches.get_flag("server") {
log::info!("id={}", hbb_common::config::Config::get_id());
crate::start_server(true, false);
}

View File

@@ -1,4 +1,5 @@
use std::{
collections::HashMap,
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
@@ -21,8 +22,13 @@ use hbb_common::{
rendezvous_proto::*,
sleep,
socket_client::{self, connect_tcp, is_ipv4, new_direct_udp_for, new_udp_for},
tokio::{self, select, sync::Mutex, time::interval},
tokio::{
self, select,
sync::{mpsc, Mutex},
time::interval,
},
udp::FramedSocket,
webrtc::WebRTCStream,
AddrMangle, IntoTargetAddr, ResultType, Stream, TargetAddr,
};
@@ -32,11 +38,17 @@ use crate::{
};
type Message = RendezvousMessage;
type RendezvousSender = mpsc::UnboundedSender<Message>;
fn webrtc_ice_key(peer_id: &str, session_key: &str) -> String {
format!("{}\n{}", peer_id, session_key)
}
lazy_static::lazy_static! {
static ref SOLVING_PK_MISMATCH: Mutex<String> = Default::default();
static ref LAST_MSG: Mutex<(SocketAddr, Instant)> = Mutex::new((SocketAddr::new([0; 4].into(), 0), Instant::now()));
static ref LAST_RELAY_MSG: Mutex<(SocketAddr, Instant)> = Mutex::new((SocketAddr::new([0; 4].into(), 0), Instant::now()));
static ref WEBRTC_ICE_TXS: Mutex<HashMap<String, mpsc::UnboundedSender<String>>> = Default::default();
}
static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);
static MANUAL_RESTARTED: AtomicBool = AtomicBool::new(false);
@@ -72,6 +84,7 @@ pub struct RendezvousMediator {
host: String,
host_prefix: String,
keep_alive: i32,
rz_sender: RendezvousSender,
}
impl RendezvousMediator {
@@ -182,11 +195,13 @@ impl RendezvousMediator {
let host = check_port(&host, RENDEZVOUS_PORT);
log::info!("start udp: {host}");
let (mut socket, mut addr) = new_udp_for(&host, CONNECT_TIMEOUT).await?;
let (rz_sender, mut rz_out_rx) = mpsc::unbounded_channel::<Message>();
let mut rz = Self {
addr: addr.clone(),
host: host.clone(),
host_prefix: Self::get_host_prefix(&host),
keep_alive: crate::DEFAULT_KEEP_ALIVE,
rz_sender,
};
let mut timer = crate::rustdesk_interval(interval(crate::TIMER_OUT));
@@ -246,6 +261,9 @@ impl RendezvousMediator {
},
}
},
Some(msg_out) = rz_out_rx.recv() => {
Sink::Framed(&mut socket, &addr).send(&msg_out).await?;
},
_ = timer.tick() => {
if SHOULD_EXIT.load(Ordering::SeqCst) {
break;
@@ -367,6 +385,22 @@ impl RendezvousMediator {
allow_err!(rz.handle_intranet(fla, server).await);
});
}
Some(rendezvous_message::Union::IceCandidate(ice)) => {
if ice.to_id != Config::get_id() {
return Ok(());
}
let key = webrtc_ice_key(&ice.from_id, &ice.session_key);
let tx = WEBRTC_ICE_TXS.lock().await.get(&key).cloned();
if let Some(tx) = tx {
let _ = tx.send(ice.candidate);
} else {
log::debug!(
"dropping ICE candidate for unknown WebRTC session from {} key {}",
ice.from_id,
ice.session_key
);
}
}
Some(rendezvous_message::Union::ConfigureUpdate(cu)) => {
let v0 = Config::get_rendezvous_servers();
Config::set_option(
@@ -389,11 +423,13 @@ impl RendezvousMediator {
let mut conn = connect_tcp(host.clone(), CONNECT_TIMEOUT).await?;
let key = crate::get_key(true).await;
crate::secure_tcp(&mut conn, &key).await?;
let (rz_sender, mut rz_out_rx) = mpsc::unbounded_channel::<Message>();
let mut rz = Self {
addr: conn.local_addr().into_target_addr()?,
host: host.clone(),
host_prefix: Self::get_host_prefix(&host),
keep_alive: crate::DEFAULT_KEEP_ALIVE,
rz_sender,
};
let mut timer = crate::rustdesk_interval(interval(crate::TIMER_OUT));
let mut last_register_sent: Option<Instant> = None;
@@ -421,6 +457,9 @@ impl RendezvousMediator {
let msg = Message::parse_from_bytes(&bytes)?;
rz.handle_resp(msg.union, Sink::Stream(&mut conn), &server, &mut update_latency).await?
}
Some(msg_out) = rz_out_rx.recv() => {
Sink::Stream(&mut conn).send(&msg_out).await?;
}
_ = timer.tick() => {
if SHOULD_EXIT.load(Ordering::SeqCst) {
break;
@@ -472,6 +511,7 @@ impl RendezvousMediator {
rr.secure,
false,
Default::default(),
String::new(),
rr.control_permissions.clone().into_option(),
)
.await
@@ -486,6 +526,7 @@ impl RendezvousMediator {
secure: bool,
initiate: bool,
socket_addr_v6: bytes::Bytes,
webrtc_sdp_answer: String,
control_permissions: Option<ControlPermissions>,
) -> ResultType<()> {
let peer_addr = AddrMangle::decode(&socket_addr);
@@ -504,6 +545,7 @@ impl RendezvousMediator {
socket_addr: socket_addr.into(),
version: crate::VERSION.to_owned(),
socket_addr_v6,
webrtc_sdp_answer,
..Default::default()
};
if initiate {
@@ -571,6 +613,7 @@ impl RendezvousMediator {
true,
true,
socket_addr_v6,
String::new(),
fla.control_permissions.into_option(),
)
.await
@@ -613,6 +656,91 @@ impl RendezvousMediator {
Ok(())
}
async fn spawn_webrtc_answerer(
&self,
ph: &PunchHole,
server: ServerPtr,
peer_addr: SocketAddr,
control_permissions: Option<ControlPermissions>,
) -> ResultType<String> {
if ph.requester_id.is_empty() {
log::warn!("WebRTC offer missing requester_id; falling back to existing transports");
return Ok(String::new());
}
let mut stream =
WebRTCStream::new(&ph.webrtc_sdp_offer, ph.force_relay, CONNECT_TIMEOUT).await?;
let answer = stream.get_local_endpoint().await?;
let session_key = stream.session_key().to_owned();
let peer_id = ph.requester_id.clone();
let (remote_ice_tx, mut remote_ice_rx) = mpsc::unbounded_channel::<String>();
WEBRTC_ICE_TXS
.lock()
.await
.insert(webrtc_ice_key(&peer_id, &session_key), remote_ice_tx);
let stream_for_remote_ice = stream.clone();
tokio::spawn(async move {
while let Some(candidate) = remote_ice_rx.recv().await {
if let Err(err) = stream_for_remote_ice.add_remote_ice_candidate(&candidate).await
{
log::warn!("failed to add remote WebRTC ICE candidate: {}", err);
}
}
});
if let Some(mut local_ice_rx) = stream.take_local_ice_rx() {
let sender = self.rz_sender.clone();
let my_id = Config::get_id();
let target_id = peer_id.clone();
let session_key_for_ice = session_key.clone();
tokio::spawn(async move {
while let Some(candidate) = local_ice_rx.recv().await {
let mut msg = Message::new();
msg.set_ice_candidate(IceCandidate {
from_id: my_id.clone(),
to_id: target_id.clone(),
session_key: session_key_for_ice.clone(),
candidate,
..Default::default()
});
let _ = sender.send(msg);
}
});
}
let peer_id_for_cleanup = peer_id.clone();
let session_key_for_cleanup = session_key.clone();
tokio::spawn(async move {
let result = stream.wait_connected(CONNECT_TIMEOUT).await;
WEBRTC_ICE_TXS
.lock()
.await
.remove(&webrtc_ice_key(
&peer_id_for_cleanup,
&session_key_for_cleanup,
));
if let Err(err) = result {
log::warn!("webrtc wait_connected failed: {}", err);
return;
}
if let Err(err) = crate::server::create_tcp_connection(
server,
Stream::WebRTC(stream),
peer_addr,
true,
control_permissions,
)
.await
{
log::warn!("failed to create WebRTC server connection: {}", err);
}
});
Ok(answer)
}
async fn handle_punch_hole(&self, ph: PunchHole, server: ServerPtr) -> ResultType<()> {
let mut peer_addr = AddrMangle::decode(&ph.socket_addr);
let last = *LAST_MSG.lock().await;
@@ -624,7 +752,22 @@ impl RendezvousMediator {
let peer_addr_v6 = hbb_common::AddrMangle::decode(&ph.socket_addr_v6);
let relay = use_ws() || Config::is_proxy() || ph.force_relay;
let mut socket_addr_v6 = Default::default();
let control_permissions = ph.control_permissions.into_option();
let control_permissions = ph.control_permissions.clone().into_option();
let webrtc_sdp_answer = if !ph.webrtc_sdp_offer.is_empty() {
self.spawn_webrtc_answerer(
&ph,
server.clone(),
peer_addr,
control_permissions.clone(),
)
.await
.unwrap_or_else(|err| {
log::warn!("failed to create WebRTC answer: {}", err);
String::new()
})
} else {
String::new()
};
if peer_addr_v6.port() > 0 && !relay {
socket_addr_v6 = start_ipv6(
peer_addr_v6,
@@ -651,6 +794,7 @@ impl RendezvousMediator {
true,
true,
socket_addr_v6.clone(),
webrtc_sdp_answer.clone(),
control_permissions,
)
.await;
@@ -664,6 +808,7 @@ impl RendezvousMediator {
nat_type: nat_type.into(),
version: crate::VERSION.to_owned(),
socket_addr_v6,
webrtc_sdp_answer,
..Default::default()
};
if ph.udp_port > 0 {