use std::{ cell::{Cell, RefCell}, rc::Rc, time::{Duration, Instant}, }; use futures::StreamExt; use input_capture::{ CaptureError, CaptureEvent, CaptureHandle, InputCapture, InputCaptureError, Position, }; use input_event::scancode; use lan_mouse_proto::ProtoEvent; use local_channel::mpsc::{channel, Receiver, Sender}; use tokio::task::{spawn_local, JoinHandle}; use tokio_util::sync::CancellationToken; use crate::connect::LanMouseConnection; pub(crate) struct Capture { cancellation_token: CancellationToken, request_tx: Sender, task: JoinHandle<()>, event_rx: Receiver, } pub(crate) enum ICaptureEvent { /// a client was entered CaptureBegin(CaptureHandle), /// capture disabled CaptureDisabled, /// capture disabled CaptureEnabled, /// A (new) client was entered. /// In contrast to [`ICaptureEvent::CaptureBegin`] this /// event is only triggered when the capture was /// explicitly released in the meantime by /// either the remote client leaving its device region, /// a new device entering the screen or the release bind. ClientEntered(u64), } #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub(crate) enum CaptureType { /// a normal input capture Default, /// A capture only interested in [`CaptureEvent::Begin`] events. /// The capture is released immediately, if there is no /// Default capture at the same position. EnterOnly, } #[derive(Clone, Copy, Debug)] enum CaptureRequest { /// capture must release the mouse Release, /// add a capture client Create(CaptureHandle, Position, CaptureType), /// destory a capture client Destroy(CaptureHandle), /// reenable input capture Reenable, } impl Capture { pub(crate) fn new( backend: Option, conn: LanMouseConnection, release_bind: Vec, ) -> Self { let (request_tx, request_rx) = channel(); let (event_tx, event_rx) = channel(); let cancellation_token = CancellationToken::new(); let capture_task = CaptureTask { active_client: None, backend, cancellation_token: cancellation_token.clone(), captures: Default::default(), conn, event_tx, request_rx, release_bind: Rc::new(RefCell::new(release_bind)), state: Default::default(), }; let task = spawn_local(capture_task.run()); Self { cancellation_token, request_tx, task, event_rx, } } pub(crate) fn reenable(&self) { self.request_tx .send(CaptureRequest::Reenable) .expect("channel closed"); } pub(crate) async fn terminate(&mut self) { self.cancellation_token.cancel(); log::debug!("terminating capture"); if let Err(e) = (&mut self.task).await { log::warn!("{e}"); } } pub(crate) fn create( &self, handle: CaptureHandle, pos: lan_mouse_ipc::Position, capture_type: CaptureType, ) { let pos = to_capture_pos(pos); self.request_tx .send(CaptureRequest::Create(handle, pos, capture_type)) .expect("channel closed"); } pub(crate) fn destroy(&self, handle: CaptureHandle) { self.request_tx .send(CaptureRequest::Destroy(handle)) .expect("channel closed"); } pub(crate) fn release(&self) { self.request_tx .send(CaptureRequest::Release) .expect("channel closed"); } pub(crate) async fn event(&mut self) -> ICaptureEvent { self.event_rx.recv().await.expect("channel closed") } } /// debounce a statement `$st`, i.e. the statement is executed only if the /// time since the previous execution is at least `$dur`. /// `$prev` is used to keep track of this timestamp macro_rules! debounce { ($prev:ident, $dur:expr, $st:stmt) => { let exec = match $prev.get() { None => true, Some(instant) if instant.elapsed() > $dur => true, _ => false, }; if exec { $prev.replace(Some(Instant::now())); $st } }; } struct CaptureTask { active_client: Option, backend: Option, cancellation_token: CancellationToken, captures: Vec<(CaptureHandle, Position, CaptureType)>, conn: LanMouseConnection, event_tx: Sender, release_bind: Rc>>, request_rx: Receiver, state: State, } impl CaptureTask { fn add_capture(&mut self, handle: CaptureHandle, pos: Position, capture_type: CaptureType) { self.captures.push((handle, pos, capture_type)); } fn remove_capture(&mut self, handle: CaptureHandle) { self.captures.retain(|&(h, ..)| handle != h); } fn is_default_capture_at(&self, pos: Position) -> bool { self.captures .iter() .any(|&(_, p, t)| p == pos && t == CaptureType::Default) } fn get_pos(&self, handle: CaptureHandle) -> Position { self.captures .iter() .find(|(h, ..)| *h == handle) .expect("no such capture") .1 } fn get_type(&self, handle: CaptureHandle) -> CaptureType { self.captures .iter() .find(|(h, ..)| *h == handle) .expect("no such capture") .2 } async fn run(mut self) { loop { if let Err(e) = self.do_capture().await { log::warn!("input capture exited: {e}"); } loop { tokio::select! { r = self.request_rx.recv() => match r.expect("channel closed") { CaptureRequest::Reenable => break, CaptureRequest::Create(h, p, t) => self.add_capture(h, p, t), CaptureRequest::Destroy(h) => self.remove_capture(h), CaptureRequest::Release => { /* nothing to do */ } }, _ = self.cancellation_token.cancelled() => return, } } } } async fn do_capture(&mut self) -> Result<(), InputCaptureError> { /* allow cancelling capture request */ let mut capture = tokio::select! { r = InputCapture::new(self.backend) => r?, _ = self.cancellation_token.cancelled() => return Ok(()), }; let _capture_guard = DropGuard::new( self.event_tx.clone(), ICaptureEvent::CaptureEnabled, ICaptureEvent::CaptureDisabled, ); /* create barriers for active clients */ let r = self.create_captures(&mut capture).await; if let Err(e) = r { capture.terminate().await?; return Err(e.into()); } let r = self.do_capture_session(&mut capture).await; // FIXME replace with async drop when stabilized capture.terminate().await?; r } async fn create_captures(&mut self, capture: &mut InputCapture) -> Result<(), CaptureError> { let captures = self.captures.clone(); for (handle, pos, _type) in captures { tokio::select! { r = capture.create(handle, pos) => r?, _ = self.cancellation_token.cancelled() => return Ok(()), } } Ok(()) } async fn do_capture_session( &mut self, capture: &mut InputCapture, ) -> Result<(), InputCaptureError> { loop { tokio::select! { event = capture.next() => match event { Some(event) => self.handle_capture_event(capture, event?).await?, None => return Ok(()), }, (handle, event) = self.conn.recv() => { if let Some(active) = self.active_client { if handle != active { // we only care about events coming from the client we are currently connected to // only `Ack` and `Leave` are relevant continue } } match event { // connection acknowlegded => set state to Sending ProtoEvent::Ack(_) => { log::info!("client {handle} acknowledged the connection!"); self.state = State::Sending; } // client disconnected ProtoEvent::Leave(_) => { log::info!("releasing capture: left remote client device region"); self.release_capture(capture).await?; }, _ => {} } }, e = self.request_rx.recv() => match e.expect("channel closed") { CaptureRequest::Reenable => { /* already active */ }, CaptureRequest::Release => self.release_capture(capture).await?, CaptureRequest::Create(h, p, t) => { self.add_capture(h, p, t); capture.create(h, p).await?; } CaptureRequest::Destroy(h) => { self.remove_capture(h); capture.destroy(h).await?; } }, _ = self.cancellation_token.cancelled() => break, } } Ok(()) } async fn handle_capture_event( &mut self, capture: &mut InputCapture, event: (CaptureHandle, CaptureEvent), ) -> Result<(), CaptureError> { let (handle, event) = event; log::trace!("({handle}): {event:?}"); if capture.keys_pressed(&self.release_bind.borrow()) { log::info!("releasing capture: release-bind pressed"); return self.release_capture(capture).await; } if event == CaptureEvent::Begin { self.event_tx .send(ICaptureEvent::CaptureBegin(handle)) .expect("channel closed"); } // enter only capture (for incoming connections) if self.get_type(handle) == CaptureType::EnterOnly { // if there is no active outgoing connection at the current capture, // we release the capture if !self.is_default_capture_at(self.get_pos(handle)) { log::info!("releasing capture: no active client at this position"); capture.release().await?; } // we dont care about events from incoming handles except for releasing the capture return Ok(()); } // activated a new client if event == CaptureEvent::Begin && Some(handle) != self.active_client { self.state = State::WaitingForAck; self.active_client.replace(handle); self.event_tx .send(ICaptureEvent::ClientEntered(handle)) .expect("channel closed"); } let opposite_pos = to_proto_pos(self.get_pos(handle).opposite()); let event = match event { CaptureEvent::Begin => ProtoEvent::Enter(opposite_pos), CaptureEvent::Input(e) => match self.state { // connection not acknowledged, repeat `Enter` event State::WaitingForAck => ProtoEvent::Enter(opposite_pos), State::Sending => ProtoEvent::Input(e), }, }; if let Err(e) = self.conn.send(event, handle).await { const DUR: Duration = Duration::from_millis(500); debounce!(PREV_LOG, DUR, log::warn!("releasing capture: {e}")); capture.release().await?; } Ok(()) } async fn release_capture(&mut self, capture: &mut InputCapture) -> Result<(), CaptureError> { self.active_client.take(); capture.release().await } } thread_local! { static PREV_LOG: Cell> = const { Cell::new(None) }; } #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] enum State { #[default] WaitingForAck, Sending, } fn to_capture_pos(pos: lan_mouse_ipc::Position) -> input_capture::Position { match pos { lan_mouse_ipc::Position::Left => input_capture::Position::Left, lan_mouse_ipc::Position::Right => input_capture::Position::Right, lan_mouse_ipc::Position::Top => input_capture::Position::Top, lan_mouse_ipc::Position::Bottom => input_capture::Position::Bottom, } } fn to_proto_pos(pos: input_capture::Position) -> lan_mouse_proto::Position { match pos { input_capture::Position::Left => lan_mouse_proto::Position::Left, input_capture::Position::Right => lan_mouse_proto::Position::Right, input_capture::Position::Top => lan_mouse_proto::Position::Top, input_capture::Position::Bottom => lan_mouse_proto::Position::Bottom, } } struct DropGuard { tx: Sender, on_drop: Option, } impl DropGuard { fn new(tx: Sender, on_new: T, on_drop: T) -> Self { tx.send(on_new).expect("channel closed"); let on_drop = Some(on_drop); Self { tx, on_drop } } } impl Drop for DropGuard { fn drop(&mut self) { self.tx .send(self.on_drop.take().expect("item")) .expect("channel closed"); } }