From 2970f463a197ce2630705a19e63506a4b68edf88 Mon Sep 17 00:00:00 2001 From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> Date: Mon, 20 Oct 2025 14:22:02 +0200 Subject: [PATCH] Fix project search in remote projects --- Cargo.lock | 10 - crates/project/src/buffer_store.rs | 63 +---- crates/project/src/project.rs | 23 +- crates/project/src/project_search.rs | 265 ++++++++++++------- crates/remote_server/src/headless_project.rs | 12 +- 5 files changed, 190 insertions(+), 183 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b32e5411be9eb1d851402705da3b0062081550ab..b75b8896ac0611bca05fdddc42aefea2aa2c09c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20592,16 +20592,6 @@ dependencies = [ "zlog", ] -[[package]] -name = "worktree_benchmarks" -version = "0.1.0" -dependencies = [ - "fs", - "gpui", - "settings", - "worktree", -] - [[package]] name = "writeable" version = "0.6.1" diff --git a/crates/project/src/buffer_store.rs b/crates/project/src/buffer_store.rs index 892d2d289cb6cdcca9b9d5ba7f0f4d804a1e7898..78873022f3c34b668fe50876f53f0c90dcd21d99 100644 --- a/crates/project/src/buffer_store.rs +++ b/crates/project/src/buffer_store.rs @@ -1,14 +1,12 @@ use crate::{ - ProjectItem as _, ProjectPath, + ProjectPath, lsp_store::OpenLspBufferHandle, - search::SearchQuery, worktree_store::{WorktreeStore, WorktreeStoreEvent}, }; use anyhow::{Context as _, Result, anyhow}; use client::Client; use collections::{HashMap, HashSet, hash_map}; -use fs::Fs; -use futures::{Future, FutureExt as _, StreamExt, channel::oneshot, future::Shared}; +use futures::{Future, FutureExt as _, channel::oneshot, future::Shared}; use gpui::{ App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, }; @@ -1103,63 +1101,6 @@ impl BufferStore { Some(()) } - pub fn find_search_candidates( - &mut self, - query: &SearchQuery, - mut limit: usize, - fs: Arc, - cx: &mut Context, - ) -> Receiver> { - let (tx, rx) = smol::channel::unbounded(); - let mut open_buffers = HashSet::default(); - let mut unnamed_buffers = Vec::new(); - for handle in self.buffers() { - let buffer = handle.read(cx); - if self.non_searchable_buffers.contains(&buffer.remote_id()) { - continue; - } else if let Some(entry_id) = buffer.entry_id(cx) { - open_buffers.insert(entry_id); - } else { - limit = limit.saturating_sub(1); - unnamed_buffers.push(handle) - }; - } - - const MAX_CONCURRENT_BUFFER_OPENS: usize = 64; - let project_paths_rx = self - .worktree_store - .update(cx, |worktree_store, cx| { - worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx) - }) - .chunks(MAX_CONCURRENT_BUFFER_OPENS); - - cx.spawn(async move |this, cx| { - for buffer in unnamed_buffers { - tx.send(buffer).await.ok(); - } - - let mut project_paths_rx = pin!(project_paths_rx); - while let Some(project_paths) = project_paths_rx.next().await { - let buffers = this.update(cx, |this, cx| { - project_paths - .into_iter() - .map(|project_path| this.open_buffer(project_path, cx)) - .collect::>() - })?; - for buffer_task in buffers { - if let Some(buffer) = buffer_task.await.log_err() - && tx.send(buffer).await.is_err() - { - return anyhow::Ok(()); - } - } - } - anyhow::Ok(()) - }) - .detach(); - rx - } - fn on_buffer_event( &mut self, buffer: Entity, diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 1849bd0b4883e67af9c2e80837187bb1ee6d8ad2..3b38824bdac0e4c9c98a1da5612f5235d1373a1d 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -49,6 +49,7 @@ pub 
use git_store::{ git_traversal::{ChildEntriesGitIter, GitEntry, GitEntryRef, GitTraversal}, }; pub use manifest_tree::ManifestTree; +pub use project_search::Search; use anyhow::{Context as _, Result, anyhow}; use buffer_store::{BufferStore, BufferStoreEvent}; @@ -4013,14 +4014,20 @@ impl Project { } else { None }; - let searcher = project_search::Search { - fs: self.fs.clone(), - buffer_store: self.buffer_store.clone(), - worktree_store: self.worktree_store.clone(), - worktrees: self.visible_worktrees(cx).collect::>(), - limit: project_search::Search::MAX_SEARCH_RESULT_FILES + 1, - client, - remotely_created_models: self.remotely_created_models.clone(), + let searcher = match client { + Some((client, remote_id)) => project_search::Search::remote( + self.buffer_store.clone(), + self.worktree_store.clone(), + project_search::Search::MAX_SEARCH_RESULT_FILES + 1, + (client, remote_id, self.remotely_created_models.clone()), + ), + None => project_search::Search::local( + self.fs.clone(), + self.buffer_store.clone(), + self.worktree_store.clone(), + project_search::Search::MAX_SEARCH_RESULT_FILES + 1, + cx, + ), }; searcher.into_results(query, cx) } diff --git a/crates/project/src/project_search.rs b/crates/project/src/project_search.rs index 19553b1dbf106c99f9ee1cc67ed6bd8d4b5e9232..762f45780bf8550e218d2c6028ac16027d3feca7 100644 --- a/crates/project/src/project_search.rs +++ b/crates/project/src/project_search.rs @@ -33,31 +33,40 @@ use crate::{ worktree_store::WorktreeStore, }; -pub(crate) struct Search { - pub(crate) fs: Arc, - pub(crate) buffer_store: Entity, - pub(crate) worktree_store: Entity, - pub(crate) worktrees: Vec>, - pub(crate) limit: usize, - pub(crate) client: Option<(AnyProtoClient, u64)>, - pub(crate) remotely_created_models: Arc>, +pub struct Search { + buffer_store: Entity, + worktree_store: Entity, + limit: usize, + kind: SearchKind, +} + +enum SearchKind { + Local { + fs: Arc, + worktrees: Vec>, + }, + Remote { + client: AnyProtoClient, + remote_id: u64, + models: Arc>, + }, } /// Represents results of project search and allows one to either obtain match positions OR /// just the handles to buffers that may match the search. 
#[must_use] -pub(crate) struct SearchResultsHandle { +pub struct SearchResultsHandle { results: Receiver, matching_buffers: Receiver>, trigger_search: Box Task<()> + Send + Sync>, } impl SearchResultsHandle { - pub(crate) fn results(self, cx: &mut App) -> Receiver { + pub fn results(self, cx: &mut App) -> Receiver { (self.trigger_search)(cx).detach(); self.results } - pub(crate) fn matching_buffers(self, cx: &mut App) -> Receiver> { + pub fn matching_buffers(self, cx: &mut App) -> Receiver> { (self.trigger_search)(cx).detach(); self.matching_buffers } @@ -66,6 +75,7 @@ impl SearchResultsHandle { #[derive(Clone)] enum FindSearchCandidates { Local { + fs: Arc, /// Start off with all paths in project and filter them based on: /// - Include filters /// - Exclude filters @@ -85,11 +95,46 @@ enum FindSearchCandidates { } impl Search { + pub fn local( + fs: Arc, + buffer_store: Entity, + worktree_store: Entity, + + limit: usize, + cx: &mut App, + ) -> Self { + let worktrees = worktree_store.read(cx).visible_worktrees(cx).collect(); + Self { + kind: SearchKind::Local { fs, worktrees }, + buffer_store, + worktree_store, + limit, + } + } + + pub(crate) fn remote( + buffer_store: Entity, + worktree_store: Entity, + limit: usize, + client_state: (AnyProtoClient, u64, Arc>), + ) -> Self { + Self { + kind: SearchKind::Remote { + client: client_state.0, + remote_id: client_state.1, + models: client_state.2, + }, + buffer_store, + worktree_store, + limit, + } + } + pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000; pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000; /// Prepares a project search run. The result has to be used to specify whether you're interested in matching buffers /// or full search results. - pub(crate) fn into_results(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle { + pub fn into_results(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle { let mut open_buffers = HashSet::default(); let mut unnamed_buffers = Vec::new(); const MAX_CONCURRENT_BUFFER_OPENS: usize = 64; @@ -109,7 +154,7 @@ impl Search { let (tx, rx) = unbounded(); let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded(); let matching_buffers = grab_buffer_snapshot_rx.clone(); - let trigger_search = Box::new(|cx: &mut App| { + let trigger_search = Box::new(move |cx: &mut App| { cx.spawn(async move |cx| { for buffer in unnamed_buffers { _ = grab_buffer_snapshot_tx.send(buffer).await; @@ -118,86 +163,100 @@ impl Search { let (find_all_matches_tx, find_all_matches_rx) = bounded(MAX_CONCURRENT_BUFFER_OPENS); - let (candidate_searcher, tasks) = if let Some((client, remote_id)) = self.client { - let request = client.request(proto::FindSearchCandidates { - project_id: remote_id, - query: Some(query.to_proto()), - limit: self.limit as _, - }); - let Ok(guard) = cx.update(|cx| { - Project::retain_remotely_created_models_impl( - &self.remotely_created_models, - &self.buffer_store, - &self.worktree_store, - cx, + let (candidate_searcher, tasks) = match self.kind { + SearchKind::Local { + fs, + ref mut worktrees, + } => { + let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = + unbounded(); + let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) = + bounded(64); + let (sorted_search_results_tx, sorted_search_results_rx) = unbounded(); + + let (input_paths_tx, input_paths_rx) = unbounded(); + + let tasks = vec![ + cx.spawn(Self::provide_search_paths( + std::mem::take(worktrees), + query.include_ignored(), + input_paths_tx, + sorted_search_results_tx, 
+ )) + .boxed_local(), + Self::open_buffers( + &self.buffer_store, + get_buffer_for_full_scan_rx, + grab_buffer_snapshot_tx, + cx.clone(), + ) + .boxed_local(), + cx.background_spawn(Self::maintain_sorted_search_results( + sorted_search_results_rx, + get_buffer_for_full_scan_tx.clone(), + self.limit, + )) + .boxed_local(), + ]; + ( + FindSearchCandidates::Local { + fs, + get_buffer_for_full_scan_tx, + confirm_contents_will_match_tx, + confirm_contents_will_match_rx, + input_paths_rx, + }, + tasks, ) - }) else { - return; - }; - let buffer_store = self.buffer_store.downgrade(); - let issue_remote_buffers_request = cx - .spawn(async move |cx| { - let _ = maybe!(async move { - let response = request.await?; - - for buffer_id in response.buffer_ids { - let buffer_id = BufferId::new(buffer_id)?; - let buffer = buffer_store - .update(cx, |buffer_store, cx| { - buffer_store.wait_for_remote_buffer(buffer_id, cx) - })? - .await?; - let _ = grab_buffer_snapshot_tx.send(buffer).await; - } - - drop(guard); - anyhow::Ok(()) + } + SearchKind::Remote { + client, + remote_id, + models, + } => { + let request = client.request(proto::FindSearchCandidates { + project_id: remote_id, + query: Some(query.to_proto()), + limit: self.limit as _, + }); + let Ok(guard) = cx.update(|cx| { + Project::retain_remotely_created_models_impl( + &models, + &self.buffer_store, + &self.worktree_store, + cx, + ) + }) else { + return; + }; + let buffer_store = self.buffer_store.downgrade(); + let issue_remote_buffers_request = cx + .spawn(async move |cx| { + let _ = maybe!(async move { + let response = request.await?; + + for buffer_id in response.buffer_ids { + let buffer_id = BufferId::new(buffer_id)?; + let buffer = buffer_store + .update(cx, |buffer_store, cx| { + buffer_store.wait_for_remote_buffer(buffer_id, cx) + })? 
+ .await?; + let _ = grab_buffer_snapshot_tx.send(buffer).await; + } + + drop(guard); + anyhow::Ok(()) + }) + .await + .log_err(); }) - .await - .log_err(); - }) - .boxed_local(); - ( - FindSearchCandidates::Remote, - vec![issue_remote_buffers_request], - ) - } else { - let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = unbounded(); - let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) = - bounded(64); - let (sorted_search_results_tx, sorted_search_results_rx) = unbounded(); - - let (input_paths_tx, input_paths_rx) = unbounded(); - let tasks = vec![ - cx.spawn(Self::provide_search_paths( - std::mem::take(&mut self.worktrees), - query.include_ignored(), - input_paths_tx, - sorted_search_results_tx, - )) - .boxed_local(), - self.open_buffers( - get_buffer_for_full_scan_rx, - grab_buffer_snapshot_tx, - cx.clone(), + .boxed_local(); + ( + FindSearchCandidates::Remote, + vec![issue_remote_buffers_request], ) - .boxed_local(), - cx.background_spawn(Self::maintain_sorted_search_results( - sorted_search_results_rx, - get_buffer_for_full_scan_tx.clone(), - self.limit, - )) - .boxed_local(), - ]; - ( - FindSearchCandidates::Local { - get_buffer_for_full_scan_tx, - confirm_contents_will_match_tx, - confirm_contents_will_match_rx, - input_paths_rx, - }, - tasks, - ) + } }; let matches_count = AtomicUsize::new(0); @@ -213,7 +272,6 @@ impl Search { open_buffers: &open_buffers, matched_buffer_count: &matched_buffer_count, matches_count: &matches_count, - fs: &*self.fs, candidates: candidate_searcher.clone(), find_all_matches_rx: find_all_matches_rx.clone(), publish_matches: tx.clone(), @@ -336,7 +394,7 @@ impl Search { /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf. async fn open_buffers( - &self, + buffer_store: &Entity, rx: Receiver, find_all_matches_tx: Sender>, mut cx: AsyncApp, @@ -344,7 +402,7 @@ impl Search { let mut rx = pin!(rx.ready_chunks(64)); _ = maybe!(async move { while let Some(requested_paths) = rx.next().await { - let mut buffers = self.buffer_store.update(&mut cx, |this, cx| { + let mut buffers = buffer_store.update(&mut cx, |this, cx| { requested_paths .into_iter() .map(|path| this.open_buffer(path, cx)) @@ -383,8 +441,6 @@ struct Worker<'search> { matched_buffer_count: &'search AtomicUsize, matches_count: &'search AtomicUsize, open_buffers: &'search HashSet, - fs: &'search dyn Fs, - candidates: FindSearchCandidates, /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot. 
find_all_matches_rx: Receiver<(Entity, BufferSnapshot)>, @@ -399,8 +455,10 @@ impl Worker<'_> { confirm_contents_will_match_rx, mut confirm_contents_will_match_tx, mut get_buffer_for_full_scan_tx, + fs, ) = match self.candidates { FindSearchCandidates::Local { + fs, input_paths_rx, confirm_contents_will_match_rx, confirm_contents_will_match_tx, @@ -410,10 +468,15 @@ impl Worker<'_> { confirm_contents_will_match_rx, confirm_contents_will_match_tx, get_buffer_for_full_scan_tx, + Some(fs), + ), + FindSearchCandidates::Remote => ( + unbounded().1, + unbounded().1, + unbounded().0, + unbounded().0, + None, ), - FindSearchCandidates::Remote => { - (unbounded().1, unbounded().1, unbounded().0, unbounded().0) - } }; let mut find_all_matches = pin!(self.find_all_matches_rx.fuse()); let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse()); @@ -423,7 +486,7 @@ impl Worker<'_> { let handler = RequestHandler { query: self.query, open_entries: &self.open_buffers, - fs: self.fs, + fs: fs.as_deref(), matched_buffer_count: self.matched_buffer_count, matches_count: self.matches_count, confirm_contents_will_match_tx: &confirm_contents_will_match_tx, @@ -479,7 +542,7 @@ impl Worker<'_> { struct RequestHandler<'worker> { query: &'worker SearchQuery, - fs: &'worker dyn Fs, + fs: Option<&'worker dyn Fs>, open_entries: &'worker HashSet, matched_buffer_count: &'worker AtomicUsize, matches_count: &'worker AtomicUsize, @@ -525,7 +588,7 @@ impl RequestHandler<'_> { async fn handle_find_first_match(&self, mut entry: MatchingEntry) { _=maybe!(async move { let abs_path = entry.worktree_root.join(entry.path.path.as_std_path()); - let Some(file) = self.fs.open_sync(&abs_path).await.log_err() else { + let Some(file) = self.fs.context("Trying to query filesystem in remote project search")?.open_sync(&abs_path).await.log_err() else { return anyhow::Ok(()); }; diff --git a/crates/remote_server/src/headless_project.rs b/crates/remote_server/src/headless_project.rs index 534eae6f44986afa42b6d202e4f34691935b3b33..73e933c185eeb00431b36368f27c00d7929b8459 100644 --- a/crates/remote_server/src/headless_project.rs +++ b/crates/remote_server/src/headless_project.rs @@ -640,9 +640,15 @@ impl HeadlessProject { PathStyle::local(), )?; let results = this.update(&mut cx, |this, cx| { - this.buffer_store.update(cx, |buffer_store, cx| { - buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx) - }) + project::Search::local( + this.fs.clone(), + this.buffer_store.clone(), + this.worktree_store.clone(), + message.limit as _, + cx, + ) + .into_results(query, cx) + .matching_buffers(cx) })?; let mut response = proto::FindSearchCandidatesResponse {
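
Usage sketch (not part of the patch): the hunks above remove BufferStore::find_search_candidates and route both local and remote searches through the new Search entry points (Search::local / Search::remote) and the #[must_use] SearchResultsHandle. The fragment below mirrors the call sites in project.rs and headless_project.rs; it assumes bindings fs, buffer_store, worktree_store, query, and limit are already in scope inside a method with a gpui &mut App context, and it simplifies channel handling.

    // Sketch only: drive the reworked search API from a local project.
    // On a remote project, project.rs uses the crate-internal
    // Search::remote constructor instead (see the project.rs hunk).
    use project::Search;

    let handle = Search::local(
        fs.clone(),
        buffer_store.clone(),
        worktree_store.clone(),
        limit,
        cx,
    )
    .into_results(query, cx);

    // Consuming the handle picks the output shape: stream only the buffers
    // that may match (what HeadlessProject does), or call handle.results(cx)
    // instead to receive full match ranges.
    let matching_buffers = handle.matching_buffers(cx);
    cx.spawn(async move |_cx| {
        while let Ok(_buffer) = matching_buffers.recv().await {
            // e.g. collect buffer ids for a FindSearchCandidatesResponse,
            // as the headless_project.rs hunk does.
        }
    })
    .detach();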