diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 216639fc88b3d557aa1e8fb97a756c2cf9346227..9ad0bd411b119f4ba24dd9517feb62c633feb7f9 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -4024,22 +4024,6 @@ impl Project { self.search_impl(query, cx).results(cx) } - fn find_search_candidate_buffers( - &mut self, - query: &SearchQuery, - limit: usize, - cx: &mut Context, - ) -> Receiver> { - if self.is_local() { - let fs = self.fs.clone(); - self.buffer_store.update(cx, |buffer_store, cx| { - buffer_store.find_search_candidates(query, limit, fs, cx) - }) - } else { - self.find_search_candidates_remote(query, limit, cx) - } - } - fn find_search_candidates_remote( &mut self, query: &SearchQuery, diff --git a/crates/project/src/project_search.rs b/crates/project/src/project_search.rs index 6a9fcdc0dcabba2b1a1e360543d361f2a293b723..dc1161c9d39bd6061dd92f1f7eff2d87d54e7277 100644 --- a/crates/project/src/project_search.rs +++ b/crates/project/src/project_search.rs @@ -11,7 +11,7 @@ use std::{ use collections::HashSet; use fs::Fs; use futures::{SinkExt, StreamExt, select_biased}; -use gpui::{App, AppContext, AsyncApp, Entity, Task}; +use gpui::{App, AsyncApp, Entity, Task}; use language::{Buffer, BufferSnapshot}; use postage::oneshot; use smol::channel::{Receiver, Sender, bounded, unbounded}; @@ -261,31 +261,35 @@ struct Worker<'search> { } impl Worker<'_> { - async fn run(self) { + async fn run(mut self) { let mut find_all_matches = pin!(self.find_all_matches_rx.fuse()); let mut find_first_match = pin!(self.confirm_contents_will_match_rx.fuse()); let mut scan_path = pin!(self.input_paths_rx.fuse()); - let handler = RequestHandler { - query: self.query, - open_entries: &self.open_buffers, - fs: self.fs, - matched_buffer_count: self.matched_buffer_count, - matches_count: self.matches_count, - confirm_contents_will_match_tx: &self.confirm_contents_will_match_tx, - get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx, - publish_matches: &self.publish_matches, - }; loop { + let handler = RequestHandler { + query: self.query, + open_entries: &self.open_buffers, + fs: self.fs, + matched_buffer_count: self.matched_buffer_count, + matches_count: self.matches_count, + confirm_contents_will_match_tx: &self.confirm_contents_will_match_tx, + get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx, + publish_matches: &self.publish_matches, + }; + // Whenever we notice that some step of a pipeline is closed, we don't want to close subsequent + // steps straight away. Another worker might be about to produce a value that will + // be pushed there, thus we'll replace current worker's pipe with a dummy one. + // That way, we'll only ever close a next-stage channel when ALL workers do so. select_biased! { find_all_matches = find_all_matches.next() => { let Some(matches) = find_all_matches else { - self.publish_matches.close(); + self.publish_matches = bounded(1).0; continue; }; let result = handler.handle_find_all_matches(matches).await; if let Some(_should_bail) = result { - self.publish_matches.close(); + self.publish_matches = bounded(1).0; break; } }, @@ -293,7 +297,7 @@ impl Worker<'_> { if let Some(buffer_with_at_least_one_match) = find_first_match { handler.handle_find_first_match(buffer_with_at_least_one_match).await; } else { - self.get_buffer_for_full_scan_tx.close(); + self.get_buffer_for_full_scan_tx = bounded(1).0; } }, @@ -301,11 +305,15 @@ impl Worker<'_> { if let Some(path_to_scan) = scan_path { handler.handle_scan_path(path_to_scan).await; } else { - self.confirm_contents_will_match_tx.close(); + // If we're the last worker to notice that this is not producing values, close the upstream. + self.confirm_contents_will_match_tx = bounded(1).0; } } - complete => break, + complete => { + break + }, + } } }