From e063f4ffb4fac373226313f9ea5622d0670585c8 Mon Sep 17 00:00:00 2001 From: Ferdinand Schober Date: Fri, 12 Jul 2024 17:09:45 +0200 Subject: [PATCH] restructure server --- src/dns.rs | 66 ++++- src/frontend.rs | 3 +- src/main.rs | 10 +- src/server.rs | 460 +++++++++++++++++++++++++++++------ src/server/capture_task.rs | 55 +---- src/server/emulation_task.rs | 57 +---- src/server/frontend_task.rs | 364 --------------------------- src/server/network_task.rs | 41 ++-- src/server/resolver_task.rs | 82 ------- 9 files changed, 487 insertions(+), 651 deletions(-) delete mode 100644 src/server/frontend_task.rs delete mode 100644 src/server/resolver_task.rs diff --git a/src/dns.rs b/src/dns.rs index f331bf1..21e32f0 100644 --- a/src/dns.rs +++ b/src/dns.rs @@ -1,15 +1,27 @@ use anyhow::Result; -use std::{error::Error, net::IpAddr}; +use std::{collections::HashSet, error::Error, net::IpAddr}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; use hickory_resolver::TokioAsyncResolver; -pub struct DnsResolver { +use crate::{client::ClientHandle, server::Server}; + +pub(crate) struct DnsResolver { resolver: TokioAsyncResolver, + dns_request: Receiver, } + impl DnsResolver { - pub(crate) async fn new() -> Result { + pub(crate) async fn new() -> Result<(Self, Sender)> { let resolver = TokioAsyncResolver::tokio_from_system_conf()?; - Ok(Self { resolver }) + let (dns_tx, dns_request) = channel(1); + Ok(( + Self { + resolver, + dns_request, + }, + dns_tx, + )) } pub(crate) async fn resolve(&self, host: &str) -> Result, Box> { @@ -20,4 +32,50 @@ impl DnsResolver { } Ok(response.iter().collect()) } + + pub async fn run(mut self, server: Server) { + tokio::select! { + _ = server.cancelled() => {}, + _ = self.do_dns(&server) => {}, + } + } + + async fn do_dns(&mut self, server: &Server) { + loop { + let handle = self.dns_request.recv().await.expect("channel closed"); + + /* update resolving status */ + let hostname = if let Some((c, s)) = server.client_manager.borrow_mut().get_mut(handle) + { + s.resolving = true; + c.hostname.clone() + } else { + continue; + }; + let Some(hostname) = hostname else { + continue; + }; + + server.notify_client_update(handle); + + let ips = match self.resolve(&hostname).await { + Ok(ips) => ips, + Err(e) => { + log::warn!("could not resolve host '{hostname}': {e}"); + vec![] + } + }; + + /* update ips and resolving state */ + if let Some((c, s)) = server.client_manager.borrow_mut().get_mut(handle) { + let mut addrs = HashSet::from_iter(c.fix_ips.iter().cloned()); + for ip in ips { + addrs.insert(ip); + } + s.ips = addrs; + s.resolving = false; + } + server.notify_client_update(handle); + } + } } diff --git a/src/frontend.rs b/src/frontend.rs index db12ca5..82c2b86 100644 --- a/src/frontend.rs +++ b/src/frontend.rs @@ -253,7 +253,7 @@ impl FrontendListener { Ok(rx) } - pub(crate) async fn broadcast_event(&mut self, notify: FrontendEvent) -> Result<()> { + pub(crate) async fn broadcast(&mut self, notify: FrontendEvent) { // encode event let json = serde_json::to_string(¬ify).unwrap(); let payload = json.as_bytes(); @@ -277,7 +277,6 @@ impl FrontendListener { // could not find a better solution because async let mut keep = keep.into_iter(); self.tx_streams.retain(|_| keep.next().unwrap()); - Ok(()) } } diff --git a/src/main.rs b/src/main.rs index 28434d6..c1c65f2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -37,7 +37,7 @@ pub fn run() -> Result<()> { emulation_test::run()?; } else if config.daemon { // if daemon is specified we run the service - run_service(&config)?; + run_service(config)?; } else { // otherwise start the service as a child process and // run a frontend @@ -58,7 +58,7 @@ pub fn run() -> Result<()> { anyhow::Ok(()) } -fn run_service(config: &Config) -> Result<()> { +fn run_service(config: Config) -> Result<()> { // create single threaded tokio runtime let runtime = tokio::runtime::Builder::new_current_thread() .enable_io() @@ -70,10 +70,8 @@ fn run_service(config: &Config) -> Result<()> { // run main loop log::info!("Press {:?} to release the mouse", config.release_bind); - let server = Server::new(config); - server - .run(config.capture_backend, config.emulation_backend) - .await?; + let mut server = Server::new(config); + server.run().await?; log::debug!("service exiting"); anyhow::Ok(()) diff --git a/src/server.rs b/src/server.rs index 3881b87..35116eb 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,31 +1,41 @@ +use emulation_task::EmulationEvent; use log; use std::{ cell::{Cell, RefCell}, - collections::HashSet, + collections::{HashSet, VecDeque}, + io::ErrorKind, + net::{IpAddr, SocketAddr}, rc::Rc, }; use tokio::{ + io::ReadHalf, join, signal, - sync::{mpsc::channel, Notify}, + sync::{ + mpsc::{channel, Sender}, + Notify, + }, + task::JoinHandle, }; use tokio_util::sync::CancellationToken; use crate::{ - client::{ClientConfig, ClientHandle, ClientManager, ClientState}, - config::{CaptureBackend, Config, EmulationBackend}, + client::{ClientConfig, ClientHandle, ClientManager, ClientState, Position}, + config::Config, dns::DnsResolver, - frontend::{FrontendListener, FrontendRequest}, + frontend::{self, FrontendEvent, FrontendListener, FrontendRequest}, server::capture_task::CaptureEvent, }; -use self::resolver_task::DnsRequest; +#[cfg(unix)] +use tokio::net::UnixStream; + +#[cfg(windows)] +use tokio::net::TcpStream; mod capture_task; mod emulation_task; -mod frontend_task; mod network_task; mod ping_task; -mod resolver_task; #[derive(Clone, Copy, Debug, Eq, PartialEq)] enum State { @@ -41,23 +51,29 @@ enum State { #[derive(Clone)] pub struct Server { active_client: Rc>>, - client_manager: Rc>, + pub(crate) client_manager: Rc>, port: Rc>, state: Rc>, release_bind: Vec, notifies: Rc, + config: Rc, + pending_frontend_events: Rc>>, + pending_dns_requests: Rc>>, } #[derive(Default)] struct Notifies { - ping: Notify, capture: Notify, emulation: Notify, + ping: Notify, + port_changed: Notify, + frontend_event_pending: Notify, + dns_request_pending: Notify, cancel: CancellationToken, } impl Server { - pub fn new(config: &Config) -> Self { + pub fn new(config: Config) -> Self { let active_client = Rc::new(Cell::new(None)); let client_manager = Rc::new(RefCell::new(ClientManager::default())); let state = Rc::new(Cell::new(State::Receiving)); @@ -85,87 +101,57 @@ impl Server { let notifies = Rc::new(Notifies::default()); let release_bind = config.release_bind.clone(); + let config = Rc::new(config); + Self { + config, active_client, client_manager, port, state, release_bind, notifies, + pending_frontend_events: Rc::new(RefCell::new(VecDeque::new())), + pending_dns_requests: Rc::new(RefCell::new(VecDeque::new())), } } - pub async fn run( - &self, - capture_backend: Option, - emulation_backend: Option, - ) -> anyhow::Result<()> { + pub async fn run(&mut self) -> anyhow::Result<()> { // create frontend communication adapter, exit if already running - let frontend = match FrontendListener::new().await { + let mut frontend = match FrontendListener::new().await { Some(f) => f?, None => { - // none means some other instance is already running log::info!("service already running, exiting"); - return anyhow::Ok(()); + return Ok(()); } }; - let (frontend_tx, frontend_rx) = channel(1); /* events for frontends */ - let (request_tx, request_rx) = channel(1); /* requests coming from frontends */ let (capture_tx, capture_rx) = channel(1); /* requests for input capture */ let (emulation_tx, emulation_rx) = channel(1); /* emulation requests */ let (udp_recv_tx, udp_recv_rx) = channel(1); /* udp receiver */ let (udp_send_tx, udp_send_rx) = channel(1); /* udp sender */ - let (port_tx, port_rx) = channel(1); /* port change request */ - let (dns_tx, dns_rx) = channel(1); /* dns requests */ // udp task - let network = network_task::new( - self.clone(), - udp_recv_tx, - udp_send_rx, - port_rx, - frontend_tx.clone(), - ) - .await?; + let network = network_task::new(self.clone(), udp_recv_tx, udp_send_rx).await?; // input capture - let capture = capture_task::new( - self.clone(), - capture_backend, - capture_rx, - udp_send_tx.clone(), - frontend_tx.clone(), - self.release_bind.clone(), - ); + let capture = capture_task::new(self.clone(), capture_rx, udp_send_tx.clone()); // input emulation let emulation = emulation_task::new( self.clone(), - emulation_backend, emulation_rx, udp_recv_rx, udp_send_tx.clone(), capture_tx.clone(), - frontend_tx.clone(), ); // create dns resolver - let resolver = DnsResolver::new().await?; - let resolver = resolver_task::new(self.clone(), resolver, dns_rx, frontend_tx); - - // frontend listener - let frontend = frontend_task::new( - self.clone(), - frontend, - frontend_rx, - request_tx.clone(), - request_rx, - capture_tx.clone(), - emulation_tx.clone(), - dns_tx.clone(), - port_tx, - ); + let (resolver, dns_request) = DnsResolver::new().await?; + let server = self.clone(); + let dns_task = tokio::task::spawn_local(async move { + resolver.run(server).await; + }); // task that pings clients to see if they are responding let ping = ping_task::new( @@ -175,42 +161,74 @@ impl Server { capture_tx.clone(), ); - let active = self - .client_manager - .borrow() - .get_client_states() - .filter_map(|(h, (c, s))| { - if s.active { - Some((h, c.hostname.clone())) - } else { - None - } - }) - .collect::>(); - for (handle, hostname) in active { - request_tx - .send(FrontendRequest::Activate(handle, true)) - .await?; - if let Some(hostname) = hostname { - let _ = dns_tx.send(DnsRequest { hostname, handle }).await; - } + for handle in self.active_clients() { + self.request_dns(handle); } log::info!("running service"); - signal::ctrl_c().await.expect("failed to listen for CTRL+C"); + + let mut join_handles = vec![]; + + let (request_tx, mut request_rx) = channel(1); + + loop { + tokio::select! { + stream = frontend.accept() => { + match stream { + Ok(s) => join_handles.push(self.clone().handle_frontend_stream(s, request_tx.clone())), + Err(e) => log::warn!("error accepting frontend connection: {e}"), + }; + } + request = request_rx.recv() => { + self.handle_request(&capture_tx, &emulation_tx, request.expect("channel closed")).await; + } + _ = self.notifies.frontend_event_pending.notified() => { + let events = self + .pending_frontend_events + .borrow_mut() + .drain(..) + .collect::>(); + for event in events { + frontend.broadcast(event).await; + } + }, + _ = self.notifies.dns_request_pending.notified() => { + let requests = self + .pending_dns_requests + .borrow_mut() + .drain(..) + .collect::>(); + for request in requests { + dns_request.send(request).await.expect("channel closed"); + } + } + _ = self.cancelled() => break, + r = signal::ctrl_c() => { + r.expect("failed to wait for CTRL+C"); + break; + } + } + } + log::info!("terminating service"); self.cancel(); - let _ = join!(capture, emulation, frontend, network, resolver, ping); + futures::future::join_all(join_handles).await; + let _ = join!(capture, dns_task, emulation, network, ping); Ok(()) } + fn notify_frontend(&self, event: FrontendEvent) { + self.pending_frontend_events.borrow_mut().push_back(event); + self.notifies.frontend_event_pending.notify_one(); + } + fn cancel(&self) { self.notifies.cancel.cancel(); } - async fn cancelled(&self) { + pub(crate) async fn cancelled(&self) { self.notifies.cancel.cancelled().await } @@ -241,4 +259,290 @@ impl Server { async fn ping_timer_notified(&self) { self.notifies.ping.notified().await } + + fn request_port_change(&self, port: u16) { + self.port.replace(port); + self.notifies.port_changed.notify_one(); + } + + fn notify_port_changed(&self, port: u16, msg: Option) { + self.port.replace(port); + self.notify_frontend(FrontendEvent::PortChanged(port, msg)); + } + + pub(crate) fn notify_client_update(&self, handle: ClientHandle) { + let state = self.client_manager.borrow().get(handle).cloned(); + if let Some((config, state)) = state { + self.notify_frontend(FrontendEvent::State(handle, config, state)); + } + } + + fn active_clients(&self) -> Vec { + self.client_manager + .borrow() + .get_client_states() + .filter_map(|(h, (_, s))| if s.active { Some(h) } else { None }) + .collect() + } + + fn request_dns(&self, handle: ClientHandle) { + self.pending_dns_requests.borrow_mut().push_back(handle); + } + + async fn handle_request( + &self, + capture: &Sender, + emulate: &Sender, + event: FrontendRequest, + ) -> bool { + log::debug!("frontend: {event:?}"); + match event { + FrontendRequest::EnableCapture => { + log::info!("received capture enable request"); + self.notify_capture(); + } + FrontendRequest::EnableEmulation => { + log::info!("received emulation enable request"); + self.notify_emulation(); + } + FrontendRequest::Create => { + let handle = self.add_client().await; + self.request_dns(handle); + } + FrontendRequest::Activate(handle, active) => { + if active { + self.activate_client(capture, emulate, handle).await; + } else { + self.deactivate_client(capture, emulate, handle).await; + } + } + FrontendRequest::ChangePort(port) => { + self.request_port_change(port); + } + FrontendRequest::Delete(handle) => { + self.remove_client(capture, emulate, handle).await; + self.notify_frontend(FrontendEvent::Deleted(handle)); + } + FrontendRequest::Enumerate() => { + let clients = self + .client_manager + .borrow() + .get_client_states() + .map(|(h, (c, s))| (h, c.clone(), s.clone())) + .collect(); + self.notify_frontend(FrontendEvent::Enumerate(clients)); + } + FrontendRequest::GetState(handle) => { + self.broadcast_client(handle); + } + FrontendRequest::UpdateFixIps(handle, fix_ips) => { + self.update_fix_ips(handle, fix_ips).await; + self.request_dns(handle); + } + FrontendRequest::UpdateHostname(handle, hostname) => { + self.update_hostname(handle, hostname).await; + self.request_dns(handle); + } + FrontendRequest::UpdatePort(handle, port) => { + self.update_port(handle, port); + } + FrontendRequest::UpdatePosition(handle, pos) => { + self.update_pos(handle, capture, emulate, pos).await; + } + FrontendRequest::ResolveDns(handle) => { + self.request_dns(handle); + } + }; + false + } + + fn handle_frontend_stream( + self, + #[cfg(unix)] stream: ReadHalf, + #[cfg(windows)] stream: ReadHalf, + request_channel: Sender, + ) -> JoinHandle<()> { + let tx = request_channel.clone(); + tokio::task::spawn_local(async move { + tokio::select! { + _ = listen_frontend(tx, stream) => {}, + _ = self.cancelled() => {}, + } + }) + } + + async fn add_client(&self) -> ClientHandle { + let handle = self.client_manager.borrow_mut().add_client(); + log::info!("added client {handle}"); + let (c, s) = self.client_manager.borrow().get(handle).unwrap().clone(); + self.notify_frontend(FrontendEvent::Created(handle, c, s)); + handle + } + + pub async fn deactivate_client( + &self, + capture: &Sender, + emulate: &Sender, + handle: ClientHandle, + ) { + match self.client_manager.borrow_mut().get_mut(handle) { + Some((_, s)) => s.active = false, + None => return, + }; + + let _ = capture.send(CaptureEvent::Destroy(handle)).await; + let _ = emulate.send(EmulationEvent::Destroy(handle)).await; + } + + pub async fn activate_client( + &self, + capture: &Sender, + emulate: &Sender, + handle: ClientHandle, + ) { + /* deactivate potential other client at this position */ + let pos = match self.client_manager.borrow().get(handle) { + Some((client, _)) => client.pos, + None => return, + }; + + let other = self.client_manager.borrow_mut().find_client(pos); + if let Some(other) = other { + if other != handle { + self.deactivate_client(capture, emulate, other).await; + } + } + + /* activate the client */ + if let Some((_, s)) = self.client_manager.borrow_mut().get_mut(handle) { + s.active = true; + } else { + return; + }; + + /* notify emulation, capture and frontends */ + let _ = capture.send(CaptureEvent::Create(handle, pos.into())).await; + let _ = emulate.send(EmulationEvent::Create(handle)).await; + } + + pub async fn remove_client( + &self, + capture: &Sender, + emulate: &Sender, + handle: ClientHandle, + ) { + let Some(active) = self + .client_manager + .borrow_mut() + .remove_client(handle) + .map(|(_, s)| s.active) + else { + return; + }; + + if active { + let _ = capture.send(CaptureEvent::Destroy(handle)).await; + let _ = emulate.send(EmulationEvent::Destroy(handle)).await; + } + } + + async fn update_fix_ips(&self, handle: ClientHandle, fix_ips: Vec) { + let mut client_manager = self.client_manager.borrow_mut(); + let Some((c, _)) = client_manager.get_mut(handle) else { + return; + }; + + c.fix_ips = fix_ips; + } + + async fn update_hostname(&self, handle: ClientHandle, hostname: Option) { + let mut client_manager = self.client_manager.borrow_mut(); + let Some((c, s)) = client_manager.get_mut(handle) else { + return; + }; + + // hostname changed + if c.hostname != hostname { + c.hostname = hostname; + s.ips = HashSet::from_iter(c.fix_ips.iter().cloned()); + s.active_addr = None; + self.request_dns(handle); + } + } + + fn update_port(&self, handle: ClientHandle, port: u16) { + let mut client_manager = self.client_manager.borrow_mut(); + let Some((c, s)) = client_manager.get_mut(handle) else { + return; + }; + + if c.port != port { + c.port = port; + s.active_addr = s.active_addr.map(|a| SocketAddr::new(a.ip(), port)); + } + } + + async fn update_pos( + &self, + handle: ClientHandle, + capture: &Sender, + emulate: &Sender, + pos: Position, + ) { + let (changed, active) = { + let mut client_manager = self.client_manager.borrow_mut(); + let Some((c, s)) = client_manager.get_mut(handle) else { + return; + }; + + let changed = c.pos != pos; + c.pos = pos; + (changed, s.active) + }; + + // update state in event input emulator & input capture + if changed { + if active { + let _ = capture.send(CaptureEvent::Destroy(handle)).await; + let _ = emulate.send(EmulationEvent::Destroy(handle)).await; + } + let _ = capture.send(CaptureEvent::Create(handle, pos.into())).await; + let _ = emulate.send(EmulationEvent::Create(handle)).await; + } + } + + fn broadcast_client(&self, handle: ClientHandle) { + let client = self.client_manager.borrow().get(handle).cloned(); + let event = if let Some((config, state)) = client { + FrontendEvent::State(handle, config, state) + } else { + FrontendEvent::NoSuchClient(handle) + }; + self.notify_frontend(event); + } +} + +async fn listen_frontend( + request_tx: Sender, + #[cfg(unix)] mut stream: ReadHalf, + #[cfg(windows)] mut stream: ReadHalf, +) { + use std::io; + loop { + let request = frontend::wait_for_request(&mut stream).await; + match request { + Ok(request) => { + let _ = request_tx.send(request).await; + } + Err(e) => { + if let Some(e) = e.downcast_ref::() { + if e.kind() == ErrorKind::UnexpectedEof { + return; + } + } + log::error!("error reading frontend event: {e}"); + return; + } + } + } } diff --git a/src/server/capture_task.rs b/src/server/capture_task.rs index 1d7fcbd..1f49bda 100644 --- a/src/server/capture_task.rs +++ b/src/server/capture_task.rs @@ -16,7 +16,6 @@ use input_event::{scancode, Event, KeyboardEvent}; use crate::{ client::ClientHandle, - config::CaptureBackend, frontend::{FrontendEvent, Status}, server::State, }; @@ -43,21 +42,11 @@ pub enum CaptureEvent { pub fn new( server: Server, - backend: Option, capture_rx: Receiver, udp_send: Sender<(Event, SocketAddr)>, - frontend_tx: Sender, - release_bind: Vec, ) -> JoinHandle<()> { - let backend = backend.map(|b| b.into()); - tokio::task::spawn_local(capture_task( - server, - backend, - udp_send, - capture_rx, - frontend_tx, - release_bind, - )) + let backend = server.config.capture_backend.map(|b| b.into()); + tokio::task::spawn_local(capture_task(server, backend, udp_send, capture_rx)) } async fn capture_task( @@ -65,25 +54,12 @@ async fn capture_task( backend: Option, sender_tx: Sender<(Event, SocketAddr)>, mut notify_rx: Receiver, - frontend_tx: Sender, - release_bind: Vec, ) { loop { - if let Err(e) = do_capture( - backend, - &server, - &sender_tx, - &mut notify_rx, - &frontend_tx, - &release_bind, - ) - .await - { + if let Err(e) = do_capture(backend, &server, &sender_tx, &mut notify_rx).await { log::warn!("input capture exited: {e}"); } - let _ = frontend_tx - .send(FrontendEvent::CaptureStatus(Status::Disabled)) - .await; + server.notify_frontend(FrontendEvent::CaptureStatus(Status::Disabled)); if server.is_cancelled() { break; } @@ -96,8 +72,6 @@ async fn do_capture( server: &Server, sender_tx: &Sender<(Event, SocketAddr)>, notify_rx: &mut Receiver, - frontend_tx: &Sender, - release_bind: &[scancode::Linux], ) -> Result<(), LanMouseCaptureError> { /* allow cancelling capture request */ let mut capture = tokio::select! { @@ -107,9 +81,7 @@ async fn do_capture( _ = server.cancelled() => return Ok(()), }; - let _ = frontend_tx - .send(FrontendEvent::CaptureStatus(Status::Enabled)) - .await; + server.notify_frontend(FrontendEvent::CaptureStatus(Status::Enabled)); // FIXME DUPLICATES let clients = server @@ -119,22 +91,16 @@ async fn do_capture( .map(|(h, s)| (h, s.clone())) .collect::>(); log::info!("{clients:?}"); - // let clients = server - // .client_manager - // .borrow() - // .get_client_states() - // .map(|(h, (c, _))| (h, c.pos)) - // .collect::>(); - // for (handle, pos) in clients { - // capture.create(handle, pos.into()).await?; - // } + for (handle, (config, _state)) in clients { + capture.create(handle, config.pos.into()).await?; + } let mut pressed_keys = HashSet::new(); loop { tokio::select! { event = capture.next() => { match event { - Some(Ok(event)) => handle_capture_event(server, &mut capture, sender_tx, event, &mut pressed_keys, release_bind).await?, + Some(Ok(event)) => handle_capture_event(server, &mut capture, sender_tx, event, &mut pressed_keys).await?, Some(Err(e)) => return Err(e.into()), None => return Ok(()), } @@ -176,7 +142,6 @@ async fn handle_capture_event( sender_tx: &Sender<(Event, SocketAddr)>, event: (CaptureHandle, Event), pressed_keys: &mut HashSet, - release_bind: &[scancode::Linux], ) -> Result<(), CaptureError> { let (handle, mut e) = event; log::trace!("({handle}) {e:?}"); @@ -184,7 +149,7 @@ async fn handle_capture_event( if let Event::Keyboard(KeyboardEvent::Key { key, state, .. }) = e { update_pressed_keys(pressed_keys, key, state); log::debug!("{pressed_keys:?}"); - if release_bind.iter().all(|k| pressed_keys.contains(k)) { + if server.release_bind.iter().all(|k| pressed_keys.contains(k)) { pressed_keys.clear(); log::info!("releasing pointer"); capture.release().await?; diff --git a/src/server/emulation_task.rs b/src/server/emulation_task.rs index 3214b2f..a1b21a5 100644 --- a/src/server/emulation_task.rs +++ b/src/server/emulation_task.rs @@ -8,7 +8,6 @@ use tokio::{ use crate::{ client::{ClientHandle, ClientManager}, - config::EmulationBackend, frontend::{FrontendEvent, Status}, server::State, }; @@ -33,22 +32,12 @@ pub enum EmulationEvent { pub fn new( server: Server, - backend: Option, emulation_rx: Receiver, udp_rx: Receiver>, sender_tx: Sender<(Event, SocketAddr)>, capture_tx: Sender, - frontend_tx: Sender, ) -> JoinHandle<()> { - let emulation_task = emulation_task( - backend, - emulation_rx, - server, - udp_rx, - sender_tx, - capture_tx, - frontend_tx, - ); + let emulation_task = emulation_task(server, emulation_rx, udp_rx, sender_tx, capture_tx); tokio::task::spawn_local(emulation_task) } @@ -61,34 +50,22 @@ pub enum LanMouseEmulationError { } async fn emulation_task( - backend: Option, - mut rx: Receiver, server: Server, + mut rx: Receiver, mut udp_rx: Receiver>, sender_tx: Sender<(Event, SocketAddr)>, capture_tx: Sender, - frontend_tx: Sender, ) { loop { - match do_emulation( - &server, - backend, - &mut rx, - &mut udp_rx, - &sender_tx, - &capture_tx, - &frontend_tx, - ) - .await - { + match do_emulation(&server, &mut rx, &mut udp_rx, &sender_tx, &capture_tx).await { Ok(()) => {} Err(e) => { log::warn!("input emulation exited: {e}"); } } - let _ = frontend_tx - .send(FrontendEvent::EmulationStatus(Status::Disabled)) - .await; + let emulation_disabled = FrontendEvent::EmulationStatus(Status::Disabled); + server.notify_frontend(emulation_disabled); + if server.notifies.cancel.is_cancelled() { break; } @@ -100,14 +77,12 @@ async fn emulation_task( async fn do_emulation( server: &Server, - backend: Option, rx: &mut Receiver, udp_rx: &mut Receiver>, sender_tx: &Sender<(Event, SocketAddr)>, capture_tx: &Sender, - frontend_tx: &Sender, ) -> Result<(), LanMouseEmulationError> { - let backend = backend.map(|b| b.into()); + let backend = server.config.emulation_backend.map(|b| b.into()); log::info!("creating input emulation..."); let mut emulation = tokio::select! { r = input_emulation::create(backend) => { @@ -115,25 +90,17 @@ async fn do_emulation( } _ = server.cancelled() => return Ok(()), }; - let _ = frontend_tx - .send(FrontendEvent::EmulationStatus(Status::Enabled)) - .await; + let emulation_enabled = FrontendEvent::EmulationStatus(Status::Enabled); + server.notify_frontend(emulation_enabled); let res = do_emulation_session(server, &mut emulation, rx, udp_rx, sender_tx, capture_tx).await; emulation.terminate().await; res?; - // FIXME DUPLICATES // add clients - // let clients = server - // .client_manager - // .borrow() - // .get_client_states() - // .map(|(h, _)| h) - // .collect::>(); - // for handle in clients { - // emulation.create(handle).await; - // } + for handle in server.active_clients() { + emulation.create(handle).await; + } // release potentially still pressed keys release_all_keys(server, &mut emulation).await?; diff --git a/src/server/frontend_task.rs b/src/server/frontend_task.rs deleted file mode 100644 index d588dc9..0000000 --- a/src/server/frontend_task.rs +++ /dev/null @@ -1,364 +0,0 @@ -use std::{ - collections::HashSet, - io::ErrorKind, - net::{IpAddr, SocketAddr}, -}; -#[cfg(unix)] -use tokio::net::UnixStream; - -#[cfg(windows)] -use tokio::net::TcpStream; - -use tokio::{ - io::ReadHalf, - sync::mpsc::{Receiver, Sender}, - task::JoinHandle, -}; - -use crate::{ - client::{ClientHandle, Position}, - frontend::{self, FrontendEvent, FrontendListener, FrontendRequest}, -}; - -use super::{ - capture_task::CaptureEvent, emulation_task::EmulationEvent, resolver_task::DnsRequest, Server, -}; - -pub(crate) fn new( - server: Server, - mut frontend: FrontendListener, - mut event: Receiver, - request_tx: Sender, - mut request_rx: Receiver, - capture: Sender, - emulate: Sender, - resolve_ch: Sender, - port_tx: Sender, -) -> JoinHandle<()> { - let request_tx = request_tx.clone(); - tokio::task::spawn_local(async move { - let mut join_handles = vec![]; - loop { - tokio::select! { - stream = frontend.accept() => { - match stream { - Ok(s) => join_handles.push(handle_frontend_stream(server.clone(), &request_tx, s)), - Err(e) => log::warn!("error accepting frontend connection: {e}"), - }; - } - request = request_rx.recv() => { - let request = request.expect("frontend request channel closed"); - if handle_frontend_event(&server, &capture, &emulate, &resolve_ch, &mut frontend, &port_tx, request).await { - break; - } - } - event = event.recv() => { - let event = event.expect("channel closed"); - let _ = frontend.broadcast_event(event).await; - } - _ = server.cancelled() => { - futures::future::join_all(join_handles).await; - break; - } - } - } - }) -} - -fn handle_frontend_stream( - server: Server, - request_tx: &Sender, - #[cfg(unix)] stream: ReadHalf, - #[cfg(windows)] stream: ReadHalf, -) -> JoinHandle<()> { - let tx = request_tx.clone(); - tokio::task::spawn_local(async move { - tokio::select! { - _ = listen_frontend(tx, stream) => {}, - _ = server.cancelled() => {}, - } - }) -} - -async fn listen_frontend( - request_tx: Sender, - #[cfg(unix)] mut stream: ReadHalf, - #[cfg(windows)] mut stream: ReadHalf, -) { - use std::io; - loop { - let request = frontend::wait_for_request(&mut stream).await; - match request { - Ok(request) => { - let _ = request_tx.send(request).await; - } - Err(e) => { - if let Some(e) = e.downcast_ref::() { - if e.kind() == ErrorKind::UnexpectedEof { - return; - } - } - log::error!("error reading frontend event: {e}"); - return; - } - } - } -} - -async fn handle_frontend_event( - server: &Server, - capture: &Sender, - emulate: &Sender, - resolve_tx: &Sender, - frontend: &mut FrontendListener, - port_tx: &Sender, - event: FrontendRequest, -) -> bool { - log::debug!("frontend: {event:?}"); - match event { - FrontendRequest::EnableCapture => { - log::info!("received capture enable request"); - server.notify_capture(); - } - FrontendRequest::EnableEmulation => { - log::info!("received emulation enable request"); - server.notify_emulation(); - } - FrontendRequest::Create => { - let handle = add_client(server, frontend).await; - resolve_dns(server, resolve_tx, handle).await; - } - FrontendRequest::Activate(handle, active) => { - if active { - activate_client(server, capture, emulate, handle).await; - } else { - deactivate_client(server, capture, emulate, handle).await; - } - } - FrontendRequest::ChangePort(port) => { - let _ = port_tx.send(port).await; - } - FrontendRequest::Delete(handle) => { - remove_client(server, capture, emulate, handle).await; - broadcast(frontend, FrontendEvent::Deleted(handle)).await; - } - FrontendRequest::Enumerate() => { - let clients = server - .client_manager - .borrow() - .get_client_states() - .map(|(h, (c, s))| (h, c.clone(), s.clone())) - .collect(); - broadcast(frontend, FrontendEvent::Enumerate(clients)).await; - } - FrontendRequest::GetState(handle) => { - broadcast_client(server, frontend, handle).await; - } - FrontendRequest::UpdateFixIps(handle, fix_ips) => { - update_fix_ips(server, handle, fix_ips).await; - resolve_dns(server, resolve_tx, handle).await; - } - FrontendRequest::UpdateHostname(handle, hostname) => { - update_hostname(server, resolve_tx, handle, hostname).await; - resolve_dns(server, resolve_tx, handle).await; - } - FrontendRequest::UpdatePort(handle, port) => { - update_port(server, handle, port).await; - } - FrontendRequest::UpdatePosition(handle, pos) => { - update_pos(server, handle, capture, emulate, pos).await; - } - FrontendRequest::ResolveDns(handle) => { - resolve_dns(server, resolve_tx, handle).await; - } - }; - false -} - -async fn resolve_dns(server: &Server, resolve_tx: &Sender, handle: ClientHandle) { - let hostname = server - .client_manager - .borrow() - .get(handle) - .and_then(|(c, _)| c.hostname.clone()); - if let Some(hostname) = hostname { - let _ = resolve_tx - .send(DnsRequest { - hostname: hostname.clone(), - handle, - }) - .await; - } -} - -async fn broadcast(frontend: &mut FrontendListener, event: FrontendEvent) { - if let Err(e) = frontend.broadcast_event(event).await { - log::error!("error notifying frontend: {e}"); - } -} - -pub async fn add_client(server: &Server, frontend: &mut FrontendListener) -> ClientHandle { - let handle = server.client_manager.borrow_mut().add_client(); - log::info!("added client {handle}"); - - let (c, s) = server.client_manager.borrow().get(handle).unwrap().clone(); - broadcast(frontend, FrontendEvent::Created(handle, c, s)).await; - handle -} - -pub async fn deactivate_client( - server: &Server, - capture: &Sender, - emulate: &Sender, - handle: ClientHandle, -) { - match server.client_manager.borrow_mut().get_mut(handle) { - Some((_, s)) => { - s.active = false; - } - None => return, - }; - - let _ = capture.send(CaptureEvent::Destroy(handle)).await; - let _ = emulate.send(EmulationEvent::Destroy(handle)).await; -} - -pub async fn activate_client( - server: &Server, - capture: &Sender, - emulate: &Sender, - handle: ClientHandle, -) { - /* deactivate potential other client at this position */ - let pos = match server.client_manager.borrow().get(handle) { - Some((client, _)) => client.pos, - None => return, - }; - - let other = server.client_manager.borrow_mut().find_client(pos); - if let Some(other) = other { - if other != handle { - deactivate_client(server, capture, emulate, other).await; - } - } - - /* activate the client */ - if let Some((_, s)) = server.client_manager.borrow_mut().get_mut(handle) { - s.active = true; - } else { - return; - }; - - /* notify emulation, capture and frontends */ - let _ = capture.send(CaptureEvent::Create(handle, pos.into())).await; - let _ = emulate.send(EmulationEvent::Create(handle)).await; -} - -pub async fn remove_client( - server: &Server, - capture: &Sender, - emulate: &Sender, - handle: ClientHandle, -) { - let Some(active) = server - .client_manager - .borrow_mut() - .remove_client(handle) - .map(|(_, s)| s.active) - else { - return; - }; - - if active { - let _ = capture.send(CaptureEvent::Destroy(handle)).await; - let _ = emulate.send(EmulationEvent::Destroy(handle)).await; - } -} - -async fn update_fix_ips(server: &Server, handle: ClientHandle, fix_ips: Vec) { - let mut client_manager = server.client_manager.borrow_mut(); - let Some((c, _)) = client_manager.get_mut(handle) else { - return; - }; - - c.fix_ips = fix_ips; -} - -async fn update_hostname( - server: &Server, - resolve_tx: &Sender, - handle: ClientHandle, - hostname: Option, -) { - let hostname = { - let mut client_manager = server.client_manager.borrow_mut(); - let Some((c, s)) = client_manager.get_mut(handle) else { - return; - }; - - // update hostname - if c.hostname != hostname { - c.hostname = hostname; - s.ips = HashSet::from_iter(c.fix_ips.iter().cloned()); - s.active_addr = None; - c.hostname.clone() - } else { - None - } - }; - - // resolve to update ips in state - if let Some(hostname) = hostname { - let _ = resolve_tx.send(DnsRequest { hostname, handle }).await; - } -} - -async fn update_port(server: &Server, handle: ClientHandle, port: u16) { - let mut client_manager = server.client_manager.borrow_mut(); - let Some((c, s)) = client_manager.get_mut(handle) else { - return; - }; - - if c.port != port { - c.port = port; - s.active_addr = s.active_addr.map(|a| SocketAddr::new(a.ip(), port)); - } -} - -async fn update_pos( - server: &Server, - handle: ClientHandle, - capture: &Sender, - emulate: &Sender, - pos: Position, -) { - let (changed, active) = { - let mut client_manager = server.client_manager.borrow_mut(); - let Some((c, s)) = client_manager.get_mut(handle) else { - return; - }; - - let changed = c.pos != pos; - c.pos = pos; - (changed, s.active) - }; - - // update state in event input emulator & input capture - if changed { - if active { - let _ = capture.send(CaptureEvent::Destroy(handle)).await; - let _ = emulate.send(EmulationEvent::Destroy(handle)).await; - } - let _ = capture.send(CaptureEvent::Create(handle, pos.into())).await; - let _ = emulate.send(EmulationEvent::Create(handle)).await; - } -} - -async fn broadcast_client(server: &Server, frontend: &mut FrontendListener, handle: ClientHandle) { - let client = server.client_manager.borrow().get(handle).cloned(); - if let Some((config, state)) = client { - broadcast(frontend, FrontendEvent::State(handle, config, state)).await; - } else { - broadcast(frontend, FrontendEvent::NoSuchClient(handle)).await; - } -} diff --git a/src/server/network_task.rs b/src/server/network_task.rs index 8a2023b..78d0c4b 100644 --- a/src/server/network_task.rs +++ b/src/server/network_task.rs @@ -7,7 +7,6 @@ use tokio::{ task::JoinHandle, }; -use crate::frontend::FrontendEvent; use input_event::{Event, ProtocolError}; use super::Server; @@ -16,8 +15,6 @@ pub async fn new( server: Server, udp_recv_tx: Sender>, udp_send_rx: Receiver<(Event, SocketAddr)>, - mut port_rx: Receiver, - frontend_notify_tx: Sender, ) -> io::Result> { // bind the udp socket let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), server.port.get()); @@ -31,42 +28,36 @@ pub async fn new( tokio::select! { _ = udp_receiver => break, /* channel closed */ _ = udp_sender => break, /* channel closed */ - port = port_rx.recv() => match port { - Some(port) => update_port(&server, &frontend_notify_tx, &mut socket, port).await, - _ => continue, - }, + _ = server.notifies.port_changed.notified() => update_port(&server, &mut socket).await, _ = server.cancelled() => break, /* cancellation requested */ } } })) } -async fn update_port( - server: &Server, - frontend_chan: &Sender, - socket: &mut UdpSocket, - port: u16, -) { +async fn update_port(server: &Server, socket: &mut UdpSocket) { + let new_port = server.port.get(); + let current_port = socket.local_addr().expect("socket not bound").port(); + // if port is the same, we dont need to change it - if socket.local_addr().unwrap().port() == port { + if current_port == new_port { return; } - // create new socket - let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), port); - let frontend_event = match UdpSocket::bind(listen_addr).await { + // bind new socket + let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), new_port); + let new_socket = UdpSocket::bind(listen_addr).await; + let err = match new_socket { Ok(new_socket) => { *socket = new_socket; - server.port.replace(port); - FrontendEvent::PortChanged(port, None) - } - Err(e) => { - log::warn!("could not change port: {e}"); - let port = socket.local_addr().unwrap().port(); - FrontendEvent::PortChanged(port, Some(format!("could not change port: {e}"))) + None } + Err(e) => Some(e.to_string()), }; - let _ = frontend_chan.send(frontend_event).await; + + // notify frontend of the actual port + let port = socket.local_addr().expect("socket not bound").port(); + server.notify_port_changed(port, err); } async fn udp_receiver( diff --git a/src/server/resolver_task.rs b/src/server/resolver_task.rs deleted file mode 100644 index 454f3c8..0000000 --- a/src/server/resolver_task.rs +++ /dev/null @@ -1,82 +0,0 @@ -use std::collections::HashSet; - -use tokio::{ - sync::mpsc::{Receiver, Sender}, - task::JoinHandle, -}; - -use crate::{client::ClientHandle, dns::DnsResolver, frontend::FrontendEvent}; - -use super::Server; - -#[derive(Clone)] -pub struct DnsRequest { - pub hostname: String, - pub handle: ClientHandle, -} - -pub fn new( - server: Server, - resolver: DnsResolver, - dns_rx: Receiver, - frontend: Sender, -) -> JoinHandle<()> { - tokio::task::spawn_local(async move { - tokio::select! { - _ = server.cancelled() => {}, - _ = do_dns(&server, resolver, frontend, dns_rx) => {}, - } - }) -} - -async fn do_dns( - server: &Server, - resolver: DnsResolver, - frontend: Sender, - mut dns_rx: Receiver, -) { - loop { - let (host, handle) = match dns_rx.recv().await { - Some(r) => (r.hostname, r.handle), - None => break, - }; - - /* update resolving status */ - if let Some((_, s)) = server.client_manager.borrow_mut().get_mut(handle) { - s.resolving = true; - } - notify_state_change(&frontend, server, handle).await; - - let ips = match resolver.resolve(&host).await { - Ok(ips) => ips, - Err(e) => { - log::warn!("could not resolve host '{host}': {e}"); - vec![] - } - }; - - /* update ips and resolving state */ - if let Some((c, s)) = server.client_manager.borrow_mut().get_mut(handle) { - let mut addrs = HashSet::from_iter(c.fix_ips.iter().cloned()); - for ip in ips { - addrs.insert(ip); - } - s.ips = addrs; - s.resolving = false; - } - notify_state_change(&frontend, server, handle).await; - } -} - -async fn notify_state_change( - frontend: &Sender, - server: &Server, - handle: ClientHandle, -) { - let state = server.client_manager.borrow().get(handle).cloned(); - if let Some((config, state)) = state { - let _ = frontend - .send(FrontendEvent::State(handle, config, state)) - .await; - } -}