aws_http_client.rs

  1use std::fmt;
  2use std::sync::Arc;
  3
  4use aws_smithy_runtime_api::client::http::{
  5    HttpClient as AwsClient, HttpConnector as AwsConnector,
  6    HttpConnectorFuture as AwsConnectorFuture, HttpConnectorFuture, HttpConnectorSettings,
  7    SharedHttpConnector,
  8};
  9use aws_smithy_runtime_api::client::orchestrator::{HttpRequest as AwsHttpRequest, HttpResponse};
 10use aws_smithy_runtime_api::client::result::ConnectorError;
 11use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
 12use aws_smithy_runtime_api::http::StatusCode;
 13use aws_smithy_types::body::SdkBody;
 14use futures::AsyncReadExt;
 15use http_client::{AsyncBody, Inner};
 16use http_client::{HttpClient, Request};
 17use tokio::runtime::Handle;
 18
 19struct AwsHttpConnector {
 20    client: Arc<dyn HttpClient>,
 21    handle: Handle,
 22}
 23
 24impl std::fmt::Debug for AwsHttpConnector {
 25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result {
 26        f.debug_struct("AwsHttpConnector").finish()
 27    }
 28}
 29
 30impl AwsConnector for AwsHttpConnector {
 31    fn call(&self, request: AwsHttpRequest) -> AwsConnectorFuture {
 32        let req = match request.try_into_http1x() {
 33            Ok(req) => req,
 34            Err(err) => {
 35                return HttpConnectorFuture::ready(Err(ConnectorError::other(err.into(), None)));
 36            }
 37        };
 38
 39        let (parts, body) = req.into_parts();
 40
 41        let response = self
 42            .client
 43            .send(Request::from_parts(parts, convert_to_async_body(body)));
 44
 45        let handle = self.handle.clone();
 46
 47        HttpConnectorFuture::new(async move {
 48            let response = match response.await {
 49                Ok(response) => response,
 50                Err(err) => return Err(ConnectorError::other(err.into(), None)),
 51            };
 52            let (parts, body) = response.into_parts();
 53            let body = convert_to_sdk_body(body, handle).await;
 54
 55            Ok(HttpResponse::new(
 56                StatusCode::try_from(parts.status.as_u16()).unwrap(),
 57                body,
 58            ))
 59        })
 60    }
 61}
 62
 63#[derive(Clone)]
 64pub struct AwsHttpClient {
 65    client: Arc<dyn HttpClient>,
 66    handler: Handle,
 67}
 68
 69impl std::fmt::Debug for AwsHttpClient {
 70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result {
 71        f.debug_struct("AwsHttpClient").finish()
 72    }
 73}
 74
 75impl AwsHttpClient {
 76    pub fn new(client: Arc<dyn HttpClient>, handle: Handle) -> Self {
 77        Self {
 78            client,
 79            handler: handle,
 80        }
 81    }
 82}
 83
 84impl AwsClient for AwsHttpClient {
 85    fn http_connector(
 86        &self,
 87        _settings: &HttpConnectorSettings,
 88        _components: &RuntimeComponents,
 89    ) -> SharedHttpConnector {
 90        SharedHttpConnector::new(AwsHttpConnector {
 91            client: self.client.clone(),
 92            handle: self.handler.clone(),
 93        })
 94    }
 95}
 96
 97pub async fn convert_to_sdk_body(body: AsyncBody, handle: Handle) -> SdkBody {
 98    match body.0 {
 99        Inner::Empty => SdkBody::empty(),
100        Inner::Bytes(bytes) => SdkBody::from(bytes.into_inner()),
101        Inner::AsyncReader(mut reader) => {
102            let buffer = handle.spawn(async move {
103                let mut buffer = Vec::new();
104                let _ = reader.read_to_end(&mut buffer).await;
105                buffer
106            });
107
108            SdkBody::from(buffer.await.unwrap_or_default())
109        }
110    }
111}
112
113pub fn convert_to_async_body(body: SdkBody) -> AsyncBody {
114    match body.bytes() {
115        Some(bytes) => AsyncBody::from((*bytes).to_vec()),
116        None => AsyncBody::empty(),
117    }
118}