release stuck keys (#53)

Keys are now released when
- A client disconnects and still has pressed keys
- A client disconnects through CTLR+ALT+SHIFT+WIN
- Lan Mouse terminates with keys still being pressed through a remote client

This is also fixes an issue caused by KDE's implementation of the remote desktop portal backend:
Keys are not correctly released when a remote desktop session is closed while keys are still pressed,
causing them to be permanently stuck until kwin is restarted.

This workaround remembers all pressed keys and releases them when lan-mouse exits or a device disconnects.

closes #15
This commit is contained in:
Ferdinand Schober
2023-12-31 15:41:06 +01:00
committed by Ferdinand Schober
parent 6a6d9a9fa9
commit 64e3bf3ff4
6 changed files with 421 additions and 297 deletions

View File

@@ -568,6 +568,7 @@ impl EventProducer for WaylandEventProducer {
}
fn release(&mut self) {
log::debug!("releasing pointer");
let inner = self.0.get_mut();
inner.state.ungrab();
inner.flush_events();

View File

@@ -2,7 +2,6 @@ use std::{
collections::HashSet,
fmt::Display,
net::{IpAddr, SocketAddr},
time::Instant,
};
use serde::{Deserialize, Serialize};
@@ -57,10 +56,6 @@ pub struct Client {
/// This way any event consumer / producer backend does not
/// need to know anything about a client other than its handle.
pub handle: ClientHandle,
/// `active` address of the client, used to send data to.
/// This should generally be the socket address where data
/// was last received from.
pub active_addr: Option<SocketAddr>,
/// all socket addresses associated with a particular client
/// e.g. Laptops usually have at least an ethernet and a wifi port
/// which have different ip addresses
@@ -71,7 +66,7 @@ pub struct Client {
pub pos: Position,
}
#[derive(Debug)]
#[derive(Clone, Copy, Debug)]
pub enum ClientEvent {
Create(ClientHandle, Position),
Destroy(ClientHandle),
@@ -81,11 +76,18 @@ pub type ClientHandle = u32;
#[derive(Debug, Clone)]
pub struct ClientState {
/// information about the client
pub client: Client,
/// events should be sent to and received from the client
pub active: bool,
pub last_ping: Option<Instant>,
pub last_seen: Option<Instant>,
pub last_replied: Option<Instant>,
/// `active` address of the client, used to send data to.
/// This should generally be the socket address where data
/// was last received from.
pub active_addr: Option<SocketAddr>,
/// tracks whether or not the client is responding to pings
pub alive: bool,
/// keys currently pressed by this client
pub pressed_keys: HashSet<u32>,
}
pub struct ClientManager {
@@ -114,9 +116,6 @@ impl ClientManager {
// get a new client_handle
let handle = self.free_id();
// we dont know, which IP is initially active
let active_addr = None;
// store fix ip addresses
let fix_ips = ips.iter().cloned().collect();
@@ -128,7 +127,6 @@ impl ClientManager {
hostname,
fix_ips,
handle,
active_addr,
addrs,
port,
pos,
@@ -137,10 +135,10 @@ impl ClientManager {
// client was never seen, nor pinged
let client_state = ClientState {
client,
last_ping: None,
last_seen: None,
last_replied: None,
active: false,
active_addr: None,
alive: false,
pressed_keys: HashSet::new(),
};
if handle as usize >= self.clients.len() {

View File

@@ -1,3 +1,4 @@
use anyhow::{anyhow, Result};
use std::{
error::Error,
fmt::{self, Display},
@@ -65,6 +66,9 @@ pub enum Event {
/// response to a ping event: this event signals that a client
/// is still alive but must otherwise be ignored
Pong(),
/// explicit disconnect request. The client will no longer
/// send events until the next Enter event. All of its keys should be released.
Disconnect(),
}
impl Display for PointerEvent {
@@ -120,6 +124,7 @@ impl Display for Event {
Event::Leave() => write!(f, "leave"),
Event::Ping() => write!(f, "ping"),
Event::Pong() => write!(f, "pong"),
Event::Disconnect() => write!(f, "disconnect"),
}
}
}
@@ -133,6 +138,7 @@ impl Event {
Self::Leave() => EventType::Leave,
Self::Ping() => EventType::Ping,
Self::Pong() => EventType::Pong,
Self::Disconnect() => EventType::Disconnect,
}
}
}
@@ -174,18 +180,19 @@ enum EventType {
Leave,
Ping,
Pong,
Disconnect,
}
impl TryFrom<u8> for PointerEventType {
type Error = Box<dyn Error>;
type Error = anyhow::Error;
fn try_from(value: u8) -> Result<Self, Self::Error> {
fn try_from(value: u8) -> Result<Self> {
match value {
x if x == Self::Motion as u8 => Ok(Self::Motion),
x if x == Self::Button as u8 => Ok(Self::Button),
x if x == Self::Axis as u8 => Ok(Self::Axis),
x if x == Self::Frame as u8 => Ok(Self::Frame),
_ => Err(Box::new(ProtocolError {
_ => Err(anyhow!(ProtocolError {
msg: format!("invalid pointer event type {}", value),
})),
}
@@ -193,13 +200,13 @@ impl TryFrom<u8> for PointerEventType {
}
impl TryFrom<u8> for KeyboardEventType {
type Error = Box<dyn Error>;
type Error = anyhow::Error;
fn try_from(value: u8) -> Result<Self, Self::Error> {
fn try_from(value: u8) -> Result<Self> {
match value {
x if x == Self::Key as u8 => Ok(Self::Key),
x if x == Self::Modifiers as u8 => Ok(Self::Modifiers),
_ => Err(Box::new(ProtocolError {
_ => Err(anyhow!(ProtocolError {
msg: format!("invalid keyboard event type {}", value),
})),
}
@@ -216,6 +223,7 @@ impl From<&Event> for Vec<u8> {
Event::Leave() => vec![],
Event::Ping() => vec![],
Event::Pong() => vec![],
Event::Disconnect() => vec![],
};
[event_id, event_data].concat()
}
@@ -234,9 +242,9 @@ impl fmt::Display for ProtocolError {
impl Error for ProtocolError {}
impl TryFrom<Vec<u8>> for Event {
type Error = Box<dyn Error>;
type Error = anyhow::Error;
fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
fn try_from(value: Vec<u8>) -> Result<Self> {
let event_id = u8::from_be_bytes(value[..1].try_into()?);
match event_id {
i if i == (EventType::Pointer as u8) => Ok(Event::Pointer(value.try_into()?)),
@@ -245,7 +253,8 @@ impl TryFrom<Vec<u8>> for Event {
i if i == (EventType::Leave as u8) => Ok(Event::Leave()),
i if i == (EventType::Ping as u8) => Ok(Event::Ping()),
i if i == (EventType::Pong as u8) => Ok(Event::Pong()),
_ => Err(Box::new(ProtocolError {
i if i == (EventType::Disconnect as u8) => Ok(Event::Disconnect()),
_ => Err(anyhow!(ProtocolError {
msg: format!("invalid event_id {}", event_id),
})),
}
@@ -291,9 +300,9 @@ impl From<&PointerEvent> for Vec<u8> {
}
impl TryFrom<Vec<u8>> for PointerEvent {
type Error = Box<dyn Error>;
type Error = anyhow::Error;
fn try_from(data: Vec<u8>) -> Result<Self, Self::Error> {
fn try_from(data: Vec<u8>) -> Result<Self> {
match data.get(1) {
Some(id) => {
let event_type = match id.to_owned().try_into() {
@@ -305,7 +314,7 @@ impl TryFrom<Vec<u8>> for PointerEvent {
let time = match data.get(2..6) {
Some(d) => u32::from_be_bytes(d.try_into()?),
None => {
return Err(Box::new(ProtocolError {
return Err(anyhow!(ProtocolError {
msg: "Expected 4 Bytes at index 2".into(),
}))
}
@@ -313,7 +322,7 @@ impl TryFrom<Vec<u8>> for PointerEvent {
let relative_x = match data.get(6..14) {
Some(d) => f64::from_be_bytes(d.try_into()?),
None => {
return Err(Box::new(ProtocolError {
return Err(anyhow!(ProtocolError {
msg: "Expected 8 Bytes at index 6".into(),
}))
}
@@ -321,7 +330,7 @@ impl TryFrom<Vec<u8>> for PointerEvent {
let relative_y = match data.get(14..22) {
Some(d) => f64::from_be_bytes(d.try_into()?),
None => {
return Err(Box::new(ProtocolError {
return Err(anyhow!(ProtocolError {
msg: "Expected 8 Bytes at index 14".into(),
}))
}
@@ -336,7 +345,7 @@ impl TryFrom<Vec<u8>> for PointerEvent {
let time = match data.get(2..6) {
Some(d) => u32::from_be_bytes(d.try_into()?),
None => {
return Err(Box::new(ProtocolError {
return Err(anyhow!(ProtocolError {
msg: "Expected 4 Bytes at index 2".into(),
}))
}
@@ -344,7 +353,7 @@ impl TryFrom<Vec<u8>> for PointerEvent {
let button = match data.get(6..10) {
Some(d) => u32::from_be_bytes(d.try_into()?),
None => {
return Err(Box::new(ProtocolError {
return Err(anyhow!(ProtocolError {
msg: "Expected 4 Bytes at index 10".into(),
}))
}
@@ -352,7 +361,7 @@ impl TryFrom<Vec<u8>> for PointerEvent {
let state = match data.get(10..14) {
Some(d) => u32::from_be_bytes(d.try_into()?),
None => {
return Err(Box::new(ProtocolError {
return Err(anyhow!(ProtocolError {
msg: "Expected 4 Bytes at index 14".into(),
}))
}
@@ -367,7 +376,7 @@ impl TryFrom<Vec<u8>> for PointerEvent {
let time = match data.get(2..6) {
Some(d) => u32::from_be_bytes(d.try_into()?),
None => {
return Err(Box::new(ProtocolError {
return Err(anyhow!(ProtocolError {
msg: "Expected 4 Bytes at index 2".into(),
}))
}
@@ -375,7 +384,7 @@ impl TryFrom<Vec<u8>> for PointerEvent {
let axis = match data.get(6) {
Some(d) => *d,
None => {
return Err(Box::new(ProtocolError {
return Err(anyhow!(ProtocolError {
msg: "Expected 1 Byte at index 6".into(),
}));
}
@@ -383,7 +392,7 @@ impl TryFrom<Vec<u8>> for PointerEvent {
let value = match data.get(7..15) {
Some(d) => f64::from_be_bytes(d.try_into()?),
None => {
return Err(Box::new(ProtocolError {
return Err(anyhow!(ProtocolError {
msg: "Expected 8 Bytes at index 7".into(),
}));
}
@@ -393,7 +402,7 @@ impl TryFrom<Vec<u8>> for PointerEvent {
PointerEventType::Frame => Ok(Self::Frame {}),
}
}
None => Err(Box::new(ProtocolError {
None => Err(anyhow!(ProtocolError {
msg: "Expected an element at index 0".into(),
})),
}
@@ -434,9 +443,9 @@ impl From<&KeyboardEvent> for Vec<u8> {
}
impl TryFrom<Vec<u8>> for KeyboardEvent {
type Error = Box<dyn Error>;
type Error = anyhow::Error;
fn try_from(data: Vec<u8>) -> Result<Self, Self::Error> {
fn try_from(data: Vec<u8>) -> Result<Self> {
match data.get(1) {
Some(id) => {
let event_type = match id.to_owned().try_into() {
@@ -448,7 +457,7 @@ impl TryFrom<Vec<u8>> for KeyboardEvent {
let time = match data.get(2..6) {
Some(d) => u32::from_be_bytes(d.try_into()?),
None => {
return Err(Box::new(ProtocolError {
return Err(anyhow!(ProtocolError {
msg: "Expected 4 Bytes at index 6".into(),
}))
}
@@ -456,7 +465,7 @@ impl TryFrom<Vec<u8>> for KeyboardEvent {
let key = match data.get(6..10) {
Some(d) => u32::from_be_bytes(d.try_into()?),
None => {
return Err(Box::new(ProtocolError {
return Err(anyhow!(ProtocolError {
msg: "Expected 4 Bytes at index 10".into(),
}))
}
@@ -464,7 +473,7 @@ impl TryFrom<Vec<u8>> for KeyboardEvent {
let state = match data.get(10) {
Some(d) => *d,
None => {
return Err(Box::new(ProtocolError {
return Err(anyhow!(ProtocolError {
msg: "Expected 1 Bytes at index 14".into(),
}))
}
@@ -475,7 +484,7 @@ impl TryFrom<Vec<u8>> for KeyboardEvent {
let mods_depressed = match data.get(2..6) {
Some(d) => u32::from_be_bytes(d.try_into()?),
None => {
return Err(Box::new(ProtocolError {
return Err(anyhow!(ProtocolError {
msg: "Expected 4 Bytes at index 6".into(),
}))
}
@@ -483,7 +492,7 @@ impl TryFrom<Vec<u8>> for KeyboardEvent {
let mods_latched = match data.get(6..10) {
Some(d) => u32::from_be_bytes(d.try_into()?),
None => {
return Err(Box::new(ProtocolError {
return Err(anyhow!(ProtocolError {
msg: "Expected 4 Bytes at index 10".into(),
}))
}
@@ -491,7 +500,7 @@ impl TryFrom<Vec<u8>> for KeyboardEvent {
let mods_locked = match data.get(10..14) {
Some(d) => u32::from_be_bytes(d.try_into()?),
None => {
return Err(Box::new(ProtocolError {
return Err(anyhow!(ProtocolError {
msg: "Expected 4 Bytes at index 14".into(),
}))
}
@@ -499,7 +508,7 @@ impl TryFrom<Vec<u8>> for KeyboardEvent {
let group = match data.get(14..18) {
Some(d) => u32::from_be_bytes(d.try_into()?),
None => {
return Err(Box::new(ProtocolError {
return Err(anyhow!(ProtocolError {
msg: "Expected 4 Bytes at index 18".into(),
}))
}
@@ -513,7 +522,7 @@ impl TryFrom<Vec<u8>> for KeyboardEvent {
}
}
}
None => Err(Box::new(ProtocolError {
None => Err(anyhow!(ProtocolError {
msg: "Expected an element at index 0".into(),
})),
}

View File

@@ -108,6 +108,7 @@ pub enum FrontendNotify {
NotifyClientDelete(ClientHandle),
/// new port, reason of failure (if failed)
NotifyPortChange(u16, Option<String>),
/// Client State, active
Enumerate(Vec<(Client, bool)>),
NotifyError(String),
}

View File

@@ -65,7 +65,9 @@ fn run_service(config: &Config) -> Result<()> {
runtime.block_on(LocalSet::new().run_until(async {
// run main loop
log::info!("Press Ctrl+Alt+Shift+Super to release the mouse");
Server::run(config).await?;
let server = Server::new(config);
server.run().await?;
log::debug!("service exiting");
anyhow::Ok(())

View File

@@ -4,13 +4,12 @@ use log;
use std::{
cell::{Cell, RefCell},
collections::HashSet,
error::Error,
io::Result,
net::IpAddr,
rc::Rc,
time::{Duration, Instant},
time::Duration,
};
use tokio::{io::ReadHalf, net::UdpSocket, signal, sync::mpsc::Sender, task};
use tokio::{io::ReadHalf, net::UdpSocket, signal, sync::mpsc::Sender};
#[cfg(unix)]
use tokio::net::UnixStream;
@@ -27,6 +26,7 @@ use crate::{
dns,
frontend::{self, FrontendEvent, FrontendListener, FrontendNotify},
producer::EventProducer,
scancode,
};
use crate::{
consumer,
@@ -34,20 +34,37 @@ use crate::{
producer,
};
const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500);
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum State {
/// Currently sending events to another device
Sending,
/// Currently receiving events from other devices
Receiving,
/// Entered the deadzone of another device but waiting
/// for acknowledgement (Leave event) from the device
AwaitingLeave,
}
#[derive(Clone, Copy, Debug)]
pub enum ProducerEvent {
/// producer must release the mouse
Release,
/// producer is notified of a change in client states
ClientEvent(ClientEvent),
/// termination signal
Terminate,
}
#[derive(Clone, Debug)]
pub enum ConsumerEvent {
/// consumer is notified of a change in client states
ClientEvent(ClientEvent),
/// consumer must release keys for client
ReleaseKeys(ClientHandle),
/// termination signal
Terminate,
}
#[derive(Clone)]
@@ -58,14 +75,35 @@ struct ClientUpdate {
pos: Position,
}
pub struct Server {}
#[derive(Clone)]
pub struct Server {
active_client: Rc<Cell<Option<ClientHandle>>>,
client_manager: Rc<RefCell<ClientManager>>,
port: Rc<Cell<u16>>,
state: Rc<Cell<State>>,
}
impl Server {
pub async fn run(config: &Config) -> anyhow::Result<()> {
pub fn new(config: &Config) -> Self {
let active_client = Rc::new(Cell::new(None));
let client_manager = Rc::new(RefCell::new(ClientManager::new()));
let state = Rc::new(Cell::new(State::Receiving));
let port = Rc::new(Cell::new(config.port));
for (ips, host, port, pos) in config.get_clients() {
client_manager.borrow_mut().add_client(host, ips, port, pos);
}
Self {
active_client,
client_manager,
port,
state,
}
}
pub async fn run(&self) -> anyhow::Result<()> {
// create frontend communication adapter
let mut frontend = match FrontendListener::new().await {
Some(Err(e)) => return Err(e),
Some(Ok(f)) => f,
Some(f) => f?,
None => {
// none means some other instance is already running
log::info!("service already running, exiting");
@@ -74,98 +112,67 @@ impl Server {
};
let (mut consumer, mut producer) = tokio::join!(consumer::create(), producer::create());
// create dns resolver
let resolver = dns::DnsResolver::new().await?;
// bind the udp socket
let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), config.port);
let mut socket = UdpSocket::bind(listen_addr).await?;
let (frontend_tx, mut frontend_rx) = tokio::sync::mpsc::channel(1);
// create client manager
let client_manager_rc = Rc::new(RefCell::new(ClientManager::new()));
let state_rc = Rc::new(Cell::new(State::Receiving));
// channel to notify producer
let (producer_notify_tx, mut producer_notify_rx) = tokio::sync::mpsc::channel(32);
// channel to notify consumer
let (consumer_notify_tx, mut consumer_notify_rx) = tokio::sync::mpsc::channel(32);
// channel to request dns resolver
let (resolve_tx, mut resolve_rx) = tokio::sync::mpsc::channel(32);
// channel to send events to frontends
let (frontend_notify_tx, mut frontend_notify_rx) = tokio::sync::mpsc::channel(32);
// channels for udp send / receive
let (receiver_tx, mut receiver_rx) = tokio::sync::mpsc::channel(32);
let (sender_tx, mut sender_rx) = tokio::sync::mpsc::channel(32);
let (port_tx, mut port_rx) = tokio::sync::mpsc::channel(32);
// add clients from config
for (c, h, port, p) in config.get_clients().into_iter() {
Self::add_client(
&resolve_tx,
&client_manager_rc,
&mut frontend,
h,
c,
port,
p,
)
.await;
}
let (timer_tx, mut timer_rx) = tokio::sync::mpsc::channel(1);
// event producer
let client_manager = client_manager_rc.clone();
let state = state_rc.clone();
let sender_ch = sender_tx.clone();
let producer_task = tokio::task::spawn_local(async move {
let timer_ch = timer_tx.clone();
let server = self.clone();
let mut producer_task = tokio::task::spawn_local(async move {
loop {
tokio::select! {
e = producer.next() => {
let (client, event) = match e {
Some(e) => e?,
None => return Err::<(), anyhow::Error>(anyhow!("event producer closed")),
};
Self::handle_producer_event(&mut producer, &client_manager, &state, &sender_ch, client, event).await;
event = producer.next() => {
let event = event.ok_or(anyhow!("event producer closed"))??;
log::debug!("producer event: {event:?}");
server.handle_producer_event(&mut producer, &sender_ch, &timer_ch, event).await;
}
e = producer_notify_rx.recv() => {
log::debug!("producer notify rx: {e:?}");
match e {
Some(e) => match e {
ProducerEvent::Release => producer.release(),
ProducerEvent::Release => {
producer.release();
server.state.replace(State::Receiving);
}
ProducerEvent::ClientEvent(e) => producer.notify(e),
ProducerEvent::Terminate => break,
},
None => break Ok(()),
None => break,
}
}
}
}
anyhow::Ok(())
});
// event consumer
let client_manager = client_manager_rc.clone();
let state = state_rc.clone();
let producer_notify = producer_notify_tx.clone();
let receiver_task = tokio::task::spawn_local(async move {
let sender_ch = sender_tx.clone();
let server = self.clone();
let mut consumer_task = tokio::task::spawn_local(async move {
let mut last_ignored = None;
loop {
tokio::select! {
udp_event = receiver_rx.recv() => {
let udp_event = match udp_event {
Some(Ok(e)) => e,
Some(Err(e)) => return Err::<(), anyhow::Error>(anyhow!("{}", e)),
None => return Err::<(), anyhow::Error>(anyhow!("receiver closed")),
};
Self::handle_udp_rx(&client_manager, &producer_notify, &mut consumer, &sender_tx, &state, &mut last_ignored, udp_event).await;
let udp_event = udp_event.ok_or(anyhow!("receiver closed"))??;
server.handle_udp_rx(&producer_notify, &mut consumer, &sender_ch, &mut last_ignored, udp_event, &timer_tx).await;
}
consumer_event = consumer_notify_rx.recv() => {
match consumer_event {
Some(e) => match e {
ConsumerEvent::ClientEvent(e) => consumer.notify(e).await,
ConsumerEvent::ReleaseKeys(c) => server.release_keys(&mut consumer, c).await,
ConsumerEvent::Terminate => break,
},
None => break,
}
@@ -173,14 +180,29 @@ impl Server {
_ = consumer.dispatch() => { }
}
}
// release potentially still pressed keys
let clients = server
.client_manager
.borrow()
.get_client_states()
.map(|s| s.client.handle)
.collect::<Vec<_>>();
for client in clients {
server.release_keys(&mut consumer, client).await;
}
// destroy consumer
consumer.destroy().await;
Ok(())
anyhow::Ok(())
});
// frontend listener
let client_manager = client_manager_rc.clone();
let frontend_task = tokio::task::spawn_local(async move {
let server = self.clone();
let producer_notify = producer_notify_tx.clone();
let consumer_notify = consumer_notify_tx.clone();
let frontend_ch = frontend_tx.clone();
let mut frontend_task = tokio::task::spawn_local(async move {
loop {
tokio::select! {
stream = frontend.accept() => {
@@ -191,32 +213,29 @@ impl Server {
continue;
}
};
Self::handle_frontend_stream(&client_manager, &mut frontend, &frontend_tx, stream).await;
server.handle_frontend_stream(&mut frontend, &frontend_ch, stream).await;
}
event = frontend_rx.recv() => {
let frontend_event = match event {
Some(e) => e,
None => return Err::<(), anyhow::Error>(anyhow!("frontend channel closed")),
};
let exit = Self::handle_frontend_event(&producer_notify_tx, &consumer_notify_tx, &client_manager, &resolve_tx, &mut frontend, &port_tx, frontend_event).await;
if exit {
return Ok(());
let frontend_event = event.ok_or(anyhow!("frontend channel closed"))?;
if server.handle_frontend_event(&producer_notify, &consumer_notify, &resolve_tx, &mut frontend, &port_tx, frontend_event).await {
break;
}
}
notify = frontend_notify_rx.recv() => {
let notify = match notify {
Some(n) => n,
None => return Err::<(), anyhow::Error>(anyhow!("frontend notify closed")),
};
let notify = notify.ok_or(anyhow!("frontend notify closed"))?;
let _ = frontend.notify_all(notify).await;
}
}
}
anyhow::Ok(())
});
// dns resolver
let client_manager = client_manager_rc.clone();
let resolver_task = tokio::task::spawn_local(async move {
// create dns resolver
let resolver = dns::DnsResolver::new().await?;
let server = self.clone();
let mut resolver_task = tokio::task::spawn_local(async move {
loop {
let (host, client): (String, ClientHandle) = match resolve_rx.recv().await {
Some(r) => r,
@@ -229,7 +248,7 @@ impl Server {
continue;
}
};
if let Some(state) = client_manager.borrow_mut().get_mut(client) {
if let Some(state) = server.client_manager.borrow_mut().get_mut(client) {
let port = state.client.port;
let mut addrs = HashSet::from_iter(
state
@@ -247,8 +266,11 @@ impl Server {
}
});
// bind the udp socket
let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), self.port.get());
let mut socket = UdpSocket::bind(listen_addr).await?;
// udp task
let udp_task = tokio::task::spawn_local(async move {
let mut udp_task = tokio::task::spawn_local(async move {
loop {
tokio::select! {
event = receive_event(&socket) => {
@@ -266,16 +288,16 @@ impl Server {
let Some(port) = port else {
break;
};
let current_port = socket.local_addr().unwrap().port();
if current_port == port {
let _ = frontend_notify_tx.send(FrontendNotify::NotifyPortChange(port, None)).await;
if socket.local_addr().unwrap().port() == port {
continue;
};
}
let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), port);
match UdpSocket::bind(listen_addr).await {
Ok(new_socket) => {
socket = new_socket;
server.port.replace(port);
let _ = frontend_notify_tx.send(FrontendNotify::NotifyPortChange(port, None)).await;
}
Err(e) => {
@@ -293,37 +315,157 @@ impl Server {
}
});
let reaper = task::spawn_local(async move {
tokio::select! {
_ = signal::ctrl_c() => {
log::info!("terminating service");
},
_ = producer_task => {
// TODO restart producer?
}
_ = receiver_task => {
// TODO restart producer?
}
_ = frontend_task => {
// frontend exited => exit requested
}
_ = resolver_task => {
// resolver exited
}
_ = udp_task => {
// udp exited
// timer task
let server = self.clone();
let sender_ch = sender_tx.clone();
let consumer_notify = consumer_notify_tx.clone();
let producer_notify = producer_notify_tx.clone();
let mut live_tracker = tokio::task::spawn_local(async move {
loop {
// wait for wake up signal
let Some(_): Option<()> = timer_rx.recv().await else {
break;
};
loop {
let receiving = server.state.get() == State::Receiving;
let (ping_clients, ping_addrs) = {
let mut client_manager = server.client_manager.borrow_mut();
let ping_clients: Vec<ClientHandle> = if receiving {
// if receiving we care about clients with pressed keys
client_manager
.get_client_states_mut()
.filter(|s| !s.pressed_keys.is_empty())
.map(|s| s.client.handle)
.collect()
} else {
// if sending we care about the active client
server.active_client.get().iter().cloned().collect()
};
// get relevant socket addrs for clients
let ping_addrs: Vec<SocketAddr> = {
ping_clients
.iter()
.flat_map(|&c| client_manager.get(c))
.flat_map(|state| {
if let Some(a) = state.active_addr {
vec![a]
} else {
state.client.addrs.iter().cloned().collect()
}
})
.collect()
};
// reset alive
for state in client_manager.get_client_states_mut() {
state.alive = false;
}
(ping_clients, ping_addrs)
};
if receiving && ping_clients.is_empty() {
// receiving and no client has pressed keys
// -> no need to keep pinging
break;
}
// ping clients
for addr in ping_addrs {
if sender_ch.send((Event::Ping(), addr)).await.is_err() {
break;
}
}
// give clients time to resond
if receiving {
log::debug!("waiting {MAX_RESPONSE_TIME:?} for response from client with pressed keys ...");
} else {
log::debug!("state: {:?} => waiting {MAX_RESPONSE_TIME:?} for client to respond ...", server.state.get());
}
tokio::time::sleep(MAX_RESPONSE_TIME).await;
// when anything is received from a client,
// the alive flag gets set
let unresponsive_clients: Vec<_> = {
let client_manager = server.client_manager.borrow();
ping_clients
.iter()
.filter_map(|&c| match client_manager.get(c) {
Some(state) if !state.alive => Some(c),
_ => None,
})
.collect()
};
// we may not be receiving anymore but we should respond
// to the original state and not the "new" one
if receiving {
for c in unresponsive_clients {
log::warn!("device not responding, releasing keys!");
let _ = consumer_notify.send(ConsumerEvent::ReleaseKeys(c)).await;
}
} else {
// release pointer if the active client has not responded
if !unresponsive_clients.is_empty() {
log::warn!("client not responding, releasing pointer!");
server.state.replace(State::Receiving);
let _ = producer_notify.send(ProducerEvent::Release).await;
}
}
}
}
});
reaper.await?;
// initial sync of clients
frontend_tx.send(FrontendEvent::Enumerate()).await?;
tokio::select! {
_ = signal::ctrl_c() => {
log::info!("terminating service");
},
_ = &mut producer_task => {
// TODO restart producer?
}
_ = &mut consumer_task => {
// TODO restart producer?
}
_ = &mut frontend_task => {
// frontend exited => exit requested
}
_ = &mut resolver_task => {
// resolver exited
}
_ = &mut udp_task => {
// udp exited
}
_ = &mut live_tracker => {
// live tracker exited
}
}
let _ = consumer_notify_tx.send(ConsumerEvent::Terminate).await;
let _ = producer_notify_tx.send(ProducerEvent::Terminate).await;
let _ = frontend_tx.send(FrontendEvent::Shutdown()).await;
let (a, b, c) = tokio::join!(producer_task, consumer_task, frontend_task);
a??;
b??;
c??;
resolver_task.abort();
udp_task.abort();
live_tracker.abort();
Ok(())
}
pub async fn add_client(
&self,
resolver_tx: &Sender<(String, ClientHandle)>,
client_manager: &Rc<RefCell<ClientManager>>,
frontend: &mut FrontendListener,
hostname: Option<String>,
addr: HashSet<IpAddr>,
@@ -336,7 +478,8 @@ impl Server {
hostname.as_deref().unwrap_or(""),
&addr
);
let client = client_manager
let client = self
.client_manager
.borrow_mut()
.add_client(hostname.clone(), addr, port, pos);
@@ -352,13 +495,13 @@ impl Server {
}
pub async fn activate_client(
&self,
producer_notify_tx: &Sender<ProducerEvent>,
consumer_notify_tx: &Sender<ConsumerEvent>,
client_manager: &Rc<RefCell<ClientManager>>,
client: ClientHandle,
active: bool,
) {
let (client, pos) = match client_manager.borrow_mut().get_mut(client) {
let (client, pos) = match self.client_manager.borrow_mut().get_mut(client) {
Some(state) => {
state.active = active;
(state.client.handle, state.client.pos)
@@ -383,7 +526,7 @@ impl Server {
}
pub async fn remove_client(
client_manager: &Rc<RefCell<ClientManager>>,
&self,
producer_notify_tx: &Sender<ProducerEvent>,
consumer_notify_tx: &Sender<ConsumerEvent>,
frontend: &mut FrontendListener,
@@ -396,7 +539,8 @@ impl Server {
.send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(client)))
.await;
let Some(client) = client_manager
let Some(client) = self
.client_manager
.borrow_mut()
.remove_client(client)
.map(|s| s.client.handle)
@@ -413,15 +557,15 @@ impl Server {
}
async fn update_client(
&self,
producer_notify_tx: &Sender<ProducerEvent>,
consumer_notify_tx: &Sender<ConsumerEvent>,
resolve_tx: &Sender<(String, ClientHandle)>,
client_manager: &Rc<RefCell<ClientManager>>,
client_update: ClientUpdate,
) {
let (hostname, handle, active) = {
// retrieve state
let mut client_manager = client_manager.borrow_mut();
let mut client_manager = self.client_manager.borrow_mut();
let Some(state) = client_manager.get_mut(client_update.client) else {
return;
};
@@ -443,7 +587,6 @@ impl Server {
})
.collect();
state
.client
.active_addr
.map(|a| SocketAddr::new(a.ip(), client_update.port));
}
@@ -451,7 +594,7 @@ impl Server {
// update hostname
if state.client.hostname != client_update.hostname {
state.client.addrs = HashSet::new();
state.client.active_addr = None;
state.active_addr = None;
state.client.hostname = client_update.hostname;
}
@@ -496,18 +639,18 @@ impl Server {
}
async fn handle_udp_rx(
client_manager: &Rc<RefCell<ClientManager>>,
&self,
producer_notify_tx: &Sender<ProducerEvent>,
consumer: &mut Box<dyn EventConsumer>,
sender_tx: &Sender<(Event, SocketAddr)>,
state: &Rc<Cell<State>>,
last_ignored: &mut Option<SocketAddr>,
event: (Event, SocketAddr),
timer_tx: &Sender<()>,
) {
let (event, addr) = event;
// get handle for addr
let handle = match client_manager.borrow().get_client(addr) {
let handle = match self.client_manager.borrow().get_client(addr) {
Some(a) => a,
None => {
if last_ignored.is_none() || last_ignored.is_some() && last_ignored.unwrap() != addr
@@ -524,7 +667,7 @@ impl Server {
log::trace!("{:20} <-<-<-<------ {addr} ({handle})", event.to_string());
{
let mut client_manager = client_manager.borrow_mut();
let mut client_manager = self.client_manager.borrow_mut();
let client_state = match client_manager.get_mut(handle) {
Some(s) => s,
None => {
@@ -534,9 +677,9 @@ impl Server {
};
// reset ttl for client and
client_state.last_seen = Some(Instant::now());
client_state.alive = true;
// set addr as new default for this client
client_state.client.active_addr = Some(addr);
client_state.active_addr = Some(addr);
}
match (event, addr) {
@@ -544,24 +687,49 @@ impl Server {
(Event::Ping(), addr) => {
let _ = sender_tx.send((Event::Pong(), addr)).await;
}
(Event::Disconnect(), _) => {
self.release_keys(consumer, handle).await;
}
(event, addr) => {
// tell clients that we are ready to receive events
if let Event::Enter() = event {
let _ = sender_tx.send((Event::Leave(), addr)).await;
}
match state.get() {
match self.state.get() {
State::Sending => {
if let Event::Leave() = event {
// ignore additional leave events that may
// have been sent for redundancy
} else {
// upon receiving any event, we go back to receiving mode
self.state.replace(State::Receiving);
let _ = producer_notify_tx.send(ProducerEvent::Release).await;
state.replace(State::Receiving);
log::trace!("STATE ===> Receiving");
}
}
State::Receiving => {
if let Event::Keyboard(KeyboardEvent::Key {
time: _,
key,
state,
}) = event
{
let mut client_manager = self.client_manager.borrow_mut();
let client_state =
if let Some(client_state) = client_manager.get_mut(handle) {
client_state
} else {
log::error!("unknown handle");
return;
};
if state == 0 {
client_state.pressed_keys.remove(&key);
} else {
client_state.pressed_keys.insert(key);
let _ = timer_tx.try_send(());
}
}
// consume event
consumer.consume(event, handle).await;
log::trace!("{event:?} => consumer");
@@ -572,59 +740,33 @@ impl Server {
// be on the way until a leave event occurs
// telling us the client registered the enter
if let Event::Leave() = event {
state.replace(State::Sending);
self.state.replace(State::Sending);
log::trace!("STATE ===> Sending");
}
// entering a client that is waiting for a leave
// event should still be possible
if let Event::Enter() = event {
state.replace(State::Receiving);
log::trace!("STATE ===> Receiving");
self.state.replace(State::Receiving);
let _ = producer_notify_tx.send(ProducerEvent::Release).await;
log::trace!("STATE ===> Receiving");
}
}
}
}
}
let pong = {
let mut client_manager = client_manager.borrow_mut();
let client_state = match client_manager.get_mut(handle) {
Some(s) => s,
None => {
log::error!("unknown handle");
return;
}
};
// let the server know we are still alive once every second
if client_state.last_replied.is_none()
|| client_state.last_replied.is_some()
&& client_state.last_replied.unwrap().elapsed() > Duration::from_secs(1)
{
client_state.last_replied = Some(Instant::now());
true
} else {
false
}
};
if pong {
let _ = sender_tx.send((Event::Pong(), addr)).await;
}
}
const RELEASE_MODIFIERDS: u32 = 77; // ctrl+shift+super+alt
async fn handle_producer_event(
&self,
producer: &mut Box<dyn EventProducer>,
client_manager: &Rc<RefCell<ClientManager>>,
state: &Rc<Cell<State>>,
sender_tx: &Sender<(Event, SocketAddr)>,
c: ClientHandle,
mut e: Event,
timer_tx: &Sender<()>,
event: (ClientHandle, Event),
) {
let (c, mut e) = event;
log::trace!("producer: ({c}) {e:?}");
if let Event::Keyboard(crate::event::KeyboardEvent::Modifiers {
@@ -636,31 +778,26 @@ impl Server {
{
if mods_depressed == Self::RELEASE_MODIFIERDS {
producer.release();
state.replace(State::Receiving);
self.state.replace(State::Receiving);
log::trace!("STATE ===> Receiving");
// send an event to release all the modifiers
e = Event::Keyboard(KeyboardEvent::Modifiers {
mods_depressed: 0,
mods_latched: 0,
mods_locked: 0,
group: 0,
});
e = Event::Disconnect();
}
}
let (addr, enter, ping_addrs) = {
let (addr, enter, start_timer) = {
let mut enter = false;
let mut ping_addrs: Option<Vec<SocketAddr>> = None;
let mut start_timer = false;
// get client state for handle
let mut client_manager = client_manager.borrow_mut();
let mut client_manager = self.client_manager.borrow_mut();
let client_state = match client_manager.get_mut(c) {
Some(state) => state,
None => {
// should not happen
log::warn!("unknown client!");
producer.release();
state.replace(State::Receiving);
self.state.replace(State::Receiving);
log::trace!("STATE ===> Receiving");
return;
}
@@ -668,59 +805,30 @@ impl Server {
// if we just entered the client we want to send additional enter events until
// we get a leave event
if let State::Receiving | State::AwaitingLeave = state.get() {
state.replace(State::AwaitingLeave);
if let Event::Enter() = e {
self.state.replace(State::AwaitingLeave);
self.active_client.replace(Some(client_state.client.handle));
log::trace!("Active client => {}", client_state.client.handle);
start_timer = true;
log::trace!("STATE ===> AwaitingLeave");
enter = true;
}
let last_seen = match client_state.last_seen {
None => Duration::MAX,
Some(i) => i.elapsed(),
};
let last_pinged = match client_state.last_ping {
None => Duration::MAX,
Some(i) => i.elapsed(),
};
// not seen for one second but pinged at least 500ms ago
if last_seen > Duration::from_secs(1)
&& last_pinged > Duration::from_millis(500)
&& last_pinged < Duration::from_secs(1)
{
// client unresponsive -> set state to receiving
if state.get() != State::Receiving {
log::info!("client not responding - releasing pointer");
producer.release();
state.replace(State::Receiving);
log::trace!("STATE ===> Receiving");
}
}
// last ping > 500ms ago -> ping all interfaces
if last_pinged > Duration::from_millis(500) {
ping_addrs = Some(client_state.client.addrs.iter().cloned().collect());
client_state.last_ping = Some(Instant::now());
}
(client_state.client.active_addr, enter, ping_addrs)
(client_state.active_addr, enter, start_timer)
};
if start_timer {
let _ = timer_tx.try_send(());
}
if let Some(addr) = addr {
if enter {
let _ = sender_tx.send((Event::Enter(), addr)).await;
}
let _ = sender_tx.send((e, addr)).await;
}
if let Some(addrs) = ping_addrs {
for addr in addrs {
let _ = sender_tx.send((Event::Ping(), addr)).await;
}
}
}
async fn handle_frontend_stream(
client_manager: &Rc<RefCell<ClientManager>>,
&self,
frontend: &mut FrontendListener,
frontend_tx: &Sender<FrontendEvent>,
#[cfg(unix)] mut stream: ReadHalf<UnixStream>,
@@ -748,13 +856,13 @@ impl Server {
}
}
});
Self::enumerate(client_manager, frontend).await;
self.enumerate(frontend).await;
}
async fn handle_frontend_event(
&self,
producer_notify_tx: &Sender<ProducerEvent>,
consumer_notify_tx: &Sender<ConsumerEvent>,
client_manager: &Rc<RefCell<ClientManager>>,
resolve_tx: &Sender<(String, ClientHandle)>,
frontend: &mut FrontendListener,
port_tx: &Sender<u16>,
@@ -763,41 +871,21 @@ impl Server {
log::debug!("frontend: {event:?}");
match event {
FrontendEvent::AddClient(hostname, port, pos) => {
Self::add_client(
resolve_tx,
client_manager,
frontend,
hostname,
HashSet::new(),
port,
pos,
)
.await;
self.add_client(resolve_tx, frontend, hostname, HashSet::new(), port, pos)
.await;
}
FrontendEvent::ActivateClient(client, active) => {
Self::activate_client(
producer_notify_tx,
consumer_notify_tx,
client_manager,
client,
active,
)
.await
self.activate_client(producer_notify_tx, consumer_notify_tx, client, active)
.await
}
FrontendEvent::ChangePort(port) => {
let _ = port_tx.send(port).await;
}
FrontendEvent::DelClient(client) => {
Self::remove_client(
client_manager,
producer_notify_tx,
consumer_notify_tx,
frontend,
client,
)
.await;
self.remove_client(producer_notify_tx, consumer_notify_tx, frontend, client)
.await;
}
FrontendEvent::Enumerate() => Self::enumerate(client_manager, frontend).await,
FrontendEvent::Enumerate() => self.enumerate(frontend).await,
FrontendEvent::Shutdown() => {
log::info!("terminating gracefully...");
return true;
@@ -809,11 +897,10 @@ impl Server {
port,
pos,
};
Self::update_client(
self.update_client(
producer_notify_tx,
consumer_notify_tx,
resolve_tx,
client_manager,
client_update,
)
.await
@@ -822,11 +909,41 @@ impl Server {
false
}
async fn enumerate(
client_manager: &Rc<RefCell<ClientManager>>,
frontend: &mut FrontendListener,
) {
let clients = client_manager
async fn release_keys(&self, consumer: &mut Box<dyn EventConsumer>, client: ClientHandle) {
let keys = self
.client_manager
.borrow_mut()
.get_mut(client)
.iter_mut()
.flat_map(|s| s.pressed_keys.drain())
.collect::<Vec<_>>();
for key in keys {
let event = Event::Keyboard(KeyboardEvent::Key {
time: 0,
key,
state: 0,
});
consumer.consume(event, client).await;
if let Ok(key) = scancode::Linux::try_from(key) {
log::warn!("releasing stuck key: {key:?}");
}
}
let modifiers_event = KeyboardEvent::Modifiers {
mods_depressed: 0,
mods_latched: 0,
mods_locked: 0,
group: 0,
};
consumer
.consume(Event::Keyboard(modifiers_event), client)
.await;
}
async fn enumerate(&self, frontend: &mut FrontendListener) {
let clients = self
.client_manager
.borrow()
.get_client_states()
.map(|s| (s.client.clone(), s.active))
@@ -840,14 +957,10 @@ impl Server {
}
}
async fn receive_event(
socket: &UdpSocket,
) -> std::result::Result<(Event, SocketAddr), Box<dyn Error>> {
async fn receive_event(socket: &UdpSocket) -> anyhow::Result<(Event, SocketAddr)> {
let mut buf = vec![0u8; 22];
match socket.recv_from(&mut buf).await {
Ok((_amt, src)) => Ok((Event::try_from(buf)?, src)),
Err(e) => Err(Box::new(e)),
}
let (_amt, src) = socket.recv_from(&mut buf).await?;
Ok((Event::try_from(buf)?, src))
}
fn send_event(sock: &UdpSocket, e: Event, addr: SocketAddr) -> Result<usize> {