search: New old search implementation (#39956)

Piotr Osiewicz and Smit Barmase created

This is in-progress work on changing how the task scheduler affects the
performance of project search. Instead of relying on tasks being
executed at the discretion of the task scheduler, we want to experiment
with having a set of "agents" (worker tasks) that prioritize driving
in-progress project search matches to completion over pushing the
search as a whole forward. This should hopefully improve both the
throughput and the latency of project search significantly.
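
The new worker loop in crates/project/src/project_search.rs drives this
with select_biased!. A rough sketch of the priority order (not the PR's
actual worker: names and channel payloads are simplified, and result
limits are ignored):

use futures::{StreamExt, select_biased};
use smol::channel::Receiver;

// Hypothetical worker loop; names and payload types are illustrative, not the PR's exact ones.
async fn worker(
    find_all_matches_rx: Receiver<String>, // buffers already confirmed to match; finish these first
    confirm_match_rx: Receiver<String>,    // files to probe for at least one match
    scan_path_rx: Receiver<String>,        // fresh paths coming out of the worktree walk
) {
    let mut find_all_matches = find_all_matches_rx.fuse();
    let mut confirm_match = confirm_match_rx.fuse();
    let mut scan_path = scan_path_rx.fuse();
    loop {
        select_biased! {
            // Highest priority: drive an in-progress match to completion.
            buffer = find_all_matches.next() => {
                if let Some(buffer) = buffer { println!("collecting all matches in {buffer}"); }
            },
            // Next: cheap "does this file contain at least one match?" probes.
            file = confirm_match.next() => {
                if let Some(file) = file { println!("probing {file}"); }
            },
            // Lowest priority: admit brand-new paths into the pipeline.
            path = scan_path.next() => {
                if let Some(path) = path { println!("filtering {path}"); }
            },
            complete => break,
        }
    }
}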

Release Notes:

- Improved project search performance

---------

Co-authored-by: Smit Barmase <smit@zed.dev>
Co-authored-by: Smit Barmase <heysmitbarmase@gmail.com>

Change summary

Cargo.lock                                   |  27 
Cargo.toml                                   |   2 
crates/project/src/buffer_store.rs           |  71 -
crates/project/src/project.rs                | 235 +-----
crates/project/src/project_search.rs         | 754 ++++++++++++++++++++++
crates/project/src/worktree_store.rs         | 151 ----
crates/project_benchmarks/Cargo.toml         |  21 
crates/project_benchmarks/LICENSE-GPL        |   1 
crates/project_benchmarks/src/main.rs        | 136 +++
crates/remote_server/Cargo.toml              |   2 
crates/remote_server/src/headless_project.rs |  12 
11 files changed, 1,023 insertions(+), 389 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -12890,6 +12890,23 @@ dependencies = [
  "zlog",
 ]
 
+[[package]]
+name = "project_benchmarks"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "clap",
+ "client",
+ "futures 0.3.31",
+ "gpui",
+ "http_client",
+ "language",
+ "node_runtime",
+ "project",
+ "settings",
+ "watch",
+]
+
 [[package]]
 name = "project_panel"
 version = "0.1.0"
@@ -20590,6 +20607,16 @@ dependencies = [
  "zlog",
 ]
 
+[[package]]
+name = "worktree_benchmarks"
+version = "0.1.0"
+dependencies = [
+ "fs",
+ "gpui",
+ "settings",
+ "worktree",
+]
+
 [[package]]
 name = "writeable"
 version = "0.6.1"

Cargo.toml 🔗

@@ -126,6 +126,7 @@ members = [
     "crates/picker",
     "crates/prettier",
     "crates/project",
+    "crates/project_benchmarks",
     "crates/project_panel",
     "crates/project_symbols",
     "crates/prompt_store",
@@ -194,6 +195,7 @@ members = [
     "crates/web_search_providers",
     "crates/workspace",
     "crates/worktree",
+    "crates/worktree_benchmarks",
     "crates/x_ai",
     "crates/zed",
     "crates/zed_actions",

crates/project/src/buffer_store.rs 🔗

@@ -1,14 +1,12 @@
 use crate::{
-    ProjectItem as _, ProjectPath,
+    ProjectPath,
     lsp_store::OpenLspBufferHandle,
-    search::SearchQuery,
     worktree_store::{WorktreeStore, WorktreeStoreEvent},
 };
 use anyhow::{Context as _, Result, anyhow};
 use client::Client;
 use collections::{HashMap, HashSet, hash_map};
-use fs::Fs;
-use futures::{Future, FutureExt as _, StreamExt, channel::oneshot, future::Shared};
+use futures::{Future, FutureExt as _, channel::oneshot, future::Shared};
 use gpui::{
     App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
 };
@@ -23,8 +21,8 @@ use rpc::{
     AnyProtoClient, ErrorCode, ErrorExt as _, TypedEnvelope,
     proto::{self},
 };
-use smol::channel::Receiver;
-use std::{io, pin::pin, sync::Arc, time::Instant};
+
+use std::{io, sync::Arc, time::Instant};
 use text::{BufferId, ReplicaId};
 use util::{ResultExt as _, TryFutureExt, debug_panic, maybe, rel_path::RelPath};
 use worktree::{File, PathChange, ProjectEntryId, Worktree, WorktreeId};
@@ -972,6 +970,10 @@ impl BufferStore {
             .filter_map(|buffer| buffer.upgrade())
     }
 
+    pub(crate) fn is_searchable(&self, id: &BufferId) -> bool {
+        !self.non_searchable_buffers.contains(id)
+    }
+
     pub fn loading_buffers(
         &self,
     ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
@@ -1096,63 +1098,6 @@ impl BufferStore {
         Some(())
     }
 
-    pub fn find_search_candidates(
-        &mut self,
-        query: &SearchQuery,
-        mut limit: usize,
-        fs: Arc<dyn Fs>,
-        cx: &mut Context<Self>,
-    ) -> Receiver<Entity<Buffer>> {
-        let (tx, rx) = smol::channel::unbounded();
-        let mut open_buffers = HashSet::default();
-        let mut unnamed_buffers = Vec::new();
-        for handle in self.buffers() {
-            let buffer = handle.read(cx);
-            if self.non_searchable_buffers.contains(&buffer.remote_id()) {
-                continue;
-            } else if let Some(entry_id) = buffer.entry_id(cx) {
-                open_buffers.insert(entry_id);
-            } else {
-                limit = limit.saturating_sub(1);
-                unnamed_buffers.push(handle)
-            };
-        }
-
-        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
-        let project_paths_rx = self
-            .worktree_store
-            .update(cx, |worktree_store, cx| {
-                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
-            })
-            .chunks(MAX_CONCURRENT_BUFFER_OPENS);
-
-        cx.spawn(async move |this, cx| {
-            for buffer in unnamed_buffers {
-                tx.send(buffer).await.ok();
-            }
-
-            let mut project_paths_rx = pin!(project_paths_rx);
-            while let Some(project_paths) = project_paths_rx.next().await {
-                let buffers = this.update(cx, |this, cx| {
-                    project_paths
-                        .into_iter()
-                        .map(|project_path| this.open_buffer(project_path, cx))
-                        .collect::<Vec<_>>()
-                })?;
-                for buffer_task in buffers {
-                    if let Some(buffer) = buffer_task.await.log_err()
-                        && tx.send(buffer).await.is_err()
-                    {
-                        return anyhow::Ok(());
-                    }
-                }
-            }
-            anyhow::Ok(())
-        })
-        .detach();
-        rx
-    }
-
     fn on_buffer_event(
         &mut self,
         buffer: Entity<Buffer>,

crates/project/src/project.rs 🔗

@@ -11,6 +11,7 @@ pub mod lsp_command;
 pub mod lsp_store;
 mod manifest_tree;
 pub mod prettier_store;
+mod project_search;
 pub mod project_settings;
 pub mod search;
 mod task_inventory;
@@ -39,6 +40,7 @@ use crate::{
     agent_server_store::AllAgentServersSettings,
     git_store::GitStore,
     lsp_store::{SymbolLocation, log_store::LogKind},
+    project_search::SearchResultsHandle,
 };
 pub use agent_server_store::{AgentServerStore, AgentServersUpdated};
 pub use git_store::{
@@ -46,6 +48,7 @@ pub use git_store::{
     git_traversal::{ChildEntriesGitIter, GitEntry, GitEntryRef, GitTraversal},
 };
 pub use manifest_tree::ManifestTree;
+pub use project_search::Search;
 
 use anyhow::{Context as _, Result, anyhow};
 use buffer_store::{BufferStore, BufferStoreEvent};
@@ -109,7 +112,7 @@ use snippet_provider::SnippetProvider;
 use std::{
     borrow::Cow,
     collections::BTreeMap,
-    ops::Range,
+    ops::{Not as _, Range},
     path::{Path, PathBuf},
     pin::pin,
     str,
@@ -123,7 +126,7 @@ use text::{Anchor, BufferId, OffsetRangeExt, Point, Rope};
 use toolchain_store::EmptyToolchainStore;
 use util::{
     ResultExt as _, maybe,
-    paths::{PathStyle, SanitizedPath, compare_paths, is_absolute},
+    paths::{PathStyle, SanitizedPath, is_absolute},
     rel_path::RelPath,
 };
 use worktree::{CreatedEntry, Snapshot, Traversal};
@@ -150,8 +153,6 @@ pub use lsp_store::{
 };
 pub use toolchain_store::{ToolchainStore, Toolchains};
 const MAX_PROJECT_SEARCH_HISTORY_SIZE: usize = 500;
-const MAX_SEARCH_RESULT_FILES: usize = 5_000;
-const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
 
 pub trait ProjectItem: 'static {
     fn try_open(
@@ -3935,179 +3936,44 @@ impl Project {
         })
     }
 
-    pub fn search(&mut self, query: SearchQuery, cx: &mut Context<Self>) -> Receiver<SearchResult> {
-        let (result_tx, result_rx) = smol::channel::unbounded();
-
-        let matching_buffers_rx = if query.is_opened_only() {
-            self.sort_search_candidates(&query, cx)
-        } else {
-            self.find_search_candidate_buffers(&query, MAX_SEARCH_RESULT_FILES + 1, cx)
-        };
-
-        cx.spawn(async move |_, cx| {
-            let mut range_count = 0;
-            let mut buffer_count = 0;
-            let mut limit_reached = false;
-            let query = Arc::new(query);
-            let chunks = matching_buffers_rx.ready_chunks(64);
-
-            // Now that we know what paths match the query, we will load at most
-            // 64 buffers at a time to avoid overwhelming the main thread. For each
-            // opened buffer, we will spawn a background task that retrieves all the
-            // ranges in the buffer matched by the query.
-            let mut chunks = pin!(chunks);
-            'outer: while let Some(matching_buffer_chunk) = chunks.next().await {
-                let mut chunk_results = Vec::with_capacity(matching_buffer_chunk.len());
-                for buffer in matching_buffer_chunk {
-                    let query = query.clone();
-                    let snapshot = buffer.read_with(cx, |buffer, _| buffer.snapshot())?;
-                    chunk_results.push(cx.background_spawn(async move {
-                        let ranges = query
-                            .search(&snapshot, None)
-                            .await
-                            .iter()
-                            .map(|range| {
-                                snapshot.anchor_before(range.start)
-                                    ..snapshot.anchor_after(range.end)
-                            })
-                            .collect::<Vec<_>>();
-                        anyhow::Ok((buffer, ranges))
-                    }));
-                }
-
-                let chunk_results = futures::future::join_all(chunk_results).await;
-                for result in chunk_results {
-                    if let Some((buffer, ranges)) = result.log_err() {
-                        range_count += ranges.len();
-                        buffer_count += 1;
-                        result_tx
-                            .send(SearchResult::Buffer { buffer, ranges })
-                            .await?;
-                        if buffer_count > MAX_SEARCH_RESULT_FILES
-                            || range_count > MAX_SEARCH_RESULT_RANGES
-                        {
-                            limit_reached = true;
-                            break 'outer;
-                        }
-                    }
-                }
-            }
-
-            if limit_reached {
-                result_tx.send(SearchResult::LimitReached).await?;
-            }
-
-            anyhow::Ok(())
-        })
-        .detach();
-
-        result_rx
-    }
-
-    fn find_search_candidate_buffers(
-        &mut self,
-        query: &SearchQuery,
-        limit: usize,
-        cx: &mut Context<Project>,
-    ) -> Receiver<Entity<Buffer>> {
-        if self.is_local() {
-            let fs = self.fs.clone();
-            self.buffer_store.update(cx, |buffer_store, cx| {
-                buffer_store.find_search_candidates(query, limit, fs, cx)
-            })
-        } else {
-            self.find_search_candidates_remote(query, limit, cx)
-        }
-    }
-
-    fn sort_search_candidates(
-        &mut self,
-        search_query: &SearchQuery,
-        cx: &mut Context<Project>,
-    ) -> Receiver<Entity<Buffer>> {
-        let worktree_store = self.worktree_store.read(cx);
-        let mut buffers = search_query
-            .buffers()
-            .into_iter()
-            .flatten()
-            .filter(|buffer| {
-                let b = buffer.read(cx);
-                if let Some(file) = b.file() {
-                    if !search_query.match_path(file.path().as_std_path()) {
-                        return false;
-                    }
-                    if let Some(entry) = b
-                        .entry_id(cx)
-                        .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx))
-                        && entry.is_ignored
-                        && !search_query.include_ignored()
-                    {
-                        return false;
-                    }
-                }
-                true
-            })
-            .collect::<Vec<_>>();
-        let (tx, rx) = smol::channel::unbounded();
-        buffers.sort_by(|a, b| match (a.read(cx).file(), b.read(cx).file()) {
-            (None, None) => a.read(cx).remote_id().cmp(&b.read(cx).remote_id()),
-            (None, Some(_)) => std::cmp::Ordering::Less,
-            (Some(_), None) => std::cmp::Ordering::Greater,
-            (Some(a), Some(b)) => compare_paths(
-                (a.path().as_std_path(), true),
-                (b.path().as_std_path(), true),
-            ),
-        });
-        for buffer in buffers {
-            tx.send_blocking(buffer.clone()).unwrap()
-        }
-
-        rx
-    }
-
-    fn find_search_candidates_remote(
-        &mut self,
-        query: &SearchQuery,
-        limit: usize,
-        cx: &mut Context<Project>,
-    ) -> Receiver<Entity<Buffer>> {
-        let (tx, rx) = smol::channel::unbounded();
-
-        let (client, remote_id): (AnyProtoClient, _) = if let Some(ssh_client) = &self.remote_client
-        {
-            (ssh_client.read(cx).proto_client(), 0)
+    fn search_impl(&mut self, query: SearchQuery, cx: &mut Context<Self>) -> SearchResultsHandle {
+        let client: Option<(AnyProtoClient, _)> = if let Some(ssh_client) = &self.remote_client {
+            Some((ssh_client.read(cx).proto_client(), 0))
         } else if let Some(remote_id) = self.remote_id() {
-            (self.collab_client.clone().into(), remote_id)
+            self.is_local()
+                .not()
+                .then(|| (self.collab_client.clone().into(), remote_id))
         } else {
-            return rx;
+            None
         };
-
-        let request = client.request(proto::FindSearchCandidates {
-            project_id: remote_id,
-            query: Some(query.to_proto()),
-            limit: limit as _,
-        });
-        let guard = self.retain_remotely_created_models(cx);
-
-        cx.spawn(async move |project, cx| {
-            let response = request.await?;
-            for buffer_id in response.buffer_ids {
-                let buffer_id = BufferId::new(buffer_id)?;
-                let buffer = project
-                    .update(cx, |project, cx| {
-                        project.buffer_store.update(cx, |buffer_store, cx| {
-                            buffer_store.wait_for_remote_buffer(buffer_id, cx)
-                        })
-                    })?
-                    .await?;
-                let _ = tx.send(buffer).await;
+        let searcher = if query.is_opened_only() {
+            project_search::Search::open_buffers_only(
+                self.buffer_store.clone(),
+                self.worktree_store.clone(),
+                project_search::Search::MAX_SEARCH_RESULT_FILES + 1,
+            )
+        } else {
+            match client {
+                Some((client, remote_id)) => project_search::Search::remote(
+                    self.buffer_store.clone(),
+                    self.worktree_store.clone(),
+                    project_search::Search::MAX_SEARCH_RESULT_FILES + 1,
+                    (client, remote_id, self.remotely_created_models.clone()),
+                ),
+                None => project_search::Search::local(
+                    self.fs.clone(),
+                    self.buffer_store.clone(),
+                    self.worktree_store.clone(),
+                    project_search::Search::MAX_SEARCH_RESULT_FILES + 1,
+                    cx,
+                ),
             }
+        };
+        searcher.into_handle(query, cx)
+    }
 
-            drop(guard);
-            anyhow::Ok(())
-        })
-        .detach_and_log_err(cx);
-        rx
+    pub fn search(&mut self, query: SearchQuery, cx: &mut Context<Self>) -> Receiver<SearchResult> {
+        self.search_impl(query, cx).results(cx)
     }
 
     pub fn request_lsp<R: LspCommand>(
@@ -4832,18 +4698,31 @@ impl Project {
     fn retain_remotely_created_models(
         &mut self,
         cx: &mut Context<Self>,
+    ) -> RemotelyCreatedModelGuard {
+        Self::retain_remotely_created_models_impl(
+            &self.remotely_created_models,
+            &self.buffer_store,
+            &self.worktree_store,
+            cx,
+        )
+    }
+
+    fn retain_remotely_created_models_impl(
+        models: &Arc<Mutex<RemotelyCreatedModels>>,
+        buffer_store: &Entity<BufferStore>,
+        worktree_store: &Entity<WorktreeStore>,
+        cx: &mut App,
     ) -> RemotelyCreatedModelGuard {
         {
-            let mut remotely_create_models = self.remotely_created_models.lock();
+            let mut remotely_create_models = models.lock();
             if remotely_create_models.retain_count == 0 {
-                remotely_create_models.buffers = self.buffer_store.read(cx).buffers().collect();
-                remotely_create_models.worktrees =
-                    self.worktree_store.read(cx).worktrees().collect();
+                remotely_create_models.buffers = buffer_store.read(cx).buffers().collect();
+                remotely_create_models.worktrees = worktree_store.read(cx).worktrees().collect();
             }
             remotely_create_models.retain_count += 1;
         }
         RemotelyCreatedModelGuard {
-            remote_models: Arc::downgrade(&self.remotely_created_models),
+            remote_models: Arc::downgrade(&models),
         }
     }
 
@@ -4913,7 +4792,7 @@ impl Project {
         let query =
             SearchQuery::from_proto(message.query.context("missing query field")?, path_style)?;
         let results = this.update(&mut cx, |this, cx| {
-            this.find_search_candidate_buffers(&query, message.limit as _, cx)
+            this.search_impl(query, cx).matching_buffers(cx)
         })?;
 
         let mut response = proto::FindSearchCandidatesResponse {
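
The public entry point is unchanged: Project::search still returns a
receiver of SearchResult values, so existing call sites keep working.
A minimal sketch of a call site under that assumption (the helper
function below is made up; error handling is elided):

use gpui::Context;
use project::{
    Project,
    search::{SearchQuery, SearchResult},
};

// Hypothetical helper, not part of this PR: drain the result channel on a spawned task.
fn log_search_results(project: &mut Project, query: SearchQuery, cx: &mut Context<Project>) {
    let results = project.search(query, cx);
    cx.spawn(async move |_, _| {
        while let Ok(result) = results.recv().await {
            if let SearchResult::Buffer { ranges, .. } = result {
                // One message per matching buffer, carrying anchor ranges for every hit in it.
                println!("buffer matched with {} ranges", ranges.len());
            } else {
                // SearchResult::LimitReached is sent once the file/range caps are exceeded.
                break;
            }
        }
    })
    .detach();
}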

crates/project/src/project_search.rs 🔗

@@ -0,0 +1,754 @@
+use std::{
+    io::{BufRead, BufReader},
+    path::Path,
+    pin::pin,
+    sync::{
+        Arc,
+        atomic::{AtomicUsize, Ordering},
+    },
+};
+
+use anyhow::Context;
+use collections::HashSet;
+use fs::Fs;
+use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
+use gpui::{App, AppContext, AsyncApp, Entity, Task};
+use language::{Buffer, BufferSnapshot};
+use parking_lot::Mutex;
+use postage::oneshot;
+use rpc::{AnyProtoClient, proto};
+use smol::{
+    channel::{Receiver, Sender, bounded, unbounded},
+    future::FutureExt,
+};
+
+use text::BufferId;
+use util::{ResultExt, maybe, paths::compare_rel_paths};
+use worktree::{Entry, ProjectEntryId, Snapshot, Worktree};
+
+use crate::{
+    Project, ProjectItem, ProjectPath, RemotelyCreatedModels,
+    buffer_store::BufferStore,
+    search::{SearchQuery, SearchResult},
+    worktree_store::WorktreeStore,
+};
+
+pub struct Search {
+    buffer_store: Entity<BufferStore>,
+    worktree_store: Entity<WorktreeStore>,
+    limit: usize,
+    kind: SearchKind,
+}
+
+/// Represents search setup, before it is actually kicked off with Search::into_handle
+enum SearchKind {
+    /// Search for candidates by inspecting file contents on file system, avoiding loading the buffer unless we know that a given file contains a match.
+    Local {
+        fs: Arc<dyn Fs>,
+        worktrees: Vec<Entity<Worktree>>,
+    },
+    /// Query remote host for candidates. As of writing, the host runs a local search in "buffers with matches only" mode.
+    Remote {
+        client: AnyProtoClient,
+        remote_id: u64,
+        models: Arc<Mutex<RemotelyCreatedModels>>,
+    },
+    /// Run search against a known set of candidates. Even when working with a remote host, this won't round-trip to the host.
+    OpenBuffersOnly,
+}
+
+/// Represents the results of a project search and allows one to either obtain match positions OR
+/// just the handles to buffers that may match the search. Grabbing the handles is cheaper than obtaining full match positions, because in that case we'll look for
+/// at most one match in each file.
+#[must_use]
+pub struct SearchResultsHandle {
+    results: Receiver<SearchResult>,
+    matching_buffers: Receiver<Entity<Buffer>>,
+    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
+}
+
+impl SearchResultsHandle {
+    pub fn results(self, cx: &mut App) -> Receiver<SearchResult> {
+        (self.trigger_search)(cx).detach();
+        self.results
+    }
+    pub fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
+        (self.trigger_search)(cx).detach();
+        self.matching_buffers
+    }
+}
+
+#[derive(Clone)]
+enum FindSearchCandidates {
+    Local {
+        fs: Arc<dyn Fs>,
+        /// Start off with all paths in project and filter them based on:
+        /// - Include filters
+        /// - Exclude filters
+        /// - Only open buffers
+        /// - Scan ignored files
+        /// Put another way: filter out files that can't match (without looking at file contents)
+        input_paths_rx: Receiver<InputPath>,
+        /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
+        /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
+        confirm_contents_will_match_tx: Sender<MatchingEntry>,
+        confirm_contents_will_match_rx: Receiver<MatchingEntry>,
+        /// Of those that contain at least one match (or are already in memory), look for the rest of the matches (and figure out their ranges).
+        /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
+        get_buffer_for_full_scan_tx: Sender<ProjectPath>,
+    },
+    Remote,
+    OpenBuffersOnly,
+}
+
+impl Search {
+    pub fn local(
+        fs: Arc<dyn Fs>,
+        buffer_store: Entity<BufferStore>,
+        worktree_store: Entity<WorktreeStore>,
+        limit: usize,
+        cx: &mut App,
+    ) -> Self {
+        let worktrees = worktree_store.read(cx).visible_worktrees(cx).collect();
+        Self {
+            kind: SearchKind::Local { fs, worktrees },
+            buffer_store,
+            worktree_store,
+            limit,
+        }
+    }
+
+    pub(crate) fn remote(
+        buffer_store: Entity<BufferStore>,
+        worktree_store: Entity<WorktreeStore>,
+        limit: usize,
+        client_state: (AnyProtoClient, u64, Arc<Mutex<RemotelyCreatedModels>>),
+    ) -> Self {
+        Self {
+            kind: SearchKind::Remote {
+                client: client_state.0,
+                remote_id: client_state.1,
+                models: client_state.2,
+            },
+            buffer_store,
+            worktree_store,
+            limit,
+        }
+    }
+    pub(crate) fn open_buffers_only(
+        buffer_store: Entity<BufferStore>,
+        worktree_store: Entity<WorktreeStore>,
+        limit: usize,
+    ) -> Self {
+        Self {
+            kind: SearchKind::OpenBuffersOnly,
+            buffer_store,
+            worktree_store,
+            limit,
+        }
+    }
+
+    pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
+    pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
+    /// Prepares a project search run. The resulting [`SearchResultsHandle`] has to be used to specify whether you're interested in matching buffers
+    /// or full search results.
+    pub fn into_handle(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
+        let mut open_buffers = HashSet::default();
+        let mut unnamed_buffers = Vec::new();
+        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
+        let buffers = self.buffer_store.read(cx);
+        for handle in buffers.buffers() {
+            let buffer = handle.read(cx);
+            if !buffers.is_searchable(&buffer.remote_id()) {
+                continue;
+            } else if let Some(entry_id) = buffer.entry_id(cx) {
+                open_buffers.insert(entry_id);
+            } else {
+                self.limit = self.limit.saturating_sub(1);
+                unnamed_buffers.push(handle)
+            };
+        }
+        let executor = cx.background_executor().clone();
+        let (tx, rx) = unbounded();
+        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
+        let matching_buffers = grab_buffer_snapshot_rx.clone();
+        let trigger_search = Box::new(move |cx: &mut App| {
+            cx.spawn(async move |cx| {
+                for buffer in unnamed_buffers {
+                    _ = grab_buffer_snapshot_tx.send(buffer).await;
+                }
+
+                let (find_all_matches_tx, find_all_matches_rx) =
+                    bounded(MAX_CONCURRENT_BUFFER_OPENS);
+
+                let (candidate_searcher, tasks) = match self.kind {
+                    SearchKind::OpenBuffersOnly => {
+                        let Ok(open_buffers) = cx.update(|cx| self.all_loaded_buffers(&query, cx))
+                        else {
+                            return;
+                        };
+                        let fill_requests = cx
+                            .background_spawn(async move {
+                                for buffer in open_buffers {
+                                    if let Err(_) = grab_buffer_snapshot_tx.send(buffer).await {
+                                        return;
+                                    }
+                                }
+                            })
+                            .boxed_local();
+                        (FindSearchCandidates::OpenBuffersOnly, vec![fill_requests])
+                    }
+                    SearchKind::Local {
+                        fs,
+                        ref mut worktrees,
+                    } => {
+                        let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
+                            unbounded();
+                        let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
+                            bounded(64);
+                        let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();
+
+                        let (input_paths_tx, input_paths_rx) = unbounded();
+
+                        let tasks = vec![
+                            cx.spawn(Self::provide_search_paths(
+                                std::mem::take(worktrees),
+                                query.include_ignored(),
+                                input_paths_tx,
+                                sorted_search_results_tx,
+                            ))
+                            .boxed_local(),
+                            Self::open_buffers(
+                                &self.buffer_store,
+                                get_buffer_for_full_scan_rx,
+                                grab_buffer_snapshot_tx,
+                                cx.clone(),
+                            )
+                            .boxed_local(),
+                            cx.background_spawn(Self::maintain_sorted_search_results(
+                                sorted_search_results_rx,
+                                get_buffer_for_full_scan_tx.clone(),
+                                self.limit,
+                            ))
+                            .boxed_local(),
+                        ];
+                        (
+                            FindSearchCandidates::Local {
+                                fs,
+                                get_buffer_for_full_scan_tx,
+                                confirm_contents_will_match_tx,
+                                confirm_contents_will_match_rx,
+                                input_paths_rx,
+                            },
+                            tasks,
+                        )
+                    }
+                    SearchKind::Remote {
+                        client,
+                        remote_id,
+                        models,
+                    } => {
+                        let request = client.request(proto::FindSearchCandidates {
+                            project_id: remote_id,
+                            query: Some(query.to_proto()),
+                            limit: self.limit as _,
+                        });
+                        let Ok(guard) = cx.update(|cx| {
+                            Project::retain_remotely_created_models_impl(
+                                &models,
+                                &self.buffer_store,
+                                &self.worktree_store,
+                                cx,
+                            )
+                        }) else {
+                            return;
+                        };
+                        let buffer_store = self.buffer_store.downgrade();
+                        let issue_remote_buffers_request = cx
+                            .spawn(async move |cx| {
+                                let _ = maybe!(async move {
+                                    let response = request.await?;
+
+                                    for buffer_id in response.buffer_ids {
+                                        let buffer_id = BufferId::new(buffer_id)?;
+                                        let buffer = buffer_store
+                                            .update(cx, |buffer_store, cx| {
+                                                buffer_store.wait_for_remote_buffer(buffer_id, cx)
+                                            })?
+                                            .await?;
+                                        let _ = grab_buffer_snapshot_tx.send(buffer).await;
+                                    }
+
+                                    drop(guard);
+                                    anyhow::Ok(())
+                                })
+                                .await
+                                .log_err();
+                            })
+                            .boxed_local();
+                        (
+                            FindSearchCandidates::Remote,
+                            vec![issue_remote_buffers_request],
+                        )
+                    }
+                };
+
+                let matches_count = AtomicUsize::new(0);
+                let matched_buffer_count = AtomicUsize::new(0);
+
+                let worker_pool = executor.scoped(|scope| {
+                    let num_cpus = executor.num_cpus();
+
+                    assert!(num_cpus > 0);
+                    for _ in 0..executor.num_cpus() - 1 {
+                        let worker = Worker {
+                            query: &query,
+                            open_buffers: &open_buffers,
+                            matched_buffer_count: &matched_buffer_count,
+                            matches_count: &matches_count,
+                            candidates: candidate_searcher.clone(),
+                            find_all_matches_rx: find_all_matches_rx.clone(),
+                            publish_matches: tx.clone(),
+                        };
+                        scope.spawn(worker.run());
+                    }
+                    drop(tx);
+                    drop(find_all_matches_rx);
+                    drop(candidate_searcher);
+                });
+
+                let buffer_snapshots = Self::grab_buffer_snapshots(
+                    grab_buffer_snapshot_rx,
+                    find_all_matches_tx,
+                    cx.clone(),
+                );
+                futures::future::join_all(
+                    [worker_pool.boxed_local(), buffer_snapshots.boxed_local()]
+                        .into_iter()
+                        .chain(tasks),
+                )
+                .await;
+            })
+        });
+
+        SearchResultsHandle {
+            results: rx,
+            matching_buffers,
+            trigger_search,
+        }
+    }
+
+    fn provide_search_paths(
+        worktrees: Vec<Entity<Worktree>>,
+        include_ignored: bool,
+        tx: Sender<InputPath>,
+        results: Sender<oneshot::Receiver<ProjectPath>>,
+    ) -> impl AsyncFnOnce(&mut AsyncApp) {
+        async move |cx| {
+            _ = maybe!(async move {
+                for worktree in worktrees {
+                    let (mut snapshot, worktree_settings) = worktree
+                        .read_with(cx, |this, _| {
+                            Some((this.snapshot(), this.as_local()?.settings()))
+                        })?
+                        .context("The worktree is not local")?;
+                    if include_ignored {
+                        // Pre-fetch all of the ignored directories as they're going to be searched.
+                        let mut entries_to_refresh = vec![];
+                        for entry in snapshot.entries(include_ignored, 0) {
+                            if entry.is_ignored && entry.kind.is_unloaded() {
+                                if !worktree_settings.is_path_excluded(&entry.path) {
+                                    entries_to_refresh.push(entry.path.clone());
+                                }
+                            }
+                        }
+                        let barrier = worktree.update(cx, |this, _| {
+                            let local = this.as_local_mut()?;
+                            let barrier = entries_to_refresh
+                                .into_iter()
+                                .map(|path| local.add_path_prefix_to_scan(path).into_future())
+                                .collect::<Vec<_>>();
+                            Some(barrier)
+                        })?;
+                        if let Some(barriers) = barrier {
+                            futures::future::join_all(barriers).await;
+                        }
+                        snapshot = worktree.read_with(cx, |this, _| this.snapshot())?;
+                    }
+                    cx.background_executor()
+                        .scoped(|scope| {
+                            scope.spawn(async {
+                                for entry in snapshot.files(include_ignored, 0) {
+                                    let (should_scan_tx, should_scan_rx) = oneshot::channel();
+                                    let Ok(_) = tx
+                                        .send(InputPath {
+                                            entry: entry.clone(),
+                                            snapshot: snapshot.clone(),
+                                            should_scan_tx,
+                                        })
+                                        .await
+                                    else {
+                                        return;
+                                    };
+                                    if results.send(should_scan_rx).await.is_err() {
+                                        return;
+                                    };
+                                }
+                            })
+                        })
+                        .await;
+                }
+                anyhow::Ok(())
+            })
+            .await;
+        }
+    }
+
+    async fn maintain_sorted_search_results(
+        rx: Receiver<oneshot::Receiver<ProjectPath>>,
+        paths_for_full_scan: Sender<ProjectPath>,
+        limit: usize,
+    ) {
+        let mut rx = pin!(rx);
+        let mut matched = 0;
+        while let Some(mut next_path_result) = rx.next().await {
+            let Some(successful_path) = next_path_result.next().await else {
+                // This path did not produce a match, hence skip it.
+                continue;
+            };
+            if paths_for_full_scan.send(successful_path).await.is_err() {
+                return;
+            };
+            matched += 1;
+            if matched >= limit {
+                break;
+            }
+        }
+    }
+
+    /// Background workers cannot open buffers by themselves, hence the main thread will do it on their behalf.
+    async fn open_buffers(
+        buffer_store: &Entity<BufferStore>,
+        rx: Receiver<ProjectPath>,
+        find_all_matches_tx: Sender<Entity<Buffer>>,
+        mut cx: AsyncApp,
+    ) {
+        let mut rx = pin!(rx.ready_chunks(64));
+        _ = maybe!(async move {
+            while let Some(requested_paths) = rx.next().await {
+                let mut buffers = buffer_store.update(&mut cx, |this, cx| {
+                    requested_paths
+                        .into_iter()
+                        .map(|path| this.open_buffer(path, cx))
+                        .collect::<FuturesOrdered<_>>()
+                })?;
+
+                while let Some(buffer) = buffers.next().await {
+                    if let Some(buffer) = buffer.log_err() {
+                        find_all_matches_tx.send(buffer).await?;
+                    }
+                }
+            }
+            Result::<_, anyhow::Error>::Ok(())
+        })
+        .await;
+    }
+
+    async fn grab_buffer_snapshots(
+        rx: Receiver<Entity<Buffer>>,
+        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
+        mut cx: AsyncApp,
+    ) {
+        _ = maybe!(async move {
+            while let Ok(buffer) = rx.recv().await {
+                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
+                find_all_matches_tx.send((buffer, snapshot)).await?;
+            }
+            Result::<_, anyhow::Error>::Ok(())
+        })
+        .await;
+    }
+
+    fn all_loaded_buffers(&self, search_query: &SearchQuery, cx: &App) -> Vec<Entity<Buffer>> {
+        let worktree_store = self.worktree_store.read(cx);
+        let mut buffers = search_query
+            .buffers()
+            .into_iter()
+            .flatten()
+            .filter(|buffer| {
+                let b = buffer.read(cx);
+                if let Some(file) = b.file() {
+                    if !search_query.match_path(file.path().as_std_path()) {
+                        return false;
+                    }
+                    if !search_query.include_ignored()
+                        && let Some(entry) = b
+                            .entry_id(cx)
+                            .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx))
+                        && entry.is_ignored
+                    {
+                        return false;
+                    }
+                }
+                true
+            })
+            .cloned()
+            .collect::<Vec<_>>();
+        buffers.sort_by(|a, b| {
+            let a = a.read(cx);
+            let b = b.read(cx);
+            match (a.file(), b.file()) {
+                (None, None) => a.remote_id().cmp(&b.remote_id()),
+                (None, Some(_)) => std::cmp::Ordering::Less,
+                (Some(_), None) => std::cmp::Ordering::Greater,
+                (Some(a), Some(b)) => compare_rel_paths((a.path(), true), (b.path(), true)),
+            }
+        });
+
+        buffers
+    }
+}
+
+struct Worker<'search> {
+    query: &'search SearchQuery,
+    matched_buffer_count: &'search AtomicUsize,
+    matches_count: &'search AtomicUsize,
+    open_buffers: &'search HashSet<ProjectEntryId>,
+    candidates: FindSearchCandidates,
+    /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
+    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
+    /// Cool, we have results; let's share them with the world.
+    publish_matches: Sender<SearchResult>,
+}
+
+impl Worker<'_> {
+    async fn run(mut self) {
+        let (
+            input_paths_rx,
+            confirm_contents_will_match_rx,
+            mut confirm_contents_will_match_tx,
+            mut get_buffer_for_full_scan_tx,
+            fs,
+        ) = match self.candidates {
+            FindSearchCandidates::Local {
+                fs,
+                input_paths_rx,
+                confirm_contents_will_match_rx,
+                confirm_contents_will_match_tx,
+                get_buffer_for_full_scan_tx,
+            } => (
+                input_paths_rx,
+                confirm_contents_will_match_rx,
+                confirm_contents_will_match_tx,
+                get_buffer_for_full_scan_tx,
+                Some(fs),
+            ),
+            FindSearchCandidates::Remote | FindSearchCandidates::OpenBuffersOnly => (
+                unbounded().1,
+                unbounded().1,
+                unbounded().0,
+                unbounded().0,
+                None,
+            ),
+        };
+        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
+        let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse());
+        let mut scan_path = pin!(input_paths_rx.fuse());
+
+        loop {
+            let handler = RequestHandler {
+                query: self.query,
+                open_entries: &self.open_buffers,
+                fs: fs.as_deref(),
+                matched_buffer_count: self.matched_buffer_count,
+                matches_count: self.matches_count,
+                confirm_contents_will_match_tx: &confirm_contents_will_match_tx,
+                get_buffer_for_full_scan_tx: &get_buffer_for_full_scan_tx,
+                publish_matches: &self.publish_matches,
+            };
+            // Whenever we notice that some step of the pipeline is closed, we don't want to close the
+            // subsequent steps straight away. Another worker might be about to produce a value that will
+            // be pushed there, so we replace the current worker's pipe with a dummy one.
+            // That way, we'll only ever close a next-stage channel once ALL workers have done so.
+            select_biased! {
+                find_all_matches = find_all_matches.next() => {
+
+                    if self.publish_matches.is_closed() {
+                        break;
+                    }
+                    let Some(matches) = find_all_matches else {
+                        self.publish_matches = bounded(1).0;
+                        continue;
+                    };
+                    let result = handler.handle_find_all_matches(matches).await;
+                    if let Some(_should_bail) = result {
+
+                        self.publish_matches = bounded(1).0;
+                        continue;
+                    }
+                },
+                find_first_match = find_first_match.next() => {
+                    if let Some(buffer_with_at_least_one_match) = find_first_match {
+                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
+                    } else {
+                        get_buffer_for_full_scan_tx = bounded(1).0;
+                    }
+
+                },
+                scan_path = scan_path.next() => {
+                    if let Some(path_to_scan) = scan_path {
+                        handler.handle_scan_path(path_to_scan).await;
+                    } else {
+                        // If we're the last worker to notice that this is not producing values, close the upstream.
+                        confirm_contents_will_match_tx = bounded(1).0;
+                    }
+
+                 }
+                 complete => {
+                     break
+                },
+
+            }
+        }
+    }
+}
+
+struct RequestHandler<'worker> {
+    query: &'worker SearchQuery,
+    fs: Option<&'worker dyn Fs>,
+    open_entries: &'worker HashSet<ProjectEntryId>,
+    matched_buffer_count: &'worker AtomicUsize,
+    matches_count: &'worker AtomicUsize,
+
+    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
+    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
+    publish_matches: &'worker Sender<SearchResult>,
+}
+
+struct LimitReached;
+
+impl RequestHandler<'_> {
+    async fn handle_find_all_matches(
+        &self,
+        (buffer, snapshot): (Entity<Buffer>, BufferSnapshot),
+    ) -> Option<LimitReached> {
+        let ranges = self
+            .query
+            .search(&snapshot, None)
+            .await
+            .iter()
+            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
+            .collect::<Vec<_>>();
+
+        let matched_ranges = ranges.len();
+        if self.matched_buffer_count.fetch_add(1, Ordering::Release)
+            > Search::MAX_SEARCH_RESULT_FILES
+            || self
+                .matches_count
+                .fetch_add(matched_ranges, Ordering::Release)
+                > Search::MAX_SEARCH_RESULT_RANGES
+        {
+            _ = self.publish_matches.send(SearchResult::LimitReached).await;
+            Some(LimitReached)
+        } else {
+            _ = self
+                .publish_matches
+                .send(SearchResult::Buffer { buffer, ranges })
+                .await;
+            None
+        }
+    }
+    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
+        _=maybe!(async move {
+            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
+            let Some(file) = self.fs.context("Trying to query filesystem in remote project search")?.open_sync(&abs_path).await.log_err() else {
+                return anyhow::Ok(());
+            };
+
+            let mut file = BufReader::new(file);
+            let file_start = file.fill_buf()?;
+
+            if let Err(Some(starting_position)) =
+            std::str::from_utf8(file_start).map_err(|e| e.error_len())
+            {
+                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
+                // That way we can still match files in a streaming fashion without having to look at "obviously binary" files.
+                log::debug!(
+                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
+                );
+                return Ok(());
+            }
+
+            if self.query.detect(file).unwrap_or(false) {
+                // Yes, we should scan the whole file.
+                entry.should_scan_tx.send(entry.path).await?;
+            }
+            Ok(())
+        }).await;
+    }
+
+    async fn handle_scan_path(&self, req: InputPath) {
+        _ = maybe!(async move {
+            let InputPath {
+                entry,
+
+                snapshot,
+                should_scan_tx,
+            } = req;
+
+            if entry.is_fifo || !entry.is_file() {
+                return Ok(());
+            }
+
+            if self.query.filters_path() {
+                let matched_path = if self.query.match_full_paths() {
+                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
+                    full_path.push(entry.path.as_std_path());
+                    self.query.match_path(&full_path)
+                } else {
+                    self.query.match_path(entry.path.as_std_path())
+                };
+                if !matched_path {
+                    return Ok(());
+                }
+            }
+
+            if self.open_entries.contains(&entry.id) {
+                // The buffer is already in memory and that's the version we want to scan;
+                // hence skip the dilly-dally and look for all matches straight away.
+                self.get_buffer_for_full_scan_tx
+                    .send(ProjectPath {
+                        worktree_id: snapshot.id(),
+                        path: entry.path.clone(),
+                    })
+                    .await?;
+            } else {
+                self.confirm_contents_will_match_tx
+                    .send(MatchingEntry {
+                        should_scan_tx: should_scan_tx,
+                        worktree_root: snapshot.abs_path().clone(),
+                        path: ProjectPath {
+                            worktree_id: snapshot.id(),
+                            path: entry.path.clone(),
+                        },
+                    })
+                    .await?;
+            }
+
+            anyhow::Ok(())
+        })
+        .await;
+    }
+}
+
+struct InputPath {
+    entry: Entry,
+    snapshot: Snapshot,
+    should_scan_tx: oneshot::Sender<ProjectPath>,
+}
+
+struct MatchingEntry {
+    worktree_root: Arc<Path>,
+    path: ProjectPath,
+    should_scan_tx: oneshot::Sender<ProjectPath>,
+}
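
The new Search type is consumed through a SearchResultsHandle: nothing
runs until the caller picks one of its two accessors, and asking only
for matching buffers is the cheaper of the two because each file then
needs at most one confirmed match. A minimal sketch under those
assumptions (the helper function below is made up):

use gpui::App;
use project::{Search, search::SearchQuery};

// Hypothetical helper, not part of this PR. `search` would come from one of the
// constructors above, e.g. Search::local(..).
fn start_search(search: Search, query: SearchQuery, cx: &mut App) {
    let handle = search.into_handle(query, cx);

    // Cheap mode: only learn which buffers contain at least one match.
    let _matching_buffers = handle.matching_buffers(cx);

    // Alternatively (the handle is consumed, so it's one or the other):
    // let _results = handle.results(cx);
}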

crates/project/src/worktree_store.rs 🔗

@@ -8,10 +8,7 @@ use std::{
 use anyhow::{Context as _, Result, anyhow, bail};
 use collections::{HashMap, HashSet};
 use fs::{Fs, copy_recursive};
-use futures::{
-    FutureExt, SinkExt,
-    future::{BoxFuture, Shared},
-};
+use futures::{FutureExt, SinkExt, future::Shared};
 use gpui::{
     App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity,
 };
@@ -999,148 +996,14 @@ impl WorktreeStore {
         matching_paths_rx
     }
 
-    fn scan_ignored_dir<'a>(
-        fs: &'a Arc<dyn Fs>,
-        snapshot: &'a worktree::Snapshot,
-        path: &'a RelPath,
-        query: &'a SearchQuery,
-        filter_tx: &'a Sender<MatchingEntry>,
-        output_tx: &'a Sender<oneshot::Receiver<ProjectPath>>,
-    ) -> BoxFuture<'a, Result<()>> {
-        async move {
-            let abs_path = snapshot.absolutize(path);
-            let Some(mut files) = fs
-                .read_dir(&abs_path)
-                .await
-                .with_context(|| format!("listing ignored path {abs_path:?}"))
-                .log_err()
-            else {
-                return Ok(());
-            };
-
-            let mut results = Vec::new();
-
-            while let Some(Ok(file)) = files.next().await {
-                let Some(metadata) = fs
-                    .metadata(&file)
-                    .await
-                    .with_context(|| format!("fetching fs metadata for {abs_path:?}"))
-                    .log_err()
-                    .flatten()
-                else {
-                    continue;
-                };
-                if metadata.is_symlink || metadata.is_fifo {
-                    continue;
-                }
-                let relative_path = file.strip_prefix(snapshot.abs_path())?;
-                let relative_path = RelPath::new(&relative_path, snapshot.path_style())
-                    .context("getting relative path")?;
-                results.push((relative_path.into_arc(), !metadata.is_dir))
-            }
-            results.sort_by(|(a_path, _), (b_path, _)| a_path.cmp(b_path));
-            for (path, is_file) in results {
-                if is_file {
-                    if query.filters_path() {
-                        let matched_path = if query.match_full_paths() {
-                            let mut full_path = snapshot.root_name().as_std_path().to_owned();
-                            full_path.push(path.as_std_path());
-                            query.match_path(&full_path)
-                        } else {
-                            query.match_path(&path.as_std_path())
-                        };
-                        if !matched_path {
-                            continue;
-                        }
-                    }
-                    let (tx, rx) = oneshot::channel();
-                    output_tx.send(rx).await?;
-                    filter_tx
-                        .send(MatchingEntry {
-                            respond: tx,
-                            worktree_root: snapshot.abs_path().clone(),
-                            path: ProjectPath {
-                                worktree_id: snapshot.id(),
-                                path: path.into_arc(),
-                            },
-                        })
-                        .await?;
-                } else {
-                    Self::scan_ignored_dir(fs, snapshot, &path, query, filter_tx, output_tx)
-                        .await?;
-                }
-            }
-            Ok(())
-        }
-        .boxed()
-    }
-
     async fn find_candidate_paths(
-        fs: Arc<dyn Fs>,
-        snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>,
-        open_entries: HashSet<ProjectEntryId>,
-        query: SearchQuery,
-        filter_tx: Sender<MatchingEntry>,
-        output_tx: Sender<oneshot::Receiver<ProjectPath>>,
+        _: Arc<dyn Fs>,
+        _: Vec<(worktree::Snapshot, WorktreeSettings)>,
+        _: HashSet<ProjectEntryId>,
+        _: SearchQuery,
+        _: Sender<MatchingEntry>,
+        _: Sender<oneshot::Receiver<ProjectPath>>,
     ) -> Result<()> {
-        for (snapshot, settings) in snapshots {
-            for entry in snapshot.entries(query.include_ignored(), 0) {
-                if entry.is_dir() && entry.is_ignored {
-                    if !settings.is_path_excluded(&entry.path) {
-                        Self::scan_ignored_dir(
-                            &fs,
-                            &snapshot,
-                            &entry.path,
-                            &query,
-                            &filter_tx,
-                            &output_tx,
-                        )
-                        .await?;
-                    }
-                    continue;
-                }
-
-                if entry.is_fifo || !entry.is_file() {
-                    continue;
-                }
-
-                if query.filters_path() {
-                    let matched_path = if query.match_full_paths() {
-                        let mut full_path = snapshot.root_name().as_std_path().to_owned();
-                        full_path.push(entry.path.as_std_path());
-                        query.match_path(&full_path)
-                    } else {
-                        query.match_path(entry.path.as_std_path())
-                    };
-                    if !matched_path {
-                        continue;
-                    }
-                }
-
-                let (mut tx, rx) = oneshot::channel();
-
-                if open_entries.contains(&entry.id) {
-                    tx.send(ProjectPath {
-                        worktree_id: snapshot.id(),
-                        path: entry.path.clone(),
-                    })
-                    .await?;
-                } else {
-                    filter_tx
-                        .send(MatchingEntry {
-                            respond: tx,
-                            worktree_root: snapshot.abs_path().clone(),
-                            path: ProjectPath {
-                                worktree_id: snapshot.id(),
-                                path: entry.path.clone(),
-                            },
-                        })
-                        .await?;
-                }
-
-                output_tx.send(rx).await?;
-            }
-        }
         Ok(())
     }
 

crates/project_benchmarks/Cargo.toml 🔗

@@ -0,0 +1,21 @@
+[package]
+name = "project_benchmarks"
+version = "0.1.0"
+publish.workspace = true
+edition.workspace = true
+
+[dependencies]
+anyhow.workspace = true
+clap.workspace = true
+client.workspace = true
+futures.workspace = true
+gpui = { workspace = true, features = ["windows-manifest"] }
+http_client = { workspace = true, features = ["test-support"]}
+language.workspace = true
+node_runtime.workspace = true
+project.workspace = true
+settings.workspace = true
+watch.workspace = true
+
+[lints]
+workspace = true

crates/project_benchmarks/src/main.rs 🔗

@@ -0,0 +1,136 @@
+use std::sync::Arc;
+
+use clap::Parser;
+use client::{Client, UserStore};
+use gpui::{AppContext as _, Application};
+use http_client::FakeHttpClient;
+use language::LanguageRegistry;
+use node_runtime::NodeRuntime;
+use project::{
+    Project, RealFs,
+    search::{SearchQuery, SearchResult},
+};
+
+#[derive(Parser)]
+struct Args {
+    /// List of worktrees to run the search against.
+    worktrees: Vec<String>,
+    #[clap(short)]
+    query: String,
+    /// Treat query as a regex.
+    #[clap(short, long)]
+    regex: bool,
+    /// Matches have to be standalone words.
+    #[clap(long)]
+    whole_word: bool,
+    /// Make matching case-sensitive.
+    #[clap(long, default_value_t = true)]
+    case_sensitive: bool,
+    /// Include gitignored files in the search.
+    #[clap(long)]
+    include_ignored: bool,
+}
+
+fn main() -> Result<(), anyhow::Error> {
+    let args = Args::parse();
+    let query = if args.regex {
+        SearchQuery::regex(
+            args.query,
+            args.whole_word,
+            args.case_sensitive,
+            args.include_ignored,
+            false,
+            Default::default(),
+            Default::default(),
+            false,
+            None,
+        )
+    } else {
+        SearchQuery::text(
+            args.query,
+            args.whole_word,
+            args.case_sensitive,
+            args.include_ignored,
+            Default::default(),
+            Default::default(),
+            false,
+            None,
+        )
+    }?;
+    Application::headless().run(|cx| {
+        settings::init(cx);
+        client::init_settings(cx);
+        language::init(cx);
+        Project::init_settings(cx);
+        let client = Client::production(cx);
+        let http_client = FakeHttpClient::with_200_response();
+        let (_, rx) = watch::channel(None);
+        let node = NodeRuntime::new(http_client, None, rx);
+        let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
+        let registry = Arc::new(LanguageRegistry::new(cx.background_executor().clone()));
+        let fs = Arc::new(RealFs::new(None, cx.background_executor().clone()));
+        let project = Project::local(
+            client,
+            node,
+            user_store,
+            registry,
+            fs,
+            Some(Default::default()),
+            cx,
+        );
+
+        project.clone().update(cx, move |_, cx| {
+            cx.spawn(async move |_, cx| {
+                println!("Loading worktrees");
+                let worktrees = project.update(cx, |this, cx| {
+                    args.worktrees
+                        .into_iter()
+                        .map(|worktree| this.find_or_create_worktree(worktree, true, cx))
+                        .collect::<Vec<_>>()
+                })?;
+
+                let worktrees = futures::future::join_all(worktrees)
+                    .await
+                    .into_iter()
+                    .collect::<Result<Vec<_>, anyhow::Error>>()?;
+
+                for (worktree, _) in &worktrees {
+                    worktree
+                        .update(cx, |this, _| this.as_local().unwrap().scan_complete())?
+                        .await;
+                }
+                println!("Worktrees loaded");
+
+                println!("Starting a project search");
+                let timer = std::time::Instant::now();
+                let mut first_match = None;
+                let matches = project
+                    .update(cx, |this, cx| this.search(query, cx))
+                    .unwrap();
+                let mut matched_files = 0;
+                let mut matched_chunks = 0;
+                while let Ok(match_result) = matches.recv().await {
+                    if first_match.is_none() {
+                        let time = timer.elapsed();
+                        first_match = Some(time);
+                        println!("First match found after {time:?}");
+                    }
+                    if let SearchResult::Buffer { ranges, .. } = match_result {
+                        matched_files += 1;
+                        matched_chunks += ranges.len();
+                    }
+                }
+                let elapsed = timer.elapsed();
+                println!(
+                    "Finished project search after {elapsed:?}. Matched {matched_files} files and {matched_chunks} excerpts"
+                );
+                drop(project);
+                cx.update(|cx| cx.quit())?;
+
+                anyhow::Ok(())
+            })
+            .detach();
+        });
+    });
+    Ok(())
+}

crates/remote_server/Cargo.toml 🔗

@@ -75,7 +75,7 @@ minidumper.workspace = true
 
 [dev-dependencies]
 action_log.workspace = true
-agent.workspace = true
+agent = { workspace = true, features = ["test-support"] }
 client = { workspace = true, features = ["test-support"] }
 clock = { workspace = true, features = ["test-support"] }
 collections.workspace = true

crates/remote_server/src/headless_project.rs 🔗

@@ -639,9 +639,15 @@ impl HeadlessProject {
             PathStyle::local(),
         )?;
         let results = this.update(&mut cx, |this, cx| {
-            this.buffer_store.update(cx, |buffer_store, cx| {
-                buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx)
-            })
+            project::Search::local(
+                this.fs.clone(),
+                this.buffer_store.clone(),
+                this.worktree_store.clone(),
+                message.limit as _,
+                cx,
+            )
+            .into_handle(query, cx)
+            .matching_buffers(cx)
         })?;
 
         let mut response = proto::FindSearchCandidatesResponse {