diff --git a/src/common.rs b/src/common.rs index 3053891e6..57f2afbc9 100644 --- a/src/common.rs +++ b/src/common.rs @@ -1187,6 +1187,10 @@ fn get_tcp_proxy_addr() -> String { /// Send an HTTP request via the rendezvous server's TCP proxy using protobuf. /// Connects with `connect_tcp` + `secure_tcp`, sends `HttpProxyRequest`, /// receives `HttpProxyResponse`. +/// +/// The entire operation (connect + handshake + send + receive) is wrapped in +/// an overall timeout of `CONNECT_TIMEOUT + READ_TIMEOUT` so that a stall at +/// any stage cannot block the caller indefinitely. async fn tcp_proxy_request( method: &str, url: &str, @@ -1212,52 +1216,65 @@ async fn tcp_proxy_request( tcp_addr ); - let mut conn = socket_client::connect_tcp(&*tcp_addr, CONNECT_TIMEOUT).await?; - let key = crate::get_key(true).await; - secure_tcp_silent(&mut conn, &key).await?; + let overall_timeout = CONNECT_TIMEOUT + READ_TIMEOUT; + timeout(overall_timeout, async { + let mut conn = socket_client::connect_tcp(&*tcp_addr, CONNECT_TIMEOUT).await?; + let key = crate::get_key(true).await; + secure_tcp_silent(&mut conn, &key).await?; - let mut req = HttpProxyRequest::new(); - req.method = method.to_uppercase(); - req.path = path; - req.headers = headers.into(); - req.body = Bytes::from(body.to_vec()); + let mut req = HttpProxyRequest::new(); + req.method = method.to_uppercase(); + req.path = path; + req.headers = headers.into(); + req.body = Bytes::from(body.to_vec()); - let mut msg_out = RendezvousMessage::new(); - msg_out.set_http_proxy_request(req); - conn.send(&msg_out).await?; + let mut msg_out = RendezvousMessage::new(); + msg_out.set_http_proxy_request(req); + conn.send(&msg_out).await?; - match timeout(READ_TIMEOUT, conn.next()).await? { - Some(Ok(bytes)) => { - let msg_in = RendezvousMessage::parse_from_bytes(&bytes)?; - match msg_in.union { - Some(rendezvous_message::Union::HttpProxyResponse(resp)) => Ok(resp), - _ => bail!("Unexpected response from TCP proxy"), + match conn.next().await { + Some(Ok(bytes)) => { + let msg_in = RendezvousMessage::parse_from_bytes(&bytes)?; + match msg_in.union { + Some(rendezvous_message::Union::HttpProxyResponse(resp)) => Ok(resp), + _ => bail!("Unexpected response from TCP proxy"), + } } + Some(Err(e)) => bail!("TCP proxy read error: {}", e), + None => bail!("TCP proxy connection closed without response"), } - Some(Err(e)) => bail!("TCP proxy read error: {}", e), - None => bail!("TCP proxy connection closed without response"), - } + }) + .await? } /// Build HeaderEntry list from "Key: Value" style header string (used by post_request). +/// If the caller supplies a Content-Type header it overrides the default `application/json`. fn parse_simple_header(header: &str) -> Vec { - let mut entries = vec![HeaderEntry { - name: "Content-Type".into(), - value: "application/json".into(), - ..Default::default() - }]; + let mut entries = Vec::new(); + let mut has_content_type = false; if !header.is_empty() { let tmp: Vec<&str> = header.splitn(2, ": ").collect(); if tmp.len() == 2 { - if !tmp[0].eq_ignore_ascii_case("Content-Type") { - entries.push(HeaderEntry { - name: tmp[0].into(), - value: tmp[1].into(), - ..Default::default() - }); + if tmp[0].eq_ignore_ascii_case("Content-Type") { + has_content_type = true; } + entries.push(HeaderEntry { + name: tmp[0].into(), + value: tmp[1].into(), + ..Default::default() + }); } } + if !has_content_type { + entries.insert( + 0, + HeaderEntry { + name: "Content-Type".into(), + value: "application/json".into(), + ..Default::default() + }, + ); + } entries } @@ -1308,21 +1325,16 @@ fn parse_json_header_entries(header: &str) -> ResultType> { } } -#[inline] -fn tcp_proxy_fallback_log_condition() -> &'static str { - "failed or 5xx" -} - /// Returns (status_code, body_text). Separating status so the wrapper can decide on fallback. -async fn post_request_http(url: String, body: String, header: &str) -> ResultType<(u16, String)> { +async fn post_request_http(url: &str, body: &str, header: &str) -> ResultType<(u16, String)> { let proxy_conf = Config::get_socks(); - let tls_url = get_url_for_tls(&url, &proxy_conf); + let tls_url = get_url_for_tls(url, &proxy_conf); let tls_type = get_cached_tls_type(tls_url); let danger_accept_invalid_cert = get_cached_tls_accept_invalid_cert(tls_url); let response = post_request_( - &url, + url, tls_url, - body.clone(), + body.to_owned(), header, tls_type, danger_accept_invalid_cert, @@ -1334,6 +1346,49 @@ async fn post_request_http(url: String, body: String, header: &str) -> ResultTyp Ok((status, text)) } +/// Try `http_fn` first; on connection failure or 5xx, fall back to `tcp_fn` +/// if the URL is eligible. 4xx responses are returned as-is. +async fn with_tcp_proxy_fallback( + url: &str, + method: &str, + http_fn: HttpFut, + tcp_fn: TcpFut, +) -> ResultType +where + HttpFut: Future>, + TcpFut: Future>, +{ + if should_use_raw_tcp_for_api(url) { + return tcp_fn.await; + } + + let http_result = http_fn.await; + let should_fallback = match &http_result { + Err(_) => true, + Ok((status, _)) => *status >= 500, + }; + + if should_fallback && can_fallback_to_raw_tcp(url) { + log::warn!( + "HTTP {} to {} failed or 5xx (result: {:?}), trying TCP proxy fallback", + method, + tcp_proxy_log_target(url), + http_result + .as_ref() + .map(|(s, _)| *s) + .map_err(|e| e.to_string()), + ); + match tcp_fn.await { + Ok(resp) => return Ok(resp), + Err(tcp_err) => { + log::warn!("TCP proxy fallback also failed: {:?}", tcp_err); + } + } + } + + http_result.map(|(_status, text)| text) +} + /// POST request with raw TCP proxy support. /// - If `USE_RAW_TCP_FOR_API` is "Y" and WS is off, goes directly through TCP proxy. /// - Otherwise tries HTTP first; on connection failure or 5xx status, @@ -1341,40 +1396,13 @@ async fn post_request_http(url: String, body: String, header: &str) -> ResultTyp /// - 4xx responses are returned as-is (server is reachable, business logic error). /// - If fallback also fails, returns the original HTTP result (text or error). pub async fn post_request(url: String, body: String, header: &str) -> ResultType { - if should_use_raw_tcp_for_api(&url) { - return post_request_via_tcp_proxy(&url, &body, header).await; - } - - let http_result = post_request_http(url.clone(), body.clone(), header).await; - let should_fallback = match &http_result { - Err(_) => true, - Ok((status, _)) => *status >= 500, - }; - - if should_fallback && can_fallback_to_raw_tcp(&url) { - log::warn!( - "HTTP POST to {} {} (result: {:?}), trying TCP proxy fallback", - tcp_proxy_log_target(&url), - tcp_proxy_fallback_log_condition(), - http_result - .as_ref() - .map(|(s, _)| *s) - .map_err(|e| e.to_string()), - ); - match post_request_via_tcp_proxy(&url, &body, header).await { - Ok(resp) => return Ok(resp), - Err(tcp_err) => { - log::warn!("TCP proxy fallback also failed: {:?}", tcp_err); - // Fall through to return original HTTP result - } - } - } - - // Return original HTTP result - match http_result { - Ok((_status, text)) => Ok(text), - Err(e) => Err(e), - } + with_tcp_proxy_fallback( + &url, + "POST", + post_request_http(&url, &body, header), + post_request_via_tcp_proxy(&url, &body, header), + ) + .await } #[async_recursion] @@ -1623,32 +1651,13 @@ pub async fn http_request_sync( body: Option, header: String, ) -> ResultType { - if should_use_raw_tcp_for_api(&url) { - return http_request_via_tcp_proxy(&url, &method, body.as_deref(), &header).await; - } - - let http_result = http_request_http(&url, &method, body.clone(), &header).await; - let should_fallback = match &http_result { - Err(_) => true, - Ok((status, _)) => *status >= 500, - }; - - if should_fallback && can_fallback_to_raw_tcp(&url) { - log::warn!( - "HTTP {} to {} {}, trying TCP proxy fallback", - method, - tcp_proxy_log_target(&url), - tcp_proxy_fallback_log_condition() - ); - match http_request_via_tcp_proxy(&url, &method, body.as_deref(), &header).await { - Ok(resp) => return Ok(resp), - Err(tcp_err) => { - log::warn!("TCP proxy fallback also failed: {:?}", tcp_err); - } - } - } - - http_result.map(|(_status, json_str)| json_str) + with_tcp_proxy_fallback( + &url, + &method, + http_request_http(&url, &method, body.clone(), &header), + http_request_via_tcp_proxy(&url, &method, body.as_deref(), &header), + ) + .await } /// General HTTP request via TCP proxy. Header is a JSON string (used by http_request_sync). @@ -2874,7 +2883,7 @@ mod tests { } #[test] - fn test_parse_simple_header_ignores_custom_content_type() { + fn test_parse_simple_header_respects_custom_content_type() { let headers = parse_simple_header("Content-Type: text/plain"); assert_eq!( @@ -2889,7 +2898,7 @@ mod tests { .iter() .find(|entry| entry.name.eq_ignore_ascii_case("Content-Type")) .map(|entry| entry.value.as_str()), - Some("application/json") + Some("text/plain") ); } @@ -2917,11 +2926,6 @@ mod tests { ); } - #[test] - fn test_tcp_proxy_fallback_log_condition() { - assert_eq!(tcp_proxy_fallback_log_condition(), "failed or 5xx"); - } - #[test] fn test_tcp_proxy_log_target_redacts_query_only() { assert_eq!(