Fix an issue with using non-reusable body types with redirects (#19134)

Mikayla Maki created

Closes #19131
Closes #19039

fixes the broken auto-updater.

I had the bright idea of using streams as the most common unit of data
transfer. Unfortunately, streams are not re-usable. So HTTP redirects
that have a stream body (like our remote server and auto update
downloads), don't redirect, as they can't reuse the stream. This PR
fixes the problem and simplifies the AsyncBody implementation now that
we're not using Isahc.

Release Notes:

- N/A

Change summary

Cargo.lock                                    |  2 
Cargo.toml                                    |  1 
crates/http_client/Cargo.toml                 |  2 
crates/http_client/src/async_body.rs          | 57 +++++++++++---------
crates/http_client/src/http_client.rs         |  3 
crates/project/src/worktree_store.rs          |  3 +
crates/recent_projects/src/ssh_connections.rs | 17 +++++
crates/reqwest_client/Cargo.toml              |  2 
crates/reqwest_client/src/reqwest_client.rs   | 39 -------------
9 files changed, 59 insertions(+), 67 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -5529,13 +5529,13 @@ name = "http_client"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "bytes 1.7.1",
  "derive_more",
  "futures 0.3.30",
  "http 1.1.0",
  "log",
  "serde",
  "serde_json",
- "smol",
  "url",
 ]
 

Cargo.toml 🔗

