bedrock: Fix remaining streaming delays (#33931)

tiagoq and Marshall Bowers created

Closes #26030

*Note: This is my first contribution to Zed*

This addresses a second streaming bottleneck in Bedrock that remained
after the initial fix in #28281 (released in preview 194).

The issue is in the mechanism used to convert Zed's internal `AsyncBody`
into the `SdkBody` expected by the Bedrock language provider. We are
using a non-streaming converter that buffers responses.

**How the fix works:**
The AWS SDK provides streaming-compatible converters to create `SdkBody`
instances, but these require the input body to implement the `Body`
trait from the `http-body` crate.

This PR enables streaming by implementing the required trait and
switching to the streaming-compatible converter.

**Changes (2 commits):**

 * 1st Commit - **Implement http-body Body trait for AsyncBody:**
   - Add `http-body = 1.0` dependency (already an indirect dependency)
   - Implement the `Body` trait for our existing `AsyncBody` type
- Uses `poll_frame` to read data chunks asynchronously, preserving
streaming behavior

 * 2nd Commit - **Use streaming-compatible AWS SDK converter:**
- Create `SdkBody` using `SdkBody::from_body_1_x()` with the new `Body`
trait implementation

**Details/FAQ:**

**Q: Why add another dependency?**
A: We tried to avoid adding a dependency, but the AWS SDK requires the
`Body` trait and `http-body` is where it's defined. The crate is already
an indirect dependency, making this a reasonable solution.

**Q: Why modify the shared `http_client` crate instead of just
`aws_bedrock_client`?**
A: We considered implementing the `Body` trait on a wrapper in
`aws_bedrock_client`, but since `AsyncBody` already uses `http` crate
types, extending support to the companion `http-body` crate seems
reasonable and may benefit other integrations.

**Q: How was this bottleneck discovered?**
A: After @5herlocked's initial streaming fix in #28281, I tested preview
194 and noticed streaming still had issues. I found a way to reproduce
the problem and chatted with @5herlocked about it. He immediately
pinpointed the exact location where the issue was occurring, his
diagnosis made this fix possible.

**Q: How does this relate to the previous fix?**
A: #28281 fixed buffering issues higher in the stack, but unfortunately
there was another bottleneck lower-down in the aws-http-client. This PR
addresses that separate buffering issue.

**Q: Does this use zero-copy or one-copy?**
A: The `Body` implementation includes one copy. Someone more
knowledgeable might be able to achieve a zero-copy approach, but we
opted for a conservative approach. The performance impact should not be
perceptible in typical usage.

**Testing:**
Confirmed that Bedrock streaming now works without buffering delays in a
local build.


Release Notes:

- Improved Bedrock streaming by eliminating response buffering delays

---------

Co-authored-by: Marshall Bowers <git@maxdeviant.com>

Change summary

Cargo.lock                                     |  3 -
Cargo.toml                                     |  1 
crates/aws_http_client/Cargo.toml              |  2 -
crates/aws_http_client/src/aws_http_client.rs  | 39 ++++---------------
crates/http_client/Cargo.toml                  |  1 
crates/http_client/src/async_body.rs           | 22 +++++++++++
crates/language_models/src/provider/bedrock.rs |  8 +---
7 files changed, 36 insertions(+), 40 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -1872,9 +1872,7 @@ version = "0.1.0"
 dependencies = [
  "aws-smithy-runtime-api",
  "aws-smithy-types",
- "futures 0.3.31",
  "http_client",
- "tokio",
  "workspace-hack",
 ]
 
@@ -7857,6 +7855,7 @@ dependencies = [
  "derive_more 0.99.19",
  "futures 0.3.31",
  "http 1.3.1",
+ "http-body 1.0.1",
  "log",
  "serde",
  "serde_json",

Cargo.toml 🔗

@@ -482,6 +482,7 @@ heed = { version = "0.21.0", features = ["read-txn-no-tls"] }
 hex = "0.4.3"
 html5ever = "0.27.0"
 http = "1.1"
+http-body = "1.0"
 hyper = "0.14"
 ignore = "0.4.22"
 image = "0.25.1"

crates/aws_http_client/Cargo.toml 🔗

@@ -17,7 +17,5 @@ default = []
 [dependencies]
 aws-smithy-runtime-api.workspace = true
 aws-smithy-types.workspace = true
-futures.workspace = true
 http_client.workspace = true
-tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
 workspace-hack.workspace = true

crates/aws_http_client/src/aws_http_client.rs 🔗

@@ -11,14 +11,11 @@ use aws_smithy_runtime_api::client::result::ConnectorError;
 use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
 use aws_smithy_runtime_api::http::{Headers, StatusCode};
 use aws_smithy_types::body::SdkBody;
-use futures::AsyncReadExt;
-use http_client::{AsyncBody, Inner};
+use http_client::AsyncBody;
 use http_client::{HttpClient, Request};
-use tokio::runtime::Handle;
 
 struct AwsHttpConnector {
     client: Arc<dyn HttpClient>,
-    handle: Handle,
 }
 
 impl std::fmt::Debug for AwsHttpConnector {
@@ -42,18 +39,17 @@ impl AwsConnector for AwsHttpConnector {
             .client
             .send(Request::from_parts(parts, convert_to_async_body(body)));
 
-        let handle = self.handle.clone();
-
         HttpConnectorFuture::new(async move {
             let response = match response.await {
                 Ok(response) => response,
                 Err(err) => return Err(ConnectorError::other(err.into(), None)),
             };
             let (parts, body) = response.into_parts();
-            let body = convert_to_sdk_body(body, handle).await;
 
-            let mut response =
-                HttpResponse::new(StatusCode::try_from(parts.status.as_u16()).unwrap(), body);
+            let mut response = HttpResponse::new(
+                StatusCode::try_from(parts.status.as_u16()).unwrap(),
+                convert_to_sdk_body(body),
+            );
 
             let headers = match Headers::try_from(parts.headers) {
                 Ok(headers) => headers,
@@ -70,7 +66,6 @@ impl AwsConnector for AwsHttpConnector {
 #[derive(Clone)]
 pub struct AwsHttpClient {
     client: Arc<dyn HttpClient>,
-    handler: Handle,
 }
 
 impl std::fmt::Debug for AwsHttpClient {
@@ -80,11 +75,8 @@ impl std::fmt::Debug for AwsHttpClient {
 }
 
 impl AwsHttpClient {
-    pub fn new(client: Arc<dyn HttpClient>, handle: Handle) -> Self {
-        Self {
-            client,
-            handler: handle,
-        }
+    pub fn new(client: Arc<dyn HttpClient>) -> Self {
+        Self { client }
     }
 }
 
@@ -96,25 +88,12 @@ impl AwsClient for AwsHttpClient {
     ) -> SharedHttpConnector {
         SharedHttpConnector::new(AwsHttpConnector {
             client: self.client.clone(),
-            handle: self.handler.clone(),
         })
     }
 }
 
-pub async fn convert_to_sdk_body(body: AsyncBody, handle: Handle) -> SdkBody {
-    match body.0 {
-        Inner::Empty => SdkBody::empty(),
-        Inner::Bytes(bytes) => SdkBody::from(bytes.into_inner()),
-        Inner::AsyncReader(mut reader) => {
-            let buffer = handle.spawn(async move {
-                let mut buffer = Vec::new();
-                let _ = reader.read_to_end(&mut buffer).await;
-                buffer
-            });
-
-            SdkBody::from(buffer.await.unwrap_or_default())
-        }
-    }
+pub fn convert_to_sdk_body(body: AsyncBody) -> SdkBody {
+    SdkBody::from_body_1_x(body)
 }
 
 pub fn convert_to_async_body(body: SdkBody) -> AsyncBody {

crates/http_client/Cargo.toml 🔗

@@ -21,6 +21,7 @@ anyhow.workspace = true
 derive_more.workspace = true
 futures.workspace = true
 http.workspace = true
+http-body.workspace = true
 log.workspace = true
 serde.workspace = true
 serde_json.workspace = true

crates/http_client/src/async_body.rs 🔗

@@ -6,6 +6,7 @@ use std::{
 
 use bytes::Bytes;
 use futures::AsyncRead;
+use http_body::{Body, Frame};
 
 /// Based on the implementation of AsyncBody in
 /// <https://github.com/sagebind/isahc/blob/5c533f1ef4d6bdf1fd291b5103c22110f41d0bf0/src/body/mod.rs>.
@@ -114,3 +115,24 @@ impl futures::AsyncRead for AsyncBody {
         }
     }
 }
+
+impl Body for AsyncBody {
+    type Data = Bytes;
+    type Error = std::io::Error;
+
+    fn poll_frame(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
+        let mut buffer = vec![0; 8192];
+        match AsyncRead::poll_read(self.as_mut(), cx, &mut buffer) {
+            Poll::Ready(Ok(0)) => Poll::Ready(None),
+            Poll::Ready(Ok(n)) => {
+                let data = Bytes::copy_from_slice(&buffer[..n]);
+                Poll::Ready(Some(Ok(Frame::data(data))))
+            }
+            Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}

crates/language_models/src/provider/bedrock.rs 🔗

@@ -258,13 +258,9 @@ impl BedrockLanguageModelProvider {
             }),
         });
 
-        let tokio_handle = Tokio::handle(cx);
-
-        let coerced_client = AwsHttpClient::new(http_client.clone(), tokio_handle.clone());
-
         Self {
-            http_client: coerced_client,
-            handler: tokio_handle.clone(),
+            http_client: AwsHttpClient::new(http_client.clone()),
+            handler: Tokio::handle(cx),
             state,
         }
     }