From ecab3a360d74c709bd0a9a7c7bbfd9a48b819081 Mon Sep 17 00:00:00 2001 From: Ferdinand Schober Date: Thu, 18 Jan 2024 23:14:21 +0100 Subject: [PATCH] refactor producer task --- src/server.rs | 135 +++--------------------------------- src/server/producer_task.rs | 132 +++++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+), 124 deletions(-) create mode 100644 src/server/producer_task.rs diff --git a/src/server.rs b/src/server.rs index 370442d..d0d30b3 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,5 +1,4 @@ use anyhow::anyhow; -use futures::stream::StreamExt; use log; use std::{ cell::{Cell, RefCell}, @@ -25,8 +24,8 @@ use crate::{ consumer::EventConsumer, dns, frontend::{self, FrontendEvent, FrontendListener, FrontendNotify}, - producer::EventProducer, scancode, + server::producer_task::ProducerEvent, }; use crate::{ consumer, @@ -34,6 +33,8 @@ use crate::{ producer, }; +mod producer_task; + const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500); #[derive(Clone, Copy, Debug, Eq, PartialEq)] @@ -47,16 +48,6 @@ enum State { AwaitingLeave, } -#[derive(Clone, Copy, Debug)] -pub enum ProducerEvent { - /// producer must release the mouse - Release, - /// producer is notified of a change in client states - ClientEvent(ClientEvent), - /// termination signal - Terminate, -} - #[derive(Clone, Debug)] pub enum ConsumerEvent { /// consumer is notified of a change in client states @@ -108,10 +99,9 @@ impl Server { return anyhow::Ok(()); } }; - let (mut consumer, mut producer) = tokio::join!(consumer::create(), producer::create()); + let (mut consumer, producer) = tokio::join!(consumer::create(), producer::create()); let (frontend_tx, mut frontend_rx) = tokio::sync::mpsc::channel(32); - let (producer_notify_tx, mut producer_notify_rx) = tokio::sync::mpsc::channel(32); let (consumer_notify_tx, mut consumer_notify_rx) = tokio::sync::mpsc::channel(32); let (resolve_tx, mut resolve_rx) = tokio::sync::mpsc::channel(32); let (frontend_notify_tx, mut frontend_notify_rx) = tokio::sync::mpsc::channel(32); @@ -121,40 +111,13 @@ impl Server { let (timer_tx, mut timer_rx) = tokio::sync::mpsc::channel(1); // event producer - let sender_ch = sender_tx.clone(); - let timer_ch = timer_tx.clone(); - let server = self.clone(); - let mut producer_task = tokio::task::spawn_local(async move { - loop { - tokio::select! { - event = producer.next() => { - let event = event.ok_or(anyhow!("event producer closed"))??; - server.handle_producer_event(&mut producer, &sender_ch, &timer_ch, event).await?; - } - e = producer_notify_rx.recv() => { - log::debug!("producer notify rx: {e:?}"); - match e { - Some(e) => match e { - ProducerEvent::Release => { - producer.release()?; - server.state.replace(State::Receiving); - - } - ProducerEvent::ClientEvent(e) => producer.notify(e)?, - ProducerEvent::Terminate => break, - }, - None => break, - } - } - } - } - anyhow::Ok(()) - }); + let (mut producer_task, producer_channel) = + producer_task::new(producer, self.clone(), sender_tx.clone(), timer_tx.clone()); // event consumer - let producer_notify = producer_notify_tx.clone(); let sender_ch = sender_tx.clone(); let server = self.clone(); + let producer_tx = producer_channel.clone(); let mut consumer_task = tokio::task::spawn_local(async move { let mut last_ignored = None; @@ -162,7 +125,7 @@ impl Server { tokio::select! { udp_event = receiver_rx.recv() => { let udp_event = udp_event.ok_or(anyhow!("receiver closed"))??; - server.handle_udp_rx(&producer_notify, &mut consumer, &sender_ch, &mut last_ignored, udp_event, &timer_tx).await; + server.handle_udp_rx(&producer_tx, &mut consumer, &sender_ch, &mut last_ignored, udp_event, &timer_tx).await; } consumer_event = consumer_notify_rx.recv() => { match consumer_event { @@ -196,7 +159,7 @@ impl Server { // frontend listener let server = self.clone(); - let producer_notify = producer_notify_tx.clone(); + let producer_notify = producer_channel.clone(); let consumer_notify = consumer_notify_tx.clone(); let frontend_ch = frontend_tx.clone(); let resolve_ch = resolve_tx.clone(); @@ -309,7 +272,7 @@ impl Server { let server = self.clone(); let sender_ch = sender_tx.clone(); let consumer_notify = consumer_notify_tx.clone(); - let producer_notify = producer_notify_tx.clone(); + let producer_notify = producer_channel.clone(); let mut live_tracker = tokio::task::spawn_local(async move { loop { // wait for wake up signal @@ -462,7 +425,7 @@ impl Server { } let _ = consumer_notify_tx.send(ConsumerEvent::Terminate).await; - let _ = producer_notify_tx.send(ProducerEvent::Terminate).await; + let _ = producer_channel.send(ProducerEvent::Terminate).await; let _ = frontend_tx.send(FrontendEvent::Shutdown()).await; if !producer_task.is_finished() { @@ -766,82 +729,6 @@ impl Server { } } - const RELEASE_MODIFIERDS: u32 = 77; // ctrl+shift+super+alt - - async fn handle_producer_event( - &self, - producer: &mut Box, - sender_tx: &Sender<(Event, SocketAddr)>, - timer_tx: &Sender<()>, - event: (ClientHandle, Event), - ) -> Result<()> { - let (c, mut e) = event; - log::trace!("producer: ({c}) {e:?}"); - - if let Event::Keyboard(crate::event::KeyboardEvent::Modifiers { - mods_depressed, - mods_latched: _, - mods_locked: _, - group: _, - }) = e - { - if mods_depressed == Self::RELEASE_MODIFIERDS { - producer.release()?; - self.state.replace(State::Receiving); - log::trace!("STATE ===> Receiving"); - // send an event to release all the modifiers - e = Event::Disconnect(); - } - } - - let (addr, enter, start_timer) = { - let mut enter = false; - let mut start_timer = false; - - // get client state for handle - let mut client_manager = self.client_manager.borrow_mut(); - let client_state = match client_manager.get_mut(c) { - Some(state) => state, - None => { - // should not happen - log::warn!("unknown client!"); - producer.release()?; - self.state.replace(State::Receiving); - log::trace!("STATE ===> Receiving"); - return Ok(()); - } - }; - - // if we just entered the client we want to send additional enter events until - // we get a leave event - if let Event::Enter() = e { - self.state.replace(State::AwaitingLeave); - self.active_client.replace(Some(client_state.client.handle)); - log::trace!("Active client => {}", client_state.client.handle); - start_timer = true; - log::trace!("STATE ===> AwaitingLeave"); - enter = true; - } else { - // ignore any potential events in receiving mode - if self.state.get() == State::Receiving && e != Event::Disconnect() { - return Ok(()); - } - } - - (client_state.active_addr, enter, start_timer) - }; - if start_timer { - let _ = timer_tx.try_send(()); - } - if let Some(addr) = addr { - if enter { - let _ = sender_tx.send((Event::Enter(), addr)).await; - } - let _ = sender_tx.send((e, addr)).await; - } - Ok(()) - } - async fn handle_frontend_stream( &self, frontend_tx: &Sender, diff --git a/src/server/producer_task.rs b/src/server/producer_task.rs new file mode 100644 index 0000000..76b2f84 --- /dev/null +++ b/src/server/producer_task.rs @@ -0,0 +1,132 @@ +use anyhow::{anyhow, Result}; +use futures::StreamExt; +use std::net::SocketAddr; + +use tokio::{sync::mpsc::Sender, task::JoinHandle}; + +use crate::{ + client::{ClientEvent, ClientHandle}, + event::{Event, KeyboardEvent}, + producer::EventProducer, + server::State, +}; + +use super::Server; + +#[derive(Clone, Copy, Debug)] +pub enum ProducerEvent { + /// producer must release the mouse + Release, + /// producer is notified of a change in client states + ClientEvent(ClientEvent), + /// termination signal + Terminate, +} + +pub fn new( + mut producer: Box, + server: Server, + sender_tx: Sender<(Event, SocketAddr)>, + timer_tx: Sender<()>, +) -> (JoinHandle>, Sender) { + let (tx, mut rx) = tokio::sync::mpsc::channel(32); + let task = tokio::task::spawn_local(async move { + loop { + tokio::select! { + event = producer.next() => { + let event = event.ok_or(anyhow!("event producer closed"))??; + handle_producer_event(&server, &mut producer, &sender_tx, &timer_tx, event).await?; + } + e = rx.recv() => { + log::debug!("producer notify rx: {e:?}"); + match e { + Some(e) => match e { + ProducerEvent::Release => { + producer.release()?; + server.state.replace(State::Receiving); + + } + ProducerEvent::ClientEvent(e) => producer.notify(e)?, + ProducerEvent::Terminate => break, + }, + None => break, + } + } + } + } + anyhow::Ok(()) + }); + (task, tx) +} + +const RELEASE_MODIFIERDS: u32 = 77; // ctrl+shift+super+alt + +async fn handle_producer_event( + server: &Server, + producer: &mut Box, + sender_tx: &Sender<(Event, SocketAddr)>, + timer_tx: &Sender<()>, + event: (ClientHandle, Event), +) -> Result<()> { + let (c, mut e) = event; + log::trace!("({c}) {e:?}"); + + if let Event::Keyboard(KeyboardEvent::Modifiers { mods_depressed, .. }) = e { + if mods_depressed == RELEASE_MODIFIERDS { + producer.release()?; + server.state.replace(State::Receiving); + log::trace!("STATE ===> Receiving"); + // send an event to release all the modifiers + e = Event::Disconnect(); + } + } + + let (addr, enter, start_timer) = { + let mut enter = false; + let mut start_timer = false; + + // get client state for handle + let mut client_manager = server.client_manager.borrow_mut(); + let client_state = match client_manager.get_mut(c) { + Some(state) => state, + None => { + // should not happen + log::warn!("unknown client!"); + producer.release()?; + server.state.replace(State::Receiving); + log::trace!("STATE ===> Receiving"); + return Ok(()); + } + }; + + // if we just entered the client we want to send additional enter events until + // we get a leave event + if let Event::Enter() = e { + server.state.replace(State::AwaitingLeave); + server + .active_client + .replace(Some(client_state.client.handle)); + log::trace!("Active client => {}", client_state.client.handle); + start_timer = true; + log::trace!("STATE ===> AwaitingLeave"); + enter = true; + } else { + // ignore any potential events in receiving mode + if server.state.get() == State::Receiving && e != Event::Disconnect() { + return Ok(()); + } + } + + (client_state.active_addr, enter, start_timer) + }; + if start_timer { + let _ = timer_tx.try_send(()); + } + if let Some(addr) = addr { + if enter { + let _ = sender_tx.send((Event::Enter(), addr)).await; + } + let _ = sender_tx.send((e, addr)).await; + } + Ok(()) +}