Fix an issue where LLM requests would block forever (#18830)

Mikayla Maki and Marshall Bowers created

Release Notes:

- N/A

---------

Co-authored-by: Marshall Bowers <elliott.codes@gmail.com>

Change summary

.github/workflows/deploy_collab.yml         |  3 -
crates/reqwest_client/src/reqwest_client.rs | 27 +++++++++++++++++++++-
2 files changed, 26 insertions(+), 4 deletions(-)

Detailed changes

.github/workflows/deploy_collab.yml 🔗

@@ -3,8 +3,7 @@ name: Publish Collab Server Image
 on:
   push:
     tags:
-      # Pause production deploys while we investigate an issue.
-      # - collab-production
+      - collab-production
       - collab-staging
 
 env:

crates/reqwest_client/src/reqwest_client.rs 🔗

@@ -163,8 +163,12 @@ impl futures::stream::Stream for WrappedBody {
             WrappedBodyInner::SyncReader(cursor) => {
                 let mut buf = Vec::new();
                 match cursor.read_to_end(&mut buf) {
-                    Ok(_) => {
-                        return Poll::Ready(Some(Ok(Bytes::from(buf))));
+                    Ok(bytes) => {
+                        if bytes == 0 {
+                            return Poll::Ready(None);
+                        } else {
+                            return Poll::Ready(Some(Ok(Bytes::from(buf))));
+                        }
                     }
                     Err(e) => return Poll::Ready(Some(Err(e))),
                 }
@@ -234,3 +238,22 @@ impl http_client::HttpClient for ReqwestClient {
         .boxed()
     }
 }
+
+#[cfg(test)]
+mod test {
+
+    use core::str;
+
+    use http_client::AsyncBody;
+    use smol::stream::StreamExt;
+
+    use crate::WrappedBody;
+
+    #[tokio::test]
+    async fn test_sync_streaming_upload() {
+        let mut body = WrappedBody::new(AsyncBody::from("hello there".to_string())).fuse();
+        let result = body.next().await.unwrap().unwrap();
+        assert!(body.next().await.is_none());
+        assert_eq!(str::from_utf8(&result).unwrap(), "hello there");
+    }
+}