use crate::listen::{LanMouseListener, ListenEvent, ListenerCreationError}; use futures::StreamExt; use input_emulation::{EmulationHandle, InputEmulation, InputEmulationError}; use input_event::Event; use lan_mouse_proto::{Position, ProtoEvent}; use local_channel::mpsc::{channel, Receiver, Sender}; use std::{ cell::Cell, collections::HashMap, net::SocketAddr, rc::Rc, time::{Duration, Instant}, }; use tokio::{ select, task::{spawn_local, JoinHandle}, }; /// emulation handling events received from a listener pub(crate) struct Emulation { task: JoinHandle<()>, request_tx: Sender, event_rx: Receiver, } pub(crate) enum EmulationEvent { Connected { addr: SocketAddr, fingerprint: String, }, ConnectionAttempt { fingerprint: String, }, /// new connection Entered { /// address of the connection addr: SocketAddr, /// position of the connection pos: lan_mouse_ipc::Position, /// certificate fingerprint of the connection fingerprint: String, }, /// connection closed Disconnected { addr: SocketAddr, }, /// the port of the listener has changed PortChanged(Result), /// emulation was disabled EmulationDisabled, /// emulation was enabled EmulationEnabled, /// capture should be released ReleaseNotify, } enum EmulationRequest { Reenable, Release(SocketAddr), ChangePort(u16), Terminate, } impl Emulation { pub(crate) fn new( backend: Option, listener: LanMouseListener, ) -> Self { let emulation_proxy = EmulationProxy::new(backend); let (request_tx, request_rx) = channel(); let (event_tx, event_rx) = channel(); let emulation_task = ListenTask { listener, emulation_proxy, request_rx, event_tx, }; let task = spawn_local(emulation_task.run()); Self { task, request_tx, event_rx, } } pub(crate) fn send_leave_event(&self, addr: SocketAddr) { self.request_tx .send(EmulationRequest::Release(addr)) .expect("channel closed"); } pub(crate) fn reenable(&self) { self.request_tx .send(EmulationRequest::Reenable) .expect("channel closed"); } pub(crate) fn request_port_change(&self, port: u16) { self.request_tx .send(EmulationRequest::ChangePort(port)) .expect("channel closed") } pub(crate) async fn event(&mut self) -> EmulationEvent { self.event_rx.recv().await.expect("channel closed") } /// wait for termination pub(crate) async fn terminate(&mut self) { log::debug!("terminating emulation"); self.request_tx .send(EmulationRequest::Terminate) .expect("channel closed"); if let Err(e) = (&mut self.task).await { log::warn!("{e}"); } } } struct ListenTask { listener: LanMouseListener, emulation_proxy: EmulationProxy, request_rx: Receiver, event_tx: Sender, } impl ListenTask { async fn run(mut self) { let mut interval = tokio::time::interval(Duration::from_secs(5)); let mut last_response = HashMap::new(); let mut rejected_connections = HashMap::new(); loop { select! { e = self.listener.next() => {match e { Some(ListenEvent::Msg { event, addr }) => { log::trace!("{event} <-<-<-<-<- {addr}"); last_response.insert(addr, Instant::now()); match event { ProtoEvent::Enter(pos) => { if let Some(fingerprint) = self.listener.get_certificate_fingerprint(addr).await { log::info!("releasing capture: {addr} entered this device"); self.event_tx.send(EmulationEvent::ReleaseNotify).expect("channel closed"); self.listener.reply(addr, ProtoEvent::Ack(0)).await; self.event_tx.send(EmulationEvent::Entered{addr, pos: to_ipc_pos(pos), fingerprint}).expect("channel closed"); } } ProtoEvent::Leave(_) => { self.emulation_proxy.remove(addr); self.listener.reply(addr, ProtoEvent::Ack(0)).await; } ProtoEvent::Input(event) => self.emulation_proxy.consume(event, addr), ProtoEvent::Ping => self.listener.reply(addr, ProtoEvent::Pong(self.emulation_proxy.emulation_active.get())).await, _ => {} } } Some(ListenEvent::Accept { addr, fingerprint }) => { self.event_tx.send(EmulationEvent::Connected { addr, fingerprint }).expect("channel closed"); } Some(ListenEvent::Rejected { fingerprint }) => { if rejected_connections.insert(fingerprint.clone(), Instant::now()) .is_none_or(|i| i.elapsed() >= Duration::from_secs(2)) { self.event_tx.send(EmulationEvent::ConnectionAttempt { fingerprint }).expect("channel closed"); } } None => break }} event = self.emulation_proxy.event() => { self.event_tx.send(event).expect("channel closed"); } request = self.request_rx.recv() => match request.expect("channel closed") { // reenable emulation EmulationRequest::Reenable => self.emulation_proxy.reenable(), // notify the other end that we hit a barrier (should release capture) EmulationRequest::Release(addr) => self.listener.reply(addr, ProtoEvent::Leave(0)).await, EmulationRequest::ChangePort(port) => { self.listener.request_port_change(port); let result = self.listener.port_changed().await; self.event_tx.send(EmulationEvent::PortChanged(result)).expect("channel closed"); } EmulationRequest::Terminate => break, }, _ = interval.tick() => { last_response.retain(|&addr,instant| { if instant.elapsed() > Duration::from_secs(1) { log::warn!("releasing keys: {addr} not responding!"); self.emulation_proxy.remove(addr); self.event_tx.send(EmulationEvent::Disconnected { addr }).expect("channel closed"); false } else { true } }); } } } self.listener.terminate().await; self.emulation_proxy.terminate().await; } } /// proxy handling the actual input emulation, /// discarding events when it is disabled pub(crate) struct EmulationProxy { emulation_active: Rc>, exit_requested: Rc>, request_tx: Sender, event_rx: Receiver, task: JoinHandle<()>, } enum ProxyRequest { Input(Event, SocketAddr), Remove(SocketAddr), Terminate, Reenable, } impl EmulationProxy { fn new(backend: Option) -> Self { let (request_tx, request_rx) = channel(); let (event_tx, event_rx) = channel(); let emulation_active = Rc::new(Cell::new(false)); let exit_requested = Rc::new(Cell::new(false)); let emulation_task = EmulationTask { backend, exit_requested: exit_requested.clone(), request_rx, event_tx, handles: Default::default(), next_id: 0, }; let task = spawn_local(emulation_task.run()); Self { emulation_active, exit_requested, request_tx, task, event_rx, } } async fn event(&mut self) -> EmulationEvent { let event = self.event_rx.recv().await.expect("channel closed"); if let EmulationEvent::EmulationEnabled = event { self.emulation_active.replace(true); } if let EmulationEvent::EmulationDisabled = event { self.emulation_active.replace(false); } event } fn consume(&self, event: Event, addr: SocketAddr) { // ignore events if emulation is currently disabled if self.emulation_active.get() { self.request_tx .send(ProxyRequest::Input(event, addr)) .expect("channel closed"); } } fn remove(&self, addr: SocketAddr) { self.request_tx .send(ProxyRequest::Remove(addr)) .expect("channel closed"); } fn reenable(&self) { self.request_tx .send(ProxyRequest::Reenable) .expect("channel closed"); } async fn terminate(&mut self) { self.exit_requested.replace(true); self.request_tx .send(ProxyRequest::Terminate) .expect("channel closed"); let _ = (&mut self.task).await; } } struct EmulationTask { backend: Option, exit_requested: Rc>, request_rx: Receiver, event_tx: Sender, handles: HashMap, next_id: EmulationHandle, } impl EmulationTask { async fn run(mut self) { loop { if let Err(e) = self.do_emulation().await { log::warn!("input emulation exited: {e}"); } if self.exit_requested.get() { break; } // wait for reenable request loop { match self.request_rx.recv().await.expect("channel closed") { ProxyRequest::Reenable => break, ProxyRequest::Terminate => return, ProxyRequest::Input(..) => { /* emulation inactive => ignore */ } ProxyRequest::Remove(..) => { /* emulation inactive => ignore */ } } } } } async fn do_emulation(&mut self) -> Result<(), InputEmulationError> { log::info!("creating input emulation ..."); let mut emulation = tokio::select! { r = InputEmulation::new(self.backend) => r?, // allow termination event while requesting input emulation _ = wait_for_termination(&mut self.request_rx) => return Ok(()), }; // used to send enabled and disabled events let _emulation_guard = DropGuard::new( self.event_tx.clone(), EmulationEvent::EmulationEnabled, EmulationEvent::EmulationDisabled, ); // create active handles if let Err(e) = self.create_clients(&mut emulation).await { emulation.terminate().await; return Err(e); } let res = self.do_emulation_session(&mut emulation).await; // FIXME replace with async drop when stabilized emulation.terminate().await; res } async fn create_clients( &mut self, emulation: &mut InputEmulation, ) -> Result<(), InputEmulationError> { for handle in self.handles.values() { tokio::select! { _ = emulation.create(*handle) => {}, _ = wait_for_termination(&mut self.request_rx) => return Ok(()), } } Ok(()) } async fn do_emulation_session( &mut self, emulation: &mut InputEmulation, ) -> Result<(), InputEmulationError> { loop { tokio::select! { e = self.request_rx.recv() => match e.expect("channel closed") { ProxyRequest::Input(event, addr) => { let handle = match self.handles.get(&addr) { Some(&handle) => handle, None => { let handle = self.next_id; self.next_id += 1; emulation.create(handle).await; self.handles.insert(addr, handle); handle } }; emulation.consume(event, handle).await?; }, ProxyRequest::Remove(addr) => { if let Some(handle) = self.handles.remove(&addr) { emulation.destroy(handle).await; } } ProxyRequest::Terminate => break Ok(()), ProxyRequest::Reenable => continue, }, } } } } fn to_ipc_pos(pos: Position) -> lan_mouse_ipc::Position { match pos { Position::Left => lan_mouse_ipc::Position::Left, Position::Right => lan_mouse_ipc::Position::Right, Position::Top => lan_mouse_ipc::Position::Top, Position::Bottom => lan_mouse_ipc::Position::Bottom, } } async fn wait_for_termination(rx: &mut Receiver) { loop { match rx.recv().await.expect("channel closed") { ProxyRequest::Terminate => return, ProxyRequest::Input(_, _) => continue, ProxyRequest::Remove(_) => continue, ProxyRequest::Reenable => continue, } } } struct DropGuard { tx: Sender, on_drop: Option, } impl DropGuard { fn new(tx: Sender, on_new: T, on_drop: T) -> Self { tx.send(on_new).expect("channel closed"); let on_drop = Some(on_drop); Self { tx, on_drop } } } impl Drop for DropGuard { fn drop(&mut self) { self.tx .send(self.on_drop.take().expect("item")) .expect("channel closed"); } }