refactor udp task

This commit is contained in:
Ferdinand Schober
2024-01-19 00:40:34 +01:00
parent 2803db7073
commit 36001c6fb2
3 changed files with 107 additions and 80 deletions

View File

@@ -1,11 +1,9 @@
use log; use log;
use std::{ use std::{
cell::{Cell, RefCell}, cell::{Cell, RefCell},
io::Result,
rc::Rc, rc::Rc,
time::Duration, time::Duration,
}; };
use tokio::net::UdpSocket;
use tokio::signal; use tokio::signal;
use std::net::SocketAddr; use std::net::SocketAddr;
@@ -15,7 +13,7 @@ use crate::{
config::Config, config::Config,
dns, dns,
event::Event, event::Event,
frontend::{FrontendEvent, FrontendListener, FrontendNotify}, frontend::{FrontendEvent, FrontendListener},
server::producer_task::ProducerEvent, server::producer_task::ProducerEvent,
}; };
use crate::{consumer, producer}; use crate::{consumer, producer};
@@ -24,6 +22,7 @@ use self::{consumer_task::ConsumerEvent, resolver_task::DnsRequest};
mod consumer_task; mod consumer_task;
mod frontend_task; mod frontend_task;
mod network_task;
mod producer_task; mod producer_task;
mod resolver_task; mod resolver_task;
@@ -83,10 +82,12 @@ impl Server {
}; };
let (consumer, producer) = tokio::join!(consumer::create(), producer::create()); let (consumer, producer) = tokio::join!(consumer::create(), producer::create());
let (receiver_tx, receiver_rx) = tokio::sync::mpsc::channel(32);
let (sender_tx, mut sender_rx) = tokio::sync::mpsc::channel(32);
let (port_tx, mut port_rx) = tokio::sync::mpsc::channel(32);
let (timer_tx, mut timer_rx) = tokio::sync::mpsc::channel(1); let (timer_tx, mut timer_rx) = tokio::sync::mpsc::channel(1);
let (frontend_notify_tx, frontend_notify_rx) = tokio::sync::mpsc::channel(1);
// udp task
let (mut udp_task, sender_tx, receiver_rx, port_tx) =
network_task::new(self.clone(), frontend_notify_tx).await?;
// event producer // event producer
let (mut producer_task, producer_channel) = let (mut producer_task, producer_channel) =
@@ -107,8 +108,9 @@ impl Server {
let (mut resolver_task, resolve_tx) = resolver_task::new(resolver, self.clone()); let (mut resolver_task, resolve_tx) = resolver_task::new(resolver, self.clone());
// frontend listener // frontend listener
let (mut frontend_task, frontend_tx, frontend_notify_tx) = frontend_task::new( let (mut frontend_task, frontend_tx) = frontend_task::new(
frontend, frontend,
frontend_notify_rx,
self.clone(), self.clone(),
producer_channel.clone(), producer_channel.clone(),
consumer_channel.clone(), consumer_channel.clone(),
@@ -116,56 +118,6 @@ impl Server {
port_tx, port_tx,
); );
// bind the udp socket
let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), self.port.get());
let mut socket = UdpSocket::bind(listen_addr).await?;
let server = self.clone();
// udp task
let mut udp_task = tokio::task::spawn_local(async move {
loop {
tokio::select! {
event = receive_event(&socket) => {
let _ = receiver_tx.send(event).await;
}
event = sender_rx.recv() => {
let Some((event, addr)) = event else {
break;
};
if let Err(e) = send_event(&socket, event, addr) {
log::warn!("udp send failed: {e}");
};
}
port = port_rx.recv() => {
let Some(port) = port else {
break;
};
if socket.local_addr().unwrap().port() == port {
continue;
}
let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), port);
match UdpSocket::bind(listen_addr).await {
Ok(new_socket) => {
socket = new_socket;
server.port.replace(port);
let _ = frontend_notify_tx.send(FrontendNotify::NotifyPortChange(port, None)).await;
}
Err(e) => {
log::warn!("could not change port: {e}");
let port = socket.local_addr().unwrap().port();
let _ = frontend_notify_tx.send(FrontendNotify::NotifyPortChange(
port,
Some(format!("could not change port: {e}")),
)).await;
}
}
}
}
}
});
// timer task // timer task
let server = self.clone(); let server = self.clone();
let sender_ch = sender_tx.clone(); let sender_ch = sender_tx.clone();
@@ -350,18 +302,3 @@ impl Server {
Ok(()) Ok(())
} }
} }
async fn receive_event(socket: &UdpSocket) -> anyhow::Result<(Event, SocketAddr)> {
let mut buf = vec![0u8; 22];
let (_amt, src) = socket.recv_from(&mut buf).await?;
Ok((Event::try_from(buf)?, src))
}
fn send_event(sock: &UdpSocket, e: Event, addr: SocketAddr) -> Result<usize> {
log::trace!("{:20} ------>->->-> {addr}", e.to_string());
let data: Vec<u8> = (&e).into();
// When udp blocks, we dont want to block the event loop.
// Dropping events is better than potentially crashing the event
// producer.
sock.try_send_to(&data, addr)
}

