diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs
index 756dfa05318ee5c04941adeaf3a4a17f1860f6b8..a879e81fa74ed4ece8921f61cd3c0993092d59b1 100644
--- a/crates/project/src/project.rs
+++ b/crates/project/src/project.rs
@@ -41,6 +41,7 @@ use crate::{
     agent_server_store::AllAgentServersSettings,
     git_store::GitStore,
     lsp_store::{SymbolLocation, log_store::LogKind},
+    project_search::ProjectSearcher,
 };
 pub use agent_server_store::{AgentServerStore, AgentServersUpdated};
 pub use git_store::{
@@ -4005,61 +4006,20 @@ impl Project {
     }
 
     pub fn search(&mut self, query: SearchQuery, cx: &mut Context<Self>) -> Receiver<SearchResult> {
-        let (result_tx, result_rx) = smol::channel::unbounded();
-
-        let matching_buffers_rx = if query.is_opened_only() {
-            self.sort_search_candidates(&query, cx)
-        } else {
-            self.find_search_candidate_buffers(&query, MAX_SEARCH_RESULT_FILES + 1, cx)
+        let snapshots = self
+            .visible_worktrees(cx)
+            .filter_map(|tree| {
+                let tree = tree.read(cx);
+                Some((tree.snapshot(), tree.as_local()?.settings()))
+            })
+            .collect::<Vec<_>>();
+        let searcher = ProjectSearcher {
+            fs: self.fs.clone(),
+            buffer_store: self.buffer_store.downgrade(),
+            snapshots,
+            open_buffers: Default::default(),
         };
-
-        cx.spawn(async move |_, cx| {
-            let mut range_count = 0;
-            let mut buffer_count = 0;
-            let mut limit_reached = false;
-            let query = Arc::new(query);
-            let chunks = matching_buffers_rx.ready_chunks(64);
-
-            // Now that we know what paths match the query, we will load at most
-            // 64 buffers at a time to avoid overwhelming the main thread. For each
-            // opened buffer, we will spawn a background task that retrieves all the
-            // ranges in the buffer matched by the query.
-            let mut chunks = pin!(chunks);
-            'outer: while let Some(matching_buffer_chunk) = chunks.next().await {
-                let mut chunk_results = Vec::with_capacity(matching_buffer_chunk.len());
-                for buffer in matching_buffer_chunk {
-                    let query = query.clone();
-                    let snapshot = buffer.read_with(cx, |buffer, _| buffer.snapshot())?;
-                    chunk_results.push(cx.background_spawn(async move {}));
-                }
-
-                let chunk_results = futures::future::join_all(chunk_results).await;
-                for result in chunk_results {
-                    if let Some((buffer, ranges)) = result.log_err() {
-                        range_count += ranges.len();
-                        buffer_count += 1;
-                        result_tx
-                            .send(SearchResult::Buffer { buffer, ranges })
-                            .await?;
-                        if buffer_count > MAX_SEARCH_RESULT_FILES
-                            || range_count > MAX_SEARCH_RESULT_RANGES
-                        {
-                            limit_reached = true;
-                            break 'outer;
-                        }
-                    }
-                }
-            }
-
-            if limit_reached {
-                result_tx.send(SearchResult::LimitReached).await?;
-            }
-
-            anyhow::Ok(())
-        })
-        .detach();
-
-        result_rx
+        searcher.run(query, cx)
     }
 
     fn find_search_candidate_buffers(
diff --git a/crates/project/src/project_search.rs b/crates/project/src/project_search.rs
index 80c9adfc8d656091aa2cfee3a845124f28f123c0..762de38e077895be25aaaaadb6719a16e383cf8a 100644
--- a/crates/project/src/project_search.rs
+++ b/crates/project/src/project_search.rs
@@ -26,17 +26,17 @@ use crate::{
 };
 
 pub(crate) struct ProjectSearcher {
-    fs: Arc<dyn Fs>,
-    buffer_store: WeakEntity<BufferStore>,
+    pub(crate) fs: Arc<dyn Fs>,
+    pub(crate) buffer_store: WeakEntity<BufferStore>,
     pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>,
-    open_buffers: HashSet,
+    pub(crate) open_buffers: HashSet,
 }
 
 const MAX_SEARCH_RESULT_FILES: usize = 5_000;
 const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
 
 impl ProjectSearcher {
-    pub(crate) fn search(self, query: SearchQuery, cx: &mut App) -> Receiver<SearchResult> {
+    pub(crate) fn run(self, query: SearchQuery, cx: &mut App) -> Receiver<SearchResult> {
         let executor = cx.background_executor().clone();
         let (tx, rx) = unbounded();
         cx.spawn(async move |cx| {
@@ -48,7 +48,7 @@ impl ProjectSearcher {
         let matched_buffer_count = AtomicUsize::new(0);
         let worker_pool = executor.scoped(|scope| {
             let (input_paths_tx, input_paths_rx) = bounded(64);
-            let (find_first_match_tx, find_first_match_rx) = bounded(64);
+            let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) = bounded(64);
             let (sorted_search_results_tx, sorted_search_results_rx) = bounded(64);
             for _ in 0..executor.num_cpus() {
                 let worker = Worker {
@@ -58,8 +58,8 @@ impl ProjectSearcher {
                     matches_count: &matches_count,
                     fs: &*self.fs,
                    input_paths_rx: input_paths_rx.clone(),
-                    find_first_match_rx: find_first_match_rx.clone(),
-                    find_first_match_tx: find_first_match_tx.clone(),
+                    confirm_contents_will_match_rx: confirm_contents_will_match_rx.clone(),
+                    confirm_contents_will_match_tx: confirm_contents_will_match_tx.clone(),
                     get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
                     find_all_matches_rx: find_all_matches_rx.clone(),
                     publish_matches: tx.clone(),
@@ -71,13 +71,16 @@ impl ProjectSearcher {
                 input_paths_tx,
                 sorted_search_results_tx,
             ));
-            scope.spawn(self.maintain_sorted_search_results())
+            scope.spawn(self.maintain_sorted_search_results(
+                sorted_search_results_rx,
+                get_buffer_for_full_scan_tx,
+            ))
         });
         self.open_buffers(get_buffer_for_full_scan_rx, find_all_matches_tx, cx)
             .await;
         worker_pool.await;
-        let limit_reached = matches_count.load(Ordering::Release) > MAX_SEARCH_RESULT_RANGES
-            || matched_buffer_count.load(Ordering::Release) > MAX_SEARCH_RESULT_FILES;
+        let limit_reached = matches_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_RANGES
+            || matched_buffer_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_FILES;
         if limit_reached {
             _ = tx.send(SearchResult::LimitReached).await;
         }
@@ -165,10 +168,12 @@ struct Worker<'search> {
     /// - Scan ignored files
     /// Put another way: filter out files that can't match (without looking at file contents)
     input_paths_rx: Receiver>,
-    /// After that, figure out which paths contain at least one match (look at file contents). That's called "partial scan".
-    find_first_match_tx: Sender<MatchingEntry>,
-    find_first_match_rx: Receiver<MatchingEntry>,
-    /// Of those that contain at least one match, look for rest of matches (and figure out their ranges).
+
+    /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
+    /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
+    confirm_contents_will_match_tx: Sender<MatchingEntry>,
+    confirm_contents_will_match_rx: Receiver<MatchingEntry>,
+    /// Of those that contain at least one match (or are already in memory), look for rest of matches (and figure out their ranges).
     /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
     get_buffer_for_full_scan_tx: Sender,
     /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
@@ -180,7 +185,7 @@ struct Worker<'search> {
 impl Worker<'_> {
     async fn run(self) {
         let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
-        let mut find_first_match = pin!(self.find_first_match_rx.fuse());
+        let mut find_first_match = pin!(self.confirm_contents_will_match_rx.fuse());
         let mut scan_path = pin!(self.input_paths_rx.fuse());
         let handler = RequestHandler {
             query: self.query,
@@ -188,7 +193,7 @@ impl Worker<'_> {
             fs: self.fs,
             matched_buffer_count: self.matched_buffer_count,
             matches_count: self.matches_count,
-            find_first_match_tx: &self.find_first_match_tx,
+            confirm_contents_will_match_tx: &self.confirm_contents_will_match_tx,
             get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
             publish_matches: &self.publish_matches,
         };
@@ -225,7 +230,7 @@ struct RequestHandler<'worker> {
     matched_buffer_count: &'worker AtomicUsize,
     matches_count: &'worker AtomicUsize,
-    find_first_match_tx: &'worker Sender<MatchingEntry>,
+    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
     get_buffer_for_full_scan_tx: &'worker Sender,
     publish_matches: &'worker Sender<SearchResult>,
 }
@@ -302,15 +307,15 @@ impl RequestHandler<'_> {
         } = req;
         if entry.is_dir() && entry.is_ignored {
             if !settings.is_path_excluded(&entry.path) {
-                Self::scan_ignored_dir(
-                    self.fs,
-                    &snapshot,
-                    &entry.path,
-                    self.query,
-                    &filter_tx,
-                    &output_tx,
-                )
-                .await?;
+                // Self::scan_ignored_dir(
+                //     self.fs,
+                //     &snapshot,
+                //     &entry.path,
+                //     self.query,
+                //     &filter_tx,
+                //     &output_tx,
+                // )
+                // .await?;
             }
             return Ok(());
         }
@@ -332,8 +337,6 @@ impl RequestHandler<'_> {
             }
         }
 
-        let (mut tx, rx) = oneshot::channel();
-
         if self.open_entries.contains(&entry.id) {
             // The buffer is already in memory and that's the version we want to scan;
             // hence skip the dilly-dally and look for all matches straight away.
@@ -344,7 +347,7 @@ impl RequestHandler<'_> {
                 })
                 .await?;
         } else {
-            self.find_first_match_tx
+            self.confirm_contents_will_match_tx
                 .send(MatchingEntry {
                     should_scan_tx: should_scan_tx,
                     worktree_root: snapshot.abs_path().clone(),
@@ -356,7 +359,6 @@ impl RequestHandler<'_> {
                 .await?;
         }
 
-        output_tx.send(rx).await?;
         anyhow::Ok(())
     })
     .await;