mirror of
https://github.com/feschber/lan-mouse.git
synced 2026-04-12 16:21:28 +03:00
refactor frontend task
This commit is contained in:
324
src/server.rs
324
src/server.rs
@@ -1,29 +1,22 @@
|
|||||||
use anyhow::anyhow;
|
|
||||||
use log;
|
use log;
|
||||||
use std::{
|
use std::{
|
||||||
cell::{Cell, RefCell},
|
cell::{Cell, RefCell},
|
||||||
collections::HashSet,
|
collections::HashSet,
|
||||||
io::Result,
|
io::Result,
|
||||||
net::IpAddr,
|
|
||||||
rc::Rc,
|
rc::Rc,
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
use tokio::{io::ReadHalf, net::UdpSocket, signal, sync::mpsc::Sender};
|
use tokio::net::UdpSocket;
|
||||||
|
use tokio::signal;
|
||||||
|
|
||||||
#[cfg(unix)]
|
use std::net::SocketAddr;
|
||||||
use tokio::net::UnixStream;
|
|
||||||
|
|
||||||
#[cfg(windows)]
|
|
||||||
use tokio::net::TcpStream;
|
|
||||||
|
|
||||||
use std::{io::ErrorKind, net::SocketAddr};
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
client::{ClientEvent, ClientHandle, ClientManager, Position},
|
client::{ClientHandle, ClientManager},
|
||||||
config::Config,
|
config::Config,
|
||||||
dns,
|
dns,
|
||||||
event::Event,
|
event::Event,
|
||||||
frontend::{self, FrontendEvent, FrontendListener, FrontendNotify},
|
frontend::{FrontendEvent, FrontendListener, FrontendNotify},
|
||||||
server::producer_task::ProducerEvent,
|
server::producer_task::ProducerEvent,
|
||||||
};
|
};
|
||||||
use crate::{consumer, producer};
|
use crate::{consumer, producer};
|
||||||
@@ -31,6 +24,7 @@ use crate::{consumer, producer};
|
|||||||
use self::consumer_task::ConsumerEvent;
|
use self::consumer_task::ConsumerEvent;
|
||||||
|
|
||||||
mod consumer_task;
|
mod consumer_task;
|
||||||
|
mod frontend_task;
|
||||||
mod producer_task;
|
mod producer_task;
|
||||||
|
|
||||||
const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500);
|
const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500);
|
||||||
@@ -79,7 +73,7 @@ impl Server {
|
|||||||
|
|
||||||
pub async fn run(&self) -> anyhow::Result<()> {
|
pub async fn run(&self) -> anyhow::Result<()> {
|
||||||
// create frontend communication adapter
|
// create frontend communication adapter
|
||||||
let mut frontend = match FrontendListener::new().await {
|
let frontend = match FrontendListener::new().await {
|
||||||
Some(f) => f?,
|
Some(f) => f?,
|
||||||
None => {
|
None => {
|
||||||
// none means some other instance is already running
|
// none means some other instance is already running
|
||||||
@@ -89,9 +83,7 @@ impl Server {
|
|||||||
};
|
};
|
||||||
let (consumer, producer) = tokio::join!(consumer::create(), producer::create());
|
let (consumer, producer) = tokio::join!(consumer::create(), producer::create());
|
||||||
|
|
||||||
let (frontend_tx, mut frontend_rx) = tokio::sync::mpsc::channel(32);
|
|
||||||
let (resolve_tx, mut resolve_rx) = tokio::sync::mpsc::channel(32);
|
let (resolve_tx, mut resolve_rx) = tokio::sync::mpsc::channel(32);
|
||||||
let (frontend_notify_tx, mut frontend_notify_rx) = tokio::sync::mpsc::channel(32);
|
|
||||||
let (receiver_tx, receiver_rx) = tokio::sync::mpsc::channel(32);
|
let (receiver_tx, receiver_rx) = tokio::sync::mpsc::channel(32);
|
||||||
let (sender_tx, mut sender_rx) = tokio::sync::mpsc::channel(32);
|
let (sender_tx, mut sender_rx) = tokio::sync::mpsc::channel(32);
|
||||||
let (port_tx, mut port_rx) = tokio::sync::mpsc::channel(32);
|
let (port_tx, mut port_rx) = tokio::sync::mpsc::channel(32);
|
||||||
@@ -112,38 +104,14 @@ impl Server {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// frontend listener
|
// frontend listener
|
||||||
let server = self.clone();
|
let (mut frontend_task, frontend_tx, frontend_notify_tx) = frontend_task::new(
|
||||||
let producer_notify = producer_channel.clone();
|
frontend,
|
||||||
let consumer_notify = consumer_channel.clone();
|
self.clone(),
|
||||||
let frontend_ch = frontend_tx.clone();
|
producer_channel.clone(),
|
||||||
let resolve_ch = resolve_tx.clone();
|
consumer_channel.clone(),
|
||||||
let mut frontend_task = tokio::task::spawn_local(async move {
|
resolve_tx.clone(),
|
||||||
loop {
|
port_tx,
|
||||||
tokio::select! {
|
);
|
||||||
stream = frontend.accept() => {
|
|
||||||
let stream = match stream {
|
|
||||||
Ok(s) => s,
|
|
||||||
Err(e) => {
|
|
||||||
log::warn!("error accepting frontend connection: {e}");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
server.handle_frontend_stream(&frontend_ch, stream).await;
|
|
||||||
}
|
|
||||||
event = frontend_rx.recv() => {
|
|
||||||
let frontend_event = event.ok_or(anyhow!("frontend channel closed"))?;
|
|
||||||
if server.handle_frontend_event(&producer_notify, &consumer_notify, &resolve_ch, &mut frontend, &port_tx, frontend_event).await {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
notify = frontend_notify_rx.recv() => {
|
|
||||||
let notify = notify.ok_or(anyhow!("frontend notify closed"))?;
|
|
||||||
let _ = frontend.notify_all(notify).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
anyhow::Ok(())
|
|
||||||
});
|
|
||||||
|
|
||||||
// dns resolver
|
// dns resolver
|
||||||
|
|
||||||
@@ -405,268 +373,6 @@ impl Server {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn add_client(
|
|
||||||
&self,
|
|
||||||
resolver_tx: &Sender<(String, ClientHandle)>,
|
|
||||||
hostname: Option<String>,
|
|
||||||
addr: HashSet<IpAddr>,
|
|
||||||
port: u16,
|
|
||||||
pos: Position,
|
|
||||||
) -> ClientHandle {
|
|
||||||
log::info!(
|
|
||||||
"adding client [{}]{} @ {:?}",
|
|
||||||
pos,
|
|
||||||
hostname.as_deref().unwrap_or(""),
|
|
||||||
&addr
|
|
||||||
);
|
|
||||||
let handle =
|
|
||||||
self.client_manager
|
|
||||||
.borrow_mut()
|
|
||||||
.add_client(hostname.clone(), addr, port, pos, false);
|
|
||||||
|
|
||||||
log::debug!("add_client {handle}");
|
|
||||||
|
|
||||||
if let Some(hostname) = hostname {
|
|
||||||
let _ = resolver_tx.send((hostname, handle)).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
handle
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn activate_client(
|
|
||||||
&self,
|
|
||||||
producer_notify_tx: &Sender<ProducerEvent>,
|
|
||||||
consumer_notify_tx: &Sender<ConsumerEvent>,
|
|
||||||
client: ClientHandle,
|
|
||||||
active: bool,
|
|
||||||
) {
|
|
||||||
let (client, pos) = match self.client_manager.borrow_mut().get_mut(client) {
|
|
||||||
Some(state) => {
|
|
||||||
state.active = active;
|
|
||||||
(state.client.handle, state.client.pos)
|
|
||||||
}
|
|
||||||
None => return,
|
|
||||||
};
|
|
||||||
if active {
|
|
||||||
let _ = producer_notify_tx
|
|
||||||
.send(ProducerEvent::ClientEvent(ClientEvent::Create(client, pos)))
|
|
||||||
.await;
|
|
||||||
let _ = consumer_notify_tx
|
|
||||||
.send(ConsumerEvent::ClientEvent(ClientEvent::Create(client, pos)))
|
|
||||||
.await;
|
|
||||||
} else {
|
|
||||||
let _ = producer_notify_tx
|
|
||||||
.send(ProducerEvent::ClientEvent(ClientEvent::Destroy(client)))
|
|
||||||
.await;
|
|
||||||
let _ = consumer_notify_tx
|
|
||||||
.send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(client)))
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn remove_client(
|
|
||||||
&self,
|
|
||||||
producer_notify_tx: &Sender<ProducerEvent>,
|
|
||||||
consumer_notify_tx: &Sender<ConsumerEvent>,
|
|
||||||
frontend: &mut FrontendListener,
|
|
||||||
client: ClientHandle,
|
|
||||||
) -> Option<ClientHandle> {
|
|
||||||
let _ = producer_notify_tx
|
|
||||||
.send(ProducerEvent::ClientEvent(ClientEvent::Destroy(client)))
|
|
||||||
.await;
|
|
||||||
let _ = consumer_notify_tx
|
|
||||||
.send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(client)))
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let Some(client) = self
|
|
||||||
.client_manager
|
|
||||||
.borrow_mut()
|
|
||||||
.remove_client(client)
|
|
||||||
.map(|s| s.client.handle)
|
|
||||||
else {
|
|
||||||
return None;
|
|
||||||
};
|
|
||||||
|
|
||||||
let notify = FrontendNotify::NotifyClientDelete(client);
|
|
||||||
log::debug!("{notify:?}");
|
|
||||||
if let Err(e) = frontend.notify_all(notify).await {
|
|
||||||
log::error!("error notifying frontend: {e}");
|
|
||||||
}
|
|
||||||
Some(client)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn update_client(
|
|
||||||
&self,
|
|
||||||
producer_notify_tx: &Sender<ProducerEvent>,
|
|
||||||
consumer_notify_tx: &Sender<ConsumerEvent>,
|
|
||||||
resolve_tx: &Sender<(String, ClientHandle)>,
|
|
||||||
client_update: (ClientHandle, Option<String>, u16, Position),
|
|
||||||
) {
|
|
||||||
let (handle, hostname, port, pos) = client_update;
|
|
||||||
let (hostname, handle, active) = {
|
|
||||||
// retrieve state
|
|
||||||
let mut client_manager = self.client_manager.borrow_mut();
|
|
||||||
let Some(state) = client_manager.get_mut(handle) else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
// update pos
|
|
||||||
state.client.pos = pos;
|
|
||||||
|
|
||||||
// update port
|
|
||||||
if state.client.port != port {
|
|
||||||
state.client.port = port;
|
|
||||||
state.active_addr = state.active_addr.map(|a| SocketAddr::new(a.ip(), port));
|
|
||||||
}
|
|
||||||
|
|
||||||
// update hostname
|
|
||||||
if state.client.hostname != hostname {
|
|
||||||
state.client.ips = HashSet::new();
|
|
||||||
state.active_addr = None;
|
|
||||||
state.client.hostname = hostname;
|
|
||||||
}
|
|
||||||
|
|
||||||
log::debug!("client updated: {:?}", state);
|
|
||||||
(
|
|
||||||
state.client.hostname.clone(),
|
|
||||||
state.client.handle,
|
|
||||||
state.active,
|
|
||||||
)
|
|
||||||
};
|
|
||||||
|
|
||||||
// resolve dns
|
|
||||||
if let Some(hostname) = hostname {
|
|
||||||
let _ = resolve_tx.send((hostname, handle)).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
// update state in event consumer & producer
|
|
||||||
if active {
|
|
||||||
let _ = producer_notify_tx
|
|
||||||
.send(ProducerEvent::ClientEvent(ClientEvent::Destroy(handle)))
|
|
||||||
.await;
|
|
||||||
let _ = consumer_notify_tx
|
|
||||||
.send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(handle)))
|
|
||||||
.await;
|
|
||||||
let _ = producer_notify_tx
|
|
||||||
.send(ProducerEvent::ClientEvent(ClientEvent::Create(handle, pos)))
|
|
||||||
.await;
|
|
||||||
let _ = consumer_notify_tx
|
|
||||||
.send(ConsumerEvent::ClientEvent(ClientEvent::Create(handle, pos)))
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_frontend_stream(
|
|
||||||
&self,
|
|
||||||
frontend_tx: &Sender<FrontendEvent>,
|
|
||||||
#[cfg(unix)] mut stream: ReadHalf<UnixStream>,
|
|
||||||
#[cfg(windows)] mut stream: ReadHalf<TcpStream>,
|
|
||||||
) {
|
|
||||||
use std::io;
|
|
||||||
|
|
||||||
let tx = frontend_tx.clone();
|
|
||||||
tokio::task::spawn_local(async move {
|
|
||||||
let _ = tx.send(FrontendEvent::Enumerate()).await;
|
|
||||||
loop {
|
|
||||||
let event = frontend::read_event(&mut stream).await;
|
|
||||||
match event {
|
|
||||||
Ok(event) => {
|
|
||||||
let _ = tx.send(event).await;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
if let Some(e) = e.downcast_ref::<io::Error>() {
|
|
||||||
if e.kind() == ErrorKind::UnexpectedEof {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
log::error!("error reading frontend event: {e}");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_frontend_event(
|
|
||||||
&self,
|
|
||||||
producer_tx: &Sender<ProducerEvent>,
|
|
||||||
consumer_tx: &Sender<ConsumerEvent>,
|
|
||||||
resolve_tx: &Sender<(String, ClientHandle)>,
|
|
||||||
frontend: &mut FrontendListener,
|
|
||||||
port_tx: &Sender<u16>,
|
|
||||||
event: FrontendEvent,
|
|
||||||
) -> bool {
|
|
||||||
log::debug!("frontend: {event:?}");
|
|
||||||
let response = match event {
|
|
||||||
FrontendEvent::AddClient(hostname, port, pos) => {
|
|
||||||
let handle = self
|
|
||||||
.add_client(resolve_tx, hostname, HashSet::new(), port, pos)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let client = self
|
|
||||||
.client_manager
|
|
||||||
.borrow()
|
|
||||||
.get(handle)
|
|
||||||
.unwrap()
|
|
||||||
.client
|
|
||||||
.clone();
|
|
||||||
Some(FrontendNotify::NotifyClientCreate(client))
|
|
||||||
}
|
|
||||||
FrontendEvent::ActivateClient(handle, active) => {
|
|
||||||
self.activate_client(producer_tx, consumer_tx, handle, active)
|
|
||||||
.await;
|
|
||||||
Some(FrontendNotify::NotifyClientActivate(handle, active))
|
|
||||||
}
|
|
||||||
FrontendEvent::ChangePort(port) => {
|
|
||||||
let _ = port_tx.send(port).await;
|
|
||||||
None
|
|
||||||
}
|
|
||||||
FrontendEvent::DelClient(handle) => {
|
|
||||||
self.remove_client(producer_tx, consumer_tx, frontend, handle)
|
|
||||||
.await;
|
|
||||||
Some(FrontendNotify::NotifyClientDelete(handle))
|
|
||||||
}
|
|
||||||
FrontendEvent::Enumerate() => {
|
|
||||||
let clients = self
|
|
||||||
.client_manager
|
|
||||||
.borrow()
|
|
||||||
.get_client_states()
|
|
||||||
.map(|s| (s.client.clone(), s.active))
|
|
||||||
.collect();
|
|
||||||
Some(FrontendNotify::Enumerate(clients))
|
|
||||||
}
|
|
||||||
FrontendEvent::Shutdown() => {
|
|
||||||
log::info!("terminating gracefully...");
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
FrontendEvent::UpdateClient(handle, hostname, port, pos) => {
|
|
||||||
self.update_client(
|
|
||||||
producer_tx,
|
|
||||||
consumer_tx,
|
|
||||||
resolve_tx,
|
|
||||||
(handle, hostname, port, pos),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let client = self
|
|
||||||
.client_manager
|
|
||||||
.borrow()
|
|
||||||
.get(handle)
|
|
||||||
.unwrap()
|
|
||||||
.client
|
|
||||||
.clone();
|
|
||||||
Some(FrontendNotify::NotifyClientUpdate(client))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let Some(response) = response else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
if let Err(e) = frontend.notify_all(response).await {
|
|
||||||
log::error!("error notifying frontend: {e}");
|
|
||||||
}
|
|
||||||
false
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive_event(socket: &UdpSocket) -> anyhow::Result<(Event, SocketAddr)> {
|
async fn receive_event(socket: &UdpSocket) -> anyhow::Result<(Event, SocketAddr)> {
|
||||||
|
|||||||
324
src/server/frontend_task.rs
Normal file
324
src/server/frontend_task.rs
Normal file
@@ -0,0 +1,324 @@
|
|||||||
|
use std::{
|
||||||
|
collections::HashSet,
|
||||||
|
io::ErrorKind,
|
||||||
|
net::{IpAddr, SocketAddr},
|
||||||
|
};
|
||||||
|
#[cfg(unix)]
|
||||||
|
use tokio::net::UnixStream;
|
||||||
|
|
||||||
|
#[cfg(windows)]
|
||||||
|
use tokio::net::TcpStream;
|
||||||
|
|
||||||
|
use anyhow::{anyhow, Result};
|
||||||
|
use tokio::{io::ReadHalf, sync::mpsc::Sender, task::JoinHandle};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
client::{ClientEvent, ClientHandle, Position},
|
||||||
|
frontend::{self, FrontendEvent, FrontendListener, FrontendNotify},
|
||||||
|
};
|
||||||
|
|
||||||
|
use super::{consumer_task::ConsumerEvent, producer_task::ProducerEvent, Server};
|
||||||
|
|
||||||
|
pub(crate) fn new(
|
||||||
|
mut frontend: FrontendListener,
|
||||||
|
server: Server,
|
||||||
|
producer_notify: Sender<ProducerEvent>,
|
||||||
|
consumer_notify: Sender<ConsumerEvent>,
|
||||||
|
resolve_ch: Sender<(String, u32)>,
|
||||||
|
port_tx: Sender<u16>,
|
||||||
|
) -> (
|
||||||
|
JoinHandle<Result<()>>,
|
||||||
|
Sender<FrontendEvent>,
|
||||||
|
Sender<FrontendNotify>,
|
||||||
|
) {
|
||||||
|
let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(32);
|
||||||
|
let (notify_tx, mut notify_rx) = tokio::sync::mpsc::channel(32);
|
||||||
|
let event_tx_clone = event_tx.clone();
|
||||||
|
let frontend_task = tokio::task::spawn_local(async move {
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
stream = frontend.accept() => {
|
||||||
|
let stream = match stream {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(e) => {
|
||||||
|
log::warn!("error accepting frontend connection: {e}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
handle_frontend_stream(&event_tx_clone, stream).await;
|
||||||
|
}
|
||||||
|
event = event_rx.recv() => {
|
||||||
|
let frontend_event = event.ok_or(anyhow!("frontend channel closed"))?;
|
||||||
|
if handle_frontend_event(&server, &producer_notify, &consumer_notify, &resolve_ch, &mut frontend, &port_tx, frontend_event).await {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
notify = notify_rx.recv() => {
|
||||||
|
let notify = notify.ok_or(anyhow!("frontend notify closed"))?;
|
||||||
|
let _ = frontend.notify_all(notify).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
anyhow::Ok(())
|
||||||
|
});
|
||||||
|
(frontend_task, event_tx, notify_tx)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_frontend_stream(
|
||||||
|
frontend_tx: &Sender<FrontendEvent>,
|
||||||
|
#[cfg(unix)] mut stream: ReadHalf<UnixStream>,
|
||||||
|
#[cfg(windows)] mut stream: ReadHalf<TcpStream>,
|
||||||
|
) {
|
||||||
|
use std::io;
|
||||||
|
|
||||||
|
let tx = frontend_tx.clone();
|
||||||
|
tokio::task::spawn_local(async move {
|
||||||
|
let _ = tx.send(FrontendEvent::Enumerate()).await;
|
||||||
|
loop {
|
||||||
|
let event = frontend::read_event(&mut stream).await;
|
||||||
|
match event {
|
||||||
|
Ok(event) => {
|
||||||
|
let _ = tx.send(event).await;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
if let Some(e) = e.downcast_ref::<io::Error>() {
|
||||||
|
if e.kind() == ErrorKind::UnexpectedEof {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log::error!("error reading frontend event: {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_frontend_event(
|
||||||
|
server: &Server,
|
||||||
|
producer_tx: &Sender<ProducerEvent>,
|
||||||
|
consumer_tx: &Sender<ConsumerEvent>,
|
||||||
|
resolve_tx: &Sender<(String, ClientHandle)>,
|
||||||
|
frontend: &mut FrontendListener,
|
||||||
|
port_tx: &Sender<u16>,
|
||||||
|
event: FrontendEvent,
|
||||||
|
) -> bool {
|
||||||
|
log::debug!("frontend: {event:?}");
|
||||||
|
let response = match event {
|
||||||
|
FrontendEvent::AddClient(hostname, port, pos) => {
|
||||||
|
let handle = add_client(server, resolve_tx, hostname, HashSet::new(), port, pos).await;
|
||||||
|
|
||||||
|
let client = server
|
||||||
|
.client_manager
|
||||||
|
.borrow()
|
||||||
|
.get(handle)
|
||||||
|
.unwrap()
|
||||||
|
.client
|
||||||
|
.clone();
|
||||||
|
Some(FrontendNotify::NotifyClientCreate(client))
|
||||||
|
}
|
||||||
|
FrontendEvent::ActivateClient(handle, active) => {
|
||||||
|
activate_client(server, producer_tx, consumer_tx, handle, active).await;
|
||||||
|
Some(FrontendNotify::NotifyClientActivate(handle, active))
|
||||||
|
}
|
||||||
|
FrontendEvent::ChangePort(port) => {
|
||||||
|
let _ = port_tx.send(port).await;
|
||||||
|
None
|
||||||
|
}
|
||||||
|
FrontendEvent::DelClient(handle) => {
|
||||||
|
remove_client(server, producer_tx, consumer_tx, frontend, handle).await;
|
||||||
|
Some(FrontendNotify::NotifyClientDelete(handle))
|
||||||
|
}
|
||||||
|
FrontendEvent::Enumerate() => {
|
||||||
|
let clients = server
|
||||||
|
.client_manager
|
||||||
|
.borrow()
|
||||||
|
.get_client_states()
|
||||||
|
.map(|s| (s.client.clone(), s.active))
|
||||||
|
.collect();
|
||||||
|
Some(FrontendNotify::Enumerate(clients))
|
||||||
|
}
|
||||||
|
FrontendEvent::Shutdown() => {
|
||||||
|
log::info!("terminating gracefully...");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
FrontendEvent::UpdateClient(handle, hostname, port, pos) => {
|
||||||
|
update_client(
|
||||||
|
server,
|
||||||
|
producer_tx,
|
||||||
|
consumer_tx,
|
||||||
|
resolve_tx,
|
||||||
|
(handle, hostname, port, pos),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let client = server
|
||||||
|
.client_manager
|
||||||
|
.borrow()
|
||||||
|
.get(handle)
|
||||||
|
.unwrap()
|
||||||
|
.client
|
||||||
|
.clone();
|
||||||
|
Some(FrontendNotify::NotifyClientUpdate(client))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let Some(response) = response else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
if let Err(e) = frontend.notify_all(response).await {
|
||||||
|
log::error!("error notifying frontend: {e}");
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn add_client(
|
||||||
|
server: &Server,
|
||||||
|
resolver_tx: &Sender<(String, ClientHandle)>,
|
||||||
|
hostname: Option<String>,
|
||||||
|
addr: HashSet<IpAddr>,
|
||||||
|
port: u16,
|
||||||
|
pos: Position,
|
||||||
|
) -> ClientHandle {
|
||||||
|
log::info!(
|
||||||
|
"adding client [{}]{} @ {:?}",
|
||||||
|
pos,
|
||||||
|
hostname.as_deref().unwrap_or(""),
|
||||||
|
&addr
|
||||||
|
);
|
||||||
|
let handle =
|
||||||
|
server
|
||||||
|
.client_manager
|
||||||
|
.borrow_mut()
|
||||||
|
.add_client(hostname.clone(), addr, port, pos, false);
|
||||||
|
|
||||||
|
log::debug!("add_client {handle}");
|
||||||
|
|
||||||
|
if let Some(hostname) = hostname {
|
||||||
|
let _ = resolver_tx.send((hostname, handle)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
handle
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn activate_client(
|
||||||
|
server: &Server,
|
||||||
|
producer_notify_tx: &Sender<ProducerEvent>,
|
||||||
|
consumer_notify_tx: &Sender<ConsumerEvent>,
|
||||||
|
client: ClientHandle,
|
||||||
|
active: bool,
|
||||||
|
) {
|
||||||
|
let (client, pos) = match server.client_manager.borrow_mut().get_mut(client) {
|
||||||
|
Some(state) => {
|
||||||
|
state.active = active;
|
||||||
|
(state.client.handle, state.client.pos)
|
||||||
|
}
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
if active {
|
||||||
|
let _ = producer_notify_tx
|
||||||
|
.send(ProducerEvent::ClientEvent(ClientEvent::Create(client, pos)))
|
||||||
|
.await;
|
||||||
|
let _ = consumer_notify_tx
|
||||||
|
.send(ConsumerEvent::ClientEvent(ClientEvent::Create(client, pos)))
|
||||||
|
.await;
|
||||||
|
} else {
|
||||||
|
let _ = producer_notify_tx
|
||||||
|
.send(ProducerEvent::ClientEvent(ClientEvent::Destroy(client)))
|
||||||
|
.await;
|
||||||
|
let _ = consumer_notify_tx
|
||||||
|
.send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(client)))
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn remove_client(
|
||||||
|
server: &Server,
|
||||||
|
producer_notify_tx: &Sender<ProducerEvent>,
|
||||||
|
consumer_notify_tx: &Sender<ConsumerEvent>,
|
||||||
|
frontend: &mut FrontendListener,
|
||||||
|
client: ClientHandle,
|
||||||
|
) -> Option<ClientHandle> {
|
||||||
|
let _ = producer_notify_tx
|
||||||
|
.send(ProducerEvent::ClientEvent(ClientEvent::Destroy(client)))
|
||||||
|
.await;
|
||||||
|
let _ = consumer_notify_tx
|
||||||
|
.send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(client)))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let Some(client) = server
|
||||||
|
.client_manager
|
||||||
|
.borrow_mut()
|
||||||
|
.remove_client(client)
|
||||||
|
.map(|s| s.client.handle)
|
||||||
|
else {
|
||||||
|
return None;
|
||||||
|
};
|
||||||
|
|
||||||
|
let notify = FrontendNotify::NotifyClientDelete(client);
|
||||||
|
log::debug!("{notify:?}");
|
||||||
|
if let Err(e) = frontend.notify_all(notify).await {
|
||||||
|
log::error!("error notifying frontend: {e}");
|
||||||
|
}
|
||||||
|
Some(client)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_client(
|
||||||
|
server: &Server,
|
||||||
|
producer_notify_tx: &Sender<ProducerEvent>,
|
||||||
|
consumer_notify_tx: &Sender<ConsumerEvent>,
|
||||||
|
resolve_tx: &Sender<(String, ClientHandle)>,
|
||||||
|
client_update: (ClientHandle, Option<String>, u16, Position),
|
||||||
|
) {
|
||||||
|
let (handle, hostname, port, pos) = client_update;
|
||||||
|
let (hostname, handle, active) = {
|
||||||
|
// retrieve state
|
||||||
|
let mut client_manager = server.client_manager.borrow_mut();
|
||||||
|
let Some(state) = client_manager.get_mut(handle) else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
// update pos
|
||||||
|
state.client.pos = pos;
|
||||||
|
|
||||||
|
// update port
|
||||||
|
if state.client.port != port {
|
||||||
|
state.client.port = port;
|
||||||
|
state.active_addr = state.active_addr.map(|a| SocketAddr::new(a.ip(), port));
|
||||||
|
}
|
||||||
|
|
||||||
|
// update hostname
|
||||||
|
if state.client.hostname != hostname {
|
||||||
|
state.client.ips = HashSet::new();
|
||||||
|
state.active_addr = None;
|
||||||
|
state.client.hostname = hostname;
|
||||||
|
}
|
||||||
|
|
||||||
|
log::debug!("client updated: {:?}", state);
|
||||||
|
(
|
||||||
|
state.client.hostname.clone(),
|
||||||
|
state.client.handle,
|
||||||
|
state.active,
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
// resolve dns
|
||||||
|
if let Some(hostname) = hostname {
|
||||||
|
let _ = resolve_tx.send((hostname, handle)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
// update state in event consumer & producer
|
||||||
|
if active {
|
||||||
|
let _ = producer_notify_tx
|
||||||
|
.send(ProducerEvent::ClientEvent(ClientEvent::Destroy(handle)))
|
||||||
|
.await;
|
||||||
|
let _ = consumer_notify_tx
|
||||||
|
.send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(handle)))
|
||||||
|
.await;
|
||||||
|
let _ = producer_notify_tx
|
||||||
|
.send(ProducerEvent::ClientEvent(ClientEvent::Create(handle, pos)))
|
||||||
|
.await;
|
||||||
|
let _ = consumer_notify_tx
|
||||||
|
.send(ConsumerEvent::ClientEvent(ClientEvent::Create(handle, pos)))
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user