refactor consumer task

This commit is contained in:
Ferdinand Schober
2024-01-18 23:47:26 +01:00
parent ecab3a360d
commit 61ff05c95a
2 changed files with 256 additions and 222 deletions

View File

@@ -21,18 +21,16 @@ use std::{io::ErrorKind, net::SocketAddr};
use crate::{
client::{ClientEvent, ClientHandle, ClientManager, Position},
config::Config,
consumer::EventConsumer,
dns,
event::Event,
frontend::{self, FrontendEvent, FrontendListener, FrontendNotify},
scancode,
server::producer_task::ProducerEvent,
};
use crate::{
consumer,
event::{Event, KeyboardEvent},
producer,
};
use crate::{consumer, producer};
use self::consumer_task::ConsumerEvent;
mod consumer_task;
mod producer_task;
const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500);
@@ -48,16 +46,6 @@ enum State {
AwaitingLeave,
}
#[derive(Clone, Debug)]
pub enum ConsumerEvent {
/// consumer is notified of a change in client states
ClientEvent(ClientEvent),
/// consumer must release keys for client
ReleaseKeys(ClientHandle),
/// termination signal
Terminate,
}
#[derive(Clone)]
pub struct Server {
active_client: Rc<Cell<Option<ClientHandle>>>,
@@ -99,13 +87,12 @@ impl Server {
return anyhow::Ok(());
}
};
let (mut consumer, producer) = tokio::join!(consumer::create(), producer::create());
let (consumer, producer) = tokio::join!(consumer::create(), producer::create());
let (frontend_tx, mut frontend_rx) = tokio::sync::mpsc::channel(32);
let (consumer_notify_tx, mut consumer_notify_rx) = tokio::sync::mpsc::channel(32);
let (resolve_tx, mut resolve_rx) = tokio::sync::mpsc::channel(32);
let (frontend_notify_tx, mut frontend_notify_rx) = tokio::sync::mpsc::channel(32);
let (receiver_tx, mut receiver_rx) = tokio::sync::mpsc::channel(32);
let (receiver_tx, receiver_rx) = tokio::sync::mpsc::channel(32);
let (sender_tx, mut sender_rx) = tokio::sync::mpsc::channel(32);
let (port_tx, mut port_rx) = tokio::sync::mpsc::channel(32);
let (timer_tx, mut timer_rx) = tokio::sync::mpsc::channel(1);
@@ -115,52 +102,19 @@ impl Server {
producer_task::new(producer, self.clone(), sender_tx.clone(), timer_tx.clone());
// event consumer
let sender_ch = sender_tx.clone();
let server = self.clone();
let producer_tx = producer_channel.clone();
let mut consumer_task = tokio::task::spawn_local(async move {
let mut last_ignored = None;
loop {
tokio::select! {
udp_event = receiver_rx.recv() => {
let udp_event = udp_event.ok_or(anyhow!("receiver closed"))??;
server.handle_udp_rx(&producer_tx, &mut consumer, &sender_ch, &mut last_ignored, udp_event, &timer_tx).await;
}
consumer_event = consumer_notify_rx.recv() => {
match consumer_event {
Some(e) => match e {
ConsumerEvent::ClientEvent(e) => consumer.notify(e).await,
ConsumerEvent::ReleaseKeys(c) => server.release_keys(&mut consumer, c).await,
ConsumerEvent::Terminate => break,
},
None => break,
}
}
_ = consumer.dispatch() => { }
}
}
// release potentially still pressed keys
let clients = server
.client_manager
.borrow()
.get_client_states()
.map(|s| s.client.handle)
.collect::<Vec<_>>();
for client in clients {
server.release_keys(&mut consumer, client).await;
}
// destroy consumer
consumer.destroy().await;
anyhow::Ok(())
});
let (mut consumer_task, consumer_channel) = consumer_task::new(
consumer,
self.clone(),
receiver_rx,
sender_tx.clone(),
producer_channel.clone(),
timer_tx,
);
// frontend listener
let server = self.clone();
let producer_notify = producer_channel.clone();
let consumer_notify = consumer_notify_tx.clone();
let consumer_notify = consumer_channel.clone();
let frontend_ch = frontend_tx.clone();
let resolve_ch = resolve_tx.clone();
let mut frontend_task = tokio::task::spawn_local(async move {
@@ -271,7 +225,7 @@ impl Server {
// timer task
let server = self.clone();
let sender_ch = sender_tx.clone();
let consumer_notify = consumer_notify_tx.clone();
let consumer_notify = consumer_channel.clone();
let producer_notify = producer_channel.clone();
let mut live_tracker = tokio::task::spawn_local(async move {
loop {
@@ -424,7 +378,7 @@ impl Server {
_ = &mut live_tracker => { }
}
let _ = consumer_notify_tx.send(ConsumerEvent::Terminate).await;
let _ = consumer_channel.send(ConsumerEvent::Terminate).await;
let _ = producer_channel.send(ProducerEvent::Terminate).await;
let _ = frontend_tx.send(FrontendEvent::Shutdown()).await;
@@ -603,132 +557,6 @@ impl Server {
}
}
async fn handle_udp_rx(
&self,
producer_notify_tx: &Sender<ProducerEvent>,
consumer: &mut Box<dyn EventConsumer>,
sender_tx: &Sender<(Event, SocketAddr)>,
last_ignored: &mut Option<SocketAddr>,
event: (Event, SocketAddr),
timer_tx: &Sender<()>,
) {
let (event, addr) = event;
// get handle for addr
let handle = match self.client_manager.borrow().get_client(addr) {
Some(a) => a,
None => {
if last_ignored.is_none() || last_ignored.is_some() && last_ignored.unwrap() != addr
{
log::warn!("ignoring events from client {addr}");
last_ignored.replace(addr);
}
return;
}
};
// next event can be logged as ignored again
last_ignored.take();
log::trace!("{:20} <-<-<-<------ {addr} ({handle})", event.to_string());
{
let mut client_manager = self.client_manager.borrow_mut();
let client_state = match client_manager.get_mut(handle) {
Some(s) => s,
None => {
log::error!("unknown handle");
return;
}
};
// reset ttl for client and
client_state.alive = true;
// set addr as new default for this client
client_state.active_addr = Some(addr);
}
match (event, addr) {
(Event::Pong(), _) => { /* ignore pong events */ }
(Event::Ping(), addr) => {
let _ = sender_tx.send((Event::Pong(), addr)).await;
}
(Event::Disconnect(), _) => {
self.release_keys(consumer, handle).await;
}
(event, addr) => {
// tell clients that we are ready to receive events
if let Event::Enter() = event {
let _ = sender_tx.send((Event::Leave(), addr)).await;
}
match self.state.get() {
State::Sending => {
if let Event::Leave() = event {
// ignore additional leave events that may
// have been sent for redundancy
} else {
// upon receiving any event, we go back to receiving mode
self.state.replace(State::Receiving);
let _ = producer_notify_tx.send(ProducerEvent::Release).await;
log::trace!("STATE ===> Receiving");
}
}
State::Receiving => {
let mut ignore_event = false;
if let Event::Keyboard(KeyboardEvent::Key {
time: _,
key,
state,
}) = event
{
let mut client_manager = self.client_manager.borrow_mut();
let client_state =
if let Some(client_state) = client_manager.get_mut(handle) {
client_state
} else {
log::error!("unknown handle");
return;
};
if state == 0 {
// ignore release event if key not pressed
ignore_event = !client_state.pressed_keys.remove(&key);
} else {
// ignore press event if key not released
ignore_event = !client_state.pressed_keys.insert(key);
let _ = timer_tx.try_send(());
}
}
// ignore double press / release events to
// workaround buggy rdp backend.
if !ignore_event {
// consume event
consumer.consume(event, handle).await;
log::trace!("{event:?} => consumer");
}
}
State::AwaitingLeave => {
// we just entered the deadzone of a client, so
// we need to ignore events that may still
// be on the way until a leave event occurs
// telling us the client registered the enter
if let Event::Leave() = event {
self.state.replace(State::Sending);
log::trace!("STATE ===> Sending");
}
// entering a client that is waiting for a leave
// event should still be possible
if let Event::Enter() = event {
self.state.replace(State::Receiving);
let _ = producer_notify_tx.send(ProducerEvent::Release).await;
log::trace!("STATE ===> Receiving");
}
}
}
}
}
}
async fn handle_frontend_stream(
&self,
frontend_tx: &Sender<FrontendEvent>,
@@ -839,38 +667,6 @@ impl Server {
}
false
}
async fn release_keys(&self, consumer: &mut Box<dyn EventConsumer>, client: ClientHandle) {
let keys = self
.client_manager
.borrow_mut()
.get_mut(client)
.iter_mut()
.flat_map(|s| s.pressed_keys.drain())
.collect::<Vec<_>>();
for key in keys {
let event = Event::Keyboard(KeyboardEvent::Key {
time: 0,
key,
state: 0,
});
consumer.consume(event, client).await;
if let Ok(key) = scancode::Linux::try_from(key) {
log::warn!("releasing stuck key: {key:?}");
}
}
let modifiers_event = KeyboardEvent::Modifiers {
mods_depressed: 0,
mods_latched: 0,
mods_locked: 0,
group: 0,
};
consumer
.consume(Event::Keyboard(modifiers_event), client)
.await;
}
}
async fn receive_event(socket: &UdpSocket) -> anyhow::Result<(Event, SocketAddr)> {

238
src/server/consumer_task.rs Normal file
View File

@@ -0,0 +1,238 @@
use anyhow::{anyhow, Result};
use std::net::SocketAddr;
use tokio::{
sync::mpsc::{Receiver, Sender},
task::JoinHandle,
};
use crate::{
client::{ClientEvent, ClientHandle},
consumer::EventConsumer,
event::{Event, KeyboardEvent},
scancode,
server::State,
};
use super::{ProducerEvent, Server};
#[derive(Clone, Debug)]
pub enum ConsumerEvent {
/// consumer is notified of a change in client states
ClientEvent(ClientEvent),
/// consumer must release keys for client
ReleaseKeys(ClientHandle),
/// termination signal
Terminate,
}
pub fn new(
mut consumer: Box<dyn EventConsumer>,
server: Server,
mut udp_rx: Receiver<Result<(Event, SocketAddr)>>,
sender_tx: Sender<(Event, SocketAddr)>,
producer_tx: Sender<ProducerEvent>,
timer_tx: Sender<()>,
) -> (JoinHandle<Result<()>>, Sender<ConsumerEvent>) {
let (tx, mut rx) = tokio::sync::mpsc::channel(32);
let consumer_task = tokio::task::spawn_local(async move {
let mut last_ignored = None;
loop {
tokio::select! {
udp_event = udp_rx.recv() => {
let udp_event = udp_event.ok_or(anyhow!("receiver closed"))??;
handle_udp_rx(&server, &producer_tx, &mut consumer, &sender_tx, &mut last_ignored, udp_event, &timer_tx).await;
}
consumer_event = rx.recv() => {
match consumer_event {
Some(e) => match e {
ConsumerEvent::ClientEvent(e) => consumer.notify(e).await,
ConsumerEvent::ReleaseKeys(c) => release_keys(&server, &mut consumer, c).await,
ConsumerEvent::Terminate => break,
},
None => break,
}
}
_ = consumer.dispatch() => { }
}
}
// release potentially still pressed keys
let clients = server
.client_manager
.borrow()
.get_client_states()
.map(|s| s.client.handle)
.collect::<Vec<_>>();
for client in clients {
release_keys(&server, &mut consumer, client).await;
}
// destroy consumer
consumer.destroy().await;
anyhow::Ok(())
});
(consumer_task, tx)
}
async fn handle_udp_rx(
server: &Server,
producer_notify_tx: &Sender<ProducerEvent>,
consumer: &mut Box<dyn EventConsumer>,
sender_tx: &Sender<(Event, SocketAddr)>,
last_ignored: &mut Option<SocketAddr>,
event: (Event, SocketAddr),
timer_tx: &Sender<()>,
) {
let (event, addr) = event;
// get handle for addr
let handle = match server.client_manager.borrow().get_client(addr) {
Some(a) => a,
None => {
if last_ignored.is_none() || last_ignored.is_some() && last_ignored.unwrap() != addr {
log::warn!("ignoring events from client {addr}");
last_ignored.replace(addr);
}
return;
}
};
// next event can be logged as ignored again
last_ignored.take();
log::trace!("{:20} <-<-<-<------ {addr} ({handle})", event.to_string());
{
let mut client_manager = server.client_manager.borrow_mut();
let client_state = match client_manager.get_mut(handle) {
Some(s) => s,
None => {
log::error!("unknown handle");
return;
}
};
// reset ttl for client and
client_state.alive = true;
// set addr as new default for this client
client_state.active_addr = Some(addr);
}
match (event, addr) {
(Event::Pong(), _) => { /* ignore pong events */ }
(Event::Ping(), addr) => {
let _ = sender_tx.send((Event::Pong(), addr)).await;
}
(Event::Disconnect(), _) => {
release_keys(server, consumer, handle).await;
}
(event, addr) => {
// tell clients that we are ready to receive events
if let Event::Enter() = event {
let _ = sender_tx.send((Event::Leave(), addr)).await;
}
match server.state.get() {
State::Sending => {
if let Event::Leave() = event {
// ignore additional leave events that may
// have been sent for redundancy
} else {
// upon receiving any event, we go back to receiving mode
server.state.replace(State::Receiving);
let _ = producer_notify_tx.send(ProducerEvent::Release).await;
log::trace!("STATE ===> Receiving");
}
}
State::Receiving => {
let mut ignore_event = false;
if let Event::Keyboard(KeyboardEvent::Key {
time: _,
key,
state,
}) = event
{
let mut client_manager = server.client_manager.borrow_mut();
let client_state =
if let Some(client_state) = client_manager.get_mut(handle) {
client_state
} else {
log::error!("unknown handle");
return;
};
if state == 0 {
// ignore release event if key not pressed
ignore_event = !client_state.pressed_keys.remove(&key);
} else {
// ignore press event if key not released
ignore_event = !client_state.pressed_keys.insert(key);
let _ = timer_tx.try_send(());
}
}
// ignore double press / release events to
// workaround buggy rdp backend.
if !ignore_event {
// consume event
consumer.consume(event, handle).await;
log::trace!("{event:?} => consumer");
}
}
State::AwaitingLeave => {
// we just entered the deadzone of a client, so
// we need to ignore events that may still
// be on the way until a leave event occurs
// telling us the client registered the enter
if let Event::Leave() = event {
server.state.replace(State::Sending);
log::trace!("STATE ===> Sending");
}
// entering a client that is waiting for a leave
// event should still be possible
if let Event::Enter() = event {
server.state.replace(State::Receiving);
let _ = producer_notify_tx.send(ProducerEvent::Release).await;
log::trace!("STATE ===> Receiving");
}
}
}
}
}
}
async fn release_keys(
server: &Server,
consumer: &mut Box<dyn EventConsumer>,
client: ClientHandle,
) {
let keys = server
.client_manager
.borrow_mut()
.get_mut(client)
.iter_mut()
.flat_map(|s| s.pressed_keys.drain())
.collect::<Vec<_>>();
for key in keys {
let event = Event::Keyboard(KeyboardEvent::Key {
time: 0,
key,
state: 0,
});
consumer.consume(event, client).await;
if let Ok(key) = scancode::Linux::try_from(key) {
log::warn!("releasing stuck key: {key:?}");
}
}
let modifiers_event = KeyboardEvent::Modifiers {
mods_depressed: 0,
mods_latched: 0,
mods_locked: 0,
group: 0,
};
consumer
.consume(Event::Keyboard(modifiers_event), client)
.await;
}