use local-channel instead of tokio sync channel (#179)

this avoids the mutex overhead in tokio
This commit is contained in:
Ferdinand Schober
2024-08-12 18:20:21 +02:00
committed by GitHub
parent 19c2c4327f
commit e7a1d72149
8 changed files with 54 additions and 61 deletions

View File

@@ -1,5 +1,6 @@
use capture_task::CaptureRequest;
use emulation_task::EmulationRequest;
use local_channel::mpsc::{channel, Sender};
use log;
use std::{
cell::{Cell, RefCell},
@@ -8,15 +9,7 @@ use std::{
net::{IpAddr, SocketAddr},
rc::Rc,
};
use tokio::{
io::ReadHalf,
join, signal,
sync::{
mpsc::{channel, Sender},
Notify,
},
task::JoinHandle,
};
use tokio::{io::ReadHalf, join, signal, sync::Notify, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use crate::{
@@ -130,12 +123,12 @@ impl Server {
}
};
let (capture_tx, capture_rx) = channel(1); /* requests for input capture */
let (emulation_tx, emulation_rx) = channel(1); /* emulation requests */
let (udp_recv_tx, udp_recv_rx) = channel(1); /* udp receiver */
let (udp_send_tx, udp_send_rx) = channel(1); /* udp sender */
let (request_tx, mut request_rx) = channel(1); /* frontend requests */
let (dns_tx, dns_rx) = channel(1); /* dns requests */
let (capture_tx, capture_rx) = channel(); /* requests for input capture */
let (emulation_tx, emulation_rx) = channel(); /* emulation requests */
let (udp_recv_tx, udp_recv_rx) = channel(); /* udp receiver */
let (udp_send_tx, udp_send_rx) = channel(); /* udp sender */
let (request_tx, mut request_rx) = channel(); /* frontend requests */
let (dns_tx, dns_rx) = channel(); /* dns requests */
// udp task
let network = network_task::new(self.clone(), udp_recv_tx.clone(), udp_send_rx).await?;
@@ -200,7 +193,7 @@ impl Server {
let request = self.pending_dns_requests.borrow_mut().pop_front();
request
} {
dns_tx.send(request).await.expect("channel closed");
dns_tx.send(request).expect("channel closed");
}
}
_ = self.cancelled() => break,
@@ -213,13 +206,6 @@ impl Server {
log::info!("terminating service");
assert!(!capture_tx.is_closed());
assert!(!emulation_tx.is_closed());
assert!(!udp_recv_tx.is_closed());
assert!(!udp_send_tx.is_closed());
assert!(!request_tx.is_closed());
assert!(!dns_tx.is_closed());
self.cancel();
futures::future::join_all(join_handles).await;
let _ = join!(capture, dns_task, emulation, network, ping);
@@ -377,8 +363,8 @@ impl Server {
None => return,
};
let _ = capture.send(CaptureRequest::Destroy(handle)).await;
let _ = emulate.send(EmulationRequest::Destroy(handle)).await;
let _ = capture.send(CaptureRequest::Destroy(handle));
let _ = emulate.send(EmulationRequest::Destroy(handle));
log::debug!("deactivating client {handle} done");
}
@@ -410,10 +396,8 @@ impl Server {
};
/* notify emulation, capture and frontends */
let _ = capture
.send(CaptureRequest::Create(handle, pos.into()))
.await;
let _ = emulate.send(EmulationRequest::Create(handle)).await;
let _ = capture.send(CaptureRequest::Create(handle, pos.into()));
let _ = emulate.send(EmulationRequest::Create(handle));
log::debug!("activating client {handle} done");
}
@@ -433,8 +417,8 @@ impl Server {
};
if active {
let _ = capture.send(CaptureRequest::Destroy(handle)).await;
let _ = emulate.send(EmulationRequest::Destroy(handle)).await;
let _ = capture.send(CaptureRequest::Destroy(handle));
let _ = emulate.send(EmulationRequest::Destroy(handle));
}
}
@@ -517,13 +501,11 @@ impl Server {
// update state in event input emulator & input capture
if changed {
if active {
let _ = capture.send(CaptureRequest::Destroy(handle)).await;
let _ = emulate.send(EmulationRequest::Destroy(handle)).await;
let _ = capture.send(CaptureRequest::Destroy(handle));
let _ = emulate.send(EmulationRequest::Destroy(handle));
}
let _ = capture
.send(CaptureRequest::Create(handle, pos.into()))
.await;
let _ = emulate.send(EmulationRequest::Create(handle)).await;
let _ = capture.send(CaptureRequest::Create(handle, pos.into()));
let _ = emulate.send(EmulationRequest::Create(handle));
}
}
@@ -595,7 +577,7 @@ async fn listen_frontend(
let request = frontend::wait_for_request(&mut stream).await;
match request {
Ok(request) => {
let _ = request_tx.send(request).await;
let _ = request_tx.send(request);
}
Err(e) => {
if let Some(e) = e.downcast_ref::<io::Error>() {