fix emulation notify

This commit is contained in:
Ferdinand Schober
2024-07-11 22:20:40 +02:00
parent b0407148bf
commit d54e472498
11 changed files with 157 additions and 61 deletions

View File

@@ -1,9 +1,5 @@
[workspace] [workspace]
members = [ members = ["input-capture", "input-emulation", "input-event"]
"input-capture",
"input-emulation",
"input-event",
]
[package] [package]
name = "lan-mouse" name = "lan-mouse"
@@ -14,8 +10,8 @@ license = "GPL-3.0-or-later"
repository = "https://github.com/ferdinandschober/lan-mouse" repository = "https://github.com/ferdinandschober/lan-mouse"
[profile.release] [profile.release]
strip = true # strip = true
lto = "fat" # lto = "fat"
[dependencies] [dependencies]
input-event = { path = "input-event", version = "0.1.0" } input-event = { path = "input-event", version = "0.1.0" }
@@ -29,11 +25,24 @@ anyhow = "1.0.71"
log = "0.4.20" log = "0.4.20"
env_logger = "0.11.3" env_logger = "0.11.3"
serde_json = "1.0.107" serde_json = "1.0.107"
tokio = {version = "1.32.0", features = ["io-util", "io-std", "macros", "net", "process", "rt", "sync", "signal"] } tokio = { version = "1.32.0", features = [
"io-util",
"io-std",
"macros",
"net",
"process",
"rt",
"sync",
"signal",
] }
futures = "0.3.28" futures = "0.3.28"
clap = { version="4.4.11", features = ["derive"] } clap = { version = "4.4.11", features = ["derive"] }
gtk = { package = "gtk4", version = "0.8.1", features = ["v4_2"], optional = true } gtk = { package = "gtk4", version = "0.8.1", features = [
adw = { package = "libadwaita", version = "0.6.0", features = ["v1_1"], optional = true } "v4_2",
], optional = true }
adw = { package = "libadwaita", version = "0.6.0", features = [
"v1_1",
], optional = true }
async-channel = { version = "2.1.1", optional = true } async-channel = { version = "2.1.1", optional = true }
hostname = "0.4.0" hostname = "0.4.0"
slab = "0.4.9" slab = "0.4.9"
@@ -48,9 +57,9 @@ libc = "0.2.148"
glib-build-tools = { version = "0.19.0", optional = true } glib-build-tools = { version = "0.19.0", optional = true }
[features] [features]
default = [ "wayland", "x11", "xdg_desktop_portal", "libei", "gtk" ] default = ["wayland", "x11", "xdg_desktop_portal", "libei", "gtk"]
wayland = [ "input-capture/wayland", "input-emulation/wayland" ] wayland = ["input-capture/wayland", "input-emulation/wayland"]
x11 = [ "input-capture/x11", "input-emulation/x11" ] x11 = ["input-capture/x11", "input-emulation/x11"]
xdg_desktop_portal = [ "input-emulation/xdg_desktop_portal" ] xdg_desktop_portal = ["input-emulation/xdg_desktop_portal"]
libei = [ "input-capture/libei", "input-emulation/libei" ] libei = ["input-capture/libei", "input-emulation/libei"]
gtk = ["dep:gtk", "dep:adw", "dep:async-channel", "dep:glib-build-tools"] gtk = ["dep:gtk", "dep:adw", "dep:async-channel", "dep:glib-build-tools"]

View File

@@ -508,7 +508,7 @@ async fn handle_ei_event(
} }
EiEvent::SeatRemoved(_) | /* EiEvent::DeviceAdded(_) | */ EiEvent::DeviceRemoved(_) => { EiEvent::SeatRemoved(_) | /* EiEvent::DeviceAdded(_) | */ EiEvent::DeviceRemoved(_) => {
log::debug!("releasing session: {ei_event:?}"); log::debug!("releasing session: {ei_event:?}");
release_session.notify_waiters(); release_session.notify_one();
} }
EiEvent::DevicePaused(_) | EiEvent::DeviceResumed(_) => {} EiEvent::DevicePaused(_) | EiEvent::DeviceResumed(_) => {}
EiEvent::DeviceStartEmulating(_) => log::debug!("START EMULATING"), EiEvent::DeviceStartEmulating(_) => log::debug!("START EMULATING"),

View File

@@ -26,4 +26,7 @@ impl InputEmulation for DummyEmulation {
} }
async fn create(&mut self, _: EmulationHandle) {} async fn create(&mut self, _: EmulationHandle) {}
async fn destroy(&mut self, _: EmulationHandle) {} async fn destroy(&mut self, _: EmulationHandle) {}
async fn terminate(&mut self) {
/* nothing to do */
}
} }

