improve capture error handling

This commit is contained in:
Ferdinand Schober
2024-07-10 09:00:43 +02:00
parent 6a4dd740c3
commit 110b37e26e
14 changed files with 296 additions and 216 deletions

View File

@@ -6,6 +6,8 @@ use futures_core::Stream;
use input_event::Event;
use crate::CaptureError;
use super::{CaptureHandle, InputCapture, Position};
pub struct DummyInputCapture {}
@@ -37,7 +39,7 @@ impl InputCapture for DummyInputCapture {
}
impl Stream for DummyInputCapture {
type Item = io::Result<(CaptureHandle, Event)>;
type Item = Result<(CaptureHandle, Event), CaptureError>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Pending

View File

@@ -92,7 +92,9 @@ impl Display for Backend {
}
}
pub trait InputCapture: Stream<Item = io::Result<(CaptureHandle, Event)>> + Unpin {
pub trait InputCapture:
Stream<Item = Result<(CaptureHandle, Event), CaptureError>> + Unpin
{
/// create a new client with the given id
fn create(&mut self, id: CaptureHandle, pos: Position) -> io::Result<()>;
@@ -105,8 +107,10 @@ pub trait InputCapture: Stream<Item = io::Result<(CaptureHandle, Event)>> + Unpi
pub async fn create_backend(
backend: Backend,
) -> Result<Box<dyn InputCapture<Item = io::Result<(CaptureHandle, Event)>>>, CaptureCreationError>
{
) -> Result<
Box<dyn InputCapture<Item = Result<(CaptureHandle, Event), CaptureError>>>,
CaptureCreationError,
> {
match backend {
#[cfg(all(unix, feature = "libei", not(target_os = "macos")))]
Backend::InputCapturePortal => Ok(Box::new(libei::LibeiInputCapture::new().await?)),
@@ -124,8 +128,10 @@ pub async fn create_backend(
pub async fn create(
backend: Option<Backend>,
) -> Result<Box<dyn InputCapture<Item = io::Result<(CaptureHandle, Event)>>>, CaptureCreationError>
{
) -> Result<
Box<dyn InputCapture<Item = Result<(CaptureHandle, Event), CaptureError>>>,
CaptureCreationError,
> {
if let Some(backend) = backend {
let b = create_backend(backend).await;
if b.is_ok() {

View File

@@ -5,7 +5,7 @@ use ashpd::{
},
enumflags2::BitFlags,
};
use futures::StreamExt;
use futures::{FutureExt, StreamExt};
use reis::{
ei::{self, keyboard::KeyState},
eis::button::ButtonState,
@@ -17,9 +17,9 @@ use std::{
collections::HashMap,
io,
os::unix::net::UnixStream,
pin::Pin,
pin::{pin, Pin},
rc::Rc,
task::{ready, Context, Poll},
task::{Context, Poll},
};
use tokio::{
sync::mpsc::{Receiver, Sender},
@@ -192,7 +192,7 @@ async fn libei_event_handler(
.map_err(ReisConvertEventStreamError::from)?;
log::trace!("from ei: {ei_event:?}");
let client = current_client.get();
handle_ei_event(ei_event, client, &context, &event_tx).await;
handle_ei_event(ei_event, client, &context, &event_tx).await?;
}
}
@@ -252,10 +252,9 @@ async fn do_capture<'a>(
if active_clients.is_empty() {
wait_for_active_client(&mut notify_rx, &mut active_clients).await;
if notify_rx.is_closed() {
break Ok(());
} else {
continue;
break;
}
continue;
}
let current_client = Rc::new(Cell::new(None));
@@ -270,13 +269,12 @@ async fn do_capture<'a>(
let (context, ei_event_stream) = connect_to_eis(input_capture, &session).await?;
// async event task
let mut ei_task: JoinHandle<Result<(), CaptureError>> =
tokio::task::spawn_local(libei_event_handler(
ei_event_stream,
context,
event_tx.clone(),
current_client.clone(),
));
let mut ei_task = pin!(libei_event_handler(
ei_event_stream,
context,
event_tx.clone(),
current_client.clone(),
));
let mut activated = input_capture.receive_activated().await?;
let mut zones_changed = input_capture.receive_zones_changed().await?;
@@ -320,10 +318,8 @@ async fn do_capture<'a>(
break;
}
res = &mut ei_task => {
if let Err(e) = res.expect("ei task paniced") {
log::warn!("libei task exited: {e}");
}
break;
/* propagate errors to toplevel task */
res?;
}
}
release_capture(
@@ -342,19 +338,16 @@ async fn do_capture<'a>(
}
},
res = &mut ei_task => {
if let Err(e) = res.expect("ei task paniced") {
log::warn!("libei task exited: {e}");
}
break;
res?;
}
}
}
ei_task.abort();
input_capture.disable(&session).await?;
if event_tx.is_closed() {
break Ok(());
break;
}
}
Ok(())
}
async fn release_capture(
@@ -410,7 +403,7 @@ async fn handle_ei_event(
current_client: Option<CaptureHandle>,
context: &ei::Context,
event_tx: &Sender<(CaptureHandle, Event)>,
) {
) -> Result<(), CaptureError> {
match ei_event {
EiEvent::SeatAdded(s) => {
s.seat.bind_capabilities(&[
@@ -421,7 +414,7 @@ async fn handle_ei_event(
DeviceCapability::Scroll,
DeviceCapability::Button,
]);
context.flush().unwrap();
context.flush().map_err(|e| io::Error::new(e.kind(), e))?;
}
EiEvent::SeatRemoved(_) => {}
EiEvent::DeviceAdded(_) => {}
@@ -439,7 +432,7 @@ async fn handle_ei_event(
event_tx
.send((current_client, Event::Keyboard(modifier_event)))
.await
.unwrap();
.map_err(|_| CaptureError::EndOfStream)?;
}
}
EiEvent::Frame(_) => {}
@@ -459,7 +452,7 @@ async fn handle_ei_event(
event_tx
.send((current_client, Event::Pointer(motion_event)))
.await
.unwrap();
.map_err(|_| CaptureError::EndOfStream)?;
}
}
EiEvent::PointerMotionAbsolute(_) => {}
@@ -476,7 +469,7 @@ async fn handle_ei_event(
event_tx
.send((current_client, Event::Pointer(button_event)))
.await
.unwrap();
.map_err(|_| CaptureError::EndOfStream)?;
}
}
EiEvent::ScrollDelta(delta) => {
@@ -500,7 +493,7 @@ async fn handle_ei_event(
event_tx
.send((handle, Event::Pointer(event)))
.await
.unwrap();
.map_err(|_| CaptureError::EndOfStream)?;
}
}
}
@@ -516,7 +509,7 @@ async fn handle_ei_event(
event_tx
.send((current_client, Event::Pointer(event)))
.await
.unwrap();
.map_err(|_| CaptureError::EndOfStream)?;
}
}
if scroll.discrete_dx != 0 {
@@ -528,7 +521,7 @@ async fn handle_ei_event(
event_tx
.send((current_client, Event::Pointer(event)))
.await
.unwrap();
.map_err(|_| CaptureError::EndOfStream)?;
}
};
}
@@ -545,7 +538,7 @@ async fn handle_ei_event(
event_tx
.send((current_client, Event::Keyboard(key_event)))
.await
.unwrap();
.map_err(|_| CaptureError::EndOfStream)?;
}
}
EiEvent::TouchDown(_) => {}
@@ -555,6 +548,7 @@ async fn handle_ei_event(
log::error!("disconnect: {d:?}");
}
}
Ok(())
}
impl<'a> LanMouseInputCapture for LibeiInputCapture<'a> {
@@ -584,12 +578,15 @@ impl<'a> LanMouseInputCapture for LibeiInputCapture<'a> {
}
impl<'a> Stream for LibeiInputCapture<'a> {
type Item = io::Result<(CaptureHandle, Event)>;
type Item = Result<(CaptureHandle, Event), CaptureError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match ready!(self.event_rx.poll_recv(cx)) {
None => Poll::Ready(None),
Some(e) => Poll::Ready(Some(Ok(e))),
match self.libei_task.poll_unpin(cx) {
Poll::Ready(r) => match r.expect("failed to join") {
Ok(()) => Poll::Ready(None),
Err(e) => Poll::Ready(Some(Err(e))),
},
Poll::Pending => self.event_rx.poll_recv(cx).map(|e| e.map(Result::Ok)),
}
}
}

View File

@@ -1,4 +1,6 @@
use crate::{error::MacOSInputCaptureCreationError, CaptureHandle, InputCapture, Position};
use crate::{
error::MacOSInputCaptureCreationError, CaptureError, CaptureHandle, InputCapture, Position,
};
use futures_core::Stream;
use input_event::Event;
use std::task::{Context, Poll};
@@ -13,7 +15,7 @@ impl MacOSInputCapture {
}
impl Stream for MacOSInputCapture {
type Item = io::Result<(CaptureHandle, Event)>;
type Item = Result<(CaptureHandle, Event), CaptureError>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Pending

View File

@@ -62,6 +62,8 @@ use tempfile;
use input_event::{Event, KeyboardEvent, PointerEvent};
use crate::CaptureError;
use super::{
error::{LayerShellCaptureCreationError, WaylandBindError},
CaptureHandle, InputCapture, Position,
@@ -582,7 +584,7 @@ impl InputCapture for WaylandInputCapture {
}
impl Stream for WaylandInputCapture {
type Item = io::Result<(CaptureHandle, Event)>;
type Item = Result<(CaptureHandle, Event), CaptureError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(event) = self.0.get_mut().state.pending_events.pop_front() {
@@ -600,7 +602,7 @@ impl Stream for WaylandInputCapture {
// prepare next read
match inner.prepare_read() {
Ok(_) => {}
Err(e) => return Poll::Ready(Some(Err(e))),
Err(e) => return Poll::Ready(Some(Err(e.into()))),
}
}
@@ -610,14 +612,14 @@ impl Stream for WaylandInputCapture {
// flush outgoing events
if let Err(e) = inner.flush_events() {
if e.kind() != ErrorKind::WouldBlock {
return Poll::Ready(Some(Err(e)));
return Poll::Ready(Some(Err(e.into())));
}
}
// prepare for the next read
match inner.prepare_read() {
Ok(_) => {}
Err(e) => return Poll::Ready(Some(Err(e))),
Err(e) => return Poll::Ready(Some(Err(e.into()))),
}
}

View File

@@ -36,7 +36,7 @@ use input_event::{
Event, KeyboardEvent, PointerEvent, BTN_BACK, BTN_FORWARD, BTN_LEFT, BTN_MIDDLE, BTN_RIGHT,
};
use super::{CaptureHandle, InputCapture, Position};
use super::{CaptureError, CaptureHandle, InputCapture, Position};
enum Request {
Create(CaptureHandle, Position),
@@ -609,7 +609,7 @@ impl WindowsInputCapture {
}
impl Stream for WindowsInputCapture {
type Item = io::Result<(CaptureHandle, Event)>;
type Item = Result<(CaptureHandle, Event), CaptureError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match ready!(self.event_rx.poll_recv(cx)) {
None => Poll::Ready(None),

View File

@@ -3,6 +3,8 @@ use std::task::Poll;
use futures_core::Stream;
use crate::CaptureError;
use super::InputCapture;
use input_event::Event;
@@ -32,7 +34,7 @@ impl InputCapture for X11InputCapture {
}
impl Stream for X11InputCapture {
type Item = io::Result<(CaptureHandle, Event)>;
type Item = Result<(CaptureHandle, Event), CaptureError>;
fn poll_next(
self: std::pin::Pin<&mut Self>,