cleanup server code

This commit is contained in:
Ferdinand Schober
2023-12-15 20:24:14 +01:00
parent 0c275bc2de
commit 5fc02d471b
3 changed files with 30 additions and 62 deletions

View File

@@ -566,7 +566,6 @@ impl Stream for WaylandEventProducer {
type Item = io::Result<(ClientHandle, Event)>; type Item = io::Result<(ClientHandle, Event)>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
log::trace!("producer.next()");
if let Some(event) = self.0.get_mut().state.pending_events.pop_front() { if let Some(event) = self.0.get_mut().state.pending_events.pop_front() {
return Poll::Ready(Some(Ok(event))); return Poll::Ready(Some(Ok(event)));
} }

View File

@@ -201,8 +201,6 @@ impl FrontendListener {
#[cfg(unix)] #[cfg(unix)]
pub async fn accept(&mut self) -> Result<ReadHalf<UnixStream>> { pub async fn accept(&mut self) -> Result<ReadHalf<UnixStream>> {
log::trace!("frontend.accept()");
let stream = self.listener.accept().await?.0; let stream = self.listener.accept().await?.0;
let (rx, tx) = tokio::io::split(stream); let (rx, tx) = tokio::io::split(stream);
self.tx_streams.push(tx); self.tx_streams.push(tx);

View File

@@ -32,8 +32,6 @@ use crate::{
producer::EventProducer, producer::EventProducer,
}; };
/// keeps track of state to prevent a feedback loop
/// of continuously sending and receiving the same event.
#[derive(Eq, PartialEq)] #[derive(Eq, PartialEq)]
enum State { enum State {
Sending, Sending,
@@ -93,11 +91,9 @@ impl Server {
pub async fn run(&mut self) -> anyhow::Result<()> { pub async fn run(&mut self) -> anyhow::Result<()> {
loop { loop {
log::trace!("polling ...");
tokio::select! { tokio::select! {
// safety: cancellation safe // safety: cancellation safe
udp_event = receive_event(&self.socket) => { udp_event = receive_event(&self.socket) => {
log::trace!("-> receive_event");
match udp_event { match udp_event {
Ok(e) => self.handle_udp_rx(e).await, Ok(e) => self.handle_udp_rx(e).await,
Err(e) => log::error!("error reading event: {e}"), Err(e) => log::error!("error reading event: {e}"),
@@ -105,7 +101,6 @@ impl Server {
} }
// safety: cancellation safe // safety: cancellation safe
res = self.producer.next() => { res = self.producer.next() => {
log::trace!("-> producer.next()");
match res { match res {
Some(Ok((client, event))) => { Some(Ok((client, event))) => {
self.handle_producer_event(client,event).await; self.handle_producer_event(client,event).await;
@@ -116,7 +111,6 @@ impl Server {
} }
// safety: cancellation safe // safety: cancellation safe
stream = self.frontend.accept() => { stream = self.frontend.accept() => {
log::trace!("-> frontend.accept()");
match stream { match stream {
Ok(s) => self.handle_frontend_stream(s).await, Ok(s) => self.handle_frontend_stream(s).await,
Err(e) => log::error!("error connecting to frontend: {e}"), Err(e) => log::error!("error connecting to frontend: {e}"),
@@ -124,7 +118,6 @@ impl Server {
} }
// safety: cancellation safe // safety: cancellation safe
frontend_event = self.frontend_rx.recv() => { frontend_event = self.frontend_rx.recv() => {
log::trace!("-> frontend.recv()");
if let Some(event) = frontend_event { if let Some(event) = frontend_event {
if self.handle_frontend_event(event).await { if self.handle_frontend_event(event).await {
break; break;
@@ -133,7 +126,6 @@ impl Server {
} }
// safety: cancellation safe // safety: cancellation safe
e = self.consumer.dispatch() => { e = self.consumer.dispatch() => {
log::trace!("-> consumer.dispatch()");
e?; e?;
} }
// safety: cancellation safe // safety: cancellation safe
@@ -314,43 +306,35 @@ impl Server {
state.last_seen = Some(Instant::now()); state.last_seen = Some(Instant::now());
// set addr as new default for this client // set addr as new default for this client
state.client.active_addr = Some(addr); state.client.active_addr = Some(addr);
match (event, addr) { match (event, addr) {
(Event::Pong(), _) => {} (Event::Pong(), _) => {} // ignore pong events
(Event::Ping(), addr) => { (Event::Ping(), addr) => {
if let Err(e) = send_event(&self.socket, Event::Pong(), addr).await { if let Err(e) = send_event(&self.socket, Event::Pong(), addr).await {
log::error!("udp send: {}", e); log::error!("udp send: {}", e);
} }
// we release the mouse here,
// since its very likely, that we wont get a release event
self.producer.release();
} }
(event, addr) => match self.state { (event, addr) => {
State::Sending => { // device is sending events => release pointer if captured
// in sending state, we dont want to process if self.state == State::Sending {
// any events to avoid feedback loops, log::debug!("releasing pointer ...");
// therefore we tell the event producer self.producer.release();
// to release the pointer and move on self.state = State::Receiving;
// first event -> release pointer
if let Event::Release() = event {
log::debug!("releasing pointer ...");
self.producer.release();
self.state = State::Receiving;
}
} }
State::Receiving => {
// consume event
self.consumer.consume(event, handle).await;
// let the server know we are still alive once every second // consume event
let last_replied = state.last_replied; self.consumer.consume(event, handle).await;
if last_replied.is_none() log::trace!("{event:?} => consumer");
|| last_replied.is_some()
&& last_replied.unwrap().elapsed() > Duration::from_secs(1) // let the server know we are still alive once every second
{ let last_replied = state.last_replied;
state.last_replied = Some(Instant::now()); if last_replied.is_none()
if let Err(e) = send_event(&self.socket, Event::Pong(), addr).await { || last_replied.is_some()
log::error!("udp send: {}", e); && last_replied.unwrap().elapsed() > Duration::from_secs(1)
} {
state.last_replied = Some(Instant::now());
if let Err(e) = send_event(&self.socket, Event::Pong(), addr).await {
log::error!("udp send: {}", e);
} }
} }
}, },
@@ -358,14 +342,9 @@ impl Server {
} }
async fn handle_producer_event(&mut self, c: ClientHandle, e: Event) { async fn handle_producer_event(&mut self, c: ClientHandle, e: Event) {
let mut should_release = false;
// in receiving state, only release events
// must be transmitted
if let Event::Release() = e {
self.state = State::Sending;
}
log::trace!("producer: ({c}) {e:?}"); log::trace!("producer: ({c}) {e:?}");
// get client state for handle
let state = match self.client_manager.get_mut(c) { let state = match self.client_manager.get_mut(c) {
Some(state) => state, Some(state) => state,
None => { None => {
@@ -373,7 +352,11 @@ impl Server {
return; return;
} }
}; };
// otherwise we should have an address to send to
// we are sending events
self.state = State::Sending;
// otherwise we should have an address to
// transmit events to the corrensponding client // transmit events to the corrensponding client
if let Some(addr) = state.client.active_addr { if let Some(addr) = state.client.active_addr {
if let Err(e) = send_event(&self.socket, e, addr).await { if let Err(e) = send_event(&self.socket, e, addr).await {
@@ -399,7 +382,8 @@ impl Server {
// release mouse if client didnt respond to the first ping // release mouse if client didnt respond to the first ping
if state.last_ping.is_some() && state.last_ping.unwrap().elapsed() < Duration::from_secs(1) if state.last_ping.is_some() && state.last_ping.unwrap().elapsed() < Duration::from_secs(1)
{ {
should_release = true; log::info!("client not responding - releasing pointer");
self.producer.release();
} }
// last ping > 500ms ago -> ping all interfaces // last ping > 500ms ago -> ping all interfaces
@@ -411,18 +395,6 @@ impl Server {
log::error!("udp send: {}", e); log::error!("udp send: {}", e);
} }
} }
// send additional release event, in case client is still in sending mode
if let Err(e) = send_event(&self.socket, Event::Release(), *addr).await {
if e.kind() != ErrorKind::WouldBlock {
log::error!("udp send: {}", e);
}
}
}
if should_release && self.state != State::Receiving {
log::info!("client not responding - releasing pointer");
self.producer.release();
self.state = State::Receiving;
} }
} }
@@ -544,7 +516,6 @@ impl Server {
async fn receive_event( async fn receive_event(
socket: &UdpSocket, socket: &UdpSocket,
) -> std::result::Result<(Event, SocketAddr), Box<dyn Error>> { ) -> std::result::Result<(Event, SocketAddr), Box<dyn Error>> {
log::trace!("receive_event");
let mut buf = vec![0u8; 22]; let mut buf = vec![0u8; 22];
match socket.recv_from(&mut buf).await { match socket.recv_from(&mut buf).await {
Ok((_amt, src)) => Ok((Event::try_from(buf)?, src)), Ok((_amt, src)) => Ok((Event::try_from(buf)?, src)),