fix service exit

This commit is contained in:
Ferdinand Schober
2024-10-28 17:44:21 +01:00
parent 5461c6a00e
commit 095937e943
3 changed files with 124 additions and 96 deletions

View File

@@ -19,8 +19,9 @@ use tokio::{
use crate::{connect::LanMouseConnection, service::Service};
pub(crate) struct Capture {
exit_requested: Rc<Cell<bool>>,
_active: Rc<Cell<Option<CaptureHandle>>>,
tx: Sender<CaptureRequest>,
request_tx: Sender<CaptureRequest>,
task: JoinHandle<()>,
event_rx: Receiver<ICaptureEvent>,
}
@@ -54,33 +55,37 @@ impl Capture {
conn: LanMouseConnection,
service: Service,
) -> Self {
let (tx, rx) = channel();
let (request_tx, request_rx) = channel();
let (event_tx, event_rx) = channel();
let active = Rc::new(Cell::new(None));
let exit_requested = Rc::new(Cell::new(false));
let task = spawn_local(Self::run(
active.clone(),
exit_requested.clone(),
service,
backend,
rx,
request_rx,
conn,
event_tx,
));
Self {
_active: active,
tx,
exit_requested,
request_tx,
task,
event_rx,
}
}
pub(crate) fn reenable(&self) {
self.tx
self.request_tx
.send(CaptureRequest::Reenable)
.expect("channel closed");
}
pub(crate) async fn terminate(&mut self) {
self.tx
self.exit_requested.replace(true);
self.request_tx
.send(CaptureRequest::Terminate)
.expect("channel closed");
log::debug!("terminating capture");
@@ -90,19 +95,19 @@ impl Capture {
}
pub(crate) fn create(&self, handle: CaptureHandle, pos: lan_mouse_ipc::Position) {
self.tx
self.request_tx
.send(CaptureRequest::Create(handle, to_capture_pos(pos)))
.expect("channel closed");
}
pub(crate) fn destroy(&self, handle: CaptureHandle) {
self.tx
self.request_tx
.send(CaptureRequest::Destroy(handle))
.expect("channel closed");
}
pub(crate) fn release(&self) {
self.tx
self.request_tx
.send(CaptureRequest::Release)
.expect("channel closed");
}
@@ -113,9 +118,10 @@ impl Capture {
async fn run(
active: Rc<Cell<Option<CaptureHandle>>>,
exit_requested: Rc<Cell<bool>>,
service: Service,
backend: Option<input_capture::Backend>,
mut rx: Receiver<CaptureRequest>,
mut request_rx: Receiver<CaptureRequest>,
mut conn: LanMouseConnection,
mut event_tx: Sender<ICaptureEvent>,
) {
@@ -125,18 +131,18 @@ impl Capture {
&service,
backend,
&mut conn,
&mut rx,
&mut request_rx,
&mut event_tx,
)
.await
{
log::warn!("input capture exited: {e}");
}
event_tx
.send(ICaptureEvent::CaptureDisabled)
.expect("channel closed");
if exit_requested.get() {
break;
}
loop {
match rx.recv().await.expect("channel closed") {
match request_rx.recv().await.expect("channel closed") {
CaptureRequest::Reenable => break,
CaptureRequest::Terminate => return,
_ => {}
@@ -151,13 +157,13 @@ async fn do_capture(
service: &Service,
backend: Option<input_capture::Backend>,
conn: &mut LanMouseConnection,
rx: &mut Receiver<CaptureRequest>,
request_rx: &mut Receiver<CaptureRequest>,
event_tx: &mut Sender<ICaptureEvent>,
) -> Result<(), InputCaptureError> {
/* allow cancelling capture request */
let mut capture = tokio::select! {
r = InputCapture::new(backend) => r?,
_ = wait_for_termination(rx) => return Ok(()),
_ = wait_for_termination(request_rx) => return Ok(()),
};
event_tx
.send(ICaptureEvent::CaptureEnabled)
@@ -179,12 +185,35 @@ async fn do_capture(
capture.create(handle, to_capture_pos(pos)).await?;
}
let res = do_capture_session(active, &mut capture, conn, event_tx, request_rx, service).await;
// FIXME replace with async drop when stabilized
let res1 = capture.terminate().await;
// handle errors
res?;
res1?;
event_tx
.send(ICaptureEvent::CaptureDisabled)
.expect("channel closed");
Ok(())
}
async fn do_capture_session(
active: &Cell<Option<CaptureHandle>>,
capture: &mut InputCapture,
conn: &mut LanMouseConnection,
event_tx: &Sender<ICaptureEvent>,
request_rx: &mut Receiver<CaptureRequest>,
service: &Service,
) -> Result<(), InputCaptureError> {
let mut state = State::WaitingForAck;
loop {
tokio::select! {
event = capture.next() => match event {
Some(event) => handle_capture_event(active, &service, &mut capture, conn, event?, &mut state, event_tx).await?,
Some(event) => handle_capture_event(active, service, capture, conn, event?, &mut state, event_tx).await?,
None => return Ok(()),
},
(handle, event) = conn.recv() => {
@@ -205,22 +234,20 @@ async fn do_capture(
// client disconnected
ProtoEvent::Leave(_) => {
log::info!("releasing capture: left remote client device region");
release_capture(&mut capture, &active).await?;
release_capture(capture, active).await?;
},
_ => {}
}
},
e = rx.recv() => match e.expect("channel closed") {
e = request_rx.recv() => match e.expect("channel closed") {
CaptureRequest::Reenable => { /* already active */ },
CaptureRequest::Release => release_capture(&mut capture, &active).await?,
CaptureRequest::Release => release_capture(capture, active).await?,
CaptureRequest::Create(h, p) => capture.create(h, p).await?,
CaptureRequest::Destroy(h) => capture.destroy(h).await?,
CaptureRequest::Terminate => break,
}
}
}
capture.terminate().await?;
Ok(())
}
@@ -258,7 +285,7 @@ async fn handle_capture_event(
conn: &LanMouseConnection,
event: (CaptureHandle, CaptureEvent),
state: &mut State,
event_tx: &mut Sender<ICaptureEvent>,
event_tx: &Sender<ICaptureEvent>,
) -> Result<(), CaptureError> {
let (handle, event) = event;
log::trace!("({handle}): {event:?}");

View File

@@ -174,6 +174,7 @@ impl Emulation {
/// discarding events when it is disabled
pub(crate) struct EmulationProxy {
emulation_active: Rc<Cell<bool>>,
exit_requested: Rc<Cell<bool>>,
request_tx: Sender<ProxyRequest>,
event_rx: Receiver<EmulationEvent>,
task: JoinHandle<()>,
@@ -191,9 +192,16 @@ impl EmulationProxy {
let (request_tx, request_rx) = channel();
let (event_tx, event_rx) = channel();
let emulation_active = Rc::new(Cell::new(false));
let task = spawn_local(Self::emulation_task(backend, request_rx, event_tx));
let exit_requested = Rc::new(Cell::new(false));
let task = spawn_local(Self::emulation_task(
backend,
exit_requested.clone(),
request_rx,
event_tx,
));
Self {
emulation_active,
exit_requested,
request_tx,
task,
event_rx,
@@ -228,6 +236,7 @@ impl EmulationProxy {
async fn emulation_task(
backend: Option<input_emulation::Backend>,
exit_requested: Rc<Cell<bool>>,
mut request_rx: Receiver<ProxyRequest>,
event_tx: Sender<EmulationEvent>,
) {
@@ -245,6 +254,9 @@ impl EmulationProxy {
{
log::warn!("input emulation exited: {e}");
}
if exit_requested.get() {
break;
}
// wait for reenable request
loop {
match request_rx.recv().await.expect("channel closed") {
@@ -330,6 +342,7 @@ impl EmulationProxy {
}
async fn terminate(&mut self) {
self.exit_requested.replace(true);
self.request_tx
.send(ProxyRequest::Terminate)
.expect("channel closed");

View File

@@ -171,9 +171,61 @@ impl Service {
}
None => break,
};
log::debug!("received frontend request: {request:?}");
self.handle_request(&capture, &emulation, request, &resolver);
log::debug!("handled frontend request");
match request {
FrontendRequest::EnableCapture => capture.reenable(),
FrontendRequest::EnableEmulation => emulation.reenable(),
FrontendRequest::Create => {
self.add_client();
}
FrontendRequest::Activate(handle, active) => {
if active {
if let Some(hostname) = self.client_manager.get_hostname(handle) {
resolver.resolve(handle, hostname);
}
self.activate_client(&capture, handle);
} else {
self.deactivate_client(&capture, handle);
}
}
FrontendRequest::ChangePort(port) => emulation.request_port_change(port),
FrontendRequest::Delete(handle) => {
self.remove_client(&capture, handle);
self.notify_frontend(FrontendEvent::Deleted(handle));
}
FrontendRequest::Enumerate() => self.enumerate(),
FrontendRequest::GetState(handle) => self.broadcast_client(handle),
FrontendRequest::UpdateFixIps(handle, fix_ips) => self.update_fix_ips(handle, fix_ips),
FrontendRequest::UpdateHostname(handle, host) => {
self.update_hostname(handle, host, &resolver)
}
FrontendRequest::UpdatePort(handle, port) => self.update_port(handle, port),
FrontendRequest::UpdatePosition(handle, pos) => {
self.update_pos(handle, &capture, pos);
}
FrontendRequest::ResolveDns(handle) => {
if let Some(hostname) = self.client_manager.get_hostname(handle) {
resolver.resolve(handle, hostname);
}
}
FrontendRequest::Sync => {
self.enumerate();
self.notify_frontend(FrontendEvent::EmulationStatus(self.emulation_status.get()));
self.notify_frontend(FrontendEvent::CaptureStatus(self.capture_status.get()));
self.notify_frontend(FrontendEvent::PortChanged(self.port.get(), None));
self.notify_frontend(FrontendEvent::PublicKeyFingerprint(
self.public_key_fingerprint.clone(),
));
self.notify_frontend(FrontendEvent::AuthorizedUpdated(
self.authorized_keys.read().expect("lock").clone(),
));
}
FrontendRequest::AuthorizeKey(desc, fp) => {
self.add_authorized_key(desc, fp);
}
FrontendRequest::RemoveAuthorizedKey(key) => {
self.remove_authorized_key(key);
}
}
}
_ = self.notifies.frontend_event_pending.notified() => {
while let Some(event) = {
@@ -262,10 +314,12 @@ impl Service {
}
}
log::info!("terminating service");
log::info!("terminating service ...");
log::info!("terminating capture ...");
capture.terminate().await;
log::info!("terminating emulation ...");
emulation.terminate().await;
log::info!("terminating dns resolver ...");
resolver.terminate().await;
Ok(())
@@ -324,72 +378,6 @@ impl Service {
self.notify_frontend(FrontendEvent::Changed(handle));
}
fn handle_request(
&self,
capture: &Capture,
emulation: &Emulation,
event: FrontendRequest,
dns: &DnsResolver,
) -> bool {
log::debug!("frontend: {event:?}");
match event {
FrontendRequest::EnableCapture => capture.reenable(),
FrontendRequest::EnableEmulation => emulation.reenable(),
FrontendRequest::Create => {
self.add_client();
}
FrontendRequest::Activate(handle, active) => {
if active {
if let Some(hostname) = self.client_manager.get_hostname(handle) {
dns.resolve(handle, hostname);
}
self.activate_client(capture, handle);
} else {
self.deactivate_client(capture, handle);
}
}
FrontendRequest::ChangePort(port) => emulation.request_port_change(port),
FrontendRequest::Delete(handle) => {
self.remove_client(capture, handle);
self.notify_frontend(FrontendEvent::Deleted(handle));
}
FrontendRequest::Enumerate() => self.enumerate(),
FrontendRequest::GetState(handle) => self.broadcast_client(handle),
FrontendRequest::UpdateFixIps(handle, fix_ips) => self.update_fix_ips(handle, fix_ips),
FrontendRequest::UpdateHostname(handle, host) => {
self.update_hostname(handle, host, dns)
}
FrontendRequest::UpdatePort(handle, port) => self.update_port(handle, port),
FrontendRequest::UpdatePosition(handle, pos) => {
self.update_pos(handle, capture, pos);
}
FrontendRequest::ResolveDns(handle) => {
if let Some(hostname) = self.client_manager.get_hostname(handle) {
dns.resolve(handle, hostname);
}
}
FrontendRequest::Sync => {
self.enumerate();
self.notify_frontend(FrontendEvent::EmulationStatus(self.emulation_status.get()));
self.notify_frontend(FrontendEvent::CaptureStatus(self.capture_status.get()));
self.notify_frontend(FrontendEvent::PortChanged(self.port.get(), None));
self.notify_frontend(FrontendEvent::PublicKeyFingerprint(
self.public_key_fingerprint.clone(),
));
self.notify_frontend(FrontendEvent::AuthorizedUpdated(
self.authorized_keys.read().expect("lock").clone(),
));
}
FrontendRequest::AuthorizeKey(desc, fp) => {
self.add_authorized_key(desc, fp);
}
FrontendRequest::RemoveAuthorizedKey(key) => {
self.remove_authorized_key(key);
}
};
false
}
fn add_authorized_key(&self, desc: String, fp: String) {
self.authorized_keys.write().expect("lock").insert(fp, desc);
self.notify_frontend(FrontendEvent::AuthorizedUpdated(