From ef2b2a773ee8939b2c1e679aed1e2346c8f8a68e Mon Sep 17 00:00:00 2001 From: Ferdinand Schober Date: Fri, 8 Nov 2024 17:18:51 +0100 Subject: [PATCH] remove dependency on service from capture --- src/capture.rs | 518 +++++++++++++++++++++++++------------------------ src/service.rs | 15 +- 2 files changed, 265 insertions(+), 268 deletions(-) diff --git a/src/capture.rs b/src/capture.rs index 7419dda..68f057d 100644 --- a/src/capture.rs +++ b/src/capture.rs @@ -1,5 +1,5 @@ use std::{ - cell::Cell, + cell::{Cell, RefCell}, rc::Rc, time::{Duration, Instant}, }; @@ -8,14 +8,16 @@ use futures::StreamExt; use input_capture::{ CaptureError, CaptureEvent, CaptureHandle, InputCapture, InputCaptureError, Position, }; +use input_event::scancode; use lan_mouse_proto::ProtoEvent; use local_channel::mpsc::{channel, Receiver, Sender}; use tokio::task::{spawn_local, JoinHandle}; +use tokio_util::sync::CancellationToken; -use crate::{connect::LanMouseConnection, service::Service}; +use crate::connect::LanMouseConnection; pub(crate) struct Capture { - exit_requested: Rc>, + cancellation_token: CancellationToken, request_tx: Sender, task: JoinHandle<()>, event_rx: Receiver, @@ -37,16 +39,24 @@ pub(crate) enum ICaptureEvent { ClientEntered(u64), } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) enum CaptureType { + /// a normal input capture + Default, + /// A capture only interested in [`CaptureEvent::Begin`] events. + /// The capture is released immediately, if there is no + /// Default capture at the same position. + EnterOnly, +} + #[derive(Clone, Copy, Debug)] enum CaptureRequest { /// capture must release the mouse Release, /// add a capture client - Create(CaptureHandle, Position), + Create(CaptureHandle, Position, CaptureType), /// destory a capture client Destroy(CaptureHandle), - /// terminate - Terminate, /// reenable input capture Reenable, } @@ -55,21 +65,25 @@ impl Capture { pub(crate) fn new( backend: Option, conn: LanMouseConnection, - service: Service, + release_bind: Vec, ) -> Self { let (request_tx, request_rx) = channel(); let (event_tx, event_rx) = channel(); - let exit_requested = Rc::new(Cell::new(false)); - let task = spawn_local(Self::run( - exit_requested.clone(), - service, + let cancellation_token = CancellationToken::new(); + let capture_task = CaptureTask { + active_client: None, backend, - request_rx, + cancellation_token: cancellation_token.clone(), + captures: Default::default(), conn, event_tx, - )); + request_rx, + release_bind: Rc::new(RefCell::new(release_bind)), + state: Default::default(), + }; + let task = spawn_local(capture_task.run()); Self { - exit_requested, + cancellation_token, request_tx, task, event_rx, @@ -83,19 +97,22 @@ impl Capture { } pub(crate) async fn terminate(&mut self) { - self.exit_requested.replace(true); - self.request_tx - .send(CaptureRequest::Terminate) - .expect("channel closed"); + self.cancellation_token.cancel(); log::debug!("terminating capture"); if let Err(e) = (&mut self.task).await { log::warn!("{e}"); } } - pub(crate) fn create(&self, handle: CaptureHandle, pos: lan_mouse_ipc::Position) { + pub(crate) fn create( + &self, + handle: CaptureHandle, + pos: lan_mouse_ipc::Position, + capture_type: CaptureType, + ) { + let pos = to_capture_pos(pos); self.request_tx - .send(CaptureRequest::Create(handle, to_capture_pos(pos))) + .send(CaptureRequest::Create(handle, pos, capture_type)) .expect("channel closed"); } @@ -114,156 +131,6 @@ impl Capture { pub(crate) async fn event(&mut self) -> ICaptureEvent { self.event_rx.recv().await.expect("channel closed") } - - async fn run( - exit_requested: Rc>, - service: Service, - backend: Option, - mut request_rx: Receiver, - mut conn: LanMouseConnection, - mut event_tx: Sender, - ) { - let mut active = None; - loop { - if let Err(e) = do_capture( - &mut active, - &service, - backend, - &mut conn, - &mut request_rx, - &mut event_tx, - ) - .await - { - log::warn!("input capture exited: {e}"); - } - if exit_requested.get() { - break; - } - loop { - match request_rx.recv().await.expect("channel closed") { - CaptureRequest::Reenable => break, - CaptureRequest::Terminate => return, - _ => {} - } - } - } - } -} - -async fn do_capture( - active: &mut Option, - service: &Service, - backend: Option, - conn: &mut LanMouseConnection, - request_rx: &mut Receiver, - event_tx: &mut Sender, -) -> Result<(), InputCaptureError> { - /* allow cancelling capture request */ - let mut capture = tokio::select! { - r = InputCapture::new(backend) => r?, - _ = wait_for_termination(request_rx) => return Ok(()), - }; - - let _capture_guard = DropGuard::new( - event_tx, - ICaptureEvent::CaptureEnabled, - ICaptureEvent::CaptureDisabled, - ); - - let clients = service.client_manager.active_clients(); - let clients = clients.iter().copied().map(|handle| { - ( - handle, - service - .client_manager - .get_pos(handle) - .expect("no such client"), - ) - }); - - /* create barriers for active clients */ - let r = create_clients(&mut capture, clients, request_rx).await; - if let Err(e) = r { - capture.terminate().await?; - return Err(e.into()); - } - - let r = do_capture_session(active, &mut capture, conn, event_tx, request_rx, service).await; - - // FIXME replace with async drop when stabilized - capture.terminate().await?; - - r -} - -async fn create_clients( - capture: &mut InputCapture, - clients: impl Iterator, - request_rx: &mut Receiver, -) -> Result<(), CaptureError> { - for (handle, pos) in clients { - tokio::select! { - r = capture.create(handle, to_capture_pos(pos)) => r?, - _ = wait_for_termination(request_rx) => return Ok(()), - } - } - Ok(()) -} - -async fn do_capture_session( - active: &mut Option, - capture: &mut InputCapture, - conn: &mut LanMouseConnection, - event_tx: &Sender, - request_rx: &mut Receiver, - service: &Service, -) -> Result<(), InputCaptureError> { - let mut state = State::WaitingForAck; - - loop { - tokio::select! { - event = capture.next() => match event { - Some(event) => handle_capture_event(active, service, capture, conn, event?, &mut state, event_tx).await?, - None => return Ok(()), - }, - (handle, event) = conn.recv() => { - if let Some(active) = active { - if handle != *active { - // we only care about events coming from the client we are currently connected to - // only `Ack` and `Leave` are relevant - continue - } - } - - match event { - // connection acknowlegded => set state to Sending - ProtoEvent::Ack(_) => { - log::info!("client {handle} acknowledged the connection!"); - state = State::Sending; - } - // client disconnected - ProtoEvent::Leave(_) => { - log::info!("releasing capture: left remote client device region"); - release_capture(capture, active).await?; - }, - _ => {} - } - }, - e = request_rx.recv() => match e.expect("channel closed") { - CaptureRequest::Reenable => { /* already active */ }, - CaptureRequest::Release => release_capture(capture, active).await?, - CaptureRequest::Create(h, p) => capture.create(h, p).await?, - CaptureRequest::Destroy(h) => capture.destroy(h).await?, - CaptureRequest::Terminate => break, - } - } - } - Ok(()) -} - -thread_local! { - static PREV_LOG: Cell> = const { Cell::new(None) }; } /// debounce a statement `$st`, i.e. the statement is executed only if the @@ -283,86 +150,235 @@ macro_rules! debounce { }; } -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -enum State { - WaitingForAck, - Sending, +struct CaptureTask { + active_client: Option, + backend: Option, + cancellation_token: CancellationToken, + captures: Vec<(CaptureHandle, Position, CaptureType)>, + conn: LanMouseConnection, + event_tx: Sender, + release_bind: Rc>>, + request_rx: Receiver, + state: State, } -async fn handle_capture_event( - active: &mut Option, - service: &Service, - capture: &mut InputCapture, - conn: &LanMouseConnection, - event: (CaptureHandle, CaptureEvent), - state: &mut State, - event_tx: &Sender, -) -> Result<(), CaptureError> { - let (handle, event) = event; - log::trace!("({handle}): {event:?}"); - - if capture.keys_pressed(&service.config.release_bind) { - log::info!("releasing capture: release-bind pressed"); - return release_capture(capture, active).await; +impl CaptureTask { + fn add_capture(&mut self, handle: CaptureHandle, pos: Position, capture_type: CaptureType) { + self.captures.push((handle, pos, capture_type)); } - if event == CaptureEvent::Begin { - event_tx - .send(ICaptureEvent::CaptureBegin(handle)) - .expect("channel closed"); + fn remove_capture(&mut self, handle: CaptureHandle) { + self.captures.retain(|&(h, ..)| handle != h); } - // incoming connection - if handle >= Service::ENTER_HANDLE_BEGIN { - // if there is no active outgoing connection at the current capture, - // we release the capture - if let Some(pos) = service.get_incoming_pos(handle) { - if service.client_manager.client_at(pos).is_none() { + fn is_default_capture_at(&self, pos: Position) -> bool { + self.captures + .iter() + .any(|&(_, p, t)| p == pos && t == CaptureType::Default) + } + + fn get_pos(&self, handle: CaptureHandle) -> Position { + self.captures + .iter() + .find(|(h, ..)| *h == handle) + .expect("no such capture") + .1 + } + + fn get_type(&self, handle: CaptureHandle) -> CaptureType { + self.captures + .iter() + .find(|(h, ..)| *h == handle) + .expect("no such capture") + .2 + } + + async fn run(mut self) { + loop { + if let Err(e) = self.do_capture().await { + log::warn!("input capture exited: {e}"); + } + if self.cancellation_token.is_cancelled() { + break; + } + loop { + tokio::select! { + r = self.request_rx.recv() => match r.expect("channel closed") { + CaptureRequest::Reenable => break, + CaptureRequest::Create(h, p, t) => self.add_capture(h, p, t), + CaptureRequest::Destroy(h) => self.remove_capture(h), + CaptureRequest::Release => { /* nothing to do */ } + }, + _ = self.cancellation_token.cancelled() => break, + } + } + } + } + + async fn do_capture(&mut self) -> Result<(), InputCaptureError> { + /* allow cancelling capture request */ + let mut capture = tokio::select! { + r = InputCapture::new(self.backend) => r?, + _ = self.cancellation_token.cancelled() => return Ok(()), + }; + + let _capture_guard = DropGuard::new( + self.event_tx.clone(), + ICaptureEvent::CaptureEnabled, + ICaptureEvent::CaptureDisabled, + ); + + /* create barriers for active clients */ + let r = self.create_captures(&mut capture).await; + if let Err(e) = r { + capture.terminate().await?; + return Err(e.into()); + } + + let r = self.do_capture_session(&mut capture).await; + + // FIXME replace with async drop when stabilized + capture.terminate().await?; + + r + } + + async fn create_captures(&mut self, capture: &mut InputCapture) -> Result<(), CaptureError> { + let captures = self.captures.clone(); + for (handle, pos, _type) in captures { + tokio::select! { + r = capture.create(handle, pos) => r?, + _ = self.cancellation_token.cancelled() => return Ok(()), + } + } + Ok(()) + } + + async fn do_capture_session( + &mut self, + capture: &mut InputCapture, + ) -> Result<(), InputCaptureError> { + loop { + tokio::select! { + event = capture.next() => match event { + Some(event) => self.handle_capture_event(capture, event?).await?, + None => return Ok(()), + }, + (handle, event) = self.conn.recv() => { + if let Some(active) = self.active_client { + if handle != active { + // we only care about events coming from the client we are currently connected to + // only `Ack` and `Leave` are relevant + continue + } + } + + match event { + // connection acknowlegded => set state to Sending + ProtoEvent::Ack(_) => { + log::info!("client {handle} acknowledged the connection!"); + self.state = State::Sending; + } + // client disconnected + ProtoEvent::Leave(_) => { + log::info!("releasing capture: left remote client device region"); + self.release_capture(capture).await?; + }, + _ => {} + } + }, + e = self.request_rx.recv() => match e.expect("channel closed") { + CaptureRequest::Reenable => { /* already active */ }, + CaptureRequest::Release => self.release_capture(capture).await?, + CaptureRequest::Create(h, p, t) => { + self.add_capture(h, p, t); + capture.create(h, p).await?; + } + CaptureRequest::Destroy(h) => { + self.remove_capture(h); + capture.destroy(h).await?; + } + }, + _ = self.cancellation_token.cancelled() => break, + } + } + Ok(()) + } + + async fn handle_capture_event( + &mut self, + capture: &mut InputCapture, + event: (CaptureHandle, CaptureEvent), + ) -> Result<(), CaptureError> { + let (handle, event) = event; + log::trace!("({handle}): {event:?}"); + + if capture.keys_pressed(&self.release_bind.borrow()) { + log::info!("releasing capture: release-bind pressed"); + return self.release_capture(capture).await; + } + + if event == CaptureEvent::Begin { + self.event_tx + .send(ICaptureEvent::CaptureBegin(handle)) + .expect("channel closed"); + } + + // enter only capture (for incoming connections) + if self.get_type(handle) == CaptureType::EnterOnly { + // if there is no active outgoing connection at the current capture, + // we release the capture + if !self.is_default_capture_at(self.get_pos(handle)) { log::info!("releasing capture: no active client at this position"); capture.release().await?; } + // we dont care about events from incoming handles except for releasing the capture + return Ok(()); } - // we dont care about events from incoming handles except for releasing the capture - return Ok(()); + + // activated a new client + if event == CaptureEvent::Begin && Some(handle) != self.active_client { + self.state = State::WaitingForAck; + self.active_client.replace(handle); + self.event_tx + .send(ICaptureEvent::ClientEntered(handle)) + .expect("channel closed"); + } + + let opposite_pos = to_proto_pos(self.get_pos(handle).opposite()); + + let event = match event { + CaptureEvent::Begin => ProtoEvent::Enter(opposite_pos), + CaptureEvent::Input(e) => match self.state { + // connection not acknowledged, repeat `Enter` event + State::WaitingForAck => ProtoEvent::Enter(opposite_pos), + State::Sending => ProtoEvent::Input(e), + }, + }; + + if let Err(e) = self.conn.send(event, handle).await { + const DUR: Duration = Duration::from_millis(500); + debounce!(PREV_LOG, DUR, log::warn!("releasing capture: {e}")); + capture.release().await?; + } + Ok(()) } - // activated a new client - if event == CaptureEvent::Begin && Some(handle) != *active { - *state = State::WaitingForAck; - active.replace(handle); - event_tx - .send(ICaptureEvent::ClientEntered(handle)) - .expect("channel closed"); + async fn release_capture(&mut self, capture: &mut InputCapture) -> Result<(), CaptureError> { + self.active_client.take(); + capture.release().await } - - let pos = match service.client_manager.get_pos(handle) { - Some(pos) => to_proto_pos(pos.opposite()), - None => return release_capture(capture, active).await, - }; - - let event = match event { - CaptureEvent::Begin => ProtoEvent::Enter(pos), - CaptureEvent::Input(e) => match state { - // connection not acknowledged, repeat `Enter` event - State::WaitingForAck => ProtoEvent::Enter(pos), - State::Sending => ProtoEvent::Input(e), - }, - }; - - if let Err(e) = conn.send(event, handle).await { - const DUR: Duration = Duration::from_millis(500); - debounce!(PREV_LOG, DUR, log::warn!("releasing capture: {e}")); - capture.release().await?; - } - Ok(()) } -async fn release_capture( - capture: &mut InputCapture, - active: &mut Option, -) -> Result<(), CaptureError> { - active.take(); - capture.release().await +thread_local! { + static PREV_LOG: Cell> = const { Cell::new(None) }; +} + +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +enum State { + #[default] + WaitingForAck, + Sending, } fn to_capture_pos(pos: lan_mouse_ipc::Position) -> input_capture::Position { @@ -374,41 +390,29 @@ fn to_capture_pos(pos: lan_mouse_ipc::Position) -> input_capture::Position { } } -fn to_proto_pos(pos: lan_mouse_ipc::Position) -> lan_mouse_proto::Position { +fn to_proto_pos(pos: input_capture::Position) -> lan_mouse_proto::Position { match pos { - lan_mouse_ipc::Position::Left => lan_mouse_proto::Position::Left, - lan_mouse_ipc::Position::Right => lan_mouse_proto::Position::Right, - lan_mouse_ipc::Position::Top => lan_mouse_proto::Position::Top, - lan_mouse_ipc::Position::Bottom => lan_mouse_proto::Position::Bottom, + input_capture::Position::Left => lan_mouse_proto::Position::Left, + input_capture::Position::Right => lan_mouse_proto::Position::Right, + input_capture::Position::Top => lan_mouse_proto::Position::Top, + input_capture::Position::Bottom => lan_mouse_proto::Position::Bottom, } } -async fn wait_for_termination(rx: &mut Receiver) { - loop { - match rx.recv().await.expect("channel closed") { - CaptureRequest::Terminate => return, - CaptureRequest::Release => continue, - CaptureRequest::Create(_, _) => continue, - CaptureRequest::Destroy(_) => continue, - CaptureRequest::Reenable => continue, - } - } -} - -struct DropGuard<'a, T> { - tx: &'a Sender, +struct DropGuard { + tx: Sender, on_drop: Option, } -impl<'a, T> DropGuard<'a, T> { - fn new(tx: &'a Sender, on_new: T, on_drop: T) -> Self { +impl DropGuard { + fn new(tx: Sender, on_new: T, on_drop: T) -> Self { tx.send(on_new).expect("channel closed"); let on_drop = Some(on_drop); Self { tx, on_drop } } } -impl<'a, T> Drop for DropGuard<'a, T> { +impl Drop for DropGuard { fn drop(&mut self) { self.tx .send(self.on_drop.take().expect("item")) diff --git a/src/service.rs b/src/service.rs index 42c1911..09f8679 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,5 +1,5 @@ use crate::{ - capture::{Capture, ICaptureEvent}, + capture::{Capture, CaptureType, ICaptureEvent}, client::ClientManager, config::Config, connect::LanMouseConnection, @@ -133,7 +133,7 @@ impl Service { // input capture + emulation let capture_backend = self.config.capture_backend.map(|b| b.into()); - let mut capture = Capture::new(capture_backend, conn, self.clone()); + let mut capture = Capture::new(capture_backend, conn, self.config.release_bind.clone()); let emulation_backend = self.config.emulation_backend.map(|b| b.into()); let mut emulation = Emulation::new(emulation_backend, listener); @@ -346,7 +346,7 @@ impl Service { ) { let handle = Self::ENTER_HANDLE_BEGIN + self.next_trigger_handle; self.next_trigger_handle += 1; - capture.create(handle, pos); + capture.create(handle, pos, CaptureType::EnterOnly); self.incoming_conns.borrow_mut().insert(addr); self.incoming_conn_info.borrow_mut().insert( handle, @@ -373,13 +373,6 @@ impl Service { .map(|incoming| incoming.addr) } - pub(crate) fn get_incoming_pos(&self, handle: ClientHandle) -> Option { - self.incoming_conn_info - .borrow() - .get(&handle) - .map(|incoming| incoming.pos) - } - fn notify_frontend(&self, event: FrontendEvent) { self.pending_frontend_events.borrow_mut().push_back(event); self.notifies.frontend_event_pending.notify_one(); @@ -441,7 +434,7 @@ impl Service { /* activate the client */ if self.client_manager.activate_client(handle) { /* notify capture and frontends */ - capture.create(handle, pos); + capture.create(handle, pos, CaptureType::Default); self.client_updated(handle); log::info!("activated client {handle} ({pos})"); }