From 2be9fbe2a4d0b86059f16504ab5df201bbec4d6e Mon Sep 17 00:00:00 2001 From: Ferdinand Schober Date: Fri, 8 Nov 2024 17:49:47 +0100 Subject: [PATCH] cleanup emulation --- src/emulation.rs | 388 +++++++++++++++++++++++------------------------ 1 file changed, 193 insertions(+), 195 deletions(-) diff --git a/src/emulation.rs b/src/emulation.rs index 6deb34d..914a9cb 100644 --- a/src/emulation.rs +++ b/src/emulation.rs @@ -60,7 +60,13 @@ impl Emulation { let emulation_proxy = EmulationProxy::new(backend); let (request_tx, request_rx) = channel(); let (event_tx, event_rx) = channel(); - let task = spawn_local(Self::run(listener, emulation_proxy, request_rx, event_tx)); + let emulation_task = ListenTask { + listener, + emulation_proxy, + request_rx, + event_tx, + }; + let task = spawn_local(emulation_task.run()); Self { task, request_tx, @@ -90,74 +96,6 @@ impl Emulation { self.event_rx.recv().await.expect("channel closed") } - async fn run( - mut listener: LanMouseListener, - mut emulation_proxy: EmulationProxy, - mut request_rx: Receiver, - event_tx: Sender, - ) { - let mut interval = tokio::time::interval(Duration::from_secs(5)); - let mut last_response = HashMap::new(); - loop { - select! { - e = listener.next() => { - let (event, addr) = match e { - Some(e) => e, - None => break, - }; - log::trace!("{event} <-<-<-<-<- {addr}"); - last_response.insert(addr, Instant::now()); - match event { - ProtoEvent::Enter(pos) => { - if let Some(fingerprint) = listener.get_certificate_fingerprint(addr).await { - log::info!("releasing capture: {addr} entered this device"); - event_tx.send(EmulationEvent::ReleaseNotify).expect("channel closed"); - listener.reply(addr, ProtoEvent::Ack(0)).await; - event_tx.send(EmulationEvent::Connected{addr, pos: to_ipc_pos(pos), fingerprint}).expect("channel closed"); - } - } - ProtoEvent::Leave(_) => { - emulation_proxy.release_keys(addr); - listener.reply(addr, ProtoEvent::Ack(0)).await; - } - ProtoEvent::Input(event) => emulation_proxy.consume(event, addr), - ProtoEvent::Ping => listener.reply(addr, ProtoEvent::Pong(emulation_proxy.emulation_active.get())).await, - _ => {} - } - } - event = emulation_proxy.event() => { - event_tx.send(event).expect("channel closed"); - } - request = request_rx.recv() => match request.expect("channel closed") { - // reenable emulation - EmulationRequest::Reenable => emulation_proxy.reenable(), - // notify the other end that we hit a barrier (should release capture) - EmulationRequest::Release(addr) => listener.reply(addr, ProtoEvent::Leave(0)).await, - EmulationRequest::ChangePort(port) => { - listener.request_port_change(port); - let result = listener.port_changed().await; - event_tx.send(EmulationEvent::PortChanged(result)).expect("channel closed"); - } - EmulationRequest::Terminate => break, - }, - _ = interval.tick() => { - last_response.retain(|&addr,instant| { - if instant.elapsed() > Duration::from_secs(5) { - log::warn!("releasing keys: {addr} not responding!"); - emulation_proxy.release_keys(addr); - event_tx.send(EmulationEvent::Disconnected { addr }).expect("channel closed"); - false - } else { - true - } - }); - } - } - } - listener.terminate().await; - emulation_proxy.terminate().await; - } - /// wait for termination pub(crate) async fn terminate(&mut self) { log::debug!("terminating emulation"); @@ -170,6 +108,78 @@ impl Emulation { } } +struct ListenTask { + listener: LanMouseListener, + emulation_proxy: EmulationProxy, + request_rx: Receiver, + event_tx: Sender, +} + +impl ListenTask { + async fn run(mut self) { + let mut interval = tokio::time::interval(Duration::from_secs(5)); + let mut last_response = HashMap::new(); + loop { + select! { + e = self.listener.next() => { + let (event, addr) = match e { + Some(e) => e, + None => break, + }; + log::trace!("{event} <-<-<-<-<- {addr}"); + last_response.insert(addr, Instant::now()); + match event { + ProtoEvent::Enter(pos) => { + if let Some(fingerprint) = self.listener.get_certificate_fingerprint(addr).await { + log::info!("releasing capture: {addr} entered this device"); + self.event_tx.send(EmulationEvent::ReleaseNotify).expect("channel closed"); + self.listener.reply(addr, ProtoEvent::Ack(0)).await; + self.event_tx.send(EmulationEvent::Connected{addr, pos: to_ipc_pos(pos), fingerprint}).expect("channel closed"); + } + } + ProtoEvent::Leave(_) => { + self.emulation_proxy.release_keys(addr); + self.listener.reply(addr, ProtoEvent::Ack(0)).await; + } + ProtoEvent::Input(event) => self.emulation_proxy.consume(event, addr), + ProtoEvent::Ping => self.listener.reply(addr, ProtoEvent::Pong(self.emulation_proxy.emulation_active.get())).await, + _ => {} + } + } + event = self.emulation_proxy.event() => { + self.event_tx.send(event).expect("channel closed"); + } + request = self.request_rx.recv() => match request.expect("channel closed") { + // reenable emulation + EmulationRequest::Reenable => self.emulation_proxy.reenable(), + // notify the other end that we hit a barrier (should release capture) + EmulationRequest::Release(addr) => self.listener.reply(addr, ProtoEvent::Leave(0)).await, + EmulationRequest::ChangePort(port) => { + self.listener.request_port_change(port); + let result = self.listener.port_changed().await; + self.event_tx.send(EmulationEvent::PortChanged(result)).expect("channel closed"); + } + EmulationRequest::Terminate => break, + }, + _ = interval.tick() => { + last_response.retain(|&addr,instant| { + if instant.elapsed() > Duration::from_secs(5) { + log::warn!("releasing keys: {addr} not responding!"); + self.emulation_proxy.release_keys(addr); + self.event_tx.send(EmulationEvent::Disconnected { addr }).expect("channel closed"); + false + } else { + true + } + }); + } + } + } + self.listener.terminate().await; + self.emulation_proxy.terminate().await; + } +} + /// proxy handling the actual input emulation, /// discarding events when it is disabled pub(crate) struct EmulationProxy { @@ -193,12 +203,15 @@ impl EmulationProxy { let (event_tx, event_rx) = channel(); let emulation_active = Rc::new(Cell::new(false)); let exit_requested = Rc::new(Cell::new(false)); - let task = spawn_local(Self::emulation_task( + let emulation_task = EmulationTask { backend, - exit_requested.clone(), + exit_requested: exit_requested.clone(), request_rx, event_tx, - )); + handles: Default::default(), + next_id: 0, + }; + let task = spawn_local(emulation_task.run()); Self { emulation_active, exit_requested, @@ -234,124 +247,6 @@ impl EmulationProxy { .expect("channel closed"); } - async fn emulation_task( - backend: Option, - exit_requested: Rc>, - mut request_rx: Receiver, - event_tx: Sender, - ) { - let mut handles = HashMap::new(); - let mut next_id = 0; - loop { - if let Err(e) = Self::do_emulation( - backend, - &mut handles, - &mut next_id, - &mut request_rx, - &event_tx, - ) - .await - { - log::warn!("input emulation exited: {e}"); - } - if exit_requested.get() { - break; - } - // wait for reenable request - loop { - match request_rx.recv().await.expect("channel closed") { - ProxyRequest::Reenable => break, - ProxyRequest::Terminate => return, - ProxyRequest::Input(..) => { /* emulation inactive => ignore */ } - ProxyRequest::ReleaseKeys(..) => { /* emulation inactive => ignore */ } - } - } - } - } - - async fn do_emulation( - backend: Option, - handles: &mut HashMap, - next_id: &mut EmulationHandle, - request_rx: &mut Receiver, - event_tx: &Sender, - ) -> Result<(), InputEmulationError> { - log::info!("creating input emulation ..."); - let mut emulation = tokio::select! { - r = InputEmulation::new(backend) => r?, - // allow termination event while requesting input emulation - _ = wait_for_termination(request_rx) => return Ok(()), - }; - - // used to send enabled and disabled events - let _emulation_guard = DropGuard::new( - event_tx, - EmulationEvent::EmulationEnabled, - EmulationEvent::EmulationDisabled, - ); - - // create active handles - if let Err(e) = - Self::create_clients(&mut emulation, handles.values().copied(), request_rx).await - { - emulation.terminate().await; - return Err(e); - } - - let res = Self::do_emulation_session(&mut emulation, handles, next_id, request_rx).await; - // FIXME replace with async drop when stabilized - emulation.terminate().await; - res - } - - async fn create_clients( - emulation: &mut InputEmulation, - handles: impl Iterator, - request_rx: &mut Receiver, - ) -> Result<(), InputEmulationError> { - for handle in handles { - tokio::select! { - _ = emulation.create(handle) => {}, - _ = wait_for_termination(request_rx) => return Ok(()), - } - } - Ok(()) - } - - async fn do_emulation_session( - emulation: &mut InputEmulation, - handles: &mut HashMap, - next_id: &mut EmulationHandle, - rx: &mut Receiver, - ) -> Result<(), InputEmulationError> { - loop { - tokio::select! { - e = rx.recv() => match e.expect("channel closed") { - ProxyRequest::Input(event, addr) => { - let handle = match handles.get(&addr) { - Some(&handle) => handle, - None => { - let handle = *next_id; - *next_id += 1; - emulation.create(handle).await; - handles.insert(addr, handle); - handle - } - }; - emulation.consume(event, handle).await?; - }, - ProxyRequest::ReleaseKeys(addr) => { - if let Some(&handle) = handles.get(&addr) { - emulation.release_keys(handle).await? - } - } - ProxyRequest::Terminate => break Ok(()), - ProxyRequest::Reenable => continue, - }, - } - } - } - fn reenable(&self) { self.request_tx .send(ProxyRequest::Reenable) @@ -367,6 +262,109 @@ impl EmulationProxy { } } +struct EmulationTask { + backend: Option, + exit_requested: Rc>, + request_rx: Receiver, + event_tx: Sender, + handles: HashMap, + next_id: EmulationHandle, +} + +impl EmulationTask { + async fn run(mut self) { + loop { + if let Err(e) = self.do_emulation().await { + log::warn!("input emulation exited: {e}"); + } + if self.exit_requested.get() { + break; + } + // wait for reenable request + loop { + match self.request_rx.recv().await.expect("channel closed") { + ProxyRequest::Reenable => break, + ProxyRequest::Terminate => return, + ProxyRequest::Input(..) => { /* emulation inactive => ignore */ } + ProxyRequest::ReleaseKeys(..) => { /* emulation inactive => ignore */ } + } + } + } + } + + async fn do_emulation(&mut self) -> Result<(), InputEmulationError> { + log::info!("creating input emulation ..."); + let mut emulation = tokio::select! { + r = InputEmulation::new(self.backend) => r?, + // allow termination event while requesting input emulation + _ = wait_for_termination(&mut self.request_rx) => return Ok(()), + }; + + // used to send enabled and disabled events + let _emulation_guard = DropGuard::new( + self.event_tx.clone(), + EmulationEvent::EmulationEnabled, + EmulationEvent::EmulationDisabled, + ); + + // create active handles + if let Err(e) = self.create_clients(&mut emulation).await { + emulation.terminate().await; + return Err(e); + } + + let res = self.do_emulation_session(&mut emulation).await; + // FIXME replace with async drop when stabilized + emulation.terminate().await; + res + } + + async fn create_clients( + &mut self, + emulation: &mut InputEmulation, + ) -> Result<(), InputEmulationError> { + for handle in self.handles.values() { + tokio::select! { + _ = emulation.create(*handle) => {}, + _ = wait_for_termination(&mut self.request_rx) => return Ok(()), + } + } + Ok(()) + } + + async fn do_emulation_session( + &mut self, + emulation: &mut InputEmulation, + ) -> Result<(), InputEmulationError> { + loop { + tokio::select! { + e = self.request_rx.recv() => match e.expect("channel closed") { + ProxyRequest::Input(event, addr) => { + let handle = match self.handles.get(&addr) { + Some(&handle) => handle, + None => { + let handle = self.next_id; + self.next_id += 1; + emulation.create(handle).await; + self.handles.insert(addr, handle); + handle + } + }; + emulation.consume(event, handle).await?; + }, + ProxyRequest::ReleaseKeys(addr) => { + if let Some(&handle) = self.handles.get(&addr) { + emulation.release_keys(handle).await? + } + } + ProxyRequest::Terminate => break Ok(()), + ProxyRequest::Reenable => continue, + }, + } + } + } +} + fn to_ipc_pos(pos: Position) -> lan_mouse_ipc::Position { match pos { Position::Left => lan_mouse_ipc::Position::Left, @@ -387,20 +385,20 @@ async fn wait_for_termination(rx: &mut Receiver) { } } -struct DropGuard<'a, T> { - tx: &'a Sender, +struct DropGuard { + tx: Sender, on_drop: Option, } -impl<'a, T> DropGuard<'a, T> { - fn new(tx: &'a Sender, on_new: T, on_drop: T) -> Self { +impl DropGuard { + fn new(tx: Sender, on_new: T, on_drop: T) -> Self { tx.send(on_new).expect("channel closed"); let on_drop = Some(on_drop); Self { tx, on_drop } } } -impl<'a, T> Drop for DropGuard<'a, T> { +impl Drop for DropGuard { fn drop(&mut self) { self.tx .send(self.on_drop.take().expect("item"))