Compare commits

..

4 Commits

Author SHA1 Message Date
fufesou
bb51c6aa42 fix(ipc): cmdline, unit tests (#15069)
Signed-off-by: fufesou <linlong1266@gmail.com>
2026-05-18 17:03:04 +08:00
fufesou
78e8134ad5 fix(ipc): cmdline, use scope, deploy (#15068)
Signed-off-by: fufesou <linlong1266@gmail.com>
2026-05-18 16:52:22 +08:00
fufesou
bc2c36215d fix(ipc): scope active-user IPC routing to root CLI main requests (#15058)
* fix(ipc): scope active-user IPC routing to root CLI main requests

Signed-off-by: fufesou <linlong1266@gmail.com>

* fix(ipc): cmdline, comments fails close

Signed-off-by: fufesou <linlong1266@gmail.com>

* fix(ipc): cmdline, better check

Signed-off-by: fufesou <linlong1266@gmail.com>

* fix(ipc): cmdline, try active uid when no --server processes

Signed-off-by: fufesou <linlong1266@gmail.com>

* fix(ipc): cmdline, select active uid

Signed-off-by: fufesou <linlong1266@gmail.com>

* fix(ipc): remove unused import

Signed-off-by: fufesou <linlong1266@gmail.com>

---------

Signed-off-by: fufesou <linlong1266@gmail.com>
2026-05-18 16:32:46 +08:00
IronCodeStudios
377547fa11 scrap/wayland: insert videoconvert to fix screencast on COSMIC / DMA-BUF portals (#15063)
On Wayland compositors whose xdg-desktop-portal backend exposes screencast
frames as DMA-BUF buffers — notably xdg-desktop-portal-cosmic 0.1.0 on
Pop!_OS 24.04 / COSMIC — inbound screen capture fails. PipeWireRecorder
links pipewiresrc directly to an appsink whose caps only accept
video/x-raw BGRx/RGBx in system memory. That format set is too narrow for
the portal's buffer-type / modifier negotiation, which collapses with:

  pw.link: negotiating -> error no more output formats (-22)
  gstpipewiresrc: stream error: no more output formats
  gstbasesrc: streaming stopped, reason not-negotiated (-4)
  ERROR src/server/wayland.rs: Failed scrap Element failed to change its state

Inserting a videoconvert element between pipewiresrc and appsink widens
the negotiable format set to any system-memory video/x-raw format, giving
the portal room to settle on a format it can deliver via its SHM path.
videoconvert then converts to the BGRx/RGBx the appsink expects.

Verified on Pop!_OS 24.04 / COSMIC with gst-launch, before and after:

  # fails (current behaviour):
  gst-launch-1.0 pipewiresrc path=N ! video/x-raw,format=BGRx ! fakesink
  # works (with this change):
  gst-launch-1.0 pipewiresrc path=N ! videoconvert ! video/x-raw,format=BGRx ! fakesink

After the change, inbound connections capture and stream the desktop
normally and the "Failed scrap" error no longer occurs.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 16:02:23 +08:00
11 changed files with 377 additions and 456 deletions

View File

@@ -47,7 +47,7 @@ screencapturekit = ["cpal/screencapturekit"]
[dependencies] [dependencies]
async-trait = "0.1" async-trait = "0.1"
scrap = { path = "libs/scrap", features = ["wayland"] } scrap = { path = "libs/scrap", features = ["wayland"] }
hbb_common = { path = "libs/hbb_common", features = ["webrtc"] } hbb_common = { path = "libs/hbb_common" }
serde_derive = "1.0" serde_derive = "1.0"
serde = "1.0" serde = "1.0"
serde_json = "1.0" serde_json = "1.0"

View File

@@ -276,12 +276,21 @@ impl PipeWireRecorder {
// see: https://gitlab.freedesktop.org/pipewire/pipewire/-/issues/982 // see: https://gitlab.freedesktop.org/pipewire/pipewire/-/issues/982
src.set_property("always-copy", &true)?; src.set_property("always-copy", &true)?;
// COSMIC/Wayland fix: insert videoconvert between pipewiresrc and appsink.
// xdg-desktop-portal-cosmic's modifier negotiation fails when the downstream
// format set is too narrow (appsink only accepts BGRx/RGBx), producing
// "no more output formats" / not-negotiated (-4). videoconvert accepts any
// system-memory video/x-raw format, widening negotiation so the portal can
// settle on a format it can deliver via its SHM path.
let convert = gst::ElementFactory::make("videoconvert", None)?;
let sink = gst::ElementFactory::make("appsink", None)?; let sink = gst::ElementFactory::make("appsink", None)?;
sink.set_property("drop", &true)?; sink.set_property("drop", &true)?;
sink.set_property("max-buffers", &1u32)?; sink.set_property("max-buffers", &1u32)?;
pipeline.add_many(&[&src, &sink])?; pipeline.add_many(&[&src, &convert, &sink])?;
src.link(&sink)?; src.link(&convert)?;
convert.link(&sink)?;
let appsink = sink let appsink = sink
.dynamic_cast::<AppSink>() .dynamic_cast::<AppSink>()

View File

@@ -3,7 +3,7 @@ use async_trait::async_trait;
use hbb_common::{ use hbb_common::{
config::PeerConfig, config::PeerConfig,
config::READ_TIMEOUT, config::READ_TIMEOUT,
futures::StreamExt, futures::{SinkExt, StreamExt},
log, log,
message_proto::*, message_proto::*,
protobuf::Message as _, protobuf::Message as _,
@@ -46,7 +46,6 @@ impl Session {
false, false,
None, None,
None, None,
None,
); );
session session
} }
@@ -54,7 +53,7 @@ impl Session {
#[async_trait] #[async_trait]
impl Interface for Session { impl Interface for Session {
fn get_lch(&self) -> Arc<RwLock<LoginConfigHandler>> { fn get_login_config_handler(&self) -> Arc<RwLock<LoginConfigHandler>> {
return self.lc.clone(); return self.lc.clone();
} }
@@ -62,20 +61,14 @@ impl Interface for Session {
match msgtype { match msgtype {
"input-password" => { "input-password" => {
self.sender self.sender
.send(Data::Login(( .send(Data::Login((self.password.clone(), true)))
String::new(),
String::new(),
self.password.clone(),
true,
)))
.ok(); .ok();
} }
"re-input-password" => { "re-input-password" => {
log::error!("{}: {}", title, text); log::error!("{}: {}", title, text);
match rpassword::prompt_password("Enter password: ") { match rpassword::prompt_password("Enter password: ") {
Ok(password) => { Ok(password) => {
let login_data = let login_data = Data::Login((password, true));
Data::Login((String::new(), String::new(), password, true));
self.sender.send(login_data).ok(); self.sender.send(login_data).ok();
} }
Err(e) => { Err(e) => {
@@ -100,8 +93,6 @@ impl Interface for Session {
self.lc.write().unwrap().handle_peer_info(&pi); self.lc.write().unwrap().handle_peer_info(&pi);
} }
fn set_multiple_windows_session(&self, _sessions: Vec<WindowsSession>) {}
async fn handle_hash(&self, pass: &str, hash: Hash, peer: &mut Stream) { async fn handle_hash(&self, pass: &str, hash: Hash, peer: &mut Stream) {
log::info!( log::info!(
"password={}", "password={}",
@@ -146,8 +137,8 @@ pub async fn connect_test(id: &str, key: String, token: String) {
Err(err) => { Err(err) => {
log::error!("Failed to connect {}: {}", &id, err); log::error!("Failed to connect {}: {}", &id, err);
} }
Ok(((mut stream, _direct, _secure, _kcp, _typ), direct)) => { Ok((mut stream, direct)) => {
log::info!("direct: {:?}", direct); log::info!("direct: {}", direct);
// rpassword::prompt_password("Input anything to exit").ok(); // rpassword::prompt_password("Input anything to exit").ok();
loop { loop {
tokio::select! { tokio::select! {

View File

@@ -65,12 +65,11 @@ use hbb_common::{
self, self,
net::UdpSocket, net::UdpSocket,
sync::{ sync::{
mpsc::{error::TryRecvError, unbounded_channel, UnboundedReceiver}, mpsc::{unbounded_channel, UnboundedReceiver},
oneshot, oneshot,
}, },
time::{interval, Duration, Instant}, time::{interval, Duration, Instant},
}, },
webrtc::WebRTCStream,
AddrMangle, ResultType, Stream, AddrMangle, ResultType, Stream,
}; };
pub use helper::*; pub use helper::*;
@@ -331,19 +330,6 @@ impl Client {
} else { } else {
(None, None) (None, None)
}; };
let ipv6 = if crate::get_ipv6_punch_enabled() {
crate::get_ipv6_socket().await
} else {
None
};
let webrtc_offerer =
match WebRTCStream::new("", interface.is_force_relay(), CONNECT_TIMEOUT).await {
Ok(stream) => Some(stream),
Err(err) => {
log::warn!("webrtc offerer setup failed: {}", err);
None
}
};
let fut = Self::_start_inner( let fut = Self::_start_inner(
peer.to_owned(), peer.to_owned(),
key.to_owned(), key.to_owned(),
@@ -352,8 +338,6 @@ impl Client {
interface.clone(), interface.clone(),
udp.clone(), udp.clone(),
Some(stop_udp_tx), Some(stop_udp_tx),
ipv6,
webrtc_offerer,
rendezvous_server.clone(), rendezvous_server.clone(),
servers.clone(), servers.clone(),
contained, contained,
@@ -371,8 +355,6 @@ impl Client {
interface, interface,
(None, None), (None, None),
None, None,
None,
None,
rendezvous_server, rendezvous_server,
servers, servers,
contained, contained,
@@ -384,68 +366,6 @@ impl Client {
} }
} }
fn spawn_webrtc_ice_bridge(
mut socket: Stream,
mut local_ice_rx: Option<UnboundedReceiver<String>>,
webrtc: WebRTCStream,
peer: String,
session_key: String,
) -> oneshot::Sender<()> {
let (stop_tx, mut stop_rx) = oneshot::channel::<()>();
let my_id = Config::get_id();
tokio::spawn(async move {
loop {
match stop_rx.try_recv() {
Ok(_) | Err(tokio::sync::oneshot::error::TryRecvError::Closed) => break,
Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {}
}
if let Some(rx) = local_ice_rx.as_mut() {
loop {
match rx.try_recv() {
Ok(candidate) => {
let mut msg = RendezvousMessage::new();
msg.set_ice_candidate(IceCandidate {
from_id: my_id.clone(),
to_id: peer.clone(),
session_key: session_key.clone(),
candidate,
..Default::default()
});
if let Err(err) = socket.send(&msg).await {
log::warn!("failed to send WebRTC ICE candidate: {}", err);
return;
}
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
local_ice_rx = None;
break;
}
}
}
}
if let Some(msg_in) =
crate::get_next_nonkeyexchange_msg(&mut socket, Some(100)).await
{
if let Some(rendezvous_message::Union::IceCandidate(ice)) = msg_in.union {
if ice.from_id == peer
&& ice.to_id == my_id
&& ice.session_key == session_key
{
if let Err(err) = webrtc.add_remote_ice_candidate(&ice.candidate).await
{
log::warn!("failed to add WebRTC ICE candidate: {}", err);
}
}
}
}
}
});
stop_tx
}
async fn _start_inner( async fn _start_inner(
peer: String, peer: String,
key: String, key: String,
@@ -454,8 +374,6 @@ impl Client {
interface: impl Interface, interface: impl Interface,
mut udp: (Option<Arc<UdpSocket>>, Option<Arc<Mutex<u16>>>), mut udp: (Option<Arc<UdpSocket>>, Option<Arc<Mutex<u16>>>),
stop_udp_tx: Option<oneshot::Sender<()>>, stop_udp_tx: Option<oneshot::Sender<()>>,
mut ipv6: Option<(Arc<UdpSocket>, bytes::Bytes)>,
mut webrtc_offerer: Option<WebRTCStream>,
mut rendezvous_server: String, mut rendezvous_server: String,
servers: Vec<String>, servers: Vec<String>,
contained: bool, contained: bool,
@@ -528,20 +446,14 @@ impl Client {
// Stop UDP NAT test task if still running // Stop UDP NAT test task if still running
stop_udp_tx.map(|tx| tx.send(())); stop_udp_tx.map(|tx| tx.send(()));
let mut msg_out = RendezvousMessage::new(); let mut msg_out = RendezvousMessage::new();
let mut ipv6 = ipv6 let mut ipv6 = if crate::get_ipv6_punch_enabled() {
.take() if let Some((socket, addr)) = crate::get_ipv6_socket().await {
.map(|(socket, addr)| (Some(socket), Some(addr))) (Some(socket), Some(addr))
.unwrap_or((None, None)); } else {
let webrtc_sdp_offer = if let Some(webrtc) = webrtc_offerer.as_ref() { (None, None)
match webrtc.get_local_endpoint().await {
Ok(endpoint) => endpoint,
Err(err) => {
log::warn!("failed to read local WebRTC offer: {}", err);
String::new()
}
} }
} else { } else {
String::new() (None, None)
}; };
let udp_nat_port = udp.1.map(|x| *x.lock().unwrap()).unwrap_or(0); let udp_nat_port = udp.1.map(|x| *x.lock().unwrap()).unwrap_or(0);
let punch_type = if udp_nat_port > 0 { "UDP" } else { "TCP" }; let punch_type = if udp_nat_port > 0 { "UDP" } else { "TCP" };
@@ -555,15 +467,8 @@ impl Client {
udp_port: udp_nat_port as _, udp_port: udp_nat_port as _,
force_relay: interface.is_force_relay(), force_relay: interface.is_force_relay(),
socket_addr_v6: ipv6.1.unwrap_or_default(), socket_addr_v6: ipv6.1.unwrap_or_default(),
webrtc_sdp_offer: webrtc_sdp_offer.clone(),
..Default::default() ..Default::default()
}); });
let webrtc_session_key = webrtc_offerer
.as_ref()
.map(|webrtc| webrtc.session_key().to_owned())
.unwrap_or_default();
let mut webrtc_sdp_answer = String::new();
let mut pending_webrtc_ice = Vec::<String>::new();
for i in 1..=3 { for i in 1..=3 {
log::info!( log::info!(
"#{} {} punch attempt with {}, id: {}", "#{} {} punch attempt with {}, id: {}",
@@ -605,7 +510,6 @@ impl Client {
relay_server = ph.relay_server; relay_server = ph.relay_server;
peer_addr = AddrMangle::decode(&ph.socket_addr); peer_addr = AddrMangle::decode(&ph.socket_addr);
feedback = ph.feedback; feedback = ph.feedback;
webrtc_sdp_answer = ph.webrtc_sdp_answer;
let s = udp.0.take(); let s = udp.0.take();
if ph.is_udp && s.is_some() { if ph.is_udp && s.is_some() {
if let Some(s) = s { if let Some(s) = s {
@@ -645,38 +549,6 @@ impl Client {
} }
} }
signed_id_pk = rr.pk().into(); signed_id_pk = rr.pk().into();
let mut webrtc_bridge_stop = None;
let mut webrtc_for_connect = None;
if !rr.webrtc_sdp_answer.is_empty() {
if let Some(webrtc) = webrtc_offerer.take() {
if let Err(err) =
webrtc.set_remote_endpoint(&rr.webrtc_sdp_answer).await
{
log::warn!("failed to set WebRTC relay answer: {}", err);
} else {
for candidate in pending_webrtc_ice.drain(..) {
if let Err(err) =
webrtc.add_remote_ice_candidate(&candidate).await
{
log::warn!(
"failed to add buffered WebRTC ICE candidate: {}",
err
);
}
}
let session_key = webrtc.session_key().to_owned();
let local_ice_rx = webrtc.take_local_ice_rx();
webrtc_bridge_stop = Some(Self::spawn_webrtc_ice_bridge(
socket,
local_ice_rx,
webrtc.clone(),
peer.clone(),
session_key,
));
webrtc_for_connect = Some(webrtc);
}
}
}
let fut = Self::create_relay( let fut = Self::create_relay(
&peer, &peer,
rr.uuid, rr.uuid,
@@ -692,86 +564,30 @@ impl Client {
} }
.boxed(), .boxed(),
); );
if let Some(mut webrtc) = webrtc_for_connect {
connect_futures.push(
async move {
webrtc.wait_connected(CONNECT_TIMEOUT).await?;
Ok((Stream::WebRTC(webrtc), None, "WebRTC"))
}
.boxed(),
);
}
// Run all connection attempts concurrently, return the first successful one // Run all connection attempts concurrently, return the first successful one
let (conn, kcp, typ) = match select_ok(connect_futures).await { let (conn, kcp, typ) = match select_ok(connect_futures).await {
Ok(conn) => (Ok(conn.0 .0), conn.0 .1, conn.0 .2), Ok(conn) => (Ok(conn.0 .0), conn.0 .1, conn.0 .2),
Err(e) => (Err(e), None, ""), Err(e) => (Err(e), None, ""),
}; };
if let Some(stop) = webrtc_bridge_stop {
let _ = stop.send(());
}
let mut conn = conn?; let mut conn = conn?;
feedback = rr.feedback; feedback = rr.feedback;
log::info!("{:?} used to establish {typ} connection", start.elapsed()); log::info!("{:?} used to establish {typ} connection", start.elapsed());
let pk = let pk =
Self::secure_connection(&peer, signed_id_pk, &key, &mut conn).await?; Self::secure_connection(&peer, signed_id_pk, &key, &mut conn).await?;
return Ok(( return Ok((
(conn, typ == "IPv6" || typ == "WebRTC", pk, kcp, typ), (conn, typ == "IPv6", pk, kcp, typ),
(feedback, rendezvous_server), (feedback, rendezvous_server),
false, false,
)); ));
} }
Some(rendezvous_message::Union::IceCandidate(ice)) => {
if !webrtc_session_key.is_empty()
&& ice.from_id == peer
&& ice.to_id == Config::get_id()
&& ice.session_key == webrtc_session_key
{
pending_webrtc_ice.push(ice.candidate);
} else {
log::debug!(
"dropping ICE candidate for unexpected WebRTC session from {} key {}",
ice.from_id,
ice.session_key
);
}
}
_ => { _ => {
log::error!("Unexpected protobuf msg received: {:?}", msg_in); log::error!("Unexpected protobuf msg received: {:?}", msg_in);
} }
} }
} }
} }
let mut webrtc_bridge_stop = None; drop(socket);
let mut webrtc_for_connect = None;
if !webrtc_sdp_answer.is_empty() {
if let Some(webrtc) = webrtc_offerer.take() {
if let Err(err) = webrtc.set_remote_endpoint(&webrtc_sdp_answer).await {
log::warn!("failed to set WebRTC answer: {}", err);
drop(socket);
} else {
for candidate in pending_webrtc_ice.drain(..) {
if let Err(err) = webrtc.add_remote_ice_candidate(&candidate).await {
log::warn!("failed to add buffered WebRTC ICE candidate: {}", err);
}
}
let session_key = webrtc.session_key().to_owned();
let local_ice_rx = webrtc.take_local_ice_rx();
webrtc_bridge_stop = Some(Self::spawn_webrtc_ice_bridge(
socket,
local_ice_rx,
webrtc.clone(),
peer.clone(),
session_key,
));
webrtc_for_connect = Some(webrtc);
}
} else {
drop(socket);
}
} else {
drop(socket);
}
if peer_addr.port() == 0 { if peer_addr.port() == 0 {
bail!("Failed to connect via rendezvous server"); bail!("Failed to connect via rendezvous server");
} }
@@ -805,8 +621,6 @@ impl Client {
interface, interface,
udp.0, udp.0,
ipv6.0, ipv6.0,
webrtc_for_connect,
webrtc_bridge_stop,
punch_type, punch_type,
) )
.await?, .await?,
@@ -833,8 +647,6 @@ impl Client {
interface: impl Interface, interface: impl Interface,
udp_socket_nat: Option<Arc<UdpSocket>>, udp_socket_nat: Option<Arc<UdpSocket>>,
udp_socket_v6: Option<Arc<UdpSocket>>, udp_socket_v6: Option<Arc<UdpSocket>>,
webrtc_offerer: Option<WebRTCStream>,
webrtc_bridge_stop: Option<oneshot::Sender<()>>,
punch_type: &str, punch_type: &str,
) -> ResultType<( ) -> ResultType<(
Stream, Stream,
@@ -893,23 +705,11 @@ impl Client {
if let Some(udp_socket_v6) = udp_socket_v6 { if let Some(udp_socket_v6) = udp_socket_v6 {
connect_futures.push(udp_nat_connect(udp_socket_v6, "IPv6", connect_timeout).boxed()); connect_futures.push(udp_nat_connect(udp_socket_v6, "IPv6", connect_timeout).boxed());
} }
if let Some(mut webrtc) = webrtc_offerer {
connect_futures.push(
async move {
webrtc.wait_connected(connect_timeout).await?;
Ok((Stream::WebRTC(webrtc), None, "WebRTC"))
}
.boxed(),
);
}
// Run all connection attempts concurrently, return the first successful one // Run all connection attempts concurrently, return the first successful one
let (mut conn, kcp, mut typ) = match select_ok(connect_futures).await { let (mut conn, kcp, mut typ) = match select_ok(connect_futures).await {
Ok(conn) => (Ok(conn.0 .0), conn.0 .1, conn.0 .2), Ok(conn) => (Ok(conn.0 .0), conn.0 .1, conn.0 .2),
Err(e) => (Err(e), None, ""), Err(e) => (Err(e), None, ""),
}; };
if let Some(stop) = webrtc_bridge_stop {
let _ = stop.send(());
}
let mut direct = !conn.is_err(); let mut direct = !conn.is_err();
if interface.is_force_relay() || conn.is_err() { if interface.is_force_relay() || conn.is_err() {

View File

@@ -199,6 +199,20 @@ pub fn core_main() -> Option<Vec<String>> {
} }
std::thread::spawn(move || crate::start_server(false, no_server)); std::thread::spawn(move || crate::start_server(false, no_server));
} else { } else {
#[cfg(any(target_os = "linux", target_os = "macos"))]
// Root CLI management commands must talk to the user `--server` main IPC.
// Example: `sudo rustdesk --option custom-rendezvous-server` should query the
// user's IPC instead of root's `/tmp/<app>-0/ipc`; `connect()` still limits this
// routing to empty-postfix main IPC only.
let _user_main_ipc_scope = if crate::platform::is_installed()
&& is_root()
&& is_user_main_ipc_scope_cli_command(&args)
{
Some(crate::ipc::UserMainIpcScope::new())
} else {
None
};
#[cfg(windows)] #[cfg(windows)]
{ {
use crate::platform; use crate::platform;
@@ -938,6 +952,57 @@ fn is_root() -> bool {
crate::platform::is_root() crate::platform::is_root()
} }
#[cfg(any(target_os = "linux", target_os = "macos", test))]
fn is_user_main_ipc_scope_cli_command(args: &[String]) -> bool {
matches!(
args.first().map(String::as_str),
Some("--password")
| Some("--set-unlock-pin")
| Some("--get-id")
| Some("--set-id")
| Some("--config")
| Some("--option")
| Some("--assign")
| Some("--deploy")
)
}
#[cfg(test)]
mod tests {
use super::*;
fn args(values: &[&str]) -> Vec<String> {
values.iter().map(|value| value.to_string()).collect()
}
#[test]
fn user_main_ipc_scope_cli_command_matches_management_commands_only() {
for command in [
"--password",
"--set-unlock-pin",
"--get-id",
"--set-id",
"--config",
"--option",
"--assign",
"--deploy",
] {
assert!(is_user_main_ipc_scope_cli_command(&args(&[command])));
}
for command in [
"--service",
"--server",
"--tray",
"--cm",
"--check-hwcodec-config",
"--connect",
] {
assert!(!is_user_main_ipc_scope_cli_command(&args(&[command])));
}
}
}
/// Check if the executable is a Quick Support version. /// Check if the executable is a Quick Support version.
/// Note: This function must be kept in sync with `libs/portable/src/main.rs`. /// Note: This function must be kept in sync with `libs/portable/src/main.rs`.
#[cfg(windows)] #[cfg(windows)]

View File

@@ -33,25 +33,25 @@ use hbb_common::{
tokio_util::codec::Framed, tokio_util::codec::Framed,
ResultType, ResultType,
}; };
#[cfg(any(target_os = "linux", target_os = "macos"))]
use ipc_auth::authorize_service_scoped_ipc_connection;
#[cfg(windows)] #[cfg(windows)]
pub(crate) use ipc_auth::authorize_windows_portable_service_ipc_connection; pub(crate) use ipc_auth::authorize_windows_portable_service_ipc_connection;
#[cfg(windows)] #[cfg(windows)]
pub(crate) use ipc_auth::ensure_peer_executable_matches_current_by_pid_opt; pub(crate) use ipc_auth::ensure_peer_executable_matches_current_by_pid_opt;
#[cfg(windows)] #[cfg(windows)]
pub(crate) use ipc_auth::log_rejected_windows_ipc_connection; pub(crate) use ipc_auth::log_rejected_windows_ipc_connection;
#[cfg(target_os = "linux")] #[cfg(any(target_os = "linux", target_os = "macos"))]
pub(crate) use ipc_auth::{ use ipc_auth::{active_uid, authorize_service_scoped_ipc_connection};
active_uid, ensure_peer_executable_matches_current_by_fd, is_allowed_service_peer_uid,
log_rejected_uinput_connection, peer_uid_from_fd,
};
#[cfg(windows)] #[cfg(windows)]
use ipc_auth::{ use ipc_auth::{
authorize_windows_main_ipc_connection, portable_service_listener_security_attributes, authorize_windows_main_ipc_connection, portable_service_listener_security_attributes,
should_allow_everyone_create_on_windows, should_allow_everyone_create_on_windows,
}; };
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
pub(crate) use ipc_auth::{
ensure_peer_executable_matches_current_by_fd, is_allowed_service_peer_uid,
log_rejected_uinput_connection, peer_uid_from_fd,
};
#[cfg(target_os = "linux")]
use ipc_fs::terminal_count_candidate_uids; use ipc_fs::terminal_count_candidate_uids;
#[cfg(any(target_os = "linux", target_os = "macos"))] #[cfg(any(target_os = "linux", target_os = "macos"))]
use ipc_fs::{ use ipc_fs::{
@@ -63,6 +63,8 @@ use parity_tokio_ipc::{
}; };
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
#[cfg(any(target_os = "linux", target_os = "macos"))] #[cfg(any(target_os = "linux", target_os = "macos"))]
use std::cell::Cell;
#[cfg(any(target_os = "linux", target_os = "macos"))]
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
use std::{ use std::{
collections::HashMap, collections::HashMap,
@@ -71,12 +73,47 @@ use std::{
// IPC actions here. // IPC actions here.
pub const IPC_ACTION_CLOSE: &str = "close"; pub const IPC_ACTION_CLOSE: &str = "close";
#[cfg(target_os = "windows")]
const PORTABLE_SERVICE_IPC_HANDSHAKE_TIMEOUT_MS: u64 = 3_000; const PORTABLE_SERVICE_IPC_HANDSHAKE_TIMEOUT_MS: u64 = 3_000;
#[cfg(target_os = "windows")]
pub(crate) const IPC_TOKEN_LEN: usize = 64; pub(crate) const IPC_TOKEN_LEN: usize = 64;
#[cfg(target_os = "windows")]
const IPC_TOKEN_RANDOM_BYTES: usize = IPC_TOKEN_LEN / 2; const IPC_TOKEN_RANDOM_BYTES: usize = IPC_TOKEN_LEN / 2;
#[cfg(target_os = "windows")]
const _: () = assert!(IPC_TOKEN_LEN % 2 == 0); const _: () = assert!(IPC_TOKEN_LEN % 2 == 0);
pub static EXIT_RECV_CLOSE: AtomicBool = AtomicBool::new(true); pub static EXIT_RECV_CLOSE: AtomicBool = AtomicBool::new(true);
#[cfg(any(target_os = "linux", target_os = "macos"))]
thread_local! {
static USE_USER_MAIN_IPC: Cell<bool> = Cell::new(false);
}
#[must_use = "bind this guard to a local variable to keep the IPC scope active"]
/// Thread-local guard for routing root main IPC to the active user on Linux/macOS.
#[cfg(any(target_os = "linux", target_os = "macos"))]
pub(crate) struct UserMainIpcScope {
previous: bool,
}
#[cfg(any(target_os = "linux", target_os = "macos"))]
impl UserMainIpcScope {
pub(crate) fn new() -> Self {
let previous = USE_USER_MAIN_IPC.with(|use_user_main| {
let previous = use_user_main.get();
use_user_main.set(true);
previous
});
Self { previous }
}
}
#[cfg(any(target_os = "linux", target_os = "macos"))]
impl Drop for UserMainIpcScope {
fn drop(&mut self) {
USE_USER_MAIN_IPC.with(|use_user_main| use_user_main.set(self.previous));
}
}
#[inline] #[inline]
pub async fn connect_service(ms_timeout: u64) -> ResultType<ConnectionTmpl<ConnClient>> { pub async fn connect_service(ms_timeout: u64) -> ResultType<ConnectionTmpl<ConnClient>> {
connect(ms_timeout, crate::POSTFIX_SERVICE).await connect(ms_timeout, crate::POSTFIX_SERVICE).await
@@ -1112,11 +1149,7 @@ async fn handle(data: Data, stream: &mut Connection) {
}; };
} }
pub async fn connect(ms_timeout: u64, postfix: &str) -> ResultType<ConnectionTmpl<ConnClient>> { #[cfg(target_os = "windows")]
let path = Config::ipc_path(postfix);
connect_with_path(ms_timeout, &path).await
}
pub(crate) fn generate_one_time_ipc_token() -> ResultType<String> { pub(crate) fn generate_one_time_ipc_token() -> ResultType<String> {
use hbb_common::rand::{rngs::OsRng, RngCore as _}; use hbb_common::rand::{rngs::OsRng, RngCore as _};
use std::fmt::Write as _; use std::fmt::Write as _;
@@ -1137,6 +1170,7 @@ pub(crate) fn generate_one_time_ipc_token() -> ResultType<String> {
Ok(token) Ok(token)
} }
#[cfg(target_os = "windows")]
pub(crate) fn constant_time_ipc_token_eq(expected: &str, candidate: &str) -> bool { pub(crate) fn constant_time_ipc_token_eq(expected: &str, candidate: &str) -> bool {
if expected.len() != IPC_TOKEN_LEN || candidate.len() != IPC_TOKEN_LEN { if expected.len() != IPC_TOKEN_LEN || candidate.len() != IPC_TOKEN_LEN {
return false; return false;
@@ -1149,6 +1183,7 @@ pub(crate) fn constant_time_ipc_token_eq(expected: &str, candidate: &str) -> boo
== 0 == 0
} }
#[cfg(target_os = "windows")]
pub(crate) async fn portable_service_ipc_handshake_as_client<T>( pub(crate) async fn portable_service_ipc_handshake_as_client<T>(
stream: &mut ConnectionTmpl<T>, stream: &mut ConnectionTmpl<T>,
token: &str, token: &str,
@@ -1173,6 +1208,7 @@ where
} }
} }
#[cfg(target_os = "windows")]
pub(crate) async fn portable_service_ipc_handshake_as_server<T, F>( pub(crate) async fn portable_service_ipc_handshake_as_server<T, F>(
stream: &mut ConnectionTmpl<T>, stream: &mut ConnectionTmpl<T>,
mut validate_token: F, mut validate_token: F,
@@ -1209,6 +1245,103 @@ async fn connect_with_path(ms_timeout: u64, path: &str) -> ResultType<Connection
Ok(ConnectionTmpl::new(client)) Ok(ConnectionTmpl::new(client))
} }
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[inline]
fn select_server_uid_for_user_main_ipc(
server_uids: &[u32],
active_uid: Option<u32>,
prefer_root: bool,
) -> ResultType<u32> {
let mut server_uids = server_uids.to_vec();
server_uids.sort_unstable();
server_uids.dedup();
match server_uids.as_slice() {
[] => {
if let Some(uid) = active_uid {
// If no `--server` processes are found but the active user is identifiable,
// try the active user anyway because the main process may also listen on "" IPC.
return Ok(uid);
} else {
bail!("No --server process found for user main IPC")
}
}
[uid] => return Ok(*uid),
_ => {}
}
if prefer_root && server_uids.contains(&0) {
return Ok(0);
}
if let Some(active_uid) = active_uid.filter(|uid| server_uids.contains(uid)) {
return Ok(active_uid);
}
bail!("Multiple --server processes found for user main IPC");
}
#[cfg(any(target_os = "linux", target_os = "macos"))]
fn running_server_uids_for_current_exe() -> ResultType<Vec<u32>> {
let current_exe = std::env::current_exe()?;
let current_exe_path = std::fs::canonicalize(&current_exe)?;
let current_pid = hbb_common::sysinfo::Pid::from_u32(std::process::id());
let mut sys = hbb_common::sysinfo::System::new();
sys.refresh_processes();
let mut server_uids = Vec::new();
for process in sys.processes().values() {
if process.pid() == current_pid {
continue;
}
if process.cmd().get(1).map_or(true, |arg| arg != "--server") {
continue;
}
let Ok(process_path) = std::fs::canonicalize(process.exe()) else {
continue;
};
if process_path != current_exe_path {
continue;
}
let Some(uid) = process.user_id().map(|uid| **uid as u32) else {
// Root CLI management commands need a stable matching `--server` target.
// If this key process races during enumeration, failing the command is clearer
// than silently skipping it; `--server` is not expected to exit frequently.
bail!("Failed to read --server process uid");
};
server_uids.push(uid);
}
Ok(server_uids)
}
#[cfg(any(target_os = "linux", target_os = "macos"))]
fn user_main_ipc_server_uid() -> ResultType<u32> {
let server_uids = running_server_uids_for_current_exe()?;
#[cfg(target_os = "linux")]
let prefer_root = crate::platform::linux::is_login_screen_wayland();
#[cfg(target_os = "macos")]
let prefer_root = false;
select_server_uid_for_user_main_ipc(&server_uids, active_uid(), prefer_root)
}
pub async fn connect(ms_timeout: u64, postfix: &str) -> ResultType<ConnectionTmpl<ConnClient>> {
#[cfg(any(target_os = "linux", target_os = "macos"))]
{
let use_user_main_ipc = USE_USER_MAIN_IPC.with(|use_user_main| use_user_main.get());
let is_root_main_ipc =
unsafe { hbb_common::libc::geteuid() == 0 } && postfix.is_empty() && use_user_main_ipc;
if is_root_main_ipc {
let uid = user_main_ipc_server_uid()?;
let path = Config::ipc_path_for_uid(uid, postfix);
return connect_with_path(ms_timeout, &path).await;
}
let path = Config::ipc_path(postfix);
return connect_with_path(ms_timeout, &path).await;
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
let path = Config::ipc_path(postfix);
connect_with_path(ms_timeout, &path).await
}
}
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
pub async fn connect_for_uid( pub async fn connect_for_uid(
ms_timeout: u64, ms_timeout: u64,
@@ -2002,7 +2135,16 @@ mod test {
assert!(std::mem::size_of::<Data>() <= 120); assert!(std::mem::size_of::<Data>() <= 120);
} }
#[cfg(target_os = "linux")] #[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn test_service_ipc_path_is_shared_across_uids() {
assert_eq!(
Config::ipc_path_for_uid(0, crate::POSTFIX_SERVICE),
Config::ipc_path_for_uid(501, crate::POSTFIX_SERVICE)
);
}
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test] #[test]
fn test_ipc_path_differs_by_uid_for_cm() { fn test_ipc_path_differs_by_uid_for_cm() {
let effective_uid = unsafe { hbb_common::libc::geteuid() as u32 }; let effective_uid = unsafe { hbb_common::libc::geteuid() as u32 };
@@ -2021,4 +2163,46 @@ mod test {
Config::ipc_path_for_uid(other_uid, postfix) Config::ipc_path_for_uid(other_uid, postfix)
); );
} }
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn test_select_server_uid_uses_active_uid_when_no_server_found() {
assert_eq!(
select_server_uid_for_user_main_ipc(&[], Some(501), false).unwrap(),
501
);
}
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn test_select_server_uid_uses_single_server_uid() {
assert_eq!(
select_server_uid_for_user_main_ipc(&[501], None, false).unwrap(),
501
);
}
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn test_select_server_uid_prefers_active_uid_with_multiple_servers() {
assert_eq!(
select_server_uid_for_user_main_ipc(&[0, 501], Some(501), false).unwrap(),
501
);
}
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn test_select_server_uid_prefers_root_on_wayland_login_screen() {
assert_eq!(
select_server_uid_for_user_main_ipc(&[0, 501], Some(501), true).unwrap(),
0
);
}
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn test_select_server_uid_fails_when_multiple_servers_are_ambiguous() {
assert!(select_server_uid_for_user_main_ipc(&[501, 502], None, false).is_err());
}
} }

View File

@@ -607,27 +607,30 @@ pub(crate) fn log_rejected_windows_ipc_connection(
peer_session_id: Option<u32>, peer_session_id: Option<u32>,
expected_session_id: Option<u32>, expected_session_id: Option<u32>,
peer_is_system: Option<bool>, peer_is_system: Option<bool>,
peer_is_elevated: Option<bool>,
) { ) {
static LOG_THROTTLE: OnceLock<Mutex<UnauthorizedIpcLogThrottle>> = OnceLock::new(); static LOG_THROTTLE: OnceLock<Mutex<UnauthorizedIpcLogThrottle>> = OnceLock::new();
throttled_unauthorized_ipc_log(&LOG_THROTTLE, |suppressed| { throttled_unauthorized_ipc_log(&LOG_THROTTLE, |suppressed| {
if suppressed > 0 { if suppressed > 0 {
log::warn!( log::warn!(
"Rejected unauthorized connection on ipc channel: postfix={}, peer_pid={:?}, peer_session_id={:?}, expected_session_id={:?}, peer_is_system={:?} (suppressed {} similar events)", "Rejected unauthorized connection on ipc channel: postfix={}, peer_pid={:?}, peer_session_id={:?}, expected_session_id={:?}, peer_is_system={:?}, peer_is_elevated={:?} (suppressed {} similar events)",
postfix, postfix,
peer_pid, peer_pid,
peer_session_id, peer_session_id,
expected_session_id, expected_session_id,
peer_is_system, peer_is_system,
peer_is_elevated,
suppressed suppressed
); );
} else { } else {
log::warn!( log::warn!(
"Rejected unauthorized connection on ipc channel: postfix={}, peer_pid={:?}, peer_session_id={:?}, expected_session_id={:?}, peer_is_system={:?}", "Rejected unauthorized connection on ipc channel: postfix={}, peer_pid={:?}, peer_session_id={:?}, expected_session_id={:?}, peer_is_system={:?}, peer_is_elevated={:?}",
postfix, postfix,
peer_pid, peer_pid,
peer_session_id, peer_session_id,
expected_session_id, expected_session_id,
peer_is_system peer_is_system,
peer_is_elevated
); );
} }
}); });
@@ -655,8 +658,14 @@ pub(crate) fn authorize_service_scoped_ipc_connection(stream: &Connection, postf
#[cfg(windows)] #[cfg(windows)]
pub(crate) fn authorize_windows_main_ipc_connection(stream: &Connection, postfix: &str) -> bool { pub(crate) fn authorize_windows_main_ipc_connection(stream: &Connection, postfix: &str) -> bool {
let (authorized, peer_pid, peer_session_id, server_session_id, peer_is_system) = let (
stream.server_authorization_status(); authorized,
peer_pid,
peer_session_id,
server_session_id,
peer_is_system,
peer_is_elevated,
) = stream.server_authorization_status();
if !authorized { if !authorized {
log_rejected_windows_ipc_connection( log_rejected_windows_ipc_connection(
postfix, postfix,
@@ -664,6 +673,7 @@ pub(crate) fn authorize_windows_main_ipc_connection(stream: &Connection, postfix
peer_session_id, peer_session_id,
server_session_id, server_session_id,
peer_is_system, peer_is_system,
peer_is_elevated,
); );
return false; return false;
} }
@@ -776,7 +786,14 @@ impl ConnectionTmpl<parity_tokio_ipc::Connection> {
fn server_authorization_status( fn server_authorization_status(
&self, &self,
) -> (bool, Option<u32>, Option<u32>, Option<u32>, Option<bool>) { ) -> (
bool,
Option<u32>,
Option<u32>,
Option<u32>,
Option<bool>,
Option<bool>,
) {
let peer_pid = self.peer_pid(); let peer_pid = self.peer_pid();
let server_session_id = crate::platform::windows::get_current_process_session_id(); let server_session_id = crate::platform::windows::get_current_process_session_id();
let peer_session_id = let peer_session_id =
@@ -786,20 +803,34 @@ impl ConnectionTmpl<parity_tokio_ipc::Connection> {
let peer_is_system = peer_is_system_result let peer_is_system = peer_is_system_result
.as_ref() .as_ref()
.and_then(|r| r.as_ref().ok().copied()); .and_then(|r| r.as_ref().ok().copied());
if server_session_id.is_none() && !peer_is_system.unwrap_or(false) { let session_authorized = is_allowed_windows_session_scoped_peer(
// When the server session id cannot be determined, the session-id allow-path is
// disabled and only SYSTEM peers can be authorized.
log::debug!(
"IPC authorization: server session id unavailable; rejecting non-SYSTEM peer, peer_pid={:?}, peer_session_id={:?}",
peer_pid,
peer_session_id
);
}
let authorized = is_allowed_windows_session_scoped_peer(
peer_is_system.unwrap_or(false), peer_is_system.unwrap_or(false),
peer_session_id, peer_session_id,
server_session_id, server_session_id,
); );
let peer_is_elevated_result = if session_authorized {
None
} else {
peer_pid.map(|pid| crate::platform::windows::is_elevated(Some(pid)))
};
let peer_is_elevated = peer_is_elevated_result
.as_ref()
.and_then(|r| r.as_ref().ok().copied());
if server_session_id.is_none()
&& !peer_is_system.unwrap_or(false)
&& !peer_is_elevated.unwrap_or(false)
{
// When the server session id cannot be determined, the session-id allow-path is
// disabled and only privileged peers can be authorized.
log::debug!(
"IPC authorization: server session id unavailable; rejecting non-privileged peer, peer_pid={:?}, peer_session_id={:?}",
peer_pid,
peer_session_id
);
}
// Main IPC trusts same-session peers, LocalSystem, and elevated administrators.
// Service-scoped IPC channels keep their own stricter authorization paths.
let authorized = session_authorized || peer_is_elevated.unwrap_or(false);
if !authorized { if !authorized {
if let (Some(pid), Some(Err(err))) = (peer_pid, peer_is_system_result.as_ref()) { if let (Some(pid), Some(Err(err))) = (peer_pid, peer_is_system_result.as_ref()) {
log::debug!( log::debug!(
@@ -808,6 +839,13 @@ impl ConnectionTmpl<parity_tokio_ipc::Connection> {
err err
); );
} }
if let (Some(pid), Some(Err(err))) = (peer_pid, peer_is_elevated_result.as_ref()) {
log::debug!(
"Failed to determine whether peer process is elevated, pid={}, err={}",
pid,
err
);
}
} }
( (
authorized, authorized,
@@ -815,6 +853,7 @@ impl ConnectionTmpl<parity_tokio_ipc::Connection> {
peer_session_id, peer_session_id,
server_session_id, server_session_id,
peer_is_system, peer_is_system,
peer_is_elevated,
) )
} }

View File

@@ -38,68 +38,49 @@ fn main() {
if !common::global_init() { if !common::global_init() {
return; return;
} }
use clap::{Arg, ArgAction, Command}; use clap::App;
use hbb_common::log; use hbb_common::log;
let matches = Command::new("rustdesk") let args = format!(
"-p, --port-forward=[PORT-FORWARD-OPTIONS] 'Format: remote-id:local-port:remote-port[:remote-host]'
-c, --connect=[REMOTE_ID] 'test only'
-k, --key=[KEY] ''
-s, --server=[] 'Start server'",
);
let matches = App::new("rustdesk")
.version(crate::VERSION) .version(crate::VERSION)
.author("Purslane Ltd<info@rustdesk.com>") .author("Purslane Ltd<info@rustdesk.com>")
.about("RustDesk command line tool") .about("RustDesk command line tool")
.arg( .args_from_usage(&args)
Arg::new("port-forward")
.short('p')
.long("port-forward")
.value_name("PORT-FORWARD-OPTIONS")
.help("Format: remote-id:local-port:remote-port[:remote-host]"),
)
.arg(
Arg::new("connect")
.short('c')
.long("connect")
.value_name("REMOTE_ID")
.help("test only"),
)
.arg(Arg::new("key").short('k').long("key").value_name("KEY"))
.arg(
Arg::new("server")
.short('s')
.long("server")
.action(ArgAction::SetTrue)
.help("Start server"),
)
.get_matches(); .get_matches();
use hbb_common::{config::LocalConfig, env_logger::*}; use hbb_common::{config::LocalConfig, env_logger::*};
init_from_env(Env::default().filter_or(DEFAULT_FILTER_ENV, "info")); init_from_env(Env::default().filter_or(DEFAULT_FILTER_ENV, "info"));
if let Some(p) = matches.get_one::<String>("port-forward") { if let Some(p) = matches.value_of("port-forward") {
let options: Vec<String> = p.split(':').map(|x| x.to_owned()).collect(); let options: Vec<String> = p.split(":").map(|x| x.to_owned()).collect();
if options.len() < 3 { if options.len() < 3 {
log::error!("Wrong port-forward options"); log::error!("Wrong port-forward options");
return; return;
} }
let port = match options[1].parse::<i32>() { let mut port = 0;
Ok(v) => v, if let Ok(v) = options[1].parse::<i32>() {
Err(_) => { port = v;
log::error!("Wrong local-port"); } else {
return; log::error!("Wrong local-port");
} return;
}; }
let remote_port = match options[2].parse::<i32>() { let mut remote_port = 0;
Ok(v) => v, if let Ok(v) = options[2].parse::<i32>() {
Err(_) => { remote_port = v;
log::error!("Wrong remote-port"); } else {
return; log::error!("Wrong remote-port");
} return;
}; }
let mut remote_host = "localhost".to_owned(); let mut remote_host = "localhost".to_owned();
if options.len() > 3 { if options.len() > 3 {
remote_host = options[3].clone(); remote_host = options[3].clone();
} }
common::test_rendezvous_server(); common::test_rendezvous_server();
common::test_nat_type(); common::test_nat_type();
let key = matches let key = matches.value_of("key").unwrap_or("").to_owned();
.get_one::<String>("key")
.map(String::as_str)
.unwrap_or("")
.to_owned();
let token = LocalConfig::get_option("access_token"); let token = LocalConfig::get_option("access_token");
cli::start_one_port_forward( cli::start_one_port_forward(
options[0].clone(), options[0].clone(),
@@ -109,17 +90,13 @@ fn main() {
key, key,
token, token,
); );
} else if let Some(p) = matches.get_one::<String>("connect") { } else if let Some(p) = matches.value_of("connect") {
common::test_rendezvous_server(); common::test_rendezvous_server();
common::test_nat_type(); common::test_nat_type();
let key = matches let key = matches.value_of("key").unwrap_or("").to_owned();
.get_one::<String>("key")
.map(String::as_str)
.unwrap_or("")
.to_owned();
let token = LocalConfig::get_option("access_token"); let token = LocalConfig::get_option("access_token");
cli::connect_test(p, key, token); cli::connect_test(p, key, token);
} else if matches.get_flag("server") { } else if let Some(p) = matches.value_of("server") {
log::info!("id={}", hbb_common::config::Config::get_id()); log::info!("id={}", hbb_common::config::Config::get_id());
crate::start_server(true, false); crate::start_server(true, false);
} }

View File

@@ -614,6 +614,7 @@ fn authorize_service_scoped_ipc_connection(
peer_session_id, peer_session_id,
expected_active_session_id, expected_active_session_id,
peer_is_system, peer_is_system,
None,
); );
return false; return false;
} }

View File

@@ -1,5 +1,4 @@
use std::{ use std::{
collections::HashMap,
net::SocketAddr, net::SocketAddr,
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
@@ -22,13 +21,8 @@ use hbb_common::{
rendezvous_proto::*, rendezvous_proto::*,
sleep, sleep,
socket_client::{self, connect_tcp, is_ipv4, new_direct_udp_for, new_udp_for}, socket_client::{self, connect_tcp, is_ipv4, new_direct_udp_for, new_udp_for},
tokio::{ tokio::{self, select, sync::Mutex, time::interval},
self, select,
sync::{mpsc, Mutex},
time::interval,
},
udp::FramedSocket, udp::FramedSocket,
webrtc::WebRTCStream,
AddrMangle, IntoTargetAddr, ResultType, Stream, TargetAddr, AddrMangle, IntoTargetAddr, ResultType, Stream, TargetAddr,
}; };
@@ -38,17 +32,11 @@ use crate::{
}; };
type Message = RendezvousMessage; type Message = RendezvousMessage;
type RendezvousSender = mpsc::UnboundedSender<Message>;
fn webrtc_ice_key(peer_id: &str, session_key: &str) -> String {
format!("{}\n{}", peer_id, session_key)
}
lazy_static::lazy_static! { lazy_static::lazy_static! {
static ref SOLVING_PK_MISMATCH: Mutex<String> = Default::default(); static ref SOLVING_PK_MISMATCH: Mutex<String> = Default::default();
static ref LAST_MSG: Mutex<(SocketAddr, Instant)> = Mutex::new((SocketAddr::new([0; 4].into(), 0), Instant::now())); static ref LAST_MSG: Mutex<(SocketAddr, Instant)> = Mutex::new((SocketAddr::new([0; 4].into(), 0), Instant::now()));
static ref LAST_RELAY_MSG: Mutex<(SocketAddr, Instant)> = Mutex::new((SocketAddr::new([0; 4].into(), 0), Instant::now())); static ref LAST_RELAY_MSG: Mutex<(SocketAddr, Instant)> = Mutex::new((SocketAddr::new([0; 4].into(), 0), Instant::now()));
static ref WEBRTC_ICE_TXS: Mutex<HashMap<String, mpsc::UnboundedSender<String>>> = Default::default();
} }
static SHOULD_EXIT: AtomicBool = AtomicBool::new(false); static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);
static MANUAL_RESTARTED: AtomicBool = AtomicBool::new(false); static MANUAL_RESTARTED: AtomicBool = AtomicBool::new(false);
@@ -84,7 +72,6 @@ pub struct RendezvousMediator {
host: String, host: String,
host_prefix: String, host_prefix: String,
keep_alive: i32, keep_alive: i32,
rz_sender: RendezvousSender,
} }
impl RendezvousMediator { impl RendezvousMediator {
@@ -195,13 +182,11 @@ impl RendezvousMediator {
let host = check_port(&host, RENDEZVOUS_PORT); let host = check_port(&host, RENDEZVOUS_PORT);
log::info!("start udp: {host}"); log::info!("start udp: {host}");
let (mut socket, mut addr) = new_udp_for(&host, CONNECT_TIMEOUT).await?; let (mut socket, mut addr) = new_udp_for(&host, CONNECT_TIMEOUT).await?;
let (rz_sender, mut rz_out_rx) = mpsc::unbounded_channel::<Message>();
let mut rz = Self { let mut rz = Self {
addr: addr.clone(), addr: addr.clone(),
host: host.clone(), host: host.clone(),
host_prefix: Self::get_host_prefix(&host), host_prefix: Self::get_host_prefix(&host),
keep_alive: crate::DEFAULT_KEEP_ALIVE, keep_alive: crate::DEFAULT_KEEP_ALIVE,
rz_sender,
}; };
let mut timer = crate::rustdesk_interval(interval(crate::TIMER_OUT)); let mut timer = crate::rustdesk_interval(interval(crate::TIMER_OUT));
@@ -261,9 +246,6 @@ impl RendezvousMediator {
}, },
} }
}, },
Some(msg_out) = rz_out_rx.recv() => {
Sink::Framed(&mut socket, &addr).send(&msg_out).await?;
},
_ = timer.tick() => { _ = timer.tick() => {
if SHOULD_EXIT.load(Ordering::SeqCst) { if SHOULD_EXIT.load(Ordering::SeqCst) {
break; break;
@@ -385,22 +367,6 @@ impl RendezvousMediator {
allow_err!(rz.handle_intranet(fla, server).await); allow_err!(rz.handle_intranet(fla, server).await);
}); });
} }
Some(rendezvous_message::Union::IceCandidate(ice)) => {
if ice.to_id != Config::get_id() {
return Ok(());
}
let key = webrtc_ice_key(&ice.from_id, &ice.session_key);
let tx = WEBRTC_ICE_TXS.lock().await.get(&key).cloned();
if let Some(tx) = tx {
let _ = tx.send(ice.candidate);
} else {
log::debug!(
"dropping ICE candidate for unknown WebRTC session from {} key {}",
ice.from_id,
ice.session_key
);
}
}
Some(rendezvous_message::Union::ConfigureUpdate(cu)) => { Some(rendezvous_message::Union::ConfigureUpdate(cu)) => {
let v0 = Config::get_rendezvous_servers(); let v0 = Config::get_rendezvous_servers();
Config::set_option( Config::set_option(
@@ -423,13 +389,11 @@ impl RendezvousMediator {
let mut conn = connect_tcp(host.clone(), CONNECT_TIMEOUT).await?; let mut conn = connect_tcp(host.clone(), CONNECT_TIMEOUT).await?;
let key = crate::get_key(true).await; let key = crate::get_key(true).await;
crate::secure_tcp(&mut conn, &key).await?; crate::secure_tcp(&mut conn, &key).await?;
let (rz_sender, mut rz_out_rx) = mpsc::unbounded_channel::<Message>();
let mut rz = Self { let mut rz = Self {
addr: conn.local_addr().into_target_addr()?, addr: conn.local_addr().into_target_addr()?,
host: host.clone(), host: host.clone(),
host_prefix: Self::get_host_prefix(&host), host_prefix: Self::get_host_prefix(&host),
keep_alive: crate::DEFAULT_KEEP_ALIVE, keep_alive: crate::DEFAULT_KEEP_ALIVE,
rz_sender,
}; };
let mut timer = crate::rustdesk_interval(interval(crate::TIMER_OUT)); let mut timer = crate::rustdesk_interval(interval(crate::TIMER_OUT));
let mut last_register_sent: Option<Instant> = None; let mut last_register_sent: Option<Instant> = None;
@@ -457,9 +421,6 @@ impl RendezvousMediator {
let msg = Message::parse_from_bytes(&bytes)?; let msg = Message::parse_from_bytes(&bytes)?;
rz.handle_resp(msg.union, Sink::Stream(&mut conn), &server, &mut update_latency).await? rz.handle_resp(msg.union, Sink::Stream(&mut conn), &server, &mut update_latency).await?
} }
Some(msg_out) = rz_out_rx.recv() => {
Sink::Stream(&mut conn).send(&msg_out).await?;
}
_ = timer.tick() => { _ = timer.tick() => {
if SHOULD_EXIT.load(Ordering::SeqCst) { if SHOULD_EXIT.load(Ordering::SeqCst) {
break; break;
@@ -511,7 +472,6 @@ impl RendezvousMediator {
rr.secure, rr.secure,
false, false,
Default::default(), Default::default(),
String::new(),
rr.control_permissions.clone().into_option(), rr.control_permissions.clone().into_option(),
) )
.await .await
@@ -526,7 +486,6 @@ impl RendezvousMediator {
secure: bool, secure: bool,
initiate: bool, initiate: bool,
socket_addr_v6: bytes::Bytes, socket_addr_v6: bytes::Bytes,
webrtc_sdp_answer: String,
control_permissions: Option<ControlPermissions>, control_permissions: Option<ControlPermissions>,
) -> ResultType<()> { ) -> ResultType<()> {
let peer_addr = AddrMangle::decode(&socket_addr); let peer_addr = AddrMangle::decode(&socket_addr);
@@ -545,7 +504,6 @@ impl RendezvousMediator {
socket_addr: socket_addr.into(), socket_addr: socket_addr.into(),
version: crate::VERSION.to_owned(), version: crate::VERSION.to_owned(),
socket_addr_v6, socket_addr_v6,
webrtc_sdp_answer,
..Default::default() ..Default::default()
}; };
if initiate { if initiate {
@@ -613,7 +571,6 @@ impl RendezvousMediator {
true, true,
true, true,
socket_addr_v6, socket_addr_v6,
String::new(),
fla.control_permissions.into_option(), fla.control_permissions.into_option(),
) )
.await .await
@@ -656,91 +613,6 @@ impl RendezvousMediator {
Ok(()) Ok(())
} }
async fn spawn_webrtc_answerer(
&self,
ph: &PunchHole,
server: ServerPtr,
peer_addr: SocketAddr,
control_permissions: Option<ControlPermissions>,
) -> ResultType<String> {
if ph.requester_id.is_empty() {
log::warn!("WebRTC offer missing requester_id; falling back to existing transports");
return Ok(String::new());
}
let mut stream =
WebRTCStream::new(&ph.webrtc_sdp_offer, ph.force_relay, CONNECT_TIMEOUT).await?;
let answer = stream.get_local_endpoint().await?;
let session_key = stream.session_key().to_owned();
let peer_id = ph.requester_id.clone();
let (remote_ice_tx, mut remote_ice_rx) = mpsc::unbounded_channel::<String>();
WEBRTC_ICE_TXS
.lock()
.await
.insert(webrtc_ice_key(&peer_id, &session_key), remote_ice_tx);
let stream_for_remote_ice = stream.clone();
tokio::spawn(async move {
while let Some(candidate) = remote_ice_rx.recv().await {
if let Err(err) = stream_for_remote_ice.add_remote_ice_candidate(&candidate).await
{
log::warn!("failed to add remote WebRTC ICE candidate: {}", err);
}
}
});
if let Some(mut local_ice_rx) = stream.take_local_ice_rx() {
let sender = self.rz_sender.clone();
let my_id = Config::get_id();
let target_id = peer_id.clone();
let session_key_for_ice = session_key.clone();
tokio::spawn(async move {
while let Some(candidate) = local_ice_rx.recv().await {
let mut msg = Message::new();
msg.set_ice_candidate(IceCandidate {
from_id: my_id.clone(),
to_id: target_id.clone(),
session_key: session_key_for_ice.clone(),
candidate,
..Default::default()
});
let _ = sender.send(msg);
}
});
}
let peer_id_for_cleanup = peer_id.clone();
let session_key_for_cleanup = session_key.clone();
tokio::spawn(async move {
let result = stream.wait_connected(CONNECT_TIMEOUT).await;
WEBRTC_ICE_TXS
.lock()
.await
.remove(&webrtc_ice_key(
&peer_id_for_cleanup,
&session_key_for_cleanup,
));
if let Err(err) = result {
log::warn!("webrtc wait_connected failed: {}", err);
return;
}
if let Err(err) = crate::server::create_tcp_connection(
server,
Stream::WebRTC(stream),
peer_addr,
true,
control_permissions,
)
.await
{
log::warn!("failed to create WebRTC server connection: {}", err);
}
});
Ok(answer)
}
async fn handle_punch_hole(&self, ph: PunchHole, server: ServerPtr) -> ResultType<()> { async fn handle_punch_hole(&self, ph: PunchHole, server: ServerPtr) -> ResultType<()> {
let mut peer_addr = AddrMangle::decode(&ph.socket_addr); let mut peer_addr = AddrMangle::decode(&ph.socket_addr);
let last = *LAST_MSG.lock().await; let last = *LAST_MSG.lock().await;
@@ -752,22 +624,7 @@ impl RendezvousMediator {
let peer_addr_v6 = hbb_common::AddrMangle::decode(&ph.socket_addr_v6); let peer_addr_v6 = hbb_common::AddrMangle::decode(&ph.socket_addr_v6);
let relay = use_ws() || Config::is_proxy() || ph.force_relay; let relay = use_ws() || Config::is_proxy() || ph.force_relay;
let mut socket_addr_v6 = Default::default(); let mut socket_addr_v6 = Default::default();
let control_permissions = ph.control_permissions.clone().into_option(); let control_permissions = ph.control_permissions.into_option();
let webrtc_sdp_answer = if !ph.webrtc_sdp_offer.is_empty() {
self.spawn_webrtc_answerer(
&ph,
server.clone(),
peer_addr,
control_permissions.clone(),
)
.await
.unwrap_or_else(|err| {
log::warn!("failed to create WebRTC answer: {}", err);
String::new()
})
} else {
String::new()
};
if peer_addr_v6.port() > 0 && !relay { if peer_addr_v6.port() > 0 && !relay {
socket_addr_v6 = start_ipv6( socket_addr_v6 = start_ipv6(
peer_addr_v6, peer_addr_v6,
@@ -794,7 +651,6 @@ impl RendezvousMediator {
true, true,
true, true,
socket_addr_v6.clone(), socket_addr_v6.clone(),
webrtc_sdp_answer.clone(),
control_permissions, control_permissions,
) )
.await; .await;
@@ -808,7 +664,6 @@ impl RendezvousMediator {
nat_type: nat_type.into(), nat_type: nat_type.into(),
version: crate::VERSION.to_owned(), version: crate::VERSION.to_owned(),
socket_addr_v6, socket_addr_v6,
webrtc_sdp_answer,
..Default::default() ..Default::default()
}; };
if ph.udp_port > 0 { if ph.udp_port > 0 {