This commit is contained in:
Ferdinand Schober
2024-07-11 16:47:36 +02:00
parent 2d26bd6a0b
commit 4c30f032f4
3 changed files with 142 additions and 129 deletions

View File

@@ -97,7 +97,7 @@ impl Server {
} }
}; };
let timer_notify = Arc::new(Notify::new()); let timer_notify = Arc::new(Notify::new()); /* notify ping timer restart */
let (frontend_tx, frontend_rx) = channel(1); /* events coming from frontends */ let (frontend_tx, frontend_rx) = channel(1); /* events coming from frontends */
let cancellation_token = CancellationToken::new(); /* notify termination */ let cancellation_token = CancellationToken::new(); /* notify termination */
let notify_capture = Arc::new(Notify::new()); /* notify capture restart */ let notify_capture = Arc::new(Notify::new()); /* notify capture restart */
@@ -166,6 +166,7 @@ impl Server {
emulate_channel.clone(), emulate_channel.clone(),
capture_channel.clone(), capture_channel.clone(),
timer_notify, timer_notify,
cancellation_token.clone(),
); );
let active = self let active = self
@@ -196,11 +197,7 @@ impl Server {
} }
_ = &mut capture_task => { } _ = &mut capture_task => { }
_ = &mut emulation_task => { } _ = &mut emulation_task => { }
e = &mut frontend_task => { _ = &mut frontend_task => { }
if let Ok(Err(e)) = e {
log::error!("error in frontend listener: {e}");
}
}
_ = &mut resolver_task => { } _ = &mut resolver_task => { }
_ = &mut udp_task => { } _ = &mut udp_task => { }
_ = &mut ping_task => { } _ = &mut ping_task => { }
@@ -209,9 +206,14 @@ impl Server {
// cancel tasks // cancel tasks
cancellation_token.cancel(); cancellation_token.cancel();
let _ = join!(capture_task, emulation_task, frontend_task, udp_task); let _ = join!(
capture_task,
emulation_task,
frontend_task,
udp_task,
resolver_task
);
resolver_task.abort();
ping_task.abort(); ping_task.abort();
Ok(()) Ok(())

View File

@@ -10,7 +10,6 @@ use tokio::net::UnixStream;
#[cfg(windows)] #[cfg(windows)]
use tokio::net::TcpStream; use tokio::net::TcpStream;
use anyhow::{anyhow, Result};
use tokio::{ use tokio::{
io::ReadHalf, io::ReadHalf,
sync::{ sync::{
@@ -32,7 +31,7 @@ use super::{
pub(crate) fn new( pub(crate) fn new(
mut frontend: FrontendListener, mut frontend: FrontendListener,
mut notify_rx: Receiver<FrontendEvent>, mut event: Receiver<FrontendEvent>,
server: Server, server: Server,
notify_capture: Arc<Notify>, notify_capture: Arc<Notify>,
notify_emulation: Arc<Notify>, notify_emulation: Arc<Notify>,
@@ -41,32 +40,28 @@ pub(crate) fn new(
resolve_ch: Sender<DnsRequest>, resolve_ch: Sender<DnsRequest>,
port_tx: Sender<u16>, port_tx: Sender<u16>,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
) -> (JoinHandle<Result<()>>, Sender<FrontendRequest>) { ) -> (JoinHandle<()>, Sender<FrontendRequest>) {
let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(32); let (request_tx, mut request) = tokio::sync::mpsc::channel(32);
let event_tx_clone = event_tx.clone(); let request_tx_clone = request_tx.clone();
let frontend_task = tokio::task::spawn_local(async move { let frontend_task = tokio::task::spawn_local(async move {
let mut join_handles = vec![]; let mut join_handles = vec![];
loop { loop {
tokio::select! { tokio::select! {
stream = frontend.accept() => { stream = frontend.accept() => {
let stream = match stream { match stream {
Ok(s) => s, Ok(s) => join_handles.push(handle_frontend_stream(&request_tx_clone, s, cancellation_token.clone())),
Err(e) => { Err(e) => log::warn!("error accepting frontend connection: {e}"),
log::warn!("error accepting frontend connection: {e}");
continue;
}
}; };
join_handles.push(handle_frontend_stream(&event_tx_clone, stream, cancellation_token.clone()));
} }
event = event_rx.recv() => { request = request.recv() => {
let frontend_event = event.ok_or(anyhow!("frontend channel closed"))?; let request = request.expect("frontend request channel closed");
if handle_frontend_event(&server, &notify_capture, &notify_emulation, &capture, &emulate, &resolve_ch, &mut frontend, &port_tx, frontend_event).await { if handle_frontend_event(&server, &notify_capture, &notify_emulation, &capture, &emulate, &resolve_ch, &mut frontend, &port_tx, request).await {
break; break;
} }
} }
notify = notify_rx.recv() => { event = event.recv() => {
let notify = notify.ok_or(anyhow!("frontend notify closed"))?; let event = event.expect("channel closed");
let _ = frontend.broadcast_event(notify).await; let _ = frontend.broadcast_event(event).await;
} }
_ = cancellation_token.cancelled() => { _ = cancellation_token.cancelled() => {
futures::future::join_all(join_handles).await; futures::future::join_all(join_handles).await;
@@ -74,9 +69,8 @@ pub(crate) fn new(
} }
} }
} }
anyhow::Ok(())
}); });
(frontend_task, event_tx) (frontend_task, request_tx)
} }
fn handle_frontend_stream( fn handle_frontend_stream(

View File

@@ -6,6 +6,7 @@ use tokio::{
}; };
use input_event::Event; use input_event::Event;
use tokio_util::sync::CancellationToken;
use crate::client::ClientHandle; use crate::client::ClientHandle;
@@ -19,9 +20,25 @@ pub fn new(
emulate_notify: Sender<EmulationEvent>, emulate_notify: Sender<EmulationEvent>,
capture_notify: Sender<CaptureEvent>, capture_notify: Sender<CaptureEvent>,
timer_notify: Arc<Notify>, timer_notify: Arc<Notify>,
cancellation_token: CancellationToken,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
// timer task // timer task
let ping_task = tokio::task::spawn_local(async move { let ping_task = tokio::task::spawn_local(async move {
tokio::select! {
_ = cancellation_token.cancelled() => {}
_ = ping_task(server, sender_ch, emulate_notify, capture_notify, timer_notify) => {}
}
});
ping_task
}
async fn ping_task(
server: Server,
sender_ch: Sender<(Event, SocketAddr)>,
emulate_notify: Sender<EmulationEvent>,
capture_notify: Sender<CaptureEvent>,
timer_notify: Arc<Notify>,
) {
loop { loop {
// wait for wake up signal // wait for wake up signal
timer_notify.notified().await; timer_notify.notified().await;
@@ -84,7 +101,9 @@ pub fn new(
// give clients time to resond // give clients time to resond
if receiving { if receiving {
log::trace!("waiting {MAX_RESPONSE_TIME:?} for response from client with pressed keys ..."); log::trace!(
"waiting {MAX_RESPONSE_TIME:?} for response from client with pressed keys ..."
);
} else { } else {
log::trace!( log::trace!(
"state: {:?} => waiting {MAX_RESPONSE_TIME:?} for client to respond ...", "state: {:?} => waiting {MAX_RESPONSE_TIME:?} for client to respond ...",
@@ -124,6 +143,4 @@ pub fn new(
} }
} }
} }
});
ping_task
} }