mirror of
https://github.com/feschber/lan-mouse.git
synced 2026-03-07 11:59:59 +03:00
cleanup emulation
This commit is contained in:
388
src/emulation.rs
388
src/emulation.rs
@@ -60,7 +60,13 @@ impl Emulation {
|
||||
let emulation_proxy = EmulationProxy::new(backend);
|
||||
let (request_tx, request_rx) = channel();
|
||||
let (event_tx, event_rx) = channel();
|
||||
let task = spawn_local(Self::run(listener, emulation_proxy, request_rx, event_tx));
|
||||
let emulation_task = ListenTask {
|
||||
listener,
|
||||
emulation_proxy,
|
||||
request_rx,
|
||||
event_tx,
|
||||
};
|
||||
let task = spawn_local(emulation_task.run());
|
||||
Self {
|
||||
task,
|
||||
request_tx,
|
||||
@@ -90,74 +96,6 @@ impl Emulation {
|
||||
self.event_rx.recv().await.expect("channel closed")
|
||||
}
|
||||
|
||||
async fn run(
|
||||
mut listener: LanMouseListener,
|
||||
mut emulation_proxy: EmulationProxy,
|
||||
mut request_rx: Receiver<EmulationRequest>,
|
||||
event_tx: Sender<EmulationEvent>,
|
||||
) {
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(5));
|
||||
let mut last_response = HashMap::new();
|
||||
loop {
|
||||
select! {
|
||||
e = listener.next() => {
|
||||
let (event, addr) = match e {
|
||||
Some(e) => e,
|
||||
None => break,
|
||||
};
|
||||
log::trace!("{event} <-<-<-<-<- {addr}");
|
||||
last_response.insert(addr, Instant::now());
|
||||
match event {
|
||||
ProtoEvent::Enter(pos) => {
|
||||
if let Some(fingerprint) = listener.get_certificate_fingerprint(addr).await {
|
||||
log::info!("releasing capture: {addr} entered this device");
|
||||
event_tx.send(EmulationEvent::ReleaseNotify).expect("channel closed");
|
||||
listener.reply(addr, ProtoEvent::Ack(0)).await;
|
||||
event_tx.send(EmulationEvent::Connected{addr, pos: to_ipc_pos(pos), fingerprint}).expect("channel closed");
|
||||
}
|
||||
}
|
||||
ProtoEvent::Leave(_) => {
|
||||
emulation_proxy.release_keys(addr);
|
||||
listener.reply(addr, ProtoEvent::Ack(0)).await;
|
||||
}
|
||||
ProtoEvent::Input(event) => emulation_proxy.consume(event, addr),
|
||||
ProtoEvent::Ping => listener.reply(addr, ProtoEvent::Pong(emulation_proxy.emulation_active.get())).await,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
event = emulation_proxy.event() => {
|
||||
event_tx.send(event).expect("channel closed");
|
||||
}
|
||||
request = request_rx.recv() => match request.expect("channel closed") {
|
||||
// reenable emulation
|
||||
EmulationRequest::Reenable => emulation_proxy.reenable(),
|
||||
// notify the other end that we hit a barrier (should release capture)
|
||||
EmulationRequest::Release(addr) => listener.reply(addr, ProtoEvent::Leave(0)).await,
|
||||
EmulationRequest::ChangePort(port) => {
|
||||
listener.request_port_change(port);
|
||||
let result = listener.port_changed().await;
|
||||
event_tx.send(EmulationEvent::PortChanged(result)).expect("channel closed");
|
||||
}
|
||||
EmulationRequest::Terminate => break,
|
||||
},
|
||||
_ = interval.tick() => {
|
||||
last_response.retain(|&addr,instant| {
|
||||
if instant.elapsed() > Duration::from_secs(5) {
|
||||
log::warn!("releasing keys: {addr} not responding!");
|
||||
emulation_proxy.release_keys(addr);
|
||||
event_tx.send(EmulationEvent::Disconnected { addr }).expect("channel closed");
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
listener.terminate().await;
|
||||
emulation_proxy.terminate().await;
|
||||
}
|
||||
|
||||
/// wait for termination
|
||||
pub(crate) async fn terminate(&mut self) {
|
||||
log::debug!("terminating emulation");
|
||||
@@ -170,6 +108,78 @@ impl Emulation {
|
||||
}
|
||||
}
|
||||
|
||||
struct ListenTask {
|
||||
listener: LanMouseListener,
|
||||
emulation_proxy: EmulationProxy,
|
||||
request_rx: Receiver<EmulationRequest>,
|
||||
event_tx: Sender<EmulationEvent>,
|
||||
}
|
||||
|
||||
impl ListenTask {
|
||||
async fn run(mut self) {
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(5));
|
||||
let mut last_response = HashMap::new();
|
||||
loop {
|
||||
select! {
|
||||
e = self.listener.next() => {
|
||||
let (event, addr) = match e {
|
||||
Some(e) => e,
|
||||
None => break,
|
||||
};
|
||||
log::trace!("{event} <-<-<-<-<- {addr}");
|
||||
last_response.insert(addr, Instant::now());
|
||||
match event {
|
||||
ProtoEvent::Enter(pos) => {
|
||||
if let Some(fingerprint) = self.listener.get_certificate_fingerprint(addr).await {
|
||||
log::info!("releasing capture: {addr} entered this device");
|
||||
self.event_tx.send(EmulationEvent::ReleaseNotify).expect("channel closed");
|
||||
self.listener.reply(addr, ProtoEvent::Ack(0)).await;
|
||||
self.event_tx.send(EmulationEvent::Connected{addr, pos: to_ipc_pos(pos), fingerprint}).expect("channel closed");
|
||||
}
|
||||
}
|
||||
ProtoEvent::Leave(_) => {
|
||||
self.emulation_proxy.release_keys(addr);
|
||||
self.listener.reply(addr, ProtoEvent::Ack(0)).await;
|
||||
}
|
||||
ProtoEvent::Input(event) => self.emulation_proxy.consume(event, addr),
|
||||
ProtoEvent::Ping => self.listener.reply(addr, ProtoEvent::Pong(self.emulation_proxy.emulation_active.get())).await,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
event = self.emulation_proxy.event() => {
|
||||
self.event_tx.send(event).expect("channel closed");
|
||||
}
|
||||
request = self.request_rx.recv() => match request.expect("channel closed") {
|
||||
// reenable emulation
|
||||
EmulationRequest::Reenable => self.emulation_proxy.reenable(),
|
||||
// notify the other end that we hit a barrier (should release capture)
|
||||
EmulationRequest::Release(addr) => self.listener.reply(addr, ProtoEvent::Leave(0)).await,
|
||||
EmulationRequest::ChangePort(port) => {
|
||||
self.listener.request_port_change(port);
|
||||
let result = self.listener.port_changed().await;
|
||||
self.event_tx.send(EmulationEvent::PortChanged(result)).expect("channel closed");
|
||||
}
|
||||
EmulationRequest::Terminate => break,
|
||||
},
|
||||
_ = interval.tick() => {
|
||||
last_response.retain(|&addr,instant| {
|
||||
if instant.elapsed() > Duration::from_secs(5) {
|
||||
log::warn!("releasing keys: {addr} not responding!");
|
||||
self.emulation_proxy.release_keys(addr);
|
||||
self.event_tx.send(EmulationEvent::Disconnected { addr }).expect("channel closed");
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
self.listener.terminate().await;
|
||||
self.emulation_proxy.terminate().await;
|
||||
}
|
||||
}
|
||||
|
||||
/// proxy handling the actual input emulation,
|
||||
/// discarding events when it is disabled
|
||||
pub(crate) struct EmulationProxy {
|
||||
@@ -193,12 +203,15 @@ impl EmulationProxy {
|
||||
let (event_tx, event_rx) = channel();
|
||||
let emulation_active = Rc::new(Cell::new(false));
|
||||
let exit_requested = Rc::new(Cell::new(false));
|
||||
let task = spawn_local(Self::emulation_task(
|
||||
let emulation_task = EmulationTask {
|
||||
backend,
|
||||
exit_requested.clone(),
|
||||
exit_requested: exit_requested.clone(),
|
||||
request_rx,
|
||||
event_tx,
|
||||
));
|
||||
handles: Default::default(),
|
||||
next_id: 0,
|
||||
};
|
||||
let task = spawn_local(emulation_task.run());
|
||||
Self {
|
||||
emulation_active,
|
||||
exit_requested,
|
||||
@@ -234,124 +247,6 @@ impl EmulationProxy {
|
||||
.expect("channel closed");
|
||||
}
|
||||
|
||||
async fn emulation_task(
|
||||
backend: Option<input_emulation::Backend>,
|
||||
exit_requested: Rc<Cell<bool>>,
|
||||
mut request_rx: Receiver<ProxyRequest>,
|
||||
event_tx: Sender<EmulationEvent>,
|
||||
) {
|
||||
let mut handles = HashMap::new();
|
||||
let mut next_id = 0;
|
||||
loop {
|
||||
if let Err(e) = Self::do_emulation(
|
||||
backend,
|
||||
&mut handles,
|
||||
&mut next_id,
|
||||
&mut request_rx,
|
||||
&event_tx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
log::warn!("input emulation exited: {e}");
|
||||
}
|
||||
if exit_requested.get() {
|
||||
break;
|
||||
}
|
||||
// wait for reenable request
|
||||
loop {
|
||||
match request_rx.recv().await.expect("channel closed") {
|
||||
ProxyRequest::Reenable => break,
|
||||
ProxyRequest::Terminate => return,
|
||||
ProxyRequest::Input(..) => { /* emulation inactive => ignore */ }
|
||||
ProxyRequest::ReleaseKeys(..) => { /* emulation inactive => ignore */ }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn do_emulation(
|
||||
backend: Option<input_emulation::Backend>,
|
||||
handles: &mut HashMap<SocketAddr, EmulationHandle>,
|
||||
next_id: &mut EmulationHandle,
|
||||
request_rx: &mut Receiver<ProxyRequest>,
|
||||
event_tx: &Sender<EmulationEvent>,
|
||||
) -> Result<(), InputEmulationError> {
|
||||
log::info!("creating input emulation ...");
|
||||
let mut emulation = tokio::select! {
|
||||
r = InputEmulation::new(backend) => r?,
|
||||
// allow termination event while requesting input emulation
|
||||
_ = wait_for_termination(request_rx) => return Ok(()),
|
||||
};
|
||||
|
||||
// used to send enabled and disabled events
|
||||
let _emulation_guard = DropGuard::new(
|
||||
event_tx,
|
||||
EmulationEvent::EmulationEnabled,
|
||||
EmulationEvent::EmulationDisabled,
|
||||
);
|
||||
|
||||
// create active handles
|
||||
if let Err(e) =
|
||||
Self::create_clients(&mut emulation, handles.values().copied(), request_rx).await
|
||||
{
|
||||
emulation.terminate().await;
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
let res = Self::do_emulation_session(&mut emulation, handles, next_id, request_rx).await;
|
||||
// FIXME replace with async drop when stabilized
|
||||
emulation.terminate().await;
|
||||
res
|
||||
}
|
||||
|
||||
async fn create_clients(
|
||||
emulation: &mut InputEmulation,
|
||||
handles: impl Iterator<Item = EmulationHandle>,
|
||||
request_rx: &mut Receiver<ProxyRequest>,
|
||||
) -> Result<(), InputEmulationError> {
|
||||
for handle in handles {
|
||||
tokio::select! {
|
||||
_ = emulation.create(handle) => {},
|
||||
_ = wait_for_termination(request_rx) => return Ok(()),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn do_emulation_session(
|
||||
emulation: &mut InputEmulation,
|
||||
handles: &mut HashMap<SocketAddr, EmulationHandle>,
|
||||
next_id: &mut EmulationHandle,
|
||||
rx: &mut Receiver<ProxyRequest>,
|
||||
) -> Result<(), InputEmulationError> {
|
||||
loop {
|
||||
tokio::select! {
|
||||
e = rx.recv() => match e.expect("channel closed") {
|
||||
ProxyRequest::Input(event, addr) => {
|
||||
let handle = match handles.get(&addr) {
|
||||
Some(&handle) => handle,
|
||||
None => {
|
||||
let handle = *next_id;
|
||||
*next_id += 1;
|
||||
emulation.create(handle).await;
|
||||
handles.insert(addr, handle);
|
||||
handle
|
||||
}
|
||||
};
|
||||
emulation.consume(event, handle).await?;
|
||||
},
|
||||
ProxyRequest::ReleaseKeys(addr) => {
|
||||
if let Some(&handle) = handles.get(&addr) {
|
||||
emulation.release_keys(handle).await?
|
||||
}
|
||||
}
|
||||
ProxyRequest::Terminate => break Ok(()),
|
||||
ProxyRequest::Reenable => continue,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn reenable(&self) {
|
||||
self.request_tx
|
||||
.send(ProxyRequest::Reenable)
|
||||
@@ -367,6 +262,109 @@ impl EmulationProxy {
|
||||
}
|
||||
}
|
||||
|
||||
struct EmulationTask {
|
||||
backend: Option<input_emulation::Backend>,
|
||||
exit_requested: Rc<Cell<bool>>,
|
||||
request_rx: Receiver<ProxyRequest>,
|
||||
event_tx: Sender<EmulationEvent>,
|
||||
handles: HashMap<SocketAddr, EmulationHandle>,
|
||||
next_id: EmulationHandle,
|
||||
}
|
||||
|
||||
impl EmulationTask {
|
||||
async fn run(mut self) {
|
||||
loop {
|
||||
if let Err(e) = self.do_emulation().await {
|
||||
log::warn!("input emulation exited: {e}");
|
||||
}
|
||||
if self.exit_requested.get() {
|
||||
break;
|
||||
}
|
||||
// wait for reenable request
|
||||
loop {
|
||||
match self.request_rx.recv().await.expect("channel closed") {
|
||||
ProxyRequest::Reenable => break,
|
||||
ProxyRequest::Terminate => return,
|
||||
ProxyRequest::Input(..) => { /* emulation inactive => ignore */ }
|
||||
ProxyRequest::ReleaseKeys(..) => { /* emulation inactive => ignore */ }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn do_emulation(&mut self) -> Result<(), InputEmulationError> {
|
||||
log::info!("creating input emulation ...");
|
||||
let mut emulation = tokio::select! {
|
||||
r = InputEmulation::new(self.backend) => r?,
|
||||
// allow termination event while requesting input emulation
|
||||
_ = wait_for_termination(&mut self.request_rx) => return Ok(()),
|
||||
};
|
||||
|
||||
// used to send enabled and disabled events
|
||||
let _emulation_guard = DropGuard::new(
|
||||
self.event_tx.clone(),
|
||||
EmulationEvent::EmulationEnabled,
|
||||
EmulationEvent::EmulationDisabled,
|
||||
);
|
||||
|
||||
// create active handles
|
||||
if let Err(e) = self.create_clients(&mut emulation).await {
|
||||
emulation.terminate().await;
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
let res = self.do_emulation_session(&mut emulation).await;
|
||||
// FIXME replace with async drop when stabilized
|
||||
emulation.terminate().await;
|
||||
res
|
||||
}
|
||||
|
||||
async fn create_clients(
|
||||
&mut self,
|
||||
emulation: &mut InputEmulation,
|
||||
) -> Result<(), InputEmulationError> {
|
||||
for handle in self.handles.values() {
|
||||
tokio::select! {
|
||||
_ = emulation.create(*handle) => {},
|
||||
_ = wait_for_termination(&mut self.request_rx) => return Ok(()),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn do_emulation_session(
|
||||
&mut self,
|
||||
emulation: &mut InputEmulation,
|
||||
) -> Result<(), InputEmulationError> {
|
||||
loop {
|
||||
tokio::select! {
|
||||
e = self.request_rx.recv() => match e.expect("channel closed") {
|
||||
ProxyRequest::Input(event, addr) => {
|
||||
let handle = match self.handles.get(&addr) {
|
||||
Some(&handle) => handle,
|
||||
None => {
|
||||
let handle = self.next_id;
|
||||
self.next_id += 1;
|
||||
emulation.create(handle).await;
|
||||
self.handles.insert(addr, handle);
|
||||
handle
|
||||
}
|
||||
};
|
||||
emulation.consume(event, handle).await?;
|
||||
},
|
||||
ProxyRequest::ReleaseKeys(addr) => {
|
||||
if let Some(&handle) = self.handles.get(&addr) {
|
||||
emulation.release_keys(handle).await?
|
||||
}
|
||||
}
|
||||
ProxyRequest::Terminate => break Ok(()),
|
||||
ProxyRequest::Reenable => continue,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn to_ipc_pos(pos: Position) -> lan_mouse_ipc::Position {
|
||||
match pos {
|
||||
Position::Left => lan_mouse_ipc::Position::Left,
|
||||
@@ -387,20 +385,20 @@ async fn wait_for_termination(rx: &mut Receiver<ProxyRequest>) {
|
||||
}
|
||||
}
|
||||
|
||||
struct DropGuard<'a, T> {
|
||||
tx: &'a Sender<T>,
|
||||
struct DropGuard<T> {
|
||||
tx: Sender<T>,
|
||||
on_drop: Option<T>,
|
||||
}
|
||||
|
||||
impl<'a, T> DropGuard<'a, T> {
|
||||
fn new(tx: &'a Sender<T>, on_new: T, on_drop: T) -> Self {
|
||||
impl<T> DropGuard<T> {
|
||||
fn new(tx: Sender<T>, on_new: T, on_drop: T) -> Self {
|
||||
tx.send(on_new).expect("channel closed");
|
||||
let on_drop = Some(on_drop);
|
||||
Self { tx, on_drop }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> Drop for DropGuard<'a, T> {
|
||||
impl<T> Drop for DropGuard<T> {
|
||||
fn drop(&mut self) {
|
||||
self.tx
|
||||
.send(self.on_drop.take().expect("item"))
|
||||
|
||||
Reference in New Issue
Block a user