diff --git a/src/client.rs b/src/client.rs index 2f1f8ec..10e6907 100644 --- a/src/client.rs +++ b/src/client.rs @@ -51,6 +51,8 @@ impl Display for Position { pub struct Client { /// hostname of this client pub hostname: Option, + /// fix ips, determined by the user + pub fix_ips: Vec, /// unique handle to refer to the client. /// This way any event consumer / producer backend does not /// need to know anything about a client other than its handle. @@ -105,7 +107,7 @@ impl ClientManager { pub fn add_client( &mut self, hostname: Option, - addrs: HashSet, + ips: HashSet, port: u16, pos: Position, ) -> ClientHandle { @@ -115,12 +117,16 @@ impl ClientManager { // we dont know, which IP is initially active let active_addr = None; + // store fix ip addresses + let fix_ips = ips.iter().cloned().collect(); + // map ip addresses to socket addresses - let addrs = HashSet::from_iter(addrs.into_iter().map(|ip| SocketAddr::new(ip, port))); + let addrs = HashSet::from_iter(ips.into_iter().map(|ip| SocketAddr::new(ip, port))); // store the client let client = Client { hostname, + fix_ips, handle, active_addr, addrs, diff --git a/src/dns.rs b/src/dns.rs index 215f618..bcedff4 100644 --- a/src/dns.rs +++ b/src/dns.rs @@ -3,7 +3,7 @@ use std::{error::Error, net::IpAddr}; use trust_dns_resolver::TokioAsyncResolver; -pub(crate) struct DnsResolver { +pub struct DnsResolver { resolver: TokioAsyncResolver, } impl DnsResolver { diff --git a/src/main.rs b/src/main.rs index f03b119..fef1e63 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,15 +2,9 @@ use anyhow::Result; use std::process::{self, Child, Command}; use env_logger::Env; -use lan_mouse::{ - config::Config, - consumer, - frontend::{self, FrontendListener}, - producer, - server::Server, -}; +use lan_mouse::{config::Config, frontend, server::Server}; -use tokio::{join, task::LocalSet}; +use tokio::task::LocalSet; pub fn main() { // init logging @@ -58,26 +52,10 @@ fn run_service(config: &Config) -> Result<()> { // run async event loop runtime.block_on(LocalSet::new().run_until(async { - // create frontend communication adapter - let frontend_adapter = match FrontendListener::new().await { - Some(Err(e)) => return Err(e), - Some(Ok(f)) => f, - None => { - // none means some other instance is already running - log::info!("service already running, exiting"); - return anyhow::Ok(()); - } - }; - - // create event producer and consumer - let (producer, consumer) = join!(producer::create(), consumer::create()); - - // create server - let mut event_server = Server::new(config, frontend_adapter, consumer, producer).await?; + // run main loop log::info!("Press Ctrl+Alt+Shift+Super to release the mouse"); + Server::run(config).await?; - // run event loop - event_server.run().await?; log::debug!("service exiting"); anyhow::Ok(()) }))?; diff --git a/src/server.rs b/src/server.rs index 7541d2c..411455d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,18 +1,16 @@ +use anyhow::anyhow; use futures::stream::StreamExt; use log; use std::{ + cell::{Cell, RefCell}, collections::HashSet, error::Error, io::Result, net::IpAddr, + rc::Rc, time::{Duration, Instant}, }; -use tokio::{ - io::ReadHalf, - net::UdpSocket, - signal, - sync::mpsc::{Receiver, Sender}, -}; +use tokio::{io::ReadHalf, net::UdpSocket, signal, sync::mpsc::Sender, task}; #[cfg(unix)] use tokio::net::UnixStream; @@ -22,321 +20,537 @@ use tokio::net::TcpStream; use std::{io::ErrorKind, net::SocketAddr}; -use crate::event::{Event, KeyboardEvent}; use crate::{ client::{ClientEvent, ClientHandle, ClientManager, Position}, config::Config, consumer::EventConsumer, - dns::{self, DnsResolver}, + dns, frontend::{self, FrontendEvent, FrontendListener, FrontendNotify}, producer::EventProducer, }; +use crate::{ + consumer, + event::{Event, KeyboardEvent}, + producer, +}; -#[derive(Debug, Eq, PartialEq)] +#[derive(Clone, Copy, Debug, Eq, PartialEq)] enum State { Sending, Receiving, AwaitingLeave, } -pub struct Server { - resolver: DnsResolver, - client_manager: ClientManager, - state: State, - frontend: FrontendListener, - consumer: Box, - producer: Box, - socket: UdpSocket, - frontend_rx: Receiver, - frontend_tx: Sender, - last_ignored: Option, +pub enum ProducerEvent { + Release, + ClientEvent(ClientEvent), } +pub enum ConsumerEvent { + ClientEvent(ClientEvent), +} + +#[derive(Clone)] +struct ClientUpdate { + client: ClientHandle, + hostname: Option, + port: u16, + pos: Position, +} + +pub struct Server {} + impl Server { - pub async fn new( - config: &Config, - frontend: FrontendListener, - consumer: Box, - producer: Box, - ) -> anyhow::Result { + pub async fn run(config: &Config) -> anyhow::Result<()> { + // create frontend communication adapter + let mut frontend = match FrontendListener::new().await { + Some(Err(e)) => return Err(e), + Some(Ok(f)) => f, + None => { + // none means some other instance is already running + log::info!("service already running, exiting"); + return anyhow::Ok(()); + } + }; + let (mut consumer, mut producer) = tokio::join!(consumer::create(), producer::create()); + // create dns resolver let resolver = dns::DnsResolver::new().await?; // bind the udp socket let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), config.port); - let socket = UdpSocket::bind(listen_addr).await?; - let (frontend_tx, frontend_rx) = tokio::sync::mpsc::channel(1); + let mut socket = UdpSocket::bind(listen_addr).await?; + let (frontend_tx, mut frontend_rx) = tokio::sync::mpsc::channel(1); // create client manager - let client_manager = ClientManager::new(); - let mut server = Server { - frontend, - consumer, - producer, - resolver, - socket, - client_manager, - state: State::Receiving, - frontend_rx, - frontend_tx, - last_ignored: None, - }; + let client_manager_rc = Rc::new(RefCell::new(ClientManager::new())); + + let state_rc = Rc::new(Cell::new(State::Receiving)); + + // channel to notify producer + let (producer_notify_tx, mut producer_notify_rx) = tokio::sync::mpsc::channel(32); + + // channel to notify consumer + let (consumer_notify_tx, mut consumer_notify_rx) = tokio::sync::mpsc::channel(32); + + // channel to request dns resolver + let (resolve_tx, mut resolve_rx) = tokio::sync::mpsc::channel(32); + + // channel to send events to frontends + let (frontend_notify_tx, mut frontend_notify_rx) = tokio::sync::mpsc::channel(32); + + // channels for udp send / receive + let (receiver_tx, mut receiver_rx) = tokio::sync::mpsc::channel(32); + let (sender_tx, mut sender_rx) = tokio::sync::mpsc::channel(32); + let (port_tx, mut port_rx) = tokio::sync::mpsc::channel(32); // add clients from config for (c, h, port, p) in config.get_clients().into_iter() { - server.add_client(h, c, port, p).await; + Self::add_client( + &resolve_tx, + &client_manager_rc, + &mut frontend, + h, + c, + port, + p, + ) + .await; } - Ok(server) - } - - pub async fn run(&mut self) -> anyhow::Result<()> { - loop { - log::trace!("polling..."); - tokio::select! { biased; - // safety: cancellation safe - res = self.producer.next() => { - match res { - Some(Ok((client, event))) => { - self.handle_producer_event(client,event).await; - }, - Some(Err(e)) => return Err(e.into()), - _ => break, + // event producer + let client_manager = client_manager_rc.clone(); + let state = state_rc.clone(); + let sender_ch = sender_tx.clone(); + let producer_task = tokio::task::spawn_local(async move { + loop { + tokio::select! { + e = producer.next() => { + let (client, event) = match e { + Some(e) => e?, + None => return Err::<(), anyhow::Error>(anyhow!("event producer closed")), + }; + Self::handle_producer_event(&mut producer, &client_manager, &state, &sender_ch, client, event).await; } - } - // safety: cancellation safe - udp_event = receive_event(&self.socket) => { - match udp_event { - Ok(e) => self.handle_udp_rx(e).await, - Err(e) => log::error!("error reading event: {e}"), - } - } - // safety: cancellation safe - stream = self.frontend.accept() => { - match stream { - Ok(s) => self.handle_frontend_stream(s).await, - Err(e) => log::error!("error connecting to frontend: {e}"), - } - } - // safety: cancellation safe - frontend_event = self.frontend_rx.recv() => { - if let Some(event) = frontend_event { - if self.handle_frontend_event(event).await { - break; + e = producer_notify_rx.recv() => { + match e { + Some(e) => match e { + ProducerEvent::Release => producer.release(), + ProducerEvent::ClientEvent(e) => producer.notify(e), + }, + None => break Ok(()), } } } - // safety: cancellation safe - e = self.consumer.dispatch() => { - e?; - } - // safety: cancellation safe - _ = signal::ctrl_c() => { - log::info!("terminating gracefully ..."); - break; + } + }); + + // event consumer + let client_manager = client_manager_rc.clone(); + let state = state_rc.clone(); + let producer_notify = producer_notify_tx.clone(); + let receiver_task = tokio::task::spawn_local(async move { + let mut last_ignored = None; + + loop { + tokio::select! { + udp_event = receiver_rx.recv() => { + let udp_event = match udp_event { + Some(Ok(e)) => e, + Some(Err(e)) => return Err::<(), anyhow::Error>(anyhow!("{}", e)), + None => return Err::<(), anyhow::Error>(anyhow!("receiver closed")), + }; + Self::handle_udp_rx(&client_manager, &producer_notify, &mut consumer, &sender_tx, &state, &mut last_ignored, udp_event).await; + } + consumer_event = consumer_notify_rx.recv() => { + match consumer_event { + Some(e) => match e { + ConsumerEvent::ClientEvent(e) => consumer.notify(e).await, + }, + None => break, + } + } + _ = consumer.dispatch() => { } } } - } + // destroy consumer + consumer.destroy().await; + Ok(()) + }); - // destroy consumer - self.consumer.destroy().await; + // frontend listener + let client_manager = client_manager_rc.clone(); + let frontend_task = tokio::task::spawn_local(async move { + loop { + tokio::select! { + stream = frontend.accept() => { + let stream = match stream { + Ok(s) => s, + Err(e) => { + log::warn!("error accepting frontend connection: {e}"); + continue; + } + }; + Self::handle_frontend_stream(&client_manager, &mut frontend, &frontend_tx, stream).await; + } + event = frontend_rx.recv() => { + let frontend_event = match event { + Some(e) => e, + None => return Err::<(), anyhow::Error>(anyhow!("frontend channel closed")), + }; + Self::handle_frontend_event(&producer_notify_tx, &consumer_notify_tx, &client_manager, &resolve_tx, &mut frontend, &port_tx, frontend_event).await; + } + notify = frontend_notify_rx.recv() => { + let notify = match notify { + Some(n) => n, + None => return Err::<(), anyhow::Error>(anyhow!("frontend notify closed")), + }; + let _ = frontend.notify_all(notify).await; + } + } + } + }); + + // dns resolver + let client_manager = client_manager_rc.clone(); + let resolver_task = tokio::task::spawn_local(async move { + loop { + let (host, client): (String, ClientHandle) = match resolve_rx.recv().await { + Some(r) => r, + None => break, + }; + let ips = match resolver.resolve(&host).await { + Ok(ips) => ips, + Err(e) => { + log::warn!("could not resolve host '{host}': {e}"); + continue; + } + }; + if let Some(state) = client_manager.borrow_mut().get_mut(client) { + let port = state.client.port; + let mut addrs = HashSet::from_iter( + state + .client + .fix_ips + .iter() + .map(|a| SocketAddr::new(*a, port)), + ); + for ip in ips { + let sock_addr = SocketAddr::new(ip, port); + addrs.insert(sock_addr); + } + state.client.addrs = addrs; + } + } + }); + + // udp task + let udp_task = tokio::task::spawn_local(async move { + loop { + tokio::select! { + event = receive_event(&socket) => { + let _ = receiver_tx.send(event).await; + } + event = sender_rx.recv() => { + let (event, addr) = match event { + Some(e) => e, + None => break, + }; + if let Err(e) = send_event(&socket, event, addr) { + log::warn!("udp send failed: {e}"); + }; + } + port = port_rx.recv() => { + let Some(port) = port else { + break; + }; + let current_port = socket.local_addr().unwrap().port(); + if current_port == port { + let _ = frontend_notify_tx.send(FrontendNotify::NotifyPortChange(port, None)).await; + return; + }; + + let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), port); + match UdpSocket::bind(listen_addr).await { + Ok(new_socket) => { + socket = new_socket; + let _ = frontend_notify_tx.send(FrontendNotify::NotifyPortChange(port, None)).await; + } + Err(e) => { + log::warn!("could not change port: {e}"); + let port = socket.local_addr().unwrap().port(); + let _ = frontend_notify_tx.send(FrontendNotify::NotifyPortChange( + port, + Some(format!("could not change port: {e}")), + )).await; + } + } + + } + } + } + }); + + let reaper = task::spawn_local(async move { + tokio::select! { + _ = signal::ctrl_c() => { + producer_task.abort(); + receiver_task.abort(); + frontend_task.abort(); + resolver_task.abort(); + udp_task.abort(); + }, + } + }); + + reaper.await?; Ok(()) } pub async fn add_client( - &mut self, + resolver_tx: &Sender<(String, ClientHandle)>, + client_manager: &Rc>, + frontend: &mut FrontendListener, hostname: Option, - mut addr: HashSet, + addr: HashSet, port: u16, pos: Position, ) -> ClientHandle { - let ips = if let Some(hostname) = hostname.as_ref() { - match self.resolver.resolve(hostname.as_str()).await { - Ok(ips) => HashSet::from_iter(ips.iter().cloned()), - Err(e) => { - log::warn!("could not resolve host: {e}"); - HashSet::new() - } - } - } else { - HashSet::new() - }; - addr.extend(ips.iter()); log::info!( "adding client [{}]{} @ {:?}", pos, hostname.as_deref().unwrap_or(""), - &ips + &addr ); - let client = self - .client_manager + let client = client_manager + .borrow_mut() .add_client(hostname.clone(), addr, port, pos); + log::debug!("add_client {client}"); + if let Some(hostname) = hostname.clone() { + let _ = resolver_tx.send((hostname, client)).await; + }; let notify = FrontendNotify::NotifyClientCreate(client, hostname, port, pos); - if let Err(e) = self.frontend.notify_all(notify).await { + if let Err(e) = frontend.notify_all(notify).await { log::error!("error notifying frontend: {e}"); }; client } - pub async fn activate_client(&mut self, client: ClientHandle, active: bool) { - if let Some(state) = self.client_manager.get_mut(client) { - state.active = active; - if state.active { - self.producer - .notify(ClientEvent::Create(client, state.client.pos)); - self.consumer - .notify(ClientEvent::Create(client, state.client.pos)) - .await; - } else { - self.producer.notify(ClientEvent::Destroy(client)); - self.consumer.notify(ClientEvent::Destroy(client)).await; + pub async fn activate_client( + producer_notify_tx: &Sender, + consumer_notify_tx: &Sender, + client_manager: &Rc>, + client: ClientHandle, + active: bool, + ) { + let (client, pos) = match client_manager.borrow_mut().get_mut(client) { + Some(state) => { + state.active = active; + (state.client.handle, state.client.pos) } + None => return, + }; + if active { + let _ = producer_notify_tx + .send(ProducerEvent::ClientEvent(ClientEvent::Create(client, pos))) + .await; + let _ = consumer_notify_tx + .send(ConsumerEvent::ClientEvent(ClientEvent::Create(client, pos))) + .await; + } else { + let _ = producer_notify_tx + .send(ProducerEvent::ClientEvent(ClientEvent::Destroy(client))) + .await; + let _ = consumer_notify_tx + .send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(client))) + .await; } } - pub async fn remove_client(&mut self, client: ClientHandle) -> Option { - self.producer.notify(ClientEvent::Destroy(client)); - self.consumer.notify(ClientEvent::Destroy(client)).await; - if let Some(client) = self - .client_manager + pub async fn remove_client( + client_manager: &Rc>, + producer_notify_tx: &Sender, + consumer_notify_tx: &Sender, + frontend: &mut FrontendListener, + client: ClientHandle, + ) -> Option { + let _ = producer_notify_tx + .send(ProducerEvent::ClientEvent(ClientEvent::Destroy(client))) + .await; + let _ = consumer_notify_tx + .send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(client))) + .await; + + let Some(client) = client_manager + .borrow_mut() .remove_client(client) .map(|s| s.client.handle) - { - let notify = FrontendNotify::NotifyClientDelete(client); - log::debug!("{notify:?}"); - if let Err(e) = self.frontend.notify_all(notify).await { - log::error!("error notifying frontend: {e}"); - } - Some(client) - } else { - None - } - } - - pub async fn update_client( - &mut self, - client: ClientHandle, - hostname: Option, - port: u16, - pos: Position, - ) { - // retrieve state - let Some(state) = self.client_manager.get_mut(client) else { - return; + else { + return None; }; - // update pos - state.client.pos = pos; - if state.active { - self.producer.notify(ClientEvent::Destroy(client)); - self.consumer.notify(ClientEvent::Destroy(client)).await; - self.producer.notify(ClientEvent::Create(client, pos)); - self.consumer.notify(ClientEvent::Create(client, pos)).await; + let notify = FrontendNotify::NotifyClientDelete(client); + log::debug!("{notify:?}"); + if let Err(e) = frontend.notify_all(notify).await { + log::error!("error notifying frontend: {e}"); } - - // update port - if state.client.port != port { - state.client.port = port; - state.client.addrs = state - .client - .addrs - .iter() - .cloned() - .map(|mut a| { - a.set_port(port); - a - }) - .collect(); - state - .client - .active_addr - .map(|a| SocketAddr::new(a.ip(), port)); - } - - // update hostname - if state.client.hostname != hostname { - state.client.addrs = HashSet::new(); - state.client.active_addr = None; - state.client.hostname = hostname; - if let Some(hostname) = state.client.hostname.as_ref() { - match self.resolver.resolve(hostname.as_str()).await { - Ok(ips) => { - let addrs = ips.iter().map(|i| SocketAddr::new(*i, port)); - state.client.addrs = HashSet::from_iter(addrs); - } - Err(e) => { - log::warn!("could not resolve host: {e}"); - } - } - } - } - log::debug!("client updated: {:?}", state); + Some(client) } - async fn handle_udp_rx(&mut self, event: (Event, SocketAddr)) { + async fn update_client( + producer_notify_tx: &Sender, + consumer_notify_tx: &Sender, + resolve_tx: &Sender<(String, ClientHandle)>, + client_manager: &Rc>, + client_update: ClientUpdate, + ) { + let (hostname, handle, active) = { + // retrieve state + let mut client_manager = client_manager.borrow_mut(); + let Some(state) = client_manager.get_mut(client_update.client) else { + return; + }; + + // update pos + state.client.pos = client_update.pos; + + // update port + if state.client.port != client_update.port { + state.client.port = client_update.port; + state.client.addrs = state + .client + .addrs + .iter() + .cloned() + .map(|mut a| { + a.set_port(client_update.port); + a + }) + .collect(); + state + .client + .active_addr + .map(|a| SocketAddr::new(a.ip(), client_update.port)); + } + + // update hostname + if state.client.hostname != client_update.hostname { + state.client.addrs = HashSet::new(); + state.client.active_addr = None; + state.client.hostname = client_update.hostname; + } + + log::debug!("client updated: {:?}", state); + ( + state.client.hostname.clone(), + state.client.handle, + state.active, + ) + }; + + // resolve dns + if let Some(hostname) = hostname { + let _ = resolve_tx.send((hostname, handle)).await; + } + + // update state in event consumer & producer + if active { + let _ = producer_notify_tx + .send(ProducerEvent::ClientEvent(ClientEvent::Destroy( + client_update.client, + ))) + .await; + let _ = consumer_notify_tx + .send(ConsumerEvent::ClientEvent(ClientEvent::Destroy( + client_update.client, + ))) + .await; + let _ = producer_notify_tx + .send(ProducerEvent::ClientEvent(ClientEvent::Create( + client_update.client, + client_update.pos, + ))) + .await; + let _ = consumer_notify_tx + .send(ConsumerEvent::ClientEvent(ClientEvent::Create( + client_update.client, + client_update.pos, + ))) + .await; + } + } + + async fn handle_udp_rx( + client_manager: &Rc>, + producer_notify_tx: &Sender, + consumer: &mut Box, + sender_tx: &Sender<(Event, SocketAddr)>, + state: &Rc>, + last_ignored: &mut Option, + event: (Event, SocketAddr), + ) { let (event, addr) = event; // get handle for addr - let handle = match self.client_manager.get_client(addr) { + let handle = match client_manager.borrow().get_client(addr) { Some(a) => a, None => { - if self.last_ignored.is_none() - || self.last_ignored.is_some() && self.last_ignored.unwrap() != addr + if last_ignored.is_none() || last_ignored.is_some() && last_ignored.unwrap() != addr { log::warn!("ignoring events from client {addr}"); - self.last_ignored = Some(addr); + last_ignored.replace(addr); } return; } }; // next event can be logged as ignored again - self.last_ignored = None; + last_ignored.take(); log::trace!("{:20} <-<-<-<------ {addr} ({handle})", event.to_string()); - let state = match self.client_manager.get_mut(handle) { - Some(s) => s, - None => { - log::error!("unknown handle"); - return; - } - }; + { + let mut client_manager = client_manager.borrow_mut(); + let client_state = match client_manager.get_mut(handle) { + Some(s) => s, + None => { + log::error!("unknown handle"); + return; + } + }; - // reset ttl for client and - state.last_seen = Some(Instant::now()); - // set addr as new default for this client - state.client.active_addr = Some(addr); + // reset ttl for client and + client_state.last_seen = Some(Instant::now()); + // set addr as new default for this client + client_state.client.active_addr = Some(addr); + } match (event, addr) { (Event::Pong(), _) => { /* ignore pong events */ } (Event::Ping(), addr) => { - if let Err(e) = send_event(&self.socket, Event::Pong(), addr) { - log::error!("udp send: {}", e); - } + let _ = sender_tx.send((Event::Pong(), addr)).await; } (event, addr) => { // tell clients that we are ready to receive events if let Event::Enter() = event { - if let Err(e) = send_event(&self.socket, Event::Leave(), addr) { - log::error!("udp send: {}", e); - } + let _ = sender_tx.send((Event::Leave(), addr)).await; } - match self.state { + match state.get() { State::Sending => { if let Event::Leave() = event { // ignore additional leave events that may // have been sent for redundancy } else { // upon receiving any event, we go back to receiving mode - self.producer.release(); - self.state = State::Receiving; + let _ = producer_notify_tx.send(ProducerEvent::Release).await; + state.replace(State::Receiving); + log::trace!("STATE ===> Receiving"); } } State::Receiving => { // consume event - self.consumer.consume(event, handle).await; + consumer.consume(event, handle).await; log::trace!("{event:?} => consumer"); } State::AwaitingLeave => { @@ -345,34 +559,59 @@ impl Server { // be on the way until a leave event occurs // telling us the client registered the enter if let Event::Leave() = event { - self.state = State::Sending; + state.replace(State::Sending); + log::trace!("STATE ===> Sending"); } // entering a client that is waiting for a leave // event should still be possible if let Event::Enter() = event { - self.state = State::Receiving; - self.producer.release(); + state.replace(State::Receiving); + log::trace!("STATE ===> Receiving"); + let _ = producer_notify_tx.send(ProducerEvent::Release).await; } } } } } - // let the server know we are still alive once every second - if state.last_replied.is_none() - || state.last_replied.is_some() - && state.last_replied.unwrap().elapsed() > Duration::from_secs(1) - { - state.last_replied = Some(Instant::now()); - if let Err(e) = send_event(&self.socket, Event::Pong(), addr) { - log::error!("udp send: {}", e); + + let pong = { + let mut client_manager = client_manager.borrow_mut(); + let client_state = match client_manager.get_mut(handle) { + Some(s) => s, + None => { + log::error!("unknown handle"); + return; + } + }; + + // let the server know we are still alive once every second + if client_state.last_replied.is_none() + || client_state.last_replied.is_some() + && client_state.last_replied.unwrap().elapsed() > Duration::from_secs(1) + { + client_state.last_replied = Some(Instant::now()); + true + } else { + false } + }; + + if pong { + let _ = sender_tx.send((Event::Pong(), addr)).await; } } const RELEASE_MODIFIERDS: u32 = 77; // ctrl+shift+super+alt - async fn handle_producer_event(&mut self, c: ClientHandle, mut e: Event) { + async fn handle_producer_event( + producer: &mut Box, + client_manager: &Rc>, + state: &Rc>, + sender_tx: &Sender<(Event, SocketAddr)>, + c: ClientHandle, + mut e: Event, + ) { log::trace!("producer: ({c}) {e:?}"); if let Event::Keyboard(crate::event::KeyboardEvent::Modifiers { @@ -383,8 +622,9 @@ impl Server { }) = e { if mods_depressed == Self::RELEASE_MODIFIERDS { - self.producer.release(); - self.state = State::Receiving; + producer.release(); + state.replace(State::Receiving); + log::trace!("STATE ===> Receiving"); // send an event to release all the modifiers e = Event::Keyboard(KeyboardEvent::Modifiers { mods_depressed: 0, @@ -395,91 +635,94 @@ impl Server { } } - // get client state for handle - let state = match self.client_manager.get_mut(c) { - Some(state) => state, - None => { - // should not happen - log::warn!("unknown client!"); - self.producer.release(); - self.state = State::Receiving; - return; + let (addr, enter, ping_addrs) = { + let mut enter = false; + let mut ping_addrs: Option> = None; + + // get client state for handle + let mut client_manager = client_manager.borrow_mut(); + let client_state = match client_manager.get_mut(c) { + Some(state) => state, + None => { + // should not happen + log::warn!("unknown client!"); + producer.release(); + state.replace(State::Receiving); + log::trace!("STATE ===> Receiving"); + return; + } + }; + + // if we just entered the client we want to send additional enter events until + // we get a leave event + if let State::Receiving | State::AwaitingLeave = state.get() { + state.replace(State::AwaitingLeave); + log::trace!("STATE ===> AwaitingLeave"); + enter = true; } + + let last_seen = match client_state.last_seen { + None => Duration::MAX, + Some(i) => i.elapsed(), + }; + + let last_pinged = match client_state.last_ping { + None => Duration::MAX, + Some(i) => i.elapsed(), + }; + + // not seen for one second but pinged at least 500ms ago + if last_seen > Duration::from_secs(1) + && last_pinged > Duration::from_millis(500) + && last_pinged < Duration::from_secs(1) + { + // client unresponsive -> set state to receiving + if state.get() != State::Receiving { + log::info!("client not responding - releasing pointer"); + producer.release(); + state.replace(State::Receiving); + log::trace!("STATE ===> Receiving"); + } + } + + // last ping > 500ms ago -> ping all interfaces + if last_pinged > Duration::from_millis(500) { + ping_addrs = Some(client_state.client.addrs.iter().cloned().collect()); + client_state.last_ping = Some(Instant::now()); + } + + (client_state.client.active_addr, enter, ping_addrs) }; - - // if we just entered the client we want to send additional enter events until - // we get a leave event - if let State::Receiving | State::AwaitingLeave = self.state { - self.state = State::AwaitingLeave; - if let Some(addr) = state.client.active_addr { - if let Err(e) = send_event(&self.socket, Event::Enter(), addr) { - log::error!("udp send: {}", e); - } + if let Some(addr) = addr { + if enter { + let _ = sender_tx.send((Event::Enter(), addr)).await; } + let _ = sender_tx.send((e, addr)).await; } - - // otherwise we should have an address to - // transmit events to the corrensponding client - if let Some(addr) = state.client.active_addr { - if let Err(e) = send_event(&self.socket, e, addr) { - log::error!("udp send: {}", e); - } - } - - // if client last responded > 2 seconds ago - // and we have not sent a ping since 500 milliseconds, send a ping - - // check if client was seen in the past 2 seconds - if state.last_seen.is_some() && state.last_seen.unwrap().elapsed() < Duration::from_secs(2) - { - return; - } - - // check if last ping is < 500ms ago - if state.last_ping.is_some() - && state.last_ping.unwrap().elapsed() < Duration::from_millis(500) - { - return; - } - - // last seen >= 2s, last ping >= 500ms - // -> client did not respond or a ping has not been sent for a while - // (pings are only sent when trying to access a device!) - - // check if last ping was < 1s ago -> 500ms < last_ping < 1s - // -> client did not respond in at least 500ms - if state.last_ping.is_some() && state.last_ping.unwrap().elapsed() < Duration::from_secs(1) - { - // client unresponsive -> set state to receiving - if self.state != State::Receiving { - log::info!("client not responding - releasing pointer"); - self.producer.release(); - self.state = State::Receiving; - } - } - - // last ping > 500ms ago -> ping all interfaces - state.last_ping = Some(Instant::now()); - for addr in state.client.addrs.iter() { - log::debug!("pinging {addr}"); - if let Err(e) = send_event(&self.socket, Event::Ping(), *addr) { - if e.kind() != ErrorKind::WouldBlock { - log::error!("udp send: {}", e); - } + if let Some(addrs) = ping_addrs { + for addr in addrs { + let _ = sender_tx.send((Event::Ping(), addr)).await; } } } #[cfg(unix)] - async fn handle_frontend_stream(&mut self, mut stream: ReadHalf) { + async fn handle_frontend_stream( + client_manager: &Rc>, + frontend: &mut FrontendListener, + frontend_tx: &Sender, + mut stream: ReadHalf, + ) { use std::io; - let tx = self.frontend_tx.clone(); + let tx = frontend_tx.clone(); tokio::task::spawn_local(async move { loop { let event = frontend::read_event(&mut stream).await; match event { - Ok(event) => tx.send(event).await.unwrap(), + Ok(event) => { + let _ = tx.send(event).await; + } Err(e) => { if let Some(e) = e.downcast_ref::() { if e.kind() == ErrorKind::UnexpectedEof { @@ -487,100 +730,127 @@ impl Server { } } log::error!("error reading frontend event: {e}"); + return; } } } }); - self.enumerate().await; + Self::enumerate(client_manager, frontend).await; } #[cfg(windows)] - async fn handle_frontend_stream(&mut self, mut stream: ReadHalf) { - let tx = self.frontend_tx.clone(); + async fn handle_frontend_stream( + client_manager: &Rc>, + frontend: &mut FrontendListener, + frontend_tx: &Sender, + mut stream: ReadHalf, + ) { + use std::io; + + let tx = frontend_tx.clone(); tokio::task::spawn_local(async move { loop { let event = frontend::read_event(&mut stream).await; match event { - Ok(event) => tx.send(event).await.unwrap(), - Err(e) => log::error!("error reading frontend event: {e}"), + Ok(event) => { + let _ = tx.send(event).await; + } + Err(e) => { + if let Some(e) = e.downcast_ref::() { + if e.kind() == ErrorKind::UnexpectedEof { + return; + } + } + log::error!("error reading frontend event: {e}"); + return; + } } } }); - self.enumerate().await; + Self::enumerate(client_manager, frontend).await; } - async fn handle_frontend_event(&mut self, event: FrontendEvent) -> bool { + async fn handle_frontend_event( + producer_notify_tx: &Sender, + consumer_notify_tx: &Sender, + client_manager: &Rc>, + resolve_tx: &Sender<(String, ClientHandle)>, + frontend: &mut FrontendListener, + port_tx: &Sender, + event: FrontendEvent, + ) -> bool { log::debug!("frontend: {event:?}"); match event { FrontendEvent::AddClient(hostname, port, pos) => { - self.add_client(hostname, HashSet::new(), port, pos).await; + Self::add_client( + resolve_tx, + client_manager, + frontend, + hostname, + HashSet::new(), + port, + pos, + ) + .await; } FrontendEvent::ActivateClient(client, active) => { - self.activate_client(client, active).await + Self::activate_client( + producer_notify_tx, + consumer_notify_tx, + client_manager, + client, + active, + ) + .await } FrontendEvent::ChangePort(port) => { - let current_port = self.socket.local_addr().unwrap().port(); - if current_port == port { - if let Err(e) = self - .frontend - .notify_all(FrontendNotify::NotifyPortChange(port, None)) - .await - { - log::warn!("error notifying frontend: {e}"); - } - return false; - } - let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), port); - match UdpSocket::bind(listen_addr).await { - Ok(socket) => { - self.socket = socket; - if let Err(e) = self - .frontend - .notify_all(FrontendNotify::NotifyPortChange(port, None)) - .await - { - log::warn!("error notifying frontend: {e}"); - } - } - Err(e) => { - log::warn!("could not change port: {e}"); - let port = self.socket.local_addr().unwrap().port(); - if let Err(e) = self - .frontend - .notify_all(FrontendNotify::NotifyPortChange( - port, - Some(format!("could not change port: {e}")), - )) - .await - { - log::error!("error notifying frontend: {e}"); - } - } - } + let _ = port_tx.send(port).await; } FrontendEvent::DelClient(client) => { - self.remove_client(client).await; + Self::remove_client( + client_manager, + producer_notify_tx, + consumer_notify_tx, + frontend, + client, + ) + .await; } - FrontendEvent::Enumerate() => self.enumerate().await, + FrontendEvent::Enumerate() => Self::enumerate(client_manager, frontend).await, FrontendEvent::Shutdown() => { log::info!("terminating gracefully..."); return true; } FrontendEvent::UpdateClient(client, hostname, port, pos) => { - self.update_client(client, hostname, port, pos).await + let client_update = ClientUpdate { + client, + hostname, + port, + pos, + }; + Self::update_client( + producer_notify_tx, + consumer_notify_tx, + resolve_tx, + client_manager, + client_update, + ) + .await } } false } - async fn enumerate(&mut self) { - let clients = self - .client_manager + async fn enumerate( + client_manager: &Rc>, + frontend: &mut FrontendListener, + ) { + let clients = client_manager + .borrow() .get_client_states() .map(|s| (s.client.clone(), s.active)) .collect(); - if let Err(e) = self - .frontend + if let Err(e) = frontend .notify_all(FrontendNotify::Enumerate(clients)) .await {