diff --git a/input-capture/src/error.rs b/input-capture/src/error.rs index 896f4e7..449775b 100644 --- a/input-capture/src/error.rs +++ b/input-capture/src/error.rs @@ -1,3 +1,4 @@ +use reis::tokio::EiConvertEventStreamError; use thiserror::Error; #[cfg(all(unix, feature = "wayland", not(target_os = "macos")))] @@ -9,6 +10,39 @@ use wayland_client::{ ConnectError, DispatchError, }; +#[cfg(all(unix, feature = "libei", not(target_os = "macos")))] +use reis::tokio::HandshakeError; + +#[cfg(all(unix, feature = "libei", not(target_os = "macos")))] +#[derive(Debug, Error)] +#[error("error in libei stream: {inner:?}")] +pub struct ReisConvertEventStreamError { + inner: EiConvertEventStreamError, +} + +#[cfg(all(unix, feature = "libei", not(target_os = "macos")))] +impl From for ReisConvertEventStreamError { + fn from(e: EiConvertEventStreamError) -> Self { + Self { inner: e } + } +} + +#[derive(Debug, Error)] +pub enum CaptureError { + #[error("activation stream closed unexpectedly")] + ActivationClosed, + #[error("libei stream was closed")] + EndOfStream, + #[error("io error: `{0}`")] + Io(#[from] std::io::Error), + #[error("error in libei stream: `{0}`")] + Reis(#[from] ReisConvertEventStreamError), + #[error("libei handshake failed: `{0}`")] + Handshake(#[from] HandshakeError), + #[error(transparent)] + Portal(#[from] ashpd::Error), +} + #[derive(Debug, Error)] pub enum CaptureCreationError { #[error("no backend available")] diff --git a/input-capture/src/lib.rs b/input-capture/src/lib.rs index f24f166..90ed483 100644 --- a/input-capture/src/lib.rs +++ b/input-capture/src/lib.rs @@ -4,7 +4,7 @@ use futures_core::Stream; use input_event::Event; -use self::error::CaptureCreationError; +pub use error::{CaptureCreationError, CaptureError}; pub mod error; diff --git a/input-capture/src/libei.rs b/input-capture/src/libei.rs index d2e84ef..0b19203 100644 --- a/input-capture/src/libei.rs +++ b/input-capture/src/libei.rs @@ -1,4 +1,3 @@ -use anyhow::{anyhow, Result}; use ashpd::{ desktop::{ input_capture::{Activated, Barrier, BarrierID, Capabilities, InputCapture, Region, Zones}, @@ -32,6 +31,8 @@ use once_cell::sync::Lazy; use input_event::{Event, KeyboardEvent, PointerEvent}; +use crate::error::{CaptureError, ReisConvertEventStreamError}; + use super::{ error::LibeiCaptureCreationError, CaptureHandle, InputCapture as LanMouseInputCapture, Position, }; @@ -46,7 +47,7 @@ enum ProducerEvent { #[allow(dead_code)] pub struct LibeiInputCapture<'a> { input_capture: Pin>>, - libei_task: JoinHandle>, + libei_task: JoinHandle>, event_rx: tokio::sync::mpsc::Receiver<(CaptureHandle, Event)>, notify_tx: tokio::sync::mpsc::Sender, } @@ -108,7 +109,7 @@ async fn update_barriers( session: &Session<'_>, active_clients: &Vec<(CaptureHandle, Position)>, next_barrier_id: &mut u32, -) -> Result> { +) -> Result, ashpd::Error> { let zones = input_capture.zones(session).await?.response()?; log::debug!("zones: {zones:?}"); @@ -154,7 +155,7 @@ async fn create_session<'a>( async fn connect_to_eis( input_capture: &InputCapture<'_>, session: &Session<'_>, -) -> Result<(ei::Context, EiConvertEventStream)> { +) -> Result<(ei::Context, EiConvertEventStream), CaptureError> { log::debug!("connect_to_eis"); let fd = input_capture.connect_to_eis(session).await?; @@ -165,17 +166,13 @@ async fn connect_to_eis( // create ei context let context = ei::Context::new(stream)?; let mut event_stream = EiEventStream::new(context.clone())?; - let response = match reis::tokio::ei_handshake( + let response = reis::tokio::ei_handshake( &mut event_stream, "de.feschber.LanMouse", ei::handshake::ContextType::Receiver, &INTERFACES, ) - .await - { - Ok(res) => res, - Err(e) => return Err(anyhow!("ei handshake failed: {e:?}")), - }; + .await?; let event_stream = EiConvertEventStream::new(event_stream, response.serial); Ok((context, event_stream)) @@ -186,13 +183,13 @@ async fn libei_event_handler( context: ei::Context, event_tx: Sender<(CaptureHandle, Event)>, current_client: Rc>>, -) -> Result<()> { +) -> Result<(), CaptureError> { loop { - let ei_event = match ei_event_stream.next().await { - Some(Ok(event)) => event, - Some(Err(e)) => return Err(anyhow!("libei connection closed: {e:?}")), - None => return Err(anyhow!("libei connection closed")), - }; + let ei_event = ei_event_stream + .next() + .await + .ok_or(CaptureError::EndOfStream)? + .map_err(ReisConvertEventStreamError::from)?; log::trace!("from ei: {ei_event:?}"); let client = current_client.get(); handle_ei_event(ei_event, client, &context, &event_tx).await; @@ -202,136 +199,26 @@ async fn libei_event_handler( async fn wait_for_active_client( notify_rx: &mut Receiver, active_clients: &mut Vec<(CaptureHandle, Position)>, -) -> Result<()> { +) { // wait for a client update while let Some(producer_event) = notify_rx.recv().await { if let ProducerEvent::Create(c, p) = producer_event { - handle_producer_event(ProducerEvent::Create(c, p), active_clients)?; + handle_producer_event(ProducerEvent::Create(c, p), active_clients); break; } } - Ok(()) } impl<'a> LibeiInputCapture<'a> { pub async fn new() -> std::result::Result { let input_capture = Box::pin(InputCapture::new().await?); let input_capture_ptr = input_capture.as_ref().get_ref() as *const InputCapture<'static>; - let mut first_session = Some(create_session(unsafe { &*input_capture_ptr }).await?); + let first_session = Some(create_session(unsafe { &*input_capture_ptr }).await?); let (event_tx, event_rx) = tokio::sync::mpsc::channel(32); - let (notify_tx, mut notify_rx) = tokio::sync::mpsc::channel(32); - let libei_task = tokio::task::spawn_local(async move { - /* safety: libei_task does not outlive Self */ - let input_capture = unsafe { &*input_capture_ptr }; - - let mut active_clients: Vec<(CaptureHandle, Position)> = vec![]; - let mut next_barrier_id = 1u32; - - /* there is a bug in xdg-remote-desktop-portal-gnome / mutter that - * prevents receiving further events after a session has been disabled once. - * Therefore the session needs to recreated when the barriers are updated */ - - loop { - // otherwise it asks to capture input even with no active clients - if active_clients.is_empty() { - wait_for_active_client(&mut notify_rx, &mut active_clients).await?; - continue; - } - - let current_client = Rc::new(Cell::new(None)); - - // create session - let (session, _) = match first_session.take() { - Some(s) => s, - _ => create_session(input_capture).await?, - }; - - // connect to eis server - let (context, ei_event_stream) = connect_to_eis(input_capture, &session).await?; - - // async event task - let mut ei_task: JoinHandle> = - tokio::task::spawn_local(libei_event_handler( - ei_event_stream, - context, - event_tx.clone(), - current_client.clone(), - )); - - let mut activated = input_capture.receive_activated().await?; - let mut zones_changed = input_capture.receive_zones_changed().await?; - - // set barriers - let client_for_barrier_id = update_barriers( - input_capture, - &session, - &active_clients, - &mut next_barrier_id, - ) - .await?; - - log::debug!("enabling session"); - input_capture.enable(&session).await?; - - loop { - tokio::select! { - activated = activated.next() => { - let activated = activated.ok_or(anyhow!("error receiving activation token"))?; - log::debug!("activated: {activated:?}"); - - let client = *client_for_barrier_id - .get(&activated.barrier_id()) - .expect("invalid barrier id"); - current_client.replace(Some(client)); - - event_tx.send((client, Event::Enter())).await?; - - tokio::select! { - producer_event = notify_rx.recv() => { - let producer_event = producer_event.expect("channel closed"); - if handle_producer_event(producer_event, &mut active_clients)? { - break; /* clients updated */ - } - } - zones_changed = zones_changed.next() => { - log::debug!("zones changed: {zones_changed:?}"); - break; - } - res = &mut ei_task => { - if let Err(e) = res.expect("ei task paniced") { - log::warn!("libei task exited: {e}"); - } - break; - } - } - release_capture( - input_capture, - &session, - activated, - client, - &active_clients, - ).await?; - } - producer_event = notify_rx.recv() => { - let producer_event = producer_event.expect("channel closed"); - if handle_producer_event(producer_event, &mut active_clients)? { - /* clients updated */ - break; - } - }, - res = &mut ei_task => { - if let Err(e) = res.expect("ei task paniced") { - log::warn!("libei task exited: {e}"); - } - break; - } - } - } - ei_task.abort(); - input_capture.disable(&session).await?; - } - }); + let (notify_tx, notify_rx) = tokio::sync::mpsc::channel(32); + let capture = do_capture(input_capture_ptr, notify_rx, first_session, event_tx); + let libei_task = tokio::task::spawn_local(capture); let producer = Self { input_capture, @@ -344,13 +231,139 @@ impl<'a> LibeiInputCapture<'a> { } } +async fn do_capture<'a>( + input_capture_ptr: *const InputCapture<'static>, + mut notify_rx: Receiver, + mut first_session: Option<(Session<'a>, BitFlags)>, + event_tx: Sender<(CaptureHandle, Event)>, +) -> Result<(), CaptureError> { + /* safety: libei_task does not outlive Self */ + let input_capture = unsafe { &*input_capture_ptr }; + + let mut active_clients: Vec<(CaptureHandle, Position)> = vec![]; + let mut next_barrier_id = 1u32; + + /* there is a bug in xdg-remote-desktop-portal-gnome / mutter that + * prevents receiving further events after a session has been disabled once. + * Therefore the session needs to recreated when the barriers are updated */ + + loop { + // otherwise it asks to capture input even with no active clients + if active_clients.is_empty() { + wait_for_active_client(&mut notify_rx, &mut active_clients).await; + if notify_rx.is_closed() { + break Ok(()); + } else { + continue; + } + } + + let current_client = Rc::new(Cell::new(None)); + + // create session + let (session, _) = match first_session.take() { + Some(s) => s, + _ => create_session(input_capture).await?, + }; + + // connect to eis server + let (context, ei_event_stream) = connect_to_eis(input_capture, &session).await?; + + // async event task + let mut ei_task: JoinHandle> = + tokio::task::spawn_local(libei_event_handler( + ei_event_stream, + context, + event_tx.clone(), + current_client.clone(), + )); + + let mut activated = input_capture.receive_activated().await?; + let mut zones_changed = input_capture.receive_zones_changed().await?; + + // set barriers + let client_for_barrier_id = update_barriers( + input_capture, + &session, + &active_clients, + &mut next_barrier_id, + ) + .await?; + + log::debug!("enabling session"); + input_capture.enable(&session).await?; + + loop { + tokio::select! { + activated = activated.next() => { + let activated = activated.ok_or(CaptureError::ActivationClosed)?; + log::debug!("activated: {activated:?}"); + + let client = *client_for_barrier_id + .get(&activated.barrier_id()) + .expect("invalid barrier id"); + current_client.replace(Some(client)); + + if event_tx.send((client, Event::Enter())).await.is_err() { + break; + }; + + tokio::select! { + producer_event = notify_rx.recv() => { + let producer_event = producer_event.expect("channel closed"); + if handle_producer_event(producer_event, &mut active_clients) { + break; /* clients updated */ + } + } + zones_changed = zones_changed.next() => { + log::debug!("zones changed: {zones_changed:?}"); + break; + } + res = &mut ei_task => { + if let Err(e) = res.expect("ei task paniced") { + log::warn!("libei task exited: {e}"); + } + break; + } + } + release_capture( + input_capture, + &session, + activated, + client, + &active_clients, + ).await?; + } + producer_event = notify_rx.recv() => { + let producer_event = producer_event.expect("channel closed"); + if handle_producer_event(producer_event, &mut active_clients) { + /* clients updated */ + break; + } + }, + res = &mut ei_task => { + if let Err(e) = res.expect("ei task paniced") { + log::warn!("libei task exited: {e}"); + } + break; + } + } + } + ei_task.abort(); + input_capture.disable(&session).await?; + if event_tx.is_closed() { + break Ok(()); + } + } +} + async fn release_capture( input_capture: &InputCapture<'_>, session: &Session<'_>, activated: Activated, current_client: CaptureHandle, active_clients: &[(CaptureHandle, Position)], -) -> Result<()> { +) -> Result<(), CaptureError> { log::debug!("releasing input capture {}", activated.activation_id()); let (x, y) = activated.cursor_position(); let pos = active_clients @@ -377,9 +390,9 @@ async fn release_capture( fn handle_producer_event( producer_event: ProducerEvent, active_clients: &mut Vec<(CaptureHandle, Position)>, -) -> Result { +) -> bool { log::debug!("handling event: {producer_event:?}"); - let updated = match producer_event { + match producer_event { ProducerEvent::Release => false, ProducerEvent::Create(c, p) => { active_clients.push((c, p)); @@ -389,8 +402,7 @@ fn handle_producer_event( active_clients.retain(|(h, _)| *h != c); true } - }; - Ok(updated) + } } async fn handle_ei_event( diff --git a/src/emulation_test.rs b/src/emulation_test.rs index f146f6b..610597a 100644 --- a/src/emulation_test.rs +++ b/src/emulation_test.rs @@ -38,11 +38,7 @@ async fn input_emulation_test(config: Config) -> Result<()> { let relative_motion = (new_offset.0 - offset.0, new_offset.1 - offset.1); offset = new_offset; let (dx, dy) = (relative_motion.0 as f64, relative_motion.1 as f64); - let event = Event::Pointer(PointerEvent::Motion { - time: 0, - dx: dx, - dy: dy, - }); + let event = Event::Pointer(PointerEvent::Motion { time: 0, dx, dy }); emulation.consume(event, 0).await?; } } diff --git a/src/server/network_task.rs b/src/server/network_task.rs index 0d93e4e..f23366f 100644 --- a/src/server/network_task.rs +++ b/src/server/network_task.rs @@ -80,8 +80,8 @@ async fn udp_receiver( receiver_tx: &Sender>, ) { loop { - let event = receive_event(&socket).await; - if let Err(_) = receiver_tx.send(event).await { + let event = receive_event(socket).await; + if receiver_tx.send(event).await.is_err() { break; } } @@ -93,7 +93,7 @@ async fn udp_sender(socket: &UdpSocket, rx: &mut Receiver<(Event, SocketAddr)>) Some(e) => e, None => return, }; - if let Err(e) = send_event(&socket, event, addr) { + if let Err(e) = send_event(socket, event, addr) { log::warn!("udp send failed: {e}"); }; }