This commit is contained in:
Ferdinand Schober
2024-07-09 20:09:28 +02:00
parent 3933b6bf24
commit cb6ad7f1ac
5 changed files with 188 additions and 146 deletions

View File

@@ -1,3 +1,4 @@
use reis::tokio::EiConvertEventStreamError;
use thiserror::Error;
#[cfg(all(unix, feature = "wayland", not(target_os = "macos")))]
@@ -9,6 +10,39 @@ use wayland_client::{
ConnectError, DispatchError,
};
#[cfg(all(unix, feature = "libei", not(target_os = "macos")))]
use reis::tokio::HandshakeError;
#[cfg(all(unix, feature = "libei", not(target_os = "macos")))]
#[derive(Debug, Error)]
#[error("error in libei stream: {inner:?}")]
pub struct ReisConvertEventStreamError {
inner: EiConvertEventStreamError,
}
#[cfg(all(unix, feature = "libei", not(target_os = "macos")))]
impl From<EiConvertEventStreamError> for ReisConvertEventStreamError {
fn from(e: EiConvertEventStreamError) -> Self {
Self { inner: e }
}
}
#[derive(Debug, Error)]
pub enum CaptureError {
#[error("activation stream closed unexpectedly")]
ActivationClosed,
#[error("libei stream was closed")]
EndOfStream,
#[error("io error: `{0}`")]
Io(#[from] std::io::Error),
#[error("error in libei stream: `{0}`")]
Reis(#[from] ReisConvertEventStreamError),
#[error("libei handshake failed: `{0}`")]
Handshake(#[from] HandshakeError),
#[error(transparent)]
Portal(#[from] ashpd::Error),
}
#[derive(Debug, Error)]
pub enum CaptureCreationError {
#[error("no backend available")]

View File

@@ -4,7 +4,7 @@ use futures_core::Stream;
use input_event::Event;
use self::error::CaptureCreationError;
pub use error::{CaptureCreationError, CaptureError};
pub mod error;

View File

@@ -1,4 +1,3 @@
use anyhow::{anyhow, Result};
use ashpd::{
desktop::{
input_capture::{Activated, Barrier, BarrierID, Capabilities, InputCapture, Region, Zones},
@@ -32,6 +31,8 @@ use once_cell::sync::Lazy;
use input_event::{Event, KeyboardEvent, PointerEvent};
use crate::error::{CaptureError, ReisConvertEventStreamError};
use super::{
error::LibeiCaptureCreationError, CaptureHandle, InputCapture as LanMouseInputCapture, Position,
};
@@ -46,7 +47,7 @@ enum ProducerEvent {
#[allow(dead_code)]
pub struct LibeiInputCapture<'a> {
input_capture: Pin<Box<InputCapture<'a>>>,
libei_task: JoinHandle<Result<()>>,
libei_task: JoinHandle<Result<(), CaptureError>>,
event_rx: tokio::sync::mpsc::Receiver<(CaptureHandle, Event)>,
notify_tx: tokio::sync::mpsc::Sender<ProducerEvent>,
}
@@ -108,7 +109,7 @@ async fn update_barriers(
session: &Session<'_>,
active_clients: &Vec<(CaptureHandle, Position)>,
next_barrier_id: &mut u32,
) -> Result<HashMap<BarrierID, CaptureHandle>> {
) -> Result<HashMap<BarrierID, CaptureHandle>, ashpd::Error> {
let zones = input_capture.zones(session).await?.response()?;
log::debug!("zones: {zones:?}");
@@ -154,7 +155,7 @@ async fn create_session<'a>(
async fn connect_to_eis(
input_capture: &InputCapture<'_>,
session: &Session<'_>,
) -> Result<(ei::Context, EiConvertEventStream)> {
) -> Result<(ei::Context, EiConvertEventStream), CaptureError> {
log::debug!("connect_to_eis");
let fd = input_capture.connect_to_eis(session).await?;
@@ -165,17 +166,13 @@ async fn connect_to_eis(
// create ei context
let context = ei::Context::new(stream)?;
let mut event_stream = EiEventStream::new(context.clone())?;
let response = match reis::tokio::ei_handshake(
let response = reis::tokio::ei_handshake(
&mut event_stream,
"de.feschber.LanMouse",
ei::handshake::ContextType::Receiver,
&INTERFACES,
)
.await
{
Ok(res) => res,
Err(e) => return Err(anyhow!("ei handshake failed: {e:?}")),
};
.await?;
let event_stream = EiConvertEventStream::new(event_stream, response.serial);
Ok((context, event_stream))
@@ -186,13 +183,13 @@ async fn libei_event_handler(
context: ei::Context,
event_tx: Sender<(CaptureHandle, Event)>,
current_client: Rc<Cell<Option<CaptureHandle>>>,
) -> Result<()> {
) -> Result<(), CaptureError> {
loop {
let ei_event = match ei_event_stream.next().await {
Some(Ok(event)) => event,
Some(Err(e)) => return Err(anyhow!("libei connection closed: {e:?}")),
None => return Err(anyhow!("libei connection closed")),
};
let ei_event = ei_event_stream
.next()
.await
.ok_or(CaptureError::EndOfStream)?
.map_err(ReisConvertEventStreamError::from)?;
log::trace!("from ei: {ei_event:?}");
let client = current_client.get();
handle_ei_event(ei_event, client, &context, &event_tx).await;
@@ -202,136 +199,26 @@ async fn libei_event_handler(
async fn wait_for_active_client(
notify_rx: &mut Receiver<ProducerEvent>,
active_clients: &mut Vec<(CaptureHandle, Position)>,
) -> Result<()> {
) {
// wait for a client update
while let Some(producer_event) = notify_rx.recv().await {
if let ProducerEvent::Create(c, p) = producer_event {
handle_producer_event(ProducerEvent::Create(c, p), active_clients)?;
handle_producer_event(ProducerEvent::Create(c, p), active_clients);
break;
}
}
Ok(())
}
impl<'a> LibeiInputCapture<'a> {
pub async fn new() -> std::result::Result<Self, LibeiCaptureCreationError> {
let input_capture = Box::pin(InputCapture::new().await?);
let input_capture_ptr = input_capture.as_ref().get_ref() as *const InputCapture<'static>;
let mut first_session = Some(create_session(unsafe { &*input_capture_ptr }).await?);
let first_session = Some(create_session(unsafe { &*input_capture_ptr }).await?);
let (event_tx, event_rx) = tokio::sync::mpsc::channel(32);
let (notify_tx, mut notify_rx) = tokio::sync::mpsc::channel(32);
let libei_task = tokio::task::spawn_local(async move {
/* safety: libei_task does not outlive Self */
let input_capture = unsafe { &*input_capture_ptr };
let mut active_clients: Vec<(CaptureHandle, Position)> = vec![];
let mut next_barrier_id = 1u32;
/* there is a bug in xdg-remote-desktop-portal-gnome / mutter that
* prevents receiving further events after a session has been disabled once.
* Therefore the session needs to recreated when the barriers are updated */
loop {
// otherwise it asks to capture input even with no active clients
if active_clients.is_empty() {
wait_for_active_client(&mut notify_rx, &mut active_clients).await?;
continue;
}
let current_client = Rc::new(Cell::new(None));
// create session
let (session, _) = match first_session.take() {
Some(s) => s,
_ => create_session(input_capture).await?,
};
// connect to eis server
let (context, ei_event_stream) = connect_to_eis(input_capture, &session).await?;
// async event task
let mut ei_task: JoinHandle<Result<(), anyhow::Error>> =
tokio::task::spawn_local(libei_event_handler(
ei_event_stream,
context,
event_tx.clone(),
current_client.clone(),
));
let mut activated = input_capture.receive_activated().await?;
let mut zones_changed = input_capture.receive_zones_changed().await?;
// set barriers
let client_for_barrier_id = update_barriers(
input_capture,
&session,
&active_clients,
&mut next_barrier_id,
)
.await?;
log::debug!("enabling session");
input_capture.enable(&session).await?;
loop {
tokio::select! {
activated = activated.next() => {
let activated = activated.ok_or(anyhow!("error receiving activation token"))?;
log::debug!("activated: {activated:?}");
let client = *client_for_barrier_id
.get(&activated.barrier_id())
.expect("invalid barrier id");
current_client.replace(Some(client));
event_tx.send((client, Event::Enter())).await?;
tokio::select! {
producer_event = notify_rx.recv() => {
let producer_event = producer_event.expect("channel closed");
if handle_producer_event(producer_event, &mut active_clients)? {
break; /* clients updated */
}
}
zones_changed = zones_changed.next() => {
log::debug!("zones changed: {zones_changed:?}");
break;
}
res = &mut ei_task => {
if let Err(e) = res.expect("ei task paniced") {
log::warn!("libei task exited: {e}");
}
break;
}
}
release_capture(
input_capture,
&session,
activated,
client,
&active_clients,
).await?;
}
producer_event = notify_rx.recv() => {
let producer_event = producer_event.expect("channel closed");
if handle_producer_event(producer_event, &mut active_clients)? {
/* clients updated */
break;
}
},
res = &mut ei_task => {
if let Err(e) = res.expect("ei task paniced") {
log::warn!("libei task exited: {e}");
}
break;
}
}
}
ei_task.abort();
input_capture.disable(&session).await?;
}
});
let (notify_tx, notify_rx) = tokio::sync::mpsc::channel(32);
let capture = do_capture(input_capture_ptr, notify_rx, first_session, event_tx);
let libei_task = tokio::task::spawn_local(capture);
let producer = Self {
input_capture,
@@ -344,13 +231,139 @@ impl<'a> LibeiInputCapture<'a> {
}
}
async fn do_capture<'a>(
input_capture_ptr: *const InputCapture<'static>,
mut notify_rx: Receiver<ProducerEvent>,
mut first_session: Option<(Session<'a>, BitFlags<Capabilities>)>,
event_tx: Sender<(CaptureHandle, Event)>,
) -> Result<(), CaptureError> {
/* safety: libei_task does not outlive Self */
let input_capture = unsafe { &*input_capture_ptr };
let mut active_clients: Vec<(CaptureHandle, Position)> = vec![];
let mut next_barrier_id = 1u32;
/* there is a bug in xdg-remote-desktop-portal-gnome / mutter that
* prevents receiving further events after a session has been disabled once.
* Therefore the session needs to recreated when the barriers are updated */
loop {
// otherwise it asks to capture input even with no active clients
if active_clients.is_empty() {
wait_for_active_client(&mut notify_rx, &mut active_clients).await;
if notify_rx.is_closed() {
break Ok(());
} else {
continue;
}
}
let current_client = Rc::new(Cell::new(None));
// create session
let (session, _) = match first_session.take() {
Some(s) => s,
_ => create_session(input_capture).await?,
};
// connect to eis server
let (context, ei_event_stream) = connect_to_eis(input_capture, &session).await?;
// async event task
let mut ei_task: JoinHandle<Result<(), CaptureError>> =
tokio::task::spawn_local(libei_event_handler(
ei_event_stream,
context,
event_tx.clone(),
current_client.clone(),
));
let mut activated = input_capture.receive_activated().await?;
let mut zones_changed = input_capture.receive_zones_changed().await?;
// set barriers
let client_for_barrier_id = update_barriers(
input_capture,
&session,
&active_clients,
&mut next_barrier_id,
)
.await?;
log::debug!("enabling session");
input_capture.enable(&session).await?;
loop {
tokio::select! {
activated = activated.next() => {
let activated = activated.ok_or(CaptureError::ActivationClosed)?;
log::debug!("activated: {activated:?}");
let client = *client_for_barrier_id
.get(&activated.barrier_id())
.expect("invalid barrier id");
current_client.replace(Some(client));
if event_tx.send((client, Event::Enter())).await.is_err() {
break;
};
tokio::select! {
producer_event = notify_rx.recv() => {
let producer_event = producer_event.expect("channel closed");
if handle_producer_event(producer_event, &mut active_clients) {
break; /* clients updated */
}
}
zones_changed = zones_changed.next() => {
log::debug!("zones changed: {zones_changed:?}");
break;
}
res = &mut ei_task => {
if let Err(e) = res.expect("ei task paniced") {
log::warn!("libei task exited: {e}");
}
break;
}
}
release_capture(
input_capture,
&session,
activated,
client,
&active_clients,
).await?;
}
producer_event = notify_rx.recv() => {
let producer_event = producer_event.expect("channel closed");
if handle_producer_event(producer_event, &mut active_clients) {
/* clients updated */
break;
}
},
res = &mut ei_task => {
if let Err(e) = res.expect("ei task paniced") {
log::warn!("libei task exited: {e}");
}
break;
}
}
}
ei_task.abort();
input_capture.disable(&session).await?;
if event_tx.is_closed() {
break Ok(());
}
}
}
async fn release_capture(
input_capture: &InputCapture<'_>,
session: &Session<'_>,
activated: Activated,
current_client: CaptureHandle,
active_clients: &[(CaptureHandle, Position)],
) -> Result<()> {
) -> Result<(), CaptureError> {
log::debug!("releasing input capture {}", activated.activation_id());
let (x, y) = activated.cursor_position();
let pos = active_clients
@@ -377,9 +390,9 @@ async fn release_capture(
fn handle_producer_event(
producer_event: ProducerEvent,
active_clients: &mut Vec<(CaptureHandle, Position)>,
) -> Result<bool> {
) -> bool {
log::debug!("handling event: {producer_event:?}");
let updated = match producer_event {
match producer_event {
ProducerEvent::Release => false,
ProducerEvent::Create(c, p) => {
active_clients.push((c, p));
@@ -389,8 +402,7 @@ fn handle_producer_event(
active_clients.retain(|(h, _)| *h != c);
true
}
};
Ok(updated)
}
}
async fn handle_ei_event(

View File

@@ -38,11 +38,7 @@ async fn input_emulation_test(config: Config) -> Result<()> {
let relative_motion = (new_offset.0 - offset.0, new_offset.1 - offset.1);
offset = new_offset;
let (dx, dy) = (relative_motion.0 as f64, relative_motion.1 as f64);
let event = Event::Pointer(PointerEvent::Motion {
time: 0,
dx: dx,
dy: dy,
});
let event = Event::Pointer(PointerEvent::Motion { time: 0, dx, dy });
emulation.consume(event, 0).await?;
}
}

View File

@@ -80,8 +80,8 @@ async fn udp_receiver(
receiver_tx: &Sender<Result<(Event, SocketAddr), NetworkError>>,
) {
loop {
let event = receive_event(&socket).await;
if let Err(_) = receiver_tx.send(event).await {
let event = receive_event(socket).await;
if receiver_tx.send(event).await.is_err() {
break;
}
}
@@ -93,7 +93,7 @@ async fn udp_sender(socket: &UdpSocket, rx: &mut Receiver<(Event, SocketAddr)>)
Some(e) => e,
None => return,
};
if let Err(e) = send_event(&socket, event, addr) {
if let Err(e) = send_event(socket, event, addr) {
log::warn!("udp send failed: {e}");
};
}