Wire up the (still-incomplete) searcher to the main search routine

Created by Piotr Osiewicz

Change summary

crates/project/src/project.rs        | 68 ++++++-----------------------
crates/project/src/project_search.rs | 62 ++++++++++++++-------------
2 files changed, 46 insertions(+), 84 deletions(-)

Detailed changes

crates/project/src/project.rs 🔗

@@ -41,6 +41,7 @@ use crate::{
     agent_server_store::AllAgentServersSettings,
     git_store::GitStore,
     lsp_store::{SymbolLocation, log_store::LogKind},
+    project_search::ProjectSearcher,
 };
 pub use agent_server_store::{AgentServerStore, AgentServersUpdated};
 pub use git_store::{
@@ -4005,61 +4006,20 @@ impl Project {
     }
 
     pub fn search(&mut self, query: SearchQuery, cx: &mut Context<Self>) -> Receiver<SearchResult> {
-        let (result_tx, result_rx) = smol::channel::unbounded();
-
-        let matching_buffers_rx = if query.is_opened_only() {
-            self.sort_search_candidates(&query, cx)
-        } else {
-            self.find_search_candidate_buffers(&query, MAX_SEARCH_RESULT_FILES + 1, cx)
+        let snapshots = self
+            .visible_worktrees(cx)
+            .filter_map(|tree| {
+                let tree = tree.read(cx);
+                Some((tree.snapshot(), tree.as_local()?.settings()))
+            })
+            .collect::<Vec<_>>();
+        let searcher = ProjectSearcher {
+            fs: self.fs.clone(),
+            buffer_store: self.buffer_store.downgrade(),
+            snapshots,
+            open_buffers: Default::default(),
         };
-
-        cx.spawn(async move |_, cx| {
-            let mut range_count = 0;
-            let mut buffer_count = 0;
-            let mut limit_reached = false;
-            let query = Arc::new(query);
-            let chunks = matching_buffers_rx.ready_chunks(64);
-
-            // Now that we know what paths match the query, we will load at most
-            // 64 buffers at a time to avoid overwhelming the main thread. For each
-            // opened buffer, we will spawn a background task that retrieves all the
-            // ranges in the buffer matched by the query.
-            let mut chunks = pin!(chunks);
-            'outer: while let Some(matching_buffer_chunk) = chunks.next().await {
-                let mut chunk_results = Vec::with_capacity(matching_buffer_chunk.len());
-                for buffer in matching_buffer_chunk {
-                    let query = query.clone();
-                    let snapshot = buffer.read_with(cx, |buffer, _| buffer.snapshot())?;
-                    chunk_results.push(cx.background_spawn(async move {}));
-                }
-
-                let chunk_results = futures::future::join_all(chunk_results).await;
-                for result in chunk_results {
-                    if let Some((buffer, ranges)) = result.log_err() {
-                        range_count += ranges.len();
-                        buffer_count += 1;
-                        result_tx
-                            .send(SearchResult::Buffer { buffer, ranges })
-                            .await?;
-                        if buffer_count > MAX_SEARCH_RESULT_FILES
-                            || range_count > MAX_SEARCH_RESULT_RANGES
-                        {
-                            limit_reached = true;
-                            break 'outer;
-                        }
-                    }
-                }
-            }
-
-            if limit_reached {
-                result_tx.send(SearchResult::LimitReached).await?;
-            }
-
-            anyhow::Ok(())
-        })
-        .detach();
-
-        result_rx
+        searcher.run(query, cx)
     }
 
     fn find_search_candidate_buffers(

crates/project/src/project_search.rs 🔗

@@ -26,17 +26,17 @@ use crate::{
 };
 
 pub(crate) struct ProjectSearcher {
-    fs: Arc<dyn Fs>,
-    buffer_store: WeakEntity<BufferStore>,
+    pub(crate) fs: Arc<dyn Fs>,
+    pub(crate) buffer_store: WeakEntity<BufferStore>,
     pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>,
-    open_buffers: HashSet<ProjectEntryId>,
+    pub(crate) open_buffers: HashSet<ProjectEntryId>,
 }
 
 const MAX_SEARCH_RESULT_FILES: usize = 5_000;
 const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
 
 impl ProjectSearcher {
-    pub(crate) fn search(self, query: SearchQuery, cx: &mut App) -> Receiver<SearchResult> {
+    pub(crate) fn run(self, query: SearchQuery, cx: &mut App) -> Receiver<SearchResult> {
         let executor = cx.background_executor().clone();
         let (tx, rx) = unbounded();
         cx.spawn(async move |cx| {
@@ -48,7 +48,7 @@ impl ProjectSearcher {
             let matched_buffer_count = AtomicUsize::new(0);
             let worker_pool = executor.scoped(|scope| {
                 let (input_paths_tx, input_paths_rx) = bounded(64);
-                let (find_first_match_tx, find_first_match_rx) = bounded(64);
+                let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) = bounded(64);
                 let (sorted_search_results_tx, sorted_search_results_rx) = bounded(64);
                 for _ in 0..executor.num_cpus() {
                     let worker = Worker {
@@ -58,8 +58,8 @@ impl ProjectSearcher {
                         matches_count: &matches_count,
                         fs: &*self.fs,
                         input_paths_rx: input_paths_rx.clone(),
-                        find_first_match_rx: find_first_match_rx.clone(),
-                        find_first_match_tx: find_first_match_tx.clone(),
+                        confirm_contents_will_match_rx: confirm_contents_will_match_rx.clone(),
+                        confirm_contents_will_match_tx: confirm_contents_will_match_tx.clone(),
                         get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
                         find_all_matches_rx: find_all_matches_rx.clone(),
                         publish_matches: tx.clone(),
@@ -71,13 +71,16 @@ impl ProjectSearcher {
                     input_paths_tx,
                     sorted_search_results_tx,
                 ));
-                scope.spawn(self.maintain_sorted_search_results())
+                scope.spawn(self.maintain_sorted_search_results(
+                    sorted_search_results_rx,
+                    get_buffer_for_full_scan_tx,
+                ))
             });
             self.open_buffers(get_buffer_for_full_scan_rx, find_all_matches_tx, cx)
                 .await;
             worker_pool.await;
-            let limit_reached = matches_count.load(Ordering::Release) > MAX_SEARCH_RESULT_RANGES
-                || matched_buffer_count.load(Ordering::Release) > MAX_SEARCH_RESULT_FILES;
+            let limit_reached = matches_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_RANGES
+                || matched_buffer_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_FILES;
             if limit_reached {
                 _ = tx.send(SearchResult::LimitReached).await;
             }
@@ -165,10 +168,12 @@ struct Worker<'search> {
     /// - Scan ignored files
     /// Put another way: filter out files that can't match (without looking at file contents)
     input_paths_rx: Receiver<InputPath<'search>>,
-    /// After that, figure out which paths contain at least one match (look at file contents). That's called "partial scan".
-    find_first_match_tx: Sender<MatchingEntry>,
-    find_first_match_rx: Receiver<MatchingEntry>,
-    /// Of those that contain at least one match, look for rest of matches (and figure out their ranges).
+
+    /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
+    /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
+    confirm_contents_will_match_tx: Sender<MatchingEntry>,
+    confirm_contents_will_match_rx: Receiver<MatchingEntry>,
+    /// Of those that contain at least one match (or are already in memory), look for rest of matches (and figure out their ranges).
     /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
     get_buffer_for_full_scan_tx: Sender<ProjectPath>,
     /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
@@ -180,7 +185,7 @@ struct Worker<'search> {
 impl Worker<'_> {
     async fn run(self) {
         let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
-        let mut find_first_match = pin!(self.find_first_match_rx.fuse());
+        let mut find_first_match = pin!(self.confirm_contents_will_match_rx.fuse());
         let mut scan_path = pin!(self.input_paths_rx.fuse());
         let handler = RequestHandler {
             query: self.query,
@@ -188,7 +193,7 @@ impl Worker<'_> {
             fs: self.fs,
             matched_buffer_count: self.matched_buffer_count,
             matches_count: self.matches_count,
-            find_first_match_tx: &self.find_first_match_tx,
+            confirm_contents_will_match_tx: &self.confirm_contents_will_match_tx,
             get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
             publish_matches: &self.publish_matches,
         };
@@ -225,7 +230,7 @@ struct RequestHandler<'worker> {
     matched_buffer_count: &'worker AtomicUsize,
     matches_count: &'worker AtomicUsize,
 
-    find_first_match_tx: &'worker Sender<MatchingEntry>,
+    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
     get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
     publish_matches: &'worker Sender<SearchResult>,
 }
@@ -302,15 +307,15 @@ impl RequestHandler<'_> {
             } = req;
             if entry.is_dir() && entry.is_ignored {
                 if !settings.is_path_excluded(&entry.path) {
-                    Self::scan_ignored_dir(
-                        self.fs,
-                        &snapshot,
-                        &entry.path,
-                        self.query,
-                        &filter_tx,
-                        &output_tx,
-                    )
-                    .await?;
+                    // Self::scan_ignored_dir(
+                    //     self.fs,
+                    //     &snapshot,
+                    //     &entry.path,
+                    //     self.query,
+                    //     &filter_tx,
+                    //     &output_tx,
+                    // )
+                    // .await?;
                 }
                 return Ok(());
             }
@@ -332,8 +337,6 @@ impl RequestHandler<'_> {
                 }
             }
 
-            let (mut tx, rx) = oneshot::channel();
-
             if self.open_entries.contains(&entry.id) {
                 // The buffer is already in memory and that's the version we want to scan;
                 // hence skip the dilly-dally and look for all matches straight away.
@@ -344,7 +347,7 @@ impl RequestHandler<'_> {
                     })
                     .await?;
             } else {
-                self.find_first_match_tx
+                self.confirm_contents_will_match_tx
                     .send(MatchingEntry {
                         should_scan_tx: should_scan_tx,
                         worktree_root: snapshot.abs_path().clone(),
@@ -356,7 +359,6 @@ impl RequestHandler<'_> {
                     .await?;
             }
 
-            output_tx.send(rx).await?;
             anyhow::Ok(())
         })
         .await;