diff --git a/src/server.rs b/src/server.rs index 70e0cf2..84be47c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,11 +1,9 @@ use log; use std::{ cell::{Cell, RefCell}, - io::Result, rc::Rc, time::Duration, }; -use tokio::net::UdpSocket; use tokio::signal; use std::net::SocketAddr; @@ -15,7 +13,7 @@ use crate::{ config::Config, dns, event::Event, - frontend::{FrontendEvent, FrontendListener, FrontendNotify}, + frontend::{FrontendEvent, FrontendListener}, server::producer_task::ProducerEvent, }; use crate::{consumer, producer}; @@ -24,6 +22,7 @@ use self::{consumer_task::ConsumerEvent, resolver_task::DnsRequest}; mod consumer_task; mod frontend_task; +mod network_task; mod producer_task; mod resolver_task; @@ -83,10 +82,12 @@ impl Server { }; let (consumer, producer) = tokio::join!(consumer::create(), producer::create()); - let (receiver_tx, receiver_rx) = tokio::sync::mpsc::channel(32); - let (sender_tx, mut sender_rx) = tokio::sync::mpsc::channel(32); - let (port_tx, mut port_rx) = tokio::sync::mpsc::channel(32); let (timer_tx, mut timer_rx) = tokio::sync::mpsc::channel(1); + let (frontend_notify_tx, frontend_notify_rx) = tokio::sync::mpsc::channel(1); + + // udp task + let (mut udp_task, sender_tx, receiver_rx, port_tx) = + network_task::new(self.clone(), frontend_notify_tx).await?; // event producer let (mut producer_task, producer_channel) = @@ -107,8 +108,9 @@ impl Server { let (mut resolver_task, resolve_tx) = resolver_task::new(resolver, self.clone()); // frontend listener - let (mut frontend_task, frontend_tx, frontend_notify_tx) = frontend_task::new( + let (mut frontend_task, frontend_tx) = frontend_task::new( frontend, + frontend_notify_rx, self.clone(), producer_channel.clone(), consumer_channel.clone(), @@ -116,56 +118,6 @@ impl Server { port_tx, ); - // bind the udp socket - let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), self.port.get()); - let mut socket = UdpSocket::bind(listen_addr).await?; - let server = self.clone(); - // udp task - let mut udp_task = tokio::task::spawn_local(async move { - loop { - tokio::select! { - event = receive_event(&socket) => { - let _ = receiver_tx.send(event).await; - } - event = sender_rx.recv() => { - let Some((event, addr)) = event else { - break; - }; - if let Err(e) = send_event(&socket, event, addr) { - log::warn!("udp send failed: {e}"); - }; - } - port = port_rx.recv() => { - let Some(port) = port else { - break; - }; - - if socket.local_addr().unwrap().port() == port { - continue; - } - - let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), port); - match UdpSocket::bind(listen_addr).await { - Ok(new_socket) => { - socket = new_socket; - server.port.replace(port); - let _ = frontend_notify_tx.send(FrontendNotify::NotifyPortChange(port, None)).await; - } - Err(e) => { - log::warn!("could not change port: {e}"); - let port = socket.local_addr().unwrap().port(); - let _ = frontend_notify_tx.send(FrontendNotify::NotifyPortChange( - port, - Some(format!("could not change port: {e}")), - )).await; - } - } - - } - } - } - }); - // timer task let server = self.clone(); let sender_ch = sender_tx.clone(); @@ -350,18 +302,3 @@ impl Server { Ok(()) } } - -async fn receive_event(socket: &UdpSocket) -> anyhow::Result<(Event, SocketAddr)> { - let mut buf = vec![0u8; 22]; - let (_amt, src) = socket.recv_from(&mut buf).await?; - Ok((Event::try_from(buf)?, src)) -} - -fn send_event(sock: &UdpSocket, e: Event, addr: SocketAddr) -> Result { - log::trace!("{:20} ------>->->-> {addr}", e.to_string()); - let data: Vec = (&e).into(); - // When udp blocks, we dont want to block the event loop. - // Dropping events is better than potentially crashing the event - // producer. - sock.try_send_to(&data, addr) -} diff --git a/src/server/frontend_task.rs b/src/server/frontend_task.rs index a0409d1..3aabd00 100644 --- a/src/server/frontend_task.rs +++ b/src/server/frontend_task.rs @@ -10,7 +10,11 @@ use tokio::net::UnixStream; use tokio::net::TcpStream; use anyhow::{anyhow, Result}; -use tokio::{io::ReadHalf, sync::mpsc::Sender, task::JoinHandle}; +use tokio::{ + io::ReadHalf, + sync::mpsc::{Receiver, Sender}, + task::JoinHandle, +}; use crate::{ client::{ClientEvent, ClientHandle, Position}, @@ -23,18 +27,14 @@ use super::{ pub(crate) fn new( mut frontend: FrontendListener, + mut notify_rx: Receiver, server: Server, producer_notify: Sender, consumer_notify: Sender, resolve_ch: Sender, port_tx: Sender, -) -> ( - JoinHandle>, - Sender, - Sender, -) { +) -> (JoinHandle>, Sender) { let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(32); - let (notify_tx, mut notify_rx) = tokio::sync::mpsc::channel(32); let event_tx_clone = event_tx.clone(); let frontend_task = tokio::task::spawn_local(async move { loop { @@ -63,7 +63,7 @@ pub(crate) fn new( } anyhow::Ok(()) }); - (frontend_task, event_tx, notify_tx) + (frontend_task, event_tx) } async fn handle_frontend_stream( diff --git a/src/server/network_task.rs b/src/server/network_task.rs new file mode 100644 index 0000000..f5e9774 --- /dev/null +++ b/src/server/network_task.rs @@ -0,0 +1,90 @@ +use std::net::SocketAddr; + +use anyhow::Result; +use tokio::{ + net::UdpSocket, + sync::mpsc::{Receiver, Sender}, + task::JoinHandle, +}; + +use crate::{event::Event, frontend::FrontendNotify}; + +use super::Server; + +pub async fn new( + server: Server, + frontend_notify_tx: Sender, +) -> Result<( + JoinHandle<()>, + Sender<(Event, SocketAddr)>, + Receiver>, + Sender, +)> { + // bind the udp socket + let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), server.port.get()); + let mut socket = UdpSocket::bind(listen_addr).await?; + let (receiver_tx, receiver_rx) = tokio::sync::mpsc::channel(32); + let (sender_tx, mut sender_rx) = tokio::sync::mpsc::channel(32); + let (port_tx, mut port_rx) = tokio::sync::mpsc::channel(32); + + let udp_task = tokio::task::spawn_local(async move { + loop { + tokio::select! { + event = receive_event(&socket) => { + let _ = receiver_tx.send(event).await; + } + event = sender_rx.recv() => { + let Some((event, addr)) = event else { + break; + }; + if let Err(e) = send_event(&socket, event, addr) { + log::warn!("udp send failed: {e}"); + }; + } + port = port_rx.recv() => { + let Some(port) = port else { + break; + }; + + if socket.local_addr().unwrap().port() == port { + continue; + } + + let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), port); + match UdpSocket::bind(listen_addr).await { + Ok(new_socket) => { + socket = new_socket; + server.port.replace(port); + let _ = frontend_notify_tx.send(FrontendNotify::NotifyPortChange(port, None)).await; + } + Err(e) => { + log::warn!("could not change port: {e}"); + let port = socket.local_addr().unwrap().port(); + let _ = frontend_notify_tx.send(FrontendNotify::NotifyPortChange( + port, + Some(format!("could not change port: {e}")), + )).await; + } + } + + } + } + } + }); + Ok((udp_task, sender_tx, receiver_rx, port_tx)) +} + +async fn receive_event(socket: &UdpSocket) -> Result<(Event, SocketAddr)> { + let mut buf = vec![0u8; 22]; + let (_amt, src) = socket.recv_from(&mut buf).await?; + Ok((Event::try_from(buf)?, src)) +} + +fn send_event(sock: &UdpSocket, e: Event, addr: SocketAddr) -> Result { + log::trace!("{:20} ------>->->-> {addr}", e.to_string()); + let data: Vec = (&e).into(); + // When udp blocks, we dont want to block the event loop. + // Dropping events is better than potentially crashing the event + // producer. + Ok(sock.try_send_to(&data, addr)?) +}