1use std::fmt;
2use std::sync::Arc;
3
4use aws_smithy_runtime_api::client::http::{
5 HttpClient as AwsClient, HttpConnector as AwsConnector,
6 HttpConnectorFuture as AwsConnectorFuture, HttpConnectorFuture, HttpConnectorSettings,
7 SharedHttpConnector,
8};
9use aws_smithy_runtime_api::client::orchestrator::{HttpRequest as AwsHttpRequest, HttpResponse};
10use aws_smithy_runtime_api::client::result::ConnectorError;
11use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
12use aws_smithy_runtime_api::http::{Headers, StatusCode};
13use aws_smithy_types::body::SdkBody;
14use futures::AsyncReadExt;
15use http_client::{AsyncBody, Inner};
16use http_client::{HttpClient, Request};
17use tokio::runtime::Handle;
18
19struct AwsHttpConnector {
20 client: Arc<dyn HttpClient>,
21 handle: Handle,
22}
23
24impl std::fmt::Debug for AwsHttpConnector {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result {
26 f.debug_struct("AwsHttpConnector").finish()
27 }
28}
29
30impl AwsConnector for AwsHttpConnector {
31 fn call(&self, request: AwsHttpRequest) -> AwsConnectorFuture {
32 let req = match request.try_into_http1x() {
33 Ok(req) => req,
34 Err(err) => {
35 return HttpConnectorFuture::ready(Err(ConnectorError::other(err.into(), None)));
36 }
37 };
38
39 let (parts, body) = req.into_parts();
40
41 let response = self
42 .client
43 .send(Request::from_parts(parts, convert_to_async_body(body)));
44
45 let handle = self.handle.clone();
46
47 HttpConnectorFuture::new(async move {
48 let response = match response.await {
49 Ok(response) => response,
50 Err(err) => return Err(ConnectorError::other(err.into(), None)),
51 };
52 let (parts, body) = response.into_parts();
53 let body = convert_to_sdk_body(body, handle).await;
54
55 let mut response =
56 HttpResponse::new(StatusCode::try_from(parts.status.as_u16()).unwrap(), body);
57
58 let headers = match Headers::try_from(parts.headers) {
59 Ok(headers) => headers,
60 Err(err) => return Err(ConnectorError::other(err.into(), None)),
61 };
62
63 *response.headers_mut() = headers;
64
65 Ok(response)
66 })
67 }
68}
69
70#[derive(Clone)]
71pub struct AwsHttpClient {
72 client: Arc<dyn HttpClient>,
73 handler: Handle,
74}
75
76impl std::fmt::Debug for AwsHttpClient {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result {
78 f.debug_struct("AwsHttpClient").finish()
79 }
80}
81
82impl AwsHttpClient {
83 pub fn new(client: Arc<dyn HttpClient>, handle: Handle) -> Self {
84 Self {
85 client,
86 handler: handle,
87 }
88 }
89}
90
91impl AwsClient for AwsHttpClient {
92 fn http_connector(
93 &self,
94 _settings: &HttpConnectorSettings,
95 _components: &RuntimeComponents,
96 ) -> SharedHttpConnector {
97 SharedHttpConnector::new(AwsHttpConnector {
98 client: self.client.clone(),
99 handle: self.handler.clone(),
100 })
101 }
102}
103
104pub async fn convert_to_sdk_body(body: AsyncBody, handle: Handle) -> SdkBody {
105 match body.0 {
106 Inner::Empty => SdkBody::empty(),
107 Inner::Bytes(bytes) => SdkBody::from(bytes.into_inner()),
108 Inner::AsyncReader(mut reader) => {
109 let buffer = handle.spawn(async move {
110 let mut buffer = Vec::new();
111 let _ = reader.read_to_end(&mut buffer).await;
112 buffer
113 });
114
115 SdkBody::from(buffer.await.unwrap_or_default())
116 }
117 }
118}
119
120pub fn convert_to_async_body(body: SdkBody) -> AsyncBody {
121 match body.bytes() {
122 Some(bytes) => AsyncBody::from((*bytes).to_vec()),
123 None => AsyncBody::empty(),
124 }
125}