http_proxy.rs

  1use anyhow::{Context, Result};
  2use base64::Engine;
  3use httparse::{EMPTY_HEADER, Response};
  4use tokio::{
  5    io::{AsyncBufReadExt, AsyncWriteExt, BufStream},
  6    net::TcpStream,
  7};
  8use tokio_native_tls::{TlsConnector, native_tls};
  9use url::Url;
 10
 11use super::AsyncReadWrite;
 12
 13pub(super) enum HttpProxyType<'t> {
 14    HTTP(Option<HttpProxyAuthorization<'t>>),
 15    HTTPS(Option<HttpProxyAuthorization<'t>>),
 16}
 17
 18pub(super) struct HttpProxyAuthorization<'t> {
 19    username: &'t str,
 20    password: &'t str,
 21}
 22
 23pub(super) fn parse_http_proxy<'t>(scheme: &str, proxy: &'t Url) -> HttpProxyType<'t> {
 24    let auth = proxy.password().map(|password| HttpProxyAuthorization {
 25        username: proxy.username(),
 26        password,
 27    });
 28    if scheme.starts_with("https") {
 29        HttpProxyType::HTTPS(auth)
 30    } else {
 31        HttpProxyType::HTTP(auth)
 32    }
 33}
 34
 35pub(crate) async fn connect_http_proxy_stream(
 36    stream: TcpStream,
 37    http_proxy: HttpProxyType<'_>,
 38    rpc_host: (&str, u16),
 39    proxy_domain: &str,
 40) -> Result<Box<dyn AsyncReadWrite>> {
 41    match http_proxy {
 42        HttpProxyType::HTTP(auth) => http_connect(stream, rpc_host, auth).await,
 43        HttpProxyType::HTTPS(auth) => https_connect(stream, rpc_host, auth, proxy_domain).await,
 44    }
 45    .context("error connecting to http/https proxy")
 46}
 47
 48async fn http_connect<T>(
 49    stream: T,
 50    target: (&str, u16),
 51    auth: Option<HttpProxyAuthorization<'_>>,
 52) -> Result<Box<dyn AsyncReadWrite>>
 53where
 54    T: AsyncReadWrite,
 55{
 56    let mut stream = BufStream::new(stream);
 57    let request = make_request(target, auth);
 58    stream.write_all(request.as_bytes()).await?;
 59    stream.flush().await?;
 60    check_response(&mut stream).await?;
 61    Ok(Box::new(stream))
 62}
 63
 64async fn https_connect<T>(
 65    stream: T,
 66    target: (&str, u16),
 67    auth: Option<HttpProxyAuthorization<'_>>,
 68    proxy_domain: &str,
 69) -> Result<Box<dyn AsyncReadWrite>>
 70where
 71    T: AsyncReadWrite,
 72{
 73    let tls_connector = TlsConnector::from(native_tls::TlsConnector::new()?);
 74    let stream = tls_connector.connect(proxy_domain, stream).await?;
 75    http_connect(stream, target, auth).await
 76}
 77
 78fn make_request(target: (&str, u16), auth: Option<HttpProxyAuthorization<'_>>) -> String {
 79    let (host, port) = target;
 80    let mut request = format!(
 81        "CONNECT {host}:{port} HTTP/1.1\r\nHost: {host}:{port}\r\nProxy-Connection: Keep-Alive\r\n"
 82    );
 83    if let Some(HttpProxyAuthorization { username, password }) = auth {
 84        let auth =
 85            base64::prelude::BASE64_STANDARD.encode(format!("{username}:{password}").as_bytes());
 86        let auth = format!("Proxy-Authorization: Basic {auth}\r\n");
 87        request.push_str(&auth);
 88    }
 89    request.push_str("\r\n");
 90    request
 91}
 92
 93async fn check_response<T>(stream: &mut BufStream<T>) -> Result<()>
 94where
 95    T: AsyncReadWrite,
 96{
 97    let response = recv_response(stream).await?;
 98    let mut dummy_headers = [EMPTY_HEADER; MAX_RESPONSE_HEADERS];
 99    let mut parser = Response::new(&mut dummy_headers);
100    parser.parse(response.as_bytes())?;
101
102    match parser.code {
103        Some(code) => {
104            if code == 200 {
105                Ok(())
106            } else {
107                Err(anyhow::anyhow!(
108                    "Proxy connection failed with HTTP code: {code}"
109                ))
110            }
111        }
112        None => Err(anyhow::anyhow!(
113            "Proxy connection failed with no HTTP code: {}",
114            parser.reason.unwrap_or("Unknown reason")
115        )),
116    }
117}
118
119const MAX_RESPONSE_HEADER_LENGTH: usize = 4096;
120const MAX_RESPONSE_HEADERS: usize = 16;
121
122async fn recv_response<T>(stream: &mut BufStream<T>) -> Result<String>
123where
124    T: AsyncReadWrite,
125{
126    let mut response = String::new();
127    loop {
128        if stream.read_line(&mut response).await? == 0 {
129            return Err(anyhow::anyhow!("End of stream"));
130        }
131
132        if MAX_RESPONSE_HEADER_LENGTH < response.len() {
133            return Err(anyhow::anyhow!("Maximum response header length exceeded"));
134        }
135
136        if response.ends_with("\r\n\r\n") {
137            return Ok(response);
138        }
139    }
140}
141
142#[cfg(test)]
143mod tests {
144    use url::Url;
145
146    use super::{HttpProxyAuthorization, HttpProxyType, parse_http_proxy};
147
148    #[test]
149    fn test_parse_http_proxy() {
150        let proxy = Url::parse("http://proxy.example.com:1080").unwrap();
151        let scheme = proxy.scheme();
152
153        let version = parse_http_proxy(scheme, &proxy);
154        assert!(matches!(version, HttpProxyType::HTTP(None)))
155    }
156
157    #[test]
158    fn test_parse_http_proxy_with_auth() {
159        let proxy = Url::parse("http://username:password@proxy.example.com:1080").unwrap();
160        let scheme = proxy.scheme();
161
162        let version = parse_http_proxy(scheme, &proxy);
163        assert!(matches!(
164            version,
165            HttpProxyType::HTTP(Some(HttpProxyAuthorization {
166                username: "username",
167                password: "password"
168            }))
169        ))
170    }
171}