Compare commits

...

1 Commits

Author SHA1 Message Date
Ferdinand Schober
6c0dda8f37 use local-channel instead of tokio sync channel
this avoids the mutex overhead in tokio
2024-08-12 18:02:41 +02:00
8 changed files with 54 additions and 61 deletions

18
Cargo.lock generated
View File

@@ -1324,6 +1324,7 @@ dependencies = [
"lan-mouse-proto", "lan-mouse-proto",
"libadwaita", "libadwaita",
"libc", "libc",
"local-channel",
"log", "log",
"serde", "serde",
"serde_json", "serde_json",
@@ -1403,6 +1404,23 @@ version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
[[package]]
name = "local-channel"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6cbc85e69b8df4b8bb8b89ec634e7189099cea8927a276b7384ce5488e53ec8"
dependencies = [
"futures-core",
"futures-sink",
"local-waker",
]
[[package]]
name = "local-waker"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d873d7c67ce09b42110d801813efbc9364414e356be9935700d368351657487"
[[package]] [[package]]
name = "lock_api" name = "lock_api"
version = "0.4.12" version = "0.4.12"

View File

@@ -50,6 +50,7 @@ slab = "0.4.9"
endi = "1.1.0" endi = "1.1.0"
thiserror = "1.0.61" thiserror = "1.0.61"
tokio-util = "0.7.11" tokio-util = "0.7.11"
local-channel = "0.1.5"
[target.'cfg(unix)'.dependencies] [target.'cfg(unix)'.dependencies]
libc = "0.2.148" libc = "0.2.148"

View File

@@ -1,5 +1,5 @@
use local_channel::mpsc::Receiver;
use std::net::IpAddr; use std::net::IpAddr;
use tokio::sync::mpsc::Receiver;
use hickory_resolver::{error::ResolveError, TokioAsyncResolver}; use hickory_resolver::{error::ResolveError, TokioAsyncResolver};

View File

@@ -1,5 +1,6 @@
use capture_task::CaptureRequest; use capture_task::CaptureRequest;
use emulation_task::EmulationRequest; use emulation_task::EmulationRequest;
use local_channel::mpsc::{channel, Sender};
use log; use log;
use std::{ use std::{
cell::{Cell, RefCell}, cell::{Cell, RefCell},
@@ -8,15 +9,7 @@ use std::{
net::{IpAddr, SocketAddr}, net::{IpAddr, SocketAddr},
rc::Rc, rc::Rc,
}; };
use tokio::{ use tokio::{io::ReadHalf, join, signal, sync::Notify, task::JoinHandle};
io::ReadHalf,
join, signal,
sync::{
mpsc::{channel, Sender},
Notify,
},
task::JoinHandle,
};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::{ use crate::{
@@ -130,12 +123,12 @@ impl Server {
} }
}; };
let (capture_tx, capture_rx) = channel(1); /* requests for input capture */ let (capture_tx, capture_rx) = channel(); /* requests for input capture */
let (emulation_tx, emulation_rx) = channel(1); /* emulation requests */ let (emulation_tx, emulation_rx) = channel(); /* emulation requests */
let (udp_recv_tx, udp_recv_rx) = channel(1); /* udp receiver */ let (udp_recv_tx, udp_recv_rx) = channel(); /* udp receiver */
let (udp_send_tx, udp_send_rx) = channel(1); /* udp sender */ let (udp_send_tx, udp_send_rx) = channel(); /* udp sender */
let (request_tx, mut request_rx) = channel(1); /* frontend requests */ let (request_tx, mut request_rx) = channel(); /* frontend requests */
let (dns_tx, dns_rx) = channel(1); /* dns requests */ let (dns_tx, dns_rx) = channel(); /* dns requests */
// udp task // udp task
let network = network_task::new(self.clone(), udp_recv_tx.clone(), udp_send_rx).await?; let network = network_task::new(self.clone(), udp_recv_tx.clone(), udp_send_rx).await?;
@@ -200,7 +193,7 @@ impl Server {
let request = self.pending_dns_requests.borrow_mut().pop_front(); let request = self.pending_dns_requests.borrow_mut().pop_front();
request request
} { } {
dns_tx.send(request).await.expect("channel closed"); dns_tx.send(request).expect("channel closed");
} }
} }
_ = self.cancelled() => break, _ = self.cancelled() => break,
@@ -213,13 +206,6 @@ impl Server {
log::info!("terminating service"); log::info!("terminating service");
assert!(!capture_tx.is_closed());
assert!(!emulation_tx.is_closed());
assert!(!udp_recv_tx.is_closed());
assert!(!udp_send_tx.is_closed());
assert!(!request_tx.is_closed());
assert!(!dns_tx.is_closed());
self.cancel(); self.cancel();
futures::future::join_all(join_handles).await; futures::future::join_all(join_handles).await;
let _ = join!(capture, dns_task, emulation, network, ping); let _ = join!(capture, dns_task, emulation, network, ping);
@@ -377,8 +363,8 @@ impl Server {
None => return, None => return,
}; };
let _ = capture.send(CaptureRequest::Destroy(handle)).await; let _ = capture.send(CaptureRequest::Destroy(handle));
let _ = emulate.send(EmulationRequest::Destroy(handle)).await; let _ = emulate.send(EmulationRequest::Destroy(handle));
log::debug!("deactivating client {handle} done"); log::debug!("deactivating client {handle} done");
} }
@@ -410,10 +396,8 @@ impl Server {
}; };
/* notify emulation, capture and frontends */ /* notify emulation, capture and frontends */
let _ = capture let _ = capture.send(CaptureRequest::Create(handle, pos.into()));
.send(CaptureRequest::Create(handle, pos.into())) let _ = emulate.send(EmulationRequest::Create(handle));
.await;
let _ = emulate.send(EmulationRequest::Create(handle)).await;
log::debug!("activating client {handle} done"); log::debug!("activating client {handle} done");
} }
@@ -433,8 +417,8 @@ impl Server {
}; };
if active { if active {
let _ = capture.send(CaptureRequest::Destroy(handle)).await; let _ = capture.send(CaptureRequest::Destroy(handle));
let _ = emulate.send(EmulationRequest::Destroy(handle)).await; let _ = emulate.send(EmulationRequest::Destroy(handle));
} }
} }
@@ -517,13 +501,11 @@ impl Server {
// update state in event input emulator & input capture // update state in event input emulator & input capture
if changed { if changed {
if active { if active {
let _ = capture.send(CaptureRequest::Destroy(handle)).await; let _ = capture.send(CaptureRequest::Destroy(handle));
let _ = emulate.send(EmulationRequest::Destroy(handle)).await; let _ = emulate.send(EmulationRequest::Destroy(handle));
} }
let _ = capture let _ = capture.send(CaptureRequest::Create(handle, pos.into()));
.send(CaptureRequest::Create(handle, pos.into())) let _ = emulate.send(EmulationRequest::Create(handle));
.await;
let _ = emulate.send(EmulationRequest::Create(handle)).await;
} }
} }
@@ -595,7 +577,7 @@ async fn listen_frontend(
let request = frontend::wait_for_request(&mut stream).await; let request = frontend::wait_for_request(&mut stream).await;
match request { match request {
Ok(request) => { Ok(request) => {
let _ = request_tx.send(request).await; let _ = request_tx.send(request);
} }
Err(e) => { Err(e) => {
if let Some(e) = e.downcast_ref::<io::Error>() { if let Some(e) = e.downcast_ref::<io::Error>() {

View File

@@ -1,12 +1,9 @@
use futures::StreamExt; use futures::StreamExt;
use lan_mouse_proto::ProtoEvent; use lan_mouse_proto::ProtoEvent;
use local_channel::mpsc::{Receiver, Sender};
use std::net::SocketAddr; use std::net::SocketAddr;
use tokio::{ use tokio::{process::Command, task::JoinHandle};
process::Command,
sync::mpsc::{Receiver, Sender},
task::JoinHandle,
};
use input_capture::{ use input_capture::{
self, CaptureError, CaptureEvent, CaptureHandle, InputCapture, InputCaptureError, Position, self, CaptureError, CaptureEvent, CaptureHandle, InputCapture, InputCaptureError, Position,
@@ -161,7 +158,7 @@ async fn handle_capture_event(
/* released capture */ /* released capture */
State::Receiving => ProtoEvent::Leave(0), State::Receiving => ProtoEvent::Leave(0),
}; };
sender_tx.send((event, addr)).await.expect("sender closed"); sender_tx.send((event, addr)).expect("sender closed");
}; };
Ok(()) Ok(())

View File

@@ -1,10 +1,8 @@
use local_channel::mpsc::{Receiver, Sender};
use std::net::SocketAddr; use std::net::SocketAddr;
use lan_mouse_proto::ProtoEvent; use lan_mouse_proto::ProtoEvent;
use tokio::{ use tokio::task::JoinHandle;
sync::mpsc::{Receiver, Sender},
task::JoinHandle,
};
use crate::{ use crate::{
client::{ClientHandle, ClientManager}, client::{ClientHandle, ClientManager},
@@ -140,7 +138,7 @@ async fn handle_incoming_event(
match (event, addr) { match (event, addr) {
(ProtoEvent::Pong, _) => { /* ignore pong events */ } (ProtoEvent::Pong, _) => { /* ignore pong events */ }
(ProtoEvent::Ping, addr) => { (ProtoEvent::Ping, addr) => {
let _ = sender_tx.send((ProtoEvent::Pong, addr)).await; let _ = sender_tx.send((ProtoEvent::Pong, addr));
} }
(ProtoEvent::Leave(_), _) => emulate.release_keys(handle).await?, (ProtoEvent::Leave(_), _) => emulate.release_keys(handle).await?,
(ProtoEvent::Ack(_), _) => server.set_state(State::Sending), (ProtoEvent::Ack(_), _) => server.set_state(State::Sending),
@@ -148,7 +146,6 @@ async fn handle_incoming_event(
server.set_state(State::Receiving); server.set_state(State::Receiving);
sender_tx sender_tx
.send((ProtoEvent::Ack(0), addr)) .send((ProtoEvent::Ack(0), addr))
.await
.expect("no channel") .expect("no channel")
} }
(ProtoEvent::Input(e), _) => { (ProtoEvent::Input(e), _) => {

View File

@@ -1,11 +1,8 @@
use local_channel::mpsc::{Receiver, Sender};
use std::{io, net::SocketAddr}; use std::{io, net::SocketAddr};
use thiserror::Error; use thiserror::Error;
use tokio::{ use tokio::{net::UdpSocket, task::JoinHandle};
net::UdpSocket,
sync::mpsc::{Receiver, Sender},
task::JoinHandle,
};
use super::Server; use super::Server;
use lan_mouse_proto::{ProtoEvent, ProtocolError}; use lan_mouse_proto::{ProtoEvent, ProtocolError};
@@ -65,7 +62,7 @@ async fn udp_receiver(
) { ) {
loop { loop {
let event = receive_event(socket).await; let event = receive_event(socket).await;
receiver_tx.send(event).await.expect("channel closed"); receiver_tx.send(event).expect("channel closed");
} }
} }

View File

@@ -1,7 +1,8 @@
use std::{net::SocketAddr, time::Duration}; use std::{net::SocketAddr, time::Duration};
use lan_mouse_proto::ProtoEvent; use lan_mouse_proto::ProtoEvent;
use tokio::{sync::mpsc::Sender, task::JoinHandle}; use local_channel::mpsc::Sender;
use tokio::task::JoinHandle;
use crate::client::ClientHandle; use crate::client::ClientHandle;
@@ -85,7 +86,7 @@ async fn ping_task(
// ping clients // ping clients
for addr in ping_addrs { for addr in ping_addrs {
if sender_ch.send((ProtoEvent::Ping, addr)).await.is_err() { if sender_ch.send((ProtoEvent::Ping, addr)).is_err() {
break; break;
} }
} }
@@ -122,14 +123,14 @@ async fn ping_task(
if receiving { if receiving {
for h in unresponsive_clients { for h in unresponsive_clients {
log::warn!("device not responding, releasing keys!"); log::warn!("device not responding, releasing keys!");
let _ = emulate_notify.send(EmulationRequest::ReleaseKeys(h)).await; let _ = emulate_notify.send(EmulationRequest::ReleaseKeys(h));
} }
} else { } else {
// release pointer if the active client has not responded // release pointer if the active client has not responded
if !unresponsive_clients.is_empty() { if !unresponsive_clients.is_empty() {
log::warn!("client not responding, releasing pointer!"); log::warn!("client not responding, releasing pointer!");
server.state.replace(State::Receiving); server.state.replace(State::Receiving);
let _ = capture_notify.send(CaptureRequest::Release).await; let _ = capture_notify.send(CaptureRequest::Release);
} }
} }
} }