From 742b1585d7131be6cd5af08930084ac6a03e4be8 Mon Sep 17 00:00:00 2001 From: Ferdinand Schober Date: Thu, 21 Mar 2024 20:26:57 +0100 Subject: [PATCH] rename producer, consumer to emulation and capture (#98) input emulation / input capture is clearer than event consumer and producer --- src/{producer.rs => capture.rs} | 38 ++++----- src/{producer => capture}/dummy.rs | 12 +-- src/{producer => capture}/libei.rs | 12 +-- src/{producer => capture}/macos.rs | 10 +-- src/{producer => capture}/wayland.rs | 12 +-- src/{producer => capture}/windows.rs | 10 +-- src/{producer => capture}/x11.rs | 10 +-- src/client.rs | 2 +- src/{consumer.rs => emulate.rs} | 38 ++++----- src/{consumer => emulate}/dummy.rs | 8 +- src/{consumer => emulate}/libei.rs | 8 +- src/{consumer => emulate}/macos.rs | 10 +-- src/{consumer => emulate}/windows.rs | 10 +-- src/{consumer => emulate}/wlroots.rs | 21 +++-- src/{consumer => emulate}/x11.rs | 12 +-- .../xdg_desktop_portal.rs | 10 +-- src/lib.rs | 4 +- src/server.rs | 58 +++++++------- .../{producer_task.rs => capture_task.rs} | 38 ++++----- .../{consumer_task.rs => emulation_task.rs} | 60 +++++++------- src/server/frontend_task.rs | 78 ++++++++++--------- src/server/network_task.rs | 3 +- src/server/ping_task.rs | 10 +-- 23 files changed, 237 insertions(+), 237 deletions(-) rename src/{producer.rs => capture.rs} (50%) rename src/{producer => capture}/dummy.rs (74%) rename src/{producer => capture}/libei.rs (98%) rename src/{producer => capture}/macos.rs (78%) rename src/{producer => capture}/wayland.rs (99%) rename src/{producer => capture}/windows.rs (78%) rename src/{producer => capture}/x11.rs (79%) rename src/{consumer.rs => emulate.rs} (64%) rename src/{consumer => emulate}/dummy.rs (80%) rename src/{consumer => emulate}/libei.rs (99%) rename src/{consumer => emulate}/macos.rs (98%) rename src/{consumer => emulate}/windows.rs (97%) rename src/{consumer => emulate}/wlroots.rs (94%) rename src/{consumer => emulate}/x11.rs (95%) rename src/{consumer => emulate}/xdg_desktop_portal.rs (95%) rename src/server/{producer_task.rs => capture_task.rs} (78%) rename src/server/{consumer_task.rs => emulation_task.rs} (80%) diff --git a/src/producer.rs b/src/capture.rs similarity index 50% rename from src/producer.rs rename to src/capture.rs index b58f0b2..02392c5 100644 --- a/src/producer.rs +++ b/src/capture.rs @@ -22,55 +22,55 @@ pub mod windows; #[cfg(all(unix, feature = "x11", not(target_os = "macos")))] pub mod x11; -/// fallback event producer +/// fallback input capture (does not produce events) pub mod dummy; -pub async fn create() -> Box { +pub async fn create() -> Box { #[cfg(target_os = "macos")] - match macos::MacOSProducer::new() { + match macos::MacOSInputCapture::new() { Ok(p) => return Box::new(p), - Err(e) => log::info!("macos event producer not available: {e}"), + Err(e) => log::info!("macos input capture not available: {e}"), } #[cfg(windows)] - match windows::WindowsProducer::new() { + match windows::WindowsInputCapture::new() { Ok(p) => return Box::new(p), - Err(e) => log::info!("windows event producer not available: {e}"), + Err(e) => log::info!("windows input capture not available: {e}"), } #[cfg(all(unix, feature = "libei", not(target_os = "macos")))] - match libei::LibeiProducer::new().await { + match libei::LibeiInputCapture::new().await { Ok(p) => { - log::info!("using libei event producer"); + log::info!("using libei input capture"); return Box::new(p); } - Err(e) => log::info!("libei event producer not available: {e}"), + Err(e) => log::info!("libei input capture not available: {e}"), } #[cfg(all(unix, feature = "wayland", not(target_os = "macos")))] - match wayland::WaylandEventProducer::new() { + match wayland::WaylandInputCapture::new() { Ok(p) => { - log::info!("using layer-shell event producer"); + log::info!("using layer-shell input capture"); return Box::new(p); } - Err(e) => log::info!("layer_shell event producer not available: {e}"), + Err(e) => log::info!("layer_shell input capture not available: {e}"), } #[cfg(all(unix, feature = "x11", not(target_os = "macos")))] - match x11::X11Producer::new() { + match x11::X11InputCapture::new() { Ok(p) => { - log::info!("using x11 event producer"); + log::info!("using x11 input capture"); return Box::new(p); } - Err(e) => log::info!("x11 event producer not available: {e}"), + Err(e) => log::info!("x11 input capture not available: {e}"), } - log::error!("falling back to dummy event producer"); - Box::new(dummy::DummyProducer::new()) + log::error!("falling back to dummy input capture"); + Box::new(dummy::DummyInputCapture::new()) } -pub trait EventProducer: Stream> + Unpin { - /// notify event producer of configuration changes +pub trait InputCapture: Stream> + Unpin { + /// notify input capture of configuration changes fn notify(&mut self, event: ClientEvent) -> io::Result<()>; /// release mouse diff --git a/src/producer/dummy.rs b/src/capture/dummy.rs similarity index 74% rename from src/producer/dummy.rs rename to src/capture/dummy.rs index c0c7f71..045f22f 100644 --- a/src/producer/dummy.rs +++ b/src/capture/dummy.rs @@ -4,26 +4,26 @@ use std::task::{Context, Poll}; use futures_core::Stream; +use crate::capture::InputCapture; use crate::event::Event; -use crate::producer::EventProducer; use crate::client::{ClientEvent, ClientHandle}; -pub struct DummyProducer {} +pub struct DummyInputCapture {} -impl DummyProducer { +impl DummyInputCapture { pub fn new() -> Self { Self {} } } -impl Default for DummyProducer { +impl Default for DummyInputCapture { fn default() -> Self { Self::new() } } -impl EventProducer for DummyProducer { +impl InputCapture for DummyInputCapture { fn notify(&mut self, _event: ClientEvent) -> io::Result<()> { Ok(()) } @@ -33,7 +33,7 @@ impl EventProducer for DummyProducer { } } -impl Stream for DummyProducer { +impl Stream for DummyInputCapture { type Item = io::Result<(ClientHandle, Event)>; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { diff --git a/src/producer/libei.rs b/src/capture/libei.rs similarity index 98% rename from src/producer/libei.rs rename to src/capture/libei.rs index b207e54..44e49c0 100644 --- a/src/producer/libei.rs +++ b/src/capture/libei.rs @@ -31,9 +31,9 @@ use futures_core::Stream; use once_cell::sync::Lazy; use crate::{ + capture::InputCapture as LanMouseInputCapture, client::{ClientEvent, ClientHandle, Position}, event::{Event, KeyboardEvent, PointerEvent}, - producer::EventProducer, }; #[derive(Debug)] @@ -43,7 +43,7 @@ enum ProducerEvent { } #[allow(dead_code)] -pub struct LibeiProducer<'a> { +pub struct LibeiInputCapture<'a> { input_capture: Pin>>, libei_task: JoinHandle>, event_rx: tokio::sync::mpsc::Receiver<(u32, Event)>, @@ -123,7 +123,7 @@ async fn update_barriers( Ok(id_map) } -impl<'a> Drop for LibeiProducer<'a> { +impl<'a> Drop for LibeiInputCapture<'a> { fn drop(&mut self) { self.libei_task.abort(); } @@ -212,7 +212,7 @@ async fn wait_for_active_client( Ok(()) } -impl<'a> LibeiProducer<'a> { +impl<'a> LibeiInputCapture<'a> { pub async fn new() -> Result { let input_capture = Box::pin(InputCapture::new().await?); let input_capture_ptr = input_capture.as_ref().get_ref() as *const InputCapture<'static>; @@ -522,7 +522,7 @@ async fn handle_ei_event( } } -impl<'a> EventProducer for LibeiProducer<'a> { +impl<'a> LanMouseInputCapture for LibeiInputCapture<'a> { fn notify(&mut self, event: ClientEvent) -> io::Result<()> { let notify_tx = self.notify_tx.clone(); tokio::task::spawn_local(async move { @@ -543,7 +543,7 @@ impl<'a> EventProducer for LibeiProducer<'a> { } } -impl<'a> Stream for LibeiProducer<'a> { +impl<'a> Stream for LibeiInputCapture<'a> { type Item = io::Result<(ClientHandle, Event)>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/src/producer/macos.rs b/src/capture/macos.rs similarity index 78% rename from src/producer/macos.rs rename to src/capture/macos.rs index bf96d2e..f95fbde 100644 --- a/src/producer/macos.rs +++ b/src/capture/macos.rs @@ -1,20 +1,20 @@ +use crate::capture::InputCapture; use crate::client::{ClientEvent, ClientHandle}; use crate::event::Event; -use crate::producer::EventProducer; use anyhow::{anyhow, Result}; use futures_core::Stream; use std::task::{Context, Poll}; use std::{io, pin::Pin}; -pub struct MacOSProducer; +pub struct MacOSInputCapture; -impl MacOSProducer { +impl MacOSInputCapture { pub fn new() -> Result { Err(anyhow!("not yet implemented")) } } -impl Stream for MacOSProducer { +impl Stream for MacOSInputCapture { type Item = io::Result<(ClientHandle, Event)>; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { @@ -22,7 +22,7 @@ impl Stream for MacOSProducer { } } -impl EventProducer for MacOSProducer { +impl InputCapture for MacOSInputCapture { fn notify(&mut self, _event: ClientEvent) -> io::Result<()> { Ok(()) } diff --git a/src/producer/wayland.rs b/src/capture/wayland.rs similarity index 99% rename from src/producer/wayland.rs rename to src/capture/wayland.rs index 127684c..782a018 100644 --- a/src/producer/wayland.rs +++ b/src/capture/wayland.rs @@ -1,6 +1,6 @@ use crate::{ + capture::InputCapture, client::{ClientEvent, ClientHandle, Position}, - producer::EventProducer, }; use anyhow::{anyhow, Result}; @@ -124,7 +124,7 @@ impl AsRawFd for Inner { } } -pub struct WaylandEventProducer(AsyncFd); +pub struct WaylandInputCapture(AsyncFd); struct Window { buffer: wl_buffer::WlBuffer, @@ -256,7 +256,7 @@ fn draw(f: &mut File, (width, height): (u32, u32)) { } } -impl WaylandEventProducer { +impl WaylandInputCapture { pub fn new() -> Result { let conn = match Connection::connect_to_env() { Ok(c) => c, @@ -390,7 +390,7 @@ impl WaylandEventProducer { let inner = AsyncFd::new(Inner { queue, state })?; - Ok(WaylandEventProducer(inner)) + Ok(WaylandInputCapture(inner)) } fn add_client(&mut self, handle: ClientHandle, pos: Position) { @@ -587,7 +587,7 @@ impl Inner { } } -impl EventProducer for WaylandEventProducer { +impl InputCapture for WaylandInputCapture { fn notify(&mut self, client_event: ClientEvent) -> io::Result<()> { match client_event { ClientEvent::Create(handle, pos) => { @@ -609,7 +609,7 @@ impl EventProducer for WaylandEventProducer { } } -impl Stream for WaylandEventProducer { +impl Stream for WaylandInputCapture { type Item = io::Result<(ClientHandle, Event)>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/src/producer/windows.rs b/src/capture/windows.rs similarity index 78% rename from src/producer/windows.rs rename to src/capture/windows.rs index 6cffab4..e132f40 100644 --- a/src/producer/windows.rs +++ b/src/capture/windows.rs @@ -4,14 +4,14 @@ use futures::Stream; use std::{io, pin::Pin}; use crate::{ + capture::InputCapture, client::{ClientEvent, ClientHandle}, event::Event, - producer::EventProducer, }; -pub struct WindowsProducer {} +pub struct WindowsInputCapture {} -impl EventProducer for WindowsProducer { +impl InputCapture for WindowsInputCapture { fn notify(&mut self, _event: ClientEvent) -> io::Result<()> { Ok(()) } @@ -21,13 +21,13 @@ impl EventProducer for WindowsProducer { } } -impl WindowsProducer { +impl WindowsInputCapture { pub(crate) fn new() -> Result { Err(anyhow!("not implemented")) } } -impl Stream for WindowsProducer { +impl Stream for WindowsInputCapture { type Item = io::Result<(ClientHandle, Event)>; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Pending diff --git a/src/producer/x11.rs b/src/capture/x11.rs similarity index 79% rename from src/producer/x11.rs rename to src/capture/x11.rs index 1811e2c..7f9654b 100644 --- a/src/producer/x11.rs +++ b/src/capture/x11.rs @@ -4,20 +4,20 @@ use std::task::Poll; use futures_core::Stream; +use crate::capture::InputCapture; use crate::event::Event; -use crate::producer::EventProducer; use crate::client::{ClientEvent, ClientHandle}; -pub struct X11Producer {} +pub struct X11InputCapture {} -impl X11Producer { +impl X11InputCapture { pub fn new() -> Result { Err(anyhow!("not implemented")) } } -impl EventProducer for X11Producer { +impl InputCapture for X11InputCapture { fn notify(&mut self, _event: ClientEvent) -> io::Result<()> { Ok(()) } @@ -27,7 +27,7 @@ impl EventProducer for X11Producer { } } -impl Stream for X11Producer { +impl Stream for X11InputCapture { type Item = io::Result<(ClientHandle, Event)>; fn poll_next( diff --git a/src/client.rs b/src/client.rs index b07de64..03d6f20 100644 --- a/src/client.rs +++ b/src/client.rs @@ -67,7 +67,7 @@ pub struct Client { /// fix ips, determined by the user pub fix_ips: Vec, /// unique handle to refer to the client. - /// This way any event consumer / producer backend does not + /// This way any emulation / capture backend does not /// need to know anything about a client other than its handle. pub handle: ClientHandle, /// all ip addresses associated with a particular client diff --git a/src/consumer.rs b/src/emulate.rs similarity index 64% rename from src/consumer.rs rename to src/emulate.rs index 35a9b1a..4819b5c 100644 --- a/src/consumer.rs +++ b/src/emulate.rs @@ -25,11 +25,11 @@ pub mod libei; #[cfg(target_os = "macos")] pub mod macos; -/// fallback consumer +/// fallback input emulation (logs events) pub mod dummy; #[async_trait] -pub trait EventConsumer: Send { +pub trait InputEmulation: Send { async fn consume(&mut self, event: Event, client_handle: ClientHandle); async fn notify(&mut self, client_event: ClientEvent); /// this function is waited on continuously and can be used to handle events @@ -41,58 +41,58 @@ pub trait EventConsumer: Send { async fn destroy(&mut self); } -pub async fn create() -> Box { +pub async fn create() -> Box { #[cfg(windows)] - match windows::WindowsConsumer::new() { + match windows::WindowsEmulation::new() { Ok(c) => return Box::new(c), - Err(e) => log::warn!("windows event consumer unavailable: {e}"), + Err(e) => log::warn!("windows input emulation unavailable: {e}"), } #[cfg(target_os = "macos")] - match macos::MacOSConsumer::new() { + match macos::MacOSEmulation::new() { Ok(c) => { - log::info!("using macos event consumer"); + log::info!("using macos input emulation"); return Box::new(c); } - Err(e) => log::error!("macos consumer not available: {e}"), + Err(e) => log::error!("macos input emulatino not available: {e}"), } #[cfg(all(unix, feature = "wayland", not(target_os = "macos")))] - match wlroots::WlrootsConsumer::new() { + match wlroots::WlrootsEmulation::new() { Ok(c) => { - log::info!("using wlroots event consumer"); + log::info!("using wlroots input emulation"); return Box::new(c); } Err(e) => log::info!("wayland backend not available: {e}"), } #[cfg(all(unix, feature = "libei", not(target_os = "macos")))] - match libei::LibeiConsumer::new().await { + match libei::LibeiEmulation::new().await { Ok(c) => { - log::info!("using libei event consumer"); + log::info!("using libei input emulation"); return Box::new(c); } Err(e) => log::info!("libei not available: {e}"), } #[cfg(all(unix, feature = "xdg_desktop_portal", not(target_os = "macos")))] - match xdg_desktop_portal::DesktopPortalConsumer::new().await { + match xdg_desktop_portal::DesktopPortalEmulation::new().await { Ok(c) => { - log::info!("using xdg-remote-desktop-portal event consumer"); + log::info!("using xdg-remote-desktop-portal input emulation"); return Box::new(c); } Err(e) => log::info!("remote desktop portal not available: {e}"), } #[cfg(all(unix, feature = "x11", not(target_os = "macos")))] - match x11::X11Consumer::new() { + match x11::X11Emulation::new() { Ok(c) => { - log::info!("using x11 event consumer"); + log::info!("using x11 input emulation"); return Box::new(c); } - Err(e) => log::info!("x11 consumer not available: {e}"), + Err(e) => log::info!("x11 input emulation not available: {e}"), } - log::error!("falling back to dummy event consumer"); - Box::new(dummy::DummyConsumer::new()) + log::error!("falling back to dummy input emulation"); + Box::new(dummy::DummyEmulation::new()) } diff --git a/src/consumer/dummy.rs b/src/emulate/dummy.rs similarity index 80% rename from src/consumer/dummy.rs rename to src/emulate/dummy.rs index c441f12..fbb7b31 100644 --- a/src/consumer/dummy.rs +++ b/src/emulate/dummy.rs @@ -1,21 +1,21 @@ use crate::{ client::{ClientEvent, ClientHandle}, - consumer::EventConsumer, + emulate::InputEmulation, event::Event, }; use async_trait::async_trait; #[derive(Default)] -pub struct DummyConsumer; +pub struct DummyEmulation; -impl DummyConsumer { +impl DummyEmulation { pub fn new() -> Self { Self {} } } #[async_trait] -impl EventConsumer for DummyConsumer { +impl InputEmulation for DummyEmulation { async fn consume(&mut self, event: Event, client_handle: ClientHandle) { log::info!("received event: ({client_handle}) {event}"); } diff --git a/src/consumer/libei.rs b/src/emulate/libei.rs similarity index 99% rename from src/consumer/libei.rs rename to src/emulate/libei.rs index 55be80e..ec5ec38 100644 --- a/src/consumer/libei.rs +++ b/src/emulate/libei.rs @@ -23,11 +23,11 @@ use reis::{ use crate::{ client::{ClientEvent, ClientHandle}, - consumer::EventConsumer, + emulate::InputEmulation, event::Event, }; -pub struct LibeiConsumer { +pub struct LibeiEmulation { handshake: bool, context: ei::Context, events: EiEventStream, @@ -76,7 +76,7 @@ async fn get_ei_fd() -> Result { proxy.connect_to_eis(&session).await } -impl LibeiConsumer { +impl LibeiEmulation { pub async fn new() -> Result { // fd is owned by the message, so we need to dup it let eifd = get_ei_fd().await?; @@ -107,7 +107,7 @@ impl LibeiConsumer { } #[async_trait] -impl EventConsumer for LibeiConsumer { +impl InputEmulation for LibeiEmulation { async fn consume(&mut self, event: Event, _client_handle: ClientHandle) { let now = SystemTime::now() .duration_since(UNIX_EPOCH) diff --git a/src/consumer/macos.rs b/src/emulate/macos.rs similarity index 98% rename from src/consumer/macos.rs rename to src/emulate/macos.rs index 954ec2b..e5b0d61 100644 --- a/src/consumer/macos.rs +++ b/src/emulate/macos.rs @@ -1,5 +1,5 @@ use crate::client::{ClientEvent, ClientHandle}; -use crate::consumer::EventConsumer; +use crate::emulate::InputEmulation; use crate::event::{Event, KeyboardEvent, PointerEvent}; use anyhow::{anyhow, Result}; use async_trait::async_trait; @@ -16,7 +16,7 @@ use tokio::task::AbortHandle; const DEFAULT_REPEAT_DELAY: Duration = Duration::from_millis(500); const DEFAULT_REPEAT_INTERVAL: Duration = Duration::from_millis(32); -pub struct MacOSConsumer { +pub struct MacOSEmulation { pub event_source: CGEventSource, repeat_task: Option, button_state: ButtonState, @@ -50,9 +50,9 @@ impl IndexMut for ButtonState { } } -unsafe impl Send for MacOSConsumer {} +unsafe impl Send for MacOSEmulation {} -impl MacOSConsumer { +impl MacOSEmulation { pub fn new() -> Result { let event_source = match CGEventSource::new(CGEventSourceStateID::CombinedSessionState) { Ok(e) => e, @@ -108,7 +108,7 @@ fn key_event(event_source: CGEventSource, key: u16, state: u8) { } #[async_trait] -impl EventConsumer for MacOSConsumer { +impl InputEmulation for MacOSEmulation { async fn consume(&mut self, event: Event, _client_handle: ClientHandle) { match event { Event::Pointer(pointer_event) => match pointer_event { diff --git a/src/consumer/windows.rs b/src/emulate/windows.rs similarity index 97% rename from src/consumer/windows.rs rename to src/emulate/windows.rs index 2a21d42..d21ace1 100644 --- a/src/consumer/windows.rs +++ b/src/emulate/windows.rs @@ -1,5 +1,5 @@ use crate::{ - consumer::EventConsumer, + emulate::InputEmulation, event::{KeyboardEvent, PointerEvent}, scancode, }; @@ -26,18 +26,18 @@ use crate::{ const DEFAULT_REPEAT_DELAY: Duration = Duration::from_millis(500); const DEFAULT_REPEAT_INTERVAL: Duration = Duration::from_millis(32); -pub struct WindowsConsumer { +pub struct WindowsEmulation { repeat_task: Option, } -impl WindowsConsumer { +impl WindowsEmulation { pub fn new() -> Result { Ok(Self { repeat_task: None }) } } #[async_trait] -impl EventConsumer for WindowsConsumer { +impl InputEmulation for WindowsEmulation { async fn consume(&mut self, event: Event, _: ClientHandle) { match event { Event::Pointer(pointer_event) => match pointer_event { @@ -87,7 +87,7 @@ impl EventConsumer for WindowsConsumer { async fn destroy(&mut self) {} } -impl WindowsConsumer { +impl WindowsEmulation { async fn spawn_repeat_task(&mut self, key: u32) { // there can only be one repeating key and it's // always the last to be pressed diff --git a/src/consumer/wlroots.rs b/src/emulate/wlroots.rs similarity index 94% rename from src/consumer/wlroots.rs rename to src/emulate/wlroots.rs index c1519bd..54a44a6 100644 --- a/src/consumer/wlroots.rs +++ b/src/emulate/wlroots.rs @@ -1,5 +1,5 @@ use crate::client::{ClientEvent, ClientHandle}; -use crate::consumer::EventConsumer; +use crate::emulate::InputEmulation; use async_trait::async_trait; use std::collections::HashMap; use std::io; @@ -40,13 +40,13 @@ struct State { } // App State, implements Dispatch event handlers -pub(crate) struct WlrootsConsumer { +pub(crate) struct WlrootsEmulation { last_flush_failed: bool, state: State, queue: EventQueue, } -impl WlrootsConsumer { +impl WlrootsEmulation { pub fn new() -> Result { let conn = Connection::connect_to_env()?; let (globals, queue) = registry_queue_init::(&conn)?; @@ -62,7 +62,7 @@ impl WlrootsConsumer { let input_for_client: HashMap = HashMap::new(); - let mut consumer = WlrootsConsumer { + let mut emulate = WlrootsEmulation { last_flush_failed: false, state: State { keymap: None, @@ -74,16 +74,13 @@ impl WlrootsConsumer { }, queue, }; - while consumer.state.keymap.is_none() { - consumer - .queue - .blocking_dispatch(&mut consumer.state) - .unwrap(); + while emulate.state.keymap.is_none() { + emulate.queue.blocking_dispatch(&mut emulate.state).unwrap(); } - // let fd = unsafe { &File::from_raw_fd(consumer.state.keymap.unwrap().1.as_raw_fd()) }; + // let fd = unsafe { &File::from_raw_fd(emulate.state.keymap.unwrap().1.as_raw_fd()) }; // let mmap = unsafe { MmapOptions::new().map_copy(fd).unwrap() }; // log::debug!("{:?}", &mmap[..100]); - Ok(consumer) + Ok(emulate) } } @@ -106,7 +103,7 @@ impl State { } #[async_trait] -impl EventConsumer for WlrootsConsumer { +impl InputEmulation for WlrootsEmulation { async fn consume(&mut self, event: Event, client_handle: ClientHandle) { if let Some(virtual_input) = self.state.input_for_client.get(&client_handle) { if self.last_flush_failed { diff --git a/src/consumer/x11.rs b/src/emulate/x11.rs similarity index 95% rename from src/consumer/x11.rs rename to src/emulate/x11.rs index 3e1aed5..8979a44 100644 --- a/src/consumer/x11.rs +++ b/src/emulate/x11.rs @@ -8,19 +8,19 @@ use x11::{ use crate::{ client::ClientHandle, - consumer::EventConsumer, + emulate::InputEmulation, event::{ Event, KeyboardEvent, PointerEvent, BTN_BACK, BTN_FORWARD, BTN_LEFT, BTN_MIDDLE, BTN_RIGHT, }, }; -pub struct X11Consumer { +pub struct X11Emulation { display: *mut xlib::Display, } -unsafe impl Send for X11Consumer {} +unsafe impl Send for X11Emulation {} -impl X11Consumer { +impl X11Emulation { pub fn new() -> Result { let display = unsafe { match xlib::XOpenDisplay(ptr::null()) { @@ -91,7 +91,7 @@ impl X11Consumer { } } -impl Drop for X11Consumer { +impl Drop for X11Emulation { fn drop(&mut self) { unsafe { XCloseDisplay(self.display); @@ -100,7 +100,7 @@ impl Drop for X11Consumer { } #[async_trait] -impl EventConsumer for X11Consumer { +impl InputEmulation for X11Emulation { async fn consume(&mut self, event: Event, _: ClientHandle) { match event { Event::Pointer(pointer_event) => match pointer_event { diff --git a/src/consumer/xdg_desktop_portal.rs b/src/emulate/xdg_desktop_portal.rs similarity index 95% rename from src/consumer/xdg_desktop_portal.rs rename to src/emulate/xdg_desktop_portal.rs index 18ccfb0..21a2bdf 100644 --- a/src/consumer/xdg_desktop_portal.rs +++ b/src/emulate/xdg_desktop_portal.rs @@ -10,20 +10,20 @@ use async_trait::async_trait; use crate::{ client::ClientEvent, - consumer::EventConsumer, + emulate::InputEmulation, event::{ Event::{Keyboard, Pointer}, KeyboardEvent, PointerEvent, }, }; -pub struct DesktopPortalConsumer<'a> { +pub struct DesktopPortalEmulation<'a> { proxy: RemoteDesktop<'a>, session: Session<'a>, } -impl<'a> DesktopPortalConsumer<'a> { - pub async fn new() -> Result> { +impl<'a> DesktopPortalEmulation<'a> { + pub async fn new() -> Result> { log::debug!("connecting to org.freedesktop.portal.RemoteDesktop portal ..."); let proxy = RemoteDesktop::new().await?; @@ -59,7 +59,7 @@ impl<'a> DesktopPortalConsumer<'a> { } #[async_trait] -impl<'a> EventConsumer for DesktopPortalConsumer<'a> { +impl<'a> InputEmulation for DesktopPortalEmulation<'a> { async fn consume(&mut self, event: crate::event::Event, _client: crate::client::ClientHandle) { match event { Pointer(p) => { diff --git a/src/lib.rs b/src/lib.rs index 79be8dc..ed36a52 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,8 +4,8 @@ pub mod dns; pub mod event; pub mod server; -pub mod consumer; -pub mod producer; +pub mod capture; +pub mod emulate; pub mod frontend; pub mod scancode; diff --git a/src/server.rs b/src/server.rs index 4a6ee9d..454556e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,22 +5,22 @@ use std::{ }; use tokio::signal; +use crate::{capture, emulate}; use crate::{ client::{ClientHandle, ClientManager}, config::Config, dns, frontend::{FrontendEvent, FrontendListener}, - server::producer_task::ProducerEvent, + server::capture_task::CaptureEvent, }; -use crate::{consumer, producer}; -use self::{consumer_task::ConsumerEvent, resolver_task::DnsRequest}; +use self::{emulation_task::EmulationEvent, resolver_task::DnsRequest}; -mod consumer_task; +mod capture_task; +mod emulation_task; mod frontend_task; mod network_task; mod ping_task; -mod producer_task; mod resolver_task; #[derive(Clone, Copy, Debug, Eq, PartialEq)] @@ -78,7 +78,7 @@ impl Server { return anyhow::Ok(()); } }; - let (consumer, producer) = tokio::join!(consumer::create(), producer::create()); + let (emulate, capture) = tokio::join!(emulate::create(), capture::create()); let (timer_tx, timer_rx) = tokio::sync::mpsc::channel(1); let (frontend_notify_tx, frontend_notify_rx) = tokio::sync::mpsc::channel(1); @@ -87,22 +87,22 @@ impl Server { let (mut udp_task, sender_tx, receiver_rx, port_tx) = network_task::new(self.clone(), frontend_notify_tx).await?; - // event producer - let (mut producer_task, producer_channel) = producer_task::new( - producer, + // input capture + let (mut capture_task, capture_channel) = capture_task::new( + capture, self.clone(), sender_tx.clone(), timer_tx.clone(), self.release_bind.clone(), ); - // event consumer - let (mut consumer_task, consumer_channel) = consumer_task::new( - consumer, + // input emulation + let (mut emulation_task, emulate_channel) = emulation_task::new( + emulate, self.clone(), receiver_rx, sender_tx.clone(), - producer_channel.clone(), + capture_channel.clone(), timer_tx, ); @@ -115,8 +115,8 @@ impl Server { frontend, frontend_notify_rx, self.clone(), - producer_channel.clone(), - consumer_channel.clone(), + capture_channel.clone(), + emulate_channel.clone(), resolve_tx.clone(), port_tx, ); @@ -125,8 +125,8 @@ impl Server { let mut ping_task = ping_task::new( self.clone(), sender_tx.clone(), - consumer_channel.clone(), - producer_channel.clone(), + emulate_channel.clone(), + capture_channel.clone(), timer_rx, ); @@ -156,14 +156,14 @@ impl Server { _ = signal::ctrl_c() => { log::info!("terminating service"); } - e = &mut producer_task => { + e = &mut capture_task => { if let Ok(Err(e)) = e { - log::error!("error in event producer: {e}"); + log::error!("error in input capture task: {e}"); } } - e = &mut consumer_task => { + e = &mut emulation_task => { if let Ok(Err(e)) = e { - log::error!("error in event consumer: {e}"); + log::error!("error in input emulation task: {e}"); } } e = &mut frontend_task => { @@ -176,18 +176,18 @@ impl Server { _ = &mut ping_task => { } } - let _ = consumer_channel.send(ConsumerEvent::Terminate).await; - let _ = producer_channel.send(ProducerEvent::Terminate).await; + let _ = emulate_channel.send(EmulationEvent::Terminate).await; + let _ = capture_channel.send(CaptureEvent::Terminate).await; let _ = frontend_tx.send(FrontendEvent::Shutdown()).await; - if !producer_task.is_finished() { - if let Err(e) = producer_task.await { - log::error!("error in event producer: {e}"); + if !capture_task.is_finished() { + if let Err(e) = capture_task.await { + log::error!("error in input capture task: {e}"); } } - if !consumer_task.is_finished() { - if let Err(e) = consumer_task.await { - log::error!("error in event consumer: {e}"); + if !emulation_task.is_finished() { + if let Err(e) = emulation_task.await { + log::error!("error in input emulation task: {e}"); } } diff --git a/src/server/producer_task.rs b/src/server/capture_task.rs similarity index 78% rename from src/server/producer_task.rs rename to src/server/capture_task.rs index efc0fed..886455b 100644 --- a/src/server/producer_task.rs +++ b/src/server/capture_task.rs @@ -5,9 +5,9 @@ use std::{collections::HashSet, net::SocketAddr}; use tokio::{sync::mpsc::Sender, task::JoinHandle}; use crate::{ + capture::InputCapture, client::{ClientEvent, ClientHandle}, event::{Event, KeyboardEvent}, - producer::EventProducer, scancode, server::State, }; @@ -15,45 +15,45 @@ use crate::{ use super::Server; #[derive(Clone, Copy, Debug)] -pub enum ProducerEvent { - /// producer must release the mouse +pub enum CaptureEvent { + /// capture must release the mouse Release, - /// producer is notified of a change in client states + /// capture is notified of a change in client states ClientEvent(ClientEvent), /// termination signal Terminate, } pub fn new( - mut producer: Box, + mut capture: Box, server: Server, sender_tx: Sender<(Event, SocketAddr)>, timer_tx: Sender<()>, release_bind: Vec, -) -> (JoinHandle>, Sender) { +) -> (JoinHandle>, Sender) { let (tx, mut rx) = tokio::sync::mpsc::channel(32); let task = tokio::task::spawn_local(async move { let mut pressed_keys = HashSet::new(); loop { tokio::select! { - event = producer.next() => { + event = capture.next() => { match event { - Some(Ok(event)) => handle_producer_event(&server, &mut producer, &sender_tx, &timer_tx, event, &mut pressed_keys, &release_bind).await?, - Some(Err(e)) => return Err(anyhow!("event producer: {e:?}")), - None => return Err(anyhow!("event producer closed")), + Some(Ok(event)) => handle_capture_event(&server, &mut capture, &sender_tx, &timer_tx, event, &mut pressed_keys, &release_bind).await?, + Some(Err(e)) => return Err(anyhow!("input capture: {e:?}")), + None => return Err(anyhow!("input capture terminated")), } } e = rx.recv() => { - log::debug!("producer notify rx: {e:?}"); + log::debug!("input capture notify rx: {e:?}"); match e { Some(e) => match e { - ProducerEvent::Release => { - producer.release()?; + CaptureEvent::Release => { + capture.release()?; server.state.replace(State::Receiving); } - ProducerEvent::ClientEvent(e) => producer.notify(e)?, - ProducerEvent::Terminate => break, + CaptureEvent::ClientEvent(e) => capture.notify(e)?, + CaptureEvent::Terminate => break, }, None => break, } @@ -75,9 +75,9 @@ fn update_pressed_keys(pressed_keys: &mut HashSet, key: u32, st } } -async fn handle_producer_event( +async fn handle_capture_event( server: &Server, - producer: &mut Box, + capture: &mut Box, sender_tx: &Sender<(Event, SocketAddr)>, timer_tx: &Sender<()>, event: (ClientHandle, Event), @@ -93,7 +93,7 @@ async fn handle_producer_event( if release_bind.iter().all(|k| pressed_keys.contains(k)) { pressed_keys.clear(); log::info!("releasing pointer"); - producer.release()?; + capture.release()?; server.state.replace(State::Receiving); log::trace!("STATE ===> Receiving"); // send an event to release all the modifiers @@ -112,7 +112,7 @@ async fn handle_producer_event( None => { // should not happen log::warn!("unknown client!"); - producer.release()?; + capture.release()?; server.state.replace(State::Receiving); log::trace!("STATE ===> Receiving"); return Ok(()); diff --git a/src/server/consumer_task.rs b/src/server/emulation_task.rs similarity index 80% rename from src/server/consumer_task.rs rename to src/server/emulation_task.rs index 96e7eaa..ee30d68 100644 --- a/src/server/consumer_task.rs +++ b/src/server/emulation_task.rs @@ -8,53 +8,53 @@ use tokio::{ use crate::{ client::{ClientEvent, ClientHandle}, - consumer::EventConsumer, + emulate::InputEmulation, event::{Event, KeyboardEvent}, scancode, server::State, }; -use super::{ProducerEvent, Server}; +use super::{CaptureEvent, Server}; #[derive(Clone, Debug)] -pub enum ConsumerEvent { - /// consumer is notified of a change in client states +pub enum EmulationEvent { + /// input emulation is notified of a change in client states ClientEvent(ClientEvent), - /// consumer must release keys for client + /// input emulation must release keys for client ReleaseKeys(ClientHandle), /// termination signal Terminate, } pub fn new( - mut consumer: Box, + mut emulate: Box, server: Server, mut udp_rx: Receiver>, sender_tx: Sender<(Event, SocketAddr)>, - producer_tx: Sender, + capture_tx: Sender, timer_tx: Sender<()>, -) -> (JoinHandle>, Sender) { +) -> (JoinHandle>, Sender) { let (tx, mut rx) = tokio::sync::mpsc::channel(32); - let consumer_task = tokio::task::spawn_local(async move { + let emulate_task = tokio::task::spawn_local(async move { let mut last_ignored = None; loop { tokio::select! { udp_event = udp_rx.recv() => { let udp_event = udp_event.ok_or(anyhow!("receiver closed"))??; - handle_udp_rx(&server, &producer_tx, &mut consumer, &sender_tx, &mut last_ignored, udp_event, &timer_tx).await; + handle_udp_rx(&server, &capture_tx, &mut emulate, &sender_tx, &mut last_ignored, udp_event, &timer_tx).await; } - consumer_event = rx.recv() => { - match consumer_event { + emulate_event = rx.recv() => { + match emulate_event { Some(e) => match e { - ConsumerEvent::ClientEvent(e) => consumer.notify(e).await, - ConsumerEvent::ReleaseKeys(c) => release_keys(&server, &mut consumer, c).await, - ConsumerEvent::Terminate => break, + EmulationEvent::ClientEvent(e) => emulate.notify(e).await, + EmulationEvent::ReleaseKeys(c) => release_keys(&server, &mut emulate, c).await, + EmulationEvent::Terminate => break, }, None => break, } } - res = consumer.dispatch() => { + res = emulate.dispatch() => { res?; } } @@ -68,20 +68,20 @@ pub fn new( .map(|s| s.client.handle) .collect::>(); for client in clients { - release_keys(&server, &mut consumer, client).await; + release_keys(&server, &mut emulate, client).await; } - // destroy consumer - consumer.destroy().await; + // destroy emulator + emulate.destroy().await; anyhow::Ok(()) }); - (consumer_task, tx) + (emulate_task, tx) } async fn handle_udp_rx( server: &Server, - producer_notify_tx: &Sender, - consumer: &mut Box, + capture_tx: &Sender, + emulate: &mut Box, sender_tx: &Sender<(Event, SocketAddr)>, last_ignored: &mut Option, event: (Event, SocketAddr), @@ -127,7 +127,7 @@ async fn handle_udp_rx( let _ = sender_tx.send((Event::Pong(), addr)).await; } (Event::Disconnect(), _) => { - release_keys(server, consumer, handle).await; + release_keys(server, emulate, handle).await; } (event, addr) => { // tell clients that we are ready to receive events @@ -143,7 +143,7 @@ async fn handle_udp_rx( } else { // upon receiving any event, we go back to receiving mode server.state.replace(State::Receiving); - let _ = producer_notify_tx.send(ProducerEvent::Release).await; + let _ = capture_tx.send(CaptureEvent::Release).await; log::trace!("STATE ===> Receiving"); } } @@ -176,8 +176,8 @@ async fn handle_udp_rx( // workaround buggy rdp backend. if !ignore_event { // consume event - consumer.consume(event, handle).await; - log::trace!("{event:?} => consumer"); + emulate.consume(event, handle).await; + log::trace!("{event:?} => emulate"); } } State::AwaitingLeave => { @@ -194,7 +194,7 @@ async fn handle_udp_rx( // event should still be possible if let Event::Enter() = event { server.state.replace(State::Receiving); - let _ = producer_notify_tx.send(ProducerEvent::Release).await; + let _ = capture_tx.send(CaptureEvent::Release).await; log::trace!("STATE ===> Receiving"); } } @@ -205,7 +205,7 @@ async fn handle_udp_rx( async fn release_keys( server: &Server, - consumer: &mut Box, + emulate: &mut Box, client: ClientHandle, ) { let keys = server @@ -222,7 +222,7 @@ async fn release_keys( key, state: 0, }); - consumer.consume(event, client).await; + emulate.consume(event, client).await; if let Ok(key) = scancode::Linux::try_from(key) { log::warn!("releasing stuck key: {key:?}"); } @@ -234,7 +234,7 @@ async fn release_keys( mods_locked: 0, group: 0, }; - consumer + emulate .consume(Event::Keyboard(modifiers_event), client) .await; } diff --git a/src/server/frontend_task.rs b/src/server/frontend_task.rs index 05f3567..aa68873 100644 --- a/src/server/frontend_task.rs +++ b/src/server/frontend_task.rs @@ -22,15 +22,15 @@ use crate::{ }; use super::{ - consumer_task::ConsumerEvent, producer_task::ProducerEvent, resolver_task::DnsRequest, Server, + capture_task::CaptureEvent, emulation_task::EmulationEvent, resolver_task::DnsRequest, Server, }; pub(crate) fn new( mut frontend: FrontendListener, mut notify_rx: Receiver, server: Server, - producer_notify: Sender, - consumer_notify: Sender, + capture_notify: Sender, + emulate_notify: Sender, resolve_ch: Sender, port_tx: Sender, ) -> (JoinHandle>, Sender) { @@ -51,7 +51,7 @@ pub(crate) fn new( } event = event_rx.recv() => { let frontend_event = event.ok_or(anyhow!("frontend channel closed"))?; - if handle_frontend_event(&server, &producer_notify, &consumer_notify, &resolve_ch, &mut frontend, &port_tx, frontend_event).await { + if handle_frontend_event(&server, &capture_notify, &emulate_notify, &resolve_ch, &mut frontend, &port_tx, frontend_event).await { break; } } @@ -98,8 +98,8 @@ async fn handle_frontend_stream( async fn handle_frontend_event( server: &Server, - producer_tx: &Sender, - consumer_tx: &Sender, + capture_tx: &Sender, + emulate_tx: &Sender, resolve_tx: &Sender, frontend: &mut FrontendListener, port_tx: &Sender, @@ -120,7 +120,7 @@ async fn handle_frontend_event( Some(FrontendNotify::NotifyClientCreate(client)) } FrontendEvent::ActivateClient(handle, active) => { - activate_client(server, producer_tx, consumer_tx, handle, active).await; + activate_client(server, capture_tx, emulate_tx, handle, active).await; Some(FrontendNotify::NotifyClientActivate(handle, active)) } FrontendEvent::ChangePort(port) => { @@ -128,7 +128,7 @@ async fn handle_frontend_event( None } FrontendEvent::DelClient(handle) => { - remove_client(server, producer_tx, consumer_tx, frontend, handle).await; + remove_client(server, capture_tx, emulate_tx, frontend, handle).await; Some(FrontendNotify::NotifyClientDelete(handle)) } FrontendEvent::Enumerate() => { @@ -147,8 +147,8 @@ async fn handle_frontend_event( FrontendEvent::UpdateClient(handle, hostname, port, pos) => { update_client( server, - producer_tx, - consumer_tx, + capture_tx, + emulate_tx, resolve_tx, (handle, hostname, port, pos), ) @@ -204,8 +204,8 @@ pub async fn add_client( pub async fn activate_client( server: &Server, - producer_notify_tx: &Sender, - consumer_notify_tx: &Sender, + capture_notify_tx: &Sender, + emulate_notify_tx: &Sender, client: ClientHandle, active: bool, ) { @@ -217,26 +217,28 @@ pub async fn activate_client( None => return, }; if active { - let _ = producer_notify_tx - .send(ProducerEvent::ClientEvent(ClientEvent::Create(client, pos))) + let _ = capture_notify_tx + .send(CaptureEvent::ClientEvent(ClientEvent::Create(client, pos))) .await; - let _ = consumer_notify_tx - .send(ConsumerEvent::ClientEvent(ClientEvent::Create(client, pos))) + let _ = emulate_notify_tx + .send(EmulationEvent::ClientEvent(ClientEvent::Create( + client, pos, + ))) .await; } else { - let _ = producer_notify_tx - .send(ProducerEvent::ClientEvent(ClientEvent::Destroy(client))) + let _ = capture_notify_tx + .send(CaptureEvent::ClientEvent(ClientEvent::Destroy(client))) .await; - let _ = consumer_notify_tx - .send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(client))) + let _ = emulate_notify_tx + .send(EmulationEvent::ClientEvent(ClientEvent::Destroy(client))) .await; } } pub async fn remove_client( server: &Server, - producer_notify_tx: &Sender, - consumer_notify_tx: &Sender, + capture_notify_tx: &Sender, + emulate_notify_tx: &Sender, frontend: &mut FrontendListener, client: ClientHandle, ) -> Option { @@ -250,11 +252,11 @@ pub async fn remove_client( }; if active { - let _ = producer_notify_tx - .send(ProducerEvent::ClientEvent(ClientEvent::Destroy(client))) + let _ = capture_notify_tx + .send(CaptureEvent::ClientEvent(ClientEvent::Destroy(client))) .await; - let _ = consumer_notify_tx - .send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(client))) + let _ = emulate_notify_tx + .send(EmulationEvent::ClientEvent(ClientEvent::Destroy(client))) .await; } @@ -268,8 +270,8 @@ pub async fn remove_client( async fn update_client( server: &Server, - producer_notify_tx: &Sender, - consumer_notify_tx: &Sender, + capture_notify_tx: &Sender, + emulate_notify_tx: &Sender, resolve_tx: &Sender, client_update: (ClientHandle, Option, u16, Position), ) { @@ -311,7 +313,7 @@ async fn update_client( ) }; - // update state in event consumer & producer + // update state in event input emulator & input capture if changed && active { // resolve dns if let Some(hostname) = hostname { @@ -319,17 +321,19 @@ async fn update_client( } // update state - let _ = producer_notify_tx - .send(ProducerEvent::ClientEvent(ClientEvent::Destroy(handle))) + let _ = capture_notify_tx + .send(CaptureEvent::ClientEvent(ClientEvent::Destroy(handle))) .await; - let _ = consumer_notify_tx - .send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(handle))) + let _ = emulate_notify_tx + .send(EmulationEvent::ClientEvent(ClientEvent::Destroy(handle))) .await; - let _ = producer_notify_tx - .send(ProducerEvent::ClientEvent(ClientEvent::Create(handle, pos))) + let _ = capture_notify_tx + .send(CaptureEvent::ClientEvent(ClientEvent::Create(handle, pos))) .await; - let _ = consumer_notify_tx - .send(ConsumerEvent::ClientEvent(ClientEvent::Create(handle, pos))) + let _ = emulate_notify_tx + .send(EmulationEvent::ClientEvent(ClientEvent::Create( + handle, pos, + ))) .await; } } diff --git a/src/server/network_task.rs b/src/server/network_task.rs index f5e9774..f2413ba 100644 --- a/src/server/network_task.rs +++ b/src/server/network_task.rs @@ -84,7 +84,6 @@ fn send_event(sock: &UdpSocket, e: Event, addr: SocketAddr) -> Result { log::trace!("{:20} ------>->->-> {addr}", e.to_string()); let data: Vec = (&e).into(); // When udp blocks, we dont want to block the event loop. - // Dropping events is better than potentially crashing the event - // producer. + // Dropping events is better than potentially crashing the input capture. Ok(sock.try_send_to(&data, addr)?) } diff --git a/src/server/ping_task.rs b/src/server/ping_task.rs index 67adf0f..7f5918d 100644 --- a/src/server/ping_task.rs +++ b/src/server/ping_task.rs @@ -7,15 +7,15 @@ use tokio::{ use crate::{client::ClientHandle, event::Event}; -use super::{consumer_task::ConsumerEvent, producer_task::ProducerEvent, Server, State}; +use super::{capture_task::CaptureEvent, emulation_task::EmulationEvent, Server, State}; const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500); pub fn new( server: Server, sender_ch: Sender<(Event, SocketAddr)>, - consumer_notify: Sender, - producer_notify: Sender, + emulate_notify: Sender, + capture_notify: Sender, mut timer_rx: Receiver<()>, ) -> JoinHandle<()> { // timer task @@ -114,14 +114,14 @@ pub fn new( if receiving { for c in unresponsive_clients { log::warn!("device not responding, releasing keys!"); - let _ = consumer_notify.send(ConsumerEvent::ReleaseKeys(c)).await; + let _ = emulate_notify.send(EmulationEvent::ReleaseKeys(c)).await; } } else { // release pointer if the active client has not responded if !unresponsive_clients.is_empty() { log::warn!("client not responding, releasing pointer!"); server.state.replace(State::Receiving); - let _ = producer_notify.send(ProducerEvent::Release).await; + let _ = capture_notify.send(CaptureEvent::Release).await; } } }