(hopefully) fix ping logic

This commit is contained in:
Ferdinand Schober
2024-10-19 20:17:28 +02:00
parent 9dc4e95a4d
commit f62cd3d11c

View File

@@ -3,6 +3,7 @@ use lan_mouse_ipc::{ClientHandle, DEFAULT_PORT};
use lan_mouse_proto::{ProtoEvent, MAX_EVENT_SIZE};
use local_channel::mpsc::{channel, Receiver, Sender};
use std::{
cell::RefCell,
collections::{HashMap, HashSet},
io,
net::SocketAddr,
@@ -75,6 +76,7 @@ pub(crate) struct LanMouseConnection {
connecting: Rc<Mutex<HashSet<ClientHandle>>>,
recv_rx: Receiver<(ClientHandle, ProtoEvent)>,
recv_tx: Sender<(ClientHandle, ProtoEvent)>,
ping_response: Rc<RefCell<HashSet<SocketAddr>>>,
}
impl LanMouseConnection {
@@ -87,6 +89,7 @@ impl LanMouseConnection {
connecting: Default::default(),
recv_rx,
recv_tx,
ping_response: Default::default(),
}
}
@@ -110,14 +113,15 @@ impl LanMouseConnection {
if !self.server.client_manager.alive(handle) {
return Err(LanMouseConnectionError::TargetEmulationDisabled);
}
log::trace!("{event} >->->->->- {addr}");
match conn.send(buf).await {
Ok(_) => return Ok(()),
Ok(_) => {}
Err(e) => {
log::warn!("client {handle} failed to send: {e}");
disconnect(&self.server, handle, addr, &self.conns).await;
}
}
log::trace!("{event} >->->->->- {addr}");
return Ok(());
}
}
@@ -133,6 +137,7 @@ impl LanMouseConnection {
self.conns.clone(),
self.connecting.clone(),
self.recv_tx.clone(),
self.ping_response.clone(),
));
}
Err(LanMouseConnectionError::NotConnected)
@@ -146,6 +151,7 @@ async fn connect_to_handle(
conns: Rc<Mutex<HashMap<SocketAddr, Arc<dyn Conn + Send + Sync>>>>,
connecting: Rc<Mutex<HashSet<ClientHandle>>>,
tx: Sender<(ClientHandle, ProtoEvent)>,
ping_response: Rc<RefCell<HashSet<SocketAddr>>>,
) -> Result<(), LanMouseConnectionError> {
log::info!("client {handle} connecting ...");
// sending did not work, figure out active conn.
@@ -173,16 +179,18 @@ async fn connect_to_handle(
connecting.lock().await.remove(&handle);
// poll connection for active
spawn_local(ping_pong(
server.clone(),
handle,
addr,
conn.clone(),
conns.clone(),
));
spawn_local(ping_pong(addr, conn.clone(), ping_response.clone()));
// receiver
spawn_local(receive_loop(server, handle, addr, conn, conns, tx));
spawn_local(receive_loop(
server,
handle,
addr,
conn,
conns,
tx,
ping_response.clone(),
));
return Ok(());
}
connecting.lock().await.remove(&handle);
@@ -190,26 +198,25 @@ async fn connect_to_handle(
}
async fn ping_pong(
server: Service,
handle: ClientHandle,
addr: SocketAddr,
conn: Arc<dyn Conn + Send + Sync>,
conns: Rc<Mutex<HashMap<SocketAddr, Arc<dyn Conn + Send + Sync>>>>,
ping_response: Rc<RefCell<HashSet<SocketAddr>>>,
) {
loop {
let (buf, len) = ProtoEvent::Ping.into();
log::trace!("PING >->->->->- {addr}");
if let Err(e) = conn.send(&buf[..len]).await {
log::warn!("send: {e}");
disconnect(&server, handle, addr, &conns).await;
log::warn!("{addr}: send error `{e}`, closing connection");
let _ = conn.close().await;
break;
}
log::trace!("PING >->->->->- {addr}");
tokio::time::sleep(Duration::from_millis(500)).await;
if server.client_manager.active_addr(handle).is_none() {
log::warn!("no active addr");
disconnect(&server, handle, addr, &conns).await;
if !ping_response.borrow_mut().remove(&addr) {
log::warn!("{addr} did not respond, closing connection");
let _ = conn.close().await;
return;
}
}
}
@@ -221,14 +228,17 @@ async fn receive_loop(
conn: Arc<dyn Conn + Send + Sync>,
conns: Rc<Mutex<HashMap<SocketAddr, Arc<dyn Conn + Send + Sync>>>>,
tx: Sender<(ClientHandle, ProtoEvent)>,
ping_response: Rc<RefCell<HashSet<SocketAddr>>>,
) {
let mut buf = [0u8; MAX_EVENT_SIZE];
while conn.recv(&mut buf).await.is_ok() {
if let Ok(event) = buf.try_into() {
log::trace!("{addr} <==<==<== {event}");
match event {
ProtoEvent::Pong(b) => {
server.client_manager.set_active_addr(handle, Some(addr));
server.client_manager.set_alive(handle, b);
ping_response.borrow_mut().insert(addr);
}
event => tx.send((handle, event)).expect("channel closed"),
}