diff --git a/src/server.rs b/src/server.rs index d0d30b3..ebaa24f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -21,18 +21,16 @@ use std::{io::ErrorKind, net::SocketAddr}; use crate::{ client::{ClientEvent, ClientHandle, ClientManager, Position}, config::Config, - consumer::EventConsumer, dns, + event::Event, frontend::{self, FrontendEvent, FrontendListener, FrontendNotify}, - scancode, server::producer_task::ProducerEvent, }; -use crate::{ - consumer, - event::{Event, KeyboardEvent}, - producer, -}; +use crate::{consumer, producer}; +use self::consumer_task::ConsumerEvent; + +mod consumer_task; mod producer_task; const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500); @@ -48,16 +46,6 @@ enum State { AwaitingLeave, } -#[derive(Clone, Debug)] -pub enum ConsumerEvent { - /// consumer is notified of a change in client states - ClientEvent(ClientEvent), - /// consumer must release keys for client - ReleaseKeys(ClientHandle), - /// termination signal - Terminate, -} - #[derive(Clone)] pub struct Server { active_client: Rc>>, @@ -99,13 +87,12 @@ impl Server { return anyhow::Ok(()); } }; - let (mut consumer, producer) = tokio::join!(consumer::create(), producer::create()); + let (consumer, producer) = tokio::join!(consumer::create(), producer::create()); let (frontend_tx, mut frontend_rx) = tokio::sync::mpsc::channel(32); - let (consumer_notify_tx, mut consumer_notify_rx) = tokio::sync::mpsc::channel(32); let (resolve_tx, mut resolve_rx) = tokio::sync::mpsc::channel(32); let (frontend_notify_tx, mut frontend_notify_rx) = tokio::sync::mpsc::channel(32); - let (receiver_tx, mut receiver_rx) = tokio::sync::mpsc::channel(32); + let (receiver_tx, receiver_rx) = tokio::sync::mpsc::channel(32); let (sender_tx, mut sender_rx) = tokio::sync::mpsc::channel(32); let (port_tx, mut port_rx) = tokio::sync::mpsc::channel(32); let (timer_tx, mut timer_rx) = tokio::sync::mpsc::channel(1); @@ -115,52 +102,19 @@ impl Server { producer_task::new(producer, self.clone(), sender_tx.clone(), timer_tx.clone()); // event consumer - let sender_ch = sender_tx.clone(); - let server = self.clone(); - let producer_tx = producer_channel.clone(); - let mut consumer_task = tokio::task::spawn_local(async move { - let mut last_ignored = None; - - loop { - tokio::select! { - udp_event = receiver_rx.recv() => { - let udp_event = udp_event.ok_or(anyhow!("receiver closed"))??; - server.handle_udp_rx(&producer_tx, &mut consumer, &sender_ch, &mut last_ignored, udp_event, &timer_tx).await; - } - consumer_event = consumer_notify_rx.recv() => { - match consumer_event { - Some(e) => match e { - ConsumerEvent::ClientEvent(e) => consumer.notify(e).await, - ConsumerEvent::ReleaseKeys(c) => server.release_keys(&mut consumer, c).await, - ConsumerEvent::Terminate => break, - }, - None => break, - } - } - _ = consumer.dispatch() => { } - } - } - - // release potentially still pressed keys - let clients = server - .client_manager - .borrow() - .get_client_states() - .map(|s| s.client.handle) - .collect::>(); - for client in clients { - server.release_keys(&mut consumer, client).await; - } - - // destroy consumer - consumer.destroy().await; - anyhow::Ok(()) - }); + let (mut consumer_task, consumer_channel) = consumer_task::new( + consumer, + self.clone(), + receiver_rx, + sender_tx.clone(), + producer_channel.clone(), + timer_tx, + ); // frontend listener let server = self.clone(); let producer_notify = producer_channel.clone(); - let consumer_notify = consumer_notify_tx.clone(); + let consumer_notify = consumer_channel.clone(); let frontend_ch = frontend_tx.clone(); let resolve_ch = resolve_tx.clone(); let mut frontend_task = tokio::task::spawn_local(async move { @@ -271,7 +225,7 @@ impl Server { // timer task let server = self.clone(); let sender_ch = sender_tx.clone(); - let consumer_notify = consumer_notify_tx.clone(); + let consumer_notify = consumer_channel.clone(); let producer_notify = producer_channel.clone(); let mut live_tracker = tokio::task::spawn_local(async move { loop { @@ -424,7 +378,7 @@ impl Server { _ = &mut live_tracker => { } } - let _ = consumer_notify_tx.send(ConsumerEvent::Terminate).await; + let _ = consumer_channel.send(ConsumerEvent::Terminate).await; let _ = producer_channel.send(ProducerEvent::Terminate).await; let _ = frontend_tx.send(FrontendEvent::Shutdown()).await; @@ -603,132 +557,6 @@ impl Server { } } - async fn handle_udp_rx( - &self, - producer_notify_tx: &Sender, - consumer: &mut Box, - sender_tx: &Sender<(Event, SocketAddr)>, - last_ignored: &mut Option, - event: (Event, SocketAddr), - timer_tx: &Sender<()>, - ) { - let (event, addr) = event; - - // get handle for addr - let handle = match self.client_manager.borrow().get_client(addr) { - Some(a) => a, - None => { - if last_ignored.is_none() || last_ignored.is_some() && last_ignored.unwrap() != addr - { - log::warn!("ignoring events from client {addr}"); - last_ignored.replace(addr); - } - return; - } - }; - - // next event can be logged as ignored again - last_ignored.take(); - - log::trace!("{:20} <-<-<-<------ {addr} ({handle})", event.to_string()); - { - let mut client_manager = self.client_manager.borrow_mut(); - let client_state = match client_manager.get_mut(handle) { - Some(s) => s, - None => { - log::error!("unknown handle"); - return; - } - }; - - // reset ttl for client and - client_state.alive = true; - // set addr as new default for this client - client_state.active_addr = Some(addr); - } - - match (event, addr) { - (Event::Pong(), _) => { /* ignore pong events */ } - (Event::Ping(), addr) => { - let _ = sender_tx.send((Event::Pong(), addr)).await; - } - (Event::Disconnect(), _) => { - self.release_keys(consumer, handle).await; - } - (event, addr) => { - // tell clients that we are ready to receive events - if let Event::Enter() = event { - let _ = sender_tx.send((Event::Leave(), addr)).await; - } - - match self.state.get() { - State::Sending => { - if let Event::Leave() = event { - // ignore additional leave events that may - // have been sent for redundancy - } else { - // upon receiving any event, we go back to receiving mode - self.state.replace(State::Receiving); - let _ = producer_notify_tx.send(ProducerEvent::Release).await; - log::trace!("STATE ===> Receiving"); - } - } - State::Receiving => { - let mut ignore_event = false; - if let Event::Keyboard(KeyboardEvent::Key { - time: _, - key, - state, - }) = event - { - let mut client_manager = self.client_manager.borrow_mut(); - let client_state = - if let Some(client_state) = client_manager.get_mut(handle) { - client_state - } else { - log::error!("unknown handle"); - return; - }; - if state == 0 { - // ignore release event if key not pressed - ignore_event = !client_state.pressed_keys.remove(&key); - } else { - // ignore press event if key not released - ignore_event = !client_state.pressed_keys.insert(key); - let _ = timer_tx.try_send(()); - } - } - // ignore double press / release events to - // workaround buggy rdp backend. - if !ignore_event { - // consume event - consumer.consume(event, handle).await; - log::trace!("{event:?} => consumer"); - } - } - State::AwaitingLeave => { - // we just entered the deadzone of a client, so - // we need to ignore events that may still - // be on the way until a leave event occurs - // telling us the client registered the enter - if let Event::Leave() = event { - self.state.replace(State::Sending); - log::trace!("STATE ===> Sending"); - } - - // entering a client that is waiting for a leave - // event should still be possible - if let Event::Enter() = event { - self.state.replace(State::Receiving); - let _ = producer_notify_tx.send(ProducerEvent::Release).await; - log::trace!("STATE ===> Receiving"); - } - } - } - } - } - } - async fn handle_frontend_stream( &self, frontend_tx: &Sender, @@ -839,38 +667,6 @@ impl Server { } false } - - async fn release_keys(&self, consumer: &mut Box, client: ClientHandle) { - let keys = self - .client_manager - .borrow_mut() - .get_mut(client) - .iter_mut() - .flat_map(|s| s.pressed_keys.drain()) - .collect::>(); - - for key in keys { - let event = Event::Keyboard(KeyboardEvent::Key { - time: 0, - key, - state: 0, - }); - consumer.consume(event, client).await; - if let Ok(key) = scancode::Linux::try_from(key) { - log::warn!("releasing stuck key: {key:?}"); - } - } - - let modifiers_event = KeyboardEvent::Modifiers { - mods_depressed: 0, - mods_latched: 0, - mods_locked: 0, - group: 0, - }; - consumer - .consume(Event::Keyboard(modifiers_event), client) - .await; - } } async fn receive_event(socket: &UdpSocket) -> anyhow::Result<(Event, SocketAddr)> { diff --git a/src/server/consumer_task.rs b/src/server/consumer_task.rs new file mode 100644 index 0000000..fc67a41 --- /dev/null +++ b/src/server/consumer_task.rs @@ -0,0 +1,238 @@ +use anyhow::{anyhow, Result}; +use std::net::SocketAddr; + +use tokio::{ + sync::mpsc::{Receiver, Sender}, + task::JoinHandle, +}; + +use crate::{ + client::{ClientEvent, ClientHandle}, + consumer::EventConsumer, + event::{Event, KeyboardEvent}, + scancode, + server::State, +}; + +use super::{ProducerEvent, Server}; + +#[derive(Clone, Debug)] +pub enum ConsumerEvent { + /// consumer is notified of a change in client states + ClientEvent(ClientEvent), + /// consumer must release keys for client + ReleaseKeys(ClientHandle), + /// termination signal + Terminate, +} + +pub fn new( + mut consumer: Box, + server: Server, + mut udp_rx: Receiver>, + sender_tx: Sender<(Event, SocketAddr)>, + producer_tx: Sender, + timer_tx: Sender<()>, +) -> (JoinHandle>, Sender) { + let (tx, mut rx) = tokio::sync::mpsc::channel(32); + let consumer_task = tokio::task::spawn_local(async move { + let mut last_ignored = None; + + loop { + tokio::select! { + udp_event = udp_rx.recv() => { + let udp_event = udp_event.ok_or(anyhow!("receiver closed"))??; + handle_udp_rx(&server, &producer_tx, &mut consumer, &sender_tx, &mut last_ignored, udp_event, &timer_tx).await; + } + consumer_event = rx.recv() => { + match consumer_event { + Some(e) => match e { + ConsumerEvent::ClientEvent(e) => consumer.notify(e).await, + ConsumerEvent::ReleaseKeys(c) => release_keys(&server, &mut consumer, c).await, + ConsumerEvent::Terminate => break, + }, + None => break, + } + } + _ = consumer.dispatch() => { } + } + } + + // release potentially still pressed keys + let clients = server + .client_manager + .borrow() + .get_client_states() + .map(|s| s.client.handle) + .collect::>(); + for client in clients { + release_keys(&server, &mut consumer, client).await; + } + + // destroy consumer + consumer.destroy().await; + anyhow::Ok(()) + }); + (consumer_task, tx) +} + +async fn handle_udp_rx( + server: &Server, + producer_notify_tx: &Sender, + consumer: &mut Box, + sender_tx: &Sender<(Event, SocketAddr)>, + last_ignored: &mut Option, + event: (Event, SocketAddr), + timer_tx: &Sender<()>, +) { + let (event, addr) = event; + + // get handle for addr + let handle = match server.client_manager.borrow().get_client(addr) { + Some(a) => a, + None => { + if last_ignored.is_none() || last_ignored.is_some() && last_ignored.unwrap() != addr { + log::warn!("ignoring events from client {addr}"); + last_ignored.replace(addr); + } + return; + } + }; + + // next event can be logged as ignored again + last_ignored.take(); + + log::trace!("{:20} <-<-<-<------ {addr} ({handle})", event.to_string()); + { + let mut client_manager = server.client_manager.borrow_mut(); + let client_state = match client_manager.get_mut(handle) { + Some(s) => s, + None => { + log::error!("unknown handle"); + return; + } + }; + + // reset ttl for client and + client_state.alive = true; + // set addr as new default for this client + client_state.active_addr = Some(addr); + } + + match (event, addr) { + (Event::Pong(), _) => { /* ignore pong events */ } + (Event::Ping(), addr) => { + let _ = sender_tx.send((Event::Pong(), addr)).await; + } + (Event::Disconnect(), _) => { + release_keys(server, consumer, handle).await; + } + (event, addr) => { + // tell clients that we are ready to receive events + if let Event::Enter() = event { + let _ = sender_tx.send((Event::Leave(), addr)).await; + } + + match server.state.get() { + State::Sending => { + if let Event::Leave() = event { + // ignore additional leave events that may + // have been sent for redundancy + } else { + // upon receiving any event, we go back to receiving mode + server.state.replace(State::Receiving); + let _ = producer_notify_tx.send(ProducerEvent::Release).await; + log::trace!("STATE ===> Receiving"); + } + } + State::Receiving => { + let mut ignore_event = false; + if let Event::Keyboard(KeyboardEvent::Key { + time: _, + key, + state, + }) = event + { + let mut client_manager = server.client_manager.borrow_mut(); + let client_state = + if let Some(client_state) = client_manager.get_mut(handle) { + client_state + } else { + log::error!("unknown handle"); + return; + }; + if state == 0 { + // ignore release event if key not pressed + ignore_event = !client_state.pressed_keys.remove(&key); + } else { + // ignore press event if key not released + ignore_event = !client_state.pressed_keys.insert(key); + let _ = timer_tx.try_send(()); + } + } + // ignore double press / release events to + // workaround buggy rdp backend. + if !ignore_event { + // consume event + consumer.consume(event, handle).await; + log::trace!("{event:?} => consumer"); + } + } + State::AwaitingLeave => { + // we just entered the deadzone of a client, so + // we need to ignore events that may still + // be on the way until a leave event occurs + // telling us the client registered the enter + if let Event::Leave() = event { + server.state.replace(State::Sending); + log::trace!("STATE ===> Sending"); + } + + // entering a client that is waiting for a leave + // event should still be possible + if let Event::Enter() = event { + server.state.replace(State::Receiving); + let _ = producer_notify_tx.send(ProducerEvent::Release).await; + log::trace!("STATE ===> Receiving"); + } + } + } + } + } +} + +async fn release_keys( + server: &Server, + consumer: &mut Box, + client: ClientHandle, +) { + let keys = server + .client_manager + .borrow_mut() + .get_mut(client) + .iter_mut() + .flat_map(|s| s.pressed_keys.drain()) + .collect::>(); + + for key in keys { + let event = Event::Keyboard(KeyboardEvent::Key { + time: 0, + key, + state: 0, + }); + consumer.consume(event, client).await; + if let Ok(key) = scancode::Linux::try_from(key) { + log::warn!("releasing stuck key: {key:?}"); + } + } + + let modifiers_event = KeyboardEvent::Modifiers { + mods_depressed: 0, + mods_latched: 0, + mods_locked: 0, + group: 0, + }; + consumer + .consume(Event::Keyboard(modifiers_event), client) + .await; +}