This commit is contained in:
Asura
2022-07-20 19:51:09 -07:00
110 changed files with 4942 additions and 1487 deletions

View File

@@ -8,6 +8,7 @@ use crate::video_service;
use crate::{common::MOBILE_INFO2, mobile::connection_manager::start_channel};
use crate::{ipc, VERSION};
use hbb_common::fs::can_enable_overwrite_detection;
use hbb_common::password_security::password;
use hbb_common::{
config::Config,
fs,
@@ -35,6 +36,7 @@ pub type Sender = mpsc::UnboundedSender<(Instant, Arc<Message>)>;
lazy_static::lazy_static! {
static ref LOGIN_FAILURES: Arc::<Mutex<HashMap<String, (i32, i32, i32)>>> = Default::default();
static ref SESSIONS: Arc::<Mutex<HashMap<String, Session>>> = Default::default();
}
pub static CLICK_TIME: AtomicI64 = AtomicI64::new(0);
pub static MOUSE_MOVE_TIME: AtomicI64 = AtomicI64::new(0);
@@ -53,6 +55,14 @@ enum MessageInput {
BlockOff,
}
#[derive(Clone, Debug)]
struct Session {
name: String,
session_id: u64,
last_recv_time: Arc<Mutex<Instant>>,
random_password: String,
}
pub struct Connection {
inner: ConnInner,
stream: super::Stream,
@@ -80,6 +90,8 @@ pub struct Connection {
video_ack_required: bool,
peer_info: (String, String),
api_server: String,
lr: LoginRequest,
last_recv_time: Arc<Mutex<Instant>>,
}
impl Subscriber for ConnInner {
@@ -91,7 +103,7 @@ impl Subscriber for ConnInner {
#[inline]
fn send(&mut self, msg: Arc<Message>) {
match &msg.union {
Some(message::Union::video_frame(_)) => {
Some(message::Union::VideoFrame(_)) => {
self.tx_video.as_mut().map(|tx| {
allow_err!(tx.send((Instant::now(), msg)));
});
@@ -111,6 +123,7 @@ const H1: Duration = Duration::from_secs(3600);
const MILLI1: Duration = Duration::from_millis(1);
const SEND_TIMEOUT_VIDEO: u64 = 12_000;
const SEND_TIMEOUT_OTHER: u64 = SEND_TIMEOUT_VIDEO * 10;
const SESSION_TIMEOUT: Duration = Duration::from_secs(30);
impl Connection {
pub async fn start(
@@ -164,6 +177,8 @@ impl Connection {
video_ack_required: false,
peer_info: Default::default(),
api_server: "".to_owned(),
lr: Default::default(),
last_recv_time: Arc::new(Mutex::new(Instant::now())),
};
#[cfg(not(any(target_os = "android", target_os = "ios")))]
tokio::spawn(async move {
@@ -222,7 +237,8 @@ impl Connection {
let mut msg_out = Message::new();
msg_out.set_misc(misc);
conn.send(msg_out).await;
conn.on_close("Close requested from connection manager", false);
conn.on_close("Close requested from connection manager", false).await;
SESSIONS.lock().unwrap().remove(&conn.lr.my_id);
break;
}
ipc::Data::ChatMessage{text} => {
@@ -311,11 +327,12 @@ impl Connection {
if let Some(res) = res {
match res {
Err(err) => {
conn.on_close(&err.to_string(), true);
conn.on_close(&err.to_string(), true).await;
break;
},
Ok(bytes) => {
last_recv_time = Instant::now();
*conn.last_recv_time.lock().unwrap() = Instant::now();
if let Ok(msg_in) = Message::parse_from_bytes(&bytes) {
if !conn.on_message(msg_in).await {
break;
@@ -324,14 +341,14 @@ impl Connection {
}
}
} else {
conn.on_close("Reset by the peer", true);
conn.on_close("Reset by the peer", true).await;
break;
}
},
_ = conn.timer.tick() => {
if !conn.read_jobs.is_empty() {
if let Err(err) = fs::handle_read_jobs(&mut conn.read_jobs, &mut conn.stream).await {
conn.on_close(&err.to_string(), false);
conn.on_close(&err.to_string(), false).await;
break;
}
} else {
@@ -344,7 +361,7 @@ impl Connection {
video_service::notify_video_frame_feched(id, Some(instant.into()));
}
if let Err(err) = conn.stream.send(&value as &Message).await {
conn.on_close(&err.to_string(), false);
conn.on_close(&err.to_string(), false).await;
break;
}
},
@@ -354,7 +371,7 @@ impl Connection {
if latency > 1000 {
match &msg.union {
Some(message::Union::audio_frame(_)) => {
Some(message::Union::AudioFrame(_)) => {
// log::info!("audio frame latency {}", instant.elapsed().as_secs_f32());
continue;
}
@@ -362,13 +379,13 @@ impl Connection {
}
}
if let Err(err) = conn.stream.send(msg).await {
conn.on_close(&err.to_string(), false);
conn.on_close(&err.to_string(), false).await;
break;
}
},
_ = test_delay_timer.tick() => {
if last_recv_time.elapsed() >= SEC30 {
conn.on_close("Timeout", true);
conn.on_close("Timeout", true).await;
break;
}
let time = crate::get_time();
@@ -398,8 +415,9 @@ impl Connection {
video_service::notify_video_frame_feched(id, None);
scrap::codec::Encoder::update_video_encoder(id, scrap::codec::EncoderUpdate::Remove);
video_service::VIDEO_QOS.lock().unwrap().reset();
password::after_session(conn.authorized);
if let Err(err) = conn.try_port_forward_loop(&mut rx_from_cm).await {
conn.on_close(&err.to_string(), false);
conn.on_close(&err.to_string(), false).await;
}
conn.post_audit(json!({
@@ -493,7 +511,7 @@ impl Connection {
res = self.stream.next() => {
if let Some(res) = res {
last_recv_time = Instant::now();
timeout(SEND_TIMEOUT_OTHER, forward.send(res?.into())).await??;
timeout(SEND_TIMEOUT_OTHER, forward.send(res?)).await??;
} else {
bail!("Stream reset by the peer");
}
@@ -572,7 +590,7 @@ impl Connection {
let url = self.api_server.clone();
let mut v = v;
v["id"] = json!(Config::get_id());
v["uuid"] = json!(base64::encode(crate::get_uuid()));
v["uuid"] = json!(base64::encode(hbb_common::get_uuid()));
v["Id"] = json!(self.inner.id);
tokio::spawn(async move {
allow_err!(Self::post_audit_async(url, v).await);
@@ -629,9 +647,9 @@ impl Connection {
#[cfg(target_os = "linux")]
if !self.file_transfer.is_some() && !self.port_forward_socket.is_some() {
let dtype = crate::platform::linux::get_display_server();
if dtype != "x11" {
if dtype != "x11" && dtype != "wayland" {
res.set_error(format!(
"Unsupported display server type {}, x11 expected",
"Unsupported display server type {}, x11 or wayland expected",
dtype
));
let mut msg_out = Message::new();
@@ -667,7 +685,7 @@ impl Connection {
res.set_peer_info(pi);
} else {
try_activate_screen();
match video_service::get_displays() {
match super::video_service::get_displays().await {
Err(err) => {
res.set_error(format!("X11 error: {}", err));
}
@@ -779,8 +797,77 @@ impl Connection {
self.tx_input.send(MessageInput::Key((msg, press))).ok();
}
fn validate_one_password(&self, password: String) -> bool {
if password.len() == 0 {
return false;
}
let mut hasher = Sha256::new();
hasher.update(password);
hasher.update(&self.hash.salt);
let mut hasher2 = Sha256::new();
hasher2.update(&hasher.finalize()[..]);
hasher2.update(&self.hash.challenge);
hasher2.finalize()[..] == self.lr.password[..]
}
fn validate_password(&mut self) -> bool {
if password::security_enabled() {
if self.validate_one_password(Config::get_security_password()) {
return true;
}
}
if password::random_password_valid() {
let password = password::random_password();
if self.validate_one_password(password.clone()) {
if password::onetime_password_activated() {
password::set_onetime_password_activated(false);
}
SESSIONS.lock().unwrap().insert(
self.lr.my_id.clone(),
Session {
name: self.lr.my_name.clone(),
session_id: self.lr.session_id,
last_recv_time: self.last_recv_time.clone(),
random_password: password,
},
);
return true;
}
}
false
}
fn is_of_recent_session(&mut self) -> bool {
let session = SESSIONS
.lock()
.unwrap()
.get(&self.lr.my_id)
.map(|s| s.to_owned());
if let Some(session) = session {
if session.name == self.lr.my_name
&& session.session_id == self.lr.session_id
&& !self.lr.password.is_empty()
&& self.validate_one_password(session.random_password.clone())
&& session.last_recv_time.lock().unwrap().elapsed() < SESSION_TIMEOUT
{
SESSIONS.lock().unwrap().insert(
self.lr.my_id.clone(),
Session {
name: self.lr.my_name.clone(),
session_id: self.lr.session_id,
last_recv_time: self.last_recv_time.clone(),
random_password: session.random_password,
},
);
return true;
}
}
false
}
async fn on_message(&mut self, msg: Message) -> bool {
if let Some(message::Union::login_request(lr)) = msg.union {
if let Some(message::Union::LoginRequest(lr)) = msg.union {
self.lr = lr.clone();
if let Some(o) = lr.option.as_ref() {
self.update_option(o).await;
if let Some(q) = o.video_codec_state.clone().take() {
@@ -805,7 +892,7 @@ impl Connection {
return true;
}
match lr.union {
Some(login_request::Union::file_transfer(ft)) => {
Some(login_request::Union::FileTransfer(ft)) => {
if !Config::get_option("enable-file-transfer").is_empty() {
self.send_login_error("No permission of file transfer")
.await;
@@ -814,7 +901,7 @@ impl Connection {
}
self.file_transfer = Some((ft.dir, ft.show_hidden));
}
Some(login_request::Union::port_forward(mut pf)) => {
Some(login_request::Union::PortForward(mut pf)) => {
if !Config::get_option("enable-tunnel").is_empty() {
self.send_login_error("No permission of IP tunneling").await;
sleep(1.).await;
@@ -851,15 +938,19 @@ impl Connection {
}
if !crate::is_ip(&lr.username) && lr.username != Config::get_id() {
self.send_login_error("Offline").await;
} else if self.is_of_recent_session() {
self.try_start_cm(lr.my_id, lr.my_name, true);
self.send_logon_response().await;
if self.port_forward_socket.is_some() {
return false;
}
} else if lr.password.is_empty() {
self.try_start_cm(lr.my_id, lr.my_name, false);
} else {
let mut hasher = Sha256::new();
hasher.update(&Config::get_password());
hasher.update(&self.hash.salt);
let mut hasher2 = Sha256::new();
hasher2.update(&hasher.finalize()[..]);
hasher2.update(&self.hash.challenge);
if password::passwords().len() == 0 {
self.send_login_error("Connection not allowed").await;
return false;
}
let mut failure = LOGIN_FAILURES
.lock()
.unwrap()
@@ -872,7 +963,7 @@ impl Connection {
.await;
} else if time == failure.0 && failure.1 > 6 {
self.send_login_error("Please try 1 minute later").await;
} else if hasher2.finalize()[..] != lr.password[..] {
} else if !self.validate_password() {
if failure.0 == time {
failure.1 += 1;
failure.2 += 1;
@@ -898,7 +989,7 @@ impl Connection {
}
}
}
} else if let Some(message::Union::test_delay(t)) = msg.union {
} else if let Some(message::Union::TestDelay(t)) = msg.union {
if t.from_client {
let mut msg_out = Message::new();
msg_out.set_test_delay(t);
@@ -913,7 +1004,7 @@ impl Connection {
}
} else if self.authorized {
match msg.union {
Some(message::Union::mouse_event(me)) => {
Some(message::Union::MouseEvent(me)) => {
#[cfg(any(target_os = "android", target_os = "ios"))]
if let Err(e) = call_main_service_mouse_input(me.mask, me.x, me.y) {
log::debug!("call_main_service_mouse_input fail:{}", e);
@@ -928,7 +1019,7 @@ impl Connection {
self.input_mouse(me, self.inner.id());
}
}
Some(message::Union::key_event(me)) => {
Some(message::Union::KeyEvent(me)) => {
#[cfg(not(any(target_os = "android", target_os = "ios")))]
if self.keyboard {
if is_enter(&me) {
@@ -944,8 +1035,8 @@ impl Connection {
};
if is_press {
match me.union {
Some(key_event::Union::unicode(_))
| Some(key_event::Union::seq(_)) => {
Some(key_event::Union::Unicode(_))
| Some(key_event::Union::Seq(_)) => {
self.input_key(me, false);
}
_ => {
@@ -957,14 +1048,14 @@ impl Connection {
}
}
}
Some(message::Union::clipboard(cb)) =>
Some(message::Union::Clipboard(cb)) =>
{
#[cfg(not(any(target_os = "android", target_os = "ios")))]
if self.clipboard {
update_clipboard(cb, None);
}
}
Some(message::Union::cliprdr(_clip)) => {
Some(message::Union::Cliprdr(_clip)) => {
if self.file_transfer_enabled() {
#[cfg(windows)]
if let Some(clip) = msg_2_clip(_clip) {
@@ -972,13 +1063,13 @@ impl Connection {
}
}
}
Some(message::Union::file_action(fa)) => {
Some(message::Union::FileAction(fa)) => {
if self.file_transfer.is_some() {
match fa.union {
Some(file_action::Union::read_dir(rd)) => {
Some(file_action::Union::ReadDir(rd)) => {
self.read_dir(&rd.path, rd.include_hidden);
}
Some(file_action::Union::all_files(f)) => {
Some(file_action::Union::AllFiles(f)) => {
match fs::get_recursive_files(&f.path, f.include_hidden) {
Err(err) => {
self.send(fs::new_error(f.id, err, -1)).await;
@@ -988,7 +1079,7 @@ impl Connection {
}
}
}
Some(file_action::Union::send(s)) => {
Some(file_action::Union::Send(s)) => {
let id = s.id;
let od =
can_enable_overwrite_detection(get_version_number(VERSION));
@@ -1013,7 +1104,7 @@ impl Connection {
}
}
}
Some(file_action::Union::receive(r)) => {
Some(file_action::Union::Receive(r)) => {
self.send_fs(ipc::FS::NewWrite {
path: r.path,
id: r.id,
@@ -1026,31 +1117,31 @@ impl Connection {
.collect(),
});
}
Some(file_action::Union::remove_dir(d)) => {
Some(file_action::Union::RemoveDir(d)) => {
self.send_fs(ipc::FS::RemoveDir {
path: d.path,
id: d.id,
recursive: d.recursive,
});
}
Some(file_action::Union::remove_file(f)) => {
Some(file_action::Union::RemoveFile(f)) => {
self.send_fs(ipc::FS::RemoveFile {
path: f.path,
id: f.id,
file_num: f.file_num,
});
}
Some(file_action::Union::create(c)) => {
Some(file_action::Union::Create(c)) => {
self.send_fs(ipc::FS::CreateDir {
path: c.path,
id: c.id,
});
}
Some(file_action::Union::cancel(c)) => {
Some(file_action::Union::Cancel(c)) => {
self.send_fs(ipc::FS::CancelWrite { id: c.id });
fs::remove_job(c.id, &mut self.read_jobs);
}
Some(file_action::Union::send_confirm(r)) => {
Some(file_action::Union::SendConfirm(r)) => {
if let Some(job) = fs::get_job(r.id, &mut self.read_jobs) {
job.confirm(&r);
}
@@ -1059,8 +1150,8 @@ impl Connection {
}
}
}
Some(message::Union::file_response(fr)) => match fr.union {
Some(file_response::Union::block(block)) => {
Some(message::Union::FileResponse(fr)) => match fr.union {
Some(file_response::Union::Block(block)) => {
self.send_fs(ipc::FS::WriteBlock {
id: block.id,
file_num: block.file_num,
@@ -1068,13 +1159,13 @@ impl Connection {
compressed: block.compressed,
});
}
Some(file_response::Union::done(d)) => {
Some(file_response::Union::Done(d)) => {
self.send_fs(ipc::FS::WriteDone {
id: d.id,
file_num: d.file_num,
});
}
Some(file_response::Union::digest(d)) => self.send_fs(ipc::FS::CheckDigest {
Some(file_response::Union::Digest(d)) => self.send_fs(ipc::FS::CheckDigest {
id: d.id,
file_num: d.file_num,
file_size: d.file_size,
@@ -1083,27 +1174,32 @@ impl Connection {
}),
_ => {}
},
Some(message::Union::misc(misc)) => match misc.union {
Some(misc::Union::switch_display(s)) => {
video_service::switch_display(s.display);
Some(message::Union::Misc(misc)) => match misc.union {
Some(misc::Union::SwitchDisplay(s)) => {
video_service::switch_display(s.display).await;
}
Some(misc::Union::chat_message(c)) => {
Some(misc::Union::ChatMessage(c)) => {
self.send_to_cm(ipc::Data::ChatMessage { text: c.text });
}
Some(misc::Union::option(o)) => {
Some(misc::Union::Option(o)) => {
self.update_option(&o).await;
}
Some(misc::Union::refresh_video(r)) => {
Some(misc::Union::RefreshVideo(r)) => {
if r {
video_service::refresh();
super::video_service::refresh();
}
}
Some(misc::Union::video_received(_)) => {
Some(misc::Union::VideoReceived(_)) => {
video_service::notify_video_frame_feched(
self.inner.id,
Some(Instant::now().into()),
);
}
Some(misc::Union::CloseReason(_)) => {
self.on_close("Peer close", true).await;
SESSIONS.lock().unwrap().remove(&self.lr.my_id);
return false;
}
_ => {}
},
_ => {}
@@ -1258,14 +1354,14 @@ impl Connection {
}
}
fn on_close(&mut self, reason: &str, lock: bool) {
async fn on_close(&mut self, reason: &str, lock: bool) {
if let Some(s) = self.server.upgrade() {
s.write().unwrap().remove_connection(&self.inner);
}
log::info!("#{} Connection closed: {}", self.inner.id(), reason);
if lock && self.lock_after_session_end && self.keyboard {
#[cfg(not(any(target_os = "android", target_os = "ios")))]
lock_screen();
lock_screen().await;
}
self.tx_to_cm.send(ipc::Data::Close).ok();
self.port_forward_socket.take();