mirror of
https://github.com/feschber/lan-mouse.git
synced 2026-04-02 11:31:28 +03:00
enter acknowledgement
This commit is contained in:
169
src/connect.rs
169
src/connect.rs
@@ -1,6 +1,7 @@
|
||||
use crate::server::Server;
|
||||
use lan_mouse_ipc::{ClientHandle, DEFAULT_PORT};
|
||||
use lan_mouse_proto::{ProtoEvent, MAX_EVENT_SIZE};
|
||||
use local_channel::mpsc::{channel, Receiver, Sender};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
io,
|
||||
@@ -67,17 +68,26 @@ pub(crate) struct LanMouseConnection {
|
||||
server: Server,
|
||||
conns: Rc<Mutex<HashMap<SocketAddr, Arc<dyn Conn + Send + Sync>>>>,
|
||||
connecting: Rc<Mutex<HashSet<ClientHandle>>>,
|
||||
recv_rx: Receiver<(ClientHandle, ProtoEvent)>,
|
||||
recv_tx: Sender<(ClientHandle, ProtoEvent)>,
|
||||
}
|
||||
|
||||
impl LanMouseConnection {
|
||||
pub(crate) fn new(server: Server) -> Self {
|
||||
let (recv_tx, recv_rx) = channel();
|
||||
Self {
|
||||
server,
|
||||
conns: Default::default(),
|
||||
connecting: Default::default(),
|
||||
recv_rx,
|
||||
recv_tx,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn recv(&mut self) -> (ClientHandle, ProtoEvent) {
|
||||
self.recv_rx.recv().await.expect("channel closed")
|
||||
}
|
||||
|
||||
pub(crate) async fn send(
|
||||
&self,
|
||||
event: ProtoEvent,
|
||||
@@ -103,68 +113,105 @@ impl LanMouseConnection {
|
||||
}
|
||||
|
||||
// check if we are already trying to connect
|
||||
{
|
||||
let mut connecting = self.connecting.lock().await;
|
||||
if connecting.contains(&handle) {
|
||||
return Err(LanMouseConnectionError::NotConnected);
|
||||
} else {
|
||||
connecting.insert(handle);
|
||||
}
|
||||
let mut connecting = self.connecting.lock().await;
|
||||
if !connecting.contains(&handle) {
|
||||
connecting.insert(handle);
|
||||
// connect in the background
|
||||
spawn_local(connect_to_handle(
|
||||
self.server.clone(),
|
||||
handle,
|
||||
self.conns.clone(),
|
||||
self.connecting.clone(),
|
||||
self.recv_tx.clone(),
|
||||
));
|
||||
}
|
||||
let server = self.server.clone();
|
||||
let conns = self.conns.clone();
|
||||
let connecting = self.connecting.clone();
|
||||
|
||||
// connect in the background
|
||||
spawn_local(async move {
|
||||
// sending did not work, figure out active conn.
|
||||
if let Some(addrs) = server.get_ips(handle) {
|
||||
let port = server.get_port(handle).unwrap_or(DEFAULT_PORT);
|
||||
let addrs = addrs
|
||||
.into_iter()
|
||||
.map(|a| SocketAddr::new(a, port))
|
||||
.collect::<Vec<_>>();
|
||||
log::info!("client ({handle}) connecting ... (ips: {addrs:?})");
|
||||
let res = connect_any(&addrs).await;
|
||||
let (conn, addr) = match res {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
connecting.lock().await.remove(&handle);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
log::info!("client ({handle}) connected @ {addr}");
|
||||
server.set_active_addr(handle, Some(addr));
|
||||
conns.lock().await.insert(addr, conn.clone());
|
||||
connecting.lock().await.remove(&handle);
|
||||
spawn_local(async move {
|
||||
loop {
|
||||
let (buf, len) = ProtoEvent::Ping.into();
|
||||
if let Err(e) = conn.send(&buf[..len]).await {
|
||||
log::warn!("client ({handle}) @ {addr} connection closed: {e}");
|
||||
conns.lock().await.remove(&addr);
|
||||
server.set_active_addr(handle, None);
|
||||
let active: Vec<SocketAddr> =
|
||||
conns.lock().await.keys().copied().collect();
|
||||
log::info!("active connections: {active:?}");
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
let mut buf = [0u8; MAX_EVENT_SIZE];
|
||||
if let Err(e) = conn.recv(&mut buf).await {
|
||||
log::warn!("recv(): client ({handle}) @ {addr} connection closed: {e}");
|
||||
conns.lock().await.remove(&addr);
|
||||
server.set_active_addr(handle, None);
|
||||
let active: Vec<SocketAddr> =
|
||||
conns.lock().await.keys().copied().collect();
|
||||
log::info!("active connections: {active:?}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
Result::<(), LanMouseConnectionError>::Ok(())
|
||||
});
|
||||
Err(LanMouseConnectionError::NotConnected)
|
||||
}
|
||||
}
|
||||
|
||||
async fn connect_to_handle(
|
||||
server: Server,
|
||||
handle: ClientHandle,
|
||||
conns: Rc<Mutex<HashMap<SocketAddr, Arc<dyn Conn + Send + Sync>>>>,
|
||||
connecting: Rc<Mutex<HashSet<ClientHandle>>>,
|
||||
tx: Sender<(ClientHandle, ProtoEvent)>,
|
||||
) -> Result<(), LanMouseConnectionError> {
|
||||
// sending did not work, figure out active conn.
|
||||
if let Some(addrs) = server.get_ips(handle) {
|
||||
let port = server.get_port(handle).unwrap_or(DEFAULT_PORT);
|
||||
let addrs = addrs
|
||||
.into_iter()
|
||||
.map(|a| SocketAddr::new(a, port))
|
||||
.collect::<Vec<_>>();
|
||||
log::info!("client ({handle}) connecting ... (ips: {addrs:?})");
|
||||
let res = connect_any(&addrs).await;
|
||||
let (conn, addr) = match res {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
connecting.lock().await.remove(&handle);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
log::info!("client ({handle}) connected @ {addr}");
|
||||
server.set_active_addr(handle, Some(addr));
|
||||
conns.lock().await.insert(addr, conn.clone());
|
||||
connecting.lock().await.remove(&handle);
|
||||
|
||||
// poll connection for active
|
||||
spawn_local(ping_pong(
|
||||
server.clone(),
|
||||
handle,
|
||||
addr,
|
||||
conn.clone(),
|
||||
conns.clone(),
|
||||
));
|
||||
|
||||
// receiver
|
||||
spawn_local(receive_loop(handle, conn, tx));
|
||||
return Ok(());
|
||||
}
|
||||
Err(LanMouseConnectionError::NotConnected)
|
||||
}
|
||||
|
||||
async fn ping_pong(
|
||||
server: Server,
|
||||
handle: ClientHandle,
|
||||
addr: SocketAddr,
|
||||
conn: Arc<dyn Conn + Send + Sync>,
|
||||
conns: Rc<Mutex<HashMap<SocketAddr, Arc<dyn Conn + Send + Sync>>>>,
|
||||
) {
|
||||
loop {
|
||||
let (buf, len) = ProtoEvent::Ping.into();
|
||||
if let Err(e) = conn.send(&buf[..len]).await {
|
||||
log::warn!("client ({handle}) @ {addr} connection closed: {e}");
|
||||
conns.lock().await.remove(&addr);
|
||||
server.set_active_addr(handle, None);
|
||||
let active: Vec<SocketAddr> = conns.lock().await.keys().copied().collect();
|
||||
log::info!("active connections: {active:?}");
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
let mut buf = [0u8; MAX_EVENT_SIZE];
|
||||
if let Err(e) = conn.recv(&mut buf).await {
|
||||
log::warn!("recv(): client ({handle}) @ {addr} connection closed: {e}");
|
||||
conns.lock().await.remove(&addr);
|
||||
server.set_active_addr(handle, None);
|
||||
let active: Vec<SocketAddr> = conns.lock().await.keys().copied().collect();
|
||||
log::info!("active connections: {active:?}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn receive_loop(
|
||||
handle: ClientHandle,
|
||||
conn: Arc<dyn Conn + Send + Sync>,
|
||||
tx: Sender<(ClientHandle, ProtoEvent)>,
|
||||
) {
|
||||
let mut buf = [0u8; MAX_EVENT_SIZE];
|
||||
while let Ok(_) = conn.recv(&mut buf).await {
|
||||
if let Ok(event) = buf.try_into() {
|
||||
tx.send((handle, event));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user