diff --git a/Cargo.toml b/Cargo.toml index 32bb413..18bee58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ anyhow = "1.0.71" log = "0.4.20" env_logger = "0.11.3" serde_json = "1.0.107" -tokio = {version = "1.32.0", features = ["io-util", "macros", "net", "rt", "sync", "signal"] } +tokio = {version = "1.32.0", features = ["io-util", "io-std", "macros", "net", "rt", "sync", "signal"] } async-trait = "0.1.73" futures-core = "0.3.28" futures = "0.3.28" diff --git a/src/capture/libei.rs b/src/capture/libei.rs index 1b27587..df1529f 100644 --- a/src/capture/libei.rs +++ b/src/capture/libei.rs @@ -119,7 +119,7 @@ async fn update_barriers( .set_pointer_barriers(session, &barriers, zones.zone_set()) .await?; let response = response.response()?; - log::info!("{response:?}"); + log::debug!("{response:?}"); Ok(id_map) } @@ -132,7 +132,7 @@ impl<'a> Drop for LibeiInputCapture<'a> { async fn create_session<'a>( input_capture: &'a InputCapture<'a>, ) -> Result<(Session<'a>, BitFlags)> { - log::info!("creating input capture session"); + log::debug!("creating input capture session"); let (session, capabilities) = loop { match input_capture .create_session( @@ -154,7 +154,7 @@ async fn connect_to_eis( input_capture: &InputCapture<'_>, session: &Session<'_>, ) -> Result<(ei::Context, EiConvertEventStream)> { - log::info!("connect_to_eis"); + log::debug!("connect_to_eis"); let fd = input_capture.connect_to_eis(session).await?; // create unix stream from fd @@ -270,7 +270,7 @@ impl<'a> LibeiInputCapture<'a> { ) .await?; - log::info!("enabling session"); + log::debug!("enabling session"); input_capture.enable(&session).await?; loop { diff --git a/src/client.rs b/src/client.rs index 2d3d279..4371867 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,12 +1,16 @@ use std::{ collections::HashSet, + error::Error, fmt::Display, net::{IpAddr, SocketAddr}, + str::FromStr, }; use serde::{Deserialize, Serialize}; use slab::Slab; +use crate::config::DEFAULT_PORT; + #[derive(Debug, Eq, Hash, PartialEq, Clone, Copy, Serialize, Deserialize)] pub enum Position { Left, @@ -21,6 +25,33 @@ impl Default for Position { } } +#[derive(Debug)] +pub struct PositionParseError { + string: String, +} + +impl Display for PositionParseError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "not a valid position: {}", self.string) + } +} + +impl Error for PositionParseError {} + +impl FromStr for Position { + type Err = PositionParseError; + + fn from_str(s: &str) -> Result { + match s { + "left" => Ok(Self::Left), + "right" => Ok(Self::Right), + "top" => Ok(Self::Top), + "bottom" => Ok(Self::Bottom), + _ => Err(PositionParseError { string: s.into() }), + } + } +} + impl Position { pub fn opposite(&self) -> Self { match self { @@ -62,21 +93,28 @@ impl TryFrom<&str> for Position { } #[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] -pub struct Client { +pub struct ClientConfig { /// hostname of this client pub hostname: Option, /// fix ips, determined by the user pub fix_ips: Vec, - /// all ip addresses associated with a particular client - /// e.g. Laptops usually have at least an ethernet and a wifi port - /// which have different ip addresses - pub ips: HashSet, /// both active_addr and addrs can be None / empty so port needs to be stored seperately pub port: u16, /// position of a client on screen pub pos: Position, } +impl Default for ClientConfig { + fn default() -> Self { + Self { + port: DEFAULT_PORT, + hostname: Default::default(), + fix_ips: Default::default(), + pos: Default::default(), + } + } +} + #[derive(Clone, Copy, Debug)] pub enum ClientEvent { Create(ClientHandle, Position), @@ -85,10 +123,8 @@ pub enum ClientEvent { pub type ClientHandle = u64; -#[derive(Debug, Clone)] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct ClientState { - /// information about the client - pub client: Client, /// events should be sent to and received from the client pub active: bool, /// `active` address of the client, used to send data to. @@ -97,12 +133,16 @@ pub struct ClientState { pub active_addr: Option, /// tracks whether or not the client is responding to pings pub alive: bool, + /// all ip addresses associated with a particular client + /// e.g. Laptops usually have at least an ethernet and a wifi port + /// which have different ip addresses + pub ips: HashSet, /// keys currently pressed by this client pub pressed_keys: HashSet, } pub struct ClientManager { - clients: Slab, + clients: Slab<(ClientConfig, ClientState)>, } impl Default for ClientManager { @@ -118,32 +158,10 @@ impl ClientManager { } /// add a new client to this manager - pub fn add_client( - &mut self, - hostname: Option, - ips: HashSet, - port: u16, - pos: Position, - active: bool, - ) -> ClientHandle { - // store fix ip addresses - let fix_ips = ips.iter().cloned().collect(); - - let client_state = ClientState { - client: Client { - hostname, - fix_ips, - ips, - port, - pos, - }, - active, - active_addr: None, - alive: false, - pressed_keys: HashSet::new(), - }; - - self.clients.insert(client_state) as ClientHandle + pub fn add_client(&mut self) -> ClientHandle { + let client_config = Default::default(); + let client_state = Default::default(); + self.clients.insert((client_config, client_state)) as ClientHandle } /// find a client by its address @@ -152,8 +170,8 @@ impl ClientManager { // time this is likely faster than using a HashMap self.clients .iter() - .find_map(|(k, c)| { - if c.active && c.client.ips.contains(&addr.ip()) { + .find_map(|(k, (_, s))| { + if s.active && s.ips.contains(&addr.ip()) { Some(k) } else { None @@ -165,8 +183,8 @@ impl ClientManager { pub fn find_client(&self, pos: Position) -> Option { self.clients .iter() - .find_map(|(k, c)| { - if c.active && c.client.pos == pos { + .find_map(|(k, (c, s))| { + if s.active && c.pos == pos { Some(k) } else { None @@ -176,28 +194,30 @@ impl ClientManager { } /// remove a client from the list - pub fn remove_client(&mut self, client: ClientHandle) -> Option { + pub fn remove_client(&mut self, client: ClientHandle) -> Option<(ClientConfig, ClientState)> { // remove id from occupied ids self.clients.try_remove(client as usize) } // returns an immutable reference to the client state corresponding to `client` - pub fn get(&self, client: ClientHandle) -> Option<&ClientState> { - self.clients.get(client as usize) + pub fn get(&self, handle: ClientHandle) -> Option<&(ClientConfig, ClientState)> { + self.clients.get(handle as usize) } /// returns a mutable reference to the client state corresponding to `client` - pub fn get_mut(&mut self, client: ClientHandle) -> Option<&mut ClientState> { - self.clients.get_mut(client as usize) + pub fn get_mut(&mut self, handle: ClientHandle) -> Option<&mut (ClientConfig, ClientState)> { + self.clients.get_mut(handle as usize) } - pub fn get_client_states(&self) -> impl Iterator { + pub fn get_client_states( + &self, + ) -> impl Iterator { self.clients.iter().map(|(k, v)| (k as ClientHandle, v)) } pub fn get_client_states_mut( &mut self, - ) -> impl Iterator { + ) -> impl Iterator { self.clients.iter_mut().map(|(k, v)| (k as ClientHandle, v)) } } diff --git a/src/frontend.rs b/src/frontend.rs index 53d9aaa..5777cc7 100644 --- a/src/frontend.rs +++ b/src/frontend.rs @@ -1,5 +1,5 @@ use anyhow::{anyhow, Result}; -use std::{cmp::min, io::ErrorKind, str, time::Duration}; +use std::{cmp::min, io::ErrorKind, net::IpAddr, str, time::Duration}; #[cfg(unix)] use std::{ @@ -23,7 +23,7 @@ use tokio::net::TcpStream; use serde::{Deserialize, Serialize}; use crate::{ - client::{Client, ClientHandle, Position}, + client::{ClientConfig, ClientHandle, ClientState, Position}, config::{Config, Frontend}, }; @@ -85,10 +85,10 @@ pub fn wait_for_service() -> Result { #[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] pub enum FrontendRequest { - /// add a new client - Create(Option, u16, Position), /// activate/deactivate client Activate(ClientHandle, bool), + /// add a new client + Create, /// change the listen port (recreate udp listener) ChangePort(u16), /// remove a client @@ -97,24 +97,30 @@ pub enum FrontendRequest { Enumerate(), /// service shutdown Terminate(), - /// update a client (hostname, port, position) - Update(ClientHandle, Option, u16, Position), + /// update hostname + UpdateHostname(ClientHandle, Option), + /// update port + UpdatePort(ClientHandle, u16), + /// update position + UpdatePosition(ClientHandle, Position), + /// update fix-ips + UpdateFixIps(ClientHandle, Vec), } #[derive(Debug, Clone, Serialize, Deserialize)] pub enum FrontendEvent { - /// the given client was activated - Activated(ClientHandle, bool), /// a client was created - Created(ClientHandle, Client), + Created(ClientHandle, ClientConfig, ClientState), /// a client was updated - Updated(ClientHandle, Client), + Updated(ClientHandle, ClientConfig), + /// state changed + StateChange(ClientHandle, ClientState), /// the client was deleted Deleted(ClientHandle), /// new port, reason of failure (if failed) PortChanged(u16, Option), /// list of all clients, used for initial state synchronization - Enumerate(Vec<(ClientHandle, Client, bool)>), + Enumerate(Vec<(ClientHandle, ClientConfig, ClientState)>), /// an error occured Error(String), } @@ -229,6 +235,7 @@ impl FrontendListener { let len = payload.len().to_be_bytes(); log::debug!("json: {json}, len: {}", payload.len()); + log::debug!("broadcasting event to streams: {:?}", self.tx_streams); let mut keep = vec![]; // TODO do simultaneously for tx in self.tx_streams.iter_mut() { diff --git a/src/frontend/cli.rs b/src/frontend/cli.rs index 536860e..2bc1fee 100644 --- a/src/frontend/cli.rs +++ b/src/frontend/cli.rs @@ -1,240 +1,347 @@ -use anyhow::{anyhow, Context, Result}; - -use std::{ - io::{ErrorKind, Read, Write}, - str::SplitWhitespace, - thread, +use anyhow::{anyhow, Result}; +use tokio::{ + io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}, + task::LocalSet, }; -use crate::{client::Position, config::DEFAULT_PORT}; +#[cfg(windows)] +use tokio::net::tcp::{ReadHalf, WriteHalf}; +#[cfg(unix)] +use tokio::net::unix::{ReadHalf, WriteHalf}; + +use std::io::{self, Write}; + +use crate::{ + client::{ClientConfig, ClientHandle, ClientState}, + config::DEFAULT_PORT, +}; + +use self::command::{Command, CommandType}; use super::{FrontendEvent, FrontendRequest}; +mod command; + pub fn run() -> Result<()> { - let Ok(mut tx) = super::wait_for_service() else { + let Ok(stream) = super::wait_for_service() else { return Err(anyhow!("Could not connect to lan-mouse-socket")); }; - let mut rx = tx.try_clone()?; + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build()?; + runtime.block_on(LocalSet::new().run_until(async move { + stream.set_nonblocking(true)?; + #[cfg(unix)] + let mut stream = tokio::net::UnixStream::from_std(stream)?; + #[cfg(windows)] + let mut stream = tokio::net::TcpStream::from_std(stream)?; + let (rx, tx) = stream.split(); - let reader = thread::Builder::new() - .name("cli-frontend".to_string()) - .spawn(move || { - // all further prompts - prompt(); - loop { - let mut buf = String::new(); - match std::io::stdin().read_line(&mut buf) { - Ok(0) => return, - Ok(len) => { - if let Some(events) = parse_cmd(buf, len) { - for event in events.iter() { - let json = serde_json::to_string(&event).unwrap(); - let bytes = json.as_bytes(); - let len = bytes.len().to_be_bytes(); - if let Err(e) = tx.write(&len) { - log::error!("error sending message: {e}"); - }; - if let Err(e) = tx.write(bytes) { - log::error!("error sending message: {e}"); - }; - if *event == FrontendRequest::Terminate() { - return; - } - } - // prompt is printed after the server response is received - } else { - prompt(); - } - } - Err(e) => { - if e.kind() != ErrorKind::UnexpectedEof { - log::error!("error reading from stdin: {e}"); - } - return; - } - } - } - })?; - - let _ = thread::Builder::new() - .name("cli-frontend-notify".to_string()) - .spawn(move || { - loop { - // read len - let mut len = [0u8; 8]; - match rx.read_exact(&mut len) { - Ok(()) => (), - Err(e) if e.kind() == ErrorKind::UnexpectedEof => break, - Err(e) => break log::error!("{e}"), - }; - let len = usize::from_be_bytes(len); - - // read payload - let mut buf: Vec = vec![0u8; len]; - match rx.read_exact(&mut buf[..len]) { - Ok(()) => (), - Err(e) if e.kind() == ErrorKind::UnexpectedEof => break, - Err(e) => break log::error!("{e}"), - }; - - let notify: FrontendEvent = match serde_json::from_slice(&buf) { - Ok(n) => n, - Err(e) => break log::error!("{e}"), - }; - match notify { - FrontendEvent::Activated(handle, active) => { - if active { - log::info!("client {handle} activated"); - } else { - log::info!("client {handle} deactivated"); - } - } - FrontendEvent::Created(handle, client) => { - let port = client.port; - let pos = client.pos; - let hostname = client.hostname.as_deref().unwrap_or(""); - log::info!("new client ({handle}): {hostname}:{port} - {pos}"); - } - FrontendEvent::Updated(handle, client) => { - let port = client.port; - let pos = client.pos; - let hostname = client.hostname.as_deref().unwrap_or(""); - log::info!("client ({handle}) updated: {hostname}:{port} - {pos}"); - } - FrontendEvent::Deleted(client) => { - log::info!("client ({client}) deleted."); - } - FrontendEvent::Error(e) => { - log::warn!("{e}"); - } - FrontendEvent::Enumerate(clients) => { - for (handle, client, active) in clients.into_iter() { - log::info!( - "client ({}) [{}]: active: {}, associated addresses: [{}]", - handle, - client.hostname.as_deref().unwrap_or(""), - if active { "yes" } else { "no" }, - client - .ips - .into_iter() - .map(|a| a.to_string()) - .collect::>() - .join(", ") - ); - } - } - FrontendEvent::PortChanged(port, msg) => match msg { - Some(msg) => log::info!("could not change port: {msg}"), - None => log::info!("port changed: {port}"), - }, - } - prompt(); - } - })?; - match reader.join() { - Ok(_) => {} - Err(e) => { - let msg = match (e.downcast_ref::<&str>(), e.downcast_ref::()) { - (Some(&s), _) => s, - (_, Some(s)) => s, - _ => "no panic info", - }; - log::error!("reader thread paniced: {msg}"); - } - } + let mut cli = Cli::new(rx, tx); + cli.run().await + }))?; Ok(()) } -fn prompt() { +struct Cli<'a> { + clients: Vec<(ClientHandle, ClientConfig, ClientState)>, + rx: ReadHalf<'a>, + tx: WriteHalf<'a>, +} + +impl<'a> Cli<'a> { + fn new(rx: ReadHalf<'a>, tx: WriteHalf<'a>) -> Cli<'a> { + Self { + clients: vec![], + rx, + tx, + } + } + + async fn run(&mut self) -> Result<()> { + let stdin = tokio::io::stdin(); + let stdin = BufReader::new(stdin); + let mut stdin = stdin.lines(); + + /* initial state sync */ + let request = FrontendRequest::Enumerate(); + self.send_request(request).await?; + + self.clients = loop { + let event = self.await_event().await?; + if let FrontendEvent::Enumerate(clients) = event { + break clients; + } + }; + + loop { + prompt()?; + tokio::select! { + line = stdin.next_line() => { + let Some(line) = line? else { + break Ok(()); + }; + let cmd: Command = match line.parse() { + Ok(cmd) => cmd, + Err(e) => { + eprintln!("{e}"); + continue; + } + }; + self.execute(cmd).await?; + } + event = self.await_event() => { + let event = event?; + self.handle_event(event); + } + } + } + } + + async fn execute(&mut self, cmd: Command) -> Result<()> { + match cmd { + Command::None => {} + Command::Connect(pos, host, port) => { + let request = FrontendRequest::Create; + self.send_request(request).await?; + let handle = loop { + let event = self.await_event().await?; + match event { + FrontendEvent::Created(h, c, s) => { + self.clients.push((h, c, s)); + break h; + } + _ => { + self.handle_event(event); + continue; + } + } + }; + for request in [ + FrontendRequest::UpdateHostname(handle, Some(host.clone())), + FrontendRequest::UpdatePort(handle, port.unwrap_or(DEFAULT_PORT)), + FrontendRequest::UpdatePosition(handle, pos), + ] { + self.send_request(request).await?; + loop { + let event = self.await_event().await?; + self.handle_event(event.clone()); + if let FrontendEvent::Updated(_, _) = event { + break; + } + } + } + } + Command::Disconnect(id) => { + self.send_request(FrontendRequest::Delete(id)).await?; + loop { + let event = self.await_event().await?; + self.handle_event(event.clone()); + if let FrontendEvent::Deleted(_) = event { + self.handle_event(event); + break; + } + } + } + Command::Activate(id) => { + self.send_request(FrontendRequest::Activate(id, true)) + .await?; + loop { + let event = self.await_event().await?; + self.handle_event(event.clone()); + if let FrontendEvent::StateChange(_, _) = event { + self.handle_event(event); + break; + } + } + } + Command::Deactivate(id) => { + self.send_request(FrontendRequest::Activate(id, false)) + .await?; + loop { + let event = self.await_event().await?; + self.handle_event(event.clone()); + if let FrontendEvent::StateChange(_, _) = event { + self.handle_event(event); + break; + } + } + } + Command::List => { + self.send_request(FrontendRequest::Enumerate()).await?; + loop { + let event = self.await_event().await?; + self.handle_event(event.clone()); + if let FrontendEvent::Enumerate(_) = event { + break; + } + } + } + Command::SetHost(handle, host) => { + let request = FrontendRequest::UpdateHostname(handle, Some(host.clone())); + self.send_request(request).await?; + loop { + let event = self.await_event().await?; + self.handle_event(event.clone()); + if let FrontendEvent::Updated(_, _) = event { + self.handle_event(event); + break; + } + } + } + Command::SetPort(handle, port) => { + let request = FrontendRequest::UpdatePort(handle, port.unwrap_or(DEFAULT_PORT)); + self.send_request(request).await?; + loop { + let event = self.await_event().await?; + self.handle_event(event.clone()); + if let FrontendEvent::Updated(_, _) = event { + break; + } + } + } + Command::Help => { + for cmd_type in [ + CommandType::List, + CommandType::Connect, + CommandType::Disconnect, + CommandType::Activate, + CommandType::Deactivate, + CommandType::SetHost, + CommandType::SetPort, + ] { + eprintln!("{}", cmd_type.usage()); + } + } + } + Ok(()) + } + + fn find_mut( + &mut self, + handle: ClientHandle, + ) -> Option<&mut (ClientHandle, ClientConfig, ClientState)> { + self.clients.iter_mut().find(|(h, _, _)| *h == handle) + } + + fn remove( + &mut self, + handle: ClientHandle, + ) -> Option<(ClientHandle, ClientConfig, ClientState)> { + let idx = self.clients.iter().position(|(h, _, _)| *h == handle); + idx.map(|i| self.clients.swap_remove(i)) + } + + fn handle_event(&mut self, event: FrontendEvent) { + match event { + FrontendEvent::Created(h, c, s) => { + eprint!("client added ({h}): "); + print_config(&c); + eprint!(" "); + print_state(&s); + eprintln!(); + self.clients.push((h, c, s)); + } + FrontendEvent::Updated(h, c) => { + if let Some((_, config, _)) = self.find_mut(h) { + let old_host = config.hostname.clone().unwrap_or("\"\"".into()); + let new_host = c.hostname.clone().unwrap_or("\"\"".into()); + if old_host != new_host { + eprintln!( + "client {h}: hostname updated ({} -> {})", + old_host, new_host + ); + } + if config.port != c.port { + eprintln!("client {h} changed port: {} -> {}", config.port, c.port); + } + if config.fix_ips != c.fix_ips { + eprintln!("client {h} ips updated: {:?}", c.fix_ips) + } + *config = c; + } + } + FrontendEvent::StateChange(h, s) => { + if let Some((_, _, state)) = self.find_mut(h) { + if state.active ^ s.active { + eprintln!( + "client {h} {}", + if s.active { "activated" } else { "deactivated" } + ); + } + *state = s; + } + } + FrontendEvent::Deleted(h) => { + if let Some((h, c, _)) = self.remove(h) { + eprint!("client {h} removed ("); + print_config(&c); + eprintln!(")"); + } + } + FrontendEvent::PortChanged(p, e) => { + if let Some(e) = e { + eprintln!("failed to change port: {e}"); + } else { + eprintln!("changed port to {p}"); + } + } + FrontendEvent::Enumerate(clients) => { + self.clients = clients; + self.print_clients(); + } + FrontendEvent::Error(e) => { + eprintln!("ERROR: {e}"); + } + } + } + + fn print_clients(&mut self) { + for (h, c, s) in self.clients.iter() { + eprint!("client {h}: "); + print_config(c); + eprint!(" "); + print_state(s); + eprintln!(); + } + } + + async fn send_request(&mut self, request: FrontendRequest) -> io::Result<()> { + let json = serde_json::to_string(&request).unwrap(); + let bytes = json.as_bytes(); + let len = bytes.len(); + self.tx.write_u64(len as u64).await?; + self.tx.write_all(bytes).await?; + Ok(()) + } + + async fn await_event(&mut self) -> Result { + let len = self.rx.read_u64().await?; + let mut buf = vec![0u8; len as usize]; + self.rx.read_exact(&mut buf).await?; + let event: FrontendEvent = serde_json::from_slice(&buf)?; + Ok(event) + } +} + +fn prompt() -> io::Result<()> { eprint!("lan-mouse > "); - std::io::stderr().flush().unwrap(); + std::io::stderr().flush()?; + Ok(()) } -fn parse_cmd(s: String, len: usize) -> Option> { - if len == 0 { - return Some(vec![FrontendRequest::Terminate()]); - } - let mut l = s.split_whitespace(); - let cmd = l.next()?; - let res = match cmd { - "help" => { - log::info!("list list clients"); - log::info!("connect left|right|top|bottom [port] add a new client"); - log::info!("disconnect remove a client"); - log::info!("activate activate a client"); - log::info!("deactivate deactivate a client"); - log::info!("exit exit lan-mouse"); - log::info!("setport change port"); - None - } - "exit" => return Some(vec![FrontendRequest::Terminate()]), - "list" => return Some(vec![FrontendRequest::Enumerate()]), - "connect" => Some(parse_connect(l)), - "disconnect" => Some(parse_disconnect(l)), - "activate" => Some(parse_activate(l)), - "deactivate" => Some(parse_deactivate(l)), - "setport" => Some(parse_port(l)), - _ => { - log::error!("unknown command: {s}"); - None - } - }; - match res { - Some(Ok(e)) => Some(e), - Some(Err(e)) => { - log::warn!("{e}"); - None - } - _ => None, - } +fn print_config(c: &ClientConfig) { + eprint!( + "{}:{} ({}), ips: {:?}", + c.hostname.clone().unwrap_or("(no hostname)".into()), + c.port, + c.pos, + c.fix_ips + ); } -fn parse_connect(mut l: SplitWhitespace) -> Result> { - let usage = "usage: connect left|right|top|bottom [port]"; - let host = l.next().context(usage)?.to_owned(); - let pos = match l.next().context(usage)? { - "right" => Position::Right, - "top" => Position::Top, - "bottom" => Position::Bottom, - _ => Position::Left, - }; - let port = if let Some(p) = l.next() { - p.parse()? - } else { - DEFAULT_PORT - }; - Ok(vec![ - FrontendRequest::Create(Some(host), port, pos), - FrontendRequest::Enumerate(), - ]) -} - -fn parse_disconnect(mut l: SplitWhitespace) -> Result> { - let client = l.next().context("usage: disconnect ")?.parse()?; - Ok(vec![ - FrontendRequest::Delete(client), - FrontendRequest::Enumerate(), - ]) -} - -fn parse_activate(mut l: SplitWhitespace) -> Result> { - let client = l.next().context("usage: activate ")?.parse()?; - Ok(vec![ - FrontendRequest::Activate(client, true), - FrontendRequest::Enumerate(), - ]) -} - -fn parse_deactivate(mut l: SplitWhitespace) -> Result> { - let client = l.next().context("usage: deactivate ")?.parse()?; - Ok(vec![ - FrontendRequest::Activate(client, false), - FrontendRequest::Enumerate(), - ]) -} - -fn parse_port(mut l: SplitWhitespace) -> Result> { - let port = l.next().context("usage: setport ")?.parse()?; - Ok(vec![FrontendRequest::ChangePort(port)]) +fn print_state(s: &ClientState) { + eprint!("active: {}, dns: {:?}", s.active, s.ips); } diff --git a/src/frontend/cli/command.rs b/src/frontend/cli/command.rs new file mode 100644 index 0000000..6d49ca1 --- /dev/null +++ b/src/frontend/cli/command.rs @@ -0,0 +1,153 @@ +use std::{ + fmt::Display, + str::{FromStr, SplitWhitespace}, +}; + +use crate::client::{ClientHandle, Position}; + +pub(super) enum CommandType { + NoCommand, + Help, + Connect, + Disconnect, + Activate, + Deactivate, + List, + SetHost, + SetPort, +} + +#[derive(Debug)] +pub(super) struct InvalidCommand { + cmd: String, +} + +impl Display for InvalidCommand { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "invalid command: \"{}\"", self.cmd) + } +} + +impl FromStr for CommandType { + type Err = InvalidCommand; + + fn from_str(s: &str) -> std::prelude::v1::Result { + match s { + "connect" => Ok(Self::Connect), + "disconnect" => Ok(Self::Disconnect), + "activate" => Ok(Self::Activate), + "deactivate" => Ok(Self::Deactivate), + "list" => Ok(Self::List), + "set-host" => Ok(Self::SetHost), + "set-port" => Ok(Self::SetPort), + "help" => Ok(Self::Help), + _ => Err(InvalidCommand { cmd: s.to_string() }), + } + } +} + +#[derive(Debug)] +pub(super) enum Command { + None, + Help, + Connect(Position, String, Option), + Disconnect(ClientHandle), + Activate(ClientHandle), + Deactivate(ClientHandle), + List, + SetHost(ClientHandle, String), + SetPort(ClientHandle, Option), +} + +impl CommandType { + pub(super) fn usage(&self) -> &'static str { + match self { + CommandType::Help => "help", + CommandType::NoCommand => "", + CommandType::Connect => "connect left|right|top|bottom []", + CommandType::Disconnect => "disconnect ", + CommandType::Activate => "activate ", + CommandType::Deactivate => "deactivate ", + CommandType::List => "list", + CommandType::SetHost => "set-host ", + CommandType::SetPort => "set-port ", + } + } +} + +pub(super) enum CommandParseError { + Usage(CommandType), + Invalid(InvalidCommand), +} + +impl Display for CommandParseError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Usage(cmd) => write!(f, "usage: {}", cmd.usage()), + Self::Invalid(cmd) => write!(f, "{}", cmd), + } + } +} + +impl FromStr for Command { + type Err = CommandParseError; + + fn from_str(cmd: &str) -> Result { + let mut args = cmd.split_whitespace(); + let cmd_type: CommandType = match args.next() { + Some(c) => c.parse().map_err(CommandParseError::Invalid), + None => Ok(CommandType::NoCommand), + }?; + match cmd_type { + CommandType::Help => Ok(Command::Help), + CommandType::NoCommand => Ok(Command::None), + CommandType::Connect => parse_connect_cmd(args), + CommandType::Disconnect => parse_disconnect_cmd(args), + CommandType::Activate => parse_activate_cmd(args), + CommandType::Deactivate => parse_deactivate_cmd(args), + CommandType::List => Ok(Command::List), + CommandType::SetHost => parse_set_host(args), + CommandType::SetPort => parse_set_port(args), + } + } +} + +fn parse_connect_cmd(mut args: SplitWhitespace<'_>) -> Result { + const USAGE: CommandParseError = CommandParseError::Usage(CommandType::Connect); + let pos = args.next().ok_or(USAGE)?.parse().map_err(|_| USAGE)?; + let host = args.next().ok_or(USAGE)?.to_string(); + let port = args.next().and_then(|p| p.parse().ok()); + Ok(Command::Connect(pos, host, port)) +} + +fn parse_disconnect_cmd(mut args: SplitWhitespace<'_>) -> Result { + const USAGE: CommandParseError = CommandParseError::Usage(CommandType::Disconnect); + let id = args.next().ok_or(USAGE)?.parse().map_err(|_| USAGE)?; + Ok(Command::Disconnect(id)) +} + +fn parse_activate_cmd(mut args: SplitWhitespace<'_>) -> Result { + const USAGE: CommandParseError = CommandParseError::Usage(CommandType::Activate); + let id = args.next().ok_or(USAGE)?.parse().map_err(|_| USAGE)?; + Ok(Command::Activate(id)) +} + +fn parse_deactivate_cmd(mut args: SplitWhitespace<'_>) -> Result { + const USAGE: CommandParseError = CommandParseError::Usage(CommandType::Deactivate); + let id = args.next().ok_or(USAGE)?.parse().map_err(|_| USAGE)?; + Ok(Command::Deactivate(id)) +} + +fn parse_set_host(mut args: SplitWhitespace<'_>) -> Result { + const USAGE: CommandParseError = CommandParseError::Usage(CommandType::SetHost); + let id = args.next().ok_or(USAGE)?.parse().map_err(|_| USAGE)?; + let host = args.next().ok_or(USAGE)?.parse().map_err(|_| USAGE)?; + Ok(Command::SetHost(id, host)) +} + +fn parse_set_port(mut args: SplitWhitespace<'_>) -> Result { + const USAGE: CommandParseError = CommandParseError::Usage(CommandType::SetPort); + let id = args.next().ok_or(USAGE)?.parse().map_err(|_| USAGE)?; + let port = args.next().and_then(|p| p.parse().ok()); + Ok(Command::SetPort(id, port)) +} diff --git a/src/frontend/gtk.rs b/src/frontend/gtk.rs index 590f4e9..b4cd834 100644 --- a/src/frontend/gtk.rs +++ b/src/frontend/gtk.rs @@ -8,7 +8,7 @@ use std::{ process, str, }; -use crate::frontend::gtk::window::Window; +use crate::frontend::{gtk::window::Window, FrontendRequest}; use adw::Application; use gtk::{ @@ -113,34 +113,35 @@ fn build_ui(app: &Application) { } }); - let window = Window::new(app); - window.imp().stream.borrow_mut().replace(tx); + let window = Window::new(app, tx); + window.request(FrontendRequest::Enumerate()); + glib::spawn_future_local(clone!(@weak window => async move { loop { let notify = receiver.recv().await.unwrap_or_else(|_| process::exit(1)); match notify { - FrontendEvent::Activated(handle, active) => { - window.activate_client(handle, active); - } - FrontendEvent::Created(handle, client) => { - window.new_client(handle, client, false); - }, - FrontendEvent::Updated(handle, client) => { - window.update_client(handle, client); - } - FrontendEvent::Error(e) => { - window.show_toast(e.as_str()); + FrontendEvent::Created(handle, client, state) => { + window.new_client(handle, client, state); }, FrontendEvent::Deleted(client) => { window.delete_client(client); } + FrontendEvent::Updated(handle, client) => { + window.update_client_config(handle, client); + } + FrontendEvent::StateChange(handle, state) => { + window.update_client_state(handle, state); + } + FrontendEvent::Error(e) => { + window.show_toast(e.as_str()); + }, FrontendEvent::Enumerate(clients) => { - for (handle, client, active) in clients { + for (handle, client, state) in clients { if window.client_idx(handle).is_some() { - window.activate_client(handle, active); - window.update_client(handle, client); + window.update_client_config(handle, client); + window.update_client_state(handle, state); } else { - window.new_client(handle, client, active); + window.new_client(handle, client, state); } } }, diff --git a/src/frontend/gtk/client_object.rs b/src/frontend/gtk/client_object.rs index aba7af6..7b48f3c 100644 --- a/src/frontend/gtk/client_object.rs +++ b/src/frontend/gtk/client_object.rs @@ -3,20 +3,20 @@ mod imp; use adw::subclass::prelude::*; use gtk::glib::{self, Object}; -use crate::client::{Client, ClientHandle}; +use crate::client::{ClientConfig, ClientHandle, ClientState}; glib::wrapper! { pub struct ClientObject(ObjectSubclass); } impl ClientObject { - pub fn new(handle: ClientHandle, client: Client, active: bool) -> Self { + pub fn new(handle: ClientHandle, client: ClientConfig, state: ClientState) -> Self { Object::builder() .property("handle", handle) .property("hostname", client.hostname) .property("port", client.port as u32) .property("position", client.pos.to_string()) - .property("active", active) + .property("active", state.active) .build() } diff --git a/src/frontend/gtk/window.rs b/src/frontend/gtk/window.rs index 887ed40..7d5b8a8 100644 --- a/src/frontend/gtk/window.rs +++ b/src/frontend/gtk/window.rs @@ -2,6 +2,12 @@ mod imp; use std::io::Write; +#[cfg(unix)] +use std::os::unix::net::UnixStream; + +#[cfg(windows)] +use std::net::TcpStream; + use adw::prelude::*; use adw::subclass::prelude::*; use glib::{clone, Object}; @@ -12,7 +18,7 @@ use gtk::{ }; use crate::{ - client::{Client, ClientHandle, Position}, + client::{ClientConfig, ClientHandle, ClientState, Position}, config::DEFAULT_PORT, frontend::{gtk::client_object::ClientObject, FrontendRequest}, }; @@ -27,8 +33,14 @@ glib::wrapper! { } impl Window { - pub(crate) fn new(app: &adw::Application) -> Self { - Object::builder().property("application", app).build() + pub(crate) fn new( + app: &adw::Application, + #[cfg(unix)] tx: UnixStream, + #[cfg(windows)] tx: TcpStream, + ) -> Self { + let window: Self = Object::builder().property("application", app).build(); + window.imp().stream.borrow_mut().replace(tx); + window } pub fn clients(&self) -> gio::ListStore { @@ -87,8 +99,8 @@ impl Window { row } - pub fn new_client(&self, handle: ClientHandle, client: Client, active: bool) { - let client = ClientObject::new(handle, client, active); + pub fn new_client(&self, handle: ClientHandle, client: ClientConfig, state: ClientState) { + let client = ClientObject::new(handle, client, state); self.clients().append(&client); self.set_placeholder_visible(false); } @@ -115,7 +127,7 @@ impl Window { } } - pub fn update_client(&self, handle: ClientHandle, client: Client) { + pub fn update_client_config(&self, handle: ClientHandle, client: ClientConfig) { let Some(idx) = self.client_idx(handle) else { log::warn!("could not find client with handle {}", handle); return; @@ -137,22 +149,23 @@ impl Window { } } - pub fn activate_client(&self, handle: ClientHandle, active: bool) { + pub fn update_client_state(&self, handle: ClientHandle, state: ClientState) { let Some(idx) = self.client_idx(handle) else { - log::warn!("could not find client with handle {handle}"); + log::warn!("could not find client with handle {}", handle); return; }; let client_object = self.clients().item(idx as u32).unwrap(); let client_object: &ClientObject = client_object.downcast_ref().unwrap(); let data = client_object.get_data(); - if data.active != active { - client_object.set_active(active); - log::debug!("set active to {active}"); + + if state.active != data.active { + client_object.set_active(state.active); + log::debug!("set active to {}", state.active); } } pub fn request_client_create(&self) { - let event = FrontendRequest::Create(None, DEFAULT_PORT, Position::default()); + let event = FrontendRequest::Create; self.imp().set_port(DEFAULT_PORT); self.request(event); } @@ -167,24 +180,21 @@ impl Window { } pub fn request_client_update(&self, client: &ClientObject, active: bool) { + let handle = client.handle(); let data = client.get_data(); - let position = match Position::try_from(data.position.as_str()) { - Ok(pos) => pos, - _ => { - log::error!("invalid position: {}", data.position); - return; - } - }; + let position = Position::try_from(data.position.as_str()).expect("invalid position"); let hostname = data.hostname; let port = data.port as u16; - let event = FrontendRequest::Update(client.handle(), hostname, port, position); - log::debug!("requesting update: {event:?}"); - self.request(event); - - let event = FrontendRequest::Activate(client.handle(), active); - log::debug!("requesting activate: {event:?}"); - self.request(event); + for event in [ + FrontendRequest::UpdateHostname(handle, hostname), + FrontendRequest::UpdatePosition(handle, position), + FrontendRequest::UpdatePort(handle, port), + FrontendRequest::Activate(handle, active), + ] { + log::debug!("requesting: {event:?}"); + self.request(event); + } } pub fn request_client_delete(&self, idx: u32) { @@ -198,7 +208,7 @@ impl Window { } } - fn request(&self, event: FrontendRequest) { + pub fn request(&self, event: FrontendRequest) { let json = serde_json::to_string(&event).unwrap(); log::debug!("requesting {json}"); let mut stream = self.imp().stream.borrow_mut(); diff --git a/src/server.rs b/src/server.rs index 18bc163..8eb1450 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,12 +1,13 @@ use log; use std::{ cell::{Cell, RefCell}, + collections::HashSet, rc::Rc, }; use tokio::signal; use crate::{ - client::{ClientHandle, ClientManager}, + client::{ClientConfig, ClientHandle, ClientManager, ClientState}, config::Config, dns, frontend::{FrontendListener, FrontendRequest}, @@ -49,13 +50,21 @@ impl Server { let state = Rc::new(Cell::new(State::Receiving)); let port = Rc::new(Cell::new(config.port)); for config_client in config.get_clients() { - client_manager.borrow_mut().add_client( - config_client.hostname, - config_client.ips, - config_client.port, - config_client.pos, - config_client.active, - ); + let client = ClientConfig { + hostname: config_client.hostname, + fix_ips: config_client.ips.into_iter().collect(), + port: config_client.port, + pos: config_client.pos, + }; + let state = ClientState { + active: config_client.active, + ips: HashSet::from_iter(client.fix_ips.iter().cloned()), + ..Default::default() + }; + let mut client_manager = client_manager.borrow_mut(); + let handle = client_manager.add_client(); + let c = client_manager.get_mut(handle).expect("invalid handle"); + *c = (client, state); } let release_bind = config.release_bind.clone(); Self { @@ -130,9 +139,9 @@ impl Server { .client_manager .borrow() .get_client_states() - .filter_map(|(h, s)| { + .filter_map(|(h, (c, s))| { if s.active { - Some((h, s.client.hostname.clone())) + Some((h, c.hostname.clone())) } else { None } diff --git a/src/server/capture_task.rs b/src/server/capture_task.rs index d68dd1a..4b8c3b0 100644 --- a/src/server/capture_task.rs +++ b/src/server/capture_task.rs @@ -108,7 +108,7 @@ async fn handle_capture_event( // get client state for handle let mut client_manager = server.client_manager.borrow_mut(); let client_state = match client_manager.get_mut(handle) { - Some(state) => state, + Some((_, s)) => s, None => { // should not happen log::warn!("unknown client!"); diff --git a/src/server/emulation_task.rs b/src/server/emulation_task.rs index af69bf8..c688880 100644 --- a/src/server/emulation_task.rs +++ b/src/server/emulation_task.rs @@ -108,7 +108,7 @@ async fn handle_udp_rx( { let mut client_manager = server.client_manager.borrow_mut(); let client_state = match client_manager.get_mut(handle) { - Some(s) => s, + Some((_, s)) => s, None => { log::error!("unknown handle"); return; @@ -156,13 +156,12 @@ async fn handle_udp_rx( }) = event { let mut client_manager = server.client_manager.borrow_mut(); - let client_state = - if let Some(client_state) = client_manager.get_mut(handle) { - client_state - } else { - log::error!("unknown handle"); - return; - }; + let client_state = if let Some((_, s)) = client_manager.get_mut(handle) { + s + } else { + log::error!("unknown handle"); + return; + }; if state == 0 { // ignore release event if key not pressed ignore_event = !client_state.pressed_keys.remove(&key); @@ -213,7 +212,7 @@ async fn release_keys( .borrow_mut() .get_mut(client) .iter_mut() - .flat_map(|s| s.pressed_keys.drain()) + .flat_map(|(_, s)| s.pressed_keys.drain()) .collect::>(); for key in keys { diff --git a/src/server/frontend_task.rs b/src/server/frontend_task.rs index 2acf4ea..58a657c 100644 --- a/src/server/frontend_task.rs +++ b/src/server/frontend_task.rs @@ -75,12 +75,11 @@ async fn handle_frontend_stream( let tx = frontend_tx.clone(); tokio::task::spawn_local(async move { - let _ = tx.send(FrontendRequest::Enumerate()).await; loop { - let event = frontend::wait_for_request(&mut stream).await; - match event { - Ok(event) => { - let _ = tx.send(event).await; + let request = frontend::wait_for_request(&mut stream).await; + match request { + Ok(request) => { + let _ = tx.send(request).await; } Err(e) => { if let Some(e) = e.downcast_ref::() { @@ -98,8 +97,8 @@ async fn handle_frontend_stream( async fn handle_frontend_event( server: &Server, - capture_tx: &Sender, - emulate_tx: &Sender, + capture: &Sender, + emulate: &Sender, resolve_tx: &Sender, frontend: &mut FrontendListener, port_tx: &Sender, @@ -107,99 +106,67 @@ async fn handle_frontend_event( ) -> bool { log::debug!("frontend: {event:?}"); match event { - FrontendRequest::Create(hostname, port, pos) => { - add_client( - server, - frontend, - resolve_tx, - hostname, - HashSet::new(), - port, - pos, - ) - .await; + FrontendRequest::Create => { + add_client(server, frontend).await; } FrontendRequest::Activate(handle, active) => { if active { - activate_client(server, frontend, capture_tx, emulate_tx, handle).await; + activate_client(server, frontend, capture, emulate, handle).await; } else { - deactivate_client(server, frontend, capture_tx, emulate_tx, handle).await; + deactivate_client(server, frontend, capture, emulate, handle).await; } } FrontendRequest::ChangePort(port) => { let _ = port_tx.send(port).await; } FrontendRequest::Delete(handle) => { - remove_client(server, frontend, capture_tx, emulate_tx, handle).await; + remove_client(server, frontend, capture, emulate, handle).await; } FrontendRequest::Enumerate() => { let clients = server .client_manager .borrow() .get_client_states() - .map(|(h, s)| (h, s.client.clone(), s.active)) + .map(|(h, (c, s))| (h, c.clone(), s.clone())) .collect(); - notify_all(frontend, FrontendEvent::Enumerate(clients)).await; + broadcast(frontend, FrontendEvent::Enumerate(clients)).await; } FrontendRequest::Terminate() => { log::info!("terminating gracefully..."); return true; } - FrontendRequest::Update(handle, hostname, port, pos) => { - update_client( - server, - frontend, - capture_tx, - emulate_tx, - resolve_tx, - (handle, hostname, port, pos), - ) - .await; + FrontendRequest::UpdateFixIps(handle, fix_ips) => { + update_fix_ips(server, resolve_tx, handle, fix_ips).await; + broadcast_client_update(server, frontend, handle).await; + } + FrontendRequest::UpdateHostname(handle, hostname) => { + update_hostname(server, resolve_tx, handle, hostname).await; + broadcast_client_update(server, frontend, handle).await; + } + FrontendRequest::UpdatePort(handle, port) => { + update_port(server, handle, port).await; + broadcast_client_update(server, frontend, handle).await; + } + FrontendRequest::UpdatePosition(handle, pos) => { + update_pos(server, handle, capture, emulate, pos).await; + broadcast_client_update(server, frontend, handle).await; } }; false } -async fn notify_all(frontend: &mut FrontendListener, event: FrontendEvent) { +async fn broadcast(frontend: &mut FrontendListener, event: FrontendEvent) { if let Err(e) = frontend.broadcast_event(event).await { log::error!("error notifying frontend: {e}"); } } -pub async fn add_client( - server: &Server, - frontend: &mut FrontendListener, - resolver_tx: &Sender, - hostname: Option, - addr: HashSet, - port: u16, - pos: Position, -) { - log::info!( - "adding client [{}]{} @ {:?}", - pos, - hostname.as_deref().unwrap_or(""), - &addr - ); - let handle = - server - .client_manager - .borrow_mut() - .add_client(hostname.clone(), addr, port, pos, false); +pub async fn add_client(server: &Server, frontend: &mut FrontendListener) { + let handle = server.client_manager.borrow_mut().add_client(); + log::info!("added client {handle}"); - log::debug!("add_client {handle}"); - - if let Some(hostname) = hostname { - let _ = resolver_tx.send(DnsRequest { hostname, handle }).await; - } - let client = server - .client_manager - .borrow() - .get(handle) - .unwrap() - .client - .clone(); - notify_all(frontend, FrontendEvent::Created(handle, client)).await; + let (c, s) = server.client_manager.borrow().get(handle).unwrap().clone(); + broadcast(frontend, FrontendEvent::Created(handle, c, s)).await; } pub async fn deactivate_client( @@ -209,19 +176,19 @@ pub async fn deactivate_client( emulate: &Sender, handle: ClientHandle, ) { - let (client, _) = match server.client_manager.borrow_mut().get_mut(handle) { - Some(state) => { - state.active = false; - (handle, state.client.pos) + let state = match server.client_manager.borrow_mut().get_mut(handle) { + Some((_, s)) => { + s.active = false; + s.clone() } None => return, }; - let event = ClientEvent::Destroy(client); + let event = ClientEvent::Destroy(handle); let _ = capture.send(CaptureEvent::ClientEvent(event)).await; let _ = emulate.send(EmulationEvent::ClientEvent(event)).await; - let event = FrontendEvent::Activated(client, false); - notify_all(frontend, event).await; + let event = FrontendEvent::StateChange(handle, state); + broadcast(frontend, event).await; } pub async fn activate_client( @@ -233,7 +200,7 @@ pub async fn activate_client( ) { /* deactivate potential other client at this position */ let pos = match server.client_manager.borrow().get(handle) { - Some(state) => state.client.pos, + Some((client, _)) => client.pos, None => return, }; @@ -245,19 +212,19 @@ pub async fn activate_client( } /* activate the client */ - server - .client_manager - .borrow_mut() - .get_mut(handle) - .unwrap() - .active = true; + let state = if let Some((_, s)) = server.client_manager.borrow_mut().get_mut(handle) { + s.active = true; + s.clone() + } else { + return; + }; /* notify emulation, capture and frontends */ let event = ClientEvent::Create(handle, pos); let _ = capture.send(CaptureEvent::ClientEvent(event)).await; let _ = emulate.send(EmulationEvent::ClientEvent(event)).await; - let event = FrontendEvent::Activated(handle, true); - notify_all(frontend, event).await; + let event = FrontendEvent::StateChange(handle, state); + broadcast(frontend, event).await; } pub async fn remove_client( @@ -271,7 +238,7 @@ pub async fn remove_client( .client_manager .borrow_mut() .remove_client(handle) - .map(|s| s.active) + .map(|(_, s)| s.active) else { return; }; @@ -283,76 +250,107 @@ pub async fn remove_client( } let event = FrontendEvent::Deleted(handle); - notify_all(frontend, event).await; + broadcast(frontend, event).await; } -async fn update_client( +async fn update_fix_ips( server: &Server, - frontend: &mut FrontendListener, - capture: &Sender, - emulate: &Sender, resolve_tx: &Sender, - client_update: (ClientHandle, Option, u16, Position), + handle: ClientHandle, + fix_ips: Vec, ) { - let (handle, hostname, port, pos) = client_update; - let mut changed = false; - let (hostname, active) = { - // retrieve state + let hostname = { let mut client_manager = server.client_manager.borrow_mut(); - let Some(state) = client_manager.get_mut(handle) else { + let Some((c, _)) = client_manager.get_mut(handle) else { return; }; - // update pos - if state.client.pos != pos { - state.client.pos = pos; - changed = true; - } - - // update port - if state.client.port != port { - state.client.port = port; - state.active_addr = state.active_addr.map(|a| SocketAddr::new(a.ip(), port)); - changed = true; - } - - // update hostname - if state.client.hostname != hostname { - state.client.ips = HashSet::new(); - state.active_addr = None; - state.client.hostname = hostname; - changed = true; - } - - log::debug!("client updated: {:?}", state); - (state.client.hostname.clone(), state.active) + c.fix_ips = fix_ips; + c.hostname.clone() }; - // resolve dns if something changed - if changed { - // resolve dns - if let Some(hostname) = hostname { - let _ = resolve_tx.send(DnsRequest { hostname, handle }).await; - } + if let Some(hostname) = hostname { + let _ = resolve_tx.send(DnsRequest { hostname, handle }).await; } +} + +async fn update_hostname( + server: &Server, + resolve_tx: &Sender, + handle: ClientHandle, + hostname: Option, +) { + let hostname = { + let mut client_manager = server.client_manager.borrow_mut(); + let Some((c, s)) = client_manager.get_mut(handle) else { + return; + }; + + // update hostname + if c.hostname != hostname { + c.hostname = hostname; + s.ips = HashSet::from_iter(c.fix_ips.iter().cloned()); + s.active_addr = None; + c.hostname.clone() + } else { + None + } + }; + + // resolve to update ips in state + if let Some(hostname) = hostname { + let _ = resolve_tx.send(DnsRequest { hostname, handle }).await; + } +} + +async fn update_port(server: &Server, handle: ClientHandle, port: u16) { + let mut client_manager = server.client_manager.borrow_mut(); + let Some((c, s)) = client_manager.get_mut(handle) else { + return; + }; + + if c.port != port { + c.port = port; + s.active_addr = s.active_addr.map(|a| SocketAddr::new(a.ip(), port)); + } +} + +async fn update_pos( + server: &Server, + handle: ClientHandle, + capture: &Sender, + emulate: &Sender, + pos: Position, +) { + let (changed, active) = { + let mut client_manager = server.client_manager.borrow_mut(); + let Some((c, s)) = client_manager.get_mut(handle) else { + return; + }; + + let changed = c.pos != pos; + c.pos = pos; + (changed, s.active) + }; // update state in event input emulator & input capture - if changed && active { - // update state - let destroy = ClientEvent::Destroy(handle); + if changed { + if active { + let destroy = ClientEvent::Destroy(handle); + let _ = capture.send(CaptureEvent::ClientEvent(destroy)).await; + let _ = emulate.send(EmulationEvent::ClientEvent(destroy)).await; + } let create = ClientEvent::Create(handle, pos); - let _ = capture.send(CaptureEvent::ClientEvent(destroy)).await; - let _ = emulate.send(EmulationEvent::ClientEvent(destroy)).await; let _ = capture.send(CaptureEvent::ClientEvent(create)).await; let _ = emulate.send(EmulationEvent::ClientEvent(create)).await; } - - let client = server - .client_manager - .borrow() - .get(handle) - .unwrap() - .client - .clone(); - notify_all(frontend, FrontendEvent::Updated(handle, client)).await; +} + +async fn broadcast_client_update( + server: &Server, + frontend: &mut FrontendListener, + handle: ClientHandle, +) { + let (client, _) = server.client_manager.borrow().get(handle).unwrap().clone(); + broadcast(frontend, FrontendEvent::Updated(handle, client)).await; } diff --git a/src/server/ping_task.rs b/src/server/ping_task.rs index e66ccba..5ae55b6 100644 --- a/src/server/ping_task.rs +++ b/src/server/ping_task.rs @@ -34,7 +34,7 @@ pub fn new( // if receiving we care about clients with pressed keys client_manager .get_client_states_mut() - .filter(|(_, s)| !s.pressed_keys.is_empty()) + .filter(|(_, (_, s))| !s.pressed_keys.is_empty()) .map(|(h, _)| h) .collect() } else { @@ -46,17 +46,15 @@ pub fn new( let ping_addrs: Vec = { ping_clients .iter() - .flat_map(|&c| client_manager.get(c)) - .flat_map(|state| { - if state.alive && state.active_addr.is_some() { - vec![state.active_addr.unwrap()] + .flat_map(|&h| client_manager.get(h)) + .flat_map(|(c, s)| { + if s.alive && s.active_addr.is_some() { + vec![s.active_addr.unwrap()] } else { - state - .client - .ips + s.ips .iter() .cloned() - .map(|ip| SocketAddr::new(ip, state.client.port)) + .map(|ip| SocketAddr::new(ip, c.port)) .collect() } }) @@ -64,8 +62,8 @@ pub fn new( }; // reset alive - for (_, state) in client_manager.get_client_states_mut() { - state.alive = false; + for (_, (_, s)) in client_manager.get_client_states_mut() { + s.alive = false; } (ping_clients, ping_addrs) @@ -102,8 +100,8 @@ pub fn new( let client_manager = server.client_manager.borrow(); ping_clients .iter() - .filter_map(|&c| match client_manager.get(c) { - Some(state) if !state.alive => Some(c), + .filter_map(|&h| match client_manager.get(h) { + Some((_, s)) if !s.alive => Some(h), _ => None, }) .collect() @@ -112,9 +110,9 @@ pub fn new( // we may not be receiving anymore but we should respond // to the original state and not the "new" one if receiving { - for c in unresponsive_clients { + for h in unresponsive_clients { log::warn!("device not responding, releasing keys!"); - let _ = emulate_notify.send(EmulationEvent::ReleaseKeys(c)).await; + let _ = emulate_notify.send(EmulationEvent::ReleaseKeys(h)).await; } } else { // release pointer if the active client has not responded diff --git a/src/server/resolver_task.rs b/src/server/resolver_task.rs index 8a0412e..03c026b 100644 --- a/src/server/resolver_task.rs +++ b/src/server/resolver_task.rs @@ -27,12 +27,12 @@ pub fn new(resolver: DnsResolver, server: Server) -> (JoinHandle<()>, Sender