enc punch

This commit is contained in:
rustdesk
2023-05-14 18:17:02 +08:00
parent a26de5a86a
commit baecf3edb8
9 changed files with 308 additions and 214 deletions

View File

@@ -13,7 +13,7 @@ use hbb_common::tcp::FramedStream;
use hbb_common::{
allow_err,
anyhow::bail,
config::{Config, REG_INTERVAL, RENDEZVOUS_PORT, RENDEZVOUS_TIMEOUT},
config::{Config, CONNECT_TIMEOUT, READ_TIMEOUT, REG_INTERVAL, RENDEZVOUS_PORT},
futures::future::join_all,
log,
protobuf::Message as _,
@@ -120,7 +120,7 @@ impl RendezvousMediator {
})
.unwrap_or(host.to_owned());
let host = crate::check_port(&host, RENDEZVOUS_PORT);
let (mut socket, addr) = socket_client::new_udp_for(&host, RENDEZVOUS_TIMEOUT).await?;
let (mut socket, addr) = socket_client::new_udp_for(&host, CONNECT_TIMEOUT).await?;
let mut rz = Self {
addr: addr,
host: host.clone(),
@@ -307,7 +307,7 @@ impl RendezvousMediator {
secure,
);
let mut socket = socket_client::connect_tcp(&*self.host, RENDEZVOUS_TIMEOUT).await?;
let mut socket = socket_client::connect_tcp(&*self.host, CONNECT_TIMEOUT).await?;
let mut msg_out = Message::new();
let mut rr = RelayResponse {
@@ -352,7 +352,7 @@ impl RendezvousMediator {
}
let peer_addr = AddrMangle::decode(&fla.socket_addr);
log::debug!("Handle intranet from {:?}", peer_addr);
let mut socket = socket_client::connect_tcp(&*self.host, RENDEZVOUS_TIMEOUT).await?;
let mut socket = socket_client::connect_tcp(&*self.host, CONNECT_TIMEOUT).await?;
let local_addr = socket.local_addr();
let local_addr: SocketAddr =
format!("{}:{}", local_addr.ip(), local_addr.port()).parse()?;
@@ -391,7 +391,7 @@ impl RendezvousMediator {
let peer_addr = AddrMangle::decode(&ph.socket_addr);
log::debug!("Punch hole to {:?}", peer_addr);
let mut socket = {
let socket = socket_client::connect_tcp(&*self.host, RENDEZVOUS_TIMEOUT).await?;
let socket = socket_client::connect_tcp(&*self.host, CONNECT_TIMEOUT).await?;
let local_addr = socket.local_addr();
// key important here for punch hole to tell my gateway incoming peer is safe.
// it can not be async here, because local_addr can not be reused, we must close the connection before use it again.
@@ -649,7 +649,8 @@ pub async fn query_online_states<F: FnOnce(Vec<String>, Vec<String>)>(ids: Vec<S
}
async fn create_online_stream() -> ResultType<FramedStream> {
let (rendezvous_server, _servers, _contained) = crate::get_rendezvous_server(1_000).await;
let (rendezvous_server, _servers, _contained) =
crate::get_rendezvous_server(READ_TIMEOUT).await;
let tmp: Vec<&str> = rendezvous_server.split(":").collect();
if tmp.len() != 2 {
bail!("Invalid server address: {}", rendezvous_server);
@@ -659,7 +660,7 @@ async fn create_online_stream() -> ResultType<FramedStream> {
bail!("Invalid server address: {}", rendezvous_server);
}
let online_server = format!("{}:{}", tmp[0], port - 1);
socket_client::connect_tcp(online_server, RENDEZVOUS_TIMEOUT).await
socket_client::connect_tcp(online_server, CONNECT_TIMEOUT).await
}
async fn query_online_states_(
@@ -683,38 +684,30 @@ async fn query_online_states_(
let mut socket = create_online_stream().await?;
socket.send(&msg_out).await?;
match socket.next_timeout(RENDEZVOUS_TIMEOUT).await {
Some(Ok(bytes)) => {
if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
match msg_in.union {
Some(rendezvous_message::Union::OnlineResponse(online_response)) => {
let states = online_response.states;
let mut onlines = Vec::new();
let mut offlines = Vec::new();
for i in 0..ids.len() {
// bytes index from left to right
let bit_value = 0x01 << (7 - i % 8);
if (states[i / 8] & bit_value) == bit_value {
onlines.push(ids[i].clone());
} else {
offlines.push(ids[i].clone());
}
}
return Ok((onlines, offlines));
}
_ => {
// ignore
if let Some(msg_in) = crate::common::get_next_nonkeyexchange_msg(&mut socket, None).await {
match msg_in.union {
Some(rendezvous_message::Union::OnlineResponse(online_response)) => {
let states = online_response.states;
let mut onlines = Vec::new();
let mut offlines = Vec::new();
for i in 0..ids.len() {
// bytes index from left to right
let bit_value = 0x01 << (7 - i % 8);
if (states[i / 8] & bit_value) == bit_value {
onlines.push(ids[i].clone());
} else {
offlines.push(ids[i].clone());
}
}
return Ok((onlines, offlines));
}
_ => {
// ignore
}
}
Some(Err(e)) => {
log::error!("Failed to receive {e}");
}
None => {
// TODO: Make sure socket closed?
bail!("Online stream receives None");
}
} else {
// TODO: Make sure socket closed?
bail!("Online stream receives None");
}
if query_begin.elapsed() > timeout {