implementing release logic for one-way connection

This commit is contained in:
Ferdinand Schober
2024-10-08 01:10:25 +02:00
parent 44e34918bb
commit 06d4e8d836
6 changed files with 125 additions and 19 deletions

View File

@@ -283,6 +283,8 @@ impl Cli {
FrontendEvent::PublicKeyFingerprint(fp) => {
eprintln!("the public key fingerprint of this device is {fp}");
}
FrontendEvent::IncomingConnected(fp, addr, pos) => {}
FrontendEvent::IncomingDisconnected(fp) => {}
}
}

View File

@@ -142,6 +142,8 @@ fn build_ui(app: &Application) {
FrontendEvent::PublicKeyFingerprint(fp) => {
window.set_pk_fp(&fp);
}
FrontendEvent::IncomingConnected(fp, addr, pos) => {}
FrontendEvent::IncomingDisconnected(fp) => {}
}
}
}

View File

@@ -201,6 +201,10 @@ pub enum FrontendEvent {
AuthorizedUpdated(HashMap<String, String>),
/// public key fingerprint of this device
PublicKeyFingerprint(String),
/// incoming connected
IncomingConnected(String, SocketAddr, Position),
/// incoming disconnected
IncomingDisconnected(String),
}
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]

View File

@@ -20,6 +20,7 @@ use crate::{connect::LanMouseConnection, service::Service};
pub(crate) struct Capture {
tx: Sender<CaptureRequest>,
task: JoinHandle<()>,
enter_rx: Receiver<CaptureHandle>,
}
#[derive(Clone, Copy, Debug)]
@@ -35,8 +36,9 @@ enum CaptureRequest {
impl Capture {
pub(crate) fn new(server: Service, conn: LanMouseConnection) -> Self {
let (tx, rx) = channel();
let task = spawn_local(Self::run(server.clone(), rx, conn));
Self { tx, task }
let (enter_tx, enter_rx) = channel();
let task = spawn_local(Self::run(server.clone(), rx, conn, enter_tx));
Self { tx, task, enter_rx }
}
pub(crate) async fn terminate(&mut self) {
@@ -66,9 +68,18 @@ impl Capture {
.expect("channel closed");
}
async fn run(server: Service, mut rx: Receiver<CaptureRequest>, mut conn: LanMouseConnection) {
pub(crate) async fn entered(&mut self) -> CaptureHandle {
self.enter_rx.recv().await.expect("channel closed")
}
async fn run(
server: Service,
mut rx: Receiver<CaptureRequest>,
mut conn: LanMouseConnection,
mut enter_tx: Sender<CaptureHandle>,
) {
loop {
if let Err(e) = do_capture(&server, &mut conn, &mut rx).await {
if let Err(e) = do_capture(&server, &mut conn, &mut rx, &mut enter_tx).await {
log::warn!("input capture exited: {e}");
}
server.set_capture_status(Status::Disabled);
@@ -90,6 +101,7 @@ async fn do_capture(
server: &Service,
conn: &mut LanMouseConnection,
rx: &mut Receiver<CaptureRequest>,
enter_tx: &mut Sender<CaptureHandle>,
) -> Result<(), InputCaptureError> {
let backend = server.config.capture_backend.map(|b| b.into());
@@ -121,7 +133,7 @@ async fn do_capture(
loop {
tokio::select! {
event = capture.next() => match event {
Some(event) => handle_capture_event(server, &mut capture, conn, event?, &mut state).await?,
Some(event) => handle_capture_event(server, &mut capture, conn, event?, &mut state, enter_tx).await?,
None => return Ok(()),
},
(handle, event) = conn.recv() => {
@@ -196,6 +208,7 @@ async fn handle_capture_event(
conn: &LanMouseConnection,
event: (CaptureHandle, CaptureEvent),
state: &mut State,
enter_tx: &mut Sender<CaptureHandle>,
) -> Result<(), CaptureError> {
let (handle, event) = event;
log::trace!("({handle}): {event:?}");
@@ -211,6 +224,10 @@ async fn handle_capture_event(
return release_capture(capture, server).await;
}
if event == CaptureEvent::Begin {
enter_tx.send(handle).expect("channel closed");
}
// activated a new client
if event == CaptureEvent::Begin && Some(handle) != server.get_active() {
*state = State::WaitingForAck;

View File

@@ -18,19 +18,26 @@ use tokio::{
/// emulation handling events received from a listener
pub(crate) struct Emulation {
task: JoinHandle<()>,
release_tx: Sender<SocketAddr>,
}
impl Emulation {
pub(crate) fn new(server: Service, listener: LanMouseListener) -> Self {
let emulation_proxy = EmulationProxy::new(server.clone());
let task = spawn_local(Self::run(server, listener, emulation_proxy));
Self { task }
let (release_tx, release_rx) = channel();
let task = spawn_local(Self::run(server, listener, emulation_proxy, release_rx));
Self { task, release_tx }
}
pub(crate) fn notify_release(&self, addr: SocketAddr) {
self.release_tx.send(addr).expect("channel closed");
}
async fn run(
server: Service,
service: Service,
mut listener: LanMouseListener,
mut emulation_proxy: EmulationProxy,
mut release_rx: Receiver<SocketAddr>,
) {
let mut interval = tokio::time::interval(Duration::from_secs(5));
let mut last_response = HashMap::new();
@@ -47,9 +54,9 @@ impl Emulation {
ProtoEvent::Enter(pos) => {
if let Some(cert) = listener.get_certificate_fingerprint(addr).await {
log::info!("{addr} entered this device");
server.release_capture();
service.release_capture();
listener.reply(addr, ProtoEvent::Ack(0)).await;
server.register_incoming(addr, to_ipc_pos(pos), cert);
service.register_incoming(addr, to_ipc_pos(pos), cert);
}
}
ProtoEvent::Leave(_) => {
@@ -57,22 +64,27 @@ impl Emulation {
listener.reply(addr, ProtoEvent::Ack(0)).await;
}
ProtoEvent::Input(event) => emulation_proxy.consume(event, addr),
ProtoEvent::Ping => listener.reply(addr, ProtoEvent::Pong(server.emulation_status.get() == Status::Enabled)).await,
ProtoEvent::Ping => listener.reply(addr, ProtoEvent::Pong(service.emulation_status.get() == Status::Enabled)).await,
_ => {}
}
}
addr = release_rx.recv() => {
let addr = addr.expect("channel closed");
listener.reply(addr, ProtoEvent::Leave(0)).await;
}
_ = interval.tick() => {
last_response.retain(|addr,instant| {
last_response.retain(|&addr,instant| {
if instant.elapsed() > Duration::from_secs(5) {
log::warn!("releasing keys: {addr} not responding!");
emulation_proxy.release_keys(*addr);
emulation_proxy.release_keys(addr);
service.deregister_incoming(addr);
false
} else {
true
}
});
}
_ = server.cancelled() => break,
_ = service.cancelled() => break,
}
}
listener.terminate().await;

View File

@@ -22,6 +22,7 @@ use std::{
net::{IpAddr, SocketAddr},
rc::Rc,
sync::{Arc, RwLock},
u64,
};
use thiserror::Error;
use tokio::{signal, sync::Notify};
@@ -44,6 +45,17 @@ pub enum ServiceError {
pub struct ReleaseToken;
enum IncomingEvent {
Connected {
addr: SocketAddr,
pos: Position,
fingerprint: String,
},
Disconnected {
addr: SocketAddr,
},
}
#[derive(Clone)]
pub struct Service {
active: Rc<Cell<Option<ClientHandle>>>,
@@ -54,12 +66,13 @@ pub struct Service {
notifies: Rc<Notifies>,
pub(crate) config: Rc<Config>,
pending_frontend_events: Rc<RefCell<VecDeque<FrontendEvent>>>,
pending_incoming: Rc<RefCell<VecDeque<(SocketAddr, Position, String)>>>,
pending_incoming: Rc<RefCell<VecDeque<IncomingEvent>>>,
capture_status: Rc<Cell<Status>>,
pub(crate) emulation_status: Rc<Cell<Status>>,
pub(crate) should_release: Rc<RefCell<Option<ReleaseToken>>>,
incoming_conns: Rc<RefCell<HashMap<SocketAddr, Position>>>,
incoming_conns: Rc<RefCell<HashMap<ClientHandle, (String, SocketAddr)>>>,
cert: Certificate,
next_trigger_handle: u64,
}
#[derive(Default)]
@@ -118,6 +131,7 @@ impl Service {
emulation_status: Default::default(),
incoming_conns: Rc::new(RefCell::new(HashMap::new())),
should_release: Default::default(),
next_trigger_handle: 0,
};
Ok(service)
}
@@ -171,11 +185,26 @@ impl Service {
}
},
_ = self.notifies.incoming.notified() => {
while let Some((addr, pos, fingerprint)) = {
while let Some(incoming) = {
let incoming = self.pending_incoming.borrow_mut().pop_front();
incoming
} {
// capture.register(addr, pos);
match incoming {
IncomingEvent::Connected { addr, pos, fingerprint } => {
self.add_incoming(addr, pos, fingerprint.clone(), &capture);
self.notify_frontend(FrontendEvent::IncomingConnected(fingerprint, addr, pos));
},
IncomingEvent::Disconnected { addr } => {
if let Some(fp) = self.remove_incoming(addr, &capture) {
self.notify_frontend(FrontendEvent::IncomingDisconnected(fp));
}
},
}
}
},
handle = capture.entered() => {
if let Some((_fp, addr)) = self.incoming_conns.borrow().get(&handle) {
emulation.notify_release(*addr);
}
},
_ = self.cancelled() => break,
@@ -196,6 +225,36 @@ impl Service {
Ok(())
}
fn add_incoming(
&mut self,
addr: SocketAddr,
pos: Position,
fingerprint: String,
capture: &Capture,
) {
const ENTER_HANDLE_BEGIN: u64 = u64::MAX / 2 + 1;
let handle = ENTER_HANDLE_BEGIN + self.next_trigger_handle;
self.next_trigger_handle += 1;
capture.create(handle, pos);
self.incoming_conns
.borrow_mut()
.insert(handle, (fingerprint, addr));
}
fn remove_incoming(&mut self, addr: SocketAddr, capture: &Capture) -> Option<String> {
let handle = self
.incoming_conns
.borrow()
.iter()
.find(|(_, (_, a))| *a == addr)
.map(|(k, _)| *k)?;
capture.destroy(handle);
self.incoming_conns
.borrow_mut()
.remove(&handle)
.map(|(f, _)| f)
}
fn notify_frontend(&self, event: FrontendEvent) {
self.pending_frontend_events.borrow_mut().push_back(event);
self.notifies.frontend_event_pending.notify_one();
@@ -433,7 +492,17 @@ impl Service {
pub(crate) fn register_incoming(&self, addr: SocketAddr, pos: Position, fingerprint: String) {
self.pending_incoming
.borrow_mut()
.push_back((addr, pos, fingerprint));
.push_back(IncomingEvent::Connected {
addr,
pos,
fingerprint,
});
self.notifies.incoming.notify_one();
}
pub(crate) fn deregister_incoming(&self, addr: SocketAddr) {
self.pending_incoming
.borrow_mut()
.push_back(IncomingEvent::Disconnected { addr });
}
}