From 81dd5b9c9cb7d2244ad0817ea48807bc055042a9 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Thu, 18 Dec 2025 10:34:15 -0800 Subject: [PATCH] Allow project search to be efficiently cancellable --- crates/agent/src/tools/grep_tool.rs | 5 +- crates/collab/src/tests/integration_tests.rs | 4 +- .../random_project_collaboration_tests.rs | 5 +- crates/project/src/project.rs | 12 +++- crates/project/src/project_search.rs | 57 +++++++++++-------- crates/project/src/project_tests.rs | 5 +- crates/project_benchmarks/src/main.rs | 4 +- crates/remote_server/src/headless_project.rs | 6 +- .../remote_server/src/remote_editing_tests.rs | 6 +- crates/search/src/project_search.rs | 6 +- 10 files changed, 76 insertions(+), 34 deletions(-) diff --git a/crates/agent/src/tools/grep_tool.rs b/crates/agent/src/tools/grep_tool.rs index 0caba91564fd1fc9e670909490d4e776b8ad6f11..6becd54e8623a141122d779783d6db1ada944758 100644 --- a/crates/agent/src/tools/grep_tool.rs +++ b/crates/agent/src/tools/grep_tool.rs @@ -170,12 +170,15 @@ impl AgentTool for GrepTool { Err(error) => return Task::ready(Err(error)), }; - let results = self + let (results, search_task) = self .project .update(cx, |project, cx| project.search(query, cx)); let project = self.project.downgrade(); cx.spawn(async move |cx| { + // Keep the search alive for the duration of result iteration. Dropping this task is the + // cancellation mechanism; we intentionally do not detach it. + let _search_task = search_task; futures::pin_mut!(results); let mut output = String::new(); diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index 391e7355ea196dfe25d363472918837ea817f450..e205abb32f37e4df70614de555cd9a751bd266f5 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -5179,7 +5179,7 @@ async fn test_project_search( // Perform a search as the guest. let mut results = HashMap::default(); - let search_rx = project_b.update(cx_b, |project, cx| { + let (search_rx, search_task) = project_b.update(cx_b, |project, cx| { project.search( SearchQuery::text( "world", @@ -5195,6 +5195,8 @@ async fn test_project_search( cx, ) }); + // Keep the search task alive while we drain the receiver; dropping it cancels the search. + let _search_task = search_task; while let Ok(result) = search_rx.recv().await { match result { SearchResult::Buffer { buffer, ranges } => { diff --git a/crates/collab/src/tests/random_project_collaboration_tests.rs b/crates/collab/src/tests/random_project_collaboration_tests.rs index 7e9b84c0571ed6dff19702ce3532c45d56f6413f..27b97b4a87734e567ca6339dd1c1c1bf607bd03f 100644 --- a/crates/collab/src/tests/random_project_collaboration_tests.rs +++ b/crates/collab/src/tests/random_project_collaboration_tests.rs @@ -886,7 +886,7 @@ impl RandomizedTest for ProjectCollaborationTest { if detach { "detaching" } else { "awaiting" } ); - let search = project.update(cx, |project, cx| { + let (search, search_task) = project.update(cx, |project, cx| { project.search( SearchQuery::text( query, @@ -904,6 +904,9 @@ impl RandomizedTest for ProjectCollaborationTest { }); drop(project); let search = cx.executor().spawn(async move { + // Keep the search task alive while we drain the receiver; dropping it cancels the search. + let _search_task = search_task; + let mut results = HashMap::default(); while let Ok(result) = search.recv().await { if let SearchResult::Buffer { buffer, ranges } = result { diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 5e31f2a90cf137f1e4d788952832e1eb2ee0ec35..f0b346747a73348b6bb314248c604e0c56b3bf9c 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -4141,7 +4141,11 @@ impl Project { searcher.into_handle(query, cx) } - pub fn search(&mut self, query: SearchQuery, cx: &mut Context) -> Receiver { + pub fn search( + &mut self, + query: SearchQuery, + cx: &mut Context, + ) -> (Receiver, Task<()>) { self.search_impl(query, cx).results(cx) } @@ -5025,10 +5029,14 @@ impl Project { let path_style = this.read_with(&cx, |this, cx| this.path_style(cx))?; let query = SearchQuery::from_proto(message.query.context("missing query field")?, path_style)?; - let results = this.update(&mut cx, |this, cx| { + let (results, search_task) = this.update(&mut cx, |this, cx| { this.search_impl(query, cx).matching_buffers(cx) })?; + // Keep the search task alive while we drain the receiver; dropping it cancels the search. + // We intentionally do not detach it. + let _search_task = search_task; + let mut response = proto::FindSearchCandidatesResponse { buffer_ids: Vec::new(), }; diff --git a/crates/project/src/project_search.rs b/crates/project/src/project_search.rs index 90687f247338750b2c1197037576098281083e36..a08d75e3a1a1b4c776d68d1397acec6c19495fda 100644 --- a/crates/project/src/project_search.rs +++ b/crates/project/src/project_search.rs @@ -1,7 +1,7 @@ use std::{ cell::LazyCell, collections::BTreeSet, - io::{BufRead, BufReader}, + io::{BufReader, Cursor, Read}, ops::Range, path::{Path, PathBuf}, pin::pin, @@ -68,13 +68,14 @@ pub struct SearchResultsHandle { } impl SearchResultsHandle { - pub fn results(self, cx: &mut App) -> Receiver { - (self.trigger_search)(cx).detach(); - self.results + pub fn results(self, cx: &mut App) -> (Receiver, Task<()>) { + let task = (self.trigger_search)(cx); + (self.results, task) } - pub fn matching_buffers(self, cx: &mut App) -> Receiver> { - (self.trigger_search)(cx).detach(); - self.matching_buffers + + pub fn matching_buffers(self, cx: &mut App) -> (Receiver>, Task<()>) { + let task = (self.trigger_search)(cx); + (self.matching_buffers, task) } } @@ -681,32 +682,40 @@ impl RequestHandler<'_> { } async fn handle_find_first_match(&self, mut entry: MatchingEntry) { - _=maybe!(async move { + _ = (async move || -> anyhow::Result<()> { let abs_path = entry.worktree_root.join(entry.path.path.as_std_path()); - let Some(file) = self.fs.context("Trying to query filesystem in remote project search")?.open_sync(&abs_path).await.log_err() else { - return anyhow::Ok(()); - }; - let mut file = BufReader::new(file); - let file_start = file.fill_buf()?; - - if let Err(Some(starting_position)) = - std::str::from_utf8(file_start).map_err(|e| e.error_len()) - { - // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on; - // That way we can still match files in a streaming fashion without having look at "obviously binary" files. - log::debug!( - "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}" - ); - return Ok(()); + // Avoid blocking IO here: cancellation of the search is implemented via task drop, and a + // synchronous `std::fs::File::open` / `Read::read` can delay task cancellation for a long time. + let contents = self + .fs + .context("Trying to query filesystem in remote project search")? + .load_bytes(&abs_path) + .await?; + + // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on; + // That way we can still match files without having to look at "obviously binary" files. + if let Err(error) = std::str::from_utf8(&contents) { + if let Some(starting_position) = error.error_len() { + log::debug!( + "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}" + ); + return Ok(()); + } } + let file: Box = Box::new(Cursor::new(contents)); + let file = BufReader::new(file); + if self.query.detect(file).unwrap_or(false) { // Yes, we should scan the whole file. entry.should_scan_tx.send(entry.path).await?; } + Ok(()) - }).await; + })() + .await + .log_err(); } async fn handle_scan_path(&self, req: InputPath) { diff --git a/crates/project/src/project_tests.rs b/crates/project/src/project_tests.rs index 4cebc72073cfda1bf07f028b1aff9fa7410c527d..dc9738738c8f9465e77833bda9cf3ba318877466 100644 --- a/crates/project/src/project_tests.rs +++ b/crates/project/src/project_tests.rs @@ -10400,7 +10400,10 @@ async fn search( query: SearchQuery, cx: &mut gpui::TestAppContext, ) -> Result>>> { - let search_rx = project.update(cx, |project, cx| project.search(query, cx)); + let (search_rx, search_task) = project.update(cx, |project, cx| project.search(query, cx)); + // Keep the search task alive while we drain the receiver; dropping it cancels the search. + let _search_task = search_task; + let mut results = HashMap::default(); while let Ok(search_result) = search_rx.recv().await { match search_result { diff --git a/crates/project_benchmarks/src/main.rs b/crates/project_benchmarks/src/main.rs index e4ddbb6cf2c7b6984df2533963bdf6bf88eacba0..2b42e9e51858ba9e780f61b420c2278621eaabfa 100644 --- a/crates/project_benchmarks/src/main.rs +++ b/crates/project_benchmarks/src/main.rs @@ -102,9 +102,11 @@ fn main() -> Result<(), anyhow::Error> { println!("Starting a project search"); let timer = std::time::Instant::now(); let mut first_match = None; - let matches = project + + let (matches, _search_task) = project .update(cx, |this, cx| this.search(query, cx)) .unwrap(); + let mut matched_files = 0; let mut matched_chunks = 0; while let Ok(match_result) = matches.recv().await { diff --git a/crates/remote_server/src/headless_project.rs b/crates/remote_server/src/headless_project.rs index c83cc6aa34402a082fe104d64a8cb47f460704b8..4ef18ee99e59618806a30cdc219d5e7fb710a636 100644 --- a/crates/remote_server/src/headless_project.rs +++ b/crates/remote_server/src/headless_project.rs @@ -771,7 +771,7 @@ impl HeadlessProject { message.query.context("missing query field")?, PathStyle::local(), )?; - let results = this.update(&mut cx, |this, cx| { + let (results, search_task) = this.update(&mut cx, |this, cx| { project::Search::local( this.fs.clone(), this.buffer_store.clone(), @@ -783,6 +783,10 @@ impl HeadlessProject { .matching_buffers(cx) })?; + // Keep the search task alive while we drain the receiver; dropping it cancels the search. + // We intentionally do not detach it. + let _search_task = search_task; + let mut response = proto::FindSearchCandidatesResponse { buffer_ids: Vec::new(), }; diff --git a/crates/remote_server/src/remote_editing_tests.rs b/crates/remote_server/src/remote_editing_tests.rs index a7a870b0513694abe8b126fd0badea05534749ea..35f07696be846664f80a5b58473def3aaeb460fd 100644 --- a/crates/remote_server/src/remote_editing_tests.rs +++ b/crates/remote_server/src/remote_editing_tests.rs @@ -194,7 +194,7 @@ async fn test_remote_project_search(cx: &mut TestAppContext, server_cx: &mut Tes cx.run_until_parked(); async fn do_search(project: &Entity, mut cx: TestAppContext) -> Entity { - let receiver = project.update(&mut cx, |project, cx| { + let (receiver, search_task) = project.update(&mut cx, |project, cx| { project.search( SearchQuery::text( "project", @@ -211,6 +211,10 @@ async fn test_remote_project_search(cx: &mut TestAppContext, server_cx: &mut Tes ) }); + // Keep the search task alive while we drain the receiver; dropping it cancels the search. + // We intentionally do not detach it. + let _search_task = search_task; + let first_response = receiver.recv().await.unwrap(); let SearchResult::Buffer { buffer, .. } = first_response else { panic!("incorrect result"); diff --git a/crates/search/src/project_search.rs b/crates/search/src/project_search.rs index e0bbf58ce6f1d0c752914bbbfa6fcdf70ea30175..11b6f3323a66fafc8a29169234e4e7bb933cc10c 100644 --- a/crates/search/src/project_search.rs +++ b/crates/search/src/project_search.rs @@ -303,7 +303,7 @@ impl ProjectSearch { } fn search(&mut self, query: SearchQuery, cx: &mut Context) { - let search = self.project.update(cx, |project, cx| { + let (search, project_search_task) = self.project.update(cx, |project, cx| { project .search_history_mut(SearchInputKind::Query) .add(&mut self.search_history_cursor, query.as_str().to_string()); @@ -326,6 +326,10 @@ impl ProjectSearch { self.active_query = Some(query); self.match_ranges.clear(); self.pending_search = Some(cx.spawn(async move |project_search, cx| { + // Keep the search task alive for the lifetime of this pending search task. + // Dropping it is the cancellation mechanism; we intentionally do not detach it. + let _project_search_task = project_search_task; + let mut matches = pin!(search.ready_chunks(1024)); project_search .update(cx, |project_search, cx| {