remove dependency on service from capture

This commit is contained in:
Ferdinand Schober
2024-11-08 17:18:51 +01:00
parent 46044d0796
commit ef2b2a773e
2 changed files with 265 additions and 268 deletions

View File

@@ -1,5 +1,5 @@
use std::{ use std::{
cell::Cell, cell::{Cell, RefCell},
rc::Rc, rc::Rc,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@@ -8,14 +8,16 @@ use futures::StreamExt;
use input_capture::{ use input_capture::{
CaptureError, CaptureEvent, CaptureHandle, InputCapture, InputCaptureError, Position, CaptureError, CaptureEvent, CaptureHandle, InputCapture, InputCaptureError, Position,
}; };
use input_event::scancode;
use lan_mouse_proto::ProtoEvent; use lan_mouse_proto::ProtoEvent;
use local_channel::mpsc::{channel, Receiver, Sender}; use local_channel::mpsc::{channel, Receiver, Sender};
use tokio::task::{spawn_local, JoinHandle}; use tokio::task::{spawn_local, JoinHandle};
use tokio_util::sync::CancellationToken;
use crate::{connect::LanMouseConnection, service::Service}; use crate::connect::LanMouseConnection;
pub(crate) struct Capture { pub(crate) struct Capture {
exit_requested: Rc<Cell<bool>>, cancellation_token: CancellationToken,
request_tx: Sender<CaptureRequest>, request_tx: Sender<CaptureRequest>,
task: JoinHandle<()>, task: JoinHandle<()>,
event_rx: Receiver<ICaptureEvent>, event_rx: Receiver<ICaptureEvent>,
@@ -37,16 +39,24 @@ pub(crate) enum ICaptureEvent {
ClientEntered(u64), ClientEntered(u64),
} }
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum CaptureType {
/// a normal input capture
Default,
/// A capture only interested in [`CaptureEvent::Begin`] events.
/// The capture is released immediately, if there is no
/// Default capture at the same position.
EnterOnly,
}
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
enum CaptureRequest { enum CaptureRequest {
/// capture must release the mouse /// capture must release the mouse
Release, Release,
/// add a capture client /// add a capture client
Create(CaptureHandle, Position), Create(CaptureHandle, Position, CaptureType),
/// destory a capture client /// destory a capture client
Destroy(CaptureHandle), Destroy(CaptureHandle),
/// terminate
Terminate,
/// reenable input capture /// reenable input capture
Reenable, Reenable,
} }
@@ -55,21 +65,25 @@ impl Capture {
pub(crate) fn new( pub(crate) fn new(
backend: Option<input_capture::Backend>, backend: Option<input_capture::Backend>,
conn: LanMouseConnection, conn: LanMouseConnection,
service: Service, release_bind: Vec<scancode::Linux>,
) -> Self { ) -> Self {
let (request_tx, request_rx) = channel(); let (request_tx, request_rx) = channel();
let (event_tx, event_rx) = channel(); let (event_tx, event_rx) = channel();
let exit_requested = Rc::new(Cell::new(false)); let cancellation_token = CancellationToken::new();
let task = spawn_local(Self::run( let capture_task = CaptureTask {
exit_requested.clone(), active_client: None,
service,
backend, backend,
request_rx, cancellation_token: cancellation_token.clone(),
captures: Default::default(),
conn, conn,
event_tx, event_tx,
)); request_rx,
release_bind: Rc::new(RefCell::new(release_bind)),
state: Default::default(),
};
let task = spawn_local(capture_task.run());
Self { Self {
exit_requested, cancellation_token,
request_tx, request_tx,
task, task,
event_rx, event_rx,
@@ -83,19 +97,22 @@ impl Capture {
} }
pub(crate) async fn terminate(&mut self) { pub(crate) async fn terminate(&mut self) {
self.exit_requested.replace(true); self.cancellation_token.cancel();
self.request_tx
.send(CaptureRequest::Terminate)
.expect("channel closed");
log::debug!("terminating capture"); log::debug!("terminating capture");
if let Err(e) = (&mut self.task).await { if let Err(e) = (&mut self.task).await {
log::warn!("{e}"); log::warn!("{e}");
} }
} }
pub(crate) fn create(&self, handle: CaptureHandle, pos: lan_mouse_ipc::Position) { pub(crate) fn create(
&self,
handle: CaptureHandle,
pos: lan_mouse_ipc::Position,
capture_type: CaptureType,
) {
let pos = to_capture_pos(pos);
self.request_tx self.request_tx
.send(CaptureRequest::Create(handle, to_capture_pos(pos))) .send(CaptureRequest::Create(handle, pos, capture_type))
.expect("channel closed"); .expect("channel closed");
} }
@@ -114,156 +131,6 @@ impl Capture {
pub(crate) async fn event(&mut self) -> ICaptureEvent { pub(crate) async fn event(&mut self) -> ICaptureEvent {
self.event_rx.recv().await.expect("channel closed") self.event_rx.recv().await.expect("channel closed")
} }
async fn run(
exit_requested: Rc<Cell<bool>>,
service: Service,
backend: Option<input_capture::Backend>,
mut request_rx: Receiver<CaptureRequest>,
mut conn: LanMouseConnection,
mut event_tx: Sender<ICaptureEvent>,
) {
let mut active = None;
loop {
if let Err(e) = do_capture(
&mut active,
&service,
backend,
&mut conn,
&mut request_rx,
&mut event_tx,
)
.await
{
log::warn!("input capture exited: {e}");
}
if exit_requested.get() {
break;
}
loop {
match request_rx.recv().await.expect("channel closed") {
CaptureRequest::Reenable => break,
CaptureRequest::Terminate => return,
_ => {}
}
}
}
}
}
async fn do_capture(
active: &mut Option<CaptureHandle>,
service: &Service,
backend: Option<input_capture::Backend>,
conn: &mut LanMouseConnection,
request_rx: &mut Receiver<CaptureRequest>,
event_tx: &mut Sender<ICaptureEvent>,
) -> Result<(), InputCaptureError> {
/* allow cancelling capture request */
let mut capture = tokio::select! {
r = InputCapture::new(backend) => r?,
_ = wait_for_termination(request_rx) => return Ok(()),
};
let _capture_guard = DropGuard::new(
event_tx,
ICaptureEvent::CaptureEnabled,
ICaptureEvent::CaptureDisabled,
);
let clients = service.client_manager.active_clients();
let clients = clients.iter().copied().map(|handle| {
(
handle,
service
.client_manager
.get_pos(handle)
.expect("no such client"),
)
});
/* create barriers for active clients */
let r = create_clients(&mut capture, clients, request_rx).await;
if let Err(e) = r {
capture.terminate().await?;
return Err(e.into());
}
let r = do_capture_session(active, &mut capture, conn, event_tx, request_rx, service).await;
// FIXME replace with async drop when stabilized
capture.terminate().await?;
r
}
async fn create_clients(
capture: &mut InputCapture,
clients: impl Iterator<Item = (CaptureHandle, lan_mouse_ipc::Position)>,
request_rx: &mut Receiver<CaptureRequest>,
) -> Result<(), CaptureError> {
for (handle, pos) in clients {
tokio::select! {
r = capture.create(handle, to_capture_pos(pos)) => r?,
_ = wait_for_termination(request_rx) => return Ok(()),
}
}
Ok(())
}
async fn do_capture_session(
active: &mut Option<CaptureHandle>,
capture: &mut InputCapture,
conn: &mut LanMouseConnection,
event_tx: &Sender<ICaptureEvent>,
request_rx: &mut Receiver<CaptureRequest>,
service: &Service,
) -> Result<(), InputCaptureError> {
let mut state = State::WaitingForAck;
loop {
tokio::select! {
event = capture.next() => match event {
Some(event) => handle_capture_event(active, service, capture, conn, event?, &mut state, event_tx).await?,
None => return Ok(()),
},
(handle, event) = conn.recv() => {
if let Some(active) = active {
if handle != *active {
// we only care about events coming from the client we are currently connected to
// only `Ack` and `Leave` are relevant
continue
}
}
match event {
// connection acknowlegded => set state to Sending
ProtoEvent::Ack(_) => {
log::info!("client {handle} acknowledged the connection!");
state = State::Sending;
}
// client disconnected
ProtoEvent::Leave(_) => {
log::info!("releasing capture: left remote client device region");
release_capture(capture, active).await?;
},
_ => {}
}
},
e = request_rx.recv() => match e.expect("channel closed") {
CaptureRequest::Reenable => { /* already active */ },
CaptureRequest::Release => release_capture(capture, active).await?,
CaptureRequest::Create(h, p) => capture.create(h, p).await?,
CaptureRequest::Destroy(h) => capture.destroy(h).await?,
CaptureRequest::Terminate => break,
}
}
}
Ok(())
}
thread_local! {
static PREV_LOG: Cell<Option<Instant>> = const { Cell::new(None) };
} }
/// debounce a statement `$st`, i.e. the statement is executed only if the /// debounce a statement `$st`, i.e. the statement is executed only if the
@@ -283,86 +150,235 @@ macro_rules! debounce {
}; };
} }
#[derive(Clone, Copy, Debug, PartialEq, Eq)] struct CaptureTask {
enum State { active_client: Option<CaptureHandle>,
WaitingForAck, backend: Option<input_capture::Backend>,
Sending, cancellation_token: CancellationToken,
captures: Vec<(CaptureHandle, Position, CaptureType)>,
conn: LanMouseConnection,
event_tx: Sender<ICaptureEvent>,
release_bind: Rc<RefCell<Vec<scancode::Linux>>>,
request_rx: Receiver<CaptureRequest>,
state: State,
} }
async fn handle_capture_event( impl CaptureTask {
active: &mut Option<CaptureHandle>, fn add_capture(&mut self, handle: CaptureHandle, pos: Position, capture_type: CaptureType) {
service: &Service, self.captures.push((handle, pos, capture_type));
capture: &mut InputCapture,
conn: &LanMouseConnection,
event: (CaptureHandle, CaptureEvent),
state: &mut State,
event_tx: &Sender<ICaptureEvent>,
) -> Result<(), CaptureError> {
let (handle, event) = event;
log::trace!("({handle}): {event:?}");
if capture.keys_pressed(&service.config.release_bind) {
log::info!("releasing capture: release-bind pressed");
return release_capture(capture, active).await;
} }
if event == CaptureEvent::Begin { fn remove_capture(&mut self, handle: CaptureHandle) {
event_tx self.captures.retain(|&(h, ..)| handle != h);
.send(ICaptureEvent::CaptureBegin(handle))
.expect("channel closed");
} }
// incoming connection fn is_default_capture_at(&self, pos: Position) -> bool {
if handle >= Service::ENTER_HANDLE_BEGIN { self.captures
// if there is no active outgoing connection at the current capture, .iter()
// we release the capture .any(|&(_, p, t)| p == pos && t == CaptureType::Default)
if let Some(pos) = service.get_incoming_pos(handle) { }
if service.client_manager.client_at(pos).is_none() {
fn get_pos(&self, handle: CaptureHandle) -> Position {
self.captures
.iter()
.find(|(h, ..)| *h == handle)
.expect("no such capture")
.1
}
fn get_type(&self, handle: CaptureHandle) -> CaptureType {
self.captures
.iter()
.find(|(h, ..)| *h == handle)
.expect("no such capture")
.2
}
async fn run(mut self) {
loop {
if let Err(e) = self.do_capture().await {
log::warn!("input capture exited: {e}");
}
if self.cancellation_token.is_cancelled() {
break;
}
loop {
tokio::select! {
r = self.request_rx.recv() => match r.expect("channel closed") {
CaptureRequest::Reenable => break,
CaptureRequest::Create(h, p, t) => self.add_capture(h, p, t),
CaptureRequest::Destroy(h) => self.remove_capture(h),
CaptureRequest::Release => { /* nothing to do */ }
},
_ = self.cancellation_token.cancelled() => break,
}
}
}
}
async fn do_capture(&mut self) -> Result<(), InputCaptureError> {
/* allow cancelling capture request */
let mut capture = tokio::select! {
r = InputCapture::new(self.backend) => r?,
_ = self.cancellation_token.cancelled() => return Ok(()),
};
let _capture_guard = DropGuard::new(
self.event_tx.clone(),
ICaptureEvent::CaptureEnabled,
ICaptureEvent::CaptureDisabled,
);
/* create barriers for active clients */
let r = self.create_captures(&mut capture).await;
if let Err(e) = r {
capture.terminate().await?;
return Err(e.into());
}
let r = self.do_capture_session(&mut capture).await;
// FIXME replace with async drop when stabilized
capture.terminate().await?;
r
}
async fn create_captures(&mut self, capture: &mut InputCapture) -> Result<(), CaptureError> {
let captures = self.captures.clone();
for (handle, pos, _type) in captures {
tokio::select! {
r = capture.create(handle, pos) => r?,
_ = self.cancellation_token.cancelled() => return Ok(()),
}
}
Ok(())
}
async fn do_capture_session(
&mut self,
capture: &mut InputCapture,
) -> Result<(), InputCaptureError> {
loop {
tokio::select! {
event = capture.next() => match event {
Some(event) => self.handle_capture_event(capture, event?).await?,
None => return Ok(()),
},
(handle, event) = self.conn.recv() => {
if let Some(active) = self.active_client {
if handle != active {
// we only care about events coming from the client we are currently connected to
// only `Ack` and `Leave` are relevant
continue
}
}
match event {
// connection acknowlegded => set state to Sending
ProtoEvent::Ack(_) => {
log::info!("client {handle} acknowledged the connection!");
self.state = State::Sending;
}
// client disconnected
ProtoEvent::Leave(_) => {
log::info!("releasing capture: left remote client device region");
self.release_capture(capture).await?;
},
_ => {}
}
},
e = self.request_rx.recv() => match e.expect("channel closed") {
CaptureRequest::Reenable => { /* already active */ },
CaptureRequest::Release => self.release_capture(capture).await?,
CaptureRequest::Create(h, p, t) => {
self.add_capture(h, p, t);
capture.create(h, p).await?;
}
CaptureRequest::Destroy(h) => {
self.remove_capture(h);
capture.destroy(h).await?;
}
},
_ = self.cancellation_token.cancelled() => break,
}
}
Ok(())
}
async fn handle_capture_event(
&mut self,
capture: &mut InputCapture,
event: (CaptureHandle, CaptureEvent),
) -> Result<(), CaptureError> {
let (handle, event) = event;
log::trace!("({handle}): {event:?}");
if capture.keys_pressed(&self.release_bind.borrow()) {
log::info!("releasing capture: release-bind pressed");
return self.release_capture(capture).await;
}
if event == CaptureEvent::Begin {
self.event_tx
.send(ICaptureEvent::CaptureBegin(handle))
.expect("channel closed");
}
// enter only capture (for incoming connections)
if self.get_type(handle) == CaptureType::EnterOnly {
// if there is no active outgoing connection at the current capture,
// we release the capture
if !self.is_default_capture_at(self.get_pos(handle)) {
log::info!("releasing capture: no active client at this position"); log::info!("releasing capture: no active client at this position");
capture.release().await?; capture.release().await?;
} }
// we dont care about events from incoming handles except for releasing the capture
return Ok(());
} }
// we dont care about events from incoming handles except for releasing the capture
return Ok(()); // activated a new client
if event == CaptureEvent::Begin && Some(handle) != self.active_client {
self.state = State::WaitingForAck;
self.active_client.replace(handle);
self.event_tx
.send(ICaptureEvent::ClientEntered(handle))
.expect("channel closed");
}
let opposite_pos = to_proto_pos(self.get_pos(handle).opposite());
let event = match event {
CaptureEvent::Begin => ProtoEvent::Enter(opposite_pos),
CaptureEvent::Input(e) => match self.state {
// connection not acknowledged, repeat `Enter` event
State::WaitingForAck => ProtoEvent::Enter(opposite_pos),
State::Sending => ProtoEvent::Input(e),
},
};
if let Err(e) = self.conn.send(event, handle).await {
const DUR: Duration = Duration::from_millis(500);
debounce!(PREV_LOG, DUR, log::warn!("releasing capture: {e}"));
capture.release().await?;
}
Ok(())
} }
// activated a new client async fn release_capture(&mut self, capture: &mut InputCapture) -> Result<(), CaptureError> {
if event == CaptureEvent::Begin && Some(handle) != *active { self.active_client.take();
*state = State::WaitingForAck; capture.release().await
active.replace(handle);
event_tx
.send(ICaptureEvent::ClientEntered(handle))
.expect("channel closed");
} }
let pos = match service.client_manager.get_pos(handle) {
Some(pos) => to_proto_pos(pos.opposite()),
None => return release_capture(capture, active).await,
};
let event = match event {
CaptureEvent::Begin => ProtoEvent::Enter(pos),
CaptureEvent::Input(e) => match state {
// connection not acknowledged, repeat `Enter` event
State::WaitingForAck => ProtoEvent::Enter(pos),
State::Sending => ProtoEvent::Input(e),
},
};
if let Err(e) = conn.send(event, handle).await {
const DUR: Duration = Duration::from_millis(500);
debounce!(PREV_LOG, DUR, log::warn!("releasing capture: {e}"));
capture.release().await?;
}
Ok(())
} }
async fn release_capture( thread_local! {
capture: &mut InputCapture, static PREV_LOG: Cell<Option<Instant>> = const { Cell::new(None) };
active: &mut Option<CaptureHandle>, }
) -> Result<(), CaptureError> {
active.take(); #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
capture.release().await enum State {
#[default]
WaitingForAck,
Sending,
} }
fn to_capture_pos(pos: lan_mouse_ipc::Position) -> input_capture::Position { fn to_capture_pos(pos: lan_mouse_ipc::Position) -> input_capture::Position {
@@ -374,41 +390,29 @@ fn to_capture_pos(pos: lan_mouse_ipc::Position) -> input_capture::Position {
} }
} }
fn to_proto_pos(pos: lan_mouse_ipc::Position) -> lan_mouse_proto::Position { fn to_proto_pos(pos: input_capture::Position) -> lan_mouse_proto::Position {
match pos { match pos {
lan_mouse_ipc::Position::Left => lan_mouse_proto::Position::Left, input_capture::Position::Left => lan_mouse_proto::Position::Left,
lan_mouse_ipc::Position::Right => lan_mouse_proto::Position::Right, input_capture::Position::Right => lan_mouse_proto::Position::Right,
lan_mouse_ipc::Position::Top => lan_mouse_proto::Position::Top, input_capture::Position::Top => lan_mouse_proto::Position::Top,
lan_mouse_ipc::Position::Bottom => lan_mouse_proto::Position::Bottom, input_capture::Position::Bottom => lan_mouse_proto::Position::Bottom,
} }
} }
async fn wait_for_termination(rx: &mut Receiver<CaptureRequest>) { struct DropGuard<T> {
loop { tx: Sender<T>,
match rx.recv().await.expect("channel closed") {
CaptureRequest::Terminate => return,
CaptureRequest::Release => continue,
CaptureRequest::Create(_, _) => continue,
CaptureRequest::Destroy(_) => continue,
CaptureRequest::Reenable => continue,
}
}
}
struct DropGuard<'a, T> {
tx: &'a Sender<T>,
on_drop: Option<T>, on_drop: Option<T>,
} }
impl<'a, T> DropGuard<'a, T> { impl<T> DropGuard<T> {
fn new(tx: &'a Sender<T>, on_new: T, on_drop: T) -> Self { fn new(tx: Sender<T>, on_new: T, on_drop: T) -> Self {
tx.send(on_new).expect("channel closed"); tx.send(on_new).expect("channel closed");
let on_drop = Some(on_drop); let on_drop = Some(on_drop);
Self { tx, on_drop } Self { tx, on_drop }
} }
} }
impl<'a, T> Drop for DropGuard<'a, T> { impl<T> Drop for DropGuard<T> {
fn drop(&mut self) { fn drop(&mut self) {
self.tx self.tx
.send(self.on_drop.take().expect("item")) .send(self.on_drop.take().expect("item"))

View File

@@ -1,5 +1,5 @@
use crate::{ use crate::{
capture::{Capture, ICaptureEvent}, capture::{Capture, CaptureType, ICaptureEvent},
client::ClientManager, client::ClientManager,
config::Config, config::Config,
connect::LanMouseConnection, connect::LanMouseConnection,
@@ -133,7 +133,7 @@ impl Service {
// input capture + emulation // input capture + emulation
let capture_backend = self.config.capture_backend.map(|b| b.into()); let capture_backend = self.config.capture_backend.map(|b| b.into());
let mut capture = Capture::new(capture_backend, conn, self.clone()); let mut capture = Capture::new(capture_backend, conn, self.config.release_bind.clone());
let emulation_backend = self.config.emulation_backend.map(|b| b.into()); let emulation_backend = self.config.emulation_backend.map(|b| b.into());
let mut emulation = Emulation::new(emulation_backend, listener); let mut emulation = Emulation::new(emulation_backend, listener);
@@ -346,7 +346,7 @@ impl Service {
) { ) {
let handle = Self::ENTER_HANDLE_BEGIN + self.next_trigger_handle; let handle = Self::ENTER_HANDLE_BEGIN + self.next_trigger_handle;
self.next_trigger_handle += 1; self.next_trigger_handle += 1;
capture.create(handle, pos); capture.create(handle, pos, CaptureType::EnterOnly);
self.incoming_conns.borrow_mut().insert(addr); self.incoming_conns.borrow_mut().insert(addr);
self.incoming_conn_info.borrow_mut().insert( self.incoming_conn_info.borrow_mut().insert(
handle, handle,
@@ -373,13 +373,6 @@ impl Service {
.map(|incoming| incoming.addr) .map(|incoming| incoming.addr)
} }
pub(crate) fn get_incoming_pos(&self, handle: ClientHandle) -> Option<Position> {
self.incoming_conn_info
.borrow()
.get(&handle)
.map(|incoming| incoming.pos)
}
fn notify_frontend(&self, event: FrontendEvent) { fn notify_frontend(&self, event: FrontendEvent) {
self.pending_frontend_events.borrow_mut().push_back(event); self.pending_frontend_events.borrow_mut().push_back(event);
self.notifies.frontend_event_pending.notify_one(); self.notifies.frontend_event_pending.notify_one();
@@ -441,7 +434,7 @@ impl Service {
/* activate the client */ /* activate the client */
if self.client_manager.activate_client(handle) { if self.client_manager.activate_client(handle) {
/* notify capture and frontends */ /* notify capture and frontends */
capture.create(handle, pos); capture.create(handle, pos, CaptureType::Default);
self.client_updated(handle); self.client_updated(handle);
log::info!("activated client {handle} ({pos})"); log::info!("activated client {handle} ({pos})");
} }