From ab2514e508b52702a82d8902c0113b74d4b82067 Mon Sep 17 00:00:00 2001 From: Ferdinand Schober Date: Wed, 11 Oct 2023 14:52:18 +0200 Subject: [PATCH] Async (#30) - manual eventloop now replaced by asycn-await using the tokio runtime - dns no longer blocks the event loop - simplifies logic - makes xdg-desktop-portal easier to integrate --- Cargo.lock | 27 +- Cargo.toml | 3 +- src/backend/producer/wayland.rs | 33 +- src/backend/producer/windows.rs | 48 +-- src/backend/producer/x11.rs | 37 +- src/dns.rs | 15 +- src/event/server.rs | 599 ++++++++++++++++---------------- src/frontend.rs | 171 +++------ src/frontend/cli.rs | 4 +- src/frontend/gtk.rs | 5 +- src/frontend/gtk/window.rs | 2 +- src/main.rs | 49 ++- src/producer.rs | 25 +- 13 files changed, 453 insertions(+), 565 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0e8b185..d2a5437 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -685,11 +685,10 @@ dependencies = [ "libc", "log", "memmap", - "mio", - "mio-signals", "serde", "serde_json", "tempfile", + "tokio", "toml", "trust-dns-resolver", "wayland-client", @@ -842,22 +841,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" dependencies = [ "libc", - "log", "wasi", "windows-sys", ] -[[package]] -name = "mio-signals" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21e9524e26c8749824640a1282b68a695b21d55862efa6d465b5f71107c93368" -dependencies = [ - "libc", - "log", - "mio", -] - [[package]] name = "nix" version = "0.26.4" @@ -1346,9 +1333,21 @@ dependencies = [ "num_cpus", "pin-project-lite", "socket2", + "tokio-macros", "windows-sys", ] +[[package]] +name = "tokio-macros" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.37", +] + [[package]] name = "toml" version = "0.7.8" diff --git a/Cargo.toml b/Cargo.toml index d2ac360..7b6037b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,9 +21,9 @@ serde = { version = "1.0", features = ["derive"] } anyhow = "1.0.71" log = "0.4.20" env_logger = "0.10.0" -mio = { version = "0.8", features = ["os-ext"] } libc = "0.2.148" serde_json = "1.0.107" +tokio = {version = "1.32.0", features = ["io-util", "macros", "net", "rt", "sync" ] } [target.'cfg(unix)'.dependencies] wayland-client = { version="0.30.2", optional = true } @@ -31,7 +31,6 @@ wayland-protocols = { version="0.30.0", features=["client", "staging", "unstable wayland-protocols-wlr = { version="0.1.0", features=["client"], optional = true } wayland-protocols-misc = { version="0.1.0", features=["client"], optional = true } wayland-protocols-plasma = { version="0.1.0", features=["client"], optional = true } -mio-signals = "0.2.0" x11 = { version = "2.21.0", features = ["xlib", "xtest"], optional = true } gtk = { package = "gtk4", version = "0.7.2", features = ["v4_6"], optional = true } adw = { package = "libadwaita", version = "0.5.2", features = ["v1_1"], optional = true } diff --git a/src/backend/producer/wayland.rs b/src/backend/producer/wayland.rs index 03f025d..346772a 100644 --- a/src/backend/producer/wayland.rs +++ b/src/backend/producer/wayland.rs @@ -1,9 +1,9 @@ use crate::{client::{ClientHandle, Position, ClientEvent}, producer::EventProducer}; -use mio::{event::Source, unix::SourceFd}; -use std::{os::fd::RawFd, vec::Drain, io::ErrorKind, env}; +use std::{os::fd::RawFd, vec::Drain, io::{ErrorKind, self}, env}; use memmap::MmapOptions; use anyhow::{anyhow, Result}; +use tokio::io::unix::AsyncFd; use std::{ fs::File, @@ -421,29 +421,12 @@ impl State { } } -impl Source for WaylandEventProducer { - fn register( - &mut self, - registry: &mio::Registry, - token: mio::Token, - interests: mio::Interest, - ) -> std::io::Result<()> { - SourceFd(&self.state.wayland_fd).register(registry, token, interests) - } - - fn reregister( - &mut self, - registry: &mio::Registry, - token: mio::Token, - interests: mio::Interest, - ) -> std::io::Result<()> { - SourceFd(&self.state.wayland_fd).reregister(registry, token, interests) - } - - fn deregister(&mut self, registry: &mio::Registry) -> std::io::Result<()> { - SourceFd(&self.state.wayland_fd).deregister(registry) +impl AsRawFd for WaylandEventProducer { + fn as_raw_fd(&self) -> RawFd { + self.state.wayland_fd } } + impl WaylandEventProducer { fn read(&mut self) -> bool { match self.state.read_guard.take().unwrap().read() { @@ -508,6 +491,10 @@ impl WaylandEventProducer { impl EventProducer for WaylandEventProducer { + fn get_async_fd(&self) -> io::Result> { + AsyncFd::new(self.as_raw_fd()) + } + fn read_events(&mut self) -> Drain<(ClientHandle, Event)> { // read events while self.read() { diff --git a/src/backend/producer/windows.rs b/src/backend/producer/windows.rs index c7b6f69..27de5ec 100644 --- a/src/backend/producer/windows.rs +++ b/src/backend/producer/windows.rs @@ -1,8 +1,4 @@ -use std::vec::Drain; - -use mio::{Token, Registry}; -use mio::event::Source; -use std::io::Result; +use tokio::sync::mpsc::{self, Receiver, Sender}; use crate::{ client::{ClientHandle, ClientEvent}, @@ -11,48 +7,24 @@ use crate::{ }; pub struct WindowsProducer { - pending_events: Vec<(ClientHandle, Event)>, + _tx: Sender<(ClientHandle, Event)>, + rx: Option>, } -impl Source for WindowsProducer { - fn register( - &mut self, - _registry: &Registry, - _token: Token, - _interests: mio::Interest, - ) -> Result<()> { - Ok(()) - } - - fn reregister( - &mut self, - _registry: &Registry, - _token: Token, - _interests: mio::Interest, - ) -> Result<()> { - Ok(()) - } - - fn deregister(&mut self, _registry: &Registry) -> Result<()> { - Ok(()) - } -} - - impl EventProducer for WindowsProducer { fn notify(&mut self, _: ClientEvent) { } - fn read_events(&mut self) -> Drain<(ClientHandle, Event)> { - self.pending_events.drain(..) - } - fn release(&mut self) { } + + fn get_wait_channel(&mut self) -> Option> { + self.rx.take() + } } impl WindowsProducer { pub(crate) fn new() -> Self { - Self { - pending_events: vec![], - } + let (_tx, rx) = mpsc::channel(1); + let rx = Some(rx); + Self { _tx, rx } } } diff --git a/src/backend/producer/x11.rs b/src/backend/producer/x11.rs index 5ec37ce..379534a 100644 --- a/src/backend/producer/x11.rs +++ b/src/backend/producer/x11.rs @@ -1,12 +1,13 @@ +use std::io::Result; +use std::os::fd::{AsRawFd, self}; use std::vec::Drain; -use mio::{Token, Registry}; -use mio::event::Source; -use std::io::Result; +use tokio::io::unix::AsyncFd; +use crate::event::Event; use crate::producer::EventProducer; -use crate::{client::{ClientHandle, ClientEvent}, event::Event}; +use crate::client::{ClientEvent, ClientHandle}; pub struct X11Producer { pending_events: Vec<(ClientHandle, Event)>, @@ -20,27 +21,9 @@ impl X11Producer { } } -impl Source for X11Producer { - fn register( - &mut self, - _registry: &Registry, - _token: Token, - _interests: mio::Interest, - ) -> Result<()> { - Ok(()) - } - - fn reregister( - &mut self, - _registry: &Registry, - _token: Token, - _interests: mio::Interest, - ) -> Result<()> { - Ok(()) - } - - fn deregister(&mut self, _registry: &Registry) -> Result<()> { - Ok(()) +impl AsRawFd for X11Producer { + fn as_raw_fd(&self) -> fd::RawFd { + todo!() } } @@ -52,4 +35,8 @@ impl EventProducer for X11Producer { } fn release(&mut self) {} + + fn get_async_fd(&self) -> Result> { + todo!() + } } diff --git a/src/dns.rs b/src/dns.rs index bef5d5e..215f618 100644 --- a/src/dns.rs +++ b/src/dns.rs @@ -1,20 +1,23 @@ use anyhow::Result; use std::{error::Error, net::IpAddr}; -use trust_dns_resolver::Resolver; +use trust_dns_resolver::TokioAsyncResolver; pub(crate) struct DnsResolver { - resolver: Resolver, + resolver: TokioAsyncResolver, } impl DnsResolver { - pub(crate) fn new() -> Result { - let resolver = Resolver::from_system_conf()?; + pub(crate) async fn new() -> Result { + let resolver = TokioAsyncResolver::tokio_from_system_conf()?; Ok(Self { resolver }) } - pub(crate) fn resolve(&self, host: &str) -> Result, Box> { + pub(crate) async fn resolve(&self, host: &str) -> Result, Box> { log::info!("resolving {host} ..."); - let response = self.resolver.lookup_ip(host)?; + let response = self.resolver.lookup_ip(host).await?; + for ip in response.iter() { + log::info!("{host}: adding ip {ip}"); + } Ok(response.iter().collect()) } } diff --git a/src/event/server.rs b/src/event/server.rs index 273f1e5..acf1439 100644 --- a/src/event/server.rs +++ b/src/event/server.rs @@ -1,12 +1,16 @@ use std::{error::Error, io::Result, collections::HashSet, time::{Duration, Instant}, net::IpAddr}; use log; -use mio::{Events, Poll, Interest, Token, net::UdpSocket, event::Source}; -#[cfg(not(windows))] -use mio_signals::{Signals, Signal, SignalSet}; +use tokio::{net::UdpSocket, io::ReadHalf, sync::mpsc::{Sender, Receiver}}; + +#[cfg(unix)] +use tokio::net::UnixStream; + +#[cfg(windows)] +use tokio::net::TcpStream; use std::{net::SocketAddr, io::ErrorKind}; -use crate::{client::{ClientEvent, ClientManager, Position, ClientHandle}, consumer::EventConsumer, producer::EventProducer, frontend::{FrontendEvent, FrontendListener, FrontendNotify}, dns::{self, DnsResolver}}; +use crate::{client::{ClientEvent, ClientManager, Position, ClientHandle}, consumer::EventConsumer, producer::EventProducer, frontend::{FrontendEvent, FrontendListener, FrontendNotify, self}, dns::{self, DnsResolver}}; use super::Event; /// keeps track of state to prevent a feedback loop @@ -18,92 +22,134 @@ enum State { } pub struct Server { - poll: Poll, - socket: UdpSocket, - producer: Box, - consumer: Box, resolver: DnsResolver, - #[cfg(not(windows))] - signals: Signals, - frontend: FrontendListener, client_manager: ClientManager, state: State, - next_token: usize, + frontend: FrontendListener, + consumer: Box, + producer: Box, + socket: UdpSocket, + frontend_rx: Receiver, + frontend_tx: Sender, } -const UDP_RX: Token = Token(0); -const FRONTEND_RX: Token = Token(1); -const PRODUCER_RX: Token = Token(2); -#[cfg(not(windows))] -const SIGNAL: Token = Token(3); - -const MAX_TOKEN: usize = 4; - impl Server { - pub fn new( + pub async fn new( port: u16, - mut producer: Box, + frontend: FrontendListener, consumer: Box, - mut frontend: FrontendListener, + producer: Box, ) -> anyhow::Result { - // bind the udp socket - let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), port); - let mut socket = UdpSocket::bind(listen_addr)?; // create dns resolver - let resolver = dns::DnsResolver::new()?; + let resolver = dns::DnsResolver::new().await?; - // register event sources - let poll = Poll::new()?; - - // hand signal handling over to the event loop - #[cfg(not(windows))] - let mut signals = Signals::new(SignalSet::all())?; - - #[cfg(not(windows))] - poll.registry().register(&mut signals, SIGNAL, Interest::READABLE)?; - poll.registry().register(&mut socket, UDP_RX, Interest::READABLE)?; - poll.registry().register(&mut producer, PRODUCER_RX, Interest::READABLE)?; - poll.registry().register(&mut frontend, FRONTEND_RX, Interest::READABLE)?; + // bind the udp socket + let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), port); + let socket = UdpSocket::bind(listen_addr).await?; + let (frontend_tx, frontend_rx) = tokio::sync::mpsc::channel(1); // create client manager let client_manager = ClientManager::new(); Ok(Server { - poll, socket, consumer, producer, + frontend, + consumer, + producer, resolver, - #[cfg(not(windows))] - signals, frontend, + socket, client_manager, state: State::Receiving, - next_token: MAX_TOKEN, + frontend_rx, + frontend_tx, }) } - pub fn run(&mut self) -> Result<()> { - let mut events = Events::with_capacity(10); + pub async fn run(&mut self) -> Result<()> { + + #[cfg(unix)] + let producer_fd = self.producer.get_async_fd()?; + + #[cfg(unix)] loop { - match self.poll.poll(&mut events, None) { - Ok(()) => (), - Err(e) if e.kind() == ErrorKind::Interrupted => continue, - Err(e) => return Err(e), - } - for event in &events { - if !event.is_readable() { continue } - match event.token() { - UDP_RX => self.handle_udp_rx(), - PRODUCER_RX => self.handle_producer_rx(), - FRONTEND_RX => self.handle_frontend_incoming(), - #[cfg(not(windows))] - SIGNAL => if self.handle_signal() { return Ok(()) }, - _ => if self.handle_frontend_event(event.token()) { return Ok(()) }, + tokio::select! { + udp_event = receive_event(&self.socket) => { + match udp_event { + Ok(e) => self.handle_udp_rx(e).await, + Err(e) => log::error!("error reading event: {e}"), + } + } + read_guard = producer_fd.readable() => { + let mut guard = match read_guard { + Ok(g) => g, + Err(e) => { + log::error!("wayland_fd read_guard: {e}"); + continue + } + }; + self.handle_producer_rx().await; + guard.clear_ready_matching(tokio::io::Ready::READABLE); + } + stream = self.frontend.accept() => { + match stream { + Ok(s) => self.handle_frontend_stream(s).await, + Err(e) => log::error!("error connecting to frontend: {e}"), + } + } + frontend_event = self.frontend_rx.recv() => { + if let Some(event) = frontend_event { + if self.handle_frontend_event(event).await { + break; + } + } } } } + + #[cfg(windows)] + let mut channel = self.producer.get_wait_channel().unwrap(); + + #[cfg(windows)] + loop { + tokio::select! { + udp_event = receive_event(&self.socket) => { + match udp_event { + Ok(e) => self.handle_udp_rx(e).await, + Err(e) => log::error!("error reading event: {e}"), + } + } + event = channel.recv() => { + if let Some((c,e)) = event { + self.handle_producer_event(c,e).await; + } + } + stream = self.frontend.accept() => { + match stream { + Ok(s) => self.handle_frontend_stream(s).await, + Err(e) => log::error!("error connecting to frontend: {e}"), + } + } + frontend_event = self.frontend_rx.recv() => { + if let Some(event) = frontend_event { + if self.handle_frontend_event(event).await { + break; + } + } + } + } + } + + Ok(()) } - pub fn add_client(&mut self, hostname: Option, mut addr: HashSet, port: u16, pos: Position) -> ClientHandle { + pub async fn add_client(&mut self, hostname: Option, mut addr: HashSet, port: u16, pos: Position) -> ClientHandle { let ips = if let Some(hostname) = hostname.as_ref() { - HashSet::from_iter(self.resolver.resolve(hostname.as_str()).ok().iter().flatten().cloned()) + match self.resolver.resolve(hostname.as_str()).await { + Ok(ips) => HashSet::from_iter(ips.iter().cloned()), + Err(e) => { + log::warn!("could not resolve host: {e}"); + HashSet::new() + } + } } else { HashSet::new() }; @@ -112,7 +158,7 @@ impl Server { let client = self.client_manager.add_client(hostname.clone(), addr, port, pos); log::debug!("add_client {client}"); let notify = FrontendNotify::NotifyClientCreate(client, hostname, port, pos); - if let Err(e) = self.frontend.notify_all(notify) { + if let Err(e) = self.frontend.notify_all(notify).await { log::error!("{e}"); }; client @@ -131,13 +177,13 @@ impl Server { } } - pub fn remove_client(&mut self, client: ClientHandle) -> Option { + pub async fn remove_client(&mut self, client: ClientHandle) -> Option { self.producer.notify(ClientEvent::Destroy(client)); self.consumer.notify(ClientEvent::Destroy(client)); if let Some(client) = self.client_manager.remove_client(client).map(|s| s.client.handle) { let notify = FrontendNotify::NotifyClientDelete(client); log::debug!("{notify:?}"); - if let Err(e) = self.frontend.notify_all(notify) { + if let Err(e) = self.frontend.notify_all(notify).await { log::error!("{e}"); } Some(client) @@ -146,7 +192,7 @@ impl Server { } } - pub fn update_client( + pub async fn update_client( &mut self, client: ClientHandle, hostname: Option, @@ -184,89 +230,80 @@ impl Server { state.client.active_addr = None; state.client.hostname = hostname; if let Some(hostname) = state.client.hostname.as_ref() { - if let Ok(ips) = self.resolver.resolve(hostname.as_str()) { - let addrs = ips.iter().map(|i| SocketAddr::new(*i, port)); - state.client.addrs = HashSet::from_iter(addrs); + match self.resolver.resolve(hostname.as_str()).await { + Ok(ips) => { + let addrs = ips.iter().map(|i| SocketAddr::new(*i, port)); + state.client.addrs = HashSet::from_iter(addrs); + } + Err(e) => { + log::warn!("could not resolve host: {e}"); + } } } } log::debug!("client updated: {:?}", state); } - fn handle_udp_rx(&mut self) { - loop { - let (event, addr) = match self.receive_event() { - Ok(e) => e, - Err(e) => { - if e.is::() { - if let ErrorKind::WouldBlock = e.downcast_ref::() - .unwrap() - .kind() { - return - } - } - log::error!("{}", e); - continue - } - }; + async fn handle_udp_rx(&mut self, event: (Event, SocketAddr)) { + let (event, addr) = event; - // get handle for addr - let handle = match self.client_manager.get_client(addr) { - Some(a) => a, - None => { - log::warn!("ignoring event from client {addr:?}"); - continue - } - }; - log::trace!("{:20} <-<-<-<------ {addr} ({handle})", event.to_string()); - let state = match self.client_manager.get_mut(handle) { - Some(s) => s, - None => { - log::error!("unknown handle"); - continue - } - }; + // get handle for addr + let handle = match self.client_manager.get_client(addr) { + Some(a) => a, + None => { + log::warn!("ignoring event from client {addr:?}"); + return; + } + }; - // reset ttl for client and - state.last_seen = Some(Instant::now()); - // set addr as new default for this client - state.client.active_addr = Some(addr); - match (event, addr) { - (Event::Pong(), _) => {}, - (Event::Ping(), addr) => { - if let Err(e) = Self::send_event(&self.socket, Event::Pong(), addr) { - log::error!("udp send: {}", e); - } - // we release the mouse here, - // since its very likely, that we wont get a release event - self.producer.release(); - } - (event, addr) => match self.state { - State::Sending => { - // in sending state, we dont want to process - // any events to avoid feedback loops, - // therefore we tell the event producer - // to release the pointer and move on - // first event -> release pointer - if let Event::Release() = event { - log::debug!("releasing pointer ..."); - self.producer.release(); - self.state = State::Receiving; - } - } - State::Receiving => { - // consume event - self.consumer.consume(event, handle); + log::trace!("{:20} <-<-<-<------ {addr} ({handle})", event.to_string()); + let state = match self.client_manager.get_mut(handle) { + Some(s) => s, + None => { + log::error!("unknown handle"); + return; + } + }; - // let the server know we are still alive once every second - let last_replied = state.last_replied; - if last_replied.is_none() - || last_replied.is_some() - && last_replied.unwrap().elapsed() > Duration::from_secs(1) { - state.last_replied = Some(Instant::now()); - if let Err(e) = Self::send_event(&self.socket, Event::Pong(), addr) { - log::error!("udp send: {}", e); - } + // reset ttl for client and + state.last_seen = Some(Instant::now()); + // set addr as new default for this client + state.client.active_addr = Some(addr); + match (event, addr) { + (Event::Pong(), _) => {}, + (Event::Ping(), addr) => { + if let Err(e) = send_event(&self.socket, Event::Pong(), addr).await { + log::error!("udp send: {}", e); + } + // we release the mouse here, + // since its very likely, that we wont get a release event + self.producer.release(); + } + (event, addr) => match self.state { + State::Sending => { + // in sending state, we dont want to process + // any events to avoid feedback loops, + // therefore we tell the event producer + // to release the pointer and move on + // first event -> release pointer + if let Event::Release() = event { + log::debug!("releasing pointer ..."); + self.producer.release(); + self.state = State::Receiving; + } + } + State::Receiving => { + // consume event + self.consumer.consume(event, handle); + + // let the server know we are still alive once every second + let last_replied = state.last_replied; + if last_replied.is_none() + || last_replied.is_some() + && last_replied.unwrap().elapsed() > Duration::from_secs(1) { + state.last_replied = Some(Instant::now()); + if let Err(e) = send_event(&self.socket, Event::Pong(), addr).await { + log::error!("udp send: {}", e); } } } @@ -274,66 +311,71 @@ impl Server { } } - fn handle_producer_rx(&mut self) { - let events = self.producer.read_events(); - let mut should_release = false; - for (c, e) in events.into_iter() { - // in receiving state, only release events - // must be transmitted - if let Event::Release() = e { - self.state = State::Sending; - } + #[cfg(unix)] + async fn handle_producer_rx(&mut self) { + let events: Vec<(ClientHandle, Event)> = self.producer.read_events().collect(); + for (c,e) in events.into_iter() { + self.handle_producer_event(c,e).await; + } + } - log::trace!("producer: ({c}) {e:?}"); - let state = match self.client_manager.get_mut(c) { - Some(state) => state, - None => { - log::warn!("unknown client!"); - continue - } - }; - // otherwise we should have an address to send to - // transmit events to the corrensponding client - if let Some(addr) = state.client.active_addr { - if let Err(e) = Self::send_event(&self.socket, e, addr) { + async fn handle_producer_event(&mut self, c: ClientHandle, e: Event) { + let mut should_release = false; + // in receiving state, only release events + // must be transmitted + if let Event::Release() = e { + self.state = State::Sending; + } + + log::trace!("producer: ({c}) {e:?}"); + let state = match self.client_manager.get_mut(c) { + Some(state) => state, + None => { + log::warn!("unknown client!"); + return + } + }; + // otherwise we should have an address to send to + // transmit events to the corrensponding client + if let Some(addr) = state.client.active_addr { + if let Err(e) = send_event(&self.socket, e, addr).await { + log::error!("udp send: {}", e); + } + } + + // if client last responded > 2 seconds ago + // and we have not sent a ping since 500 milliseconds, + // send a ping + if state.last_seen.is_some() + && state.last_seen.unwrap().elapsed() < Duration::from_secs(2) { + return + } + + // client last seen > 500ms ago + if state.last_ping.is_some() + && state.last_ping.unwrap().elapsed() < Duration::from_millis(500) { + return + } + + // release mouse if client didnt respond to the first ping + if state.last_ping.is_some() + && state.last_ping.unwrap().elapsed() < Duration::from_secs(1) { + should_release = true; + } + + // last ping > 500ms ago -> ping all interfaces + state.last_ping = Some(Instant::now()); + for addr in state.client.addrs.iter() { + log::debug!("pinging {addr}"); + if let Err(e) = send_event(&self.socket, Event::Ping(), *addr).await { + if e.kind() != ErrorKind::WouldBlock { log::error!("udp send: {}", e); } } - - // if client last responded > 2 seconds ago - // and we have not sent a ping since 500 milliseconds, - // send a ping - if state.last_seen.is_some() - && state.last_seen.unwrap().elapsed() < Duration::from_secs(2) { - continue - } - - // client last seen > 500ms ago - if state.last_ping.is_some() - && state.last_ping.unwrap().elapsed() < Duration::from_millis(500) { - continue - } - - // release mouse if client didnt respond to the first ping - if state.last_ping.is_some() - && state.last_ping.unwrap().elapsed() < Duration::from_secs(1) { - should_release = true; - } - - // last ping > 500ms ago -> ping all interfaces - state.last_ping = Some(Instant::now()); - for addr in state.client.addrs.iter() { - log::debug!("pinging {addr}"); - if let Err(e) = Self::send_event(&self.socket, Event::Ping(), *addr) { - if e.kind() != ErrorKind::WouldBlock { - log::error!("udp send: {}", e); - } - } - // send additional release event, in case client is still in sending mode - if let Err(e) = Self::send_event(&self.socket, Event::Release(), *addr) { - if e.kind() != ErrorKind::WouldBlock { - log::error!("udp send: {}", e); - } + // send additional release event, in case client is still in sending mode + if let Err(e) = send_event(&self.socket, Event::Release(), *addr).await { + if e.kind() != ErrorKind::WouldBlock { + log::error!("udp send: {}", e); } } } @@ -343,123 +385,76 @@ impl Server { self.producer.release(); self.state = State::Receiving; } - } - fn handle_frontend_incoming(&mut self) { - loop { - let token = self.fresh_token(); - let poll = &mut self.poll; - match self.frontend.handle_incoming(|s, i| { - poll.registry().register(s, token, i)?; - Ok(token) - }) { - Err(e) if e.kind() == ErrorKind::WouldBlock => break, - Err(e) if e.kind() == ErrorKind::Interrupted => continue, - Err(e) => { - log::error!("{e}"); - break - } - _ => continue, - } - } - // notify new frontend connections of current clients - self.enumerate(); - } - - fn handle_frontend_event(&mut self, token: Token) -> bool { - loop { - let event = match self.frontend.read_event(token) { - Ok(event) => event, - Err(e) if e.kind() == ErrorKind::WouldBlock => return false, - Err(e) if e.kind() == ErrorKind::Interrupted => continue, - Err(e) => { - log::error!("{e}"); - return false; - } - }; - if let Some(event) = event { - log::debug!("frontend: {event:?}"); - match event { - FrontendEvent::AddClient(hostname, port, pos) => { - self.add_client(hostname, HashSet::new(), port, pos); - } - FrontendEvent::ActivateClient(client, active) => { - self.activate_client(client, active); - } - FrontendEvent::DelClient(client) => { - self.remove_client(client); - } - FrontendEvent::UpdateClient(client, hostname, port, pos) => { - self.update_client(client, hostname, port, pos); - } - FrontendEvent::Enumerate() => self.enumerate(), - FrontendEvent::Shutdown() => { - log::info!("terminating gracefully..."); - return true; - }, + #[cfg(unix)] + async fn handle_frontend_stream(&mut self, mut stream: ReadHalf) { + let tx = self.frontend_tx.clone(); + tokio::task::spawn_local(async move { + loop { + let event = frontend::read_event(&mut stream).await; + match event { + Ok(event) => tx.send(event).await.unwrap(), + Err(e) => log::error!("error reading frontend event: {e}"), } } - } + }); + self.enumerate().await; } - fn enumerate(&mut self) { + #[cfg(windows)] + async fn handle_frontend_stream(&mut self, mut stream: ReadHalf) { + let tx = self.frontend_tx.clone(); + tokio::task::spawn_local(async move { + loop { + let event = frontend::read_event(&mut stream).await; + match event { + Ok(event) => tx.send(event).await.unwrap(), + Err(e) => log::error!("error reading frontend event: {e}"), + } + } + }); + self.enumerate().await; + } + + async fn handle_frontend_event(&mut self, event: FrontendEvent) -> bool { + log::debug!("frontend: {event:?}"); + match event { + FrontendEvent::AddClient(hostname, port, pos) => { self.add_client(hostname, HashSet::new(), port, pos).await; }, + FrontendEvent::ActivateClient(client, active) => self.activate_client(client, active), + FrontendEvent::DelClient(client) => { self.remove_client(client).await; }, + FrontendEvent::UpdateClient(client, hostname, port, pos) => self.update_client(client, hostname, port, pos).await, + FrontendEvent::Enumerate() => self.enumerate().await, + FrontendEvent::Shutdown() => { + log::info!("terminating gracefully..."); + return true; + }, + } + false + } + + async fn enumerate(&mut self) { let clients = self.client_manager.enumerate(); - if let Err(e) = self.frontend.notify_all(FrontendNotify::Enumerate(clients)) { + if let Err(e) = self.frontend.notify_all(FrontendNotify::Enumerate(clients)).await { log::error!("{e}"); } } +} - #[cfg(not(windows))] - fn handle_signal(&mut self) -> bool { - #[cfg(windows)] - return false; - #[cfg(not(windows))] - loop { - match self.signals.receive() { - Err(e) if e.kind() == ErrorKind::WouldBlock => return false, - Err(e) => { - log::error!("error reading signal: {e}"); - return false; - } - Ok(Some(Signal::Interrupt) | Some(Signal::Terminate)) => { - // terminate on SIG_INT or SIG_TERM - log::info!("terminating gracefully..."); - return true; - }, - Ok(Some(signal)) => { - log::info!("ignoring signal {signal:?}"); - }, - Ok(None) => return false, - } - } - } - - fn send_event(sock: &UdpSocket, e: Event, addr: SocketAddr) -> Result { - log::trace!("{:20} ------>->->-> {addr}", e.to_string()); - let data: Vec = (&e).into(); - // We are currently abusing a blocking send to get the lowest possible latency. - // It may be better to set the socket to non-blocking and only send when ready. - sock.send_to(&data[..], addr) - } - - fn receive_event(&self) -> std::result::Result<(Event, SocketAddr), Box> { - let mut buf = vec![0u8; 22]; - match self.socket.recv_from(&mut buf) { - Ok((_amt, src)) => Ok((Event::try_from(buf)?, src)), - Err(e) => Err(Box::new(e)), - } - } - - fn fresh_token(&mut self) -> Token { - let token = self.next_token as usize; - self.next_token += 1; - Token(token) - } - - pub fn register_frontend(&mut self, source: &mut dyn Source, interests: Interest) -> Result { - let token = self.fresh_token(); - self.poll.registry().register(source, token, interests)?; - Ok(token) +async fn receive_event(socket: &UdpSocket) -> std::result::Result<(Event, SocketAddr), Box> { + let mut buf = vec![0u8; 22]; + match socket.recv_from(&mut buf).await { + Ok((_amt, src)) => Ok((Event::try_from(buf)?, src)), + Err(e) => Err(Box::new(e)), } } + + +async fn send_event(sock: &UdpSocket, e: Event, addr: SocketAddr) -> Result { + log::trace!("{:20} ------>->->-> {addr}", e.to_string()); + let data: Vec = (&e).into(); + // We are currently abusing a blocking send to get the lowest possible latency. + // It may be better to set the socket to non-blocking and only send when ready. + sock.send_to(&data[..], addr).await +} + diff --git a/src/frontend.rs b/src/frontend.rs index 323c67c..e08f27a 100644 --- a/src/frontend.rs +++ b/src/frontend.rs @@ -1,21 +1,20 @@ -use std::collections::HashMap; -use std::io::{Read, Result, Write}; +use std::io::Result; use std::str; #[cfg(unix)] use std::{env, path::{Path, PathBuf}}; -use mio::Interest; -use mio::{Registry, Token, event::Source}; +use tokio::io::{AsyncReadExt, WriteHalf, AsyncWriteExt}; +use tokio::io::ReadHalf; #[cfg(unix)] -use mio::net::UnixStream; +use tokio::net::UnixStream; #[cfg(unix)] -use mio::net::UnixListener; +use tokio::net::UnixListener; #[cfg(windows)] -use mio::net::TcpStream; +use tokio::net::TcpStream; #[cfg(windows)] -use mio::net::TcpListener; +use tokio::net::TcpListener; use serde::{Serialize, Deserialize}; @@ -60,11 +59,14 @@ pub struct FrontendListener { listener: UnixListener, #[cfg(unix)] socket_path: PathBuf, - frontend_connections: HashMap, + #[cfg(unix)] + tx_streams: Vec>, + #[cfg(windows)] + tx_streams: Vec>, } impl FrontendListener { - pub fn new() -> std::result::Result> { + pub async fn new() -> std::result::Result> { #[cfg(unix)] let socket_path = Path::new(env::var("XDG_RUNTIME_DIR")?.as_str()).join("lan-mouse-socket.sock"); #[cfg(unix)] @@ -77,86 +79,53 @@ impl FrontendListener { let listener = UnixListener::bind(&socket_path)?; #[cfg(windows)] - let listener = TcpListener::bind("127.0.0.1:5252".parse().unwrap())?; // abuse tcp + let listener = TcpListener::bind("127.0.0.1:5252").await?; // abuse tcp let adapter = Self { listener, #[cfg(unix)] socket_path, - frontend_connections: HashMap::new(), + tx_streams: vec![], }; Ok(adapter) } #[cfg(unix)] - pub fn handle_incoming(&mut self, register_frontend: F) -> Result<()> - where F: Fn(&mut UnixStream, Interest) -> Result { - let (mut stream, _) = self.listener.accept()?; - let token = register_frontend(&mut stream, Interest::READABLE)?; - let con = FrontendConnection::new(stream); - self.frontend_connections.insert(token, con); - Ok(()) + pub async fn accept(&mut self) -> Result> { + + let stream = self.listener.accept().await?.0; + let (rx, tx) = tokio::io::split(stream); + self.tx_streams.push(tx); + Ok(rx) } #[cfg(windows)] - pub fn handle_incoming(&mut self, register_frontend: F) -> Result<()> - where F: Fn(&mut TcpStream, Interest) -> Result { - let (mut stream, _) = self.listener.accept()?; - let token = register_frontend(&mut stream, Interest::READABLE)?; - let con = FrontendConnection::new(stream); - self.frontend_connections.insert(token, con); - Ok(()) + pub async fn accept(&mut self) -> Result> { + let stream = self.listener.accept().await?.0; + let (rx, tx) = tokio::io::split(stream); + self.tx_streams.push(tx); + Ok(rx) } - pub fn read_event(&mut self, token: Token) -> Result> { - if let Some(con) = self.frontend_connections.get_mut(&token) { - con.handle_event() - } else { - panic!("unknown token"); - } - } - pub(crate) fn notify_all(&mut self, notify: FrontendNotify) -> Result<()> { + pub(crate) async fn notify_all(&mut self, notify: FrontendNotify) -> Result<()> { // encode event let json = serde_json::to_string(¬ify).unwrap(); let payload = json.as_bytes(); - let len = payload.len().to_ne_bytes(); + let len = payload.len().to_be_bytes(); log::debug!("json: {json}, len: {}", payload.len()); - for con in self.frontend_connections.values_mut() { + // TODO do simultaneously + for tx in self.tx_streams.iter_mut() { // write len + payload - con.stream.write(&len)?; - con.stream.write(payload)?; + tx.write(&len).await?; + tx.write(payload).await?; } Ok(()) } } -impl Source for FrontendListener { - fn register( - &mut self, - registry: &Registry, - token: Token, - interests: mio::Interest, - ) -> Result<()> { - self.listener.register(registry, token, interests) - } - - fn reregister( - &mut self, - registry: &Registry, - token: Token, - interests: mio::Interest, - ) -> Result<()> { - self.listener.reregister(registry, token, interests) - } - - fn deregister(&mut self, registry: &Registry) -> Result<()> { - self.listener.deregister(registry) - } -} - #[cfg(unix)] impl Drop for FrontendListener { fn drop(&mut self) { @@ -165,72 +134,20 @@ impl Drop for FrontendListener { } } -enum ReceiveState { - Len, Data, +#[cfg(unix)] +pub async fn read_event(stream: &mut ReadHalf) -> Result { + let len = stream.read_u64().await?; + assert!(len <= 256); + let mut buf = [0u8; 256]; + stream.read_exact(&mut buf[..len as usize]).await?; + Ok(serde_json::from_slice(&buf[..len as usize])?) } -pub struct FrontendConnection { - #[cfg(unix)] - stream: UnixStream, - #[cfg(windows)] - stream: TcpStream, - state: ReceiveState, - len: usize, - len_buf: [u8; std::mem::size_of::()], - recieve_buf: [u8; 256], // FIXME - pos: usize, +#[cfg(windows)] +pub async fn read_event(stream: &mut ReadHalf) -> Result { + let len = stream.read_u64().await?; + let mut buf = [0u8; 256]; + stream.read_exact(&mut buf[..len as usize]).await?; + Ok(serde_json::from_slice(&buf[..len as usize])?) } -impl FrontendConnection { - #[cfg(unix)] - pub fn new(stream: UnixStream) -> Self { - Self { - stream, - state: ReceiveState::Len, - len: 0, - len_buf: [0u8; std::mem::size_of::()], - recieve_buf: [0u8; 256], - pos: 0, - } - } - - #[cfg(windows)] - pub fn new(stream: TcpStream) -> Self { - Self { - stream, - state: ReceiveState::Len, - len: 0, - len_buf: [0u8; std::mem::size_of::()], - recieve_buf: [0u8; 256], - pos: 0, - } - } - - pub fn handle_event(&mut self) -> Result> { - match self.state { - ReceiveState::Len => { - // we receive sizeof(usize) Bytes - let n = self.stream.read(&mut self.len_buf)?; - self.pos += n; - if self.pos == self.len_buf.len() { - self.state = ReceiveState::Data; - self.len = usize::from_ne_bytes(self.len_buf); - self.pos = 0; - } - Ok(None) - }, - ReceiveState::Data => { - // read at most as many bytes as the length of the next event - let n = self.stream.read(&mut self.recieve_buf[..self.len])?; - self.pos += n; - if n == self.len { - self.state = ReceiveState::Len; - self.pos = 0; - Ok(Some(serde_json::from_slice(&self.recieve_buf[..self.len])?)) - } else { - Ok(None) - } - } - } - } -} diff --git a/src/frontend/cli.rs b/src/frontend/cli.rs index 383ad0a..b2f5ebe 100644 --- a/src/frontend/cli.rs +++ b/src/frontend/cli.rs @@ -42,7 +42,7 @@ pub fn start() -> Result<(JoinHandle<()>, JoinHandle<()>)> { for event in events.iter() { let json = serde_json::to_string(&event).unwrap(); let bytes = json.as_bytes(); - let len = bytes.len().to_ne_bytes(); + let len = bytes.len().to_be_bytes(); if let Err(e) = tx.write(&len) { log::error!("error sending message: {e}"); }; @@ -77,7 +77,7 @@ pub fn start() -> Result<(JoinHandle<()>, JoinHandle<()>)> { Err(e) if e.kind() == ErrorKind::UnexpectedEof => break, Err(e) => break log::error!("{e}"), }; - let len = usize::from_ne_bytes(len); + let len = usize::from_be_bytes(len); // read payload let mut buf: Vec = vec![0u8; len]; diff --git a/src/frontend/gtk.rs b/src/frontend/gtk.rs index 9154bc8..8b498a4 100644 --- a/src/frontend/gtk.rs +++ b/src/frontend/gtk.rs @@ -15,6 +15,7 @@ use self::client_object::ClientObject; use super::FrontendNotify; pub fn start() -> Result> { + log::debug!("starting gtk frontend"); thread::Builder::new() .name("gtk-thread".into()) .spawn(gtk_main) @@ -58,6 +59,7 @@ fn build_ui(app: &Application) { process::exit(1); } }; + log::debug!("connecting to lan-mouse-socket ... "); let socket_path = Path::new(xdg_runtime_dir.as_str()) .join("lan-mouse-socket.sock"); let Ok(mut rx) = UnixStream::connect(&socket_path) else { @@ -71,6 +73,7 @@ fn build_ui(app: &Application) { process::exit(1); } }; + log::debug!("connected to lan-mouse-socket"); let (sender, receiver) = MainContext::channel::(Priority::default()); @@ -83,7 +86,7 @@ fn build_ui(app: &Application) { Err(e) if e.kind() == ErrorKind::UnexpectedEof => break Ok(()), Err(e) => break Err(e), }; - let len = usize::from_ne_bytes(len); + let len = usize::from_be_bytes(len); // read payload let mut buf = vec![0u8; len]; diff --git a/src/frontend/gtk/window.rs b/src/frontend/gtk/window.rs index fb80acc..4d8b7ee 100644 --- a/src/frontend/gtk/window.rs +++ b/src/frontend/gtk/window.rs @@ -141,7 +141,7 @@ impl Window { let mut stream = self.imp().stream.borrow_mut(); let stream = stream.as_mut().unwrap(); let bytes = json.as_bytes(); - let len = bytes.len().to_ne_bytes(); + let len = bytes.len().to_be_bytes(); if let Err(e) = stream.write(&len) { log::error!("error sending message: {e}"); }; diff --git a/src/main.rs b/src/main.rs index 23b317e..54e0b58 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,7 @@ use lan_mouse::{ #[cfg(all(unix, feature = "gtk"))] use lan_mouse::frontend::gtk; +use tokio::task::LocalSet; pub fn main() { @@ -30,29 +31,39 @@ pub fn run() -> Result<(), Box> { let producer = producer::create()?; let consumer = consumer::create()?; - // create frontend communication adapter - let frontend_adapter = FrontendListener::new()?; + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build()?; - // start sending and receiving events - let mut event_server = Server::new(config.port, producer, consumer, frontend_adapter)?; + // run async event loop + runtime.block_on(LocalSet::new().run_until(async { + // create frontend communication adapter + let frontend_adapter = FrontendListener::new().await?; - // any threads need to be started after event_server sets up signal handling - match config.frontend { - #[cfg(all(unix, feature = "gtk"))] - Gtk => { gtk::start()?; } - #[cfg(any(not(feature = "gtk"), not(unix)))] - Gtk => panic!("gtk frontend requested but feature not enabled!"), - Cli => { cli::start()?; } - }; + // start frontend + match config.frontend { + #[cfg(all(unix, feature = "gtk"))] + Gtk => { gtk::start()?; } + #[cfg(any(not(feature = "gtk"), not(unix)))] + Gtk => panic!("gtk frontend requested but feature not enabled!"), + Cli => { cli::start()?; } + }; - // add clients from config - config.get_clients().into_iter().for_each(|(c, h, port, p)| { - event_server.add_client(h, c, port, p); - }); + // start sending and receiving events + let mut event_server = Server::new(config.port, frontend_adapter, consumer, producer).await?; - log::info!("Press Ctrl+Alt+Shift+Super to release the mouse"); - // run event loop - event_server.run()?; + // add clients from config + for (c,h,port,p) in config.get_clients().into_iter() { + event_server.add_client(h, c, port, p).await; + } + + log::info!("Press Ctrl+Alt+Shift+Super to release the mouse"); + // run event loop + event_server.run().await?; + Result::<_, Box>::Ok(()) + }))?; + log::debug!("exiting main"); Ok(()) } diff --git a/src/producer.rs b/src/producer.rs index 434cc33..ed6fcd8 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -1,5 +1,11 @@ -use mio::event::Source; -use std::{error::Error, vec::Drain}; +use std::error::Error; + +#[cfg(unix)] +use std::{io, os::fd::RawFd, vec::Drain}; + +#[cfg(unix)] +use tokio::io::unix::AsyncFd; + use crate::{client::{ClientHandle, ClientEvent}, event::Event}; use crate::backend::producer; @@ -49,15 +55,24 @@ pub fn create() -> Result, Box> { } } -pub trait EventProducer: Source { +pub trait EventProducer { + /// notify event producer of configuration changes fn notify(&mut self, event: ClientEvent); + /// release mouse + fn release(&mut self); + + /// unix only + #[cfg(unix)] + fn get_async_fd(&self) -> io::Result>; + /// read an event /// this function must be invoked to retrieve an Event after /// the eventfd indicates a pending Event + #[cfg(unix)] fn read_events(&mut self) -> Drain<(ClientHandle, Event)>; - /// release mouse - fn release(&mut self); + #[cfg(not(unix))] + fn get_wait_channel(&mut self) -> Option>; }