From 4c30f032f447d6c06366adf2ff74e07f05bd480d Mon Sep 17 00:00:00 2001 From: Ferdinand Schober Date: Thu, 11 Jul 2024 16:47:36 +0200 Subject: [PATCH] cleanup --- src/server.rs | 18 +-- src/server/frontend_task.rs | 34 +++--- src/server/ping_task.rs | 219 +++++++++++++++++++----------------- 3 files changed, 142 insertions(+), 129 deletions(-) diff --git a/src/server.rs b/src/server.rs index 3f5709e..751da61 100644 --- a/src/server.rs +++ b/src/server.rs @@ -97,7 +97,7 @@ impl Server { } }; - let timer_notify = Arc::new(Notify::new()); + let timer_notify = Arc::new(Notify::new()); /* notify ping timer restart */ let (frontend_tx, frontend_rx) = channel(1); /* events coming from frontends */ let cancellation_token = CancellationToken::new(); /* notify termination */ let notify_capture = Arc::new(Notify::new()); /* notify capture restart */ @@ -166,6 +166,7 @@ impl Server { emulate_channel.clone(), capture_channel.clone(), timer_notify, + cancellation_token.clone(), ); let active = self @@ -196,11 +197,7 @@ impl Server { } _ = &mut capture_task => { } _ = &mut emulation_task => { } - e = &mut frontend_task => { - if let Ok(Err(e)) = e { - log::error!("error in frontend listener: {e}"); - } - } + _ = &mut frontend_task => { } _ = &mut resolver_task => { } _ = &mut udp_task => { } _ = &mut ping_task => { } @@ -209,9 +206,14 @@ impl Server { // cancel tasks cancellation_token.cancel(); - let _ = join!(capture_task, emulation_task, frontend_task, udp_task); + let _ = join!( + capture_task, + emulation_task, + frontend_task, + udp_task, + resolver_task + ); - resolver_task.abort(); ping_task.abort(); Ok(()) diff --git a/src/server/frontend_task.rs b/src/server/frontend_task.rs index ee80239..6d2a9c2 100644 --- a/src/server/frontend_task.rs +++ b/src/server/frontend_task.rs @@ -10,7 +10,6 @@ use tokio::net::UnixStream; #[cfg(windows)] use tokio::net::TcpStream; -use anyhow::{anyhow, Result}; use tokio::{ io::ReadHalf, sync::{ @@ -32,7 +31,7 @@ use super::{ pub(crate) fn new( mut frontend: FrontendListener, - mut notify_rx: Receiver, + mut event: Receiver, server: Server, notify_capture: Arc, notify_emulation: Arc, @@ -41,32 +40,28 @@ pub(crate) fn new( resolve_ch: Sender, port_tx: Sender, cancellation_token: CancellationToken, -) -> (JoinHandle>, Sender) { - let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(32); - let event_tx_clone = event_tx.clone(); +) -> (JoinHandle<()>, Sender) { + let (request_tx, mut request) = tokio::sync::mpsc::channel(32); + let request_tx_clone = request_tx.clone(); let frontend_task = tokio::task::spawn_local(async move { let mut join_handles = vec![]; loop { tokio::select! { stream = frontend.accept() => { - let stream = match stream { - Ok(s) => s, - Err(e) => { - log::warn!("error accepting frontend connection: {e}"); - continue; - } + match stream { + Ok(s) => join_handles.push(handle_frontend_stream(&request_tx_clone, s, cancellation_token.clone())), + Err(e) => log::warn!("error accepting frontend connection: {e}"), }; - join_handles.push(handle_frontend_stream(&event_tx_clone, stream, cancellation_token.clone())); } - event = event_rx.recv() => { - let frontend_event = event.ok_or(anyhow!("frontend channel closed"))?; - if handle_frontend_event(&server, ¬ify_capture, ¬ify_emulation, &capture, &emulate, &resolve_ch, &mut frontend, &port_tx, frontend_event).await { + request = request.recv() => { + let request = request.expect("frontend request channel closed"); + if handle_frontend_event(&server, ¬ify_capture, ¬ify_emulation, &capture, &emulate, &resolve_ch, &mut frontend, &port_tx, request).await { break; } } - notify = notify_rx.recv() => { - let notify = notify.ok_or(anyhow!("frontend notify closed"))?; - let _ = frontend.broadcast_event(notify).await; + event = event.recv() => { + let event = event.expect("channel closed"); + let _ = frontend.broadcast_event(event).await; } _ = cancellation_token.cancelled() => { futures::future::join_all(join_handles).await; @@ -74,9 +69,8 @@ pub(crate) fn new( } } } - anyhow::Ok(()) }); - (frontend_task, event_tx) + (frontend_task, request_tx) } fn handle_frontend_stream( diff --git a/src/server/ping_task.rs b/src/server/ping_task.rs index 0e7189f..67870c1 100644 --- a/src/server/ping_task.rs +++ b/src/server/ping_task.rs @@ -6,6 +6,7 @@ use tokio::{ }; use input_event::Event; +use tokio_util::sync::CancellationToken; use crate::client::ClientHandle; @@ -19,111 +20,127 @@ pub fn new( emulate_notify: Sender, capture_notify: Sender, timer_notify: Arc, + cancellation_token: CancellationToken, ) -> JoinHandle<()> { // timer task let ping_task = tokio::task::spawn_local(async move { - loop { - // wait for wake up signal - timer_notify.notified().await; - loop { - let receiving = server.state.get() == State::Receiving; - let (ping_clients, ping_addrs) = { - let mut client_manager = server.client_manager.borrow_mut(); - - let ping_clients: Vec = if receiving { - // if receiving we care about clients with pressed keys - client_manager - .get_client_states_mut() - .filter(|(_, (_, s))| !s.pressed_keys.is_empty()) - .map(|(h, _)| h) - .collect() - } else { - // if sending we care about the active client - server.active_client.get().iter().cloned().collect() - }; - - // get relevant socket addrs for clients - let ping_addrs: Vec = { - ping_clients - .iter() - .flat_map(|&h| client_manager.get(h)) - .flat_map(|(c, s)| { - if s.alive && s.active_addr.is_some() { - vec![s.active_addr.unwrap()] - } else { - s.ips - .iter() - .cloned() - .map(|ip| SocketAddr::new(ip, c.port)) - .collect() - } - }) - .collect() - }; - - // reset alive - for (_, (_, s)) in client_manager.get_client_states_mut() { - s.alive = false; - } - - (ping_clients, ping_addrs) - }; - - if receiving && ping_clients.is_empty() { - // receiving and no client has pressed keys - // -> no need to keep pinging - break; - } - - // ping clients - for addr in ping_addrs { - if sender_ch.send((Event::Ping(), addr)).await.is_err() { - break; - } - } - - // give clients time to resond - if receiving { - log::trace!("waiting {MAX_RESPONSE_TIME:?} for response from client with pressed keys ..."); - } else { - log::trace!( - "state: {:?} => waiting {MAX_RESPONSE_TIME:?} for client to respond ...", - server.state.get() - ); - } - - tokio::time::sleep(MAX_RESPONSE_TIME).await; - - // when anything is received from a client, - // the alive flag gets set - let unresponsive_clients: Vec<_> = { - let client_manager = server.client_manager.borrow(); - ping_clients - .iter() - .filter_map(|&h| match client_manager.get(h) { - Some((_, s)) if !s.alive => Some(h), - _ => None, - }) - .collect() - }; - - // we may not be receiving anymore but we should respond - // to the original state and not the "new" one - if receiving { - for h in unresponsive_clients { - log::warn!("device not responding, releasing keys!"); - let _ = emulate_notify.send(EmulationEvent::ReleaseKeys(h)).await; - } - } else { - // release pointer if the active client has not responded - if !unresponsive_clients.is_empty() { - log::warn!("client not responding, releasing pointer!"); - server.state.replace(State::Receiving); - let _ = capture_notify.send(CaptureEvent::Release).await; - } - } - } + tokio::select! { + _ = cancellation_token.cancelled() => {} + _ = ping_task(server, sender_ch, emulate_notify, capture_notify, timer_notify) => {} } }); ping_task } + +async fn ping_task( + server: Server, + sender_ch: Sender<(Event, SocketAddr)>, + emulate_notify: Sender, + capture_notify: Sender, + timer_notify: Arc, +) { + loop { + // wait for wake up signal + timer_notify.notified().await; + loop { + let receiving = server.state.get() == State::Receiving; + let (ping_clients, ping_addrs) = { + let mut client_manager = server.client_manager.borrow_mut(); + + let ping_clients: Vec = if receiving { + // if receiving we care about clients with pressed keys + client_manager + .get_client_states_mut() + .filter(|(_, (_, s))| !s.pressed_keys.is_empty()) + .map(|(h, _)| h) + .collect() + } else { + // if sending we care about the active client + server.active_client.get().iter().cloned().collect() + }; + + // get relevant socket addrs for clients + let ping_addrs: Vec = { + ping_clients + .iter() + .flat_map(|&h| client_manager.get(h)) + .flat_map(|(c, s)| { + if s.alive && s.active_addr.is_some() { + vec![s.active_addr.unwrap()] + } else { + s.ips + .iter() + .cloned() + .map(|ip| SocketAddr::new(ip, c.port)) + .collect() + } + }) + .collect() + }; + + // reset alive + for (_, (_, s)) in client_manager.get_client_states_mut() { + s.alive = false; + } + + (ping_clients, ping_addrs) + }; + + if receiving && ping_clients.is_empty() { + // receiving and no client has pressed keys + // -> no need to keep pinging + break; + } + + // ping clients + for addr in ping_addrs { + if sender_ch.send((Event::Ping(), addr)).await.is_err() { + break; + } + } + + // give clients time to resond + if receiving { + log::trace!( + "waiting {MAX_RESPONSE_TIME:?} for response from client with pressed keys ..." + ); + } else { + log::trace!( + "state: {:?} => waiting {MAX_RESPONSE_TIME:?} for client to respond ...", + server.state.get() + ); + } + + tokio::time::sleep(MAX_RESPONSE_TIME).await; + + // when anything is received from a client, + // the alive flag gets set + let unresponsive_clients: Vec<_> = { + let client_manager = server.client_manager.borrow(); + ping_clients + .iter() + .filter_map(|&h| match client_manager.get(h) { + Some((_, s)) if !s.alive => Some(h), + _ => None, + }) + .collect() + }; + + // we may not be receiving anymore but we should respond + // to the original state and not the "new" one + if receiving { + for h in unresponsive_clients { + log::warn!("device not responding, releasing keys!"); + let _ = emulate_notify.send(EmulationEvent::ReleaseKeys(h)).await; + } + } else { + // release pointer if the active client has not responded + if !unresponsive_clients.is_empty() { + log::warn!("client not responding, releasing pointer!"); + server.state.replace(State::Receiving); + let _ = capture_notify.send(CaptureEvent::Release).await; + } + } + } + } +}