View File

@@ -76,6 +76,7 @@ pub trait InputEmulation: Send {
) -> Result<(), EmulationError>; ) -> Result<(), EmulationError>;
async fn create(&mut self, handle: EmulationHandle); async fn create(&mut self, handle: EmulationHandle);
async fn destroy(&mut self, handle: EmulationHandle); async fn destroy(&mut self, handle: EmulationHandle);
async fn terminate(&mut self);
} }
pub async fn create_backend( pub async fn create_backend(

View File

@@ -5,8 +5,8 @@ use std::{
io, io,
os::{fd::OwnedFd, unix::net::UnixStream}, os::{fd::OwnedFd, unix::net::UnixStream},
sync::{ sync::{
atomic::{AtomicU32, Ordering}, atomic::{AtomicBool, AtomicU32, Ordering},
Arc, RwLock, Arc, Mutex, RwLock,
}, },
time::{SystemTime, UNIX_EPOCH}, time::{SystemTime, UNIX_EPOCH},
}; };
@@ -63,8 +63,10 @@ struct Devices {
pub struct LibeiEmulation { pub struct LibeiEmulation {
context: ei::Context, context: ei::Context,
devices: Devices, devices: Devices,
ei_task: JoinHandle<()>,
error: Arc<Mutex<Option<EmulationError>>>,
libei_error: Arc<AtomicBool>,
serial: AtomicU32, serial: AtomicU32,
ei_task: JoinHandle<Result<(), EmulationError>>,
} }
async fn get_ei_fd() -> Result<OwnedFd, ashpd::Error> { async fn get_ei_fd() -> Result<OwnedFd, ashpd::Error> {
@@ -95,7 +97,9 @@ async fn get_ei_fd() -> Result<OwnedFd, ashpd::Error> {
}; };
}; };
proxy.connect_to_eis(&session).await let fd = proxy.connect_to_eis(&session).await?;
session.close().await?;
Ok(fd)
} }
impl LibeiEmulation { impl LibeiEmulation {
@@ -115,16 +119,26 @@ impl LibeiEmulation {
.await?; .await?;
let events = EiConvertEventStream::new(events, handshake.serial); let events = EiConvertEventStream::new(events, handshake.serial);
let devices = Devices::default(); let devices = Devices::default();
let ei_handler = ei_event_handler(events, context.clone(), devices.clone()); let libei_error = Arc::new(AtomicBool::default());
let error = Arc::new(Mutex::new(None));
let ei_handler = ei_task(
events,
context.clone(),
devices.clone(),
libei_error.clone(),
error.clone(),
);
let ei_task = tokio::task::spawn_local(ei_handler); let ei_task = tokio::task::spawn_local(ei_handler);
let serial = AtomicU32::new(handshake.serial); let serial = AtomicU32::new(handshake.serial);
Ok(Self { Ok(Self {
serial,
context, context,
ei_task,
devices, devices,
ei_task,
error,
libei_error,
serial,
}) })
} }
} }
@@ -146,6 +160,12 @@ impl InputEmulation for LibeiEmulation {
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)
.unwrap() .unwrap()
.as_micros() as u64; .as_micros() as u64;
if self.libei_error.load(Ordering::SeqCst) {
// don't break sending additional events but signal error
if let Some(e) = self.error.lock().unwrap().take() {
return Err(e);
}
}
match event { match event {
Event::Pointer(p) => match p { Event::Pointer(p) => match p {
PointerEvent::Motion { time: _, dx, dy } => { PointerEvent::Motion { time: _, dx, dy } => {
@@ -228,12 +248,35 @@ impl InputEmulation for LibeiEmulation {
async fn create(&mut self, _: EmulationHandle) {} async fn create(&mut self, _: EmulationHandle) {}
async fn destroy(&mut self, _: EmulationHandle) {} async fn destroy(&mut self, _: EmulationHandle) {}
async fn terminate(&mut self) {
self.ei_task.abort();
/* FIXME */
}
} }
async fn ei_event_handler( async fn ei_task(
mut events: EiConvertEventStream, mut events: EiConvertEventStream,
context: ei::Context, context: ei::Context,
devices: Devices, devices: Devices,
libei_error: Arc<AtomicBool>,
error: Arc<Mutex<Option<EmulationError>>>,
) {
loop {
match ei_event_handler(&mut events, &context, &devices).await {
Ok(()) => {}
Err(e) => {
libei_error.store(true, Ordering::SeqCst);
error.lock().unwrap().replace(e);
}
}
}
}
async fn ei_event_handler(
events: &mut EiConvertEventStream,
context: &ei::Context,
devices: &Devices,
) -> Result<(), EmulationError> { ) -> Result<(), EmulationError> {
loop { loop {
let event = events let event = events
@@ -253,7 +296,7 @@ async fn ei_event_handler(
match event { match event {
EiEvent::Disconnected(e) => { EiEvent::Disconnected(e) => {
log::debug!("ei disconnected: {e:?}"); log::debug!("ei disconnected: {e:?}");
break; return Err(EmulationError::EndOfStream);
} }
EiEvent::SeatAdded(e) => { EiEvent::SeatAdded(e) => {
e.seat().bind_capabilities(CAPABILITIES); e.seat().bind_capabilities(CAPABILITIES);
@@ -327,5 +370,4 @@ async fn ei_event_handler(
} }
context.flush().map_err(|e| io::Error::new(e.kind(), e))?; context.flush().map_err(|e| io::Error::new(e.kind(), e))?;
} }
Ok(())
} }

View File

@@ -165,6 +165,9 @@ impl InputEmulation for WlrootsEmulation {
log::error!("{}", e); log::error!("{}", e);
} }
} }
async fn terminate(&mut self) {
/* nothing to do */
}
} }
struct VirtualInput { struct VirtualInput {

View File

@@ -148,4 +148,8 @@ impl InputEmulation for X11Emulation {
async fn destroy(&mut self, _: EmulationHandle) { async fn destroy(&mut self, _: EmulationHandle) {
// for our purposes it does not matter what client sent the event // for our purposes it does not matter what client sent the event
} }
async fn terminate(&mut self) {
/* nothing to do */
}
} }

View File

@@ -142,6 +142,14 @@ impl<'a> InputEmulation for DesktopPortalEmulation<'a> {
async fn create(&mut self, _client: EmulationHandle) {} async fn create(&mut self, _client: EmulationHandle) {}
async fn destroy(&mut self, _client: EmulationHandle) {} async fn destroy(&mut self, _client: EmulationHandle) {}
async fn terminate(&mut self) {
if let Err(e) = self.session.close().await {
log::warn!("session.close(): {e}");
};
if let Err(e) = self.session.receive_closed().await {
log::warn!("session.receive_closed(): {e}");
};
}
} }
impl<'a> AsyncDrop for DesktopPortalEmulation<'a> { impl<'a> AsyncDrop for DesktopPortalEmulation<'a> {

View File

@@ -95,7 +95,7 @@ async fn capture_task(
) )
.await .await
{ {
log::warn!("input emulation exited: {e}"); log::warn!("input capture exited: {e}");
} }
let _ = frontend_tx let _ = frontend_tx
.send(FrontendEvent::CaptureStatus(Status::Disabled)) .send(FrontendEvent::CaptureStatus(Status::Disabled))
@@ -249,7 +249,7 @@ async fn handle_capture_event(
}; };
if start_timer { if start_timer {
timer_notify.notify_waiters(); timer_notify.notify_one();
} }
if enter { if enter {
spawn_hook_command(server, handle); spawn_hook_command(server, handle);

View File

@@ -108,7 +108,9 @@ async fn emulation_task(
if cancellation_token.is_cancelled() { if cancellation_token.is_cancelled() {
break; break;
} }
log::info!("waiting for user to request input emulation ...");
notify_emulation.notified().await; notify_emulation.notified().await;
log::info!("... done");
} }
} }
@@ -124,11 +126,26 @@ async fn do_emulation(
cancellation_token: &CancellationToken, cancellation_token: &CancellationToken,
) -> Result<(), LanMouseEmulationError> { ) -> Result<(), LanMouseEmulationError> {
let backend = backend.map(|b| b.into()); let backend = backend.map(|b| b.into());
log::info!("creating input emulation...");
let mut emulation = input_emulation::create(backend).await?; let mut emulation = input_emulation::create(backend).await?;
let _ = frontend_tx let _ = frontend_tx
.send(FrontendEvent::EmulationStatus(Status::Enabled)) .send(FrontendEvent::EmulationStatus(Status::Enabled))
.await; .await;
let res = do_emulation_session(
&mut emulation,
rx,
server,
udp_rx,
sender_tx,
capture_tx,
timer_notify,
cancellation_token,
)
.await;
emulation.terminate().await;
res?;
// FIXME DUPLICATES // FIXME DUPLICATES
// add clients // add clients
// let clients = server // let clients = server
@@ -141,40 +158,48 @@ async fn do_emulation(
// emulation.create(handle).await; // emulation.create(handle).await;
// } // }
let mut last_ignored = None;
loop {
tokio::select! {
udp_event = udp_rx.recv() => {
let udp_event = match udp_event {
Some(Ok(e)) => e,
Some(Err(e)) => {
log::warn!("network error: {e}");
continue;
}
None => break,
};
handle_udp_rx(&server, &capture_tx, &mut emulation, &sender_tx, &mut last_ignored, udp_event, &timer_notify).await?;
}
emulate_event = rx.recv() => {
match emulate_event {
Some(e) => match e {
EmulationEvent::Create(h) => emulation.create(h).await,
EmulationEvent::Destroy(h) => emulation.destroy(h).await,
EmulationEvent::ReleaseKeys(c) => release_keys(&server, &mut emulation, c).await?,
},
None => break,
}
}
_ = cancellation_token.cancelled() => break,
}
}
// release potentially still pressed keys // release potentially still pressed keys
release_all_keys(server, &mut emulation).await?; release_all_keys(server, &mut emulation).await?;
Ok(()) Ok(())
} }
async fn do_emulation_session(
emulation: &mut Box<dyn InputEmulation>,
rx: &mut Receiver<EmulationEvent>,
server: &Server,
udp_rx: &mut Receiver<Result<(Event, SocketAddr), NetworkError>>,
sender_tx: &Sender<(Event, SocketAddr)>,
capture_tx: &Sender<CaptureEvent>,
timer_notify: &Notify,
cancellation_token: &CancellationToken,
) -> Result<(), LanMouseEmulationError> {
let mut last_ignored = None;
loop {
tokio::select! {
udp_event = udp_rx.recv() => {
let udp_event = match udp_event.expect("channel closed") {
Ok(e) => e,
Err(e) => {
log::warn!("network error: {e}");
continue;
}
};
handle_udp_rx(&server, &capture_tx, emulation, &sender_tx, &mut last_ignored, udp_event, &timer_notify).await?;
}
emulate_event = rx.recv() => {
match emulate_event.expect("channel closed") {
EmulationEvent::Create(h) => emulation.create(h).await,
EmulationEvent::Destroy(h) => emulation.destroy(h).await,
EmulationEvent::ReleaseKeys(c) => release_keys(&server, emulation, c).await?,
}
}
_ = cancellation_token.cancelled() => break Ok(()),
}
}
}
async fn handle_udp_rx( async fn handle_udp_rx(
server: &Server, server: &Server,
capture_tx: &Sender<CaptureEvent>, capture_tx: &Sender<CaptureEvent>,
@@ -232,7 +257,7 @@ async fn handle_udp_rx(
); );
// restart timer if necessary // restart timer if necessary
if restart_timer { if restart_timer {
timer_notify.notify_waiters(); timer_notify.notify_one();
} }
ignore_event ignore_event
} else { } else {

View File

@@ -33,8 +33,8 @@ pub(crate) fn new(
mut frontend: FrontendListener, mut frontend: FrontendListener,
mut event: Receiver<FrontendEvent>, mut event: Receiver<FrontendEvent>,
server: Server, server: Server,
notify_capture: Arc<Notify>,
notify_emulation: Arc<Notify>, notify_emulation: Arc<Notify>,
notify_capture: Arc<Notify>,
capture: Sender<CaptureEvent>, capture: Sender<CaptureEvent>,
emulate: Sender<EmulationEvent>, emulate: Sender<EmulationEvent>,
resolve_ch: Sender<DnsRequest>, resolve_ch: Sender<DnsRequest>,
@@ -127,10 +127,11 @@ async fn handle_frontend_event(
log::debug!("frontend: {event:?}"); log::debug!("frontend: {event:?}");
match event { match event {
FrontendRequest::EnableCapture => { FrontendRequest::EnableCapture => {
notify_capture.notify_waiters(); notify_capture.notify_one();
} }
FrontendRequest::EnableEmulation => { FrontendRequest::EnableEmulation => {
notify_emulation.notify_waiters(); log::info!("received emulation enable request");
notify_emulation.notify_one();
} }
FrontendRequest::Create => { FrontendRequest::Create => {
let handle = add_client(server, frontend).await; let handle = add_client(server, frontend).await;