Start exploring API for remote searches

Piotr Osiewicz and Smit Barmase created

Co-authored-by: Smit Barmase <heysmitbarmase@gmail.com>

Change summary

crates/project/src/project.rs        |  13 +
crates/project/src/project_search.rs | 169 +++++++++++++++++++----------
2 files changed, 120 insertions(+), 62 deletions(-)

Detailed changes

crates/project/src/project.rs 🔗

@@ -41,7 +41,7 @@ use crate::{
     agent_server_store::AllAgentServersSettings,
     git_store::GitStore,
     lsp_store::{SymbolLocation, log_store::LogKind},
-    project_search::ProjectSearcher,
+    project_search::SearchResultsHandle,
 };
 pub use agent_server_store::{AgentServerStore, AgentServersUpdated};
 pub use git_store::{
@@ -4003,7 +4003,7 @@ impl Project {
         })
     }
 
-    pub fn search(&mut self, query: SearchQuery, cx: &mut Context<Self>) -> Receiver<SearchResult> {
+    fn search_impl(&mut self, query: SearchQuery, cx: &mut Context<Self>) -> SearchResultsHandle {
         let snapshots = self
             .visible_worktrees(cx)
             .filter_map(|tree| {
@@ -4012,13 +4012,16 @@ impl Project {
             })
             .collect::<Vec<_>>();
 
-        let searcher = ProjectSearcher {
+        let searcher = project_search::Search {
             fs: self.fs.clone(),
             buffer_store: self.buffer_store.clone(),
             snapshots,
             open_buffers: Default::default(),
         };
-        searcher.run(query, cx)
+        searcher.into_results(query, cx)
+    }
+    pub fn search(&mut self, query: SearchQuery, cx: &mut Context<Self>) -> Receiver<SearchResult> {
+        self.search_impl(query, cx).results(cx)
     }
 
     fn find_search_candidate_buffers(
@@ -4885,7 +4888,7 @@ impl Project {
         let query =
             SearchQuery::from_proto(message.query.context("missing query field")?, path_style)?;
         let results = this.update(&mut cx, |this, cx| {
-            this.find_search_candidate_buffers(&query, message.limit as _, cx)
+            this.search_impl(query, cx).matching_buffers(cx)
         })?;
 
         let mut response = proto::FindSearchCandidatesResponse {

crates/project/src/project_search.rs 🔗

@@ -11,7 +11,7 @@ use std::{
 use collections::HashSet;
 use fs::Fs;
 use futures::{SinkExt, StreamExt, select_biased};
-use gpui::{App, AsyncApp, Entity};
+use gpui::{App, AsyncApp, Entity, Task};
 use language::{Buffer, BufferSnapshot};
 use postage::oneshot;
 use smol::channel::{Receiver, Sender, bounded, unbounded};
@@ -25,7 +25,7 @@ use crate::{
     search::{SearchQuery, SearchResult},
 };
 
-pub(crate) struct ProjectSearcher {
+pub(crate) struct Search {
     pub(crate) fs: Arc<dyn Fs>,
     pub(crate) buffer_store: Entity<BufferStore>,
     pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>,
@@ -35,11 +35,33 @@ pub(crate) struct ProjectSearcher {
 const MAX_SEARCH_RESULT_FILES: usize = 5_000;
 const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
 
-impl ProjectSearcher {
-    pub(crate) fn run(self, query: SearchQuery, cx: &mut App) -> Receiver<SearchResult> {
+/// Represents the results of a project search and allows one to obtain either the match positions OR
+/// just the handles to buffers that may match the search.
+#[must_use]
+pub(crate) struct SearchResultsHandle {
+    results: Receiver<SearchResult>,
+    matching_buffers: Receiver<Entity<Buffer>>,
+    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
+}
+
+impl SearchResultsHandle {
+    pub(crate) fn results(self, cx: &mut App) -> Receiver<SearchResult> {
+        (self.trigger_search)(cx).detach();
+        self.results
+    }
+    pub(crate) fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
+        (self.trigger_search)(cx).detach();
+        self.matching_buffers
+    }
+}
+
+impl Search {
+    /// Prepares a project search run without starting it. The returned handle has to be consumed to start the
+    /// search and to specify whether you're interested in matching buffers or full search results.
+    pub(crate) fn into_results(self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
         let mut open_buffers = HashSet::default();
         let mut unnamed_buffers = Vec::new();
-
+        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
         let buffers = self.buffer_store.read(cx);
         for handle in buffers.buffers() {
             let buffer = handle.read(cx);
@@ -54,55 +76,73 @@ impl ProjectSearcher {
         }
         let executor = cx.background_executor().clone();
         let (tx, rx) = unbounded();
-        cx.spawn(async move |cx| {
-            const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
-            let (find_all_matches_tx, find_all_matches_rx) = bounded(MAX_CONCURRENT_BUFFER_OPENS);
-            let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
-                bounded(MAX_CONCURRENT_BUFFER_OPENS);
-            let matches_count = AtomicUsize::new(0);
-            let matched_buffer_count = AtomicUsize::new(0);
-            let worker_pool = executor.scoped(|scope| {
-                let (input_paths_tx, input_paths_rx) = bounded(64);
-                let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) = bounded(64);
-                let (sorted_search_results_tx, sorted_search_results_rx) = bounded(64);
-                for _ in 0..executor.num_cpus() {
-                    let worker = Worker {
-                        query: &query,
-                        open_buffers: &self.open_buffers,
-                        matched_buffer_count: &matched_buffer_count,
-                        matches_count: &matches_count,
-                        fs: &*self.fs,
-                        input_paths_rx: input_paths_rx.clone(),
-                        confirm_contents_will_match_rx: confirm_contents_will_match_rx.clone(),
-                        confirm_contents_will_match_tx: confirm_contents_will_match_tx.clone(),
-                        get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
-                        find_all_matches_rx: find_all_matches_rx.clone(),
-                        publish_matches: tx.clone(),
-                    };
-                    scope.spawn(worker.run());
+        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) =
+            bounded(MAX_CONCURRENT_BUFFER_OPENS);
+        let matching_buffers = grab_buffer_snapshot_rx.clone();
+        let trigger_search = Box::new(|cx: &mut App| {
+            cx.spawn(async move |cx| {
+                let (find_all_matches_tx, find_all_matches_rx) =
+                    bounded(MAX_CONCURRENT_BUFFER_OPENS);
+
+                let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = unbounded();
+                let matches_count = AtomicUsize::new(0);
+                let matched_buffer_count = AtomicUsize::new(0);
+                let worker_pool = executor.scoped(|scope| {
+                    let (input_paths_tx, input_paths_rx) = bounded(64);
+                    let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
+                        bounded(64);
+                    let (sorted_search_results_tx, sorted_search_results_rx) = bounded(64);
+                    for _ in 0..executor.num_cpus() {
+                        let worker = Worker {
+                            query: &query,
+                            open_buffers: &self.open_buffers,
+                            matched_buffer_count: &matched_buffer_count,
+                            matches_count: &matches_count,
+                            fs: &*self.fs,
+                            input_paths_rx: input_paths_rx.clone(),
+                            confirm_contents_will_match_rx: confirm_contents_will_match_rx.clone(),
+                            confirm_contents_will_match_tx: confirm_contents_will_match_tx.clone(),
+                            get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
+                            find_all_matches_rx: find_all_matches_rx.clone(),
+                            publish_matches: tx.clone(),
+                        };
+                        scope.spawn(worker.run());
+                    }
+                    scope.spawn(self.provide_search_paths(
+                        &query,
+                        input_paths_tx,
+                        sorted_search_results_tx,
+                    ));
+                    scope.spawn(self.maintain_sorted_search_results(
+                        sorted_search_results_rx,
+                        get_buffer_for_full_scan_tx,
+                    ))
+                });
+                let open_buffers = self.open_buffers(
+                    get_buffer_for_full_scan_rx,
+                    grab_buffer_snapshot_tx,
+                    cx.clone(),
+                );
+                let buffer_snapshots = self.grab_buffer_snapshots(
+                    grab_buffer_snapshot_rx,
+                    find_all_matches_tx,
+                    cx.clone(),
+                );
+                futures::future::join3(worker_pool, buffer_snapshots, open_buffers).await;
+
+                let limit_reached = matches_count.load(Ordering::Acquire)
+                    > MAX_SEARCH_RESULT_RANGES
+                    || matched_buffer_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_FILES;
+                if limit_reached {
+                    _ = tx.send(SearchResult::LimitReached).await;
                 }
-                scope.spawn(self.provide_search_paths(
-                    &query,
-                    input_paths_tx,
-                    sorted_search_results_tx,
-                ));
-                scope.spawn(self.maintain_sorted_search_results(
-                    sorted_search_results_rx,
-                    get_buffer_for_full_scan_tx,
-                ))
-            });
-            let open_buffers =
-                self.open_buffers(get_buffer_for_full_scan_rx, find_all_matches_tx, cx);
-            futures::future::join(worker_pool, open_buffers).await;
-
-            let limit_reached = matches_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_RANGES
-                || matched_buffer_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_FILES;
-            if limit_reached {
-                _ = tx.send(SearchResult::LimitReached).await;
-            }
-        })
-        .detach();
-        rx
+            })
+        });
+        SearchResultsHandle {
+            results: rx,
+            matching_buffers,
+            trigger_search,
+        }
     }
 
     async fn provide_search_paths<'this>(
@@ -153,20 +193,35 @@ impl ProjectSearcher {
     async fn open_buffers<'a>(
         &'a self,
         rx: Receiver<ProjectPath>,
-        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
-        cx: &mut AsyncApp,
+        find_all_matches_tx: Sender<Entity<Buffer>>,
+        mut cx: AsyncApp,
     ) {
         _ = maybe!(async move {
             while let Ok(requested_path) = rx.recv().await {
                 let Some(buffer) = self
                     .buffer_store
-                    .update(cx, |this, cx| this.open_buffer(requested_path, cx))?
+                    .update(&mut cx, |this, cx| this.open_buffer(requested_path, cx))?
                     .await
                     .log_err()
                 else {
                     continue;
                 };
-                let snapshot = buffer.read_with(cx, |this, _| this.snapshot())?;
+                find_all_matches_tx.send(buffer).await?;
+            }
+            Result::<_, anyhow::Error>::Ok(())
+        })
+        .await;
+    }
+
+    async fn grab_buffer_snapshots<'a>(
+        &'a self,
+        rx: Receiver<Entity<Buffer>>,
+        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
+        mut cx: AsyncApp,
+    ) {
+        _ = maybe!(async move {
+            while let Ok(buffer) = rx.recv().await {
+                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
                 find_all_matches_tx.send((buffer, snapshot)).await?;
             }
             Result::<_, anyhow::Error>::Ok(())