From 28e489541865afd05fad5393c40bcf279e3145fd Mon Sep 17 00:00:00 2001 From: Ferdinand Schober Date: Fri, 6 Sep 2024 00:55:57 +0200 Subject: [PATCH] impl emulation stuff --- lan-mouse-ipc/src/lib.rs | 4 +- lan-mouse-ipc/src/listen.rs | 14 ++-- lan-mouse-proto/src/lib.rs | 35 +++++++-- src/connect.rs | 39 ++++++++++ src/emulation.rs | 147 ++++++++++++++++++++++++++++++++++++ src/lib.rs | 8 +- src/listen.rs | 129 +++++++++++++++++++++++++++++++ src/server.rs | 73 ++++++++---------- src/server/network_task.rs | 116 ---------------------------- src/server/ping_task.rs | 138 --------------------------------- 10 files changed, 391 insertions(+), 312 deletions(-) create mode 100644 src/connect.rs create mode 100644 src/emulation.rs create mode 100644 src/listen.rs delete mode 100644 src/server/network_task.rs delete mode 100644 src/server/ping_task.rs diff --git a/lan-mouse-ipc/src/lib.rs b/lan-mouse-ipc/src/lib.rs index 4b5a0dc..cdc2dd1 100644 --- a/lan-mouse-ipc/src/lib.rs +++ b/lan-mouse-ipc/src/lib.rs @@ -33,7 +33,7 @@ pub enum ConnectionError { } #[derive(Debug, Error)] -pub enum ListenerCreationError { +pub enum IpcListenerCreationError { #[error("could not determine socket-path: `{0}`")] SocketPath(#[from] SocketPathError), #[error("service already running!")] @@ -51,7 +51,7 @@ pub enum IpcError { #[error(transparent)] Connection(#[from] ConnectionError), #[error(transparent)] - Listen(#[from] ListenerCreationError), + Listen(#[from] IpcListenerCreationError), } pub const DEFAULT_PORT: u16 = 4242; diff --git a/lan-mouse-ipc/src/listen.rs b/lan-mouse-ipc/src/listen.rs index c051c47..7c9d2a0 100644 --- a/lan-mouse-ipc/src/listen.rs +++ b/lan-mouse-ipc/src/listen.rs @@ -20,7 +20,7 @@ use tokio::net::TcpListener; #[cfg(windows)] use tokio::net::TcpStream; -use crate::{FrontendEvent, FrontendRequest, IpcError, ListenerCreationError}; +use crate::{FrontendEvent, FrontendRequest, IpcError, IpcListenerCreationError}; pub struct AsyncFrontendListener { #[cfg(windows)] @@ -40,7 +40,7 @@ pub struct AsyncFrontendListener { } impl AsyncFrontendListener { - pub async fn new() -> Result { + pub async fn new() -> Result { #[cfg(unix)] let (socket_path, listener) = { let socket_path = crate::default_socket_path()?; @@ -51,7 +51,7 @@ impl AsyncFrontendListener { // of lan-mouse is already running match UnixStream::connect(&socket_path).await { // connected -> lan-mouse is already running - Ok(_) => return Err(ListenerCreationError::AlreadyRunning), + Ok(_) => return Err(IpcListenerCreationError::AlreadyRunning), // lan-mouse is not running but a socket was left behind Err(e) => { log::debug!("{socket_path:?}: {e} - removing left behind socket"); @@ -63,9 +63,9 @@ impl AsyncFrontendListener { Ok(ls) => ls, // some other lan-mouse instance has bound the socket in the meantime Err(e) if e.kind() == ErrorKind::AddrInUse => { - return Err(ListenerCreationError::AlreadyRunning) + return Err(IpcListenerCreationError::AlreadyRunning) } - Err(e) => return Err(ListenerCreationError::Bind(e)), + Err(e) => return Err(IpcListenerCreationError::Bind(e)), }; (socket_path, listener) }; @@ -75,9 +75,9 @@ impl AsyncFrontendListener { Ok(ls) => ls, // some other lan-mouse instance has bound the socket in the meantime Err(e) if e.kind() == ErrorKind::AddrInUse => { - return Err(ListenerCreationError::AlreadyRunning) + return Err(IpcListenerCreationError::AlreadyRunning) } - Err(e) => return Err(ListenerCreationError::Bind(e)), + Err(e) => return Err(IpcListenerCreationError::Bind(e)), }; let adapter = Self { diff --git a/lan-mouse-proto/src/lib.rs b/lan-mouse-proto/src/lib.rs index d4aa5bf..e490136 100644 --- a/lan-mouse-proto/src/lib.rs +++ b/lan-mouse-proto/src/lib.rs @@ -2,7 +2,7 @@ use input_event::{Event as InputEvent, KeyboardEvent, PointerEvent}; use num_enum::{IntoPrimitive, TryFromPrimitive, TryFromPrimitiveError}; use paste::paste; use std::{ - fmt::{Debug, Display}, + fmt::{Debug, Display, Formatter}, mem::size_of, }; use thiserror::Error; @@ -18,6 +18,31 @@ pub enum ProtocolError { /// event type does not exist #[error("invalid event id: `{0}`")] InvalidEventId(#[from] TryFromPrimitiveError), + /// position type does not exist + #[error("invalid event id: `{0}`")] + InvalidPosition(#[from] TryFromPrimitiveError), +} + +/// Position of a client +#[derive(Clone, Copy, Debug, TryFromPrimitive, IntoPrimitive)] +#[repr(u8)] +pub enum Position { + Left, + Right, + Top, + Bottom, +} + +impl Display for Position { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let pos = match self { + Position::Left => "left", + Position::Right => "right", + Position::Top => "top", + Position::Bottom => "bottom", + }; + write!(f, "{pos}") + } } /// main lan-mouse protocol event type @@ -25,7 +50,7 @@ pub enum ProtocolError { pub enum ProtoEvent { /// notify a client that the cursor entered its region /// [`ProtoEvent::Ack`] with the same serial is used for synchronization between devices - Enter(u32), + Enter(Position), /// notify a client that the cursor left its region /// [`ProtoEvent::Ack`] with the same serial is used for synchronization between devices Leave(u32), @@ -41,7 +66,7 @@ pub enum ProtoEvent { } impl Display for ProtoEvent { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { ProtoEvent::Enter(s) => write!(f, "Enter({s})"), ProtoEvent::Leave(s) => write!(f, "Leave({s})"), @@ -140,7 +165,7 @@ impl TryFrom<[u8; MAX_EVENT_SIZE]> for ProtoEvent { ))), EventType::Ping => Ok(Self::Ping), EventType::Pong => Ok(Self::Pong), - EventType::Enter => Ok(Self::Enter(decode_u32(&mut buf)?)), + EventType::Enter => Ok(Self::Enter(decode_u8(&mut buf)?.try_into()?)), EventType::Leave => Ok(Self::Leave(decode_u32(&mut buf)?)), EventType::Ack => Ok(Self::Ack(decode_u32(&mut buf)?)), } @@ -204,7 +229,7 @@ impl From for ([u8; MAX_EVENT_SIZE], usize) { }, ProtoEvent::Ping => {} ProtoEvent::Pong => {} - ProtoEvent::Enter(serial) => encode_u32(buf, len, serial), + ProtoEvent::Enter(pos) => encode_u8(buf, len, pos as u8), ProtoEvent::Leave(serial) => encode_u32(buf, len, serial), ProtoEvent::Ack(serial) => encode_u32(buf, len, serial), } diff --git a/src/connect.rs b/src/connect.rs new file mode 100644 index 0000000..48262f1 --- /dev/null +++ b/src/connect.rs @@ -0,0 +1,39 @@ +use std::{io, net::SocketAddr, sync::Arc}; +use thiserror::Error; +use tokio::net::UdpSocket; +use webrtc_dtls::{ + config::{Config, ExtendedMasterSecretType}, + conn::DTLSConn, + crypto::Certificate, +}; +use webrtc_util::Conn; + +#[derive(Debug, Error)] +pub(crate) enum LanMouseConnectionError { + #[error(transparent)] + Bind(#[from] io::Error), + #[error(transparent)] + Dtls(#[from] webrtc_dtls::Error), +} + +pub(crate) struct LanMouseConnection {} + +impl LanMouseConnection { + pub(crate) async fn connect( + addr: SocketAddr, + ) -> Result, LanMouseConnectionError> { + let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); + conn.connect(addr).await; + log::info!("connected to {addr}, establishing secure dtls channel ..."); + let certificate = Certificate::generate_self_signed(["localhost".to_owned()])?; + let config = Config { + certificates: vec![certificate], + insecure_skip_verify: true, + extended_master_secret: ExtendedMasterSecretType::Require, + ..Default::default() + }; + let dtls_conn: Arc = + Arc::new(DTLSConn::new(conn, config, true, None).await?); + Ok(dtls_conn) + } +} diff --git a/src/emulation.rs b/src/emulation.rs new file mode 100644 index 0000000..643ad8c --- /dev/null +++ b/src/emulation.rs @@ -0,0 +1,147 @@ +use crate::{listen::LanMouseListener, server::Server}; +use futures::StreamExt; +use input_emulation::{EmulationHandle, InputEmulation, InputEmulationError}; +use input_event::Event; +use lan_mouse_ipc::Status; +use lan_mouse_proto::ProtoEvent; +use local_channel::mpsc::{channel, Receiver, Sender}; +use std::{collections::HashMap, net::SocketAddr}; +use tokio::task::{spawn_local, JoinHandle}; + +/// emulation handling events received from a listener +pub(crate) struct Emulation { + server: Server, + listener: LanMouseListener, + emulation_proxy: EmulationProxy, +} + +impl Emulation { + pub(crate) fn new(server: Server, listener: LanMouseListener) -> Self { + let emulation_proxy = EmulationProxy::new(server.clone()); + Self { + server, + listener, + emulation_proxy, + } + } + + async fn run(&mut self) { + while let Some((event, addr)) = self.listener.next().await { + match event { + ProtoEvent::Enter(_) => { + self.server.release_capture(); + self.listener.reply(addr, ProtoEvent::Ack(0)).await; + } + ProtoEvent::Leave(_) => self.emulation_proxy.release_keys(addr).await, + ProtoEvent::Ack(_) => {} + ProtoEvent::Input(event) => self.emulation_proxy.consume(event, addr).await, + ProtoEvent::Ping => self.listener.reply(addr, ProtoEvent::Pong).await, + ProtoEvent::Pong => todo!(), + } + } + } +} + +/// proxy handling the actual input emulation, +/// discarding events when it is disabled +pub(crate) struct EmulationProxy { + server: Server, + tx: Sender<(ProxyEvent, SocketAddr)>, + task: JoinHandle<()>, +} + +enum ProxyEvent { + Input(Event), + ReleaseKeys, +} + +impl EmulationProxy { + fn new(server: Server) -> Self { + let (tx, rx) = channel(); + let task = spawn_local(Self::emulation_task(server.clone(), rx)); + Self { server, tx, task } + } + + async fn emulation_task(server: Server, mut rx: Receiver<(ProxyEvent, SocketAddr)>) { + let mut handles = HashMap::new(); + let mut next_id = 0; + loop { + if let Err(e) = Self::do_emulation(&server, &mut handles, &mut next_id, &mut rx).await { + log::warn!("input emulation exited: {e}"); + } + tokio::select! { + _ = server.emulation_notified() => {}, + _ = server.cancelled() => return, + } + } + } + + async fn do_emulation( + server: &Server, + handles: &mut HashMap, + next_id: &mut EmulationHandle, + rx: &mut Receiver<(ProxyEvent, SocketAddr)>, + ) -> Result<(), InputEmulationError> { + let backend = server.config.emulation_backend.map(|b| b.into()); + log::info!("creating input emulation ..."); + let mut emulation = tokio::select! { + r = InputEmulation::new(backend) => r?, + _ = server.cancelled() => return Ok(()), + }; + server.set_emulation_status(Status::Enabled); + + // create active handles + for &handle in handles.values() { + emulation.create(handle).await; + } + + let res = Self::do_emulation_session(server, &mut emulation, handles, next_id, rx).await; + // FIXME replace with async drop when stabilized + emulation.terminate().await; + + server.set_emulation_status(Status::Disabled); + res + } + + async fn do_emulation_session( + server: &Server, + emulation: &mut InputEmulation, + handles: &mut HashMap, + next_id: &mut EmulationHandle, + rx: &mut Receiver<(ProxyEvent, SocketAddr)>, + ) -> Result<(), InputEmulationError> { + loop { + tokio::select! { + e = rx.recv() => { + let (event, addr) = e.expect("channel closed"); + let handle = match handles.get(&addr) { + Some(&handle) => handle, + None => { + let handle = *next_id; + *next_id += 1; + emulation.create(handle).await; + handles.insert(addr, handle); + handle + } + }; + match event { + ProxyEvent::Input(event) => emulation.consume(event, handle).await?, + ProxyEvent::ReleaseKeys => emulation.release_keys(handle).await?, + } + } + _ = server.cancelled() => break Ok(()), + } + } + } + + async fn consume(&self, event: Event, addr: SocketAddr) { + // ignore events if emulation is currently disabled + if let Status::Enabled = self.server.emulation_status.get() { + self.tx.send((ProxyEvent::Input(event), addr)); + } + } + + async fn release_keys(&self, addr: SocketAddr) { + self.tx.send((ProxyEvent::ReleaseKeys, addr)); + } +} diff --git a/src/lib.rs b/src/lib.rs index 6c5852f..4094088 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,8 @@ pub mod config; pub mod dns; pub mod server; -pub mod capture_test; -pub mod crypto; -pub mod emulation_test; +mod capture_test; +mod connect; +mod crypto; +mod emulation; +mod listen; diff --git a/src/listen.rs b/src/listen.rs new file mode 100644 index 0000000..e0622e9 --- /dev/null +++ b/src/listen.rs @@ -0,0 +1,129 @@ +use futures::{Stream, StreamExt}; +use lan_mouse_proto::{ProtoEvent, MAX_EVENT_SIZE}; +use local_channel::mpsc::{channel, Receiver, Sender}; +use std::{net::SocketAddr, rc::Rc, sync::Arc}; +use thiserror::Error; +use tokio::{ + sync::Mutex, + task::{spawn_local, JoinHandle}, +}; +use webrtc_dtls::{ + config::{Config, ExtendedMasterSecretType}, + crypto::Certificate, + listener::listen, +}; +use webrtc_util::{conn::Listener, Conn, Error}; + +#[derive(Error, Debug)] +pub(crate) enum ListenerCreationError { + #[error(transparent)] + WebrtcUtil(#[from] webrtc_util::Error), + #[error(transparent)] + WebrtcDtls(#[from] webrtc_dtls::Error), +} + +pub(crate) struct LanMouseListener { + listen_rx: Receiver<(ProtoEvent, SocketAddr)>, + listen_task: JoinHandle<()>, + conns: Rc>>>, +} + +impl LanMouseListener { + pub(crate) async fn new(port: u16) -> Result { + let (listen_tx, listen_rx): ( + Sender<(ProtoEvent, SocketAddr)>, + Receiver<(ProtoEvent, SocketAddr)>, + ) = channel(); + + let listen_addr = SocketAddr::new("0.0.0.0".parse().expect("invalid ip"), port); + let certificate = Certificate::generate_self_signed(["localhost".to_owned()])?; + let cfg = Config { + certificates: vec![certificate], + extended_master_secret: ExtendedMasterSecretType::Require, + ..Default::default() + }; + + let listener = listen(listen_addr, cfg).await?; + + let conns: Rc>>> = Rc::new(Mutex::new(Vec::new())); + + let conns_clone = conns.clone(); + + let listen_task: JoinHandle<()> = spawn_local(async move { + loop { + let (conn, _addr) = match listener.accept().await { + Ok(c) => c, + Err(e) => { + log::warn!("accept: {e}"); + continue; + } + }; + let mut conns = conns_clone.lock().await; + conns.push(conn.clone()); + spawn_local(read_loop(conns_clone.clone(), conn, listen_tx.clone())); + } + }); + + Ok(Self { + conns, + listen_rx, + listen_task, + }) + } + + pub(crate) async fn broadcast(&self, event: ProtoEvent) { + let (buf, len): ([u8; MAX_EVENT_SIZE], usize) = event.into(); + let conns = self.conns.lock().await; + for conn in conns.iter() { + conn.send(&buf[..len]).await; + } + } + + pub(crate) async fn reply(&self, addr: SocketAddr, event: ProtoEvent) { + let (buf, len): ([u8; MAX_EVENT_SIZE], usize) = event.into(); + let conns = self.conns.lock().await; + for conn in conns.iter() { + if conn.remote_addr() == Some(addr) { + conn.send(&buf[..len]).await; + } + } + } +} + +impl Stream for LanMouseListener { + type Item = (ProtoEvent, SocketAddr); + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.listen_rx.poll_next_unpin(cx) + } +} + +async fn read_loop( + conns: Rc>>>, + conn: Arc, + dtls_tx: Sender<(ProtoEvent, SocketAddr)>, +) -> Result<(), Error> { + let mut b = [0u8; MAX_EVENT_SIZE]; + + while let Ok(_) = conn.recv(&mut b).await { + match b.try_into() { + Ok(event) => dtls_tx + .send((event, conn.remote_addr().expect("no remote addr"))) + .expect("channel closed"), + Err(e) => { + log::warn!("error receiving event: {e}"); + break; + } + } + } + let mut conns = conns.lock().await; + let index = conns + .iter() + .position(|c| c.remote_addr() == conn.remote_addr()) + .expect("connection not found"); + conns.remove(index); + Ok(()) +} diff --git a/src/server.rs b/src/server.rs index bc20304..bd0b7fd 100644 --- a/src/server.rs +++ b/src/server.rs @@ -15,17 +15,21 @@ use thiserror::Error; use tokio::{join, signal, sync::Notify}; use tokio_util::sync::CancellationToken; -use crate::{client::ClientManager, config::Config, dns::DnsResolver}; +use crate::{ + client::ClientManager, + config::Config, + dns::DnsResolver, + emulation::Emulation, + listen::{LanMouseListener, ListenerCreationError}, +}; use lan_mouse_ipc::{ AsyncFrontendListener, ClientConfig, ClientHandle, ClientState, FrontendEvent, FrontendRequest, - ListenerCreationError, Position, Status, + IpcListenerCreationError, Position, Status, }; mod capture_task; mod emulation_task; -mod network_task; -mod ping_task; #[derive(Clone, Copy, Debug, Eq, PartialEq)] enum State { @@ -43,9 +47,11 @@ pub enum ServiceError { #[error(transparent)] Dns(#[from] ResolveError), #[error(transparent)] - Listen(#[from] ListenerCreationError), + IpcListen(#[from] IpcListenerCreationError), #[error(transparent)] Io(#[from] io::Error), + #[error(transparent)] + ListenError(#[from] ListenerCreationError), } #[derive(Clone)] @@ -56,17 +62,16 @@ pub struct Server { state: Rc>, release_bind: Vec, notifies: Rc, - config: Rc, + pub(crate) config: Rc, pending_frontend_events: Rc>>, capture_status: Rc>, - emulation_status: Rc>, + pub(crate) emulation_status: Rc>, } #[derive(Default)] struct Notifies { capture: Notify, emulation: Notify, - ping: Notify, port_changed: Notify, frontend_event_pending: Notify, cancel: CancellationToken, @@ -121,7 +126,7 @@ impl Server { // create frontend communication adapter, exit if already running let mut frontend = match AsyncFrontendListener::new().await { Ok(f) => f, - Err(ListenerCreationError::AlreadyRunning) => { + Err(IpcListenerCreationError::AlreadyRunning) => { log::info!("service already running, exiting"); return Ok(()); } @@ -130,25 +135,21 @@ impl Server { let (capture_tx, capture_rx) = channel(); /* requests for input capture */ let (emulation_tx, emulation_rx) = channel(); /* emulation requests */ - let (udp_recv_tx, udp_recv_rx) = channel(); /* udp receiver */ - let (udp_send_tx, udp_send_rx) = channel(); /* udp sender */ let (dns_tx, dns_rx) = channel(); /* dns requests */ - let network = network_task::new(self.clone(), udp_recv_tx.clone(), udp_send_rx).await?; - let capture = capture_task::new(self.clone(), capture_rx, udp_send_tx.clone()); - let emulation = - emulation_task::new(self.clone(), emulation_rx, udp_recv_rx, udp_send_tx.clone()); + // udp task + let listener = LanMouseListener::new(self.config.port).await?; + + // input capture + // let capture = capture_task::new(self.clone(), capture_rx); + + // input emulation + let emulation = Emulation::new(self.clone(), listener); + + // create dns resolver let resolver = DnsResolver::new(dns_rx)?; let dns_task = tokio::task::spawn_local(resolver.run(self.clone())); - // task that pings clients to see if they are responding - let ping = ping_task::new( - self.clone(), - udp_send_tx.clone(), - emulation_tx.clone(), - capture_tx.clone(), - ); - for handle in self.active_clients() { dns_tx.send(handle).expect("channel closed"); } @@ -187,7 +188,7 @@ impl Server { log::info!("terminating service"); self.cancel(); - let _ = join!(capture, dns_task, emulation, network, ping); + let _ = join!(dns_task); Ok(()) } @@ -205,7 +206,7 @@ impl Server { self.notifies.cancel.cancelled().await } - fn is_cancelled(&self) -> bool { + pub(crate) fn is_cancelled(&self) -> bool { self.notifies.cancel.is_cancelled() } @@ -223,18 +224,10 @@ impl Server { self.notifies.emulation.notify_waiters() } - async fn emulation_notified(&self) { + pub(crate) async fn emulation_notified(&self) { self.notifies.emulation.notified().await } - fn restart_ping_timer(&self) { - self.notifies.ping.notify_waiters() - } - - async fn ping_timer_notified(&self) { - self.notifies.ping.notified().await - } - fn request_port_change(&self, port: u16) { self.port.replace(port); self.notifies.port_changed.notify_one(); @@ -396,12 +389,6 @@ impl Server { } } - fn update_pressed_keys(&self, handle: ClientHandle, has_pressed_keys: bool) { - if let Some((_, s)) = self.client_manager.borrow_mut().get_mut(handle) { - s.has_pressed_keys = has_pressed_keys; - } - } - fn update_fix_ips(&self, handle: ClientHandle, fix_ips: Vec) { if let Some((c, _)) = self.client_manager.borrow_mut().get_mut(handle) { c.fix_ips = fix_ips; @@ -504,7 +491,7 @@ impl Server { self.notify_frontend(event); } - fn set_emulation_status(&self, status: Status) { + pub(crate) fn set_emulation_status(&self, status: Status) { self.emulation_status.replace(status); let status = FrontendEvent::EmulationStatus(status); self.notify_frontend(status); @@ -550,6 +537,10 @@ impl Server { .get(handle) .and_then(|(_, s)| s.active_addr) } + + pub(crate) fn release_capture(&self) { + todo!() + } } fn to_capture_pos(pos: Position) -> input_capture::Position { diff --git a/src/server/network_task.rs b/src/server/network_task.rs deleted file mode 100644 index ec00706..0000000 --- a/src/server/network_task.rs +++ /dev/null @@ -1,116 +0,0 @@ -use local_channel::mpsc::{Receiver, Sender}; -use std::{cell::RefCell, collections::HashMap, io, net::SocketAddr, rc::Rc, sync::Arc}; -use webrtc_dtls::{ - config::{Config, ExtendedMasterSecretType}, - conn::DTLSConn, - crypto::Certificate, - listener::listen, -}; -use webrtc_util::{conn::Listener, Conn}; - -use thiserror::Error; -use tokio::{ - net::UdpSocket, - task::{spawn_local, JoinHandle}, -}; - -use crate::crypto; - -use super::Server; -use lan_mouse_proto::{ProtoEvent, ProtocolError}; - -pub(crate) async fn new( - server: Server, - udp_recv_tx: Sender>, - udp_send_rx: Receiver<(ProtoEvent, SocketAddr)>, -) -> io::Result> { - // bind the udp socket - let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), server.port.get()); - - Ok(spawn_local(async move { - let sender_rx = Rc::new(RefCell::new(udp_send_rx)); - let udp_receiver = spawn_local(listen_dtls(listen_addr, udp_recv_tx.clone())); - let udp_sender = spawn_local(udp_sender(sender_rx.clone())); - log::info!("starting sender + receiver"); - tokio::select! { - e = udp_receiver => panic!("{e:?}"), /* channel closed */ - _ = udp_sender => {}, /* channel closed */ - _ = server.cancelled() => {}, /* cancellation requested */ - } - })) -} - -async fn listen_dtls( - listen_addr: SocketAddr, - udp_recv_tx: Sender>, -) -> Result<(), NetworkError> { - let certificate = Certificate::generate_self_signed(vec!["localhost".to_owned()]).unwrap(); - let cfg = Config { - certificates: vec![certificate], - extended_master_secret: ExtendedMasterSecretType::Require, - ..Default::default() - }; - let listener = Arc::new(listen(listen_addr, cfg).await?); - loop { - while let Ok((conn, addr)) = listener.accept().await { - let udp_recv_tx = udp_recv_tx.clone(); - spawn_local(async move { - loop { - let mut buf = [0u8; lan_mouse_proto::MAX_EVENT_SIZE]; - let event: Result<_, NetworkError> = match conn.recv(&mut buf).await { - Ok(_len) => match ProtoEvent::try_from(buf) { - Ok(e) => Ok((e, addr)), - Err(e) => Err(e.into()), - }, - Err(e) => Err(e.into()), - }; - udp_recv_tx.send(event).expect("channel closed"); - } - }); - } - } -} - -async fn udp_sender(rx: Rc>>) { - let mut connection_pool: HashMap = HashMap::new(); - loop { - log::error!("waiting for event to send ..."); - let (event, addr) = rx.borrow_mut().recv().await.expect("channel closed"); - let addr = SocketAddr::new(addr.ip(), 4242); - if !connection_pool.contains_key(&addr) { - let socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await.unwrap()); - socket.connect(addr).await.unwrap(); - let certificate = - Certificate::generate_self_signed(vec!["localhost".to_owned()]).unwrap(); - let config = Config { - certificates: vec![certificate], - insecure_skip_verify: true, - extended_master_secret: ExtendedMasterSecretType::Require, - ..Default::default() - }; - log::error!("connecting to {addr}"); - let conn = DTLSConn::new(socket, config, true, None).await.unwrap(); - log::error!("connected {addr}!"); - connection_pool.insert(addr, conn); - }; - let conn = connection_pool.get(&addr).unwrap(); - let (data, len): ([u8; lan_mouse_proto::MAX_EVENT_SIZE], usize) = event.into(); - conn.send(&data[..len]).await.unwrap(); - } -} - -#[derive(Debug, Error)] -pub(crate) enum NetworkError { - #[error(transparent)] - Protocol(#[from] ProtocolError), - #[error("network error: `{0}`")] - Io(#[from] io::Error), - #[error(transparent)] - Crypt(#[from] crypto::Error), - #[error(transparent)] - Rustls(#[from] rustls::Error), - #[error(transparent)] - WebrtcDtls(#[from] webrtc_dtls::Error), - #[error(transparent)] - WebrtcUtil(#[from] webrtc_util::Error), -} diff --git a/src/server/ping_task.rs b/src/server/ping_task.rs deleted file mode 100644 index 1406948..0000000 --- a/src/server/ping_task.rs +++ /dev/null @@ -1,138 +0,0 @@ -use std::{net::SocketAddr, time::Duration}; - -use lan_mouse_proto::ProtoEvent; -use local_channel::mpsc::Sender; -use tokio::task::JoinHandle; - -use lan_mouse_ipc::ClientHandle; - -use super::{capture_task::CaptureRequest, emulation_task::EmulationRequest, Server, State}; - -const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500); - -pub(crate) fn new( - server: Server, - sender_ch: Sender<(ProtoEvent, SocketAddr)>, - emulate_notify: Sender, - capture_notify: Sender, -) -> JoinHandle<()> { - // timer task - tokio::task::spawn_local(async move { - tokio::select! { - _ = server.notifies.cancel.cancelled() => {} - _ = ping_task(&server, sender_ch, emulate_notify, capture_notify) => {} - } - }) -} - -async fn ping_task( - server: &Server, - sender_ch: Sender<(ProtoEvent, SocketAddr)>, - emulate_notify: Sender, - capture_notify: Sender, -) { - loop { - // wait for wake up signal - server.ping_timer_notified().await; - loop { - let receiving = server.state.get() == State::Receiving; - let (ping_clients, ping_addrs) = { - let mut client_manager = server.client_manager.borrow_mut(); - - let ping_clients: Vec = if receiving { - // if receiving we care about clients with pressed keys - client_manager - .get_client_states() - .filter(|(_, (_, s))| s.has_pressed_keys) - .map(|(h, _)| h) - .collect() - } else { - // if sending we care about the active client - server.active_client.get().iter().cloned().collect() - }; - - // get relevant socket addrs for clients - let ping_addrs: Vec = { - ping_clients - .iter() - .flat_map(|&h| client_manager.get(h)) - .flat_map(|(c, s)| { - if s.alive && s.active_addr.is_some() { - vec![s.active_addr.unwrap()] - } else { - s.ips - .iter() - .cloned() - .map(|ip| SocketAddr::new(ip, c.port)) - .collect() - } - }) - .collect() - }; - - // reset alive - for (_, (_, s)) in client_manager.get_client_states_mut() { - s.alive = false; - } - - (ping_clients, ping_addrs) - }; - - if receiving && ping_clients.is_empty() { - // receiving and no client has pressed keys - // -> no need to keep pinging - break; - } - - // ping clients - for addr in ping_addrs { - if sender_ch.send((ProtoEvent::Ping, addr)).is_err() { - break; - } - } - - // give clients time to resond - if receiving { - log::trace!( - "waiting {MAX_RESPONSE_TIME:?} for response from client with pressed keys ..." - ); - } else { - log::trace!( - "state: {:?} => waiting {MAX_RESPONSE_TIME:?} for client to respond ...", - server.state.get() - ); - } - - tokio::time::sleep(MAX_RESPONSE_TIME).await; - - // when anything is received from a client, - // the alive flag gets set - let unresponsive_clients: Vec<_> = { - let client_manager = server.client_manager.borrow(); - ping_clients - .iter() - .filter_map(|&h| match client_manager.get(h) { - Some((_, s)) if !s.alive => Some(h), - _ => None, - }) - .collect() - }; - - // we may not be receiving anymore but we should respond - // to the original state and not the "new" one - if receiving { - for h in unresponsive_clients { - log::warn!("device not responding, releasing keys!"); - let _ = emulate_notify.send(EmulationRequest::ReleaseKeys(h)); - } - } else { - // release pointer if the active client has not responded - if !unresponsive_clients.is_empty() { - log::warn!("client not responding, releasing pointer!"); - server.state.replace(State::Receiving); - let _ = capture_notify.send(CaptureRequest::Release); - } - } - } - } -}