From 64c4480e93e0664a4b8534c03611ca97c0c65c44 Mon Sep 17 00:00:00 2001 From: Ferdinand Schober Date: Sat, 21 Sep 2024 14:29:17 +0200 Subject: [PATCH] enter acknowledgement --- src/capture.rs | 55 ++++++++++++++-- src/connect.rs | 169 +++++++++++++++++++++++++++++++------------------ src/server.rs | 9 +++ 3 files changed, 165 insertions(+), 68 deletions(-) diff --git a/src/capture.rs b/src/capture.rs index b350522..6d55cf9 100644 --- a/src/capture.rs +++ b/src/capture.rs @@ -1,5 +1,6 @@ use std::{ cell::Cell, + future, time::{Duration, Instant}, }; @@ -66,9 +67,9 @@ impl Capture { .expect("channel closed"); } - async fn run(server: Server, mut rx: Receiver, conn: LanMouseConnection) { + async fn run(server: Server, mut rx: Receiver, mut conn: LanMouseConnection) { loop { - if let Err(e) = do_capture(&server, &conn, &mut rx).await { + if let Err(e) = do_capture(&server, &mut conn, &mut rx).await { log::warn!("input capture exited: {e}"); } server.set_capture_status(Status::Disabled); @@ -88,7 +89,7 @@ impl Capture { async fn do_capture( server: &Server, - conn: &LanMouseConnection, + conn: &mut LanMouseConnection, rx: &mut Receiver, ) -> Result<(), InputCaptureError> { let backend = server.config.capture_backend.map(|b| b.into()); @@ -111,16 +112,33 @@ async fn do_capture( capture.create(handle, to_capture_pos(pos)).await?; } + let mut state = State::Idle; + loop { tokio::select! { event = capture.next() => match event { - Some(event) => handle_capture_event(server, &mut capture, conn, event?).await?, + Some(event) => handle_capture_event(server, &mut capture, conn, event?, &mut state).await?, None => return Ok(()), }, + (handle, event) = conn.recv() => if let Some(active) = server.get_active() { + if handle != active { + // we only care about events coming from the client we are currently connected to + // only `Ack` and `Leave` are relevant + continue + } + + match event { + // connection acknowlegded => set state to Sending + ProtoEvent::Ack(_) => state = State::Sending, + // client disconnected + ProtoEvent::Leave(_) => release_capture(&mut capture, server, &mut state).await?, + _ => {} + } + }, e = rx.recv() => { match e { Some(e) => match e { - CaptureRequest::Release => capture.release().await?, + CaptureRequest::Release => release_capture(&mut capture, server, &mut state).await?, CaptureRequest::Create(h, p) => capture.create(h, p).await?, CaptureRequest::Destroy(h) => capture.destroy(h).await?, }, @@ -156,11 +174,18 @@ macro_rules! debounce { }; } +enum State { + Idle, + WaitingForAck, + Sending, +} + async fn handle_capture_event( server: &Server, capture: &mut InputCapture, conn: &LanMouseConnection, event: (CaptureHandle, CaptureEvent), + state: &mut State, ) -> Result<(), CaptureError> { let (handle, event) = event; log::trace!("({handle}): {event:?}"); @@ -168,16 +193,22 @@ async fn handle_capture_event( if server.should_release.borrow_mut().take().is_some() || capture.keys_pressed(&server.release_bind) { - return capture.release().await; + return release_capture(capture, server, state).await; } if event == CaptureEvent::Begin { + *state = State::WaitingForAck; + server.set_active(Some(handle)); spawn_hook_command(server, handle); } let event = match event { CaptureEvent::Begin => ProtoEvent::Enter(lan_mouse_proto::Position::Left), - CaptureEvent::Input(e) => ProtoEvent::Input(e), + CaptureEvent::Input(e) => match state { + State::Sending => ProtoEvent::Input(e), + // connection not acknowledged, repeat `Enter` event + _ => ProtoEvent::Enter(lan_mouse_proto::Position::Left), + }, }; if let Err(e) = conn.send(event, handle).await { @@ -188,6 +219,16 @@ async fn handle_capture_event( Ok(()) } +async fn release_capture( + capture: &mut InputCapture, + server: &Server, + state: &mut State, +) -> Result<(), CaptureError> { + *state = State::Idle; + server.set_active(None); + capture.release().await +} + fn to_capture_pos(pos: lan_mouse_ipc::Position) -> input_capture::Position { match pos { lan_mouse_ipc::Position::Left => input_capture::Position::Left, diff --git a/src/connect.rs b/src/connect.rs index 1a8b56c..7d5c30f 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -1,6 +1,7 @@ use crate::server::Server; use lan_mouse_ipc::{ClientHandle, DEFAULT_PORT}; use lan_mouse_proto::{ProtoEvent, MAX_EVENT_SIZE}; +use local_channel::mpsc::{channel, Receiver, Sender}; use std::{ collections::{HashMap, HashSet}, io, @@ -67,17 +68,26 @@ pub(crate) struct LanMouseConnection { server: Server, conns: Rc>>>, connecting: Rc>>, + recv_rx: Receiver<(ClientHandle, ProtoEvent)>, + recv_tx: Sender<(ClientHandle, ProtoEvent)>, } impl LanMouseConnection { pub(crate) fn new(server: Server) -> Self { + let (recv_tx, recv_rx) = channel(); Self { server, conns: Default::default(), connecting: Default::default(), + recv_rx, + recv_tx, } } + pub(crate) async fn recv(&mut self) -> (ClientHandle, ProtoEvent) { + self.recv_rx.recv().await.expect("channel closed") + } + pub(crate) async fn send( &self, event: ProtoEvent, @@ -103,68 +113,105 @@ impl LanMouseConnection { } // check if we are already trying to connect - { - let mut connecting = self.connecting.lock().await; - if connecting.contains(&handle) { - return Err(LanMouseConnectionError::NotConnected); - } else { - connecting.insert(handle); - } + let mut connecting = self.connecting.lock().await; + if !connecting.contains(&handle) { + connecting.insert(handle); + // connect in the background + spawn_local(connect_to_handle( + self.server.clone(), + handle, + self.conns.clone(), + self.connecting.clone(), + self.recv_tx.clone(), + )); } - let server = self.server.clone(); - let conns = self.conns.clone(); - let connecting = self.connecting.clone(); - - // connect in the background - spawn_local(async move { - // sending did not work, figure out active conn. - if let Some(addrs) = server.get_ips(handle) { - let port = server.get_port(handle).unwrap_or(DEFAULT_PORT); - let addrs = addrs - .into_iter() - .map(|a| SocketAddr::new(a, port)) - .collect::>(); - log::info!("client ({handle}) connecting ... (ips: {addrs:?})"); - let res = connect_any(&addrs).await; - let (conn, addr) = match res { - Ok(c) => c, - Err(e) => { - connecting.lock().await.remove(&handle); - return Err(e); - } - }; - log::info!("client ({handle}) connected @ {addr}"); - server.set_active_addr(handle, Some(addr)); - conns.lock().await.insert(addr, conn.clone()); - connecting.lock().await.remove(&handle); - spawn_local(async move { - loop { - let (buf, len) = ProtoEvent::Ping.into(); - if let Err(e) = conn.send(&buf[..len]).await { - log::warn!("client ({handle}) @ {addr} connection closed: {e}"); - conns.lock().await.remove(&addr); - server.set_active_addr(handle, None); - let active: Vec = - conns.lock().await.keys().copied().collect(); - log::info!("active connections: {active:?}"); - break; - } - tokio::time::sleep(Duration::from_millis(500)).await; - let mut buf = [0u8; MAX_EVENT_SIZE]; - if let Err(e) = conn.recv(&mut buf).await { - log::warn!("recv(): client ({handle}) @ {addr} connection closed: {e}"); - conns.lock().await.remove(&addr); - server.set_active_addr(handle, None); - let active: Vec = - conns.lock().await.keys().copied().collect(); - log::info!("active connections: {active:?}"); - break; - } - } - }); - } - Result::<(), LanMouseConnectionError>::Ok(()) - }); Err(LanMouseConnectionError::NotConnected) } } + +async fn connect_to_handle( + server: Server, + handle: ClientHandle, + conns: Rc>>>, + connecting: Rc>>, + tx: Sender<(ClientHandle, ProtoEvent)>, +) -> Result<(), LanMouseConnectionError> { + // sending did not work, figure out active conn. + if let Some(addrs) = server.get_ips(handle) { + let port = server.get_port(handle).unwrap_or(DEFAULT_PORT); + let addrs = addrs + .into_iter() + .map(|a| SocketAddr::new(a, port)) + .collect::>(); + log::info!("client ({handle}) connecting ... (ips: {addrs:?})"); + let res = connect_any(&addrs).await; + let (conn, addr) = match res { + Ok(c) => c, + Err(e) => { + connecting.lock().await.remove(&handle); + return Err(e); + } + }; + log::info!("client ({handle}) connected @ {addr}"); + server.set_active_addr(handle, Some(addr)); + conns.lock().await.insert(addr, conn.clone()); + connecting.lock().await.remove(&handle); + + // poll connection for active + spawn_local(ping_pong( + server.clone(), + handle, + addr, + conn.clone(), + conns.clone(), + )); + + // receiver + spawn_local(receive_loop(handle, conn, tx)); + return Ok(()); + } + Err(LanMouseConnectionError::NotConnected) +} + +async fn ping_pong( + server: Server, + handle: ClientHandle, + addr: SocketAddr, + conn: Arc, + conns: Rc>>>, +) { + loop { + let (buf, len) = ProtoEvent::Ping.into(); + if let Err(e) = conn.send(&buf[..len]).await { + log::warn!("client ({handle}) @ {addr} connection closed: {e}"); + conns.lock().await.remove(&addr); + server.set_active_addr(handle, None); + let active: Vec = conns.lock().await.keys().copied().collect(); + log::info!("active connections: {active:?}"); + break; + } + tokio::time::sleep(Duration::from_millis(500)).await; + let mut buf = [0u8; MAX_EVENT_SIZE]; + if let Err(e) = conn.recv(&mut buf).await { + log::warn!("recv(): client ({handle}) @ {addr} connection closed: {e}"); + conns.lock().await.remove(&addr); + server.set_active_addr(handle, None); + let active: Vec = conns.lock().await.keys().copied().collect(); + log::info!("active connections: {active:?}"); + break; + } + } +} + +async fn receive_loop( + handle: ClientHandle, + conn: Arc, + tx: Sender<(ClientHandle, ProtoEvent)>, +) { + let mut buf = [0u8; MAX_EVENT_SIZE]; + while let Ok(_) = conn.recv(&mut buf).await { + if let Ok(event) = buf.try_into() { + tx.send((handle, event)); + } + } +} diff --git a/src/server.rs b/src/server.rs index 5f1ee65..6548259 100644 --- a/src/server.rs +++ b/src/server.rs @@ -41,6 +41,7 @@ pub struct ReleaseToken; #[derive(Clone)] pub struct Server { + active: Rc>>, pub(crate) client_manager: Rc>, port: Rc>, pub(crate) release_bind: Vec, @@ -493,6 +494,14 @@ impl Server { pub(crate) fn release_capture(&self) { self.should_release.replace(Some(ReleaseToken)); } + + pub(crate) fn set_active(&self, handle: Option) { + self.active.replace(handle); + } + + pub(crate) fn get_active(&self) -> Option { + self.active.get() + } } fn to_capture_pos(pos: Position) -> input_capture::Position {