Background service (#43)

better handling of background-service: lan-mouse can now be run without a gui by specifying --daemon as an argument.
Otherwise the servic will be run as a child process and correctly terminate when the window is closed / frontend exits.

Closes #38
This commit is contained in:
Ferdinand Schober
2023-12-09 00:36:01 +01:00
committed by GitHub
parent 9b242f6138
commit 56e5f7a30d
13 changed files with 488 additions and 170 deletions

View File

@@ -1,4 +1,3 @@
use anyhow::Result;
use async_trait::async_trait;
use crate::{event::{KeyboardEvent, PointerEvent}, consumer::EventConsumer};
use winapi::{

View File

@@ -1,4 +1,5 @@
use std::{error::Error, io, result::Result, task::Poll};
use anyhow::Result;
use std::{io, task::Poll};
use futures_core::Stream;
@@ -7,7 +8,7 @@ use crate::{producer::EventProducer, event::Event, client::ClientHandle};
pub struct LibeiProducer {}
impl LibeiProducer {
pub fn new() -> Result<Self, Box<dyn Error>> {
pub fn new() -> Result<Self> {
Ok(Self { })
}
}

View File

@@ -1,11 +1,11 @@
use anyhow::Result;
use serde::{Deserialize, Serialize};
use core::fmt;
use std::collections::HashSet;
use std::net::IpAddr;
use std::{error::Error, fs};
use std::env;
use toml;
use clap::Parser;
use crate::client::Position;
@@ -28,19 +28,6 @@ pub struct Client {
pub port: Option<u16>,
}
#[derive(Debug, Clone)]
struct MissingParameter {
arg: &'static str,
}
impl fmt::Display for MissingParameter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Missing a parameter for argument: {}", self.arg)
}
}
impl Error for MissingParameter {}
impl ConfigToml {
pub fn new(path: &str) -> Result<ConfigToml, Box<dyn Error>> {
let config = fs::read_to_string(path)?;
@@ -49,35 +36,43 @@ impl ConfigToml {
}
}
fn find_arg(key: &'static str) -> Result<Option<String>, MissingParameter> {
let args: Vec<String> = env::args().collect();
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct CliArgs {
/// the listen port for lan-mouse
#[arg(short, long)]
port: Option<u16>,
for (i, arg) in args.iter().enumerate() {
if arg != key {
continue;
}
match args.get(i+1) {
None => return Err(MissingParameter { arg: key }),
Some(arg) => return Ok(Some(arg.clone())),
};
}
Ok(None)
/// the frontend to use [cli | gtk]
#[arg(short, long)]
frontend: Option<String>,
/// non-default config file location
#[arg(short, long)]
config: Option<String>,
/// run only the service as a daemon without the frontend
#[arg(short, long)]
daemon: bool,
}
#[derive(PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq)]
pub enum Frontend {
Gtk,
Cli,
}
#[derive(Debug)]
pub struct Config {
pub frontend: Frontend,
pub port: u16,
pub clients: Vec<(Client, Position)>,
pub daemon: bool,
}
impl Config {
pub fn new() -> Result<Self, Box<dyn Error>> {
pub fn new() -> Result<Self> {
let args = CliArgs::parse();
let config_file = "config.toml";
#[cfg(unix)] let config_path = {
let xdg_config_home = env::var("XDG_CONFIG_HOME")
@@ -92,7 +87,7 @@ impl Config {
};
// --config <file> overrules default location
let config_path = find_arg("--config")?.unwrap_or(config_path);
let config_path = args.config.unwrap_or(config_path);
let config_toml = match ConfigToml::new(config_path.as_str()) {
Err(e) => {
@@ -103,7 +98,7 @@ impl Config {
Ok(c) => Some(c),
};
let frontend = match find_arg("--frontend")? {
let frontend = match args.frontend {
None => match &config_toml {
Some(c) => c.frontend.clone(),
None => None,
@@ -123,8 +118,8 @@ impl Config {
}
};
let port = match find_arg("--port")? {
Some(port) => port.parse::<u16>()?,
let port = match args.port {
Some(port) => port,
None => match &config_toml {
Some(c) => c.port.unwrap_or(DEFAULT_PORT),
None => DEFAULT_PORT,
@@ -148,7 +143,14 @@ impl Config {
}
}
Ok(Config { frontend, clients, port })
let daemon = args.daemon;
Ok(Config {
daemon,
frontend,
clients,
port,
})
}
pub fn get_clients(&self) -> Vec<(HashSet<IpAddr>, Option<String>, u16, Position)> {

View File

@@ -1,5 +1,5 @@
use std::io::Result;
use std::str;
use anyhow::{Result, anyhow};
use std::{str, io::ErrorKind, time::Duration, cmp::min};
#[cfg(unix)]
use std::{env, path::{Path, PathBuf}};
@@ -11,6 +11,7 @@ use tokio::io::ReadHalf;
use tokio::net::UnixStream;
#[cfg(unix)]
use tokio::net::UnixListener;
#[cfg(windows)]
use tokio::net::TcpStream;
#[cfg(windows)]
@@ -18,7 +19,7 @@ use tokio::net::TcpListener;
use serde::{Serialize, Deserialize};
use crate::client::{Position, ClientHandle, Client};
use crate::{client::{Position, ClientHandle, Client}, config::{Config, Frontend}};
/// cli frontend
pub mod cli;
@@ -27,6 +28,52 @@ pub mod cli;
#[cfg(all(unix, feature = "gtk"))]
pub mod gtk;
pub fn run_frontend(config: &Config) -> Result<()> {
match config.frontend {
#[cfg(all(unix, feature = "gtk"))]
Frontend::Gtk => { gtk::run(); }
#[cfg(any(not(feature = "gtk"), not(unix)))]
Frontend::Gtk => panic!("gtk frontend requested but feature not enabled!"),
Frontend::Cli => { cli::run()?; }
};
Ok(())
}
fn exponential_back_off(duration: &mut Duration) -> &Duration {
let new = duration.saturating_mul(2);
*duration = min(new, Duration::from_secs(1));
duration
}
/// wait for the lan-mouse socket to come online
#[cfg(unix)]
pub fn wait_for_service() -> Result<std::os::unix::net::UnixStream> {
let socket_path = FrontendListener::socket_path()?;
let mut duration = Duration::from_millis(1);
loop {
use std::os::unix::net::UnixStream;
if let Ok(stream) = UnixStream::connect(&socket_path) {
break Ok(stream)
}
// a signaling mechanism or inotify could be used to
// improve this
std::thread::sleep(*exponential_back_off(&mut duration));
}
}
#[cfg(windows)]
pub fn wait_for_service() -> Result<std::net::TcpStream> {
let mut duration = Duration::from_millis(1);
loop {
use std::net::TcpStream;
if let Ok(stream) = TcpStream::connect("127.0.0.1:5252") {
break Ok(stream)
}
std::thread::sleep(*exponential_back_off(&mut duration));
}
}
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum FrontendEvent {
/// add a new client
@@ -70,20 +117,54 @@ pub struct FrontendListener {
}
impl FrontendListener {
pub async fn new() -> std::result::Result<Self, Box<dyn std::error::Error>> {
#[cfg(unix)]
pub fn socket_path() -> Result<PathBuf> {
let xdg_runtime_dir = match env::var("XDG_RUNTIME_DIR") {
Ok(d) => d,
Err(e) => return Err(anyhow!("could not find XDG_RUNTIME_DIR: {e}")),
};
let xdg_runtime_dir = Path::new(xdg_runtime_dir.as_str());
Ok(xdg_runtime_dir.join("lan-mouse-socket.sock"))
}
pub async fn new() -> Option<Result<Self>> {
#[cfg(unix)]
let socket_path = Path::new(env::var("XDG_RUNTIME_DIR")?.as_str()).join("lan-mouse-socket.sock");
#[cfg(unix)]
log::debug!("remove socket: {:?}", socket_path);
#[cfg(unix)]
if socket_path.exists() {
std::fs::remove_file(&socket_path).unwrap();
}
#[cfg(unix)]
let listener = UnixListener::bind(&socket_path)?;
let (socket_path, listener) = {
let socket_path = match Self::socket_path() {
Ok(path) => path,
Err(e) => return Some(Err(e)),
};
log::debug!("remove socket: {:?}", socket_path);
if socket_path.exists() {
// try to connect to see if some other instance
// of lan-mouse is already running
match UnixStream::connect(&socket_path).await {
// connected -> lan-mouse is already running
Ok(_) => return None,
// lan-mouse is not running but a socket was left behind
Err(e) => {
log::debug!("{socket_path:?}: {e} - removing left behind socket");
let _ = std::fs::remove_file(&socket_path);
},
}
}
let listener = match UnixListener::bind(&socket_path) {
Ok(ls) => ls,
// some other lan-mouse instance has bound the socket in the meantime
Err(e) if e.kind() == ErrorKind::AddrInUse => return None,
Err(e) => return Some(Err(anyhow!("failed to bind lan-mouse-socket: {e}"))),
};
(socket_path, listener)
};
#[cfg(windows)]
let listener = TcpListener::bind("127.0.0.1:5252").await?; // abuse tcp
let listener = match TcpListener::bind("127.0.0.1:5252").await {
Ok(ls) => ls,
// some other lan-mouse instance has bound the socket in the meantime
Err(e) if e.kind() == ErrorKind::AddrInUse => return None,
Err(e) => return Some(Err(anyhow!("failed to bind lan-mouse-socket: {e}"))),
};
let adapter = Self {
listener,
@@ -92,7 +173,7 @@ impl FrontendListener {
tx_streams: vec![],
};
Ok(adapter)
Some(Ok(adapter))
}
#[cfg(unix)]
@@ -121,12 +202,25 @@ impl FrontendListener {
let len = payload.len().to_be_bytes();
log::debug!("json: {json}, len: {}", payload.len());
let mut keep = vec![];
// TODO do simultaneously
for tx in self.tx_streams.iter_mut() {
// write len + payload
tx.write(&len).await?;
tx.write(payload).await?;
if let Err(_) = tx.write(&len).await {
keep.push(false);
continue;
}
if let Err(_) = tx.write(payload).await {
keep.push(false);
continue;
}
keep.push(true);
}
// could not find a better solution because async
let mut keep = keep.into_iter();
self.tx_streams.retain(|_| keep.next().unwrap());
Ok(())
}
}

View File

@@ -1,10 +1,10 @@
use anyhow::{anyhow, Result, Context};
use std::{thread::{self, JoinHandle}, io::{Write, Read, ErrorKind}, str::SplitWhitespace};
use std::{thread, io::{Write, Read, ErrorKind}, str::SplitWhitespace};
#[cfg(windows)]
use std::net::SocketAddrV4;
#[cfg(unix)]
use std::{os::unix::net::UnixStream, path::Path, env};
use std::os::unix::net::UnixStream;
#[cfg(windows)]
use std::net::TcpStream;
@@ -12,9 +12,9 @@ use crate::{client::Position, config::DEFAULT_PORT};
use super::{FrontendEvent, FrontendNotify};
pub fn start() -> Result<(JoinHandle<()>, JoinHandle<()>)> {
pub fn run() -> Result<()> {
#[cfg(unix)]
let socket_path = Path::new(env::var("XDG_RUNTIME_DIR")?.as_str()).join("lan-mouse-socket.sock");
let socket_path = super::FrontendListener::socket_path()?;
#[cfg(unix)]
let Ok(mut tx) = UnixStream::connect(&socket_path) else {
@@ -50,7 +50,7 @@ pub fn start() -> Result<(JoinHandle<()>, JoinHandle<()>)> {
log::error!("error sending message: {e}");
};
if *event == FrontendEvent::Shutdown() {
break;
return;
}
}
// prompt is printed after the server response is received
@@ -126,7 +126,29 @@ pub fn start() -> Result<(JoinHandle<()>, JoinHandle<()>)> {
prompt();
}
})?;
Ok((reader, writer))
match reader.join() {
Ok(_) => (),
Err(e) => {
let msg = match (e.downcast_ref::<&str>(), e.downcast_ref::<String>()) {
(Some(&s), _) => s,
(_, Some(s)) => s,
_ => "no panic info"
};
log::error!("reader thread paniced: {msg}");
},
}
match writer.join() {
Ok(_) => (),
Err(e) => {
let msg = match (e.downcast_ref::<&str>(), e.downcast_ref::<String>()) {
(Some(&s), _) => s,
(_, Some(s)) => s,
_ => "no panic info"
};
log::error!("writer thread paniced: {msg}");
},
}
Ok(())
}
fn prompt() {

View File

@@ -2,7 +2,7 @@ mod window;
mod client_object;
mod client_row;
use std::{io::{Result, Read, ErrorKind}, thread::{self, JoinHandle}, env, process, path::Path, os::unix::net::UnixStream, str};
use std::{io::{Read, ErrorKind}, env, process, str};
use crate::{frontend::gtk::window::Window, config::DEFAULT_PORT};
@@ -14,11 +14,11 @@ use self::client_object::ClientObject;
use super::FrontendNotify;
pub fn start() -> Result<JoinHandle<glib::ExitCode>> {
log::debug!("starting gtk frontend");
thread::Builder::new()
.name("gtk-thread".into())
.spawn(gtk_main)
pub fn run() -> glib::ExitCode {
log::debug!("running gtk frontend");
let ret = gtk_main();
log::debug!("frontend exited");
ret
}
fn gtk_main() -> glib::ExitCode {
@@ -33,7 +33,8 @@ fn gtk_main() -> glib::ExitCode {
app.connect_startup(|_| load_css());
app.connect_activate(build_ui);
app.run()
let args: Vec<&'static str> = vec![];
app.run_with_args(&args)
}
fn load_css() {
@@ -52,20 +53,14 @@ fn load_icons() {
}
fn build_ui(app: &Application) {
let xdg_runtime_dir = match env::var("XDG_RUNTIME_DIR") {
Ok(v) => v,
log::debug!("connecting to lan-mouse-socket");
let mut rx = match super::wait_for_service() {
Ok(stream) => stream,
Err(e) => {
log::error!("{e}");
log::error!("could not connect to lan-mouse-socket: {e}");
process::exit(1);
}
};
log::debug!("connecting to lan-mouse-socket ... ");
let socket_path = Path::new(xdg_runtime_dir.as_str())
.join("lan-mouse-socket.sock");
let Ok(mut rx) = UnixStream::connect(&socket_path) else {
log::error!("Could not connect to lan-mouse-socket @ {socket_path:?}");
process::exit(1);
};
let tx = match rx.try_clone() {
Ok(sock) => sock,
Err(e) => {

View File

@@ -15,8 +15,6 @@ pub struct Window {
#[template_child]
pub port_edit_cancel: TemplateChild<Button>,
#[template_child]
pub add_client_button: TemplateChild<Button>,
#[template_child]
pub client_list: TemplateChild<ListBox>,
#[template_child]
pub client_placeholder: TemplateChild<ActionRow>,

View File

@@ -1,15 +1,14 @@
use std::{process, error::Error};
use anyhow::Result;
use std::process::{self, Command, Child};
use env_logger::Env;
use lan_mouse::{
consumer, producer,
config::{Config, Frontend::{Cli, Gtk}}, server::Server,
frontend::{FrontendListener, cli},
config::Config, server::Server,
frontend::{FrontendListener, self},
};
#[cfg(all(unix, feature = "gtk"))]
use lan_mouse::frontend::gtk;
use tokio::task::LocalSet;
use tokio::{task::LocalSet, join};
pub fn main() {
@@ -23,10 +22,36 @@ pub fn main() {
}
}
pub fn run() -> Result<(), Box<dyn Error>> {
// parse config file
let config = Config::new()?;
pub fn start_service() -> Result<Child> {
let child = Command::new(std::env::current_exe()?)
.args(std::env::args().skip(1))
.arg("--daemon")
.spawn()?;
Ok(child)
}
pub fn run() -> Result<()> {
// parse config file + cli args
let config = Config::new()?;
log::debug!("{config:?}");
if config.daemon {
// if daemon is specified we run the service
run_service(&config)?;
} else {
// otherwise start the service as a child process and
// run a frontend
start_service()?;
frontend::run_frontend(&config)?;
}
anyhow::Ok(())
}
fn run_service(config: &Config) -> Result<()> {
// create single threaded tokio runtime
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
@@ -34,37 +59,33 @@ pub fn run() -> Result<(), Box<dyn Error>> {
// run async event loop
runtime.block_on(LocalSet::new().run_until(async {
// start producing and consuming events
let producer = producer::create()?;
let consumer = consumer::create().await?;
// create frontend communication adapter
let frontend_adapter = FrontendListener::new().await?;
// start frontend
match config.frontend {
#[cfg(all(unix, feature = "gtk"))]
Gtk => { gtk::start()?; }
#[cfg(any(not(feature = "gtk"), not(unix)))]
Gtk => panic!("gtk frontend requested but feature not enabled!"),
Cli => { cli::start()?; }
let frontend_adapter = match FrontendListener::new().await {
Some(Err(e)) => return Err(e),
Some(Ok(f)) => f,
None => {
// none means some other instance is already running
log::info!("service already running, exiting");
return anyhow::Ok(())
}
,
};
// start sending and receiving events
let mut event_server = Server::new(config.port, frontend_adapter, consumer, producer).await?;
log::debug!("created server");
// add clients from config
for (c,h,port,p) in config.get_clients().into_iter() {
event_server.add_client(h, c, port, p).await;
}
// create event producer and consumer
let (producer, consumer) = join!(
producer::create(),
consumer::create(),
);
let (producer, consumer) = (producer?, consumer?);
// create server
let mut event_server = Server::new(config, frontend_adapter, consumer, producer).await?;
log::info!("Press Ctrl+Alt+Shift+Super to release the mouse");
// run event loop
event_server.run().await?;
Result::<_, Box<dyn Error>>::Ok(())
log::debug!("service exiting");
anyhow::Ok(())
}))?;
log::debug!("exiting main");
Ok(())
}

View File

@@ -1,4 +1,5 @@
use std::{error::Error, io};
use anyhow::Result;
use std::io;
use futures_core::Stream;
@@ -15,7 +16,7 @@ enum Backend {
X11,
}
pub fn create() -> Result<Box<dyn EventProducer>, Box<dyn Error>> {
pub async fn create() -> Result<Box<dyn EventProducer>> {
#[cfg(windows)]
return Ok(Box::new(producer::windows::WindowsProducer::new()));

View File

@@ -11,7 +11,7 @@ use tokio::net::TcpStream;
use std::{net::SocketAddr, io::ErrorKind};
use crate::{client::{ClientEvent, ClientManager, Position, ClientHandle}, consumer::EventConsumer, producer::EventProducer, frontend::{FrontendEvent, FrontendListener, FrontendNotify, self}, dns::{self, DnsResolver}};
use crate::{client::{ClientEvent, ClientManager, Position, ClientHandle}, consumer::EventConsumer, producer::EventProducer, frontend::{FrontendEvent, FrontendListener, FrontendNotify, self}, dns::{self, DnsResolver}, config::Config};
use crate::event::Event;
/// keeps track of state to prevent a feedback loop
@@ -36,7 +36,7 @@ pub struct Server {
impl Server {
pub async fn new(
port: u16,
config: &Config,
frontend: FrontendListener,
consumer: Box<dyn EventConsumer>,
producer: Box<dyn EventProducer>,
@@ -46,13 +46,13 @@ impl Server {
let resolver = dns::DnsResolver::new().await?;
// bind the udp socket
let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), port);
let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), config.port);
let socket = UdpSocket::bind(listen_addr).await?;
let (frontend_tx, frontend_rx) = tokio::sync::mpsc::channel(1);
// create client manager
let client_manager = ClientManager::new();
Ok(Server {
let mut server = Server {
frontend,
consumer,
producer,
@@ -62,7 +62,14 @@ impl Server {
state: State::Receiving,
frontend_rx,
frontend_tx,
})
};
// add clients from config
for (c,h,port,p) in config.get_clients().into_iter() {
server.add_client(h, c, port, p).await;
}
Ok(server)
}
pub async fn run(&mut self) -> anyhow::Result<()> {
@@ -85,7 +92,7 @@ impl Server {
Some(Ok((client, event))) => {
self.handle_producer_event(client,event).await;
},
Some(Err(e)) => log::error!("{e}"),
Some(Err(e)) => log::error!("error reading from event producer: {e}"),
_ => break,
}
}
@@ -145,7 +152,7 @@ impl Server {
log::debug!("add_client {client}");
let notify = FrontendNotify::NotifyClientCreate(client, hostname, port, pos);
if let Err(e) = self.frontend.notify_all(notify).await {
log::error!("{e}");
log::error!("error notifying frontend: {e}");
};
client
}
@@ -170,7 +177,7 @@ impl Server {
let notify = FrontendNotify::NotifyClientDelete(client);
log::debug!("{notify:?}");
if let Err(e) = self.frontend.notify_all(notify).await {
log::error!("{e}");
log::error!("error notifying frontend: {e}");
}
Some(client)
} else {
@@ -367,13 +374,22 @@ impl Server {
#[cfg(unix)]
async fn handle_frontend_stream(&mut self, mut stream: ReadHalf<UnixStream>) {
use std::io;
let tx = self.frontend_tx.clone();
tokio::task::spawn_local(async move {
loop {
let event = frontend::read_event(&mut stream).await;
match event {
Ok(event) => tx.send(event).await.unwrap(),
Err(e) => log::error!("error reading frontend event: {e}"),
Err(e) => {
if let Some(e) = e.downcast_ref::<io::Error>() {
if e.kind() == ErrorKind::UnexpectedEof {
return;
}
}
log::error!("error reading frontend event: {e}");
}
}
}
});
@@ -439,7 +455,7 @@ impl Server {
async fn enumerate(&mut self) {
let clients = self.client_manager.enumerate();
if let Err(e) = self.frontend.notify_all(FrontendNotify::Enumerate(clients)).await {
log::error!("{e}");
log::error!("error notifying frontend: {e}");
}
}
}