diff --git a/src/backend/producer/wayland.rs b/src/backend/producer/wayland.rs index 5ad1bf1..d386acf 100644 --- a/src/backend/producer/wayland.rs +++ b/src/backend/producer/wayland.rs @@ -568,6 +568,7 @@ impl EventProducer for WaylandEventProducer { } fn release(&mut self) { + log::debug!("releasing pointer"); let inner = self.0.get_mut(); inner.state.ungrab(); inner.flush_events(); diff --git a/src/client.rs b/src/client.rs index 10e6907..ca490b7 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,7 +2,6 @@ use std::{ collections::HashSet, fmt::Display, net::{IpAddr, SocketAddr}, - time::Instant, }; use serde::{Deserialize, Serialize}; @@ -57,10 +56,6 @@ pub struct Client { /// This way any event consumer / producer backend does not /// need to know anything about a client other than its handle. pub handle: ClientHandle, - /// `active` address of the client, used to send data to. - /// This should generally be the socket address where data - /// was last received from. - pub active_addr: Option, /// all socket addresses associated with a particular client /// e.g. Laptops usually have at least an ethernet and a wifi port /// which have different ip addresses @@ -71,7 +66,7 @@ pub struct Client { pub pos: Position, } -#[derive(Debug)] +#[derive(Clone, Copy, Debug)] pub enum ClientEvent { Create(ClientHandle, Position), Destroy(ClientHandle), @@ -81,11 +76,18 @@ pub type ClientHandle = u32; #[derive(Debug, Clone)] pub struct ClientState { + /// information about the client pub client: Client, + /// events should be sent to and received from the client pub active: bool, - pub last_ping: Option, - pub last_seen: Option, - pub last_replied: Option, + /// `active` address of the client, used to send data to. + /// This should generally be the socket address where data + /// was last received from. + pub active_addr: Option, + /// tracks whether or not the client is responding to pings + pub alive: bool, + /// keys currently pressed by this client + pub pressed_keys: HashSet, } pub struct ClientManager { @@ -114,9 +116,6 @@ impl ClientManager { // get a new client_handle let handle = self.free_id(); - // we dont know, which IP is initially active - let active_addr = None; - // store fix ip addresses let fix_ips = ips.iter().cloned().collect(); @@ -128,7 +127,6 @@ impl ClientManager { hostname, fix_ips, handle, - active_addr, addrs, port, pos, @@ -137,10 +135,10 @@ impl ClientManager { // client was never seen, nor pinged let client_state = ClientState { client, - last_ping: None, - last_seen: None, - last_replied: None, active: false, + active_addr: None, + alive: false, + pressed_keys: HashSet::new(), }; if handle as usize >= self.clients.len() { diff --git a/src/event.rs b/src/event.rs index 9086918..44a2bb9 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,3 +1,4 @@ +use anyhow::{anyhow, Result}; use std::{ error::Error, fmt::{self, Display}, @@ -65,6 +66,9 @@ pub enum Event { /// response to a ping event: this event signals that a client /// is still alive but must otherwise be ignored Pong(), + /// explicit disconnect request. The client will no longer + /// send events until the next Enter event. All of its keys should be released. + Disconnect(), } impl Display for PointerEvent { @@ -120,6 +124,7 @@ impl Display for Event { Event::Leave() => write!(f, "leave"), Event::Ping() => write!(f, "ping"), Event::Pong() => write!(f, "pong"), + Event::Disconnect() => write!(f, "disconnect"), } } } @@ -133,6 +138,7 @@ impl Event { Self::Leave() => EventType::Leave, Self::Ping() => EventType::Ping, Self::Pong() => EventType::Pong, + Self::Disconnect() => EventType::Disconnect, } } } @@ -174,18 +180,19 @@ enum EventType { Leave, Ping, Pong, + Disconnect, } impl TryFrom for PointerEventType { - type Error = Box; + type Error = anyhow::Error; - fn try_from(value: u8) -> Result { + fn try_from(value: u8) -> Result { match value { x if x == Self::Motion as u8 => Ok(Self::Motion), x if x == Self::Button as u8 => Ok(Self::Button), x if x == Self::Axis as u8 => Ok(Self::Axis), x if x == Self::Frame as u8 => Ok(Self::Frame), - _ => Err(Box::new(ProtocolError { + _ => Err(anyhow!(ProtocolError { msg: format!("invalid pointer event type {}", value), })), } @@ -193,13 +200,13 @@ impl TryFrom for PointerEventType { } impl TryFrom for KeyboardEventType { - type Error = Box; + type Error = anyhow::Error; - fn try_from(value: u8) -> Result { + fn try_from(value: u8) -> Result { match value { x if x == Self::Key as u8 => Ok(Self::Key), x if x == Self::Modifiers as u8 => Ok(Self::Modifiers), - _ => Err(Box::new(ProtocolError { + _ => Err(anyhow!(ProtocolError { msg: format!("invalid keyboard event type {}", value), })), } @@ -216,6 +223,7 @@ impl From<&Event> for Vec { Event::Leave() => vec![], Event::Ping() => vec![], Event::Pong() => vec![], + Event::Disconnect() => vec![], }; [event_id, event_data].concat() } @@ -234,9 +242,9 @@ impl fmt::Display for ProtocolError { impl Error for ProtocolError {} impl TryFrom> for Event { - type Error = Box; + type Error = anyhow::Error; - fn try_from(value: Vec) -> Result { + fn try_from(value: Vec) -> Result { let event_id = u8::from_be_bytes(value[..1].try_into()?); match event_id { i if i == (EventType::Pointer as u8) => Ok(Event::Pointer(value.try_into()?)), @@ -245,7 +253,8 @@ impl TryFrom> for Event { i if i == (EventType::Leave as u8) => Ok(Event::Leave()), i if i == (EventType::Ping as u8) => Ok(Event::Ping()), i if i == (EventType::Pong as u8) => Ok(Event::Pong()), - _ => Err(Box::new(ProtocolError { + i if i == (EventType::Disconnect as u8) => Ok(Event::Disconnect()), + _ => Err(anyhow!(ProtocolError { msg: format!("invalid event_id {}", event_id), })), } @@ -291,9 +300,9 @@ impl From<&PointerEvent> for Vec { } impl TryFrom> for PointerEvent { - type Error = Box; + type Error = anyhow::Error; - fn try_from(data: Vec) -> Result { + fn try_from(data: Vec) -> Result { match data.get(1) { Some(id) => { let event_type = match id.to_owned().try_into() { @@ -305,7 +314,7 @@ impl TryFrom> for PointerEvent { let time = match data.get(2..6) { Some(d) => u32::from_be_bytes(d.try_into()?), None => { - return Err(Box::new(ProtocolError { + return Err(anyhow!(ProtocolError { msg: "Expected 4 Bytes at index 2".into(), })) } @@ -313,7 +322,7 @@ impl TryFrom> for PointerEvent { let relative_x = match data.get(6..14) { Some(d) => f64::from_be_bytes(d.try_into()?), None => { - return Err(Box::new(ProtocolError { + return Err(anyhow!(ProtocolError { msg: "Expected 8 Bytes at index 6".into(), })) } @@ -321,7 +330,7 @@ impl TryFrom> for PointerEvent { let relative_y = match data.get(14..22) { Some(d) => f64::from_be_bytes(d.try_into()?), None => { - return Err(Box::new(ProtocolError { + return Err(anyhow!(ProtocolError { msg: "Expected 8 Bytes at index 14".into(), })) } @@ -336,7 +345,7 @@ impl TryFrom> for PointerEvent { let time = match data.get(2..6) { Some(d) => u32::from_be_bytes(d.try_into()?), None => { - return Err(Box::new(ProtocolError { + return Err(anyhow!(ProtocolError { msg: "Expected 4 Bytes at index 2".into(), })) } @@ -344,7 +353,7 @@ impl TryFrom> for PointerEvent { let button = match data.get(6..10) { Some(d) => u32::from_be_bytes(d.try_into()?), None => { - return Err(Box::new(ProtocolError { + return Err(anyhow!(ProtocolError { msg: "Expected 4 Bytes at index 10".into(), })) } @@ -352,7 +361,7 @@ impl TryFrom> for PointerEvent { let state = match data.get(10..14) { Some(d) => u32::from_be_bytes(d.try_into()?), None => { - return Err(Box::new(ProtocolError { + return Err(anyhow!(ProtocolError { msg: "Expected 4 Bytes at index 14".into(), })) } @@ -367,7 +376,7 @@ impl TryFrom> for PointerEvent { let time = match data.get(2..6) { Some(d) => u32::from_be_bytes(d.try_into()?), None => { - return Err(Box::new(ProtocolError { + return Err(anyhow!(ProtocolError { msg: "Expected 4 Bytes at index 2".into(), })) } @@ -375,7 +384,7 @@ impl TryFrom> for PointerEvent { let axis = match data.get(6) { Some(d) => *d, None => { - return Err(Box::new(ProtocolError { + return Err(anyhow!(ProtocolError { msg: "Expected 1 Byte at index 6".into(), })); } @@ -383,7 +392,7 @@ impl TryFrom> for PointerEvent { let value = match data.get(7..15) { Some(d) => f64::from_be_bytes(d.try_into()?), None => { - return Err(Box::new(ProtocolError { + return Err(anyhow!(ProtocolError { msg: "Expected 8 Bytes at index 7".into(), })); } @@ -393,7 +402,7 @@ impl TryFrom> for PointerEvent { PointerEventType::Frame => Ok(Self::Frame {}), } } - None => Err(Box::new(ProtocolError { + None => Err(anyhow!(ProtocolError { msg: "Expected an element at index 0".into(), })), } @@ -434,9 +443,9 @@ impl From<&KeyboardEvent> for Vec { } impl TryFrom> for KeyboardEvent { - type Error = Box; + type Error = anyhow::Error; - fn try_from(data: Vec) -> Result { + fn try_from(data: Vec) -> Result { match data.get(1) { Some(id) => { let event_type = match id.to_owned().try_into() { @@ -448,7 +457,7 @@ impl TryFrom> for KeyboardEvent { let time = match data.get(2..6) { Some(d) => u32::from_be_bytes(d.try_into()?), None => { - return Err(Box::new(ProtocolError { + return Err(anyhow!(ProtocolError { msg: "Expected 4 Bytes at index 6".into(), })) } @@ -456,7 +465,7 @@ impl TryFrom> for KeyboardEvent { let key = match data.get(6..10) { Some(d) => u32::from_be_bytes(d.try_into()?), None => { - return Err(Box::new(ProtocolError { + return Err(anyhow!(ProtocolError { msg: "Expected 4 Bytes at index 10".into(), })) } @@ -464,7 +473,7 @@ impl TryFrom> for KeyboardEvent { let state = match data.get(10) { Some(d) => *d, None => { - return Err(Box::new(ProtocolError { + return Err(anyhow!(ProtocolError { msg: "Expected 1 Bytes at index 14".into(), })) } @@ -475,7 +484,7 @@ impl TryFrom> for KeyboardEvent { let mods_depressed = match data.get(2..6) { Some(d) => u32::from_be_bytes(d.try_into()?), None => { - return Err(Box::new(ProtocolError { + return Err(anyhow!(ProtocolError { msg: "Expected 4 Bytes at index 6".into(), })) } @@ -483,7 +492,7 @@ impl TryFrom> for KeyboardEvent { let mods_latched = match data.get(6..10) { Some(d) => u32::from_be_bytes(d.try_into()?), None => { - return Err(Box::new(ProtocolError { + return Err(anyhow!(ProtocolError { msg: "Expected 4 Bytes at index 10".into(), })) } @@ -491,7 +500,7 @@ impl TryFrom> for KeyboardEvent { let mods_locked = match data.get(10..14) { Some(d) => u32::from_be_bytes(d.try_into()?), None => { - return Err(Box::new(ProtocolError { + return Err(anyhow!(ProtocolError { msg: "Expected 4 Bytes at index 14".into(), })) } @@ -499,7 +508,7 @@ impl TryFrom> for KeyboardEvent { let group = match data.get(14..18) { Some(d) => u32::from_be_bytes(d.try_into()?), None => { - return Err(Box::new(ProtocolError { + return Err(anyhow!(ProtocolError { msg: "Expected 4 Bytes at index 18".into(), })) } @@ -513,7 +522,7 @@ impl TryFrom> for KeyboardEvent { } } } - None => Err(Box::new(ProtocolError { + None => Err(anyhow!(ProtocolError { msg: "Expected an element at index 0".into(), })), } diff --git a/src/frontend.rs b/src/frontend.rs index 208faac..e69ed43 100644 --- a/src/frontend.rs +++ b/src/frontend.rs @@ -108,6 +108,7 @@ pub enum FrontendNotify { NotifyClientDelete(ClientHandle), /// new port, reason of failure (if failed) NotifyPortChange(u16, Option), + /// Client State, active Enumerate(Vec<(Client, bool)>), NotifyError(String), } diff --git a/src/main.rs b/src/main.rs index cd10823..7fb431c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -65,7 +65,9 @@ fn run_service(config: &Config) -> Result<()> { runtime.block_on(LocalSet::new().run_until(async { // run main loop log::info!("Press Ctrl+Alt+Shift+Super to release the mouse"); - Server::run(config).await?; + + let server = Server::new(config); + server.run().await?; log::debug!("service exiting"); anyhow::Ok(()) diff --git a/src/server.rs b/src/server.rs index e735ff3..12a8769 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,13 +4,12 @@ use log; use std::{ cell::{Cell, RefCell}, collections::HashSet, - error::Error, io::Result, net::IpAddr, rc::Rc, - time::{Duration, Instant}, + time::Duration, }; -use tokio::{io::ReadHalf, net::UdpSocket, signal, sync::mpsc::Sender, task}; +use tokio::{io::ReadHalf, net::UdpSocket, signal, sync::mpsc::Sender}; #[cfg(unix)] use tokio::net::UnixStream; @@ -27,6 +26,7 @@ use crate::{ dns, frontend::{self, FrontendEvent, FrontendListener, FrontendNotify}, producer::EventProducer, + scancode, }; use crate::{ consumer, @@ -34,20 +34,37 @@ use crate::{ producer, }; +const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500); + #[derive(Clone, Copy, Debug, Eq, PartialEq)] enum State { + /// Currently sending events to another device Sending, + /// Currently receiving events from other devices Receiving, + /// Entered the deadzone of another device but waiting + /// for acknowledgement (Leave event) from the device AwaitingLeave, } +#[derive(Clone, Copy, Debug)] pub enum ProducerEvent { + /// producer must release the mouse Release, + /// producer is notified of a change in client states ClientEvent(ClientEvent), + /// termination signal + Terminate, } +#[derive(Clone, Debug)] pub enum ConsumerEvent { + /// consumer is notified of a change in client states ClientEvent(ClientEvent), + /// consumer must release keys for client + ReleaseKeys(ClientHandle), + /// termination signal + Terminate, } #[derive(Clone)] @@ -58,14 +75,35 @@ struct ClientUpdate { pos: Position, } -pub struct Server {} +#[derive(Clone)] +pub struct Server { + active_client: Rc>>, + client_manager: Rc>, + port: Rc>, + state: Rc>, +} impl Server { - pub async fn run(config: &Config) -> anyhow::Result<()> { + pub fn new(config: &Config) -> Self { + let active_client = Rc::new(Cell::new(None)); + let client_manager = Rc::new(RefCell::new(ClientManager::new())); + let state = Rc::new(Cell::new(State::Receiving)); + let port = Rc::new(Cell::new(config.port)); + for (ips, host, port, pos) in config.get_clients() { + client_manager.borrow_mut().add_client(host, ips, port, pos); + } + Self { + active_client, + client_manager, + port, + state, + } + } + + pub async fn run(&self) -> anyhow::Result<()> { // create frontend communication adapter let mut frontend = match FrontendListener::new().await { - Some(Err(e)) => return Err(e), - Some(Ok(f)) => f, + Some(f) => f?, None => { // none means some other instance is already running log::info!("service already running, exiting"); @@ -74,98 +112,67 @@ impl Server { }; let (mut consumer, mut producer) = tokio::join!(consumer::create(), producer::create()); - // create dns resolver - let resolver = dns::DnsResolver::new().await?; - - // bind the udp socket - let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), config.port); - let mut socket = UdpSocket::bind(listen_addr).await?; let (frontend_tx, mut frontend_rx) = tokio::sync::mpsc::channel(1); - - // create client manager - let client_manager_rc = Rc::new(RefCell::new(ClientManager::new())); - - let state_rc = Rc::new(Cell::new(State::Receiving)); - - // channel to notify producer let (producer_notify_tx, mut producer_notify_rx) = tokio::sync::mpsc::channel(32); - - // channel to notify consumer let (consumer_notify_tx, mut consumer_notify_rx) = tokio::sync::mpsc::channel(32); - - // channel to request dns resolver let (resolve_tx, mut resolve_rx) = tokio::sync::mpsc::channel(32); - - // channel to send events to frontends let (frontend_notify_tx, mut frontend_notify_rx) = tokio::sync::mpsc::channel(32); - - // channels for udp send / receive let (receiver_tx, mut receiver_rx) = tokio::sync::mpsc::channel(32); let (sender_tx, mut sender_rx) = tokio::sync::mpsc::channel(32); let (port_tx, mut port_rx) = tokio::sync::mpsc::channel(32); - - // add clients from config - for (c, h, port, p) in config.get_clients().into_iter() { - Self::add_client( - &resolve_tx, - &client_manager_rc, - &mut frontend, - h, - c, - port, - p, - ) - .await; - } + let (timer_tx, mut timer_rx) = tokio::sync::mpsc::channel(1); // event producer - let client_manager = client_manager_rc.clone(); - let state = state_rc.clone(); let sender_ch = sender_tx.clone(); - let producer_task = tokio::task::spawn_local(async move { + let timer_ch = timer_tx.clone(); + let server = self.clone(); + let mut producer_task = tokio::task::spawn_local(async move { loop { tokio::select! { - e = producer.next() => { - let (client, event) = match e { - Some(e) => e?, - None => return Err::<(), anyhow::Error>(anyhow!("event producer closed")), - }; - Self::handle_producer_event(&mut producer, &client_manager, &state, &sender_ch, client, event).await; + event = producer.next() => { + let event = event.ok_or(anyhow!("event producer closed"))??; + log::debug!("producer event: {event:?}"); + server.handle_producer_event(&mut producer, &sender_ch, &timer_ch, event).await; } e = producer_notify_rx.recv() => { + log::debug!("producer notify rx: {e:?}"); match e { Some(e) => match e { - ProducerEvent::Release => producer.release(), + ProducerEvent::Release => { + producer.release(); + server.state.replace(State::Receiving); + + } ProducerEvent::ClientEvent(e) => producer.notify(e), + ProducerEvent::Terminate => break, }, - None => break Ok(()), + None => break, } } } } + anyhow::Ok(()) }); // event consumer - let client_manager = client_manager_rc.clone(); - let state = state_rc.clone(); let producer_notify = producer_notify_tx.clone(); - let receiver_task = tokio::task::spawn_local(async move { + let sender_ch = sender_tx.clone(); + let server = self.clone(); + let mut consumer_task = tokio::task::spawn_local(async move { let mut last_ignored = None; loop { tokio::select! { udp_event = receiver_rx.recv() => { - let udp_event = match udp_event { - Some(Ok(e)) => e, - Some(Err(e)) => return Err::<(), anyhow::Error>(anyhow!("{}", e)), - None => return Err::<(), anyhow::Error>(anyhow!("receiver closed")), - }; - Self::handle_udp_rx(&client_manager, &producer_notify, &mut consumer, &sender_tx, &state, &mut last_ignored, udp_event).await; + let udp_event = udp_event.ok_or(anyhow!("receiver closed"))??; + server.handle_udp_rx(&producer_notify, &mut consumer, &sender_ch, &mut last_ignored, udp_event, &timer_tx).await; } consumer_event = consumer_notify_rx.recv() => { match consumer_event { Some(e) => match e { ConsumerEvent::ClientEvent(e) => consumer.notify(e).await, + ConsumerEvent::ReleaseKeys(c) => server.release_keys(&mut consumer, c).await, + ConsumerEvent::Terminate => break, }, None => break, } @@ -173,14 +180,29 @@ impl Server { _ = consumer.dispatch() => { } } } + + // release potentially still pressed keys + let clients = server + .client_manager + .borrow() + .get_client_states() + .map(|s| s.client.handle) + .collect::>(); + for client in clients { + server.release_keys(&mut consumer, client).await; + } + // destroy consumer consumer.destroy().await; - Ok(()) + anyhow::Ok(()) }); // frontend listener - let client_manager = client_manager_rc.clone(); - let frontend_task = tokio::task::spawn_local(async move { + let server = self.clone(); + let producer_notify = producer_notify_tx.clone(); + let consumer_notify = consumer_notify_tx.clone(); + let frontend_ch = frontend_tx.clone(); + let mut frontend_task = tokio::task::spawn_local(async move { loop { tokio::select! { stream = frontend.accept() => { @@ -191,32 +213,29 @@ impl Server { continue; } }; - Self::handle_frontend_stream(&client_manager, &mut frontend, &frontend_tx, stream).await; + server.handle_frontend_stream(&mut frontend, &frontend_ch, stream).await; } event = frontend_rx.recv() => { - let frontend_event = match event { - Some(e) => e, - None => return Err::<(), anyhow::Error>(anyhow!("frontend channel closed")), - }; - let exit = Self::handle_frontend_event(&producer_notify_tx, &consumer_notify_tx, &client_manager, &resolve_tx, &mut frontend, &port_tx, frontend_event).await; - if exit { - return Ok(()); + let frontend_event = event.ok_or(anyhow!("frontend channel closed"))?; + if server.handle_frontend_event(&producer_notify, &consumer_notify, &resolve_tx, &mut frontend, &port_tx, frontend_event).await { + break; } } notify = frontend_notify_rx.recv() => { - let notify = match notify { - Some(n) => n, - None => return Err::<(), anyhow::Error>(anyhow!("frontend notify closed")), - }; + let notify = notify.ok_or(anyhow!("frontend notify closed"))?; let _ = frontend.notify_all(notify).await; } } } + anyhow::Ok(()) }); // dns resolver - let client_manager = client_manager_rc.clone(); - let resolver_task = tokio::task::spawn_local(async move { + + // create dns resolver + let resolver = dns::DnsResolver::new().await?; + let server = self.clone(); + let mut resolver_task = tokio::task::spawn_local(async move { loop { let (host, client): (String, ClientHandle) = match resolve_rx.recv().await { Some(r) => r, @@ -229,7 +248,7 @@ impl Server { continue; } }; - if let Some(state) = client_manager.borrow_mut().get_mut(client) { + if let Some(state) = server.client_manager.borrow_mut().get_mut(client) { let port = state.client.port; let mut addrs = HashSet::from_iter( state @@ -247,8 +266,11 @@ impl Server { } }); + // bind the udp socket + let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), self.port.get()); + let mut socket = UdpSocket::bind(listen_addr).await?; // udp task - let udp_task = tokio::task::spawn_local(async move { + let mut udp_task = tokio::task::spawn_local(async move { loop { tokio::select! { event = receive_event(&socket) => { @@ -266,16 +288,16 @@ impl Server { let Some(port) = port else { break; }; - let current_port = socket.local_addr().unwrap().port(); - if current_port == port { - let _ = frontend_notify_tx.send(FrontendNotify::NotifyPortChange(port, None)).await; + + if socket.local_addr().unwrap().port() == port { continue; - }; + } let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), port); match UdpSocket::bind(listen_addr).await { Ok(new_socket) => { socket = new_socket; + server.port.replace(port); let _ = frontend_notify_tx.send(FrontendNotify::NotifyPortChange(port, None)).await; } Err(e) => { @@ -293,37 +315,157 @@ impl Server { } }); - let reaper = task::spawn_local(async move { - tokio::select! { - _ = signal::ctrl_c() => { - log::info!("terminating service"); - }, - _ = producer_task => { - // TODO restart producer? - } - _ = receiver_task => { - // TODO restart producer? - } - _ = frontend_task => { - // frontend exited => exit requested - } - _ = resolver_task => { - // resolver exited - } - _ = udp_task => { - // udp exited + // timer task + let server = self.clone(); + let sender_ch = sender_tx.clone(); + let consumer_notify = consumer_notify_tx.clone(); + let producer_notify = producer_notify_tx.clone(); + let mut live_tracker = tokio::task::spawn_local(async move { + loop { + // wait for wake up signal + let Some(_): Option<()> = timer_rx.recv().await else { + break; + }; + loop { + let receiving = server.state.get() == State::Receiving; + let (ping_clients, ping_addrs) = { + let mut client_manager = server.client_manager.borrow_mut(); + + let ping_clients: Vec = if receiving { + // if receiving we care about clients with pressed keys + client_manager + .get_client_states_mut() + .filter(|s| !s.pressed_keys.is_empty()) + .map(|s| s.client.handle) + .collect() + } else { + // if sending we care about the active client + server.active_client.get().iter().cloned().collect() + }; + + // get relevant socket addrs for clients + let ping_addrs: Vec = { + ping_clients + .iter() + .flat_map(|&c| client_manager.get(c)) + .flat_map(|state| { + if let Some(a) = state.active_addr { + vec![a] + } else { + state.client.addrs.iter().cloned().collect() + } + }) + .collect() + }; + + // reset alive + for state in client_manager.get_client_states_mut() { + state.alive = false; + } + + (ping_clients, ping_addrs) + }; + + if receiving && ping_clients.is_empty() { + // receiving and no client has pressed keys + // -> no need to keep pinging + break; + } + + // ping clients + for addr in ping_addrs { + if sender_ch.send((Event::Ping(), addr)).await.is_err() { + break; + } + } + + // give clients time to resond + if receiving { + log::debug!("waiting {MAX_RESPONSE_TIME:?} for response from client with pressed keys ..."); + } else { + log::debug!("state: {:?} => waiting {MAX_RESPONSE_TIME:?} for client to respond ...", server.state.get()); + } + + tokio::time::sleep(MAX_RESPONSE_TIME).await; + + // when anything is received from a client, + // the alive flag gets set + let unresponsive_clients: Vec<_> = { + let client_manager = server.client_manager.borrow(); + ping_clients + .iter() + .filter_map(|&c| match client_manager.get(c) { + Some(state) if !state.alive => Some(c), + _ => None, + }) + .collect() + }; + + // we may not be receiving anymore but we should respond + // to the original state and not the "new" one + if receiving { + for c in unresponsive_clients { + log::warn!("device not responding, releasing keys!"); + let _ = consumer_notify.send(ConsumerEvent::ReleaseKeys(c)).await; + } + } else { + // release pointer if the active client has not responded + if !unresponsive_clients.is_empty() { + log::warn!("client not responding, releasing pointer!"); + server.state.replace(State::Receiving); + let _ = producer_notify.send(ProducerEvent::Release).await; + } + } } } }); - reaper.await?; + // initial sync of clients + frontend_tx.send(FrontendEvent::Enumerate()).await?; + + tokio::select! { + _ = signal::ctrl_c() => { + log::info!("terminating service"); + }, + _ = &mut producer_task => { + // TODO restart producer? + } + _ = &mut consumer_task => { + // TODO restart producer? + } + _ = &mut frontend_task => { + // frontend exited => exit requested + } + _ = &mut resolver_task => { + // resolver exited + } + _ = &mut udp_task => { + // udp exited + } + _ = &mut live_tracker => { + // live tracker exited + } + } + + let _ = consumer_notify_tx.send(ConsumerEvent::Terminate).await; + let _ = producer_notify_tx.send(ProducerEvent::Terminate).await; + let _ = frontend_tx.send(FrontendEvent::Shutdown()).await; + + let (a, b, c) = tokio::join!(producer_task, consumer_task, frontend_task); + a??; + b??; + c??; + + resolver_task.abort(); + udp_task.abort(); + live_tracker.abort(); Ok(()) } pub async fn add_client( + &self, resolver_tx: &Sender<(String, ClientHandle)>, - client_manager: &Rc>, frontend: &mut FrontendListener, hostname: Option, addr: HashSet, @@ -336,7 +478,8 @@ impl Server { hostname.as_deref().unwrap_or(""), &addr ); - let client = client_manager + let client = self + .client_manager .borrow_mut() .add_client(hostname.clone(), addr, port, pos); @@ -352,13 +495,13 @@ impl Server { } pub async fn activate_client( + &self, producer_notify_tx: &Sender, consumer_notify_tx: &Sender, - client_manager: &Rc>, client: ClientHandle, active: bool, ) { - let (client, pos) = match client_manager.borrow_mut().get_mut(client) { + let (client, pos) = match self.client_manager.borrow_mut().get_mut(client) { Some(state) => { state.active = active; (state.client.handle, state.client.pos) @@ -383,7 +526,7 @@ impl Server { } pub async fn remove_client( - client_manager: &Rc>, + &self, producer_notify_tx: &Sender, consumer_notify_tx: &Sender, frontend: &mut FrontendListener, @@ -396,7 +539,8 @@ impl Server { .send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(client))) .await; - let Some(client) = client_manager + let Some(client) = self + .client_manager .borrow_mut() .remove_client(client) .map(|s| s.client.handle) @@ -413,15 +557,15 @@ impl Server { } async fn update_client( + &self, producer_notify_tx: &Sender, consumer_notify_tx: &Sender, resolve_tx: &Sender<(String, ClientHandle)>, - client_manager: &Rc>, client_update: ClientUpdate, ) { let (hostname, handle, active) = { // retrieve state - let mut client_manager = client_manager.borrow_mut(); + let mut client_manager = self.client_manager.borrow_mut(); let Some(state) = client_manager.get_mut(client_update.client) else { return; }; @@ -443,7 +587,6 @@ impl Server { }) .collect(); state - .client .active_addr .map(|a| SocketAddr::new(a.ip(), client_update.port)); } @@ -451,7 +594,7 @@ impl Server { // update hostname if state.client.hostname != client_update.hostname { state.client.addrs = HashSet::new(); - state.client.active_addr = None; + state.active_addr = None; state.client.hostname = client_update.hostname; } @@ -496,18 +639,18 @@ impl Server { } async fn handle_udp_rx( - client_manager: &Rc>, + &self, producer_notify_tx: &Sender, consumer: &mut Box, sender_tx: &Sender<(Event, SocketAddr)>, - state: &Rc>, last_ignored: &mut Option, event: (Event, SocketAddr), + timer_tx: &Sender<()>, ) { let (event, addr) = event; // get handle for addr - let handle = match client_manager.borrow().get_client(addr) { + let handle = match self.client_manager.borrow().get_client(addr) { Some(a) => a, None => { if last_ignored.is_none() || last_ignored.is_some() && last_ignored.unwrap() != addr @@ -524,7 +667,7 @@ impl Server { log::trace!("{:20} <-<-<-<------ {addr} ({handle})", event.to_string()); { - let mut client_manager = client_manager.borrow_mut(); + let mut client_manager = self.client_manager.borrow_mut(); let client_state = match client_manager.get_mut(handle) { Some(s) => s, None => { @@ -534,9 +677,9 @@ impl Server { }; // reset ttl for client and - client_state.last_seen = Some(Instant::now()); + client_state.alive = true; // set addr as new default for this client - client_state.client.active_addr = Some(addr); + client_state.active_addr = Some(addr); } match (event, addr) { @@ -544,24 +687,49 @@ impl Server { (Event::Ping(), addr) => { let _ = sender_tx.send((Event::Pong(), addr)).await; } + (Event::Disconnect(), _) => { + self.release_keys(consumer, handle).await; + } (event, addr) => { // tell clients that we are ready to receive events if let Event::Enter() = event { let _ = sender_tx.send((Event::Leave(), addr)).await; } - match state.get() { + + match self.state.get() { State::Sending => { if let Event::Leave() = event { // ignore additional leave events that may // have been sent for redundancy } else { // upon receiving any event, we go back to receiving mode + self.state.replace(State::Receiving); let _ = producer_notify_tx.send(ProducerEvent::Release).await; - state.replace(State::Receiving); log::trace!("STATE ===> Receiving"); } } State::Receiving => { + if let Event::Keyboard(KeyboardEvent::Key { + time: _, + key, + state, + }) = event + { + let mut client_manager = self.client_manager.borrow_mut(); + let client_state = + if let Some(client_state) = client_manager.get_mut(handle) { + client_state + } else { + log::error!("unknown handle"); + return; + }; + if state == 0 { + client_state.pressed_keys.remove(&key); + } else { + client_state.pressed_keys.insert(key); + let _ = timer_tx.try_send(()); + } + } // consume event consumer.consume(event, handle).await; log::trace!("{event:?} => consumer"); @@ -572,59 +740,33 @@ impl Server { // be on the way until a leave event occurs // telling us the client registered the enter if let Event::Leave() = event { - state.replace(State::Sending); + self.state.replace(State::Sending); log::trace!("STATE ===> Sending"); } // entering a client that is waiting for a leave // event should still be possible if let Event::Enter() = event { - state.replace(State::Receiving); - log::trace!("STATE ===> Receiving"); + self.state.replace(State::Receiving); let _ = producer_notify_tx.send(ProducerEvent::Release).await; + log::trace!("STATE ===> Receiving"); } } } } } - - let pong = { - let mut client_manager = client_manager.borrow_mut(); - let client_state = match client_manager.get_mut(handle) { - Some(s) => s, - None => { - log::error!("unknown handle"); - return; - } - }; - - // let the server know we are still alive once every second - if client_state.last_replied.is_none() - || client_state.last_replied.is_some() - && client_state.last_replied.unwrap().elapsed() > Duration::from_secs(1) - { - client_state.last_replied = Some(Instant::now()); - true - } else { - false - } - }; - - if pong { - let _ = sender_tx.send((Event::Pong(), addr)).await; - } } const RELEASE_MODIFIERDS: u32 = 77; // ctrl+shift+super+alt async fn handle_producer_event( + &self, producer: &mut Box, - client_manager: &Rc>, - state: &Rc>, sender_tx: &Sender<(Event, SocketAddr)>, - c: ClientHandle, - mut e: Event, + timer_tx: &Sender<()>, + event: (ClientHandle, Event), ) { + let (c, mut e) = event; log::trace!("producer: ({c}) {e:?}"); if let Event::Keyboard(crate::event::KeyboardEvent::Modifiers { @@ -636,31 +778,26 @@ impl Server { { if mods_depressed == Self::RELEASE_MODIFIERDS { producer.release(); - state.replace(State::Receiving); + self.state.replace(State::Receiving); log::trace!("STATE ===> Receiving"); // send an event to release all the modifiers - e = Event::Keyboard(KeyboardEvent::Modifiers { - mods_depressed: 0, - mods_latched: 0, - mods_locked: 0, - group: 0, - }); + e = Event::Disconnect(); } } - let (addr, enter, ping_addrs) = { + let (addr, enter, start_timer) = { let mut enter = false; - let mut ping_addrs: Option> = None; + let mut start_timer = false; // get client state for handle - let mut client_manager = client_manager.borrow_mut(); + let mut client_manager = self.client_manager.borrow_mut(); let client_state = match client_manager.get_mut(c) { Some(state) => state, None => { // should not happen log::warn!("unknown client!"); producer.release(); - state.replace(State::Receiving); + self.state.replace(State::Receiving); log::trace!("STATE ===> Receiving"); return; } @@ -668,59 +805,30 @@ impl Server { // if we just entered the client we want to send additional enter events until // we get a leave event - if let State::Receiving | State::AwaitingLeave = state.get() { - state.replace(State::AwaitingLeave); + if let Event::Enter() = e { + self.state.replace(State::AwaitingLeave); + self.active_client.replace(Some(client_state.client.handle)); + log::trace!("Active client => {}", client_state.client.handle); + start_timer = true; log::trace!("STATE ===> AwaitingLeave"); enter = true; } - let last_seen = match client_state.last_seen { - None => Duration::MAX, - Some(i) => i.elapsed(), - }; - - let last_pinged = match client_state.last_ping { - None => Duration::MAX, - Some(i) => i.elapsed(), - }; - - // not seen for one second but pinged at least 500ms ago - if last_seen > Duration::from_secs(1) - && last_pinged > Duration::from_millis(500) - && last_pinged < Duration::from_secs(1) - { - // client unresponsive -> set state to receiving - if state.get() != State::Receiving { - log::info!("client not responding - releasing pointer"); - producer.release(); - state.replace(State::Receiving); - log::trace!("STATE ===> Receiving"); - } - } - - // last ping > 500ms ago -> ping all interfaces - if last_pinged > Duration::from_millis(500) { - ping_addrs = Some(client_state.client.addrs.iter().cloned().collect()); - client_state.last_ping = Some(Instant::now()); - } - - (client_state.client.active_addr, enter, ping_addrs) + (client_state.active_addr, enter, start_timer) }; + if start_timer { + let _ = timer_tx.try_send(()); + } if let Some(addr) = addr { if enter { let _ = sender_tx.send((Event::Enter(), addr)).await; } let _ = sender_tx.send((e, addr)).await; } - if let Some(addrs) = ping_addrs { - for addr in addrs { - let _ = sender_tx.send((Event::Ping(), addr)).await; - } - } } async fn handle_frontend_stream( - client_manager: &Rc>, + &self, frontend: &mut FrontendListener, frontend_tx: &Sender, #[cfg(unix)] mut stream: ReadHalf, @@ -748,13 +856,13 @@ impl Server { } } }); - Self::enumerate(client_manager, frontend).await; + self.enumerate(frontend).await; } async fn handle_frontend_event( + &self, producer_notify_tx: &Sender, consumer_notify_tx: &Sender, - client_manager: &Rc>, resolve_tx: &Sender<(String, ClientHandle)>, frontend: &mut FrontendListener, port_tx: &Sender, @@ -763,41 +871,21 @@ impl Server { log::debug!("frontend: {event:?}"); match event { FrontendEvent::AddClient(hostname, port, pos) => { - Self::add_client( - resolve_tx, - client_manager, - frontend, - hostname, - HashSet::new(), - port, - pos, - ) - .await; + self.add_client(resolve_tx, frontend, hostname, HashSet::new(), port, pos) + .await; } FrontendEvent::ActivateClient(client, active) => { - Self::activate_client( - producer_notify_tx, - consumer_notify_tx, - client_manager, - client, - active, - ) - .await + self.activate_client(producer_notify_tx, consumer_notify_tx, client, active) + .await } FrontendEvent::ChangePort(port) => { let _ = port_tx.send(port).await; } FrontendEvent::DelClient(client) => { - Self::remove_client( - client_manager, - producer_notify_tx, - consumer_notify_tx, - frontend, - client, - ) - .await; + self.remove_client(producer_notify_tx, consumer_notify_tx, frontend, client) + .await; } - FrontendEvent::Enumerate() => Self::enumerate(client_manager, frontend).await, + FrontendEvent::Enumerate() => self.enumerate(frontend).await, FrontendEvent::Shutdown() => { log::info!("terminating gracefully..."); return true; @@ -809,11 +897,10 @@ impl Server { port, pos, }; - Self::update_client( + self.update_client( producer_notify_tx, consumer_notify_tx, resolve_tx, - client_manager, client_update, ) .await @@ -822,11 +909,41 @@ impl Server { false } - async fn enumerate( - client_manager: &Rc>, - frontend: &mut FrontendListener, - ) { - let clients = client_manager + async fn release_keys(&self, consumer: &mut Box, client: ClientHandle) { + let keys = self + .client_manager + .borrow_mut() + .get_mut(client) + .iter_mut() + .flat_map(|s| s.pressed_keys.drain()) + .collect::>(); + + for key in keys { + let event = Event::Keyboard(KeyboardEvent::Key { + time: 0, + key, + state: 0, + }); + consumer.consume(event, client).await; + if let Ok(key) = scancode::Linux::try_from(key) { + log::warn!("releasing stuck key: {key:?}"); + } + } + + let modifiers_event = KeyboardEvent::Modifiers { + mods_depressed: 0, + mods_latched: 0, + mods_locked: 0, + group: 0, + }; + consumer + .consume(Event::Keyboard(modifiers_event), client) + .await; + } + + async fn enumerate(&self, frontend: &mut FrontendListener) { + let clients = self + .client_manager .borrow() .get_client_states() .map(|s| (s.client.clone(), s.active)) @@ -840,14 +957,10 @@ impl Server { } } -async fn receive_event( - socket: &UdpSocket, -) -> std::result::Result<(Event, SocketAddr), Box> { +async fn receive_event(socket: &UdpSocket) -> anyhow::Result<(Event, SocketAddr)> { let mut buf = vec![0u8; 22]; - match socket.recv_from(&mut buf).await { - Ok((_amt, src)) => Ok((Event::try_from(buf)?, src)), - Err(e) => Err(Box::new(e)), - } + let (_amt, src) = socket.recv_from(&mut buf).await?; + Ok((Event::try_from(buf)?, src)) } fn send_event(sock: &UdpSocket, e: Event, addr: SocketAddr) -> Result {