resolver: encapsulate channel logic

This commit is contained in:
Ferdinand Schober
2024-09-07 11:57:32 +02:00
parent 0bf0173971
commit 7186ae40b1
2 changed files with 32 additions and 32 deletions

View File

@@ -1,5 +1,5 @@
use local_channel::mpsc::Receiver; use local_channel::mpsc::{channel, Receiver, Sender};
use std::net::IpAddr; use tokio::task::{JoinHandle, spawn_local};
use hickory_resolver::{error::ResolveError, TokioAsyncResolver}; use hickory_resolver::{error::ResolveError, TokioAsyncResolver};
@@ -7,37 +7,35 @@ use crate::server::Server;
use lan_mouse_ipc::ClientHandle; use lan_mouse_ipc::ClientHandle;
pub(crate) struct DnsResolver { pub(crate) struct DnsResolver {
resolver: TokioAsyncResolver, _task: JoinHandle<()>,
dns_request: Receiver<ClientHandle>, tx: Sender<ClientHandle>,
} }
impl DnsResolver { impl DnsResolver {
pub(crate) fn new(dns_request: Receiver<ClientHandle>) -> Result<Self, ResolveError> { pub(crate) fn new(server: Server) -> Result<Self, ResolveError> {
let resolver = TokioAsyncResolver::tokio_from_system_conf()?; let resolver = TokioAsyncResolver::tokio_from_system_conf()?;
let (tx, rx) = channel();
let _task = spawn_local(Self::run(server, resolver, rx));
Ok(Self { Ok(Self {
resolver, _task,
dns_request, tx,
}) })
} }
async fn resolve(&self, host: &str) -> Result<Vec<IpAddr>, ResolveError> { pub(crate) fn resolve(&self, host: ClientHandle) {
let response = self.resolver.lookup_ip(host).await?; self.tx.send(host).expect("channel closed");
for ip in response.iter() {
log::info!("{host}: adding ip {ip}");
}
Ok(response.iter().collect())
} }
pub(crate) async fn run(mut self, server: Server) { async fn run(server: Server, resolver: TokioAsyncResolver, mut rx: Receiver<ClientHandle>) {
tokio::select! { tokio::select! {
_ = server.cancelled() => {}, _ = server.cancelled() => {},
_ = self.do_dns(&server) => {}, _ = Self::do_dns(&server, &resolver, &mut rx) => {},
} }
} }
async fn do_dns(&mut self, server: &Server) { async fn do_dns(server: &Server, resolver: &TokioAsyncResolver, rx: &mut Receiver<ClientHandle>) {
loop { loop {
let handle = self.dns_request.recv().await.expect("channel closed"); let handle = rx.recv().await.expect("channel closed");
/* update resolving status */ /* update resolving status */
let hostname = match server.get_hostname(handle) { let hostname = match server.get_hostname(handle) {
@@ -48,8 +46,15 @@ impl DnsResolver {
log::info!("resolving ({handle}) `{hostname}` ..."); log::info!("resolving ({handle}) `{hostname}` ...");
server.set_resolving(handle, true); server.set_resolving(handle, true);
let ips = match self.resolve(&hostname).await { /* resolve host */
Ok(ips) => ips, let ips = match resolver.lookup_ip(&hostname).await {
Ok(response) => {
let ips = response.iter().collect::<Vec<_>>();
for ip in ips.iter() {
log::info!("{hostname}: adding ip {ip}");
}
ips
}
Err(e) => { Err(e) => {
log::warn!("could not resolve host '{hostname}': {e}"); log::warn!("could not resolve host '{hostname}': {e}");
vec![] vec![]

View File

@@ -13,7 +13,6 @@ use lan_mouse_ipc::{
AsyncFrontendListener, ClientConfig, ClientHandle, ClientState, FrontendEvent, FrontendRequest, AsyncFrontendListener, ClientConfig, ClientHandle, ClientState, FrontendEvent, FrontendRequest,
IpcListenerCreationError, Position, Status, IpcListenerCreationError, Position, Status,
}; };
use local_channel::mpsc::{channel, Sender};
use log; use log;
use std::{ use std::{
cell::{Cell, RefCell}, cell::{Cell, RefCell},
@@ -23,7 +22,7 @@ use std::{
rc::Rc, rc::Rc,
}; };
use thiserror::Error; use thiserror::Error;
use tokio::{join, signal, sync::Notify}; use tokio::{signal, sync::Notify};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
#[derive(Debug, Error)] #[derive(Debug, Error)]
@@ -116,8 +115,6 @@ impl Server {
e => e?, e => e?,
}; };
let (dns_tx, dns_rx) = channel(); /* dns requests */
// listener + connection // listener + connection
let listener = LanMouseListener::new(self.config.port).await?; let listener = LanMouseListener::new(self.config.port).await?;
let conn = LanMouseConnection::new(self.clone()); let conn = LanMouseConnection::new(self.clone());
@@ -127,11 +124,10 @@ impl Server {
let _emulation = Emulation::new(self.clone(), listener); let _emulation = Emulation::new(self.clone(), listener);
// create dns resolver // create dns resolver
let resolver = DnsResolver::new(dns_rx)?; let resolver = DnsResolver::new(self.clone())?;
let dns_task = tokio::task::spawn_local(resolver.run(self.clone()));
for handle in self.active_clients() { for handle in self.active_clients() {
dns_tx.send(handle).expect("channel closed"); resolver.resolve(handle);
} }
loop { loop {
@@ -146,7 +142,7 @@ impl Server {
None => break, None => break,
}; };
log::debug!("received frontend request: {request:?}"); log::debug!("received frontend request: {request:?}");
self.handle_request(&capture, request, &dns_tx); self.handle_request(&capture, request, &resolver);
log::debug!("handled frontend request"); log::debug!("handled frontend request");
} }
_ = self.notifies.frontend_event_pending.notified() => { _ = self.notifies.frontend_event_pending.notified() => {
@@ -169,7 +165,6 @@ impl Server {
log::info!("terminating service"); log::info!("terminating service");
self.cancel(); self.cancel();
let _ = join!(dns_task);
Ok(()) Ok(())
} }
@@ -233,7 +228,7 @@ impl Server {
&self, &self,
capture: &Capture, capture: &Capture,
event: FrontendRequest, event: FrontendRequest,
dns: &Sender<ClientHandle>, dns: &DnsResolver,
) -> bool { ) -> bool {
log::debug!("frontend: {event:?}"); log::debug!("frontend: {event:?}");
match event { match event {
@@ -264,7 +259,7 @@ impl Server {
FrontendRequest::UpdatePosition(handle, pos) => { FrontendRequest::UpdatePosition(handle, pos) => {
self.update_pos(handle, capture, pos); self.update_pos(handle, capture, pos);
} }
FrontendRequest::ResolveDns(handle) => dns.send(handle).expect("channel closed"), FrontendRequest::ResolveDns(handle) => dns.resolve(handle),
FrontendRequest::Sync => { FrontendRequest::Sync => {
self.enumerate(); self.enumerate();
self.notify_frontend(FrontendEvent::EmulationStatus(self.emulation_status.get())); self.notify_frontend(FrontendEvent::EmulationStatus(self.emulation_status.get()));
@@ -395,7 +390,7 @@ impl Server {
&self, &self,
handle: ClientHandle, handle: ClientHandle,
hostname: Option<String>, hostname: Option<String>,
dns: &Sender<ClientHandle>, dns: &DnsResolver,
) { ) {
let mut client_manager = self.client_manager.borrow_mut(); let mut client_manager = self.client_manager.borrow_mut();
let Some((c, s)) = client_manager.get_mut(handle) else { let Some((c, s)) = client_manager.get_mut(handle) else {
@@ -409,7 +404,7 @@ impl Server {
s.dns_ips.clear(); s.dns_ips.clear();
drop(client_manager); drop(client_manager);
self.update_ips(handle); self.update_ips(handle);
dns.send(handle).expect("channel closed"); dns.resolve(handle);
} }
self.client_updated(handle); self.client_updated(handle);
} }