@@ -8,6 +8,7 @@ use std::{
},
};
+use anyhow::Context;
use collections::HashSet;
use fs::Fs;
use futures::{SinkExt, StreamExt, select_biased};
@@ -17,7 +18,7 @@ use postage::oneshot;
use smol::channel::{Receiver, Sender, bounded, unbounded};
use util::{ResultExt, maybe};
-use worktree::{Entry, ProjectEntryId, Snapshot, WorktreeSettings};
+use worktree::{Entry, ProjectEntryId, Snapshot, Worktree};
use crate::{
ProjectItem, ProjectPath,
@@ -28,7 +29,7 @@ use crate::{
pub(crate) struct Search {
pub(crate) fs: Arc<dyn Fs>,
pub(crate) buffer_store: Entity<BufferStore>,
- pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>,
+ pub(crate) worktrees: Vec<Entity<Worktree>>,
}
const MAX_SEARCH_RESULT_FILES: usize = 5_000;
@@ -57,7 +58,7 @@ impl SearchResultsHandle {
impl Search {
/// Prepares a project search run. The result has to be used to specify whether you're interested in matching buffers
/// or full search results.
- pub(crate) fn into_results(self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
+ pub(crate) fn into_results(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
let mut open_buffers = HashSet::default();
let mut unnamed_buffers = Vec::new();
const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
@@ -89,11 +90,12 @@ impl Search {
let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = unbounded();
let matches_count = AtomicUsize::new(0);
let matched_buffer_count = AtomicUsize::new(0);
+ let (input_paths_tx, input_paths_rx) = unbounded();
+ let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();
let worker_pool = executor.scoped(|scope| {
- let (input_paths_tx, input_paths_rx) = unbounded();
let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
bounded(64);
- let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();
+
let num_cpus = executor.num_cpus();
assert!(num_cpus > 0);
@@ -114,16 +116,18 @@ impl Search {
scope.spawn(worker.run());
}
drop(tx);
- scope.spawn(self.provide_search_paths(
- &query,
- input_paths_tx,
- sorted_search_results_tx,
- ));
- scope.spawn(self.maintain_sorted_search_results(
+
+ scope.spawn(Self::maintain_sorted_search_results(
sorted_search_results_rx,
get_buffer_for_full_scan_tx,
))
});
+ let provide_search_paths = cx.spawn(Self::provide_search_paths(
+ std::mem::take(&mut self.worktrees),
+ query.include_ignored(),
+ input_paths_tx,
+ sorted_search_results_tx,
+ ));
let open_buffers = self.open_buffers(
get_buffer_for_full_scan_rx,
grab_buffer_snapshot_tx,
@@ -134,7 +138,13 @@ impl Search {
find_all_matches_tx,
cx.clone(),
);
- futures::future::join3(worker_pool, buffer_snapshots, open_buffers).await;
+ futures::future::join4(
+ worker_pool,
+ buffer_snapshots,
+ open_buffers,
+ provide_search_paths,
+ )
+ .await;
})
});
SearchResultsHandle {
@@ -144,35 +154,73 @@ impl Search {
}
}
- async fn provide_search_paths<'this>(
- &'this self,
- query: &SearchQuery,
- tx: Sender<InputPath<'this>>,
+ fn provide_search_paths(
+ worktrees: Vec<Entity<Worktree>>,
+ include_ignored: bool,
+ tx: Sender<InputPath>,
results: Sender<oneshot::Receiver<ProjectPath>>,
- ) {
- for (snapshot, worktree_settings) in &self.snapshots {
- for entry in snapshot.entries(query.include_ignored(), 0) {
- let (should_scan_tx, should_scan_rx) = oneshot::channel();
- let Ok(_) = tx
- .send(InputPath {
- entry,
- settings: worktree_settings,
- snapshot: snapshot,
- should_scan_tx,
- })
- .await
- else {
- return;
- };
- if results.send(should_scan_rx).await.is_err() {
- return;
- };
- }
+ ) -> impl AsyncFnOnce(&mut AsyncApp) {
+ async move |cx| {
+ _ = maybe!(async move {
+ for worktree in worktrees {
+ let (mut snapshot, worktree_settings) = worktree
+ .read_with(cx, |this, _| {
+ Some((this.snapshot(), this.as_local()?.settings()))
+ })?
+ .context("The worktree is not local")?;
+ if include_ignored {
+ // Pre-fetch all of the ignored directories as they're going to be searched.
+ let mut entries_to_refresh = vec![];
+ for entry in snapshot.entries(include_ignored, 0) {
+ if entry.is_ignored && entry.kind.is_unloaded() {
+ if !worktree_settings.is_path_excluded(&entry.path) {
+ entries_to_refresh.push(entry.path.clone());
+ }
+ }
+ }
+ let barrier = worktree.update(cx, |this, _| {
+ let local = this.as_local_mut()?;
+ let barrier = entries_to_refresh
+ .into_iter()
+ .map(|path| local.add_path_prefix_to_scan(path).into_future())
+ .collect::<Vec<_>>();
+ Some(barrier)
+ })?;
+ if let Some(barriers) = barrier {
+ futures::future::join_all(barriers).await;
+ }
+ snapshot = worktree.read_with(cx, |this, _| this.snapshot())?;
+ }
+ cx.background_executor()
+ .scoped(|scope| {
+ scope.spawn(async {
+ for entry in snapshot.files(include_ignored, 0) {
+ let (should_scan_tx, should_scan_rx) = oneshot::channel();
+ let Ok(_) = tx
+ .send(InputPath {
+ entry: entry.clone(),
+ snapshot: snapshot.clone(),
+ should_scan_tx,
+ })
+ .await
+ else {
+ return;
+ };
+ if results.send(should_scan_rx).await.is_err() {
+ return;
+ };
+ }
+ })
+ })
+ .await;
+ }
+ anyhow::Ok(())
+ })
+ .await;
}
}
async fn maintain_sorted_search_results(
- &self,
rx: Receiver<oneshot::Receiver<ProjectPath>>,
paths_for_full_scan: Sender<ProjectPath>,
) {
@@ -241,7 +289,7 @@ struct Worker<'search> {
/// - Only open buffers
/// - Scan ignored files
/// Put another way: filter out files that can't match (without looking at file contents)
- input_paths_rx: Receiver<InputPath<'search>>,
+ input_paths_rx: Receiver<InputPath>,
/// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
/// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
@@ -391,28 +439,14 @@ impl RequestHandler<'_> {
}).await;
}
- async fn handle_scan_path(&self, req: InputPath<'_>) {
+ async fn handle_scan_path(&self, req: InputPath) {
_ = maybe!(async move {
let InputPath {
entry,
- settings,
+
snapshot,
should_scan_tx,
} = req;
- if entry.is_dir() && entry.is_ignored {
- if !settings.is_path_excluded(&entry.path) {
- // Self::scan_ignored_dir(
- // self.fs,
- // &snapshot,
- // &entry.path,
- // self.query,
- // &filter_tx,
- // &output_tx,
- // )
- // .await?;
- }
- return Ok(());
- }
if entry.is_fifo || !entry.is_file() {
return Ok(());
@@ -459,10 +493,9 @@ impl RequestHandler<'_> {
}
}
-struct InputPath<'worker> {
- entry: &'worker Entry,
- settings: &'worker WorktreeSettings,
- snapshot: &'worker Snapshot,
+struct InputPath {
+ entry: Entry,
+ snapshot: Snapshot,
should_scan_tx: oneshot::Sender<ProjectPath>,
}