code cleanup + purge anyhow in library code (#157)

This commit is contained in:
Ferdinand Schober
2024-07-10 00:33:49 +02:00
committed by GitHub
parent 703465a370
commit 6a4dd740c3
23 changed files with 926 additions and 1064 deletions

View File

@@ -1,6 +1,6 @@
use std::net::SocketAddr;
use std::{io, net::SocketAddr};
use anyhow::Result;
use thiserror::Error;
use tokio::{
net::UdpSocket,
sync::mpsc::{Receiver, Sender},
@@ -8,66 +8,37 @@ use tokio::{
};
use crate::frontend::FrontendEvent;
use input_event::Event;
use input_event::{Event, ProtocolError};
use super::Server;
pub async fn new(
server: Server,
frontend_notify_tx: Sender<FrontendEvent>,
) -> Result<(
) -> io::Result<(
JoinHandle<()>,
Sender<(Event, SocketAddr)>,
Receiver<Result<(Event, SocketAddr)>>,
Receiver<Result<(Event, SocketAddr), NetworkError>>,
Sender<u16>,
)> {
// bind the udp socket
let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), server.port.get());
let mut socket = UdpSocket::bind(listen_addr).await?;
let (receiver_tx, receiver_rx) = tokio::sync::mpsc::channel(32);
let (sender_tx, mut sender_rx) = tokio::sync::mpsc::channel(32);
let (sender_tx, sender_rx) = tokio::sync::mpsc::channel(32);
let (port_tx, mut port_rx) = tokio::sync::mpsc::channel(32);
let udp_task = tokio::task::spawn_local(async move {
let mut sender_rx = sender_rx;
loop {
let udp_receiver = udp_receiver(&socket, &receiver_tx);
let udp_sender = udp_sender(&socket, &mut sender_rx);
tokio::select! {
event = receive_event(&socket) => {
let _ = receiver_tx.send(event).await;
}
event = sender_rx.recv() => {
let Some((event, addr)) = event else {
break;
};
if let Err(e) = send_event(&socket, event, addr) {
log::warn!("udp send failed: {e}");
};
}
port = port_rx.recv() => {
let Some(port) = port else {
break;
};
if socket.local_addr().unwrap().port() == port {
continue;
}
let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), port);
match UdpSocket::bind(listen_addr).await {
Ok(new_socket) => {
socket = new_socket;
server.port.replace(port);
let _ = frontend_notify_tx.send(FrontendEvent::PortChanged(port, None)).await;
}
Err(e) => {
log::warn!("could not change port: {e}");
let port = socket.local_addr().unwrap().port();
let _ = frontend_notify_tx.send(FrontendEvent::PortChanged(
port,
Some(format!("could not change port: {e}")),
)).await;
}
}
_ = udp_receiver => break, /* channel closed */
_ = udp_sender => break, /* channel closed */
port = port_rx.recv() => match port {
Some(port) => update_port(&server, &frontend_notify_tx, &mut socket, port).await,
_ => break,
}
}
}
@@ -75,13 +46,73 @@ pub async fn new(
Ok((udp_task, sender_tx, receiver_rx, port_tx))
}
async fn receive_event(socket: &UdpSocket) -> Result<(Event, SocketAddr)> {
async fn update_port(
server: &Server,
frontend_chan: &Sender<FrontendEvent>,
socket: &mut UdpSocket,
port: u16,
) {
// if port is the same, we dont need to change it
if socket.local_addr().unwrap().port() == port {
return;
}
// create new socket
let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), port);
let frontend_event = match UdpSocket::bind(listen_addr).await {
Ok(new_socket) => {
*socket = new_socket;
server.port.replace(port);
FrontendEvent::PortChanged(port, None)
}
Err(e) => {
log::warn!("could not change port: {e}");
let port = socket.local_addr().unwrap().port();
FrontendEvent::PortChanged(port, Some(format!("could not change port: {e}")))
}
};
let _ = frontend_chan.send(frontend_event).await;
}
async fn udp_receiver(
socket: &UdpSocket,
receiver_tx: &Sender<Result<(Event, SocketAddr), NetworkError>>,
) {
loop {
let event = receive_event(socket).await;
if receiver_tx.send(event).await.is_err() {
break;
}
}
}
async fn udp_sender(socket: &UdpSocket, rx: &mut Receiver<(Event, SocketAddr)>) {
loop {
let (event, addr) = match rx.recv().await {
Some(e) => e,
None => return,
};
if let Err(e) = send_event(socket, event, addr) {
log::warn!("udp send failed: {e}");
};
}
}
#[derive(Debug, Error)]
pub(crate) enum NetworkError {
#[error(transparent)]
Protocol(#[from] ProtocolError),
#[error("network error: `{0}`")]
Io(#[from] io::Error),
}
async fn receive_event(socket: &UdpSocket) -> Result<(Event, SocketAddr), NetworkError> {
let mut buf = vec![0u8; 22];
let (_amt, src) = socket.recv_from(&mut buf).await?;
Ok((Event::try_from(buf)?, src))
}
fn send_event(sock: &UdpSocket, e: Event, addr: SocketAddr) -> Result<usize> {
fn send_event(sock: &UdpSocket, e: Event, addr: SocketAddr) -> Result<usize, NetworkError> {
log::trace!("{:20} ------>->->-> {addr}", e.to_string());
let data: Vec<u8> = (&e).into();
// When udp blocks, we dont want to block the event loop.