From 1cefa3854319c172eaaca9eb62eb08b14514bda2 Mon Sep 17 00:00:00 2001 From: Ferdinand Schober Date: Fri, 22 Dec 2023 17:45:20 +0100 Subject: [PATCH] hotfix: Dont stall the event loop if udp blocks --- src/server.rs | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/src/server.rs b/src/server.rs index b01261c..7541d2c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -92,25 +92,26 @@ impl Server { pub async fn run(&mut self) -> anyhow::Result<()> { loop { - tokio::select! { - // safety: cancellation safe - udp_event = receive_event(&self.socket) => { - match udp_event { - Ok(e) => self.handle_udp_rx(e).await, - Err(e) => log::error!("error reading event: {e}"), - } - } + log::trace!("polling..."); + tokio::select! { biased; // safety: cancellation safe res = self.producer.next() => { match res { Some(Ok((client, event))) => { self.handle_producer_event(client,event).await; }, - Some(Err(e)) => log::error!("error reading from event producer: {e}"), + Some(Err(e)) => return Err(e.into()), _ => break, } } // safety: cancellation safe + udp_event = receive_event(&self.socket) => { + match udp_event { + Ok(e) => self.handle_udp_rx(e).await, + Err(e) => log::error!("error reading event: {e}"), + } + } + // safety: cancellation safe stream = self.frontend.accept() => { match stream { Ok(s) => self.handle_frontend_stream(s).await, @@ -311,14 +312,14 @@ impl Server { match (event, addr) { (Event::Pong(), _) => { /* ignore pong events */ } (Event::Ping(), addr) => { - if let Err(e) = send_event(&self.socket, Event::Pong(), addr).await { + if let Err(e) = send_event(&self.socket, Event::Pong(), addr) { log::error!("udp send: {}", e); } } (event, addr) => { // tell clients that we are ready to receive events if let Event::Enter() = event { - if let Err(e) = send_event(&self.socket, Event::Leave(), addr).await { + if let Err(e) = send_event(&self.socket, Event::Leave(), addr) { log::error!("udp send: {}", e); } } @@ -363,7 +364,7 @@ impl Server { && state.last_replied.unwrap().elapsed() > Duration::from_secs(1) { state.last_replied = Some(Instant::now()); - if let Err(e) = send_event(&self.socket, Event::Pong(), addr).await { + if let Err(e) = send_event(&self.socket, Event::Pong(), addr) { log::error!("udp send: {}", e); } } @@ -411,7 +412,7 @@ impl Server { if let State::Receiving | State::AwaitingLeave = self.state { self.state = State::AwaitingLeave; if let Some(addr) = state.client.active_addr { - if let Err(e) = send_event(&self.socket, Event::Enter(), addr).await { + if let Err(e) = send_event(&self.socket, Event::Enter(), addr) { log::error!("udp send: {}", e); } } @@ -420,7 +421,7 @@ impl Server { // otherwise we should have an address to // transmit events to the corrensponding client if let Some(addr) = state.client.active_addr { - if let Err(e) = send_event(&self.socket, e, addr).await { + if let Err(e) = send_event(&self.socket, e, addr) { log::error!("udp send: {}", e); } } @@ -461,7 +462,7 @@ impl Server { state.last_ping = Some(Instant::now()); for addr in state.client.addrs.iter() { log::debug!("pinging {addr}"); - if let Err(e) = send_event(&self.socket, Event::Ping(), *addr).await { + if let Err(e) = send_event(&self.socket, Event::Ping(), *addr) { if e.kind() != ErrorKind::WouldBlock { log::error!("udp send: {}", e); } @@ -598,10 +599,11 @@ async fn receive_event( } } -async fn send_event(sock: &UdpSocket, e: Event, addr: SocketAddr) -> Result { +fn send_event(sock: &UdpSocket, e: Event, addr: SocketAddr) -> Result { log::trace!("{:20} ------>->->-> {addr}", e.to_string()); let data: Vec = (&e).into(); - // We are currently abusing a blocking send to get the lowest possible latency. - // It may be better to set the socket to non-blocking and only send when ready. - sock.send_to(&data[..], addr).await + // When udp blocks, we dont want to block the event loop. + // Dropping events is better than potentially crashing the event + // producer. + sock.try_send_to(&data, addr) }