From 017bc4317650e41cf01b3c0f4a351e057b635825 Mon Sep 17 00:00:00 2001 From: Ferdinand Schober Date: Fri, 19 Jan 2024 00:54:10 +0100 Subject: [PATCH] refactor timer task --- src/server.rs | 131 ++++------------------------------------ src/server/ping_task.rs | 131 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+), 119 deletions(-) create mode 100644 src/server/ping_task.rs diff --git a/src/server.rs b/src/server.rs index 84be47c..d12fa8a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,17 +2,13 @@ use log; use std::{ cell::{Cell, RefCell}, rc::Rc, - time::Duration, }; use tokio::signal; -use std::net::SocketAddr; - use crate::{ client::{ClientHandle, ClientManager}, config::Config, dns, - event::Event, frontend::{FrontendEvent, FrontendListener}, server::producer_task::ProducerEvent, }; @@ -23,11 +19,10 @@ use self::{consumer_task::ConsumerEvent, resolver_task::DnsRequest}; mod consumer_task; mod frontend_task; mod network_task; +mod ping_task; mod producer_task; mod resolver_task; -const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500); - #[derive(Clone, Copy, Debug, Eq, PartialEq)] enum State { /// Currently sending events to another device @@ -82,7 +77,7 @@ impl Server { }; let (consumer, producer) = tokio::join!(consumer::create(), producer::create()); - let (timer_tx, mut timer_rx) = tokio::sync::mpsc::channel(1); + let (timer_tx, timer_rx) = tokio::sync::mpsc::channel(1); let (frontend_notify_tx, frontend_notify_rx) = tokio::sync::mpsc::channel(1); // udp task @@ -118,116 +113,14 @@ impl Server { port_tx, ); - // timer task - let server = self.clone(); - let sender_ch = sender_tx.clone(); - let consumer_notify = consumer_channel.clone(); - let producer_notify = producer_channel.clone(); - let mut live_tracker = tokio::task::spawn_local(async move { - loop { - // wait for wake up signal - let Some(_): Option<()> = timer_rx.recv().await else { - break; - }; - loop { - let receiving = server.state.get() == State::Receiving; - let (ping_clients, ping_addrs) = { - let mut client_manager = server.client_manager.borrow_mut(); - - let ping_clients: Vec = if receiving { - // if receiving we care about clients with pressed keys - client_manager - .get_client_states_mut() - .filter(|s| !s.pressed_keys.is_empty()) - .map(|s| s.client.handle) - .collect() - } else { - // if sending we care about the active client - server.active_client.get().iter().cloned().collect() - }; - - // get relevant socket addrs for clients - let ping_addrs: Vec = { - ping_clients - .iter() - .flat_map(|&c| client_manager.get(c)) - .flat_map(|state| { - if state.alive && state.active_addr.is_some() { - vec![state.active_addr.unwrap()] - } else { - state - .client - .ips - .iter() - .cloned() - .map(|ip| SocketAddr::new(ip, state.client.port)) - .collect() - } - }) - .collect() - }; - - // reset alive - for state in client_manager.get_client_states_mut() { - state.alive = false; - } - - (ping_clients, ping_addrs) - }; - - if receiving && ping_clients.is_empty() { - // receiving and no client has pressed keys - // -> no need to keep pinging - break; - } - - // ping clients - for addr in ping_addrs { - if sender_ch.send((Event::Ping(), addr)).await.is_err() { - break; - } - } - - // give clients time to resond - if receiving { - log::debug!("waiting {MAX_RESPONSE_TIME:?} for response from client with pressed keys ..."); - } else { - log::debug!("state: {:?} => waiting {MAX_RESPONSE_TIME:?} for client to respond ...", server.state.get()); - } - - tokio::time::sleep(MAX_RESPONSE_TIME).await; - - // when anything is received from a client, - // the alive flag gets set - let unresponsive_clients: Vec<_> = { - let client_manager = server.client_manager.borrow(); - ping_clients - .iter() - .filter_map(|&c| match client_manager.get(c) { - Some(state) if !state.alive => Some(c), - _ => None, - }) - .collect() - }; - - // we may not be receiving anymore but we should respond - // to the original state and not the "new" one - if receiving { - for c in unresponsive_clients { - log::warn!("device not responding, releasing keys!"); - let _ = consumer_notify.send(ConsumerEvent::ReleaseKeys(c)).await; - } - } else { - // release pointer if the active client has not responded - if !unresponsive_clients.is_empty() { - log::warn!("client not responding, releasing pointer!"); - server.state.replace(State::Receiving); - let _ = producer_notify.send(ProducerEvent::Release).await; - } - } - } - } - }); + // task that pings clients to see if they are responding + let mut ping_task = ping_task::new( + self.clone(), + sender_tx.clone(), + consumer_channel.clone(), + producer_channel.clone(), + timer_rx, + ); let active = self .client_manager @@ -271,7 +164,7 @@ impl Server { } _ = &mut resolver_task => { } _ = &mut udp_task => { } - _ = &mut live_tracker => { } + _ = &mut ping_task => { } } let _ = consumer_channel.send(ConsumerEvent::Terminate).await; @@ -297,7 +190,7 @@ impl Server { resolver_task.abort(); udp_task.abort(); - live_tracker.abort(); + ping_task.abort(); Ok(()) } diff --git a/src/server/ping_task.rs b/src/server/ping_task.rs new file mode 100644 index 0000000..6fe0f17 --- /dev/null +++ b/src/server/ping_task.rs @@ -0,0 +1,131 @@ +use std::{net::SocketAddr, time::Duration}; + +use tokio::{ + sync::mpsc::{Receiver, Sender}, + task::JoinHandle, +}; + +use crate::{client::ClientHandle, event::Event}; + +use super::{consumer_task::ConsumerEvent, producer_task::ProducerEvent, Server, State}; + +const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500); + +pub fn new( + server: Server, + sender_ch: Sender<(Event, SocketAddr)>, + consumer_notify: Sender, + producer_notify: Sender, + mut timer_rx: Receiver<()>, +) -> JoinHandle<()> { + // timer task + let ping_task = tokio::task::spawn_local(async move { + loop { + // wait for wake up signal + let Some(_): Option<()> = timer_rx.recv().await else { + break; + }; + loop { + let receiving = server.state.get() == State::Receiving; + let (ping_clients, ping_addrs) = { + let mut client_manager = server.client_manager.borrow_mut(); + + let ping_clients: Vec = if receiving { + // if receiving we care about clients with pressed keys + client_manager + .get_client_states_mut() + .filter(|s| !s.pressed_keys.is_empty()) + .map(|s| s.client.handle) + .collect() + } else { + // if sending we care about the active client + server.active_client.get().iter().cloned().collect() + }; + + // get relevant socket addrs for clients + let ping_addrs: Vec = { + ping_clients + .iter() + .flat_map(|&c| client_manager.get(c)) + .flat_map(|state| { + if state.alive && state.active_addr.is_some() { + vec![state.active_addr.unwrap()] + } else { + state + .client + .ips + .iter() + .cloned() + .map(|ip| SocketAddr::new(ip, state.client.port)) + .collect() + } + }) + .collect() + }; + + // reset alive + for state in client_manager.get_client_states_mut() { + state.alive = false; + } + + (ping_clients, ping_addrs) + }; + + if receiving && ping_clients.is_empty() { + // receiving and no client has pressed keys + // -> no need to keep pinging + break; + } + + // ping clients + for addr in ping_addrs { + if sender_ch.send((Event::Ping(), addr)).await.is_err() { + break; + } + } + + // give clients time to resond + if receiving { + log::debug!("waiting {MAX_RESPONSE_TIME:?} for response from client with pressed keys ..."); + } else { + log::debug!( + "state: {:?} => waiting {MAX_RESPONSE_TIME:?} for client to respond ...", + server.state.get() + ); + } + + tokio::time::sleep(MAX_RESPONSE_TIME).await; + + // when anything is received from a client, + // the alive flag gets set + let unresponsive_clients: Vec<_> = { + let client_manager = server.client_manager.borrow(); + ping_clients + .iter() + .filter_map(|&c| match client_manager.get(c) { + Some(state) if !state.alive => Some(c), + _ => None, + }) + .collect() + }; + + // we may not be receiving anymore but we should respond + // to the original state and not the "new" one + if receiving { + for c in unresponsive_clients { + log::warn!("device not responding, releasing keys!"); + let _ = consumer_notify.send(ConsumerEvent::ReleaseKeys(c)).await; + } + } else { + // release pointer if the active client has not responded + if !unresponsive_clients.is_empty() { + log::warn!("client not responding, releasing pointer!"); + server.state.replace(State::Receiving); + let _ = producer_notify.send(ProducerEvent::Release).await; + } + } + } + } + }); + ping_task +}