impl emulation stuff

This commit is contained in:
Ferdinand Schober
2024-09-06 00:55:57 +02:00
parent b62540d9be
commit 28e4895418
10 changed files with 391 additions and 312 deletions

View File

@@ -33,7 +33,7 @@ pub enum ConnectionError {
}
#[derive(Debug, Error)]
pub enum ListenerCreationError {
pub enum IpcListenerCreationError {
#[error("could not determine socket-path: `{0}`")]
SocketPath(#[from] SocketPathError),
#[error("service already running!")]
@@ -51,7 +51,7 @@ pub enum IpcError {
#[error(transparent)]
Connection(#[from] ConnectionError),
#[error(transparent)]
Listen(#[from] ListenerCreationError),
Listen(#[from] IpcListenerCreationError),
}
pub const DEFAULT_PORT: u16 = 4242;

View File

@@ -20,7 +20,7 @@ use tokio::net::TcpListener;
#[cfg(windows)]
use tokio::net::TcpStream;
use crate::{FrontendEvent, FrontendRequest, IpcError, ListenerCreationError};
use crate::{FrontendEvent, FrontendRequest, IpcError, IpcListenerCreationError};
pub struct AsyncFrontendListener {
#[cfg(windows)]
@@ -40,7 +40,7 @@ pub struct AsyncFrontendListener {
}
impl AsyncFrontendListener {
pub async fn new() -> Result<Self, ListenerCreationError> {
pub async fn new() -> Result<Self, IpcListenerCreationError> {
#[cfg(unix)]
let (socket_path, listener) = {
let socket_path = crate::default_socket_path()?;
@@ -51,7 +51,7 @@ impl AsyncFrontendListener {
// of lan-mouse is already running
match UnixStream::connect(&socket_path).await {
// connected -> lan-mouse is already running
Ok(_) => return Err(ListenerCreationError::AlreadyRunning),
Ok(_) => return Err(IpcListenerCreationError::AlreadyRunning),
// lan-mouse is not running but a socket was left behind
Err(e) => {
log::debug!("{socket_path:?}: {e} - removing left behind socket");
@@ -63,9 +63,9 @@ impl AsyncFrontendListener {
Ok(ls) => ls,
// some other lan-mouse instance has bound the socket in the meantime
Err(e) if e.kind() == ErrorKind::AddrInUse => {
return Err(ListenerCreationError::AlreadyRunning)
return Err(IpcListenerCreationError::AlreadyRunning)
}
Err(e) => return Err(ListenerCreationError::Bind(e)),
Err(e) => return Err(IpcListenerCreationError::Bind(e)),
};
(socket_path, listener)
};
@@ -75,9 +75,9 @@ impl AsyncFrontendListener {
Ok(ls) => ls,
// some other lan-mouse instance has bound the socket in the meantime
Err(e) if e.kind() == ErrorKind::AddrInUse => {
return Err(ListenerCreationError::AlreadyRunning)
return Err(IpcListenerCreationError::AlreadyRunning)
}
Err(e) => return Err(ListenerCreationError::Bind(e)),
Err(e) => return Err(IpcListenerCreationError::Bind(e)),
};
let adapter = Self {

View File

@@ -2,7 +2,7 @@ use input_event::{Event as InputEvent, KeyboardEvent, PointerEvent};
use num_enum::{IntoPrimitive, TryFromPrimitive, TryFromPrimitiveError};
use paste::paste;
use std::{
fmt::{Debug, Display},
fmt::{Debug, Display, Formatter},
mem::size_of,
};
use thiserror::Error;
@@ -18,6 +18,31 @@ pub enum ProtocolError {
/// event type does not exist
#[error("invalid event id: `{0}`")]
InvalidEventId(#[from] TryFromPrimitiveError<EventType>),
/// position type does not exist
#[error("invalid event id: `{0}`")]
InvalidPosition(#[from] TryFromPrimitiveError<Position>),
}
/// Position of a client
#[derive(Clone, Copy, Debug, TryFromPrimitive, IntoPrimitive)]
#[repr(u8)]
pub enum Position {
Left,
Right,
Top,
Bottom,
}
impl Display for Position {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let pos = match self {
Position::Left => "left",
Position::Right => "right",
Position::Top => "top",
Position::Bottom => "bottom",
};
write!(f, "{pos}")
}
}
/// main lan-mouse protocol event type
@@ -25,7 +50,7 @@ pub enum ProtocolError {
pub enum ProtoEvent {
/// notify a client that the cursor entered its region
/// [`ProtoEvent::Ack`] with the same serial is used for synchronization between devices
Enter(u32),
Enter(Position),
/// notify a client that the cursor left its region
/// [`ProtoEvent::Ack`] with the same serial is used for synchronization between devices
Leave(u32),
@@ -41,7 +66,7 @@ pub enum ProtoEvent {
}
impl Display for ProtoEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ProtoEvent::Enter(s) => write!(f, "Enter({s})"),
ProtoEvent::Leave(s) => write!(f, "Leave({s})"),
@@ -140,7 +165,7 @@ impl TryFrom<[u8; MAX_EVENT_SIZE]> for ProtoEvent {
))),
EventType::Ping => Ok(Self::Ping),
EventType::Pong => Ok(Self::Pong),
EventType::Enter => Ok(Self::Enter(decode_u32(&mut buf)?)),
EventType::Enter => Ok(Self::Enter(decode_u8(&mut buf)?.try_into()?)),
EventType::Leave => Ok(Self::Leave(decode_u32(&mut buf)?)),
EventType::Ack => Ok(Self::Ack(decode_u32(&mut buf)?)),
}
@@ -204,7 +229,7 @@ impl From<ProtoEvent> for ([u8; MAX_EVENT_SIZE], usize) {
},
ProtoEvent::Ping => {}
ProtoEvent::Pong => {}
ProtoEvent::Enter(serial) => encode_u32(buf, len, serial),
ProtoEvent::Enter(pos) => encode_u8(buf, len, pos as u8),
ProtoEvent::Leave(serial) => encode_u32(buf, len, serial),
ProtoEvent::Ack(serial) => encode_u32(buf, len, serial),
}

39
src/connect.rs Normal file
View File

@@ -0,0 +1,39 @@
use std::{io, net::SocketAddr, sync::Arc};
use thiserror::Error;
use tokio::net::UdpSocket;
use webrtc_dtls::{
config::{Config, ExtendedMasterSecretType},
conn::DTLSConn,
crypto::Certificate,
};
use webrtc_util::Conn;
#[derive(Debug, Error)]
pub(crate) enum LanMouseConnectionError {
#[error(transparent)]
Bind(#[from] io::Error),
#[error(transparent)]
Dtls(#[from] webrtc_dtls::Error),
}
pub(crate) struct LanMouseConnection {}
impl LanMouseConnection {
pub(crate) async fn connect(
addr: SocketAddr,
) -> Result<Arc<dyn Conn + Sync + Send>, LanMouseConnectionError> {
let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
conn.connect(addr).await;
log::info!("connected to {addr}, establishing secure dtls channel ...");
let certificate = Certificate::generate_self_signed(["localhost".to_owned()])?;
let config = Config {
certificates: vec![certificate],
insecure_skip_verify: true,
extended_master_secret: ExtendedMasterSecretType::Require,
..Default::default()
};
let dtls_conn: Arc<dyn Conn + Send + Sync> =
Arc::new(DTLSConn::new(conn, config, true, None).await?);
Ok(dtls_conn)
}
}

147
src/emulation.rs Normal file
View File

@@ -0,0 +1,147 @@
use crate::{listen::LanMouseListener, server::Server};
use futures::StreamExt;
use input_emulation::{EmulationHandle, InputEmulation, InputEmulationError};
use input_event::Event;
use lan_mouse_ipc::Status;
use lan_mouse_proto::ProtoEvent;
use local_channel::mpsc::{channel, Receiver, Sender};
use std::{collections::HashMap, net::SocketAddr};
use tokio::task::{spawn_local, JoinHandle};
/// emulation handling events received from a listener
pub(crate) struct Emulation {
server: Server,
listener: LanMouseListener,
emulation_proxy: EmulationProxy,
}
impl Emulation {
pub(crate) fn new(server: Server, listener: LanMouseListener) -> Self {
let emulation_proxy = EmulationProxy::new(server.clone());
Self {
server,
listener,
emulation_proxy,
}
}
async fn run(&mut self) {
while let Some((event, addr)) = self.listener.next().await {
match event {
ProtoEvent::Enter(_) => {
self.server.release_capture();
self.listener.reply(addr, ProtoEvent::Ack(0)).await;
}
ProtoEvent::Leave(_) => self.emulation_proxy.release_keys(addr).await,
ProtoEvent::Ack(_) => {}
ProtoEvent::Input(event) => self.emulation_proxy.consume(event, addr).await,
ProtoEvent::Ping => self.listener.reply(addr, ProtoEvent::Pong).await,
ProtoEvent::Pong => todo!(),
}
}
}
}
/// proxy handling the actual input emulation,
/// discarding events when it is disabled
pub(crate) struct EmulationProxy {
server: Server,
tx: Sender<(ProxyEvent, SocketAddr)>,
task: JoinHandle<()>,
}
enum ProxyEvent {
Input(Event),
ReleaseKeys,
}
impl EmulationProxy {
fn new(server: Server) -> Self {
let (tx, rx) = channel();
let task = spawn_local(Self::emulation_task(server.clone(), rx));
Self { server, tx, task }
}
async fn emulation_task(server: Server, mut rx: Receiver<(ProxyEvent, SocketAddr)>) {
let mut handles = HashMap::new();
let mut next_id = 0;
loop {
if let Err(e) = Self::do_emulation(&server, &mut handles, &mut next_id, &mut rx).await {
log::warn!("input emulation exited: {e}");
}
tokio::select! {
_ = server.emulation_notified() => {},
_ = server.cancelled() => return,
}
}
}
async fn do_emulation(
server: &Server,
handles: &mut HashMap<SocketAddr, EmulationHandle>,
next_id: &mut EmulationHandle,
rx: &mut Receiver<(ProxyEvent, SocketAddr)>,
) -> Result<(), InputEmulationError> {
let backend = server.config.emulation_backend.map(|b| b.into());
log::info!("creating input emulation ...");
let mut emulation = tokio::select! {
r = InputEmulation::new(backend) => r?,
_ = server.cancelled() => return Ok(()),
};
server.set_emulation_status(Status::Enabled);
// create active handles
for &handle in handles.values() {
emulation.create(handle).await;
}
let res = Self::do_emulation_session(server, &mut emulation, handles, next_id, rx).await;
// FIXME replace with async drop when stabilized
emulation.terminate().await;
server.set_emulation_status(Status::Disabled);
res
}
async fn do_emulation_session(
server: &Server,
emulation: &mut InputEmulation,
handles: &mut HashMap<SocketAddr, EmulationHandle>,
next_id: &mut EmulationHandle,
rx: &mut Receiver<(ProxyEvent, SocketAddr)>,
) -> Result<(), InputEmulationError> {
loop {
tokio::select! {
e = rx.recv() => {
let (event, addr) = e.expect("channel closed");
let handle = match handles.get(&addr) {
Some(&handle) => handle,
None => {
let handle = *next_id;
*next_id += 1;
emulation.create(handle).await;
handles.insert(addr, handle);
handle
}
};
match event {
ProxyEvent::Input(event) => emulation.consume(event, handle).await?,
ProxyEvent::ReleaseKeys => emulation.release_keys(handle).await?,
}
}
_ = server.cancelled() => break Ok(()),
}
}
}
async fn consume(&self, event: Event, addr: SocketAddr) {
// ignore events if emulation is currently disabled
if let Status::Enabled = self.server.emulation_status.get() {
self.tx.send((ProxyEvent::Input(event), addr));
}
}
async fn release_keys(&self, addr: SocketAddr) {
self.tx.send((ProxyEvent::ReleaseKeys, addr));
}
}

View File

@@ -3,6 +3,8 @@ pub mod config;
pub mod dns;
pub mod server;
pub mod capture_test;
pub mod crypto;
pub mod emulation_test;
mod capture_test;
mod connect;
mod crypto;
mod emulation;
mod listen;

129
src/listen.rs Normal file
View File

@@ -0,0 +1,129 @@
use futures::{Stream, StreamExt};
use lan_mouse_proto::{ProtoEvent, MAX_EVENT_SIZE};
use local_channel::mpsc::{channel, Receiver, Sender};
use std::{net::SocketAddr, rc::Rc, sync::Arc};
use thiserror::Error;
use tokio::{
sync::Mutex,
task::{spawn_local, JoinHandle},
};
use webrtc_dtls::{
config::{Config, ExtendedMasterSecretType},
crypto::Certificate,
listener::listen,
};
use webrtc_util::{conn::Listener, Conn, Error};
#[derive(Error, Debug)]
pub(crate) enum ListenerCreationError {
#[error(transparent)]
WebrtcUtil(#[from] webrtc_util::Error),
#[error(transparent)]
WebrtcDtls(#[from] webrtc_dtls::Error),
}
pub(crate) struct LanMouseListener {
listen_rx: Receiver<(ProtoEvent, SocketAddr)>,
listen_task: JoinHandle<()>,
conns: Rc<Mutex<Vec<Arc<dyn Conn + Send + Sync>>>>,
}
impl LanMouseListener {
pub(crate) async fn new(port: u16) -> Result<Self, ListenerCreationError> {
let (listen_tx, listen_rx): (
Sender<(ProtoEvent, SocketAddr)>,
Receiver<(ProtoEvent, SocketAddr)>,
) = channel();
let listen_addr = SocketAddr::new("0.0.0.0".parse().expect("invalid ip"), port);
let certificate = Certificate::generate_self_signed(["localhost".to_owned()])?;
let cfg = Config {
certificates: vec![certificate],
extended_master_secret: ExtendedMasterSecretType::Require,
..Default::default()
};
let listener = listen(listen_addr, cfg).await?;
let conns: Rc<Mutex<Vec<Arc<dyn Conn + Send + Sync>>>> = Rc::new(Mutex::new(Vec::new()));
let conns_clone = conns.clone();
let listen_task: JoinHandle<()> = spawn_local(async move {
loop {
let (conn, _addr) = match listener.accept().await {
Ok(c) => c,
Err(e) => {
log::warn!("accept: {e}");
continue;
}
};
let mut conns = conns_clone.lock().await;
conns.push(conn.clone());
spawn_local(read_loop(conns_clone.clone(), conn, listen_tx.clone()));
}
});
Ok(Self {
conns,
listen_rx,
listen_task,
})
}
pub(crate) async fn broadcast(&self, event: ProtoEvent) {
let (buf, len): ([u8; MAX_EVENT_SIZE], usize) = event.into();
let conns = self.conns.lock().await;
for conn in conns.iter() {
conn.send(&buf[..len]).await;
}
}
pub(crate) async fn reply(&self, addr: SocketAddr, event: ProtoEvent) {
let (buf, len): ([u8; MAX_EVENT_SIZE], usize) = event.into();
let conns = self.conns.lock().await;
for conn in conns.iter() {
if conn.remote_addr() == Some(addr) {
conn.send(&buf[..len]).await;
}
}
}
}
impl Stream for LanMouseListener {
type Item = (ProtoEvent, SocketAddr);
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.listen_rx.poll_next_unpin(cx)
}
}
async fn read_loop(
conns: Rc<Mutex<Vec<Arc<dyn Conn + Send + Sync>>>>,
conn: Arc<dyn Conn + Send + Sync>,
dtls_tx: Sender<(ProtoEvent, SocketAddr)>,
) -> Result<(), Error> {
let mut b = [0u8; MAX_EVENT_SIZE];
while let Ok(_) = conn.recv(&mut b).await {
match b.try_into() {
Ok(event) => dtls_tx
.send((event, conn.remote_addr().expect("no remote addr")))
.expect("channel closed"),
Err(e) => {
log::warn!("error receiving event: {e}");
break;
}
}
}
let mut conns = conns.lock().await;
let index = conns
.iter()
.position(|c| c.remote_addr() == conn.remote_addr())
.expect("connection not found");
conns.remove(index);
Ok(())
}

View File

@@ -15,17 +15,21 @@ use thiserror::Error;
use tokio::{join, signal, sync::Notify};
use tokio_util::sync::CancellationToken;
use crate::{client::ClientManager, config::Config, dns::DnsResolver};
use crate::{
client::ClientManager,
config::Config,
dns::DnsResolver,
emulation::Emulation,
listen::{LanMouseListener, ListenerCreationError},
};
use lan_mouse_ipc::{
AsyncFrontendListener, ClientConfig, ClientHandle, ClientState, FrontendEvent, FrontendRequest,
ListenerCreationError, Position, Status,
IpcListenerCreationError, Position, Status,
};
mod capture_task;
mod emulation_task;
mod network_task;
mod ping_task;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum State {
@@ -43,9 +47,11 @@ pub enum ServiceError {
#[error(transparent)]
Dns(#[from] ResolveError),
#[error(transparent)]
Listen(#[from] ListenerCreationError),
IpcListen(#[from] IpcListenerCreationError),
#[error(transparent)]
Io(#[from] io::Error),
#[error(transparent)]
ListenError(#[from] ListenerCreationError),
}
#[derive(Clone)]
@@ -56,17 +62,16 @@ pub struct Server {
state: Rc<Cell<State>>,
release_bind: Vec<input_event::scancode::Linux>,
notifies: Rc<Notifies>,
config: Rc<Config>,
pub(crate) config: Rc<Config>,
pending_frontend_events: Rc<RefCell<VecDeque<FrontendEvent>>>,
capture_status: Rc<Cell<Status>>,
emulation_status: Rc<Cell<Status>>,
pub(crate) emulation_status: Rc<Cell<Status>>,
}
#[derive(Default)]
struct Notifies {
capture: Notify,
emulation: Notify,
ping: Notify,
port_changed: Notify,
frontend_event_pending: Notify,
cancel: CancellationToken,
@@ -121,7 +126,7 @@ impl Server {
// create frontend communication adapter, exit if already running
let mut frontend = match AsyncFrontendListener::new().await {
Ok(f) => f,
Err(ListenerCreationError::AlreadyRunning) => {
Err(IpcListenerCreationError::AlreadyRunning) => {
log::info!("service already running, exiting");
return Ok(());
}
@@ -130,25 +135,21 @@ impl Server {
let (capture_tx, capture_rx) = channel(); /* requests for input capture */
let (emulation_tx, emulation_rx) = channel(); /* emulation requests */
let (udp_recv_tx, udp_recv_rx) = channel(); /* udp receiver */
let (udp_send_tx, udp_send_rx) = channel(); /* udp sender */
let (dns_tx, dns_rx) = channel(); /* dns requests */
let network = network_task::new(self.clone(), udp_recv_tx.clone(), udp_send_rx).await?;
let capture = capture_task::new(self.clone(), capture_rx, udp_send_tx.clone());
let emulation =
emulation_task::new(self.clone(), emulation_rx, udp_recv_rx, udp_send_tx.clone());
// udp task
let listener = LanMouseListener::new(self.config.port).await?;
// input capture
// let capture = capture_task::new(self.clone(), capture_rx);
// input emulation
let emulation = Emulation::new(self.clone(), listener);
// create dns resolver
let resolver = DnsResolver::new(dns_rx)?;
let dns_task = tokio::task::spawn_local(resolver.run(self.clone()));
// task that pings clients to see if they are responding
let ping = ping_task::new(
self.clone(),
udp_send_tx.clone(),
emulation_tx.clone(),
capture_tx.clone(),
);
for handle in self.active_clients() {
dns_tx.send(handle).expect("channel closed");
}
@@ -187,7 +188,7 @@ impl Server {
log::info!("terminating service");
self.cancel();
let _ = join!(capture, dns_task, emulation, network, ping);
let _ = join!(dns_task);
Ok(())
}
@@ -205,7 +206,7 @@ impl Server {
self.notifies.cancel.cancelled().await
}
fn is_cancelled(&self) -> bool {
pub(crate) fn is_cancelled(&self) -> bool {
self.notifies.cancel.is_cancelled()
}
@@ -223,18 +224,10 @@ impl Server {
self.notifies.emulation.notify_waiters()
}
async fn emulation_notified(&self) {
pub(crate) async fn emulation_notified(&self) {
self.notifies.emulation.notified().await
}
fn restart_ping_timer(&self) {
self.notifies.ping.notify_waiters()
}
async fn ping_timer_notified(&self) {
self.notifies.ping.notified().await
}
fn request_port_change(&self, port: u16) {
self.port.replace(port);
self.notifies.port_changed.notify_one();
@@ -396,12 +389,6 @@ impl Server {
}
}
fn update_pressed_keys(&self, handle: ClientHandle, has_pressed_keys: bool) {
if let Some((_, s)) = self.client_manager.borrow_mut().get_mut(handle) {
s.has_pressed_keys = has_pressed_keys;
}
}
fn update_fix_ips(&self, handle: ClientHandle, fix_ips: Vec<IpAddr>) {
if let Some((c, _)) = self.client_manager.borrow_mut().get_mut(handle) {
c.fix_ips = fix_ips;
@@ -504,7 +491,7 @@ impl Server {
self.notify_frontend(event);
}
fn set_emulation_status(&self, status: Status) {
pub(crate) fn set_emulation_status(&self, status: Status) {
self.emulation_status.replace(status);
let status = FrontendEvent::EmulationStatus(status);
self.notify_frontend(status);
@@ -550,6 +537,10 @@ impl Server {
.get(handle)
.and_then(|(_, s)| s.active_addr)
}
pub(crate) fn release_capture(&self) {
todo!()
}
}
fn to_capture_pos(pos: Position) -> input_capture::Position {

View File

@@ -1,116 +0,0 @@
use local_channel::mpsc::{Receiver, Sender};
use std::{cell::RefCell, collections::HashMap, io, net::SocketAddr, rc::Rc, sync::Arc};
use webrtc_dtls::{
config::{Config, ExtendedMasterSecretType},
conn::DTLSConn,
crypto::Certificate,
listener::listen,
};
use webrtc_util::{conn::Listener, Conn};
use thiserror::Error;
use tokio::{
net::UdpSocket,
task::{spawn_local, JoinHandle},
};
use crate::crypto;
use super::Server;
use lan_mouse_proto::{ProtoEvent, ProtocolError};
pub(crate) async fn new(
server: Server,
udp_recv_tx: Sender<Result<(ProtoEvent, SocketAddr), NetworkError>>,
udp_send_rx: Receiver<(ProtoEvent, SocketAddr)>,
) -> io::Result<JoinHandle<()>> {
// bind the udp socket
let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), server.port.get());
Ok(spawn_local(async move {
let sender_rx = Rc::new(RefCell::new(udp_send_rx));
let udp_receiver = spawn_local(listen_dtls(listen_addr, udp_recv_tx.clone()));
let udp_sender = spawn_local(udp_sender(sender_rx.clone()));
log::info!("starting sender + receiver");
tokio::select! {
e = udp_receiver => panic!("{e:?}"), /* channel closed */
_ = udp_sender => {}, /* channel closed */
_ = server.cancelled() => {}, /* cancellation requested */
}
}))
}
async fn listen_dtls(
listen_addr: SocketAddr,
udp_recv_tx: Sender<Result<(ProtoEvent, SocketAddr), NetworkError>>,
) -> Result<(), NetworkError> {
let certificate = Certificate::generate_self_signed(vec!["localhost".to_owned()]).unwrap();
let cfg = Config {
certificates: vec![certificate],
extended_master_secret: ExtendedMasterSecretType::Require,
..Default::default()
};
let listener = Arc::new(listen(listen_addr, cfg).await?);
loop {
while let Ok((conn, addr)) = listener.accept().await {
let udp_recv_tx = udp_recv_tx.clone();
spawn_local(async move {
loop {
let mut buf = [0u8; lan_mouse_proto::MAX_EVENT_SIZE];
let event: Result<_, NetworkError> = match conn.recv(&mut buf).await {
Ok(_len) => match ProtoEvent::try_from(buf) {
Ok(e) => Ok((e, addr)),
Err(e) => Err(e.into()),
},
Err(e) => Err(e.into()),
};
udp_recv_tx.send(event).expect("channel closed");
}
});
}
}
}
async fn udp_sender(rx: Rc<RefCell<Receiver<(ProtoEvent, SocketAddr)>>>) {
let mut connection_pool: HashMap<SocketAddr, DTLSConn> = HashMap::new();
loop {
log::error!("waiting for event to send ...");
let (event, addr) = rx.borrow_mut().recv().await.expect("channel closed");
let addr = SocketAddr::new(addr.ip(), 4242);
if !connection_pool.contains_key(&addr) {
let socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await.unwrap());
socket.connect(addr).await.unwrap();
let certificate =
Certificate::generate_self_signed(vec!["localhost".to_owned()]).unwrap();
let config = Config {
certificates: vec![certificate],
insecure_skip_verify: true,
extended_master_secret: ExtendedMasterSecretType::Require,
..Default::default()
};
log::error!("connecting to {addr}");
let conn = DTLSConn::new(socket, config, true, None).await.unwrap();
log::error!("connected {addr}!");
connection_pool.insert(addr, conn);
};
let conn = connection_pool.get(&addr).unwrap();
let (data, len): ([u8; lan_mouse_proto::MAX_EVENT_SIZE], usize) = event.into();
conn.send(&data[..len]).await.unwrap();
}
}
#[derive(Debug, Error)]
pub(crate) enum NetworkError {
#[error(transparent)]
Protocol(#[from] ProtocolError),
#[error("network error: `{0}`")]
Io(#[from] io::Error),
#[error(transparent)]
Crypt(#[from] crypto::Error),
#[error(transparent)]
Rustls(#[from] rustls::Error),
#[error(transparent)]
WebrtcDtls(#[from] webrtc_dtls::Error),
#[error(transparent)]
WebrtcUtil(#[from] webrtc_util::Error),
}

View File

@@ -1,138 +0,0 @@
use std::{net::SocketAddr, time::Duration};
use lan_mouse_proto::ProtoEvent;
use local_channel::mpsc::Sender;
use tokio::task::JoinHandle;
use lan_mouse_ipc::ClientHandle;
use super::{capture_task::CaptureRequest, emulation_task::EmulationRequest, Server, State};
const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500);
pub(crate) fn new(
server: Server,
sender_ch: Sender<(ProtoEvent, SocketAddr)>,
emulate_notify: Sender<EmulationRequest>,
capture_notify: Sender<CaptureRequest>,
) -> JoinHandle<()> {
// timer task
tokio::task::spawn_local(async move {
tokio::select! {
_ = server.notifies.cancel.cancelled() => {}
_ = ping_task(&server, sender_ch, emulate_notify, capture_notify) => {}
}
})
}
async fn ping_task(
server: &Server,
sender_ch: Sender<(ProtoEvent, SocketAddr)>,
emulate_notify: Sender<EmulationRequest>,
capture_notify: Sender<CaptureRequest>,
) {
loop {
// wait for wake up signal
server.ping_timer_notified().await;
loop {
let receiving = server.state.get() == State::Receiving;
let (ping_clients, ping_addrs) = {
let mut client_manager = server.client_manager.borrow_mut();
let ping_clients: Vec<ClientHandle> = if receiving {
// if receiving we care about clients with pressed keys
client_manager
.get_client_states()
.filter(|(_, (_, s))| s.has_pressed_keys)
.map(|(h, _)| h)
.collect()
} else {
// if sending we care about the active client
server.active_client.get().iter().cloned().collect()
};
// get relevant socket addrs for clients
let ping_addrs: Vec<SocketAddr> = {
ping_clients
.iter()
.flat_map(|&h| client_manager.get(h))
.flat_map(|(c, s)| {
if s.alive && s.active_addr.is_some() {
vec![s.active_addr.unwrap()]
} else {
s.ips
.iter()
.cloned()
.map(|ip| SocketAddr::new(ip, c.port))
.collect()
}
})
.collect()
};
// reset alive
for (_, (_, s)) in client_manager.get_client_states_mut() {
s.alive = false;
}
(ping_clients, ping_addrs)
};
if receiving && ping_clients.is_empty() {
// receiving and no client has pressed keys
// -> no need to keep pinging
break;
}
// ping clients
for addr in ping_addrs {
if sender_ch.send((ProtoEvent::Ping, addr)).is_err() {
break;
}
}
// give clients time to resond
if receiving {
log::trace!(
"waiting {MAX_RESPONSE_TIME:?} for response from client with pressed keys ..."
);
} else {
log::trace!(
"state: {:?} => waiting {MAX_RESPONSE_TIME:?} for client to respond ...",
server.state.get()
);
}
tokio::time::sleep(MAX_RESPONSE_TIME).await;
// when anything is received from a client,
// the alive flag gets set
let unresponsive_clients: Vec<_> = {
let client_manager = server.client_manager.borrow();
ping_clients
.iter()
.filter_map(|&h| match client_manager.get(h) {
Some((_, s)) if !s.alive => Some(h),
_ => None,
})
.collect()
};
// we may not be receiving anymore but we should respond
// to the original state and not the "new" one
if receiving {
for h in unresponsive_clients {
log::warn!("device not responding, releasing keys!");
let _ = emulate_notify.send(EmulationRequest::ReleaseKeys(h));
}
} else {
// release pointer if the active client has not responded
if !unresponsive_clients.is_empty() {
log::warn!("client not responding, releasing pointer!");
server.state.replace(State::Receiving);
let _ = capture_notify.send(CaptureRequest::Release);
}
}
}
}
}