finish up server

This commit is contained in:
Ferdinand Schober
2024-09-06 18:28:04 +02:00
parent ad8c92cfbe
commit 937652ac44
6 changed files with 99 additions and 86 deletions

View File

@@ -1,6 +1,6 @@
use futures::StreamExt;
use input_capture::{
Backend, CaptureError, CaptureEvent, CaptureHandle, InputCapture, InputCaptureError, Position,
CaptureError, CaptureEvent, CaptureHandle, InputCapture, InputCaptureError, Position,
};
use lan_mouse_ipc::{ClientHandle, Status};
use lan_mouse_proto::ProtoEvent;
@@ -13,9 +13,8 @@ use tokio::{
use crate::{connect::LanMouseConnection, server::Server};
pub(crate) struct CaptureProxy {
server: Server,
tx: Sender<CaptureRequest>,
task: JoinHandle<()>,
_task: JoinHandle<()>,
}
#[derive(Clone, Copy, Debug)]
@@ -29,20 +28,34 @@ enum CaptureRequest {
}
impl CaptureProxy {
pub(crate) fn new(server: Server, backend: Option<Backend>, conn: LanMouseConnection) -> Self {
pub(crate) fn new(server: Server, conn: LanMouseConnection) -> Self {
let (tx, rx) = channel();
let task = spawn_local(Self::run(server.clone(), backend, rx, conn));
Self { server, tx, task }
let _task = spawn_local(Self::run(server.clone(), rx, conn));
Self { tx, _task }
}
pub(crate) async fn run(
server: Server,
backend: Option<Backend>,
mut rx: Receiver<CaptureRequest>,
conn: LanMouseConnection,
) {
pub(crate) fn create(&self, handle: CaptureHandle, pos: Position) {
self.tx
.send(CaptureRequest::Create(handle, pos))
.expect("channel closed");
}
pub(crate) fn destroy(&self, handle: CaptureHandle) {
self.tx
.send(CaptureRequest::Destroy(handle))
.expect("channel closed");
}
#[allow(unused)]
pub(crate) fn release(&self) {
self.tx
.send(CaptureRequest::Release)
.expect("channel closed");
}
async fn run(server: Server, mut rx: Receiver<CaptureRequest>, conn: LanMouseConnection) {
loop {
if let Err(e) = do_capture(backend, &server, &conn, &mut rx).await {
if let Err(e) = do_capture(&server, &conn, &mut rx).await {
log::warn!("input capture exited: {e}");
}
server.set_capture_status(Status::Disabled);
@@ -56,11 +69,12 @@ impl CaptureProxy {
}
async fn do_capture(
backend: Option<Backend>,
server: &Server,
conn: &LanMouseConnection,
rx: &mut Receiver<CaptureRequest>,
) -> Result<(), InputCaptureError> {
let backend = server.config.capture_backend.map(|b| b.into());
/* allow cancelling capture request */
let mut capture = tokio::select! {
r = InputCapture::new(backend) => r?,
@@ -82,7 +96,7 @@ async fn do_capture(
loop {
tokio::select! {
event = capture.next() => match event {
Some(event) => handle_capture_event(server, &mut capture, sender_tx, event?).await?,
Some(event) => handle_capture_event(server, &mut capture, conn, event?).await?,
None => return Ok(()),
},
e = rx.recv() => {
@@ -106,7 +120,7 @@ async fn do_capture(
async fn handle_capture_event(
server: &Server,
capture: &mut InputCapture,
conn: &mut LanMouseConnection,
conn: &LanMouseConnection,
event: (CaptureHandle, CaptureEvent),
) -> Result<(), CaptureError> {
let (handle, event) = event;
@@ -125,7 +139,10 @@ async fn handle_capture_event(
CaptureEvent::Input(e) => ProtoEvent::Input(e),
};
conn.send(event, handle).await;
if let Err(e) = conn.send(event, handle).await {
log::warn!("failed to connect, releasing capture: {e}");
capture.release().await?;
}
Ok(())
}

View File

@@ -25,7 +25,7 @@ pub(crate) enum LanMouseConnectionError {
async fn connect(addr: SocketAddr) -> Result<Arc<dyn Conn + Sync + Send>, LanMouseConnectionError> {
let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
conn.connect(addr).await;
conn.connect(addr).await?;
log::info!("connected to {addr}, establishing secure dtls channel ...");
let certificate = Certificate::generate_self_signed(["localhost".to_owned()])?;
let config = Config {
@@ -56,14 +56,11 @@ pub(crate) struct LanMouseConnection {
}
impl LanMouseConnection {
fn find_conn(&self, addrs: &[SocketAddr]) -> Vec<Arc<dyn Conn + Send + Sync>> {
let mut conns = vec![];
for addr in addrs {
if let Some(conn) = self.conns.get(&addr) {
conns.push(conn.clone());
}
pub(crate) fn new(server: Server) -> Self {
Self {
server,
conns: Default::default(),
}
conns
}
pub(crate) async fn send(

View File

@@ -10,32 +10,29 @@ use tokio::task::{spawn_local, JoinHandle};
/// emulation handling events received from a listener
pub(crate) struct Emulation {
server: Server,
listener: LanMouseListener,
emulation_proxy: EmulationProxy,
_tx: Sender<ProxyEvent>,
_task: JoinHandle<()>,
}
impl Emulation {
pub(crate) fn new(server: Server, listener: LanMouseListener) -> Self {
let (_tx, _rx) = channel();
let emulation_proxy = EmulationProxy::new(server.clone());
Self {
server,
listener,
emulation_proxy,
}
let _task = spawn_local(Self::run(server, listener, emulation_proxy));
Self { _tx, _task }
}
async fn run(&mut self) {
while let Some((event, addr)) = self.listener.next().await {
async fn run(server: Server, mut listener: LanMouseListener, emulation_proxy: EmulationProxy) {
while let Some((event, addr)) = listener.next().await {
match event {
ProtoEvent::Enter(_) => {
self.server.release_capture();
self.listener.reply(addr, ProtoEvent::Ack(0)).await;
server.release_capture();
listener.reply(addr, ProtoEvent::Ack(0)).await;
}
ProtoEvent::Leave(_) => self.emulation_proxy.release_keys(addr).await,
ProtoEvent::Leave(_) => emulation_proxy.release_keys(addr).await,
ProtoEvent::Ack(_) => {}
ProtoEvent::Input(event) => self.emulation_proxy.consume(event, addr).await,
ProtoEvent::Ping => self.listener.reply(addr, ProtoEvent::Pong).await,
ProtoEvent::Input(event) => emulation_proxy.consume(event, addr).await,
ProtoEvent::Ping => listener.reply(addr, ProtoEvent::Pong).await,
ProtoEvent::Pong => todo!(),
}
}
@@ -47,6 +44,7 @@ impl Emulation {
pub(crate) struct EmulationProxy {
server: Server,
tx: Sender<(ProxyEvent, SocketAddr)>,
#[allow(unused)]
task: JoinHandle<()>,
}
@@ -62,6 +60,21 @@ impl EmulationProxy {
Self { server, tx, task }
}
async fn consume(&self, event: Event, addr: SocketAddr) {
// ignore events if emulation is currently disabled
if let Status::Enabled = self.server.emulation_status.get() {
self.tx
.send((ProxyEvent::Input(event), addr))
.expect("channel closed");
}
}
async fn release_keys(&self, addr: SocketAddr) {
self.tx
.send((ProxyEvent::ReleaseKeys, addr))
.expect("channel closed");
}
async fn emulation_task(server: Server, mut rx: Receiver<(ProxyEvent, SocketAddr)>) {
let mut handles = HashMap::new();
let mut next_id = 0;
@@ -133,15 +146,4 @@ impl EmulationProxy {
}
}
}
async fn consume(&self, event: Event, addr: SocketAddr) {
// ignore events if emulation is currently disabled
if let Status::Enabled = self.server.emulation_status.get() {
self.tx.send((ProxyEvent::Input(event), addr));
}
}
async fn release_keys(&self, addr: SocketAddr) {
self.tx.send((ProxyEvent::ReleaseKeys, addr));
}
}

View File

@@ -1,11 +1,12 @@
mod capture;
pub mod capture_test;
pub mod client;
pub mod config;
pub mod dns;
pub mod server;
mod capture;
mod capture_test;
mod connect;
#[allow(unused)]
mod crypto;
mod dns;
mod emulation;
pub mod emulation_test;
mod listen;
pub mod server;

View File

@@ -15,7 +15,7 @@ use webrtc_dtls::{
use webrtc_util::{conn::Listener, Conn, Error};
#[derive(Error, Debug)]
pub(crate) enum ListenerCreationError {
pub enum ListenerCreationError {
#[error(transparent)]
WebrtcUtil(#[from] webrtc_util::Error),
#[error(transparent)]
@@ -24,7 +24,7 @@ pub(crate) enum ListenerCreationError {
pub(crate) struct LanMouseListener {
listen_rx: Receiver<(ProtoEvent, SocketAddr)>,
listen_task: JoinHandle<()>,
_listen_task: JoinHandle<()>,
conns: Rc<Mutex<Vec<Arc<dyn Conn + Send + Sync>>>>,
}
@@ -49,7 +49,7 @@ impl LanMouseListener {
let conns_clone = conns.clone();
let listen_task: JoinHandle<()> = spawn_local(async move {
let _listen_task: JoinHandle<()> = spawn_local(async move {
loop {
let (conn, _addr) = match listener.accept().await {
Ok(c) => c,
@@ -67,15 +67,16 @@ impl LanMouseListener {
Ok(Self {
conns,
listen_rx,
listen_task,
_listen_task,
})
}
#[allow(unused)]
pub(crate) async fn broadcast(&self, event: ProtoEvent) {
let (buf, len): ([u8; MAX_EVENT_SIZE], usize) = event.into();
let conns = self.conns.lock().await;
for conn in conns.iter() {
conn.send(&buf[..len]).await;
let _ = conn.send(&buf[..len]).await;
}
}
@@ -84,7 +85,7 @@ impl LanMouseListener {
let conns = self.conns.lock().await;
for conn in conns.iter() {
if conn.remote_addr() == Some(addr) {
conn.send(&buf[..len]).await;
let _ = conn.send(&buf[..len]).await;
}
}
}

View File

@@ -1,4 +1,3 @@
use capture_task::CaptureRequest;
use futures::StreamExt;
use hickory_resolver::error::ResolveError;
use local_channel::mpsc::{channel, Sender};
@@ -15,8 +14,10 @@ use tokio::{join, signal, sync::Notify};
use tokio_util::sync::CancellationToken;
use crate::{
capture::CaptureProxy,
client::ClientManager,
config::Config,
connect::LanMouseConnection,
dns::DnsResolver,
emulation::Emulation,
listen::{LanMouseListener, ListenerCreationError},
@@ -27,9 +28,6 @@ use lan_mouse_ipc::{
IpcListenerCreationError, Position, Status,
};
mod capture_task;
mod emulation_task;
#[derive(Debug, Error)]
pub enum ServiceError {
#[error(transparent)]
@@ -48,6 +46,7 @@ pub struct ReleaseToken;
pub struct Server {
pub(crate) client_manager: Rc<RefCell<ClientManager>>,
port: Rc<Cell<u16>>,
#[allow(unused)]
release_bind: Vec<input_event::scancode::Linux>,
notifies: Rc<Notifies>,
pub(crate) config: Rc<Config>,
@@ -104,6 +103,7 @@ impl Server {
pending_frontend_events: Rc::new(RefCell::new(VecDeque::new())),
capture_status: Default::default(),
emulation_status: Default::default(),
should_release: Default::default(),
}
}
@@ -118,17 +118,15 @@ impl Server {
e => e?,
};
let (capture_tx, capture_rx) = channel(); /* requests for input capture */
let (dns_tx, dns_rx) = channel(); /* dns requests */
// udp task
// listener + connection
let listener = LanMouseListener::new(self.config.port).await?;
let conn = LanMouseConnection::new(self.clone());
// input capture
// let capture = capture_task::new(self.clone(), capture_rx);
// input emulation
let emulation = Emulation::new(self.clone(), listener);
// input capture + emulation
let capture = CaptureProxy::new(self.clone(), conn);
let _emulation = Emulation::new(self.clone(), listener);
// create dns resolver
let resolver = DnsResolver::new(dns_rx)?;
@@ -150,7 +148,7 @@ impl Server {
None => break,
};
log::debug!("received frontend request: {request:?}");
self.handle_request(&capture_tx.clone(), request);
self.handle_request(&capture, request, &dns_tx);
log::debug!("handled frontend request");
}
_ = self.notifies.frontend_event_pending.notified() => {
@@ -191,10 +189,6 @@ impl Server {
self.notifies.cancel.cancelled().await
}
pub(crate) fn is_cancelled(&self) -> bool {
self.notifies.cancel.is_cancelled()
}
fn notify_capture(&self) {
log::info!("received capture enable request");
self.notifies.capture.notify_waiters()
@@ -218,6 +212,7 @@ impl Server {
self.notifies.port_changed.notify_one();
}
#[allow(unused)]
fn notify_port_changed(&self, port: u16, msg: Option<String>) {
self.port.replace(port);
self.notify_frontend(FrontendEvent::PortChanged(port, msg));
@@ -238,7 +233,7 @@ impl Server {
fn handle_request(
&self,
capture: &Sender<CaptureRequest>,
capture: &CaptureProxy,
event: FrontendRequest,
dns: &Sender<ClientHandle>,
) -> bool {
@@ -300,7 +295,7 @@ impl Server {
handle
}
fn deactivate_client(&self, capture: &Sender<CaptureRequest>, handle: ClientHandle) {
fn deactivate_client(&self, capture: &CaptureProxy, handle: ClientHandle) {
log::debug!("deactivating client {handle}");
match self.client_manager.borrow_mut().get_mut(handle) {
None => return,
@@ -308,12 +303,12 @@ impl Server {
Some((_, s)) => s.active = false,
};
let _ = capture.send(CaptureRequest::Destroy(handle));
capture.destroy(handle);
self.client_updated(handle);
log::info!("deactivated client {handle}");
}
fn activate_client(&self, capture: &Sender<CaptureRequest>, handle: ClientHandle) {
fn activate_client(&self, capture: &CaptureProxy, handle: ClientHandle) {
log::debug!("activating client");
/* deactivate potential other client at this position */
let pos = match self.client_manager.borrow().get(handle) {
@@ -335,12 +330,12 @@ impl Server {
};
/* notify capture and frontends */
let _ = capture.send(CaptureRequest::Create(handle, to_capture_pos(pos)));
capture.create(handle, to_capture_pos(pos));
self.client_updated(handle);
log::info!("activated client {handle} ({pos})");
}
fn remove_client(&self, capture: &Sender<CaptureRequest>, handle: ClientHandle) {
fn remove_client(&self, capture: &CaptureProxy, handle: ClientHandle) {
let Some(active) = self
.client_manager
.borrow_mut()
@@ -351,7 +346,7 @@ impl Server {
};
if active {
let _ = capture.send(CaptureRequest::Destroy(handle));
capture.destroy(handle);
}
}
@@ -433,7 +428,7 @@ impl Server {
}
}
fn update_pos(&self, handle: ClientHandle, capture: &Sender<CaptureRequest>, pos: Position) {
fn update_pos(&self, handle: ClientHandle, capture: &CaptureProxy, pos: Position) {
let (changed, active) = {
let mut client_manager = self.client_manager.borrow_mut();
let Some((c, s)) = client_manager.get_mut(handle) else {