mirror of
https://github.com/feschber/lan-mouse.git
synced 2026-03-07 11:59:59 +03:00
refactor producer task
This commit is contained in:
135
src/server.rs
135
src/server.rs
@@ -1,5 +1,4 @@
|
||||
use anyhow::anyhow;
|
||||
use futures::stream::StreamExt;
|
||||
use log;
|
||||
use std::{
|
||||
cell::{Cell, RefCell},
|
||||
@@ -25,8 +24,8 @@ use crate::{
|
||||
consumer::EventConsumer,
|
||||
dns,
|
||||
frontend::{self, FrontendEvent, FrontendListener, FrontendNotify},
|
||||
producer::EventProducer,
|
||||
scancode,
|
||||
server::producer_task::ProducerEvent,
|
||||
};
|
||||
use crate::{
|
||||
consumer,
|
||||
@@ -34,6 +33,8 @@ use crate::{
|
||||
producer,
|
||||
};
|
||||
|
||||
mod producer_task;
|
||||
|
||||
const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500);
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
@@ -47,16 +48,6 @@ enum State {
|
||||
AwaitingLeave,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub enum ProducerEvent {
|
||||
/// producer must release the mouse
|
||||
Release,
|
||||
/// producer is notified of a change in client states
|
||||
ClientEvent(ClientEvent),
|
||||
/// termination signal
|
||||
Terminate,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum ConsumerEvent {
|
||||
/// consumer is notified of a change in client states
|
||||
@@ -108,10 +99,9 @@ impl Server {
|
||||
return anyhow::Ok(());
|
||||
}
|
||||
};
|
||||
let (mut consumer, mut producer) = tokio::join!(consumer::create(), producer::create());
|
||||
let (mut consumer, producer) = tokio::join!(consumer::create(), producer::create());
|
||||
|
||||
let (frontend_tx, mut frontend_rx) = tokio::sync::mpsc::channel(32);
|
||||
let (producer_notify_tx, mut producer_notify_rx) = tokio::sync::mpsc::channel(32);
|
||||
let (consumer_notify_tx, mut consumer_notify_rx) = tokio::sync::mpsc::channel(32);
|
||||
let (resolve_tx, mut resolve_rx) = tokio::sync::mpsc::channel(32);
|
||||
let (frontend_notify_tx, mut frontend_notify_rx) = tokio::sync::mpsc::channel(32);
|
||||
@@ -121,40 +111,13 @@ impl Server {
|
||||
let (timer_tx, mut timer_rx) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
// event producer
|
||||
let sender_ch = sender_tx.clone();
|
||||
let timer_ch = timer_tx.clone();
|
||||
let server = self.clone();
|
||||
let mut producer_task = tokio::task::spawn_local(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
event = producer.next() => {
|
||||
let event = event.ok_or(anyhow!("event producer closed"))??;
|
||||
server.handle_producer_event(&mut producer, &sender_ch, &timer_ch, event).await?;
|
||||
}
|
||||
e = producer_notify_rx.recv() => {
|
||||
log::debug!("producer notify rx: {e:?}");
|
||||
match e {
|
||||
Some(e) => match e {
|
||||
ProducerEvent::Release => {
|
||||
producer.release()?;
|
||||
server.state.replace(State::Receiving);
|
||||
|
||||
}
|
||||
ProducerEvent::ClientEvent(e) => producer.notify(e)?,
|
||||
ProducerEvent::Terminate => break,
|
||||
},
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
anyhow::Ok(())
|
||||
});
|
||||
let (mut producer_task, producer_channel) =
|
||||
producer_task::new(producer, self.clone(), sender_tx.clone(), timer_tx.clone());
|
||||
|
||||
// event consumer
|
||||
let producer_notify = producer_notify_tx.clone();
|
||||
let sender_ch = sender_tx.clone();
|
||||
let server = self.clone();
|
||||
let producer_tx = producer_channel.clone();
|
||||
let mut consumer_task = tokio::task::spawn_local(async move {
|
||||
let mut last_ignored = None;
|
||||
|
||||
@@ -162,7 +125,7 @@ impl Server {
|
||||
tokio::select! {
|
||||
udp_event = receiver_rx.recv() => {
|
||||
let udp_event = udp_event.ok_or(anyhow!("receiver closed"))??;
|
||||
server.handle_udp_rx(&producer_notify, &mut consumer, &sender_ch, &mut last_ignored, udp_event, &timer_tx).await;
|
||||
server.handle_udp_rx(&producer_tx, &mut consumer, &sender_ch, &mut last_ignored, udp_event, &timer_tx).await;
|
||||
}
|
||||
consumer_event = consumer_notify_rx.recv() => {
|
||||
match consumer_event {
|
||||
@@ -196,7 +159,7 @@ impl Server {
|
||||
|
||||
// frontend listener
|
||||
let server = self.clone();
|
||||
let producer_notify = producer_notify_tx.clone();
|
||||
let producer_notify = producer_channel.clone();
|
||||
let consumer_notify = consumer_notify_tx.clone();
|
||||
let frontend_ch = frontend_tx.clone();
|
||||
let resolve_ch = resolve_tx.clone();
|
||||
@@ -309,7 +272,7 @@ impl Server {
|
||||
let server = self.clone();
|
||||
let sender_ch = sender_tx.clone();
|
||||
let consumer_notify = consumer_notify_tx.clone();
|
||||
let producer_notify = producer_notify_tx.clone();
|
||||
let producer_notify = producer_channel.clone();
|
||||
let mut live_tracker = tokio::task::spawn_local(async move {
|
||||
loop {
|
||||
// wait for wake up signal
|
||||
@@ -462,7 +425,7 @@ impl Server {
|
||||
}
|
||||
|
||||
let _ = consumer_notify_tx.send(ConsumerEvent::Terminate).await;
|
||||
let _ = producer_notify_tx.send(ProducerEvent::Terminate).await;
|
||||
let _ = producer_channel.send(ProducerEvent::Terminate).await;
|
||||
let _ = frontend_tx.send(FrontendEvent::Shutdown()).await;
|
||||
|
||||
if !producer_task.is_finished() {
|
||||
@@ -766,82 +729,6 @@ impl Server {
|
||||
}
|
||||
}
|
||||
|
||||
const RELEASE_MODIFIERDS: u32 = 77; // ctrl+shift+super+alt
|
||||
|
||||
async fn handle_producer_event(
|
||||
&self,
|
||||
producer: &mut Box<dyn EventProducer>,
|
||||
sender_tx: &Sender<(Event, SocketAddr)>,
|
||||
timer_tx: &Sender<()>,
|
||||
event: (ClientHandle, Event),
|
||||
) -> Result<()> {
|
||||
let (c, mut e) = event;
|
||||
log::trace!("producer: ({c}) {e:?}");
|
||||
|
||||
if let Event::Keyboard(crate::event::KeyboardEvent::Modifiers {
|
||||
mods_depressed,
|
||||
mods_latched: _,
|
||||
mods_locked: _,
|
||||
group: _,
|
||||
}) = e
|
||||
{
|
||||
if mods_depressed == Self::RELEASE_MODIFIERDS {
|
||||
producer.release()?;
|
||||
self.state.replace(State::Receiving);
|
||||
log::trace!("STATE ===> Receiving");
|
||||
// send an event to release all the modifiers
|
||||
e = Event::Disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
let (addr, enter, start_timer) = {
|
||||
let mut enter = false;
|
||||
let mut start_timer = false;
|
||||
|
||||
// get client state for handle
|
||||
let mut client_manager = self.client_manager.borrow_mut();
|
||||
let client_state = match client_manager.get_mut(c) {
|
||||
Some(state) => state,
|
||||
None => {
|
||||
// should not happen
|
||||
log::warn!("unknown client!");
|
||||
producer.release()?;
|
||||
self.state.replace(State::Receiving);
|
||||
log::trace!("STATE ===> Receiving");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
// if we just entered the client we want to send additional enter events until
|
||||
// we get a leave event
|
||||
if let Event::Enter() = e {
|
||||
self.state.replace(State::AwaitingLeave);
|
||||
self.active_client.replace(Some(client_state.client.handle));
|
||||
log::trace!("Active client => {}", client_state.client.handle);
|
||||
start_timer = true;
|
||||
log::trace!("STATE ===> AwaitingLeave");
|
||||
enter = true;
|
||||
} else {
|
||||
// ignore any potential events in receiving mode
|
||||
if self.state.get() == State::Receiving && e != Event::Disconnect() {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
(client_state.active_addr, enter, start_timer)
|
||||
};
|
||||
if start_timer {
|
||||
let _ = timer_tx.try_send(());
|
||||
}
|
||||
if let Some(addr) = addr {
|
||||
if enter {
|
||||
let _ = sender_tx.send((Event::Enter(), addr)).await;
|
||||
}
|
||||
let _ = sender_tx.send((e, addr)).await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_frontend_stream(
|
||||
&self,
|
||||
frontend_tx: &Sender<FrontendEvent>,
|
||||
|
||||
132
src/server/producer_task.rs
Normal file
132
src/server/producer_task.rs
Normal file
@@ -0,0 +1,132 @@
|
||||
use anyhow::{anyhow, Result};
|
||||
use futures::StreamExt;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use tokio::{sync::mpsc::Sender, task::JoinHandle};
|
||||
|
||||
use crate::{
|
||||
client::{ClientEvent, ClientHandle},
|
||||
event::{Event, KeyboardEvent},
|
||||
producer::EventProducer,
|
||||
server::State,
|
||||
};
|
||||
|
||||
use super::Server;
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub enum ProducerEvent {
|
||||
/// producer must release the mouse
|
||||
Release,
|
||||
/// producer is notified of a change in client states
|
||||
ClientEvent(ClientEvent),
|
||||
/// termination signal
|
||||
Terminate,
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
mut producer: Box<dyn EventProducer>,
|
||||
server: Server,
|
||||
sender_tx: Sender<(Event, SocketAddr)>,
|
||||
timer_tx: Sender<()>,
|
||||
) -> (JoinHandle<Result<()>>, Sender<ProducerEvent>) {
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel(32);
|
||||
let task = tokio::task::spawn_local(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
event = producer.next() => {
|
||||
let event = event.ok_or(anyhow!("event producer closed"))??;
|
||||
handle_producer_event(&server, &mut producer, &sender_tx, &timer_tx, event).await?;
|
||||
}
|
||||
e = rx.recv() => {
|
||||
log::debug!("producer notify rx: {e:?}");
|
||||
match e {
|
||||
Some(e) => match e {
|
||||
ProducerEvent::Release => {
|
||||
producer.release()?;
|
||||
server.state.replace(State::Receiving);
|
||||
|
||||
}
|
||||
ProducerEvent::ClientEvent(e) => producer.notify(e)?,
|
||||
ProducerEvent::Terminate => break,
|
||||
},
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
anyhow::Ok(())
|
||||
});
|
||||
(task, tx)
|
||||
}
|
||||
|
||||
const RELEASE_MODIFIERDS: u32 = 77; // ctrl+shift+super+alt
|
||||
|
||||
async fn handle_producer_event(
|
||||
server: &Server,
|
||||
producer: &mut Box<dyn EventProducer>,
|
||||
sender_tx: &Sender<(Event, SocketAddr)>,
|
||||
timer_tx: &Sender<()>,
|
||||
event: (ClientHandle, Event),
|
||||
) -> Result<()> {
|
||||
let (c, mut e) = event;
|
||||
log::trace!("({c}) {e:?}");
|
||||
|
||||
if let Event::Keyboard(KeyboardEvent::Modifiers { mods_depressed, .. }) = e {
|
||||
if mods_depressed == RELEASE_MODIFIERDS {
|
||||
producer.release()?;
|
||||
server.state.replace(State::Receiving);
|
||||
log::trace!("STATE ===> Receiving");
|
||||
// send an event to release all the modifiers
|
||||
e = Event::Disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
let (addr, enter, start_timer) = {
|
||||
let mut enter = false;
|
||||
let mut start_timer = false;
|
||||
|
||||
// get client state for handle
|
||||
let mut client_manager = server.client_manager.borrow_mut();
|
||||
let client_state = match client_manager.get_mut(c) {
|
||||
Some(state) => state,
|
||||
None => {
|
||||
// should not happen
|
||||
log::warn!("unknown client!");
|
||||
producer.release()?;
|
||||
server.state.replace(State::Receiving);
|
||||
log::trace!("STATE ===> Receiving");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
// if we just entered the client we want to send additional enter events until
|
||||
// we get a leave event
|
||||
if let Event::Enter() = e {
|
||||
server.state.replace(State::AwaitingLeave);
|
||||
server
|
||||
.active_client
|
||||
.replace(Some(client_state.client.handle));
|
||||
log::trace!("Active client => {}", client_state.client.handle);
|
||||
start_timer = true;
|
||||
log::trace!("STATE ===> AwaitingLeave");
|
||||
enter = true;
|
||||
} else {
|
||||
// ignore any potential events in receiving mode
|
||||
if server.state.get() == State::Receiving && e != Event::Disconnect() {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
(client_state.active_addr, enter, start_timer)
|
||||
};
|
||||
if start_timer {
|
||||
let _ = timer_tx.try_send(());
|
||||
}
|
||||
if let Some(addr) = addr {
|
||||
if enter {
|
||||
let _ = sender_tx.send((Event::Enter(), addr)).await;
|
||||
}
|
||||
let _ = sender_tx.send((e, addr)).await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user