[wip] one-way control

This commit is contained in:
Ferdinand Schober
2024-09-29 14:04:36 +02:00
parent 81ca510d12
commit c205371dfc
10 changed files with 101 additions and 70 deletions

View File

@@ -15,7 +15,7 @@ use tokio::{
task::{spawn_local, JoinHandle}, task::{spawn_local, JoinHandle},
}; };
use crate::{connect::LanMouseConnection, server::Server}; use crate::{connect::LanMouseConnection, service::Service};
pub(crate) struct Capture { pub(crate) struct Capture {
tx: Sender<CaptureRequest>, tx: Sender<CaptureRequest>,
@@ -33,7 +33,7 @@ enum CaptureRequest {
} }
impl Capture { impl Capture {
pub(crate) fn new(server: Server, conn: LanMouseConnection) -> Self { pub(crate) fn new(server: Service, conn: LanMouseConnection) -> Self {
let (tx, rx) = channel(); let (tx, rx) = channel();
let task = spawn_local(Self::run(server.clone(), rx, conn)); let task = spawn_local(Self::run(server.clone(), rx, conn));
Self { tx, task } Self { tx, task }
@@ -66,7 +66,7 @@ impl Capture {
.expect("channel closed"); .expect("channel closed");
} }
async fn run(server: Server, mut rx: Receiver<CaptureRequest>, mut conn: LanMouseConnection) { async fn run(server: Service, mut rx: Receiver<CaptureRequest>, mut conn: LanMouseConnection) {
loop { loop {
if let Err(e) = do_capture(&server, &mut conn, &mut rx).await { if let Err(e) = do_capture(&server, &mut conn, &mut rx).await {
log::warn!("input capture exited: {e}"); log::warn!("input capture exited: {e}");
@@ -87,7 +87,7 @@ impl Capture {
} }
async fn do_capture( async fn do_capture(
server: &Server, server: &Service,
conn: &mut LanMouseConnection, conn: &mut LanMouseConnection,
rx: &mut Receiver<CaptureRequest>, rx: &mut Receiver<CaptureRequest>,
) -> Result<(), InputCaptureError> { ) -> Result<(), InputCaptureError> {
@@ -191,7 +191,7 @@ enum State {
} }
async fn handle_capture_event( async fn handle_capture_event(
server: &Server, server: &Service,
capture: &mut InputCapture, capture: &mut InputCapture,
conn: &LanMouseConnection, conn: &LanMouseConnection,
event: (CaptureHandle, CaptureEvent), event: (CaptureHandle, CaptureEvent),
@@ -241,7 +241,7 @@ async fn handle_capture_event(
Ok(()) Ok(())
} }
async fn release_capture(capture: &mut InputCapture, server: &Server) -> Result<(), CaptureError> { async fn release_capture(capture: &mut InputCapture, server: &Service) -> Result<(), CaptureError> {
server.set_active(None); server.set_active(None);
capture.release().await capture.release().await
} }
@@ -264,7 +264,7 @@ fn to_proto_pos(pos: lan_mouse_ipc::Position) -> lan_mouse_proto::Position {
} }
} }
fn spawn_hook_command(server: &Server, handle: ClientHandle) { fn spawn_hook_command(server: &Service, handle: ClientHandle) {
let Some(cmd) = server.client_manager.get_enter_cmd(handle) else { let Some(cmd) = server.client_manager.get_enter_cmd(handle) else {
return; return;
}; };

View File

@@ -39,7 +39,7 @@ impl ClientManager {
pub fn activate_client(&self, handle: ClientHandle) -> bool { pub fn activate_client(&self, handle: ClientHandle) -> bool {
let mut clients = self.clients.borrow_mut(); let mut clients = self.clients.borrow_mut();
match clients.get_mut(handle as usize) { match clients.get_mut(handle as usize) {
Some((_, s)) if s.active == false => { Some((_, s)) if !s.active => {
s.active = true; s.active = true;
true true
} }

View File

@@ -1,4 +1,4 @@
use crate::server::Server; use crate::service::Service;
use lan_mouse_ipc::{ClientHandle, DEFAULT_PORT}; use lan_mouse_ipc::{ClientHandle, DEFAULT_PORT};
use lan_mouse_proto::{ProtoEvent, MAX_EVENT_SIZE}; use lan_mouse_proto::{ProtoEvent, MAX_EVENT_SIZE};
use local_channel::mpsc::{channel, Receiver, Sender}; use local_channel::mpsc::{channel, Receiver, Sender};
@@ -68,7 +68,7 @@ async fn connect_any(
} }
pub(crate) struct LanMouseConnection { pub(crate) struct LanMouseConnection {
server: Server, server: Service,
cert: Certificate, cert: Certificate,
conns: Rc<Mutex<HashMap<SocketAddr, Arc<dyn Conn + Send + Sync>>>>, conns: Rc<Mutex<HashMap<SocketAddr, Arc<dyn Conn + Send + Sync>>>>,
connecting: Rc<Mutex<HashSet<ClientHandle>>>, connecting: Rc<Mutex<HashSet<ClientHandle>>>,
@@ -77,7 +77,7 @@ pub(crate) struct LanMouseConnection {
} }
impl LanMouseConnection { impl LanMouseConnection {
pub(crate) fn new(server: Server, cert: Certificate) -> Self { pub(crate) fn new(server: Service, cert: Certificate) -> Self {
let (recv_tx, recv_rx) = channel(); let (recv_tx, recv_rx) = channel();
Self { Self {
server, server,
@@ -139,7 +139,7 @@ impl LanMouseConnection {
} }
async fn connect_to_handle( async fn connect_to_handle(
server: Server, server: Service,
cert: Certificate, cert: Certificate,
handle: ClientHandle, handle: ClientHandle,
conns: Rc<Mutex<HashMap<SocketAddr, Arc<dyn Conn + Send + Sync>>>>, conns: Rc<Mutex<HashMap<SocketAddr, Arc<dyn Conn + Send + Sync>>>>,
@@ -189,7 +189,7 @@ async fn connect_to_handle(
} }
async fn ping_pong( async fn ping_pong(
server: Server, server: Service,
handle: ClientHandle, handle: ClientHandle,
addr: SocketAddr, addr: SocketAddr,
conn: Arc<dyn Conn + Send + Sync>, conn: Arc<dyn Conn + Send + Sync>,
@@ -214,7 +214,7 @@ async fn ping_pong(
} }
async fn receive_loop( async fn receive_loop(
server: Server, server: Service,
handle: ClientHandle, handle: ClientHandle,
addr: SocketAddr, addr: SocketAddr,
conn: Arc<dyn Conn + Send + Sync>, conn: Arc<dyn Conn + Send + Sync>,
@@ -238,7 +238,7 @@ async fn receive_loop(
} }
async fn disconnect( async fn disconnect(
server: &Server, server: &Service,
handle: ClientHandle, handle: ClientHandle,
addr: SocketAddr, addr: SocketAddr,
conns: &Mutex<HashMap<SocketAddr, Arc<dyn Conn + Send + Sync>>>, conns: &Mutex<HashMap<SocketAddr, Arc<dyn Conn + Send + Sync>>>,

View File

@@ -64,6 +64,6 @@ pub(crate) fn generate_key_and_cert(path: &Path) -> Result<Certificate, Error> {
} }
/* FIXME windows permissions */ /* FIXME windows permissions */
let mut writer = BufWriter::new(f); let mut writer = BufWriter::new(f);
writer.write(serialized.as_bytes())?; writer.write_all(serialized.as_bytes())?;
Ok(cert) Ok(cert)
} }

View File

@@ -3,7 +3,7 @@ use tokio::task::{spawn_local, JoinHandle};
use hickory_resolver::{error::ResolveError, TokioAsyncResolver}; use hickory_resolver::{error::ResolveError, TokioAsyncResolver};
use crate::server::Server; use crate::service::Service;
use lan_mouse_ipc::ClientHandle; use lan_mouse_ipc::ClientHandle;
pub(crate) struct DnsResolver { pub(crate) struct DnsResolver {
@@ -12,7 +12,7 @@ pub(crate) struct DnsResolver {
} }
impl DnsResolver { impl DnsResolver {
pub(crate) fn new(server: Server) -> Result<Self, ResolveError> { pub(crate) fn new(server: Service) -> Result<Self, ResolveError> {
let resolver = TokioAsyncResolver::tokio_from_system_conf()?; let resolver = TokioAsyncResolver::tokio_from_system_conf()?;
let (tx, rx) = channel(); let (tx, rx) = channel();
let _task = spawn_local(Self::run(server, resolver, rx)); let _task = spawn_local(Self::run(server, resolver, rx));
@@ -23,7 +23,7 @@ impl DnsResolver {
self.tx.send(host).expect("channel closed"); self.tx.send(host).expect("channel closed");
} }
async fn run(server: Server, resolver: TokioAsyncResolver, mut rx: Receiver<ClientHandle>) { async fn run(server: Service, resolver: TokioAsyncResolver, mut rx: Receiver<ClientHandle>) {
tokio::select! { tokio::select! {
_ = server.cancelled() => {}, _ = server.cancelled() => {},
_ = Self::do_dns(&server, &resolver, &mut rx) => {}, _ = Self::do_dns(&server, &resolver, &mut rx) => {},
@@ -31,7 +31,7 @@ impl DnsResolver {
} }
async fn do_dns( async fn do_dns(
server: &Server, server: &Service,
resolver: &TokioAsyncResolver, resolver: &TokioAsyncResolver,
rx: &mut Receiver<ClientHandle>, rx: &mut Receiver<ClientHandle>,
) { ) {

View File

@@ -1,4 +1,4 @@
use crate::{listen::LanMouseListener, server::Server}; use crate::{listen::LanMouseListener, service::Service};
use futures::StreamExt; use futures::StreamExt;
use input_emulation::{EmulationHandle, InputEmulation, InputEmulationError}; use input_emulation::{EmulationHandle, InputEmulation, InputEmulationError};
use input_event::Event; use input_event::Event;
@@ -21,14 +21,14 @@ pub(crate) struct Emulation {
} }
impl Emulation { impl Emulation {
pub(crate) fn new(server: Server, listener: LanMouseListener) -> Self { pub(crate) fn new(server: Service, listener: LanMouseListener) -> Self {
let emulation_proxy = EmulationProxy::new(server.clone()); let emulation_proxy = EmulationProxy::new(server.clone());
let task = spawn_local(Self::run(server, listener, emulation_proxy)); let task = spawn_local(Self::run(server, listener, emulation_proxy));
Self { task } Self { task }
} }
async fn run( async fn run(
server: Server, server: Service,
mut listener: LanMouseListener, mut listener: LanMouseListener,
mut emulation_proxy: EmulationProxy, mut emulation_proxy: EmulationProxy,
) { ) {
@@ -45,10 +45,12 @@ impl Emulation {
last_response.insert(addr, Instant::now()); last_response.insert(addr, Instant::now());
match event { match event {
ProtoEvent::Enter(pos) => { ProtoEvent::Enter(pos) => {
log::info!("{addr} entered this device"); if let Some(cert) = listener.get_certificate_fingerprint(addr).await {
server.release_capture(); log::info!("{addr} entered this device");
listener.reply(addr, ProtoEvent::Ack(0)).await; server.release_capture();
server.register_incoming(addr, to_ipc_pos(pos)); listener.reply(addr, ProtoEvent::Ack(0)).await;
server.register_incoming(addr, to_ipc_pos(pos), cert);
}
} }
ProtoEvent::Leave(_) => { ProtoEvent::Leave(_) => {
emulation_proxy.release_keys(addr); emulation_proxy.release_keys(addr);
@@ -89,7 +91,7 @@ impl Emulation {
/// proxy handling the actual input emulation, /// proxy handling the actual input emulation,
/// discarding events when it is disabled /// discarding events when it is disabled
pub(crate) struct EmulationProxy { pub(crate) struct EmulationProxy {
server: Server, server: Service,
tx: Sender<(ProxyEvent, SocketAddr)>, tx: Sender<(ProxyEvent, SocketAddr)>,
task: JoinHandle<()>, task: JoinHandle<()>,
} }
@@ -100,7 +102,7 @@ enum ProxyEvent {
} }
impl EmulationProxy { impl EmulationProxy {
fn new(server: Server) -> Self { fn new(server: Service) -> Self {
let (tx, rx) = channel(); let (tx, rx) = channel();
let task = spawn_local(Self::emulation_task(server.clone(), rx)); let task = spawn_local(Self::emulation_task(server.clone(), rx));
Self { server, tx, task } Self { server, tx, task }
@@ -121,7 +123,7 @@ impl EmulationProxy {
.expect("channel closed"); .expect("channel closed");
} }
async fn emulation_task(server: Server, mut rx: Receiver<(ProxyEvent, SocketAddr)>) { async fn emulation_task(server: Service, mut rx: Receiver<(ProxyEvent, SocketAddr)>) {
let mut handles = HashMap::new(); let mut handles = HashMap::new();
let mut next_id = 0; let mut next_id = 0;
loop { loop {
@@ -136,7 +138,7 @@ impl EmulationProxy {
} }
async fn do_emulation( async fn do_emulation(
server: &Server, server: &Service,
handles: &mut HashMap<SocketAddr, EmulationHandle>, handles: &mut HashMap<SocketAddr, EmulationHandle>,
next_id: &mut EmulationHandle, next_id: &mut EmulationHandle,
rx: &mut Receiver<(ProxyEvent, SocketAddr)>, rx: &mut Receiver<(ProxyEvent, SocketAddr)>,
@@ -163,7 +165,7 @@ impl EmulationProxy {
} }
async fn do_emulation_session( async fn do_emulation_session(
server: &Server, server: &Service,
emulation: &mut InputEmulation, emulation: &mut InputEmulation,
handles: &mut HashMap<SocketAddr, EmulationHandle>, handles: &mut HashMap<SocketAddr, EmulationHandle>,
next_id: &mut EmulationHandle, next_id: &mut EmulationHandle,

View File

@@ -8,4 +8,4 @@ mod dns;
mod emulation; mod emulation;
pub mod emulation_test; pub mod emulation_test;
mod listen; mod listen;
pub mod server; pub mod service;

View File

@@ -16,6 +16,7 @@ use tokio::{
}; };
use webrtc_dtls::{ use webrtc_dtls::{
config::{ClientAuthType::RequireAnyClientCert, Config, ExtendedMasterSecretType}, config::{ClientAuthType::RequireAnyClientCert, Config, ExtendedMasterSecretType},
conn::DTLSConn,
crypto::Certificate, crypto::Certificate,
listener::listen, listener::listen,
}; };
@@ -57,7 +58,7 @@ impl LanMouseListener {
move |certs: &[Vec<u8>], _chains: &[CertificateDer<'static>]| { move |certs: &[Vec<u8>], _chains: &[CertificateDer<'static>]| {
assert!(certs.len() == 1); assert!(certs.len() == 1);
let fingerprints = certs let fingerprints = certs
.into_iter() .iter()
.map(|c| crypto::generate_fingerprint(c)) .map(|c| crypto::generate_fingerprint(c))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
if authorized_keys if authorized_keys
@@ -143,6 +144,25 @@ impl LanMouseListener {
} }
} }
} }
pub(crate) async fn get_certificate_fingerprint(&self, addr: SocketAddr) -> Option<String> {
if let Some(conn) = self
.conns
.lock()
.await
.iter()
.find(|(a, _)| *a == addr)
.map(|(_, c)| c.clone())
{
let conn: &DTLSConn = conn.as_any().downcast_ref().expect("dtls conn");
let certs = conn.connection_state().await.peer_certificates;
let cert = certs.get(0)?;
let fingerprint = crypto::generate_fingerprint(cert);
Some(fingerprint)
} else {
None
}
}
} }
impl Stream for LanMouseListener { impl Stream for LanMouseListener {

View File

@@ -5,7 +5,7 @@ use lan_mouse::{
capture_test, capture_test,
config::{Config, ConfigError, Frontend}, config::{Config, ConfigError, Frontend},
emulation_test, emulation_test,
server::{Server, ServiceError}, service::{Service, ServiceError},
}; };
use lan_mouse_ipc::IpcError; use lan_mouse_ipc::IpcError;
use std::{ use std::{
@@ -101,7 +101,8 @@ fn start_service() -> Result<Child, io::Error> {
async fn run_service(config: Config) -> Result<(), ServiceError> { async fn run_service(config: Config) -> Result<(), ServiceError> {
log::info!("Press {:?} to release the mouse", config.release_bind); log::info!("Press {:?} to release the mouse", config.release_bind);
Server::new(config).run().await?; let mut server = Service::new(config).await?;
server.run().await?;
log::info!("service exited!"); log::info!("service exited!");
Ok(()) Ok(())
} }

View File

@@ -26,6 +26,7 @@ use std::{
use thiserror::Error; use thiserror::Error;
use tokio::{signal, sync::Notify}; use tokio::{signal, sync::Notify};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use webrtc_dtls::crypto::Certificate;
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum ServiceError { pub enum ServiceError {
@@ -44,33 +45,35 @@ pub enum ServiceError {
pub struct ReleaseToken; pub struct ReleaseToken;
#[derive(Clone)] #[derive(Clone)]
pub struct Server { pub struct Service {
active: Rc<Cell<Option<ClientHandle>>>, active: Rc<Cell<Option<ClientHandle>>>,
authorized_keys: Arc<RwLock<HashMap<String, String>>>, authorized_keys: Arc<RwLock<HashMap<String, String>>>,
_known_hosts: Rc<RefCell<HashSet<String>>>,
pub(crate) client_manager: ClientManager, pub(crate) client_manager: ClientManager,
port: Rc<Cell<u16>>, port: Rc<Cell<u16>>,
public_key_fingerprint: Option<String>, public_key_fingerprint: String,
notifies: Rc<Notifies>, notifies: Rc<Notifies>,
pub(crate) config: Rc<Config>, pub(crate) config: Rc<Config>,
pending_frontend_events: Rc<RefCell<VecDeque<FrontendEvent>>>, pending_frontend_events: Rc<RefCell<VecDeque<FrontendEvent>>>,
pending_incoming: Rc<RefCell<VecDeque<(SocketAddr, Position, String)>>>,
capture_status: Rc<Cell<Status>>, capture_status: Rc<Cell<Status>>,
pub(crate) emulation_status: Rc<Cell<Status>>, pub(crate) emulation_status: Rc<Cell<Status>>,
pub(crate) should_release: Rc<RefCell<Option<ReleaseToken>>>, pub(crate) should_release: Rc<RefCell<Option<ReleaseToken>>>,
incoming_conns: Rc<RefCell<HashMap<SocketAddr, Position>>>, incoming_conns: Rc<RefCell<HashMap<SocketAddr, Position>>>,
cert: Certificate,
} }
#[derive(Default)] #[derive(Default)]
struct Notifies { struct Notifies {
capture: Notify, capture: Notify,
emulation: Notify, emulation: Notify,
incoming: Notify,
port_changed: Notify, port_changed: Notify,
frontend_event_pending: Notify, frontend_event_pending: Notify,
cancel: CancellationToken, cancel: CancellationToken,
} }
impl Server { impl Service {
pub fn new(config: Config) -> Self { pub async fn new(config: Config) -> Result<Self, ServiceError> {
let client_manager = ClientManager::default(); let client_manager = ClientManager::default();
let port = Rc::new(Cell::new(config.port)); let port = Rc::new(Cell::new(config.port));
for client in config.get_clients() { for client in config.get_clients() {
@@ -96,13 +99,18 @@ impl Server {
let config = Rc::new(config); let config = Rc::new(config);
Self { // load certificate
let cert = crypto::load_or_generate_key_and_cert(&config.cert_path)?;
let public_key_fingerprint = crypto::certificate_fingerprint(&cert);
let service = Self {
active: Rc::new(Cell::new(None)), active: Rc::new(Cell::new(None)),
authorized_keys: Default::default(), authorized_keys: Default::default(),
_known_hosts: Default::default(), cert,
public_key_fingerprint: None, public_key_fingerprint,
config, config,
client_manager, client_manager,
pending_incoming: Default::default(),
port, port,
notifies, notifies,
pending_frontend_events: Rc::new(RefCell::new(VecDeque::new())), pending_frontend_events: Rc::new(RefCell::new(VecDeque::new())),
@@ -110,30 +118,22 @@ impl Server {
emulation_status: Default::default(), emulation_status: Default::default(),
incoming_conns: Rc::new(RefCell::new(HashMap::new())), incoming_conns: Rc::new(RefCell::new(HashMap::new())),
should_release: Default::default(), should_release: Default::default(),
} };
Ok(service)
} }
pub async fn run(&mut self) -> Result<(), ServiceError> { pub async fn run(&mut self) -> Result<(), ServiceError> {
// create frontend communication adapter, exit if already running // create frontend communication adapter, exit if already running
let mut frontend = match AsyncFrontendListener::new().await { let mut frontend_listener = AsyncFrontendListener::new().await?;
Ok(f) => f,
Err(IpcListenerCreationError::AlreadyRunning) => {
log::info!("service already running, exiting");
return Ok(());
}
e => e?,
};
// load certificate
let cert = crypto::load_or_generate_key_and_cert(&self.config.cert_path)?;
let public_key_fingerprint = crypto::certificate_fingerprint(&cert);
self.public_key_fingerprint.replace(public_key_fingerprint);
// listener + connection // listener + connection
let listener = let listener = LanMouseListener::new(
LanMouseListener::new(self.config.port, cert.clone(), self.authorized_keys.clone()) self.config.port,
.await?; self.cert.clone(),
let conn = LanMouseConnection::new(self.clone(), cert); self.authorized_keys.clone(),
)
.await?;
let conn = LanMouseConnection::new(self.clone(), self.cert.clone());
// input capture + emulation // input capture + emulation
let mut capture = Capture::new(self.clone(), conn); let mut capture = Capture::new(self.clone(), conn);
@@ -148,7 +148,7 @@ impl Server {
loop { loop {
tokio::select! { tokio::select! {
request = frontend.next() => { request = frontend_listener.next() => {
let request = match request { let request = match request {
Some(Ok(r)) => r, Some(Ok(r)) => r,
Some(Err(e)) => { Some(Err(e)) => {
@@ -167,7 +167,15 @@ impl Server {
let event = self.pending_frontend_events.borrow_mut().pop_front(); let event = self.pending_frontend_events.borrow_mut().pop_front();
event event
} { } {
frontend.broadcast(event).await; frontend_listener.broadcast(event).await;
}
},
_ = self.notifies.incoming.notified() => {
while let Some((addr, pos, fingerprint)) = {
let incoming = self.pending_incoming.borrow_mut().pop_front();
incoming
} {
// capture.register(addr, pos);
} }
}, },
_ = self.cancelled() => break, _ = self.cancelled() => break,
@@ -271,10 +279,7 @@ impl Server {
self.notify_frontend(FrontendEvent::CaptureStatus(self.capture_status.get())); self.notify_frontend(FrontendEvent::CaptureStatus(self.capture_status.get()));
self.notify_frontend(FrontendEvent::PortChanged(self.port.get(), None)); self.notify_frontend(FrontendEvent::PortChanged(self.port.get(), None));
self.notify_frontend(FrontendEvent::PublicKeyFingerprint( self.notify_frontend(FrontendEvent::PublicKeyFingerprint(
self.public_key_fingerprint self.public_key_fingerprint.clone(),
.as_ref()
.expect("fingerprint")
.clone(),
)); ));
self.notify_frontend(FrontendEvent::AuthorizedUpdated( self.notify_frontend(FrontendEvent::AuthorizedUpdated(
self.authorized_keys.read().expect("lock").clone(), self.authorized_keys.read().expect("lock").clone(),
@@ -425,7 +430,10 @@ impl Server {
self.active.get() self.active.get()
} }
pub(crate) fn register_incoming(&self, addr: SocketAddr, pos: Position) { pub(crate) fn register_incoming(&self, addr: SocketAddr, pos: Position, fingerprint: String) {
self.incoming_conns.borrow_mut().insert(addr, pos); self.pending_incoming
.borrow_mut()
.push_back((addr, pos, fingerprint));
self.notifies.incoming.notify_one();
} }
} }