refactor timer task

This commit is contained in:
Ferdinand Schober
2024-01-19 00:54:10 +01:00
parent 36001c6fb2
commit 017bc43176
2 changed files with 143 additions and 119 deletions

View File

@@ -2,17 +2,13 @@ use log;
use std::{ use std::{
cell::{Cell, RefCell}, cell::{Cell, RefCell},
rc::Rc, rc::Rc,
time::Duration,
}; };
use tokio::signal; use tokio::signal;
use std::net::SocketAddr;
use crate::{ use crate::{
client::{ClientHandle, ClientManager}, client::{ClientHandle, ClientManager},
config::Config, config::Config,
dns, dns,
event::Event,
frontend::{FrontendEvent, FrontendListener}, frontend::{FrontendEvent, FrontendListener},
server::producer_task::ProducerEvent, server::producer_task::ProducerEvent,
}; };
@@ -23,11 +19,10 @@ use self::{consumer_task::ConsumerEvent, resolver_task::DnsRequest};
mod consumer_task; mod consumer_task;
mod frontend_task; mod frontend_task;
mod network_task; mod network_task;
mod ping_task;
mod producer_task; mod producer_task;
mod resolver_task; mod resolver_task;
const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500);
#[derive(Clone, Copy, Debug, Eq, PartialEq)] #[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum State { enum State {
/// Currently sending events to another device /// Currently sending events to another device
@@ -82,7 +77,7 @@ impl Server {
}; };
let (consumer, producer) = tokio::join!(consumer::create(), producer::create()); let (consumer, producer) = tokio::join!(consumer::create(), producer::create());
let (timer_tx, mut timer_rx) = tokio::sync::mpsc::channel(1); let (timer_tx, timer_rx) = tokio::sync::mpsc::channel(1);
let (frontend_notify_tx, frontend_notify_rx) = tokio::sync::mpsc::channel(1); let (frontend_notify_tx, frontend_notify_rx) = tokio::sync::mpsc::channel(1);
// udp task // udp task
@@ -118,116 +113,14 @@ impl Server {
port_tx, port_tx,
); );
// timer task // task that pings clients to see if they are responding
let server = self.clone(); let mut ping_task = ping_task::new(
let sender_ch = sender_tx.clone(); self.clone(),
let consumer_notify = consumer_channel.clone(); sender_tx.clone(),
let producer_notify = producer_channel.clone(); consumer_channel.clone(),
let mut live_tracker = tokio::task::spawn_local(async move { producer_channel.clone(),
loop { timer_rx,
// wait for wake up signal );
let Some(_): Option<()> = timer_rx.recv().await else {
break;
};
loop {
let receiving = server.state.get() == State::Receiving;
let (ping_clients, ping_addrs) = {
let mut client_manager = server.client_manager.borrow_mut();
let ping_clients: Vec<ClientHandle> = if receiving {
// if receiving we care about clients with pressed keys
client_manager
.get_client_states_mut()
.filter(|s| !s.pressed_keys.is_empty())
.map(|s| s.client.handle)
.collect()
} else {
// if sending we care about the active client
server.active_client.get().iter().cloned().collect()
};
// get relevant socket addrs for clients
let ping_addrs: Vec<SocketAddr> = {
ping_clients
.iter()
.flat_map(|&c| client_manager.get(c))
.flat_map(|state| {
if state.alive && state.active_addr.is_some() {
vec![state.active_addr.unwrap()]
} else {
state
.client
.ips
.iter()
.cloned()
.map(|ip| SocketAddr::new(ip, state.client.port))
.collect()
}
})
.collect()
};
// reset alive
for state in client_manager.get_client_states_mut() {
state.alive = false;
}
(ping_clients, ping_addrs)
};
if receiving && ping_clients.is_empty() {
// receiving and no client has pressed keys
// -> no need to keep pinging
break;
}
// ping clients
for addr in ping_addrs {
if sender_ch.send((Event::Ping(), addr)).await.is_err() {
break;
}
}
// give clients time to resond
if receiving {
log::debug!("waiting {MAX_RESPONSE_TIME:?} for response from client with pressed keys ...");
} else {
log::debug!("state: {:?} => waiting {MAX_RESPONSE_TIME:?} for client to respond ...", server.state.get());
}
tokio::time::sleep(MAX_RESPONSE_TIME).await;
// when anything is received from a client,
// the alive flag gets set
let unresponsive_clients: Vec<_> = {
let client_manager = server.client_manager.borrow();
ping_clients
.iter()
.filter_map(|&c| match client_manager.get(c) {
Some(state) if !state.alive => Some(c),
_ => None,
})
.collect()
};
// we may not be receiving anymore but we should respond
// to the original state and not the "new" one
if receiving {
for c in unresponsive_clients {
log::warn!("device not responding, releasing keys!");
let _ = consumer_notify.send(ConsumerEvent::ReleaseKeys(c)).await;
}
} else {
// release pointer if the active client has not responded
if !unresponsive_clients.is_empty() {
log::warn!("client not responding, releasing pointer!");
server.state.replace(State::Receiving);
let _ = producer_notify.send(ProducerEvent::Release).await;
}
}
}
}
});
let active = self let active = self
.client_manager .client_manager
@@ -271,7 +164,7 @@ impl Server {
} }
_ = &mut resolver_task => { } _ = &mut resolver_task => { }
_ = &mut udp_task => { } _ = &mut udp_task => { }
_ = &mut live_tracker => { } _ = &mut ping_task => { }
} }
let _ = consumer_channel.send(ConsumerEvent::Terminate).await; let _ = consumer_channel.send(ConsumerEvent::Terminate).await;
@@ -297,7 +190,7 @@ impl Server {
resolver_task.abort(); resolver_task.abort();
udp_task.abort(); udp_task.abort();
live_tracker.abort(); ping_task.abort();
Ok(()) Ok(())
} }

