From d5b1fafb1f9eff32e33bb990329e3c05b0e91774 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Thu, 18 Dec 2025 12:49:04 -0800 Subject: [PATCH] Use async streaming when detecting matches --- crates/dap_adapters/src/go.rs | 17 ++++++++--- crates/extension_host/src/wasm_host.rs | 4 ++- crates/fs/src/fs.rs | 12 ++++---- crates/project/src/project_search.rs | 25 ++++++++-------- crates/project/src/search.rs | 41 +++++++++++++------------- crates/project/src/worktree_store.rs | 11 ++++--- 6 files changed, 62 insertions(+), 48 deletions(-) diff --git a/crates/dap_adapters/src/go.rs b/crates/dap_adapters/src/go.rs index d3253d5fe250f7228ebddec15a691ac650a19c89..3c6752ed3ebfe940e417efc441e75b7aa92efc89 100644 --- a/crates/dap_adapters/src/go.rs +++ b/crates/dap_adapters/src/go.rs @@ -8,6 +8,7 @@ use dap::{ }, }; use fs::Fs; +use futures::AsyncReadExt as _; use gpui::{AsyncApp, SharedString}; use language::LanguageName; use log::warn; @@ -557,10 +558,18 @@ async fn handle_envs( continue; }; - if let Ok(file) = fs.open_sync(&path).await { - let file_envs: HashMap = dotenvy::from_read_iter(file) - .filter_map(Result::ok) - .collect(); + if let Ok(mut file) = fs.open_read(&path).await { + let mut bytes = Vec::new(); + if file.read_to_end(&mut bytes).await.is_err() { + warn!("While starting Go debug session: failed to read env file {path:?}"); + continue; + } + + let file_envs: HashMap = + dotenvy::from_read_iter(std::io::Cursor::new(bytes)) + .filter_map(Result::ok) + .collect(); + envs.extend(file_envs.iter().map(|(k, v)| (k.clone(), v.clone()))); env_vars.extend(file_envs); } else { diff --git a/crates/extension_host/src/wasm_host.rs b/crates/extension_host/src/wasm_host.rs index a6e5768f16243ce6c6a4d250002e29d5db06a071..0f92ab60847015d6a0fbebed4ec0318ced7c507e 100644 --- a/crates/extension_host/src/wasm_host.rs +++ b/crates/extension_host/src/wasm_host.rs @@ -12,6 +12,7 @@ use extension::{ WorktreeDelegate, }; use fs::{Fs, normalize_path}; +use futures::AsyncReadExt; use futures::future::LocalBoxFuture; use futures::{ Future, FutureExt, StreamExt as _, @@ -763,13 +764,14 @@ impl WasmExtension { let mut wasm_file = wasm_host .fs - .open_sync(&path) + .open_read(&path) .await .context(format!("opening wasm file, path: {path:?}"))?; let mut wasm_bytes = Vec::new(); wasm_file .read_to_end(&mut wasm_bytes) + .await .context(format!("reading wasm file, path: {path:?}"))?; wasm_host diff --git a/crates/fs/src/fs.rs b/crates/fs/src/fs.rs index 2cbbf61a21e145464e9dbec01ace3b5510709d0d..24e224820c774d430ccea2794a898bcae05fab81 100644 --- a/crates/fs/src/fs.rs +++ b/crates/fs/src/fs.rs @@ -66,6 +66,8 @@ use std::ffi::OsStr; #[cfg(any(test, feature = "test-support"))] pub use fake_git_repo::{LOAD_HEAD_TEXT_TASK, LOAD_INDEX_TEXT_TASK}; +pub type AsyncReadBox = Pin>; + pub trait Watcher: Send + Sync { fn add(&self, path: &Path) -> Result<()>; fn remove(&self, path: &Path) -> Result<()>; @@ -116,7 +118,7 @@ pub trait Fs: Send + Sync { self.remove_file(path, options).await } async fn open_handle(&self, path: &Path) -> Result>; - async fn open_sync(&self, path: &Path) -> Result>; + async fn open_read(&self, path: &Path) -> Result; async fn load(&self, path: &Path) -> Result { Ok(String::from_utf8(self.load_bytes(path).await?)?) } @@ -732,8 +734,8 @@ impl Fs for RealFs { Ok(()) } - async fn open_sync(&self, path: &Path) -> Result> { - Ok(Box::new(std::fs::File::open(path)?)) + async fn open_read(&self, path: &Path) -> Result { + Ok(Box::pin(smol::fs::File::open(path).await?)) } async fn open_handle(&self, path: &Path) -> Result> { @@ -2530,9 +2532,9 @@ impl Fs for FakeFs { Ok(()) } - async fn open_sync(&self, path: &Path) -> Result> { + async fn open_read(&self, path: &Path) -> Result { let bytes = self.load_internal(path).await?; - Ok(Box::new(io::Cursor::new(bytes))) + Ok(Box::pin(futures::io::Cursor::new(bytes))) } async fn open_handle(&self, path: &Path) -> Result> { diff --git a/crates/project/src/project_search.rs b/crates/project/src/project_search.rs index a08d75e3a1a1b4c776d68d1397acec6c19495fda..1aec41b7f9d3d74788286834909d321d5043f9c3 100644 --- a/crates/project/src/project_search.rs +++ b/crates/project/src/project_search.rs @@ -1,7 +1,6 @@ use std::{ cell::LazyCell, collections::BTreeSet, - io::{BufReader, Cursor, Read}, ops::Range, path::{Path, PathBuf}, pin::pin, @@ -11,7 +10,7 @@ use std::{ use anyhow::Context; use collections::HashSet; use fs::Fs; -use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered}; +use futures::{AsyncBufReadExt, SinkExt, StreamExt, select_biased, stream::FuturesOrdered}; use gpui::{App, AppContext, AsyncApp, Entity, Task}; use language::{Buffer, BufferSnapshot}; use parking_lot::Mutex; @@ -685,17 +684,22 @@ impl RequestHandler<'_> { _ = (async move || -> anyhow::Result<()> { let abs_path = entry.worktree_root.join(entry.path.path.as_std_path()); - // Avoid blocking IO here: cancellation of the search is implemented via task drop, and a - // synchronous `std::fs::File::open` / `Read::read` can delay task cancellation for a long time. - let contents = self + let file = self .fs .context("Trying to query filesystem in remote project search")? - .load_bytes(&abs_path) + .open_read(&abs_path) .await?; + let mut file = futures::io::BufReader::new(file); + // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on; - // That way we can still match files without having to look at "obviously binary" files. - if let Err(error) = std::str::from_utf8(&contents) { + // That way we can still match files in a streaming fashion without having look at "obviously binary" files. + // + // Use `fill_buf` so we can "peek" without consuming bytes, and reuse the same stream for detection. + let buffer = file.fill_buf().await?; + let prefix_len = buffer.len().min(8192); + + if let Err(error) = std::str::from_utf8(&buffer[..prefix_len]) { if let Some(starting_position) = error.error_len() { log::debug!( "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}" @@ -704,10 +708,7 @@ impl RequestHandler<'_> { } } - let file: Box = Box::new(Cursor::new(contents)); - let file = BufReader::new(file); - - if self.query.detect(file).unwrap_or(false) { + if self.query.detect(file).await.unwrap_or(false) { // Yes, we should scan the whole file. entry.should_scan_tx.send(entry.path).await?; } diff --git a/crates/project/src/search.rs b/crates/project/src/search.rs index bb37ba2111db459b808434d58fa9a9c4d973c0b1..cbf754a8f8847cc5651764738ab06e16c2e0c5a5 100644 --- a/crates/project/src/search.rs +++ b/crates/project/src/search.rs @@ -2,13 +2,13 @@ use aho_corasick::{AhoCorasick, AhoCorasickBuilder}; use anyhow::Result; use client::proto; use fancy_regex::{Captures, Regex, RegexBuilder}; +use futures::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt}; use gpui::Entity; use itertools::Itertools as _; use language::{Buffer, BufferSnapshot, CharKind}; use smol::future::yield_now; use std::{ borrow::Cow, - io::{BufRead, BufReader, Read}, ops::Range, sync::{Arc, LazyLock}, }; @@ -326,37 +326,38 @@ impl SearchQuery { } } - pub(crate) fn detect( - &self, - mut reader: BufReader>, - ) -> Result { - if self.as_str().is_empty() { + pub(crate) async fn detect(&self, mut reader: impl AsyncBufRead + Unpin) -> Result { + let query_str = self.as_str(); + let needle_len = query_str.as_bytes().len(); + if needle_len == 0 { return Ok(false); } + let mut text = String::new(); match self { Self::Text { search, .. } => { - let mat = search.stream_find_iter(reader).next(); - match mat { - Some(Ok(_)) => Ok(true), - Some(Err(err)) => Err(err.into()), - None => Ok(false), + if query_str.contains('\n') { + reader.read_to_string(&mut text).await?; + Ok(search.find(&text).is_some()) + } else { + while reader.read_line(&mut text).await? > 0 { + if search.find(&text).is_some() { + return Ok(true); + } + text.clear(); + } + Ok(false) } } Self::Regex { regex, multiline, .. } => { if *multiline { - let mut text = String::new(); - if let Err(err) = reader.read_to_string(&mut text) { - Err(err.into()) - } else { - Ok(regex.find(&text)?.is_some()) - } + reader.read_to_string(&mut text).await?; + Ok(regex.find(&text)?.is_some()) } else { - for line in reader.lines() { - let line = line?; - if regex.find(&line)?.is_some() { + while reader.read_line(&mut text).await? > 0 { + if regex.find(&text)?.is_some() { return Ok(true); } } diff --git a/crates/project/src/worktree_store.rs b/crates/project/src/worktree_store.rs index 7c3eabd609c5efd79d506e8c62384bcb6cc16b52..d31193ace16ddee517072ee3b8ee8559b950b2f1 100644 --- a/crates/project/src/worktree_store.rs +++ b/crates/project/src/worktree_store.rs @@ -1,5 +1,4 @@ use std::{ - io::{BufRead, BufReader}, path::{Path, PathBuf}, pin::pin, sync::{Arc, atomic::AtomicUsize}, @@ -8,7 +7,7 @@ use std::{ use anyhow::{Context as _, Result, anyhow, bail}; use collections::{HashMap, HashSet}; use fs::{Fs, copy_recursive}; -use futures::{FutureExt, SinkExt, future::Shared}; +use futures::{AsyncBufReadExt as _, FutureExt, SinkExt, future::Shared, io::BufReader}; use gpui::{ App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity, }; @@ -1024,15 +1023,15 @@ impl WorktreeStore { let mut input = pin!(input); while let Some(mut entry) = input.next().await { let abs_path = entry.worktree_root.join(entry.path.path.as_std_path()); - let Some(file) = fs.open_sync(&abs_path).await.log_err() else { + let Some(file) = fs.open_read(&abs_path).await.log_err() else { continue; }; let mut file = BufReader::new(file); - let file_start = file.fill_buf()?; + let file_start = file.fill_buf().await?; if let Err(Some(starting_position)) = - std::str::from_utf8(file_start).map_err(|e| e.error_len()) + std::str::from_utf8(&file_start).map_err(|e| e.error_len()) { // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on; // That way we can still match files in a streaming fashion without having look at "obviously binary" files. @@ -1042,7 +1041,7 @@ impl WorktreeStore { continue; } - if query.detect(file).unwrap_or(false) { + if query.detect(file).await.unwrap_or(false) { entry.respond.send(entry.path).await? } }