mirror of
https://github.com/rustdesk/rustdesk.git
synced 2026-04-06 18:51:29 +03:00
portable-service: exchange ipc server/client
Signed-off-by: 21pages <pages21@163.com>
This commit is contained in:
@@ -7,7 +7,6 @@ use hbb_common::{
|
||||
log,
|
||||
message_proto::{KeyEvent, MouseEvent},
|
||||
protobuf::Message,
|
||||
sleep,
|
||||
tokio::{self, sync::mpsc},
|
||||
ResultType,
|
||||
};
|
||||
@@ -17,7 +16,7 @@ use std::{
|
||||
mem::size_of,
|
||||
ops::{Deref, DerefMut},
|
||||
sync::{Arc, Mutex},
|
||||
time::Duration,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use winapi::{
|
||||
shared::minwindef::{BOOL, FALSE, TRUE},
|
||||
@@ -48,18 +47,6 @@ const ADDR_CAPTURE_FRAME: usize =
|
||||
const IPC_PROFIX: &str = "_portable_service";
|
||||
pub const SHMEM_NAME: &str = "_portable_service";
|
||||
const MAX_NACK: usize = 3;
|
||||
const IPC_CONN_TIMEOUT: Duration = Duration::from_secs(3);
|
||||
|
||||
pub enum PortableServiceStatus {
|
||||
NonStart,
|
||||
Running,
|
||||
}
|
||||
|
||||
impl Default for PortableServiceStatus {
|
||||
fn default() -> Self {
|
||||
Self::NonStart
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SharedMemory {
|
||||
inner: Shmem,
|
||||
@@ -200,8 +187,6 @@ mod utils {
|
||||
|
||||
// functions called in seperate SYSTEM user process.
|
||||
pub mod server {
|
||||
use hbb_common::tokio::time::Instant;
|
||||
|
||||
use super::*;
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
@@ -220,7 +205,7 @@ pub mod server {
|
||||
run_capture(shmem2);
|
||||
}));
|
||||
threads.push(std::thread::spawn(|| {
|
||||
run_ipc_server();
|
||||
run_ipc_client();
|
||||
}));
|
||||
threads.push(std::thread::spawn(|| {
|
||||
run_exit_check();
|
||||
@@ -270,7 +255,7 @@ pub mod server {
|
||||
if EXIT.lock().unwrap().clone() {
|
||||
break;
|
||||
}
|
||||
let start = std::time::Instant::now();
|
||||
let start = Instant::now();
|
||||
unsafe {
|
||||
let para_ptr = shmem.as_ptr().add(ADDR_CAPTURER_PARA);
|
||||
let para = para_ptr as *const CapturerPara;
|
||||
@@ -278,7 +263,6 @@ pub mod server {
|
||||
let use_yuv = (*para).use_yuv;
|
||||
let timeout_ms = (*para).timeout_ms;
|
||||
if c.is_none() {
|
||||
let use_yuv = true;
|
||||
*crate::video_service::CURRENT_DISPLAY.lock().unwrap() = current_display;
|
||||
let (_, _current, display) = get_current_display().unwrap();
|
||||
match Capturer::new(display, use_yuv) {
|
||||
@@ -348,114 +332,78 @@ pub mod server {
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn run_ipc_server() {
|
||||
async fn run_ipc_client() {
|
||||
use DataPortableService::*;
|
||||
|
||||
let postfix = IPC_PROFIX;
|
||||
let last_recv_time = Arc::new(Mutex::new(Instant::now()));
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(1));
|
||||
|
||||
match new_listener(postfix).await {
|
||||
Ok(mut incoming) => loop {
|
||||
tokio::select! {
|
||||
Some(result) = incoming.next() => {
|
||||
match result {
|
||||
Ok(stream) => {
|
||||
log::info!("Got new connection");
|
||||
let last_recv_time_cloned = last_recv_time.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut stream = Connection::new(stream);
|
||||
let postfix = postfix.to_owned();
|
||||
let mut timer = tokio::time::interval(Duration::from_secs(1));
|
||||
let mut nack = 0;
|
||||
let mut old_conn_count = 0;
|
||||
loop {
|
||||
tokio::select! {
|
||||
res = stream.next() => {
|
||||
if res.is_ok() {
|
||||
*last_recv_time_cloned.lock().unwrap() = Instant::now();
|
||||
}
|
||||
match res {
|
||||
Err(err) => {
|
||||
log::error!(
|
||||
"ipc{} connection closed: {}",
|
||||
postfix,
|
||||
err
|
||||
);
|
||||
*EXIT.lock().unwrap() = true;
|
||||
break;
|
||||
}
|
||||
Ok(Some(Data::DataPortableService(data))) => match data {
|
||||
Ping => {
|
||||
allow_err!(
|
||||
stream
|
||||
.send(&Data::DataPortableService(Pong))
|
||||
.await
|
||||
);
|
||||
}
|
||||
Pong => {
|
||||
nack = 0;
|
||||
}
|
||||
ConnCount(Some(n)) => {
|
||||
if old_conn_count != 0 && n == 0 {
|
||||
log::info!("Connection count decrease to 0, exit");
|
||||
stream.send(&Data::DataPortableService(WillClose)).await.ok();
|
||||
*EXIT.lock().unwrap() = true;
|
||||
break;
|
||||
}
|
||||
old_conn_count = n;
|
||||
}
|
||||
Mouse(v) => {
|
||||
if let Ok(evt) = MouseEvent::parse_from_bytes(&v) {
|
||||
crate::input_service::handle_mouse_(&evt);
|
||||
}
|
||||
}
|
||||
Key(v) => {
|
||||
if let Ok(evt) = KeyEvent::parse_from_bytes(&v) {
|
||||
crate::input_service::handle_key_(&evt);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
_ = timer.tick() => {
|
||||
nack+=1;
|
||||
if nack > MAX_NACK {
|
||||
log::info!("max ping nack, exit");
|
||||
*EXIT.lock().unwrap() = true;
|
||||
break;
|
||||
}
|
||||
stream.send(&Data::DataPortableService(Ping)).await.ok();
|
||||
stream.send(&Data::DataPortableService(ConnCount(None))).await.ok();
|
||||
}
|
||||
match ipc::connect(1000, postfix).await {
|
||||
Ok(mut stream) => {
|
||||
let mut timer = tokio::time::interval(Duration::from_secs(1));
|
||||
let mut nack = 0;
|
||||
loop {
|
||||
tokio::select! {
|
||||
res = stream.next() => {
|
||||
match res {
|
||||
Err(err) => {
|
||||
log::error!(
|
||||
"ipc{} connection closed: {}",
|
||||
postfix,
|
||||
err
|
||||
);
|
||||
break;
|
||||
}
|
||||
Ok(Some(Data::DataPortableService(data))) => match data {
|
||||
Ping => {
|
||||
allow_err!(
|
||||
stream
|
||||
.send(&Data::DataPortableService(Pong))
|
||||
.await
|
||||
);
|
||||
}
|
||||
Pong => {
|
||||
nack = 0;
|
||||
}
|
||||
ConnCount(Some(n)) => {
|
||||
if n == 0 {
|
||||
log::info!("Connnection count equals 0, exit");
|
||||
stream.send(&Data::DataPortableService(WillClose)).await.ok();
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(err) => {
|
||||
log::error!("Couldn't get portable client: {:?}", err);
|
||||
*EXIT.lock().unwrap() = true;
|
||||
Mouse(v) => {
|
||||
if let Ok(evt) = MouseEvent::parse_from_bytes(&v) {
|
||||
crate::input_service::handle_mouse_(&evt);
|
||||
}
|
||||
}
|
||||
Key(v) => {
|
||||
if let Ok(evt) = KeyEvent::parse_from_bytes(&v) {
|
||||
crate::input_service::handle_key_(&evt);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = interval.tick() => {
|
||||
if last_recv_time.lock().unwrap().elapsed() > IPC_CONN_TIMEOUT {
|
||||
log::error!("receive data timeout");
|
||||
*EXIT.lock().unwrap() = true;
|
||||
}
|
||||
if EXIT.lock().unwrap().clone() {
|
||||
break;
|
||||
_ = timer.tick() => {
|
||||
nack+=1;
|
||||
if nack > MAX_NACK {
|
||||
log::info!("max ping nack, exit");
|
||||
break;
|
||||
}
|
||||
stream.send(&Data::DataPortableService(Ping)).await.ok();
|
||||
stream.send(&Data::DataPortableService(ConnCount(None))).await.ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
log::error!("Failed to start cm ipc server: {}", err);
|
||||
*EXIT.lock().unwrap() = true;
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Failed to connect portable service ipc:{:?}", e);
|
||||
}
|
||||
}
|
||||
|
||||
*EXIT.lock().unwrap() = true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -466,54 +414,46 @@ pub mod client {
|
||||
use super::*;
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref SHMEM: Arc<Mutex<Option<SharedMemory>>> = Default::default();
|
||||
pub static ref PORTABLE_SERVICE_STATUS: Arc<Mutex<PortableServiceStatus>> = Default::default();
|
||||
static ref SENDER : Mutex<mpsc::UnboundedSender<ipc::Data>> = Mutex::new(client::start_ipc_client());
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum PortableServiceStatus {
|
||||
NotStarted,
|
||||
Starting,
|
||||
Running,
|
||||
}
|
||||
|
||||
impl Default for PortableServiceStatus {
|
||||
fn default() -> Self {
|
||||
Self::NotStarted
|
||||
}
|
||||
pub static ref PORTABLE_SERVICE_RUNNING: Arc<Mutex<bool>> = Default::default();
|
||||
static ref SHMEM: Arc<Mutex<Option<SharedMemory>>> = Default::default();
|
||||
static ref SENDER : Mutex<mpsc::UnboundedSender<ipc::Data>> = Mutex::new(client::start_ipc_server());
|
||||
}
|
||||
|
||||
pub(crate) fn start_portable_service() -> ResultType<()> {
|
||||
if PORTABLE_SERVICE_STATUS.lock().unwrap().clone() == PortableServiceStatus::NotStarted {
|
||||
if SHMEM.lock().unwrap().is_none() {
|
||||
let displays = scrap::Display::all()?;
|
||||
if displays.is_empty() {
|
||||
bail!("no display available!");
|
||||
}
|
||||
let mut max_pixel = 0;
|
||||
let align = 64;
|
||||
for d in displays {
|
||||
let pixel = utils::align(d.width(), align) * utils::align(d.height(), align);
|
||||
if max_pixel < pixel {
|
||||
max_pixel = pixel;
|
||||
}
|
||||
}
|
||||
let shmem_size = utils::align(ADDR_CAPTURE_FRAME + max_pixel * 4, align);
|
||||
// os error 112, no enough space
|
||||
*SHMEM.lock().unwrap() = Some(crate::portable_service::SharedMemory::create(
|
||||
crate::portable_service::SHMEM_NAME,
|
||||
shmem_size,
|
||||
)?);
|
||||
shutdown_hooks::add_shutdown_hook(drop_shmem);
|
||||
}
|
||||
if crate::common::run_me(vec!["--portable-service"]).is_err() {
|
||||
*SHMEM.lock().unwrap() = None;
|
||||
bail!("Failed to run portable service process");
|
||||
}
|
||||
*PORTABLE_SERVICE_STATUS.lock().unwrap() = PortableServiceStatus::Starting;
|
||||
let _sender = SENDER.lock().unwrap();
|
||||
if PORTABLE_SERVICE_RUNNING.lock().unwrap().clone() {
|
||||
bail!("already running");
|
||||
}
|
||||
if SHMEM.lock().unwrap().is_none() {
|
||||
let displays = scrap::Display::all()?;
|
||||
if displays.is_empty() {
|
||||
bail!("no display available!");
|
||||
}
|
||||
let mut max_pixel = 0;
|
||||
let align = 64;
|
||||
for d in displays {
|
||||
let pixel = utils::align(d.width(), align) * utils::align(d.height(), align);
|
||||
if max_pixel < pixel {
|
||||
max_pixel = pixel;
|
||||
}
|
||||
}
|
||||
let shmem_size = utils::align(ADDR_CAPTURE_FRAME + max_pixel * 4, align);
|
||||
// os error 112, no enough space
|
||||
*SHMEM.lock().unwrap() = Some(crate::portable_service::SharedMemory::create(
|
||||
crate::portable_service::SHMEM_NAME,
|
||||
shmem_size,
|
||||
)?);
|
||||
shutdown_hooks::add_shutdown_hook(drop_shmem);
|
||||
}
|
||||
let mut option = SHMEM.lock().unwrap();
|
||||
let shmem = option.as_mut().unwrap();
|
||||
unsafe {
|
||||
libc::memset(shmem.as_ptr() as _, 0, shmem.len() as _);
|
||||
}
|
||||
if crate::common::run_me(vec!["--portable-service"]).is_err() {
|
||||
*SHMEM.lock().unwrap() = None;
|
||||
bail!("Failed to run portable service process");
|
||||
}
|
||||
let _sender = SENDER.lock().unwrap();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -613,94 +553,98 @@ pub mod client {
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn start_ipc_client() -> mpsc::UnboundedSender<Data> {
|
||||
pub(super) fn start_ipc_server() -> mpsc::UnboundedSender<Data> {
|
||||
let (tx, rx) = mpsc::unbounded_channel::<Data>();
|
||||
std::thread::spawn(move || start_ipc_client_async(rx));
|
||||
std::thread::spawn(move || start_ipc_server_async(rx));
|
||||
tx
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn start_ipc_client_async(rx: mpsc::UnboundedReceiver<Data>) {
|
||||
async fn start_ipc_server_async(rx: mpsc::UnboundedReceiver<Data>) {
|
||||
use DataPortableService::*;
|
||||
let mut rx = rx;
|
||||
let mut connect_failed = 0;
|
||||
loop {
|
||||
if PORTABLE_SERVICE_STATUS.lock().unwrap().clone() == PortableServiceStatus::NotStarted
|
||||
{
|
||||
sleep(1.).await;
|
||||
continue;
|
||||
}
|
||||
if let Ok(mut c) = ipc::connect(1000, IPC_PROFIX).await {
|
||||
let mut nack = 0;
|
||||
let mut timer = tokio::time::interval(Duration::from_secs(1));
|
||||
loop {
|
||||
tokio::select! {
|
||||
res = c.next() => {
|
||||
match res {
|
||||
Err(err) => {
|
||||
log::error!("ipc connection closed: {}", err);
|
||||
break;
|
||||
}
|
||||
Ok(Some(Data::DataPortableService(data))) => {
|
||||
match data {
|
||||
Ping => {
|
||||
c.send(&Data::DataPortableService(Pong)).await.ok();
|
||||
}
|
||||
Pong => {
|
||||
nack = 0;
|
||||
*PORTABLE_SERVICE_STATUS.lock().unwrap() = PortableServiceStatus::Running;
|
||||
},
|
||||
ConnCount(None) => {
|
||||
let cnt = crate::server::CONN_COUNT.lock().unwrap().clone();
|
||||
c.send(&Data::DataPortableService(ConnCount(Some(cnt)))).await.ok();
|
||||
},
|
||||
WillClose => {
|
||||
log::info!("portable service will close, set status to not started");
|
||||
*PORTABLE_SERVICE_STATUS.lock().unwrap() = PortableServiceStatus::NotStarted;
|
||||
break;
|
||||
}
|
||||
_=>{}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
_ = timer.tick() => {
|
||||
nack+=1;
|
||||
if nack > MAX_NACK {
|
||||
// In fact, this will not happen, ipc will be closed before max nack.
|
||||
log::error!("max ipc nack, set status to not started");
|
||||
*PORTABLE_SERVICE_STATUS.lock().unwrap() = PortableServiceStatus::NotStarted;
|
||||
break;
|
||||
}
|
||||
c.send(&Data::DataPortableService(Ping)).await.ok();
|
||||
}
|
||||
Some(data) = rx.recv() => {
|
||||
allow_err!(c.send(&data).await);
|
||||
}
|
||||
let rx = Arc::new(tokio::sync::Mutex::new(rx));
|
||||
let postfix = IPC_PROFIX;
|
||||
|
||||
match new_listener(postfix).await {
|
||||
Ok(mut incoming) => loop {
|
||||
{
|
||||
tokio::select! {
|
||||
Some(result) = incoming.next() => {
|
||||
match result {
|
||||
Ok(stream) => {
|
||||
log::info!("Got portable service ipc connection");
|
||||
let rx_clone = rx.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut stream = Connection::new(stream);
|
||||
let postfix = postfix.to_owned();
|
||||
let mut timer = tokio::time::interval(Duration::from_secs(1));
|
||||
let mut nack = 0;
|
||||
let mut rx = rx_clone.lock().await;
|
||||
loop {
|
||||
tokio::select! {
|
||||
res = stream.next() => {
|
||||
match res {
|
||||
Err(err) => {
|
||||
log::info!(
|
||||
"ipc{} connection closed: {}",
|
||||
postfix,
|
||||
err
|
||||
);
|
||||
break;
|
||||
}
|
||||
Ok(Some(Data::DataPortableService(data))) => match data {
|
||||
Ping => {
|
||||
stream.send(&Data::DataPortableService(Pong)).await.ok();
|
||||
}
|
||||
Pong => {
|
||||
nack = 0;
|
||||
*PORTABLE_SERVICE_RUNNING.lock().unwrap() = true;
|
||||
},
|
||||
ConnCount(None) => {
|
||||
let cnt = crate::server::CONN_COUNT.lock().unwrap().clone();
|
||||
stream.send(&Data::DataPortableService(ConnCount(Some(cnt)))).await.ok();
|
||||
},
|
||||
WillClose => {
|
||||
log::info!("portable service will close");
|
||||
break;
|
||||
}
|
||||
_=>{}
|
||||
}
|
||||
_=>{}
|
||||
}
|
||||
}
|
||||
_ = timer.tick() => {
|
||||
nack+=1;
|
||||
if nack > MAX_NACK {
|
||||
// In fact, this will not happen, ipc will be closed before max nack.
|
||||
log::error!("max ipc nack");
|
||||
break;
|
||||
}
|
||||
stream.send(&Data::DataPortableService(Ping)).await.ok();
|
||||
}
|
||||
Some(data) = rx.recv() => {
|
||||
allow_err!(stream.send(&data).await);
|
||||
}
|
||||
}
|
||||
}
|
||||
*PORTABLE_SERVICE_RUNNING.lock().unwrap() = false;
|
||||
});
|
||||
}
|
||||
Err(err) => {
|
||||
log::error!("Couldn't get portable client: {:?}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
connect_failed += 1;
|
||||
if connect_failed > IPC_CONN_TIMEOUT.as_secs() {
|
||||
connect_failed = 0;
|
||||
*PORTABLE_SERVICE_STATUS.lock().unwrap() = PortableServiceStatus::NotStarted;
|
||||
log::info!(
|
||||
"connect failed {} times, set status to not started",
|
||||
connect_failed
|
||||
);
|
||||
}
|
||||
log::info!(
|
||||
"client ip connect failed, status:{:?}",
|
||||
PORTABLE_SERVICE_STATUS.lock().unwrap().clone(),
|
||||
);
|
||||
},
|
||||
Err(err) => {
|
||||
log::error!("Failed to start portable service ipc server: {}", err);
|
||||
}
|
||||
sleep(1.).await;
|
||||
}
|
||||
}
|
||||
|
||||
fn client_ipc_send(data: Data) -> ResultType<()> {
|
||||
fn ipc_send(data: Data) -> ResultType<()> {
|
||||
let sender = SENDER.lock().unwrap();
|
||||
sender
|
||||
.send(data)
|
||||
@@ -721,21 +665,25 @@ pub mod client {
|
||||
fn handle_mouse_(evt: &MouseEvent) -> ResultType<()> {
|
||||
let mut v = vec![];
|
||||
evt.write_to_vec(&mut v)?;
|
||||
client_ipc_send(Data::DataPortableService(DataPortableService::Mouse(v)))
|
||||
ipc_send(Data::DataPortableService(DataPortableService::Mouse(v)))
|
||||
}
|
||||
|
||||
fn handle_key_(evt: &KeyEvent) -> ResultType<()> {
|
||||
let mut v = vec![];
|
||||
evt.write_to_vec(&mut v)?;
|
||||
client_ipc_send(Data::DataPortableService(DataPortableService::Key(v)))
|
||||
ipc_send(Data::DataPortableService(DataPortableService::Key(v)))
|
||||
}
|
||||
|
||||
pub fn create_capturer(
|
||||
current_display: usize,
|
||||
display: scrap::Display,
|
||||
use_yuv: bool,
|
||||
portable_service_running: bool,
|
||||
) -> ResultType<Box<dyn TraitCapturer>> {
|
||||
if PORTABLE_SERVICE_STATUS.lock().unwrap().clone() == PortableServiceStatus::Running {
|
||||
if portable_service_running != PORTABLE_SERVICE_RUNNING.lock().unwrap().clone() {
|
||||
log::info!("portable service status mismatch");
|
||||
}
|
||||
if portable_service_running {
|
||||
log::info!("Create shared memeory capturer");
|
||||
return Ok(Box::new(CapturerPortable::new(current_display, use_yuv)));
|
||||
} else {
|
||||
@@ -747,7 +695,7 @@ pub mod client {
|
||||
}
|
||||
|
||||
pub fn get_cursor_info(pci: PCURSORINFO) -> BOOL {
|
||||
if PORTABLE_SERVICE_STATUS.lock().unwrap().clone() == PortableServiceStatus::Running {
|
||||
if PORTABLE_SERVICE_RUNNING.lock().unwrap().clone() {
|
||||
get_cursor_info_(&mut SHMEM.lock().unwrap().as_mut().unwrap(), pci)
|
||||
} else {
|
||||
unsafe { winuser::GetCursorInfo(pci) }
|
||||
@@ -755,7 +703,7 @@ pub mod client {
|
||||
}
|
||||
|
||||
pub fn handle_mouse(evt: &MouseEvent) {
|
||||
if PORTABLE_SERVICE_STATUS.lock().unwrap().clone() == PortableServiceStatus::Running {
|
||||
if PORTABLE_SERVICE_RUNNING.lock().unwrap().clone() {
|
||||
handle_mouse_(evt).ok();
|
||||
} else {
|
||||
crate::input_service::handle_mouse_(evt);
|
||||
@@ -763,7 +711,7 @@ pub mod client {
|
||||
}
|
||||
|
||||
pub fn handle_key(evt: &KeyEvent) {
|
||||
if PORTABLE_SERVICE_STATUS.lock().unwrap().clone() == PortableServiceStatus::Running {
|
||||
if PORTABLE_SERVICE_RUNNING.lock().unwrap().clone() {
|
||||
handle_key_(evt).ok();
|
||||
} else {
|
||||
crate::input_service::handle_key_(evt);
|
||||
|
||||
Reference in New Issue
Block a user