mirror of
https://github.com/feschber/lan-mouse.git
synced 2026-03-12 15:50:55 +03:00
Allow input capture & emulation being disabled (#158)
* Input capture and emulation can now be disabled and will prompt the user to enable again. * Improved error handling to deliver more useful error messages
This commit is contained in:
committed by
GitHub
parent
55bdf1e63e
commit
bea7d6f8a5
@@ -1,70 +1,122 @@
|
||||
use anyhow::{anyhow, Result};
|
||||
use futures::StreamExt;
|
||||
use std::{collections::HashSet, net::SocketAddr};
|
||||
|
||||
use tokio::{process::Command, sync::mpsc::Sender, task::JoinHandle};
|
||||
use tokio::{
|
||||
process::Command,
|
||||
sync::mpsc::{Receiver, Sender},
|
||||
task::JoinHandle,
|
||||
};
|
||||
|
||||
use input_capture::{self, error::CaptureCreationError, CaptureHandle, InputCapture, Position};
|
||||
use input_capture::{self, CaptureError, CaptureHandle, InputCapture, InputCaptureError, Position};
|
||||
|
||||
use input_event::{scancode, Event, KeyboardEvent};
|
||||
|
||||
use crate::{client::ClientHandle, config::CaptureBackend, server::State};
|
||||
use crate::{client::ClientHandle, frontend::Status, server::State};
|
||||
|
||||
use super::Server;
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub enum CaptureEvent {
|
||||
pub(crate) enum CaptureEvent {
|
||||
/// capture must release the mouse
|
||||
Release,
|
||||
/// add a capture client
|
||||
Create(CaptureHandle, Position),
|
||||
/// destory a capture client
|
||||
Destroy(CaptureHandle),
|
||||
/// termination signal
|
||||
Terminate,
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
backend: Option<CaptureBackend>,
|
||||
pub(crate) fn new(
|
||||
server: Server,
|
||||
capture_rx: Receiver<CaptureEvent>,
|
||||
udp_send: Sender<(Event, SocketAddr)>,
|
||||
) -> JoinHandle<()> {
|
||||
let backend = server.config.capture_backend.map(|b| b.into());
|
||||
tokio::task::spawn_local(capture_task(server, backend, udp_send, capture_rx))
|
||||
}
|
||||
|
||||
async fn capture_task(
|
||||
server: Server,
|
||||
backend: Option<input_capture::Backend>,
|
||||
sender_tx: Sender<(Event, SocketAddr)>,
|
||||
timer_tx: Sender<()>,
|
||||
release_bind: Vec<scancode::Linux>,
|
||||
) -> Result<(JoinHandle<Result<()>>, Sender<CaptureEvent>), CaptureCreationError> {
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel(32);
|
||||
let backend = backend.map(|b| b.into());
|
||||
let task = tokio::task::spawn_local(async move {
|
||||
let mut capture = input_capture::create(backend).await?;
|
||||
let mut pressed_keys = HashSet::new();
|
||||
mut notify_rx: Receiver<CaptureEvent>,
|
||||
) {
|
||||
loop {
|
||||
if let Err(e) = do_capture(backend, &server, &sender_tx, &mut notify_rx).await {
|
||||
log::warn!("input capture exited: {e}");
|
||||
}
|
||||
server.set_capture_status(Status::Disabled);
|
||||
if server.is_cancelled() {
|
||||
break;
|
||||
}
|
||||
|
||||
// allow cancellation
|
||||
loop {
|
||||
tokio::select! {
|
||||
event = capture.next() => {
|
||||
match event {
|
||||
Some(Ok(event)) => handle_capture_event(&server, &mut capture, &sender_tx, &timer_tx, event, &mut pressed_keys, &release_bind).await?,
|
||||
Some(Err(e)) => return Err(anyhow!("input capture: {e:?}")),
|
||||
None => return Err(anyhow!("input capture terminated")),
|
||||
}
|
||||
}
|
||||
e = rx.recv() => {
|
||||
log::debug!("input capture notify rx: {e:?}");
|
||||
match e {
|
||||
Some(e) => match e {
|
||||
CaptureEvent::Release => {
|
||||
capture.release()?;
|
||||
server.state.replace(State::Receiving);
|
||||
}
|
||||
CaptureEvent::Create(h, p) => capture.create(h, p)?,
|
||||
CaptureEvent::Destroy(h) => capture.destroy(h)?,
|
||||
CaptureEvent::Terminate => break,
|
||||
},
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
_ = notify_rx.recv() => continue, /* need to ignore requests here! */
|
||||
_ = server.capture_notified() => break,
|
||||
_ = server.cancelled() => return,
|
||||
}
|
||||
}
|
||||
anyhow::Ok(())
|
||||
});
|
||||
Ok((task, tx))
|
||||
}
|
||||
}
|
||||
|
||||
async fn do_capture(
|
||||
backend: Option<input_capture::Backend>,
|
||||
server: &Server,
|
||||
sender_tx: &Sender<(Event, SocketAddr)>,
|
||||
notify_rx: &mut Receiver<CaptureEvent>,
|
||||
) -> Result<(), InputCaptureError> {
|
||||
/* allow cancelling capture request */
|
||||
let mut capture = tokio::select! {
|
||||
r = input_capture::create(backend) => {
|
||||
r?
|
||||
},
|
||||
_ = server.cancelled() => return Ok(()),
|
||||
};
|
||||
|
||||
server.set_capture_status(Status::Enabled);
|
||||
|
||||
// FIXME DUPLICATES
|
||||
let clients = server
|
||||
.client_manager
|
||||
.borrow()
|
||||
.get_client_states()
|
||||
.map(|(h, s)| (h, s.clone()))
|
||||
.collect::<Vec<_>>();
|
||||
log::info!("{clients:?}");
|
||||
for (handle, (config, _state)) in clients {
|
||||
capture.create(handle, config.pos.into()).await?;
|
||||
}
|
||||
|
||||
let mut pressed_keys = HashSet::new();
|
||||
loop {
|
||||
tokio::select! {
|
||||
event = capture.next() => {
|
||||
match event {
|
||||
Some(Ok(event)) => handle_capture_event(server, &mut capture, sender_tx, event, &mut pressed_keys).await?,
|
||||
Some(Err(e)) => return Err(e.into()),
|
||||
None => return Ok(()),
|
||||
}
|
||||
}
|
||||
e = notify_rx.recv() => {
|
||||
log::debug!("input capture notify rx: {e:?}");
|
||||
match e {
|
||||
Some(e) => match e {
|
||||
CaptureEvent::Release => {
|
||||
capture.release().await?;
|
||||
server.state.replace(State::Receiving);
|
||||
}
|
||||
CaptureEvent::Create(h, p) => capture.create(h, p).await?,
|
||||
CaptureEvent::Destroy(h) => capture.destroy(h).await?,
|
||||
},
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
_ = server.cancelled() => break,
|
||||
}
|
||||
}
|
||||
capture.terminate().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn update_pressed_keys(pressed_keys: &mut HashSet<scancode::Linux>, key: u32, state: u8) {
|
||||
@@ -81,21 +133,19 @@ async fn handle_capture_event(
|
||||
server: &Server,
|
||||
capture: &mut Box<dyn InputCapture>,
|
||||
sender_tx: &Sender<(Event, SocketAddr)>,
|
||||
timer_tx: &Sender<()>,
|
||||
event: (CaptureHandle, Event),
|
||||
pressed_keys: &mut HashSet<scancode::Linux>,
|
||||
release_bind: &[scancode::Linux],
|
||||
) -> Result<()> {
|
||||
) -> Result<(), CaptureError> {
|
||||
let (handle, mut e) = event;
|
||||
log::trace!("({handle}) {e:?}");
|
||||
|
||||
if let Event::Keyboard(KeyboardEvent::Key { key, state, .. }) = e {
|
||||
update_pressed_keys(pressed_keys, key, state);
|
||||
log::debug!("{pressed_keys:?}");
|
||||
if release_bind.iter().all(|k| pressed_keys.contains(k)) {
|
||||
if server.release_bind.iter().all(|k| pressed_keys.contains(k)) {
|
||||
pressed_keys.clear();
|
||||
log::info!("releasing pointer");
|
||||
capture.release()?;
|
||||
capture.release().await?;
|
||||
server.state.replace(State::Receiving);
|
||||
log::trace!("STATE ===> Receiving");
|
||||
// send an event to release all the modifiers
|
||||
@@ -103,44 +153,49 @@ async fn handle_capture_event(
|
||||
}
|
||||
}
|
||||
|
||||
let (addr, enter, start_timer) = {
|
||||
let info = {
|
||||
let mut enter = false;
|
||||
let mut start_timer = false;
|
||||
|
||||
// get client state for handle
|
||||
let mut client_manager = server.client_manager.borrow_mut();
|
||||
let client_state = match client_manager.get_mut(handle) {
|
||||
Some((_, s)) => s,
|
||||
None => {
|
||||
// should not happen
|
||||
log::warn!("unknown client!");
|
||||
capture.release()?;
|
||||
server.state.replace(State::Receiving);
|
||||
log::trace!("STATE ===> Receiving");
|
||||
return Ok(());
|
||||
let client_state = client_manager.get_mut(handle).map(|(_, s)| s);
|
||||
if let Some(client_state) = client_state {
|
||||
// if we just entered the client we want to send additional enter events until
|
||||
// we get a leave event
|
||||
if let Event::Enter() = e {
|
||||
server.state.replace(State::AwaitingLeave);
|
||||
server.active_client.replace(Some(handle));
|
||||
log::trace!("Active client => {}", handle);
|
||||
start_timer = true;
|
||||
log::trace!("STATE ===> AwaitingLeave");
|
||||
enter = true;
|
||||
} else {
|
||||
// ignore any potential events in receiving mode
|
||||
if server.state.get() == State::Receiving && e != Event::Disconnect() {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// if we just entered the client we want to send additional enter events until
|
||||
// we get a leave event
|
||||
if let Event::Enter() = e {
|
||||
server.state.replace(State::AwaitingLeave);
|
||||
server.active_client.replace(Some(handle));
|
||||
log::trace!("Active client => {}", handle);
|
||||
start_timer = true;
|
||||
log::trace!("STATE ===> AwaitingLeave");
|
||||
enter = true;
|
||||
Some((client_state.active_addr, enter, start_timer))
|
||||
} else {
|
||||
// ignore any potential events in receiving mode
|
||||
if server.state.get() == State::Receiving && e != Event::Disconnect() {
|
||||
return Ok(());
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
(client_state.active_addr, enter, start_timer)
|
||||
};
|
||||
|
||||
let (addr, enter, start_timer) = match info {
|
||||
Some(i) => i,
|
||||
None => {
|
||||
// should not happen
|
||||
log::warn!("unknown client!");
|
||||
capture.release().await?;
|
||||
server.state.replace(State::Receiving);
|
||||
log::trace!("STATE ===> Receiving");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
if start_timer {
|
||||
let _ = timer_tx.try_send(());
|
||||
server.restart_ping_timer();
|
||||
}
|
||||
if enter {
|
||||
spawn_hook_command(server, handle);
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use thiserror::Error;
|
||||
use tokio::{
|
||||
sync::mpsc::{Receiver, Sender},
|
||||
task::JoinHandle,
|
||||
@@ -8,102 +7,130 @@ use tokio::{
|
||||
|
||||
use crate::{
|
||||
client::{ClientHandle, ClientManager},
|
||||
config::EmulationBackend,
|
||||
frontend::Status,
|
||||
server::State,
|
||||
};
|
||||
use input_emulation::{
|
||||
self,
|
||||
error::{EmulationCreationError, EmulationError},
|
||||
EmulationHandle, InputEmulation,
|
||||
};
|
||||
use input_emulation::{self, EmulationError, EmulationHandle, InputEmulation, InputEmulationError};
|
||||
use input_event::{Event, KeyboardEvent};
|
||||
|
||||
use super::{network_task::NetworkError, CaptureEvent, Server};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum EmulationEvent {
|
||||
pub(crate) enum EmulationEvent {
|
||||
/// create a new client
|
||||
Create(EmulationHandle),
|
||||
/// destroy a client
|
||||
Destroy(EmulationHandle),
|
||||
/// input emulation must release keys for client
|
||||
ReleaseKeys(ClientHandle),
|
||||
/// termination signal
|
||||
Terminate,
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
backend: Option<EmulationBackend>,
|
||||
pub(crate) fn new(
|
||||
server: Server,
|
||||
emulation_rx: Receiver<EmulationEvent>,
|
||||
udp_rx: Receiver<Result<(Event, SocketAddr), NetworkError>>,
|
||||
sender_tx: Sender<(Event, SocketAddr)>,
|
||||
capture_tx: Sender<CaptureEvent>,
|
||||
timer_tx: Sender<()>,
|
||||
) -> (
|
||||
JoinHandle<Result<(), LanMouseEmulationError>>,
|
||||
Sender<EmulationEvent>,
|
||||
) {
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(32);
|
||||
let emulation_task =
|
||||
emulation_task(backend, rx, server, udp_rx, sender_tx, capture_tx, timer_tx);
|
||||
let emulate_task = tokio::task::spawn_local(emulation_task);
|
||||
(emulate_task, tx)
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum LanMouseEmulationError {
|
||||
#[error("error creating input-emulation: `{0}`")]
|
||||
Create(#[from] EmulationCreationError),
|
||||
#[error("error emulating input: `{0}`")]
|
||||
Emulate(#[from] EmulationError),
|
||||
) -> JoinHandle<()> {
|
||||
let emulation_task = emulation_task(server, emulation_rx, udp_rx, sender_tx, capture_tx);
|
||||
tokio::task::spawn_local(emulation_task)
|
||||
}
|
||||
|
||||
async fn emulation_task(
|
||||
backend: Option<EmulationBackend>,
|
||||
mut rx: Receiver<EmulationEvent>,
|
||||
server: Server,
|
||||
mut rx: Receiver<EmulationEvent>,
|
||||
mut udp_rx: Receiver<Result<(Event, SocketAddr), NetworkError>>,
|
||||
sender_tx: Sender<(Event, SocketAddr)>,
|
||||
capture_tx: Sender<CaptureEvent>,
|
||||
timer_tx: Sender<()>,
|
||||
) -> Result<(), LanMouseEmulationError> {
|
||||
let backend = backend.map(|b| b.into());
|
||||
let mut emulation = input_emulation::create(backend).await?;
|
||||
|
||||
let mut last_ignored = None;
|
||||
) {
|
||||
loop {
|
||||
tokio::select! {
|
||||
udp_event = udp_rx.recv() => {
|
||||
let udp_event = match udp_event {
|
||||
Some(Ok(e)) => e,
|
||||
Some(Err(e)) => {
|
||||
log::warn!("network error: {e}");
|
||||
continue;
|
||||
}
|
||||
None => break,
|
||||
};
|
||||
handle_udp_rx(&server, &capture_tx, &mut emulation, &sender_tx, &mut last_ignored, udp_event, &timer_tx).await?;
|
||||
}
|
||||
emulate_event = rx.recv() => {
|
||||
match emulate_event {
|
||||
Some(e) => match e {
|
||||
EmulationEvent::Create(h) => emulation.create(h).await,
|
||||
EmulationEvent::Destroy(h) => emulation.destroy(h).await,
|
||||
EmulationEvent::ReleaseKeys(c) => release_keys(&server, &mut emulation, c).await?,
|
||||
EmulationEvent::Terminate => break,
|
||||
},
|
||||
None => break,
|
||||
}
|
||||
if let Err(e) = do_emulation(&server, &mut rx, &mut udp_rx, &sender_tx, &capture_tx).await {
|
||||
log::warn!("input emulation exited: {e}");
|
||||
}
|
||||
server.set_emulation_status(Status::Disabled);
|
||||
if server.is_cancelled() {
|
||||
break;
|
||||
}
|
||||
|
||||
// allow cancellation
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = rx.recv() => continue, /* need to ignore requests here! */
|
||||
_ = server.emulation_notified() => break,
|
||||
_ = server.cancelled() => return,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn do_emulation(
|
||||
server: &Server,
|
||||
rx: &mut Receiver<EmulationEvent>,
|
||||
udp_rx: &mut Receiver<Result<(Event, SocketAddr), NetworkError>>,
|
||||
sender_tx: &Sender<(Event, SocketAddr)>,
|
||||
capture_tx: &Sender<CaptureEvent>,
|
||||
) -> Result<(), InputEmulationError> {
|
||||
let backend = server.config.emulation_backend.map(|b| b.into());
|
||||
log::info!("creating input emulation...");
|
||||
let mut emulation = tokio::select! {
|
||||
r = input_emulation::create(backend) => {
|
||||
r?
|
||||
}
|
||||
_ = server.cancelled() => return Ok(()),
|
||||
};
|
||||
|
||||
server.set_emulation_status(Status::Enabled);
|
||||
|
||||
// add clients
|
||||
for handle in server.active_clients() {
|
||||
emulation.create(handle).await;
|
||||
}
|
||||
|
||||
let res = do_emulation_session(server, &mut emulation, rx, udp_rx, sender_tx, capture_tx).await;
|
||||
|
||||
emulation.terminate().await;
|
||||
res?;
|
||||
|
||||
// release potentially still pressed keys
|
||||
release_all_keys(&server, &mut emulation).await?;
|
||||
release_all_keys(server, &mut emulation).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn do_emulation_session(
|
||||
server: &Server,
|
||||
emulation: &mut Box<dyn InputEmulation>,
|
||||
rx: &mut Receiver<EmulationEvent>,
|
||||
udp_rx: &mut Receiver<Result<(Event, SocketAddr), NetworkError>>,
|
||||
sender_tx: &Sender<(Event, SocketAddr)>,
|
||||
capture_tx: &Sender<CaptureEvent>,
|
||||
) -> Result<(), InputEmulationError> {
|
||||
let mut last_ignored = None;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
udp_event = udp_rx.recv() => {
|
||||
let udp_event = match udp_event.expect("channel closed") {
|
||||
Ok(e) => e,
|
||||
Err(e) => {
|
||||
log::warn!("network error: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
handle_udp_rx(server, capture_tx, emulation, sender_tx, &mut last_ignored, udp_event).await?;
|
||||
}
|
||||
emulate_event = rx.recv() => {
|
||||
match emulate_event.expect("channel closed") {
|
||||
EmulationEvent::Create(h) => emulation.create(h).await,
|
||||
EmulationEvent::Destroy(h) => emulation.destroy(h).await,
|
||||
EmulationEvent::ReleaseKeys(c) => release_keys(server, emulation, c).await?,
|
||||
}
|
||||
}
|
||||
_ = server.notifies.cancel.cancelled() => break Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_udp_rx(
|
||||
server: &Server,
|
||||
capture_tx: &Sender<CaptureEvent>,
|
||||
@@ -111,7 +138,6 @@ async fn handle_udp_rx(
|
||||
sender_tx: &Sender<(Event, SocketAddr)>,
|
||||
last_ignored: &mut Option<SocketAddr>,
|
||||
event: (Event, SocketAddr),
|
||||
timer_tx: &Sender<()>,
|
||||
) -> Result<(), EmulationError> {
|
||||
let (event, addr) = event;
|
||||
|
||||
@@ -161,7 +187,7 @@ async fn handle_udp_rx(
|
||||
);
|
||||
// restart timer if necessary
|
||||
if restart_timer {
|
||||
let _ = timer_tx.try_send(());
|
||||
server.restart_ping_timer();
|
||||
}
|
||||
ignore_event
|
||||
} else {
|
||||
|
||||
@@ -1,350 +0,0 @@
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
io::ErrorKind,
|
||||
net::{IpAddr, SocketAddr},
|
||||
};
|
||||
#[cfg(unix)]
|
||||
use tokio::net::UnixStream;
|
||||
|
||||
#[cfg(windows)]
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use tokio::{
|
||||
io::ReadHalf,
|
||||
sync::mpsc::{Receiver, Sender},
|
||||
task::JoinHandle,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
client::{ClientHandle, Position},
|
||||
frontend::{self, FrontendEvent, FrontendListener, FrontendRequest},
|
||||
};
|
||||
|
||||
use super::{
|
||||
capture_task::CaptureEvent, emulation_task::EmulationEvent, resolver_task::DnsRequest, Server,
|
||||
};
|
||||
|
||||
pub(crate) fn new(
|
||||
mut frontend: FrontendListener,
|
||||
mut notify_rx: Receiver<FrontendEvent>,
|
||||
server: Server,
|
||||
capture: Sender<CaptureEvent>,
|
||||
emulate: Sender<EmulationEvent>,
|
||||
resolve_ch: Sender<DnsRequest>,
|
||||
port_tx: Sender<u16>,
|
||||
) -> (JoinHandle<Result<()>>, Sender<FrontendRequest>) {
|
||||
let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(32);
|
||||
let event_tx_clone = event_tx.clone();
|
||||
let frontend_task = tokio::task::spawn_local(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
stream = frontend.accept() => {
|
||||
let stream = match stream {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
log::warn!("error accepting frontend connection: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
handle_frontend_stream(&event_tx_clone, stream).await;
|
||||
}
|
||||
event = event_rx.recv() => {
|
||||
let frontend_event = event.ok_or(anyhow!("frontend channel closed"))?;
|
||||
if handle_frontend_event(&server, &capture, &emulate, &resolve_ch, &mut frontend, &port_tx, frontend_event).await {
|
||||
break;
|
||||
}
|
||||
}
|
||||
notify = notify_rx.recv() => {
|
||||
let notify = notify.ok_or(anyhow!("frontend notify closed"))?;
|
||||
let _ = frontend.broadcast_event(notify).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
anyhow::Ok(())
|
||||
});
|
||||
(frontend_task, event_tx)
|
||||
}
|
||||
|
||||
async fn handle_frontend_stream(
|
||||
frontend_tx: &Sender<FrontendRequest>,
|
||||
#[cfg(unix)] mut stream: ReadHalf<UnixStream>,
|
||||
#[cfg(windows)] mut stream: ReadHalf<TcpStream>,
|
||||
) {
|
||||
use std::io;
|
||||
|
||||
let tx = frontend_tx.clone();
|
||||
tokio::task::spawn_local(async move {
|
||||
loop {
|
||||
let request = frontend::wait_for_request(&mut stream).await;
|
||||
match request {
|
||||
Ok(request) => {
|
||||
let _ = tx.send(request).await;
|
||||
}
|
||||
Err(e) => {
|
||||
if let Some(e) = e.downcast_ref::<io::Error>() {
|
||||
if e.kind() == ErrorKind::UnexpectedEof {
|
||||
return;
|
||||
}
|
||||
}
|
||||
log::error!("error reading frontend event: {e}");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn handle_frontend_event(
|
||||
server: &Server,
|
||||
capture: &Sender<CaptureEvent>,
|
||||
emulate: &Sender<EmulationEvent>,
|
||||
resolve_tx: &Sender<DnsRequest>,
|
||||
frontend: &mut FrontendListener,
|
||||
port_tx: &Sender<u16>,
|
||||
event: FrontendRequest,
|
||||
) -> bool {
|
||||
log::debug!("frontend: {event:?}");
|
||||
match event {
|
||||
FrontendRequest::Create => {
|
||||
let handle = add_client(server, frontend).await;
|
||||
resolve_dns(server, resolve_tx, handle).await;
|
||||
}
|
||||
FrontendRequest::Activate(handle, active) => {
|
||||
if active {
|
||||
activate_client(server, capture, emulate, handle).await;
|
||||
} else {
|
||||
deactivate_client(server, capture, emulate, handle).await;
|
||||
}
|
||||
}
|
||||
FrontendRequest::ChangePort(port) => {
|
||||
let _ = port_tx.send(port).await;
|
||||
}
|
||||
FrontendRequest::Delete(handle) => {
|
||||
remove_client(server, capture, emulate, handle).await;
|
||||
broadcast(frontend, FrontendEvent::Deleted(handle)).await;
|
||||
}
|
||||
FrontendRequest::Enumerate() => {
|
||||
let clients = server
|
||||
.client_manager
|
||||
.borrow()
|
||||
.get_client_states()
|
||||
.map(|(h, (c, s))| (h, c.clone(), s.clone()))
|
||||
.collect();
|
||||
broadcast(frontend, FrontendEvent::Enumerate(clients)).await;
|
||||
}
|
||||
FrontendRequest::GetState(handle) => {
|
||||
broadcast_client(server, frontend, handle).await;
|
||||
}
|
||||
FrontendRequest::Terminate() => {
|
||||
log::info!("terminating gracefully...");
|
||||
return true;
|
||||
}
|
||||
FrontendRequest::UpdateFixIps(handle, fix_ips) => {
|
||||
update_fix_ips(server, handle, fix_ips).await;
|
||||
resolve_dns(server, resolve_tx, handle).await;
|
||||
}
|
||||
FrontendRequest::UpdateHostname(handle, hostname) => {
|
||||
update_hostname(server, resolve_tx, handle, hostname).await;
|
||||
resolve_dns(server, resolve_tx, handle).await;
|
||||
}
|
||||
FrontendRequest::UpdatePort(handle, port) => {
|
||||
update_port(server, handle, port).await;
|
||||
}
|
||||
FrontendRequest::UpdatePosition(handle, pos) => {
|
||||
update_pos(server, handle, capture, emulate, pos).await;
|
||||
}
|
||||
FrontendRequest::ResolveDns(handle) => {
|
||||
resolve_dns(server, resolve_tx, handle).await;
|
||||
}
|
||||
};
|
||||
false
|
||||
}
|
||||
|
||||
async fn resolve_dns(server: &Server, resolve_tx: &Sender<DnsRequest>, handle: ClientHandle) {
|
||||
let hostname = server
|
||||
.client_manager
|
||||
.borrow()
|
||||
.get(handle)
|
||||
.and_then(|(c, _)| c.hostname.clone());
|
||||
if let Some(hostname) = hostname {
|
||||
let _ = resolve_tx
|
||||
.send(DnsRequest {
|
||||
hostname: hostname.clone(),
|
||||
handle,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn broadcast(frontend: &mut FrontendListener, event: FrontendEvent) {
|
||||
if let Err(e) = frontend.broadcast_event(event).await {
|
||||
log::error!("error notifying frontend: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn add_client(server: &Server, frontend: &mut FrontendListener) -> ClientHandle {
|
||||
let handle = server.client_manager.borrow_mut().add_client();
|
||||
log::info!("added client {handle}");
|
||||
|
||||
let (c, s) = server.client_manager.borrow().get(handle).unwrap().clone();
|
||||
broadcast(frontend, FrontendEvent::Created(handle, c, s)).await;
|
||||
handle
|
||||
}
|
||||
|
||||
pub async fn deactivate_client(
|
||||
server: &Server,
|
||||
capture: &Sender<CaptureEvent>,
|
||||
emulate: &Sender<EmulationEvent>,
|
||||
handle: ClientHandle,
|
||||
) {
|
||||
match server.client_manager.borrow_mut().get_mut(handle) {
|
||||
Some((_, s)) => {
|
||||
s.active = false;
|
||||
}
|
||||
None => return,
|
||||
};
|
||||
|
||||
let _ = capture.send(CaptureEvent::Destroy(handle)).await;
|
||||
let _ = emulate.send(EmulationEvent::Destroy(handle)).await;
|
||||
}
|
||||
|
||||
pub async fn activate_client(
|
||||
server: &Server,
|
||||
capture: &Sender<CaptureEvent>,
|
||||
emulate: &Sender<EmulationEvent>,
|
||||
handle: ClientHandle,
|
||||
) {
|
||||
/* deactivate potential other client at this position */
|
||||
let pos = match server.client_manager.borrow().get(handle) {
|
||||
Some((client, _)) => client.pos,
|
||||
None => return,
|
||||
};
|
||||
|
||||
let other = server.client_manager.borrow_mut().find_client(pos);
|
||||
if let Some(other) = other {
|
||||
if other != handle {
|
||||
deactivate_client(server, capture, emulate, other).await;
|
||||
}
|
||||
}
|
||||
|
||||
/* activate the client */
|
||||
if let Some((_, s)) = server.client_manager.borrow_mut().get_mut(handle) {
|
||||
s.active = true;
|
||||
} else {
|
||||
return;
|
||||
};
|
||||
|
||||
/* notify emulation, capture and frontends */
|
||||
let _ = capture.send(CaptureEvent::Create(handle, pos.into())).await;
|
||||
let _ = emulate.send(EmulationEvent::Create(handle)).await;
|
||||
}
|
||||
|
||||
pub async fn remove_client(
|
||||
server: &Server,
|
||||
capture: &Sender<CaptureEvent>,
|
||||
emulate: &Sender<EmulationEvent>,
|
||||
handle: ClientHandle,
|
||||
) {
|
||||
let Some(active) = server
|
||||
.client_manager
|
||||
.borrow_mut()
|
||||
.remove_client(handle)
|
||||
.map(|(_, s)| s.active)
|
||||
else {
|
||||
return;
|
||||
};
|
||||
|
||||
if active {
|
||||
let _ = capture.send(CaptureEvent::Destroy(handle)).await;
|
||||
let _ = emulate.send(EmulationEvent::Destroy(handle)).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn update_fix_ips(server: &Server, handle: ClientHandle, fix_ips: Vec<IpAddr>) {
|
||||
let mut client_manager = server.client_manager.borrow_mut();
|
||||
let Some((c, _)) = client_manager.get_mut(handle) else {
|
||||
return;
|
||||
};
|
||||
|
||||
c.fix_ips = fix_ips;
|
||||
}
|
||||
|
||||
async fn update_hostname(
|
||||
server: &Server,
|
||||
resolve_tx: &Sender<DnsRequest>,
|
||||
handle: ClientHandle,
|
||||
hostname: Option<String>,
|
||||
) {
|
||||
let hostname = {
|
||||
let mut client_manager = server.client_manager.borrow_mut();
|
||||
let Some((c, s)) = client_manager.get_mut(handle) else {
|
||||
return;
|
||||
};
|
||||
|
||||
// update hostname
|
||||
if c.hostname != hostname {
|
||||
c.hostname = hostname;
|
||||
s.ips = HashSet::from_iter(c.fix_ips.iter().cloned());
|
||||
s.active_addr = None;
|
||||
c.hostname.clone()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
// resolve to update ips in state
|
||||
if let Some(hostname) = hostname {
|
||||
let _ = resolve_tx.send(DnsRequest { hostname, handle }).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn update_port(server: &Server, handle: ClientHandle, port: u16) {
|
||||
let mut client_manager = server.client_manager.borrow_mut();
|
||||
let Some((c, s)) = client_manager.get_mut(handle) else {
|
||||
return;
|
||||
};
|
||||
|
||||
if c.port != port {
|
||||
c.port = port;
|
||||
s.active_addr = s.active_addr.map(|a| SocketAddr::new(a.ip(), port));
|
||||
}
|
||||
}
|
||||
|
||||
async fn update_pos(
|
||||
server: &Server,
|
||||
handle: ClientHandle,
|
||||
capture: &Sender<CaptureEvent>,
|
||||
emulate: &Sender<EmulationEvent>,
|
||||
pos: Position,
|
||||
) {
|
||||
let (changed, active) = {
|
||||
let mut client_manager = server.client_manager.borrow_mut();
|
||||
let Some((c, s)) = client_manager.get_mut(handle) else {
|
||||
return;
|
||||
};
|
||||
|
||||
let changed = c.pos != pos;
|
||||
c.pos = pos;
|
||||
(changed, s.active)
|
||||
};
|
||||
|
||||
// update state in event input emulator & input capture
|
||||
if changed {
|
||||
if active {
|
||||
let _ = capture.send(CaptureEvent::Destroy(handle)).await;
|
||||
let _ = emulate.send(EmulationEvent::Destroy(handle)).await;
|
||||
}
|
||||
let _ = capture.send(CaptureEvent::Create(handle, pos.into())).await;
|
||||
let _ = emulate.send(EmulationEvent::Create(handle)).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn broadcast_client(server: &Server, frontend: &mut FrontendListener, handle: ClientHandle) {
|
||||
let client = server.client_manager.borrow().get(handle).cloned();
|
||||
if let Some((config, state)) = client {
|
||||
broadcast(frontend, FrontendEvent::State(handle, config, state)).await;
|
||||
} else {
|
||||
broadcast(frontend, FrontendEvent::NoSuchClient(handle)).await;
|
||||
}
|
||||
}
|
||||
@@ -7,71 +7,57 @@ use tokio::{
|
||||
task::JoinHandle,
|
||||
};
|
||||
|
||||
use crate::frontend::FrontendEvent;
|
||||
use input_event::{Event, ProtocolError};
|
||||
|
||||
use super::Server;
|
||||
|
||||
pub async fn new(
|
||||
pub(crate) async fn new(
|
||||
server: Server,
|
||||
frontend_notify_tx: Sender<FrontendEvent>,
|
||||
) -> io::Result<(
|
||||
JoinHandle<()>,
|
||||
Sender<(Event, SocketAddr)>,
|
||||
Receiver<Result<(Event, SocketAddr), NetworkError>>,
|
||||
Sender<u16>,
|
||||
)> {
|
||||
udp_recv_tx: Sender<Result<(Event, SocketAddr), NetworkError>>,
|
||||
udp_send_rx: Receiver<(Event, SocketAddr)>,
|
||||
) -> io::Result<JoinHandle<()>> {
|
||||
// bind the udp socket
|
||||
let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), server.port.get());
|
||||
let mut socket = UdpSocket::bind(listen_addr).await?;
|
||||
let (receiver_tx, receiver_rx) = tokio::sync::mpsc::channel(32);
|
||||
let (sender_tx, sender_rx) = tokio::sync::mpsc::channel(32);
|
||||
let (port_tx, mut port_rx) = tokio::sync::mpsc::channel(32);
|
||||
|
||||
let udp_task = tokio::task::spawn_local(async move {
|
||||
let mut sender_rx = sender_rx;
|
||||
Ok(tokio::task::spawn_local(async move {
|
||||
let mut sender_rx = udp_send_rx;
|
||||
loop {
|
||||
let udp_receiver = udp_receiver(&socket, &receiver_tx);
|
||||
let udp_receiver = udp_receiver(&socket, &udp_recv_tx);
|
||||
let udp_sender = udp_sender(&socket, &mut sender_rx);
|
||||
tokio::select! {
|
||||
_ = udp_receiver => break, /* channel closed */
|
||||
_ = udp_sender => break, /* channel closed */
|
||||
port = port_rx.recv() => match port {
|
||||
Some(port) => update_port(&server, &frontend_notify_tx, &mut socket, port).await,
|
||||
_ => break,
|
||||
}
|
||||
_ = server.notifies.port_changed.notified() => update_port(&server, &mut socket).await,
|
||||
_ = server.cancelled() => break, /* cancellation requested */
|
||||
}
|
||||
}
|
||||
});
|
||||
Ok((udp_task, sender_tx, receiver_rx, port_tx))
|
||||
}))
|
||||
}
|
||||
|
||||
async fn update_port(
|
||||
server: &Server,
|
||||
frontend_chan: &Sender<FrontendEvent>,
|
||||
socket: &mut UdpSocket,
|
||||
port: u16,
|
||||
) {
|
||||
async fn update_port(server: &Server, socket: &mut UdpSocket) {
|
||||
let new_port = server.port.get();
|
||||
let current_port = socket.local_addr().expect("socket not bound").port();
|
||||
|
||||
// if port is the same, we dont need to change it
|
||||
if socket.local_addr().unwrap().port() == port {
|
||||
if current_port == new_port {
|
||||
return;
|
||||
}
|
||||
|
||||
// create new socket
|
||||
let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), port);
|
||||
let frontend_event = match UdpSocket::bind(listen_addr).await {
|
||||
// bind new socket
|
||||
let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), new_port);
|
||||
let new_socket = UdpSocket::bind(listen_addr).await;
|
||||
let err = match new_socket {
|
||||
Ok(new_socket) => {
|
||||
*socket = new_socket;
|
||||
server.port.replace(port);
|
||||
FrontendEvent::PortChanged(port, None)
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("could not change port: {e}");
|
||||
let port = socket.local_addr().unwrap().port();
|
||||
FrontendEvent::PortChanged(port, Some(format!("could not change port: {e}")))
|
||||
None
|
||||
}
|
||||
Err(e) => Some(e.to_string()),
|
||||
};
|
||||
let _ = frontend_chan.send(frontend_event).await;
|
||||
|
||||
// notify frontend of the actual port
|
||||
let port = socket.local_addr().expect("socket not bound").port();
|
||||
server.notify_port_changed(port, err);
|
||||
}
|
||||
|
||||
async fn udp_receiver(
|
||||
@@ -80,18 +66,13 @@ async fn udp_receiver(
|
||||
) {
|
||||
loop {
|
||||
let event = receive_event(socket).await;
|
||||
if receiver_tx.send(event).await.is_err() {
|
||||
break;
|
||||
}
|
||||
receiver_tx.send(event).await.expect("channel closed");
|
||||
}
|
||||
}
|
||||
|
||||
async fn udp_sender(socket: &UdpSocket, rx: &mut Receiver<(Event, SocketAddr)>) {
|
||||
loop {
|
||||
let (event, addr) = match rx.recv().await {
|
||||
Some(e) => e,
|
||||
None => return,
|
||||
};
|
||||
let (event, addr) = rx.recv().await.expect("channel closed");
|
||||
if let Err(e) = send_event(socket, event, addr) {
|
||||
log::warn!("udp send failed: {e}");
|
||||
};
|
||||
|
||||
@@ -1,9 +1,6 @@
|
||||
use std::{net::SocketAddr, time::Duration};
|
||||
|
||||
use tokio::{
|
||||
sync::mpsc::{Receiver, Sender},
|
||||
task::JoinHandle,
|
||||
};
|
||||
use tokio::{sync::mpsc::Sender, task::JoinHandle};
|
||||
|
||||
use input_event::Event;
|
||||
|
||||
@@ -13,119 +10,129 @@ use super::{capture_task::CaptureEvent, emulation_task::EmulationEvent, Server,
|
||||
|
||||
const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500);
|
||||
|
||||
pub fn new(
|
||||
pub(crate) fn new(
|
||||
server: Server,
|
||||
sender_ch: Sender<(Event, SocketAddr)>,
|
||||
emulate_notify: Sender<EmulationEvent>,
|
||||
capture_notify: Sender<CaptureEvent>,
|
||||
mut timer_rx: Receiver<()>,
|
||||
) -> JoinHandle<()> {
|
||||
// timer task
|
||||
let ping_task = tokio::task::spawn_local(async move {
|
||||
tokio::task::spawn_local(async move {
|
||||
tokio::select! {
|
||||
_ = server.notifies.cancel.cancelled() => {}
|
||||
_ = ping_task(&server, sender_ch, emulate_notify, capture_notify) => {}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn ping_task(
|
||||
server: &Server,
|
||||
sender_ch: Sender<(Event, SocketAddr)>,
|
||||
emulate_notify: Sender<EmulationEvent>,
|
||||
capture_notify: Sender<CaptureEvent>,
|
||||
) {
|
||||
loop {
|
||||
// wait for wake up signal
|
||||
server.ping_timer_notified().await;
|
||||
loop {
|
||||
// wait for wake up signal
|
||||
let Some(_): Option<()> = timer_rx.recv().await else {
|
||||
break;
|
||||
};
|
||||
loop {
|
||||
let receiving = server.state.get() == State::Receiving;
|
||||
let (ping_clients, ping_addrs) = {
|
||||
let mut client_manager = server.client_manager.borrow_mut();
|
||||
let receiving = server.state.get() == State::Receiving;
|
||||
let (ping_clients, ping_addrs) = {
|
||||
let mut client_manager = server.client_manager.borrow_mut();
|
||||
|
||||
let ping_clients: Vec<ClientHandle> = if receiving {
|
||||
// if receiving we care about clients with pressed keys
|
||||
client_manager
|
||||
.get_client_states_mut()
|
||||
.filter(|(_, (_, s))| !s.pressed_keys.is_empty())
|
||||
.map(|(h, _)| h)
|
||||
.collect()
|
||||
} else {
|
||||
// if sending we care about the active client
|
||||
server.active_client.get().iter().cloned().collect()
|
||||
};
|
||||
|
||||
// get relevant socket addrs for clients
|
||||
let ping_addrs: Vec<SocketAddr> = {
|
||||
ping_clients
|
||||
.iter()
|
||||
.flat_map(|&h| client_manager.get(h))
|
||||
.flat_map(|(c, s)| {
|
||||
if s.alive && s.active_addr.is_some() {
|
||||
vec![s.active_addr.unwrap()]
|
||||
} else {
|
||||
s.ips
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(|ip| SocketAddr::new(ip, c.port))
|
||||
.collect()
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
|
||||
// reset alive
|
||||
for (_, (_, s)) in client_manager.get_client_states_mut() {
|
||||
s.alive = false;
|
||||
}
|
||||
|
||||
(ping_clients, ping_addrs)
|
||||
let ping_clients: Vec<ClientHandle> = if receiving {
|
||||
// if receiving we care about clients with pressed keys
|
||||
client_manager
|
||||
.get_client_states_mut()
|
||||
.filter(|(_, (_, s))| !s.pressed_keys.is_empty())
|
||||
.map(|(h, _)| h)
|
||||
.collect()
|
||||
} else {
|
||||
// if sending we care about the active client
|
||||
server.active_client.get().iter().cloned().collect()
|
||||
};
|
||||
|
||||
if receiving && ping_clients.is_empty() {
|
||||
// receiving and no client has pressed keys
|
||||
// -> no need to keep pinging
|
||||
break;
|
||||
}
|
||||
|
||||
// ping clients
|
||||
for addr in ping_addrs {
|
||||
if sender_ch.send((Event::Ping(), addr)).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// give clients time to resond
|
||||
if receiving {
|
||||
log::trace!("waiting {MAX_RESPONSE_TIME:?} for response from client with pressed keys ...");
|
||||
} else {
|
||||
log::trace!(
|
||||
"state: {:?} => waiting {MAX_RESPONSE_TIME:?} for client to respond ...",
|
||||
server.state.get()
|
||||
);
|
||||
}
|
||||
|
||||
tokio::time::sleep(MAX_RESPONSE_TIME).await;
|
||||
|
||||
// when anything is received from a client,
|
||||
// the alive flag gets set
|
||||
let unresponsive_clients: Vec<_> = {
|
||||
let client_manager = server.client_manager.borrow();
|
||||
// get relevant socket addrs for clients
|
||||
let ping_addrs: Vec<SocketAddr> = {
|
||||
ping_clients
|
||||
.iter()
|
||||
.filter_map(|&h| match client_manager.get(h) {
|
||||
Some((_, s)) if !s.alive => Some(h),
|
||||
_ => None,
|
||||
.flat_map(|&h| client_manager.get(h))
|
||||
.flat_map(|(c, s)| {
|
||||
if s.alive && s.active_addr.is_some() {
|
||||
vec![s.active_addr.unwrap()]
|
||||
} else {
|
||||
s.ips
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(|ip| SocketAddr::new(ip, c.port))
|
||||
.collect()
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
|
||||
// we may not be receiving anymore but we should respond
|
||||
// to the original state and not the "new" one
|
||||
if receiving {
|
||||
for h in unresponsive_clients {
|
||||
log::warn!("device not responding, releasing keys!");
|
||||
let _ = emulate_notify.send(EmulationEvent::ReleaseKeys(h)).await;
|
||||
}
|
||||
} else {
|
||||
// release pointer if the active client has not responded
|
||||
if !unresponsive_clients.is_empty() {
|
||||
log::warn!("client not responding, releasing pointer!");
|
||||
server.state.replace(State::Receiving);
|
||||
let _ = capture_notify.send(CaptureEvent::Release).await;
|
||||
}
|
||||
// reset alive
|
||||
for (_, (_, s)) in client_manager.get_client_states_mut() {
|
||||
s.alive = false;
|
||||
}
|
||||
|
||||
(ping_clients, ping_addrs)
|
||||
};
|
||||
|
||||
if receiving && ping_clients.is_empty() {
|
||||
// receiving and no client has pressed keys
|
||||
// -> no need to keep pinging
|
||||
break;
|
||||
}
|
||||
|
||||
// ping clients
|
||||
for addr in ping_addrs {
|
||||
if sender_ch.send((Event::Ping(), addr)).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// give clients time to resond
|
||||
if receiving {
|
||||
log::trace!(
|
||||
"waiting {MAX_RESPONSE_TIME:?} for response from client with pressed keys ..."
|
||||
);
|
||||
} else {
|
||||
log::trace!(
|
||||
"state: {:?} => waiting {MAX_RESPONSE_TIME:?} for client to respond ...",
|
||||
server.state.get()
|
||||
);
|
||||
}
|
||||
|
||||
tokio::time::sleep(MAX_RESPONSE_TIME).await;
|
||||
|
||||
// when anything is received from a client,
|
||||
// the alive flag gets set
|
||||
let unresponsive_clients: Vec<_> = {
|
||||
let client_manager = server.client_manager.borrow();
|
||||
ping_clients
|
||||
.iter()
|
||||
.filter_map(|&h| match client_manager.get(h) {
|
||||
Some((_, s)) if !s.alive => Some(h),
|
||||
_ => None,
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
|
||||
// we may not be receiving anymore but we should respond
|
||||
// to the original state and not the "new" one
|
||||
if receiving {
|
||||
for h in unresponsive_clients {
|
||||
log::warn!("device not responding, releasing keys!");
|
||||
let _ = emulate_notify.send(EmulationEvent::ReleaseKeys(h)).await;
|
||||
}
|
||||
} else {
|
||||
// release pointer if the active client has not responded
|
||||
if !unresponsive_clients.is_empty() {
|
||||
log::warn!("client not responding, releasing pointer!");
|
||||
server.state.replace(State::Receiving);
|
||||
let _ = capture_notify.send(CaptureEvent::Release).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
ping_task
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,68 +0,0 @@
|
||||
use std::collections::HashSet;
|
||||
|
||||
use tokio::{sync::mpsc::Sender, task::JoinHandle};
|
||||
|
||||
use crate::{client::ClientHandle, dns::DnsResolver, frontend::FrontendEvent};
|
||||
|
||||
use super::Server;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DnsRequest {
|
||||
pub hostname: String,
|
||||
pub handle: ClientHandle,
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
resolver: DnsResolver,
|
||||
mut server: Server,
|
||||
mut frontend: Sender<FrontendEvent>,
|
||||
) -> (JoinHandle<()>, Sender<DnsRequest>) {
|
||||
let (dns_tx, mut dns_rx) = tokio::sync::mpsc::channel::<DnsRequest>(32);
|
||||
let resolver_task = tokio::task::spawn_local(async move {
|
||||
loop {
|
||||
let (host, handle) = match dns_rx.recv().await {
|
||||
Some(r) => (r.hostname, r.handle),
|
||||
None => break,
|
||||
};
|
||||
|
||||
/* update resolving status */
|
||||
if let Some((_, s)) = server.client_manager.borrow_mut().get_mut(handle) {
|
||||
s.resolving = true;
|
||||
}
|
||||
notify_state_change(&mut frontend, &mut server, handle).await;
|
||||
|
||||
let ips = match resolver.resolve(&host).await {
|
||||
Ok(ips) => ips,
|
||||
Err(e) => {
|
||||
log::warn!("could not resolve host '{host}': {e}");
|
||||
vec![]
|
||||
}
|
||||
};
|
||||
|
||||
/* update ips and resolving state */
|
||||
if let Some((c, s)) = server.client_manager.borrow_mut().get_mut(handle) {
|
||||
let mut addrs = HashSet::from_iter(c.fix_ips.iter().cloned());
|
||||
for ip in ips {
|
||||
addrs.insert(ip);
|
||||
}
|
||||
s.ips = addrs;
|
||||
s.resolving = false;
|
||||
}
|
||||
notify_state_change(&mut frontend, &mut server, handle).await;
|
||||
}
|
||||
});
|
||||
(resolver_task, dns_tx)
|
||||
}
|
||||
|
||||
async fn notify_state_change(
|
||||
frontend: &mut Sender<FrontendEvent>,
|
||||
server: &mut Server,
|
||||
handle: ClientHandle,
|
||||
) {
|
||||
let state = server.client_manager.borrow_mut().get_mut(handle).cloned();
|
||||
if let Some((config, state)) = state {
|
||||
let _ = frontend
|
||||
.send(FrontendEvent::State(handle, config, state))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user