From 5387a6f7f93e7b7ddd9877e00efd89195b469e4c Mon Sep 17 00:00:00 2001 From: Mikayla Maki Date: Mon, 7 Oct 2024 13:03:26 -0700 Subject: [PATCH] Fix an issue where LLM requests would block forever (#18830) Release Notes: - N/A --------- Co-authored-by: Marshall Bowers --- .github/workflows/deploy_collab.yml | 3 +-- crates/reqwest_client/src/reqwest_client.rs | 27 +++++++++++++++++++-- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/.github/workflows/deploy_collab.yml b/.github/workflows/deploy_collab.yml index cf3ffb4dbc6d632d0f2cda4f91042bb2f2db52c3..1e6e6cf2803e206eae4d6baa9a9dfbd62bb78797 100644 --- a/.github/workflows/deploy_collab.yml +++ b/.github/workflows/deploy_collab.yml @@ -3,8 +3,7 @@ name: Publish Collab Server Image on: push: tags: - # Pause production deploys while we investigate an issue. - # - collab-production + - collab-production - collab-staging env: diff --git a/crates/reqwest_client/src/reqwest_client.rs b/crates/reqwest_client/src/reqwest_client.rs index b5c274d5994deb8f765d15ba805bf1351e8a5f26..f8698b908096b8b8595ecc5d771b9e6bf535e20c 100644 --- a/crates/reqwest_client/src/reqwest_client.rs +++ b/crates/reqwest_client/src/reqwest_client.rs @@ -163,8 +163,12 @@ impl futures::stream::Stream for WrappedBody { WrappedBodyInner::SyncReader(cursor) => { let mut buf = Vec::new(); match cursor.read_to_end(&mut buf) { - Ok(_) => { - return Poll::Ready(Some(Ok(Bytes::from(buf)))); + Ok(bytes) => { + if bytes == 0 { + return Poll::Ready(None); + } else { + return Poll::Ready(Some(Ok(Bytes::from(buf)))); + } } Err(e) => return Poll::Ready(Some(Err(e))), } @@ -234,3 +238,22 @@ impl http_client::HttpClient for ReqwestClient { .boxed() } } + +#[cfg(test)] +mod test { + + use core::str; + + use http_client::AsyncBody; + use smol::stream::StreamExt; + + use crate::WrappedBody; + + #[tokio::test] + async fn test_sync_streaming_upload() { + let mut body = WrappedBody::new(AsyncBody::from("hello there".to_string())).fuse(); + let result = body.next().await.unwrap().unwrap(); + assert!(body.next().await.is_none()); + assert_eq!(str::from_utf8(&result).unwrap(), "hello there"); + } +}