add http(s) proxy

This commit is contained in:
yuluo
2024-04-03 03:07:47 +08:00
parent 41d99d5108
commit 0bf3a91352
7 changed files with 525 additions and 52 deletions

5
Cargo.lock generated
View File

@@ -2917,6 +2917,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"backtrace",
"base64",
"bytes",
"chrono",
"confy",
@@ -2928,6 +2929,7 @@ dependencies = [
"flexi_logger",
"futures",
"futures-util",
"httparse",
"lazy_static",
"libc",
"log",
@@ -2945,10 +2947,13 @@ dependencies = [
"socket2 0.3.19",
"sodiumoxide",
"sysinfo",
"thiserror",
"tokio",
"tokio-native-tls",
"tokio-socks",
"tokio-util",
"toml 0.7.8",
"url",
"uuid",
"winapi 0.3.9",
"zstd 0.13.0",

View File

@@ -40,7 +40,11 @@ toml = "0.7"
uuid = { version = "1.3", features = ["v4"] }
# crash, versions >= 0.29.1 are affected by #GuillaumeGomez/sysinfo/1052
sysinfo = { git = "https://github.com/rustdesk-org/sysinfo" }
thiserror = "1.0.30"
httparse = "1.5.1"
base64 = "0.21.5"
url = "2.2.2"
tokio-native-tls ="0.3.1"
[target.'cfg(not(any(target_os = "android", target_os = "ios")))'.dependencies]
mac_address = "1.1"
machine-uid = { git = "https://github.com/21pages/machine-uid" }

View File

@@ -18,6 +18,7 @@ pub use tokio;
pub use tokio_util;
pub mod socket_client;
pub mod tcp;
pub mod proxy;
pub mod udp;
pub use env_logger;
pub use log;

View File

@@ -0,0 +1,487 @@
use std::io::{Error as IoError};
use std::net::{SocketAddr, ToSocketAddrs};
use base64::Engine;
use base64::engine::general_purpose;
use httparse::{EMPTY_HEADER, Error as HttpParseError, Response};
use log::{info};
use thiserror::Error as ThisError;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufStream};
use tokio_native_tls::{native_tls, TlsConnector, TlsStream};
use tokio_socks::{IntoTargetAddr};
use tokio_socks::tcp::Socks5Stream;
use tokio_util::codec::Framed;
use url::Url;
use crate::config::Socks5Server;
use crate::{ ResultType};
use crate::bytes_codec::BytesCodec;
use crate::tcp::{DynTcpStream, FramedStream};
#[derive(Debug, ThisError)]
pub enum ProxyError {
#[error("IO Error: {0}")]
IoError(#[from] IoError),
#[error("Target parse error: {0}")]
TargetParseError(String),
#[error("HTTP parse error: {0}")]
HttpParseError(#[from] HttpParseError),
#[error("The maximum response header length is exceeded: {0}")]
MaximumResponseHeaderLengthExceeded(usize),
#[error("The end of file is reached")]
EndOfFile,
#[error("The url is error: {0}")]
UrlBadScheme(String),
#[error("The url parse error: {0}")]
UrlParseScheme(#[from] url::ParseError),
#[error("No HTTP code was found in the response")]
NoHttpCode,
#[error("The HTTP code is not equal 200: {0}")]
HttpCode200(u16),
#[error("The proxy address resolution failed: {0}")]
AddressResolutionFailed(String),
}
const MAXIMUM_RESPONSE_HEADER_LENGTH: usize = 4096;
/// The maximum HTTP Headers, which can be parsed.
const MAXIMUM_RESPONSE_HEADERS: usize = 16;
const DEFINE_TIME_OUT: u64 = 600;
pub trait IntoUrl {
// Besides parsing as a valid `Url`, the `Url` must be a valid
// `http::Uri`, in that it makes sense to use in a network request.
fn into_url(self) -> Result<Url, ProxyError>;
fn as_str(&self) -> &str;
}
impl IntoUrl for Url {
fn into_url(self) -> Result<Url, ProxyError> {
if self.has_host() {
Ok(self)
} else {
Err(ProxyError::UrlBadScheme(self.to_string()))
}
}
fn as_str(&self) -> &str {
self.as_ref()
}
}
impl<'a> IntoUrl for &'a str {
fn into_url(self) -> Result<Url, ProxyError> {
Url::parse(self).map_err(ProxyError::UrlParseScheme)?.into_url()
}
fn as_str(&self) -> &str {
self
}
}
impl<'a> IntoUrl for &'a String {
fn into_url(self) -> Result<Url, ProxyError> {
(&**self).into_url()
}
fn as_str(&self) -> &str {
self.as_ref()
}
}
impl<'a> IntoUrl for String {
fn into_url(self) -> Result<Url, ProxyError> {
(&*self).into_url()
}
fn as_str(&self) -> &str {
self.as_ref()
}
}
#[derive(Clone)]
pub struct Auth {
user_name: String,
password: String,
}
impl Auth {
fn get_proxy_authorization(&self) -> String {
let authorization = format!("{}:{}", &self.user_name, &self.password);
let authorization = general_purpose::STANDARD.encode(authorization.as_bytes());
format!("Proxy-Authorization: Basic {}\r\n", authorization)
}
}
#[derive(Clone)]
pub enum ProxyScheme {
Http {
auth: Option<Auth>,
host: String,
},
Https {
auth: Option<Auth>,
host: String,
},
Socks5 {
addr: SocketAddr,
auth: Option<Auth>,
remote_dns: bool,
},
}
impl ProxyScheme {
fn maybe_auth(&self) -> Option<&Auth> {
match self {
ProxyScheme::Http { auth, .. } |
ProxyScheme::Https { auth, .. } |
ProxyScheme::Socks5 { auth, .. }
=> auth.as_ref(),
}
}
fn socks5(addr: SocketAddr) -> Result<Self, ProxyError> {
Ok(ProxyScheme::Socks5 {
addr,
auth: None,
remote_dns: false,
})
}
fn http(host: &str) -> Result<Self, ProxyError> {
Ok(ProxyScheme::Http {
auth: None,
host: host.to_string(),
})
}
fn https(host: &str) -> Result<Self, ProxyError> {
Ok(ProxyScheme::Https {
auth: None,
host: host.to_string(),
})
}
fn set_basic_auth<T: Into<String>, U: Into<String>>(&mut self, username: T, password: U) {
let auth = Auth {
user_name: username.into(),
password: password.into(),
};
match self {
ProxyScheme::Http { auth: a, .. } => *a = Some(auth),
ProxyScheme::Https { auth: a, .. } => *a = Some(auth),
ProxyScheme::Socks5 { auth: a, .. } => *a = Some(auth),
}
}
fn parse(url: Url) -> Result<Self, ProxyError> {
use url::Position;
// Resolve URL to a host and port
let to_addr = || {
let addrs = url
.socket_addrs(|| match url.scheme() {
"socks5" => Some(1080),
_ => None,
})?;
addrs
.into_iter()
.next()
.ok_or_else(|| ProxyError::UrlParseScheme(url::ParseError::EmptyHost))
};
let mut scheme: Self = match url.scheme() {
"http" => Self::http(&url[Position::BeforeHost..Position::AfterPort])?,
"https" => Self::https(&url[Position::BeforeHost..Position::AfterPort])?,
"socks5" => Self::socks5(to_addr()?)?,
e => return Err(ProxyError::UrlBadScheme(e.to_string())),
};
if let Some(pwd) = url.password() {
let username = url.username();
scheme.set_basic_auth(username, pwd);
}
Ok(scheme)
}
pub async fn socket_addrs(&self) -> Result<SocketAddr, ProxyError> {
info!("Resolving socket address");
match self {
ProxyScheme::Http { host, .. } => {
self.resolve_host(host, 80).await
}
ProxyScheme::Https { host, .. } => {
self.resolve_host(host, 443).await
}
ProxyScheme::Socks5 { addr, .. } => {
Ok(addr.clone())
}
}
}
async fn resolve_host(&self, host: &str, default_port: u16) -> Result<SocketAddr, ProxyError> {
let (host_str, port) = match host.split_once(':') {
Some((h, p)) => (h, p.parse::<u16>().ok()),
None => (host, None),
};
let addr = (host_str, port.unwrap_or(default_port))
.to_socket_addrs()?
.next()
.ok_or_else(|| ProxyError::AddressResolutionFailed(host.to_string()))?;
Ok(addr)
}
pub fn get_domain(&self) -> Result<String, ProxyError> {
match self {
ProxyScheme::Http { host, .. } | ProxyScheme::Https { host, .. } => {
let domain = host.split(':').next().ok_or_else(|| {
ProxyError::AddressResolutionFailed(host.clone())
})?;
Ok(domain.to_string())
}
ProxyScheme::Socks5 { addr, .. } => {
match addr {
SocketAddr::V4(addr_v4) => Ok(addr_v4.ip().to_string()),
SocketAddr::V6(addr_v6) => Ok(addr_v6.ip().to_string()),
}
}
}
}
}
pub trait IntoProxyScheme {
fn into_proxy_scheme(self) -> Result<ProxyScheme, ProxyError>;
}
impl<S: IntoUrl> IntoProxyScheme for S {
fn into_proxy_scheme(self) -> Result<ProxyScheme, ProxyError> {
// validate the URL
let url = match self.as_str().into_url() {
Ok(ok) => ok,
Err(e) => {
match e {
ProxyError::UrlParseScheme(_source) => {
let try_this = format!("socks5://{}", self.as_str());
try_this.into_url()?
}
_ => {
return Err(e);
}
}
}
};
ProxyScheme::parse(url)
}
}
impl IntoProxyScheme for ProxyScheme {
fn into_proxy_scheme(self) -> Result<ProxyScheme, ProxyError> {
Ok(self)
}
}
#[derive(Clone)]
pub struct Proxy {
intercept: ProxyScheme,
ms_timeout: u64,
}
impl Proxy {
pub fn new<U: IntoProxyScheme>(proxy_scheme: U, ms_timeout: u64) -> Result<Self, ProxyError> {
Ok(Self {
intercept: proxy_scheme.into_proxy_scheme()?,
ms_timeout,
})
}
pub fn is_http_or_https(&self) -> bool {
return match self.intercept {
ProxyScheme::Socks5 {..} => false,
_=> true
}
}
pub fn form_conf(conf: &Socks5Server, ms_timeout: Option<u64>) -> Result<Self, ProxyError> {
let mut proxy;
match ms_timeout {
None => {proxy= Self::new(&conf.proxy, DEFINE_TIME_OUT)?;}
Some(time_out) => {proxy= Self::new(&conf.proxy, time_out)?;}
}
if !conf.password.is_empty() && !conf.username.is_empty() {
proxy = proxy.basic_auth(&conf.username, &conf.password);
}
Ok(proxy)
}
pub async fn proxy_addrs(&self) -> Result<SocketAddr, ProxyError> {
self.intercept.socket_addrs().await
}
fn basic_auth(mut self, username: &str, password: &str) -> Proxy {
self.intercept.set_basic_auth(username, password);
self
}
pub async fn connect<'t, T>(self,target: T,
local_addr: Option<SocketAddr>) -> ResultType<FramedStream>
where T: IntoTargetAddr<'t>,
{
info!("Connect to proxy server");
let proxy = self.proxy_addrs().await?;
let local = if let Some(addr) = local_addr {
addr
} else {
crate::config::Config::get_any_listen_addr(proxy.is_ipv4())
};
let stream = super::timeout(self.ms_timeout,
crate::tcp::new_socket(local, true)?.connect(proxy)).await??;
stream.set_nodelay(true).ok();
let addr = stream.local_addr()?;
return match self.intercept {
ProxyScheme::Http { .. } => {
info!("Connect to remote http proxy server: {}",proxy);
let stream = super::timeout(
self.ms_timeout,
self.http_connect(stream, target),
).await??;
Ok(FramedStream(
Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()),
addr, None, 0,
))
}
ProxyScheme::Https { .. } => {
info!("Connect to remote https proxy server: {}",proxy);
let stream = super::timeout(
self.ms_timeout,
self.https_connect(stream, target),
).await??;
Ok(FramedStream(
Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()),
addr, None, 0,
))
}
ProxyScheme::Socks5 { .. } => {
info!("Connect to remote socket5 proxy server: {}",proxy);
let stream = if let Some(auth) = self.intercept.maybe_auth() {
super::timeout(
self.ms_timeout,
Socks5Stream::connect_with_password_and_socket(
stream, target, &auth.user_name, &auth.password,
),
).await??
} else {
super::timeout(self.ms_timeout, Socks5Stream::connect_with_socket(stream, target)).await??
};
Ok(FramedStream(
Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()),
addr, None, 0,
))
}
};
}
pub async fn https_connect<'a, Input, T>(self, io: Input, target: T) -> Result<BufStream<TlsStream<Input>>, ProxyError>
where
Input: AsyncRead + AsyncWrite + Unpin,
T: IntoTargetAddr<'a> {
// tls 进行握手
let tls_connector = TlsConnector::from(native_tls::TlsConnector::new().unwrap());
let stream = tls_connector.connect(&self.intercept.get_domain()?, io).await.unwrap();
self.http_connect(stream, target).await
}
pub async fn http_connect<'a, Input, T>(self, io: Input, target: T) -> Result<BufStream<Input>, ProxyError>
where
Input: AsyncRead + AsyncWrite + Unpin, T: IntoTargetAddr<'a> {
let mut stream = BufStream::new(io);
let (domain, port) = get_domain_and_port(target)?;
let request = self.make_request(&domain, port);
stream.write_all(request.as_bytes()).await?;
stream.flush().await?;
recv_and_check_response(&mut stream).await?;
Ok(stream)
}
fn make_request(&self, host: &str, port: u16) -> String {
let mut request = format!(
"CONNECT {host}:{port} HTTP/1.1\r\nHost: {host}:{port}\r\n",
host = host, port = port);
if let Some(auth) = self.intercept.maybe_auth() {
request = format!("{}{}", request, auth.get_proxy_authorization());
}
request.push_str("\r\n");
request
}
}
fn get_domain_and_port<'a, T: IntoTargetAddr<'a>>(target: T) -> Result<(String, u16), ProxyError> {
let target_addr = target.into_target_addr().map_err(|e| ProxyError::TargetParseError(e.to_string()))?;
match target_addr {
tokio_socks::TargetAddr::Ip(addr) => Ok((addr.ip().to_string(), addr.port())),
tokio_socks::TargetAddr::Domain(name, port) => Ok((name.to_string(), port)),
}
}
async fn get_response<IO>(stream: &mut BufStream<IO>) -> Result<String, ProxyError>
where
IO: AsyncRead + AsyncWrite + Unpin,
{
use tokio::io::AsyncBufReadExt;
let mut response = String::new();
loop {
if stream.read_line(&mut response).await? == 0 {
return Err(ProxyError::EndOfFile);
}
if MAXIMUM_RESPONSE_HEADER_LENGTH < response.len() {
return Err(ProxyError::MaximumResponseHeaderLengthExceeded(response.len()));
}
if response.ends_with("\r\n\r\n") {
return Ok(response);
}
}
}
async fn recv_and_check_response<IO>(
stream: &mut BufStream<IO>
) -> Result<(), ProxyError>
where
IO: AsyncRead + AsyncWrite + Unpin,
{
let response_string = get_response(stream).await?;
let mut response_headers = [EMPTY_HEADER; MAXIMUM_RESPONSE_HEADERS];
let mut response = Response::new(&mut response_headers);
let response_bytes = response_string.into_bytes();
response.parse(&response_bytes)?;
return match response.code {
Some(code) => {
if code == 200 {
Ok(())
} else {
Err(ProxyError::HttpCode200(code))
}
}
None => Err(ProxyError::NoHttpCode),
};
}

