Clean up lifetime of queues

Piotr Osiewicz and Smit Barmase created

Co-authored-by: Smit Barmase <heysmitbarmase@gmail.com>

Change summary

crates/project/src/project.rs        | 16 -----------
crates/project/src/project_search.rs | 42 +++++++++++++++++------------
2 files changed, 25 insertions(+), 33 deletions(-)

Detailed changes

crates/project/src/project.rs 🔗

@@ -4024,22 +4024,6 @@ impl Project {
         self.search_impl(query, cx).results(cx)
     }
 
-    fn find_search_candidate_buffers(
-        &mut self,
-        query: &SearchQuery,
-        limit: usize,
-        cx: &mut Context<Project>,
-    ) -> Receiver<Entity<Buffer>> {
-        if self.is_local() {
-            let fs = self.fs.clone();
-            self.buffer_store.update(cx, |buffer_store, cx| {
-                buffer_store.find_search_candidates(query, limit, fs, cx)
-            })
-        } else {
-            self.find_search_candidates_remote(query, limit, cx)
-        }
-    }
-
     fn find_search_candidates_remote(
         &mut self,
         query: &SearchQuery,

crates/project/src/project_search.rs 🔗

@@ -11,7 +11,7 @@ use std::{
 use collections::HashSet;
 use fs::Fs;
 use futures::{SinkExt, StreamExt, select_biased};
-use gpui::{App, AppContext, AsyncApp, Entity, Task};
+use gpui::{App, AsyncApp, Entity, Task};
 use language::{Buffer, BufferSnapshot};
 use postage::oneshot;
 use smol::channel::{Receiver, Sender, bounded, unbounded};
@@ -261,31 +261,35 @@ struct Worker<'search> {
 }
 
 impl Worker<'_> {
-    async fn run(self) {
+    async fn run(mut self) {
         let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
         let mut find_first_match = pin!(self.confirm_contents_will_match_rx.fuse());
         let mut scan_path = pin!(self.input_paths_rx.fuse());
-        let handler = RequestHandler {
-            query: self.query,
-            open_entries: &self.open_buffers,
-            fs: self.fs,
-            matched_buffer_count: self.matched_buffer_count,
-            matches_count: self.matches_count,
-            confirm_contents_will_match_tx: &self.confirm_contents_will_match_tx,
-            get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
-            publish_matches: &self.publish_matches,
-        };
 
         loop {
+            let handler = RequestHandler {
+                query: self.query,
+                open_entries: &self.open_buffers,
+                fs: self.fs,
+                matched_buffer_count: self.matched_buffer_count,
+                matches_count: self.matches_count,
+                confirm_contents_will_match_tx: &self.confirm_contents_will_match_tx,
+                get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
+                publish_matches: &self.publish_matches,
+            };
+            // Whenever we notice that some step of a pipeline is closed, we don't want to close subsequent
+            // steps straight away. Another worker might be about to produce a value that will
+            // be pushed there, thus we'll replace current worker's pipe with a dummy one.
+            // That way, we'll only ever close a next-stage channel when ALL workers do so.
             select_biased! {
                 find_all_matches = find_all_matches.next() => {
                     let Some(matches) = find_all_matches else {
-                        self.publish_matches.close();
+                        self.publish_matches = bounded(1).0;
                         continue;
                         };
                     let result = handler.handle_find_all_matches(matches).await;
                     if let Some(_should_bail) = result {
-                        self.publish_matches.close();
+                        self.publish_matches = bounded(1).0;
                         break;
                     }
                 },
@@ -293,7 +297,7 @@ impl Worker<'_> {
                     if let Some(buffer_with_at_least_one_match) = find_first_match {
                         handler.handle_find_first_match(buffer_with_at_least_one_match).await;
                     } else {
-                        self.get_buffer_for_full_scan_tx.close();
+                        self.get_buffer_for_full_scan_tx = bounded(1).0;
                     }
 
                 },
@@ -301,11 +305,15 @@ impl Worker<'_> {
                     if let Some(path_to_scan) = scan_path {
                         handler.handle_scan_path(path_to_scan).await;
                     } else {
-                        self.confirm_contents_will_match_tx.close();
+                        // If we're the last worker to notice that this is not producing values, close the upstream.
+                        self.confirm_contents_will_match_tx = bounded(1).0;
                     }
 
                  }
-                 complete => break,
+                 complete => {
+                     break
+                },
+
             }
         }
     }