mirror of
https://github.com/feschber/lan-mouse.git
synced 2026-03-31 17:10:54 +03:00
Encryption and One-Way-Control (#200)
This is a major rewrite of the core networking logic enabling one-way control and encryption through the webrtc-dtls crate. closes #164 closes #104
This commit is contained in:
committed by
GitHub
parent
0d074e19f1
commit
7677fae14b
418
src/capture.rs
Normal file
418
src/capture.rs
Normal file
@@ -0,0 +1,418 @@
|
||||
use std::{
|
||||
cell::{Cell, RefCell},
|
||||
rc::Rc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use futures::StreamExt;
|
||||
use input_capture::{
|
||||
CaptureError, CaptureEvent, CaptureHandle, InputCapture, InputCaptureError, Position,
|
||||
};
|
||||
use input_event::scancode;
|
||||
use lan_mouse_proto::ProtoEvent;
|
||||
use local_channel::mpsc::{channel, Receiver, Sender};
|
||||
use tokio::task::{spawn_local, JoinHandle};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::connect::LanMouseConnection;
|
||||
|
||||
pub(crate) struct Capture {
|
||||
cancellation_token: CancellationToken,
|
||||
request_tx: Sender<CaptureRequest>,
|
||||
task: JoinHandle<()>,
|
||||
event_rx: Receiver<ICaptureEvent>,
|
||||
}
|
||||
|
||||
pub(crate) enum ICaptureEvent {
|
||||
/// a client was entered
|
||||
CaptureBegin(CaptureHandle),
|
||||
/// capture disabled
|
||||
CaptureDisabled,
|
||||
/// capture disabled
|
||||
CaptureEnabled,
|
||||
/// A (new) client was entered.
|
||||
/// In contrast to [`ICaptureEvent::CaptureBegin`] this
|
||||
/// event is only triggered when the capture was
|
||||
/// explicitly released in the meantime by
|
||||
/// either the remote client leaving its device region,
|
||||
/// a new device entering the screen or the release bind.
|
||||
ClientEntered(u64),
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub(crate) enum CaptureType {
|
||||
/// a normal input capture
|
||||
Default,
|
||||
/// A capture only interested in [`CaptureEvent::Begin`] events.
|
||||
/// The capture is released immediately, if there is no
|
||||
/// Default capture at the same position.
|
||||
EnterOnly,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
enum CaptureRequest {
|
||||
/// capture must release the mouse
|
||||
Release,
|
||||
/// add a capture client
|
||||
Create(CaptureHandle, Position, CaptureType),
|
||||
/// destory a capture client
|
||||
Destroy(CaptureHandle),
|
||||
/// reenable input capture
|
||||
Reenable,
|
||||
}
|
||||
|
||||
impl Capture {
|
||||
pub(crate) fn new(
|
||||
backend: Option<input_capture::Backend>,
|
||||
conn: LanMouseConnection,
|
||||
release_bind: Vec<scancode::Linux>,
|
||||
) -> Self {
|
||||
let (request_tx, request_rx) = channel();
|
||||
let (event_tx, event_rx) = channel();
|
||||
let cancellation_token = CancellationToken::new();
|
||||
let capture_task = CaptureTask {
|
||||
active_client: None,
|
||||
backend,
|
||||
cancellation_token: cancellation_token.clone(),
|
||||
captures: Default::default(),
|
||||
conn,
|
||||
event_tx,
|
||||
request_rx,
|
||||
release_bind: Rc::new(RefCell::new(release_bind)),
|
||||
state: Default::default(),
|
||||
};
|
||||
let task = spawn_local(capture_task.run());
|
||||
Self {
|
||||
cancellation_token,
|
||||
request_tx,
|
||||
task,
|
||||
event_rx,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn reenable(&self) {
|
||||
self.request_tx
|
||||
.send(CaptureRequest::Reenable)
|
||||
.expect("channel closed");
|
||||
}
|
||||
|
||||
pub(crate) async fn terminate(&mut self) {
|
||||
self.cancellation_token.cancel();
|
||||
log::debug!("terminating capture");
|
||||
if let Err(e) = (&mut self.task).await {
|
||||
log::warn!("{e}");
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn create(
|
||||
&self,
|
||||
handle: CaptureHandle,
|
||||
pos: lan_mouse_ipc::Position,
|
||||
capture_type: CaptureType,
|
||||
) {
|
||||
let pos = to_capture_pos(pos);
|
||||
self.request_tx
|
||||
.send(CaptureRequest::Create(handle, pos, capture_type))
|
||||
.expect("channel closed");
|
||||
}
|
||||
|
||||
pub(crate) fn destroy(&self, handle: CaptureHandle) {
|
||||
self.request_tx
|
||||
.send(CaptureRequest::Destroy(handle))
|
||||
.expect("channel closed");
|
||||
}
|
||||
|
||||
pub(crate) fn release(&self) {
|
||||
self.request_tx
|
||||
.send(CaptureRequest::Release)
|
||||
.expect("channel closed");
|
||||
}
|
||||
|
||||
pub(crate) async fn event(&mut self) -> ICaptureEvent {
|
||||
self.event_rx.recv().await.expect("channel closed")
|
||||
}
|
||||
}
|
||||
|
||||
/// debounce a statement `$st`, i.e. the statement is executed only if the
|
||||
/// time since the previous execution is at least `$dur`.
|
||||
/// `$prev` is used to keep track of this timestamp
|
||||
macro_rules! debounce {
|
||||
($prev:ident, $dur:expr, $st:stmt) => {
|
||||
let exec = match $prev.get() {
|
||||
None => true,
|
||||
Some(instant) if instant.elapsed() > $dur => true,
|
||||
_ => false,
|
||||
};
|
||||
if exec {
|
||||
$prev.replace(Some(Instant::now()));
|
||||
$st
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
struct CaptureTask {
|
||||
active_client: Option<CaptureHandle>,
|
||||
backend: Option<input_capture::Backend>,
|
||||
cancellation_token: CancellationToken,
|
||||
captures: Vec<(CaptureHandle, Position, CaptureType)>,
|
||||
conn: LanMouseConnection,
|
||||
event_tx: Sender<ICaptureEvent>,
|
||||
release_bind: Rc<RefCell<Vec<scancode::Linux>>>,
|
||||
request_rx: Receiver<CaptureRequest>,
|
||||
state: State,
|
||||
}
|
||||
|
||||
impl CaptureTask {
|
||||
fn add_capture(&mut self, handle: CaptureHandle, pos: Position, capture_type: CaptureType) {
|
||||
self.captures.push((handle, pos, capture_type));
|
||||
}
|
||||
|
||||
fn remove_capture(&mut self, handle: CaptureHandle) {
|
||||
self.captures.retain(|&(h, ..)| handle != h);
|
||||
}
|
||||
|
||||
fn is_default_capture_at(&self, pos: Position) -> bool {
|
||||
self.captures
|
||||
.iter()
|
||||
.any(|&(_, p, t)| p == pos && t == CaptureType::Default)
|
||||
}
|
||||
|
||||
fn get_pos(&self, handle: CaptureHandle) -> Position {
|
||||
self.captures
|
||||
.iter()
|
||||
.find(|(h, ..)| *h == handle)
|
||||
.expect("no such capture")
|
||||
.1
|
||||
}
|
||||
|
||||
fn get_type(&self, handle: CaptureHandle) -> CaptureType {
|
||||
self.captures
|
||||
.iter()
|
||||
.find(|(h, ..)| *h == handle)
|
||||
.expect("no such capture")
|
||||
.2
|
||||
}
|
||||
|
||||
async fn run(mut self) {
|
||||
loop {
|
||||
if let Err(e) = self.do_capture().await {
|
||||
log::warn!("input capture exited: {e}");
|
||||
}
|
||||
loop {
|
||||
tokio::select! {
|
||||
r = self.request_rx.recv() => match r.expect("channel closed") {
|
||||
CaptureRequest::Reenable => break,
|
||||
CaptureRequest::Create(h, p, t) => self.add_capture(h, p, t),
|
||||
CaptureRequest::Destroy(h) => self.remove_capture(h),
|
||||
CaptureRequest::Release => { /* nothing to do */ }
|
||||
},
|
||||
_ = self.cancellation_token.cancelled() => return,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn do_capture(&mut self) -> Result<(), InputCaptureError> {
|
||||
/* allow cancelling capture request */
|
||||
let mut capture = tokio::select! {
|
||||
r = InputCapture::new(self.backend) => r?,
|
||||
_ = self.cancellation_token.cancelled() => return Ok(()),
|
||||
};
|
||||
|
||||
let _capture_guard = DropGuard::new(
|
||||
self.event_tx.clone(),
|
||||
ICaptureEvent::CaptureEnabled,
|
||||
ICaptureEvent::CaptureDisabled,
|
||||
);
|
||||
|
||||
/* create barriers for active clients */
|
||||
let r = self.create_captures(&mut capture).await;
|
||||
if let Err(e) = r {
|
||||
capture.terminate().await?;
|
||||
return Err(e.into());
|
||||
}
|
||||
|
||||
let r = self.do_capture_session(&mut capture).await;
|
||||
|
||||
// FIXME replace with async drop when stabilized
|
||||
capture.terminate().await?;
|
||||
|
||||
r
|
||||
}
|
||||
|
||||
async fn create_captures(&mut self, capture: &mut InputCapture) -> Result<(), CaptureError> {
|
||||
let captures = self.captures.clone();
|
||||
for (handle, pos, _type) in captures {
|
||||
tokio::select! {
|
||||
r = capture.create(handle, pos) => r?,
|
||||
_ = self.cancellation_token.cancelled() => return Ok(()),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn do_capture_session(
|
||||
&mut self,
|
||||
capture: &mut InputCapture,
|
||||
) -> Result<(), InputCaptureError> {
|
||||
loop {
|
||||
tokio::select! {
|
||||
event = capture.next() => match event {
|
||||
Some(event) => self.handle_capture_event(capture, event?).await?,
|
||||
None => return Ok(()),
|
||||
},
|
||||
(handle, event) = self.conn.recv() => {
|
||||
if let Some(active) = self.active_client {
|
||||
if handle != active {
|
||||
// we only care about events coming from the client we are currently connected to
|
||||
// only `Ack` and `Leave` are relevant
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
match event {
|
||||
// connection acknowlegded => set state to Sending
|
||||
ProtoEvent::Ack(_) => {
|
||||
log::info!("client {handle} acknowledged the connection!");
|
||||
self.state = State::Sending;
|
||||
}
|
||||
// client disconnected
|
||||
ProtoEvent::Leave(_) => {
|
||||
log::info!("releasing capture: left remote client device region");
|
||||
self.release_capture(capture).await?;
|
||||
},
|
||||
_ => {}
|
||||
}
|
||||
},
|
||||
e = self.request_rx.recv() => match e.expect("channel closed") {
|
||||
CaptureRequest::Reenable => { /* already active */ },
|
||||
CaptureRequest::Release => self.release_capture(capture).await?,
|
||||
CaptureRequest::Create(h, p, t) => {
|
||||
self.add_capture(h, p, t);
|
||||
capture.create(h, p).await?;
|
||||
}
|
||||
CaptureRequest::Destroy(h) => {
|
||||
self.remove_capture(h);
|
||||
capture.destroy(h).await?;
|
||||
}
|
||||
},
|
||||
_ = self.cancellation_token.cancelled() => break,
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_capture_event(
|
||||
&mut self,
|
||||
capture: &mut InputCapture,
|
||||
event: (CaptureHandle, CaptureEvent),
|
||||
) -> Result<(), CaptureError> {
|
||||
let (handle, event) = event;
|
||||
log::trace!("({handle}): {event:?}");
|
||||
|
||||
if capture.keys_pressed(&self.release_bind.borrow()) {
|
||||
log::info!("releasing capture: release-bind pressed");
|
||||
return self.release_capture(capture).await;
|
||||
}
|
||||
|
||||
if event == CaptureEvent::Begin {
|
||||
self.event_tx
|
||||
.send(ICaptureEvent::CaptureBegin(handle))
|
||||
.expect("channel closed");
|
||||
}
|
||||
|
||||
// enter only capture (for incoming connections)
|
||||
if self.get_type(handle) == CaptureType::EnterOnly {
|
||||
// if there is no active outgoing connection at the current capture,
|
||||
// we release the capture
|
||||
if !self.is_default_capture_at(self.get_pos(handle)) {
|
||||
log::info!("releasing capture: no active client at this position");
|
||||
capture.release().await?;
|
||||
}
|
||||
// we dont care about events from incoming handles except for releasing the capture
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// activated a new client
|
||||
if event == CaptureEvent::Begin && Some(handle) != self.active_client {
|
||||
self.state = State::WaitingForAck;
|
||||
self.active_client.replace(handle);
|
||||
self.event_tx
|
||||
.send(ICaptureEvent::ClientEntered(handle))
|
||||
.expect("channel closed");
|
||||
}
|
||||
|
||||
let opposite_pos = to_proto_pos(self.get_pos(handle).opposite());
|
||||
|
||||
let event = match event {
|
||||
CaptureEvent::Begin => ProtoEvent::Enter(opposite_pos),
|
||||
CaptureEvent::Input(e) => match self.state {
|
||||
// connection not acknowledged, repeat `Enter` event
|
||||
State::WaitingForAck => ProtoEvent::Enter(opposite_pos),
|
||||
State::Sending => ProtoEvent::Input(e),
|
||||
},
|
||||
};
|
||||
|
||||
if let Err(e) = self.conn.send(event, handle).await {
|
||||
const DUR: Duration = Duration::from_millis(500);
|
||||
debounce!(PREV_LOG, DUR, log::warn!("releasing capture: {e}"));
|
||||
capture.release().await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn release_capture(&mut self, capture: &mut InputCapture) -> Result<(), CaptureError> {
|
||||
self.active_client.take();
|
||||
capture.release().await
|
||||
}
|
||||
}
|
||||
|
||||
thread_local! {
|
||||
static PREV_LOG: Cell<Option<Instant>> = const { Cell::new(None) };
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
|
||||
enum State {
|
||||
#[default]
|
||||
WaitingForAck,
|
||||
Sending,
|
||||
}
|
||||
|
||||
fn to_capture_pos(pos: lan_mouse_ipc::Position) -> input_capture::Position {
|
||||
match pos {
|
||||
lan_mouse_ipc::Position::Left => input_capture::Position::Left,
|
||||
lan_mouse_ipc::Position::Right => input_capture::Position::Right,
|
||||
lan_mouse_ipc::Position::Top => input_capture::Position::Top,
|
||||
lan_mouse_ipc::Position::Bottom => input_capture::Position::Bottom,
|
||||
}
|
||||
}
|
||||
|
||||
fn to_proto_pos(pos: input_capture::Position) -> lan_mouse_proto::Position {
|
||||
match pos {
|
||||
input_capture::Position::Left => lan_mouse_proto::Position::Left,
|
||||
input_capture::Position::Right => lan_mouse_proto::Position::Right,
|
||||
input_capture::Position::Top => lan_mouse_proto::Position::Top,
|
||||
input_capture::Position::Bottom => lan_mouse_proto::Position::Bottom,
|
||||
}
|
||||
}
|
||||
|
||||
struct DropGuard<T> {
|
||||
tx: Sender<T>,
|
||||
on_drop: Option<T>,
|
||||
}
|
||||
|
||||
impl<T> DropGuard<T> {
|
||||
fn new(tx: Sender<T>, on_new: T, on_drop: T) -> Self {
|
||||
tx.send(on_new).expect("channel closed");
|
||||
let on_drop = Some(on_drop);
|
||||
Self { tx, on_drop }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for DropGuard<T> {
|
||||
fn drop(&mut self) {
|
||||
self.tx
|
||||
.send(self.on_drop.take().expect("item"))
|
||||
.expect("channel closed");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user