Libei Input Capture (#94)

This commit is contained in:
Ferdinand Schober
2024-03-20 14:03:52 +01:00
committed by GitHub
parent f7c59e40c9
commit 9afe7da0dd
10 changed files with 988 additions and 504 deletions

791
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -31,6 +31,7 @@ gtk = { package = "gtk4", version = "0.7.2", features = ["v4_2"], optional = tru
adw = { package = "libadwaita", version = "0.5.2", features = ["v1_1"], optional = true }
async-channel = { version = "2.1.1", optional = true }
keycode = "0.4.0"
once_cell = "1.19.0"
[target.'cfg(unix)'.dependencies]
libc = "0.2.148"
@@ -41,7 +42,7 @@ wayland-protocols = { version="0.31.0", features=["client", "staging", "unstable
wayland-protocols-wlr = { version="0.2.0", features=["client"], optional = true }
wayland-protocols-misc = { version="0.2.0", features=["client"], optional = true }
x11 = { version = "2.21.0", features = ["xlib", "xtest"], optional = true }
ashpd = { version = "0.6.2", default-features = false, features = ["tokio"], optional = true }
ashpd = { version = "0.8", default-features = false, features = ["tokio"], optional = true }
reis = { git = "https://github.com/ids1024/reis", features = [ "tokio" ], optional = true }
[target.'cfg(target_os="macos")'.dependencies]

View File

@@ -28,7 +28,7 @@ rustPlatform.buildRustPackage {
cargoLock.lockFile = ../Cargo.lock;
cargoLock.outputHashes = {
"reis-0.1.0" = "sha256-sRZqm6QdmgqfkTjEENV8erQd+0RL5z1+qjdmY18W3bA=";
"reis-0.1.0" = "sha256-QhsRIkzW3wgOlcHpkx3axjS8Vfed00Uf36av9ossPwQ=";
};
# Set Environment Variables

View File

@@ -1,15 +1,17 @@
use std::{
collections::HashMap,
io,
os::{
fd::{FromRawFd, RawFd},
unix::net::UnixStream,
},
os::{fd::OwnedFd, unix::net::UnixStream},
time::{SystemTime, UNIX_EPOCH},
};
use anyhow::{anyhow, Result};
use ashpd::desktop::remote_desktop::{DeviceType, RemoteDesktop};
use ashpd::{
desktop::{
remote_desktop::{DeviceType, RemoteDesktop},
ResponseError,
},
WindowIdentifier,
};
use async_trait::async_trait;
use futures::StreamExt;
@@ -43,22 +45,34 @@ pub struct LibeiConsumer {
serial: u32,
}
async fn get_ei_fd() -> Result<RawFd, ashpd::Error> {
async fn get_ei_fd() -> Result<OwnedFd, ashpd::Error> {
let proxy = RemoteDesktop::new().await?;
let session = proxy.create_session().await?;
// I HATE EVERYTHING, THIS TOOK 8 HOURS OF DEBUGGING
proxy
.select_devices(
&session,
DeviceType::Pointer | DeviceType::Keyboard | DeviceType::Touchscreen,
)
.await?;
// retry when user presses the cancel button
let (session, _) = loop {
log::debug!("creating session ...");
let session = proxy.create_session().await?;
log::debug!("selecting devices ...");
proxy
.select_devices(&session, DeviceType::Keyboard | DeviceType::Pointer)
.await?;
log::info!("requesting permission for input emulation");
match proxy
.start(&session, &WindowIdentifier::default())
.await?
.response()
{
Ok(d) => break (session, d),
Err(ashpd::Error::Response(ResponseError::Cancelled)) => {
log::warn!("request cancelled!");
continue;
}
e => e?,
};
};
proxy
.start(&session, &ashpd::WindowIdentifier::default())
.await?
.response()?;
proxy.connect_to_eis(&session).await
}
@@ -66,15 +80,7 @@ impl LibeiConsumer {
pub async fn new() -> Result<Self> {
// fd is owned by the message, so we need to dup it
let eifd = get_ei_fd().await?;
let eifd = unsafe {
let ret = libc::dup(eifd);
if ret < 0 {
Err(io::Error::last_os_error())
} else {
Ok(ret)
}
}?;
let stream = unsafe { UnixStream::from_raw_fd(eifd) };
let stream = UnixStream::from(eifd);
// let stream = UnixStream::connect("/run/user/1000/eis-0")?;
stream.set_nonblocking(true)?;
let context = ei::Context::new(stream)?;

View File

@@ -2,7 +2,7 @@ use anyhow::Result;
use ashpd::{
desktop::{
remote_desktop::{Axis, DeviceType, KeyState, RemoteDesktop},
Session,
ResponseError, Session,
},
WindowIdentifier,
};
@@ -26,17 +26,32 @@ impl<'a> DesktopPortalConsumer<'a> {
pub async fn new() -> Result<DesktopPortalConsumer<'a>> {
log::debug!("connecting to org.freedesktop.portal.RemoteDesktop portal ...");
let proxy = RemoteDesktop::new().await?;
log::debug!("creating session ...");
let session = proxy.create_session().await?;
log::debug!("selecting devices ...");
proxy
.select_devices(&session, DeviceType::Keyboard | DeviceType::Pointer)
.await?;
let _ = proxy
.start(&session, &WindowIdentifier::default())
.await?
.response()?;
// retry when user presses the cancel button
let (session, _) = loop {
log::debug!("creating session ...");
let session = proxy.create_session().await?;
log::debug!("selecting devices ...");
proxy
.select_devices(&session, DeviceType::Keyboard | DeviceType::Pointer)
.await?;
log::info!("requesting permission for input emulation");
match proxy
.start(&session, &WindowIdentifier::default())
.await?
.response()
{
Ok(d) => break (session, d),
Err(ashpd::Error::Response(ResponseError::Cancelled)) => {
log::warn!("request cancelled!");
continue;
}
e => e?,
};
};
log::debug!("started session");
Ok(Self { proxy, session })

View File

@@ -1,39 +1,555 @@
use anyhow::{anyhow, Result};
use std::{io, task::Poll};
use ashpd::{
desktop::{
input_capture::{Activated, Barrier, BarrierID, Capabilities, InputCapture, Region, Zones},
ResponseError, Session,
},
enumflags2::BitFlags,
};
use futures::StreamExt;
use reis::{
ei::{self, keyboard::KeyState},
eis::button::ButtonState,
event::{DeviceCapability, EiEvent},
tokio::{EiConvertEventStream, EiEventStream},
};
use std::{
cell::Cell,
collections::HashMap,
io,
os::unix::net::UnixStream,
pin::Pin,
rc::Rc,
task::{ready, Context, Poll},
};
use tokio::{
sync::mpsc::{Receiver, Sender},
task::JoinHandle,
};
use futures_core::Stream;
use once_cell::sync::Lazy;
use crate::{
client::{ClientEvent, ClientHandle},
event::Event,
client::{ClientEvent, ClientHandle, Position},
event::{Event, KeyboardEvent, PointerEvent},
producer::EventProducer,
};
pub struct LibeiProducer {}
#[derive(Debug)]
enum ProducerEvent {
Release,
ClientEvent(ClientEvent),
}
impl LibeiProducer {
pub fn new() -> Result<Self> {
Err(anyhow!("not implemented"))
#[allow(dead_code)]
pub struct LibeiProducer<'a> {
input_capture: Pin<Box<InputCapture<'a>>>,
libei_task: JoinHandle<Result<()>>,
event_rx: tokio::sync::mpsc::Receiver<(u32, Event)>,
notify_tx: tokio::sync::mpsc::Sender<ProducerEvent>,
}
static INTERFACES: Lazy<HashMap<&'static str, u32>> = Lazy::new(|| {
let mut m = HashMap::new();
m.insert("ei_connection", 1);
m.insert("ei_callback", 1);
m.insert("ei_pingpong", 1);
m.insert("ei_seat", 1);
m.insert("ei_device", 2);
m.insert("ei_pointer", 1);
m.insert("ei_pointer_absolute", 1);
m.insert("ei_scroll", 1);
m.insert("ei_button", 1);
m.insert("ei_keyboard", 1);
m.insert("ei_touchscreen", 1);
m
});
fn pos_to_barrier(r: &Region, pos: Position) -> (i32, i32, i32, i32) {
let (x, y) = (r.x_offset(), r.y_offset());
let (width, height) = (r.width() as i32, r.height() as i32);
match pos {
Position::Left => (x, y, x, y + height - 1), // start pos, end pos, inclusive
Position::Right => (x + width, y, x + width, y + height - 1),
Position::Top => (x, y, x + width - 1, y),
Position::Bottom => (x, y + height, x + width - 1, y + height),
}
}
impl EventProducer for LibeiProducer {
fn notify(&mut self, _event: ClientEvent) -> io::Result<()> {
fn select_barriers(
zones: &Zones,
clients: &Vec<(ClientHandle, Position)>,
next_barrier_id: &mut u32,
) -> (Vec<Barrier>, HashMap<BarrierID, ClientHandle>) {
let mut client_for_barrier = HashMap::new();
let mut barriers: Vec<Barrier> = vec![];
for (handle, pos) in clients {
let mut client_barriers = zones
.regions()
.iter()
.map(|r| {
let id = *next_barrier_id;
*next_barrier_id = id + 1;
let position = pos_to_barrier(r, *pos);
client_for_barrier.insert(id, *handle);
Barrier::new(id, position)
})
.collect();
barriers.append(&mut client_barriers);
}
(barriers, client_for_barrier)
}
async fn update_barriers(
input_capture: &InputCapture<'_>,
session: &Session<'_>,
active_clients: &Vec<(ClientHandle, Position)>,
next_barrier_id: &mut u32,
) -> Result<HashMap<BarrierID, ClientHandle>> {
let zones = input_capture.zones(session).await?.response()?;
log::debug!("zones: {zones:?}");
let (barriers, id_map) = select_barriers(&zones, active_clients, next_barrier_id);
log::debug!("barriers: {barriers:?}");
log::debug!("client for barrier id: {id_map:?}");
let response = input_capture
.set_pointer_barriers(session, &barriers, zones.zone_set())
.await?;
let response = response.response()?;
log::info!("{response:?}");
Ok(id_map)
}
impl<'a> Drop for LibeiProducer<'a> {
fn drop(&mut self) {
self.libei_task.abort();
}
}
async fn create_session<'a>(
input_capture: &'a InputCapture<'a>,
) -> Result<(Session<'a>, BitFlags<Capabilities>)> {
log::info!("creating input capture session");
let (session, capabilities) = loop {
match input_capture
.create_session(
&ashpd::WindowIdentifier::default(),
Capabilities::Keyboard | Capabilities::Pointer | Capabilities::Touchscreen,
)
.await
{
Ok(s) => break s,
Err(ashpd::Error::Response(ResponseError::Cancelled)) => continue,
o => o?,
};
};
log::debug!("capabilities: {capabilities:?}");
Ok((session, capabilities))
}
async fn connect_to_eis(
input_capture: &InputCapture<'_>,
session: &Session<'_>,
) -> Result<(ei::Context, EiConvertEventStream)> {
log::info!("connect_to_eis");
let fd = input_capture.connect_to_eis(session).await?;
// create unix stream from fd
let stream = UnixStream::from(fd);
stream.set_nonblocking(true)?;
// create ei context
let context = ei::Context::new(stream)?;
let mut event_stream = EiEventStream::new(context.clone())?;
let response = match reis::tokio::ei_handshake(
&mut event_stream,
"de.feschber.LanMouse",
ei::handshake::ContextType::Receiver,
&INTERFACES,
)
.await
{
Ok(res) => res,
Err(e) => return Err(anyhow!("ei handshake failed: {e:?}")),
};
let event_stream = EiConvertEventStream::new(event_stream, response.serial);
Ok((context, event_stream))
}
async fn libei_event_handler(
mut ei_event_stream: EiConvertEventStream,
context: ei::Context,
event_tx: Sender<(u32, Event)>,
current_client: Rc<Cell<Option<ClientHandle>>>,
) -> Result<()> {
loop {
let ei_event = match ei_event_stream.next().await {
Some(Ok(event)) => event,
Some(Err(e)) => return Err(anyhow!("libei connection closed: {e:?}")),
None => return Err(anyhow!("libei connection closed")),
};
log::trace!("from ei: {ei_event:?}");
let client = current_client.get();
handle_ei_event(ei_event, client, &context, &event_tx).await;
}
}
async fn wait_for_active_client(
notify_rx: &mut Receiver<ProducerEvent>,
active_clients: &mut Vec<(ClientHandle, Position)>,
) -> Result<()> {
// wait for a client update
while let Some(producer_event) = notify_rx.recv().await {
if let ProducerEvent::ClientEvent(c) = producer_event {
handle_producer_event(ProducerEvent::ClientEvent(c), active_clients)?;
break;
}
}
Ok(())
}
impl<'a> LibeiProducer<'a> {
pub async fn new() -> Result<Self> {
let input_capture = Box::pin(InputCapture::new().await?);
let input_capture_ptr = input_capture.as_ref().get_ref() as *const InputCapture<'static>;
let mut first_session = Some(create_session(unsafe { &*input_capture_ptr }).await?);
let (event_tx, event_rx) = tokio::sync::mpsc::channel(32);
let (notify_tx, mut notify_rx) = tokio::sync::mpsc::channel(32);
let libei_task = tokio::task::spawn_local(async move {
/* safety: libei_task does not outlive Self */
let input_capture = unsafe { &*input_capture_ptr };
let mut active_clients: Vec<(ClientHandle, Position)> = vec![];
let mut next_barrier_id = 1u32;
/* there is a bug in xdg-remote-desktop-portal-gnome / mutter that
* prevents receiving further events after a session has been disabled once.
* Therefore the session needs to recreated when the barriers are updated */
loop {
// otherwise it asks to capture input even with no active clients
if active_clients.is_empty() {
wait_for_active_client(&mut notify_rx, &mut active_clients).await?;
continue;
}
let current_client = Rc::new(Cell::new(None));
// create session
let (session, _) = match first_session.take() {
Some(s) => s,
_ => create_session(input_capture).await?,
};
// connect to eis server
let (context, ei_event_stream) = connect_to_eis(input_capture, &session).await?;
// async event task
let mut ei_task: JoinHandle<Result<(), anyhow::Error>> =
tokio::task::spawn_local(libei_event_handler(
ei_event_stream,
context,
event_tx.clone(),
current_client.clone(),
));
let mut activated = input_capture.receive_activated().await?;
let mut zones_changed = input_capture.receive_zones_changed().await?;
// set barriers
let client_for_barrier_id = update_barriers(
input_capture,
&session,
&active_clients,
&mut next_barrier_id,
)
.await?;
log::info!("enabling session");
input_capture.enable(&session).await?;
loop {
tokio::select! {
activated = activated.next() => {
let activated = activated.ok_or(anyhow!("error receiving activation token"))?;
log::debug!("activated: {activated:?}");
let client = *client_for_barrier_id
.get(&activated.barrier_id())
.expect("invalid barrier id");
current_client.replace(Some(client));
event_tx.send((client, Event::Enter())).await?;
tokio::select! {
producer_event = notify_rx.recv() => {
let producer_event = producer_event.expect("channel closed");
if handle_producer_event(producer_event, &mut active_clients)? {
break; /* clients updated */
}
}
zones_changed = zones_changed.next() => {
log::debug!("zones changed: {zones_changed:?}");
break;
}
res = &mut ei_task => {
if let Err(e) = res.expect("ei task paniced") {
log::warn!("libei task exited: {e}");
}
break;
}
}
release_capture(
input_capture,
&session,
activated,
client,
&active_clients,
).await?;
}
producer_event = notify_rx.recv() => {
let producer_event = producer_event.expect("channel closed");
if handle_producer_event(producer_event, &mut active_clients)? {
/* clients updated */
break;
}
},
res = &mut ei_task => {
if let Err(e) = res.expect("ei task paniced") {
log::warn!("libei task exited: {e}");
}
break;
}
}
}
ei_task.abort();
input_capture.disable(&session).await?;
}
});
let producer = Self {
input_capture,
event_rx,
libei_task,
notify_tx,
};
Ok(producer)
}
}
async fn release_capture(
input_capture: &InputCapture<'_>,
session: &Session<'_>,
activated: Activated,
current_client: ClientHandle,
active_clients: &[(ClientHandle, Position)],
) -> Result<()> {
log::debug!("releasing input capture {}", activated.activation_id());
let (x, y) = activated.cursor_position();
let pos = active_clients
.iter()
.filter(|(c, _)| *c == current_client)
.map(|(_, p)| p)
.next()
.unwrap(); // FIXME
let (dx, dy) = match pos {
// offset cursor position to not enter again immediately
Position::Left => (1., 0.),
Position::Right => (-1., 0.),
Position::Top => (0., 1.),
Position::Bottom => (0., -1.),
};
// release 1px to the right of the entered zone
let cursor_position = (x as f64 + dx, y as f64 + dy);
input_capture
.release(session, activated.activation_id(), cursor_position)
.await?;
Ok(())
}
fn handle_producer_event(
producer_event: ProducerEvent,
active_clients: &mut Vec<(ClientHandle, Position)>,
) -> Result<bool> {
log::debug!("handling event: {producer_event:?}");
let updated = match producer_event {
ProducerEvent::Release => false,
ProducerEvent::ClientEvent(ClientEvent::Create(c, p)) => {
active_clients.push((c, p));
true
}
ProducerEvent::ClientEvent(ClientEvent::Destroy(c)) => {
active_clients.retain(|(h, _)| *h != c);
true
}
};
Ok(updated)
}
async fn handle_ei_event(
ei_event: EiEvent,
current_client: Option<ClientHandle>,
context: &ei::Context,
event_tx: &Sender<(u32, Event)>,
) {
match ei_event {
EiEvent::SeatAdded(s) => {
s.seat.bind_capabilities(&[
DeviceCapability::Pointer,
DeviceCapability::PointerAbsolute,
DeviceCapability::Keyboard,
DeviceCapability::Touch,
DeviceCapability::Scroll,
DeviceCapability::Button,
]);
context.flush().unwrap();
}
EiEvent::SeatRemoved(_) => {}
EiEvent::DeviceAdded(_) => {}
EiEvent::DeviceRemoved(_) => {}
EiEvent::DevicePaused(_) => {}
EiEvent::DeviceResumed(_) => {}
EiEvent::KeyboardModifiers(mods) => {
let modifier_event = KeyboardEvent::Modifiers {
mods_depressed: mods.depressed,
mods_latched: mods.latched,
mods_locked: mods.locked,
group: mods.group,
};
if let Some(current_client) = current_client {
event_tx
.send((current_client, Event::Keyboard(modifier_event)))
.await
.unwrap();
}
}
EiEvent::Frame(_) => {}
EiEvent::DeviceStartEmulating(_) => {
log::debug!("START EMULATING =============>");
}
EiEvent::DeviceStopEmulating(_) => {
log::debug!("==================> STOP EMULATING");
}
EiEvent::PointerMotion(motion) => {
let motion_event = PointerEvent::Motion {
time: motion.time as u32,
relative_x: motion.dx as f64,
relative_y: motion.dy as f64,
};
if let Some(current_client) = current_client {
event_tx
.send((current_client, Event::Pointer(motion_event)))
.await
.unwrap();
}
}
EiEvent::PointerMotionAbsolute(_) => {}
EiEvent::Button(button) => {
let button_event = PointerEvent::Button {
time: button.time as u32,
button: button.button,
state: match button.state {
ButtonState::Released => 0,
ButtonState::Press => 1,
},
};
if let Some(current_client) = current_client {
event_tx
.send((current_client, Event::Pointer(button_event)))
.await
.unwrap();
}
}
EiEvent::ScrollDelta(_) => {}
EiEvent::ScrollStop(_) => {}
EiEvent::ScrollCancel(_) => {}
EiEvent::ScrollDiscrete(scroll) => {
if scroll.discrete_dy != 0 {
let event = PointerEvent::Axis {
time: 0,
axis: 0,
value: scroll.discrete_dy as f64,
};
if let Some(current_client) = current_client {
event_tx
.send((current_client, Event::Pointer(event)))
.await
.unwrap();
}
}
if scroll.discrete_dx != 0 {
let event = PointerEvent::Axis {
time: 0,
axis: 1,
value: scroll.discrete_dx as f64,
};
if let Some(current_client) = current_client {
event_tx
.send((current_client, Event::Pointer(event)))
.await
.unwrap();
}
};
}
EiEvent::KeyboardKey(key) => {
let key_event = KeyboardEvent::Key {
key: key.key,
state: match key.state {
KeyState::Press => 1,
KeyState::Released => 0,
},
time: key.time as u32,
};
if let Some(current_client) = current_client {
event_tx
.send((current_client, Event::Keyboard(key_event)))
.await
.unwrap();
}
}
EiEvent::TouchDown(_) => {}
EiEvent::TouchUp(_) => {}
EiEvent::TouchMotion(_) => {}
EiEvent::Disconnected(d) => {
log::error!("disconnect: {d:?}");
}
}
}
impl<'a> EventProducer for LibeiProducer<'a> {
fn notify(&mut self, event: ClientEvent) -> io::Result<()> {
let notify_tx = self.notify_tx.clone();
tokio::task::spawn_local(async move {
log::debug!("notifying {event:?}");
let _ = notify_tx.send(ProducerEvent::ClientEvent(event)).await;
log::debug!("done !");
});
Ok(())
}
fn release(&mut self) -> io::Result<()> {
let notify_tx = self.notify_tx.clone();
tokio::task::spawn_local(async move {
log::debug!("notifying Release");
let _ = notify_tx.send(ProducerEvent::Release).await;
});
Ok(())
}
}
impl Stream for LibeiProducer {
impl<'a> Stream for LibeiProducer<'a> {
type Item = io::Result<(ClientHandle, Event)>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
Poll::Pending
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match ready!(self.event_rx.poll_recv(cx)) {
None => Poll::Ready(None),
Some(e) => Poll::Ready(Some(Ok(e))),
}
}
}

View File

@@ -22,7 +22,7 @@ pub async fn create() -> Box<dyn EventProducer> {
}
#[cfg(all(unix, feature = "libei", not(target_os = "macos")))]
match producer::libei::LibeiProducer::new() {
match producer::libei::LibeiProducer::new().await {
Ok(p) => {
log::info!("using libei event producer");
return Box::new(p);

View File

@@ -150,6 +150,7 @@ impl Server {
let _ = resolve_tx.send(DnsRequest { hostname, handle }).await;
}
}
log::info!("running service");
tokio::select! {
_ = signal::ctrl_c() => {

View File

@@ -240,22 +240,24 @@ pub async fn remove_client(
frontend: &mut FrontendListener,
client: ClientHandle,
) -> Option<ClientHandle> {
let _ = producer_notify_tx
.send(ProducerEvent::ClientEvent(ClientEvent::Destroy(client)))
.await;
let _ = consumer_notify_tx
.send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(client)))
.await;
let Some(client) = server
let Some((client, active)) = server
.client_manager
.borrow_mut()
.remove_client(client)
.map(|s| s.client.handle)
.map(|s| (s.client.handle, s.active))
else {
return None;
};
if active {
let _ = producer_notify_tx
.send(ProducerEvent::ClientEvent(ClientEvent::Destroy(client)))
.await;
let _ = consumer_notify_tx
.send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(client)))
.await;
}
let notify = FrontendNotify::NotifyClientDelete(client);
log::debug!("{notify:?}");
if let Err(e) = frontend.notify_all(notify).await {
@@ -272,6 +274,7 @@ async fn update_client(
client_update: (ClientHandle, Option<String>, u16, Position),
) {
let (handle, hostname, port, pos) = client_update;
let mut changed = false;
let (hostname, handle, active) = {
// retrieve state
let mut client_manager = server.client_manager.borrow_mut();
@@ -280,12 +283,16 @@ async fn update_client(
};
// update pos
state.client.pos = pos;
if state.client.pos != pos {
state.client.pos = pos;
changed = true;
}
// update port
if state.client.port != port {
state.client.port = port;
state.active_addr = state.active_addr.map(|a| SocketAddr::new(a.ip(), port));
changed = true;
}
// update hostname
@@ -293,6 +300,7 @@ async fn update_client(
state.client.ips = HashSet::new();
state.active_addr = None;
state.client.hostname = hostname;
changed = true;
}
log::debug!("client updated: {:?}", state);
@@ -303,13 +311,14 @@ async fn update_client(
)
};
// resolve dns
if let Some(hostname) = hostname {
let _ = resolve_tx.send(DnsRequest { hostname, handle }).await;
}
// update state in event consumer & producer
if active {
if changed && active {
// resolve dns
if let Some(hostname) = hostname {
let _ = resolve_tx.send(DnsRequest { hostname, handle }).await;
}
// update state
let _ = producer_notify_tx
.send(ProducerEvent::ClientEvent(ClientEvent::Destroy(handle)))
.await;

View File

@@ -37,8 +37,11 @@ pub fn new(
loop {
tokio::select! {
event = producer.next() => {
let event = event.ok_or(anyhow!("event producer closed"))??;
handle_producer_event(&server, &mut producer, &sender_tx, &timer_tx, event, &mut pressed_keys, &release_bind).await?;
match event {
Some(Ok(event)) => handle_producer_event(&server, &mut producer, &sender_tx, &timer_tx, event, &mut pressed_keys, &release_bind).await?,
Some(Err(e)) => return Err(anyhow!("event producer: {e:?}")),
None => return Err(anyhow!("event producer closed")),
}
}
e = rx.recv() => {
log::debug!("producer notify rx: {e:?}");