From ad8c92cfbe421374f089bfcae4e21f84abd7a479 Mon Sep 17 00:00:00 2001 From: Ferdinand Schober Date: Fri, 6 Sep 2024 17:51:20 +0200 Subject: [PATCH] capture stuffs --- src/capture.rs | 167 ++++++++++++++++++++++++++-- src/connect.rs | 64 +++++------ src/server.rs | 43 ++------ src/server/capture_task.rs | 207 ----------------------------------- src/server/emulation_task.rs | 188 ------------------------------- 5 files changed, 198 insertions(+), 471 deletions(-) delete mode 100644 src/server/capture_task.rs delete mode 100644 src/server/emulation_task.rs diff --git a/src/capture.rs b/src/capture.rs index 02b41ee..aafbef9 100644 --- a/src/capture.rs +++ b/src/capture.rs @@ -1,17 +1,170 @@ -use crate::server::Server; +use futures::StreamExt; +use input_capture::{ + Backend, CaptureError, CaptureEvent, CaptureHandle, InputCapture, InputCaptureError, Position, +}; +use lan_mouse_ipc::{ClientHandle, Status}; +use lan_mouse_proto::ProtoEvent; +use local_channel::mpsc::{channel, Receiver, Sender}; +use tokio::{ + process::Command, + task::{spawn_local, JoinHandle}, +}; -pub(crate) struct Capture { +use crate::{connect::LanMouseConnection, server::Server}; + +pub(crate) struct CaptureProxy { server: Server, + tx: Sender, + task: JoinHandle<()>, } -impl Capture { - pub(crate) fn new(server: Server) -> Self { - Self { server } +#[derive(Clone, Copy, Debug)] +enum CaptureRequest { + /// capture must release the mouse + Release, + /// add a capture client + Create(CaptureHandle, Position), + /// destory a capture client + Destroy(CaptureHandle), +} + +impl CaptureProxy { + pub(crate) fn new(server: Server, backend: Option, conn: LanMouseConnection) -> Self { + let (tx, rx) = channel(); + let task = spawn_local(Self::run(server.clone(), backend, rx, conn)); + Self { server, tx, task } } - pub(crate) async fn run(&mut self, backend: input_capture::Backend) { + pub(crate) async fn run( + server: Server, + backend: Option, + mut rx: Receiver, + conn: LanMouseConnection, + ) { loop { - if let Err(e) = do_capture(backend) + if let Err(e) = do_capture(backend, &server, &conn, &mut rx).await { + log::warn!("input capture exited: {e}"); + } + server.set_capture_status(Status::Disabled); + tokio::select! { + _ = rx.recv() => continue, + _ = server.capture_enabled() => break, + _ = server.cancelled() => return, + } } } } + +async fn do_capture( + backend: Option, + server: &Server, + conn: &LanMouseConnection, + rx: &mut Receiver, +) -> Result<(), InputCaptureError> { + /* allow cancelling capture request */ + let mut capture = tokio::select! { + r = InputCapture::new(backend) => r?, + _ = server.cancelled() => return Ok(()), + }; + server.set_capture_status(Status::Enabled); + + let clients = server.active_clients(); + let clients = clients + .iter() + .copied() + .map(|handle| (handle, server.get_pos(handle).expect("no such client"))); + + /* create barriers for active clients */ + for (handle, pos) in clients { + capture.create(handle, to_capture_pos(pos)).await?; + } + + loop { + tokio::select! { + event = capture.next() => match event { + Some(event) => handle_capture_event(server, &mut capture, sender_tx, event?).await?, + None => return Ok(()), + }, + e = rx.recv() => { + match e { + Some(e) => match e { + CaptureRequest::Release => capture.release().await?, + CaptureRequest::Create(h, p) => capture.create(h, p).await?, + CaptureRequest::Destroy(h) => capture.destroy(h).await?, + }, + None => break, + } + } + _ = server.cancelled() => break, + } + } + + capture.terminate().await?; + Ok(()) +} + +async fn handle_capture_event( + server: &Server, + capture: &mut InputCapture, + conn: &mut LanMouseConnection, + event: (CaptureHandle, CaptureEvent), +) -> Result<(), CaptureError> { + let (handle, event) = event; + log::trace!("({handle}): {event:?}"); + + if server.should_release.borrow_mut().take().is_some() { + return capture.release().await; + } + + if event == CaptureEvent::Begin { + spawn_hook_command(server, handle); + } + + let event = match event { + CaptureEvent::Begin => ProtoEvent::Enter(lan_mouse_proto::Position::Left), + CaptureEvent::Input(e) => ProtoEvent::Input(e), + }; + + conn.send(event, handle).await; + Ok(()) +} + +fn to_capture_pos(pos: lan_mouse_ipc::Position) -> input_capture::Position { + match pos { + lan_mouse_ipc::Position::Left => input_capture::Position::Left, + lan_mouse_ipc::Position::Right => input_capture::Position::Right, + lan_mouse_ipc::Position::Top => input_capture::Position::Top, + lan_mouse_ipc::Position::Bottom => input_capture::Position::Bottom, + } +} + +fn spawn_hook_command(server: &Server, handle: ClientHandle) { + let Some(cmd) = server + .client_manager + .borrow() + .get(handle) + .and_then(|(c, _)| c.cmd.clone()) + else { + return; + }; + tokio::task::spawn_local(async move { + log::info!("spawning command!"); + let mut child = match Command::new("sh").arg("-c").arg(cmd.as_str()).spawn() { + Ok(c) => c, + Err(e) => { + log::warn!("could not execute cmd: {e}"); + return; + } + }; + match child.wait().await { + Ok(s) => { + if s.success() { + log::info!("{cmd} exited successfully"); + } else { + log::warn!("{cmd} exited with {s}"); + } + } + Err(e) => log::warn!("{cmd}: {e}"), + } + }); +} diff --git a/src/connect.rs b/src/connect.rs index 3d24f4f..fb87ff0 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -23,45 +23,39 @@ pub(crate) enum LanMouseConnectionError { NoIps, } -pub(crate) struct LanMouseConnection {} - -impl LanMouseConnection { - pub(crate) async fn connect( - addr: SocketAddr, - ) -> Result, LanMouseConnectionError> { - let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); - conn.connect(addr).await; - log::info!("connected to {addr}, establishing secure dtls channel ..."); - let certificate = Certificate::generate_self_signed(["localhost".to_owned()])?; - let config = Config { - certificates: vec![certificate], - insecure_skip_verify: true, - extended_master_secret: ExtendedMasterSecretType::Require, - ..Default::default() - }; - let dtls_conn: Arc = - Arc::new(DTLSConn::new(conn, config, true, None).await?); - Ok(dtls_conn) - } - - pub(crate) async fn connect_any( - addrs: &[SocketAddr], - ) -> Result, LanMouseConnectionError> { - let mut joinset = JoinSet::new(); - for &addr in addrs { - joinset.spawn_local(Self::connect(addr)); - } - let conn = joinset.join_next().await; - conn.expect("no addrs to connect").expect("failed to join") - } +async fn connect(addr: SocketAddr) -> Result, LanMouseConnectionError> { + let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); + conn.connect(addr).await; + log::info!("connected to {addr}, establishing secure dtls channel ..."); + let certificate = Certificate::generate_self_signed(["localhost".to_owned()])?; + let config = Config { + certificates: vec![certificate], + insecure_skip_verify: true, + extended_master_secret: ExtendedMasterSecretType::Require, + ..Default::default() + }; + let dtls_conn: Arc = + Arc::new(DTLSConn::new(conn, config, true, None).await?); + Ok(dtls_conn) } -struct ConnectionProxy { +async fn connect_any( + addrs: &[SocketAddr], +) -> Result, LanMouseConnectionError> { + let mut joinset = JoinSet::new(); + for &addr in addrs { + joinset.spawn_local(connect(addr)); + } + let conn = joinset.join_next().await; + conn.expect("no addrs to connect").expect("failed to join") +} + +pub(crate) struct LanMouseConnection { server: Server, conns: HashMap>, } -impl ConnectionProxy { +impl LanMouseConnection { fn find_conn(&self, addrs: &[SocketAddr]) -> Vec> { let mut conns = vec![]; for addr in addrs { @@ -72,7 +66,7 @@ impl ConnectionProxy { conns } - async fn send( + pub(crate) async fn send( &self, event: ProtoEvent, handle: ClientHandle, @@ -93,7 +87,7 @@ impl ConnectionProxy { .into_iter() .map(|a| SocketAddr::new(a, port)) .collect::>(); - let conn = LanMouseConnection::connect_any(&addrs).await?; + let conn = connect_any(&addrs).await?; let addr = conn.remote_addr().expect("no remote addr"); self.server.set_active_addr(handle, addr); conn.send(buf).await?; diff --git a/src/server.rs b/src/server.rs index 874f871..cbd88d3 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,5 +1,4 @@ use capture_task::CaptureRequest; -use emulation_task::EmulationRequest; use futures::StreamExt; use hickory_resolver::error::ResolveError; use local_channel::mpsc::{channel, Sender}; @@ -31,17 +30,6 @@ use lan_mouse_ipc::{ mod capture_task; mod emulation_task; -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -enum State { - /// Currently sending events to another device - Sending, - /// Currently receiving events from other devices - Receiving, - /// Entered the deadzone of another device but waiting - /// for acknowledgement (Leave event) from the device - AwaitAck, -} - #[derive(Debug, Error)] pub enum ServiceError { #[error(transparent)] @@ -54,18 +42,19 @@ pub enum ServiceError { ListenError(#[from] ListenerCreationError), } +pub struct ReleaseToken; + #[derive(Clone)] pub struct Server { - active_client: Rc>>, pub(crate) client_manager: Rc>, port: Rc>, - state: Rc>, release_bind: Vec, notifies: Rc, pub(crate) config: Rc, pending_frontend_events: Rc>>, capture_status: Rc>, pub(crate) emulation_status: Rc>, + pub(crate) should_release: Rc>>, } #[derive(Default)] @@ -79,9 +68,7 @@ struct Notifies { impl Server { pub fn new(config: Config) -> Self { - let active_client = Rc::new(Cell::new(None)); let client_manager = Rc::new(RefCell::new(ClientManager::default())); - let state = Rc::new(Cell::new(State::Receiving)); let port = Rc::new(Cell::new(config.port)); for config_client in config.get_clients() { let client = ClientConfig { @@ -110,10 +97,8 @@ impl Server { Self { config, - active_client, client_manager, port, - state, release_bind, notifies, pending_frontend_events: Rc::new(RefCell::new(VecDeque::new())), @@ -215,7 +200,7 @@ impl Server { self.notifies.capture.notify_waiters() } - async fn capture_enabled(&self) { + pub(crate) async fn capture_enabled(&self) { self.notifies.capture.notified().await } @@ -242,7 +227,7 @@ impl Server { self.notify_frontend(FrontendEvent::Changed(handle)); } - fn active_clients(&self) -> Vec { + pub(crate) fn active_clients(&self) -> Vec { self.client_manager .borrow() .get_client_states() @@ -488,7 +473,7 @@ impl Server { self.notify_frontend(status); } - fn set_capture_status(&self, status: Status) { + pub(crate) fn set_capture_status(&self, status: Status) { self.capture_status.replace(status); let status = FrontendEvent::CaptureStatus(status); self.notify_frontend(status); @@ -508,18 +493,8 @@ impl Server { .and_then(|(c, _)| c.hostname.clone()) } - fn get_state(&self) -> State { - self.state.get() - } - - fn set_state(&self, state: State) { - log::debug!("state => {state:?}"); - self.state.replace(state); - } - - fn set_active(&self, handle: Option) { - log::debug!("active client => {handle:?}"); - self.active_client.replace(handle); + pub(crate) fn get_pos(&self, handle: ClientHandle) -> Option { + self.client_manager.borrow().get(handle).map(|(c, _)| c.pos) } pub(crate) fn set_active_addr(&self, handle: ClientHandle, addr: SocketAddr) { @@ -536,7 +511,7 @@ impl Server { } pub(crate) fn release_capture(&self) { - todo!() + self.should_release.replace(Some(ReleaseToken)); } } diff --git a/src/server/capture_task.rs b/src/server/capture_task.rs deleted file mode 100644 index 1043806..0000000 --- a/src/server/capture_task.rs +++ /dev/null @@ -1,207 +0,0 @@ -use futures::StreamExt; -use lan_mouse_proto::ProtoEvent; -use local_channel::mpsc::{Receiver, Sender}; -use std::net::SocketAddr; - -use tokio::{process::Command, task::JoinHandle}; - -use input_capture::{ - self, CaptureError, CaptureEvent, CaptureHandle, InputCapture, InputCaptureError, Position, -}; - -use crate::server::State; -use lan_mouse_ipc::{ClientHandle, Status}; - -use super::Server; - -#[derive(Clone, Copy, Debug)] -pub(crate) enum CaptureRequest { - /// capture must release the mouse - Release, - /// add a capture client - Create(CaptureHandle, Position), - /// destory a capture client - Destroy(CaptureHandle), -} - -pub(crate) fn new( - server: Server, - capture_rx: Receiver, - udp_send: Sender<(ProtoEvent, SocketAddr)>, -) -> JoinHandle<()> { - let backend = server.config.capture_backend.map(|b| b.into()); - tokio::task::spawn_local(capture_task(server, backend, udp_send, capture_rx)) -} - -async fn capture_task( - server: Server, - backend: Option, - sender_tx: Sender<(ProtoEvent, SocketAddr)>, - mut notify_rx: Receiver, -) { - loop { - if let Err(e) = do_capture(backend, &server, &sender_tx, &mut notify_rx).await { - log::warn!("input capture exited: {e}"); - } - server.set_capture_status(Status::Disabled); - if server.is_cancelled() { - break; - } - - // allow cancellation - loop { - tokio::select! { - _ = notify_rx.recv() => continue, /* need to ignore requests here! */ - _ = server.capture_enabled() => break, - _ = server.cancelled() => return, - } - } - } -} - -async fn do_capture( - backend: Option, - server: &Server, - sender_tx: &Sender<(ProtoEvent, SocketAddr)>, - notify_rx: &mut Receiver, -) -> Result<(), InputCaptureError> { - /* allow cancelling capture request */ - let mut capture = tokio::select! { - r = InputCapture::new(backend) => r?, - _ = server.cancelled() => return Ok(()), - }; - - server.set_capture_status(Status::Enabled); - - let clients = server.active_clients(); - let clients = clients.iter().copied().map(|handle| { - ( - handle, - server - .client_manager - .borrow() - .get(handle) - .map(|(c, _)| c.pos) - .expect("no such client"), - ) - }); - for (handle, pos) in clients { - capture.create(handle, to_capture_pos(pos)).await?; - } - - loop { - tokio::select! { - event = capture.next() => match event { - Some(event) => handle_capture_event(server, &mut capture, sender_tx, event?).await?, - None => return Ok(()), - }, - e = notify_rx.recv() => { - log::debug!("input capture notify rx: {e:?}"); - match e { - Some(e) => match e { - CaptureRequest::Release => { - capture.release().await?; - server.state.replace(State::Receiving); - } - CaptureRequest::Create(h, p) => capture.create(h, p).await?, - CaptureRequest::Destroy(h) => capture.destroy(h).await?, - }, - None => break, - } - } - _ = server.cancelled() => break, - } - } - capture.terminate().await?; - Ok(()) -} - -async fn handle_capture_event( - server: &Server, - capture: &mut InputCapture, - sender_tx: &Sender<(ProtoEvent, SocketAddr)>, - event: (CaptureHandle, CaptureEvent), -) -> Result<(), CaptureError> { - let (handle, event) = event; - log::trace!("({handle}) {event:?}"); - - // capture started - if event == CaptureEvent::Begin { - // wait for remote to acknowlegde enter - server.set_state(State::AwaitAck); - server.set_active(Some(handle)); - // restart ping timer to release capture if unreachable - server.restart_ping_timer(); - // spawn enter hook cmd - spawn_hook_command(server, handle); - } - - // release capture if emulation set state to Receiveing - if server.get_state() == State::Receiving { - capture.release().await?; - return Ok(()); - } - - // check release bind - if capture.keys_pressed(&server.release_bind) { - capture.release().await?; - server.set_state(State::Receiving); - } - - if let Some(addr) = server.active_addr(handle) { - let event = match server.get_state() { - State::Sending => match event { - CaptureEvent::Begin => ProtoEvent::Enter(0), - CaptureEvent::Input(e) => ProtoEvent::Input(e), - }, - /* send additional enter events until acknowleged */ - State::AwaitAck => ProtoEvent::Enter(0), - /* released capture */ - State::Receiving => ProtoEvent::Leave(0), - }; - log::error!("SENDING: {event:?} -> {addr:?}"); - sender_tx.send((event, addr)).expect("sender closed"); - }; - - Ok(()) -} - -fn spawn_hook_command(server: &Server, handle: ClientHandle) { - let Some(cmd) = server - .client_manager - .borrow() - .get(handle) - .and_then(|(c, _)| c.cmd.clone()) - else { - return; - }; - tokio::task::spawn_local(async move { - log::info!("spawning command!"); - let mut child = match Command::new("sh").arg("-c").arg(cmd.as_str()).spawn() { - Ok(c) => c, - Err(e) => { - log::warn!("could not execute cmd: {e}"); - return; - } - }; - match child.wait().await { - Ok(s) => { - if s.success() { - log::info!("{cmd} exited successfully"); - } else { - log::warn!("{cmd} exited with {s}"); - } - } - Err(e) => log::warn!("{cmd}: {e}"), - } - }); -} - -fn to_capture_pos(pos: lan_mouse_ipc::Position) -> input_capture::Position { - match pos { - lan_mouse_ipc::Position::Left => input_capture::Position::Left, - lan_mouse_ipc::Position::Right => input_capture::Position::Right, - lan_mouse_ipc::Position::Top => input_capture::Position::Top, - lan_mouse_ipc::Position::Bottom => input_capture::Position::Bottom, - } -} diff --git a/src/server/emulation_task.rs b/src/server/emulation_task.rs deleted file mode 100644 index 1c71f08..0000000 --- a/src/server/emulation_task.rs +++ /dev/null @@ -1,188 +0,0 @@ -use local_channel::mpsc::{Receiver, Sender}; -use std::net::SocketAddr; - -use lan_mouse_proto::ProtoEvent; -use tokio::task::JoinHandle; - -use lan_mouse_ipc::ClientHandle; - -use crate::{client::ClientManager, server::State}; -use input_emulation::{self, EmulationError, EmulationHandle, InputEmulation, InputEmulationError}; -use lan_mouse_ipc::Status; - -use super::{network_task::NetworkError, Server}; - -#[derive(Clone, Debug)] -pub(crate) enum EmulationRequest { - /// create a new client - Create(EmulationHandle), - /// destroy a client - Destroy(EmulationHandle), - /// input emulation must release keys for client - ReleaseKeys(ClientHandle), -} - -pub(crate) fn new( - server: Server, - emulation_rx: Receiver, - udp_rx: Receiver>, - sender_tx: Sender<(ProtoEvent, SocketAddr)>, -) -> JoinHandle<()> { - let emulation_task = emulation_task(server, emulation_rx, udp_rx, sender_tx); - tokio::task::spawn_local(emulation_task) -} - -async fn emulation_task( - server: Server, - mut rx: Receiver, - mut udp_rx: Receiver>, - sender_tx: Sender<(ProtoEvent, SocketAddr)>, -) { - loop { - if let Err(e) = do_emulation(&server, &mut rx, &mut udp_rx, &sender_tx).await { - log::warn!("input emulation exited: {e}"); - } - server.set_emulation_status(Status::Disabled); - if server.is_cancelled() { - break; - } - - // allow cancellation - loop { - tokio::select! { - _ = rx.recv() => continue, /* need to ignore requests here! */ - _ = server.emulation_notified() => break, - _ = server.cancelled() => return, - } - } - } -} - -async fn do_emulation( - server: &Server, - rx: &mut Receiver, - udp_rx: &mut Receiver>, - sender_tx: &Sender<(ProtoEvent, SocketAddr)>, -) -> Result<(), InputEmulationError> { - let backend = server.config.emulation_backend.map(|b| b.into()); - log::info!("creating input emulation..."); - let mut emulation = tokio::select! { - r = InputEmulation::new(backend) => r?, - _ = server.cancelled() => return Ok(()), - }; - - server.set_emulation_status(Status::Enabled); - - // add clients - for handle in server.active_clients() { - emulation.create(handle).await; - } - - let res = do_emulation_session(server, &mut emulation, rx, udp_rx, sender_tx).await; - emulation.terminate().await; // manual drop - res -} - -async fn do_emulation_session( - server: &Server, - emulation: &mut InputEmulation, - rx: &mut Receiver, - udp_rx: &mut Receiver>, - sender_tx: &Sender<(ProtoEvent, SocketAddr)>, -) -> Result<(), InputEmulationError> { - let mut last_ignored = None; - - loop { - tokio::select! { - udp_event = udp_rx.recv() => { - let udp_event = match udp_event.expect("channel closed") { - Ok(e) => e, - Err(e) => { - log::warn!("network error: {e}"); - continue; - } - }; - handle_incoming_event(server, emulation, sender_tx, &mut last_ignored, udp_event).await?; - } - emulate_event = rx.recv() => { - match emulate_event.expect("channel closed") { - EmulationRequest::Create(h) => { let _ = emulation.create(h).await; }, - EmulationRequest::Destroy(h) => emulation.destroy(h).await, - EmulationRequest::ReleaseKeys(c) => emulation.release_keys(c).await?, - } - } - _ = server.notifies.cancel.cancelled() => break Ok(()), - } - } -} - -async fn handle_incoming_event( - server: &Server, - emulate: &mut InputEmulation, - sender_tx: &Sender<(ProtoEvent, SocketAddr)>, - last_ignored: &mut Option, - event: (ProtoEvent, SocketAddr), -) -> Result<(), EmulationError> { - let (event, addr) = event; - - log::trace!("{:20} <-<-<-<------ {addr}", event.to_string()); - - // get client handle for addr - let Some(handle) = - activate_client_if_exists(&mut server.client_manager.borrow_mut(), addr, last_ignored) - else { - return Ok(()); - }; - - match (event, addr) { - (ProtoEvent::Pong, _) => { /* ignore pong events */ } - (ProtoEvent::Ping, addr) => { - let _ = sender_tx.send((ProtoEvent::Pong, addr)); - } - (ProtoEvent::Leave(_), _) => emulate.release_keys(handle).await?, - (ProtoEvent::Ack(_), _) => server.set_state(State::Sending), - (ProtoEvent::Enter(_), _) => { - server.set_state(State::Receiving); - sender_tx - .send((ProtoEvent::Ack(0), addr)) - .expect("no channel") - } - (ProtoEvent::Input(e), _) => { - if let State::Receiving = server.get_state() { - log::trace!("{event} => emulate"); - emulate.consume(e, handle).await?; - let has_pressed_keys = emulate.has_pressed_keys(handle); - server.update_pressed_keys(handle, has_pressed_keys); - if has_pressed_keys { - server.restart_ping_timer(); - } - } - } - } - Ok(()) -} - -fn activate_client_if_exists( - client_manager: &mut ClientManager, - addr: SocketAddr, - last_ignored: &mut Option, -) -> Option { - let Some(handle) = client_manager.get_client(addr) else { - // log ignored if it is the first event from the client in a series - if last_ignored.is_none() || last_ignored.is_some() && last_ignored.unwrap() != addr { - log::warn!("ignoring events from client {addr}"); - last_ignored.replace(addr); - } - return None; - }; - // next event can be logged as ignored again - last_ignored.take(); - - let (_, client_state) = client_manager.get_mut(handle)?; - - // reset ttl for client - client_state.alive = true; - // set addr as new default for this client - client_state.active_addr = Some(addr); - Some(handle) -}