Another one

Piotr Osiewicz created

Change summary

crates/project/src/project_search.rs | 158 ++++++++++++++++++-----------
1 file changed, 96 insertions(+), 62 deletions(-)

Detailed changes

crates/project/src/project_search.rs 🔗

@@ -1,8 +1,7 @@
 use std::{
     io::{BufRead, BufReader},
-    ops::{ControlFlow, Range},
     path::Path,
-    pin::{Pin, pin},
+    pin::pin,
     sync::{
         Arc,
         atomic::{AtomicUsize, Ordering},
@@ -11,12 +10,12 @@ use std::{
 
 use collections::HashSet;
 use fs::Fs;
-use futures::{FutureExt, SinkExt, StreamExt, future::BoxFuture, select_biased};
+use futures::{SinkExt, StreamExt, select_biased};
 use gpui::{App, AsyncApp, Entity, WeakEntity};
 use language::{Buffer, BufferSnapshot};
 use postage::oneshot;
 use smol::channel::{Receiver, Sender, bounded, unbounded};
-use text::Anchor;
+
 use util::{ResultExt, maybe};
 use worktree::{Entry, ProjectEntryId, Snapshot, WorktreeSettings};
 
@@ -30,6 +29,7 @@ pub(crate) struct ProjectSearcher {
     fs: Arc<dyn Fs>,
     buffer_store: WeakEntity<BufferStore>,
     pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>,
+    open_buffers: HashSet<ProjectEntryId>,
 }
 
 const MAX_SEARCH_RESULT_FILES: usize = 5_000;
@@ -49,10 +49,11 @@ impl ProjectSearcher {
             let worker_pool = executor.scoped(|scope| {
                 let (input_paths_tx, input_paths_rx) = bounded(64);
                 let (find_first_match_tx, find_first_match_rx) = bounded(64);
-
+                let (sorted_search_results_tx, sorted_search_results_rx) = bounded(64);
                 for _ in 0..executor.num_cpus() {
                     let worker = Worker {
                         query: &query,
+                        open_buffers: &self.open_buffers,
                         matched_buffer_count: &matched_buffer_count,
                         matches_count: &matches_count,
                         fs: &*self.fs,
@@ -65,7 +66,12 @@ impl ProjectSearcher {
                     };
                     scope.spawn(worker.run());
                 }
-                scope.spawn(self.provide_search_paths(&query, input_paths_tx))
+                scope.spawn(self.provide_search_paths(
+                    &query,
+                    input_paths_tx,
+                    sorted_search_results_tx,
+                ));
+                scope.spawn(self.maintain_sorted_search_results())
             });
             self.open_buffers(get_buffer_for_full_scan_rx, find_all_matches_tx, cx)
                 .await;
@@ -84,23 +90,42 @@ impl ProjectSearcher {
         &'this self,
         query: &SearchQuery,
         tx: Sender<InputPath<'this>>,
+        results: Sender<oneshot::Receiver<ProjectPath>>,
     ) {
         for (snapshot, worktree_settings) in &self.snapshots {
             for entry in snapshot.entries(query.include_ignored(), 0) {
+                let (should_scan_tx, should_scan_rx) = oneshot::channel();
                 let Ok(_) = tx
                     .send(InputPath {
                         entry,
                         settings: worktree_settings,
                         snapshot: snapshot,
+                        should_scan_tx,
                     })
                     .await
                 else {
                     return;
                 };
+                results.send(should_scan_rx).await;
             }
         }
     }
 
+    async fn maintain_sorted_search_results(
+        &self,
+        rx: Receiver<oneshot::Receiver<ProjectPath>>,
+        paths_for_full_scan: Sender<ProjectPath>,
+    ) {
+        let mut rx = pin!(rx);
+        while let Some(mut next_path_result) = rx.next().await {
+            let Some(successful_path) = next_path_result.next().await else {
+                // This math did not produce a match, hence skip it.
+                continue;
+            };
+            paths_for_full_scan.send(successful_path).await;
+        }
+    }
+
     /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
     async fn open_buffers<'a>(
         &'a self,
@@ -131,6 +156,7 @@ struct Worker<'search> {
     query: &'search SearchQuery,
     matched_buffer_count: &'search AtomicUsize,
     matches_count: &'search AtomicUsize,
+    open_buffers: &'search HashSet<ProjectEntryId>,
     fs: &'search dyn Fs,
     /// Start off with all paths in project and filter them based on:
     /// - Include filters
@@ -158,6 +184,7 @@ impl Worker<'_> {
         let mut scan_path = pin!(self.input_paths_rx.fuse());
         let handler = RequestHandler {
             query: self.query,
+            open_entries: &self.open_buffers,
             fs: self.fs,
             matched_buffer_count: self.matched_buffer_count,
             matches_count: self.matches_count,
@@ -169,7 +196,7 @@ impl Worker<'_> {
             select_biased! {
                 find_all_matches = find_all_matches.next() => {
                     let result = handler.handle_find_all_matches(find_all_matches).await;
-                    if let Some(should_bail) = result {
+                    if let Some(_should_bail) = result {
                         return;
                     }
                 },
@@ -258,75 +285,81 @@ impl RequestHandler<'_> {
             }
 
             if self.query.detect(file).unwrap_or(false) {
-                entry.respond.send(entry.path).await?;
+                // Yes, we should scan the whole file.
+                entry.should_scan_tx.send(entry.path).await?;
             }
             Ok(())
         }).await;
     }
 
     async fn handle_scan_path(&self, req: InputPath<'_>) {
-        let InputPath {
-            entry,
-            settings,
-            snapshot,
-        } = req;
-        if entry.is_dir() && entry.is_ignored {
-            if !settings.is_path_excluded(&entry.path) {
-                Self::scan_ignored_dir(
-                    self.fs,
-                    &snapshot,
-                    &entry.path,
-                    self.query,
-                    &filter_tx,
-                    &output_tx,
-                )
-                .await?;
+        _ = maybe!(async move {
+            let InputPath {
+                entry,
+                settings,
+                snapshot,
+                should_scan_tx,
+            } = req;
+            if entry.is_dir() && entry.is_ignored {
+                if !settings.is_path_excluded(&entry.path) {
+                    Self::scan_ignored_dir(
+                        self.fs,
+                        &snapshot,
+                        &entry.path,
+                        self.query,
+                        &filter_tx,
+                        &output_tx,
+                    )
+                    .await?;
+                }
+                return Ok(());
             }
-            return;
-        }
 
-        if entry.is_fifo || !entry.is_file() {
-            return;
-        }
+            if entry.is_fifo || !entry.is_file() {
+                return Ok(());
+            }
 
-        if self.query.filters_path() {
-            let matched_path = if self.query.match_full_paths() {
-                let mut full_path = snapshot.root_name().as_std_path().to_owned();
-                full_path.push(entry.path.as_std_path());
-                self.query.match_path(&full_path)
-            } else {
-                self.query.match_path(entry.path.as_std_path())
-            };
-            if !matched_path {
-                return;
+            if self.query.filters_path() {
+                let matched_path = if self.query.match_full_paths() {
+                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
+                    full_path.push(entry.path.as_std_path());
+                    self.query.match_path(&full_path)
+                } else {
+                    self.query.match_path(entry.path.as_std_path())
+                };
+                if !matched_path {
+                    return Ok(());
+                }
             }
-        }
 
-        let (mut tx, rx) = oneshot::channel();
+            let (mut tx, rx) = oneshot::channel();
 
-        if self.open_entries.contains(&entry.id) {
-            // The buffer is already in memory and that's the version we want to scan;
-            // hence skip the dilly-dally and look for all matches straight away.
-            self.get_buffer_for_full_scan_tx
-                .send(ProjectPath {
-                    worktree_id: snapshot.id(),
-                    path: entry.path.clone(),
-                })
-                .await?;
-        } else {
-            self.find_first_match_tx
-                .send(MatchingEntry {
-                    respond: tx,
-                    worktree_root: snapshot.abs_path().clone(),
-                    path: ProjectPath {
+            if self.open_entries.contains(&entry.id) {
+                // The buffer is already in memory and that's the version we want to scan;
+                // hence skip the dilly-dally and look for all matches straight away.
+                self.get_buffer_for_full_scan_tx
+                    .send(ProjectPath {
                         worktree_id: snapshot.id(),
                         path: entry.path.clone(),
-                    },
-                })
-                .await?;
-        }
+                    })
+                    .await?;
+            } else {
+                self.find_first_match_tx
+                    .send(MatchingEntry {
+                        should_scan_tx: should_scan_tx,
+                        worktree_root: snapshot.abs_path().clone(),
+                        path: ProjectPath {
+                            worktree_id: snapshot.id(),
+                            path: entry.path.clone(),
+                        },
+                    })
+                    .await?;
+            }
 
-        output_tx.send(rx).await?;
+            output_tx.send(rx).await?;
+            anyhow::Ok(())
+        })
+        .await;
     }
 }
 
@@ -334,10 +367,11 @@ struct InputPath<'worker> {
     entry: &'worker Entry,
     settings: &'worker WorktreeSettings,
     snapshot: &'worker Snapshot,
+    should_scan_tx: oneshot::Sender<ProjectPath>,
 }
 
 struct MatchingEntry {
     worktree_root: Arc<Path>,
     path: ProjectPath,
-    respond: oneshot::Sender<ProjectPath>,
+    should_scan_tx: oneshot::Sender<ProjectPath>,
 }