From 7186ae40b10d4278e846c9b71fdb7f3400224b3b Mon Sep 17 00:00:00 2001 From: Ferdinand Schober Date: Sat, 7 Sep 2024 11:57:32 +0200 Subject: [PATCH] resolver: encapsulate channel logic --- src/dns.rs | 43 ++++++++++++++++++++++++------------------- src/server.rs | 21 ++++++++------------- 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/src/dns.rs b/src/dns.rs index 496b79d..0b923d5 100644 --- a/src/dns.rs +++ b/src/dns.rs @@ -1,5 +1,5 @@ -use local_channel::mpsc::Receiver; -use std::net::IpAddr; +use local_channel::mpsc::{channel, Receiver, Sender}; +use tokio::task::{JoinHandle, spawn_local}; use hickory_resolver::{error::ResolveError, TokioAsyncResolver}; @@ -7,37 +7,35 @@ use crate::server::Server; use lan_mouse_ipc::ClientHandle; pub(crate) struct DnsResolver { - resolver: TokioAsyncResolver, - dns_request: Receiver, + _task: JoinHandle<()>, + tx: Sender, } impl DnsResolver { - pub(crate) fn new(dns_request: Receiver) -> Result { + pub(crate) fn new(server: Server) -> Result { let resolver = TokioAsyncResolver::tokio_from_system_conf()?; + let (tx, rx) = channel(); + let _task = spawn_local(Self::run(server, resolver, rx)); Ok(Self { - resolver, - dns_request, + _task, + tx, }) } - async fn resolve(&self, host: &str) -> Result, ResolveError> { - let response = self.resolver.lookup_ip(host).await?; - for ip in response.iter() { - log::info!("{host}: adding ip {ip}"); - } - Ok(response.iter().collect()) + pub(crate) fn resolve(&self, host: ClientHandle) { + self.tx.send(host).expect("channel closed"); } - pub(crate) async fn run(mut self, server: Server) { + async fn run(server: Server, resolver: TokioAsyncResolver, mut rx: Receiver) { tokio::select! { _ = server.cancelled() => {}, - _ = self.do_dns(&server) => {}, + _ = Self::do_dns(&server, &resolver, &mut rx) => {}, } } - async fn do_dns(&mut self, server: &Server) { + async fn do_dns(server: &Server, resolver: &TokioAsyncResolver, rx: &mut Receiver) { loop { - let handle = self.dns_request.recv().await.expect("channel closed"); + let handle = rx.recv().await.expect("channel closed"); /* update resolving status */ let hostname = match server.get_hostname(handle) { @@ -48,8 +46,15 @@ impl DnsResolver { log::info!("resolving ({handle}) `{hostname}` ..."); server.set_resolving(handle, true); - let ips = match self.resolve(&hostname).await { - Ok(ips) => ips, + /* resolve host */ + let ips = match resolver.lookup_ip(&hostname).await { + Ok(response) => { + let ips = response.iter().collect::>(); + for ip in ips.iter() { + log::info!("{hostname}: adding ip {ip}"); + } + ips + } Err(e) => { log::warn!("could not resolve host '{hostname}': {e}"); vec![] diff --git a/src/server.rs b/src/server.rs index a5b7057..40de551 100644 --- a/src/server.rs +++ b/src/server.rs @@ -13,7 +13,6 @@ use lan_mouse_ipc::{ AsyncFrontendListener, ClientConfig, ClientHandle, ClientState, FrontendEvent, FrontendRequest, IpcListenerCreationError, Position, Status, }; -use local_channel::mpsc::{channel, Sender}; use log; use std::{ cell::{Cell, RefCell}, @@ -23,7 +22,7 @@ use std::{ rc::Rc, }; use thiserror::Error; -use tokio::{join, signal, sync::Notify}; +use tokio::{signal, sync::Notify}; use tokio_util::sync::CancellationToken; #[derive(Debug, Error)] @@ -116,8 +115,6 @@ impl Server { e => e?, }; - let (dns_tx, dns_rx) = channel(); /* dns requests */ - // listener + connection let listener = LanMouseListener::new(self.config.port).await?; let conn = LanMouseConnection::new(self.clone()); @@ -127,11 +124,10 @@ impl Server { let _emulation = Emulation::new(self.clone(), listener); // create dns resolver - let resolver = DnsResolver::new(dns_rx)?; - let dns_task = tokio::task::spawn_local(resolver.run(self.clone())); + let resolver = DnsResolver::new(self.clone())?; for handle in self.active_clients() { - dns_tx.send(handle).expect("channel closed"); + resolver.resolve(handle); } loop { @@ -146,7 +142,7 @@ impl Server { None => break, }; log::debug!("received frontend request: {request:?}"); - self.handle_request(&capture, request, &dns_tx); + self.handle_request(&capture, request, &resolver); log::debug!("handled frontend request"); } _ = self.notifies.frontend_event_pending.notified() => { @@ -169,7 +165,6 @@ impl Server { log::info!("terminating service"); self.cancel(); - let _ = join!(dns_task); Ok(()) } @@ -233,7 +228,7 @@ impl Server { &self, capture: &Capture, event: FrontendRequest, - dns: &Sender, + dns: &DnsResolver, ) -> bool { log::debug!("frontend: {event:?}"); match event { @@ -264,7 +259,7 @@ impl Server { FrontendRequest::UpdatePosition(handle, pos) => { self.update_pos(handle, capture, pos); } - FrontendRequest::ResolveDns(handle) => dns.send(handle).expect("channel closed"), + FrontendRequest::ResolveDns(handle) => dns.resolve(handle), FrontendRequest::Sync => { self.enumerate(); self.notify_frontend(FrontendEvent::EmulationStatus(self.emulation_status.get())); @@ -395,7 +390,7 @@ impl Server { &self, handle: ClientHandle, hostname: Option, - dns: &Sender, + dns: &DnsResolver, ) { let mut client_manager = self.client_manager.borrow_mut(); let Some((c, s)) = client_manager.get_mut(handle) else { @@ -409,7 +404,7 @@ impl Server { s.dns_ips.clear(); drop(client_manager); self.update_ips(handle); - dns.send(handle).expect("channel closed"); + dns.resolve(handle); } self.client_updated(handle); }