From 469213ac43326a3817c391a5944f034c1abb8fcc Mon Sep 17 00:00:00 2001 From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> Date: Thu, 16 Oct 2025 11:52:07 +0200 Subject: [PATCH] Start exploring API for remote searches Co-authored-by: Smit Barmase --- crates/project/src/project.rs | 13 ++- crates/project/src/project_search.rs | 169 ++++++++++++++++++--------- 2 files changed, 120 insertions(+), 62 deletions(-) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 405cf8ced425257bd73ca5c507ecd64abf050792..216639fc88b3d557aa1e8fb97a756c2cf9346227 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -41,7 +41,7 @@ use crate::{ agent_server_store::AllAgentServersSettings, git_store::GitStore, lsp_store::{SymbolLocation, log_store::LogKind}, - project_search::ProjectSearcher, + project_search::SearchResultsHandle, }; pub use agent_server_store::{AgentServerStore, AgentServersUpdated}; pub use git_store::{ @@ -4003,7 +4003,7 @@ impl Project { }) } - pub fn search(&mut self, query: SearchQuery, cx: &mut Context) -> Receiver { + fn search_impl(&mut self, query: SearchQuery, cx: &mut Context) -> SearchResultsHandle { let snapshots = self .visible_worktrees(cx) .filter_map(|tree| { @@ -4012,13 +4012,16 @@ impl Project { }) .collect::>(); - let searcher = ProjectSearcher { + let searcher = project_search::Search { fs: self.fs.clone(), buffer_store: self.buffer_store.clone(), snapshots, open_buffers: Default::default(), }; - searcher.run(query, cx) + searcher.into_results(query, cx) + } + pub fn search(&mut self, query: SearchQuery, cx: &mut Context) -> Receiver { + self.search_impl(query, cx).results(cx) } fn find_search_candidate_buffers( @@ -4885,7 +4888,7 @@ impl Project { let query = SearchQuery::from_proto(message.query.context("missing query field")?, path_style)?; let results = this.update(&mut cx, |this, cx| { - this.find_search_candidate_buffers(&query, message.limit as _, cx) + this.search_impl(query, cx).matching_buffers(cx) })?; let mut response = proto::FindSearchCandidatesResponse { diff --git a/crates/project/src/project_search.rs b/crates/project/src/project_search.rs index e759fd198e5e3b50bbc49b78bdd5adf74ed1466a..ecca2ecaf8514cc4c41a9b7145c7bfc46581aeb5 100644 --- a/crates/project/src/project_search.rs +++ b/crates/project/src/project_search.rs @@ -11,7 +11,7 @@ use std::{ use collections::HashSet; use fs::Fs; use futures::{SinkExt, StreamExt, select_biased}; -use gpui::{App, AsyncApp, Entity}; +use gpui::{App, AsyncApp, Entity, Task}; use language::{Buffer, BufferSnapshot}; use postage::oneshot; use smol::channel::{Receiver, Sender, bounded, unbounded}; @@ -25,7 +25,7 @@ use crate::{ search::{SearchQuery, SearchResult}, }; -pub(crate) struct ProjectSearcher { +pub(crate) struct Search { pub(crate) fs: Arc, pub(crate) buffer_store: Entity, pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>, @@ -35,11 +35,33 @@ pub(crate) struct ProjectSearcher { const MAX_SEARCH_RESULT_FILES: usize = 5_000; const MAX_SEARCH_RESULT_RANGES: usize = 10_000; -impl ProjectSearcher { - pub(crate) fn run(self, query: SearchQuery, cx: &mut App) -> Receiver { +/// Represents results of project search and allows one to either obtain match positions OR +/// just the handles to buffers that may match the search. +#[must_use] +pub(crate) struct SearchResultsHandle { + results: Receiver, + matching_buffers: Receiver>, + trigger_search: Box Task<()> + Send + Sync>, +} + +impl SearchResultsHandle { + pub(crate) fn results(self, cx: &mut App) -> Receiver { + (self.trigger_search)(cx).detach(); + self.results + } + pub(crate) fn matching_buffers(self, cx: &mut App) -> Receiver> { + (self.trigger_search)(cx).detach(); + self.matching_buffers + } +} + +impl Search { + /// Prepares a project search run. The result has to be used to specify whether you're interested in matching buffers + /// or full search results. + pub(crate) fn into_results(self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle { let mut open_buffers = HashSet::default(); let mut unnamed_buffers = Vec::new(); - + const MAX_CONCURRENT_BUFFER_OPENS: usize = 64; let buffers = self.buffer_store.read(cx); for handle in buffers.buffers() { let buffer = handle.read(cx); @@ -54,55 +76,73 @@ impl ProjectSearcher { } let executor = cx.background_executor().clone(); let (tx, rx) = unbounded(); - cx.spawn(async move |cx| { - const MAX_CONCURRENT_BUFFER_OPENS: usize = 64; - let (find_all_matches_tx, find_all_matches_rx) = bounded(MAX_CONCURRENT_BUFFER_OPENS); - let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = - bounded(MAX_CONCURRENT_BUFFER_OPENS); - let matches_count = AtomicUsize::new(0); - let matched_buffer_count = AtomicUsize::new(0); - let worker_pool = executor.scoped(|scope| { - let (input_paths_tx, input_paths_rx) = bounded(64); - let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) = bounded(64); - let (sorted_search_results_tx, sorted_search_results_rx) = bounded(64); - for _ in 0..executor.num_cpus() { - let worker = Worker { - query: &query, - open_buffers: &self.open_buffers, - matched_buffer_count: &matched_buffer_count, - matches_count: &matches_count, - fs: &*self.fs, - input_paths_rx: input_paths_rx.clone(), - confirm_contents_will_match_rx: confirm_contents_will_match_rx.clone(), - confirm_contents_will_match_tx: confirm_contents_will_match_tx.clone(), - get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(), - find_all_matches_rx: find_all_matches_rx.clone(), - publish_matches: tx.clone(), - }; - scope.spawn(worker.run()); + let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = + bounded(MAX_CONCURRENT_BUFFER_OPENS); + let matching_buffers = grab_buffer_snapshot_rx.clone(); + let trigger_search = Box::new(|cx: &mut App| { + cx.spawn(async move |cx| { + let (find_all_matches_tx, find_all_matches_rx) = + bounded(MAX_CONCURRENT_BUFFER_OPENS); + + let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = unbounded(); + let matches_count = AtomicUsize::new(0); + let matched_buffer_count = AtomicUsize::new(0); + let worker_pool = executor.scoped(|scope| { + let (input_paths_tx, input_paths_rx) = bounded(64); + let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) = + bounded(64); + let (sorted_search_results_tx, sorted_search_results_rx) = bounded(64); + for _ in 0..executor.num_cpus() { + let worker = Worker { + query: &query, + open_buffers: &self.open_buffers, + matched_buffer_count: &matched_buffer_count, + matches_count: &matches_count, + fs: &*self.fs, + input_paths_rx: input_paths_rx.clone(), + confirm_contents_will_match_rx: confirm_contents_will_match_rx.clone(), + confirm_contents_will_match_tx: confirm_contents_will_match_tx.clone(), + get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(), + find_all_matches_rx: find_all_matches_rx.clone(), + publish_matches: tx.clone(), + }; + scope.spawn(worker.run()); + } + scope.spawn(self.provide_search_paths( + &query, + input_paths_tx, + sorted_search_results_tx, + )); + scope.spawn(self.maintain_sorted_search_results( + sorted_search_results_rx, + get_buffer_for_full_scan_tx, + )) + }); + let open_buffers = self.open_buffers( + get_buffer_for_full_scan_rx, + grab_buffer_snapshot_tx, + cx.clone(), + ); + let buffer_snapshots = self.grab_buffer_snapshots( + grab_buffer_snapshot_rx, + find_all_matches_tx, + cx.clone(), + ); + futures::future::join3(worker_pool, buffer_snapshots, open_buffers).await; + + let limit_reached = matches_count.load(Ordering::Acquire) + > MAX_SEARCH_RESULT_RANGES + || matched_buffer_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_FILES; + if limit_reached { + _ = tx.send(SearchResult::LimitReached).await; } - scope.spawn(self.provide_search_paths( - &query, - input_paths_tx, - sorted_search_results_tx, - )); - scope.spawn(self.maintain_sorted_search_results( - sorted_search_results_rx, - get_buffer_for_full_scan_tx, - )) - }); - let open_buffers = - self.open_buffers(get_buffer_for_full_scan_rx, find_all_matches_tx, cx); - futures::future::join(worker_pool, open_buffers).await; - - let limit_reached = matches_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_RANGES - || matched_buffer_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_FILES; - if limit_reached { - _ = tx.send(SearchResult::LimitReached).await; - } - }) - .detach(); - rx + }) + }); + SearchResultsHandle { + results: rx, + matching_buffers, + trigger_search, + } } async fn provide_search_paths<'this>( @@ -153,20 +193,35 @@ impl ProjectSearcher { async fn open_buffers<'a>( &'a self, rx: Receiver, - find_all_matches_tx: Sender<(Entity, BufferSnapshot)>, - cx: &mut AsyncApp, + find_all_matches_tx: Sender>, + mut cx: AsyncApp, ) { _ = maybe!(async move { while let Ok(requested_path) = rx.recv().await { let Some(buffer) = self .buffer_store - .update(cx, |this, cx| this.open_buffer(requested_path, cx))? + .update(&mut cx, |this, cx| this.open_buffer(requested_path, cx))? .await .log_err() else { continue; }; - let snapshot = buffer.read_with(cx, |this, _| this.snapshot())?; + find_all_matches_tx.send(buffer).await?; + } + Result::<_, anyhow::Error>::Ok(()) + }) + .await; + } + + async fn grab_buffer_snapshots<'a>( + &'a self, + rx: Receiver>, + find_all_matches_tx: Sender<(Entity, BufferSnapshot)>, + mut cx: AsyncApp, + ) { + _ = maybe!(async move { + while let Ok(buffer) = rx.recv().await { + let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?; find_all_matches_tx.send((buffer, snapshot)).await?; } Result::<_, anyhow::Error>::Ok(())