Un serialize project search (#2857)

Piotr Osiewicz and Julia Risley created

This is the first batch of improvements to current project search. There
are few things we can do better still, but I want to get this out in
next Preview.
Most of the slowness at this point seems to stem from updating UI too
often.

Release Notes:
- Improved project search by making it report results sooner.

---------

Co-authored-by: Julia Risley <julia@zed.dev>

Change summary

crates/collab/src/tests/integration_tests.rs            |  21 
crates/collab/src/tests/randomized_integration_tests.rs |  18 
crates/editor/src/multi_buffer.rs                       |  96 
crates/project/src/project.rs                           | 583 +++++++---
crates/project/src/project_tests.rs                     |  11 
crates/search/src/project_search.rs                     |  62 
6 files changed, 490 insertions(+), 301 deletions(-)

Detailed changes

crates/collab/src/tests/integration_tests.rs 🔗

@@ -4,7 +4,7 @@ use crate::{
 };
 use call::{room, ActiveCall, ParticipantLocation, Room};
 use client::{User, RECEIVE_TIMEOUT};
-use collections::HashSet;
+use collections::{HashMap, HashSet};
 use editor::{
     test::editor_test_context::EditorTestContext, ConfirmCodeAction, ConfirmCompletion,
     ConfirmRename, Editor, ExcerptRange, MultiBuffer, Redo, Rename, ToggleCodeActions, Undo,
@@ -4821,15 +4821,16 @@ async fn test_project_search(
     let project_b = client_b.build_remote_project(project_id, cx_b).await;
 
     // Perform a search as the guest.
-    let results = project_b
-        .update(cx_b, |project, cx| {
-            project.search(
-                SearchQuery::text("world", false, false, Vec::new(), Vec::new()),
-                cx,
-            )
-        })
-        .await
-        .unwrap();
+    let mut results = HashMap::default();
+    let mut search_rx = project_b.update(cx_b, |project, cx| {
+        project.search(
+            SearchQuery::text("world", false, false, Vec::new(), Vec::new()),
+            cx,
+        )
+    });
+    while let Some((buffer, ranges)) = search_rx.next().await {
+        results.entry(buffer).or_insert(ranges);
+    }
 
     let mut ranges_by_path = results
         .into_iter()

crates/collab/src/tests/randomized_integration_tests.rs 🔗

@@ -6,7 +6,7 @@ use crate::{
 use anyhow::{anyhow, Result};
 use call::ActiveCall;
 use client::RECEIVE_TIMEOUT;
-use collections::BTreeMap;
+use collections::{BTreeMap, HashMap};
 use editor::Bias;
 use fs::{repository::GitFileStatus, FakeFs, Fs as _};
 use futures::StreamExt as _;
@@ -722,7 +722,7 @@ async fn apply_client_operation(
                 if detach { "detaching" } else { "awaiting" }
             );
 
-            let search = project.update(cx, |project, cx| {
+            let mut search = project.update(cx, |project, cx| {
                 project.search(
                     SearchQuery::text(query, false, false, Vec::new(), Vec::new()),
                     cx,
@@ -730,15 +730,13 @@ async fn apply_client_operation(
             });
             drop(project);
             let search = cx.background().spawn(async move {
-                search
-                    .await
-                    .map_err(|err| anyhow!("search request failed: {:?}", err))
+                let mut results = HashMap::default();
+                while let Some((buffer, ranges)) = search.next().await {
+                    results.entry(buffer).or_insert(ranges);
+                }
+                results
             });
-            if detach {
-                cx.update(|cx| search.detach_and_log_err(cx));
-            } else {
-                search.await?;
-            }
+            search.await;
         }
 
         ClientOperation::WriteFsEntry {

crates/editor/src/multi_buffer.rs 🔗

@@ -6,7 +6,7 @@ use clock::ReplicaId;
 use collections::{BTreeMap, Bound, HashMap, HashSet};
 use futures::{channel::mpsc, SinkExt};
 use git::diff::DiffHunk;
-use gpui::{AppContext, Entity, ModelContext, ModelHandle, Task};
+use gpui::{AppContext, Entity, ModelContext, ModelHandle};
 pub use language::Completion;
 use language::{
     char_kind,
@@ -788,59 +788,59 @@ impl MultiBuffer {
 
     pub fn stream_excerpts_with_context_lines(
         &mut self,
-        excerpts: Vec<(ModelHandle<Buffer>, Vec<Range<text::Anchor>>)>,
+        buffer: ModelHandle<Buffer>,
+        ranges: Vec<Range<text::Anchor>>,
         context_line_count: u32,
         cx: &mut ModelContext<Self>,
-    ) -> (Task<()>, mpsc::Receiver<Range<Anchor>>) {
+    ) -> mpsc::Receiver<Range<Anchor>> {
         let (mut tx, rx) = mpsc::channel(256);
-        let task = cx.spawn(|this, mut cx| async move {
-            for (buffer, ranges) in excerpts {
-                let (buffer_id, buffer_snapshot) =
-                    buffer.read_with(&cx, |buffer, _| (buffer.remote_id(), buffer.snapshot()));
-
-                let mut excerpt_ranges = Vec::new();
-                let mut range_counts = Vec::new();
-                cx.background()
-                    .scoped(|scope| {
-                        scope.spawn(async {
-                            let (ranges, counts) =
-                                build_excerpt_ranges(&buffer_snapshot, &ranges, context_line_count);
-                            excerpt_ranges = ranges;
-                            range_counts = counts;
-                        });
-                    })
-                    .await;
-
-                let mut ranges = ranges.into_iter();
-                let mut range_counts = range_counts.into_iter();
-                for excerpt_ranges in excerpt_ranges.chunks(100) {
-                    let excerpt_ids = this.update(&mut cx, |this, cx| {
-                        this.push_excerpts(buffer.clone(), excerpt_ranges.iter().cloned(), cx)
+        cx.spawn(|this, mut cx| async move {
+            let (buffer_id, buffer_snapshot) =
+                buffer.read_with(&cx, |buffer, _| (buffer.remote_id(), buffer.snapshot()));
+
+            let mut excerpt_ranges = Vec::new();
+            let mut range_counts = Vec::new();
+            cx.background()
+                .scoped(|scope| {
+                    scope.spawn(async {
+                        let (ranges, counts) =
+                            build_excerpt_ranges(&buffer_snapshot, &ranges, context_line_count);
+                        excerpt_ranges = ranges;
+                        range_counts = counts;
                     });
+                })
+                .await;
 
-                    for (excerpt_id, range_count) in
-                        excerpt_ids.into_iter().zip(range_counts.by_ref())
-                    {
-                        for range in ranges.by_ref().take(range_count) {
-                            let start = Anchor {
-                                buffer_id: Some(buffer_id),
-                                excerpt_id: excerpt_id.clone(),
-                                text_anchor: range.start,
-                            };
-                            let end = Anchor {
-                                buffer_id: Some(buffer_id),
-                                excerpt_id: excerpt_id.clone(),
-                                text_anchor: range.end,
-                            };
-                            if tx.send(start..end).await.is_err() {
-                                break;
-                            }
+            let mut ranges = ranges.into_iter();
+            let mut range_counts = range_counts.into_iter();
+            for excerpt_ranges in excerpt_ranges.chunks(100) {
+                let excerpt_ids = this.update(&mut cx, |this, cx| {
+                    this.push_excerpts(buffer.clone(), excerpt_ranges.iter().cloned(), cx)
+                });
+
+                for (excerpt_id, range_count) in excerpt_ids.into_iter().zip(range_counts.by_ref())
+                {
+                    for range in ranges.by_ref().take(range_count) {
+                        let start = Anchor {
+                            buffer_id: Some(buffer_id),
+                            excerpt_id: excerpt_id.clone(),
+                            text_anchor: range.start,
+                        };
+                        let end = Anchor {
+                            buffer_id: Some(buffer_id),
+                            excerpt_id: excerpt_id.clone(),
+                            text_anchor: range.end,
+                        };
+                        if tx.send(start..end).await.is_err() {
+                            break;
                         }
                     }
                 }
             }
-        });
-        (task, rx)
+        })
+        .detach();
+
+        rx
     }
 
     pub fn push_excerpts<O>(
@@ -4438,7 +4438,7 @@ mod tests {
     async fn test_stream_excerpts_with_context_lines(cx: &mut TestAppContext) {
         let buffer = cx.add_model(|cx| Buffer::new(0, sample_text(20, 3, 'a'), cx));
         let multibuffer = cx.add_model(|_| MultiBuffer::new(0));
-        let (task, anchor_ranges) = multibuffer.update(cx, |multibuffer, cx| {
+        let anchor_ranges = multibuffer.update(cx, |multibuffer, cx| {
             let snapshot = buffer.read(cx);
             let ranges = vec![
                 snapshot.anchor_before(Point::new(3, 2))..snapshot.anchor_before(Point::new(4, 2)),
@@ -4446,12 +4446,10 @@ mod tests {
                 snapshot.anchor_before(Point::new(15, 0))
                     ..snapshot.anchor_before(Point::new(15, 0)),
             ];
-            multibuffer.stream_excerpts_with_context_lines(vec![(buffer.clone(), ranges)], 2, cx)
+            multibuffer.stream_excerpts_with_context_lines(buffer.clone(), ranges, 2, cx)
         });
 
         let anchor_ranges = anchor_ranges.collect::<Vec<_>>().await;
-        // Ensure task is finished when stream completes.
-        task.await;
 
         let snapshot = multibuffer.read_with(cx, |multibuffer, cx| multibuffer.snapshot(cx));
         assert_eq!(

crates/project/src/project.rs 🔗

@@ -26,8 +26,8 @@ use futures::{
 };
 use globset::{Glob, GlobSet, GlobSetBuilder};
 use gpui::{
-    AnyModelHandle, AppContext, AsyncAppContext, BorrowAppContext, Entity, ModelContext,
-    ModelHandle, Task, WeakModelHandle,
+    executor::Background, AnyModelHandle, AppContext, AsyncAppContext, BorrowAppContext, Entity,
+    ModelContext, ModelHandle, Task, WeakModelHandle,
 };
 use itertools::Itertools;
 use language::{
@@ -37,11 +37,11 @@ use language::{
         deserialize_anchor, deserialize_fingerprint, deserialize_line_ending, deserialize_version,
         serialize_anchor, serialize_version,
     },
-    range_from_lsp, range_to_lsp, Bias, Buffer, CachedLspAdapter, CodeAction, CodeLabel,
-    Completion, Diagnostic, DiagnosticEntry, DiagnosticSet, Diff, Event as BufferEvent, File as _,
-    Language, LanguageRegistry, LanguageServerName, LocalFile, LspAdapterDelegate, OffsetRangeExt,
-    Operation, Patch, PendingLanguageServer, PointUtf16, TextBufferSnapshot, ToOffset,
-    ToPointUtf16, Transaction, Unclipped,
+    range_from_lsp, range_to_lsp, Bias, Buffer, BufferSnapshot, CachedLspAdapter, CodeAction,
+    CodeLabel, Completion, Diagnostic, DiagnosticEntry, DiagnosticSet, Diff, Event as BufferEvent,
+    File as _, Language, LanguageRegistry, LanguageServerName, LocalFile, LspAdapterDelegate,
+    OffsetRangeExt, Operation, Patch, PendingLanguageServer, PointUtf16, TextBufferSnapshot,
+    ToOffset, ToPointUtf16, Transaction, Unclipped,
 };
 use log::error;
 use lsp::{
@@ -57,8 +57,8 @@ use serde::Serialize;
 use settings::SettingsStore;
 use sha2::{Digest, Sha256};
 use similar::{ChangeTag, TextDiff};
+use smol::channel::{Receiver, Sender};
 use std::{
-    cell::RefCell,
     cmp::{self, Ordering},
     convert::TryInto,
     hash::Hash,
@@ -67,7 +67,6 @@ use std::{
     ops::Range,
     path::{self, Component, Path, PathBuf},
     process::Stdio,
-    rc::Rc,
     str,
     sync::{
         atomic::{AtomicUsize, Ordering::SeqCst},
@@ -525,6 +524,28 @@ impl FormatTrigger {
         }
     }
 }
+#[derive(Clone, Debug, PartialEq)]
+enum SearchMatchCandidate {
+    OpenBuffer {
+        buffer: ModelHandle<Buffer>,
+        // This might be an unnamed file without representation on filesystem
+        path: Option<Arc<Path>>,
+    },
+    Path {
+        worktree_id: WorktreeId,
+        path: Arc<Path>,
+    },
+}
+
+type SearchMatchCandidateIndex = usize;
+impl SearchMatchCandidate {
+    fn path(&self) -> Option<Arc<Path>> {
+        match self {
+            SearchMatchCandidate::OpenBuffer { path, .. } => path.clone(),
+            SearchMatchCandidate::Path { path, .. } => Some(path.clone()),
+        }
+    }
+}
 
 impl Project {
     pub fn init_settings(cx: &mut AppContext) {
@@ -5099,187 +5120,11 @@ impl Project {
         &self,
         query: SearchQuery,
         cx: &mut ModelContext<Self>,
-    ) -> Task<Result<HashMap<ModelHandle<Buffer>, Vec<Range<Anchor>>>>> {
+    ) -> Receiver<(ModelHandle<Buffer>, Vec<Range<Anchor>>)> {
         if self.is_local() {
-            let snapshots = self
-                .visible_worktrees(cx)
-                .filter_map(|tree| {
-                    let tree = tree.read(cx).as_local()?;
-                    Some(tree.snapshot())
-                })
-                .collect::<Vec<_>>();
-
-            let background = cx.background().clone();
-            let path_count: usize = snapshots.iter().map(|s| s.visible_file_count()).sum();
-            if path_count == 0 {
-                return Task::ready(Ok(Default::default()));
-            }
-            let workers = background.num_cpus().min(path_count);
-            let (matching_paths_tx, mut matching_paths_rx) = smol::channel::bounded(1024);
-            cx.background()
-                .spawn({
-                    let fs = self.fs.clone();
-                    let background = cx.background().clone();
-                    let query = query.clone();
-                    async move {
-                        let fs = &fs;
-                        let query = &query;
-                        let matching_paths_tx = &matching_paths_tx;
-                        let paths_per_worker = (path_count + workers - 1) / workers;
-                        let snapshots = &snapshots;
-                        background
-                            .scoped(|scope| {
-                                for worker_ix in 0..workers {
-                                    let worker_start_ix = worker_ix * paths_per_worker;
-                                    let worker_end_ix = worker_start_ix + paths_per_worker;
-                                    scope.spawn(async move {
-                                        let mut snapshot_start_ix = 0;
-                                        let mut abs_path = PathBuf::new();
-                                        for snapshot in snapshots {
-                                            let snapshot_end_ix =
-                                                snapshot_start_ix + snapshot.visible_file_count();
-                                            if worker_end_ix <= snapshot_start_ix {
-                                                break;
-                                            } else if worker_start_ix > snapshot_end_ix {
-                                                snapshot_start_ix = snapshot_end_ix;
-                                                continue;
-                                            } else {
-                                                let start_in_snapshot = worker_start_ix
-                                                    .saturating_sub(snapshot_start_ix);
-                                                let end_in_snapshot =
-                                                    cmp::min(worker_end_ix, snapshot_end_ix)
-                                                        - snapshot_start_ix;
-
-                                                for entry in snapshot
-                                                    .files(false, start_in_snapshot)
-                                                    .take(end_in_snapshot - start_in_snapshot)
-                                                {
-                                                    if matching_paths_tx.is_closed() {
-                                                        break;
-                                                    }
-                                                    let matches = if query
-                                                        .file_matches(Some(&entry.path))
-                                                    {
-                                                        abs_path.clear();
-                                                        abs_path.push(&snapshot.abs_path());
-                                                        abs_path.push(&entry.path);
-                                                        if let Some(file) =
-                                                            fs.open_sync(&abs_path).await.log_err()
-                                                        {
-                                                            query.detect(file).unwrap_or(false)
-                                                        } else {
-                                                            false
-                                                        }
-                                                    } else {
-                                                        false
-                                                    };
-
-                                                    if matches {
-                                                        let project_path =
-                                                            (snapshot.id(), entry.path.clone());
-                                                        if matching_paths_tx
-                                                            .send(project_path)
-                                                            .await
-                                                            .is_err()
-                                                        {
-                                                            break;
-                                                        }
-                                                    }
-                                                }
-
-                                                snapshot_start_ix = snapshot_end_ix;
-                                            }
-                                        }
-                                    });
-                                }
-                            })
-                            .await;
-                    }
-                })
-                .detach();
-
-            let (buffers_tx, buffers_rx) = smol::channel::bounded(1024);
-            let open_buffers = self
-                .opened_buffers
-                .values()
-                .filter_map(|b| b.upgrade(cx))
-                .collect::<HashSet<_>>();
-            cx.spawn(|this, cx| async move {
-                for buffer in &open_buffers {
-                    let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot());
-                    buffers_tx.send((buffer.clone(), snapshot)).await?;
-                }
-
-                let open_buffers = Rc::new(RefCell::new(open_buffers));
-                while let Some(project_path) = matching_paths_rx.next().await {
-                    if buffers_tx.is_closed() {
-                        break;
-                    }
-
-                    let this = this.clone();
-                    let open_buffers = open_buffers.clone();
-                    let buffers_tx = buffers_tx.clone();
-                    cx.spawn(|mut cx| async move {
-                        if let Some(buffer) = this
-                            .update(&mut cx, |this, cx| this.open_buffer(project_path, cx))
-                            .await
-                            .log_err()
-                        {
-                            if open_buffers.borrow_mut().insert(buffer.clone()) {
-                                let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot());
-                                buffers_tx.send((buffer, snapshot)).await?;
-                            }
-                        }
-
-                        Ok::<_, anyhow::Error>(())
-                    })
-                    .detach();
-                }
-
-                Ok::<_, anyhow::Error>(())
-            })
-            .detach_and_log_err(cx);
-
-            let background = cx.background().clone();
-            cx.background().spawn(async move {
-                let query = &query;
-                let mut matched_buffers = Vec::new();
-                for _ in 0..workers {
-                    matched_buffers.push(HashMap::default());
-                }
-                background
-                    .scoped(|scope| {
-                        for worker_matched_buffers in matched_buffers.iter_mut() {
-                            let mut buffers_rx = buffers_rx.clone();
-                            scope.spawn(async move {
-                                while let Some((buffer, snapshot)) = buffers_rx.next().await {
-                                    let buffer_matches = if query.file_matches(
-                                        snapshot.file().map(|file| file.path().as_ref()),
-                                    ) {
-                                        query
-                                            .search(&snapshot, None)
-                                            .await
-                                            .iter()
-                                            .map(|range| {
-                                                snapshot.anchor_before(range.start)
-                                                    ..snapshot.anchor_after(range.end)
-                                            })
-                                            .collect()
-                                    } else {
-                                        Vec::new()
-                                    };
-                                    if !buffer_matches.is_empty() {
-                                        worker_matched_buffers
-                                            .insert(buffer.clone(), buffer_matches);
-                                    }
-                                }
-                            });
-                        }
-                    })
-                    .await;
-                Ok(matched_buffers.into_iter().flatten().collect())
-            })
+            self.search_local(query, cx)
         } else if let Some(project_id) = self.remote_id() {
+            let (tx, rx) = smol::channel::unbounded();
             let request = self.client.request(query.to_proto(project_id));
             cx.spawn(|this, mut cx| async move {
                 let response = request.await?;
@@ -5303,13 +5148,303 @@ impl Project {
                         .or_insert(Vec::new())
                         .push(start..end)
                 }
-                Ok(result)
+                for (buffer, ranges) in result {
+                    let _ = tx.send((buffer, ranges)).await;
+                }
+                Result::<(), anyhow::Error>::Ok(())
             })
+            .detach_and_log_err(cx);
+            rx
         } else {
-            Task::ready(Ok(Default::default()))
+            unimplemented!();
         }
     }
 
+    pub fn search_local(
+        &self,
+        query: SearchQuery,
+        cx: &mut ModelContext<Self>,
+    ) -> Receiver<(ModelHandle<Buffer>, Vec<Range<Anchor>>)> {
+        // Local search is split into several phases.
+        // TL;DR is that we do 2 passes; initial pass to pick files which contain at least one match
+        // and the second phase that finds positions of all the matches found in the candidate files.
+        // The Receiver obtained from this function returns matches sorted by buffer path. Files without a buffer path are reported first.
+        //
+        // It gets a bit hairy though, because we must account for files that do not have a persistent representation
+        // on FS. Namely, if you have an untitled buffer or unsaved changes in a buffer, we want to scan that too.
+        //
+        // 1. We initialize a queue of match candidates and feed all opened buffers into it (== unsaved files / untitled buffers).
+        //    Then, we go through a worktree and check for files that do match a predicate. If the file had an opened version, we skip the scan
+        //    of FS version for that file altogether - after all, what we have in memory is more up-to-date than what's in FS.
+        // 2. At this point, we have a list of all potentially matching buffers/files.
+        //    We sort that list by buffer path - this list is retained for later use.
+        //    We ensure that all buffers are now opened and available in project.
+        // 3. We run a scan over all the candidate buffers on multiple background threads.
+        //    We cannot assume that there will even be a match - while at least one match
+        //    is guaranteed for files obtained from FS, the buffers we got from memory (unsaved files/unnamed buffers) might not have a match at all.
+        //    There is also an auxilliary background thread responsible for result gathering.
+        //    This is where the sorted list of buffers comes into play to maintain sorted order; Whenever this background thread receives a notification (buffer has/doesn't have matches),
+        //    it keeps it around. It reports matches in sorted order, though it accepts them in unsorted order as well.
+        //    As soon as the match info on next position in sorted order becomes available, it reports it (if it's a match) or skips to the next
+        //    entry - which might already be available thanks to out-of-order processing.
+        //
+        // We could also report matches fully out-of-order, without maintaining a sorted list of matching paths.
+        // This however would mean that project search (that is the main user of this function) would have to do the sorting itself, on the go.
+        // This isn't as straightforward as running an insertion sort sadly, and would also mean that it would have to care about maintaining match index
+        // in face of constantly updating list of sorted matches.
+        // Meanwhile, this implementation offers index stability, since the matches are already reported in a sorted order.
+        let snapshots = self
+            .visible_worktrees(cx)
+            .filter_map(|tree| {
+                let tree = tree.read(cx).as_local()?;
+                Some(tree.snapshot())
+            })
+            .collect::<Vec<_>>();
+
+        let background = cx.background().clone();
+        let path_count: usize = snapshots.iter().map(|s| s.visible_file_count()).sum();
+        if path_count == 0 {
+            let (_, rx) = smol::channel::bounded(1024);
+            return rx;
+        }
+        let workers = background.num_cpus().min(path_count);
+        let (matching_paths_tx, matching_paths_rx) = smol::channel::bounded(1024);
+        let mut unnamed_files = vec![];
+        let opened_buffers = self
+            .opened_buffers
+            .iter()
+            .filter_map(|(_, b)| {
+                let buffer = b.upgrade(cx)?;
+                let snapshot = buffer.read_with(cx, |buffer, _| buffer.snapshot());
+                if let Some(path) = snapshot.file().map(|file| file.path()) {
+                    Some((path.clone(), (buffer, snapshot)))
+                } else {
+                    unnamed_files.push(buffer);
+                    None
+                }
+            })
+            .collect();
+        cx.background()
+            .spawn(Self::background_search(
+                unnamed_files,
+                opened_buffers,
+                cx.background().clone(),
+                self.fs.clone(),
+                workers,
+                query.clone(),
+                path_count,
+                snapshots,
+                matching_paths_tx,
+            ))
+            .detach();
+
+        let (buffers, buffers_rx) = Self::sort_candidates_and_open_buffers(matching_paths_rx, cx);
+        let background = cx.background().clone();
+        let (result_tx, result_rx) = smol::channel::bounded(1024);
+        cx.background()
+            .spawn(async move {
+                let Ok(buffers) = buffers.await else {
+                    return;
+                };
+
+                let buffers_len = buffers.len();
+                if buffers_len == 0 {
+                    return;
+                }
+                let query = &query;
+                let (finished_tx, mut finished_rx) = smol::channel::unbounded();
+                background
+                    .scoped(|scope| {
+                        #[derive(Clone)]
+                        struct FinishedStatus {
+                            entry: Option<(ModelHandle<Buffer>, Vec<Range<Anchor>>)>,
+                            buffer_index: SearchMatchCandidateIndex,
+                        }
+
+                        for _ in 0..workers {
+                            let finished_tx = finished_tx.clone();
+                            let mut buffers_rx = buffers_rx.clone();
+                            scope.spawn(async move {
+                                while let Some((entry, buffer_index)) = buffers_rx.next().await {
+                                    let buffer_matches = if let Some((_, snapshot)) = entry.as_ref()
+                                    {
+                                        if query.file_matches(
+                                            snapshot.file().map(|file| file.path().as_ref()),
+                                        ) {
+                                            query
+                                                .search(&snapshot, None)
+                                                .await
+                                                .iter()
+                                                .map(|range| {
+                                                    snapshot.anchor_before(range.start)
+                                                        ..snapshot.anchor_after(range.end)
+                                                })
+                                                .collect()
+                                        } else {
+                                            Vec::new()
+                                        }
+                                    } else {
+                                        Vec::new()
+                                    };
+
+                                    let status = if !buffer_matches.is_empty() {
+                                        let entry = if let Some((buffer, _)) = entry.as_ref() {
+                                            Some((buffer.clone(), buffer_matches))
+                                        } else {
+                                            None
+                                        };
+                                        FinishedStatus {
+                                            entry,
+                                            buffer_index,
+                                        }
+                                    } else {
+                                        FinishedStatus {
+                                            entry: None,
+                                            buffer_index,
+                                        }
+                                    };
+                                    if finished_tx.send(status).await.is_err() {
+                                        break;
+                                    }
+                                }
+                            });
+                        }
+                        // Report sorted matches
+                        scope.spawn(async move {
+                            let mut current_index = 0;
+                            let mut scratch = vec![None; buffers_len];
+                            while let Some(status) = finished_rx.next().await {
+                                debug_assert!(
+                                    scratch[status.buffer_index].is_none(),
+                                    "Got match status of position {} twice",
+                                    status.buffer_index
+                                );
+                                let index = status.buffer_index;
+                                scratch[index] = Some(status);
+                                while current_index < buffers_len {
+                                    let Some(current_entry) = scratch[current_index].take() else {
+                                        // We intentionally **do not** increment `current_index` here. When next element arrives
+                                        // from `finished_rx`, we will inspect the same position again, hoping for it to be Some(_)
+                                        // this time.
+                                        break;
+                                    };
+                                    if let Some(entry) = current_entry.entry {
+                                        result_tx.send(entry).await.log_err();
+                                    }
+                                    current_index += 1;
+                                }
+                                if current_index == buffers_len {
+                                    break;
+                                }
+                            }
+                        });
+                    })
+                    .await;
+            })
+            .detach();
+        result_rx
+    }
+    /// Pick paths that might potentially contain a match of a given search query.
+    async fn background_search(
+        unnamed_buffers: Vec<ModelHandle<Buffer>>,
+        opened_buffers: HashMap<Arc<Path>, (ModelHandle<Buffer>, BufferSnapshot)>,
+        background: Arc<Background>,
+        fs: Arc<dyn Fs>,
+        workers: usize,
+        query: SearchQuery,
+        path_count: usize,
+        snapshots: Vec<LocalSnapshot>,
+        matching_paths_tx: Sender<SearchMatchCandidate>,
+    ) {
+        let fs = &fs;
+        let query = &query;
+        let matching_paths_tx = &matching_paths_tx;
+        let snapshots = &snapshots;
+        let paths_per_worker = (path_count + workers - 1) / workers;
+        for buffer in unnamed_buffers {
+            matching_paths_tx
+                .send(SearchMatchCandidate::OpenBuffer {
+                    buffer: buffer.clone(),
+                    path: None,
+                })
+                .await
+                .log_err();
+        }
+        for (path, (buffer, _)) in opened_buffers.iter() {
+            matching_paths_tx
+                .send(SearchMatchCandidate::OpenBuffer {
+                    buffer: buffer.clone(),
+                    path: Some(path.clone()),
+                })
+                .await
+                .log_err();
+        }
+        background
+            .scoped(|scope| {
+                for worker_ix in 0..workers {
+                    let worker_start_ix = worker_ix * paths_per_worker;
+                    let worker_end_ix = worker_start_ix + paths_per_worker;
+                    let unnamed_buffers = opened_buffers.clone();
+                    scope.spawn(async move {
+                        let mut snapshot_start_ix = 0;
+                        let mut abs_path = PathBuf::new();
+                        for snapshot in snapshots {
+                            let snapshot_end_ix = snapshot_start_ix + snapshot.visible_file_count();
+                            if worker_end_ix <= snapshot_start_ix {
+                                break;
+                            } else if worker_start_ix > snapshot_end_ix {
+                                snapshot_start_ix = snapshot_end_ix;
+                                continue;
+                            } else {
+                                let start_in_snapshot =
+                                    worker_start_ix.saturating_sub(snapshot_start_ix);
+                                let end_in_snapshot =
+                                    cmp::min(worker_end_ix, snapshot_end_ix) - snapshot_start_ix;
+
+                                for entry in snapshot
+                                    .files(false, start_in_snapshot)
+                                    .take(end_in_snapshot - start_in_snapshot)
+                                {
+                                    if matching_paths_tx.is_closed() {
+                                        break;
+                                    }
+                                    if unnamed_buffers.contains_key(&entry.path) {
+                                        continue;
+                                    }
+                                    let matches = if query.file_matches(Some(&entry.path)) {
+                                        abs_path.clear();
+                                        abs_path.push(&snapshot.abs_path());
+                                        abs_path.push(&entry.path);
+                                        if let Some(file) = fs.open_sync(&abs_path).await.log_err()
+                                        {
+                                            query.detect(file).unwrap_or(false)
+                                        } else {
+                                            false
+                                        }
+                                    } else {
+                                        false
+                                    };
+
+                                    if matches {
+                                        let project_path = SearchMatchCandidate::Path {
+                                            worktree_id: snapshot.id(),
+                                            path: entry.path.clone(),
+                                        };
+                                        if matching_paths_tx.send(project_path).await.is_err() {
+                                            break;
+                                        }
+                                    }
+                                }
+
+                                snapshot_start_ix = snapshot_end_ix;
+                            }
+                        }
+                    });
+                }
+            })
+            .await;
+    }
+
     // TODO: Wire this up to allow selecting a server?
     fn request_lsp<R: LspCommand>(
         &self,
@@ -5384,6 +5519,61 @@ impl Project {
         Task::ready(Ok(Default::default()))
     }
 
+    fn sort_candidates_and_open_buffers(
+        mut matching_paths_rx: Receiver<SearchMatchCandidate>,
+        cx: &mut ModelContext<Self>,
+    ) -> (
+        futures::channel::oneshot::Receiver<Vec<SearchMatchCandidate>>,
+        Receiver<(
+            Option<(ModelHandle<Buffer>, BufferSnapshot)>,
+            SearchMatchCandidateIndex,
+        )>,
+    ) {
+        let (buffers_tx, buffers_rx) = smol::channel::bounded(1024);
+        let (sorted_buffers_tx, sorted_buffers_rx) = futures::channel::oneshot::channel();
+        cx.spawn(|this, cx| async move {
+            let mut buffers = vec![];
+            while let Some(entry) = matching_paths_rx.next().await {
+                buffers.push(entry);
+            }
+            buffers.sort_by_key(|candidate| candidate.path());
+            let matching_paths = buffers.clone();
+            let _ = sorted_buffers_tx.send(buffers);
+            for (index, candidate) in matching_paths.into_iter().enumerate() {
+                if buffers_tx.is_closed() {
+                    break;
+                }
+                let this = this.clone();
+                let buffers_tx = buffers_tx.clone();
+                cx.spawn(|mut cx| async move {
+                    let buffer = match candidate {
+                        SearchMatchCandidate::OpenBuffer { buffer, .. } => Some(buffer),
+                        SearchMatchCandidate::Path { worktree_id, path } => this
+                            .update(&mut cx, |this, cx| {
+                                this.open_buffer((worktree_id, path), cx)
+                            })
+                            .await
+                            .log_err(),
+                    };
+                    if let Some(buffer) = buffer {
+                        let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot());
+                        buffers_tx
+                            .send((Some((buffer, snapshot)), index))
+                            .await
+                            .log_err();
+                    } else {
+                        buffers_tx.send((None, index)).await.log_err();
+                    }
+
+                    Ok::<_, anyhow::Error>(())
+                })
+                .detach();
+            }
+        })
+        .detach();
+        (sorted_buffers_rx, buffers_rx)
+    }
+
     pub fn find_or_create_local_worktree(
         &mut self,
         abs_path: impl AsRef<Path>,
@@ -7006,17 +7196,17 @@ impl Project {
     ) -> Result<proto::SearchProjectResponse> {
         let peer_id = envelope.original_sender_id()?;
         let query = SearchQuery::from_proto(envelope.payload)?;
-        let result = this
-            .update(&mut cx, |this, cx| this.search(query, cx))
-            .await?;
+        let mut result = this.update(&mut cx, |this, cx| this.search(query, cx));
 
-        this.update(&mut cx, |this, cx| {
+        cx.spawn(|mut cx| async move {
             let mut locations = Vec::new();
-            for (buffer, ranges) in result {
+            while let Some((buffer, ranges)) = result.next().await {
                 for range in ranges {
                     let start = serialize_anchor(&range.start);
                     let end = serialize_anchor(&range.end);
-                    let buffer_id = this.create_buffer_for_peer(&buffer, peer_id, cx);
+                    let buffer_id = this.update(&mut cx, |this, cx| {
+                        this.create_buffer_for_peer(&buffer, peer_id, cx)
+                    });
                     locations.push(proto::Location {
                         buffer_id,
                         start: Some(start),
@@ -7026,6 +7216,7 @@ impl Project {
             }
             Ok(proto::SearchProjectResponse { locations })
         })
+        .await
     }
 
     async fn handle_open_buffer_for_symbol(

crates/project/src/project_tests.rs 🔗

@@ -3953,11 +3953,12 @@ async fn search(
     query: SearchQuery,
     cx: &mut gpui::TestAppContext,
 ) -> Result<HashMap<String, Vec<Range<usize>>>> {
-    let results = project
-        .update(cx, |project, cx| project.search(query, cx))
-        .await?;
-
-    Ok(results
+    let mut search_rx = project.update(cx, |project, cx| project.search(query, cx));
+    let mut result = HashMap::default();
+    while let Some((buffer, range)) = search_rx.next().await {
+        result.entry(buffer).or_insert(range);
+    }
+    Ok(result
         .into_iter()
         .map(|(buffer, ranges)| {
             buffer.read_with(cx, |buffer, _| {

crates/search/src/project_search.rs 🔗

@@ -185,28 +185,26 @@ impl ProjectSearch {
         self.active_query = Some(query);
         self.match_ranges.clear();
         self.pending_search = Some(cx.spawn_weak(|this, mut cx| async move {
-            let matches = search.await.log_err()?;
+            let mut matches = search;
             let this = this.upgrade(&cx)?;
-            let mut matches = matches.into_iter().collect::<Vec<_>>();
-            let (_task, mut match_ranges) = this.update(&mut cx, |this, cx| {
+            this.update(&mut cx, |this, cx| {
                 this.match_ranges.clear();
+                this.excerpts.update(cx, |this, cx| this.clear(cx));
                 this.no_results = Some(true);
-                matches.sort_by_key(|(buffer, _)| buffer.read(cx).file().map(|file| file.path()));
-                this.excerpts.update(cx, |excerpts, cx| {
-                    excerpts.clear(cx);
-                    excerpts.stream_excerpts_with_context_lines(matches, 1, cx)
-                })
             });
 
-            while let Some(match_range) = match_ranges.next().await {
-                this.update(&mut cx, |this, cx| {
-                    this.match_ranges.push(match_range);
-                    while let Ok(Some(match_range)) = match_ranges.try_next() {
-                        this.match_ranges.push(match_range);
-                    }
+            while let Some((buffer, anchors)) = matches.next().await {
+                let mut ranges = this.update(&mut cx, |this, cx| {
                     this.no_results = Some(false);
-                    cx.notify();
+                    this.excerpts.update(cx, |excerpts, cx| {
+                        excerpts.stream_excerpts_with_context_lines(buffer, anchors, 1, cx)
+                    })
                 });
+
+                while let Some(range) = ranges.next().await {
+                    this.update(&mut cx, |this, _| this.match_ranges.push(range));
+                }
+                this.update(&mut cx, |_, cx| cx.notify());
             }
 
             this.update(&mut cx, |this, cx| {
@@ -238,29 +236,31 @@ impl ProjectSearch {
         self.no_results = Some(true);
         self.pending_search = Some(cx.spawn(|this, mut cx| async move {
             let results = search?.await.log_err()?;
+            let matches = results
+                .into_iter()
+                .map(|result| (result.buffer, vec![result.range.start..result.range.start]));
 
-            let (_task, mut match_ranges) = this.update(&mut cx, |this, cx| {
+            this.update(&mut cx, |this, cx| {
                 this.excerpts.update(cx, |excerpts, cx| {
                     excerpts.clear(cx);
-
-                    let matches = results
-                        .into_iter()
-                        .map(|result| (result.buffer, vec![result.range.start..result.range.start]))
-                        .collect();
-
-                    excerpts.stream_excerpts_with_context_lines(matches, 3, cx)
                 })
             });
-
-            while let Some(match_range) = match_ranges.next().await {
-                this.update(&mut cx, |this, cx| {
-                    this.match_ranges.push(match_range);
-                    while let Ok(Some(match_range)) = match_ranges.try_next() {
-                        this.match_ranges.push(match_range);
-                    }
+            for (buffer, ranges) in matches {
+                let mut match_ranges = this.update(&mut cx, |this, cx| {
                     this.no_results = Some(false);
-                    cx.notify();
+                    this.excerpts.update(cx, |excerpts, cx| {
+                        excerpts.stream_excerpts_with_context_lines(buffer, ranges, 3, cx)
+                    })
                 });
+                while let Some(match_range) = match_ranges.next().await {
+                    this.update(&mut cx, |this, cx| {
+                        this.match_ranges.push(match_range);
+                        while let Ok(Some(match_range)) = match_ranges.try_next() {
+                            this.match_ranges.push(match_range);
+                        }
+                        cx.notify();
+                    });
+                }
             }
 
             this.update(&mut cx, |this, cx| {