From 5fc02d471b56877854ebe63b1c10a3107215feb5 Mon Sep 17 00:00:00 2001 From: Ferdinand Schober Date: Fri, 15 Dec 2023 20:24:14 +0100 Subject: [PATCH] cleanup server code --- src/backend/producer/wayland.rs | 1 - src/frontend.rs | 2 - src/server.rs | 89 +++++++++++---------------------- 3 files changed, 30 insertions(+), 62 deletions(-) diff --git a/src/backend/producer/wayland.rs b/src/backend/producer/wayland.rs index d75e83e..8e483ac 100644 --- a/src/backend/producer/wayland.rs +++ b/src/backend/producer/wayland.rs @@ -566,7 +566,6 @@ impl Stream for WaylandEventProducer { type Item = io::Result<(ClientHandle, Event)>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - log::trace!("producer.next()"); if let Some(event) = self.0.get_mut().state.pending_events.pop_front() { return Poll::Ready(Some(Ok(event))); } diff --git a/src/frontend.rs b/src/frontend.rs index 1b48f8d..de93a5d 100644 --- a/src/frontend.rs +++ b/src/frontend.rs @@ -201,8 +201,6 @@ impl FrontendListener { #[cfg(unix)] pub async fn accept(&mut self) -> Result> { - log::trace!("frontend.accept()"); - let stream = self.listener.accept().await?.0; let (rx, tx) = tokio::io::split(stream); self.tx_streams.push(tx); diff --git a/src/server.rs b/src/server.rs index 8d89b4d..2bc2fa2 100644 --- a/src/server.rs +++ b/src/server.rs @@ -32,8 +32,6 @@ use crate::{ producer::EventProducer, }; -/// keeps track of state to prevent a feedback loop -/// of continuously sending and receiving the same event. #[derive(Eq, PartialEq)] enum State { Sending, @@ -93,11 +91,9 @@ impl Server { pub async fn run(&mut self) -> anyhow::Result<()> { loop { - log::trace!("polling ..."); tokio::select! { // safety: cancellation safe udp_event = receive_event(&self.socket) => { - log::trace!("-> receive_event"); match udp_event { Ok(e) => self.handle_udp_rx(e).await, Err(e) => log::error!("error reading event: {e}"), @@ -105,7 +101,6 @@ impl Server { } // safety: cancellation safe res = self.producer.next() => { - log::trace!("-> producer.next()"); match res { Some(Ok((client, event))) => { self.handle_producer_event(client,event).await; @@ -116,7 +111,6 @@ impl Server { } // safety: cancellation safe stream = self.frontend.accept() => { - log::trace!("-> frontend.accept()"); match stream { Ok(s) => self.handle_frontend_stream(s).await, Err(e) => log::error!("error connecting to frontend: {e}"), @@ -124,7 +118,6 @@ impl Server { } // safety: cancellation safe frontend_event = self.frontend_rx.recv() => { - log::trace!("-> frontend.recv()"); if let Some(event) = frontend_event { if self.handle_frontend_event(event).await { break; @@ -133,7 +126,6 @@ impl Server { } // safety: cancellation safe e = self.consumer.dispatch() => { - log::trace!("-> consumer.dispatch()"); e?; } // safety: cancellation safe @@ -314,43 +306,35 @@ impl Server { state.last_seen = Some(Instant::now()); // set addr as new default for this client state.client.active_addr = Some(addr); + match (event, addr) { - (Event::Pong(), _) => {} + (Event::Pong(), _) => {} // ignore pong events (Event::Ping(), addr) => { if let Err(e) = send_event(&self.socket, Event::Pong(), addr).await { log::error!("udp send: {}", e); } - // we release the mouse here, - // since its very likely, that we wont get a release event - self.producer.release(); } - (event, addr) => match self.state { - State::Sending => { - // in sending state, we dont want to process - // any events to avoid feedback loops, - // therefore we tell the event producer - // to release the pointer and move on - // first event -> release pointer - if let Event::Release() = event { - log::debug!("releasing pointer ..."); - self.producer.release(); - self.state = State::Receiving; - } + (event, addr) => { + // device is sending events => release pointer if captured + if self.state == State::Sending { + log::debug!("releasing pointer ..."); + self.producer.release(); + self.state = State::Receiving; } - State::Receiving => { - // consume event - self.consumer.consume(event, handle).await; - // let the server know we are still alive once every second - let last_replied = state.last_replied; - if last_replied.is_none() - || last_replied.is_some() - && last_replied.unwrap().elapsed() > Duration::from_secs(1) - { - state.last_replied = Some(Instant::now()); - if let Err(e) = send_event(&self.socket, Event::Pong(), addr).await { - log::error!("udp send: {}", e); - } + // consume event + self.consumer.consume(event, handle).await; + log::trace!("{event:?} => consumer"); + + // let the server know we are still alive once every second + let last_replied = state.last_replied; + if last_replied.is_none() + || last_replied.is_some() + && last_replied.unwrap().elapsed() > Duration::from_secs(1) + { + state.last_replied = Some(Instant::now()); + if let Err(e) = send_event(&self.socket, Event::Pong(), addr).await { + log::error!("udp send: {}", e); } } }, @@ -358,14 +342,9 @@ impl Server { } async fn handle_producer_event(&mut self, c: ClientHandle, e: Event) { - let mut should_release = false; - // in receiving state, only release events - // must be transmitted - if let Event::Release() = e { - self.state = State::Sending; - } - log::trace!("producer: ({c}) {e:?}"); + + // get client state for handle let state = match self.client_manager.get_mut(c) { Some(state) => state, None => { @@ -373,7 +352,11 @@ impl Server { return; } }; - // otherwise we should have an address to send to + + // we are sending events + self.state = State::Sending; + + // otherwise we should have an address to // transmit events to the corrensponding client if let Some(addr) = state.client.active_addr { if let Err(e) = send_event(&self.socket, e, addr).await { @@ -399,7 +382,8 @@ impl Server { // release mouse if client didnt respond to the first ping if state.last_ping.is_some() && state.last_ping.unwrap().elapsed() < Duration::from_secs(1) { - should_release = true; + log::info!("client not responding - releasing pointer"); + self.producer.release(); } // last ping > 500ms ago -> ping all interfaces @@ -411,18 +395,6 @@ impl Server { log::error!("udp send: {}", e); } } - // send additional release event, in case client is still in sending mode - if let Err(e) = send_event(&self.socket, Event::Release(), *addr).await { - if e.kind() != ErrorKind::WouldBlock { - log::error!("udp send: {}", e); - } - } - } - - if should_release && self.state != State::Receiving { - log::info!("client not responding - releasing pointer"); - self.producer.release(); - self.state = State::Receiving; } } @@ -544,7 +516,6 @@ impl Server { async fn receive_event( socket: &UdpSocket, ) -> std::result::Result<(Event, SocketAddr), Box> { - log::trace!("receive_event"); let mut buf = vec![0u8; 22]; match socket.recv_from(&mut buf).await { Ok((_amt, src)) => Ok((Event::try_from(buf)?, src)),