Allow project search to be efficiently cancellable

Max Brunsfeld created

Change summary

crates/agent/src/tools/grep_tool.rs                           |  5 
crates/collab/src/tests/integration_tests.rs                  |  4 
crates/collab/src/tests/random_project_collaboration_tests.rs |  5 
crates/project/src/project.rs                                 | 12 
crates/project/src/project_search.rs                          | 57 ++--
crates/project/src/project_tests.rs                           |  5 
crates/project_benchmarks/src/main.rs                         |  4 
crates/remote_server/src/headless_project.rs                  |  6 
crates/remote_server/src/remote_editing_tests.rs              |  6 
crates/search/src/project_search.rs                           |  6 
10 files changed, 76 insertions(+), 34 deletions(-)

Detailed changes

crates/agent/src/tools/grep_tool.rs 🔗

@@ -170,12 +170,15 @@ impl AgentTool for GrepTool {
             Err(error) => return Task::ready(Err(error)),
         };
 
-        let results = self
+        let (results, search_task) = self
             .project
             .update(cx, |project, cx| project.search(query, cx));
 
         let project = self.project.downgrade();
         cx.spawn(async move |cx|  {
+            // Keep the search alive for the duration of result iteration. Dropping this task is the
+            // cancellation mechanism; we intentionally do not detach it.
+            let _search_task = search_task;
             futures::pin_mut!(results);
 
             let mut output = String::new();

crates/collab/src/tests/integration_tests.rs 🔗

@@ -5179,7 +5179,7 @@ async fn test_project_search(
 
     // Perform a search as the guest.
     let mut results = HashMap::default();
-    let search_rx = project_b.update(cx_b, |project, cx| {
+    let (search_rx, search_task) = project_b.update(cx_b, |project, cx| {
         project.search(
             SearchQuery::text(
                 "world",
@@ -5195,6 +5195,8 @@ async fn test_project_search(
             cx,
         )
     });
+    // Keep the search task alive while we drain the receiver; dropping it cancels the search.
+    let _search_task = search_task;
     while let Ok(result) = search_rx.recv().await {
         match result {
             SearchResult::Buffer { buffer, ranges } => {

crates/collab/src/tests/random_project_collaboration_tests.rs 🔗

@@ -886,7 +886,7 @@ impl RandomizedTest for ProjectCollaborationTest {
                     if detach { "detaching" } else { "awaiting" }
                 );
 
-                let search = project.update(cx, |project, cx| {
+                let (search, search_task) = project.update(cx, |project, cx| {
                     project.search(
                         SearchQuery::text(
                             query,
@@ -904,6 +904,9 @@ impl RandomizedTest for ProjectCollaborationTest {
                 });
                 drop(project);
                 let search = cx.executor().spawn(async move {
+                    // Keep the search task alive while we drain the receiver; dropping it cancels the search.
+                    let _search_task = search_task;
+
                     let mut results = HashMap::default();
                     while let Ok(result) = search.recv().await {
                         if let SearchResult::Buffer { buffer, ranges } = result {

crates/project/src/project.rs 🔗

@@ -4141,7 +4141,11 @@ impl Project {
         searcher.into_handle(query, cx)
     }
 
-    pub fn search(&mut self, query: SearchQuery, cx: &mut Context<Self>) -> Receiver<SearchResult> {
+    pub fn search(
+        &mut self,
+        query: SearchQuery,
+        cx: &mut Context<Self>,
+    ) -> (Receiver<SearchResult>, Task<()>) {
         self.search_impl(query, cx).results(cx)
     }
 
@@ -5025,10 +5029,14 @@ impl Project {
         let path_style = this.read_with(&cx, |this, cx| this.path_style(cx))?;
         let query =
             SearchQuery::from_proto(message.query.context("missing query field")?, path_style)?;
-        let results = this.update(&mut cx, |this, cx| {
+        let (results, search_task) = this.update(&mut cx, |this, cx| {
             this.search_impl(query, cx).matching_buffers(cx)
         })?;
 
+        // Keep the search task alive while we drain the receiver; dropping it cancels the search.
+        // We intentionally do not detach it.
+        let _search_task = search_task;
+
         let mut response = proto::FindSearchCandidatesResponse {
             buffer_ids: Vec::new(),
         };

crates/project/src/project_search.rs 🔗

@@ -1,7 +1,7 @@
 use std::{
     cell::LazyCell,
     collections::BTreeSet,
-    io::{BufRead, BufReader},
+    io::{BufReader, Cursor, Read},
     ops::Range,
     path::{Path, PathBuf},
     pin::pin,
@@ -68,13 +68,14 @@ pub struct SearchResultsHandle {
 }
 
 impl SearchResultsHandle {
-    pub fn results(self, cx: &mut App) -> Receiver<SearchResult> {
-        (self.trigger_search)(cx).detach();
-        self.results
+    pub fn results(self, cx: &mut App) -> (Receiver<SearchResult>, Task<()>) {
+        let task = (self.trigger_search)(cx);
+        (self.results, task)
     }
-    pub fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
-        (self.trigger_search)(cx).detach();
-        self.matching_buffers
+
+    pub fn matching_buffers(self, cx: &mut App) -> (Receiver<Entity<Buffer>>, Task<()>) {
+        let task = (self.trigger_search)(cx);
+        (self.matching_buffers, task)
     }
 }
 
@@ -681,32 +682,40 @@ impl RequestHandler<'_> {
     }
 
     async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
-        _=maybe!(async move {
+        _ = (async move || -> anyhow::Result<()> {
             let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
-            let Some(file) = self.fs.context("Trying to query filesystem in remote project search")?.open_sync(&abs_path).await.log_err() else {
-                return anyhow::Ok(());
-            };
 
-            let mut file = BufReader::new(file);
-            let file_start = file.fill_buf()?;
-
-            if let Err(Some(starting_position)) =
-            std::str::from_utf8(file_start).map_err(|e| e.error_len())
-            {
-                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
-                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
-                log::debug!(
-                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
-                );
-                return Ok(());
+            // Avoid blocking IO here: cancellation of the search is implemented via task drop, and a
+            // synchronous `std::fs::File::open` / `Read::read` can delay task cancellation for a long time.
+            let contents = self
+                .fs
+                .context("Trying to query filesystem in remote project search")?
+                .load_bytes(&abs_path)
+                .await?;
+
+            // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
+            // That way we can still match files without having to look at "obviously binary" files.
+            if let Err(error) = std::str::from_utf8(&contents) {
+                if let Some(starting_position) = error.error_len() {
+                    log::debug!(
+                        "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
+                    );
+                    return Ok(());
+                }
             }
 
+            let file: Box<dyn Read + Send + Sync> = Box::new(Cursor::new(contents));
+            let file = BufReader::new(file);
+
             if self.query.detect(file).unwrap_or(false) {
                 // Yes, we should scan the whole file.
                 entry.should_scan_tx.send(entry.path).await?;
             }
+
             Ok(())
-        }).await;
+        })()
+        .await
+        .log_err();
     }
 
     async fn handle_scan_path(&self, req: InputPath) {

crates/project/src/project_tests.rs 🔗

@@ -10400,7 +10400,10 @@ async fn search(
     query: SearchQuery,
     cx: &mut gpui::TestAppContext,
 ) -> Result<HashMap<String, Vec<Range<usize>>>> {
-    let search_rx = project.update(cx, |project, cx| project.search(query, cx));
+    let (search_rx, search_task) = project.update(cx, |project, cx| project.search(query, cx));
+    // Keep the search task alive while we drain the receiver; dropping it cancels the search.
+    let _search_task = search_task;
+
     let mut results = HashMap::default();
     while let Ok(search_result) = search_rx.recv().await {
         match search_result {

crates/project_benchmarks/src/main.rs 🔗

@@ -102,9 +102,11 @@ fn main() -> Result<(), anyhow::Error> {
                 println!("Starting a project search");
                 let timer = std::time::Instant::now();
                 let mut first_match = None;
-                let matches = project
+
+                let (matches, _search_task) = project
                     .update(cx, |this, cx| this.search(query, cx))
                     .unwrap();
+
                 let mut matched_files = 0;
                 let mut matched_chunks = 0;
                 while let Ok(match_result) = matches.recv().await {

crates/remote_server/src/headless_project.rs 🔗

@@ -771,7 +771,7 @@ impl HeadlessProject {
             message.query.context("missing query field")?,
             PathStyle::local(),
         )?;
-        let results = this.update(&mut cx, |this, cx| {
+        let (results, search_task) = this.update(&mut cx, |this, cx| {
             project::Search::local(
                 this.fs.clone(),
                 this.buffer_store.clone(),
@@ -783,6 +783,10 @@ impl HeadlessProject {
             .matching_buffers(cx)
         })?;
 
+        // Keep the search task alive while we drain the receiver; dropping it cancels the search.
+        // We intentionally do not detach it.
+        let _search_task = search_task;
+
         let mut response = proto::FindSearchCandidatesResponse {
             buffer_ids: Vec::new(),
         };

crates/remote_server/src/remote_editing_tests.rs 🔗

@@ -194,7 +194,7 @@ async fn test_remote_project_search(cx: &mut TestAppContext, server_cx: &mut Tes
     cx.run_until_parked();
 
     async fn do_search(project: &Entity<Project>, mut cx: TestAppContext) -> Entity<Buffer> {
-        let receiver = project.update(&mut cx, |project, cx| {
+        let (receiver, search_task) = project.update(&mut cx, |project, cx| {
             project.search(
                 SearchQuery::text(
                     "project",
@@ -211,6 +211,10 @@ async fn test_remote_project_search(cx: &mut TestAppContext, server_cx: &mut Tes
             )
         });
 
+        // Keep the search task alive while we drain the receiver; dropping it cancels the search.
+        // We intentionally do not detach it.
+        let _search_task = search_task;
+
         let first_response = receiver.recv().await.unwrap();
         let SearchResult::Buffer { buffer, .. } = first_response else {
             panic!("incorrect result");

crates/search/src/project_search.rs 🔗

@@ -303,7 +303,7 @@ impl ProjectSearch {
     }
 
     fn search(&mut self, query: SearchQuery, cx: &mut Context<Self>) {
-        let search = self.project.update(cx, |project, cx| {
+        let (search, project_search_task) = self.project.update(cx, |project, cx| {
             project
                 .search_history_mut(SearchInputKind::Query)
                 .add(&mut self.search_history_cursor, query.as_str().to_string());
@@ -326,6 +326,10 @@ impl ProjectSearch {
         self.active_query = Some(query);
         self.match_ranges.clear();
         self.pending_search = Some(cx.spawn(async move |project_search, cx| {
+            // Keep the search task alive for the lifetime of this pending search task.
+            // Dropping it is the cancellation mechanism; we intentionally do not detach it.
+            let _project_search_task = project_search_task;
+
             let mut matches = pin!(search.ready_chunks(1024));
             project_search
                 .update(cx, |project_search, cx| {