Fix project search in remote projects

Piotr Osiewicz created

Change summary

Cargo.lock                                   |  10 
crates/project/src/buffer_store.rs           |  63 -----
crates/project/src/project.rs                |  23 +
crates/project/src/project_search.rs         | 265 +++++++++++++--------
crates/remote_server/src/headless_project.rs |  12 
5 files changed, 190 insertions(+), 183 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -20592,16 +20592,6 @@ dependencies = [
  "zlog",
 ]
 
-[[package]]
-name = "worktree_benchmarks"
-version = "0.1.0"
-dependencies = [
- "fs",
- "gpui",
- "settings",
- "worktree",
-]
-
 [[package]]
 name = "writeable"
 version = "0.6.1"

crates/project/src/buffer_store.rs 🔗

@@ -1,14 +1,12 @@
 use crate::{
-    ProjectItem as _, ProjectPath,
+    ProjectPath,
     lsp_store::OpenLspBufferHandle,
-    search::SearchQuery,
     worktree_store::{WorktreeStore, WorktreeStoreEvent},
 };
 use anyhow::{Context as _, Result, anyhow};
 use client::Client;
 use collections::{HashMap, HashSet, hash_map};
-use fs::Fs;
-use futures::{Future, FutureExt as _, StreamExt, channel::oneshot, future::Shared};
+use futures::{Future, FutureExt as _, channel::oneshot, future::Shared};
 use gpui::{
     App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
 };
@@ -1103,63 +1101,6 @@ impl BufferStore {
         Some(())
     }
 
