diff --git a/Cargo.lock b/Cargo.lock index 34d31b72b771e050a778fd263f48d1d94ebc16cf..b32e5411be9eb1d851402705da3b0062081550ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12891,7 +12891,6 @@ dependencies = [ "project", "settings", "watch", - "workspace-hack", ] [[package]] @@ -20593,6 +20592,16 @@ dependencies = [ "zlog", ] +[[package]] +name = "worktree_benchmarks" +version = "0.1.0" +dependencies = [ + "fs", + "gpui", + "settings", + "worktree", +] + [[package]] name = "writeable" version = "0.6.1" diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 32844ef164ba142a9bc52979d798909a2f04cdb9..d908ed8de1b8eba95058eb3eaed5bd1caa92dfb5 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -4004,11 +4004,21 @@ impl Project { } fn search_impl(&mut self, query: SearchQuery, cx: &mut Context) -> SearchResultsHandle { + let client: Option<(AnyProtoClient, _)> = if let Some(ssh_client) = &self.remote_client { + Some((ssh_client.read(cx).proto_client(), 0)) + } else if let Some(remote_id) = self.remote_id() { + Some((self.collab_client.clone().into(), remote_id)) + } else { + None + }; let searcher = project_search::Search { fs: self.fs.clone(), buffer_store: self.buffer_store.clone(), + worktree_store: self.worktree_store.clone(), worktrees: self.visible_worktrees(cx).collect::>(), limit: project_search::Search::MAX_SEARCH_RESULT_FILES + 1, + client, + remotely_created_models: self.remotely_created_models.clone(), }; searcher.into_results(query, cx) } @@ -4739,18 +4749,31 @@ impl Project { fn retain_remotely_created_models( &mut self, cx: &mut Context, + ) -> RemotelyCreatedModelGuard { + Self::retain_remotely_created_models_impl( + &self.remotely_created_models, + &self.buffer_store, + &self.worktree_store, + cx, + ) + } + + fn retain_remotely_created_models_impl( + models: &Arc>, + buffer_store: &Entity, + worktree_store: &Entity, + cx: &mut App, ) -> RemotelyCreatedModelGuard { { - let mut remotely_create_models = self.remotely_created_models.lock(); + let mut remotely_create_models = models.lock(); if remotely_create_models.retain_count == 0 { - remotely_create_models.buffers = self.buffer_store.read(cx).buffers().collect(); - remotely_create_models.worktrees = - self.worktree_store.read(cx).worktrees().collect(); + remotely_create_models.buffers = buffer_store.read(cx).buffers().collect(); + remotely_create_models.worktrees = worktree_store.read(cx).worktrees().collect(); } remotely_create_models.retain_count += 1; } RemotelyCreatedModelGuard { - remote_models: Arc::downgrade(&self.remotely_created_models), + remote_models: Arc::downgrade(&models), } } diff --git a/crates/project/src/project_search.rs b/crates/project/src/project_search.rs index db14cdb1b8a150ea1f4423544ab12a334b35757b..19553b1dbf106c99f9ee1cc67ed6bd8d4b5e9232 100644 --- a/crates/project/src/project_search.rs +++ b/crates/project/src/project_search.rs @@ -12,25 +12,35 @@ use anyhow::Context; use collections::HashSet; use fs::Fs; use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered}; -use gpui::{App, AsyncApp, Entity, Task}; +use gpui::{App, AppContext, AsyncApp, Entity, Task}; use language::{Buffer, BufferSnapshot}; +use parking_lot::Mutex; use postage::oneshot; -use smol::channel::{Receiver, Sender, bounded, unbounded}; +use rpc::{AnyProtoClient, proto}; +use smol::{ + channel::{Receiver, Sender, bounded, unbounded}, + future::FutureExt, +}; +use text::BufferId; use util::{ResultExt, maybe}; use worktree::{Entry, ProjectEntryId, Snapshot, Worktree}; use crate::{ - ProjectItem, ProjectPath, + Project, ProjectItem, ProjectPath, RemotelyCreatedModels, buffer_store::BufferStore, search::{SearchQuery, SearchResult}, + worktree_store::WorktreeStore, }; pub(crate) struct Search { pub(crate) fs: Arc, pub(crate) buffer_store: Entity, + pub(crate) worktree_store: Entity, pub(crate) worktrees: Vec>, pub(crate) limit: usize, + pub(crate) client: Option<(AnyProtoClient, u64)>, + pub(crate) remotely_created_models: Arc>, } /// Represents results of project search and allows one to either obtain match positions OR @@ -53,6 +63,27 @@ impl SearchResultsHandle { } } +#[derive(Clone)] +enum FindSearchCandidates { + Local { + /// Start off with all paths in project and filter them based on: + /// - Include filters + /// - Exclude filters + /// - Only open buffers + /// - Scan ignored files + /// Put another way: filter out files that can't match (without looking at file contents) + input_paths_rx: Receiver, + /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match + /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory. + confirm_contents_will_match_tx: Sender, + confirm_contents_will_match_rx: Receiver, + /// Of those that contain at least one match (or are already in memory), look for rest of matches (and figure out their ranges). + /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it). + get_buffer_for_full_scan_tx: Sender, + }, + Remote, +} + impl Search { pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000; pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000; @@ -87,15 +118,92 @@ impl Search { let (find_all_matches_tx, find_all_matches_rx) = bounded(MAX_CONCURRENT_BUFFER_OPENS); - let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = unbounded(); - let matches_count = AtomicUsize::new(0); - let matched_buffer_count = AtomicUsize::new(0); - let (input_paths_tx, input_paths_rx) = unbounded(); - let (sorted_search_results_tx, sorted_search_results_rx) = unbounded(); - let worker_pool = executor.scoped(|scope| { + let (candidate_searcher, tasks) = if let Some((client, remote_id)) = self.client { + let request = client.request(proto::FindSearchCandidates { + project_id: remote_id, + query: Some(query.to_proto()), + limit: self.limit as _, + }); + let Ok(guard) = cx.update(|cx| { + Project::retain_remotely_created_models_impl( + &self.remotely_created_models, + &self.buffer_store, + &self.worktree_store, + cx, + ) + }) else { + return; + }; + let buffer_store = self.buffer_store.downgrade(); + let issue_remote_buffers_request = cx + .spawn(async move |cx| { + let _ = maybe!(async move { + let response = request.await?; + + for buffer_id in response.buffer_ids { + let buffer_id = BufferId::new(buffer_id)?; + let buffer = buffer_store + .update(cx, |buffer_store, cx| { + buffer_store.wait_for_remote_buffer(buffer_id, cx) + })? + .await?; + let _ = grab_buffer_snapshot_tx.send(buffer).await; + } + + drop(guard); + anyhow::Ok(()) + }) + .await + .log_err(); + }) + .boxed_local(); + ( + FindSearchCandidates::Remote, + vec![issue_remote_buffers_request], + ) + } else { + let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = unbounded(); let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) = bounded(64); + let (sorted_search_results_tx, sorted_search_results_rx) = unbounded(); + + let (input_paths_tx, input_paths_rx) = unbounded(); + let tasks = vec![ + cx.spawn(Self::provide_search_paths( + std::mem::take(&mut self.worktrees), + query.include_ignored(), + input_paths_tx, + sorted_search_results_tx, + )) + .boxed_local(), + self.open_buffers( + get_buffer_for_full_scan_rx, + grab_buffer_snapshot_tx, + cx.clone(), + ) + .boxed_local(), + cx.background_spawn(Self::maintain_sorted_search_results( + sorted_search_results_rx, + get_buffer_for_full_scan_tx.clone(), + self.limit, + )) + .boxed_local(), + ]; + ( + FindSearchCandidates::Local { + get_buffer_for_full_scan_tx, + confirm_contents_will_match_tx, + confirm_contents_will_match_rx, + input_paths_rx, + }, + tasks, + ) + }; + let matches_count = AtomicUsize::new(0); + let matched_buffer_count = AtomicUsize::new(0); + + let worker_pool = executor.scoped(|scope| { let num_cpus = executor.num_cpus(); assert!(num_cpus > 0); @@ -106,10 +214,7 @@ impl Search { matched_buffer_count: &matched_buffer_count, matches_count: &matches_count, fs: &*self.fs, - input_paths_rx: input_paths_rx.clone(), - confirm_contents_will_match_rx: confirm_contents_will_match_rx.clone(), - confirm_contents_will_match_tx: confirm_contents_will_match_tx.clone(), - get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(), + candidates: candidate_searcher.clone(), find_all_matches_rx: find_all_matches_rx.clone(), publish_matches: tx.clone(), }; @@ -117,38 +222,23 @@ impl Search { } drop(tx); drop(find_all_matches_rx); - - scope.spawn(Self::maintain_sorted_search_results( - sorted_search_results_rx, - get_buffer_for_full_scan_tx, - self.limit, - )) + drop(candidate_searcher); }); - let provide_search_paths = cx.spawn(Self::provide_search_paths( - std::mem::take(&mut self.worktrees), - query.include_ignored(), - input_paths_tx, - sorted_search_results_tx, - )); - let open_buffers = self.open_buffers( - get_buffer_for_full_scan_rx, - grab_buffer_snapshot_tx, - cx.clone(), - ); - let buffer_snapshots = self.grab_buffer_snapshots( + + let buffer_snapshots = Self::grab_buffer_snapshots( grab_buffer_snapshot_rx, find_all_matches_tx, cx.clone(), ); - futures::future::join4( - worker_pool, - buffer_snapshots, - open_buffers, - provide_search_paths, + futures::future::join_all( + [worker_pool.boxed_local(), buffer_snapshots.boxed_local()] + .into_iter() + .chain(tasks), ) .await; }) }); + SearchResultsHandle { results: rx, matching_buffers, @@ -273,7 +363,6 @@ impl Search { } async fn grab_buffer_snapshots( - &self, rx: Receiver>, find_all_matches_tx: Sender<(Entity, BufferSnapshot)>, mut cx: AsyncApp, @@ -295,21 +384,8 @@ struct Worker<'search> { matches_count: &'search AtomicUsize, open_buffers: &'search HashSet, fs: &'search dyn Fs, - /// Start off with all paths in project and filter them based on: - /// - Include filters - /// - Exclude filters - /// - Only open buffers - /// - Scan ignored files - /// Put another way: filter out files that can't match (without looking at file contents) - input_paths_rx: Receiver, - - /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match - /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory. - confirm_contents_will_match_tx: Sender, - confirm_contents_will_match_rx: Receiver, - /// Of those that contain at least one match (or are already in memory), look for rest of matches (and figure out their ranges). - /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it). - get_buffer_for_full_scan_tx: Sender, + + candidates: FindSearchCandidates, /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot. find_all_matches_rx: Receiver<(Entity, BufferSnapshot)>, /// Cool, we have results; let's share them with the world. @@ -318,9 +394,30 @@ struct Worker<'search> { impl Worker<'_> { async fn run(mut self) { + let ( + input_paths_rx, + confirm_contents_will_match_rx, + mut confirm_contents_will_match_tx, + mut get_buffer_for_full_scan_tx, + ) = match self.candidates { + FindSearchCandidates::Local { + input_paths_rx, + confirm_contents_will_match_rx, + confirm_contents_will_match_tx, + get_buffer_for_full_scan_tx, + } => ( + input_paths_rx, + confirm_contents_will_match_rx, + confirm_contents_will_match_tx, + get_buffer_for_full_scan_tx, + ), + FindSearchCandidates::Remote => { + (unbounded().1, unbounded().1, unbounded().0, unbounded().0) + } + }; let mut find_all_matches = pin!(self.find_all_matches_rx.fuse()); - let mut find_first_match = pin!(self.confirm_contents_will_match_rx.fuse()); - let mut scan_path = pin!(self.input_paths_rx.fuse()); + let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse()); + let mut scan_path = pin!(input_paths_rx.fuse()); loop { let handler = RequestHandler { @@ -329,8 +426,8 @@ impl Worker<'_> { fs: self.fs, matched_buffer_count: self.matched_buffer_count, matches_count: self.matches_count, - confirm_contents_will_match_tx: &self.confirm_contents_will_match_tx, - get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx, + confirm_contents_will_match_tx: &confirm_contents_will_match_tx, + get_buffer_for_full_scan_tx: &get_buffer_for_full_scan_tx, publish_matches: &self.publish_matches, }; // Whenever we notice that some step of a pipeline is closed, we don't want to close subsequent @@ -339,6 +436,7 @@ impl Worker<'_> { // That way, we'll only ever close a next-stage channel when ALL workers do so. select_biased! { find_all_matches = find_all_matches.next() => { + if self.publish_matches.is_closed() { break; } @@ -357,7 +455,7 @@ impl Worker<'_> { if let Some(buffer_with_at_least_one_match) = find_first_match { handler.handle_find_first_match(buffer_with_at_least_one_match).await; } else { - self.get_buffer_for_full_scan_tx = bounded(1).0; + get_buffer_for_full_scan_tx = bounded(1).0; } }, @@ -366,7 +464,7 @@ impl Worker<'_> { handler.handle_scan_path(path_to_scan).await; } else { // If we're the last worker to notice that this is not producing values, close the upstream. - self.confirm_contents_will_match_tx = bounded(1).0; + confirm_contents_will_match_tx = bounded(1).0; } } diff --git a/crates/project_benchmarks/Cargo.toml b/crates/project_benchmarks/Cargo.toml index ff1fdb5e107e478839c41121b599f4fff89e4892..1171d468c649bdd9f76a44b3ef0155dc652c6034 100644 --- a/crates/project_benchmarks/Cargo.toml +++ b/crates/project_benchmarks/Cargo.toml @@ -16,7 +16,6 @@ node_runtime.workspace = true project.workspace = true settings.workspace = true watch.workspace = true -workspace-hack = { version = "0.1", path = "../../tooling/workspace-hack" } [lints] workspace = true