diff --git a/src/backend/producer/dummy.rs b/src/backend/producer/dummy.rs index 25317ca..c0c7f71 100644 --- a/src/backend/producer/dummy.rs +++ b/src/backend/producer/dummy.rs @@ -24,9 +24,13 @@ impl Default for DummyProducer { } impl EventProducer for DummyProducer { - fn notify(&mut self, _: ClientEvent) {} + fn notify(&mut self, _event: ClientEvent) -> io::Result<()> { + Ok(()) + } - fn release(&mut self) {} + fn release(&mut self) -> io::Result<()> { + Ok(()) + } } impl Stream for DummyProducer { diff --git a/src/backend/producer/libei.rs b/src/backend/producer/libei.rs index 16c0eb0..5e2872c 100644 --- a/src/backend/producer/libei.rs +++ b/src/backend/producer/libei.rs @@ -3,7 +3,11 @@ use std::{io, task::Poll}; use futures_core::Stream; -use crate::{client::ClientHandle, event::Event, producer::EventProducer}; +use crate::{ + client::{ClientEvent, ClientHandle}, + event::Event, + producer::EventProducer, +}; pub struct LibeiProducer {} @@ -14,9 +18,13 @@ impl LibeiProducer { } impl EventProducer for LibeiProducer { - fn notify(&mut self, _event: crate::client::ClientEvent) {} + fn notify(&mut self, _event: ClientEvent) -> io::Result<()> { + Ok(()) + } - fn release(&mut self) {} + fn release(&mut self) -> io::Result<()> { + Ok(()) + } } impl Stream for LibeiProducer { diff --git a/src/backend/producer/macos.rs b/src/backend/producer/macos.rs index aa72227..bf96d2e 100644 --- a/src/backend/producer/macos.rs +++ b/src/backend/producer/macos.rs @@ -23,7 +23,11 @@ impl Stream for MacOSProducer { } impl EventProducer for MacOSProducer { - fn notify(&mut self, _event: ClientEvent) {} + fn notify(&mut self, _event: ClientEvent) -> io::Result<()> { + Ok(()) + } - fn release(&mut self) {} + fn release(&mut self) -> io::Result<()> { + Ok(()) + } } diff --git a/src/backend/producer/wayland.rs b/src/backend/producer/wayland.rs index d386acf..f4de5ea 100644 --- a/src/backend/producer/wayland.rs +++ b/src/backend/producer/wayland.rs @@ -527,24 +527,25 @@ impl Inner { } } - fn flush_events(&mut self) { + fn flush_events(&mut self) -> io::Result<()> { // flush outgoing events match self.queue.flush() { Ok(_) => (), Err(e) => match e { WaylandError::Io(e) => { - log::error!("error writing to wayland socket: {e}") + return Err(e); } WaylandError::Protocol(e) => { panic!("wayland protocol violation: {e}") } }, } + Ok(()) } } impl EventProducer for WaylandEventProducer { - fn notify(&mut self, client_event: ClientEvent) { + fn notify(&mut self, client_event: ClientEvent) -> io::Result<()> { match client_event { ClientEvent::Create(handle, pos) => { self.0.get_mut().state.add_client(handle, pos); @@ -564,14 +565,14 @@ impl EventProducer for WaylandEventProducer { } } let inner = self.0.get_mut(); - inner.flush_events(); + inner.flush_events() } - fn release(&mut self) { + fn release(&mut self) -> io::Result<()> { log::debug!("releasing pointer"); let inner = self.0.get_mut(); inner.state.ungrab(); - inner.flush_events(); + inner.flush_events() } } @@ -602,7 +603,11 @@ impl Stream for WaylandEventProducer { inner.dispatch_events(); // flush outgoing events - inner.flush_events(); + if let Err(e) = inner.flush_events() { + if e.kind() != ErrorKind::WouldBlock { + return Poll::Ready(Some(Err(e))); + } + } // prepare for the next read match inner.prepare_read() { diff --git a/src/backend/producer/windows.rs b/src/backend/producer/windows.rs index 592018c..6cffab4 100644 --- a/src/backend/producer/windows.rs +++ b/src/backend/producer/windows.rs @@ -12,9 +12,13 @@ use crate::{ pub struct WindowsProducer {} impl EventProducer for WindowsProducer { - fn notify(&mut self, _: ClientEvent) {} + fn notify(&mut self, _event: ClientEvent) -> io::Result<()> { + Ok(()) + } - fn release(&mut self) {} + fn release(&mut self) -> io::Result<()> { + Ok(()) + } } impl WindowsProducer { diff --git a/src/backend/producer/x11.rs b/src/backend/producer/x11.rs index d6d7b23..1811e2c 100644 --- a/src/backend/producer/x11.rs +++ b/src/backend/producer/x11.rs @@ -18,9 +18,13 @@ impl X11Producer { } impl EventProducer for X11Producer { - fn notify(&mut self, _: ClientEvent) {} + fn notify(&mut self, _event: ClientEvent) -> io::Result<()> { + Ok(()) + } - fn release(&mut self) {} + fn release(&mut self) -> io::Result<()> { + Ok(()) + } } impl Stream for X11Producer { diff --git a/src/producer.rs b/src/producer.rs index 1c6acf2..24a2ca3 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -54,8 +54,8 @@ pub async fn create() -> Box { pub trait EventProducer: Stream> + Unpin { /// notify event producer of configuration changes - fn notify(&mut self, event: ClientEvent); + fn notify(&mut self, event: ClientEvent) -> io::Result<()>; /// release mouse - fn release(&mut self); + fn release(&mut self) -> io::Result<()>; } diff --git a/src/server.rs b/src/server.rs index db2093e..b62275f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -132,18 +132,18 @@ impl Server { event = producer.next() => { let event = event.ok_or(anyhow!("event producer closed"))??; log::debug!("producer event: {event:?}"); - server.handle_producer_event(&mut producer, &sender_ch, &timer_ch, event).await; + server.handle_producer_event(&mut producer, &sender_ch, &timer_ch, event).await?; } e = producer_notify_rx.recv() => { log::debug!("producer notify rx: {e:?}"); match e { Some(e) => match e { ProducerEvent::Release => { - producer.release(); + producer.release()?; server.state.replace(State::Receiving); } - ProducerEvent::ClientEvent(e) => producer.notify(e), + ProducerEvent::ClientEvent(e) => producer.notify(e)?, ProducerEvent::Terminate => break, }, None => break, @@ -426,35 +426,47 @@ impl Server { tokio::select! { _ = signal::ctrl_c() => { log::info!("terminating service"); - }, - _ = &mut producer_task => { - // TODO restart producer? } - _ = &mut consumer_task => { - // TODO restart producer? + e = &mut producer_task => { + if let Ok(Err(e)) = e { + log::error!("error in event producer: {e}"); + } } - _ = &mut frontend_task => { - // frontend exited => exit requested + e = &mut consumer_task => { + if let Ok(Err(e)) = e { + log::error!("error in event consumer: {e}"); + } } - _ = &mut resolver_task => { - // resolver exited - } - _ = &mut udp_task => { - // udp exited - } - _ = &mut live_tracker => { - // live tracker exited + e = &mut frontend_task => { + if let Ok(Err(e)) = e { + log::error!("error in frontend listener: {e}"); + } } + _ = &mut resolver_task => { } + _ = &mut udp_task => { } + _ = &mut live_tracker => { } } let _ = consumer_notify_tx.send(ConsumerEvent::Terminate).await; let _ = producer_notify_tx.send(ProducerEvent::Terminate).await; let _ = frontend_tx.send(FrontendEvent::Shutdown()).await; - let (a, b, c) = tokio::join!(producer_task, consumer_task, frontend_task); - a??; - b??; - c??; + if !producer_task.is_finished() { + if let Err(e) = producer_task.await { + log::error!("error in event producer: {e}"); + } + } + if !consumer_task.is_finished() { + if let Err(e) = consumer_task.await { + log::error!("error in event consumer: {e}"); + } + } + + if !frontend_task.is_finished() { + if let Err(e) = frontend_task.await { + log::error!("error in frontend listener: {e}"); + } + } resolver_task.abort(); udp_task.abort(); @@ -765,7 +777,7 @@ impl Server { sender_tx: &Sender<(Event, SocketAddr)>, timer_tx: &Sender<()>, event: (ClientHandle, Event), - ) { + ) -> Result<()> { let (c, mut e) = event; log::trace!("producer: ({c}) {e:?}"); @@ -777,7 +789,7 @@ impl Server { }) = e { if mods_depressed == Self::RELEASE_MODIFIERDS { - producer.release(); + producer.release()?; self.state.replace(State::Receiving); log::trace!("STATE ===> Receiving"); // send an event to release all the modifiers @@ -796,10 +808,10 @@ impl Server { None => { // should not happen log::warn!("unknown client!"); - producer.release(); + producer.release()?; self.state.replace(State::Receiving); log::trace!("STATE ===> Receiving"); - return; + return Ok(()); } }; @@ -825,6 +837,7 @@ impl Server { } let _ = sender_tx.send((e, addr)).await; } + Ok(()) } async fn handle_frontend_stream(