rename producer, consumer to emulation and capture (#98)

input emulation / input capture is clearer than event consumer and producer
This commit is contained in:
Ferdinand Schober
2024-03-21 20:26:57 +01:00
committed by GitHub
parent 78c9de45c7
commit 742b1585d7
23 changed files with 237 additions and 237 deletions

View File

@@ -5,9 +5,9 @@ use std::{collections::HashSet, net::SocketAddr};
use tokio::{sync::mpsc::Sender, task::JoinHandle};
use crate::{
capture::InputCapture,
client::{ClientEvent, ClientHandle},
event::{Event, KeyboardEvent},
producer::EventProducer,
scancode,
server::State,
};
@@ -15,45 +15,45 @@ use crate::{
use super::Server;
#[derive(Clone, Copy, Debug)]
pub enum ProducerEvent {
/// producer must release the mouse
pub enum CaptureEvent {
/// capture must release the mouse
Release,
/// producer is notified of a change in client states
/// capture is notified of a change in client states
ClientEvent(ClientEvent),
/// termination signal
Terminate,
}
pub fn new(
mut producer: Box<dyn EventProducer>,
mut capture: Box<dyn InputCapture>,
server: Server,
sender_tx: Sender<(Event, SocketAddr)>,
timer_tx: Sender<()>,
release_bind: Vec<scancode::Linux>,
) -> (JoinHandle<Result<()>>, Sender<ProducerEvent>) {
) -> (JoinHandle<Result<()>>, Sender<CaptureEvent>) {
let (tx, mut rx) = tokio::sync::mpsc::channel(32);
let task = tokio::task::spawn_local(async move {
let mut pressed_keys = HashSet::new();
loop {
tokio::select! {
event = producer.next() => {
event = capture.next() => {
match event {
Some(Ok(event)) => handle_producer_event(&server, &mut producer, &sender_tx, &timer_tx, event, &mut pressed_keys, &release_bind).await?,
Some(Err(e)) => return Err(anyhow!("event producer: {e:?}")),
None => return Err(anyhow!("event producer closed")),
Some(Ok(event)) => handle_capture_event(&server, &mut capture, &sender_tx, &timer_tx, event, &mut pressed_keys, &release_bind).await?,
Some(Err(e)) => return Err(anyhow!("input capture: {e:?}")),
None => return Err(anyhow!("input capture terminated")),
}
}
e = rx.recv() => {
log::debug!("producer notify rx: {e:?}");
log::debug!("input capture notify rx: {e:?}");
match e {
Some(e) => match e {
ProducerEvent::Release => {
producer.release()?;
CaptureEvent::Release => {
capture.release()?;
server.state.replace(State::Receiving);
}
ProducerEvent::ClientEvent(e) => producer.notify(e)?,
ProducerEvent::Terminate => break,
CaptureEvent::ClientEvent(e) => capture.notify(e)?,
CaptureEvent::Terminate => break,
},
None => break,
}
@@ -75,9 +75,9 @@ fn update_pressed_keys(pressed_keys: &mut HashSet<scancode::Linux>, key: u32, st
}
}
async fn handle_producer_event(
async fn handle_capture_event(
server: &Server,
producer: &mut Box<dyn EventProducer>,
capture: &mut Box<dyn InputCapture>,
sender_tx: &Sender<(Event, SocketAddr)>,
timer_tx: &Sender<()>,
event: (ClientHandle, Event),
@@ -93,7 +93,7 @@ async fn handle_producer_event(
if release_bind.iter().all(|k| pressed_keys.contains(k)) {
pressed_keys.clear();
log::info!("releasing pointer");
producer.release()?;
capture.release()?;
server.state.replace(State::Receiving);
log::trace!("STATE ===> Receiving");
// send an event to release all the modifiers
@@ -112,7 +112,7 @@ async fn handle_producer_event(
None => {
// should not happen
log::warn!("unknown client!");
producer.release()?;
capture.release()?;
server.state.replace(State::Receiving);
log::trace!("STATE ===> Receiving");
return Ok(());

View File

@@ -8,53 +8,53 @@ use tokio::{
use crate::{
client::{ClientEvent, ClientHandle},
consumer::EventConsumer,
emulate::InputEmulation,
event::{Event, KeyboardEvent},
scancode,
server::State,
};
use super::{ProducerEvent, Server};
use super::{CaptureEvent, Server};
#[derive(Clone, Debug)]
pub enum ConsumerEvent {
/// consumer is notified of a change in client states
pub enum EmulationEvent {
/// input emulation is notified of a change in client states
ClientEvent(ClientEvent),
/// consumer must release keys for client
/// input emulation must release keys for client
ReleaseKeys(ClientHandle),
/// termination signal
Terminate,
}
pub fn new(
mut consumer: Box<dyn EventConsumer>,
mut emulate: Box<dyn InputEmulation>,
server: Server,
mut udp_rx: Receiver<Result<(Event, SocketAddr)>>,
sender_tx: Sender<(Event, SocketAddr)>,
producer_tx: Sender<ProducerEvent>,
capture_tx: Sender<CaptureEvent>,
timer_tx: Sender<()>,
) -> (JoinHandle<Result<()>>, Sender<ConsumerEvent>) {
) -> (JoinHandle<Result<()>>, Sender<EmulationEvent>) {
let (tx, mut rx) = tokio::sync::mpsc::channel(32);
let consumer_task = tokio::task::spawn_local(async move {
let emulate_task = tokio::task::spawn_local(async move {
let mut last_ignored = None;
loop {
tokio::select! {
udp_event = udp_rx.recv() => {
let udp_event = udp_event.ok_or(anyhow!("receiver closed"))??;
handle_udp_rx(&server, &producer_tx, &mut consumer, &sender_tx, &mut last_ignored, udp_event, &timer_tx).await;
handle_udp_rx(&server, &capture_tx, &mut emulate, &sender_tx, &mut last_ignored, udp_event, &timer_tx).await;
}
consumer_event = rx.recv() => {
match consumer_event {
emulate_event = rx.recv() => {
match emulate_event {
Some(e) => match e {
ConsumerEvent::ClientEvent(e) => consumer.notify(e).await,
ConsumerEvent::ReleaseKeys(c) => release_keys(&server, &mut consumer, c).await,
ConsumerEvent::Terminate => break,
EmulationEvent::ClientEvent(e) => emulate.notify(e).await,
EmulationEvent::ReleaseKeys(c) => release_keys(&server, &mut emulate, c).await,
EmulationEvent::Terminate => break,
},
None => break,
}
}
res = consumer.dispatch() => {
res = emulate.dispatch() => {
res?;
}
}
@@ -68,20 +68,20 @@ pub fn new(
.map(|s| s.client.handle)
.collect::<Vec<_>>();
for client in clients {
release_keys(&server, &mut consumer, client).await;
release_keys(&server, &mut emulate, client).await;
}
// destroy consumer
consumer.destroy().await;
// destroy emulator
emulate.destroy().await;
anyhow::Ok(())
});
(consumer_task, tx)
(emulate_task, tx)
}
async fn handle_udp_rx(
server: &Server,
producer_notify_tx: &Sender<ProducerEvent>,
consumer: &mut Box<dyn EventConsumer>,
capture_tx: &Sender<CaptureEvent>,
emulate: &mut Box<dyn InputEmulation>,
sender_tx: &Sender<(Event, SocketAddr)>,
last_ignored: &mut Option<SocketAddr>,
event: (Event, SocketAddr),
@@ -127,7 +127,7 @@ async fn handle_udp_rx(
let _ = sender_tx.send((Event::Pong(), addr)).await;
}
(Event::Disconnect(), _) => {
release_keys(server, consumer, handle).await;
release_keys(server, emulate, handle).await;
}
(event, addr) => {
// tell clients that we are ready to receive events
@@ -143,7 +143,7 @@ async fn handle_udp_rx(
} else {
// upon receiving any event, we go back to receiving mode
server.state.replace(State::Receiving);
let _ = producer_notify_tx.send(ProducerEvent::Release).await;
let _ = capture_tx.send(CaptureEvent::Release).await;
log::trace!("STATE ===> Receiving");
}
}
@@ -176,8 +176,8 @@ async fn handle_udp_rx(
// workaround buggy rdp backend.
if !ignore_event {
// consume event
consumer.consume(event, handle).await;
log::trace!("{event:?} => consumer");
emulate.consume(event, handle).await;
log::trace!("{event:?} => emulate");
}
}
State::AwaitingLeave => {
@@ -194,7 +194,7 @@ async fn handle_udp_rx(
// event should still be possible
if let Event::Enter() = event {
server.state.replace(State::Receiving);
let _ = producer_notify_tx.send(ProducerEvent::Release).await;
let _ = capture_tx.send(CaptureEvent::Release).await;
log::trace!("STATE ===> Receiving");
}
}
@@ -205,7 +205,7 @@ async fn handle_udp_rx(
async fn release_keys(
server: &Server,
consumer: &mut Box<dyn EventConsumer>,
emulate: &mut Box<dyn InputEmulation>,
client: ClientHandle,
) {
let keys = server
@@ -222,7 +222,7 @@ async fn release_keys(
key,
state: 0,
});
consumer.consume(event, client).await;
emulate.consume(event, client).await;
if let Ok(key) = scancode::Linux::try_from(key) {
log::warn!("releasing stuck key: {key:?}");
}
@@ -234,7 +234,7 @@ async fn release_keys(
mods_locked: 0,
group: 0,
};
consumer
emulate
.consume(Event::Keyboard(modifiers_event), client)
.await;
}

View File

@@ -22,15 +22,15 @@ use crate::{
};
use super::{
consumer_task::ConsumerEvent, producer_task::ProducerEvent, resolver_task::DnsRequest, Server,
capture_task::CaptureEvent, emulation_task::EmulationEvent, resolver_task::DnsRequest, Server,
};
pub(crate) fn new(
mut frontend: FrontendListener,
mut notify_rx: Receiver<FrontendNotify>,
server: Server,
producer_notify: Sender<ProducerEvent>,
consumer_notify: Sender<ConsumerEvent>,
capture_notify: Sender<CaptureEvent>,
emulate_notify: Sender<EmulationEvent>,
resolve_ch: Sender<DnsRequest>,
port_tx: Sender<u16>,
) -> (JoinHandle<Result<()>>, Sender<FrontendEvent>) {
@@ -51,7 +51,7 @@ pub(crate) fn new(
}
event = event_rx.recv() => {
let frontend_event = event.ok_or(anyhow!("frontend channel closed"))?;
if handle_frontend_event(&server, &producer_notify, &consumer_notify, &resolve_ch, &mut frontend, &port_tx, frontend_event).await {
if handle_frontend_event(&server, &capture_notify, &emulate_notify, &resolve_ch, &mut frontend, &port_tx, frontend_event).await {
break;
}
}
@@ -98,8 +98,8 @@ async fn handle_frontend_stream(
async fn handle_frontend_event(
server: &Server,
producer_tx: &Sender<ProducerEvent>,
consumer_tx: &Sender<ConsumerEvent>,
capture_tx: &Sender<CaptureEvent>,
emulate_tx: &Sender<EmulationEvent>,
resolve_tx: &Sender<DnsRequest>,
frontend: &mut FrontendListener,
port_tx: &Sender<u16>,
@@ -120,7 +120,7 @@ async fn handle_frontend_event(
Some(FrontendNotify::NotifyClientCreate(client))
}
FrontendEvent::ActivateClient(handle, active) => {
activate_client(server, producer_tx, consumer_tx, handle, active).await;
activate_client(server, capture_tx, emulate_tx, handle, active).await;
Some(FrontendNotify::NotifyClientActivate(handle, active))
}
FrontendEvent::ChangePort(port) => {
@@ -128,7 +128,7 @@ async fn handle_frontend_event(
None
}
FrontendEvent::DelClient(handle) => {
remove_client(server, producer_tx, consumer_tx, frontend, handle).await;
remove_client(server, capture_tx, emulate_tx, frontend, handle).await;
Some(FrontendNotify::NotifyClientDelete(handle))
}
FrontendEvent::Enumerate() => {
@@ -147,8 +147,8 @@ async fn handle_frontend_event(
FrontendEvent::UpdateClient(handle, hostname, port, pos) => {
update_client(
server,
producer_tx,
consumer_tx,
capture_tx,
emulate_tx,
resolve_tx,
(handle, hostname, port, pos),
)
@@ -204,8 +204,8 @@ pub async fn add_client(
pub async fn activate_client(
server: &Server,
producer_notify_tx: &Sender<ProducerEvent>,
consumer_notify_tx: &Sender<ConsumerEvent>,
capture_notify_tx: &Sender<CaptureEvent>,
emulate_notify_tx: &Sender<EmulationEvent>,
client: ClientHandle,
active: bool,
) {
@@ -217,26 +217,28 @@ pub async fn activate_client(
None => return,
};
if active {
let _ = producer_notify_tx
.send(ProducerEvent::ClientEvent(ClientEvent::Create(client, pos)))
let _ = capture_notify_tx
.send(CaptureEvent::ClientEvent(ClientEvent::Create(client, pos)))
.await;
let _ = consumer_notify_tx
.send(ConsumerEvent::ClientEvent(ClientEvent::Create(client, pos)))
let _ = emulate_notify_tx
.send(EmulationEvent::ClientEvent(ClientEvent::Create(
client, pos,
)))
.await;
} else {
let _ = producer_notify_tx
.send(ProducerEvent::ClientEvent(ClientEvent::Destroy(client)))
let _ = capture_notify_tx
.send(CaptureEvent::ClientEvent(ClientEvent::Destroy(client)))
.await;
let _ = consumer_notify_tx
.send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(client)))
let _ = emulate_notify_tx
.send(EmulationEvent::ClientEvent(ClientEvent::Destroy(client)))
.await;
}
}
pub async fn remove_client(
server: &Server,
producer_notify_tx: &Sender<ProducerEvent>,
consumer_notify_tx: &Sender<ConsumerEvent>,
capture_notify_tx: &Sender<CaptureEvent>,
emulate_notify_tx: &Sender<EmulationEvent>,
frontend: &mut FrontendListener,
client: ClientHandle,
) -> Option<ClientHandle> {
@@ -250,11 +252,11 @@ pub async fn remove_client(
};
if active {
let _ = producer_notify_tx
.send(ProducerEvent::ClientEvent(ClientEvent::Destroy(client)))
let _ = capture_notify_tx
.send(CaptureEvent::ClientEvent(ClientEvent::Destroy(client)))
.await;
let _ = consumer_notify_tx
.send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(client)))
let _ = emulate_notify_tx
.send(EmulationEvent::ClientEvent(ClientEvent::Destroy(client)))
.await;
}
@@ -268,8 +270,8 @@ pub async fn remove_client(
async fn update_client(
server: &Server,
producer_notify_tx: &Sender<ProducerEvent>,
consumer_notify_tx: &Sender<ConsumerEvent>,
capture_notify_tx: &Sender<CaptureEvent>,
emulate_notify_tx: &Sender<EmulationEvent>,
resolve_tx: &Sender<DnsRequest>,
client_update: (ClientHandle, Option<String>, u16, Position),
) {
@@ -311,7 +313,7 @@ async fn update_client(
)
};
// update state in event consumer & producer
// update state in event input emulator & input capture
if changed && active {
// resolve dns
if let Some(hostname) = hostname {
@@ -319,17 +321,19 @@ async fn update_client(
}
// update state
let _ = producer_notify_tx
.send(ProducerEvent::ClientEvent(ClientEvent::Destroy(handle)))
let _ = capture_notify_tx
.send(CaptureEvent::ClientEvent(ClientEvent::Destroy(handle)))
.await;
let _ = consumer_notify_tx
.send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(handle)))
let _ = emulate_notify_tx
.send(EmulationEvent::ClientEvent(ClientEvent::Destroy(handle)))
.await;
let _ = producer_notify_tx
.send(ProducerEvent::ClientEvent(ClientEvent::Create(handle, pos)))
let _ = capture_notify_tx
.send(CaptureEvent::ClientEvent(ClientEvent::Create(handle, pos)))
.await;
let _ = consumer_notify_tx
.send(ConsumerEvent::ClientEvent(ClientEvent::Create(handle, pos)))
let _ = emulate_notify_tx
.send(EmulationEvent::ClientEvent(ClientEvent::Create(
handle, pos,
)))
.await;
}
}

View File

@@ -84,7 +84,6 @@ fn send_event(sock: &UdpSocket, e: Event, addr: SocketAddr) -> Result<usize> {
log::trace!("{:20} ------>->->-> {addr}", e.to_string());
let data: Vec<u8> = (&e).into();
// When udp blocks, we dont want to block the event loop.
// Dropping events is better than potentially crashing the event
// producer.
// Dropping events is better than potentially crashing the input capture.
Ok(sock.try_send_to(&data, addr)?)
}

View File

@@ -7,15 +7,15 @@ use tokio::{
use crate::{client::ClientHandle, event::Event};
use super::{consumer_task::ConsumerEvent, producer_task::ProducerEvent, Server, State};
use super::{capture_task::CaptureEvent, emulation_task::EmulationEvent, Server, State};
const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500);
pub fn new(
server: Server,
sender_ch: Sender<(Event, SocketAddr)>,
consumer_notify: Sender<ConsumerEvent>,
producer_notify: Sender<ProducerEvent>,
emulate_notify: Sender<EmulationEvent>,
capture_notify: Sender<CaptureEvent>,
mut timer_rx: Receiver<()>,
) -> JoinHandle<()> {
// timer task
@@ -114,14 +114,14 @@ pub fn new(
if receiving {
for c in unresponsive_clients {
log::warn!("device not responding, releasing keys!");
let _ = consumer_notify.send(ConsumerEvent::ReleaseKeys(c)).await;
let _ = emulate_notify.send(EmulationEvent::ReleaseKeys(c)).await;
}
} else {
// release pointer if the active client has not responded
if !unresponsive_clients.is_empty() {
log::warn!("client not responding, releasing pointer!");
server.state.replace(State::Receiving);
let _ = producer_notify.send(ProducerEvent::Release).await;
let _ = capture_notify.send(CaptureEvent::Release).await;
}
}
}