-    pub fn find_search_candidates(
-        &mut self,
-        query: &SearchQuery,
-        mut limit: usize,
-        fs: Arc<dyn Fs>,
-        cx: &mut Context<Self>,
-    ) -> Receiver<Entity<Buffer>> {
-        let (tx, rx) = smol::channel::unbounded();
-        let mut open_buffers = HashSet::default();
-        let mut unnamed_buffers = Vec::new();
-        for handle in self.buffers() {
-            let buffer = handle.read(cx);
-            if self.non_searchable_buffers.contains(&buffer.remote_id()) {
-                continue;
-            } else if let Some(entry_id) = buffer.entry_id(cx) {
-                open_buffers.insert(entry_id);
-            } else {
-                limit = limit.saturating_sub(1);
-                unnamed_buffers.push(handle)
-            };
-        }
-
-        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
-        let project_paths_rx = self
-            .worktree_store
-            .update(cx, |worktree_store, cx| {
-                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
-            })
-            .chunks(MAX_CONCURRENT_BUFFER_OPENS);
-
-        cx.spawn(async move |this, cx| {
-            for buffer in unnamed_buffers {
-                tx.send(buffer).await.ok();
-            }
-
-            let mut project_paths_rx = pin!(project_paths_rx);
-            while let Some(project_paths) = project_paths_rx.next().await {
-                let buffers = this.update(cx, |this, cx| {
-                    project_paths
-                        .into_iter()
-                        .map(|project_path| this.open_buffer(project_path, cx))
-                        .collect::<Vec<_>>()
-                })?;
-                for buffer_task in buffers {
-                    if let Some(buffer) = buffer_task.await.log_err()
-                        && tx.send(buffer).await.is_err()
-                    {
-                        return anyhow::Ok(());
-                    }
-                }
-            }
-            anyhow::Ok(())
-        })
-        .detach();
-        rx
-    }
-
     fn on_buffer_event(
         &mut self,
         buffer: Entity<Buffer>,

crates/project/src/project.rs 🔗

@@ -49,6 +49,7 @@ pub use git_store::{
     git_traversal::{ChildEntriesGitIter, GitEntry, GitEntryRef, GitTraversal},
 };
 pub use manifest_tree::ManifestTree;
+pub use project_search::Search;
 
 use anyhow::{Context as _, Result, anyhow};
 use buffer_store::{BufferStore, BufferStoreEvent};
@@ -4013,14 +4014,20 @@ impl Project {
         } else {
             None
         };
-        let searcher = project_search::Search {
-            fs: self.fs.clone(),
-            buffer_store: self.buffer_store.clone(),
-            worktree_store: self.worktree_store.clone(),
-            worktrees: self.visible_worktrees(cx).collect::<Vec<_>>(),
-            limit: project_search::Search::MAX_SEARCH_RESULT_FILES + 1,
-            client,
-            remotely_created_models: self.remotely_created_models.clone(),
+        let searcher = match client {
+            Some((client, remote_id)) => project_search::Search::remote(
+                self.buffer_store.clone(),
+                self.worktree_store.clone(),
+                project_search::Search::MAX_SEARCH_RESULT_FILES + 1,
+                (client, remote_id, self.remotely_created_models.clone()),
+            ),
+            None => project_search::Search::local(
+                self.fs.clone(),
+                self.buffer_store.clone(),
+                self.worktree_store.clone(),
+                project_search::Search::MAX_SEARCH_RESULT_FILES + 1,
+                cx,
+            ),
         };
         searcher.into_results(query, cx)
     }

crates/project/src/project_search.rs 🔗

@@ -33,31 +33,40 @@ use crate::{
     worktree_store::WorktreeStore,
 };
 
-pub(crate) struct Search {
-    pub(crate) fs: Arc<dyn Fs>,
-    pub(crate) buffer_store: Entity<BufferStore>,
-    pub(crate) worktree_store: Entity<WorktreeStore>,
-    pub(crate) worktrees: Vec<Entity<Worktree>>,
-    pub(crate) limit: usize,
-    pub(crate) client: Option<(AnyProtoClient, u64)>,
-    pub(crate) remotely_created_models: Arc<Mutex<RemotelyCreatedModels>>,
+pub struct Search {
+    buffer_store: Entity<BufferStore>,
+    worktree_store: Entity<WorktreeStore>,
+    limit: usize,
+    kind: SearchKind,
+}
+
+enum SearchKind {
+    Local {
+        fs: Arc<dyn Fs>,
+        worktrees: Vec<Entity<Worktree>>,
+    },
+    Remote {
+        client: AnyProtoClient,
+        remote_id: u64,
+        models: Arc<Mutex<RemotelyCreatedModels>>,
+    },
 }
 
 /// Represents results of project search and allows one to either obtain match positions OR
 /// just the handles to buffers that may match the search.
 #[must_use]
-pub(crate) struct SearchResultsHandle {
+pub struct SearchResultsHandle {
     results: Receiver<SearchResult>,
     matching_buffers: Receiver<Entity<Buffer>>,
     trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
 }
 
 impl SearchResultsHandle {
-    pub(crate) fn results(self, cx: &mut App) -> Receiver<SearchResult> {
+    pub fn results(self, cx: &mut App) -> Receiver<SearchResult> {
         (self.trigger_search)(cx).detach();
         self.results
     }
-    pub(crate) fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
+    pub fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
         (self.trigger_search)(cx).detach();
         self.matching_buffers
     }
@@ -66,6 +75,7 @@ impl SearchResultsHandle {
 #[derive(Clone)]
 enum FindSearchCandidates {
     Local {
+        fs: Arc<dyn Fs>,
         /// Start off with all paths in project and filter them based on:
         /// - Include filters
         /// - Exclude filters
@@ -85,11 +95,46 @@ enum FindSearchCandidates {
 }
 
 impl Search {
+    pub fn local(
+        fs: Arc<dyn Fs>,
+        buffer_store: Entity<BufferStore>,
+        worktree_store: Entity<WorktreeStore>,
+
+        limit: usize,
+        cx: &mut App,
+    ) -> Self {
+        let worktrees = worktree_store.read(cx).visible_worktrees(cx).collect();
+        Self {
+            kind: SearchKind::Local { fs, worktrees },
+            buffer_store,
+            worktree_store,
+            limit,
+        }
+    }
+
+    pub(crate) fn remote(
+        buffer_store: Entity<BufferStore>,
+        worktree_store: Entity<WorktreeStore>,
+        limit: usize,
+        client_state: (AnyProtoClient, u64, Arc<Mutex<RemotelyCreatedModels>>),
+    ) -> Self {
+        Self {
+            kind: SearchKind::Remote {
+                client: client_state.0,
+                remote_id: client_state.1,
+                models: client_state.2,
+            },
+            buffer_store,
+            worktree_store,
+            limit,
+        }
+    }
+
     pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
     pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
     /// Prepares a project search run. The result has to be used to specify whether you're interested in matching buffers
     /// or full search results.
-    pub(crate) fn into_results(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
+    pub fn into_results(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
         let mut open_buffers = HashSet::default();
         let mut unnamed_buffers = Vec::new();
         const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
@@ -109,7 +154,7 @@ impl Search {
         let (tx, rx) = unbounded();
         let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
         let matching_buffers = grab_buffer_snapshot_rx.clone();
-        let trigger_search = Box::new(|cx: &mut App| {
+        let trigger_search = Box::new(move |cx: &mut App| {
             cx.spawn(async move |cx| {
                 for buffer in unnamed_buffers {
                     _ = grab_buffer_snapshot_tx.send(buffer).await;
@@ -118,86 +163,100 @@ impl Search {
                 let (find_all_matches_tx, find_all_matches_rx) =
                     bounded(MAX_CONCURRENT_BUFFER_OPENS);
 
-                let (candidate_searcher, tasks) = if let Some((client, remote_id)) = self.client {
-                    let request = client.request(proto::FindSearchCandidates {
-                        project_id: remote_id,
-                        query: Some(query.to_proto()),
-                        limit: self.limit as _,
-                    });
-                    let Ok(guard) = cx.update(|cx| {
-                        Project::retain_remotely_created_models_impl(
-                            &self.remotely_created_models,
-                            &self.buffer_store,
-                            &self.worktree_store,
-                            cx,
+                let (candidate_searcher, tasks) = match self.kind {
+                    SearchKind::Local {
+                        fs,
+                        ref mut worktrees,
+                    } => {
+                        let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
+                            unbounded();
+                        let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
+                            bounded(64);
+                        let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();
+
+                        let (input_paths_tx, input_paths_rx) = unbounded();
+
+                        let tasks = vec![
+                            cx.spawn(Self::provide_search_paths(
+                                std::mem::take(worktrees),
+                                query.include_ignored(),
+                                input_paths_tx,
+                                sorted_search_results_tx,
+                            ))
+                            .boxed_local(),
+                            Self::open_buffers(
+                                &self.buffer_store,
+                                get_buffer_for_full_scan_rx,
+                                grab_buffer_snapshot_tx,
+                                cx.clone(),
+                            )
+                            .boxed_local(),
+                            cx.background_spawn(Self::maintain_sorted_search_results(
+                                sorted_search_results_rx,
+                                get_buffer_for_full_scan_tx.clone(),
+                                self.limit,
+                            ))
+                            .boxed_local(),
+                        ];
+                        (
+                            FindSearchCandidates::Local {
+                                fs,
+                                get_buffer_for_full_scan_tx,
+                                confirm_contents_will_match_tx,
+                                confirm_contents_will_match_rx,
+                                input_paths_rx,
+                            },
+                            tasks,
                         )
-                    }) else {
-                        return;
-                    };
-                    let buffer_store = self.buffer_store.downgrade();
-                    let issue_remote_buffers_request = cx
-                        .spawn(async move |cx| {
-                            let _ = maybe!(async move {
-                                let response = request.await?;
-
-                                for buffer_id in response.buffer_ids {
-                                    let buffer_id = BufferId::new(buffer_id)?;
-                                    let buffer = buffer_store
-                                        .update(cx, |buffer_store, cx| {
-                                            buffer_store.wait_for_remote_buffer(buffer_id, cx)
-                                        })?
-                                        .await?;
-                                    let _ = grab_buffer_snapshot_tx.send(buffer).await;
-                                }
-
-                                drop(guard);
-                                anyhow::Ok(())
+                    }
+                    SearchKind::Remote {
+                        client,
+                        remote_id,
+                        models,
+                    } => {
+                        let request = client.request(proto::FindSearchCandidates {
+                            project_id: remote_id,
+                            query: Some(query.to_proto()),
+                            limit: self.limit as _,
+                        });
+                        let Ok(guard) = cx.update(|cx| {
+                            Project::retain_remotely_created_models_impl(
+                                &models,
+                                &self.buffer_store,
+                                &self.worktree_store,
+                                cx,
+                            )
+                        }) else {
+                            return;
+                        };
+                        let buffer_store = self.buffer_store.downgrade();
+                        let issue_remote_buffers_request = cx
+                            .spawn(async move |cx| {
+                                let _ = maybe!(async move {
+                                    let response = request.await?;
+
+                                    for buffer_id in response.buffer_ids {
+                                        let buffer_id = BufferId::new(buffer_id)?;
+                                        let buffer = buffer_store
+                                            .update(cx, |buffer_store, cx| {
+                                                buffer_store.wait_for_remote_buffer(buffer_id, cx)
+                                            })?
+                                            .await?;
+                                        let _ = grab_buffer_snapshot_tx.send(buffer).await;
+                                    }
+
+                                    drop(guard);
+                                    anyhow::Ok(())
+                                })
+                                .await
+                                .log_err();
                             })
-                            .await
-                            .log_err();
-                        })
-                        .boxed_local();
-                    (
-                        FindSearchCandidates::Remote,
-                        vec![issue_remote_buffers_request],
-                    )
-                } else {
-                    let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = unbounded();
-                    let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
-                        bounded(64);
-                    let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();
-
-                    let (input_paths_tx, input_paths_rx) = unbounded();
-                    let tasks = vec![
-                        cx.spawn(Self::provide_search_paths(
-                            std::mem::take(&mut self.worktrees),
-                            query.include_ignored(),
-                            input_paths_tx,
-                            sorted_search_results_tx,
-                        ))
-                        .boxed_local(),
-                        self.open_buffers(
-                            get_buffer_for_full_scan_rx,
-                            grab_buffer_snapshot_tx,
-                            cx.clone(),
+                            .boxed_local();
+                        (
+                            FindSearchCandidates::Remote,
+                            vec![issue_remote_buffers_request],
                         )
-                        .boxed_local(),
-                        cx.background_spawn(Self::maintain_sorted_search_results(
-                            sorted_search_results_rx,
-                            get_buffer_for_full_scan_tx.clone(),
-                            self.limit,
-                        ))
-                        .boxed_local(),
-                    ];
-                    (
-                        FindSearchCandidates::Local {
-                            get_buffer_for_full_scan_tx,
-                            confirm_contents_will_match_tx,
-                            confirm_contents_will_match_rx,
-                            input_paths_rx,
-                        },
-                        tasks,
-                    )
+                    }
                 };
 
                 let matches_count = AtomicUsize::new(0);
@@ -213,7 +272,6 @@ impl Search {
                             open_buffers: &open_buffers,
                             matched_buffer_count: &matched_buffer_count,
                             matches_count: &matches_count,
-                            fs: &*self.fs,
                             candidates: candidate_searcher.clone(),
                             find_all_matches_rx: find_all_matches_rx.clone(),
                             publish_matches: tx.clone(),
@@ -336,7 +394,7 @@ impl Search {
 
     /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
     async fn open_buffers(
-        &self,
+        buffer_store: &Entity<BufferStore>,
         rx: Receiver<ProjectPath>,
         find_all_matches_tx: Sender<Entity<Buffer>>,
         mut cx: AsyncApp,
@@ -344,7 +402,7 @@ impl Search {
         let mut rx = pin!(rx.ready_chunks(64));
         _ = maybe!(async move {
             while let Some(requested_paths) = rx.next().await {
-                let mut buffers = self.buffer_store.update(&mut cx, |this, cx| {
+                let mut buffers = buffer_store.update(&mut cx, |this, cx| {
                     requested_paths
                         .into_iter()
                         .map(|path| this.open_buffer(path, cx))
@@ -383,8 +441,6 @@ struct Worker<'search> {
     matched_buffer_count: &'search AtomicUsize,
     matches_count: &'search AtomicUsize,
     open_buffers: &'search HashSet<ProjectEntryId>,
-    fs: &'search dyn Fs,
-
     candidates: FindSearchCandidates,
     /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
     find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
@@ -399,8 +455,10 @@ impl Worker<'_> {
             confirm_contents_will_match_rx,
             mut confirm_contents_will_match_tx,
             mut get_buffer_for_full_scan_tx,
+            fs,
         ) = match self.candidates {
             FindSearchCandidates::Local {
+                fs,
                 input_paths_rx,
                 confirm_contents_will_match_rx,
                 confirm_contents_will_match_tx,
@@ -410,10 +468,15 @@ impl Worker<'_> {
                 confirm_contents_will_match_rx,
                 confirm_contents_will_match_tx,
                 get_buffer_for_full_scan_tx,
+                Some(fs),
+            ),
+            FindSearchCandidates::Remote => (
+                unbounded().1,
+                unbounded().1,
+                unbounded().0,
+                unbounded().0,
+                None,
             ),
-            FindSearchCandidates::Remote => {
-                (unbounded().1, unbounded().1, unbounded().0, unbounded().0)
-            }
         };
         let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
         let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse());
@@ -423,7 +486,7 @@ impl Worker<'_> {
             let handler = RequestHandler {
                 query: self.query,
                 open_entries: &self.open_buffers,
-                fs: self.fs,
+                fs: fs.as_deref(),
                 matched_buffer_count: self.matched_buffer_count,
                 matches_count: self.matches_count,
                 confirm_contents_will_match_tx: &confirm_contents_will_match_tx,
@@ -479,7 +542,7 @@ impl Worker<'_> {
 
 struct RequestHandler<'worker> {
     query: &'worker SearchQuery,
-    fs: &'worker dyn Fs,
+    fs: Option<&'worker dyn Fs>,
     open_entries: &'worker HashSet<ProjectEntryId>,
     matched_buffer_count: &'worker AtomicUsize,
     matches_count: &'worker AtomicUsize,
@@ -525,7 +588,7 @@ impl RequestHandler<'_> {
     async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
         _=maybe!(async move {
             let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
-            let Some(file) = self.fs.open_sync(&abs_path).await.log_err() else {
+            let Some(file) = self.fs.context("Trying to query filesystem in remote project search")?.open_sync(&abs_path).await.log_err() else {
                 return anyhow::Ok(());
             };
 

crates/remote_server/src/headless_project.rs 🔗

@@ -640,9 +640,15 @@ impl HeadlessProject {
             PathStyle::local(),
         )?;
         let results = this.update(&mut cx, |this, cx| {
-            this.buffer_store.update(cx, |buffer_store, cx| {
-                buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx)
-            })
+            project::Search::local(
+                this.fs.clone(),
+                this.buffer_store.clone(),
+                this.worktree_store.clone(),
+                message.limit as _,
+                cx,
+            )
+            .into_results(query, cx)
+            .matching_buffers(cx)
         })?;
 
         let mut response = proto::FindSearchCandidatesResponse {