diff --git a/Cargo.lock b/Cargo.lock index 809f1e1..0ff3c93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -464,6 +464,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.28" @@ -471,6 +486,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -540,10 +556,13 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -962,12 +981,15 @@ dependencies = [ "ashpd", "async-trait", "env_logger", + "futures", + "futures-core", "glib-build-tools", "gtk4", "libadwaita", "libc", "log", "memmap", + "reis", "serde", "serde_json", "tempfile", @@ -1427,6 +1449,16 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +[[package]] +name = "reis" +version = "0.1.0" +source = "git+https://github.com/ids1024/reis#62fbbd626305eb5affd5959214b9a9dd889360ed" +dependencies = [ + "futures-core", + "rustix 0.38.13", + "tokio", +] + [[package]] name = "resolv-conf" version = "0.7.0" diff --git a/Cargo.toml b/Cargo.toml index 2a0e1a7..958f17f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,8 @@ libc = "0.2.148" serde_json = "1.0.107" tokio = {version = "1.32.0", features = ["io-util", "macros", "net", "rt", "sync", "signal"] } async-trait = "0.1.73" +futures-core = "0.3.28" +futures = "0.3.28" [target.'cfg(unix)'.dependencies] wayland-client = { version="0.30.2", optional = true } @@ -36,6 +38,7 @@ x11 = { version = "2.21.0", features = ["xlib", "xtest"], optional = true } gtk = { package = "gtk4", version = "0.7.2", features = ["v4_6"], optional = true } adw = { package = "libadwaita", version = "0.5.2", features = ["v1_1"], optional = true } ashpd = { version = "0.6.2", default-features = false, features = ["tokio"], optional = true } +reis = { git = "https://github.com/ids1024/reis", features = [ "tokio" ], optional = true } [target.'cfg(windows)'.dependencies] winapi = { version = "0.3.9", features = ["winuser"] } @@ -44,20 +47,9 @@ winapi = { version = "0.3.9", features = ["winuser"] } glib-build-tools = "0.18.0" [features] -default = [ - "wayland", - "x11", - "xdg_desktop_portal", - "libei", - "gtk", -] -wayland = [ - "dep:wayland-client", - "dep:wayland-protocols", - "dep:wayland-protocols-wlr", - "dep:wayland-protocols-misc", - "dep:wayland-protocols-plasma" ] -x11 = [ "dep:x11" ] +default = ["wayland", "x11", "xdg_desktop_portal", "libei", "gtk"] +wayland = ["dep:wayland-client", "dep:wayland-protocols", "dep:wayland-protocols-wlr", "dep:wayland-protocols-misc", "dep:wayland-protocols-plasma"] +x11 = ["dep:x11"] xdg_desktop_portal = ["dep:ashpd"] -libei = [] +libei = ["dep:reis", "dep:ashpd"] gtk = ["dep:gtk", "dep:adw"] diff --git a/README.md b/README.md index 931eda7..42cd8cc 100644 --- a/README.md +++ b/README.md @@ -20,10 +20,10 @@ on different operating systems: |---------------------------|--------------------------|--------------------------------------| | Wayland (wlroots) | :heavy_check_mark: | :heavy_check_mark: | | Wayland (KDE) | :heavy_check_mark: | :heavy_check_mark: | -| Wayland (Gnome) | TODO (libei support #33) | TODO (libei support #33) | +| Wayland (Gnome) | :heavy_check_mark: | WIP | | X11 | (WIP) | TODO | | Windows | (:heavy_check_mark:) | TODO | -| MacOS | TODO (I dont own a Mac) | TODO (I dont own a Mac) | +| MacOS | TODO | TODO | ## Build and Run Build in release mode: @@ -129,7 +129,7 @@ Where `left` can be either `left`, `right`, `top` or `bottom`. - [x] Liveness tracking (automatically ungrab mouse when client unreachable) - [ ] Clipboard support - [x] Graphical frontend (gtk?) -- [ ] *Encrytion* +- [ ] *Encryption* - [ ] Gnome Shell Extension (layer shell is not supported) - [ ] respect xdg-config-home for config file location. diff --git a/src/backend/consumer/libei.rs b/src/backend/consumer/libei.rs index db5e37c..6b9f0a9 100644 --- a/src/backend/consumer/libei.rs +++ b/src/backend/consumer/libei.rs @@ -1,18 +1,284 @@ -use crate::consumer::SyncConsumer; +use std::{os::{fd::{RawFd, FromRawFd}, unix::net::UnixStream}, collections::HashMap, time::{UNIX_EPOCH, SystemTime}, io}; -pub struct LibeiConsumer {} +use anyhow::{anyhow, Result}; +use futures::StreamExt; +use ashpd::desktop::remote_desktop::{RemoteDesktop, DeviceType}; +use async_trait::async_trait; + +use reis::{ei::{self, handshake::ContextType, keyboard::KeyState, button::ButtonState}, tokio::EiEventStream, PendingRequestResult}; + +use crate::{consumer::EventConsumer, event::Event, client::{ClientHandle, ClientEvent}}; + +pub struct LibeiConsumer { + handshake: bool, + context: ei::Context, + events: EiEventStream, + pointer: Option<(ei::Device, ei::Pointer)>, + has_pointer: bool, + scroll: Option<(ei::Device, ei::Scroll)>, + has_scroll: bool, + button: Option<(ei::Device, ei::Button)>, + has_button: bool, + keyboard: Option<(ei::Device, ei::Keyboard)>, + has_keyboard: bool, + capabilities: HashMap, + capability_mask: u64, + sequence: u32, + serial: u32, +} + +async fn get_ei_fd() -> Result { + let proxy = RemoteDesktop::new().await?; + let session = proxy.create_session().await?; + + // I HATE EVERYTHING, THIS TOOK 8 HOURS OF DEBUGGING + proxy.select_devices(&session, + DeviceType::Pointer | DeviceType::Keyboard |DeviceType::Touchscreen).await?; + + proxy.start(&session, &ashpd::WindowIdentifier::default()).await?.response()?; + proxy.connect_to_eis(&session).await +} impl LibeiConsumer { - pub fn new() -> Self { Self { } } -} - -impl SyncConsumer for LibeiConsumer { - fn consume(&mut self, _: crate::event::Event, _: crate::client::ClientHandle) { - log::error!("libei backend not yet implemented!"); - todo!() - } - - fn notify(&mut self, _: crate::client::ClientEvent) { - todo!() + pub async fn new() -> Result { + // fd is owned by the message, so we need to dup it + let eifd = get_ei_fd().await?; + let eifd = unsafe { + let ret = libc::dup(eifd); + if ret < 0 { + Err(io::Error::last_os_error()) + } else { + Ok(ret) + } + }?; + let stream = unsafe { UnixStream::from_raw_fd(eifd) }; + // let stream = UnixStream::connect("/run/user/1000/eis-0")?; + stream.set_nonblocking(true)?; + let context = ei::Context::new(stream)?; + context.flush()?; + let events = EiEventStream::new(context.clone())?; + return Ok(Self { + handshake: false, + context, events, + pointer: None, + button: None, + scroll: None, + keyboard: None, + has_pointer: false, + has_button: false, + has_scroll: false, + has_keyboard: false, + capabilities: HashMap::new(), + capability_mask: 0, + sequence: 0, + serial: 0, + }) } } + +#[async_trait] +impl EventConsumer for LibeiConsumer { + async fn consume(&mut self, event: Event, _client_handle: ClientHandle) { + let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_micros() as u64; + match event { + Event::Pointer(p) => match p { + crate::event::PointerEvent::Motion { time:_, relative_x, relative_y } => { + if !self.has_pointer { return } + if let Some((d, p)) = self.pointer.as_mut() { + p.motion_relative(relative_x as f32, relative_y as f32); + d.frame(self.serial, now); + } + }, + crate::event::PointerEvent::Button { time: _, button, state } => { + if !self.has_button { return } + if let Some((d, b)) = self.button.as_mut() { + b.button(button, match state { 0 => ButtonState::Released, _ => ButtonState::Press }); + d.frame(self.serial, now); + } + }, + crate::event::PointerEvent::Axis { time: _, axis, value } => { + if !self.has_scroll { return } + if let Some((d, s)) = self.scroll.as_mut() { + match axis { + 0 => s.scroll(0., value as f32), + _ => s.scroll(value as f32, 0.), + } + d.frame(self.serial, now); + } + }, + crate::event::PointerEvent::Frame { } => {}, + }, + Event::Keyboard(k) => match k { + crate::event::KeyboardEvent::Key { time: _, key, state } => { + if !self.has_keyboard { return } + if let Some((d, k)) = &mut self.keyboard { + k.key(key, match state { 0 => KeyState::Released, _ => KeyState::Press }); + d.frame(self.serial, now); + } + }, + crate::event::KeyboardEvent::Modifiers { .. } => { }, + }, + _ => {} + } + self.context.flush().unwrap(); + } + + async fn dispatch(&mut self) -> Result<()> { + let event = match self.events.next().await { + Some(e) => e?, + None => return Err(anyhow!("libei connection lost")), + }; + let event = match event { + PendingRequestResult::Request(result) => result, + PendingRequestResult::ProtocolError(e) => return Err(anyhow!("libei protocol violation: {e}")), + PendingRequestResult::InvalidObject(e) => return Err(anyhow!("invalid object {e}")), + }; + match event { + ei::Event::Handshake(handshake, request) => match request { + ei::handshake::Event::HandshakeVersion { version } => { + if self.handshake { return Ok(()) } + log::info!("libei version {}", version); + // sender means we are sending events _to_ the eis server + handshake.handshake_version(version); // FIXME + handshake.context_type(ContextType::Sender); + handshake.name("ei-demo-client"); + handshake.interface_version("ei_connection", 1); + handshake.interface_version("ei_callback", 1); + handshake.interface_version("ei_pingpong", 1); + handshake.interface_version("ei_seat", 1); + handshake.interface_version("ei_device", 2); + handshake.interface_version("ei_pointer", 1); + handshake.interface_version("ei_pointer_absolute", 1); + handshake.interface_version("ei_scroll", 1); + handshake.interface_version("ei_button", 1); + handshake.interface_version("ei_keyboard", 1); + handshake.interface_version("ei_touchscreen", 1); + handshake.finish(); + self.handshake = true; + } + ei::handshake::Event::InterfaceVersion { name, version } => { + log::debug!("handshake: Interface {name} @ {version}"); + } + ei::handshake::Event::Connection { serial, connection } => { + connection.sync(1); + self.serial = serial; + } + _ => unreachable!() + } + ei::Event::Connection(_connection, request) => match request { + ei::connection::Event::Seat { seat } => { + log::debug!("connected to seat: {seat:?}"); + } + ei::connection::Event::Ping { ping } => { + ping.done(0); + } + ei::connection::Event::Disconnected { last_serial: _, reason, explanation } => { + log::debug!("ei - disconnected: reason: {reason:?}: {explanation}") + } + ei::connection::Event::InvalidObject { last_serial, invalid_id } => { + return Err(anyhow!("invalid object: id: {invalid_id}, serial: {last_serial}")); + } + _ => unreachable!() + } + ei::Event::Device(device, request) => match request { + ei::device::Event::Destroyed { serial } => { log::debug!("device destroyed: {device:?} - serial: {serial}") }, + ei::device::Event::Name { name } => {log::debug!("device name: {name}")}, + ei::device::Event::DeviceType { device_type } => log::debug!("device type: {device_type:?}"), + ei::device::Event::Dimensions { width, height } => log::debug!("device dimensions: {width}x{height}"), + ei::device::Event::Region { offset_x, offset_y, width, hight, scale } => log::debug!("device region: {width}x{hight} @ ({offset_x},{offset_y}), scale: {scale}"), + ei::device::Event::Interface { object } => { + log::debug!("device interface: {object:?}"); + if object.interface().eq("ei_pointer") { + log::debug!("GOT POINTER DEVICE"); + self.pointer.replace((device, object.downcast().unwrap())); + } else if object.interface().eq("ei_button") { + log::debug!("GOT BUTTON DEVICE"); + self.button.replace((device, object.downcast().unwrap())); + } else if object.interface().eq("ei_scroll") { + log::debug!("GOT SCROLL DEVICE"); + self.scroll.replace((device, object.downcast().unwrap())); + } else if object.interface().eq("ei_keyboard") { + log::debug!("GOT KEYBOARD DEVICE"); + self.keyboard.replace((device, object.downcast().unwrap())); + } + } + ei::device::Event::Done => { + log::debug!("device: done {device:?}"); + }, + ei::device::Event::Resumed { serial } => { + self.serial = serial; + device.start_emulating(serial, self.sequence); + self.sequence += 1; + log::debug!("resumed: {device:?}"); + if let Some((d,_)) = &mut self.pointer { + if d == &device { + log::debug!("pointer resumed {serial}"); + self.has_pointer = true; + } + } + if let Some((d,_)) = &mut self.button { + if d == &device { + log::debug!("button resumed {serial}"); + self.has_button = true; + } + } + if let Some((d,_)) = &mut self.scroll { + if d == &device { + log::debug!("scroll resumed {serial}"); + self.has_scroll = true; + } + } + if let Some((d,_)) = &mut self.keyboard { + if d == &device { + log::debug!("keyboard resumed {serial}"); + self.has_keyboard = true; + } + } + } + ei::device::Event::Paused { serial } => { + self.has_pointer = false; + self.has_button = false; + self.serial = serial; + }, + ei::device::Event::StartEmulating { serial, sequence } => log::debug!("start emulating {serial}, {sequence}"), + ei::device::Event::StopEmulating { serial } => log::debug!("stop emulating {serial}"), + ei::device::Event::Frame { serial, timestamp } => { + log::debug!("frame: {serial}, {timestamp}"); + } + ei::device::Event::RegionMappingId { mapping_id } => log::debug!("RegionMappingId {mapping_id}"), + e => log::debug!("invalid event: {e:?}"), + } + ei::Event::Seat(seat, request) => match request { + ei::seat::Event::Destroyed { serial } => { + self.serial = serial; + log::debug!("seat destroyed: {seat:?}"); + }, + ei::seat::Event::Name { name } => { + log::debug!("seat name: {name}"); + }, + ei::seat::Event::Capability { mask, interface } => { + log::debug!("seat capabilities: {mask}, interface: {interface:?}"); + self.capabilities.insert(interface, mask); + self.capability_mask |= mask; + }, + ei::seat::Event::Done => { + log::debug!("seat done"); + log::debug!("binding capabilities: {}", self.capability_mask); + seat.bind(self.capability_mask); + }, + ei::seat::Event::Device { device } => { + log::debug!("seat: new device - {device:?}"); + }, + _ => todo!(), + } + e => log::debug!("unhandled event: {e:?}"), + } + self.context.flush()?; + Ok(()) + } + + async fn notify(&mut self, _client_event: ClientEvent) {} + + async fn destroy(&mut self) {} +} + diff --git a/src/backend/consumer/windows.rs b/src/backend/consumer/windows.rs index 04a3b4b..ddd4e25 100644 --- a/src/backend/consumer/windows.rs +++ b/src/backend/consumer/windows.rs @@ -1,4 +1,6 @@ -use crate::{event::{KeyboardEvent, PointerEvent}, consumer::SyncConsumer}; +use anyhow::Result; +use async_trait::async_trait; +use crate::{event::{KeyboardEvent, PointerEvent}, consumer::EventConsumer}; use winapi::{ self, um::winuser::{INPUT, INPUT_MOUSE, LPINPUT, MOUSEEVENTF_MOVE, MOUSEINPUT, @@ -25,8 +27,9 @@ impl WindowsConsumer { pub fn new() -> Self { Self { } } } -impl SyncConsumer for WindowsConsumer { - fn consume(&mut self, event: Event, _: ClientHandle) { +#[async_trait] +impl EventConsumer for WindowsConsumer { + async fn consume(&mut self, event: Event, _: ClientHandle) { match event { Event::Pointer(pointer_event) => match pointer_event { PointerEvent::Motion { @@ -48,9 +51,11 @@ impl SyncConsumer for WindowsConsumer { } } - fn notify(&mut self, _: ClientEvent) { + async fn notify(&mut self, _: ClientEvent) { // nothing to do } + + async fn destroy(&mut self) {} } fn send_mouse_input(mi: MOUSEINPUT) { diff --git a/src/backend/consumer/wlroots.rs b/src/backend/consumer/wlroots.rs index 7aa6a25..bdd5ace 100644 --- a/src/backend/consumer/wlroots.rs +++ b/src/backend/consumer/wlroots.rs @@ -1,7 +1,8 @@ +use async_trait::async_trait; use wayland_client::WEnum; use wayland_client::backend::WaylandError; use crate::client::{ClientHandle, ClientEvent}; -use crate::consumer::SyncConsumer; +use crate::consumer::EventConsumer; use std::collections::HashMap; use std::io; use std::os::fd::OwnedFd; @@ -140,8 +141,9 @@ impl State { } } -impl SyncConsumer for WlrootsConsumer { - fn consume(&mut self, event: Event, client_handle: ClientHandle) { +#[async_trait] +impl EventConsumer for WlrootsConsumer { + async fn consume(&mut self, event: Event, client_handle: ClientHandle) { if let Some(virtual_input) = self.state.input_for_client.get(&client_handle) { if self.last_flush_failed { if let Err(WaylandError::Io(e)) = self.queue.flush() { @@ -175,7 +177,7 @@ impl SyncConsumer for WlrootsConsumer { } } - fn notify(&mut self, client_event: ClientEvent) { + async fn notify(&mut self, client_event: ClientEvent) { if let ClientEvent::Create(client, _) = client_event { self.state.add_client(client); if let Err(e) = self.queue.flush() { @@ -183,6 +185,8 @@ impl SyncConsumer for WlrootsConsumer { } } } + + async fn destroy(&mut self) { } } diff --git a/src/backend/consumer/x11.rs b/src/backend/consumer/x11.rs index 768b37a..7b5482c 100644 --- a/src/backend/consumer/x11.rs +++ b/src/backend/consumer/x11.rs @@ -1,15 +1,18 @@ use std::ptr; +use async_trait::async_trait; use x11::{xlib, xtest}; use crate::{ client::ClientHandle, - event::Event, consumer::SyncConsumer, + event::Event, consumer::EventConsumer, }; pub struct X11Consumer { display: *mut xlib::Display, } +unsafe impl Send for X11Consumer {} + impl X11Consumer { pub fn new() -> Self { let display = unsafe { @@ -30,8 +33,9 @@ impl X11Consumer { } } -impl SyncConsumer for X11Consumer { - fn consume(&mut self, event: Event, _: ClientHandle) { +#[async_trait] +impl EventConsumer for X11Consumer { + async fn consume(&mut self, event: Event, _: ClientHandle) { match event { Event::Pointer(pointer_event) => match pointer_event { crate::event::PointerEvent::Motion { @@ -50,8 +54,10 @@ impl SyncConsumer for X11Consumer { } } - fn notify(&mut self, _: crate::client::ClientEvent) { + async fn notify(&mut self, _: crate::client::ClientEvent) { // for our purposes it does not matter what client sent the event } + + async fn destroy(&mut self) {} } diff --git a/src/backend/consumer/xdg_desktop_portal.rs b/src/backend/consumer/xdg_desktop_portal.rs index 0c716c2..ea47173 100644 --- a/src/backend/consumer/xdg_desktop_portal.rs +++ b/src/backend/consumer/xdg_desktop_portal.rs @@ -2,7 +2,7 @@ use async_trait::async_trait; use anyhow::Result; use ashpd::{desktop::{remote_desktop::{RemoteDesktop, DeviceType, KeyState, Axis}, Session}, WindowIdentifier}; -use crate::consumer::AsyncConsumer; +use crate::consumer::EventConsumer; pub struct DesktopPortalConsumer<'a> { proxy: RemoteDesktop<'a>, @@ -27,7 +27,7 @@ impl<'a> DesktopPortalConsumer<'a> { } #[async_trait] -impl<'a> AsyncConsumer for DesktopPortalConsumer<'a> { +impl<'a> EventConsumer for DesktopPortalConsumer<'a> { async fn consume(&mut self, event: crate::event::Event, _client: crate::client::ClientHandle) { match event { crate::event::Event::Pointer(p) => { diff --git a/src/backend/producer.rs b/src/backend/producer.rs index 9ca2662..2f9f736 100644 --- a/src/backend/producer.rs +++ b/src/backend/producer.rs @@ -1,3 +1,5 @@ +#[cfg(all(unix, feature = "libei"))] +pub mod libei; #[cfg(all(unix, feature = "wayland"))] pub mod wayland; #[cfg(windows)] diff --git a/src/backend/producer/libei.rs b/src/backend/producer/libei.rs new file mode 100644 index 0000000..19a7ff8 --- /dev/null +++ b/src/backend/producer/libei.rs @@ -0,0 +1,29 @@ +use std::{error::Error, io, result::Result, task::Poll}; + +use futures_core::Stream; + +use crate::{producer::EventProducer, event::Event, client::ClientHandle}; + +pub struct LibeiProducer {} + +impl LibeiProducer { + pub fn new() -> Result> { + Ok(Self { }) + } +} + +impl EventProducer for LibeiProducer { + fn notify(&mut self, _event: crate::client::ClientEvent) { + } + + fn release(&mut self) { + } +} + +impl Stream for LibeiProducer { + type Item = io::Result<(ClientHandle, Event)>; + + fn poll_next(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> std::task::Poll> { + Poll::Pending + } +} diff --git a/src/backend/producer/wayland.rs b/src/backend/producer/wayland.rs index 346772a..04155a3 100644 --- a/src/backend/producer/wayland.rs +++ b/src/backend/producer/wayland.rs @@ -1,6 +1,7 @@ use crate::{client::{ClientHandle, Position, ClientEvent}, producer::EventProducer}; -use std::{os::fd::RawFd, vec::Drain, io::{ErrorKind, self}, env}; +use std::{os::fd::{OwnedFd, RawFd}, io::{ErrorKind, self}, env, pin::Pin, task::{Context, Poll, ready}, collections::VecDeque}; +use futures_core::Stream; use memmap::MmapOptions; use anyhow::{anyhow, Result}; use tokio::io::unix::AsyncFd; @@ -83,18 +84,26 @@ struct State { client_for_window: Vec<(Rc, ClientHandle)>, focused: Option<(Rc, ClientHandle)>, g: Globals, - wayland_fd: RawFd, + wayland_fd: OwnedFd, read_guard: Option, qh: QueueHandle, - pending_events: Vec<(ClientHandle, Event)>, + pending_events: VecDeque<(ClientHandle, Event)>, output_info: Vec<(WlOutput, OutputInfo)>, } -pub struct WaylandEventProducer { +struct Inner { state: State, queue: EventQueue, } +impl AsRawFd for Inner { + fn as_raw_fd(&self) -> RawFd { + self.state.wayland_fd.as_raw_fd() + } +} + +pub struct WaylandEventProducer(AsyncFd); + struct Window { buffer: wl_buffer::WlBuffer, surface: wl_surface::WlSurface, @@ -280,7 +289,7 @@ impl WaylandEventProducer { // prepare reading wayland events let read_guard = queue.prepare_read()?; - let wayland_fd = read_guard.connection_fd().as_raw_fd(); + let wayland_fd = read_guard.connection_fd().try_clone_to_owned().unwrap(); std::mem::drop(read_guard); let mut state = State { @@ -293,7 +302,7 @@ impl WaylandEventProducer { qh, wayland_fd, read_guard: None, - pending_events: vec![], + pending_events: VecDeque::new(), output_info: vec![], }; @@ -321,7 +330,9 @@ impl WaylandEventProducer { let read_guard = queue.prepare_read()?; state.read_guard = Some(read_guard); - Ok(WaylandEventProducer { queue, state }) + let inner = AsyncFd::new(Inner { queue, state })?; + + Ok(WaylandEventProducer(inner)) } } @@ -421,13 +432,7 @@ impl State { } } -impl AsRawFd for WaylandEventProducer { - fn as_raw_fd(&self) -> RawFd { - self.state.wayland_fd - } -} - -impl WaylandEventProducer { +impl Inner { fn read(&mut self) -> bool { match self.state.read_guard.take().unwrap().read() { Ok(_) => true, @@ -491,56 +496,80 @@ impl WaylandEventProducer { impl EventProducer for WaylandEventProducer { - fn get_async_fd(&self) -> io::Result> { - AsyncFd::new(self.as_raw_fd()) - } - - fn read_events(&mut self) -> Drain<(ClientHandle, Event)> { - // read events - while self.read() { - // prepare next read - self.prepare_read(); - } - // dispatch the events - self.dispatch_events(); - - // flush outgoing events - self.flush_events(); - - // prepare for the next read - self.prepare_read(); - - // return the events - self.state.pending_events.drain(..) - } - fn notify(&mut self, client_event: ClientEvent) { match client_event { ClientEvent::Create(handle, pos) => { - self.state.add_client(handle, pos); + self.0.get_mut().state.add_client(handle, pos); } ClientEvent::Destroy(handle) => { + let inner = self.0.get_mut(); loop { // remove all windows corresponding to this client - if let Some(i) = self + if let Some(i) = inner .state .client_for_window .iter() .position(|(_,c)| *c == handle) { - self.state.client_for_window.remove(i); - self.state.focused = None; + inner.state.client_for_window.remove(i); + inner.state.focused = None; } else { break } } } } - self.flush_events(); + let inner = self.0.get_mut(); + inner.flush_events(); } fn release(&mut self) { - self.state.ungrab(); - self.flush_events(); + let inner = self.0.get_mut(); + inner.state.ungrab(); + inner.flush_events(); + } +} + +impl Stream for WaylandEventProducer { + type Item = io::Result<(ClientHandle, Event)>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + log::trace!("producer.next()"); + if let Some(event) = self.0.get_mut().state.pending_events.pop_front() { + return Poll::Ready(Some(Ok(event))); + } + + loop { + let mut guard = ready!(self.0.poll_read_ready_mut(cx))?; + + { + let inner = guard.get_inner_mut(); + + // read events + while inner.read() { + // prepare next read + inner.prepare_read(); + } + + // dispatch the events + inner.dispatch_events(); + + // flush outgoing events + inner.flush_events(); + + // prepare for the next read + inner.prepare_read(); + } + + // clear read readiness for tokio read guard + // guard.clear_ready_matching(Ready::READABLE); + guard.clear_ready(); + + // if an event has been queued during dispatch_events() we return it + match guard.get_inner_mut().state.pending_events.pop_front() { + Some(event) => return Poll::Ready(Some(Ok(event))), + None => continue, + } + } } } @@ -601,7 +630,7 @@ impl Dispatch for State { .iter() .find(|(w, _c)| w.surface == surface) .unwrap(); - app.pending_events.push((*client, Event::Release())); + app.pending_events.push_back((*client, Event::Release())); } wl_pointer::Event::Leave { .. } => { app.ungrab(); @@ -613,7 +642,7 @@ impl Dispatch for State { state, } => { let (_, client) = app.focused.as_ref().unwrap(); - app.pending_events.push(( + app.pending_events.push_back(( *client, Event::Pointer(PointerEvent::Button { time, @@ -624,7 +653,7 @@ impl Dispatch for State { } wl_pointer::Event::Axis { time, axis, value } => { let (_, client) = app.focused.as_ref().unwrap(); - app.pending_events.push(( + app.pending_events.push_back(( *client, Event::Pointer(PointerEvent::Axis { time, @@ -664,7 +693,7 @@ impl Dispatch for State { state, } => { if let Some(client) = client { - app.pending_events.push(( + app.pending_events.push_back(( *client, Event::Keyboard(KeyboardEvent::Key { time, @@ -682,7 +711,7 @@ impl Dispatch for State { group, } => { if let Some(client) = client { - app.pending_events.push(( + app.pending_events.push_back(( *client, Event::Keyboard(KeyboardEvent::Modifiers { mods_depressed, @@ -731,7 +760,7 @@ impl Dispatch for State { { if let Some((_window, client)) = &app.focused { let time = (((utime_hi as u64) << 32 | utime_lo as u64) / 1000) as u32; - app.pending_events.push(( + app.pending_events.push_back(( *client, Event::Pointer(PointerEvent::Motion { time, diff --git a/src/backend/producer/windows.rs b/src/backend/producer/windows.rs index 27de5ec..6d3cac8 100644 --- a/src/backend/producer/windows.rs +++ b/src/backend/producer/windows.rs @@ -1,4 +1,7 @@ -use tokio::sync::mpsc::{self, Receiver, Sender}; +use std::io::Result; +use std::pin::Pin; +use futures::Stream; +use core::task::{Context, Poll}; use crate::{ client::{ClientHandle, ClientEvent}, @@ -6,25 +9,23 @@ use crate::{ producer::EventProducer, }; -pub struct WindowsProducer { - _tx: Sender<(ClientHandle, Event)>, - rx: Option>, -} +pub struct WindowsProducer { } impl EventProducer for WindowsProducer { fn notify(&mut self, _: ClientEvent) { } fn release(&mut self) { } - - fn get_wait_channel(&mut self) -> Option> { - self.rx.take() - } } impl WindowsProducer { pub(crate) fn new() -> Self { - let (_tx, rx) = mpsc::channel(1); - let rx = Some(rx); - Self { _tx, rx } + Self {} + } +} + +impl Stream for WindowsProducer { + type Item = Result<(ClientHandle, Event)>; + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Pending } } diff --git a/src/backend/producer/x11.rs b/src/backend/producer/x11.rs index 379534a..6de4382 100644 --- a/src/backend/producer/x11.rs +++ b/src/backend/producer/x11.rs @@ -1,42 +1,32 @@ -use std::io::Result; -use std::os::fd::{AsRawFd, self}; -use std::vec::Drain; +use std::io; +use std::task::Poll; -use tokio::io::unix::AsyncFd; + +use futures_core::Stream; use crate::event::Event; use crate::producer::EventProducer; use crate::client::{ClientEvent, ClientHandle}; -pub struct X11Producer { - pending_events: Vec<(ClientHandle, Event)>, -} +pub struct X11Producer { } impl X11Producer { pub fn new() -> Self { - Self { - pending_events: vec![], - } - } -} - -impl AsRawFd for X11Producer { - fn as_raw_fd(&self) -> fd::RawFd { - todo!() + Self { } } } impl EventProducer for X11Producer { fn notify(&mut self, _: ClientEvent) { } - fn read_events(&mut self) -> Drain<(ClientHandle, Event)> { - self.pending_events.drain(..) - } - fn release(&mut self) {} +} - fn get_async_fd(&self) -> Result> { - todo!() +impl Stream for X11Producer { + type Item = io::Result<(ClientHandle, Event)>; + + fn poll_next(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> std::task::Poll> { + Poll::Pending } } diff --git a/src/consumer.rs b/src/consumer.rs index 882faab..eafb938 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -1,3 +1,4 @@ +use std::future; use async_trait::async_trait; #[cfg(unix)] @@ -15,29 +16,22 @@ enum Backend { Libei, } -pub enum EventConsumer { - Sync(Box), - Async(Box), -} - -pub trait SyncConsumer { - /// Event corresponding to an abstract `client_handle` - fn consume(&mut self, event: Event, client_handle: ClientHandle); - - /// Event corresponding to a configuration change - fn notify(&mut self, client_event: ClientEvent); -} - #[async_trait] -pub trait AsyncConsumer { +pub trait EventConsumer: Send { async fn consume(&mut self, event: Event, client_handle: ClientHandle); async fn notify(&mut self, client_event: ClientEvent); + /// this function is waited on continuously and can be used to handle events + async fn dispatch(&mut self) -> Result<()> { + let _: () = future::pending().await; + Ok(()) + } + async fn destroy(&mut self); } -pub async fn create() -> Result { +pub async fn create() -> Result> { #[cfg(windows)] - return Ok(EventConsumer::Sync(Box::new(consumer::windows::WindowsConsumer::new()))); + return Ok(Box::new(consumer::windows::WindowsConsumer::new())); #[cfg(unix)] let backend = match env::var("XDG_SESSION_TYPE") { @@ -50,8 +44,8 @@ pub async fn create() -> Result { log::info!("XDG_SESSION_TYPE = wayland -> using wayland event consumer"); match env::var("XDG_CURRENT_DESKTOP") { Ok(current_desktop) => match current_desktop.as_str() { - "gnome" => { - log::info!("XDG_CURRENT_DESKTOP = gnome -> using libei backend"); + "GNOME" => { + log::info!("XDG_CURRENT_DESKTOP = GNOME -> using libei backend"); Backend::Libei } "KDE" => { @@ -89,25 +83,25 @@ pub async fn create() -> Result { #[cfg(not(feature = "libei"))] panic!("feature libei not enabled"); #[cfg(feature = "libei")] - Ok(EventConsumer::Sync(Box::new(consumer::libei::LibeiConsumer::new()))) + Ok(Box::new(consumer::libei::LibeiConsumer::new().await?)) }, Backend::RemoteDesktopPortal => { #[cfg(not(feature = "xdg_desktop_portal"))] panic!("feature xdg_desktop_portal not enabled"); #[cfg(feature = "xdg_desktop_portal")] - Ok(EventConsumer::Async(Box::new(consumer::xdg_desktop_portal::DesktopPortalConsumer::new().await?))) + Ok(Box::new(consumer::xdg_desktop_portal::DesktopPortalConsumer::new().await?)) }, Backend::Wlroots => { #[cfg(not(feature = "wayland"))] panic!("feature wayland not enabled"); #[cfg(feature = "wayland")] - Ok(EventConsumer::Sync(Box::new(consumer::wlroots::WlrootsConsumer::new()?))) + Ok(Box::new(consumer::wlroots::WlrootsConsumer::new()?)) }, Backend::X11 => { #[cfg(not(feature = "x11"))] panic!("feature x11 not enabled"); #[cfg(feature = "x11")] - Ok(EventConsumer::Sync(Box::new(consumer::x11::X11Consumer::new()))) + Ok(Box::new(consumer::x11::X11Consumer::new())) }, } } diff --git a/src/event/server.rs b/src/event/server.rs index a4997e4..e17bf66 100644 --- a/src/event/server.rs +++ b/src/event/server.rs @@ -1,6 +1,7 @@ use std::{error::Error, io::Result, collections::HashSet, time::{Duration, Instant}, net::IpAddr}; use log; use tokio::{net::UdpSocket, io::ReadHalf, signal, sync::mpsc::{Sender, Receiver}}; +use futures::stream::StreamExt; #[cfg(unix)] use tokio::net::UnixStream; @@ -11,6 +12,7 @@ use tokio::net::TcpStream; use std::{net::SocketAddr, io::ErrorKind}; use crate::{client::{ClientEvent, ClientManager, Position, ClientHandle}, consumer::EventConsumer, producer::EventProducer, frontend::{FrontendEvent, FrontendListener, FrontendNotify, self}, dns::{self, DnsResolver}}; +// use crate::event::PointerEvent; use super::Event; /// keeps track of state to prevent a feedback loop @@ -26,7 +28,7 @@ pub struct Server { client_manager: ClientManager, state: State, frontend: FrontendListener, - consumer: EventConsumer, + consumer: Box, producer: Box, socket: UdpSocket, frontend_rx: Receiver, @@ -37,7 +39,7 @@ impl Server { pub async fn new( port: u16, frontend: FrontendListener, - consumer: EventConsumer, + consumer: Box, producer: Box, ) -> anyhow::Result { @@ -64,92 +66,64 @@ impl Server { }) } - pub async fn run(&mut self) -> Result<()> { + pub async fn run(&mut self) -> anyhow::Result<()> { - #[cfg(unix)] - let producer_fd = self.producer.get_async_fd()?; - - #[cfg(unix)] loop { + log::trace!("polling ..."); tokio::select! { + // safety: cancellation safe udp_event = receive_event(&self.socket) => { + log::trace!("-> receive_event"); match udp_event { Ok(e) => self.handle_udp_rx(e).await, Err(e) => log::error!("error reading event: {e}"), } } - read_guard = producer_fd.readable() => { - let mut guard = match read_guard { - Ok(g) => g, - Err(e) => { - log::error!("wayland_fd read_guard: {e}"); - continue - } - }; - self.handle_producer_rx().await; - guard.clear_ready_matching(tokio::io::Ready::READABLE); + // safety: cancellation safe + res = self.producer.next() => { + log::trace!("-> producer.next()"); + match res { + Some(Ok((client, event))) => { + self.handle_producer_event(client,event).await; + }, + Some(Err(e)) => log::error!("{e}"), + _ => break, + } } + // safety: cancellation safe stream = self.frontend.accept() => { + log::trace!("-> frontend.accept()"); match stream { Ok(s) => self.handle_frontend_stream(s).await, Err(e) => log::error!("error connecting to frontend: {e}"), } } + // safety: cancellation safe frontend_event = self.frontend_rx.recv() => { + log::trace!("-> frontend.recv()"); if let Some(event) = frontend_event { if self.handle_frontend_event(event).await { break; } } } + // safety: cancellation safe + e = self.consumer.dispatch() => { + log::trace!("-> consumer.dispatch()"); + if let Err(e) = e { + return Err(e); + } + } + // safety: cancellation safe _ = signal::ctrl_c() => { log::info!("terminating gracefully ..."); break; } } } - - #[cfg(windows)] - let mut channel = self.producer.get_wait_channel().unwrap(); - - #[cfg(windows)] - loop { - tokio::select! { - udp_event = receive_event(&self.socket) => { - match udp_event { - Ok(e) => self.handle_udp_rx(e).await, - Err(e) => log::error!("error reading event: {e}"), - } - } - event = channel.recv() => { - if let Some((c,e)) = event { - self.handle_producer_event(c,e).await; - } - } - stream = self.frontend.accept() => { - match stream { - Ok(s) => self.handle_frontend_stream(s).await, - Err(e) => log::error!("error connecting to frontend: {e}"), - } - } - frontend_event = self.frontend_rx.recv() => { - if let Some(event) = frontend_event { - if self.handle_frontend_event(event).await { - break; - } - } - } - _ = signal::ctrl_c() => { - log::info!("terminating gracefully ..."); - break; - } - } - } - + // destroy consumer - if let EventConsumer::Async(c) = &mut self.consumer { - c.destroy().await; - } + self.consumer.destroy().await; Ok(()) } @@ -182,26 +156,17 @@ impl Server { state.active = active; if state.active { self.producer.notify(ClientEvent::Create(client, state.client.pos)); - match &mut self.consumer { - EventConsumer::Sync(consumer) => consumer.notify(ClientEvent::Create(client, state.client.pos)), - EventConsumer::Async(consumer) => consumer.notify(ClientEvent::Create(client, state.client.pos)).await, - } + self.consumer.notify(ClientEvent::Create(client, state.client.pos)).await; } else { self.producer.notify(ClientEvent::Destroy(client)); - match &mut self.consumer { - EventConsumer::Sync(consumer) => consumer.notify(ClientEvent::Destroy(client)), - EventConsumer::Async(consumer) => consumer.notify(ClientEvent::Destroy(client)).await, - } + self.consumer.notify(ClientEvent::Destroy(client)).await; } } } pub async fn remove_client(&mut self, client: ClientHandle) -> Option { self.producer.notify(ClientEvent::Destroy(client)); - match &mut self.consumer { - EventConsumer::Sync(consumer) => consumer.notify(ClientEvent::Destroy(client)), - EventConsumer::Async(consumer) => consumer.notify(ClientEvent::Destroy(client)).await, - } + self.consumer.notify(ClientEvent::Destroy(client)).await; if let Some(client) = self.client_manager.remove_client(client).map(|s| s.client.handle) { let notify = FrontendNotify::NotifyClientDelete(client); log::debug!("{notify:?}"); @@ -230,15 +195,9 @@ impl Server { state.client.pos = pos; if state.active { self.producer.notify(ClientEvent::Destroy(client)); - match &mut self.consumer { - EventConsumer::Sync(consumer) => consumer.notify(ClientEvent::Destroy(client)), - EventConsumer::Async(consumer) => consumer.notify(ClientEvent::Destroy(client)).await, - } + self.consumer.notify(ClientEvent::Destroy(client)).await; self.producer.notify(ClientEvent::Create(client, pos)); - match &mut self.consumer { - EventConsumer::Sync(consumer) => consumer.notify(ClientEvent::Create(client, pos)), - EventConsumer::Async(consumer) => consumer.notify(ClientEvent::Create(client, pos)).await, - } + self.consumer.notify(ClientEvent::Create(client, pos)).await; } // update port @@ -322,10 +281,7 @@ impl Server { } State::Receiving => { // consume event - match &mut self.consumer { - EventConsumer::Sync(consumer) => consumer.consume(event, handle), - EventConsumer::Async(consumer) => consumer.consume(event, handle).await, - } + self.consumer.consume(event, handle).await; // let the server know we are still alive once every second let last_replied = state.last_replied; @@ -342,14 +298,6 @@ impl Server { } } - #[cfg(unix)] - async fn handle_producer_rx(&mut self) { - let events: Vec<(ClientHandle, Event)> = self.producer.read_events().collect(); - for (c,e) in events.into_iter() { - self.handle_producer_event(c,e).await; - } - } - async fn handle_producer_event(&mut self, c: ClientHandle, e: Event) { let mut should_release = false; // in receiving state, only release events @@ -498,6 +446,7 @@ impl Server { } async fn receive_event(socket: &UdpSocket) -> std::result::Result<(Event, SocketAddr), Box> { + log::trace!("receive_event"); let mut buf = vec![0u8; 22]; match socket.recv_from(&mut buf).await { Ok((_amt, src)) => Ok((Event::try_from(buf)?, src)), diff --git a/src/frontend.rs b/src/frontend.rs index ac7baa5..a594fb1 100644 --- a/src/frontend.rs +++ b/src/frontend.rs @@ -97,6 +97,7 @@ impl FrontendListener { #[cfg(unix)] pub async fn accept(&mut self) -> Result> { + log::trace!("frontend.accept()"); let stream = self.listener.accept().await?.0; let (rx, tx) = tokio::io::split(stream); diff --git a/src/main.rs b/src/main.rs index d780bd5..33900d3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -52,6 +52,7 @@ pub fn run() -> Result<(), Box> { // start sending and receiving events let mut event_server = Server::new(config.port, frontend_adapter, consumer, producer).await?; + log::debug!("created server"); // add clients from config for (c,h,port,p) in config.get_clients().into_iter() { diff --git a/src/producer.rs b/src/producer.rs index ed6fcd8..eb8f3e1 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -1,10 +1,6 @@ -use std::error::Error; +use std::{error::Error, io}; -#[cfg(unix)] -use std::{io, os::fd::RawFd, vec::Drain}; - -#[cfg(unix)] -use tokio::io::unix::AsyncFd; +use futures_core::Stream; use crate::{client::{ClientHandle, ClientEvent}, event::Event}; use crate::backend::producer; @@ -14,7 +10,8 @@ use std::env; #[cfg(unix)] enum Backend { - Wayland, + LayerShell, + Libei, X11, } @@ -31,7 +28,22 @@ pub fn create() -> Result, Box> { }, "wayland" => { log::info!("XDG_SESSION_TYPE = wayland -> using wayland event producer"); - Backend::Wayland + match env::var("XDG_CURRENT_DESKTOP") { + Ok(desktop) => match desktop.as_str() { + "GNOME" => { + log::info!("XDG_CURRENT_DESKTOP = GNOME -> using libei backend"); + Backend::Libei + } + d => { + log::info!("XDG_CURRENT_DESKTOP = {d} -> using layer_shell backend"); + Backend::LayerShell + } + } + Err(_) => { + log::warn!("XDG_CURRENT_DESKTOP not set! Assuming layer_shell support -> using layer_shell backend"); + Backend::LayerShell + } + } } _ => panic!("unknown XDG_SESSION_TYPE"), }, @@ -46,33 +58,25 @@ pub fn create() -> Result, Box> { #[cfg(feature = "x11")] Ok(Box::new(producer::x11::X11Producer::new())) } - Backend::Wayland => { + Backend::LayerShell => { #[cfg(not(feature = "wayland"))] panic!("feature wayland not enabled"); #[cfg(feature = "wayland")] Ok(Box::new(producer::wayland::WaylandEventProducer::new()?)) } + Backend::Libei => { + #[cfg(not(feature = "libei"))] + panic!("feature libei not enabled"); + #[cfg(feature = "libei")] + Ok(Box::new(producer::libei::LibeiProducer::new()?)) + }, } } -pub trait EventProducer { - +pub trait EventProducer: Stream> + Unpin { /// notify event producer of configuration changes fn notify(&mut self, event: ClientEvent); /// release mouse fn release(&mut self); - - /// unix only - #[cfg(unix)] - fn get_async_fd(&self) -> io::Result>; - - /// read an event - /// this function must be invoked to retrieve an Event after - /// the eventfd indicates a pending Event - #[cfg(unix)] - fn read_events(&mut self) -> Drain<(ClientHandle, Event)>; - - #[cfg(not(unix))] - fn get_wait_channel(&mut self) -> Option>; }