capture stuffs

This commit is contained in:
Ferdinand Schober
2024-09-06 17:51:20 +02:00
parent e4a7f0b4fc
commit ad8c92cfbe
5 changed files with 198 additions and 471 deletions

View File

@@ -1,17 +1,170 @@
use crate::server::Server;
use futures::StreamExt;
use input_capture::{
Backend, CaptureError, CaptureEvent, CaptureHandle, InputCapture, InputCaptureError, Position,
};
use lan_mouse_ipc::{ClientHandle, Status};
use lan_mouse_proto::ProtoEvent;
use local_channel::mpsc::{channel, Receiver, Sender};
use tokio::{
process::Command,
task::{spawn_local, JoinHandle},
};
pub(crate) struct Capture {
use crate::{connect::LanMouseConnection, server::Server};
pub(crate) struct CaptureProxy {
server: Server,
tx: Sender<CaptureRequest>,
task: JoinHandle<()>,
}
impl Capture {
pub(crate) fn new(server: Server) -> Self {
Self { server }
#[derive(Clone, Copy, Debug)]
enum CaptureRequest {
/// capture must release the mouse
Release,
/// add a capture client
Create(CaptureHandle, Position),
/// destory a capture client
Destroy(CaptureHandle),
}
impl CaptureProxy {
pub(crate) fn new(server: Server, backend: Option<Backend>, conn: LanMouseConnection) -> Self {
let (tx, rx) = channel();
let task = spawn_local(Self::run(server.clone(), backend, rx, conn));
Self { server, tx, task }
}
pub(crate) async fn run(&mut self, backend: input_capture::Backend) {
pub(crate) async fn run(
server: Server,
backend: Option<Backend>,
mut rx: Receiver<CaptureRequest>,
conn: LanMouseConnection,
) {
loop {
if let Err(e) = do_capture(backend)
if let Err(e) = do_capture(backend, &server, &conn, &mut rx).await {
log::warn!("input capture exited: {e}");
}
server.set_capture_status(Status::Disabled);
tokio::select! {
_ = rx.recv() => continue,
_ = server.capture_enabled() => break,
_ = server.cancelled() => return,
}
}
}
}
async fn do_capture(
backend: Option<Backend>,
server: &Server,
conn: &LanMouseConnection,
rx: &mut Receiver<CaptureRequest>,
) -> Result<(), InputCaptureError> {
/* allow cancelling capture request */
let mut capture = tokio::select! {
r = InputCapture::new(backend) => r?,
_ = server.cancelled() => return Ok(()),
};
server.set_capture_status(Status::Enabled);
let clients = server.active_clients();
let clients = clients
.iter()
.copied()
.map(|handle| (handle, server.get_pos(handle).expect("no such client")));
/* create barriers for active clients */
for (handle, pos) in clients {
capture.create(handle, to_capture_pos(pos)).await?;
}
loop {
tokio::select! {
event = capture.next() => match event {
Some(event) => handle_capture_event(server, &mut capture, sender_tx, event?).await?,
None => return Ok(()),
},
e = rx.recv() => {
match e {
Some(e) => match e {
CaptureRequest::Release => capture.release().await?,
CaptureRequest::Create(h, p) => capture.create(h, p).await?,
CaptureRequest::Destroy(h) => capture.destroy(h).await?,
},
None => break,
}
}
_ = server.cancelled() => break,
}
}
capture.terminate().await?;
Ok(())
}
async fn handle_capture_event(
server: &Server,
capture: &mut InputCapture,
conn: &mut LanMouseConnection,
event: (CaptureHandle, CaptureEvent),
) -> Result<(), CaptureError> {
let (handle, event) = event;
log::trace!("({handle}): {event:?}");
if server.should_release.borrow_mut().take().is_some() {
return capture.release().await;
}
if event == CaptureEvent::Begin {
spawn_hook_command(server, handle);
}
let event = match event {
CaptureEvent::Begin => ProtoEvent::Enter(lan_mouse_proto::Position::Left),
CaptureEvent::Input(e) => ProtoEvent::Input(e),
};
conn.send(event, handle).await;
Ok(())
}
fn to_capture_pos(pos: lan_mouse_ipc::Position) -> input_capture::Position {
match pos {
lan_mouse_ipc::Position::Left => input_capture::Position::Left,
lan_mouse_ipc::Position::Right => input_capture::Position::Right,
lan_mouse_ipc::Position::Top => input_capture::Position::Top,
lan_mouse_ipc::Position::Bottom => input_capture::Position::Bottom,
}
}
fn spawn_hook_command(server: &Server, handle: ClientHandle) {
let Some(cmd) = server
.client_manager
.borrow()
.get(handle)
.and_then(|(c, _)| c.cmd.clone())
else {
return;
};
tokio::task::spawn_local(async move {
log::info!("spawning command!");
let mut child = match Command::new("sh").arg("-c").arg(cmd.as_str()).spawn() {
Ok(c) => c,
Err(e) => {
log::warn!("could not execute cmd: {e}");
return;
}
};
match child.wait().await {
Ok(s) => {
if s.success() {
log::info!("{cmd} exited successfully");
} else {
log::warn!("{cmd} exited with {s}");
}
}
Err(e) => log::warn!("{cmd}: {e}"),
}
});
}

View File

@@ -23,45 +23,39 @@ pub(crate) enum LanMouseConnectionError {
NoIps,
}
pub(crate) struct LanMouseConnection {}
impl LanMouseConnection {
pub(crate) async fn connect(
addr: SocketAddr,
) -> Result<Arc<dyn Conn + Sync + Send>, LanMouseConnectionError> {
let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
conn.connect(addr).await;
log::info!("connected to {addr}, establishing secure dtls channel ...");
let certificate = Certificate::generate_self_signed(["localhost".to_owned()])?;
let config = Config {
certificates: vec![certificate],
insecure_skip_verify: true,
extended_master_secret: ExtendedMasterSecretType::Require,
..Default::default()
};
let dtls_conn: Arc<dyn Conn + Send + Sync> =
Arc::new(DTLSConn::new(conn, config, true, None).await?);
Ok(dtls_conn)
}
pub(crate) async fn connect_any(
addrs: &[SocketAddr],
) -> Result<Arc<dyn Conn + Send + Sync>, LanMouseConnectionError> {
let mut joinset = JoinSet::new();
for &addr in addrs {
joinset.spawn_local(Self::connect(addr));
}
let conn = joinset.join_next().await;
conn.expect("no addrs to connect").expect("failed to join")
}
async fn connect(addr: SocketAddr) -> Result<Arc<dyn Conn + Sync + Send>, LanMouseConnectionError> {
let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
conn.connect(addr).await;
log::info!("connected to {addr}, establishing secure dtls channel ...");
let certificate = Certificate::generate_self_signed(["localhost".to_owned()])?;
let config = Config {
certificates: vec![certificate],
insecure_skip_verify: true,
extended_master_secret: ExtendedMasterSecretType::Require,
..Default::default()
};
let dtls_conn: Arc<dyn Conn + Send + Sync> =
Arc::new(DTLSConn::new(conn, config, true, None).await?);
Ok(dtls_conn)
}
struct ConnectionProxy {
async fn connect_any(
addrs: &[SocketAddr],
) -> Result<Arc<dyn Conn + Send + Sync>, LanMouseConnectionError> {
let mut joinset = JoinSet::new();
for &addr in addrs {
joinset.spawn_local(connect(addr));
}
let conn = joinset.join_next().await;
conn.expect("no addrs to connect").expect("failed to join")
}
pub(crate) struct LanMouseConnection {
server: Server,
conns: HashMap<SocketAddr, Arc<dyn Conn + Send + Sync>>,
}
impl ConnectionProxy {
impl LanMouseConnection {
fn find_conn(&self, addrs: &[SocketAddr]) -> Vec<Arc<dyn Conn + Send + Sync>> {
let mut conns = vec![];
for addr in addrs {
@@ -72,7 +66,7 @@ impl ConnectionProxy {
conns
}
async fn send(
pub(crate) async fn send(
&self,
event: ProtoEvent,
handle: ClientHandle,
@@ -93,7 +87,7 @@ impl ConnectionProxy {
.into_iter()
.map(|a| SocketAddr::new(a, port))
.collect::<Vec<_>>();
let conn = LanMouseConnection::connect_any(&addrs).await?;
let conn = connect_any(&addrs).await?;
let addr = conn.remote_addr().expect("no remote addr");
self.server.set_active_addr(handle, addr);
conn.send(buf).await?;

View File

@@ -1,5 +1,4 @@
use capture_task::CaptureRequest;
use emulation_task::EmulationRequest;
use futures::StreamExt;
use hickory_resolver::error::ResolveError;
use local_channel::mpsc::{channel, Sender};
@@ -31,17 +30,6 @@ use lan_mouse_ipc::{
mod capture_task;
mod emulation_task;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum State {
/// Currently sending events to another device
Sending,
/// Currently receiving events from other devices
Receiving,
/// Entered the deadzone of another device but waiting
/// for acknowledgement (Leave event) from the device
AwaitAck,
}
#[derive(Debug, Error)]
pub enum ServiceError {
#[error(transparent)]
@@ -54,18 +42,19 @@ pub enum ServiceError {
ListenError(#[from] ListenerCreationError),
}
pub struct ReleaseToken;
#[derive(Clone)]
pub struct Server {
active_client: Rc<Cell<Option<ClientHandle>>>,
pub(crate) client_manager: Rc<RefCell<ClientManager>>,
port: Rc<Cell<u16>>,
state: Rc<Cell<State>>,
release_bind: Vec<input_event::scancode::Linux>,
notifies: Rc<Notifies>,
pub(crate) config: Rc<Config>,
pending_frontend_events: Rc<RefCell<VecDeque<FrontendEvent>>>,
capture_status: Rc<Cell<Status>>,
pub(crate) emulation_status: Rc<Cell<Status>>,
pub(crate) should_release: Rc<RefCell<Option<ReleaseToken>>>,
}
#[derive(Default)]
@@ -79,9 +68,7 @@ struct Notifies {
impl Server {
pub fn new(config: Config) -> Self {
let active_client = Rc::new(Cell::new(None));
let client_manager = Rc::new(RefCell::new(ClientManager::default()));
let state = Rc::new(Cell::new(State::Receiving));
let port = Rc::new(Cell::new(config.port));
for config_client in config.get_clients() {
let client = ClientConfig {
@@ -110,10 +97,8 @@ impl Server {
Self {
config,
active_client,
client_manager,
port,
state,
release_bind,
notifies,
pending_frontend_events: Rc::new(RefCell::new(VecDeque::new())),
@@ -215,7 +200,7 @@ impl Server {
self.notifies.capture.notify_waiters()
}
async fn capture_enabled(&self) {
pub(crate) async fn capture_enabled(&self) {
self.notifies.capture.notified().await
}
@@ -242,7 +227,7 @@ impl Server {
self.notify_frontend(FrontendEvent::Changed(handle));
}
fn active_clients(&self) -> Vec<ClientHandle> {
pub(crate) fn active_clients(&self) -> Vec<ClientHandle> {
self.client_manager
.borrow()
.get_client_states()
@@ -488,7 +473,7 @@ impl Server {
self.notify_frontend(status);
}
fn set_capture_status(&self, status: Status) {
pub(crate) fn set_capture_status(&self, status: Status) {
self.capture_status.replace(status);
let status = FrontendEvent::CaptureStatus(status);
self.notify_frontend(status);
@@ -508,18 +493,8 @@ impl Server {
.and_then(|(c, _)| c.hostname.clone())
}
fn get_state(&self) -> State {
self.state.get()
}
fn set_state(&self, state: State) {
log::debug!("state => {state:?}");
self.state.replace(state);
}
fn set_active(&self, handle: Option<ClientHandle>) {
log::debug!("active client => {handle:?}");
self.active_client.replace(handle);
pub(crate) fn get_pos(&self, handle: ClientHandle) -> Option<Position> {
self.client_manager.borrow().get(handle).map(|(c, _)| c.pos)
}
pub(crate) fn set_active_addr(&self, handle: ClientHandle, addr: SocketAddr) {
@@ -536,7 +511,7 @@ impl Server {
}
pub(crate) fn release_capture(&self) {
todo!()
self.should_release.replace(Some(ReleaseToken));
}
}

View File

@@ -1,207 +0,0 @@
use futures::StreamExt;
use lan_mouse_proto::ProtoEvent;
use local_channel::mpsc::{Receiver, Sender};
use std::net::SocketAddr;
use tokio::{process::Command, task::JoinHandle};
use input_capture::{
self, CaptureError, CaptureEvent, CaptureHandle, InputCapture, InputCaptureError, Position,
};
use crate::server::State;
use lan_mouse_ipc::{ClientHandle, Status};
use super::Server;
#[derive(Clone, Copy, Debug)]
pub(crate) enum CaptureRequest {
/// capture must release the mouse
Release,
/// add a capture client
Create(CaptureHandle, Position),
/// destory a capture client
Destroy(CaptureHandle),
}
pub(crate) fn new(
server: Server,
capture_rx: Receiver<CaptureRequest>,
udp_send: Sender<(ProtoEvent, SocketAddr)>,
) -> JoinHandle<()> {
let backend = server.config.capture_backend.map(|b| b.into());
tokio::task::spawn_local(capture_task(server, backend, udp_send, capture_rx))
}
async fn capture_task(
server: Server,
backend: Option<input_capture::Backend>,
sender_tx: Sender<(ProtoEvent, SocketAddr)>,
mut notify_rx: Receiver<CaptureRequest>,
) {
loop {
if let Err(e) = do_capture(backend, &server, &sender_tx, &mut notify_rx).await {
log::warn!("input capture exited: {e}");
}
server.set_capture_status(Status::Disabled);
if server.is_cancelled() {
break;
}
// allow cancellation
loop {
tokio::select! {
_ = notify_rx.recv() => continue, /* need to ignore requests here! */
_ = server.capture_enabled() => break,
_ = server.cancelled() => return,
}
}
}
}
async fn do_capture(
backend: Option<input_capture::Backend>,
server: &Server,
sender_tx: &Sender<(ProtoEvent, SocketAddr)>,
notify_rx: &mut Receiver<CaptureRequest>,
) -> Result<(), InputCaptureError> {
/* allow cancelling capture request */
let mut capture = tokio::select! {
r = InputCapture::new(backend) => r?,
_ = server.cancelled() => return Ok(()),
};
server.set_capture_status(Status::Enabled);
let clients = server.active_clients();
let clients = clients.iter().copied().map(|handle| {
(
handle,
server
.client_manager
.borrow()
.get(handle)
.map(|(c, _)| c.pos)
.expect("no such client"),
)
});
for (handle, pos) in clients {
capture.create(handle, to_capture_pos(pos)).await?;
}
loop {
tokio::select! {
event = capture.next() => match event {
Some(event) => handle_capture_event(server, &mut capture, sender_tx, event?).await?,
None => return Ok(()),
},
e = notify_rx.recv() => {
log::debug!("input capture notify rx: {e:?}");
match e {
Some(e) => match e {
CaptureRequest::Release => {
capture.release().await?;
server.state.replace(State::Receiving);
}
CaptureRequest::Create(h, p) => capture.create(h, p).await?,
CaptureRequest::Destroy(h) => capture.destroy(h).await?,
},
None => break,
}
}
_ = server.cancelled() => break,
}
}
capture.terminate().await?;
Ok(())
}
async fn handle_capture_event(
server: &Server,
capture: &mut InputCapture,
sender_tx: &Sender<(ProtoEvent, SocketAddr)>,
event: (CaptureHandle, CaptureEvent),
) -> Result<(), CaptureError> {
let (handle, event) = event;
log::trace!("({handle}) {event:?}");
// capture started
if event == CaptureEvent::Begin {
// wait for remote to acknowlegde enter
server.set_state(State::AwaitAck);
server.set_active(Some(handle));
// restart ping timer to release capture if unreachable
server.restart_ping_timer();
// spawn enter hook cmd
spawn_hook_command(server, handle);
}
// release capture if emulation set state to Receiveing
if server.get_state() == State::Receiving {
capture.release().await?;
return Ok(());
}
// check release bind
if capture.keys_pressed(&server.release_bind) {
capture.release().await?;
server.set_state(State::Receiving);
}
if let Some(addr) = server.active_addr(handle) {
let event = match server.get_state() {
State::Sending => match event {
CaptureEvent::Begin => ProtoEvent::Enter(0),
CaptureEvent::Input(e) => ProtoEvent::Input(e),
},
/* send additional enter events until acknowleged */
State::AwaitAck => ProtoEvent::Enter(0),
/* released capture */
State::Receiving => ProtoEvent::Leave(0),
};
log::error!("SENDING: {event:?} -> {addr:?}");
sender_tx.send((event, addr)).expect("sender closed");
};
Ok(())
}
fn spawn_hook_command(server: &Server, handle: ClientHandle) {
let Some(cmd) = server
.client_manager
.borrow()
.get(handle)
.and_then(|(c, _)| c.cmd.clone())
else {
return;
};
tokio::task::spawn_local(async move {
log::info!("spawning command!");
let mut child = match Command::new("sh").arg("-c").arg(cmd.as_str()).spawn() {
Ok(c) => c,
Err(e) => {
log::warn!("could not execute cmd: {e}");
return;
}
};
match child.wait().await {
Ok(s) => {
if s.success() {
log::info!("{cmd} exited successfully");
} else {
log::warn!("{cmd} exited with {s}");
}
}
Err(e) => log::warn!("{cmd}: {e}"),
}
});
}
fn to_capture_pos(pos: lan_mouse_ipc::Position) -> input_capture::Position {
match pos {
lan_mouse_ipc::Position::Left => input_capture::Position::Left,
lan_mouse_ipc::Position::Right => input_capture::Position::Right,
lan_mouse_ipc::Position::Top => input_capture::Position::Top,
lan_mouse_ipc::Position::Bottom => input_capture::Position::Bottom,
}
}

View File

@@ -1,188 +0,0 @@
use local_channel::mpsc::{Receiver, Sender};
use std::net::SocketAddr;
use lan_mouse_proto::ProtoEvent;
use tokio::task::JoinHandle;
use lan_mouse_ipc::ClientHandle;
use crate::{client::ClientManager, server::State};
use input_emulation::{self, EmulationError, EmulationHandle, InputEmulation, InputEmulationError};
use lan_mouse_ipc::Status;
use super::{network_task::NetworkError, Server};
#[derive(Clone, Debug)]
pub(crate) enum EmulationRequest {
/// create a new client
Create(EmulationHandle),
/// destroy a client
Destroy(EmulationHandle),
/// input emulation must release keys for client
ReleaseKeys(ClientHandle),
}
pub(crate) fn new(
server: Server,
emulation_rx: Receiver<EmulationRequest>,
udp_rx: Receiver<Result<(ProtoEvent, SocketAddr), NetworkError>>,
sender_tx: Sender<(ProtoEvent, SocketAddr)>,
) -> JoinHandle<()> {
let emulation_task = emulation_task(server, emulation_rx, udp_rx, sender_tx);
tokio::task::spawn_local(emulation_task)
}
async fn emulation_task(
server: Server,
mut rx: Receiver<EmulationRequest>,
mut udp_rx: Receiver<Result<(ProtoEvent, SocketAddr), NetworkError>>,
sender_tx: Sender<(ProtoEvent, SocketAddr)>,
) {
loop {
if let Err(e) = do_emulation(&server, &mut rx, &mut udp_rx, &sender_tx).await {
log::warn!("input emulation exited: {e}");
}
server.set_emulation_status(Status::Disabled);
if server.is_cancelled() {
break;
}
// allow cancellation
loop {
tokio::select! {
_ = rx.recv() => continue, /* need to ignore requests here! */
_ = server.emulation_notified() => break,
_ = server.cancelled() => return,
}
}
}
}
async fn do_emulation(
server: &Server,
rx: &mut Receiver<EmulationRequest>,
udp_rx: &mut Receiver<Result<(ProtoEvent, SocketAddr), NetworkError>>,
sender_tx: &Sender<(ProtoEvent, SocketAddr)>,
) -> Result<(), InputEmulationError> {
let backend = server.config.emulation_backend.map(|b| b.into());
log::info!("creating input emulation...");
let mut emulation = tokio::select! {
r = InputEmulation::new(backend) => r?,
_ = server.cancelled() => return Ok(()),
};
server.set_emulation_status(Status::Enabled);
// add clients
for handle in server.active_clients() {
emulation.create(handle).await;
}
let res = do_emulation_session(server, &mut emulation, rx, udp_rx, sender_tx).await;
emulation.terminate().await; // manual drop
res
}
async fn do_emulation_session(
server: &Server,
emulation: &mut InputEmulation,
rx: &mut Receiver<EmulationRequest>,
udp_rx: &mut Receiver<Result<(ProtoEvent, SocketAddr), NetworkError>>,
sender_tx: &Sender<(ProtoEvent, SocketAddr)>,
) -> Result<(), InputEmulationError> {
let mut last_ignored = None;
loop {
tokio::select! {
udp_event = udp_rx.recv() => {
let udp_event = match udp_event.expect("channel closed") {
Ok(e) => e,
Err(e) => {
log::warn!("network error: {e}");
continue;
}
};
handle_incoming_event(server, emulation, sender_tx, &mut last_ignored, udp_event).await?;
}
emulate_event = rx.recv() => {
match emulate_event.expect("channel closed") {
EmulationRequest::Create(h) => { let _ = emulation.create(h).await; },
EmulationRequest::Destroy(h) => emulation.destroy(h).await,
EmulationRequest::ReleaseKeys(c) => emulation.release_keys(c).await?,
}
}
_ = server.notifies.cancel.cancelled() => break Ok(()),
}
}
}
async fn handle_incoming_event(
server: &Server,
emulate: &mut InputEmulation,
sender_tx: &Sender<(ProtoEvent, SocketAddr)>,
last_ignored: &mut Option<SocketAddr>,
event: (ProtoEvent, SocketAddr),
) -> Result<(), EmulationError> {
let (event, addr) = event;
log::trace!("{:20} <-<-<-<------ {addr}", event.to_string());
// get client handle for addr
let Some(handle) =
activate_client_if_exists(&mut server.client_manager.borrow_mut(), addr, last_ignored)
else {
return Ok(());
};
match (event, addr) {
(ProtoEvent::Pong, _) => { /* ignore pong events */ }
(ProtoEvent::Ping, addr) => {
let _ = sender_tx.send((ProtoEvent::Pong, addr));
}
(ProtoEvent::Leave(_), _) => emulate.release_keys(handle).await?,
(ProtoEvent::Ack(_), _) => server.set_state(State::Sending),
(ProtoEvent::Enter(_), _) => {
server.set_state(State::Receiving);
sender_tx
.send((ProtoEvent::Ack(0), addr))
.expect("no channel")
}
(ProtoEvent::Input(e), _) => {
if let State::Receiving = server.get_state() {
log::trace!("{event} => emulate");
emulate.consume(e, handle).await?;
let has_pressed_keys = emulate.has_pressed_keys(handle);
server.update_pressed_keys(handle, has_pressed_keys);
if has_pressed_keys {
server.restart_ping_timer();
}
}
}
}
Ok(())
}
fn activate_client_if_exists(
client_manager: &mut ClientManager,
addr: SocketAddr,
last_ignored: &mut Option<SocketAddr>,
) -> Option<ClientHandle> {
let Some(handle) = client_manager.get_client(addr) else {
// log ignored if it is the first event from the client in a series
if last_ignored.is_none() || last_ignored.is_some() && last_ignored.unwrap() != addr {
log::warn!("ignoring events from client {addr}");
last_ignored.replace(addr);
}
return None;
};
// next event can be logged as ignored again
last_ignored.take();
let (_, client_state) = client_manager.get_mut(handle)?;
// reset ttl for client
client_state.alive = true;
// set addr as new default for this client
client_state.active_addr = Some(addr);
Some(handle)
}