Separate config state (#118)

* change internal api
* frontend now keeps and more correctly reflects backend state
This commit is contained in:
Ferdinand Schober
2024-05-03 11:27:06 +02:00
committed by GitHub
parent 1e4312b3ce
commit 5318f5a02d
15 changed files with 809 additions and 507 deletions

View File

@@ -108,7 +108,7 @@ async fn handle_capture_event(
// get client state for handle
let mut client_manager = server.client_manager.borrow_mut();
let client_state = match client_manager.get_mut(handle) {
Some(state) => state,
Some((_, s)) => s,
None => {
// should not happen
log::warn!("unknown client!");

View File

@@ -108,7 +108,7 @@ async fn handle_udp_rx(
{
let mut client_manager = server.client_manager.borrow_mut();
let client_state = match client_manager.get_mut(handle) {
Some(s) => s,
Some((_, s)) => s,
None => {
log::error!("unknown handle");
return;
@@ -156,13 +156,12 @@ async fn handle_udp_rx(
}) = event
{
let mut client_manager = server.client_manager.borrow_mut();
let client_state =
if let Some(client_state) = client_manager.get_mut(handle) {
client_state
} else {
log::error!("unknown handle");
return;
};
let client_state = if let Some((_, s)) = client_manager.get_mut(handle) {
s
} else {
log::error!("unknown handle");
return;
};
if state == 0 {
// ignore release event if key not pressed
ignore_event = !client_state.pressed_keys.remove(&key);
@@ -213,7 +212,7 @@ async fn release_keys(
.borrow_mut()
.get_mut(client)
.iter_mut()
.flat_map(|s| s.pressed_keys.drain())
.flat_map(|(_, s)| s.pressed_keys.drain())
.collect::<Vec<_>>();
for key in keys {

View File

@@ -75,12 +75,11 @@ async fn handle_frontend_stream(
let tx = frontend_tx.clone();
tokio::task::spawn_local(async move {
let _ = tx.send(FrontendRequest::Enumerate()).await;
loop {
let event = frontend::wait_for_request(&mut stream).await;
match event {
Ok(event) => {
let _ = tx.send(event).await;
let request = frontend::wait_for_request(&mut stream).await;
match request {
Ok(request) => {
let _ = tx.send(request).await;
}
Err(e) => {
if let Some(e) = e.downcast_ref::<io::Error>() {
@@ -98,8 +97,8 @@ async fn handle_frontend_stream(
async fn handle_frontend_event(
server: &Server,
capture_tx: &Sender<CaptureEvent>,
emulate_tx: &Sender<EmulationEvent>,
capture: &Sender<CaptureEvent>,
emulate: &Sender<EmulationEvent>,
resolve_tx: &Sender<DnsRequest>,
frontend: &mut FrontendListener,
port_tx: &Sender<u16>,
@@ -107,99 +106,67 @@ async fn handle_frontend_event(
) -> bool {
log::debug!("frontend: {event:?}");
match event {
FrontendRequest::Create(hostname, port, pos) => {
add_client(
server,
frontend,
resolve_tx,
hostname,
HashSet::new(),
port,
pos,
)
.await;
FrontendRequest::Create => {
add_client(server, frontend).await;
}
FrontendRequest::Activate(handle, active) => {
if active {
activate_client(server, frontend, capture_tx, emulate_tx, handle).await;
activate_client(server, frontend, capture, emulate, handle).await;
} else {
deactivate_client(server, frontend, capture_tx, emulate_tx, handle).await;
deactivate_client(server, frontend, capture, emulate, handle).await;
}
}
FrontendRequest::ChangePort(port) => {
let _ = port_tx.send(port).await;
}
FrontendRequest::Delete(handle) => {
remove_client(server, frontend, capture_tx, emulate_tx, handle).await;
remove_client(server, frontend, capture, emulate, handle).await;
}
FrontendRequest::Enumerate() => {
let clients = server
.client_manager
.borrow()
.get_client_states()
.map(|(h, s)| (h, s.client.clone(), s.active))
.map(|(h, (c, s))| (h, c.clone(), s.clone()))
.collect();
notify_all(frontend, FrontendEvent::Enumerate(clients)).await;
broadcast(frontend, FrontendEvent::Enumerate(clients)).await;
}
FrontendRequest::Terminate() => {
log::info!("terminating gracefully...");
return true;
}
FrontendRequest::Update(handle, hostname, port, pos) => {
update_client(
server,
frontend,
capture_tx,
emulate_tx,
resolve_tx,
(handle, hostname, port, pos),
)
.await;
FrontendRequest::UpdateFixIps(handle, fix_ips) => {
update_fix_ips(server, resolve_tx, handle, fix_ips).await;
broadcast_client_update(server, frontend, handle).await;
}
FrontendRequest::UpdateHostname(handle, hostname) => {
update_hostname(server, resolve_tx, handle, hostname).await;
broadcast_client_update(server, frontend, handle).await;
}
FrontendRequest::UpdatePort(handle, port) => {
update_port(server, handle, port).await;
broadcast_client_update(server, frontend, handle).await;
}
FrontendRequest::UpdatePosition(handle, pos) => {
update_pos(server, handle, capture, emulate, pos).await;
broadcast_client_update(server, frontend, handle).await;
}
};
false
}
async fn notify_all(frontend: &mut FrontendListener, event: FrontendEvent) {
async fn broadcast(frontend: &mut FrontendListener, event: FrontendEvent) {
if let Err(e) = frontend.broadcast_event(event).await {
log::error!("error notifying frontend: {e}");
}
}
pub async fn add_client(
server: &Server,
frontend: &mut FrontendListener,
resolver_tx: &Sender<DnsRequest>,
hostname: Option<String>,
addr: HashSet<IpAddr>,
port: u16,
pos: Position,
) {
log::info!(
"adding client [{}]{} @ {:?}",
pos,
hostname.as_deref().unwrap_or(""),
&addr
);
let handle =
server
.client_manager
.borrow_mut()
.add_client(hostname.clone(), addr, port, pos, false);
pub async fn add_client(server: &Server, frontend: &mut FrontendListener) {
let handle = server.client_manager.borrow_mut().add_client();
log::info!("added client {handle}");
log::debug!("add_client {handle}");
if let Some(hostname) = hostname {
let _ = resolver_tx.send(DnsRequest { hostname, handle }).await;
}
let client = server
.client_manager
.borrow()
.get(handle)
.unwrap()
.client
.clone();
notify_all(frontend, FrontendEvent::Created(handle, client)).await;
let (c, s) = server.client_manager.borrow().get(handle).unwrap().clone();
broadcast(frontend, FrontendEvent::Created(handle, c, s)).await;
}
pub async fn deactivate_client(
@@ -209,19 +176,19 @@ pub async fn deactivate_client(
emulate: &Sender<EmulationEvent>,
handle: ClientHandle,
) {
let (client, _) = match server.client_manager.borrow_mut().get_mut(handle) {
Some(state) => {
state.active = false;
(handle, state.client.pos)
let state = match server.client_manager.borrow_mut().get_mut(handle) {
Some((_, s)) => {
s.active = false;
s.clone()
}
None => return,
};
let event = ClientEvent::Destroy(client);
let event = ClientEvent::Destroy(handle);
let _ = capture.send(CaptureEvent::ClientEvent(event)).await;
let _ = emulate.send(EmulationEvent::ClientEvent(event)).await;
let event = FrontendEvent::Activated(client, false);
notify_all(frontend, event).await;
let event = FrontendEvent::StateChange(handle, state);
broadcast(frontend, event).await;
}
pub async fn activate_client(
@@ -233,7 +200,7 @@ pub async fn activate_client(
) {
/* deactivate potential other client at this position */
let pos = match server.client_manager.borrow().get(handle) {
Some(state) => state.client.pos,
Some((client, _)) => client.pos,
None => return,
};
@@ -245,19 +212,19 @@ pub async fn activate_client(
}
/* activate the client */
server
.client_manager
.borrow_mut()
.get_mut(handle)
.unwrap()
.active = true;
let state = if let Some((_, s)) = server.client_manager.borrow_mut().get_mut(handle) {
s.active = true;
s.clone()
} else {
return;
};
/* notify emulation, capture and frontends */
let event = ClientEvent::Create(handle, pos);
let _ = capture.send(CaptureEvent::ClientEvent(event)).await;
let _ = emulate.send(EmulationEvent::ClientEvent(event)).await;
let event = FrontendEvent::Activated(handle, true);
notify_all(frontend, event).await;
let event = FrontendEvent::StateChange(handle, state);
broadcast(frontend, event).await;
}
pub async fn remove_client(
@@ -271,7 +238,7 @@ pub async fn remove_client(
.client_manager
.borrow_mut()
.remove_client(handle)
.map(|s| s.active)
.map(|(_, s)| s.active)
else {
return;
};
@@ -283,76 +250,107 @@ pub async fn remove_client(
}
let event = FrontendEvent::Deleted(handle);
notify_all(frontend, event).await;
broadcast(frontend, event).await;
}
async fn update_client(
async fn update_fix_ips(
server: &Server,
frontend: &mut FrontendListener,
capture: &Sender<CaptureEvent>,
emulate: &Sender<EmulationEvent>,
resolve_tx: &Sender<DnsRequest>,
client_update: (ClientHandle, Option<String>, u16, Position),
handle: ClientHandle,
fix_ips: Vec<IpAddr>,
) {
let (handle, hostname, port, pos) = client_update;
let mut changed = false;
let (hostname, active) = {
// retrieve state
let hostname = {
let mut client_manager = server.client_manager.borrow_mut();
let Some(state) = client_manager.get_mut(handle) else {
let Some((c, _)) = client_manager.get_mut(handle) else {
return;
};
// update pos
if state.client.pos != pos {
state.client.pos = pos;
changed = true;
}
// update port
if state.client.port != port {
state.client.port = port;
state.active_addr = state.active_addr.map(|a| SocketAddr::new(a.ip(), port));
changed = true;
}
// update hostname
if state.client.hostname != hostname {
state.client.ips = HashSet::new();
state.active_addr = None;
state.client.hostname = hostname;
changed = true;
}
log::debug!("client updated: {:?}", state);
(state.client.hostname.clone(), state.active)
c.fix_ips = fix_ips;
c.hostname.clone()
};
// resolve dns if something changed
if changed {
// resolve dns
if let Some(hostname) = hostname {
let _ = resolve_tx.send(DnsRequest { hostname, handle }).await;
}
if let Some(hostname) = hostname {
let _ = resolve_tx.send(DnsRequest { hostname, handle }).await;
}
}
async fn update_hostname(
server: &Server,
resolve_tx: &Sender<DnsRequest>,
handle: ClientHandle,
hostname: Option<String>,
) {
let hostname = {
let mut client_manager = server.client_manager.borrow_mut();
let Some((c, s)) = client_manager.get_mut(handle) else {
return;
};
// update hostname
if c.hostname != hostname {
c.hostname = hostname;
s.ips = HashSet::from_iter(c.fix_ips.iter().cloned());
s.active_addr = None;
c.hostname.clone()
} else {
None
}
};
// resolve to update ips in state
if let Some(hostname) = hostname {
let _ = resolve_tx.send(DnsRequest { hostname, handle }).await;
}
}
async fn update_port(server: &Server, handle: ClientHandle, port: u16) {
let mut client_manager = server.client_manager.borrow_mut();
let Some((c, s)) = client_manager.get_mut(handle) else {
return;
};
if c.port != port {
c.port = port;
s.active_addr = s.active_addr.map(|a| SocketAddr::new(a.ip(), port));
}
}
async fn update_pos(
server: &Server,
handle: ClientHandle,
capture: &Sender<CaptureEvent>,
emulate: &Sender<EmulationEvent>,
pos: Position,
) {
let (changed, active) = {
let mut client_manager = server.client_manager.borrow_mut();
let Some((c, s)) = client_manager.get_mut(handle) else {
return;
};
let changed = c.pos != pos;
c.pos = pos;
(changed, s.active)
};
// update state in event input emulator & input capture
if changed && active {
// update state
let destroy = ClientEvent::Destroy(handle);
if changed {
if active {
let destroy = ClientEvent::Destroy(handle);
let _ = capture.send(CaptureEvent::ClientEvent(destroy)).await;
let _ = emulate.send(EmulationEvent::ClientEvent(destroy)).await;
}
let create = ClientEvent::Create(handle, pos);
let _ = capture.send(CaptureEvent::ClientEvent(destroy)).await;
let _ = emulate.send(EmulationEvent::ClientEvent(destroy)).await;
let _ = capture.send(CaptureEvent::ClientEvent(create)).await;
let _ = emulate.send(EmulationEvent::ClientEvent(create)).await;
}
let client = server
.client_manager
.borrow()
.get(handle)
.unwrap()
.client
.clone();
notify_all(frontend, FrontendEvent::Updated(handle, client)).await;
}
async fn broadcast_client_update(
server: &Server,
frontend: &mut FrontendListener,
handle: ClientHandle,
) {
let (client, _) = server.client_manager.borrow().get(handle).unwrap().clone();
broadcast(frontend, FrontendEvent::Updated(handle, client)).await;
}

View File

@@ -34,7 +34,7 @@ pub fn new(
// if receiving we care about clients with pressed keys
client_manager
.get_client_states_mut()
.filter(|(_, s)| !s.pressed_keys.is_empty())
.filter(|(_, (_, s))| !s.pressed_keys.is_empty())
.map(|(h, _)| h)
.collect()
} else {
@@ -46,17 +46,15 @@ pub fn new(
let ping_addrs: Vec<SocketAddr> = {
ping_clients
.iter()
.flat_map(|&c| client_manager.get(c))
.flat_map(|state| {
if state.alive && state.active_addr.is_some() {
vec![state.active_addr.unwrap()]
.flat_map(|&h| client_manager.get(h))
.flat_map(|(c, s)| {
if s.alive && s.active_addr.is_some() {
vec![s.active_addr.unwrap()]
} else {
state
.client
.ips
s.ips
.iter()
.cloned()
.map(|ip| SocketAddr::new(ip, state.client.port))
.map(|ip| SocketAddr::new(ip, c.port))
.collect()
}
})
@@ -64,8 +62,8 @@ pub fn new(
};
// reset alive
for (_, state) in client_manager.get_client_states_mut() {
state.alive = false;
for (_, (_, s)) in client_manager.get_client_states_mut() {
s.alive = false;
}
(ping_clients, ping_addrs)
@@ -102,8 +100,8 @@ pub fn new(
let client_manager = server.client_manager.borrow();
ping_clients
.iter()
.filter_map(|&c| match client_manager.get(c) {
Some(state) if !state.alive => Some(c),
.filter_map(|&h| match client_manager.get(h) {
Some((_, s)) if !s.alive => Some(h),
_ => None,
})
.collect()
@@ -112,9 +110,9 @@ pub fn new(
// we may not be receiving anymore but we should respond
// to the original state and not the "new" one
if receiving {
for c in unresponsive_clients {
for h in unresponsive_clients {
log::warn!("device not responding, releasing keys!");
let _ = emulate_notify.send(EmulationEvent::ReleaseKeys(c)).await;
let _ = emulate_notify.send(EmulationEvent::ReleaseKeys(h)).await;
}
} else {
// release pointer if the active client has not responded

View File

@@ -27,12 +27,12 @@ pub fn new(resolver: DnsResolver, server: Server) -> (JoinHandle<()>, Sender<Dns
continue;
}
};
if let Some(state) = server.client_manager.borrow_mut().get_mut(handle) {
let mut addrs = HashSet::from_iter(state.client.fix_ips.iter().cloned());
if let Some((c, s)) = server.client_manager.borrow_mut().get_mut(handle) {
let mut addrs = HashSet::from_iter(c.fix_ips.iter().cloned());
for ip in ips {
addrs.insert(ip);
}
state.client.ips = addrs;
s.ips = addrs;
}
}
});