From 0bf3a91352b8e6151bf4ac0b8ebb6a2d925f58b7 Mon Sep 17 00:00:00 2001 From: yuluo Date: Wed, 3 Apr 2024 03:07:47 +0800 Subject: [PATCH] add http(s) proxy --- Cargo.lock | 5 + libs/hbb_common/Cargo.toml | 6 +- libs/hbb_common/src/lib.rs | 1 + libs/hbb_common/src/proxy.rs | 487 +++++++++++++++++++++++++++ libs/hbb_common/src/socket_client.rs | 4 +- libs/hbb_common/src/tcp.rs | 61 +--- src/rendezvous_mediator.rs | 13 +- 7 files changed, 525 insertions(+), 52 deletions(-) create mode 100644 libs/hbb_common/src/proxy.rs diff --git a/Cargo.lock b/Cargo.lock index 3d614f745..575d70c4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2917,6 +2917,7 @@ version = "0.1.0" dependencies = [ "anyhow", "backtrace", + "base64", "bytes", "chrono", "confy", @@ -2928,6 +2929,7 @@ dependencies = [ "flexi_logger", "futures", "futures-util", + "httparse", "lazy_static", "libc", "log", @@ -2945,10 +2947,13 @@ dependencies = [ "socket2 0.3.19", "sodiumoxide", "sysinfo", + "thiserror", "tokio", + "tokio-native-tls", "tokio-socks", "tokio-util", "toml 0.7.8", + "url", "uuid", "winapi 0.3.9", "zstd 0.13.0", diff --git a/libs/hbb_common/Cargo.toml b/libs/hbb_common/Cargo.toml index 01274c4fc..79d3c8ba1 100644 --- a/libs/hbb_common/Cargo.toml +++ b/libs/hbb_common/Cargo.toml @@ -40,7 +40,11 @@ toml = "0.7" uuid = { version = "1.3", features = ["v4"] } # crash, versions >= 0.29.1 are affected by #GuillaumeGomez/sysinfo/1052 sysinfo = { git = "https://github.com/rustdesk-org/sysinfo" } - +thiserror = "1.0.30" +httparse = "1.5.1" +base64 = "0.21.5" +url = "2.2.2" +tokio-native-tls ="0.3.1" [target.'cfg(not(any(target_os = "android", target_os = "ios")))'.dependencies] mac_address = "1.1" machine-uid = { git = "https://github.com/21pages/machine-uid" } diff --git a/libs/hbb_common/src/lib.rs b/libs/hbb_common/src/lib.rs index e802730a0..c47f7d618 100644 --- a/libs/hbb_common/src/lib.rs +++ b/libs/hbb_common/src/lib.rs @@ -18,6 +18,7 @@ pub use tokio; pub use tokio_util; pub mod socket_client; pub mod tcp; +pub mod proxy; pub mod udp; pub use env_logger; pub use log; diff --git a/libs/hbb_common/src/proxy.rs b/libs/hbb_common/src/proxy.rs new file mode 100644 index 000000000..93d4257b1 --- /dev/null +++ b/libs/hbb_common/src/proxy.rs @@ -0,0 +1,487 @@ + +use std::io::{Error as IoError}; + +use std::net::{SocketAddr, ToSocketAddrs}; + +use base64::Engine; +use base64::engine::general_purpose; + +use httparse::{EMPTY_HEADER, Error as HttpParseError, Response}; +use log::{info}; +use thiserror::Error as ThisError; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufStream}; +use tokio_native_tls::{native_tls, TlsConnector, TlsStream}; +use tokio_socks::{IntoTargetAddr}; +use tokio_socks::tcp::Socks5Stream; +use tokio_util::codec::Framed; +use url::Url; +use crate::config::Socks5Server; +use crate::{ ResultType}; +use crate::bytes_codec::BytesCodec; +use crate::tcp::{DynTcpStream, FramedStream}; + +#[derive(Debug, ThisError)] +pub enum ProxyError { + #[error("IO Error: {0}")] + IoError(#[from] IoError), + #[error("Target parse error: {0}")] + TargetParseError(String), + #[error("HTTP parse error: {0}")] + HttpParseError(#[from] HttpParseError), + #[error("The maximum response header length is exceeded: {0}")] + MaximumResponseHeaderLengthExceeded(usize), + #[error("The end of file is reached")] + EndOfFile, + #[error("The url is error: {0}")] + UrlBadScheme(String), + #[error("The url parse error: {0}")] + UrlParseScheme(#[from] url::ParseError), + #[error("No HTTP code was found in the response")] + NoHttpCode, + #[error("The HTTP code is not equal 200: {0}")] + HttpCode200(u16), + #[error("The proxy address resolution failed: {0}")] + AddressResolutionFailed(String), +} + +const MAXIMUM_RESPONSE_HEADER_LENGTH: usize = 4096; +/// The maximum HTTP Headers, which can be parsed. +const MAXIMUM_RESPONSE_HEADERS: usize = 16; +const DEFINE_TIME_OUT: u64 = 600; + +pub trait IntoUrl { + // Besides parsing as a valid `Url`, the `Url` must be a valid + // `http::Uri`, in that it makes sense to use in a network request. + fn into_url(self) -> Result; + + fn as_str(&self) -> &str; +} + +impl IntoUrl for Url { + fn into_url(self) -> Result { + if self.has_host() { + Ok(self) + } else { + Err(ProxyError::UrlBadScheme(self.to_string())) + } + } + + fn as_str(&self) -> &str { + self.as_ref() + } +} + +impl<'a> IntoUrl for &'a str { + fn into_url(self) -> Result { + Url::parse(self).map_err(ProxyError::UrlParseScheme)?.into_url() + } + + fn as_str(&self) -> &str { + self + } +} + +impl<'a> IntoUrl for &'a String { + fn into_url(self) -> Result { + (&**self).into_url() + } + + fn as_str(&self) -> &str { + self.as_ref() + } +} + +impl<'a> IntoUrl for String { + fn into_url(self) -> Result { + (&*self).into_url() + } + + fn as_str(&self) -> &str { + self.as_ref() + } +} + +#[derive(Clone)] +pub struct Auth { + user_name: String, + password: String, +} + +impl Auth { + fn get_proxy_authorization(&self) -> String { + let authorization = format!("{}:{}", &self.user_name, &self.password); + let authorization = general_purpose::STANDARD.encode(authorization.as_bytes()); + format!("Proxy-Authorization: Basic {}\r\n", authorization) + } +} + +#[derive(Clone)] +pub enum ProxyScheme { + Http { + auth: Option, + host: String, + }, + Https { + auth: Option, + host: String, + }, + Socks5 { + addr: SocketAddr, + auth: Option, + remote_dns: bool, + }, +} + +impl ProxyScheme { + fn maybe_auth(&self) -> Option<&Auth> { + match self { + ProxyScheme::Http { auth, .. } | + ProxyScheme::Https { auth, .. } | + ProxyScheme::Socks5 { auth, .. } + => auth.as_ref(), + } + } + + fn socks5(addr: SocketAddr) -> Result { + Ok(ProxyScheme::Socks5 { + addr, + auth: None, + remote_dns: false, + }) + } + + fn http(host: &str) -> Result { + Ok(ProxyScheme::Http { + auth: None, + host: host.to_string(), + }) + } + fn https(host: &str) -> Result { + Ok(ProxyScheme::Https { + auth: None, + host: host.to_string(), + }) + } + + + fn set_basic_auth, U: Into>(&mut self, username: T, password: U) { + let auth = Auth { + user_name: username.into(), + password: password.into(), + }; + match self { + ProxyScheme::Http { auth: a, .. } => *a = Some(auth), + ProxyScheme::Https { auth: a, .. } => *a = Some(auth), + ProxyScheme::Socks5 { auth: a, .. } => *a = Some(auth), + } + } + + fn parse(url: Url) -> Result { + use url::Position; + + // Resolve URL to a host and port + let to_addr = || { + let addrs = url + .socket_addrs(|| match url.scheme() { + "socks5" => Some(1080), + _ => None, + })?; + addrs + .into_iter() + .next() + .ok_or_else(|| ProxyError::UrlParseScheme(url::ParseError::EmptyHost)) + }; + + let mut scheme: Self = match url.scheme() { + "http" => Self::http(&url[Position::BeforeHost..Position::AfterPort])?, + "https" => Self::https(&url[Position::BeforeHost..Position::AfterPort])?, + "socks5" => Self::socks5(to_addr()?)?, + e => return Err(ProxyError::UrlBadScheme(e.to_string())), + }; + + if let Some(pwd) = url.password() { + let username = url.username(); + scheme.set_basic_auth(username, pwd); + } + + Ok(scheme) + } + pub async fn socket_addrs(&self) -> Result { + info!("Resolving socket address"); + match self { + ProxyScheme::Http { host, .. } => { + self.resolve_host(host, 80).await + } + ProxyScheme::Https { host, .. } => { + self.resolve_host(host, 443).await + } + ProxyScheme::Socks5 { addr, .. } => { + Ok(addr.clone()) + } + } + } + + async fn resolve_host(&self, host: &str, default_port: u16) -> Result { + let (host_str, port) = match host.split_once(':') { + Some((h, p)) => (h, p.parse::().ok()), + None => (host, None), + }; + let addr = (host_str, port.unwrap_or(default_port)) + .to_socket_addrs()? + .next() + .ok_or_else(|| ProxyError::AddressResolutionFailed(host.to_string()))?; + Ok(addr) + } + + pub fn get_domain(&self) -> Result { + match self { + ProxyScheme::Http { host, .. } | ProxyScheme::Https { host, .. } => { + let domain = host.split(':').next().ok_or_else(|| { + ProxyError::AddressResolutionFailed(host.clone()) + })?; + Ok(domain.to_string()) + } + ProxyScheme::Socks5 { addr, .. } => { + match addr { + SocketAddr::V4(addr_v4) => Ok(addr_v4.ip().to_string()), + SocketAddr::V6(addr_v6) => Ok(addr_v6.ip().to_string()), + } + } + } + } +} + + +pub trait IntoProxyScheme { + fn into_proxy_scheme(self) -> Result; +} + +impl IntoProxyScheme for S { + fn into_proxy_scheme(self) -> Result { + // validate the URL + let url = match self.as_str().into_url() { + Ok(ok) => ok, + Err(e) => { + match e { + ProxyError::UrlParseScheme(_source) => { + let try_this = format!("socks5://{}", self.as_str()); + try_this.into_url()? + } + _ => { + return Err(e); + } + } + } + }; + ProxyScheme::parse(url) + } +} + +impl IntoProxyScheme for ProxyScheme { + fn into_proxy_scheme(self) -> Result { + Ok(self) + } +} + +#[derive(Clone)] +pub struct Proxy { + intercept: ProxyScheme, + ms_timeout: u64, +} + +impl Proxy { + pub fn new(proxy_scheme: U, ms_timeout: u64) -> Result { + Ok(Self { + intercept: proxy_scheme.into_proxy_scheme()?, + ms_timeout, + }) + } + + pub fn is_http_or_https(&self) -> bool { + return match self.intercept { + ProxyScheme::Socks5 {..} => false, + _=> true + } + } + + pub fn form_conf(conf: &Socks5Server, ms_timeout: Option) -> Result { + let mut proxy; + match ms_timeout { + None => {proxy= Self::new(&conf.proxy, DEFINE_TIME_OUT)?;} + Some(time_out) => {proxy= Self::new(&conf.proxy, time_out)?;} + } + + if !conf.password.is_empty() && !conf.username.is_empty() { + proxy = proxy.basic_auth(&conf.username, &conf.password); + } + Ok(proxy) + } + + pub async fn proxy_addrs(&self) -> Result { + self.intercept.socket_addrs().await + } + + fn basic_auth(mut self, username: &str, password: &str) -> Proxy { + self.intercept.set_basic_auth(username, password); + self + } + + pub async fn connect<'t, T>(self,target: T, + local_addr: Option) -> ResultType + where T: IntoTargetAddr<'t>, + { + info!("Connect to proxy server"); + let proxy = self.proxy_addrs().await?; + + let local = if let Some(addr) = local_addr { + addr + } else { + crate::config::Config::get_any_listen_addr(proxy.is_ipv4()) + }; + + + let stream = super::timeout(self.ms_timeout, + crate::tcp::new_socket(local, true)?.connect(proxy)).await??; + stream.set_nodelay(true).ok(); + + let addr = stream.local_addr()?; + + return match self.intercept { + ProxyScheme::Http { .. } => { + info!("Connect to remote http proxy server: {}",proxy); + let stream = super::timeout( + self.ms_timeout, + self.http_connect(stream, target), + ).await??; + Ok(FramedStream( + Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()), + addr, None, 0, + )) + } + ProxyScheme::Https { .. } => { + info!("Connect to remote https proxy server: {}",proxy); + let stream = super::timeout( + self.ms_timeout, + self.https_connect(stream, target), + ).await??; + Ok(FramedStream( + Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()), + addr, None, 0, + )) + } + ProxyScheme::Socks5 { .. } => { + info!("Connect to remote socket5 proxy server: {}",proxy); + let stream = if let Some(auth) = self.intercept.maybe_auth() { + super::timeout( + self.ms_timeout, + Socks5Stream::connect_with_password_and_socket( + stream, target, &auth.user_name, &auth.password, + ), + ).await?? + } else { + super::timeout(self.ms_timeout, Socks5Stream::connect_with_socket(stream, target)).await?? + }; + Ok(FramedStream( + Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()), + addr, None, 0, + )) + } + }; + } + + + pub async fn https_connect<'a, Input, T>(self, io: Input, target: T) -> Result>, ProxyError> + where + Input: AsyncRead + AsyncWrite + Unpin, + T: IntoTargetAddr<'a> { + + // tls 进行握手 + let tls_connector = TlsConnector::from(native_tls::TlsConnector::new().unwrap()); + let stream = tls_connector.connect(&self.intercept.get_domain()?, io).await.unwrap(); + self.http_connect(stream, target).await + } + + pub async fn http_connect<'a, Input, T>(self, io: Input, target: T) -> Result, ProxyError> + where + Input: AsyncRead + AsyncWrite + Unpin, T: IntoTargetAddr<'a> { + let mut stream = BufStream::new(io); + let (domain, port) = get_domain_and_port(target)?; + + let request = self.make_request(&domain, port); + stream.write_all(request.as_bytes()).await?; + stream.flush().await?; + recv_and_check_response(&mut stream).await?; + Ok(stream) + } + + fn make_request(&self, host: &str, port: u16) -> String { + let mut request = format!( + "CONNECT {host}:{port} HTTP/1.1\r\nHost: {host}:{port}\r\n", + host = host, port = port); + + if let Some(auth) = self.intercept.maybe_auth() { + request = format!("{}{}", request, auth.get_proxy_authorization()); + } + + request.push_str("\r\n"); + request + } +} + + +fn get_domain_and_port<'a, T: IntoTargetAddr<'a>>(target: T) -> Result<(String, u16), ProxyError> { + let target_addr = target.into_target_addr().map_err(|e| ProxyError::TargetParseError(e.to_string()))?; + match target_addr { + tokio_socks::TargetAddr::Ip(addr) => Ok((addr.ip().to_string(), addr.port())), + tokio_socks::TargetAddr::Domain(name, port) => Ok((name.to_string(), port)), + } +} + + +async fn get_response(stream: &mut BufStream) -> Result + where + IO: AsyncRead + AsyncWrite + Unpin, +{ + use tokio::io::AsyncBufReadExt; + let mut response = String::new(); + + loop { + if stream.read_line(&mut response).await? == 0 { + return Err(ProxyError::EndOfFile); + } + + if MAXIMUM_RESPONSE_HEADER_LENGTH < response.len() { + return Err(ProxyError::MaximumResponseHeaderLengthExceeded(response.len())); + } + + if response.ends_with("\r\n\r\n") { + return Ok(response); + } + } +} + + +async fn recv_and_check_response( + stream: &mut BufStream +) -> Result<(), ProxyError> + where + IO: AsyncRead + AsyncWrite + Unpin, +{ + let response_string = get_response(stream).await?; + + let mut response_headers = [EMPTY_HEADER; MAXIMUM_RESPONSE_HEADERS]; + let mut response = Response::new(&mut response_headers); + let response_bytes = response_string.into_bytes(); + response.parse(&response_bytes)?; + + return match response.code { + Some(code) => { + if code == 200 { + Ok(()) + } else { + Err(ProxyError::HttpCode200(code)) + } + } + None => Err(ProxyError::NoHttpCode), + }; +} \ No newline at end of file diff --git a/libs/hbb_common/src/socket_client.rs b/libs/hbb_common/src/socket_client.rs index 2d9b5a984..634f49a65 100644 --- a/libs/hbb_common/src/socket_client.rs +++ b/libs/hbb_common/src/socket_client.rs @@ -108,11 +108,9 @@ pub async fn connect_tcp_local< ) -> ResultType { if let Some(conf) = Config::get_socks() { return FramedStream::connect( - conf.proxy.as_str(), target, local, - conf.username.as_str(), - conf.password.as_str(), + &conf, ms_timeout, ) .await; diff --git a/libs/hbb_common/src/tcp.rs b/libs/hbb_common/src/tcp.rs index 71aa46ec4..bcc98c118 100644 --- a/libs/hbb_common/src/tcp.rs +++ b/libs/hbb_common/src/tcp.rs @@ -14,24 +14,27 @@ use std::{ pin::Pin, task::{Context, Poll}, }; +use log::info; use tokio::{ io::{AsyncRead, AsyncWrite, ReadBuf}, net::{lookup_host, TcpListener, TcpSocket, ToSocketAddrs}, }; use tokio_socks::{tcp::Socks5Stream, IntoTargetAddr, ToProxyAddrs}; use tokio_util::codec::Framed; +use crate::config::Socks5Server; +use crate::proxy::Proxy; pub trait TcpStreamTrait: AsyncRead + AsyncWrite + Unpin {} -pub struct DynTcpStream(Box); +pub struct DynTcpStream(pub(crate) Box); #[derive(Clone)] pub struct Encrypt(Key, u64, u64); pub struct FramedStream( - Framed, - SocketAddr, - Option, - u64, + pub(crate) Framed, + pub(crate)SocketAddr, + pub(crate)Option, + pub(crate)u64, ); impl Deref for FramedStream { @@ -62,7 +65,7 @@ impl DerefMut for DynTcpStream { } } -fn new_socket(addr: std::net::SocketAddr, reuse: bool) -> Result { +pub(crate) fn new_socket(addr: std::net::SocketAddr, reuse: bool) -> Result { let socket = match addr { std::net::SocketAddr::V4(..) => TcpSocket::new_v4()?, std::net::SocketAddr::V6(..) => TcpSocket::new_v6()?, @@ -109,51 +112,17 @@ impl FramedStream { bail!(format!("Failed to connect to {remote_addr}")); } - pub async fn connect<'a, 't, P, T>( - proxy: P, + pub async fn connect<'t, T>( target: T, local_addr: Option, - username: &'a str, - password: &'a str, + proxy_conf: &Socks5Server, ms_timeout: u64, ) -> ResultType - where - P: ToProxyAddrs, - T: IntoTargetAddr<'t>, + where + T: IntoTargetAddr<'t>, { - if let Some(Ok(proxy)) = proxy.to_proxy_addrs().next().await { - let local = if let Some(addr) = local_addr { - addr - } else { - crate::config::Config::get_any_listen_addr(proxy.is_ipv4()) - }; - let stream = - super::timeout(ms_timeout, new_socket(local, true)?.connect(proxy)).await??; - stream.set_nodelay(true).ok(); - let stream = if username.trim().is_empty() { - super::timeout( - ms_timeout, - Socks5Stream::connect_with_socket(stream, target), - ) - .await?? - } else { - super::timeout( - ms_timeout, - Socks5Stream::connect_with_password_and_socket( - stream, target, username, password, - ), - ) - .await?? - }; - let addr = stream.local_addr()?; - return Ok(Self( - Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()), - addr, - None, - 0, - )); - } - bail!("could not resolve to any address"); + let proxy = Proxy::form_conf(proxy_conf, Some(ms_timeout))?; + proxy.connect::(target, local_addr).await } pub fn local_addr(&self) -> SocketAddr { diff --git a/src/rendezvous_mediator.rs b/src/rendezvous_mediator.rs index 0bdd6388d..101649981 100644 --- a/src/rendezvous_mediator.rs +++ b/src/rendezvous_mediator.rs @@ -28,6 +28,8 @@ use hbb_common::{ udp::FramedSocket, AddrMangle, IntoTargetAddr, ResultType, TargetAddr, }; +use hbb_common::log::info; +use hbb_common::proxy::Proxy; use crate::{ check_port, @@ -387,8 +389,15 @@ impl RendezvousMediator { } pub async fn start(server: ServerPtr, host: String) -> ResultType<()> { - log::info!("start rendezvous mediator of {}", host); - if cfg!(debug_assertions) && option_env!("TEST_TCP").is_some() { + info!("start rendezvous mediator of {}", host); + //If the investment agent type is http or https, then tcp forwarding is enabled. + let is_http_proxy = if let Some(conf) = Config::get_socks() { + let proxy = Proxy::form_conf(&conf, None)?; + proxy.is_http_or_https() + } else { + false + }; + if (cfg!(debug_assertions) && option_env!("TEST_TCP").is_some()) || is_http_proxy { Self::start_tcp(server, host).await } else { Self::start_udp(server, host).await