mirror of
https://github.com/feschber/lan-mouse.git
synced 2026-04-21 01:33:19 +03:00
refactor dns task
This commit is contained in:
@@ -1,7 +1,6 @@
|
|||||||
use log;
|
use log;
|
||||||
use std::{
|
use std::{
|
||||||
cell::{Cell, RefCell},
|
cell::{Cell, RefCell},
|
||||||
collections::HashSet,
|
|
||||||
io::Result,
|
io::Result,
|
||||||
rc::Rc,
|
rc::Rc,
|
||||||
time::Duration,
|
time::Duration,
|
||||||
@@ -21,11 +20,12 @@ use crate::{
|
|||||||
};
|
};
|
||||||
use crate::{consumer, producer};
|
use crate::{consumer, producer};
|
||||||
|
|
||||||
use self::consumer_task::ConsumerEvent;
|
use self::{consumer_task::ConsumerEvent, resolver_task::DnsRequest};
|
||||||
|
|
||||||
mod consumer_task;
|
mod consumer_task;
|
||||||
mod frontend_task;
|
mod frontend_task;
|
||||||
mod producer_task;
|
mod producer_task;
|
||||||
|
mod resolver_task;
|
||||||
|
|
||||||
const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500);
|
const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500);
|
||||||
|
|
||||||
@@ -83,7 +83,6 @@ impl Server {
|
|||||||
};
|
};
|
||||||
let (consumer, producer) = tokio::join!(consumer::create(), producer::create());
|
let (consumer, producer) = tokio::join!(consumer::create(), producer::create());
|
||||||
|
|
||||||
let (resolve_tx, mut resolve_rx) = tokio::sync::mpsc::channel(32);
|
|
||||||
let (receiver_tx, receiver_rx) = tokio::sync::mpsc::channel(32);
|
let (receiver_tx, receiver_rx) = tokio::sync::mpsc::channel(32);
|
||||||
let (sender_tx, mut sender_rx) = tokio::sync::mpsc::channel(32);
|
let (sender_tx, mut sender_rx) = tokio::sync::mpsc::channel(32);
|
||||||
let (port_tx, mut port_rx) = tokio::sync::mpsc::channel(32);
|
let (port_tx, mut port_rx) = tokio::sync::mpsc::channel(32);
|
||||||
@@ -103,6 +102,10 @@ impl Server {
|
|||||||
timer_tx,
|
timer_tx,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// create dns resolver
|
||||||
|
let resolver = dns::DnsResolver::new().await?;
|
||||||
|
let (mut resolver_task, resolve_tx) = resolver_task::new(resolver, self.clone());
|
||||||
|
|
||||||
// frontend listener
|
// frontend listener
|
||||||
let (mut frontend_task, frontend_tx, frontend_notify_tx) = frontend_task::new(
|
let (mut frontend_task, frontend_tx, frontend_notify_tx) = frontend_task::new(
|
||||||
frontend,
|
frontend,
|
||||||
@@ -113,37 +116,10 @@ impl Server {
|
|||||||
port_tx,
|
port_tx,
|
||||||
);
|
);
|
||||||
|
|
||||||
// dns resolver
|
|
||||||
|
|
||||||
// create dns resolver
|
|
||||||
let resolver = dns::DnsResolver::new().await?;
|
|
||||||
let server = self.clone();
|
|
||||||
let mut resolver_task = tokio::task::spawn_local(async move {
|
|
||||||
loop {
|
|
||||||
let (host, client): (String, ClientHandle) = match resolve_rx.recv().await {
|
|
||||||
Some(r) => r,
|
|
||||||
None => break,
|
|
||||||
};
|
|
||||||
let ips = match resolver.resolve(&host).await {
|
|
||||||
Ok(ips) => ips,
|
|
||||||
Err(e) => {
|
|
||||||
log::warn!("could not resolve host '{host}': {e}");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if let Some(state) = server.client_manager.borrow_mut().get_mut(client) {
|
|
||||||
let mut addrs = HashSet::from_iter(state.client.fix_ips.iter().cloned());
|
|
||||||
for ip in ips {
|
|
||||||
addrs.insert(ip);
|
|
||||||
}
|
|
||||||
state.client.ips = addrs;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// bind the udp socket
|
// bind the udp socket
|
||||||
let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), self.port.get());
|
let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), self.port.get());
|
||||||
let mut socket = UdpSocket::bind(listen_addr).await?;
|
let mut socket = UdpSocket::bind(listen_addr).await?;
|
||||||
|
let server = self.clone();
|
||||||
// udp task
|
// udp task
|
||||||
let mut udp_task = tokio::task::spawn_local(async move {
|
let mut udp_task = tokio::task::spawn_local(async move {
|
||||||
loop {
|
loop {
|
||||||
@@ -318,7 +294,7 @@ impl Server {
|
|||||||
.send(FrontendEvent::ActivateClient(handle, true))
|
.send(FrontendEvent::ActivateClient(handle, true))
|
||||||
.await?;
|
.await?;
|
||||||
if let Some(hostname) = hostname {
|
if let Some(hostname) = hostname {
|
||||||
let _ = resolve_tx.send((hostname, handle)).await;
|
let _ = resolve_tx.send(DnsRequest { hostname, handle }).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -17,14 +17,16 @@ use crate::{
|
|||||||
frontend::{self, FrontendEvent, FrontendListener, FrontendNotify},
|
frontend::{self, FrontendEvent, FrontendListener, FrontendNotify},
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::{consumer_task::ConsumerEvent, producer_task::ProducerEvent, Server};
|
use super::{
|
||||||
|
consumer_task::ConsumerEvent, producer_task::ProducerEvent, resolver_task::DnsRequest, Server,
|
||||||
|
};
|
||||||
|
|
||||||
pub(crate) fn new(
|
pub(crate) fn new(
|
||||||
mut frontend: FrontendListener,
|
mut frontend: FrontendListener,
|
||||||
server: Server,
|
server: Server,
|
||||||
producer_notify: Sender<ProducerEvent>,
|
producer_notify: Sender<ProducerEvent>,
|
||||||
consumer_notify: Sender<ConsumerEvent>,
|
consumer_notify: Sender<ConsumerEvent>,
|
||||||
resolve_ch: Sender<(String, u32)>,
|
resolve_ch: Sender<DnsRequest>,
|
||||||
port_tx: Sender<u16>,
|
port_tx: Sender<u16>,
|
||||||
) -> (
|
) -> (
|
||||||
JoinHandle<Result<()>>,
|
JoinHandle<Result<()>>,
|
||||||
@@ -98,7 +100,7 @@ async fn handle_frontend_event(
|
|||||||
server: &Server,
|
server: &Server,
|
||||||
producer_tx: &Sender<ProducerEvent>,
|
producer_tx: &Sender<ProducerEvent>,
|
||||||
consumer_tx: &Sender<ConsumerEvent>,
|
consumer_tx: &Sender<ConsumerEvent>,
|
||||||
resolve_tx: &Sender<(String, ClientHandle)>,
|
resolve_tx: &Sender<DnsRequest>,
|
||||||
frontend: &mut FrontendListener,
|
frontend: &mut FrontendListener,
|
||||||
port_tx: &Sender<u16>,
|
port_tx: &Sender<u16>,
|
||||||
event: FrontendEvent,
|
event: FrontendEvent,
|
||||||
@@ -173,7 +175,7 @@ async fn handle_frontend_event(
|
|||||||
|
|
||||||
pub async fn add_client(
|
pub async fn add_client(
|
||||||
server: &Server,
|
server: &Server,
|
||||||
resolver_tx: &Sender<(String, ClientHandle)>,
|
resolver_tx: &Sender<DnsRequest>,
|
||||||
hostname: Option<String>,
|
hostname: Option<String>,
|
||||||
addr: HashSet<IpAddr>,
|
addr: HashSet<IpAddr>,
|
||||||
port: u16,
|
port: u16,
|
||||||
@@ -194,7 +196,7 @@ pub async fn add_client(
|
|||||||
log::debug!("add_client {handle}");
|
log::debug!("add_client {handle}");
|
||||||
|
|
||||||
if let Some(hostname) = hostname {
|
if let Some(hostname) = hostname {
|
||||||
let _ = resolver_tx.send((hostname, handle)).await;
|
let _ = resolver_tx.send(DnsRequest { hostname, handle }).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
handle
|
handle
|
||||||
@@ -266,7 +268,7 @@ async fn update_client(
|
|||||||
server: &Server,
|
server: &Server,
|
||||||
producer_notify_tx: &Sender<ProducerEvent>,
|
producer_notify_tx: &Sender<ProducerEvent>,
|
||||||
consumer_notify_tx: &Sender<ConsumerEvent>,
|
consumer_notify_tx: &Sender<ConsumerEvent>,
|
||||||
resolve_tx: &Sender<(String, ClientHandle)>,
|
resolve_tx: &Sender<DnsRequest>,
|
||||||
client_update: (ClientHandle, Option<String>, u16, Position),
|
client_update: (ClientHandle, Option<String>, u16, Position),
|
||||||
) {
|
) {
|
||||||
let (handle, hostname, port, pos) = client_update;
|
let (handle, hostname, port, pos) = client_update;
|
||||||
@@ -303,7 +305,7 @@ async fn update_client(
|
|||||||
|
|
||||||
// resolve dns
|
// resolve dns
|
||||||
if let Some(hostname) = hostname {
|
if let Some(hostname) = hostname {
|
||||||
let _ = resolve_tx.send((hostname, handle)).await;
|
let _ = resolve_tx.send(DnsRequest { hostname, handle }).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// update state in event consumer & producer
|
// update state in event consumer & producer
|
||||||
|
|||||||
40
src/server/resolver_task.rs
Normal file
40
src/server/resolver_task.rs
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
use std::collections::HashSet;
|
||||||
|
|
||||||
|
use tokio::{sync::mpsc::Sender, task::JoinHandle};
|
||||||
|
|
||||||
|
use crate::{client::ClientHandle, dns::DnsResolver};
|
||||||
|
|
||||||
|
use super::Server;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct DnsRequest {
|
||||||
|
pub hostname: String,
|
||||||
|
pub handle: ClientHandle,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new(resolver: DnsResolver, server: Server) -> (JoinHandle<()>, Sender<DnsRequest>) {
|
||||||
|
let (dns_tx, mut dns_rx) = tokio::sync::mpsc::channel::<DnsRequest>(32);
|
||||||
|
let resolver_task = tokio::task::spawn_local(async move {
|
||||||
|
loop {
|
||||||
|
let (host, handle) = match dns_rx.recv().await {
|
||||||
|
Some(r) => (r.hostname, r.handle),
|
||||||
|
None => break,
|
||||||
|
};
|
||||||
|
let ips = match resolver.resolve(&host).await {
|
||||||
|
Ok(ips) => ips,
|
||||||
|
Err(e) => {
|
||||||
|
log::warn!("could not resolve host '{host}': {e}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Some(state) = server.client_manager.borrow_mut().get_mut(handle) {
|
||||||
|
let mut addrs = HashSet::from_iter(state.client.fix_ips.iter().cloned());
|
||||||
|
for ip in ips {
|
||||||
|
addrs.insert(ip);
|
||||||
|
}
|
||||||
|
state.client.ips = addrs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
(resolver_task, dns_tx)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user