From 095937e94308a5ec2b2c62bf7ffa32ccbdd43598 Mon Sep 17 00:00:00 2001 From: Ferdinand Schober Date: Mon, 28 Oct 2024 17:44:21 +0100 Subject: [PATCH] fix service exit --- src/capture.rs | 75 ++++++++++++++++++--------- src/emulation.rs | 15 +++++- src/service.rs | 130 +++++++++++++++++++++-------------------------- 3 files changed, 124 insertions(+), 96 deletions(-) diff --git a/src/capture.rs b/src/capture.rs index 6ecfb3e..f3367d9 100644 --- a/src/capture.rs +++ b/src/capture.rs @@ -19,8 +19,9 @@ use tokio::{ use crate::{connect::LanMouseConnection, service::Service}; pub(crate) struct Capture { + exit_requested: Rc>, _active: Rc>>, - tx: Sender, + request_tx: Sender, task: JoinHandle<()>, event_rx: Receiver, } @@ -54,33 +55,37 @@ impl Capture { conn: LanMouseConnection, service: Service, ) -> Self { - let (tx, rx) = channel(); + let (request_tx, request_rx) = channel(); let (event_tx, event_rx) = channel(); let active = Rc::new(Cell::new(None)); + let exit_requested = Rc::new(Cell::new(false)); let task = spawn_local(Self::run( active.clone(), + exit_requested.clone(), service, backend, - rx, + request_rx, conn, event_tx, )); Self { _active: active, - tx, + exit_requested, + request_tx, task, event_rx, } } pub(crate) fn reenable(&self) { - self.tx + self.request_tx .send(CaptureRequest::Reenable) .expect("channel closed"); } pub(crate) async fn terminate(&mut self) { - self.tx + self.exit_requested.replace(true); + self.request_tx .send(CaptureRequest::Terminate) .expect("channel closed"); log::debug!("terminating capture"); @@ -90,19 +95,19 @@ impl Capture { } pub(crate) fn create(&self, handle: CaptureHandle, pos: lan_mouse_ipc::Position) { - self.tx + self.request_tx .send(CaptureRequest::Create(handle, to_capture_pos(pos))) .expect("channel closed"); } pub(crate) fn destroy(&self, handle: CaptureHandle) { - self.tx + self.request_tx .send(CaptureRequest::Destroy(handle)) .expect("channel closed"); } pub(crate) fn release(&self) { - self.tx + self.request_tx .send(CaptureRequest::Release) .expect("channel closed"); } @@ -113,9 +118,10 @@ impl Capture { async fn run( active: Rc>>, + exit_requested: Rc>, service: Service, backend: Option, - mut rx: Receiver, + mut request_rx: Receiver, mut conn: LanMouseConnection, mut event_tx: Sender, ) { @@ -125,18 +131,18 @@ impl Capture { &service, backend, &mut conn, - &mut rx, + &mut request_rx, &mut event_tx, ) .await { log::warn!("input capture exited: {e}"); } - event_tx - .send(ICaptureEvent::CaptureDisabled) - .expect("channel closed"); + if exit_requested.get() { + break; + } loop { - match rx.recv().await.expect("channel closed") { + match request_rx.recv().await.expect("channel closed") { CaptureRequest::Reenable => break, CaptureRequest::Terminate => return, _ => {} @@ -151,13 +157,13 @@ async fn do_capture( service: &Service, backend: Option, conn: &mut LanMouseConnection, - rx: &mut Receiver, + request_rx: &mut Receiver, event_tx: &mut Sender, ) -> Result<(), InputCaptureError> { /* allow cancelling capture request */ let mut capture = tokio::select! { r = InputCapture::new(backend) => r?, - _ = wait_for_termination(rx) => return Ok(()), + _ = wait_for_termination(request_rx) => return Ok(()), }; event_tx .send(ICaptureEvent::CaptureEnabled) @@ -179,12 +185,35 @@ async fn do_capture( capture.create(handle, to_capture_pos(pos)).await?; } + let res = do_capture_session(active, &mut capture, conn, event_tx, request_rx, service).await; + // FIXME replace with async drop when stabilized + let res1 = capture.terminate().await; + + // handle errors + res?; + res1?; + + event_tx + .send(ICaptureEvent::CaptureDisabled) + .expect("channel closed"); + + Ok(()) +} + +async fn do_capture_session( + active: &Cell>, + capture: &mut InputCapture, + conn: &mut LanMouseConnection, + event_tx: &Sender, + request_rx: &mut Receiver, + service: &Service, +) -> Result<(), InputCaptureError> { let mut state = State::WaitingForAck; loop { tokio::select! { event = capture.next() => match event { - Some(event) => handle_capture_event(active, &service, &mut capture, conn, event?, &mut state, event_tx).await?, + Some(event) => handle_capture_event(active, service, capture, conn, event?, &mut state, event_tx).await?, None => return Ok(()), }, (handle, event) = conn.recv() => { @@ -205,22 +234,20 @@ async fn do_capture( // client disconnected ProtoEvent::Leave(_) => { log::info!("releasing capture: left remote client device region"); - release_capture(&mut capture, &active).await?; + release_capture(capture, active).await?; }, _ => {} } }, - e = rx.recv() => match e.expect("channel closed") { + e = request_rx.recv() => match e.expect("channel closed") { CaptureRequest::Reenable => { /* already active */ }, - CaptureRequest::Release => release_capture(&mut capture, &active).await?, + CaptureRequest::Release => release_capture(capture, active).await?, CaptureRequest::Create(h, p) => capture.create(h, p).await?, CaptureRequest::Destroy(h) => capture.destroy(h).await?, CaptureRequest::Terminate => break, } } } - - capture.terminate().await?; Ok(()) } @@ -258,7 +285,7 @@ async fn handle_capture_event( conn: &LanMouseConnection, event: (CaptureHandle, CaptureEvent), state: &mut State, - event_tx: &mut Sender, + event_tx: &Sender, ) -> Result<(), CaptureError> { let (handle, event) = event; log::trace!("({handle}): {event:?}"); diff --git a/src/emulation.rs b/src/emulation.rs index ee436e0..d1bc942 100644 --- a/src/emulation.rs +++ b/src/emulation.rs @@ -174,6 +174,7 @@ impl Emulation { /// discarding events when it is disabled pub(crate) struct EmulationProxy { emulation_active: Rc>, + exit_requested: Rc>, request_tx: Sender, event_rx: Receiver, task: JoinHandle<()>, @@ -191,9 +192,16 @@ impl EmulationProxy { let (request_tx, request_rx) = channel(); let (event_tx, event_rx) = channel(); let emulation_active = Rc::new(Cell::new(false)); - let task = spawn_local(Self::emulation_task(backend, request_rx, event_tx)); + let exit_requested = Rc::new(Cell::new(false)); + let task = spawn_local(Self::emulation_task( + backend, + exit_requested.clone(), + request_rx, + event_tx, + )); Self { emulation_active, + exit_requested, request_tx, task, event_rx, @@ -228,6 +236,7 @@ impl EmulationProxy { async fn emulation_task( backend: Option, + exit_requested: Rc>, mut request_rx: Receiver, event_tx: Sender, ) { @@ -245,6 +254,9 @@ impl EmulationProxy { { log::warn!("input emulation exited: {e}"); } + if exit_requested.get() { + break; + } // wait for reenable request loop { match request_rx.recv().await.expect("channel closed") { @@ -330,6 +342,7 @@ impl EmulationProxy { } async fn terminate(&mut self) { + self.exit_requested.replace(true); self.request_tx .send(ProxyRequest::Terminate) .expect("channel closed"); diff --git a/src/service.rs b/src/service.rs index 08e4d8a..dd2bd67 100644 --- a/src/service.rs +++ b/src/service.rs @@ -171,9 +171,61 @@ impl Service { } None => break, }; - log::debug!("received frontend request: {request:?}"); - self.handle_request(&capture, &emulation, request, &resolver); - log::debug!("handled frontend request"); + match request { + FrontendRequest::EnableCapture => capture.reenable(), + FrontendRequest::EnableEmulation => emulation.reenable(), + FrontendRequest::Create => { + self.add_client(); + } + FrontendRequest::Activate(handle, active) => { + if active { + if let Some(hostname) = self.client_manager.get_hostname(handle) { + resolver.resolve(handle, hostname); + } + self.activate_client(&capture, handle); + } else { + self.deactivate_client(&capture, handle); + } + } + FrontendRequest::ChangePort(port) => emulation.request_port_change(port), + FrontendRequest::Delete(handle) => { + self.remove_client(&capture, handle); + self.notify_frontend(FrontendEvent::Deleted(handle)); + } + FrontendRequest::Enumerate() => self.enumerate(), + FrontendRequest::GetState(handle) => self.broadcast_client(handle), + FrontendRequest::UpdateFixIps(handle, fix_ips) => self.update_fix_ips(handle, fix_ips), + FrontendRequest::UpdateHostname(handle, host) => { + self.update_hostname(handle, host, &resolver) + } + FrontendRequest::UpdatePort(handle, port) => self.update_port(handle, port), + FrontendRequest::UpdatePosition(handle, pos) => { + self.update_pos(handle, &capture, pos); + } + FrontendRequest::ResolveDns(handle) => { + if let Some(hostname) = self.client_manager.get_hostname(handle) { + resolver.resolve(handle, hostname); + } + } + FrontendRequest::Sync => { + self.enumerate(); + self.notify_frontend(FrontendEvent::EmulationStatus(self.emulation_status.get())); + self.notify_frontend(FrontendEvent::CaptureStatus(self.capture_status.get())); + self.notify_frontend(FrontendEvent::PortChanged(self.port.get(), None)); + self.notify_frontend(FrontendEvent::PublicKeyFingerprint( + self.public_key_fingerprint.clone(), + )); + self.notify_frontend(FrontendEvent::AuthorizedUpdated( + self.authorized_keys.read().expect("lock").clone(), + )); + } + FrontendRequest::AuthorizeKey(desc, fp) => { + self.add_authorized_key(desc, fp); + } + FrontendRequest::RemoveAuthorizedKey(key) => { + self.remove_authorized_key(key); + } + } } _ = self.notifies.frontend_event_pending.notified() => { while let Some(event) = { @@ -262,10 +314,12 @@ impl Service { } } - log::info!("terminating service"); - + log::info!("terminating service ..."); + log::info!("terminating capture ..."); capture.terminate().await; + log::info!("terminating emulation ..."); emulation.terminate().await; + log::info!("terminating dns resolver ..."); resolver.terminate().await; Ok(()) @@ -324,72 +378,6 @@ impl Service { self.notify_frontend(FrontendEvent::Changed(handle)); } - fn handle_request( - &self, - capture: &Capture, - emulation: &Emulation, - event: FrontendRequest, - dns: &DnsResolver, - ) -> bool { - log::debug!("frontend: {event:?}"); - match event { - FrontendRequest::EnableCapture => capture.reenable(), - FrontendRequest::EnableEmulation => emulation.reenable(), - FrontendRequest::Create => { - self.add_client(); - } - FrontendRequest::Activate(handle, active) => { - if active { - if let Some(hostname) = self.client_manager.get_hostname(handle) { - dns.resolve(handle, hostname); - } - self.activate_client(capture, handle); - } else { - self.deactivate_client(capture, handle); - } - } - FrontendRequest::ChangePort(port) => emulation.request_port_change(port), - FrontendRequest::Delete(handle) => { - self.remove_client(capture, handle); - self.notify_frontend(FrontendEvent::Deleted(handle)); - } - FrontendRequest::Enumerate() => self.enumerate(), - FrontendRequest::GetState(handle) => self.broadcast_client(handle), - FrontendRequest::UpdateFixIps(handle, fix_ips) => self.update_fix_ips(handle, fix_ips), - FrontendRequest::UpdateHostname(handle, host) => { - self.update_hostname(handle, host, dns) - } - FrontendRequest::UpdatePort(handle, port) => self.update_port(handle, port), - FrontendRequest::UpdatePosition(handle, pos) => { - self.update_pos(handle, capture, pos); - } - FrontendRequest::ResolveDns(handle) => { - if let Some(hostname) = self.client_manager.get_hostname(handle) { - dns.resolve(handle, hostname); - } - } - FrontendRequest::Sync => { - self.enumerate(); - self.notify_frontend(FrontendEvent::EmulationStatus(self.emulation_status.get())); - self.notify_frontend(FrontendEvent::CaptureStatus(self.capture_status.get())); - self.notify_frontend(FrontendEvent::PortChanged(self.port.get(), None)); - self.notify_frontend(FrontendEvent::PublicKeyFingerprint( - self.public_key_fingerprint.clone(), - )); - self.notify_frontend(FrontendEvent::AuthorizedUpdated( - self.authorized_keys.read().expect("lock").clone(), - )); - } - FrontendRequest::AuthorizeKey(desc, fp) => { - self.add_authorized_key(desc, fp); - } - FrontendRequest::RemoveAuthorizedKey(key) => { - self.remove_authorized_key(key); - } - }; - false - } - fn add_authorized_key(&self, desc: String, fp: String) { self.authorized_keys.write().expect("lock").insert(fp, desc); self.notify_frontend(FrontendEvent::AuthorizedUpdated(