diff --git a/src/emulation.rs b/src/emulation.rs index dbdf7f0..ee436e0 100644 --- a/src/emulation.rs +++ b/src/emulation.rs @@ -1,13 +1,14 @@ -use crate::{listen::LanMouseListener, service::Service}; +use crate::listen::{LanMouseListener, ListenerCreationError}; use futures::StreamExt; use input_emulation::{EmulationHandle, InputEmulation, InputEmulationError}; use input_event::Event; -use lan_mouse_ipc::Status; use lan_mouse_proto::{Position, ProtoEvent}; use local_channel::mpsc::{channel, Receiver, Sender}; use std::{ + cell::Cell, collections::HashMap, net::SocketAddr, + rc::Rc, time::{Duration, Instant}, }; use tokio::{ @@ -18,26 +19,82 @@ use tokio::{ /// emulation handling events received from a listener pub(crate) struct Emulation { task: JoinHandle<()>, - release_tx: Sender, + request_tx: Sender, + event_rx: Receiver, +} + +pub(crate) enum EmulationEvent { + /// new connection + Connected { + /// address of the connection + addr: SocketAddr, + /// position of the connection + pos: lan_mouse_ipc::Position, + /// certificate fingerprint of the connection + fingerprint: String, + }, + /// connection closed + Disconnected { addr: SocketAddr }, + /// the port of the listener has changed + PortChanged(Result), + /// emulation was disabled + EmulationDisabled, + /// emulation was enabled + EmulationEnabled, + /// capture should be released + ReleaseNotify, +} + +enum EmulationRequest { + Reenable, + Release(SocketAddr), + ChangePort(u16), + Terminate, } impl Emulation { - pub(crate) fn new(server: Service, listener: LanMouseListener) -> Self { - let emulation_proxy = EmulationProxy::new(server.clone()); - let (release_tx, release_rx) = channel(); - let task = spawn_local(Self::run(server, listener, emulation_proxy, release_rx)); - Self { task, release_tx } + pub(crate) fn new( + backend: Option, + listener: LanMouseListener, + ) -> Self { + let emulation_proxy = EmulationProxy::new(backend); + let (request_tx, request_rx) = channel(); + let (event_tx, event_rx) = channel(); + let task = spawn_local(Self::run(listener, emulation_proxy, request_rx, event_tx)); + Self { + task, + request_tx, + event_rx, + } } - pub(crate) fn notify_release(&self, addr: SocketAddr) { - self.release_tx.send(addr).expect("channel closed"); + pub(crate) fn send_leave_event(&self, addr: SocketAddr) { + self.request_tx + .send(EmulationRequest::Release(addr)) + .expect("channel closed"); + } + + pub(crate) fn reenable(&self) { + self.request_tx + .send(EmulationRequest::Reenable) + .expect("channel closed"); + } + + pub(crate) fn request_port_change(&self, port: u16) { + self.request_tx + .send(EmulationRequest::ChangePort(port)) + .expect("channel closed") + } + + pub(crate) async fn event(&mut self) -> EmulationEvent { + self.event_rx.recv().await.expect("channel closed") } async fn run( - service: Service, mut listener: LanMouseListener, mut emulation_proxy: EmulationProxy, - mut release_rx: Receiver, + mut request_rx: Receiver, + event_tx: Sender, ) { let mut interval = tokio::time::interval(Duration::from_secs(5)); let mut last_response = HashMap::new(); @@ -52,11 +109,11 @@ impl Emulation { last_response.insert(addr, Instant::now()); match event { ProtoEvent::Enter(pos) => { - if let Some(cert) = listener.get_certificate_fingerprint(addr).await { + if let Some(fingerprint) = listener.get_certificate_fingerprint(addr).await { log::info!("releasing capture: {addr} entered this device"); - service.release_capture(); + event_tx.send(EmulationEvent::ReleaseNotify).expect("channel closed"); listener.reply(addr, ProtoEvent::Ack(0)).await; - service.register_incoming(addr, to_ipc_pos(pos), cert); + event_tx.send(EmulationEvent::Connected{addr, pos: to_ipc_pos(pos), fingerprint}).expect("channel closed"); } } ProtoEvent::Leave(_) => { @@ -64,28 +121,37 @@ impl Emulation { listener.reply(addr, ProtoEvent::Ack(0)).await; } ProtoEvent::Input(event) => emulation_proxy.consume(event, addr), - ProtoEvent::Ping => listener.reply(addr, ProtoEvent::Pong(service.emulation_status.get() == Status::Enabled)).await, + ProtoEvent::Ping => listener.reply(addr, ProtoEvent::Pong(emulation_proxy.emulation_active.get())).await, _ => {} } } - addr = release_rx.recv() => { - // notify the other end that we hit a barrier (should release capture) - let addr = addr.expect("channel closed"); - listener.reply(addr, ProtoEvent::Leave(0)).await; + event = emulation_proxy.event() => { + event_tx.send(event).expect("channel closed"); } + request = request_rx.recv() => match request.expect("channel closed") { + // reenable emulation + EmulationRequest::Reenable => emulation_proxy.reenable(), + // notify the other end that we hit a barrier (should release capture) + EmulationRequest::Release(addr) => listener.reply(addr, ProtoEvent::Leave(0)).await, + EmulationRequest::ChangePort(port) => { + listener.request_port_change(port); + let result = listener.port_changed().await; + event_tx.send(EmulationEvent::PortChanged(result)).expect("channel closed"); + } + EmulationRequest::Terminate => break, + }, _ = interval.tick() => { last_response.retain(|&addr,instant| { if instant.elapsed() > Duration::from_secs(5) { log::warn!("releasing keys: {addr} not responding!"); emulation_proxy.release_keys(addr); - service.deregister_incoming(addr); + event_tx.send(EmulationEvent::Disconnected { addr }).expect("channel closed"); false } else { true } }); } - _ = service.cancelled() => break, } } listener.terminate().await; @@ -95,6 +161,9 @@ impl Emulation { /// wait for termination pub(crate) async fn terminate(&mut self) { log::debug!("terminating emulation"); + self.request_tx + .send(EmulationRequest::Terminate) + .expect("channel closed"); if let Err(e) = (&mut self.task).await { log::warn!("{e}"); } @@ -104,111 +173,166 @@ impl Emulation { /// proxy handling the actual input emulation, /// discarding events when it is disabled pub(crate) struct EmulationProxy { - server: Service, - tx: Sender<(ProxyEvent, SocketAddr)>, + emulation_active: Rc>, + request_tx: Sender, + event_rx: Receiver, task: JoinHandle<()>, } -enum ProxyEvent { - Input(Event), - ReleaseKeys, +enum ProxyRequest { + Input(Event, SocketAddr), + ReleaseKeys(SocketAddr), + Terminate, + Reenable, } impl EmulationProxy { - fn new(server: Service) -> Self { - let (tx, rx) = channel(); - let task = spawn_local(Self::emulation_task(server.clone(), rx)); - Self { server, tx, task } + fn new(backend: Option) -> Self { + let (request_tx, request_rx) = channel(); + let (event_tx, event_rx) = channel(); + let emulation_active = Rc::new(Cell::new(false)); + let task = spawn_local(Self::emulation_task(backend, request_rx, event_tx)); + Self { + emulation_active, + request_tx, + task, + event_rx, + } + } + + async fn event(&mut self) -> EmulationEvent { + let event = self.event_rx.recv().await.expect("channel closed"); + if let EmulationEvent::EmulationEnabled = event { + self.emulation_active.replace(true); + } + if let EmulationEvent::EmulationDisabled = event { + self.emulation_active.replace(false); + } + event } fn consume(&self, event: Event, addr: SocketAddr) { // ignore events if emulation is currently disabled - if let Status::Enabled = self.server.emulation_status.get() { - self.tx - .send((ProxyEvent::Input(event), addr)) + if self.emulation_active.get() { + self.request_tx + .send(ProxyRequest::Input(event, addr)) .expect("channel closed"); } } fn release_keys(&self, addr: SocketAddr) { - self.tx - .send((ProxyEvent::ReleaseKeys, addr)) + self.request_tx + .send(ProxyRequest::ReleaseKeys(addr)) .expect("channel closed"); } - async fn emulation_task(server: Service, mut rx: Receiver<(ProxyEvent, SocketAddr)>) { + async fn emulation_task( + backend: Option, + mut request_rx: Receiver, + event_tx: Sender, + ) { let mut handles = HashMap::new(); let mut next_id = 0; loop { - if let Err(e) = Self::do_emulation(&server, &mut handles, &mut next_id, &mut rx).await { + if let Err(e) = Self::do_emulation( + backend, + &mut handles, + &mut next_id, + &mut request_rx, + &event_tx, + ) + .await + { log::warn!("input emulation exited: {e}"); } - tokio::select! { - _ = server.emulation_notified() => {}, - _ = server.cancelled() => return, + // wait for reenable request + loop { + match request_rx.recv().await.expect("channel closed") { + ProxyRequest::Reenable => break, + ProxyRequest::Terminate => return, + ProxyRequest::Input(..) => { /* emulation inactive => ignore */ } + ProxyRequest::ReleaseKeys(..) => { /* emulation inactive => ignore */ } + } } } } async fn do_emulation( - server: &Service, + backend: Option, handles: &mut HashMap, next_id: &mut EmulationHandle, - rx: &mut Receiver<(ProxyEvent, SocketAddr)>, + request_rx: &mut Receiver, + event_tx: &Sender, ) -> Result<(), InputEmulationError> { - let backend = server.config.emulation_backend.map(|b| b.into()); log::info!("creating input emulation ..."); let mut emulation = tokio::select! { r = InputEmulation::new(backend) => r?, - _ = server.cancelled() => return Ok(()), + // allow termination event while requesting input emulation + _ = wait_for_termination(request_rx) => return Ok(()), }; - server.set_emulation_status(Status::Enabled); + event_tx + .send(EmulationEvent::EmulationEnabled) + .expect("channel closed"); // create active handles for &handle in handles.values() { emulation.create(handle).await; } - let res = Self::do_emulation_session(server, &mut emulation, handles, next_id, rx).await; + let res = Self::do_emulation_session(&mut emulation, handles, next_id, request_rx).await; // FIXME replace with async drop when stabilized emulation.terminate().await; - server.set_emulation_status(Status::Disabled); + event_tx + .send(EmulationEvent::EmulationDisabled) + .expect("channel closed"); res } async fn do_emulation_session( - server: &Service, emulation: &mut InputEmulation, handles: &mut HashMap, next_id: &mut EmulationHandle, - rx: &mut Receiver<(ProxyEvent, SocketAddr)>, + rx: &mut Receiver, ) -> Result<(), InputEmulationError> { loop { tokio::select! { - e = rx.recv() => { - let (event, addr) = e.expect("channel closed"); - let handle = match handles.get(&addr) { - Some(&handle) => handle, - None => { - let handle = *next_id; - *next_id += 1; - emulation.create(handle).await; - handles.insert(addr, handle); - handle + e = rx.recv() => match e.expect("channel closed") { + ProxyRequest::Input(event, addr) => { + let handle = match handles.get(&addr) { + Some(&handle) => handle, + None => { + let handle = *next_id; + *next_id += 1; + emulation.create(handle).await; + handles.insert(addr, handle); + handle + } + }; + emulation.consume(event, handle).await?; + }, + ProxyRequest::ReleaseKeys(addr) => { + if let Some(&handle) = handles.get(&addr) { + emulation.release_keys(handle).await? } - }; - match event { - ProxyEvent::Input(event) => emulation.consume(event, handle).await?, - ProxyEvent::ReleaseKeys => emulation.release_keys(handle).await?, } - } - _ = server.cancelled() => break Ok(()), + ProxyRequest::Terminate => break Ok(()), + ProxyRequest::Reenable => continue, + }, } } } + fn reenable(&self) { + self.request_tx + .send(ProxyRequest::Reenable) + .expect("channel closed"); + } + async fn terminate(&mut self) { + self.request_tx + .send(ProxyRequest::Terminate) + .expect("channel closed"); let _ = (&mut self.task).await; } } @@ -221,3 +345,14 @@ fn to_ipc_pos(pos: Position) -> lan_mouse_ipc::Position { Position::Bottom => lan_mouse_ipc::Position::Bottom, } } + +async fn wait_for_termination(rx: &mut Receiver) { + loop { + match rx.recv().await.expect("channel closed") { + ProxyRequest::Terminate => return, + ProxyRequest::Input(_, _) => continue, + ProxyRequest::ReleaseKeys(_) => continue, + ProxyRequest::Reenable => continue, + } + } +} diff --git a/src/listen.rs b/src/listen.rs index 9a288d3..b80b2ea 100644 --- a/src/listen.rs +++ b/src/listen.rs @@ -39,6 +39,8 @@ pub(crate) struct LanMouseListener { listen_tx: Sender<(ProtoEvent, SocketAddr)>, listen_task: JoinHandle<()>, conns: Rc>>, + request_port_change: Sender, + port_changed: Receiver>, } type VerifyPeerCertificateFn = Arc< @@ -54,8 +56,10 @@ impl LanMouseListener { authorized_keys: Arc>>, ) -> Result { let (listen_tx, listen_rx) = channel(); + let (request_port_change, mut request_port_change_rx) = channel(); + let (port_changed_tx, port_changed) = channel(); - let listen_addr = SocketAddr::new("0.0.0.0".parse().expect("invalid ip"), port); + let authorized = authorized_keys.clone(); let verify_peer_certificate: Option = Some(Arc::new( move |certs: &[Vec], _chains: &[CertificateDer<'static>]| { assert!(certs.len() == 1); @@ -63,7 +67,7 @@ impl LanMouseListener { .iter() .map(|c| crypto::generate_fingerprint(c)) .collect::>(); - if authorized_keys + if authorized .read() .expect("lock") .contains_key(&fingerprints[0]) @@ -75,37 +79,51 @@ impl LanMouseListener { }, )); let cfg = Config { - certificates: vec![cert], + certificates: vec![cert.clone()], extended_master_secret: ExtendedMasterSecretType::Require, client_auth: RequireAnyClientCert, verify_peer_certificate, ..Default::default() }; - let listener = listen(listen_addr, cfg).await?; + let listen_addr = SocketAddr::new("0.0.0.0".parse().expect("invalid ip"), port); + let mut listener = listen(listen_addr, cfg.clone()).await?; let conns: Rc>> = Rc::new(Mutex::new(Vec::new())); let conns_clone = conns.clone(); - let tx = listen_tx.clone(); let listen_task: JoinHandle<()> = spawn_local(async move { loop { let sleep = tokio::time::sleep(Duration::from_secs(2)); - let (conn, addr) = tokio::select! { + tokio::select! { + /* workaround for https://github.com/webrtc-rs/webrtc/issues/614 */ _ = sleep => continue, c = listener.accept() => match c { - Ok(c) => c, - Err(e) => { - log::warn!("accept: {e}"); - continue; - } + Ok((conn, addr)) => { + log::info!("dtls client connected, ip: {addr}"); + let mut conns = conns_clone.lock().await; + conns.push((addr, conn.clone())); + spawn_local(read_loop(conns_clone.clone(), addr, conn, tx.clone())); + }, + Err(e) => log::warn!("accept: {e}"), + }, + port = request_port_change_rx.recv() => { + let port = port.expect("channel closed"); + let listen_addr = SocketAddr::new("0.0.0.0".parse().expect("invalid ip"), port); + match listen(listen_addr, cfg.clone()).await { + Ok(new_listener) => { + let _ = listener.close().await; + listener = new_listener; + port_changed_tx.send(Ok(port)).expect("channel closed"); + } + Err(e) => { + log::warn!("unable to change port: {e}"); + port_changed_tx.send(Err(e.into())).expect("channel closed"); + } + }; }, }; - log::info!("dtls client connected, ip: {addr}"); - let mut conns = conns_clone.lock().await; - conns.push((addr, conn.clone())); - spawn_local(read_loop(conns_clone.clone(), addr, conn, tx.clone())); } }); @@ -114,9 +132,19 @@ impl LanMouseListener { listen_rx, listen_tx, listen_task, + port_changed, + request_port_change, }) } + pub(crate) fn request_port_change(&mut self, port: u16) { + self.request_port_change.send(port).expect("channel closed"); + } + + pub(crate) async fn port_changed(&mut self) -> Result { + self.port_changed.recv().await.expect("channel closed") + } + pub(crate) async fn terminate(&mut self) { self.listen_task.abort(); let conns = self.conns.lock().await; @@ -126,15 +154,6 @@ impl LanMouseListener { self.listen_tx.close(); } - #[allow(unused)] - pub(crate) async fn broadcast(&self, event: ProtoEvent) { - let (buf, len): ([u8; MAX_EVENT_SIZE], usize) = event.into(); - let conns = self.conns.lock().await; - for (_, conn) in conns.iter() { - let _ = conn.send(&buf[..len]).await; - } - } - pub(crate) async fn reply(&self, addr: SocketAddr, event: ProtoEvent) { log::trace!("reply {event} >=>=>=>=>=> {addr}"); let (buf, len): ([u8; MAX_EVENT_SIZE], usize) = event.into(); diff --git a/src/service.rs b/src/service.rs index 90803c0..930fbbe 100644 --- a/src/service.rs +++ b/src/service.rs @@ -5,7 +5,7 @@ use crate::{ connect::LanMouseConnection, crypto, dns::DnsResolver, - emulation::Emulation, + emulation::{Emulation, EmulationEvent}, listen::{LanMouseListener, ListenerCreationError}, }; use futures::StreamExt; @@ -85,13 +85,10 @@ pub struct Service { #[derive(Default)] struct Notifies { - capture: Notify, - emulation: Notify, + reenable_capture: Notify, incoming: Notify, - port_changed: Notify, frontend_event_pending: Notify, cancel: CancellationToken, - release: Notify, } impl Service { @@ -116,11 +113,6 @@ impl Service { client_manager.set_state(handle, state); } - // task notification tokens - let notifies = Rc::new(Notifies::default()); - - let config = Rc::new(config); - // load certificate let cert = crypto::load_or_generate_key_and_cert(&config.cert_path)?; let public_key_fingerprint = crypto::certificate_fingerprint(&cert); @@ -130,17 +122,18 @@ impl Service { authorized_keys: Arc::new(RwLock::new(config.authorized_fingerprints.clone())), cert, public_key_fingerprint, - config, + config: Rc::new(config), client_manager, pending_incoming: Default::default(), port, - notifies, + notifies: Default::default(), pending_frontend_events: Rc::new(RefCell::new(VecDeque::new())), capture_status: Default::default(), emulation_status: Default::default(), incoming_conn_info: Default::default(), incoming_conns: Default::default(), next_trigger_handle: 0, + requested_port: Default::default(), }; Ok(service) } @@ -160,7 +153,8 @@ impl Service { // input capture + emulation let mut capture = Capture::new(self.clone(), conn); - let mut emulation = Emulation::new(self.clone(), listener); + let emulation_backend = self.config.emulation_backend.map(|b| b.into()); + let mut emulation = Emulation::new(emulation_backend, listener); // create dns resolver let resolver = DnsResolver::new(self.clone())?; @@ -181,7 +175,7 @@ impl Service { None => break, }; log::debug!("received frontend request: {request:?}"); - self.handle_request(&capture, request, &resolver); + self.handle_request(&capture, &emulation, request, &resolver); log::debug!("handled frontend request"); } _ = self.notifies.frontend_event_pending.notified() => { @@ -214,15 +208,34 @@ impl Service { } } }, - _ = self.notifies.release.notified() => { - self.set_active(None); - capture.release(); - } + event = emulation.event() => match event { + EmulationEvent::Connected { addr, pos, fingerprint } => self.register_incoming(addr, pos, fingerprint), + EmulationEvent::Disconnected { addr } => self.deregister_incoming(addr), + EmulationEvent::PortChanged(port) => match port { + Ok(port) => { + self.port.replace(port); + self.notify_frontend(FrontendEvent::PortChanged(port, None)); + }, + Err(e) => self.notify_frontend(FrontendEvent::PortChanged(self.port.get(), Some(format!("{e}")))), + } + EmulationEvent::EmulationDisabled => { + self.emulation_status.replace(Status::Disabled); + self.notify_frontend(FrontendEvent::EmulationStatus(Status::Disabled)); + }, + EmulationEvent::EmulationEnabled => { + self.emulation_status.replace(Status::Enabled); + self.notify_frontend(FrontendEvent::EmulationStatus(Status::Enabled)); + }, + EmulationEvent::ReleaseNotify => { + self.set_active(None); + capture.release(); + } + }, handle = capture.entered() => { // we entered the capture zone for an incoming connection // => notify it that its capture should be released if let Some(incoming) = self.incoming_conn_info.borrow().get(&handle) { - emulation.notify_release(incoming.addr); + emulation.send_leave_event(incoming.addr); } }, _ = self.cancelled() => break, @@ -300,44 +313,30 @@ impl Service { self.notifies.cancel.cancelled().await } - fn notify_capture(&self) { + fn request_capture_reenable(&self) { log::info!("received capture enable request"); - self.notifies.capture.notify_waiters() + self.notifies.reenable_capture.notify_waiters() } pub(crate) async fn capture_enabled(&self) { - self.notifies.capture.notified().await - } - - fn notify_emulation(&self) { - log::info!("received emulation enable request"); - self.notifies.emulation.notify_waiters() - } - - pub(crate) async fn emulation_notified(&self) { - self.notifies.emulation.notified().await - } - - fn request_port_change(&self, port: u16) { - self.port.replace(port); - self.notifies.port_changed.notify_one(); - } - - #[allow(unused)] - fn notify_port_changed(&self, port: u16, msg: Option) { - self.port.replace(port); - self.notify_frontend(FrontendEvent::PortChanged(port, msg)); + self.notifies.reenable_capture.notified().await } pub(crate) fn client_updated(&self, handle: ClientHandle) { self.notify_frontend(FrontendEvent::Changed(handle)); } - fn handle_request(&self, capture: &Capture, event: FrontendRequest, dns: &DnsResolver) -> bool { + fn handle_request( + &self, + capture: &Capture, + emulation: &Emulation, + event: FrontendRequest, + dns: &DnsResolver, + ) -> bool { log::debug!("frontend: {event:?}"); match event { - FrontendRequest::EnableCapture => self.notify_capture(), - FrontendRequest::EnableEmulation => self.notify_emulation(), + FrontendRequest::EnableCapture => self.request_capture_reenable(), + FrontendRequest::EnableEmulation => emulation.reenable(), FrontendRequest::Create => { self.add_client(); } @@ -348,7 +347,7 @@ impl Service { self.deactivate_client(capture, handle); } } - FrontendRequest::ChangePort(port) => self.request_port_change(port), + FrontendRequest::ChangePort(port) => emulation.request_port_change(port), FrontendRequest::Delete(handle) => { self.remove_client(capture, handle); self.notify_frontend(FrontendEvent::Deleted(handle)); @@ -492,12 +491,6 @@ impl Service { self.notify_frontend(event); } - pub(crate) fn set_emulation_status(&self, status: Status) { - self.emulation_status.replace(status); - let status = FrontendEvent::EmulationStatus(status); - self.notify_frontend(status); - } - pub(crate) fn set_capture_status(&self, status: Status) { self.capture_status.replace(status); let status = FrontendEvent::CaptureStatus(status); @@ -509,10 +502,6 @@ impl Service { self.client_updated(handle); } - pub(crate) fn release_capture(&self) { - self.notifies.release.notify_one(); - } - pub(crate) fn set_active(&self, handle: Option) { self.active.replace(handle); }