View File

@@ -10,7 +10,11 @@ use tokio::net::UnixStream;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use tokio::{io::ReadHalf, sync::mpsc::Sender, task::JoinHandle}; use tokio::{
io::ReadHalf,
sync::mpsc::{Receiver, Sender},
task::JoinHandle,
};
use crate::{ use crate::{
client::{ClientEvent, ClientHandle, Position}, client::{ClientEvent, ClientHandle, Position},
@@ -23,18 +27,14 @@ use super::{
pub(crate) fn new( pub(crate) fn new(
mut frontend: FrontendListener, mut frontend: FrontendListener,
mut notify_rx: Receiver<FrontendNotify>,
server: Server, server: Server,
producer_notify: Sender<ProducerEvent>, producer_notify: Sender<ProducerEvent>,
consumer_notify: Sender<ConsumerEvent>, consumer_notify: Sender<ConsumerEvent>,
resolve_ch: Sender<DnsRequest>, resolve_ch: Sender<DnsRequest>,
port_tx: Sender<u16>, port_tx: Sender<u16>,
) -> ( ) -> (JoinHandle<Result<()>>, Sender<FrontendEvent>) {
JoinHandle<Result<()>>,
Sender<FrontendEvent>,
Sender<FrontendNotify>,
) {
let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(32); let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(32);
let (notify_tx, mut notify_rx) = tokio::sync::mpsc::channel(32);
let event_tx_clone = event_tx.clone(); let event_tx_clone = event_tx.clone();
let frontend_task = tokio::task::spawn_local(async move { let frontend_task = tokio::task::spawn_local(async move {
loop { loop {
@@ -63,7 +63,7 @@ pub(crate) fn new(
} }
anyhow::Ok(()) anyhow::Ok(())
}); });
(frontend_task, event_tx, notify_tx) (frontend_task, event_tx)
} }
async fn handle_frontend_stream( async fn handle_frontend_stream(

View File

@@ -0,0 +1,90 @@
use std::net::SocketAddr;
use anyhow::Result;
use tokio::{
net::UdpSocket,
sync::mpsc::{Receiver, Sender},
task::JoinHandle,
};
use crate::{event::Event, frontend::FrontendNotify};
use super::Server;
pub async fn new(
server: Server,
frontend_notify_tx: Sender<FrontendNotify>,
) -> Result<(
JoinHandle<()>,
Sender<(Event, SocketAddr)>,
Receiver<Result<(Event, SocketAddr)>>,
Sender<u16>,
)> {
// bind the udp socket
let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), server.port.get());
let mut socket = UdpSocket::bind(listen_addr).await?;
let (receiver_tx, receiver_rx) = tokio::sync::mpsc::channel(32);
let (sender_tx, mut sender_rx) = tokio::sync::mpsc::channel(32);
let (port_tx, mut port_rx) = tokio::sync::mpsc::channel(32);
let udp_task = tokio::task::spawn_local(async move {
loop {
tokio::select! {
event = receive_event(&socket) => {
let _ = receiver_tx.send(event).await;
}
event = sender_rx.recv() => {
let Some((event, addr)) = event else {
break;
};
if let Err(e) = send_event(&socket, event, addr) {
log::warn!("udp send failed: {e}");
};
}
port = port_rx.recv() => {
let Some(port) = port else {
break;
};
if socket.local_addr().unwrap().port() == port {
continue;
}
let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), port);
match UdpSocket::bind(listen_addr).await {
Ok(new_socket) => {
socket = new_socket;
server.port.replace(port);
let _ = frontend_notify_tx.send(FrontendNotify::NotifyPortChange(port, None)).await;
}
Err(e) => {
log::warn!("could not change port: {e}");
let port = socket.local_addr().unwrap().port();
let _ = frontend_notify_tx.send(FrontendNotify::NotifyPortChange(
port,
Some(format!("could not change port: {e}")),
)).await;
}
}
}
}
}
});
Ok((udp_task, sender_tx, receiver_rx, port_tx))
}
async fn receive_event(socket: &UdpSocket) -> Result<(Event, SocketAddr)> {
let mut buf = vec![0u8; 22];
let (_amt, src) = socket.recv_from(&mut buf).await?;
Ok((Event::try_from(buf)?, src))
}
fn send_event(sock: &UdpSocket, e: Event, addr: SocketAddr) -> Result<usize> {
log::trace!("{:20} ------>->->-> {addr}", e.to_string());
let data: Vec<u8> = (&e).into();
// When udp blocks, we dont want to block the event loop.
// Dropping events is better than potentially crashing the event
// producer.
Ok(sock.try_send_to(&data, addr)?)
}