From e7a1d72149de8154b7a338bf04723f35520d61b9 Mon Sep 17 00:00:00 2001 From: Ferdinand Schober Date: Mon, 12 Aug 2024 18:20:21 +0200 Subject: [PATCH] use local-channel instead of tokio sync channel (#179) this avoids the mutex overhead in tokio --- Cargo.lock | 18 +++++++++++ Cargo.toml | 1 + src/dns.rs | 2 +- src/server.rs | 58 +++++++++++++----------------------- src/server/capture_task.rs | 9 ++---- src/server/emulation_task.rs | 9 ++---- src/server/network_task.rs | 9 ++---- src/server/ping_task.rs | 9 +++--- 8 files changed, 54 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1f0a67a..7eb1bba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1324,6 +1324,7 @@ dependencies = [ "lan-mouse-proto", "libadwaita", "libc", + "local-channel", "log", "serde", "serde_json", @@ -1403,6 +1404,23 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "local-channel" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6cbc85e69b8df4b8bb8b89ec634e7189099cea8927a276b7384ce5488e53ec8" +dependencies = [ + "futures-core", + "futures-sink", + "local-waker", +] + +[[package]] +name = "local-waker" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d873d7c67ce09b42110d801813efbc9364414e356be9935700d368351657487" + [[package]] name = "lock_api" version = "0.4.12" diff --git a/Cargo.toml b/Cargo.toml index efde54f..698eccc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ slab = "0.4.9" endi = "1.1.0" thiserror = "1.0.61" tokio-util = "0.7.11" +local-channel = "0.1.5" [target.'cfg(unix)'.dependencies] libc = "0.2.148" diff --git a/src/dns.rs b/src/dns.rs index bdbabee..160efbb 100644 --- a/src/dns.rs +++ b/src/dns.rs @@ -1,5 +1,5 @@ +use local_channel::mpsc::Receiver; use std::net::IpAddr; -use tokio::sync::mpsc::Receiver; use hickory_resolver::{error::ResolveError, TokioAsyncResolver}; diff --git a/src/server.rs b/src/server.rs index b36f933..25fc633 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,5 +1,6 @@ use capture_task::CaptureRequest; use emulation_task::EmulationRequest; +use local_channel::mpsc::{channel, Sender}; use log; use std::{ cell::{Cell, RefCell}, @@ -8,15 +9,7 @@ use std::{ net::{IpAddr, SocketAddr}, rc::Rc, }; -use tokio::{ - io::ReadHalf, - join, signal, - sync::{ - mpsc::{channel, Sender}, - Notify, - }, - task::JoinHandle, -}; +use tokio::{io::ReadHalf, join, signal, sync::Notify, task::JoinHandle}; use tokio_util::sync::CancellationToken; use crate::{ @@ -130,12 +123,12 @@ impl Server { } }; - let (capture_tx, capture_rx) = channel(1); /* requests for input capture */ - let (emulation_tx, emulation_rx) = channel(1); /* emulation requests */ - let (udp_recv_tx, udp_recv_rx) = channel(1); /* udp receiver */ - let (udp_send_tx, udp_send_rx) = channel(1); /* udp sender */ - let (request_tx, mut request_rx) = channel(1); /* frontend requests */ - let (dns_tx, dns_rx) = channel(1); /* dns requests */ + let (capture_tx, capture_rx) = channel(); /* requests for input capture */ + let (emulation_tx, emulation_rx) = channel(); /* emulation requests */ + let (udp_recv_tx, udp_recv_rx) = channel(); /* udp receiver */ + let (udp_send_tx, udp_send_rx) = channel(); /* udp sender */ + let (request_tx, mut request_rx) = channel(); /* frontend requests */ + let (dns_tx, dns_rx) = channel(); /* dns requests */ // udp task let network = network_task::new(self.clone(), udp_recv_tx.clone(), udp_send_rx).await?; @@ -200,7 +193,7 @@ impl Server { let request = self.pending_dns_requests.borrow_mut().pop_front(); request } { - dns_tx.send(request).await.expect("channel closed"); + dns_tx.send(request).expect("channel closed"); } } _ = self.cancelled() => break, @@ -213,13 +206,6 @@ impl Server { log::info!("terminating service"); - assert!(!capture_tx.is_closed()); - assert!(!emulation_tx.is_closed()); - assert!(!udp_recv_tx.is_closed()); - assert!(!udp_send_tx.is_closed()); - assert!(!request_tx.is_closed()); - assert!(!dns_tx.is_closed()); - self.cancel(); futures::future::join_all(join_handles).await; let _ = join!(capture, dns_task, emulation, network, ping); @@ -377,8 +363,8 @@ impl Server { None => return, }; - let _ = capture.send(CaptureRequest::Destroy(handle)).await; - let _ = emulate.send(EmulationRequest::Destroy(handle)).await; + let _ = capture.send(CaptureRequest::Destroy(handle)); + let _ = emulate.send(EmulationRequest::Destroy(handle)); log::debug!("deactivating client {handle} done"); } @@ -410,10 +396,8 @@ impl Server { }; /* notify emulation, capture and frontends */ - let _ = capture - .send(CaptureRequest::Create(handle, pos.into())) - .await; - let _ = emulate.send(EmulationRequest::Create(handle)).await; + let _ = capture.send(CaptureRequest::Create(handle, pos.into())); + let _ = emulate.send(EmulationRequest::Create(handle)); log::debug!("activating client {handle} done"); } @@ -433,8 +417,8 @@ impl Server { }; if active { - let _ = capture.send(CaptureRequest::Destroy(handle)).await; - let _ = emulate.send(EmulationRequest::Destroy(handle)).await; + let _ = capture.send(CaptureRequest::Destroy(handle)); + let _ = emulate.send(EmulationRequest::Destroy(handle)); } } @@ -517,13 +501,11 @@ impl Server { // update state in event input emulator & input capture if changed { if active { - let _ = capture.send(CaptureRequest::Destroy(handle)).await; - let _ = emulate.send(EmulationRequest::Destroy(handle)).await; + let _ = capture.send(CaptureRequest::Destroy(handle)); + let _ = emulate.send(EmulationRequest::Destroy(handle)); } - let _ = capture - .send(CaptureRequest::Create(handle, pos.into())) - .await; - let _ = emulate.send(EmulationRequest::Create(handle)).await; + let _ = capture.send(CaptureRequest::Create(handle, pos.into())); + let _ = emulate.send(EmulationRequest::Create(handle)); } } @@ -595,7 +577,7 @@ async fn listen_frontend( let request = frontend::wait_for_request(&mut stream).await; match request { Ok(request) => { - let _ = request_tx.send(request).await; + let _ = request_tx.send(request); } Err(e) => { if let Some(e) = e.downcast_ref::() { diff --git a/src/server/capture_task.rs b/src/server/capture_task.rs index 1700dd0..9da2c14 100644 --- a/src/server/capture_task.rs +++ b/src/server/capture_task.rs @@ -1,12 +1,9 @@ use futures::StreamExt; use lan_mouse_proto::ProtoEvent; +use local_channel::mpsc::{Receiver, Sender}; use std::net::SocketAddr; -use tokio::{ - process::Command, - sync::mpsc::{Receiver, Sender}, - task::JoinHandle, -}; +use tokio::{process::Command, task::JoinHandle}; use input_capture::{ self, CaptureError, CaptureEvent, CaptureHandle, InputCapture, InputCaptureError, Position, @@ -161,7 +158,7 @@ async fn handle_capture_event( /* released capture */ State::Receiving => ProtoEvent::Leave(0), }; - sender_tx.send((event, addr)).await.expect("sender closed"); + sender_tx.send((event, addr)).expect("sender closed"); }; Ok(()) diff --git a/src/server/emulation_task.rs b/src/server/emulation_task.rs index 78af81f..d9db606 100644 --- a/src/server/emulation_task.rs +++ b/src/server/emulation_task.rs @@ -1,10 +1,8 @@ +use local_channel::mpsc::{Receiver, Sender}; use std::net::SocketAddr; use lan_mouse_proto::ProtoEvent; -use tokio::{ - sync::mpsc::{Receiver, Sender}, - task::JoinHandle, -}; +use tokio::task::JoinHandle; use crate::{ client::{ClientHandle, ClientManager}, @@ -140,7 +138,7 @@ async fn handle_incoming_event( match (event, addr) { (ProtoEvent::Pong, _) => { /* ignore pong events */ } (ProtoEvent::Ping, addr) => { - let _ = sender_tx.send((ProtoEvent::Pong, addr)).await; + let _ = sender_tx.send((ProtoEvent::Pong, addr)); } (ProtoEvent::Leave(_), _) => emulate.release_keys(handle).await?, (ProtoEvent::Ack(_), _) => server.set_state(State::Sending), @@ -148,7 +146,6 @@ async fn handle_incoming_event( server.set_state(State::Receiving); sender_tx .send((ProtoEvent::Ack(0), addr)) - .await .expect("no channel") } (ProtoEvent::Input(e), _) => { diff --git a/src/server/network_task.rs b/src/server/network_task.rs index 09a48d3..a90794a 100644 --- a/src/server/network_task.rs +++ b/src/server/network_task.rs @@ -1,11 +1,8 @@ +use local_channel::mpsc::{Receiver, Sender}; use std::{io, net::SocketAddr}; use thiserror::Error; -use tokio::{ - net::UdpSocket, - sync::mpsc::{Receiver, Sender}, - task::JoinHandle, -}; +use tokio::{net::UdpSocket, task::JoinHandle}; use super::Server; use lan_mouse_proto::{ProtoEvent, ProtocolError}; @@ -65,7 +62,7 @@ async fn udp_receiver( ) { loop { let event = receive_event(socket).await; - receiver_tx.send(event).await.expect("channel closed"); + receiver_tx.send(event).expect("channel closed"); } } diff --git a/src/server/ping_task.rs b/src/server/ping_task.rs index ad6534d..4b4534c 100644 --- a/src/server/ping_task.rs +++ b/src/server/ping_task.rs @@ -1,7 +1,8 @@ use std::{net::SocketAddr, time::Duration}; use lan_mouse_proto::ProtoEvent; -use tokio::{sync::mpsc::Sender, task::JoinHandle}; +use local_channel::mpsc::Sender; +use tokio::task::JoinHandle; use crate::client::ClientHandle; @@ -85,7 +86,7 @@ async fn ping_task( // ping clients for addr in ping_addrs { - if sender_ch.send((ProtoEvent::Ping, addr)).await.is_err() { + if sender_ch.send((ProtoEvent::Ping, addr)).is_err() { break; } } @@ -122,14 +123,14 @@ async fn ping_task( if receiving { for h in unresponsive_clients { log::warn!("device not responding, releasing keys!"); - let _ = emulate_notify.send(EmulationRequest::ReleaseKeys(h)).await; + let _ = emulate_notify.send(EmulationRequest::ReleaseKeys(h)); } } else { // release pointer if the active client has not responded if !unresponsive_clients.is_empty() { log::warn!("client not responding, releasing pointer!"); server.state.replace(State::Receiving); - let _ = capture_notify.send(CaptureRequest::Release).await; + let _ = capture_notify.send(CaptureRequest::Release); } } }