From 06d4e8d8365e45bb5e661b7df94a502af6bef367 Mon Sep 17 00:00:00 2001 From: Ferdinand Schober Date: Tue, 8 Oct 2024 01:10:25 +0200 Subject: [PATCH] implementing release logic for one-way connection --- lan-mouse-cli/src/lib.rs | 2 + lan-mouse-gtk/src/lib.rs | 2 + lan-mouse-ipc/src/lib.rs | 4 ++ src/capture.rs | 27 +++++++++++--- src/emulation.rs | 30 ++++++++++----- src/service.rs | 79 +++++++++++++++++++++++++++++++++++++--- 6 files changed, 125 insertions(+), 19 deletions(-) diff --git a/lan-mouse-cli/src/lib.rs b/lan-mouse-cli/src/lib.rs index e7209c7..3a22bd0 100644 --- a/lan-mouse-cli/src/lib.rs +++ b/lan-mouse-cli/src/lib.rs @@ -283,6 +283,8 @@ impl Cli { FrontendEvent::PublicKeyFingerprint(fp) => { eprintln!("the public key fingerprint of this device is {fp}"); } + FrontendEvent::IncomingConnected(fp, addr, pos) => {} + FrontendEvent::IncomingDisconnected(fp) => {} } } diff --git a/lan-mouse-gtk/src/lib.rs b/lan-mouse-gtk/src/lib.rs index 8fba714..efbc99c 100644 --- a/lan-mouse-gtk/src/lib.rs +++ b/lan-mouse-gtk/src/lib.rs @@ -142,6 +142,8 @@ fn build_ui(app: &Application) { FrontendEvent::PublicKeyFingerprint(fp) => { window.set_pk_fp(&fp); } + FrontendEvent::IncomingConnected(fp, addr, pos) => {} + FrontendEvent::IncomingDisconnected(fp) => {} } } } diff --git a/lan-mouse-ipc/src/lib.rs b/lan-mouse-ipc/src/lib.rs index 6ffa34c..b8697e1 100644 --- a/lan-mouse-ipc/src/lib.rs +++ b/lan-mouse-ipc/src/lib.rs @@ -201,6 +201,10 @@ pub enum FrontendEvent { AuthorizedUpdated(HashMap), /// public key fingerprint of this device PublicKeyFingerprint(String), + /// incoming connected + IncomingConnected(String, SocketAddr, Position), + /// incoming disconnected + IncomingDisconnected(String), } #[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] diff --git a/src/capture.rs b/src/capture.rs index 00b7702..c6a0b86 100644 --- a/src/capture.rs +++ b/src/capture.rs @@ -20,6 +20,7 @@ use crate::{connect::LanMouseConnection, service::Service}; pub(crate) struct Capture { tx: Sender, task: JoinHandle<()>, + enter_rx: Receiver, } #[derive(Clone, Copy, Debug)] @@ -35,8 +36,9 @@ enum CaptureRequest { impl Capture { pub(crate) fn new(server: Service, conn: LanMouseConnection) -> Self { let (tx, rx) = channel(); - let task = spawn_local(Self::run(server.clone(), rx, conn)); - Self { tx, task } + let (enter_tx, enter_rx) = channel(); + let task = spawn_local(Self::run(server.clone(), rx, conn, enter_tx)); + Self { tx, task, enter_rx } } pub(crate) async fn terminate(&mut self) { @@ -66,9 +68,18 @@ impl Capture { .expect("channel closed"); } - async fn run(server: Service, mut rx: Receiver, mut conn: LanMouseConnection) { + pub(crate) async fn entered(&mut self) -> CaptureHandle { + self.enter_rx.recv().await.expect("channel closed") + } + + async fn run( + server: Service, + mut rx: Receiver, + mut conn: LanMouseConnection, + mut enter_tx: Sender, + ) { loop { - if let Err(e) = do_capture(&server, &mut conn, &mut rx).await { + if let Err(e) = do_capture(&server, &mut conn, &mut rx, &mut enter_tx).await { log::warn!("input capture exited: {e}"); } server.set_capture_status(Status::Disabled); @@ -90,6 +101,7 @@ async fn do_capture( server: &Service, conn: &mut LanMouseConnection, rx: &mut Receiver, + enter_tx: &mut Sender, ) -> Result<(), InputCaptureError> { let backend = server.config.capture_backend.map(|b| b.into()); @@ -121,7 +133,7 @@ async fn do_capture( loop { tokio::select! { event = capture.next() => match event { - Some(event) => handle_capture_event(server, &mut capture, conn, event?, &mut state).await?, + Some(event) => handle_capture_event(server, &mut capture, conn, event?, &mut state, enter_tx).await?, None => return Ok(()), }, (handle, event) = conn.recv() => { @@ -196,6 +208,7 @@ async fn handle_capture_event( conn: &LanMouseConnection, event: (CaptureHandle, CaptureEvent), state: &mut State, + enter_tx: &mut Sender, ) -> Result<(), CaptureError> { let (handle, event) = event; log::trace!("({handle}): {event:?}"); @@ -211,6 +224,10 @@ async fn handle_capture_event( return release_capture(capture, server).await; } + if event == CaptureEvent::Begin { + enter_tx.send(handle).expect("channel closed"); + } + // activated a new client if event == CaptureEvent::Begin && Some(handle) != server.get_active() { *state = State::WaitingForAck; diff --git a/src/emulation.rs b/src/emulation.rs index 4ba1b77..3dff0d9 100644 --- a/src/emulation.rs +++ b/src/emulation.rs @@ -18,19 +18,26 @@ use tokio::{ /// emulation handling events received from a listener pub(crate) struct Emulation { task: JoinHandle<()>, + release_tx: Sender, } impl Emulation { pub(crate) fn new(server: Service, listener: LanMouseListener) -> Self { let emulation_proxy = EmulationProxy::new(server.clone()); - let task = spawn_local(Self::run(server, listener, emulation_proxy)); - Self { task } + let (release_tx, release_rx) = channel(); + let task = spawn_local(Self::run(server, listener, emulation_proxy, release_rx)); + Self { task, release_tx } + } + + pub(crate) fn notify_release(&self, addr: SocketAddr) { + self.release_tx.send(addr).expect("channel closed"); } async fn run( - server: Service, + service: Service, mut listener: LanMouseListener, mut emulation_proxy: EmulationProxy, + mut release_rx: Receiver, ) { let mut interval = tokio::time::interval(Duration::from_secs(5)); let mut last_response = HashMap::new(); @@ -47,9 +54,9 @@ impl Emulation { ProtoEvent::Enter(pos) => { if let Some(cert) = listener.get_certificate_fingerprint(addr).await { log::info!("{addr} entered this device"); - server.release_capture(); + service.release_capture(); listener.reply(addr, ProtoEvent::Ack(0)).await; - server.register_incoming(addr, to_ipc_pos(pos), cert); + service.register_incoming(addr, to_ipc_pos(pos), cert); } } ProtoEvent::Leave(_) => { @@ -57,22 +64,27 @@ impl Emulation { listener.reply(addr, ProtoEvent::Ack(0)).await; } ProtoEvent::Input(event) => emulation_proxy.consume(event, addr), - ProtoEvent::Ping => listener.reply(addr, ProtoEvent::Pong(server.emulation_status.get() == Status::Enabled)).await, + ProtoEvent::Ping => listener.reply(addr, ProtoEvent::Pong(service.emulation_status.get() == Status::Enabled)).await, _ => {} } } + addr = release_rx.recv() => { + let addr = addr.expect("channel closed"); + listener.reply(addr, ProtoEvent::Leave(0)).await; + } _ = interval.tick() => { - last_response.retain(|addr,instant| { + last_response.retain(|&addr,instant| { if instant.elapsed() > Duration::from_secs(5) { log::warn!("releasing keys: {addr} not responding!"); - emulation_proxy.release_keys(*addr); + emulation_proxy.release_keys(addr); + service.deregister_incoming(addr); false } else { true } }); } - _ = server.cancelled() => break, + _ = service.cancelled() => break, } } listener.terminate().await; diff --git a/src/service.rs b/src/service.rs index c3e6695..86e4916 100644 --- a/src/service.rs +++ b/src/service.rs @@ -22,6 +22,7 @@ use std::{ net::{IpAddr, SocketAddr}, rc::Rc, sync::{Arc, RwLock}, + u64, }; use thiserror::Error; use tokio::{signal, sync::Notify}; @@ -44,6 +45,17 @@ pub enum ServiceError { pub struct ReleaseToken; +enum IncomingEvent { + Connected { + addr: SocketAddr, + pos: Position, + fingerprint: String, + }, + Disconnected { + addr: SocketAddr, + }, +} + #[derive(Clone)] pub struct Service { active: Rc>>, @@ -54,12 +66,13 @@ pub struct Service { notifies: Rc, pub(crate) config: Rc, pending_frontend_events: Rc>>, - pending_incoming: Rc>>, + pending_incoming: Rc>>, capture_status: Rc>, pub(crate) emulation_status: Rc>, pub(crate) should_release: Rc>>, - incoming_conns: Rc>>, + incoming_conns: Rc>>, cert: Certificate, + next_trigger_handle: u64, } #[derive(Default)] @@ -118,6 +131,7 @@ impl Service { emulation_status: Default::default(), incoming_conns: Rc::new(RefCell::new(HashMap::new())), should_release: Default::default(), + next_trigger_handle: 0, }; Ok(service) } @@ -171,11 +185,26 @@ impl Service { } }, _ = self.notifies.incoming.notified() => { - while let Some((addr, pos, fingerprint)) = { + while let Some(incoming) = { let incoming = self.pending_incoming.borrow_mut().pop_front(); incoming } { - // capture.register(addr, pos); + match incoming { + IncomingEvent::Connected { addr, pos, fingerprint } => { + self.add_incoming(addr, pos, fingerprint.clone(), &capture); + self.notify_frontend(FrontendEvent::IncomingConnected(fingerprint, addr, pos)); + }, + IncomingEvent::Disconnected { addr } => { + if let Some(fp) = self.remove_incoming(addr, &capture) { + self.notify_frontend(FrontendEvent::IncomingDisconnected(fp)); + } + }, + } + } + }, + handle = capture.entered() => { + if let Some((_fp, addr)) = self.incoming_conns.borrow().get(&handle) { + emulation.notify_release(*addr); } }, _ = self.cancelled() => break, @@ -196,6 +225,36 @@ impl Service { Ok(()) } + fn add_incoming( + &mut self, + addr: SocketAddr, + pos: Position, + fingerprint: String, + capture: &Capture, + ) { + const ENTER_HANDLE_BEGIN: u64 = u64::MAX / 2 + 1; + let handle = ENTER_HANDLE_BEGIN + self.next_trigger_handle; + self.next_trigger_handle += 1; + capture.create(handle, pos); + self.incoming_conns + .borrow_mut() + .insert(handle, (fingerprint, addr)); + } + + fn remove_incoming(&mut self, addr: SocketAddr, capture: &Capture) -> Option { + let handle = self + .incoming_conns + .borrow() + .iter() + .find(|(_, (_, a))| *a == addr) + .map(|(k, _)| *k)?; + capture.destroy(handle); + self.incoming_conns + .borrow_mut() + .remove(&handle) + .map(|(f, _)| f) + } + fn notify_frontend(&self, event: FrontendEvent) { self.pending_frontend_events.borrow_mut().push_back(event); self.notifies.frontend_event_pending.notify_one(); @@ -433,7 +492,17 @@ impl Service { pub(crate) fn register_incoming(&self, addr: SocketAddr, pos: Position, fingerprint: String) { self.pending_incoming .borrow_mut() - .push_back((addr, pos, fingerprint)); + .push_back(IncomingEvent::Connected { + addr, + pos, + fingerprint, + }); self.notifies.incoming.notify_one(); } + + pub(crate) fn deregister_incoming(&self, addr: SocketAddr) { + self.pending_incoming + .borrow_mut() + .push_back(IncomingEvent::Disconnected { addr }); + } }