131
src/server/ping_task.rs Normal file
View File

@@ -0,0 +1,131 @@
use std::{net::SocketAddr, time::Duration};
use tokio::{
sync::mpsc::{Receiver, Sender},
task::JoinHandle,
};
use crate::{client::ClientHandle, event::Event};
use super::{consumer_task::ConsumerEvent, producer_task::ProducerEvent, Server, State};
const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500);
pub fn new(
server: Server,
sender_ch: Sender<(Event, SocketAddr)>,
consumer_notify: Sender<ConsumerEvent>,
producer_notify: Sender<ProducerEvent>,
mut timer_rx: Receiver<()>,
) -> JoinHandle<()> {
// timer task
let ping_task = tokio::task::spawn_local(async move {
loop {
// wait for wake up signal
let Some(_): Option<()> = timer_rx.recv().await else {
break;
};
loop {
let receiving = server.state.get() == State::Receiving;
let (ping_clients, ping_addrs) = {
let mut client_manager = server.client_manager.borrow_mut();
let ping_clients: Vec<ClientHandle> = if receiving {
// if receiving we care about clients with pressed keys
client_manager
.get_client_states_mut()
.filter(|s| !s.pressed_keys.is_empty())
.map(|s| s.client.handle)
.collect()
} else {
// if sending we care about the active client
server.active_client.get().iter().cloned().collect()
};
// get relevant socket addrs for clients
let ping_addrs: Vec<SocketAddr> = {
ping_clients
.iter()
.flat_map(|&c| client_manager.get(c))
.flat_map(|state| {
if state.alive && state.active_addr.is_some() {
vec![state.active_addr.unwrap()]
} else {
state
.client
.ips
.iter()
.cloned()
.map(|ip| SocketAddr::new(ip, state.client.port))
.collect()
}
})
.collect()
};
// reset alive
for state in client_manager.get_client_states_mut() {
state.alive = false;
}
(ping_clients, ping_addrs)
};
if receiving && ping_clients.is_empty() {
// receiving and no client has pressed keys
// -> no need to keep pinging
break;
}
// ping clients
for addr in ping_addrs {
if sender_ch.send((Event::Ping(), addr)).await.is_err() {
break;
}
}
// give clients time to resond
if receiving {
log::debug!("waiting {MAX_RESPONSE_TIME:?} for response from client with pressed keys ...");
} else {
log::debug!(
"state: {:?} => waiting {MAX_RESPONSE_TIME:?} for client to respond ...",
server.state.get()
);
}
tokio::time::sleep(MAX_RESPONSE_TIME).await;
// when anything is received from a client,
// the alive flag gets set
let unresponsive_clients: Vec<_> = {
let client_manager = server.client_manager.borrow();
ping_clients
.iter()
.filter_map(|&c| match client_manager.get(c) {
Some(state) if !state.alive => Some(c),
_ => None,
})
.collect()
};
// we may not be receiving anymore but we should respond
// to the original state and not the "new" one
if receiving {
for c in unresponsive_clients {
log::warn!("device not responding, releasing keys!");
let _ = consumer_notify.send(ConsumerEvent::ReleaseKeys(c)).await;
}
} else {
// release pointer if the active client has not responded
if !unresponsive_clients.is_empty() {
log::warn!("client not responding, releasing pointer!");
server.state.replace(State::Receiving);
let _ = producer_notify.send(ProducerEvent::Release).await;
}
}
}
}
});
ping_task
}