@@ -336,6 +336,7 @@ blade-graphics = { git = "https://github.com/kvark/blade", rev = "e142a3a5e678eb
 blade-macros = { git = "https://github.com/kvark/blade", rev = "e142a3a5e678eb6a13e642ad8401b1f3aa38e969" }
 blade-util = { git = "https://github.com/kvark/blade", rev = "e142a3a5e678eb6a13e642ad8401b1f3aa38e969" }
 blake3 = "1.5.3"
+bytes = "1.0"
 cargo_metadata = "0.18"
 cargo_toml = "0.20"
 chrono = { version = "0.4", features = ["serde"] }

crates/http_client/Cargo.toml 🔗

@@ -16,6 +16,7 @@ path = "src/http_client.rs"
 doctest = true
 
 [dependencies]
+bytes.workspace = true
 anyhow.workspace = true
 derive_more.workspace = true
 futures.workspace = true
@@ -23,5 +24,4 @@ http = "1.1"
 log.workspace = true
 serde.workspace = true
 serde_json.workspace = true
-smol.workspace = true
 url.workspace = true

crates/http_client/src/async_body.rs 🔗

@@ -1,6 +1,11 @@
-use std::{borrow::Cow, io::Read, pin::Pin, task::Poll};
+use std::{
+    io::{Cursor, Read},
+    pin::Pin,
+    task::Poll,
+};
 
-use futures::{AsyncRead, AsyncReadExt};
+use bytes::Bytes;
+use futures::AsyncRead;
 
 /// Based on the implementation of AsyncBody in
 /// https://github.com/sagebind/isahc/blob/5c533f1ef4d6bdf1fd291b5103c22110f41d0bf0/src/body/mod.rs
@@ -11,7 +16,7 @@ pub enum Inner {
     Empty,
 
     /// A body stored in memory.
-    SyncReader(std::io::Cursor<Cow<'static, [u8]>>),
+    Bytes(std::io::Cursor<Bytes>),
 
     /// An asynchronous reader.
     AsyncReader(Pin<Box<dyn futures::AsyncRead + Send + Sync>>),
@@ -32,6 +37,10 @@ impl AsyncBody {
     {
         Self(Inner::AsyncReader(Box::pin(read)))
     }
+
+    pub fn from_bytes(bytes: Bytes) -> Self {
+        Self(Inner::Bytes(Cursor::new(bytes.clone())))
+    }
 }
 
 impl Default for AsyncBody {
@@ -46,27 +55,35 @@ impl From<()> for AsyncBody {
     }
 }
 
-impl From<Vec<u8>> for AsyncBody {
-    fn from(body: Vec<u8>) -> Self {
-        Self(Inner::SyncReader(std::io::Cursor::new(Cow::Owned(body))))
+impl From<Bytes> for AsyncBody {
+    fn from(bytes: Bytes) -> Self {
+        Self::from_bytes(bytes)
     }
 }
 
-impl From<&'_ [u8]> for AsyncBody {
-    fn from(body: &[u8]) -> Self {
-        body.to_vec().into()
+impl From<Vec<u8>> for AsyncBody {
+    fn from(body: Vec<u8>) -> Self {
+        Self::from_bytes(body.into())
     }
 }
 
 impl From<String> for AsyncBody {
     fn from(body: String) -> Self {
-        body.into_bytes().into()
+        Self::from_bytes(body.into())
     }
 }
 
-impl From<&'_ str> for AsyncBody {
-    fn from(body: &str) -> Self {
-        body.as_bytes().into()
+impl From<&'static [u8]> for AsyncBody {
+    #[inline]
+    fn from(s: &'static [u8]) -> Self {
+        Self::from_bytes(Bytes::from_static(s))
+    }
+}
+
+impl From<&'static str> for AsyncBody {
+    #[inline]
+    fn from(s: &'static str) -> Self {
+        Self::from_bytes(Bytes::from_static(s.as_bytes()))
     }
 }
 
@@ -74,17 +91,7 @@ impl<T: Into<Self>> From<Option<T>> for AsyncBody {
     fn from(body: Option<T>) -> Self {
         match body {
             Some(body) => body.into(),
-            None => Self(Inner::Empty),
-        }
-    }
-}
-
-impl std::io::Read for AsyncBody {
-    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
-        match &mut self.0 {
-            Inner::Empty => Ok(0),
-            Inner::SyncReader(cursor) => cursor.read(buf),
-            Inner::AsyncReader(async_reader) => smol::block_on(async_reader.read(buf)),
+            None => Self::empty(),
         }
     }
 }
@@ -100,7 +107,7 @@ impl futures::AsyncRead for AsyncBody {
         match inner {
             Inner::Empty => Poll::Ready(Ok(0)),
             // Blocking call is over an in-memory buffer
-            Inner::SyncReader(cursor) => Poll::Ready(cursor.read(buf)),
+            Inner::Bytes(cursor) => Poll::Ready(cursor.read(buf)),
             Inner::AsyncReader(async_reader) => {
                 AsyncRead::poll_read(async_reader.as_mut(), cx, buf)
             }

crates/http_client/src/http_client.rs 🔗

@@ -17,8 +17,9 @@ use std::{
 };
 pub use url::Url;
 
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct ReadTimeout(pub Duration);
+
 impl Default for ReadTimeout {
     fn default() -> Self {
         Self(Duration::from_secs(5))

crates/project/src/worktree_store.rs 🔗

@@ -231,6 +231,9 @@ impl WorktreeStore {
         if abs_path.starts_with("/~") {
             abs_path = abs_path[1..].to_string();
         }
+        if abs_path.is_empty() {
+            abs_path = "~/".to_string();
+        }
         let root_name = PathBuf::from(abs_path.clone())
             .file_name()
             .unwrap()

crates/recent_projects/src/ssh_connections.rs 🔗

@@ -515,9 +515,24 @@ pub async fn open_ssh_project(
 
     let did_open_ssh_project = cx
         .update(|cx| {
-            workspace::open_ssh_project(window, connection_options, delegate, app_state, paths, cx)
+            workspace::open_ssh_project(
+                window,
+                connection_options,
+                delegate.clone(),
+                app_state,
+                paths,
+                cx,
+            )
         })?
         .await;
 
+    let did_open_ssh_project = match did_open_ssh_project {
+        Ok(ok) => Ok(ok),
+        Err(e) => {
+            delegate.update_error(e.to_string(), cx);
+            Err(e)
+        }
+    };
+
     did_open_ssh_project
 }

crates/reqwest_client/Cargo.toml 🔗

@@ -21,7 +21,7 @@ path = "examples/client.rs"
 
 [dependencies]
 anyhow.workspace = true
-bytes = "1.0"
+bytes.workspace = true
 futures.workspace = true
 http_client.workspace = true
 serde.workspace = true

crates/reqwest_client/src/reqwest_client.rs 🔗

@@ -1,4 +1,4 @@
-use std::{any::type_name, borrow::Cow, io::Read, mem, pin::Pin, sync::OnceLock, task::Poll};
+use std::{any::type_name, mem, pin::Pin, sync::OnceLock, task::Poll};
 
 use anyhow::anyhow;
 use bytes::{BufMut, Bytes, BytesMut};
@@ -173,39 +173,6 @@ pub fn poll_read_buf(
     Poll::Ready(Ok(n))
 }
 
-struct SyncReader {
-    cursor: Option<std::io::Cursor<Cow<'static, [u8]>>>,
-}
-
-impl SyncReader {
-    fn new(cursor: std::io::Cursor<Cow<'static, [u8]>>) -> Self {
-        Self {
-            cursor: Some(cursor),
-        }
-    }
-}
-
-impl futures::stream::Stream for SyncReader {
-    type Item = Result<Bytes, std::io::Error>;
-
-    fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
-        _cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Option<Self::Item>> {
-        let Some(mut cursor) = self.cursor.take() else {
-            return Poll::Ready(None);
-        };
-
-        let mut buf = Vec::new();
-        match cursor.read_to_end(&mut buf) {
-            Ok(_) => {
-                return Poll::Ready(Some(Ok(Bytes::from(buf))));
-            }
-            Err(e) => return Poll::Ready(Some(Err(e))),
-        }
-    }
-}
-
 impl http_client::HttpClient for ReqwestClient {
     fn proxy(&self) -> Option<&http::Uri> {
         self.proxy.as_ref()
@@ -238,9 +205,7 @@ impl http_client::HttpClient for ReqwestClient {
         }
         let request = request.body(match body.0 {
             http_client::Inner::Empty => reqwest::Body::default(),
-            http_client::Inner::SyncReader(cursor) => {
-                reqwest::Body::wrap_stream(SyncReader::new(cursor))
-            }
+            http_client::Inner::Bytes(cursor) => cursor.into_inner().into(),
             http_client::Inner::AsyncReader(stream) => {
                 reqwest::Body::wrap_stream(StreamReader::new(stream))
             }