Activate on startup (#70)

Frontends are now properly synced among each other and on startup the correct state is reflected.

Closes #75 
Closes #68
This commit is contained in:
Ferdinand Schober
2024-01-16 12:59:39 +01:00
committed by GitHub
parent 2e52660714
commit d90eb0cd0f
14 changed files with 294 additions and 224 deletions

View File

@@ -67,14 +67,6 @@ pub enum ConsumerEvent {
Terminate,
}
#[derive(Clone)]
struct ClientUpdate {
client: ClientHandle,
hostname: Option<String>,
port: u16,
pos: Position,
}
#[derive(Clone)]
pub struct Server {
active_client: Rc<Cell<Option<ClientHandle>>>,
@@ -89,8 +81,14 @@ impl Server {
let client_manager = Rc::new(RefCell::new(ClientManager::new()));
let state = Rc::new(Cell::new(State::Receiving));
let port = Rc::new(Cell::new(config.port));
for (ips, host, port, pos) in config.get_clients() {
client_manager.borrow_mut().add_client(host, ips, port, pos);
for config_client in config.get_clients() {
client_manager.borrow_mut().add_client(
config_client.hostname,
config_client.ips,
config_client.port,
config_client.pos,
config_client.active,
);
}
Self {
active_client,
@@ -212,7 +210,7 @@ impl Server {
continue;
}
};
server.handle_frontend_stream(&mut frontend, &frontend_ch, stream).await;
server.handle_frontend_stream(&frontend_ch, stream).await;
}
event = frontend_rx.recv() => {
let frontend_event = event.ok_or(anyhow!("frontend channel closed"))?;
@@ -421,6 +419,23 @@ impl Server {
// initial sync of clients
frontend_tx.send(FrontendEvent::Enumerate()).await?;
let active = self
.client_manager
.borrow()
.get_client_states()
.filter_map(|s| {
if s.active {
Some(s.client.handle)
} else {
None
}
})
.collect::<Vec<_>>();
for client in active {
frontend_tx
.send(FrontendEvent::ActivateClient(client, true))
.await?;
}
tokio::select! {
_ = signal::ctrl_c() => {
@@ -477,7 +492,6 @@ impl Server {
pub async fn add_client(
&self,
resolver_tx: &Sender<(String, ClientHandle)>,
frontend: &mut FrontendListener,
hostname: Option<String>,
addr: HashSet<IpAddr>,
port: u16,
@@ -489,20 +503,18 @@ impl Server {
hostname.as_deref().unwrap_or(""),
&addr
);
let client = self
.client_manager
.borrow_mut()
.add_client(hostname.clone(), addr, port, pos);
let handle =
self.client_manager
.borrow_mut()
.add_client(hostname.clone(), addr, port, pos, false);
log::debug!("add_client {client}");
if let Some(hostname) = hostname.clone() {
let _ = resolver_tx.send((hostname, client)).await;
};
let notify = FrontendNotify::NotifyClientCreate(client, hostname, port, pos);
if let Err(e) = frontend.notify_all(notify).await {
log::error!("error notifying frontend: {e}");
};
client
log::debug!("add_client {handle}");
if let Some(hostname) = hostname {
let _ = resolver_tx.send((hostname, handle)).await;
}
handle
}
pub async fn activate_client(
@@ -572,41 +584,40 @@ impl Server {
producer_notify_tx: &Sender<ProducerEvent>,
consumer_notify_tx: &Sender<ConsumerEvent>,
resolve_tx: &Sender<(String, ClientHandle)>,
client_update: ClientUpdate,
client_update: (ClientHandle, Option<String>, u16, Position),
) {
let (handle, hostname, port, pos) = client_update;
let (hostname, handle, active) = {
// retrieve state
let mut client_manager = self.client_manager.borrow_mut();
let Some(state) = client_manager.get_mut(client_update.client) else {
let Some(state) = client_manager.get_mut(handle) else {
return;
};
// update pos
state.client.pos = client_update.pos;
state.client.pos = pos;
// update port
if state.client.port != client_update.port {
state.client.port = client_update.port;
if state.client.port != port {
state.client.port = port;
state.client.addrs = state
.client
.addrs
.iter()
.cloned()
.map(|mut a| {
a.set_port(client_update.port);
a.set_port(port);
a
})
.collect();
state.active_addr = state
.active_addr
.map(|a| SocketAddr::new(a.ip(), client_update.port));
state.active_addr = state.active_addr.map(|a| SocketAddr::new(a.ip(), port));
}
// update hostname
if state.client.hostname != client_update.hostname {
if state.client.hostname != hostname {
state.client.addrs = HashSet::new();
state.active_addr = None;
state.client.hostname = client_update.hostname;
state.client.hostname = hostname;
}
log::debug!("client updated: {:?}", state);
@@ -625,26 +636,16 @@ impl Server {
// update state in event consumer & producer
if active {
let _ = producer_notify_tx
.send(ProducerEvent::ClientEvent(ClientEvent::Destroy(
client_update.client,
)))
.send(ProducerEvent::ClientEvent(ClientEvent::Destroy(handle)))
.await;
let _ = consumer_notify_tx
.send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(
client_update.client,
)))
.send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(handle)))
.await;
let _ = producer_notify_tx
.send(ProducerEvent::ClientEvent(ClientEvent::Create(
client_update.client,
client_update.pos,
)))
.send(ProducerEvent::ClientEvent(ClientEvent::Create(handle, pos)))
.await;
let _ = consumer_notify_tx
.send(ConsumerEvent::ClientEvent(ClientEvent::Create(
client_update.client,
client_update.pos,
)))
.send(ConsumerEvent::ClientEvent(ClientEvent::Create(handle, pos)))
.await;
}
}
@@ -853,7 +854,6 @@ impl Server {
async fn handle_frontend_stream(
&self,
frontend: &mut FrontendListener,
frontend_tx: &Sender<FrontendEvent>,
#[cfg(unix)] mut stream: ReadHalf<UnixStream>,
#[cfg(windows)] mut stream: ReadHalf<TcpStream>,
@@ -880,55 +880,85 @@ impl Server {
}
}
});
self.enumerate(frontend).await;
let _ = frontend_tx.send(FrontendEvent::Enumerate()).await;
}
async fn handle_frontend_event(
&self,
producer_notify_tx: &Sender<ProducerEvent>,
consumer_notify_tx: &Sender<ConsumerEvent>,
producer_tx: &Sender<ProducerEvent>,
consumer_tx: &Sender<ConsumerEvent>,
resolve_tx: &Sender<(String, ClientHandle)>,
frontend: &mut FrontendListener,
port_tx: &Sender<u16>,
event: FrontendEvent,
) -> bool {
log::debug!("frontend: {event:?}");
match event {
let response = match event {
FrontendEvent::AddClient(hostname, port, pos) => {
self.add_client(resolve_tx, frontend, hostname, HashSet::new(), port, pos)
let handle = self
.add_client(resolve_tx, hostname, HashSet::new(), port, pos)
.await;
let client = self
.client_manager
.borrow()
.get(handle)
.unwrap()
.client
.clone();
Some(FrontendNotify::NotifyClientCreate(client))
}
FrontendEvent::ActivateClient(client, active) => {
self.activate_client(producer_notify_tx, consumer_notify_tx, client, active)
.await
FrontendEvent::ActivateClient(handle, active) => {
self.activate_client(producer_tx, consumer_tx, handle, active)
.await;
Some(FrontendNotify::NotifyClientActivate(handle, active))
}
FrontendEvent::ChangePort(port) => {
let _ = port_tx.send(port).await;
None
}
FrontendEvent::DelClient(client) => {
self.remove_client(producer_notify_tx, consumer_notify_tx, frontend, client)
FrontendEvent::DelClient(handle) => {
self.remove_client(producer_tx, consumer_tx, frontend, handle)
.await;
Some(FrontendNotify::NotifyClientDelete(handle))
}
FrontendEvent::Enumerate() => {
let clients = self
.client_manager
.borrow()
.get_client_states()
.map(|s| (s.client.clone(), s.active))
.collect();
Some(FrontendNotify::Enumerate(clients))
}
FrontendEvent::Enumerate() => self.enumerate(frontend).await,
FrontendEvent::Shutdown() => {
log::info!("terminating gracefully...");
return true;
}
FrontendEvent::UpdateClient(client, hostname, port, pos) => {
let client_update = ClientUpdate {
client,
hostname,
port,
pos,
};
FrontendEvent::UpdateClient(handle, hostname, port, pos) => {
self.update_client(
producer_notify_tx,
consumer_notify_tx,
producer_tx,
consumer_tx,
resolve_tx,
client_update,
(handle, hostname, port, pos),
)
.await
.await;
let client = self
.client_manager
.borrow()
.get(handle)
.unwrap()
.client
.clone();
Some(FrontendNotify::NotifyClientUpdate(client))
}
};
let Some(response) = response else {
return false;
};
if let Err(e) = frontend.notify_all(response).await {
log::error!("error notifying frontend: {e}");
}
false
}
@@ -964,21 +994,6 @@ impl Server {
.consume(Event::Keyboard(modifiers_event), client)
.await;
}
async fn enumerate(&self, frontend: &mut FrontendListener) {
let clients = self
.client_manager
.borrow()
.get_client_states()
.map(|s| (s.client.clone(), s.active))
.collect();
if let Err(e) = frontend
.notify_all(FrontendNotify::Enumerate(clients))
.await
{
log::error!("error notifying frontend: {e}");
}
}
}
async fn receive_event(socket: &UdpSocket) -> anyhow::Result<(Event, SocketAddr)> {