- manual eventloop now replaced by asycn-await using the tokio runtime
- dns no longer blocks the event loop
- simplifies logic
- makes xdg-desktop-portal easier to integrate
This commit is contained in:
Ferdinand Schober
2023-10-11 14:52:18 +02:00
committed by GitHub
parent d4d6f05802
commit ab2514e508
13 changed files with 453 additions and 565 deletions

View File

@@ -1,21 +1,20 @@
use std::collections::HashMap;
use std::io::{Read, Result, Write};
use std::io::Result;
use std::str;
#[cfg(unix)]
use std::{env, path::{Path, PathBuf}};
use mio::Interest;
use mio::{Registry, Token, event::Source};
use tokio::io::{AsyncReadExt, WriteHalf, AsyncWriteExt};
use tokio::io::ReadHalf;
#[cfg(unix)]
use mio::net::UnixStream;
use tokio::net::UnixStream;
#[cfg(unix)]
use mio::net::UnixListener;
use tokio::net::UnixListener;
#[cfg(windows)]
use mio::net::TcpStream;
use tokio::net::TcpStream;
#[cfg(windows)]
use mio::net::TcpListener;
use tokio::net::TcpListener;
use serde::{Serialize, Deserialize};
@@ -60,11 +59,14 @@ pub struct FrontendListener {
listener: UnixListener,
#[cfg(unix)]
socket_path: PathBuf,
frontend_connections: HashMap<Token, FrontendConnection>,
#[cfg(unix)]
tx_streams: Vec<WriteHalf<UnixStream>>,
#[cfg(windows)]
tx_streams: Vec<WriteHalf<TcpStream>>,
}
impl FrontendListener {
pub fn new() -> std::result::Result<Self, Box<dyn std::error::Error>> {
pub async fn new() -> std::result::Result<Self, Box<dyn std::error::Error>> {
#[cfg(unix)]
let socket_path = Path::new(env::var("XDG_RUNTIME_DIR")?.as_str()).join("lan-mouse-socket.sock");
#[cfg(unix)]
@@ -77,86 +79,53 @@ impl FrontendListener {
let listener = UnixListener::bind(&socket_path)?;
#[cfg(windows)]
let listener = TcpListener::bind("127.0.0.1:5252".parse().unwrap())?; // abuse tcp
let listener = TcpListener::bind("127.0.0.1:5252").await?; // abuse tcp
let adapter = Self {
listener,
#[cfg(unix)]
socket_path,
frontend_connections: HashMap::new(),
tx_streams: vec![],
};
Ok(adapter)
}
#[cfg(unix)]
pub fn handle_incoming<F>(&mut self, register_frontend: F) -> Result<()>
where F: Fn(&mut UnixStream, Interest) -> Result<Token> {
let (mut stream, _) = self.listener.accept()?;
let token = register_frontend(&mut stream, Interest::READABLE)?;
let con = FrontendConnection::new(stream);
self.frontend_connections.insert(token, con);
Ok(())
pub async fn accept(&mut self) -> Result<ReadHalf<UnixStream>> {
let stream = self.listener.accept().await?.0;
let (rx, tx) = tokio::io::split(stream);
self.tx_streams.push(tx);
Ok(rx)
}
#[cfg(windows)]
pub fn handle_incoming<F>(&mut self, register_frontend: F) -> Result<()>
where F: Fn(&mut TcpStream, Interest) -> Result<Token> {
let (mut stream, _) = self.listener.accept()?;
let token = register_frontend(&mut stream, Interest::READABLE)?;
let con = FrontendConnection::new(stream);
self.frontend_connections.insert(token, con);
Ok(())
pub async fn accept(&mut self) -> Result<ReadHalf<TcpStream>> {
let stream = self.listener.accept().await?.0;
let (rx, tx) = tokio::io::split(stream);
self.tx_streams.push(tx);
Ok(rx)
}
pub fn read_event(&mut self, token: Token) -> Result<Option<FrontendEvent>> {
if let Some(con) = self.frontend_connections.get_mut(&token) {
con.handle_event()
} else {
panic!("unknown token");
}
}
pub(crate) fn notify_all(&mut self, notify: FrontendNotify) -> Result<()> {
pub(crate) async fn notify_all(&mut self, notify: FrontendNotify) -> Result<()> {
// encode event
let json = serde_json::to_string(&notify).unwrap();
let payload = json.as_bytes();
let len = payload.len().to_ne_bytes();
let len = payload.len().to_be_bytes();
log::debug!("json: {json}, len: {}", payload.len());
for con in self.frontend_connections.values_mut() {
// TODO do simultaneously
for tx in self.tx_streams.iter_mut() {
// write len + payload
con.stream.write(&len)?;
con.stream.write(payload)?;
tx.write(&len).await?;
tx.write(payload).await?;
}
Ok(())
}
}
impl Source for FrontendListener {
fn register(
&mut self,
registry: &Registry,
token: Token,
interests: mio::Interest,
) -> Result<()> {
self.listener.register(registry, token, interests)
}
fn reregister(
&mut self,
registry: &Registry,
token: Token,
interests: mio::Interest,
) -> Result<()> {
self.listener.reregister(registry, token, interests)
}
fn deregister(&mut self, registry: &Registry) -> Result<()> {
self.listener.deregister(registry)
}
}
#[cfg(unix)]
impl Drop for FrontendListener {
fn drop(&mut self) {
@@ -165,72 +134,20 @@ impl Drop for FrontendListener {
}
}
enum ReceiveState {
Len, Data,
#[cfg(unix)]
pub async fn read_event(stream: &mut ReadHalf<UnixStream>) -> Result<FrontendEvent> {
let len = stream.read_u64().await?;
assert!(len <= 256);
let mut buf = [0u8; 256];
stream.read_exact(&mut buf[..len as usize]).await?;
Ok(serde_json::from_slice(&buf[..len as usize])?)
}
pub struct FrontendConnection {
#[cfg(unix)]
stream: UnixStream,
#[cfg(windows)]
stream: TcpStream,
state: ReceiveState,
len: usize,
len_buf: [u8; std::mem::size_of::<usize>()],
recieve_buf: [u8; 256], // FIXME
pos: usize,
#[cfg(windows)]
pub async fn read_event(stream: &mut ReadHalf<TcpStream>) -> Result<FrontendEvent> {
let len = stream.read_u64().await?;
let mut buf = [0u8; 256];
stream.read_exact(&mut buf[..len as usize]).await?;
Ok(serde_json::from_slice(&buf[..len as usize])?)
}
impl FrontendConnection {
#[cfg(unix)]
pub fn new(stream: UnixStream) -> Self {
Self {
stream,
state: ReceiveState::Len,
len: 0,
len_buf: [0u8; std::mem::size_of::<usize>()],
recieve_buf: [0u8; 256],
pos: 0,
}
}
#[cfg(windows)]
pub fn new(stream: TcpStream) -> Self {
Self {
stream,
state: ReceiveState::Len,
len: 0,
len_buf: [0u8; std::mem::size_of::<usize>()],
recieve_buf: [0u8; 256],
pos: 0,
}
}
pub fn handle_event(&mut self) -> Result<Option<FrontendEvent>> {
match self.state {
ReceiveState::Len => {
// we receive sizeof(usize) Bytes
let n = self.stream.read(&mut self.len_buf)?;
self.pos += n;
if self.pos == self.len_buf.len() {
self.state = ReceiveState::Data;
self.len = usize::from_ne_bytes(self.len_buf);
self.pos = 0;
}
Ok(None)
},
ReceiveState::Data => {
// read at most as many bytes as the length of the next event
let n = self.stream.read(&mut self.recieve_buf[..self.len])?;
self.pos += n;
if n == self.len {
self.state = ReceiveState::Len;
self.pos = 0;
Ok(Some(serde_json::from_slice(&self.recieve_buf[..self.len])?))
} else {
Ok(None)
}
}
}
}
}