udp punch and ipv6 punch

This commit is contained in:
rustdesk
2025-06-12 21:32:28 +08:00
parent 05a812247a
commit 7792ac1481
12 changed files with 1151 additions and 178 deletions

View File

@@ -4,7 +4,7 @@ use std::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Instant,
time::{Duration, Instant},
};
use uuid::Uuid;
@@ -18,10 +18,9 @@ use hbb_common::{
futures::future::join_all,
log,
protobuf::Message as _,
proxy::Proxy,
rendezvous_proto::*,
sleep,
socket_client::{self, connect_tcp, is_ipv4},
socket_client::{self, connect_tcp, is_ipv4, new_direct_udp_for, new_udp_for},
tokio::{self, select, sync::Mutex, time::interval},
udp::FramedSocket,
AddrMangle, IntoTargetAddr, ResultType, Stream, TargetAddr,
@@ -30,13 +29,13 @@ use hbb_common::{
use crate::{
check_port,
server::{check_zombie, new as new_server, ServerPtr},
ui_interface::get_builtin_option,
};
type Message = RendezvousMessage;
lazy_static::lazy_static! {
static ref SOLVING_PK_MISMATCH: Arc<Mutex<String>> = Default::default();
static ref SOLVING_PK_MISMATCH: Mutex<String> = Default::default();
static ref LAST_MSG: Mutex<(SocketAddr, Instant)> = Mutex::new((SocketAddr::new([0; 4].into(), 0), Instant::now()));
}
static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);
static MANUAL_RESTARTED: AtomicBool = AtomicBool::new(false);
@@ -142,7 +141,7 @@ impl RendezvousMediator {
pub async fn start_udp(server: ServerPtr, host: String) -> ResultType<()> {
let host = check_port(&host, RENDEZVOUS_PORT);
log::info!("start udp: {host}");
let (mut socket, mut addr) = socket_client::new_udp_for(&host, CONNECT_TIMEOUT).await?;
let (mut socket, mut addr) = new_udp_for(&host, CONNECT_TIMEOUT).await?;
let mut rz = Self {
addr: addr.clone(),
host: host.clone(),
@@ -388,7 +387,7 @@ impl RendezvousMediator {
if (cfg!(debug_assertions) && option_env!("TEST_TCP").is_some())
|| Config::is_proxy()
|| use_ws()
|| get_builtin_option(config::keys::OPTION_DISABLE_UDP) == "Y"
|| crate::is_udp_disabled()
{
Self::start_tcp(server, host).await
} else {
@@ -404,6 +403,7 @@ impl RendezvousMediator {
server,
rr.secure,
false,
Default::default(),
)
.await
}
@@ -416,6 +416,7 @@ impl RendezvousMediator {
server: ServerPtr,
secure: bool,
initiate: bool,
socket_addr_v6: bytes::Bytes,
) -> ResultType<()> {
let peer_addr = AddrMangle::decode(&socket_addr);
log::info!(
@@ -432,6 +433,7 @@ impl RendezvousMediator {
let mut rr = RelayResponse {
socket_addr: socket_addr.into(),
version: crate::VERSION.to_owned(),
socket_addr_v6,
..Default::default()
};
if initiate {
@@ -454,16 +456,28 @@ impl RendezvousMediator {
}
async fn handle_intranet(&self, fla: FetchLocalAddr, server: ServerPtr) -> ResultType<()> {
let addr = AddrMangle::decode(&fla.socket_addr);
let last = *LAST_MSG.lock().await;
*LAST_MSG.lock().await = (addr, Instant::now());
// skip duplicate punch hole messages
if last.0 == addr && last.1.elapsed().as_millis() < 100 {
return Ok(());
}
let peer_addr_v6 = hbb_common::AddrMangle::decode(&fla.socket_addr_v6);
let relay_server = self.get_relay_server(fla.relay_server.clone());
// nat64, go relay directly, because current hbbs will crash if demangle ipv6 address
// websocket, go relay directly
if is_ipv4(&self.addr)
&& !config::is_disable_tcp_listen()
&& !Config::is_proxy()
&& !use_ws()
{
let relay = use_ws() || Config::is_proxy();
let mut socket_addr_v6 = Default::default();
if peer_addr_v6.port() > 0 && !relay {
socket_addr_v6 = start_ipv6(peer_addr_v6, addr, server.clone()).await;
}
if is_ipv4(&self.addr) && !relay && !config::is_disable_tcp_listen() {
if let Err(err) = self
.handle_intranet_(fla.clone(), server.clone(), relay_server.clone())
.handle_intranet_(
fla.clone(),
server.clone(),
relay_server.clone(),
socket_addr_v6.clone(),
)
.await
{
log::debug!("Failed to handle intranet: {:?}, will try relay", err);
@@ -479,6 +493,7 @@ impl RendezvousMediator {
server,
true,
true,
socket_addr_v6,
)
.await
}
@@ -488,6 +503,7 @@ impl RendezvousMediator {
fla: FetchLocalAddr,
server: ServerPtr,
relay_server: String,
socket_addr_v6: bytes::Bytes,
) -> ResultType<()> {
let peer_addr = AddrMangle::decode(&fla.socket_addr);
log::debug!("Handle intranet from {:?}", peer_addr);
@@ -503,6 +519,7 @@ impl RendezvousMediator {
local_addr: AddrMangle::encode(local_addr).into(),
relay_server,
version: crate::VERSION.to_owned(),
socket_addr_v6,
..Default::default()
});
let bytes = msg_out.write_to_bytes()?;
@@ -512,13 +529,25 @@ impl RendezvousMediator {
}
async fn handle_punch_hole(&self, ph: PunchHole, server: ServerPtr) -> ResultType<()> {
let mut peer_addr = AddrMangle::decode(&ph.socket_addr);
let last = *LAST_MSG.lock().await;
*LAST_MSG.lock().await = (peer_addr, Instant::now());
// skip duplicate punch hole messages
if last.0 == peer_addr && last.1.elapsed().as_millis() < 100 {
return Ok(());
}
let peer_addr_v6 = hbb_common::AddrMangle::decode(&ph.socket_addr_v6);
let relay = use_ws() || Config::is_proxy() || ph.force_relay;
let mut socket_addr_v6 = Default::default();
if peer_addr_v6.port() > 0 && !relay {
socket_addr_v6 = start_ipv6(peer_addr_v6, peer_addr, server.clone()).await;
}
let relay_server = self.get_relay_server(ph.relay_server);
// for ensure, websocket go relay directly
if ph.nat_type.enum_value() == Ok(NatType::SYMMETRIC)
|| Config::get_nat_type() == NatType::SYMMETRIC as i32
|| config::is_disable_tcp_listen()
|| use_ws()
|| Config::is_proxy()
|| relay
|| (config::is_disable_tcp_listen() && ph.udp_port <= 0)
{
let uuid = Uuid::new_v4().to_string();
return self
@@ -529,11 +558,27 @@ impl RendezvousMediator {
server,
true,
true,
socket_addr_v6.clone(),
)
.await;
}
let peer_addr = AddrMangle::decode(&ph.socket_addr);
log::debug!("Punch hole to {:?}", peer_addr);
use hbb_common::protobuf::Enum;
let nat_type = NatType::from_i32(Config::get_nat_type()).unwrap_or(NatType::UNKNOWN_NAT);
let msg_punch = PunchHoleSent {
socket_addr: ph.socket_addr,
id: Config::get_id(),
relay_server,
nat_type: nat_type.into(),
version: crate::VERSION.to_owned(),
socket_addr_v6,
..Default::default()
};
if ph.udp_port > 0 {
peer_addr.set_port(ph.udp_port as u16);
self.punch_udp_hole(peer_addr, server, msg_punch).await?;
return Ok(());
}
log::debug!("Punch tcp hole to {:?}", peer_addr);
let mut socket = {
let socket = connect_tcp(&*self.host, CONNECT_TIMEOUT).await?;
let local_addr = socket.local_addr();
@@ -543,22 +588,36 @@ impl RendezvousMediator {
socket
};
let mut msg_out = Message::new();
use hbb_common::protobuf::Enum;
let nat_type = NatType::from_i32(Config::get_nat_type()).unwrap_or(NatType::UNKNOWN_NAT);
msg_out.set_punch_hole_sent(PunchHoleSent {
socket_addr: ph.socket_addr,
id: Config::get_id(),
relay_server,
nat_type: nat_type.into(),
version: crate::VERSION.to_owned(),
..Default::default()
});
msg_out.set_punch_hole_sent(msg_punch);
let bytes = msg_out.write_to_bytes()?;
socket.send_raw(bytes).await?;
crate::accept_connection(server.clone(), socket, peer_addr, true).await;
Ok(())
}
async fn punch_udp_hole(
&self,
peer_addr: SocketAddr,
server: ServerPtr,
msg_punch: PunchHoleSent,
) -> ResultType<()> {
let mut msg_out = Message::new();
msg_out.set_punch_hole_sent(msg_punch);
let (socket, addr) = new_direct_udp_for(&self.host).await?;
let data = msg_out.write_to_bytes()?;
socket.send_to(&data, addr).await?;
let socket_cloned = socket.clone();
tokio::spawn(async move {
for _ in 0..2 {
let tm = (hbb_common::time_based_rand() % 20 + 10) as f32 / 1000.;
hbb_common::sleep(tm).await;
socket.send_to(&data, addr).await.ok();
}
});
udp_nat_listen(socket_cloned.clone(), peer_addr, peer_addr, server).await?;
Ok(())
}
async fn register_pk(&mut self, socket: Sink<'_>) -> ResultType<()> {
let mut msg_out = Message::new();
let pk = Config::get_key_pair().1;
@@ -722,3 +781,49 @@ impl Sink<'_> {
}
}
}
async fn start_ipv6(
peer_addr_v6: SocketAddr,
peer_addr_v4: SocketAddr,
server: ServerPtr,
) -> bytes::Bytes {
crate::test_ipv6().await;
if let Some((socket, local_addr_v6)) = crate::get_ipv6_socket().await {
let server = server.clone();
tokio::spawn(async move {
allow_err!(udp_nat_listen(socket.clone(), peer_addr_v6, peer_addr_v4, server).await);
});
return local_addr_v6;
}
Default::default()
}
async fn udp_nat_listen(
socket: Arc<tokio::net::UdpSocket>,
peer_addr: SocketAddr,
peer_addr_v4: SocketAddr,
server: ServerPtr,
) -> ResultType<()> {
let tm = Instant::now();
let socket_cloned = socket.clone();
let func = async {
socket.connect(peer_addr).await?;
let res = crate::punch_udp(socket.clone(), true).await?;
let stream = crate::kcp_stream::KcpStream::accept(
socket,
Duration::from_secs(CONNECT_TIMEOUT as _),
res,
)
.await?;
crate::server::create_tcp_connection(server, stream.1, peer_addr_v4, true).await?;
Ok(())
};
func.await.map_err(|e: anyhow::Error| {
anyhow::anyhow!(
"Stop listening on {:?} for remote {peer_addr} with KCP, {:?} elapsed: {e}",
socket_cloned.local_addr(),
tm.elapsed()
)
})?;
Ok(())
}