View File

@@ -108,11 +108,9 @@ pub async fn connect_tcp_local<
) -> ResultType<FramedStream> {
if let Some(conf) = Config::get_socks() {
return FramedStream::connect(
conf.proxy.as_str(),
target,
local,
conf.username.as_str(),
conf.password.as_str(),
&conf,
ms_timeout,
)
.await;

View File

@@ -14,24 +14,27 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use log::info;
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::{lookup_host, TcpListener, TcpSocket, ToSocketAddrs},
};
use tokio_socks::{tcp::Socks5Stream, IntoTargetAddr, ToProxyAddrs};
use tokio_util::codec::Framed;
use crate::config::Socks5Server;
use crate::proxy::Proxy;
pub trait TcpStreamTrait: AsyncRead + AsyncWrite + Unpin {}
pub struct DynTcpStream(Box<dyn TcpStreamTrait + Send + Sync>);
pub struct DynTcpStream(pub(crate) Box<dyn TcpStreamTrait + Send + Sync>);
#[derive(Clone)]
pub struct Encrypt(Key, u64, u64);
pub struct FramedStream(
Framed<DynTcpStream, BytesCodec>,
SocketAddr,
Option<Encrypt>,
u64,
pub(crate) Framed<DynTcpStream, BytesCodec>,
pub(crate)SocketAddr,
pub(crate)Option<Encrypt>,
pub(crate)u64,
);
impl Deref for FramedStream {
@@ -62,7 +65,7 @@ impl DerefMut for DynTcpStream {
}
}
fn new_socket(addr: std::net::SocketAddr, reuse: bool) -> Result<TcpSocket, std::io::Error> {
pub(crate) fn new_socket(addr: std::net::SocketAddr, reuse: bool) -> Result<TcpSocket, std::io::Error> {
let socket = match addr {
std::net::SocketAddr::V4(..) => TcpSocket::new_v4()?,
std::net::SocketAddr::V6(..) => TcpSocket::new_v6()?,
@@ -109,51 +112,17 @@ impl FramedStream {
bail!(format!("Failed to connect to {remote_addr}"));
}
pub async fn connect<'a, 't, P, T>(
proxy: P,
pub async fn connect<'t, T>(
target: T,
local_addr: Option<SocketAddr>,
username: &'a str,
password: &'a str,
proxy_conf: &Socks5Server,
ms_timeout: u64,
) -> ResultType<Self>
where
P: ToProxyAddrs,
T: IntoTargetAddr<'t>,
where
T: IntoTargetAddr<'t>,
{
if let Some(Ok(proxy)) = proxy.to_proxy_addrs().next().await {
let local = if let Some(addr) = local_addr {
addr
} else {
crate::config::Config::get_any_listen_addr(proxy.is_ipv4())
};
let stream =
super::timeout(ms_timeout, new_socket(local, true)?.connect(proxy)).await??;
stream.set_nodelay(true).ok();
let stream = if username.trim().is_empty() {
super::timeout(
ms_timeout,
Socks5Stream::connect_with_socket(stream, target),
)
.await??
} else {
super::timeout(
ms_timeout,
Socks5Stream::connect_with_password_and_socket(
stream, target, username, password,
),
)
.await??
};
let addr = stream.local_addr()?;
return Ok(Self(
Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()),
addr,
None,
0,
));
}
bail!("could not resolve to any address");
let proxy = Proxy::form_conf(proxy_conf, Some(ms_timeout))?;
proxy.connect::<T>(target, local_addr).await
}
pub fn local_addr(&self) -> SocketAddr {

View File

@@ -28,6 +28,8 @@ use hbb_common::{
udp::FramedSocket,
AddrMangle, IntoTargetAddr, ResultType, TargetAddr,
};
use hbb_common::log::info;
use hbb_common::proxy::Proxy;
use crate::{
check_port,
@@ -387,8 +389,15 @@ impl RendezvousMediator {
}
pub async fn start(server: ServerPtr, host: String) -> ResultType<()> {
log::info!("start rendezvous mediator of {}", host);
if cfg!(debug_assertions) && option_env!("TEST_TCP").is_some() {
info!("start rendezvous mediator of {}", host);
//If the investment agent type is http or https, then tcp forwarding is enabled.
let is_http_proxy = if let Some(conf) = Config::get_socks() {
let proxy = Proxy::form_conf(&conf, None)?;
proxy.is_http_or_https()
} else {
false
};
if (cfg!(debug_assertions) && option_env!("TEST_TCP").is_some()) || is_http_proxy {
Self::start_tcp(server, host).await
} else {
Self::start_udp(server, host).await