Use async streaming when detecting matches

Max Brunsfeld created

Change summary

crates/dap_adapters/src/go.rs          | 17 ++++++++--
crates/extension_host/src/wasm_host.rs |  4 ++
crates/fs/src/fs.rs                    | 12 ++++---
crates/project/src/project_search.rs   | 25 ++++++++--------
crates/project/src/search.rs           | 41 ++++++++++++++-------------
crates/project/src/worktree_store.rs   | 11 +++----
6 files changed, 62 insertions(+), 48 deletions(-)

Detailed changes

crates/dap_adapters/src/go.rs 🔗

@@ -8,6 +8,7 @@ use dap::{
     },
 };
 use fs::Fs;
+use futures::AsyncReadExt as _;
 use gpui::{AsyncApp, SharedString};
 use language::LanguageName;
 use log::warn;
@@ -557,10 +558,18 @@ async fn handle_envs(
             continue;
         };
 
-        if let Ok(file) = fs.open_sync(&path).await {
-            let file_envs: HashMap<String, String> = dotenvy::from_read_iter(file)
-                .filter_map(Result::ok)
-                .collect();
+        if let Ok(mut file) = fs.open_read(&path).await {
+            let mut bytes = Vec::new();
+            if file.read_to_end(&mut bytes).await.is_err() {
+                warn!("While starting Go debug session: failed to read env file {path:?}");
+                continue;
+            }
+
+            let file_envs: HashMap<String, String> =
+                dotenvy::from_read_iter(std::io::Cursor::new(bytes))
+                    .filter_map(Result::ok)
+                    .collect();
+
             envs.extend(file_envs.iter().map(|(k, v)| (k.clone(), v.clone())));
             env_vars.extend(file_envs);
         } else {

crates/extension_host/src/wasm_host.rs 🔗

@@ -12,6 +12,7 @@ use extension::{
     WorktreeDelegate,
 };
 use fs::{Fs, normalize_path};
+use futures::AsyncReadExt;
 use futures::future::LocalBoxFuture;
 use futures::{
     Future, FutureExt, StreamExt as _,
@@ -763,13 +764,14 @@ impl WasmExtension {
 
         let mut wasm_file = wasm_host
             .fs
-            .open_sync(&path)
+            .open_read(&path)
             .await
             .context(format!("opening wasm file, path: {path:?}"))?;
 
         let mut wasm_bytes = Vec::new();
         wasm_file
             .read_to_end(&mut wasm_bytes)
+            .await
             .context(format!("reading wasm file, path: {path:?}"))?;
 
         wasm_host

crates/fs/src/fs.rs 🔗

@@ -66,6 +66,8 @@ use std::ffi::OsStr;
 #[cfg(any(test, feature = "test-support"))]
 pub use fake_git_repo::{LOAD_HEAD_TEXT_TASK, LOAD_INDEX_TEXT_TASK};
 
+pub type AsyncReadBox = Pin<Box<dyn AsyncRead + Send>>;
+
 pub trait Watcher: Send + Sync {
     fn add(&self, path: &Path) -> Result<()>;
     fn remove(&self, path: &Path) -> Result<()>;
@@ -116,7 +118,7 @@ pub trait Fs: Send + Sync {
         self.remove_file(path, options).await
     }
     async fn open_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>>;
-    async fn open_sync(&self, path: &Path) -> Result<Box<dyn io::Read + Send + Sync>>;
+    async fn open_read(&self, path: &Path) -> Result<AsyncReadBox>;
     async fn load(&self, path: &Path) -> Result<String> {
         Ok(String::from_utf8(self.load_bytes(path).await?)?)
     }
@@ -732,8 +734,8 @@ impl Fs for RealFs {
         Ok(())
     }
 
-    async fn open_sync(&self, path: &Path) -> Result<Box<dyn io::Read + Send + Sync>> {
-        Ok(Box::new(std::fs::File::open(path)?))
+    async fn open_read(&self, path: &Path) -> Result<AsyncReadBox> {
+        Ok(Box::pin(smol::fs::File::open(path).await?))
     }
 
     async fn open_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>> {
@@ -2530,9 +2532,9 @@ impl Fs for FakeFs {
         Ok(())
     }
 
-    async fn open_sync(&self, path: &Path) -> Result<Box<dyn io::Read + Send + Sync>> {
+    async fn open_read(&self, path: &Path) -> Result<AsyncReadBox> {
         let bytes = self.load_internal(path).await?;
-        Ok(Box::new(io::Cursor::new(bytes)))
+        Ok(Box::pin(futures::io::Cursor::new(bytes)))
     }
 
     async fn open_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>> {

crates/project/src/project_search.rs 🔗

@@ -1,7 +1,6 @@
 use std::{
     cell::LazyCell,
     collections::BTreeSet,
-    io::{BufReader, Cursor, Read},
     ops::Range,
     path::{Path, PathBuf},
     pin::pin,
@@ -11,7 +10,7 @@ use std::{
 use anyhow::Context;
 use collections::HashSet;
 use fs::Fs;
-use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
+use futures::{AsyncBufReadExt, SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
 use gpui::{App, AppContext, AsyncApp, Entity, Task};
 use language::{Buffer, BufferSnapshot};
 use parking_lot::Mutex;
@@ -685,17 +684,22 @@ impl RequestHandler<'_> {
         _ = (async move || -> anyhow::Result<()> {
             let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
 
-            // Avoid blocking IO here: cancellation of the search is implemented via task drop, and a
-            // synchronous `std::fs::File::open` / `Read::read` can delay task cancellation for a long time.
-            let contents = self
+            let file = self
                 .fs
                 .context("Trying to query filesystem in remote project search")?
-                .load_bytes(&abs_path)
+                .open_read(&abs_path)
                 .await?;
 
+            let mut file = futures::io::BufReader::new(file);
+
             // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
-            // That way we can still match files without having to look at "obviously binary" files.
-            if let Err(error) = std::str::from_utf8(&contents) {
+            // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
+            //
+            // Use `fill_buf` so we can "peek" without consuming bytes, and reuse the same stream for detection.
+            let buffer = file.fill_buf().await?;
+            let prefix_len = buffer.len().min(8192);
+
+            if let Err(error) = std::str::from_utf8(&buffer[..prefix_len]) {
                 if let Some(starting_position) = error.error_len() {
                     log::debug!(
                         "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
@@ -704,10 +708,7 @@ impl RequestHandler<'_> {
                 }
             }
 
-            let file: Box<dyn Read + Send + Sync> = Box::new(Cursor::new(contents));
-            let file = BufReader::new(file);
-
-            if self.query.detect(file).unwrap_or(false) {
+            if self.query.detect(file).await.unwrap_or(false) {
                 // Yes, we should scan the whole file.
                 entry.should_scan_tx.send(entry.path).await?;
             }

crates/project/src/search.rs 🔗

@@ -2,13 +2,13 @@ use aho_corasick::{AhoCorasick, AhoCorasickBuilder};
 use anyhow::Result;
 use client::proto;
 use fancy_regex::{Captures, Regex, RegexBuilder};
+use futures::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt};
 use gpui::Entity;
 use itertools::Itertools as _;
 use language::{Buffer, BufferSnapshot, CharKind};
 use smol::future::yield_now;
 use std::{
     borrow::Cow,
-    io::{BufRead, BufReader, Read},
     ops::Range,
     sync::{Arc, LazyLock},
 };
@@ -326,37 +326,38 @@ impl SearchQuery {
         }
     }
 
-    pub(crate) fn detect(
-        &self,
-        mut reader: BufReader<Box<dyn Read + Send + Sync>>,
-    ) -> Result<bool> {
-        if self.as_str().is_empty() {
+    pub(crate) async fn detect(&self, mut reader: impl AsyncBufRead + Unpin) -> Result<bool> {
+        let query_str = self.as_str();
+        let needle_len = query_str.as_bytes().len();
+        if needle_len == 0 {
             return Ok(false);
         }
 
+        let mut text = String::new();
         match self {
             Self::Text { search, .. } => {
-                let mat = search.stream_find_iter(reader).next();
-                match mat {
-                    Some(Ok(_)) => Ok(true),
-                    Some(Err(err)) => Err(err.into()),
-                    None => Ok(false),
+                if query_str.contains('\n') {
+                    reader.read_to_string(&mut text).await?;
+                    Ok(search.find(&text).is_some())
+                } else {
+                    while reader.read_line(&mut text).await? > 0 {
+                        if search.find(&text).is_some() {
+                            return Ok(true);
+                        }
+                        text.clear();
+                    }
+                    Ok(false)
                 }
             }
             Self::Regex {
                 regex, multiline, ..
             } => {
                 if *multiline {
-                    let mut text = String::new();
-                    if let Err(err) = reader.read_to_string(&mut text) {
-                        Err(err.into())
-                    } else {
-                        Ok(regex.find(&text)?.is_some())
-                    }
+                    reader.read_to_string(&mut text).await?;
+                    Ok(regex.find(&text)?.is_some())
                 } else {
-                    for line in reader.lines() {
-                        let line = line?;
-                        if regex.find(&line)?.is_some() {
+                    while reader.read_line(&mut text).await? > 0 {
+                        if regex.find(&text)?.is_some() {
                             return Ok(true);
                         }
                     }

crates/project/src/worktree_store.rs 🔗

@@ -1,5 +1,4 @@
 use std::{
-    io::{BufRead, BufReader},
     path::{Path, PathBuf},
     pin::pin,
     sync::{Arc, atomic::AtomicUsize},
@@ -8,7 +7,7 @@ use std::{
 use anyhow::{Context as _, Result, anyhow, bail};
 use collections::{HashMap, HashSet};
 use fs::{Fs, copy_recursive};
-use futures::{FutureExt, SinkExt, future::Shared};
+use futures::{AsyncBufReadExt as _, FutureExt, SinkExt, future::Shared, io::BufReader};
 use gpui::{
     App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity,
 };
@@ -1024,15 +1023,15 @@ impl WorktreeStore {
         let mut input = pin!(input);
         while let Some(mut entry) = input.next().await {
             let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
-            let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
+            let Some(file) = fs.open_read(&abs_path).await.log_err() else {
                 continue;
             };
 
             let mut file = BufReader::new(file);
-            let file_start = file.fill_buf()?;
+            let file_start = file.fill_buf().await?;
 
             if let Err(Some(starting_position)) =
-                std::str::from_utf8(file_start).map_err(|e| e.error_len())
+                std::str::from_utf8(&file_start).map_err(|e| e.error_len())
             {
                 // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
                 // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
@@ -1042,7 +1041,7 @@ impl WorktreeStore {
                 continue;
             }
 
-            if query.detect(file).unwrap_or(false) {
+            if query.detect(file).await.unwrap_or(false) {
                 entry.respond.send(entry.path).await?
             }
         }