SSH remote search (#16915)

Conrad Irwin and Max created

Co-Authored-By: Max <max@zed.dev>

Release Notes:

- ssh remoting: add project search

---------

Co-authored-by: Max <max@zed.dev>

Change summary

crates/collab/src/rpc.rs                                      |  54 
crates/collab/src/rpc/connection_pool.rs                      |   4 
crates/collab/src/tests/integration_tests.rs                  |   4 
crates/collab/src/tests/random_project_collaboration_tests.rs |   2 
crates/project/src/buffer_store.rs                            |  99 
crates/project/src/project.rs                                 | 699 +---
crates/project/src/search.rs                                  |  52 
crates/project/src/worktree_store.rs                          | 313 ++
crates/proto/proto/zed.proto                                  |  25 
crates/proto/src/proto.rs                                     |   4 
crates/remote_server/src/headless_project.rs                  |  47 
crates/remote_server/src/remote_editing_tests.rs              | 135 
crates/search/src/project_search.rs                           |   4 
13 files changed, 868 insertions(+), 574 deletions(-)

Detailed changes

crates/collab/src/rpc.rs 🔗

@@ -478,6 +478,7 @@ impl Server {
             .add_request_handler(user_handler(
                 forward_read_only_project_request::<proto::SearchProject>,
             ))
+            .add_request_handler(user_handler(forward_find_search_candidates_request))
             .add_request_handler(user_handler(
                 forward_read_only_project_request::<proto::GetDocumentHighlights>,
             ))
@@ -2943,6 +2944,59 @@ where
     Ok(())
 }
 
+async fn forward_find_search_candidates_request(
+    request: proto::FindSearchCandidates,
+    response: Response<proto::FindSearchCandidates>,
+    session: UserSession,
+) -> Result<()> {
+    let project_id = ProjectId::from_proto(request.remote_entity_id());
+    let host_connection_id = session
+        .db()
+        .await
+        .host_for_read_only_project_request(project_id, session.connection_id, session.user_id())
+        .await?;
+
+    let host_version = session
+        .connection_pool()
+        .await
+        .connection(host_connection_id)
+        .map(|c| c.zed_version);
+
+    if host_version.is_some_and(|host_version| host_version < ZedVersion::with_search_candidates())
+    {
+        let query = request.query.ok_or_else(|| anyhow!("missing query"))?;
+        let search = proto::SearchProject {
+            project_id: project_id.to_proto(),
+            query: query.query,
+            regex: query.regex,
+            whole_word: query.whole_word,
+            case_sensitive: query.case_sensitive,
+            files_to_include: query.files_to_include,
+            files_to_exclude: query.files_to_exclude,
+            include_ignored: query.include_ignored,
+        };
+
+        let payload = session
+            .peer
+            .forward_request(session.connection_id, host_connection_id, search)
+            .await?;
+        return response.send(proto::FindSearchCandidatesResponse {
+            buffer_ids: payload
+                .locations
+                .into_iter()
+                .map(|loc| loc.buffer_id)
+                .collect(),
+        });
+    }
+
+    let payload = session
+        .peer
+        .forward_request(session.connection_id, host_connection_id, request)
+        .await?;
+    response.send(payload)?;
+    Ok(())
+}
+
 /// forward a project request to the dev server. Only allowed
 /// if it's your dev server.
 async fn forward_project_request_for_owner<T>(

crates/collab/src/rpc/connection_pool.rs 🔗

@@ -42,6 +42,10 @@ impl ZedVersion {
     pub fn with_list_directory() -> ZedVersion {
         ZedVersion(SemanticVersion::new(0, 145, 0))
     }
+
+    pub fn with_search_candidates() -> ZedVersion {
+        ZedVersion(SemanticVersion::new(0, 151, 0))
+    }
 }
 
 pub trait VersionedMessage {

crates/collab/src/tests/integration_tests.rs 🔗

@@ -28,8 +28,8 @@ use live_kit_client::MacOSDisplay;
 use lsp::LanguageServerId;
 use parking_lot::Mutex;
 use project::{
-    search::SearchQuery, DiagnosticSummary, FormatTrigger, HoverBlockKind, Project, ProjectPath,
-    SearchResult,
+    search::SearchQuery, search::SearchResult, DiagnosticSummary, FormatTrigger, HoverBlockKind,
+    Project, ProjectPath,
 };
 use rand::prelude::*;
 use serde_json::json;

crates/collab/src/tests/random_project_collaboration_tests.rs 🔗

@@ -15,7 +15,7 @@ use language::{
 use lsp::FakeLanguageServer;
 use pretty_assertions::assert_eq;
 use project::{
-    search::SearchQuery, Project, ProjectPath, SearchResult, DEFAULT_COMPLETION_CONTEXT,
+    search::SearchQuery, search::SearchResult, Project, ProjectPath, DEFAULT_COMPLETION_CONTEXT,
 };
 use rand::{
     distributions::{Alphanumeric, DistString},

crates/project/src/buffer_store.rs 🔗

@@ -1,9 +1,11 @@
 use crate::{
+    search::SearchQuery,
     worktree_store::{WorktreeStore, WorktreeStoreEvent},
-    NoRepositoryError, ProjectPath,
+    Item, NoRepositoryError, ProjectPath,
 };
 use anyhow::{anyhow, Context as _, Result};
-use collections::{hash_map, HashMap};
+use collections::{hash_map, HashMap, HashSet};
+use fs::Fs;
 use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt as _};
 use git::blame::Blame;
 use gpui::{
@@ -18,6 +20,7 @@ use rpc::{
     proto::{self, AnyProtoClient, EnvelopedMessage, PeerId},
     ErrorExt as _, TypedEnvelope,
 };
+use smol::channel::Receiver;
 use std::{io, path::Path, str::FromStr as _, sync::Arc};
 use text::BufferId;
 use util::{debug_panic, maybe, ResultExt as _};
@@ -778,6 +781,98 @@ impl BufferStore {
             .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
     }
 
+    pub fn find_search_candidates(
+        &mut self,
+        query: &SearchQuery,
+        limit: usize,
+        fs: Arc<dyn Fs>,
+        cx: &mut ModelContext<Self>,
+    ) -> Receiver<Model<Buffer>> {
+        let (tx, rx) = smol::channel::unbounded();
+        let open_buffers = self.find_open_search_candidates(query, cx);
+        let skip_entries: HashSet<_> = open_buffers
+            .iter()
+            .filter_map(|buffer| buffer.read(cx).entry_id(cx))
+            .collect();
+
+        let limit = limit.saturating_sub(open_buffers.len());
+        for open_buffer in open_buffers {
+            tx.send_blocking(open_buffer).ok();
+        }
+
+        let match_rx = self.worktree_store.update(cx, |worktree_store, cx| {
+            worktree_store.find_search_candidates(query.clone(), limit, skip_entries, fs, cx)
+        });
+
+        const MAX_CONCURRENT_BUFFER_OPENS: usize = 8;
+
+        for _ in 0..MAX_CONCURRENT_BUFFER_OPENS {
+            let mut match_rx = match_rx.clone();
+            let tx = tx.clone();
+            cx.spawn(|this, mut cx| async move {
+                while let Some(project_path) = match_rx.next().await {
+                    let buffer = this
+                        .update(&mut cx, |this, cx| this.open_buffer(project_path, cx))?
+                        .await
+                        .log_err();
+                    if let Some(buffer) = buffer {
+                        tx.send_blocking(buffer).ok();
+                    }
+                }
+                anyhow::Ok(())
+            })
+            .detach();
+        }
+        rx
+    }
+
+    /// Returns open buffers filtered by filename
+    /// Does *not* check the buffer content, the caller must do that
+    fn find_open_search_candidates(
+        &self,
+        query: &SearchQuery,
+        cx: &ModelContext<Self>,
+    ) -> Vec<Model<Buffer>> {
+        let include_root = self
+            .worktree_store
+            .read(cx)
+            .visible_worktrees(cx)
+            .collect::<Vec<_>>()
+            .len()
+            > 1;
+        self.buffers()
+            .filter_map(|buffer| {
+                let handle = buffer.clone();
+                buffer.read_with(cx, |buffer, cx| {
+                    let worktree_store = self.worktree_store.read(cx);
+                    let entry_id = buffer.entry_id(cx);
+                    let is_ignored = entry_id
+                        .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx))
+                        .map_or(false, |entry| entry.is_ignored);
+
+                    if is_ignored && !query.include_ignored() {
+                        return None;
+                    }
+                    if let Some(file) = buffer.file() {
+                        let matched_path = if include_root {
+                            query.file_matches(Some(&file.full_path(cx)))
+                        } else {
+                            query.file_matches(Some(file.path()))
+                        };
+
+                        if matched_path {
+                            Some(handle)
+                        } else {
+                            None
+                        }
+                    } else {
+                        Some(handle)
+                    }
+                })
+            })
+            .collect()
+    }
+
     fn on_buffer_event(
         &mut self,
         buffer: Model<Buffer>,

crates/project/src/project.rs 🔗

@@ -24,7 +24,7 @@ use client::{
     TypedEnvelope, UserStore,
 };
 use clock::ReplicaId;
-use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
+use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
 use debounced_delay::DebouncedDelay;
 use futures::{
     channel::mpsc::{self, UnboundedReceiver},
@@ -37,8 +37,8 @@ use futures::{
 use git::{blame::Blame, repository::GitRepository};
 use globset::{Glob, GlobSet, GlobSetBuilder};
 use gpui::{
-    AnyModel, AppContext, AsyncAppContext, BackgroundExecutor, BorrowAppContext, Context, Entity,
-    EventEmitter, Model, ModelContext, PromptLevel, SharedString, Task, WeakModel, WindowContext,
+    AnyModel, AppContext, AsyncAppContext, BorrowAppContext, Context, Entity, EventEmitter, Model,
+    ModelContext, PromptLevel, SharedString, Task, WeakModel, WindowContext,
 };
 use http_client::HttpClient;
 use itertools::Itertools;
@@ -77,17 +77,17 @@ use prettier_support::{DefaultPrettier, PrettierInstance};
 use project_settings::{DirenvSettings, LspSettings, ProjectSettings};
 use rand::prelude::*;
 use remote::SshSession;
-use rpc::{proto::AddWorktree, ErrorCode};
-use search::SearchQuery;
+use rpc::{
+    proto::{AddWorktree, AnyProtoClient},
+    ErrorCode,
+};
+use search::{SearchQuery, SearchResult};
 use search_history::SearchHistory;
 use serde::Serialize;
 use settings::{watch_config_file, Settings, SettingsLocation, SettingsStore};
 use sha2::{Digest, Sha256};
 use similar::{ChangeTag, TextDiff};
-use smol::{
-    channel::{Receiver, Sender},
-    lock::Semaphore,
-};
+use smol::channel::{Receiver, Sender};
 use snippet::Snippet;
 use snippet_provider::SnippetProvider;
 use std::{
@@ -142,6 +142,8 @@ const SERVER_LAUNCHING_BEFORE_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5
 pub const SERVER_PROGRESS_THROTTLE_TIMEOUT: Duration = Duration::from_millis(100);
 
 const MAX_PROJECT_SEARCH_HISTORY_SIZE: usize = 500;
+const MAX_SEARCH_RESULT_FILES: usize = 5_000;
+const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
 
 pub trait Item {
     fn try_open(
@@ -214,6 +216,7 @@ pub struct Project {
     buffers_being_formatted: HashSet<BufferId>,
     buffers_needing_diff: HashSet<WeakModel<Buffer>>,
     git_diff_debouncer: DebouncedDelay<Self>,
+    remotely_created_buffers: Arc<Mutex<RemotelyCreatedBuffers>>,
     nonce: u128,
     _maintain_buffer_languages: Task<()>,
     _maintain_workspace_config: Task<Result<()>>,
@@ -232,6 +235,32 @@ pub struct Project {
     cached_shell_environments: HashMap<WorktreeId, HashMap<String, String>>,
 }
 
+#[derive(Default)]
+struct RemotelyCreatedBuffers {
+    buffers: Vec<Model<Buffer>>,
+    retain_count: usize,
+}
+
+struct RemotelyCreatedBufferGuard {
+    remote_buffers: std::sync::Weak<Mutex<RemotelyCreatedBuffers>>,
+}
+
+impl Drop for RemotelyCreatedBufferGuard {
+    fn drop(&mut self) {
+        if let Some(remote_buffers) = self.remote_buffers.upgrade() {
+            let mut remote_buffers = remote_buffers.lock();
+            assert!(
+                remote_buffers.retain_count > 0,
+                "RemotelyCreatedBufferGuard dropped too many times"
+            );
+            remote_buffers.retain_count -= 1;
+            if remote_buffers.retain_count == 0 {
+                remote_buffers.buffers.clear();
+            }
+        }
+    }
+}
+
 #[derive(Debug)]
 pub enum LanguageServerToQuery {
     Primary,
@@ -680,29 +709,6 @@ impl DirectoryLister {
     }
 }
 
-#[derive(Clone, Debug, PartialEq)]
-enum SearchMatchCandidate {
-    OpenBuffer {
-        buffer: Model<Buffer>,
-        // This might be an unnamed file without representation on filesystem
-        path: Option<Arc<Path>>,
-    },
-    Path {
-        worktree_id: WorktreeId,
-        is_ignored: bool,
-        is_file: bool,
-        path: Arc<Path>,
-    },
-}
-
-pub enum SearchResult {
-    Buffer {
-        buffer: Model<Buffer>,
-        ranges: Vec<Range<Anchor>>,
-    },
-    LimitReached,
-}
-
 #[cfg(any(test, feature = "test-support"))]
 pub const DEFAULT_COMPLETION_CONTEXT: CompletionContext = CompletionContext {
     trigger_kind: lsp::CompletionTriggerKind::INVOKED,
@@ -752,6 +758,7 @@ impl Project {
         client.add_model_request_handler(Self::handle_lsp_command::<PrepareRename>);
         client.add_model_request_handler(Self::handle_lsp_command::<PerformRename>);
         client.add_model_request_handler(Self::handle_search_project);
+        client.add_model_request_handler(Self::handle_search_candidate_buffers);
         client.add_model_request_handler(Self::handle_get_project_symbols);
         client.add_model_request_handler(Self::handle_open_buffer_for_symbol);
         client.add_model_request_handler(Self::handle_open_buffer_by_id);
@@ -861,6 +868,7 @@ impl Project {
                 dev_server_project_id: None,
                 search_history: Self::new_search_history(),
                 cached_shell_environments: HashMap::default(),
+                remotely_created_buffers: Default::default(),
             }
         })
     }
@@ -1056,6 +1064,7 @@ impl Project {
                     .map(|dev_server_project_id| DevServerProjectId(dev_server_project_id)),
                 search_history: Self::new_search_history(),
                 cached_shell_environments: HashMap::default(),
+                remotely_created_buffers: Arc::new(Mutex::new(RemotelyCreatedBuffers::default())),
             };
             this.set_role(role, cx);
             for worktree in worktrees {
@@ -1939,6 +1948,15 @@ impl Project {
         self.is_disconnected() || self.capability() == Capability::ReadOnly
     }
 
+    pub fn is_local(&self) -> bool {
+        match &self.client_state {
+            ProjectClientState::Local | ProjectClientState::Shared { .. } => {
+                self.ssh_session.is_none()
+            }
+            ProjectClientState::Remote { .. } => false,
+        }
+    }
+
     pub fn is_local_or_ssh(&self) -> bool {
         match &self.client_state {
             ProjectClientState::Local | ProjectClientState::Shared { .. } => true,
@@ -1947,7 +1965,10 @@ impl Project {
     }
 
     pub fn is_via_collab(&self) -> bool {
-        !self.is_local_or_ssh()
+        match &self.client_state {
+            ProjectClientState::Local | ProjectClientState::Shared { .. } => false,
+            ProjectClientState::Remote { .. } => true,
+        }
     }
 
     pub fn create_buffer(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<Model<Buffer>>> {
@@ -2167,6 +2188,13 @@ impl Project {
         buffer: &Model<Buffer>,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
+        {
+            let mut remotely_created_buffers = self.remotely_created_buffers.lock();
+            if remotely_created_buffers.retain_count > 0 {
+                remotely_created_buffers.buffers.push(buffer.clone())
+            }
+        }
+
         self.request_buffer_diff_recalculation(buffer, cx);
         buffer.update(cx, |buffer, _| {
             buffer.set_language_registry(self.languages.clone())
@@ -7242,182 +7270,26 @@ impl Project {
         }
     }
 
-    #[allow(clippy::type_complexity)]
     pub fn search(
-        &self,
-        query: SearchQuery,
-        cx: &mut ModelContext<Self>,
-    ) -> Receiver<SearchResult> {
-        if self.is_local_or_ssh() {
-            self.search_local(query, cx)
-        } else if let Some(project_id) = self.remote_id() {
-            let (tx, rx) = smol::channel::unbounded();
-            let request = self.client.request(query.to_proto(project_id));
-            cx.spawn(move |this, mut cx| async move {
-                let response = request.await?;
-                let mut result = HashMap::default();
-                for location in response.locations {
-                    let buffer_id = BufferId::new(location.buffer_id)?;
-                    let target_buffer = this
-                        .update(&mut cx, |this, cx| {
-                            this.wait_for_remote_buffer(buffer_id, cx)
-                        })?
-                        .await?;
-                    let start = location
-                        .start
-                        .and_then(deserialize_anchor)
-                        .ok_or_else(|| anyhow!("missing target start"))?;
-                    let end = location
-                        .end
-                        .and_then(deserialize_anchor)
-                        .ok_or_else(|| anyhow!("missing target end"))?;
-                    result
-                        .entry(target_buffer)
-                        .or_insert(Vec::new())
-                        .push(start..end)
-                }
-                for (buffer, ranges) in result {
-                    let _ = tx.send(SearchResult::Buffer { buffer, ranges }).await;
-                }
-
-                if response.limit_reached {
-                    let _ = tx.send(SearchResult::LimitReached).await;
-                }
-
-                Result::<(), anyhow::Error>::Ok(())
-            })
-            .detach_and_log_err(cx);
-            rx
-        } else {
-            unimplemented!();
-        }
-    }
-
-    pub fn search_local(
-        &self,
+        &mut self,
         query: SearchQuery,
         cx: &mut ModelContext<Self>,
     ) -> Receiver<SearchResult> {
-        // Local search is split into several phases.
-        // TL;DR is that we do 2 passes; initial pass to pick files which contain at least one match
-        // and the second phase that finds positions of all the matches found in the candidate files.
-        // The Receiver obtained from this function returns matches sorted by buffer path. Files without a buffer path are reported first.
-        //
-        // It gets a bit hairy though, because we must account for files that do not have a persistent representation
-        // on FS. Namely, if you have an untitled buffer or unsaved changes in a buffer, we want to scan that too.
-        //
-        // 1. We initialize a queue of match candidates and feed all opened buffers into it (== unsaved files / untitled buffers).
-        //    Then, we go through a worktree and check for files that do match a predicate. If the file had an opened version, we skip the scan
-        //    of FS version for that file altogether - after all, what we have in memory is more up-to-date than what's in FS.
-        // 2. At this point, we have a list of all potentially matching buffers/files.
-        //    We sort that list by buffer path - this list is retained for later use.
-        //    We ensure that all buffers are now opened and available in project.
-        // 3. We run a scan over all the candidate buffers on multiple background threads.
-        //    We cannot assume that there will even be a match - while at least one match
-        //    is guaranteed for files obtained from FS, the buffers we got from memory (unsaved files/unnamed buffers) might not have a match at all.
-        //    There is also an auxiliary background thread responsible for result gathering.
-        //    This is where the sorted list of buffers comes into play to maintain sorted order; Whenever this background thread receives a notification (buffer has/doesn't have matches),
-        //    it keeps it around. It reports matches in sorted order, though it accepts them in unsorted order as well.
-        //    As soon as the match info on next position in sorted order becomes available, it reports it (if it's a match) or skips to the next
-        //    entry - which might already be available thanks to out-of-order processing.
-        //
-        // We could also report matches fully out-of-order, without maintaining a sorted list of matching paths.
-        // This however would mean that project search (that is the main user of this function) would have to do the sorting itself, on the go.
-        // This isn't as straightforward as running an insertion sort sadly, and would also mean that it would have to care about maintaining match index
-        // in face of constantly updating list of sorted matches.
-        // Meanwhile, this implementation offers index stability, since the matches are already reported in a sorted order.
-        let snapshots = self
-            .visible_worktrees(cx)
-            .filter_map(|tree| {
-                let tree = tree.read(cx);
-                Some((tree.snapshot(), tree.as_local()?.settings()))
-            })
-            .collect::<Vec<_>>();
-        let include_root = snapshots.len() > 1;
-
-        let background = cx.background_executor().clone();
-        let path_count: usize = snapshots
-            .iter()
-            .map(|(snapshot, _)| {
-                if query.include_ignored() {
-                    snapshot.file_count()
-                } else {
-                    snapshot.visible_file_count()
-                }
-            })
-            .sum();
-        if path_count == 0 {
-            let (_, rx) = smol::channel::bounded(1024);
-            return rx;
-        }
-        let workers = background.num_cpus().min(path_count);
-        let (matching_paths_tx, matching_paths_rx) = smol::channel::bounded(1024);
-        let mut unnamed_files = vec![];
-        let opened_buffers = self.buffer_store.update(cx, |buffer_store, cx| {
-            buffer_store
-                .buffers()
-                .filter_map(|buffer| {
-                    let (is_ignored, snapshot) = buffer.update(cx, |buffer, cx| {
-                        let is_ignored = buffer
-                            .project_path(cx)
-                            .and_then(|path| self.entry_for_path(&path, cx))
-                            .map_or(false, |entry| entry.is_ignored);
-                        (is_ignored, buffer.snapshot())
-                    });
-                    if is_ignored && !query.include_ignored() {
-                        return None;
-                    } else if let Some(file) = snapshot.file() {
-                        let matched_path = if include_root {
-                            query.file_matches(Some(&file.full_path(cx)))
-                        } else {
-                            query.file_matches(Some(file.path()))
-                        };
-
-                        if matched_path {
-                            Some((file.path().clone(), (buffer, snapshot)))
-                        } else {
-                            None
-                        }
-                    } else {
-                        unnamed_files.push(buffer);
-                        None
-                    }
-                })
-                .collect()
-        });
-        cx.background_executor()
-            .spawn(Self::background_search(
-                unnamed_files,
-                opened_buffers,
-                cx.background_executor().clone(),
-                self.fs.clone(),
-                workers,
-                query.clone(),
-                include_root,
-                path_count,
-                snapshots,
-                matching_paths_tx,
-            ))
-            .detach();
-
         let (result_tx, result_rx) = smol::channel::bounded(1024);
 
-        cx.spawn(|this, mut cx| async move {
-            const MAX_SEARCH_RESULT_FILES: usize = 5_000;
-            const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
+        let matching_buffers_rx =
+            self.search_for_candidate_buffers(&query, MAX_SEARCH_RESULT_FILES + 1, cx);
 
-            let mut matching_paths = matching_paths_rx
-                .take(MAX_SEARCH_RESULT_FILES + 1)
-                .collect::<Vec<_>>()
-                .await;
-            let mut limit_reached = if matching_paths.len() > MAX_SEARCH_RESULT_FILES {
-                matching_paths.pop();
+        cx.spawn(|_, cx| async move {
+            let mut matching_buffers = matching_buffers_rx.collect::<Vec<_>>().await;
+            let mut limit_reached = if matching_buffers.len() > MAX_SEARCH_RESULT_FILES {
+                matching_buffers.truncate(MAX_SEARCH_RESULT_FILES);
                 true
             } else {
                 false
             };
             cx.update(|cx| {
-                sort_search_matches(&mut matching_paths, cx);
+                sort_search_matches(&mut matching_buffers, cx);
             })?;
 
             let mut range_count = 0;
@@ -7427,23 +7299,12 @@ impl Project {
             // 64 buffers at a time to avoid overwhelming the main thread. For each
             // opened buffer, we will spawn a background task that retrieves all the
             // ranges in the buffer matched by the query.
-            'outer: for matching_paths_chunk in matching_paths.chunks(64) {
+            'outer: for matching_buffer_chunk in matching_buffers.chunks(64) {
                 let mut chunk_results = Vec::new();
-                for matching_path in matching_paths_chunk {
+                for buffer in matching_buffer_chunk {
+                    let buffer = buffer.clone();
                     let query = query.clone();
-                    let buffer = match matching_path {
-                        SearchMatchCandidate::OpenBuffer { buffer, .. } => {
-                            Task::ready(Ok(buffer.clone()))
-                        }
-                        SearchMatchCandidate::Path {
-                            worktree_id, path, ..
-                        } => this.update(&mut cx, |this, cx| {
-                            this.open_buffer((*worktree_id, path.clone()), cx)
-                        })?,
-                    };
-
                     chunk_results.push(cx.spawn(|cx| async move {
-                        let buffer = buffer.await?;
                         let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot())?;
                         let ranges = cx
                             .background_executor()
@@ -7489,93 +7350,63 @@ impl Project {
         result_rx
     }
 
-    /// Pick paths that might potentially contain a match of a given search query.
-    #[allow(clippy::too_many_arguments)]
-    async fn background_search(
-        unnamed_buffers: Vec<Model<Buffer>>,
-        opened_buffers: HashMap<Arc<Path>, (Model<Buffer>, BufferSnapshot)>,
-        executor: BackgroundExecutor,
-        fs: Arc<dyn Fs>,
-        workers: usize,
-        query: SearchQuery,
-        include_root: bool,
-        path_count: usize,
-        snapshots: Vec<(Snapshot, WorktreeSettings)>,
-        matching_paths_tx: Sender<SearchMatchCandidate>,
-    ) {
-        let fs = &fs;
-        let query = &query;
-        let matching_paths_tx = &matching_paths_tx;
-        let snapshots = &snapshots;
-        for buffer in unnamed_buffers {
-            matching_paths_tx
-                .send(SearchMatchCandidate::OpenBuffer {
-                    buffer: buffer.clone(),
-                    path: None,
-                })
-                .await
-                .log_err();
-        }
-        for (path, (buffer, _)) in opened_buffers.iter() {
-            matching_paths_tx
-                .send(SearchMatchCandidate::OpenBuffer {
-                    buffer: buffer.clone(),
-                    path: Some(path.clone()),
-                })
-                .await
-                .log_err();
+    fn search_for_candidate_buffers(
+        &mut self,
+        query: &SearchQuery,
+        limit: usize,
+        cx: &mut ModelContext<Project>,
+    ) -> Receiver<Model<Buffer>> {
+        if self.is_local() {
+            let fs = self.fs.clone();
+            return self.buffer_store.update(cx, |buffer_store, cx| {
+                buffer_store.find_search_candidates(query, limit, fs, cx)
+            });
+        } else {
+            self.search_for_candidate_buffers_remote(query, limit, cx)
         }
+    }
 
-        let paths_per_worker = (path_count + workers - 1) / workers;
-
-        executor
-            .scoped(|scope| {
-                let max_concurrent_workers = Arc::new(Semaphore::new(workers));
-
-                for worker_ix in 0..workers {
-                    let worker_start_ix = worker_ix * paths_per_worker;
-                    let worker_end_ix = worker_start_ix + paths_per_worker;
-                    let opened_buffers = opened_buffers.clone();
-                    let limiter = Arc::clone(&max_concurrent_workers);
-                    scope.spawn({
-                        async move {
-                            let _guard = limiter.acquire().await;
-                            search_snapshots(
-                                snapshots,
-                                worker_start_ix,
-                                worker_end_ix,
-                                query,
-                                matching_paths_tx,
-                                &opened_buffers,
-                                include_root,
-                                fs,
-                            )
-                            .await;
-                        }
-                    });
-                }
+    fn search_for_candidate_buffers_remote(
+        &mut self,
+        query: &SearchQuery,
+        limit: usize,
+        cx: &mut ModelContext<Project>,
+    ) -> Receiver<Model<Buffer>> {
+        let (tx, rx) = smol::channel::unbounded();
+
+        let (client, remote_id): (AnyProtoClient, _) =
+            if let Some(ssh_session) = self.ssh_session.clone() {
+                (ssh_session.into(), 0)
+            } else if let Some(remote_id) = self.remote_id() {
+                (self.client.clone().into(), remote_id)
+            } else {
+                return rx;
+            };
 
-                if query.include_ignored() {
-                    for (snapshot, settings) in snapshots {
-                        for ignored_entry in snapshot.entries(true, 0).filter(|e| e.is_ignored) {
-                            let limiter = Arc::clone(&max_concurrent_workers);
-                            scope.spawn(async move {
-                                let _guard = limiter.acquire().await;
-                                search_ignored_entry(
-                                    snapshot,
-                                    settings,
-                                    ignored_entry,
-                                    fs,
-                                    query,
-                                    matching_paths_tx,
-                                )
-                                .await;
-                            });
-                        }
-                    }
-                }
-            })
-            .await;
+        let request = client.request(proto::FindSearchCandidates {
+            project_id: remote_id,
+            query: Some(query.to_proto()),
+            limit: limit as _,
+        });
+        let guard = self.retain_remotely_created_buffers();
+
+        cx.spawn(move |this, mut cx| async move {
+            let response = request.await?;
+            for buffer_id in response.buffer_ids {
+                let buffer_id = BufferId::new(buffer_id)?;
+                let buffer = this
+                    .update(&mut cx, |this, cx| {
+                        this.wait_for_remote_buffer(buffer_id, cx)
+                    })?
+                    .await?;
+                let _ = tx.send(buffer).await;
+            }
+
+            drop(guard);
+            anyhow::Ok(())
+        })
+        .detach_and_log_err(cx);
+        rx
     }
 
     pub fn request_lsp<R: LspCommand>(
@@ -9075,6 +8906,13 @@ impl Project {
         BufferStore::handle_update_buffer(buffer_store, envelope, cx).await
     }
 
+    fn retain_remotely_created_buffers(&mut self) -> RemotelyCreatedBufferGuard {
+        self.remotely_created_buffers.lock().retain_count += 1;
+        RemotelyCreatedBufferGuard {
+            remote_buffers: Arc::downgrade(&self.remotely_created_buffers),
+        }
+    }
+
     async fn handle_create_buffer_for_peer(
         this: Model<Self>,
         envelope: TypedEnvelope<proto::CreateBufferForPeer>,
@@ -9770,7 +9608,7 @@ impl Project {
         mut cx: AsyncAppContext,
     ) -> Result<proto::SearchProjectResponse> {
         let peer_id = envelope.original_sender_id()?;
-        let query = SearchQuery::from_proto(envelope.payload)?;
+        let query = SearchQuery::from_proto_v1(envelope.payload)?;
         let mut result = this.update(&mut cx, |this, cx| this.search(query, cx))?;
 
         cx.spawn(move |mut cx| async move {
@@ -9798,11 +9636,42 @@ impl Project {
             Ok(proto::SearchProjectResponse {
                 locations,
                 limit_reached,
+                // will restart
             })
         })
         .await
     }
 
+    async fn handle_search_candidate_buffers(
+        this: Model<Self>,
+        envelope: TypedEnvelope<proto::FindSearchCandidates>,
+        mut cx: AsyncAppContext,
+    ) -> Result<proto::FindSearchCandidatesResponse> {
+        let peer_id = envelope.original_sender_id()?;
+        let message = envelope.payload;
+        let query = SearchQuery::from_proto(
+            message
+                .query
+                .ok_or_else(|| anyhow!("missing query field"))?,
+        )?;
+        let mut results = this.update(&mut cx, |this, cx| {
+            this.search_for_candidate_buffers(&query, message.limit as _, cx)
+        })?;
+
+        let mut response = proto::FindSearchCandidatesResponse {
+            buffer_ids: Vec::new(),
+        };
+
+        while let Some(buffer) = results.next().await {
+            this.update(&mut cx, |this, cx| {
+                let buffer_id = this.create_buffer_for_peer(&buffer, peer_id, cx);
+                response.buffer_ids.push(buffer_id.to_proto());
+            })?;
+        }
+
+        Ok(response)
+    }
+
     async fn handle_open_buffer_for_symbol(
         this: Model<Self>,
         envelope: TypedEnvelope<proto::OpenBufferForSymbol>,
@@ -10916,162 +10785,6 @@ fn deserialize_code_actions(code_actions: &HashMap<String, bool>) -> Vec<lsp::Co
         .collect()
 }
 
-#[allow(clippy::too_many_arguments)]
-async fn search_snapshots(
-    snapshots: &Vec<(Snapshot, WorktreeSettings)>,
-    worker_start_ix: usize,
-    worker_end_ix: usize,
-    query: &SearchQuery,
-    results_tx: &Sender<SearchMatchCandidate>,
-    opened_buffers: &HashMap<Arc<Path>, (Model<Buffer>, BufferSnapshot)>,
-    include_root: bool,
-    fs: &Arc<dyn Fs>,
-) {
-    let mut snapshot_start_ix = 0;
-    let mut abs_path = PathBuf::new();
-
-    for (snapshot, _) in snapshots {
-        let snapshot_end_ix = snapshot_start_ix
-            + if query.include_ignored() {
-                snapshot.file_count()
-            } else {
-                snapshot.visible_file_count()
-            };
-        if worker_end_ix <= snapshot_start_ix {
-            break;
-        } else if worker_start_ix > snapshot_end_ix {
-            snapshot_start_ix = snapshot_end_ix;
-            continue;
-        } else {
-            let start_in_snapshot = worker_start_ix.saturating_sub(snapshot_start_ix);
-            let end_in_snapshot = cmp::min(worker_end_ix, snapshot_end_ix) - snapshot_start_ix;
-
-            for entry in snapshot
-                .files(false, start_in_snapshot)
-                .take(end_in_snapshot - start_in_snapshot)
-            {
-                if results_tx.is_closed() {
-                    break;
-                }
-                if opened_buffers.contains_key(&entry.path) {
-                    continue;
-                }
-
-                let matched_path = if include_root {
-                    let mut full_path = PathBuf::from(snapshot.root_name());
-                    full_path.push(&entry.path);
-                    query.file_matches(Some(&full_path))
-                } else {
-                    query.file_matches(Some(&entry.path))
-                };
-
-                let matches = if matched_path {
-                    abs_path.clear();
-                    abs_path.push(&snapshot.abs_path());
-                    abs_path.push(&entry.path);
-
-                    if entry.is_fifo {
-                        false
-                    } else {
-                        if let Some(file) = fs.open_sync(&abs_path).await.log_err() {
-                            query.detect(file).unwrap_or(false)
-                        } else {
-                            false
-                        }
-                    }
-                } else {
-                    false
-                };
-
-                if matches {
-                    let project_path = SearchMatchCandidate::Path {
-                        worktree_id: snapshot.id(),
-                        path: entry.path.clone(),
-                        is_ignored: entry.is_ignored,
-                        is_file: entry.is_file(),
-                    };
-                    if results_tx.send(project_path).await.is_err() {
-                        return;
-                    }
-                }
-            }
-
-            snapshot_start_ix = snapshot_end_ix;
-        }
-    }
-}
-
-async fn search_ignored_entry(
-    snapshot: &Snapshot,
-    settings: &WorktreeSettings,
-    ignored_entry: &Entry,
-    fs: &Arc<dyn Fs>,
-    query: &SearchQuery,
-    counter_tx: &Sender<SearchMatchCandidate>,
-) {
-    let mut ignored_paths_to_process =
-        VecDeque::from([snapshot.abs_path().join(&ignored_entry.path)]);
-
-    while let Some(ignored_abs_path) = ignored_paths_to_process.pop_front() {
-        let metadata = fs
-            .metadata(&ignored_abs_path)
-            .await
-            .with_context(|| format!("fetching fs metadata for {ignored_abs_path:?}"))
-            .log_err()
-            .flatten();
-
-        if let Some(fs_metadata) = metadata {
-            if fs_metadata.is_dir {
-                let files = fs
-                    .read_dir(&ignored_abs_path)
-                    .await
-                    .with_context(|| format!("listing ignored path {ignored_abs_path:?}"))
-                    .log_err();
-
-                if let Some(mut subfiles) = files {
-                    while let Some(subfile) = subfiles.next().await {
-                        if let Some(subfile) = subfile.log_err() {
-                            ignored_paths_to_process.push_back(subfile);
-                        }
-                    }
-                }
-            } else if !fs_metadata.is_symlink {
-                if !query.file_matches(Some(&ignored_abs_path))
-                    || settings.is_path_excluded(&ignored_entry.path)
-                {
-                    continue;
-                }
-                let matches = if let Some(file) = fs
-                    .open_sync(&ignored_abs_path)
-                    .await
-                    .with_context(|| format!("Opening ignored path {ignored_abs_path:?}"))
-                    .log_err()
-                {
-                    query.detect(file).unwrap_or(false)
-                } else {
-                    false
-                };
-
-                if matches {
-                    let project_path = SearchMatchCandidate::Path {
-                        worktree_id: snapshot.id(),
-                        path: Arc::from(
-                            ignored_abs_path
-                                .strip_prefix(snapshot.abs_path())
-                                .expect("scanning worktree-related files"),
-                        ),
-                        is_ignored: true,
-                        is_file: ignored_entry.is_file(),
-                    };
-                    if counter_tx.send(project_path).await.is_err() {
-                        return;
-                    }
-                }
-            }
-        }
-    }
-}
-
 fn glob_literal_prefix(glob: &str) -> &str {
     let mut literal_end = 0;
     for (i, part) in glob.split(path::MAIN_SEPARATOR).enumerate() {
@@ -11657,74 +11370,18 @@ pub fn sort_worktree_entries(entries: &mut Vec<Entry>) {
     });
 }
 
-fn sort_search_matches(search_matches: &mut Vec<SearchMatchCandidate>, cx: &AppContext) {
-    search_matches.sort_by(|entry_a, entry_b| match (entry_a, entry_b) {
-        (
-            SearchMatchCandidate::OpenBuffer {
-                buffer: buffer_a,
-                path: None,
-            },
-            SearchMatchCandidate::OpenBuffer {
-                buffer: buffer_b,
-                path: None,
-            },
-        ) => buffer_a
-            .read(cx)
-            .remote_id()
-            .cmp(&buffer_b.read(cx).remote_id()),
-        (
-            SearchMatchCandidate::OpenBuffer { path: None, .. },
-            SearchMatchCandidate::Path { .. }
-            | SearchMatchCandidate::OpenBuffer { path: Some(_), .. },
-        ) => Ordering::Less,
-        (
-            SearchMatchCandidate::OpenBuffer { path: Some(_), .. }
-            | SearchMatchCandidate::Path { .. },
-            SearchMatchCandidate::OpenBuffer { path: None, .. },
-        ) => Ordering::Greater,
-        (
-            SearchMatchCandidate::OpenBuffer {
-                path: Some(path_a), ..
-            },
-            SearchMatchCandidate::Path {
-                is_file: is_file_b,
-                path: path_b,
-                ..
-            },
-        ) => compare_paths((path_a.as_ref(), true), (path_b.as_ref(), *is_file_b)),
-        (
-            SearchMatchCandidate::Path {
-                is_file: is_file_a,
-                path: path_a,
-                ..
-            },
-            SearchMatchCandidate::OpenBuffer {
-                path: Some(path_b), ..
-            },
-        ) => compare_paths((path_a.as_ref(), *is_file_a), (path_b.as_ref(), true)),
-        (
-            SearchMatchCandidate::OpenBuffer {
-                path: Some(path_a), ..
-            },
-            SearchMatchCandidate::OpenBuffer {
-                path: Some(path_b), ..
-            },
-        ) => compare_paths((path_a.as_ref(), true), (path_b.as_ref(), true)),
-        (
-            SearchMatchCandidate::Path {
-                worktree_id: worktree_id_a,
-                is_file: is_file_a,
-                path: path_a,
-                ..
-            },
-            SearchMatchCandidate::Path {
-                worktree_id: worktree_id_b,
-                is_file: is_file_b,
-                path: path_b,
-                ..
-            },
-        ) => worktree_id_a.cmp(&worktree_id_b).then_with(|| {
-            compare_paths((path_a.as_ref(), *is_file_a), (path_b.as_ref(), *is_file_b))
-        }),
+fn sort_search_matches(search_matches: &mut Vec<Model<Buffer>>, cx: &AppContext) {
+    search_matches.sort_by(|buffer_a, buffer_b| {
+        let path_a = buffer_a.read(cx).file().map(|file| file.path());
+        let path_b = buffer_b.read(cx).file().map(|file| file.path());
+
+        match (path_a, path_b) {
+            (None, None) => cmp::Ordering::Equal,
+            (None, Some(_)) => cmp::Ordering::Less,
+            (Some(_), None) => cmp::Ordering::Greater,
+            (Some(path_a), Some(path_b)) => {
+                compare_paths((path_a.as_ref(), true), (path_b.as_ref(), true))
+            }
+        }
     });
 }

crates/project/src/search.rs 🔗

@@ -1,7 +1,8 @@
 use aho_corasick::{AhoCorasick, AhoCorasickBuilder};
 use anyhow::Result;
 use client::proto;
-use language::{char_kind, BufferSnapshot};
+use gpui::Model;
+use language::{char_kind, Buffer, BufferSnapshot};
 use regex::{Captures, Regex, RegexBuilder};
 use smol::future::yield_now;
 use std::{
@@ -11,10 +12,19 @@ use std::{
     path::Path,
     sync::{Arc, OnceLock},
 };
+use text::Anchor;
 use util::paths::PathMatcher;
 
 static TEXT_REPLACEMENT_SPECIAL_CHARACTERS_REGEX: OnceLock<Regex> = OnceLock::new();
 
+pub enum SearchResult {
+    Buffer {
+        buffer: Model<Buffer>,
+        ranges: Vec<Range<Anchor>>,
+    },
+    LimitReached,
+}
+
 #[derive(Clone, Debug)]
 pub struct SearchInputs {
     query: Arc<str>,
@@ -122,7 +132,29 @@ impl SearchQuery {
         })
     }
 
-    pub fn from_proto(message: proto::SearchProject) -> Result<Self> {
+    pub fn from_proto_v1(message: proto::SearchProject) -> Result<Self> {
+        if message.regex {
+            Self::regex(
+                message.query,
+                message.whole_word,
+                message.case_sensitive,
+                message.include_ignored,
+                deserialize_path_matches(&message.files_to_include)?,
+                deserialize_path_matches(&message.files_to_exclude)?,
+            )
+        } else {
+            Self::text(
+                message.query,
+                message.whole_word,
+                message.case_sensitive,
+                message.include_ignored,
+                deserialize_path_matches(&message.files_to_include)?,
+                deserialize_path_matches(&message.files_to_exclude)?,
+            )
+        }
+    }
+
+    pub fn from_proto(message: proto::SearchQuery) -> Result<Self> {
         if message.regex {
             Self::regex(
                 message.query,
@@ -158,7 +190,7 @@ impl SearchQuery {
             }
         }
     }
-    pub fn to_proto(&self, project_id: u64) -> proto::SearchProject {
+    pub fn to_protov1(&self, project_id: u64) -> proto::SearchProject {
         proto::SearchProject {
             project_id,
             query: self.as_str().to_string(),
@@ -171,6 +203,18 @@ impl SearchQuery {
         }
     }
 
+    pub fn to_proto(&self) -> proto::SearchQuery {
+        proto::SearchQuery {
+            query: self.as_str().to_string(),
+            regex: self.is_regex(),
+            whole_word: self.whole_word(),
+            case_sensitive: self.case_sensitive(),
+            include_ignored: self.include_ignored(),
+            files_to_include: self.files_to_include().sources().join(","),
+            files_to_exclude: self.files_to_exclude().sources().join(","),
+        }
+    }
+
     pub fn detect<T: Read>(&self, stream: T) -> Result<bool> {
         if self.as_str().is_empty() {
             return Ok(false);
@@ -402,7 +446,7 @@ impl SearchQuery {
     }
 }
 
-fn deserialize_path_matches(glob_set: &str) -> anyhow::Result<PathMatcher> {
+pub fn deserialize_path_matches(glob_set: &str) -> anyhow::Result<PathMatcher> {
     let globs = glob_set
         .split(',')
         .map(str::trim)

crates/project/src/worktree_store.rs 🔗

@@ -1,12 +1,31 @@
+use std::{
+    cmp,
+    collections::VecDeque,
+    path::PathBuf,
+    sync::{
+        atomic::{AtomicUsize, Ordering::SeqCst},
+        Arc,
+    },
+};
+
 use anyhow::{anyhow, Context as _, Result};
-use collections::HashMap;
+use collections::{HashMap, HashSet};
+use fs::Fs;
 use gpui::{AppContext, AsyncAppContext, EntityId, EventEmitter, Model, ModelContext, WeakModel};
 use rpc::{
     proto::{self, AnyProtoClient},
     TypedEnvelope,
 };
+use smol::{
+    channel::{Receiver, Sender},
+    lock::Semaphore,
+    stream::StreamExt,
+};
 use text::ReplicaId;
-use worktree::{ProjectEntryId, Worktree, WorktreeId};
+use util::ResultExt;
+use worktree::{Entry, ProjectEntryId, Snapshot, Worktree, WorktreeId, WorktreeSettings};
+
+use crate::{search::SearchQuery, ProjectPath};
 
 pub struct WorktreeStore {
     is_shared: bool,
@@ -61,6 +80,15 @@ impl WorktreeStore {
             .find(|worktree| worktree.read(cx).contains_entry(entry_id))
     }
 
+    pub fn entry_for_id<'a>(
+        &'a self,
+        entry_id: ProjectEntryId,
+        cx: &'a AppContext,
+    ) -> Option<&'a Entry> {
+        self.worktrees()
+            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
+    }
+
     pub fn add(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
         let push_strong_handle = self.is_shared || worktree.read(cx).is_visible();
         let handle = if push_strong_handle {
@@ -238,6 +266,287 @@ impl WorktreeStore {
         }
     }
 
+    /// search over all worktrees (ignoring open buffers)
+    /// the query is tested against the file on disk and matching files are returned.
+    pub fn find_search_candidates(
+        &self,
+        query: SearchQuery,
+        limit: usize,
+        skip_entries: HashSet<ProjectEntryId>,
+        fs: Arc<dyn Fs>,
+        cx: &ModelContext<Self>,
+    ) -> Receiver<ProjectPath> {
+        let (matching_paths_tx, matching_paths_rx) = smol::channel::bounded(1024);
+        let snapshots = self
+            .visible_worktrees(cx)
+            .filter_map(|tree| {
+                let tree = tree.read(cx);
+                Some((tree.snapshot(), tree.as_local()?.settings()))
+            })
+            .collect::<Vec<_>>();
+        let include_root = snapshots.len() > 1;
+        let path_count: usize = snapshots
+            .iter()
+            .map(|(snapshot, _)| {
+                if query.include_ignored() {
+                    snapshot.file_count()
+                } else {
+                    snapshot.visible_file_count()
+                }
+            })
+            .sum();
+
+        let remaining_paths = AtomicUsize::new(limit);
+        if path_count == 0 {
+            return matching_paths_rx;
+        }
+        let workers = cx.background_executor().num_cpus().min(path_count);
+        let paths_per_worker = (path_count + workers - 1) / workers;
+
+        let executor = cx.background_executor().clone();
+        cx.background_executor()
+            .spawn(async move {
+                let fs = &fs;
+                let query = &query;
+                let matching_paths_tx = &matching_paths_tx;
+                let snapshots = &snapshots;
+                let remaining_paths = &remaining_paths;
+
+                executor
+                    .scoped(move |scope| {
+                        let max_concurrent_workers = Arc::new(Semaphore::new(workers));
+
+                        for worker_ix in 0..workers {
+                            let snapshots = snapshots.clone();
+                            let worker_start_ix = worker_ix * paths_per_worker;
+                            let worker_end_ix = worker_start_ix + paths_per_worker;
+                            let skip_entries = skip_entries.clone();
+                            let limiter = Arc::clone(&max_concurrent_workers);
+                            scope.spawn(async move {
+                                let _guard = limiter.acquire().await;
+                                Self::search_snapshots(
+                                    &snapshots,
+                                    worker_start_ix,
+                                    worker_end_ix,
+                                    &query,
+                                    remaining_paths,
+                                    &matching_paths_tx,
+                                    &skip_entries,
+                                    include_root,
+                                    fs,
+                                )
+                                .await;
+                            });
+                        }
+
+                        if query.include_ignored() {
+                            for (snapshot, settings) in snapshots {
+                                for ignored_entry in
+                                    snapshot.entries(true, 0).filter(|e| e.is_ignored)
+                                {
+                                    let limiter = Arc::clone(&max_concurrent_workers);
+                                    scope.spawn(async move {
+                                        let _guard = limiter.acquire().await;
+                                        if remaining_paths.load(SeqCst) == 0 {
+                                            return;
+                                        }
+
+                                        Self::search_ignored_entry(
+                                            &snapshot,
+                                            &settings,
+                                            ignored_entry,
+                                            &fs,
+                                            &query,
+                                            remaining_paths,
+                                            &matching_paths_tx,
+                                        )
+                                        .await;
+                                    });
+                                }
+                            }
+                        }
+                    })
+                    .await
+            })
+            .detach();
+        return matching_paths_rx;
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    async fn search_snapshots(
+        snapshots: &Vec<(worktree::Snapshot, WorktreeSettings)>,
+        worker_start_ix: usize,
+        worker_end_ix: usize,
+        query: &SearchQuery,
+        remaining_paths: &AtomicUsize,
+        results_tx: &Sender<ProjectPath>,
+        skip_entries: &HashSet<ProjectEntryId>,
+        include_root: bool,
+        fs: &Arc<dyn Fs>,
+    ) {
+        let mut snapshot_start_ix = 0;
+        let mut abs_path = PathBuf::new();
+
+        for (snapshot, _) in snapshots {
+            let snapshot_end_ix = snapshot_start_ix
+                + if query.include_ignored() {
+                    snapshot.file_count()
+                } else {
+                    snapshot.visible_file_count()
+                };
+            if worker_end_ix <= snapshot_start_ix {
+                break;
+            } else if worker_start_ix > snapshot_end_ix {
+                snapshot_start_ix = snapshot_end_ix;
+                continue;
+            } else {
+                let start_in_snapshot = worker_start_ix.saturating_sub(snapshot_start_ix);
+                let end_in_snapshot = cmp::min(worker_end_ix, snapshot_end_ix) - snapshot_start_ix;
+
+                for entry in snapshot
+                    .files(false, start_in_snapshot)
+                    .take(end_in_snapshot - start_in_snapshot)
+                {
+                    if results_tx.is_closed() {
+                        break;
+                    }
+                    if skip_entries.contains(&entry.id) {
+                        continue;
+                    }
+
+                    let matched_path = if include_root {
+                        let mut full_path = PathBuf::from(snapshot.root_name());
+                        full_path.push(&entry.path);
+                        query.file_matches(Some(&full_path))
+                    } else {
+                        query.file_matches(Some(&entry.path))
+                    };
+
+                    let matches = if matched_path {
+                        abs_path.clear();
+                        abs_path.push(&snapshot.abs_path());
+                        abs_path.push(&entry.path);
+                        if let Some(file) = fs.open_sync(&abs_path).await.log_err() {
+                            query.detect(file).unwrap_or(false)
+                        } else {
+                            false
+                        }
+                    } else {
+                        false
+                    };
+
+                    if matches {
+                        if remaining_paths
+                            .fetch_update(SeqCst, SeqCst, |value| {
+                                if value > 0 {
+                                    Some(value - 1)
+                                } else {
+                                    None
+                                }
+                            })
+                            .is_err()
+                        {
+                            return;
+                        }
+
+                        let project_path = ProjectPath {
+                            worktree_id: snapshot.id(),
+                            path: entry.path.clone(),
+                        };
+                        if results_tx.send(project_path).await.is_err() {
+                            return;
+                        }
+                    }
+                }
+
+                snapshot_start_ix = snapshot_end_ix;
+            }
+        }
+    }
+
+    async fn search_ignored_entry(
+        snapshot: &Snapshot,
+        settings: &WorktreeSettings,
+        ignored_entry: &Entry,
+        fs: &Arc<dyn Fs>,
+        query: &SearchQuery,
+        remaining_paths: &AtomicUsize,
+        counter_tx: &Sender<ProjectPath>,
+    ) {
+        let mut ignored_paths_to_process =
+            VecDeque::from([snapshot.abs_path().join(&ignored_entry.path)]);
+
+        while let Some(ignored_abs_path) = ignored_paths_to_process.pop_front() {
+            let metadata = fs
+                .metadata(&ignored_abs_path)
+                .await
+                .with_context(|| format!("fetching fs metadata for {ignored_abs_path:?}"))
+                .log_err()
+                .flatten();
+
+            if let Some(fs_metadata) = metadata {
+                if fs_metadata.is_dir {
+                    let files = fs
+                        .read_dir(&ignored_abs_path)
+                        .await
+                        .with_context(|| format!("listing ignored path {ignored_abs_path:?}"))
+                        .log_err();
+
+                    if let Some(mut subfiles) = files {
+                        while let Some(subfile) = subfiles.next().await {
+                            if let Some(subfile) = subfile.log_err() {
+                                ignored_paths_to_process.push_back(subfile);
+                            }
+                        }
+                    }
+                } else if !fs_metadata.is_symlink {
+                    if !query.file_matches(Some(&ignored_abs_path))
+                        || settings.is_path_excluded(&ignored_entry.path)
+                    {
+                        continue;
+                    }
+                    let matches = if let Some(file) = fs
+                        .open_sync(&ignored_abs_path)
+                        .await
+                        .with_context(|| format!("Opening ignored path {ignored_abs_path:?}"))
+                        .log_err()
+                    {
+                        query.detect(file).unwrap_or(false)
+                    } else {
+                        false
+                    };
+
+                    if matches {
+                        if remaining_paths
+                            .fetch_update(SeqCst, SeqCst, |value| {
+                                if value > 0 {
+                                    Some(value - 1)
+                                } else {
+                                    None
+                                }
+                            })
+                            .is_err()
+                        {
+                            return;
+                        }
+
+                        let project_path = ProjectPath {
+                            worktree_id: snapshot.id(),
+                            path: Arc::from(
+                                ignored_abs_path
+                                    .strip_prefix(snapshot.abs_path())
+                                    .expect("scanning worktree-related files"),
+                            ),
+                        };
+                        if counter_tx.send(project_path).await.is_err() {
+                            return;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
     pub async fn handle_create_project_entry(
         this: Model<Self>,
         envelope: TypedEnvelope<proto::CreateProjectEntry>,

crates/proto/proto/zed.proto 🔗

@@ -275,7 +275,10 @@ message Envelope {
         GetLlmTokenResponse get_llm_token_response = 236;
 
         LspExtSwitchSourceHeader lsp_ext_switch_source_header = 241;
-        LspExtSwitchSourceHeaderResponse lsp_ext_switch_source_header_response = 242; // current max
+        LspExtSwitchSourceHeaderResponse lsp_ext_switch_source_header_response = 242;
+
+        FindSearchCandidates find_search_candidates = 243;
+        FindSearchCandidatesResponse find_search_candidates_response = 244; // current max
     }
 
     reserved 158 to 161;
@@ -1236,6 +1239,26 @@ message SearchProjectResponse {
     bool limit_reached = 2;
 }
 
+message SearchQuery {
+    string query = 2;
+    bool regex = 3;
+    bool whole_word = 4;
+    bool case_sensitive = 5;
+    string files_to_include = 6;
+    string files_to_exclude = 7;
+    bool include_ignored = 8;
+}
+
+message FindSearchCandidates {
+    uint64 project_id = 1;
+    SearchQuery query = 2;
+    uint64 limit = 3;
+}
+
+message FindSearchCandidatesResponse {
+    repeated uint64 buffer_ids = 1;
+}
+
 message CodeAction {
     uint64 server_id = 1;
     Anchor start = 2;

crates/proto/src/proto.rs 🔗

@@ -410,6 +410,8 @@ messages!(
     (LspExtSwitchSourceHeaderResponse, Background),
     (AddWorktree, Foreground),
     (AddWorktreeResponse, Foreground),
+    (FindSearchCandidates, Background),
+    (FindSearchCandidatesResponse, Background)
 );
 
 request_messages!(
@@ -498,6 +500,7 @@ request_messages!(
     (RespondToContactRequest, Ack),
     (SaveBuffer, BufferSaved),
     (SearchProject, SearchProjectResponse),
+    (FindSearchCandidates, FindSearchCandidatesResponse),
     (SendChannelMessage, SendChannelMessageResponse),
     (SetChannelMemberRole, Ack),
     (SetChannelVisibility, Ack),
@@ -547,6 +550,7 @@ entity_messages!(
     CreateProjectEntry,
     DeleteProjectEntry,
     ExpandProjectEntry,
+    FindSearchCandidates,
     FormatBuffers,
     GetCodeActions,
     GetCompletions,

crates/remote_server/src/headless_project.rs 🔗

@@ -1,8 +1,9 @@
-use anyhow::Result;
+use anyhow::{anyhow, Result};
 use fs::Fs;
 use gpui::{AppContext, AsyncAppContext, Context, Model, ModelContext};
 use project::{
     buffer_store::{BufferStore, BufferStoreEvent},
+    search::SearchQuery,
     worktree_store::WorktreeStore,
     ProjectPath, WorktreeId, WorktreeSettings,
 };
@@ -49,6 +50,7 @@ impl HeadlessProject {
         session.add_request_handler(this.clone(), Self::handle_list_remote_directory);
         session.add_request_handler(this.clone(), Self::handle_add_worktree);
         session.add_request_handler(this.clone(), Self::handle_open_buffer_by_path);
+        session.add_request_handler(this.clone(), Self::handle_find_search_candidates);
 
         session.add_request_handler(buffer_store.downgrade(), BufferStore::handle_blame_buffer);
         session.add_request_handler(buffer_store.downgrade(), BufferStore::handle_update_buffer);
@@ -160,6 +162,49 @@ impl HeadlessProject {
         })
     }
 
+    pub async fn handle_find_search_candidates(
+        this: Model<Self>,
+        envelope: TypedEnvelope<proto::FindSearchCandidates>,
+        mut cx: AsyncAppContext,
+    ) -> Result<proto::FindSearchCandidatesResponse> {
+        let message = envelope.payload;
+        let query = SearchQuery::from_proto(
+            message
+                .query
+                .ok_or_else(|| anyhow!("missing query field"))?,
+        )?;
+        let mut results = this.update(&mut cx, |this, cx| {
+            this.buffer_store.update(cx, |buffer_store, cx| {
+                buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx)
+            })
+        })?;
+
+        let mut response = proto::FindSearchCandidatesResponse {
+            buffer_ids: Vec::new(),
+        };
+
+        let (buffer_store, client) = this.update(&mut cx, |this, _| {
+            (this.buffer_store.clone(), this.session.clone())
+        })?;
+
+        while let Some(buffer) = results.next().await {
+            let buffer_id = buffer.update(&mut cx, |this, _| this.remote_id())?;
+            response.buffer_ids.push(buffer_id.to_proto());
+
+            BufferStore::create_buffer_for_peer(
+                buffer_store.clone(),
+                PEER_ID,
+                buffer_id,
+                PROJECT_ID,
+                client.clone(),
+                &mut cx,
+            )
+            .await?;
+        }
+
+        Ok(response)
+    }
+
     pub async fn handle_list_remote_directory(
         this: Model<Self>,
         envelope: TypedEnvelope<proto::ListRemoteDirectory>,

crates/remote_server/src/remote_editing_tests.rs 🔗

@@ -1,55 +1,24 @@
 use crate::headless_project::HeadlessProject;
 use client::{Client, UserStore};
 use clock::FakeSystemClock;
-use fs::{FakeFs, Fs as _};
+use fs::{FakeFs, Fs};
 use gpui::{Context, Model, TestAppContext};
 use http_client::FakeHttpClient;
 use language::LanguageRegistry;
 use node_runtime::FakeNodeRuntime;
-use project::Project;
+use project::{
+    search::{SearchQuery, SearchResult},
+    Project,
+};
 use remote::SshSession;
 use serde_json::json;
 use settings::SettingsStore;
+use smol::stream::StreamExt;
 use std::{path::Path, sync::Arc};
 
-fn init_logger() {
-    if std::env::var("RUST_LOG").is_ok() {
-        env_logger::try_init().ok();
-    }
-}
-
 #[gpui::test]
 async fn test_remote_editing(cx: &mut TestAppContext, server_cx: &mut TestAppContext) {
-    let (client_ssh, server_ssh) = SshSession::fake(cx, server_cx);
-    init_logger();
-
-    let fs = FakeFs::new(server_cx.executor());
-    fs.insert_tree(
-        "/code",
-        json!({
-            "project1": {
-                ".git": {},
-                "README.md": "# project 1",
-                "src": {
-                    "lib.rs": "fn one() -> usize { 1 }"
-                }
-            },
-            "project2": {
-                "README.md": "# project 2",
-            },
-        }),
-    )
-    .await;
-    fs.set_index_for_repo(
-        Path::new("/code/project1/.git"),
-        &[(Path::new("src/lib.rs"), "fn one() -> usize { 0 }".into())],
-    );
-
-    server_cx.update(HeadlessProject::init);
-    let _headless_project =
-        server_cx.new_model(|cx| HeadlessProject::new(server_ssh, fs.clone(), cx));
-
-    let project = build_project(client_ssh, cx);
+    let (project, _headless, fs) = init_test(cx, server_cx).await;
     let (worktree, _) = project
         .update(cx, |project, cx| {
             project.find_or_create_worktree("/code/project1", true, cx)
@@ -150,6 +119,96 @@ async fn test_remote_editing(cx: &mut TestAppContext, server_cx: &mut TestAppCon
     });
 }
 
+#[gpui::test]
+async fn test_remote_project_search(cx: &mut TestAppContext, server_cx: &mut TestAppContext) {
+    let (project, _, _) = init_test(cx, server_cx).await;
+
+    project
+        .update(cx, |project, cx| {
+            project.find_or_create_worktree("/code/project1", true, cx)
+        })
+        .await
+        .unwrap();
+
+    cx.run_until_parked();
+
+    let mut receiver = project.update(cx, |project, cx| {
+        project.search(
+            SearchQuery::text(
+                "project",
+                false,
+                true,
+                false,
+                Default::default(),
+                Default::default(),
+            )
+            .unwrap(),
+            cx,
+        )
+    });
+
+    let first_response = receiver.next().await.unwrap();
+    let SearchResult::Buffer { buffer, .. } = first_response else {
+        panic!("incorrect result");
+    };
+    buffer.update(cx, |buffer, cx| {
+        assert_eq!(
+            buffer.file().unwrap().full_path(cx).to_string_lossy(),
+            "project1/README.md"
+        )
+    });
+
+    assert!(receiver.next().await.is_none());
+}
+
+fn init_logger() {
+    if std::env::var("RUST_LOG").is_ok() {
+        env_logger::try_init().ok();
+    }
+}
+
+async fn init_test(
+    cx: &mut TestAppContext,
+    server_cx: &mut TestAppContext,
+) -> (Model<Project>, Model<HeadlessProject>, Arc<FakeFs>) {
+    let (client_ssh, server_ssh) = SshSession::fake(cx, server_cx);
+    init_logger();
+
+    let fs = FakeFs::new(server_cx.executor());
+    fs.insert_tree(
+        "/code",
+        json!({
+            "project1": {
+                ".git": {},
+                "README.md": "# project 1",
+                "src": {
+                    "lib.rs": "fn one() -> usize { 1 }"
+                }
+            },
+            "project2": {
+                "README.md": "# project 2",
+            },
+        }),
+    )
+    .await;
+    fs.set_index_for_repo(
+        Path::new("/code/project1/.git"),
+        &[(Path::new("src/lib.rs"), "fn one() -> usize { 0 }".into())],
+    );
+
+    server_cx.update(HeadlessProject::init);
+    let headless = server_cx.new_model(|cx| HeadlessProject::new(server_ssh, fs.clone(), cx));
+    let project = build_project(client_ssh, cx);
+
+    project
+        .update(cx, {
+            let headless = headless.clone();
+            |_, cx| cx.on_release(|_, _| drop(headless))
+        })
+        .detach();
+    (project, headless, fs)
+}
+
 fn build_project(ssh: Arc<SshSession>, cx: &mut TestAppContext) -> Model<Project> {
     cx.update(|cx| {
         let settings_store = SettingsStore::test(cx);

crates/search/src/project_search.rs 🔗

@@ -222,7 +222,7 @@ impl ProjectSearch {
             let mut limit_reached = false;
             while let Some(result) = matches.next().await {
                 match result {
-                    project::SearchResult::Buffer { buffer, ranges } => {
+                    project::search::SearchResult::Buffer { buffer, ranges } => {
                         let mut match_ranges = this
                             .update(&mut cx, |this, cx| {
                                 this.excerpts.update(cx, |excerpts, cx| {
@@ -245,7 +245,7 @@ impl ProjectSearch {
                         }
                         this.update(&mut cx, |_, cx| cx.notify()).ok()?;
                     }
-                    project::SearchResult::LimitReached => {
+                    project::search::SearchResult::LimitReached => {
                         limit_reached = true;
                     }
                 }