handle port change

This commit is contained in:
Ferdinand Schober
2024-10-28 15:51:41 +01:00
parent 2291cf25a3
commit 1489719e01
3 changed files with 289 additions and 146 deletions

View File

@@ -1,13 +1,14 @@
use crate::{listen::LanMouseListener, service::Service};
use crate::listen::{LanMouseListener, ListenerCreationError};
use futures::StreamExt;
use input_emulation::{EmulationHandle, InputEmulation, InputEmulationError};
use input_event::Event;
use lan_mouse_ipc::Status;
use lan_mouse_proto::{Position, ProtoEvent};
use local_channel::mpsc::{channel, Receiver, Sender};
use std::{
cell::Cell,
collections::HashMap,
net::SocketAddr,
rc::Rc,
time::{Duration, Instant},
};
use tokio::{
@@ -18,26 +19,82 @@ use tokio::{
/// emulation handling events received from a listener
pub(crate) struct Emulation {
task: JoinHandle<()>,
release_tx: Sender<SocketAddr>,
request_tx: Sender<EmulationRequest>,
event_rx: Receiver<EmulationEvent>,
}
pub(crate) enum EmulationEvent {
/// new connection
Connected {
/// address of the connection
addr: SocketAddr,
/// position of the connection
pos: lan_mouse_ipc::Position,
/// certificate fingerprint of the connection
fingerprint: String,
},
/// connection closed
Disconnected { addr: SocketAddr },
/// the port of the listener has changed
PortChanged(Result<u16, ListenerCreationError>),
/// emulation was disabled
EmulationDisabled,
/// emulation was enabled
EmulationEnabled,
/// capture should be released
ReleaseNotify,
}
enum EmulationRequest {
Reenable,
Release(SocketAddr),
ChangePort(u16),
Terminate,
}
impl Emulation {
pub(crate) fn new(server: Service, listener: LanMouseListener) -> Self {
let emulation_proxy = EmulationProxy::new(server.clone());
let (release_tx, release_rx) = channel();
let task = spawn_local(Self::run(server, listener, emulation_proxy, release_rx));
Self { task, release_tx }
pub(crate) fn new(
backend: Option<input_emulation::Backend>,
listener: LanMouseListener,
) -> Self {
let emulation_proxy = EmulationProxy::new(backend);
let (request_tx, request_rx) = channel();
let (event_tx, event_rx) = channel();
let task = spawn_local(Self::run(listener, emulation_proxy, request_rx, event_tx));
Self {
task,
request_tx,
event_rx,
}
}
pub(crate) fn notify_release(&self, addr: SocketAddr) {
self.release_tx.send(addr).expect("channel closed");
pub(crate) fn send_leave_event(&self, addr: SocketAddr) {
self.request_tx
.send(EmulationRequest::Release(addr))
.expect("channel closed");
}
pub(crate) fn reenable(&self) {
self.request_tx
.send(EmulationRequest::Reenable)
.expect("channel closed");
}
pub(crate) fn request_port_change(&self, port: u16) {
self.request_tx
.send(EmulationRequest::ChangePort(port))
.expect("channel closed")
}
pub(crate) async fn event(&mut self) -> EmulationEvent {
self.event_rx.recv().await.expect("channel closed")
}
async fn run(
service: Service,
mut listener: LanMouseListener,
mut emulation_proxy: EmulationProxy,
mut release_rx: Receiver<SocketAddr>,
mut request_rx: Receiver<EmulationRequest>,
event_tx: Sender<EmulationEvent>,
) {
let mut interval = tokio::time::interval(Duration::from_secs(5));
let mut last_response = HashMap::new();
@@ -52,11 +109,11 @@ impl Emulation {
last_response.insert(addr, Instant::now());
match event {
ProtoEvent::Enter(pos) => {
if let Some(cert) = listener.get_certificate_fingerprint(addr).await {
if let Some(fingerprint) = listener.get_certificate_fingerprint(addr).await {
log::info!("releasing capture: {addr} entered this device");
service.release_capture();
event_tx.send(EmulationEvent::ReleaseNotify).expect("channel closed");
listener.reply(addr, ProtoEvent::Ack(0)).await;
service.register_incoming(addr, to_ipc_pos(pos), cert);
event_tx.send(EmulationEvent::Connected{addr, pos: to_ipc_pos(pos), fingerprint}).expect("channel closed");
}
}
ProtoEvent::Leave(_) => {
@@ -64,28 +121,37 @@ impl Emulation {
listener.reply(addr, ProtoEvent::Ack(0)).await;
}
ProtoEvent::Input(event) => emulation_proxy.consume(event, addr),
ProtoEvent::Ping => listener.reply(addr, ProtoEvent::Pong(service.emulation_status.get() == Status::Enabled)).await,
ProtoEvent::Ping => listener.reply(addr, ProtoEvent::Pong(emulation_proxy.emulation_active.get())).await,
_ => {}
}
}
addr = release_rx.recv() => {
// notify the other end that we hit a barrier (should release capture)
let addr = addr.expect("channel closed");
listener.reply(addr, ProtoEvent::Leave(0)).await;
event = emulation_proxy.event() => {
event_tx.send(event).expect("channel closed");
}
request = request_rx.recv() => match request.expect("channel closed") {
// reenable emulation
EmulationRequest::Reenable => emulation_proxy.reenable(),
// notify the other end that we hit a barrier (should release capture)
EmulationRequest::Release(addr) => listener.reply(addr, ProtoEvent::Leave(0)).await,
EmulationRequest::ChangePort(port) => {
listener.request_port_change(port);
let result = listener.port_changed().await;
event_tx.send(EmulationEvent::PortChanged(result)).expect("channel closed");
}
EmulationRequest::Terminate => break,
},
_ = interval.tick() => {
last_response.retain(|&addr,instant| {
if instant.elapsed() > Duration::from_secs(5) {
log::warn!("releasing keys: {addr} not responding!");
emulation_proxy.release_keys(addr);
service.deregister_incoming(addr);
event_tx.send(EmulationEvent::Disconnected { addr }).expect("channel closed");
false
} else {
true
}
});
}
_ = service.cancelled() => break,
}
}
listener.terminate().await;
@@ -95,6 +161,9 @@ impl Emulation {
/// wait for termination
pub(crate) async fn terminate(&mut self) {
log::debug!("terminating emulation");
self.request_tx
.send(EmulationRequest::Terminate)
.expect("channel closed");
if let Err(e) = (&mut self.task).await {
log::warn!("{e}");
}
@@ -104,111 +173,166 @@ impl Emulation {
/// proxy handling the actual input emulation,
/// discarding events when it is disabled
pub(crate) struct EmulationProxy {
server: Service,
tx: Sender<(ProxyEvent, SocketAddr)>,
emulation_active: Rc<Cell<bool>>,
request_tx: Sender<ProxyRequest>,
event_rx: Receiver<EmulationEvent>,
task: JoinHandle<()>,
}
enum ProxyEvent {
Input(Event),
ReleaseKeys,
enum ProxyRequest {
Input(Event, SocketAddr),
ReleaseKeys(SocketAddr),
Terminate,
Reenable,
}
impl EmulationProxy {
fn new(server: Service) -> Self {
let (tx, rx) = channel();
let task = spawn_local(Self::emulation_task(server.clone(), rx));
Self { server, tx, task }
fn new(backend: Option<input_emulation::Backend>) -> Self {
let (request_tx, request_rx) = channel();
let (event_tx, event_rx) = channel();
let emulation_active = Rc::new(Cell::new(false));
let task = spawn_local(Self::emulation_task(backend, request_rx, event_tx));
Self {
emulation_active,
request_tx,
task,
event_rx,
}
}
async fn event(&mut self) -> EmulationEvent {
let event = self.event_rx.recv().await.expect("channel closed");
if let EmulationEvent::EmulationEnabled = event {
self.emulation_active.replace(true);
}
if let EmulationEvent::EmulationDisabled = event {
self.emulation_active.replace(false);
}
event
}
fn consume(&self, event: Event, addr: SocketAddr) {
// ignore events if emulation is currently disabled
if let Status::Enabled = self.server.emulation_status.get() {
self.tx
.send((ProxyEvent::Input(event), addr))
if self.emulation_active.get() {
self.request_tx
.send(ProxyRequest::Input(event, addr))
.expect("channel closed");
}
}
fn release_keys(&self, addr: SocketAddr) {
self.tx
.send((ProxyEvent::ReleaseKeys, addr))
self.request_tx
.send(ProxyRequest::ReleaseKeys(addr))
.expect("channel closed");
}
async fn emulation_task(server: Service, mut rx: Receiver<(ProxyEvent, SocketAddr)>) {
async fn emulation_task(
backend: Option<input_emulation::Backend>,
mut request_rx: Receiver<ProxyRequest>,
event_tx: Sender<EmulationEvent>,
) {
let mut handles = HashMap::new();
let mut next_id = 0;
loop {
if let Err(e) = Self::do_emulation(&server, &mut handles, &mut next_id, &mut rx).await {
if let Err(e) = Self::do_emulation(
backend,
&mut handles,
&mut next_id,
&mut request_rx,
&event_tx,
)
.await
{
log::warn!("input emulation exited: {e}");
}
tokio::select! {
_ = server.emulation_notified() => {},
_ = server.cancelled() => return,
// wait for reenable request
loop {
match request_rx.recv().await.expect("channel closed") {
ProxyRequest::Reenable => break,
ProxyRequest::Terminate => return,
ProxyRequest::Input(..) => { /* emulation inactive => ignore */ }
ProxyRequest::ReleaseKeys(..) => { /* emulation inactive => ignore */ }
}
}
}
}
async fn do_emulation(
server: &Service,
backend: Option<input_emulation::Backend>,
handles: &mut HashMap<SocketAddr, EmulationHandle>,
next_id: &mut EmulationHandle,
rx: &mut Receiver<(ProxyEvent, SocketAddr)>,
request_rx: &mut Receiver<ProxyRequest>,
event_tx: &Sender<EmulationEvent>,
) -> Result<(), InputEmulationError> {
let backend = server.config.emulation_backend.map(|b| b.into());
log::info!("creating input emulation ...");
let mut emulation = tokio::select! {
r = InputEmulation::new(backend) => r?,
_ = server.cancelled() => return Ok(()),
// allow termination event while requesting input emulation
_ = wait_for_termination(request_rx) => return Ok(()),
};
server.set_emulation_status(Status::Enabled);
event_tx
.send(EmulationEvent::EmulationEnabled)
.expect("channel closed");
// create active handles
for &handle in handles.values() {
emulation.create(handle).await;
}
let res = Self::do_emulation_session(server, &mut emulation, handles, next_id, rx).await;
let res = Self::do_emulation_session(&mut emulation, handles, next_id, request_rx).await;
// FIXME replace with async drop when stabilized
emulation.terminate().await;
server.set_emulation_status(Status::Disabled);
event_tx
.send(EmulationEvent::EmulationDisabled)
.expect("channel closed");
res
}
async fn do_emulation_session(
server: &Service,
emulation: &mut InputEmulation,
handles: &mut HashMap<SocketAddr, EmulationHandle>,
next_id: &mut EmulationHandle,
rx: &mut Receiver<(ProxyEvent, SocketAddr)>,
rx: &mut Receiver<ProxyRequest>,
) -> Result<(), InputEmulationError> {
loop {
tokio::select! {
e = rx.recv() => {
let (event, addr) = e.expect("channel closed");
let handle = match handles.get(&addr) {
Some(&handle) => handle,
None => {
let handle = *next_id;
*next_id += 1;
emulation.create(handle).await;
handles.insert(addr, handle);
handle
e = rx.recv() => match e.expect("channel closed") {
ProxyRequest::Input(event, addr) => {
let handle = match handles.get(&addr) {
Some(&handle) => handle,
None => {
let handle = *next_id;
*next_id += 1;
emulation.create(handle).await;
handles.insert(addr, handle);
handle
}
};
emulation.consume(event, handle).await?;
},
ProxyRequest::ReleaseKeys(addr) => {
if let Some(&handle) = handles.get(&addr) {
emulation.release_keys(handle).await?
}
};
match event {
ProxyEvent::Input(event) => emulation.consume(event, handle).await?,
ProxyEvent::ReleaseKeys => emulation.release_keys(handle).await?,
}
}
_ = server.cancelled() => break Ok(()),
ProxyRequest::Terminate => break Ok(()),
ProxyRequest::Reenable => continue,
},
}
}
}
fn reenable(&self) {
self.request_tx
.send(ProxyRequest::Reenable)
.expect("channel closed");
}
async fn terminate(&mut self) {
self.request_tx
.send(ProxyRequest::Terminate)
.expect("channel closed");
let _ = (&mut self.task).await;
}
}
@@ -221,3 +345,14 @@ fn to_ipc_pos(pos: Position) -> lan_mouse_ipc::Position {
Position::Bottom => lan_mouse_ipc::Position::Bottom,
}
}
async fn wait_for_termination(rx: &mut Receiver<ProxyRequest>) {
loop {
match rx.recv().await.expect("channel closed") {
ProxyRequest::Terminate => return,
ProxyRequest::Input(_, _) => continue,
ProxyRequest::ReleaseKeys(_) => continue,
ProxyRequest::Reenable => continue,
}
}
}