From 937652ac449e13a21d1ca264375f75f2aaf2fd03 Mon Sep 17 00:00:00 2001 From: Ferdinand Schober Date: Fri, 6 Sep 2024 18:28:04 +0200 Subject: [PATCH] finish up server --- src/capture.rs | 51 ++++++++++++++++++++++++++++++--------------- src/connect.rs | 13 +++++------- src/emulation.rs | 54 +++++++++++++++++++++++++----------------------- src/lib.rs | 11 +++++----- src/listen.rs | 13 ++++++------ src/server.rs | 43 +++++++++++++++++--------------------- 6 files changed, 99 insertions(+), 86 deletions(-) diff --git a/src/capture.rs b/src/capture.rs index aafbef9..bc0b3e4 100644 --- a/src/capture.rs +++ b/src/capture.rs @@ -1,6 +1,6 @@ use futures::StreamExt; use input_capture::{ - Backend, CaptureError, CaptureEvent, CaptureHandle, InputCapture, InputCaptureError, Position, + CaptureError, CaptureEvent, CaptureHandle, InputCapture, InputCaptureError, Position, }; use lan_mouse_ipc::{ClientHandle, Status}; use lan_mouse_proto::ProtoEvent; @@ -13,9 +13,8 @@ use tokio::{ use crate::{connect::LanMouseConnection, server::Server}; pub(crate) struct CaptureProxy { - server: Server, tx: Sender, - task: JoinHandle<()>, + _task: JoinHandle<()>, } #[derive(Clone, Copy, Debug)] @@ -29,20 +28,34 @@ enum CaptureRequest { } impl CaptureProxy { - pub(crate) fn new(server: Server, backend: Option, conn: LanMouseConnection) -> Self { + pub(crate) fn new(server: Server, conn: LanMouseConnection) -> Self { let (tx, rx) = channel(); - let task = spawn_local(Self::run(server.clone(), backend, rx, conn)); - Self { server, tx, task } + let _task = spawn_local(Self::run(server.clone(), rx, conn)); + Self { tx, _task } } - pub(crate) async fn run( - server: Server, - backend: Option, - mut rx: Receiver, - conn: LanMouseConnection, - ) { + pub(crate) fn create(&self, handle: CaptureHandle, pos: Position) { + self.tx + .send(CaptureRequest::Create(handle, pos)) + .expect("channel closed"); + } + + pub(crate) fn destroy(&self, handle: CaptureHandle) { + self.tx + .send(CaptureRequest::Destroy(handle)) + .expect("channel closed"); + } + + #[allow(unused)] + pub(crate) fn release(&self) { + self.tx + .send(CaptureRequest::Release) + .expect("channel closed"); + } + + async fn run(server: Server, mut rx: Receiver, conn: LanMouseConnection) { loop { - if let Err(e) = do_capture(backend, &server, &conn, &mut rx).await { + if let Err(e) = do_capture(&server, &conn, &mut rx).await { log::warn!("input capture exited: {e}"); } server.set_capture_status(Status::Disabled); @@ -56,11 +69,12 @@ impl CaptureProxy { } async fn do_capture( - backend: Option, server: &Server, conn: &LanMouseConnection, rx: &mut Receiver, ) -> Result<(), InputCaptureError> { + let backend = server.config.capture_backend.map(|b| b.into()); + /* allow cancelling capture request */ let mut capture = tokio::select! { r = InputCapture::new(backend) => r?, @@ -82,7 +96,7 @@ async fn do_capture( loop { tokio::select! { event = capture.next() => match event { - Some(event) => handle_capture_event(server, &mut capture, sender_tx, event?).await?, + Some(event) => handle_capture_event(server, &mut capture, conn, event?).await?, None => return Ok(()), }, e = rx.recv() => { @@ -106,7 +120,7 @@ async fn do_capture( async fn handle_capture_event( server: &Server, capture: &mut InputCapture, - conn: &mut LanMouseConnection, + conn: &LanMouseConnection, event: (CaptureHandle, CaptureEvent), ) -> Result<(), CaptureError> { let (handle, event) = event; @@ -125,7 +139,10 @@ async fn handle_capture_event( CaptureEvent::Input(e) => ProtoEvent::Input(e), }; - conn.send(event, handle).await; + if let Err(e) = conn.send(event, handle).await { + log::warn!("failed to connect, releasing capture: {e}"); + capture.release().await?; + } Ok(()) } diff --git a/src/connect.rs b/src/connect.rs index fb87ff0..980f3f2 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -25,7 +25,7 @@ pub(crate) enum LanMouseConnectionError { async fn connect(addr: SocketAddr) -> Result, LanMouseConnectionError> { let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); - conn.connect(addr).await; + conn.connect(addr).await?; log::info!("connected to {addr}, establishing secure dtls channel ..."); let certificate = Certificate::generate_self_signed(["localhost".to_owned()])?; let config = Config { @@ -56,14 +56,11 @@ pub(crate) struct LanMouseConnection { } impl LanMouseConnection { - fn find_conn(&self, addrs: &[SocketAddr]) -> Vec> { - let mut conns = vec![]; - for addr in addrs { - if let Some(conn) = self.conns.get(&addr) { - conns.push(conn.clone()); - } + pub(crate) fn new(server: Server) -> Self { + Self { + server, + conns: Default::default(), } - conns } pub(crate) async fn send( diff --git a/src/emulation.rs b/src/emulation.rs index 643ad8c..3ce5c7c 100644 --- a/src/emulation.rs +++ b/src/emulation.rs @@ -10,32 +10,29 @@ use tokio::task::{spawn_local, JoinHandle}; /// emulation handling events received from a listener pub(crate) struct Emulation { - server: Server, - listener: LanMouseListener, - emulation_proxy: EmulationProxy, + _tx: Sender, + _task: JoinHandle<()>, } impl Emulation { pub(crate) fn new(server: Server, listener: LanMouseListener) -> Self { + let (_tx, _rx) = channel(); let emulation_proxy = EmulationProxy::new(server.clone()); - Self { - server, - listener, - emulation_proxy, - } + let _task = spawn_local(Self::run(server, listener, emulation_proxy)); + Self { _tx, _task } } - async fn run(&mut self) { - while let Some((event, addr)) = self.listener.next().await { + async fn run(server: Server, mut listener: LanMouseListener, emulation_proxy: EmulationProxy) { + while let Some((event, addr)) = listener.next().await { match event { ProtoEvent::Enter(_) => { - self.server.release_capture(); - self.listener.reply(addr, ProtoEvent::Ack(0)).await; + server.release_capture(); + listener.reply(addr, ProtoEvent::Ack(0)).await; } - ProtoEvent::Leave(_) => self.emulation_proxy.release_keys(addr).await, + ProtoEvent::Leave(_) => emulation_proxy.release_keys(addr).await, ProtoEvent::Ack(_) => {} - ProtoEvent::Input(event) => self.emulation_proxy.consume(event, addr).await, - ProtoEvent::Ping => self.listener.reply(addr, ProtoEvent::Pong).await, + ProtoEvent::Input(event) => emulation_proxy.consume(event, addr).await, + ProtoEvent::Ping => listener.reply(addr, ProtoEvent::Pong).await, ProtoEvent::Pong => todo!(), } } @@ -47,6 +44,7 @@ impl Emulation { pub(crate) struct EmulationProxy { server: Server, tx: Sender<(ProxyEvent, SocketAddr)>, + #[allow(unused)] task: JoinHandle<()>, } @@ -62,6 +60,21 @@ impl EmulationProxy { Self { server, tx, task } } + async fn consume(&self, event: Event, addr: SocketAddr) { + // ignore events if emulation is currently disabled + if let Status::Enabled = self.server.emulation_status.get() { + self.tx + .send((ProxyEvent::Input(event), addr)) + .expect("channel closed"); + } + } + + async fn release_keys(&self, addr: SocketAddr) { + self.tx + .send((ProxyEvent::ReleaseKeys, addr)) + .expect("channel closed"); + } + async fn emulation_task(server: Server, mut rx: Receiver<(ProxyEvent, SocketAddr)>) { let mut handles = HashMap::new(); let mut next_id = 0; @@ -133,15 +146,4 @@ impl EmulationProxy { } } } - - async fn consume(&self, event: Event, addr: SocketAddr) { - // ignore events if emulation is currently disabled - if let Status::Enabled = self.server.emulation_status.get() { - self.tx.send((ProxyEvent::Input(event), addr)); - } - } - - async fn release_keys(&self, addr: SocketAddr) { - self.tx.send((ProxyEvent::ReleaseKeys, addr)); - } } diff --git a/src/lib.rs b/src/lib.rs index df771c9..8180864 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,11 +1,12 @@ +mod capture; +pub mod capture_test; pub mod client; pub mod config; -pub mod dns; -pub mod server; - -mod capture; -mod capture_test; mod connect; +#[allow(unused)] mod crypto; +mod dns; mod emulation; +pub mod emulation_test; mod listen; +pub mod server; diff --git a/src/listen.rs b/src/listen.rs index e0622e9..ea1784f 100644 --- a/src/listen.rs +++ b/src/listen.rs @@ -15,7 +15,7 @@ use webrtc_dtls::{ use webrtc_util::{conn::Listener, Conn, Error}; #[derive(Error, Debug)] -pub(crate) enum ListenerCreationError { +pub enum ListenerCreationError { #[error(transparent)] WebrtcUtil(#[from] webrtc_util::Error), #[error(transparent)] @@ -24,7 +24,7 @@ pub(crate) enum ListenerCreationError { pub(crate) struct LanMouseListener { listen_rx: Receiver<(ProtoEvent, SocketAddr)>, - listen_task: JoinHandle<()>, + _listen_task: JoinHandle<()>, conns: Rc>>>, } @@ -49,7 +49,7 @@ impl LanMouseListener { let conns_clone = conns.clone(); - let listen_task: JoinHandle<()> = spawn_local(async move { + let _listen_task: JoinHandle<()> = spawn_local(async move { loop { let (conn, _addr) = match listener.accept().await { Ok(c) => c, @@ -67,15 +67,16 @@ impl LanMouseListener { Ok(Self { conns, listen_rx, - listen_task, + _listen_task, }) } + #[allow(unused)] pub(crate) async fn broadcast(&self, event: ProtoEvent) { let (buf, len): ([u8; MAX_EVENT_SIZE], usize) = event.into(); let conns = self.conns.lock().await; for conn in conns.iter() { - conn.send(&buf[..len]).await; + let _ = conn.send(&buf[..len]).await; } } @@ -84,7 +85,7 @@ impl LanMouseListener { let conns = self.conns.lock().await; for conn in conns.iter() { if conn.remote_addr() == Some(addr) { - conn.send(&buf[..len]).await; + let _ = conn.send(&buf[..len]).await; } } } diff --git a/src/server.rs b/src/server.rs index cbd88d3..7190edf 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,4 +1,3 @@ -use capture_task::CaptureRequest; use futures::StreamExt; use hickory_resolver::error::ResolveError; use local_channel::mpsc::{channel, Sender}; @@ -15,8 +14,10 @@ use tokio::{join, signal, sync::Notify}; use tokio_util::sync::CancellationToken; use crate::{ + capture::CaptureProxy, client::ClientManager, config::Config, + connect::LanMouseConnection, dns::DnsResolver, emulation::Emulation, listen::{LanMouseListener, ListenerCreationError}, @@ -27,9 +28,6 @@ use lan_mouse_ipc::{ IpcListenerCreationError, Position, Status, }; -mod capture_task; -mod emulation_task; - #[derive(Debug, Error)] pub enum ServiceError { #[error(transparent)] @@ -48,6 +46,7 @@ pub struct ReleaseToken; pub struct Server { pub(crate) client_manager: Rc>, port: Rc>, + #[allow(unused)] release_bind: Vec, notifies: Rc, pub(crate) config: Rc, @@ -104,6 +103,7 @@ impl Server { pending_frontend_events: Rc::new(RefCell::new(VecDeque::new())), capture_status: Default::default(), emulation_status: Default::default(), + should_release: Default::default(), } } @@ -118,17 +118,15 @@ impl Server { e => e?, }; - let (capture_tx, capture_rx) = channel(); /* requests for input capture */ let (dns_tx, dns_rx) = channel(); /* dns requests */ - // udp task + // listener + connection let listener = LanMouseListener::new(self.config.port).await?; + let conn = LanMouseConnection::new(self.clone()); - // input capture - // let capture = capture_task::new(self.clone(), capture_rx); - - // input emulation - let emulation = Emulation::new(self.clone(), listener); + // input capture + emulation + let capture = CaptureProxy::new(self.clone(), conn); + let _emulation = Emulation::new(self.clone(), listener); // create dns resolver let resolver = DnsResolver::new(dns_rx)?; @@ -150,7 +148,7 @@ impl Server { None => break, }; log::debug!("received frontend request: {request:?}"); - self.handle_request(&capture_tx.clone(), request); + self.handle_request(&capture, request, &dns_tx); log::debug!("handled frontend request"); } _ = self.notifies.frontend_event_pending.notified() => { @@ -191,10 +189,6 @@ impl Server { self.notifies.cancel.cancelled().await } - pub(crate) fn is_cancelled(&self) -> bool { - self.notifies.cancel.is_cancelled() - } - fn notify_capture(&self) { log::info!("received capture enable request"); self.notifies.capture.notify_waiters() @@ -218,6 +212,7 @@ impl Server { self.notifies.port_changed.notify_one(); } + #[allow(unused)] fn notify_port_changed(&self, port: u16, msg: Option) { self.port.replace(port); self.notify_frontend(FrontendEvent::PortChanged(port, msg)); @@ -238,7 +233,7 @@ impl Server { fn handle_request( &self, - capture: &Sender, + capture: &CaptureProxy, event: FrontendRequest, dns: &Sender, ) -> bool { @@ -300,7 +295,7 @@ impl Server { handle } - fn deactivate_client(&self, capture: &Sender, handle: ClientHandle) { + fn deactivate_client(&self, capture: &CaptureProxy, handle: ClientHandle) { log::debug!("deactivating client {handle}"); match self.client_manager.borrow_mut().get_mut(handle) { None => return, @@ -308,12 +303,12 @@ impl Server { Some((_, s)) => s.active = false, }; - let _ = capture.send(CaptureRequest::Destroy(handle)); + capture.destroy(handle); self.client_updated(handle); log::info!("deactivated client {handle}"); } - fn activate_client(&self, capture: &Sender, handle: ClientHandle) { + fn activate_client(&self, capture: &CaptureProxy, handle: ClientHandle) { log::debug!("activating client"); /* deactivate potential other client at this position */ let pos = match self.client_manager.borrow().get(handle) { @@ -335,12 +330,12 @@ impl Server { }; /* notify capture and frontends */ - let _ = capture.send(CaptureRequest::Create(handle, to_capture_pos(pos))); + capture.create(handle, to_capture_pos(pos)); self.client_updated(handle); log::info!("activated client {handle} ({pos})"); } - fn remove_client(&self, capture: &Sender, handle: ClientHandle) { + fn remove_client(&self, capture: &CaptureProxy, handle: ClientHandle) { let Some(active) = self .client_manager .borrow_mut() @@ -351,7 +346,7 @@ impl Server { }; if active { - let _ = capture.send(CaptureRequest::Destroy(handle)); + capture.destroy(handle); } } @@ -433,7 +428,7 @@ impl Server { } } - fn update_pos(&self, handle: ClientHandle, capture: &Sender, pos: Position) { + fn update_pos(&self, handle: ClientHandle, capture: &CaptureProxy, pos: Position) { let (changed, active) = { let mut client_manager = self.client_manager.borrow_mut(); let Some((c, s)) = client_manager.get_mut(handle) else {