From b2e844f2ecd66efbf54e1b899d6e8647b8d5795a Mon Sep 17 00:00:00 2001 From: Mikayla Maki Date: Sat, 12 Oct 2024 13:32:08 -0700 Subject: [PATCH] Fix an issue with using non-reusable body types with redirects (#19134) Closes #19131 Closes #19039 fixes the broken auto-updater. I had the bright idea of using streams as the most common unit of data transfer. Unfortunately, streams are not re-usable. So HTTP redirects that have a stream body (like our remote server and auto update downloads), don't redirect, as they can't reuse the stream. This PR fixes the problem and simplifies the AsyncBody implementation now that we're not using Isahc. Release Notes: - N/A --- Cargo.lock | 2 +- Cargo.toml | 1 + crates/http_client/Cargo.toml | 2 +- crates/http_client/src/async_body.rs | 57 +++++++++++-------- crates/http_client/src/http_client.rs | 3 +- crates/project/src/worktree_store.rs | 3 + crates/recent_projects/src/ssh_connections.rs | 17 +++++- crates/reqwest_client/Cargo.toml | 2 +- crates/reqwest_client/src/reqwest_client.rs | 39 +------------ 9 files changed, 59 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 28f38a0d1e22db95ec0f27908a278e3dc1492fc6..0055b1133f02325b7b03b783967dc1f0c7831299 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5529,13 +5529,13 @@ name = "http_client" version = "0.1.0" dependencies = [ "anyhow", + "bytes 1.7.1", "derive_more", "futures 0.3.30", "http 1.1.0", "log", "serde", "serde_json", - "smol", "url", ] diff --git a/Cargo.toml b/Cargo.toml index ac66fe369fa5c358652976bdcbad7dbab0590add..c6f7f95cc4ed7ca62da358002fd4ec1fe3fa5053 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -336,6 +336,7 @@ blade-graphics = { git = "https://github.com/kvark/blade", rev = "e142a3a5e678eb blade-macros = { git = "https://github.com/kvark/blade", rev = "e142a3a5e678eb6a13e642ad8401b1f3aa38e969" } blade-util = { git = "https://github.com/kvark/blade", rev = "e142a3a5e678eb6a13e642ad8401b1f3aa38e969" } blake3 = "1.5.3" +bytes = "1.0" cargo_metadata = "0.18" cargo_toml = "0.20" chrono = { version = "0.4", features = ["serde"] } diff --git a/crates/http_client/Cargo.toml b/crates/http_client/Cargo.toml index 2156f4d37a9d08dc68f0cc56ce721dafa0a154ab..ac8e254b84f60d2e34de07d79273fbb9e667e948 100644 --- a/crates/http_client/Cargo.toml +++ b/crates/http_client/Cargo.toml @@ -16,6 +16,7 @@ path = "src/http_client.rs" doctest = true [dependencies] +bytes.workspace = true anyhow.workspace = true derive_more.workspace = true futures.workspace = true @@ -23,5 +24,4 @@ http = "1.1" log.workspace = true serde.workspace = true serde_json.workspace = true -smol.workspace = true url.workspace = true diff --git a/crates/http_client/src/async_body.rs b/crates/http_client/src/async_body.rs index e2544f60fe5404a29347530064c4b13a9b04e117..6e3bfe4d3cb5ca5836f168398010b69f05fa112a 100644 --- a/crates/http_client/src/async_body.rs +++ b/crates/http_client/src/async_body.rs @@ -1,6 +1,11 @@ -use std::{borrow::Cow, io::Read, pin::Pin, task::Poll}; +use std::{ + io::{Cursor, Read}, + pin::Pin, + task::Poll, +}; -use futures::{AsyncRead, AsyncReadExt}; +use bytes::Bytes; +use futures::AsyncRead; /// Based on the implementation of AsyncBody in /// https://github.com/sagebind/isahc/blob/5c533f1ef4d6bdf1fd291b5103c22110f41d0bf0/src/body/mod.rs @@ -11,7 +16,7 @@ pub enum Inner { Empty, /// A body stored in memory. - SyncReader(std::io::Cursor>), + Bytes(std::io::Cursor), /// An asynchronous reader. AsyncReader(Pin>), @@ -32,6 +37,10 @@ impl AsyncBody { { Self(Inner::AsyncReader(Box::pin(read))) } + + pub fn from_bytes(bytes: Bytes) -> Self { + Self(Inner::Bytes(Cursor::new(bytes.clone()))) + } } impl Default for AsyncBody { @@ -46,27 +55,35 @@ impl From<()> for AsyncBody { } } -impl From> for AsyncBody { - fn from(body: Vec) -> Self { - Self(Inner::SyncReader(std::io::Cursor::new(Cow::Owned(body)))) +impl From for AsyncBody { + fn from(bytes: Bytes) -> Self { + Self::from_bytes(bytes) } } -impl From<&'_ [u8]> for AsyncBody { - fn from(body: &[u8]) -> Self { - body.to_vec().into() +impl From> for AsyncBody { + fn from(body: Vec) -> Self { + Self::from_bytes(body.into()) } } impl From for AsyncBody { fn from(body: String) -> Self { - body.into_bytes().into() + Self::from_bytes(body.into()) } } -impl From<&'_ str> for AsyncBody { - fn from(body: &str) -> Self { - body.as_bytes().into() +impl From<&'static [u8]> for AsyncBody { + #[inline] + fn from(s: &'static [u8]) -> Self { + Self::from_bytes(Bytes::from_static(s)) + } +} + +impl From<&'static str> for AsyncBody { + #[inline] + fn from(s: &'static str) -> Self { + Self::from_bytes(Bytes::from_static(s.as_bytes())) } } @@ -74,17 +91,7 @@ impl> From> for AsyncBody { fn from(body: Option) -> Self { match body { Some(body) => body.into(), - None => Self(Inner::Empty), - } - } -} - -impl std::io::Read for AsyncBody { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - match &mut self.0 { - Inner::Empty => Ok(0), - Inner::SyncReader(cursor) => cursor.read(buf), - Inner::AsyncReader(async_reader) => smol::block_on(async_reader.read(buf)), + None => Self::empty(), } } } @@ -100,7 +107,7 @@ impl futures::AsyncRead for AsyncBody { match inner { Inner::Empty => Poll::Ready(Ok(0)), // Blocking call is over an in-memory buffer - Inner::SyncReader(cursor) => Poll::Ready(cursor.read(buf)), + Inner::Bytes(cursor) => Poll::Ready(cursor.read(buf)), Inner::AsyncReader(async_reader) => { AsyncRead::poll_read(async_reader.as_mut(), cx, buf) } diff --git a/crates/http_client/src/http_client.rs b/crates/http_client/src/http_client.rs index fb2a2a3cf9a9d0fd58ac96889013fcc791af2f70..3d4a41f4a61df140952a873a3501673d70ca853c 100644 --- a/crates/http_client/src/http_client.rs +++ b/crates/http_client/src/http_client.rs @@ -17,8 +17,9 @@ use std::{ }; pub use url::Url; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct ReadTimeout(pub Duration); + impl Default for ReadTimeout { fn default() -> Self { Self(Duration::from_secs(5)) diff --git a/crates/project/src/worktree_store.rs b/crates/project/src/worktree_store.rs index 66fe14969676eb71ddee82368235f92c22a73078..6e04a884b9187729fe3d4cef124a011d61532882 100644 --- a/crates/project/src/worktree_store.rs +++ b/crates/project/src/worktree_store.rs @@ -231,6 +231,9 @@ impl WorktreeStore { if abs_path.starts_with("/~") { abs_path = abs_path[1..].to_string(); } + if abs_path.is_empty() { + abs_path = "~/".to_string(); + } let root_name = PathBuf::from(abs_path.clone()) .file_name() .unwrap() diff --git a/crates/recent_projects/src/ssh_connections.rs b/crates/recent_projects/src/ssh_connections.rs index af651333398099745ed1f755adfa88834a2a8354..0e33c6d36a73c915c80612687cc26462d83ee868 100644 --- a/crates/recent_projects/src/ssh_connections.rs +++ b/crates/recent_projects/src/ssh_connections.rs @@ -515,9 +515,24 @@ pub async fn open_ssh_project( let did_open_ssh_project = cx .update(|cx| { - workspace::open_ssh_project(window, connection_options, delegate, app_state, paths, cx) + workspace::open_ssh_project( + window, + connection_options, + delegate.clone(), + app_state, + paths, + cx, + ) })? .await; + let did_open_ssh_project = match did_open_ssh_project { + Ok(ok) => Ok(ok), + Err(e) => { + delegate.update_error(e.to_string(), cx); + Err(e) + } + }; + did_open_ssh_project } diff --git a/crates/reqwest_client/Cargo.toml b/crates/reqwest_client/Cargo.toml index 08a57ff4d7d937b4b90081c82806fdb77b1a55fe..ccf7dc35e23d14b28fcb65918e112c096d20e6b6 100644 --- a/crates/reqwest_client/Cargo.toml +++ b/crates/reqwest_client/Cargo.toml @@ -21,7 +21,7 @@ path = "examples/client.rs" [dependencies] anyhow.workspace = true -bytes = "1.0" +bytes.workspace = true futures.workspace = true http_client.workspace = true serde.workspace = true diff --git a/crates/reqwest_client/src/reqwest_client.rs b/crates/reqwest_client/src/reqwest_client.rs index 67df21be3c3ba133877fc8329afaa13d6fa08c05..ade115bcd0a5963b6f03d413db307c0788052209 100644 --- a/crates/reqwest_client/src/reqwest_client.rs +++ b/crates/reqwest_client/src/reqwest_client.rs @@ -1,4 +1,4 @@ -use std::{any::type_name, borrow::Cow, io::Read, mem, pin::Pin, sync::OnceLock, task::Poll}; +use std::{any::type_name, mem, pin::Pin, sync::OnceLock, task::Poll}; use anyhow::anyhow; use bytes::{BufMut, Bytes, BytesMut}; @@ -173,39 +173,6 @@ pub fn poll_read_buf( Poll::Ready(Ok(n)) } -struct SyncReader { - cursor: Option>>, -} - -impl SyncReader { - fn new(cursor: std::io::Cursor>) -> Self { - Self { - cursor: Some(cursor), - } - } -} - -impl futures::stream::Stream for SyncReader { - type Item = Result; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let Some(mut cursor) = self.cursor.take() else { - return Poll::Ready(None); - }; - - let mut buf = Vec::new(); - match cursor.read_to_end(&mut buf) { - Ok(_) => { - return Poll::Ready(Some(Ok(Bytes::from(buf)))); - } - Err(e) => return Poll::Ready(Some(Err(e))), - } - } -} - impl http_client::HttpClient for ReqwestClient { fn proxy(&self) -> Option<&http::Uri> { self.proxy.as_ref() @@ -238,9 +205,7 @@ impl http_client::HttpClient for ReqwestClient { } let request = request.body(match body.0 { http_client::Inner::Empty => reqwest::Body::default(), - http_client::Inner::SyncReader(cursor) => { - reqwest::Body::wrap_stream(SyncReader::new(cursor)) - } + http_client::Inner::Bytes(cursor) => cursor.into_inner().into(), http_client::Inner::AsyncReader(stream) => { reqwest::Body::wrap_stream(StreamReader::new(stream)) }