diff --git a/crates/project/src/project_search.rs b/crates/project/src/project_search.rs index 1e33a9359bb1c98d2e0a91bb4cf45006c5b0c333..80c9adfc8d656091aa2cfee3a845124f28f123c0 100644 --- a/crates/project/src/project_search.rs +++ b/crates/project/src/project_search.rs @@ -1,8 +1,7 @@ use std::{ io::{BufRead, BufReader}, - ops::{ControlFlow, Range}, path::Path, - pin::{Pin, pin}, + pin::pin, sync::{ Arc, atomic::{AtomicUsize, Ordering}, @@ -11,12 +10,12 @@ use std::{ use collections::HashSet; use fs::Fs; -use futures::{FutureExt, SinkExt, StreamExt, future::BoxFuture, select_biased}; +use futures::{SinkExt, StreamExt, select_biased}; use gpui::{App, AsyncApp, Entity, WeakEntity}; use language::{Buffer, BufferSnapshot}; use postage::oneshot; use smol::channel::{Receiver, Sender, bounded, unbounded}; -use text::Anchor; + use util::{ResultExt, maybe}; use worktree::{Entry, ProjectEntryId, Snapshot, WorktreeSettings}; @@ -30,6 +29,7 @@ pub(crate) struct ProjectSearcher { fs: Arc, buffer_store: WeakEntity, pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>, + open_buffers: HashSet, } const MAX_SEARCH_RESULT_FILES: usize = 5_000; @@ -49,10 +49,11 @@ impl ProjectSearcher { let worker_pool = executor.scoped(|scope| { let (input_paths_tx, input_paths_rx) = bounded(64); let (find_first_match_tx, find_first_match_rx) = bounded(64); - + let (sorted_search_results_tx, sorted_search_results_rx) = bounded(64); for _ in 0..executor.num_cpus() { let worker = Worker { query: &query, + open_buffers: &self.open_buffers, matched_buffer_count: &matched_buffer_count, matches_count: &matches_count, fs: &*self.fs, @@ -65,7 +66,12 @@ impl ProjectSearcher { }; scope.spawn(worker.run()); } - scope.spawn(self.provide_search_paths(&query, input_paths_tx)) + scope.spawn(self.provide_search_paths( + &query, + input_paths_tx, + sorted_search_results_tx, + )); + scope.spawn(self.maintain_sorted_search_results()) }); self.open_buffers(get_buffer_for_full_scan_rx, find_all_matches_tx, cx) 
.await; @@ -84,23 +90,42 @@ impl ProjectSearcher { &'this self, query: &SearchQuery, tx: Sender>, + results: Sender>, ) { for (snapshot, worktree_settings) in &self.snapshots { for entry in snapshot.entries(query.include_ignored(), 0) { + let (should_scan_tx, should_scan_rx) = oneshot::channel(); let Ok(_) = tx .send(InputPath { entry, settings: worktree_settings, snapshot: snapshot, + should_scan_tx, }) .await else { return; }; + results.send(should_scan_rx).await; } } } + async fn maintain_sorted_search_results( + &self, + rx: Receiver>, + paths_for_full_scan: Sender, + ) { + let mut rx = pin!(rx); + while let Some(mut next_path_result) = rx.next().await { + let Some(successful_path) = next_path_result.next().await else { + // This path did not produce a match, hence skip it. + continue; + }; + paths_for_full_scan.send(successful_path).await; + } + } + /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf. async fn open_buffers<'a>( &'a self, @@ -131,6 +156,7 @@ struct Worker<'search> { query: &'search SearchQuery, matched_buffer_count: &'search AtomicUsize, matches_count: &'search AtomicUsize, + open_buffers: &'search HashSet, fs: &'search dyn Fs, /// Start off with all paths in project and filter them based on: /// - Include filters @@ -158,6 +184,7 @@ impl Worker<'_> { let mut scan_path = pin!(self.input_paths_rx.fuse()); let handler = RequestHandler { query: self.query, + open_entries: &self.open_buffers, fs: self.fs, matched_buffer_count: self.matched_buffer_count, matches_count: self.matches_count, @@ -169,7 +196,7 @@ impl Worker<'_> { select_biased! 
{ find_all_matches = find_all_matches.next() => { let result = handler.handle_find_all_matches(find_all_matches).await; - if let Some(should_bail) = result { + if let Some(_should_bail) = result { return; } }, @@ -258,75 +285,81 @@ impl RequestHandler<'_> { } if self.query.detect(file).unwrap_or(false) { - entry.respond.send(entry.path).await?; + // Yes, we should scan the whole file. + entry.should_scan_tx.send(entry.path).await?; } Ok(()) }).await; } async fn handle_scan_path(&self, req: InputPath<'_>) { - let InputPath { - entry, - settings, - snapshot, - } = req; - if entry.is_dir() && entry.is_ignored { - if !settings.is_path_excluded(&entry.path) { - Self::scan_ignored_dir( - self.fs, - &snapshot, - &entry.path, - self.query, - &filter_tx, - &output_tx, - ) - .await?; + _ = maybe!(async move { + let InputPath { + entry, + settings, + snapshot, + should_scan_tx, + } = req; + if entry.is_dir() && entry.is_ignored { + if !settings.is_path_excluded(&entry.path) { + Self::scan_ignored_dir( + self.fs, + &snapshot, + &entry.path, + self.query, + &filter_tx, + &output_tx, + ) + .await?; + } + return Ok(()); } - return; - } - if entry.is_fifo || !entry.is_file() { - return; - } + if entry.is_fifo || !entry.is_file() { + return Ok(()); + } - if self.query.filters_path() { - let matched_path = if self.query.match_full_paths() { - let mut full_path = snapshot.root_name().as_std_path().to_owned(); - full_path.push(entry.path.as_std_path()); - self.query.match_path(&full_path) - } else { - self.query.match_path(entry.path.as_std_path()) - }; - if !matched_path { - return; + if self.query.filters_path() { + let matched_path = if self.query.match_full_paths() { + let mut full_path = snapshot.root_name().as_std_path().to_owned(); + full_path.push(entry.path.as_std_path()); + self.query.match_path(&full_path) + } else { + self.query.match_path(entry.path.as_std_path()) + }; + if !matched_path { + return Ok(()); + } } - } - let (mut tx, rx) = oneshot::channel(); + let (mut tx, 
rx) = oneshot::channel(); - if self.open_entries.contains(&entry.id) { - // The buffer is already in memory and that's the version we want to scan; - // hence skip the dilly-dally and look for all matches straight away. - self.get_buffer_for_full_scan_tx - .send(ProjectPath { - worktree_id: snapshot.id(), - path: entry.path.clone(), - }) - .await?; - } else { - self.find_first_match_tx - .send(MatchingEntry { - respond: tx, - worktree_root: snapshot.abs_path().clone(), - path: ProjectPath { + if self.open_entries.contains(&entry.id) { + // The buffer is already in memory and that's the version we want to scan; + // hence skip the dilly-dally and look for all matches straight away. + self.get_buffer_for_full_scan_tx + .send(ProjectPath { worktree_id: snapshot.id(), path: entry.path.clone(), - }, - }) - .await?; - } + }) + .await?; + } else { + self.find_first_match_tx + .send(MatchingEntry { + should_scan_tx: should_scan_tx, + worktree_root: snapshot.abs_path().clone(), + path: ProjectPath { + worktree_id: snapshot.id(), + path: entry.path.clone(), + }, + }) + .await?; + } - output_tx.send(rx).await?; + output_tx.send(rx).await?; + anyhow::Ok(()) + }) + .await; } } @@ -334,10 +367,11 @@ struct InputPath<'worker> { entry: &'worker Entry, settings: &'worker WorktreeSettings, snapshot: &'worker Snapshot, + should_scan_tx: oneshot::Sender, } struct MatchingEntry { worktree_root: Arc, path: ProjectPath, - respond: oneshot::Sender, + should_scan_tx: oneshot::Sender, }