aws_http_client.rs

  1use std::fmt;
  2use std::sync::Arc;
  3
  4use aws_smithy_runtime_api::client::http::{
  5    HttpClient as AwsClient, HttpConnector as AwsConnector,
  6    HttpConnectorFuture as AwsConnectorFuture, HttpConnectorFuture, HttpConnectorSettings,
  7    SharedHttpConnector,
  8};
  9use aws_smithy_runtime_api::client::orchestrator::{HttpRequest as AwsHttpRequest, HttpResponse};
 10use aws_smithy_runtime_api::client::result::ConnectorError;
 11use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
 12use aws_smithy_runtime_api::http::{Headers, StatusCode};
 13use aws_smithy_types::body::SdkBody;
 14use futures::AsyncReadExt;
 15use http_client::{AsyncBody, Inner};
 16use http_client::{HttpClient, Request};
 17use tokio::runtime::Handle;
 18
 19struct AwsHttpConnector {
 20    client: Arc<dyn HttpClient>,
 21    handle: Handle,
 22}
 23
 24impl std::fmt::Debug for AwsHttpConnector {
 25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result {
 26        f.debug_struct("AwsHttpConnector").finish()
 27    }
 28}
 29
 30impl AwsConnector for AwsHttpConnector {
 31    fn call(&self, request: AwsHttpRequest) -> AwsConnectorFuture {
 32        let req = match request.try_into_http1x() {
 33            Ok(req) => req,
 34            Err(err) => {
 35                return HttpConnectorFuture::ready(Err(ConnectorError::other(err.into(), None)));
 36            }
 37        };
 38
 39        let (parts, body) = req.into_parts();
 40
 41        let response = self
 42            .client
 43            .send(Request::from_parts(parts, convert_to_async_body(body)));
 44
 45        let handle = self.handle.clone();
 46
 47        HttpConnectorFuture::new(async move {
 48            let response = match response.await {
 49                Ok(response) => response,
 50                Err(err) => return Err(ConnectorError::other(err.into(), None)),
 51            };
 52            let (parts, body) = response.into_parts();
 53            let body = convert_to_sdk_body(body, handle).await;
 54
 55            let mut response =
 56                HttpResponse::new(StatusCode::try_from(parts.status.as_u16()).unwrap(), body);
 57
 58            let headers = match Headers::try_from(parts.headers) {
 59                Ok(headers) => headers,
 60                Err(err) => return Err(ConnectorError::other(err.into(), None)),
 61            };
 62
 63            *response.headers_mut() = headers;
 64
 65            Ok(response)
 66        })
 67    }
 68}
 69
 70#[derive(Clone)]
 71pub struct AwsHttpClient {
 72    client: Arc<dyn HttpClient>,
 73    handler: Handle,
 74}
 75
 76impl std::fmt::Debug for AwsHttpClient {
 77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result {
 78        f.debug_struct("AwsHttpClient").finish()
 79    }
 80}
 81
 82impl AwsHttpClient {
 83    pub fn new(client: Arc<dyn HttpClient>, handle: Handle) -> Self {
 84        Self {
 85            client,
 86            handler: handle,
 87        }
 88    }
 89}
 90
 91impl AwsClient for AwsHttpClient {
 92    fn http_connector(
 93        &self,
 94        _settings: &HttpConnectorSettings,
 95        _components: &RuntimeComponents,
 96    ) -> SharedHttpConnector {
 97        SharedHttpConnector::new(AwsHttpConnector {
 98            client: self.client.clone(),
 99            handle: self.handler.clone(),
100        })
101    }
102}
103
104pub async fn convert_to_sdk_body(body: AsyncBody, handle: Handle) -> SdkBody {
105    match body.0 {
106        Inner::Empty => SdkBody::empty(),
107        Inner::Bytes(bytes) => SdkBody::from(bytes.into_inner()),
108        Inner::AsyncReader(mut reader) => {
109            let buffer = handle.spawn(async move {
110                let mut buffer = Vec::new();
111                let _ = reader.read_to_end(&mut buffer).await;
112                buffer
113            });
114
115            SdkBody::from(buffer.await.unwrap_or_default())
116        }
117    }
118}
119
120pub fn convert_to_async_body(body: SdkBody) -> AsyncBody {
121    match body.bytes() {
122        Some(bytes) => AsyncBody::from((*bytes).to_vec()),
123        None => AsyncBody::empty(),
124    }
125}