mirror of
https://github.com/feschber/lan-mouse.git
synced 2026-04-15 14:31:28 +03:00
hotfix: Dont stall the event loop if udp blocks
This commit is contained in:
@@ -92,25 +92,26 @@ impl Server {
|
|||||||
|
|
||||||
pub async fn run(&mut self) -> anyhow::Result<()> {
|
pub async fn run(&mut self) -> anyhow::Result<()> {
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
log::trace!("polling...");
|
||||||
// safety: cancellation safe
|
tokio::select! { biased;
|
||||||
udp_event = receive_event(&self.socket) => {
|
|
||||||
match udp_event {
|
|
||||||
Ok(e) => self.handle_udp_rx(e).await,
|
|
||||||
Err(e) => log::error!("error reading event: {e}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// safety: cancellation safe
|
// safety: cancellation safe
|
||||||
res = self.producer.next() => {
|
res = self.producer.next() => {
|
||||||
match res {
|
match res {
|
||||||
Some(Ok((client, event))) => {
|
Some(Ok((client, event))) => {
|
||||||
self.handle_producer_event(client,event).await;
|
self.handle_producer_event(client,event).await;
|
||||||
},
|
},
|
||||||
Some(Err(e)) => log::error!("error reading from event producer: {e}"),
|
Some(Err(e)) => return Err(e.into()),
|
||||||
_ => break,
|
_ => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// safety: cancellation safe
|
// safety: cancellation safe
|
||||||
|
udp_event = receive_event(&self.socket) => {
|
||||||
|
match udp_event {
|
||||||
|
Ok(e) => self.handle_udp_rx(e).await,
|
||||||
|
Err(e) => log::error!("error reading event: {e}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// safety: cancellation safe
|
||||||
stream = self.frontend.accept() => {
|
stream = self.frontend.accept() => {
|
||||||
match stream {
|
match stream {
|
||||||
Ok(s) => self.handle_frontend_stream(s).await,
|
Ok(s) => self.handle_frontend_stream(s).await,
|
||||||
@@ -311,14 +312,14 @@ impl Server {
|
|||||||
match (event, addr) {
|
match (event, addr) {
|
||||||
(Event::Pong(), _) => { /* ignore pong events */ }
|
(Event::Pong(), _) => { /* ignore pong events */ }
|
||||||
(Event::Ping(), addr) => {
|
(Event::Ping(), addr) => {
|
||||||
if let Err(e) = send_event(&self.socket, Event::Pong(), addr).await {
|
if let Err(e) = send_event(&self.socket, Event::Pong(), addr) {
|
||||||
log::error!("udp send: {}", e);
|
log::error!("udp send: {}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(event, addr) => {
|
(event, addr) => {
|
||||||
// tell clients that we are ready to receive events
|
// tell clients that we are ready to receive events
|
||||||
if let Event::Enter() = event {
|
if let Event::Enter() = event {
|
||||||
if let Err(e) = send_event(&self.socket, Event::Leave(), addr).await {
|
if let Err(e) = send_event(&self.socket, Event::Leave(), addr) {
|
||||||
log::error!("udp send: {}", e);
|
log::error!("udp send: {}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -363,7 +364,7 @@ impl Server {
|
|||||||
&& state.last_replied.unwrap().elapsed() > Duration::from_secs(1)
|
&& state.last_replied.unwrap().elapsed() > Duration::from_secs(1)
|
||||||
{
|
{
|
||||||
state.last_replied = Some(Instant::now());
|
state.last_replied = Some(Instant::now());
|
||||||
if let Err(e) = send_event(&self.socket, Event::Pong(), addr).await {
|
if let Err(e) = send_event(&self.socket, Event::Pong(), addr) {
|
||||||
log::error!("udp send: {}", e);
|
log::error!("udp send: {}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -411,7 +412,7 @@ impl Server {
|
|||||||
if let State::Receiving | State::AwaitingLeave = self.state {
|
if let State::Receiving | State::AwaitingLeave = self.state {
|
||||||
self.state = State::AwaitingLeave;
|
self.state = State::AwaitingLeave;
|
||||||
if let Some(addr) = state.client.active_addr {
|
if let Some(addr) = state.client.active_addr {
|
||||||
if let Err(e) = send_event(&self.socket, Event::Enter(), addr).await {
|
if let Err(e) = send_event(&self.socket, Event::Enter(), addr) {
|
||||||
log::error!("udp send: {}", e);
|
log::error!("udp send: {}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -420,7 +421,7 @@ impl Server {
|
|||||||
// otherwise we should have an address to
|
// otherwise we should have an address to
|
||||||
// transmit events to the corrensponding client
|
// transmit events to the corrensponding client
|
||||||
if let Some(addr) = state.client.active_addr {
|
if let Some(addr) = state.client.active_addr {
|
||||||
if let Err(e) = send_event(&self.socket, e, addr).await {
|
if let Err(e) = send_event(&self.socket, e, addr) {
|
||||||
log::error!("udp send: {}", e);
|
log::error!("udp send: {}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -461,7 +462,7 @@ impl Server {
|
|||||||
state.last_ping = Some(Instant::now());
|
state.last_ping = Some(Instant::now());
|
||||||
for addr in state.client.addrs.iter() {
|
for addr in state.client.addrs.iter() {
|
||||||
log::debug!("pinging {addr}");
|
log::debug!("pinging {addr}");
|
||||||
if let Err(e) = send_event(&self.socket, Event::Ping(), *addr).await {
|
if let Err(e) = send_event(&self.socket, Event::Ping(), *addr) {
|
||||||
if e.kind() != ErrorKind::WouldBlock {
|
if e.kind() != ErrorKind::WouldBlock {
|
||||||
log::error!("udp send: {}", e);
|
log::error!("udp send: {}", e);
|
||||||
}
|
}
|
||||||
@@ -598,10 +599,11 @@ async fn receive_event(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_event(sock: &UdpSocket, e: Event, addr: SocketAddr) -> Result<usize> {
|
fn send_event(sock: &UdpSocket, e: Event, addr: SocketAddr) -> Result<usize> {
|
||||||
log::trace!("{:20} ------>->->-> {addr}", e.to_string());
|
log::trace!("{:20} ------>->->-> {addr}", e.to_string());
|
||||||
let data: Vec<u8> = (&e).into();
|
let data: Vec<u8> = (&e).into();
|
||||||
// We are currently abusing a blocking send to get the lowest possible latency.
|
// When udp blocks, we dont want to block the event loop.
|
||||||
// It may be better to set the socket to non-blocking and only send when ready.
|
// Dropping events is better than potentially crashing the event
|
||||||
sock.send_to(&data[..], addr).await
|
// producer.
|
||||||
|
sock.try_send_to(&data, addr)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user