fix: add async http proxy func and format the code

This commit is contained in:
yuluo
2024-04-16 00:24:02 +08:00
parent c3b8621554
commit 1176750a4f
12 changed files with 232 additions and 174 deletions

View File

@@ -16,9 +16,9 @@ use std::{
};
pub use tokio;
pub use tokio_util;
pub mod proxy;
pub mod socket_client;
pub mod tcp;
pub mod proxy;
pub mod udp;
pub use env_logger;
pub use log;

View File

@@ -1,26 +1,26 @@
use crate::bytes_codec::BytesCodec;
use crate::config::Socks5Server;
use crate::tcp::{DynTcpStream, FramedStream};
use crate::ResultType;
use base64::engine::general_purpose;
use base64::Engine;
use httparse::{Error as HttpParseError, Response, EMPTY_HEADER};
use log::info;
use rustls_pki_types;
use std::convert::TryFrom;
use std::io::{Error as IoError};
use std::io::Error as IoError;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use base64::Engine;
use base64::engine::general_purpose;
use httparse::{EMPTY_HEADER, Error as HttpParseError, Response};
use log::{info};
use thiserror::Error as ThisError;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufStream};
use tokio_socks::{IntoTargetAddr};
use tokio_socks::tcp::Socks5Stream;
use tokio_util::codec::Framed;
use url::Url;
use crate::config::Socks5Server;
use crate::{ResultType};
use crate::bytes_codec::BytesCodec;
use crate::tcp::{DynTcpStream, FramedStream};
#[cfg(any(target_os = "windows", target_os = "macos"))]
use tokio_native_tls::{native_tls, TlsConnector, TlsStream};
#[cfg(not(any(target_os = "windows", target_os = "macos")))]
use tokio_rustls::{rustls, TlsConnector, client::TlsStream };
use rustls_pki_types;
use tokio_rustls::{client::TlsStream, rustls, TlsConnector};
use tokio_socks::tcp::Socks5Stream;
use tokio_socks::IntoTargetAddr;
use tokio_util::codec::Framed;
use url::Url;
#[derive(Debug, ThisError)]
pub enum ProxyError {
@@ -75,7 +75,9 @@ impl IntoUrl for Url {
impl<'a> IntoUrl for &'a str {
fn into_url(self) -> Result<Url, ProxyError> {
Url::parse(self).map_err(ProxyError::UrlParseScheme)?.into_url()
Url::parse(self)
.map_err(ProxyError::UrlParseScheme)?
.into_url()
}
fn as_str(&self) -> &str {
@@ -111,7 +113,10 @@ pub struct Auth {
impl Auth {
fn get_proxy_authorization(&self) -> String {
format!("Proxy-Authorization: Basic {}\r\n", self.get_basic_authorization())
format!(
"Proxy-Authorization: Basic {}\r\n",
self.get_basic_authorization()
)
}
pub fn get_basic_authorization(&self) -> String {
@@ -140,10 +145,9 @@ pub enum ProxyScheme {
impl ProxyScheme {
pub fn maybe_auth(&self) -> Option<&Auth> {
match self {
ProxyScheme::Http { auth, .. } |
ProxyScheme::Https { auth, .. } |
ProxyScheme::Socks5 { auth, .. }
=> auth.as_ref(),
ProxyScheme::Http { auth, .. }
| ProxyScheme::Https { auth, .. }
| ProxyScheme::Socks5 { auth, .. } => auth.as_ref(),
}
}
@@ -168,7 +172,6 @@ impl ProxyScheme {
})
}
fn set_basic_auth<T: Into<String>, U: Into<String>>(&mut self, username: T, password: U) {
let auth = Auth {
user_name: username.into(),
@@ -186,11 +189,10 @@ impl ProxyScheme {
// Resolve URL to a host and port
let to_addr = || {
let addrs = url
.socket_addrs(|| match url.scheme() {
"socks5" => Some(1080),
_ => None,
})?;
let addrs = url.socket_addrs(|| match url.scheme() {
"socks5" => Some(1080),
_ => None,
})?;
addrs
.into_iter()
.next()
@@ -214,15 +216,9 @@ impl ProxyScheme {
pub async fn socket_addrs(&self) -> Result<SocketAddr, ProxyError> {
info!("Resolving socket address");
match self {
ProxyScheme::Http { host, .. } => {
self.resolve_host(host, 80).await
}
ProxyScheme::Https { host, .. } => {
self.resolve_host(host, 443).await
}
ProxyScheme::Socks5 { addr, .. } => {
Ok(addr.clone())
}
ProxyScheme::Http { host, .. } => self.resolve_host(host, 80).await,
ProxyScheme::Https { host, .. } => self.resolve_host(host, 443).await,
ProxyScheme::Socks5 { addr, .. } => Ok(addr.clone()),
}
}
@@ -241,30 +237,23 @@ impl ProxyScheme {
pub fn get_domain(&self) -> Result<String, ProxyError> {
match self {
ProxyScheme::Http { host, .. } | ProxyScheme::Https { host, .. } => {
let domain = host.split(':').next().ok_or_else(|| {
ProxyError::AddressResolutionFailed(host.clone())
})?;
let domain = host
.split(':')
.next()
.ok_or_else(|| ProxyError::AddressResolutionFailed(host.clone()))?;
Ok(domain.to_string())
}
ProxyScheme::Socks5 { addr, .. } => {
match addr {
SocketAddr::V4(addr_v4) => Ok(addr_v4.ip().to_string()),
SocketAddr::V6(addr_v6) => Ok(addr_v6.ip().to_string()),
}
}
ProxyScheme::Socks5 { addr, .. } => match addr {
SocketAddr::V4(addr_v4) => Ok(addr_v4.ip().to_string()),
SocketAddr::V6(addr_v6) => Ok(addr_v6.ip().to_string()),
},
}
}
pub fn get_host_and_port(&self) -> Result<String, ProxyError> {
match self {
ProxyScheme::Http { host, .. } => {
Ok(self.append_default_port(host, 80))
},
ProxyScheme::Https { host, .. } => {
Ok(self.append_default_port(host, 443))
},
ProxyScheme::Socks5 { addr, .. } => {
Ok(format!("{}", addr))
},
ProxyScheme::Http { host, .. } => Ok(self.append_default_port(host, 80)),
ProxyScheme::Https { host, .. } => Ok(self.append_default_port(host, 443)),
ProxyScheme::Socks5 { addr, .. } => Ok(format!("{}", addr)),
}
}
fn append_default_port(&self, host: &str, default_port: u16) -> String {
@@ -276,7 +265,6 @@ impl ProxyScheme {
}
}
pub trait IntoProxyScheme {
fn into_proxy_scheme(self) -> Result<ProxyScheme, ProxyError>;
}
@@ -326,15 +314,19 @@ impl Proxy {
pub fn is_http_or_https(&self) -> bool {
return match self.intercept {
ProxyScheme::Socks5 { .. } => false,
_ => true
_ => true,
};
}
pub fn form_conf(conf: &Socks5Server, ms_timeout: Option<u64>) -> Result<Self, ProxyError> {
pub fn from_conf(conf: &Socks5Server, ms_timeout: Option<u64>) -> Result<Self, ProxyError> {
let mut proxy;
match ms_timeout {
None => { proxy = Self::new(&conf.proxy, DEFINE_TIME_OUT)?; }
Some(time_out) => { proxy = Self::new(&conf.proxy, time_out)?; }
None => {
proxy = Self::new(&conf.proxy, DEFINE_TIME_OUT)?;
}
Some(time_out) => {
proxy = Self::new(&conf.proxy, time_out)?;
}
}
if !conf.password.is_empty() && !conf.username.is_empty() {
@@ -352,9 +344,13 @@ impl Proxy {
self
}
pub async fn connect<'t, T>(self, target: T,
local_addr: Option<SocketAddr>) -> ResultType<FramedStream>
where T: IntoTargetAddr<'t>,
pub async fn connect<'t, T>(
self,
target: T,
local_addr: Option<SocketAddr>,
) -> ResultType<FramedStream>
where
T: IntoTargetAddr<'t>,
{
info!("Connect to proxy server");
let proxy = self.proxy_addrs().await?;
@@ -365,71 +361,95 @@ impl Proxy {
crate::config::Config::get_any_listen_addr(proxy.is_ipv4())
};
let stream = super::timeout(self.ms_timeout,
crate::tcp::new_socket(local, true)?.connect(proxy)).await??;
let stream = super::timeout(
self.ms_timeout,
crate::tcp::new_socket(local, true)?.connect(proxy),
)
.await??;
stream.set_nodelay(true).ok();
let addr = stream.local_addr()?;
return match self.intercept {
ProxyScheme::Http { .. } => {
info!("Connect to remote http proxy server: {}",proxy);
let stream = super::timeout(
self.ms_timeout,
self.http_connect(stream, target),
).await??;
info!("Connect to remote http proxy server: {}", proxy);
let stream =
super::timeout(self.ms_timeout, self.http_connect(stream, target)).await??;
Ok(FramedStream(
Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()),
addr, None, 0,
addr,
None,
0,
))
}
ProxyScheme::Https { .. } => {
info!("Connect to remote https proxy server: {}",proxy);
let stream = super::timeout(
self.ms_timeout,
self.https_connect(stream, target),
).await??;
info!("Connect to remote https proxy server: {}", proxy);
let stream =
super::timeout(self.ms_timeout, self.https_connect(stream, target)).await??;
Ok(FramedStream(
Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()),
addr, None, 0,
addr,
None,
0,
))
}
ProxyScheme::Socks5 { .. } => {
info!("Connect to remote socket5 proxy server: {}",proxy);
info!("Connect to remote socket5 proxy server: {}", proxy);
let stream = if let Some(auth) = self.intercept.maybe_auth() {
super::timeout(
self.ms_timeout,
Socks5Stream::connect_with_password_and_socket(
stream, target, &auth.user_name, &auth.password,
stream,
target,
&auth.user_name,
&auth.password,
),
).await??
)
.await??
} else {
super::timeout(self.ms_timeout, Socks5Stream::connect_with_socket(stream, target)).await??
super::timeout(
self.ms_timeout,
Socks5Stream::connect_with_socket(stream, target),
)
.await??
};
Ok(FramedStream(
Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()),
addr, None, 0,
addr,
None,
0,
))
}
};
}
#[cfg(any(target_os = "windows", target_os = "macos"))]
pub async fn https_connect<'a, Input, T>(self, io: Input, target: T) -> Result<BufStream<TlsStream<Input>>, ProxyError>
where
Input: AsyncRead + AsyncWrite + Unpin,
T: IntoTargetAddr<'a> {
pub async fn https_connect<'a, Input, T>(
self,
io: Input,
target: T,
) -> Result<BufStream<TlsStream<Input>>, ProxyError>
where
Input: AsyncRead + AsyncWrite + Unpin,
T: IntoTargetAddr<'a>,
{
let tls_connector = TlsConnector::from(native_tls::TlsConnector::new()?);
let stream = tls_connector.connect(&self.intercept.get_domain()?, io).await?;
let stream = tls_connector
.connect(&self.intercept.get_domain()?, io)
.await?;
self.http_connect(stream, target).await
}
#[cfg(not(any(target_os = "windows", target_os = "macos")))]
pub async fn https_connect<'a, Input, T>(self, io: Input, target: T) -> Result<BufStream<TlsStream<Input>>, ProxyError>
where
Input: AsyncRead + AsyncWrite + Unpin,
T: IntoTargetAddr<'a> {
pub async fn https_connect<'a, Input, T>(
self,
io: Input,
target: T,
) -> Result<BufStream<TlsStream<Input>>, ProxyError>
where
Input: AsyncRead + AsyncWrite + Unpin,
T: IntoTargetAddr<'a>,
{
let root_store = rustls::RootCertStore {
roots: webpki_roots::TLS_SERVER_ROOTS.into(),
};
@@ -448,9 +468,15 @@ impl Proxy {
self.http_connect(stream, target).await
}
pub async fn http_connect<'a, Input, T>(self, io: Input, target: T) -> Result<BufStream<Input>, ProxyError>
where
Input: AsyncRead + AsyncWrite + Unpin, T: IntoTargetAddr<'a> {
pub async fn http_connect<'a, Input, T>(
self,
io: Input,
target: T,
) -> Result<BufStream<Input>, ProxyError>
where
Input: AsyncRead + AsyncWrite + Unpin,
T: IntoTargetAddr<'a>,
{
let mut stream = BufStream::new(io);
let (domain, port) = get_domain_and_port(target)?;
@@ -464,7 +490,9 @@ impl Proxy {
fn make_request(&self, host: &str, port: u16) -> String {
let mut request = format!(
"CONNECT {host}:{port} HTTP/1.1\r\nHost: {host}:{port}\r\n",
host = host, port = port);
host = host,
port = port
);
if let Some(auth) = self.intercept.maybe_auth() {
request = format!("{}{}", request, auth.get_proxy_authorization());
@@ -475,19 +503,19 @@ impl Proxy {
}
}
fn get_domain_and_port<'a, T: IntoTargetAddr<'a>>(target: T) -> Result<(String, u16), ProxyError> {
let target_addr = target.into_target_addr().map_err(|e| ProxyError::TargetParseError(e.to_string()))?;
let target_addr = target
.into_target_addr()
.map_err(|e| ProxyError::TargetParseError(e.to_string()))?;
match target_addr {
tokio_socks::TargetAddr::Ip(addr) => Ok((addr.ip().to_string(), addr.port())),
tokio_socks::TargetAddr::Domain(name, port) => Ok((name.to_string(), port)),
}
}
async fn get_response<IO>(stream: &mut BufStream<IO>) -> Result<String, ProxyError>
where
IO: AsyncRead + AsyncWrite + Unpin,
where
IO: AsyncRead + AsyncWrite + Unpin,
{
use tokio::io::AsyncBufReadExt;
let mut response = String::new();
@@ -498,7 +526,9 @@ async fn get_response<IO>(stream: &mut BufStream<IO>) -> Result<String, ProxyErr
}
if MAXIMUM_RESPONSE_HEADER_LENGTH < response.len() {
return Err(ProxyError::MaximumResponseHeaderLengthExceeded(response.len()));
return Err(ProxyError::MaximumResponseHeaderLengthExceeded(
response.len(),
));
}
if response.ends_with("\r\n\r\n") {
@@ -507,10 +537,9 @@ async fn get_response<IO>(stream: &mut BufStream<IO>) -> Result<String, ProxyErr
}
}
async fn recv_and_check_response<IO>(stream: &mut BufStream<IO>) -> Result<(), ProxyError>
where
IO: AsyncRead + AsyncWrite + Unpin,
where
IO: AsyncRead + AsyncWrite + Unpin,
{
let response_string = get_response(stream).await?;
@@ -529,4 +558,4 @@ async fn recv_and_check_response<IO>(stream: &mut BufStream<IO>) -> Result<(), P
}
None => Err(ProxyError::NoHttpCode),
};
}
}

View File

@@ -1,3 +1,4 @@
use crate::proxy::IntoProxyScheme;
use crate::{
config::{Config, NetworkType},
tcp::FramedStream,
@@ -9,7 +10,6 @@ use std::net::SocketAddr;
use log::info;
use tokio::net::ToSocketAddrs;
use tokio_socks::{IntoTargetAddr, TargetAddr};
use crate::proxy::IntoProxyScheme;
#[inline]
pub fn check_port<T: std::string::ToString>(host: T, port: i32) -> String {
@@ -105,13 +105,7 @@ pub async fn connect_tcp_local<
ms_timeout: u64,
) -> ResultType<FramedStream> {
if let Some(conf) = Config::get_socks() {
return FramedStream::connect(
target,
local,
&conf,
ms_timeout,
)
.await;
return FramedStream::connect(target, local, &conf, ms_timeout).await;
}
if let Some(target) = target.resolve() {
if let Some(local) = local {

View File

@@ -1,3 +1,5 @@
use crate::config::Socks5Server;
use crate::proxy::Proxy;
use crate::{bail, bytes_codec::BytesCodec, ResultType};
use anyhow::Context as AnyhowCtx;
use bytes::{BufMut, Bytes, BytesMut};
@@ -20,8 +22,6 @@ use tokio::{
};
use tokio_socks::IntoTargetAddr;
use tokio_util::codec::Framed;
use crate::config::Socks5Server;
use crate::proxy::Proxy;
pub trait TcpStreamTrait: AsyncRead + AsyncWrite + Unpin {}
pub struct DynTcpStream(pub(crate) Box<dyn TcpStreamTrait + Send + Sync>);
@@ -117,10 +117,10 @@ impl FramedStream {
proxy_conf: &Socks5Server,
ms_timeout: u64,
) -> ResultType<Self>
where
T: IntoTargetAddr<'t>,
where
T: IntoTargetAddr<'t>,
{
let proxy = Proxy::form_conf(proxy_conf, Some(ms_timeout))?;
let proxy = Proxy::from_conf(proxy_conf, Some(ms_timeout))?;
proxy.connect::<T>(target, local_addr).await
}